From f59a36148e0e2624fffa86b192aefef56bb91106 Mon Sep 17 00:00:00 2001 From: Andrew Pryde Date: Wed, 15 Aug 2018 16:40:10 +0100 Subject: [PATCH 1/2] Implement MySQL version upgrade support --- pkg/controllers/cluster/controller.go | 71 ++++++++++++++++++++++- pkg/resources/statefulsets/statefulset.go | 3 + 2 files changed, 71 insertions(+), 3 deletions(-) diff --git a/pkg/controllers/cluster/controller.go b/pkg/controllers/cluster/controller.go index 17ca7265b..cb3c108be 100644 --- a/pkg/controllers/cluster/controller.go +++ b/pkg/controllers/cluster/controller.go @@ -17,6 +17,7 @@ package cluster import ( "context" "fmt" + "strings" "time" apps "k8s.io/api/apps/v1beta1" @@ -38,6 +39,7 @@ import ( record "k8s.io/client-go/tools/record" workqueue "k8s.io/client-go/util/workqueue" + "github.com/coreos/go-semver/semver" "github.com/golang/glog" "github.com/pkg/errors" @@ -390,9 +392,13 @@ func (m *MySQLController) syncHandler(key string) error { } // Upgrade the required component resources the current MySQLOperator version. - err = m.ensureMySQLOperatorVersion(cluster, ss, buildversion.GetBuildVersion()) - if err != nil { - return err + if err := m.ensureMySQLOperatorVersion(cluster, ss, buildversion.GetBuildVersion()); err != nil { + return errors.Wrap(err, "ensuring MySQL Operator version") + } + + // Upgrade the MySQL server version if required. + if err := m.ensureMySQLVersion(cluster, ss); err != nil { + return errors.Wrap(err, "ensuring MySQL version") } // If this number of the members on the Cluster does not equal the @@ -421,6 +427,65 @@ func (m *MySQLController) syncHandler(key string) error { return nil } +func (m *MySQLController) ensureMySQLVersion(c *v1alpha1.Cluster, ss *apps.StatefulSet) error { + var index int + { + var found bool + for i, c := range ss.Spec.Template.Spec.Containers { + if c.Name == statefulsets.MySQLServerName { + index = i + found = true + break + } + } + + if !found { + return errors.Errorf("no %q container found for StatefulSet %q", statefulsets.MySQLServerName, ss.Name) + } + } + + image := ss.Spec.Template.Spec.Containers[index].Image + parts := strings.Split(image, ":") + if len(parts) < 2 { + return errors.Errorf("invalid image %q for StatefulSet %q", image, ss.Name) + } + actualVersion := parts[len(parts)-1] + + actual, err := semver.NewVersion(actualVersion) + if err != nil { + return errors.Wrap(err, "parsing StatuefulSet MySQL version") + } + expected, err := semver.NewVersion(c.Spec.Version) + if err != nil { + return errors.Wrap(err, "parsing Cluster MySQL version") + } + + switch expected.Compare(*actual) { + case -1: + return errors.Errorf("attempted unsupported downgrade from %q to %q", actual, expected) + case 0: + return nil + } + + updated := ss.DeepCopy() + + updated.Spec.Template.Spec.Containers[index].Image = fmt.Sprintf( + "%s:%s", strings.Join(parts[:len(parts)-1], ""), expected.String(), + ) + // NOTE: We do this as previously we defaulted to the OnDelete strategy + // so clusters created with previous versions would not support upgrades. + updated.Spec.UpdateStrategy = apps.StatefulSetUpdateStrategy{ + Type: apps.RollingUpdateStatefulSetStrategyType, + } + + err = m.statefulSetControl.Patch(ss, updated) + if err != nil { + return errors.Wrap(err, "patching StatefulSet") + } + + return nil +} + // ensureMySQLOperatorVersion updates the MySQLOperator resource types that //require it to make it consistent with the specified operator version. func (m *MySQLController) ensureMySQLOperatorVersion(c *v1alpha1.Cluster, ss *apps.StatefulSet, operatorVersion string) error { diff --git a/pkg/resources/statefulsets/statefulset.go b/pkg/resources/statefulsets/statefulset.go index 4609ef26c..8ed3abd52 100644 --- a/pkg/resources/statefulsets/statefulset.go +++ b/pkg/resources/statefulsets/statefulset.go @@ -378,6 +378,9 @@ func NewForCluster(cluster *v1alpha1.Cluster, images operatoropts.Images, servic Volumes: podVolumes, }, }, + UpdateStrategy: apps.StatefulSetUpdateStrategy{ + Type: apps.RollingUpdateStatefulSetStrategyType, + }, ServiceName: serviceName, }, } From 7cad2429b30c6070f94bffd2b022a3931f9d2ad7 Mon Sep 17 00:00:00 2001 From: Andrew Pryde Date: Fri, 17 Aug 2018 15:30:12 +0100 Subject: [PATCH 2/2] Test MySQL upgrades --- pkg/controllers/cluster/controller.go | 44 +++++----- pkg/controllers/cluster/controller_test.go | 73 +++++++++++++++++ test/e2e/framework/cluster.go | 94 +++++++++++++++++++++- test/e2e/upgrade.go | 62 ++++++++++++++ 4 files changed, 250 insertions(+), 23 deletions(-) create mode 100644 test/e2e/upgrade.go diff --git a/pkg/controllers/cluster/controller.go b/pkg/controllers/cluster/controller.go index cb3c108be..9db425ffb 100644 --- a/pkg/controllers/cluster/controller.go +++ b/pkg/controllers/cluster/controller.go @@ -427,29 +427,34 @@ func (m *MySQLController) syncHandler(key string) error { return nil } -func (m *MySQLController) ensureMySQLVersion(c *v1alpha1.Cluster, ss *apps.StatefulSet) error { - var index int - { - var found bool - for i, c := range ss.Spec.Template.Spec.Containers { - if c.Name == statefulsets.MySQLServerName { - index = i - found = true - break - } - } - - if !found { - return errors.Errorf("no %q container found for StatefulSet %q", statefulsets.MySQLServerName, ss.Name) +func getMySQLContainerIndex(containers []corev1.Container) (int, error) { + for i, c := range containers { + if c.Name == statefulsets.MySQLServerName { + return i, nil } } - image := ss.Spec.Template.Spec.Containers[index].Image + return 0, errors.Errorf("no %q container found", statefulsets.MySQLServerName) +} + +// splitImage splits an image into its name and version. +func splitImage(image string) (string, string, error) { parts := strings.Split(image, ":") if len(parts) < 2 { - return errors.Errorf("invalid image %q for StatefulSet %q", image, ss.Name) + return "", "", errors.Errorf("invalid image %q", image) + } + return strings.Join(parts[:len(parts)-1], ""), parts[len(parts)-1], nil +} + +func (m *MySQLController) ensureMySQLVersion(c *v1alpha1.Cluster, ss *apps.StatefulSet) error { + index, err := getMySQLContainerIndex(ss.Spec.Template.Spec.Containers) + if err != nil { + return errors.Wrapf(err, "getting MySQL container for StatefulSet %q", ss.Name) + } + imageName, actualVersion, err := splitImage(ss.Spec.Template.Spec.Containers[index].Image) + if err != nil { + return errors.Wrapf(err, "getting MySQL version for StatefulSet %q", ss.Name) } - actualVersion := parts[len(parts)-1] actual, err := semver.NewVersion(actualVersion) if err != nil { @@ -468,10 +473,7 @@ func (m *MySQLController) ensureMySQLVersion(c *v1alpha1.Cluster, ss *apps.State } updated := ss.DeepCopy() - - updated.Spec.Template.Spec.Containers[index].Image = fmt.Sprintf( - "%s:%s", strings.Join(parts[:len(parts)-1], ""), expected.String(), - ) + updated.Spec.Template.Spec.Containers[index].Image = fmt.Sprintf("%s:%s", imageName, c.Spec.Version) // NOTE: We do this as previously we defaulted to the OnDelete strategy // so clusters created with previous versions would not support upgrades. updated.Spec.UpdateStrategy = apps.StatefulSetUpdateStrategy{ diff --git a/pkg/controllers/cluster/controller_test.go b/pkg/controllers/cluster/controller_test.go index f8bc41191..9ddb96d67 100644 --- a/pkg/controllers/cluster/controller_test.go +++ b/pkg/controllers/cluster/controller_test.go @@ -30,6 +30,9 @@ import ( "k8s.io/client-go/kubernetes/fake" cache "k8s.io/client-go/tools/cache" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/oracle/mysql-operator/pkg/apis/mysql/v1alpha1" "github.com/oracle/mysql-operator/pkg/constants" "github.com/oracle/mysql-operator/pkg/controllers/util" @@ -42,6 +45,76 @@ import ( buildversion "github.com/oracle/mysql-operator/pkg/version" ) +func TestGetMySQLContainerIndex(t *testing.T) { + testCases := map[string]struct { + containers []v1.Container + index int + errors bool + }{ + "empty_errors": { + containers: []v1.Container{}, + errors: true, + }, + "mysql_server_only": { + containers: []v1.Container{{Name: "mysql"}}, + index: 0, + }, + "mysql_server_and_agent": { + containers: []v1.Container{{Name: "mysql-agent"}, {Name: "mysql"}}, + index: 1, + }, + "mysql_agent_only": { + containers: []v1.Container{{Name: "mysql-agent"}}, + errors: true, + }, + } + + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + index, err := getMySQLContainerIndex(tc.containers) + if tc.errors { + require.Error(t, err) + } else { + require.NoError(t, err) + assert.Equal(t, index, tc.index) + } + }) + } +} + +func TestSplitImage(t *testing.T) { + testCases := map[string]struct { + image string + name string + version string + errors bool + }{ + "8.0.11": { + image: "mysql/mysql-server:8.0.11", + name: "mysql/mysql-server", + version: "8.0.11", + errors: false, + }, + "invalid": { + image: "mysql/mysql-server", + errors: true, + }, + } + + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + name, version, err := splitImage(tc.image) + if tc.errors { + require.Error(t, err) + } else { + require.NoError(t, err) + assert.Equal(t, name, tc.name) + assert.Equal(t, version, tc.version) + } + }) + } +} + func mockOperatorConfig() operatoropts.MySQLOperatorOpts { opts := operatoropts.MySQLOperatorOpts{} opts.EnsureDefaults() diff --git a/test/e2e/framework/cluster.go b/test/e2e/framework/cluster.go index 6a49f7ad0..7363b72e5 100644 --- a/test/e2e/framework/cluster.go +++ b/test/e2e/framework/cluster.go @@ -19,6 +19,8 @@ import ( "strings" "time" + apps "k8s.io/api/apps/v1beta1" + "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" @@ -35,6 +37,7 @@ import ( "github.com/oracle/mysql-operator/pkg/controllers/cluster/labeler" mysqlclientset "github.com/oracle/mysql-operator/pkg/generated/clientset/versioned" "github.com/oracle/mysql-operator/pkg/resources/secrets" + "github.com/oracle/mysql-operator/pkg/resources/statefulsets" ) // TestDBName is the name of database to use when executing test SQL queries. @@ -112,7 +115,7 @@ func (j *ClusterTestJig) CreateAndAwaitClusterOrFail(namespace string, members i return cluster } -func (j *ClusterTestJig) waitForConditionOrFail(namespace, name string, timeout time.Duration, message string, conditionFn func(*v1alpha1.Cluster) bool) *v1alpha1.Cluster { +func (j *ClusterTestJig) WaitForConditionOrFail(namespace, name string, timeout time.Duration, message string, conditionFn func(*v1alpha1.Cluster) bool) *v1alpha1.Cluster { var cluster *v1alpha1.Cluster pollFunc := func() (bool, error) { c, err := j.MySQLClient.MySQLV1alpha1().Clusters(namespace).Get(name, metav1.GetOptions{}) @@ -135,12 +138,99 @@ func (j *ClusterTestJig) waitForConditionOrFail(namespace, name string, timeout // the running phase. func (j *ClusterTestJig) WaitForClusterReadyOrFail(namespace, name string, timeout time.Duration) *v1alpha1.Cluster { Logf("Waiting up to %v for Cluster \"%s/%s\" to be ready", timeout, namespace, name) - cluster := j.waitForConditionOrFail(namespace, name, timeout, "have all nodes ready", func(cluster *v1alpha1.Cluster) bool { + cluster := j.WaitForConditionOrFail(namespace, name, timeout, "have all nodes ready", func(cluster *v1alpha1.Cluster) bool { return clusterutil.IsClusterReady(cluster) }) return cluster } +// WaitForClusterUpgradedOrFail waits for a MySQL cluster to be upgraded to the +// given version or fails. +func (j *ClusterTestJig) WaitForClusterUpgradedOrFail(namespace, name, version string, timeout time.Duration) *v1alpha1.Cluster { + Logf("Waiting up to %v for Cluster \"%s/%s\" to be upgraded", timeout, namespace, name) + + cluster := j.WaitForConditionOrFail(namespace, name, timeout, "be upgraded ", func(cluster *v1alpha1.Cluster) bool { + set, err := j.KubeClient.AppsV1beta1().StatefulSets(cluster.Namespace).Get(cluster.Name, metav1.GetOptions{}) + if err != nil { + Failf("Failed to get StatefulSet %[1]q for Cluster %[1]q: %[2]v", name, err) + } + + set = j.waitForSSRollingUpdate(set) + + var actualVersion string + { + var found bool + var index int + for i, c := range set.Spec.Template.Spec.Containers { + if c.Name == statefulsets.MySQLServerName { + index = i + found = true + break + } + } + + if !found { + Failf("no %q container found for StatefulSet %q", statefulsets.MySQLServerName, set.Name) + } + image := set.Spec.Template.Spec.Containers[index].Image + parts := strings.Split(image, ":") + if len(parts) < 2 { + Failf("invalid image %q for StatefulSet %q", image, set.Name) + } + actualVersion = parts[len(parts)-1] + } + + return actualVersion == version + }) + return cluster +} + +// waitForSSState periodically polls for the ss and its pods until the until function returns either true or an error +func (j *ClusterTestJig) waitForSSState(ss *apps.StatefulSet, until func(*apps.StatefulSet, *v1.PodList) (bool, error)) { + pollErr := wait.PollImmediate(Poll, DefaultTimeout, + func() (bool, error) { + ssGet, err := j.KubeClient.AppsV1beta1().StatefulSets(ss.Namespace).Get(ss.Name, metav1.GetOptions{}) + if err != nil { + return false, err + } + + selector, err := metav1.LabelSelectorAsSelector(ss.Spec.Selector) + ExpectNoError(err) + podList, err := j.KubeClient.CoreV1().Pods(ss.Namespace).List(metav1.ListOptions{LabelSelector: selector.String()}) + ExpectNoError(err) + + return until(ssGet, podList) + }) + if pollErr != nil { + Failf("Failed waiting for state update: %v", pollErr) + } +} + +// waitForRollingUpdate waits for all Pods in set to exist and have the correct revision and for the RollingUpdate to +// complete. set must have a RollingUpdateStatefulSetStrategyType. +func (j *ClusterTestJig) waitForSSRollingUpdate(set *apps.StatefulSet) *apps.StatefulSet { + var pods *v1.PodList + if set.Spec.UpdateStrategy.Type != apps.RollingUpdateStatefulSetStrategyType { + Failf("StatefulSet %s/%s attempt to wait for rolling update with updateStrategy %s", + set.Namespace, + set.Name, + set.Spec.UpdateStrategy.Type) + } + Logf("Waiting for StatefulSet %s/%s to complete update", set.Namespace, set.Name) + j.waitForSSState(set, func(set2 *apps.StatefulSet, pods2 *v1.PodList) (bool, error) { + set = set2 + pods = pods2 + if len(pods.Items) < int(*set.Spec.Replicas) { + return false, nil + } + if set.Status.UpdateRevision != set.Status.CurrentRevision { + return false, nil + } + return true, nil + }) + return set +} + // SanityCheckCluster checks basic properties of a given Cluster match // our expectations. func (j *ClusterTestJig) SanityCheckCluster(cluster *v1alpha1.Cluster) { diff --git a/test/e2e/upgrade.go b/test/e2e/upgrade.go new file mode 100644 index 000000000..a59000196 --- /dev/null +++ b/test/e2e/upgrade.go @@ -0,0 +1,62 @@ +// Copyright 2018 Oracle and/or its affiliates. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package e2e + +import ( + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + + "github.com/oracle/mysql-operator/pkg/apis/mysql/v1alpha1" + mysqlclientset "github.com/oracle/mysql-operator/pkg/generated/clientset/versioned" + "github.com/oracle/mysql-operator/test/e2e/framework" +) + +var _ = Describe("MySQL Upgrade", func() { + f := framework.NewDefaultFramework("upgrade") + + var mcs mysqlclientset.Interface + BeforeEach(func() { + mcs = f.MySQLClientSet + }) + + It("should be possible to upgrade a cluster from 8.0.11 to 8.0.12", func() { + jig := framework.NewClusterTestJig(mcs, f.ClientSet, "upgrade-test") + + By("creating an 8.0.11 cluster") + + cluster := jig.CreateAndAwaitClusterOrFail(f.Namespace.Name, 3, func(c *v1alpha1.Cluster) { + c.Spec.Version = "8.0.11" + }, framework.DefaultTimeout) + + expected, err := framework.WriteSQLTest(cluster, cluster.Name+"-0") + Expect(err).NotTo(HaveOccurred()) + + By("triggering an upgrade to 8.0.12") + + cluster.Spec.Version = "8.0.12" + cluster, err = mcs.MySQLV1alpha1().Clusters(cluster.Namespace).Update(cluster) + Expect(err).NotTo(HaveOccurred()) + + By("waiting for the upgrade to complete") + + cluster = jig.WaitForClusterUpgradedOrFail(cluster.Namespace, cluster.Name, "8.0.12", framework.DefaultTimeout) + + By("testing we can read from the upgraded database") + + actual, err := framework.ReadSQLTest(cluster, cluster.Name+"-0") + Expect(err).NotTo(HaveOccurred()) + Expect(actual).To(Equal(expected)) + }) +})