Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions NEXT_RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,16 @@ The release notes should contain at least the following sections:

## Mandatory migration tasks

* `enable_opentelemetry` has been split into two flags: `enable_metrics` and `enable_tracing`

## Optional migration tasks

## Important information

* Fixed a bug where the `evict` command ignored entries without a locality. If your DSS instance does not have a locality set, the next `evict` run may be slow while it processes the backlog of old entries.
* Fixed a bug where Helm charts and Tanka files didn't actually perform any actions via the `evict` command when run via cron jobs (because no locality was set and no delete flag was specified). If you have a large number of entries, the next run may be slow while it processes the backlog of old entries.
* AWS load balancer names are no longer enforced by Helm charts or Tanka files. Existing clusters will retain their current names, while new ones will use names automatically generated by AWS.
* The Grafana version deployed by Helm charts or Tanka files has been upgraded to 13.0. Be sure to read the Grafana changelog for the changes since your current version.

## Minimal database schema version

Expand Down
115 changes: 110 additions & 5 deletions cmds/core-service/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ import (
"github.com/interuss/dss/pkg/versioning"
"github.com/interuss/stacktrace"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/metric"
"go.uber.org/zap"
)

Expand All @@ -52,8 +54,9 @@ var (
logLevel = flag.String("log_level", logging.DefaultLevel.String(), "The log level")
dumpRequests = flag.Bool("dump_requests", false, "Log full HTTP request and response (note: will dump sensitive information to logs; intended only for debugging and/or development)")
profServiceName = flag.String("gcp_prof_service_name", "", "Service name for the Go profiler")
enableOpenTelemetry = flag.Bool("enable_opentelemetry", false, "Enable OpenTelemetry, including traces and activation metric endpoint")
metricsListeningAddress = flag.String("metrics_addr", ":8079", "Address and port that the OpenTelemetry prometheus service binds to and listens on for incoming connections")
enableMetrics = flag.Bool("enable_metrics", false, "Enable metric endpoint")
enableTracing = flag.Bool("enable_tracing", false, "Enable tracing")
// metricsListeningAddress is the bind address of the Prometheus-compatible metrics endpoint.
metricsListeningAddress = flag.String("metrics_addr", ":8079", "Address and port that the Prometheus-compatible metrics service binds to and listens on for incoming connections")

pkFile = flag.String("public_key_files", "", "Path to public Keys to use for JWT decoding, separated by commas.")
jwksEndpoint = flag.String("jwks_endpoint", "", "URL pointing to an endpoint serving JWKS")
Expand Down Expand Up @@ -117,6 +120,14 @@ func createRIDServers(ctx context.Context, locality string, logger *zap.Logger)
return nil, nil, stacktrace.Propagate(err, "Unable to interact with store")
}

if *enableMetrics {
err = registerRIDMetrics(ctx, ridStore)

if err != nil {
return nil, nil, stacktrace.Propagate(err, "Unable to setup metrics")
}
}

app := application.NewFromTransactor(ridStore, logger)
return &rid_v1.Server{
App: app,
Expand All @@ -136,13 +147,107 @@ func createSCDServer(ctx context.Context, logger *zap.Logger) (*scd.Server, erro
return nil, err
}

if *enableMetrics {
err = registerSCDMetrics(ctx, scdStore)

if err != nil {
return nil, stacktrace.Propagate(err, "Unable to setup metrics")
}
}

return &scd.Server{
Store: scdStore,
DSSReportHandler: &scd.JSONLoggingReceivedReportHandler{ReportLogger: logger},
AllowHTTPBaseUrls: *allowHTTPBaseUrls,
}, nil
}

// registerRIDMetrics registers observable gauges on the "rid" meter that
// report how many RID subscriptions and identification service areas the
// given store holds.
//
// Every gauge callback is wrapped with newCachedGauge and obtains a repository
// through store.Interact each time it actually queries the store. Gauges are
// registered in declaration order; the first registration error is returned
// and stops the process.
func registerRIDMetrics(ctx context.Context, store rids.Store) error {
	gauges := []struct {
		name        string
		description string
		fetch       func(context.Context) (int64, error)
	}{
		{
			name:        "rid_subscriptions_count",
			description: "Number of rid subscriptions",
			fetch: func(ctx context.Context) (int64, error) {
				repo, err := store.Interact(ctx)
				if err != nil {
					return 0, stacktrace.Propagate(err, "Unable to interact with store")
				}
				n, err := repo.CountSubscriptions(ctx)
				return int64(n), err
			},
		},
		{
			name:        "rid_identification_service_areas_count",
			description: "Number of rid ISAs",
			fetch: func(ctx context.Context) (int64, error) {
				repo, err := store.Interact(ctx)
				if err != nil {
					return 0, stacktrace.Propagate(err, "Unable to interact with store")
				}
				n, err := repo.CountISAs(ctx)
				return int64(n), err
			},
		},
	}

	meter := otel.Meter("rid")
	for _, g := range gauges {
		_, err := meter.Int64ObservableGauge(
			g.name,
			metric.WithDescription(g.description),
			metric.WithInt64Callback(newCachedGauge(g.fetch)),
		)
		if err != nil {
			return err
		}
	}
	return nil
}

