feat(sqs/monitoring): per-queue depth gauges + Grafana dashboard #743
Merged
10 commits
5f16a7e (bootjp) feat(sqs/monitoring): per-queue depth gauges + Grafana dashboard
1911cc1 (bootjp) monitoring/sqs: ForgetQueue frees cardinality slot + dashboard fix (P…
33dca0c (bootjp) adapter/sqs: single readTS per depth-snapshot tick (PR #743 r2)
e40ad3b (bootjp) monitoring/sqs: split counter / depth cardinality budgets (PR #743 r3)
9738f75 (bootjp) monitoring/sqs: drop _other gauge when overflow set empties (PR #743 r4)
7b5ab13 (bootjp) monitoring/sqs: clean overflow set on real-label promotion (PR #743 r5)
1651db7 (bootjp) monitoring/sqs: distinguish skip-tick from empty snapshot (PR #743 r6)
dd8a3c3 (bootjp) monitoring/sqs: share one readTS across depth-scan + counter reads (P…
fd3a3b0 (bootjp) adapter/sqs: log loadQueueMetaAt errors symmetrically (PR #743 r8)
eab2099 (bootjp) monitoring/sqs: reclaim depth slots before admitting new queues (PR #…
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,119 @@ | ||
| package adapter | ||
| | ||
| import ( | ||
| "context" | ||
| "log/slog" | ||
| ) | ||
| | ||
| // SQSQueueDepth is one queue's depth-attribute snapshot, the unit | ||
| // the SQSServer hands to monitoring.SQSObserver on each tick. The | ||
| // fields mirror sqsApproxCounters byte-for-byte and the public | ||
| // AdminQueueCounters JSON shape — operators see consistent numbers | ||
| // in dashboards and the admin SPA. | ||
| type SQSQueueDepth struct { | ||
| Queue string | ||
| Visible int64 | ||
| NotVisible int64 | ||
| Delayed int64 | ||
| } | ||
| | ||
| // SnapshotQueueDepths satisfies monitoring.SQSDepthSource. The | ||
| // observer Start loop calls this on every tick. | ||
| // | ||
| // Returns: | ||
| // | ||
| // - (snaps, true) — leader, scrape OK. Observer writes snaps to | ||
| // the gauges and diffs against the previous tick (forgetting | ||
| // any queue that disappeared from this snapshot). | ||
| // - (nil, true) — this node is a follower (leader-only emission | ||
| // keeps gauges consistent with AdminListQueues / AdminDescribeQueue | ||
| // at the same instant — follower scans would race the leader's | ||
| // writes). Empty-but-OK so the observer ForgetQueue's any | ||
| // gauges this node was emitting before stepping down. | ||
| // - (nil, false) — leader, but scrape failed (transient | ||
| // catalog-read error or ctx cancel mid-scan). Tells the | ||
| // observer to skip this tick: leave existing gauges in place | ||
| // rather than wiping every depth series — a single failed | ||
| // scrape would otherwise dashboard-render as a false "all | ||
| // queues drained" event until the next successful tick. | ||
| // | ||
| // Per-queue scan errors (loadQueueMetaAt / scanApproxCounters) | ||
| // remain handled in-line by snapshotOneQueueDepth: the offending | ||
| // queue is dropped from this tick's snapshot but ok stays true, | ||
| // so the observer ForgetQueue's just that one queue's gauges. | ||
| // Only a top-level scanQueueNames failure (which would silently | ||
| // turn into "no queues anywhere") flips ok to false. | ||
| func (s *SQSServer) SnapshotQueueDepths(ctx context.Context) ([]SQSQueueDepth, bool) { | ||
| if s == nil || s.coordinator == nil || s.store == nil || !s.coordinator.IsLeader() { | ||
| return nil, true | ||
| } | ||
| // Take ONE read timestamp up front and pass it through both the | ||
| // membership scan and the per-queue counter scans. With separate | ||
| // timestamps the membership read and the per-queue reads land on | ||
| // different MVCC views, so a queue created or deleted between | ||
| // them would be silently missed (or reported with stale | ||
| // counters) for one tick — and the observer's "current vs | ||
| // previous" diff would then ForgetQueue it spuriously, dashboard- | ||
| // rendering as a phantom drop. One ts per tick is also lighter | ||
| // on the leader's HLC than two. | ||
| readTS := s.nextTxnReadTS(ctx) | ||
| names, err := s.scanQueueNamesAt(ctx, readTS) | ||
| if err != nil { | ||
| slog.Warn("sqs depth snapshot: scanQueueNamesAt failed", "err", err) | ||
| return nil, false | ||
| } | ||
| out := make([]SQSQueueDepth, 0, len(names)) | ||
| for _, name := range names { | ||
| if err := ctx.Err(); err != nil { | ||
| // ctx cancel mid-iteration: partial snapshot is | ||
| // useless because the observer would diff against it | ||
| // and ForgetQueue everything we hadn't reached yet. | ||
| // Signal skip-tick instead. | ||
| return nil, false | ||
| } | ||
| if snap, ok := s.snapshotOneQueueDepth(ctx, name, readTS); ok { | ||
| out = append(out, snap) | ||
| } | ||
| } | ||
| return out, true | ||
| } | ||
| | ||
| // snapshotOneQueueDepth runs the per-queue catalog read pair | ||
| // (loadQueueMetaAt + scanApproxCounters) at the caller-supplied | ||
| // readTS and returns the resulting snapshot. Pulled out of the | ||
| // loop body so SnapshotQueueDepths stays under the cyclop budget; | ||
| // ok=false means "skip this queue from this tick" (queue gone, | ||
| // transient catalog read failure). Per-queue scan errors are | ||
| // logged and the offending queue is dropped from this tick's | ||
| // snapshot rather than aborting the entire pass. | ||
| func (s *SQSServer) snapshotOneQueueDepth(ctx context.Context, name string, readTS uint64) (SQSQueueDepth, bool) { | ||
| meta, exists, err := s.loadQueueMetaAt(ctx, name, readTS) | ||
| if err != nil { | ||
| // Log meta-read errors symmetrically with the | ||
| // scanApproxCounters branch below — both the function-level | ||
| // comment and the SnapshotQueueDepths contract state that | ||
| // per-queue scan errors are logged. A flapping catalog read | ||
| // would otherwise show up as a dashboard gauge gap with | ||
| // nothing in the logs to explain it. | ||
| slog.Warn("sqs depth snapshot: loadQueueMetaAt failed", "queue", name, "err", err) | ||
| return SQSQueueDepth{}, false | ||
| } | ||
| if !exists { | ||
| // Queue is genuinely gone (DeleteQueue tombstoned, generation | ||
| // fully drained). No log: the observer's diff will | ||
| // ForgetQueue this gauge on its own and a stable steady-state | ||
| // "queue no longer exists" is not an error worth alerting on. | ||
| return SQSQueueDepth{}, false | ||
| } | ||
| counters, err := s.scanApproxCounters(ctx, name, meta.Generation, readTS) | ||
| if err != nil { | ||
| slog.Warn("sqs depth snapshot: counters failed", "queue", name, "err", err) | ||
| return SQSQueueDepth{}, false | ||
| } | ||
| return SQSQueueDepth{ | ||
| Queue: name, | ||
| Visible: counters.Visible, | ||
| NotVisible: counters.NotVisible, | ||
| Delayed: counters.Delayed, | ||
| }, true | ||
| } | ||
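
The three-way return contract documented on SnapshotQueueDepths is easiest to follow from the consumer's side. The following is a minimal sketch of how an observer tick might apply it, not the actual monitoring.SQSObserver code from this PR. The names `QueueDepth`, `gaugeSet`, and `applyTick` are hypothetical, and a plain map stands in for the real gauges.

```go
package main

import "sort"

// QueueDepth mirrors the SQSQueueDepth fields shown in the diff.
// The type name is hypothetical and used only for this sketch.
type QueueDepth struct {
	Queue      string
	Visible    int64
	NotVisible int64
	Delayed    int64
}

// gaugeSet is a hypothetical stand-in for the per-queue gauges:
// one entry per queue that currently has a depth series.
type gaugeSet map[string]QueueDepth

// applyTick applies one SnapshotQueueDepths-style result to the gauges
// and returns the (sorted) names of queues whose gauges were forgotten.
//
//   - ok == false: skip-tick, the gauges are left exactly as they were.
//   - ok == true:  snaps is authoritative; every queue missing from it
//     is forgotten (an empty or nil snaps forgets everything).
func applyTick(gauges gaugeSet, snaps []QueueDepth, ok bool) []string {
	if !ok {
		// A failed scrape must not look like "all queues drained".
		return nil
	}
	seen := make(map[string]struct{}, len(snaps))
	for _, s := range snaps {
		gauges[s.Queue] = s
		seen[s.Queue] = struct{}{}
	}
	// Diff against the previous tick: anything not seen is forgotten.
	var forgotten []string
	for name := range gauges {
		if _, present := seen[name]; !present {
			forgotten = append(forgotten, name)
		}
	}
	sort.Strings(forgotten)
	for _, name := range forgotten {
		delete(gauges, name)
	}
	return forgotten
}
```

Under these assumptions, a tick that returns `(nil, false)` leaves every existing series in place, while `(nil, true)` from a node that has stepped down clears all of them. That is the distinction the r6 commit ("distinguish skip-tick from empty snapshot") describes.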