diff --git a/src/replication/gossip.rs b/src/replication/gossip.rs index c6344d0..05910b2 100644 --- a/src/replication/gossip.rs +++ b/src/replication/gossip.rs @@ -127,10 +127,16 @@ impl GossipMessage { /// under `max_bytes` when wrapped in a `DeltaBatch`. Chunks are greedily /// filled: each delta is serialized, its cost measured, and if adding it to /// the current chunk would exceed `max_bytes` the chunk is sealed and a new -/// one started. A single oversized delta (rare — larger than `max_bytes` on -/// its own) is placed into its own chunk; the receiver will still reject it, -/// but the sender produces a clean signal instead of silently accumulating -/// messages that can never fit. +/// one started. +/// +/// A single oversized delta (larger than `max_bytes` on its own — typically a +/// very large LPUSH/XADD payload, HSET against a huge hash, or long list/set) +/// cannot be split at the gossip level because we would violate CRDT delta +/// semantics. Such a delta is placed into its own chunk and a WARN is emitted +/// with the key and serialized size so operators can trace the offending +/// client. The resulting message will still exceed the receiver's 1 MiB cap +/// and be rejected, but the sender produces a diagnostic signal instead of +/// silently accumulating un-deliverable state. /// /// ## Invariants /// - Every returned chunk is non-empty. @@ -164,6 +170,21 @@ fn split_deltas_into_chunks( .map(|v| v.len().saturating_add(1)) .unwrap_or(256); + // A single delta that's already over the chunk cap cannot be split + // at this layer — log the key + size so operators can trace the + // offending client. The downstream message will exceed the 1 MiB + // receiver cap and be dropped on the wire, but we prefer a loud + // signal over silent accumulation. + if delta_bytes > max_bytes { + warn!( + key = %delta.key, + delta_bytes, + max_bytes, + "Oversized replication delta exceeds gossip chunk cap; receiver will \ + reject. 
Check client for unusually large values (LPUSH/XADD/HSET)." + ); + } + let projected = current_bytes.saturating_add(delta_bytes); if projected > max_bytes && !current.is_empty() { chunks.push(std::mem::take(&mut current)); @@ -722,6 +743,53 @@ mod tests { } } + /// Diagnostic: measure actual post-chunking serialized message sizes for + /// realistic large-value scenarios. Asserts that every chunk serializes to + /// at most 1 MiB, to validate (or invalidate) the chunking boundary. + #[test] + fn diag_many_medium_deltas_post_chunking_size_under_cap() { + // 100 deltas, each ~15 KB (realistic for medium values) + let deltas: Vec<_> = (0..100) + .map(|i| make_delta_approx_bytes(i, 15_000)) + .collect(); + let chunks = split_deltas_into_chunks(deltas, MAX_GOSSIP_PAYLOAD_BYTES); + for (idx, chunk) in chunks.iter().enumerate() { + let msg = GossipMessage::new_delta_batch(ReplicaId::new(1), chunk.clone(), 0); + let size = msg.serialize().unwrap().len(); + assert!( + size <= 1_048_576, + "Chunk {} with {} medium deltas serialized to {} bytes, exceeds 1 MiB", + idx, + chunk.len(), + size + ); + } + } + + /// Diagnostic: a SINGLE delta larger than the chunk cap must be detectable — + /// the chunker places it in its own chunk (can't split one delta), but the + /// resulting serialized message will exceed the 1 MiB receiver cap and be + /// rejected by the receiver. This test documents that behavior and + /// serves as a guard for future fixes that may need to split at the + /// value level or drop oversized deltas with a WARN. 
+ #[test] + fn diag_single_oversized_delta_produces_oversized_message() { + // A single delta with a 1.5 MB value — plausible for a large LPUSH + let huge = make_delta_approx_bytes(0, 1_500_000); + let chunks = split_deltas_into_chunks(vec![huge], MAX_GOSSIP_PAYLOAD_BYTES); + assert_eq!(chunks.len(), 1, "One oversized delta produces one chunk"); + assert_eq!(chunks[0].len(), 1); + let msg = GossipMessage::new_delta_batch(ReplicaId::new(1), chunks[0].clone(), 0); + let size = msg.serialize().unwrap().len(); + // This is the failure mode: chunking did its best, but the receiver + // will still reject a single-delta message over the cap. + assert!( + size > 1_048_576, + "Oversized single delta must produce an oversize message (got {size}) — \ + this is the bug PR #24 addresses by logging a diagnostic" + ); + } + #[test] fn test_queue_deltas_splits_in_broadcast_fallback() { // Without a router configured, queue_deltas falls back to broadcast