@@ -57,6 +57,8 @@ const (
5757 // Limit is required to avoid memory spikes during cache initialization.
5858 // The default limit of 50 is chosen based on experiments.
5959 defaultListSemaphoreWeight = 50
60+ // defaultEventProcessingInterval is the default interval for processing events
61+ defaultEventProcessingInterval = 1 * time .Second
6062)
6163
6264const (
@@ -75,6 +77,11 @@ type apiMeta struct {
7577 watchCancel context.CancelFunc
7678}
7779
80+ type eventMeta struct {
81+ event watch.EventType
82+ un * unstructured.Unstructured
83+ }
84+
7885// ClusterInfo holds cluster cache stats
7986type ClusterInfo struct {
8087 // Server holds cluster API server URL
@@ -96,6 +103,9 @@ type ClusterInfo struct {
96103// OnEventHandler is a function that handles Kubernetes event
97104type OnEventHandler func (event watch.EventType , un * unstructured.Unstructured )
98105
106+ // OnProcessEventsHandler handles process events event
107+ type OnProcessEventsHandler func (duration time.Duration , processedEventsNumber int )
108+
99109// OnPopulateResourceInfoHandler returns additional resource metadata that should be stored in cache
100110type OnPopulateResourceInfoHandler func (un * unstructured.Unstructured , isRoot bool ) (info interface {}, cacheManifest bool )
101111
@@ -137,6 +147,8 @@ type ClusterCache interface {
137147 OnResourceUpdated (handler OnResourceUpdatedHandler ) Unsubscribe
138148 // OnEvent register event handler that is executed every time when new K8S event received
139149 OnEvent (handler OnEventHandler ) Unsubscribe
150+ // OnProcessEventsHandler register event handler that is executed every time when events were processed
151+ OnProcessEventsHandler (handler OnProcessEventsHandler ) Unsubscribe
140152}
141153
142154type WeightedSemaphore interface {
@@ -153,6 +165,7 @@ func NewClusterCache(config *rest.Config, opts ...UpdateSettingsFunc) *clusterCa
153165 cache := & clusterCache {
154166 settings : Settings {ResourceHealthOverride : & noopSettings {}, ResourcesFilter : & noopSettings {}},
155167 apisMeta : make (map [schema.GroupKind ]* apiMeta ),
168+ eventMetaCh : nil ,
156169 listPageSize : defaultListPageSize ,
157170 listPageBufferSize : defaultListPageBufferSize ,
158171 listSemaphore : semaphore .NewWeighted (defaultListSemaphoreWeight ),
@@ -169,8 +182,10 @@ func NewClusterCache(config *rest.Config, opts ...UpdateSettingsFunc) *clusterCa
169182 },
170183 watchResyncTimeout : defaultWatchResyncTimeout ,
171184 clusterSyncRetryTimeout : ClusterRetryTimeout ,
185+ eventProcessingInterval : defaultEventProcessingInterval ,
172186 resourceUpdatedHandlers : map [uint64 ]OnResourceUpdatedHandler {},
173187 eventHandlers : map [uint64 ]OnEventHandler {},
188+ processEventsHandlers : map [uint64 ]OnProcessEventsHandler {},
174189 log : log ,
175190 listRetryLimit : 1 ,
176191 listRetryUseBackoff : false ,
@@ -186,6 +201,7 @@ type clusterCache struct {
186201 syncStatus clusterCacheSync
187202
188203 apisMeta map [schema.GroupKind ]* apiMeta
204+ eventMetaCh chan eventMeta
189205 serverVersion string
190206 apiResources []kube.APIResourceInfo
191207 // namespacedResources is a simple map which indicates a groupKind is namespaced
@@ -195,6 +211,8 @@ type clusterCache struct {
195211 watchResyncTimeout time.Duration
196212 // sync retry timeout for cluster when sync error happens
197213 clusterSyncRetryTimeout time.Duration
214+ // ticker interval for events processing
215+ eventProcessingInterval time.Duration
198216
199217 // size of a page for list operations pager.
200218 listPageSize int64
@@ -224,6 +242,7 @@ type clusterCache struct {
224242 populateResourceInfoHandler OnPopulateResourceInfoHandler
225243 resourceUpdatedHandlers map [uint64 ]OnResourceUpdatedHandler
226244 eventHandlers map [uint64 ]OnEventHandler
245+ processEventsHandlers map [uint64 ]OnProcessEventsHandler
227246 openAPISchema openapi.Resources
228247 gvkParser * managedfields.GvkParser
229248
@@ -299,6 +318,29 @@ func (c *clusterCache) getEventHandlers() []OnEventHandler {
299318 return handlers
300319}
301320
321+ // OnProcessEventsHandler register event handler that is executed every time when events were processed
322+ func (c * clusterCache ) OnProcessEventsHandler (handler OnProcessEventsHandler ) Unsubscribe {
323+ c .handlersLock .Lock ()
324+ defer c .handlersLock .Unlock ()
325+ key := c .handlerKey
326+ c .handlerKey ++
327+ c .processEventsHandlers [key ] = handler
328+ return func () {
329+ c .handlersLock .Lock ()
330+ defer c .handlersLock .Unlock ()
331+ delete (c .processEventsHandlers , key )
332+ }
333+ }
334+ func (c * clusterCache ) getProcessEventsHandlers () []OnProcessEventsHandler {
335+ c .handlersLock .Lock ()
336+ defer c .handlersLock .Unlock ()
337+ handlers := make ([]OnProcessEventsHandler , 0 , len (c .processEventsHandlers ))
338+ for _ , h := range c .processEventsHandlers {
339+ handlers = append (handlers , h )
340+ }
341+ return handlers
342+ }
343+
302344// GetServerVersion returns observed cluster version
303345func (c * clusterCache ) GetServerVersion () string {
304346 return c .serverVersion
@@ -440,6 +482,8 @@ func (c *clusterCache) Invalidate(opts ...UpdateSettingsFunc) {
440482 for i := range opts {
441483 opts [i ](c )
442484 }
485+
486+ c .invalidateEventMeta ()
443487 c .apisMeta = nil
444488 c .namespacedResources = nil
445489 c .log .Info ("Invalidated cluster" )
@@ -669,7 +713,7 @@ func (c *clusterCache) watchEvents(ctx context.Context, api kube.APIResourceInfo
669713 return fmt .Errorf ("Failed to convert to *unstructured.Unstructured: %v" , event .Object )
670714 }
671715
672- c .processEvent (event .Type , obj )
716+ c .recordEvent (event .Type , obj )
673717 if kube .IsCRD (obj ) {
674718 var resources []kube.APIResourceInfo
675719 crd := v1.CustomResourceDefinition {}
@@ -823,11 +867,13 @@ func (c *clusterCache) sync() error {
823867 for i := range c .apisMeta {
824868 c .apisMeta [i ].watchCancel ()
825869 }
870+ c .invalidateEventMeta ()
826871 c .apisMeta = make (map [schema.GroupKind ]* apiMeta )
827872 c .resources = make (map [kube.ResourceKey ]* Resource )
828873 c .namespacedResources = make (map [schema.GroupKind ]bool )
829874 config := c .config
830875 version , err := c .kubectl .GetServerVersion (config )
876+ c .eventMetaCh = make (chan eventMeta )
831877
832878 if err != nil {
833879 return err
@@ -864,6 +910,8 @@ func (c *clusterCache) sync() error {
864910 return err
865911 }
866912
913+ go c .processEvents ()
914+
867915 // Each API is processed in parallel, so we need to take out a lock when we update clusterCache fields.
868916 lock := sync.Mutex {}
869917 err = kube .RunAllAsync (len (apis ), func (i int ) error {
@@ -926,6 +974,14 @@ func (c *clusterCache) sync() error {
926974 return nil
927975}
928976
977+ // invalidateEventMeta closes the eventMeta channel if it is open
978+ func (c * clusterCache ) invalidateEventMeta () {
979+ if c .eventMetaCh != nil {
980+ close (c .eventMetaCh )
981+ c .eventMetaCh = nil
982+ }
983+ }
984+
929985// EnsureSynced checks cache state and synchronizes it if necessary
930986func (c * clusterCache ) EnsureSynced () error {
931987 syncStatus := & c .syncStatus
@@ -1231,7 +1287,7 @@ func (c *clusterCache) GetManagedLiveObjs(targetObjs []*unstructured.Unstructure
12311287 return managedObjs , nil
12321288}
12331289
1234- func (c * clusterCache ) processEvent (event watch.EventType , un * unstructured.Unstructured ) {
1290+ func (c * clusterCache ) recordEvent (event watch.EventType , un * unstructured.Unstructured ) {
12351291 for _ , h := range c .getEventHandlers () {
12361292 h (event , un )
12371293 }
@@ -1240,16 +1296,65 @@ func (c *clusterCache) processEvent(event watch.EventType, un *unstructured.Unst
12401296 return
12411297 }
12421298
1299+ c .eventMetaCh <- eventMeta {event , un }
1300+ }
1301+
1302+ func (c * clusterCache ) processEvents () {
1303+ log := c .log .WithValues ("fn" , "processItems" )
1304+ log .V (1 ).Info ("Start processing events" )
1305+
12431306 c .lock .Lock ()
1244- defer c .lock .Unlock ()
1245- existingNode , exists := c .resources [key ]
1246- if event == watch .Deleted {
1247- if exists {
1248- c .onNodeRemoved (key )
1307+ ch := c .eventMetaCh
1308+ c .lock .Unlock ()
1309+
1310+ eventMetas := make ([]eventMeta , 0 )
1311+ ticker := time .NewTicker (c .eventProcessingInterval )
1312+ defer ticker .Stop ()
1313+
1314+ for {
1315+ select {
1316+ case em , ok := <- ch :
1317+ if ! ok {
1318+ log .V (1 ).Info ("Event processing channel closed, finish processing" )
1319+ return
1320+ }
1321+ eventMetas = append (eventMetas , em )
1322+ case <- ticker .C :
1323+ if len (eventMetas ) > 0 {
1324+ c .processEventsBatch (eventMetas )
1325+ eventMetas = eventMetas [:0 ]
1326+ }
1327+ }
1328+ }
1329+ }
1330+
1331+ func (c * clusterCache ) processEventsBatch (eventMetas []eventMeta ) {
1332+ log := c .log .WithValues ("fn" , "processEventsBatch" )
1333+ start := time .Now ()
1334+ c .lock .Lock ()
1335+ log .V (1 ).Info ("Lock acquired (ms)" , "duration" , time .Since (start ).Milliseconds ())
1336+ defer func () {
1337+ c .lock .Unlock ()
1338+ duration := time .Since (start )
1339+ // Update the metric with the duration of the events processing
1340+ for _ , handler := range c .getProcessEventsHandlers () {
1341+ handler (duration , len (eventMetas ))
1342+ }
1343+ }()
1344+
1345+ for _ , em := range eventMetas {
1346+ key := kube .GetResourceKey (em .un )
1347+ existingNode , exists := c .resources [key ]
1348+ if em .event == watch .Deleted {
1349+ if exists {
1350+ c .onNodeRemoved (key )
1351+ }
1352+ } else {
1353+ c .onNodeUpdated (existingNode , c .newResource (em .un ))
12491354 }
1250- } else if event != watch .Deleted {
1251- c .onNodeUpdated (existingNode , c .newResource (un ))
12521355 }
1356+
1357+ log .V (1 ).Info ("Processed events (ms)" , "count" , len (eventMetas ), "duration" , time .Since (start ).Milliseconds ())
12531358}
12541359
12551360func (c * clusterCache ) onNodeUpdated (oldRes * Resource , newRes * Resource ) {
0 commit comments