Draft
```rust
// If the ticket is valid protobuf with the right fields, it's a worker request.
// Otherwise, treat as UTF-8 SQL from an external client.
if let Ok(sql) = std::str::from_utf8(&ticket.ticket) {
    // Heuristic: df-distributed tickets are protobuf (binary),
```
Author
hacky, needs to be better
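One way to firm this up, as a sketch rather than this PR's code: tag external SQL tickets with an explicit prefix and require worker tickets to decode as protobuf, so nothing is guessed from UTF-8 validity. `WorkerTicket` and the `sql:` prefix below are hypothetical stand-ins.

```rust
use prost::Message;

/// Hypothetical stand-in for the df-distributed worker ticket proto.
#[derive(Clone, PartialEq, Message)]
pub struct WorkerTicket {
    #[prost(string, tag = "1")]
    pub stage_id: String,
}

pub enum Dispatch {
    Worker(WorkerTicket),
    Sql(String),
}

pub fn dispatch(ticket: &[u8]) -> Option<Dispatch> {
    // An explicit tag beats content sniffing: external clients would
    // send "sql:<query>" instead of a bare UTF-8 string.
    if let Some(rest) = ticket.strip_prefix(b"sql:") {
        let sql = std::str::from_utf8(rest).ok()?;
        return Some(Dispatch::Sql(sql.to_string()));
    }
    // Anything else must be a well-formed worker ticket; no guessing.
    WorkerTicket::decode(ticket).ok().map(Dispatch::Worker)
}
```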
```toml
sasl2-sys = { git = "https://github.com/quickwit-oss/rust-sasl/", rev = "085a4c7" }

[patch.'https://github.com/quickwit-oss/tantivy/']
tantivy = { path = "/Users/alex.bianchi/oss/tantivy/.worktrees/bianchi/tantivydf" }
```
Author
of course would not be merged like this, going to need to move around dependencies
Force-pushed from aade193 to bfc6492.
Implements a DataFusion SchemaProvider that lazily resolves Quickwit index names from the metastore. When DataFusion encounters an unknown table name like "my_index", the catalog:

1. Fetches IndexMetadata from the metastore
2. Builds a DocMapper to get the tantivy schema
3. Resolves the storage URI
4. Creates a QuickwitTableProvider with a StorageSplitOpener factory
5. Returns the provider; no manual register_table() needed

The catalog is registered on the SessionContext when storage_resolver and searcher_context are provided (the production path). Tests that don't have storage continue to register tables manually.

Also adds quickwit-config and quickwit-doc-mapper dependencies, and makes the quickwit-search::leaf module public for open_index_with_caches.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
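In sketch form, the lazy-resolution hook is `SchemaProvider::table()`. The exact trait signature varies across DataFusion versions, and `MetastoreClient` plus the elided `build_table_provider` body are stand-ins for the real Quickwit pieces:

```rust
use std::{any::Any, sync::Arc};
use async_trait::async_trait;
use datafusion::catalog::SchemaProvider;
use datafusion::datasource::TableProvider;
use datafusion::error::Result;

struct MetastoreClient; // stand-in for the real metastore client

struct LazyQuickwitSchema {
    metastore: Arc<MetastoreClient>,
}

impl LazyQuickwitSchema {
    /// Steps 1-4 from the commit message: fetch IndexMetadata via
    /// self.metastore, build the DocMapper, resolve the storage URI,
    /// wire the opener factory. Elided in this sketch.
    async fn build_table_provider(&self, _index: &str) -> Option<Arc<dyn TableProvider>> {
        None
    }
}

#[async_trait]
impl SchemaProvider for LazyQuickwitSchema {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn table_names(&self) -> Vec<String> {
        vec![] // enumerating would need a metastore round-trip
    }

    async fn table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> {
        // Called by DataFusion on an unknown table name: resolve it
        // from the metastore instead of a pre-registered map.
        Ok(self.build_table_provider(name).await)
    }

    fn table_exist(&self, _name: &str) -> bool {
        true // optimistic; `table()` settles existence
    }
}
```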
StorageSplitOpener now accepts a TokenizerManager via with_tokenizer_manager(). The catalog passes the doc mapper's tokenizer manager when creating opener factories.

This ensures full-text queries on fields with custom tokenizers (language-specific, lowercase, custom patterns) work correctly. Without the tokenizer manager, tantivy falls back to the default tokenizer, which produces wrong query results.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
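Why this matters, in miniature, using plain tantivy APIs (the tokenizer name is made up): the query side must analyze terms with the same chain the index side used, or term lookups silently miss.

```rust
use tantivy::tokenizer::{LowerCaser, SimpleTokenizer, TextAnalyzer, TokenizerManager};

/// Rebuild the analyzer chain a field was indexed with. If the query
/// side instead falls back to the default tokenizer, a query for
/// "Hello" never matches the indexed term "hello".
fn doc_mapper_tokenizers() -> TokenizerManager {
    let manager = TokenizerManager::default();
    let lowercase = TextAnalyzer::builder(SimpleTokenizer::default())
        .filter(LowerCaser)
        .build();
    manager.register("custom_lowercase", lowercase);
    manager
}
```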
Set target_partitions=1 on the coordinator session. This makes the DF optimizer use CollectLeft join mode (broadcast build side) instead of Partitioned mode with hash repartition.

Cross-worker parallelism is handled by df-distributed's stage decomposition. Per-split parallelism comes from the tantivy segment count (declared via output_partitioning on the DataSource). target_partitions was only adding wasteful shuffles.

Before: HashJoinExec(Partitioned) + 2x RepartitionExec per split
After: HashJoinExec(CollectLeft), no repartition, no shuffle

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
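The setting itself is one line on the session config (standard DataFusion API per recent versions, shown on a bare `SessionContext` for illustration):

```rust
use datafusion::prelude::{SessionConfig, SessionContext};

fn coordinator_context() -> SessionContext {
    // One local partition: the planner then chooses CollectLeft
    // (broadcast build side) and stops inserting RepartitionExec
    // pairs around every per-split join.
    let config = SessionConfig::new().with_target_partitions(1);
    SessionContext::new_with_config(config)
}
```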
Integration test now uses:

- TestSandbox to create real indexed splits on RAM storage
- StorageSplitOpener calling open_index_with_caches (with ByteRangeCache)
- a real metastore for split discovery
- SearcherPool for worker discovery
- the Flight service on a tonic server

Known gap: InvertedIndexDataSource does sync I/O on storage-backed directories. Quickwit's existing search path uses warmup() to pre-fetch posting lists before tantivy executes. The DF path needs warmup integration for full-text queries over storage-backed splits. Fast-field-only queries work without warmup.

Also adds footer_start/footer_end to OpenerMetadata and the codec proto so storage-backed openers get the split bundle offsets across the wire.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Adds a warmup module to tantivy-datafusion that pre-fetches inverted index and fast field data before tantivy executes synchronously. Tantivy does sync I/O in its query path; storage-backed directories (S3/GCS) only support async reads. The warmup runs after opener.open() and before generate_batch():

- InvertedIndexDataSource: warms term dictionaries + posting lists
- FastFieldDataSource: warms fast field column file slices

Also fixes segment sizes: callers now pass split_meta.num_docs as the segment size (1 segment per split), so the provider creates the correct partition count. No more vec![0] fallback.

The serve_integration test now passes end-to-end:

- TestSandbox creates real indexed splits on RAM storage
- StorageSplitOpener calls open_index_with_caches
- warmup pre-fetches inverted index + fast field data
- the distributed full-text join query returns correct results

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
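The underlying pattern, reduced to a sketch with simplified stand-in types (not the module's actual API): an async phase populates a shared byte-range cache, and the later synchronous tantivy reads are served entirely from it.

```rust
use std::collections::HashMap;
use std::ops::Range;
use std::sync::{Arc, Mutex};

/// Simplified stand-in for a ByteRangeCache shared between the async
/// warmup phase and tantivy's synchronous Directory reads.
#[derive(Default, Clone)]
struct RangeCache {
    slices: Arc<Mutex<HashMap<(String, (usize, usize)), Vec<u8>>>>,
}

impl RangeCache {
    /// Async warmup: fetch a byte range from object storage and pin it.
    async fn warm(
        &self,
        path: &str,
        range: Range<usize>,
        fetch: impl std::future::Future<Output = Vec<u8>>,
    ) {
        let bytes = fetch.await;
        let key = (path.to_string(), (range.start, range.end));
        self.slices.lock().unwrap().insert(key, bytes);
    }

    /// Sync read, as tantivy's query path requires. A miss here means
    /// warmup forgot a range: there is no blocking fallback.
    fn read(&self, path: &str, range: Range<usize>) -> Option<Vec<u8>> {
        let key = (path.to_string(), (range.start, range.end));
        self.slices.lock().unwrap().get(&key).cloned()
    }
}
```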
StorageSplitOpener now caches the opened + warmed Index via OnceCell. When both InvertedIndexDataSource and FastFieldDataSource for the same split call opener.open(), only the first call downloads from storage and runs warmup; the second call returns the cached Index.

Warmup moved from the individual DataSources into the opener:

- no more warmup in InvertedIndexDataSource::open()
- no more warmup in FastFieldDataSource::open()
- a single warmup_all() call in StorageSplitOpener::open()
- one download, one warmup per split, regardless of how many DataSources reference it

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
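The caching shape, sketched with `tokio::sync::OnceCell` (simplified; the closure stands in for download + open + warmup):

```rust
use std::sync::Arc;
use tokio::sync::OnceCell;

/// Both DataSources for a split share one of these; the first caller
/// runs the expensive open, everyone else awaits and reuses it.
struct CachedSplit<T> {
    cell: OnceCell<Arc<T>>,
}

impl<T> CachedSplit<T> {
    fn new() -> Self {
        Self { cell: OnceCell::new() }
    }

    async fn get_or_open<F, Fut>(&self, open_and_warm: F) -> anyhow::Result<Arc<T>>
    where
        F: FnOnce() -> Fut,
        Fut: std::future::Future<Output = anyhow::Result<Arc<T>>>,
    {
        // get_or_try_init runs `open_and_warm` at most once; concurrent
        // callers await the same in-flight initialization.
        self.cell
            .get_or_try_init(open_and_warm)
            .await
            .map(Arc::clone)
    }
}
```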
Each DataSource now warms only the fields it actually needs:

- InvertedIndexDataSource: warms term dict + postings for only the queried text fields (from raw_queries field names)
- FastFieldDataSource: warms only the projected fast field columns (from projected_schema field names)

The opener no longer calls warmup_all(). The ByteRangeCache is shared across DataSources via the cached Index (OnceCell), so data fetched by one DataSource is available to the other without re-downloading. No double I/O, no over-fetching.

Adds warmup_fast_fields_by_name() for targeted fast field warmup.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
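Deriving the two warm sets is mechanical; a sketch (function names are illustrative, not the PR's API):

```rust
use std::collections::BTreeSet;
use arrow_schema::SchemaRef;

/// Fast-field warmup set: exactly the projected columns.
fn fast_fields_to_warm(projected_schema: &SchemaRef) -> BTreeSet<String> {
    projected_schema
        .fields()
        .iter()
        .map(|field| field.name().clone())
        .collect()
}

/// Inverted-index warmup set: the text fields the raw queries touch.
fn text_fields_to_warm(raw_query_fields: &[String]) -> BTreeSet<String> {
    raw_query_fields.iter().cloned().collect()
}
```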
THIS IS A DRAFT, IT'S STILL VERY CLAUDE-Y AND I WILL NEED TO DO A DEEP REVIEW MYSELF BEFORE ASKING FOR ONE FROM OTHERS
# Distributed DataFusion Execution for Quickwit
Adds a new `quickwit-datafusion` crate that enables distributed SQL query execution over Quickwit splits using DataFusion + `datafusion-distributed` + `tantivy-datafusion`.

## Architecture
```mermaid
graph TD
    Client["Client (pyarrow / SQL / REST)"]
    Client --> Coordinator

    subgraph Coordinator["Coordinator (any searcher node)"]
        direction TB
        SB["QuickwitSessionBuilder"]
        SB --> CAT["QuickwitSchemaProvider<br/>(lazy catalog: resolves index names<br/>from metastore automatically)"]
        SB --> WR["QuickwitWorkerResolver<br/>(reads SearcherPool / Chitchat)"]
        SB --> TC["TantivyCodec (stateless)"]
        SB --> DPO["DistributedPhysicalOptimizerRule"]
        SB --> BSP["build_search_plan()"]
        BSP --> MS["metastore.list_splits()"]
        BSP --> JOIN["per-split: inv ⋈ ff join"]
        BSP --> UNION["UNION ALL across splits<br/>(schema-aligned, NULL-filled)"]
        BSP --> SORT["sort / limit / agg on top"]
    end

    Coordinator -->|"Arrow Flight gRPC<br/>(same port as SearchService)"| W1
    Coordinator -->|"Arrow Flight gRPC"| W2
    Coordinator -->|"Arrow Flight gRPC"| W3

    subgraph W1["Searcher Node 1"]
        direction TB
        TC1["TantivyCodec → decode plan<br/>+ restore pushed filters"]
        OF1["OpenerFactory →<br/>StorageSplitOpener<br/>→ open_index_with_caches()<br/>+ TokenizerManager"]
        LN1["tantivy-df leaf nodes execute"]
        TC1 --> OF1 --> LN1
    end

    subgraph W2["Searcher Node 2"]
        direction TB
        TC2["decode + filters"] --> OF2["open split"] --> LN2["execute"]
    end

    subgraph W3["Searcher Node 3"]
        direction TB
        TC3["decode + filters"] --> OF3["open split"] --> LN3["execute"]
    end

    W1 -->|"RecordBatch stream"| Merge
    W2 -->|"RecordBatch stream"| Merge
    W3 -->|"RecordBatch stream"| Merge

    subgraph Merge["Coordinator merges"]
        SPM["SortPreservingMergeExec"]
        AGG["AggregateExec(mode=Final)"]
        SPM --> AGG
    end

    Merge --> Result["RecordBatches → Client"]
```

## Per-split plan decomposition
```mermaid
graph TD
    subgraph "Single split (one UNION ALL child)"
        HJ["HashJoinExec<br/>on (_doc_id, _segment_ord)"]
        INV["InvertedIndexDataSource<br/>(query=true, full-text filter)"]
        FF["FastFieldDataSource<br/>(partitions=N, pushed_filters serialized)"]
        HJ --> INV
        HJ --> FF
    end
```

For queries without full-text search, the inverted index join is skipped entirely: the plan is just a FastFieldDataSource scan with filter pushdown.
Splits are isolated by UNION ALL boundaries: `(_doc_id, _segment_ord)` can collide across splits, but never across union children. Schema differences across splits are handled by aligning each split to the canonical schema (from the doc mapper): missing columns are filled with NULL, and type mismatches produce an error with a CAST suggestion.
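A sketch of the NULL-filling half of that alignment, as a DataFrame projection (helper names are mine; the real code also raises the type-mismatch error):

```rust
use arrow_schema::Schema;
use datafusion::logical_expr::{col, lit, Expr};
use datafusion::scalar::ScalarValue;

/// Build one projection per split: canonical columns in canonical
/// order, with typed NULL literals for columns the split lacks, so
/// every UNION ALL child carries an identical schema.
fn alignment_exprs(canonical: &Schema, split_columns: &[&str]) -> Vec<Expr> {
    canonical
        .fields()
        .iter()
        .map(|field| {
            if split_columns.contains(&field.name().as_str()) {
                col(field.name().as_str())
            } else {
                // A typed NULL, so Arrow sees the same DataType in
                // every union child.
                let null = ScalarValue::try_from(field.data_type())
                    .expect("data type supports a NULL scalar");
                lit(null).alias(field.name())
            }
        })
        .collect()
}
```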
## How it works

Query path (SQL → RecordBatches):
1. A SQL query arrives (Arrow Flight `do_get` or REST)
2. `QuickwitSchemaProvider` lazily resolves index names from the metastore; no manual table registration
3. `build_search_plan()` queries the metastore for splits, builds per-split join plans using the DataFrame API, aligns schemas, and unions across splits
4. `DistributedPhysicalOptimizerRule` decomposes the plan into stages: leaf scans go to workers, merge/sort/agg stay on the coordinator
5. Workers: `TantivyCodec` decodes nodes + pushed filters; the `OpenerFactory` (from `SessionConfig`) creates a `StorageSplitOpener`, which calls `open_index_with_caches()` with the tokenizer manager; tantivy-df leaf nodes execute and stream RecordBatches back
6. The coordinator merges: `SortPreservingMergeExec` / `AggregateExec(Final)`
## Split openers

- `SplitIndexOpener`: test opener
- `StorageSplitOpener`: production opener built on `open_index_with_caches()`: S3/GCS/local download, footer + fast field caches, tokenizer manager from the doc mapper
## Crate structure

`tantivy-datafusion` (separate repo):

- `IndexOpener` trait with `identifier()` for serialization (sketched below)
- `TantivyCodec`: stateless codec, serializes all 3 DataSource types + pushed filters via `PhysicalExprNode`
- `OpenerFactoryExt`: registers an opener factory on `SessionConfig`

`quickwit-datafusion` (this PR):

- `catalog.rs`: `QuickwitSchemaProvider`, a lazy catalog that resolves index names from the metastore, builds the doc mapper, and creates the `StorageSplitOpener` factory with the tokenizer manager
- `split_opener.rs`: `SplitIndexOpener` (test) + `StorageSplitOpener` (production: `open_index_with_caches` + tokenizer)
- `table_provider.rs`: `QuickwitTableProvider`, which queries the metastore for splits at scan time
- `query_translator.rs`: `SearchRequest` → DataFrame: `QueryAst` → `Expr`, per-split join composition, UNION ALL, schema alignment
- `resolver.rs`: `QuickwitWorkerResolver`, which reads the SearcherPool (Chitchat)
- `session.rs`: `QuickwitSessionBuilder`, which wires codec + resolver + catalog + optimizer rule
- `flight.rs`: `QuickwitFlightService`, which handles df-distributed worker traffic AND external SQL queries on the same port
- `worker.rs`: `start_localhost_context`

`quickwit-serve` changes:

- `grpc.rs`: Flight service on the same tonic gRPC port as SearchService
- `lib.rs`: Flight + session builder startup alongside the SearcherPool
- `datafusion_api.rs`: REST endpoint behind the `enable_datafusion_endpoint` flag

`quickwit-search`:

- `leaf.rs`: `open_index_with_caches` made public
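The opener abstraction, roughly (signatures are assumptions, not the crate's exact trait):

```rust
use async_trait::async_trait;
use tantivy::Index;

/// What the codec needs: a stable identifier so a serialized plan can
/// name its opener, and an async open that yields a ready tantivy Index.
#[async_trait]
pub trait IndexOpener: Send + Sync {
    /// Stable key; the worker-side OpenerFactory maps it back to a
    /// concrete opener (e.g. a StorageSplitOpener for one split).
    fn identifier(&self) -> String;

    /// Open (and warm) the index for this split.
    async fn open(&self) -> anyhow::Result<Index>;
}
```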
## Client-facing endpoints

Arrow Flight (same gRPC port, always on):
Dispatch: UTF-8 ticket → SQL query. Binary protobuf ticket → df-distributed worker fragment.
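From a client's perspective, that looks like this (a Rust sketch; the PR's description mentions pyarrow clients, and arrow-flight details such as the ticket field type vary by crate version):

```rust
use arrow_flight::{flight_service_client::FlightServiceClient, Ticket};
use futures::StreamExt;

/// Send a SQL string as a UTF-8 Flight ticket and drain the stream.
/// Decoding FlightData into RecordBatches is elided here.
async fn run_sql(endpoint: &str, sql: &str) -> anyhow::Result<()> {
    let mut client = FlightServiceClient::connect(endpoint.to_string()).await?;
    let ticket = Ticket {
        ticket: sql.as_bytes().to_vec().into(),
    };
    let mut stream = client.do_get(ticket).await?.into_inner();
    while let Some(data) = stream.next().await {
        let data = data?;
        println!("got {} body bytes", data.data_body.len());
    }
    Ok(())
}
```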
REST (opt-in): the endpoint added in `datafusion_api.rs`, enabled by the `enable_datafusion_endpoint` flag.
## Distributed plan example
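An illustrative shape for a distributed full-text join, assembled from the operators described above (not actual EXPLAIN output; df-distributed's stage-boundary node names are elided):

```text
coordinator stage:
  AggregateExec (mode=Final)
    SortPreservingMergeExec
      [Arrow Flight streams from worker stages]

worker stage (one per split assignment):
  UnionExec
    HashJoinExec (CollectLeft, on (_doc_id, _segment_ord))
      InvertedIndexDataSource   -- full-text filter
      FastFieldDataSource       -- pushed filters, N tantivy segments
```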
## Tests (15 passing)
- `single_node.rs`
- `join_plan.rs`
- `distributed_join_plan.rs`
- `serve_integration.rs`
- `schema_evolution.rs`
target_partitionswith segment countTaskEstimatorfor rendezvous-hash split affinity (team member working on plan-aware worker assignment in df-distributed)