Skip to content
This repository was archived by the owner on May 28, 2021. It is now read-only.

Implement MySQL version upgrade support #207

Merged
merged 2 commits into from
Aug 17, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 70 additions & 3 deletions pkg/controllers/cluster/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package cluster
import (
"context"
"fmt"
"strings"
"time"

apps "k8s.io/api/apps/v1beta1"
Expand All @@ -38,6 +39,7 @@ import (
record "k8s.io/client-go/tools/record"
workqueue "k8s.io/client-go/util/workqueue"

"github.com/coreos/go-semver/semver"
"github.com/golang/glog"
"github.com/pkg/errors"

Expand Down Expand Up @@ -390,9 +392,13 @@ func (m *MySQLController) syncHandler(key string) error {
}

// Upgrade the required component resources the current MySQLOperator version.
err = m.ensureMySQLOperatorVersion(cluster, ss, buildversion.GetBuildVersion())
if err != nil {
return err
if err := m.ensureMySQLOperatorVersion(cluster, ss, buildversion.GetBuildVersion()); err != nil {
return errors.Wrap(err, "ensuring MySQL Operator version")
}

// Upgrade the MySQL server version if required.
if err := m.ensureMySQLVersion(cluster, ss); err != nil {
return errors.Wrap(err, "ensuring MySQL version")
}

// If this number of the members on the Cluster does not equal the
Expand Down Expand Up @@ -421,6 +427,67 @@ func (m *MySQLController) syncHandler(key string) error {
return nil
}

func getMySQLContainerIndex(containers []corev1.Container) (int, error) {
for i, c := range containers {
if c.Name == statefulsets.MySQLServerName {
return i, nil
}
}

return 0, errors.Errorf("no %q container found", statefulsets.MySQLServerName)
}

// splitImage splits an image into its name and version.
func splitImage(image string) (string, string, error) {
parts := strings.Split(image, ":")
if len(parts) < 2 {
return "", "", errors.Errorf("invalid image %q", image)
}
return strings.Join(parts[:len(parts)-1], ""), parts[len(parts)-1], nil
}

func (m *MySQLController) ensureMySQLVersion(c *v1alpha1.Cluster, ss *apps.StatefulSet) error {
index, err := getMySQLContainerIndex(ss.Spec.Template.Spec.Containers)
if err != nil {
return errors.Wrapf(err, "getting MySQL container for StatefulSet %q", ss.Name)
}
imageName, actualVersion, err := splitImage(ss.Spec.Template.Spec.Containers[index].Image)
if err != nil {
return errors.Wrapf(err, "getting MySQL version for StatefulSet %q", ss.Name)
}

actual, err := semver.NewVersion(actualVersion)
if err != nil {
return errors.Wrap(err, "parsing StatuefulSet MySQL version")
}
expected, err := semver.NewVersion(c.Spec.Version)
if err != nil {
return errors.Wrap(err, "parsing Cluster MySQL version")
}

switch expected.Compare(*actual) {
case -1:
return errors.Errorf("attempted unsupported downgrade from %q to %q", actual, expected)
case 0:
return nil
}

updated := ss.DeepCopy()
updated.Spec.Template.Spec.Containers[index].Image = fmt.Sprintf("%s:%s", imageName, c.Spec.Version)
// NOTE: We do this as previously we defaulted to the OnDelete strategy
// so clusters created with previous versions would not support upgrades.
updated.Spec.UpdateStrategy = apps.StatefulSetUpdateStrategy{
Type: apps.RollingUpdateStatefulSetStrategyType,
}

err = m.statefulSetControl.Patch(ss, updated)
if err != nil {
return errors.Wrap(err, "patching StatefulSet")
}

return nil
}

// ensureMySQLOperatorVersion updates the MySQLOperator resource types that
//require it to make it consistent with the specified operator version.
func (m *MySQLController) ensureMySQLOperatorVersion(c *v1alpha1.Cluster, ss *apps.StatefulSet, operatorVersion string) error {
Expand Down
73 changes: 73 additions & 0 deletions pkg/controllers/cluster/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ import (
"k8s.io/client-go/kubernetes/fake"
cache "k8s.io/client-go/tools/cache"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/oracle/mysql-operator/pkg/apis/mysql/v1alpha1"
"github.com/oracle/mysql-operator/pkg/constants"
"github.com/oracle/mysql-operator/pkg/controllers/util"
Expand All @@ -42,6 +45,76 @@ import (
buildversion "github.com/oracle/mysql-operator/pkg/version"
)

func TestGetMySQLContainerIndex(t *testing.T) {
testCases := map[string]struct {
containers []v1.Container
index int
errors bool
}{
"empty_errors": {
containers: []v1.Container{},
errors: true,
},
"mysql_server_only": {
containers: []v1.Container{{Name: "mysql"}},
index: 0,
},
"mysql_server_and_agent": {
containers: []v1.Container{{Name: "mysql-agent"}, {Name: "mysql"}},
index: 1,
},
"mysql_agent_only": {
containers: []v1.Container{{Name: "mysql-agent"}},
errors: true,
},
}

for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
index, err := getMySQLContainerIndex(tc.containers)
if tc.errors {
require.Error(t, err)
} else {
require.NoError(t, err)
assert.Equal(t, index, tc.index)
}
})
}
}

func TestSplitImage(t *testing.T) {
testCases := map[string]struct {
image string
name string
version string
errors bool
}{
"8.0.11": {
image: "mysql/mysql-server:8.0.11",
name: "mysql/mysql-server",
version: "8.0.11",
errors: false,
},
"invalid": {
image: "mysql/mysql-server",
errors: true,
},
}

for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
name, version, err := splitImage(tc.image)
if tc.errors {
require.Error(t, err)
} else {
require.NoError(t, err)
assert.Equal(t, name, tc.name)
assert.Equal(t, version, tc.version)
}
})
}
}

