Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
149 changes: 141 additions & 8 deletions adapter/sqs_catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,36 @@ type sqsQueueMeta struct {
// Persisted on the meta so a leader failover loads the configuration
// along with the rest of the queue.
Throttle *sqsQueueThrottle `json:"throttle,omitempty"`
// PartitionCount is the number of FIFO partitions for this queue
// (Phase 3.D HT-FIFO, see docs/design/2026_04_26_proposed_sqs_split_queue_fifo.md).
// Zero or 1 means the legacy single-partition layout — no schema
// change. Greater than 1 enables HT-FIFO. Set at CreateQueue time
// and immutable thereafter (SetQueueAttributes rejects any change).
// Power-of-two values only (validator rejects others). PR 2 of the
// rollout introduces this field but a temporary CreateQueue gate
// rejects PartitionCount > 1 until PR 5 lifts the gate atomically
// with the data-plane fanout — so the schema exists but no
// partitioned data can land before the data plane is wired.
PartitionCount uint32 `json:"partition_count,omitempty"`
// FifoThroughputLimit mirrors the AWS attribute. "perMessageGroupId"
// (default for HT-FIFO) keeps the §3.3 hash-by-MessageGroupId
// routing; "perQueue" activates the partition-0 short-circuit so
// every group ID routes to one partition (effectively N=1).
// Set at CreateQueue time and immutable thereafter — flipping it
// live would re-route in-flight messages and silently violate
// within-group FIFO ordering (see §3.2 of the design).
FifoThroughputLimit string `json:"fifo_throughput_limit,omitempty"`
// DeduplicationScope mirrors the AWS attribute. "messageGroup"
// (default for HT-FIFO) means the dedup window is per
// (queue, partition, MessageGroupId, dedupId); "queue" is the
// legacy single-window behaviour. Set at CreateQueue time and
// immutable thereafter — changing live can resurrect or suppress
// messages depending on the direction of the change. The
// validator additionally rejects {PartitionCount > 1,
// DeduplicationScope = "queue"} at CreateQueue time because the
// dedup key cannot be globally unique across partitions without
// a cross-partition OCC transaction.
DeduplicationScope string `json:"deduplication_scope,omitempty"`
}

// sqsQueueThrottle is the per-queue token-bucket configuration. Three
Expand Down Expand Up @@ -334,6 +364,14 @@ func parseAttributesIntoMeta(name string, attrs map[string]string) (*sqsQueueMet
if meta.ContentBasedDedup && !meta.IsFIFO {
return nil, newSQSAPIError(http.StatusBadRequest, sqsErrInvalidAttributeValue, "ContentBasedDeduplication is only valid on FIFO queues")
}
// HT-FIFO validation runs after resolveFifoQueueFlag so the
// IsFIFO-only checks see the post-resolution flag. The temporary
// dormancy gate (§11 PR 2) runs separately in createQueue so
// SetQueueAttributes paths share the schema validator without
// re-rejecting on the gate.
if err := validatePartitionConfig(meta); err != nil {
return nil, err
}
return meta, nil
}

