Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 104 additions & 0 deletions cmds/core-service/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ import (
"github.com/interuss/dss/pkg/versioning"
"github.com/interuss/stacktrace"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/metric"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -117,6 +119,14 @@ func createRIDServers(ctx context.Context, locality string, logger *zap.Logger)
return nil, nil, stacktrace.Propagate(err, "Unable to interact with store")
}

if *enableOpenTelemetry {
err = registerRIDMetrics(ctx, ridStore)

if err != nil {
return nil, nil, stacktrace.Propagate(err, "Unable to setup metrics")
}
}

app := application.NewFromTransactor(ridStore, logger)
return &rid_v1.Server{
App: app,
Expand All @@ -136,13 +146,107 @@ func createSCDServer(ctx context.Context, logger *zap.Logger) (*scd.Server, erro
return nil, err
}

if *enableOpenTelemetry {
err = registerSCDMetrics(ctx, scdStore)

if err != nil {
return nil, stacktrace.Propagate(err, "Unable to setup metrics")
}
}

return &scd.Server{
Store: scdStore,
DSSReportHandler: &scd.JSONLoggingReceivedReportHandler{ReportLogger: logger},
AllowHTTPBaseUrls: *allowHTTPBaseUrls,
}, nil
}

func registerRIDMetrics(ctx context.Context, store rids.Store) error {

meter := otel.Meter("rid")

_, err := meter.Int64ObservableUpDownCounter(
"rid_subscriptions_count",
metric.WithDescription("Number of rid subscriptions"),
metric.WithInt64Callback(newCachedGauge(func(ctx context.Context) (int64, error) {
repo, err := store.Interact(ctx)
if err != nil {
return 0, stacktrace.Propagate(err, "Unable to interact with store")
}
count, err := repo.CountSubscriptions(ctx)
return count, err
})),
)
if err != nil {
return err
}

_, err = meter.Int64ObservableUpDownCounter(
"rid_identification_service_areas_count",
metric.WithDescription("Number of rid ISAs"),
metric.WithInt64Callback(newCachedGauge(func(ctx context.Context) (int64, error) {
repo, err := store.Interact(ctx)
if err != nil {
return 0, stacktrace.Propagate(err, "Unable to interact with store")
}
count, err := repo.CountISAs(ctx)
return count, err
})),
)

return err
}

func registerSCDMetrics(ctx context.Context, store scds.Store) error {

meter := otel.Meter("scd")

_, err := meter.Int64ObservableUpDownCounter(
"scd_subscriptions_count",
metric.WithDescription("Number of scd subscriptions"),
metric.WithInt64Callback(newCachedGauge(func(ctx context.Context) (int64, error) {
repo, err := store.Interact(ctx)
if err != nil {
return 0, stacktrace.Propagate(err, "Unable to interact with store")
}
count, err := repo.CountSubscriptions(ctx)
return count, err
})),
)
if err != nil {
return err
}
_, err = meter.Int64ObservableUpDownCounter(
"scd_operational_intents_count",
metric.WithDescription("Number of scd operational intents"),
metric.WithInt64Callback(newCachedGauge(func(ctx context.Context) (int64, error) {
repo, err := store.Interact(ctx)
if err != nil {
return 0, stacktrace.Propagate(err, "Unable to interact with store")
}
count, err := repo.CountOperationalIntents(ctx)
return count, err
})),
)
if err != nil {
return err
}
_, err = meter.Int64ObservableUpDownCounter(
"scd_constraints_count",
metric.WithDescription("Number of scd constraints"),
metric.WithInt64Callback(newCachedGauge(func(ctx context.Context) (int64, error) {
repo, err := store.Interact(ctx)
if err != nil {
return 0, stacktrace.Propagate(err, "Unable to interact with store")
}
count, err := repo.CountConstraints(ctx)
return count, err
})),
)

return err
}

