From ba7a4ed902bdcfdfc0eaf20d4e63ee7dc8981fde Mon Sep 17 00:00:00 2001
From: Maximilien Cuony
Date: Thu, 4 Jun 2026 10:01:46 +0200
Subject: [PATCH] [metrics] Add RID metrics: subscriptions and isa count

---
 cmds/core-service/main.go                     | 46 +++++++++++++++++++
 cmds/core-service/otel.go                     | 34 ++++++++++++++
 pkg/rid/application/isa_test.go               |  5 ++
 pkg/rid/application/subscription_test.go      |  5 ++
 pkg/rid/repos/isa.go                          |  3 ++
 pkg/rid/repos/subscription.go                 |  3 ++
 .../raftstore/identification_service_area.go  |  4 ++
 pkg/rid/store/raftstore/subscriptions.go      |  4 ++
 .../sqlstore/identification_service_area.go   |  6 +++
 .../identification_service_area_test.go       | 36 +++++++++++++++
 pkg/rid/store/sqlstore/subscriptions.go       |  6 +++
 pkg/rid/store/sqlstore/subscriptions_test.go  | 33 +++++++++++++
 12 files changed, 185 insertions(+)

diff --git a/cmds/core-service/main.go b/cmds/core-service/main.go
index 8794941ad..7974cba36 100644
--- a/cmds/core-service/main.go
+++ b/cmds/core-service/main.go
@@ -36,6 +36,8 @@ import (
 	"github.com/interuss/dss/pkg/versioning"
 	"github.com/interuss/stacktrace"
 	"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
+	"go.opentelemetry.io/otel"
+	"go.opentelemetry.io/otel/metric"
 	"go.uber.org/zap"
 )
 
@@ -117,6 +119,14 @@ func createRIDServers(ctx context.Context, locality string, logger *zap.Logger)
 		return nil, nil, stacktrace.Propagate(err, "Unable to interact with store")
 	}
 
+	if *enableOpenTelemetry {
+		err = registerRIDMetrics(ctx, ridStore)
+
+		if err != nil {
+			return nil, nil, stacktrace.Propagate(err, "Unable to setup metrics")
+		}
+	}
+
 	app := application.NewFromTransactor(ridStore, logger)
 	return &rid_v1.Server{
 		App: app,
@@ -143,6 +153,42 @@ func createSCDServer(ctx context.Context, logger *zap.Logger) (*scd.Server, erro
 	}, nil
 }
 
