Skip to content

Commit ad71de4

Browse files
committed
CR comments
1 parent 6627ae7 commit ad71de4

6 files changed

Lines changed: 132 additions & 86 deletions

File tree

quickwit/Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

quickwit/quickwit-lambda-server/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ openssl = { workspace = true, optional = true }
3232

3333
quickwit-common = { workspace = true }
3434
quickwit-config = { workspace = true }
35+
quickwit-doc-mapper = { workspace = true }
3536
quickwit-proto = { workspace = true }
3637
quickwit-search = { workspace = true }
3738
quickwit-storage = { workspace = true }

quickwit/quickwit-lambda-server/src/handler.rs

Lines changed: 99 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,20 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::str::FromStr;
16+
use std::sync::Arc;
17+
1518
use base64::prelude::*;
1619
use prost::Message;
20+
use quickwit_common::uri::Uri;
21+
use quickwit_doc_mapper::DocMapper;
1722
use quickwit_proto::search::lambda_single_split_result::Outcome;
1823
use quickwit_proto::search::{
19-
LambdaSearchResponses, LambdaSingleSplitResult, LeafSearchRequest, SplitIdAndFooterOffsets,
24+
LambdaSearchResponses, LambdaSingleSplitResult, LeafRequestRef, LeafSearchRequest,
25+
SearchRequest,
2026
};
21-
use quickwit_search::leaf::multi_index_leaf_search;
27+
use quickwit_search::leaf::single_doc_mapping_leaf_search;
28+
use quickwit_storage::Storage;
2229
use serde::{Deserialize, Serialize};
2330
use tracing::{error, info, instrument, warn};
2431

@@ -50,57 +57,106 @@ pub async fn handle_leaf_search(
5057
ctx: &LambdaSearcherContext,
5158
) -> LambdaResult<LambdaSearchResponsePayload> {
5259
// Decode base64 payload
53-
let request_bytes = BASE64_STANDARD
60+
let request_bytes: Vec<u8> = BASE64_STANDARD
5461
.decode(&event.payload)
55-
.map_err(|e| LambdaError::Serialization(format!("base64 decode error: {}", e)))?;
62+
.map_err(|err| LambdaError::Serialization(format!("base64 decode error: {}", err)))?;
5663

5764
// Deserialize LeafSearchRequest
5865
let leaf_search_request = LeafSearchRequest::decode(&request_bytes[..])?;
5966

60-
let all_splits: Vec<(usize, SplitIdAndFooterOffsets)> =
61-
leaf_search_request
62-
.leaf_requests
63-
.iter()
64-
.enumerate()
65-
.flat_map(|(leaf_req_idx, leaf_request_ref)| {
66-
leaf_request_ref.split_offsets.iter().cloned().map(
67-
move |split_id_and_footer_offsets| (leaf_req_idx, split_id_and_footer_offsets),
68-
)
69-
})
70-
.collect();
67+
// Unpack the shared fields once instead of cloning per split.
68+
let search_request: Arc<SearchRequest> = leaf_search_request
69+
.search_request
70+
.ok_or_else(|| LambdaError::Internal("no search request".to_string()))?
71+
.into();
7172

72-
let num_splits = all_splits.len();
73-
info!(num_splits, "processing leaf search request (per-split)");
73+
let doc_mappers: Vec<Arc<DocMapper>> = leaf_search_request
74+
.doc_mappers
75+
.iter()
76+
.map(String::as_str)
77+
.map(serde_json::from_str::<Arc<DocMapper>>)
78+
.collect::<Result<Vec<_>, _>>()
79+
.map_err(|err| {
80+
LambdaError::Internal(format!("failed to deserialize doc mapper: `{err}`"))
81+
})?;
82+
83+
// Resolve storage for every index URI upfront.
84+
let mut storages: Vec<Arc<dyn quickwit_storage::Storage>> =
85+
Vec::with_capacity(leaf_search_request.index_uris.len());
86+
for uri_str in &leaf_search_request.index_uris {
87+
let uri = Uri::from_str(uri_str)
88+
.map_err(|err| LambdaError::Internal(format!("invalid index uri: {err}")))?;
89+
let storage =
90+
ctx.storage_resolver.resolve(&uri).await.map_err(|err| {
91+
LambdaError::Internal(format!("failed to resolve storage: {err}"))
92+
})?;
93+
storages.push(storage);
94+
}
95+
96+
let split_results: Vec<LambdaSingleSplitResult> = lambda_leaf_search(
97+
search_request,
98+
leaf_search_request.leaf_requests,
99+
&doc_mappers[..],
100+
&storages[..],
101+
ctx,
102+
)
103+
.await?;
104+
let wrapper = LambdaSearchResponses { split_results };
105+
let response_bytes = wrapper.encode_to_vec();
106+
let payload = BASE64_STANDARD.encode(&response_bytes);
107+
108+
Ok(LambdaSearchResponsePayload { payload })
109+
}
74110

