Skip to content

Commit bf40424

Browse files
committed
chore(ring): rename consumer to ring.
Signed-off-by: Bo-Yi.Wu <[email protected]>
1 parent 7218cf9 commit bf40424

File tree

6 files changed

+40
-40
lines changed

6 files changed

+40
-40
lines changed

benchmark_test.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ func testQueue(b *testing.B, pool testqueue) {
3838
}
3939

4040
func BenchmarkNewCusumer(b *testing.B) {
41-
pool := NewConsumer(
41+
pool := NewRing(
4242
WithQueueSize(b.N*count),
4343
WithLogger(emptyLogger{}),
4444
)
@@ -69,14 +69,14 @@ func BenchmarkQueue(b *testing.B) {
6969
}
7070
}
7171

72-
func BenchmarkConsumerPayload(b *testing.B) {
72+
func BenchmarkRingPayload(b *testing.B) {
7373
b.ReportAllocs()
7474

7575
task := &job.Message{
7676
Timeout: 100 * time.Millisecond,
7777
Payload: []byte(`{"timeout":3600000000000}`),
7878
}
79-
w := NewConsumer(
79+
w := NewRing(
8080
WithFn(func(ctx context.Context, m core.QueuedMessage) error {
8181
return nil
8282
}),
@@ -92,7 +92,7 @@ func BenchmarkConsumerPayload(b *testing.B) {
9292
}
9393
}
9494

95-
func BenchmarkConsumerTask(b *testing.B) {
95+
func BenchmarkRingTask(b *testing.B) {
9696
b.ReportAllocs()
9797

9898
task := &job.Message{
@@ -101,7 +101,7 @@ func BenchmarkConsumerTask(b *testing.B) {
101101
return nil
102102
},
103103
}
104-
w := NewConsumer(
104+
w := NewRing(
105105
WithFn(func(ctx context.Context, m core.QueuedMessage) error {
106106
return nil
107107
}),

metric_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import (
1212
)
1313

1414
func TestMetricData(t *testing.T) {
15-
w := NewConsumer(
15+
w := NewRing(
1616
WithFn(func(ctx context.Context, m core.QueuedMessage) error {
1717
switch string(m.Bytes()) {
1818
case "foo1":

pool.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ package queue
44
func NewPool(size int, opts ...Option) *Queue {
55
o := []Option{
66
WithWorkerCount(size),
7-
WithWorker(NewConsumer(opts...)),
7+
WithWorker(NewRing(opts...)),
88
}
99
o = append(
1010
o,

queue_test.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ func TestHandleTimeout(t *testing.T) {
8181
Timeout: 100 * time.Millisecond,
8282
Payload: []byte("foo"),
8383
}
84-
w := NewConsumer(
84+
w := NewRing(
8585
WithFn(func(ctx context.Context, m core.QueuedMessage) error {
8686
time.Sleep(200 * time.Millisecond)
8787
return nil
@@ -113,7 +113,7 @@ func TestJobComplete(t *testing.T) {
113113
Timeout: 100 * time.Millisecond,
114114
Payload: []byte("foo"),
115115
}
116-
w := NewConsumer(
116+
w := NewRing(
117117
WithFn(func(ctx context.Context, m core.QueuedMessage) error {
118118
return errors.New("job completed")
119119
}),
@@ -134,7 +134,7 @@ func TestJobComplete(t *testing.T) {
134134
Payload: []byte("foo"),
135135
}
136136

137-
w = NewConsumer(
137+
w = NewRing(
138138
WithFn(func(ctx context.Context, m core.QueuedMessage) error {
139139
time.Sleep(200 * time.Millisecond)
140140
return errors.New("job completed")
@@ -159,7 +159,7 @@ func TestTaskJobComplete(t *testing.T) {
159159
return errors.New("job completed")
160160
},
161161
}
162-
w := NewConsumer()
162+
w := NewRing()
163163

164164
q, err := NewQueue(
165165
WithWorker(w),

consumer.go renamed to ring.go

+11-11
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,10 @@ import (
88
"github.com/golang-queue/queue/core"
99
)
1010

11-
var _ core.Worker = (*Consumer)(nil)
11+
var _ core.Worker = (*Ring)(nil)
1212

13-
// Consumer for simple queue using buffer channel
14-
type Consumer struct {
13+
// Ring for simple queue using buffer channel
14+
type Ring struct {
1515
sync.Mutex
1616
taskQueue []core.QueuedMessage
1717
runFunc func(context.Context, core.QueuedMessage) error
@@ -26,12 +26,12 @@ type Consumer struct {
2626
}
2727

2828
// Run to execute new task
29-
func (s *Consumer) Run(ctx context.Context, task core.QueuedMessage) error {
29+
func (s *Ring) Run(ctx context.Context, task core.QueuedMessage) error {
3030
return s.runFunc(ctx, task)
3131
}
3232

3333
// Shutdown the worker
34-
func (s *Consumer) Shutdown() error {
34+
func (s *Ring) Shutdown() error {
3535
if !atomic.CompareAndSwapInt32(&s.stopFlag, 0, 1) {
3636
return ErrQueueShutdown
3737
}
@@ -45,7 +45,7 @@ func (s *Consumer) Shutdown() error {
4545
}
4646

4747
// Queue send task to the buffer channel
48-
func (s *Consumer) Queue(task core.QueuedMessage) error { //nolint:stylecheck
48+
func (s *Ring) Queue(task core.QueuedMessage) error { //nolint:stylecheck
4949
if atomic.LoadInt32(&s.stopFlag) == 1 {
5050
return ErrQueueShutdown
5151
}
@@ -66,7 +66,7 @@ func (s *Consumer) Queue(task core.QueuedMessage) error { //nolint:stylecheck
6666
}
6767

6868
// Request a new task from channel
69-
func (s *Consumer) Request() (core.QueuedMessage, error) {
69+
func (s *Ring) Request() (core.QueuedMessage, error) {
7070
if atomic.LoadInt32(&s.stopFlag) == 1 && s.count == 0 {
7171
select {
7272
case s.exit <- struct{}{}:
@@ -92,7 +92,7 @@ func (s *Consumer) Request() (core.QueuedMessage, error) {
9292
return data, nil
9393
}
9494

95-
func (q *Consumer) resize(n int) {
95+
func (q *Ring) resize(n int) {
9696
nodes := make([]core.QueuedMessage, n)
9797
if q.head < q.tail {
9898
copy(nodes, q.taskQueue[q.head:q.tail])
@@ -106,10 +106,10 @@ func (q *Consumer) resize(n int) {
106106
q.taskQueue = nodes
107107
}
108108

109-
// NewConsumer for create new Consumer instance
110-
func NewConsumer(opts ...Option) *Consumer {
109+
// NewRing for create new Ring instance
110+
func NewRing(opts ...Option) *Ring {
111111
o := NewOptions(opts...)
112-
w := &Consumer{
112+
w := &Ring{
113113
taskQueue: make([]core.QueuedMessage, 2),
114114
capacity: o.queueSize,
115115
exit: make(chan struct{}),

consumer_test.go renamed to ring_test.go

+18-18
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ import (
1818
)
1919

2020
func TestMaxCapacity(t *testing.T) {
21-
w := NewConsumer(WithQueueSize(2))
21+
w := NewRing(WithQueueSize(2))
2222

2323
assert.NoError(t, w.Queue(&mockMessage{}))
2424
assert.NoError(t, w.Queue(&mockMessage{}))
@@ -32,7 +32,7 @@ func TestCustomFuncAndWait(t *testing.T) {
3232
m := mockMessage{
3333
message: "foo",
3434
}
35-
w := NewConsumer(
35+
w := NewRing(
3636
WithFn(func(ctx context.Context, m core.QueuedMessage) error {
3737
time.Sleep(500 * time.Millisecond)
3838
return nil
@@ -61,7 +61,7 @@ func TestEnqueueJobAfterShutdown(t *testing.T) {
6161
m := mockMessage{
6262
message: "foo",
6363
}
64-
w := NewConsumer()
64+
w := NewRing()
6565
q, err := NewQueue(
6666
WithWorker(w),
6767
WithWorkerCount(2),
@@ -81,7 +81,7 @@ func TestJobReachTimeout(t *testing.T) {
8181
m := mockMessage{
8282
message: "foo",
8383
}
84-
w := NewConsumer(
84+
w := NewRing(
8585
WithFn(func(ctx context.Context, m core.QueuedMessage) error {
8686
for {
8787
select {
@@ -114,7 +114,7 @@ func TestCancelJobAfterShutdown(t *testing.T) {
114114
m := mockMessage{
115115
message: "foo",
116116
}
117-
w := NewConsumer(
117+
w := NewRing(
118118
WithLogger(NewEmptyLogger()),
119119
WithFn(func(ctx context.Context, m core.QueuedMessage) error {
120120
for {
@@ -147,7 +147,7 @@ func TestCancelJobAfterShutdown(t *testing.T) {
147147
}
148148

149149
func TestGoroutineLeak(t *testing.T) {
150-
w := NewConsumer(
150+
w := NewRing(
151151
WithLogger(NewLogger()),
152152
WithFn(func(ctx context.Context, m core.QueuedMessage) error {
153153
for {
@@ -191,7 +191,7 @@ func TestGoroutinePanic(t *testing.T) {
191191
m := mockMessage{
192192
message: "foo",
193193
}
194-
w := NewConsumer(
194+
w := NewRing(
195195
WithFn(func(ctx context.Context, m core.QueuedMessage) error {
196196
panic("missing something")
197197
}),
@@ -208,7 +208,7 @@ func TestGoroutinePanic(t *testing.T) {
208208
}
209209

210210
func TestIncreaseWorkerCount(t *testing.T) {
211-
w := NewConsumer(
211+
w := NewRing(
212212
WithLogger(NewEmptyLogger()),
213213
WithFn(func(ctx context.Context, m core.QueuedMessage) error {
214214
time.Sleep(500 * time.Millisecond)
@@ -239,7 +239,7 @@ func TestIncreaseWorkerCount(t *testing.T) {
239239
}
240240

241241
func TestDecreaseWorkerCount(t *testing.T) {
242-
w := NewConsumer(
242+
w := NewRing(
243243
WithFn(func(ctx context.Context, m core.QueuedMessage) error {
244244
time.Sleep(100 * time.Millisecond)
245245
return nil
@@ -270,13 +270,13 @@ func TestDecreaseWorkerCount(t *testing.T) {
270270
q.Release()
271271
}
272272

273-
func TestHandleAllJobBeforeShutdownConsumer(t *testing.T) {
273+
func TestHandleAllJobBeforeShutdownRing(t *testing.T) {
274274
controller := gomock.NewController(t)
275275
defer controller.Finish()
276276

277277
m := mocks.NewMockQueuedMessage(controller)
278278

279-
w := NewConsumer(
279+
w := NewRing(
280280
WithFn(func(ctx context.Context, m core.QueuedMessage) error {
281281
time.Sleep(10 * time.Millisecond)
282282
return nil
@@ -303,7 +303,7 @@ func TestHandleAllJobBeforeShutdownConsumer(t *testing.T) {
303303
<-done
304304
}
305305

306-
func TestHandleAllJobBeforeShutdownConsumerInQueue(t *testing.T) {
306+
func TestHandleAllJobBeforeShutdownRingInQueue(t *testing.T) {
307307
controller := gomock.NewController(t)
308308
defer controller.Finish()
309309

@@ -312,7 +312,7 @@ func TestHandleAllJobBeforeShutdownConsumerInQueue(t *testing.T) {
312312

313313
messages := make(chan string, 10)
314314

315-
w := NewConsumer(
315+
w := NewRing(
316316
WithFn(func(ctx context.Context, m core.QueuedMessage) error {
317317
time.Sleep(10 * time.Millisecond)
318318
messages <- string(m.Bytes())
@@ -346,7 +346,7 @@ func TestRetryCountWithNewMessage(t *testing.T) {
346346
keep := make(chan struct{})
347347
count := 1
348348

349-
w := NewConsumer(
349+
w := NewRing(
350350
WithFn(func(ctx context.Context, m core.QueuedMessage) error {
351351
if count%3 != 0 {
352352
count++
@@ -384,7 +384,7 @@ func TestRetryCountWithNewTask(t *testing.T) {
384384
messages := make(chan string, 10)
385385
count := 1
386386

387-
w := NewConsumer()
387+
w := NewRing()
388388

389389
q, err := NewQueue(
390390
WithLogger(NewLogger()),
@@ -422,7 +422,7 @@ func TestCancelRetryCountWithNewTask(t *testing.T) {
422422
messages := make(chan string, 10)
423423
count := 1
424424

425-
w := NewConsumer()
425+
w := NewRing()
426426

427427
q, err := NewQueue(
428428
WithLogger(NewLogger()),
@@ -464,7 +464,7 @@ func TestCancelRetryCountWithNewMessage(t *testing.T) {
464464
messages := make(chan string, 10)
465465
count := 1
466466

467-
w := NewConsumer(
467+
w := NewRing(
468468
WithFn(func(ctx context.Context, m core.QueuedMessage) error {
469469
if count%3 != 0 {
470470
count++
@@ -498,7 +498,7 @@ func TestCancelRetryCountWithNewMessage(t *testing.T) {
498498
}
499499

500500
func TestErrNoTaskInQueue(t *testing.T) {
501-
w := NewConsumer(
501+
w := NewRing(
502502
WithFn(func(ctx context.Context, m core.QueuedMessage) error {
503503
return nil
504504
}),

0 commit comments

Comments
 (0)