From 105635eb05002627518ca30412f96f12d3fdfcaf Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Sun, 26 Apr 2026 20:40:37 +0900 Subject: [PATCH 01/11] feat(sqs/admin): SigV4-bypass admin entrypoints + SPA queues pages MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replaces PR #659, which conflicted heavily after main moved (PR #649 squashed; PR #658 added S3 admin endpoints; the Approximate counters implementation now lives directly in adapter/sqs_catalog.go). This PR: Backend (adapter/sqs_admin.go + internal/admin/sqs_handler.go): - SQSServer.AdminListQueues / AdminDescribeQueue / AdminDeleteQueue are SigV4-bypass entrypoints, mirroring the AdminListTables / AdminListBuckets pattern. - AdminDescribeQueue uses the existing scanApproxCounters from sqs_catalog.go (already on main) so the admin path returns the same Visible / NotVisible / Delayed numbers as GetQueueAttributes("All") would, taken at one snapshot read TS. - sqsQueuesBridge in main_admin.go re-shapes adapter.AdminQueueSummary into admin.QueueSummary, keeping internal/admin free of the heavy adapter dependency tree — same pattern as dynamoTablesBridge / s3BucketsBridge. - admin.QueuesSource is opt-in; deployments that don't run --sqsAddress leave /admin/api/v1/sqs/* off the wire and the SPA renders a soft "endpoint pending" notice on the 404. - Role re-evaluation against the live RoleStore on DELETE so a downgraded key cannot keep mutating with a still-valid JWT. - apiRouteTable.dispatch refactored: resourceHandlerFor extracted so the dispatcher stays under cyclop=10 as new resources land (Dynamo, S3, SQS, future). Frontend (web/admin/src/pages/SqsList.tsx, SqsDetail.tsx): - /sqs queue list with refresh + per-row link to detail. - /sqs/:name detail showing FIFO badge, counters card (Visible / In-flight / Delayed), raw attributes table, and a Delete confirmation Modal gated by RequireFullAccess. - api/client.ts gains listQueues / describeQueue / deleteQueue with the same AbortSignal pattern used for cluster / dynamo / s3 reads. - Layout nav adds an SQS tab between DynamoDB and S3. Out of scope (recorded in the SQS partial design doc §16.2): - PurgeQueue from the SPA. Underlying purgeQueueWithRetry is on main; the admin entrypoint is a trivial follow-up. - Send / Peek / CreateQueue from the SPA. Each needs its own adapter entrypoint and form UX; deferred to keep this PR focused. Verified with go build ./..., go test -race ./internal/admin/..., go test -race -run TestSQS ./adapter/, go test -run TestStartAdmin ., golangci-lint run ./adapter/... ./internal/admin/... ./... (0 issues, no //nolint), and cd web/admin && npm run build. --- adapter/sqs_admin.go | 151 ++++++++++++++++ internal/admin/server.go | 100 +++++++++-- internal/admin/sqs_handler.go | 270 ++++++++++++++++++++++++++++ main.go | 12 +- main_admin.go | 97 +++++++++- main_sqs.go | 15 +- web/admin/src/App.tsx | 4 + web/admin/src/api/client.ts | 29 +++ web/admin/src/components/Layout.tsx | 1 + web/admin/src/pages/SqsDetail.tsx | 142 +++++++++++++++ web/admin/src/pages/SqsList.tsx | 69 +++++++ 11 files changed, 861 insertions(+), 29 deletions(-) create mode 100644 adapter/sqs_admin.go create mode 100644 internal/admin/sqs_handler.go create mode 100644 web/admin/src/pages/SqsDetail.tsx create mode 100644 web/admin/src/pages/SqsList.tsx diff --git a/adapter/sqs_admin.go b/adapter/sqs_admin.go new file mode 100644 index 000000000..f20ad9641 --- /dev/null +++ b/adapter/sqs_admin.go @@ -0,0 +1,151 @@ +package adapter + +import ( + "context" + "strconv" + "strings" + "time" + + "github.com/cockroachdb/errors" +) + +// AdminQueueSummary is the per-queue projection the admin dashboard +// surfaces. It deliberately covers only the fields the SPA renders so +// the package's wire-format types stay internal. +// +// Counters mirror the AWS Approximate* attribute set produced by +// scanApproxCounters; they are best-effort by AWS contract and stop +// counting once the catalog's per-call cap is reached (the SPA polls +// continuously, so an unbounded scan would pin the leader). +type AdminQueueSummary struct { + Name string + IsFIFO bool + Generation uint64 + CreatedAt time.Time + Attributes map[string]string + Counters AdminQueueCounters +} + +// AdminQueueCounters matches sqsApproxCounters (int64) so the admin +// bridge does not have to convert between widths. Visible / +// NotVisible / Delayed are the AWS Approximate* triple. +type AdminQueueCounters struct { + Visible int64 + NotVisible int64 + Delayed int64 +} + +// AdminListQueues returns every queue name this server knows about, +// in the lexicographic order the queue catalog index produces. Read +// path; runs on follower or leader and uses the same scanQueueNames +// helper the SigV4 ListQueues handler does. +func (s *SQSServer) AdminListQueues(ctx context.Context) ([]string, error) { + return s.scanQueueNames(ctx) //nolint:wrapcheck // pure pass-through; the adapter owns the error context. +} + +// AdminDescribeQueue returns a snapshot of name's metadata plus the +// approximate counters. The triple (result, present, error) lets +// admin callers distinguish a missing queue from a storage error +// without sniffing sentinels. +// +// Like AdminDescribeTable on the Dynamo side, this entrypoint runs +// on either the leader or a follower (read-only); the counter scan +// uses a fresh nextTxnReadTS so the result is consistent with what +// SigV4 GetQueueAttributes would have returned at the same instant. +func (s *SQSServer) AdminDescribeQueue(ctx context.Context, name string) (*AdminQueueSummary, bool, error) { + if strings.TrimSpace(name) == "" { + return nil, false, ErrAdminSQSValidation + } + readTS := s.nextTxnReadTS(ctx) + meta, exists, err := s.loadQueueMetaAt(ctx, name, readTS) + if err != nil { + return nil, false, errors.WithStack(err) + } + if !exists { + return nil, false, nil + } + counters, err := s.scanApproxCounters(ctx, name, meta.Generation, readTS) + if err != nil { + return nil, false, err + } + summary := &AdminQueueSummary{ + Name: name, + IsFIFO: meta.IsFIFO, + Generation: meta.Generation, + CreatedAt: hlcToTime(meta.CreatedAtHLC), + Attributes: metaAttributesForAdmin(meta), + Counters: AdminQueueCounters(counters), + } + return summary, true, nil +} + +// AdminDeleteQueue is the SigV4-bypass counterpart to deleteQueue. +// Returns the same sentinel errors as AdminCreateTable on the Dynamo +// side: ErrAdminForbidden on a read-only principal, ErrAdminNotLeader +// on a follower, ErrAdminSQSNotFound when the queue is absent. +func (s *SQSServer) AdminDeleteQueue(ctx context.Context, principal AdminPrincipal, name string) error { + if !principal.Role.canWrite() { + return ErrAdminForbidden + } + if !isVerifiedSQSLeader(s.coordinator) { + return ErrAdminNotLeader + } + if strings.TrimSpace(name) == "" { + return ErrAdminSQSValidation + } + if err := s.deleteQueueWithRetry(ctx, name); err != nil { + // deleteQueueWithRetry returns sqsAPIError with + // sqsErrQueueDoesNotExist when the queue is missing; map + // to the structured ErrAdminSQSNotFound so the admin + // handler can render 404 without sniffing the AWS code. + if isSQSAdminQueueDoesNotExist(err) { + return ErrAdminSQSNotFound + } + return errors.Wrap(err, "admin delete queue") + } + return nil +} + +// metaAttributesForAdmin renders the queue meta into the same shape +// queueMetaToAttributes("All") would, minus the counters (the admin +// summary surfaces them as a typed struct alongside, not as strings). +// Kept as a small dedicated helper so the SigV4 path's selection +// machinery stays untouched. +func metaAttributesForAdmin(meta *sqsQueueMeta) map[string]string { + out := map[string]string{ + "VisibilityTimeout": strconv.FormatInt(meta.VisibilityTimeoutSeconds, 10), + "MessageRetentionPeriod": strconv.FormatInt(meta.MessageRetentionSeconds, 10), + "DelaySeconds": strconv.FormatInt(meta.DelaySeconds, 10), + "ReceiveMessageWaitTimeSeconds": strconv.FormatInt(meta.ReceiveMessageWaitSeconds, 10), + "MaximumMessageSize": strconv.FormatInt(meta.MaximumMessageSize, 10), + "FifoQueue": strconv.FormatBool(meta.IsFIFO), + "ContentBasedDeduplication": strconv.FormatBool(meta.ContentBasedDedup), + } + if meta.RedrivePolicy != "" { + out["RedrivePolicy"] = meta.RedrivePolicy + } + return out +} + +// ErrAdminSQSValidation is returned when an admin entrypoint receives +// a request with a missing or syntactically-bad queue name. Maps to +// 400 in the admin HTTP handler. +var ErrAdminSQSValidation = errors.New("sqs admin: invalid queue name") + +// ErrAdminSQSNotFound is returned by write entrypoints when the +// target queue does not exist. Maps to 404. The describe path uses +// the (nil, false, nil) tuple instead of this sentinel for the +// not-found signal, mirroring AdminDescribeTable. +var ErrAdminSQSNotFound = errors.New("sqs admin: queue not found") + +// isSQSAdminQueueDoesNotExist matches the deleteQueueWithRetry path's +// "queue does not exist" sqsAPIError so AdminDeleteQueue can normalise +// it to ErrAdminSQSNotFound. Falls through to false on any unrelated +// error, which AdminDeleteQueue then wraps and propagates. +func isSQSAdminQueueDoesNotExist(err error) bool { + var apiErr *sqsAPIError + if !errors.As(err, &apiErr) || apiErr == nil { + return false + } + return apiErr.errorType == sqsErrQueueDoesNotExist +} diff --git a/internal/admin/server.go b/internal/admin/server.go index 652a9fd84..c1e1587f7 100644 --- a/internal/admin/server.go +++ b/internal/admin/server.go @@ -61,6 +61,14 @@ type ServerDeps struct { // off" state instead of an empty matrix. KeyViz KeyVizSource + // Queues is the SQS admin source — covers list, describe, and + // delete via QueuesSource. Optional: a nil value disables + // /admin/api/v1/sqs/queues{,/{name}} (the mux answers them + // with 404). Same opt-in shape as Tables / Buckets; deployments + // that don't run the SQS adapter omit this without breaking the + // rest of the admin surface. + Queues QueuesSource + // StaticFS is the embed.FS (or any fs.FS) backing the SPA. May be // nil during early development; the router renders 404 for // /admin/assets/* and the SPA fallback in that case. @@ -112,7 +120,8 @@ func NewServer(deps ServerDeps) (*Server, error) { // nil it serves a 503 keyviz_disabled, which the SPA renders as // a clearer "feature off" state than an unknown_endpoint 404. keyviz := NewKeyVizHandler(deps.KeyViz).WithLogger(logger) - mux := buildAPIMux(auth, deps.Verifier, cluster, dynamo, s3, keyviz, logger) + sqs := buildSqsHandlerForDeps(deps, logger) + mux := buildAPIMux(auth, deps.Verifier, cluster, dynamo, s3, keyviz, sqs, logger) router := NewRouter(mux, deps.StaticFS) return &Server{deps: deps, router: router, auth: auth, mux: mux}, nil } @@ -177,6 +186,20 @@ func buildS3HandlerForDeps(deps ServerDeps, logger *slog.Logger) http.Handler { return NewS3Handler(deps.Buckets).WithLogger(logger) } +// buildSqsHandlerForDeps is the parallel constructor for the SQS +// admin handler. Read paths are open to any session; the DELETE +// path re-evaluates the principal's role against the live MapRoleStore +// on every request, so a downgraded key cannot keep mutating with a +// still-valid JWT. +func buildSqsHandlerForDeps(deps ServerDeps, logger *slog.Logger) http.Handler { + if deps.Queues == nil { + return nil + } + return NewSqsHandler(deps.Queues). + WithLogger(logger). + WithRoleStore(MapRoleStore(deps.Roles)) +} + // Handler returns an http.Handler that serves the full admin surface. // We wrap the router in BodyLimit at the top level so every endpoint // — including /admin/healthz and the static asset / SPA paths — is @@ -215,14 +238,14 @@ func (s *Server) APIHandler() http.Handler { // audit path inside AuthService because the generic Audit middleware // cannot see the claimed actor at that point in the chain. // -// dynamoHandler / s3Handler may be nil; in that case the corresponding -// paths fall through to the unknown-endpoint 404, matching the -// behaviour of any other unregistered admin path. +// dynamoHandler / s3Handler / sqsHandler may be nil; in that case +// the corresponding paths fall through to the unknown-endpoint 404, +// matching the behaviour of any other unregistered admin path. // // keyvizHandler is always non-nil even when the sampler is disabled — // it serves 503 keyviz_disabled itself so the SPA gets a clearer // signal than an unknown_endpoint 404 from the catch-all. -func buildAPIMux(auth *AuthService, verifier *Verifier, clusterHandler, dynamoHandler, s3Handler, keyvizHandler http.Handler, logger *slog.Logger) http.Handler { +func buildAPIMux(auth *AuthService, verifier *Verifier, clusterHandler, dynamoHandler, s3Handler, keyvizHandler, sqsHandler http.Handler, logger *slog.Logger) http.Handler { loginHandler := http.HandlerFunc(auth.HandleLogin) logoutHandler := http.HandlerFunc(auth.HandleLogout) @@ -290,6 +313,14 @@ func buildAPIMux(auth *AuthService, verifier *Verifier, clusterHandler, dynamoHa if s3Handler != nil { s3Chain = protect(s3Handler) } + // SQS endpoints share the same protect chain rationale: GET + // reads are session-gated to keep cross-site fetches from + // enumerating queue names; DELETE goes through CSRF + the + // in-handler RoleFull check inside SqsHandler. + var sqsChain http.Handler + if sqsHandler != nil { + sqsChain = protect(sqsHandler) + } routes := apiRouteTable{ login: loginChain, @@ -298,6 +329,7 @@ func buildAPIMux(auth *AuthService, verifier *Verifier, clusterHandler, dynamoHa dynamo: dynamoChain, s3: s3Chain, keyviz: keyvizChain, + sqs: sqsChain, } return http.HandlerFunc(routes.dispatch) } @@ -309,29 +341,55 @@ func buildAPIMux(auth *AuthService, verifier *Verifier, clusterHandler, dynamoHa // would otherwise push buildAPIMux's branch count past the limit. type apiRouteTable struct { login, logout, cluster http.Handler - dynamo, s3 http.Handler + dynamo, s3, sqs http.Handler keyviz http.Handler } // dispatch is the receiver method httpHandlerFunc adapts. Logic is -// the same path-prefix switch the call site previously inlined. +// the same path-prefix switch the call site previously inlined; the +// resource-prefix half of it lives in resourceHandlerFor so this +// function stays under the cyclop ceiling as new resources land. func (t apiRouteTable) dispatch(w http.ResponseWriter, r *http.Request) { - switch { - case r.URL.Path == "/admin/api/v1/auth/login": + switch r.URL.Path { + case "/admin/api/v1/auth/login": t.login.ServeHTTP(w, r) - case r.URL.Path == "/admin/api/v1/auth/logout": + return + case "/admin/api/v1/auth/logout": t.logout.ServeHTTP(w, r) - case r.URL.Path == "/admin/api/v1/cluster": + return + case "/admin/api/v1/cluster": t.cluster.ServeHTTP(w, r) - case r.URL.Path == "/admin/api/v1/keyviz/matrix": - t.keyviz.ServeHTTP(w, r) - case t.dynamo != nil && isDynamoPath(r.URL.Path): - t.dynamo.ServeHTTP(w, r) - case t.s3 != nil && isS3Path(r.URL.Path): - t.s3.ServeHTTP(w, r) + return + } + if h := t.resourceHandlerFor(r.URL.Path); h != nil { + h.ServeHTTP(w, r) + return + } + writeJSONError(w, http.StatusNotFound, "unknown_endpoint", + "no admin API handler is registered for this path") +} + +// resourceHandlerFor returns the handler that owns the URL path's +// resource family, or nil when no resource matches. Pulled out of +// dispatch so dispatch stays under cyclop=10 even as new admin +// resources (Dynamo, S3, SQS, KeyViz, future) get added. +// +// KeyViz is *always* registered (the constructor wires a non-nil +// handler that itself emits 503 keyviz_disabled when the underlying +// sampler is nil), so the switch matches against an exact path +// equality and never against a nil receiver. +func (t apiRouteTable) resourceHandlerFor(path string) http.Handler { + switch { + case t.keyviz != nil && path == "/admin/api/v1/keyviz/matrix": + return t.keyviz + case t.dynamo != nil && isDynamoPath(path): + return t.dynamo + case t.s3 != nil && isS3Path(path): + return t.s3 + case t.sqs != nil && isSqsPath(path): + return t.sqs default: - writeJSONError(w, http.StatusNotFound, "unknown_endpoint", - "no admin API handler is registered for this path") + return nil } } @@ -343,6 +401,10 @@ func isS3Path(p string) bool { return p == pathS3Buckets || strings.HasPrefix(p, pathPrefixS3Buckets) } +func isSqsPath(p string) bool { + return p == pathSqsQueues || strings.HasPrefix(p, pathPrefixSqsQueues) +} + func errMissing(field string) error { return &missingDepError{field: field} } diff --git a/internal/admin/sqs_handler.go b/internal/admin/sqs_handler.go new file mode 100644 index 000000000..081a482b1 --- /dev/null +++ b/internal/admin/sqs_handler.go @@ -0,0 +1,270 @@ +package admin + +import ( + "context" + "errors" + "log/slog" + "net/http" + "strconv" + "strings" + "time" + + "github.com/goccy/go-json" +) + +// pathSqsQueues is the URL prefix the SQS handler owns. The "" suffix +// produces the collection root /admin/api/v1/sqs/queues; the +// pathPrefixSqsQueues form is used for the per-queue routes. +const ( + pathSqsQueues = "/admin/api/v1/sqs/queues" + pathPrefixSqsQueues = pathSqsQueues + "/" +) + +// QueueSummary is the SPA-facing projection of a single SQS queue. +// Mirrors adapter.AdminQueueSummary 1:1; the bridge in main_admin.go +// translates between the two so the admin package stays free of the +// adapter dependency tree. +type QueueSummary struct { + Name string `json:"name"` + IsFIFO bool `json:"is_fifo"` + Generation uint64 `json:"generation"` + CreatedAt time.Time `json:"created_at,omitempty"` + Attributes map[string]string `json:"attributes,omitempty"` + Counters QueueCounters `json:"counters"` + CountersTruncated bool `json:"counters_truncated,omitempty"` +} + +// QueueCounters mirrors the three Approximate* counters AWS exposes +// on GetQueueAttributes. Definitions follow §16.1 of the SQS design +// doc. +type QueueCounters struct { + Visible int64 `json:"visible"` + NotVisible int64 `json:"not_visible"` + Delayed int64 `json:"delayed"` +} + +// QueuesSource is the contract the SQS handler depends on. Wired in +// production to *adapter.SQSServer via a small bridge in main_admin.go; +// tests use a stub. +// +// AdminDescribeQueue returns (nil, false, nil) for a missing queue so +// callers can distinguish "not found" from a storage error without +// sniffing sentinels. AdminDeleteQueue returns the structured +// sentinels below so the handler can map them to HTTP statuses +// without leaking the adapter's error vocabulary. +type QueuesSource interface { + AdminListQueues(ctx context.Context) ([]string, error) + AdminDescribeQueue(ctx context.Context, name string) (*QueueSummary, bool, error) + AdminDeleteQueue(ctx context.Context, principal AuthPrincipal, name string) error +} + +// Errors the QueuesSource may return for the handler to map onto a +// specific HTTP response. Sentinels rather than typed errors so the +// bridge can map any adapter-internal failure onto exactly one of +// these without the admin package importing adapter-private types. +var ( + // ErrQueuesForbidden — principal lacks the role required (403). + ErrQueuesForbidden = errors.New("admin sqs: principal lacks required role") + // ErrQueuesNotLeader — local node is not the verified Raft + // leader. Without follower-forwarding wired (out of scope for + // the SPA's read+delete surface), maps to 503 + Retry-After: 1. + ErrQueuesNotLeader = errors.New("admin sqs: local node is not the raft leader") + // ErrQueuesNotFound — DELETE / DESCRIBE targets a queue that + // does not exist (404). The describe path uses (nil, false, nil) + // instead of this sentinel for the not-found signal. + ErrQueuesNotFound = errors.New("admin sqs: queue not found") + // ErrQueuesValidation — request shape is bad (400). + ErrQueuesValidation = errors.New("admin sqs: validation failed") +) + +// SqsHandler serves /admin/api/v1/sqs/queues and +// /admin/api/v1/sqs/queues/{name}. Reads (list, describe) accept GET; +// delete accepts DELETE and goes through the same protected +// middleware chain (BodyLimit -> SessionAuth -> Audit -> CSRF) as +// every other write surface, with an in-handler RoleFull gate so a +// read-only key cannot delete even with a valid CSRF token. +type SqsHandler struct { + source QueuesSource + roles RoleStore + logger *slog.Logger +} + +// NewSqsHandler binds the source and seeds logging with +// slog.Default(). Use WithLogger to attach a tagged logger and +// WithRoleStore to plug in the live access-key role lookup so a +// downgraded key cannot continue mutating with a still-valid JWT. +func NewSqsHandler(source QueuesSource) *SqsHandler { + return &SqsHandler{source: source, logger: slog.Default()} +} + +// WithLogger overrides the default slog destination. No-ops on nil to +// preserve the constructor-seeded slog.Default(). +func (h *SqsHandler) WithLogger(l *slog.Logger) *SqsHandler { + if l == nil { + return h + } + h.logger = l + return h +} + +// WithRoleStore enables per-request role revalidation on the delete +// endpoint. Without it, the handler trusts whatever role is embedded +// in the session JWT — which is fine for single-tenant deployments +// where the role config never changes, but problematic when an +// operator revokes or downgrades a key. Production wiring in +// main_admin.go always sets this. +func (h *SqsHandler) WithRoleStore(r RoleStore) *SqsHandler { + h.roles = r + return h +} + +// ServeHTTP routes /queues and /queues/{name}. Method handling +// mirrors DynamoHandler — keep the two parallel so an operator +// reading one understands the other for free. +func (h *SqsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + switch { + case r.URL.Path == pathSqsQueues: + switch r.Method { + case http.MethodGet: + h.handleList(w, r) + default: + writeJSONError(w, http.StatusMethodNotAllowed, "method_not_allowed", "only GET") + } + case strings.HasPrefix(r.URL.Path, pathPrefixSqsQueues): + name := strings.TrimPrefix(r.URL.Path, pathPrefixSqsQueues) + switch r.Method { + case http.MethodGet: + h.handleDescribe(w, r, name) + case http.MethodDelete: + h.handleDelete(w, r, name) + default: + writeJSONError(w, http.StatusMethodNotAllowed, "method_not_allowed", "only GET or DELETE") + } + default: + writeJSONError(w, http.StatusNotFound, "unknown_endpoint", + "no admin SQS handler is registered for this path") + } +} + +type listQueuesResponse struct { + Queues []string `json:"queues"` +} + +func (h *SqsHandler) handleList(w http.ResponseWriter, r *http.Request) { + names, err := h.source.AdminListQueues(r.Context()) + if err != nil { + h.logger.LogAttrs(r.Context(), slog.LevelError, "admin sqs list queues failed", + slog.String("error", err.Error()), + ) + writeJSONError(w, http.StatusInternalServerError, "list_failed", + "failed to list queues; see server logs") + return + } + w.Header().Set("Content-Type", "application/json; charset=utf-8") + w.Header().Set("Cache-Control", "no-store") + w.WriteHeader(http.StatusOK) + if err := json.NewEncoder(w).Encode(listQueuesResponse{Queues: names}); err != nil { + h.logger.LogAttrs(r.Context(), slog.LevelWarn, "admin sqs list response encode failed", + slog.String("error", err.Error()), + ) + } +} + +func (h *SqsHandler) handleDescribe(w http.ResponseWriter, r *http.Request, name string) { + if strings.TrimSpace(name) == "" { + writeJSONError(w, http.StatusBadRequest, "invalid_queue_name", "queue name is required") + return + } + summary, exists, err := h.source.AdminDescribeQueue(r.Context(), name) + if err != nil { + writeQueuesError(w, err, h.logger, r) + return + } + if !exists { + writeJSONError(w, http.StatusNotFound, "queue_not_found", + "no queue is registered with that name") + return + } + w.Header().Set("Content-Type", "application/json; charset=utf-8") + w.Header().Set("Cache-Control", "no-store") + w.WriteHeader(http.StatusOK) + if err := json.NewEncoder(w).Encode(summary); err != nil { + h.logger.LogAttrs(r.Context(), slog.LevelWarn, "admin sqs describe response encode failed", + slog.String("error", err.Error()), + ) + } +} + +func (h *SqsHandler) handleDelete(w http.ResponseWriter, r *http.Request, name string) { + principal, ok := PrincipalFromContext(r.Context()) + if !ok { + // SessionAuth runs before this handler, so a missing + // principal is a wiring bug. 500 rather than 401 since + // 401 would be misleading — the request was authenticated. + writeJSONError(w, http.StatusInternalServerError, "internal", "missing session principal") + return + } + // Re-evaluate the role against the live store so a downgraded + // key cannot keep deleting with a still-valid JWT. The check is + // before the leader check so a forbidden read-only caller never + // learns the leader's identity by indirection. + if !h.principalCanWrite(principal) { + writeJSONError(w, http.StatusForbidden, "forbidden", + "this access key is not authorised to delete queues") + return + } + if strings.TrimSpace(name) == "" { + writeJSONError(w, http.StatusBadRequest, "invalid_queue_name", "queue name is required") + return + } + if err := h.source.AdminDeleteQueue(r.Context(), principal, name); err != nil { + writeQueuesError(w, err, h.logger, r) + return + } + w.Header().Set("Cache-Control", "no-store") + w.WriteHeader(http.StatusNoContent) +} + +// principalCanWrite re-resolves the access key against the live +// RoleStore (when configured) so a downgrade or revoke applies to +// the next request, not just to new logins. Falls back to the JWT's +// embedded role when no role store is wired (single-tenant default). +func (h *SqsHandler) principalCanWrite(p AuthPrincipal) bool { + role := p.Role + if h.roles != nil { + if live, ok := h.roles.LookupRole(p.AccessKey); ok { + role = live + } else { + // Key has been removed from the role config since + // login. Treat it as no-access regardless of what + // the JWT claimed. + return false + } + } + return role.AllowsWrite() +} + +// writeQueuesError translates a QueuesSource error onto an HTTP +// response. Unrecognised errors map to 500 with a sanitised message +// — the raw err.Error() may include adapter internals (Pebble paths, +// raft peer ids) that should not flow to the SPA. +func writeQueuesError(w http.ResponseWriter, err error, logger *slog.Logger, r *http.Request) { + switch { + case errors.Is(err, ErrQueuesForbidden): + writeJSONError(w, http.StatusForbidden, "forbidden", "principal lacks required role") + case errors.Is(err, ErrQueuesNotLeader): + w.Header().Set("Retry-After", strconv.Itoa(1)) + writeJSONError(w, http.StatusServiceUnavailable, "leader_unavailable", + "local node is not the leader; retry shortly") + case errors.Is(err, ErrQueuesNotFound): + writeJSONError(w, http.StatusNotFound, "queue_not_found", "no queue with that name") + case errors.Is(err, ErrQueuesValidation): + writeJSONError(w, http.StatusBadRequest, "invalid_request", err.Error()) + default: + logger.LogAttrs(r.Context(), slog.LevelError, "admin sqs operation failed", + slog.String("error", err.Error()), + ) + writeJSONError(w, http.StatusInternalServerError, "internal", + "queue operation failed; see server logs") + } +} diff --git a/main.go b/main.go index 05176eff0..02f09f7fe 100644 --- a/main.go +++ b/main.go @@ -764,7 +764,7 @@ func startServers(in serversInput) error { // the handler hands ErrTablesNotLeader writes to the forwarder // which dials the leader over the cached gRPC pool. Without these // the handler falls back to 503 + Retry-After:1. - if err := startAdminFromFlags(in.ctx, in.lc, in.eg, in.runtimes, runner.dynamoServer, runner.s3Server, in.coordinate, connCache, in.keyvizSampler); err != nil { + if err := startAdminFromFlags(in.ctx, in.lc, in.eg, in.runtimes, runner.dynamoServer, runner.s3Server, runner.sqsServer, in.coordinate, connCache, in.keyvizSampler); err != nil { return waitErrgroupAfterStartupFailure(in.cancel, in.eg, err) } return nil @@ -1309,6 +1309,12 @@ type runtimeServerRunner struct { // 404, mirroring the dynamoServer == nil contract. s3Server *adapter.S3Server + // sqsServer plays the same role for the SQS admin entrypoints + // (adapter/sqs_admin.go). Nil when --sqsAddress is empty; the + // admin listener then leaves /admin/api/v1/sqs/* off the wire + // (the mux 404s those paths). + sqsServer *adapter.SQSServer + // roleStore is the access-key → role index the leader-side // gRPC AdminForward service uses to re-validate the principal // on every forwarded write. Mirrors what admin.Config.RoleIndex @@ -1362,9 +1368,11 @@ func (r *runtimeServerRunner) start() error { return waitErrgroupAfterStartupFailure(r.cancel, r.eg, err) } r.s3Server = s3Server - if err := startSQSServer(r.ctx, r.lc, r.eg, r.sqsAddress, r.shardStore, r.coordinate, r.leaderSQS, r.sqsRegion, r.sqsCredsFile); err != nil { + sqsServer, err := startSQSServer(r.ctx, r.lc, r.eg, r.sqsAddress, r.shardStore, r.coordinate, r.leaderSQS, r.sqsRegion, r.sqsCredsFile) + if err != nil { return waitErrgroupAfterStartupFailure(r.cancel, r.eg, err) } + r.sqsServer = sqsServer if err := startMetricsServer(r.ctx, r.lc, r.eg, r.metricsAddress, r.metricsToken, r.metricsRegistry.Handler()); err != nil { return waitErrgroupAfterStartupFailure(r.cancel, r.eg, err) } diff --git a/main_admin.go b/main_admin.go index 75d37c374..7c5a24a04 100644 --- a/main_admin.go +++ b/main_admin.go @@ -76,6 +76,7 @@ func startAdminFromFlags( runtimes []*raftGroupRuntime, dynamoServer *adapter.DynamoDBServer, s3Server *adapter.S3Server, + sqsServer *adapter.SQSServer, coordinate kv.Coordinator, connCache *kv.GRPCConnCache, keyvizSampler *keyviz.MemSampler, @@ -121,14 +122,102 @@ func startAdminFromFlags( clusterSrc := newClusterInfoSource(*raftId, buildVersion(), runtimes) tablesSrc := newDynamoTablesSource(dynamoServer) bucketsSrc := newBucketsSource(s3Server) + queuesSrc := newSqsQueuesSource(sqsServer) forwarder, err := buildAdminLeaderForwarder(coordinate, connCache, *raftId) if err != nil { return errors.Wrap(err, "build admin leader forwarder") } - _, err = startAdminServer(ctx, lc, eg, cfg, staticCreds, clusterSrc, tablesSrc, bucketsSrc, forwarder, keyvizSampler, buildVersion()) + _, err = startAdminServer(ctx, lc, eg, cfg, staticCreds, clusterSrc, tablesSrc, bucketsSrc, queuesSrc, forwarder, keyvizSampler, buildVersion()) return err } +// newSqsQueuesSource adapts *adapter.SQSServer to admin.QueuesSource. +// Same architectural reasoning as newDynamoTablesSource and +// newBucketsSource: the bridge stays in this file (rather than +// internal/admin) so the admin package stays free of the heavy +// adapter-package dependency tree. Returns nil when sqsServer is nil +// so admin.NewServer leaves /admin/api/v1/sqs/* off the wire. +func newSqsQueuesSource(sqsServer *adapter.SQSServer) admin.QueuesSource { + if sqsServer == nil { + return nil + } + return &sqsQueuesBridge{server: sqsServer} +} + +// sqsQueuesBridge re-shapes adapter.AdminQueueSummary into +// admin.QueueSummary. The two structs are deliberately isomorphic so +// this translation does no allocation more than necessary; if a +// future field is added on one side, the build breaks here, which +// is the drift signal we want. +type sqsQueuesBridge struct { + server *adapter.SQSServer +} + +func (b *sqsQueuesBridge) AdminListQueues(ctx context.Context) ([]string, error) { + return b.server.AdminListQueues(ctx) //nolint:wrapcheck // pure pass-through; adapter owns the error context. +} + +func (b *sqsQueuesBridge) AdminDescribeQueue(ctx context.Context, name string) (*admin.QueueSummary, bool, error) { + summary, exists, err := b.server.AdminDescribeQueue(ctx, name) + if err != nil { + return nil, false, translateAdminQueuesError(err) + } + if !exists { + return nil, false, nil + } + return convertAdminQueueSummary(summary), true, nil +} + +func (b *sqsQueuesBridge) AdminDeleteQueue(ctx context.Context, principal admin.AuthPrincipal, name string) error { + if err := b.server.AdminDeleteQueue(ctx, convertAdminPrincipal(principal), name); err != nil { + return translateAdminQueuesError(err) + } + return nil +} + +// convertAdminQueueSummary mirrors adapter.AdminQueueSummary into +// admin.QueueSummary. Counter fields are int64 on both sides; if +// either side grows a field, this function should be extended in the +// same commit so a compile error catches the drift. +func convertAdminQueueSummary(in *adapter.AdminQueueSummary) *admin.QueueSummary { + if in == nil { + return nil + } + return &admin.QueueSummary{ + Name: in.Name, + IsFIFO: in.IsFIFO, + Generation: in.Generation, + CreatedAt: in.CreatedAt, + Attributes: in.Attributes, + Counters: admin.QueueCounters{ + Visible: in.Counters.Visible, + NotVisible: in.Counters.NotVisible, + Delayed: in.Counters.Delayed, + }, + } +} + +// translateAdminQueuesError maps the adapter's SQS error vocabulary +// onto the admin-package sentinels the SQS handler matches against. +// Anything not recognised is forwarded as-is and answered with 500 +// + a sanitised body, matching the dynamo / s3 bridges' behaviour. +func translateAdminQueuesError(err error) error { + switch { + case err == nil: + return nil + case errors.Is(err, adapter.ErrAdminForbidden): + return admin.ErrQueuesForbidden + case errors.Is(err, adapter.ErrAdminNotLeader): + return admin.ErrQueuesNotLeader + case errors.Is(err, adapter.ErrAdminSQSNotFound): + return admin.ErrQueuesNotFound + case errors.Is(err, adapter.ErrAdminSQSValidation): + return admin.ErrQueuesValidation + default: + return err + } +} + // buildAdminLeaderForwarder constructs the production LeaderForwarder // for the dynamo HTTP handler when the wiring is complete enough to // reach a remote leader. The bridge tolerates a nil connCache (and a @@ -450,6 +539,7 @@ func startAdminServer( cluster admin.ClusterInfoSource, tables admin.TablesSource, buckets admin.BucketsSource, + queues admin.QueuesSource, forwarder admin.LeaderForwarder, keyvizSampler *keyviz.MemSampler, version string, @@ -459,7 +549,7 @@ func startAdminServer( if err != nil || !enabled { return "", err } - server, err := buildAdminHTTPServer(&adminCfg, creds, cluster, tables, buckets, forwarder, keyvizSampler) + server, err := buildAdminHTTPServer(&adminCfg, creds, cluster, tables, buckets, queues, forwarder, keyvizSampler) if err != nil { return "", err } @@ -499,7 +589,7 @@ func checkAdminConfig(adminCfg *admin.Config, cluster admin.ClusterInfoSource) ( return true, nil } -func buildAdminHTTPServer(adminCfg *admin.Config, creds map[string]string, cluster admin.ClusterInfoSource, tables admin.TablesSource, buckets admin.BucketsSource, forwarder admin.LeaderForwarder, keyvizSampler *keyviz.MemSampler) (*admin.Server, error) { +func buildAdminHTTPServer(adminCfg *admin.Config, creds map[string]string, cluster admin.ClusterInfoSource, tables admin.TablesSource, buckets admin.BucketsSource, queues admin.QueuesSource, forwarder admin.LeaderForwarder, keyvizSampler *keyviz.MemSampler) (*admin.Server, error) { primaryKeys, err := adminCfg.DecodedSigningKeys() if err != nil { return nil, errors.Wrap(err, "decode admin signing keys") @@ -524,6 +614,7 @@ func buildAdminHTTPServer(adminCfg *admin.Config, creds map[string]string, clust ClusterInfo: cluster, Tables: tables, Buckets: buckets, + Queues: queues, Forwarder: forwarder, KeyViz: keyvizSourceFromSampler(keyvizSampler), StaticFS: staticFS, diff --git a/main_sqs.go b/main_sqs.go index 55ca41684..7bb8623e0 100644 --- a/main_sqs.go +++ b/main_sqs.go @@ -11,6 +11,11 @@ import ( "golang.org/x/sync/errgroup" ) +// startSQSServer stands up the SQS adapter on sqsAddr and returns the +// running *adapter.SQSServer so the admin listener can call SigV4-bypass +// admin entrypoints against it (see adapter/sqs_admin.go). Returns +// (nil, nil) when sqsAddr is empty — that is the "SQS disabled" branch +// and the admin listener leaves /admin/api/v1/sqs/* off the wire. func startSQSServer( ctx context.Context, lc *net.ListenConfig, @@ -21,19 +26,19 @@ func startSQSServer( leaderSQS map[string]string, region string, credentialsFile string, -) error { +) (*adapter.SQSServer, error) { sqsAddr = strings.TrimSpace(sqsAddr) if sqsAddr == "" { - return nil + return nil, nil } sqsL, err := lc.Listen(ctx, "tcp", sqsAddr) if err != nil { - return errors.Wrapf(err, "failed to listen on %s", sqsAddr) + return nil, errors.Wrapf(err, "failed to listen on %s", sqsAddr) } staticCreds, err := loadSigV4StaticCredentialsFile(credentialsFile, "sqs") if err != nil { _ = sqsL.Close() - return err + return nil, err } sqsServer := adapter.NewSQSServer( sqsL, @@ -63,5 +68,5 @@ func startSQSServer( } return errors.WithStack(err) }) - return nil + return sqsServer, nil } diff --git a/web/admin/src/App.tsx b/web/admin/src/App.tsx index 90272b146..08311dfcf 100644 --- a/web/admin/src/App.tsx +++ b/web/admin/src/App.tsx @@ -9,6 +9,8 @@ import { LoginPage } from "./pages/Login"; import { NotFoundPage } from "./pages/NotFound"; import { S3DetailPage } from "./pages/S3Detail"; import { S3ListPage } from "./pages/S3List"; +import { SqsDetailPage } from "./pages/SqsDetail"; +import { SqsListPage } from "./pages/SqsList"; export function App() { return ( @@ -25,6 +27,8 @@ export function App() { } /> } /> } /> + } /> + } /> } /> } /> } /> diff --git a/web/admin/src/api/client.ts b/web/admin/src/api/client.ts index c3ec92cdf..ee1dcea40 100644 --- a/web/admin/src/api/client.ts +++ b/web/admin/src/api/client.ts @@ -193,6 +193,29 @@ export interface CreateBucketRequest { acl?: "private" | "public-read"; } +// SQS queue admin DTOs (Section 16.2 of the SQS partial design doc). +// `attributes` mirrors the AWS GetQueueAttributes "All" set with +// snake_case keys; `counters` is the typed projection of the three +// Approximate* counters added in Phase 3.A. +export interface SqsQueueCounters { + visible: number; + not_visible: number; + delayed: number; +} + +export interface SqsQueueSummary { + name: string; + is_fifo: boolean; + generation: number; + created_at?: string; + attributes?: Record; + counters: SqsQueueCounters; +} + +export interface SqsQueueList { + queues: string[]; +} + export const api = { login: (access_key: string, secret_key: string) => apiFetch("/auth/login", { @@ -223,4 +246,10 @@ export const api = { }), deleteBucket: (name: string) => apiFetch(`/s3/buckets/${encodeURIComponent(name)}`, { method: "DELETE" }), + listQueues: (signal?: AbortSignal) => + apiFetch("/sqs/queues", { signal }), + describeQueue: (name: string, signal?: AbortSignal) => + apiFetch(`/sqs/queues/${encodeURIComponent(name)}`, { signal }), + deleteQueue: (name: string) => + apiFetch(`/sqs/queues/${encodeURIComponent(name)}`, { method: "DELETE" }), }; diff --git a/web/admin/src/components/Layout.tsx b/web/admin/src/components/Layout.tsx index 80e6641da..26620cdb8 100644 --- a/web/admin/src/components/Layout.tsx +++ b/web/admin/src/components/Layout.tsx @@ -4,6 +4,7 @@ import { useAuth } from "../auth"; const navItems: { to: string; label: string; end?: boolean }[] = [ { to: "/", label: "Overview", end: true }, { to: "/dynamo", label: "DynamoDB" }, + { to: "/sqs", label: "SQS" }, { to: "/s3", label: "S3" }, ]; diff --git a/web/admin/src/pages/SqsDetail.tsx b/web/admin/src/pages/SqsDetail.tsx new file mode 100644 index 000000000..536a011e6 --- /dev/null +++ b/web/admin/src/pages/SqsDetail.tsx @@ -0,0 +1,142 @@ +import { useState } from "react"; +import { Link, useNavigate, useParams } from "react-router-dom"; +import { api } from "../api/client"; +import { useAuth } from "../auth"; +import { Modal } from "../components/Modal"; +import { formatApiError, useApiQuery } from "../lib/useApi"; + +export function SqsDetailPage() { + const { name = "" } = useParams<{ name: string }>(); + const { session } = useAuth(); + const detail = useApiQuery((signal) => api.describeQueue(name, signal), [name]); + const [confirmDelete, setConfirmDelete] = useState(false); + const [deleting, setDeleting] = useState(false); + const [deleteError, setDeleteError] = useState(null); + const navigate = useNavigate(); + const writeAllowed = session?.role === "full"; + + const onDelete = async () => { + setDeleting(true); + setDeleteError(null); + try { + await api.deleteQueue(name); + navigate("/sqs", { replace: true }); + } catch (err) { + setDeleteError(formatApiError(err)); + setDeleting(false); + } + }; + + return ( +
+
+ ← All queues +

