Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion adapter/sqs_catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -1219,10 +1219,22 @@ func resolveListQueuesStart(names []string, token string) int {
}

func (s *SQSServer) scanQueueNames(ctx context.Context) ([]string, error) {
return s.scanQueueNamesAt(ctx, s.nextTxnReadTS(ctx))
}

// scanQueueNamesAt is scanQueueNames with the read timestamp passed
// in. SnapshotQueueDepths uses this so the membership scan and the
// per-queue counter reads share one MVCC view — without it, a queue
// created or deleted between the two HLC ticks would either be
// reported with stale counters or silently missed for the tick,
// producing spurious ForgetQueue calls in the observer's diff loop.
// Other callers (AdminListQueues, the reaper, the catalog walk)
// don't need cross-call consistency and continue to use the
// fresh-ts wrapper above.
func (s *SQSServer) scanQueueNamesAt(ctx context.Context, readTS uint64) ([]string, error) {
prefix := []byte(SqsQueueMetaPrefix)
end := prefixScanEnd(prefix)
start := bytes.Clone(prefix)
readTS := s.nextTxnReadTS(ctx)
var names []string
for {
kvs, err := s.store.ScanAt(ctx, start, end, sqsQueueScanPageLimit, readTS)
Expand Down
119 changes: 119 additions & 0 deletions adapter/sqs_depth_source.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package adapter

import (
"context"
"log/slog"
)

// SQSQueueDepth is one queue's depth-attribute snapshot, the unit
// the SQSServer hands to monitoring.SQSObserver on each tick. The
// fields mirror sqsApproxCounters byte-for-byte and the public
// AdminQueueCounters JSON shape — operators see consistent numbers
// in dashboards and the admin SPA.
type SQSQueueDepth struct {
Queue string
Visible int64
NotVisible int64
Delayed int64
}

// SnapshotQueueDepths satisfies monitoring.SQSDepthSource. The
// observer Start loop calls this on every tick.
//
// Returns:
//
// - (snaps, true) — leader, scrape OK. Observer writes snaps to
// the gauges and diffs against the previous tick (forgetting
// any queue that disappeared from this snapshot).
// - (nil, true) — this node is a follower (leader-only emission
// keeps gauges consistent with AdminListQueues / AdminDescribeQueue
// at the same instant — follower scans would race the leader's
// writes). Empty-but-OK so the observer ForgetQueue's any
// gauges this node was emitting before stepping down.
// - (nil, false) — leader, but scrape failed (transient
// catalog-read error or ctx cancel mid-scan). Tells the
// observer to skip this tick: leave existing gauges in place
// rather than wiping every depth series — a single failed
// scrape would otherwise dashboard-render as a false "all
// queues drained" event until the next successful tick.
//
// Per-queue scan errors (loadQueueMetaAt / scanApproxCounters)
// remain handled in-line by snapshotOneQueueDepth: the offending
// queue is dropped from this tick's snapshot but ok stays true,
// so the observer ForgetQueue's just that one queue's gauges.
// Only a top-level scanQueueNames failure (which would silently
// turn into "no queues anywhere") flips ok to false.
func (s *SQSServer) SnapshotQueueDepths(ctx context.Context) ([]SQSQueueDepth, bool) {
if s == nil || s.coordinator == nil || s.store == nil || !s.coordinator.IsLeader() {
return nil, true
}
// Take ONE read timestamp up front and pass it through both the
// membership scan and the per-queue counter scans. With separate
// timestamps the membership read and the per-queue reads land on
// different MVCC views, so a queue created or deleted between
// them would be silently missed (or reported with stale
// counters) for one tick — and the observer's "current vs
// previous" diff would then ForgetQueue it spuriously, dashboard-
// rendering as a phantom drop. One ts per tick is also lighter
// on the leader's HLC than two.
readTS := s.nextTxnReadTS(ctx)
names, err := s.scanQueueNamesAt(ctx, readTS)
if err != nil {
slog.Warn("sqs depth snapshot: scanQueueNamesAt failed", "err", err)
return nil, false
}
out := make([]SQSQueueDepth, 0, len(names))
for _, name := range names {
if err := ctx.Err(); err != nil {
// ctx cancel mid-iteration: partial snapshot is
// useless because the observer would diff against it
// and ForgetQueue everything we hadn't reached yet.
// Signal skip-tick instead.
return nil, false
}
if snap, ok := s.snapshotOneQueueDepth(ctx, name, readTS); ok {
out = append(out, snap)
}
}
return out, true
}

// snapshotOneQueueDepth runs the per-queue catalog read pair
// (loadQueueMetaAt + scanApproxCounters) at the caller-supplied
// readTS and returns the resulting snapshot. Pulled out of the
// loop body so SnapshotQueueDepths stays under the cyclop budget;
// ok=false means "skip this queue from this tick" (queue gone,
// transient catalog read failure). Per-queue scan errors are
// logged and the offending queue is dropped from this tick's
// snapshot rather than aborting the entire pass.
func (s *SQSServer) snapshotOneQueueDepth(ctx context.Context, name string, readTS uint64) (SQSQueueDepth, bool) {
meta, exists, err := s.loadQueueMetaAt(ctx, name, readTS)
if err != nil {
// Log meta-read errors symmetrically with the
// scanApproxCounters branch below — both the function-level
// comment and the SnapshotQueueDepths contract state that
// per-queue scan errors are logged. A flapping catalog read
// would otherwise show up as a dashboard gauge gap with
// nothing in the logs to explain it.
slog.Warn("sqs depth snapshot: loadQueueMetaAt failed", "queue", name, "err", err)
return SQSQueueDepth{}, false
}
if !exists {
// Queue is genuinely gone (DeleteQueue tombstoned, generation
// fully drained). No log: the observer's diff will
// ForgetQueue this gauge on its own and a stable steady-state
// "queue no longer exists" is not an error worth alerting on.
return SQSQueueDepth{}, false
}
counters, err := s.scanApproxCounters(ctx, name, meta.Generation, readTS)
if err != nil {
slog.Warn("sqs depth snapshot: counters failed", "queue", name, "err", err)
return SQSQueueDepth{}, false
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.
return SQSQueueDepth{
Queue: name,
Visible: counters.Visible,
NotVisible: counters.NotVisible,
Delayed: counters.Delayed,
}, true
}
28 changes: 14 additions & 14 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@ toolchain go1.26.2

require (
github.com/Jille/grpc-multi-resolver v1.3.0
github.com/aws/aws-sdk-go-v2 v1.41.6
github.com/aws/aws-sdk-go-v2/config v1.32.16
github.com/aws/aws-sdk-go-v2/credentials v1.19.15
github.com/aws/aws-sdk-go-v2 v1.41.7
github.com/aws/aws-sdk-go-v2/config v1.32.17
github.com/aws/aws-sdk-go-v2/credentials v1.19.16
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.57.2
github.com/aws/smithy-go v1.25.0
github.com/aws/smithy-go v1.25.1
github.com/cockroachdb/errors v1.12.0
github.com/cockroachdb/pebble/v2 v2.1.4
github.com/emirpasic/gods v1.18.1
Expand Down Expand Up @@ -40,17 +40,17 @@ require (
github.com/DataDog/zstd v1.5.7 // indirect
github.com/RaduBerinde/axisds v0.1.0 // indirect
github.com/RaduBerinde/btreemap v0.0.0-20250419174037-3d62b7205d54 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.22 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.22 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.22 // indirect
github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.23 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.8 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.23 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.23 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.23 // indirect
github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.24 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.9 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.11.22 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.22 // indirect
github.com/aws/aws-sdk-go-v2/service/signin v1.0.10 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.30.16 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.20 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.42.0 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.23 // indirect
github.com/aws/aws-sdk-go-v2/service/signin v1.0.11 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.30.17 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.21 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.42.1 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/cockroachdb/crlib v0.0.0-20241112164430-1264a2edc35b // indirect
Expand Down
56 changes: 28 additions & 28 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -8,38 +8,38 @@ github.com/RaduBerinde/btreemap v0.0.0-20250419174037-3d62b7205d54 h1:bsU8Tzxr/P
github.com/RaduBerinde/btreemap v0.0.0-20250419174037-3d62b7205d54/go.mod h1:0tr7FllbE9gJkHq7CVeeDDFAFKQVy5RnCSSNBOvdqbc=
github.com/aclements/go-perfevent v0.0.0-20240301234650-f7843625020f h1:JjxwchlOepwsUWcQwD2mLUAGE9aCp0/ehy6yCHFBOvo=
github.com/aclements/go-perfevent v0.0.0-20240301234650-f7843625020f/go.mod h1:tMDTce/yLLN/SK8gMOxQfnyeMeCg8KGzp0D1cbECEeo=
github.com/aws/aws-sdk-go-v2 v1.41.6 h1:1AX0AthnBQzMx1vbmir3Y4WsnJgiydmnJjiLu+LvXOg=
github.com/aws/aws-sdk-go-v2 v1.41.6/go.mod h1:dy0UzBIfwSeot4grGvY1AqFWN5zgziMmWGzysDnHFcQ=
github.com/aws/aws-sdk-go-v2/config v1.32.16 h1:Q0iQ7quUgJP0F/SCRTieScnaMdXr9h/2+wze1u3cNeM=
github.com/aws/aws-sdk-go-v2/config v1.32.16/go.mod h1:duCCnJEFqpt2RC6no1iK6q+8HpwOAkiUua0pY507dQc=
github.com/aws/aws-sdk-go-v2/credentials v1.19.15 h1:fyvgWTszojq8hEnMi8PPBTvZdTtEVmAVyo+NFLHBhH4=
github.com/aws/aws-sdk-go-v2/credentials v1.19.15/go.mod h1:gJiYyMOjNg8OEdRWOf3CrFQxM2a98qmrtjx1zuiQfB8=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.22 h1:IOGsJ1xVWhsi+ZO7/NW8OuZZBtMJLZbk4P5HDjJO0jQ=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.22/go.mod h1:b+hYdbU+jGKfXE8kKM6g1+h+L/Go3vMvzlxBsiuGsxg=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.22 h1:GmLa5Kw1ESqtFpXsx5MmC84QWa/ZrLZvlJGa2y+4kcQ=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.22/go.mod h1:6sW9iWm9DK9YRpRGga/qzrzNLgKpT2cIxb7Vo2eNOp0=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.22 h1:dY4kWZiSaXIzxnKlj17nHnBcXXBfac6UlsAx2qL6XrU=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.22/go.mod h1:KIpEUx0JuRZLO7U6cbV204cWAEco2iC3l061IxlwLtI=
github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.23 h1:FPXsW9+gMuIeKmz7j6ENWcWtBGTe1kH8r9thNt5Uxx4=
github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.23/go.mod h1:7J8iGMdRKk6lw2C+cMIphgAnT8uTwBwNOsGkyOCm80U=
github.com/aws/aws-sdk-go-v2 v1.41.7 h1:DWpAJt66FmnnaRIOT/8ASTucrvuDPZASqhhLey6tLY8=
github.com/aws/aws-sdk-go-v2 v1.41.7/go.mod h1:4LAfZOPHNVNQEckOACQx60Y8pSRjIkNZQz1w92xpMJc=
github.com/aws/aws-sdk-go-v2/config v1.32.17 h1:FpL4/758/diKwqbytU0prpuiu60fgXKUWCpDJtApclU=
github.com/aws/aws-sdk-go-v2/config v1.32.17/go.mod h1:OXqUMzgXytfoF9JaKkhrOYsyh72t9G+MJH8mMRaexOE=
github.com/aws/aws-sdk-go-v2/credentials v1.19.16 h1:r3RJBuU7X9ibt8RHbMjWE6y60QbKBiII6wSrXnapxSU=
github.com/aws/aws-sdk-go-v2/credentials v1.19.16/go.mod h1:6cx7zqDENJDbBIIWX6P8s0h6hqHC8Avbjh9Dseo27ug=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.23 h1:UuSfcORqNSz/ey3VPRS8TcVH2Ikf0/sC+Hdj400QI6U=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.23/go.mod h1:+G/OSGiOFnSOkYloKj/9M35s74LgVAdJBSD5lsFfqKg=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.23 h1:GpT/TrnBYuE5gan2cZbTtvP+JlHsutdmlV2YfEyNde0=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.23/go.mod h1:xYWD6BS9ywC5bS3sz9Xh04whO/hzK2plt2Zkyrp4JuA=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.23 h1:bpd8vxhlQi2r1hiueOw02f/duEPTMK59Q4QMAoTTtTo=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.23/go.mod h1:15DfR2nw+CRHIk0tqNyifu3G1YdAOy68RftkhMDDwYk=
github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.24 h1:OQqn11BtaYv1WLUowvcA30MpzIu8Ti4pcLPIIyoKZrA=
github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.24/go.mod h1:X5ZJyfwVrWA96GzPmUCWFQaEARPR7gCrpq2E92PJwAE=
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.57.2 h1:J2ibOhlMLx1o6QwDFsHHfbQjaZ6t5LXodiLNuK6jbZA=
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.57.2/go.mod h1:Tj8VcffnduuewrM8HN8xQ9wzzez0CJ0FGSGEovq7Sgs=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.8 h1:HtOTYcbVcGABLOVuPYaIihj6IlkqubBwFj10K5fxRek=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.8/go.mod h1:VsK9abqQeGlzPgUr+isNWzPlK2vKe9INMLWnY65f5Xs=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.9 h1:FLudkZLt5ci0ozzgkVo8BJGwvqNaZbTWb3UcucAateA=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.9/go.mod h1:w7wZ/s9qK7c8g4al+UyoF1Sp/Z45UwMGcqIzLWVQHWk=
github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.11.22 h1:8IXbJCgOn8ztzvRUOm27iCeTSxmPW45JsSDW3EGi16M=
github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.11.22/go.mod h1:l53RbOWvncp4DEmlEz6dSXJS913AIxtFqkJZ+Xz7pHs=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.22 h1:PUmZeJU6Y1Lbvt9WFuJ0ugUK2xn6hIWUBBbKuOWF30s=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.22/go.mod h1:nO6egFBoAaoXze24a2C0NjQCvdpk8OueRoYimvEB9jo=
github.com/aws/aws-sdk-go-v2/service/signin v1.0.10 h1:a1Fq/KXn75wSzoJaPQTgZO0wHGqE9mjFnylnqEPTchA=
github.com/aws/aws-sdk-go-v2/service/signin v1.0.10/go.mod h1:p6+MXNxW7IA6dMgHfTAzljuwSKD0NCm/4lbS4t6+7vI=
github.com/aws/aws-sdk-go-v2/service/sso v1.30.16 h1:x6bKbmDhsgSZwv6q19wY/u3rLk/3FGjJWyqKcIRufpE=
github.com/aws/aws-sdk-go-v2/service/sso v1.30.16/go.mod h1:CudnEVKRtLn0+3uMV0yEXZ+YZOKnAtUJ5DmDhilVnIw=
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.20 h1:oK/njaL8GtyEihkWMD4k3VgHCT64RQKkZwh0DG5j8ak=
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.20/go.mod h1:JHs8/y1f3zY7U5WcuzoJ/yAYGYtNIVPKLIbp61euvmg=
github.com/aws/aws-sdk-go-v2/service/sts v1.42.0 h1:ks8KBcZPh3PYISr5dAiXCM5/Thcuxk8l+PG4+A0exds=
github.com/aws/aws-sdk-go-v2/service/sts v1.42.0/go.mod h1:pFw33T0WLvXU3rw1WBkpMlkgIn54eCB5FYLhjDc9Foo=
github.com/aws/smithy-go v1.25.0 h1:Sz/XJ64rwuiKtB6j98nDIPyYrV1nVNJ4YU74gttcl5U=
github.com/aws/smithy-go v1.25.0/go.mod h1:YE2RhdIuDbA5E5bTdciG9KrW3+TiEONeUWCqxX9i1Fc=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.23 h1:pbrxO/kuIwgEsOPLkaHu0O+m4fNgLU8B3vxQ+72jTPw=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.23/go.mod h1:/CMNUqoj46HpS3MNRDEDIwcgEnrtZlKRaHNaHxIFpNA=
github.com/aws/aws-sdk-go-v2/service/signin v1.0.11 h1:TdJ+HdzOBhU8+iVAOGUTU63VXopcumCOF1paFulHWZc=
github.com/aws/aws-sdk-go-v2/service/signin v1.0.11/go.mod h1:R82ZRExE/nheo0N+T8zHPcLRTcH8MGsnR3BiVGX0TwI=
github.com/aws/aws-sdk-go-v2/service/sso v1.30.17 h1:7byT8HUWrgoRp6sXjxtZwgOKfhss5fW6SkLBtqzgRoE=
github.com/aws/aws-sdk-go-v2/service/sso v1.30.17/go.mod h1:xNWknVi4Ezm1vg1QsB/5EWpAJURq22uqd38U8qKvOJc=
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.21 h1:+1Kl1zx6bWi4X7cKi3VYh29h8BvsCoHQEQ6ST9X8w7w=
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.21/go.mod h1:4vIRDq+CJB2xFAXZ+YgGUTiEft7oAQlhIs71xcSeuVg=
github.com/aws/aws-sdk-go-v2/service/sts v1.42.1 h1:F/M5Y9I3nwr2IEpshZgh1GeHpOItExNM9L1euNuh/fk=
github.com/aws/aws-sdk-go-v2/service/sts v1.42.1/go.mod h1:mTNxImtovCOEEuD65mKW7DCsL+2gjEH+RPEAexAzAio=
github.com/aws/smithy-go v1.25.1 h1:J8ERsGSU7d+aCmdQur5Txg6bVoYelvQJgtZehD12GkI=
github.com/aws/smithy-go v1.25.1/go.mod h1:YE2RhdIuDbA5E5bTdciG9KrW3+TiEONeUWCqxX9i1Fc=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
Expand Down
64 changes: 64 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -1126,6 +1126,63 @@ func startMonitoringCollectors(ctx context.Context, reg *monitoring.Registry, ru
}
}

// startSQSDepthObserver wires the SQS adapter (when enabled on this
// node) into the monitoring registry's SQSObserver so the
// elastickv_sqs_queue_messages gauges start updating. Mirrors the
// Raft / Redis pattern: the source is plugged in once after startup,
// then the observer owns the ticker. nil sqsServer (e.g.
// --sqsAddress empty on this node) is a no-op.
//
// The thin adapter exists because monitoring.SQSQueueDepth and
// adapter.SQSQueueDepth are intentionally distinct types — having
// the adapter import monitoring would invert the dependency
// direction (every adapter would then know about Prometheus).
// Conversion is a fixed 4-field copy and a shape mismatch surfaces
// at compile time here, not at runtime on the metrics path.
func startSQSDepthObserver(ctx context.Context, reg *monitoring.Registry, sqsServer *adapter.SQSServer) {
if reg == nil || sqsServer == nil {
return
}
if observer := reg.SQSObserver(); observer != nil {
observer.Start(ctx, sqsDepthSourceAdapter{inner: sqsServer}, 0)
}
}

// sqsDepthSourceAdapter bridges *adapter.SQSServer (which returns
// []adapter.SQSQueueDepth) to monitoring.SQSDepthSource (which
// expects []monitoring.SQSQueueDepth). Same shape both sides; the
// loop is a fixed-size copy.
type sqsDepthSourceAdapter struct {
inner *adapter.SQSServer
}

func (a sqsDepthSourceAdapter) SnapshotQueueDepths(ctx context.Context) ([]monitoring.SQSQueueDepth, bool) {
if a.inner == nil {
// Empty-but-OK: nothing to emit. Mirrors the
// follower / nil-receiver case of the underlying source.
return nil, true
}
snaps, ok := a.inner.SnapshotQueueDepths(ctx)
if !ok {
// Propagate skip-tick verbatim so the observer leaves
// existing gauges alone on a transient scan failure.
return nil, false
}
if len(snaps) == 0 {
return nil, true
}
out := make([]monitoring.SQSQueueDepth, len(snaps))
for i, s := range snaps {
out[i] = monitoring.SQSQueueDepth{
Queue: s.Queue,
Visible: s.Visible,
NotVisible: s.NotVisible,
Delayed: s.Delayed,
}
}
return out, true
}

// writeConflictMonitorSources extracts the MVCC stores that expose
// per-(kind, key_prefix) OCC conflict counters so monitoring can poll
// them for the elastickv_store_write_conflict_total metric. Every
Expand Down Expand Up @@ -1551,6 +1608,13 @@ func (r *runtimeServerRunner) start() error {
return waitErrgroupAfterStartupFailure(r.cancel, r.eg, err)
}
r.sqsServer = sqsServer
// Plug the SQS adapter into the monitoring registry's depth
// observer (see startSQSDepthObserver). nil sqsServer (e.g.
// --sqsAddress empty on this node) is a no-op so single-binary
// tests don't need to construct a fake source.
if r.sqsServer != nil {
startSQSDepthObserver(r.ctx, r.metricsRegistry, r.sqsServer)
}
if err := startMetricsServer(r.ctx, r.lc, r.eg, r.metricsAddress, r.metricsToken, r.metricsRegistry.Handler()); err != nil {
return waitErrgroupAfterStartupFailure(r.cancel, r.eg, err)
}
Expand Down
Loading
Loading