Skip to content

Commit dc32d40

Browse files
committed
chore(queue): cancel the channel.
Signed-off-by: Bo-Yi Wu <[email protected]>
1 parent 70f27fc commit dc32d40

File tree

2 files changed

+190
-6
lines changed

2 files changed

+190
-6
lines changed

rabbitmq.go

-6
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ type Worker struct {
2020
client *amqp.Connection
2121
channel *amqp.Channel
2222
stop chan struct{}
23-
exit chan struct{}
2423
stopFlag int32
2524
stopOnce sync.Once
2625
startOnce sync.Once
@@ -34,7 +33,6 @@ func NewWorker(opts ...Option) *Worker {
3433
w := &Worker{
3534
opts: newOptions(opts...),
3635
stop: make(chan struct{}),
37-
exit: make(chan struct{}),
3836
tasks: make(chan amqp.Delivery),
3937
}
4038

@@ -151,10 +149,6 @@ func (w *Worker) Shutdown() error {
151149

152150
w.stopOnce.Do(func() {
153151
close(w.stop)
154-
select {
155-
case <-w.exit:
156-
case <-time.After(50 * time.Millisecond):
157-
}
158152
if err := w.channel.Cancel("", true); err != nil {
159153
w.opts.logger.Error(err)
160154
}

rabbitmq_test.go

+190
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,16 @@
11
package rabbitmq
22

33
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
"log"
8+
"runtime"
49
"testing"
10+
"time"
511

612
"github.com/golang-queue/queue"
13+
"github.com/golang-queue/queue/core"
714

815
"github.com/stretchr/testify/assert"
916
"go.uber.org/goleak"
@@ -38,3 +45,186 @@ func TestDefaultWorkFlow(t *testing.T) {
3845
q.Start()
3946
q.Release()
4047
}
48+
49+
func TestShutdownWorkFlow(t *testing.T) {
50+
w := NewWorker(
51+
WithSubj("test"),
52+
)
53+
q, err := queue.NewQueue(
54+
queue.WithWorker(w),
55+
queue.WithWorkerCount(2),
56+
)
57+
assert.NoError(t, err)
58+
q.Start()
59+
time.Sleep(1 * time.Second)
60+
q.Shutdown()
61+
// check shutdown once
62+
q.Shutdown()
63+
q.Wait()
64+
}
65+
66+
func TestCustomFuncAndWait(t *testing.T) {
67+
m := &mockMessage{
68+
Message: "foo",
69+
}
70+
w := NewWorker(
71+
WithSubj("test"),
72+
WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
73+
log.Println("show message: " + string(m.Bytes()))
74+
time.Sleep(500 * time.Millisecond)
75+
return nil
76+
}),
77+
)
78+
q, err := queue.NewQueue(
79+
queue.WithWorker(w),
80+
queue.WithWorkerCount(2),
81+
)
82+
assert.NoError(t, err)
83+
q.Start()
84+
time.Sleep(100 * time.Millisecond)
85+
assert.NoError(t, q.Queue(m))
86+
assert.NoError(t, q.Queue(m))
87+
assert.NoError(t, q.Queue(m))
88+
assert.NoError(t, q.Queue(m))
89+
time.Sleep(600 * time.Millisecond)
90+
q.Shutdown()
91+
q.Wait()
92+
// you will see the execute time > 1000ms
93+
}
94+
95+
func TestEnqueueJobAfterShutdown(t *testing.T) {
96+
m := mockMessage{
97+
Message: "foo",
98+
}
99+
w := NewWorker()
100+
q, err := queue.NewQueue(
101+
queue.WithWorker(w),
102+
queue.WithWorkerCount(2),
103+
)
104+
assert.NoError(t, err)
105+
q.Start()
106+
time.Sleep(50 * time.Millisecond)
107+
q.Shutdown()
108+
// can't queue task after shutdown
109+
err = q.Queue(m)
110+
assert.Error(t, err)
111+
assert.Equal(t, queue.ErrQueueShutdown, err)
112+
q.Wait()
113+
}
114+
115+
func TestJobReachTimeout(t *testing.T) {
116+
m := mockMessage{
117+
Message: "foo",
118+
}
119+
w := NewWorker(
120+
WithSubj("JobReachTimeout"),
121+
WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
122+
for {
123+
select {
124+
case <-ctx.Done():
125+
log.Println("get data:", string(m.Bytes()))
126+
if errors.Is(ctx.Err(), context.Canceled) {
127+
log.Println("queue has been shutdown and cancel the job")
128+
} else if errors.Is(ctx.Err(), context.DeadlineExceeded) {
129+
log.Println("job deadline exceeded")
130+
}
131+
return nil
132+
default:
133+
}
134+
time.Sleep(50 * time.Millisecond)
135+
}
136+
}),
137+
)
138+
q, err := queue.NewQueue(
139+
queue.WithWorker(w),
140+
queue.WithWorkerCount(2),
141+
)
142+
assert.NoError(t, err)
143+
q.Start()
144+
time.Sleep(50 * time.Millisecond)
145+
assert.NoError(t, q.QueueWithTimeout(20*time.Millisecond, m))
146+
time.Sleep(100 * time.Millisecond)
147+
q.Shutdown()
148+
q.Wait()
149+
}
150+
151+
func TestCancelJobAfterShutdown(t *testing.T) {
152+
m := mockMessage{
153+
Message: "test",
154+
}
155+
w := NewWorker(
156+
WithSubj("CancelJob"),
157+
WithLogger(queue.NewLogger()),
158+
WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
159+
for {
160+
select {
161+
case <-ctx.Done():
162+
log.Println("get data:", string(m.Bytes()))
163+
if errors.Is(ctx.Err(), context.Canceled) {
164+
log.Println("queue has been shutdown and cancel the job")
165+
} else if errors.Is(ctx.Err(), context.DeadlineExceeded) {
166+
log.Println("job deadline exceeded")
167+
}
168+
return nil
169+
default:
170+
}
171+
time.Sleep(50 * time.Millisecond)
172+
}
173+
}),
174+
)
175+
q, err := queue.NewQueue(
176+
queue.WithWorker(w),
177+
queue.WithWorkerCount(2),
178+
)
179+
assert.NoError(t, err)
180+
q.Start()
181+
time.Sleep(50 * time.Millisecond)
182+
assert.NoError(t, q.QueueWithTimeout(150*time.Millisecond, m))
183+
time.Sleep(100 * time.Millisecond)
184+
q.Shutdown()
185+
q.Wait()
186+
}
187+
188+
func TestGoroutineLeak(t *testing.T) {
189+
m := mockMessage{
190+
Message: "foo",
191+
}
192+
w := NewWorker(
193+
WithSubj("GoroutineLeak"),
194+
WithLogger(queue.NewEmptyLogger()),
195+
WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
196+
for {
197+
select {
198+
case <-ctx.Done():
199+
log.Println("get data:", string(m.Bytes()))
200+
if errors.Is(ctx.Err(), context.Canceled) {
201+
log.Println("queue has been shutdown and cancel the job")
202+
} else if errors.Is(ctx.Err(), context.DeadlineExceeded) {
203+
log.Println("job deadline exceeded")
204+
}
205+
return nil
206+
default:
207+
log.Println("get data:", string(m.Bytes()))
208+
time.Sleep(50 * time.Millisecond)
209+
return nil
210+
}
211+
}
212+
}),
213+
)
214+
q, err := queue.NewQueue(
215+
queue.WithLogger(queue.NewEmptyLogger()),
216+
queue.WithWorker(w),
217+
queue.WithWorkerCount(10),
218+
)
219+
assert.NoError(t, err)
220+
q.Start()
221+
time.Sleep(50 * time.Millisecond)
222+
for i := 0; i < 500; i++ {
223+
m.Message = fmt.Sprintf("foobar: %d", i+1)
224+
assert.NoError(t, q.Queue(m))
225+
}
226+
time.Sleep(200 * time.Millisecond)
227+
q.Shutdown()
228+
q.Wait()
229+
fmt.Println("number of goroutines:", runtime.NumGoroutine())
230+
}

0 commit comments

Comments
 (0)