Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 72 additions & 4 deletions src/replication/gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,10 +127,16 @@ impl GossipMessage {
/// under `max_bytes` when wrapped in a `DeltaBatch`. Chunks are greedily
/// filled: each delta is serialized, its cost measured, and if adding it to
/// the current chunk would exceed `max_bytes` the chunk is sealed and a new
/// one started. A single oversized delta (rare — larger than `max_bytes` on
/// its own) is placed into its own chunk; the receiver will still reject it,
/// but the sender produces a clean signal instead of silently accumulating
/// messages that can never fit.
/// one started.
///
/// A single oversized delta (larger than `max_bytes` on its own — typically a
/// very large LPUSH/XADD payload, HSET against a huge hash, or long list/set)
/// cannot be split at the gossip level because we would violate CRDT delta
/// semantics. Such a delta is placed into its own chunk and a WARN is emitted
/// with the key and serialized size so operators can trace the offending
/// client. The resulting message will still exceed the receiver's 1 MiB cap
/// and be rejected, but the sender produces a diagnostic signal instead of
/// silently accumulating un-deliverable state.
///
/// ## Invariants
/// - Every returned chunk is non-empty.
Expand Down Expand Up @@ -164,6 +170,21 @@ fn split_deltas_into_chunks(
.map(|v| v.len().saturating_add(1))
.unwrap_or(256);

// A single delta that's already over the chunk cap cannot be split
// at this layer — log the key + size so operators can trace the
// offending client. The downstream message will exceed the 1 MiB
// receiver cap and be dropped on the wire, but we prefer a loud
// signal over silent accumulation.
if delta_bytes > max_bytes {
warn!(
key = %delta.key,
delta_bytes,
max_bytes,
"Oversized replication delta exceeds gossip chunk cap; receiver will \
reject. Check client for unusually large values (LPUSH/XADD/HSET)."
);
}

let projected = current_bytes.saturating_add(delta_bytes);
if projected > max_bytes && !current.is_empty() {
chunks.push(std::mem::take(&mut current));
Expand Down Expand Up @@ -722,6 +743,53 @@ mod tests {
}
}

/// Diagnostic: chunk a batch of medium-sized deltas and verify that every
/// resulting `DeltaBatch` message serializes to at most 1 MiB.
///
/// The test also checks that the chunks together contain exactly as many
/// deltas as were supplied. Without that check the per-chunk size loop would
/// pass vacuously if chunking returned no chunks or dropped deltas.
#[test]
fn diag_many_medium_deltas_post_chunking_size_under_cap() {
    // Upper bound asserted for each serialized message (1 MiB).
    const RECEIVER_CAP_BYTES: usize = 1_048_576;

    // 100 deltas, each ~15 KB (realistic for medium values)
    let deltas: Vec<_> = (0..100)
        .map(|i| make_delta_approx_bytes(i, 15_000))
        .collect();
    // Record the input size before `deltas` is moved into the chunker.
    let total_deltas = deltas.len();

    let chunks = split_deltas_into_chunks(deltas, MAX_GOSSIP_PAYLOAD_BYTES);

    // Conservation check: every input delta must land in exactly one chunk.
    // This also guarantees the loop below runs at least once.
    let chunked_deltas: usize = chunks.iter().map(|chunk| chunk.len()).sum();
    assert_eq!(
        chunked_deltas, total_deltas,
        "Chunking must neither drop nor duplicate deltas"
    );

    for (idx, chunk) in chunks.iter().enumerate() {
        let msg = GossipMessage::new_delta_batch(ReplicaId::new(1), chunk.clone(), 0);
        let size = msg.serialize().unwrap().len();
        assert!(
            size <= RECEIVER_CAP_BYTES,
            "Chunk {} with {} medium deltas serialized to {} bytes, exceeds 1 MiB",
            idx,
            chunk.len(),
            size
        );
    }
}

/// Diagnostic: documents what happens to one delta that is larger than the
/// chunk cap on its own.
///
/// The chunker cannot split an individual delta, so the delta is expected to
/// come back as a single one-element chunk. Wrapping that chunk in a
/// `DeltaBatch` and serializing it must then yield more than 1 MiB, which is
/// the size this test treats as the receiver's limit. The assertion pins that
/// behavior so that a future change (for example splitting at the value
/// level, or dropping oversized deltas) has to update this test deliberately.
#[test]
fn diag_single_oversized_delta_produces_oversized_message() {
    // Size above which a serialized message counts as oversized here (1 MiB).
    const RECEIVER_CAP_BYTES: usize = 1_048_576;

    // One delta carrying a ~1.5 MB value, plausible for a large LPUSH.
    let input = vec![make_delta_approx_bytes(0, 1_500_000)];
    let chunks = split_deltas_into_chunks(input, MAX_GOSSIP_PAYLOAD_BYTES);

    assert_eq!(chunks.len(), 1, "One oversized delta produces one chunk");
    let only_chunk = &chunks[0];
    assert_eq!(only_chunk.len(), 1);

    let serialized_len =
        GossipMessage::new_delta_batch(ReplicaId::new(1), only_chunk.clone(), 0)
            .serialize()
            .unwrap()
            .len();

    // Chunking has done all it can; a lone delta this large still yields a
    // message above the cap.
    let size = serialized_len;
    assert!(
        size > RECEIVER_CAP_BYTES,
        "Oversized single delta must produce an oversize message (got {size}) — \
         this is the bug PR #24 addresses by logging a diagnostic"
    );
}

#[test]
fn test_queue_deltas_splits_in_broadcast_fallback() {
// Without a router configured, queue_deltas falls back to broadcast
Expand Down