Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions quickwit/quickwit-ingest/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ bytesize = { workspace = true }
fail = { workspace = true, optional = true }
futures = { workspace = true }
http = { workspace = true }
indexmap = { workspace = true }
itertools = { workspace = true }
mockall = { workspace = true, optional = true }
mrecordlog = { workspace = true }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,23 +42,22 @@ const WAL_CAPACITY_LOOKBACK_WINDOW_LEN: usize = 6;
const WAL_CAPACITY_READINGS_LEN: usize = WAL_CAPACITY_LOOKBACK_WINDOW_LEN + 1;

struct WalMemoryCapacityTimeSeries {
memory_capacity: ByteSize,
readings: RingBuffer<f64, WAL_CAPACITY_READINGS_LEN>,
}

impl WalMemoryCapacityTimeSeries {
fn new() -> Self {
fn new(memory_capacity: ByteSize) -> Self {
#[cfg(not(test))]
assert!(memory_capacity.as_u64() > 0);
Self {
memory_capacity,
readings: RingBuffer::default(),
}
}

fn record(&mut self, memory_used: ByteSize, memory_allocated: ByteSize) {
let allocated = memory_allocated.as_u64();
if allocated == 0 {
self.readings.push_back(1.0);
return;
}
let remaining = 1.0 - (memory_used.as_u64() as f64 / allocated as f64);
fn record(&mut self, memory_used: ByteSize) {
let remaining = 1.0 - (memory_used.as_u64() as f64 / self.memory_capacity.as_u64() as f64);
self.readings.push_back(remaining.clamp(0.0, 1.0));
}

Expand Down Expand Up @@ -121,23 +120,27 @@ pub struct IngesterCapacityScore {

/// Periodically snapshots the ingester's WAL memory usage and open shard counts, computes
/// a capacity score, and broadcasts it to other nodes via Chitchat.
pub(crate) struct BroadcastIngesterCapacityScoreTask {
pub struct BroadcastIngesterCapacityScoreTask {
cluster: Cluster,
weak_state: WeakIngesterState,
wal_capacity_time_series: WalMemoryCapacityTimeSeries,
}

impl BroadcastIngesterCapacityScoreTask {
pub fn spawn(cluster: Cluster, weak_state: WeakIngesterState) -> JoinHandle<()> {
pub fn spawn(
cluster: Cluster,
weak_state: WeakIngesterState,
memory_capacity: ByteSize,
) -> JoinHandle<()> {
let mut broadcaster = Self {
cluster,
weak_state,
wal_capacity_time_series: WalMemoryCapacityTimeSeries::new(),
wal_capacity_time_series: WalMemoryCapacityTimeSeries::new(memory_capacity),
};
tokio::spawn(async move { broadcaster.run().await })
}

async fn snapshot(&self) -> Result<Option<(ByteSize, ByteSize, OpenShardCounts)>> {
async fn snapshot(&self) -> Result<Option<(ByteSize, OpenShardCounts)>> {
let state = self
.weak_state
.upgrade()
Expand All @@ -155,10 +158,9 @@ impl BroadcastIngesterCapacityScoreTask {
.map_err(|_| anyhow::anyhow!("failed to acquire ingester state lock"))?;
let usage = guard.mrecordlog.resource_usage();
let memory_used = ByteSize::b(usage.memory_used_bytes as u64);
let memory_allocated = ByteSize::b(usage.memory_allocated_bytes as u64);
let open_shard_counts = guard.get_open_shard_counts();

Ok(Some((memory_used, memory_allocated, open_shard_counts)))
Ok(Some((memory_used, open_shard_counts)))
}

async fn run(&mut self) {
Expand All @@ -168,7 +170,7 @@ impl BroadcastIngesterCapacityScoreTask {
loop {
interval.tick().await;

let (memory_used, memory_allocated, open_shard_counts) = match self.snapshot().await {
let (memory_used, open_shard_counts) = match self.snapshot().await {
Ok(Some(snapshot)) => snapshot,
Ok(None) => continue,
Err(error) => {
Expand All @@ -177,8 +179,7 @@ impl BroadcastIngesterCapacityScoreTask {
}
};

self.wal_capacity_time_series
.record(memory_used, memory_allocated);
self.wal_capacity_time_series.record(memory_used);

let remaining_capacity = self.wal_capacity_time_series.current().unwrap_or(1.0);
let capacity_delta = self.wal_capacity_time_series.delta().unwrap_or(0.0);
Expand Down Expand Up @@ -272,40 +273,41 @@ mod tests {
use crate::ingest_v2::state::IngesterState;

fn ts() -> WalMemoryCapacityTimeSeries {
WalMemoryCapacityTimeSeries::new()
WalMemoryCapacityTimeSeries::new(ByteSize::b(100))
}

/// Helper: record a reading with `used` out of `allocated` bytes.
fn record(series: &mut WalMemoryCapacityTimeSeries, used: u64, allocated: u64) {
series.record(ByteSize::b(used), ByteSize::b(allocated));
/// Helper: record a reading with `used` bytes against the series' fixed capacity.
fn record(series: &mut WalMemoryCapacityTimeSeries, used: u64) {
series.record(ByteSize::b(used));
}

#[test]
fn test_wal_memory_capacity_current_after_record() {
let mut series = ts();
let mut series = WalMemoryCapacityTimeSeries::new(ByteSize::b(256));
// 192 of 256 used => 25% remaining
record(&mut series, 192, 256);
series.record(ByteSize::b(192));
assert_eq!(series.current(), Some(0.25));

// 16 of 256 used => 93.75% remaining
record(&mut series, 16, 256);
series.record(ByteSize::b(16));
assert_eq!(series.current(), Some(0.9375));
}

#[test]
fn test_wal_memory_capacity_record_saturates_at_zero() {
let mut series = ts();
record(&mut series, 200, 100);
// 200 used out of 100 capacity => clamped to 0.0
record(&mut series, 200);
assert_eq!(series.current(), Some(0.0));
}

#[test]
fn test_wal_memory_capacity_delta_growing() {
let mut series = ts();
// oldest: 60 of 100 used => 40% remaining
record(&mut series, 60, 100);
record(&mut series, 60);
// current: 20 of 100 used => 80% remaining
record(&mut series, 20, 100);
record(&mut series, 20);
// delta = 0.80 - 0.40 = 0.40
assert_eq!(series.delta(), Some(0.40));
}
Expand All @@ -314,9 +316,9 @@ mod tests {
fn test_wal_memory_capacity_delta_shrinking() {
let mut series = ts();
// oldest: 20 of 100 used => 80% remaining
record(&mut series, 20, 100);
record(&mut series, 20);
// current: 60 of 100 used => 40% remaining
record(&mut series, 60, 100);
record(&mut series, 60);
// delta = 0.40 - 0.80 = -0.40
assert_eq!(series.delta(), Some(-0.40));
}
Expand All @@ -326,7 +328,7 @@ mod tests {
// Node A: capacity draining — usage increases 10, 20, ..., 70 over 7 ticks.
let mut node_a = ts();
for used in (10..=70).step_by(10) {
record(&mut node_a, used, 100);
record(&mut node_a, used);
}
let a_remaining = node_a.current().unwrap();
let a_delta = node_a.delta().unwrap();
Expand All @@ -335,7 +337,7 @@ mod tests {
// Node B: steady at 50% usage over 7 ticks.
let mut node_b = ts();
for _ in 0..7 {
record(&mut node_b, 50, 100);
record(&mut node_b, 50);
}
let b_remaining = node_b.current().unwrap();
let b_delta = node_b.delta().unwrap();
Expand All @@ -361,7 +363,7 @@ mod tests {
let task = BroadcastIngesterCapacityScoreTask {
cluster,
weak_state,
wal_capacity_time_series: WalMemoryCapacityTimeSeries::new(),
wal_capacity_time_series: WalMemoryCapacityTimeSeries::new(ByteSize::mb(1)),
};
assert!(task.snapshot().await.is_err());
}
Expand All @@ -388,14 +390,13 @@ mod tests {
let open_shard_counts = state_guard.get_open_shard_counts();
drop(state_guard);

// Simulate 500 of 1000 bytes used => 50% remaining, 0 delta => score = 6
// Simulate 500 of 1000 bytes capacity used => 50% remaining, 0 delta => score = 6
let mut task = BroadcastIngesterCapacityScoreTask {
cluster: cluster.clone(),
weak_state: state.weak(),
wal_capacity_time_series: WalMemoryCapacityTimeSeries::new(),
wal_capacity_time_series: WalMemoryCapacityTimeSeries::new(ByteSize::b(1000)),
};
task.wal_capacity_time_series
.record(ByteSize::b(500), ByteSize::b(1000));
task.wal_capacity_time_series.record(ByteSize::b(500));

let remaining = task.wal_capacity_time_series.current().unwrap();
let delta = task.wal_capacity_time_series.delta().unwrap();
Expand Down Expand Up @@ -440,16 +441,16 @@ mod tests {

// Fill to exactly the lookback window length (6 readings), all same value.
for _ in 0..WAL_CAPACITY_LOOKBACK_WINDOW_LEN {
record(&mut series, 50, 100);
record(&mut series, 50);
}
assert_eq!(series.delta(), Some(0.0));

// 7th reading fills the ring buffer. Delta spans 6 intervals.
record(&mut series, 0, 100);
record(&mut series, 0);
assert_eq!(series.delta(), Some(0.50));

// 8th reading evicts the oldest 50-remaining. Delta still spans 6 intervals.
record(&mut series, 0, 100);
record(&mut series, 0);
assert_eq!(series.delta(), Some(0.50));
}
}
4 changes: 4 additions & 0 deletions quickwit/quickwit-ingest/src/ingest_v2/broadcast/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ pub(in crate::ingest_v2) const BROADCAST_INTERVAL_PERIOD: Duration = if cfg!(tes
Duration::from_secs(5)
};

pub use ingester_capacity_score::{
BroadcastIngesterCapacityScoreTask, IngesterCapacityScoreUpdate,
setup_ingester_capacity_update_listener,
};
pub use local_shards::{
BroadcastLocalShardsTask, LocalShardsUpdate, ShardInfo, ShardInfos,
setup_local_shards_update_listener,
Expand Down
5 changes: 3 additions & 2 deletions quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ use tokio::time::{sleep, timeout};
use tracing::{debug, error, info, warn};

use super::IngesterPool;
use super::broadcast::BroadcastLocalShardsTask;
use super::broadcast::{BroadcastIngesterCapacityScoreTask, BroadcastLocalShardsTask};
use super::doc_mapper::validate_doc_batch;
use super::fetch::FetchStreamTask;
use super::idle::CloseIdleShardsTask;
Expand Down Expand Up @@ -144,7 +144,8 @@ impl Ingester {
let state = IngesterState::load(wal_dir_path, rate_limiter_settings);

let weak_state = state.weak();
BroadcastLocalShardsTask::spawn(cluster, weak_state.clone());
BroadcastLocalShardsTask::spawn(cluster.clone(), weak_state.clone());
BroadcastIngesterCapacityScoreTask::spawn(cluster, weak_state.clone(), memory_capacity);
CloseIdleShardsTask::spawn(weak_state, idle_shard_timeout);

let ingester = Self {
Expand Down
7 changes: 6 additions & 1 deletion quickwit/quickwit-ingest/src/ingest_v2/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ mod metrics;
mod models;
mod mrecord;
mod mrecordlog_utils;
#[allow(dead_code)]
mod node_routing_table;
mod publish_tracker;
mod rate_meter;
mod replication;
Expand All @@ -36,7 +38,10 @@ use std::ops::{Add, AddAssign};
use std::time::Duration;
use std::{env, fmt};

pub use broadcast::{LocalShardsUpdate, ShardInfo, ShardInfos, setup_local_shards_update_listener};
pub use broadcast::{
LocalShardsUpdate, ShardInfo, ShardInfos, setup_ingester_capacity_update_listener,
setup_local_shards_update_listener,
};
use bytes::buf::Writer;
use bytes::{BufMut, BytesMut};
use bytesize::ByteSize;
Expand Down
Loading
Loading