diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index 873ba8a5ee487..1b0347f3a0949 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -22,6 +22,7 @@ use std::fmt::{Debug, Formatter}; use std::pin::Pin; use std::sync::Arc; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::task::{Context, Poll}; use std::vec; @@ -71,11 +72,12 @@ use crate::sort_pushdown::SortOrderPushdownResult; use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr; use datafusion_physical_expr_common::utils::evaluate_expressions_to_arrays; use futures::stream::Stream; -use futures::{FutureExt, StreamExt, TryStreamExt, ready}; +use futures::{FutureExt, StreamExt, TryStreamExt}; use log::trace; use parking_lot::Mutex; mod distributor_channels; +use crate::repartition::distributor_channels::SendError; use distributor_channels::{ DistributionReceiver, DistributionSender, channels, partition_aware_channels, }; @@ -89,41 +91,49 @@ use distributor_channels::{ /// # Batch Flow with Spilling /// /// ```text -/// Input Stream ──▶ Partition Logic ──▶ try_grow() -/// │ -/// ┌───────────────┴────────────────┐ -/// │ │ -/// ▼ ▼ -/// try_grow() succeeds try_grow() fails -/// (Memory Available) (Memory Pressure) -/// │ │ -/// ▼ ▼ -/// RepartitionBatch::Memory spill_writer.push_batch() -/// (batch held in memory) (batch written to disk) -/// │ │ -/// │ ▼ -/// │ RepartitionBatch::Spilled -/// │ (marker - no batch data) -/// │ │ -/// └────────┬───────────────────────┘ -/// │ -/// ▼ -/// Send to channel -/// │ -/// ▼ -/// Output Stream (poll) -/// │ -/// ┌──────────────┴─────────────┐ -/// │ │ -/// ▼ ▼ -/// RepartitionBatch::Memory RepartitionBatch::Spilled -/// Return batch immediately Poll spill_stream (blocks) -/// │ │ -/// └────────┬───────────────────┘ -/// │ -/// ▼ -/// Return batch -/// (FIFO order preserved) +/// Input Stream ◀──────┐ +/// │ │ +/// ▼ │ +/// 
Partition Logic │ +/// │ `batch_size` not +/// ▼ reached yet +/// Coalesce Batch │ +/// ┌───────────────┴────────────────┘ +/// ▼ +/// `batch_size` reached +/// │ +/// └───────────────┐ +/// ▼ +/// try_grow() +/// ┌───────────────┴────────────────┐ +/// ▼ ▼ +/// try_grow() succeeds try_grow() fails +/// (Memory Available) (Memory Pressure) +/// │ │ +/// ▼ ▼ +/// RepartitionBatch::Memory spill_writer.push_batch() +/// (batch held in memory) (batch written to disk) +/// │ │ +/// │ ▼ +/// │ RepartitionBatch::Spilled +/// │ (marker - no batch data) +/// └──────────────┬─────────────────┘ +/// │ +/// ▼ +/// Send to channel +/// │ +/// ▼ +/// Output Stream (poll) +/// │ +/// ┌──────────────┴────────────────┐ +/// ▼ ▼ +/// RepartitionBatch::Memory RepartitionBatch::Spilled +/// Return batch immediately Poll spill_stream (blocks) +/// └─────────────┬─────────────────┘ +/// │ +/// ▼ +/// Return batch +/// (FIFO order preserved) /// ``` /// /// See [`RepartitionExec`] for overall architecture and [`StreamState`] for @@ -143,11 +153,122 @@ type MaybeBatch = Option>; type InputPartitionsToCurrentPartitionSender = Vec>; type InputPartitionsToCurrentPartitionReceiver = Vec>; -/// Output channel with its associated memory reservation and spill writer +/// Output channel with its associated memory reservation and spill writer. +/// +/// `coalescer` is `None` for preserve-order mode, where downstream +/// [`StreamingMergeBuilder`] performs the batching; otherwise it's a +/// [`SharedCoalescer`] cloned from the per-partition one held by +/// [`PartitionChannels`]. 
struct OutputChannel { sender: DistributionSender, reservation: SharedMemoryReservation, spill_writer: SpillPoolWriter, + shared_coalescer: Option, +} + +impl OutputChannel { + fn coalesce(&mut self, batch: RecordBatch) -> Result> { + match &self.shared_coalescer { + Some(shared) => Ok(shared.push_and_drain(batch)?), + None => Ok(vec![batch]), + } + } + + /// Send a single batch through this channel, applying + /// the memory reservation / spill-writer fallback. Returns an error + /// if the receiver has hung up, so the caller can drop the channel. + /// + /// Used after [`OutputChannel::coalesce`] for performance purposes. + async fn send(&mut self, batch: RecordBatch) -> Result<(), SendError> { + let size = batch.get_array_memory_size(); + + // Decide the payload outside of any await: never hold a MutexGuard + // across an await point. + let (payload, is_memory_batch) = { + match self.reservation.try_grow(size) { + Ok(_) => (Ok(RepartitionBatch::Memory(batch)), true), + Err(_) => match self.spill_writer.push_batch(&batch) { + Ok(()) => (Ok(RepartitionBatch::Spilled), false), + Err(err) => (Err(err), false), + }, + } + }; + + let result = self.sender.send(Some(payload)).await; + if result.is_err() && is_memory_batch { + self.reservation.shrink(size); + } + result + } + + async fn finalize(mut self) -> Result<()> { + let Some(shared) = self.shared_coalescer.take() else { + return Ok(()); + }; + for batch in shared.finalize()? { + // If this errored, it means that nobody is listening on the other side, which is fine + // and can happen in certain cases, like when a LIMIT drops the stream that listens. + let _ = self.send(batch).await; + } + Ok(()) + } +} + +/// A producer-side coalescer shared across all input tasks targeting a +/// single output partition. +/// +/// Bundles the [`LimitedBatchCoalescer`] (behind a [`Mutex`]) with the +/// active-sender counter that tracks how many input tasks may still push +/// into it.
The last task to call [`Self::finalize`] is the one that +/// finalizes the coalescer and ships the residual batch. +/// +/// Cheap to [`Clone`]: both fields are [`Arc`]s. +#[derive(Clone)] +struct SharedCoalescer { + inner: Arc>, + active_senders: Arc, +} + +impl SharedCoalescer { + fn new(schema: SchemaRef, target_batch_size: usize, num_senders: usize) -> Self { + Self { + inner: Arc::new(Mutex::new(LimitedBatchCoalescer::new( + schema, + target_batch_size, + None, + ))), + active_senders: Arc::new(AtomicUsize::new(num_senders)), + } + } + + /// Push `batch` into the coalescer and drain any newly completed + /// batches. The mutex is held only briefly. + fn push_and_drain(&self, batch: RecordBatch) -> Result> { + let mut acc = Vec::new(); + let mut c = self.inner.lock(); + c.push_batch(batch)?; + while let Some(b) = c.next_completed_batch() { + acc.push(b); + } + Ok(acc) + } + + /// Decrement the active-senders counter. If this caller was the last + /// sender, finalize the coalescer and return its residual batches; if + /// other senders are still active, return an empty `Vec`. + fn finalize(&self) -> Result> { + let was_last = self.active_senders.fetch_sub(1, Ordering::AcqRel) == 1; + if !was_last { + return Ok(vec![]); + } + let mut acc = Vec::new(); + let mut c = self.inner.lock(); + c.finish()?; + while let Some(b) = c.next_completed_batch() { + acc.push(b); + } + Ok(acc) + } +} /// Channels and resources for a single output partition. @@ -178,6 +299,10 @@ struct PartitionChannels { rx: InputPartitionsToCurrentPartitionReceiver, /// Memory reservation for this output partition reservation: SharedMemoryReservation, + /// Shared coalescer used by all input tasks targeting this output + /// partition. `None` in preserve-order mode (downstream + /// `StreamingMergeBuilder` handles batching). + shared_coalescer: Option, /// Spill writers for writing spilled data. /// SpillPoolWriter is Clone, so multiple writers can share state in non-preserve-order mode.
spill_writers: Vec, @@ -347,6 +472,18 @@ impl RepartitionExecState { .map(|_| spill_pool::channel(max_file_size, Arc::clone(&spill_manager))) .unzip(); + // Coalesce on the producer side, before the channel's gate, so + // the consumer never sees the per-input-task small batches. + // Skip in preserve-order mode: each input has its own dedicated + // channel and `StreamingMergeBuilder` handles batching. + let shared_coalescer = (!preserve_order).then(|| { + SharedCoalescer::new( + input.schema(), + context.session_config().batch_size(), + num_input_partitions, + ) + }); + channels.insert( partition, PartitionChannels { @@ -355,6 +492,7 @@ impl RepartitionExecState { reservation, spill_readers, spill_writers, + shared_coalescer, }, ); } @@ -377,6 +515,7 @@ impl RepartitionExecState { reservation: Arc::clone(&channels.reservation), spill_writer: channels.spill_writers[spill_writer_idx] .clone(), + shared_coalescer: channels.shared_coalescer.clone(), }, ) }) @@ -797,6 +936,34 @@ impl BatchPartitioner { /// arbitrary interleaving (and thus unordered) unless /// [`Self::with_preserve_order`] specifies otherwise. /// +/// # Batch coalescing +/// +/// Repartitioning one [`RecordBatch`] implies creating multiple smaller batches, potentially +/// as many as the number of output partitions. [`RepartitionExec`] makes sure that the returned +/// batches adhere to the configured `datafusion.execution.batch_size` for efficient operations, +/// and for that, it will automatically coalesce batches right after repartitioning. +/// +/// For this, one shared [`LimitedBatchCoalescer`] per output partition is used: +/// +/// ```text +/// ┌───┐ ┌───┐ +/// ┌─▶│ │────────▶.───────────. 
│ │ ┌──────────────────┐ +/// │ └───┘ ┌───┐ ( Coalescer 0 )──▶ ├───┤ ───▶│ Output 0 │ +/// │┌──────▶│ │──▶`───────────' │ │ └──────────────────┘ +/// ││ └───┘ └───┘ +/// ┌──────────────────┐ ││ ┌──────────────────┐ +/// │BatchPartitioner 0│─┘│ │ Output 1 │ +/// └──────────────────┘ │ └──────────────────┘ +/// │ +/// ┌──────────────────┐ │ ... ┌──────────────────┐ +/// │BatchPartitioner 1│──┘ │ Output 2 │ +/// └──────────────────┘ └──────────────────┘ +/// +/// ┌──────────────────┐ +/// │ Output 3 │ +/// └──────────────────┘ +/// ``` +/// /// # Spilling Architecture /// /// RepartitionExec uses [`SpillPool`](crate::spill::spill_pool) channels to handle @@ -1132,7 +1299,6 @@ impl ExecutionPlan for RepartitionExec { spill_stream, 1, // Each receiver handles one input partition BaselineMetrics::new(&metrics, partition), - None, // subsequent merge sort already does batching https://github.com/apache/datafusion/blob/e4dcf0c85611ad0bd291f03a8e03fe56d773eb16/datafusion/physical-plan/src/sorts/merge.rs#L286 )) as SendableRecordBatchStream }) .collect::>(); @@ -1171,7 +1337,6 @@ impl ExecutionPlan for RepartitionExec { spill_stream, num_input_partitions, BaselineMetrics::new(&metrics, partition), - Some(context.session_config().batch_size()), )) as SendableRecordBatchStream) } }) @@ -1473,33 +1638,17 @@ impl RepartitionExec { for res in partitioner.partition_iter(batch)? 
{ let (partition, batch) = res?; - let size = batch.get_array_memory_size(); let timer = metrics.send_time[partition].timer(); // if there is still a receiver, send to it - if let Some(channel) = output_channels.get_mut(&partition) { - let (batch_to_send, is_memory_batch) = - match channel.reservation.try_grow(size) { - Ok(_) => { - // Memory available - send in-memory batch - (RepartitionBatch::Memory(batch), true) - } - Err(_) => { - // We're memory limited - spill to SpillPool - // SpillPool handles file handle reuse and rotation - channel.spill_writer.push_batch(&batch)?; - // Send marker indicating batch was spilled - (RepartitionBatch::Spilled, false) - } - }; - - if channel.sender.send(Some(Ok(batch_to_send))).await.is_err() { - // If the other end has hung up, it was an early shutdown (e.g. LIMIT) - // Only shrink memory if it was a memory batch - if is_memory_batch { - channel.reservation.shrink(size); + if let Some(output_channel) = output_channels.get_mut(&partition) { + for batch in output_channel.coalesce(batch)? { + if output_channel.send(batch).await.is_err() { + // If the other end has hung up, it was an early shutdown (e.g. LIMIT) + // so ignore this channel from now on. + output_channels.remove(&partition); + break; } - output_channels.remove(&partition); } } timer.done(); @@ -1529,6 +1678,14 @@ impl RepartitionExec { } } + // End of input for this task. For each output partition we still + // have a channel to, decrement the active-senders counter; whoever + // sees the count drop to zero is the last input task and must + // finalize the shared coalescer and ship its residual. 
+ for (_, output_channel) in output_channels.drain() { + output_channel.finalize().await?; + } + // Spill writers will auto-finalize when dropped // No need for explicit flush Ok(()) @@ -1660,13 +1817,9 @@ struct PerPartitionStream { /// Execution metrics baseline_metrics: BaselineMetrics, - - /// None for sort preserving variant (merge sort already does coalescing) - batch_coalescer: Option, } impl PerPartitionStream { - #[expect(clippy::too_many_arguments)] fn new( schema: SchemaRef, receiver: DistributionReceiver, @@ -1675,10 +1828,7 @@ impl PerPartitionStream { spill_stream: SendableRecordBatchStream, num_input_partitions: usize, baseline_metrics: BaselineMetrics, - batch_size: Option, ) -> Self { - let batch_coalescer = - batch_size.map(|s| LimitedBatchCoalescer::new(Arc::clone(&schema), s, None)); Self { schema, receiver, @@ -1688,7 +1838,6 @@ impl PerPartitionStream { state: StreamState::ReadingMemory, remaining_partitions: num_input_partitions, baseline_metrics, - batch_coalescer, } } @@ -1774,43 +1923,6 @@ impl PerPartitionStream { } } } - - fn poll_next_and_coalesce( - self: &mut Pin<&mut Self>, - cx: &mut Context<'_>, - coalescer: &mut LimitedBatchCoalescer, - ) -> Poll>> { - let cloned_time = self.baseline_metrics.elapsed_compute().clone(); - let mut completed = false; - - loop { - if let Some(batch) = coalescer.next_completed_batch() { - return Poll::Ready(Some(Ok(batch))); - } - if completed { - return Poll::Ready(None); - } - - match ready!(self.poll_next_inner(cx)) { - Some(Ok(batch)) => { - let _timer = cloned_time.timer(); - if let Err(err) = coalescer.push_batch(batch) { - return Poll::Ready(Some(Err(err))); - } - } - Some(err) => { - return Poll::Ready(Some(err)); - } - None => { - completed = true; - let _timer = cloned_time.timer(); - if let Err(err) = coalescer.finish() { - return Poll::Ready(Some(Err(err))); - } - } - } - } - } } impl Stream for PerPartitionStream { @@ -1820,13 +1932,7 @@ impl Stream for PerPartitionStream { mut self: 
Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { - let poll; - if let Some(mut coalescer) = self.batch_coalescer.take() { - poll = self.poll_next_and_coalesce(cx, &mut coalescer); - self.batch_coalescer = Some(coalescer); - } else { - poll = self.poll_next_inner(cx); - } + let poll = self.poll_next_inner(cx); self.baseline_metrics.record_poll(poll) } } @@ -2530,13 +2636,17 @@ mod tests { let input_partitions = vec![partition]; let partitioning = Partitioning::RoundRobinBatch(4); - // Set up context with moderate memory limit to force partial spilling - // 2KB should allow some batches in memory but force others to spill + // With `batch_size = 1024` and a single UInt32 column, each + // coalesced residual is ~4 KiB. An 8 KiB pool fits one and forces + // the rest to spill. let runtime = RuntimeEnvBuilder::default() - .with_memory_limit(2 * 1024, 1.0) + .with_memory_limit(8 * 1024, 1.0) .build_arc()?; - let task_ctx = TaskContext::default().with_runtime(runtime); + let session_config = SessionConfig::new().with_batch_size(1024); + let task_ctx = TaskContext::default() + .with_runtime(runtime) + .with_session_config(session_config); let task_ctx = Arc::new(task_ctx); // create physical plan