// RunHTTPServer starts the DSS HTTP server.
func RunHTTPServer(ctx context.Context, ctxCanceler func(), address, locality string) error {
logger := logging.WithValuesFromContext(ctx, logging.Logger).With(zap.String("address", address))
Expand Down
34 changes: 34 additions & 0 deletions cmds/core-service/otel.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,15 @@ import (
"context"
"errors"
"net/http"
"sync"
"time"

"github.com/interuss/dss/pkg/logging"
"github.com/prometheus/client_golang/prometheus/promhttp"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
"go.opentelemetry.io/otel/exporters/prometheus"
ometric "go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource"
Expand Down Expand Up @@ -104,3 +107,34 @@ func serveMetrics(ctx context.Context, listeningAddress string) {
}
logger.Info("Prometheus endpoint started", zap.String("listeningAddress", listeningAddress))
}

// Small helper to cache metrics
type cachedGauge struct {
mu sync.Mutex
last int64
fetchedAt time.Time
ttl time.Duration
fetch func(context.Context) (int64, error)
}

func (c *cachedGauge) Observe(ctx context.Context, o ometric.Int64Observer) error {
c.mu.Lock()
defer c.mu.Unlock()
if time.Since(c.fetchedAt) < c.ttl {
o.Observe(c.last)
return nil
}
v, err := c.fetch(ctx)
if err != nil {
return err
}
c.last = v
c.fetchedAt = time.Now()
o.Observe(v)
return nil
}

func newCachedGauge(fetch func(context.Context) (int64, error)) ometric.Int64Callback {
g := &cachedGauge{ttl: time.Second, fetch: fetch}
return g.Observe
}
5 changes: 5 additions & 0 deletions pkg/rid/application/isa_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,11 @@ func (store *isaStore) ListExpiredISAs(ctx context.Context, writer string, thres
return make([]*ridmodels.IdentificationServiceArea, 0), nil
}

// Implements repos.ISA.CountISAs
func (store *isaStore) CountISAs(ctx context.Context) (int64, error) {
return int64(len(store.isas)), nil
}

func TestISAUpdateIdxCells(t *testing.T) {
ctx := context.Background()
app, cleanup := setUpISAApp(ctx, t)
Expand Down
5 changes: 5 additions & 0 deletions pkg/rid/application/subscription_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,11 @@ func (store *subscriptionStore) ListExpiredSubscriptions(ctx context.Context, wr
return make([]*ridmodels.Subscription, 0), nil
}

// Implements repos.ISA.CountSubscriptions
func (store *subscriptionStore) CountSubscriptions(ctx context.Context) (int64, error) {
return int64(len(store.subs)), nil
}

func TestBadOwner(t *testing.T) {
ctx := context.Background()
app, cleanup := setUpSubApp(ctx, t)
Expand Down
3 changes: 3 additions & 0 deletions pkg/rid/repos/isa.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,7 @@ type ISA interface {

// ListExpiredISAs lists all expired ISAs based on writer
ListExpiredISAs(ctx context.Context, writer string, threshold time.Time) ([]*ridmodels.IdentificationServiceArea, error)

// Count the number of existing ISA
CountISAs(ctx context.Context) (int64, error)
}
3 changes: 3 additions & 0 deletions pkg/rid/repos/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,7 @@ type Subscription interface {

// ListExpiredSubscriptions lists all expired Subscriptions based on writer.
ListExpiredSubscriptions(ctx context.Context, writer string, threshold time.Time) ([]*ridmodels.Subscription, error)

// Count the number of existing subscriptions
CountSubscriptions(ctx context.Context) (int64, error)
}
4 changes: 4 additions & 0 deletions pkg/rid/store/raftstore/identification_service_area.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,7 @@ func (r *repo) SearchISAs(_ context.Context, cells s2.CellUnion, earliest *time.
func (r *repo) ListExpiredISAs(_ context.Context, writer string, threshold time.Time) ([]*ridmodels.IdentificationServiceArea, error) {
return nil, stacktrace.NewErrorWithCode(dsserr.NotImplemented, "ListExpiredISAs not implemented for raftstore")
}

func (r *repo) CountISAs(_ context.Context) (int64, error) {
return 0, stacktrace.NewErrorWithCode(dsserr.NotImplemented, "CountISAs not implemented for raftstore")
}
4 changes: 4 additions & 0 deletions pkg/rid/store/raftstore/subscriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,7 @@ func (r *repo) MaxSubscriptionCountInCellsByOwner(_ context.Context, cells s2.Ce
func (r *repo) ListExpiredSubscriptions(_ context.Context, writer string, threshold time.Time) ([]*ridmodels.Subscription, error) {
return nil, stacktrace.NewErrorWithCode(dsserr.NotImplemented, "ListExpiredSubscriptions not implemented for raftstore")
}

func (r *repo) CountSubscriptions(_ context.Context) (int64, error) {
return 0, stacktrace.NewErrorWithCode(dsserr.NotImplemented, "CountSubscriptions not implemented for raftstore")
}
6 changes: 6 additions & 0 deletions pkg/rid/store/sqlstore/identification_service_area.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,3 +239,9 @@ func (r *repo) ListExpiredISAs(ctx context.Context, writer string, threshold tim
LIMIT $3`, isaFields)
return r.fetchISAs(ctx, isasInCellsQuery, threshold, writer, dssmodels.MaxResultLimit)
}

func (r *repo) CountISAs(ctx context.Context) (int64, error) {
var count int64
err := r.QueryRow(ctx, "SELECT COUNT(*) FROM identification_service_areas").Scan(&count)
return count, err
}
36 changes: 36 additions & 0 deletions pkg/rid/store/sqlstore/identification_service_area_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,3 +321,39 @@ func TestListExpiredISAsWithEmptyWriter(t *testing.T) {
require.NoError(t, err)
require.Len(t, serviceAreas, 1)
}

func TestStoreCountISAs(t *testing.T) {
var (
ctx = context.Background()
store, tearDownStore = setUpStore(ctx, t)
)
defer tearDownStore()

repo, err := store.Interact(ctx)
require.NoError(t, err)

// Insert the ISA.
copy := *serviceArea
isa, err := repo.InsertISA(ctx, &copy)
require.NoError(t, err)
require.NotNil(t, isa)

//Cound should be one
count, err := repo.CountISAs(ctx)
require.NoError(t, err)
require.Equal(t, count, int64(1))

// Delete the ISA.
// Ensure a fresh Get, then delete still updates the sub indexes
isa, err = repo.GetISA(ctx, isa.ID, false)
require.NoError(t, err)

serviceAreaOut, err := repo.DeleteISA(ctx, isa)
require.NoError(t, err)
require.Equal(t, isa, serviceAreaOut)

//Cound should be zero
count, err = repo.CountISAs(ctx)
require.NoError(t, err)
require.Equal(t, count, int64(0))
}
6 changes: 6 additions & 0 deletions pkg/rid/store/sqlstore/subscriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,3 +298,9 @@ func (r *repo) ListExpiredSubscriptions(ctx context.Context, writer string, thre
writer = $2`, subscriptionFields)
return r.process(ctx, query, threshold, writer)
}

func (r *repo) CountSubscriptions(ctx context.Context) (int64, error) {
var count int64
err := r.QueryRow(ctx, "SELECT COUNT(*) FROM subscriptions").Scan(&count)
return count, err
}
33 changes: 33 additions & 0 deletions pkg/rid/store/sqlstore/subscriptions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,3 +375,36 @@ func TestListExpiredSubscriptionsWithEmptyWriter(t *testing.T) {
require.NoError(t, err)
require.Len(t, subscriptions, 1)
}

func TestStoreCountSubscription(t *testing.T) {
var (
ctx = context.Background()
store, tearDownStore = setUpStore(ctx, t)
)
defer tearDownStore()

repo, err := store.Interact(ctx)
require.NoError(t, err)

for _, r := range subscriptionsPool {
t.Run(r.name, func(t *testing.T) {
sub1, err := repo.InsertSubscription(ctx, r.input)
require.NoError(t, err)
require.NotNil(t, sub1)

//Cound should be one
count, err := repo.CountSubscriptions(ctx)
require.NoError(t, err)
require.Equal(t, count, int64(1))

sub4, err := repo.DeleteSubscription(ctx, sub1)
require.NoError(t, err)
require.NotNil(t, sub4)

//Cound should be zero
count, err = repo.CountSubscriptions(ctx)
require.NoError(t, err)
require.Equal(t, count, int64(0))
})
}
}
9 changes: 9 additions & 0 deletions pkg/scd/repos/repos.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ type OperationalIntent interface {
// ListExpiredOperationalIntents lists all operational intents older than the threshold.
// Their age is determined by their end time, or by their update time if they do not have an end time.
ListExpiredOperationalIntents(ctx context.Context, threshold time.Time) ([]*scdmodels.OperationalIntent, error)

// Count the number of existing operational intent
CountOperationalIntents(ctx context.Context) (int64, error)
}

// Subscription abstracts subscription-specific interactions with the backing repository.
Expand Down Expand Up @@ -64,6 +67,9 @@ type Subscription interface {
// ListExpiredSubscriptions lists all subscriptions older than the threshold.
// Their age is determined by their end time, or by their update time if they do not have an end time.
ListExpiredSubscriptions(ctx context.Context, threshold time.Time) ([]*scdmodels.Subscription, error)

// Count the number of existing subscriptions
CountSubscriptions(ctx context.Context) (int64, error)
}

type UssAvailability interface {
Expand All @@ -88,6 +94,9 @@ type Constraint interface {
// deleted subscription. Returns nil and an error if the Constraint does
// not exist.
DeleteConstraint(ctx context.Context, id dssmodels.ID) error

// Count the number of existing constraint
CountConstraints(ctx context.Context) (int64, error)
}

// scd.repos.Repository aggregates all SCD-specific repo interfaces to perform SCD operations on
Expand Down
4 changes: 4 additions & 0 deletions pkg/scd/store/raftstore/constraints.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,7 @@ func (r *repo) UpsertConstraint(_ context.Context, constraint *scdmodels.Constra
func (r *repo) DeleteConstraint(_ context.Context, id dssmodels.ID) error {
return stacktrace.NewErrorWithCode(dsserr.NotImplemented, "DeleteConstraint not implemented for raftstore")
}

func (r *repo) CountConstraints(_ context.Context) (int64, error) {
return 0, stacktrace.NewErrorWithCode(dsserr.NotImplemented, "CountConstraint not implemented for raftstore")
}
4 changes: 4 additions & 0 deletions pkg/scd/store/raftstore/operational_intents.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,7 @@ func (r *repo) GetDependentOperationalIntents(_ context.Context, subscriptionID
func (r *repo) ListExpiredOperationalIntents(_ context.Context, threshold time.Time) ([]*scdmodels.OperationalIntent, error) {
return nil, stacktrace.NewErrorWithCode(dsserr.NotImplemented, "ListExpiredOperationalIntents not implemented for raftstore")
}

func (r *repo) CountOperationalIntents(_ context.Context) (int64, error) {
return 0, stacktrace.NewErrorWithCode(dsserr.NotImplemented, "CountOperationalIntents not implemented for raftstore")
}
4 changes: 4 additions & 0 deletions pkg/scd/store/raftstore/subscriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,7 @@ func (r *repo) LockSubscriptionsOnCells(_ context.Context, cells s2.CellUnion, s
func (r *repo) ListExpiredSubscriptions(_ context.Context, threshold time.Time) ([]*scdmodels.Subscription, error) {
return nil, stacktrace.NewErrorWithCode(dsserr.NotImplemented, "ListExpiredSubscriptions not implemented for raftstore")
}

func (r *repo) CountSubscriptions(_ context.Context) (int64, error) {
return 0, stacktrace.NewErrorWithCode(dsserr.NotImplemented, "CountSubscriptions not implemented for raftstore")
}
6 changes: 6 additions & 0 deletions pkg/scd/store/sqlstore/constraints.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,3 +239,9 @@ func (c *repo) SearchConstraints(ctx context.Context, v4d *dssmodels.Volume4D) (

return constraints, nil
}

func (c *repo) CountConstraints(ctx context.Context) (int64, error) {
var count int64
err := c.q.QueryRow(ctx, "SELECT COUNT(*) FROM scd_constraints").Scan(&count)
return count, err
}
Loading
Loading