Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions fluss-flink/fluss-flink-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,13 @@
<scope>provided</scope>
</dependency>

<!-- RoaringBitmap for bitmap SQL functions (FIP-37) -->
<dependency>
<groupId>org.roaringbitmap</groupId>
<artifactId>RoaringBitmap</artifactId>
<version>${roaringbitmap.version}</version>
</dependency>

<!-- test dependency -->
<dependency>
<groupId>org.apache.fluss</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.flink.functions.bitmap;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.AggregateFunction;
import org.roaringbitmap.RoaringBitmap;

import javax.annotation.Nullable;

import java.io.IOException;

/**
* Shared base for bitmap aggregate UDFs that use {@link RoaringBitmap} as the accumulator.
*
* <p>The {@code @FunctionHint} annotation with {@code accumulator = @DataTypeHint("RAW")} tells
* Flink's Table planner to skip reflection-based POJO extraction and instead use the {@link
* TypeInformation} returned by {@link #getAccumulatorType()}, which provides the custom {@link
* RoaringBitmapSerializer}. Without this annotation, Flink attempts POJO field extraction on
* RoaringBitmap and fails.
*/
@FunctionHint(accumulator = @DataTypeHint(value = "RAW", bridgedTo = RoaringBitmap.class))
abstract class AbstractRbAggFunction extends AggregateFunction<byte[], RoaringBitmap> {

@Override
public RoaringBitmap createAccumulator() {
return new RoaringBitmap();
}

/** Merges multiple accumulators — required for session window aggregation. */
public void merge(RoaringBitmap acc, Iterable<RoaringBitmap> it) {
for (RoaringBitmap other : it) {
if (other != null) {
acc.or(other);
}
}
}

public void resetAccumulator(RoaringBitmap acc) {
acc.clear();
}

@Override
@Nullable
public byte[] getValue(RoaringBitmap acc) {
if (acc == null || acc.isEmpty()) {
return null;
}
try {
return BitmapUtils.toBytes(acc);
} catch (IOException e) {
throw new RuntimeException("Failed to serialize bitmap accumulator.", e);
}
}

@Override
public TypeInformation<RoaringBitmap> getAccumulatorType() {
return RoaringBitmapTypeInfo.INSTANCE;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.flink.functions.bitmap;

import org.roaringbitmap.RoaringBitmap;

import javax.annotation.Nullable;

import java.io.IOException;
import java.nio.ByteBuffer;

/**
* Utility methods for serializing and deserializing {@link RoaringBitmap}.
*
* <p>Uses the ByteBuffer-based serialization approach, which is the preferred method recommended by
* the RoaringBitmap library. This format is compatible with the server-side {@code
* RoaringBitmapUtils.serializeRoaringBitmap32} used by {@code FieldRoaringBitmap32Agg}.
*/
public final class BitmapUtils {

private BitmapUtils() {}

/**
* Serializes a {@link RoaringBitmap} to a byte array.
*
* @param bitmap the bitmap to serialize; null returns null
* @return serialized byte array, or null if input is null
*/
@Nullable
public static byte[] toBytes(@Nullable RoaringBitmap bitmap) throws IOException {
if (bitmap == null) {
return null;
}
bitmap.runOptimize();
ByteBuffer buffer = ByteBuffer.allocate(bitmap.serializedSizeInBytes());
bitmap.serialize(buffer);
return buffer.array();
}

/**
* Deserializes a {@link RoaringBitmap} from a byte array.
*
* @param bytes the serialized bitmap bytes; null returns null
* @return deserialized RoaringBitmap, or null if input is null
*/
@Nullable
public static RoaringBitmap fromBytes(@Nullable byte[] bytes) throws IOException {
if (bytes == null) {
return null;
}
RoaringBitmap bitmap = new RoaringBitmap();
bitmap.deserialize(ByteBuffer.wrap(bytes));
return bitmap;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.flink.functions.bitmap;

import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.roaringbitmap.RoaringBitmap;

import java.io.IOException;

/**
* Flink {@link org.apache.flink.api.common.typeutils.TypeSerializer} for {@link RoaringBitmap}.
*
* <p>Used as the accumulator serializer for bitmap aggregate functions to ensure correct
* checkpoint/savepoint behavior. Without a custom serializer, Flink falls back to Kryo which is
* sensitive to internal class layout changes across RoaringBitmap library versions.
*/
public final class RoaringBitmapSerializer extends TypeSerializerSingleton<RoaringBitmap> {

public static final RoaringBitmapSerializer INSTANCE = new RoaringBitmapSerializer();

private static final long serialVersionUID = 1L;

private RoaringBitmapSerializer() {}

@Override
public boolean isImmutableType() {
return false;
}

@Override
public RoaringBitmap createInstance() {
return new RoaringBitmap();
}

@Override
public RoaringBitmap copy(RoaringBitmap from) {
return from.clone();
}

@Override
public RoaringBitmap copy(RoaringBitmap from, RoaringBitmap reuse) {
return from.clone();
}

@Override
public int getLength() {
return -1;
}

@Override
public void serialize(RoaringBitmap record, DataOutputView target) throws IOException {
record.runOptimize();
int size = record.serializedSizeInBytes();
target.writeInt(size);
byte[] bytes = BitmapUtils.toBytes(record);
target.write(bytes);
}

@Override
public RoaringBitmap deserialize(DataInputView source) throws IOException {
int size = source.readInt();
byte[] bytes = new byte[size];
source.readFully(bytes);
return BitmapUtils.fromBytes(bytes);
}

@Override
public RoaringBitmap deserialize(RoaringBitmap reuse, DataInputView source) throws IOException {
return deserialize(source);
}

@Override
public void copy(DataInputView source, DataOutputView target) throws IOException {
int size = source.readInt();
target.writeInt(size);
byte[] buffer = new byte[size];
source.readFully(buffer);
target.write(buffer);
}

@Override
public TypeSerializerSnapshot<RoaringBitmap> snapshotConfiguration() {
return new RoaringBitmapSerializerSnapshot();
}

/** Snapshot for {@link RoaringBitmapSerializer}. */
public static final class RoaringBitmapSerializerSnapshot
extends SimpleTypeSerializerSnapshot<RoaringBitmap> {

public RoaringBitmapSerializerSnapshot() {
super(() -> INSTANCE);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.flink.functions.bitmap;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.roaringbitmap.RoaringBitmap;

import java.util.Objects;

/**
* {@link TypeInformation} for {@link RoaringBitmap}.
*
* <p>Provides the custom {@link RoaringBitmapSerializer} to Flink's type system, ensuring correct
* checkpoint and savepoint behavior for bitmap aggregate function accumulators.
*/
public final class RoaringBitmapTypeInfo extends TypeInformation<RoaringBitmap> {

public static final RoaringBitmapTypeInfo INSTANCE = new RoaringBitmapTypeInfo();

private static final long serialVersionUID = 1L;

private RoaringBitmapTypeInfo() {}

@Override
public boolean isBasicType() {
return false;
}

@Override
public boolean isTupleType() {
return false;
}

@Override
public int getArity() {
return 1;
}

@Override
public int getTotalFields() {
return 1;
}

@Override
public Class<RoaringBitmap> getTypeClass() {
return RoaringBitmap.class;
}

@Override
public boolean isKeyType() {
return false;
}

@Override
public TypeSerializer<RoaringBitmap> createSerializer(ExecutionConfig config) {
return RoaringBitmapSerializer.INSTANCE;
}

@Override
public String toString() {
return "RoaringBitmapTypeInfo";
}

@Override
public boolean equals(Object obj) {
return obj instanceof RoaringBitmapTypeInfo;
}

@Override
public int hashCode() {
return Objects.hash(getTypeClass());
}

@Override
public boolean canEqual(Object obj) {
return obj instanceof RoaringBitmapTypeInfo;
}
}
Loading