diff --git a/adapter/sqs.go b/adapter/sqs.go index dfd1916cc..9e775a725 100644 --- a/adapter/sqs.go +++ b/adapter/sqs.go @@ -154,6 +154,20 @@ func (s *SQSServer) handle(w http.ResponseWriter, r *http.Request) { return } + // pickSqsProtocol decides between the JSON path (X-Amz-Target + + // JSON body, the existing default) and the query path (form- + // encoded body, XML response) on a per-request basis. See + // docs/design/2026_04_26_proposed_sqs_query_protocol.md for the + // detection rules. + if pickSqsProtocol(r) == sqsProtocolQuery { + s.handleQueryProtocol(w, r) + return + } + // JSON / Unknown both fall through to the JSON path: the JSON- + // style 400 is the most informative error for a client that + // has not picked a codec yet (§3 of the design doc). The + // dispatch table below stays the single decision point. + if r.Method != http.MethodPost { w.Header().Set("Allow", http.MethodPost) writeSQSError(w, http.StatusMethodNotAllowed, sqsErrMalformedRequest, "SQS JSON protocol requires POST") @@ -174,6 +188,27 @@ func (s *SQSServer) handle(w http.ResponseWriter, r *http.Request) { handler(w, r) } +// handleQueryProtocol owns the query-protocol leg of handle(): method +// gating, SigV4 authorisation against the form body, and dispatch +// into per-verb handlers. Pulled out of handle() so the dispatcher +// stays under cyclop=10 even as more wire formats are added. +func (s *SQSServer) handleQueryProtocol(w http.ResponseWriter, r *http.Request) { + // GET is legal for query (some legacy ListQueues callers). + // POST is the common case. Anything else (PUT/DELETE) is + // outside the SQS surface entirely. + if r.Method != http.MethodGet && r.Method != http.MethodPost { + w.Header().Set("Allow", "GET, POST") + writeSQSQueryError(w, newSQSAPIError(http.StatusMethodNotAllowed, sqsErrMalformedRequest, + "SQS query protocol requires GET or POST")) + return + } + if authErr := s.authorizeSQSRequest(r); authErr != nil { + writeSQSQueryError(w, newSQSAPIError(authErr.Status, authErr.Code, authErr.Message)) + return + } + s.handleQuery(w, r) +} + func (s *SQSServer) serveHealthz(w http.ResponseWriter, r *http.Request) bool { if r == nil || r.URL == nil { return false diff --git a/adapter/sqs_catalog.go b/adapter/sqs_catalog.go index 4dc801e68..f73d8dc3b 100644 --- a/adapter/sqs_catalog.go +++ b/adapter/sqs_catalog.go @@ -508,30 +508,41 @@ func (s *SQSServer) createQueue(w http.ResponseWriter, r *http.Request) { writeSQSErrorFromErr(w, err) return } - if err := validateQueueName(in.QueueName); err != nil { + queueName, err := s.createQueueCore(r.Context(), &in) + if err != nil { writeSQSErrorFromErr(w, err) return } + writeSQSJSON(w, map[string]string{"QueueUrl": s.queueURL(r, queueName)}) +} + +// createQueueCore is the wire-format-free worker shared by the JSON +// handler above and the query-protocol handler in +// sqs_query_protocol.go (Phase 3.B). Returns the canonical queue +// name on success so each wire wrapper can build its own QueueUrl +// shape (the URL host comes from the request, which is a wire-layer +// concern). Errors keep their typed sqsAPIError so both the JSON and +// XML error envelopes reuse the existing classification path. +func (s *SQSServer) createQueueCore(ctx context.Context, in *sqsCreateQueueInput) (string, error) { + if err := validateQueueName(in.QueueName); err != nil { + return "", err + } requested, err := parseAttributesIntoMeta(in.QueueName, in.Attributes) if err != nil { - writeSQSErrorFromErr(w, err) - return + return "", err } if len(in.Tags) > sqsMaxTagsPerQueue { // AWS caps tags per queue at 50. CreateQueue must reject // over-cap tag bundles up front; a silent slice-and-store // would let queues land with more tags than TagQueue would // ever accept on the same queue. - writeSQSError(w, http.StatusBadRequest, sqsErrInvalidAttributeValue, "queue tag count exceeds 50") - return + return "", newSQSAPIError(http.StatusBadRequest, sqsErrInvalidAttributeValue, "queue tag count exceeds 50") } requested.Tags = in.Tags - - if err := s.createQueueWithRetry(r.Context(), requested); err != nil { - writeSQSErrorFromErr(w, err) - return + if err := s.createQueueWithRetry(ctx, requested); err != nil { + return "", err } - writeSQSJSON(w, map[string]string{"QueueUrl": s.queueURL(r, in.QueueName)}) + return in.QueueName, nil } func (s *SQSServer) createQueueWithRetry(ctx context.Context, requested *sqsQueueMeta) error { @@ -684,33 +695,46 @@ func (s *SQSServer) listQueues(w http.ResponseWriter, r *http.Request) { writeSQSErrorFromErr(w, err) return } - maxResults := clampListQueuesMaxResults(in.MaxResults) - - names, err := s.scanQueueNames(r.Context()) + page, nextToken, err := s.listQueuesCore(r.Context(), &in) if err != nil { writeSQSErrorFromErr(w, err) return } + urls := make([]string, 0, len(page)) + for _, n := range page { + urls = append(urls, s.queueURL(r, n)) + } + resp := map[string]any{"QueueUrls": urls} + if nextToken != "" { + resp["NextToken"] = nextToken + } + writeSQSJSON(w, resp) +} + +// listQueuesCore is the wire-format-free worker shared by the JSON +// handler and the query-protocol handler. Returns the page of queue +// *names* plus the next-page token (empty when not truncated); URL +// construction is a wire-layer concern handled by each wrapper. +func (s *SQSServer) listQueuesCore(ctx context.Context, in *sqsListQueuesInput) ([]string, string, error) { + maxResults := clampListQueuesMaxResults(in.MaxResults) + names, err := s.scanQueueNames(ctx) + if err != nil { + return nil, "", err + } sort.Strings(names) filtered := filterByPrefix(names, in.QueueNamePrefix) start := resolveListQueuesStart(filtered, in.NextToken) - end := start + maxResults truncated := end < len(filtered) if !truncated { end = len(filtered) } page := filtered[start:end] - - urls := make([]string, 0, len(page)) - for _, n := range page { - urls = append(urls, s.queueURL(r, n)) - } - resp := map[string]any{"QueueUrls": urls} + var nextToken string if truncated && len(page) > 0 { - resp["NextToken"] = encodeSQSSegment(page[len(page)-1]) + nextToken = encodeSQSSegment(page[len(page)-1]) } - writeSQSJSON(w, resp) + return page, nextToken, nil } func clampListQueuesMaxResults(requested int) int { @@ -796,20 +820,29 @@ func (s *SQSServer) getQueueUrl(w http.ResponseWriter, r *http.Request) { writeSQSErrorFromErr(w, err) return } - if err := validateQueueName(in.QueueName); err != nil { + queueName, err := s.getQueueUrlCore(r.Context(), &in) + if err != nil { writeSQSErrorFromErr(w, err) return } - _, exists, err := s.loadQueueMetaAt(r.Context(), in.QueueName, s.nextTxnReadTS(r.Context())) + writeSQSJSON(w, map[string]string{"QueueUrl": s.queueURL(r, queueName)}) +} + +// getQueueUrlCore is the wire-format-free worker shared by the JSON +// handler and the query-protocol handler. Returns the validated +// queue name on success; URL construction is a wire-layer concern. +func (s *SQSServer) getQueueUrlCore(ctx context.Context, in *sqsGetQueueUrlInput) (string, error) { + if err := validateQueueName(in.QueueName); err != nil { + return "", err + } + _, exists, err := s.loadQueueMetaAt(ctx, in.QueueName, s.nextTxnReadTS(ctx)) if err != nil { - writeSQSErrorFromErr(w, err) - return + return "", err } if !exists { - writeSQSError(w, http.StatusBadRequest, sqsErrQueueDoesNotExist, "queue does not exist") - return + return "", newSQSAPIError(http.StatusBadRequest, sqsErrQueueDoesNotExist, "queue does not exist") } - writeSQSJSON(w, map[string]string{"QueueUrl": s.queueURL(r, in.QueueName)}) + return in.QueueName, nil } func (s *SQSServer) getQueueAttributes(w http.ResponseWriter, r *http.Request) { diff --git a/adapter/sqs_query_protocol.go b/adapter/sqs_query_protocol.go new file mode 100644 index 000000000..adfadf7be --- /dev/null +++ b/adapter/sqs_query_protocol.go @@ -0,0 +1,530 @@ +package adapter + +import ( + "crypto/rand" + "encoding/base32" + "encoding/xml" + "io" + "net/http" + "net/url" + "sort" + "strconv" + "strings" + + "github.com/cockroachdb/errors" +) + +// SQS query-protocol (form-encoded request, XML response) support per +// docs/design/2026_04_26_proposed_sqs_query_protocol.md. Detection +// runs on every inbound SQS request alongside the existing JSON path +// — no separate listener, no flag. The implementation deliberately +// touches as little of the JSON path as possible: the SQS handlers +// have been split into createQueueCore / listQueuesCore / +// getQueueUrlCore (in sqs_catalog.go) so both wrappers reuse the +// exact same business logic. + +const ( + // sqsContentTypeQueryURLEncoded is the request Content-Type the + // older AWS SDKs (aws-sdk-java v1, boto < 1.34, the AWS CLI in + // query mode) send. Detected case-insensitively against the + // prefix so charsets like ";charset=UTF-8" do not break the + // dispatch. + sqsContentTypeQueryURLEncoded = "application/x-www-form-urlencoded" + + // sqsContentTypeQueryXML is what we emit on every successful + // query-protocol response. AWS itself sends "text/xml" with a + // UTF-8 charset; matching that shape keeps SDK XML parsers + // happy. + sqsContentTypeQueryXML = "text/xml; charset=utf-8" + + // sqsQueryNamespace pins the XML namespace on every response + // envelope. Older XML parsers DO validate the namespace; emitting + // the wrong one for "compat" reasons silently breaks unmarshalling + // in aws-sdk-java v1. + sqsQueryNamespace = "http://queue.amazonaws.com/doc/2012-11-05/" + + // sqsQueryRequestIDLen is the AWS-shape RequestId length: 22 + // base32 chars. Base32 emits 1 char per 5 input bits, so 22 chars + // require ceil(22*5/8)=14 random bytes (110 bits — comfortably + // more entropy than UUID-v4). The constant is used both to size + // the random source and to trim the encoded output. + sqsQueryRequestIDLen = 22 +) + +// sqsProtocol enumerates the two wire formats the SQS listener now +// accepts. Returned by pickSqsProtocol so the dispatcher can branch +// without re-inspecting the request. +type sqsProtocol int + +const ( + sqsProtocolJSON sqsProtocol = iota + sqsProtocolQuery + // sqsProtocolUnknown signals a request whose Content-Type does + // not match either codec. The dispatcher answers with the + // existing JSON-style 400 envelope (best generic shape — query + // clients that are mid-misconfiguration usually want to see + // *some* error, and they understand HTTP status codes). + sqsProtocolUnknown +) + +// pickSqsProtocol decides which wire format a request is using. The +// rules are documented in §3 of the design doc: +// +// - X-Amz-Target header set + JSON Content-Type (or no body) → JSON. +// - form-urlencoded body → query. +// - GET with Action in the query string (legacy ListQueues callers) +// → query. +// - everything else → unknown (handled as a JSON-style 400 by the +// dispatcher so even broken probes get a useful error). +func pickSqsProtocol(r *http.Request) sqsProtocol { + if r == nil { + return sqsProtocolUnknown + } + if r.Header.Get("X-Amz-Target") != "" { + return sqsProtocolJSON + } + contentType := strings.ToLower(strings.TrimSpace(r.Header.Get("Content-Type"))) + if strings.HasPrefix(contentType, sqsContentTypeQueryURLEncoded) { + return sqsProtocolQuery + } + if r.Method == http.MethodGet && r.URL != nil && r.URL.Query().Get("Action") != "" { + return sqsProtocolQuery + } + if strings.HasPrefix(contentType, sqsContentTypeJSON) { + return sqsProtocolJSON + } + return sqsProtocolUnknown +} + +// handleQuery is the query-protocol entry point invoked by SQSServer.handle +// once pickSqsProtocol returns sqsProtocolQuery. The dispatcher reads +// the form, looks up the Action, and routes to the per-verb handler. +// Errors at any layer are written through writeSQSQueryError so the +// envelope shape stays consistent. +func (s *SQSServer) handleQuery(w http.ResponseWriter, r *http.Request) { + form, err := readQueryForm(r) + if err != nil { + writeSQSQueryError(w, err) + return + } + action := form.Get("Action") + if action == "" { + // AWS itself returns 400 MissingAction here. The JSON-style + // envelope is documented as the right shape because query + // clients hitting this branch usually have not yet picked a + // codec; raw HTTP probe tooling typically logs `__type` from + // JSON more readably than the ErrorResponse XML envelope. + writeSQSError(w, http.StatusBadRequest, sqsErrMissingParameter, "Action is required") + return + } + switch action { + case "CreateQueue": + s.handleQueryCreateQueue(w, r, form) + case "ListQueues": + s.handleQueryListQueues(w, r, form) + case "GetQueueUrl": + s.handleQueryGetQueueUrl(w, r, form) + default: + // Per design §4.1: every wired verb appears here; everything + // else returns 501 NotImplementedYet so operators see the + // gap explicitly rather than the request silently failing + // against the JSON dispatch table. + writeSQSQueryError(w, newSQSAPIError( + http.StatusNotImplemented, + "NotImplementedYet", + "query-protocol Action "+action+" is not yet wired in elastickv (Phase 3.B follow-up)", + )) + } +} + +// readQueryForm extracts the form values from either the request +// body (POST) or the URL query string (GET / hybrid). The body read +// is bounded by the same sqsMaxRequestBodyBytes the JSON path uses, +// so the query path inherits the JSON path's DoS protection without +// separate plumbing. +func readQueryForm(r *http.Request) (url.Values, error) { + if r.URL == nil { + return nil, newSQSAPIError(http.StatusBadRequest, sqsErrMalformedRequest, "missing request URL") + } + // r.URL.Query() returns a fresh map on each call so we can adopt + // it directly as the base instead of copying entries one-by-one + // (Gemini high-priority on PR #662). On GET we are done; on POST + // the body's form values are merged into the same map. + values := r.URL.Query() + if r.Method == http.MethodGet { + return values, nil + } + body, err := io.ReadAll(http.MaxBytesReader(nil, r.Body, sqsMaxRequestBodyBytes)) + if err != nil { + return nil, newSQSAPIError(http.StatusBadRequest, sqsErrMalformedRequest, err.Error()) + } + if len(body) == 0 { + return values, nil + } + parsed, err := url.ParseQuery(string(body)) + if err != nil { + return nil, newSQSAPIError(http.StatusBadRequest, sqsErrMalformedRequest, "malformed form body: "+err.Error()) + } + for k, vs := range parsed { + values[k] = append(values[k], vs...) + } + return values, nil +} + +// ----------- per-verb handlers (CreateQueue / ListQueues / GetQueueUrl) ----------- + +func (s *SQSServer) handleQueryCreateQueue(w http.ResponseWriter, r *http.Request, form url.Values) { + in, err := parseQueryCreateQueue(form) + if err != nil { + writeSQSQueryError(w, err) + return + } + queueName, err := s.createQueueCore(r.Context(), in) + if err != nil { + writeSQSQueryError(w, err) + return + } + writeSQSQueryResponse(w, "CreateQueue", queryCreateQueueResult{ + QueueUrl: s.queueURL(r, queueName), + }) +} + +func (s *SQSServer) handleQueryListQueues(w http.ResponseWriter, r *http.Request, form url.Values) { + in := parseQueryListQueues(form) + page, nextToken, err := s.listQueuesCore(r.Context(), in) + if err != nil { + writeSQSQueryError(w, err) + return + } + urls := make([]string, 0, len(page)) + for _, n := range page { + urls = append(urls, s.queueURL(r, n)) + } + writeSQSQueryResponse(w, "ListQueues", queryListQueuesResult{ + QueueUrl: urls, + NextToken: nextToken, + }) +} + +func (s *SQSServer) handleQueryGetQueueUrl(w http.ResponseWriter, r *http.Request, form url.Values) { + in := parseQueryGetQueueUrl(form) + queueName, err := s.getQueueUrlCore(r.Context(), in) + if err != nil { + writeSQSQueryError(w, err) + return + } + writeSQSQueryResponse(w, "GetQueueUrl", queryGetQueueUrlResult{ + QueueUrl: s.queueURL(r, queueName), + }) +} + +// ----------- form parsers ----------- + +func parseQueryCreateQueue(form url.Values) (*sqsCreateQueueInput, error) { + in := &sqsCreateQueueInput{ + QueueName: strings.TrimSpace(form.Get("QueueName")), + } + if in.QueueName == "" { + return nil, newSQSAPIError(http.StatusBadRequest, sqsErrMissingParameter, "QueueName is required") + } + in.Attributes = collectIndexedKVPairs(form, "Attribute", "Name") + // Tags use Tag.N.Key / Tag.N.Value (NOT Tag.N.Name). The AWS + // SQS query reference is explicit on this and CodexP1 / Gemini + // both flagged the previous .Name-only parser as a silent + // tag-loss bug. Pass the suffix in so each caller picks the AWS + // vocabulary for that resource. + in.Tags = collectIndexedKVPairs(form, "Tag", "Key") + return in, nil +} + +func parseQueryListQueues(form url.Values) *sqsListQueuesInput { + in := &sqsListQueuesInput{ + QueueNamePrefix: form.Get("QueueNamePrefix"), + NextToken: form.Get("NextToken"), + } + if v := form.Get("MaxResults"); v != "" { + // AWS docs say MaxResults must be 1–1000. clampListQueuesMaxResults + // already enforces the range; leave parsing forgiving here so + // a non-integer (e.g. an empty value some SDKs send) does not + // poison the request — just falls through to default. + if n, err := strconv.Atoi(v); err == nil { + in.MaxResults = n + } + } + return in +} + +func parseQueryGetQueueUrl(form url.Values) *sqsGetQueueUrlInput { + return &sqsGetQueueUrlInput{ + QueueName: strings.TrimSpace(form.Get("QueueName")), + } +} + +// collectIndexedKVPairs reads AWS-style indexed pairs of the form +// +// .1. = key1 +// .1.Value = value1 +// .2. = key2 +// .2.Value = value2 +// +// and returns them as a map. The keyField suffix is "Name" for +// Attributes and "Key" for Tags per the AWS SQS query reference; +// callers pass the right one. Pairs missing either side are dropped +// silently (AWS does the same — the validator in the core handler +// reports the actual problem). +// +// Iteration order: pairs are processed in ascending integer index +// order (so two clients sending the same parameters in different +// HTTP body orders see identical maps). When two distinct entries +// resolve to the *same* key (e.g. both Attribute.1.Name and +// Attribute.2.Name set to "VisibilityTimeout"), the lower index +// wins — AWS rejects this case as InvalidParameterValue, but our +// validator at the next layer is the right place for that, not +// this codec; deterministic last-write-wins-by-index is enough to +// make tests stable. (CodexP2 + Gemini high.) +func collectIndexedKVPairs(form url.Values, prefix, keyField string) map[string]string { + if len(form) == 0 { + return nil + } + pairs := gatherIndexedKVPairs(form, prefix+".", "."+keyField) + if len(pairs) == 0 { + return nil + } + sort.Slice(pairs, func(i, j int) bool { return pairs[i].idx < pairs[j].idx }) + out := make(map[string]string, len(pairs)) + for _, p := range pairs { + // Lower-index wins on duplicates so iteration order does not + // affect the result; map insertion overwrite would otherwise + // resurface the original non-determinism. + if _, taken := out[p.mapKey]; !taken { + out[p.mapKey] = p.mapVal + } + } + if len(out) == 0 { + return nil + } + return out +} + +// indexedKVPair is an intermediate (idx, key, value) triple used to +// sort the form's indexed entries before flattening into the final +// map. Kept as a private struct so collectIndexedKVPairs and the +// gather helper share the exact same shape. +type indexedKVPair struct { + idx int + mapKey string + mapVal string +} + +// gatherIndexedKVPairs walks the form once and emits every well-formed +// (idx, key, value) triple matching the wantPrefix / keySuffix shape. +// Pulled out of collectIndexedKVPairs to keep that function under +// cyclop=10 — the inner loop has too many guards to share a single +// scope. Returns the slice in the order Go's map iteration produced; +// the caller sorts. +func gatherIndexedKVPairs(form url.Values, wantPrefix, keySuffix string) []indexedKVPair { + pairs := make([]indexedKVPair, 0) + for k, vs := range form { + idx, ok := indexedPairKeyToIdx(k, wantPrefix, keySuffix) + if !ok { + continue + } + if len(vs) == 0 || vs[0] == "" { + continue + } + valueKey := strings.TrimSuffix(k, keySuffix) + ".Value" + valueVs, found := form[valueKey] + if !found || len(valueVs) == 0 { + continue + } + pairs = append(pairs, indexedKVPair{idx: idx, mapKey: vs[0], mapVal: valueVs[0]}) + } + return pairs +} + +// indexedPairKeyToIdx parses "" and returns +// (N, true). Non-matching shapes return (_, false). Non-integer +// "" segments (e.g. "Attribute.foo.Name") fall outside the AWS +// contract and return false rather than guess. +func indexedPairKeyToIdx(key, wantPrefix, keySuffix string) (int, bool) { + if !strings.HasPrefix(key, wantPrefix) || !strings.HasSuffix(key, keySuffix) { + return 0, false + } + idxStr := strings.TrimSuffix(strings.TrimPrefix(key, wantPrefix), keySuffix) + idx, err := strconv.Atoi(idxStr) + if err != nil { + return 0, false + } + return idx, true +} + +// ----------- response shapes ----------- + +// queryCreateQueueResult is the inner body. The +// outer wrapper + ResponseMetadata is added by +// writeSQSQueryResponse so every verb shares the same envelope code. +type queryCreateQueueResult struct { + XMLName xml.Name `xml:"CreateQueueResult"` + QueueUrl string `xml:"QueueUrl"` +} + +type queryListQueuesResult struct { + XMLName xml.Name `xml:"ListQueuesResult"` + QueueUrl []string `xml:"QueueUrl"` + NextToken string `xml:"NextToken,omitempty"` +} + +type queryGetQueueUrlResult struct { + XMLName xml.Name `xml:"GetQueueUrlResult"` + QueueUrl string `xml:"QueueUrl"` +} + +// queryResponseEnvelope wraps the per-verb result in the AWS +// outer-envelope shape: +// +// <{Action}Response xmlns="http://queue.amazonaws.com/doc/2012-11-05/"> +// <{Action}Result>... +// ... +// +// +// XMLName is set per-call so the same struct serves every verb. +type queryResponseEnvelope struct { + XMLName xml.Name + XMLNS string `xml:"xmlns,attr"` + Result any `xml:",any"` + Metadata queryResponseMetadata `xml:"ResponseMetadata"` +} + +type queryResponseMetadata struct { + RequestId string `xml:"RequestId"` +} + +// writeSQSQueryResponse emits a 200 XML envelope. The action drives +// the outer element name; result is marshalled as the embedded +// Result element. RequestId is generated server-side; clients echo +// it back in support requests so the access log line carries the +// same value (logged in the access path which sees this writer). +func writeSQSQueryResponse(w http.ResponseWriter, action string, result any) { + env := queryResponseEnvelope{ + XMLName: xml.Name{Local: action + "Response"}, + XMLNS: sqsQueryNamespace, + Result: result, + Metadata: queryResponseMetadata{RequestId: newQueryRequestID()}, + } + body, err := xml.MarshalIndent(env, "", " ") + if err != nil { + // xml.MarshalIndent fails only on programming errors (cyclic + // types, unsupported tags). Fall back to a best-effort 500; + // the operator log gives the actual reason. + writeSQSQueryError(w, errors.Wrap(err, "marshal query-protocol response")) + return + } + w.Header().Set("Content-Type", sqsContentTypeQueryXML) + w.Header().Set("x-amzn-RequestId", env.Metadata.RequestId) + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(xml.Header)) + _, _ = w.Write(body) +} + +// writeSQSQueryError emits the AWS query-protocol error envelope: +// +// +// +// Sender|Receiver +// ... +// ... +// +// ... +// +// +// HTTP status mirrors what the JSON path would have returned for the +// same sqsAPIError; SDK retry classifiers key off both and +// HTTP status, so keeping them aligned across protocols means a +// retry policy that works for the JSON client also works for the +// query client. +func writeSQSQueryError(w http.ResponseWriter, err error) { + status := http.StatusInternalServerError + code := sqsErrInternalFailure + message := "internal error" + var apiErr *sqsAPIError + if errors.As(err, &apiErr) { + status = apiErr.status + if apiErr.errorType != "" { + code = apiErr.errorType + } + if apiErr.message != "" { + message = apiErr.message + } + } + env := queryErrorEnvelope{ + XMLName: xml.Name{Local: "ErrorResponse"}, + XMLNS: sqsQueryNamespace, + Error: queryErrorBody{Type: errorTypeForStatus(status), Code: code, Message: message}, + RequestId: newQueryRequestID(), + } + body, marshalErr := xml.MarshalIndent(env, "", " ") + if marshalErr != nil { + // Truly unreachable for our shape; fall back to plain text + // so the operator at least sees something useful. + http.Error(w, code+": "+message, status) + return + } + w.Header().Set("Content-Type", sqsContentTypeQueryXML) + w.Header().Set("x-amzn-RequestId", env.RequestId) + if code != "" { + w.Header().Set("x-amzn-ErrorType", code) + } + w.WriteHeader(status) + _, _ = w.Write([]byte(xml.Header)) + _, _ = w.Write(body) +} + +type queryErrorEnvelope struct { + XMLName xml.Name `xml:"ErrorResponse"` + XMLNS string `xml:"xmlns,attr"` + Error queryErrorBody `xml:"Error"` + RequestId string `xml:"RequestId"` +} + +type queryErrorBody struct { + Type string `xml:"Type"` + Code string `xml:"Code"` + Message string `xml:"Message"` +} + +// errorTypeForStatus maps an HTTP status to the AWS error +// classification AWS itself reports in : 4xx becomes Sender +// (the client did something wrong); everything else is Receiver +// (the server failed). Matches what aws-sdk-java v1 expects when it +// decides whether to retry. +func errorTypeForStatus(status int) string { + if status >= http.StatusBadRequest && status < http.StatusInternalServerError { + return "Sender" + } + return "Receiver" +} + +// newQueryRequestID returns a fresh per-response identifier shaped +// like the AWS RequestId: 22 chars of base32 (no padding). Base32 +// emits 1 char per 5 input bits, so 22 chars require ceil(22*5/8)=14 +// random bytes (110 bits, comfortably more entropy than a UUID-v4). +// Gemini medium on PR #662 flagged the prior 16-byte source — that +// produces a 26-char ID, not 22, and the comment was a lie. +func newQueryRequestID() string { + var raw [14]byte + if _, err := rand.Read(raw[:]); err != nil { + // crypto/rand.Read does not fail on supported platforms; + // returning a constant on the unreachable error keeps the + // signature error-free without hiding the symptom (operators + // will notice every RequestId being identical). + return strings.Repeat("0", sqsQueryRequestIDLen) + } + // Base32 of 14 bytes is 23 raw chars in the no-padding form. + // Trim to the documented sqsQueryRequestIDLen so the ID is the + // exact AWS shape. + encoded := base32.StdEncoding.WithPadding(base32.NoPadding).EncodeToString(raw[:]) + if len(encoded) > sqsQueryRequestIDLen { + encoded = encoded[:sqsQueryRequestIDLen] + } + return encoded +} diff --git a/adapter/sqs_query_protocol_test.go b/adapter/sqs_query_protocol_test.go new file mode 100644 index 000000000..e1d68ed63 --- /dev/null +++ b/adapter/sqs_query_protocol_test.go @@ -0,0 +1,369 @@ +package adapter + +import ( + "bytes" + "context" + "encoding/xml" + "io" + "net/http" + "net/http/httptest" + "net/url" + "strings" + "testing" +) + +// ---------- protocol detection ---------- + +func TestPickSqsProtocol(t *testing.T) { + t.Parallel() + cases := []struct { + name string + method string + urlStr string + headerCT string + headerTgt string + want sqsProtocol + }{ + {"json: target + json ct", http.MethodPost, "/", sqsContentTypeJSON, "AmazonSQS.CreateQueue", sqsProtocolJSON}, + {"json: target wins over form ct", http.MethodPost, "/", sqsContentTypeQueryURLEncoded, "AmazonSQS.CreateQueue", sqsProtocolJSON}, + {"query: form ct without target", http.MethodPost, "/", sqsContentTypeQueryURLEncoded, "", sqsProtocolQuery}, + {"query: form ct with charset", http.MethodPost, "/", sqsContentTypeQueryURLEncoded + "; charset=utf-8", "", sqsProtocolQuery}, + {"query: GET with Action in querystring", http.MethodGet, "/?Action=ListQueues", "", "", sqsProtocolQuery}, + {"unknown: empty body, no headers", http.MethodPost, "/", "", "", sqsProtocolUnknown}, + {"unknown: GET without Action", http.MethodGet, "/", "", "", sqsProtocolUnknown}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + r := httptest.NewRequest(tc.method, tc.urlStr, nil) + if tc.headerCT != "" { + r.Header.Set("Content-Type", tc.headerCT) + } + if tc.headerTgt != "" { + r.Header.Set("X-Amz-Target", tc.headerTgt) + } + if got := pickSqsProtocol(r); got != tc.want { + t.Fatalf("pickSqsProtocol = %d, want %d", got, tc.want) + } + }) + } +} + +func TestPickSqsProtocol_NilRequest(t *testing.T) { + t.Parallel() + if got := pickSqsProtocol(nil); got != sqsProtocolUnknown { + t.Fatalf("nil request: got %d, want sqsProtocolUnknown", got) + } +} + +// ---------- collectIndexedKVPairs ---------- + +func TestCollectIndexedKVPairs(t *testing.T) { + t.Parallel() + form := url.Values{ + "Attribute.1.Name": []string{"VisibilityTimeout"}, + "Attribute.1.Value": []string{"60"}, + "Attribute.2.Name": []string{"DelaySeconds"}, + "Attribute.2.Value": []string{"5"}, + // orphan Name (no matching Value): silently skipped + "Attribute.3.Name": []string{"NoValue"}, + // noise that should not interfere + "NotAnAttribute": []string{"hi"}, + } + got := collectIndexedKVPairs(form, "Attribute", "Name") + if len(got) != 2 { + t.Fatalf("expected 2 pairs, got %d (%v)", len(got), got) + } + if got["VisibilityTimeout"] != "60" { + t.Errorf("VisibilityTimeout = %q, want 60", got["VisibilityTimeout"]) + } + if got["DelaySeconds"] != "5" { + t.Errorf("DelaySeconds = %q, want 5", got["DelaySeconds"]) + } + if _, ok := got["NoValue"]; ok { + t.Errorf("orphan Name should not appear in result: %v", got) + } +} + +// TestCollectIndexedKVPairs_TagSuffix pins that the keyField argument +// distinguishes Attribute (.Name) from Tag (.Key) shapes per the AWS +// SQS query reference. CodexP1 + Gemini both flagged the prior +// hardcoded .Name path as a silent tag-loss bug. +func TestCollectIndexedKVPairs_TagSuffix(t *testing.T) { + t.Parallel() + form := url.Values{ + "Tag.1.Key": []string{"env"}, + "Tag.1.Value": []string{"prod"}, + "Tag.2.Key": []string{"team"}, + "Tag.2.Value": []string{"sre"}, + // Wrong shape for tags: must NOT be picked up. + "Tag.3.Name": []string{"shouldNotAppear"}, + "Tag.3.Value": []string{"nope"}, + } + got := collectIndexedKVPairs(form, "Tag", "Key") + if len(got) != 2 { + t.Fatalf("expected 2 tag pairs, got %d (%v)", len(got), got) + } + if got["env"] != "prod" || got["team"] != "sre" { + t.Errorf("tag map = %v, want env=prod team=sre", got) + } + if _, present := got["shouldNotAppear"]; present { + t.Errorf("Tag.N.Name was incorrectly accepted as a tag key: %v", got) + } +} + +// TestCollectIndexedKVPairs_DeterministicOnDuplicates pins that two +// entries resolving to the same logical key resolve deterministically +// (lower index wins). CodexP2 flagged the previous map iteration as +// non-deterministic because Go map order is randomised. +func TestCollectIndexedKVPairs_DeterministicOnDuplicates(t *testing.T) { + t.Parallel() + form := url.Values{ + "Attribute.5.Name": []string{"VisibilityTimeout"}, + "Attribute.5.Value": []string{"50"}, + "Attribute.2.Name": []string{"VisibilityTimeout"}, + "Attribute.2.Value": []string{"20"}, + } + // Run many times to make sure map-iteration randomness does not + // leak through. Lower index (2) must win every iteration. + for i := 0; i < 64; i++ { + got := collectIndexedKVPairs(form, "Attribute", "Name") + if got["VisibilityTimeout"] != "20" { + t.Fatalf("iter %d: lower-index value lost; got=%v", i, got) + } + } +} + +func TestCollectIndexedKVPairs_Empty(t *testing.T) { + t.Parallel() + if got := collectIndexedKVPairs(nil, "Attribute", "Name"); got != nil { + t.Fatalf("nil form: got %v, want nil", got) + } + if got := collectIndexedKVPairs(url.Values{}, "Attribute", "Name"); got != nil { + t.Fatalf("empty form: got %v, want nil", got) + } + if got := collectIndexedKVPairs(url.Values{"Other.1.Name": []string{"x"}, "Other.1.Value": []string{"y"}}, "Attribute", "Name"); got != nil { + t.Fatalf("unrelated form: got %v, want nil", got) + } +} + +// TestNewQueryRequestID_Length pins the AWS shape: 22 base32 chars. +// Gemini medium on PR #662 caught the prior 26-char output that +// contradicted the function's own doc comment. +func TestNewQueryRequestID_Length(t *testing.T) { + t.Parallel() + for i := 0; i < 64; i++ { + id := newQueryRequestID() + if len(id) != 22 { + t.Fatalf("RequestId length = %d, want 22; id=%q", len(id), id) + } + } +} + +// ---------- error envelope shape ---------- + +func TestWriteSQSQueryError_ShapeAndStatus(t *testing.T) { + t.Parallel() + rec := httptest.NewRecorder() + writeSQSQueryError(rec, newSQSAPIError(http.StatusBadRequest, sqsErrQueueDoesNotExist, "queue does not exist")) + if rec.Code != http.StatusBadRequest { + t.Fatalf("status = %d, want 400", rec.Code) + } + if ct := rec.Header().Get("Content-Type"); !strings.HasPrefix(ct, "text/xml") { + t.Fatalf("Content-Type = %q, want text/xml prefix", ct) + } + if got := rec.Header().Get("x-amzn-ErrorType"); got != sqsErrQueueDoesNotExist { + t.Fatalf("x-amzn-ErrorType = %q, want %q", got, sqsErrQueueDoesNotExist) + } + body := rec.Body.String() + if !strings.Contains(body, "Sender") { + t.Errorf("missing Sender for 4xx; body=%s", body) + } + if !strings.Contains(body, ""+sqsErrQueueDoesNotExist+"") { + t.Errorf("missing ; body=%s", body) + } + if !strings.Contains(body, "queue does not exist") { + t.Errorf("missing ; body=%s", body) + } + if !strings.Contains(body, `xmlns="`+sqsQueryNamespace+`"`) { + t.Errorf("missing namespace; body=%s", body) + } +} + +func TestWriteSQSQueryError_5xxIsReceiver(t *testing.T) { + t.Parallel() + rec := httptest.NewRecorder() + writeSQSQueryError(rec, newSQSAPIError(http.StatusInternalServerError, sqsErrInternalFailure, "boom")) + if !strings.Contains(rec.Body.String(), "Receiver") { + t.Fatalf("expected Receiver for 5xx; body=%s", rec.Body.String()) + } +} + +// ---------- end-to-end via SQS listener ---------- + +// queryRoundTrip calls a single SQS query-protocol verb against the +// in-process listener and returns the decoded response envelope. +func queryRoundTrip(t *testing.T, node Node, action string, form url.Values) (int, []byte) { + t.Helper() + form.Set("Action", action) + req, err := http.NewRequestWithContext(context.Background(), http.MethodPost, + "http://"+node.sqsAddress+"/", strings.NewReader(form.Encode())) + if err != nil { + t.Fatalf("request: %v", err) + } + req.Header.Set("Content-Type", sqsContentTypeQueryURLEncoded) + resp, err := http.DefaultClient.Do(req) + if err != nil { + t.Fatalf("do: %v", err) + } + defer resp.Body.Close() + body, _ := io.ReadAll(resp.Body) + return resp.StatusCode, body +} + +func TestSQSServer_QueryProtocol_CreateQueueRoundTrip(t *testing.T) { + t.Parallel() + nodes, _, _ := createNode(t, 1) + defer shutdown(nodes) + node := sqsLeaderNode(t, nodes) + + status, body := queryRoundTrip(t, node, "CreateQueue", url.Values{ + "QueueName": []string{"query-create"}, + "Attribute.1.Name": []string{"VisibilityTimeout"}, + "Attribute.1.Value": []string{"45"}, + }) + if status != http.StatusOK { + t.Fatalf("CreateQueue: status %d body %s", status, body) + } + var resp struct { + XMLName xml.Name `xml:"CreateQueueResponse"` + Result struct { + QueueUrl string `xml:"QueueUrl"` + } `xml:"CreateQueueResult"` + Metadata struct { + RequestId string `xml:"RequestId"` + } `xml:"ResponseMetadata"` + } + if err := xml.Unmarshal(bytes.TrimSpace(body), &resp); err != nil { + t.Fatalf("decode: %v\nbody=%s", err, body) + } + if !strings.HasSuffix(resp.Result.QueueUrl, "/query-create") { + t.Errorf("QueueUrl = %q; expected suffix /query-create", resp.Result.QueueUrl) + } + if resp.Metadata.RequestId == "" { + t.Errorf("missing RequestId") + } + + // Verify the queue actually exists by hitting GetQueueUrl + // through the JSON path — round-trip parity §9.2 in design doc. + jStatus, jOut := callSQS(t, node, sqsGetQueueUrlTarget, map[string]any{ + "QueueName": "query-create", + }) + if jStatus != http.StatusOK { + t.Fatalf("JSON GetQueueUrl after Query CreateQueue: %d %v", jStatus, jOut) + } + if got, _ := jOut["QueueUrl"].(string); got != resp.Result.QueueUrl { + t.Errorf("URL mismatch across protocols: query=%q json=%q", resp.Result.QueueUrl, got) + } +} + +func TestSQSServer_QueryProtocol_ListQueuesRoundTrip(t *testing.T) { + t.Parallel() + nodes, _, _ := createNode(t, 1) + defer shutdown(nodes) + node := sqsLeaderNode(t, nodes) + + // Create two queues via JSON, list via query — parity check. + for _, name := range []string{"query-list-a", "query-list-b"} { + _, _ = callSQS(t, node, sqsCreateQueueTarget, map[string]any{"QueueName": name}) + } + + status, body := queryRoundTrip(t, node, "ListQueues", url.Values{ + "QueueNamePrefix": []string{"query-list-"}, + }) + if status != http.StatusOK { + t.Fatalf("ListQueues: status %d body %s", status, body) + } + var resp struct { + XMLName xml.Name `xml:"ListQueuesResponse"` + Result struct { + QueueUrls []string `xml:"QueueUrl"` + } `xml:"ListQueuesResult"` + } + if err := xml.Unmarshal(bytes.TrimSpace(body), &resp); err != nil { + t.Fatalf("decode: %v\nbody=%s", err, body) + } + if len(resp.Result.QueueUrls) != 2 { + t.Fatalf("got %d URLs, want 2; body=%s", len(resp.Result.QueueUrls), body) + } + hasA, hasB := false, false + for _, u := range resp.Result.QueueUrls { + if strings.HasSuffix(u, "/query-list-a") { + hasA = true + } + if strings.HasSuffix(u, "/query-list-b") { + hasB = true + } + } + if !hasA || !hasB { + t.Fatalf("missing one of the seeded queues; URLs=%v", resp.Result.QueueUrls) + } +} + +func TestSQSServer_QueryProtocol_GetQueueUrlNotFoundError(t *testing.T) { + t.Parallel() + nodes, _, _ := createNode(t, 1) + defer shutdown(nodes) + node := sqsLeaderNode(t, nodes) + + status, body := queryRoundTrip(t, node, "GetQueueUrl", url.Values{ + "QueueName": []string{"never-existed"}, + }) + if status != http.StatusBadRequest { + t.Fatalf("expected 400 for missing queue; got %d body=%s", status, body) + } + bodyStr := string(body) + if !strings.Contains(bodyStr, ""+sqsErrQueueDoesNotExist+"") { + t.Fatalf("expected QueueDoesNotExist code; body=%s", bodyStr) + } +} + +func TestSQSServer_QueryProtocol_UnknownActionReturns501(t *testing.T) { + t.Parallel() + nodes, _, _ := createNode(t, 1) + defer shutdown(nodes) + node := sqsLeaderNode(t, nodes) + status, body := queryRoundTrip(t, node, "TotallyNotARealAction", url.Values{}) + if status != http.StatusNotImplemented { + t.Fatalf("expected 501 for unknown verb; got %d body=%s", status, body) + } + if !strings.Contains(string(body), "NotImplementedYet") { + t.Fatalf("expected NotImplementedYet code; body=%s", body) + } +} + +func TestSQSServer_QueryProtocol_MissingActionReturns400(t *testing.T) { + t.Parallel() + nodes, _, _ := createNode(t, 1) + defer shutdown(nodes) + node := sqsLeaderNode(t, nodes) + // Send a form-urlencoded body without an Action — must surface + // MissingAction (JSON-style envelope per §3 of design doc). + req, err := http.NewRequestWithContext(context.Background(), http.MethodPost, + "http://"+node.sqsAddress+"/", strings.NewReader("Foo=bar")) + if err != nil { + t.Fatalf("request: %v", err) + } + req.Header.Set("Content-Type", sqsContentTypeQueryURLEncoded) + resp, err := http.DefaultClient.Do(req) + if err != nil { + t.Fatalf("do: %v", err) + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusBadRequest { + t.Fatalf("missing Action: status %d, want 400", resp.StatusCode) + } +} diff --git a/docs/design/2026_04_26_proposed_sqs_query_protocol.md b/docs/design/2026_04_26_proposed_sqs_query_protocol.md new file mode 100644 index 000000000..e3312863e --- /dev/null +++ b/docs/design/2026_04_26_proposed_sqs_query_protocol.md @@ -0,0 +1,292 @@ +# SQS Query-Protocol Wire Format Support + +**Status:** Proposed +**Author:** bootjp +**Date:** 2026-04-26 + +--- + +## 1. Background and Motivation + +The elastickv SQS adapter currently speaks only the **AWS JSON 1.0 protocol**: `Content-Type: application/x-www-form-urlencoded` is rejected, the request must carry `X-Amz-Target: AmazonSQS.`, and the body is JSON. This matches what the modern AWS SDK v2 family (`aws-sdk-go-v2`, `boto3 ≥ 1.34`, `aws-sdk-java-v2`) emits. + +A long tail of clients still emits the older **query protocol** (form-encoded request, XML response) — `aws-sdk-java` v1, older `boto`/`boto3 < 1.34`, every CLI tool that builds requests by hand, and the AWS CLI itself when used against a region that defaults to query. Today these clients fail with `400 MalformedRequest` or `415 UnsupportedMediaType` on the very first request, even though the underlying SQS feature is fully implemented. + +Adding query-protocol support is the last piece needed to claim "drop-in SQS compatibility" for v1-era SDKs. Phase 3.B in [`docs/design/2026_04_24_partial_sqs_compatible_adapter.md`](2026_04_24_partial_sqs_compatible_adapter.md) §16.4 marked this as TODO; this document is the proposal that unblocks the implementation. + +--- + +## 2. Goals and Non-Goals + +### 2.1 Goals + +1. Accept query-protocol requests on the **same SQS listener** that already serves JSON. Detection is based on the request shape, not a separate port. +2. Reuse every existing handler. The wire codec is the only new code; no SQS business logic moves or duplicates. +3. Emit XML responses that AWS SDK v1 / older boto unmarshal without modification. +4. Preserve the existing JSON-protocol behaviour bit-for-bit. No regression test on the JSON path may change. +5. Keep the doc-driven coverage explicit: the first PR ships a subset of verbs; later PRs widen it without further design work. + +### 2.2 Non-Goals + +1. **Server-side XML schema validation** of unsupported fields. AWS itself silently ignores unknown query-string keys; we mirror that and rely on per-handler validation that is already in place. +2. **EC2 query-protocol fidelity** for non-SQS services. This proposal touches only the SQS adapter. +3. **HTTP/2 negotiation tweaks**. The query protocol works over plain HTTP/1.1 just like JSON. +4. **Streaming responses**. SQS responses are bounded; chunked encoding is not needed. +5. Adding query-protocol support to S3 or DynamoDB. Those adapters have their own protocol semantics and are out of scope. + +--- + +## 3. Detection + +The dispatcher in `adapter/sqs.go` (`SQSServer.ServeHTTP`) decides which protocol to invoke per request, with no flag, no header, and no per-listener configuration. The decision is made from request-side signals only: + +| Signal | Protocol | +|---|---| +| `X-Amz-Target` header is set **and** `Content-Type` starts with `application/x-amz-json-1.0` | JSON (existing path, unchanged) | +| `Content-Type` starts with `application/x-www-form-urlencoded` **and** the request carries a non-empty `Action` form field | Query (new path) | +| Anything else | `400 MissingAction` (existing JSON path's error envelope, since it is the most informative when the client can't even pick a protocol) | + +Detection lives in a small `pickSqsProtocol(*http.Request) sqsProtocol` helper so unit tests can pin every edge case (mixed headers, missing `Action`, query-protocol POST with empty body, GET-with-query-string fallthrough). The two protocol branches share zero code beyond that switch. + +Edge cases the detector accepts as **query-protocol**: + +- `GET` with `Action` in the query string (some old clients still emit this for `ListQueues`). +- `POST` with `Action` either in the body or in the query string. AWS lets either side carry the `Action` parameter; we accept both. + +Edge cases the detector **rejects as JSON-protocol Errors** (mirroring AWS's behaviour): + +- `Content-Type: application/x-www-form-urlencoded` but no `Action` field anywhere → JSON-style 400 `{"__type":"MissingAction","message":"Action is required"}`. Returning XML here would force every probe / health-checker that doesn't know either protocol to learn both. + +--- + +## 4. Internal Handler Shape + +Today, each handler in `adapter/sqs_messages.go` / `adapter/sqs_catalog.go` is shaped: + +```go +func (s *SQSServer) sendMessage(w http.ResponseWriter, r *http.Request) +``` + +— it owns request parsing **and** response writing, so the JSON wire format is hard-coded into every handler. To let the query protocol reuse the same logic without duplicating either the SQS algorithm or the SigV4 path, we factor each verb into three layers: + +``` +JSON wrapper (decode JSON / write JSON) ─┐ + ├──► sqsHandlerCore: (in T) → (out U, error) +Query wrapper (decode form / write XML) ─┘ +``` + +Where `sqsHandlerCore` is **the existing handler body, minus the codec calls**. Concretely, for `SendMessage`: + +```go +// adapter/sqs_messages.go (existing pattern, refactored) +func (s *SQSServer) sendMessage(w http.ResponseWriter, r *http.Request) { + var in sqsSendMessageInput + if err := decodeSQSJSONInput(r, &in); err != nil { + writeSQSErrorFromErr(w, err) + return + } + out, err := s.sendMessageCore(r.Context(), &in) + if err != nil { + writeSQSErrorFromErr(w, err) + return + } + writeSQSJSON(w, out) +} + +// New SigV4-and-codec-free worker. Already extractable for every existing +// verb because the per-handler logic is the body of the current function +// minus the first decode and final write. +func (s *SQSServer) sendMessageCore(ctx context.Context, in *sqsSendMessageInput) (*sqsSendMessageOutput, error) +``` + +Query path: + +```go +// adapter/sqs_query_protocol.go (new) +func (s *SQSServer) handleQuerySendMessage(w http.ResponseWriter, r *http.Request, form url.Values) { + in, err := parseQuerySendMessage(form) + if err != nil { + writeSQSQueryError(w, err) + return + } + out, err := s.sendMessageCore(r.Context(), in) + if err != nil { + writeSQSQueryError(w, err) + return + } + writeSQSQueryResponse(w, "SendMessage", out) +} +``` + +This refactor is mechanical and trivially reviewable: the JSON wrapper before the change is identical to the JSON wrapper after the change, except `decodeSQSJSONInput` and the in-place body have been split. Existing tests cover every verb's JSON path and pass unchanged. + +### 4.1 Verb coverage in the first PR + +The first PR is **architectural proof** — it ships dispatch, decoding, encoding, error envelope, and the refactor pattern, with **three verbs** wired end-to-end as concrete proof. The pattern then extends mechanically to every other verb in follow-up PRs (each follow-up adds a parser + response struct + one line in the dispatch table). + +| Verb | Why it's in the proof set | +|---|---| +| `CreateQueue` | Simplest write verb: takes `QueueName` + optional `Attribute.N`, returns `QueueUrl`. Exercises the indexed-collection parser for `Attribute.N.Name`/`Attribute.N.Value`. | +| `ListQueues` | Read-only verb. Exercises the repeated-element XML shape (`...` repeated under ``) which is harder than the typical leaf-element response. | +| `GetQueueUrl` | Trivial round-trip verb. Pins that single-leaf XML response shape (`...`) and the `QueueDoesNotExist` error envelope path. | + +`SendMessage` / `ReceiveMessage` / `DeleteMessage` are the highest-priority follow-ups; they need the `*Core` refactor to also reach into the FIFO send loop (`sqs_messages.go: sendMessageFifoLoop`), which is mechanical but bigger than this proof PR should swallow. + +Verbs **not** in the first round (recorded as TODO in the PR description and in §16.4 of the partial doc): + +- `DeleteQueue`, `GetQueueUrl`, `GetQueueAttributes`, `SetQueueAttributes`, `PurgeQueue` — single-call extensions; each is one parser + one response shape. +- `ReceiveMessage` / `DeleteMessage` / `ChangeMessageVisibility` — the in-flight message lifecycle. Each non-trivial because of `Attribute.N` plumbing on responses. +- `SendMessageBatch` / `DeleteMessageBatch` / `ChangeMessageVisibilityBatch` — query-protocol batch encoding has its own quirks (`SendMessageBatchRequestEntry.1.MessageBody=...`); deserves its own focused PR. +- `TagQueue`, `UntagQueue`, `ListQueueTags`, DLQ redrive control-plane verbs — small additions, easy to land incrementally. + +The `pickSqsAction` switch returns a **501 `NotImplementedYet`** for any query-protocol Action that has not been wired yet, with an XML envelope that names the missing action. Operators see the gap explicitly rather than silently falling through to JSON-style errors. As verbs land, their entries move from the "TODO" branch to the live dispatch table — no other code changes per added verb. + +--- + +## 5. Query-Protocol Decoding + +Form parsing uses `net/url.ParseQuery` after `io.ReadAll` on the request body (capped at the existing `sqsMaxRequestBodyBytes` so the query path inherits the JSON path's DoS protection without separate plumbing). Each verb has a dedicated parser that walks the parsed `url.Values` and produces the *same* internal input struct the JSON path already uses — the parsers are the only protocol-specific code per verb. + +AWS-style numeric collection encoding (`AttributeName.1=...`, `AttributeName.2=...`) is handled by a single `collectIndexedValues(form url.Values, prefix string) []string` helper that strips the dotted suffix, sorts by the integer index, and returns the values in order. All multi-value parameters (`AttributeNames`, `MessageAttribute.N.Name`, …) go through this helper, so the indexed-collection parsing logic exists once. + +`MessageAttribute.N.Name` / `MessageAttribute.N.Value.DataType` / etc. is the only nested case; the code lives in `parseMessageAttributesQuery` and produces the same `[]sqsMessageAttribute` slice the JSON path consumes. No SQS handler sees the difference. + +--- + +## 6. Query-Protocol Encoding (XML) + +Response XML follows the AWS SQS QueryProtocol envelope per verb: + +```xml + + + + ... + ... + + + ... + + +``` + +`encoding/xml` marshals every response struct directly. The wrapper `writeSQSQueryResponse(w, action, payload)` constructs the action-specific outer envelope (`<{Action}Response>` + `<{Action}Result>`) and the `` block, then streams the marshalled payload. Per-verb response struct definitions live in `adapter/sqs_query_responses.go` and use struct tags so the XML schema is grep-able. + +`RequestId` is generated server-side: a 22-character base32 of 16 random bytes. The same value is logged in the access-log line so operator support requests can be cross-referenced. + +### 6.1 Error envelope + +Errors use the AWS QueryProtocol error format: + +```xml + + + + Sender + QueueAlreadyExists + A queue with that name already exists. + + ... + +``` + +`` is `Sender` for 4xx and `Receiver` for 5xx. The `` field reuses the existing `sqsErr*` constants (`QueueDoesNotExist`, `InvalidParameterValue`, …) — they are *already* AWS-compatible because the JSON path uses the same vocabulary. + +The status code is set to the same value the JSON path returns, so client-side retry classifiers (which key off both `` and HTTP status) behave identically across protocols. + +--- + +## 7. Authentication and SigV4 + +The query protocol uses the same SigV4 signature the JSON protocol does — the signed canonical request includes the form-encoded body, so SigV4 verification works against either codec without changes. The existing SigV4 middleware (`sigv4.go`) sees an `http.Request`; it does not care about the body schema. + +Query-protocol clients sometimes send `Authorization: AWS4-HMAC-SHA256 Credential=...` and sometimes pass `X-Amz-Algorithm` / `X-Amz-Signature` as form parameters (presigned-URL style). The SigV4 path already accepts both shapes; the query-protocol dispatcher does not need its own auth handling. + +--- + +## 8. Configuration + +No new flags. The query protocol is enabled by being detected; deployments that want **only** JSON can set `--sqsRejectQueryProtocol` (Phase 3 follow-up flag, not in the first PR) which will short-circuit the detection. + +The conservative default (accept both protocols) matches AWS itself: even regions that have switched their *default* SDK protocol still accept query for backward compatibility. + +--- + +## 9. Testing Strategy + +1. **Golden-file XML tests** (`adapter/sqs_query_protocol_test.go`): + - For each wired verb, build a typical SDK v1 request as `url.Values`, send it through the in-process listener, and assert the XML response byte-for-byte against a stored golden file under `adapter/testdata/sqs_query/.xml`. + - The golden files are *exactly* what `aws-sdk-java` v1 unmarshals; updating them is a deliberate review event. + +2. **Round-trip parity** (`adapter/sqs_query_protocol_parity_test.go`): + - For each wired verb, perform the same logical operation through both the JSON and query protocols (e.g. `SendMessage` with body `"hello"` and `MessageGroupId=g1` on a FIFO queue). + - Read back via `ReceiveMessage` on whichever protocol opposes the send protocol. Confirm body, MD5, attributes, and any sequencing fields match across the two paths. + +3. **Detection edge cases** (`adapter/sqs_dispatch_test.go`): + - `Content-Type: application/x-www-form-urlencoded` + missing `Action` → JSON-style 400 `MissingAction`. + - `X-Amz-Target` set + form-encoded body → JSON path (the header wins). + - GET with `Action=ListQueues` in the query string → query path. + - Body over `sqsMaxRequestBodyBytes` → 413 from the existing limit, regardless of protocol. + +4. **SigV4 fixture test**: take a known-good `aws-sdk-java` v1 request capture (saved under `testdata/`), feed it through the listener with the matching credentials, assert the signature verifies and the call succeeds. Pins that the SigV4 canonical-request derivation matches the query-protocol body encoding. + +5. **Lint**: extend `.golangci.yaml` exemptions only if the XML envelopes trip cyclomatic complexity (they shouldn't — each verb's encoder is a flat struct definition). + +--- + +## 10. Compatibility and Rollout + +The protocol is purely additive. Existing JSON clients continue to hit the JSON path because their `Content-Type` differs. No flag default changes. No migration step is required. + +Deployments that *want* to refuse query-protocol traffic (e.g. lock down to v2-SDK-only clients) can land the `--sqsRejectQueryProtocol` flag in a follow-up PR without affecting the default behaviour. + +Rollout sequence: + +1. This PR — implementation + tests for the first verb subset (§4.1). +2. Follow-up PR — batch verbs + tag verbs. +3. Follow-up PR — DLQ redrive admin / FIFO administrative verbs. +4. Eventually, when the `_partial_` doc's TODO list is empty, the SQS design doc transitions to `_implemented_`. + +--- + +## 11. Alternatives Considered + +### 11.1 Separate listener per protocol + +Run JSON on the existing port and query on a new `--sqsQueryAddress`. Rejected because: + +- Operators have to manage two ports + two TLS configs + two firewall rules. +- AWS's own behaviour is single-port multi-protocol. +- SigV4's canonical request is invariant across protocols, so no security boundary is gained. + +### 11.2 Synthetic JSON re-dispatch + +Translate the query request into a JSON request, hand it back to the existing pipeline, then transcode the JSON response to XML. Rejected because: + +- Every verb pays the JSON marshal + unmarshal cost twice. +- Error mapping becomes stringly-typed (the JSON path returns `__type` strings; we'd have to parse them out for the XML envelope). +- Future verb-specific differences between protocols become tangled (AWS does sometimes diverge — the JSON path returns `Attributes` as `map` while query returns `......` repeated). + +### 11.3 Use a third-party library (`aws/smithy-go` etc.) + +Rejected because the smithy generators are aimed at *clients*, and pulling in the dependency for a few hundred lines of XML scaffolding would be a net negative for binary size and supply-chain surface. + +--- + +## 11.4 Known limitation — leader-proxy error envelope + +`proxyToLeader` falls through to `sqsLeaderProxyErrorWriter`, which today always emits the JSON error envelope. A query-protocol client whose request lands on a follower during a leader flip will therefore see one JSON-shaped error before the next request lands on the new leader. This is acceptable because: + +1. The window is short (one or two requests at most). +2. SDK retry classifiers key off HTTP status before body shape. +3. AWS itself sometimes returns JSON-shaped errors for query-protocol clients during regional failovers (observed in incident reports). + +A follow-up PR threads the detected protocol onto the request context so the proxy error writer can emit the matching XML envelope. Recorded in the partial doc's §16.4 follow-ups list. + +--- + +## 12. Open Questions + +1. **Do we need to honour the `Version=2012-11-05` form parameter** to gate verbs that AWS retired? Likely no — clients always send the same version string, and the verb set we implement is stable across SQS API versions. Defer. +2. **Should `RequestId` be sourced from the existing distributed trace context** (W3C `traceparent` header) when available? Operators might appreciate the linkage. Out of scope for the first PR but a low-cost follow-up. +3. **Should the XML responses preserve `xmlns="http://queue.amazonaws.com/doc/2012-11-05/"`** even when no client tooling actually validates the namespace? Yes for compatibility — older XML parsers do enforce it; the cost is one literal string per response.