org.apache.fluss
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/functions/bitmap/AbstractRbAggFunction.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/functions/bitmap/AbstractRbAggFunction.java
new file mode 100644
index 0000000000..6a390fa132
--- /dev/null
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/functions/bitmap/AbstractRbAggFunction.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.functions.bitmap;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.FunctionHint;
+import org.apache.flink.table.functions.AggregateFunction;
+import org.roaringbitmap.RoaringBitmap;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+/**
+ * Shared base for bitmap aggregate UDFs that use {@link RoaringBitmap} as the accumulator.
+ *
+ * The {@code @FunctionHint} annotation with {@code accumulator = @DataTypeHint("RAW")} tells
+ * Flink's Table planner to skip reflection-based POJO extraction and instead use the {@link
+ * TypeInformation} returned by {@link #getAccumulatorType()}, which provides the custom {@link
+ * RoaringBitmapSerializer}. Without this annotation, Flink attempts POJO field extraction on
+ * RoaringBitmap and fails.
+ */
+@FunctionHint(accumulator = @DataTypeHint(value = "RAW", bridgedTo = RoaringBitmap.class))
+abstract class AbstractRbAggFunction extends AggregateFunction {
+
+ @Override
+ public RoaringBitmap createAccumulator() {
+ return new RoaringBitmap();
+ }
+
+ /** Merges multiple accumulators — required for session window aggregation. */
+ public void merge(RoaringBitmap acc, Iterable it) {
+ for (RoaringBitmap other : it) {
+ if (other != null) {
+ acc.or(other);
+ }
+ }
+ }
+
+ public void resetAccumulator(RoaringBitmap acc) {
+ acc.clear();
+ }
+
+ @Override
+ @Nullable
+ public byte[] getValue(RoaringBitmap acc) {
+ if (acc == null || acc.isEmpty()) {
+ return null;
+ }
+ try {
+ return BitmapUtils.toBytes(acc);
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to serialize bitmap accumulator.", e);
+ }
+ }
+
+ @Override
+ public TypeInformation getAccumulatorType() {
+ return RoaringBitmapTypeInfo.INSTANCE;
+ }
+}
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/functions/bitmap/BitmapUtils.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/functions/bitmap/BitmapUtils.java
new file mode 100644
index 0000000000..48c249d64e
--- /dev/null
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/functions/bitmap/BitmapUtils.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.functions.bitmap;
+
+import org.roaringbitmap.RoaringBitmap;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * Utility methods for serializing and deserializing {@link RoaringBitmap}.
+ *
+ * Uses the ByteBuffer-based serialization approach, which is the preferred method recommended by
+ * the RoaringBitmap library. This format is compatible with the server-side {@code
+ * RoaringBitmapUtils.serializeRoaringBitmap32} used by {@code FieldRoaringBitmap32Agg}.
+ */
+public final class BitmapUtils {
+
+ private BitmapUtils() {}
+
+ /**
+ * Serializes a {@link RoaringBitmap} to a byte array.
+ *
+ * @param bitmap the bitmap to serialize; null returns null
+ * @return serialized byte array, or null if input is null
+ */
+ @Nullable
+ public static byte[] toBytes(@Nullable RoaringBitmap bitmap) throws IOException {
+ if (bitmap == null) {
+ return null;
+ }
+ bitmap.runOptimize();
+ ByteBuffer buffer = ByteBuffer.allocate(bitmap.serializedSizeInBytes());
+ bitmap.serialize(buffer);
+ return buffer.array();
+ }
+
+ /**
+ * Deserializes a {@link RoaringBitmap} from a byte array.
+ *
+ * @param bytes the serialized bitmap bytes; null returns null
+ * @return deserialized RoaringBitmap, or null if input is null
+ */
+ @Nullable
+ public static RoaringBitmap fromBytes(@Nullable byte[] bytes) throws IOException {
+ if (bytes == null) {
+ return null;
+ }
+ RoaringBitmap bitmap = new RoaringBitmap();
+ bitmap.deserialize(ByteBuffer.wrap(bytes));
+ return bitmap;
+ }
+}
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/functions/bitmap/RoaringBitmapSerializer.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/functions/bitmap/RoaringBitmapSerializer.java
new file mode 100644
index 0000000000..e72f6ad590
--- /dev/null
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/functions/bitmap/RoaringBitmapSerializer.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.functions.bitmap;
+
+import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.roaringbitmap.RoaringBitmap;
+
+import java.io.IOException;
+
+/**
+ * Flink {@link org.apache.flink.api.common.typeutils.TypeSerializer} for {@link RoaringBitmap}.
+ *
+ *
Used as the accumulator serializer for bitmap aggregate functions to ensure correct
+ * checkpoint/savepoint behavior. Without a custom serializer, Flink falls back to Kryo which is
+ * sensitive to internal class layout changes across RoaringBitmap library versions.
+ */
+public final class RoaringBitmapSerializer extends TypeSerializerSingleton {
+
+ public static final RoaringBitmapSerializer INSTANCE = new RoaringBitmapSerializer();
+
+ private static final long serialVersionUID = 1L;
+
+ private RoaringBitmapSerializer() {}
+
+ @Override
+ public boolean isImmutableType() {
+ return false;
+ }
+
+ @Override
+ public RoaringBitmap createInstance() {
+ return new RoaringBitmap();
+ }
+
+ @Override
+ public RoaringBitmap copy(RoaringBitmap from) {
+ return from.clone();
+ }
+
+ @Override
+ public RoaringBitmap copy(RoaringBitmap from, RoaringBitmap reuse) {
+ return from.clone();
+ }
+
+ @Override
+ public int getLength() {
+ return -1;
+ }
+
+ @Override
+ public void serialize(RoaringBitmap record, DataOutputView target) throws IOException {
+ record.runOptimize();
+ int size = record.serializedSizeInBytes();
+ target.writeInt(size);
+ byte[] bytes = BitmapUtils.toBytes(record);
+ target.write(bytes);
+ }
+
+ @Override
+ public RoaringBitmap deserialize(DataInputView source) throws IOException {
+ int size = source.readInt();
+ byte[] bytes = new byte[size];
+ source.readFully(bytes);
+ return BitmapUtils.fromBytes(bytes);
+ }
+
+ @Override
+ public RoaringBitmap deserialize(RoaringBitmap reuse, DataInputView source) throws IOException {
+ return deserialize(source);
+ }
+
+ @Override
+ public void copy(DataInputView source, DataOutputView target) throws IOException {
+ int size = source.readInt();
+ target.writeInt(size);
+ byte[] buffer = new byte[size];
+ source.readFully(buffer);
+ target.write(buffer);
+ }
+
+ @Override
+ public TypeSerializerSnapshot snapshotConfiguration() {
+ return new RoaringBitmapSerializerSnapshot();
+ }
+
+ /** Snapshot for {@link RoaringBitmapSerializer}. */
+ public static final class RoaringBitmapSerializerSnapshot
+ extends SimpleTypeSerializerSnapshot {
+
+ public RoaringBitmapSerializerSnapshot() {
+ super(() -> INSTANCE);
+ }
+ }
+}
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/functions/bitmap/RoaringBitmapTypeInfo.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/functions/bitmap/RoaringBitmapTypeInfo.java
new file mode 100644
index 0000000000..cfb7771b6d
--- /dev/null
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/functions/bitmap/RoaringBitmapTypeInfo.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.functions.bitmap;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.roaringbitmap.RoaringBitmap;
+
+import java.util.Objects;
+
+/**
+ * {@link TypeInformation} for {@link RoaringBitmap}.
+ *
+ * Provides the custom {@link RoaringBitmapSerializer} to Flink's type system, ensuring correct
+ * checkpoint and savepoint behavior for bitmap aggregate function accumulators.
+ */
+public final class RoaringBitmapTypeInfo extends TypeInformation {
+
+ public static final RoaringBitmapTypeInfo INSTANCE = new RoaringBitmapTypeInfo();
+
+ private static final long serialVersionUID = 1L;
+
+ private RoaringBitmapTypeInfo() {}
+
+ @Override
+ public boolean isBasicType() {
+ return false;
+ }
+
+ @Override
+ public boolean isTupleType() {
+ return false;
+ }
+
+ @Override
+ public int getArity() {
+ return 1;
+ }
+
+ @Override
+ public int getTotalFields() {
+ return 1;
+ }
+
+ @Override
+ public Class getTypeClass() {
+ return RoaringBitmap.class;
+ }
+
+ @Override
+ public boolean isKeyType() {
+ return false;
+ }
+
+ @Override
+ public TypeSerializer createSerializer(ExecutionConfig config) {
+ return RoaringBitmapSerializer.INSTANCE;
+ }
+
+ @Override
+ public String toString() {
+ return "RoaringBitmapTypeInfo";
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ return obj instanceof RoaringBitmapTypeInfo;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(getTypeClass());
+ }
+
+ @Override
+ public boolean canEqual(Object obj) {
+ return obj instanceof RoaringBitmapTypeInfo;
+ }
+}
diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/functions/bitmap/BitmapUtilsTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/functions/bitmap/BitmapUtilsTest.java
new file mode 100644
index 0000000000..590191c774
--- /dev/null
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/functions/bitmap/BitmapUtilsTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.functions.bitmap;
+
+import org.junit.jupiter.api.Test;
+import org.roaringbitmap.RoaringBitmap;
+
+import java.io.IOException;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Unit tests for {@link BitmapUtils}. */
+class BitmapUtilsTest {
+
+ @Test
+ void testNullInputToBytes() throws IOException {
+ assertThat(BitmapUtils.toBytes(null)).isNull();
+ }
+
+ @Test
+ void testNullInputFromBytes() throws IOException {
+ assertThat(BitmapUtils.fromBytes(null)).isNull();
+ }
+
+ @Test
+ void testEmptyBitmapRoundTrip() throws IOException {
+ RoaringBitmap bitmap = new RoaringBitmap();
+ byte[] bytes = BitmapUtils.toBytes(bitmap);
+ assertThat(bytes).isNotNull();
+ RoaringBitmap result = BitmapUtils.fromBytes(bytes);
+ assertThat(result).isNotNull();
+ assertThat(result.isEmpty()).isTrue();
+ }
+
+ @Test
+ void testKnownValuesRoundTrip() throws IOException {
+ RoaringBitmap bitmap = new RoaringBitmap();
+ bitmap.add(1);
+ bitmap.add(100);
+ bitmap.add(1000);
+ bitmap.add(Integer.MAX_VALUE);
+
+ byte[] bytes = BitmapUtils.toBytes(bitmap);
+ assertThat(bytes).isNotNull();
+
+ RoaringBitmap result = BitmapUtils.fromBytes(bytes);
+ assertThat(result).isNotNull();
+ assertThat(result.getLongCardinality()).isEqualTo(4L);
+ assertThat(result.contains(1)).isTrue();
+ assertThat(result.contains(100)).isTrue();
+ assertThat(result.contains(1000)).isTrue();
+ assertThat(result.contains(Integer.MAX_VALUE)).isTrue();
+ assertThat(result.contains(2)).isFalse();
+ }
+
+ @Test
+ void testLargeCardinality() throws IOException {
+ RoaringBitmap bitmap = new RoaringBitmap();
+ for (int i = 0; i < 100_000; i++) {
+ bitmap.add(i);
+ }
+ byte[] bytes = BitmapUtils.toBytes(bitmap);
+ RoaringBitmap result = BitmapUtils.fromBytes(bytes);
+ assertThat(result.getLongCardinality()).isEqualTo(100_000L);
+ }
+
+ @Test
+ void testFormatCompatibleWithServerSerialization() throws IOException {
+ // This test verifies that our ByteBuffer-based serialization produces bytes
+ // that can be deserialized back correctly — same guarantee the server relies on.
+ RoaringBitmap original = RoaringBitmap.bitmapOf(42, 100, 200, 300);
+ byte[] bytes = BitmapUtils.toBytes(original);
+ RoaringBitmap restored = BitmapUtils.fromBytes(bytes);
+ assertThat(restored).isEqualTo(original);
+ }
+}
diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/functions/bitmap/RoaringBitmapSerializerTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/functions/bitmap/RoaringBitmapSerializerTest.java
new file mode 100644
index 0000000000..68cbcafd73
--- /dev/null
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/functions/bitmap/RoaringBitmapSerializerTest.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.functions.bitmap;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.junit.jupiter.api.Test;
+import org.roaringbitmap.RoaringBitmap;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Unit tests for {@link RoaringBitmapSerializer} and {@link RoaringBitmapTypeInfo}. */
+class RoaringBitmapSerializerTest {
+
+ private final RoaringBitmapSerializer serializer = RoaringBitmapSerializer.INSTANCE;
+
+ @Test
+ void testCreateInstance() {
+ RoaringBitmap instance = serializer.createInstance();
+ assertThat(instance).isNotNull();
+ assertThat(instance.isEmpty()).isTrue();
+ }
+
+ @Test
+ void testIsNotImmutable() {
+ assertThat(serializer.isImmutableType()).isFalse();
+ }
+
+ @Test
+ void testGetLengthIsMinusOne() {
+ assertThat(serializer.getLength()).isEqualTo(-1);
+ }
+
+ @Test
+ void testSerializeDeserializeRoundTrip() throws Exception {
+ RoaringBitmap original = new RoaringBitmap();
+ original.add(1);
+ original.add(100);
+ original.add(100_000);
+
+ DataOutputSerializer out = new DataOutputSerializer(256);
+ serializer.serialize(original, out);
+
+ DataInputDeserializer in = new DataInputDeserializer(out.getCopyOfBuffer());
+ RoaringBitmap restored = serializer.deserialize(in);
+
+ assertThat(restored).isEqualTo(original);
+ assertThat(restored.getLongCardinality()).isEqualTo(3L);
+ }
+
+ @Test
+ void testDeserializeWithReuse() throws Exception {
+ RoaringBitmap original = RoaringBitmap.bitmapOf(42, 99, 1000);
+
+ DataOutputSerializer out = new DataOutputSerializer(256);
+ serializer.serialize(original, out);
+
+ DataInputDeserializer in = new DataInputDeserializer(out.getCopyOfBuffer());
+ RoaringBitmap reuse = new RoaringBitmap();
+ RoaringBitmap restored = serializer.deserialize(reuse, in);
+
+ assertThat(restored).isEqualTo(original);
+ }
+
+ @Test
+ void testCopy() {
+ RoaringBitmap original = RoaringBitmap.bitmapOf(1, 2, 3);
+ RoaringBitmap copy = serializer.copy(original);
+
+ assertThat(copy).isEqualTo(original);
+ // Verify it is a deep copy
+ copy.add(999);
+ assertThat(original.contains(999)).isFalse();
+ }
+
+ @Test
+ void testCopyWithReuse() {
+ RoaringBitmap original = RoaringBitmap.bitmapOf(10, 20, 30);
+ RoaringBitmap reuse = new RoaringBitmap();
+ RoaringBitmap copy = serializer.copy(original, reuse);
+
+ assertThat(copy).isEqualTo(original);
+ }
+
+ @Test
+ void testEmptyBitmapRoundTrip() throws Exception {
+ RoaringBitmap empty = new RoaringBitmap();
+
+ DataOutputSerializer out = new DataOutputSerializer(64);
+ serializer.serialize(empty, out);
+
+ DataInputDeserializer in = new DataInputDeserializer(out.getCopyOfBuffer());
+ RoaringBitmap restored = serializer.deserialize(in);
+
+ assertThat(restored.isEmpty()).isTrue();
+ }
+
+ @Test
+ void testSnapshotConfiguration() {
+ assertThat(serializer.snapshotConfiguration()).isNotNull();
+ }
+
+ // RoaringBitmapTypeInfo tests
+
+ @Test
+ void testTypeInfoGetTypeClass() {
+ assertThat(RoaringBitmapTypeInfo.INSTANCE.getTypeClass()).isEqualTo(RoaringBitmap.class);
+ }
+
+ @Test
+ void testTypeInfoCreateSerializer() {
+ TypeSerializer s =
+ RoaringBitmapTypeInfo.INSTANCE.createSerializer(new ExecutionConfig());
+ assertThat(s).isInstanceOf(RoaringBitmapSerializer.class);
+ }
+
+ @Test
+ void testTypeInfoEquality() {
+ assertThat(RoaringBitmapTypeInfo.INSTANCE.equals(RoaringBitmapTypeInfo.INSTANCE)).isTrue();
+ assertThat(RoaringBitmapTypeInfo.INSTANCE.equals("other")).isFalse();
+ }
+
+ @Test
+ void testTypeInfoIsNotKeyType() {
+ assertThat(RoaringBitmapTypeInfo.INSTANCE.isKeyType()).isFalse();
+ }
+
+ @Test
+ void testTypeInfoArity() {
+ assertThat(RoaringBitmapTypeInfo.INSTANCE.getArity()).isEqualTo(1);
+ assertThat(RoaringBitmapTypeInfo.INSTANCE.getTotalFields()).isEqualTo(1);
+ }
+
+ @Test
+ void testTypeInfoIsNotBasicType() {
+ assertThat(RoaringBitmapTypeInfo.INSTANCE.isBasicType()).isFalse();
+ }
+
+ @Test
+ void testTypeInfoIsNotTupleType() {
+ assertThat(RoaringBitmapTypeInfo.INSTANCE.isTupleType()).isFalse();
+ }
+
+ @Test
+ void testTypeInfoToString() {
+ assertThat(RoaringBitmapTypeInfo.INSTANCE.toString()).isEqualTo("RoaringBitmapTypeInfo");
+ }
+
+ @Test
+ void testTypeInfoHashCode() {
+ assertThat(RoaringBitmapTypeInfo.INSTANCE.hashCode())
+ .isEqualTo(RoaringBitmapTypeInfo.INSTANCE.hashCode());
+ }
+
+ @Test
+ void testTypeInfoCanEqual() {
+ assertThat(RoaringBitmapTypeInfo.INSTANCE.canEqual(RoaringBitmapTypeInfo.INSTANCE))
+ .isTrue();
+ assertThat(RoaringBitmapTypeInfo.INSTANCE.canEqual("other")).isFalse();
+ }
+}
diff --git a/fluss-test-coverage/pom.xml b/fluss-test-coverage/pom.xml
index 1df67094f1..9b52ffa321 100644
--- a/fluss-test-coverage/pom.xml
+++ b/fluss-test-coverage/pom.xml
@@ -484,6 +484,9 @@
org.apache.fluss.flink.tiering.FlussLakeTieringEntrypoint
org.apache.fluss.flink.tiering.FlussLakeTiering
+
+ org.apache.fluss.flink.functions.bitmap.AbstractRbAggFunction
+
org.apache.flink.table.catalog.*