@@ -18,12 +18,12 @@ type QueueConnectionHandler struct {
18
18
name string
19
19
cfg queue.Cfg
20
20
21
- uuid uuid.UUID
22
- registered bool
23
- err error
24
- mutex sync.Mutex
25
- masterUpdated chan struct {}
26
- masterCnt int32
21
+ uuid uuid.UUID
22
+ registered bool
23
+ err error
24
+ mutex sync.Mutex
25
+ updated chan struct {}
26
+ masterCnt int32
27
27
}
28
28
29
29
// QueueConnectionHandler implements the ConnectionHandler interface.
@@ -32,9 +32,9 @@ var _ connection_pool.ConnectionHandler = &QueueConnectionHandler{}
32
32
// NewQueueConnectionHandler creates a QueueConnectionHandler object.
33
33
func NewQueueConnectionHandler (name string , cfg queue.Cfg ) * QueueConnectionHandler {
34
34
return & QueueConnectionHandler {
35
- name : name ,
36
- cfg : cfg ,
37
- masterUpdated : make (chan struct {}, 10 ),
35
+ name : name ,
36
+ cfg : cfg ,
37
+ updated : make (chan struct {}, 10 ),
38
38
}
39
39
}
40
40
@@ -53,14 +53,25 @@ func (h *QueueConnectionHandler) Discovered(conn *tarantool.Connection,
53
53
}
54
54
55
55
master := role == connection_pool .MasterRole
56
- if master {
57
- defer func () {
58
- h .masterUpdated <- struct {}{}
59
- }()
56
+
57
+ q := queue .New (conn , h .name )
58
+
59
+ // Check is queue ready to work.
60
+ if state , err := q .State (); err != nil {
61
+ h .updated <- struct {}{}
62
+ h .err = err
63
+ return err
64
+ } else if master && state != queue .RunningState {
65
+ return fmt .Errorf ("queue state is not RUNNING: %d" , state )
66
+ } else if ! master && state != queue .InitState && state != queue .WaitingState {
67
+ return fmt .Errorf ("queue state is not INIT and not WAITING: %d" , state )
60
68
}
61
69
70
+ defer func () {
71
+ h .updated <- struct {}{}
72
+ }()
73
+
62
74
// Set up a queue module configuration for an instance.
63
- q := queue .New (conn , h .name )
64
75
opts := queue.CfgOpts {InReplicaset : true , Ttr : 60 * time .Second }
65
76
66
77
if h .err = q .Cfg (opts ); h .err != nil {
@@ -72,10 +83,6 @@ func (h *QueueConnectionHandler) Discovered(conn *tarantool.Connection,
72
83
return nil
73
84
}
74
85
75
- if h .err = q .Create (h .cfg ); h .err != nil {
76
- return h .err
77
- }
78
-
79
86
if ! h .registered {
80
87
// We register a shared session at the first time.
81
88
if h .uuid , h .err = q .Identify (nil ); h .err != nil {
@@ -89,6 +96,10 @@ func (h *QueueConnectionHandler) Discovered(conn *tarantool.Connection,
89
96
}
90
97
}
91
98
99
+ if h .err = q .Create (h .cfg ); h .err != nil {
100
+ return h .err
101
+ }
102
+
92
103
fmt .Printf ("Master %s is ready to work!\n " , conn .Addr ())
93
104
atomic .AddInt32 (& h .masterCnt , 1 )
94
105
@@ -106,7 +117,7 @@ func (h *QueueConnectionHandler) Deactivated(conn *tarantool.Connection,
106
117
107
118
// Closes closes a QueueConnectionHandler object.
108
119
func (h * QueueConnectionHandler ) Close () {
109
- close (h .masterUpdated )
120
+ close (h .updated )
110
121
}
111
122
112
123
// Example demonstrates how to use the queue package with the connection_pool
@@ -155,8 +166,10 @@ func Example_connectionPool() {
155
166
}
156
167
defer connPool .Close ()
157
168
158
- // Wait for a master instance identification in the queue.
159
- <- h .masterUpdated
169
+ // Wait for a queue initialization and master instance identification in
170
+ // the queue.
171
+ <- h .updated
172
+ <- h .updated
160
173
if h .err != nil {
161
174
fmt .Printf ("Unable to identify in the pool: %s" , h .err )
162
175
return
@@ -177,14 +190,17 @@ func Example_connectionPool() {
177
190
178
191
// Switch a master instance in the pool.
179
192
roles := []bool {true , false }
180
- err = test_helpers .SetClusterRO (servers , connOpts , roles )
181
- if err != nil {
182
- fmt .Printf ("Unable to set cluster roles: %s" , err )
183
- return
193
+ for {
194
+ err := test_helpers .SetClusterRO (servers , connOpts , roles )
195
+ if err == nil {
196
+ break
197
+ }
184
198
}
185
199
186
- // Wait for a new master instance re-identification.
187
- <- h .masterUpdated
200
+ // Wait for a replica instance connection and a new master instance
201
+ // re-identification.
202
+ <- h .updated
203
+ <- h .updated
188
204
h .mutex .Lock ()
189
205
err = h .err
190
206
h .mutex .Unlock ()
@@ -204,17 +220,24 @@ func Example_connectionPool() {
204
220
time .Sleep (poolOpts .CheckTimeout )
205
221
}
206
222
207
- // Take a data from the new master instance.
208
- task , err := q .Take ()
209
- if err != nil {
210
- fmt .Println ("Unable to got task:" , err )
211
- } else if task == nil {
212
- fmt .Println ("task == nil" )
213
- } else if task .Data () == nil {
214
- fmt .Println ("task.Data() == nil" )
215
- } else {
216
- task .Ack ()
217
- fmt .Println ("Got data:" , task .Data ())
223
+ for {
224
+ // Take a data from the new master instance.
225
+ task , err := q .Take ()
226
+
227
+ if err == connection_pool .ErrNoRwInstance {
228
+ // It may be not registered yet by the pool.
229
+ continue
230
+ } else if err != nil {
231
+ fmt .Println ("Unable to got task:" , err )
232
+ } else if task == nil {
233
+ fmt .Println ("task == nil" )
234
+ } else if task .Data () == nil {
235
+ fmt .Println ("task.Data() == nil" )
236
+ } else {
237
+ task .Ack ()
238
+ fmt .Println ("Got data:" , task .Data ())
239
+ }
240
+ break
218
241
}
219
242
220
243
// Output:
0 commit comments