75-
// Process each split in parallel using a JoinSet. The SearchPermitProvider
76-
// inside SearcherContext gates concurrency based on memory budget.
111+
/// Lambda leaf search returns individual split results.
112+
async fn lambda_leaf_search(
113+
search_request: Arc<SearchRequest>,
114+
leaf_req_ref: Vec<LeafRequestRef>,
115+
doc_mappers: &[Arc<DocMapper>],
116+
storages: &[Arc<dyn Storage>],
117+
ctx: &LambdaSearcherContext,
118+
) -> LambdaResult<Vec<LambdaSingleSplitResult>> {
119+
// Flatten leaf_requests into per-split tasks using pre-resolved Arc references.
77120
let mut split_search_joinset: tokio::task::JoinSet<(String, Result<_, String>)> =
78121
tokio::task::JoinSet::new();
79-
for (leaf_req_idx, split) in all_splits {
80-
let split_id = split.split_id.clone();
81-
let leaf_request_ref = &leaf_search_request.leaf_requests[leaf_req_idx];
82-
let single_split_request = LeafSearchRequest {
83-
search_request: leaf_search_request.search_request.clone(),
84-
doc_mappers: leaf_search_request.doc_mappers.clone(),
85-
index_uris: leaf_search_request.index_uris.clone(),
86-
leaf_requests: vec![quickwit_proto::search::LeafRequestRef {
87-
index_uri_ord: leaf_request_ref.index_uri_ord,
88-
doc_mapper_ord: leaf_request_ref.doc_mapper_ord,
89-
split_offsets: vec![split],
90-
}],
91-
};
92-
93-
let searcher_context = ctx.searcher_context.clone();
94-
let storage_resolver = ctx.storage_resolver.clone();
95-
split_search_joinset.spawn(async move {
96-
let result =
97-
multi_index_leaf_search(searcher_context, single_split_request, storage_resolver)
98-
.await
99-
.map_err(|err| format!("{err}"));
100-
(split_id, result)
101-
});
122+
123+
for leaf_req in leaf_req_ref {
124+
let doc_mapper = doc_mappers
125+
.get(leaf_req.doc_mapper_ord as usize)
126+
.ok_or_else(|| {
127+
LambdaError::Internal(format!(
128+
"doc_mapper_ord out of bounds: {}",
129+
leaf_req.doc_mapper_ord
130+
))
131+
})?
132+
.clone();
133+
let storage = storages[leaf_req.index_uri_ord as usize].clone();
134+
135+
for split_id_and_footer_offsets in leaf_req.split_offsets {
136+
let split_id = split_id_and_footer_offsets.split_id.clone();
137+
let searcher_context = ctx.searcher_context.clone();
138+
let search_request = search_request.clone();
139+
let doc_mapper = doc_mapper.clone();
140+
let storage = storage.clone();
141+
let split = split_id_and_footer_offsets.clone();
142+
split_search_joinset.spawn(async move {
143+
let result = single_doc_mapping_leaf_search(
144+
searcher_context,
145+
search_request,
146+
storage,
147+
vec![split],
148+
doc_mapper,
149+
)
150+
.await
151+
.map_err(|err| format!("{err}"));
152+
(split_id, result)
153+
});
154+
}
102155
}
103156

157+
let num_splits = split_search_joinset.len();
158+
info!(num_splits, "processing leaf search request (per-split)");
159+
104160
// Collect results. Order is irrelevant: each result is tagged with its split_id.
105161
let mut split_results: Vec<LambdaSingleSplitResult> = Vec::with_capacity(num_splits);
106162
let mut num_successes: usize = 0;
@@ -140,9 +196,5 @@ pub async fn handle_leaf_search(
140196
num_failures, "leaf search completed (per-split)"
141197
);
142198

143-
let wrapper = LambdaSearchResponses { split_results };
144-
let response_bytes = wrapper.encode_to_vec();
145-
let payload = BASE64_STANDARD.encode(&response_bytes);
146-
147-
Ok(LambdaSearchResponsePayload { payload })
199+
Ok(split_results)
148200
}

quickwit/quickwit-query/src/query_ast/range_query.rs

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -181,8 +181,8 @@ impl BuildTantivyAst for RangeQuery {
181181
convert_bounds(&self.lower_bound, &self.upper_bound, field_entry.name())?;
182182
let truncate_datetime =
183183
|date: &DateTime| date.truncate(date_options.get_precision());
184-
let lower_bound = map_bound(&lower_bound, truncate_datetime);
185-
let upper_bound = map_bound(&upper_bound, truncate_datetime);
184+
let lower_bound = lower_bound.as_ref().map(truncate_datetime);
185+
let upper_bound = upper_bound.as_ref().map(truncate_datetime);
186186
FastFieldRangeQuery::new(
187187
lower_bound.map(|val| Term::from_field_date(field, val)),
188188
upper_bound.map(|val| Term::from_field_date(field, val)),
@@ -277,14 +277,6 @@ impl BuildTantivyAst for RangeQuery {
277277
}
278278
}
279279

280-
fn map_bound<TFrom, TTo>(bound: &Bound<TFrom>, transform: impl Fn(&TFrom) -> TTo) -> Bound<TTo> {
281-
match bound {
282-
Bound::Excluded(from_val) => Bound::Excluded(transform(from_val)),
283-
Bound::Included(from_val) => Bound::Included(transform(from_val)),
284-
Bound::Unbounded => Bound::Unbounded,
285-
}
286-
}
287-
288280
#[cfg(test)]
289281
mod tests {
290282
use std::ops::Bound;

quickwit/quickwit-search/src/leaf.rs

Lines changed: 27 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -499,6 +499,17 @@ async fn leaf_search_single_split(
499499
let mut leaf_search_state_guard =
500500
SplitSearchStateGuard::new(ctx.split_outcome_counters.clone());
501501

502+
// We already checked if the result was already in the partial result cache,
503+
// but it's not a bad idea to check again.
504+
if let Some(cached_answer) = ctx
505+
.searcher_context
506+
.leaf_search_cache
507+
.get(split.clone(), search_request.clone())
508+
{
509+
leaf_search_state_guard.set_state(SplitSearchState::CacheHit);
510+
return Ok(Some(cached_answer));
511+
}
512+
502513
let query_ast: QueryAst = serde_json::from_str(search_request.query_ast.as_str())
503514
.map_err(|err| SearchError::InvalidQuery(err.to_string()))?;
504515

@@ -721,17 +732,6 @@ fn visit_aggregation_mut(
721732
modified_something
722733
}
723734

724-
/// Maps a `Bound<T>` to a `Bound<U>` by applying a function to the contained value.
725-
/// Equivalent to `Bound::map`, which is currently unstable.
726-
pub fn map_bound<T, U>(bound: Bound<T>, f: impl FnOnce(T) -> U) -> Bound<U> {
727-
use Bound::*;
728-
match bound {
729-
Unbounded => Unbounded,
730-
Included(x) => Included(f(x)),
731-
Excluded(x) => Excluded(f(x)),
732-
}
733-
}
734-
735735
// returns the max of left and right, that isn't unbounded. Useful for making
736736
// the intersection of lower bound of ranges
737737
fn max_bound<T: Ord + Copy>(left: Bound<T>, right: Bound<T>) -> Bound<T> {
@@ -864,12 +864,8 @@ fn remove_redundant_timestamp_range(
864864
if final_start_timestamp != Bound::Unbounded || final_end_timestamp != Bound::Unbounded {
865865
let range = RangeQuery {
866866
field: timestamp_field.to_string(),
867-
lower_bound: map_bound(final_start_timestamp, |bound| {
868-
bound.into_timestamp_nanos().into()
869-
}),
870-
upper_bound: map_bound(final_end_timestamp, |bound| {
871-
bound.into_timestamp_nanos().into()
872-
}),
867+
lower_bound: final_start_timestamp.map(|bound| bound.into_timestamp_nanos().into()),
868+
upper_bound: final_end_timestamp.map(|bound| bound.into_timestamp_nanos().into()),
873869
};
874870
new_ast = if let QueryAst::Bool(mut bool_query) = new_ast {
875871
if bool_query.must.is_empty()
@@ -1390,12 +1386,12 @@ fn disable_search_request_hits(search_request: &mut SearchRequest) {
13901386
}
13911387

13921388
/// Searches multiple splits for a specific index and a single doc mapping
1393-
/// Offloads splits to Lambda invocations, distributing them accross batches
1389+
/// Offloads splits to Lambda invocations, distributing them across batches
13941390
/// balanced by document count. Each batch is invoked independently; a failure
13951391
/// in one batch does not affect others.
13961392
async fn run_offloaded_search_tasks(
13971393
searcher_context: &SearcherContext,
1398-
request: &SearchRequest,
1394+
search_request: &SearchRequest,
13991395
doc_mapper: &DocMapper,
14001396
index_uri: Uri,
14011397
splits_with_requests: Vec<(SplitIdAndFooterOffsets, SearchRequest)>,
@@ -1436,16 +1432,14 @@ async fn run_offloaded_search_tasks(
14361432
lambda_config.max_splits_per_invocation,
14371433
);
14381434

1439-
let mut search_request_for_leaf = request.clone();
1440-
search_request_for_leaf.start_offset = 0;
1441-
search_request_for_leaf.max_hits += request.start_offset;
1442-
14431435
let mut lambda_tasks_joinset = JoinSet::new();
14441436
for batch in batches {
14451437
let batch_split_ids: Vec<String> =
14461438
batch.iter().map(|split| split.split_id.clone()).collect();
14471439
let leaf_request = LeafSearchRequest {
1448-
search_request: Some(search_request_for_leaf.clone()),
1440+
// Note this is not the split-specific rewritten request, we ship the main request,
1441+
// and the leaf will apply the split specific rewrite on its own.
1442+
search_request: Some(search_request.clone()),
14491443
doc_mappers: vec![doc_mapper_str.clone()],
14501444
index_uris: vec![index_uri.as_str().to_string()], //< careful here. Calling to_string() directly would return a redacted uri.
14511445
leaf_requests: vec![quickwit_proto::search::LeafRequestRef {
@@ -1474,12 +1468,13 @@ async fn run_offloaded_search_tasks(
14741468
for split_result in split_results {
14751469
match split_result.outcome {
14761470
Some(Outcome::Response(response)) => {
1477-
if let Some((split_info, search_req)) =
1471+
if let Some((split_info, single_split_search_req)) =
14781472
split_lookup.remove(&split_result.split_id)
14791473
{
1474+
// We use the single_split_search_req to perform the search
14801475
searcher_context.leaf_search_cache.put(
14811476
split_info,
1482-
search_req,
1477+
single_split_search_req,
14831478
response.clone(),
14841479
);
14851480
}
@@ -1616,8 +1611,12 @@ pub async fn single_doc_mapping_leaf_search(
16161611
CanSplitDoBetter::from_request(&request, doc_mapper.timestamp_field_name());
16171612
let mut split_with_req: Vec<(SplitIdAndFooterOffsets, SearchRequest)> =
16181613
split_filter.optimize(&request, splits)?;
1619-
for (split, search_request) in &mut split_with_req {
1620-
rewrite_request(search_request, split, doc_mapper.timestamp_field_name());
1614+
for (split, single_split_search_request) in &mut split_with_req {
1615+
rewrite_request(
1616+
single_split_search_request,
1617+
split,
1618+
doc_mapper.timestamp_field_name(),
1619+
);
16211620
}
16221621
let split_filter_arc: Arc<RwLock<CanSplitDoBetter>> = Arc::new(RwLock::new(split_filter));
16231622

quickwit/quickwit-search/src/service.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,8 @@ impl SearchServiceImpl {
153153
}
154154
}
155155

156-
pub fn deserialize_doc_mapper(doc_mapper_str: &str) -> crate::Result<Arc<DocMapper>> {
156+
/// Deserializes a JSON-encoded doc mapper string into an `Arc<DocMapper>`.
157+
pub(crate) fn deserialize_doc_mapper(doc_mapper_str: &str) -> crate::Result<Arc<DocMapper>> {
157158
let doc_mapper = serde_json::from_str::<Arc<DocMapper>>(doc_mapper_str).map_err(|err| {
158159
SearchError::Internal(format!("failed to deserialize doc mapper: `{err}`"))
159160
})?;

0 commit comments

Comments
 (0)