Skip to content

Commit ca08324

Browse files
committed
test: add Example_fanout_exchange testing
1 parent 833d454 commit ca08324

File tree

2 files changed

+114
-31
lines changed

2 files changed

+114
-31
lines changed

example_test.go

+114
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
package rabbitmq
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"time"
7+
8+
"github.com/golang-queue/queue"
9+
"github.com/golang-queue/queue/core"
10+
)
11+
12+
// Direct Exchange
13+
func Example_direct_exchange() {
14+
m := mockMessage{
15+
Message: "foo",
16+
}
17+
w := NewWorker(
18+
WithSubj("direct_queue"),
19+
WithExchangeName("direct_exchange"),
20+
WithRoutingKey("direct_queue"),
21+
WithTag("direct_queue"),
22+
WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
23+
fmt.Println("get data:", string(m.Bytes()))
24+
return nil
25+
}),
26+
)
27+
q, err := queue.NewQueue(
28+
queue.WithWorker(w),
29+
queue.WithWorkerCount(1),
30+
)
31+
if err != nil {
32+
w.opts.logger.Error(err)
33+
}
34+
35+
q.Start()
36+
time.Sleep(200 * time.Millisecond)
37+
q.Queue(m)
38+
q.Queue(m)
39+
time.Sleep(200 * time.Millisecond)
40+
q.Release()
41+
42+
// Output:
43+
// get data: foo
44+
// get data: foo
45+
}
46+
47+
// Fanout Exchange
48+
func Example_fanout_exchange() {
49+
m := mockMessage{
50+
Message: "foo",
51+
}
52+
w1 := NewWorker(
53+
WithSubj("fanout_queue_1"),
54+
WithExchangeName("fanout_exchange"),
55+
WithExchangeType("fanout"),
56+
WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
57+
fmt.Println("worker01 get data:", string(m.Bytes()))
58+
return nil
59+
}),
60+
)
61+
62+
q1, err := queue.NewQueue(
63+
queue.WithWorker(w1),
64+
)
65+
if err != nil {
66+
w1.opts.logger.Error(err)
67+
}
68+
69+
q1.Start()
70+
time.Sleep(200 * time.Millisecond)
71+
72+
w2 := NewWorker(
73+
WithSubj("fanout_queue_2"),
74+
WithExchangeName("fanout_exchange"),
75+
WithExchangeType("fanout"),
76+
WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
77+
fmt.Println("worker02 get data:", string(m.Bytes()))
78+
return nil
79+
}),
80+
)
81+
82+
q2, err := queue.NewQueue(
83+
queue.WithWorker(w2),
84+
)
85+
if err != nil {
86+
w2.opts.logger.Error(err)
87+
}
88+
89+
q2.Start()
90+
time.Sleep(200 * time.Millisecond)
91+
92+
w := NewWorker(
93+
WithExchangeName("fanout_exchange"),
94+
WithExchangeType("fanout"),
95+
)
96+
97+
q, err := queue.NewQueue(
98+
queue.WithWorker(w),
99+
)
100+
if err != nil {
101+
w.opts.logger.Error(err)
102+
}
103+
104+
time.Sleep(200 * time.Millisecond)
105+
q.Queue(m)
106+
time.Sleep(200 * time.Millisecond)
107+
q.Release()
108+
q1.Release()
109+
q2.Release()
110+
111+
// Unordered Output:
112+
// worker01 get data: foo
113+
// worker02 get data: foo
114+
}

rabbitmq_test.go

-31
Original file line numberDiff line numberDiff line change
@@ -316,34 +316,3 @@ func TestJobComplete(t *testing.T) {
316316
assert.Error(t, err)
317317
assert.Equal(t, errors.New("job completed"), err)
318318
}
319-
320-
func Example_direct_queue() {
321-
m := mockMessage{
322-
Message: "foo",
323-
}
324-
w := NewWorker(
325-
WithSubj("direct_queue"),
326-
WithRoutingKey("direct_queue"),
327-
WithTag("direct_queue"),
328-
WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
329-
fmt.Println("get data:", string(m.Bytes()))
330-
return nil
331-
}),
332-
)
333-
q, err := queue.NewQueue(
334-
queue.WithWorker(w),
335-
queue.WithWorkerCount(1),
336-
)
337-
if err != nil {
338-
w.opts.logger.Error(err)
339-
}
340-
341-
q.Start()
342-
time.Sleep(200 * time.Millisecond)
343-
q.Queue(m)
344-
time.Sleep(200 * time.Millisecond)
345-
q.Release()
346-
347-
// Output:
348-
// get data: foo
349-
}

0 commit comments

Comments
 (0)