Skip to content

Commit 4b8976e

Browse files
committed
chore: initial rabbit mq client
Signed-off-by: Bo-Yi Wu <[email protected]>
1 parent f1b107c commit 4b8976e

File tree

5 files changed

+401
-0
lines changed

5 files changed

+401
-0
lines changed

go.mod

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
module github.com/golang-queue/rabbitmq
2+
3+
go 1.18
4+
5+
require (
6+
github.com/golang-queue/queue v0.1.3
7+
github.com/rabbitmq/amqp091-go v1.3.4
8+
github.com/stretchr/testify v1.7.3
9+
go.uber.org/goleak v1.1.12
10+
)
11+
12+
require (
13+
github.com/davecgh/go-spew v1.1.1 // indirect
14+
github.com/goccy/go-json v0.9.7 // indirect
15+
github.com/pmezard/go-difflib v1.0.0 // indirect
16+
gopkg.in/yaml.v3 v3.0.1 // indirect
17+
)

go.sum

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
2+
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
3+
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
4+
github.com/goccy/go-json v0.9.7 h1:IcB+Aqpx/iMHu5Yooh7jEzJk1JZ7Pjtmys2ukPr7EeM=
5+
github.com/goccy/go-json v0.9.7/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
6+
github.com/golang-queue/queue v0.1.3 h1:FGIrn8e0fN8EmL3glP0rFEcYVtWUGMEeqX4h4nnzh40=
7+
github.com/golang-queue/queue v0.1.3/go.mod h1:h/PhaoMwT5Jc4sQNus7APgWBUItm6QC9k6JtmwrsRos=
8+
github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc=
9+
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
10+
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
11+
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
12+
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
13+
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
14+
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
15+
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
16+
github.com/rabbitmq/amqp091-go v1.3.4 h1:tXuIslN1nhDqs2t6Jrz3BAoqvt4qIZzxvdbdcxWtHYU=
17+
github.com/rabbitmq/amqp091-go v1.3.4/go.mod h1:ogQDLSOACsLPsIq0NpbtiifNZi2YOz0VTJ0kHRghqbM=
18+
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
19+
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
20+
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
21+
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
22+
github.com/stretchr/testify v1.7.3 h1:dAm0YRdRQlWojc3CrCRgPBzG5f941d0zvAKu7qY4e+I=
23+
github.com/stretchr/testify v1.7.3/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
24+
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
25+
go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA=
26+
go.uber.org/goleak v1.1.12/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
27+
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
28+
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
29+
golang.org/x/lint v0.0.0-20190930215403-16217165b5de h1:5hukYrvBGR8/eNkX5mdUezrA6JiaEZDtJb9Ei+1LlBs=
30+
golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
31+
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
32+
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
33+
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
34+
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
35+
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
36+
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
37+
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
38+
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
39+
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
40+
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
41+
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
42+
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
43+
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
44+
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
45+
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
46+
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
47+
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
48+
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
49+
golang.org/x/tools v0.1.5 h1:ouewzE6p+/VEB31YYnTbEJdi8pFqKp4P4n85vwo3DHA=
50+
golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
51+
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
52+
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
53+
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
54+
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
55+
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
56+
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
57+
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
58+
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
59+
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

options.go

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
package rabbitmq
2+
3+
import (
4+
"context"
5+
6+
"github.com/golang-queue/queue"
7+
"github.com/golang-queue/queue/core"
8+
)
9+
10+
// Option for queue system
11+
type Option func(*options)
12+
13+
type options struct {
14+
runFunc func(context.Context, core.QueuedMessage) error
15+
logger queue.Logger
16+
addr string
17+
subj string
18+
}
19+
20+
// WithAddr setup the addr of NATS
21+
func WithAddr(addr string) Option {
22+
return func(w *options) {
23+
w.addr = "nats://" + addr
24+
}
25+
}
26+
27+
// WithSubj setup the subject of NATS
28+
func WithSubj(subj string) Option {
29+
return func(w *options) {
30+
w.subj = subj
31+
}
32+
}
33+
34+
// WithRunFunc setup the run func of queue
35+
func WithRunFunc(fn func(context.Context, core.QueuedMessage) error) Option {
36+
return func(w *options) {
37+
w.runFunc = fn
38+
}
39+
}
40+
41+
// WithLogger set custom logger
42+
func WithLogger(l queue.Logger) Option {
43+
return func(w *options) {
44+
w.logger = l
45+
}
46+
}
47+
48+
func newOptions(opts ...Option) options {
49+
defaultOpts := options{
50+
addr: "amqp://guest:guest@localhost:5672/",
51+
subj: "queue",
52+
logger: queue.NewLogger(),
53+
runFunc: func(context.Context, core.QueuedMessage) error {
54+
return nil
55+
},
56+
}
57+
58+
// Loop through each option
59+
for _, opt := range opts {
60+
// Call the option giving the instantiated
61+
opt(&defaultOpts)
62+
}
63+
64+
return defaultOpts
65+
}

rabbitmq.go

