Skip to content

Commit 373a9ca

Browse files
committed
chore(queue): add routing key
Signed-off-by: Bo-Yi Wu <[email protected]>
1 parent e39e2d1 commit 373a9ca

File tree

2 files changed

+25
-23
lines changed

2 files changed

+25
-23
lines changed

options.go

+14-4
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ type options struct {
2323
// Exchange Types: Direct, Fanout, Topic and Headers
2424
exchangeType string
2525
autoAck bool
26+
// AMQP routing key
27+
routingKey string
2628
}
2729

2830
// WithAddr setup the URI
@@ -54,10 +56,17 @@ func WithExchangeType(val string) Option {
5456
}
5557
}
5658

59+
// WithRoutingKey setup AMQP routing key
60+
func WithRoutingKey(val string) Option {
61+
return func(w *options) {
62+
w.routingKey = val
63+
}
64+
}
65+
5766
// WithAddr setup the tag
58-
func WithTag(tag string) Option {
67+
func WithTag(val string) Option {
5968
return func(w *options) {
60-
w.tag = tag
69+
w.tag = val
6170
}
6271
}
6372

@@ -69,9 +78,9 @@ func WithAutoAck(val bool) Option {
6978
}
7079

7180
// WithSubj setup the topic
72-
func WithSubj(subj string) Option {
81+
func WithSubj(val string) Option {
7382
return func(w *options) {
74-
w.subj = subj
83+
w.subj = val
7584
}
7685
}
7786

@@ -96,6 +105,7 @@ func newOptions(opts ...Option) options {
96105
tag: "golang-queue",
97106
exchangeName: "test-exchange",
98107
exchangeType: "direct",
108+
routingKey: "test-key",
99109
logger: queue.NewLogger(),
100110
autoAck: false,
101111
runFunc: func(context.Context, core.QueuedMessage) error {

rabbitmq.go

+11-19
Original file line numberDiff line numberDiff line change
@@ -177,26 +177,18 @@ func (w *Worker) Queue(job core.QueuedMessage) error {
177177
return queue.ErrQueueShutdown
178178
}
179179

180-
q, err := w.channel.QueueDeclare(
181-
w.opts.subj, // name
182-
true, // durable
183-
false, // delete when unused
184-
false, // exclusive
185-
false, // no-wait
186-
nil, // arguments
187-
)
188-
if err != nil {
189-
return err
190-
}
191-
192-
err = w.channel.Publish(
193-
"", // exchange
194-
q.Name, // routing key
195-
false, // mandatory
196-
false, // immediate
180+
err := w.channel.Publish(
181+
w.opts.exchangeName, // exchange
182+
w.opts.routingKey, // routing key
183+
false, // mandatory
184+
false, // immediate
197185
amqp.Publishing{
198-
ContentType: "text/plain",
199-
Body: job.Bytes(),
186+
Headers: amqp.Table{},
187+
ContentType: "text/plain",
188+
ContentEncoding: "",
189+
Body: job.Bytes(),
190+
DeliveryMode: amqp.Transient, // 1=non-persistent, 2=persistent
191+
Priority: 0, // 0-9
200192
})
201193

202194
return err

0 commit comments

Comments
 (0)