Expand Down Expand Up @@ -421,6 +459,42 @@ var sqsAttributeAppliers = map[string]attributeApplier{
m.ContentBasedDedup = b
return nil
},
// PartitionCount enables HT-FIFO when > 1 (Phase 3.D, see
// docs/design/2026_04_26_proposed_sqs_split_queue_fifo.md). Set
// at CreateQueue time; SetQueueAttributes attempts to change it
// reject via the immutability check in trySetQueueAttributesOnce.
// PR 2 of the rollout introduces the field but the temporary
// dormancy gate in tryCreateQueueOnce rejects PartitionCount > 1
// until PR 5 lifts the gate atomically with the data plane.
"PartitionCount": func(m *sqsQueueMeta, v string) error {
n, err := strconv.ParseUint(strings.TrimSpace(v), 10, 32)
if err != nil {
return newSQSAPIError(http.StatusBadRequest, sqsErrInvalidAttributeValue,
"PartitionCount must be a non-negative integer")
}
m.PartitionCount = uint32(n) //nolint:gosec // bounded by ParseUint(_, _, 32) above.
return nil
},
"FifoThroughputLimit": func(m *sqsQueueMeta, v string) error {
v = strings.TrimSpace(v)
switch v {
case "", htfifoThroughputPerMessageGroupID, htfifoThroughputPerQueue:
m.FifoThroughputLimit = v
Comment on lines +481 to +482
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Disallow empty HT-FIFO attribute values

The applier currently treats an explicit empty string as valid for FifoThroughputLimit (and similarly for DeduplicationScope), so a request that sends the key with "" succeeds and silently acts like unset rather than failing fast. This can hide client/configuration mistakes and produces surprising control-plane outcomes (especially when users think they set a concrete throughput/dedup mode).

Useful? React with 👍 / 👎.

return nil
}
return newSQSAPIError(http.StatusBadRequest, sqsErrInvalidAttributeValue,
"FifoThroughputLimit must be 'perMessageGroupId' or 'perQueue'")
},
"DeduplicationScope": func(m *sqsQueueMeta, v string) error {
v = strings.TrimSpace(v)
switch v {
case "", htfifoDedupeScopeMessageGroup, htfifoDedupeScopeQueue:
m.DeduplicationScope = v
return nil
}
return newSQSAPIError(http.StatusBadRequest, sqsErrInvalidAttributeValue,
"DeduplicationScope must be 'messageGroup' or 'queue'")
},
// Throttle* are non-AWS extensions for per-queue rate limiting,
// see docs/design/2026_04_26_proposed_sqs_per_queue_throttling.md.
// Each accepts a non-negative float64; the cross-attribute
Expand Down Expand Up @@ -478,6 +552,12 @@ func applyAttributes(meta *sqsQueueMeta, attrs map[string]string) error {
if err := validateThrottleConfig(meta); err != nil {
return err
}
// HT-FIFO partition validation runs in parseAttributesIntoMeta /
// trySetQueueAttributesOnce, AFTER resolveFifoQueueFlag, so the
// IsFIFO-only checks see the post-resolution flag. Running here
// would reject a valid CreateQueue with FifoQueue=true +
// FifoThroughputLimit=perMessageGroupId because IsFIFO is still
// false at this point in the flow.
return nil
}

Expand Down Expand Up @@ -640,7 +720,9 @@ func attributesEqual(a, b *sqsQueueMeta) bool {
if a == nil || b == nil {
return false
}
return baseAttributesEqual(a, b) && throttleConfigEqual(a.Throttle, b.Throttle)
return baseAttributesEqual(a, b) &&
throttleConfigEqual(a.Throttle, b.Throttle) &&
htfifoAttributesEqual(a, b)
}

// baseAttributesEqual compares the pre-Phase-3.C/3.D attribute set.
Expand Down Expand Up @@ -678,6 +760,13 @@ func throttleConfigEqual(a, b *sqsQueueThrottle) bool {
a.DefaultRefillPerSecond == b.DefaultRefillPerSecond
}

// htfifoAttributesEqual compares the Phase 3.D HT-FIFO fields.
func htfifoAttributesEqual(a, b *sqsQueueMeta) bool {
return a.PartitionCount == b.PartitionCount &&
a.FifoThroughputLimit == b.FifoThroughputLimit &&
a.DeduplicationScope == b.DeduplicationScope
}

// ------------------------ storage primitives ------------------------

