diff --git a/fluss-flink/fluss-flink-common/pom.xml b/fluss-flink/fluss-flink-common/pom.xml index dab880b58b..7ed46425ed 100644 --- a/fluss-flink/fluss-flink-common/pom.xml +++ b/fluss-flink/fluss-flink-common/pom.xml @@ -95,6 +95,13 @@ provided + + + org.roaringbitmap + RoaringBitmap + ${roaringbitmap.version} + + org.apache.fluss diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/functions/bitmap/AbstractRbAggFunction.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/functions/bitmap/AbstractRbAggFunction.java new file mode 100644 index 0000000000..6a390fa132 --- /dev/null +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/functions/bitmap/AbstractRbAggFunction.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.functions.bitmap; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.annotation.DataTypeHint; +import org.apache.flink.table.annotation.FunctionHint; +import org.apache.flink.table.functions.AggregateFunction; +import org.roaringbitmap.RoaringBitmap; + +import javax.annotation.Nullable; + +import java.io.IOException; + +/** + * Shared base for bitmap aggregate UDFs that use {@link RoaringBitmap} as the accumulator. + * + *

The {@code @FunctionHint} annotation with {@code accumulator = @DataTypeHint("RAW")} tells + * Flink's Table planner to skip reflection-based POJO extraction and instead use the {@link + * TypeInformation} returned by {@link #getAccumulatorType()}, which provides the custom {@link + * RoaringBitmapSerializer}. Without this annotation, Flink attempts POJO field extraction on + * RoaringBitmap and fails. + */ +@FunctionHint(accumulator = @DataTypeHint(value = "RAW", bridgedTo = RoaringBitmap.class)) +abstract class AbstractRbAggFunction extends AggregateFunction { + + @Override + public RoaringBitmap createAccumulator() { + return new RoaringBitmap(); + } + + /** Merges multiple accumulators — required for session window aggregation. */ + public void merge(RoaringBitmap acc, Iterable it) { + for (RoaringBitmap other : it) { + if (other != null) { + acc.or(other); + } + } + } + + public void resetAccumulator(RoaringBitmap acc) { + acc.clear(); + } + + @Override + @Nullable + public byte[] getValue(RoaringBitmap acc) { + if (acc == null || acc.isEmpty()) { + return null; + } + try { + return BitmapUtils.toBytes(acc); + } catch (IOException e) { + throw new RuntimeException("Failed to serialize bitmap accumulator.", e); + } + } + + @Override + public TypeInformation getAccumulatorType() { + return RoaringBitmapTypeInfo.INSTANCE; + } +} diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/functions/bitmap/BitmapUtils.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/functions/bitmap/BitmapUtils.java new file mode 100644 index 0000000000..48c249d64e --- /dev/null +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/functions/bitmap/BitmapUtils.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.functions.bitmap; + +import org.roaringbitmap.RoaringBitmap; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.nio.ByteBuffer; + +/** + * Utility methods for serializing and deserializing {@link RoaringBitmap}. + * + *

Uses the ByteBuffer-based serialization approach, which is the preferred method recommended by + * the RoaringBitmap library. This format is compatible with the server-side {@code + * RoaringBitmapUtils.serializeRoaringBitmap32} used by {@code FieldRoaringBitmap32Agg}. + */ +public final class BitmapUtils { + + private BitmapUtils() {} + + /** + * Serializes a {@link RoaringBitmap} to a byte array. + * + * @param bitmap the bitmap to serialize; null returns null + * @return serialized byte array, or null if input is null + */ + @Nullable + public static byte[] toBytes(@Nullable RoaringBitmap bitmap) throws IOException { + if (bitmap == null) { + return null; + } + bitmap.runOptimize(); + ByteBuffer buffer = ByteBuffer.allocate(bitmap.serializedSizeInBytes()); + bitmap.serialize(buffer); + return buffer.array(); + } + + /** + * Deserializes a {@link RoaringBitmap} from a byte array. + * + * @param bytes the serialized bitmap bytes; null returns null + * @return deserialized RoaringBitmap, or null if input is null + */ + @Nullable + public static RoaringBitmap fromBytes(@Nullable byte[] bytes) throws IOException { + if (bytes == null) { + return null; + } + RoaringBitmap bitmap = new RoaringBitmap(); + bitmap.deserialize(ByteBuffer.wrap(bytes)); + return bitmap; + } +} diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/functions/bitmap/RoaringBitmapSerializer.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/functions/bitmap/RoaringBitmapSerializer.java new file mode 100644 index 0000000000..e72f6ad590 --- /dev/null +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/functions/bitmap/RoaringBitmapSerializer.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.functions.bitmap; + +import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot; +import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; +import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.roaringbitmap.RoaringBitmap; + +import java.io.IOException; + +/** + * Flink {@link org.apache.flink.api.common.typeutils.TypeSerializer} for {@link RoaringBitmap}. + * + *

Used as the accumulator serializer for bitmap aggregate functions to ensure correct + * checkpoint/savepoint behavior. Without a custom serializer, Flink falls back to Kryo which is + * sensitive to internal class layout changes across RoaringBitmap library versions. + */ +public final class RoaringBitmapSerializer extends TypeSerializerSingleton { + + public static final RoaringBitmapSerializer INSTANCE = new RoaringBitmapSerializer(); + + private static final long serialVersionUID = 1L; + + private RoaringBitmapSerializer() {} + + @Override + public boolean isImmutableType() { + return false; + } + + @Override + public RoaringBitmap createInstance() { + return new RoaringBitmap(); + } + + @Override + public RoaringBitmap copy(RoaringBitmap from) { + return from.clone(); + } + + @Override + public RoaringBitmap copy(RoaringBitmap from, RoaringBitmap reuse) { + return from.clone(); + } + + @Override + public int getLength() { + return -1; + } + + @Override + public void serialize(RoaringBitmap record, DataOutputView target) throws IOException { + record.runOptimize(); + int size = record.serializedSizeInBytes(); + target.writeInt(size); + byte[] bytes = BitmapUtils.toBytes(record); + target.write(bytes); + } + + @Override + public RoaringBitmap deserialize(DataInputView source) throws IOException { + int size = source.readInt(); + byte[] bytes = new byte[size]; + source.readFully(bytes); + return BitmapUtils.fromBytes(bytes); + } + + @Override + public RoaringBitmap deserialize(RoaringBitmap reuse, DataInputView source) throws IOException { + return deserialize(source); + } + + @Override + public void copy(DataInputView source, DataOutputView target) throws IOException { + int size = source.readInt(); + target.writeInt(size); + byte[] buffer = new byte[size]; + source.readFully(buffer); + target.write(buffer); + } + + @Override + public TypeSerializerSnapshot snapshotConfiguration() { + return new RoaringBitmapSerializerSnapshot(); + } + + /** Snapshot for {@link RoaringBitmapSerializer}. */ + public static final class RoaringBitmapSerializerSnapshot + extends SimpleTypeSerializerSnapshot { + + public RoaringBitmapSerializerSnapshot() { + super(() -> INSTANCE); + } + } +} diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/functions/bitmap/RoaringBitmapTypeInfo.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/functions/bitmap/RoaringBitmapTypeInfo.java new file mode 100644 index 0000000000..cfb7771b6d --- /dev/null +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/functions/bitmap/RoaringBitmapTypeInfo.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.functions.bitmap; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.roaringbitmap.RoaringBitmap; + +import java.util.Objects; + +/** + * {@link TypeInformation} for {@link RoaringBitmap}. + * + *

Provides the custom {@link RoaringBitmapSerializer} to Flink's type system, ensuring correct + * checkpoint and savepoint behavior for bitmap aggregate function accumulators. + */ +public final class RoaringBitmapTypeInfo extends TypeInformation { + + public static final RoaringBitmapTypeInfo INSTANCE = new RoaringBitmapTypeInfo(); + + private static final long serialVersionUID = 1L; + + private RoaringBitmapTypeInfo() {} + + @Override + public boolean isBasicType() { + return false; + } + + @Override + public boolean isTupleType() { + return false; + } + + @Override + public int getArity() { + return 1; + } + + @Override + public int getTotalFields() { + return 1; + } + + @Override + public Class getTypeClass() { + return RoaringBitmap.class; + } + + @Override + public boolean isKeyType() { + return false; + } + + @Override + public TypeSerializer createSerializer(ExecutionConfig config) { + return RoaringBitmapSerializer.INSTANCE; + } + + @Override + public String toString() { + return "RoaringBitmapTypeInfo"; + } + + @Override + public boolean equals(Object obj) { + return obj instanceof RoaringBitmapTypeInfo; + } + + @Override + public int hashCode() { + return Objects.hash(getTypeClass()); + } + + @Override + public boolean canEqual(Object obj) { + return obj instanceof RoaringBitmapTypeInfo; + } +} diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/functions/bitmap/BitmapUtilsTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/functions/bitmap/BitmapUtilsTest.java new file mode 100644 index 0000000000..590191c774 --- /dev/null +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/functions/bitmap/BitmapUtilsTest.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.functions.bitmap; + +import org.junit.jupiter.api.Test; +import org.roaringbitmap.RoaringBitmap; + +import java.io.IOException; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Unit tests for {@link BitmapUtils}. */ +class BitmapUtilsTest { + + @Test + void testNullInputToBytes() throws IOException { + assertThat(BitmapUtils.toBytes(null)).isNull(); + } + + @Test + void testNullInputFromBytes() throws IOException { + assertThat(BitmapUtils.fromBytes(null)).isNull(); + } + + @Test + void testEmptyBitmapRoundTrip() throws IOException { + RoaringBitmap bitmap = new RoaringBitmap(); + byte[] bytes = BitmapUtils.toBytes(bitmap); + assertThat(bytes).isNotNull(); + RoaringBitmap result = BitmapUtils.fromBytes(bytes); + assertThat(result).isNotNull(); + assertThat(result.isEmpty()).isTrue(); + } + + @Test + void testKnownValuesRoundTrip() throws IOException { + RoaringBitmap bitmap = new RoaringBitmap(); + bitmap.add(1); + bitmap.add(100); + bitmap.add(1000); + bitmap.add(Integer.MAX_VALUE); + + byte[] bytes = BitmapUtils.toBytes(bitmap); + assertThat(bytes).isNotNull(); + + RoaringBitmap result = BitmapUtils.fromBytes(bytes); + assertThat(result).isNotNull(); + assertThat(result.getLongCardinality()).isEqualTo(4L); + assertThat(result.contains(1)).isTrue(); + assertThat(result.contains(100)).isTrue(); + assertThat(result.contains(1000)).isTrue(); + assertThat(result.contains(Integer.MAX_VALUE)).isTrue(); + assertThat(result.contains(2)).isFalse(); + } + + @Test + void testLargeCardinality() throws IOException { + RoaringBitmap bitmap = new RoaringBitmap(); + for (int i = 0; i < 100_000; i++) { + bitmap.add(i); + } + byte[] bytes = BitmapUtils.toBytes(bitmap); + RoaringBitmap result = BitmapUtils.fromBytes(bytes); + assertThat(result.getLongCardinality()).isEqualTo(100_000L); + } + + @Test + void testFormatCompatibleWithServerSerialization() throws IOException { + // This test verifies that our ByteBuffer-based serialization produces bytes + // that can be deserialized back correctly — same guarantee the server relies on. + RoaringBitmap original = RoaringBitmap.bitmapOf(42, 100, 200, 300); + byte[] bytes = BitmapUtils.toBytes(original); + RoaringBitmap restored = BitmapUtils.fromBytes(bytes); + assertThat(restored).isEqualTo(original); + } +} diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/functions/bitmap/RoaringBitmapSerializerTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/functions/bitmap/RoaringBitmapSerializerTest.java new file mode 100644 index 0000000000..68cbcafd73 --- /dev/null +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/functions/bitmap/RoaringBitmapSerializerTest.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.functions.bitmap; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataInputDeserializer; +import org.apache.flink.core.memory.DataOutputSerializer; +import org.junit.jupiter.api.Test; +import org.roaringbitmap.RoaringBitmap; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Unit tests for {@link RoaringBitmapSerializer} and {@link RoaringBitmapTypeInfo}. */ +class RoaringBitmapSerializerTest { + + private final RoaringBitmapSerializer serializer = RoaringBitmapSerializer.INSTANCE; + + @Test + void testCreateInstance() { + RoaringBitmap instance = serializer.createInstance(); + assertThat(instance).isNotNull(); + assertThat(instance.isEmpty()).isTrue(); + } + + @Test + void testIsNotImmutable() { + assertThat(serializer.isImmutableType()).isFalse(); + } + + @Test + void testGetLengthIsMinusOne() { + assertThat(serializer.getLength()).isEqualTo(-1); + } + + @Test + void testSerializeDeserializeRoundTrip() throws Exception { + RoaringBitmap original = new RoaringBitmap(); + original.add(1); + original.add(100); + original.add(100_000); + + DataOutputSerializer out = new DataOutputSerializer(256); + serializer.serialize(original, out); + + DataInputDeserializer in = new DataInputDeserializer(out.getCopyOfBuffer()); + RoaringBitmap restored = serializer.deserialize(in); + + assertThat(restored).isEqualTo(original); + assertThat(restored.getLongCardinality()).isEqualTo(3L); + } + + @Test + void testDeserializeWithReuse() throws Exception { + RoaringBitmap original = RoaringBitmap.bitmapOf(42, 99, 1000); + + DataOutputSerializer out = new DataOutputSerializer(256); + serializer.serialize(original, out); + + DataInputDeserializer in = new DataInputDeserializer(out.getCopyOfBuffer()); + RoaringBitmap reuse = new RoaringBitmap(); + RoaringBitmap restored = serializer.deserialize(reuse, in); + + assertThat(restored).isEqualTo(original); + } + + @Test + void testCopy() { + RoaringBitmap original = RoaringBitmap.bitmapOf(1, 2, 3); + RoaringBitmap copy = serializer.copy(original); + + assertThat(copy).isEqualTo(original); + // Verify it is a deep copy + copy.add(999); + assertThat(original.contains(999)).isFalse(); + } + + @Test + void testCopyWithReuse() { + RoaringBitmap original = RoaringBitmap.bitmapOf(10, 20, 30); + RoaringBitmap reuse = new RoaringBitmap(); + RoaringBitmap copy = serializer.copy(original, reuse); + + assertThat(copy).isEqualTo(original); + } + + @Test + void testEmptyBitmapRoundTrip() throws Exception { + RoaringBitmap empty = new RoaringBitmap(); + + DataOutputSerializer out = new DataOutputSerializer(64); + serializer.serialize(empty, out); + + DataInputDeserializer in = new DataInputDeserializer(out.getCopyOfBuffer()); + RoaringBitmap restored = serializer.deserialize(in); + + assertThat(restored.isEmpty()).isTrue(); + } + + @Test + void testSnapshotConfiguration() { + assertThat(serializer.snapshotConfiguration()).isNotNull(); + } + + // RoaringBitmapTypeInfo tests + + @Test + void testTypeInfoGetTypeClass() { + assertThat(RoaringBitmapTypeInfo.INSTANCE.getTypeClass()).isEqualTo(RoaringBitmap.class); + } + + @Test + void testTypeInfoCreateSerializer() { + TypeSerializer s = + RoaringBitmapTypeInfo.INSTANCE.createSerializer(new ExecutionConfig()); + assertThat(s).isInstanceOf(RoaringBitmapSerializer.class); + } + + @Test + void testTypeInfoEquality() { + assertThat(RoaringBitmapTypeInfo.INSTANCE.equals(RoaringBitmapTypeInfo.INSTANCE)).isTrue(); + assertThat(RoaringBitmapTypeInfo.INSTANCE.equals("other")).isFalse(); + } + + @Test + void testTypeInfoIsNotKeyType() { + assertThat(RoaringBitmapTypeInfo.INSTANCE.isKeyType()).isFalse(); + } + + @Test + void testTypeInfoArity() { + assertThat(RoaringBitmapTypeInfo.INSTANCE.getArity()).isEqualTo(1); + assertThat(RoaringBitmapTypeInfo.INSTANCE.getTotalFields()).isEqualTo(1); + } + + @Test + void testTypeInfoIsNotBasicType() { + assertThat(RoaringBitmapTypeInfo.INSTANCE.isBasicType()).isFalse(); + } + + @Test + void testTypeInfoIsNotTupleType() { + assertThat(RoaringBitmapTypeInfo.INSTANCE.isTupleType()).isFalse(); + } + + @Test + void testTypeInfoToString() { + assertThat(RoaringBitmapTypeInfo.INSTANCE.toString()).isEqualTo("RoaringBitmapTypeInfo"); + } + + @Test + void testTypeInfoHashCode() { + assertThat(RoaringBitmapTypeInfo.INSTANCE.hashCode()) + .isEqualTo(RoaringBitmapTypeInfo.INSTANCE.hashCode()); + } + + @Test + void testTypeInfoCanEqual() { + assertThat(RoaringBitmapTypeInfo.INSTANCE.canEqual(RoaringBitmapTypeInfo.INSTANCE)) + .isTrue(); + assertThat(RoaringBitmapTypeInfo.INSTANCE.canEqual("other")).isFalse(); + } +} diff --git a/fluss-test-coverage/pom.xml b/fluss-test-coverage/pom.xml index 1df67094f1..9b52ffa321 100644 --- a/fluss-test-coverage/pom.xml +++ b/fluss-test-coverage/pom.xml @@ -484,6 +484,9 @@ org.apache.fluss.flink.tiering.FlussLakeTieringEntrypoint org.apache.fluss.flink.tiering.FlussLakeTiering + + org.apache.fluss.flink.functions.bitmap.AbstractRbAggFunction + org.apache.flink.table.catalog.*