fix: avoid resources lock contention utilizing channel #629
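In short, the change routes watch events through a channel to a single consumer that applies them in batches on a ticker, so the cluster-wide lock is taken once per batch instead of once per event. Below is a minimal, self-contained sketch of that pattern for orientation only; the type and method names are illustrative and not the PR's API.

```go
// Illustrative sketch of channel-based batch processing; not gitops-engine code.
package main

import (
	"fmt"
	"sync"
	"time"
)

type event struct{ name string }

type cache struct {
	mu      sync.Mutex
	state   map[string]event
	eventCh chan event
}

// record is called from many watch goroutines; it only sends on the channel
// and never takes the lock, which is what removes per-event lock contention.
func (c *cache) record(e event) { c.eventCh <- e }

// drain is the single consumer: it buffers incoming events and applies them
// in batches on every tick, taking the lock once per batch.
func (c *cache) drain(interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	batch := make([]event, 0)
	for {
		select {
		case e, ok := <-c.eventCh:
			if !ok {
				return // channel closed: stop processing
			}
			batch = append(batch, e)
		case <-ticker.C:
			if len(batch) == 0 {
				continue
			}
			c.mu.Lock()
			for _, e := range batch {
				c.state[e.name] = e
			}
			c.mu.Unlock()
			batch = batch[:0] // keep the backing array for the next batch
		}
	}
}

func main() {
	c := &cache{state: map[string]event{}, eventCh: make(chan event)}
	go c.drain(50 * time.Millisecond)
	c.record(event{name: "pod-a"})
	c.record(event{name: "pod-b"})
	time.Sleep(150 * time.Millisecond)

	c.mu.Lock()
	fmt.Println("cached objects:", len(c.state)) // 2
	c.mu.Unlock()
}
```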
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -57,6 +57,8 @@ const ( | |
| // Limit is required to avoid memory spikes during cache initialization. | ||
| // The default limit of 50 is chosen based on experiments. | ||
| defaultListSemaphoreWeight = 50 | ||
| // defaultEventProcessingInterval is the default interval for processing events | ||
| defaultEventProcessingInterval = 100 * time.Millisecond | ||
| ) | ||
|
|
||
| const ( | ||
|
|
@@ -75,6 +77,11 @@ type apiMeta struct { | |
| watchCancel context.CancelFunc | ||
| } | ||
|
|
||
| type eventMeta struct { | ||
| event watch.EventType | ||
| un *unstructured.Unstructured | ||
| } | ||
|
|
||
| // ClusterInfo holds cluster cache stats | ||
| type ClusterInfo struct { | ||
| // Server holds cluster API server URL | ||
|
|
@@ -96,6 +103,9 @@ type ClusterInfo struct { | |
| // OnEventHandler is a function that handles Kubernetes event | ||
| type OnEventHandler func(event watch.EventType, un *unstructured.Unstructured) | ||
|
|
||
| // OnProcessEventsHandler handles process events event | ||
| type OnProcessEventsHandler func(duration time.Duration, processedEventsNumber int) | ||
|
|
||
| // OnPopulateResourceInfoHandler returns additional resource metadata that should be stored in cache | ||
| type OnPopulateResourceInfoHandler func(un *unstructured.Unstructured, isRoot bool) (info interface{}, cacheManifest bool) | ||
|
|
||
|
|
@@ -137,6 +147,8 @@ type ClusterCache interface { | |
| OnResourceUpdated(handler OnResourceUpdatedHandler) Unsubscribe | ||
| // OnEvent register event handler that is executed every time when new K8S event received | ||
| OnEvent(handler OnEventHandler) Unsubscribe | ||
| // OnProcessEventsHandler register event handler that is executed every time when events were processed | ||
| OnProcessEventsHandler(handler OnProcessEventsHandler) Unsubscribe | ||
| } | ||
|
|
||
| type WeightedSemaphore interface { | ||
|
|
@@ -153,6 +165,7 @@ func NewClusterCache(config *rest.Config, opts ...UpdateSettingsFunc) *clusterCa | |
| cache := &clusterCache{ | ||
| settings: Settings{ResourceHealthOverride: &noopSettings{}, ResourcesFilter: &noopSettings{}}, | ||
| apisMeta: make(map[schema.GroupKind]*apiMeta), | ||
| eventMetaCh: nil, | ||
| listPageSize: defaultListPageSize, | ||
| listPageBufferSize: defaultListPageBufferSize, | ||
| listSemaphore: semaphore.NewWeighted(defaultListSemaphoreWeight), | ||
|
|
@@ -169,8 +182,10 @@ func NewClusterCache(config *rest.Config, opts ...UpdateSettingsFunc) *clusterCa | |
| }, | ||
| watchResyncTimeout: defaultWatchResyncTimeout, | ||
| clusterSyncRetryTimeout: ClusterRetryTimeout, | ||
| eventProcessingInterval: defaultEventProcessingInterval, | ||
| resourceUpdatedHandlers: map[uint64]OnResourceUpdatedHandler{}, | ||
| eventHandlers: map[uint64]OnEventHandler{}, | ||
| processEventsHandlers: map[uint64]OnProcessEventsHandler{}, | ||
| log: log, | ||
| listRetryLimit: 1, | ||
| listRetryUseBackoff: false, | ||
|
|
@@ -185,16 +200,20 @@ func NewClusterCache(config *rest.Config, opts ...UpdateSettingsFunc) *clusterCa | |
| type clusterCache struct { | ||
| syncStatus clusterCacheSync | ||
|
|
||
| apisMeta map[schema.GroupKind]*apiMeta | ||
| serverVersion string | ||
| apiResources []kube.APIResourceInfo | ||
| apisMeta map[schema.GroupKind]*apiMeta | ||
| batchEventsProcessing bool | ||
| eventMetaCh chan eventMeta | ||
| serverVersion string | ||
| apiResources []kube.APIResourceInfo | ||
| // namespacedResources is a simple map which indicates a groupKind is namespaced | ||
| namespacedResources map[schema.GroupKind]bool | ||
|
|
||
| // maximum time we allow watches to run before relisting the group/kind and restarting the watch | ||
| watchResyncTimeout time.Duration | ||
| // sync retry timeout for cluster when sync error happens | ||
| clusterSyncRetryTimeout time.Duration | ||
| // ticker interval for events processing | ||
| eventProcessingInterval time.Duration | ||
|
|
||
| // size of a page for list operations pager. | ||
| listPageSize int64 | ||
|
|
@@ -224,6 +243,7 @@ type clusterCache struct { | |
| populateResourceInfoHandler OnPopulateResourceInfoHandler | ||
| resourceUpdatedHandlers map[uint64]OnResourceUpdatedHandler | ||
| eventHandlers map[uint64]OnEventHandler | ||
| processEventsHandlers map[uint64]OnProcessEventsHandler | ||
| openAPISchema openapi.Resources | ||
| gvkParser *managedfields.GvkParser | ||
|
|
||
|
|
@@ -299,6 +319,29 @@ func (c *clusterCache) getEventHandlers() []OnEventHandler { | |
| return handlers | ||
| } | ||
|
|
||
| // OnProcessEventsHandler register event handler that is executed every time when events were processed | ||
| func (c *clusterCache) OnProcessEventsHandler(handler OnProcessEventsHandler) Unsubscribe { | ||
| c.handlersLock.Lock() | ||
| defer c.handlersLock.Unlock() | ||
| key := c.handlerKey | ||
| c.handlerKey++ | ||
| c.processEventsHandlers[key] = handler | ||
| return func() { | ||
| c.handlersLock.Lock() | ||
| defer c.handlersLock.Unlock() | ||
| delete(c.processEventsHandlers, key) | ||
| } | ||
| } | ||
| func (c *clusterCache) getProcessEventsHandlers() []OnProcessEventsHandler { | ||
| c.handlersLock.Lock() | ||
| defer c.handlersLock.Unlock() | ||
| handlers := make([]OnProcessEventsHandler, 0, len(c.processEventsHandlers)) | ||
| for _, h := range c.processEventsHandlers { | ||
| handlers = append(handlers, h) | ||
| } | ||
| return handlers | ||
| } | ||
|
Comment on lines +335 to +343
Member: Why not have the caller just use the map directly?
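For context, returning a copied slice mirrors the existing getEventHandlers above: the snapshot is taken under handlersLock, and the handlers are invoked only after the lock is released, so a slow handler or a concurrent (un)subscribe never blocks on the map. A small illustrative sketch of that pattern, using made-up names rather than the cache's API:

```go
// Illustrative sketch of the snapshot-under-lock pattern; not gitops-engine code.
package main

import (
	"fmt"
	"sync"
)

type handler func(msg string)

type registry struct {
	mu       sync.Mutex
	handlers map[uint64]handler
}

// snapshot copies the handlers while holding the lock. Callers then invoke
// the copies with the lock released; ranging over the map directly would
// force the lock to be held for the whole duration of every handler call.
func (r *registry) snapshot() []handler {
	r.mu.Lock()
	defer r.mu.Unlock()
	out := make([]handler, 0, len(r.handlers))
	for _, h := range r.handlers {
		out = append(out, h)
	}
	return out
}

func main() {
	r := &registry{handlers: map[uint64]handler{
		1: func(msg string) { fmt.Println("handler 1:", msg) },
	}}
	for _, h := range r.snapshot() { // lock already released here
		h("events processed")
	}
}
```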
||
|
|
||
| // GetServerVersion returns observed cluster version | ||
| func (c *clusterCache) GetServerVersion() string { | ||
| return c.serverVersion | ||
|
|
@@ -440,6 +483,10 @@ func (c *clusterCache) Invalidate(opts ...UpdateSettingsFunc) { | |
| for i := range opts { | ||
| opts[i](c) | ||
| } | ||
|
|
||
| if c.batchEventsProcessing { | ||
| c.invalidateEventMeta() | ||
| } | ||
| c.apisMeta = nil | ||
| c.namespacedResources = nil | ||
| c.log.Info("Invalidated cluster") | ||
|
|
@@ -669,7 +716,7 @@ func (c *clusterCache) watchEvents(ctx context.Context, api kube.APIResourceInfo | |
| return fmt.Errorf("Failed to convert to *unstructured.Unstructured: %v", event.Object) | ||
| } | ||
|
|
||
| c.processEvent(event.Type, obj) | ||
| c.recordEvent(event.Type, obj) | ||
| if kube.IsCRD(obj) { | ||
| var resources []kube.APIResourceInfo | ||
| crd := v1.CustomResourceDefinition{} | ||
|
|
@@ -823,6 +870,12 @@ func (c *clusterCache) sync() error { | |
| for i := range c.apisMeta { | ||
| c.apisMeta[i].watchCancel() | ||
| } | ||
|
|
||
| if c.batchEventsProcessing { | ||
| c.invalidateEventMeta() | ||
| c.eventMetaCh = make(chan eventMeta) | ||
| } | ||
|
|
||
| c.apisMeta = make(map[schema.GroupKind]*apiMeta) | ||
| c.resources = make(map[kube.ResourceKey]*Resource) | ||
| c.namespacedResources = make(map[schema.GroupKind]bool) | ||
|
|
@@ -864,6 +917,10 @@ func (c *clusterCache) sync() error { | |
| return err | ||
| } | ||
|
|
||
| if c.batchEventsProcessing { | ||
| go c.processEvents() | ||
| } | ||
|
|
||
| // Each API is processed in parallel, so we need to take out a lock when we update clusterCache fields. | ||
| lock := sync.Mutex{} | ||
| err = kube.RunAllAsync(len(apis), func(i int) error { | ||
|
|
@@ -926,6 +983,14 @@ func (c *clusterCache) sync() error { | |
| return nil | ||
| } | ||
|
|
||
| // invalidateEventMeta closes the eventMeta channel if it is open | ||
|
nit: It looks like some functions, like this one, have an assumption that the cluster cache lock is already held by the caller.
Member: @mpelekh can you update the go doc?
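One possible go-doc update along the lines the comment asks for; the second sentence is a suggested wording, not text from the PR:

```go
// invalidateEventMeta closes the eventMeta channel if it is open.
// It must be called with c.lock already held by the caller.
func (c *clusterCache) invalidateEventMeta() {
	if c.eventMetaCh != nil {
		close(c.eventMetaCh)
		c.eventMetaCh = nil
	}
}
```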
||
| func (c *clusterCache) invalidateEventMeta() { | ||
| if c.eventMetaCh != nil { | ||
| close(c.eventMetaCh) | ||
| c.eventMetaCh = nil | ||
| } | ||
| } | ||
|
|
||
| // EnsureSynced checks cache state and synchronizes it if necessary | ||
| func (c *clusterCache) EnsureSynced() error { | ||
| syncStatus := &c.syncStatus | ||
|
|
@@ -1231,7 +1296,7 @@ func (c *clusterCache) GetManagedLiveObjs(targetObjs []*unstructured.Unstructure | |
| return managedObjs, nil | ||
| } | ||
|
|
||
| func (c *clusterCache) processEvent(event watch.EventType, un *unstructured.Unstructured) { | ||
| func (c *clusterCache) recordEvent(event watch.EventType, un *unstructured.Unstructured) { | ||
| for _, h := range c.getEventHandlers() { | ||
| h(event, un) | ||
| } | ||
|
|
@@ -1240,15 +1305,74 @@ func (c *clusterCache) processEvent(event watch.EventType, un *unstructured.Unst | |
| return | ||
| } | ||
|
|
||
| if c.batchEventsProcessing { | ||
| c.eventMetaCh <- eventMeta{event, un} | ||
| } else { | ||
| c.lock.Lock() | ||
| defer c.lock.Unlock() | ||
| c.processEvent(key, eventMeta{event, un}) | ||
| } | ||
| } | ||
|
|
||
| func (c *clusterCache) processEvents() { | ||
| log := c.log.WithValues("functionName", "processItems") | ||
| log.V(1).Info("Start processing events") | ||
|
|
||
| c.lock.Lock() | ||
| ch := c.eventMetaCh | ||
| c.lock.Unlock() | ||
|
|
||
| eventMetas := make([]eventMeta, 0) | ||
| ticker := time.NewTicker(c.eventProcessingInterval) | ||
| defer ticker.Stop() | ||
|
|
||
| for { | ||
| select { | ||
| case evMeta, ok := <-ch: | ||
| if !ok { | ||
| log.V(2).Info("Event processing channel closed, finish processing") | ||
| return | ||
| } | ||
| eventMetas = append(eventMetas, evMeta) | ||
| case <-ticker.C: | ||
| if len(eventMetas) > 0 { | ||
| c.processEventsBatch(eventMetas) | ||
| eventMetas = eventMetas[:0] | ||
|
Contributor: Why assign eventMetas = eventMetas[:0] here?
Contributor (Author): Thank you for the review, @andrii-korotkov-verkada. Setting eventMetas = eventMetas[:0] resets the slice length to zero while keeping the underlying array, so the next batch reuses the already-allocated capacity instead of allocating a new slice on every tick.
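A standalone illustration of the reuse the reply describes; the slice below is hypothetical and only shows how re-slicing to zero length preserves capacity:

```go
// Illustrative example of slice reuse via re-slicing; not gitops-engine code.
package main

import "fmt"

func main() {
	batch := make([]int, 0, 4) // one allocation up front, capacity 4

	for round := 1; round <= 2; round++ {
		batch = append(batch, round, round*10)
		fmt.Println("len:", len(batch), "cap:", cap(batch)) // len: 2 cap: 4 on both rounds

		// Re-slicing to zero length drops the elements but keeps the backing
		// array, so the next round's appends reuse it instead of reallocating.
		batch = batch[:0]
	}
}
```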
||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| func (c *clusterCache) processEventsBatch(eventMetas []eventMeta) { | ||
| log := c.log.WithValues("functionName", "processEventsBatch") | ||
| start := time.Now() | ||
| c.lock.Lock() | ||
| log.V(1).Info("Lock acquired (ms)", "duration", time.Since(start).Milliseconds()) | ||
| defer func() { | ||
| c.lock.Unlock() | ||
| duration := time.Since(start) | ||
| // Update the metric with the duration of the events processing | ||
| for _, handler := range c.getProcessEventsHandlers() { | ||
| handler(duration, len(eventMetas)) | ||
| } | ||
| }() | ||
|
|
||
| for _, evMeta := range eventMetas { | ||
| key := kube.GetResourceKey(evMeta.un) | ||
| c.processEvent(key, evMeta) | ||
| } | ||
|
|
||
| log.V(1).Info("Processed events (ms)", "count", len(eventMetas), "duration", time.Since(start).Milliseconds()) | ||
| } | ||
|
|
||
| func (c *clusterCache) processEvent(key kube.ResourceKey, evMeta eventMeta) { | ||
| existingNode, exists := c.resources[key] | ||
| if event == watch.Deleted { | ||
| if evMeta.event == watch.Deleted { | ||
| if exists { | ||
| c.onNodeRemoved(key) | ||
| } | ||
| } else if event != watch.Deleted { | ||
| c.onNodeUpdated(existingNode, c.newResource(un)) | ||
| } else { | ||
| c.onNodeUpdated(existingNode, c.newResource(evMeta.un)) | ||
| } | ||
| } | ||
|
|
||
|
|
||
These need docstrings. I'd also rename to batchEventsProcessingEnabled to make clear that it's a feature flag.
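A sketch of what the suggested docstrings and rename could look like; the comment wording and the Enabled suffix follow the reviewer's suggestion and are illustrative, not taken from the PR:

```go
// Illustrative field docstrings for the clusterCache struct; suggested wording only.

// batchEventsProcessingEnabled is a feature flag: when true, watch events are
// sent to eventMetaCh and applied in batches by a single goroutine instead of
// taking the cluster lock once per event.
batchEventsProcessingEnabled bool

// eventMetaCh carries events from the watch goroutines to the batch
// processor; it is nil until batch processing is started during sync.
eventMetaCh chan eventMeta
```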