Lines changed: 220 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,220 @@
1+
package rabbitmq
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"sync"
7+
"sync/atomic"
8+
"time"
9+
10+
"github.com/golang-queue/queue"
11+
"github.com/golang-queue/queue/core"
12+
13+
amqp "github.com/rabbitmq/amqp091-go"
14+
)
15+
16+
var _ core.Worker = (*Worker)(nil)
17+
18+
// Worker for NSQ
19+
type Worker struct {
20+
client *amqp.Connection
21+
channel *amqp.Channel
22+
stop chan struct{}
23+
exit chan struct{}
24+
stopFlag int32
25+
stopOnce sync.Once
26+
startOnce sync.Once
27+
opts options
28+
tasks <-chan amqp.Delivery
29+
}
30+
31+
// NewWorker for struc
32+
func NewWorker(opts ...Option) *Worker {
33+
var err error
34+
w := &Worker{
35+
opts: newOptions(opts...),
36+
stop: make(chan struct{}),
37+
exit: make(chan struct{}),
38+
tasks: make(chan amqp.Delivery),
39+
}
40+
41+
w.client, err = amqp.Dial(w.opts.addr)
42+
if err != nil {
43+
panic(err)
44+
}
45+
46+
w.channel, err = w.client.Channel()
47+
if err != nil {
48+
panic(err)
49+
}
50+
51+
return w
52+
}
53+
54+
func (w *Worker) startConsumer() (err error) {
55+
w.startOnce.Do(func() {
56+
var err error
57+
q, err := w.channel.QueueDeclare(
58+
w.opts.subj, // name
59+
false, // durable
60+
false, // delete when unused
61+
false, // exclusive
62+
false, // no-wait
63+
nil, // arguments
64+
)
65+
if err != nil {
66+
w.opts.logger.Error(err)
67+
return
68+
}
69+
70+
w.tasks, err = w.channel.Consume(
71+
q.Name, // queue
72+
"", // consumer
73+
true, // auto-ack
74+
false, // exclusive
75+
false, // no-local
76+
false, // no-wait
77+
nil, // args
78+
)
79+
80+
if err != nil {
81+
w.opts.logger.Error(err)
82+
}
83+
})
84+
85+
return err
86+
}
87+
88+
func (w *Worker) handle(job *queue.Job) error {
89+
// create channel with buffer size 1 to avoid goroutine leak
90+
done := make(chan error, 1)
91+
panicChan := make(chan interface{}, 1)
92+
startTime := time.Now()
93+
ctx, cancel := context.WithTimeout(context.Background(), job.Timeout)
94+
defer func() {
95+
cancel()
96+
}()
97+
98+
// run the job
99+
go func() {
100+
// handle panic issue
101+
defer func() {
102+
if p := recover(); p != nil {
103+
panicChan <- p
104+
}
105+
}()
106+
107+
// run custom process function
108+
done <- w.opts.runFunc(ctx, job)
109+
}()
110+
111+
select {
112+
case p := <-panicChan:
113+
panic(p)
114+
case <-ctx.Done(): // timeout reached
115+
return ctx.Err()
116+
case <-w.stop: // shutdown service
117+
// cancel job
118+
cancel()
119+
120+
leftTime := job.Timeout - time.Since(startTime)
121+
// wait job
122+
select {
123+
case <-time.After(leftTime):
124+
return context.DeadlineExceeded
125+
case err := <-done: // job finish
126+
return err
127+
case p := <-panicChan:
128+
panic(p)
129+
}
130+
case err := <-done: // job finish
131+
return err
132+
}
133+
}
134+
135+
// Run start the worker
136+
func (w *Worker) Run(task core.QueuedMessage) error {
137+
data, _ := task.(*queue.Job)
138+
139+
if err := w.handle(data); err != nil {
140+
return err
141+
}
142+
143+
return nil
144+
}
145+
146+
// Shutdown worker
147+
func (w *Worker) Shutdown() error {
148+
if !atomic.CompareAndSwapInt32(&w.stopFlag, 0, 1) {
149+
return queue.ErrQueueShutdown
150+
}
151+
152+
w.stopOnce.Do(func() {
153+
close(w.stop)
154+
select {
155+
case <-w.exit:
156+
case <-time.After(50 * time.Millisecond):
157+
}
158+
if err := w.channel.Cancel("", true); err != nil {
159+
w.opts.logger.Error(err)
160+
}
161+
if err := w.client.Close(); err != nil {
162+
w.opts.logger.Error(err)
163+
}
164+
})
165+
return nil
166+
}
167+
168+
// Queue send notification to queue
169+
func (w *Worker) Queue(job core.QueuedMessage) error {
170+
if atomic.LoadInt32(&w.stopFlag) == 1 {
171+
return queue.ErrQueueShutdown
172+
}
173+
174+
q, err := w.channel.QueueDeclare(
175+
w.opts.subj, // name
176+
false, // durable
177+
false, // delete when unused
178+
false, // exclusive
179+
false, // no-wait
180+
nil, // arguments
181+
)
182+
if err != nil {
183+
return err
184+
}
185+
186+
return w.channel.Publish(
187+
"", // exchange
188+
q.Name, // routing key
189+
false, // mandatory
190+
false, // immediate
191+
amqp.Publishing{
192+
ContentType: "text/plain",
193+
Body: job.Bytes(),
194+
})
195+
}
196+
197+
// Request a new task
198+
func (w *Worker) Request() (core.QueuedMessage, error) {
199+
_ = w.startConsumer()
200+
clock := 0
201+
loop:
202+
for {
203+
select {
204+
case task, ok := <-w.tasks:
205+
if !ok {
206+
return nil, queue.ErrQueueHasBeenClosed
207+
}
208+
var data queue.Job
209+
_ = json.Unmarshal(task.Body, &data)
210+
return &data, nil
211+
case <-time.After(1 * time.Second):
212+
if clock == 5 {
213+
break loop
214+
}
215+
clock += 1
216+
}
217+
}
218+
219+
return nil, queue.ErrNoTaskInQueue
220+
}

0 commit comments

Comments
 (0)