@@ -19,6 +19,8 @@ import (
19
19
"strings"
20
20
"time"
21
21
22
+ apps "k8s.io/api/apps/v1beta1"
23
+ "k8s.io/api/core/v1"
22
24
apierrors "k8s.io/apimachinery/pkg/api/errors"
23
25
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
24
26
"k8s.io/apimachinery/pkg/labels"
@@ -35,6 +37,7 @@ import (
35
37
"github.com/oracle/mysql-operator/pkg/controllers/cluster/labeler"
36
38
mysqlclientset "github.com/oracle/mysql-operator/pkg/generated/clientset/versioned"
37
39
"github.com/oracle/mysql-operator/pkg/resources/secrets"
40
+ "github.com/oracle/mysql-operator/pkg/resources/statefulsets"
38
41
)
39
42
40
43
// TestDBName is the name of database to use when executing test SQL queries.
@@ -112,7 +115,7 @@ func (j *ClusterTestJig) CreateAndAwaitClusterOrFail(namespace string, members i
112
115
return cluster
113
116
}
114
117
115
- func (j * ClusterTestJig ) waitForConditionOrFail (namespace , name string , timeout time.Duration , message string , conditionFn func (* v1alpha1.Cluster ) bool ) * v1alpha1.Cluster {
118
+ func (j * ClusterTestJig ) WaitForConditionOrFail (namespace , name string , timeout time.Duration , message string , conditionFn func (* v1alpha1.Cluster ) bool ) * v1alpha1.Cluster {
116
119
var cluster * v1alpha1.Cluster
117
120
pollFunc := func () (bool , error ) {
118
121
c , err := j .MySQLClient .MySQLV1alpha1 ().Clusters (namespace ).Get (name , metav1.GetOptions {})
@@ -135,12 +138,99 @@ func (j *ClusterTestJig) waitForConditionOrFail(namespace, name string, timeout
135
138
// the running phase.
136
139
func (j * ClusterTestJig ) WaitForClusterReadyOrFail (namespace , name string , timeout time.Duration ) * v1alpha1.Cluster {
137
140
Logf ("Waiting up to %v for Cluster \" %s/%s\" to be ready" , timeout , namespace , name )
138
- cluster := j .waitForConditionOrFail (namespace , name , timeout , "have all nodes ready" , func (cluster * v1alpha1.Cluster ) bool {
141
+ cluster := j .WaitForConditionOrFail (namespace , name , timeout , "have all nodes ready" , func (cluster * v1alpha1.Cluster ) bool {
139
142
return clusterutil .IsClusterReady (cluster )
140
143
})
141
144
return cluster
142
145
}
143
146
147
+ // WaitForClusterUpgradedOrFail waits for a MySQL cluster to be upgraded to the
148
+ // given version or fails.
149
+ func (j * ClusterTestJig ) WaitForClusterUpgradedOrFail (namespace , name , version string , timeout time.Duration ) * v1alpha1.Cluster {
150
+ Logf ("Waiting up to %v for Cluster \" %s/%s\" to be upgraded" , timeout , namespace , name )
151
+
152
+ cluster := j .WaitForConditionOrFail (namespace , name , timeout , "be upgraded " , func (cluster * v1alpha1.Cluster ) bool {
153
+ set , err := j .KubeClient .AppsV1beta1 ().StatefulSets (cluster .Namespace ).Get (cluster .Name , metav1.GetOptions {})
154
+ if err != nil {
155
+ Failf ("Failed to get StatefulSet %[1]q for Cluster %[1]q: %[2]v" , name , err )
156
+ }
157
+
158
+ set = j .waitForSSRollingUpdate (set )
159
+
160
+ var actualVersion string
161
+ {
162
+ var found bool
163
+ var index int
164
+ for i , c := range set .Spec .Template .Spec .Containers {
165
+ if c .Name == statefulsets .MySQLServerName {
166
+ index = i
167
+ found = true
168
+ break
169
+ }
170
+ }
171
+
172
+ if ! found {
173
+ Failf ("no %q container found for StatefulSet %q" , statefulsets .MySQLServerName , set .Name )
174
+ }
175
+ image := set .Spec .Template .Spec .Containers [index ].Image
176
+ parts := strings .Split (image , ":" )
177
+ if len (parts ) < 2 {
178
+ Failf ("invalid image %q for StatefulSet %q" , image , set .Name )
179
+ }
180
+ actualVersion = parts [len (parts )- 1 ]
181
+ }
182
+
183
+ return actualVersion == version
184
+ })
185
+ return cluster
186
+ }
187
+
188
+ // waitForSSState periodically polls for the ss and its pods until the until function returns either true or an error
189
+ func (j * ClusterTestJig ) waitForSSState (ss * apps.StatefulSet , until func (* apps.StatefulSet , * v1.PodList ) (bool , error )) {
190
+ pollErr := wait .PollImmediate (Poll , DefaultTimeout ,
191
+ func () (bool , error ) {
192
+ ssGet , err := j .KubeClient .AppsV1beta1 ().StatefulSets (ss .Namespace ).Get (ss .Name , metav1.GetOptions {})
193
+ if err != nil {
194
+ return false , err
195
+ }
196
+
197
+ selector , err := metav1 .LabelSelectorAsSelector (ss .Spec .Selector )
198
+ ExpectNoError (err )
199
+ podList , err := j .KubeClient .CoreV1 ().Pods (ss .Namespace ).List (metav1.ListOptions {LabelSelector : selector .String ()})
200
+ ExpectNoError (err )
201
+
202
+ return until (ssGet , podList )
203
+ })
204
+ if pollErr != nil {
205
+ Failf ("Failed waiting for state update: %v" , pollErr )
206
+ }
207
+ }
208
+
209
+ // waitForRollingUpdate waits for all Pods in set to exist and have the correct revision and for the RollingUpdate to
210
+ // complete. set must have a RollingUpdateStatefulSetStrategyType.
211
+ func (j * ClusterTestJig ) waitForSSRollingUpdate (set * apps.StatefulSet ) * apps.StatefulSet {
212
+ var pods * v1.PodList
213
+ if set .Spec .UpdateStrategy .Type != apps .RollingUpdateStatefulSetStrategyType {
214
+ Failf ("StatefulSet %s/%s attempt to wait for rolling update with updateStrategy %s" ,
215
+ set .Namespace ,
216
+ set .Name ,
217
+ set .Spec .UpdateStrategy .Type )
218
+ }
219
+ Logf ("Waiting for StatefulSet %s/%s to complete update" , set .Namespace , set .Name )
220
+ j .waitForSSState (set , func (set2 * apps.StatefulSet , pods2 * v1.PodList ) (bool , error ) {
221
+ set = set2
222
+ pods = pods2
223
+ if len (pods .Items ) < int (* set .Spec .Replicas ) {
224
+ return false , nil
225
+ }
226
+ if set .Status .UpdateRevision != set .Status .CurrentRevision {
227
+ return false , nil
228
+ }
229
+ return true , nil
230
+ })
231
+ return set
232
+ }
233
+
144
234
// SanityCheckCluster checks basic properties of a given Cluster match
145
235
// our expectations.
146
236
func (j * ClusterTestJig ) SanityCheckCluster (cluster * v1alpha1.Cluster ) {
0 commit comments