feat(sqs/monitoring): per-queue depth gauges + Grafana dashboard #743

Merged
bootjp merged 10 commits into main from feat/sqs-prometheus-exporter on May 7, 2026

@bootjp bootjp commented May 6, 2026

Summary

Adds per-queue depth metrics + a Grafana dashboard, integrated into the existing monitoring/ exporter (alongside the Raft / Redis / DynamoDB / Pebble observers). No new binary, no new scrape config — operators just enable the Grafana dashboard against the same /metrics endpoint elastickv already exposes.

What's new

Metric: elastickv_sqs_queue_messages{queue, state}

| state | source attribute |
| --- | --- |
| visible | ApproximateNumberOfMessages |
| not_visible | ApproximateNumberOfMessagesNotVisible |
| delayed | ApproximateNumberOfMessagesDelayed |

Gauges are leader-emitted: the SQS adapter's SnapshotQueueDepths returns an empty slice on followers, and the new SQSObserver calls ForgetQueue on any series that disappeared since the last tick (DeleteQueue, leader step-down, tombstoned cohort fully drained). This is the same shape the Raft / Redis observers already use.
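
As a rough illustration of the gauge shape described above, the sketch below models the three state-labelled series with a plain map instead of a Prometheus GaugeVec. The names seriesKey, depthGauges, clampToZero and observeQueueDepth are hypothetical stand-ins, not identifiers from this PR. The clamping and empty-name rules follow the test descriptions later in this description.

```go
package main

import "fmt"

// seriesKey is a hypothetical stand-in for a (queue, state) label pair.
type seriesKey struct{ queue, state string }

// depthGauges is a toy substitute for a Prometheus GaugeVec.
type depthGauges map[seriesKey]float64

// clampToZero turns negative sentinel values into zero.
func clampToZero(v int64) float64 {
	if v < 0 {
		return 0
	}
	return float64(v)
}

// observeQueueDepth sets the three state series for one queue.
// Empty queue names are dropped without creating any series.
func (g depthGauges) observeQueueDepth(queue string, visible, notVisible, delayed int64) {
	if queue == "" {
		return
	}
	g[seriesKey{queue, "visible"}] = clampToZero(visible)
	g[seriesKey{queue, "not_visible"}] = clampToZero(notVisible)
	g[seriesKey{queue, "delayed"}] = clampToZero(delayed)
}

func main() {
	g := depthGauges{}
	g.observeQueueDepth("orders.fifo", 12, -1, 3)
	fmt.Println(len(g), g[seriesKey{"orders.fifo", "not_visible"}])
}
```

Every observation writes all three states, so a queue never has a partial set of series.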

Dashboard: monitoring/grafana/dashboards/elastickv-sqs.json

  • Top stat row: queue count, total backlog, HT-FIFO ops/sec, active leader.
  • Per-queue depth timeseries (visible / in-flight / delayed), filterable via $queue template var.
  • Top 20 queues by backlog table.
  • HT-FIFO partition activity (uses the existing elastickv_sqs_partition_messages_total counter from PR 7a): ops rate by action and by (queue, partition) — surfaces uneven MessageGroupId distribution.

Architecture

Mirrors RaftObserver (monitoring/raft.go):

adapter/sqs_depth_source.go            monitoring/sqs.go
*SQSServer.SnapshotQueueDepths   ->    SQSDepthSource interface
        (leader-only)                          ↓
                                       SQSObserver.Start(ctx, source, interval)
                                          - tick loop (default 30s)
                                          - current vs previous diff
                                          - ObserveQueueDepth / ForgetQueue
                                                  ↓
                                       SQSMetrics.queueDepth (GaugeVec)
                                                  ↓
                                       elastickv_sqs_queue_messages

main.go::startSQSDepthObserver wires the source after startSQSServer. A thin sqsDepthSourceAdapter bridges adapter.SQSQueueDepth → monitoring.SQSQueueDepth so the adapter package doesn't have to import monitoring. A nil sqsServer (e.g. --sqsAddress empty on this node) is a no-op.
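
The current-vs-previous diff in the diagram above can be sketched roughly as follows. This is a stdlib-only model under assumptions: queueDepth, depthSink, observer and mapSink are hypothetical names, and locking is omitted because the sketch assumes a single caller.

```go
package main

import "fmt"

// queueDepth is a hypothetical mirror of one per-queue snapshot row.
type queueDepth struct {
	Queue                        string
	Visible, NotVisible, Delayed int64
}

// depthSink is a hypothetical stand-in for the metrics side.
type depthSink interface {
	Observe(d queueDepth)
	Forget(queue string)
}

// observer remembers which queues it published on the previous tick.
type observer struct {
	sink     depthSink
	lastSeen map[string]struct{}
}

// observeOnce publishes the current snapshot and forgets every queue that
// was present on the previous tick but is missing now.
func (o *observer) observeOnce(snapshot []queueDepth) {
	current := make(map[string]struct{}, len(snapshot))
	for _, d := range snapshot {
		o.sink.Observe(d)
		current[d.Queue] = struct{}{}
	}
	for name := range o.lastSeen {
		if _, ok := current[name]; !ok {
			o.sink.Forget(name)
		}
	}
	o.lastSeen = current
}

// mapSink records the visible depth of live queues for the demo.
type mapSink map[string]int64

func (m mapSink) Observe(d queueDepth) { m[d.Queue] = d.Visible }
func (m mapSink) Forget(queue string)  { delete(m, queue) }

func main() {
	sink := mapSink{}
	o := &observer{sink: sink}
	o.observeOnce([]queueDepth{{Queue: "a", Visible: 1}, {Queue: "b", Visible: 2}})
	o.observeOnce([]queueDepth{{Queue: "a", Visible: 5}}) // "b" was deleted
	o.observeOnce(nil)                                    // leader step-down
	fmt.Println(len(sink))
}
```

An empty snapshot needs no special case: every previously seen queue is missing from it, so the same diff clears all gauges on step-down.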

Tests

8 new cases in monitoring/sqs_test.go:

| Test | Pins |
| --- | --- |
| _ObserveQueueDepth_EmitsThreeStates | Three state-labelled series per ObserveQueueDepth call |
| _ObserveQueueDepth_ClampsNegativeToZero | Defensive clamp on -1 sentinels from a future failed scan |
| _ObserveQueueDepth_DropsEmptyQueue | Empty queue name drop (mirrors partition-counter rule) |
| _ForgetQueue_DropsThreeSeries | All three state series dropped; cumulative counter survives |
| _DepthNilReceiverIsSafe | Typed-nil *SQSMetrics doesn't panic |
| _DepthRegistryWiring | Public Registry.SQSObserver() plumbing |
| SQSObserver_ObserveOnce_EmitsAndForgets | Diff state machine: ForgetQueue on disappearance |
| SQSObserver_ObserveOnce_LeaderStepDownClearsAll | Empty source slice (step-down) clears all gauges |
| SQSObserver_NilTolerant | Nil source / nil observer no-op |
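
For readers unfamiliar with the typed-nil contract these tests pin, here is a minimal sketch of a nil-safe method. The metrics type and its observe method are hypothetical and only illustrate the guard pattern, not the actual SQSMetrics code.

```go
package main

import "fmt"

// metrics is a hypothetical stand-in for a metrics holder.
type metrics struct {
	depths map[string]int64
}

// observe is safe to call on a nil *metrics: it returns before touching
// any field, so callers do not need their own nil check.
func (m *metrics) observe(queue string, visible int64) {
	if m == nil || queue == "" {
		return
	}
	m.depths[queue] = visible
}

func main() {
	var disabled *metrics // typed nil, e.g. monitoring switched off
	disabled.observe("orders.fifo", 7)

	enabled := &metrics{depths: map[string]int64{}}
	enabled.observe("orders.fifo", 7)
	fmt.Println(enabled.depths["orders.fifo"])
}
```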

go build ./... clean, golangci-lint run ./monitoring/... ./adapter/... clean, full SQS adapter sweep passes.

Self-review (5 lenses)

  1. Data loss — N/A; metrics-only.
  2. Concurrency — observer.mu protects lastSeen; SQSMetrics.mu protects trackedQueues. Source returns a fresh slice per call. Ticker goroutine stops on ctx.Done.
  3. Performance — one catalog scan + one approx-counter scan per queue per 30s on the leader. scanApproxCounters self-caps so a 1M-message queue doesn't block. Per-queue cardinality cap (sqsMaxTrackedQueues = 512) bounds the Prometheus series budget.
  4. Data consistency — leader-only emission keeps gauges in lock-step with AdminListQueues / AdminDescribeQueue. Per-queue scan error → drop from this tick (ForgetQueue-like effect via the diff) so a transient blip surfaces as missing data, not a false "drained" reading.
  5. Test coverage — 8 unit tests cover gauge math, observer state machine, leader step-down, nil-tolerance, and registry wiring.
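
The cardinality cap mentioned under Performance can be pictured with a sketch like the one below. It assumes overflow names share a single `_other` label, as the review comments in this thread describe. The type labelBudget and the method labelFor are hypothetical, and the mutex a concurrent version would need is left out.

```go
package main

import "fmt"

const overflowLabel = "_other"

// labelBudget admits at most limit distinct queue names as label values.
type labelBudget struct {
	limit   int
	tracked map[string]struct{}
}

// labelFor returns the queue name if it is already tracked or if budget
// remains, and the shared overflow label otherwise.
func (lb *labelBudget) labelFor(queue string) string {
	if _, ok := lb.tracked[queue]; ok {
		return queue
	}
	if len(lb.tracked) >= lb.limit {
		return overflowLabel
	}
	lb.tracked[queue] = struct{}{}
	return queue
}

func main() {
	lb := &labelBudget{limit: 2, tracked: map[string]struct{}{}}
	for _, q := range []string{"a", "b", "c", "a"} {
		fmt.Println(q, "->", lb.labelFor(q))
	}
}
```

With a limit of 512 this bounds the gauge to at most 513 queue label values times three states.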

Refs

Summary by CodeRabbit

  • New Features

    • SQS queue depth monitoring: per-queue gauges for visible, not-visible and delayed states, background observer that polls and publishes depths, and automatic cleanup of disappeared queues
    • Grafana dashboard for SQS metrics (per-queue depth, backlog, partition activity, leader status)
    • Runtime wiring to start the SQS depth observer on startup
  • Tests

    • Added unit test verifying the three per-queue depth states are emitted correctly

Adds elastickv_sqs_queue_messages{queue,state} (visible /
not_visible / delayed) on top of the existing partition-counter
metric (PR 7a), plus the Grafana dashboard the operator drops
into the existing provisioning chain.

Pattern mirrors the Raft / Redis observer split:

- adapter/sqs_depth_source.go: *SQSServer.SnapshotQueueDepths(ctx)
  returns one SQSQueueDepth per known queue, or an empty slice on
  followers. Leader-only emission keeps gauge state consistent
  with what AdminListQueues would return at the same instant —
  followers scanning concurrently would race the leader's writes
  and emit conflicting numbers for the same series.
- monitoring/sqs.go (extended): SQSMetrics gains a queueDepth
  GaugeVec + ObserveQueueDepth / ForgetQueue helpers. New
  SQSObserver type owns the tick loop and the
  current-vs-previous queue diff; queues that disappeared since
  the last tick get ForgetQueue'd so the dashboard doesn't show
  a frozen backlog after DeleteQueue. Same nil-tolerant contract
  as RaftObserver.
- monitoring/registry.go: r.SQSObserver() accessor mirrors
  r.RaftObserver().
- main.go: startSQSDepthObserver wires the SQS adapter into the
  observer after startSQSServer; thin sqsDepthSourceAdapter type
  bridges adapter.SQSQueueDepth to monitoring.SQSQueueDepth so
  the adapter package doesn't have to import monitoring.
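
A bridge of the kind described in the last bullet might look roughly like this. The two structs stand in for adapter.SQSQueueDepth and monitoring.SQSQueueDepth. The names snapshotter, bridge and fakeServer are hypothetical, and the real method signatures may differ.

```go
package main

import "fmt"

// adapterDepth stands in for the adapter-side row type.
type adapterDepth struct {
	Queue                        string
	Visible, NotVisible, Delayed int64
}

// monitoringDepth stands in for the monitoring-side row type.
type monitoringDepth struct {
	Queue                        string
	Visible, NotVisible, Delayed int64
}

// snapshotter is the assumed shape of the adapter-side source.
type snapshotter interface {
	SnapshotQueueDepths() []adapterDepth
}

// bridge converts rows field by field, so neither package has to import
// the other's type.
type bridge struct{ src snapshotter }

func (br bridge) Depths() []monitoringDepth {
	rows := br.src.SnapshotQueueDepths()
	out := make([]monitoringDepth, 0, len(rows))
	for _, r := range rows {
		out = append(out, monitoringDepth{
			Queue:      r.Queue,
			Visible:    r.Visible,
			NotVisible: r.NotVisible,
			Delayed:    r.Delayed,
		})
	}
	return out
}

// fakeServer is a canned source used only for the demo.
type fakeServer struct{}

func (fakeServer) SnapshotQueueDepths() []adapterDepth {
	return []adapterDepth{{Queue: "orders.fifo", Visible: 3, NotVisible: 1}}
}

func main() {
	fmt.Println(bridge{src: fakeServer{}}.Depths())
}
```

Naming each field in the copy means that renaming or removing a field on either side breaks the build at the bridge instead of silently shifting values.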

Dashboard (monitoring/grafana/dashboards/elastickv-sqs.json):

- Top stat row: queue count, total backlog, HT-FIFO ops rate,
  active leader.
- Per-queue depth timeseries (visible / in-flight / delayed) with
  $queue templating.
- Top 20 queues by backlog table.
- HT-FIFO partition activity: ops rate by action and by
  (queue, partition) — the partition split is what the original
  PR 7a counter was added for; the dashboard surfaces it.

Tests:

- TestSQSMetrics_ObserveQueueDepth_EmitsThreeStates pins the
  gauge label triple (visible / not_visible / delayed).
- TestSQSMetrics_ObserveQueueDepth_ClampsNegativeToZero pins the
  defensive clamp against -1 sentinels from a future failed scan.
- TestSQSMetrics_ObserveQueueDepth_DropsEmptyQueue pins the
  empty-name drop (mirrors the partition-counter rule).
- TestSQSMetrics_ForgetQueue_DropsThreeSeries pins that
  ForgetQueue clears every state series for the queue while
  leaving the cumulative counter untouched.
- TestSQSMetrics_DepthNilReceiverIsSafe + DepthRegistryWiring
  pin the nil-tolerant contract and registry plumbing.
- TestSQSObserver_ObserveOnce_EmitsAndForgets pins the diff
  state machine: queues present in the current tick get gauges,
  queues that disappeared get ForgetQueue'd.
- TestSQSObserver_ObserveOnce_LeaderStepDownClearsAll pins the
  step-down branch (source returns empty slice -> all gauges
  cleared).
- TestSQSObserver_NilTolerant pins nil source / nil observer
  no-op behaviour.

Self-review (5 lenses):

1. Data loss — N/A; metrics-only, no storage / Raft / FSM touch.
2. Concurrency — observer.mu protects lastSeen; SQSMetrics.mu
   protects trackedQueues. Source returns a fresh slice per call
   (no shared mutable state). Ticker goroutine stops on ctx.Done.
3. Performance — one catalog scan + one approx-counter scan per
   queue per 30s on the leader. scanApproxCounters already
   self-caps the per-queue scan so a 1M-message queue doesn't
   block. Per-queue cardinality cap (sqsMaxTrackedQueues = 512)
   bounds Prometheus series budget.
4. Data consistency — leader-only emission keeps gauges in lock-
   step with AdminListQueues / AdminDescribeQueue. Per-queue scan
   error -> ForgetQueue (not zero) so a transient blip surfaces as
   missing data, not as a false "drained" reading.
5. Test coverage — 8 new tests cover gauge math, observer state
   machine, leader step-down, nil-tolerance, and registry wiring.
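
The "ticker goroutine stops on ctx.Done" point can be illustrated with a small loop like the one below. It is a sketch under assumptions: start and runDemo are hypothetical names, and the initial synchronous pass follows the sequence diagram posted later in this thread.

```go
package main

import (
	"context"
	"fmt"
	"sync/atomic"
	"time"
)

// start runs tick once synchronously, then again on every interval until
// ctx is cancelled. The returned channel closes once the goroutine exits.
func start(ctx context.Context, interval time.Duration, tick func()) <-chan struct{} {
	tick() // initial pass so gauges exist before the first interval elapses
	done := make(chan struct{})
	go func() {
		defer close(done)
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				tick()
			}
		}
	}()
	return done
}

// runDemo starts the loop with a long interval and cancels it right away,
// returning the tick count seen after start and after shutdown.
func runDemo() (afterStart, afterStop int64) {
	var ticks int64
	ctx, cancel := context.WithCancel(context.Background())
	done := start(ctx, time.Hour, func() { atomic.AddInt64(&ticks, 1) })
	afterStart = atomic.LoadInt64(&ticks)
	cancel()
	<-done // blocks until the goroutine has seen ctx.Done
	return afterStart, atomic.LoadInt64(&ticks)
}

func main() {
	first, last := runDemo()
	fmt.Println(first, last)
}
```

Returning a done channel lets the caller confirm shutdown, which is mainly useful in tests.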
@bootjp
Owner Author

bootjp commented May 6, 2026

@claude review

@coderabbitai

coderabbitai Bot commented May 6, 2026

Warning

Rate limit exceeded

@bootjp has exceeded the limit for the number of commits that can be reviewed per hour. Please wait 31 minutes and 30 seconds before requesting another review.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 5e5863f4-b680-427d-9189-712170d0c0b5

📥 Commits

Reviewing files that changed from the base of the PR and between dd8a3c3 and eab2099.

📒 Files selected for processing (3)
  • adapter/sqs_depth_source.go
  • monitoring/sqs.go
  • monitoring/sqs_test.go
📝 Walkthrough

Walkthrough

Adds leader-only SQS queue-depth snapshotting and a polling observer that publishes per-queue visible/not_visible/delayed Gauges, wiring the observer into startup, adding metrics types, tests, and a Grafana dashboard; also bumps AWS SDK patch dependencies.

Changes

SQS Queue Depth Monitoring

| Layer / File(s) | Summary |
| --- | --- |
| Data Shapes: monitoring/sqs.go, adapter/sqs_depth_source.go | Adds SQSQueueDepth struct and SQSDepthSource interface representing per-queue depths (Visible, NotVisible, Delayed). |
| Core Metrics & Observer: monitoring/sqs.go | Adds SQSMetrics.queueDepth GaugeVec; implements ObserveQueueDepth, ForgetQueue, two admission/budget helpers for counters vs depth gauges; implements SQSObserver with Start, ObserveOnce, and observe/diff logic. |
| Depth Snapshot Implementation: adapter/sqs_depth_source.go | Implements (*SQSServer).SnapshotQueueDepths(ctx) and helper snapshotOneQueueDepth performing leader check, single readTS snapshot across queue-name scan and per-queue counter reads; top-level scan errors return (nil,false), per-queue failures are logged and skipped. |
| Catalog MVCC helper: adapter/sqs_catalog.go | Refactors scanQueueNames to scanQueueNamesAt to allow callers to supply a single MVCC readTS for consistent scans. |
| Registry & Integration: monitoring/registry.go, main.go | Registry stores/returns a SQSObserver; startup wires startSQSDepthObserver and an adapter that calls SQSServer.SnapshotQueueDepths, starting the observer after SQS server creation. |
| Visualization & Tests: monitoring/grafana/dashboards/elastickv-sqs.json, monitoring/sqs_test.go | Adds Grafana dashboard JSON for SQS metrics and a unit test TestSQSMetrics_ObserveQueueDepth_EmitsThreeStates. |
| Dependencies: go.mod | Bumps AWS SDK v2 and related indirect dependencies (minor/patch version updates). |

Sequence Diagram

sequenceDiagram
    participant Startup as Startup
    participant Observer as SQSObserver
    participant Source as SQSServer
    participant Metrics as SQSMetrics
    participant Prometheus as Prometheus

    Startup->>Observer: Start(ctx, source, interval)
    Observer->>Observer: observeOnce (initial synchronous)
    Observer->>Source: SnapshotQueueDepths(ctx)
    Note right of Source: verify leader, get single readTS\nscan queue names at readTS
    Source->>Source: snapshotOneQueueDepth(name, readTS)
    Source-->>Observer: []SQSQueueDepth (or ok=false)
    Observer->>Metrics: ObserveQueueDepth(queue, visible, notVisible, delayed)
    Metrics->>Prometheus: set gauge values per (queue,state)
    Observer->>Observer: schedule next tick (interval)

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs

  • bootjp/elastickv#606: Introduces SQS adapter surface changes related to queue scanning and snapshot reads referenced by these depth snapshot implementations.

Poem

🐰
I hop through queues with careful peeks,
Counting visible, delayed, and sneaks.
Metrics hum, dashboards glow,
Depths reported row by row.
A tiny rabbit keeps watch as it speaks.

🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
| Check name | Status | Explanation |
| --- | --- | --- |
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |
| Title check | ✅ Passed | The title accurately and clearly summarizes the main change: adding per-queue depth gauges and a Grafana dashboard for SQS monitoring. |
| Docstring Coverage | ✅ Passed | Docstring coverage is 87.50% which is sufficient. The required threshold is 80.00%. |
| Linked Issues check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |
| Out of Scope Changes check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |


@claude

claude Bot commented May 6, 2026

Claude finished @bootjp's task in 2m 30s


Code Review

  • Gather context and understand the request
  • Read changed files
  • Review adapter/sqs_depth_source.go
  • Review monitoring/sqs.go
  • Review monitoring/sqs_test.go
  • Review monitoring/registry.go
  • Review main.go changes
  • Review Grafana dashboard JSON
  • Compile findings and post review

Overall the PR is well-structured and follows the existing observer pattern faithfully. The interface design, nil-tolerance contract, and test coverage are solid. There are a few issues to address before merge.

Medium severity

trackedQueues memory leak — cardinality budget never reclaimed after ForgetQueue

monitoring/sqs.go:169-177 — ForgetQueue calls queueLabelForCardinalityBudget(queue) to resolve the label, which correctly returns the queue name when the queue is in trackedQueues. But then it only deletes the Prometheus series; it never removes the queue from trackedQueues. The slot in the 512-entry cardinality budget is permanently consumed.

In a deployment that regularly creates and deletes queues (e.g., per-job ephemeral queues, CI workloads), the budget will fill with stale entries and new live queues will be silently collapsed into the _other series. This is the same class of bug as a TTL-less cache.

ForgetQueue should remove the queue from trackedQueues after deleting the series:

func (m *SQSMetrics) ForgetQueue(queue string) {
    if m == nil || queue == "" {
        return
    }
    m.mu.Lock()
    _, ok := m.trackedQueues[queue] // existence check only; an unused queueLabel would not compile
    if ok {
        delete(m.trackedQueues, queue)
    }
    m.mu.Unlock()
    // Only delete series that were actually tracked under this name.
    // Queues that were mapped to _other have no individual series to delete.
    if ok {
        m.queueDepth.DeleteLabelValues(queue, sqsQueueStateVisible)
        m.queueDepth.DeleteLabelValues(queue, sqsQueueStateNotVisible)
        m.queueDepth.DeleteLabelValues(queue, sqsQueueStateDelayed)
    }
}

Note that the current implementation also has a subtle correctness hole: queueLabelForCardinalityBudget adds the queue back to trackedQueues when ForgetQueue is called if the queue has somehow been removed (e.g., in a future refactor). ForgetQueue should not go through the cardinality helper — it should operate directly on trackedQueues.
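
To make the intended behaviour concrete, here is a stdlib-only model of the budget plus gauge state. It is a sketch, not the project's code: tracker, newTracker, observe and forget are hypothetical names, and a plain map replaces the GaugeVec.

```go
package main

import (
	"fmt"
	"sync"
)

const overflowLabel = "_other"

// tracker is a hypothetical model of the label budget plus gauge state.
type tracker struct {
	mu      sync.Mutex
	limit   int
	tracked map[string]struct{}
	series  map[string]int64 // label -> visible depth
}

func newTracker(limit int) *tracker {
	return &tracker{
		limit:   limit,
		tracked: map[string]struct{}{},
		series:  map[string]int64{},
	}
}

// observe writes under the queue's own label while budget remains and
// under the shared overflow label otherwise.
func (tr *tracker) observe(queue string, visible int64) {
	tr.mu.Lock()
	defer tr.mu.Unlock()
	label := overflowLabel
	if _, ok := tr.tracked[queue]; ok {
		label = queue
	} else if len(tr.tracked) < tr.limit {
		tr.tracked[queue] = struct{}{}
		label = queue
	}
	tr.series[label] = visible
}

// forget frees the budget slot and drops the series, but only for queues
// that were individually tracked.
func (tr *tracker) forget(queue string) {
	tr.mu.Lock()
	defer tr.mu.Unlock()
	if _, ok := tr.tracked[queue]; !ok {
		return // overflow-mapped or unknown: leave the shared series alone
	}
	delete(tr.tracked, queue)
	delete(tr.series, queue)
}

func main() {
	tr := newTracker(1)
	tr.observe("a", 1) // takes the only slot
	tr.observe("b", 2) // lands in _other
	tr.forget("b")     // no-op: "b" never owned a slot
	tr.forget("a")     // frees the slot
	tr.observe("c", 3) // reuses the freed slot under its own name
	fmt.Println(tr.series)
}
```

Looking the queue up directly in the tracked set, instead of going through the admission helper, is what keeps forget from re-admitting a name or touching `_other`.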


Low severity / style

maxInt64 helper is redundant with Go 1.21+ builtin

monitoring/sqs.go:179-183 — The module requires Go 1.25 (per CLAUDE.md / go.mod). The builtin max[T] (introduced in Go 1.21) covers this. float64(max(int64(0), visible)) replaces float64(maxInt64(0, visible)) and removes a private helper.

Redundant nil guard in main.go

main.go:1608-1609 — The call site already guards with if r.sqsServer != nil { startSQSDepthObserver(...) }, making the sqsServer == nil branch inside startSQSDepthObserver dead code for this call path. The function-level guard is still the right defensive style (the function is exported-ish by signature), but worth a comment to avoid confusion or a future double-guard being added.

Grafana "Active leader" stat shows no data when all queues are empty

monitoring/grafana/dashboards/elastickv-sqs.json panel id 4 — The query max by (node_id) (elastickv_sqs_queue_messages > 0) * 0 + 1 returns no data points when every queue depth is 0 (all are filtered by > 0). On an idle cluster the stat will show "No data" even when the leader is healthy. A more robust alternative is to select on any series unconditionally, e.g. max by (node_id) (count by (node_id) (elastickv_sqs_queue_messages)).

observeOnce acquires o.mu only for the diff, not for the observe calls

monitoring/sqs.go:270-297 — The calls to o.metrics.ObserveQueueDepth happen outside o.mu, while ForgetQueue calls happen inside it. This is safe today because observeOnce is driven by a single ticker goroutine (and ObserveOnce is documented as test-only). However, if Start has been called and ObserveOnce is called concurrently from a test, the two observeOnce invocations can interleave — one might ObserveQueueDepth for a queue while the other is ForgetQueue-ing it in the diff loop. A comment clarifying the assumed single-writer contract would prevent accidental concurrent use.


Tests

The test suite is thorough and well-named. A few gaps:

  • TestSQSMetrics_ForgetQueue_DropsThreeSeries does not assert that the cardinality slot was freed. After the bug fix above, add an assertion that len(m.trackedQueues) == 0 (or that a second ObserveQueueDepth after ForgetQueue emits a new series rather than collapsing to _other).
  • No test for the overflow + forget interaction — what happens when the 512-slot cap is hit and ForgetQueue is called on an _other-mapped queue. Worth a test to pin the "no-op for overflow queues" contract.

Minor nits (no action needed unless you want to tighten up)

  • adapter/sqs_depth_source.go:35 — nil check s.store == nil duplicates the guard that s.coordinator.IsLeader() would already catch if coordinator is nil-safe. Low impact.
  • The SQSQueueDepth comment in monitoring/sqs.go:63-67 says "a compile error at the SQSObserver call site" — this is true only because sqsDepthSourceAdapter in main.go does a field-by-field copy. Worth keeping that copy explicit (which it is), but the comment slightly overstates the guarantee.


@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 5f16a7ef61


Comment thread monitoring/sqs.go Outdated
Comment on lines +158 to +161
queueLabel := m.queueLabelForCardinalityBudget(queue)
m.queueDepth.WithLabelValues(queueLabel, sqsQueueStateVisible).Set(float64(maxInt64(0, visible)))
m.queueDepth.WithLabelValues(queueLabel, sqsQueueStateNotVisible).Set(float64(maxInt64(0, notVisible)))
m.queueDepth.WithLabelValues(queueLabel, sqsQueueStateDelayed).Set(float64(maxInt64(0, delayed)))

P2 Badge Preserve overflow queue depth instead of overwriting gauges

When the tracked queue cap is exceeded (sqsMaxTrackedQueues), queueLabelForCardinalityBudget returns _other, but ObserveQueueDepth writes depth with Set on that shared label. This means multiple overflow queues in the same tick clobber each other (only the last queue scanned is visible), and ForgetQueue for one disappeared overflow queue can delete _other even if other overflow queues are still present. The result is incorrect/missing backlog metrics specifically in deployments with more than 512 queues.
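
One possible way to avoid the clobbering described here is to sum overflow queues within a tick before publishing. This is a hypothetical alternative for illustration only, not what the PR implements. The names depth and publishTick are made up, and only the visible count is modelled.

```go
package main

import "fmt"

const overflowLabel = "_other"

// depth is a hypothetical per-queue row holding only the visible count.
type depth struct {
	Queue   string
	Visible int64
}

// publishTick folds one tick of rows into label -> value, summing every
// queue that is not individually tracked into the shared overflow label.
func publishTick(rows []depth, tracked map[string]bool) map[string]int64 {
	out := make(map[string]int64, len(rows))
	for _, r := range rows {
		label := overflowLabel
		if tracked[r.Queue] {
			label = r.Queue
		}
		out[label] += r.Visible
	}
	return out
}

func main() {
	tracked := map[string]bool{"a": true}
	rows := []depth{
		{Queue: "a", Visible: 5},
		{Queue: "overflow-a.fifo", Visible: 100},
		{Queue: "overflow-b.fifo", Visible: 200},
	}
	fmt.Println(publishTick(rows, tracked))
}
```

Because the map is rebuilt on every tick, an overflow queue that disappears drops out of the sum without a separate forget step.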

Contributor

@gemini-code-assist gemini-code-assist Bot left a comment


Code Review

This pull request implements queue-depth monitoring for the SQS adapter, introducing a background observer that exports message counts (visible, in-flight, and delayed) as Prometheus gauges. It also adds a corresponding Grafana dashboard and updates AWS SDK dependencies. Feedback identifies a bug in the ForgetQueue logic where the cardinality budget is not correctly managed, potentially leading to label exhaustion. Additionally, it is recommended to use a single read timestamp across all queue snapshots in a single tick to ensure data consistency and improve efficiency.

Comment thread monitoring/sqs.go
Comment on lines +169 to +177
func (m *SQSMetrics) ForgetQueue(queue string) {
	if m == nil || queue == "" {
		return
	}
	queueLabel := m.queueLabelForCardinalityBudget(queue)
	m.queueDepth.DeleteLabelValues(queueLabel, sqsQueueStateVisible)
	m.queueDepth.DeleteLabelValues(queueLabel, sqsQueueStateNotVisible)
	m.queueDepth.DeleteLabelValues(queueLabel, sqsQueueStateDelayed)
}

high

The current implementation of ForgetQueue has two issues. First, it doesn't remove the queue from trackedQueues, meaning the cardinality budget (sqsMaxTrackedQueues = 512) will eventually be exhausted by deleted queues, forcing all new queues into the _other label forever. Second, if a queue was not tracked (i.e., it was in the overflow bucket), calling queueLabelForCardinalityBudget will return sqsQueueOverflow, causing DeleteLabelValues to wipe the overflow series for all queues sharing that label. ForgetQueue should check whether the queue is specifically tracked, remove it from the map if so, and only then delete the gauge series.

Suggested change
func (m *SQSMetrics) ForgetQueue(queue string) {
	if m == nil || queue == "" {
		return
	}
	m.mu.Lock()
	_, tracked := m.trackedQueues[queue]
	if tracked {
		delete(m.trackedQueues, queue)
	}
	m.mu.Unlock()
	if tracked {
		m.queueDepth.DeleteLabelValues(queue, sqsQueueStateVisible)
		m.queueDepth.DeleteLabelValues(queue, sqsQueueStateNotVisible)
		m.queueDepth.DeleteLabelValues(queue, sqsQueueStateDelayed)
	}
}
References
  1. To prevent unbounded memory growth and potential OOM issues, apply a fixed bound to collections that can grow from external requests. Ensure that the management of these collections correctly handles deletions to free up budget.

Comment thread adapter/sqs_depth_source.go Outdated
Comment on lines +35 to +54
func (s *SQSServer) SnapshotQueueDepths(ctx context.Context) []SQSQueueDepth {
	if s == nil || s.coordinator == nil || s.store == nil || !s.coordinator.IsLeader() {
		return nil
	}
	names, err := s.scanQueueNames(ctx)
	if err != nil {
		slog.Warn("sqs depth snapshot: scanQueueNames failed", "err", err)
		return nil
	}
	out := make([]SQSQueueDepth, 0, len(names))
	for _, name := range names {
		if err := ctx.Err(); err != nil {
			return out
		}
		if snap, ok := s.snapshotOneQueueDepth(ctx, name); ok {
			out = append(out, snap)
		}
	}
	return out
}

medium

For better efficiency and consistency across the snapshot, you should acquire a single read timestamp at the start of the pass rather than calling nextTxnReadTS inside the loop for every queue. This ensures that all queue depths in a single tick reflect the same point-in-time state of the catalog and counters.

func (s *SQSServer) SnapshotQueueDepths(ctx context.Context) []SQSQueueDepth {
	if s == nil || s.coordinator == nil || s.store == nil || !s.coordinator.IsLeader() {
		return nil
	}
	names, err := s.scanQueueNames(ctx)
	if err != nil {
		slog.Warn("sqs depth snapshot: scanQueueNames failed", "err", err)
		return nil
	}
	readTS := s.nextTxnReadTS(ctx)
	out := make([]SQSQueueDepth, 0, len(names))
	for _, name := range names {
		if err := ctx.Err(); err != nil {
			return out
		}
		if snap, ok := s.snapshotOneQueueDepth(ctx, name, readTS); ok {
			out = append(out, snap)
		}
	}
	return out
}
References
  1. In an MVCC system, all operations within a single transaction must share the same start timestamp to ensure atomicity and a consistent snapshot view.
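
The effect of hoisting the timestamp can be shown with a toy model. Everything here is hypothetical: clock.next stands in for nextTxnReadTS, and readOne is a placeholder for the per-queue metadata and counter reads.

```go
package main

import "fmt"

// clock is a toy timestamp oracle.
type clock struct{ ts uint64 }

// next hands out a fresh, strictly increasing timestamp.
func (c *clock) next() uint64 {
	c.ts++
	return c.ts
}

// row records which timestamp a queue was read at.
type row struct {
	Queue  string
	ReadTS uint64
}

// readOne is a placeholder for the per-queue meta and counter reads.
func readOne(name string, readTS uint64) row {
	return row{Queue: name, ReadTS: readTS}
}

// snapshotAll takes one timestamp for the whole pass and reuses it for
// every queue, so all rows describe the same point in time.
func snapshotAll(c *clock, names []string) []row {
	readTS := c.next()
	out := make([]row, 0, len(names))
	for _, name := range names {
		out = append(out, readOne(name, readTS))
	}
	return out
}

func main() {
	c := &clock{}
	for _, r := range snapshotAll(c, []string{"a", "b", "c"}) {
		fmt.Println(r.Queue, r.ReadTS)
	}
	fmt.Println("timestamps issued:", c.ts)
}
```

The oracle is consulted once per pass instead of once per queue, which is the efficiency gain the comment refers to.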

Comment thread adapter/sqs_depth_source.go Outdated
Comment on lines +64 to +81
func (s *SQSServer) snapshotOneQueueDepth(ctx context.Context, name string) (SQSQueueDepth, bool) {
	readTS := s.nextTxnReadTS(ctx)
	meta, exists, err := s.loadQueueMetaAt(ctx, name, readTS)
	if err != nil || !exists {
		return SQSQueueDepth{}, false
	}
	counters, err := s.scanApproxCounters(ctx, name, meta.Generation, readTS)
	if err != nil {
		slog.Warn("sqs depth snapshot: counters failed", "queue", name, "err", err)
		return SQSQueueDepth{}, false
	}
	return SQSQueueDepth{
		Queue:      name,
		Visible:    counters.Visible,
		NotVisible: counters.NotVisible,
		Delayed:    counters.Delayed,
	}, true
}

medium

Update the signature to accept the pre-acquired readTS to avoid redundant timestamp generation calls in the loop.

func (s *SQSServer) snapshotOneQueueDepth(ctx context.Context, name string, readTS uint64) (SQSQueueDepth, bool) {
	meta, exists, err := s.loadQueueMetaAt(ctx, name, readTS)
	if err != nil || !exists {
		return SQSQueueDepth{}, false
	}
	counters, err := s.scanApproxCounters(ctx, name, meta.Generation, readTS)
	if err != nil {
		slog.Warn("sqs depth snapshot: counters failed", "queue", name, "err", err)
		return SQSQueueDepth{}, false
	}
	return SQSQueueDepth{
		Queue:      name,
		Visible:    counters.Visible,
		NotVisible: counters.NotVisible,
		Delayed:    counters.Delayed,
	}, true
}

…#743 r1)

Three findings from the round-1 review on PR #743, plus the
suggested test additions:

Medium (cardinality leak):
- ForgetQueue used to call DeleteLabelValues but never shrank
  trackedQueues, so the 512-entry budget filled with stale slots
  and post-cap new queues silently collapsed to _other forever.
  Fix: take m.mu, delete from trackedQueues, release, then
  DeleteLabelValues — but only when the queue was tracked.
  Overflow-mapped queues (queues that hit the cap and got the
  _other label) skip DeleteLabelValues so we don't tear down
  the shared _other series for unrelated queues.

Low / style:
- maxInt64 helper replaced with the Go 1.21+ builtin max[T].
  Module already requires Go 1.25 (go.mod) so the builtin is
  available; the helper was redundant.
- Dashboard 'Active leader' stat: the previous expr filtered on
  'elastickv_sqs_queue_messages > 0' which returns no data on an
  idle cluster (every depth = 0). Switched to
  'count by (node_id) (elastickv_sqs_queue_messages)' which
  surfaces the leader regardless of queue contents.
- observeOnce: added single-writer-contract comment so a future
  caller doesn't accidentally introduce concurrent ticks; the
  CounterVec / GaugeVec writes are individually atomic, but
  symmetric documentation prevents drift.

Tests:
- TestSQSMetrics_ForgetQueue_DropsThreeSeries strengthened: now
  asserts trackedQueues was emptied AND that a post-forget
  Observe re-emits under the real queue name (not _other).
  Without the budget cleanup this assertion would have caught
  the leak.
- TestSQSMetrics_ForgetQueue_OverflowQueueIsNoOp pins the
  overflow-collision safety: ForgetQueue on an _other-mapped
  queue must not delete the shared _other series and must not
  shrink the budget.

Caller-audit per the standing semantic-change rule:
- ForgetQueue's only production caller is SQSObserver.observeOnce.
  observeOnce calls it from the lastSeen-vs-current diff, only
  for queues that had been observed in the previous tick — so
  the not-tracked branch (added with this fix) only fires for
  overflow queues, never for regular ones, never for never-
  observed ones. No caller needs an update.
@bootjp
Owner Author

bootjp commented May 6, 2026

@claude review

@claude

claude Bot commented May 6, 2026

Claude finished @bootjp's task in 2m 44s


Code Review

  • Gather context and understand the request
  • Read adapter/sqs_depth_source.go
  • Read monitoring/sqs.go
  • Read monitoring/sqs_test.go
  • Read monitoring/registry.go
  • Review main.go changes
  • Compile findings and post review

The r1 fixup commit (1911cc1) addressed all medium/low findings from the prior review round cleanly. Reviewing the current state of the branch.

What r1 fixed (confirmed)

| Finding | Status |
| --- | --- |
| ForgetQueue cardinality leak — trackedQueues never shrank | ✅ Fixed: lock → delete → unlock → conditional DeleteLabelValues |
| maxInt64 private helper redundant with Go 1.21+ builtin | ✅ Replaced with max[T] |
| Dashboard "Active leader" stat blank on idle cluster | ✅ Changed to count by (node_id) (elastickv_sqs_queue_messages) |
| observeOnce single-writer assumption undocumented | ✅ Comment added |
| ForgetQueue_DropsThreeSeries didn't assert budget freed | ✅ Strengthened with trackedQueues emptiness check + post-forget re-emit |
| Overflow no-op test missing | ✅ TestSQSMetrics_ForgetQueue_OverflowQueueIsNoOp added |

The ForgetQueue fix is correct — it checks trackedQueues under the lock, removes only if present, then calls DeleteLabelValues only for individually-tracked queues. Overflow-mapped queues correctly skip the delete so the shared _other series isn't torn down.


Remaining items

Low: Per-queue nextTxnReadTS gives each queue a different snapshot timestamp

adapter/sqs_depth_source.go:65 — snapshotOneQueueDepth calls nextTxnReadTS(ctx) once per queue in the loop. In a 500-queue deployment the first queue gets ts=T₀, the last gets ts=T₄₉₉, diverging by the catalog-scan wall time. For these approximate counters (the SQS attribute names already contain Approximate) this divergence is immaterial — the metrics are explicitly advisory, not transactional. Gemini's suggestion of a single readTS per tick would be cleaner and reduce HLC churn on the leader, but it's a polish item rather than a bug.


Low: Redundant outer nil-guard in main.go

main.go:1607 — The call site already checks if r.sqsServer != nil before calling startSQSDepthObserver, and startSQSDepthObserver itself re-checks sqsServer == nil. The inner guard is the canonical defensive pattern; the outer check is harmless redundancy. Neither needs to change, but a comment on one of the two ("inner guard exists for future callers") would clarify the intent.

Low: _other overflow bucket shows last-scanned value, not aggregate

monitoring/sqs.go:158-161 — When more than 512 queues exist, all overflow queues collapse to the _other label via Set. On a single tick, if overflow-a.fifo has depth 100 and overflow-b.fifo has depth 200, the _other.visible gauge ends up at 200 (whichever was scanned last), not 300. The TestSQSMetrics_ForgetQueue_OverflowQueueIsNoOp test documents this as expected behavior, but the dashboard's "Total backlog" panel will undercount when the overflow bucket is active. Worth a brief operator note in the dashboard panel description for the _other stat.
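To make the undercount concrete, here is a minimal sketch using a toy map in place of the real GaugeVec. The `gauge` type and both helper functions are hypothetical and only illustrate the difference between overwriting and summing. The aggregated variant is an alternative for comparison, not something this PR implements.

```go
package main

import "fmt"

// overflowLabel stands in for the shared "_other" label.
const overflowLabel = "_other"

// gauge is a toy stand-in for a gauge vector keyed by one label (assumption).
type gauge map[string]float64

// Set overwrites the stored value, which is how a gauge Set behaves.
func (g gauge) Set(label string, v float64) { g[label] = v }

// lastWriteWins collapses every overflow queue onto one label via Set,
// so only the depth of the last scanned queue survives.
func lastWriteWins(depths []float64) float64 {
	g := gauge{}
	for _, d := range depths {
		g.Set(overflowLabel, d)
	}
	return g[overflowLabel]
}

// aggregated sums the overflow depths first and calls Set once.
func aggregated(depths []float64) float64 {
	g := gauge{}
	var total float64
	for _, d := range depths {
		total += d
	}
	g.Set(overflowLabel, total)
	return g[overflowLabel]
}

func main() {
	depths := []float64{100, 200} // overflow-a.fifo, overflow-b.fifo
	fmt.Println(lastWriteWins(depths)) // 200
	fmt.Println(aggregated(depths))    // 300
}
```

With the current last-write-wins contract, a `sum(...)` panel sees 200 for these two queues rather than 300.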

Low: No unit test for SnapshotQueueDepths / snapshotOneQueueDepth

adapter/sqs_depth_source.go is new code with no co-located unit test. The observer layer is fully tested via fakeDepthSource, and the PR description says the full SQS adapter sweep passes — so the paths through loadQueueMetaAt and scanApproxCounters are covered by integration tests. Still worth noting as a gap if the adapter sweep is ever stripped from CI.


What's solid

  • Interface design: SQSDepthSource / sqsDepthSourceAdapter keeps the adapter and monitoring packages properly decoupled; a field-order mismatch between adapter.SQSQueueDepth and monitoring.SQSQueueDepth surfaces as a compile error, not a silent metrics misreport.
  • State machine: observeOnce diff (current vs lastSeen) correctly ForgetQueues on step-down (empty slice path) and on DeleteQueue (queue disappears from snapshot). Both cases are pinned by tests.
  • Cardinality bounding: The 512-slot cap + overflow _other pattern mirrors the existing DynamoDBMetrics design. ForgetQueue now correctly reclaims slots.
  • Nil tolerance: Nil receiver on both SQSMetrics and SQSObserver is a no-op everywhere, matching the established pattern for this codebase.
  • Test suite: 10 tests (8 new + 2 strengthened) covering gauge math, negative clamping, empty-name drop, cardinality budget lifecycle, overflow collision safety, nil tolerance, registry wiring, and the observer state machine for both the normal and step-down paths.
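The state-machine bullet above boils down to a set difference between the previous and current tick. A minimal sketch of that diff follows. The function name `disappeared` and its signature are assumptions for illustration, not the PR's actual code.

```go
package main

import (
	"fmt"
	"sort"
)

// disappeared returns queue names that were in lastSeen but are absent from
// the current snapshot, sorted for deterministic output. These are the names
// the observer would pass to ForgetQueue.
func disappeared(lastSeen map[string]struct{}, current []string) []string {
	now := make(map[string]struct{}, len(current))
	for _, q := range current {
		now[q] = struct{}{}
	}
	var gone []string
	for q := range lastSeen {
		if _, ok := now[q]; !ok {
			gone = append(gone, q)
		}
	}
	sort.Strings(gone)
	return gone
}

func main() {
	lastSeen := map[string]struct{}{"a": {}, "b": {}}
	// DeleteQueue path: "b" vanished from the snapshot.
	fmt.Println(disappeared(lastSeen, []string{"a"})) // [b]
	// Step-down path: a follower returns an empty slice, so everything is forgotten.
	fmt.Println(disappeared(lastSeen, nil)) // [a b]
}
```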

The two genuine concerns (cardinality leak, dashboard idle-cluster blank) were already caught and fixed in r1. The remaining items are low-severity polish. LGTM with the optional per-queue readTS cleanup as a nice-to-have.

Round-2 review: optional cleanup. Move nextTxnReadTS out of the
per-queue loop in snapshotOneQueueDepth and into the SnapshotQueueDepths
caller so every queue in one tick shares the same MVCC read
timestamp. Two wins:

- Consistency. With per-queue ts the first queue's depth can
  reflect catalog state the last queue's depth can't (catalog
  mutation between calls). The Approximate* metric names already
  signal advisory-only, so this isn't a correctness bug — but
  one ts per tick gives the dashboard a coherent point-in-time
  snapshot that matches what AdminListQueues + AdminDescribeQueue
  would return at the same instant.
- HLC pressure. Each nextTxnReadTS call advances the leader's
  HLC. On a 500-queue deployment this drops 499 unnecessary
  ticks per scrape interval.

Caller audit per the standing semantic-change rule:
- snapshotOneQueueDepth signature gained one parameter (readTS
  uint64). The only caller is SnapshotQueueDepths in the same
  file; updated to pass the per-tick value through. No other
  caller exists to audit.
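A rough illustration of the hoist, using a toy logical clock rather than the real HLC. The `clock` type and both snapshot helpers are hypothetical stand-ins, and the assumption that every timestamp request advances the clock by one is taken from the commit message above.

```go
package main

import "fmt"

// clock is a toy logical clock; every Next call advances it by one.
type clock struct{ ticks uint64 }

func (c *clock) Next() uint64 { c.ticks++; return c.ticks }

// snapshotPerQueue takes a fresh timestamp for each queue (pre-change shape).
func snapshotPerQueue(c *clock, queues []string) map[string]uint64 {
	out := make(map[string]uint64, len(queues))
	for _, q := range queues {
		out[q] = c.Next()
	}
	return out
}

// snapshotPerTick takes one timestamp and shares it across all queues
// (post-change shape).
func snapshotPerTick(c *clock, queues []string) map[string]uint64 {
	out := make(map[string]uint64, len(queues))
	readTS := c.Next()
	for _, q := range queues {
		out[q] = readTS
	}
	return out
}

func main() {
	queues := make([]string, 500)
	for i := range queues {
		queues[i] = fmt.Sprintf("q-%d", i)
	}
	a, b := &clock{}, &clock{}
	snapshotPerQueue(a, queues)
	snapshotPerTick(b, queues)
	fmt.Println(a.ticks, b.ticks) // 500 1
}
```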

bootjp commented May 6, 2026

@claude review


claude Bot commented May 6, 2026

Claude finished @bootjp's task in 2m 22s —— View job


Code Review (r2)

  • Gather context and understand the request
  • Read changed files (adapter/sqs_depth_source.go, monitoring/sqs.go, monitoring/sqs_test.go, monitoring/registry.go, main.go)
  • Review r2 commit (single readTS fix)
  • Review full diff vs origin/main
  • Compile and post review findings

What r2 fixed (confirmed)

| Finding | Status |
| --- | --- |
| Single readTS per tick: nextTxnReadTS was called inside the loop, giving each queue a different snapshot timestamp | ✅ Hoisted to SnapshotQueueDepths; snapshotOneQueueDepth now accepts readTS uint64 as a param |

The fix is well-implemented. The comment at adapter/sqs_depth_source.go:44-49 clearly states both the correctness reason (consistent MVCC view across all queues per tick) and the performance reason (one HLC tick per interval instead of one per queue). The refactor is minimal and clean.

go.mod delta

All changes are routine AWS SDK patch bumps (v1.41.6 → v1.41.7, smithy-go v1.25.0 → v1.25.1, etc.) across the entire SDK family. No new dependencies, no major/minor version changes. Fine.


Standing items from earlier rounds (no new action required)

These were called out previously and are consciously unaddressed — documenting them here so the merge decision is explicit.

Double nil-guard in main.go (lines ~1607 + inner guard in startSQSDepthObserver) — harmless. The inner guard is the canonical one for future callers; the outer is a call-site optimisation.

_other overflow bucket shows last-write-wins, not aggregate: TestSQSMetrics_ForgetQueue_OverflowQueueIsNoOp pins this as expected behavior (line 247: "second observe should overwrite the shared _other gauge"). The "Total backlog" panel query sum(elastickv_sqs_queue_messages) would undercount when >512 queues are active. The test locks in the contract, but operators won't know the _other series is lossy from the dashboard alone. A one-line panel description note would help, but is not a blocker.

No unit test for snapshotOneQueueDepth — covered by integration suite. Acceptable.


Overall

All medium-severity findings from r0 and the low-severity items from r1 are resolved. The three-commit history tells a clean story: initial feature → cardinality/dashboard fixes → single-readTS consistency fix. The interface design, ForgetQueue lifecycle, test coverage, and main.go wiring are all solid.

LGTM.


@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 33dca0c700

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Comment thread monitoring/sqs.go Outdated
Comment on lines +190 to +193
_, tracked := m.trackedQueues[queue]
if tracked {
	delete(m.trackedQueues, queue)
}

P1: Keep partition counter budget separate from depth cleanup

ForgetQueue now deletes entries from trackedQueues, but that map is also used by ObservePartitionMessage to enforce the elastickv_sqs_partition_messages_total cardinality cap. Because counter series are never deleted, removing a queue from trackedQueues lets later queues be admitted with real labels, so queue churn (or a leader step-down that clears lastSeen) can grow partition-counter series beyond sqsMaxTrackedQueues over time. This regresses the original cap and can cause unbounded Prometheus cardinality growth.


@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 1

🧹 Nitpick comments (2)
monitoring/sqs.go (1)

248-268: 💤 Low value

Start is not idempotent — repeated calls leak ticker goroutines.

Each call to Start allocates a new time.Ticker and spawns a new goroutine. Currently startSQSDepthObserver only runs once at startup, so this is latent, but a future refactor (e.g., re-wiring the source after a leader transition or a reconfig) would silently double the polling rate per call. The cost of guarding is one bool/atomic.

♻️ Suggested guard
 type SQSObserver struct {
 	metrics *SQSMetrics
 
+	started  atomic.Bool
 	mu       sync.Mutex
 	lastSeen map[string]struct{}
 }
@@
 func (o *SQSObserver) Start(ctx context.Context, source SQSDepthSource, interval time.Duration) {
 	if o == nil || source == nil {
 		return
 	}
+	if !o.started.CompareAndSwap(false, true) {
+		return
+	}
 	if interval <= 0 {
 		interval = sqsDepthObserveInterval
 	}

Add a corresponding test that calls Start twice and asserts only one observation goroutine is active (e.g., via a counter on fakeDepthSource).
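A standalone sketch of the suggested guard, for readers who want to see it run. The `observer` type, its `loops` counter, and `startTwice` are hypothetical and exist only to make the single-goroutine property observable. The real SQSObserver has different fields.

```go
package main

import (
	"context"
	"fmt"
	"sync/atomic"
	"time"
)

// observer is a hypothetical stand-in for SQSObserver.
type observer struct {
	started atomic.Bool
	loops   atomic.Int32 // number of polling goroutines launched
}

// Start launches at most one polling goroutine, no matter how often it is called.
func (o *observer) Start(ctx context.Context, interval time.Duration, tick func()) {
	if !o.started.CompareAndSwap(false, true) {
		return // a previous call already owns the ticker
	}
	o.loops.Add(1)
	go func() {
		t := time.NewTicker(interval)
		defer t.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-t.C:
				tick()
			}
		}
	}()
}

// startTwice calls Start twice and reports how many loops were launched.
func startTwice() int32 {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	o := &observer{}
	o.Start(ctx, time.Millisecond, func() {})
	o.Start(ctx, time.Millisecond, func() {})
	return o.loops.Load()
}

func main() {
	fmt.Println(startTwice()) // 1
}
```

A `sync.Once` would work equally well here. `atomic.Bool` is used only because it matches the suggested diff.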

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@monitoring/sqs.go` around lines 248 - 268, The Start method on SQSObserver
creates a new time.Ticker and goroutine each call which leaks observers; make
Start idempotent by adding a start guard (e.g., an atomic.Bool or sync.Once
field on SQSObserver such as started or startOnce) so subsequent Start(ctx,
source, interval) calls either return immediately or do not create another
ticker/goroutine, and ensure the existing ticker is stopped on context
cancellation as currently done in the goroutine; update SQSObserver.Start to
check and set that guard before calling observeOnce and creating the
ticker/goroutine (referencing Start, SQSObserver, and observeOnce), and add a
unit test that calls Start twice and asserts only one observation goroutine is
active (use a counter on fakeDepthSource).
adapter/sqs_depth_source.go (1)

35-54: 💤 Low value

Consider a per-tick deadline on the catalog scan.

SnapshotQueueDepths inherits the observer's root context, which only cancels at shutdown. If scanQueueNames or loadQueueMetaAt stalls (e.g., a transient catalog-read hang), the observer's tick goroutine pins on this call indefinitely — the gauges then freeze at their previous values rather than degrading via the documented "scrape failed = missing series" contract on Line 33. A per-tick context.WithTimeout (e.g., bounded by sqsDepthObserveInterval) would let the observer skip a stuck tick and ForgetQueue the disappeared series cleanly. This belongs on the observer side or here at the entrypoint.

♻️ Sketch (caller-side variant in monitoring/sqs.go)
 func (o *SQSObserver) observeOnce(ctx context.Context, source SQSDepthSource) {
 	if o == nil || o.metrics == nil || source == nil {
 		return
 	}
-	snaps := source.SnapshotQueueDepths(ctx)
+	tickCtx, cancel := context.WithTimeout(ctx, sqsDepthObserveInterval)
+	defer cancel()
+	snaps := source.SnapshotQueueDepths(tickCtx)

Either the observer or the adapter is a fine home for the deadline; pick whichever keeps the contract clearer.
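A self-contained sketch of the caller-side variant, showing that a stalled snapshot is abandoned at the deadline. The `snapshotFn` type returns an error purely so the timeout is observable here. That is an assumption of this sketch, since the real SnapshotQueueDepths returns only a slice.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// snapshotFn is a hypothetical stand-in for SnapshotQueueDepths.
type snapshotFn func(ctx context.Context) ([]string, error)

// observeOnce bounds a single tick with a deadline derived from the parent context.
func observeOnce(parent context.Context, budget time.Duration, snap snapshotFn) ([]string, error) {
	tickCtx, cancel := context.WithTimeout(parent, budget)
	defer cancel()
	return snap(tickCtx)
}

// stalled blocks until its context is done, simulating a hung catalog read.
func stalled(ctx context.Context) ([]string, error) {
	<-ctx.Done()
	return nil, ctx.Err()
}

// tickTimedOut reports whether a stalled snapshot is cut off by the per-tick deadline.
func tickTimedOut() bool {
	_, err := observeOnce(context.Background(), 10*time.Millisecond, stalled)
	return errors.Is(err, context.DeadlineExceeded)
}

func main() {
	fmt.Println(tickTimedOut()) // true
}
```

This only helps if the underlying reads honor context cancellation. A scan that ignores its context would still block.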

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@adapter/sqs_depth_source.go` around lines 35 - 54, SnapshotQueueDepths blocks
on the caller/root context so a stalled scanQueueNames or loadQueueMetaAt can
hang the observer tick; add a per-tick deadline (use context.WithTimeout) around
the whole tick in SnapshotQueueDepths so each invocation uses a bounded context
(e.g., use s.sqsDepthObserveInterval or a similar interval constant) when
calling scanQueueNames and s.snapshotOneQueueDepth/loadQueueMetaAt, canceling
the timeout on return so a stuck tick is abandoned and the observer can
ForgetQueue/emit missing series as intended.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@monitoring/grafana/dashboards/elastickv-sqs.json`:
- Around line 158-168: Panel "Active leader" currently uses the PromQL filter
elastickv_sqs_queue_messages > 0 which drops all-zero series and causes "No
data" when queues are drained; update the panel's expr (in the block with
"title": "Active leader", "refId": "A") to remove the > 0 predicate so it
considers zero-valued gauges (e.g. use max by (node_id)
(elastickv_sqs_queue_messages) * 0 + 1) so leaders that report zero are still
shown.


ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: dffea727-0ee8-4bce-b117-8cf9806ad988

📥 Commits

Reviewing files that changed from the base of the PR and between 7a67c80 and 5f16a7e.

⛔ Files ignored due to path filters (1)
  • go.sum is excluded by !**/*.sum
📒 Files selected for processing (7)
  • adapter/sqs_depth_source.go
  • go.mod
  • main.go
  • monitoring/grafana/dashboards/elastickv-sqs.json
  • monitoring/registry.go
  • monitoring/sqs.go
  • monitoring/sqs_test.go

Comment thread monitoring/grafana/dashboards/elastickv-sqs.json
P1 (Codex review): ForgetQueue regresses counter cardinality cap.

trackedQueues was a single shared map for both partitionMessages
(CounterVec) and queueDepth (GaugeVec) cardinality budgets. The
r1 fix made ForgetQueue delete the queue's slot so a churn-heavy
deployment (CI workloads, ephemeral queues, leader step-down
clearing the observer's lastSeen) could reuse budget for new
queue names — correct behaviour for the gauge side, since
DeleteLabelValues drops the underlying series too.

But the counter side is cumulative-by-design: Prometheus can't
delete a counter series without losing its observed value, so
ObservePartitionMessage never calls DeleteLabelValues. Reclaiming
the slot for a forgotten queue therefore lets a NEW queue be
admitted under a real label while the OLD queue's counter series
is still alive in Prometheus. Repeating this pushes counter
cardinality past sqsMaxTrackedQueues without bound.

Fix: split trackedQueues into two independent maps —
trackedCounterQueues (one-way; ObservePartitionMessage admits,
ForgetQueue ignores) and trackedDepthQueues (reclaimable;
ForgetQueue frees the slot together with DeleteLabelValues).

Two helpers replace queueLabelForCardinalityBudget:
  - admitForCounterBudget — used by ObservePartitionMessage
  - admitForDepthBudget   — used by ObserveQueueDepth

Caller audit per the standing semantic-change rule:
  ForgetQueue: only caller is SQSObserver.observeOnce. The
    caller's contract ("drop gauges for queue that disappeared
    from this tick's snapshot") is preserved. The narrowed
    scope — counter budget no longer reclaimed — is invisible
    to the observer because it only touches gauges; counters
    are fed via ObservePartitionMessage from a different code
    path.
  ObservePartitionMessage / ObserveQueueDepth: behaviour from
    the caller's POV is unchanged (admit-or-_other on a 512-slot
    cap). The only diff is which map backs the cap. No external
    contract change.
  queueLabelForCardinalityBudget: removed. Grep confirms no
    callers remain.

Regression test:
  TestSQSMetrics_ForgetQueue_DoesNotReclaimCounterBudget
    saturates the counter budget with sqsMaxTrackedQueues distinct
    queues, ForgetQueue's all of them, then asserts a fresh queue
    collapses to _other (would FAIL pre-fix — verified the test
    fails on the previous tip and passes on this commit).
  TestSQSMetrics_ForgetQueue_StillReclaimsDepthBudget
    pins the converse: gauge budget is still reclaimable, so a
    513th distinct queue post-forget gets emitted under its real
    name.

Existing ForgetQueue tests updated for the split-map shape.
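The split described in this commit message can be sketched with plain maps. Everything below is illustrative. The `metrics` type, the tiny `maxTracked` cap (2 instead of 512), and the shared `admit` helper are assumptions, and the real code also deletes the Prometheus series, which this toy version omits.

```go
package main

import (
	"fmt"
	"sync"
)

const (
	maxTracked = 2        // stands in for sqsMaxTrackedQueues (512 in the PR)
	overflow   = "_other" // shared overflow label
)

// metrics keeps two independent cardinality budgets.
type metrics struct {
	mu      sync.Mutex
	counter map[string]struct{} // one-way: never reclaimed
	depth   map[string]struct{} // reclaimable via ForgetQueue
}

func newMetrics() *metrics {
	return &metrics{counter: map[string]struct{}{}, depth: map[string]struct{}{}}
}

// admit returns the queue's real label if it fits the budget, else the overflow label.
func admit(set map[string]struct{}, queue string) string {
	if _, ok := set[queue]; ok {
		return queue
	}
	if len(set) >= maxTracked {
		return overflow
	}
	set[queue] = struct{}{}
	return queue
}

func (m *metrics) admitForCounterBudget(queue string) string {
	m.mu.Lock()
	defer m.mu.Unlock()
	return admit(m.counter, queue)
}

func (m *metrics) admitForDepthBudget(queue string) string {
	m.mu.Lock()
	defer m.mu.Unlock()
	return admit(m.depth, queue)
}

// ForgetQueue frees only the depth slot; the counter budget is left untouched.
func (m *metrics) ForgetQueue(queue string) {
	m.mu.Lock()
	defer m.mu.Unlock()
	delete(m.depth, queue)
}

func main() {
	m := newMetrics()
	for _, q := range []string{"a", "b"} {
		m.admitForCounterBudget(q)
		m.admitForDepthBudget(q)
	}
	m.ForgetQueue("a")
	m.ForgetQueue("b")
	fmt.Println(m.admitForCounterBudget("c")) // _other
	fmt.Println(m.admitForDepthBudget("c"))   // c
}
```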

bootjp commented May 6, 2026

@claude review


claude Bot commented May 6, 2026

Claude finished @bootjp's task in 1m 45s —— View job


Code Review (r3)

  • Gather context and understand the request
  • Read monitoring/sqs.go (r3 split cardinality budgets)
  • Read monitoring/sqs_test.go (new tests for split budgets)
  • Read adapter/sqs_depth_source.go
  • Read monitoring/registry.go
  • Compile and post review findings

What r3 fixed (confirmed)

The Codex P1 finding was: ForgetQueue deleting from the shared trackedQueues allowed Prometheus counter cardinality to grow past sqsMaxTrackedQueues under queue churn, because freeing a counter slot admitted a new queue under its real label while the old queue's counter series was still alive in Prometheus. Repeated across enough churn, this grows counter cardinality without bound.

| Finding | Status |
| --- | --- |
| Counter-budget leak under queue churn: shared trackedQueues meant ForgetQueue freed counter slots | ✅ Fixed: split into trackedCounterQueues (one-way) and trackedDepthQueues (reclaimable) |
| queueLabelForCardinalityBudget conflated counter and gauge semantics | ✅ Fixed: split into admitForCounterBudget and admitForDepthBudget |
| No test pinning "counter budget survives ForgetQueue" | ✅ Fixed: TestSQSMetrics_ForgetQueue_DoesNotReclaimCounterBudget and TestSQSMetrics_ForgetQueue_StillReclaimsDepthBudget added |

Correctness assessment of the fix:

admitForCounterBudget and admitForDepthBudget both serialize correctly under m.mu — no double-lock risk. ForgetQueue takes m.mu only to read/delete from trackedDepthQueues, then releases before calling DeleteLabelValues. The trackedCounterQueues map is untouched by ForgetQueue at any point, which is the exact invariant that prevents the unbounded cardinality growth.

The SQSMetrics struct docblock (monitoring/sqs.go:80-101) explains the design decision clearly — why counters are one-way, why gauges are reclaimable, and what the pre-fix regression was. This is the right level of documentation for a non-obvious split.

Both new tests (_DoesNotReclaimCounterBudget and _StillReclaimsDepthBudget) follow the pre-fix/post-fix pattern well and together fully pin the contract.


Remaining low-severity items (consciously unaddressed; no action required)

These were flagged in earlier rounds. All are latent concerns, not bugs in the current code:

| Item | Severity | Status |
| --- | --- | --- |
| Start not idempotent: second call leaks a goroutine | Low | Latent; current callers never double-call. sync.Once would harden it. |
| No per-tick deadline on SnapshotQueueDepths: a stalled catalog scan pins the observer goroutine until shutdown | Low | Latent; context.WithTimeout bounded to sqsDepthObserveInterval would contain it. |
| _other overflow series is last-write-wins, not aggregate: "Total backlog" panel undercounts when >512 queues active | Low | Pinned by TestSQSMetrics_ForgetQueue_OverflowQueueIsNoOp:250. Operator note in the dashboard panel description would help. |
| Double nil-guard in main.go (outer if r.sqsServer != nil + inner guard in startSQSDepthObserver) | Nit | Harmless. |

Summary

r3 closes the final substantive issue (P1 counter cardinality growth under queue churn). All four review rounds of findings — medium-severity ForgetQueue cardinality leak, low-severity items from r1/r2, and the Codex P1 — are now resolved. The interface design, test coverage, observer state machine, and cardinality budget semantics are all in good shape.

LGTM.


@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: e40ad3bc9f


Comment thread monitoring/sqs.go Outdated
Comment on lines +230 to +231
if !tracked {
// Queue was either never depth-observed or had been collapsed

P2: Clear overflow gauge when overflow set becomes empty

When queue cardinality is already saturated, ObserveQueueDepth writes all extra queues into the shared _other label, but ForgetQueue now returns early for those queues because they are not in trackedDepthQueues. In a churn scenario (>512 queues) this leaves _other at its last value even after the final overflow queue disappears, so dashboards keep showing phantom backlog for queues that no longer exist until another overflow queue happens to overwrite it.


@coderabbitai coderabbitai Bot left a comment


🧹 Nitpick comments (3)
monitoring/sqs.go (2)

184-186: 💤 Low value

Optional: drop the redundant int64(0) literal in max.

Go's untyped 0 will be inferred to int64 from the second argument, so max(0, visible) is equivalent and reads more cleanly. Pure readability nit — feel free to ignore.

♻️ Proposed simplification
-	m.queueDepth.WithLabelValues(queueLabel, sqsQueueStateVisible).Set(float64(max(int64(0), visible)))
-	m.queueDepth.WithLabelValues(queueLabel, sqsQueueStateNotVisible).Set(float64(max(int64(0), notVisible)))
-	m.queueDepth.WithLabelValues(queueLabel, sqsQueueStateDelayed).Set(float64(max(int64(0), delayed)))
+	m.queueDepth.WithLabelValues(queueLabel, sqsQueueStateVisible).Set(float64(max(0, visible)))
+	m.queueDepth.WithLabelValues(queueLabel, sqsQueueStateNotVisible).Set(float64(max(0, notVisible)))
+	m.queueDepth.WithLabelValues(queueLabel, sqsQueueStateDelayed).Set(float64(max(0, delayed)))
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@monitoring/sqs.go` around lines 184 - 186, The three lines calling
m.queueDepth.WithLabelValues(...) use max(int64(0), ...) with a redundant int64
literal; replace int64(0) with untyped 0 so the calls become max(0, visible),
max(0, notVisible), max(0, delayed) to improve readability while leaving
behavior unchanged — update the occurrences around queueDepth, queueLabel, and
sqsQueueStateVisible/sqsQueueStateNotVisible/sqsQueueStateDelayed where max is
invoked.

323-343: 💤 Low value

Optional: bail early if ctx is already canceled before the synchronous tick.

If a caller threads in an already-canceled context (e.g., shutdown raced with startup), the synchronous observeOnce call still runs and may trigger a leader catalog scan whose result is then immediately torn down when the goroutine exits on the first select. A one-line guard makes Start a no-op in that case and matches the existing nil-tolerance policy:

♻️ Proposed guard
 	if interval <= 0 {
 		interval = sqsDepthObserveInterval
 	}
+	if ctx.Err() != nil {
+		return
+	}
 	o.observeOnce(ctx, source)

Not a correctness issue — the existing behavior is harmless — just avoids a wasted scan during shutdown races.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@monitoring/sqs.go` around lines 323 - 343, The Start method on SQSObserver
currently always calls o.observeOnce(ctx, source) even if ctx is already
canceled; add an early guard at the top of Start to return if ctx.Done() is
already closed (e.g., check select { case <-ctx.Done(): return default: }), so
the synchronous observeOnce is skipped when the context is canceled and behavior
still falls back to existing nil checks and interval defaulting (symbols:
SQSObserver.Start, o.observeOnce, sqsDepthObserveInterval).
monitoring/sqs_test.go (1)

473-485: 💤 Low value

Optional: use a cancelable context to bound a future regression.

If a future change loosens the nil-source guard, nilObs.Start(context.Background(), &fakeDepthSource{}, time.Millisecond) would leak a goroutine for the lifetime of the test binary. Wiring a t.Cleanup-canceled context would make the test self-contained:

♻️ Proposed defensive tweak
 func TestSQSObserver_NilTolerant(t *testing.T) {
 	t.Parallel()
+	ctx, cancel := context.WithCancel(context.Background())
+	t.Cleanup(cancel)
 	var nilObs *SQSObserver
 	require.NotPanics(t, func() {
-		nilObs.Start(context.Background(), &fakeDepthSource{}, time.Millisecond)
+		nilObs.Start(ctx, &fakeDepthSource{}, time.Millisecond)
 		nilObs.ObserveOnce(&fakeDepthSource{})
 	})
 	obs := newSQSObserver(newSQSMetrics(prometheus.NewRegistry()))
 	require.NotPanics(t, func() {
-		obs.Start(context.Background(), nil, time.Millisecond)
+		obs.Start(ctx, nil, time.Millisecond)
 		obs.ObserveOnce(nil)
 	})
 }

Pure defense-in-depth — the current test passes given the current Start guards.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@monitoring/sqs_test.go` around lines 473 - 485, Wrap Start calls in
TestSQSObserver_NilTolerant with a cancelable context to avoid leaking
goroutines: create a context with context.WithCancel, register the cancel with
t.Cleanup, and pass that ctx into nilObs.Start(...) and obs.Start(...); this
ensures any goroutines started by Start (referenced by SQSObserver.Start) are
canceled when the test finishes.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 1e30ae55-2cc5-44f6-a1a9-d9ece33ff532

📥 Commits

Reviewing files that changed from the base of the PR and between 5f16a7e and e40ad3b.

📒 Files selected for processing (4)
  • adapter/sqs_depth_source.go
  • monitoring/grafana/dashboards/elastickv-sqs.json
  • monitoring/sqs.go
  • monitoring/sqs_test.go

P2 (Codex review): ForgetQueue leaves phantom _other backlog after
overflow queues vanish.

When the depth budget saturates, ObserveQueueDepth collapses extra
queues onto the shared sqsQueueOverflow ('_other') label. ForgetQueue
early-returned for these because they were not in trackedDepthQueues —
which lumps two distinct cases together:

  - never depth-observed (no series to delete; correct no-op)
  - currently mapped to _other (series exists, shared, must drop
    only when no overflow queue is still reporting into it)

In a churn scenario with >sqsMaxTrackedQueues distinct queues the
final overflow queue's last value sits on the dashboard until
another overflow queue overwrites it — phantom backlog for queues
that no longer exist.

Fix: track overflow membership in a third map (overflowDepthQueues)
so ForgetQueue can ref-count overflow queues. Two-branch behaviour:

  - real-name queue: drop the three state series + free the
    trackedDepthQueues slot (unchanged)
  - overflow queue: remove from overflowDepthQueues; if the set
    is now empty, drop the three _other state series too;
    otherwise leave _other alone so other overflow queues keep
    reporting into it

Counter side has no equivalent map. Counters are cumulative — the
_other counter legitimately reflects total operations even from
queues that have since been deleted, so there's no phantom-data
problem to solve there.

Caller audit per the standing semantic-change rule:
  ForgetQueue: only caller is SQSObserver.observeOnce. The
    observer's contract ('drop gauges for queue that disappeared
    from this tick's snapshot') is preserved and consistently
    extended — overflow queues, previously a silent no-op, now
    also stop pinning the shared gauge once the last one
    disappears. The branching is internal to ForgetQueue; the
    caller passes the queue name verbatim and is unaware of which
    map it lived in.
  admitForDepthBudget: signature unchanged. New side effect of
    populating overflowDepthQueues when collapsing to _other.
    Only caller is ObserveQueueDepth; the side effect is invisible
    from the caller's POV (still receives the same string label).

Regression test:
  TestSQSMetrics_ForgetQueue_LastOverflowClearsOtherGauge
    saturates the depth budget, observes two overflow queues,
    ForgetQueue's both, and asserts the _other series is gone
    (verified: pre-fix the gauge GatherAndCount returns 1539,
    post-fix it returns 1536 — exactly the 3 _other series
    dropped).

Existing TestSQSMetrics_ForgetQueue_OverflowQueueIsNoOp continues
to pass — ForgetQueue on one of N>=2 overflow queues still
preserves the _other series for the remaining ones, which was
the original invariant.
@bootjp
Owner Author

bootjp commented May 6, 2026

@claude review

@claude

claude Bot commented May 6, 2026

Claude finished @bootjp's task in 2m 32s —— View job


Code Review (r4)

  • Gather context and understand the request
  • Read monitoring/sqs.go (r4 overflow-clear fix)
  • Read monitoring/sqs_test.go (new overflow-clear tests)
  • Review remaining open items from prior rounds
  • Compile and post review findings

What r4 fixed (confirmed)

The Codex P2 finding was: when all overflow queues disappear from the tick snapshot, ForgetQueue was a silent no-op for those queues (they weren't in trackedDepthQueues), leaving the shared _other gauge pinned at its last-written value — phantom backlog for queues that no longer exist.

| Finding | Status |
| --- | --- |
| _other gauge not cleared when last overflow queue is forgotten | ✅ Fixed via overflowDepthQueues ref-count set in SQSMetrics |
| admitForDepthBudget must record overflow queues for ref-counting | ✅ Fixed: m.overflowDepthQueues[queue] = struct{}{} on every overflow-path call |
| ForgetQueue must clear _other only when the overflow ref-count hits zero | ✅ Fixed: overflowSetEmpty := overflow && len(m.overflowDepthQueues) == 0 guarding the delete |
| Test pinning the fix | TestSQSMetrics_ForgetQueue_LastOverflowClearsOtherGauge added |

Correctness assessment of the r4 fix:

ForgetQueue now handles three disjoint cases:

  1. Tracked (real-name): removes from trackedDepthQueues, deletes three state-labelled series. ✓
  2. Overflow: removes from overflowDepthQueues. If set is now empty, deletes the three _other series. ✓
  3. Unknown (never depth-observed): no-op. ✓

All map mutations happen under m.mu; the DeleteLabelValues calls run after the lock is released. No double-lock risk. The overflowSetEmpty flag is computed atomically inside the lock, so the test-and-delete is race-free. ✓

Edge case — queue transitioning from overflow to real-name: when the budget drains below 512 and an overflow queue gets re-admitted as a real-name queue, it ends up in both trackedDepthQueues and overflowDepthQueues. In that scenario, ForgetQueue would delete the real-name series AND (if it's the last overflow queue) tear down _other. Since _other would only contain that queue's previous overflow-era values (stale), deleting it is the correct behavior. The idempotent struct{}{} map semantics prevent double-counting on re-observe. ✓

TestSQSMetrics_ForgetQueue_OverflowQueueIsNoOp still passes: that test forgets overflow-a.fifo while overflow-b.fifo is still present. overflowSetEmpty is false, so _other is left intact and the test's assertions about trackedDepthQueues length and _other visible value still hold. ✓

Minor nit (no action needed): the test name _OverflowQueueIsNoOp is now slightly misleading — ForgetQueue on an overflow queue does mutate overflowDepthQueues, it just doesn't touch the gauge series when other overflow queues remain. The external behavior the test checks is still correct; the name just no longer matches the internal semantics. Worth noting in case a future reader goes looking.


Standing items (all low-severity, consciously unaddressed)

All five review rounds of findings are now resolved. The three latent items below remain from earlier rounds; they're not blockers:

| Item | Severity |
| --- | --- |
| Start not idempotent: repeated calls each spawn a ticker goroutine. Latent: current startSQSDepthObserver never double-calls. sync.Once would harden it | Low |
| No per-tick deadline on SnapshotQueueDepths: a stalled catalog scan pins the observer goroutine until shutdown | Low |
| Double nil-guard in main.go (outer if r.sqsServer != nil + inner guard in startSQSDepthObserver) | Nit |

Overall

The five-commit story — initial feature → cardinality/dashboard fixes (r1) → single readTS (r2) → split counter/gauge budgets (r3) → overflow ref-count (r4) — resolves every finding from all review rounds in order of severity. The overflowDepthQueues ref-count in r4 is the correct mechanism for this problem; the test pins both the "don't clear while others remain" sub-case and the "clear on last" sub-case. Implementation is clean and documented.

LGTM.


@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 9738f75c4e

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Comment thread monitoring/sqs.go
Comment on lines +321 to 326
if len(m.trackedDepthQueues) >= sqsMaxTrackedQueues {
	m.overflowDepthQueues[queue] = struct{}{}
	return sqsQueueOverflow
}
m.trackedDepthQueues[queue] = struct{}{}
return queue

P2 Badge Remove queue from overflow set when promoting real label

When cardinality drops below sqsMaxTrackedQueues, admitForDepthBudget promotes previously-overflowed queues to real labels by adding them to trackedDepthQueues, but it never removes that queue from overflowDepthQueues. ForgetQueue later uses overflowDepthQueues as a ref-count to decide whether to delete the shared _other gauge, so stale overflow entries can keep _other alive even when no queue still maps to _other, leaving phantom backlog in churny >512-queue deployments. Fresh evidence vs the earlier overflow-cleanup report: this commit introduced promotion via m.trackedDepthQueues[queue] = struct{}{} without corresponding overflow cleanup, which can keep the overflow set non-empty incorrectly.

Second P2 (Codex review): admitForDepthBudget leaves stale entries
in overflowDepthQueues when a queue gets promoted off the _other
label.

Scenario: budget saturates, queue X collapses to _other. Later a
slot opens (some other queue is ForgetQueue'd). On X's next
observation admitForDepthBudget finds room and admits X under its
real name — but the previous fix only added X to
trackedDepthQueues. The stale entry in overflowDepthQueues was
never cleaned up, so:

  1. ForgetQueue's overflow ref-count is permanently off by one
     per stale entry — _other can persist after every live queue
     has been promoted off it.
  2. The _other series itself still carries X's last-overflow
     value while X is now reporting under its real label.
     Dashboards see double-counted backlog.

Fix: in admitForDepthBudget's slot-available branch, also remove
the queue from overflowDepthQueues. If that drains the overflow
set entirely, drop the three _other state series too — mirroring
ForgetQueue's existing 'last overflow gone' branch.
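
A minimal sketch of the promotion cleanup described in the fix, under simplifying assumptions: `budgetState`, `budget`, `otherLabel`, `release`, and the `series` map are hypothetical names, there is no locking (single goroutine only), and one boolean per label stands in for the three state series.

```go
package main

import "fmt"

const (
	budget     = 1        // stand-in for sqsMaxTrackedQueues
	otherLabel = "_other" // stand-in for sqsQueueOverflow
)

// budgetState models the two membership sets plus the labels that currently
// own gauge series.
type budgetState struct {
	tracked  map[string]struct{}
	overflow map[string]struct{}
	series   map[string]bool
}

func newBudgetState() *budgetState {
	return &budgetState{
		tracked:  map[string]struct{}{},
		overflow: map[string]struct{}{},
		series:   map[string]bool{},
	}
}

// admit returns the label a queue should report under. On promotion it also
// removes the stale overflow entry and, if that empties the set, drops _other.
func (b *budgetState) admit(queue string) string {
	if _, ok := b.tracked[queue]; ok {
		return queue
	}
	if len(b.tracked) >= budget {
		b.overflow[queue] = struct{}{}
		b.series[otherLabel] = true
		return otherLabel
	}
	b.tracked[queue] = struct{}{}
	b.series[queue] = true
	if _, wasOverflow := b.overflow[queue]; wasOverflow {
		delete(b.overflow, queue)
		if len(b.overflow) == 0 {
			delete(b.series, otherLabel)
		}
	}
	return queue
}

// release frees a tracked slot, as forgetting a real-name queue would.
func (b *budgetState) release(queue string) {
	delete(b.tracked, queue)
	delete(b.series, queue)
}

func main() {
	b := newBudgetState()
	fmt.Println(b.admit("real-0.fifo"))
	fmt.Println(b.admit("overflow-x.fifo")) // budget full: collapses to _other
	b.release("real-0.fifo")
	fmt.Println(b.admit("overflow-x.fifo")) // slot free: promoted to its own label
	fmt.Println("overflow entries:", len(b.overflow), "_other present:", b.series[otherLabel])
}
```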

Refactor: factor the three DeleteLabelValues calls into
dropGaugeStatesFor since they now appear in three places (ForgetQueue
tracked-branch, ForgetQueue overflow-empty-branch, and the new
admitForDepthBudget promotion-drains-overflow branch). Pure
readability — Prometheus's GaugeVec already does its own per-vector
locking so the wrapper holds no extra lock.

Caller audit per the standing semantic-change rule:
  admitForDepthBudget: signature unchanged. New side effect — drop
    _other gauge on a promotion that drains the overflow set —
    invisible to the only caller (ObserveQueueDepth) which still
    gets back the same string label.
  ObserveQueueDepth callers: unaffected (ObserveQueueDepth's
    contract is unchanged).
  dropGaugeStatesFor: new internal helper, no external callers.
  ForgetQueue: external semantics preserved; only the
    DeleteLabelValues calls were collapsed into the helper.

Regression test:
  TestSQSMetrics_AdmitForDepthBudget_PromotionClearsOverflow
    saturates the budget, observes overflow X, ForgetQueue's a
    real queue to free a slot, re-observes X, and asserts the
    _other series is gone. Pre-fix: 1539 series (511 real + X
    real + _other phantom × 3 states). Post-fix: 1536 (the 3
    _other phantom series dropped by the new promotion cleanup).
@bootjp
Owner Author

bootjp commented May 6, 2026

@claude review

@claude

claude Bot commented May 6, 2026

Claude finished @bootjp's task in 3m 9s —— View job


Code Review (r5)

  • Gather context and read changed files
  • Assess r5 fix (overflow set cleanup on real-label promotion)
  • Check for any remaining open issues
  • Post findings

What r5 fixed (confirmed)

The Codex P2 finding was: when budget pressure eases and admitForDepthBudget promotes a previously-overflowed queue from _other to its real-name label, the queue was added to trackedDepthQueues but never removed from overflowDepthQueues. Consequence: ForgetQueue's overflow ref-count was permanently off by one per stale entry — the _other gauge could persist (with phantom backlog) even after every live queue had been promoted off it.

| Finding | Status |
| --- | --- |
| Stale overflow entry on real-label promotion: overflowDepthQueues not cleaned up | ✅ Fixed: admitForDepthBudget now checks wasOverflow, deletes from overflowDepthQueues, and drops _other if the set empties |
| No test pinning "promotion drains overflow entry" | ✅ Fixed: TestSQSMetrics_AdmitForDepthBudget_PromotionClearsOverflow added |

Correctness assessment of the fix (monitoring/sqs.go:321-343):

m.trackedDepthQueues[queue] = struct{}{}
_, wasOverflow := m.overflowDepthQueues[queue]
if wasOverflow {
    delete(m.overflowDepthQueues, queue)
}
overflowSetEmpty := wasOverflow && len(m.overflowDepthQueues) == 0
m.mu.Unlock()
if overflowSetEmpty {
    m.dropGaugeStatesFor(sqsQueueOverflow)
}

All mutations to trackedDepthQueues and overflowDepthQueues happen under m.mu; dropGaugeStatesFor runs after the lock is released (GaugeVec ops are internally locked). The overflowSetEmpty flag is computed atomically inside the lock, so there is no race between checking and acting on the empty-set condition. ✓

Edge case: budget full with two overflow queues (x, y); one slot freed; concurrent admitForDepthBudget for x. After x is promoted, trackedDepthQueues is full again (511 + x = 512). A subsequent call for y will hit the len >= sqsMaxTrackedQueues branch and re-add y to overflowDepthQueues (idempotent since y was already there). overflowSetEmpty is false in x's promotion (y remains), so _other is not dropped. Correct behavior under concurrency. ✓

Test correctness:

TestSQSMetrics_AdmitForDepthBudget_PromotionClearsOverflow correctly exercises:

  1. Budget saturated → overflow-x.fifo collapses to _other (visible=100)
  2. ForgetQueue("real-0.fifo") frees one slot; _other NOT dropped (x still maps to it)
  3. ObserveQueueDepth("overflow-x.fifo", 200, ...) → admitForDepthBudget promotes x, removes it from overflowDepthQueues, set now empty → drops _other series, returns real label
  4. Assert x's real-name gauge = 200 ✓
  5. Assert total series = sqsMaxTrackedQueues * 3 (511 surviving real queues + x promoted = 512 queues, no _other) ✓

Standing items (all consciously deferred, no new blockers)

| Item | Severity | Rounds raised |
| --- | --- | --- |
| Start not idempotent: repeated calls each spawn a ticker goroutine | Low | r3, r4 |
| No per-tick deadline on SnapshotQueueDepths: stalled catalog scan pins the observer goroutine | Low | r3, r4 |
| TestSQSMetrics_ForgetQueue_OverflowQueueIsNoOp name slightly misleading post-r4: ForgetQueue on an overflow queue now does mutate overflowDepthQueues; it just leaves the gauge intact when other overflow queues remain | Nit | r4 |
| Double nil-guard in main.go (outer if r.sqsServer != nil + inner guard in startSQSDepthObserver) | Nit | r1 |

Six-commit story, complete

| Commit | What it fixed |
| --- | --- |
| Initial | Feature: depth gauges, observer, dashboard |
| r1 | ForgetQueue cardinality leak; dashboard idle-cluster blank; maxInt64 helper |
| r2 | Single readTS per tick (MVCC consistency + HLC churn) |
| r3 | Split counter/gauge cardinality budgets (P1: counter budget not one-way) |
| r4 | Overflow ref-count (P2a: _other gauge not cleared on last overflow forget) |
| r5 | Promotion cleanup (P2b: overflow entry not cleaned on real-label promotion) |

Every medium and low-severity finding across all five review rounds is resolved. The interface design, ForgetQueue lifecycle, admitForDepthBudget promotion path, test coverage, and main.go wiring are all solid.

LGTM.


@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 7b5ab13b6a

Comment thread adapter/sqs_depth_source.go Outdated
Comment on lines +39 to +42
names, err := s.scanQueueNames(ctx)
if err != nil {
	slog.Warn("sqs depth snapshot: scanQueueNames failed", "err", err)
	return nil

P2 Badge Avoid clearing all gauges on queue-scan errors

When scanQueueNames fails, this returns nil, but SQSObserver.observeOnce treats a nil snapshot as “no queues exist now” and calls ForgetQueue for every previously seen queue. In a transient storage/read failure on the leader, that wipes all depth series and produces a false “all backlogs disappeared” signal until the next successful tick. The source should distinguish “snapshot failed” from a real empty/follower snapshot so existing gauges are preserved on error.

P2 (Codex review): SnapshotQueueDepths returns nil on scanQueueNames
failure, observeOnce treats nil as 'no queues exist now' and
ForgetQueue's every previously-seen queue. A single transient
catalog-read error therefore wipes every depth gauge and
dashboard-renders as a false 'all queues drained' event for the
duration of the failure.

Two empty-snapshot cases were conflated:

  - ok=true with empty/nil — legitimate empty (follower because
    the node stepped down, leader with zero queues configured).
    Observer should diff/ForgetQueue normally.
  - scan failure / ctx cancel — observer should NOT diff.
    Existing gauges and lastSeen must stay intact so the next
    successful tick can still diff against the previous good
    state.

Fix: extend SQSDepthSource.SnapshotQueueDepths from `[]X` to
`([]X, bool)` so the source can signal 'skip this tick'
distinctly from 'empty but valid'.

Caller audit per the standing semantic-change rule:

  monitoring.SQSDepthSource (interface) — sig changed; doc
    enumerates the three return shapes.
  monitoring.SQSObserver.observeOnce — added `if !ok { return }`
    short-circuit before the gauge writes / diff. Other branches
    unchanged.
  adapter.SQSServer.SnapshotQueueDepths — three explicit returns:
    (nil, true) for nil receiver / follower (legitimate
    'this node should not emit'); (nil, false) for
    scanQueueNames error or ctx cancel mid-scan; (snaps, true)
    for the success path. Per-queue snapshotOneQueueDepth
    failures still drop only that queue from this tick (ok stays
    true) — the observer ForgetQueue's just that one gauge.
  main.sqsDepthSourceAdapter.SnapshotQueueDepths — bridge
    propagates the new ok flag verbatim through the
    adapter→monitoring type translation.
  monitoring.fakeDepthSource (test helper) — replaced
    `snapshots [][]X` with `ticks []fakeDepthTick` where each
    tick scripts (snaps, ok). Existing tests migrated; trailing
    ticks past the scripted slice still default to (nil, true)
    so leader-step-down clears gauges as before.

Regression test:
  TestSQSObserver_ObserveOnce_TransientScanErrorPreservesGauges
    scripts three ticks: success → ok=false (scan fail) →
    success-recovered. Asserts gauges keep tick-1 values
    through the failed tick AND that recovery on tick 3 still
    diffs correctly. Pre-fix shape would wipe both queues on
    tick 2.
@bootjp
Owner Author

bootjp commented May 6, 2026

@claude review

@claude

claude Bot commented May 6, 2026

Claude finished @bootjp's task in 1m 57s —— View job


Code Review (r6)

  • Gather context and understand the request
  • Read r6 diff (monitoring/sqs.go, adapter/sqs_depth_source.go, main.go, monitoring/sqs_test.go)
  • Assess the skip-tick vs empty-snapshot fix
  • Check for remaining open issues
  • Post findings

What r6 fixed (confirmed)

The Codex P2 finding was: scanQueueNames failure returns nil, which observeOnce treats as "leader with zero queues" and ForgetQueues every previously-seen queue. A single transient catalog-read error wipes every depth gauge and renders a false "all queues drained" event on the dashboard until the next successful tick.

| Finding | Status |
| --- | --- |
| SnapshotQueueDepths couldn't distinguish "scan failed" from "legitimately no queues" | ✅ Fixed: signature extended to ([]SQSQueueDepth, bool) |
| observeOnce wiped gauges on transient scanQueueNames failure | ✅ Fixed: !ok short-circuit before the diff/ForgetQueue loop |
| Partial scan on ctx.Err() mid-iteration returned usable partial slice | ✅ Fixed: mid-iteration cancel now returns (nil, false) instead of (out, true) |
| Bridge adapter in main.go needed to propagate ok flag | ✅ Fixed: three-branch return mirrors the adapter's three cases |
| fakeDepthSource migrated to fakeDepthTick{snaps, ok} struct | ✅ Existing tests preserved; trailing ticks still default to (nil, true) |
| Regression test | TestSQSObserver_ObserveOnce_TransientScanErrorPreservesGauges added |

Correctness assessment

Interface design: (snaps, bool) is the canonical Go idiom for distinguishing "empty result" from "call failed". New SQSDepthSource implementors must explicitly return a bool, so the two cases are unambiguous at compile time. ✓

Three explicit return shapes in SQSServer.SnapshotQueueDepths, reached from four paths:

| Return | Meaning | Observer action |
| --- | --- | --- |
| (nil, true) | Follower or nil receiver | Diff normally: ForgetQueue any stale gauges from previous leader tenure |
| (nil, false) | scanQueueNames error | Skip-tick: preserve gauges and lastSeen |
| (nil, false) | ctx.Err() mid-iteration | Skip-tick: discard partial result (returning partial would ForgetQueue the unscanned tail) |
| (snaps, true) | Success | Diff, ForgetQueue disappeared queues |

All four paths are correct. The mid-iteration cancel case (ctx.Err() returning (nil, false) instead of the former (out, true)) is the most important subtlety — returning a partial snapshot would have silently ForgetQueued every queue not yet iterated.

lastSeen is preserved through skip-ticks — The !ok short-circuit in observeOnce fires before the diff loop and before any lastSeen mutation. The next successful tick diffs against the correct pre-failure baseline. ✓

Long-duration scan failures — gauges remain pinned at their last successful values. This is correct dashboard semantics: stale data is less misleading than a false "all queues empty" reading.

fakeDepthSource migration — trailing ticks past the scripted slice now default to (nil, true) (was nil, same semantic under the old interface). Existing tests were migrated without behavior change. ✓

Regression test completeness: TestSQSObserver_ObserveOnce_TransientScanErrorPreservesGauges covers:

  • Tick 1 (success): gauges set to orders=5, audio=1
  • Tick 2 (ok=false): gauges asserted to still be orders=5, audio=1; gauge count still 6 (2 queues × 3 states)
  • Tick 3 (recovery): gauges update to orders=7, audio=2; no spurious ForgetQueue (both queues still present in tick-3 snapshot, lastSeen intact from tick-1 means diff is clean)

The tick-3 assertion implicitly verifies that lastSeen wasn't corrupted during the failed tick — if it were empty, the tick-3 diff would have no "disappeared" queues to forget anyway, but the tick-3 gauge values would still be wrong if gauges had been wiped in tick-2. ✓


Standing items (all consciously deferred, no new action required)

| Item | Severity | Rounds raised |
| --- | --- | --- |
| Start not idempotent: repeated calls each spawn a ticker goroutine. sync.Once would harden it; current call site never double-calls | Low | r3, r4, r5 |
| No per-tick deadline on SnapshotQueueDepths: stalled catalog scan pins the observer goroutine until shutdown | Low | r3, r4, r5 |
| TestSQSMetrics_ForgetQueue_OverflowQueueIsNoOp name slightly misleading post-r4: ForgetQueue on overflow now does mutate overflowDepthQueues; it just leaves the gauge intact when others remain | Nit | r4, r5 |
| Double nil-guard in main.go (outer if r.sqsServer != nil + inner guard in startSQSDepthObserver) | Nit | r1 |

None of these were introduced or worsened by r6.


Seven-commit story, complete

| Commit | What it fixed |
| --- | --- |
| Initial | Feature: depth gauges, observer, dashboard |
| r1 | ForgetQueue cardinality leak; dashboard idle-cluster blank; maxInt64 helper |
| r2 | Single readTS per tick (MVCC consistency + HLC churn) |
| r3 | Split counter/gauge cardinality budgets (P1: counter budget not one-way) |
| r4 | Overflow ref-count (P2a: _other gauge not cleared on last overflow forget) |
| r5 | Promotion cleanup (P2b: overflow entry not removed on real-label promotion) |
| r6 | Skip-tick on scan failure (P2c: transient error wiped all gauges) |

All findings across all six review rounds — including the final Codex P2 — are resolved. The interface extension is the correct long-term fix (makes the two empty cases unambiguous for any future implementor), the test is thorough, and the bridge adapter is correctly wired.

LGTM.


@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 1651db7305

Comment thread adapter/sqs_depth_source.go Outdated
Comment on lines +50 to +54
names, err := s.scanQueueNames(ctx)
if err != nil {
	slog.Warn("sqs depth snapshot: scanQueueNames failed", "err", err)
	return nil, false
}

P2 Badge Use one MVCC timestamp for queue-depth snapshots

SnapshotQueueDepths calls scanQueueNames before taking readTS, but scanQueueNames already reads at its own internal nextTxnReadTS. That means queue membership and per-queue counters are collected from different snapshots, so a queue created/deleted between those two timestamps can be incorrectly forgotten or missed for a tick even when the final state is stable. This directly breaks the observer’s “current vs previous” diff contract under queue churn; pass a single read timestamp through both scans so the snapshot is truly point-in-time.

…#743 r7)

P2 (Codex review): SnapshotQueueDepths takes its readTS AFTER
scanQueueNames returns. scanQueueNames internally calls
nextTxnReadTS, so membership lands on one MVCC view while the
per-queue counter reads run at a later ts. A queue created or
deleted in that microsecond window can be silently missed (or
reported with stale counters) for the tick — and the observer's
'current vs previous' diff loop then ForgetQueue's it spuriously,
dashboard-rendering as a phantom drop or zero backlog under
normal queue churn.

Fix: take readTS once at the top of SnapshotQueueDepths and
thread it through both scans. Refactor scanQueueNames into a
1-line wrapper around the new scanQueueNamesAt(ctx, readTS) so
the depth-source path passes its ts in while the existing 3
callers (AdminListQueues, the reaper, the catalog walk) keep
using the fresh-ts shape they already have — none need
cross-call consistency.
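
The wrapper shape can be illustrated with a toy model. Everything here is an assumption for illustration: the `server` type, its integer clock, and the `createdAt` map stand in for the real store and timestamp allocator, and the context and error returns of the real functions are omitted.

```go
package main

import (
	"fmt"
	"sort"
)

// server is a toy MVCC catalog: each queue becomes visible at a commit ts.
type server struct {
	clock     uint64
	createdAt map[string]uint64
}

// nextTxnReadTS stands in for the real timestamp allocator.
func (s *server) nextTxnReadTS() uint64 {
	s.clock++
	return s.clock
}

// scanQueueNamesAt lists the queues visible at the caller-supplied readTS.
func (s *server) scanQueueNamesAt(readTS uint64) []string {
	var names []string
	for name, ts := range s.createdAt {
		if ts <= readTS {
			names = append(names, name)
		}
	}
	sort.Strings(names)
	return names
}

// scanQueueNames keeps the fresh-timestamp shape for callers that need no
// cross-call consistency.
func (s *server) scanQueueNames() []string {
	return s.scanQueueNamesAt(s.nextTxnReadTS())
}

// snapshotQueueDepths takes readTS once and uses it for membership; the same
// value would be handed to every per-queue counter read.
func (s *server) snapshotQueueDepths() ([]string, uint64) {
	readTS := s.nextTxnReadTS()
	return s.scanQueueNamesAt(readTS), readTS
}

func main() {
	s := &server{createdAt: map[string]uint64{"orders.fifo": 1, "audio.fifo": 3}}
	names, readTS := s.snapshotQueueDepths()
	fmt.Println("snapshot at", readTS, "sees", names)
	fmt.Println("fresh-ts scan sees", s.scanQueueNames())
}
```

Because the timestamp is a parameter of `scanQueueNamesAt`, a caller that wants a point-in-time view has to decide explicitly which timestamp to pass.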

Caller audit per the standing semantic-change rule:
  scanQueueNames(ctx): external contract unchanged. The
    fresh-ts wrapper still acquires its own readTS via
    s.nextTxnReadTS internally, so admin / catalog / reaper
    callers see no observable difference.
  scanQueueNamesAt(ctx, readTS): new package-private. Only
    caller is SnapshotQueueDepths; no observable behavior
    elsewhere.
  SnapshotQueueDepths: signature and return semantics unchanged.
    Internal change tightens an MVCC consistency property — the
    only externally-visible effect is that the bug above stops
    happening, which is strictly an improvement to the
    observer's diff contract.

No regression test added: the failure window is microseconds
between two HLC ticks, and a deterministic reproduction would
need a fault-injecting fake store wired into the catalog
read path — meaningful test scaffolding that doesn't exist
today and is disproportionate to a mechanical thread-through
fix that's self-evidently correct from code reading. The
property the fix establishes (membership scan and per-queue
reads use the same readTS) is enforced at the scanQueueNamesAt
signature: any future code path here must take readTS as a
parameter, making backsliding a compile error rather than a
runtime race.
@bootjp
Owner Author

bootjp commented May 6, 2026

@claude review

@claude

claude Bot commented May 6, 2026

Claude finished @bootjp's task in 1m 58s —— View job


Code Review (r7)

  • Gather context and understand the request
  • Read r7 diff
  • Read changed files
  • Compile findings and post review

What r7 fixed (confirmed)

The final Codex P2 finding was: SnapshotQueueDepths (after r2) had hoisted readTS out of the per-queue loop, but the membership scan (scanQueueNames) still called nextTxnReadTS internally — so the queue name list and the per-queue counter/meta reads were still on different MVCC views. A queue created or deleted between those two timestamps could appear in the names list with no valid counter data, or be silently skipped and then spuriously ForgetQueue'd for one tick.

| Finding | Status |
| --- | --- |
| Membership scan and per-queue reads used separate MVCC timestamps | ✅ Fixed: readTS hoisted before scanQueueNamesAt; both scans share one timestamp |
| scanQueueNames needed a ts-parameterised variant | ✅ Fixed: scanQueueNamesAt(ctx, readTS) extracted; scanQueueNames preserved as a fresh-ts wrapper for all other callers (AdminListQueues, reaper, catalog walk) |

Correctness assessment of the fix (adapter/sqs_depth_source.go:59-61, adapter/sqs_catalog.go:1221-1266):

// After r7 — one MVCC view for both scans
readTS := s.nextTxnReadTS(ctx)
names, err := s.scanQueueNamesAt(ctx, readTS)   // membership at readTS
// …
snapshotOneQueueDepth(ctx, name, readTS)         // counters at same readTS

All callers of the original scanQueueNames (AdminListQueues, reaper, etc.) continue to use the fresh-ts wrapper, so there is no behavioral change elsewhere. The scanQueueNamesAt comment clearly explains why cross-call consistency matters only for SnapshotQueueDepths.

nextTxnReadTS reads GlobalLastCommitTS / LastCommitTS and calls clock.Observe(maxTS) — now called once per tick instead of twice, which is a small positive side effect on HLC observation pressure. The function is otherwise idempotent in its side effects, so the ordering change is safe. ✓


No new issues

All prior findings (r0–r6) remain resolved. The four standing low-severity / nit items from earlier rounds are unchanged and consciously deferred:

| Item | Severity | Rounds raised |
| --- | --- | --- |
| Start not idempotent: repeated calls each spawn a ticker goroutine. Latent: current startSQSDepthObserver never double-calls | Low | r3–r6 |
| No per-tick deadline on SnapshotQueueDepths: stalled catalog scan pins the observer goroutine until shutdown | Low | r3–r6 |
| TestSQSMetrics_ForgetQueue_OverflowQueueIsNoOp name slightly misleading post-r4: ForgetQueue on overflow now does mutate overflowDepthQueues; it just leaves the gauge intact while others remain | Nit | r4–r6 |
| Double nil-guard in main.go (outer if r.sqsServer != nil + inner guard in startSQSDepthObserver) | Nit | r1–r6 |

Eight-commit story, complete

| Commit | What it fixed |
| --- | --- |
| Initial | Feature: depth gauges, observer, dashboard |
| r1 | ForgetQueue cardinality leak; dashboard idle-cluster blank; maxInt64 helper |
| r2 | Single readTS per tick, hoisted out of the per-queue loop (HLC churn, per-queue consistency) |
| r3 | Split counter/gauge cardinality budgets (P1: counter budget not one-way) |
| r4 | Overflow ref-count (P2a: _other gauge not cleared on last overflow forget) |
| r5 | Promotion cleanup (P2b: overflow entry not removed on real-label promotion) |
| r6 | Skip-tick on scan failure (P2c: transient error wiped all gauges) |
| r7 | Shared readTS across membership + counter scans (P2d: membership and counter reads on different MVCC views) |

Every finding across all seven review rounds is resolved. The interface design, ForgetQueue lifecycle, cardinality budget semantics, skip-tick contract, and MVCC consistency are all solid.

LGTM.


@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@adapter/sqs_depth_source.go`:
- Around line 89-98: In snapshotOneQueueDepth, handle and log errors from
loadQueueMetaAt rather than silently swallowing them: after calling
loadQueueMetaAt(name, readTS) check if err != nil separately from exists and,
when err != nil, emit a structured slog.Warn (or slog.Error) with stable keys
such as "queue", "err", "commit_ts" (use readTS) and "route_id" if available,
then return SQSQueueDepth{} false; keep the existing behavior for !exists (no
log or a different level) so that scanApproxCounters errors remain logged as
before.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 566287f7-6d52-4579-b1fa-ebc0075b9973

📥 Commits

Reviewing files that changed from the base of the PR and between e40ad3b and dd8a3c3.

📒 Files selected for processing (5)
  • adapter/sqs_catalog.go
  • adapter/sqs_depth_source.go
  • main.go
  • monitoring/sqs.go
  • monitoring/sqs_test.go
✅ Files skipped from review due to trivial changes (1)
  • adapter/sqs_catalog.go
🚧 Files skipped from review as they are similar to previous changes (2)
  • main.go
  • monitoring/sqs_test.go

Comment thread adapter/sqs_depth_source.go
CodeRabbit Minor: snapshotOneQueueDepth's first branch
`if err != nil || !exists` collapses two distinct cases — a
catalog read failure and a queue that legitimately doesn't
exist — into the same silent return. The function-level docstring
and the SnapshotQueueDepths contract both state that per-queue
scan errors are logged; only scanApproxCounters errors actually
were. Asymmetric: a flapping catalog read showed up as a dashboard
gauge gap with nothing in the logs to explain it.

Split the branches:
  err != nil  -> slog.Warn with stable keys (queue, err)
  !exists     -> silent return (the queue genuinely vanished —
                 the observer's diff loop will ForgetQueue the
                 gauge on its own; logging this is just noise)

No semantic change to caller-visible behavior: both branches
return the same (SQSQueueDepth{}, false) sentinel they did
before. Pure logging hygiene + alignment with the documented
contract. No caller audit needed since neither return value
nor branching changes.
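
The branch split described above can be sketched as follows. This is a minimal illustration, not the PR's actual code: `queueDepth`, `metaLoader`, `errCatalog`, and the placeholder depth value are hypothetical stand-ins for `SQSQueueDepth` and `loadQueueMetaAt`, and only the log message and key names are taken from the review text.

```go
package main

import (
	"errors"
	"log/slog"
)

// queueDepth is a hypothetical stand-in for the adapter's SQSQueueDepth.
type queueDepth struct {
	Queue   string
	Visible int64
}

// metaLoader is a hypothetical stand-in for loadQueueMetaAt: it reports
// whether the queue exists at readTS, or an error if the catalog read failed.
type metaLoader func(name string, readTS uint64) (exists bool, err error)

// errCatalog simulates a failed catalog read.
var errCatalog = errors.New("catalog unavailable")

// snapshotOne returns (depth, true) on success and (zero, false) otherwise.
// Both failure branches return the same sentinel; only the logging differs.
func snapshotOne(load metaLoader, name string, readTS uint64) (queueDepth, bool) {
	exists, err := load(name, readTS)
	if err != nil {
		// Catalog read failure: surface it so a gauge gap has an explanation.
		slog.Warn("sqs depth snapshot: loadQueueMetaAt failed", "queue", name, "err", err)
		return queueDepth{}, false
	}
	if !exists {
		// Queue is legitimately gone: stay silent, the observer's diff cleans up.
		return queueDepth{}, false
	}
	// Placeholder depth; the real code would scan the approximate counters here.
	return queueDepth{Queue: name, Visible: 1}, true
}

func main() {
	failing := func(string, uint64) (bool, error) { return false, errCatalog }
	missing := func(string, uint64) (bool, error) { return false, nil }
	present := func(string, uint64) (bool, error) { return true, nil }

	_, okFailing := snapshotOne(failing, "orders.fifo", 42) // logs a warning
	_, okMissing := snapshotOne(missing, "orders.fifo", 42) // silent
	d, okPresent := snapshotOne(present, "orders.fifo", 42)
	slog.Info("results", "failing", okFailing, "missing", okMissing, "present", okPresent, "queue", d.Queue)
}
```

Callers cannot tell the two failure branches apart, which is why the change needs no caller audit; the difference is visible only in the logs.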
@bootjp
Owner Author

bootjp commented May 6, 2026

@claude review

@claude

claude Bot commented May 6, 2026

Claude finished @bootjp's task in 1m 31s


Code Review (r8)

  • Gather context and understand the request
  • Read adapter/sqs_depth_source.go (r8 loadQueueMetaAt logging fix)
  • Read monitoring/sqs.go
  • Read monitoring/sqs_test.go
  • Compile findings and post review

What r8 fixed (confirmed)

The CodeRabbit Minor finding was: snapshotOneQueueDepth had a combined if err != nil || !exists branch that silently swallowed catalog read errors — a flapping loadQueueMetaAt would surface only as a missing gauge series, with nothing in the logs to explain why. This was asymmetric: scanApproxCounters errors were already logged at slog.Warn.

| Finding | Status |
|---|---|
| loadQueueMetaAt errors silently swallowed: no log, just a silent return (_, false) | ✅ Fixed: split into two branches; err != nil → slog.Warn with stable keys (queue, err); !exists → silent return (not an error; the observer diff will ForgetQueue on its own) |

Correctness assessment:

The split is correct on all four axes:

  • Behavioral: both branches still return (SQSQueueDepth{}, false). No caller-visible change; no audit needed.
  • Semantic: distinguishing err != nil (something failed that an operator needs to know about) from !exists (queue legitimately gone; steady state, not noteworthy) is the right split. Logging !exists would generate noise for every queue that gets DeleteQueue'd while the observer is running.
  • Convention: slog.Warn("sqs depth snapshot: loadQueueMetaAt failed", "queue", name, "err", err) uses stable key names (queue, err) per the CLAUDE.md convention and is symmetric with the scanApproxCounters warning at line 110.
  • Comment quality: both branches have inline comments explaining the decision; the !exists path explicitly notes "the observer's diff will ForgetQueue this gauge on its own" so a future reader doesn't wonder why that case isn't logged as well.

Standing items (all consciously deferred; no new action required)

All items below were raised in previous rounds. None were introduced or worsened by r8.

| Item | Severity | Rounds raised |
|---|---|---|
| Start not idempotent: repeated calls each spawn a ticker goroutine. Latent: startSQSDepthObserver never double-calls. sync.Once would harden it | Low | r3–r7 |
| No per-tick deadline on SnapshotQueueDepths: a stalled catalog scan pins the observer goroutine until shutdown | Low | r3–r7 |
| TestSQSMetrics_ForgetQueue_OverflowQueueIsNoOp name slightly misleading post-r4: ForgetQueue on overflow now does mutate overflowDepthQueues; it just leaves the gauge intact while others remain | Nit | r4–r7 |
| Double nil-guard in main.go (outer if r.sqsServer != nil at line 1615 + inner guard in startSQSDepthObserver at line 1143) | Nit | r1–r7 |
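
The sync.Once hardening mentioned for Start could look roughly like the sketch below. It is an assumption-laden illustration: the `observer` type, its fields, and the simplified `Start(ctx, interval)` signature are hypothetical (the real method also takes a depth source), and the `started` counter exists only to make the effect observable.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// observer is a hypothetical, trimmed-down stand-in for SQSObserver.
type observer struct {
	startOnce sync.Once
	started   atomic.Int32 // goroutines spawned; demo-only instrumentation
	ticks     atomic.Int64 // ticks processed; stands in for observeOnce
}

// Start spawns the ticker goroutine at most once, however often it is called.
func (o *observer) Start(ctx context.Context, interval time.Duration) {
	o.startOnce.Do(func() {
		o.started.Add(1)
		go func() {
			t := time.NewTicker(interval)
			defer t.Stop()
			for {
				select {
				case <-ctx.Done():
					return
				case <-t.C:
					o.ticks.Add(1)
				}
			}
		}()
	})
}

// startThrice calls Start three times and reports how many goroutines began.
func startThrice() int32 {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	o := &observer{}
	for i := 0; i < 3; i++ {
		o.Start(ctx, 10*time.Millisecond)
	}
	return o.started.Load()
}

func main() {
	fmt.Println(startThrice()) // prints 1
}
```

One trade-off of this shape is that a second Start with a different context or interval is silently ignored, so the guard suits a component that is started exactly once per process.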

Nine-commit story, complete

| Commit | What it fixed |
|---|---|
| Initial | Feature: depth gauges, observer, dashboard |
| r1 | ForgetQueue cardinality leak; dashboard idle-cluster blank; maxInt64 helper |
| r2 | Single readTS per tick, hoisted out of the per-queue loop |
| r3 | Split counter/gauge cardinality budgets (P1: counter budget not one-way) |
| r4 | Overflow ref-count (P2a: _other gauge not cleared on last overflow forget) |
| r5 | Promotion cleanup (P2b: overflow entry not removed on real-label promotion) |
| r6 | Skip-tick on scan failure (P2c: transient error wiped all gauges) |
| r7 | Shared readTS across membership + counter scans (P2d: split MVCC views) |
| r8 | Log loadQueueMetaAt errors symmetrically (logging hygiene, no behavior change) |

All findings across all eight review rounds are resolved. The logging hygiene in r8 is the correct, minimal fix. Interface design, ForgetQueue lifecycle, cardinality budget semantics, skip-tick contract, MVCC consistency, and error visibility are all solid.

LGTM.


@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: fd3a3b040c

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Comment thread monitoring/sqs.go
Comment on lines +470 to +474
    current := make(map[string]struct{}, len(snaps))
    for _, snap := range snaps {
        o.metrics.ObserveQueueDepth(snap.Queue, snap.Visible, snap.NotVisible, snap.Delayed)
        current[snap.Queue] = struct{}{}
    }
P2: Reclaim old depth slots before admitting new queues

observeOnce emits ObserveQueueDepth for the current snapshot before it forgets queues that disappeared from the previous tick, so admitForDepthBudget still sees stale names occupying trackedDepthQueues during admission. When queue churn is high near the 512 cap (e.g., many old queues removed and many new queues added in the same tick), newly active queues are incorrectly collapsed into _other for at least one interval even though capacity exists after the diff, which produces avoidable label loss and inaccurate per-queue dashboards for that tick.

 r9)

P2 (Codex review): observeOnce emits ObserveQueueDepth for the
current snapshot BEFORE forgetting queues that disappeared from
the previous tick. Under high churn near the 512-queue cap (many
old queues removed and many new queues added in the same tick),
admitForDepthBudget sees the stale names still occupying
trackedDepthQueues and silently collapses brand-new queues onto
_other for at least one interval — even though the same tick
will free their slots a few lines later.

Fix: split observeOnce into two phases under the existing
single-writer contract:

  Phase 1: build the current-name set + ForgetQueue any name in
    lastSeen that's not in current. ForgetQueue is the path that
    frees depth-budget slots in trackedDepthQueues; running it
    first under m.mu (taken inside ForgetQueue) makes the slots
    visible to the second phase's admission.
  Phase 2: emit ObserveQueueDepth for every snap. admitForDepthBudget
    now sees the freed slots and admits brand-new queues under
    their real labels instead of overflow.

End-state semantics are unchanged: after observeOnce returns,
gauges reflect the current tick and lastSeen mirrors the current
name set. Only the intra-call ordering changes — phase 1 runs
strictly before phase 2 instead of interleaving with it. The
two phases serialise on m.mu (ForgetQueue and admitForDepthBudget
both take it), so phase 2's admissions are guaranteed to see
phase 1's reclamations.
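
A toy model of the forget-then-admit ordering, as a rough sketch only. The types and names here (`metrics`, `observer`, `snap`, `maxTracked`) are hypothetical, the cap is shrunk from 512 to 2, and the mutexes the commit message relies on are omitted because the demo runs on a single goroutine.

```go
package main

import "fmt"

const (
	maxTracked    = 2        // hypothetical cap; the PR uses 512
	overflowLabel = "_other" // label used when the budget is exhausted
)

// metrics is a toy stand-in for the gauge set plus its depth budget.
type metrics struct {
	tracked map[string]struct{} // names currently holding a real-label slot
	gauges  map[string]int64    // label -> visible depth
}

// admit returns the label to publish under: the real name if it already holds
// a slot or a slot is free, otherwise the overflow label.
func (m *metrics) admit(name string) string {
	if _, ok := m.tracked[name]; ok {
		return name
	}
	if len(m.tracked) < maxTracked {
		m.tracked[name] = struct{}{}
		return name
	}
	return overflowLabel
}

func (m *metrics) observe(name string, visible int64) { m.gauges[m.admit(name)] = visible }

// forget frees the queue's slot and drops its gauge.
func (m *metrics) forget(name string) {
	delete(m.tracked, name)
	delete(m.gauges, name)
}

type snap struct {
	Queue   string
	Visible int64
}

type observer struct {
	m        *metrics
	lastSeen map[string]struct{}
}

// observeOnce processes one tick: reclaim first, then admit.
func (o *observer) observeOnce(snaps []snap) {
	current := make(map[string]struct{}, len(snaps))
	for _, s := range snaps {
		current[s.Queue] = struct{}{}
	}
	// Phase 1: forget every queue that vanished since the previous tick.
	for prev := range o.lastSeen {
		if _, ok := current[prev]; !ok {
			o.m.forget(prev)
		}
	}
	o.lastSeen = current
	// Phase 2: emit gauges; admission now sees the freed slots.
	for _, s := range snaps {
		o.m.observe(s.Queue, s.Visible)
	}
}

// churnDemo saturates the budget, swaps one queue, and reports the outcome.
func churnDemo() (newDepth int64, overflowed bool) {
	o := &observer{
		m:        &metrics{tracked: map[string]struct{}{}, gauges: map[string]int64{}},
		lastSeen: map[string]struct{}{},
	}
	o.observeOnce([]snap{{"old-0", 1}, {"old-1", 1}})
	o.observeOnce([]snap{{"old-0", 1}, {"new-0", 3}})
	_, overflowed = o.m.gauges[overflowLabel]
	return o.m.gauges["new-0"], overflowed
}

func main() {
	fmt.Println(churnDemo()) // prints 3 false
}
```

Swapping the two phases in this toy reproduces the reported symptom: new-0 lands on _other because old-1 still occupies its slot during admission.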

Caller audit per the standing semantic-change rule:
  observeOnce: callers are Start (production goroutine loop)
    and ObserveOnce (test entry). Both treat the function as
    'process one tick'; neither observes intermediate state.
    The reorder is invisible to callers — same inputs, same
    end state, same lock discipline. No call-site update needed.

Regression test:
  TestSQSObserver_ObserveOnce_HighChurnReclaimsBeforeAdmit
    saturates the budget with 512 queues in tick 1, replaces
    half of them with brand-new names in tick 2, and asserts:
      - new-0 / new-(half-1) gauges carry their tick-2 values
        under their REAL labels (pre-fix they collapsed to
        _other, returned 0)
      - total series count is exactly sqsMaxTrackedQueues*3 =
        1536, leaving no room for any _other series (pre-fix
        would land at 256 real × 3 + _other × 3 = 771)
    Pre-fix verified failing with the new-0 assertion (got 0,
    want 3); post-fix passes.

Test-side note: the count assertion runs BEFORE any per-queue
spot check that uses WithLabelValues, because WithLabelValues
materialises an absent label combination (creating the gauge
child with default value 0 as a side effect). Asserting '_other
visible == 0' via WithLabelValues would have inflated the count
to 1537. This is a Prometheus client-API quirk that bit me
during the fix and is now documented inline so future test
authors don't trip over it.
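
The ordering constraint in the test-side note can be illustrated without the Prometheus client. The `toyGaugeVec` below is a hypothetical, stdlib-only stand-in that mimics just one behaviour described above, namely that looking up an absent label creates a zero-valued child; it is not the real client API.

```go
package main

import "fmt"

// toyGaugeVec is a hypothetical stand-in for a label-keyed gauge vector.
type toyGaugeVec struct {
	children map[string]*float64
}

func newToyGaugeVec() *toyGaugeVec {
	return &toyGaugeVec{children: map[string]*float64{}}
}

// withLabel returns the child for label, creating it with value 0 when absent.
// The creation on lookup is the side effect the test-side note warns about.
func (v *toyGaugeVec) withLabel(label string) *float64 {
	if c, ok := v.children[label]; ok {
		return c
	}
	c := new(float64)
	v.children[label] = c
	return c
}

// count reports how many label children currently exist.
func (v *toyGaugeVec) count() int { return len(v.children) }

// countThenSpotCheck counts series first, then reads an absent label.
func countThenSpotCheck() (before int, other float64, after int) {
	v := newToyGaugeVec()
	*v.withLabel("new-0.fifo") = 3

	before = v.count()            // assert the series count first
	other = *v.withLabel("_other") // the read materialises a new child
	after = v.count()
	return before, other, after
}

func main() {
	fmt.Println(countThenSpotCheck()) // prints 1 0 2
}
```

Reading the absent label returns 0 as hoped, but the count grows by one afterwards, which is why a count assertion has to come before any such spot check.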
@bootjp
Owner Author

bootjp commented May 6, 2026

@claude review

@claude

claude Bot commented May 6, 2026

Claude finished @bootjp's task in 2m 36s


Code Review (r9)

  • Gather context and understand the request
  • Read monitoring/sqs.go (r9 phase-reorder fix)
  • Read monitoring/sqs_test.go (new regression test)
  • Review full diff vs origin/main
  • Compile findings and post review

What r9 fixed (confirmed)

The Codex P2 finding was: observeOnce emitted ObserveQueueDepth for the current snapshot before running ForgetQueue for disappeared queues. Under high churn near the 512-slot cap (many old queues gone, many new queues added in the same tick), admitForDepthBudget saw stale names still occupying trackedDepthQueues during admission, so every newly-active queue was collapsed onto _other for at least one interval — even though the same tick would have freed their slots a few lines later.

| Finding | Status |
|---|---|
| Admit-before-forget in observeOnce: stale slots visible to admitForDepthBudget during Phase 2 | ✅ Fixed: split into Phase 1 (ForgetQueue all disappeared queues) then Phase 2 (emit ObserveQueueDepth) |
| Regression test pinning the pre/post-fix shape | ✅ Fixed: TestSQSObserver_ObserveOnce_HighChurnReclaimsBeforeAdmit |

Correctness assessment of the fix (monitoring/sqs.go:474-497):

The two phases now serialize on SQSMetrics.mu in the correct order:

Phase 1  o.mu held → ForgetQueue(prev) → [takes m.mu, deletes trackedDepthQueues[prev], releases m.mu]
         o.mu.Unlock() — all ForgetQueue calls have fully committed by here
Phase 2  ObserveQueueDepth(snap) → admitForDepthBudget → [takes m.mu, sees freed slot, admits real label, releases m.mu]

Lock ordering is consistent: when both locks are held simultaneously, it is always o.mu → m.mu. Phase 2 does not hold o.mu, so there is no cycle and no deadlock risk. ✓

The lastSeen = current assignment happens inside o.mu at the end of Phase 1 — before Phase 2 starts — so the next tick's diff has the correct baseline regardless of what Phase 2 does. ✓

The step-down path (snaps == [], i.e. follower/empty-source) and the skip-tick path (ok == false) both hit their early-return branches before Phase 1/2, so existing semantics are unchanged. ✓

Test correctness (monitoring/sqs_test.go:693-752):

  • Tick 1: 512 queues (old-0..511.fifo) saturate the budget.
  • Tick 2: 256 old carry over + 256 brand-new (new-0..255.fifo) replace the disappeared half.

Pre-fix: emit ran first → 256 freed slots not yet visible → 256 new queues collapsed to _other (last-write-wins) → new-0.fifo visible == 0 (got from default, not from the _other series value of 3 for the last-scanned queue).
Post-fix: forget ran first → 256 slots freed → 256 new queues admitted under real labels → new-0.fifo visible == 3.

The count assertion (== sqsMaxTrackedQueues*3 == 1536) runs before any WithLabelValues spot-checks — the commit correctly documents why: WithLabelValues materialises a gauge child as a side effect (default value 0), which would have inflated the count to 1537 if _other visible == 0 were asserted via WithLabelValues first. That ordering constraint is documented inline so future test authors don't trip over it. ✓


Standing items (all consciously deferred; no new action required)

All items below were flagged in earlier rounds. None were introduced or worsened by r9.

| Item | Severity | Rounds raised |
|---|---|---|
| Start not idempotent: repeated calls each spawn a ticker goroutine. Latent: startSQSDepthObserver never double-calls. sync.Once would harden it | Low | r3–r8 |
| No per-tick deadline on SnapshotQueueDepths: a stalled catalog scan pins the observer goroutine until shutdown | Low | r3–r8 |
| TestSQSMetrics_ForgetQueue_OverflowQueueIsNoOp name slightly misleading post-r4: ForgetQueue on an overflow queue now does mutate overflowDepthQueues; it just leaves the gauge intact while others remain | Nit | r4–r8 |
| Double nil-guard in main.go (outer if r.sqsServer != nil + inner guard in startSQSDepthObserver) | Nit | r1–r8 |
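
A per-tick deadline for the snapshot call might be sketched as below. This rests on an assumption the thread does not confirm: that the depth source can accept a context. The `depthSource` type, `snapshotWithDeadline`, and the two fake sources are all hypothetical.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// depthSource is hypothetical: it assumes the snapshot call accepts a context
// and returns ok == false when the tick should be skipped.
type depthSource func(ctx context.Context) (queues []string, ok bool)

// snapshotWithDeadline bounds a single tick's snapshot to perTick so a stalled
// scan cannot pin the observer goroutine indefinitely.
func snapshotWithDeadline(parent context.Context, src depthSource, perTick time.Duration) ([]string, bool) {
	ctx, cancel := context.WithTimeout(parent, perTick)
	defer cancel()
	return src(ctx)
}

// stalled simulates a catalog scan that only returns once its context ends.
func stalled(ctx context.Context) ([]string, bool) {
	<-ctx.Done()
	return nil, false // report a skipped tick rather than an empty snapshot
}

// healthy simulates a scan that completes immediately.
func healthy(_ context.Context) ([]string, bool) {
	return []string{"orders.fifo"}, true
}

// deadlineDemo runs both sources under a short per-tick deadline.
func deadlineDemo() (stalledOK, healthyOK bool) {
	_, stalledOK = snapshotWithDeadline(context.Background(), stalled, 20*time.Millisecond)
	_, healthyOK = snapshotWithDeadline(context.Background(), healthy, 20*time.Millisecond)
	return stalledOK, healthyOK
}

func main() {
	fmt.Println(deadlineDemo()) // prints false true
}
```

Returning ok == false on timeout lines up with the skip-tick contract from r6, so a slow scan would leave the previous gauges in place instead of wiping them.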

Ten-commit story, complete

| Commit | What it fixed |
|---|---|
| Initial | Feature: depth gauges, observer, dashboard |
| r1 | ForgetQueue cardinality leak; dashboard idle-cluster blank; maxInt64 helper |
| r2 | Single readTS per tick, hoisted out of the per-queue loop |
| r3 | Split counter/gauge cardinality budgets (P1: counter budget not one-way) |
| r4 | Overflow ref-count (P2a: _other gauge not cleared on last overflow forget) |
| r5 | Promotion cleanup (P2b: overflow entry not removed on real-label promotion) |
| r6 | Skip-tick on scan failure (P2c: transient error wiped all gauges) |
| r7 | Shared readTS across membership + counter scans (P2d: split MVCC views) |
| r8 | Log loadQueueMetaAt errors symmetrically |
| r9 | Reclaim depth slots before admitting (P2e: admit-before-forget ordering) |

Every finding across all nine review rounds is resolved. The phase-reorder fix is minimal, correct, and well-documented. Lock ordering is sound, the regression test pins the pre/post-fix behavior precisely, and the Prometheus WithLabelValues materialization caveat is now documented for future contributors.

LGTM.

@bootjp bootjp merged commit cd5bcf6 into main May 7, 2026
10 checks passed
@bootjp bootjp deleted the feat/sqs-prometheus-exporter branch May 7, 2026 08:11