Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions quickwit/quickwit-ingest/src/ingest_v2/debouncing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use std::sync::Arc;
use quickwit_proto::control_plane::{
GetOrCreateOpenShardsRequest, GetOrCreateOpenShardsSubrequest,
};
use quickwit_proto::ingest::ShardIds;
use quickwit_proto::types::{IndexId, SourceId};
use tokio::sync::{OwnedRwLockWriteGuard, RwLock};

Expand Down Expand Up @@ -69,7 +68,6 @@ impl GetOrCreateOpenShardsRequestDebouncer {
#[derive(Default)]
pub(super) struct DebouncedGetOrCreateOpenShardsRequest {
subrequests: Vec<GetOrCreateOpenShardsSubrequest>,
pub closed_shards: Vec<ShardIds>,
pub unavailable_leaders: Vec<String>,
rendezvous: Rendezvous,
}
Expand All @@ -85,8 +83,8 @@ impl DebouncedGetOrCreateOpenShardsRequest {
}
let request = GetOrCreateOpenShardsRequest {
subrequests: self.subrequests,
closed_shards: self.closed_shards,
unavailable_leaders: self.unavailable_leaders,
..Default::default()
};
(Some(request), self.rendezvous)
}
Expand Down
46 changes: 17 additions & 29 deletions quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,7 @@ use quickwit_proto::control_plane::{
AdviseResetShardsRequest, ControlPlaneService, ControlPlaneServiceClient,
};
use quickwit_proto::indexing::ShardPositionsUpdate;
use quickwit_proto::ingest::ingester::{
AckReplicationMessage, CloseShardsRequest, CloseShardsResponse, DecommissionRequest,
DecommissionResponse, FetchMessage, IngesterService, IngesterServiceClient,
IngesterServiceStream, IngesterStatus, InitShardFailure, InitShardSuccess, InitShardsRequest,
InitShardsResponse, ObservationMessage, OpenFetchStreamRequest, OpenObservationStreamRequest,
OpenReplicationStreamRequest, OpenReplicationStreamResponse, PersistFailure,
PersistFailureReason, PersistRequest, PersistResponse, PersistSuccess, ReplicateFailureReason,
ReplicateSubrequest, RetainShardsForSource, RetainShardsRequest, RetainShardsResponse,
SynReplicationMessage, TruncateShardsRequest, TruncateShardsResponse,
};
use quickwit_proto::ingest::ingester::*;
use quickwit_proto::ingest::{
CommitTypeV2, DocBatchV2, IngestV2Error, IngestV2Result, ParseFailure, Shard, ShardIds,
};
Expand Down Expand Up @@ -469,7 +460,7 @@ impl Ingester {
index_uid: subrequest.index_uid,
source_id: subrequest.source_id,
shard_id: subrequest.shard_id,
reason: PersistFailureReason::ShardClosed as i32,
reason: PersistFailureReason::NodeUnavailable as i32,
};
persist_failures.push(persist_failure);
}
Expand Down Expand Up @@ -499,7 +490,7 @@ impl Ingester {
index_uid: subrequest.index_uid,
source_id: subrequest.source_id,
shard_id: subrequest.shard_id,
reason: PersistFailureReason::ShardNotFound as i32,
reason: PersistFailureReason::NoShardsAvailable as i32,
};
persist_failures.push(persist_failure);
continue;
Expand Down Expand Up @@ -558,7 +549,7 @@ impl Ingester {
index_uid: subrequest.index_uid,
source_id: subrequest.source_id,
shard_id: Some(shard_id),
reason: PersistFailureReason::ShardRateLimited as i32,
reason: PersistFailureReason::NoShardsAvailable as i32,
};
persist_failures.push(persist_failure);
continue;
Expand Down Expand Up @@ -673,7 +664,7 @@ impl Ingester {
// TODO: Handle replication error:
// 1. Close and evict all the shards hosted by the follower.
// 2. Close and evict the replication client.
// 3. Return `PersistFailureReason::ShardClosed` to router.
// 3. Return `PersistFailureReason::NodeUnavailable` to router.
continue;
}
};
Expand All @@ -689,14 +680,8 @@ impl Ingester {
for replicate_failure in replicate_response.failures {
// TODO: If the replica shard is closed, close the primary shard if it is not
// already.
let persist_failure_reason = match replicate_failure.reason() {
ReplicateFailureReason::Unspecified => PersistFailureReason::Unspecified,
ReplicateFailureReason::ShardNotFound => {
PersistFailureReason::ShardNotFound
}
ReplicateFailureReason::ShardClosed => PersistFailureReason::ShardClosed,
ReplicateFailureReason::WalFull => PersistFailureReason::WalFull,
};
let persist_failure_reason: PersistFailureReason =
replicate_failure.reason().into();
let persist_failure = PersistFailure {
subrequest_id: replicate_failure.subrequest_id,
index_uid: replicate_failure.index_uid,
Expand Down Expand Up @@ -736,15 +721,15 @@ impl Ingester {
"failed to persist records to shard `{queue_id}`: {io_error}"
);
shards_to_close.insert(queue_id);
PersistFailureReason::ShardClosed
PersistFailureReason::NodeUnavailable
}
AppendDocBatchError::QueueNotFound(_) => {
error!(
"failed to persist records to shard `{queue_id}`: WAL queue \
not found"
);
shards_to_delete.insert(queue_id);
PersistFailureReason::ShardNotFound
PersistFailureReason::NodeUnavailable
}
};
let persist_failure = PersistFailure {
Expand Down Expand Up @@ -2159,7 +2144,7 @@ mod tests {
let persist_failure = &persist_response.failures[0];
assert_eq!(
persist_failure.reason(),
PersistFailureReason::ShardRateLimited
PersistFailureReason::NoShardsAvailable
);
}

Expand Down Expand Up @@ -2222,7 +2207,10 @@ mod tests {
assert_eq!(persist_failure.index_uid(), &index_uid);
assert_eq!(persist_failure.source_id, "test-source");
assert_eq!(persist_failure.shard_id(), ShardId::from(1));
assert_eq!(persist_failure.reason(), PersistFailureReason::ShardClosed,);
assert_eq!(
persist_failure.reason(),
PersistFailureReason::NodeUnavailable,
);

let state_guard = ingester.state.lock_fully().await.unwrap();
let shard = state_guard.shards.get(&queue_id).unwrap();
Expand Down Expand Up @@ -2274,7 +2262,7 @@ mod tests {
assert_eq!(persist_failure.shard_id(), ShardId::from(1));
assert_eq!(
persist_failure.reason(),
PersistFailureReason::ShardNotFound
PersistFailureReason::NodeUnavailable
);

let state_guard = ingester.state.lock_fully().await.unwrap();
Expand Down Expand Up @@ -2704,7 +2692,7 @@ mod tests {
assert_eq!(persist_failure.shard_id(), ShardId::from(1));
assert_eq!(
persist_failure.reason(),
PersistFailureReason::ShardNotFound
PersistFailureReason::NoShardsAvailable
);

let state_guard = ingester.state.lock_fully().await.unwrap();
Expand Down Expand Up @@ -2783,7 +2771,7 @@ mod tests {
assert_eq!(persist_failure.shard_id(), ShardId::from(1));
assert_eq!(
persist_failure.reason(),
PersistFailureReason::ShardRateLimited
PersistFailureReason::NoShardsAvailable
);

let state_guard = ingester.state.lock_fully().await.unwrap();
Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-ingest/src/ingest_v2/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ mod publish_tracker;
mod rate_meter;
mod replication;
mod router;
#[allow(dead_code)]
mod routing_table;
mod state;
mod workbench;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ pub(super) struct IngesterNode {

#[derive(Debug)]
pub(super) struct RoutingEntry {
nodes: HashMap<NodeId, IngesterNode>,
pub nodes: HashMap<NodeId, IngesterNode>,
}

/// Given a slice of candidates, picks the better of two random choices.
Expand Down
Loading
Loading