Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions adapter/sqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,20 @@ func (s *SQSServer) handle(w http.ResponseWriter, r *http.Request) {
return
}

// pickSqsProtocol decides between the JSON path (X-Amz-Target +
// JSON body, the existing default) and the query path (form-
// encoded body, XML response) on a per-request basis. See
// docs/design/2026_04_26_proposed_sqs_query_protocol.md for the
// detection rules.
if pickSqsProtocol(r) == sqsProtocolQuery {
s.handleQueryProtocol(w, r)
return
}
// JSON / Unknown both fall through to the JSON path: the JSON-
// style 400 is the most informative error for a client that
// has not picked a codec yet (§3 of the design doc). The
// dispatch table below stays the single decision point.

if r.Method != http.MethodPost {
w.Header().Set("Allow", http.MethodPost)
writeSQSError(w, http.StatusMethodNotAllowed, sqsErrMalformedRequest, "SQS JSON protocol requires POST")
Expand All @@ -174,6 +188,27 @@ func (s *SQSServer) handle(w http.ResponseWriter, r *http.Request) {
handler(w, r)
}

// handleQueryProtocol owns the query-protocol leg of handle(): method
// gating, SigV4 authorisation against the form body, and dispatch
// into per-verb handlers. Pulled out of handle() so the dispatcher
// stays under cyclop=10 even as more wire formats are added.
func (s *SQSServer) handleQueryProtocol(w http.ResponseWriter, r *http.Request) {
// GET is legal for query (some legacy ListQueues callers).
// POST is the common case. Anything else (PUT/DELETE) is
// outside the SQS surface entirely.
if r.Method != http.MethodGet && r.Method != http.MethodPost {
w.Header().Set("Allow", "GET, POST")
writeSQSQueryError(w, newSQSAPIError(http.StatusMethodNotAllowed, sqsErrMalformedRequest,
"SQS query protocol requires GET or POST"))
return
}
if authErr := s.authorizeSQSRequest(r); authErr != nil {
writeSQSQueryError(w, newSQSAPIError(authErr.Status, authErr.Code, authErr.Message))
return
}
s.handleQuery(w, r)
}

func (s *SQSServer) serveHealthz(w http.ResponseWriter, r *http.Request) bool {
if r == nil || r.URL == nil {
return false
Expand Down
91 changes: 62 additions & 29 deletions adapter/sqs_catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -508,30 +508,41 @@ func (s *SQSServer) createQueue(w http.ResponseWriter, r *http.Request) {
writeSQSErrorFromErr(w, err)
return
}
if err := validateQueueName(in.QueueName); err != nil {
queueName, err := s.createQueueCore(r.Context(), &in)
if err != nil {
writeSQSErrorFromErr(w, err)
return
}
writeSQSJSON(w, map[string]string{"QueueUrl": s.queueURL(r, queueName)})
}

// createQueueCore is the wire-format-free worker shared by the JSON
// handler above and the query-protocol handler in
// sqs_query_protocol.go (Phase 3.B). Returns the canonical queue
// name on success so each wire wrapper can build its own QueueUrl
// shape (the URL host comes from the request, which is a wire-layer
// concern). Errors keep their typed sqsAPIError so both the JSON and
// XML error envelopes reuse the existing classification path.
func (s *SQSServer) createQueueCore(ctx context.Context, in *sqsCreateQueueInput) (string, error) {
if err := validateQueueName(in.QueueName); err != nil {
return "", err
}
requested, err := parseAttributesIntoMeta(in.QueueName, in.Attributes)
if err != nil {
writeSQSErrorFromErr(w, err)
return
return "", err
}
if len(in.Tags) > sqsMaxTagsPerQueue {
// AWS caps tags per queue at 50. CreateQueue must reject
// over-cap tag bundles up front; a silent slice-and-store
// would let queues land with more tags than TagQueue would
// ever accept on the same queue.
writeSQSError(w, http.StatusBadRequest, sqsErrInvalidAttributeValue, "queue tag count exceeds 50")
return
return "", newSQSAPIError(http.StatusBadRequest, sqsErrInvalidAttributeValue, "queue tag count exceeds 50")
}
requested.Tags = in.Tags

if err := s.createQueueWithRetry(r.Context(), requested); err != nil {
writeSQSErrorFromErr(w, err)
return
if err := s.createQueueWithRetry(ctx, requested); err != nil {
return "", err
}
writeSQSJSON(w, map[string]string{"QueueUrl": s.queueURL(r, in.QueueName)})
return in.QueueName, nil
}

func (s *SQSServer) createQueueWithRetry(ctx context.Context, requested *sqsQueueMeta) error {
Expand Down Expand Up @@ -684,33 +695,46 @@ func (s *SQSServer) listQueues(w http.ResponseWriter, r *http.Request) {
writeSQSErrorFromErr(w, err)
return
}
maxResults := clampListQueuesMaxResults(in.MaxResults)

names, err := s.scanQueueNames(r.Context())
page, nextToken, err := s.listQueuesCore(r.Context(), &in)
if err != nil {
writeSQSErrorFromErr(w, err)
return
}
urls := make([]string, 0, len(page))
for _, n := range page {
urls = append(urls, s.queueURL(r, n))
}
resp := map[string]any{"QueueUrls": urls}
if nextToken != "" {
resp["NextToken"] = nextToken
}
writeSQSJSON(w, resp)
}

// listQueuesCore is the wire-format-free worker shared by the JSON
// handler and the query-protocol handler. Returns the page of queue
// *names* plus the next-page token (empty when not truncated); URL
// construction is a wire-layer concern handled by each wrapper.
func (s *SQSServer) listQueuesCore(ctx context.Context, in *sqsListQueuesInput) ([]string, string, error) {
maxResults := clampListQueuesMaxResults(in.MaxResults)
names, err := s.scanQueueNames(ctx)
if err != nil {
return nil, "", err
}
sort.Strings(names)
filtered := filterByPrefix(names, in.QueueNamePrefix)
start := resolveListQueuesStart(filtered, in.NextToken)

end := start + maxResults
truncated := end < len(filtered)
if !truncated {
end = len(filtered)
}
page := filtered[start:end]

urls := make([]string, 0, len(page))
for _, n := range page {
urls = append(urls, s.queueURL(r, n))
}
resp := map[string]any{"QueueUrls": urls}
var nextToken string
if truncated && len(page) > 0 {
resp["NextToken"] = encodeSQSSegment(page[len(page)-1])
nextToken = encodeSQSSegment(page[len(page)-1])
}
writeSQSJSON(w, resp)
return page, nextToken, nil
}

func clampListQueuesMaxResults(requested int) int {
Expand Down Expand Up @@ -796,20 +820,29 @@ func (s *SQSServer) getQueueUrl(w http.ResponseWriter, r *http.Request) {
writeSQSErrorFromErr(w, err)
return
}
if err := validateQueueName(in.QueueName); err != nil {
queueName, err := s.getQueueUrlCore(r.Context(), &in)
if err != nil {
writeSQSErrorFromErr(w, err)
return
}
_, exists, err := s.loadQueueMetaAt(r.Context(), in.QueueName, s.nextTxnReadTS(r.Context()))
writeSQSJSON(w, map[string]string{"QueueUrl": s.queueURL(r, queueName)})
}

// getQueueUrlCore is the wire-format-free worker shared by the JSON
// handler and the query-protocol handler. Returns the validated
// queue name on success; URL construction is a wire-layer concern.
func (s *SQSServer) getQueueUrlCore(ctx context.Context, in *sqsGetQueueUrlInput) (string, error) {
if err := validateQueueName(in.QueueName); err != nil {
return "", err
}
_, exists, err := s.loadQueueMetaAt(ctx, in.QueueName, s.nextTxnReadTS(ctx))
if err != nil {
writeSQSErrorFromErr(w, err)
return
return "", err
}
if !exists {
writeSQSError(w, http.StatusBadRequest, sqsErrQueueDoesNotExist, "queue does not exist")
return
return "", newSQSAPIError(http.StatusBadRequest, sqsErrQueueDoesNotExist, "queue does not exist")
}
writeSQSJSON(w, map[string]string{"QueueUrl": s.queueURL(r, in.QueueName)})
return in.QueueName, nil
}

func (s *SQSServer) getQueueAttributes(w http.ResponseWriter, r *http.Request) {
Expand Down
Loading
Loading