Skip to content

Commit 182bda1

Browse files
authored
chore(queue): refactor queue package (#8)
1 parent 8931613 commit 182bda1

File tree

5 files changed

+15
-147
lines changed

5 files changed

+15
-147
lines changed

.github/workflows/go.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ jobs:
1414
- name: Setup golangci-lint
1515
uses: golangci/golangci-lint-action@v3
1616
with:
17-
version: v1.49.0
17+
version: latest
1818
args: --verbose
1919

2020
# Label of the container job

go.mod

+2-2
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,15 @@ module github.com/golang-queue/rabbitmq
33
go 1.18
44

55
require (
6-
github.com/golang-queue/queue v0.1.3
6+
github.com/golang-queue/queue v0.1.4-0.20221230133718-0314ef173f98
77
github.com/rabbitmq/amqp091-go v1.5.0
88
github.com/stretchr/testify v1.8.1
99
go.uber.org/goleak v1.2.0
1010
)
1111

1212
require (
1313
github.com/davecgh/go-spew v1.1.1 // indirect
14-
github.com/goccy/go-json v0.9.7 // indirect
14+
github.com/goccy/go-json v0.10.0 // indirect
1515
github.com/pmezard/go-difflib v1.0.0 // indirect
1616
gopkg.in/yaml.v3 v3.0.1 // indirect
1717
)

go.sum

+5-5
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,16 @@
11
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
22
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
33
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
4-
github.com/goccy/go-json v0.9.7 h1:IcB+Aqpx/iMHu5Yooh7jEzJk1JZ7Pjtmys2ukPr7EeM=
5-
github.com/goccy/go-json v0.9.7/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
6-
github.com/golang-queue/queue v0.1.3 h1:FGIrn8e0fN8EmL3glP0rFEcYVtWUGMEeqX4h4nnzh40=
7-
github.com/golang-queue/queue v0.1.3/go.mod h1:h/PhaoMwT5Jc4sQNus7APgWBUItm6QC9k6JtmwrsRos=
4+
github.com/goccy/go-json v0.10.0 h1:mXKd9Qw4NuzShiRlOXKews24ufknHO7gx30lsDyokKA=
5+
github.com/goccy/go-json v0.10.0/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
6+
github.com/golang-queue/queue v0.1.4-0.20221230133718-0314ef173f98 h1:T2DoUcMWZr6uSUQAr5wCEzOiwHB1zJOiATAZ4BUAefg=
7+
github.com/golang-queue/queue v0.1.4-0.20221230133718-0314ef173f98/go.mod h1:8P7IgwdxwKh0/W1I9yCuQQGI8OHIuc7fIHi4OYr1COU=
88
github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc=
99
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
1010
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
1111
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
12-
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
1312
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
13+
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
1414
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
1515
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
1616
github.com/rabbitmq/amqp091-go v1.5.0 h1:VouyHPBu1CrKyJVfteGknGOGCzmOz0zcv/tONLkb7rg=

rabbitmq.go

+4-56
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99

1010
"github.com/golang-queue/queue"
1111
"github.com/golang-queue/queue/core"
12+
"github.com/golang-queue/queue/job"
1213

1314
amqp "github.com/rabbitmq/amqp091-go"
1415
)
@@ -99,62 +100,9 @@ func (w *Worker) startConsumer() (err error) {
99100
return err
100101
}
101102

