diff --git a/auron-flink-extension/auron-flink-runtime/pom.xml b/auron-flink-extension/auron-flink-runtime/pom.xml
index 654ce826b..d99a2dbf9 100644
--- a/auron-flink-extension/auron-flink-runtime/pom.xml
+++ b/auron-flink-extension/auron-flink-runtime/pom.xml
@@ -74,6 +74,12 @@
             <version>${flink.version}</version>
             <scope>provided</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-runtime</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
diff --git a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaDynamicTableSource.java b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaDynamicTableSource.java
index 837ea5817..4b974d29f 100644
--- a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaDynamicTableSource.java
+++ b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaDynamicTableSource.java
@@ -88,7 +88,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
                 startupMode);
 
         if (watermarkStrategy != null) {
-            sourceFunction.assignTimestampsAndWatermarks(watermarkStrategy);
+            sourceFunction.setWatermarkStrategy(watermarkStrategy);
         }
 
         return new DataStreamScanProvider() {
diff --git a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaSourceFunction.java b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaSourceFunction.java
index 52c56fa28..16d16da94 100644
--- a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaSourceFunction.java
+++ b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaSourceFunction.java
@@ -17,10 +17,10 @@
 package org.apache.auron.flink.connector.kafka;
 
 import static org.apache.auron.flink.connector.kafka.KafkaConstants.*;
-import static org.apache.flink.util.Preconditions.checkNotNull;
 
 import java.io.File;
 import java.io.InputStream;
+import java.lang.reflect.Field;
 import java.util.*;
 import org.apache.auron.flink.arrow.FlinkArrowReader;
 import org.apache.auron.flink.arrow.FlinkArrowUtils;
@@ -38,11 +38,6 @@
 import org.apache.auron.protobuf.PhysicalPlanNode;
 import org.apache.commons.collections.map.LinkedMap;
 import org.apache.commons.io.FileUtils;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.eventtime.TimestampAssigner;
-import org.apache.flink.api.common.eventtime.WatermarkGenerator;
-import org.apache.flink.api.common.eventtime.WatermarkOutput;
-import org.apache.flink.api.common.eventtime.WatermarkOutputMultiplexer;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.state.CheckpointListener;
 import org.apache.flink.api.common.state.ListState;
@@ -50,7 +45,6 @@
 import org.apache.flink.api.common.state.OperatorStateStore;
 import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.MetricGroup;
@@ -60,16 +54,17 @@
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionAssigner;
-import org.apache.flink.streaming.connectors.kafka.internals.SourceContextWatermarkOutputAdapter;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.generated.GeneratedWatermarkGeneratorSupplier;
+import org.apache.flink.table.runtime.generated.WatermarkGenerator;
 import org.apache.flink.table.types.logical.BigIntType;
 import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.util.SerializableObject;
-import org.apache.flink.util.SerializedValue;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.PartitionInfo;
@@ -82,9 +77,9 @@
  * If checkpoints are enabled, Kafka offsets are committed via Auron after a successful checkpoint.
  * If checkpoints are disabled, Kafka offsets are committed periodically via Auron.
  *
- * <p>Watermark support is implemented via {@link WatermarkOutputMultiplexer} with per-partition
- * watermark generation. Partition expansion is detected periodically using a lightweight
- * {@link KafkaConsumer} (metadata queries only, no data consumption).
+ * <p>Watermark support uses the table-runtime {@link WatermarkGenerator} directly
+ * (from {@code WatermarkPushDownSpec}) with per-partition watermark tracking.
+ * The combined watermark emitted downstream is the minimum across all assigned partitions.
  */
 public class AuronKafkaSourceFunction extends RichParallelSourceFunction<RowData>
         implements FlinkAuronFunction, CheckpointListener, CheckpointedFunction {
@@ -110,7 +105,6 @@ public class AuronKafkaSourceFunction extends RichParallelSourceFunction<RowData>
     private transient Map<Integer, Long> restoredOffsets;
     private transient Map<Integer, Long> currentOffsets;
     private final SerializableObject lock = new SerializableObject();
-    private SerializedValue<WatermarkStrategy<RowData>> watermarkStrategy;
     private volatile boolean isRunning;
     private transient String auronOperatorIdWithSubtaskIndex;
     private transient MetricNode nativeMetric;
@@ -120,14 +114,11 @@ public class AuronKafkaSourceFunction extends RichParallelSourceFunction<RowData>
     private transient KafkaConsumer<byte[], byte[]> kafkaConsumer;
     private transient List<Integer> assignedPartitions;
 
-    // Watermark related
-    private transient WatermarkOutputMultiplexer watermarkOutputMultiplexer;
-    private transient Map<Integer, String> partitionIdToOutputIdMap;
-    private transient WatermarkGenerator<RowData> watermarkGenerator;
-    private transient TimestampAssigner<RowData> timestampAssigner;
-    // Periodic watermark control: autoWatermarkInterval > 0 means enabled
-    private transient long autoWatermarkInterval;
-    private transient long lastPeriodicWatermarkTime;
+    // Watermark related: uses table-runtime WatermarkGenerator directly
+    private WatermarkStrategy<RowData> watermarkStrategy;
+    private transient WatermarkGenerator tableWatermarkGenerator;
+    private transient Map<Integer, Long> partitionWatermarks;
+    private transient long currentCombinedWatermark;
 
     public AuronKafkaSourceFunction(
             LogicalType outputType,
@@ -231,22 +222,25 @@ public void open(Configuration config) throws Exception {
                 subtaskIndex,
                 assignedPartitions);
 
-        // 3. Initialize Watermark components if watermarkStrategy is set
+        // 3. Initialize table-runtime WatermarkGenerator if watermarkStrategy is set
         if (watermarkStrategy != null) {
-            ClassLoader userCodeClassLoader = runtimeContext.getUserCodeClassLoader();
-            WatermarkStrategy<RowData> deserializedWatermarkStrategy =
-                    watermarkStrategy.deserializeValue(userCodeClassLoader);
-            MetricGroup metricGroup = runtimeContext.getMetricGroup();
-
-            this.timestampAssigner = deserializedWatermarkStrategy.createTimestampAssigner(() -> metricGroup);
-
-            this.watermarkGenerator = deserializedWatermarkStrategy.createWatermarkGenerator(() -> metricGroup);
-
-            // 4. Determine periodic watermark interval
-            // autoWatermarkInterval > 0 means periodic watermark is enabled
-            this.autoWatermarkInterval = runtimeContext.getExecutionConfig().getAutoWatermarkInterval();
-            this.lastPeriodicWatermarkTime = 0L; // Initialize to 0 so first emit triggers immediately
+            MetricGroup metricGroup = runtimeContext.getMetricGroup();
+            // Create DataStream API WatermarkGenerator via the strategy
+            org.apache.flink.api.common.eventtime.WatermarkGenerator<RowData> dsGenerator =
+                    watermarkStrategy.createWatermarkGenerator(() -> metricGroup);
+            // Extract inner table-runtime WatermarkGenerator from DefaultWatermarkGenerator
+            if (dsGenerator instanceof GeneratedWatermarkGeneratorSupplier.DefaultWatermarkGenerator) {
+                Field field = GeneratedWatermarkGeneratorSupplier.DefaultWatermarkGenerator.class.getDeclaredField(
+                        "innerWatermarkGenerator");
+                field.setAccessible(true);
+                this.tableWatermarkGenerator = (WatermarkGenerator) field.get(dsGenerator);
+            } else {
+                throw new IllegalStateException("Expected DefaultWatermarkGenerator from WatermarkPushDownSpec, got: "
                        + dsGenerator.getClass().getName());
+            }
+            this.partitionWatermarks = new HashMap<>();
+            this.currentCombinedWatermark = Long.MIN_VALUE;
         }
 
         this.isRunning = true;
     }
@@ -267,97 +260,76 @@ public void add(String name, long value) {
         fieldList.addAll(((RowType) outputType).getFields());
         RowType auronOutputRowType = new RowType(fieldList);
 
-        // Initialize WatermarkOutputMultiplexer here because sourceContext is available
-        if (watermarkGenerator != null) {
-            this.watermarkOutputMultiplexer =
-                    new WatermarkOutputMultiplexer(new SourceContextWatermarkOutputAdapter<>(sourceContext));
-            this.partitionIdToOutputIdMap = new HashMap<>();
-            for (Integer partition : assignedPartitions) {
-                String outputId = createOutputId(partition);
-                partitionIdToOutputIdMap.put(partition, outputId);
-                watermarkOutputMultiplexer.registerNewOutput(outputId, watermark -> {});
-            }
-        }
-
         // Pre-check watermark flag to avoid per-record null checks in the hot path
-        final boolean enableWatermark = watermarkGenerator != null;
-
-        while (this.isRunning) {
-            AuronCallNativeWrapper wrapper = new AuronCallNativeWrapper(
-                    FlinkArrowUtils.getRootAllocator(),
-                    physicalPlanNode,
-                    nativeMetric,
-                    0,
-                    0,
-                    0,
-                    AuronAdaptor.getInstance()
-                            .getAuronConfiguration()
-                            .getLong(FlinkAuronConfiguration.NATIVE_MEMORY_SIZE));
-
-            if (enableWatermark) {
-                // Watermark-enabled path
-                while (wrapper.loadNextBatch(batch -> {
-                    Map<Integer, Long> tmpOffsets = new HashMap<>(currentOffsets);
-                    FlinkArrowReader arrowReader = FlinkArrowReader.create(batch, auronOutputRowType, 3);
-
-                    for (int i = 0; i < batch.getRowCount(); i++) {
-                        AuronColumnarRowData tmpRowData = (AuronColumnarRowData) arrowReader.read(i);
-                        // Extract kafka meta fields
-                        int partitionId = tmpRowData.getInt(-3);
-                        long offset = tmpRowData.getLong(-2);
-                        long kafkaTimestamp = tmpRowData.getLong(-1);
-                        tmpOffsets.put(partitionId, offset);
-
-                        // Extract event timestamp via user-defined TimestampAssigner
-                        long timestamp = timestampAssigner.extractTimestamp(tmpRowData, kafkaTimestamp);
-
-                        // Route to the per-partition WatermarkOutput and trigger onEvent
-                        // outputId must not null, else is a bug
-                        String outputId = partitionIdToOutputIdMap.get(partitionId);
-                        WatermarkOutput partitionOutput = watermarkOutputMultiplexer.getImmediateOutput(outputId);
-                        watermarkGenerator.onEvent(tmpRowData, timestamp, partitionOutput);
-                        // Emit record with event timestamp
-                        sourceContext.collectWithTimestamp(arrowReader.read(i), timestamp);
-                    }
-
-                    // Periodic watermark: only emit if enough time has elapsed since last emit
-                    // Controlled by ExecutionConfig.getAutoWatermarkInterval()
-                    long currentTime = System.currentTimeMillis();
-                    if (autoWatermarkInterval > 0
-                            && (currentTime - lastPeriodicWatermarkTime) >= autoWatermarkInterval) {
-                        for (Map.Entry<Integer, String> entry : partitionIdToOutputIdMap.entrySet()) {
-                            // Use getDeferredOutput for periodic emit: all partitions update first,
-                            // then multiplexer merges and emits once via onPeriodicEmit()
-                            WatermarkOutput output = watermarkOutputMultiplexer.getDeferredOutput(entry.getValue());
-                            watermarkGenerator.onPeriodicEmit(output);
+        final boolean enableWatermark = tableWatermarkGenerator != null;
+
+        AuronCallNativeWrapper wrapper = new AuronCallNativeWrapper(
+                FlinkArrowUtils.getRootAllocator(),
+                physicalPlanNode,
+                nativeMetric,
+                0,
+                0,
+                0,
+                AuronAdaptor.getInstance().getAuronConfiguration().getLong(FlinkAuronConfiguration.NATIVE_MEMORY_SIZE));
+
+        if (enableWatermark) {
+            // Watermark-enabled path: use table-runtime WatermarkGenerator directly
+            while (wrapper.loadNextBatch(batch -> {
+                Map<Integer, Long> tmpOffsets = new HashMap<>(currentOffsets);
+                FlinkArrowReader arrowReader = FlinkArrowReader.create(batch, auronOutputRowType, 3);
+                for (int i = 0; i < batch.getRowCount(); i++) {
+                    AuronColumnarRowData tmpRowData = (AuronColumnarRowData) arrowReader.read(i);
+                    // Extract kafka meta fields
+                    int partitionId = tmpRowData.getInt(-3);
+                    long offset = tmpRowData.getLong(-2);
+                    long kafkaTimestamp = tmpRowData.getLong(-1);
+                    tmpOffsets.put(partitionId, offset);
+
+                    try {
+                        // Compute watermark using the table-runtime WatermarkGenerator (stateless pure function)
+                        // evaluated with the local time zone
+                        Long watermark = tableWatermarkGenerator.currentWatermark(tmpRowData);
+                        // Update per-partition watermark tracking
+                        if (watermark != null) {
+                            partitionWatermarks.merge(partitionId, watermark, Math::max);
                         }
-                        // Merge all deferred updates and emit the combined watermark downstream
-                        watermarkOutputMultiplexer.onPeriodicEmit();
-                        lastPeriodicWatermarkTime = currentTime;
+                    } catch (Exception e) {
+                        throw new RuntimeException("Generated WatermarkGenerator failed to generate watermark:", e);
                     }
+                    // Emit record with kafka timestamp
+                    sourceContext.collectWithTimestamp(tmpRowData, kafkaTimestamp);
+                }
 
-                    synchronized (lock) {
-                        currentOffsets = tmpOffsets;
-                    }
-                })) {}
-            } else {
-                // No-watermark path: still use collectWithTimestamp with kafka timestamp
-                while (wrapper.loadNextBatch(batch -> {
-                    Map<Integer, Long> tmpOffsets = new HashMap<>(currentOffsets);
-                    FlinkArrowReader arrowReader = FlinkArrowReader.create(batch, auronOutputRowType, 3);
-                    for (int i = 0; i < batch.getRowCount(); i++) {
-                        AuronColumnarRowData tmpRowData = (AuronColumnarRowData) arrowReader.read(i);
-                        int partitionId = tmpRowData.getInt(-3);
-                        long offset = tmpRowData.getLong(-2);
-                        long kafkaTimestamp = tmpRowData.getLong(-1);
-                        tmpOffsets.put(partitionId, offset);
-                        sourceContext.collectWithTimestamp(arrowReader.read(i), kafkaTimestamp);
-                    }
-                    synchronized (lock) {
-                        currentOffsets = tmpOffsets;
+                // After each batch, compute combined watermark (min across all partitions) and emit
+                if (!partitionWatermarks.isEmpty()) {
+                    long minWatermark = Collections.min(partitionWatermarks.values());
+                    if (minWatermark > currentCombinedWatermark) {
+                        currentCombinedWatermark = minWatermark;
+                        sourceContext.emitWatermark(new Watermark(minWatermark));
                     }
-                })) {}
-            }
+                }
+
+                synchronized (lock) {
+                    currentOffsets = tmpOffsets;
+                }
+            })) {}
+        } else {
+            // No-watermark path: still use collectWithTimestamp with kafka timestamp
+            while (wrapper.loadNextBatch(batch -> {
+                Map<Integer, Long> tmpOffsets = new HashMap<>(currentOffsets);
+                FlinkArrowReader arrowReader = FlinkArrowReader.create(batch, auronOutputRowType, 3);
+                for (int i = 0; i < batch.getRowCount(); i++) {
+                    AuronColumnarRowData tmpRowData = (AuronColumnarRowData) arrowReader.read(i);
+                    int partitionId = tmpRowData.getInt(-3);
+                    long offset = tmpRowData.getLong(-2);
+                    long kafkaTimestamp = tmpRowData.getLong(-1);
+                    tmpOffsets.put(partitionId, offset);
+                    sourceContext.collectWithTimestamp(tmpRowData, kafkaTimestamp);
+                }
+                synchronized (lock) {
+                    currentOffsets = tmpOffsets;
+                }
+            })) {}
         }
 
         LOG.info("Auron kafka source run end");
     }
@@ -376,6 +348,11 @@ public void close() throws Exception {
             kafkaConsumer.close();
         }
 
+        // Close table-runtime WatermarkGenerator
+        if (tableWatermarkGenerator != null) {
+            tableWatermarkGenerator.close();
+        }
+
         super.close();
     }
 
@@ -478,22 +455,7 @@ public void initializeState(FunctionInitializationContext context) throws Exception {
         }
     }
 
-    public AuronKafkaSourceFunction assignTimestampsAndWatermarks(WatermarkStrategy<RowData> watermarkStrategy) {
-        checkNotNull(watermarkStrategy);
-        try {
-            ClosureCleaner.clean(watermarkStrategy, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
-            this.watermarkStrategy = new SerializedValue<>(watermarkStrategy);
-        } catch (Exception e) {
-            throw new IllegalArgumentException("The given WatermarkStrategy is not serializable", e);
-        }
-        return this;
-    }
-
-    // -------------------------------------------------------------------------
-    // Internal helpers
-    // -------------------------------------------------------------------------
-
-    private String createOutputId(int partitionId) {
-        return topic + "-" + partitionId;
+    public void setWatermarkStrategy(WatermarkStrategy<RowData> watermarkStrategy) {
+        this.watermarkStrategy = watermarkStrategy;
     }
 }
diff --git a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SourceContextWatermarkOutputAdapter.java b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SourceContextWatermarkOutputAdapter.java
deleted file mode 100644
index ea8194417..000000000
--- a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SourceContextWatermarkOutputAdapter.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kafka.internals;
-
-import org.apache.flink.api.common.eventtime.Watermark;
-import org.apache.flink.api.common.eventtime.WatermarkOutput;
-import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
-
-/**
- * A {@link org.apache.flink.api.common.eventtime.WatermarkOutput} that forwards calls to a {@link
- * org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext}.
- */
-public class SourceContextWatermarkOutputAdapter<T> implements WatermarkOutput {
-    private final SourceContext<T> sourceContext;
-
-    public SourceContextWatermarkOutputAdapter(SourceContext<T> sourceContext) {
-        this.sourceContext = sourceContext;
-    }
-
-    @Override
-    public void emitWatermark(Watermark watermark) {
-        sourceContext.emitWatermark(new org.apache.flink.streaming.api.watermark.Watermark(watermark.getTimestamp()));
-    }
-
-    @Override
-    public void markIdle() {
-        sourceContext.markAsTemporarilyIdle();
-    }
-
-    @Override
-    public void markActive() {
-        // will be set active with next watermark
-    }
-}
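For reference, the combined-watermark bookkeeping that the new AuronKafkaSourceFunction.run() loop performs can be illustrated in isolation. The sketch below is not part of the patch, and the class and method names are invented for illustration only; it mirrors the behaviour described in the new Javadoc: each partition keeps the maximum watermark produced for it by the table-runtime WatermarkGenerator, and the watermark emitted downstream is the minimum across all tracked partitions, emitted only when it advances.

// Illustrative sketch only, not part of the patch: mirrors the per-partition
// tracking done with partitionWatermarks / currentCombinedWatermark above.
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

class CombinedWatermarkTracker {

    private final Map<Integer, Long> partitionWatermarks = new HashMap<>();
    private long currentCombinedWatermark = Long.MIN_VALUE;

    /** Record a watermark observed for one partition; per partition only the maximum is kept. */
    void onPartitionWatermark(int partitionId, long watermark) {
        partitionWatermarks.merge(partitionId, watermark, Math::max);
    }

    /**
     * Return the new combined watermark (minimum across all tracked partitions) if it advanced
     * beyond the last emitted value, or null if nothing should be emitted yet.
     */
    Long advanceCombinedWatermark() {
        if (partitionWatermarks.isEmpty()) {
            return null;
        }
        long min = Collections.min(partitionWatermarks.values());
        if (min > currentCombinedWatermark) {
            currentCombinedWatermark = min;
            return min;
        }
        return null;
    }
}

A batch loop would call onPartitionWatermark for every row and advanceCombinedWatermark once per batch, emitting a downstream watermark only when a non-null value is returned, which is exactly the monotonic, min-across-partitions behaviour of the patched run() method.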