diff --git a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaDynamicTableSource.java b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaDynamicTableSource.java
index 837ea5817..4b974d29f 100644
--- a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaDynamicTableSource.java
+++ b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaDynamicTableSource.java
@@ -88,7 +88,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
startupMode);
if (watermarkStrategy != null) {
- sourceFunction.assignTimestampsAndWatermarks(watermarkStrategy);
+ sourceFunction.setWatermarkStrategy(watermarkStrategy);
}
return new DataStreamScanProvider() {
diff --git a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaSourceFunction.java b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaSourceFunction.java
index 52c56fa28..16d16da94 100644
--- a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaSourceFunction.java
+++ b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaSourceFunction.java
@@ -17,10 +17,10 @@
package org.apache.auron.flink.connector.kafka;
import static org.apache.auron.flink.connector.kafka.KafkaConstants.*;
-import static org.apache.flink.util.Preconditions.checkNotNull;
import java.io.File;
import java.io.InputStream;
+import java.lang.reflect.Field;
import java.util.*;
import org.apache.auron.flink.arrow.FlinkArrowReader;
import org.apache.auron.flink.arrow.FlinkArrowUtils;
@@ -38,11 +38,6 @@
import org.apache.auron.protobuf.PhysicalPlanNode;
import org.apache.commons.collections.map.LinkedMap;
import org.apache.commons.io.FileUtils;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.eventtime.TimestampAssigner;
-import org.apache.flink.api.common.eventtime.WatermarkGenerator;
-import org.apache.flink.api.common.eventtime.WatermarkOutput;
-import org.apache.flink.api.common.eventtime.WatermarkOutputMultiplexer;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.state.ListState;
@@ -50,7 +45,6 @@
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.MetricGroup;
@@ -60,16 +54,17 @@
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionAssigner;
-import org.apache.flink.streaming.connectors.kafka.internals.SourceContextWatermarkOutputAdapter;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.generated.GeneratedWatermarkGeneratorSupplier;
+import org.apache.flink.table.runtime.generated.WatermarkGenerator;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.SerializableObject;
-import org.apache.flink.util.SerializedValue;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
@@ -82,9 +77,9 @@
* If checkpoints are enabled, Kafka offsets are committed via Auron after a successful checkpoint.
* If checkpoints are disabled, Kafka offsets are committed periodically via Auron.
*
- * Watermark support is implemented via {@link WatermarkOutputMultiplexer} with per-partition
- * watermark generation. Partition expansion is detected periodically using a lightweight
- * {@link KafkaConsumer} (metadata queries only, no data consumption).
+ *
+ * Watermark support uses the table-runtime {@link WatermarkGenerator} directly
+ * (from {@code WatermarkPushDownSpec}) with per-partition watermark tracking.
+ * The combined watermark emitted downstream is the minimum across all assigned partitions.
*/
public class AuronKafkaSourceFunction extends RichParallelSourceFunction
implements FlinkAuronFunction, CheckpointListener, CheckpointedFunction {
@@ -110,7 +105,6 @@ public class AuronKafkaSourceFunction extends RichParallelSourceFunction restoredOffsets;
private transient Map currentOffsets;
private final SerializableObject lock = new SerializableObject();
- private SerializedValue> watermarkStrategy;
private volatile boolean isRunning;
private transient String auronOperatorIdWithSubtaskIndex;
private transient MetricNode nativeMetric;
@@ -120,14 +114,11 @@ public class AuronKafkaSourceFunction extends RichParallelSourceFunction kafkaConsumer;
private transient List assignedPartitions;
- // Watermark related
- private transient WatermarkOutputMultiplexer watermarkOutputMultiplexer;
- private transient Map partitionIdToOutputIdMap;
- private transient WatermarkGenerator watermarkGenerator;
- private transient TimestampAssigner timestampAssigner;
- // Periodic watermark control: autoWatermarkInterval > 0 means enabled
- private transient long autoWatermarkInterval;
- private transient long lastPeriodicWatermarkTime;
+ // Watermark related: uses table-runtime WatermarkGenerator directly
+ private WatermarkStrategy watermarkStrategy;
+ private transient WatermarkGenerator tableWatermarkGenerator;
+ private transient Map partitionWatermarks;
+ private transient long currentCombinedWatermark;
public AuronKafkaSourceFunction(
LogicalType outputType,
@@ -231,22 +222,24 @@ public void open(Configuration config) throws Exception {
subtaskIndex,
assignedPartitions);
- // 3. Initialize Watermark components if watermarkStrategy is set
+ // 3. Initialize table-runtime WatermarkGenerator if watermarkStrategy is set
if (watermarkStrategy != null) {
- ClassLoader userCodeClassLoader = runtimeContext.getUserCodeClassLoader();
- WatermarkStrategy deserializedWatermarkStrategy =
- watermarkStrategy.deserializeValue(userCodeClassLoader);
-
MetricGroup metricGroup = runtimeContext.getMetricGroup();
-
- this.timestampAssigner = deserializedWatermarkStrategy.createTimestampAssigner(() -> metricGroup);
-
- this.watermarkGenerator = deserializedWatermarkStrategy.createWatermarkGenerator(() -> metricGroup);
-
- // 4. Determine periodic watermark interval
- // autoWatermarkInterval > 0 means periodic watermark is enabled
- this.autoWatermarkInterval = runtimeContext.getExecutionConfig().getAutoWatermarkInterval();
- this.lastPeriodicWatermarkTime = 0L; // Initialize to 0 so first emit triggers immediately
+ // Create DataStream API WatermarkGenerator via the strategy
+ org.apache.flink.api.common.eventtime.WatermarkGenerator dsGenerator =
+ watermarkStrategy.createWatermarkGenerator(() -> metricGroup);
+ // Extract inner table-runtime WatermarkGenerator from DefaultWatermarkGenerator
+ if (dsGenerator instanceof GeneratedWatermarkGeneratorSupplier.DefaultWatermarkGenerator) {
+ Field field = GeneratedWatermarkGeneratorSupplier.DefaultWatermarkGenerator.class.getDeclaredField(
+ "innerWatermarkGenerator");
+ field.setAccessible(true);
+ this.tableWatermarkGenerator = (WatermarkGenerator) field.get(dsGenerator);
+ } else {
+ throw new IllegalStateException("Expected DefaultWatermarkGenerator from WatermarkPushDownSpec, got: "
+ + dsGenerator.getClass().getName());
+ }
+ this.partitionWatermarks = new HashMap<>();
+ this.currentCombinedWatermark = Long.MIN_VALUE;
}
this.isRunning = true;
}
@@ -267,97 +260,76 @@ public void add(String name, long value) {
fieldList.addAll(((RowType) outputType).getFields());
RowType auronOutputRowType = new RowType(fieldList);
- // Initialize WatermarkOutputMultiplexer here because sourceContext is available
- if (watermarkGenerator != null) {
- this.watermarkOutputMultiplexer =
- new WatermarkOutputMultiplexer(new SourceContextWatermarkOutputAdapter<>(sourceContext));
- this.partitionIdToOutputIdMap = new HashMap<>();
- for (Integer partition : assignedPartitions) {
- String outputId = createOutputId(partition);
- partitionIdToOutputIdMap.put(partition, outputId);
- watermarkOutputMultiplexer.registerNewOutput(outputId, watermark -> {});
- }
- }
-
// Pre-check watermark flag to avoid per-record null checks in the hot path
- final boolean enableWatermark = watermarkGenerator != null;
-
- while (this.isRunning) {
- AuronCallNativeWrapper wrapper = new AuronCallNativeWrapper(
- FlinkArrowUtils.getRootAllocator(),
- physicalPlanNode,
- nativeMetric,
- 0,
- 0,
- 0,
- AuronAdaptor.getInstance()
- .getAuronConfiguration()
- .getLong(FlinkAuronConfiguration.NATIVE_MEMORY_SIZE));
-
- if (enableWatermark) {
- // Watermark-enabled path
- while (wrapper.loadNextBatch(batch -> {
- Map tmpOffsets = new HashMap<>(currentOffsets);
- FlinkArrowReader arrowReader = FlinkArrowReader.create(batch, auronOutputRowType, 3);
-
- for (int i = 0; i < batch.getRowCount(); i++) {
- AuronColumnarRowData tmpRowData = (AuronColumnarRowData) arrowReader.read(i);
- // Extract kafka meta fields
- int partitionId = tmpRowData.getInt(-3);
- long offset = tmpRowData.getLong(-2);
- long kafkaTimestamp = tmpRowData.getLong(-1);
- tmpOffsets.put(partitionId, offset);
-
- // Extract event timestamp via user-defined TimestampAssigner
- long timestamp = timestampAssigner.extractTimestamp(tmpRowData, kafkaTimestamp);
-
- // Route to the per-partition WatermarkOutput and trigger onEvent
- // outputId must not null, else is a bug
- String outputId = partitionIdToOutputIdMap.get(partitionId);
- WatermarkOutput partitionOutput = watermarkOutputMultiplexer.getImmediateOutput(outputId);
- watermarkGenerator.onEvent(tmpRowData, timestamp, partitionOutput);
- // Emit record with event timestamp
- sourceContext.collectWithTimestamp(arrowReader.read(i), timestamp);
- }
-
- // Periodic watermark: only emit if enough time has elapsed since last emit
- // Controlled by ExecutionConfig.getAutoWatermarkInterval()
- long currentTime = System.currentTimeMillis();
- if (autoWatermarkInterval > 0
- && (currentTime - lastPeriodicWatermarkTime) >= autoWatermarkInterval) {
- for (Map.Entry entry : partitionIdToOutputIdMap.entrySet()) {
- // Use getDeferredOutput for periodic emit: all partitions update first,
- // then multiplexer merges and emits once via onPeriodicEmit()
- WatermarkOutput output = watermarkOutputMultiplexer.getDeferredOutput(entry.getValue());
- watermarkGenerator.onPeriodicEmit(output);
+ final boolean enableWatermark = tableWatermarkGenerator != null;
+
+ AuronCallNativeWrapper wrapper = new AuronCallNativeWrapper(
+ FlinkArrowUtils.getRootAllocator(),
+ physicalPlanNode,
+ nativeMetric,
+ 0,
+ 0,
+ 0,
+ AuronAdaptor.getInstance().getAuronConfiguration().getLong(FlinkAuronConfiguration.NATIVE_MEMORY_SIZE));
+
+ if (enableWatermark) {
+ // Watermark-enabled path: use table-runtime WatermarkGenerator directly
+ while (wrapper.loadNextBatch(batch -> {
+ Map tmpOffsets = new HashMap<>(currentOffsets);
+ FlinkArrowReader arrowReader = FlinkArrowReader.create(batch, auronOutputRowType, 3);
+ for (int i = 0; i < batch.getRowCount(); i++) {
+ AuronColumnarRowData tmpRowData = (AuronColumnarRowData) arrowReader.read(i);
+ // Extract kafka meta fields
+ int partitionId = tmpRowData.getInt(-3);
+ long offset = tmpRowData.getLong(-2);
+ long kafkaTimestamp = tmpRowData.getLong(-1);
+ tmpOffsets.put(partitionId, offset);
+
+ try {
+ // Compute the watermark using the table-runtime WatermarkGenerator
+ // (a stateless pure function evaluated in the local time zone)
+ Long watermark = tableWatermarkGenerator.currentWatermark(tmpRowData);
+ // Update per-partition watermark tracking
+ if (watermark != null) {
+ partitionWatermarks.merge(partitionId, watermark, Math::max);
}
- // Merge all deferred updates and emit the combined watermark downstream
- watermarkOutputMultiplexer.onPeriodicEmit();
- lastPeriodicWatermarkTime = currentTime;
+ } catch (Exception e) {
+ throw new RuntimeException("Generated WatermarkGenerator fails to generate:", e);
}
+ // Emit record with kafka timestamp
+ sourceContext.collectWithTimestamp(tmpRowData, kafkaTimestamp);
+ }
- synchronized (lock) {
- currentOffsets = tmpOffsets;
- }
- })) {}
- } else {
- // No-watermark path: still use collectWithTimestamp with kafka timestamp
- while (wrapper.loadNextBatch(batch -> {
- Map tmpOffsets = new HashMap<>(currentOffsets);
- FlinkArrowReader arrowReader = FlinkArrowReader.create(batch, auronOutputRowType, 3);
- for (int i = 0; i < batch.getRowCount(); i++) {
- AuronColumnarRowData tmpRowData = (AuronColumnarRowData) arrowReader.read(i);
- int partitionId = tmpRowData.getInt(-3);
- long offset = tmpRowData.getLong(-2);
- long kafkaTimestamp = tmpRowData.getLong(-1);
- tmpOffsets.put(partitionId, offset);
- sourceContext.collectWithTimestamp(arrowReader.read(i), kafkaTimestamp);
- }
- synchronized (lock) {
- currentOffsets = tmpOffsets;
+ // After each batch, compute combined watermark (min across all partitions) and emit
+ if (!partitionWatermarks.isEmpty()) {
+ long minWatermark = Collections.min(partitionWatermarks.values());
+ if (minWatermark > currentCombinedWatermark) {
+ currentCombinedWatermark = minWatermark;
+ sourceContext.emitWatermark(new Watermark(minWatermark));
}
- })) {}
- }
+ }
+
+ synchronized (lock) {
+ currentOffsets = tmpOffsets;
+ }
+ })) {}
+ } else {
+ // No-watermark path: still use collectWithTimestamp with kafka timestamp
+ while (wrapper.loadNextBatch(batch -> {
+ Map tmpOffsets = new HashMap<>(currentOffsets);
+ FlinkArrowReader arrowReader = FlinkArrowReader.create(batch, auronOutputRowType, 3);
+ for (int i = 0; i < batch.getRowCount(); i++) {
+ AuronColumnarRowData tmpRowData = (AuronColumnarRowData) arrowReader.read(i);
+ int partitionId = tmpRowData.getInt(-3);
+ long offset = tmpRowData.getLong(-2);
+ long kafkaTimestamp = tmpRowData.getLong(-1);
+ tmpOffsets.put(partitionId, offset);
+ sourceContext.collectWithTimestamp(tmpRowData, kafkaTimestamp);
+ }
+ synchronized (lock) {
+ currentOffsets = tmpOffsets;
+ }
+ })) {}
}
LOG.info("Auron kafka source run end");
}
@@ -376,6 +348,11 @@ public void close() throws Exception {
kafkaConsumer.close();
}
+ // Close table-runtime WatermarkGenerator
+ if (tableWatermarkGenerator != null) {
+ tableWatermarkGenerator.close();
+ }
+
super.close();
}
@@ -478,22 +455,7 @@ public void initializeState(FunctionInitializationContext context) throws Except
}
}
- public AuronKafkaSourceFunction assignTimestampsAndWatermarks(WatermarkStrategy watermarkStrategy) {
- checkNotNull(watermarkStrategy);
- try {
- ClosureCleaner.clean(watermarkStrategy, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
- this.watermarkStrategy = new SerializedValue<>(watermarkStrategy);
- } catch (Exception e) {
- throw new IllegalArgumentException("The given WatermarkStrategy is not serializable", e);
- }
- return this;
- }
-
- // -------------------------------------------------------------------------
- // Internal helpers
- // -------------------------------------------------------------------------
-
- private String createOutputId(int partitionId) {
- return topic + "-" + partitionId;
+ public void setWatermarkStrategy(WatermarkStrategy watermarkStrategy) {
+ this.watermarkStrategy = watermarkStrategy;
}
}
diff --git a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SourceContextWatermarkOutputAdapter.java b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SourceContextWatermarkOutputAdapter.java
deleted file mode 100644
index ea8194417..000000000
--- a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SourceContextWatermarkOutputAdapter.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kafka.internals;
-
-import org.apache.flink.api.common.eventtime.Watermark;
-import org.apache.flink.api.common.eventtime.WatermarkOutput;
-import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
-
-/**
- * A {@link org.apache.flink.api.common.eventtime.WatermarkOutput} that forwards calls to a {@link
- * org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext}.
- */
-public class SourceContextWatermarkOutputAdapter implements WatermarkOutput {
- private final SourceContext sourceContext;
-
- public SourceContextWatermarkOutputAdapter(SourceContext sourceContext) {
- this.sourceContext = sourceContext;
- }
-
- @Override
- public void emitWatermark(Watermark watermark) {
- sourceContext.emitWatermark(new org.apache.flink.streaming.api.watermark.Watermark(watermark.getTimestamp()));
- }
-
- @Override
- public void markIdle() {
- sourceContext.markAsTemporarilyIdle();
- }
-
- @Override
- public void markActive() {
- // will be set active with next watermark
- }
-}