diff --git a/fluss-common/src/main/java/org/apache/fluss/bucketing/BucketingFunction.java b/fluss-common/src/main/java/org/apache/fluss/bucketing/BucketingFunction.java index 915fbd172f..5291ccdc39 100644 --- a/fluss-common/src/main/java/org/apache/fluss/bucketing/BucketingFunction.java +++ b/fluss-common/src/main/java/org/apache/fluss/bucketing/BucketingFunction.java @@ -43,6 +43,8 @@ static BucketingFunction of(@Nullable DataLakeFormat lakeFormat) { return new FlussBucketingFunction(); } else if (lakeFormat == DataLakeFormat.ICEBERG) { return new IcebergBucketingFunction(); + } else if (lakeFormat == DataLakeFormat.HUDI) { + return new HudiBucketingFunction(); } else { throw new UnsupportedOperationException("Unsupported lake format: " + lakeFormat); } diff --git a/fluss-common/src/main/java/org/apache/fluss/bucketing/HudiBucketingFunction.java b/fluss-common/src/main/java/org/apache/fluss/bucketing/HudiBucketingFunction.java new file mode 100644 index 0000000000..0799d64b5f --- /dev/null +++ b/fluss-common/src/main/java/org/apache/fluss/bucketing/HudiBucketingFunction.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.bucketing; + +/** + * An implementation of {@link BucketingFunction} to follow Hudi's bucketing strategy. + * + *

The bucket id is computed in the same way as Hudi's {@code + * org.apache.hudi.index.bucket.BucketIdentifier#getBucketId(String, String, int)}: take a 32-bit + * integer hash that is encoded as a fixed 4-byte big-endian array by {@code HudiKeyEncoder}, mask + * its sign bit and modulo by {@code numBuckets}. + */ +public class HudiBucketingFunction implements BucketingFunction { + + /** Length of a Hudi-encoded bucket key, in bytes (a single big-endian {@code int}). */ + private static final int ENCODED_KEY_LENGTH = 4; + + @Override + public int bucketing(byte[] bucketKey, int numBuckets) { + if (bucketKey == null) { + throw new IllegalArgumentException("bucketKey must not be null"); + } + if (bucketKey.length != ENCODED_KEY_LENGTH) { + throw new IllegalArgumentException( + "bucketKey must be exactly " + + ENCODED_KEY_LENGTH + + " bytes for Hudi bucketing, but got " + + bucketKey.length + + " bytes. The bucket key bytes are expected to be produced by HudiKeyEncoder."); + } + if (numBuckets <= 0) { + throw new IllegalArgumentException( + "numBuckets must be positive, but got " + numBuckets); + } + + // Decode 4-byte big-endian int produced by HudiKeyEncoder via bit operations + // to avoid allocating a ByteBuffer instance on this hot path. + int restored = + ((bucketKey[0] & 0xFF) << 24) + | ((bucketKey[1] & 0xFF) << 16) + | ((bucketKey[2] & 0xFF) << 8) + | (bucketKey[3] & 0xFF); + + return (restored & Integer.MAX_VALUE) % numBuckets; + } +} diff --git a/fluss-common/src/main/java/org/apache/fluss/row/encode/KeyEncoder.java b/fluss-common/src/main/java/org/apache/fluss/row/encode/KeyEncoder.java index 4d59be33b0..fe52e926cc 100644 --- a/fluss-common/src/main/java/org/apache/fluss/row/encode/KeyEncoder.java +++ b/fluss-common/src/main/java/org/apache/fluss/row/encode/KeyEncoder.java @@ -20,6 +20,7 @@ import org.apache.fluss.config.TableConfig; import org.apache.fluss.metadata.DataLakeFormat; import org.apache.fluss.row.InternalRow; +import org.apache.fluss.row.encode.hudi.HudiKeyEncoder; import org.apache.fluss.row.encode.iceberg.IcebergKeyEncoder; import org.apache.fluss.row.encode.paimon.PaimonKeyEncoder; import org.apache.fluss.types.RowType; @@ -76,6 +77,14 @@ static KeyEncoder ofPrimaryKeyEncoder( Optional optKvFormatVersion = tableConfig.getKvFormatVersion(); DataLakeFormat dataLakeFormat = tableConfig.getDataLakeFormat().orElse(null); int kvFormatVersion = optKvFormatVersion.orElse(1); + + // Hudi's HudiKeyEncoder is lossy (4-byte hash); it must NOT be used for + // primary key encoding because different keys with the same List#hashCode + // would collide. Use CompactedKeyEncoder instead. + if (dataLakeFormat == DataLakeFormat.HUDI) { + return CompactedKeyEncoder.createKeyEncoder(rowType, keyFields); + } + if (kvFormatVersion == 1) { return of(rowType, keyFields, dataLakeFormat); } @@ -129,6 +138,8 @@ static KeyEncoder of( return CompactedKeyEncoder.createKeyEncoder(rowType, keyFields); } else if (lakeFormat == DataLakeFormat.ICEBERG) { return new IcebergKeyEncoder(rowType, keyFields); + } else if (lakeFormat == DataLakeFormat.HUDI) { + return new HudiKeyEncoder(rowType, keyFields); } else { throw new UnsupportedOperationException("Unsupported datalake format: " + lakeFormat); } diff --git a/fluss-common/src/main/java/org/apache/fluss/row/encode/hudi/HudiKeyEncoder.java b/fluss-common/src/main/java/org/apache/fluss/row/encode/hudi/HudiKeyEncoder.java new file mode 100644 index 0000000000..ae5ab770fe --- /dev/null +++ b/fluss-common/src/main/java/org/apache/fluss/row/encode/hudi/HudiKeyEncoder.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.row.encode.hudi; + +import org.apache.fluss.row.BinaryString; +import org.apache.fluss.row.InternalRow; +import org.apache.fluss.row.encode.KeyEncoder; +import org.apache.fluss.types.DataType; +import org.apache.fluss.types.RowType; + +import java.util.ArrayList; +import java.util.List; + +/** + * An implementation of {@link KeyEncoder} to follow Hudi's encoding strategy. + * + *

The encoded bytes are a 4-byte big-endian representation of {@code List.hashCode()} + * over the stringified key fields, which matches the way Hudi's {@code BucketIdentifier} hashes a + * record key. Null fields are replaced by {@link #NULL_RECORDKEY_PLACEHOLDER} so that an explicit + * null and the literal string {@code "null"} no longer collide in the hash space. + */ +public class HudiKeyEncoder implements KeyEncoder { + + /** + * Placeholder used to represent a {@code null} key field when computing the record-key hash. It + * is intentionally aligned with Hudi's {@code KeyGenUtils.NULL_RECORDKEY_PLACEHOLDER} so that + * the resulting bucket id stays identical to what Hudi would compute on its side. + */ + public static final String NULL_RECORDKEY_PLACEHOLDER = "__null__"; + + private final InternalRow.FieldGetter[] fieldGetters; + + public HudiKeyEncoder(RowType rowType, List keys) { + // for getting key fields out of fluss internal row + fieldGetters = new InternalRow.FieldGetter[keys.size()]; + for (int i = 0; i < keys.size(); i++) { + int keyIndex = rowType.getFieldIndex(keys.get(i)); + DataType keyDataType = rowType.getTypeAt(keyIndex); + fieldGetters[i] = InternalRow.createFieldGetter(keyDataType, keyIndex); + } + } + + @Override + public byte[] encodeKey(InternalRow row) { + // Build the same string list that Hudi would build out of a record key, so the + // resulting List#hashCode() — and therefore the bucket id — match Hudi's own + // BucketIdentifier#getBucketId. + List values = new ArrayList<>(fieldGetters.length); + for (InternalRow.FieldGetter fieldGetter : fieldGetters) { + Object value = fieldGetter.getFieldOrNull(row); + values.add(stringifyForRecordKey(value)); + } + int hashCode = values.hashCode(); + + // 4-byte big-endian, decoded symmetrically by HudiBucketingFunction. + return new byte[] { + (byte) (hashCode >>> 24), + (byte) (hashCode >>> 16), + (byte) (hashCode >>> 8), + (byte) hashCode + }; + } + + private static String stringifyForRecordKey(Object value) { + if (value == null) { + return NULL_RECORDKEY_PLACEHOLDER; + } + if (value instanceof BinaryString) { + return value.toString(); + } + return String.valueOf(value); + } +} diff --git a/fluss-lake/fluss-lake-hudi/pom.xml b/fluss-lake/fluss-lake-hudi/pom.xml index ddf6b1329d..9fbce449f5 100644 --- a/fluss-lake/fluss-lake-hudi/pom.xml +++ b/fluss-lake/fluss-lake-hudi/pom.xml @@ -32,6 +32,43 @@ + + org.apache.hudi + hudi-flink${flink.major.version}-bundle + ${hudi.version} + test + + + log4j + log4j + + + org.slf4j + slf4j-api + + + org.apache.commons + commons-lang3 + + + org.apache.curator + curator-client + + + org.apache.curator + curator-framework + + + org.apache.curator + curator-recipes + + + org.apache.zookeeper + zookeeper + + + + org.apache.fluss fluss-common diff --git a/fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/bucketing/HudiBucketingFunctionTest.java b/fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/bucketing/HudiBucketingFunctionTest.java new file mode 100644 index 0000000000..d9a96248ca --- /dev/null +++ b/fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/bucketing/HudiBucketingFunctionTest.java @@ -0,0 +1,483 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.bucketing; + +import org.apache.fluss.row.BinaryString; +import org.apache.fluss.row.Decimal; +import org.apache.fluss.row.GenericRow; +import org.apache.fluss.row.TimestampLtz; +import org.apache.fluss.row.TimestampNtz; +import org.apache.fluss.row.encode.hudi.HudiKeyEncoder; +import org.apache.fluss.types.DataType; +import org.apache.fluss.types.DataTypes; +import org.apache.fluss.types.RowType; + +import org.apache.hudi.index.bucket.BucketIdentifier; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.time.Instant; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static org.apache.fluss.row.encode.hudi.HudiKeyEncoder.NULL_RECORDKEY_PLACEHOLDER; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Unit test for {@link HudiBucketingFunction}. */ +class HudiBucketingFunctionTest { + + @Test + void testIntegerHash() { + int testValue = 42; + int bucketNum = 10; + String key = "id"; + + RowType rowType = RowType.of(new DataType[] {DataTypes.INT()}, new String[] {key}); + + GenericRow row = GenericRow.of(testValue); + HudiKeyEncoder encoder = new HudiKeyEncoder(rowType, Collections.singletonList(key)); + + String recordKey = String.valueOf(testValue); + + // Encode with our implementation + byte[] ourEncodedKey = encoder.encodeKey(row); + // This is the equivalent bytes array which the key should be encoded to. + byte[] equivalentBytes = toBytes(new String[] {recordKey}); + assertThat(ourEncodedKey).isEqualTo(equivalentBytes); + + int hudiBucket = BucketIdentifier.getBucketId(recordKey, key, bucketNum); + + HudiBucketingFunction hudiBucketingFunction = new HudiBucketingFunction(); + int ourBucket = hudiBucketingFunction.bucketing(ourEncodedKey, bucketNum); + + assertThat(ourBucket).isEqualTo(hudiBucket); + } + + @Test + void testLongHash() { + long testValue = 1234567890123456789L; + int bucketNum = 10; + String key = "id"; + + RowType rowType = RowType.of(new DataType[] {DataTypes.BIGINT()}, new String[] {key}); + + GenericRow row = GenericRow.of(testValue); + HudiKeyEncoder encoder = new HudiKeyEncoder(rowType, Collections.singletonList(key)); + + String recordKey = String.valueOf(testValue); + + // Encode with our implementation + byte[] ourEncodedKey = encoder.encodeKey(row); + // This is the equivalent bytes array which the key should be encoded to. + byte[] equivalentBytes = toBytes(new String[] {recordKey}); + assertThat(ourEncodedKey).isEqualTo(equivalentBytes); + + int hudiBucket = BucketIdentifier.getBucketId(recordKey, key, bucketNum); + + HudiBucketingFunction hudiBucketingFunction = new HudiBucketingFunction(); + int ourBucket = hudiBucketingFunction.bucketing(ourEncodedKey, bucketNum); + + assertThat(ourBucket).isEqualTo(hudiBucket); + } + + @Test + void testStringHash() { + String testValue = "Hello Hudi, Fluss this side!"; + int bucketNum = 10; + String key = "name"; + + RowType rowType = RowType.of(new DataType[] {DataTypes.STRING()}, new String[] {key}); + + GenericRow row = GenericRow.of(BinaryString.fromString(testValue)); + HudiKeyEncoder encoder = new HudiKeyEncoder(rowType, Collections.singletonList(key)); + + // Encode with our implementation + byte[] ourEncodedKey = encoder.encodeKey(row); + // This is the equivalent bytes array which the key should be encoded to. + byte[] equivalentBytes = toBytes(new String[] {testValue}); + assertThat(ourEncodedKey).isEqualTo(equivalentBytes); + + int hudiBucket = BucketIdentifier.getBucketId(testValue, key, bucketNum); + + HudiBucketingFunction hudiBucketingFunction = new HudiBucketingFunction(); + int ourBucket = hudiBucketingFunction.bucketing(ourEncodedKey, bucketNum); + + assertThat(ourBucket).isEqualTo(hudiBucket); + } + + @Test + void testDecimalHash() { + BigDecimal testValue = new BigDecimal("123.45"); + Decimal decimal = Decimal.fromBigDecimal(testValue, 10, 2); + int bucketNum = 10; + String key = "amount"; + + RowType rowType = RowType.of(new DataType[] {DataTypes.DECIMAL(10, 2)}, new String[] {key}); + + GenericRow row = GenericRow.of(decimal); + HudiKeyEncoder encoder = new HudiKeyEncoder(rowType, Collections.singletonList(key)); + + // Encode with our implementation + byte[] ourEncodedKey = encoder.encodeKey(row); + // This is the equivalent bytes array which the key should be encoded to. + byte[] equivalentBytes = toBytes(new String[] {testValue.toPlainString()}); + assertThat(ourEncodedKey).isEqualTo(equivalentBytes); + + int hudiBucket = BucketIdentifier.getBucketId(testValue.toPlainString(), key, bucketNum); + + HudiBucketingFunction hudiBucketingFunction = new HudiBucketingFunction(); + int ourBucket = hudiBucketingFunction.bucketing(ourEncodedKey, bucketNum); + + assertThat(ourBucket).isEqualTo(hudiBucket); + } + + @Test + void testTimestampEncodingHash() throws IOException { + long millis = 1698235273182L; + int nanos = 123456; + long micros = millis * 1000 + (nanos / 1000); + TimestampNtz testValue = TimestampNtz.fromMillis(millis, nanos); + int bucketNum = 10; + String key = "event_time"; + + RowType rowType = RowType.of(new DataType[] {DataTypes.TIMESTAMP(6)}, new String[] {key}); + + GenericRow row = GenericRow.of(testValue); + HudiKeyEncoder encoder = new HudiKeyEncoder(rowType, Collections.singletonList(key)); + + // Encode with our implementation + byte[] ourEncodedKey = encoder.encodeKey(row); + // This is the equivalent bytes array which the key should be encoded to. + byte[] equivalentBytes = toBytes(new String[] {testValue.toString()}); + assertThat(ourEncodedKey).isEqualTo(equivalentBytes); + + int hudiBucket = BucketIdentifier.getBucketId(testValue.toString(), key, bucketNum); + + HudiBucketingFunction hudiBucketingFunction = new HudiBucketingFunction(); + int ourBucket = hudiBucketingFunction.bucketing(ourEncodedKey, bucketNum); + + assertThat(ourBucket).isEqualTo(hudiBucket); + } + + @Test + void testDateHash() { + int dateValue = 19655; + int bucketNum = 10; + String key = "date"; + + RowType rowType = RowType.of(new DataType[] {DataTypes.DATE()}, new String[] {key}); + GenericRow row = GenericRow.of(dateValue); + HudiKeyEncoder encoder = new HudiKeyEncoder(rowType, Collections.singletonList(key)); + + // Encode with our implementation + byte[] ourEncodedKey = encoder.encodeKey(row); + // This is the equivalent bytes array which the key should be encoded to. + byte[] equivalentBytes = toBytes(new String[] {String.valueOf(dateValue)}); + assertThat(ourEncodedKey).isEqualTo(equivalentBytes); + + int hudiBucket = BucketIdentifier.getBucketId(String.valueOf(dateValue), key, bucketNum); + + HudiBucketingFunction hudiBucketingFunction = new HudiBucketingFunction(); + int ourBucket = hudiBucketingFunction.bucketing(ourEncodedKey, bucketNum); + + assertThat(ourBucket).isEqualTo(hudiBucket); + } + + @Test + void testMultiKeysHashing() { + int testIntValue = 42; + long testLongValue = 1234567890123456789L; + String testStringValue = "Hello Hudi, Fluss this side!"; + BigDecimal testValue = new BigDecimal("123.45"); + Decimal decimal = Decimal.fromBigDecimal(testValue, 10, 2); + int bucketNum = 10; + String key = "age,id,name,amount"; + String recordKey = + "age:" + + testIntValue + + ",id:" + + testLongValue + + ",name:" + + testStringValue + + ",amount:" + + testValue; + + RowType rowType = + RowType.of( + new DataType[] { + DataTypes.INT(), + DataTypes.BIGINT(), + DataTypes.STRING(), + DataTypes.DECIMAL(10, 2) + }, + new String[] {"age", "id", "name", "amount"}); + GenericRow row = + GenericRow.of( + testIntValue, + testLongValue, + BinaryString.fromString(testStringValue), + decimal); + HudiKeyEncoder encoder = + new HudiKeyEncoder(rowType, Arrays.asList("age", "id", "name", "amount")); + + // Encode with our implementation + byte[] ourEncodedKey = encoder.encodeKey(row); + // This is the equivalent bytes array which the key should be encoded to. + byte[] equivalentBytes = + toBytes( + new String[] { + String.valueOf(testIntValue), + String.valueOf(testLongValue), + testStringValue, + String.valueOf(decimal) + }); + assertThat(ourEncodedKey).isEqualTo(equivalentBytes); + + int hudiBucket = BucketIdentifier.getBucketId(recordKey, key, bucketNum); + + HudiBucketingFunction hudiBucketingFunction = new HudiBucketingFunction(); + int ourBucket = hudiBucketingFunction.bucketing(ourEncodedKey, bucketNum); + + assertThat(ourBucket).isEqualTo(hudiBucket); + } + + @Test + void testNullFieldUsesPlaceholder() { + int bucketNum = 10; + String key = "name"; + + RowType rowType = + RowType.of(new DataType[] {DataTypes.STRING().copy(true)}, new String[] {key}); + + // a row with an explicit null on the bucket key column + GenericRow row = GenericRow.of((Object) null); + HudiKeyEncoder encoder = new HudiKeyEncoder(rowType, Collections.singletonList(key)); + + // Encoded bytes must hash the placeholder, NOT the literal "null" string. + byte[] ourEncodedKey = encoder.encodeKey(row); + byte[] placeholderBytes = toBytes(new String[] {NULL_RECORDKEY_PLACEHOLDER}); + byte[] javaNullLiteralBytes = toBytes(new String[] {"null"}); + + assertThat(ourEncodedKey).isEqualTo(placeholderBytes); + assertThat(ourEncodedKey).isNotEqualTo(javaNullLiteralBytes); + + int hudiBucket = BucketIdentifier.getBucketId(NULL_RECORDKEY_PLACEHOLDER, key, bucketNum); + int ourBucket = new HudiBucketingFunction().bucketing(ourEncodedKey, bucketNum); + assertThat(ourBucket).isEqualTo(hudiBucket); + } + + @Test + void testNullFieldDoesNotCollideWithLiteralNullString() { + // The literal string "null" must not produce the same encoded bytes as a real + // null value — a regression test for the previous String.valueOf(null) behavior. + String key = "name"; + RowType rowType = + RowType.of(new DataType[] {DataTypes.STRING().copy(true)}, new String[] {key}); + + HudiKeyEncoder encoder = new HudiKeyEncoder(rowType, Collections.singletonList(key)); + + byte[] encodedNull = encoder.encodeKey(GenericRow.of((Object) null)); + byte[] encodedLiteralNull = + encoder.encodeKey(GenericRow.of(BinaryString.fromString("null"))); + + assertThat(encodedNull).isNotEqualTo(encodedLiteralNull); + } + + @Test + void testBucketingRejectsInvalidBucketKey() { + HudiBucketingFunction function = new HudiBucketingFunction(); + + assertThatThrownBy(() -> function.bucketing(null, 10)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("must not be null"); + + // length 0 — wrong length + assertThatThrownBy(() -> function.bucketing(new byte[0], 10)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("exactly 4 bytes"); + + // length < 4 — used to throw BufferUnderflowException, now must be IAE + assertThatThrownBy(() -> function.bucketing(new byte[] {1, 2, 3}, 10)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("exactly 4 bytes"); + + // length > 4 — used to silently truncate, now must be IAE + assertThatThrownBy(() -> function.bucketing(new byte[] {1, 2, 3, 4, 5}, 10)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("exactly 4 bytes"); + } + + @Test + void testBucketingRejectsNonPositiveNumBuckets() { + HudiBucketingFunction function = new HudiBucketingFunction(); + byte[] anyKey = new byte[] {0, 0, 0, 1}; + + assertThatThrownBy(() -> function.bucketing(anyKey, 0)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("must be positive"); + assertThatThrownBy(() -> function.bucketing(anyKey, -1)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("must be positive"); + } + + @Test + void testCompositeBucketKeyMatchesHudiFieldValueRecordKey() { + // Two-field bucket key: Hudi expects record key in "f1:v1,f2:v2" form when its + // BucketIdentifier sees a ':' in the record key, then extracts ["v1","v2"] and + // hashes that List. Fluss feeds the same ["v1","v2"] into List#hashCode(). + int bucketNum = 16; + String f1 = "user_id"; + String f2 = "region"; + int idValue = 12345; + String regionValue = "cn-north"; + + RowType rowType = + RowType.of( + new DataType[] {DataTypes.INT(), DataTypes.STRING()}, + new String[] {f1, f2}); + GenericRow row = GenericRow.of(idValue, BinaryString.fromString(regionValue)); + HudiKeyEncoder encoder = new HudiKeyEncoder(rowType, Arrays.asList(f1, f2)); + + byte[] ourEncodedKey = encoder.encodeKey(row); + byte[] expected = toBytes(new String[] {String.valueOf(idValue), regionValue}); + assertThat(ourEncodedKey).isEqualTo(expected); + + // Compare against Hudi's List-based overload to avoid Hudi's own + // recordKey-parsing path (which would split on ':' inside values like timestamps). + int hudiBucket = + BucketIdentifier.getBucketId( + Arrays.asList(String.valueOf(idValue), regionValue), bucketNum); + int ourBucket = new HudiBucketingFunction().bucketing(ourEncodedKey, bucketNum); + assertThat(ourBucket).isEqualTo(hudiBucket); + } + + @Test + void testCompositeBucketKeyWithNullFieldUsesPlaceholder() { + int bucketNum = 8; + String f1 = "user_id"; + String f2 = "region"; + + RowType rowType = + RowType.of( + new DataType[] {DataTypes.INT().copy(true), DataTypes.STRING().copy(true)}, + new String[] {f1, f2}); + // f2 is null on purpose + GenericRow row = GenericRow.of(42, null); + HudiKeyEncoder encoder = new HudiKeyEncoder(rowType, Arrays.asList(f1, f2)); + + byte[] ourEncodedKey = encoder.encodeKey(row); + byte[] expected = toBytes(new String[] {"42", NULL_RECORDKEY_PLACEHOLDER}); + assertThat(ourEncodedKey).isEqualTo(expected); + + int hudiBucket = + BucketIdentifier.getBucketId( + Arrays.asList("42", NULL_RECORDKEY_PLACEHOLDER), bucketNum); + int ourBucket = new HudiBucketingFunction().bucketing(ourEncodedKey, bucketNum); + assertThat(ourBucket).isEqualTo(hudiBucket); + } + + @Test + void testBooleanAndIntegralTypes() { + int bucketNum = 10; + // BOOLEAN + assertSingleFieldRoundTrip( + DataTypes.BOOLEAN(), true, String.valueOf(true), "flag", bucketNum); + assertSingleFieldRoundTrip( + DataTypes.BOOLEAN(), false, String.valueOf(false), "flag", bucketNum); + // TINYINT / SMALLINT + assertSingleFieldRoundTrip(DataTypes.TINYINT(), (byte) 7, "7", "b", bucketNum); + assertSingleFieldRoundTrip(DataTypes.SMALLINT(), (short) 12345, "12345", "s", bucketNum); + // FLOAT (use a value whose Float.toString is stable across JVMs) + assertSingleFieldRoundTrip(DataTypes.FLOAT(), 3.5f, Float.toString(3.5f), "f", bucketNum); + } + + @Test + void testDateAndTimeTypes() { + int bucketNum = 10; + // DATE is stored as days-since-epoch int internally; record key is its String value. + int dateDays = 19852; + assertSingleFieldRoundTrip( + DataTypes.DATE(), dateDays, String.valueOf(dateDays), "d", bucketNum); + // TIME is stored as ms-of-day int. + int timeMillis = 12345678; + assertSingleFieldRoundTrip( + DataTypes.TIME(), timeMillis, String.valueOf(timeMillis), "t", bucketNum); + } + + @Test + void testTimestampLtzType() throws IOException { + int bucketNum = 10; + TimestampLtz ts = TimestampLtz.fromInstant(Instant.ofEpochMilli(1700000000000L)); + RowType rowType = + RowType.of(new DataType[] {DataTypes.TIMESTAMP_LTZ(6)}, new String[] {"ts"}); + GenericRow row = GenericRow.of(ts); + HudiKeyEncoder encoder = new HudiKeyEncoder(rowType, Collections.singletonList("ts")); + byte[] enc = encoder.encodeKey(row); + byte[] expected = toBytes(new String[] {ts.toString()}); + assertThat(enc).isEqualTo(expected); + + int hudiBucket = + BucketIdentifier.getBucketId(Collections.singletonList(ts.toString()), bucketNum); + int ourBucket = new HudiBucketingFunction().bucketing(enc, bucketNum); + assertThat(ourBucket).isEqualTo(hudiBucket); + } + + @Test + void testBucketingNumBucketsBoundaryValues() { + HudiBucketingFunction f = new HudiBucketingFunction(); + // numBuckets == 1 => bucket id always 0 + for (int sample : new int[] {0, 1, -1, Integer.MAX_VALUE, Integer.MIN_VALUE}) { + byte[] key = intToBytes(sample); + assertThat(f.bucketing(key, 1)).isEqualTo(0); + } + // numBuckets == Integer.MAX_VALUE: bucket id == hash & MAX_VALUE + int hash = Integer.MIN_VALUE; // stress sign-bit handling + byte[] key = intToBytes(hash); + int expected = (hash & Integer.MAX_VALUE) % Integer.MAX_VALUE; + assertThat(f.bucketing(key, Integer.MAX_VALUE)).isEqualTo(expected); + // bucket id must always be in [0, numBuckets) + assertThat(f.bucketing(key, 7)).isGreaterThanOrEqualTo(0).isLessThan(7); + } + + private void assertSingleFieldRoundTrip( + DataType dataType, Object value, String stringified, String key, int bucketNum) { + RowType rowType = RowType.of(new DataType[] {dataType}, new String[] {key}); + GenericRow row = GenericRow.of(value); + HudiKeyEncoder encoder = new HudiKeyEncoder(rowType, Collections.singletonList(key)); + byte[] ourEncodedKey = encoder.encodeKey(row); + byte[] expected = toBytes(new String[] {stringified}); + assertThat(ourEncodedKey).isEqualTo(expected); + + int hudiBucket = BucketIdentifier.getBucketId(stringified, key, bucketNum); + int ourBucket = new HudiBucketingFunction().bucketing(ourEncodedKey, bucketNum); + assertThat(ourBucket).isEqualTo(hudiBucket); + } + + private static byte[] intToBytes(int v) { + return new byte[] {(byte) (v >>> 24), (byte) (v >>> 16), (byte) (v >>> 8), (byte) v}; + } + + private byte[] toBytes(String[] value) { + List values = Arrays.asList(value); + return ByteBuffer.allocate(4).putInt(values.hashCode()).array(); + } +}