/// Fixed-size buffer that keeps the last `N` elements pushed into it.
///
/// `head` is the write cursor: it advances by one on each push and wraps
/// back to 0 when it reaches `N`, overwriting the oldest element.
///
/// ```text
/// RingBuffer after pushing 1, 2, 3, 4, 5, 6:
///
/// buffer = [5, 6, 3, 4]   head = 2   len = 4
///                 ^
///                 next write goes here
///
/// logical view (oldest -> newest): [3, 4, 5, 6]
/// ```
pub struct RingBuffer<T, const N: usize> {
    buffer: [T; N],
    head: usize,
    len: usize,
}

impl<T: Copy + Default, const N: usize> Default for RingBuffer<T, N> {
    fn default() -> Self {
        Self {
            buffer: [T::default(); N],
            head: 0,
            len: 0,
        }
    }
}

impl<T: Debug, const N: usize> Debug for RingBuffer<T, N> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        // Render the logical (oldest -> newest) view, not the raw storage order.
        f.debug_list().entries(self.iter()).finish()
    }
}

impl<T, const N: usize> RingBuffer<T, N> {
    /// Appends `value`, overwriting the oldest element once the buffer holds `N`
    /// items. A zero-capacity buffer (`N == 0`) silently drops the value instead
    /// of panicking on `buffer[0]` / `% 0`.
    pub fn push_back(&mut self, value: T) {
        if N == 0 {
            return;
        }
        self.buffer[self.head] = value;
        self.head = (self.head + 1) % N;
        if self.len < N {
            self.len += 1;
        }
    }

    /// Number of elements currently stored (at most `N`).
    pub fn len(&self) -> usize {
        self.len
    }

    pub fn is_empty(&self) -> bool {
        self.len == 0
    }

    /// Iterates from the oldest retained element to the newest.
    pub fn iter(&self) -> impl Iterator<Item = &T> + '_ {
        // `start` is the index of the oldest element. Guard `N == 0` so the
        // modulo below cannot divide by zero (the range is empty in that case).
        let start = if N == 0 {
            0
        } else {
            (self.head + N - self.len) % N
        };
        (0..self.len).map(move |i| &self.buffer[(start + i) % N])
    }
}

impl<T: Copy, const N: usize> RingBuffer<T, N> {
    /// Most recently pushed element, or `None` if the buffer is empty.
    pub fn last(&self) -> Option<T> {
        if self.len == 0 {
            return None;
        }
        // `len > 0` implies `N > 0`, so the modulo is safe.
        Some(self.buffer[(self.head + N - 1) % N])
    }