func mockOperatorConfig() operatoropts.MySQLOperatorOpts {
opts := operatoropts.MySQLOperatorOpts{}
opts.EnsureDefaults()
Expand Down
3 changes: 3 additions & 0 deletions pkg/resources/statefulsets/statefulset.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,9 @@ func NewForCluster(cluster *v1alpha1.Cluster, images operatoropts.Images, servic
Volumes: podVolumes,
},
},
UpdateStrategy: apps.StatefulSetUpdateStrategy{
Type: apps.RollingUpdateStatefulSetStrategyType,
},
ServiceName: serviceName,
},
}
Expand Down
94 changes: 92 additions & 2 deletions test/e2e/framework/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"strings"
"time"

apps "k8s.io/api/apps/v1beta1"
"k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
Expand All @@ -35,6 +37,7 @@ import (
"github.com/oracle/mysql-operator/pkg/controllers/cluster/labeler"
mysqlclientset "github.com/oracle/mysql-operator/pkg/generated/clientset/versioned"
"github.com/oracle/mysql-operator/pkg/resources/secrets"
"github.com/oracle/mysql-operator/pkg/resources/statefulsets"
)

// TestDBName is the name of database to use when executing test SQL queries.
Expand Down Expand Up @@ -112,7 +115,7 @@ func (j *ClusterTestJig) CreateAndAwaitClusterOrFail(namespace string, members i
return cluster
}

