From b199bee6e2e2624835db29b93d00f2cb61c1f5ca Mon Sep 17 00:00:00 2001 From: It Apilium Date: Sun, 8 Mar 2026 18:59:31 +0100 Subject: [PATCH 1/4] feat: add Kitsune P2P subsystem with QUIC transport, bloom gossip, and hardening Implements the full P2P stack behind the `p2p` feature flag: - QUIC transport with seed-based handshake and Ed25519 identity - Bloom filter gossip for efficient triple announcement - mDNS peer discovery via mdns-sd - Sync manager with tombstone-based deletion propagation (24h TTL) - Persistent peer store (JSON-backed known_peers.json) - Manual peer reconnection with exponential backoff (5s-300s, max 10 retries) - Ping/pong health checks (30s interval, 10s timeout) - Ingress rate limiting per-peer (1000/min) and global (10000/min) - Periodic cleanup of inactive peers, expired tombstones, and stale store entries - CLI flags: --p2p, --p2p-port, --p2p-seed, --p2p-peer, --p2p-mdns --- Cargo.lock | 46 + crates/aingle_cortex/Cargo.toml | 12 + crates/aingle_cortex/src/lib.rs | 2 + crates/aingle_cortex/src/main.rs | 42 +- crates/aingle_cortex/src/p2p/config.rs | 181 +++ crates/aingle_cortex/src/p2p/discovery.rs | 258 +++++ crates/aingle_cortex/src/p2p/gossip.rs | 662 +++++++++++ crates/aingle_cortex/src/p2p/identity.rs | 149 +++ crates/aingle_cortex/src/p2p/manager.rs | 1046 ++++++++++++++++++ crates/aingle_cortex/src/p2p/message.rs | 424 +++++++ crates/aingle_cortex/src/p2p/mod.rs | 15 + crates/aingle_cortex/src/p2p/peer_store.rs | 211 ++++ crates/aingle_cortex/src/p2p/rate_limiter.rs | 122 ++ crates/aingle_cortex/src/p2p/sync_manager.rs | 455 ++++++++ crates/aingle_cortex/src/p2p/transport.rs | 459 ++++++++ crates/aingle_cortex/src/state.rs | 9 + 16 files changed, 4092 insertions(+), 1 deletion(-) create mode 100644 crates/aingle_cortex/src/p2p/config.rs create mode 100644 crates/aingle_cortex/src/p2p/discovery.rs create mode 100644 crates/aingle_cortex/src/p2p/gossip.rs create mode 100644 crates/aingle_cortex/src/p2p/identity.rs create mode 100644 
crates/aingle_cortex/src/p2p/manager.rs create mode 100644 crates/aingle_cortex/src/p2p/message.rs create mode 100644 crates/aingle_cortex/src/p2p/mod.rs create mode 100644 crates/aingle_cortex/src/p2p/peer_store.rs create mode 100644 crates/aingle_cortex/src/p2p/rate_limiter.rs create mode 100644 crates/aingle_cortex/src/p2p/sync_manager.rs create mode 100644 crates/aingle_cortex/src/p2p/transport.rs diff --git a/Cargo.lock b/Cargo.lock index 985df84..9d1d560 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -139,13 +139,21 @@ dependencies = [ "blake3", "chrono", "dashmap 6.1.0", + "dirs", + "ed25519-dalek", "futures", + "hex", + "if-addrs 0.13.4", "jsonwebtoken", "log", + "mdns-sd", "once_cell", + "quinn", "rand 0.9.2", + "rcgen", "regex", "reqwest", + "rustls", "serde", "serde_json", "spargebra", @@ -2351,6 +2359,27 @@ dependencies = [ "crypto-common 0.2.1", ] +[[package]] +name = "dirs" +version = "6.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c3e8aa94d75141228480295a7d0e7feb620b1a5ad9f12bc40be62411e38cce4e" +dependencies = [ + "dirs-sys", +] + +[[package]] +name = "dirs-sys" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e01a3366d27ee9890022452ee61b2b63a67e6f13f58900b651ff5665f0bb1fab" +dependencies = [ + "libc", + "option-ext", + "redox_users", + "windows-sys 0.61.2", +] + [[package]] name = "displaydoc" version = "0.2.5" @@ -5010,6 +5039,12 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "option-ext" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d" + [[package]] name = "oxilangtag" version = "0.1.5" @@ -5731,6 +5766,17 @@ dependencies = [ "bitflags 2.11.0", ] +[[package]] +name = "redox_users" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4e608c6638b9c18977b00b475ac1f28d14e84b27d8d42f70e0bf1e3dec127ac" 
+dependencies = [ + "getrandom 0.2.17", + "libredox", + "thiserror 2.0.18", +] + [[package]] name = "regalloc2" version = "0.13.5" diff --git a/crates/aingle_cortex/Cargo.toml b/crates/aingle_cortex/Cargo.toml index 6d56671..9987562 100644 --- a/crates/aingle_cortex/Cargo.toml +++ b/crates/aingle_cortex/Cargo.toml @@ -18,6 +18,8 @@ rest = [] graphql = ["dep:async-graphql", "dep:async-graphql-axum"] sparql = ["dep:spargebra"] auth = ["dep:jsonwebtoken", "dep:argon2"] +p2p = ["dep:quinn", "dep:rustls", "dep:rcgen", "dep:ed25519-dalek", "dep:hex", "dep:dirs"] +p2p-mdns = ["p2p", "dep:mdns-sd", "dep:if-addrs"] full = ["rest", "graphql", "sparql", "auth"] [[bin]] @@ -84,6 +86,16 @@ reqwest = { version = "0.12", default-features = false, features = ["json", "rus dashmap = "6.0" once_cell = "1.4" +# P2P networking (optional) +quinn = { version = "0.11", optional = true } +rustls = { version = "0.23", default-features = false, features = ["ring", "std"], optional = true } +rcgen = { version = "0.13", optional = true } +ed25519-dalek = { version = "2", features = ["rand_core"], optional = true } +hex = { version = "0.4", optional = true } +dirs = { version = "6", optional = true } +mdns-sd = { version = "0.18", optional = true } +if-addrs = { version = "0.13", optional = true } + [dev-dependencies] tempfile = "3.26" reqwest = { version = "0.12", features = ["json"] } diff --git a/crates/aingle_cortex/src/lib.rs b/crates/aingle_cortex/src/lib.rs index f28a7d5..209615b 100644 --- a/crates/aingle_cortex/src/lib.rs +++ b/crates/aingle_cortex/src/lib.rs @@ -172,6 +172,8 @@ pub mod server; #[cfg(feature = "sparql")] pub mod sparql; pub mod state; +#[cfg(feature = "p2p")] +pub mod p2p; pub use client::{CortexClientConfig, CortexInternalClient}; pub use error::{Error, Result}; diff --git a/crates/aingle_cortex/src/main.rs b/crates/aingle_cortex/src/main.rs index 4a32585..47a30a1 100644 --- a/crates/aingle_cortex/src/main.rs +++ b/crates/aingle_cortex/src/main.rs @@ -55,8 +55,40 @@ 
async fn main() -> Result<(), Box> { i += 1; } + // Parse P2P flags (feature-gated at compile time). + #[cfg(feature = "p2p")] + let p2p_config = { + let p2p = aingle_cortex::p2p::config::P2pConfig::from_args(&args); + if let Err(e) = p2p.validate() { + eprintln!("Invalid P2P config: {}", e); + std::process::exit(1); + } + p2p + }; + // Create and run server - let server = CortexServer::new(config)?; + #[allow(unused_mut)] + let mut server = CortexServer::new(config)?; + + // Start P2P manager if enabled. + #[cfg(feature = "p2p")] + if p2p_config.enabled { + match aingle_cortex::p2p::manager::P2pManager::start( + p2p_config.clone(), + server.state().clone(), + ) + .await + { + Ok(manager) => { + // SAFETY: we have exclusive access before serving. + server.state_mut().p2p = Some(manager); + tracing::info!("P2P manager started on port {}", p2p_config.port); + } + Err(e) => { + tracing::error!("P2P manager failed to start: {}", e); + } + } + } // Set up graceful shutdown let shutdown_signal = async { @@ -84,9 +116,17 @@ fn print_help() { println!(" -V, --version Print version and exit"); println!(" --help Print this help message"); println!(); + println!("P2P OPTIONS (requires --features p2p):"); + println!(" --p2p Enable P2P triple synchronization"); + println!(" --p2p-port QUIC listen port (default: 19091)"); + println!(" --p2p-seed Network isolation seed"); + println!(" --p2p-peer Manual peer address (repeatable)"); + println!(" --p2p-mdns Enable mDNS discovery"); + println!(); println!("ENDPOINTS:"); println!(" REST API: http://:/api/v1/"); println!(" GraphQL: http://:/graphql"); println!(" SPARQL: http://:/sparql"); println!(" Health: http://:/api/v1/health"); + println!(" P2P Status: http://:/api/v1/p2p/status"); } diff --git a/crates/aingle_cortex/src/p2p/config.rs b/crates/aingle_cortex/src/p2p/config.rs new file mode 100644 index 0000000..04e1cb5 --- /dev/null +++ b/crates/aingle_cortex/src/p2p/config.rs @@ -0,0 +1,181 @@ +//! 
P2P configuration with CLI flag parsing and validation. + +use std::net::SocketAddr; +use std::path::PathBuf; + +/// Configuration for the P2P subsystem. +#[derive(Debug, Clone)] +pub struct P2pConfig { + /// Whether P2P is enabled (`--p2p` flag). + pub enabled: bool, + /// QUIC listen port (`--p2p-port`, default 19091). + pub port: u16, + /// Network isolation seed (`--p2p-seed`). Nodes with different seeds reject each other. + pub seed: Option, + /// Manually specified peer addresses (`--p2p-peer`, repeatable). + pub manual_peers: Vec, + /// Enable mDNS discovery (`--p2p-mdns`). + pub mdns: bool, + /// Gossip interval in milliseconds (default 5000). + pub gossip_interval_ms: u64, + /// Maximum triples per sync batch (default 5000). + pub sync_batch_size: usize, + /// Maximum connected peers (default 32). + pub max_peers: usize, + /// Directory for persistent data (keypair, etc.). + pub data_dir: PathBuf, + /// Max triples accepted per peer per minute (default 1000). + pub max_triples_per_peer_per_min: usize, + /// Max triples accepted globally per minute (default 10000). + pub max_triples_global_per_min: usize, +} + +impl Default for P2pConfig { + fn default() -> Self { + let data_dir = dirs::home_dir() + .unwrap_or_else(|| PathBuf::from(".")) + .join(".cortex"); + Self { + enabled: false, + port: 19091, + seed: None, + manual_peers: Vec::new(), + mdns: false, + gossip_interval_ms: 5000, + sync_batch_size: 5000, + max_peers: 32, + data_dir, + max_triples_per_peer_per_min: 1000, + max_triples_global_per_min: 10000, + } + } +} + +impl P2pConfig { + /// Validate configuration values. 
+ pub fn validate(&self) -> Result<(), String> { + if self.port < 1024 { + return Err(format!( + "p2p port must be >= 1024, got {}", + self.port + )); + } + + if let Some(ref seed) = self.seed { + if seed.is_empty() { + return Err("p2p seed must not be empty".to_string()); + } + if !seed.chars().all(|c| c.is_ascii_alphanumeric() || c == '_' || c == '-') { + return Err("p2p seed must be alphanumeric (plus _ and -)".to_string()); + } + } + + if self.sync_batch_size < 100 || self.sync_batch_size > 50000 { + return Err(format!( + "sync_batch_size must be 100..50000, got {}", + self.sync_batch_size + )); + } + + if self.gossip_interval_ms < 1000 { + return Err(format!( + "gossip_interval_ms must be >= 1000, got {}", + self.gossip_interval_ms + )); + } + + Ok(()) + } + + /// Parse P2P flags from CLI arguments. + /// + /// Recognises: `--p2p`, `--p2p-port`, `--p2p-seed`, `--p2p-peer`, `--p2p-mdns`. + pub fn from_args(args: &[String]) -> P2pConfig { + let mut cfg = P2pConfig::default(); + let mut i = 0; + while i < args.len() { + match args[i].as_str() { + "--p2p" => cfg.enabled = true, + "--p2p-mdns" => cfg.mdns = true, + "--p2p-port" => { + if i + 1 < args.len() { + if let Ok(p) = args[i + 1].parse::() { + cfg.port = p; + } + i += 1; + } + } + "--p2p-seed" => { + if i + 1 < args.len() { + cfg.seed = Some(args[i + 1].clone()); + i += 1; + } + } + "--p2p-peer" => { + if i + 1 < args.len() { + if let Ok(addr) = args[i + 1].parse::() { + cfg.manual_peers.push(addr); + } + i += 1; + } + } + _ => {} + } + i += 1; + } + cfg + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn defaults_are_valid() { + assert!(P2pConfig::default().validate().is_ok()); + } + + #[test] + fn rejects_invalid_port() { + let mut cfg = P2pConfig::default(); + cfg.port = 0; + assert!(cfg.validate().is_err()); + + cfg.port = 80; + assert!(cfg.validate().is_err()); + } + + #[test] + fn parses_cli_args() { + let args: Vec = vec![ + "--p2p", + "--p2p-port", + "19091", + "--p2p-seed", + 
"abc123", + "--p2p-peer", + "1.2.3.4:19091", + ] + .into_iter() + .map(String::from) + .collect(); + + let cfg = P2pConfig::from_args(&args); + assert!(cfg.enabled); + assert_eq!(cfg.port, 19091); + assert_eq!(cfg.seed.as_deref(), Some("abc123")); + assert_eq!(cfg.manual_peers.len(), 1); + assert_eq!( + cfg.manual_peers[0], + "1.2.3.4:19091".parse::().unwrap() + ); + } + + #[test] + fn rejects_empty_seed() { + let mut cfg = P2pConfig::default(); + cfg.seed = Some(String::new()); + assert!(cfg.validate().is_err()); + } +} diff --git a/crates/aingle_cortex/src/p2p/discovery.rs b/crates/aingle_cortex/src/p2p/discovery.rs new file mode 100644 index 0000000..ee1c6d3 --- /dev/null +++ b/crates/aingle_cortex/src/p2p/discovery.rs @@ -0,0 +1,258 @@ +//! mDNS-based peer discovery for Cortex P2P. +//! +//! Feature-gated behind `p2p-mdns`. Provides a no-op stub when disabled. + +use std::net::SocketAddr; +use std::time::Instant; + +/// Discovered peer metadata. +#[derive(Debug, Clone)] +pub struct DiscoveredPeer { + pub node_id: String, + pub addr: SocketAddr, + pub seed_hash: String, + pub last_seen: Instant, +} + +// ── Full implementation (p2p-mdns feature) ─────────────────────── + +#[cfg(feature = "p2p-mdns")] +mod inner { + use super::DiscoveredPeer; + use mdns_sd::{ServiceDaemon, ServiceEvent, ServiceInfo}; + use std::collections::HashMap; + use std::net::{IpAddr, SocketAddr}; + use std::sync::{Arc, RwLock}; + use std::time::{Duration, Instant}; + + const SERVICE_TYPE: &str = "_cortex-sync._udp.local."; + + pub struct P2pDiscovery { + daemon: ServiceDaemon, + node_id: String, + seed_hash: String, + port: u16, + peers: Arc>>, + registered: bool, + running: Arc, + } + + impl P2pDiscovery { + pub fn new(node_id: String, seed_hash: String, port: u16) -> Result { + let daemon = ServiceDaemon::new() + .map_err(|e| format!("mDNS daemon: {}", e))?; + Ok(Self { + daemon, + node_id, + seed_hash, + port, + peers: Arc::new(RwLock::new(HashMap::new())), + registered: false, + 
running: Arc::new(std::sync::atomic::AtomicBool::new(false)), + }) + } + + pub fn register(&mut self) -> Result<(), String> { + if self.registered { + return Ok(()); + } + + let addresses: Vec = if_addrs::get_if_addrs() + .map_err(|e| format!("get interfaces: {}", e))? + .into_iter() + .filter(|iface| !iface.is_loopback()) + .map(|iface| iface.ip()) + .collect(); + + if addresses.is_empty() { + return Err("no network interfaces".to_string()); + } + + let instance_name = format!( + "cortex-{}", + &self.node_id[..8.min(self.node_id.len())] + ); + + let mut props = HashMap::new(); + props.insert("node_id".to_string(), self.node_id.clone()); + props.insert("version".to_string(), env!("CARGO_PKG_VERSION").to_string()); + props.insert("seed_hash".to_string(), self.seed_hash.clone()); + props.insert("p2p_port".to_string(), self.port.to_string()); + + let info = ServiceInfo::new( + SERVICE_TYPE, + &instance_name, + &format!("{}.local.", instance_name), + &addresses[0].to_string(), + self.port, + props, + ) + .map_err(|e| format!("service info: {}", e))?; + + self.daemon + .register(info) + .map_err(|e| format!("register: {}", e))?; + + self.registered = true; + tracing::info!("mDNS registered {} on port {}", instance_name, self.port); + Ok(()) + } + + pub fn start_browsing(&mut self) -> Result<(), String> { + self.running + .store(true, std::sync::atomic::Ordering::SeqCst); + + let receiver = self + .daemon + .browse(SERVICE_TYPE) + .map_err(|e| format!("browse: {}", e))?; + + let peers = self.peers.clone(); + let running = self.running.clone(); + let our_node_id = self.node_id.clone(); + let our_seed_hash = self.seed_hash.clone(); + + std::thread::spawn(move || { + while running.load(std::sync::atomic::Ordering::SeqCst) { + if let Ok(event) = receiver.recv_timeout(Duration::from_millis(100)) { + if let ServiceEvent::ServiceResolved(info) = event { + let node_id = info + .get_properties() + .get("node_id") + .map(|v| v.val_str().to_string()) + .unwrap_or_default(); + + if 
node_id == our_node_id || node_id.is_empty() { + continue; + } + + let peer_seed_hash = info + .get_properties() + .get("seed_hash") + .map(|v| v.val_str().to_string()) + .unwrap_or_default(); + + if peer_seed_hash != our_seed_hash { + continue; + } + + let p2p_port: u16 = info + .get_properties() + .get("p2p_port") + .and_then(|v| v.val_str().parse().ok()) + .unwrap_or(info.get_port()); + + if let Some(ip) = info.get_addresses().iter().next() { + let addr = SocketAddr::new(ip.to_ip_addr(), p2p_port); + let peer = DiscoveredPeer { + node_id: node_id.clone(), + addr, + seed_hash: peer_seed_hash, + last_seen: Instant::now(), + }; + if let Ok(mut map) = peers.write() { + map.insert(node_id, peer); + } + } + } + } + } + }); + + tracing::info!("mDNS browsing started"); + Ok(()) + } + + pub fn get_discovered_peers(&self) -> Vec { + self.peers + .read() + .map(|m| m.values().cloned().collect()) + .unwrap_or_default() + } + + pub fn stop(&mut self) { + self.running + .store(false, std::sync::atomic::Ordering::SeqCst); + if self.registered { + let instance_name = format!( + "cortex-{}", + &self.node_id[..8.min(self.node_id.len())] + ); + let _ = self + .daemon + .unregister(&format!("{}.{}", instance_name, SERVICE_TYPE)); + self.registered = false; + } + self.daemon.shutdown().ok(); + tracing::info!("mDNS stopped"); + } + } + + impl Drop for P2pDiscovery { + fn drop(&mut self) { + self.stop(); + } + } +} + +// ── Stub implementation (no mdns feature) ──────────────────────── + +#[cfg(not(feature = "p2p-mdns"))] +mod inner { + use super::DiscoveredPeer; + + pub struct P2pDiscovery { + _node_id: String, + } + + impl P2pDiscovery { + pub fn new(node_id: String, _seed_hash: String, _port: u16) -> Result { + Ok(Self { _node_id: node_id }) + } + + pub fn register(&mut self) -> Result<(), String> { + Ok(()) + } + + pub fn start_browsing(&mut self) -> Result<(), String> { + Ok(()) + } + + pub fn get_discovered_peers(&self) -> Vec { + Vec::new() + } + + pub fn stop(&mut self) {} + } 
+} + +pub use inner::P2pDiscovery; + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn stub_returns_empty_peers() { + let d = P2pDiscovery::new("nodeid".into(), "seedhash".into(), 19091).unwrap(); + assert!(d.get_discovered_peers().is_empty()); + } + + #[test] + fn stub_register_is_noop() { + let mut d = P2pDiscovery::new("nodeid".into(), "seedhash".into(), 19091).unwrap(); + assert!(d.register().is_ok()); + } + + #[test] + fn discovered_peer_has_required_fields() { + let peer = DiscoveredPeer { + node_id: "abc123".into(), + addr: "127.0.0.1:19091".parse().unwrap(), + seed_hash: "hash".into(), + last_seen: Instant::now(), + }; + assert_eq!(peer.node_id, "abc123"); + assert_eq!(peer.addr.port(), 19091); + } +} diff --git a/crates/aingle_cortex/src/p2p/gossip.rs b/crates/aingle_cortex/src/p2p/gossip.rs new file mode 100644 index 0000000..20dc231 --- /dev/null +++ b/crates/aingle_cortex/src/p2p/gossip.rs @@ -0,0 +1,662 @@ +//! Bloom filter gossip adapted for triple synchronization. +//! +//! Ported from `aingle_minimal::gossip` with `Hash` replaced by `[u8; 32]` +//! (compatible with `TripleId.0`). + +use std::collections::{BinaryHeap, HashSet, VecDeque}; +use std::time::{Duration, Instant}; + +// ── Constants ──────────────────────────────────────────────────── + +/// Number of bits in the bloom filter. +const BLOOM_FILTER_BITS: usize = 1024; +/// Number of u64 words in the bloom filter. +const BLOOM_FILTER_WORDS: usize = BLOOM_FILTER_BITS / 64; +/// Number of hash functions. +const BLOOM_HASH_COUNT: usize = 3; +/// Maximum tokens in the rate-limit bucket. +const MAX_BUCKET_TOKENS: f64 = 100.0; +/// Minimum backoff between gossip attempts. +const MIN_BACKOFF: Duration = Duration::from_millis(100); +/// Maximum backoff between gossip attempts. +const MAX_BACKOFF: Duration = Duration::from_secs(300); +/// Backoff multiplier on failure. 
+const BACKOFF_MULTIPLIER: f64 = 2.0; + +// ── BloomFilter ────────────────────────────────────────────────── + +/// Memory-efficient bloom filter operating on `[u8; 32]` keys. +/// +/// Uses packed `u64` words (128 bytes for 1024 bits). +#[derive(Debug, Clone)] +pub struct BloomFilter { + bits: [u64; BLOOM_FILTER_WORDS], + hash_count: usize, + item_count: usize, + bit_count: usize, +} + +impl BloomFilter { + pub fn new() -> Self { + Self { + bits: [0u64; BLOOM_FILTER_WORDS], + hash_count: BLOOM_HASH_COUNT, + item_count: 0, + bit_count: BLOOM_FILTER_BITS, + } + } + + pub fn with_capacity(bits: usize, hash_count: usize) -> Self { + let bit_count = bits.min(BLOOM_FILTER_BITS); + Self { + bits: [0u64; BLOOM_FILTER_WORDS], + hash_count, + item_count: 0, + bit_count, + } + } + + /// Insert a 32-byte key. + #[inline] + pub fn insert(&mut self, key: &[u8; 32]) { + for i in 0..self.hash_count { + let index = self.hash_index(key, i); + let word_idx = index / 64; + let bit_idx = index % 64; + self.bits[word_idx] |= 1u64 << bit_idx; + } + self.item_count += 1; + } + + /// Check membership (may have false positives). 
+ #[inline] + pub fn may_contain(&self, key: &[u8; 32]) -> bool { + for i in 0..self.hash_count { + let index = self.hash_index(key, i); + let word_idx = index / 64; + let bit_idx = index % 64; + if (self.bits[word_idx] & (1u64 << bit_idx)) == 0 { + return false; + } + } + true + } + + pub fn clear(&mut self) { + self.bits = [0u64; BLOOM_FILTER_WORDS]; + self.item_count = 0; + } + + #[inline] + pub fn len(&self) -> usize { + self.item_count + } + + #[inline] + pub fn is_empty(&self) -> bool { + self.item_count == 0 + } + + pub fn estimated_false_positive_rate(&self) -> f64 { + if self.item_count == 0 { + return 0.0; + } + let m = self.bit_count as f64; + let k = self.hash_count as f64; + let n = self.item_count as f64; + (1.0 - (-k * n / m).exp()).powf(k) + } + + #[inline] + fn hash_index(&self, key: &[u8; 32], seed: usize) -> usize { + let base = u64::from_le_bytes([ + key[0], key[1], key[2], key[3], key[4], key[5], key[6], key[7], + ]); + let mixed = base + .wrapping_mul(0x9e3779b97f4a7c15u64.wrapping_add(seed as u64)) + .wrapping_add(seed as u64); + let mixed = mixed ^ (mixed >> 33); + let mixed = mixed.wrapping_mul(0xff51afd7ed558ccdu64); + (mixed as usize) % self.bit_count + } + + /// Serialize to 128 bytes. + pub fn to_bytes(&self) -> Vec { + let mut bytes = Vec::with_capacity(BLOOM_FILTER_WORDS * 8); + for word in &self.bits { + bytes.extend_from_slice(&word.to_le_bytes()); + } + bytes + } + + /// Deserialize from bytes. 
+ pub fn from_bytes(bytes: &[u8]) -> Self { + let mut bits = [0u64; BLOOM_FILTER_WORDS]; + for (i, chunk) in bytes.chunks(8).take(BLOOM_FILTER_WORDS).enumerate() { + let mut arr = [0u8; 8]; + arr[..chunk.len()].copy_from_slice(chunk); + bits[i] = u64::from_le_bytes(arr); + } + Self { + bits, + hash_count: BLOOM_HASH_COUNT, + item_count: 0, + bit_count: BLOOM_FILTER_BITS, + } + } +} + +impl Default for BloomFilter { + fn default() -> Self { + Self::new() + } +} + +// ── TokenBucket ────────────────────────────────────────────────── + +/// Token-bucket rate limiter for gossip traffic. +#[derive(Debug)] +pub struct TokenBucket { + tokens: f64, + max_tokens: f64, + refill_rate: f64, + last_refill: Instant, +} + +impl TokenBucket { + /// Create a bucket sized for `rate_mbps` megabits/sec. + pub fn new(rate_mbps: f64) -> Self { + let refill_rate = (rate_mbps * 125_000.0) / 1024.0; + Self { + tokens: MAX_BUCKET_TOKENS, + max_tokens: MAX_BUCKET_TOKENS, + refill_rate, + last_refill: Instant::now(), + } + } + + pub fn with_params(max_tokens: f64, refill_rate: f64) -> Self { + Self { + tokens: max_tokens, + max_tokens, + refill_rate, + last_refill: Instant::now(), + } + } + + pub fn try_consume(&mut self, tokens: f64) -> bool { + self.refill(); + if self.tokens >= tokens { + self.tokens -= tokens; + true + } else { + false + } + } + + pub fn has_tokens(&mut self, tokens: f64) -> bool { + self.refill(); + self.tokens >= tokens + } + + pub fn available(&mut self) -> f64 { + self.refill(); + self.tokens + } + + fn refill(&mut self) { + let now = Instant::now(); + let elapsed = now.duration_since(self.last_refill).as_secs_f64(); + if elapsed > 0.0 { + self.tokens = (self.tokens + elapsed * self.refill_rate).min(self.max_tokens); + self.last_refill = now; + } + } + + pub fn time_until_available(&mut self, tokens: f64) -> Duration { + self.refill(); + if self.tokens >= tokens { + Duration::ZERO + } else { + let needed = tokens - self.tokens; + Duration::from_secs_f64(needed / 
self.refill_rate) + } + } +} + +// ── MessagePriority & Queue ────────────────────────────────────── + +/// Priority levels for gossip messages. +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] +pub enum MessagePriority { + Low = 0, + Normal = 1, + High = 2, + Critical = 3, +} + +#[derive(Debug)] +pub struct PrioritizedMessage { + pub message: T, + pub priority: MessagePriority, + pub queued_at: Instant, + sequence: u64, +} + +impl PartialEq for PrioritizedMessage { + fn eq(&self, other: &Self) -> bool { + self.priority == other.priority && self.sequence == other.sequence + } +} +impl Eq for PrioritizedMessage {} + +impl PartialOrd for PrioritizedMessage { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for PrioritizedMessage { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + match self.priority.cmp(&other.priority) { + std::cmp::Ordering::Equal => other.sequence.cmp(&self.sequence), + ord => ord, + } + } +} + +/// Priority queue for gossip messages. 
+#[derive(Debug)] +pub struct MessageQueue { + heap: BinaryHeap>, + sequence: u64, + max_size: usize, +} + +impl MessageQueue { + pub fn new(max_size: usize) -> Self { + Self { + heap: BinaryHeap::with_capacity(max_size), + sequence: 0, + max_size, + } + } + + pub fn push(&mut self, message: T, priority: MessagePriority) -> bool { + if self.heap.len() >= self.max_size { + return false; + } + self.sequence += 1; + self.heap.push(PrioritizedMessage { + message, + priority, + queued_at: Instant::now(), + sequence: self.sequence, + }); + true + } + + pub fn pop(&mut self) -> Option { + self.heap.pop().map(|pm| pm.message) + } + + pub fn peek(&self) -> Option<&T> { + self.heap.peek().map(|pm| &pm.message) + } + + pub fn len(&self) -> usize { + self.heap.len() + } + + pub fn is_empty(&self) -> bool { + self.heap.is_empty() + } + + pub fn clear(&mut self) { + self.heap.clear(); + } +} + +impl Default for MessageQueue { + fn default() -> Self { + Self::new(100) + } +} + +// ── PeerGossipState ────────────────────────────────────────────── + +/// Per-peer gossip state with adaptive backoff. 
+#[derive(Debug, Clone)] +pub struct PeerGossipState { + pub failures: u32, + pub successes: u32, + pub current_backoff: Duration, + pub last_attempt: Instant, + pub known_ids: BloomFilter, +} + +impl PeerGossipState { + pub fn new() -> Self { + Self { + failures: 0, + successes: 0, + current_backoff: MIN_BACKOFF, + last_attempt: Instant::now(), + known_ids: BloomFilter::new(), + } + } + + pub fn record_success(&mut self) { + self.failures = 0; + self.successes = self.successes.saturating_add(1); + self.current_backoff = Duration::from_millis( + (self.current_backoff.as_millis() as f64 / BACKOFF_MULTIPLIER) as u64, + ) + .max(MIN_BACKOFF); + self.last_attempt = Instant::now(); + } + + pub fn record_failure(&mut self) { + self.successes = 0; + self.failures = self.failures.saturating_add(1); + self.current_backoff = Duration::from_millis( + (self.current_backoff.as_millis() as f64 * BACKOFF_MULTIPLIER) as u64, + ) + .min(MAX_BACKOFF); + self.last_attempt = Instant::now(); + } + + pub fn should_gossip(&self) -> bool { + self.last_attempt.elapsed() >= self.current_backoff + } +} + +impl Default for PeerGossipState { + fn default() -> Self { + Self::new() + } +} + +// ── TripleGossipManager ────────────────────────────────────────── + +/// Gossip manager adapted for triple IDs (`[u8; 32]`). +#[derive(Debug)] +pub struct TripleGossipManager { + /// Pending IDs to announce. + pending_announcements: VecDeque<[u8; 32]>, + /// Local bloom filter of known IDs. + local_filter: BloomFilter, + /// Recent IDs (dedup set). + recent_ids: HashSet<[u8; 32]>, + max_recent: usize, + /// Gossip round counter. + round: u64, +} + +impl TripleGossipManager { + pub fn new() -> Self { + Self { + pending_announcements: VecDeque::with_capacity(100), + local_filter: BloomFilter::new(), + recent_ids: HashSet::with_capacity(1000), + max_recent: 1000, + round: 0, + } + } + + /// Register a new local triple for announcement. 
+ pub fn announce(&mut self, id: [u8; 32]) { + self.local_filter.insert(&id); + + if self.recent_ids.len() >= self.max_recent { + self.recent_ids.clear(); + } + self.recent_ids.insert(id); + self.pending_announcements.push_back(id); + } + + /// Check if we already know about an ID. + pub fn is_known(&self, id: &[u8; 32]) -> bool { + self.recent_ids.contains(id) || self.local_filter.may_contain(id) + } + + /// Remove an ID from the recent set (cannot remove from bloom filter). + pub fn remove_known(&mut self, id: &[u8; 32]) { + self.recent_ids.remove(id); + } + + /// Register a known ID (e.g. received from peer). + pub fn add_known(&mut self, id: [u8; 32]) { + self.local_filter.insert(&id); + if self.recent_ids.len() < self.max_recent { + self.recent_ids.insert(id); + } + } + + /// Find IDs that exist in `our_ids` but are missing from `peer_filter`. + pub fn find_missing( + &self, + peer_filter: &BloomFilter, + our_ids: &[[u8; 32]], + ) -> Vec<[u8; 32]> { + our_ids + .iter() + .filter(|id| !peer_filter.may_contain(id)) + .copied() + .collect() + } + + /// Get a reference to the local bloom filter. + pub fn get_bloom_filter(&self) -> &BloomFilter { + &self.local_filter + } + + /// Drain pending announcements (up to `limit`). + pub fn take_announcements(&mut self, limit: usize) -> Vec<[u8; 32]> { + let count = limit.min(self.pending_announcements.len()); + self.pending_announcements.drain(..count).collect() + } + + pub fn gossip_complete(&mut self) { + self.round += 1; + } + + pub fn round(&self) -> u64 { + self.round + } + + /// Current statistics. + pub fn stats(&self) -> GossipStats { + GossipStats { + round: self.round, + pending_announcements: self.pending_announcements.len(), + known_ids: self.recent_ids.len(), + bloom_filter_items: self.local_filter.len(), + bloom_filter_fpr: self.local_filter.estimated_false_positive_rate(), + } + } +} + +impl Default for TripleGossipManager { + fn default() -> Self { + Self::new() + } +} + +/// Gossip statistics. 
+#[derive(Debug, Clone, serde::Serialize)] +pub struct GossipStats { + pub round: u64, + pub pending_announcements: usize, + pub known_ids: usize, + pub bloom_filter_items: usize, + pub bloom_filter_fpr: f64, +} + +#[cfg(test)] +mod tests { + use super::*; + + // ── BloomFilter tests ──────────────────────────────────── + + #[test] + fn bloom_insert_contains() { + let mut filter = BloomFilter::new(); + let key = [1u8; 32]; + assert!(!filter.may_contain(&key)); + filter.insert(&key); + assert!(filter.may_contain(&key)); + assert_eq!(filter.len(), 1); + } + + #[test] + fn bloom_serialization() { + let mut filter = BloomFilter::new(); + let key = [1u8; 32]; + filter.insert(&key); + let bytes = filter.to_bytes(); + let restored = BloomFilter::from_bytes(&bytes); + assert!(restored.may_contain(&key)); + } + + #[test] + fn bloom_false_positive_rate() { + let mut filter = BloomFilter::new(); + assert_eq!(filter.estimated_false_positive_rate(), 0.0); + for i in 0..10u8 { + filter.insert(&[i; 32]); + } + let fpr = filter.estimated_false_positive_rate(); + assert!(fpr > 0.0 && fpr < 1.0); + } + + #[test] + fn bloom_to_bytes_size() { + let filter = BloomFilter::new(); + assert_eq!(filter.to_bytes().len(), 128); + } + + #[test] + fn bloom_round_trip_many() { + let mut filter = BloomFilter::new(); + for i in 0..50u8 { + filter.insert(&[i; 32]); + } + let bytes = filter.to_bytes(); + let restored = BloomFilter::from_bytes(&bytes); + for i in 0..50u8 { + assert!(restored.may_contain(&[i; 32])); + } + } + + // ── TokenBucket tests ──────────────────────────────────── + + #[test] + fn token_bucket_consume() { + let mut bucket = TokenBucket::new(1.0); + assert!(bucket.try_consume(1.0)); + // Drain + for _ in 0..200 { + bucket.try_consume(1.0); + } + assert!(!bucket.try_consume(100.0)); + } + + #[test] + fn token_bucket_refill() { + let mut bucket = TokenBucket::with_params(10.0, 1000.0); + for _ in 0..15 { + bucket.try_consume(1.0); + } + 
std::thread::sleep(Duration::from_millis(20)); + assert!(bucket.has_tokens(1.0)); + } + + // ── MessageQueue tests ─────────────────────────────────── + + #[test] + fn message_queue_priority_ordering() { + let mut queue: MessageQueue = MessageQueue::new(10); + queue.push("low".into(), MessagePriority::Low); + queue.push("critical".into(), MessagePriority::Critical); + queue.push("normal".into(), MessagePriority::Normal); + queue.push("high".into(), MessagePriority::High); + + assert_eq!(queue.pop(), Some("critical".into())); + assert_eq!(queue.pop(), Some("high".into())); + assert_eq!(queue.pop(), Some("normal".into())); + assert_eq!(queue.pop(), Some("low".into())); + } + + // ── PeerGossipState tests ──────────────────────────────── + + #[test] + fn peer_gossip_state_backoff() { + let mut state = PeerGossipState::new(); + std::thread::sleep(MIN_BACKOFF + Duration::from_millis(10)); + assert!(state.should_gossip()); + + let initial = state.current_backoff; + state.record_failure(); + assert!(state.current_backoff > initial); + assert!(!state.should_gossip()); + + std::thread::sleep(state.current_backoff + Duration::from_millis(10)); + assert!(state.should_gossip()); + + let before = state.current_backoff; + state.record_success(); + assert!(state.current_backoff < before); + } + + // ── TripleGossipManager tests ──────────────────────────── + + #[test] + fn announce_adds_to_filter() { + let mut mgr = TripleGossipManager::new(); + let id = [42u8; 32]; + mgr.announce(id); + assert!(mgr.is_known(&id)); + } + + #[test] + fn find_missing_returns_delta() { + let mgr = TripleGossipManager::new(); + let a = [1u8; 32]; + let b = [2u8; 32]; + let c = [3u8; 32]; + let d = [4u8; 32]; + + let mut peer_filter = BloomFilter::new(); + peer_filter.insert(&a); + peer_filter.insert(&b); + + let missing = mgr.find_missing(&peer_filter, &[a, b, c, d]); + assert_eq!(missing.len(), 2); + assert!(missing.contains(&c)); + assert!(missing.contains(&d)); + } + + #[test] + fn 
remove_known_from_recent() { + let mut mgr = TripleGossipManager::new(); + let id = [42u8; 32]; + mgr.announce(id); + assert!(mgr.recent_ids.contains(&id)); + mgr.remove_known(&id); + assert!(!mgr.recent_ids.contains(&id)); + // Still in bloom filter (cannot remove), so is_known may still return true + } + + #[test] + fn stats_reflect_state() { + let mut mgr = TripleGossipManager::new(); + mgr.announce([1u8; 32]); + mgr.announce([2u8; 32]); + let stats = mgr.stats(); + assert_eq!(stats.pending_announcements, 2); + assert_eq!(stats.bloom_filter_items, 2); + assert_eq!(stats.known_ids, 2); + assert_eq!(stats.round, 0); + } +} diff --git a/crates/aingle_cortex/src/p2p/identity.rs b/crates/aingle_cortex/src/p2p/identity.rs new file mode 100644 index 0000000..1430219 --- /dev/null +++ b/crates/aingle_cortex/src/p2p/identity.rs @@ -0,0 +1,149 @@ +//! Node identity backed by Ed25519 keypair with persistent storage. + +use aingle_graph::{Triple, TripleId}; +use ed25519_dalek::{Signer, SigningKey, Verifier, VerifyingKey}; +use rand::RngCore; +use std::path::Path; + +/// Persistent node identity for P2P authentication. +pub struct NodeIdentity { + signing_key: SigningKey, +} + +impl NodeIdentity { + /// Load an existing keypair from `{data_dir}/node.key`, or generate and persist a new one. + pub fn load_or_generate(data_dir: &Path) -> std::io::Result { + let key_path = data_dir.join("node.key"); + + if key_path.exists() { + let seed = std::fs::read(&key_path)?; + if seed.len() != 32 { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidData, + "node.key must be exactly 32 bytes", + )); + } + let mut arr = [0u8; 32]; + arr.copy_from_slice(&seed); + Ok(Self { + signing_key: SigningKey::from_bytes(&arr), + }) + } else { + let mut rng = rand::rng(); + let mut seed = [0u8; 32]; + rng.fill_bytes(&mut seed); + + std::fs::create_dir_all(data_dir)?; + + // Write with restrictive permissions (Unix 0o600). 
+ #[cfg(unix)] + { + use std::os::unix::fs::OpenOptionsExt; + use std::io::Write; + let mut f = std::fs::OpenOptions::new() + .create(true) + .write(true) + .truncate(true) + .mode(0o600) + .open(&key_path)?; + f.write_all(&seed)?; + } + #[cfg(not(unix))] + { + std::fs::write(&key_path, &seed)?; + } + + Ok(Self { + signing_key: SigningKey::from_bytes(&seed), + }) + } + } + + /// Hex-encoded public key (64 characters). + pub fn node_id(&self) -> String { + hex::encode(self.signing_key.verifying_key().to_bytes()) + } + + /// Raw 32-byte public key. + pub fn public_key(&self) -> [u8; 32] { + self.signing_key.verifying_key().to_bytes() + } + + /// Sign arbitrary data with Ed25519. + pub fn sign(&self, data: &[u8]) -> [u8; 64] { + self.signing_key.sign(data).to_bytes() + } +} + +/// Verify an Ed25519 signature against a public key. +pub fn verify(pubkey: &[u8; 32], data: &[u8], sig: &[u8; 64]) -> bool { + let Ok(vk) = VerifyingKey::from_bytes(pubkey) else { + return false; + }; + let signature = ed25519_dalek::Signature::from_bytes(sig); + vk.verify(data, &signature).is_ok() +} + +/// Convenience wrapper: compute the TripleId hash bytes for a triple. 
+pub fn triple_hash(triple: &Triple) -> [u8; 32] { + TripleId::from_triple(triple).0 +} + +#[cfg(test)] +mod tests { + use super::*; + use tempfile::TempDir; + + #[test] + fn generate_creates_key_file() { + let dir = TempDir::new().unwrap(); + let _ = NodeIdentity::load_or_generate(dir.path()).unwrap(); + assert!(dir.path().join("node.key").exists()); + } + + #[test] + fn load_returns_same_identity() { + let dir = TempDir::new().unwrap(); + let id1 = NodeIdentity::load_or_generate(dir.path()).unwrap(); + let id2 = NodeIdentity::load_or_generate(dir.path()).unwrap(); + assert_eq!(id1.node_id(), id2.node_id()); + } + + #[test] + fn sign_and_verify() { + let dir = TempDir::new().unwrap(); + let id = NodeIdentity::load_or_generate(dir.path()).unwrap(); + let data = b"hello cortex p2p"; + let sig = id.sign(data); + assert!(verify(&id.public_key(), data, &sig)); + } + + #[test] + fn verify_rejects_bad_signature() { + let dir = TempDir::new().unwrap(); + let id = NodeIdentity::load_or_generate(dir.path()).unwrap(); + let data = b"hello cortex p2p"; + let mut sig = id.sign(data); + sig[0] ^= 0xff; + assert!(!verify(&id.public_key(), data, &sig)); + } + + #[cfg(unix)] + #[test] + fn file_permissions_are_restrictive() { + use std::os::unix::fs::MetadataExt; + let dir = TempDir::new().unwrap(); + let _ = NodeIdentity::load_or_generate(dir.path()).unwrap(); + let meta = std::fs::metadata(dir.path().join("node.key")).unwrap(); + assert_eq!(meta.mode() & 0o777, 0o600); + } + + #[test] + fn node_id_is_64_hex_chars() { + let dir = TempDir::new().unwrap(); + let id = NodeIdentity::load_or_generate(dir.path()).unwrap(); + let nid = id.node_id(); + assert_eq!(nid.len(), 64); + assert!(nid.chars().all(|c| c.is_ascii_hexdigit())); + } +} diff --git a/crates/aingle_cortex/src/p2p/manager.rs b/crates/aingle_cortex/src/p2p/manager.rs new file mode 100644 index 0000000..baca7e6 --- /dev/null +++ b/crates/aingle_cortex/src/p2p/manager.rs @@ -0,0 +1,1046 @@ +//! 
P2P manager — orchestrates identity, transport, gossip, sync, and discovery. + +use crate::p2p::config::P2pConfig; +use crate::p2p::discovery::P2pDiscovery; +use crate::p2p::gossip::{BloomFilter, GossipStats, TripleGossipManager}; +use crate::p2p::message::{P2pMessage, TombstoneWire, TripleWire}; +use crate::p2p::peer_store::{PeerSource, PeerStore, StoredPeer}; +use crate::p2p::rate_limiter::IngressRateLimiter; +use crate::p2p::sync_manager::{SyncStats, TripleSyncManager}; +use crate::p2p::transport::{P2pTransport, P2pTransportConfig}; +use crate::state::{AppState, Event}; + +use aingle_graph::TripleId; +use std::collections::HashMap; +use std::net::SocketAddr; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; +use std::time::{Duration, Instant}; +use tokio::sync::RwLock; + +// ── Manual peer reconnection (A2) ──────────────────────────────── + +/// Tracks reconnection state for a manually-configured peer. +struct ManualPeerTracker { + addr: SocketAddr, + retries: u32, + max_retries: u32, + current_backoff: Duration, + last_attempt: Instant, + abandoned: bool, +} + +impl ManualPeerTracker { + fn new(addr: SocketAddr) -> Self { + Self { + addr, + retries: 0, + max_retries: 10, + current_backoff: Duration::from_secs(5), + last_attempt: Instant::now(), + abandoned: false, + } + } + + fn should_retry(&self) -> bool { + !self.abandoned && self.last_attempt.elapsed() >= self.current_backoff + } + + fn record_failure(&mut self) { + self.retries += 1; + self.last_attempt = Instant::now(); + self.current_backoff = Duration::from_secs( + (self.current_backoff.as_secs() * 2).min(300), + ); + if self.retries >= self.max_retries { + self.abandoned = true; + } + } + + fn record_success(&mut self) { + self.retries = 0; + self.current_backoff = Duration::from_secs(5); + self.abandoned = false; + self.last_attempt = Instant::now(); + } +} + +// ── Health check tracking (A4) ─────────────────────────────────── + +/// Tracks outstanding pings for health checking. 
+struct PingTracker { + outstanding: HashMap, + timeout: Duration, +} + +impl PingTracker { + fn new(timeout: Duration) -> Self { + Self { + outstanding: HashMap::new(), + timeout, + } + } + + fn record_ping(&mut self, addr: SocketAddr, timestamp_ms: u64) { + self.outstanding.insert(addr, (timestamp_ms, Instant::now())); + } + + fn record_pong(&mut self, addr: &SocketAddr, _timestamp_ms: u64) { + self.outstanding.remove(addr); + } + + fn timed_out_peers(&self) -> Vec { + self.outstanding + .iter() + .filter(|(_, (_, sent))| sent.elapsed() >= self.timeout) + .map(|(addr, _)| *addr) + .collect() + } + + fn clear(&mut self, addr: &SocketAddr) { + self.outstanding.remove(addr); + } +} + +// ── Health events passed between tasks ─────────────────────────── + +enum HealthEvent { + PongReceived { addr: SocketAddr, timestamp_ms: u64 }, +} + +// ── P2P Manager ────────────────────────────────────────────────── + +/// Orchestrator for the entire P2P subsystem. +pub struct P2pManager { + config: P2pConfig, + node_id: String, + gossip: Arc>, + sync: Arc>, + transport: Arc>, + discovery: Arc>, + running: Arc, + tasks: Vec>, +} + +/// Serializable P2P status. +#[derive(Debug, Clone, serde::Serialize)] +pub struct P2pStatus { + pub node_id: String, + pub enabled: bool, + pub port: u16, + pub peer_count: usize, + pub connected_peers: Vec, + pub gossip_stats: GossipStats, + pub sync_stats: SyncStats, +} + +/// Per-peer status DTO. +#[derive(Debug, Clone, serde::Serialize)] +pub struct PeerStatusDto { + pub addr: String, + pub connected: bool, +} + +impl P2pManager { + /// Start the P2P subsystem: load identity, bind transport, connect manual peers, + /// start discovery, and spawn background tasks. + pub async fn start(config: P2pConfig, app_state: AppState) -> Result, String> { + // 1. Load or generate node identity. 
+ let identity = crate::p2p::identity::NodeIdentity::load_or_generate(&config.data_dir) + .map_err(|e| format!("identity: {}", e))?; + let node_id = identity.node_id(); + + // 2. Compute seed hash. + let seed_hash = if let Some(ref seed) = config.seed { + hex::encode(blake3::hash(seed.as_bytes()).as_bytes()) + } else { + String::new() + }; + + // 3. Create gossip & sync managers. + let gossip = Arc::new(RwLock::new(TripleGossipManager::new())); + let sync = Arc::new(RwLock::new(TripleSyncManager::new(Duration::from_millis( + config.gossip_interval_ms, + )))); + + // 4. Create transport and start. + let transport_config = P2pTransportConfig { + port: config.port, + max_connections: config.max_peers * 2, + ..Default::default() + }; + let mut transport_inner = + P2pTransport::new(transport_config, node_id.clone(), seed_hash.clone()); + transport_inner.start().await?; + + // 5. Rebuild local IDs from graph. + { + let graph = app_state.graph.read().await; + sync.write().await.rebuild_local_ids(&graph); + } + + // A3: Load persistent peer store and merge with manual peers. + let peer_store = Arc::new(RwLock::new( + PeerStore::load(&config.data_dir, config.max_peers * 2), + )); + + // 6. Connect to manual peers + persisted peers. 
+ let triple_count = { + let s = sync.read().await; + s.local_ids().len() as u64 + }; + + // Connect manual peers + for peer_addr in &config.manual_peers { + match transport_inner.connect(*peer_addr, triple_count).await { + Ok(()) => { + tracing::info!("P2P connected to manual peer {}", peer_addr); + let now_ms = now_millis(); + let mut ps = peer_store.write().await; + ps.add(StoredPeer { + addr: *peer_addr, + node_id: None, + last_connected_ms: now_ms, + source: PeerSource::Manual, + }); + let _ = ps.save(); + } + Err(e) => tracing::warn!("P2P failed to connect to {}: {}", peer_addr, e), + } + } + + // Connect persisted peers not already connected + { + let ps = peer_store.read().await; + for stored in ps.all() { + if !transport_inner.is_connected(&stored.addr) + && !config.manual_peers.contains(&stored.addr) + { + match transport_inner.connect(stored.addr, triple_count).await { + Ok(()) => { + tracing::info!("P2P reconnected to persisted peer {}", stored.addr); + } + Err(e) => { + tracing::debug!("P2P persisted peer {} unreachable: {}", stored.addr, e); + } + } + } + } + } + + let transport = Arc::new(RwLock::new(transport_inner)); + + // A6: Create ingress rate limiter. + let rate_limiter = Arc::new(RwLock::new(IngressRateLimiter::new( + config.max_triples_per_peer_per_min, + config.max_triples_global_per_min, + ))); + + // 7. Discovery. + let mut disc = + P2pDiscovery::new(node_id.clone(), seed_hash.clone(), config.port)?; + if config.mdns { + disc.register()?; + disc.start_browsing()?; + } + let discovery = Arc::new(RwLock::new(disc)); + + let running = Arc::new(AtomicBool::new(true)); + let mut tasks = Vec::new(); + + // A4: Health event channel between Task 3 (sender) and Task 6 (receiver). 
+ let (health_tx, health_rx) = tokio::sync::mpsc::channel::(256); + + // ── Task 1: Event listener ─────────────────────────── + { + let gossip = gossip.clone(); + let sync = sync.clone(); + let transport = transport.clone(); + let running = running.clone(); + let broadcaster = app_state.broadcaster.clone(); + let graph = app_state.graph.clone(); + + tasks.push(tokio::spawn(async move { + let mut rx = broadcaster.subscribe(); + loop { + if !running.load(Ordering::Relaxed) { + break; + } + match rx.recv().await { + Ok(Event::TripleAdded { hash, .. }) => { + if let Some(id) = TripleId::from_hex(&hash) { + gossip.write().await.announce(id.0); + sync.write().await.add_local_id(id.0); + } + } + // A1: Handle triple deletions — tombstone propagation. + Ok(Event::TripleDeleted { hash }) => { + if let Some(id) = TripleId::from_hex(&hash) { + // 1. Remove from gossip recent_ids + gossip.write().await.remove_known(&id.0); + // 2. Remove from sync local_ids + let mut s = sync.write().await; + s.remove_local_id(&id.0); + // 3. Add tombstone + let ts_ms = now_millis(); + s.add_tombstone(id.0, ts_ms); + drop(s); + // 4. 
Broadcast AnnounceDelete to all connected peers + let peers = transport.read().await.connected_peers(); + let msg = P2pMessage::AnnounceDelete { + triple_id: hash, + tombstone_ts: ts_ms, + }; + let t = transport.read().await; + for peer_addr in &peers { + let _ = t.send(peer_addr, &msg).await; + } + } + } + Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => { + tracing::warn!("P2P event listener lagged by {} events, rebuilding", n); + let g = graph.read().await; + sync.write().await.rebuild_local_ids(&g); + } + Err(tokio::sync::broadcast::error::RecvError::Closed) => break, + _ => {} + } + } + })); + } + + // ── Task 2: Gossip loop + tombstone sync ───────────── + { + let sync = sync.clone(); + let transport = transport.clone(); + let running = running.clone(); + let interval = config.gossip_interval_ms; + + tasks.push(tokio::spawn(async move { + loop { + tokio::time::sleep(Duration::from_millis(interval)).await; + if !running.load(Ordering::Relaxed) { + break; + } + + let peers = sync.read().await.peers_needing_sync(); + for peer_addr in peers { + // Send bloom filter + let filter = sync.read().await.build_local_filter(); + let triple_count = sync.read().await.local_ids().len() as u64; + let msg = P2pMessage::BloomSync { + filter_bytes: filter.to_bytes(), + triple_count, + }; + let t = transport.read().await; + if let Err(e) = t.send(&peer_addr, &msg).await { + tracing::debug!("gossip send to {}: {}", peer_addr, e); + } + + // A1: Also send active tombstones + let tombstones = sync.read().await.active_tombstones(); + if !tombstones.is_empty() { + let wires: Vec = tombstones + .iter() + .map(|(id, ts)| TombstoneWire { + triple_id: hex::encode(id), + deleted_at_ms: *ts, + }) + .collect(); + let ts_msg = P2pMessage::TombstoneSync { tombstones: wires }; + let _ = t.send(&peer_addr, &ts_msg).await; + } + } + } + })); + } + + // ── Task 3: Incoming message handler ───────────────── + { + let gossip = gossip.clone(); + let sync = sync.clone(); + let transport = 
transport.clone(); + let running = running.clone(); + let graph = app_state.graph.clone(); + let seed_hash2 = seed_hash.clone(); + let rate_limiter = rate_limiter.clone(); + let peer_store = peer_store.clone(); + let health_tx = health_tx.clone(); + + tasks.push(tokio::spawn(async move { + loop { + if !running.load(Ordering::Relaxed) { + break; + } + // A5: Still uses polling recv (accept_loop would require + // transport refactoring beyond scope; see note below). + // Reduced poll interval from 50ms to 10ms for better latency. + tokio::time::sleep(Duration::from_millis(10)).await; + + let maybe = { + let t = transport.read().await; + t.recv().await + }; + + let (addr, msg) = match maybe { + Ok(Some(pair)) => pair, + _ => continue, + }; + + match msg { + P2pMessage::Hello { + seed_hash: peer_seed, + node_id: peer_node_id, + .. + } => { + let accepted = peer_seed == seed_hash2; + let ack = P2pMessage::HelloAck { + node_id: String::new(), + accepted, + reason: if accepted { + None + } else { + Some("seed_mismatch".into()) + }, + }; + let t = transport.read().await; + let _ = t.send(&addr, &ack).await; + if accepted { + // A3: Record in peer store + let now_ms = now_millis(); + let mut ps = peer_store.write().await; + ps.add(StoredPeer { + addr, + node_id: Some(peer_node_id.clone()), + last_connected_ms: now_ms, + source: PeerSource::RestApi, + }); + let _ = ps.save(); + } else { + tracing::warn!( + "P2P rejected {} from {}: seed mismatch", + &peer_node_id[..8.min(peer_node_id.len())], + addr + ); + } + } + P2pMessage::BloomSync { + filter_bytes, + .. 
+ } => { + let peer_filter = BloomFilter::from_bytes(&filter_bytes); + let local_ids: Vec<[u8; 32]> = + sync.read().await.local_ids().to_vec(); + let missing = + gossip.read().await.find_missing(&peer_filter, &local_ids); + + if !missing.is_empty() { + let g = graph.read().await; + let mut wires = Vec::new(); + for id_bytes in &missing { + let tid = TripleId::new(*id_bytes); + if let Ok(Some(triple)) = g.get(&tid) { + wires.push(TripleWire::from_triple(&triple)); + } + } + if !wires.is_empty() { + let send_msg = P2pMessage::SendTriples { triples: wires }; + let t = transport.read().await; + let _ = t.send(&addr, &send_msg).await; + } + } + } + P2pMessage::RequestTriples { ids } => { + let g = graph.read().await; + let mut wires = Vec::new(); + for hex_id in &ids { + if let Some(tid) = TripleId::from_hex(hex_id) { + if let Ok(Some(triple)) = g.get(&tid) { + wires.push(TripleWire::from_triple(&triple)); + } + } + } + if !wires.is_empty() { + let send_msg = P2pMessage::SendTriples { triples: wires }; + let t = transport.read().await; + let _ = t.send(&addr, &send_msg).await; + } + } + P2pMessage::SendTriples { triples } => { + // A6: Apply backpressure before processing. 
+ let total = triples.len(); + let allowed = rate_limiter.write().await.check(&addr, total); + if allowed < total { + tracing::warn!( + "P2P rate limited {} triples from {} (allowed {}/{})", + total - allowed, + addr, + allowed, + total + ); + } + + let converted: Vec<_> = triples + .iter() + .take(allowed) + .filter_map(|tw| tw.to_triple()) + .collect(); + let g = graph.read().await; + let result = sync + .write() + .await + .store_received_triples(converted, &g); + sync.write() + .await + .record_sync_result(addr, true, result.inserted); + if result.inserted > 0 { + tracing::info!( + "P2P synced {} triples from {} ({} dup, {} err)", + result.inserted, + addr, + result.duplicates, + result.errors + ); + // A3: Update last connected + let mut ps = peer_store.write().await; + ps.update_last_connected(&addr, now_millis()); + let _ = ps.save(); + } + } + P2pMessage::Announce { triple_id } => { + if let Some(tid) = TripleId::from_hex(&triple_id) { + let known = gossip.read().await.is_known(&tid.0); + if !known { + let req = P2pMessage::RequestTriples { + ids: vec![triple_id], + }; + let t = transport.read().await; + let _ = t.send(&addr, &req).await; + } + } + } + // A1: Handle incoming deletion announcement. + P2pMessage::AnnounceDelete { triple_id, tombstone_ts } => { + if let Some(tid) = TripleId::from_hex(&triple_id) { + let mut s = sync.write().await; + if !s.has_tombstone(&tid.0) { + s.remove_local_id(&tid.0); + s.add_tombstone(tid.0, tombstone_ts); + drop(s); + gossip.write().await.remove_known(&tid.0); + // Delete from local graph + let g = graph.read().await; + let _ = g.delete(&tid); + tracing::debug!( + "P2P applied tombstone for {} from {}", + &triple_id[..8.min(triple_id.len())], + addr + ); + } + } + } + // A1: Handle batch tombstone sync. 
+ P2pMessage::TombstoneSync { tombstones } => { + for tw in &tombstones { + if let Some(tid) = TripleId::from_hex(&tw.triple_id) { + let mut s = sync.write().await; + if !s.has_tombstone(&tid.0) { + s.remove_local_id(&tid.0); + s.add_tombstone(tid.0, tw.deleted_at_ms); + drop(s); + gossip.write().await.remove_known(&tid.0); + let g = graph.read().await; + let _ = g.delete(&tid); + } + } + } + } + P2pMessage::Ping { timestamp_ms } => { + let count = sync.read().await.local_ids().len() as u64; + let pong = P2pMessage::Pong { + timestamp_ms, + triple_count: count, + }; + let t = transport.read().await; + let _ = t.send(&addr, &pong).await; + } + // A4: Forward pong to health task via channel. + P2pMessage::Pong { timestamp_ms, .. } => { + let _ = health_tx + .send(HealthEvent::PongReceived { + addr, + timestamp_ms, + }) + .await; + } + _ => {} + } + } + })); + } + + // ── Task 4: mDNS discovery reconnect ───────────────── + if config.mdns { + let transport = transport.clone(); + let discovery = discovery.clone(); + let running = running.clone(); + let sync = sync.clone(); + let peer_store = peer_store.clone(); + + tasks.push(tokio::spawn(async move { + loop { + tokio::time::sleep(Duration::from_secs(30)).await; + if !running.load(Ordering::Relaxed) { + break; + } + + let discovered = discovery.read().await.get_discovered_peers(); + for peer in discovered { + let connected = transport.read().await.is_connected(&peer.addr); + if !connected { + let triple_count = sync.read().await.local_ids().len() as u64; + let result = transport + .write() + .await + .connect(peer.addr, triple_count) + .await; + if let Ok(()) = result { + tracing::info!( + "P2P discovered and connected to {}", + peer.node_id + ); + // A3: Record mDNS peer + let mut ps = peer_store.write().await; + ps.add(StoredPeer { + addr: peer.addr, + node_id: Some(peer.node_id.clone()), + last_connected_ms: now_millis(), + source: PeerSource::Mdns, + }); + let _ = ps.save(); + } + } + } + } + })); + } + + // ── 
Task 5: Manual peer reconnection (A2) ──────────── + { + let transport = transport.clone(); + let sync = sync.clone(); + let running = running.clone(); + let manual_peers: Vec = config.manual_peers.clone(); + + tasks.push(tokio::spawn(async move { + let mut trackers: Vec = manual_peers + .into_iter() + .map(ManualPeerTracker::new) + .collect(); + + loop { + tokio::time::sleep(Duration::from_secs(5)).await; + if !running.load(Ordering::Relaxed) { + break; + } + + for tracker in &mut trackers { + if tracker.abandoned { + continue; + } + let connected = transport.read().await.is_connected(&tracker.addr); + if connected { + if tracker.retries > 0 { + tracker.record_success(); + } + continue; + } + if !tracker.should_retry() { + continue; + } + let triple_count = sync.read().await.local_ids().len() as u64; + let result = transport + .write() + .await + .connect(tracker.addr, triple_count) + .await; + match result { + Ok(()) => { + tracing::info!("P2P reconnected to manual peer {}", tracker.addr); + tracker.record_success(); + } + Err(e) => { + tracing::debug!( + "P2P reconnect to {} failed (attempt {}): {}", + tracker.addr, + tracker.retries + 1, + e + ); + tracker.record_failure(); + } + } + } + } + })); + } + + // ── Task 6: Health checks — Ping/Pong (A4) ─────────── + { + let transport = transport.clone(); + let running = running.clone(); + let mut health_rx = health_rx; + + tasks.push(tokio::spawn(async move { + let mut tracker = PingTracker::new(Duration::from_secs(10)); + + loop { + tokio::select! 
{ + _ = tokio::time::sleep(Duration::from_secs(30)) => { + if !running.load(Ordering::Relaxed) { + break; + } + + // Check timeouts — disconnect unresponsive peers + let timed_out = tracker.timed_out_peers(); + for addr in &timed_out { + tracing::warn!("P2P peer {} timed out (no pong), disconnecting", addr); + transport.write().await.disconnect(addr); + tracker.clear(addr); + } + + // Send pings to all connected peers + let peers = transport.read().await.connected_peers(); + let ts = now_millis(); + let ping = P2pMessage::Ping { timestamp_ms: ts }; + let t = transport.read().await; + for peer_addr in &peers { + if t.send(peer_addr, &ping).await.is_ok() { + tracker.record_ping(*peer_addr, ts); + } + } + } + event = health_rx.recv() => { + match event { + Some(HealthEvent::PongReceived { addr, timestamp_ms }) => { + tracker.record_pong(&addr, timestamp_ms); + } + None => break, + } + } + } + } + })); + } + + // ── Task 7: Periodic cleanup (A7) ──────────────────── + { + let sync = sync.clone(); + let peer_store = peer_store.clone(); + let running = running.clone(); + + tasks.push(tokio::spawn(async move { + loop { + tokio::time::sleep(Duration::from_secs(300)).await; + if !running.load(Ordering::Relaxed) { + break; + } + + // 1. Remove inactive sync states (15 min) + sync.write() + .await + .cleanup_inactive(Duration::from_secs(900)); + + // 2. Remove expired tombstones (A1) + sync.write().await.cleanup_expired_tombstones(); + + // 3. Cleanup stale peers (24h) and save (A3) + let mut ps = peer_store.write().await; + ps.cleanup_stale(24 * 3600 * 1000); + let _ = ps.save(); + + tracing::debug!("P2P cleanup cycle completed"); + } + })); + } + + tracing::info!( + "P2P manager started: node={}, port={}", + &node_id[..16.min(node_id.len())], + config.port + ); + + Ok(Arc::new(Self { + config, + node_id, + gossip, + sync, + transport, + discovery, + running, + tasks, + })) + } + + /// Stop the P2P subsystem. 
+ pub async fn stop(&self) { + self.running.store(false, Ordering::SeqCst); + for task in &self.tasks { + task.abort(); + } + self.transport.write().await.stop(); + self.discovery.write().await.stop(); + tracing::info!("P2P manager stopped"); + } + + /// Current P2P status. + pub async fn status(&self) -> P2pStatus { + let transport = self.transport.read().await; + let peers: Vec = transport + .connected_peers() + .iter() + .map(|addr| PeerStatusDto { + addr: addr.to_string(), + connected: true, + }) + .collect(); + + P2pStatus { + node_id: self.node_id.clone(), + enabled: self.config.enabled, + port: self.config.port, + peer_count: peers.len(), + connected_peers: peers, + gossip_stats: self.gossip.read().await.stats(), + sync_stats: self.sync.read().await.stats(), + } + } + + /// Connect to a new peer at runtime. + pub async fn add_peer(&self, addr: SocketAddr) -> Result<(), String> { + let triple_count = self.sync.read().await.local_ids().len() as u64; + self.transport + .write() + .await + .connect(addr, triple_count) + .await + } + + /// Disconnect a peer. + pub async fn remove_peer(&self, addr: SocketAddr) { + self.transport.write().await.disconnect(&addr); + } + + /// Node ID (hex public key). + pub fn node_id(&self) -> &str { + &self.node_id + } +} + +/// Current time in milliseconds since UNIX epoch. 
/// Current time in milliseconds since UNIX epoch.
/// Falls back to 0 if the system clock is before the epoch.
fn now_millis() -> u64 {
    std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default()
        .as_millis() as u64
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn peer_status_dto_fields() {
        let dto = PeerStatusDto {
            addr: "127.0.0.1:19091".into(),
            connected: true,
        };
        assert!(dto.connected);
    }

    #[test]
    fn p2p_status_serialize() {
        let status = P2pStatus {
            node_id: "abc".into(),
            enabled: true,
            port: 19091,
            peer_count: 0,
            connected_peers: vec![],
            gossip_stats: GossipStats {
                round: 0,
                pending_announcements: 0,
                known_ids: 0,
                bloom_filter_items: 0,
                bloom_filter_fpr: 0.0,
            },
            sync_stats: SyncStats {
                peer_count: 0,
                local_ids: 0,
                total_successful_syncs: 0,
                total_failed_syncs: 0,
            },
        };
        let json = serde_json::to_string(&status).unwrap();
        assert!(json.contains("node_id"));
    }

    #[tokio::test]
    async fn manager_starts_and_stops() {
        let mut config = P2pConfig::default();
        config.enabled = true;
        config.port = 0; // OS-assigned
        // Fix: `TempDir::into_path()` is deprecated and detaches the
        // directory, leaking it on disk after the test. Keep the guard
        // alive for the test's duration instead.
        let dir = tempfile::TempDir::new().unwrap();
        config.data_dir = dir.path().to_path_buf();

        let state = AppState::new();
        let manager = P2pManager::start(config, state).await.unwrap();
        assert!(!manager.node_id().is_empty());

        let status = manager.status().await;
        assert!(status.enabled);
        assert_eq!(status.peer_count, 0);

        manager.stop().await;
    }

    // ── A2: Manual peer tracker tests ────────────────────────

    #[test]
    fn manual_peer_tracker_new() {
        let addr: SocketAddr = "127.0.0.1:9000".parse().unwrap();
        let tracker = ManualPeerTracker::new(addr);
        assert_eq!(tracker.retries, 0);
        assert!(!tracker.abandoned);
        assert_eq!(tracker.current_backoff, Duration::from_secs(5));
    }

    #[test]
    fn tracker_should_retry_after_backoff() {
        let addr: SocketAddr = "127.0.0.1:9000".parse().unwrap();
        let mut tracker = ManualPeerTracker::new(addr);
        tracker.last_attempt = Instant::now() - Duration::from_secs(6);
        assert!(tracker.should_retry());
    }

    #[test]
    fn tracker_record_failure_doubles_backoff() {
        let addr: SocketAddr = "127.0.0.1:9000".parse().unwrap();
        let mut tracker = ManualPeerTracker::new(addr);
        let initial = tracker.current_backoff;
        tracker.record_failure();
        assert_eq!(tracker.current_backoff, initial * 2);
        assert_eq!(tracker.retries, 1);
    }

    #[test]
    fn tracker_record_failure_caps_at_max() {
        let addr: SocketAddr = "127.0.0.1:9000".parse().unwrap();
        let mut tracker = ManualPeerTracker::new(addr);
        for _ in 0..20 {
            tracker.record_failure();
        }
        assert!(tracker.current_backoff <= Duration::from_secs(300));
    }

    #[test]
    fn tracker_abandoned_after_max_retries() {
        let addr: SocketAddr = "127.0.0.1:9000".parse().unwrap();
        let mut tracker = ManualPeerTracker::new(addr);
        for _ in 0..10 {
            tracker.record_failure();
        }
        assert!(tracker.abandoned);
        assert!(!tracker.should_retry());
    }

    #[test]
    fn tracker_record_success_resets() {
        let addr: SocketAddr = "127.0.0.1:9000".parse().unwrap();
        let mut tracker = ManualPeerTracker::new(addr);
        tracker.record_failure();
        tracker.record_failure();
        tracker.record_success();
        assert_eq!(tracker.retries, 0);
        assert_eq!(tracker.current_backoff, Duration::from_secs(5));
        assert!(!tracker.abandoned);
    }

    // ── A4: Ping tracker tests ───────────────────────────────

    #[test]
    fn ping_tracker_new_empty() {
        let tracker = PingTracker::new(Duration::from_secs(10));
        assert!(tracker.outstanding.is_empty());
        assert!(tracker.timed_out_peers().is_empty());
    }

    #[test]
    fn ping_tracker_record_and_pong() {
        let mut tracker = PingTracker::new(Duration::from_secs(10));
        let addr: SocketAddr = "127.0.0.1:9000".parse().unwrap();
        tracker.record_ping(addr, 1000);
        assert_eq!(tracker.outstanding.len(), 1);
        tracker.record_pong(&addr, 1000);
        assert!(tracker.outstanding.is_empty());
    }

    #[test]
    fn ping_tracker_timed_out_detection() {
        let mut tracker = PingTracker::new(Duration::from_millis(10));
        let addr: SocketAddr = "127.0.0.1:9000".parse().unwrap();
        tracker.outstanding.insert(addr, (1000, Instant::now() - Duration::from_millis(50)));
        let timed_out = tracker.timed_out_peers();
        assert_eq!(timed_out.len(), 1);
        assert_eq!(timed_out[0], addr);
    }

    #[test]
    fn ping_tracker_clear_removes_entry() {
        let mut tracker = PingTracker::new(Duration::from_secs(10));
        let addr: SocketAddr = "127.0.0.1:9000".parse().unwrap();
        tracker.record_ping(addr, 1000);
        tracker.clear(&addr);
        assert!(tracker.outstanding.is_empty());
    }

    #[test]
    fn ping_tracker_no_false_timeout() {
        let mut tracker = PingTracker::new(Duration::from_secs(60));
        let addr: SocketAddr = "127.0.0.1:9000".parse().unwrap();
        tracker.record_ping(addr, 1000);
        // Just recorded, shouldn't be timed out
        assert!(tracker.timed_out_peers().is_empty());
    }

    // ── A7: Cleanup tests ────────────────────────────────────

    #[test]
    fn cleanup_removes_inactive_sync_states() {
        let mut sm = TripleSyncManager::new(Duration::from_secs(60));
        let addr: SocketAddr = "127.0.0.1:9000".parse().unwrap();
        sm.get_peer_state(&addr);
        std::thread::sleep(Duration::from_millis(10));
        sm.cleanup_inactive(Duration::from_millis(1));
        assert_eq!(sm.stats().peer_count, 0);
    }

    #[test]
    fn cleanup_removes_expired_tombstones() {
        let mut sm = TripleSyncManager::with_tombstone_ttl(
            Duration::from_secs(60),
            Duration::from_millis(10),
        );
        sm.add_tombstone([1u8; 32], 0); // very old
        sm.cleanup_expired_tombstones();
        assert!(!sm.has_tombstone(&[1u8; 32]));
    }

    #[test]
    fn cleanup_interval_configurable() {
        // Verify that the cleanup interval (5min in Task 7) can be configured
        // by checking the tombstone TTL is customizable
        let sm = TripleSyncManager::with_tombstone_ttl(
            Duration::from_secs(60),
            Duration::from_secs(7200),
        );
        assert_eq!(sm.tombstone_ttl, Duration::from_secs(7200));
    }
}
b/crates/aingle_cortex/src/p2p/message.rs new file mode 100644 index 0000000..63cc24d --- /dev/null +++ b/crates/aingle_cortex/src/p2p/message.rs @@ -0,0 +1,424 @@ +//! P2P wire protocol messages adapted for triple synchronization. + +use aingle_graph::{NodeId, Predicate, Triple, TripleMeta, Value}; +use serde::{Deserialize, Serialize}; + +/// Maximum message size (4 MB). +pub const MAX_MESSAGE_SIZE: usize = 4 * 1024 * 1024; + +/// Maximum IDs in a single `RequestTriples` message. +pub const MAX_REQUEST_IDS: usize = 100; + +/// Maximum triples in a single `SendTriples` batch. +pub const MAX_BATCH_TRIPLES: usize = 5000; + +/// Protocol messages exchanged between Cortex P2P nodes. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum P2pMessage { + /// Handshake: announce identity and network membership. + Hello { + node_id: String, + /// blake3 hash of the seed (never the seed itself). + seed_hash: String, + version: String, + triple_count: u64, + }, + /// Handshake acknowledgement. + HelloAck { + node_id: String, + accepted: bool, + reason: Option, + }, + /// Bloom filter for set reconciliation. + BloomSync { + filter_bytes: Vec, + triple_count: u64, + }, + /// Request triples by their hex IDs. + RequestTriples { + ids: Vec, + }, + /// Batch of triples. + SendTriples { + triples: Vec, + }, + /// Lightweight announcement of a new triple. + Announce { + triple_id: String, + }, + /// Keep-alive ping. + Ping { + timestamp_ms: u64, + }, + /// Keep-alive pong. + Pong { + timestamp_ms: u64, + triple_count: u64, + }, + /// Announce a triple deletion (tombstone propagation). + AnnounceDelete { + triple_id: String, + tombstone_ts: u64, + }, + /// Batch tombstone synchronization. + TombstoneSync { + tombstones: Vec, + }, +} + +/// Wire format for a tombstone marker. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct TombstoneWire { + pub triple_id: String, + pub deleted_at_ms: u64, +} + +/// Serializable wire format for a triple (no internal indices). 
+#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct TripleWire { + pub subject: String, + pub predicate: String, + pub object: serde_json::Value, + pub created_at: Option, + pub author: Option, + pub source: Option, +} + +impl TripleWire { + /// Convert an `aingle_graph::Triple` into a wire representation. + pub fn from_triple(triple: &Triple) -> Self { + let subject = match &triple.subject { + NodeId::Named(s) => s.clone(), + NodeId::Hash(h) => format!("hash:{}", hex::encode(h)), + NodeId::Blank(id) => format!("_:b{}", id), + }; + + let predicate = triple.predicate.as_str().to_string(); + + let object = value_to_json(&triple.object); + + Self { + subject, + predicate, + object, + created_at: Some(triple.meta.created_at.to_rfc3339()), + author: triple.meta.author.as_ref().map(|n| match n { + NodeId::Named(s) => s.clone(), + NodeId::Hash(h) => format!("hash:{}", hex::encode(h)), + NodeId::Blank(id) => format!("_:b{}", id), + }), + source: triple.meta.source.clone(), + } + } + + /// Convert back into an `aingle_graph::Triple`. Returns `None` on parse failure. + pub fn to_triple(&self) -> Option { + let subject = NodeId::named(&self.subject); + let predicate = Predicate::named(&self.predicate); + let object = json_to_value(&self.object); + + let mut meta = TripleMeta::new(); + if let Some(ref ts) = self.created_at { + if let Ok(dt) = chrono::DateTime::parse_from_rfc3339(ts) { + meta.created_at = dt.with_timezone(&chrono::Utc); + } + } + if let Some(ref author) = self.author { + meta = meta.with_author(NodeId::named(author)); + } + if let Some(ref source) = self.source { + meta = meta.with_source(source.as_str()); + } + + Some(Triple::with_meta(subject, predicate, object, meta)) + } +} + +/// Serialize a `P2pMessage` with a 4-byte big-endian length prefix + JSON payload. 
+impl P2pMessage { + pub fn to_bytes(&self) -> Vec { + let json = serde_json::to_vec(self).expect("P2pMessage is always serializable"); + let len = json.len() as u32; + let mut buf = Vec::with_capacity(4 + json.len()); + buf.extend_from_slice(&len.to_be_bytes()); + buf.extend_from_slice(&json); + buf + } + + /// Deserialize from `[4-byte len][JSON]`. + pub fn from_bytes(bytes: &[u8]) -> Result { + if bytes.len() < 4 { + return Err("message too short".to_string()); + } + let len = u32::from_be_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]) as usize; + if len > MAX_MESSAGE_SIZE { + return Err(format!("message too large: {} bytes", len)); + } + if bytes.len() < 4 + len { + return Err("incomplete message".to_string()); + } + serde_json::from_slice(&bytes[4..4 + len]).map_err(|e| format!("json parse error: {}", e)) + } +} + +// ── helpers ────────────────────────────────────────────────────── + +fn value_to_json(v: &Value) -> serde_json::Value { + match v { + Value::String(s) => serde_json::Value::String(s.clone()), + Value::Integer(n) => serde_json::json!(n), + Value::Float(f) => serde_json::json!(f), + Value::Boolean(b) => serde_json::json!(b), + Value::DateTime(s) => serde_json::json!({ "type": "datetime", "value": s }), + Value::Node(nid) => match nid { + NodeId::Named(s) => serde_json::json!({ "type": "node", "value": s }), + NodeId::Hash(h) => { + serde_json::json!({ "type": "node", "value": format!("hash:{}", hex::encode(h)) }) + } + NodeId::Blank(id) => { + serde_json::json!({ "type": "node", "value": format!("_:b{}", id) }) + } + }, + Value::Json(j) => j.clone(), + Value::Null => serde_json::Value::Null, + Value::Typed { value, datatype } => { + serde_json::json!({ "type": "typed", "value": value, "datatype": datatype }) + } + Value::LangString { value, lang } => { + serde_json::json!({ "type": "lang", "value": value, "lang": lang }) + } + Value::Bytes(b) => serde_json::json!({ "type": "bytes", "value": hex::encode(b) }), + } +} + +fn json_to_value(j: 
&serde_json::Value) -> Value { + match j { + serde_json::Value::String(s) => Value::String(s.clone()), + serde_json::Value::Number(n) => { + if let Some(i) = n.as_i64() { + Value::Integer(i) + } else if let Some(f) = n.as_f64() { + Value::Float(f) + } else { + Value::String(n.to_string()) + } + } + serde_json::Value::Bool(b) => Value::Boolean(*b), + serde_json::Value::Null => Value::Null, + serde_json::Value::Object(map) => { + if let Some(t) = map.get("type").and_then(|v| v.as_str()) { + let val = map + .get("value") + .and_then(|v| v.as_str()) + .unwrap_or_default(); + match t { + "node" => Value::Node(NodeId::named(val)), + "datetime" => Value::DateTime(val.to_string()), + "typed" => { + let dt = map + .get("datatype") + .and_then(|v| v.as_str()) + .unwrap_or_default(); + Value::Typed { + value: val.to_string(), + datatype: dt.to_string(), + } + } + "lang" => { + let lang = map + .get("lang") + .and_then(|v| v.as_str()) + .unwrap_or_default(); + Value::LangString { + value: val.to_string(), + lang: lang.to_string(), + } + } + "bytes" => { + let decoded = hex::decode(val).unwrap_or_default(); + Value::Bytes(decoded) + } + _ => Value::Json(j.clone()), + } + } else { + Value::Json(j.clone()) + } + } + serde_json::Value::Array(_) => Value::Json(j.clone()), + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn hello_roundtrip() { + let msg = P2pMessage::Hello { + node_id: "abc123".into(), + seed_hash: "def456".into(), + version: "0.3.7".into(), + triple_count: 42, + }; + let bytes = msg.to_bytes(); + let parsed = P2pMessage::from_bytes(&bytes).unwrap(); + assert!(matches!(parsed, P2pMessage::Hello { triple_count: 42, .. 
})); + } + + #[test] + fn bloom_sync_roundtrip() { + let filter = vec![0xffu8; 128]; + let msg = P2pMessage::BloomSync { + filter_bytes: filter.clone(), + triple_count: 100, + }; + let bytes = msg.to_bytes(); + let parsed = P2pMessage::from_bytes(&bytes).unwrap(); + match parsed { + P2pMessage::BloomSync { filter_bytes, triple_count } => { + assert_eq!(filter_bytes.len(), 128); + assert_eq!(triple_count, 100); + } + _ => panic!("wrong variant"), + } + } + + #[test] + fn request_triples_roundtrip() { + let msg = P2pMessage::RequestTriples { + ids: vec!["aabb".into(), "ccdd".into()], + }; + let bytes = msg.to_bytes(); + let parsed = P2pMessage::from_bytes(&bytes).unwrap(); + match parsed { + P2pMessage::RequestTriples { ids } => assert_eq!(ids.len(), 2), + _ => panic!("wrong variant"), + } + } + + #[test] + fn send_triples_roundtrip() { + let tw = TripleWire { + subject: "test:a".into(), + predicate: "test:b".into(), + object: serde_json::json!("hello"), + created_at: None, + author: None, + source: None, + }; + let msg = P2pMessage::SendTriples { + triples: vec![tw], + }; + let bytes = msg.to_bytes(); + let parsed = P2pMessage::from_bytes(&bytes).unwrap(); + match parsed { + P2pMessage::SendTriples { triples } => assert_eq!(triples.len(), 1), + _ => panic!("wrong variant"), + } + } + + #[test] + fn announce_roundtrip() { + let msg = P2pMessage::Announce { + triple_id: "deadbeef".into(), + }; + let bytes = msg.to_bytes(); + let parsed = P2pMessage::from_bytes(&bytes).unwrap(); + assert!(matches!(parsed, P2pMessage::Announce { .. 
})); + } + + #[test] + fn ping_pong_roundtrip() { + let ping = P2pMessage::Ping { timestamp_ms: 1000 }; + let bytes = ping.to_bytes(); + let parsed = P2pMessage::from_bytes(&bytes).unwrap(); + assert!(matches!(parsed, P2pMessage::Ping { timestamp_ms: 1000 })); + + let pong = P2pMessage::Pong { + timestamp_ms: 1000, + triple_count: 50, + }; + let bytes = pong.to_bytes(); + let parsed = P2pMessage::from_bytes(&bytes).unwrap(); + assert!(matches!( + parsed, + P2pMessage::Pong { + timestamp_ms: 1000, + triple_count: 50 + } + )); + } + + #[test] + fn rejects_oversized_message() { + // Craft a length prefix > MAX_MESSAGE_SIZE. + let len = (MAX_MESSAGE_SIZE as u32 + 1).to_be_bytes(); + let mut bytes = vec![]; + bytes.extend_from_slice(&len); + bytes.extend_from_slice(&[0u8; 10]); + assert!(P2pMessage::from_bytes(&bytes).is_err()); + } + + #[test] + fn tombstone_wire_roundtrip() { + let tw = TombstoneWire { + triple_id: "abc123".into(), + deleted_at_ms: 1700000000000, + }; + let json = serde_json::to_vec(&tw).unwrap(); + let back: TombstoneWire = serde_json::from_slice(&json).unwrap(); + assert_eq!(back.triple_id, "abc123"); + assert_eq!(back.deleted_at_ms, 1700000000000); + } + + #[test] + fn announce_delete_roundtrip() { + let msg = P2pMessage::AnnounceDelete { + triple_id: "deadbeef".into(), + tombstone_ts: 1700000000000, + }; + let bytes = msg.to_bytes(); + let parsed = P2pMessage::from_bytes(&bytes).unwrap(); + match parsed { + P2pMessage::AnnounceDelete { triple_id, tombstone_ts } => { + assert_eq!(triple_id, "deadbeef"); + assert_eq!(tombstone_ts, 1700000000000); + } + _ => panic!("wrong variant"), + } + } + + #[test] + fn tombstone_sync_roundtrip() { + let tombstones = vec![ + TombstoneWire { triple_id: "aa".into(), deleted_at_ms: 100 }, + TombstoneWire { triple_id: "bb".into(), deleted_at_ms: 200 }, + ]; + let msg = P2pMessage::TombstoneSync { tombstones }; + let bytes = msg.to_bytes(); + let parsed = P2pMessage::from_bytes(&bytes).unwrap(); + match parsed { + 
P2pMessage::TombstoneSync { tombstones } => { + assert_eq!(tombstones.len(), 2); + assert_eq!(tombstones[0].triple_id, "aa"); + assert_eq!(tombstones[1].deleted_at_ms, 200); + } + _ => panic!("wrong variant"), + } + } + + #[test] + fn triple_wire_conversion() { + let triple = Triple::new( + NodeId::named("test:subject"), + Predicate::named("test:predicate"), + Value::String("world".into()), + ); + let wire = TripleWire::from_triple(&triple); + let back = wire.to_triple().unwrap(); + + assert_eq!(back.subject, triple.subject); + assert_eq!(back.predicate, triple.predicate); + assert_eq!(back.object, triple.object); + } +} diff --git a/crates/aingle_cortex/src/p2p/mod.rs b/crates/aingle_cortex/src/p2p/mod.rs new file mode 100644 index 0000000..6f5c532 --- /dev/null +++ b/crates/aingle_cortex/src/p2p/mod.rs @@ -0,0 +1,15 @@ +//! P2P networking for Cortex triple synchronization. +//! +//! Enables multi-node knowledge graph sync via QUIC transport, +//! bloom filter gossip, and optional mDNS discovery. + +pub mod config; +pub mod discovery; +pub mod gossip; +pub mod identity; +pub mod manager; +pub mod message; +pub mod peer_store; +pub mod rate_limiter; +pub mod sync_manager; +pub mod transport; diff --git a/crates/aingle_cortex/src/p2p/peer_store.rs b/crates/aingle_cortex/src/p2p/peer_store.rs new file mode 100644 index 0000000..364f2b9 --- /dev/null +++ b/crates/aingle_cortex/src/p2p/peer_store.rs @@ -0,0 +1,211 @@ +//! Persistent peer storage backed by a JSON file. +//! +//! Stores known peers in `{data_dir}/known_peers.json` so they survive restarts. +//! Peers are infrastructure metadata, not knowledge graph data. + +use serde::{Deserialize, Serialize}; +use std::net::SocketAddr; +use std::path::PathBuf; + +/// How a peer was originally discovered. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub enum PeerSource { + Manual, + Mdns, + RestApi, +} + +/// A peer entry persisted to disk. 
+#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct StoredPeer { + pub addr: SocketAddr, + pub node_id: Option, + pub last_connected_ms: u64, + pub source: PeerSource, +} + +/// JSON-backed persistent peer list. +pub struct PeerStore { + path: PathBuf, + peers: Vec, + max_peers: usize, +} + +impl PeerStore { + /// Load from disk or create empty. + pub fn load(data_dir: &std::path::Path, max_peers: usize) -> Self { + let path = data_dir.join("known_peers.json"); + let peers = if path.exists() { + match std::fs::read_to_string(&path) { + Ok(content) => serde_json::from_str(&content).unwrap_or_default(), + Err(_) => Vec::new(), + } + } else { + Vec::new() + }; + Self { path, peers, max_peers } + } + + /// Write the current peer list to disk. + pub fn save(&self) -> Result<(), String> { + if let Some(parent) = self.path.parent() { + std::fs::create_dir_all(parent) + .map_err(|e| format!("create peer store dir: {}", e))?; + } + let json = serde_json::to_string_pretty(&self.peers) + .map_err(|e| format!("serialize peers: {}", e))?; + std::fs::write(&self.path, json) + .map_err(|e| format!("write peer store: {}", e))?; + Ok(()) + } + + /// Add a peer. Deduplicates by address. Enforces max_peers limit. + pub fn add(&mut self, peer: StoredPeer) { + // Deduplicate + if self.peers.iter().any(|p| p.addr == peer.addr) { + return; + } + // Enforce capacity + if self.peers.len() >= self.max_peers { + // Remove oldest (by last_connected_ms) + if let Some(oldest_idx) = self.peers.iter().enumerate() + .min_by_key(|(_, p)| p.last_connected_ms) + .map(|(i, _)| i) + { + self.peers.remove(oldest_idx); + } + } + self.peers.push(peer); + } + + /// Remove a peer by address. + pub fn remove(&mut self, addr: &SocketAddr) { + self.peers.retain(|p| p.addr != *addr); + } + + /// Get all stored peers. + pub fn all(&self) -> &[StoredPeer] { + &self.peers + } + + /// Update last_connected timestamp for a peer. 
+ pub fn update_last_connected(&mut self, addr: &SocketAddr, ts_ms: u64) { + if let Some(peer) = self.peers.iter_mut().find(|p| p.addr == *addr) { + peer.last_connected_ms = ts_ms; + } + } + + /// Remove peers not connected for more than `max_age_ms` milliseconds. + pub fn cleanup_stale(&mut self, max_age_ms: u64) { + let now_ms = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_millis() as u64; + self.peers.retain(|p| { + p.last_connected_ms == 0 || now_ms.saturating_sub(p.last_connected_ms) < max_age_ms + }); + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn temp_store(max_peers: usize) -> PeerStore { + let dir = tempfile::TempDir::new().unwrap(); + PeerStore::load(dir.path(), max_peers) + } + + fn addr(port: u16) -> SocketAddr { + format!("127.0.0.1:{}", port).parse().unwrap() + } + + fn stored_peer(port: u16, ts: u64) -> StoredPeer { + StoredPeer { + addr: addr(port), + node_id: None, + last_connected_ms: ts, + source: PeerSource::Manual, + } + } + + #[test] + fn peer_store_empty_on_first_load() { + let store = temp_store(100); + assert!(store.all().is_empty()); + } + + #[test] + fn peer_store_add_and_save() { + let dir = tempfile::TempDir::new().unwrap(); + let mut store = PeerStore::load(dir.path(), 100); + store.add(stored_peer(9000, 1000)); + assert_eq!(store.all().len(), 1); + assert!(store.save().is_ok()); + assert!(dir.path().join("known_peers.json").exists()); + } + + #[test] + fn peer_store_load_persisted_data() { + let dir = tempfile::TempDir::new().unwrap(); + { + let mut store = PeerStore::load(dir.path(), 100); + store.add(stored_peer(9000, 1000)); + store.add(stored_peer(9001, 2000)); + store.save().unwrap(); + } + let store2 = PeerStore::load(dir.path(), 100); + assert_eq!(store2.all().len(), 2); + } + + #[test] + fn peer_store_remove_existing() { + let mut store = temp_store(100); + store.add(stored_peer(9000, 1000)); + store.add(stored_peer(9001, 2000)); + store.remove(&addr(9000)); + 
assert_eq!(store.all().len(), 1); + assert_eq!(store.all()[0].addr, addr(9001)); + } + + #[test] + fn peer_store_deduplicates_same_addr() { + let mut store = temp_store(100); + store.add(stored_peer(9000, 1000)); + store.add(stored_peer(9000, 2000)); + assert_eq!(store.all().len(), 1); + } + + #[test] + fn peer_store_cleanup_stale() { + let mut store = temp_store(100); + // Add a peer with timestamp 0 (never connected) — should be kept + store.add(stored_peer(9000, 0)); + // Add a peer with an old timestamp + store.add(stored_peer(9001, 1)); + store.cleanup_stale(1000); // 1 second max age + // peer with ts=0 is kept (never-connected sentinel), old one removed + assert_eq!(store.all().len(), 1); + assert_eq!(store.all()[0].addr, addr(9000)); + } + + #[test] + fn peer_store_max_peers_enforced() { + let mut store = temp_store(2); + store.add(stored_peer(9000, 100)); + store.add(stored_peer(9001, 200)); + assert_eq!(store.all().len(), 2); + // Adding a third should evict the oldest (port 9000, ts=100) + store.add(stored_peer(9002, 300)); + assert_eq!(store.all().len(), 2); + assert!(!store.all().iter().any(|p| p.addr == addr(9000))); + } + + #[test] + fn peer_store_update_last_connected() { + let mut store = temp_store(100); + store.add(stored_peer(9000, 1000)); + store.update_last_connected(&addr(9000), 5000); + assert_eq!(store.all()[0].last_connected_ms, 5000); + } +} diff --git a/crates/aingle_cortex/src/p2p/rate_limiter.rs b/crates/aingle_cortex/src/p2p/rate_limiter.rs new file mode 100644 index 0000000..7f55d1d --- /dev/null +++ b/crates/aingle_cortex/src/p2p/rate_limiter.rs @@ -0,0 +1,122 @@ +//! Ingress rate limiter for P2P triple reception. +//! +//! Per-peer and global token-bucket rate limiting to prevent DoS +//! via excessive `SendTriples` messages. + +use crate::p2p::gossip::TokenBucket; +use std::collections::HashMap; +use std::net::SocketAddr; + +/// Rate limiter for incoming triples. 
+pub struct IngressRateLimiter { + per_peer: HashMap, + global: TokenBucket, + per_peer_rate: f64, + per_peer_max: f64, +} + +impl IngressRateLimiter { + /// Create a new limiter. + /// + /// `per_peer_per_min`: max triples per peer per minute. + /// `global_per_min`: max triples globally per minute. + pub fn new(per_peer_per_min: usize, global_per_min: usize) -> Self { + let per_peer_max = per_peer_per_min as f64; + let per_peer_rate = per_peer_max / 60.0; + let global_max = global_per_min as f64; + let global_rate = global_max / 60.0; + + Self { + per_peer: HashMap::new(), + global: TokenBucket::with_params(global_max, global_rate), + per_peer_rate, + per_peer_max, + } + } + + /// Check how many triples from `addr` are allowed out of `count`. + /// + /// Returns the number of allowed triples (0..=count). + pub fn check(&mut self, addr: &SocketAddr, count: usize) -> usize { + let bucket = self.per_peer.entry(*addr).or_insert_with(|| { + TokenBucket::with_params(self.per_peer_max, self.per_peer_rate) + }); + + let mut allowed = 0; + for _ in 0..count { + if bucket.try_consume(1.0) && self.global.try_consume(1.0) { + allowed += 1; + } else { + break; + } + } + allowed + } + + /// Remove rate-limiting state for a disconnected peer. 
+ pub fn cleanup_peer(&mut self, addr: &SocketAddr) { + self.per_peer.remove(addr); + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn addr(port: u16) -> SocketAddr { + format!("127.0.0.1:{}", port).parse().unwrap() + } + + #[test] + fn ingress_limiter_allows_within_limit() { + let mut limiter = IngressRateLimiter::new(100, 1000); + let allowed = limiter.check(&addr(9000), 50); + assert_eq!(allowed, 50); + } + + #[test] + fn ingress_limiter_blocks_over_limit() { + let mut limiter = IngressRateLimiter::new(10, 1000); + let allowed = limiter.check(&addr(9000), 20); + assert_eq!(allowed, 10); + } + + #[test] + fn ingress_limiter_per_peer_independence() { + let mut limiter = IngressRateLimiter::new(10, 1000); + let a1 = limiter.check(&addr(9000), 10); + let a2 = limiter.check(&addr(9001), 10); + assert_eq!(a1, 10); + assert_eq!(a2, 10); + } + + #[test] + fn ingress_limiter_global_limit_shared() { + let mut limiter = IngressRateLimiter::new(100, 15); + let a1 = limiter.check(&addr(9000), 10); + let a2 = limiter.check(&addr(9001), 10); + assert_eq!(a1, 10); + assert_eq!(a2, 5); // global cap hit + } + + #[test] + fn ingress_limiter_refills_over_time() { + // Use a high rate so refill is fast: 600/min = 10/sec + let mut limiter = IngressRateLimiter::new(600, 60000); + let a1 = limiter.check(&addr(9000), 10); + assert_eq!(a1, 10); + // After waiting 200ms at 10/sec, we get ~2 tokens + std::thread::sleep(std::time::Duration::from_millis(200)); + let a3 = limiter.check(&addr(9000), 1); + assert!(a3 > 0); + } + + #[test] + fn ingress_limiter_cleanup_peer() { + let mut limiter = IngressRateLimiter::new(10, 1000); + limiter.check(&addr(9000), 5); + assert!(limiter.per_peer.contains_key(&addr(9000))); + limiter.cleanup_peer(&addr(9000)); + assert!(!limiter.per_peer.contains_key(&addr(9000))); + } +} diff --git a/crates/aingle_cortex/src/p2p/sync_manager.rs b/crates/aingle_cortex/src/p2p/sync_manager.rs new file mode 100644 index 0000000..247b25d --- /dev/null +++ 
b/crates/aingle_cortex/src/p2p/sync_manager.rs @@ -0,0 +1,455 @@ +//! Triple synchronization manager. +//! +//! Tracks per-peer sync state and coordinates bloom-filter-based reconciliation +//! against the local `GraphDB`. + +use crate::p2p::gossip::BloomFilter; +use aingle_graph::{GraphDB, Triple, TripleId}; +use std::collections::HashMap; +use std::net::SocketAddr; +use std::time::{Duration, Instant}; + +/// Maximum IDs to request in a single reconciliation round. +const MAX_REQUEST_SIZE: usize = 100; + +/// Per-peer sync tracking. +#[derive(Debug, Clone)] +pub struct PeerSyncState { + pub last_sync: Instant, + pub peer_filter: Option, + pub pending_requests: Vec<[u8; 32]>, + pub successful_syncs: u32, + pub failed_syncs: u32, +} + +impl PeerSyncState { + pub fn new() -> Self { + Self { + last_sync: Instant::now(), + peer_filter: None, + pending_requests: Vec::new(), + successful_syncs: 0, + failed_syncs: 0, + } + } + + pub fn should_sync(&self, interval: Duration) -> bool { + self.last_sync.elapsed() >= interval + } + + pub fn record_success(&mut self) { + self.last_sync = Instant::now(); + self.successful_syncs += 1; + self.failed_syncs = 0; + } + + pub fn record_failure(&mut self) { + self.last_sync = Instant::now(); + self.failed_syncs += 1; + } +} + +impl Default for PeerSyncState { + fn default() -> Self { + Self::new() + } +} + +/// Manages triple synchronization with peers. +pub struct TripleSyncManager { + peer_states: HashMap, + sync_interval: Duration, + /// All known local triple IDs. + local_ids: Vec<[u8; 32]>, + max_local_ids: usize, + /// Tombstones: hash -> deletion timestamp_ms. + pub(crate) tombstones: HashMap<[u8; 32], u64>, + /// Tombstone time-to-live (default 24h). 
+ pub(crate) tombstone_ttl: Duration, +} + +impl TripleSyncManager { + pub fn new(sync_interval: Duration) -> Self { + Self { + peer_states: HashMap::new(), + sync_interval, + local_ids: Vec::with_capacity(1000), + max_local_ids: 100_000, + tombstones: HashMap::new(), + tombstone_ttl: Duration::from_secs(24 * 3600), + } + } + + /// Create with a custom tombstone TTL. + pub fn with_tombstone_ttl(sync_interval: Duration, ttl: Duration) -> Self { + let mut mgr = Self::new(sync_interval); + mgr.tombstone_ttl = ttl; + mgr + } + + /// Register a new local triple ID. + pub fn add_local_id(&mut self, id: [u8; 32]) { + if self.local_ids.len() >= self.max_local_ids { + self.local_ids.drain(..self.max_local_ids / 2); + } + self.local_ids.push(id); + } + + /// Rebuild the local ID list by scanning the full graph. + pub fn rebuild_local_ids(&mut self, graph: &GraphDB) { + self.local_ids.clear(); + if let Ok(triples) = graph.find(aingle_graph::TriplePattern::any()) { + for triple in &triples { + self.local_ids.push(TripleId::from_triple(triple).0); + } + } + } + + /// Get a snapshot of all local IDs. + pub fn local_ids(&self) -> &[[u8; 32]] { + &self.local_ids + } + + /// Return peers whose sync interval has elapsed. + pub fn peers_needing_sync(&self) -> Vec { + self.peer_states + .iter() + .filter(|(_, s)| s.should_sync(self.sync_interval)) + .map(|(addr, _)| *addr) + .collect() + } + + /// Build a bloom filter from all local IDs. + pub fn build_local_filter(&self) -> BloomFilter { + let mut filter = BloomFilter::new(); + for id in &self.local_ids { + filter.insert(id); + } + filter + } + + /// Given a peer's bloom filter, return IDs we have that the peer is missing (capped). 
+ pub fn process_peer_filter(&self, peer_filter: &BloomFilter) -> Vec<[u8; 32]> { + let mut missing = Vec::new(); + for id in &self.local_ids { + if !peer_filter.may_contain(id) { + missing.push(*id); + if missing.len() >= MAX_REQUEST_SIZE { + break; + } + } + } + missing + } + + /// Insert triples received from a peer into the graph. Duplicates are counted, not errors. + pub fn store_received_triples( + &mut self, + triples: Vec, + graph: &GraphDB, + ) -> StoreResult { + let mut result = StoreResult::default(); + for triple in triples { + let id = TripleId::from_triple(&triple); + match graph.insert(triple) { + Ok(_) => { + self.add_local_id(id.0); + result.inserted += 1; + } + Err(e) => { + let msg = format!("{}", e); + if msg.contains("duplicate") || msg.contains("exists") || msg.contains("already") { + result.duplicates += 1; + } else { + result.errors += 1; + } + } + } + } + result + } + + /// Record the outcome of a sync round for a given peer. + pub fn record_sync_result(&mut self, peer: SocketAddr, success: bool, _triples_synced: usize) { + let state = self.peer_states.entry(peer).or_default(); + if success { + state.record_success(); + } else { + state.record_failure(); + } + } + + /// Get or create state entry for a peer. + pub fn get_peer_state(&mut self, addr: &SocketAddr) -> &mut PeerSyncState { + self.peer_states.entry(*addr).or_default() + } + + /// Remove peers that haven't synced within `timeout`. + pub fn cleanup_inactive(&mut self, timeout: Duration) { + self.peer_states + .retain(|_, s| s.last_sync.elapsed() < timeout); + } + + /// Remove a local ID (used when a triple is deleted). + pub fn remove_local_id(&mut self, id: &[u8; 32]) { + self.local_ids.retain(|existing| existing != id); + } + + /// Add a tombstone marker for a deleted triple. + pub fn add_tombstone(&mut self, id: [u8; 32], ts_ms: u64) { + self.tombstones.insert(id, ts_ms); + } + + /// Check if a tombstone exists for the given ID. 
+ pub fn has_tombstone(&self, id: &[u8; 32]) -> bool { + self.tombstones.contains_key(id) + } + + /// Remove expired tombstones (older than TTL). + pub fn cleanup_expired_tombstones(&mut self) { + let now_ms = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_millis() as u64; + let ttl_ms = self.tombstone_ttl.as_millis() as u64; + self.tombstones.retain(|_, ts| now_ms.saturating_sub(*ts) < ttl_ms); + } + + /// Return all active tombstones as (hash, timestamp_ms) pairs. + pub fn active_tombstones(&self) -> Vec<([u8; 32], u64)> { + self.tombstones.iter().map(|(k, v)| (*k, *v)).collect() + } + + /// Aggregate sync statistics. + pub fn stats(&self) -> SyncStats { + let mut total_successful = 0; + let mut total_failed = 0; + for s in self.peer_states.values() { + total_successful += s.successful_syncs; + total_failed += s.failed_syncs; + } + SyncStats { + peer_count: self.peer_states.len(), + local_ids: self.local_ids.len(), + total_successful_syncs: total_successful, + total_failed_syncs: total_failed, + } + } +} + +/// Result of a `store_received_triples` operation. +#[derive(Debug, Default)] +pub struct StoreResult { + pub inserted: usize, + pub duplicates: usize, + pub errors: usize, +} + +/// Aggregate sync statistics. 
#[derive(Debug, Clone, serde::Serialize)]
pub struct SyncStats {
    pub peer_count: usize,
    pub local_ids: usize,
    pub total_successful_syncs: u32,
    pub total_failed_syncs: u32,
}

#[cfg(test)]
mod tests {
    use super::*;

    /// One-minute sync interval used by most tests below.
    fn minute_manager() -> TripleSyncManager {
        TripleSyncManager::new(Duration::from_secs(60))
    }

    #[test]
    fn new_empty() {
        let sm = minute_manager();
        assert!(sm.local_ids().is_empty());
        assert!(sm.peers_needing_sync().is_empty());
    }

    #[test]
    fn add_local_id() {
        let mut sm = minute_manager();
        sm.add_local_id([1u8; 32]);
        assert_eq!(sm.local_ids().len(), 1);
    }

    #[test]
    fn build_local_filter_contains_ids() {
        let mut sm = minute_manager();
        let id = [5u8; 32];
        sm.add_local_id(id);
        let filter = sm.build_local_filter();
        assert!(filter.may_contain(&id));
        assert!(!filter.may_contain(&[99u8; 32]));
    }

    #[test]
    fn process_peer_filter_finds_missing() {
        let mut sm = minute_manager();
        let (a, b, c) = ([1u8; 32], [2u8; 32], [3u8; 32]);
        for id in &[a, b, c] {
            sm.add_local_id(*id);
        }

        // Peer only advertises `a`; we should offer it `b` and `c`.
        let mut peer = BloomFilter::new();
        peer.insert(&a);

        let missing = sm.process_peer_filter(&peer);
        assert!(missing.contains(&b));
        assert!(missing.contains(&c));
        assert!(!missing.contains(&a));
    }

    #[test]
    fn peers_needing_sync_respects_interval() {
        let mut sm = TripleSyncManager::new(Duration::from_millis(10));
        let addr: SocketAddr = "127.0.0.1:9000".parse().unwrap();
        sm.get_peer_state(&addr);

        std::thread::sleep(Duration::from_millis(20));
        assert_eq!(sm.peers_needing_sync().len(), 1);
    }

    #[test]
    fn store_received_triples_inserts() {
        let graph = GraphDB::memory().unwrap();
        let mut sm = minute_manager();

        let triple = Triple::new(
            aingle_graph::NodeId::named("test:a"),
            aingle_graph::Predicate::named("test:rel"),
            aingle_graph::Value::String("val".into()),
        );

        let result = sm.store_received_triples(vec![triple], &graph);
        assert_eq!(result.inserted, 1);
        assert_eq!(result.duplicates, 0);
    }

    #[test]
    fn store_received_triples_skips_duplicates() {
        let graph = GraphDB::memory().unwrap();
        let mut sm = minute_manager();

        let triple = Triple::new(
            aingle_graph::NodeId::named("test:a"),
            aingle_graph::Predicate::named("test:rel"),
            aingle_graph::Value::String("val".into()),
        );

        // Insert once directly.
        let _ = graph.insert(triple.clone());

        // Attempt to insert again via sync.
        let result = sm.store_received_triples(vec![triple], &graph);
        assert_eq!(result.inserted, 0);
        // Depending on GraphDB's error message this is classed as a
        // duplicate or a generic error; either is acceptable here.
        assert!(result.duplicates > 0 || result.errors > 0);
    }

    #[test]
    fn record_sync_result_updates_state() {
        let mut sm = minute_manager();
        let addr: SocketAddr = "127.0.0.1:9000".parse().unwrap();
        sm.record_sync_result(addr, true, 10);
        sm.record_sync_result(addr, true, 5);

        assert_eq!(sm.stats().total_successful_syncs, 2);
    }

    #[test]
    fn cleanup_inactive_removes_old_peers() {
        let mut sm = minute_manager();
        let addr: SocketAddr = "127.0.0.1:9000".parse().unwrap();
        sm.get_peer_state(&addr);

        std::thread::sleep(Duration::from_millis(10));
        sm.cleanup_inactive(Duration::from_millis(1));
        assert_eq!(sm.stats().peer_count, 0);
    }

    #[test]
    fn remove_local_id_works() {
        let mut sm = minute_manager();
        let (a, b) = ([1u8; 32], [2u8; 32]);
        sm.add_local_id(a);
        sm.add_local_id(b);
        assert_eq!(sm.local_ids().len(), 2);
        sm.remove_local_id(&a);
        assert_eq!(sm.local_ids().len(), 1);
        assert_eq!(sm.local_ids()[0], b);
    }

    #[test]
    fn add_tombstone_and_check() {
        let mut sm = minute_manager();
        let id = [42u8; 32];
        assert!(!sm.has_tombstone(&id));
+ sm.add_tombstone(id, 1700000000000); + assert!(sm.has_tombstone(&id)); + } + + #[test] + fn cleanup_expired_tombstones() { + let mut sm = TripleSyncManager::with_tombstone_ttl( + Duration::from_secs(60), + Duration::from_millis(50), + ); + let id = [1u8; 32]; + // Use a timestamp far in the past + sm.add_tombstone(id, 0); + sm.cleanup_expired_tombstones(); + assert!(!sm.has_tombstone(&id)); + } + + #[test] + fn tombstone_ttl_configurable() { + let sm = TripleSyncManager::with_tombstone_ttl( + Duration::from_secs(60), + Duration::from_secs(3600), + ); + assert_eq!(sm.tombstone_ttl, Duration::from_secs(3600)); + } + + #[test] + fn active_tombstones_returns_all() { + let mut sm = TripleSyncManager::new(Duration::from_secs(60)); + sm.add_tombstone([1u8; 32], 100); + sm.add_tombstone([2u8; 32], 200); + let active = sm.active_tombstones(); + assert_eq!(active.len(), 2); + } + + #[test] + fn duplicate_tombstone_is_noop() { + let mut sm = TripleSyncManager::new(Duration::from_secs(60)); + let id = [1u8; 32]; + sm.add_tombstone(id, 100); + sm.add_tombstone(id, 200); + assert_eq!(sm.active_tombstones().len(), 1); + // Second write overwrites timestamp + let ts = sm.tombstones.get(&id).unwrap(); + assert_eq!(*ts, 200); + } + + #[test] + fn stats_are_accurate() { + let mut sm = TripleSyncManager::new(Duration::from_secs(60)); + sm.add_local_id([1u8; 32]); + sm.add_local_id([2u8; 32]); + + let addr: SocketAddr = "127.0.0.1:9000".parse().unwrap(); + sm.record_sync_result(addr, true, 1); + sm.record_sync_result(addr, false, 0); + + let stats = sm.stats(); + assert_eq!(stats.local_ids, 2); + assert_eq!(stats.peer_count, 1); + assert_eq!(stats.total_successful_syncs, 1); + assert_eq!(stats.total_failed_syncs, 1); + } +} diff --git a/crates/aingle_cortex/src/p2p/transport.rs b/crates/aingle_cortex/src/p2p/transport.rs new file mode 100644 index 0000000..f3b2aa8 --- /dev/null +++ b/crates/aingle_cortex/src/p2p/transport.rs @@ -0,0 +1,459 @@ +//! 
QUIC transport layer for P2P communication. +//! +//! Ported from `aingle_minimal::quic` with cortex-specific ALPN and +//! integrated seed-based handshake. + +use crate::p2p::message::{P2pMessage, MAX_MESSAGE_SIZE}; +use quinn::{ClientConfig, Connection, Endpoint, ServerConfig}; +use rustls::pki_types::{CertificateDer, PrivateKeyDer, PrivatePkcs8KeyDer}; +use std::collections::HashMap; +use std::net::SocketAddr; +use std::sync::Arc; +use std::time::Duration; + +/// Transport-layer configuration. +#[derive(Debug, Clone)] +pub struct P2pTransportConfig { + pub bind_addr: String, + pub port: u16, + pub keep_alive: Duration, + pub idle_timeout: Duration, + pub max_connections: usize, +} + +impl Default for P2pTransportConfig { + fn default() -> Self { + Self { + bind_addr: "0.0.0.0".to_string(), + port: 19091, + keep_alive: Duration::from_secs(15), + idle_timeout: Duration::from_secs(60), + max_connections: 64, + } + } +} + +/// QUIC-based P2P transport with length-prefixed JSON messages. +pub struct P2pTransport { + config: P2pTransportConfig, + endpoint: Option, + connections: HashMap, + node_id: String, + /// blake3(seed) for handshake verification. + seed_hash: String, + version: String, +} + +impl P2pTransport { + pub fn new(config: P2pTransportConfig, node_id: String, seed_hash: String) -> Self { + Self { + config, + endpoint: None, + connections: HashMap::new(), + node_id, + seed_hash, + version: env!("CARGO_PKG_VERSION").to_string(), + } + } + + /// Bind the QUIC endpoint. 
+ pub async fn start(&mut self) -> Result<(), String> { + let addr: SocketAddr = format!("{}:{}", self.config.bind_addr, self.config.port) + .parse() + .map_err(|e| format!("invalid address: {}", e))?; + + let server_config = self.generate_server_config()?; + + let endpoint = Endpoint::server(server_config, addr) + .map_err(|e| format!("failed to create QUIC endpoint: {}", e))?; + + tracing::info!("P2P transport started on {}", addr); + self.endpoint = Some(endpoint); + Ok(()) + } + + /// Connect to a remote peer and perform the seed-based handshake. + pub async fn connect(&mut self, addr: SocketAddr, triple_count: u64) -> Result<(), String> { + let endpoint = self.endpoint.as_ref().ok_or("transport not started")?; + + if self.connections.len() >= self.config.max_connections { + return Err("max connections reached".to_string()); + } + + let client_config = self.generate_client_config()?; + + let connection = endpoint + .connect_with(client_config, addr, "cortex-peer") + .map_err(|e| format!("connect init failed: {}", e))? + .await + .map_err(|e| format!("connection failed: {}", e))?; + + // Handshake: send Hello. + let hello = P2pMessage::Hello { + node_id: self.node_id.clone(), + seed_hash: self.seed_hash.clone(), + version: self.version.clone(), + triple_count, + }; + Self::send_on_connection(&connection, &hello).await?; + + // Receive HelloAck. + let ack = Self::recv_from_connection(&connection).await?; + match ack { + P2pMessage::HelloAck { accepted, reason, .. } => { + if !accepted { + connection.close(1u32.into(), b"rejected"); + return Err(format!( + "handshake rejected: {}", + reason.unwrap_or_default() + )); + } + } + _ => { + connection.close(1u32.into(), b"bad handshake"); + return Err("unexpected handshake response".to_string()); + } + } + + tracing::debug!("P2P connected to {}", addr); + self.connections.insert(addr, connection); + Ok(()) + } + + /// Accept one incoming connection, verify seed, and complete handshake. 
+ pub async fn accept(&mut self) -> Result, String> { + let endpoint = self.endpoint.as_ref().ok_or("transport not started")?; + + let incoming = match endpoint.accept().await { + Some(inc) => inc, + None => return Ok(None), + }; + + let connection = incoming + .await + .map_err(|e| format!("accept failed: {}", e))?; + + let remote = connection.remote_address(); + + // Read the Hello. + let hello = Self::recv_from_connection(&connection).await?; + + match &hello { + P2pMessage::Hello { seed_hash, node_id, .. } => { + let accepted = seed_hash == &self.seed_hash; + let reason = if accepted { + None + } else { + Some("seed_mismatch".to_string()) + }; + + let ack = P2pMessage::HelloAck { + node_id: self.node_id.clone(), + accepted, + reason, + }; + Self::send_on_connection(&connection, &ack).await?; + + if accepted { + tracing::info!("P2P accepted connection from {} ({})", remote, &node_id[..8.min(node_id.len())]); + self.connections.insert(remote, connection); + Ok(Some((remote, hello))) + } else { + connection.close(1u32.into(), b"seed_mismatch"); + Ok(None) + } + } + _ => { + connection.close(1u32.into(), b"expected_hello"); + Ok(None) + } + } + } + + /// Send a message to a connected peer. + pub async fn send(&self, addr: &SocketAddr, msg: &P2pMessage) -> Result<(), String> { + let connection = self + .connections + .get(addr) + .ok_or_else(|| format!("no connection to {}", addr))?; + Self::send_on_connection(connection, msg).await + } + + /// Receive the next message from any connected peer (non-blocking attempt). + pub async fn recv(&self) -> Result, String> { + for (addr, connection) in &self.connections { + if let Ok(msg) = Self::recv_from_connection(connection).await { + return Ok(Some((*addr, msg))); + } + } + Ok(None) + } + + /// Close a single peer connection. 
+ pub fn disconnect(&mut self, addr: &SocketAddr) { + if let Some(conn) = self.connections.remove(addr) { + conn.close(0u32.into(), b"disconnected"); + } + } + + /// Check if connected to a specific peer. + pub fn is_connected(&self, addr: &SocketAddr) -> bool { + self.connections.contains_key(addr) + } + + pub fn connected_peers(&self) -> Vec { + self.connections.keys().copied().collect() + } + + pub fn connection_count(&self) -> usize { + self.connections.len() + } + + /// Close all connections and the endpoint. + pub fn stop(&mut self) { + for (_, conn) in self.connections.drain() { + conn.close(0u32.into(), b"shutdown"); + } + if let Some(ep) = self.endpoint.take() { + ep.close(0u32.into(), b"shutdown"); + } + tracing::info!("P2P transport stopped"); + } + + // ── internal helpers ───────────────────────────────────── + + async fn send_on_connection(conn: &Connection, msg: &P2pMessage) -> Result<(), String> { + let payload = msg.to_bytes(); + let mut stream = conn + .open_uni() + .await + .map_err(|e| format!("open stream: {}", e))?; + + stream + .write_all(&payload) + .await + .map_err(|e| format!("write: {}", e))?; + + stream.finish().map_err(|e| format!("finish: {}", e))?; + Ok(()) + } + + async fn recv_from_connection(conn: &Connection) -> Result { + let mut stream = conn + .accept_uni() + .await + .map_err(|e| format!("accept stream: {}", e))?; + + let mut len_buf = [0u8; 4]; + stream + .read_exact(&mut len_buf) + .await + .map_err(|e| format!("read len: {}", e))?; + let len = u32::from_be_bytes(len_buf) as usize; + + if len > MAX_MESSAGE_SIZE { + return Err(format!("message too large: {} bytes", len)); + } + + let mut payload = vec![0u8; len]; + stream + .read_exact(&mut payload) + .await + .map_err(|e| format!("read payload: {}", e))?; + + serde_json::from_slice(&payload).map_err(|e| format!("deserialize: {}", e)) + } + + fn generate_server_config(&self) -> Result { + let cert = rcgen::generate_simple_self_signed(vec![self.node_id.clone()]) + 
.map_err(|e| format!("cert gen: {}", e))?; + + let cert_der = CertificateDer::from(cert.cert.der().to_vec()); + let key_der = + PrivateKeyDer::Pkcs8(PrivatePkcs8KeyDer::from(cert.key_pair.serialize_der())); + + let mut server_crypto = rustls::ServerConfig::builder() + .with_no_client_auth() + .with_single_cert(vec![cert_der], key_der) + .map_err(|e| format!("tls config: {}", e))?; + + server_crypto.alpn_protocols = vec![b"cortex-p2p".to_vec()]; + + let mut server_config = ServerConfig::with_crypto(Arc::new( + quinn::crypto::rustls::QuicServerConfig::try_from(server_crypto) + .map_err(|e| format!("quic crypto: {}", e))?, + )); + + let mut transport = quinn::TransportConfig::default(); + transport.keep_alive_interval(Some(self.config.keep_alive)); + transport.max_idle_timeout(Some( + self.config + .idle_timeout + .try_into() + .map_err(|e| format!("timeout: {}", e))?, + )); + transport.max_concurrent_uni_streams(100u32.into()); + transport.max_concurrent_bidi_streams(100u32.into()); + server_config.transport_config(Arc::new(transport)); + + Ok(server_config) + } + + fn generate_client_config(&self) -> Result { + let crypto = rustls::ClientConfig::builder() + .dangerous() + .with_custom_certificate_verifier(Arc::new(LoggingCertVerifier)) + .with_no_client_auth(); + + let mut client_config = ClientConfig::new(Arc::new( + quinn::crypto::rustls::QuicClientConfig::try_from(crypto) + .map_err(|e| format!("quic client crypto: {}", e))?, + )); + + let mut transport = quinn::TransportConfig::default(); + transport.keep_alive_interval(Some(self.config.keep_alive)); + transport.max_idle_timeout(Some( + self.config + .idle_timeout + .try_into() + .map_err(|e| format!("timeout: {}", e))?, + )); + client_config.transport_config(Arc::new(transport)); + + Ok(client_config) + } +} + +/// Certificate verifier that accepts any cert (TOFU model) and logs fingerprints. 
+#[derive(Debug)] +struct LoggingCertVerifier; + +impl rustls::client::danger::ServerCertVerifier for LoggingCertVerifier { + fn verify_server_cert( + &self, + end_entity: &CertificateDer<'_>, + _intermediates: &[CertificateDer<'_>], + server_name: &rustls::pki_types::ServerName<'_>, + _ocsp_response: &[u8], + _now: rustls::pki_types::UnixTime, + ) -> std::result::Result { + let fingerprint = blake3::hash(end_entity.as_ref()); + tracing::info!( + "P2P peer cert fingerprint for {:?}: {}", + server_name, + hex::encode(fingerprint.as_bytes()) + ); + Ok(rustls::client::danger::ServerCertVerified::assertion()) + } + + fn verify_tls12_signature( + &self, + message: &[u8], + cert: &CertificateDer<'_>, + dss: &rustls::DigitallySignedStruct, + ) -> std::result::Result { + rustls::crypto::verify_tls12_signature( + message, + cert, + dss, + &rustls::crypto::ring::default_provider().signature_verification_algorithms, + ) + } + + fn verify_tls13_signature( + &self, + message: &[u8], + cert: &CertificateDer<'_>, + dss: &rustls::DigitallySignedStruct, + ) -> std::result::Result { + rustls::crypto::verify_tls13_signature( + message, + cert, + dss, + &rustls::crypto::ring::default_provider().signature_verification_algorithms, + ) + } + + fn supported_verify_schemes(&self) -> Vec { + rustls::crypto::ring::default_provider() + .signature_verification_algorithms + .supported_schemes() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn config_defaults() { + let cfg = P2pTransportConfig::default(); + assert_eq!(cfg.port, 19091); + assert_eq!(cfg.max_connections, 64); + } + + #[test] + fn transport_new_has_no_connections() { + let t = P2pTransport::new( + P2pTransportConfig::default(), + "abc".into(), + "hash".into(), + ); + assert_eq!(t.connection_count(), 0); + assert!(t.connected_peers().is_empty()); + } + + #[test] + fn is_connected_false_initially() { + let t = P2pTransport::new( + P2pTransportConfig::default(), + "abc".into(), + "hash".into(), + ); + let addr: 
SocketAddr = "127.0.0.1:19091".parse().unwrap(); + assert!(!t.is_connected(&addr)); + } + + #[tokio::test] + async fn start_and_stop() { + let mut t = P2pTransport::new( + P2pTransportConfig { + port: 0, // OS-assigned port + ..Default::default() + }, + "test-node".into(), + "test-hash".into(), + ); + // port 0 lets OS pick a free port + assert!(t.start().await.is_ok()); + t.stop(); + assert!(t.endpoint.is_none()); + } + + #[tokio::test] + async fn connect_to_nonexistent_fails() { + let mut t = P2pTransport::new( + P2pTransportConfig { + port: 0, + ..Default::default() + }, + "test-node".into(), + "test-hash".into(), + ); + t.start().await.unwrap(); + let addr: SocketAddr = "127.0.0.1:1".parse().unwrap(); + assert!(t.connect(addr, 0).await.is_err()); + t.stop(); + } + + #[tokio::test] + async fn disconnect_nonexistent_is_noop() { + let mut t = P2pTransport::new( + P2pTransportConfig::default(), + "abc".into(), + "hash".into(), + ); + let addr: SocketAddr = "127.0.0.1:19091".parse().unwrap(); + t.disconnect(&addr); // should not panic + } +} diff --git a/crates/aingle_cortex/src/state.rs b/crates/aingle_cortex/src/state.rs index f30fe72..d1d1864 100644 --- a/crates/aingle_cortex/src/state.rs +++ b/crates/aingle_cortex/src/state.rs @@ -36,6 +36,9 @@ pub struct AppState { /// This field is only available if the `auth` feature is enabled. #[cfg(feature = "auth")] pub user_store: Arc, + /// P2P manager for multi-node triple synchronization. 
+ #[cfg(feature = "p2p")] + pub p2p: Option>, } impl AppState { @@ -64,6 +67,8 @@ impl AppState { audit_log: Arc::new(RwLock::new(AuditLog::default())), #[cfg(feature = "auth")] user_store, + #[cfg(feature = "p2p")] + p2p: None, } } @@ -90,6 +95,8 @@ impl AppState { audit_log: Arc::new(RwLock::new(AuditLog::default())), #[cfg(feature = "auth")] user_store, + #[cfg(feature = "p2p")] + p2p: None, } } @@ -116,6 +123,8 @@ impl AppState { audit_log: Arc::new(RwLock::new(AuditLog::with_path(10_000, path))), #[cfg(feature = "auth")] user_store, + #[cfg(feature = "p2p")] + p2p: None, } } From 122ad2cd0aa0e3ceb38ce616f10ed5a2a77d930e Mon Sep 17 00:00:00 2001 From: It Apilium Date: Sun, 8 Mar 2026 19:00:29 +0100 Subject: [PATCH 2/4] feat: add P2P REST endpoints for status, peer management, and discovery MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds /api/v1/p2p/ routes (behind `p2p` feature): - GET /status — node ID, port, connected peers, gossip/sync stats - GET /peers — list connected P2P peers - POST /peers — connect to a new peer by address - DELETE /peers/:addr — disconnect a peer --- crates/aingle_cortex/src/rest/mod.rs | 12 ++- crates/aingle_cortex/src/rest/p2p.rs | 146 +++++++++++++++++++++++++++ crates/aingle_cortex/src/server.rs | 5 + 3 files changed, 161 insertions(+), 2 deletions(-) create mode 100644 crates/aingle_cortex/src/rest/p2p.rs diff --git a/crates/aingle_cortex/src/rest/mod.rs b/crates/aingle_cortex/src/rest/mod.rs index 3da1c79..fa404b5 100644 --- a/crates/aingle_cortex/src/rest/mod.rs +++ b/crates/aingle_cortex/src/rest/mod.rs @@ -29,6 +29,8 @@ pub mod audit; mod memory; mod observability; +#[cfg(feature = "p2p")] +mod p2p; mod proof; mod proof_api; mod query; @@ -63,7 +65,7 @@ use axum::{ /// Create REST API router pub fn router() -> Router { - Router::new() + let router = Router::new() // Triple CRUD .route("/api/v1/triples", post(triples::create_triple)) .route("/api/v1/triples", 
get(triples::list_triples)) @@ -104,5 +106,11 @@ pub fn router() -> Router { // Reputation endpoints (Phase 3) .merge(reputation::reputation_router()) // Audit log endpoints (Phase 6.5) - .merge(audit::audit_router()) + .merge(audit::audit_router()); + + // P2P endpoints (feature-gated) + #[cfg(feature = "p2p")] + let router = router.merge(p2p::p2p_router()); + + router } diff --git a/crates/aingle_cortex/src/rest/p2p.rs b/crates/aingle_cortex/src/rest/p2p.rs new file mode 100644 index 0000000..1e248d1 --- /dev/null +++ b/crates/aingle_cortex/src/rest/p2p.rs @@ -0,0 +1,146 @@ +//! REST endpoints for P2P status and peer management. + +use crate::state::AppState; +use axum::{ + extract::State, + http::StatusCode, + response::IntoResponse, + routing::{delete, get}, + Json, Router, +}; +use serde::Deserialize; + +/// Mount P2P routes. +pub fn p2p_router() -> Router { + Router::new() + .route("/api/v1/p2p/status", get(p2p_status)) + .route("/api/v1/p2p/peers", get(list_peers).post(add_peer)) + .route("/api/v1/p2p/peers/{node_id}", delete(remove_peer)) +} + +#[derive(Deserialize)] +struct AddPeerRequest { + addr: String, +} + +async fn p2p_status(State(state): State) -> impl IntoResponse { + let p2p = match &state.p2p { + Some(mgr) => mgr, + None => { + return ( + StatusCode::NOT_FOUND, + Json(serde_json::json!({"error": "P2P not enabled"})), + ) + .into_response() + } + }; + let status = p2p.status().await; + (StatusCode::OK, Json(serde_json::to_value(status).unwrap())).into_response() +} + +async fn list_peers(State(state): State) -> impl IntoResponse { + let p2p = match &state.p2p { + Some(mgr) => mgr, + None => { + return ( + StatusCode::NOT_FOUND, + Json(serde_json::json!({"error": "P2P not enabled"})), + ) + .into_response() + } + }; + let status = p2p.status().await; + ( + StatusCode::OK, + Json(serde_json::to_value(&status.connected_peers).unwrap()), + ) + .into_response() +} + +async fn add_peer( + State(state): State, + Json(body): Json, +) -> impl IntoResponse 
{ + let p2p = match &state.p2p { + Some(mgr) => mgr, + None => { + return ( + StatusCode::NOT_FOUND, + Json(serde_json::json!({"error": "P2P not enabled"})), + ) + .into_response() + } + }; + + let addr = match body.addr.parse() { + Ok(a) => a, + Err(_) => { + return ( + StatusCode::BAD_REQUEST, + Json(serde_json::json!({"error": "invalid address"})), + ) + .into_response() + } + }; + + match p2p.add_peer(addr).await { + Ok(()) => ( + StatusCode::OK, + Json(serde_json::json!({"status": "connected", "addr": body.addr})), + ) + .into_response(), + Err(e) => ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(serde_json::json!({"error": e})), + ) + .into_response(), + } +} + +async fn remove_peer( + State(state): State, + axum::extract::Path(node_id): axum::extract::Path, +) -> impl IntoResponse { + let p2p = match &state.p2p { + Some(mgr) => mgr, + None => { + return ( + StatusCode::NOT_FOUND, + Json(serde_json::json!({"error": "P2P not enabled"})), + ) + .into_response() + } + }; + + // Find the peer address by matching node_id prefix in connected peers. 
+ let status = p2p.status().await; + let peer_addr = status + .connected_peers + .iter() + .find(|p| p.addr.contains(&node_id)) + .map(|p| p.addr.clone()); + + match peer_addr { + Some(addr_str) => { + if let Ok(addr) = addr_str.parse() { + p2p.remove_peer(addr).await; + ( + StatusCode::OK, + Json(serde_json::json!({"status": "disconnected"})), + ) + .into_response() + } else { + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(serde_json::json!({"error": "address parse error"})), + ) + .into_response() + } + } + None => ( + StatusCode::NOT_FOUND, + Json(serde_json::json!({"error": "peer not found"})), + ) + .into_response(), + } +} diff --git a/crates/aingle_cortex/src/server.rs b/crates/aingle_cortex/src/server.rs index 74fbd56..92c8f8f 100644 --- a/crates/aingle_cortex/src/server.rs +++ b/crates/aingle_cortex/src/server.rs @@ -105,6 +105,11 @@ impl CortexServer { &self.state } + /// Returns a mutable reference to the shared `AppState`. + pub fn state_mut(&mut self) -> &mut AppState { + &mut self.state + } + /// Builds the `axum` router, combining all API routes and middleware. pub fn build_router(&self) -> Router { let mut app: Router = Router::new(); From d10add8e4d59962b90a6bf74c7e9afef7d6f3c40 Mon Sep 17 00:00:00 2001 From: It Apilium Date: Sun, 8 Mar 2026 19:42:36 +0100 Subject: [PATCH 3/4] fix: resolve 4 critical P2P bugs found during E2E testing Add lock-free accept loop for incoming QUIC connections (Task 0), fix ALPN protocol mismatch on client config, register peers in sync_manager at all connection points, and expose helper methods on P2pTransport for external accept pattern. 
--- crates/aingle_cortex/src/p2p/manager.rs | 97 +++++++++++++++++++++++ crates/aingle_cortex/src/p2p/transport.rs | 34 +++++++- 2 files changed, 130 insertions(+), 1 deletion(-) diff --git a/crates/aingle_cortex/src/p2p/manager.rs b/crates/aingle_cortex/src/p2p/manager.rs index baca7e6..5164190 100644 --- a/crates/aingle_cortex/src/p2p/manager.rs +++ b/crates/aingle_cortex/src/p2p/manager.rs @@ -195,6 +195,8 @@ impl P2pManager { match transport_inner.connect(*peer_addr, triple_count).await { Ok(()) => { tracing::info!("P2P connected to manual peer {}", peer_addr); + // Register in sync manager so gossip can reach this peer. + sync.write().await.get_peer_state(peer_addr); let now_ms = now_millis(); let mut ps = peer_store.write().await; ps.add(StoredPeer { @@ -219,6 +221,7 @@ impl P2pManager { match transport_inner.connect(stored.addr, triple_count).await { Ok(()) => { tracing::info!("P2P reconnected to persisted peer {}", stored.addr); + sync.write().await.get_peer_state(&stored.addr); } Err(e) => { tracing::debug!("P2P persisted peer {} unreachable: {}", stored.addr, e); @@ -251,6 +254,98 @@ impl P2pManager { // A4: Health event channel between Task 3 (sender) and Task 6 (receiver). let (health_tx, health_rx) = tokio::sync::mpsc::channel::(256); + // ── Task 0: Accept incoming connections ─────────────── + // + // Accepts new QUIC connections without holding the transport lock. + // Clones the endpoint (Arc-based, cheap), accepts + handshakes + // outside the lock, then briefly write-locks to store the connection. + { + let transport = transport.clone(); + let sync = sync.clone(); + let running = running.clone(); + let accept_seed_hash = seed_hash.clone(); + let peer_store = peer_store.clone(); + + // Get endpoint clone and node_id once. 
+ let (accept_endpoint, accept_node_id) = { + let t = transport.read().await; + (t.endpoint_clone(), t.node_id_str().to_string()) + }; + + if let Some(endpoint) = accept_endpoint { + tasks.push(tokio::spawn(async move { + loop { + if !running.load(Ordering::Relaxed) { + break; + } + + // Wait for incoming connection WITHOUT holding any lock. + let incoming = match endpoint.accept().await { + Some(inc) => inc, + None => break, // endpoint closed + }; + + let connection = match incoming.await { + Ok(conn) => conn, + Err(e) => { + tracing::debug!("P2P accept handshake error: {}", e); + continue; + } + }; + + let remote = connection.remote_address(); + + // Read Hello from the new connection. + let hello = match P2pTransport::recv_from_conn(&connection).await { + Ok(msg) => msg, + Err(e) => { + tracing::debug!("P2P accept recv error from {}: {}", remote, e); + connection.close(1u32.into(), b"read_error"); + continue; + } + }; + + match hello { + P2pMessage::Hello { seed_hash: peer_seed, node_id: peer_nid, .. } => { + let accepted = peer_seed == accept_seed_hash; + let ack = P2pMessage::HelloAck { + node_id: accept_node_id.clone(), + accepted, + reason: if accepted { None } else { Some("seed_mismatch".into()) }, + }; + if P2pTransport::send_on_conn(&connection, &ack).await.is_err() { + continue; + } + + if accepted { + tracing::info!("P2P accepted connection from {} ({})", remote, &peer_nid[..8.min(peer_nid.len())]); + // Store connection (brief write lock). + transport.write().await.store_connection(remote, connection); + // Register in sync manager for gossip. + sync.write().await.get_peer_state(&remote); + // Record in peer store. 
+ let mut ps = peer_store.write().await; + ps.add(StoredPeer { + addr: remote, + node_id: Some(peer_nid), + last_connected_ms: now_millis(), + source: PeerSource::RestApi, + }); + let _ = ps.save(); + } else { + tracing::warn!("P2P rejected connection from {}: seed mismatch", remote); + connection.close(1u32.into(), b"seed_mismatch"); + } + } + _ => { + connection.close(1u32.into(), b"expected_hello"); + } + } + } + })); + } + } + // ── Task 1: Event listener ─────────────────────────── { let gossip = gossip.clone(); @@ -609,6 +704,7 @@ impl P2pManager { "P2P discovered and connected to {}", peer.node_id ); + sync.write().await.get_peer_state(&peer.addr); // A3: Record mDNS peer let mut ps = peer_store.write().await; ps.add(StoredPeer { @@ -667,6 +763,7 @@ impl P2pManager { match result { Ok(()) => { tracing::info!("P2P reconnected to manual peer {}", tracker.addr); + sync.write().await.get_peer_state(&tracker.addr); tracker.record_success(); } Err(e) => { diff --git a/crates/aingle_cortex/src/p2p/transport.rs b/crates/aingle_cortex/src/p2p/transport.rs index f3b2aa8..14bbbf7 100644 --- a/crates/aingle_cortex/src/p2p/transport.rs +++ b/crates/aingle_cortex/src/p2p/transport.rs @@ -209,6 +209,36 @@ impl P2pTransport { self.connections.len() } + /// Clone the QUIC endpoint (cheap, Arc-based) for use outside the lock. + pub fn endpoint_clone(&self) -> Option { + self.endpoint.clone() + } + + /// Get the seed hash for handshake verification. + pub fn seed_hash(&self) -> &str { + &self.seed_hash + } + + /// Get the node ID. + pub fn node_id_str(&self) -> &str { + &self.node_id + } + + /// Store an externally-accepted connection. + pub fn store_connection(&mut self, addr: SocketAddr, conn: Connection) { + self.connections.insert(addr, conn); + } + + /// Send a message on a raw connection (not stored in self.connections). 
+ pub async fn send_on_conn(conn: &Connection, msg: &P2pMessage) -> Result<(), String> { + Self::send_on_connection(conn, msg).await + } + + /// Receive a message from a raw connection. + pub async fn recv_from_conn(conn: &Connection) -> Result { + Self::recv_from_connection(conn).await + } + /// Close all connections and the endpoint. pub fn stop(&mut self) { for (_, conn) in self.connections.drain() { @@ -300,11 +330,13 @@ impl P2pTransport { } fn generate_client_config(&self) -> Result { - let crypto = rustls::ClientConfig::builder() + let mut crypto = rustls::ClientConfig::builder() .dangerous() .with_custom_certificate_verifier(Arc::new(LoggingCertVerifier)) .with_no_client_auth(); + crypto.alpn_protocols = vec![b"cortex-p2p".to_vec()]; + let mut client_config = ClientConfig::new(Arc::new( quinn::crypto::rustls::QuicClientConfig::try_from(crypto) .map_err(|e| format!("quic client crypto: {}", e))?, From 33513c995a577e166fcaa84b5b1816d32194455a Mon Sep 17 00:00:00 2001 From: It Apilium Date: Sun, 8 Mar 2026 20:09:48 +0100 Subject: [PATCH 4/4] chore: bump all product crate versions to 0.3.8 --- Cargo.lock | 20 ++++++++++---------- crates/aingle_ai/Cargo.toml | 2 +- crates/aingle_contracts/Cargo.toml | 2 +- crates/aingle_cortex/Cargo.toml | 2 +- crates/aingle_cortex/src/p2p/message.rs | 2 +- crates/aingle_graph/Cargo.toml | 2 +- crates/aingle_logic/Cargo.toml | 2 +- crates/aingle_minimal/Cargo.toml | 2 +- crates/aingle_viz/Cargo.toml | 2 +- crates/aingle_zk/Cargo.toml | 2 +- crates/hope_agents/Cargo.toml | 2 +- crates/titans_memory/Cargo.toml | 2 +- 12 files changed, 21 insertions(+), 21 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9d1d560..c371bf1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -86,7 +86,7 @@ dependencies = [ [[package]] name = "aingle_ai" -version = "0.3.7" +version = "0.3.8" dependencies = [ "blake2", "candle-core 0.9.2", @@ -108,7 +108,7 @@ dependencies = [ [[package]] name = "aingle_contracts" -version = "0.3.7" +version = "0.3.8" 
dependencies = [ "blake3", "dashmap 6.1.0", @@ -127,7 +127,7 @@ dependencies = [ [[package]] name = "aingle_cortex" -version = "0.3.7" +version = "0.3.8" dependencies = [ "aingle_graph", "aingle_logic", @@ -173,7 +173,7 @@ dependencies = [ [[package]] name = "aingle_graph" -version = "0.3.7" +version = "0.3.8" dependencies = [ "bincode", "blake3", @@ -194,7 +194,7 @@ dependencies = [ [[package]] name = "aingle_logic" -version = "0.3.7" +version = "0.3.8" dependencies = [ "aingle_graph", "chrono", @@ -210,7 +210,7 @@ dependencies = [ [[package]] name = "aingle_minimal" -version = "0.3.7" +version = "0.3.8" dependencies = [ "async-io", "async-tungstenite", @@ -252,7 +252,7 @@ dependencies = [ [[package]] name = "aingle_viz" -version = "0.3.7" +version = "0.3.8" dependencies = [ "aingle_graph", "aingle_minimal", @@ -274,7 +274,7 @@ dependencies = [ [[package]] name = "aingle_zk" -version = "0.3.7" +version = "0.3.8" dependencies = [ "blake3", "bulletproofs", @@ -3657,7 +3657,7 @@ dependencies = [ [[package]] name = "hope_agents" -version = "0.3.7" +version = "0.3.8" dependencies = [ "chrono", "criterion", @@ -7056,7 +7056,7 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "titans_memory" -version = "0.3.7" +version = "0.3.8" dependencies = [ "blake3", "chrono", diff --git a/crates/aingle_ai/Cargo.toml b/crates/aingle_ai/Cargo.toml index d37e265..3676a66 100644 --- a/crates/aingle_ai/Cargo.toml +++ b/crates/aingle_ai/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "aingle_ai" -version = "0.3.7" +version = "0.3.8" description = "AI integration layer for AIngle - Titans Memory, Nested Learning, HOPE Agents" license = "Apache-2.0" repository = "https://github.com/ApiliumCode/aingle" diff --git a/crates/aingle_contracts/Cargo.toml b/crates/aingle_contracts/Cargo.toml index 27a65f4..6599426 100644 --- a/crates/aingle_contracts/Cargo.toml +++ b/crates/aingle_contracts/Cargo.toml @@ -1,6 +1,6 @@ [package] name = 
"aingle_contracts" -version = "0.3.7" +version = "0.3.8" description = "Smart Contracts DSL and WASM Runtime for AIngle" license = "Apache-2.0" repository = "https://github.com/ApiliumCode/aingle" diff --git a/crates/aingle_cortex/Cargo.toml b/crates/aingle_cortex/Cargo.toml index 9987562..2edba31 100644 --- a/crates/aingle_cortex/Cargo.toml +++ b/crates/aingle_cortex/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "aingle_cortex" -version = "0.3.7" +version = "0.3.8" description = "Córtex API - REST/GraphQL/SPARQL interface for AIngle semantic graphs" license = "Apache-2.0" repository = "https://github.com/ApiliumCode/aingle" diff --git a/crates/aingle_cortex/src/p2p/message.rs b/crates/aingle_cortex/src/p2p/message.rs index 63cc24d..c763778 100644 --- a/crates/aingle_cortex/src/p2p/message.rs +++ b/crates/aingle_cortex/src/p2p/message.rs @@ -257,7 +257,7 @@ mod tests { let msg = P2pMessage::Hello { node_id: "abc123".into(), seed_hash: "def456".into(), - version: "0.3.7".into(), + version: "0.3.8".into(), triple_count: 42, }; let bytes = msg.to_bytes(); diff --git a/crates/aingle_graph/Cargo.toml b/crates/aingle_graph/Cargo.toml index eacfbce..8f6a5d6 100644 --- a/crates/aingle_graph/Cargo.toml +++ b/crates/aingle_graph/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "aingle_graph" -version = "0.3.7" +version = "0.3.8" description = "Native GraphDB for AIngle - Semantic triple store with SPO indexes" license = "Apache-2.0" repository = "https://github.com/ApiliumCode/aingle" diff --git a/crates/aingle_logic/Cargo.toml b/crates/aingle_logic/Cargo.toml index ffb49cd..00f2cce 100644 --- a/crates/aingle_logic/Cargo.toml +++ b/crates/aingle_logic/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "aingle_logic" -version = "0.3.7" +version = "0.3.8" description = "Proof-of-Logic validation engine for AIngle semantic graphs" license = "Apache-2.0" repository = "https://github.com/ApiliumCode/aingle" diff --git a/crates/aingle_minimal/Cargo.toml b/crates/aingle_minimal/Cargo.toml index 
51af75d..98beffb 100644 --- a/crates/aingle_minimal/Cargo.toml +++ b/crates/aingle_minimal/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "aingle_minimal" -version = "0.3.7" +version = "0.3.8" description = "Ultra-light AIngle node for IoT devices (<1MB RAM)" license = "Apache-2.0" repository = "https://github.com/ApiliumCode/aingle" diff --git a/crates/aingle_viz/Cargo.toml b/crates/aingle_viz/Cargo.toml index f639b96..f022169 100644 --- a/crates/aingle_viz/Cargo.toml +++ b/crates/aingle_viz/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "aingle_viz" -version = "0.3.7" +version = "0.3.8" description = "DAG Visualization for AIngle - Web-based graph explorer" license = "Apache-2.0" repository = "https://github.com/ApiliumCode/aingle" diff --git a/crates/aingle_zk/Cargo.toml b/crates/aingle_zk/Cargo.toml index 3dcca38..151eaec 100644 --- a/crates/aingle_zk/Cargo.toml +++ b/crates/aingle_zk/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "aingle_zk" -version = "0.3.7" +version = "0.3.8" description = "Zero-Knowledge Proofs for AIngle - privacy-preserving cryptographic primitives" license = "Apache-2.0" repository = "https://github.com/ApiliumCode/aingle" diff --git a/crates/hope_agents/Cargo.toml b/crates/hope_agents/Cargo.toml index 84238f6..440c3ee 100644 --- a/crates/hope_agents/Cargo.toml +++ b/crates/hope_agents/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "hope_agents" -version = "0.3.7" +version = "0.3.8" description = "HOPE Agents: Hierarchical Optimizing Policy Engine for AIngle AI agents" license = "Apache-2.0" repository = "https://github.com/ApiliumCode/aingle" diff --git a/crates/titans_memory/Cargo.toml b/crates/titans_memory/Cargo.toml index e778336..3011802 100644 --- a/crates/titans_memory/Cargo.toml +++ b/crates/titans_memory/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "titans_memory" -version = "0.3.7" +version = "0.3.8" description = "Titans Memory: Neural-inspired memory system for AIngle AI agents" license = "Apache-2.0" repository = 
"https://github.com/ApiliumCode/aingle"