Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions api/v2/weightsandbiases_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,11 @@ func (w *WeightsAndBiases) GetTolerations(spec ManagedInfraSpec) *[]corev1.Toler
return w.Spec.Tolerations
}

// ValidMysqlReplicaCount reports whether r is a replica count that Moco will
// accept. A count is valid only when it is strictly positive and odd; zero,
// negative, and even values are all rejected.
func ValidMysqlReplicaCount(r int32) bool {
	// Rule out zero and negatives first so the parity test below only ever
	// sees positive values.
	if r <= 0 {
		return false
	}
	return r%2 != 0
}

// WandbAppSpec defines the configuration for the Wandb application deployment.
type WandbAppSpec struct {
Hostname string `json:"hostname"`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ mysql:
replicas: 1
volumeSize: 10Gi
micro:
replicas: 2
replicas: 3
volumeSize: 10Gi
resources:
requests:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ mysql:
replicas: 1
volumeSize: 10Gi
micro:
replicas: 2
replicas: 3
volumeSize: 10Gi
resources:
requests:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ mysql:
replicas: 1
volumeSize: 10Gi
micro:
replicas: 2
replicas: 3
volumeSize: 10Gi
resources:
requests:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ mysql:
replicas: 1
volumeSize: 10Gi
micro:
replicas: 2
replicas: 3
volumeSize: 10Gi
resources:
requests:
Expand Down
6 changes: 5 additions & 1 deletion internal/controller/infra/managed/mysql/moco/spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ const (
MysqlModuleName = "moco"
MocoMySQLImage = "ghcr.io/cybozu-go/moco/mysql:8.4.8"
DefaultMySQLExporterImage = "prom/mysqld-exporter:v0.15.1"

// Moco names the resulting PVCs "<dataVolumeName>-<cluster.PrefixedName()>-<ordinal>"
// (= "<dataVolumeName>-moco-<cluster>-<n>"); ensurePVCLabels and purge rely on this.
dataVolumeName = mococonstants.MySQLDataVolumeName
)

const (
Expand Down Expand Up @@ -69,7 +73,7 @@ func ToMocoMySQLClusterSpec(
},
VolumeClaimTemplates: []mocov1beta2.PersistentVolumeClaim{
{
ObjectMeta: mocov1beta2.ObjectMeta{Name: "mysql-data"},
ObjectMeta: mocov1beta2.ObjectMeta{Name: dataVolumeName},
Spec: buildPVCSpec(spec.StorageSize),
},
},
Expand Down
98 changes: 98 additions & 0 deletions internal/controller/infra/managed/mysql/moco/spec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,17 @@ import (
mocov1beta2 "github.com/cybozu-go/moco/api/v1beta2"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/samber/lo"
apiv2 "github.com/wandb/operator/api/v2"
"github.com/wandb/operator/internal/controller/common"
"github.com/wandb/operator/pkg/utils"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
corev1ac "k8s.io/client-go/applyconfigurations/core/v1"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
)

var _ = Describe("Moco MySQL specs", func() {
Expand Down Expand Up @@ -74,12 +79,105 @@ var _ = Describe("Moco MySQL specs", func() {
expectMocoOpenShiftPodSecurityContext(podSpec.SecurityContext)
expectMocoOpenShiftContainerSecurityContext(podSpec.Containers[0].SecurityContext)
})

DescribeTable("refuses to forward a replica count Moco rejects",
func(replicas int32) {
ctx := context.Background()
nn := types.NamespacedName{Name: "mysql", Namespace: "wandb"}
cl := fake.NewClientBuilder().WithScheme(mocoScheme()).Build()

desired, cm, err := ToMocoMySQLClusterSpec(
ctx,
apiv2.ManagedMysqlSpec{Name: "mysql", Namespace: "wandb", Replicas: replicas, StorageSize: "10Gi"},
mocoWandb(),
mocoScheme(),
)
Expect(err).NotTo(HaveOccurred())

conditions := WriteState(ctx, cl, nn, desired, cm, nil)

reconciled, found := lo.Find(conditions, func(c metav1.Condition) bool {
return c.Type == common.ReconciledType
})
Expect(found).To(BeTrue())
Expect(reconciled.Status).To(Equal(metav1.ConditionFalse))
Expect(reconciled.Reason).To(Equal(InvalidReplicaCountReason))

// an invalid count must not create a cluster
got := &mocov1beta2.MySQLCluster{}
Expect(apierrors.IsNotFound(cl.Get(ctx, nn, got))).To(BeTrue())
},
Entry("even count", int32(2)),
Entry("zero / unset", int32(0)),
)

It("stamps wandb labels onto Moco's data PVCs", func() {
ctx := context.Background()
wandbLabels := map[string]string{"app.kubernetes.io/managed-by": "wandb"}

// Moco names PVCs "<dataVolumeName>-<PrefixedName()>-<ordinal>".
mocoPVC := &corev1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{Name: "mysql-data-moco-mysql-0", Namespace: "wandb"},
}
// A PVC matching the old (pre-Moco) "datadir-" name must NOT be matched.
legacyPVC := &corev1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{Name: "datadir-mysql-0", Namespace: "wandb"},
}
cl := fake.NewClientBuilder().
WithScheme(mocoScheme()).
WithObjects(mocoPVC, legacyPVC).
Build()

Expect(ensurePVCLabels(ctx, cl, "wandb", "mysql", wandbLabels)).To(Succeed())

got := &corev1.PersistentVolumeClaim{}
Expect(cl.Get(ctx, types.NamespacedName{Name: "mysql-data-moco-mysql-0", Namespace: "wandb"}, got)).To(Succeed())
Expect(got.Labels).To(HaveKeyWithValue("app.kubernetes.io/managed-by", "wandb"))

legacy := &corev1.PersistentVolumeClaim{}
Expect(cl.Get(ctx, types.NamespacedName{Name: "datadir-mysql-0", Namespace: "wandb"}, legacy)).To(Succeed())
Expect(legacy.Labels).NotTo(HaveKey("app.kubernetes.io/managed-by"))
})

It("backstops a scale-down the webhook can't see, leaving the cluster untouched", func() {
ctx := context.Background()
nn := types.NamespacedName{Name: "mysql", Namespace: "wandb"}

existing := &mocov1beta2.MySQLCluster{
ObjectMeta: metav1.ObjectMeta{Name: "mysql", Namespace: "wandb"},
Spec: mocov1beta2.MySQLClusterSpec{Replicas: 3},
}
cl := fake.NewClientBuilder().WithScheme(mocoScheme()).WithObjects(existing).Build()

desired, cm, err := ToMocoMySQLClusterSpec(
ctx,
apiv2.ManagedMysqlSpec{Name: "mysql", Namespace: "wandb", Replicas: 1, StorageSize: "10Gi"},
mocoWandb(),
mocoScheme(),
)
Expect(err).NotTo(HaveOccurred())

conditions := WriteState(ctx, cl, nn, desired, cm, nil)

reconciled, found := lo.Find(conditions, func(c metav1.Condition) bool {
return c.Type == common.ReconciledType
})
Expect(found).To(BeTrue())
Expect(reconciled.Status).To(Equal(metav1.ConditionFalse))
Expect(reconciled.Reason).To(Equal(ScaleDownUnsupportedReason))
Expect(reconciled.Message).To(ContainSubstring("from 3 to 1 replicas"))

got := &mocov1beta2.MySQLCluster{}
Expect(cl.Get(ctx, nn, got)).To(Succeed())
Expect(got.Spec.Replicas).To(Equal(int32(3)))
})
})

