Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
20f832d
feat: implement full replication subsystem
mickvandijke Mar 31, 2026
bb9ad07
test: add replication e2e tests and wire ReplicationEngine into test …
mickvandijke Mar 31, 2026
04732aa
fix: resolve all clippy warnings with -D warnings on all targets
mickvandijke Mar 31, 2026
ea07e03
fix: resolve doc link error and Windows CI test failures
mickvandijke Mar 31, 2026
4b0a271
test: add comprehensive Section 18 test coverage (36 new tests)
mickvandijke Apr 1, 2026
02883f1
test: add final 12 Section 18 scenarios for full test matrix coverage
mickvandijke Apr 1, 2026
796a51f
ci: enable e2e and replication tests in CI workflows
mickvandijke Apr 1, 2026
0521e31
test: add final 9 Section 18 scenarios for complete test matrix coverage
mickvandijke Apr 1, 2026
a3c1d71
fix: run e2e tests single-threaded to prevent LMDB TLS exhaustion
mickvandijke Apr 1, 2026
cea1966
fix: resolve protocol name validation and request-response handling i…
mickvandijke Apr 1, 2026
eec04ce
fix: handle Instant::checked_sub failure on Windows in bootstrap clai…
mickvandijke Apr 1, 2026
b4dce78
test: fix 7 weak/mislabeled Section 18 replication scenario tests
mickvandijke Apr 1, 2026
13be5e6
fix: resolve message handler deadlock and bootstrap flag in replicati…
mickvandijke Apr 1, 2026
a46b8fe
test: fix 6 missing/mislabeled/weak Section 18 replication scenario t…
mickvandijke Apr 1, 2026
5b2a7bd
fix: resolve clippy doc_markdown and needless_range_loop warnings
mickvandijke Apr 1, 2026
31d9d76
refactor: trigger replication sync on KClosestPeersChanged instead of…
mickvandijke Apr 1, 2026
16d5ba5
fix: gate bootstrap sync on DhtNetworkEvent::BootstrapComplete
mickvandijke Apr 1, 2026
2a69e58
feat: dynamic audit batch sizing based on local store size
mickvandijke Apr 1, 2026
0a3a01f
refactor: use local RT lookups instead of network lookups in audit
mickvandijke Apr 1, 2026
85eb73a
refactor: pick audit peer first, then find their responsible keys
mickvandijke Apr 1, 2026
c366773
refactor: concurrent fetch worker using FuturesUnordered
mickvandijke Apr 1, 2026
e153255
fix: address critical and high severity issues from PR review
mickvandijke Apr 2, 2026
882ae74
fix: only reward trust on audit success, not fetch success
mickvandijke Apr 2, 2026
578e3e0
fix: clear stale bootstrap claims in audit result handler
mickvandijke Apr 2, 2026
73cac9e
fix: remove double trust penalty on fetch transport failure
mickvandijke Apr 2, 2026
aca83e0
fix: reduce audit failure trust penalty from 5.0 to 2.0
mickvandijke Apr 2, 2026
3f8f358
feat: scale audit response timeout by chunk count
mickvandijke Apr 2, 2026
ebb8843
fix: increase per-chunk audit timeout from 1ms to 10ms
mickvandijke Apr 2, 2026
abd0bb4
fix: address PR review feedback on replication subsystem
mickvandijke Apr 2, 2026
819b70a
fix: address second-pass PR review feedback
mickvandijke Apr 2, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,10 @@ jobs:
uses: foundry-rs/foundry-toolchain@v1
with:
version: nightly
- name: Run tests
run: cargo test
- name: Run unit tests
run: cargo test --lib --features test-utils
- name: Run e2e tests
run: cargo test --test e2e --features test-utils -- --test-threads=1

doc:
name: Documentation
Expand Down
6 changes: 4 additions & 2 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,10 @@ jobs:
- name: Run clippy
run: cargo clippy --all-targets --all-features -- -D warnings

- name: Run tests
run: cargo test
- name: Run unit tests
run: cargo test --lib --features test-utils
- name: Run e2e tests
run: cargo test --test e2e --features test-utils -- --test-threads=1

build:
name: Build ${{ matrix.target }}
Expand Down
658 changes: 658 additions & 0 deletions docs/REPLICATION_DESIGN.md

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions src/ant_protocol/chunk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
use serde::{Deserialize, Serialize};

/// Protocol identifier for chunk operations.
pub const CHUNK_PROTOCOL_ID: &str = "autonomi/ant/chunk/v1";
pub const CHUNK_PROTOCOL_ID: &str = "autonomi.ant.chunk.v1";

