
Quickwit Datafusion #6160

Draft

alexanderbianchi wants to merge 12 commits into quickwit-oss:main from alexanderbianchi:bianchi/quickwitdf

Conversation


alexanderbianchi commented on Feb 14, 2026

THIS IS A DRAFT, IT'S STILL VERY CLAUDE-Y AND I WILL NEED TO DO A DEEP REVIEW MYSELF BEFORE ASKING FOR ONE FROM OTHERS

Distributed DataFusion Execution for Quickwit

Adds a new quickwit-datafusion crate that enables distributed SQL query execution over Quickwit splits using DataFusion + datafusion-distributed + tantivy-datafusion.

Architecture

```mermaid
graph TD
    Client["Client (pyarrow / SQL / REST)"]
    
    Client --> Coordinator

    subgraph Coordinator["Coordinator (any searcher node)"]
        direction TB
        SB["QuickwitSessionBuilder"]
        SB --> CAT["QuickwitSchemaProvider<br/>(lazy catalog — resolves index names<br/>from metastore automatically)"]
        SB --> WR["QuickwitWorkerResolver<br/>(reads SearcherPool / Chitchat)"]
        SB --> TC["TantivyCodec (stateless)"]
        SB --> DPO["DistributedPhysicalOptimizerRule"]
        SB --> BSP["build_search_plan()"]
        BSP --> MS["metastore.list_splits()"]
        BSP --> JOIN["per-split: inv ⋈ f join"]
        BSP --> UNION["UNION ALL across splits<br/>(schema-aligned, NULL-filled)"]
        BSP --> SORT["sort / limit / agg on top"]
    end

    Coordinator -->|"Arrow Flight gRPC<br/>(same port as SearchService)"| W1
    Coordinator -->|"Arrow Flight gRPC"| W2
    Coordinator -->|"Arrow Flight gRPC"| W3

    subgraph W1["Searcher Node 1"]
        direction TB
        TC1["TantivyCodec → decode plan<br/>+ restore pushed filters"]
        OF1["OpenerFactory →<br/>StorageSplitOpener<br/>→ open_index_with_caches()<br/>+ TokenizerManager"]
        LN1["tantivy-df leaf nodes execute"]
        TC1 --> OF1 --> LN1
    end

    subgraph W2["Searcher Node 2"]
        direction TB
        TC2["decode + filters"] --> OF2["open split"] --> LN2["execute"]
    end

    subgraph W3["Searcher Node 3"]
        direction TB
        TC3["decode + filters"] --> OF3["open split"] --> LN3["execute"]
    end

    W1 -->|"RecordBatch stream"| Merge
    W2 -->|"RecordBatch stream"| Merge
    W3 -->|"RecordBatch stream"| Merge

    subgraph Merge["Coordinator merges"]
        SPM["SortPreservingMergeExec"]
        AGG["AggregateExec(mode=Final)"]
        SPM --> AGG
    end

    Merge --> Result["RecordBatches → Client"]
```

Per-split plan decomposition

```mermaid
graph TD
    subgraph "Single split (one UNION ALL child)"
        HJ["HashJoinExec<br/>on (_doc_id, _segment_ord)"]
        INV["InvertedIndexDataSource<br/>(query=true, full-text filter)"]
        FF["FastFieldDataSource<br/>(partitions=N, pushed_filters serialized)"]
        HJ --> INV
        HJ --> FF
    end
```

For queries without full-text search, the inverted index join is skipped — just a FastFieldDataSource scan with filter pushdown.

Splits are isolated by UNION ALL boundaries — (_doc_id, _segment_ord) can collide across splits, but each split's join runs inside its own union child, so rows from different splits are never matched. Schema differences across splits are handled by aligning to the canonical schema (from the doc mapper): missing columns → NULL, type mismatches → error with CAST suggestion.
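
As a reading aid, here is a minimal sketch of that alignment step using DataFusion's DataFrame API. The helpers `align_to_canonical` and `union_splits` are hypothetical, and unlike the PR this sketch casts mismatched columns instead of raising an error with a CAST suggestion:

```rust
use datafusion::arrow::datatypes::Schema;
use datafusion::common::ScalarValue;
use datafusion::error::Result;
use datafusion::prelude::*;

/// Project one split's DataFrame onto the canonical (doc-mapper) schema.
/// Missing columns become typed NULLs; present columns are cast to the
/// canonical type (the PR instead errors on a real type mismatch).
fn align_to_canonical(split_df: DataFrame, canonical: &Schema) -> Result<DataFrame> {
    let projection: Vec<Expr> = canonical
        .fields()
        .iter()
        .map(|field| {
            let expr = if split_df
                .schema()
                .has_column_with_unqualified_name(field.name())
            {
                cast(col(field.name().to_string()), field.data_type().clone())
            } else {
                // NULL-fill columns this split never had.
                cast(lit(ScalarValue::Null), field.data_type().clone())
            };
            expr.alias(field.name().to_string())
        })
        .collect();
    split_df.select(projection)
}

/// UNION ALL the aligned per-split plans (DataFrame::union keeps duplicates).
fn union_splits(mut dfs: impl Iterator<Item = DataFrame>) -> Result<Option<DataFrame>> {
    let Some(first) = dfs.next() else { return Ok(None) };
    dfs.try_fold(first, |acc, df| acc.union(df)).map(Some)
}
```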

How it works

Query path (SQL → RecordBatches):

  1. Client sends SQL to any searcher node (via Arrow Flight do_get or REST)
  2. QuickwitSchemaProvider lazily resolves index names from the metastore — no manual table registration
  3. build_search_plan() queries metastore for splits, builds per-split join plans using DataFrame API, aligns schemas, unions across splits
  4. DistributedPhysicalOptimizerRule decomposes into stages — leaf scans to workers, merge/sort/agg on coordinator
  5. Workers receive plan fragments via Arrow Flight; TantivyCodec decodes the nodes and pushed filters; the OpenerFactory (from SessionConfig) creates a StorageSplitOpener that calls open_index_with_caches() with the tokenizer manager; the tantivy-df leaf nodes execute and stream RecordBatches back
  6. Coordinator merges via SortPreservingMergeExec / AggregateExec(Final)
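
To make step 2 concrete, here is a minimal sketch of a lazily resolving SchemaProvider. Trait signatures follow recent DataFusion releases and may differ slightly; `IndexResolver` and `table_for_index` are made-up stand-ins, not the PR's QuickwitSchemaProvider:

```rust
use std::any::Any;
use std::sync::Arc;

use async_trait::async_trait;
use datafusion::catalog::SchemaProvider;
use datafusion::datasource::TableProvider;
use datafusion::error::Result;

/// Hypothetical stand-in for the metastore + storage wiring used by the PR.
#[derive(Debug, Default)]
struct IndexResolver;

impl IndexResolver {
    async fn table_for_index(&self, _index_id: &str) -> Result<Option<Arc<dyn TableProvider>>> {
        // In the PR: fetch IndexMetadata, build a DocMapper, resolve the storage
        // URI, and return a QuickwitTableProvider wired to a StorageSplitOpener.
        Ok(None)
    }
}

/// Lazy catalog: nothing is registered up front; every table lookup goes to
/// the resolver, so "SELECT ... FROM my_index" works without register_table().
#[derive(Debug, Default)]
struct LazyIndexSchemaProvider {
    resolver: IndexResolver,
}

#[async_trait]
impl SchemaProvider for LazyIndexSchemaProvider {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn table_names(&self) -> Vec<String> {
        Vec::new() // indexes are not enumerated eagerly
    }

    async fn table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> {
        self.resolver.table_for_index(name).await
    }

    fn table_exist(&self, name: &str) -> bool {
        // Optimistic: the real existence check happens in `table()`.
        let _ = name;
        true
    }
}
```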

Split openers

| Opener | Usage | How it opens |
|---|---|---|
| SplitIndexOpener | Tests | DashMap lookup (in-memory) |
| StorageSplitOpener | Production | open_index_with_caches(): S3/GCS/local download, footer + fast field caches, tokenizer manager from doc mapper |
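
The commit log further down describes StorageSplitOpener caching the opened and warmed index behind a OnceCell so that both DataSources on a split share one download. A minimal sketch of that pattern, with purely illustrative names:

```rust
use std::sync::Arc;

use tantivy::Index;
use tokio::sync::OnceCell;

/// Illustrative cache: the first open() downloads and warms the split, every
/// later call (e.g. the second DataSource on the same split) reuses the Index.
struct CachedSplitOpener {
    cached: OnceCell<Arc<Index>>,
}

impl CachedSplitOpener {
    async fn open(&self) -> anyhow::Result<Arc<Index>> {
        self.cached
            .get_or_try_init(|| async {
                // In the PR this is open_index_with_caches() followed by a
                // targeted warmup of the inverted index / fast field data.
                let index = open_and_warm_split().await?;
                Ok(Arc::new(index))
            })
            .await
            .map(Arc::clone)
    }
}

// Hypothetical stand-in for storage download + open + warmup.
async fn open_and_warm_split() -> anyhow::Result<Index> {
    unimplemented!("illustration only")
}
```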

Crate structure

tantivy-datafusion (separate repo):

  • IndexOpener trait with identifier() for serialization
  • TantivyCodec — stateless codec, serializes all 3 DataSource types + pushed filters via PhysicalExprNode
  • OpenerFactoryExt — registers opener factory on SessionConfig
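
The tantivy-datafusion API is not included in this PR, so purely as a reading aid, here is an assumed shape of the opener abstraction and its SessionConfig registration. The trait and types below are guesses derived from the bullet points above, not the actual crate:

```rust
use std::sync::Arc;

use async_trait::async_trait;
use datafusion::prelude::SessionConfig;
use tantivy::Index;

/// Assumed opener shape: a stable identifier the codec can serialize into the
/// plan, plus an async open() that rebuilds the tantivy Index on the worker.
#[async_trait]
trait IndexOpener: Send + Sync {
    fn identifier(&self) -> String;
    async fn open(&self) -> anyhow::Result<Index>;
}

/// Hypothetical factory that maps identifiers back to openers on the worker.
struct OpenerFactory {
    make: Arc<dyn Fn(&str) -> Arc<dyn IndexOpener> + Send + Sync>,
}

impl OpenerFactory {
    fn opener_for(&self, identifier: &str) -> Arc<dyn IndexOpener> {
        (self.make)(identifier)
    }
}

/// Registering the factory as a typed SessionConfig extension is one way the
/// codec can find it again when decoding plan fragments.
fn register_opener_factory(config: SessionConfig, factory: Arc<OpenerFactory>) -> SessionConfig {
    config.with_extension(factory)
}
```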

quickwit-datafusion (this PR):

| File | Purpose |
|---|---|
| catalog.rs | QuickwitSchemaProvider — lazy catalog, resolves index names from metastore, builds doc mapper, creates StorageSplitOpener factory with tokenizer manager |
| split_opener.rs | SplitIndexOpener (test) + StorageSplitOpener (production: open_index_with_caches + tokenizer) |
| table_provider.rs | QuickwitTableProvider — queries metastore for splits at scan time |
| query_translator.rs | SearchRequest → DataFrame: QueryAst→Expr, per-split join composition, UNION ALL, schema alignment |
| resolver.rs | QuickwitWorkerResolver — reads SearcherPool (Chitchat) |
| session.rs | QuickwitSessionBuilder — wires codec + resolver + catalog + optimizer rule |
| flight.rs | QuickwitFlightService — handles df-distributed worker traffic AND external SQL queries on the same port |
| worker.rs | Test helper for df-distributed's start_localhost_context |

quickwit-serve changes:

  • grpc.rs — Flight service on the same tonic gRPC port as SearchService
  • lib.rs — Flight + session builder startup alongside SearcherPool
  • datafusion_api.rs — REST endpoint behind the enable_datafusion_endpoint flag
  • quickwit-search/leaf.rs — open_index_with_caches made public

Client-facing endpoints

Arrow Flight (same gRPC port, always on):

```python
import pyarrow.flight as flight
client = flight.connect("grpc://quickwit-node:7281")
ticket = flight.Ticket(b"SELECT id, price FROM my_index WHERE category = 'electronics'")
table = client.do_get(ticket).read_all()
```

Dispatch: UTF-8 ticket → SQL query. Binary protobuf ticket → df-distributed worker fragment.
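
A minimal sketch of that dispatch (not the PR's flight.rs; it mirrors the heuristic discussed in the review comments below, which can misfire if a protobuf payload happens to be valid UTF-8):

```rust
/// How a do_get ticket might be classified: readable UTF-8 is treated as SQL
/// from an external client, anything else as a protobuf-encoded
/// df-distributed plan fragment.
enum TicketKind<'a> {
    Sql(&'a str),
    WorkerFragment(&'a [u8]),
}

fn classify_ticket(ticket: &[u8]) -> TicketKind<'_> {
    match std::str::from_utf8(ticket) {
        Ok(sql) => TicketKind::Sql(sql),
        Err(_) => TicketKind::WorkerFragment(ticket),
    }
}
```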

REST (opt-in):

```yaml
searcher:
  enable_datafusion_endpoint: true
```

```
POST /api/v1/{index_id}/datafusion
{"sql": "SELECT id, price FROM my_index ORDER BY id LIMIT 10"}
```

Distributed plan example

```
┌───── DistributedExec (coordinator) ──
│ SortPreservingMergeExec: [id ASC]
│   NetworkCoalesceExec: 3 tasks → 3 workers
└──────────────────────────────────────
  ┌───── Stage 1 (workers) ──
  │ DistributedUnionExec: worker0:[split-1] worker1:[split-2] worker2:[split-3]
  │   SortExec → HashJoinExec on (_doc_id, _segment_ord)
  │     InvertedIndexDataSource(query=true)
  │     FastFieldDataSource(pushed_filters=[...])  ← filters survive the wire
  │   ... ×3 splits
  └──────────────────────────────────────
```

Tests (15 passing)

| Test file | Tests | What it covers |
|---|---|---|
| single_node.rs | 3 | Mock metastore → QuickwitTableProvider, full-text joins |
| join_plan.rs | 6 | Single-split and cross-split join plans (inv ⋈ f ⋈ d) |
| distributed_join_plan.rs | 2 | Distributed joins via df-distributed test infra |
| serve_integration.rs | 1 | Real TCP Flight servers + SearcherPool + distributed execution |
| schema_evolution.rs | 3 | NULL-fill, type mismatch error, SQL CAST |

Remaining work

  • Unnecessary RepartitionExec: Co-partitioned joins shouldn't shuffle — need to align target_partitions with segment count
  • Cache affinity: Workers assigned round-robin; needs custom TaskEstimator for rendezvous-hash split affinity (team member working on plan-aware worker assignment in df-distributed)
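
For the first item, the likely direction is something along these lines (sketch only, nothing in this PR implements it; with_target_partitions is standard DataFusion):

```rust
use datafusion::prelude::SessionConfig;

/// Size DataFusion's partition target from the split's tantivy segment count
/// so a co-partitioned per-split join doesn't pick up a RepartitionExec.
fn session_config_for_split(segment_count: usize) -> SessionConfig {
    SessionConfig::new().with_target_partitions(segment_count.max(1))
}
```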

```rust
// If the ticket is valid protobuf with the right fields, it's a worker request.
// Otherwise, treat as UTF-8 SQL from an external client.
if let Ok(sql) = std::str::from_utf8(&ticket.ticket) {
// Heuristic: df-distributed tickets are protobuf (binary),
```

alexanderbianchi (author) commented:

hacky, needs to be better

```toml
sasl2-sys = { git = "https://github.com/quickwit-oss/rust-sasl/", rev = "085a4c7" }

[patch.'https://github.com/quickwit-oss/tantivy/']
tantivy = { path = "/Users/alex.bianchi/oss/tantivy/.worktrees/bianchi/tantivydf" }
```

alexanderbianchi (author) commented on Feb 14, 2026:

of course would not be merged like this, going to need to move around dependencies

alexanderbianchi and others added 7 commits February 14, 2026 14:00

Implements a DataFusion SchemaProvider that lazily resolves Quickwit
index names from the metastore. When DataFusion encounters an unknown
table name like "my_index", the catalog:

1. Fetches IndexMetadata from the metastore
2. Builds a DocMapper to get the tantivy schema
3. Resolves the storage URI
4. Creates a QuickwitTableProvider with StorageSplitOpener factory
5. Returns the provider — no manual register_table() needed

The catalog is registered on the SessionContext when storage_resolver
and searcher_context are provided (production path). Tests that don't
have storage continue to register tables manually.

Also adds quickwit-config and quickwit-doc-mapper dependencies,
and makes quickwit-search::leaf module public for open_index_with_caches.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

StorageSplitOpener now accepts a TokenizerManager via
with_tokenizer_manager(). The catalog passes the doc mapper's
tokenizer manager when creating opener factories.

This ensures full-text queries on fields with custom tokenizers
(language-specific, lowercase, custom patterns) work correctly.
Without the tokenizer manager, tantivy falls back to the default
tokenizer which produces wrong query results.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

Set target_partitions=1 on the coordinator session. This makes
the DF optimizer use CollectLeft join mode (broadcast build side)
instead of Partitioned mode with hash repartition.

Cross-worker parallelism is handled by df-distributed's stage
decomposition. Per-split parallelism comes from tantivy segment
count (declared via output_partitioning on the DataSource).
target_partitions was only adding wasteful shuffles.

Before: HashJoinExec(Partitioned) + 2x RepartitionExec per split
After:  HashJoinExec(CollectLeft) — no repartition, no shuffle

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

Integration test now uses:
- TestSandbox to create real indexed splits on RAM storage
- StorageSplitOpener calling open_index_with_caches (with ByteRangeCache)
- Real metastore for split discovery
- SearcherPool for worker discovery
- Flight service on tonic server

Known gap: InvertedIndexDataSource does sync I/O on storage-backed
directories. Quickwit's existing search path uses warmup() to
pre-fetch posting lists before tantivy executes. The DF path needs
warmup integration for full-text queries over storage-backed splits.
Fast-field-only queries work without warmup.

Also adds footer_start/footer_end to OpenerMetadata and the codec
proto so storage-backed openers get the split bundle offsets across
the wire.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

Adds warmup module to tantivy-datafusion that pre-fetches inverted
index and fast field data before tantivy executes synchronously.

Tantivy does sync I/O in its query path. Storage-backed directories
(S3/GCS) only support async reads. The warmup runs after
opener.open() and before generate_batch():
- InvertedIndexDataSource: warms term dictionaries + posting lists
- FastFieldDataSource: warms fast field column file slices

Also fixes segment sizes: callers now pass split_meta.num_docs as
the segment size (1 segment per split), so the provider creates
the correct partition count. No more vec![0] fallback.

The serve_integration test now passes end-to-end:
- TestSandbox creates real indexed splits on RAM storage
- StorageSplitOpener calls open_index_with_caches
- warmup pre-fetches inverted index + fast field data
- Distributed full-text join query returns correct results

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

StorageSplitOpener now caches the opened+warmed Index via OnceCell.
When both InvertedIndexDataSource and FastFieldDataSource for the
same split call opener.open(), only the first call downloads from
storage and runs warmup. The second call returns the cached Index.

Warmup moved from the individual DataSources into the opener:
- No more warmup in InvertedIndexDataSource::open()
- No more warmup in FastFieldDataSource::open()
- Single warmup_all() call in StorageSplitOpener::open()
- One download, one warmup per split regardless of how many
  DataSources reference it

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

Each DataSource now warms only the fields it actually needs:

- InvertedIndexDataSource: warms term dict + postings for only the
  queried text fields (from raw_queries field names)
- FastFieldDataSource: warms only the projected fast field columns
  (from projected_schema field names)

The opener no longer calls warmup_all(). The ByteRangeCache is
shared across DataSources via the cached Index (OnceCell), so
data fetched by one DataSource is available to the other without
re-downloading. No double I/O, no over-fetching.

Adds warmup_fast_fields_by_name() for targeted fast field warmup.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>