// registerSCDMetrics registers observable gauges on the "scd" meter that
// report how many SCD subscriptions, operational intents and constraints the
// given store holds.
//
// Every gauge callback is wrapped with newCachedGauge and obtains a repository
// through store.Interact each time it actually queries the store. Gauges are
// registered in declaration order; the first registration error is returned
// and stops the process.
func registerSCDMetrics(ctx context.Context, store scds.Store) error {
	gauges := []struct {
		name        string
		description string
		fetch       func(context.Context) (int64, error)
	}{
		{
			name:        "scd_subscriptions_count",
			description: "Number of scd subscriptions",
			fetch: func(ctx context.Context) (int64, error) {
				repo, err := store.Interact(ctx)
				if err != nil {
					return 0, stacktrace.Propagate(err, "Unable to interact with store")
				}
				n, err := repo.CountSubscriptions(ctx)
				return int64(n), err
			},
		},
		{
			name:        "scd_operational_intents_count",
			description: "Number of scd operational intents",
			fetch: func(ctx context.Context) (int64, error) {
				repo, err := store.Interact(ctx)
				if err != nil {
					return 0, stacktrace.Propagate(err, "Unable to interact with store")
				}
				n, err := repo.CountOperationalIntents(ctx)
				return int64(n), err
			},
		},
		{
			name:        "scd_constraints_count",
			description: "Number of scd constraints",
			fetch: func(ctx context.Context) (int64, error) {
				repo, err := store.Interact(ctx)
				if err != nil {
					return 0, stacktrace.Propagate(err, "Unable to interact with store")
				}
				n, err := repo.CountConstraints(ctx)
				return int64(n), err
			},
		},
	}

	meter := otel.Meter("scd")
	for _, g := range gauges {
		_, err := meter.Int64ObservableGauge(
			g.name,
			metric.WithDescription(g.description),
			metric.WithInt64Callback(newCachedGauge(g.fetch)),
		)
		if err != nil {
			return err
		}
	}
	return nil
}

