From 0d4efa37a19a4b19b446a141bec099781413c8d7 Mon Sep 17 00:00:00 2001 From: Adrian Fernandez De La Torre Date: Sat, 14 Mar 2026 18:30:03 +0100 Subject: [PATCH] Migrate event recorder to fluxcd/pkg/runtime/events.Recorder - Replace kuberecorder.EventRecorder with events.Recorder to support structured event metadata including source object and action type - Upgrade event API from event/v1beta1 to event/v1 - Add action parameter (Reconciled, Failed, Waiting, Applied, Deleted, Progressing) to all event calls for richer event semantics - Pass source reference through reconcile, apply, prune, checkHealth, and finalize methods - Update tests to use events.Recorder Signed-off-by: Adrian Fernandez De La Torre --- .../controller/kustomization_controller.go | 71 ++++++++++++------- .../kustomization_controller_test.go | 4 +- internal/controller/suite_test.go | 3 +- 3 files changed, 51 insertions(+), 27 deletions(-) diff --git a/internal/controller/kustomization_controller.go b/internal/controller/kustomization_controller.go index 93fbcfbf..170d4372 100644 --- a/internal/controller/kustomization_controller.go +++ b/internal/controller/kustomization_controller.go @@ -37,7 +37,6 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" kerrors "k8s.io/apimachinery/pkg/util/errors" - kuberecorder "k8s.io/client-go/tools/record" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" @@ -47,7 +46,7 @@ import ( "github.com/fluxcd/cli-utils/pkg/kstatus/polling/engine" "github.com/fluxcd/cli-utils/pkg/object" apiacl "github.com/fluxcd/pkg/apis/acl" - eventv1 "github.com/fluxcd/pkg/apis/event/v1beta1" + eventv1 "github.com/fluxcd/pkg/apis/event/v1" "github.com/fluxcd/pkg/apis/meta" "github.com/fluxcd/pkg/auth" authutils "github.com/fluxcd/pkg/auth/utils" @@ -59,6 +58,7 @@ import ( runtimeClient "github.com/fluxcd/pkg/runtime/client" "github.com/fluxcd/pkg/runtime/conditions" runtimeCtrl "github.com/fluxcd/pkg/runtime/controller" + "github.com/fluxcd/pkg/runtime/events" "github.com/fluxcd/pkg/runtime/jitter" "github.com/fluxcd/pkg/runtime/patch" "github.com/fluxcd/pkg/runtime/statusreaders" @@ -85,7 +85,7 @@ import ( // KustomizationReconciler reconciles a Kustomization object type KustomizationReconciler struct { client.Client - kuberecorder.EventRecorder + EventRecorder *events.Recorder runtimeCtrl.Metrics // Kubernetes options @@ -135,6 +135,18 @@ func (r *KustomizationReconciler) Reconcile(ctx context.Context, req ctrl.Reques // Initialize the runtime patcher with the current version of the object. patcher := patch.NewSerialPatcher(obj, r.Client) + var src sourcev1.Source + // src := &metav1.PartialObjectMetadata{ + // TypeMeta: metav1.TypeMeta{ + // Kind: obj.Spec.SourceRef.Kind, + // APIVersion: obj.Spec.SourceRef.APIVersion, + // }, + // ObjectMeta: metav1.ObjectMeta{ + // Name: obj.Spec.SourceRef.Name, + // Namespace: obj.Spec.SourceRef.Namespace, + // }, + // } + // Finalise the reconciliation and report the results. defer func() { // Patch finalizers, status and conditions. @@ -156,7 +168,8 @@ func (r *KustomizationReconciler) Reconcile(ctx context.Context, req ctrl.Reques time.Since(reconcileStart).String(), obj.Spec.Interval.Duration.String()) log.Info(msg, "revision", obj.Status.LastAttemptedRevision) - r.event(obj, obj.Status.LastAppliedRevision, obj.Status.LastAppliedOriginRevision, eventv1.EventSeverityInfo, msg, + r.event(obj, &src, obj.Status.LastAppliedRevision, obj.Status.LastAppliedOriginRevision, eventv1.EventSeverityInfo, + eventv1.ActionReconciled, msg, map[string]string{ kustomizev1.GroupVersion.Group + "/" + eventv1.MetaCommitStatusKey: eventv1.MetaCommitStatusUpdateValue, }) @@ -165,7 +178,7 @@ func (r *KustomizationReconciler) Reconcile(ctx context.Context, req ctrl.Reques // Prune managed resources if the object is under deletion. if !obj.ObjectMeta.DeletionTimestamp.IsZero() { - return r.finalize(ctx, obj) + return r.finalize(ctx, obj, &src) } // Add finalizer first if it doesn't exist to avoid the race condition @@ -190,7 +203,7 @@ func (r *KustomizationReconciler) Reconcile(ctx context.Context, req ctrl.Reques conditions.MarkFalse(obj, meta.ReadyCondition, meta.InvalidCELExpressionReason, "%s", errMsg) conditions.MarkStalled(obj, meta.InvalidCELExpressionReason, "%s", errMsg) obj.Status.ObservedGeneration = obj.Generation - r.event(obj, "", "", eventv1.EventSeverityError, errMsg, nil) + r.event(obj, &src, "", "", eventv1.EventSeverityError, eventv1.ActionFailed, errMsg, nil) return ctrl.Result{}, reconcile.TerminalError(err) } @@ -202,7 +215,7 @@ func (r *KustomizationReconciler) Reconcile(ctx context.Context, req ctrl.Reques conditions.MarkFalse(obj, meta.ReadyCondition, meta.FeatureGateDisabledReason, msgFmt, gate) conditions.MarkStalled(obj, meta.FeatureGateDisabledReason, msgFmt, gate) log.Error(auth.ErrObjectLevelWorkloadIdentityNotEnabled, msg) - r.event(obj, "", "", eventv1.EventSeverityError, msg, nil) + r.event(obj, &src, "", "", eventv1.EventSeverityError, eventv1.ActionFailed, msg, nil) return ctrl.Result{}, nil } @@ -220,7 +233,7 @@ func (r *KustomizationReconciler) Reconcile(ctx context.Context, req ctrl.Reques if acl.IsAccessDenied(err) { conditions.MarkFalse(obj, meta.ReadyCondition, apiacl.AccessDeniedReason, "%s", err) conditions.MarkStalled(obj, apiacl.AccessDeniedReason, "%s", err) - r.event(obj, "", "", eventv1.EventSeverityError, err.Error(), nil) + r.event(obj, &src, "", "", eventv1.EventSeverityError, eventv1.ActionFailed, err.Error(), nil) return ctrl.Result{}, reconcile.TerminalError(err) } @@ -247,7 +260,7 @@ func (r *KustomizationReconciler) Reconcile(ctx context.Context, req ctrl.Reques conditions.MarkFalse(obj, meta.ReadyCondition, meta.InvalidCELExpressionReason, "%s", errMsg) conditions.MarkStalled(obj, meta.InvalidCELExpressionReason, "%s", errMsg) obj.Status.ObservedGeneration = obj.Generation - r.event(obj, revision, originRevision, eventv1.EventSeverityError, errMsg, nil) + r.event(obj, &artifactSource, revision, originRevision, eventv1.EventSeverityError, eventv1.ActionFailed, errMsg, nil) return ctrl.Result{}, err } @@ -255,7 +268,7 @@ func (r *KustomizationReconciler) Reconcile(ctx context.Context, req ctrl.Reques conditions.MarkFalse(obj, meta.ReadyCondition, meta.DependencyNotReadyReason, "%s", err) msg := fmt.Sprintf("Dependencies do not meet ready condition, retrying in %s", r.DependencyRequeueInterval.String()) log.Info(msg) - r.event(obj, revision, originRevision, eventv1.EventSeverityInfo, msg, nil) + r.event(obj, &artifactSource, revision, originRevision, eventv1.EventSeverityInfo, eventv1.ActionWaiting, msg, nil) return ctrl.Result{RequeueAfter: r.DependencyRequeueInterval}, nil } log.Info("All dependencies are ready, proceeding with reconciliation") @@ -279,7 +292,7 @@ func (r *KustomizationReconciler) Reconcile(ctx context.Context, req ctrl.Reques meta.HealthCheckCanceledReason, "New reconciliation triggered by %s/%s/%s", qes.Kind, qes.Namespace, qes.Name) ctrl.LoggerFrom(ctx).Info("New reconciliation triggered, canceling health checks", "trigger", qes) - r.event(obj, revision, originRevision, eventv1.EventSeverityInfo, + r.event(obj, &src, revision, originRevision, eventv1.EventSeverityInfo, eventv1.ActionProgressing, fmt.Sprintf("Health checks canceled due to new reconciliation triggered by %s/%s/%s", qes.Kind, qes.Namespace, qes.Name), nil) return ctrl.Result{}, nil @@ -292,7 +305,7 @@ func (r *KustomizationReconciler) Reconcile(ctx context.Context, req ctrl.Reques obj.GetRetryInterval().String()), "revision", revision) - r.event(obj, revision, originRevision, eventv1.EventSeverityError, + r.event(obj, &src, revision, originRevision, eventv1.EventSeverityError, eventv1.ActionFailed, reconcileErr.Error(), nil) return ctrl.Result{RequeueAfter: obj.GetRetryInterval()}, nil } @@ -462,7 +475,7 @@ func (r *KustomizationReconciler) reconcile( } // Validate and apply resources in stages. - drifted, changeSetWithSkipped, err := r.apply(ctx, resourceManager, obj, revision, originRevision, objects) + drifted, changeSetWithSkipped, err := r.apply(ctx, resourceManager, obj, &src, revision, originRevision, objects) if err != nil { obj.Status.History.Upsert(checksum, time.Now(), time.Since(reconcileStart), meta.ReconciliationFailedReason, historyMeta) conditions.MarkFalse(obj, meta.ReadyCondition, meta.ReconciliationFailedReason, "%s", err) @@ -501,7 +514,7 @@ func (r *KustomizationReconciler) reconcile( } // Run garbage collection for stale resources that do not have pruning disabled. - if _, err := r.prune(ctx, resourceManager, obj, revision, originRevision, staleObjects); err != nil { + if _, err := r.prune(ctx, resourceManager, obj, &src, revision, originRevision, staleObjects); err != nil { obj.Status.History.Upsert(checksum, time.Now(), time.Since(reconcileStart), meta.PruneFailedReason, historyMeta) conditions.MarkFalse(obj, meta.ReadyCondition, meta.PruneFailedReason, "%s", err) return err @@ -513,6 +526,7 @@ func (r *KustomizationReconciler) reconcile( resourceManager, patcher, obj, + &src, revision, originRevision, isNewRevision, @@ -836,6 +850,7 @@ func (r *KustomizationReconciler) build(ctx context.Context, func (r *KustomizationReconciler) apply(ctx context.Context, manager *ssa.ResourceManager, obj *kustomizev1.Kustomization, + src *sourcev1.Source, revision string, originRevision string, objects []*unstructured.Unstructured) (bool, *ssa.ChangeSet, error) { @@ -960,7 +975,7 @@ func (r *KustomizationReconciler) apply(ctx context.Context, // emit event only if the server-side apply resulted in changes applyLog := strings.TrimSuffix(changeSetLog.String(), "\n") if applyLog != "" { - r.event(obj, revision, originRevision, eventv1.EventSeverityInfo, applyLog, nil) + r.event(obj, src, revision, originRevision, eventv1.EventSeverityInfo, eventv1.ActionApplied, applyLog, nil) } return applyLog != "", resultSet, nil @@ -970,6 +985,7 @@ func (r *KustomizationReconciler) checkHealth(ctx context.Context, manager *ssa.ResourceManager, patcher *patch.SerialPatcher, obj *kustomizev1.Kustomization, + src *sourcev1.Source, revision string, originRevision string, isNewRevision bool, @@ -1036,7 +1052,7 @@ func (r *KustomizationReconciler) checkHealth(ctx context.Context, // Emit recovery event if the previous health check failed. msg := fmt.Sprintf("Health check passed in %s", time.Since(checkStart).String()) if !wasHealthy || (isNewRevision && drifted) { - r.event(obj, revision, originRevision, eventv1.EventSeverityInfo, msg, nil) + r.event(obj, src, revision, originRevision, eventv1.EventSeverityInfo, eventv1.ActionReconciled, msg, nil) } conditions.MarkTrue(obj, meta.HealthyCondition, meta.SucceededReason, "%s", msg) @@ -1050,6 +1066,7 @@ func (r *KustomizationReconciler) checkHealth(ctx context.Context, func (r *KustomizationReconciler) prune(ctx context.Context, manager *ssa.ResourceManager, obj *kustomizev1.Kustomization, + src *sourcev1.Source, revision string, originRevision string, objects []*unstructured.Unstructured) (bool, error) { @@ -1076,7 +1093,7 @@ func (r *KustomizationReconciler) prune(ctx context.Context, // emit event only if the prune operation resulted in changes if changeSet != nil && len(changeSet.Entries) > 0 { log.Info(fmt.Sprintf("garbage collection completed: %s", changeSet.String())) - r.event(obj, revision, originRevision, eventv1.EventSeverityInfo, changeSet.String(), nil) + r.event(obj, src, revision, originRevision, eventv1.EventSeverityInfo, eventv1.ActionDeleted, changeSet.String(), nil) return true, nil } @@ -1114,7 +1131,7 @@ func finalizerShouldDeleteResources(obj *kustomizev1.Kustomization) bool { // If the service account used for impersonation is no longer available or if a timeout occurs // while waiting for resources to be terminated, an error is logged and the finalizer is removed. func (r *KustomizationReconciler) finalize(ctx context.Context, - obj *kustomizev1.Kustomization) (ctrl.Result, error) { + obj *kustomizev1.Kustomization, src *sourcev1.Source) (ctrl.Result, error) { log := ctrl.LoggerFrom(ctx) if finalizerShouldDeleteResources(obj) { objects, _ := inventory.List(obj.Status.Inventory) @@ -1164,14 +1181,16 @@ func (r *KustomizationReconciler) finalize(ctx context.Context, changeSet, err := resourceManager.DeleteAll(ctx, objects, opts) if err != nil { - r.event(obj, obj.Status.LastAppliedRevision, obj.Status.LastAppliedOriginRevision, eventv1.EventSeverityError, "pruning for deleted resource failed", nil) + r.event(obj, src, obj.Status.LastAppliedRevision, obj.Status.LastAppliedOriginRevision, + eventv1.EventSeverityError, eventv1.ActionFailed, "pruning for deleted resource failed", nil) // Return the error so we retry the failed garbage collection return ctrl.Result{}, err } if changeSet != nil && len(changeSet.Entries) > 0 { // Emit event with the resources marked for deletion. - r.event(obj, obj.Status.LastAppliedRevision, obj.Status.LastAppliedOriginRevision, eventv1.EventSeverityInfo, changeSet.String(), nil) + r.event(obj, src, obj.Status.LastAppliedRevision, obj.Status.LastAppliedOriginRevision, + eventv1.EventSeverityInfo, eventv1.ActionDeleted, changeSet.String(), nil) // Wait for the resources marked for deletion to be terminated. if obj.GetDeletionPolicy() == kustomizev1.DeletionPolicyWaitForTermination { @@ -1182,7 +1201,8 @@ func (r *KustomizationReconciler) finalize(ctx context.Context, // Emit an event and log the error if a timeout occurs. msg := "failed to wait for resources termination" log.Error(err, msg) - r.event(obj, obj.Status.LastAppliedRevision, obj.Status.LastAppliedOriginRevision, eventv1.EventSeverityError, msg, nil) + r.event(obj, src, obj.Status.LastAppliedRevision, obj.Status.LastAppliedOriginRevision, + eventv1.EventSeverityError, eventv1.ActionFailed, msg, nil) } } } @@ -1190,7 +1210,8 @@ func (r *KustomizationReconciler) finalize(ctx context.Context, // when the account to impersonate is gone, log the stale objects and continue with the finalization msg := fmt.Sprintf("unable to prune objects: \n%s", ssautil.FmtUnstructuredList(objects)) log.Error(fmt.Errorf("skiping pruning, failed to find account to impersonate"), msg) - r.event(obj, obj.Status.LastAppliedRevision, obj.Status.LastAppliedOriginRevision, eventv1.EventSeverityError, msg, nil) + r.event(obj, src, obj.Status.LastAppliedRevision, obj.Status.LastAppliedOriginRevision, + eventv1.EventSeverityError, eventv1.ActionFailed, msg, nil) } } @@ -1207,7 +1228,9 @@ func (r *KustomizationReconciler) finalize(ctx context.Context, } func (r *KustomizationReconciler) event(obj *kustomizev1.Kustomization, - revision, originRevision, severity, msg string, + src *sourcev1.Source, + revision, originRevision, severity, action string, + msg string, metadata map[string]string) { if metadata == nil { metadata = map[string]string{} @@ -1229,7 +1252,7 @@ func (r *KustomizationReconciler) event(obj *kustomizev1.Kustomization, eventType = corev1.EventTypeWarning } - r.EventRecorder.AnnotatedEventf(obj, metadata, eventType, reason, msg) + r.EventRecorder.AnnotatedEventf(obj, *src, metadata, eventType, reason, action, "%s", msg) } func (r *KustomizationReconciler) finalizeStatus(ctx context.Context, diff --git a/internal/controller/kustomization_controller_test.go b/internal/controller/kustomization_controller_test.go index b3672abe..bbf2cf1e 100644 --- a/internal/controller/kustomization_controller_test.go +++ b/internal/controller/kustomization_controller_test.go @@ -23,13 +23,13 @@ import ( "time" "github.com/fluxcd/pkg/apis/meta" + "github.com/fluxcd/pkg/runtime/events" sourcev1 "github.com/fluxcd/source-controller/api/v1" . "github.com/onsi/gomega" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" - "k8s.io/client-go/tools/record" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" @@ -130,7 +130,7 @@ func TestKustomizationReconciler_deleteBeforeFinalizer(t *testing.T) { r := &KustomizationReconciler{ Client: k8sClient, - EventRecorder: record.NewFakeRecorder(32), + EventRecorder: &events.Recorder{EventRecorder: testEnv.GetEventRecorder("kustomization-controller")}, } // NOTE: Only a real API server responds with an error in this scenario. _, err := r.Reconcile(ctx, ctrl.Request{NamespacedName: client.ObjectKeyFromObject(kustomization)}) diff --git a/internal/controller/suite_test.go b/internal/controller/suite_test.go index 470d549c..a45705c6 100644 --- a/internal/controller/suite_test.go +++ b/internal/controller/suite_test.go @@ -44,6 +44,7 @@ import ( "github.com/fluxcd/pkg/runtime/conditions" kcheck "github.com/fluxcd/pkg/runtime/conditions/check" "github.com/fluxcd/pkg/runtime/controller" + "github.com/fluxcd/pkg/runtime/events" "github.com/fluxcd/pkg/runtime/metrics" "github.com/fluxcd/pkg/runtime/testenv" "github.com/fluxcd/pkg/testserver" @@ -179,7 +180,7 @@ func TestMain(m *testing.M) { Client: testEnv, Mapper: testEnv.GetRESTMapper(), APIReader: testEnv, - EventRecorder: testEnv.GetEventRecorderFor(controllerName), + EventRecorder: &events.Recorder{EventRecorder: testEnv.GetEventRecorder(controllerName)}, Metrics: testMetricsH, DependencyRequeueInterval: 2 * time.Second, ConcurrentSSA: 4,