[lake/hudi] Introduce HudiBucketingFunction for bucket strategy#3316
[lake/hudi] Introduce HudiBucketingFunction for bucket strategy#3316fhan688 wants to merge 6 commits into
Conversation
…coder,add four test cases in HudiBucketingFunctionTest
…tingFunctionTest(composite keys、multi data type).
|
please help review, thanks! |
There was a problem hiding this comment.
Pull request overview
This PR introduces Hudi-specific bucket-key encoding and bucket-id calculation so Fluss can route lake-tiered records consistently with Hudi bucket indexing.
Changes:
- Adds
HudiBucketingFunctionand wires it intoBucketingFunction.of(...). - Adds
HudiKeyEncoderand wires it intoKeyEncoder.of(...). - Adds Hudi-based unit tests and a test-scoped Hudi bundle dependency.
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 1 comment.
Show a summary per file
| File | Description |
|---|---|
fluss-common/src/main/java/org/apache/fluss/bucketing/BucketingFunction.java |
Routes HUDI lake format to the new bucketing function. |
fluss-common/src/main/java/org/apache/fluss/bucketing/HudiBucketingFunction.java |
Implements Hudi-style bucket id calculation from encoded hash bytes. |
fluss-common/src/main/java/org/apache/fluss/row/encode/KeyEncoder.java |
Routes HUDI lake format to the new key encoder. |
fluss-common/src/main/java/org/apache/fluss/row/encode/hudi/HudiKeyEncoder.java |
Encodes Hudi bucket keys as a 4-byte hash of stringified key fields. |
fluss-lake/fluss-lake-hudi/pom.xml |
Adds Hudi bundle as a test-scoped dependency. |
fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/bucketing/HudiBucketingFunctionTest.java |
Adds Hudi cross-validation and edge-case tests for encoding and bucketing. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| } else if (lakeFormat == DataLakeFormat.HUDI) { | ||
| return new HudiKeyEncoder(rowType, keyFields); |
There was a problem hiding this comment.
nice review, thanks.
|
|
||
| public HudiKeyEncoder(RowType rowType, List<String> keys) { | ||
| // for getting key fields out of fluss internal row | ||
| fieldGetters = new InternalRow.FieldGetter[keys.size()]; |
There was a problem hiding this comment.
The encoded hash here is values.hashCode() over a List<String> built directly from each key field's toString(). However, Hudi's production path goes through BucketIdentifier#getBucketId(HoodieKey, indexKeyFields, numBuckets), which parses the record-key string by splitting on : and ,. As soon as a key field's string form contains : or , (very common for TIMESTAMP_LTZ, e.g. 2023-10-25T10:01:13.182Z, or any user string with a comma), the List<String> Hudi reconstructs differs from the one we hash here, and the resulting bucket id will diverge from Hudi's.
Note that HudiBucketingFunctionTest#testTimestampLtzType only validates against the BucketIdentifier.getBucketId(List<String>, int) overload, which sidesteps this parsing step. Please add an end-to-end test that goes through the HoodieKey overload, and either escape : / , inside stringifyForRecordKey, or document the limitation explicitly in the class Javadoc.
| if (value instanceof BinaryString) { | ||
| return value.toString(); | ||
| } | ||
| return String.valueOf(value); |
There was a problem hiding this comment.
String.valueOf(value) at line 86 will produce "[B@xxxx" for byte[], and reference-style strings for BinaryArrayData / BinaryMapData / BinaryRowData. This means a Hudi table with such a column declared as a bucket key would compute non-reproducible bucket ids.
Please reject unsupported types in the constructor by checking keyDataType.getTypeRoot() and only allowing primitive/decimal/string/temporal types. Throw IllegalArgumentException for the rest, with a clear message.
Purpose
Linked issue: #3274
Introduce Hudi's bucketing strategy into Fluss so that the Fluss server/client can compute the same bucket id as Hudi's BucketIdentifier when tiering data into a Hudi table with bucket index. This is a prerequisite for the upcoming HudiLakeWriter and HudiCompaction PRs, which need to route records to the correct Hudi bucket file.
Brief change log
fluss-common (production code)
BucketingFunction.of(...) — add DataLakeFormat.HUDI branch that returns HudiBucketingFunction.
HudiBucketingFunction — implements BucketingFunction. Decodes a 4-byte big-endian int produced by HudiKeyEncoder and computes (hash & Integer.MAX_VALUE) % numBuckets, matching Hudi's BucketIdentifier.getBucketId(List, int). Includes strict input validation (bucketKey must be exactly 4 bytes, numBuckets must be positive).
KeyEncoder.createKeyEncoder(...) — add DataLakeFormat.HUDI branch that returns HudiKeyEncoder.
HudiKeyEncoder — implements KeyEncoder. Computes List.hashCode() inline (h = 31*h + elementStringHash) over the stringified key fields, avoiding intermediate ArrayList/String.valueOf allocations on the hot path. For common numeric types (int, long, byte, short, boolean) the string hash code is computed without materializing the string. Null fields are encoded as "null" placeholder (aligned with Hudi's KeyGenUtils.NULL_RECORDKEY_PLACEHOLDER) to avoid collision with the literal string "null".
fluss-lake-hudi (test & build)
Single-field types: INT, BIGINT, STRING, DECIMAL, TIMESTAMP_NTZ
Additional types: BOOLEAN, TINYINT, SMALLINT, FLOAT, DATE, TIME, TIMESTAMP_LTZ
Composite (multi-field) bucket keys with and without null fields
Null field uses placeholder (not literal "null") — regression test
Illegal input: bucketKey null / wrong length / numBuckets ≤ 0
Boundary: numBuckets=1, Integer.MIN_VALUE hash, negative hash sign-bit handling
All tests cross-validate against Hudi's BucketIdentifier.getBucketId(List, int)
Tests
HudiBucketingFunctionTest (13 test cases, all passing):
testIntegerHash / testLongHash / testStringHash / testDecimalHash / testTimestampEncodingHash — original single-field coverage
testNullFieldUsesPlaceholder / testNullFieldDoesNotCollideWithLiteralNullString — null handling
testBucketingRejectsInvalidBucketKey / testBucketingRejectsNonPositiveNumBuckets — input validation
testCompositeBucketKeyMatchesHudiFieldValueRecordKey / testCompositeBucketKeyWithNullFieldUsesPlaceholder — multi-field keys
testBooleanAndIntegralTypes / testDateAndTimeTypes / testTimestampLtzType — type coverage
testBucketingNumBucketsBoundaryValues — boundary conditions
API and Format
No API or storage format changes. This PR only adds new implementations behind existing interfaces (BucketingFunction and KeyEncoder) for a new DataLakeFormat.HUDI enum value that was already defined.
Documentation
No new user-facing documentation required. This is an internal bucketing strategy implementation.