Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions cmds/core-service/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ import (
"github.com/interuss/dss/pkg/versioning"
"github.com/interuss/stacktrace"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/metric"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -117,6 +119,14 @@ func createRIDServers(ctx context.Context, locality string, logger *zap.Logger)
return nil, nil, stacktrace.Propagate(err, "Unable to interact with store")
}

if *enableOpenTelemetry {
err = registerRIDMetrics(ctx, ridStore)

if err != nil {
return nil, nil, stacktrace.Propagate(err, "Unable to setup metrics")
}
}

app := application.NewFromTransactor(ridStore, logger)
return &rid_v1.Server{
App: app,
Expand All @@ -143,6 +153,42 @@ func createSCDServer(ctx context.Context, logger *zap.Logger) (*scd.Server, erro
}, nil
}

func registerRIDMetrics(ctx context.Context, store rids.Store) error {

meter := otel.Meter("rid")

_, err := meter.Int64ObservableUpDownCounter(
"rid_subscriptions_total",
metric.WithDescription("Number of rid subscriptions"),
metric.WithInt64Callback(newCachedObservation(func(ctx context.Context) (int64, error) {
repo, err := store.Interact(ctx)
if err != nil {
return 0, stacktrace.Propagate(err, "Unable to interact with store")
}
count, err := repo.CountSubscriptions(ctx)
return count, err
})),
)
if err != nil {
return err
}

_, err = meter.Int64ObservableUpDownCounter(
"rid_identification_service_areas_total",
metric.WithDescription("Number of rid ISAs"),
metric.WithInt64Callback(newCachedObservation(func(ctx context.Context) (int64, error) {
repo, err := store.Interact(ctx)
if err != nil {
return 0, stacktrace.Propagate(err, "Unable to interact with store")
}
count, err := repo.CountISAs(ctx)
return count, err
})),
)

return err
}

