diff --git a/adapter/sqs_catalog.go b/adapter/sqs_catalog.go index c1c07e91..7555af1a 100644 --- a/adapter/sqs_catalog.go +++ b/adapter/sqs_catalog.go @@ -99,6 +99,36 @@ type sqsQueueMeta struct { // Persisted on the meta so a leader failover loads the configuration // along with the rest of the queue. Throttle *sqsQueueThrottle `json:"throttle,omitempty"` + // PartitionCount is the number of FIFO partitions for this queue + // (Phase 3.D HT-FIFO, see docs/design/2026_04_26_proposed_sqs_split_queue_fifo.md). + // Zero or 1 means the legacy single-partition layout — no schema + // change. Greater than 1 enables HT-FIFO. Set at CreateQueue time + // and immutable thereafter (SetQueueAttributes rejects any change). + // Power-of-two values only (validator rejects others). PR 2 of the + // rollout introduces this field but a temporary CreateQueue gate + // rejects PartitionCount > 1 until PR 5 lifts the gate atomically + // with the data-plane fanout — so the schema exists but no + // partitioned data can land before the data plane is wired. + PartitionCount uint32 `json:"partition_count,omitempty"` + // FifoThroughputLimit mirrors the AWS attribute. "perMessageGroupId" + // (default for HT-FIFO) keeps the §3.3 hash-by-MessageGroupId + // routing; "perQueue" activates the partition-0 short-circuit so + // every group ID routes to one partition (effectively N=1). + // Set at CreateQueue time and immutable thereafter — flipping it + // live would re-route in-flight messages and silently violate + // within-group FIFO ordering (see §3.2 of the design). + FifoThroughputLimit string `json:"fifo_throughput_limit,omitempty"` + // DeduplicationScope mirrors the AWS attribute. "messageGroup" + // (default for HT-FIFO) means the dedup window is per + // (queue, partition, MessageGroupId, dedupId); "queue" is the + // legacy single-window behaviour. Set at CreateQueue time and + // immutable thereafter — changing live can resurrect or suppress + // messages depending on the direction of the change. The + // validator additionally rejects {PartitionCount > 1, + // DeduplicationScope = "queue"} at CreateQueue time because the + // dedup key cannot be globally unique across partitions without + // a cross-partition OCC transaction. + DeduplicationScope string `json:"deduplication_scope,omitempty"` } // sqsQueueThrottle is the per-queue token-bucket configuration. Three @@ -334,6 +364,14 @@ func parseAttributesIntoMeta(name string, attrs map[string]string) (*sqsQueueMet if meta.ContentBasedDedup && !meta.IsFIFO { return nil, newSQSAPIError(http.StatusBadRequest, sqsErrInvalidAttributeValue, "ContentBasedDeduplication is only valid on FIFO queues") } + // HT-FIFO validation runs after resolveFifoQueueFlag so the + // IsFIFO-only checks see the post-resolution flag. The temporary + // dormancy gate (§11 PR 2) runs separately in createQueue so + // SetQueueAttributes paths share the schema validator without + // re-rejecting on the gate. + if err := validatePartitionConfig(meta); err != nil { + return nil, err + } return meta, nil } @@ -421,6 +459,42 @@ var sqsAttributeAppliers = map[string]attributeApplier{ m.ContentBasedDedup = b return nil }, + // PartitionCount enables HT-FIFO when > 1 (Phase 3.D, see + // docs/design/2026_04_26_proposed_sqs_split_queue_fifo.md). Set + // at CreateQueue time; SetQueueAttributes attempts to change it + // reject via the immutability check in trySetQueueAttributesOnce. + // PR 2 of the rollout introduces the field but the temporary + // dormancy gate in tryCreateQueueOnce rejects PartitionCount > 1 + // until PR 5 lifts the gate atomically with the data plane. + "PartitionCount": func(m *sqsQueueMeta, v string) error { + n, err := strconv.ParseUint(strings.TrimSpace(v), 10, 32) + if err != nil { + return newSQSAPIError(http.StatusBadRequest, sqsErrInvalidAttributeValue, + "PartitionCount must be a non-negative integer") + } + m.PartitionCount = uint32(n) //nolint:gosec // bounded by ParseUint(_, _, 32) above. + return nil + }, + "FifoThroughputLimit": func(m *sqsQueueMeta, v string) error { + v = strings.TrimSpace(v) + switch v { + case "", htfifoThroughputPerMessageGroupID, htfifoThroughputPerQueue: + m.FifoThroughputLimit = v + return nil + } + return newSQSAPIError(http.StatusBadRequest, sqsErrInvalidAttributeValue, + "FifoThroughputLimit must be 'perMessageGroupId' or 'perQueue'") + }, + "DeduplicationScope": func(m *sqsQueueMeta, v string) error { + v = strings.TrimSpace(v) + switch v { + case "", htfifoDedupeScopeMessageGroup, htfifoDedupeScopeQueue: + m.DeduplicationScope = v + return nil + } + return newSQSAPIError(http.StatusBadRequest, sqsErrInvalidAttributeValue, + "DeduplicationScope must be 'messageGroup' or 'queue'") + }, // Throttle* are non-AWS extensions for per-queue rate limiting, // see docs/design/2026_04_26_proposed_sqs_per_queue_throttling.md. // Each accepts a non-negative float64; the cross-attribute @@ -478,6 +552,12 @@ func applyAttributes(meta *sqsQueueMeta, attrs map[string]string) error { if err := validateThrottleConfig(meta); err != nil { return err } + // HT-FIFO partition validation runs in parseAttributesIntoMeta / + // trySetQueueAttributesOnce, AFTER resolveFifoQueueFlag, so the + // IsFIFO-only checks see the post-resolution flag. Running here + // would reject a valid CreateQueue with FifoQueue=true + + // FifoThroughputLimit=perMessageGroupId because IsFIFO is still + // false at this point in the flow. return nil } @@ -640,7 +720,9 @@ func attributesEqual(a, b *sqsQueueMeta) bool { if a == nil || b == nil { return false } - return baseAttributesEqual(a, b) && throttleConfigEqual(a.Throttle, b.Throttle) + return baseAttributesEqual(a, b) && + throttleConfigEqual(a.Throttle, b.Throttle) && + htfifoAttributesEqual(a, b) } // baseAttributesEqual compares the pre-Phase-3.C/3.D attribute set. @@ -678,6 +760,13 @@ func throttleConfigEqual(a, b *sqsQueueThrottle) bool { a.DefaultRefillPerSecond == b.DefaultRefillPerSecond } +// htfifoAttributesEqual compares the Phase 3.D HT-FIFO fields. +func htfifoAttributesEqual(a, b *sqsQueueMeta) bool { + return a.PartitionCount == b.PartitionCount && + a.FifoThroughputLimit == b.FifoThroughputLimit && + a.DeduplicationScope == b.DeduplicationScope +} + // ------------------------ storage primitives ------------------------ func (s *SQSServer) nextTxnReadTS(ctx context.Context) uint64 { @@ -745,6 +834,16 @@ func (s *SQSServer) createQueue(w http.ResponseWriter, r *http.Request) { writeSQSErrorFromErr(w, err) return } + // Temporary dormancy gate (Phase 3.D §11 PR 2). PartitionCount > 1 + // must reject until PR 5 wires the data plane atomically with the + // gate-lift. Without this, accepting a partitioned-queue create + // would let SendMessage write under the legacy single-partition + // prefix; the PR 5 reader would never find those messages and the + // reaper would not enumerate them — silent message loss. + if err := validatePartitionDormancyGate(requested); err != nil { + writeSQSErrorFromErr(w, err) + return + } if len(in.Tags) > sqsMaxTagsPerQueue { // AWS caps tags per queue at 50. CreateQueue must reject // over-cap tag bundles up front; a silent slice-and-store @@ -1196,6 +1295,10 @@ func queueMetaToAttributes(meta *sqsQueueMeta, selection sqsAttributeSelection, // keys. Extracted into a helper so queueMetaToAttributes stays // under the cyclop ceiling. addThrottleAttributes(all, meta.Throttle) + // HT-FIFO attributes (Phase 3.D). Same omission rule as Throttle*: + // only present when configured. Extracted into a helper so this + // function stays under the cyclop ceiling. + addHTFIFOAttributes(all, meta) if selection.expandAll { return all } @@ -1267,6 +1370,42 @@ func (s *SQSServer) setQueueAttributesWithRetry(ctx context.Context, queueName s return newSQSAPIError(http.StatusInternalServerError, sqsErrInternalFailure, "set queue attributes retry attempts exhausted") } +// applyAndValidateSetAttributes runs the apply + cross-validator +// chain for a SetQueueAttributes request. Extracted from +// trySetQueueAttributesOnce so that function stays under the cyclop +// ceiling once HT-FIFO immutability + Throttle validators were +// added. Returns nil on success; on rejection returns the typed +// sqsAPIError the caller forwards to writeSQSErrorFromErr. +// +// preApply snapshot allocation is gated on htfifoAttributesPresent +// so the common "mutable-only update" path stays alloc-free per the +// Gemini medium feedback on PR #681. +func applyAndValidateSetAttributes(meta *sqsQueueMeta, attrs map[string]string) error { + var preApply *sqsQueueMeta + if htfifoAttributesPresent(attrs) { + preApply = snapshotImmutableHTFIFO(meta) + } + if err := applyAttributes(meta, attrs); err != nil { + return err + } + if preApply != nil { + if err := validatePartitionImmutability(preApply, meta); err != nil { + return err + } + } + // ContentBasedDeduplication is FIFO-only; a Standard queue + // silently accepting it would advertise unsupported behavior to + // clients. Same rule enforced on CreateQueue. + if meta.ContentBasedDedup && !meta.IsFIFO { + return newSQSAPIError(http.StatusBadRequest, sqsErrInvalidAttributeValue, "ContentBasedDeduplication is only valid on FIFO queues") + } + // HT-FIFO schema validator runs after applyAttributes so the + // FIFO-only checks see the post-apply state. IsFIFO comes from + // the loaded meta record (immutable from CreateQueue) so the + // validator sees the same flag CreateQueue set. + return validatePartitionConfig(meta) +} + // trySetQueueAttributesOnce is one read-validate-commit pass. The first // return reports whether the caller should stop retrying (the attrs // are now committed); an error means either a non-retryable failure @@ -1280,15 +1419,9 @@ func (s *SQSServer) trySetQueueAttributesOnce(ctx context.Context, queueName str if !exists { return false, newSQSAPIError(http.StatusBadRequest, sqsErrQueueDoesNotExist, "queue does not exist") } - if err := applyAttributes(meta, attrs); err != nil { + if err := applyAndValidateSetAttributes(meta, attrs); err != nil { return false, err } - // ContentBasedDeduplication is FIFO-only; a Standard queue - // silently accepting it would advertise unsupported behavior to - // clients. Same rule enforced on CreateQueue. - if meta.ContentBasedDedup && !meta.IsFIFO { - return false, newSQSAPIError(http.StatusBadRequest, sqsErrInvalidAttributeValue, "ContentBasedDeduplication is only valid on FIFO queues") - } meta.LastModifiedAtMillis = time.Now().UnixMilli() metaBytes, err := encodeSQSQueueMeta(meta) if err != nil { diff --git a/adapter/sqs_partitioning.go b/adapter/sqs_partitioning.go new file mode 100644 index 00000000..27a2244f --- /dev/null +++ b/adapter/sqs_partitioning.go @@ -0,0 +1,278 @@ +package adapter + +import ( + "net/http" + "strconv" +) + +// HT-FIFO (Phase 3.D split-queue FIFO) configuration vocabulary and +// the routing primitive partitionFor. See the design doc at +// docs/design/2026_04_26_proposed_sqs_split_queue_fifo.md. +// +// PR 2 of the §11 rollout introduces the schema fields plus the +// validation surface — including the temporary dormancy gate that +// rejects PartitionCount > 1 at CreateQueue. PR 5 lifts the gate +// atomically with the data-plane fanout so a half-deployed cluster +// can never accept a partitioned queue without the data plane to +// serve it. Until then the field exists in the meta type and the +// router function compiles, but no partitioned queue can land. + +const ( + // htfifoMaxPartitions caps the per-queue partition count. 32 is + // enough for ~30,000 RPS per queue at the per-shard ~1,000 RPS + // limit. Higher would require larger per-queue meta records and + // more reaper cycles; bumping the cap is a follow-up if operators + // demand it. See §10 of the design. + htfifoMaxPartitions uint32 = 32 + + // htfifoThroughputPerMessageGroupID is the default + // FifoThroughputLimit value for HT-FIFO queues — every group ID + // hashes to a partition independently, giving the throughput + // scaling HT-FIFO is designed for. + htfifoThroughputPerMessageGroupID = "perMessageGroupId" + // htfifoThroughputPerQueue activates the §3.3 short-circuit: every + // group ID routes to partition 0, collapsing throughput back to + // what a single-partition queue gets. Useful for clients that want + // the AWS attribute set without the extra capacity. + htfifoThroughputPerQueue = "perQueue" + + // htfifoDedupeScopeMessageGroup is the default DeduplicationScope + // value for HT-FIFO queues — the dedup window is per (queue, + // partition, MessageGroupId, dedupId). + htfifoDedupeScopeMessageGroup = "messageGroup" + // htfifoDedupeScopeQueue is the legacy single-window scope. Per + // §3.2 this is incompatible with PartitionCount > 1 (the dedup + // key cannot be globally unique across partitions without a + // cross-partition OCC transaction); the validator rejects the + // combination at CreateQueue time. + htfifoDedupeScopeQueue = "queue" +) + +// htfifoTemporaryGateMessage is the operator-facing reason the +// CreateQueue gate uses while PR 2-4 are in production. Removed in +// PR 5 in the same commit that wires the data-plane fanout. +const htfifoTemporaryGateMessage = "PartitionCount > 1 requires HT-FIFO data plane — not yet enabled" + +// partitionFor maps a (queue meta, MessageGroupId) pair to a +// partition index in [0, PartitionCount). Edge cases: +// +// - PartitionCount == 0 or 1 → always 0 (legacy single-partition). +// - FifoThroughputLimit == "perQueue" → always 0 (the §3.3 +// short-circuit; collapses every group to one partition). +// - Empty MessageGroupId → 0 (defensive; FIFO send validation +// should already have rejected this). +// +// Hashing uses FNV-1a per §3.3 of the design: fast, no SIMD setup +// cost, deterministic across Go versions and architectures, no key. +// Operators do not need this to be cryptographically strong — +// well-distributed and deterministic is what matters. +func partitionFor(meta *sqsQueueMeta, messageGroupID string) uint32 { + if meta == nil { + return 0 + } + if meta.PartitionCount <= 1 { + return 0 + } + if meta.FifoThroughputLimit == htfifoThroughputPerQueue { + return 0 + } + if messageGroupID == "" { + return 0 + } + // Inlined FNV-1a over the string to avoid the []byte allocation + // hash/fnv.New64a + h.Write would force (Gemini medium on PR + // #681). MessageGroupId is capped at 128 chars by validation, so + // this loop bounds at 128 iterations of integer arithmetic per + // SendMessage — measurably faster than the hash.Hash interface + // path on the routing hot path. + const ( + fnv64Offset uint64 = 14695981039346656037 + fnv64Prime uint64 = 1099511628211 + ) + hash := fnv64Offset + for i := 0; i < len(messageGroupID); i++ { + hash ^= uint64(messageGroupID[i]) + hash *= fnv64Prime + } + // PartitionCount is a power of two (validator-enforced); mod is + // equivalent to mask-AND. The mask is meta.PartitionCount - 1. + // Computing the mask in uint64 first then narrowing to uint32 is + // safe because htfifoMaxPartitions == 32 fits in uint32 trivially. + mask := uint64(meta.PartitionCount - 1) + return uint32(hash & mask) //nolint:gosec // masked by (PartitionCount - 1) ≤ htfifoMaxPartitions − 1, fits in uint32. +} + +// isPowerOfTwo returns true when n is a positive power of two. +// PartitionCount must satisfy this so partitionFor's bitwise mask +// (h & (n-1)) is equivalent to (h % n) — without the constraint the +// distribution would be biased toward the lower indices. +func isPowerOfTwo(n uint32) bool { + return n > 0 && (n&(n-1)) == 0 +} + +// validatePartitionConfig enforces the §3.2 cross-attribute rules on +// the post-applier meta. Per-field constraints (parse, range) live +// inside the per-attribute appliers. Cross-field rules: +// +// - PartitionCount must be a power of two in [1, htfifoMaxPartitions] +// when set. PartitionCount == 0 is canonical "unset" and is +// equivalent to 1 for routing purposes. +// - FifoThroughputLimit / DeduplicationScope are FIFO-only — +// setting either on a Standard queue rejects with +// InvalidAttributeValue. +// - {PartitionCount > 1, DeduplicationScope = "queue"} rejects +// with InvalidParameterValue: queue-scoped dedup is incompatible +// with multi-partition FIFO because the dedup key cannot be +// globally unique across partitions without a cross-partition +// OCC transaction. +// - The §11 PR 2 dormancy gate (PartitionCount > 1 rejected at +// CreateQueue) lives in validatePartitionDormancyGate so the +// dormancy check can be turned off in unit tests that want to +// exercise the full schema path. Production CreateQueue calls +// both validators. +func validatePartitionConfig(meta *sqsQueueMeta) error { + if meta.PartitionCount > 0 { + if !isPowerOfTwo(meta.PartitionCount) { + return newSQSAPIError(http.StatusBadRequest, sqsErrInvalidAttributeValue, + "PartitionCount must be a power of two") + } + if meta.PartitionCount > htfifoMaxPartitions { + return newSQSAPIError(http.StatusBadRequest, sqsErrInvalidAttributeValue, + "PartitionCount exceeds the per-queue cap of "+strconv.FormatUint(uint64(htfifoMaxPartitions), 10)) + } + } + if !meta.IsFIFO { + // PartitionCount > 1 only makes sense on FIFO queues (HT-FIFO + // is by definition a FIFO feature). Without this guard a + // Standard queue with PartitionCount=2 would slip past the + // validator once PR 5 lifts the dormancy gate (Claude review + // on PR #681 round 2 caught this). PartitionCount=0 and 1 + // are accepted because both mean "single-partition layout" + // which is valid on Standard queues. + if meta.PartitionCount > 1 { + return newSQSAPIError(http.StatusBadRequest, sqsErrInvalidAttributeValue, + "PartitionCount > 1 is only valid on FIFO queues") + } + if meta.FifoThroughputLimit != "" { + return newSQSAPIError(http.StatusBadRequest, sqsErrInvalidAttributeValue, + "FifoThroughputLimit is only valid on FIFO queues") + } + if meta.DeduplicationScope != "" { + return newSQSAPIError(http.StatusBadRequest, sqsErrInvalidAttributeValue, + "DeduplicationScope is only valid on FIFO queues") + } + } + if meta.PartitionCount > 1 && meta.DeduplicationScope == htfifoDedupeScopeQueue { + // sqsErrValidation is "InvalidParameterValue" (Gemini medium + // on PR #681 — uses the existing constant rather than a + // duplicate-value alias). + return newSQSAPIError(http.StatusBadRequest, sqsErrValidation, + "queue-scoped deduplication is incompatible with multi-partition FIFO because the dedup key cannot be globally unique across partitions without a cross-partition OCC transaction") + } + return nil +} + +// validatePartitionDormancyGate is the temporary §11 PR 2 gate. As +// long as the data-plane fanout (PR 5) has not landed, accepting a +// partitioned-queue CreateQueue would let SendMessage write under +// the legacy single-partition prefix — the PR 5 reader would never +// find those messages and the reaper would not enumerate them. This +// gate makes the wrong-layout-data class of bug impossible. +// +// Removed in PR 5 in the same commit that wires the data plane so +// the gate-and-lift land atomically. +func validatePartitionDormancyGate(meta *sqsQueueMeta) error { + if meta.PartitionCount > 1 { + return newSQSAPIError(http.StatusBadRequest, sqsErrInvalidAttributeValue, + htfifoTemporaryGateMessage) + } + return nil +} + +// validatePartitionImmutability enforces the §3.2 rule that +// PartitionCount, FifoThroughputLimit, and DeduplicationScope are +// all immutable from CreateQueue onward. Called from +// trySetQueueAttributesOnce after the meta is loaded; rejects the +// whole SetQueueAttributes call (all-or-nothing — even mutable +// attributes in the same request do not commit when an immutable +// one is invalid) per §3.2. +// +// requested is the post-apply meta; current is the on-disk meta. +// If any of the three immutable fields differs, the validator +// returns InvalidAttributeValue naming the attribute so the +// operator sees the cause directly. A same-value "no-op" succeeds. +func validatePartitionImmutability(current, requested *sqsQueueMeta) error { + if current == nil || requested == nil { + return nil + } + if current.PartitionCount != requested.PartitionCount { + return newSQSAPIError(http.StatusBadRequest, sqsErrInvalidAttributeValue, + "PartitionCount is immutable; SetQueueAttributes cannot change it (DeleteQueue + CreateQueue to reconfigure)") + } + if current.FifoThroughputLimit != requested.FifoThroughputLimit { + return newSQSAPIError(http.StatusBadRequest, sqsErrInvalidAttributeValue, + "FifoThroughputLimit is immutable; SetQueueAttributes cannot change it (DeleteQueue + CreateQueue to reconfigure)") + } + if current.DeduplicationScope != requested.DeduplicationScope { + return newSQSAPIError(http.StatusBadRequest, sqsErrInvalidAttributeValue, + "DeduplicationScope is immutable; SetQueueAttributes cannot change it (DeleteQueue + CreateQueue to reconfigure)") + } + return nil +} + +// htfifoAttributeKeys lists the wire-side attribute names that this +// PR introduces. Used by the immutability check (and future +// admin-surface code) to know which keys a SetQueueAttributes +// request might attempt to change. +var htfifoAttributeKeys = []string{ + "PartitionCount", + "FifoThroughputLimit", + "DeduplicationScope", +} + +// htfifoAttributesPresent reports whether any HT-FIFO attribute key +// appears in attrs. Cheap helper used by the validator to short- +// circuit the immutability check for SetQueueAttributes requests +// that touch only mutable attributes. +func htfifoAttributesPresent(attrs map[string]string) bool { + for _, k := range htfifoAttributeKeys { + if _, ok := attrs[k]; ok { + return true + } + } + return false +} + +// addHTFIFOAttributes renders the configured HT-FIFO attributes into +// out. Mirrors the Throttle* renderer in addThrottleAttributes; same +// omission rule (only present when set), same wire-side names. Kept +// in this file so the HT-FIFO surface lives in one place. +func addHTFIFOAttributes(out map[string]string, meta *sqsQueueMeta) { + if meta == nil { + return + } + if meta.PartitionCount > 0 { + out["PartitionCount"] = strconv.FormatUint(uint64(meta.PartitionCount), 10) + } + if meta.FifoThroughputLimit != "" { + out["FifoThroughputLimit"] = meta.FifoThroughputLimit + } + if meta.DeduplicationScope != "" { + out["DeduplicationScope"] = meta.DeduplicationScope + } +} + +// snapshotImmutableHTFIFO captures the three immutable HT-FIFO field +// values from a meta record. Returned struct is shallow-equal-comparable +// — validatePartitionImmutability uses the snapshot to check for any +// differing value after applyAttributes runs. +func snapshotImmutableHTFIFO(meta *sqsQueueMeta) *sqsQueueMeta { + if meta == nil { + return nil + } + return &sqsQueueMeta{ + PartitionCount: meta.PartitionCount, + FifoThroughputLimit: meta.FifoThroughputLimit, + DeduplicationScope: meta.DeduplicationScope, + } +} diff --git a/adapter/sqs_partitioning_integration_test.go b/adapter/sqs_partitioning_integration_test.go new file mode 100644 index 00000000..9bc5f88b --- /dev/null +++ b/adapter/sqs_partitioning_integration_test.go @@ -0,0 +1,269 @@ +package adapter + +import ( + "net/http" + "strings" + "testing" +) + +// TestSQSServer_HTFIFO_DormancyGate_RejectsPartitionedCreate pins +// the §11 PR 2 dormancy gate at the wire layer: CreateQueue with +// PartitionCount > 1 rejects with InvalidAttributeValue and the +// gate's reason ("not yet enabled") makes it into the operator- +// visible message. Removed in PR 5 in the same commit that wires +// the data plane. +func TestSQSServer_HTFIFO_DormancyGate_RejectsPartitionedCreate(t *testing.T) { + t.Parallel() + nodes, _, _ := createNode(t, 1) + defer shutdown(nodes) + node := sqsLeaderNode(t, nodes) + + for _, n := range []string{"2", "4", "8", "32"} { + status, out := callSQS(t, node, sqsCreateQueueTarget, map[string]any{ + "QueueName": "htfifo-gate-" + n + ".fifo", + "Attributes": map[string]string{ + "FifoQueue": "true", + "PartitionCount": n, + }, + }) + if status != http.StatusBadRequest { + t.Fatalf("PartitionCount=%s: status %d (expected 400 from dormancy gate); body=%v", n, status, out) + } + if got, _ := out["__type"].(string); got != sqsErrInvalidAttributeValue { + t.Fatalf("PartitionCount=%s: __type=%q (expected InvalidAttributeValue)", n, got) + } + msg, _ := out["message"].(string) + if msg == "" || !strings.Contains(msg, "not yet enabled") { + t.Fatalf("PartitionCount=%s: message %q must mention the gate reason", n, msg) + } + } +} + +// TestSQSServer_HTFIFO_DormancyGate_AllowsPartitionCountOne pins +// the no-op-partition-count path: PartitionCount=1 is the legacy +// single-partition layout and must pass the dormancy gate even on +// FIFO queues that explicitly set the field. +func TestSQSServer_HTFIFO_DormancyGate_AllowsPartitionCountOne(t *testing.T) { + t.Parallel() + nodes, _, _ := createNode(t, 1) + defer shutdown(nodes) + node := sqsLeaderNode(t, nodes) + + status, out := callSQS(t, node, sqsCreateQueueTarget, map[string]any{ + "QueueName": "htfifo-singlepart.fifo", + "Attributes": map[string]string{ + "FifoQueue": "true", + "PartitionCount": "1", + }, + }) + if status != http.StatusOK { + t.Fatalf("PartitionCount=1 must be accepted: status %d body %v", status, out) + } +} + +// TestSQSServer_HTFIFO_RejectsNonPowerOfTwoPartitionCount pins the +// validator's power-of-two rule. The validator runs before the +// dormancy gate so an invalid count (3) reports the validator's +// reason, not the gate's. +func TestSQSServer_HTFIFO_RejectsNonPowerOfTwoPartitionCount(t *testing.T) { + t.Parallel() + nodes, _, _ := createNode(t, 1) + defer shutdown(nodes) + node := sqsLeaderNode(t, nodes) + + status, out := callSQS(t, node, sqsCreateQueueTarget, map[string]any{ + "QueueName": "htfifo-bad-count.fifo", + "Attributes": map[string]string{ + "FifoQueue": "true", + "PartitionCount": "3", + }, + }) + if status != http.StatusBadRequest { + t.Fatalf("PartitionCount=3 must reject: status %d", status) + } + if msg, _ := out["message"].(string); msg == "" || !strings.Contains(msg, "power of two") { + t.Fatalf("expected 'power of two' in message, got %q", msg) + } +} + +// TestSQSServer_HTFIFO_RejectsHTFIFOAttrsOnStandardQueue pins the +// FIFO-only rule: setting FifoThroughputLimit or DeduplicationScope +// on a Standard queue rejects with InvalidAttributeValue. Without +// this, the queue would silently land with no-op attributes that +// SDK clients might mistake for actually configured. +func TestSQSServer_HTFIFO_RejectsHTFIFOAttrsOnStandardQueue(t *testing.T) { + t.Parallel() + nodes, _, _ := createNode(t, 1) + defer shutdown(nodes) + node := sqsLeaderNode(t, nodes) + + status, _ := callSQS(t, node, sqsCreateQueueTarget, map[string]any{ + "QueueName": "standard-with-htfifo-attr", + "Attributes": map[string]string{ + "FifoThroughputLimit": htfifoThroughputPerMessageGroupID, + }, + }) + if status != http.StatusBadRequest { + t.Fatalf("FifoThroughputLimit on Standard queue: status %d (expected 400)", status) + } +} + +// TestSQSServer_HTFIFO_RejectsQueueScopedDedupOnPartitioned pins +// the §3.2 cross-attribute control-plane gate at the wire layer. +// {PartitionCount > 1, DeduplicationScope = "queue"} is rejected by +// validatePartitionConfig (the schema validator) which runs inside +// parseAttributesIntoMeta — that is, BEFORE validatePartitionDormancyGate +// runs in createQueue. So the cross-attr rejection is what the wire +// layer sees today, even though the dormancy gate would also reject +// the same input on its own. After PR 5 lifts the dormancy gate the +// cross-attr rule remains the sole rejection path. +// +// The test only checks the 400 status to stay agnostic about which +// validator fires first — both are correct behaviour, and a future +// reordering of the createQueue control flow does not need to break +// this test. +func TestSQSServer_HTFIFO_RejectsQueueScopedDedupOnPartitioned(t *testing.T) { + t.Parallel() + nodes, _, _ := createNode(t, 1) + defer shutdown(nodes) + node := sqsLeaderNode(t, nodes) + + status, out := callSQS(t, node, sqsCreateQueueTarget, map[string]any{ + "QueueName": "htfifo-bad-dedup.fifo", + "Attributes": map[string]string{ + "FifoQueue": "true", + "PartitionCount": "2", + "DeduplicationScope": htfifoDedupeScopeQueue, + }, + }) + if status != http.StatusBadRequest { + t.Fatalf("PartitionCount=2 + DeduplicationScope=queue must reject: status %d body %v", status, out) + } +} + +// TestSQSServer_HTFIFO_ImmutabilitySetQueueAttributesRejects pins +// the §3.2 immutability rule at the wire layer: SetQueueAttributes +// attempts to change PartitionCount / FifoThroughputLimit / +// DeduplicationScope reject with InvalidAttributeValue. Test creates +// a single-partition FIFO queue (allowed by dormancy) with +// FifoThroughputLimit set, then tries to change it. +func TestSQSServer_HTFIFO_ImmutabilitySetQueueAttributesRejects(t *testing.T) { + t.Parallel() + nodes, _, _ := createNode(t, 1) + defer shutdown(nodes) + node := sqsLeaderNode(t, nodes) + + url := mustCreateFIFOWithThroughputLimit(t, node, "htfifo-immutable.fifo", htfifoThroughputPerMessageGroupID) + + // Try to flip FifoThroughputLimit. Must reject. + status, out := callSQS(t, node, sqsSetQueueAttributesTarget, map[string]any{ + "QueueUrl": url, + "Attributes": map[string]string{ + "FifoThroughputLimit": htfifoThroughputPerQueue, + }, + }) + if status != http.StatusBadRequest { + t.Fatalf("FifoThroughputLimit change: status %d body %v (expected 400 immutable)", status, out) + } + // Same-value no-op succeeds. + status, _ = callSQS(t, node, sqsSetQueueAttributesTarget, map[string]any{ + "QueueUrl": url, + "Attributes": map[string]string{ + "FifoThroughputLimit": htfifoThroughputPerMessageGroupID, + }, + }) + if status != http.StatusOK { + t.Fatalf("same-value no-op SetQueueAttributes: status %d (expected 200)", status) + } +} + +// TestSQSServer_HTFIFO_ImmutabilityAllOrNothing pins the §3.2 all- +// or-nothing rule: a SetQueueAttributes that touches a *mutable* +// attribute alongside an attempted *immutable* change rejects the +// whole request, leaving the mutable attribute unchanged on the +// meta record. +func TestSQSServer_HTFIFO_ImmutabilityAllOrNothing(t *testing.T) { + t.Parallel() + nodes, _, _ := createNode(t, 1) + defer shutdown(nodes) + node := sqsLeaderNode(t, nodes) + + url := mustCreateFIFOWithThroughputLimit(t, node, "htfifo-allornothing.fifo", htfifoThroughputPerMessageGroupID) + + // Combined: mutable VisibilityTimeout + immutable FifoThroughputLimit + // change. Must reject as a whole, mutable change must not commit. + status, _ := callSQS(t, node, sqsSetQueueAttributesTarget, map[string]any{ + "QueueUrl": url, + "Attributes": map[string]string{ + "VisibilityTimeout": "60", + "FifoThroughputLimit": htfifoThroughputPerQueue, + }, + }) + if status != http.StatusBadRequest { + t.Fatalf("mutable+immutable combined: status %d (expected 400)", status) + } + // Confirm VisibilityTimeout did NOT commit by reading it back. + status, out := callSQS(t, node, sqsGetQueueAttributesTarget, map[string]any{ + "QueueUrl": url, + "AttributeNames": []string{"VisibilityTimeout"}, + }) + if status != http.StatusOK { + t.Fatalf("get attrs: %d", status) + } + attrs, _ := out["Attributes"].(map[string]any) + if got, _ := attrs["VisibilityTimeout"].(string); got == "60" { + t.Fatalf("all-or-nothing violated: VisibilityTimeout committed even though immutable change rejected (got %q)", got) + } +} + +// TestSQSServer_HTFIFO_GetQueueAttributesRoundTrip pins the wire +// surface for the configured HT-FIFO attributes: SetQueueAttributes +// (or CreateQueue with the attribute) followed by GetQueueAttributes +// returns the same value. +func TestSQSServer_HTFIFO_GetQueueAttributesRoundTrip(t *testing.T) { + t.Parallel() + nodes, _, _ := createNode(t, 1) + defer shutdown(nodes) + node := sqsLeaderNode(t, nodes) + + url := mustCreateFIFOWithThroughputLimit(t, node, "htfifo-roundtrip.fifo", htfifoThroughputPerMessageGroupID) + status, out := callSQS(t, node, sqsGetQueueAttributesTarget, map[string]any{ + "QueueUrl": url, + "AttributeNames": []string{"All"}, + }) + if status != http.StatusOK { + t.Fatalf("GetQueueAttributes: status %d", status) + } + attrs, _ := out["Attributes"].(map[string]any) + if got, _ := attrs["FifoThroughputLimit"].(string); got != htfifoThroughputPerMessageGroupID { + t.Fatalf("FifoThroughputLimit round-trip: got %q want %q", got, htfifoThroughputPerMessageGroupID) + } + if _, present := attrs["DeduplicationScope"]; present { + t.Fatalf("DeduplicationScope must be omitted when not set; attrs=%v", attrs) + } + if _, present := attrs["PartitionCount"]; present { + t.Fatalf("PartitionCount must be omitted when not set / left at zero; attrs=%v", attrs) + } +} + +// --- helpers --- + +// mustCreateFIFOWithThroughputLimit creates a single-partition FIFO +// queue (allowed by the §11 PR 2 dormancy gate) with the requested +// FifoThroughputLimit set. Used by the immutability tests so they +// have a non-empty FifoThroughputLimit to attempt to change. +func mustCreateFIFOWithThroughputLimit(t *testing.T, node Node, name, limit string) string { + t.Helper() + status, out := callSQS(t, node, sqsCreateQueueTarget, map[string]any{ + "QueueName": name, + "Attributes": map[string]string{ + "FifoQueue": "true", + "FifoThroughputLimit": limit, + }, + }) + if status != http.StatusOK { + t.Fatalf("createQueue %q: status %d body %v", name, status, out) + } + url, _ := out["QueueUrl"].(string) + return url +} diff --git a/adapter/sqs_partitioning_test.go b/adapter/sqs_partitioning_test.go new file mode 100644 index 00000000..bedbda2d --- /dev/null +++ b/adapter/sqs_partitioning_test.go @@ -0,0 +1,277 @@ +package adapter + +import ( + "errors" + "strconv" + "testing" + + "github.com/stretchr/testify/require" +) + +// --- partitionFor unit tests --- + +// TestPartitionFor_LegacyZeroOrOneAlwaysPartitionZero pins the +// single-partition compatibility contract: a queue with +// PartitionCount == 0 (the unset state) or 1 routes every group ID +// to partition 0. Without this guarantee an existing single- +// partition queue would re-shuffle messages once PR 5 lands the +// data plane. +func TestPartitionFor_LegacyZeroOrOneAlwaysPartitionZero(t *testing.T) { + t.Parallel() + for _, count := range []uint32{0, 1} { + meta := &sqsQueueMeta{PartitionCount: count} + for _, gid := range []string{"a", "b", "user-1", "long-group-id-blah"} { + require.Equal(t, uint32(0), partitionFor(meta, gid), + "PartitionCount=%d, group=%q must route to 0", count, gid) + } + } +} + +// TestPartitionFor_PerQueueShortCircuits pins the §3.3 short-circuit: +// FifoThroughputLimit=perQueue collapses every group ID to +// partition 0 regardless of PartitionCount. Operators who want the +// AWS attribute set without the throughput scaling depend on this. +func TestPartitionFor_PerQueueShortCircuits(t *testing.T) { + t.Parallel() + meta := &sqsQueueMeta{PartitionCount: 8, FifoThroughputLimit: htfifoThroughputPerQueue} + for _, gid := range []string{"a", "b", "user-1", "long-group-id"} { + require.Equal(t, uint32(0), partitionFor(meta, gid)) + } +} + +// TestPartitionFor_EmptyMessageGroupIdRoutesZero pins the defensive +// fallback. FIFO send validation rejects empty MessageGroupId so +// this case should never reach the router; the test ensures the +// router doesn't crash if it does. +func TestPartitionFor_EmptyMessageGroupIdRoutesZero(t *testing.T) { + t.Parallel() + meta := &sqsQueueMeta{PartitionCount: 8} + require.Equal(t, uint32(0), partitionFor(meta, "")) +} + +// TestPartitionFor_DeterministicAcrossRuns pins the §3.3 +// determinism contract: the same group ID always returns the same +// partition. Without it, a consumer that pulls from a partition by +// group ID could see messages re-routed to a different partition on +// a process restart and lose ordering guarantees. +func TestPartitionFor_DeterministicAcrossRuns(t *testing.T) { + t.Parallel() + meta := &sqsQueueMeta{PartitionCount: 8} + gid := "user-1234" + first := partitionFor(meta, gid) + for range 100 { + require.Equal(t, first, partitionFor(meta, gid)) + } +} + +// TestPartitionFor_DistributionApproximatelyUniform pins the §9 unit +// test from the design: 100k random group IDs across 8 partitions +// must land within ±5% of equal share. FNV-1a is not a CSPRNG but +// for non-adversarial input the distribution is well-behaved. +func TestPartitionFor_DistributionApproximatelyUniform(t *testing.T) { + t.Parallel() + const partitions uint32 = 8 + const sample = 100_000 + meta := &sqsQueueMeta{PartitionCount: partitions} + hits := make(map[uint32]int, partitions) + for i := range sample { + hits[partitionFor(meta, "group-"+strconv.Itoa(i))]++ + } + expected := sample / int(partitions) + tolerance := expected / 20 // ±5% + for p := uint32(0); p < partitions; p++ { + count := hits[p] + if count < expected-tolerance || count > expected+tolerance { + t.Fatalf("partition %d: %d hits, expected within ±%d of %d (full distribution: %v)", + p, count, tolerance, expected, hits) + } + } +} + +// TestPartitionFor_PowerOfTwoMaskingMatchesMod is a regression +// guard for the bitwise-mask optimisation in partitionFor. The +// optimisation is equivalent to `% PartitionCount` only when +// PartitionCount is a power of two — the validator enforces this +// at config time, but if a future bug leaks a non-power-of-two +// value through validation, this test will catch the distribution +// bias immediately. +func TestPartitionFor_PowerOfTwoMaskingMatchesMod(t *testing.T) { + t.Parallel() + for _, n := range []uint32{2, 4, 8, 16, 32} { + meta := &sqsQueueMeta{PartitionCount: n} + for i := range 1000 { + gid := "g-" + strconv.Itoa(i) + require.Less(t, partitionFor(meta, gid), n, + "partitionFor must always be < PartitionCount=%d", n) + } + } +} + +// --- isPowerOfTwo unit tests --- + +func TestIsPowerOfTwo(t *testing.T) { + t.Parallel() + cases := []struct { + n uint32 + want bool + }{ + {0, false}, + {1, true}, + {2, true}, + {3, false}, + {4, true}, + {7, false}, + {8, true}, + {16, true}, + {32, true}, + {33, false}, + } + for _, tc := range cases { + require.Equal(t, tc.want, isPowerOfTwo(tc.n), "n=%d", tc.n) + } +} + +// --- validatePartitionConfig unit tests --- + +// TestValidatePartitionConfig_PowerOfTwo pins the §3.2 rule that +// PartitionCount must be a power of two. The bitwise-mask routing +// in partitionFor depends on this; non-powers would distribute +// unevenly. +func TestValidatePartitionConfig_PowerOfTwo(t *testing.T) { + t.Parallel() + bad := []uint32{3, 5, 6, 7, 9, 10, 12, 15} + for _, n := range bad { + err := validatePartitionConfig(&sqsQueueMeta{PartitionCount: n, IsFIFO: true}) + require.Error(t, err, "n=%d must reject", n) + } + good := []uint32{1, 2, 4, 8, 16, 32} + for _, n := range good { + err := validatePartitionConfig(&sqsQueueMeta{PartitionCount: n, IsFIFO: true}) + require.NoError(t, err, "n=%d must be accepted", n) + } +} + +// TestValidatePartitionConfig_RejectsAboveMax pins the §10 +// per-queue cap. 64 must reject; 32 must succeed. +func TestValidatePartitionConfig_RejectsAboveMax(t *testing.T) { + t.Parallel() + require.Error(t, validatePartitionConfig(&sqsQueueMeta{PartitionCount: 64, IsFIFO: true})) + require.NoError(t, validatePartitionConfig(&sqsQueueMeta{PartitionCount: 32, IsFIFO: true})) +} + +// TestValidatePartitionConfig_StandardQueueRejectsHTFIFOAttrs pins +// the §3.2 FIFO-only rule: HT-FIFO attributes on a non-FIFO queue +// reject with InvalidAttributeValue. Setting them silently on a +// Standard queue would advertise unsupported behaviour. +// +// PartitionCount > 1 is also FIFO-only (Claude review on PR #681 +// round 2 caught the gap) — without the guard a Standard queue +// with PartitionCount=2 would slip past the validator after PR 5 +// lifts the dormancy gate. PartitionCount 0/1 are still accepted +// on Standard queues because both mean "single-partition layout". +func TestValidatePartitionConfig_StandardQueueRejectsHTFIFOAttrs(t *testing.T) { + t.Parallel() + require.Error(t, validatePartitionConfig(&sqsQueueMeta{IsFIFO: false, FifoThroughputLimit: htfifoThroughputPerQueue})) + require.Error(t, validatePartitionConfig(&sqsQueueMeta{IsFIFO: false, DeduplicationScope: htfifoDedupeScopeMessageGroup})) + for _, n := range []uint32{2, 4, 8, 16, 32} { + require.Error(t, validatePartitionConfig(&sqsQueueMeta{IsFIFO: false, PartitionCount: n}), + "PartitionCount=%d on Standard queue must reject", n) + } + require.NoError(t, validatePartitionConfig(&sqsQueueMeta{IsFIFO: false, PartitionCount: 0})) + require.NoError(t, validatePartitionConfig(&sqsQueueMeta{IsFIFO: false, PartitionCount: 1})) + require.NoError(t, validatePartitionConfig(&sqsQueueMeta{IsFIFO: true, FifoThroughputLimit: htfifoThroughputPerMessageGroupID, PartitionCount: 8})) +} + +// TestValidatePartitionConfig_RejectsQueueScopedDedupOnPartitioned +// pins the §3.2 cross-attribute control-plane gate: queue-scoped +// dedup is incompatible with multi-partition FIFO because the dedup +// key cannot be globally unique across partitions without a cross- +// partition OCC transaction. Rejected as InvalidParameterValue at +// CreateQueue / SetQueueAttributes time so the operator sees the +// error before a single SendMessage. +func TestValidatePartitionConfig_RejectsQueueScopedDedupOnPartitioned(t *testing.T) { + t.Parallel() + err := validatePartitionConfig(&sqsQueueMeta{ + IsFIFO: true, + PartitionCount: 8, + DeduplicationScope: htfifoDedupeScopeQueue, + }) + require.Error(t, err) + var apiErr *sqsAPIError + require.True(t, errors.As(err, &apiErr), "expected sqsAPIError, got %T", err) + require.Equal(t, sqsErrValidation, apiErr.errorType, + "the cross-attribute rejection must use InvalidParameterValue (incoherent params, sqsErrValidation), not InvalidAttributeValue (malformed individual value)") + // Single-partition + queue-scoped dedup is fine (legacy behaviour). + require.NoError(t, validatePartitionConfig(&sqsQueueMeta{ + IsFIFO: true, + PartitionCount: 1, + DeduplicationScope: htfifoDedupeScopeQueue, + })) +} + +// --- validatePartitionDormancyGate unit tests --- + +// TestValidatePartitionDormancyGate_RejectsAboveOne pins the §11 +// PR 2 dormancy gate: PartitionCount > 1 must reject until PR 5 +// lifts the gate. PartitionCount 0 or 1 must pass (both are the +// legacy single-partition layout). +func TestValidatePartitionDormancyGate_RejectsAboveOne(t *testing.T) { + t.Parallel() + require.NoError(t, validatePartitionDormancyGate(&sqsQueueMeta{PartitionCount: 0})) + require.NoError(t, validatePartitionDormancyGate(&sqsQueueMeta{PartitionCount: 1})) + for _, n := range []uint32{2, 4, 8, 16, 32} { + err := validatePartitionDormancyGate(&sqsQueueMeta{PartitionCount: n}) + require.Error(t, err, "PartitionCount=%d must reject under the dormancy gate", n) + require.Contains(t, err.Error(), "not yet enabled", + "the gate's reason must surface to the operator") + } +} + +// --- validatePartitionImmutability unit tests --- + +// TestValidatePartitionImmutability_RejectsAnyChange pins the §3.2 +// immutability rule: SetQueueAttributes attempts to change any of +// the three immutable HT-FIFO fields reject with +// InvalidAttributeValue. +func TestValidatePartitionImmutability_RejectsAnyChange(t *testing.T) { + t.Parallel() + current := &sqsQueueMeta{ + PartitionCount: 8, + FifoThroughputLimit: htfifoThroughputPerMessageGroupID, + DeduplicationScope: htfifoDedupeScopeMessageGroup, + } + cases := []struct { + name string + mutate func(*sqsQueueMeta) + mustError bool + }{ + {"PartitionCount changed", func(m *sqsQueueMeta) { m.PartitionCount = 4 }, true}, + {"FifoThroughputLimit changed", func(m *sqsQueueMeta) { m.FifoThroughputLimit = htfifoThroughputPerQueue }, true}, + {"DeduplicationScope changed", func(m *sqsQueueMeta) { m.DeduplicationScope = htfifoDedupeScopeQueue }, true}, + {"no immutable change (same-value no-op)", func(m *sqsQueueMeta) {}, false}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + req := *current + tc.mutate(&req) + err := validatePartitionImmutability(current, &req) + if tc.mustError { + require.Error(t, err) + } else { + require.NoError(t, err) + } + }) + } +} + +// --- htfifoAttributesPresent --- + +func TestHTFIFOAttributesPresent(t *testing.T) { + t.Parallel() + require.False(t, htfifoAttributesPresent(map[string]string{})) + require.False(t, htfifoAttributesPresent(map[string]string{"VisibilityTimeout": "30"})) + require.True(t, htfifoAttributesPresent(map[string]string{"PartitionCount": "8"})) + require.True(t, htfifoAttributesPresent(map[string]string{"FifoThroughputLimit": htfifoThroughputPerMessageGroupID})) + require.True(t, htfifoAttributesPresent(map[string]string{"DeduplicationScope": htfifoDedupeScopeMessageGroup})) +}