    /// Oldest element still retained, or `None` if the buffer is empty.
    pub fn front(&self) -> Option<T> {
        if self.len == 0 {
            return None;
        }
        Some(self.buffer[(self.head + N - self.len) % N])
    }
}
+pub const INGESTER_CAPACITY_SCORE_PREFIX: &str = "ingester.capacity_score:"; + /// File name for the encoded list of fields in the split pub const SPLIT_FIELDS_FILE_NAME: &str = "split_fields"; diff --git a/quickwit/quickwit-ingest/Cargo.toml b/quickwit/quickwit-ingest/Cargo.toml index 3dfa0bf6c0c..3149f2aaaf3 100644 --- a/quickwit/quickwit-ingest/Cargo.toml +++ b/quickwit/quickwit-ingest/Cargo.toml @@ -18,6 +18,7 @@ bytesize = { workspace = true } fail = { workspace = true, optional = true } futures = { workspace = true } http = { workspace = true } +itertools = { workspace = true } mockall = { workspace = true, optional = true } mrecordlog = { workspace = true } once_cell = { workspace = true } @@ -43,7 +44,6 @@ quickwit-doc-mapper = { workspace = true, features = ["testsuite"] } quickwit-proto = { workspace = true } [dev-dependencies] -itertools = { workspace = true } mockall = { workspace = true } rand = { workspace = true } rand_distr = { workspace = true } diff --git a/quickwit/quickwit-ingest/src/ingest_v2/broadcast/ingester_capacity_score.rs b/quickwit/quickwit-ingest/src/ingest_v2/broadcast/ingester_capacity_score.rs new file mode 100644 index 00000000000..6f8abc66ef8 --- /dev/null +++ b/quickwit/quickwit-ingest/src/ingest_v2/broadcast/ingester_capacity_score.rs @@ -0,0 +1,455 @@ +// Copyright 2021-Present Datadog, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::collections::BTreeSet; + +use anyhow::{Context, Result}; +use bytesize::ByteSize; +use quickwit_cluster::{Cluster, ListenerHandle}; +use quickwit_common::pubsub::{Event, EventBroker}; +use quickwit_common::ring_buffer::RingBuffer; +use quickwit_common::shared_consts::INGESTER_CAPACITY_SCORE_PREFIX; +use quickwit_proto::ingest::ingester::IngesterStatus; +use quickwit_proto::types::{IndexUid, NodeId, SourceId, SourceUid}; +use serde::{Deserialize, Serialize}; +use tokio::task::JoinHandle; +use tracing::{info, warn}; + +use super::{BROADCAST_INTERVAL_PERIOD, make_key, parse_key}; +use crate::ingest_v2::state::WeakIngesterState; + +pub type OpenShardCounts = Vec<(IndexUid, SourceId, usize)>; + +/// The lookback window length is meant to capture readings far enough back in time to give +/// a rough rate of change estimate. At size 6, with broadcast interval of 5 seconds, this would be +/// 30 seconds of readings. +const WAL_CAPACITY_LOOKBACK_WINDOW_LEN: usize = 6; + +/// The ring buffer stores one extra element so that `delta()` can compare the newest reading +/// with the one that is exactly `WAL_CAPACITY_LOOKBACK_WINDOW_LEN` steps ago. Otherwise, that +/// reading would be discarded when the next reading is inserted. +const WAL_CAPACITY_READINGS_LEN: usize = WAL_CAPACITY_LOOKBACK_WINDOW_LEN + 1; + +struct WalMemoryCapacityTimeSeries { + readings: RingBuffer, +} + +impl WalMemoryCapacityTimeSeries { + fn new() -> Self { + Self { + readings: RingBuffer::default(), + } + } + + fn record(&mut self, memory_used: ByteSize, memory_allocated: ByteSize) { + let allocated = memory_allocated.as_u64(); + if allocated == 0 { + self.readings.push_back(1.0); + return; + } + let remaining = 1.0 - (memory_used.as_u64() as f64 / allocated as f64); + self.readings.push_back(remaining.clamp(0.0, 1.0)); + } + + fn current(&self) -> Option { + self.readings.last() + } + + /// How much remaining capacity changed between the oldest and newest readings. 
+ /// Positive = improving, negative = draining. + fn delta(&self) -> Option { + let current = self.readings.last()?; + let oldest = self.readings.front()?; + Some(current - oldest) + } +} + +/// Computes a capacity score from 0 to 10 using a PD controller. +/// +/// The score has two components: +/// +/// - **P (proportional):** How much WAL capacity remains right now. An ingester with 100% free +/// capacity gets `PROPORTIONAL_WEIGHT` points; 50% gets half; and so on. If remaining capacity +/// drops to `MIN_PERMISSIBLE_CAPACITY` or below, the score is immediately 0. +/// +/// - **D (derivative):** Up to `DERIVATIVE_WEIGHT` bonus points based on how fast remaining +/// capacity is changing over the lookback window. A higher drain rate is worse, so we invert it: +/// `drain / MAX_DRAIN_RATE` normalizes the drain to a 0–1 penalty, and subtracting from 1 +/// converts it into a 0–1 bonus. Multiplied by `DERIVATIVE_WEIGHT`, a stable node gets the full +/// bonus and a node draining at `MAX_DRAIN_RATE` or faster gets nothing. +/// +/// Putting it together: a completely idle ingester scores 10 (8 + 2). +/// One that is full but stable scores ~2. One that is draining rapidly scores less. +/// A score of 0 means the ingester is at or below minimum permissible capacity. +/// +/// Below this remaining capacity fraction, the score is immediately 0. +const MIN_PERMISSIBLE_CAPACITY: f64 = 0.05; +/// Weight of the proportional term (max points from P). +const PROPORTIONAL_WEIGHT: f64 = 8.0; +/// Weight of the derivative term (max points from D). +const DERIVATIVE_WEIGHT: f64 = 2.0; +/// The drain rate (as a fraction of total capacity over the lookback window) at which the +/// derivative penalty is fully applied. Drain rates beyond this are clamped. 
/// Below this remaining-capacity fraction, the score is immediately 0.
const MIN_PERMISSIBLE_CAPACITY: f64 = 0.05;
/// Weight of the proportional term (max points from P).
const PROPORTIONAL_WEIGHT: f64 = 8.0;
/// Weight of the derivative term (max points from D).
const DERIVATIVE_WEIGHT: f64 = 2.0;
/// The drain rate (as a fraction of total capacity over the lookback window) at which the
/// derivative penalty is fully applied. Drain rates beyond this are clamped.
const MAX_DRAIN_RATE: f64 = 0.10;