// RunHTTPServer starts the DSS HTTP server.
func RunHTTPServer(ctx context.Context, ctxCanceler func(), address, locality string) error {
logger := logging.WithValuesFromContext(ctx, logging.Logger).With(zap.String("address", address))
Expand Down
34 changes: 34 additions & 0 deletions cmds/core-service/otel.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,15 @@ import (
"context"
"errors"
"net/http"
"sync"
"time"

"github.com/interuss/dss/pkg/logging"
"github.com/prometheus/client_golang/prometheus/promhttp"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
"go.opentelemetry.io/otel/exporters/prometheus"
ometric "go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource"
Expand Down Expand Up @@ -104,3 +107,34 @@ func serveMetrics(ctx context.Context, listeningAddress string) {
}
logger.Info("Prometheus endpoint started", zap.String("listeningAddress", listeningAddress))
}

// Small helper to cache metrics
type cachedObservation struct {
mu sync.Mutex
last int64
fetchedAt time.Time
ttl time.Duration
fetch func(context.Context) (int64, error)
}

func (c *cachedObservation) Observe(ctx context.Context, o ometric.Int64Observer) error {
c.mu.Lock()
defer c.mu.Unlock()
Comment thread
mickmis marked this conversation as resolved.
if time.Since(c.fetchedAt) < c.ttl {
Comment thread
mickmis marked this conversation as resolved.
o.Observe(c.last)
return nil
}
v, err := c.fetch(ctx)
if err != nil {
return err
}
c.last = v
c.fetchedAt = time.Now()
o.Observe(v)
return nil
}

func newCachedObservation(fetch func(context.Context) (int64, error)) ometric.Int64Callback {
g := &cachedObservation{ttl: time.Second, fetch: fetch}
return g.Observe
}
5 changes: 5 additions & 0 deletions pkg/rid/application/isa_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,11 @@ func (store *isaStore) ListExpiredISAs(ctx context.Context, writer string, thres
return make([]*ridmodels.IdentificationServiceArea, 0), nil
}

// Implements repos.ISA.CountISAs
func (store *isaStore) CountISAs(ctx context.Context) (int64, error) {
return int64(len(store.isas)), nil
}

func TestISAUpdateIdxCells(t *testing.T) {
ctx := context.Background()
app, cleanup := setUpISAApp(ctx, t)
Expand Down
5 changes: 5 additions & 0 deletions pkg/rid/application/subscription_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,11 @@ func (store *subscriptionStore) ListExpiredSubscriptions(ctx context.Context, wr
return make([]*ridmodels.Subscription, 0), nil
}

// Implements repos.ISA.CountSubscriptions
func (store *subscriptionStore) CountSubscriptions(ctx context.Context) (int64, error) {
return int64(len(store.subs)), nil
}

func TestBadOwner(t *testing.T) {
ctx := context.Background()
app, cleanup := setUpSubApp(ctx, t)
Expand Down
3 changes: 3 additions & 0 deletions pkg/rid/repos/isa.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,7 @@ type ISA interface {

// ListExpiredISAs lists all expired ISAs based on writer
ListExpiredISAs(ctx context.Context, writer string, threshold time.Time) ([]*ridmodels.IdentificationServiceArea, error)

// Count the number of existing ISA
CountISAs(ctx context.Context) (int64, error)
}
3 changes: 3 additions & 0 deletions pkg/rid/repos/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,7 @@ type Subscription interface {

// ListExpiredSubscriptions lists all expired Subscriptions based on writer.
ListExpiredSubscriptions(ctx context.Context, writer string, threshold time.Time) ([]*ridmodels.Subscription, error)

// Count the number of existing subscriptions
CountSubscriptions(ctx context.Context) (int64, error)
}
4 changes: 4 additions & 0 deletions pkg/rid/store/raftstore/identification_service_area.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,7 @@ func (r *repo) SearchISAs(_ context.Context, cells s2.CellUnion, earliest *time.
func (r *repo) ListExpiredISAs(_ context.Context, writer string, threshold time.Time) ([]*ridmodels.IdentificationServiceArea, error) {
return nil, stacktrace.NewErrorWithCode(dsserr.NotImplemented, "ListExpiredISAs not implemented for raftstore")
}

func (r *repo) CountISAs(_ context.Context) (int64, error) {
return 0, stacktrace.NewErrorWithCode(dsserr.NotImplemented, "CountISAs not implemented for raftstore")
}
4 changes: 4 additions & 0 deletions pkg/rid/store/raftstore/subscriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,7 @@ func (r *repo) MaxSubscriptionCountInCellsByOwner(_ context.Context, cells s2.Ce
func (r *repo) ListExpiredSubscriptions(_ context.Context, writer string, threshold time.Time) ([]*ridmodels.Subscription, error) {
return nil, stacktrace.NewErrorWithCode(dsserr.NotImplemented, "ListExpiredSubscriptions not implemented for raftstore")
}

func (r *repo) CountSubscriptions(_ context.Context) (int64, error) {
return 0, stacktrace.NewErrorWithCode(dsserr.NotImplemented, "CountSubscriptions not implemented for raftstore")
}
6 changes: 6 additions & 0 deletions pkg/rid/store/sqlstore/identification_service_area.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,3 +239,9 @@ func (r *repo) ListExpiredISAs(ctx context.Context, writer string, threshold tim
LIMIT $3`, isaFields)
return r.fetchISAs(ctx, isasInCellsQuery, threshold, writer, dssmodels.MaxResultLimit)
}

func (r *repo) CountISAs(ctx context.Context) (int64, error) {
var count int64
err := r.QueryRow(ctx, "SELECT COUNT(*) FROM identification_service_areas").Scan(&count)
return count, err
}
36 changes: 36 additions & 0 deletions pkg/rid/store/sqlstore/identification_service_area_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,3 +321,39 @@ func TestListExpiredISAsWithEmptyWriter(t *testing.T) {
require.NoError(t, err)
require.Len(t, serviceAreas, 1)
}

func TestStoreCountISAs(t *testing.T) {
var (
ctx = context.Background()
store, tearDownStore = setUpStore(ctx, t)
)
defer tearDownStore()

repo, err := store.Interact(ctx)
require.NoError(t, err)

// Insert the ISA.
copy := *serviceArea
isa, err := repo.InsertISA(ctx, &copy)
require.NoError(t, err)
require.NotNil(t, isa)

//Cound should be one
count, err := repo.CountISAs(ctx)
require.NoError(t, err)
require.Equal(t, count, int64(1))

// Delete the ISA.
// Ensure a fresh Get, then delete still updates the sub indexes
isa, err = repo.GetISA(ctx, isa.ID, false)
require.NoError(t, err)

serviceAreaOut, err := repo.DeleteISA(ctx, isa)
require.NoError(t, err)
require.Equal(t, isa, serviceAreaOut)

//Cound should be zero
count, err = repo.CountISAs(ctx)
require.NoError(t, err)
require.Equal(t, count, int64(0))
}
6 changes: 6 additions & 0 deletions pkg/rid/store/sqlstore/subscriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,3 +298,9 @@ func (r *repo) ListExpiredSubscriptions(ctx context.Context, writer string, thre
writer = $2`, subscriptionFields)
return r.process(ctx, query, threshold, writer)
}

func (r *repo) CountSubscriptions(ctx context.Context) (int64, error) {
var count int64
err := r.QueryRow(ctx, "SELECT COUNT(*) FROM subscriptions").Scan(&count)
return count, err
}
33 changes: 33 additions & 0 deletions pkg/rid/store/sqlstore/subscriptions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,3 +375,36 @@ func TestListExpiredSubscriptionsWithEmptyWriter(t *testing.T) {
require.NoError(t, err)
require.Len(t, subscriptions, 1)
}

func TestStoreCountSubscription(t *testing.T) {
var (
ctx = context.Background()
store, tearDownStore = setUpStore(ctx, t)
)
defer tearDownStore()

repo, err := store.Interact(ctx)
require.NoError(t, err)

for _, r := range subscriptionsPool {
t.Run(r.name, func(t *testing.T) {
sub1, err := repo.InsertSubscription(ctx, r.input)
require.NoError(t, err)
require.NotNil(t, sub1)

//Cound should be one
count, err := repo.CountSubscriptions(ctx)
require.NoError(t, err)
require.Equal(t, count, int64(1))

sub4, err := repo.DeleteSubscription(ctx, sub1)
require.NoError(t, err)
require.NotNil(t, sub4)

//Cound should be zero
count, err = repo.CountSubscriptions(ctx)
require.NoError(t, err)
require.Equal(t, count, int64(0))
})
}
}
Loading