Skip to content

Commit e39e2d1

Browse files
committed
chore(queue): add auto-ack flag
Signed-off-by: Bo-Yi Wu <[email protected]>
1 parent f804803 commit e39e2d1

File tree

2 files changed

+26
-13
lines changed

2 files changed

+26
-13
lines changed

options.go

+9
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ type options struct {
2222
exchangeName string
2323
// Exchange Types: Direct, Fanout, Topic and Headers
2424
exchangeType string
25+
autoAck bool
2526
}
2627

2728
// WithAddr setup the URI
@@ -60,6 +61,13 @@ func WithTag(tag string) Option {
6061
}
6162
}
6263

64+
// WithAutoAck enable message auto-ack
65+
func WithAutoAck(val bool) Option {
66+
return func(w *options) {
67+
w.autoAck = val
68+
}
69+
}
70+
6371
// WithSubj setup the topic
6472
func WithSubj(subj string) Option {
6573
return func(w *options) {
@@ -89,6 +97,7 @@ func newOptions(opts ...Option) options {
8997
exchangeName: "test-exchange",
9098
exchangeType: "direct",
9199
logger: queue.NewLogger(),
100+
autoAck: false,
92101
runFunc: func(context.Context, core.QueuedMessage) error {
93102
return nil
94103
},

rabbitmq.go

+17-13
Original file line numberDiff line numberDiff line change
@@ -77,13 +77,13 @@ func (w *Worker) startConsumer() (err error) {
7777
}
7878

7979
w.tasks, err = w.channel.Consume(
80-
q.Name, // queue
81-
w.opts.tag, // consumer
82-
false, // auto-ack
83-
false, // exclusive
84-
false, // no-local
85-
false, // no-wait
86-
nil, // args
80+
q.Name, // queue
81+
w.opts.tag, // consumer
82+
w.opts.autoAck, // auto-ack
83+
false, // exclusive
84+
false, // no-local
85+
false, // no-wait
86+
nil, // args
8787
)
8888

8989
if err != nil {
@@ -153,21 +153,22 @@ func (w *Worker) Run(task core.QueuedMessage) error {
153153
}
154154

155155
// Shutdown worker
156-
func (w *Worker) Shutdown() error {
156+
func (w *Worker) Shutdown() (err error) {
157157
if !atomic.CompareAndSwapInt32(&w.stopFlag, 0, 1) {
158158
return queue.ErrQueueShutdown
159159
}
160160

161161
w.stopOnce.Do(func() {
162162
close(w.stop)
163-
if err := w.channel.Cancel(w.opts.tag, true); err != nil {
164-
w.opts.logger.Error(err)
163+
if err = w.channel.Cancel(w.opts.tag, true); err != nil {
164+
w.opts.logger.Error("consumer cancel failed:", err)
165165
}
166-
if err := w.conn.Close(); err != nil {
167-
w.opts.logger.Error(err)
166+
if err = w.conn.Close(); err != nil {
167+
w.opts.logger.Error("AMQP connection close error:", err)
168168
}
169169
})
170-
return nil
170+
171+
return err
171172
}
172173

173174
// Queue send notification to queue
@@ -214,6 +215,9 @@ loop:
214215
}
215216
var data queue.Job
216217
_ = json.Unmarshal(task.Body, &data)
218+
if !w.opts.autoAck {
219+
task.Ack(w.opts.autoAck)
220+
}
217221
return &data, nil
218222
case <-time.After(1 * time.Second):
219223
if clock == 5 {

0 commit comments

Comments
 (0)