/// Computes a capacity score from 0 to 10 using a PD controller.
///
/// The score has two components:
///
/// - **P (proportional):** how much WAL capacity remains right now. An ingester with 100% free
///   capacity gets `PROPORTIONAL_WEIGHT` points; 50% gets half; and so on. If remaining capacity
///   drops to `MIN_PERMISSIBLE_CAPACITY` or below, the score is immediately 0.
///
/// - **D (derivative):** up to `DERIVATIVE_WEIGHT` bonus points based on how fast remaining
///   capacity is changing over the lookback window. A higher drain rate is worse, so it is
///   inverted: `drain / MAX_DRAIN_RATE` normalizes the drain to a 0-1 penalty, and subtracting
///   from 1 converts it into a 0-1 bonus. A stable node gets the full bonus; a node draining at
///   `MAX_DRAIN_RATE` or faster gets nothing.
///
/// Putting it together: a completely idle ingester scores 10 (8 + 2). One that is full but
/// stable scores ~2. One that is draining rapidly scores less. A score of 0 means the ingester
/// is at or below minimum permissible capacity.
fn compute_capacity_score(remaining_capacity: f64, capacity_delta: f64) -> usize {
    if remaining_capacity <= MIN_PERMISSIBLE_CAPACITY {
        return 0;
    }
    let p = PROPORTIONAL_WEIGHT * remaining_capacity;
    // A negative delta means capacity is draining; a positive (improving) delta
    // clamps to 0 and carries no penalty.
    let drain = (-capacity_delta).clamp(0.0, MAX_DRAIN_RATE);
    let d = DERIVATIVE_WEIGHT * (1.0 - drain / MAX_DRAIN_RATE);
    // Truncating cast: e.g. 2.4 scores 2. The clamp guarantees the result is in 0..=10.
    (p + d).clamp(0.0, 10.0) as usize
}
+ if *state.status_rx.borrow() != IngesterStatus::Ready { + return Ok(None); + } + + let guard = state + .lock_fully() + .await + .map_err(|_| anyhow::anyhow!("failed to acquire ingester state lock"))?; + let usage = guard.mrecordlog.resource_usage(); + let memory_used = ByteSize::b(usage.memory_used_bytes as u64); + let memory_allocated = ByteSize::b(usage.memory_allocated_bytes as u64); + let open_shard_counts = guard.get_open_shard_counts(); + + Ok(Some((memory_used, memory_allocated, open_shard_counts))) + } + + async fn run(&mut self) { + let mut interval = tokio::time::interval(BROADCAST_INTERVAL_PERIOD); + let mut previous_sources: BTreeSet = BTreeSet::new(); + + loop { + interval.tick().await; + + let (memory_used, memory_allocated, open_shard_counts) = match self.snapshot().await { + Ok(Some(snapshot)) => snapshot, + Ok(None) => continue, + Err(error) => { + info!("stopping ingester capacity broadcast: {error}"); + return; + } + }; + + self.wal_capacity_time_series + .record(memory_used, memory_allocated); + + let remaining_capacity = self.wal_capacity_time_series.current().unwrap_or(1.0); + let capacity_delta = self.wal_capacity_time_series.delta().unwrap_or(0.0); + let capacity_score = compute_capacity_score(remaining_capacity, capacity_delta); + + previous_sources = self + .broadcast_capacity(capacity_score, &open_shard_counts, &previous_sources) + .await; + } + } + + async fn broadcast_capacity( + &self, + capacity_score: usize, + open_shard_counts: &OpenShardCounts, + previous_sources: &BTreeSet, + ) -> BTreeSet { + let mut current_sources = BTreeSet::new(); + + for (index_uid, source_id, open_shard_count) in open_shard_counts { + let source_uid = SourceUid { + index_uid: index_uid.clone(), + source_id: source_id.clone(), + }; + let key = make_key(INGESTER_CAPACITY_SCORE_PREFIX, &source_uid); + let capacity = IngesterCapacityScore { + capacity_score, + open_shard_count: *open_shard_count, + }; + let value = serde_json::to_string(&capacity) + 
.expect("`IngesterCapacityScore` should be JSON serializable"); + self.cluster.set_self_key_value(key, value).await; + current_sources.insert(source_uid); + } + + for removed_source in previous_sources.difference(¤t_sources) { + let key = make_key(INGESTER_CAPACITY_SCORE_PREFIX, removed_source); + self.cluster.remove_self_key(&key).await; + } + + current_sources + } +} + +#[derive(Debug, Clone)] +pub struct IngesterCapacityScoreUpdate { + pub node_id: NodeId, + pub source_uid: SourceUid, + pub capacity_score: usize, + pub open_shard_count: usize, +} + +impl Event for IngesterCapacityScoreUpdate {} + +pub async fn setup_ingester_capacity_update_listener( + cluster: Cluster, + event_broker: EventBroker, +) -> ListenerHandle { + cluster + .subscribe(INGESTER_CAPACITY_SCORE_PREFIX, move |event| { + let Some(source_uid) = parse_key(event.key) else { + warn!("failed to parse source UID from key `{}`", event.key); + return; + }; + let Ok(ingester_capacity) = serde_json::from_str::(event.value) + else { + warn!("failed to parse ingester capacity `{}`", event.value); + return; + }; + let node_id: NodeId = event.node.node_id.clone().into(); + event_broker.publish(IngesterCapacityScoreUpdate { + node_id, + source_uid, + capacity_score: ingester_capacity.capacity_score, + open_shard_count: ingester_capacity.open_shard_count, + }); + }) + .await +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + use std::sync::atomic::{AtomicUsize, Ordering}; + + use quickwit_cluster::{ChannelTransport, create_cluster_for_test}; + use quickwit_proto::types::ShardId; + + use super::*; + use crate::ingest_v2::models::IngesterShard; + use crate::ingest_v2::state::IngesterState; + + fn ts() -> WalMemoryCapacityTimeSeries { + WalMemoryCapacityTimeSeries::new() + } + + /// Helper: record a reading with `used` out of `allocated` bytes. 
+ fn record(series: &mut WalMemoryCapacityTimeSeries, used: u64, allocated: u64) { + series.record(ByteSize::b(used), ByteSize::b(allocated)); + } + + #[test] + fn test_wal_memory_capacity_current_after_record() { + let mut series = ts(); + // 192 of 256 used => 25% remaining + record(&mut series, 192, 256); + assert_eq!(series.current(), Some(0.25)); + + // 16 of 256 used => 93.75% remaining + record(&mut series, 16, 256); + assert_eq!(series.current(), Some(0.9375)); + } + + #[test] + fn test_wal_memory_capacity_record_saturates_at_zero() { + let mut series = ts(); + record(&mut series, 200, 100); + assert_eq!(series.current(), Some(0.0)); + } + + #[test] + fn test_wal_memory_capacity_delta_growing() { + let mut series = ts(); + // oldest: 60 of 100 used => 40% remaining + record(&mut series, 60, 100); + // current: 20 of 100 used => 80% remaining + record(&mut series, 20, 100); + // delta = 0.80 - 0.40 = 0.40 + assert_eq!(series.delta(), Some(0.40)); + } + + #[test] + fn test_wal_memory_capacity_delta_shrinking() { + let mut series = ts(); + // oldest: 20 of 100 used => 80% remaining + record(&mut series, 20, 100); + // current: 60 of 100 used => 40% remaining + record(&mut series, 60, 100); + // delta = 0.40 - 0.80 = -0.40 + assert_eq!(series.delta(), Some(-0.40)); + } + + #[test] + fn test_capacity_score_draining_vs_stable() { + // Node A: capacity draining — usage increases 10, 20, ..., 70 over 7 ticks. + let mut node_a = ts(); + for used in (10..=70).step_by(10) { + record(&mut node_a, used, 100); + } + let a_remaining = node_a.current().unwrap(); + let a_delta = node_a.delta().unwrap(); + let a_score = compute_capacity_score(a_remaining, a_delta); + + // Node B: steady at 50% usage over 7 ticks. 
+ let mut node_b = ts(); + for _ in 0..7 { + record(&mut node_b, 50, 100); + } + let b_remaining = node_b.current().unwrap(); + let b_delta = node_b.delta().unwrap(); + let b_score = compute_capacity_score(b_remaining, b_delta); + + // p=2.4, d=0 (max drain) => 2 + assert_eq!(a_score, 2); + // p=4, d=2 (stable) => 6 + assert_eq!(b_score, 6); + assert!(b_score > a_score); + } + + #[tokio::test] + async fn test_snapshot_state_dropped() { + let transport = ChannelTransport::default(); + let cluster = create_cluster_for_test(Vec::new(), &["indexer"], &transport, true) + .await + .unwrap(); + let (_temp_dir, state) = IngesterState::for_test().await; + let weak_state = state.weak(); + drop(state); + + let task = BroadcastIngesterCapacityScoreTask { + cluster, + weak_state, + wal_capacity_time_series: WalMemoryCapacityTimeSeries::new(), + }; + assert!(task.snapshot().await.is_err()); + } + + #[tokio::test] + async fn test_broadcast_ingester_capacity() { + let transport = ChannelTransport::default(); + let cluster = create_cluster_for_test(Vec::new(), &["indexer"], &transport, true) + .await + .unwrap(); + let event_broker = EventBroker::default(); + + let (_temp_dir, state) = IngesterState::for_test().await; + let index_uid = IndexUid::for_test("test-index", 0); + let mut state_guard = state.lock_partially().await.unwrap(); + let shard = IngesterShard::new_solo( + index_uid.clone(), + SourceId::from("test-source"), + ShardId::from(0), + ) + .advertisable() + .build(); + state_guard.shards.insert(shard.queue_id(), shard); + let open_shard_counts = state_guard.get_open_shard_counts(); + drop(state_guard); + + // Simulate 500 of 1000 bytes used => 50% remaining, 0 delta => score = 6 + let mut task = BroadcastIngesterCapacityScoreTask { + cluster: cluster.clone(), + weak_state: state.weak(), + wal_capacity_time_series: WalMemoryCapacityTimeSeries::new(), + }; + task.wal_capacity_time_series + .record(ByteSize::b(500), ByteSize::b(1000)); + + let remaining = 
task.wal_capacity_time_series.current().unwrap(); + let delta = task.wal_capacity_time_series.delta().unwrap(); + let capacity_score = compute_capacity_score(remaining, delta); + assert_eq!(capacity_score, 6); + + let update_counter = Arc::new(AtomicUsize::new(0)); + let update_counter_clone = update_counter.clone(); + let index_uid_clone = index_uid.clone(); + let _sub = event_broker.subscribe(move |event: IngesterCapacityScoreUpdate| { + update_counter_clone.fetch_add(1, Ordering::Release); + assert_eq!(event.source_uid.index_uid, index_uid_clone); + assert_eq!(event.source_uid.source_id, "test-source"); + assert_eq!(event.capacity_score, 6); + assert_eq!(event.open_shard_count, 1); + }); + + let _listener = + setup_ingester_capacity_update_listener(cluster.clone(), event_broker).await; + + let previous_sources = BTreeSet::new(); + task.broadcast_capacity(capacity_score, &open_shard_counts, &previous_sources) + .await; + tokio::time::sleep(BROADCAST_INTERVAL_PERIOD * 2).await; + + assert_eq!(update_counter.load(Ordering::Acquire), 1); + + let source_uid = SourceUid { + index_uid: index_uid.clone(), + source_id: SourceId::from("test-source"), + }; + let key = make_key(INGESTER_CAPACITY_SCORE_PREFIX, &source_uid); + let value = cluster.get_self_key_value(&key).await.unwrap(); + let deserialized: IngesterCapacityScore = serde_json::from_str(&value).unwrap(); + assert_eq!(deserialized.capacity_score, 6); + assert_eq!(deserialized.open_shard_count, 1); + } + + #[test] + fn test_wal_memory_capacity_delta_spans_lookback_window() { + let mut series = ts(); + + // Fill to exactly the lookback window length (6 readings), all same value. + for _ in 0..WAL_CAPACITY_LOOKBACK_WINDOW_LEN { + record(&mut series, 50, 100); + } + assert_eq!(series.delta(), Some(0.0)); + + // 7th reading fills the ring buffer. Delta spans 6 intervals. + record(&mut series, 0, 100); + assert_eq!(series.delta(), Some(0.50)); + + // 8th reading evicts the oldest 50-remaining. 
Delta still spans 6 intervals. + record(&mut series, 0, 100); + assert_eq!(series.delta(), Some(0.50)); + } +} diff --git a/quickwit/quickwit-ingest/src/ingest_v2/broadcast.rs b/quickwit/quickwit-ingest/src/ingest_v2/broadcast/local_shards.rs similarity index 91% rename from quickwit/quickwit-ingest/src/ingest_v2/broadcast.rs rename to quickwit/quickwit-ingest/src/ingest_v2/broadcast/local_shards.rs index 9bbbe94bb47..6ba10915f56 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/broadcast.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/broadcast/local_shards.rs @@ -18,6 +18,7 @@ use std::time::Duration; use bytesize::ByteSize; use quickwit_cluster::{Cluster, ListenerHandle}; use quickwit_common::pubsub::{Event, EventBroker}; +use quickwit_common::ring_buffer::RingBuffer; use quickwit_common::shared_consts::INGESTER_PRIMARY_SHARDS_PREFIX; use quickwit_common::sorted_iter::{KeyDiff, SortedByKeyIterator}; use quickwit_common::tower::{ConstantRate, Rate}; @@ -27,15 +28,10 @@ use serde::{Deserialize, Serialize, Serializer}; use tokio::task::JoinHandle; use tracing::{debug, warn}; -use super::metrics::INGEST_V2_METRICS; -use super::state::WeakIngesterState; +use super::{BROADCAST_INTERVAL_PERIOD, make_key, parse_key}; use crate::RateMibPerSec; - -const BROADCAST_INTERVAL_PERIOD: Duration = if cfg!(test) { - Duration::from_millis(50) -} else { - Duration::from_secs(5) -}; +use crate::ingest_v2::metrics::INGEST_V2_METRICS; +use crate::ingest_v2::state::WeakIngesterState; const ONE_MIB: ByteSize = ByteSize::mib(1); @@ -152,7 +148,7 @@ impl LocalShardsSnapshot { /// Takes a snapshot of the primary shards hosted by the ingester at regular intervals and /// broadcasts it to other nodes via Chitchat. 
-pub(super) struct BroadcastLocalShardsTask { +pub struct BroadcastLocalShardsTask { cluster: Cluster, weak_state: WeakIngesterState, shard_throughput_time_series_map: ShardThroughputTimeSeriesMap, @@ -229,36 +225,24 @@ impl ShardThroughputTimeSeriesMap { #[derive(Default)] struct ShardThroughputTimeSeries { shard_state: ShardState, - measurements: [ByteSize; SHARD_THROUGHPUT_LONG_TERM_WINDOW_LEN], - len: usize, + throughput: RingBuffer, } impl ShardThroughputTimeSeries { fn last(&self) -> ByteSize { - self.measurements.last().copied().unwrap_or_default() + self.throughput.last().unwrap_or_default() } fn average(&self) -> ByteSize { - if self.len == 0 { + if self.throughput.is_empty() { return ByteSize::default(); } - let sum = self - .measurements - .iter() - .rev() - .take(self.len) - .map(ByteSize::as_u64) - .sum::(); - ByteSize::b(sum / self.len as u64) + let sum = self.throughput.iter().map(ByteSize::as_u64).sum::(); + ByteSize::b(sum / self.throughput.len() as u64) } fn record(&mut self, new_throughput_measurement: ByteSize) { - self.len = (self.len + 1).min(SHARD_THROUGHPUT_LONG_TERM_WINDOW_LEN); - self.measurements.rotate_left(1); - let Some(last_measurement) = self.measurements.last_mut() else { - return; - }; - *last_measurement = new_throughput_measurement; + self.throughput.push_back(new_throughput_measurement); } } @@ -338,13 +322,13 @@ impl BroadcastLocalShardsTask { source_uid, shard_infos, } => { - let key = make_key(source_uid); + let key = make_key(INGESTER_PRIMARY_SHARDS_PREFIX, source_uid); let value = serde_json::to_string(&shard_infos) .expect("`ShardInfos` should be JSON serializable"); self.cluster.set_self_key_value(key, value).await; } ShardInfosChange::Removed { source_uid } => { - let key = make_key(source_uid); + let key = make_key(INGESTER_PRIMARY_SHARDS_PREFIX, source_uid); self.cluster.remove_self_key(&key).await; } } @@ -371,22 +355,6 @@ impl BroadcastLocalShardsTask { } } -fn make_key(source_uid: &SourceUid) -> String { - format!( 
- "{INGESTER_PRIMARY_SHARDS_PREFIX}{}:{}", - source_uid.index_uid, source_uid.source_id - ) -} - -fn parse_key(key: &str) -> Option { - let (index_uid_str, source_id_str) = key.rsplit_once(':')?; - - Some(SourceUid { - index_uid: index_uid_str.parse().ok()?, - source_id: source_id_str.to_string(), - }) -} - #[derive(Debug, Clone)] pub struct LocalShardsUpdate { pub leader_id: NodeId, @@ -429,10 +397,12 @@ mod tests { use std::sync::atomic::{AtomicUsize, Ordering}; use quickwit_cluster::{ChannelTransport, create_cluster_for_test}; + use quickwit_common::shared_consts::INGESTER_PRIMARY_SHARDS_PREFIX; use quickwit_proto::ingest::ShardState; - use quickwit_proto::types::{IndexUid, SourceId}; + use quickwit_proto::types::{IndexUid, NodeId, ShardId, SourceId, SourceUid}; use super::*; + use crate::RateMibPerSec; use crate::ingest_v2::models::IngesterShard; use crate::ingest_v2::state::IngesterState; @@ -626,30 +596,6 @@ mod tests { assert!(value_opt.is_none()); } - #[test] - fn test_make_key() { - let source_uid = SourceUid { - index_uid: IndexUid::for_test("test-index", 0), - source_id: SourceId::from("test-source"), - }; - let key = make_key(&source_uid); - assert_eq!( - key, - "ingester.primary_shards:test-index:00000000000000000000000000:test-source" - ); - } - - #[test] - fn test_parse_key() { - let key = "test-index:00000000000000000000000000:test-source"; - let source_uid = parse_key(key).unwrap(); - assert_eq!( - &source_uid.index_uid.to_string(), - "test-index:00000000000000000000000000" - ); - assert_eq!(source_uid.source_id, "test-source".to_string()); - } - #[tokio::test] async fn test_local_shards_update_listener() { let transport = ChannelTransport::default(); @@ -686,7 +632,7 @@ mod tests { index_uid: index_uid.clone(), source_id: SourceId::from("test-source"), }; - let key = make_key(&source_uid); + let key = make_key(INGESTER_PRIMARY_SHARDS_PREFIX, &source_uid); let value = serde_json::to_string(&vec![ShardInfo { shard_id: ShardId::from(1), shard_state: 
ShardState::Open, diff --git a/quickwit/quickwit-ingest/src/ingest_v2/broadcast/mod.rs b/quickwit/quickwit-ingest/src/ingest_v2/broadcast/mod.rs new file mode 100644 index 00000000000..d2184a0e392 --- /dev/null +++ b/quickwit/quickwit-ingest/src/ingest_v2/broadcast/mod.rs @@ -0,0 +1,76 @@ +// Copyright 2021-Present Datadog, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#[allow(dead_code)] +mod ingester_capacity_score; +mod local_shards; + +use std::time::Duration; + +use quickwit_proto::types::SourceUid; + +pub(in crate::ingest_v2) const BROADCAST_INTERVAL_PERIOD: Duration = if cfg!(test) { + Duration::from_millis(50) +} else { + Duration::from_secs(5) +}; + +pub use local_shards::{ + BroadcastLocalShardsTask, LocalShardsUpdate, ShardInfo, ShardInfos, + setup_local_shards_update_listener, +}; + +fn make_key(prefix: &str, source_uid: &SourceUid) -> String { + format!("{prefix}{}:{}", source_uid.index_uid, source_uid.source_id) +} + +fn parse_key(key: &str) -> Option { + let (index_uid_str, source_id_str) = key.rsplit_once(':')?; + Some(SourceUid { + index_uid: index_uid_str.parse().ok()?, + source_id: source_id_str.to_string(), + }) +} + +#[cfg(test)] +mod tests { + use quickwit_common::shared_consts::INGESTER_PRIMARY_SHARDS_PREFIX; + use quickwit_proto::types::{IndexUid, SourceId, SourceUid}; + + use super::*; + + #[test] + fn test_make_key() { + let source_uid = SourceUid { + index_uid: IndexUid::for_test("test-index", 0), + source_id: 
SourceId::from("test-source"), + }; + let key = make_key(INGESTER_PRIMARY_SHARDS_PREFIX, &source_uid); + assert_eq!( + key, + "ingester.primary_shards:test-index:00000000000000000000000000:test-source" + ); + } + + #[test] + fn test_parse_key() { + let key = "test-index:00000000000000000000000000:test-source"; + let source_uid = parse_key(key).unwrap(); + assert_eq!( + &source_uid.index_uid.to_string(), + "test-index:00000000000000000000000000" + ); + assert_eq!(source_uid.source_id, "test-source".to_string()); + } +} diff --git a/quickwit/quickwit-ingest/src/ingest_v2/state.rs b/quickwit/quickwit-ingest/src/ingest_v2/state.rs index 591ef4f704f..bf1c648c6cb 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/state.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/state.rs @@ -19,6 +19,7 @@ use std::path::Path; use std::sync::{Arc, Weak}; use std::time::{Duration, Instant}; +use itertools::Itertools; use mrecordlog::error::{DeleteQueueError, TruncateError}; use quickwit_common::pretty::PrettyDisplay; use quickwit_common::rate_limiter::{RateLimiter, RateLimiterSettings}; @@ -87,6 +88,17 @@ impl InnerIngesterState { .max_by_key(|(available_permits, _)| *available_permits) .map(|(_, shard)| shard) } + + pub fn get_open_shard_counts(&self) -> Vec<(IndexUid, SourceId, usize)> { + self.shards + .values() + .filter(|shard| shard.is_advertisable && !shard.is_replica() && shard.is_open()) + .map(|shard| (shard.index_uid.clone(), shard.source_id.clone())) + .counts() + .into_iter() + .map(|((index_uid, source_id), count)| (index_uid, source_id, count)) + .collect() + } } impl IngesterState { @@ -467,7 +479,7 @@ impl WeakIngesterState { #[cfg(test)] mod tests { use bytesize::ByteSize; - use quickwit_proto::types::ShardId; + use quickwit_proto::types::{NodeId, ShardId, SourceId}; use tokio::time::timeout; use super::*; @@ -642,4 +654,77 @@ mod tests { locked_state.find_most_capacity_shard_mut(&index_uid, &SourceId::from("other-source")); assert!(shard_opt.is_none()); } + + fn 
open_shard( + index_uid: IndexUid, + source_id: SourceId, + shard_id: ShardId, + is_replica: bool, + ) -> IngesterShard { + let builder = if is_replica { + IngesterShard::new_replica(index_uid, source_id, shard_id, NodeId::from("test-leader")) + } else { + IngesterShard::new_solo(index_uid, source_id, shard_id) + }; + builder.advertisable().build() + } + + #[tokio::test] + async fn test_get_open_shard_counts() { + let (_temp_dir, state) = IngesterState::for_test().await; + let mut state_guard = state.lock_partially().await.unwrap(); + + let index_a = IndexUid::for_test("index-a", 0); + let index_b = IndexUid::for_test("index-b", 0); + let index_c = IndexUid::for_test("index-c", 0); + + // (index-a, source-a): 1 open solo shard. + let s = open_shard( + index_a.clone(), + SourceId::from("source-a"), + ShardId::from(1), + false, + ); + state_guard.shards.insert(s.queue_id(), s); + + // (index-b, source-b): 1 open solo + 1 replica. Only the solo should be counted. + let s = open_shard( + index_b.clone(), + SourceId::from("source-b"), + ShardId::from(2), + false, + ); + state_guard.shards.insert(s.queue_id(), s); + let s = open_shard( + index_b.clone(), + SourceId::from("source-b"), + ShardId::from(3), + true, + ); + state_guard.shards.insert(s.queue_id(), s); + + // (index-c, source-c): 2 open solo shards. + let s = open_shard( + index_c.clone(), + SourceId::from("source-c"), + ShardId::from(4), + false, + ); + state_guard.shards.insert(s.queue_id(), s); + let s = open_shard( + index_c.clone(), + SourceId::from("source-c"), + ShardId::from(5), + false, + ); + state_guard.shards.insert(s.queue_id(), s); + + let mut counts = state_guard.get_open_shard_counts(); + counts.sort_by(|a, b| a.0.cmp(&b.0)); + + assert_eq!(counts.len(), 3); + assert_eq!(counts[0], (index_a, SourceId::from("source-a"), 1)); + assert_eq!(counts[1], (index_b, SourceId::from("source-b"), 1)); + assert_eq!(counts[2], (index_c, SourceId::from("source-c"), 2)); + } }