func (s *SQSServer) nextTxnReadTS(ctx context.Context) uint64 {
Expand Down Expand Up @@ -745,6 +834,16 @@ func (s *SQSServer) createQueue(w http.ResponseWriter, r *http.Request) {
writeSQSErrorFromErr(w, err)
return
}
// Temporary dormancy gate (Phase 3.D §11 PR 2). PartitionCount > 1
// must reject until PR 5 wires the data plane atomically with the
// gate-lift. Without this, accepting a partitioned-queue create
// would let SendMessage write under the legacy single-partition
// prefix; the PR 5 reader would never find those messages and the
// reaper would not enumerate them — silent message loss.
if err := validatePartitionDormancyGate(requested); err != nil {
writeSQSErrorFromErr(w, err)
return
}
if len(in.Tags) > sqsMaxTagsPerQueue {
// AWS caps tags per queue at 50. CreateQueue must reject
// over-cap tag bundles up front; a silent slice-and-store
Expand Down Expand Up @@ -1196,6 +1295,10 @@ func queueMetaToAttributes(meta *sqsQueueMeta, selection sqsAttributeSelection,
// keys. Extracted into a helper so queueMetaToAttributes stays
// under the cyclop ceiling.
addThrottleAttributes(all, meta.Throttle)
// HT-FIFO attributes (Phase 3.D). Same omission rule as Throttle*:
// only present when configured. Extracted into a helper so this
// function stays under the cyclop ceiling.
addHTFIFOAttributes(all, meta)
if selection.expandAll {
return all
}
Expand Down Expand Up @@ -1267,6 +1370,42 @@ func (s *SQSServer) setQueueAttributesWithRetry(ctx context.Context, queueName s
return newSQSAPIError(http.StatusInternalServerError, sqsErrInternalFailure, "set queue attributes retry attempts exhausted")
}

// applyAndValidateSetAttributes runs the apply + cross-validator
// chain for a SetQueueAttributes request. Extracted from
// trySetQueueAttributesOnce so that function stays under the cyclop
// ceiling once HT-FIFO immutability + Throttle validators were
// added. Returns nil on success; on rejection returns the typed
// sqsAPIError the caller forwards to writeSQSErrorFromErr.
//
// preApply snapshot allocation is gated on htfifoAttributesPresent
// so the common "mutable-only update" path stays alloc-free per the
// Gemini medium feedback on PR #681.
func applyAndValidateSetAttributes(meta *sqsQueueMeta, attrs map[string]string) error {
var preApply *sqsQueueMeta
if htfifoAttributesPresent(attrs) {
preApply = snapshotImmutableHTFIFO(meta)
}
if err := applyAttributes(meta, attrs); err != nil {
return err
}
if preApply != nil {
if err := validatePartitionImmutability(preApply, meta); err != nil {
return err
}
}
// ContentBasedDeduplication is FIFO-only; a Standard queue
// silently accepting it would advertise unsupported behavior to
// clients. Same rule enforced on CreateQueue.
if meta.ContentBasedDedup && !meta.IsFIFO {
return newSQSAPIError(http.StatusBadRequest, sqsErrInvalidAttributeValue, "ContentBasedDeduplication is only valid on FIFO queues")
}
// HT-FIFO schema validator runs after applyAttributes so the
// FIFO-only checks see the post-apply state. IsFIFO comes from
// the loaded meta record (immutable from CreateQueue) so the
// validator sees the same flag CreateQueue set.
return validatePartitionConfig(meta)
}

// trySetQueueAttributesOnce is one read-validate-commit pass. The first
// return reports whether the caller should stop retrying (the attrs
// are now committed); an error means either a non-retryable failure
Expand All @@ -1280,15 +1419,9 @@ func (s *SQSServer) trySetQueueAttributesOnce(ctx context.Context, queueName str
if !exists {
return false, newSQSAPIError(http.StatusBadRequest, sqsErrQueueDoesNotExist, "queue does not exist")
}
if err := applyAttributes(meta, attrs); err != nil {
if err := applyAndValidateSetAttributes(meta, attrs); err != nil {
return false, err
}
// ContentBasedDeduplication is FIFO-only; a Standard queue
// silently accepting it would advertise unsupported behavior to
// clients. Same rule enforced on CreateQueue.
if meta.ContentBasedDedup && !meta.IsFIFO {
return false, newSQSAPIError(http.StatusBadRequest, sqsErrInvalidAttributeValue, "ContentBasedDeduplication is only valid on FIFO queues")
}
meta.LastModifiedAtMillis = time.Now().UnixMilli()
metaBytes, err := encodeSQSQueueMeta(meta)
if err != nil {
Expand Down
Loading
Loading