diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock
index 4a2e43ee09a..79c77536e16 100644
--- a/quickwit/Cargo.lock
+++ b/quickwit/Cargo.lock
@@ -7194,6 +7194,7 @@ dependencies = [
  "fail",
  "futures",
  "http 1.4.0",
+ "indexmap 2.13.0",
  "itertools 0.14.0",
  "mockall",
  "mrecordlog",
diff --git a/quickwit/quickwit-ingest/Cargo.toml b/quickwit/quickwit-ingest/Cargo.toml
index 3149f2aaaf3..f47561fe2c9 100644
--- a/quickwit/quickwit-ingest/Cargo.toml
+++ b/quickwit/quickwit-ingest/Cargo.toml
@@ -18,6 +18,7 @@ bytesize = { workspace = true }
 fail = { workspace = true, optional = true }
 futures = { workspace = true }
 http = { workspace = true }
+indexmap = { workspace = true }
 itertools = { workspace = true }
 mockall = { workspace = true, optional = true }
 mrecordlog = { workspace = true }
diff --git a/quickwit/quickwit-ingest/src/ingest_v2/broadcast/ingester_capacity_score.rs b/quickwit/quickwit-ingest/src/ingest_v2/broadcast/ingester_capacity_score.rs
index 6f8abc66ef8..1927eb788f7 100644
--- a/quickwit/quickwit-ingest/src/ingest_v2/broadcast/ingester_capacity_score.rs
+++ b/quickwit/quickwit-ingest/src/ingest_v2/broadcast/ingester_capacity_score.rs
@@ -42,23 +42,22 @@ const WAL_CAPACITY_LOOKBACK_WINDOW_LEN: usize = 6;
 const WAL_CAPACITY_READINGS_LEN: usize = WAL_CAPACITY_LOOKBACK_WINDOW_LEN + 1;
 
 struct WalMemoryCapacityTimeSeries {
+    memory_capacity: ByteSize,
     readings: RingBuffer,
 }
 
 impl WalMemoryCapacityTimeSeries {
-    fn new() -> Self {
+    fn new(memory_capacity: ByteSize) -> Self {
+        #[cfg(not(test))]
+        assert!(memory_capacity.as_u64() > 0);
         Self {
+            memory_capacity,
             readings: RingBuffer::default(),
         }
     }
 
-    fn record(&mut self, memory_used: ByteSize, memory_allocated: ByteSize) {
-        let allocated = memory_allocated.as_u64();
-        if allocated == 0 {
-            self.readings.push_back(1.0);
-            return;
-        }
-        let remaining = 1.0 - (memory_used.as_u64() as f64 / allocated as f64);
+    fn record(&mut self, memory_used: ByteSize) {
+        let remaining = 1.0 - (memory_used.as_u64() as f64 / self.memory_capacity.as_u64() as f64);
         self.readings.push_back(remaining.clamp(0.0, 1.0));
     }
 
@@ -121,23 +120,27 @@ pub struct IngesterCapacityScore {
 
 /// Periodically snapshots the ingester's WAL memory usage and open shard counts, computes
 /// a capacity score, and broadcasts it to other nodes via Chitchat.
-pub(crate) struct BroadcastIngesterCapacityScoreTask {
+pub struct BroadcastIngesterCapacityScoreTask {
     cluster: Cluster,
     weak_state: WeakIngesterState,
     wal_capacity_time_series: WalMemoryCapacityTimeSeries,
 }
 
 impl BroadcastIngesterCapacityScoreTask {
-    pub fn spawn(cluster: Cluster, weak_state: WeakIngesterState) -> JoinHandle<()> {
+    pub fn spawn(
+        cluster: Cluster,
+        weak_state: WeakIngesterState,
+        memory_capacity: ByteSize,
+    ) -> JoinHandle<()> {
         let mut broadcaster = Self {
             cluster,
             weak_state,
-            wal_capacity_time_series: WalMemoryCapacityTimeSeries::new(),
+            wal_capacity_time_series: WalMemoryCapacityTimeSeries::new(memory_capacity),
         };
         tokio::spawn(async move { broadcaster.run().await })
     }
 
-    async fn snapshot(&self) -> Result> {
+    async fn snapshot(&self) -> Result> {
         let state = self
             .weak_state
             .upgrade()
@@ -155,10 +158,9 @@ impl BroadcastIngesterCapacityScoreTask {
             .map_err(|_| anyhow::anyhow!("failed to acquire ingester state lock"))?;
         let usage = guard.mrecordlog.resource_usage();
         let memory_used = ByteSize::b(usage.memory_used_bytes as u64);
-        let memory_allocated = ByteSize::b(usage.memory_allocated_bytes as u64);
         let open_shard_counts = guard.get_open_shard_counts();
 
-        Ok(Some((memory_used, memory_allocated, open_shard_counts)))
+        Ok(Some((memory_used, open_shard_counts)))
     }
 
     async fn run(&mut self) {
@@ -168,7 +170,7 @@ impl BroadcastIngesterCapacityScoreTask {
         loop {
             interval.tick().await;
 
-            let (memory_used, memory_allocated, open_shard_counts) = match self.snapshot().await {
+            let (memory_used, open_shard_counts) = match self.snapshot().await {
                 Ok(Some(snapshot)) => snapshot,
                 Ok(None) => continue,
                 Err(error) => {
@@ -177,8 +179,7 @@
                 }
             };
 
-            self.wal_capacity_time_series
-                .record(memory_used, memory_allocated);
+            self.wal_capacity_time_series.record(memory_used);
 
             let remaining_capacity = self.wal_capacity_time_series.current().unwrap_or(1.0);
             let capacity_delta = self.wal_capacity_time_series.delta().unwrap_or(0.0);
@@ -272,30 +273,31 @@ mod tests {
     use crate::ingest_v2::state::IngesterState;
 
     fn ts() -> WalMemoryCapacityTimeSeries {
-        WalMemoryCapacityTimeSeries::new()
+        WalMemoryCapacityTimeSeries::new(ByteSize::b(100))
     }
 
-    /// Helper: record a reading with `used` out of `allocated` bytes.
-    fn record(series: &mut WalMemoryCapacityTimeSeries, used: u64, allocated: u64) {
-        series.record(ByteSize::b(used), ByteSize::b(allocated));
+    /// Helper: record a reading with `used` bytes against the series' fixed capacity.
+    fn record(series: &mut WalMemoryCapacityTimeSeries, used: u64) {
+        series.record(ByteSize::b(used));
     }
 
     #[test]
     fn test_wal_memory_capacity_current_after_record() {
-        let mut series = ts();
+        let mut series = WalMemoryCapacityTimeSeries::new(ByteSize::b(256));
         // 192 of 256 used => 25% remaining
-        record(&mut series, 192, 256);
+        series.record(ByteSize::b(192));
         assert_eq!(series.current(), Some(0.25));
         // 16 of 256 used => 93.75% remaining
-        record(&mut series, 16, 256);
+        series.record(ByteSize::b(16));
         assert_eq!(series.current(), Some(0.9375));
     }
 
     #[test]
     fn test_wal_memory_capacity_record_saturates_at_zero() {
         let mut series = ts();
-        record(&mut series, 200, 100);
+        // 200 used out of 100 capacity => clamped to 0.0
+        record(&mut series, 200);
         assert_eq!(series.current(), Some(0.0));
     }
 
@@ -303,9 +305,9 @@
     fn test_wal_memory_capacity_delta_growing() {
         let mut series = ts();
         // oldest: 60 of 100 used => 40% remaining
-        record(&mut series, 60, 100);
+        record(&mut series, 60);
         // current: 20 of 100 used => 80% remaining
-        record(&mut series, 20, 100);
+        record(&mut series, 20);
         // delta = 0.80 - 0.40 = 0.40
         assert_eq!(series.delta(), Some(0.40));
     }
 
@@ -314,9 +316,9 @@
     fn test_wal_memory_capacity_delta_shrinking() {
         let mut series = ts();
         // oldest: 20 of 100 used => 80% remaining
-        record(&mut series, 20, 100);
+        record(&mut series, 20);
         // current: 60 of 100 used => 40% remaining
-        record(&mut series, 60, 100);
+        record(&mut series, 60);
         // delta = 0.40 - 0.80 = -0.40
         assert_eq!(series.delta(), Some(-0.40));
     }
 
@@ -326,7 +328,7 @@
         // Node A: capacity draining — usage increases 10, 20, ..., 70 over 7 ticks.
         let mut node_a = ts();
         for used in (10..=70).step_by(10) {
-            record(&mut node_a, used, 100);
+            record(&mut node_a, used);
         }
         let a_remaining = node_a.current().unwrap();
         let a_delta = node_a.delta().unwrap();
@@ -335,7 +337,7 @@
         // Node B: steady at 50% usage over 7 ticks.
         let mut node_b = ts();
         for _ in 0..7 {
-            record(&mut node_b, 50, 100);
+            record(&mut node_b, 50);
         }
         let b_remaining = node_b.current().unwrap();
         let b_delta = node_b.delta().unwrap();
@@ -361,7 +363,7 @@
         let task = BroadcastIngesterCapacityScoreTask {
             cluster,
             weak_state,
-            wal_capacity_time_series: WalMemoryCapacityTimeSeries::new(),
+            wal_capacity_time_series: WalMemoryCapacityTimeSeries::new(ByteSize::mb(1)),
         };
         assert!(task.snapshot().await.is_err());
     }
@@ -388,14 +390,13 @@
         let open_shard_counts = state_guard.get_open_shard_counts();
         drop(state_guard);
 
-        // Simulate 500 of 1000 bytes used => 50% remaining, 0 delta => score = 6
+        // Simulate 500 of 1000 bytes capacity used => 50% remaining, 0 delta => score = 6
         let mut task = BroadcastIngesterCapacityScoreTask {
             cluster: cluster.clone(),
             weak_state: state.weak(),
-            wal_capacity_time_series: WalMemoryCapacityTimeSeries::new(),
+            wal_capacity_time_series: WalMemoryCapacityTimeSeries::new(ByteSize::b(1000)),
         };
-        task.wal_capacity_time_series
-            .record(ByteSize::b(500), ByteSize::b(1000));
+        task.wal_capacity_time_series.record(ByteSize::b(500));
 
         let remaining = task.wal_capacity_time_series.current().unwrap();
         let delta = task.wal_capacity_time_series.delta().unwrap();
@@ -440,16 +441,16 @@
 
         // Fill to exactly the lookback window length (6 readings), all same value.
         for _ in 0..WAL_CAPACITY_LOOKBACK_WINDOW_LEN {
-            record(&mut series, 50, 100);
+            record(&mut series, 50);
         }
         assert_eq!(series.delta(), Some(0.0));
 
         // 7th reading fills the ring buffer. Delta spans 6 intervals.
-        record(&mut series, 0, 100);
+        record(&mut series, 0);
         assert_eq!(series.delta(), Some(0.50));
 
         // 8th reading evicts the oldest 50-remaining. Delta still spans 6 intervals.
-        record(&mut series, 0, 100);
+        record(&mut series, 0);
         assert_eq!(series.delta(), Some(0.50));
     }
 }
diff --git a/quickwit/quickwit-ingest/src/ingest_v2/broadcast/mod.rs b/quickwit/quickwit-ingest/src/ingest_v2/broadcast/mod.rs
index d2184a0e392..18a00209de1 100644
--- a/quickwit/quickwit-ingest/src/ingest_v2/broadcast/mod.rs
+++ b/quickwit/quickwit-ingest/src/ingest_v2/broadcast/mod.rs
@@ -26,6 +26,10 @@ pub(in crate::ingest_v2) const BROADCAST_INTERVAL_PERIOD: Duration = if cfg!(tes
     Duration::from_secs(5)
 };
 
+pub use ingester_capacity_score::{
+    BroadcastIngesterCapacityScoreTask, IngesterCapacityScoreUpdate,
+    setup_ingester_capacity_update_listener,
+};
 pub use local_shards::{
     BroadcastLocalShardsTask, LocalShardsUpdate, ShardInfo, ShardInfos,
     setup_local_shards_update_listener,
 };
diff --git a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
index 65c268881ac..fc1a44f19bb 100644
--- a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
+++ b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
@@ -59,7 +59,7 @@ use tokio::time::{sleep, timeout};
 use tracing::{debug, error, info, warn};
 
 use super::IngesterPool;
-use super::broadcast::BroadcastLocalShardsTask;
+use super::broadcast::{BroadcastIngesterCapacityScoreTask, BroadcastLocalShardsTask};
 use super::doc_mapper::validate_doc_batch;
 use super::fetch::FetchStreamTask;
 use super::idle::CloseIdleShardsTask;
@@ -144,7 +144,8 @@ impl Ingester {
         let state = IngesterState::load(wal_dir_path, rate_limiter_settings);
 
         let weak_state = state.weak();
-        BroadcastLocalShardsTask::spawn(cluster, weak_state.clone());
+        BroadcastLocalShardsTask::spawn(cluster.clone(), weak_state.clone());
+        BroadcastIngesterCapacityScoreTask::spawn(cluster, weak_state.clone(), memory_capacity);
         CloseIdleShardsTask::spawn(weak_state, idle_shard_timeout);
 
         let ingester = Self {
diff --git a/quickwit/quickwit-ingest/src/ingest_v2/mod.rs b/quickwit/quickwit-ingest/src/ingest_v2/mod.rs
index c8543faf793..3a801763feb 100644
--- a/quickwit/quickwit-ingest/src/ingest_v2/mod.rs
+++ b/quickwit/quickwit-ingest/src/ingest_v2/mod.rs
@@ -22,6 +22,8 @@ mod metrics;
 mod models;
 mod mrecord;
 mod mrecordlog_utils;
+#[allow(dead_code)]
+mod node_routing_table;
 mod publish_tracker;
 mod rate_meter;
 mod replication;
@@ -36,7 +38,10 @@ use std::ops::{Add, AddAssign};
 use std::time::Duration;
 use std::{env, fmt};
 
-pub use broadcast::{LocalShardsUpdate, ShardInfo, ShardInfos, setup_local_shards_update_listener};
+pub use broadcast::{
+    LocalShardsUpdate, ShardInfo, ShardInfos, setup_ingester_capacity_update_listener,
+    setup_local_shards_update_listener,
+};
 use bytes::buf::Writer;
 use bytes::{BufMut, BytesMut};
 use bytesize::ByteSize;
diff --git a/quickwit/quickwit-ingest/src/ingest_v2/node_routing_table.rs b/quickwit/quickwit-ingest/src/ingest_v2/node_routing_table.rs
new file mode 100644
index 00000000000..95710669cef
--- /dev/null
+++ b/quickwit/quickwit-ingest/src/ingest_v2/node_routing_table.rs
@@ -0,0 +1,420 @@
+// Copyright 2021-Present Datadog, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::collections::{HashMap, HashSet};
+
+use indexmap::IndexMap;
+use quickwit_proto::ingest::Shard;
+use quickwit_proto::types::{IndexId, IndexUid, NodeId, SourceId, SourceUid};
+
+use crate::IngesterPool;
+
+/// A single ingester node's routing-relevant data for a specific (index, source) pair.
+/// Each entry is self-describing: it carries its own node_id, index_uid, and source_id
+/// so it can always be attributed back to a specific source on a specific node.
+#[derive(Debug, Clone)]
+pub(super) struct IngesterNode {
+    pub node_id: NodeId,
+    pub index_uid: IndexUid,
+    pub source_id: SourceId,
+    /// Score from 0-10. Higher means more available capacity.
+    pub capacity_score: usize,
+    /// Number of open shards on this node for this (index, source) pair. Tiebreaker for power of
+    /// two choices comparison - we favor a node with more open shards.
+    pub open_shard_count: usize,
+}
+
+#[derive(Debug)]
+pub(super) struct RoutingEntry {
+    pub nodes: IndexMap<NodeId, IngesterNode>,
+}
+
+/// Given a slice of candidates, picks the better of two random choices.
+/// Higher capacity_score wins; tiebreak on more open_shard_count (more landing spots).
+fn power_of_two_choices<'a>(candidates: &[&'a IngesterNode]) -> &'a IngesterNode {
+    let len = candidates.len();
+    debug_assert!(len >= 2);
+
+    let idx_1 = rand::random_range(0..len);
+    // Random offset in [1, len) wraps around, guaranteeing idx_2 != idx_1.
+    let idx_2 = (idx_1 + rand::random_range(1..len)) % len;
+    let (a, b) = (candidates[idx_1], candidates[idx_2]);
+
+    if (a.capacity_score, a.open_shard_count) >= (b.capacity_score, b.open_shard_count) {
+        a
+    } else {
+        b
+    }
+}
+
+impl RoutingEntry {
+    /// Pick an ingester node to persist the request to. Uses power of two choices based on reported
+    /// ingester capacity, if more than one eligible node exists.
+    pub fn pick_node(
+        &self,
+        ingester_pool: &IngesterPool,
+        unavailable_leaders: &HashSet<NodeId>,
+    ) -> Option<&IngesterNode> {
+        let eligible: Vec<&IngesterNode> = self
+            .nodes
+            .values()
+            .filter(|node| {
+                node.capacity_score > 0
+                    && node.open_shard_count > 0
+                    && ingester_pool.contains_key(&node.node_id)
+                    && !unavailable_leaders.contains(&node.node_id)
+            })
+            .collect();
+
+        match eligible.len() {
+            0 => None,
+            1 => Some(eligible[0]),
+            _ => Some(power_of_two_choices(&eligible)),
+        }
+    }
+}
+
+#[derive(Debug, Default)]
+pub(super) struct NodeBasedRoutingTable {
+    table: HashMap<(IndexId, SourceId), RoutingEntry>,
+}
+
+impl NodeBasedRoutingTable {
+    pub fn find_entry(&self, index_id: &str, source_id: &str) -> Option<&RoutingEntry> {
+        let key = (index_id.to_string(), source_id.to_string());
+        self.table.get(&key)
+    }
+
+    pub fn debug_info(&self) -> HashMap<IndexId, Vec<serde_json::Value>> {
+        let mut per_index: HashMap<IndexId, Vec<serde_json::Value>> = HashMap::new();
+        for ((index_id, source_id), entry) in &self.table {
+            for (node_id, node) in &entry.nodes {
+                per_index
+                    .entry(index_id.clone())
+                    .or_default()
+                    .push(serde_json::json!({
+                        "source_id": source_id,
+                        "node_id": node_id,
+                        "capacity_score": node.capacity_score,
+                        "open_shard_count": node.open_shard_count,
+                    }));
+            }
+        }
+        per_index
+    }
+
+    pub fn has_open_nodes(
+        &self,
+        index_id: &str,
+        source_id: &str,
+        ingester_pool: &IngesterPool,
+        unavailable_leaders: &HashSet<NodeId>,
+    ) -> bool {
+        let key = (index_id.to_string(), source_id.to_string());
+        let Some(entry) = self.table.get(&key) else {
+            return false;
+        };
+        entry.nodes.values().any(|node| {
+            node.capacity_score > 0
+                && node.open_shard_count > 0
+                && ingester_pool.contains_key(&node.node_id)
+                && !unavailable_leaders.contains(&node.node_id)
+        })
+    }
+
+    /// Applies a capacity update from the IngesterCapacityScoreUpdate broadcast. This is the
+    /// primary way the table learns about node availability and capacity.
+    pub fn apply_capacity_update(
+        &mut self,
+        node_id: NodeId,
+        source_uid: SourceUid,
+        capacity_score: usize,
+        open_shard_count: usize,
+    ) {
+        let key = (
+            source_uid.index_uid.index_id.to_string(),
+            source_uid.source_id.clone(),
+        );
+
+        let entry = self.table.entry(key).or_insert_with(|| RoutingEntry {
+            nodes: IndexMap::new(),
+        });
+
+        let ingester_node = IngesterNode {
+            node_id: node_id.clone(),
+            index_uid: source_uid.index_uid,
+            source_id: source_uid.source_id,
+            capacity_score,
+            open_shard_count,
+        };
+        entry.nodes.insert(node_id, ingester_node);
+    }
+
+    /// Merges nodes from a GetOrCreateOpenShards control plane response into the
+    /// table. Only adds nodes that aren't already present — existing nodes keep
+    /// their real capacity scores from the broadcast.
+    /// TODO: New nodes get a default capacity_score of 5 until GetOrCreateOpenShards contains
+    /// capacity scores.
+    pub fn merge_from_shards(
+        &mut self,
+        index_uid: IndexUid,
+        source_id: SourceId,
+        shards: Vec<Shard>,
+    ) {
+        let key = (index_uid.index_id.to_string(), source_id.clone());
+
+        let mut per_leader_count: HashMap<NodeId, usize> = HashMap::new();
+        for shard in &shards {
+            if shard.is_open() {
+                *per_leader_count
+                    .entry(NodeId::from(shard.leader_id.clone()))
+                    .or_default() += 1;
+            }
+        }
+
+        let entry = self.table.entry(key).or_insert_with(|| RoutingEntry {
+            nodes: IndexMap::new(),
+        });
+
+        for (node_id, open_shard_count) in per_leader_count {
+            if entry.nodes.contains_key(&node_id) {
+                continue;
+            }
+            let ingester_node = IngesterNode {
+                node_id: node_id.clone(),
+                index_uid: index_uid.clone(),
+                source_id: source_id.clone(),
+                capacity_score: 5,
+                open_shard_count,
+            };
+            entry.nodes.insert(node_id, ingester_node);
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use quickwit_proto::ingest::ShardState;
+    use quickwit_proto::ingest::ingester::IngesterServiceClient;
+    use quickwit_proto::types::ShardId;
+
+    use super::*;
+
+    fn source_uid(index_id: &str, incarnation_id: u128, source_id: &str) -> SourceUid {
+        SourceUid {
+            index_uid: IndexUid::for_test(index_id, incarnation_id),
+            source_id: source_id.to_string(),
+        }
+    }
+
+    #[test]
+    fn test_apply_capacity_update() {
+        let mut table = NodeBasedRoutingTable::default();
+        let uid = source_uid("test-index", 0, "test-source");
+        let key = ("test-index".to_string(), "test-source".to_string());
+
+        // Insert first node.
+        table.apply_capacity_update("node-1".into(), uid.clone(), 8, 3);
+        let entry = table.table.get(&key).unwrap();
+        assert_eq!(entry.nodes.len(), 1);
+        assert_eq!(entry.nodes.get("node-1").unwrap().capacity_score, 8);
+
+        // Update existing node.
+        table.apply_capacity_update("node-1".into(), uid.clone(), 4, 5);
+        let node = table.table.get(&key).unwrap().nodes.get("node-1").unwrap();
+        assert_eq!(node.capacity_score, 4);
+        assert_eq!(node.open_shard_count, 5);
+
+        // Add second node.
+        table.apply_capacity_update("node-2".into(), uid.clone(), 6, 2);
+        assert_eq!(table.table.get(&key).unwrap().nodes.len(), 2);
+
+        // Zero shards: node stays in table but becomes ineligible for routing.
+        table.apply_capacity_update("node-1".into(), uid.clone(), 0, 0);
+        let entry = table.table.get(&key).unwrap();
+        assert_eq!(entry.nodes.len(), 2);
+        assert_eq!(entry.nodes.get("node-1").unwrap().open_shard_count, 0);
+        assert_eq!(entry.nodes.get("node-1").unwrap().capacity_score, 0);
+    }
+
+    #[test]
+    fn test_has_open_nodes() {
+        let mut table = NodeBasedRoutingTable::default();
+        let pool = IngesterPool::default();
+        let uid = source_uid("test-index", 0, "test-source");
+
+        // Empty table.
+        assert!(!table.has_open_nodes("test-index", "test-source", &pool, &HashSet::new()));
+
+        // Node exists but is not in pool.
+        table.apply_capacity_update("node-1".into(), uid.clone(), 8, 3);
+        assert!(!table.has_open_nodes("test-index", "test-source", &pool, &HashSet::new()));
+
+        // Node is in pool → true.
+        pool.insert("node-1".into(), IngesterServiceClient::mocked());
+        assert!(table.has_open_nodes("test-index", "test-source", &pool, &HashSet::new()));
+
+        // Node is unavailable → false.
+        let unavailable: HashSet<NodeId> = HashSet::from(["node-1".into()]);
+        assert!(!table.has_open_nodes("test-index", "test-source", &pool, &unavailable));
+
+        // Second node available → true despite first being unavailable.
+        table.apply_capacity_update("node-2".into(), uid.clone(), 6, 2);
+        pool.insert("node-2".into(), IngesterServiceClient::mocked());
+        assert!(table.has_open_nodes("test-index", "test-source", &pool, &unavailable));
+
+        // Node with capacity_score=0 is not eligible.
+        table.apply_capacity_update("node-2".into(), uid, 0, 2);
+        assert!(!table.has_open_nodes("test-index", "test-source", &pool, &unavailable));
+    }
+
+    #[test]
+    fn test_pick_node() {
+        let mut table = NodeBasedRoutingTable::default();
+        let pool = IngesterPool::default();
+        let uid = source_uid("test-index", 0, "test-source");
+        let key = ("test-index".to_string(), "test-source".to_string());
+
+        // Node exists but not in pool → None.
+        table.apply_capacity_update("node-1".into(), uid.clone(), 8, 3);
+        assert!(
+            table
+                .table
+                .get(&key)
+                .unwrap()
+                .pick_node(&pool, &HashSet::new())
+                .is_none()
+        );
+
+        // Single node in pool → picks it.
+        pool.insert("node-1".into(), IngesterServiceClient::mocked());
+        let picked = table
+            .table
+            .get(&key)
+            .unwrap()
+            .pick_node(&pool, &HashSet::new())
+            .unwrap();
+        assert_eq!(picked.node_id, NodeId::from("node-1"));
+
+        // Multiple nodes → something is returned.
+        table.apply_capacity_update("node-2".into(), uid.clone(), 2, 1);
+        pool.insert("node-2".into(), IngesterServiceClient::mocked());
+        assert!(
+            table
+                .table
+                .get(&key)
+                .unwrap()
+                .pick_node(&pool, &HashSet::new())
+                .is_some()
+        );
+
+        // Node with capacity_score=0 is skipped.
+        table.apply_capacity_update("node-1".into(), uid.clone(), 0, 3);
+        table.apply_capacity_update("node-2".into(), uid, 0, 1);
+        assert!(
+            table
+                .table
+                .get(&key)
+                .unwrap()
+                .pick_node(&pool, &HashSet::new())
+                .is_none()
+        );
+    }
+
+    #[test]
+    fn test_power_of_two_choices() {
+        // 3 candidates: best appears in the random pair 2/3 of the time and always
+        // wins when it does, so it should win ~67% of 1000 runs. Asserting > 550
+        // is ~7.5 standard deviations from the mean — effectively impossible to flake.
+        let high = IngesterNode {
+            node_id: "high".into(),
+            index_uid: IndexUid::for_test("idx", 0),
+            source_id: "src".into(),
+            capacity_score: 9,
+            open_shard_count: 2,
+        };
+        let mid = IngesterNode {
+            node_id: "mid".into(),
+            index_uid: IndexUid::for_test("idx", 0),
+            source_id: "src".into(),
+            capacity_score: 5,
+            open_shard_count: 2,
+        };
+        let low = IngesterNode {
+            node_id: "low".into(),
+            index_uid: IndexUid::for_test("idx", 0),
+            source_id: "src".into(),
+            capacity_score: 1,
+            open_shard_count: 2,
+        };
+        let candidates: Vec<&IngesterNode> = vec![&high, &mid, &low];
+
+        let mut high_wins = 0;
+        for _ in 0..1000 {
+            if power_of_two_choices(&candidates).node_id == "high" {
+                high_wins += 1;
+            }
+        }
+        assert!(high_wins > 550, "high won only {high_wins}/1000 times");
+    }
+
+    #[test]
+    fn test_merge_from_shards() {
+        let mut table = NodeBasedRoutingTable::default();
+        let index_uid = IndexUid::for_test("test-index", 0);
+        let key = ("test-index".to_string(), "test-source".to_string());
+
+        let make_shard = |id: u64, leader: &str, open: bool| Shard {
+            index_uid: Some(index_uid.clone()),
+            source_id: "test-source".to_string(),
+            shard_id: Some(ShardId::from(id)),
+            shard_state: if open {
+                ShardState::Open as i32
+            } else {
+                ShardState::Closed as i32
+            },
+            leader_id: leader.to_string(),
+            ..Default::default()
+        };
+
+        // Two open shards on node-1, one open on node-2, one closed (ignored).
+        let shards = vec![
+            make_shard(1, "node-1", true),
+            make_shard(2, "node-1", true),
+            make_shard(3, "node-2", true),
+            make_shard(4, "node-2", false),
+        ];
+        table.merge_from_shards(index_uid.clone(), "test-source".into(), shards);
+
+        let entry = table.table.get(&key).unwrap();
+        assert_eq!(entry.nodes.len(), 2);
+
+        let n1 = entry.nodes.get("node-1").unwrap();
+        assert_eq!(n1.open_shard_count, 2);
+        assert_eq!(n1.capacity_score, 5);
+
+        let n2 = entry.nodes.get("node-2").unwrap();
+        assert_eq!(n2.open_shard_count, 1);
+
+        // Merging again adds new nodes but preserves existing ones.
+        let shards = vec![make_shard(10, "node-3", true)];
+        table.merge_from_shards(index_uid, "test-source".into(), shards);
+
+        let entry = table.table.get(&key).unwrap();
+        assert_eq!(entry.nodes.len(), 3);
+        assert!(entry.nodes.get("node-1").is_some());
+        assert!(entry.nodes.get("node-2").is_some());
+        assert!(entry.nodes.get("node-3").is_some());
+    }
+}
diff --git a/quickwit/quickwit-ingest/src/ingest_v2/router.rs b/quickwit/quickwit-ingest/src/ingest_v2/router.rs
index 67ad31a2722..ccd00f0209c 100644
--- a/quickwit/quickwit-ingest/src/ingest_v2/router.rs
+++ b/quickwit/quickwit-ingest/src/ingest_v2/router.rs
@@ -43,12 +43,13 @@ use tokio::sync::{Mutex, Semaphore};
 use tokio::time::error::Elapsed;
 use tracing::{error, info};
 
-use super::broadcast::LocalShardsUpdate;
+use super::broadcast::{IngesterCapacityScoreUpdate, LocalShardsUpdate};
 use super::debouncing::{
     DebouncedGetOrCreateOpenShardsRequest, GetOrCreateOpenShardsRequestDebouncer,
 };
 use super::ingester::PERSIST_REQUEST_TIMEOUT;
 use super::metrics::IngestResultMetrics;
+use super::node_routing_table::NodeBasedRoutingTable;
 use super::routing_table::{NextOpenShardError, RoutingTable};
 use super::workbench::IngestWorkbench;
 use super::{IngesterPool, pending_subrequests};
@@ -105,6 +106,9 @@ struct RouterState {
     debouncer: GetOrCreateOpenShardsRequestDebouncer,
     // Holds the routing table mapping index and source IDs to shards.
     routing_table: RoutingTable,
+    // Node-based routing table, populated by capacity broadcasts.
+    // Not yet used for routing — will replace `routing_table` in a follow-up PR.
+    node_routing_table: NodeBasedRoutingTable,
 }
 
 impl fmt::Debug for IngestRouter {
@@ -130,6 +134,7 @@ impl IngestRouter {
                 self_node_id: self_node_id.clone(),
                 table: HashMap::default(),
             },
+            node_routing_table: NodeBasedRoutingTable::default(),
         }));
         let ingest_semaphore_permits = get_ingest_router_buffer_size().as_u64() as usize;
         let ingest_semaphore = Arc::new(Semaphore::new(ingest_semaphore_permits));
@@ -151,7 +156,10 @@ impl IngestRouter {
            .subscribe::(weak_router_state.clone())
            .forever();
        self.event_broker
-            .subscribe::(weak_router_state)
+            .subscribe::(weak_router_state.clone())
+            .forever();
+        self.event_broker
+            .subscribe::<IngesterCapacityScoreUpdate>(weak_router_state)
            .forever();
    }
@@ -694,6 +702,22 @@ impl EventSubscriber for WeakRouterState {
     }
 }
 
+#[async_trait]
+impl EventSubscriber<IngesterCapacityScoreUpdate> for WeakRouterState {
+    async fn handle_event(&mut self, update: IngesterCapacityScoreUpdate) {
+        let Some(state) = self.0.upgrade() else {
+            return;
+        };
+        let mut state_guard = state.lock().await;
+        state_guard.node_routing_table.apply_capacity_update(
+            update.node_id,
+            update.source_uid,
+            update.capacity_score,
+            update.open_shard_count,
+        );
+    }
+}
+
 pub(super) struct PersistRequestSummary {
     pub leader_id: NodeId,
     pub subrequest_ids: Vec,
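Reviewer note (not part of the patch): the capacity math in `ingester_capacity_score.rs` is easy to sanity-check in isolation. The sketch below mirrors the `WalMemoryCapacityTimeSeries` semantics as they appear in the diff: remaining capacity is `1 - used / capacity`, clamped to `[0.0, 1.0]`, `current()` is the newest reading, and `delta()` is the newest reading minus the oldest retained one across a 6-interval lookback window. A plain `VecDeque` stands in for the crate's `RingBuffer`, and every name here is illustrative only.

```rust
use std::collections::VecDeque;

const LOOKBACK_WINDOW_LEN: usize = 6;
const READINGS_LEN: usize = LOOKBACK_WINDOW_LEN + 1;

/// Toy stand-in for `WalMemoryCapacityTimeSeries`, backed by a bounded `VecDeque`.
struct CapacitySeries {
    capacity_bytes: u64,
    readings: VecDeque<f64>,
}

impl CapacitySeries {
    fn new(capacity_bytes: u64) -> Self {
        assert!(capacity_bytes > 0);
        Self {
            capacity_bytes,
            readings: VecDeque::with_capacity(READINGS_LEN),
        }
    }

    /// Records the fraction of WAL memory still free, clamped to [0.0, 1.0].
    fn record(&mut self, used_bytes: u64) {
        let remaining = 1.0 - used_bytes as f64 / self.capacity_bytes as f64;
        if self.readings.len() == READINGS_LEN {
            self.readings.pop_front();
        }
        self.readings.push_back(remaining.clamp(0.0, 1.0));
    }

    fn current(&self) -> Option<f64> {
        self.readings.back().copied()
    }

    /// Newest reading minus the oldest retained reading: positive while capacity
    /// frees up, negative while the WAL fills.
    fn delta(&self) -> Option<f64> {
        Some(self.readings.back()? - self.readings.front()?)
    }
}

fn main() {
    let mut series = CapacitySeries::new(1000);
    for used in [100, 200, 300, 400, 500, 600, 700] {
        series.record(used);
    }
    // 700 of 1000 used => 0.3 remaining; the oldest retained reading was 0.9,
    // so the delta over the lookback window is -0.6 (steadily draining).
    println!("current={:?} delta={:?}", series.current(), series.delta());
}
```

The same steady-drain pattern is what the `test_wal_memory_capacity_*` tests in the patch exercise with the helper capped at 100 bytes.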
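Also not part of the patch: a quick check of the flake-resistance claim in `test_power_of_two_choices`. With three candidates, the unordered pair drawn by `power_of_two_choices` is uniform over the three possible pairs, so the top-scored node is sampled 2/3 of the time and wins every pair it appears in. The numbers below are a back-of-the-envelope binomial estimate using the values assumed from the test (n = 1000 trials, cutoff 550).

```rust
fn main() {
    // Binomial(n = 1000, p = 2/3): expected wins for the top-scored candidate.
    let (n, p) = (1000.0_f64, 2.0 / 3.0);
    let mean = n * p; // ~666.7
    let std_dev = (n * p * (1.0 - p)).sqrt(); // ~14.9
    // How far the 550-win cutoff sits below the mean, in standard deviations.
    let z = (mean - 550.0) / std_dev; // ~7.8, in line with the test comment's "~7.5"
    println!("mean = {mean:.1}, std_dev = {std_dev:.1}, z = {z:.1}");
}
```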