feat: implement WebSocket streaming protocol for large payloads #2256#3346
feat: implement WebSocket streaming protocol for large payloads #2256#3346iduartgomez merged 18 commits intomainfrom
Conversation
🔍 Rule Review: WebSocket streaming protocol (PR #3346)Rules checked: CriticalNo critical findings. WarningsNo warning-level findings. Info
Notes (not rule violations):
Automated review against |
This comment was marked as outdated.
This comment was marked as outdated.
79c11a4 to
b0db600
Compare
This comment was marked as outdated.
This comment was marked as outdated.
This comment was marked as outdated.
This comment was marked as outdated.
425e84c to
a19fd85
Compare
a19fd85 to
8dc5d40
Compare
The rebase on main introduced a duplicate StreamChunk reassembly block (from main's a085374) alongside the PR's existing conn_state-based reassembly. Remove the duplicate and its unused reassembly_buffer variable.
ReassemblyBuffer in freenet-stdlib 0.2.2 already evicts incomplete streams after STREAM_TTL (60s) via evict_stale() on every receive_chunk() call. Update the comment to reflect this. Addresses rule-review warning about unbounded cleanup exemptions.
freenet-stdlib 0.2.2+ automatically chunks large ClientRequest messages (>512 KiB) on the client side regardless of whether streaming=true was passed in the WebSocket URL. The server was only reassembling StreamChunk when conn_state.streaming was true, causing non-streaming clients' large PUT requests to pass through as unrecognized StreamChunk variants and never get processed — resulting in 60s timeouts. The streaming flag should only control whether the server sends chunked responses, not whether it can receive chunked requests.
- Reassembly errors now send an error response to the client instead of killing the connection. A single malformed chunk (duplicate, out-of-range) no longer terminates the entire WebSocket session. - Restore cooperative yielding: send_response_message now yields to the tokio runtime every MAX_CHUNKS_PER_BATCH (64) chunks, preventing starvation of pings, subscriptions, and other select arms. - Avoid materializing all chunks in memory: send_response_message now serializes and sends each chunk individually instead of collecting all into a Vec<Vec<u8>> first, reducing peak memory by ~50% for large payloads. - Extract duplicated stream_content matching into extract_stream_content() helper (was copy-pasted in 2 places).
New tests covering review gaps: - NodeUnavailable encoding: verify Native path produces valid bincode and FBS path produces non-bincode bytes (encoding dispatch bugfix) - Non-streaming StreamChunk reassembly: verify streaming=false still reassembles chunked requests (freenet-stdlib always chunks large reqs) - FBS chunked validation: verify FBS chunks are non-empty, have overhead, and are not accidentally bincode-encoded - stream_id wrapping: verify u32::MAX wraps to 0 - extract_stream_content: verify GetResponse produces Some, others None
Problem
The WebSocket interface between clients and the Freenet node used a single monolithic message frame for all payloads, requiring a 100 MB
max_message_sizelimit to handle large WASM contract uploads. This caused excessive memory pressure on both client and server, since the entire payload had to be buffered in a single WebSocket frame before processing could begin.Solution
Introduce a streaming protocol for WebSocket connections, using the chunking primitives from
freenet-stdlib 0.2.0. Clients opt in via a?streaming=truequery parameter; non-streaming clients are completely unaffected.How it works
All message types are chunked when large enough. The chunking decision is based purely on payload size — any serialized response or request that exceeds
CHUNK_THRESHOLD(512 KiB) is automatically split into 256 KiBStreamChunkframes, regardless of theHostResponseorClientRequestvariant.Server → Client (responses):
When streaming is enabled and a serialized response exceeds
CHUNK_THRESHOLD, the server:StreamHeader(Native encoding only) withstream_id,total_bytes, and content metadata so the client can pre-allocate or show progress. Currently onlyGetResponseincludes this header.StreamChunkmessages viachunk_response()fromfreenet-stdlib.MAX_CHUNKS_PER_BATCHchunks so pings and other control messages can flow through the select loop.Client → Server (requests):
freenet-stdlib 0.2.2+automatically chunks large serialized requests (>512 KiB) intoStreamChunkframes on the client side, regardless of thestreamingquery parameter. The server always reassembles them usingReassemblyBufferbefore decoding the fullClientRequest.Chunking vs StreamHeader — two layers
StreamChunk)CHUNK_THRESHOLDGetResponse+ Native encodingThe
StreamHeaderis an optimization that enablesrecv_stream()on the client — incremental chunk-by-chunk processing via an asyncStream<Item = Bytes>. Without a header, the client'srecv()transparently reassembles chunks and returns the completeHostResponse. Both paths deliver correct data; the header just enables richer client UX (progress bars, pre-allocation).This two-layer design is intentional:
GetResponseis the primary large-payload case (contract WASM up to 50 MiB + state up to 50 MiB) and the one where incremental consumption matters most. Other potentially large responses likeUpdateNotificationwith full state are chunked and reassembled transparently. TheStreamContent::Rawvariant exists as a placeholder for future extension.Key design decisions
max_message_sizekept at 100 MB — non-streaming clients still send large payloads (WASM contracts) as single frames. The?streaming=trueparameter controls chunked responses only; the client-side stdlib always chunks large requests automatically.StreamHeaderonly for Native encoding — Flatbuffers clients (browser) use transparent reassembly viaStreamChunkalone; the header is an optimization for bincode-based clients.MAX_CHUNKS_PER_BATCHchunks sent, the sender yields to the tokio runtime to prevent starving pings, subscriptions, and other control flow in the connection's select loop.u32counter withwrapping_add; collisions would require 4B streams on a single connection.Code changes
ConnectionStatestruct — consolidates per-connection state (encoding protocol, streaming flag, reassembly buffer, stream ID counter) instead of passing individual parameters.send_response_message()— new helper that abstracts the "send as single frame or chunk" decision, used by all four response paths: host responses, subscription notifications, auth errors, and node-unavailable errors.decode_client_request()— extracted as an inner function to avoid duplicating deserialization logic between normal requests and post-reassembly decoded requests.EncodingProtocolnow derivesPartialEq, Eq— needed for conditionalStreamHeaderlogic.Bonus bugfix
The
NodeUnavailableerror path inprocess_host_responsepreviously usedbincode::serializeunconditionally, which would send malformed data to Flatbuffers clients. Now correctly serializes per the connection's encoding protocol.Testing
freenet-stdlib(freenet-stdlib#58).cargo test -p freenetpasses (oncefreenet-stdlib 0.2.0is published).Dependencies
This PR must be merged after freenet-stdlib#58, which adds the streaming types (
StreamChunk,StreamHeader,ReassemblyBuffer,chunk_response, etc.) infreenet-stdlib 0.2.0. CI will fail until that crate is published.Files changed
Cargo.tomlfreenet-stdlibto0.2.0Cargo.lock,apps/freenet-ping/Cargo.lockcrates/core/src/client_events/websocket.rsConnectionState, reassembly, chunked send across all response pathscrates/core/src/util/mod.rsPartialEq, EqtoEncodingProtocolFixes
Closes #2256