feat(sqs/monitoring): per-queue depth gauges + Grafana dashboard #743
Adds elastickv_sqs_queue_messages{queue,state} (visible /
not_visible / delayed) on top of the existing partition-counter
metric (PR 7a), plus the Grafana dashboard the operator drops
into the existing provisioning chain.
Pattern mirrors the Raft / Redis observer split:
- adapter/sqs_depth_source.go: *SQSServer.SnapshotQueueDepths(ctx)
returns one SQSQueueDepth per known queue, or an empty slice on
followers. Leader-only emission keeps gauge state consistent
with what AdminListQueues would return at the same instant —
followers scanning concurrently would race the leader's writes
and emit conflicting numbers for the same series.
- monitoring/sqs.go (extended): SQSMetrics gains a queueDepth
GaugeVec + ObserveQueueDepth / ForgetQueue helpers. New
SQSObserver type owns the tick loop and the
current-vs-previous queue diff; queues that disappeared since
the last tick get ForgetQueue'd so the dashboard doesn't show
a frozen backlog after DeleteQueue. Same nil-tolerant contract
as RaftObserver.
- monitoring/registry.go: r.SQSObserver() accessor mirrors
r.RaftObserver().
- main.go: startSQSDepthObserver wires the SQS adapter into the
observer after startSQSServer; thin sqsDepthSourceAdapter type
bridges adapter.SQSQueueDepth to monitoring.SQSQueueDepth so
the adapter package doesn't have to import monitoring.
Dashboard (monitoring/grafana/dashboards/elastickv-sqs.json):
- Top stat row: queue count, total backlog, HT-FIFO ops rate,
active leader.
- Per-queue depth timeseries (visible / in-flight / delayed) with
$queue templating.
- Top 20 queues by backlog table.
- HT-FIFO partition activity: ops rate by action and by
(queue, partition) — the partition split is what the original
PR 7a counter was added for; the dashboard surfaces it.
Tests:
- TestSQSMetrics_ObserveQueueDepth_EmitsThreeStates pins the
gauge label triple (visible / not_visible / delayed).
- TestSQSMetrics_ObserveQueueDepth_ClampsNegativeToZero pins the
defensive clamp against -1 sentinels from a future failed scan.
- TestSQSMetrics_ObserveQueueDepth_DropsEmptyQueue pins the
empty-name drop (mirrors the partition-counter rule).
- TestSQSMetrics_ForgetQueue_DropsThreeSeries pins that
ForgetQueue clears every state series for the queue while
leaving the cumulative counter untouched.
- TestSQSMetrics_DepthNilReceiverIsSafe + DepthRegistryWiring
pin the nil-tolerant contract and registry plumbing.
- TestSQSObserver_ObserveOnce_EmitsAndForgets pins the diff
state machine: queues present in the current tick get gauges,
queues that disappeared get ForgetQueue'd.
- TestSQSObserver_ObserveOnce_LeaderStepDownClearsAll pins the
step-down branch (source returns empty slice -> all gauges
cleared).
- TestSQSObserver_NilTolerant pins nil source / nil observer
no-op behaviour.
Self-review (5 lenses):
1. Data loss — N/A; metrics-only, no storage / Raft / FSM touch.
2. Concurrency — observer.mu protects lastSeen; SQSMetrics.mu
protects trackedQueues. Source returns a fresh slice per call
(no shared mutable state). Ticker goroutine stops on ctx.Done.
3. Performance — one catalog scan + one approx-counter scan per
queue per 30s on the leader. scanApproxCounters already
self-caps the per-queue scan so a 1M-message queue doesn't
block. Per-queue cardinality cap (sqsMaxTrackedQueues = 512)
bounds Prometheus series budget.
4. Data consistency — leader-only emission keeps gauges in lock-
step with AdminListQueues / AdminDescribeQueue. Per-queue scan
error -> ForgetQueue (not zero) so a transient blip surfaces as
missing data, not as a false "drained" reading.
5. Test coverage — 8 new tests cover gauge math, observer state
machine, leader step-down, nil-tolerance, and registry wiring.
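The current-vs-previous diff described above can be illustrated with a minimal sketch. It is not the PR's code: `QueueDepth`, `depthSink`, `observer`, and `recordingSink` are hypothetical stand-ins for `SQSQueueDepth`, `SQSMetrics`, and `SQSObserver`, and locking is omitted because the sketch is single-threaded.

```go
package main

import (
	"fmt"
	"sort"
)

// QueueDepth is a hypothetical stand-in for monitoring.SQSQueueDepth.
type QueueDepth struct {
	Queue                        string
	Visible, NotVisible, Delayed int64
}

// depthSink is a hypothetical stand-in for the SQSMetrics gauge helpers.
type depthSink interface {
	ObserveQueueDepth(queue string, visible, notVisible, delayed int64)
	ForgetQueue(queue string)
}

// observer remembers which queues were published on the previous tick.
type observer struct {
	sink     depthSink
	lastSeen map[string]struct{}
}

// observeOnce publishes the current snapshot, then forgets every queue that
// was present on the previous tick but is missing from this one.
func (o *observer) observeOnce(snaps []QueueDepth) {
	current := make(map[string]struct{}, len(snaps))
	for _, s := range snaps {
		o.sink.ObserveQueueDepth(s.Queue, s.Visible, s.NotVisible, s.Delayed)
		current[s.Queue] = struct{}{}
	}
	for q := range o.lastSeen {
		if _, ok := current[q]; !ok {
			o.sink.ForgetQueue(q)
		}
	}
	o.lastSeen = current
}

// recordingSink keeps the set of live queues so the example can print it.
type recordingSink struct{ live map[string]int64 }

func (r *recordingSink) ObserveQueueDepth(q string, v, nv, d int64) { r.live[q] = v + nv + d }
func (r *recordingSink) ForgetQueue(q string)                      { delete(r.live, q) }

// names returns the live queue names in sorted order.
func (r *recordingSink) names() []string {
	out := make([]string, 0, len(r.live))
	for q := range r.live {
		out = append(out, q)
	}
	sort.Strings(out)
	return out
}

func main() {
	sink := &recordingSink{live: map[string]int64{}}
	obs := &observer{sink: sink}
	obs.observeOnce([]QueueDepth{{Queue: "orders", Visible: 3}, {Queue: "emails", Delayed: 1}})
	fmt.Println(sink.names()) // [emails orders]
	obs.observeOnce([]QueueDepth{{Queue: "orders", Visible: 2}})
	fmt.Println(sink.names()) // [orders]
	obs.observeOnce(nil) // empty snapshot, as on a follower after step-down
	fmt.Println(sink.names()) // []
}
```

The empty-snapshot case needs no special branch: every previously seen queue is absent from the current set, so each one is forgotten.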
@claude review
📝 Walkthrough
Adds leader-only SQS queue-depth snapshotting and a polling observer that publishes per-queue visible/not_visible/delayed gauges, wiring the observer into startup, adding metrics types, tests, and a Grafana dashboard; also bumps AWS SDK patch dependencies.
Changes
SQS Queue Depth Monitoring
Sequence Diagram
sequenceDiagram
    participant Startup as Startup
    participant Observer as SQSObserver
    participant Source as SQSServer
    participant Metrics as SQSMetrics
    participant Prometheus as Prometheus
    Startup->>Observer: Start(ctx, source, interval)
    Observer->>Observer: observeOnce (initial synchronous)
    Observer->>Source: SnapshotQueueDepths(ctx)
    Note right of Source: verify leader, get single readTS\nscan queue names at readTS
    Source->>Source: snapshotOneQueueDepth(name, readTS)
    Source-->>Observer: []SQSQueueDepth (or ok=false)
    Observer->>Metrics: ObserveQueueDepth(queue, visible, notVisible, delayed)
    Metrics->>Prometheus: set gauge values per (queue,state)
    Observer->>Observer: schedule next tick (interval)
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
Claude finished @bootjp's task in 2m 30s
Code Review
Overall the PR is well-structured and follows the existing observer pattern faithfully. The interface design, nil-tolerance contract, and test coverage are solid. There are a few issues to address before merge.
Medium severity
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 5f16a7ef61
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
	queueLabel := m.queueLabelForCardinalityBudget(queue)
	m.queueDepth.WithLabelValues(queueLabel, sqsQueueStateVisible).Set(float64(maxInt64(0, visible)))
	m.queueDepth.WithLabelValues(queueLabel, sqsQueueStateNotVisible).Set(float64(maxInt64(0, notVisible)))
	m.queueDepth.WithLabelValues(queueLabel, sqsQueueStateDelayed).Set(float64(maxInt64(0, delayed)))
Preserve overflow queue depth instead of overwriting gauges
When the tracked queue cap is exceeded (sqsMaxTrackedQueues), queueLabelForCardinalityBudget returns _other, but ObserveQueueDepth writes depth with Set on that shared label. This means multiple overflow queues in the same tick clobber each other (only the last queue scanned is visible), and ForgetQueue for one disappeared overflow queue can delete _other even if other overflow queues are still present. The result is incorrect/missing backlog metrics specifically in deployments with more than 512 queues.
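One way to avoid the clobbering described in this comment is to fold a whole tick's snapshot into per-label totals before touching any gauge, so that overflow queues are summed rather than each overwriting `_other`. The sketch below only illustrates that idea under stated assumptions; `foldSnapshot`, `depth`, and the tiny `maxTracked` cap are hypothetical and are not taken from the PR.

```go
package main

import "fmt"

const (
	maxTracked    = 2 // stand-in for sqsMaxTrackedQueues (512 in the PR)
	overflowLabel = "_other"
)

// depth holds the three per-queue states the gauge exports.
type depth struct{ Visible, NotVisible, Delayed int64 }

// foldSnapshot maps one tick's per-queue depths to per-label depths.
// Queues beyond the budget are summed into overflowLabel instead of each
// overwriting it. tracked is the admission set carried across ticks.
func foldSnapshot(tracked map[string]struct{}, names []string, snaps map[string]depth) map[string]depth {
	out := make(map[string]depth, len(names))
	for _, name := range names {
		label := name
		if _, ok := tracked[name]; !ok {
			if len(tracked) < maxTracked {
				tracked[name] = struct{}{}
			} else {
				label = overflowLabel
			}
		}
		d, acc := snaps[name], out[label]
		acc.Visible += d.Visible
		acc.NotVisible += d.NotVisible
		acc.Delayed += d.Delayed
		out[label] = acc
	}
	return out
}

func main() {
	tracked := map[string]struct{}{}
	names := []string{"a", "b", "c", "d"}
	snaps := map[string]depth{"a": {Visible: 1}, "b": {Visible: 2}, "c": {Visible: 3}, "d": {Visible: 4}}
	labels := foldSnapshot(tracked, names, snaps)
	fmt.Println(labels[overflowLabel].Visible) // 7
	_, present := foldSnapshot(tracked, []string{"a"}, snaps)[overflowLabel]
	fmt.Println(present) // false
}
```

A caller would then `Set` each label once per tick and delete the `_other` series whenever the folded map has no overflow entry, which also avoids a single `ForgetQueue` call removing a series that other overflow queues still share.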
Code Review
This pull request implements queue-depth monitoring for the SQS adapter, introducing a background observer that exports message counts (visible, in-flight, and delayed) as Prometheus gauges. It also adds a corresponding Grafana dashboard and updates AWS SDK dependencies. Feedback identifies a bug in the ForgetQueue logic where the cardinality budget is not correctly managed, potentially leading to label exhaustion. Additionally, it is recommended to use a single read timestamp across all queue snapshots in a single tick to ensure data consistency and improve efficiency.
func (m *SQSMetrics) ForgetQueue(queue string) {
	if m == nil || queue == "" {
		return
	}
	queueLabel := m.queueLabelForCardinalityBudget(queue)
	m.queueDepth.DeleteLabelValues(queueLabel, sqsQueueStateVisible)
	m.queueDepth.DeleteLabelValues(queueLabel, sqsQueueStateNotVisible)
	m.queueDepth.DeleteLabelValues(queueLabel, sqsQueueStateDelayed)
}
The current implementation of ForgetQueue has two issues:
1. It doesn't remove the queue from trackedQueues, meaning the cardinality budget (sqsMaxTrackedQueues = 512) will eventually be exhausted by deleted queues, forcing all new queues into the _other label forever.
2. If a queue was not tracked (i.e., it was in the overflow bucket), calling queueLabelForCardinalityBudget will return sqsQueueOverflow, causing DeleteLabelValues to wipe the overflow series for all queues sharing that label.
ForgetQueue should check if the queue is specifically tracked, remove it from the map if so, and only then delete the gauge series.
func (m *SQSMetrics) ForgetQueue(queue string) {
	if m == nil || queue == "" {
		return
	}
	m.mu.Lock()
	_, tracked := m.trackedQueues[queue]
	if tracked {
		delete(m.trackedQueues, queue)
	}
	m.mu.Unlock()
	if tracked {
		m.queueDepth.DeleteLabelValues(queue, sqsQueueStateVisible)
		m.queueDepth.DeleteLabelValues(queue, sqsQueueStateNotVisible)
		m.queueDepth.DeleteLabelValues(queue, sqsQueueStateDelayed)
	}
}
References
- To prevent unbounded memory growth and potential OOM issues, apply a fixed bound to collections that can grow from external requests. Ensure that the management of these collections correctly handles deletions to free up budget.
func (s *SQSServer) SnapshotQueueDepths(ctx context.Context) []SQSQueueDepth {
	if s == nil || s.coordinator == nil || s.store == nil || !s.coordinator.IsLeader() {
		return nil
	}
	names, err := s.scanQueueNames(ctx)
	if err != nil {
		slog.Warn("sqs depth snapshot: scanQueueNames failed", "err", err)
		return nil
	}
	out := make([]SQSQueueDepth, 0, len(names))
	for _, name := range names {
		if err := ctx.Err(); err != nil {
			return out
		}
		if snap, ok := s.snapshotOneQueueDepth(ctx, name); ok {
			out = append(out, snap)
		}
	}
	return out
}
For better efficiency and consistency across the snapshot, you should acquire a single read timestamp at the start of the pass rather than calling nextTxnReadTS inside the loop for every queue. This ensures that all queue depths in a single tick reflect the same point-in-time state of the catalog and counters.
func (s *SQSServer) SnapshotQueueDepths(ctx context.Context) []SQSQueueDepth {
	if s == nil || s.coordinator == nil || s.store == nil || !s.coordinator.IsLeader() {
		return nil
	}
	names, err := s.scanQueueNames(ctx)
	if err != nil {
		slog.Warn("sqs depth snapshot: scanQueueNames failed", "err", err)
		return nil
	}
	readTS := s.nextTxnReadTS(ctx)
	out := make([]SQSQueueDepth, 0, len(names))
	for _, name := range names {
		if err := ctx.Err(); err != nil {
			return out
		}
		if snap, ok := s.snapshotOneQueueDepth(ctx, name, readTS); ok {
			out = append(out, snap)
		}
	}
	return out
}
References
- In an MVCC system, all operations within a single transaction must share the same start timestamp to ensure atomicity and a consistent snapshot view.
func (s *SQSServer) snapshotOneQueueDepth(ctx context.Context, name string) (SQSQueueDepth, bool) {
	readTS := s.nextTxnReadTS(ctx)
	meta, exists, err := s.loadQueueMetaAt(ctx, name, readTS)
	if err != nil || !exists {
		return SQSQueueDepth{}, false
	}
	counters, err := s.scanApproxCounters(ctx, name, meta.Generation, readTS)
	if err != nil {
		slog.Warn("sqs depth snapshot: counters failed", "queue", name, "err", err)
		return SQSQueueDepth{}, false
	}
	return SQSQueueDepth{
		Queue:      name,
		Visible:    counters.Visible,
		NotVisible: counters.NotVisible,
		Delayed:    counters.Delayed,
	}, true
}
Update the signature to accept the pre-acquired readTS to avoid redundant timestamp generation calls in the loop.
func (s *SQSServer) snapshotOneQueueDepth(ctx context.Context, name string, readTS uint64) (SQSQueueDepth, bool) {
	meta, exists, err := s.loadQueueMetaAt(ctx, name, readTS)
	if err != nil || !exists {
		return SQSQueueDepth{}, false
	}
	counters, err := s.scanApproxCounters(ctx, name, meta.Generation, readTS)
	if err != nil {
		slog.Warn("sqs depth snapshot: counters failed", "queue", name, "err", err)
		return SQSQueueDepth{}, false
	}
	return SQSQueueDepth{
		Queue:      name,
		Visible:    counters.Visible,
		NotVisible: counters.NotVisible,
		Delayed:    counters.Delayed,
	}, true
}
…#743 r1)
Three findings from the round-1 review on PR #743, plus the
suggested test additions:
Medium (cardinality leak):
- ForgetQueue used to call DeleteLabelValues but never shrank
trackedQueues, so the 512-entry budget filled with stale slots
and post-cap new queues silently collapsed to _other forever.
Fix: take m.mu, delete from trackedQueues, release, then
DeleteLabelValues, but only when the queue was tracked.
Overflow-mapped queues (queues that hit the cap and got the
_other label) skip DeleteLabelValues so we don't tear down the
shared _other series for unrelated queues.
Low / style:
- maxInt64 helper replaced with the Go 1.21+ builtin max[T].
Module already requires Go 1.25 (go.mod) so the builtin is
available; the helper was redundant.
- Dashboard 'Active leader' stat: the previous expr filtered on
'elastickv_sqs_queue_messages > 0' which returns no data on an
idle cluster (every depth = 0). Switched to
'count by (node_id) (elastickv_sqs_queue_messages)' which
surfaces the leader regardless of queue contents.
- observeOnce: added single-writer-contract comment so a future
caller doesn't accidentally introduce concurrent ticks; the
CounterVec / GaugeVec writes are individually atomic, but
symmetric documentation prevents drift.
Tests:
- TestSQSMetrics_ForgetQueue_DropsThreeSeries strengthened: now
asserts trackedQueues was emptied AND that a post-forget Observe
re-emits under the real queue name (not _other). Without the
budget cleanup this assertion would have caught the leak.
- TestSQSMetrics_ForgetQueue_OverflowQueueIsNoOp pins the
overflow-collision safety: ForgetQueue on an _other-mapped queue
must not delete the shared _other series and must not shrink the
budget.
Caller-audit per the standing semantic-change rule:
- ForgetQueue's only production caller is SQSObserver.observeOnce.
observeOnce calls it from the lastSeen-vs-current diff, only for
queues that had been observed in the previous tick, so the
not-tracked branch (added with this fix) only fires for overflow
queues, never for regular ones, never for never-observed ones.
No caller needs an update.
@claude review
Claude finished @bootjp's task in 2m 44s —— View job Code Review
The r1 fixup commit ( What r1 fixed (confirmed)
The Remaining itemsLow: Per-queue
Round-2 review: optional cleanup.
Move nextTxnReadTS out of the per-queue loop in
snapshotOneQueueDepth and into the SnapshotQueueDepths caller so
every queue in one tick shares the same MVCC read timestamp.
Two wins:
- Consistency. With per-queue ts the first queue's depth can
reflect catalog state the last queue's depth can't (catalog
mutation between calls). The Approximate* metric names already
signal advisory-only, so this isn't a correctness bug, but one
ts per tick gives the dashboard a coherent point-in-time
snapshot that matches what AdminListQueues + AdminDescribeQueue
would return at the same instant.
- HLC pressure. Each nextTxnReadTS call advances the leader's
HLC. On a 500-queue deployment this drops 499 unnecessary ticks
per scrape interval.
Caller audit per the standing semantic-change rule:
- snapshotOneQueueDepth signature gained one parameter
(readTS uint64). The only caller is SnapshotQueueDepths in the
same file; updated to pass the per-tick value through. No other
caller exists to audit.
@claude review
Claude finished @bootjp's task in 2m 22s
Code Review (r2)
What r2 fixed (confirmed)
The fix is well-implemented. The comment at
go.mod delta
All changes are routine AWS SDK patch bumps (
Standing items from earlier rounds (no new action required)
These were called out previously and are consciously unaddressed; documenting them here so the merge decision is explicit.
Double nil-guard in
No unit test for
Overall
All medium-severity findings from r0 and the low-severity items from r1 are resolved. The three-commit history tells a clean story: initial feature → cardinality/dashboard fixes → single-readTS consistency fix. The interface design, ForgetQueue lifecycle, test coverage, and main.go wiring are all solid. LGTM.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 33dca0c700
	_, tracked := m.trackedQueues[queue]
	if tracked {
		delete(m.trackedQueues, queue)
	}
Keep partition counter budget separate from depth cleanup
ForgetQueue now deletes entries from trackedQueues, but that map is also used by ObservePartitionMessage to enforce the elastickv_sqs_partition_messages_total cardinality cap. Because counter series are never deleted, removing a queue from trackedQueues lets later queues be admitted with real labels, so queue churn (or a leader step-down that clears lastSeen) can grow partition-counter series beyond sqsMaxTrackedQueues over time. This regresses the original cap and can cause unbounded Prometheus cardinality growth.
Actionable comments posted: 1
🧹 Nitpick comments (2)
monitoring/sqs.go (1)
248-268: 💤 Low value
Start is not idempotent: repeated calls leak ticker goroutines.
Each call to Start allocates a new time.Ticker and spawns a new goroutine. Currently startSQSDepthObserver only runs once at startup, so this is latent, but a future refactor (e.g., re-wiring the source after a leader transition or a reconfig) would silently double the polling rate per call. The cost of guarding is one bool/atomic.
♻️ Suggested guard
 type SQSObserver struct {
 	metrics *SQSMetrics
+	started atomic.Bool
 	mu       sync.Mutex
 	lastSeen map[string]struct{}
 }
@@
 func (o *SQSObserver) Start(ctx context.Context, source SQSDepthSource, interval time.Duration) {
 	if o == nil || source == nil {
 		return
 	}
+	if !o.started.CompareAndSwap(false, true) {
+		return
+	}
 	if interval <= 0 {
 		interval = sqsDepthObserveInterval
 	}
Add a corresponding test that calls Start twice and asserts only one observation goroutine is active (e.g., via a counter on fakeDepthSource).
adapter/sqs_depth_source.go (1)
35-54: 💤 Low value
Consider a per-tick deadline on the catalog scan.
SnapshotQueueDepths inherits the observer's root context, which only cancels at shutdown. If scanQueueNames or loadQueueMetaAt stalls (e.g., a transient catalog-read hang), the observer's tick goroutine pins on this call indefinitely; the gauges then freeze at their previous values rather than degrading via the documented "scrape failed = missing series" contract on Line 33. A per-tick context.WithTimeout (e.g., bounded by sqsDepthObserveInterval) would let the observer skip a stuck tick and ForgetQueue the disappeared series cleanly. This belongs on the observer side or here at the entrypoint.
♻️ Sketch (caller-side variant in monitoring/sqs.go)
 func (o *SQSObserver) observeOnce(ctx context.Context, source SQSDepthSource) {
 	if o == nil || o.metrics == nil || source == nil {
 		return
 	}
-	snaps := source.SnapshotQueueDepths(ctx)
+	tickCtx, cancel := context.WithTimeout(ctx, sqsDepthObserveInterval)
+	defer cancel()
+	snaps := source.SnapshotQueueDepths(tickCtx)
Either the observer or the adapter is a fine home for the deadline; pick whichever keeps the contract clearer.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@monitoring/grafana/dashboards/elastickv-sqs.json`:
- Around line 158-168: Panel "Active leader" currently uses the PromQL filter
elastickv_sqs_queue_messages > 0 which drops all-zero series and causes "No
data" when queues are drained; update the panel's expr (in the block with
"title": "Active leader", "refId": "A") to remove the > 0 predicate so it
considers zero-valued gauges (e.g. use max by (node_id)
(elastickv_sqs_queue_messages) * 0 + 1) so leaders that report zero are still
shown.
⛔ Files ignored due to path filters (1)
go.sum is excluded by !**/*.sum
📒 Files selected for processing (7)
adapter/sqs_depth_source.go
go.mod
main.go
monitoring/grafana/dashboards/elastickv-sqs.json
monitoring/registry.go
monitoring/sqs.go
monitoring/sqs_test.go
P1 (Codex review): ForgetQueue regresses counter cardinality cap.
trackedQueues was a single shared map for both partitionMessages
(CounterVec) and queueDepth (GaugeVec) cardinality budgets. The
r1 fix made ForgetQueue delete the queue's slot so a churn-heavy
deployment (CI workloads, ephemeral queues, leader step-down
clearing the observer's lastSeen) could reuse budget for new
queue names — correct behaviour for the gauge side, since
DeleteLabelValues drops the underlying series too.
But the counter side is cumulative-by-design: Prometheus can't
delete a counter series without losing its observed value, so
ObservePartitionMessage never calls DeleteLabelValues. Reclaiming
the slot for a forgotten queue therefore lets a NEW queue be
admitted under a real label while the OLD queue's counter series
is still alive in Prometheus. Repeating this pushes counter
cardinality past sqsMaxTrackedQueues without bound.
Fix: split trackedQueues into two independent maps —
trackedCounterQueues (one-way; ObservePartitionMessage admits,
ForgetQueue ignores) and trackedDepthQueues (reclaimable;
ForgetQueue frees the slot together with DeleteLabelValues).
Two helpers replace queueLabelForCardinalityBudget:
- admitForCounterBudget — used by ObservePartitionMessage
- admitForDepthBudget — used by ObserveQueueDepth
Caller audit per the standing semantic-change rule:
ForgetQueue: only caller is SQSObserver.observeOnce. The
caller's contract ("drop gauges for queue that disappeared
from this tick's snapshot") is preserved. The narrowed
scope — counter budget no longer reclaimed — is invisible
to the observer because it only touches gauges; counters
are fed via ObservePartitionMessage from a different code
path.
ObservePartitionMessage / ObserveQueueDepth: behaviour from
the caller's POV is unchanged (admit-or-_other on a 512-slot
cap). The only diff is which map backs the cap. No external
contract change.
queueLabelForCardinalityBudget: removed. Grep confirms no
callers remain.
Regression test:
TestSQSMetrics_ForgetQueue_DoesNotReclaimCounterBudget
saturates the counter budget with sqsMaxTrackedQueues distinct
queues, ForgetQueue's all of them, then asserts a fresh queue
collapses to _other (would FAIL pre-fix — verified the test
fails on the previous tip and passes on this commit).
TestSQSMetrics_ForgetQueue_StillReclaimsDepthBudget
pins the converse: gauge budget is still reclaimable, so a
513th distinct queue post-forget gets emitted under its real
name.
Existing ForgetQueue tests updated for the split-map shape.
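The split described in this commit message can be sketched as two admission sets behind one mutex. This is a simplified illustration, not the PR's code: the `budgets` type, the shared `admit` helper, and the small `maxTracked` cap are assumptions made for the demo; only the names admitForCounterBudget and admitForDepthBudget come from the message.

```go
package main

import (
	"fmt"
	"sync"
)

const (
	maxTracked    = 3 // the PR uses sqsMaxTrackedQueues = 512; small here for the demo
	overflowLabel = "_other"
)

// budgets is a hypothetical reduction of SQSMetrics to its two admission maps.
type budgets struct {
	mu       sync.Mutex
	counters map[string]struct{} // one-way: never shrinks
	depths   map[string]struct{} // reclaimable via forget
}

func newBudgets() *budgets {
	return &budgets{counters: map[string]struct{}{}, depths: map[string]struct{}{}}
}

// admit returns the queue's own label if it is (or can be) tracked in set,
// and the shared overflow label once the cap is reached.
func admit(set map[string]struct{}, queue string) string {
	if _, ok := set[queue]; ok {
		return queue
	}
	if len(set) >= maxTracked {
		return overflowLabel
	}
	set[queue] = struct{}{}
	return queue
}

func (b *budgets) admitForCounterBudget(queue string) string {
	b.mu.Lock()
	defer b.mu.Unlock()
	return admit(b.counters, queue)
}

func (b *budgets) admitForDepthBudget(queue string) string {
	b.mu.Lock()
	defer b.mu.Unlock()
	return admit(b.depths, queue)
}

// forget frees only the depth slot and reports whether the queue was tracked.
// The counter set is deliberately left untouched.
func (b *budgets) forget(queue string) bool {
	b.mu.Lock()
	defer b.mu.Unlock()
	_, tracked := b.depths[queue]
	delete(b.depths, queue)
	return tracked
}

func main() {
	b := newBudgets()
	for _, q := range []string{"q1", "q2", "q3"} {
		b.admitForCounterBudget(q)
		b.admitForDepthBudget(q)
	}
	b.forget("q1")
	fmt.Println(b.admitForDepthBudget("q4"))   // q4
	fmt.Println(b.admitForCounterBudget("q4")) // _other
}
```

The two printed labels mirror the two regression tests named above: the depth budget is reclaimed after a forget, while the counter budget stays saturated.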
@claude review
Claude finished @bootjp's task in 1m 45s
Code Review (r3)
What r3 fixed (confirmed)
The Codex P1 finding was:
Correctness assessment of the fix:
The
Both new tests (
Remaining low-severity items (consciously unaddressed; no action required)
These were flagged in earlier rounds. All are latent concerns, not bugs in the current code:
Summary
r3 closes the final substantive issue (P1 counter cardinality growth under queue churn). All four review rounds of findings (the medium-severity ForgetQueue cardinality leak, the low-severity items from r1/r2, and the Codex P1) are now resolved. The interface design, test coverage, observer state machine, and cardinality budget semantics are all in good shape. LGTM.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: e40ad3bc9f
	if !tracked {
		// Queue was either never depth-observed or had been collapsed
Clear overflow gauge when overflow set becomes empty
When queue cardinality is already saturated, ObserveQueueDepth writes all extra queues into the shared _other label, but ForgetQueue now returns early for those queues because they are not in trackedDepthQueues. In a churn scenario (>512 queues) this leaves _other at its last value even after the final overflow queue disappears, so dashboards keep showing phantom backlog for queues that no longer exist until another overflow queue happens to overwrite it.
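A possible way to address this, shown purely as a sketch, is to remember which queues currently share the overflow label and delete the `_other` series only when the last member disappears. `overflowSet` and its methods are hypothetical helpers, not part of the PR.

```go
package main

import "fmt"

// overflowSet is a hypothetical helper: it remembers which queues currently
// share the overflow label so the caller knows when the series is orphaned.
type overflowSet struct{ members map[string]struct{} }

func newOverflowSet() *overflowSet { return &overflowSet{members: map[string]struct{}{}} }

// add records that queue was collapsed into the overflow label.
func (s *overflowSet) add(queue string) { s.members[queue] = struct{}{} }

// forget removes queue and reports whether the shared overflow series should
// now be deleted, which is the case only when queue was the last member.
func (s *overflowSet) forget(queue string) bool {
	if _, ok := s.members[queue]; !ok {
		return false
	}
	delete(s.members, queue)
	return len(s.members) == 0
}

func main() {
	s := newOverflowSet()
	s.add("q513")
	s.add("q514")
	fmt.Println(s.forget("q513")) // false
	fmt.Println(s.forget("q514")) // true
	fmt.Println(s.forget("q514")) // false
}
```

The trade-off is that the membership set grows with the number of overflow queues. That costs process memory but adds no Prometheus series, so the cardinality cap itself is unaffected.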
🧹 Nitpick comments (3)
monitoring/sqs.go (2)
184-186: 💤 Low value
Optional: drop the redundant int64(0) literal in max.
Go's untyped 0 will be inferred to int64 from the second argument, so max(0, visible) is equivalent and reads more cleanly. Pure readability nit; feel free to ignore.
♻️ Proposed simplification
-	m.queueDepth.WithLabelValues(queueLabel, sqsQueueStateVisible).Set(float64(max(int64(0), visible)))
-	m.queueDepth.WithLabelValues(queueLabel, sqsQueueStateNotVisible).Set(float64(max(int64(0), notVisible)))
-	m.queueDepth.WithLabelValues(queueLabel, sqsQueueStateDelayed).Set(float64(max(int64(0), delayed)))
+	m.queueDepth.WithLabelValues(queueLabel, sqsQueueStateVisible).Set(float64(max(0, visible)))
+	m.queueDepth.WithLabelValues(queueLabel, sqsQueueStateNotVisible).Set(float64(max(0, notVisible)))
+	m.queueDepth.WithLabelValues(queueLabel, sqsQueueStateDelayed).Set(float64(max(0, delayed)))
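The type-inference point can be checked in isolation. The sketch below assumes Go 1.21 or later for the builtin max; `clampDepth` is a hypothetical helper name, not one used in the PR.

```go
package main

import "fmt"

// clampDepth maps negative sentinels (for example -1 from a failed scan) to 0.
func clampDepth(n int64) float64 {
	return float64(max(0, n)) // the untyped constant 0 takes n's type, int64
}

func main() {
	fmt.Println(clampDepth(-1), clampDepth(0), clampDepth(42)) // 0 0 42
}
```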
323-343: 💤 Low value. Optional: bail early if `ctx` is already canceled before the synchronous tick.
If a caller threads in an already-canceled context (e.g., shutdown raced with startup), the synchronous `observeOnce` call still runs and may trigger a leader catalog scan whose result is then immediately torn down when the goroutine exits on the first `select`. A one-line guard makes Start a no-op in that case and matches the existing nil-tolerance policy:
♻️ Proposed guard
  if interval <= 0 {
      interval = sqsDepthObserveInterval
  }
+ if ctx.Err() != nil {
+     return
+ }
  o.observeOnce(ctx, source)
Not a correctness issue (the existing behavior is harmless); it just avoids a wasted scan during shutdown races.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@monitoring/sqs.go` around lines 323 - 343, The Start method on SQSObserver currently always calls o.observeOnce(ctx, source) even if ctx is already canceled; add an early guard at the top of Start to return if ctx.Done() is already closed (e.g., check select { case <-ctx.Done(): return default: }), so the synchronous observeOnce is skipped when the context is canceled and behavior still falls back to existing nil checks and interval defaulting (symbols: SQSObserver.Start, o.observeOnce, sqsDepthObserveInterval).
monitoring/sqs_test.go (1)
473-485: 💤 Low value. Optional: use a cancelable context to bound a future regression.
If a future change loosens the nil-source guard, `nilObs.Start(context.Background(), &fakeDepthSource{}, time.Millisecond)` would leak a goroutine for the lifetime of the test binary. Wiring a `t.Cleanup`-canceled context would make the test self-contained:
♻️ Proposed defensive tweak
  func TestSQSObserver_NilTolerant(t *testing.T) {
      t.Parallel()
+     ctx, cancel := context.WithCancel(context.Background())
+     t.Cleanup(cancel)
      var nilObs *SQSObserver
      require.NotPanics(t, func() {
-         nilObs.Start(context.Background(), &fakeDepthSource{}, time.Millisecond)
+         nilObs.Start(ctx, &fakeDepthSource{}, time.Millisecond)
          nilObs.ObserveOnce(&fakeDepthSource{})
      })
      obs := newSQSObserver(newSQSMetrics(prometheus.NewRegistry()))
      require.NotPanics(t, func() {
-         obs.Start(context.Background(), nil, time.Millisecond)
+         obs.Start(ctx, nil, time.Millisecond)
          obs.ObserveOnce(nil)
      })
  }
Pure defense-in-depth: the current test passes given the current Start guards.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@monitoring/sqs_test.go` around lines 473 - 485, Wrap Start calls in TestSQSObserver_NilTolerant with a cancelable context to avoid leaking goroutines: create a context with context.WithCancel, register the cancel with t.Cleanup, and pass that ctx into nilObs.Start(...) and obs.Start(...); this ensures any goroutines started by Start (referenced by SQSObserver.Start) are canceled when the test finishes.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 1e30ae55-2cc5-44f6-a1a9-d9ece33ff532
📒 Files selected for processing (4)
- adapter/sqs_depth_source.go
- monitoring/grafana/dashboards/elastickv-sqs.json
- monitoring/sqs.go
- monitoring/sqs_test.go
P2 (Codex review): ForgetQueue leaves phantom _other backlog after
overflow queues vanish.
When the depth budget saturates, ObserveQueueDepth collapses extra
queues onto the shared sqsQueueOverflow ('_other') label. ForgetQueue
early-returned for these because they were not in trackedDepthQueues —
which lumps two distinct cases together:
- never depth-observed (no series to delete; correct no-op)
- currently mapped to _other (series exists, shared, must drop
only when no overflow queue is still reporting into it)
In a churn scenario with >sqsMaxTrackedQueues distinct queues the
final overflow queue's last value sits on the dashboard until
another overflow queue overwrites it — phantom backlog for queues
that no longer exist.
Fix: track overflow membership in a third map (overflowDepthQueues)
so ForgetQueue can ref-count overflow queues. Two-branch behaviour:
- real-name queue: drop the three state series + free the
trackedDepthQueues slot (unchanged)
- overflow queue: remove from overflowDepthQueues; if the set
is now empty, drop the three _other state series too;
otherwise leave _other alone so other overflow queues keep
reporting into it
Counter side has no equivalent map. Counters are cumulative — the
_other counter legitimately reflects total operations even from
queues that have since been deleted, so there's no phantom-data
problem to solve there.
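The two-branch ForgetQueue behaviour described above can be sketched with a plain-map model. This is a simplified illustration, not the PR's code: the `depthGauges` type, the cap of 2, and the single value per label (instead of three state series in a Prometheus GaugeVec) are all assumptions made to keep the example self-contained.

```go
package main

import "sync"

const (
	maxTracked = 2        // stand-in for sqsMaxTrackedQueues (512 in the PR)
	overflow   = "_other" // shared overflow label
)

// depthGauges models only the bookkeeping; series stands in for the
// gauge children, keyed by label value.
type depthGauges struct {
	mu        sync.Mutex
	tracked   map[string]struct{} // queues reporting under their real name
	overflowQ map[string]struct{} // queues currently collapsed onto _other
	series    map[string]int64
}

func newDepthGauges() *depthGauges {
	return &depthGauges{
		tracked:   map[string]struct{}{},
		overflowQ: map[string]struct{}{},
		series:    map[string]int64{},
	}
}

// Observe admits the queue under its own label while budget remains,
// otherwise records it as an overflow member and writes to _other.
func (g *depthGauges) Observe(queue string, visible int64) {
	g.mu.Lock()
	defer g.mu.Unlock()
	label := queue
	if _, ok := g.tracked[queue]; !ok {
		if len(g.tracked) >= maxTracked {
			g.overflowQ[queue] = struct{}{}
			label = overflow
		} else {
			g.tracked[queue] = struct{}{}
		}
	}
	g.series[label] = visible
}

// Forget drops a real-name series directly; for an overflow queue it
// drops _other only when the last overflow member disappears.
func (g *depthGauges) Forget(queue string) {
	g.mu.Lock()
	defer g.mu.Unlock()
	if _, ok := g.tracked[queue]; ok {
		delete(g.tracked, queue)
		delete(g.series, queue)
		return
	}
	if _, ok := g.overflowQ[queue]; !ok {
		return // never depth-observed: nothing to delete
	}
	delete(g.overflowQ, queue)
	if len(g.overflowQ) == 0 {
		delete(g.series, overflow)
	}
}
```

With the cap at 2, observing queues `a`, `b`, `x`, `y` puts `x` and `y` on `_other`; forgetting `x` leaves `_other` in place, and forgetting `y` removes it.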
Caller audit per the standing semantic-change rule:
ForgetQueue: only caller is SQSObserver.observeOnce. The
observer's contract ('drop gauges for queue that disappeared
from this tick's snapshot') is preserved and consistently
extended — overflow queues, previously a silent no-op, now
also stop pinning the shared gauge once the last one
disappears. The branching is internal to ForgetQueue; the
caller passes the queue name verbatim and is unaware of which
map it lived in.
admitForDepthBudget: signature unchanged. New side effect of
populating overflowDepthQueues when collapsing to _other.
Only caller is ObserveQueueDepth; the side effect is invisible
from the caller's POV (still receives the same string label).
Regression test:
TestSQSMetrics_ForgetQueue_LastOverflowClearsOtherGauge
saturates the depth budget, observes two overflow queues,
ForgetQueue's both, and asserts the _other series is gone
(verified: pre-fix the gauge GatherAndCount returns 1539,
post-fix it returns 1536 — exactly the 3 _other series
dropped).
Existing TestSQSMetrics_ForgetQueue_OverflowQueueIsNoOp continues
to pass — ForgetQueue on one of N>=2 overflow queues still
preserves the _other series for the remaining ones, which was
the original invariant.
@claude review
Claude finished @bootjp's task in 2m 32s
Code Review (r4)
What r4 fixed (confirmed)
The Codex P2 finding was: when all overflow queues disappear from the tick snapshot,
Correctness assessment of the r4 fix:
All map mutations happen under
Edge case (queue transitioning from overflow to real-name): when the budget drains below 512 and an overflow queue gets re-admitted as a real-name queue, it ends up in both
Minor nit (no action needed): the test name
Standing items (all low-severity, consciously unaddressed)
All five review rounds of findings are now resolved. The three latent items below remain from earlier rounds; they're not blockers:
Overall
The five-commit story, initial feature → cardinality/dashboard fixes (r1) → single readTS (r2) → split counter/gauge budgets (r3) → overflow ref-count (r4), resolves every finding from all review rounds in order of severity. The LGTM.
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 9738f75c4e
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
    if len(m.trackedDepthQueues) >= sqsMaxTrackedQueues {
        m.overflowDepthQueues[queue] = struct{}{}
        return sqsQueueOverflow
    }
    m.trackedDepthQueues[queue] = struct{}{}
    return queue
Remove queue from overflow set when promoting real label
When cardinality drops below sqsMaxTrackedQueues, admitForDepthBudget promotes previously-overflowed queues to real labels by adding them to trackedDepthQueues, but it never removes that queue from overflowDepthQueues. ForgetQueue later uses overflowDepthQueues as a ref-count to decide whether to delete the shared _other gauge, so stale overflow entries can keep _other alive even when no queue still maps to _other, leaving phantom backlog in churny >512-queue deployments. Fresh evidence vs the earlier overflow-cleanup report: this commit introduced promotion via m.trackedDepthQueues[queue] = struct{}{} without corresponding overflow cleanup, which can keep the overflow set non-empty incorrectly.
Second P2 (Codex review): admitForDepthBudget leaves stale entries
in overflowDepthQueues when a queue gets promoted off the _other
label.
Scenario: budget saturates, queue X collapses to _other. Later a
slot opens (some other queue is ForgetQueue'd). On X's next
observation admitForDepthBudget finds room and admits X under its
real name — but the previous fix only added X to
trackedDepthQueues. The stale entry in overflowDepthQueues was
never cleaned up, so:
1. ForgetQueue's overflow ref-count is permanently off by one
per stale entry — _other can persist after every live queue
has been promoted off it.
2. The _other series itself still carries X's last-overflow
value while X is now reporting under its real label.
Dashboards see double-counted backlog.
Fix: in admitForDepthBudget's slot-available branch, also remove
the queue from overflowDepthQueues. If that drains the overflow
set entirely, drop the three _other state series too — mirroring
ForgetQueue's existing 'last overflow gone' branch.
Refactor: factor the three DeleteLabelValues calls into
dropGaugeStatesFor since they now appear in three places (ForgetQueue
tracked-branch, ForgetQueue overflow-empty-branch, and the new
admitForDepthBudget promotion-drains-overflow branch). Pure
readability — Prometheus's GaugeVec already does its own per-vector
locking so the wrapper holds no extra lock.
Caller audit per the standing semantic-change rule:
admitForDepthBudget: signature unchanged. New side effect — drop
_other gauge on a promotion that drains the overflow set —
invisible to the only caller (ObserveQueueDepth) which still
gets back the same string label.
ObserveQueueDepth callers: unaffected (ObserveQueueDepth's
contract is unchanged).
dropGaugeStatesFor: new internal helper, no external callers.
ForgetQueue: external semantics preserved; only the
DeleteLabelValues calls were collapsed into the helper.
Regression test:
TestSQSMetrics_AdmitForDepthBudget_PromotionClearsOverflow
saturates the budget, observes overflow X, ForgetQueue's a
real queue to free a slot, re-observes X, and asserts the
_other series is gone. Pre-fix: 1539 series (511 real + X
real + _other phantom × 3 states). Post-fix: 1536 (the 3
_other phantom series dropped by the new promotion cleanup).
@claude review
Claude finished @bootjp's task in 3m 9s
Code Review (r5)
What r5 fixed (confirmed)
The Codex P2 finding was: when budget pressure eases and
Correctness assessment of the fix (
    m.trackedDepthQueues[queue] = struct{}{}
    _, wasOverflow := m.overflowDepthQueues[queue]
    if wasOverflow {
        delete(m.overflowDepthQueues, queue)
    }
    overflowSetEmpty := wasOverflow && len(m.overflowDepthQueues) == 0
    m.mu.Unlock()
    if overflowSetEmpty {
        m.dropGaugeStatesFor(sqsQueueOverflow)
    }
All mutations to
Edge case: budget full with two overflow queues (x, y); one slot freed; concurrent
Test correctness:
Standing items (all consciously deferred, no new blockers)
Six-commit story, complete
Every medium and low-severity finding across all five review rounds is resolved. The interface design, LGTM.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 7b5ab13b6a
    names, err := s.scanQueueNames(ctx)
    if err != nil {
        slog.Warn("sqs depth snapshot: scanQueueNames failed", "err", err)
        return nil
Avoid clearing all gauges on queue-scan errors
When scanQueueNames fails, this returns nil, but SQSObserver.observeOnce treats a nil snapshot as “no queues exist now” and calls ForgetQueue for every previously seen queue. In a transient storage/read failure on the leader, that wipes all depth series and produces a false “all backlogs disappeared” signal until the next successful tick. The source should distinguish “snapshot failed” from a real empty/follower snapshot so existing gauges are preserved on error.
P2 (Codex review): SnapshotQueueDepths returns nil on scanQueueNames
failure, observeOnce treats nil as 'no queues exist now' and
ForgetQueue's every previously-seen queue. A single transient
catalog-read error therefore wipes every depth gauge and
dashboard-renders as a false 'all queues drained' event for the
duration of the failure.
Two empty-snapshot cases were conflated:
- ok=true with empty/nil — legitimate empty (follower because
the node stepped down, leader with zero queues configured).
Observer should diff/ForgetQueue normally.
- scan failure / ctx cancel — observer should NOT diff.
Existing gauges and lastSeen must stay intact so the next
successful tick can still diff against the previous good
state.
Fix: extend SQSDepthSource.SnapshotQueueDepths from `[]X` to
`([]X, bool)` so the source can signal 'skip this tick'
distinctly from 'empty but valid'.
Caller audit per the standing semantic-change rule:
monitoring.SQSDepthSource (interface) — sig changed; doc
enumerates the three return shapes.
monitoring.SQSObserver.observeOnce — added `if !ok { return }`
short-circuit before the gauge writes / diff. Other branches
unchanged.
adapter.SQSServer.SnapshotQueueDepths — three explicit returns:
(nil, true) for nil receiver / follower (legitimate
'this node should not emit'); (nil, false) for
scanQueueNames error or ctx cancel mid-scan; (snaps, true)
for the success path. Per-queue snapshotOneQueueDepth
failures still drop only that queue from this tick (ok stays
true) — the observer ForgetQueue's just that one gauge.
main.sqsDepthSourceAdapter.SnapshotQueueDepths — bridge
propagates the new ok flag verbatim through the
adapter→monitoring type translation.
monitoring.fakeDepthSource (test helper) — replaced
`snapshots [][]X` with `ticks []fakeDepthTick` where each
tick scripts (snaps, ok). Existing tests migrated; trailing
ticks past the scripted slice still default to (nil, true)
so leader-step-down clears gauges as before.
Regression test:
TestSQSObserver_ObserveOnce_TransientScanErrorPreservesGauges
scripts three ticks: success → ok=false (scan fail) →
success-recovered. Asserts gauges keep tick-1 values
through the failed tick AND that recovery on tick 3 still
diffs correctly. Pre-fix shape would wipe both queues on
tick 2.
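The `([]X, bool)` contract and the observer's `if !ok { return }` short-circuit can be sketched as below. The types here (`QueueDepth`, `DepthSource`, `Observer`, `fakeSource`) are simplified, hypothetical stand-ins: the real interface also takes a context and reports three states per queue, and the real gauges live in Prometheus rather than a map.

```go
package main

// QueueDepth is a reduced stand-in for monitoring.SQSQueueDepth.
type QueueDepth struct {
	Queue   string
	Visible int64
}

// DepthSource mirrors the ([]X, bool) shape: ok=false means "skip this
// tick", ok=true with an empty slice means "legitimately empty".
type DepthSource interface {
	SnapshotQueueDepths() ([]QueueDepth, bool)
}

type tick struct {
	snaps []QueueDepth
	ok    bool
}

// fakeSource scripts one (snaps, ok) pair per call; calls past the
// scripted ticks default to (nil, true).
type fakeSource struct {
	ticks []tick
	next  int
}

func (f *fakeSource) SnapshotQueueDepths() ([]QueueDepth, bool) {
	if f.next >= len(f.ticks) {
		return nil, true
	}
	t := f.ticks[f.next]
	f.next++
	return t.snaps, t.ok
}

type Observer struct {
	gauges   map[string]int64
	lastSeen map[string]struct{}
}

func NewObserver() *Observer {
	return &Observer{gauges: map[string]int64{}, lastSeen: map[string]struct{}{}}
}

// ObserveOnce leaves gauges and lastSeen untouched on a failed snapshot,
// so the next successful tick still diffs against the last good state.
func (o *Observer) ObserveOnce(src DepthSource) {
	snaps, ok := src.SnapshotQueueDepths()
	if !ok {
		return
	}
	current := make(map[string]struct{}, len(snaps))
	for _, s := range snaps {
		o.gauges[s.Queue] = s.Visible
		current[s.Queue] = struct{}{}
	}
	for q := range o.lastSeen {
		if _, still := current[q]; !still {
			delete(o.gauges, q)
		}
	}
	o.lastSeen = current
}
```

Scripting success, failure, then success reproduces the regression test's shape: the failed tick changes nothing, and the recovery tick still forgets queues that are absent from the new snapshot.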
@claude review
Claude finished @bootjp's task in 1m 57s
Code Review (r6)
What r6 fixed (confirmed)
The Codex P2 finding was:
Correctness assessment
Interface design:
Three explicit return paths in
All four paths are correct. The mid-iteration cancel case (
Long-duration scan failures: gauges remain pinned at their last successful values. This is correct dashboard semantics: stale data is less misleading than a false "all queues empty" reading.
Regression test completeness:
The tick-3 assertion implicitly verifies that
Standing items (all consciously deferred, no new action required)
None of these were introduced or worsened by r6.
Seven-commit story, complete
All findings across all six review rounds, including the final Codex P2, are resolved. The interface extension is the correct long-term fix (makes the two empty cases unambiguous for any future implementor), the test is thorough, and the bridge adapter is correctly wired. LGTM.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 1651db7305
    names, err := s.scanQueueNames(ctx)
    if err != nil {
        slog.Warn("sqs depth snapshot: scanQueueNames failed", "err", err)
        return nil, false
    }
Use one MVCC timestamp for queue-depth snapshots
SnapshotQueueDepths calls scanQueueNames before taking readTS, but scanQueueNames already reads at its own internal nextTxnReadTS. That means queue membership and per-queue counters are collected from different snapshots, so a queue created/deleted between those two timestamps can be incorrectly forgotten or missed for a tick even when the final state is stable. This directly breaks the observer’s “current vs previous” diff contract under queue churn; pass a single read timestamp through both scans so the snapshot is truly point-in-time.
…#743 r7)
P2 (Codex review): SnapshotQueueDepths takes its readTS AFTER scanQueueNames returns. scanQueueNames internally calls nextTxnReadTS, so membership lands on one MVCC view while the per-queue counter reads run at a later ts. A queue created or deleted in that microsecond window can be silently missed (or reported with stale counters) for the tick, and the observer's 'current vs previous' diff loop then ForgetQueue's it spuriously, dashboard-rendering as a phantom drop or zero backlog under normal queue churn.
Fix: take readTS once at the top of SnapshotQueueDepths and thread it through both scans. Refactor scanQueueNames into a 1-line wrapper around the new scanQueueNamesAt(ctx, readTS) so the depth-source path passes its ts in while the existing 3 callers (AdminListQueues, the reaper, the catalog walk) keep using the fresh-ts shape they already have; none need cross-call consistency.
Caller audit per the standing semantic-change rule:
scanQueueNames(ctx): external contract unchanged. The fresh-ts wrapper still acquires its own readTS via s.nextTxnReadTS internally, so admin / catalog / reaper callers see no observable difference.
scanQueueNamesAt(ctx, readTS): new package-private. Only caller is SnapshotQueueDepths; no observable behavior elsewhere.
SnapshotQueueDepths: signature and return semantics unchanged. Internal change tightens an MVCC consistency property; the only externally-visible effect is that the bug above stops happening, which is strictly an improvement to the observer's diff contract.
No regression test added: the failure window is microseconds between two HLC ticks, and a deterministic reproduction would need a fault-injecting fake store wired into the catalog read path. That is meaningful test scaffolding that doesn't exist today and is disproportionate to a mechanical thread-through fix that's self-evidently correct from code reading.
The property the fix establishes (membership scan and per-queue reads use the same readTS) is enforced at the scanQueueNamesAt signature: any future code path here must take readTS as a parameter, making backsliding a compile error rather than a runtime race.
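The thread-through can be illustrated with a toy versioned store. Everything below is an assumption for illustration: the `store` and `version` types, the integer clock standing in for the HLC, and the omission of `context.Context` are not taken from the PR; only the `scanQueueNames` / `scanQueueNamesAt` wrapper relationship follows the commit message.

```go
package main

import "sort"

// version is one MVCC write for a queue: a depth value or a tombstone.
type version struct {
	ts      uint64
	depth   int64
	deleted bool
}

// store is a toy MVCC catalog; clock stands in for the HLC.
type store struct {
	clock  uint64
	queues map[string][]version // versions in ascending ts order
}

func (s *store) nextReadTS() uint64 { return s.clock }

func (s *store) put(name string, depth int64, deleted bool) {
	s.clock++
	s.queues[name] = append(s.queues[name], version{ts: s.clock, depth: depth, deleted: deleted})
}

// latestAt returns the newest version visible at readTS; ok is false if
// the queue did not exist (or was deleted) at that timestamp.
func (s *store) latestAt(name string, readTS uint64) (version, bool) {
	vs := s.queues[name]
	for i := len(vs) - 1; i >= 0; i-- {
		if vs[i].ts <= readTS {
			return vs[i], !vs[i].deleted
		}
	}
	return version{}, false
}

// scanQueueNamesAt lists the queues that exist at readTS.
func (s *store) scanQueueNamesAt(readTS uint64) []string {
	var names []string
	for name := range s.queues {
		if _, ok := s.latestAt(name, readTS); ok {
			names = append(names, name)
		}
	}
	sort.Strings(names)
	return names
}

// scanQueueNames keeps the fresh-timestamp shape for callers that do not
// need cross-call consistency.
func (s *store) scanQueueNames() []string {
	return s.scanQueueNamesAt(s.nextReadTS())
}

// snapshotDepthsAt reads membership and per-queue depth at one readTS.
func (s *store) snapshotDepthsAt(readTS uint64) map[string]int64 {
	out := map[string]int64{}
	for _, name := range s.scanQueueNamesAt(readTS) {
		if v, ok := s.latestAt(name, readTS); ok {
			out[name] = v.depth
		}
	}
	return out
}
```

Because both reads take the same `readTS`, writes that land after the timestamp was taken cannot make membership and counters disagree within a snapshot.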
@claude review
Claude finished @bootjp's task in 1m 58s
Code Review (r7)
What r7 fixed (confirmed)
The final Codex P2 finding was:
Correctness assessment of the fix (
    // After r7: one MVCC view for both scans
    readTS := s.nextTxnReadTS(ctx)
    names, err := s.scanQueueNamesAt(ctx, readTS) // membership at readTS
    // …
    snapshotOneQueueDepth(ctx, name, readTS) // counters at same readTS
All callers of the original
No new issues
All prior findings (r0–r6) remain resolved. The four standing low-severity / nit items from earlier rounds are unchanged and consciously deferred:
Eight-commit story, complete
Every finding across all seven review rounds is resolved. The interface design, LGTM.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@adapter/sqs_depth_source.go`:
- Around line 89-98: In snapshotOneQueueDepth, handle and log errors from
loadQueueMetaAt rather than silently swallowing them: after calling
loadQueueMetaAt(name, readTS) check if err != nil separately from exists and,
when err != nil, emit a structured slog.Warn (or slog.Error) with stable keys
such as "queue", "err", "commit_ts" (use readTS) and "route_id" if available,
then return SQSQueueDepth{} false; keep the existing behavior for !exists (no
log or a different level) so that scanApproxCounters errors remain logged as
before.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 566287f7-6d52-4579-b1fa-ebc0075b9973
📒 Files selected for processing (5)
- adapter/sqs_catalog.go
- adapter/sqs_depth_source.go
- main.go
- monitoring/sqs.go
- monitoring/sqs_test.go
✅ Files skipped from review due to trivial changes (1)
- adapter/sqs_catalog.go
🚧 Files skipped from review as they are similar to previous changes (2)
- main.go
- monitoring/sqs_test.go
CodeRabbit Minor: snapshotOneQueueDepth's first branch
`if err != nil || !exists` collapses two distinct cases — a
catalog read failure and a queue that legitimately doesn't
exist — into the same silent return. The function-level docstring
and the SnapshotQueueDepths contract both state that per-queue
scan errors are logged; only scanApproxCounters errors actually
were. Asymmetric: a flapping catalog read showed up as a dashboard
gauge gap with nothing in the logs to explain it.
Split the branches:
err != nil -> slog.Warn with stable keys (queue, err)
!exists -> silent return (the queue genuinely vanished —
the observer's diff loop will ForgetQueue the
gauge on its own; logging this is just noise)
No semantic change to caller-visible behavior: both branches
return the same (SQSQueueDepth{}, false) sentinel they did
before. Pure logging hygiene + alignment with the documented
contract. No caller audit needed since neither return value
nor branching changes.
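The branch split can be sketched with the standard `log/slog` package. The names `snapshotOne`, `metaLoader`, and `demo` are hypothetical, and the loader stands in for `loadQueueMetaAt`, whose real signature is not shown in this thread; the log message text is likewise an assumption.

```go
package main

import (
	"bytes"
	"errors"
	"log/slog"
	"strings"
)

type QueueDepth struct{ Queue string }

// metaLoader is a hypothetical stand-in for the catalog read.
type metaLoader func(name string) (exists bool, err error)

// snapshotOne returns the same (zero, false) sentinel for both failure
// cases, but only a read error is logged; a missing queue stays silent.
func snapshotOne(log *slog.Logger, load metaLoader, name string) (QueueDepth, bool) {
	exists, err := load(name)
	if err != nil {
		log.Warn("sqs depth snapshot: load queue meta failed", "queue", name, "err", err)
		return QueueDepth{}, false
	}
	if !exists {
		return QueueDepth{}, false // queue vanished; the observer's diff handles it
	}
	return QueueDepth{Queue: name}, true
}

// demo captures log output for the two cases and reports whether each
// one produced a warning.
func demo() (warnedOnError, warnedOnMissing bool) {
	var buf bytes.Buffer
	log := slog.New(slog.NewTextHandler(&buf, nil))

	snapshotOne(log, func(string) (bool, error) {
		return false, errors.New("catalog read failed")
	}, "orders")
	warnedOnError = strings.Contains(buf.String(), "queue=orders")

	buf.Reset()
	snapshotOne(log, func(string) (bool, error) { return false, nil }, "gone")
	warnedOnMissing = buf.Len() > 0
	return warnedOnError, warnedOnMissing
}
```

Here `demo` reports a warning for the failing read and none for the missing queue, which is the asymmetry the commit message describes.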
@claude review
Claude finished @bootjp's task in 1m 31s
Code Review (r8)
What r8 fixed (confirmed)
The CodeRabbit Minor finding was:
Correctness assessment: The split is correct on both axes:
Standing items (all consciously deferred; no new action required)
All items below were raised in previous rounds. None were introduced or worsened by r8.
Nine-commit story, complete
All findings across all eight review rounds are resolved. The logging hygiene in r8 is the correct, minimal fix. Interface design, LGTM.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: fd3a3b040c
    current := make(map[string]struct{}, len(snaps))
    for _, snap := range snaps {
        o.metrics.ObserveQueueDepth(snap.Queue, snap.Visible, snap.NotVisible, snap.Delayed)
        current[snap.Queue] = struct{}{}
    }
Reclaim old depth slots before admitting new queues
observeOnce emits ObserveQueueDepth for the current snapshot before it forgets queues that disappeared from the previous tick, so admitForDepthBudget still sees stale names occupying trackedDepthQueues during admission. When queue churn is high near the 512 cap (e.g., many old queues removed and many new queues added in the same tick), newly active queues are incorrectly collapsed into _other for at least one interval even though capacity exists after the diff, which produces avoidable label loss and inaccurate per-queue dashboards for that tick.
r9)
P2 (Codex review): observeOnce emits ObserveQueueDepth for the current snapshot BEFORE forgetting queues that disappeared from the previous tick. Under high churn near the 512-queue cap (many old queues removed and many new queues added in the same tick), admitForDepthBudget sees the stale names still occupying trackedDepthQueues and silently collapses brand-new queues onto _other for at least one interval, even though the same tick will free their slots a few lines later.
Fix: split observeOnce into two phases under the existing single-writer contract:
Phase 1: build the current-name set + ForgetQueue any name in lastSeen that's not in current. ForgetQueue is the path that frees depth-budget slots in trackedDepthQueues; running it first under m.mu (taken inside ForgetQueue) makes the slots visible to the second phase's admission.
Phase 2: emit ObserveQueueDepth for every snap. admitForDepthBudget now sees the freed slots and admits brand-new queues under their real labels instead of overflow.
End-state semantics are unchanged: after observeOnce returns, gauges reflect the current tick and lastSeen mirrors the current name set. Only the intra-call ordering changes: phase 1 runs strictly before phase 2 instead of interleaving with it. The two phases serialise on m.mu (ForgetQueue and admitForDepthBudget both take it), so phase 2's admissions are guaranteed to see phase 1's reclamations.
Caller audit per the standing semantic-change rule:
observeOnce: callers are Start (production goroutine loop) and ObserveOnce (test entry). Both treat the function as 'process one tick'; neither observes intermediate state. The reorder is invisible to callers: same inputs, same end state, same lock discipline. No call-site update needed.
Regression test:
TestSQSObserver_ObserveOnce_HighChurnReclaimsBeforeAdmit saturates the budget with 512 queues in tick 1, replaces half of them with brand-new names in tick 2, and asserts:
- new-0 / new-(half-1) gauges carry their tick-2 values under their REAL labels (pre-fix they collapsed to _other, returned 0)
- total series count is exactly sqsMaxTrackedQueues*3 = 1536, leaving no room for any _other series (pre-fix would land at 256 real × 3 + _other × 3 = 771)
Pre-fix verified failing with the new-0 assertion (got 0, want 3); post-fix passes.
Test-side note: the count assertion runs BEFORE any per-queue spot check that uses WithLabelValues, because WithLabelValues materialises an absent label combination (creating the gauge child with default value 0 as a side effect). Asserting '_other visible == 0' via WithLabelValues would have inflated the count to 1537. This is a Prometheus client-API quirk that bit me during the fix and is now documented inline so future test authors don't trip over it.
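The forget-then-admit ordering can be sketched with a small budget model. This is an illustration under assumptions: the `metrics` and `observer` types, the cap of 4, the single value per label, and the absence of locking and overflow ref-counting are simplifications, not the PR's implementation.

```go
package main

const (
	maxTracked = 4        // stand-in for sqsMaxTrackedQueues (512 in the PR)
	overflow   = "_other" // shared overflow label
)

// metrics models the depth budget; series stands in for gauge children.
type metrics struct {
	tracked map[string]struct{}
	series  map[string]int64
}

func (m *metrics) observe(queue string, v int64) {
	label := queue
	if _, ok := m.tracked[queue]; !ok {
		if len(m.tracked) >= maxTracked {
			label = overflow // budget full: collapse onto _other
		} else {
			m.tracked[queue] = struct{}{}
		}
	}
	m.series[label] = v
}

func (m *metrics) forget(queue string) {
	if _, ok := m.tracked[queue]; ok {
		delete(m.tracked, queue)
		delete(m.series, queue)
	}
}

type observer struct {
	m        *metrics
	lastSeen map[string]struct{}
}

func newObserver() *observer {
	return &observer{
		m:        &metrics{tracked: map[string]struct{}{}, series: map[string]int64{}},
		lastSeen: map[string]struct{}{},
	}
}

// observeOnce reclaims slots for vanished queues first, then admits the
// current snapshot, so new queues can take the freed slots in one tick.
func (o *observer) observeOnce(snaps map[string]int64) {
	// Phase 1: forget every previously seen queue missing from this tick.
	for q := range o.lastSeen {
		if _, still := snaps[q]; !still {
			o.m.forget(q)
		}
	}
	// Phase 2: emit gauges; admission now sees the freed budget.
	current := make(map[string]struct{}, len(snaps))
	for q, v := range snaps {
		o.m.observe(q, v)
		current[q] = struct{}{}
	}
	o.lastSeen = current
}
```

With the budget saturated by four queues and two of them replaced in the next tick, the replacements land under their real labels and no `_other` series appears, mirroring the regression test at a smaller scale.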
@claude review
Claude finished @bootjp's task in 2m 36s
Code Review (r9)
What r9 fixed (confirmed)
The Codex P2 finding was:
Correctness assessment of the fix (
The two phases now serialize on
Lock ordering is consistent: when both locks are held simultaneously, it is always
The
The step-down path (
Test correctness (
Pre-fix: emit ran first → 256 freed slots not yet visible → 256 new queues collapsed to
The count assertion (
Standing items (all consciously deferred; no new action required)
All items below were flagged in earlier rounds. None were introduced or worsened by r9.
Ten-commit story, complete
Every finding across all nine review rounds is resolved. The phase-reorder fix is minimal, correct, and well-documented. Lock ordering is sound, the regression test pins the pre/post-fix behavior precisely, and the Prometheus LGTM.
Summary
Adds per-queue depth metrics + a Grafana dashboard, integrated into the existing `monitoring/` exporter (alongside the Raft / Redis / DynamoDB / Pebble observers). No new binary, no new scrape config: operators just enable the Grafana dashboard against the same `/metrics` endpoint elastickv already exposes.
What's new
Metric: `elastickv_sqs_queue_messages{queue, state}`
- `visible`: `ApproximateNumberOfMessages`
- `not_visible`: `ApproximateNumberOfMessagesNotVisible`
- `delayed`: `ApproximateNumberOfMessagesDelayed`
Gauges are leader-emitted: the SQS adapter's `SnapshotQueueDepths` returns an empty slice on followers, and the new `SQSObserver` ForgetQueue's any series that disappeared since the last tick (DeleteQueue, leader step-down, tombstoned cohort fully drained). Same shape Raft / Redis observers already use.
Dashboard: `monitoring/grafana/dashboards/elastickv-sqs.json`
- Per-queue depth timeseries with a `$queue` template var.
- HT-FIFO partition activity (`elastickv_sqs_partition_messages_total` counter from PR 7a): ops rate by action and by `(queue, partition)`; surfaces uneven `MessageGroupId` distribution.
Architecture
Mirrors `RaftObserver` (`monitoring/raft.go`):
`main.go::startSQSDepthObserver` wires the source after `startSQSServer`. A thin `sqsDepthSourceAdapter` bridges `adapter.SQSQueueDepth` ↔ `monitoring.SQSQueueDepth` so the adapter package doesn't have to import monitoring. `nil sqsServer` (e.g. `--sqsAddress` empty on this node) is a no-op.
Tests
8 new cases in `monitoring/sqs_test.go`:
- `_ObserveQueueDepth_EmitsThreeStates`: `ObserveQueueDepth` call
- `_ObserveQueueDepth_ClampsNegativeToZero`
- `_ObserveQueueDepth_DropsEmptyQueue`
- `_ForgetQueue_DropsThreeSeries`
- `_DepthNilReceiverIsSafe`: `*SQSMetrics` doesn't panic
- `_DepthRegistryWiring`: `Registry.SQSObserver()` plumbing
- `SQSObserver_ObserveOnce_EmitsAndForgets`
- `SQSObserver_ObserveOnce_LeaderStepDownClearsAll`
- `SQSObserver_NilTolerant`
`go build ./...` clean, `golangci-lint run ./monitoring/... ./adapter/...` clean, full SQS adapter sweep passes.
Self-review (5 lenses)
`observer.mu` protects `lastSeen`; `SQSMetrics.mu` protects `trackedQueues`. Source returns a fresh slice per call. Ticker goroutine stops on `ctx.Done`.
`scanApproxCounters` self-caps so a 1M-message queue doesn't block. Per-queue cardinality cap (`sqsMaxTrackedQueues = 512`) bounds the Prometheus series budget.
`AdminListQueues` / `AdminDescribeQueue`. Per-queue scan error → drop from this tick (ForgetQueue-like effect via the diff) so a transient blip surfaces as missing data, not a false "drained" reading.
Refs
- `elastickv_sqs_partition_messages_total` counter: the new dashboard surfaces it alongside the new gauges.
- `monitoring/raft.go` `RaftObserver.Start(ctx, source, interval)` shape.
Summary by CodeRabbit
New Features
Tests