perf: coalesce batches before sending to distributor channels in RepartitionExec #22010
gabotechs wants to merge 10 commits into
Conversation
…butor channel, instead of after.
|
run benchmarks |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing gabotechs/repartition-coalesce-batches-berfore-channels (dcf6482) to b2fd2d3 (merge-base) diff using: tpch File an issue against this benchmark runner |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing gabotechs/repartition-coalesce-batches-berfore-channels (dcf6482) to b2fd2d3 (merge-base) diff using: tpcds File an issue against this benchmark runner |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing gabotechs/repartition-coalesce-batches-berfore-channels (dcf6482) to b2fd2d3 (merge-base) diff using: clickbench_partitioned File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagetpch — base (merge-base)
tpch — branch
File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagetpcds — base (merge-base)
tpcds — branch
File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usageclickbench_partitioned — base (merge-base)
clickbench_partitioned — branch
File an issue against this benchmark runner |
run benchmark tpch10 |
| /// inside `inner` holds cloned `Arc`s pointing back at those shared | ||
| /// resources. | ||
| struct OutputChannels { | ||
| inner: HashMap<usize, OutputChannel>, |
Probably no perf change but I think Vec<Option<OutputChannel>> is better
I agree using Vec is better (faster, clearer what is allowed). Maybe as a follow-on PR
The reason why this is a HashMap<usize, OutputChannel> is mainly because that's what it was before this PR. Note how that did not change:
datafusion/datafusion/physical-plan/src/repartition/mod.rs
Lines 559 to 576 in dcf6482
I'll try refactoring further and using Vec instead
Managed to come up with bab26b7.
There are several places in this file where partition->channel maps are represented with HashMap<usize, _> instead of Vec<_>. Rather than moving only this one to a Vec<_>, I tried to be consistent in the other places too, and now they are all Vec<_>s.
This cascaded into a bunch of restructures that I think have a net positive outcome: it allowed removing yet another abstraction (struct OutputChannels) and simplifying the code a bit, and now the diff is even smaller.
Let me know what you think.
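The tradeoff under discussion can be sketched as follows, using a stand-in `Channel` type (the real `OutputChannel` holds senders, a reservation, and a shared coalescer). Since output partitions are dense indices `0..n`, a `Vec<Option<_>>` gives hash-free O(1) indexing and makes a removed channel explicit:

```rust
use std::collections::HashMap;

// Hypothetical stand-in for the real OutputChannel.
struct Channel(usize);

// HashMap variant: what the code had before the refactor.
fn lookup_map(channels: &HashMap<usize, Channel>, partition: usize) -> Option<&Channel> {
    channels.get(&partition)
}

// Vec<Option<_>> variant: partitions are dense 0..n indices, so a Vec
// gives O(1) indexing with no hashing, and `None` marks a closed channel.
fn lookup_vec(channels: &[Option<Channel>], partition: usize) -> Option<&Channel> {
    channels.get(partition).and_then(|c| c.as_ref())
}

fn main() {
    let map: HashMap<usize, Channel> = (0..4).map(|i| (i, Channel(i))).collect();
    let mut vec: Vec<Option<Channel>> = (0..4).map(|i| Some(Channel(i))).collect();
    vec[2] = None; // partition 2 finished: drop its channel in place
    assert!(lookup_map(&map, 2).is_some());
    assert!(lookup_vec(&vec, 2).is_none());
    assert_eq!(lookup_vec(&vec, 3).map(|c| c.0), Some(3));
}
```

This is only a sketch of the data-layout choice, not the PR's actual types.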
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing gabotechs/repartition-coalesce-batches-berfore-channels (dcf6482) to b2fd2d3 (merge-base) diff using: tpch10 File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagetpch10 — base (merge-base)
tpch10 — branch
File an issue against this benchmark runner |
|
This should fix #20491 too! |
From: Apache Arrow DataFusion: a Fast, Embeddable, Modular Analytic Query Engine. I think there is some overlap between these 2024 figures at high core counts (>32 cores) and the improvements reported here. @alamb @gabotechs perhaps it would be good to redo this experiment once more. Perhaps we could also test clickbench just on higher core counts and see if it improves. |
@gabotechs I notice peak memory is quite a bit higher here? |
I can imagine how this can happen in cases where the fanout is very big; it boils down to the gating mechanism implemented in Before this PR, the batches that were flowing through there were of size The memory reporting there seems quite unstable though; for example, these other runs show the same peak memory usage: |
|
run benchmark tpch10 |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing gabotechs/repartition-coalesce-batches-berfore-channels (dcf6482) to b2fd2d3 (merge-base) diff using: tpch10 File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagetpch10 — base (merge-base)
tpch10 — branch
File an issue against this benchmark runner |
|
Checking this one out |
|
run benchmark clickbench_partitioned |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing gabotechs/repartition-coalesce-batches-berfore-channels (dcf6482) to b2fd2d3 (merge-base) diff using: clickbench_partitioned File an issue against this benchmark runner |
Thank you @gabotechs and @Dandandan
It took me a while to wrap my head around the new design (where now the senders also have shared state and a potential source of contention)
Before merge, I think we should:
- Use @Dandandan's suggestion and avoid a HashMap
- Try to document the high-level design a bit (see below)
- Adjust memory accounting to account for in-progress coalesced batches (see inlined comment)
- Clarify that the `limit` is actually never set
In terms of documentation I recommend we document the design / explain the rationale a bit in the high level module documentation:
datafusion/datafusion/physical-plan/src/repartition/mod.rs
Lines 90 to 128 in dcf6482
FYI @crepererum perhaps you have some time to review this PR too as I think you are familiar with the original design
| batch: RecordBatch, | ||
| ) -> Result<()> { | ||
| let Some(channel) = self.inner.get(&partition) else { | ||
| return Ok(()); |
this just drops the input -- maybe it is worth pointing out in a comment when this can happen (I think when the output partition has been closed)
🤔 thinking about it, it's probably possible to structure things so that this never happens. I'll try it out.
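One shape that makes the missing-channel branch unrepresentable, sketched here with assumed stand-in types rather than the PR's actual code, is to hand each producer its channel by value instead of looking it up by partition id on every send:

```rust
// Stand-in for the real output channel; the real one wraps a sender,
// a memory reservation, and a shared coalescer.
struct Channel {
    sent: Vec<u32>,
}

// Each producer owns its channel directly, so there is no fallible
// map lookup and no path that silently drops a batch.
struct Producer {
    channel: Channel,
}

impl Producer {
    fn send(&mut self, batch: u32) {
        self.channel.sent.push(batch); // no lookup, no silent drop
    }
}

fn main() {
    let mut p = Producer { channel: Channel { sent: Vec::new() } };
    p.send(1);
    p.send(2);
    assert_eq!(p.channel.sent, vec![1, 2]);
}
```

The design question is then who owns the channel when the output partition closes early, which the sketch does not address.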
| /// active-senders counter. The last task to do so calls | ||
| /// [`SharedCoalescer::finalize`] and ships the residual. | ||
| async fn finalize(&mut self) -> Result<()> { | ||
| let partitions: Vec<usize> = self.inner.keys().copied().collect(); |
I wonder if we could avoid this copy/ allocation and just consume the inner
```rust
async fn finalize(&mut self) -> Result<()> {
    for (partition, channel) in self.inner.drain() {
        let Some(shared) = channel.shared_coalescer.clone() else {
            continue;
        };
        for batch in shared.finalize()? {
            self.send_to_channel(partition, batch).await;
        }
    }
    Ok(())
}
```

it would probably require rejiggering how `self.send_to_channel` worked to get a passed channel (rather than looking it up again)
| timer.done(); | ||
| return; | ||
| }; | ||
| match channel.reservation.try_grow(size) { |
Claude pointed out that this code only increases the memory reservation after coalescing the batch -- in other words, the memory reservation doesn't account for the memory in the SharedCoalescer. For really wide fanouts that could be non-trivial memory (num_partitions batches' worth of memory).
I think this is how it worked before:
Before, the batches were getting coalesced later in the RepartitionExec pipeline, and there was no memory accounting happening there, so I'd expect the memory reservation to behave the same as main.
Not implying that's the best approach though... as I invest time in improving this operator, I find it a bit hard to deal with, as it seems to have accumulated several stacked abstractions (structs representing things) over the years, and it's a bit hard to maintain.
I'd not be surprised if a full rewrite could yield a more maintainable outcome.
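For what it's worth, one shape the accounting could take is to reserve when a batch enters the coalescer and release on emit. The sketch below uses simplified stand-in types: `Reservation` and `Coalescer` here are assumptions, not DataFusion's actual `MemoryReservation` API.

```rust
// Simplified stand-in for a memory reservation.
struct Reservation {
    used: usize,
}

impl Reservation {
    fn try_grow(&mut self, bytes: usize) -> Result<(), String> {
        self.used += bytes; // real code would check a memory pool limit
        Ok(())
    }
    fn shrink(&mut self, bytes: usize) {
        self.used -= bytes;
    }
}

// Simplified stand-in for a batch coalescer, tracking bytes only.
struct Coalescer {
    buffered_bytes: usize,
    target: usize,
}

impl Coalescer {
    /// Account for `bytes` up front so buffered-but-unsent rows are
    /// visible to the memory pool; release the reservation on emit.
    fn push(&mut self, bytes: usize, r: &mut Reservation) -> Result<Option<usize>, String> {
        r.try_grow(bytes)?;
        self.buffered_bytes += bytes;
        if self.buffered_bytes >= self.target {
            let out = self.buffered_bytes;
            r.shrink(out); // the sender re-reserves when it enqueues
            self.buffered_bytes = 0;
            return Ok(Some(out));
        }
        Ok(None)
    }
}

fn main() {
    let mut r = Reservation { used: 0 };
    let mut c = Coalescer { buffered_bytes: 0, target: 100 };
    assert_eq!(c.push(60, &mut r).unwrap(), None);
    assert_eq!(r.used, 60); // buffered rows are accounted for
    assert_eq!(c.push(60, &mut r).unwrap(), Some(120));
    assert_eq!(r.used, 0);
}
```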
| let name = self.name().to_owned(); | ||
| let schema = self.schema(); | ||
| let schema_captured = Arc::clone(&schema); | ||
| let fetch = self.fetch(); |
I think this is always None (because RepartitionExec doesn't provide an implementation). I think it would be clearer if you explicitly set fetch to None here, and maybe left a comment / ticket reference to implement limiting within the repartition (you'll be in good shape to do this with the LimitedBatchCoalescer)
🤔 Interesting, you are completely right, I was getting confused because of the fact that LimitedBatchCoalescer is used here, but it looks like None is passed in every case, so we might be better without it at all.
| /// batches. The mutex is held only briefly. | ||
| fn push_and_drain(&self, batch: RecordBatch) -> Result<Vec<RecordBatch>> { | ||
| let mut acc = Vec::new(); | ||
| let mut c = self.inner.lock(); |
I wonder if this would become a contention point / stall threads as this effectively blocks the executor thread if two threads are trying to write to the same output partition at once. And since pushing a batch can be an expensive operation (copying!)
I think we should use a tokio mutex here to make sure we don't block the runtime if some other task is writing to the output
I'm not sure a tokio mutex would really help here. Since the push is a synchronous operation, we would be blocking the thread anyway. Citing the Tokio docs about this:
The feature that the async mutex offers over the blocking mutex is the ability to keep it locked across an .await point. This makes the async mutex more expensive than the blocking mutex, so the blocking mutex should be preferred in the cases where it can be used. The primary use case for the async mutex is to provide shared mutable access to IO resources such as a database connection.
That being said, I do not love that there is a Mutex here, I'll try to provide a lock-free solution.
Managed to do this for reducing contention: bbd10de.
However, it introduces yet another point that accumulates batches without accounting for them.
Let me know what you think, if you think this is good I can somehow wire that up with the memory reservations
I think we should be very careful about adding yet another dependency -- can we contemplate adding that in a follow-on PR?
Reverted the commit. I still worry a bit about this becoming a contention point though... benchmarks seem pretty happy with the change, but they also run queries sequentially, one by one.
If you'd prefer to move forward without the staging queue approach, I'd like to first try this out internally in Datadog's systems at a large scale and see if it can be a problem.
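For reference, the short-critical-section pattern being debated looks roughly like this (simplified stand-in types; the real code buffers `RecordBatch`es in a `LimitedBatchCoalescer`, not integers). The lock guards only the in-memory push/drain and is never held across an `.await`, which is why a blocking mutex is defensible per the Tokio guidance quoted above:

```rust
use std::sync::Mutex;

// Stand-in for the shared coalescer: the mutex protects only the
// buffer manipulation, never an .await or a channel send.
struct SharedCoalescer {
    inner: Mutex<Vec<u32>>, // stand-in for buffered rows
    target: usize,
}

impl SharedCoalescer {
    /// Push rows and drain any completed batches once the target size
    /// is reached. The mutex is held only for the push/drain itself.
    fn push_and_drain(&self, rows: Vec<u32>) -> Vec<Vec<u32>> {
        let mut buf = self.inner.lock().unwrap();
        buf.extend(rows);
        let mut out = Vec::new();
        while buf.len() >= self.target {
            out.push(buf.drain(..self.target).collect());
        }
        out // sending the drained batches happens outside the lock
    }
}

fn main() {
    let c = SharedCoalescer { inner: Mutex::new(Vec::new()), target: 4 };
    assert!(c.push_and_drain(vec![1, 2]).is_empty());
    let drained = c.push_and_drain(vec![3, 4, 5]);
    assert_eq!(drained, vec![vec![1, 2, 3, 4]]);
}
```

The contention concern remains: if pushing (copying rows) is expensive and many producers target the same output partition, they serialize on this lock.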
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usageclickbench_partitioned — base (merge-base)
clickbench_partitioned — branch
File an issue against this benchmark runner |
…onChannels` and the `OutputChannel`. Remove `OutputChannels`.
|
Moving it temporarily to draft, I want to improve it a bit more before it's ready for another review. |
4c66204 to
98aa2c0
Compare
98aa2c0 to
bbd10de
Compare
|
run benchmark tpch10 |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing gabotechs/repartition-coalesce-batches-berfore-channels (bbd10de) to 4fac70d (merge-base) diff using: tpch10 File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagetpch10 — base (merge-base)
tpch10 — branch
File an issue against this benchmark runner |
alamb
left a comment
Thanks @gabotechs -- I will try and find time to review this carefully soon. Those are some impressive perf numbers 🦾
| "crossbeam-utils", | ||
| ] | ||
| [[package]] |
not a huge fan of this new dependency -- given we already have https://crates.io/crates/crossbeam-deque (a few lines above) maybe we can just use that one so there are no net new dependencies?
I don't think we can, no: that package contains a work-stealing queue, which is not really what we need here.
Reverted the commit for now
This reverts commit bbd10de.


Which issue does this PR close?
Rationale for this change
Today RepartitionExec emits one small batch per (input batch × non-empty output partition), then coalesces those small batches back into target-sized ones on the consumer side. That means the channel layer (memory accounting, gate, await suspensions) does work proportional to num_partitions per input batch, even though each small batch only carries batch_size / num_partitions rows. Moving the coalescing producer-side, before the gate, collapses the channel traffic and reduces sensitivity to high output fanout.
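To put rough numbers on this (the values below are illustrative assumptions, not measured defaults):

```rust
// Rough arithmetic for the fanout cost described above.
fn rows_per_output_batch(batch_size: usize, num_partitions: usize) -> usize {
    // A uniformly hash-partitioned input batch splits into ~equal slices.
    batch_size / num_partitions
}

fn channel_ops_per_input_batch(num_partitions: usize) -> usize {
    // One send (memory accounting, gate check, possible await suspension)
    // per non-empty output partition.
    num_partitions
}

fn main() {
    // e.g. an 8192-row input batch fanned out to 64 partitions yields
    // 64 channel sends of only ~128 rows each.
    assert_eq!(rows_per_output_batch(8192, 64), 128);
    assert_eq!(channel_ops_per_input_batch(64), 64);
}
```

Coalescing producer-side means each channel send carries a target-sized batch instead, so the per-send overhead is amortized over far more rows.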
This gets much worse in https://github.com/datafusion-contrib/datafusion-distributed, which uses `RepartitionExec` as the foundational backbone for network shuffles: it scales up the `RepartitionExec` output partitions to match P (partitions) * W (workers), where P is typically in the range of 12-24, and W can be from 1 to thousands. The fanout overhead that the current `RepartitionExec` introduces is pretty big in that setup.
Ideally, the overhead of fanning out many small `RecordBatch`es should be as small as possible.
What changes are included in this PR?
Are these changes tested?
Yes, by existing tests
Are there any user-facing changes?
No