diff --git a/quickwit/quickwit-ingest/src/ingest_v2/debouncing.rs b/quickwit/quickwit-ingest/src/ingest_v2/debouncing.rs index 041f2928c45..19d6f5d691d 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/debouncing.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/debouncing.rs @@ -18,7 +18,6 @@ use std::sync::Arc; use quickwit_proto::control_plane::{ GetOrCreateOpenShardsRequest, GetOrCreateOpenShardsSubrequest, }; -use quickwit_proto::ingest::ShardIds; use quickwit_proto::types::{IndexId, SourceId}; use tokio::sync::{OwnedRwLockWriteGuard, RwLock}; @@ -69,7 +68,6 @@ impl GetOrCreateOpenShardsRequestDebouncer { #[derive(Default)] pub(super) struct DebouncedGetOrCreateOpenShardsRequest { subrequests: Vec, - pub closed_shards: Vec, pub unavailable_leaders: Vec, rendezvous: Rendezvous, } @@ -85,8 +83,8 @@ impl DebouncedGetOrCreateOpenShardsRequest { } let request = GetOrCreateOpenShardsRequest { subrequests: self.subrequests, - closed_shards: self.closed_shards, unavailable_leaders: self.unavailable_leaders, + ..Default::default() }; (Some(request), self.rendezvous) } diff --git a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs index fc1a44f19bb..a98ae0636d5 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs @@ -37,16 +37,7 @@ use quickwit_proto::control_plane::{ AdviseResetShardsRequest, ControlPlaneService, ControlPlaneServiceClient, }; use quickwit_proto::indexing::ShardPositionsUpdate; -use quickwit_proto::ingest::ingester::{ - AckReplicationMessage, CloseShardsRequest, CloseShardsResponse, DecommissionRequest, - DecommissionResponse, FetchMessage, IngesterService, IngesterServiceClient, - IngesterServiceStream, IngesterStatus, InitShardFailure, InitShardSuccess, InitShardsRequest, - InitShardsResponse, ObservationMessage, OpenFetchStreamRequest, OpenObservationStreamRequest, - OpenReplicationStreamRequest, OpenReplicationStreamResponse, PersistFailure, - PersistFailureReason, PersistRequest, PersistResponse, PersistSuccess, ReplicateFailureReason, - ReplicateSubrequest, RetainShardsForSource, RetainShardsRequest, RetainShardsResponse, - SynReplicationMessage, TruncateShardsRequest, TruncateShardsResponse, -}; +use quickwit_proto::ingest::ingester::*; use quickwit_proto::ingest::{ CommitTypeV2, DocBatchV2, IngestV2Error, IngestV2Result, ParseFailure, Shard, ShardIds, }; @@ -469,7 +460,7 @@ impl Ingester { index_uid: subrequest.index_uid, source_id: subrequest.source_id, shard_id: subrequest.shard_id, - reason: PersistFailureReason::ShardClosed as i32, + reason: PersistFailureReason::NodeUnavailable as i32, }; persist_failures.push(persist_failure); } @@ -499,7 +490,7 @@ impl Ingester { index_uid: subrequest.index_uid, source_id: subrequest.source_id, shard_id: subrequest.shard_id, - reason: PersistFailureReason::ShardNotFound as i32, + reason: PersistFailureReason::NoShardsAvailable as i32, }; persist_failures.push(persist_failure); continue; @@ -558,7 +549,7 @@ impl Ingester { index_uid: subrequest.index_uid, source_id: subrequest.source_id, shard_id: Some(shard_id), - reason: PersistFailureReason::ShardRateLimited as i32, + reason: PersistFailureReason::NoShardsAvailable as i32, }; persist_failures.push(persist_failure); continue; @@ -673,7 +664,7 @@ impl Ingester { // TODO: Handle replication error: // 1. Close and evict all the shards hosted by the follower. // 2. Close and evict the replication client. - // 3. 
Return `PersistFailureReason::ShardClosed` to router. + // 3. Return `PersistFailureReason::NodeUnavailable` to router. continue; } }; @@ -689,14 +680,8 @@ impl Ingester { for replicate_failure in replicate_response.failures { // TODO: If the replica shard is closed, close the primary shard if it is not // already. - let persist_failure_reason = match replicate_failure.reason() { - ReplicateFailureReason::Unspecified => PersistFailureReason::Unspecified, - ReplicateFailureReason::ShardNotFound => { - PersistFailureReason::ShardNotFound - } - ReplicateFailureReason::ShardClosed => PersistFailureReason::ShardClosed, - ReplicateFailureReason::WalFull => PersistFailureReason::WalFull, - }; + let persist_failure_reason: PersistFailureReason = + replicate_failure.reason().into(); let persist_failure = PersistFailure { subrequest_id: replicate_failure.subrequest_id, index_uid: replicate_failure.index_uid, @@ -736,7 +721,7 @@ impl Ingester { "failed to persist records to shard `{queue_id}`: {io_error}" ); shards_to_close.insert(queue_id); - PersistFailureReason::ShardClosed + PersistFailureReason::NodeUnavailable } AppendDocBatchError::QueueNotFound(_) => { error!( @@ -744,7 +729,7 @@ impl Ingester { not found" ); shards_to_delete.insert(queue_id); - PersistFailureReason::ShardNotFound + PersistFailureReason::NodeUnavailable } }; let persist_failure = PersistFailure { @@ -2159,7 +2144,7 @@ mod tests { let persist_failure = &persist_response.failures[0]; assert_eq!( persist_failure.reason(), - PersistFailureReason::ShardRateLimited + PersistFailureReason::NoShardsAvailable ); } @@ -2222,7 +2207,10 @@ mod tests { assert_eq!(persist_failure.index_uid(), &index_uid); assert_eq!(persist_failure.source_id, "test-source"); assert_eq!(persist_failure.shard_id(), ShardId::from(1)); - assert_eq!(persist_failure.reason(), PersistFailureReason::ShardClosed,); + assert_eq!( + persist_failure.reason(), + PersistFailureReason::NodeUnavailable, + ); let state_guard = ingester.state.lock_fully().await.unwrap(); let shard = state_guard.shards.get(&queue_id).unwrap(); @@ -2274,7 +2262,7 @@ mod tests { assert_eq!(persist_failure.shard_id(), ShardId::from(1)); assert_eq!( persist_failure.reason(), - PersistFailureReason::ShardNotFound + PersistFailureReason::NodeUnavailable ); let state_guard = ingester.state.lock_fully().await.unwrap(); @@ -2704,7 +2692,7 @@ mod tests { assert_eq!(persist_failure.shard_id(), ShardId::from(1)); assert_eq!( persist_failure.reason(), - PersistFailureReason::ShardNotFound + PersistFailureReason::NoShardsAvailable ); let state_guard = ingester.state.lock_fully().await.unwrap(); @@ -2783,7 +2771,7 @@ mod tests { assert_eq!(persist_failure.shard_id(), ShardId::from(1)); assert_eq!( persist_failure.reason(), - PersistFailureReason::ShardRateLimited + PersistFailureReason::NoShardsAvailable ); let state_guard = ingester.state.lock_fully().await.unwrap(); diff --git a/quickwit/quickwit-ingest/src/ingest_v2/mod.rs b/quickwit/quickwit-ingest/src/ingest_v2/mod.rs index 3a801763feb..0bb3d6b6138 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/mod.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/mod.rs @@ -28,6 +28,7 @@ mod publish_tracker; mod rate_meter; mod replication; mod router; +#[allow(dead_code)] mod routing_table; mod state; mod workbench; diff --git a/quickwit/quickwit-ingest/src/ingest_v2/node_routing_table.rs b/quickwit/quickwit-ingest/src/ingest_v2/node_routing_table.rs index 2e49e26b783..f354011ede2 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/node_routing_table.rs +++ 
b/quickwit/quickwit-ingest/src/ingest_v2/node_routing_table.rs @@ -38,7 +38,7 @@ pub(super) struct IngesterNode { #[derive(Debug)] pub(super) struct RoutingEntry { - nodes: HashMap, + pub nodes: HashMap, } /// Given a slice of candidates, picks the better of two random choices. diff --git a/quickwit/quickwit-ingest/src/ingest_v2/router.rs b/quickwit/quickwit-ingest/src/ingest_v2/router.rs index ccd00f0209c..da3d989d93e 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/router.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/router.rs @@ -27,33 +27,29 @@ use quickwit_proto::control_plane::{ ControlPlaneService, ControlPlaneServiceClient, GetOrCreateOpenShardsRequest, GetOrCreateOpenShardsSubrequest, }; -use quickwit_proto::indexing::ShardPositionsUpdate; use quickwit_proto::ingest::ingester::{ IngesterService, PersistFailureReason, PersistRequest, PersistResponse, PersistSubrequest, }; use quickwit_proto::ingest::router::{ IngestFailureReason, IngestRequestV2, IngestResponseV2, IngestRouterService, }; -use quickwit_proto::ingest::{ - CommitTypeV2, IngestV2Error, IngestV2Result, RateLimitingCause, ShardState, -}; -use quickwit_proto::types::{IndexUid, NodeId, ShardId, SourceId, SubrequestId}; +use quickwit_proto::ingest::{CommitTypeV2, IngestV2Error, IngestV2Result, RateLimitingCause}; +use quickwit_proto::types::{NodeId, SubrequestId}; use serde_json::{Value as JsonValue, json}; use tokio::sync::{Mutex, Semaphore}; use tokio::time::error::Elapsed; use tracing::{error, info}; -use super::broadcast::{IngesterCapacityScoreUpdate, LocalShardsUpdate}; +use super::broadcast::IngesterCapacityScoreUpdate; use super::debouncing::{ DebouncedGetOrCreateOpenShardsRequest, GetOrCreateOpenShardsRequestDebouncer, }; use super::ingester::PERSIST_REQUEST_TIMEOUT; use super::metrics::IngestResultMetrics; use super::node_routing_table::NodeBasedRoutingTable; -use super::routing_table::{NextOpenShardError, RoutingTable}; use super::workbench::IngestWorkbench; use super::{IngesterPool, pending_subrequests}; -use crate::{LeaderId, get_ingest_router_buffer_size}; +use crate::get_ingest_router_buffer_size; /// Duration after which ingest requests time out with [`IngestV2Error::Timeout`]. fn ingest_request_timeout() -> Duration { @@ -102,12 +98,7 @@ pub struct IngestRouter { } struct RouterState { - // Debounces `GetOrCreateOpenShardsRequest` requests to the control plane. debouncer: GetOrCreateOpenShardsRequestDebouncer, - // Holds the routing table mapping index and source IDs to shards. - routing_table: RoutingTable, - // Node-based routing table, populated by capacity broadcasts. - // Not yet used for routing — will replace `routing_table` in a follow-up PR. 
node_routing_table: NodeBasedRoutingTable, } @@ -130,10 +121,6 @@ impl IngestRouter { ) -> Self { let state = Arc::new(Mutex::new(RouterState { debouncer: GetOrCreateOpenShardsRequestDebouncer::default(), - routing_table: RoutingTable { - self_node_id: self_node_id.clone(), - table: HashMap::default(), - }, node_routing_table: NodeBasedRoutingTable::default(), })); let ingest_semaphore_permits = get_ingest_router_buffer_size().as_u64() as usize; @@ -152,12 +139,6 @@ impl IngestRouter { pub fn subscribe(&self) { let weak_router_state = WeakRouterState(Arc::downgrade(&self.state)); - self.event_broker - .subscribe::(weak_router_state.clone()) - .forever(); - self.event_broker - .subscribe::(weak_router_state.clone()) - .forever(); self.event_broker .subscribe::(weak_router_state) .forever(); @@ -171,22 +152,19 @@ impl IngestRouter { ingester_pool: &IngesterPool, ) -> DebouncedGetOrCreateOpenShardsRequest { let mut debounced_request = DebouncedGetOrCreateOpenShardsRequest::default(); - - // `closed_shards` and `unavailable_leaders` are populated by calls to `has_open_shards` - // as we're looking for open shards to route the subrequests to. - let unavailable_leaders: &mut HashSet = &mut workbench.unavailable_leaders; + let unavailable_leaders = &workbench.unavailable_leaders; let mut state_guard = self.state.lock().await; for subrequest in pending_subrequests(&workbench.subworkbenches) { - if !state_guard.routing_table.has_open_shards( + if !state_guard.node_routing_table.has_open_nodes( &subrequest.index_id, &subrequest.source_id, ingester_pool, - &mut debounced_request.closed_shards, unavailable_leaders, ) { - // No shard available! Let's attempt to create one. + // No known nodes with open shards for this source. Ask the control + // plane to create shards so we have somewhere to route to. 
let acquire_result = state_guard .debouncer .acquire(&subrequest.index_id, &subrequest.source_id); @@ -208,9 +186,6 @@ impl IngestRouter { } drop(state_guard); - if !debounced_request.is_empty() && !debounced_request.closed_shards.is_empty() { - info!(closed_shards=?debounced_request.closed_shards, "reporting closed shard(s) to control plane"); - } if !debounced_request.is_empty() && !unavailable_leaders.is_empty() { info!(unavailable_leaders=?unavailable_leaders, "reporting unavailable leader(s) to control plane"); @@ -267,7 +242,7 @@ impl IngestRouter { let mut state_guard = self.state.lock().await; for success in response.successes { - state_guard.routing_table.replace_shards( + state_guard.node_routing_table.merge_from_shards( success.index_uid().clone(), success.source_id, success.open_shards, @@ -285,8 +260,7 @@ impl IngestRouter { workbench: &mut IngestWorkbench, mut persist_futures: FuturesUnordered>, ) { - let mut closed_shards: HashMap<(IndexUid, SourceId), Vec> = HashMap::new(); - let mut deleted_shards: HashMap<(IndexUid, SourceId), Vec> = HashMap::new(); + let mut unavailable_leaders: HashSet = HashSet::new(); while let Some((persist_summary, persist_result)) = persist_futures.next().await { match persist_result { @@ -298,33 +272,12 @@ impl IngestRouter { workbench.record_persist_failure(&persist_failure); match persist_failure.reason() { - PersistFailureReason::ShardClosed => { - let shard_id = persist_failure.shard_id().clone(); - let index_uid: IndexUid = persist_failure.index_uid().clone(); - let source_id: SourceId = persist_failure.source_id; - closed_shards - .entry((index_uid, source_id)) - .or_default() - .push(shard_id); - } - PersistFailureReason::ShardNotFound => { - let shard_id = persist_failure.shard_id().clone(); - let index_uid: IndexUid = persist_failure.index_uid().clone(); - let source_id: SourceId = persist_failure.source_id; - deleted_shards - .entry((index_uid, source_id)) - .or_default() - .push(shard_id); - } - PersistFailureReason::WalFull - | PersistFailureReason::ShardRateLimited => { - // Let's record that the shard is rate limited or that the ingester - // that hosts has its wal full. - // - // That way we will avoid to retry the persist request on the very - // same node. - let shard_id = persist_failure.shard_id().clone(); - workbench.rate_limited_shards.insert(shard_id); + PersistFailureReason::NoShardsAvailable => {} + PersistFailureReason::NodeUnavailable + | PersistFailureReason::WalFull + | PersistFailureReason::Timeout => { + unavailable_leaders + .insert(NodeId::from(persist_response.leader_id.clone())); } _ => {} } @@ -348,20 +301,7 @@ impl IngestRouter { } }; } - if !closed_shards.is_empty() || !deleted_shards.is_empty() { - let mut state_guard = self.state.lock().await; - - for ((index_uid, source_id), shard_ids) in closed_shards { - state_guard - .routing_table - .close_shards(&index_uid, source_id, &shard_ids); - } - for ((index_uid, source_id), shard_ids) in deleted_shards { - state_guard - .routing_table - .delete_shards(&index_uid, source_id, &shard_ids); - } - } + workbench.unavailable_leaders.extend(unavailable_leaders); } async fn batch_persist(&self, workbench: &mut IngestWorkbench, commit_type: CommitTypeV2) { @@ -373,47 +313,35 @@ impl IngestRouter { self.populate_routing_table_debounced(workbench, debounced_request) .await; - // Subrequests for which no shards are available to route the subrequests to. 
+ let unavailable_leaders = &workbench.unavailable_leaders; let mut no_shards_available_subrequest_ids: Vec = Vec::new(); - // Subrequests for which the shards are rate limited. - let mut rate_limited_subrequest_ids: Vec = Vec::new(); - - let mut per_leader_persist_subrequests: HashMap<&LeaderId, Vec> = + let mut per_leader_persist_subrequests: HashMap<&NodeId, Vec> = HashMap::new(); - let rate_limited_shards: &HashSet = &workbench.rate_limited_shards; let state_guard = self.state.lock().await; for subrequest in pending_subrequests(&workbench.subworkbenches) { - let next_open_shard_res_opt = state_guard - .routing_table + let ingester_node = state_guard + .node_routing_table .find_entry(&subrequest.index_id, &subrequest.source_id) - .map(|entry| { - entry.next_open_shard_round_robin(&self.ingester_pool, rate_limited_shards) - }); - let next_open_shard = match next_open_shard_res_opt { - Some(Ok(next_open_shard)) => next_open_shard, - Some(Err(NextOpenShardError::RateLimited)) => { - rate_limited_subrequest_ids.push(subrequest.subrequest_id); - continue; - } - Some(Err(NextOpenShardError::NoShardsAvailable)) | None => { + .and_then(|entry| entry.pick_node(&self.ingester_pool, unavailable_leaders)); + + let ingester_node = match ingester_node { + Some(node) => node, + None => { no_shards_available_subrequest_ids.push(subrequest.subrequest_id); continue; } }; let persist_subrequest = PersistSubrequest { subrequest_id: subrequest.subrequest_id, - index_uid: next_open_shard.index_uid.clone().into(), - source_id: next_open_shard.source_id.clone(), - // We don't necessarily persist to this shard. We persist to the shard with the most - // capacity on that node. - // TODO: Clean this up. - shard_id: Some(next_open_shard.shard_id.clone()), + index_uid: Some(ingester_node.index_uid.clone()), + source_id: subrequest.source_id.clone(), + shard_id: None, doc_batch: subrequest.doc_batch.clone(), }; per_leader_persist_subrequests - .entry(&next_open_shard.leader_id) + .entry(&ingester_node.node_id) .or_default() .push(persist_subrequest); } @@ -461,9 +389,6 @@ impl IngestRouter { for subrequest_id in no_shards_available_subrequest_ids { workbench.record_no_shards_available(subrequest_id); } - for subrequest_id in rate_limited_subrequest_ids { - workbench.record_rate_limited(subrequest_id); - } self.process_persist_results(workbench, persist_futures) .await; } @@ -516,7 +441,7 @@ impl IngestRouter { pub async fn debug_info(&self) -> JsonValue { let state_guard = self.state.lock().await; - let routing_table_json = state_guard.routing_table.debug_info(); + let routing_table_json = state_guard.node_routing_table.debug_info(); json!({ "routing_table": routing_table_json, @@ -640,68 +565,6 @@ impl IngestRouterService for IngestRouter { #[derive(Clone)] struct WeakRouterState(Weak>); -#[async_trait] -impl EventSubscriber for WeakRouterState { - async fn handle_event(&mut self, local_shards_update: LocalShardsUpdate) { - let Some(state) = self.0.upgrade() else { - return; - }; - let leader_id = local_shards_update.leader_id; - let index_uid = local_shards_update.source_uid.index_uid; - let source_id = local_shards_update.source_uid.source_id; - - let mut open_shard_ids: Vec = Vec::new(); - let mut closed_shard_ids: Vec = Vec::new(); - - for shard_info in local_shards_update.shard_infos { - match shard_info.shard_state { - ShardState::Open => open_shard_ids.push(shard_info.shard_id), - ShardState::Closed => closed_shard_ids.push(shard_info.shard_id), - ShardState::Unavailable | ShardState::Unspecified => { - // 
Ingesters never broadcast the `Unavailable`` state because, from their point - // of view, they are never unavailable. - } - } - } - let mut state_guard = state.lock().await; - - state_guard - .routing_table - .close_shards(&index_uid, &source_id, &closed_shard_ids); - - state_guard.routing_table.insert_open_shards( - &leader_id, - index_uid, - source_id, - &open_shard_ids, - ); - } -} - -#[async_trait] -impl EventSubscriber for WeakRouterState { - async fn handle_event(&mut self, shard_positions_update: ShardPositionsUpdate) { - let Some(state) = self.0.upgrade() else { - return; - }; - let mut deleted_shard_ids: Vec = Vec::new(); - - for (shard_id, shard_position) in shard_positions_update.updated_shard_positions { - if shard_position.is_eof() { - deleted_shard_ids.push(shard_id); - } - } - let mut state_guard = state.lock().await; - - let index_uid = shard_positions_update.source_uid.index_uid; - let source_id = shard_positions_update.source_uid.source_id; - - state_guard - .routing_table - .delete_shards(&index_uid, &source_id, &deleted_shard_ids); - } -} - #[async_trait] impl EventSubscriber for WeakRouterState { async fn handle_event(&mut self, update: IngesterCapacityScoreUpdate) { @@ -725,9 +588,6 @@ pub(super) struct PersistRequestSummary { #[cfg(test)] mod tests { - use std::collections::BTreeSet; - - use mockall::Sequence; use quickwit_proto::control_plane::{ GetOrCreateOpenShardsFailure, GetOrCreateOpenShardsFailureReason, GetOrCreateOpenShardsResponse, GetOrCreateOpenShardsSuccess, MockControlPlaneService, @@ -737,15 +597,11 @@ mod tests { }; use quickwit_proto::ingest::router::IngestSubrequest; use quickwit_proto::ingest::{ - CommitTypeV2, DocBatchV2, ParseFailure, ParseFailureReason, Shard, ShardIds, ShardState, + CommitTypeV2, DocBatchV2, ParseFailure, ParseFailureReason, Shard, ShardState, }; - use quickwit_proto::types::{DocUid, Position, SourceUid}; - use tokio::task::yield_now; + use quickwit_proto::types::{DocUid, IndexUid, Position, ShardId, SourceUid}; use super::*; - use crate::RateMibPerSec; - use crate::ingest_v2::broadcast::ShardInfo; - use crate::ingest_v2::routing_table::{RoutingEntry, RoutingTableEntry}; use crate::ingest_v2::workbench::SubworkbenchFailure; #[tokio::test] @@ -770,34 +626,18 @@ mod tests { assert!(get_or_create_open_shard_request_opt.is_none()); assert!(rendezvous.is_empty()); - let mut state_guard = router.state.lock().await; - - let index_uid: IndexUid = IndexUid::for_test("test-index-0", 0); - state_guard.routing_table.table.insert( - ("test-index-0".into(), "test-source".into()), - RoutingTableEntry { - index_uid: index_uid.clone(), - source_id: "test-source".to_string(), - local_shards: vec![ - RoutingEntry { - index_uid: index_uid.clone(), - source_id: "test-source".to_string(), - shard_id: ShardId::from(1), - shard_state: ShardState::Closed, - leader_id: "test-ingester-0".into(), - }, - RoutingEntry { - index_uid: index_uid.clone(), - source_id: "test-source".to_string(), - shard_id: ShardId::from(2), - shard_state: ShardState::Open, - leader_id: "test-ingester-0".into(), - }, - ], - ..Default::default() - }, - ); - drop(state_guard); + { + let mut state_guard = router.state.lock().await; + state_guard.node_routing_table.apply_capacity_update( + "test-ingester-0".into(), + SourceUid { + index_uid: IndexUid::for_test("test-index-0", 0), + source_id: "test-source".to_string(), + }, + 8, + 1, + ); + } let ingest_subrequests: Vec = vec![ IngestSubrequest { @@ -833,24 +673,12 @@ mod tests { assert_eq!(subrequest.index_id, "test-index-1"); 
assert_eq!(subrequest.source_id, "test-source"); - assert_eq!(get_or_create_open_shard_request.closed_shards.len(), 1); - assert_eq!( - get_or_create_open_shard_request.closed_shards[0], - ShardIds { - index_uid: Some(IndexUid::for_test("test-index-0", 0)), - source_id: "test-source".to_string(), - shard_ids: vec![ShardId::from(1)], - } - ); - assert_eq!( - get_or_create_open_shard_request.unavailable_leaders.len(), - 1 - ); - assert_eq!( - get_or_create_open_shard_request.unavailable_leaders[0], - "test-ingester-0" + assert!( + get_or_create_open_shard_request + .unavailable_leaders + .is_empty() ); - assert_eq!(workbench.unavailable_leaders.len(), 1); + assert!(workbench.unavailable_leaders.is_empty()); let (get_or_create_open_shard_request_opt, rendezvous_2) = router .make_get_or_create_open_shard_request(&mut workbench, &ingester_pool) @@ -867,27 +695,26 @@ mod tests { ingester_pool.insert("test-ingester-0".into(), IngesterServiceClient::mocked()); { - // Ingester-0 has been marked as unavailable due to the previous requests. + // Ingester-0 is in pool and in table, but marked unavailable on the workbench + // (simulating a prior transport error). has_open_nodes returns false → both + // subrequests trigger CP request. + workbench + .unavailable_leaders + .insert("test-ingester-0".into()); let (get_or_create_open_shard_request_opt, _rendezvous) = router .make_get_or_create_open_shard_request(&mut workbench, &ingester_pool) .await .take(); let get_or_create_open_shard_request = get_or_create_open_shard_request_opt.unwrap(); assert_eq!(get_or_create_open_shard_request.subrequests.len(), 2); - assert_eq!(workbench.unavailable_leaders.len(), 1); assert_eq!( - workbench - .unavailable_leaders - .iter() - .next() - .unwrap() - .to_string(), - "test-ingester-0" + get_or_create_open_shard_request.unavailable_leaders.len(), + 1 ); } { - // With a fresh workbench, the ingester is not marked as unavailable, and present in the - // pool. + // Fresh workbench: ingester-0 is in pool, in table, and NOT unavailable. + // has_open_nodes returns true for index-0 → only index-1 triggers request. 
let mut workbench = IngestWorkbench::new(ingest_subrequests, 3); let (get_or_create_open_shard_request_opt, _rendezvous) = router .make_get_or_create_open_shard_request(&mut workbench, &ingester_pool) @@ -900,9 +727,10 @@ mod tests { assert_eq!(subrequest.index_id, "test-index-1"); assert_eq!(subrequest.source_id, "test-source"); - assert_eq!( - get_or_create_open_shard_request.unavailable_leaders.len(), - 0 + assert!( + get_or_create_open_shard_request + .unavailable_leaders + .is_empty() ); } } @@ -947,6 +775,7 @@ mod tests { source_id: "test-source".to_string(), shard_id: Some(ShardId::from(1)), shard_state: ShardState::Open as i32, + leader_id: "test-ingester-0".to_string(), ..Default::default() }], }, @@ -960,6 +789,7 @@ mod tests { source_id: "test-source".to_string(), shard_id: Some(ShardId::from(1)), shard_state: ShardState::Open as i32, + leader_id: "test-ingester-1".to_string(), ..Default::default() }, Shard { @@ -967,6 +797,7 @@ mod tests { source_id: "test-source".to_string(), shard_id: Some(ShardId::from(2)), shard_state: ShardState::Open as i32, + leader_id: "test-ingester-1".to_string(), ..Default::default() }, ], @@ -1057,23 +888,6 @@ mod tests { .populate_routing_table(&mut workbench, get_or_create_open_shards_request) .await; - let state_guard = router.state.lock().await; - let routing_table = &state_guard.routing_table; - assert_eq!(routing_table.len(), 2); - - let routing_entry_0 = routing_table - .find_entry("test-index-0", "test-source") - .unwrap(); - assert_eq!(routing_entry_0.len(), 1); - assert_eq!(routing_entry_0.all_shards()[0].shard_id, ShardId::from(1)); - - let routing_entry_1 = routing_table - .find_entry("test-index-1", "test-source") - .unwrap(); - assert_eq!(routing_entry_1.len(), 2); - assert_eq!(routing_entry_1.all_shards()[0].shard_id, ShardId::from(1)); - assert_eq!(routing_entry_1.all_shards()[1].shard_id, ShardId::from(2)); - let subworkbench = workbench.subworkbenches.get(&2).unwrap(); assert!(matches!( subworkbench.last_failure_opt, @@ -1278,7 +1092,7 @@ mod tests { index_uid: Some(index_uid.clone()), source_id: "test-source".to_string(), shard_id: Some(ShardId::from(1)), - reason: PersistFailureReason::ShardRateLimited as i32, + reason: PersistFailureReason::NoShardsAvailable as i32, }], }); (persist_summary, persist_result) @@ -1294,89 +1108,6 @@ mod tests { )); } - #[tokio::test] - async fn test_router_process_persist_results_closes_and_deletes_shards() { - let self_node_id = "test-router".into(); - let control_plane = ControlPlaneServiceClient::from_mock(MockControlPlaneService::new()); - let ingester_pool = IngesterPool::default(); - let replication_factor = 1; - let router = IngestRouter::new( - self_node_id, - control_plane, - ingester_pool.clone(), - replication_factor, - EventBroker::default(), - ); - let index_uid: IndexUid = IndexUid::for_test("test-index-0", 0); - let mut state_guard = router.state.lock().await; - state_guard.routing_table.replace_shards( - index_uid.clone(), - "test-source", - vec![ - Shard { - index_uid: Some(index_uid.clone()), - shard_id: Some(ShardId::from(1)), - shard_state: ShardState::Open as i32, - leader_id: "test-ingester-0".to_string(), - ..Default::default() - }, - Shard { - index_uid: Some(index_uid.clone()), - shard_id: Some(ShardId::from(2)), - shard_state: ShardState::Open as i32, - leader_id: "test-ingester-0".to_string(), - ..Default::default() - }, - ], - ); - drop(state_guard); - - let mut workbench = IngestWorkbench::new(Vec::new(), 2); - let persist_futures = FuturesUnordered::new(); - - 
persist_futures.push(async { - let persist_summary = PersistRequestSummary { - leader_id: "test-ingester-0".into(), - subrequest_ids: vec![0], - }; - let persist_result = Ok::<_, IngestV2Error>(PersistResponse { - leader_id: "test-ingester-0".to_string(), - successes: Vec::new(), - failures: vec![ - PersistFailure { - subrequest_id: 0, - index_uid: Some(index_uid.clone()), - source_id: "test-source".to_string(), - shard_id: Some(ShardId::from(1)), - reason: PersistFailureReason::ShardNotFound as i32, - }, - PersistFailure { - subrequest_id: 1, - index_uid: Some(index_uid.clone()), - source_id: "test-source".to_string(), - shard_id: Some(ShardId::from(2)), - reason: PersistFailureReason::ShardClosed as i32, - }, - ], - }); - (persist_summary, persist_result) - }); - router - .process_persist_results(&mut workbench, persist_futures) - .await; - - let state_guard = router.state.lock().await; - let routing_table_entry = state_guard - .routing_table - .find_entry("test-index-0", "test-source") - .unwrap(); - assert_eq!(routing_table_entry.len(), 1); - - let shard = routing_table_entry.all_shards()[0]; - assert_eq!(shard.shard_id, ShardId::from(2)); - assert_eq!(shard.shard_state, ShardState::Closed); - } - #[tokio::test] async fn test_router_process_persist_results_does_not_remove_unavailable_leaders() { let self_node_id = "test-router".into(); @@ -1470,151 +1201,76 @@ mod tests { let self_node_id = "test-router".into(); let control_plane = ControlPlaneServiceClient::from_mock(MockControlPlaneService::new()); let ingester_pool = IngesterPool::default(); - let replication_factor = 1; let router = IngestRouter::new( self_node_id, control_plane, ingester_pool.clone(), - replication_factor, + 1, EventBroker::default(), ); - let index_uid: IndexUid = IndexUid::for_test("test-index-0", 0); - let index_uid2: IndexUid = IndexUid::for_test("test-index-1", 0); - let mut state_guard = router.state.lock().await; - state_guard.routing_table.replace_shards( - index_uid.clone(), - "test-source", - vec![Shard { - index_uid: Some(index_uid.clone()), - source_id: "test-source".to_string(), - shard_id: Some(ShardId::from(1)), - shard_state: ShardState::Open as i32, - leader_id: "test-ingester-0".to_string(), - ..Default::default() - }], - ); - state_guard.routing_table.replace_shards( - index_uid2.clone(), - "test-source", - vec![ - Shard { - index_uid: Some(index_uid2.clone()), + + let index_uid_0: IndexUid = IndexUid::for_test("test-index-0", 0); + let index_uid_1: IndexUid = IndexUid::for_test("test-index-1", 0); + { + let mut state_guard = router.state.lock().await; + state_guard.node_routing_table.merge_from_shards( + index_uid_0.clone(), + "test-source".to_string(), + vec![Shard { + index_uid: Some(index_uid_0.clone()), source_id: "test-source".to_string(), shard_id: Some(ShardId::from(1)), shard_state: ShardState::Open as i32, leader_id: "test-ingester-0".to_string(), - follower_id: Some("test-ingester-1".to_string()), ..Default::default() - }, - Shard { - index_uid: Some(index_uid2.clone()), + }], + ); + state_guard.node_routing_table.merge_from_shards( + index_uid_1.clone(), + "test-source".to_string(), + vec![Shard { + index_uid: Some(index_uid_1.clone()), source_id: "test-source".to_string(), - shard_id: Some(ShardId::from(2)), + shard_id: Some(ShardId::from(1)), shard_state: ShardState::Open as i32, leader_id: "test-ingester-1".to_string(), - follower_id: Some("test-ingester-2".to_string()), ..Default::default() - }, - ], - ); - drop(state_guard); + }], + ); + } + let index_uid_0_clone = 
index_uid_0.clone(); let mut mock_ingester_0 = MockIngesterService::new(); - let index_uid_clone = index_uid.clone(); - let index_uid2_clone = index_uid2.clone(); - mock_ingester_0 - .expect_persist() - .once() - .returning(move |request| { - assert_eq!(request.leader_id, "test-ingester-0"); - assert_eq!(request.subrequests.len(), 2); - assert_eq!(request.commit_type(), CommitTypeV2::Auto); - - let subrequest = &request.subrequests[0]; - assert_eq!(subrequest.subrequest_id, 0); - assert_eq!(subrequest.index_uid(), &index_uid_clone); - assert_eq!(subrequest.source_id, "test-source"); - assert_eq!(subrequest.shard_id(), ShardId::from(1)); - assert_eq!( - subrequest.doc_batch, - Some(DocBatchV2::for_test(["", "test-doc-foo", "test-doc-bar"])) - ); - - let subrequest = &request.subrequests[1]; - assert_eq!(subrequest.subrequest_id, 1); - assert_eq!(subrequest.index_uid(), &index_uid2_clone); - assert_eq!(subrequest.source_id, "test-source"); - assert_eq!(subrequest.shard_id(), ShardId::from(1)); - assert_eq!( - subrequest.doc_batch, - Some(DocBatchV2::for_test(["test-doc-qux"])) - ); - - let response = PersistResponse { - leader_id: request.leader_id, - successes: vec![ - PersistSuccess { - subrequest_id: 0, - index_uid: Some(index_uid_clone.clone()), - source_id: "test-source".to_string(), - shard_id: Some(ShardId::from(1)), - replication_position_inclusive: Some(Position::offset(1u64)), - num_persisted_docs: 2, - parse_failures: vec![ParseFailure { - doc_uid: Some(DocUid::for_test(0)), - reason: ParseFailureReason::InvalidJson as i32, - message: "invalid JSON".to_string(), - }], - }, - PersistSuccess { - subrequest_id: 1, - index_uid: Some(index_uid2_clone.clone()), - source_id: "test-source".to_string(), - shard_id: Some(ShardId::from(1)), - replication_position_inclusive: Some(Position::offset(0u64)), - num_persisted_docs: 1, - parse_failures: Vec::new(), - }, - ], - failures: Vec::new(), - }; - Ok(response) - }); mock_ingester_0 .expect_persist() .once() .returning(move |request| { assert_eq!(request.leader_id, "test-ingester-0"); assert_eq!(request.subrequests.len(), 1); - assert_eq!(request.commit_type(), CommitTypeV2::Auto); + assert!(request.subrequests[0].shard_id.is_none()); - let subrequest = &request.subrequests[0]; - assert_eq!(subrequest.subrequest_id, 0); - assert_eq!(subrequest.index_uid(), &index_uid); - assert_eq!(subrequest.source_id, "test-source"); - assert_eq!(subrequest.shard_id(), ShardId::from(1)); - assert_eq!( - subrequest.doc_batch, - Some(DocBatchV2::for_test(["test-doc-moo", "test-doc-baz"])) - ); - - let response = PersistResponse { + Ok(PersistResponse { leader_id: request.leader_id, successes: vec![PersistSuccess { subrequest_id: 0, - index_uid: Some(index_uid.clone()), + index_uid: Some(index_uid_0_clone.clone()), source_id: "test-source".to_string(), shard_id: Some(ShardId::from(1)), - replication_position_inclusive: Some(Position::offset(3u64)), - num_persisted_docs: 4, - parse_failures: Vec::new(), + replication_position_inclusive: Some(Position::offset(1u64)), + num_persisted_docs: 2, + parse_failures: vec![ParseFailure { + doc_uid: Some(DocUid::for_test(0)), + reason: ParseFailureReason::InvalidJson as i32, + message: "invalid JSON".to_string(), + }], }], failures: Vec::new(), - }; - Ok(response) + }) }); - let ingester_0 = IngesterServiceClient::from_mock(mock_ingester_0); - ingester_pool.insert("test-ingester-0".into(), ingester_0.clone()); + ingester_pool.insert( + "test-ingester-0".into(), + IngesterServiceClient::from_mock(mock_ingester_0), + ); let 
mut mock_ingester_1 = MockIngesterService::new(); mock_ingester_1 @@ -1623,136 +1279,94 @@ mod tests { .returning(move |request| { assert_eq!(request.leader_id, "test-ingester-1"); assert_eq!(request.subrequests.len(), 1); - assert_eq!(request.commit_type(), CommitTypeV2::Auto); - - let subrequest = &request.subrequests[0]; - assert_eq!(subrequest.subrequest_id, 1); - assert_eq!(subrequest.index_uid(), &index_uid2); - assert_eq!(subrequest.source_id, "test-source"); - assert_eq!(subrequest.shard_id(), ShardId::from(2)); - assert_eq!( - subrequest.doc_batch, - Some(DocBatchV2::for_test(["test-doc-tux"])) - ); + assert!(request.subrequests[0].shard_id.is_none()); - let response = PersistResponse { + Ok(PersistResponse { leader_id: request.leader_id, successes: vec![PersistSuccess { subrequest_id: 1, - index_uid: Some(index_uid2.clone()), + index_uid: Some(index_uid_1.clone()), source_id: "test-source".to_string(), - shard_id: Some(ShardId::from(2)), + shard_id: Some(ShardId::from(1)), replication_position_inclusive: Some(Position::offset(0u64)), num_persisted_docs: 1, parse_failures: Vec::new(), }], failures: Vec::new(), - }; - Ok(response) + }) }); - let ingester_1 = IngesterServiceClient::from_mock(mock_ingester_1); - ingester_pool.insert("test-ingester-1".into(), ingester_1); + ingester_pool.insert( + "test-ingester-1".into(), + IngesterServiceClient::from_mock(mock_ingester_1), + ); + + let response = router + .ingest(IngestRequestV2 { + subrequests: vec![ + IngestSubrequest { + subrequest_id: 0, + index_id: "test-index-0".to_string(), + source_id: "test-source".to_string(), + doc_batch: Some(DocBatchV2::for_test(["", "test-doc-foo", "test-doc-bar"])), + }, + IngestSubrequest { + subrequest_id: 1, + index_id: "test-index-1".to_string(), + source_id: "test-source".to_string(), + doc_batch: Some(DocBatchV2::for_test(["test-doc-qux"])), + }, + ], + commit_type: CommitTypeV2::Auto as i32, + }) + .await + .unwrap(); - let ingest_request = IngestRequestV2 { - subrequests: vec![ - IngestSubrequest { - subrequest_id: 0, - index_id: "test-index-0".to_string(), - source_id: "test-source".to_string(), - doc_batch: Some(DocBatchV2::for_test(["", "test-doc-foo", "test-doc-bar"])), - }, - IngestSubrequest { - subrequest_id: 1, - index_id: "test-index-1".to_string(), - source_id: "test-source".to_string(), - doc_batch: Some(DocBatchV2::for_test(["test-doc-qux"])), - }, - ], - commit_type: CommitTypeV2::Auto as i32, - }; - let response = router.ingest(ingest_request).await.unwrap(); assert_eq!(response.successes.len(), 2); assert_eq!(response.failures.len(), 0); let parse_failures = &response.successes[0].parse_failures; assert_eq!(parse_failures.len(), 1); - - let parse_failure = &parse_failures[0]; - assert_eq!(parse_failure.doc_uid(), DocUid::for_test(0)); - assert_eq!(parse_failure.reason(), ParseFailureReason::InvalidJson); - - let ingest_request = IngestRequestV2 { - subrequests: vec![ - IngestSubrequest { - subrequest_id: 0, - index_id: "test-index-0".to_string(), - source_id: "test-source".to_string(), - doc_batch: Some(DocBatchV2::for_test(["test-doc-moo", "test-doc-baz"])), - }, - IngestSubrequest { - subrequest_id: 1, - index_id: "test-index-1".to_string(), - source_id: "test-source".to_string(), - doc_batch: Some(DocBatchV2::for_test(["test-doc-tux"])), - }, - ], - commit_type: CommitTypeV2::Auto as i32, - }; - let response = router.ingest(ingest_request).await.unwrap(); - assert_eq!(response.successes.len(), 2); - assert_eq!(response.failures.len(), 0); - } + 
assert_eq!(parse_failures[0].doc_uid(), DocUid::for_test(0)); + assert_eq!(parse_failures[0].reason(), ParseFailureReason::InvalidJson); + } #[tokio::test] async fn test_router_ingest_retry() { let self_node_id = "test-router".into(); let control_plane = ControlPlaneServiceClient::from_mock(MockControlPlaneService::new()); let ingester_pool = IngesterPool::default(); - let replication_factor = 1; let router = IngestRouter::new( self_node_id, control_plane, ingester_pool.clone(), - replication_factor, + 1, EventBroker::default(), ); - let mut state_guard = router.state.lock().await; let index_uid: IndexUid = IndexUid::for_test("test-index-0", 0); - state_guard.routing_table.replace_shards( - index_uid.clone(), - "test-source", - vec![Shard { - index_uid: Some(index_uid.clone()), - source_id: "test-source".to_string(), - shard_id: Some(ShardId::from(1)), - shard_state: ShardState::Open as i32, - leader_id: "test-ingester-0".to_string(), - ..Default::default() - }], - ); - drop(state_guard); + { + let mut state_guard = router.state.lock().await; + state_guard.node_routing_table.merge_from_shards( + index_uid.clone(), + "test-source".to_string(), + vec![Shard { + index_uid: Some(index_uid.clone()), + source_id: "test-source".to_string(), + shard_id: Some(ShardId::from(1)), + shard_state: ShardState::Open as i32, + leader_id: "test-ingester-0".to_string(), + ..Default::default() + }], + ); + } let mut mock_ingester_0 = MockIngesterService::new(); let index_uid_clone = index_uid.clone(); + // First attempt: returns NoShardsAvailable (transient, doesn't mark leader unavailable). mock_ingester_0 .expect_persist() .once() .returning(move |request| { - assert_eq!(request.leader_id, "test-ingester-0"); - assert_eq!(request.subrequests.len(), 1); - assert_eq!(request.commit_type(), CommitTypeV2::Auto); - - let subrequest = &request.subrequests[0]; - assert_eq!(subrequest.subrequest_id, 0); - assert_eq!(subrequest.index_uid(), &index_uid_clone); - assert_eq!(subrequest.source_id, "test-source"); - assert_eq!(subrequest.shard_id(), ShardId::from(1)); - assert_eq!( - subrequest.doc_batch, - Some(DocBatchV2::for_test(["test-doc-foo"])) - ); - - let response = PersistResponse { + Ok(PersistResponse { leader_id: request.leader_id, successes: Vec::new(), failures: vec![PersistFailure { @@ -1760,30 +1374,16 @@ mod tests { index_uid: Some(index_uid_clone.clone()), source_id: "test-source".to_string(), shard_id: Some(ShardId::from(1)), - reason: PersistFailureReason::Timeout as i32, + reason: PersistFailureReason::NoShardsAvailable as i32, }], - }; - Ok(response) + }) }); + // Second attempt: succeeds. 
mock_ingester_0 .expect_persist() .once() .returning(move |request| { - assert_eq!(request.leader_id, "test-ingester-0"); - assert_eq!(request.subrequests.len(), 1); - assert_eq!(request.commit_type(), CommitTypeV2::Auto); - - let subrequest = &request.subrequests[0]; - assert_eq!(subrequest.subrequest_id, 0); - assert_eq!(subrequest.index_uid(), &index_uid); - assert_eq!(subrequest.source_id, "test-source"); - assert_eq!(subrequest.shard_id(), ShardId::from(1)); - assert_eq!( - subrequest.doc_batch, - Some(DocBatchV2::for_test(["test-doc-foo"])) - ); - - let response = PersistResponse { + Ok(PersistResponse { leader_id: request.leader_id, successes: vec![PersistSuccess { subrequest_id: 0, @@ -1795,115 +1395,27 @@ mod tests { parse_failures: Vec::new(), }], failures: Vec::new(), - }; - Ok(response) + }) }); - let ingester_0 = IngesterServiceClient::from_mock(mock_ingester_0); - ingester_pool.insert("test-ingester-0".into(), ingester_0.clone()); - - let ingest_request = IngestRequestV2 { - subrequests: vec![IngestSubrequest { - subrequest_id: 0, - index_id: "test-index-0".to_string(), - source_id: "test-source".to_string(), - doc_batch: Some(DocBatchV2::for_test(["test-doc-foo"])), - }], - commit_type: CommitTypeV2::Auto as i32, - }; - router.ingest(ingest_request).await.unwrap(); - } - - #[tokio::test] - async fn test_router_updates_routing_table_on_chitchat_events() { - let self_node_id = "test-router".into(); - let control_plane = ControlPlaneServiceClient::from_mock(MockControlPlaneService::new()); - let ingester_pool = IngesterPool::default(); - let replication_factor = 1; - let event_broker = EventBroker::default(); - let router = IngestRouter::new( - self_node_id, - control_plane, - ingester_pool.clone(), - replication_factor, - event_broker.clone(), - ); - router.subscribe(); - let index_uid: IndexUid = IndexUid::for_test("test-index-0", 0); - - let mut state_guard = router.state.lock().await; - state_guard.routing_table.replace_shards( - index_uid.clone(), - "test-source", - vec![Shard { - index_uid: Some(index_uid.clone()), - shard_id: Some(ShardId::from(1)), - shard_state: ShardState::Open as i32, - leader_id: "test-ingester".to_string(), - ..Default::default() - }], + ingester_pool.insert( + "test-ingester-0".into(), + IngesterServiceClient::from_mock(mock_ingester_0), ); - drop(state_guard); - - let local_shards_update = LocalShardsUpdate { - leader_id: "test-ingester".into(), - source_uid: SourceUid { - index_uid: index_uid.clone(), - source_id: "test-source".to_string(), - }, - shard_infos: BTreeSet::from_iter([ - ShardInfo { - shard_id: ShardId::from(1), - shard_state: ShardState::Closed, - short_term_ingestion_rate: RateMibPerSec(0), - long_term_ingestion_rate: RateMibPerSec(0), - }, - ShardInfo { - shard_id: ShardId::from(2), - shard_state: ShardState::Open, - short_term_ingestion_rate: RateMibPerSec(0), - long_term_ingestion_rate: RateMibPerSec(0), - }, - ]), - }; - event_broker.publish(local_shards_update); - - // Yield so that the event is processed. 
- yield_now().await; - - let state_guard = router.state.lock().await; - let shards = state_guard - .routing_table - .find_entry("test-index-0", "test-source") - .unwrap() - .all_shards(); - assert_eq!(shards.len(), 2); - assert_eq!(shards[0].shard_id, ShardId::from(1)); - assert_eq!(shards[0].shard_state, ShardState::Closed); - assert_eq!(shards[1].shard_id, ShardId::from(2)); - assert_eq!(shards[1].shard_state, ShardState::Open); - drop(state_guard); - let shard_positions_update = ShardPositionsUpdate { - source_uid: SourceUid { - index_uid: index_uid.clone(), - source_id: "test-source".to_string(), - }, - updated_shard_positions: vec![(ShardId::from(1), Position::eof(0u64))], - }; - event_broker.publish(shard_positions_update); - - // Yield so that the event is processed. - yield_now().await; - - let state_guard = router.state.lock().await; - let shards = state_guard - .routing_table - .find_entry("test-index-0", "test-source") - .unwrap() - .all_shards(); - assert_eq!(shards.len(), 1); - assert_eq!(shards[0].shard_id, ShardId::from(2)); - drop(state_guard); + let response = router + .ingest(IngestRequestV2 { + subrequests: vec![IngestSubrequest { + subrequest_id: 0, + index_id: "test-index-0".to_string(), + source_id: "test-source".to_string(), + doc_batch: Some(DocBatchV2::for_test(["test-doc-foo"])), + }], + commit_type: CommitTypeV2::Auto as i32, + }) + .await + .unwrap(); + assert_eq!(response.successes.len(), 1); + assert_eq!(response.failures.len(), 0); } #[tokio::test] @@ -1922,42 +1434,48 @@ mod tests { let index_uid_0: IndexUid = IndexUid::for_test("test-index-0", 0); let index_uid_1: IndexUid = IndexUid::for_test("test-index-1", 0); - let mut state_guard = router.state.lock().await; - state_guard.routing_table.replace_shards( - index_uid_0.clone(), - "test-source", - vec![Shard { - index_uid: Some(index_uid_0.clone()), - shard_id: Some(ShardId::from(1)), - shard_state: ShardState::Open as i32, - leader_id: "test-ingester".to_string(), - ..Default::default() - }], - ); - state_guard.routing_table.replace_shards( - index_uid_1.clone(), - "test-source", - vec![Shard { - index_uid: Some(index_uid_1.clone()), - shard_id: Some(ShardId::from(2)), - shard_state: ShardState::Open as i32, - leader_id: "test-ingester".to_string(), - ..Default::default() - }], - ); - drop(state_guard); + { + let mut state_guard = router.state.lock().await; + state_guard.node_routing_table.merge_from_shards( + index_uid_0.clone(), + "test-source".to_string(), + vec![Shard { + index_uid: Some(index_uid_0.clone()), + shard_id: Some(ShardId::from(1)), + shard_state: ShardState::Open as i32, + leader_id: "test-ingester-0".to_string(), + ..Default::default() + }], + ); + state_guard.node_routing_table.merge_from_shards( + index_uid_1.clone(), + "test-source".to_string(), + vec![Shard { + index_uid: Some(index_uid_1.clone()), + shard_id: Some(ShardId::from(2)), + shard_state: ShardState::Open as i32, + leader_id: "test-ingester-1".to_string(), + ..Default::default() + }], + ); + } let debug_info = router.debug_info().await; let routing_table = &debug_info["routing_table"]; assert_eq!(routing_table.as_object().unwrap().len(), 2); - assert_eq!(routing_table["test-index-0"].as_array().unwrap().len(), 1); - assert_eq!(routing_table["test-index-1"].as_array().unwrap().len(), 1); + let index_0_entries = routing_table["test-index-0"].as_array().unwrap(); + assert_eq!(index_0_entries.len(), 1); + assert_eq!(index_0_entries[0]["node_id"], "test-ingester-0"); + assert_eq!(index_0_entries[0]["capacity_score"], 5); + + let 
index_1_entries = routing_table["test-index-1"].as_array().unwrap(); + assert_eq!(index_1_entries.len(), 1); + assert_eq!(index_1_entries[0]["node_id"], "test-ingester-1"); } #[tokio::test] - async fn test_router_does_not_retry_rate_limited_shards() { - // We avoid retrying a shard limited shard at the scale of a workbench. + async fn test_router_returns_rate_limited_failure() { let self_node_id = "test-router".into(); let control_plane = ControlPlaneServiceClient::from_mock(MockControlPlaneService::new()); let ingester_pool = IngesterPool::default(); @@ -1969,138 +1487,51 @@ mod tests { replication_factor, EventBroker::default(), ); - let mut state_guard = router.state.lock().await; let index_uid: IndexUid = IndexUid::for_test("test-index-0", 0); - - state_guard.routing_table.replace_shards( - index_uid.clone(), - "test-source", - vec![ - Shard { + { + let mut state_guard = router.state.lock().await; + state_guard.node_routing_table.merge_from_shards( + index_uid.clone(), + "test-source".to_string(), + vec![Shard { index_uid: Some(index_uid.clone()), source_id: "test-source".to_string(), shard_id: Some(ShardId::from(1)), shard_state: ShardState::Open as i32, leader_id: "test-ingester-0".to_string(), ..Default::default() - }, - Shard { - index_uid: Some(index_uid.clone()), - source_id: "test-source".to_string(), - shard_id: Some(ShardId::from(2)), - shard_state: ShardState::Open as i32, - leader_id: "test-ingester-0".to_string(), - ..Default::default() - }, - ], - ); - drop(state_guard); - - // We have two shards. - // - shard 1 is rate limited - // - shard 2 is timeout. - // We expect a retry on shard 2 that is then successful. - let mut seq = Sequence::new(); + }], + ); + } let mut mock_ingester_0 = MockIngesterService::new(); - mock_ingester_0 - .expect_persist() - .times(1) - .returning(move |request| { - assert_eq!(request.leader_id, "test-ingester-0"); - assert_eq!(request.commit_type(), CommitTypeV2::Auto); - assert_eq!(request.subrequests.len(), 1); - let subrequest = &request.subrequests[0]; - assert_eq!(subrequest.subrequest_id, 0); - let index_uid = subrequest.index_uid().clone(); - assert_eq!(subrequest.source_id, "test-source"); - assert_eq!(subrequest.shard_id(), ShardId::from(1)); - assert_eq!( - subrequest.doc_batch, - Some(DocBatchV2::for_test(["test-doc-foo"])) - ); - - let response = PersistResponse { - leader_id: request.leader_id, - successes: Vec::new(), - failures: vec![PersistFailure { - subrequest_id: 0, - index_uid: Some(index_uid), - source_id: "test-source".to_string(), - shard_id: Some(ShardId::from(1)), - reason: PersistFailureReason::ShardRateLimited as i32, - }], - }; - Ok(response) - }) - .in_sequence(&mut seq); - - mock_ingester_0 - .expect_persist() - .times(1) - .returning(move |request| { - assert_eq!(request.leader_id, "test-ingester-0"); - assert_eq!(request.commit_type(), CommitTypeV2::Auto); - assert_eq!(request.subrequests.len(), 1); - let subrequest = &request.subrequests[0]; - assert_eq!(subrequest.subrequest_id, 0); - let index_uid = subrequest.index_uid().clone(); - assert_eq!(subrequest.source_id, "test-source"); - assert_eq!(subrequest.shard_id(), ShardId::from(2)); - assert_eq!( - subrequest.doc_batch, - Some(DocBatchV2::for_test(["test-doc-foo"])) - ); - - let response = PersistResponse { - leader_id: request.leader_id, - successes: Vec::new(), - failures: vec![PersistFailure { - subrequest_id: 0, - index_uid: Some(index_uid), - source_id: "test-source".to_string(), - shard_id: Some(ShardId::from(1)), - reason: PersistFailureReason::Timeout 
as i32,
-                    }],
-                };
-                Ok(response)
-            })
-            .in_sequence(&mut seq);
-
-        mock_ingester_0
-            .expect_persist()
-            .times(1)
-            .returning(move |request| {
-                assert_eq!(request.leader_id, "test-ingester-0");
-                assert_eq!(request.commit_type(), CommitTypeV2::Auto);
-                assert_eq!(request.subrequests.len(), 1);
-                let subrequest = &request.subrequests[0];
-                assert_eq!(subrequest.subrequest_id, 0);
-                let index_uid = subrequest.index_uid().clone();
-                assert_eq!(subrequest.source_id, "test-source");
-                assert_eq!(subrequest.shard_id(), ShardId::from(2));
-                assert_eq!(
-                    subrequest.doc_batch,
-                    Some(DocBatchV2::for_test(["test-doc-foo"]))
-                );
-
-                let response = PersistResponse {
-                    leader_id: request.leader_id,
-                    successes: vec![PersistSuccess {
-                        subrequest_id: 0,
-                        index_uid: Some(index_uid),
-                        source_id: "test-source".to_string(),
-                        shard_id: Some(ShardId::from(1)),
-                        num_persisted_docs: 1,
-                        replication_position_inclusive: Some(Position::offset(0u64)),
-                        parse_failures: Vec::new(),
-                    }],
-                    failures: Vec::new(),
-                };
-                Ok(response)
-            })
-            .in_sequence(&mut seq);
+        mock_ingester_0.expect_persist().returning(move |request| {
+            assert_eq!(request.leader_id, "test-ingester-0");
+            assert_eq!(request.commit_type(), CommitTypeV2::Auto);
+            assert_eq!(request.subrequests.len(), 1);
+            let subrequest = &request.subrequests[0];
+            assert_eq!(subrequest.subrequest_id, 0);
+            let index_uid = subrequest.index_uid().clone();
+            assert_eq!(subrequest.source_id, "test-source");
+            assert!(subrequest.shard_id.is_none());
+            assert_eq!(
+                subrequest.doc_batch,
+                Some(DocBatchV2::for_test(["test-doc-foo"]))
+            );
+            let response = PersistResponse {
+                leader_id: request.leader_id,
+                successes: Vec::new(),
+                failures: vec![PersistFailure {
+                    subrequest_id: 0,
+                    index_uid: Some(index_uid),
+                    source_id: "test-source".to_string(),
+                    shard_id: Some(ShardId::from(1)),
+                    reason: PersistFailureReason::NoShardsAvailable as i32,
+                }],
+            };
+            Ok(response)
+        });
         let ingester_0 = IngesterServiceClient::from_mock(mock_ingester_0);
         ingester_pool.insert("test-ingester-0".into(), ingester_0.clone());
@@ -2113,88 +1544,130 @@ mod tests {
             }],
             commit_type: CommitTypeV2::Auto as i32,
         };
-        router.ingest(ingest_request).await.unwrap();
+        let ingest_response = router.ingest(ingest_request).await.unwrap();
+        assert_eq!(ingest_response.successes.len(), 0);
+        assert_eq!(ingest_response.failures.len(), 1);
+        assert_eq!(
+            ingest_response.failures[0].reason(),
+            IngestFailureReason::NoShardsAvailable
+        );
     }

     #[tokio::test]
-    async fn test_router_returns_rate_limited_failure() {
-        let self_node_id = "test-router".into();
-        let control_plane = ControlPlaneServiceClient::from_mock(MockControlPlaneService::new());
-        let ingester_pool = IngesterPool::default();
-        let replication_factor = 1;
+    async fn test_router_updates_node_routing_table_on_capacity_update() {
+        let event_broker = EventBroker::default();
         let router = IngestRouter::new(
-            self_node_id,
-            control_plane,
-            ingester_pool.clone(),
-            replication_factor,
-            EventBroker::default(),
+            "test-router".into(),
+            ControlPlaneServiceClient::from_mock(MockControlPlaneService::new()),
+            IngesterPool::default(),
+            1,
+            event_broker.clone(),
         );
-        let mut state_guard = router.state.lock().await;
-        let index_uid: IndexUid = IndexUid::for_test("test-index-0", 0);
+        router.subscribe();

-        state_guard.routing_table.replace_shards(
-            index_uid.clone(),
-            "test-source",
-            vec![Shard {
-                index_uid: Some(index_uid.clone()),
+        event_broker.publish(IngesterCapacityScoreUpdate {
+            node_id: "test-ingester-0".into(),
+            source_uid: SourceUid {
+                index_uid: IndexUid::for_test("test-index", 0),
                 source_id: "test-source".to_string(),
-                shard_id: Some(ShardId::from(1)),
-                shard_state: ShardState::Open as i32,
-                leader_id: "test-ingester-0".to_string(),
-                ..Default::default()
-            }],
-        );
-        drop(state_guard);
-
-        let mut mock_ingester_0 = MockIngesterService::new();
-        mock_ingester_0
-            .expect_persist()
-            .times(1)
-            .returning(move |request| {
-                assert_eq!(request.leader_id, "test-ingester-0");
-                assert_eq!(request.commit_type(), CommitTypeV2::Auto);
-                assert_eq!(request.subrequests.len(), 1);
-                let subrequest = &request.subrequests[0];
-                assert_eq!(subrequest.subrequest_id, 0);
-                let index_uid = subrequest.index_uid().clone();
-                assert_eq!(subrequest.source_id, "test-source");
-                assert_eq!(subrequest.shard_id(), ShardId::from(1));
-                assert_eq!(
-                    subrequest.doc_batch,
-                    Some(DocBatchV2::for_test(["test-doc-foo"]))
-                );
+            },
+            capacity_score: 7,
+            open_shard_count: 3,
+        });
+        // Give the async subscriber a moment to process.
+        tokio::time::sleep(Duration::from_millis(10)).await;

-                let response = PersistResponse {
-                    leader_id: request.leader_id,
-                    successes: Vec::new(),
-                    failures: vec![PersistFailure {
-                        subrequest_id: 0,
-                        index_uid: Some(index_uid),
-                        source_id: "test-source".to_string(),
-                        shard_id: Some(ShardId::from(1)),
-                        reason: PersistFailureReason::ShardRateLimited as i32,
-                    }],
-                };
-                Ok(response)
-            });
-        let ingester_0 = IngesterServiceClient::from_mock(mock_ingester_0);
-        ingester_pool.insert("test-ingester-0".into(), ingester_0.clone());
+        let state_guard = router.state.lock().await;
+        let entry = state_guard
+            .node_routing_table
+            .find_entry("test-index", "test-source")
+            .unwrap();
+        let node = entry.nodes.get("test-ingester-0").unwrap();
+        assert_eq!(node.capacity_score, 7);
+        assert_eq!(node.open_shard_count, 3);
+    }

-        let ingest_request = IngestRequestV2 {
-            subrequests: vec![IngestSubrequest {
+    #[tokio::test]
+    async fn test_router_process_persist_results_marks_unavailable_on_persist_failure() {
+        let router = IngestRouter::new(
+            "test-router".into(),
+            ControlPlaneServiceClient::from_mock(MockControlPlaneService::new()),
+            IngesterPool::default(),
+            1,
+            EventBroker::default(),
+        );
+        let ingest_subrequests = vec![
+            IngestSubrequest {
                 subrequest_id: 0,
                 index_id: "test-index-0".to_string(),
                 source_id: "test-source".to_string(),
-                doc_batch: Some(DocBatchV2::for_test(["test-doc-foo"])),
-            }],
-            commit_type: CommitTypeV2::Auto as i32,
-        };
-        let ingest_response = router.ingest(ingest_request).await.unwrap();
-        assert_eq!(ingest_response.successes.len(), 0);
-        assert_eq!(ingest_response.failures.len(), 1);
-        assert_eq!(
-            ingest_response.failures[0].reason(),
-            IngestFailureReason::ShardRateLimited
+                ..Default::default()
+            },
+            IngestSubrequest {
+                subrequest_id: 1,
+                index_id: "test-index-1".to_string(),
+                source_id: "test-source".to_string(),
+                ..Default::default()
+            },
+        ];
+        let mut workbench = IngestWorkbench::new(ingest_subrequests, 2);
+
+        // NoShardsAvailable does NOT mark the leader as unavailable.
+        let persist_futures = FuturesUnordered::new();
+        persist_futures.push(async {
+            let summary = PersistRequestSummary {
+                leader_id: "test-ingester-0".into(),
+                subrequest_ids: vec![0],
+            };
+            let result = Ok::<_, IngestV2Error>(PersistResponse {
+                leader_id: "test-ingester-0".to_string(),
+                successes: Vec::new(),
+                failures: vec![PersistFailure {
+                    subrequest_id: 0,
+                    index_uid: Some(IndexUid::for_test("test-index-0", 0)),
+                    source_id: "test-source".to_string(),
+                    shard_id: Some(ShardId::from(1)),
+                    reason: PersistFailureReason::NoShardsAvailable as i32,
+                }],
+            });
+            (summary, result)
+        });
+        router
+            .process_persist_results(&mut workbench, persist_futures)
+            .await;
+        assert!(
+            !workbench
+                .unavailable_leaders
+                .contains(&NodeId::from("test-ingester-0"))
+        );
+
+        // NodeUnavailable DOES mark the leader as unavailable.
+        let persist_futures = FuturesUnordered::new();
+        persist_futures.push(async {
+            let summary = PersistRequestSummary {
+                leader_id: "test-ingester-1".into(),
+                subrequest_ids: vec![1],
+            };
+            let result = Ok::<_, IngestV2Error>(PersistResponse {
+                leader_id: "test-ingester-1".to_string(),
+                successes: Vec::new(),
+                failures: vec![PersistFailure {
+                    subrequest_id: 1,
+                    index_uid: Some(IndexUid::for_test("test-index-1", 0)),
+                    source_id: "test-source".to_string(),
+                    shard_id: Some(ShardId::from(1)),
+                    reason: PersistFailureReason::NodeUnavailable as i32,
+                }],
+            });
+            (summary, result)
+        });
+        router
+            .process_persist_results(&mut workbench, persist_futures)
+            .await;
+        assert!(
+            workbench
+                .unavailable_leaders
+                .contains(&NodeId::from("test-ingester-1"))
         );
     }
 }
diff --git a/quickwit/quickwit-ingest/src/ingest_v2/workbench.rs b/quickwit/quickwit-ingest/src/ingest_v2/workbench.rs
index 311aef138d8..3e7b22969e8 100644
--- a/quickwit/quickwit-ingest/src/ingest_v2/workbench.rs
+++ b/quickwit/quickwit-ingest/src/ingest_v2/workbench.rs
@@ -24,7 +24,7 @@ use quickwit_proto::ingest::router::{
     IngestFailure, IngestFailureReason, IngestResponseV2, IngestSubrequest, IngestSuccess,
 };
 use quickwit_proto::ingest::{IngestV2Error, RateLimitingCause};
-use quickwit_proto::types::{NodeId, ShardId, SubrequestId};
+use quickwit_proto::types::{NodeId, SubrequestId};
 use tracing::warn;

 use super::publish_tracker::PublishTracker;
@@ -35,7 +35,6 @@ use super::router::PersistRequestSummary;
 #[derive(Default)]
 pub(super) struct IngestWorkbench {
     pub subworkbenches: BTreeMap,
-    pub rate_limited_shards: HashSet,
     pub num_successes: usize,
     /// The number of batch persist attempts. This is not sum of the number of attempts for each
     /// subrequest.
@@ -228,13 +227,6 @@ impl IngestWorkbench {
         self.record_failure(subrequest_id, SubworkbenchFailure::NoShardsAvailable);
     }

-    pub fn record_rate_limited(&mut self, subrequest_id: SubrequestId) {
-        self.record_failure(
-            subrequest_id,
-            SubworkbenchFailure::RateLimited(RateLimitingCause::ShardRateLimiting),
-        );
-    }
-
     /// Marks a node as unavailable for the span of the workbench.
     ///
     /// Remaining attempts will treat the node as if it was not in the ingester pool.
@@ -433,7 +425,7 @@ mod tests {
         assert!(!subworkbench.last_failure_is_transient());

         subworkbench.last_failure_opt = Some(SubworkbenchFailure::Persist(
-            PersistFailureReason::ShardRateLimited,
+            PersistFailureReason::NoShardsAvailable,
         ));
         assert!(subworkbench.is_pending());
         assert!(subworkbench.last_failure_is_transient());
@@ -807,7 +799,7 @@ mod tests {

         let persist_failure = PersistFailure {
             subrequest_id: 42,
-            reason: PersistFailureReason::ShardRateLimited as i32,
+            reason: PersistFailureReason::NoShardsAvailable as i32,
             ..Default::default()
         };
         workbench.record_persist_failure(&persist_failure);
diff --git a/quickwit/quickwit-proto/protos/quickwit/ingester.proto b/quickwit/quickwit-proto/protos/quickwit/ingester.proto
index a5b651d94d8..25a4705d58a 100644
--- a/quickwit/quickwit-proto/protos/quickwit/ingester.proto
+++ b/quickwit/quickwit-proto/protos/quickwit/ingester.proto
@@ -96,11 +96,10 @@ message PersistSuccess {

 enum PersistFailureReason {
   PERSIST_FAILURE_REASON_UNSPECIFIED = 0;
-  PERSIST_FAILURE_REASON_SHARD_NOT_FOUND = 1;
-  PERSIST_FAILURE_REASON_SHARD_CLOSED = 2;
-  PERSIST_FAILURE_REASON_SHARD_RATE_LIMITED = 3;
   PERSIST_FAILURE_REASON_WAL_FULL = 4;
   PERSIST_FAILURE_REASON_TIMEOUT = 5;
+  PERSIST_FAILURE_REASON_NO_SHARDS_AVAILABLE = 6;
+  PERSIST_FAILURE_REASON_NODE_UNAVAILABLE = 7;
 }

 message PersistFailure {
diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.ingester.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.ingester.rs
index 018e19a39a9..07b8d5b64a1 100644
--- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.ingester.rs
+++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.ingester.rs
@@ -397,11 +397,10 @@ pub struct ObservationMessage {
 #[repr(i32)]
 pub enum PersistFailureReason {
     Unspecified = 0,
-    ShardNotFound = 1,
-    ShardClosed = 2,
-    ShardRateLimited = 3,
     WalFull = 4,
     Timeout = 5,
+    NoShardsAvailable = 6,
+    NodeUnavailable = 7,
 }
 impl PersistFailureReason {
     /// String value of the enum field names used in the ProtoBuf definition.
@@ -411,22 +410,20 @@ impl PersistFailureReason {
     pub fn as_str_name(&self) -> &'static str {
         match self {
             Self::Unspecified => "PERSIST_FAILURE_REASON_UNSPECIFIED",
-            Self::ShardNotFound => "PERSIST_FAILURE_REASON_SHARD_NOT_FOUND",
-            Self::ShardClosed => "PERSIST_FAILURE_REASON_SHARD_CLOSED",
-            Self::ShardRateLimited => "PERSIST_FAILURE_REASON_SHARD_RATE_LIMITED",
             Self::WalFull => "PERSIST_FAILURE_REASON_WAL_FULL",
             Self::Timeout => "PERSIST_FAILURE_REASON_TIMEOUT",
+            Self::NoShardsAvailable => "PERSIST_FAILURE_REASON_NO_SHARDS_AVAILABLE",
+            Self::NodeUnavailable => "PERSIST_FAILURE_REASON_NODE_UNAVAILABLE",
         }
     }
     /// Creates an enum from field names used in the ProtoBuf definition.
     pub fn from_str_name(value: &str) -> ::core::option::Option<Self> {
         match value {
             "PERSIST_FAILURE_REASON_UNSPECIFIED" => Some(Self::Unspecified),
-            "PERSIST_FAILURE_REASON_SHARD_NOT_FOUND" => Some(Self::ShardNotFound),
-            "PERSIST_FAILURE_REASON_SHARD_CLOSED" => Some(Self::ShardClosed),
-            "PERSIST_FAILURE_REASON_SHARD_RATE_LIMITED" => Some(Self::ShardRateLimited),
             "PERSIST_FAILURE_REASON_WAL_FULL" => Some(Self::WalFull),
             "PERSIST_FAILURE_REASON_TIMEOUT" => Some(Self::Timeout),
+            "PERSIST_FAILURE_REASON_NO_SHARDS_AVAILABLE" => Some(Self::NoShardsAvailable),
+            "PERSIST_FAILURE_REASON_NODE_UNAVAILABLE" => Some(Self::NodeUnavailable),
             _ => None,
         }
     }
diff --git a/quickwit/quickwit-proto/src/ingest/mod.rs b/quickwit/quickwit-proto/src/ingest/mod.rs
index fda347d7931..6d529d79fbd 100644
--- a/quickwit/quickwit-proto/src/ingest/mod.rs
+++ b/quickwit/quickwit-proto/src/ingest/mod.rs
@@ -313,11 +313,10 @@ impl From<PersistFailureReason> for IngestFailureReason {
     fn from(reason: PersistFailureReason) -> Self {
         match reason {
             PersistFailureReason::Unspecified => IngestFailureReason::Unspecified,
-            PersistFailureReason::ShardNotFound => IngestFailureReason::NoShardsAvailable,
-            PersistFailureReason::ShardClosed => IngestFailureReason::NoShardsAvailable,
+            PersistFailureReason::NoShardsAvailable => IngestFailureReason::NoShardsAvailable,
             PersistFailureReason::WalFull => IngestFailureReason::WalFull,
-            PersistFailureReason::ShardRateLimited => IngestFailureReason::ShardRateLimited,
             PersistFailureReason::Timeout => IngestFailureReason::Timeout,
+            PersistFailureReason::NodeUnavailable => IngestFailureReason::NoShardsAvailable,
         }
     }
 }
@@ -326,8 +325,8 @@ impl From<ReplicateFailureReason> for PersistFailureReason {
     fn from(reason: ReplicateFailureReason) -> Self {
         match reason {
             ReplicateFailureReason::Unspecified => PersistFailureReason::Unspecified,
-            ReplicateFailureReason::ShardNotFound => PersistFailureReason::ShardNotFound,
-            ReplicateFailureReason::ShardClosed => PersistFailureReason::ShardClosed,
+            ReplicateFailureReason::ShardNotFound => PersistFailureReason::NoShardsAvailable,
+            ReplicateFailureReason::ShardClosed => PersistFailureReason::NoShardsAvailable,
             ReplicateFailureReason::WalFull => PersistFailureReason::WalFull,
         }
     }
diff --git a/quickwit/quickwit-serve/src/lib.rs b/quickwit/quickwit-serve/src/lib.rs
index ca4520ff0ce..60515bc819f 100644
--- a/quickwit/quickwit-serve/src/lib.rs
+++ b/quickwit/quickwit-serve/src/lib.rs
@@ -82,8 +82,9 @@ use quickwit_indexing::models::ShardPositionsService;
 use quickwit_indexing::start_indexing_service;
 use quickwit_ingest::{
     GetMemoryCapacity, IngestRequest, IngestRouter, IngestServiceClient, Ingester, IngesterPool,
-    LocalShardsUpdate, get_idle_shard_timeout, setup_local_shards_update_listener,
-    start_ingest_api_service, wait_for_ingester_decommission, wait_for_ingester_status,
+    LocalShardsUpdate, get_idle_shard_timeout, setup_ingester_capacity_update_listener,
+    setup_local_shards_update_listener, start_ingest_api_service, wait_for_ingester_decommission,
+    wait_for_ingester_status,
 };
 use quickwit_jaeger::JaegerService;
 use quickwit_janitor::{JanitorService, start_janitor_service};
@@ -906,6 +907,9 @@ async fn setup_ingest_v2(
         event_broker.clone(),
     );
     ingest_router.subscribe();
+    setup_ingester_capacity_update_listener(cluster.clone(), event_broker.clone())
+        .await
+        .forever();
     let ingest_router_service = IngestRouterServiceClient::tower()
         .stack_layer(INGEST_GRPC_SERVER_METRICS_LAYER.clone())