Skip to content

Commit 56bf9a9

Browse files
committed
bugfix: flaky queue/Example_connectionPool
We need to wait for a queue configuration on all instances before start a work. Closes #278
1 parent adc6b51 commit 56bf9a9

File tree

1 file changed

+21
-19
lines changed

1 file changed

+21
-19
lines changed

queue/example_connection_pool_test.go

Lines changed: 21 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,12 @@ type QueueConnectionHandler struct {
1818
name string
1919
cfg queue.Cfg
2020

21-
uuid uuid.UUID
22-
registered bool
23-
err error
24-
mutex sync.Mutex
25-
masterUpdated chan struct{}
26-
masterCnt int32
21+
uuid uuid.UUID
22+
registered bool
23+
err error
24+
mutex sync.Mutex
25+
updated chan struct{}
26+
masterCnt int32
2727
}
2828

2929
// QueueConnectionHandler implements the ConnectionHandler interface.
@@ -32,9 +32,9 @@ var _ connection_pool.ConnectionHandler = &QueueConnectionHandler{}
3232
// NewQueueConnectionHandler creates a QueueConnectionHandler object.
3333
func NewQueueConnectionHandler(name string, cfg queue.Cfg) *QueueConnectionHandler {
3434
return &QueueConnectionHandler{
35-
name: name,
36-
cfg: cfg,
37-
masterUpdated: make(chan struct{}, 10),
35+
name: name,
36+
cfg: cfg,
37+
updated: make(chan struct{}, 10),
3838
}
3939
}
4040

@@ -53,11 +53,9 @@ func (h *QueueConnectionHandler) Discovered(conn *tarantool.Connection,
5353
}
5454

5555
master := role == connection_pool.MasterRole
56-
if master {
57-
defer func() {
58-
h.masterUpdated <- struct{}{}
59-
}()
60-
}
56+
defer func() {
57+
h.updated <- struct{}{}
58+
}()
6159

6260
// Set up a queue module configuration for an instance.
6361
q := queue.New(conn, h.name)
@@ -106,7 +104,7 @@ func (h *QueueConnectionHandler) Deactivated(conn *tarantool.Connection,
106104

107105
// Closes closes a QueueConnectionHandler object.
108106
func (h *QueueConnectionHandler) Close() {
109-
close(h.masterUpdated)
107+
close(h.updated)
110108
}
111109

112110
// Example demonstrates how to use the queue package with the connection_pool
@@ -155,8 +153,10 @@ func Example_connectionPool() {
155153
}
156154
defer connPool.Close()
157155

158-
// Wait for a master instance identification in the queue.
159-
<-h.masterUpdated
156+
// Wait for a queue initialization and master instance identification in
157+
// the queue.
158+
<-h.updated
159+
<-h.updated
160160
if h.err != nil {
161161
fmt.Printf("Unable to identify in the pool: %s", h.err)
162162
return
@@ -183,8 +183,10 @@ func Example_connectionPool() {
183183
return
184184
}
185185

186-
// Wait for a new master instance re-identification.
187-
<-h.masterUpdated
186+
// Wait for a replica instance connection and a new master instance
187+
// re-identification.
188+
<-h.updated
189+
<-h.updated
188190
h.mutex.Lock()
189191
err = h.err
190192
h.mutex.Unlock()

0 commit comments

Comments
 (0)