From 4c538a4d7333c82ccd1bdc2997cdea47ba22fd3d Mon Sep 17 00:00:00 2001 From: Maximilien Cuony Date: Thu, 4 Jun 2026 10:02:54 +0200 Subject: [PATCH] [metrics] Add SCD metrics: subscriptions, constraints and operational intents count --- cmds/core-service/main.go | 58 +++++++++++++++++++ pkg/scd/repos/repos.go | 9 +++ pkg/scd/store/raftstore/constraints.go | 4 ++ .../store/raftstore/operational_intents.go | 4 ++ pkg/scd/store/raftstore/subscriptions.go | 4 ++ pkg/scd/store/sqlstore/constraints.go | 6 ++ pkg/scd/store/sqlstore/operational_intents.go | 6 ++ pkg/scd/store/sqlstore/subscriptions.go | 6 ++ 8 files changed, 97 insertions(+) diff --git a/cmds/core-service/main.go b/cmds/core-service/main.go index 7974cba36..83b1c6651 100644 --- a/cmds/core-service/main.go +++ b/cmds/core-service/main.go @@ -146,6 +146,14 @@ func createSCDServer(ctx context.Context, logger *zap.Logger) (*scd.Server, erro return nil, err } + if *enableOpenTelemetry { + err = registerSCDMetrics(ctx, scdStore) + + if err != nil { + return nil, stacktrace.Propagate(err, "Unable to setup metrics") + } + } + return &scd.Server{ Store: scdStore, DSSReportHandler: &scd.JSONLoggingReceivedReportHandler{ReportLogger: logger}, @@ -189,6 +197,56 @@ func registerRIDMetrics(ctx context.Context, store rids.Store) error { return err } +func registerSCDMetrics(ctx context.Context, store scds.Store) error { + + meter := otel.Meter("scd") + + _, err := meter.Int64ObservableUpDownCounter( + "scd_subscriptions_total", + metric.WithDescription("Number of scd subscriptions"), + metric.WithInt64Callback(newCachedObservation(func(ctx context.Context) (int64, error) { + repo, err := store.Interact(ctx) + if err != nil { + return 0, stacktrace.Propagate(err, "Unable to interact with store") + } + count, err := repo.CountSubscriptions(ctx) + return count, err + })), + ) + if err != nil { + return err + } + _, err = meter.Int64ObservableUpDownCounter( + "scd_operational_intents_total", + metric.WithDescription("Number of scd operational intents"), + metric.WithInt64Callback(newCachedObservation(func(ctx context.Context) (int64, error) { + repo, err := store.Interact(ctx) + if err != nil { + return 0, stacktrace.Propagate(err, "Unable to interact with store") + } + count, err := repo.CountOperationalIntents(ctx) + return count, err + })), + ) + if err != nil { + return err + } + _, err = meter.Int64ObservableUpDownCounter( + "scd_constraints_total", + metric.WithDescription("Number of scd constraints"), + metric.WithInt64Callback(newCachedObservation(func(ctx context.Context) (int64, error) { + repo, err := store.Interact(ctx) + if err != nil { + return 0, stacktrace.Propagate(err, "Unable to interact with store") + } + count, err := repo.CountConstraints(ctx) + return count, err + })), + ) + + return err +} + // RunHTTPServer starts the DSS HTTP server. func RunHTTPServer(ctx context.Context, ctxCanceler func(), address, locality string) error { logger := logging.WithValuesFromContext(ctx, logging.Logger).With(zap.String("address", address)) diff --git a/pkg/scd/repos/repos.go b/pkg/scd/repos/repos.go index ee4e31492..07bd711c6 100644 --- a/pkg/scd/repos/repos.go +++ b/pkg/scd/repos/repos.go @@ -33,6 +33,9 @@ type OperationalIntent interface { // ListExpiredOperationalIntents lists all operational intents older than the threshold. // Their age is determined by their end time, or by their update time if they do not have an end time. ListExpiredOperationalIntents(ctx context.Context, threshold time.Time) ([]*scdmodels.OperationalIntent, error) + + // Count the number of existing operational intent + CountOperationalIntents(ctx context.Context) (int64, error) } // Subscription abstracts subscription-specific interactions with the backing repository. @@ -64,6 +67,9 @@ type Subscription interface { // ListExpiredSubscriptions lists all subscriptions older than the threshold. // Their age is determined by their end time, or by their update time if they do not have an end time. ListExpiredSubscriptions(ctx context.Context, threshold time.Time) ([]*scdmodels.Subscription, error) + + // Count the number of existing subscriptions + CountSubscriptions(ctx context.Context) (int64, error) } type UssAvailability interface { @@ -88,6 +94,9 @@ type Constraint interface { // deleted subscription. Returns nil and an error if the Constraint does // not exist. DeleteConstraint(ctx context.Context, id dssmodels.ID) error + + // Count the number of existing constraint + CountConstraints(ctx context.Context) (int64, error) } // scd.repos.Repository aggregates all SCD-specific repo interfaces to perform SCD operations on diff --git a/pkg/scd/store/raftstore/constraints.go b/pkg/scd/store/raftstore/constraints.go index 11b1646cc..0983add14 100644 --- a/pkg/scd/store/raftstore/constraints.go +++ b/pkg/scd/store/raftstore/constraints.go @@ -24,3 +24,7 @@ func (r *repo) UpsertConstraint(_ context.Context, constraint *scdmodels.Constra func (r *repo) DeleteConstraint(_ context.Context, id dssmodels.ID) error { return stacktrace.NewErrorWithCode(dsserr.NotImplemented, "DeleteConstraint not implemented for raftstore") } + +func (r *repo) CountConstraints(_ context.Context) (int64, error) { + return 0, stacktrace.NewErrorWithCode(dsserr.NotImplemented, "CountConstraint not implemented for raftstore") +} diff --git a/pkg/scd/store/raftstore/operational_intents.go b/pkg/scd/store/raftstore/operational_intents.go index 3b4ed1996..3a6af6acd 100644 --- a/pkg/scd/store/raftstore/operational_intents.go +++ b/pkg/scd/store/raftstore/operational_intents.go @@ -33,3 +33,7 @@ func (r *repo) GetDependentOperationalIntents(_ context.Context, subscriptionID func (r *repo) ListExpiredOperationalIntents(_ context.Context, threshold time.Time) ([]*scdmodels.OperationalIntent, error) { return nil, stacktrace.NewErrorWithCode(dsserr.NotImplemented, "ListExpiredOperationalIntents not implemented for raftstore") } + +func (r *repo) CountOperationalIntents(_ context.Context) (int64, error) { + return 0, stacktrace.NewErrorWithCode(dsserr.NotImplemented, "CountOperationalIntents not implemented for raftstore") +} diff --git a/pkg/scd/store/raftstore/subscriptions.go b/pkg/scd/store/raftstore/subscriptions.go index 08e8d0dcc..7808a25a9 100644 --- a/pkg/scd/store/raftstore/subscriptions.go +++ b/pkg/scd/store/raftstore/subscriptions.go @@ -38,3 +38,7 @@ func (r *repo) LockSubscriptionsOnCells(_ context.Context, cells s2.CellUnion, s func (r *repo) ListExpiredSubscriptions(_ context.Context, threshold time.Time) ([]*scdmodels.Subscription, error) { return nil, stacktrace.NewErrorWithCode(dsserr.NotImplemented, "ListExpiredSubscriptions not implemented for raftstore") } + +func (r *repo) CountSubscriptions(_ context.Context) (int64, error) { + return 0, stacktrace.NewErrorWithCode(dsserr.NotImplemented, "CountSubscriptions not implemented for raftstore") +} diff --git a/pkg/scd/store/sqlstore/constraints.go b/pkg/scd/store/sqlstore/constraints.go index 45580b851..364853d92 100644 --- a/pkg/scd/store/sqlstore/constraints.go +++ b/pkg/scd/store/sqlstore/constraints.go @@ -239,3 +239,9 @@ func (c *repo) SearchConstraints(ctx context.Context, v4d *dssmodels.Volume4D) ( return constraints, nil } + +func (c *repo) CountConstraints(ctx context.Context) (int64, error) { + var count int64 + err := c.q.QueryRow(ctx, "SELECT COUNT(*) FROM scd_constraints").Scan(&count) + return count, err +} diff --git a/pkg/scd/store/sqlstore/operational_intents.go b/pkg/scd/store/sqlstore/operational_intents.go index 2a6250624..6033c8f5c 100644 --- a/pkg/scd/store/sqlstore/operational_intents.go +++ b/pkg/scd/store/sqlstore/operational_intents.go @@ -384,3 +384,9 @@ func (s *repo) ListExpiredOperationalIntents(ctx context.Context, threshold time return result, nil } + +func (s *repo) CountOperationalIntents(ctx context.Context) (int64, error) { + var count int64 + err := s.q.QueryRow(ctx, "SELECT COUNT(*) FROM scd_operations").Scan(&count) + return count, err +} diff --git a/pkg/scd/store/sqlstore/subscriptions.go b/pkg/scd/store/sqlstore/subscriptions.go index 0089fa69e..57060a8fd 100644 --- a/pkg/scd/store/sqlstore/subscriptions.go +++ b/pkg/scd/store/sqlstore/subscriptions.go @@ -451,3 +451,9 @@ func (c *repo) ListExpiredSubscriptions(ctx context.Context, threshold time.Time return subscriptions, nil } + +func (c *repo) CountSubscriptions(ctx context.Context) (int64, error) { + var count int64 + err := c.q.QueryRow(ctx, "SELECT COUNT(*) FROM scd_subscriptions").Scan(&count) + return count, err +}