Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ require (
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/prometheus/common v0.66.1 // indirect
github.com/prometheus/procfs v0.16.1 // indirect
github.com/stretchr/objx v0.5.2 // indirect
github.com/x448/float16 v0.8.4 // indirect
go.uber.org/automaxprocs v1.6.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
Expand Down
126 changes: 116 additions & 10 deletions pkg/fsm/events/events.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,31 @@
// Package events provides Kubernetes event recording
//
// The events package offers a high-level interface for recording Kubernetes events
// with deduplication to optionally prevent spam and reduce noise in event logs.
//
// # Basic Usage
//
// Create an EventRecorder using NewEventRecorder:
//
// eventRecorder := events.NewEventRecorder("my-controller", manager, metrics)
//
// Record different types of events:
//
// // Record a ready event (always deduplicated)
// eventRecorder.RecordReady(obj, "Object is now ready")
//
// // Record a normal event with optional deduplication
// eventRecorder.RecordEvent(obj, "ProcessingComplete", "Processing finished successfully", true)
//
// // Record a warning event with optional deduplication
// eventRecorder.RecordWarning(obj, "ValidationFailed", "Invalid configuration detected", true)
package events

import (
"context"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand All @@ -20,29 +45,70 @@ type EventRecorder struct {
metrics *metrics.Metrics

controllerName string

client client.Client
scheme *runtime.Scheme
}

// NewEventRecorder creates a new EventRecorder for the given controller and manager.
// Metrics is optional and can be nil. If provided, it will be used to emit metrics for each event.
//
// The EventRecorder provides a high-level interface for recording Kubernetes events.
//
// Parameters:
// - controllerName: Name of the controller (used for event attribution)
// - manager: Controller-runtime manager
// - metrics: Optional metrics recorder (can be nil to disable metrics)
//
// Returns a configured EventRecorder ready for use.
func NewEventRecorder(controllerName string, manager ctrl.Manager, metrics *metrics.Metrics) *EventRecorder {
return &EventRecorder{recorder: manager.GetEventRecorderFor(controllerName), metrics: metrics, controllerName: controllerName}
return &EventRecorder{
recorder: manager.GetEventRecorderFor(controllerName),
metrics: metrics,
controllerName: controllerName,
client: manager.GetClient(),
scheme: manager.GetScheme(),
}
}

// RecordReady records a ready event for the given object.
// message is optional and defaults to "Object is ready".
//
// This method always enables deduplication to prevent spam of "ready" events.
// If message is empty, it defaults to "Object is ready".
//
// Parameters:
// - obj: The Kubernetes object to record the event for
// - message: Optional message (defaults to "Object is ready" if empty)
//
// Example:
//
// eventRecorder.RecordReady(pod, "Pod is ready to serve traffic")
func (e *EventRecorder) RecordReady(obj client.Object, message string) {
if message == "" {
message = "Object is ready"
}
e.recorder.Event(obj, eventTypeNormal, eventReadyReason, message)

if e.metrics != nil {
e.metrics.RecordEvent(obj.GetObjectKind().GroupVersionKind(), obj.GetName(), obj.GetNamespace(), eventTypeNormal, eventReadyReason, e.controllerName)
}
e.RecordEvent(obj, eventReadyReason, message, true)
}

// RecordWarning records a warning event for the given object.
func (e *EventRecorder) RecordWarning(obj client.Object, reason string, message string) {
//
// Warning events indicate errors, failures, or problematic conditions.
//
// Parameters:
// - obj: The Kubernetes object to record the event for
// - reason: The reason for the warning (e.g., "ValidationFailed", "ResourceNotFound")
// - message: Descriptive message explaining the warning
// - deduplicationEnabled: If true, identical events will not be recorded
//
// Example:
//
// eventRecorder.RecordWarning(pod, "ImagePullFailed", "Failed to pull image: nginx:latest", true)
func (e *EventRecorder) RecordWarning(obj client.Object, reason string, message string, deduplicationEnabled bool) {
if deduplicationEnabled {
if e.isDuplicateEventForObject(obj, eventTypeWarning, reason, message) {
return // Skip recording duplicate event
}
}

e.recorder.Event(obj, eventTypeWarning, reason, message)

if e.metrics != nil {
Expand All @@ -51,10 +117,50 @@ func (e *EventRecorder) RecordWarning(obj client.Object, reason string, message
}

// RecordEvent records a normal event for the given object.
func (e *EventRecorder) RecordEvent(obj client.Object, reason string, message string) {
//
// Normal events indicate successful operations, state changes, or informational updates.
//
// Parameters:
// - obj: The Kubernetes object to record the event for
// - reason: The reason for the event (e.g., "ProcessingComplete", "ConfigUpdated")
// - message: Descriptive message explaining the event
// - deduplicationEnabled: If true, identical events will not be recorded
//
// Example:
//
// eventRecorder.RecordEvent(deployment, "ScalingComplete", "Scaled to 3 replicas", true)
func (e *EventRecorder) RecordEvent(obj client.Object, reason string, message string, deduplicationEnabled bool) {
if deduplicationEnabled {
if e.isDuplicateEventForObject(obj, eventTypeNormal, reason, message) {
return // Skip recording duplicate event
}
}

e.recorder.Event(obj, eventTypeNormal, reason, message)

if e.metrics != nil {
e.metrics.RecordEvent(obj.GetObjectKind().GroupVersionKind(), obj.GetName(), obj.GetNamespace(), eventTypeNormal, reason, e.controllerName)
}
}

// isDuplicateEventForObject checks if an event with the same type, reason, and message already exists for the object.
func (e *EventRecorder) isDuplicateEventForObject(obj client.Object, eventType, reason, message string) bool {
// Get existing events using UID field selector
eventList := &corev1.EventList{}
fieldSelector := client.MatchingFields{
"involvedObject.uid": string(obj.GetUID()),
}
err := e.client.List(context.Background(), eventList, fieldSelector)
if err != nil {
// If we can't retrieve existing events, err on the side of caution and allow the event
return false
}

// Check if any existing event matches the type, reason, and message
for _, event := range eventList.Items {
if event.Type == eventType && event.Reason == reason && event.Message == message {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me think about this more deeply because I think we might want some additional complexity here or still require client to do some work. As an example if I am marking my object as ready in an event, I want to capture what metadata.generation the object is ready at. If the current generation is 5 but we marked it ready for generation 1, the event is eventually not useful.

More and more it feels like its up to the client to determine what the dedup logic is so if we are going to add to the sdk, I just want to think a bit more deeply. Curious on your thoughts as well

return true
}
}
return false
}
Loading