Track spill read-back memory in SMJ#22103
Conversation
There was a problem hiding this comment.
@SubhamSinghal
Thanks for the follow up.
I found one blocking issue around memory reservation accounting on error paths in the spill read-back flow.
| .set_max(self.reservation.size()); | ||
| spill_read_mem += batch_mem; | ||
|
|
||
| let file = BufReader::new(File::open(spill_file.path())?); |
There was a problem hiding this comment.
It looks like spill_read_mem is only shrunk after all file reads and all interleave calls succeed. Once self.reservation.grow(batch_mem) succeeds, any later ? such as from File::open, StreamReader::try_new, next().transpose(), or interleave can return early before the shrink happens at line 1651.
That leaves the reservation inflated until the stream is dropped, which breaks the grow/shrink accounting invariant on error paths and can leave the memory pool reporting stale reserved memory after a failed poll.
Could we make this temporary read-back reservation scoped/RAII-based, or otherwise guarantee that shrink runs on every return path after a successful grow?
There was a problem hiding this comment.
@kosiew Thanks for highlighting this. Switched from manual grow/shrink on self.reservation to a scoped MemoryReservation via self.reservation.new_empty().
kosiew
left a comment
There was a problem hiding this comment.
@SubhamSinghal
Looks 👍 to me
Which issue does this PR close?
Follow-up to #21962.
Rationale for this change
After #21962, the memory pool accurately tracks residual
join_arraysmemory that remains after aBufferedBatchisspilled to disk. However, when spilled batches are read back from disk during output materialization in
materialize_right_columns, the deserialized data temporarily exists in memory without any pool reservation.The pool thinks these batches cost 0 bytes during read-back. Under memory pressure (the reason they were spilled), other
operators see stale headroom and may over-allocate, risking OOM.
What changes are included in this PR?
Changed
materialize_right_columnsfrom&selfto&mut selfand addedgrow/shrinkat the exact points where spilled data is read from disk:Path A (single source spilled):
grow(size_estimation)immediately beforefetch_right_columns_by_idxsshrink(size_estimation)immediately afterPath B (multi-source interleave):
size_estimationfor all spilled sourcesgrow(total)beforesource_dataloadingshrink(total)after interleave completesUses unconditional
grow()because the data must be read to produce output — there is no fallback. Same rationale as#21962: if memory physically exists, the pool must reflect it.
Are these changes tested?
Yes — two new tests:
spill_read_back_memory_accounting: multiple buffered batches for same key (multi-source Path B) — verifiespeak_mem_used >= size_estimationandpool.reserved() == 0at endspill_read_back_single_source: distinct keys with one batch per group (single-source Path A) — same assertionsAre there any user-facing changes?
No.