// RunHTTPServer starts the DSS HTTP server.
func RunHTTPServer(ctx context.Context, ctxCanceler func(), address, locality string) error {
logger := logging.WithValuesFromContext(ctx, logging.Logger).With(zap.String("address", address))
Expand Down Expand Up @@ -234,7 +339,7 @@ func RunHTTPServer(ctx context.Context, ctxCanceler func(), address, locality st
handler = authorizer.TokenMiddleware(handler)
handler = timeoutMiddleware(*timeout, handler)

if *enableOpenTelemetry {
if *enableMetrics || *enableTracing {
// We use the default settings; the APIRouter handler will override the span value accordingly, as it has more information.
handler = otelhttp.NewHandler(handler, "http")
}
Expand Down Expand Up @@ -341,8 +446,8 @@ func main() {
}

// Set up OpenTelemetry.
if *enableOpenTelemetry {
otelShutdown, err := setupOTelSDK(ctx, *metricsListeningAddress)
if *enableMetrics || *enableTracing {
otelShutdown, err := setupOTelSDK(ctx, *enableMetrics, *enableTracing, *metricsListeningAddress)
if err != nil {
logger.Panic("Failed to initialize OpenTelemetry", zap.Error(err))
}
Expand Down
73 changes: 61 additions & 12 deletions cmds/core-service/otel.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,15 @@ import (
"context"
"errors"
"net/http"
"sync"
"time"

"github.com/interuss/dss/pkg/logging"
"github.com/prometheus/client_golang/prometheus/promhttp"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
"go.opentelemetry.io/otel/exporters/prometheus"
ometric "go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource"
Expand All @@ -20,28 +23,43 @@ import (

// setupOTelSDK bootstraps the OpenTelemetry pipeline.
// If it does not return an error, make sure to call shutdown for proper cleanup.
func setupOTelSDK(ctx context.Context, metricsListeningAddress string) (func(context.Context) error, error) {
func setupOTelSDK(ctx context.Context, enableMetrics bool, enableTracing bool, metricsListeningAddress string) (func(context.Context) error, error) {

// Set up propagator.
prop := newPropagator()
otel.SetTextMapPropagator(prop)

// Set up trace provider.
tracerProvider, err := newTracerProvider(ctx)
if err != nil {
return nil, err
var tracerProvider *trace.TracerProvider
var meterProvider *metric.MeterProvider

if enableTracing {
// Set up trace provider.
tracerProvider, err := newTracerProvider(ctx)
if err != nil {
return nil, err
}
otel.SetTracerProvider(tracerProvider)
}
otel.SetTracerProvider(tracerProvider)

// Set up metrics exporter
meterProvider, err := newMeterProvider(ctx, metricsListeningAddress)
if err != nil {
return nil, err
if enableMetrics {
// Set up metrics exporter
meterProvider, err := newMeterProvider(ctx, metricsListeningAddress)
if err != nil {
return nil, err
}
otel.SetMeterProvider(meterProvider)
}
otel.SetMeterProvider(meterProvider)

shutdown := func(ctx context.Context) error {
return errors.Join(tracerProvider.Shutdown(ctx), meterProvider.Shutdown(ctx))
if tracerProvider != nil && meterProvider != nil {
return errors.Join(tracerProvider.Shutdown(ctx), meterProvider.Shutdown(ctx))
} else if tracerProvider != nil {
return tracerProvider.Shutdown(ctx)
} else if meterProvider != nil {
return meterProvider.Shutdown(ctx)
} else {
return nil
}
}
return shutdown, nil
}
Expand Down Expand Up @@ -104,3 +122,34 @@ func serveMetrics(ctx context.Context, listeningAddress string) {
}
logger.Info("Prometheus endpoint started", zap.String("listeningAddress", listeningAddress))
}

// cachedGauge memoizes the result of an int64 gauge query for a limited time,
// so that observations made within ttl of the last successful fetch reuse the
// stored value instead of calling fetch again.
type cachedGauge struct {
	mu        sync.Mutex                            // guards last and fetchedAt
	last      int64                                 // value returned by the most recent successful fetch
	fetchedAt time.Time                             // when last was stored; zero until the first success
	ttl       time.Duration                         // how long last may be reused
	fetch     func(context.Context) (int64, error) // produces a fresh value
}

// Observe reports the gauge value to o. The value is refreshed through fetch
// when the cached one is at least ttl old; if that refresh fails, the error is
// returned, nothing is observed and the cache is left untouched. The mutex is
// held for the whole call, including the fetch.
func (g *cachedGauge) Observe(ctx context.Context, o ometric.Int64Observer) error {
	g.mu.Lock()
	defer g.mu.Unlock()

	if expired := time.Since(g.fetchedAt) >= g.ttl; expired {
		fresh, err := g.fetch(ctx)
		if err != nil {
			return err
		}
		g.last, g.fetchedAt = fresh, time.Now()
	}

	o.Observe(g.last)
	return nil
}

// newCachedGauge wraps fetch in a cachedGauge with a one-second ttl and
// returns its Observe method as an Int64Callback.
func newCachedGauge(fetch func(context.Context) (int64, error)) ometric.Int64Callback {
	return (&cachedGauge{fetch: fetch, ttl: time.Second}).Observe
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ resource "local_file" "helm_chart_values" {
publicEndpoint = "https://${var.app_hostname}"
enableScd = var.enable_scd
enableScdGlobalLock = var.enable_scd_global_lock
enableDssMetrics = var.enable_dss_metrics
locality = "zone=${var.locality}"

evict = {
Expand Down Expand Up @@ -274,6 +275,7 @@ resource "local_file" "helm_chart_values" {
publicEndpoint = "https://${var.app_hostname}"
enableScd = var.enable_scd
enableScdGlobalLock = var.enable_scd_global_lock
enableDssMetrics = var.enable_dss_metrics
locality = "zone=${var.locality}"

evict = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ resource "local_file" "tanka_config_main" {
VAR_CLUSTER_CONTEXT = var.kubernetes_context_name
VAR_ENABLE_SCD = var.enable_scd
VAR_ENABLE_SCD_GLOBAL_LOCK = var.enable_scd_global_lock
VAR_ENABLE_DSS_METRICS = var.enable_dss_metrics
VAR_DB_HOSTNAME_SUFFIX = var.db_hostname_suffix
VAR_LOCALITY = var.locality
VAR_DATASTORE = var.datastore_type
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ local metadata = metadataBase {
single_cluster: false,
enableScd: ${VAR_ENABLE_SCD}, // <-- This boolean value is VAR_ENABLE_SCD
enableScdGlobalLock: ${VAR_ENABLE_SCD_GLOBAL_LOCK}, // <-- This boolean value is VAR_ENABLE_SCD_GLOBAL_LOCK
enableDssMetrics: ${VAR_ENABLE_DSS_METRICS}, // <-- This boolean value is VAR_ENABLE_DSS_METRICS
datastore: '${VAR_DATASTORE}',
locality: '${VAR_LOCALITY}',
cockroach+: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -488,3 +488,16 @@ variable "enable_monitoring" {
}


# Boolean feature toggle for the DSS Prometheus metrics; disabled by default.
variable "enable_dss_metrics" {
type = bool
default = false
description = <<-EOT
Enable DSS's prometheus metric.

Require DSS version to be at least 0.23.0.

Example: `true`
EOT
}


6 changes: 6 additions & 0 deletions deploy/infrastructure/modules/terraform-aws-dss/TFVARS.gen.md
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,12 @@ Use <code>latest</code> to use the latest schema version.</p>
Use <code>latest</code> to use the latest schema version.</p>
<p>Example: <code>3.1.0</code></p>
<br/>Default value: <code>"latest"</code></td>
</tr><tr>
<td>enable_dss_metrics (<code>bool</code>)</td>
<td><p>Enable DSS's prometheus metric.</p>
<p>Require DSS version to be at least 0.23.0.</p>
<p>Example: <code>true</code></p>
<br/>Default value: <code>false</code></td>
</tr><tr>
<td>enable_monitoring (<code>bool</code>)</td>
<td><p>Set to true to enable monitoring stack with prometheus / grafana.</p>
Expand Down
1 change: 1 addition & 0 deletions deploy/infrastructure/modules/terraform-aws-dss/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ module "terraform-commons-dss" {
enable_monitoring = var.enable_monitoring
enable_scd = var.enable_scd
enable_scd_global_lock = var.enable_scd_global_lock
enable_dss_metrics = var.enable_dss_metrics
prometheus_hostname = var.prometheus_hostname
ip_prometheus = module.terraform-aws-kubernetes.ip_prometheus

Expand Down
13 changes: 13 additions & 0 deletions deploy/infrastructure/modules/terraform-aws-dss/variables.gen.tf
Original file line number Diff line number Diff line change
Expand Up @@ -602,3 +602,16 @@ variable "enable_monitoring" {
}


# Boolean feature toggle for the DSS Prometheus metrics; disabled by default.
# NOTE(review): this lives in a *.gen.tf file, which is presumably generated —
# confirm whether the change belongs in the generator's source definition.
variable "enable_dss_metrics" {
type = bool
default = false
description = <<-EOT
Enable DSS's prometheus metric.

Require DSS version to be at least 0.23.0.

Example: `true`
EOT
}


Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,12 @@ Use <code>latest</code> to use the latest schema version.</p>
Use <code>latest</code> to use the latest schema version.</p>
<p>Example: <code>3.1.0</code></p>
<br/>Default value: <code>"latest"</code></td>
</tr><tr>
<td>enable_dss_metrics (<code>bool</code>)</td>
<td><p>Enable DSS's prometheus metric.</p>
<p>Require DSS version to be at least 0.23.0.</p>
<p>Example: <code>true</code></p>
<br/>Default value: <code>false</code></td>
</tr><tr>
<td>enable_monitoring (<code>bool</code>)</td>
<td><p>Set to true to enable monitoring stack with prometheus / grafana.</p>
Expand Down
1 change: 1 addition & 0 deletions deploy/infrastructure/modules/terraform-google-dss/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ module "terraform-commons-dss" {
enable_monitoring = var.enable_monitoring
enable_scd = var.enable_scd
enable_scd_global_lock = var.enable_scd_global_lock
enable_dss_metrics = var.enable_dss_metrics
prometheus_hostname = var.prometheus_hostname
ip_prometheus = module.terraform-google-kubernetes.ip_prometheus

Expand Down
Loading
Loading