@@ -18,12 +18,12 @@ type QueueConnectionHandler struct {
18
18
name string
19
19
cfg queue.Cfg
20
20
21
- uuid uuid.UUID
22
- registered bool
23
- err error
24
- mutex sync.Mutex
25
- masterUpdated chan struct {}
26
- masterCnt int32
21
+ uuid uuid.UUID
22
+ registered bool
23
+ err error
24
+ mutex sync.Mutex
25
+ updated chan struct {}
26
+ masterCnt int32
27
27
}
28
28
29
29
// QueueConnectionHandler implements the ConnectionHandler interface.
@@ -32,9 +32,9 @@ var _ connection_pool.ConnectionHandler = &QueueConnectionHandler{}
32
32
// NewQueueConnectionHandler creates a QueueConnectionHandler object.
33
33
func NewQueueConnectionHandler (name string , cfg queue.Cfg ) * QueueConnectionHandler {
34
34
return & QueueConnectionHandler {
35
- name : name ,
36
- cfg : cfg ,
37
- masterUpdated : make (chan struct {}, 10 ),
35
+ name : name ,
36
+ cfg : cfg ,
37
+ updated : make (chan struct {}, 10 ),
38
38
}
39
39
}
40
40
@@ -53,15 +53,24 @@ func (h *QueueConnectionHandler) Discovered(conn *tarantool.Connection,
53
53
}
54
54
55
55
master := role == connection_pool .MasterRole
56
- if master {
57
- defer func () {
58
- h .masterUpdated <- struct {}{}
59
- }()
60
- }
61
56
62
- // Set up a queue module configuration for an instance.
63
57
q := queue .New (conn , h .name )
64
58
59
+ // Check is queue ready to work.
60
+ if state , err := q .State (); err != nil {
61
+ h .updated <- struct {}{}
62
+ h .err = err
63
+ return err
64
+ } else if master && state != queue .RunningState {
65
+ return fmt .Errorf ("queue state is not RUNNING: %d" , state )
66
+ } else if ! master && state != queue .InitState && state != queue .WaitingState {
67
+ return fmt .Errorf ("queue state is not INIT and not WAITING: %d" , state )
68
+ }
69
+
70
+ defer func () {
71
+ h .updated <- struct {}{}
72
+ }()
73
+
65
74
// Set up a queue module configuration for an instance. Ideally, this
66
75
// should be done before box.cfg({}) or you need to wait some time
67
76
// before start a work.
@@ -79,10 +88,6 @@ func (h *QueueConnectionHandler) Discovered(conn *tarantool.Connection,
79
88
return nil
80
89
}
81
90
82
- if h .err = q .Create (h .cfg ); h .err != nil {
83
- return h .err
84
- }
85
-
86
91
if ! h .registered {
87
92
// We register a shared session at the first time.
88
93
if h .uuid , h .err = q .Identify (nil ); h .err != nil {
@@ -96,6 +101,10 @@ func (h *QueueConnectionHandler) Discovered(conn *tarantool.Connection,
96
101
}
97
102
}
98
103
104
+ if h .err = q .Create (h .cfg ); h .err != nil {
105
+ return h .err
106
+ }
107
+
99
108
fmt .Printf ("Master %s is ready to work!\n " , conn .Addr ())
100
109
atomic .AddInt32 (& h .masterCnt , 1 )
101
110
@@ -113,7 +122,7 @@ func (h *QueueConnectionHandler) Deactivated(conn *tarantool.Connection,
113
122
114
123
// Closes closes a QueueConnectionHandler object.
115
124
func (h * QueueConnectionHandler ) Close () {
116
- close (h .masterUpdated )
125
+ close (h .updated )
117
126
}
118
127
119
128
// Example demonstrates how to use the queue package with the connection_pool
@@ -162,8 +171,10 @@ func Example_connectionPool() {
162
171
}
163
172
defer connPool .Close ()
164
173
165
- // Wait for a master instance identification in the queue.
166
- <- h .masterUpdated
174
+ // Wait for a queue initialization and master instance identification in
175
+ // the queue.
176
+ <- h .updated
177
+ <- h .updated
167
178
if h .err != nil {
168
179
fmt .Printf ("Unable to identify in the pool: %s" , h .err )
169
180
return
@@ -184,14 +195,17 @@ func Example_connectionPool() {
184
195
185
196
// Switch a master instance in the pool.
186
197
roles := []bool {true , false }
187
- err = test_helpers .SetClusterRO (servers , connOpts , roles )
188
- if err != nil {
189
- fmt .Printf ("Unable to set cluster roles: %s" , err )
190
- return
198
+ for {
199
+ err := test_helpers .SetClusterRO (servers , connOpts , roles )
200
+ if err == nil {
201
+ break
202
+ }
191
203
}
192
204
193
- // Wait for a new master instance re-identification.
194
- <- h .masterUpdated
205
+ // Wait for a replica instance connection and a new master instance
206
+ // re-identification.
207
+ <- h .updated
208
+ <- h .updated
195
209
h .mutex .Lock ()
196
210
err = h .err
197
211
h .mutex .Unlock ()
@@ -211,17 +225,24 @@ func Example_connectionPool() {
211
225
time .Sleep (poolOpts .CheckTimeout )
212
226
}
213
227
214
- // Take a data from the new master instance.
215
- task , err := q .Take ()
216
- if err != nil {
217
- fmt .Println ("Unable to got task:" , err )
218
- } else if task == nil {
219
- fmt .Println ("task == nil" )
220
- } else if task .Data () == nil {
221
- fmt .Println ("task.Data() == nil" )
222
- } else {
223
- task .Ack ()
224
- fmt .Println ("Got data:" , task .Data ())
228
+ for {
229
+ // Take a data from the new master instance.
230
+ task , err := q .Take ()
231
+
232
+ if err == connection_pool .ErrNoRwInstance {
233
+ // It may be not registered yet by the pool.
234
+ continue
235
+ } else if err != nil {
236
+ fmt .Println ("Unable to got task:" , err )
237
+ } else if task == nil {
238
+ fmt .Println ("task == nil" )
239
+ } else if task .Data () == nil {
240
+ fmt .Println ("task.Data() == nil" )
241
+ } else {
242
+ task .Ack ()
243
+ fmt .Println ("Got data:" , task .Data ())
244
+ }
245
+ break
225
246
}
226
247
227
248
// Output:
0 commit comments