+// registerRIDMetrics registers observable instruments reporting the number of RID subscriptions and ISAs in store.
+func registerRIDMetrics(ctx context.Context, store rids.Store) error {
+	meter := otel.Meter("rid")
+
+	_, err := meter.Int64ObservableUpDownCounter(
+		"rid_subscriptions_total",
+		metric.WithDescription("Number of rid subscriptions"),
+		metric.WithInt64Callback(newCachedObservation(func(ctx context.Context) (int64, error) {
+			repo, err := store.Interact(ctx)
+			if err != nil {
+				return 0, stacktrace.Propagate(err, "Unable to interact with store")
+			}
+			count, err := repo.CountSubscriptions(ctx)
+			return count, err
+		})),
+	)
+	if err != nil {
+		return err
+	}
+
+	_, err = meter.Int64ObservableUpDownCounter(
+		"rid_identification_service_areas_total",
+		metric.WithDescription("Number of rid ISAs"),
+		metric.WithInt64Callback(newCachedObservation(func(ctx context.Context) (int64, error) {
+			repo, err := store.Interact(ctx)
+			if err != nil {
+				return 0, stacktrace.Propagate(err, "Unable to interact with store")
+			}
+			count, err := repo.CountISAs(ctx)
+			return count, err
+		})),
+	)
+
+	return err
+}
+
 // RunHTTPServer starts the DSS HTTP server.
 func RunHTTPServer(ctx context.Context, ctxCanceler func(), address, locality string) error {
 	logger := logging.WithValuesFromContext(ctx, logging.Logger).With(zap.String("address", address))
diff --git a/cmds/core-service/otel.go b/cmds/core-service/otel.go
index 70b895c48..811fe0258 100644
--- a/cmds/core-service/otel.go
+++ b/cmds/core-service/otel.go
@@ -4,12 +4,15 @@ import (
 	"context"
 	"errors"
 	"net/http"
+	"sync"
+	"time"
 
 	"github.com/interuss/dss/pkg/logging"
 	"github.com/prometheus/client_golang/prometheus/promhttp"
 	"go.opentelemetry.io/otel"
 	"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
 	"go.opentelemetry.io/otel/exporters/prometheus"
+	ometric "go.opentelemetry.io/otel/metric"
 	"go.opentelemetry.io/otel/propagation"
 	"go.opentelemetry.io/otel/sdk/metric"
 	"go.opentelemetry.io/otel/sdk/resource"
@@ -104,3 +107,34 @@ func serveMetrics(ctx context.Context, listeningAddress string) {
 	}
 	logger.Info("Prometheus endpoint started", zap.String("listeningAddress", listeningAddress))
 }
+
+// cachedObservation reuses the last fetched value until ttl has elapsed, so fetch is not called on every observation.
+type cachedObservation struct {
+	mu        sync.Mutex
+	last      int64
+	fetchedAt time.Time
+	ttl       time.Duration
+	fetch     func(context.Context) (int64, error)
+}
+
+func (c *cachedObservation) Observe(ctx context.Context, o ometric.Int64Observer) error {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	if time.Since(c.fetchedAt) < c.ttl {
+		o.Observe(c.last)
+		return nil
+	}
+	v, err := c.fetch(ctx)
+	if err != nil {
+		return err
+	}
+	c.last = v
+	c.fetchedAt = time.Now()
+	o.Observe(v)
+	return nil
+}
+
+func newCachedObservation(fetch func(context.Context) (int64, error)) ometric.Int64Callback {
+	g := &cachedObservation{ttl: time.Second, fetch: fetch}
+	return g.Observe
+}
diff --git a/pkg/rid/application/isa_test.go b/pkg/rid/application/isa_test.go
index bcc060af7..28ca85e6b 100644
--- a/pkg/rid/application/isa_test.go
+++ b/pkg/rid/application/isa_test.go
@@ -89,6 +89,11 @@ func (store *isaStore) ListExpiredISAs(ctx context.Context, writer string, thres
 	return make([]*ridmodels.IdentificationServiceArea, 0), nil
 }
 
+// Implements repos.ISA.CountISAs
+func (store *isaStore) CountISAs(ctx context.Context) (int64, error) {
+	return int64(len(store.isas)), nil
+}
+
 func TestISAUpdateIdxCells(t *testing.T) {
 	ctx := context.Background()
 	app, cleanup := setUpISAApp(ctx, t)
diff --git a/pkg/rid/application/subscription_test.go b/pkg/rid/application/subscription_test.go
index 22c95ea5f..e83fee2ff 100644
--- a/pkg/rid/application/subscription_test.go
+++ b/pkg/rid/application/subscription_test.go
@@ -162,6 +162,11 @@ func (store *subscriptionStore) ListExpiredSubscriptions(ctx context.Context, wr
 	return make([]*ridmodels.Subscription, 0), nil
 }
 
+// Implements repos.Subscription.CountSubscriptions
+func (store *subscriptionStore) CountSubscriptions(ctx context.Context) (int64, error) {
+	return int64(len(store.subs)), nil
+}
+
 func TestBadOwner(t *testing.T) {
 	ctx := context.Background()
 	app, cleanup := setUpSubApp(ctx, t)
diff --git a/pkg/rid/repos/isa.go b/pkg/rid/repos/isa.go
index f93f4b9f5..5d6ffaef8 100644
--- a/pkg/rid/repos/isa.go
+++ b/pkg/rid/repos/isa.go
@@ -31,4 +31,7 @@ type ISA interface {
 
 	// ListExpiredISAs lists all expired ISAs based on writer
 	ListExpiredISAs(ctx context.Context, writer string, threshold time.Time) ([]*ridmodels.IdentificationServiceArea, error)
+
+	// CountISAs returns the number of existing ISAs.
+	CountISAs(ctx context.Context) (int64, error)
 }
diff --git a/pkg/rid/repos/subscription.go b/pkg/rid/repos/subscription.go
index 270aa1d22..1ec849f72 100644
--- a/pkg/rid/repos/subscription.go
+++ b/pkg/rid/repos/subscription.go
@@ -41,4 +41,7 @@ type Subscription interface {
 
 	// ListExpiredSubscriptions lists all expired Subscriptions based on writer.
 	ListExpiredSubscriptions(ctx context.Context, writer string, threshold time.Time) ([]*ridmodels.Subscription, error)
+
+	// CountSubscriptions returns the number of existing Subscriptions.
+	CountSubscriptions(ctx context.Context) (int64, error)
 }
diff --git a/pkg/rid/store/raftstore/identification_service_area.go b/pkg/rid/store/raftstore/identification_service_area.go
index d105d8334..b9f7222a5 100644
--- a/pkg/rid/store/raftstore/identification_service_area.go
+++ b/pkg/rid/store/raftstore/identification_service_area.go
@@ -34,3 +34,7 @@ func (r *repo) SearchISAs(_ context.Context, cells s2.CellUnion, earliest *time.
 func (r *repo) ListExpiredISAs(_ context.Context, writer string, threshold time.Time) ([]*ridmodels.IdentificationServiceArea, error) {
 	return nil, stacktrace.NewErrorWithCode(dsserr.NotImplemented, "ListExpiredISAs not implemented for raftstore")
 }
+
+func (r *repo) CountISAs(_ context.Context) (int64, error) {
+	return 0, stacktrace.NewErrorWithCode(dsserr.NotImplemented, "CountISAs not implemented for raftstore")
+}
diff --git a/pkg/rid/store/raftstore/subscriptions.go b/pkg/rid/store/raftstore/subscriptions.go
index 66e4a360b..0c9c261dd 100644
--- a/pkg/rid/store/raftstore/subscriptions.go
+++ b/pkg/rid/store/raftstore/subscriptions.go
@@ -46,3 +46,7 @@ func (r *repo) MaxSubscriptionCountInCellsByOwner(_ context.Context, cells s2.Ce
 func (r *repo) ListExpiredSubscriptions(_ context.Context, writer string, threshold time.Time) ([]*ridmodels.Subscription, error) {
 	return nil, stacktrace.NewErrorWithCode(dsserr.NotImplemented, "ListExpiredSubscriptions not implemented for raftstore")
 }
+
+func (r *repo) CountSubscriptions(_ context.Context) (int64, error) {
+	return 0, stacktrace.NewErrorWithCode(dsserr.NotImplemented, "CountSubscriptions not implemented for raftstore")
+}
diff --git a/pkg/rid/store/sqlstore/identification_service_area.go b/pkg/rid/store/sqlstore/identification_service_area.go
index d2179c764..c361344eb 100644
--- a/pkg/rid/store/sqlstore/identification_service_area.go
+++ b/pkg/rid/store/sqlstore/identification_service_area.go
@@ -239,3 +239,9 @@ func (r *repo) ListExpiredISAs(ctx context.Context, writer string, threshold tim
 		LIMIT $3`, isaFields)
 	return r.fetchISAs(ctx, isasInCellsQuery, threshold, writer, dssmodels.MaxResultLimit)
 }
+
+func (r *repo) CountISAs(ctx context.Context) (int64, error) {
+	var count int64
+	err := r.QueryRow(ctx, "SELECT COUNT(*) FROM identification_service_areas").Scan(&count)
+	return count, err
+}
diff --git a/pkg/rid/store/sqlstore/identification_service_area_test.go b/pkg/rid/store/sqlstore/identification_service_area_test.go
index 0eb329977..67f5d01bd 100644
--- a/pkg/rid/store/sqlstore/identification_service_area_test.go
+++ b/pkg/rid/store/sqlstore/identification_service_area_test.go
@@ -321,3 +321,39 @@ func TestListExpiredISAsWithEmptyWriter(t *testing.T) {
 	require.NoError(t, err)
 	require.Len(t, serviceAreas, 1)
 }
+
+func TestStoreCountISAs(t *testing.T) {
+	var (
+		ctx                  = context.Background()
+		store, tearDownStore = setUpStore(ctx, t)
+	)
+	defer tearDownStore()
+
+	repo, err := store.Interact(ctx)
+	require.NoError(t, err)
+
+	// Insert the ISA.
+	copy := *serviceArea
+	isa, err := repo.InsertISA(ctx, &copy)
+	require.NoError(t, err)
+	require.NotNil(t, isa)
+
+	// Count should be one
+	count, err := repo.CountISAs(ctx)
+	require.NoError(t, err)
+	require.Equal(t, int64(1), count)
+
+	// Delete the ISA.
+	// Ensure a fresh Get, then delete still updates the sub indexes
+	isa, err = repo.GetISA(ctx, isa.ID, false)
+	require.NoError(t, err)
+
+	serviceAreaOut, err := repo.DeleteISA(ctx, isa)
+	require.NoError(t, err)
+	require.Equal(t, isa, serviceAreaOut)
+
+	// Count should be zero
+	count, err = repo.CountISAs(ctx)
+	require.NoError(t, err)
+	require.Equal(t, int64(0), count)
+}
diff --git a/pkg/rid/store/sqlstore/subscriptions.go b/pkg/rid/store/sqlstore/subscriptions.go
index bc794c06b..219cb4630 100644
--- a/pkg/rid/store/sqlstore/subscriptions.go
+++ b/pkg/rid/store/sqlstore/subscriptions.go
@@ -298,3 +298,9 @@ func (r *repo) ListExpiredSubscriptions(ctx context.Context, writer string, thre
 			writer = $2`, subscriptionFields)
 	return r.process(ctx, query, threshold, writer)
 }
+
+func (r *repo) CountSubscriptions(ctx context.Context) (int64, error) {
+	var count int64
+	err := r.QueryRow(ctx, "SELECT COUNT(*) FROM subscriptions").Scan(&count)
+	return count, err
+}
diff --git a/pkg/rid/store/sqlstore/subscriptions_test.go b/pkg/rid/store/sqlstore/subscriptions_test.go
index ce729847d..afc1c1b32 100644
--- a/pkg/rid/store/sqlstore/subscriptions_test.go
+++ b/pkg/rid/store/sqlstore/subscriptions_test.go
@@ -375,3 +375,36 @@ func TestListExpiredSubscriptionsWithEmptyWriter(t *testing.T) {
 	require.NoError(t, err)
 	require.Len(t, subscriptions, 1)
 }
+
+func TestStoreCountSubscription(t *testing.T) {
+	var (
+		ctx                  = context.Background()
+		store, tearDownStore = setUpStore(ctx, t)
+	)
+	defer tearDownStore()
+
+	repo, err := store.Interact(ctx)
+	require.NoError(t, err)
+
+	for _, r := range subscriptionsPool {
+		t.Run(r.name, func(t *testing.T) {
+			sub1, err := repo.InsertSubscription(ctx, r.input)
+			require.NoError(t, err)
+			require.NotNil(t, sub1)
+
+			// Count should be one
+			count, err := repo.CountSubscriptions(ctx)
+			require.NoError(t, err)
+			require.Equal(t, int64(1), count)
+
+			sub4, err := repo.DeleteSubscription(ctx, sub1)
+			require.NoError(t, err)
+			require.NotNil(t, sub4)
+
+			// Count should be zero
+			count, err = repo.CountSubscriptions(ctx)
+			require.NoError(t, err)
+			require.Equal(t, int64(0), count)
+		})
+	}
+}