/// Current protocol version.
pub const PROTOCOL_VERSION: u16 = 1;
Expand Down Expand Up @@ -519,7 +519,7 @@ mod tests {

#[test]
fn test_constants() {
assert_eq!(CHUNK_PROTOCOL_ID, "autonomi/ant/chunk/v1");
assert_eq!(CHUNK_PROTOCOL_ID, "autonomi.ant.chunk.v1");
assert_eq!(PROTOCOL_VERSION, 1);
assert_eq!(MAX_CHUNK_SIZE, 4 * 1024 * 1024);
assert_eq!(DATA_TYPE_CHUNK, 0);
Expand Down
4 changes: 4 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ pub enum Error {
#[error("invalid chunk: {0}")]
InvalidChunk(String),

/// Replication error.
#[error("replication error: {0}")]
Replication(String),

/// Node is shutting down.
#[error("node is shutting down")]
ShuttingDown,
Expand Down
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ pub mod error;
pub mod event;
pub mod node;
pub mod payment;
pub mod replication;
pub mod storage;
pub mod upgrade;

Expand All @@ -65,6 +66,7 @@ pub use error::{Error, Result};
pub use event::{NodeEvent, NodeEventsChannel};
pub use node::{NodeBuilder, RunningNode};
pub use payment::{PaymentStatus, PaymentVerifier, PaymentVerifierConfig};
pub use replication::{config::ReplicationConfig, ReplicationEngine};
pub use storage::{AntProtocol, LmdbStorage, LmdbStorageConfig};

/// Re-exports from `saorsa-core` so downstream crates (e.g. `ant-client`)
Expand Down
58 changes: 57 additions & 1 deletion src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ use crate::event::{create_event_channel, NodeEvent, NodeEventsChannel, NodeEvent
use crate::payment::metrics::QuotingMetricsTracker;
use crate::payment::wallet::parse_rewards_address;
use crate::payment::{EvmVerifierConfig, PaymentVerifier, PaymentVerifierConfig, QuoteGenerator};
use crate::replication::config::ReplicationConfig;
use crate::replication::ReplicationEngine;
use crate::storage::{AntProtocol, LmdbStorage, LmdbStorageConfig};
use crate::upgrade::{
upgrade_cache_dir, AutoApplyUpgrader, BinaryCache, ReleaseCache, UpgradeMonitor, UpgradeResult,
Expand Down Expand Up @@ -133,15 +135,43 @@ impl NodeBuilder {
None
};

let p2p_arc = Arc::new(p2p_node);

// Initialize replication engine (if storage is enabled)
let replication_engine = if let Some(ref protocol) = ant_protocol {
let repl_config = ReplicationConfig::default();
let storage_arc = protocol.storage();
let payment_verifier_arc = protocol.payment_verifier_arc();
match ReplicationEngine::new(
repl_config,
Arc::clone(&p2p_arc),
storage_arc,
payment_verifier_arc,
&self.config.root_dir,
shutdown.clone(),
)
.await
{
Ok(engine) => Some(engine),
Err(e) => {
warn!("Failed to initialize replication engine: {e}");
None
}
}
} else {
None
};

let node = RunningNode {
config: self.config,
p2p_node: Arc::new(p2p_node),
p2p_node: p2p_arc,
shutdown,
events_tx,
events_rx: Some(events_rx),
upgrade_monitor,
bootstrap_manager,
ant_protocol,
replication_engine,
protocol_task: None,
upgrade_exit_code: Arc::new(AtomicI32::new(-1)),
};
Expand Down Expand Up @@ -431,6 +461,8 @@ pub struct RunningNode {
bootstrap_manager: Option<BootstrapManager>,
/// ANT protocol handler for chunk storage.
ant_protocol: Option<Arc<AntProtocol>>,
/// Replication engine (manages neighbor sync, verification, audits).
replication_engine: Option<ReplicationEngine>,
/// Protocol message routing background task.
protocol_task: Option<JoinHandle<()>>,
/// Exit code requested by a successful upgrade (-1 = no upgrade exit pending).
Expand Down Expand Up @@ -466,6 +498,14 @@ impl RunningNode {
pub async fn run(&mut self) -> Result<()> {
info!("Node runtime loop starting");

// Subscribe to DHT events BEFORE starting the P2P node so the
// bootstrap-sync task does not miss the BootstrapComplete event
// emitted during P2PNode::start().
let dht_events_for_bootstrap = self
.replication_engine
.as_ref()
.map(|_| self.p2p_node.dht_manager().subscribe_events());

// Start the P2P node
self.p2p_node
.start()
Expand Down Expand Up @@ -493,6 +533,16 @@ impl RunningNode {
// Start protocol message routing (P2P → AntProtocol → P2P response)
self.start_protocol_routing();

// Start replication engine background tasks
if let Some(ref mut engine) = self.replication_engine {
// Invariant: dht_events_for_bootstrap is Some exactly when replication_engine
// is Some, because it was created above by mapping over replication_engine.
if let Some(dht_events) = dht_events_for_bootstrap {
engine.start(dht_events);
}
info!("Replication engine started");
}

// Start upgrade monitor if enabled
if let Some(monitor) = self.upgrade_monitor.take() {
let events_tx = self.events_tx.clone();
Expand Down Expand Up @@ -652,6 +702,12 @@ impl RunningNode {
);
}

// Shut down the replication engine before P2P so that background tasks don't
// use a dead P2P layer and Arc<LmdbStorage> references are released.
if let Some(ref mut engine) = self.replication_engine {
engine.shutdown().await;
}

// Stop protocol routing task
if let Some(handle) = self.protocol_task.take() {
handle.abort();
Expand Down
Loading
Loading