func (j *ClusterTestJig) waitForConditionOrFail(namespace, name string, timeout time.Duration, message string, conditionFn func(*v1alpha1.Cluster) bool) *v1alpha1.Cluster {
func (j *ClusterTestJig) WaitForConditionOrFail(namespace, name string, timeout time.Duration, message string, conditionFn func(*v1alpha1.Cluster) bool) *v1alpha1.Cluster {
var cluster *v1alpha1.Cluster
pollFunc := func() (bool, error) {
c, err := j.MySQLClient.MySQLV1alpha1().Clusters(namespace).Get(name, metav1.GetOptions{})
Expand All @@ -135,12 +138,99 @@ func (j *ClusterTestJig) waitForConditionOrFail(namespace, name string, timeout
// the running phase.
func (j *ClusterTestJig) WaitForClusterReadyOrFail(namespace, name string, timeout time.Duration) *v1alpha1.Cluster {
Logf("Waiting up to %v for Cluster \"%s/%s\" to be ready", timeout, namespace, name)
cluster := j.waitForConditionOrFail(namespace, name, timeout, "have all nodes ready", func(cluster *v1alpha1.Cluster) bool {
cluster := j.WaitForConditionOrFail(namespace, name, timeout, "have all nodes ready", func(cluster *v1alpha1.Cluster) bool {
return clusterutil.IsClusterReady(cluster)
})
return cluster
}

// WaitForClusterUpgradedOrFail waits for a MySQL cluster to be upgraded to the
// given version or fails.
func (j *ClusterTestJig) WaitForClusterUpgradedOrFail(namespace, name, version string, timeout time.Duration) *v1alpha1.Cluster {
Logf("Waiting up to %v for Cluster \"%s/%s\" to be upgraded", timeout, namespace, name)

cluster := j.WaitForConditionOrFail(namespace, name, timeout, "be upgraded ", func(cluster *v1alpha1.Cluster) bool {
set, err := j.KubeClient.AppsV1beta1().StatefulSets(cluster.Namespace).Get(cluster.Name, metav1.GetOptions{})
if err != nil {
Failf("Failed to get StatefulSet %[1]q for Cluster %[1]q: %[2]v", name, err)
}

set = j.waitForSSRollingUpdate(set)

var actualVersion string
{
var found bool
var index int
for i, c := range set.Spec.Template.Spec.Containers {
if c.Name == statefulsets.MySQLServerName {
index = i
found = true
break
}
}

if !found {
Failf("no %q container found for StatefulSet %q", statefulsets.MySQLServerName, set.Name)
}
image := set.Spec.Template.Spec.Containers[index].Image
parts := strings.Split(image, ":")
if len(parts) < 2 {
Failf("invalid image %q for StatefulSet %q", image, set.Name)
}
actualVersion = parts[len(parts)-1]
}

return actualVersion == version
})
return cluster
}

// waitForSSState periodically polls for the ss and its pods until the until function returns either true or an error
func (j *ClusterTestJig) waitForSSState(ss *apps.StatefulSet, until func(*apps.StatefulSet, *v1.PodList) (bool, error)) {
pollErr := wait.PollImmediate(Poll, DefaultTimeout,
func() (bool, error) {
ssGet, err := j.KubeClient.AppsV1beta1().StatefulSets(ss.Namespace).Get(ss.Name, metav1.GetOptions{})
if err != nil {
return false, err
}

selector, err := metav1.LabelSelectorAsSelector(ss.Spec.Selector)
ExpectNoError(err)
podList, err := j.KubeClient.CoreV1().Pods(ss.Namespace).List(metav1.ListOptions{LabelSelector: selector.String()})
ExpectNoError(err)

return until(ssGet, podList)
})
if pollErr != nil {
Failf("Failed waiting for state update: %v", pollErr)
}
}

// waitForRollingUpdate waits for all Pods in set to exist and have the correct revision and for the RollingUpdate to
// complete. set must have a RollingUpdateStatefulSetStrategyType.
func (j *ClusterTestJig) waitForSSRollingUpdate(set *apps.StatefulSet) *apps.StatefulSet {
var pods *v1.PodList
if set.Spec.UpdateStrategy.Type != apps.RollingUpdateStatefulSetStrategyType {
Failf("StatefulSet %s/%s attempt to wait for rolling update with updateStrategy %s",
set.Namespace,
set.Name,
set.Spec.UpdateStrategy.Type)
}
Logf("Waiting for StatefulSet %s/%s to complete update", set.Namespace, set.Name)
j.waitForSSState(set, func(set2 *apps.StatefulSet, pods2 *v1.PodList) (bool, error) {
set = set2
pods = pods2
if len(pods.Items) < int(*set.Spec.Replicas) {
return false, nil
}
if set.Status.UpdateRevision != set.Status.CurrentRevision {
return false, nil
}
return true, nil
})
return set
}

// SanityCheckCluster checks basic properties of a given Cluster match
// our expectations.
func (j *ClusterTestJig) SanityCheckCluster(cluster *v1alpha1.Cluster) {
Expand Down
62 changes: 62 additions & 0 deletions test/e2e/upgrade.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright 2018 Oracle and/or its affiliates. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package e2e

import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"

"github.com/oracle/mysql-operator/pkg/apis/mysql/v1alpha1"
mysqlclientset "github.com/oracle/mysql-operator/pkg/generated/clientset/versioned"
"github.com/oracle/mysql-operator/test/e2e/framework"
)

var _ = Describe("MySQL Upgrade", func() {
f := framework.NewDefaultFramework("upgrade")

var mcs mysqlclientset.Interface
BeforeEach(func() {
mcs = f.MySQLClientSet
})

It("should be possible to upgrade a cluster from 8.0.11 to 8.0.12", func() {
jig := framework.NewClusterTestJig(mcs, f.ClientSet, "upgrade-test")

By("creating an 8.0.11 cluster")

cluster := jig.CreateAndAwaitClusterOrFail(f.Namespace.Name, 3, func(c *v1alpha1.Cluster) {
c.Spec.Version = "8.0.11"
}, framework.DefaultTimeout)

expected, err := framework.WriteSQLTest(cluster, cluster.Name+"-0")
Expect(err).NotTo(HaveOccurred())

By("triggering an upgrade to 8.0.12")

cluster.Spec.Version = "8.0.12"
cluster, err = mcs.MySQLV1alpha1().Clusters(cluster.Namespace).Update(cluster)
Expect(err).NotTo(HaveOccurred())

By("waiting for the upgrade to complete")

cluster = jig.WaitForClusterUpgradedOrFail(cluster.Namespace, cluster.Name, "8.0.12", framework.DefaultTimeout)

By("testing we can read from the upgraded database")

actual, err := framework.ReadSQLTest(cluster, cluster.Name+"-0")
Expect(err).NotTo(HaveOccurred())
Expect(actual).To(Equal(expected))
})
})