102-
func (w *Worker) handle(job *queue.Job) error {
103-
// create channel with buffer size 1 to avoid goroutine leak
104-
done := make(chan error, 1)
105-
panicChan := make(chan interface{}, 1)
106-
startTime := time.Now()
107-
ctx, cancel := context.WithTimeout(context.Background(), job.Timeout)
108-
defer func() {
109-
cancel()
110-
}()
111-
112-
// run the job
113-
go func() {
114-
// handle panic issue
115-
defer func() {
116-
if p := recover(); p != nil {
117-
panicChan <- p
118-
}
119-
}()
120-
121-
// run custom process function
122-
done <- w.opts.runFunc(ctx, job)
123-
}()
124-
125-
select {
126-
case p := <-panicChan:
127-
panic(p)
128-
case <-ctx.Done(): // timeout reached
129-
return ctx.Err()
130-
case <-w.stop: // shutdown service
131-
// cancel job
132-
cancel()
133-
134-
leftTime := job.Timeout - time.Since(startTime)
135-
// wait job
136-
select {
137-
case <-time.After(leftTime):
138-
return context.DeadlineExceeded
139-
case err := <-done: // job finish
140-
return err
141-
case p := <-panicChan:
142-
panic(p)
143-
}
144-
case err := <-done: // job finish
145-
return err
146-
}
147-
}
148-
149103
// Run start the worker
150-
func (w *Worker) Run(task core.QueuedMessage) error {
151-
data, _ := task.(*queue.Job)
152-
153-
if err := w.handle(data); err != nil {
154-
return err
155-
}
156-
157-
return nil
104+
func (w *Worker) Run(ctx context.Context, task core.QueuedMessage) error {
105+
return w.opts.runFunc(ctx, task)
158106
}
159107

160108
// Shutdown worker
@@ -211,7 +159,7 @@ loop:
211159
if !ok {
212160
return nil, queue.ErrQueueHasBeenClosed
213161
}
214-
var data queue.Job
162+
var data job.Message
215163
_ = json.Unmarshal(task.Body, &data)
216164
if !w.opts.autoAck {
217165
_ = task.Ack(w.opts.autoAck)

rabbitmq_test.go

+3-83
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111

1212
"github.com/golang-queue/queue"
1313
"github.com/golang-queue/queue/core"
14+
"github.com/golang-queue/queue/job"
1415

1516
"github.com/stretchr/testify/assert"
1617
"go.uber.org/goleak"
@@ -124,7 +125,7 @@ func TestJobReachTimeout(t *testing.T) {
124125
assert.NoError(t, err)
125126
q.Start()
126127
time.Sleep(50 * time.Millisecond)
127-
assert.NoError(t, q.QueueWithTimeout(20*time.Millisecond, m))
128+
assert.NoError(t, q.Queue(m, job.WithTimeout(20*time.Millisecond)))
128129
time.Sleep(100 * time.Millisecond)
129130
q.Shutdown()
130131
q.Wait()
@@ -161,7 +162,7 @@ func TestCancelJobAfterShutdown(t *testing.T) {
161162
assert.NoError(t, err)
162163
q.Start()
163164
time.Sleep(50 * time.Millisecond)
164-
assert.NoError(t, q.QueueWithTimeout(150*time.Millisecond, m))
165+
assert.NoError(t, q.Queue(m, job.WithTimeout(150*time.Millisecond)))
165166
time.Sleep(100 * time.Millisecond)
166167
q.Shutdown()
167168
q.Wait()
@@ -237,84 +238,3 @@ func TestGoroutinePanic(t *testing.T) {
237238
assert.Error(t, q.Queue(m))
238239
q.Wait()
239240
}
240-
241-
func TestHandleTimeout(t *testing.T) {
242-
job := &queue.Job{
243-
Timeout: 100 * time.Millisecond,
244-
Payload: []byte("foo"),
245-
}
246-
w := NewWorker(
247-
WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
248-
time.Sleep(200 * time.Millisecond)
249-
return nil
250-
}),
251-
)
252-
253-
err := w.handle(job)
254-
assert.Error(t, err)
255-
assert.Equal(t, context.DeadlineExceeded, err)
256-
assert.NoError(t, w.Shutdown())
257-
258-
job = &queue.Job{
259-
Timeout: 150 * time.Millisecond,
260-
Payload: []byte("foo"),
261-
}
262-
263-
w = NewWorker(
264-
WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
265-
time.Sleep(200 * time.Millisecond)
266-
return nil
267-
}),
268-
)
269-
270-
done := make(chan error)
271-
go func() {
272-
done <- w.handle(job)
273-
}()
274-
275-
assert.NoError(t, w.Shutdown())
276-
277-
err = <-done
278-
assert.Error(t, err)
279-
assert.Equal(t, context.DeadlineExceeded, err)
280-
}
281-
282-
func TestJobComplete(t *testing.T) {
283-
job := &queue.Job{
284-
Timeout: 100 * time.Millisecond,
285-
Payload: []byte("foo"),
286-
}
287-
w := NewWorker(
288-
WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
289-
return errors.New("job completed")
290-
}),
291-
)
292-
293-
err := w.handle(job)
294-
assert.Error(t, err)
295-
assert.Equal(t, errors.New("job completed"), err)
296-
assert.NoError(t, w.Shutdown())
297-
298-
job = &queue.Job{
299-
Timeout: 250 * time.Millisecond,
300-
Payload: []byte("foo"),
301-
}
302-
303-
w = NewWorker(
304-
WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
305-
time.Sleep(200 * time.Millisecond)
306-
return errors.New("job completed")
307-
}),
308-
)
309-
310-
done := make(chan error)
311-
go func() {
312-
done <- w.handle(job)
313-
}()
314-
315-
assert.NoError(t, w.Shutdown())
316-
317-
err = <-done
318-
assert.Error(t, err)
319-
assert.Equal(t, errors.New("job completed"), err)
320-
}

0 commit comments

Comments
 (0)