Merged
142 changes: 133 additions & 9 deletions pkg/cache/cluster.go
@@ -57,6 +57,8 @@ const (
// Limit is required to avoid memory spikes during cache initialization.
// The default limit of 50 is chosen based on experiments.
defaultListSemaphoreWeight = 50
// defaultEventProcessingInterval is the default interval for processing events
defaultEventProcessingInterval = 100 * time.Millisecond
)

const (
@@ -75,6 +77,11 @@ type apiMeta struct {
watchCancel context.CancelFunc
}

type eventMeta struct {
event watch.EventType
un *unstructured.Unstructured
}

// ClusterInfo holds cluster cache stats
type ClusterInfo struct {
// Server holds cluster API server URL
@@ -96,6 +103,9 @@ type ClusterInfo struct {
// OnEventHandler is a function that handles Kubernetes event
type OnEventHandler func(event watch.EventType, un *unstructured.Unstructured)

// OnProcessEventsHandler is a function that is invoked after a batch of events has been processed
type OnProcessEventsHandler func(duration time.Duration, processedEventsNumber int)

// OnPopulateResourceInfoHandler returns additional resource metadata that should be stored in cache
type OnPopulateResourceInfoHandler func(un *unstructured.Unstructured, isRoot bool) (info interface{}, cacheManifest bool)

@@ -137,6 +147,8 @@ type ClusterCache interface {
OnResourceUpdated(handler OnResourceUpdatedHandler) Unsubscribe
// OnEvent register event handler that is executed every time when new K8S event received
OnEvent(handler OnEventHandler) Unsubscribe
// OnProcessEventsHandler register event handler that is executed every time when events were processed
OnProcessEventsHandler(handler OnProcessEventsHandler) Unsubscribe
}

type WeightedSemaphore interface {
@@ -153,6 +165,7 @@ func NewClusterCache(config *rest.Config, opts ...UpdateSettingsFunc) *clusterCa
cache := &clusterCache{
settings: Settings{ResourceHealthOverride: &noopSettings{}, ResourcesFilter: &noopSettings{}},
apisMeta: make(map[schema.GroupKind]*apiMeta),
eventMetaCh: nil,
listPageSize: defaultListPageSize,
listPageBufferSize: defaultListPageBufferSize,
listSemaphore: semaphore.NewWeighted(defaultListSemaphoreWeight),
@@ -169,8 +182,10 @@
},
watchResyncTimeout: defaultWatchResyncTimeout,
clusterSyncRetryTimeout: ClusterRetryTimeout,
eventProcessingInterval: defaultEventProcessingInterval,
resourceUpdatedHandlers: map[uint64]OnResourceUpdatedHandler{},
eventHandlers: map[uint64]OnEventHandler{},
processEventsHandlers: map[uint64]OnProcessEventsHandler{},
log: log,
listRetryLimit: 1,
listRetryUseBackoff: false,
@@ -185,16 +200,20 @@
type clusterCache struct {
syncStatus clusterCacheSync

-apisMeta map[schema.GroupKind]*apiMeta
-serverVersion string
-apiResources []kube.APIResourceInfo
+apisMeta map[schema.GroupKind]*apiMeta
+batchEventsProcessing bool
+eventMetaCh chan eventMeta
Comment on lines +204 to +205
Member: These need docstrings. I'd also rename to batchEventsProcessingEnabled to make clear that it's a feature flag.
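
For illustration, the docstrings asked for above might read roughly like the sketch below (the batchEventsProcessingEnabled name is the reviewer's suggested rename, not what this diff uses):

    // batchEventsProcessingEnabled is a feature flag: when enabled, watch events
    // are buffered on eventMetaCh and applied to the cache in periodic batches
    // instead of one at a time.
    batchEventsProcessingEnabled bool
    // eventMetaCh carries events from the watch goroutines to the batch
    // processor; it is non-nil only while batch processing is active.
    eventMetaCh chan eventMeta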

+serverVersion string
+apiResources []kube.APIResourceInfo
// namespacedResources is a simple map which indicates a groupKind is namespaced
namespacedResources map[schema.GroupKind]bool

// maximum time we allow watches to run before relisting the group/kind and restarting the watch
watchResyncTimeout time.Duration
// sync retry timeout for cluster when sync error happens
clusterSyncRetryTimeout time.Duration
// ticker interval for events processing
eventProcessingInterval time.Duration

// size of a page for list operations pager.
listPageSize int64
@@ -224,6 +243,7 @@ type clusterCache struct {
populateResourceInfoHandler OnPopulateResourceInfoHandler
resourceUpdatedHandlers map[uint64]OnResourceUpdatedHandler
eventHandlers map[uint64]OnEventHandler
processEventsHandlers map[uint64]OnProcessEventsHandler
openAPISchema openapi.Resources
gvkParser *managedfields.GvkParser

@@ -299,6 +319,29 @@ func (c *clusterCache) getEventHandlers() []OnEventHandler {
return handlers
}

// OnProcessEventsHandler register event handler that is executed every time when events were processed
func (c *clusterCache) OnProcessEventsHandler(handler OnProcessEventsHandler) Unsubscribe {
c.handlersLock.Lock()
defer c.handlersLock.Unlock()
key := c.handlerKey
c.handlerKey++
c.processEventsHandlers[key] = handler
return func() {
c.handlersLock.Lock()
defer c.handlersLock.Unlock()
delete(c.processEventsHandlers, key)
}
}
func (c *clusterCache) getProcessEventsHandlers() []OnProcessEventsHandler {
c.handlersLock.Lock()
defer c.handlersLock.Unlock()
handlers := make([]OnProcessEventsHandler, 0, len(c.processEventsHandlers))
for _, h := range c.processEventsHandlers {
handlers = append(handlers, h)
}
return handlers
}
Comment on lines +335 to +343
Member: Why not have the caller just use the map directly?
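
A likely reason for copying the handlers out under handlersLock rather than handing callers the map: a concurrent Unsubscribe deletes from the map and would race with iteration, and holding the lock while handlers run could deadlock if a handler re-enters the cache. A minimal standalone sketch of the same copy-under-lock pattern (the names here are illustrative, not from this PR):

    package main

    import (
        "fmt"
        "sync"
    )

    type handler func(msg string)

    type registry struct {
        mu       sync.Mutex
        handlers map[uint64]handler
        nextKey  uint64
    }

    // add registers a handler and returns an unsubscribe function.
    func (r *registry) add(h handler) func() {
        r.mu.Lock()
        defer r.mu.Unlock()
        key := r.nextKey
        r.nextKey++
        r.handlers[key] = h
        return func() {
            r.mu.Lock()
            defer r.mu.Unlock()
            delete(r.handlers, key)
        }
    }

    // snapshot copies the handlers under the lock so callers can invoke them
    // without holding it; ranging over r.handlers directly while another
    // goroutine adds or unsubscribes would be a data race on the map.
    func (r *registry) snapshot() []handler {
        r.mu.Lock()
        defer r.mu.Unlock()
        out := make([]handler, 0, len(r.handlers))
        for _, h := range r.handlers {
            out = append(out, h)
        }
        return out
    }

    func main() {
        r := &registry{handlers: map[uint64]handler{}}
        unsub := r.add(func(msg string) { fmt.Println("got:", msg) })
        for _, h := range r.snapshot() {
            h("hello")
        }
        unsub()
        fmt.Println("handlers left:", len(r.snapshot()))
    }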


// GetServerVersion returns observed cluster version
func (c *clusterCache) GetServerVersion() string {
return c.serverVersion
@@ -440,6 +483,10 @@ func (c *clusterCache) Invalidate(opts ...UpdateSettingsFunc) {
for i := range opts {
opts[i](c)
}

if c.batchEventsProcessing {
c.invalidateEventMeta()
}
c.apisMeta = nil
c.namespacedResources = nil
c.log.Info("Invalidated cluster")
@@ -669,7 +716,7 @@ func (c *clusterCache) watchEvents(ctx context.Context, api kube.APIResourceInfo
return fmt.Errorf("Failed to convert to *unstructured.Unstructured: %v", event.Object)
}

-c.processEvent(event.Type, obj)
+c.recordEvent(event.Type, obj)
if kube.IsCRD(obj) {
var resources []kube.APIResourceInfo
crd := v1.CustomResourceDefinition{}
@@ -823,6 +870,12 @@ func (c *clusterCache) sync() error {
for i := range c.apisMeta {
c.apisMeta[i].watchCancel()
}

if c.batchEventsProcessing {
c.invalidateEventMeta()
c.eventMetaCh = make(chan eventMeta)
}

c.apisMeta = make(map[schema.GroupKind]*apiMeta)
c.resources = make(map[kube.ResourceKey]*Resource)
c.namespacedResources = make(map[schema.GroupKind]bool)
@@ -864,6 +917,10 @@ func (c *clusterCache) sync() error {
return err
}

if c.batchEventsProcessing {
go c.processEvents()
}

// Each API is processed in parallel, so we need to take out a lock when we update clusterCache fields.
lock := sync.Mutex{}
err = kube.RunAllAsync(len(apis), func(i int) error {
@@ -926,6 +983,14 @@ func (c *clusterCache) sync() error {
return nil
}

// invalidateEventMeta closes the eventMeta channel if it is open
nit: It looks like some functions, like this one, have an assumption that c.lock is held when they are called. In my experience that can lead to bugs with future maintenance when someone doesn't realize a lock is needed to call the function. I personally name functions in a way that this is indicated, but that is certainly a personal style choice. At the very least, it might be worth indicating the lock requirement in the go doc.

Member: @mpelekh can you update the go doc?

func (c *clusterCache) invalidateEventMeta() {
if c.eventMetaCh != nil {
close(c.eventMetaCh)
c.eventMetaCh = nil
}
}
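
As suggested in the review thread above, the locking assumption could be made explicit either in the name (a common Go idiom is a Locked suffix, e.g. invalidateEventMetaLocked) or in the go doc. A hedged sketch of such a doc comment, not necessarily the wording that was merged:

    // invalidateEventMeta closes the eventMeta channel if it is open.
    // The caller must hold c.lock; this method does not acquire it.
    func (c *clusterCache) invalidateEventMeta() {
        if c.eventMetaCh != nil {
            close(c.eventMetaCh)
            c.eventMetaCh = nil
        }
    }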

// EnsureSynced checks cache state and synchronizes it if necessary
func (c *clusterCache) EnsureSynced() error {
syncStatus := &c.syncStatus
@@ -1231,7 +1296,7 @@ func (c *clusterCache) GetManagedLiveObjs(targetObjs []*unstructured.Unstructure
return managedObjs, nil
}

-func (c *clusterCache) processEvent(event watch.EventType, un *unstructured.Unstructured) {
+func (c *clusterCache) recordEvent(event watch.EventType, un *unstructured.Unstructured) {
for _, h := range c.getEventHandlers() {
h(event, un)
}
@@ -1240,15 +1305,74 @@ func (c *clusterCache) processEvent(event watch.EventType, un *unstructured.Unst
return
}

if c.batchEventsProcessing {
c.eventMetaCh <- eventMeta{event, un}
} else {
c.lock.Lock()
defer c.lock.Unlock()
c.processEvent(key, eventMeta{event, un})
}
}

func (c *clusterCache) processEvents() {
log := c.log.WithValues("functionName", "processEvents")
log.V(1).Info("Start processing events")

c.lock.Lock()
ch := c.eventMetaCh
c.lock.Unlock()

eventMetas := make([]eventMeta, 0)
ticker := time.NewTicker(c.eventProcessingInterval)
defer ticker.Stop()

for {
select {
case evMeta, ok := <-ch:
if !ok {
log.V(2).Info("Event processing channel closed, finish processing")
return
}
eventMetas = append(eventMetas, evMeta)
case <-ticker.C:
if len(eventMetas) > 0 {
c.processEventsBatch(eventMetas)
eventMetas = eventMetas[:0]
Contributor: Why assign eventMetas[:0] instead of nil or empty slice?

Contributor Author: Thank you for review @andrii-korotkov-verkada

Setting eventMetas to eventMetas[:0] retains the original slice’s underlying array, which can help reduce memory allocations if the slice is reused frequently. Assigning nil would release the underlying array, and creating a new empty slice would lead to additional allocations. This approach allows us to efficiently clear the slice while keeping the capacity intact for future use.

}
}
}
}
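
The eventMetas = eventMetas[:0] reuse discussed in the thread above keeps the slice's backing array, so once the slice has grown to a typical batch size the ticker loop stops allocating on every flush. A small self-contained illustration of that behaviour (not code from this PR):

    package main

    import "fmt"

    func main() {
        batch := make([]int, 0, 4)
        for round := 0; round < 3; round++ {
            for i := 0; i < 4; i++ {
                batch = append(batch, i)
            }
            // cap stays at 4 on every round because the backing array is reused.
            fmt.Printf("round %d: len=%d cap=%d\n", round, len(batch), cap(batch))
            // Truncate to zero length but keep the capacity; assigning nil or a
            // fresh make() would drop the array and force a reallocation on the
            // next round's appends.
            batch = batch[:0]
        }
    }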

func (c *clusterCache) processEventsBatch(eventMetas []eventMeta) {
log := c.log.WithValues("functionName", "processEventsBatch")
start := time.Now()
c.lock.Lock()
log.V(1).Info("Lock acquired (ms)", "duration", time.Since(start).Milliseconds())
defer func() {
c.lock.Unlock()
duration := time.Since(start)
// Update the metric with the duration of the events processing
for _, handler := range c.getProcessEventsHandlers() {
handler(duration, len(eventMetas))
}
}()

for _, evMeta := range eventMetas {
key := kube.GetResourceKey(evMeta.un)
c.processEvent(key, evMeta)
}

log.V(1).Info("Processed events (ms)", "count", len(eventMetas), "duration", time.Since(start).Milliseconds())
}

func (c *clusterCache) processEvent(key kube.ResourceKey, evMeta eventMeta) {
existingNode, exists := c.resources[key]
-if event == watch.Deleted {
+if evMeta.event == watch.Deleted {
if exists {
c.onNodeRemoved(key)
}
-} else if event != watch.Deleted {
-c.onNodeUpdated(existingNode, c.newResource(un))
+} else {
+c.onNodeUpdated(existingNode, c.newResource(evMeta.un))
}
}
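
Putting the pieces together: with batch processing enabled, recordEvent pushes each watch event onto eventMetaCh, processEvents drains the channel and flushes the accumulated events on every tick of eventProcessingInterval, and each registered OnProcessEventsHandler is told how long the flush took and how many events it covered. Below is a hedged sketch of how a consumer might hook into this. NewClusterCache and OnProcessEventsHandler appear in this diff; the import path and the SetBatchEventsProcessing / SetEventProcessingInterval option names are assumptions about how the feature flag is exposed, not confirmed by the diff.

    package main

    import (
        "fmt"
        "time"

        "github.com/argoproj/gitops-engine/pkg/cache" // assumed module path
        "k8s.io/client-go/rest"
    )

    func newObservedCache(config *rest.Config) cache.ClusterCache {
        clusterCache := cache.NewClusterCache(config,
            cache.SetBatchEventsProcessing(true),                   // assumed option name
            cache.SetEventProcessingInterval(100*time.Millisecond), // assumed option name
        )

        // Invoked after every processed batch with the time the batch took and
        // the number of events that were applied; useful for emitting metrics.
        unsubscribe := clusterCache.OnProcessEventsHandler(func(duration time.Duration, processedEventsNumber int) {
            fmt.Printf("processed %d events in %s\n", processedEventsNumber, duration)
        })
        _ = unsubscribe // call it to stop receiving callbacks

        return clusterCache
    }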
