feat(parquet): fuse level encoding passes and compact level representation#9653
HippoBaro wants to merge 6 commits into apache:main
Conversation
Force-pushed from 335fb81 to 44dae05
This is a continuation of the work done in #9447 to improve runtime performance around sparse and/or highly uniform columns. As such this may be of interest to @alamb and @etseidl. 5a1d3d7 adds three benchmarks that exercise the code path this series optimizes. I created a PR (#9654) to merge those separately if needed so the benchmark bot can have a baseline to compare against. Thanks!
Force-pushed from 44dae05 to 7902e69

Thanks @HippoBaro, this looks impressive. I'm still looking, but haven't found any obvious problems yet. Gads, every time I delve this deep into parquet I go a little mad 😵💫. I think the RLE encoder could use a little refactoring/comment improvements to make the flow a little more obvious. Not as part of this PR though.
etseidl left a comment:

Flushing a few comments. More tomorrow.
```rust
let mut values_to_write = 0usize;
let max_def = self.descr.max_def_level();
self.def_levels_encoder
    .put_with_observer(levels, |level, count| {
```

❤️ When I added the histograms I wasn't happy with the redundancy here. Nice fix!
Add `is_accumulating()` and `extend_run()` methods to `RleEncoder` that allow callers to detect when the encoder is in RLE accumulation mode and bulk-extend runs without per-element overhead.

Add `put_with_observer()` to `LevelEncoder` that calls an `FnMut(i16, usize)` observer for each run of identical values during encoding. This allows callers to piggyback counting and histogram updates onto the encoding pass without extra iterations over the level buffer. Refactor `put()` to delegate to it with a no-op observer.

Previously, `write_mini_batch()` made 3 separate passes over each level array: one to count non-null values or row boundaries, one to update the level histogram, and one to RLE-encode. Now all three operations happen in a single pass via the observer closure. Remove the separate `update_definition_level_histogram()` and `update_repetition_level_histogram()` methods from `PageMetrics`. Add `LevelHistogram::update_n()` for batch histogram updates.

The encoding loop now checks whether the encoder entered RLE accumulation mode after a call to `RleEncoder::put()`. When it does, it scans ahead for the rest of the run and batches the observer call with the full run length, enabling O(1) histogram and counting updates per RLE run.

Benchmark results (vs baseline):

```
primitive_sparse_99pct_null/default        15.2 ms (was 40.3 ms, −62%)
primitive_sparse_99pct_null/parquet_2      16.1 ms (was 43.5 ms, −63%)
primitive_sparse_99pct_null/zstd_parquet_2 17.0 ms (was 44.4 ms, −62%)
list_primitive_sparse_99pct_null/default   17.4 ms (was 39.9 ms, −56%)
list_primitive_sparse_99pct_null/parquet_2 16.7 ms (was 39.9 ms, −58%)
list_primitive_sparse_99pct_null/zstd_p2   16.8 ms (was 40.7 ms, −59%)
primitive_all_null/default                  8.8 ms (was 38.0 ms, −77%)
primitive_all_null/parquet_2                8.8 ms (was 36.9 ms, −76%)
primitive_all_null/zstd_parquet_2           8.9 ms (was 36.1 ms, −75%)
```

Signed-off-by: Hippolyte Barraud <hippolyte.barraud@datadoghq.com>
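To illustrate the observer idea from this commit, here is a minimal sketch, assuming a simplified model rather than the actual `RleEncoder`/`LevelEncoder` API: the loop scans ahead for each run of identical levels and invokes the observer once per run, so counting and histogram updates cost O(1) per run instead of O(1) per value. The function name and signature are hypothetical.

```rust
// Sketch only: fuse run detection with an observer callback so that
// histogram/count updates happen once per run instead of once per value.
// Returns the number of runs found.
fn encode_with_observer<F: FnMut(i16, usize)>(levels: &[i16], mut observer: F) -> usize {
    let mut runs = 0;
    let mut i = 0;
    while i < levels.len() {
        let value = levels[i];
        // Scan ahead for the rest of the run of identical values.
        let run_end = i + levels[i..].iter().take_while(|&&v| v == value).count();
        // One observer call per run carries the full run length.
        observer(value, run_end - i);
        runs += 1;
        i = run_end;
    }
    runs
}
```

A caller can then piggyback non-null counting and histogram updates into the single pass, which is the fusion the commit describes.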
Restructure `write_list()` to accumulate consecutive null and empty rows and flush them in a single `visit_leaves()` call using `extend(repeat_n(...))`, instead of calling `visit_leaves()` per row.

With sparse data (99% nulls), a 4096-row batch previously triggered ~4000 individual tree traversals, each pushing a single value per leaf. Now consecutive null/empty runs are collapsed into one traversal that extends all leaf level buffers in bulk. This follows the same pattern already used by `write_struct()`. The `write_non_null_slice` path is unchanged since each non-null row has different offsets and cannot be batched.

Benchmark results (vs previous commit):

```
list_primitive_sparse_99pct_null/default   10.5 ms (was 17.4 ms, −40%)
list_primitive_sparse_99pct_null/parquet_2 10.5 ms (was 16.7 ms, −37%)
list_primitive_sparse_99pct_null/zstd_p2   10.6 ms (was 16.8 ms, −37%)
```

Signed-off-by: Hippolyte Barraud <hippolyte.barraud@datadoghq.com>
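The accumulate-then-flush pattern can be sketched in miniature. This assumes a single flat definition-level buffer rather than the real tree of leaves walked by `visit_leaves()`, and the function name is hypothetical: null rows are only counted, and the pending run is flushed with one bulk extend when a non-null row (or the end of the batch) is reached.

```rust
// Sketch only: defer per-row pushes for null rows and flush each run of
// consecutive nulls with a single bulk extend of the level buffer.
fn write_rows(def_levels: &mut Vec<i16>, rows_null: &[bool], null_def: i16, valid_def: i16) {
    let mut pending_nulls = 0usize;
    for &is_null in rows_null {
        if is_null {
            // Defer the write: just grow the run.
            pending_nulls += 1;
        } else {
            // Flush the accumulated null run in one traversal.
            def_levels.extend(std::iter::repeat(null_def).take(pending_nulls));
            pending_nulls = 0;
            def_levels.push(valid_def);
        }
    }
    // Flush any trailing run of nulls.
    def_levels.extend(std::iter::repeat(null_def).take(pending_nulls));
}
```

With 99% nulls this replaces thousands of single-element pushes per batch with a handful of bulk extends, which is where the ~40% win above comes from.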
When every element in a list, struct, or fixed-size list array is null, short-circuit level building before the row loop and store a compact `(def_value, rep_value, count)` tuple on `ArrayLevels` instead of materializing `Vec<i16>` buffers. The same fast path applies at the leaf level in `write_levels()` when `logical_nulls` covers every row.

On the write side, `ArrowColumnWriter` detects the `uniform_levels` tuple and calls a dedicated `write_uniform_null_batch()` that encodes def/rep levels via `RleEncoder::put_n()` in O(1) amortized time, bypassing the normal mini-batch chunking and per-element iteration. A new `LevelEncoder::put_n_with_observer()` fuses encoding with histogram and counting updates in a single call. `write_uniform_null_batch()` chunks at the configured page row count limit to respect page boundaries.

Also defers `non_null_indices.reserve()` to the branches that actually populate it, avoiding an unnecessary allocation for all-null arrays.

Benchmark results (vs previous commit):

```
primitive_all_null/default        192 µs (was 8.8 ms, −97.8%)
primitive_all_null/parquet_2      193 µs (was 8.8 ms, −97.8%)
primitive_all_null/zstd_parquet_2 250 µs (was 8.9 ms, −97.2%)
```

Signed-off-by: Hippolyte Barraud <hippolyte.barraud@datadoghq.com>
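Why `put_n`-style bulk insertion is O(1) amortized can be shown with a toy run-length encoder. This is a sketch under stated assumptions: the real `RleEncoder` emits the RLE/bit-packed byte stream, whereas this model only tracks `(value, repeat_count)` runs, and the type and method names are hypothetical. Extending the current run by `n` is a single addition regardless of `n`.

```rust
// Sketch only: run accounting for an RLE-style encoder. Recording `n`
// repeats of the same value is O(1) bookkeeping; a run is closed only
// when the value changes or the encoder is flushed.
struct RunEncoder {
    current: i16,
    repeat_count: usize,
    runs: Vec<(i16, usize)>,
}

impl RunEncoder {
    fn new() -> Self {
        Self { current: 0, repeat_count: 0, runs: Vec::new() }
    }

    // Analogous in spirit to `put_n`: extend the current run by `n` in O(1).
    fn put_n(&mut self, value: i16, n: usize) {
        if self.repeat_count > 0 && value != self.current {
            // Value changed: close the previous run.
            self.runs.push((self.current, self.repeat_count));
            self.repeat_count = 0;
        }
        self.current = value;
        self.repeat_count += n;
    }

    fn flush(mut self) -> Vec<(i16, usize)> {
        if self.repeat_count > 0 {
            self.runs.push((self.current, self.repeat_count));
        }
        self.runs
    }
}
```

Two all-null batches of 4096 rows thus merge into one 8192-long run with two constant-time calls, instead of 8192 per-element puts.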
Replace the ad hoc level and non-null index vectors with `LevelData` and `ValueSelection`, so the writer can represent absent, uniform, dense, and sparse cases directly instead of always materializing the worst-case shape. This keeps the common paths cheap, removes the dedicated uniform-null fast path by folding it into the generic semantic writer, and preserves the old all-null throughput by keeping page-sized chunking for uniform batches.

Extends the compact Uniform/Dense representations (introduced for all-null columns in the previous commit) to non-null columns, yielding the same allocation, batching, and encoding benefits for the common non-null case.

Benchmark results (vs previous commit):

```
primitive_non_null/default     57.8 ms (was 63.4 ms, −9%)
primitive_non_null/parquet_2   78.0 ms (was 85.1 ms, −8%)
struct_non_null/default        27.3 ms (was 29.9 ms, −9%)
struct_non_null/parquet_2      36.1 ms (was 38.2 ms, −6%)
struct_non_null/zstd_parquet_2 47.3 ms (was 50.9 ms, −7%)
```

Signed-off-by: Hippolyte Barraud <hippolyte.barraud@datadoghq.com>
The literal `8` appeared in two distinct roles throughout `RleEncoder`, `RleDecoder`, and their tests. Replacing each with a named constant makes the intent explicit and prevents the two meanings from being confused.

`BIT_PACK_GROUP_SIZE = 8`: the Parquet RLE/bit-packing hybrid format always bit-packs values in multiples of this count (spec: "we always bit-pack a multiple of 8 values at a time"). Every occurrence related to the staging buffer size, the repeat-count threshold that triggers the RLE decision, and the group-count arithmetic in bit-packed headers now uses this name.

`u8::BITS` (= 8, from std): used wherever a bit-count is divided by 8 to obtain a byte-count (e.g. `ceil(bit_width, u8::BITS as usize)`). This is a bits-per-byte conversion, a fundamentally different concept from the packing-group size.

No behaviour change.

Signed-off-by: Hippolyte Barraud <hippolyte.barraud@datadoghq.com>
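The two roles of `8` can be seen side by side in a small sketch. The constant name comes from the commit; the helper function is a hypothetical illustration, not code from parquet-rs: the group size says how many values are packed together, while `u8::BITS` converts the resulting bit count into bytes.

```rust
// Values per bit-packed group, fixed by the Parquet RLE/bit-packing
// hybrid format (constant name from the commit above).
const BIT_PACK_GROUP_SIZE: usize = 8;

// Sketch only: bytes needed to bit-pack one group of values at
// `bit_width` bits each. The division uses u8::BITS (bits per byte),
// a different concept from the packing-group size.
fn group_byte_len(bit_width: usize) -> usize {
    (BIT_PACK_GROUP_SIZE * bit_width).div_ceil(u8::BITS as usize)
}
```

For example, a group of 8 values at 3 bits each occupies 24 bits, i.e. 3 bytes; naming the two eights separately keeps that arithmetic readable.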
Force-pushed from 7902e69 to c891c35
Thanks for the reviews! I've reworked the branch to address all feedback. Sorry for the delay, it took me a while to experiment. The main structural change is a new `LevelData` enum:

```rust
enum LevelData {
    Absent,
    Materialized(Vec<i16>),
    Uniform { value: i16, count: usize },
}
```
The resulting refactor has a larger LoC footprint, but the API is arguably much cleaner and more robust. Also rebased as per #9656 (review).
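As a sketch of how a writer might branch on this representation (the enum mirrors the snippet above; the `num_levels` helper is a hypothetical illustration, not part of the PR), the point is that the uniform case answers length queries without ever allocating a `Vec<i16>`:

```rust
// Enum shape as posted in the comment above.
enum LevelData {
    Absent,
    Materialized(Vec<i16>),
    Uniform { value: i16, count: usize },
}

// Hypothetical helper: the number of levels each variant represents.
// `Uniform` answers in O(1) with no buffer materialization.
fn num_levels(levels: &LevelData) -> usize {
    match levels {
        LevelData::Absent => 0,
        LevelData::Materialized(v) => v.len(),
        LevelData::Uniform { count, .. } => *count,
    }
}
```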
Thanks @HippoBaro. I'll try to make some time to review the changes. Probably not today but hopefully tomorrow... 🤞
run benchmark arrow_writer

🤖 Arrow criterion benchmark running (GKE): comparing faster_sparse_columns_encoding (c891c35) to aac969d (merge-base)
# Which issue does this PR close?

- None, but relates to #9653

# Rationale for this change

#9653 introduces optimizations related to non-null uniform workloads. This adds benchmarks so we can quantify them.

# What changes are included in this PR?

Add three new benchmark cases to the arrow_writer benchmark suite for evaluating write performance on struct columns at varying null densities:

* `struct_non_null`: a nullable struct with 0% null rows and non-nullable primitive children;
* `struct_sparse_99pct_null`: a nullable struct with 99% null rows, exercising null batching through one level of struct nesting;
* `struct_all_null`: a nullable struct with 100% null rows, exercising the uniform-null path through struct nesting.

Baseline results (Apple M1 Max):

```
struct_non_null/default             29.9 ms
struct_non_null/parquet_2           38.2 ms
struct_non_null/zstd_parquet_2      50.9 ms
struct_sparse_99pct_null/default     7.2 ms
struct_sparse_99pct_null/parquet_2   7.3 ms
struct_sparse_99pct_null/zstd_p2     8.1 ms
struct_all_null/default             83.3 µs
struct_all_null/parquet_2           82.5 µs
struct_all_null/zstd_parquet_2     106.6 µs
```

# Are these changes tested?

N/A

# Are there any user-facing changes?

None

Signed-off-by: Hippolyte Barraud <hippolyte.barraud@datadoghq.com>
run benchmark arrow_writer

🤖 Arrow criterion benchmark running (GKE): comparing faster_sparse_columns_encoding (6c73ac7) to adf9308 (merge-base)
🤖 Arrow criterion benchmark completed (GKE)
I am surprised by a few of the regressions above; I can't reproduce them locally. Are these benchmarks known to be noisy?

Yes. They are extremely twitchy. I always take them with a grain of salt or ten. 😅
I've now run multiple passes of the arrow_writer bench on my workstation and there appear to be no regressions due to this PR. And the speed-ups are quite impressive 😄
@kszucs do you have time to look at this PR? It touches on your CDC code.

I am hoping to review this tomorrow
alamb left a comment:
Thank you @HippoBaro -- this is really exciting. I am sorry it is taking so long to review, but this is in some of the most performance critical and tricky code in the parquet writer.
I went through most of it pretty carefully and I really like where it is heading but as you can probably tell by the number of comments it is a pretty large change
What I would like to request is that we break this PR into smaller chunks to make it easier to review and verify, rather than getting this one in as a single change.
Some suggested parts to break out:
- `BIT_PACK_GROUP_SIZE`
- The fast path changes to `parquet/src/arrow/arrow_writer/levels.rs`
- The introduction of `LevelData`
- Change to use `put_with_observer` rather than `put`
Test coverage for fast path
One thing that came up during my review is that many of the newly added fast paths are not covered by tests.
To see this, you can run
`cargo llvm-cov --html test -p parquet`
Here is a copy of the result: llvm.zip
Here is an example showing that the fast paths aren't covered
```diff
-fn write(&mut self, _values: &Self::Values, _offset: usize, _len: usize) -> Result<()> {
-    unreachable!("should call write_gather instead")
+fn write(&mut self, values: &Self::Values, offset: usize, len: usize) -> Result<()> {
+    downcast_op!(
```

is this code actually callable now?
Yes! The code is now able to distinguish between Dense { offset, len } and Sparse(Vec<usize>). When the column has no nulls, write_leaf produces Dense directly without materializing a vec as before, and write_mini_batch then calls encoder.write(values, offset, len) based on that. Neat!
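As a sketch of the distinction described above (variant names taken from the comment; the exact enum shape and the helper are assumptions, not the actual parquet-rs definitions): a dense selection is just a contiguous range, so no index vector is ever built for the no-nulls case.

```rust
// Sketch only: a value selection is either a contiguous range into the
// values buffer (no allocation) or an explicit list of non-null indices.
enum ValueSelection {
    Dense { offset: usize, len: usize },
    Sparse(Vec<usize>),
}

// How many values the selection covers; Dense answers without any
// per-index bookkeeping.
fn selected_count(sel: &ValueSelection) -> usize {
    match sel {
        ValueSelection::Dense { len, .. } => *len,
        ValueSelection::Sparse(indices) => indices.len(),
    }
}
```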
```diff
-/// Maximum groups of 8 values per bit-packed run. Current value is 64.
+/// Number of values in one bit-packed group. The Parquet RLE/bit-packing hybrid
+/// format always bit-packs values in multiples of this count (see the format spec:
```

Can you please provide a link in the comments to this statement.
```rust
/// needed. Callers may use [`extend_run`](Self::extend_run) to add further
/// repetitions in O(1) once this returns `true`.
#[inline]
pub fn is_accumulating(&self, value: u64) -> bool {
```

Perhaps calling it `is_accumulating_rle` would help readers understand that this is specific to the RLE mode.
```diff
 if self.current_value == value {
     self.repeat_count += 1;
-    if self.repeat_count > 8 {
+    if self.repeat_count > BIT_PACK_GROUP_SIZE {
```

the change to use a constant is better than hard-coded constants -- thank you
```rust
/// Increments the count for a level value by `count`.
#[inline]
pub fn update_n(&mut self, level: i16, count: i64) {
```

what does the `n` stand for here? As in, why not call this `update_count` to match the inner?
```rust
match nulls {
    Some(nulls) => {
        let null_offset = range.start;
        let mut pending_nulls: usize = 0;
```

I think it might also help future readers to define what "empties" means in this context (and how it is different from null).
```rust
match info.logical_nulls.clone() {
```
```rust
/// incrementally across multiple batches without forcing run boundaries.
/// The encoder is flushed automatically when [`consume`](Self::consume) is called.
#[inline]
pub fn put(&mut self, buffer: &[i16]) -> usize {
```

In theory this is a breaking API change. However, the `LevelEncoder` is part of the "experimental" API, which is documented as not being stable.
```rust
#[derive(Debug, Clone, Copy)]
pub(crate) enum LevelDataRef<'a> {
```

This seems pretty similar to `LevelData` -- why can't we just use `&LevelData`? If there is a good reason, I think we need an explanation in comments.
```rust
#[derive(Debug, Clone, Copy)]
pub(crate) enum ValueSelectionRef<'a> {
```

likewise here -- an explanation of this and how it is related to `ValueSelection` would be really helpful
I can't wait to get this in -- so good
Thanks again for the thorough reviews! I’ll keep working on this branch/PR to address the feedback (hopefully tomorrow) and for discussion purposes, but we can otherwise close it. I’ll make individual PRs as requested. Bear with me as I work through the many comments and break the commits into more digestible pieces 🙇
100% -- thank you for being willing to do so |
I had a quick look at the levels and cdc changes and it seems like a strict improvement without any noticeable issues; I will try to take a closer look tomorrow.
Which issue does this PR close?
Rationale for this change
See issue for details. The Parquet column writer currently does per-value work during level encoding regardless of data sparsity, even though the output encoding (RLE) is proportional to the number of runs.
What changes are included in this PR?
Three incremental commits, each building on the previous:
1. **Fuse level encoding with counting and histogram updates.** `write_mini_batch()` previously made three separate passes over each level array: count non-nulls, update the level histogram, and RLE-encode. Now all three happen in a single pass via an observer callback on `LevelEncoder`. When the RLE encoder enters accumulation mode, the loop scans ahead for the full run length and batches the observer call. This makes counting and histogram updates O(1) per run.
2. **Batch consecutive null/empty rows in `write_list`.** Consecutive null or empty list entries are now collapsed into a single `visit_leaves()` call that bulk-extends all leaf level buffers, instead of one tree traversal per null row. Mirrors the approach already used by `write_struct()`.
3. **Short-circuit entirely-null columns.** When every element in an array is null, skip `Vec<i16>` level-buffer materialization entirely and store a compact `(def_value, rep_value, count)` tuple. The writer encodes this via `RleEncoder::put_n()` in O(1) amortized time, bypassing the normal mini-batch loop.

Are these changes tested?
All tests pass. I added some benchmarks to exercise the heavy and all-null code paths, alongside the existing 25% sparseness benchmarks:
Non-nullable column benchmarks are within noise, as expected since they have no definition levels to optimize.
Are there any user-facing changes?
None.