From 35a88928bda1c6f66f8a26a6343fb979893f2002 Mon Sep 17 00:00:00 2001 From: Nadav Gov-Ari Date: Fri, 13 Feb 2026 15:10:32 -0500 Subject: [PATCH 1/9] Implement node based routing table --- quickwit/Cargo.lock | 1 + quickwit/quickwit-ingest/Cargo.toml | 1 + .../src/ingest_v2/broadcast/mod.rs | 1 + quickwit/quickwit-ingest/src/ingest_v2/mod.rs | 2 + .../src/ingest_v2/node_routing_table.rs | 440 ++++++++++++++++++ .../quickwit-ingest/src/ingest_v2/router.rs | 28 +- 6 files changed, 471 insertions(+), 2 deletions(-) create mode 100644 quickwit/quickwit-ingest/src/ingest_v2/node_routing_table.rs diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index 4a2e43ee09a..79c77536e16 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -7194,6 +7194,7 @@ dependencies = [ "fail", "futures", "http 1.4.0", + "indexmap 2.13.0", "itertools 0.14.0", "mockall", "mrecordlog", diff --git a/quickwit/quickwit-ingest/Cargo.toml b/quickwit/quickwit-ingest/Cargo.toml index 3149f2aaaf3..f47561fe2c9 100644 --- a/quickwit/quickwit-ingest/Cargo.toml +++ b/quickwit/quickwit-ingest/Cargo.toml @@ -18,6 +18,7 @@ bytesize = { workspace = true } fail = { workspace = true, optional = true } futures = { workspace = true } http = { workspace = true } +indexmap = { workspace = true } itertools = { workspace = true } mockall = { workspace = true, optional = true } mrecordlog = { workspace = true } diff --git a/quickwit/quickwit-ingest/src/ingest_v2/broadcast/mod.rs b/quickwit/quickwit-ingest/src/ingest_v2/broadcast/mod.rs index d2184a0e392..577591f7d49 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/broadcast/mod.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/broadcast/mod.rs @@ -26,6 +26,7 @@ pub(in crate::ingest_v2) const BROADCAST_INTERVAL_PERIOD: Duration = if cfg!(tes Duration::from_secs(5) }; +pub(crate) use ingester_capacity_score::IngesterCapacityScoreUpdate; pub use local_shards::{ BroadcastLocalShardsTask, LocalShardsUpdate, ShardInfo, ShardInfos, setup_local_shards_update_listener, diff --git a/quickwit/quickwit-ingest/src/ingest_v2/mod.rs b/quickwit/quickwit-ingest/src/ingest_v2/mod.rs index c8543faf793..4cc598382dd 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/mod.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/mod.rs @@ -22,6 +22,8 @@ mod metrics; mod models; mod mrecord; mod mrecordlog_utils; +#[allow(dead_code)] +mod node_routing_table; mod publish_tracker; mod rate_meter; mod replication; diff --git a/quickwit/quickwit-ingest/src/ingest_v2/node_routing_table.rs b/quickwit/quickwit-ingest/src/ingest_v2/node_routing_table.rs new file mode 100644 index 00000000000..57a4e6c1f3f --- /dev/null +++ b/quickwit/quickwit-ingest/src/ingest_v2/node_routing_table.rs @@ -0,0 +1,440 @@ +// Copyright 2021-Present Datadog, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::collections::{HashMap, HashSet}; + +use indexmap::IndexMap; +use quickwit_proto::ingest::Shard; +use quickwit_proto::types::{IndexId, IndexUid, NodeId, SourceId, SourceUid}; + +use crate::IngesterPool; + +/// A single ingester node's routing-relevant data for a specific (index, source) pair. +/// Each entry is self-describing: it carries its own node_id, index_uid, and source_id +/// so it can always be attributed back to a specific source on a specific node. +#[derive(Debug, Clone)] +pub(super) struct IngesterNode { + pub node_id: NodeId, + pub index_uid: IndexUid, + pub source_id: SourceId, + /// Score from 0-10. Higher means more available capacity. + pub capacity_score: usize, + /// Number of open shards on this node for this (index, source) pair. Tiebreaker for power of + /// two choices comparison - we favor a node with more open shards. + pub open_shard_count: usize, +} + +#[derive(Debug)] +pub(super) struct RoutingEntry { + pub nodes: IndexMap<NodeId, IngesterNode>, +} + +/// Given a slice of candidates, picks the better of two random choices. +/// Higher capacity_score wins; tiebreak on more open_shard_count (more landing spots). +fn power_of_two_choices<'a>(candidates: &[&'a IngesterNode]) -> &'a IngesterNode { + let len = candidates.len(); + debug_assert!(len >= 2); + + let idx_1 = rand::random_range(0..len); + // Random offset in [1, len) wraps around, guaranteeing idx_2 != idx_1. + let idx_2 = (idx_1 + rand::random_range(1..len)) % len; + let (a, b) = (candidates[idx_1], candidates[idx_2]); + + if (a.capacity_score, a.open_shard_count) >= (b.capacity_score, b.open_shard_count) { + a + } else { + b + } +} + +impl RoutingEntry { + /// Pick an ingester node to persist the request to. Uses power of two choices based on reported + /// ingester capacity, if more than one eligible node exists.
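+    ///
+    /// A minimal usage sketch (the `entry`, `ingester_pool`, and `unavailable` bindings are
+    /// illustrative, not part of this module):
+    ///
+    /// ```ignore
+    /// let unavailable: HashSet<NodeId> = HashSet::new();
+    /// if let Some(node_id) = entry.pick_node(&ingester_pool, &unavailable) {
+    ///     // Route the persist subrequest to `node_id`.
+    /// }
+    /// ```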
+ pub fn pick_node( + &self, + ingester_pool: &IngesterPool, + unavailable_leaders: &HashSet<NodeId>, + ) -> Option<&NodeId> { + let eligible: Vec<&IngesterNode> = self + .nodes + .values() + .filter(|node| { + node.capacity_score > 0 + && node.open_shard_count > 0 + && ingester_pool.contains_key(&node.node_id) + && !unavailable_leaders.contains(&node.node_id) + }) + .collect(); + + match eligible.len() { + 0 => None, + 1 => Some(&eligible[0].node_id), + _ => Some(&power_of_two_choices(&eligible).node_id), + } + } +} + +#[derive(Debug, Default)] +pub(super) struct NodeBasedRoutingTable { + table: HashMap<(IndexId, SourceId), RoutingEntry>, +} + +impl NodeBasedRoutingTable { + pub fn find_entry(&self, index_id: &str, source_id: &str) -> Option<&RoutingEntry> { + let key = (index_id.to_string(), source_id.to_string()); + self.table.get(&key) + } + + pub fn debug_info(&self) -> HashMap<IndexId, Vec<serde_json::Value>> { + let mut per_index: HashMap<IndexId, Vec<serde_json::Value>> = HashMap::new(); + for ((index_id, source_id), entry) in &self.table { + for (node_id, node) in &entry.nodes { + per_index + .entry(index_id.clone()) + .or_default() + .push(serde_json::json!({ + "source_id": source_id, + "node_id": node_id, + "capacity_score": node.capacity_score, + "open_shard_count": node.open_shard_count, + })); + } + } + per_index + } + + pub fn has_open_nodes( + &self, + index_id: &str, + source_id: &str, + ingester_pool: &IngesterPool, + unavailable_leaders: &HashSet<NodeId>, + ) -> bool { + let key = (index_id.to_string(), source_id.to_string()); + let Some(entry) = self.table.get(&key) else { + return false; + }; + entry.nodes.values().any(|node| { + node.capacity_score > 0 + && node.open_shard_count > 0 + && ingester_pool.contains_key(&node.node_id) + && !unavailable_leaders.contains(&node.node_id) + }) + } + + /// Applies a capacity update from the IngesterCapacityScoreUpdate broadcast. This is the + /// primary way the table learns about node availability and capacity. + /// + /// When `open_shard_count == 0`, the node is removed (it can't accept requests). + /// When `open_shard_count > 0`, the node is upserted with the latest capacity. + pub fn apply_capacity_update( + &mut self, + node_id: NodeId, + source_uid: SourceUid, + capacity_score: usize, + open_shard_count: usize, + ) { + let key = ( + source_uid.index_uid.index_id.to_string(), + source_uid.source_id.clone(), + ); + + if open_shard_count == 0 { + // Node has no open shards for this source — remove it. + if let Some(entry) = self.table.get_mut(&key) { + entry.nodes.swap_remove(&node_id); + if entry.nodes.is_empty() { + self.table.remove(&key); + } + } + return; + } + + let entry = self.table.entry(key).or_insert_with(|| RoutingEntry { + nodes: IndexMap::new(), + }); + + let ingester_node = IngesterNode { + node_id: node_id.clone(), + index_uid: source_uid.index_uid, + source_id: source_uid.source_id, + capacity_score, + open_shard_count, + }; + entry.nodes.insert(node_id, ingester_node); + } + + /// Seeds the table from a GetOrCreateOpenShards control plane response. + /// Used at cold start or when all shards for a source have closed and the + /// router asked the CP to create new ones. Replaces the entire entry for + /// this (index, source) — the CP response is authoritative. + /// + /// TODO: Capacity score defaults to 5 (mid-range). + /// Return capacity scores from the control plane in the GetOrCreateOpenShards response.
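+    ///
+    /// For example (hypothetical node IDs): a response carrying two open shards led by
+    /// `node-1` and one led by `node-2` seeds the entry with node-1 at open_shard_count 2
+    /// and node-2 at open_shard_count 1, both at the default capacity_score of 5.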
+ pub fn seed_from_shards( + &mut self, + index_uid: IndexUid, + source_id: SourceId, + shards: Vec<Shard>, + ) { + let key = (index_uid.index_id.to_string(), source_id.clone()); + + let mut per_leader_count: HashMap<NodeId, usize> = HashMap::new(); + for shard in &shards { + if shard.is_open() { + *per_leader_count + .entry(NodeId::from(shard.leader_id.clone())) + .or_default() += 1; + } + } + + let mut nodes = IndexMap::new(); + for (node_id, open_shard_count) in per_leader_count { + let ingester_node = IngesterNode { + node_id: node_id.clone(), + index_uid: index_uid.clone(), + source_id: source_id.clone(), + capacity_score: 5, + open_shard_count, + }; + nodes.insert(node_id, ingester_node); + } + + self.table.insert(key, RoutingEntry { nodes }); + } +} + +#[cfg(test)] +mod tests { + use quickwit_proto::ingest::ShardState; + use quickwit_proto::ingest::ingester::IngesterServiceClient; + use quickwit_proto::types::ShardId; + + use super::*; + + fn source_uid(index_id: &str, incarnation_id: u128, source_id: &str) -> SourceUid { + SourceUid { + index_uid: IndexUid::for_test(index_id, incarnation_id), + source_id: source_id.to_string(), + } + } + + #[test] + fn test_apply_capacity_update() { + let mut table = NodeBasedRoutingTable::default(); + let uid = source_uid("test-index", 0, "test-source"); + let key = ("test-index".to_string(), "test-source".to_string()); + + // Remove on empty table is a no-op. + table.apply_capacity_update("node-1".into(), uid.clone(), 0, 0); + assert!(table.table.is_empty()); + + // Insert first node. + table.apply_capacity_update("node-1".into(), uid.clone(), 8, 3); + let entry = table.table.get(&key).unwrap(); + assert_eq!(entry.nodes.len(), 1); + assert_eq!(entry.nodes.get("node-1").unwrap().capacity_score, 8); + + // Update existing node. + table.apply_capacity_update("node-1".into(), uid.clone(), 4, 5); + let node = table.table.get(&key).unwrap().nodes.get("node-1").unwrap(); + assert_eq!(node.capacity_score, 4); + assert_eq!(node.open_shard_count, 5); + + // Add second node. + table.apply_capacity_update("node-2".into(), uid.clone(), 6, 2); + assert_eq!(table.table.get(&key).unwrap().nodes.len(), 2); + + // Remove first node (zero shards), second remains. + table.apply_capacity_update("node-1".into(), uid.clone(), 0, 0); + let entry = table.table.get(&key).unwrap(); + assert_eq!(entry.nodes.len(), 1); + assert!(entry.nodes.get("node-1").is_none()); + assert!(entry.nodes.get("node-2").is_some()); + + // Remove last node → entire entry is cleaned up. + table.apply_capacity_update("node-2".into(), uid, 0, 0); + assert!(!table.table.contains_key(&key)); + } + + #[test] + fn test_has_open_nodes() { + let mut table = NodeBasedRoutingTable::default(); + let pool = IngesterPool::default(); + let uid = source_uid("test-index", 0, "test-source"); + + // Empty table. + assert!(!table.has_open_nodes("test-index", "test-source", &pool, &HashSet::new())); + + // Node exists but is not in pool. + table.apply_capacity_update("node-1".into(), uid.clone(), 8, 3); + assert!(!table.has_open_nodes("test-index", "test-source", &pool, &HashSet::new())); + + // Node is in pool → true. + pool.insert("node-1".into(), IngesterServiceClient::mocked()); + assert!(table.has_open_nodes("test-index", "test-source", &pool, &HashSet::new())); + + // Node is unavailable → false. + let unavailable: HashSet<NodeId> = HashSet::from(["node-1".into()]); + assert!(!table.has_open_nodes("test-index", "test-source", &pool, &unavailable)); + + // Second node available → true despite first being unavailable.
+ table.apply_capacity_update("node-2".into(), uid.clone(), 6, 2); + pool.insert("node-2".into(), IngesterServiceClient::mocked()); + assert!(table.has_open_nodes("test-index", "test-source", &pool, &unavailable)); + + // Node with capacity_score=0 is not eligible. + table.apply_capacity_update("node-2".into(), uid, 0, 2); + assert!(!table.has_open_nodes("test-index", "test-source", &pool, &unavailable)); + } + + #[test] + fn test_pick_node() { + let mut table = NodeBasedRoutingTable::default(); + let pool = IngesterPool::default(); + let uid = source_uid("test-index", 0, "test-source"); + let key = ("test-index".to_string(), "test-source".to_string()); + + // Node exists but not in pool → None. + table.apply_capacity_update("node-1".into(), uid.clone(), 8, 3); + assert!( + table + .table + .get(&key) + .unwrap() + .pick_node(&pool, &HashSet::new()) + .is_none() + ); + + // Single node in pool → picks it. + pool.insert("node-1".into(), IngesterServiceClient::mocked()); + assert_eq!( + table + .table + .get(&key) + .unwrap() + .pick_node(&pool, &HashSet::new()), + Some(&"node-1".into()), + ); + + // Multiple nodes → something is returned. + table.apply_capacity_update("node-2".into(), uid.clone(), 2, 1); + pool.insert("node-2".into(), IngesterServiceClient::mocked()); + assert!( + table + .table + .get(&key) + .unwrap() + .pick_node(&pool, &HashSet::new()) + .is_some() + ); + + // Node with capacity_score=0 is skipped. + table.apply_capacity_update("node-1".into(), uid.clone(), 0, 3); + table.apply_capacity_update("node-2".into(), uid, 0, 1); + assert!( + table + .table + .get(&key) + .unwrap() + .pick_node(&pool, &HashSet::new()) + .is_none() + ); + } + + #[test] + fn test_power_of_two_choices() { + // 3 candidates: best appears in the random pair 2/3 of the time and always + // wins when it does, so it should win ~67% of 1000 runs. Asserting > 550 + // is ~7.5 standard deviations from the mean — effectively impossible to flake. + let high = IngesterNode { + node_id: "high".into(), + index_uid: IndexUid::for_test("idx", 0), + source_id: "src".into(), + capacity_score: 9, + open_shard_count: 2, + }; + let mid = IngesterNode { + node_id: "mid".into(), + index_uid: IndexUid::for_test("idx", 0), + source_id: "src".into(), + capacity_score: 5, + open_shard_count: 2, + }; + let low = IngesterNode { + node_id: "low".into(), + index_uid: IndexUid::for_test("idx", 0), + source_id: "src".into(), + capacity_score: 1, + open_shard_count: 2, + }; + let candidates: Vec<&IngesterNode> = vec![&high, &mid, &low]; + + let mut high_wins = 0; + for _ in 0..1000 { + if power_of_two_choices(&candidates).node_id == "high" { + high_wins += 1; + } + } + assert!(high_wins > 550, "high won only {high_wins}/1000 times"); + } + + #[test] + fn test_seed_from_shards() { + let mut table = NodeBasedRoutingTable::default(); + let index_uid = IndexUid::for_test("test-index", 0); + let key = ("test-index".to_string(), "test-source".to_string()); + + let make_shard = |id: u64, leader: &str, open: bool| Shard { + index_uid: Some(index_uid.clone()), + source_id: "test-source".to_string(), + shard_id: Some(ShardId::from(id)), + shard_state: if open { + ShardState::Open as i32 + } else { + ShardState::Closed as i32 + }, + leader_id: leader.to_string(), + ..Default::default() + }; + + // Two open shards on node-1, one open on node-2, one closed (ignored). 
+ let shards = vec![ + make_shard(1, "node-1", true), + make_shard(2, "node-1", true), + make_shard(3, "node-2", true), + make_shard(4, "node-2", false), + ]; + table.seed_from_shards(index_uid.clone(), "test-source".into(), shards); + + let entry = table.table.get(&key).unwrap(); + assert_eq!(entry.nodes.len(), 2); + + let n1 = entry.nodes.get("node-1").unwrap(); + assert_eq!(n1.open_shard_count, 2); + assert_eq!(n1.capacity_score, 5); + + let n2 = entry.nodes.get("node-2").unwrap(); + assert_eq!(n2.open_shard_count, 1); + + // Seeding again replaces the entry entirely. + let shards = vec![make_shard(10, "node-3", true)]; + table.seed_from_shards(index_uid, "test-source".into(), shards); + + let entry = table.table.get(&key).unwrap(); + assert_eq!(entry.nodes.len(), 1); + assert!(entry.nodes.get("node-3").is_some()); + assert!(entry.nodes.get("node-1").is_none()); + } +} diff --git a/quickwit/quickwit-ingest/src/ingest_v2/router.rs b/quickwit/quickwit-ingest/src/ingest_v2/router.rs index 67ad31a2722..ccd00f0209c 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/router.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/router.rs @@ -43,12 +43,13 @@ use tokio::sync::{Mutex, Semaphore}; use tokio::time::error::Elapsed; use tracing::{error, info}; -use super::broadcast::LocalShardsUpdate; +use super::broadcast::{IngesterCapacityScoreUpdate, LocalShardsUpdate}; use super::debouncing::{ DebouncedGetOrCreateOpenShardsRequest, GetOrCreateOpenShardsRequestDebouncer, }; use super::ingester::PERSIST_REQUEST_TIMEOUT; use super::metrics::IngestResultMetrics; +use super::node_routing_table::NodeBasedRoutingTable; use super::routing_table::{NextOpenShardError, RoutingTable}; use super::workbench::IngestWorkbench; use super::{IngesterPool, pending_subrequests}; @@ -105,6 +106,9 @@ struct RouterState { debouncer: GetOrCreateOpenShardsRequestDebouncer, // Holds the routing table mapping index and source IDs to shards. routing_table: RoutingTable, + // Node-based routing table, populated by capacity broadcasts. + // Not yet used for routing — will replace `routing_table` in a follow-up PR. 
+ node_routing_table: NodeBasedRoutingTable, } impl fmt::Debug for IngestRouter { @@ -130,6 +134,7 @@ impl IngestRouter { self_node_id: self_node_id.clone(), table: HashMap::default(), }, + node_routing_table: NodeBasedRoutingTable::default(), })); let ingest_semaphore_permits = get_ingest_router_buffer_size().as_u64() as usize; let ingest_semaphore = Arc::new(Semaphore::new(ingest_semaphore_permits)); @@ -151,7 +156,10 @@ impl IngestRouter { .subscribe::(weak_router_state.clone()) .forever(); self.event_broker - .subscribe::(weak_router_state) + .subscribe::(weak_router_state.clone()) + .forever(); + self.event_broker + .subscribe::<IngesterCapacityScoreUpdate>(weak_router_state) .forever(); } @@ -694,6 +702,22 @@ impl EventSubscriber for WeakRouterState { } } +#[async_trait] +impl EventSubscriber<IngesterCapacityScoreUpdate> for WeakRouterState { + async fn handle_event(&mut self, update: IngesterCapacityScoreUpdate) { + let Some(state) = self.0.upgrade() else { + return; + }; + let mut state_guard = state.lock().await; + state_guard.node_routing_table.apply_capacity_update( + update.node_id, + update.source_uid, + update.capacity_score, + update.open_shard_count, + ); + } +} + pub(super) struct PersistRequestSummary { pub leader_id: NodeId, pub subrequest_ids: Vec<u32>, From f16ed311a3952e674ab287c8396f6914b5a5f015 Mon Sep 17 00:00:00 2001 From: Nadav Gov-Ari Date: Tue, 17 Feb 2026 15:31:29 -0500 Subject: [PATCH 2/9] Small function signature changes --- .../src/ingest_v2/node_routing_table.rs | 90 ++++++++----------- 1 file changed, 35 insertions(+), 55 deletions(-) diff --git a/quickwit/quickwit-ingest/src/ingest_v2/node_routing_table.rs b/quickwit/quickwit-ingest/src/ingest_v2/node_routing_table.rs index 57a4e6c1f3f..95710669cef 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/node_routing_table.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/node_routing_table.rs @@ -65,7 +65,7 @@ impl RoutingEntry { &self, ingester_pool: &IngesterPool, unavailable_leaders: &HashSet<NodeId>, - ) -> Option<&NodeId> { + ) -> Option<&IngesterNode> { let eligible: Vec<&IngesterNode> = self .nodes .values() @@ -79,8 +79,8 @@ impl RoutingEntry { match eligible.len() { 0 => None, - 1 => Some(&eligible[0].node_id), - _ => Some(&power_of_two_choices(&eligible).node_id), + 1 => Some(eligible[0]), + _ => Some(power_of_two_choices(&eligible)), } } } @@ -135,9 +135,6 @@ impl NodeBasedRoutingTable { /// Applies a capacity update from the IngesterCapacityScoreUpdate broadcast. This is the /// primary way the table learns about node availability and capacity. - /// - /// When `open_shard_count == 0`, the node is removed (it can't accept requests). - /// When `open_shard_count > 0`, the node is upserted with the latest capacity. pub fn apply_capacity_update( &mut self, node_id: NodeId, @@ -150,17 +147,6 @@ impl NodeBasedRoutingTable { source_uid.source_id.clone(), ); - if open_shard_count == 0 { - // Node has no open shards for this source — remove it. - if let Some(entry) = self.table.get_mut(&key) { - entry.nodes.swap_remove(&node_id); - if entry.nodes.is_empty() { - self.table.remove(&key); - } - } - return; - } - let entry = self.table.entry(key).or_insert_with(|| RoutingEntry { nodes: IndexMap::new(), }); @@ -175,14 +161,12 @@ impl NodeBasedRoutingTable { entry.nodes.insert(node_id, ingester_node); } - /// Seeds the table from a GetOrCreateOpenShards control plane response. - /// Used at cold start or when all shards for a source have closed and the - /// router asked the CP to create new ones.
Replaces the entire entry for - /// this (index, source) — the CP response is authoritative. - /// - /// TODO: Capacity score defaults to 5 (mid-range). - /// Return capacity scores from the control plane in the GetOrCreateOpenShards response. - pub fn seed_from_shards( + /// Merges nodes from a GetOrCreateOpenShards control plane response into the + /// table. Only adds nodes that aren't already present — existing nodes keep + /// their real capacity scores from the broadcast. + /// TODO: New nodes get a default capacity_score of 5 until GetOrCreateOpenShards contains + /// capacity scores. + pub fn merge_from_shards( &mut self, index_uid: IndexUid, source_id: SourceId, @@ -199,8 +183,14 @@ impl NodeBasedRoutingTable { } } - let mut nodes = IndexMap::new(); + let entry = self.table.entry(key).or_insert_with(|| RoutingEntry { + nodes: IndexMap::new(), + }); + for (node_id, open_shard_count) in per_leader_count { + if entry.nodes.contains_key(&node_id) { + continue; + } let ingester_node = IngesterNode { node_id: node_id.clone(), index_uid: index_uid.clone(), @@ -208,10 +198,8 @@ impl NodeBasedRoutingTable { capacity_score: 5, open_shard_count, }; - nodes.insert(node_id, ingester_node); + entry.nodes.insert(node_id, ingester_node); } - - self.table.insert(key, RoutingEntry { nodes }); } } @@ -236,10 +224,6 @@ mod tests { let uid = source_uid("test-index", 0, "test-source"); let key = ("test-index".to_string(), "test-source".to_string()); - // Remove on empty table is a no-op. - table.apply_capacity_update("node-1".into(), uid.clone(), 0, 0); - assert!(table.table.is_empty()); - // Insert first node. table.apply_capacity_update("node-1".into(), uid.clone(), 8, 3); let entry = table.table.get(&key).unwrap(); @@ -256,16 +240,12 @@ mod tests { table.apply_capacity_update("node-2".into(), uid.clone(), 6, 2); assert_eq!(table.table.get(&key).unwrap().nodes.len(), 2); - // Remove first node (zero shards), second remains. + // Zero shards: node stays in table but becomes ineligible for routing. table.apply_capacity_update("node-1".into(), uid.clone(), 0, 0); let entry = table.table.get(&key).unwrap(); - assert_eq!(entry.nodes.len(), 1); - assert!(entry.nodes.get("node-1").is_none()); - assert!(entry.nodes.get("node-2").is_some()); - - // Remove last node → entire entry is cleaned up. - table.apply_capacity_update("node-2".into(), uid, 0, 0); - assert!(!table.table.contains_key(&key)); + assert_eq!(entry.nodes.len(), 2); + assert_eq!(entry.nodes.get("node-1").unwrap().open_shard_count, 0); + assert_eq!(entry.nodes.get("node-1").unwrap().capacity_score, 0); } #[test] @@ -319,14 +299,13 @@ mod tests { // Single node in pool → picks it. pool.insert("node-1".into(), IngesterServiceClient::mocked()); - assert_eq!( - table - .table - .get(&key) - .unwrap() - .pick_node(&pool, &HashSet::new()), - Some(&"node-1".into()), - ); + let picked = table + .table + .get(&key) + .unwrap() + .pick_node(&pool, &HashSet::new()) + .unwrap(); + assert_eq!(picked.node_id, NodeId::from("node-1")); // Multiple nodes → something is returned. 
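        // (The power of two choices winner is random, so we don't assert which node is picked.)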
table.apply_capacity_update("node-2".into(), uid.clone(), 2, 1); @@ -391,7 +370,7 @@ mod tests { } #[test] - fn test_seed_from_shards() { + fn test_merge_from_shards() { let mut table = NodeBasedRoutingTable::default(); let index_uid = IndexUid::for_test("test-index", 0); let key = ("test-index".to_string(), "test-source".to_string()); @@ -416,7 +395,7 @@ mod tests { make_shard(3, "node-2", true), make_shard(4, "node-2", false), ]; - table.seed_from_shards(index_uid.clone(), "test-source".into(), shards); + table.merge_from_shards(index_uid.clone(), "test-source".into(), shards); let entry = table.table.get(&key).unwrap(); assert_eq!(entry.nodes.len(), 2); @@ -428,13 +407,14 @@ mod tests { let n2 = entry.nodes.get("node-2").unwrap(); assert_eq!(n2.open_shard_count, 1); - // Seeding again replaces the entry entirely. + // Merging again adds new nodes but preserves existing ones. let shards = vec![make_shard(10, "node-3", true)]; - table.seed_from_shards(index_uid, "test-source".into(), shards); + table.merge_from_shards(index_uid, "test-source".into(), shards); let entry = table.table.get(&key).unwrap(); - assert_eq!(entry.nodes.len(), 1); + assert_eq!(entry.nodes.len(), 3); + assert!(entry.nodes.get("node-1").is_some()); + assert!(entry.nodes.get("node-2").is_some()); assert!(entry.nodes.get("node-3").is_some()); - assert!(entry.nodes.get("node-1").is_none()); } } From da49a83f58921c00414f63b79f1e8acbbae9d871 Mon Sep 17 00:00:00 2001 From: Nadav Gov-Ari Date: Wed, 18 Feb 2026 10:30:26 -0500 Subject: [PATCH 3/9] Spawn task, and use the correct memory capacity (rather than allocated) --- .../broadcast/ingester_capacity_score.rs | 79 ++++++++++--------- .../src/ingest_v2/broadcast/mod.rs | 2 +- .../quickwit-ingest/src/ingest_v2/ingester.rs | 5 +- quickwit/quickwit-ingest/src/ingest_v2/mod.rs | 5 +- quickwit/quickwit-serve/src/lib.rs | 3 +- 5 files changed, 50 insertions(+), 44 deletions(-) diff --git a/quickwit/quickwit-ingest/src/ingest_v2/broadcast/ingester_capacity_score.rs b/quickwit/quickwit-ingest/src/ingest_v2/broadcast/ingester_capacity_score.rs index 6f8abc66ef8..392e9e11551 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/broadcast/ingester_capacity_score.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/broadcast/ingester_capacity_score.rs @@ -42,23 +42,22 @@ const WAL_CAPACITY_LOOKBACK_WINDOW_LEN: usize = 6; const WAL_CAPACITY_READINGS_LEN: usize = WAL_CAPACITY_LOOKBACK_WINDOW_LEN + 1; struct WalMemoryCapacityTimeSeries { + memory_capacity: ByteSize, readings: RingBuffer, } impl WalMemoryCapacityTimeSeries { - fn new() -> Self { + fn new(memory_capacity: ByteSize) -> Self { + assert!(memory_capacity.as_u64() > 0); Self { + memory_capacity, readings: RingBuffer::default(), } } - fn record(&mut self, memory_used: ByteSize, memory_allocated: ByteSize) { - let allocated = memory_allocated.as_u64(); - if allocated == 0 { - self.readings.push_back(1.0); - return; - } - let remaining = 1.0 - (memory_used.as_u64() as f64 / allocated as f64); + fn record(&mut self, memory_used: ByteSize) { + let remaining = + 1.0 - (memory_used.as_u64() as f64 / self.memory_capacity.as_u64() as f64); self.readings.push_back(remaining.clamp(0.0, 1.0)); } @@ -121,23 +120,27 @@ pub struct IngesterCapacityScore { /// Periodically snapshots the ingester's WAL memory usage and open shard counts, computes /// a capacity score, and broadcasts it to other nodes via Chitchat. 
-pub(crate) struct BroadcastIngesterCapacityScoreTask { +pub struct BroadcastIngesterCapacityScoreTask { cluster: Cluster, weak_state: WeakIngesterState, wal_capacity_time_series: WalMemoryCapacityTimeSeries, } impl BroadcastIngesterCapacityScoreTask { - pub fn spawn(cluster: Cluster, weak_state: WeakIngesterState) -> JoinHandle<()> { + pub fn spawn( + cluster: Cluster, + weak_state: WeakIngesterState, + memory_capacity: ByteSize, + ) -> JoinHandle<()> { let mut broadcaster = Self { cluster, weak_state, - wal_capacity_time_series: WalMemoryCapacityTimeSeries::new(), + wal_capacity_time_series: WalMemoryCapacityTimeSeries::new(memory_capacity), }; tokio::spawn(async move { broadcaster.run().await }) } - async fn snapshot(&self) -> Result<Option<(ByteSize, ByteSize, HashMap<SourceUid, usize>)>> { + async fn snapshot(&self) -> Result<Option<(ByteSize, HashMap<SourceUid, usize>)>> { let state = self .weak_state .upgrade() @@ -155,10 +158,9 @@ impl BroadcastIngesterCapacityScoreTask { .map_err(|_| anyhow::anyhow!("failed to acquire ingester state lock"))?; let usage = guard.mrecordlog.resource_usage(); let memory_used = ByteSize::b(usage.memory_used_bytes as u64); - let memory_allocated = ByteSize::b(usage.memory_allocated_bytes as u64); let open_shard_counts = guard.get_open_shard_counts(); - Ok(Some((memory_used, memory_allocated, open_shard_counts))) + Ok(Some((memory_used, open_shard_counts))) } async fn run(&mut self) { @@ -168,7 +170,7 @@ impl BroadcastIngesterCapacityScoreTask { loop { interval.tick().await; - let (memory_used, memory_allocated, open_shard_counts) = match self.snapshot().await { + let (memory_used, open_shard_counts) = match self.snapshot().await { Ok(Some(snapshot)) => snapshot, Ok(None) => continue, Err(error) => { @@ -177,8 +179,7 @@ impl BroadcastIngesterCapacityScoreTask { } }; - self.wal_capacity_time_series - .record(memory_used, memory_allocated); + self.wal_capacity_time_series.record(memory_used); let remaining_capacity = self.wal_capacity_time_series.current().unwrap_or(1.0); let capacity_delta = self.wal_capacity_time_series.delta().unwrap_or(0.0); @@ -272,30 +273,31 @@ mod tests { use crate::ingest_v2::state::IngesterState; fn ts() -> WalMemoryCapacityTimeSeries { - WalMemoryCapacityTimeSeries::new() + WalMemoryCapacityTimeSeries::new(ByteSize::b(100)) } - /// Helper: record a reading with `used` out of `allocated` bytes. - fn record(series: &mut WalMemoryCapacityTimeSeries, used: u64, allocated: u64) { - series.record(ByteSize::b(used), ByteSize::b(allocated)); + /// Helper: record a reading with `used` bytes against the series' fixed capacity.
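+    /// For example, with the 100-byte capacity from `ts()`, `record(&mut series, 25)` pushes
+    /// a remaining-capacity reading of 1.0 - 25/100 = 0.75.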
+ fn record(series: &mut WalMemoryCapacityTimeSeries, used: u64) { + series.record(ByteSize::b(used)); } #[test] fn test_wal_memory_capacity_current_after_record() { - let mut series = ts(); + let mut series = WalMemoryCapacityTimeSeries::new(ByteSize::b(256)); // 192 of 256 used => 25% remaining - record(&mut series, 192, 256); + series.record(ByteSize::b(192)); assert_eq!(series.current(), Some(0.25)); // 16 of 256 used => 93.75% remaining - record(&mut series, 16, 256); + series.record(ByteSize::b(16)); assert_eq!(series.current(), Some(0.9375)); } #[test] fn test_wal_memory_capacity_record_saturates_at_zero() { let mut series = ts(); - record(&mut series, 200, 100); + // 200 used out of 100 capacity => clamped to 0.0 + record(&mut series, 200); assert_eq!(series.current(), Some(0.0)); } @@ -303,9 +305,9 @@ mod tests { fn test_wal_memory_capacity_delta_growing() { let mut series = ts(); // oldest: 60 of 100 used => 40% remaining - record(&mut series, 60, 100); + record(&mut series, 60); // current: 20 of 100 used => 80% remaining - record(&mut series, 20, 100); + record(&mut series, 20); // delta = 0.80 - 0.40 = 0.40 assert_eq!(series.delta(), Some(0.40)); } @@ -314,9 +316,9 @@ mod tests { fn test_wal_memory_capacity_delta_shrinking() { let mut series = ts(); // oldest: 20 of 100 used => 80% remaining - record(&mut series, 20, 100); + record(&mut series, 20); // current: 60 of 100 used => 40% remaining - record(&mut series, 60, 100); + record(&mut series, 60); // delta = 0.40 - 0.80 = -0.40 assert_eq!(series.delta(), Some(-0.40)); } @@ -326,7 +328,7 @@ mod tests { // Node A: capacity draining — usage increases 10, 20, ..., 70 over 7 ticks. let mut node_a = ts(); for used in (10..=70).step_by(10) { - record(&mut node_a, used, 100); + record(&mut node_a, used); } let a_remaining = node_a.current().unwrap(); let a_delta = node_a.delta().unwrap(); @@ -335,7 +337,7 @@ mod tests { // Node B: steady at 50% usage over 7 ticks. let mut node_b = ts(); for _ in 0..7 { - record(&mut node_b, 50, 100); + record(&mut node_b, 50); } let b_remaining = node_b.current().unwrap(); let b_delta = node_b.delta().unwrap(); @@ -361,7 +363,7 @@ mod tests { let task = BroadcastIngesterCapacityScoreTask { cluster, weak_state, - wal_capacity_time_series: WalMemoryCapacityTimeSeries::new(), + wal_capacity_time_series: WalMemoryCapacityTimeSeries::new(ByteSize::mb(1)), }; assert!(task.snapshot().await.is_err()); } @@ -388,14 +390,13 @@ mod tests { let open_shard_counts = state_guard.get_open_shard_counts(); drop(state_guard); - // Simulate 500 of 1000 bytes used => 50% remaining, 0 delta => score = 6 + // Simulate 500 of 1000 bytes capacity used => 50% remaining, 0 delta => score = 6 let mut task = BroadcastIngesterCapacityScoreTask { cluster: cluster.clone(), weak_state: state.weak(), - wal_capacity_time_series: WalMemoryCapacityTimeSeries::new(), + wal_capacity_time_series: WalMemoryCapacityTimeSeries::new(ByteSize::b(1000)), }; - task.wal_capacity_time_series - .record(ByteSize::b(500), ByteSize::b(1000)); + task.wal_capacity_time_series.record(ByteSize::b(500)); let remaining = task.wal_capacity_time_series.current().unwrap(); let delta = task.wal_capacity_time_series.delta().unwrap(); @@ -440,16 +441,16 @@ mod tests { // Fill to exactly the lookback window length (6 readings), all same value. for _ in 0..WAL_CAPACITY_LOOKBACK_WINDOW_LEN { - record(&mut series, 50, 100); + record(&mut series, 50); } assert_eq!(series.delta(), Some(0.0)); // 7th reading fills the ring buffer. Delta spans 6 intervals. 
- record(&mut series, 0, 100); + record(&mut series, 0); assert_eq!(series.delta(), Some(0.50)); // 8th reading evicts the oldest 50-remaining. Delta still spans 6 intervals. - record(&mut series, 0, 100); + record(&mut series, 0); assert_eq!(series.delta(), Some(0.50)); } } diff --git a/quickwit/quickwit-ingest/src/ingest_v2/broadcast/mod.rs b/quickwit/quickwit-ingest/src/ingest_v2/broadcast/mod.rs index 577591f7d49..8c6e8792c8a 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/broadcast/mod.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/broadcast/mod.rs @@ -26,7 +26,7 @@ pub(in crate::ingest_v2) const BROADCAST_INTERVAL_PERIOD: Duration = if cfg!(tes Duration::from_secs(5) }; -pub(crate) use ingester_capacity_score::IngesterCapacityScoreUpdate; +pub use ingester_capacity_score::{IngesterCapacityScoreUpdate, setup_ingester_capacity_update_listener, BroadcastIngesterCapacityScoreTask}; pub use local_shards::{ BroadcastLocalShardsTask, LocalShardsUpdate, ShardInfo, ShardInfos, setup_local_shards_update_listener, diff --git a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs index 65c268881ac..fc1a44f19bb 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs @@ -59,7 +59,7 @@ use tokio::time::{sleep, timeout}; use tracing::{debug, error, info, warn}; use super::IngesterPool; -use super::broadcast::BroadcastLocalShardsTask; +use super::broadcast::{BroadcastIngesterCapacityScoreTask, BroadcastLocalShardsTask}; use super::doc_mapper::validate_doc_batch; use super::fetch::FetchStreamTask; use super::idle::CloseIdleShardsTask; @@ -144,7 +144,8 @@ impl Ingester { let state = IngesterState::load(wal_dir_path, rate_limiter_settings); let weak_state = state.weak(); - BroadcastLocalShardsTask::spawn(cluster, weak_state.clone()); + BroadcastLocalShardsTask::spawn(cluster.clone(), weak_state.clone()); + BroadcastIngesterCapacityScoreTask::spawn(cluster, weak_state.clone(), memory_capacity); CloseIdleShardsTask::spawn(weak_state, idle_shard_timeout); let ingester = Self { diff --git a/quickwit/quickwit-ingest/src/ingest_v2/mod.rs b/quickwit/quickwit-ingest/src/ingest_v2/mod.rs index 4cc598382dd..3a801763feb 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/mod.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/mod.rs @@ -38,7 +38,10 @@ use std::ops::{Add, AddAssign}; use std::time::Duration; use std::{env, fmt}; -pub use broadcast::{LocalShardsUpdate, ShardInfo, ShardInfos, setup_local_shards_update_listener}; +pub use broadcast::{ + LocalShardsUpdate, ShardInfo, ShardInfos, setup_ingester_capacity_update_listener, + setup_local_shards_update_listener, +}; use bytes::buf::Writer; use bytes::{BufMut, BytesMut}; use bytesize::ByteSize; diff --git a/quickwit/quickwit-serve/src/lib.rs b/quickwit/quickwit-serve/src/lib.rs index ca4520ff0ce..f56e5549c3a 100644 --- a/quickwit/quickwit-serve/src/lib.rs +++ b/quickwit/quickwit-serve/src/lib.rs @@ -82,7 +82,7 @@ use quickwit_indexing::models::ShardPositionsService; use quickwit_indexing::start_indexing_service; use quickwit_ingest::{ GetMemoryCapacity, IngestRequest, IngestRouter, IngestServiceClient, Ingester, IngesterPool, - LocalShardsUpdate, get_idle_shard_timeout, setup_local_shards_update_listener, + LocalShardsUpdate, get_idle_shard_timeout, setup_local_shards_update_listener, setup_ingester_capacity_update_listener, start_ingest_api_service, wait_for_ingester_decommission, wait_for_ingester_status, }; use 
quickwit_jaeger::JaegerService; @@ -906,6 +906,7 @@ async fn setup_ingest_v2( event_broker.clone(), ); ingest_router.subscribe(); + setup_ingester_capacity_update_listener(cluster.clone(), event_broker.clone()).await.forever(); let ingest_router_service = IngestRouterServiceClient::tower() .stack_layer(INGEST_GRPC_SERVER_METRICS_LAYER.clone()) From ef5f48f2802ace2c74b65e3861600e85227be325 Mon Sep 17 00:00:00 2001 From: Nadav Gov-Ari Date: Wed, 18 Feb 2026 10:35:28 -0500 Subject: [PATCH 4/9] lint --- .../src/ingest_v2/broadcast/ingester_capacity_score.rs | 3 +-- quickwit/quickwit-ingest/src/ingest_v2/broadcast/mod.rs | 5 ++++- quickwit/quickwit-serve/src/lib.rs | 9 ++++++--- 3 files changed, 11 insertions(+), 6 deletions(-) diff --git a/quickwit/quickwit-ingest/src/ingest_v2/broadcast/ingester_capacity_score.rs b/quickwit/quickwit-ingest/src/ingest_v2/broadcast/ingester_capacity_score.rs index 392e9e11551..c701c91dc55 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/broadcast/ingester_capacity_score.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/broadcast/ingester_capacity_score.rs @@ -56,8 +56,7 @@ impl WalMemoryCapacityTimeSeries { } fn record(&mut self, memory_used: ByteSize) { - let remaining = - 1.0 - (memory_used.as_u64() as f64 / self.memory_capacity.as_u64() as f64); + let remaining = 1.0 - (memory_used.as_u64() as f64 / self.memory_capacity.as_u64() as f64); self.readings.push_back(remaining.clamp(0.0, 1.0)); } diff --git a/quickwit/quickwit-ingest/src/ingest_v2/broadcast/mod.rs b/quickwit/quickwit-ingest/src/ingest_v2/broadcast/mod.rs index 8c6e8792c8a..18a00209de1 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/broadcast/mod.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/broadcast/mod.rs @@ -26,7 +26,10 @@ pub(in crate::ingest_v2) const BROADCAST_INTERVAL_PERIOD: Duration = if cfg!(tes Duration::from_secs(5) }; -pub use ingester_capacity_score::{IngesterCapacityScoreUpdate, setup_ingester_capacity_update_listener, BroadcastIngesterCapacityScoreTask}; +pub use ingester_capacity_score::{ + BroadcastIngesterCapacityScoreTask, IngesterCapacityScoreUpdate, + setup_ingester_capacity_update_listener, +}; pub use local_shards::{ BroadcastLocalShardsTask, LocalShardsUpdate, ShardInfo, ShardInfos, setup_local_shards_update_listener, diff --git a/quickwit/quickwit-serve/src/lib.rs b/quickwit/quickwit-serve/src/lib.rs index f56e5549c3a..60515bc819f 100644 --- a/quickwit/quickwit-serve/src/lib.rs +++ b/quickwit/quickwit-serve/src/lib.rs @@ -82,8 +82,9 @@ use quickwit_indexing::models::ShardPositionsService; use quickwit_indexing::start_indexing_service; use quickwit_ingest::{ GetMemoryCapacity, IngestRequest, IngestRouter, IngestServiceClient, Ingester, IngesterPool, - LocalShardsUpdate, get_idle_shard_timeout, setup_local_shards_update_listener, setup_ingester_capacity_update_listener, - start_ingest_api_service, wait_for_ingester_decommission, wait_for_ingester_status, + LocalShardsUpdate, get_idle_shard_timeout, setup_ingester_capacity_update_listener, + setup_local_shards_update_listener, start_ingest_api_service, wait_for_ingester_decommission, + wait_for_ingester_status, }; use quickwit_jaeger::JaegerService; use quickwit_janitor::{JanitorService, start_janitor_service}; @@ -906,7 +907,9 @@ async fn setup_ingest_v2( event_broker.clone(), ); ingest_router.subscribe(); - setup_ingester_capacity_update_listener(cluster.clone(), event_broker.clone()).await.forever(); + setup_ingester_capacity_update_listener(cluster.clone(), event_broker.clone()) + .await + .forever(); let 
ingest_router_service = IngestRouterServiceClient::tower() .stack_layer(INGEST_GRPC_SERVER_METRICS_LAYER.clone()) From 2a1e3f767c42062e66640236c8be35dea391d423 Mon Sep 17 00:00:00 2001 From: Nadav Gov-Ari Date: Wed, 18 Feb 2026 10:37:17 -0500 Subject: [PATCH 5/9] Remove subscription --- quickwit/quickwit-serve/src/lib.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/quickwit/quickwit-serve/src/lib.rs b/quickwit/quickwit-serve/src/lib.rs index 60515bc819f..9c75173b493 100644 --- a/quickwit/quickwit-serve/src/lib.rs +++ b/quickwit/quickwit-serve/src/lib.rs @@ -907,9 +907,6 @@ async fn setup_ingest_v2( event_broker.clone(), ); ingest_router.subscribe(); - setup_ingester_capacity_update_listener(cluster.clone(), event_broker.clone()) - .await - .forever(); let ingest_router_service = IngestRouterServiceClient::tower() .stack_layer(INGEST_GRPC_SERVER_METRICS_LAYER.clone()) From f14a894f8b5c88d82f2a281a0318fdcf05fd8d8d Mon Sep 17 00:00:00 2001 From: Nadav Gov-Ari Date: Wed, 18 Feb 2026 10:58:59 -0500 Subject: [PATCH 6/9] lint --- quickwit/quickwit-serve/src/lib.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/quickwit/quickwit-serve/src/lib.rs b/quickwit/quickwit-serve/src/lib.rs index 9c75173b493..ca4520ff0ce 100644 --- a/quickwit/quickwit-serve/src/lib.rs +++ b/quickwit/quickwit-serve/src/lib.rs @@ -82,9 +82,8 @@ use quickwit_indexing::models::ShardPositionsService; use quickwit_indexing::start_indexing_service; use quickwit_ingest::{ GetMemoryCapacity, IngestRequest, IngestRouter, IngestServiceClient, Ingester, IngesterPool, - LocalShardsUpdate, get_idle_shard_timeout, setup_ingester_capacity_update_listener, - setup_local_shards_update_listener, start_ingest_api_service, wait_for_ingester_decommission, - wait_for_ingester_status, + LocalShardsUpdate, get_idle_shard_timeout, setup_local_shards_update_listener, + start_ingest_api_service, wait_for_ingester_decommission, wait_for_ingester_status, }; use quickwit_jaeger::JaegerService; use quickwit_janitor::{JanitorService, start_janitor_service}; From f3e82275207a209ef16fba61c5895065a56fa0d6 Mon Sep 17 00:00:00 2001 From: Nadav Gov-Ari Date: Wed, 18 Feb 2026 11:39:51 -0500 Subject: [PATCH 7/9] make assert on non-tests only --- .../src/ingest_v2/broadcast/ingester_capacity_score.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/quickwit/quickwit-ingest/src/ingest_v2/broadcast/ingester_capacity_score.rs b/quickwit/quickwit-ingest/src/ingest_v2/broadcast/ingester_capacity_score.rs index c701c91dc55..1927eb788f7 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/broadcast/ingester_capacity_score.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/broadcast/ingester_capacity_score.rs @@ -48,6 +48,7 @@ struct WalMemoryCapacityTimeSeries { impl WalMemoryCapacityTimeSeries { fn new(memory_capacity: ByteSize) -> Self { + #[cfg(not(test))] assert!(memory_capacity.as_u64() > 0); Self { memory_capacity, From 3da8aa69af0f01d0b1336d918a9561e7f2e8961a Mon Sep 17 00:00:00 2001 From: Nadav Gov-Ari Date: Thu, 19 Feb 2026 10:56:55 -0500 Subject: [PATCH 8/9] Address PR comments --- quickwit/Cargo.lock | 1 - quickwit/quickwit-ingest/Cargo.toml | 1 - .../src/ingest_v2/node_routing_table.rs | 25 ++++++++----------- 3 files changed, 11 insertions(+), 16 deletions(-) diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index 79c77536e16..4a2e43ee09a 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -7194,7 +7194,6 @@ dependencies = [ "fail", "futures", "http 1.4.0", - "indexmap 2.13.0", "itertools 0.14.0", "mockall", 
"mrecordlog", diff --git a/quickwit/quickwit-ingest/Cargo.toml b/quickwit/quickwit-ingest/Cargo.toml index f47561fe2c9..3149f2aaaf3 100644 --- a/quickwit/quickwit-ingest/Cargo.toml +++ b/quickwit/quickwit-ingest/Cargo.toml @@ -18,7 +18,6 @@ bytesize = { workspace = true } fail = { workspace = true, optional = true } futures = { workspace = true } http = { workspace = true } -indexmap = { workspace = true } itertools = { workspace = true } mockall = { workspace = true, optional = true } mrecordlog = { workspace = true } diff --git a/quickwit/quickwit-ingest/src/ingest_v2/node_routing_table.rs b/quickwit/quickwit-ingest/src/ingest_v2/node_routing_table.rs index 95710669cef..2e49e26b783 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/node_routing_table.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/node_routing_table.rs @@ -14,9 +14,10 @@ use std::collections::{HashMap, HashSet}; -use indexmap::IndexMap; use quickwit_proto::ingest::Shard; use quickwit_proto::types::{IndexId, IndexUid, NodeId, SourceId, SourceUid}; +use rand::rng; +use rand::seq::IndexedRandom; use crate::IngesterPool; @@ -37,19 +38,15 @@ pub(super) struct IngesterNode { #[derive(Debug)] pub(super) struct RoutingEntry { - pub nodes: IndexMap, + nodes: HashMap, } /// Given a slice of candidates, picks the better of two random choices. /// Higher capacity_score wins; tiebreak on more open_shard_count (more landing spots). fn power_of_two_choices<'a>(candidates: &[&'a IngesterNode]) -> &'a IngesterNode { - let len = candidates.len(); - debug_assert!(len >= 2); - - let idx_1 = rand::random_range(0..len); - // Random offset in [1, len) wraps around, guaranteeing idx_2 != idx_1. - let idx_2 = (idx_1 + rand::random_range(1..len)) % len; - let (a, b) = (candidates[idx_1], candidates[idx_2]); + debug_assert!(candidates.len() >= 2); + let mut iter = candidates.choose_multiple(&mut rng(), 2); + let (&a, &b) = (iter.next().unwrap(), iter.next().unwrap()); if (a.capacity_score, a.open_shard_count) >= (b.capacity_score, b.open_shard_count) { a @@ -148,7 +145,7 @@ impl NodeBasedRoutingTable { ); let entry = self.table.entry(key).or_insert_with(|| RoutingEntry { - nodes: IndexMap::new(), + nodes: HashMap::new(), }); let ingester_node = IngesterNode { @@ -184,7 +181,7 @@ impl NodeBasedRoutingTable { } let entry = self.table.entry(key).or_insert_with(|| RoutingEntry { - nodes: IndexMap::new(), + nodes: HashMap::new(), }); for (node_id, open_shard_count) in per_leader_count { @@ -413,8 +410,8 @@ mod tests { let entry = table.table.get(&key).unwrap(); assert_eq!(entry.nodes.len(), 3); - assert!(entry.nodes.get("node-1").is_some()); - assert!(entry.nodes.get("node-2").is_some()); - assert!(entry.nodes.get("node-3").is_some()); + assert!(entry.nodes.contains_key("node-1")); + assert!(entry.nodes.contains_key("node-2")); + assert!(entry.nodes.contains_key("node-3")); } } From 099081f9a928e2c3468da4f59d824ed23ecc0fa7 Mon Sep 17 00:00:00 2001 From: Nadav Gov-Ari Date: Thu, 19 Feb 2026 11:49:03 -0500 Subject: [PATCH 9/9] Invert lock order on init to match lock_fully --- quickwit/quickwit-ingest/src/ingest_v2/state.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/quickwit/quickwit-ingest/src/ingest_v2/state.rs b/quickwit/quickwit-ingest/src/ingest_v2/state.rs index bf1c648c6cb..a14f4ae9a44 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/state.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/state.rs @@ -154,8 +154,10 @@ impl IngesterState { /// queues. Empty queues are deleted, while non-empty queues are recovered. 
However, the /// corresponding shards are closed and become read-only. pub async fn init(&self, wal_dir_path: &Path, rate_limiter_settings: RateLimiterSettings) { - let mut inner_guard = self.inner.lock().await; + // Acquire locks in the same order as `lock_fully` (mrecordlog first, then inner) to + // prevent ABBA deadlocks with the broadcast capacity task. let mut mrecordlog_guard = self.mrecordlog.write().await; + let mut inner_guard = self.inner.lock().await; let now = Instant::now();