// mocoScheme returns a fresh runtime.Scheme for the Moco MySQL specs with the
// operator's v2 API, Moco's v1beta2 API and the core/v1 API registered, in that
// order. Each registration is asserted with Gomega, so a failure aborts the
// calling spec.
func mocoScheme() *runtime.Scheme {
	s := runtime.NewScheme()

	registrars := []func(*runtime.Scheme) error{
		apiv2.AddToScheme,
		mocov1beta2.AddToScheme,
		corev1.AddToScheme,
	}
	for _, register := range registrars {
		Expect(register(s)).To(Succeed())
	}

	return s
}

Expand Down
57 changes: 52 additions & 5 deletions internal/controller/infra/managed/mysql/moco/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"strings"

mocov1beta2 "github.com/cybozu-go/moco/api/v1beta2"
apiv2 "github.com/wandb/operator/api/v2"
"github.com/wandb/operator/internal/controller/common"
"github.com/wandb/operator/internal/logx"
corev1 "k8s.io/api/core/v1"
Expand All @@ -18,6 +19,12 @@ import (
// Identifiers and condition reasons used by this package when reporting state.
const (
// ResourceTypeName and AppConnTypeName are string identifiers for the MySQL
// resource and its application connection.
// NOTE(review): the value still reads "InnoDBCluster" although this package
// targets Moco — confirm whether that is intentional.
ResourceTypeName = "InnoDBCluster"
AppConnTypeName = "MySQLAppConn"

// InvalidReplicaCountReason is the Reconciled condition reason WriteState
// reports when the desired replica count is not a positive odd number, which
// is the only kind of count Moco accepts.
InvalidReplicaCountReason = "InvalidReplicaCount"

// ScaleDownUnsupportedReason is the Reconciled condition reason WriteState
// reports when the desired replica count is lower than the existing cluster's;
// Moco does not support in-place replica reduction.
ScaleDownUnsupportedReason = "ScaleDownUnsupported"
)

func WriteState(
Expand Down Expand Up @@ -62,6 +69,44 @@ func WriteState(
desired.Spec.ServerIDBase = actual.Spec.ServerIDBase
}

// Sizing is resolved from the manifest at reconcile time, after the CR
// admission webhook runs, so a bad value reaches here unvalidated.
if !apiv2.ValidMysqlReplicaCount(desired.Spec.Replicas) {
return []metav1.Condition{
{
Type: common.ReconciledType,
Status: metav1.ConditionFalse,
Reason: InvalidReplicaCountReason,
Message: fmt.Sprintf(
"manifest sizing produced %d MySQL replicas; Moco requires a positive odd number",
desired.Spec.Replicas,
),
},
}
}

// Backstop for the webhook's scale-down check: the webhook only sees the CR
// (explicit edits), not the manifest-resolved count or the live cluster.
// Catch those shrinks here instead of letting Moco reject them opaquely.
if actual != nil && desired.Spec.Replicas < actual.Spec.Replicas {
return []metav1.Condition{
{
Type: common.ReconciledType,
Status: metav1.ConditionFalse,
Reason: ScaleDownUnsupportedReason,
Message: fmt.Sprintf(
"cannot scale managed MySQL down from %d to %d replicas; Moco does not support in-place replica reduction (use its manual stop-clustering procedure)",
actual.Spec.Replicas, desired.Spec.Replicas,
),
},
{
Type: MySQLCustomResourceType,
Status: metav1.ConditionTrue,
Reason: common.ResourceExistsReason,
},
}
}

result := make([]metav1.Condition, 0)

if confMap != nil {
Expand Down Expand Up @@ -137,10 +182,11 @@ func WriteState(
return result
}

// ensurePVCLabels patches any PVCs belonging to the moco cluster that are
// missing the wandb labels. PVCs are identified by the name prefix
// "datadir-<clusterName>-" since the moco-operator creates them via
// StatefulSet volumeClaimTemplates and may not propagate custom labels.
// ensurePVCLabels stamps the wandb labels onto Moco's PVCs (missing because Moco
// doesn't propagate them through its StatefulSet volumeClaimTemplates), so
// purgeAssociatedResources can select them by label on teardown. Moco names PVCs
// "<dataVolumeName>-<cluster.PrefixedName()>-<ordinal>" (see Moco pvc.go); the
// prefix is built from those same sources so it can't drift from upstream.
func ensurePVCLabels(
ctx context.Context,
cl client.Client,
Expand All @@ -149,7 +195,8 @@ func ensurePVCLabels(
labels map[string]string,
) error {
log := logx.GetSlog(ctx)
prefix := fmt.Sprintf("datadir-%s-", clusterName)
cluster := &mocov1beta2.MySQLCluster{ObjectMeta: metav1.ObjectMeta{Name: clusterName}}
prefix := fmt.Sprintf("%s-%s-", dataVolumeName, cluster.PrefixedName())

pvcList := &corev1.PersistentVolumeClaimList{}
if err := cl.List(ctx, pvcList, &client.ListOptions{Namespace: namespace}); err != nil {
Expand Down
29 changes: 28 additions & 1 deletion internal/webhook/v2/weightsandbiases_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,7 @@ func validateChanges(_ context.Context, newWandb *appsv2.WeightsAndBiases, oldWa
var warnings admission.Warnings

allErrors = append(allErrors, validateRedisChanges(newWandb, oldWandb)...)
allErrors = append(allErrors, validateMySQLChanges(newWandb, oldWandb)...)

if len(allErrors) == 0 {
return warnings, nil
Expand All @@ -341,6 +342,32 @@ func validateChanges(_ context.Context, newWandb *appsv2.WeightsAndBiases, oldWa
)
}

// validateMySQLChanges is the update-time check for managed MySQL. It returns a
// single field error on spec.mysql.managedMysql.replicas when the new object
// lowers a replica count that was explicitly set on both the old and the new
// object, because Moco does not support in-place replica reduction.
//
// A replica count of zero is treated as "unset": size-driven changes leave it
// unset on the CR and are resolved from the manifest at reconcile time, so they
// are not covered here. The check is also skipped when either object has no
// managedMysql section. In every skipped or passing case the result is nil.
func validateMySQLChanges(newWandb, oldWandb *appsv2.WeightsAndBiases) field.ErrorList {
	updated, previous := newWandb.Spec.MySQL.ManagedMysql, oldWandb.Spec.MySQL.ManagedMysql
	if updated == nil || previous == nil {
		return nil
	}

	// Only compare counts the user wrote down on both sides of the update.
	explicitOnBothSides := previous.Replicas != 0 && updated.Replicas != 0
	if !explicitOnBothSides || updated.Replicas >= previous.Replicas {
		return nil
	}

	replicasPath := field.NewPath("spec", "mysql", "managedMysql", "replicas")
	return field.ErrorList{
		field.Invalid(
			replicasPath,
			updated.Replicas,
			"replicas cannot be decreased; Moco does not support in-place replica reduction (use its manual stop-clustering procedure)",
		),
	}
}

func validateMySQLSpec(wandb *appsv2.WeightsAndBiases) field.ErrorList {
var errors field.ErrorList
mysqlPath := field.NewPath("spec").Child("mysql")
Expand All @@ -353,7 +380,7 @@ func validateMySQLSpec(wandb *appsv2.WeightsAndBiases) field.ErrorList {
))
}
if spec := wandb.Spec.MySQL.ManagedMysql; spec != nil {
if spec.Replicas != 0 && spec.Replicas%2 == 0 {
if spec.Replicas != 0 && !appsv2.ValidMysqlReplicaCount(spec.Replicas) {
errors = append(errors, field.Invalid(
mysqlPath.Child("managedMysql").Child("replicas"),
spec.Replicas,
Expand Down
18 changes: 18 additions & 0 deletions internal/webhook/v2/weightsandbiases_webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,24 @@ var _ = Describe("WeightsAndBiases Webhook", func() {
Expect(warnings).To(BeEmpty())
})

It("rejects decreasing managed MySQL replicas on update", func() {
oldObj.Spec.MySQL.ManagedMysql = &appsv2.ManagedMysqlSpec{Replicas: 3}
obj.Spec.MySQL.ManagedMysql = &appsv2.ManagedMysqlSpec{Replicas: 1}

_, err := validator.ValidateUpdate(ctx, oldObj, obj)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("replicas cannot be decreased"))
})

It("allows increasing managed MySQL replicas on update", func() {
oldObj.Spec.MySQL.ManagedMysql = &appsv2.ManagedMysqlSpec{Replicas: 1}
obj.Spec.MySQL.ManagedMysql = &appsv2.ManagedMysqlSpec{Replicas: 3}

warnings, err := validator.ValidateUpdate(ctx, oldObj, obj)
Expect(err).NotTo(HaveOccurred())
Expect(warnings).To(BeEmpty())
})

It("rejects gatewayAPI config when mode is ingress", func() {
obj.Spec.Networking.Mode = appsv2.NetworkingModeIngress
obj.Spec.Networking.GatewayAPI = &appsv2.GatewayAPIConfig{
Expand Down
2 changes: 1 addition & 1 deletion pkg/wandb/manifest/load_manifest_from_files_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ var _ = Describe("LoadManifestFromFiles", func() {

// Check that MySQL sizing was loaded
Expect(m.Mysql["default"].Sizing["default"].Replicas).To(Equal(int32(1)))
Expect(m.Mysql["default"].Sizing["micro"].Replicas).To(Equal(int32(2)))
Expect(m.Mysql["default"].Sizing["micro"].Replicas).To(Equal(int32(3)))

// Check that Redis sizing was loaded
Expect(m.Redis["default"].Sizing["default"].Replicas).To(Equal(int32(1)))
Expand Down
Loading