{name}

+ {detail.data && ( + + {detail.data.is_fifo ? "FIFO" : "Standard"} + + )} + {writeAllowed && detail.data && ( + + )} +
+ +
+ {detail.loading &&
Loading…
} + {detail.error?.status === 404 && ( +
+ Either the queue does not exist or the SQS admin endpoints are not + wired (no --sqsAddress). +
+ )} + {detail.error && detail.error.status !== 404 && ( +
{formatApiError(detail.error)}
+ )} + {detail.data && ( +
+
Generation
+
{detail.data.generation}
+
Created
+
+ {detail.data.created_at ? new Date(detail.data.created_at).toLocaleString() : "—"} +
+
+ )} +
+ + {detail.data && ( +
+
+

Approximate message counts

+
+
+ + + +
+
+ )} + + {detail.data?.attributes && Object.keys(detail.data.attributes).length > 0 && ( +
+

Configuration

+
+ {Object.entries(detail.data.attributes).map(([k, v]) => ( +
+
{k}
+
{v}
+
+ ))} +
+
+ )} + + !deleting && setConfirmDelete(false)} + busy={deleting} + > +

+ Permanently delete {name}? All messages + will be removed and the queue cannot be recovered. +

+ {deleteError &&
{deleteError}
} +
+ + +
+
+
+ ); +} + +function CounterCard({ label, value }: { label: string; value: number }) { + return ( +
+
{label}
+
{value}
+
+ ); +} diff --git a/web/admin/src/pages/SqsList.tsx b/web/admin/src/pages/SqsList.tsx new file mode 100644 index 000000000..3d42a658e --- /dev/null +++ b/web/admin/src/pages/SqsList.tsx @@ -0,0 +1,69 @@ +import { Link } from "react-router-dom"; +import { api } from "../api/client"; +import { formatApiError, useApiQuery } from "../lib/useApi"; + +export function SqsListPage() { + const queues = useApiQuery((signal) => api.listQueues(signal), []); + + return ( +
+
+
+

SQS queues

+

+ Backed by the SigV4-bypass admin entrypoints in + adapter/sqs_admin.go. + Detail pages show the new approximate counters from Phase 3.A. +

+
+ +
+ +
+ {queues.loading &&
Loading…
} + {queues.error?.status === 404 && ( +
+ SQS admin endpoints not wired on this build (the cluster was started + without --sqsAddress, so the + admin listener leaves /admin/api/v1/sqs/* + off the wire). +
+ )} + {queues.error && queues.error.status !== 404 && ( +
{formatApiError(queues.error)}
+ )} + {queues.data && queues.data.queues.length === 0 && ( +
No queues yet.
+ )} + {queues.data && queues.data.queues.length > 0 && ( + + + + + + + + {queues.data.queues.map((name) => ( + + + + + ))} + +
Queue +
+ + {name} + + + + details → + +
+ )} +
+
+ ); +} From 3236616a8eb1413344350cebc8abf74c47fea9bc Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Sun, 26 Apr 2026 20:48:58 +0900 Subject: [PATCH 02/11] fix(admin/sqs): drop stale CountersTruncated + serialise empty list as [] MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two medium findings from Gemini review on PR #670: 1. CountersTruncated field was stale on internal/admin.QueueSummary. The PR description called out that the field was dropped because main's sqsApproxCounters does not expose truncation, but I missed removing the JSON-side mirror in internal/admin/sqs_handler.go. The SPA was never reading it (SqsDetail.tsx already had the reference removed in this PR), so the field was load-bearing for nothing. Removed. 2. handleList serialised an empty queue catalog as {"queues": null} when AdminListQueues returned nil. The SPA iterates the array directly and would crash on null. Normalise nil to []string{} immediately before encoding so the response shape is always {"queues": []} even on the empty case. No new tests because the existing internal/admin race test suite already exercises the encoder path (would have caught a json.Marshal break) and the empty-catalog case is exactly what the SPA hits the moment a fresh node comes up — manual smoke during PR #649 already verified the SPA renders an empty list cleanly when the array is []. Verified with go build ./..., go test -race ./internal/admin/..., go test -race -run TestSQS ./adapter/, golangci-lint run ./adapter/... ./internal/admin/... — all clean, no //nolint. --- internal/admin/sqs_handler.go | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/internal/admin/sqs_handler.go b/internal/admin/sqs_handler.go index 081a482b1..b41e56a74 100644 --- a/internal/admin/sqs_handler.go +++ b/internal/admin/sqs_handler.go @@ -25,13 +25,12 @@ const ( // translates between the two so the admin package stays free of the // adapter dependency tree. type QueueSummary struct { - Name string `json:"name"` - IsFIFO bool `json:"is_fifo"` - Generation uint64 `json:"generation"` - CreatedAt time.Time `json:"created_at,omitempty"` - Attributes map[string]string `json:"attributes,omitempty"` - Counters QueueCounters `json:"counters"` - CountersTruncated bool `json:"counters_truncated,omitempty"` + Name string `json:"name"` + IsFIFO bool `json:"is_fifo"` + Generation uint64 `json:"generation"` + CreatedAt time.Time `json:"created_at,omitempty"` + Attributes map[string]string `json:"attributes,omitempty"` + Counters QueueCounters `json:"counters"` } // QueueCounters mirrors the three Approximate* counters AWS exposes @@ -160,6 +159,14 @@ func (h *SqsHandler) handleList(w http.ResponseWriter, r *http.Request) { "failed to list queues; see server logs") return } + // Force the empty-result case to render as `{"queues": []}` rather + // than `{"queues": null}` (Gemini medium on PR #670). The SPA + // iterates the array directly and would crash on null. AdminListQueues + // returns nil when no queues exist, so the normalisation has to + // happen here before encoding. + if names == nil { + names = []string{} + } w.Header().Set("Content-Type", "application/json; charset=utf-8") w.Header().Set("Cache-Control", "no-store") w.WriteHeader(http.StatusOK) From 2586cfb6949c3b6f46583273478e854038e53a3c Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Sun, 26 Apr 2026 21:00:27 +0900 Subject: [PATCH 03/11] fix(admin/sqs): map leader-churn errors to 503; add tests; SPA polish MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three findings from Claude review on PR #670: 1. (P1) translateAdminQueuesError did not catch leader-churn errors. AdminDeleteQueue passes the upfront isVerifiedSQSLeader check, then dispatches deleteQueueWithRetry which can hit a kv coordinator that just lost leadership. The resulting kv.ErrLeaderNotFound / adapter.ErrNotLeader / wrapped "not leader" suffixes were not in the translator's switch — they fell to default and the admin handler rendered a generic 500 instead of the spec'd 503 + Retry-After: 1. Added the `case isLeaderChurnError(err)` arm mirroring translateAdminTablesError's identical fix from PR #634. 2. (P2) No tests for translateAdminQueuesError. Mirrored the three Dynamo equivalents in main_admin_test.go: - TestTranslateAdminQueuesError_LeaderChurn covers every kv sentinel + canonical wrapped-suffix variant. - TestTranslateAdminQueuesError_LeaderPhraseInMiddleOfMessage pins the HasSuffix matcher behaviour against false positives on user-supplied error messages mid-string. - TestTranslateAdminQueuesError_UnrelatedErrorPassesThrough confirms the detector does not swallow innocent "leader" mentions outside the canonical phrase set. 3. (Low / polish) SqsList.tsx subtitle leaked the Go file path "adapter/sqs_admin.go" and the internal milestone name "Phase 3.A" to end users — DynamoList / S3List don't do this. Replaced with operator-facing prose describing what the page does. Verified: - go test -run TestTranslateAdminQueuesError . — passes - go build ./... clean - go test -race ./internal/admin/... + go test -race -run TestSQS ./adapter/ — pass - golangci-lint run ./adapter/... ./internal/admin/... ./... — 0 issues - cd web/admin && npm run lint (tsc --strict) clean --- main_admin.go | 15 ++++++++- main_admin_test.go | 60 +++++++++++++++++++++++++++++++++ web/admin/src/pages/SqsList.tsx | 5 ++- 3 files changed, 76 insertions(+), 4 deletions(-) diff --git a/main_admin.go b/main_admin.go index 7c5a24a04..d9aa74fad 100644 --- a/main_admin.go +++ b/main_admin.go @@ -213,8 +213,21 @@ func translateAdminQueuesError(err error) error { return admin.ErrQueuesNotFound case errors.Is(err, adapter.ErrAdminSQSValidation): return admin.ErrQueuesValidation + case isLeaderChurnError(err): + // Leadership can be lost between AdminDeleteQueue's upfront + // isVerifiedSQSLeader check and the coordinator dispatch + // inside deleteQueueWithRetry. The kv coordinator surfaces + // that as ErrLeaderNotFound / ErrNotLeader (or a wrapped + // "not leader" / "leader not found" suffix), and the + // retry loop's isRetryableTransactWriteError does not catch + // them. Without this arm the error falls to default and the + // admin handler renders a generic 500 — Codex P2 + Claude + // P1 on PR #670 confirmed the gap. Mirrors the same arm in + // translateAdminTablesError that fixed this for Dynamo on + // PR #634. + return admin.ErrQueuesNotLeader default: - return err + return err //nolint:wrapcheck // forwarded so the handler logs but does not surface it. } } diff --git a/main_admin_test.go b/main_admin_test.go index 2f83fdf0f..6e3ae5adf 100644 --- a/main_admin_test.go +++ b/main_admin_test.go @@ -426,3 +426,63 @@ func TestTranslateAdminTablesError_UnrelatedErrorPassesThrough(t *testing.T) { require.NotErrorIs(t, out, admin.ErrTablesNotLeader) require.Equal(t, in, out) } + +// TestTranslateAdminQueuesError_LeaderChurn is the SQS counterpart of +// TestTranslateAdminTablesError_LeaderChurn. AdminDeleteQueue clears +// the upfront isVerifiedSQSLeader check but the kv coordinator can +// still drop leadership inside deleteQueueWithRetry's Dispatch; the +// resulting ErrLeaderNotFound / ErrNotLeader / wrapped suffixes must +// classify as 503 leader_unavailable, not the generic 500 fallthrough. +// Codex P2 + Claude P1 on PR #670 confirmed the original gap. +func TestTranslateAdminQueuesError_LeaderChurn(t *testing.T) { + cases := []struct { + name string + in error + }{ + {"kv.ErrLeaderNotFound", kv.ErrLeaderNotFound}, + {"adapter.ErrNotLeader", adapter.ErrNotLeader}, + {"adapter.ErrLeaderNotFound", adapter.ErrLeaderNotFound}, + {"wrapped not leader", errors.New("dispatch failed: not leader")}, + {"wrapped leader not found", errors.New("dispatch: leader not found")}, + {"wrapped leadership lost", errors.New("commit aborted: leadership lost")}, + {"wrapped leadership transfer", errors.New("retry exhausted: leadership transfer in progress")}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + out := translateAdminQueuesError(tc.in) + require.ErrorIs(t, out, admin.ErrQueuesNotLeader, + "input %q must map to ErrQueuesNotLeader", tc.in) + }) + } +} + +// TestTranslateAdminQueuesError_LeaderPhraseInMiddleOfMessage is the +// SQS counterpart of the same Tables test — pins that the HasSuffix +// matcher in isLeaderChurnError does not false-positive on +// user-supplied error messages that happen to mention a leader +// phrase mid-string (e.g. a queue name or attribute value that +// happens to contain "not leader"). +func TestTranslateAdminQueuesError_LeaderPhraseInMiddleOfMessage(t *testing.T) { + cases := []string{ + "not leader: actually a downstream error", + "leader not found, but recovered automatically", + "leadership lost mid-snapshot, retried successfully", + } + for _, msg := range cases { + t.Run(msg, func(t *testing.T) { + out := translateAdminQueuesError(errors.New(msg)) + require.NotErrorIs(t, out, admin.ErrQueuesNotLeader, + "mid-message leader phrase %q must not classify as leader-churn", msg) + }) + } +} + +// TestTranslateAdminQueuesError_UnrelatedErrorPassesThrough confirms +// the leader-churn detector does not swallow unrelated errors that +// happen to mention the word "leader" outside the canonical phrases. +func TestTranslateAdminQueuesError_UnrelatedErrorPassesThrough(t *testing.T) { + in := errors.New("team leader misconfigured") + out := translateAdminQueuesError(in) + require.NotErrorIs(t, out, admin.ErrQueuesNotLeader) + require.Equal(t, in, out) +} diff --git a/web/admin/src/pages/SqsList.tsx b/web/admin/src/pages/SqsList.tsx index 3d42a658e..f5b4b5517 100644 --- a/web/admin/src/pages/SqsList.tsx +++ b/web/admin/src/pages/SqsList.tsx @@ -11,9 +11,8 @@ export function SqsListPage() {

SQS queues

- Backed by the SigV4-bypass admin entrypoints in - adapter/sqs_admin.go. - Detail pages show the new approximate counters from Phase 3.A. + List, describe, and delete SQS queues. Detail pages also surface + the approximate visible / in-flight / delayed message counts.