diff --git a/CHANGELOG.md b/CHANGELOG.md index 2e63228..00667ad 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,9 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). ### Changed +- Cache-miss responses are now streamed directly to a temp file on disk instead of being buffered in memory, eliminating memory spikes for large packages +- `Content-Length` is validated before committing cached files, preventing truncated upstream responses from being cached +- Client disconnects no longer prevent caching — if a client aborts mid-download, the upstream response is still fully received and cached - Landing page snippets for Debian/Ubuntu use `` placeholder instead of hardcoded codenames - README client configuration examples updated to current stable releases (Debian trixie, Ubuntu noble) diff --git a/Makefile b/Makefile index e8abf09..a1c3bab 100644 --- a/Makefile +++ b/Makefile @@ -19,7 +19,7 @@ GO_BUILD_ARGS_EXTRA := # Vet: https://pkg.go.dev/cmd/vet GO_VET_ARGS := -GO_TEST_ARGS := -v -race +GO_TEST_ARGS := -v -race -count=1 GO_TEST_ARGS_EXTRA := # By default build static binaries diff --git a/openspec/changes/archive/2026-03-24-stream-cache-writes/.openspec.yaml b/openspec/changes/archive/2026-03-24-stream-cache-writes/.openspec.yaml new file mode 100644 index 0000000..0a32546 --- /dev/null +++ b/openspec/changes/archive/2026-03-24-stream-cache-writes/.openspec.yaml @@ -0,0 +1,2 @@ +schema: spec-driven +created: 2026-03-23 diff --git a/openspec/changes/archive/2026-03-24-stream-cache-writes/design.md b/openspec/changes/archive/2026-03-24-stream-cache-writes/design.md new file mode 100644 index 0000000..e017317 --- /dev/null +++ b/openspec/changes/archive/2026-03-24-stream-cache-writes/design.md @@ -0,0 +1,85 @@ +## Context + +The cache middleware in `pkg/pkgproxy/proxy.go` intercepts cache-miss responses by replacing the `http.ResponseWriter` with a `bufferWriter` that tees writes to both the client and an in-memory 
`bytes.Buffer`. After the request completes with status 200, the buffer is flushed to disk via `FileCache.SaveToDisk`, which itself writes to a temp file and renames it into place. + +This means every cacheable response byte is held in RAM until the entire response is received. For large packages (kernel, firmware — 100+ MB), this causes significant memory spikes. Concurrent large downloads multiply the problem. + +## Goals / Non-Goals + +**Goals:** +- Eliminate memory buffering of cache-miss responses by streaming directly to a temp file on disk +- Keep the `FileCache` interface as the owner of path resolution, traversal protection, and atomic write semantics +- Maintain identical client-facing behavior (same response headers, same caching semantics) + +**Non-Goals:** +- Cache eviction, size limits, or TTL policies (separate concern) +- Changing the cache-hit serving path (already efficient — serves from disk) +- Supporting concurrent writes to the same cache key (current behavior: last writer wins via rename, which is acceptable) + +## Decisions + +### 1. Tee to temp file instead of `bytes.Buffer` + +**Decision:** Replace the `bytes.Buffer` in the cache middleware with an `*os.File` (temp file) as the second writer in the `io.MultiWriter`. + +**Alternatives considered:** +- **Hybrid memory/disk (spill at threshold):** Buffer small responses in memory, spill to disk above a configurable size. Rejected: adds complexity (two code paths, arbitrary threshold) for minimal benefit — disk writes happen anyway. +- **Check `Content-Length` to decide:** Use the upstream response header to choose buffer vs file upfront. Rejected: not all upstreams send `Content-Length` (chunked encoding), so a fallback is still needed. + +**Rationale:** The disk write is unavoidable for cached files. Moving it earlier in the pipeline (streaming vs. post-response) keeps memory flat with no behavioral trade-offs. + +### 2. 
Extend `FileCache` with `CreateTempWriter` and `CommitTempFile` + +**Decision:** Add two methods to the `FileCache` interface: +- `CreateTempWriter(uri string) (*os.File, error)` — creates a temp file in the correct cache subdirectory (creating parent dirs as needed), with path traversal protection +- `CommitTempFile(tmpPath string, uri string, mtime time.Time) error` — sets mtime and atomically renames the temp file to the final cache path. This method trusts that the URI was already validated by `CreateTempWriter` and does not perform its own path traversal check + +**Alternatives considered:** +- **Middleware handles file I/O directly:** The middleware calls `GetFilePath()`, creates the temp file, and renames. Rejected: leaks cache internals (path resolution, directory creation, traversal protection) into the middleware. + +**Rationale:** Keeps all filesystem and security logic encapsulated in `FileCache`. The middleware only deals with an `*os.File` writer. + +### 3. Refactor `SaveToDisk` to use the same primitives + +**Decision:** Reimplement `SaveToDisk` internally using `CreateTempWriter` + write + `CommitTempFile`. This keeps `SaveToDisk` available for callers that have an in-memory buffer, while ensuring a single code path for atomic writes. + +### 4. Resilient writer wrapper with lazy temp file creation (cache side) + +**Decision:** Wrap the cache write side in a `resilientWriter` that lazily creates the temp file (via `CreateTempWriter`) on the first `Write()` call. This avoids creating temp files for connection-level failures where `ForwardProxy` returns an error before any response bytes are written. On any write error (including temp file creation failure), the wrapper absorbs all disk write errors from the first failure onward: it returns `len(b), nil` for that write (satisfying `io.MultiWriter`'s short-write check), logs the error, and marks itself as failed — all subsequent writes are also silently discarded (returning `len(b), nil`). 
It exposes a `failed` flag that the post-response code checks — if set, the commit is skipped and the temp file is cleaned up. The wrapper maintains a `bytesWritten` counter that is incremented only by the number of bytes actually written to the underlying temp file (the return value of the successful `os.File.Write` call), not by the `len(b)` returned to callers. This counter is used for the "data was written" check and Content-Length validation. After streaming completes (i.e. `next(c)` returns), the temp file MUST be closed before attempting commit or cleanup. + +The existing `bufferWriter` struct remains as the outermost wrapper around the `io.MultiWriter`. It is needed to satisfy the `http.ResponseWriter` interface (`WriteHeader`, `Flush`, `Hijack`) while routing `Write` calls through the `io.MultiWriter`. The `resilientWriter` and `safeWriter` (Decision 5) are the two legs *inside* the `io.MultiWriter`. The `bufferWriter` SHALL hold a reference to the `safeWriter` so that `Flush()` and `WriteHeader()` become no-ops when `safeWriter.failed` is true — these methods bypass the `io.MultiWriter` and call the raw `ResponseWriter` directly, which could panic or error after a client disconnect. + +**Rationale:** `io.MultiWriter` propagates errors from any underlying writer. Without this wrapper, a disk write error would surface as a response write error to the client and would stop `io.Copy`, preventing the upstream body from being fully consumed. The existing code already follows the principle that cache failures must not affect clients (see the `SaveToDisk` error handling). The resilient wrapper preserves this contract in the streaming path. Lazy creation avoids unnecessary filesystem operations when upstream requests fail at the connection level. + +### 5. Client-side write isolation + +**Decision:** Wrap the client-side `ResponseWriter` in the `io.MultiWriter` with a writer that absorbs write errors starting from the first failure (e.g. 
a lightweight `safeWriter` that, on any write error, returns `len(b), nil` for that write and marks itself as failed so all subsequent writes are also discarded). Combined with the cache-side `resilientWriter` (Decision 4), both legs of the `io.MultiWriter` absorb errors, ensuring `io.Copy` in `ForwardProxy` reads the entire upstream response body regardless of client disconnects or disk failures. + +**Alternatives considered:** +- **Custom "best-effort" MultiWriter that ignores individual writer errors:** A replacement for `io.MultiWriter` that continues writing to remaining writers when one fails. Rejected: more complex to implement and test; wrapping individual writers is simpler and reuses the existing pattern. +- **Detect client disconnect and switch to cache-only reads:** Monitor the client connection state and, on disconnect, stop teeing and drain the upstream body directly to the cache writer. Rejected: adds complexity with connection state monitoring; wrapping both writers achieves the same effect with no conditional logic. + +**Rationale:** `ForwardProxy` already intentionally detaches the upstream context from the client (`context.Background()`), so the upstream connection stays open even when the client disconnects. However, `io.Copy` still stops when _any_ writer in the `io.MultiWriter` returns an error. By absorbing errors on both the cache and client sides, `io.Copy` drains the upstream body to completion. If a client aborts a download and retries moments later, the cache already has (or is completing) the full file from the first request. + +### 6. Content-Length validation before commit + +**Decision:** Before committing the temp file, compare the `resilientWriter`'s bytes-written count against the `Content-Length` response header (if present). If they don't match (fewer or more bytes than expected), skip the commit and clean up the temp file. 
Since both the client and cache writers absorb errors (Decisions 4 and 5), `io.Copy` reads the entire upstream body — a Content-Length mismatch indicates an upstream issue (e.g. truncated connection, or a buggy upstream sending more data than declared), not a client disconnect. + +**Prerequisite:** `Content-Length` must be added to `allowedResponseHeaders` so it is forwarded from the upstream response and accessible to the Cache middleware via `c.Response().Header().Get("Content-Length")` after `next(c)` returns. + +**Alternatives considered:** +- **Check if upstream body was fully consumed:** Attempt a small read from `rsp.Body` after `io.Copy` returns. Works without `Content-Length` but is fragile and hacky. + +**Rationale:** Most package mirrors send `Content-Length` for binary files. The `resilientWriter` already tracks bytes written, so the comparison is nearly free. When `Content-Length` is absent (chunked encoding), the validation is skipped — this is acceptable because chunked transfers are uncommon for large binary packages served by Linux mirrors. + +### 7. Cleanup via `defer` close and remove + +**Decision:** After streaming completes, the `resilientWriter`'s temp file (if created) MUST be closed. If the temp file was created, register cleanup via `defer os.Remove(tmpPath)`. On the success path (200, commit), the file has been renamed so the remove is a harmless no-op (ENOENT). On any failure path (non-200, streaming error, partial response, or resilient writer failure), the temp file is cleaned up automatically. If the temp file was never created (connection-level error), no cleanup is needed. + +## Risks / Trade-offs + +- **Increased disk I/O during streaming:** Writes happen incrementally during the response instead of in one burst at the end. Mitigation: OS page cache absorbs this well; the total bytes written are identical. Lazy temp file creation avoids disk I/O entirely for connection-level failures. 
+- **Orphaned temp files on crash:** If the process is killed mid-stream, the deferred cleanup won't run. Mitigation: Temp files use a recognizable `*.tmp` pattern and live in the cache directory — a startup cleanup sweep could be added later if needed (non-goal for this change). +- **`FileCache` interface change is breaking:** External implementers of `FileCache` must add the new methods. Mitigation: No known external implementers exist. The interface is internal to this project. +- **Disk write failure during streaming:** `io.MultiWriter` propagates errors from any writer, so a disk error would kill the client response and stop upstream consumption. Mitigation: a `resilientWriter` wrapper swallows disk errors (returning `len(b), nil` to satisfy `io.MultiWriter`'s short-write check), sets a `failed` flag, and the commit is skipped — the client response is unaffected. +- **Full upstream consumption on client disconnect:** With both writers absorbing errors, `io.Copy` always reads the entire upstream response even if the client disconnected early. This uses bandwidth and disk I/O for a response no client is waiting for. Mitigation: the cached file serves subsequent requests immediately, avoiding a redundant upstream fetch. The upstream context already has a deadline if one was set on the original request, bounding the maximum consumption time. diff --git a/openspec/changes/archive/2026-03-24-stream-cache-writes/proposal.md b/openspec/changes/archive/2026-03-24-stream-cache-writes/proposal.md new file mode 100644 index 0000000..4f29d7a --- /dev/null +++ b/openspec/changes/archive/2026-03-24-stream-cache-writes/proposal.md @@ -0,0 +1,30 @@ +## Why + +Cache-miss responses are fully buffered in a `bytes.Buffer` before being written to disk. Large packages (kernel packages, firmware blobs, etc. can exceed 100 MB) cause memory spikes proportional to their size, and concurrent downloads multiply the problem. 
Since the data is already being streamed to the client, there is no reason to also hold it in RAM — it can be teed directly to a temp file on disk. + +## What Changes + +- Extend the `FileCache` interface with two new methods (`CreateTempWriter`, `CommitTempFile`) that let callers stream data to a temp file and atomically commit it into the cache. `CommitTempFile` trusts that the URI was already validated by `CreateTempWriter` +- Modify the `Cache` middleware to tee upstream responses to a temp file instead of a `bytes.Buffer` +- Wrap the temp file writer in a resilient writer that lazily creates the temp file on the first write (avoiding filesystem work for connection-level failures) and absorbs all disk write errors from the first failure onward, including temp file creation failure, without affecting the client response +- Wrap the client-side `ResponseWriter` in a safe writer that absorbs write errors starting from the first failure (e.g. client disconnect), ensuring the upstream body is always fully consumed and the cache receives the complete response +- Add `Content-Length` to `allowedResponseHeaders` so it is forwarded from upstream and accessible to the Cache middleware +- Validate bytes written against `Content-Length` before committing to cache, preventing truncated upstream responses from being cached (**bugfix**) +- Refactor `SaveToDisk` internally to use the same temp-file primitives (no behavioral change for existing callers) + +## Capabilities + +### New Capabilities + +- `streaming-cache-write`: Streaming write path for caching upstream responses to disk without buffering the full response in memory + +### Modified Capabilities + +_(none — no existing spec-level requirements change)_ + +## Impact + +- **Code**: `pkg/cache/cache.go` (new interface methods, refactored internals), `pkg/pkgproxy/proxy.go` (cache middleware rewrite for streaming path, `Content-Length` added to `allowedResponseHeaders`), `pkg/pkgproxy/` (new `resilientWriter` and 
`safeWriter` types) +- **Tests**: `pkg/cache/cache_test.go` (new method tests), `pkg/pkgproxy/proxy_test.go` (updated cache-miss tests) +- **API**: `FileCache` interface gains two methods — any external implementers would need to add them (**BREAKING** for out-of-tree implementations, though none are known) +- **Behavior**: Functionally identical from the client's perspective for completed downloads; client disconnects no longer prevent caching — if a client aborts and retries, the file may already be cached from the first (completed upstream) request diff --git a/openspec/changes/archive/2026-03-24-stream-cache-writes/specs/streaming-cache-write/spec.md b/openspec/changes/archive/2026-03-24-stream-cache-writes/specs/streaming-cache-write/spec.md new file mode 100644 index 0000000..66a24dd --- /dev/null +++ b/openspec/changes/archive/2026-03-24-stream-cache-writes/specs/streaming-cache-write/spec.md @@ -0,0 +1,161 @@ +## ADDED Requirements + +### Requirement: FileCache provides temp file creation for streaming writes + +The `FileCache` interface SHALL expose a `CreateTempWriter(uri string) (*os.File, error)` method that creates a temporary file in the correct cache subdirectory for the given URI. The method MUST create parent directories if they do not exist. The method MUST reject URIs that would resolve outside the cache base directory. + +#### Scenario: Temp file created in correct directory +- **WHEN** `CreateTempWriter` is called with a valid URI (e.g. 
`/fedora/releases/42/Everything/x86_64/os/Packages/k/kernel-6.12.rpm`) +- **THEN** a temporary file is created in the same directory where the final cached file would reside, and the file handle is returned + +#### Scenario: Parent directories created automatically +- **WHEN** `CreateTempWriter` is called with a URI whose parent directories do not yet exist in the cache +- **THEN** the required directories are created before the temp file is created + +#### Scenario: Path traversal rejected +- **WHEN** `CreateTempWriter` is called with a URI containing path traversal sequences (e.g. `/../../../etc/passwd`) +- **THEN** an error is returned and no file is created + +### Requirement: FileCache provides atomic commit of temp files into cache + +The `FileCache` interface SHALL expose a `CommitTempFile(tmpPath string, uri string, mtime time.Time) error` method that atomically moves a temp file to the final cache path for the given URI and sets the file modification time. + +#### Scenario: Successful commit +- **WHEN** `CommitTempFile` is called with a valid temp file path, a valid URI, and a timestamp +- **THEN** the temp file is renamed to the final cache path and the file's modification time is set to the given timestamp + +#### Scenario: File becomes visible to IsCached after commit +- **WHEN** `CommitTempFile` completes successfully for a given URI +- **THEN** `IsCached` returns `true` for that URI + +### Requirement: Cache middleware streams responses to disk instead of memory + +The cache middleware SHALL tee cache-miss responses to a temp file on disk instead of an in-memory `bytes.Buffer`. The response MUST be streamed simultaneously to the client and to the temp file via a resilient writer that lazily creates the temp file on the first write and absorbs disk write errors without affecting the client response. + +#### Scenario: Large package cached without memory spike +- **WHEN** a cache-miss request is received for a large cacheable file (e.g. 
200 MB) +- **THEN** the response is streamed to both the client and a temp file, without accumulating the full response in memory + +#### Scenario: Successful response committed to cache +- **WHEN** the upstream returns status 200 for a cacheable file and streaming completes without cache write errors +- **THEN** the temp file is committed to the cache via `CommitTempFile` with the upstream `Last-Modified` timestamp (or current time if absent) + +#### Scenario: Non-200 response cleans up temp file +- **WHEN** the upstream returns a non-200 status for a cacheable file +- **THEN** the temp file is removed and nothing is committed to the cache + +#### Scenario: Upstream error does not create temp file +- **WHEN** the upstream request fails at the network/connection level (e.g. DNS failure, connection reset) and the `ForwardProxy` middleware returns an error before writing any response bytes +- **THEN** no temp file is created and nothing is committed to the cache + +#### Scenario: Client disconnect does not prevent caching +- **WHEN** a client disconnects mid-download of a cacheable file and the upstream response is still streaming +- **THEN** the upstream body continues to be read to completion, the response is fully written to the temp file, and (if all other conditions are met) the temp file is committed to the cache + +#### Scenario: CommitTempFile failure does not affect client response +- **WHEN** the response has been fully streamed to both the client and the temp file, but `CommitTempFile` fails (e.g. rename fails due to permissions) +- **THEN** the error is logged, the temp file is cleaned up, and the client response is unaffected (the client already received the full response) + +#### Scenario: Disk write error does not affect client response +- **WHEN** a disk write error occurs while teeing the response to the temp file (e.g. 
disk full) +- **THEN** the client response continues uninterrupted, the cache write is abandoned, and the temp file is cleaned up + +### Requirement: Resilient writer isolates cache write failures from client responses + +The cache middleware SHALL use a resilient writer wrapper that lazily creates the temp file (via `CreateTempWriter`) on the first `Write()` call. This avoids creating temp files for connection-level failures where no response bytes are written. On any write error (including temp file creation failure), the wrapper MUST log the error, return `len(b), nil` for that write (not propagate the error), and mark itself as failed; all subsequent writes MUST also return `len(b), nil` without attempting the underlying write. The wrapper MUST expose a mechanism (e.g. a `failed` flag) for the post-response code to detect that the cache write failed, so the commit can be skipped. The wrapper MUST maintain a `bytesWritten` counter that is incremented only by the number of bytes actually written to the underlying temp file (i.e. the return value of the successful `os.File.Write` call); this counter is NOT incremented for discarded writes after failure. This counter serves as the "data was written" check in the post-response code (replacing the previous `len(rspBody.Bytes()) > 0` check). Note: the `len(b), nil` return value to `io.MultiWriter` is independent of the `bytesWritten` counter — the return value satisfies the `io.MultiWriter` contract, while the counter tracks actual bytes on disk. The wrapper MUST provide a way to close the underlying temp file after streaming completes and before commit or cleanup. 
+ +#### Scenario: Temp file created lazily on first write +- **WHEN** the first `Write()` call is made to the resilient writer +- **THEN** the temp file is created via `CreateTempWriter` before writing data + +#### Scenario: No temp file created on connection failure +- **WHEN** the upstream request fails at the connection level and `ForwardProxy` returns an error before writing any response bytes +- **THEN** no temp file is created and no filesystem cleanup is needed + +#### Scenario: Write error caught and suppressed +- **WHEN** the underlying temp file write (or lazy creation) returns an error +- **THEN** the resilient writer logs the error, returns `len(b), nil` to the `io.MultiWriter` (since `io.MultiWriter` treats a short write as `io.ErrShortWrite`), and marks itself as failed + +#### Scenario: Subsequent writes discarded after failure +- **WHEN** a write error has previously occurred +- **THEN** all subsequent writes to the resilient writer are silently discarded (returning success) + +#### Scenario: Temp file closed after streaming completes +- **WHEN** `next(c)` returns (streaming is done) +- **THEN** the resilient writer's temp file (if created) is closed before the commit-or-cleanup decision + +#### Scenario: Commit skipped when writer has failed +- **WHEN** the response completes with status 200 but the resilient writer's failed flag is set +- **THEN** `CommitTempFile` is not called and the temp file is cleaned up + +#### Scenario: Commit skipped when no bytes were written +- **WHEN** the response completes with status 200 but the resilient writer's bytes-written count is zero +- **THEN** `CommitTempFile` is not called and the temp file is cleaned up + +### Requirement: Client-side writer isolates cache from client disconnects + +The cache middleware SHALL wrap the client-side `ResponseWriter` in the `io.MultiWriter` with a writer that absorbs write errors starting from the first failure. 
On any write error, the wrapper MUST return `len(b), nil` for that write (not propagate the error) and mark itself as failed; all subsequent writes MUST also return `len(b), nil` without attempting the underlying write. Combined with the cache-side `resilientWriter`, this ensures `io.Copy` in `ForwardProxy` reads the entire upstream response body to completion regardless of whether the client disconnects mid-download. + +#### Scenario: First write error absorbed silently +- **WHEN** a write to the client `ResponseWriter` fails for the first time +- **THEN** the client-side wrapper returns `len(b), nil` for that write (does not propagate the error) and marks itself as failed + +#### Scenario: Subsequent writes discarded after client failure +- **WHEN** the client-side wrapper has previously encountered a write error +- **THEN** all subsequent writes return `len(b), nil` without attempting the underlying write + +#### Scenario: Cache receives full response despite client disconnect +- **WHEN** the client disconnects mid-download but the upstream continues streaming +- **THEN** the `resilientWriter` (cache side) receives every byte from the upstream response body because `io.Copy` is not stopped by client-side write errors + +#### Scenario: Client-side wrapper does not suppress errors before first failure +- **WHEN** writes to the client `ResponseWriter` succeed +- **THEN** the client-side wrapper passes data through transparently with no overhead beyond the function call + +### Requirement: bufferWriter skips Flush and WriteHeader after client disconnect + +The `bufferWriter` struct SHALL hold a reference to the `safeWriter`. When `safeWriter.failed` is true, `bufferWriter.Flush()` and `bufferWriter.WriteHeader()` MUST be no-ops — they SHALL NOT call through to the underlying `ResponseWriter`. This prevents panics or errors from calling methods on a disconnected client's `ResponseWriter`. 
+ +#### Scenario: Flush is a no-op after client disconnect +- **WHEN** the `safeWriter` has marked itself as failed (client disconnected) and `Flush()` is called on the `bufferWriter` +- **THEN** the call returns without invoking the underlying `ResponseWriter`'s `Flush()` + +#### Scenario: WriteHeader is a no-op after client disconnect +- **WHEN** the `safeWriter` has marked itself as failed (client disconnected) and `WriteHeader()` is called on the `bufferWriter` +- **THEN** the call returns without invoking the underlying `ResponseWriter`'s `WriteHeader()` + +#### Scenario: Flush works normally before client disconnect +- **WHEN** the `safeWriter` has not failed and `Flush()` is called on the `bufferWriter` +- **THEN** the call delegates to the underlying `ResponseWriter`'s `Flush()` as normal + +### Requirement: Cache middleware validates Content-Length before committing + +The cache middleware SHALL compare the resilient writer's bytes-written count against the `Content-Length` response header (forwarded from upstream via `allowedResponseHeaders`) before committing the temp file. Since both the client and cache writers absorb errors, `io.Copy` reads the entire upstream body — a Content-Length mismatch indicates an upstream issue, not a client disconnect: either a truncated connection (e.g. the upstream server closed the connection early or a network interruption occurred between the proxy and upstream) or a buggy upstream sending more data than declared. If `Content-Length` is present and the counts do not match in either direction, the commit MUST be skipped and the temp file cleaned up. If `Content-Length` is absent, this validation SHALL be skipped. + +#### Scenario: Complete download committed +- **WHEN** the upstream response has `Content-Length: 1000` and the resilient writer has written 1000 bytes +- **THEN** the temp file is committed to the cache + +#### Scenario: Truncated upstream response rejected +- **WHEN** the upstream response has `Content-Length: 1000` but the resilient writer has only written 500 bytes (e.g. 
upstream connection dropped mid-stream) +- **THEN** `CommitTempFile` is not called, the temp file is cleaned up, and a warning is logged + +#### Scenario: Oversized upstream response rejected +- **WHEN** the upstream response has `Content-Length: 1000` but the resilient writer has written more than 1000 bytes (e.g. buggy upstream) +- **THEN** `CommitTempFile` is not called, the temp file is cleaned up, and a warning is logged + +#### Scenario: Missing Content-Length skips validation +- **WHEN** the upstream response does not include a `Content-Length` header (e.g. chunked encoding) +- **THEN** the Content-Length validation is skipped and the commit proceeds based on the other checks (status 200, bytes written > 0, writer not failed) + +#### Scenario: Malformed Content-Length skips validation +- **WHEN** the upstream response includes a `Content-Length` header that cannot be parsed as an integer +- **THEN** the Content-Length validation is skipped and the commit proceeds based on the other checks (status 200, bytes written > 0, writer not failed) + +### Requirement: SaveToDisk remains functional + +The existing `SaveToDisk(uri string, buf *bytes.Buffer, mtime time.Time) error` method SHALL continue to work for callers that have an in-memory buffer. Its internal implementation MAY be refactored to use `CreateTempWriter` and `CommitTempFile`. + +#### Scenario: SaveToDisk still works with buffer input +- **WHEN** `SaveToDisk` is called with a URI, a `bytes.Buffer`, and a timestamp +- **THEN** the file is written to the cache at the correct path with the correct modification time, identical to current behavior diff --git a/openspec/changes/archive/2026-03-24-stream-cache-writes/tasks.md b/openspec/changes/archive/2026-03-24-stream-cache-writes/tasks.md new file mode 100644 index 0000000..37dc6f4 --- /dev/null +++ b/openspec/changes/archive/2026-03-24-stream-cache-writes/tasks.md @@ -0,0 +1,31 @@ +## 1. 
FileCache Interface & Implementation + +- [x] 1.1 Add `CreateTempWriter(uri string) (*os.File, error)` to the `FileCache` interface and implement it in `pkg/cache/cache.go` — creates parent dirs, validates path traversal, returns a temp file handle +- [x] 1.2 Add `CommitTempFile(tmpPath string, uri string, mtime time.Time) error` to the `FileCache` interface and implement it in `pkg/cache/cache.go` — sets mtime via `os.Chtimes`, renames temp file to final path +- [x] 1.3 Refactor `SaveToDisk` to use `CreateTempWriter` and `CommitTempFile` internally +- [x] 1.4 Add unit tests for `CreateTempWriter` (valid URI, missing parent dirs, path traversal rejection) +- [x] 1.5 Add unit tests for `CommitTempFile` (successful commit, `IsCached` returns true after commit) — no separate path traversal test, since `CommitTempFile` trusts URIs already validated by `CreateTempWriter` (see design Decision 2) +- [x] 1.6 Verify existing `SaveToDisk` tests still pass + +## 2. Prerequisite: Forward Content-Length + +- [x] 2.0 Add `Content-Length` to `allowedResponseHeaders` in `pkg/pkgproxy/proxy.go` so the Cache middleware can read it after `next(c)` returns + +## 3. 
Cache Middleware Streaming + +- [x] 3.1 Implement a `resilientWriter` struct in `pkg/pkgproxy/` that lazily creates a temp file (via `CreateTempWriter`) on the first `Write()` call; on any write error (including creation failure), returns `len(b), nil` for that write, logs the error, and marks itself as failed so all subsequent writes are also discarded (returning `len(b), nil`); exposes a `failed` flag, tracks bytes written, and provides a `Close()` method for the underlying temp file +- [x] 3.2 Implement a `safeWriter` struct in `pkg/pkgproxy/` that wraps an `io.Writer` and absorbs write errors starting from the first failure — on any write error, returns `len(b), nil` for that write and marks itself as failed so all subsequent writes are also discarded (returning `len(b), nil`) — isolating the cache from client disconnects +- [x] 3.3 Modify the `Cache` middleware in `pkg/pkgproxy/proxy.go` to use the `resilientWriter` (cache side) and `safeWriter` (client side) instead of allocating a `bytes.Buffer` for cache-miss responses +- [x] 3.4 Wire the `io.MultiWriter` with both the `safeWriter`-wrapped client `ResponseWriter` and the `resilientWriter`, keeping the existing `bufferWriter` wrapper around the `io.MultiWriter`. 
Give `bufferWriter` a reference to the `safeWriter` so that `Flush()` and `WriteHeader()` become no-ops when `safeWriter.failed` is true +- [x] 3.5 After `next(c)` returns, close the `resilientWriter`'s temp file (if created), then defer its removal — after a successful commit the file has been renamed so the deferred remove is a harmless ENOENT; on any failure path the deferred remove ensures cleanup +- [x] 3.6 Replace the post-response `SaveToDisk` call with `CommitTempFile` (on status 200, `resilientWriter.bytesWritten > 0`, `resilientWriter.failed` is false, and Content-Length matches if present); log and continue on `CommitTempFile` errors (same as current `SaveToDisk` error handling) +- [x] 3.7 Add Content-Length validation: parse the response `Content-Length` header as an integer; if present and valid, compare against `resilientWriter.bytesWritten` before committing — skip commit and log warning on mismatch; if absent or non-numeric, skip validation +- [x] 3.8 Add unit tests for `resilientWriter` (lazy creation on first write, no temp file on zero writes, successful write, error caught and suppressed, subsequent writes discarded, bytes-written tracking, close) +- [x] 3.9 Add unit tests for `safeWriter` (passthrough on success, error absorbed after first failure, subsequent writes discarded) +- [x] 3.10 Update proxy unit tests to cover the streaming cache-write path (cache miss with 200, non-200 cleanup, connection error creates no temp file, disk write error does not affect client, client disconnect does not prevent caching, truncated upstream rejected via Content-Length mismatch, missing Content-Length skips validation) + +## 4. 
Validation + +- [x] 4.1 Run `make ci-check` — all linting, vulnerability checks, and tests pass +- [x] 4.2 Run `make e2e` — end-to-end tests pass with the new streaming path +- [x] 4.3 Update `CHANGELOG.md` unreleased section with the change diff --git a/openspec/specs/streaming-cache-write/spec.md b/openspec/specs/streaming-cache-write/spec.md new file mode 100644 index 0000000..66a24dd --- /dev/null +++ b/openspec/specs/streaming-cache-write/spec.md @@ -0,0 +1,161 @@ +## ADDED Requirements + +### Requirement: FileCache provides temp file creation for streaming writes + +The `FileCache` interface SHALL expose a `CreateTempWriter(uri string) (*os.File, error)` method that creates a temporary file in the correct cache subdirectory for the given URI. The method MUST create parent directories if they do not exist. The method MUST reject URIs that would resolve outside the cache base directory. + +#### Scenario: Temp file created in correct directory +- **WHEN** `CreateTempWriter` is called with a valid URI (e.g. `/fedora/releases/42/Everything/x86_64/os/Packages/k/kernel-6.12.rpm`) +- **THEN** a temporary file is created in the same directory where the final cached file would reside, and the file handle is returned + +#### Scenario: Parent directories created automatically +- **WHEN** `CreateTempWriter` is called with a URI whose parent directories do not yet exist in the cache +- **THEN** the required directories are created before the temp file is created + +#### Scenario: Path traversal rejected +- **WHEN** `CreateTempWriter` is called with a URI containing path traversal sequences (e.g. 
`/../../../etc/passwd`) +- **THEN** an error is returned and no file is created + +### Requirement: FileCache provides atomic commit of temp files into cache + +The `FileCache` interface SHALL expose a `CommitTempFile(tmpPath string, uri string, mtime time.Time) error` method that atomically moves a temp file to the final cache path for the given URI and sets the file modification time. + +#### Scenario: Successful commit +- **WHEN** `CommitTempFile` is called with a valid temp file path, a valid URI, and a timestamp +- **THEN** the temp file is renamed to the final cache path and the file's modification time is set to the given timestamp + +#### Scenario: File becomes visible to IsCached after commit +- **WHEN** `CommitTempFile` completes successfully for a given URI +- **THEN** `IsCached` returns `true` for that URI + +### Requirement: Cache middleware streams responses to disk instead of memory + +The cache middleware SHALL tee cache-miss responses to a temp file on disk instead of an in-memory `bytes.Buffer`. The response MUST be streamed simultaneously to the client and to the temp file via a resilient writer that lazily creates the temp file on the first write and absorbs disk write errors without affecting the client response. + +#### Scenario: Large package cached without memory spike +- **WHEN** a cache-miss request is received for a large cacheable file (e.g. 
200 MB) +- **THEN** the response is streamed to both the client and a temp file, without accumulating the full response in memory + +#### Scenario: Successful response committed to cache +- **WHEN** the upstream returns status 200 for a cacheable file and streaming completes without cache write errors +- **THEN** the temp file is committed to the cache via `CommitTempFile` with the upstream `Last-Modified` timestamp (or current time if absent) + +#### Scenario: Non-200 response cleans up temp file +- **WHEN** the upstream returns a non-200 status for a cacheable file +- **THEN** the temp file is removed and nothing is committed to the cache + +#### Scenario: Upstream error does not create temp file +- **WHEN** the upstream request fails at the network/connection level (e.g. DNS failure, connection reset) and the `ForwardProxy` middleware returns an error before writing any response bytes +- **THEN** no temp file is created and nothing is committed to the cache + +#### Scenario: Client disconnect does not prevent caching +- **WHEN** a client disconnects mid-download of a cacheable file and the upstream response is still streaming +- **THEN** the upstream body continues to be read to completion, the response is fully written to the temp file, and (if all other conditions are met) the temp file is committed to the cache + +#### Scenario: CommitTempFile failure does not affect client response +- **WHEN** the response has been fully streamed to both the client and the temp file, but `CommitTempFile` fails (e.g. rename fails due to permissions) +- **THEN** the error is logged, the temp file is cleaned up, and the client response is unaffected (the client already received the full response) + +#### Scenario: Disk write error does not affect client response +- **WHEN** a disk write error occurs while teeing the response to the temp file (e.g. 
disk full) +- **THEN** the client response continues uninterrupted, the cache write is abandoned, and the temp file is cleaned up + +### Requirement: Resilient writer isolates cache write failures from client responses + +The cache middleware SHALL use a resilient writer wrapper that lazily creates the temp file (via `CreateTempWriter`) on the first `Write()` call. This avoids creating temp files for connection-level failures where no response bytes are written. On any write error (including temp file creation failure), the wrapper MUST log the error, return `len(b), nil` for that write (not propagate the error), and mark itself as failed; all subsequent writes MUST also return `len(b), nil` without attempting the underlying write. The wrapper MUST expose a mechanism (e.g. a `failed` flag) for the post-response code to detect that the cache write failed, so the commit can be skipped. The wrapper MUST maintain a `bytesWritten` counter that is incremented only by the number of bytes actually written to the underlying temp file (i.e. the return value of the successful `os.File.Write` call); this counter is NOT incremented for discarded writes after failure. This counter serves as the "data was written" check in the post-response code (replacing the previous `len(rspBody.Bytes()) > 0` check). Note: the `len(b), nil` return value to `io.MultiWriter` is independent of the `bytesWritten` counter — the return value satisfies the `io.MultiWriter` contract, while the counter tracks actual bytes on disk. The wrapper MUST provide a way to close the underlying temp file after streaming completes and before commit or cleanup. 
+ +#### Scenario: Temp file created lazily on first write +- **WHEN** the first `Write()` call is made to the resilient writer +- **THEN** the temp file is created via `CreateTempWriter` before writing data + +#### Scenario: No temp file created on connection failure +- **WHEN** the upstream request fails at the connection level and `ForwardProxy` returns an error before writing any response bytes +- **THEN** no temp file is created and no filesystem cleanup is needed + +#### Scenario: Write error caught and suppressed +- **WHEN** the underlying temp file write (or lazy creation) returns an error +- **THEN** the resilient writer logs the error, returns `len(b), nil` to the `io.MultiWriter` (since `io.MultiWriter` treats a short write as `io.ErrShortWrite`), and marks itself as failed + +#### Scenario: Subsequent writes discarded after failure +- **WHEN** a write error has previously occurred +- **THEN** all subsequent writes to the resilient writer are silently discarded (returning success) + +#### Scenario: Temp file closed after streaming completes +- **WHEN** `next(c)` returns (streaming is done) +- **THEN** the resilient writer's temp file (if created) is closed before the commit-or-cleanup decision + +#### Scenario: Commit skipped when writer has failed +- **WHEN** the response completes with status 200 but the resilient writer's failed flag is set +- **THEN** `CommitTempFile` is not called and the temp file is cleaned up + +#### Scenario: Commit skipped when no bytes were written +- **WHEN** the response completes with status 200 but the resilient writer's bytes-written count is zero +- **THEN** `CommitTempFile` is not called and the temp file is cleaned up + +### Requirement: Client-side writer isolates cache from client disconnects + +The cache middleware SHALL wrap the client-side `ResponseWriter` in the `io.MultiWriter` with a writer that absorbs write errors starting from the first failure. 
On any write error, the wrapper MUST return `len(b), nil` for that write (not propagate the error) and mark itself as failed; all subsequent writes MUST also return `len(b), nil` without attempting the underlying write. Combined with the cache-side `resilientWriter`, this ensures `io.Copy` in `ForwardProxy` reads the entire upstream response body to completion regardless of whether the client disconnects mid-download. + +#### Scenario: First write error absorbed silently +- **WHEN** a write to the client `ResponseWriter` fails for the first time +- **THEN** the client-side wrapper returns `len(b), nil` for that write (does not propagate the error) and marks itself as failed + +#### Scenario: Subsequent writes discarded after client failure +- **WHEN** the client-side wrapper has previously encountered a write error +- **THEN** all subsequent writes return `len(b), nil` without attempting the underlying write + +#### Scenario: Cache receives full response despite client disconnect +- **WHEN** the client disconnects mid-download but the upstream continues streaming +- **THEN** the `resilientWriter` (cache side) receives every byte from the upstream response body because `io.Copy` is not stopped by client-side write errors + +#### Scenario: Client-side wrapper does not suppress errors before first failure +- **WHEN** writes to the client `ResponseWriter` succeed +- **THEN** the client-side wrapper passes data through transparently with no overhead beyond the function call + +### Requirement: bufferWriter skips Flush and WriteHeader after client disconnect + +The `bufferWriter` struct SHALL hold a reference to the `safeWriter`. When `safeWriter.failed` is true, `bufferWriter.Flush()` and `bufferWriter.WriteHeader()` MUST be no-ops — they SHALL NOT call through to the underlying `ResponseWriter`. This prevents panics or errors from calling methods on a disconnected client's `ResponseWriter`. 
+ +#### Scenario: Flush is a no-op after client disconnect +- **WHEN** the `safeWriter` has marked itself as failed (client disconnected) and `Flush()` is called on the `bufferWriter` +- **THEN** the call returns without invoking the underlying `ResponseWriter`'s `Flush()` + +#### Scenario: WriteHeader is a no-op after client disconnect +- **WHEN** the `safeWriter` has marked itself as failed (client disconnected) and `WriteHeader()` is called on the `bufferWriter` +- **THEN** the call returns without invoking the underlying `ResponseWriter`'s `WriteHeader()` + +#### Scenario: Flush works normally before client disconnect +- **WHEN** the `safeWriter` has not failed and `Flush()` is called on the `bufferWriter` +- **THEN** the call delegates to the underlying `ResponseWriter`'s `Flush()` as normal + +### Requirement: Cache middleware validates Content-Length before committing + +The cache middleware SHALL compare the resilient writer's bytes-written count against the `Content-Length` response header (forwarded from upstream via `allowedResponseHeaders`) before committing the temp file. Since both the client and cache writers absorb errors, `io.Copy` reads the entire upstream body — a Content-Length mismatch indicates the upstream connection was truncated (e.g. upstream server closed the connection early or a network interruption between the proxy and upstream). If `Content-Length` is present and the counts do not match, the commit MUST be skipped and the temp file cleaned up. If `Content-Length` is absent, this validation SHALL be skipped. + +#### Scenario: Complete download committed +- **WHEN** the upstream response has `Content-Length: 1000` and the resilient writer has written 1000 bytes +- **THEN** the temp file is committed to the cache + +#### Scenario: Truncated upstream response rejected +- **WHEN** the upstream response has `Content-Length: 1000` but the resilient writer has only written 500 bytes (e.g. 
upstream connection dropped mid-stream) +- **THEN** `CommitTempFile` is not called, the temp file is cleaned up, and a warning is logged + +#### Scenario: Oversized upstream response rejected +- **WHEN** the upstream response has `Content-Length: 1000` but the resilient writer has written more than 1000 bytes (e.g. buggy upstream) +- **THEN** `CommitTempFile` is not called, the temp file is cleaned up, and a warning is logged + +#### Scenario: Missing Content-Length skips validation +- **WHEN** the upstream response does not include a `Content-Length` header (e.g. chunked encoding) +- **THEN** the Content-Length validation is skipped and the commit proceeds based on the other checks (status 200, bytes written > 0, writer not failed) + +#### Scenario: Malformed Content-Length skips validation +- **WHEN** the upstream response includes a `Content-Length` header that cannot be parsed as an integer +- **THEN** the Content-Length validation is skipped and the commit proceeds based on the other checks (status 200, bytes written > 0, writer not failed) + +### Requirement: SaveToDisk remains functional + +The existing `SaveToDisk(uri string, buf *bytes.Buffer, mtime time.Time) error` method SHALL continue to work for callers that have an in-memory buffer. Its internal implementation MAY be refactored to use `CreateTempWriter` and `CommitTempFile`. + +#### Scenario: SaveToDisk still works with buffer input +- **WHEN** `SaveToDisk` is called with a URI, a `bytes.Buffer`, and a timestamp +- **THEN** the file is written to the cache at the correct path with the correct modification time, identical to current behavior diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go index 1745e0c..5fad759 100644 --- a/pkg/cache/cache.go +++ b/pkg/cache/cache.go @@ -16,6 +16,16 @@ import ( ) type FileCache interface { + // Create a temp file in the cache directory for the given URI, suitable + // for streaming writes. Creates parent directories and validates path + // traversal. 
The caller is responsible for closing the returned file. + CreateTempWriter(uri string) (*os.File, error) + + // Atomically commit a temp file into the cache by setting its mtime + // and renaming it to the final path for the given URI. The URI is + // validated again; paths outside the cache base are rejected. + CommitTempFile(tmpPath string, uri string, mtime time.Time) error + + // Remove cached file for given URL DeleteFile(string) error @@ -118,26 +128,55 @@ func (c *cache) IsCached(uri string) bool { return err == nil } -// Saves buffer to file -func (c *cache) SaveToDisk(uri string, buffer *bytes.Buffer, fileTime time.Time) error { +// CreateTempWriter creates a temporary file in the correct cache subdirectory +// for the given URI, creating parent directories as needed. +func (c *cache) CreateTempWriter(uri string) (*os.File, error) { filePath, err := c.resolvedFilePath(uri) if err != nil { - return err + return nil, err } - if _, err := os.Stat(filepath.Dir(filePath)); errors.Is(err, os.ErrNotExist) { - if err := os.MkdirAll(filepath.Dir(filePath), 0o750); err != nil { - return err + dir := filepath.Dir(filePath) + if _, err := os.Stat(dir); errors.Is(err, os.ErrNotExist) { + if err := os.MkdirAll(dir, 0o750); err != nil { + return nil, err } } - tmpFile, err := os.CreateTemp(filepath.Dir(filePath), "*.tmp") + return os.CreateTemp(dir, "*.tmp") +} + +// CommitTempFile atomically moves a temp file to the final cache path for the +// given URI and sets the file modification time. The URI is validated again; +// an error is returned if it resolves outside the cache base directory. 
+func (c *cache) CommitTempFile(tmpPath string, uri string, mtime time.Time) error { + filePath, err := c.resolvedFilePath(uri) + if err != nil { + return err + } + + info, err := os.Stat(tmpPath) + if err != nil { + return err + } + + if err := os.Chtimes(tmpPath, time.Now().Local(), mtime); err != nil { + return err + } + + slog.Info("cache write", "path", filePath, "bytes", info.Size()) + return os.Rename(tmpPath, filePath) +} + +// Saves buffer to file +func (c *cache) SaveToDisk(uri string, buffer *bytes.Buffer, fileTime time.Time) error { + tmpFile, err := c.CreateTempWriter(uri) if err != nil { return err } tmpPath := tmpFile.Name() - size, err := tmpFile.ReadFrom(buffer) + _, err = tmpFile.ReadFrom(buffer) closeErr := tmpFile.Close() if err != nil { _ = os.Remove(tmpPath) @@ -147,16 +186,8 @@ func (c *cache) SaveToDisk(uri string, buffer *bytes.Buffer, fileTime time.Time) _ = os.Remove(tmpPath) return closeErr } - slog.Info("cache write", "path", filePath, "bytes", size) - - // set modified time to given timestamp - if err := os.Chtimes(tmpPath, time.Now().Local(), fileTime); err != nil { - _ = os.Remove(tmpPath) - return err - } - // atomically move into place so IsCached never observes a partial file - if err := os.Rename(tmpPath, filePath); err != nil { + if err := c.CommitTempFile(tmpPath, uri, fileTime); err != nil { _ = os.Remove(tmpPath) return err } diff --git a/pkg/cache/cache_test.go b/pkg/cache/cache_test.go index 36253d0..54898fa 100644 --- a/pkg/cache/cache_test.go +++ b/pkg/cache/cache_test.go @@ -3,9 +3,14 @@ package cache import ( + "bytes" + "os" + "path/filepath" "testing" + "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestCacheNew(t *testing.T) { @@ -89,3 +94,117 @@ func TestResolvedFilePath(t *testing.T) { }) } } + +func TestCreateTempWriter(t *testing.T) { + t.Run("valid URI creates temp file", func(t *testing.T) { + baseDir := t.TempDir() + c := New(&CacheConfig{BasePath: baseDir}) + + f, 
err := c.CreateTempWriter("/myrepo/subdir/package.rpm") + require.NoError(t, err) + defer f.Close() + defer os.Remove(f.Name()) + + // Temp file should exist in the correct directory + assert.DirExists(t, filepath.Join(baseDir, "myrepo", "subdir")) + assert.FileExists(t, f.Name()) + assert.Equal(t, filepath.Join(baseDir, "myrepo", "subdir"), filepath.Dir(f.Name())) + }) + + t.Run("missing parent dirs created", func(t *testing.T) { + baseDir := t.TempDir() + c := New(&CacheConfig{BasePath: baseDir}) + + f, err := c.CreateTempWriter("/deep/nested/path/package.rpm") + require.NoError(t, err) + defer f.Close() + defer os.Remove(f.Name()) + + assert.DirExists(t, filepath.Join(baseDir, "deep", "nested", "path")) + }) + + t.Run("path traversal rejected", func(t *testing.T) { + baseDir := t.TempDir() + c := New(&CacheConfig{BasePath: baseDir}) + + f, err := c.CreateTempWriter("/../../../etc/passwd") + assert.Error(t, err) + assert.Nil(t, f) + }) +} + +func TestCommitTempFile(t *testing.T) { + t.Run("successful commit", func(t *testing.T) { + baseDir := t.TempDir() + c := New(&CacheConfig{BasePath: baseDir}) + + // Create a temp file via CreateTempWriter + f, err := c.CreateTempWriter("/myrepo/package.rpm") + require.NoError(t, err) + _, err = f.WriteString("test content") + require.NoError(t, err) + tmpPath := f.Name() + require.NoError(t, f.Close()) + + mtime := time.Date(2025, 1, 15, 10, 0, 0, 0, time.UTC) + err = c.CommitTempFile(tmpPath, "/myrepo/package.rpm", mtime) + require.NoError(t, err) + + // Final file should exist with correct content and mtime + finalPath := filepath.Join(baseDir, "myrepo", "package.rpm") + data, err := os.ReadFile(finalPath) + require.NoError(t, err) + assert.Equal(t, "test content", string(data)) + + info, err := os.Stat(finalPath) + require.NoError(t, err) + assert.Equal(t, mtime, info.ModTime().UTC()) + }) + + t.Run("IsCached returns true after commit", func(t *testing.T) { + baseDir := t.TempDir() + c := New(&CacheConfig{BasePath: 
baseDir}) + + assert.False(t, c.IsCached("/myrepo/package.rpm")) + + f, err := c.CreateTempWriter("/myrepo/package.rpm") + require.NoError(t, err) + _, err = f.WriteString("data") + require.NoError(t, err) + tmpPath := f.Name() + require.NoError(t, f.Close()) + + require.NoError(t, c.CommitTempFile(tmpPath, "/myrepo/package.rpm", time.Now())) + assert.True(t, c.IsCached("/myrepo/package.rpm")) + }) + + t.Run("path traversal rejected", func(t *testing.T) { + baseDir := t.TempDir() + c := New(&CacheConfig{BasePath: baseDir}) + + err := c.CommitTempFile("/tmp/fake.tmp", "/../../../etc/passwd", time.Now()) + assert.Error(t, err) + }) +} + +func TestSaveToDiskStillWorks(t *testing.T) { + baseDir := t.TempDir() + c := New(&CacheConfig{BasePath: baseDir}) + + buf := bytes.NewBufferString("buffered content") + mtime := time.Date(2025, 6, 1, 12, 0, 0, 0, time.UTC) + + err := c.SaveToDisk("/myrepo/path/package.rpm", buf, mtime) + require.NoError(t, err) + + finalPath := filepath.Join(baseDir, "myrepo", "path", "package.rpm") + data, err := os.ReadFile(finalPath) + require.NoError(t, err) + assert.Equal(t, "buffered content", string(data)) + + info, err := os.Stat(finalPath) + require.NoError(t, err) + assert.Equal(t, mtime, info.ModTime().UTC()) + + assert.True(t, c.IsCached("/myrepo/path/package.rpm")) +} diff --git a/pkg/pkgproxy/bufferwriter.go b/pkg/pkgproxy/bufferwriter.go index 4791b49..20b6c4e 100644 --- a/pkg/pkgproxy/bufferwriter.go +++ b/pkg/pkgproxy/bufferwriter.go @@ -10,9 +10,13 @@ import ( type bufferWriter struct { io.Writer http.ResponseWriter + safe *safeWriter } func (w *bufferWriter) WriteHeader(code int) { + if w.safe != nil && w.safe.failed { + return + } w.ResponseWriter.WriteHeader(code) } @@ -21,6 +25,9 @@ func (w *bufferWriter) Write(b []byte) (int, error) { } func (w *bufferWriter) Flush() { + if w.safe != nil && w.safe.failed { + return + } w.ResponseWriter.(http.Flusher).Flush() } diff --git a/pkg/pkgproxy/proxy.go b/pkg/pkgproxy/proxy.go index 
7542cef..abd9f98 100644 --- a/pkg/pkgproxy/proxy.go +++ b/pkg/pkgproxy/proxy.go @@ -13,6 +13,7 @@ import ( "os" "path" "path/filepath" + "strconv" "strings" "time" @@ -82,6 +83,7 @@ var ( "Allow", "Content-Encoding", "Content-Language", + "Content-Length", "Content-Type", "Cache-Control", "Date", @@ -150,7 +152,7 @@ func New(config *PkgProxyConfig) PkgProxy { func (pp *pkgProxy) Cache(next echo.HandlerFunc) echo.HandlerFunc { return func(c *echo.Context) error { var repoCache cache.FileCache - var rspBody *bytes.Buffer + var rw *resilientWriter // the request URI might be changed later, keep the original value uri := strings.Clone(c.Request().RequestURI) @@ -184,34 +186,77 @@ func (pp *pkgProxy) Cache(next echo.HandlerFunc) echo.HandlerFunc { if c.Request().Method == "DELETE" { return c.JSON(http.StatusNotFound, map[string]string{"message": "Not Found"}) } - // if not in cache write response body to buffer - rspBody = new(bytes.Buffer) + // Stream response to both client and cache temp file + rw = newResilientWriter(repoCache, uri) if resp, _ := echo.UnwrapResponse(c.Response()); resp != nil { - bodyWriter := io.MultiWriter(resp.ResponseWriter, rspBody) + sw := newSafeWriter(resp.ResponseWriter) + bodyWriter := io.MultiWriter(sw, rw) writer := &bufferWriter{ Writer: bodyWriter, - ResponseWriter: resp.ResponseWriter} + ResponseWriter: resp.ResponseWriter, + safe: sw, + } resp.ResponseWriter = writer } } } } + // Ensure temp file cleanup runs regardless of next(c) outcome. + // The error handler middleware above Cache may write an error response + // through the wrapped ResponseWriter after Cache returns, so we need + // cleanup even on error paths. + if rw != nil { + defer func() { + // Disable prevents writes from error handlers that run + // after Cache returns (e.g. the 502 JSON body). + rw.Disable() + // Close before Remove to avoid leaking file descriptors and + // to ensure removal succeeds on all platforms. 
+ _ = rw.Close() + if tmpPath := rw.TmpPath(); tmpPath != "" { + _ = os.Remove(tmpPath) + } + }() + } + if err := next(c); err != nil { return err } - if pp.isRepositoryRequest(uri) { + if pp.isRepositoryRequest(uri) && rw != nil { + // Close temp file before commit or cleanup. A close error means the + // file may not have been fully flushed, so skip the commit. + if err := rw.Close(); err != nil { + slog.Error("cache temp file close failed", "request_id", requestID(c), "uri", uri, "error", err) + rw.failed = true + } + resp, _ := echo.UnwrapResponse(c.Response()) - if repoCache.IsCacheCandidate(uri) && !repoCache.IsCached(uri) && resp != nil && (resp.Status == 200) && len(rspBody.Bytes()) > 0 { - timestamp := time.Now().Local() - if c.Response().Header().Get("Last-Modified") != "" { - timestamp, _ = http.ParseTime(c.Response().Header().Get("Last-Modified")) + if repoCache.IsCacheCandidate(uri) && !repoCache.IsCached(uri) && resp != nil && resp.Status == 200 && rw.bytesWritten > 0 && !rw.failed { + // Content-Length validation + commitOK := true + if clHeader := c.Response().Header().Get("Content-Length"); clHeader != "" { + if expectedLen, err := strconv.ParseInt(clHeader, 10, 64); err == nil { + if rw.bytesWritten != expectedLen { + slog.Warn("cache write skipped: Content-Length mismatch", + "request_id", requestID(c), "uri", uri, + "expected", expectedLen, "actual", rw.bytesWritten) + commitOK = false + } + } } - // save buffer to disk - if err := repoCache.SaveToDisk(uri, rspBody, timestamp); err != nil { - // don't fail request if we cannot write to cache - slog.Error("cache write failed", "request_id", requestID(c), "uri", uri, "error", err) + + if commitOK { + timestamp := time.Now().Local() + if c.Response().Header().Get("Last-Modified") != "" { + timestamp, _ = http.ParseTime(c.Response().Header().Get("Last-Modified")) + } + // CommitTempFile renames the file; the deferred Remove becomes a harmless ENOENT + if err := repoCache.CommitTempFile(rw.TmpPath(), 
uri, timestamp); err != nil { + // don't fail request if we cannot write to cache + slog.Error("cache commit failed", "request_id", requestID(c), "uri", uri, "error", err) + } } } } diff --git a/pkg/pkgproxy/proxy_test.go b/pkg/pkgproxy/proxy_test.go index 8ed6b5e..81753f0 100644 --- a/pkg/pkgproxy/proxy_test.go +++ b/pkg/pkgproxy/proxy_test.go @@ -606,6 +606,149 @@ func TestForwardProxyQueryStringPreserved(t *testing.T) { assert.Equal(t, "age=300&arch=x86_64", receivedRawQuery) } +// --- Streaming cache-write tests --- + +func TestCacheMissNon200Cleanup(t *testing.T) { + // Upstream returns 404 — no file should be cached + upstream := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusNotFound) + fmt.Fprint(w, "not found") + })) + defer upstream.Close() + + pp, cacheDir := newTestProxy(t, []string{upstream.URL + "/"}) + app := newTestApp(pp) + + req := httptest.NewRequest(http.MethodGet, "/testrepo/path/package.rpm", nil) + rec := httptest.NewRecorder() + app.ServeHTTP(rec, req) + + assert.Equal(t, http.StatusNotFound, rec.Code) + + // No temp files or cached files should exist + cachedPath := filepath.Join(cacheDir, "testrepo", "path", "package.rpm") + _, err := os.Stat(cachedPath) + assert.True(t, os.IsNotExist(err)) +} + +func TestCacheConnectionErrorNoTempFile(t *testing.T) { + // Bind to a free port then close it — connection will fail + l, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + addr := l.Addr().String() + l.Close() + + pp, cacheDir := newTestProxy(t, []string{"http://" + addr + "/"}) + app := newTestApp(pp) + + req := httptest.NewRequest(http.MethodGet, "/testrepo/path/package.rpm", nil) + rec := httptest.NewRecorder() + app.ServeHTTP(rec, req) + + assert.Equal(t, http.StatusBadGateway, rec.Code) + + // No temp files should be created + entries, err := os.ReadDir(filepath.Join(cacheDir, "testrepo", "path")) + if err != nil { + assert.True(t, os.IsNotExist(err), 
"unexpected ReadDir error: %v", err) + } else { + assert.Empty(t, entries) + } +} + +func TestCacheContentLengthMismatchRejectsCommit(t *testing.T) { + // Upstream claims Content-Length: 100 but only sends 5 bytes + upstream := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/octet-stream") + w.Header().Set("Content-Length", "100") + // Bypass Go's automatic Content-Length by hijacking + // Actually, just write less — Go's httptest won't enforce CL on server side + w.WriteHeader(http.StatusOK) + fmt.Fprint(w, "short") + })) + defer upstream.Close() + + pp, cacheDir := newTestProxy(t, []string{upstream.URL + "/"}) + app := newTestApp(pp) + + req := httptest.NewRequest(http.MethodGet, "/testrepo/path/package.rpm", nil) + rec := httptest.NewRecorder() + app.ServeHTTP(rec, req) + + assert.Equal(t, http.StatusOK, rec.Code) + + // File should NOT be cached due to Content-Length mismatch + cachedPath := filepath.Join(cacheDir, "testrepo", "path", "package.rpm") + _, err := os.Stat(cachedPath) + assert.True(t, os.IsNotExist(err)) +} + +func TestCacheMissingContentLengthSkipsValidation(t *testing.T) { + // Upstream sends no Content-Length — should still cache + upstream := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/octet-stream") + w.Header().Set("Transfer-Encoding", "chunked") + w.WriteHeader(http.StatusOK) + fmt.Fprint(w, "chunked-data") + })) + defer upstream.Close() + + pp, cacheDir := newTestProxy(t, []string{upstream.URL + "/"}) + app := newTestApp(pp) + + req := httptest.NewRequest(http.MethodGet, "/testrepo/path/package.rpm", nil) + rec := httptest.NewRecorder() + app.ServeHTTP(rec, req) + + assert.Equal(t, http.StatusOK, rec.Code) + + // File should be cached since there's no Content-Length to validate + cachedPath := filepath.Join(cacheDir, "testrepo", "path", "package.rpm") + data, err := 
os.ReadFile(cachedPath) + require.NoError(t, err) + assert.Equal(t, "chunked-data", string(data)) +} + +func TestCacheDiskWriteErrorDoesNotAffectClient(t *testing.T) { + upstreamBody := "upstream-content-for-client" + upstream := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/octet-stream") + w.WriteHeader(http.StatusOK) + fmt.Fprint(w, upstreamBody) + })) + defer upstream.Close() + + // Use a read-only cache dir to trigger disk write failure + cacheDir := t.TempDir() + repoConfig := &RepoConfig{ + Repositories: map[string]Repository{ + "testrepo": { + CacheSuffixes: []string{".rpm"}, + Mirrors: []string{upstream.URL + "/"}, + }, + }, + } + pp := New(&PkgProxyConfig{ + CacheBasePath: filepath.Join(cacheDir, "readonly"), + RepositoryConfig: repoConfig, + }) + // Make the cache base path read-only + require.NoError(t, os.MkdirAll(filepath.Join(cacheDir, "readonly"), 0o555)) + + app := newTestApp(pp) + + req := httptest.NewRequest(http.MethodGet, "/testrepo/path/package.rpm", nil) + rec := httptest.NewRecorder() + app.ServeHTTP(rec, req) + + // Client should still receive the response despite cache write failure + assert.Equal(t, http.StatusOK, rec.Code) + assert.Equal(t, upstreamBody, rec.Body.String()) + + // Restore permissions for cleanup + os.Chmod(filepath.Join(cacheDir, "readonly"), 0o755) +} + // --- httpbin.org tests (gated by environment variable) --- func TestForwardProxyWithHttpbin(t *testing.T) { diff --git a/pkg/pkgproxy/writers.go b/pkg/pkgproxy/writers.go new file mode 100644 index 0000000..7746d31 --- /dev/null +++ b/pkg/pkgproxy/writers.go @@ -0,0 +1,99 @@ +package pkgproxy + +import ( + "io" + "log/slog" + "os" + + "github.com/ganto/pkgproxy/pkg/cache" +) + +// resilientWriter lazily creates a temp file on the first Write() call and +// absorbs all disk write errors without propagating them. 
On any error
// (including temp file creation failure), it returns len(b), nil for that
// and all subsequent writes, satisfying io.MultiWriter's short-write check.
type resilientWriter struct {
	fc           cache.FileCache // resolves the temp-file location for uri
	uri          string          // request URI this cache entry is keyed on
	file         *os.File        // lazily created on the first Write
	failed       bool            // once set, writes are silently discarded
	bytesWritten int64           // bytes actually persisted to the temp file
}

// newResilientWriter returns a writer for the given cache and URI; no temp
// file exists until the first byte is written.
func newResilientWriter(fc cache.FileCache, uri string) *resilientWriter {
	return &resilientWriter{fc: fc, uri: uri}
}

// Write tees b to the temp file while always reporting full success, so an
// io.MultiWriter wrapping this writer never aborts the client-facing copy.
func (w *resilientWriter) Write(b []byte) (int, error) {
	reported := len(b) // count reported to the caller regardless of disk outcome
	if w.failed {
		return reported, nil
	}
	if w.ensureFile() != nil {
		return reported, nil
	}
	n, err := w.file.Write(b)
	w.bytesWritten += int64(n)
	if err != nil {
		slog.Error("cache write failed", "uri", w.uri, "error", err)
		w.failed = true
	}
	return reported, nil
}

// ensureFile lazily opens the temp file on first use. On failure it logs,
// flips the writer into the failed state and returns the creation error.
func (w *resilientWriter) ensureFile() error {
	if w.file != nil {
		return nil
	}
	f, err := w.fc.CreateTempWriter(w.uri)
	if err != nil {
		slog.Error("cache temp file creation failed", "uri", w.uri, "error", err)
		w.failed = true
		return err
	}
	w.file = f
	return nil
}

// Close closes the underlying temp file if it was created.
func (w *resilientWriter) Close() error {
	if w.file == nil {
		return nil
	}
	return w.file.Close()
}

// Disable marks the writer as failed so any subsequent writes (e.g. from
// an error handler running after the Cache middleware returns) are silently
// discarded without creating temp files.
func (w *resilientWriter) Disable() {
	w.failed = true
}

// TmpPath returns the path of the temp file, or empty string if not created.
func (w *resilientWriter) TmpPath() string {
	if w.file == nil {
		return ""
	}
	return w.file.Name()
}

// safeWriter wraps an io.Writer and absorbs write errors starting from the
// first failure. After the first error, all subsequent writes return
// len(b), nil without attempting the underlying write.
+type safeWriter struct { + inner io.Writer + failed bool +} + +func newSafeWriter(w io.Writer) *safeWriter { + return &safeWriter{inner: w} +} + +func (w *safeWriter) Write(b []byte) (int, error) { + if w.failed { + return len(b), nil + } + + n, writeErr := w.inner.Write(b) + if writeErr != nil { + w.failed = true + return len(b), nil //nolint:nilerr // intentionally absorbing write errors + } + return n, nil +} diff --git a/pkg/pkgproxy/writers_test.go b/pkg/pkgproxy/writers_test.go new file mode 100644 index 0000000..a1087a2 --- /dev/null +++ b/pkg/pkgproxy/writers_test.go @@ -0,0 +1,181 @@ +package pkgproxy + +import ( + "errors" + "os" + "testing" + + "github.com/ganto/pkgproxy/pkg/cache" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// --- resilientWriter tests --- + +func TestResilientWriterLazyCreation(t *testing.T) { + fc := cache.New(&cache.CacheConfig{ + BasePath: t.TempDir(), + FileSuffixes: []string{".rpm"}, + }) + rw := newResilientWriter(fc, "/repo/package.rpm") + + // Before any write, no temp file should exist + assert.Empty(t, rw.TmpPath()) + assert.False(t, rw.failed) + assert.Equal(t, int64(0), rw.bytesWritten) + + // First write triggers lazy creation + n, err := rw.Write([]byte("hello")) + assert.NoError(t, err) + assert.Equal(t, 5, n) + assert.NotEmpty(t, rw.TmpPath()) + assert.False(t, rw.failed) + assert.Equal(t, int64(5), rw.bytesWritten) + + require.NoError(t, rw.Close()) + defer os.Remove(rw.TmpPath()) +} + +func TestResilientWriterNoTempFileOnZeroWrites(t *testing.T) { + fc := cache.New(&cache.CacheConfig{ + BasePath: t.TempDir(), + FileSuffixes: []string{".rpm"}, + }) + rw := newResilientWriter(fc, "/repo/package.rpm") + + assert.Empty(t, rw.TmpPath()) + assert.NoError(t, rw.Close()) +} + +func TestResilientWriterSuccessfulWrite(t *testing.T) { + fc := cache.New(&cache.CacheConfig{ + BasePath: t.TempDir(), + FileSuffixes: []string{".rpm"}, + }) + rw := newResilientWriter(fc, "/repo/package.rpm") + 
+ _, _ = rw.Write([]byte("part1")) + _, _ = rw.Write([]byte("part2")) + + assert.False(t, rw.failed) + assert.Equal(t, int64(10), rw.bytesWritten) + + require.NoError(t, rw.Close()) + defer os.Remove(rw.TmpPath()) + + data, err := os.ReadFile(rw.TmpPath()) + require.NoError(t, err) + assert.Equal(t, "part1part2", string(data)) +} + +func TestResilientWriterCreationError(t *testing.T) { + // Use an invalid base path to trigger creation failure + fc := cache.New(&cache.CacheConfig{ + BasePath: "/nonexistent/path/that/cannot/exist", + FileSuffixes: []string{".rpm"}, + }) + rw := newResilientWriter(fc, "/repo/package.rpm") + + n, err := rw.Write([]byte("data")) + assert.NoError(t, err) + assert.Equal(t, 4, n) + assert.True(t, rw.failed) + assert.Equal(t, int64(0), rw.bytesWritten) +} + +func TestResilientWriterSubsequentWritesDiscarded(t *testing.T) { + // Use an invalid base path to trigger failure on first write + fc := cache.New(&cache.CacheConfig{ + BasePath: "/nonexistent/path", + FileSuffixes: []string{".rpm"}, + }) + rw := newResilientWriter(fc, "/repo/package.rpm") + + // First write fails (creation error) + n1, err1 := rw.Write([]byte("first")) + assert.NoError(t, err1) + assert.Equal(t, 5, n1) + assert.True(t, rw.failed) + + // Subsequent writes are silently discarded + n2, err2 := rw.Write([]byte("second")) + assert.NoError(t, err2) + assert.Equal(t, 6, n2) + assert.Equal(t, int64(0), rw.bytesWritten) +} + +func TestResilientWriterBytesWrittenTracking(t *testing.T) { + fc := cache.New(&cache.CacheConfig{ + BasePath: t.TempDir(), + FileSuffixes: []string{".rpm"}, + }) + rw := newResilientWriter(fc, "/repo/package.rpm") + + rw.Write([]byte("abc")) + assert.Equal(t, int64(3), rw.bytesWritten) + + rw.Write([]byte("defgh")) + assert.Equal(t, int64(8), rw.bytesWritten) + + require.NoError(t, rw.Close()) + os.Remove(rw.TmpPath()) +} + +// --- safeWriter tests --- + +type errWriter struct { + failAfter int + writes int +} + +func (w *errWriter) Write(b []byte) (int, 
error) {
	w.writes++
	if w.writes <= w.failAfter {
		return len(b), nil
	}
	return 0, errors.New("write error")
}

// A healthy inner writer: safeWriter must pass bytes straight through.
func TestSafeWriterPassthrough(t *testing.T) {
	sink := &errWriter{failAfter: 100}
	sw := newSafeWriter(sink)

	n, err := sw.Write([]byte("hello"))
	assert.NoError(t, err)
	assert.Equal(t, 5, n)
	assert.False(t, sw.failed)
	assert.Equal(t, 1, sink.writes)
}

// An inner writer that fails immediately: the error must be swallowed and
// the full length reported back.
func TestSafeWriterErrorAbsorbed(t *testing.T) {
	sink := &errWriter{failAfter: 0} // fail on first write
	sw := newSafeWriter(sink)

	n, err := sw.Write([]byte("hello"))
	assert.NoError(t, err)
	assert.Equal(t, 5, n)
	assert.True(t, sw.failed)
}

// After the first failure, no further write may reach the inner writer.
func TestSafeWriterSubsequentWritesDiscarded(t *testing.T) {
	sink := &errWriter{failAfter: 1} // succeed first, fail second
	sw := newSafeWriter(sink)

	// First write succeeds
	n1, err1 := sw.Write([]byte("first"))
	assert.NoError(t, err1)
	assert.Equal(t, 5, n1)
	assert.False(t, sw.failed)

	// Second write fails and is absorbed
	n2, err2 := sw.Write([]byte("second"))
	assert.NoError(t, err2)
	assert.Equal(t, 6, n2)
	assert.True(t, sw.failed)

	// Third write is discarded without calling the inner writer
	n3, err3 := sw.Write([]byte("third"))
	assert.NoError(t, err3)
	assert.Equal(t, 5, n3)
	assert.Equal(t, 2, sink.writes) // only 2 actual writes reached the sink
}