1717package org .apache .auron .flink .connector .kafka ;
1818
1919import static org .apache .auron .flink .connector .kafka .KafkaConstants .*;
20- import static org .apache .flink .util .Preconditions .checkNotNull ;
2120
2221import java .io .File ;
2322import java .io .InputStream ;
23+ import java .lang .reflect .Field ;
2424import java .util .*;
2525import org .apache .auron .flink .arrow .FlinkArrowReader ;
2626import org .apache .auron .flink .arrow .FlinkArrowUtils ;
3838import org .apache .auron .protobuf .PhysicalPlanNode ;
3939import org .apache .commons .collections .map .LinkedMap ;
4040import org .apache .commons .io .FileUtils ;
41- import org .apache .flink .api .common .ExecutionConfig ;
42- import org .apache .flink .api .common .eventtime .TimestampAssigner ;
43- import org .apache .flink .api .common .eventtime .WatermarkGenerator ;
44- import org .apache .flink .api .common .eventtime .WatermarkOutput ;
45- import org .apache .flink .api .common .eventtime .WatermarkOutputMultiplexer ;
4641import org .apache .flink .api .common .eventtime .WatermarkStrategy ;
4742import org .apache .flink .api .common .state .CheckpointListener ;
4843import org .apache .flink .api .common .state .ListState ;
4944import org .apache .flink .api .common .state .ListStateDescriptor ;
5045import org .apache .flink .api .common .state .OperatorStateStore ;
5146import org .apache .flink .api .common .typeinfo .TypeHint ;
5247import org .apache .flink .api .common .typeinfo .TypeInformation ;
53- import org .apache .flink .api .java .ClosureCleaner ;
5448import org .apache .flink .api .java .tuple .Tuple2 ;
5549import org .apache .flink .configuration .Configuration ;
5650import org .apache .flink .metrics .MetricGroup ;
6054import org .apache .flink .streaming .api .checkpoint .CheckpointedFunction ;
6155import org .apache .flink .streaming .api .functions .source .RichParallelSourceFunction ;
6256import org .apache .flink .streaming .api .operators .StreamingRuntimeContext ;
57+ import org .apache .flink .streaming .api .watermark .Watermark ;
6358import org .apache .flink .streaming .connectors .kafka .internals .KafkaTopicPartition ;
6459import org .apache .flink .streaming .connectors .kafka .internals .KafkaTopicPartitionAssigner ;
65- import org .apache .flink .streaming .connectors .kafka .internals .SourceContextWatermarkOutputAdapter ;
6660import org .apache .flink .table .data .RowData ;
61+ import org .apache .flink .table .runtime .generated .GeneratedWatermarkGeneratorSupplier ;
62+ import org .apache .flink .table .runtime .generated .WatermarkGenerator ;
6763import org .apache .flink .table .types .logical .BigIntType ;
6864import org .apache .flink .table .types .logical .IntType ;
6965import org .apache .flink .table .types .logical .LogicalType ;
7066import org .apache .flink .table .types .logical .RowType ;
7167import org .apache .flink .util .SerializableObject ;
72- import org .apache .flink .util .SerializedValue ;
7368import org .apache .kafka .clients .consumer .ConsumerConfig ;
7469import org .apache .kafka .clients .consumer .KafkaConsumer ;
7570import org .apache .kafka .common .PartitionInfo ;
8277 * If checkpoints are enabled, Kafka offsets are committed via Auron after a successful checkpoint.
8378 * If checkpoints are disabled, Kafka offsets are committed periodically via Auron.
8479 *
85- * <p>Watermark support is implemented via {@link WatermarkOutputMultiplexer} with per-partition
86- * watermark generation. Partition expansion is detected periodically using a lightweight
87- * {@link KafkaConsumer} (metadata queries only, no data consumption) .
 * <p>Watermark support uses the table-runtime {@link WatermarkGenerator} directly
 * (from {@code WatermarkPushDownSpec}) with per-partition watermark tracking.
 * The combined watermark emitted downstream is the minimum across all assigned partitions.
8883 */
8984public class AuronKafkaSourceFunction extends RichParallelSourceFunction <RowData >
9085 implements FlinkAuronFunction , CheckpointListener , CheckpointedFunction {
@@ -110,7 +105,6 @@ public class AuronKafkaSourceFunction extends RichParallelSourceFunction<RowData
110105 private transient Map <Integer , Long > restoredOffsets ;
111106 private transient Map <Integer , Long > currentOffsets ;
112107 private final SerializableObject lock = new SerializableObject ();
113- private SerializedValue <WatermarkStrategy <RowData >> watermarkStrategy ;
114108 private volatile boolean isRunning ;
115109 private transient String auronOperatorIdWithSubtaskIndex ;
116110 private transient MetricNode nativeMetric ;
@@ -120,14 +114,11 @@ public class AuronKafkaSourceFunction extends RichParallelSourceFunction<RowData
120114 private transient KafkaConsumer <byte [], byte []> kafkaConsumer ;
121115 private transient List <Integer > assignedPartitions ;
122116
123- // Watermark related
124- private transient WatermarkOutputMultiplexer watermarkOutputMultiplexer ;
125- private transient Map <Integer , String > partitionIdToOutputIdMap ;
126- private transient WatermarkGenerator <RowData > watermarkGenerator ;
127- private transient TimestampAssigner <RowData > timestampAssigner ;
128- // Periodic watermark control: autoWatermarkInterval > 0 means enabled
129- private transient long autoWatermarkInterval ;
130- private transient long lastPeriodicWatermarkTime ;
117+ // Watermark related: uses table-runtime WatermarkGenerator directly
118+ private WatermarkStrategy <RowData > watermarkStrategy ;
119+ private transient WatermarkGenerator tableWatermarkGenerator ;
120+ private transient Map <Integer , Long > partitionWatermarks ;
121+ private transient long currentCombinedWatermark ;
131122
132123 public AuronKafkaSourceFunction (
133124 LogicalType outputType ,
@@ -231,22 +222,24 @@ public void open(Configuration config) throws Exception {
231222 subtaskIndex ,
232223 assignedPartitions );
233224
234- // 3. Initialize Watermark components if watermarkStrategy is set
225+ // 3. Initialize table-runtime WatermarkGenerator if watermarkStrategy is set
235226 if (watermarkStrategy != null ) {
236- ClassLoader userCodeClassLoader = runtimeContext .getUserCodeClassLoader ();
237- WatermarkStrategy <RowData > deserializedWatermarkStrategy =
238- watermarkStrategy .deserializeValue (userCodeClassLoader );
239-
240227 MetricGroup metricGroup = runtimeContext .getMetricGroup ();
241-
242- this .timestampAssigner = deserializedWatermarkStrategy .createTimestampAssigner (() -> metricGroup );
243-
244- this .watermarkGenerator = deserializedWatermarkStrategy .createWatermarkGenerator (() -> metricGroup );
245-
246- // 4. Determine periodic watermark interval
247- // autoWatermarkInterval > 0 means periodic watermark is enabled
248- this .autoWatermarkInterval = runtimeContext .getExecutionConfig ().getAutoWatermarkInterval ();
249- this .lastPeriodicWatermarkTime = 0L ; // Initialize to 0 so first emit triggers immediately
228+ // Create DataStream API WatermarkGenerator via the strategy
229+ org .apache .flink .api .common .eventtime .WatermarkGenerator <RowData > dsGenerator =
230+ watermarkStrategy .createWatermarkGenerator (() -> metricGroup );
231+ // Extract inner table-runtime WatermarkGenerator from DefaultWatermarkGenerator
232+ if (dsGenerator instanceof GeneratedWatermarkGeneratorSupplier .DefaultWatermarkGenerator ) {
233+ Field field = GeneratedWatermarkGeneratorSupplier .DefaultWatermarkGenerator .class .getDeclaredField (
234+ "innerWatermarkGenerator" );
235+ field .setAccessible (true );
236+ this .tableWatermarkGenerator = (WatermarkGenerator ) field .get (dsGenerator );
237+ } else {
238+ throw new IllegalStateException ("Expected DefaultWatermarkGenerator from WatermarkPushDownSpec, got: "
239+ + dsGenerator .getClass ().getName ());
240+ }
241+ this .partitionWatermarks = new HashMap <>();
242+ this .currentCombinedWatermark = Long .MIN_VALUE ;
250243 }
251244 this .isRunning = true ;
252245 }
@@ -267,97 +260,76 @@ public void add(String name, long value) {
267260 fieldList .addAll (((RowType ) outputType ).getFields ());
268261 RowType auronOutputRowType = new RowType (fieldList );
269262
270- // Initialize WatermarkOutputMultiplexer here because sourceContext is available
271- if (watermarkGenerator != null ) {
272- this .watermarkOutputMultiplexer =
273- new WatermarkOutputMultiplexer (new SourceContextWatermarkOutputAdapter <>(sourceContext ));
274- this .partitionIdToOutputIdMap = new HashMap <>();
275- for (Integer partition : assignedPartitions ) {
276- String outputId = createOutputId (partition );
277- partitionIdToOutputIdMap .put (partition , outputId );
278- watermarkOutputMultiplexer .registerNewOutput (outputId , watermark -> {});
279- }
280- }
281-
282263 // Pre-check watermark flag to avoid per-record null checks in the hot path
283- final boolean enableWatermark = watermarkGenerator != null ;
284-
285- while (this .isRunning ) {
286- AuronCallNativeWrapper wrapper = new AuronCallNativeWrapper (
287- FlinkArrowUtils .getRootAllocator (),
288- physicalPlanNode ,
289- nativeMetric ,
290- 0 ,
291- 0 ,
292- 0 ,
293- AuronAdaptor .getInstance ()
294- .getAuronConfiguration ()
295- .getLong (FlinkAuronConfiguration .NATIVE_MEMORY_SIZE ));
296-
297- if (enableWatermark ) {
298- // Watermark-enabled path
299- while (wrapper .loadNextBatch (batch -> {
300- Map <Integer , Long > tmpOffsets = new HashMap <>(currentOffsets );
301- FlinkArrowReader arrowReader = FlinkArrowReader .create (batch , auronOutputRowType , 3 );
302-
303- for (int i = 0 ; i < batch .getRowCount (); i ++) {
304- AuronColumnarRowData tmpRowData = (AuronColumnarRowData ) arrowReader .read (i );
305- // Extract kafka meta fields
306- int partitionId = tmpRowData .getInt (-3 );
307- long offset = tmpRowData .getLong (-2 );
308- long kafkaTimestamp = tmpRowData .getLong (-1 );
309- tmpOffsets .put (partitionId , offset );
310-
311- // Extract event timestamp via user-defined TimestampAssigner
312- long timestamp = timestampAssigner .extractTimestamp (tmpRowData , kafkaTimestamp );
313-
314- // Route to the per-partition WatermarkOutput and trigger onEvent
315- // outputId must not null, else is a bug
316- String outputId = partitionIdToOutputIdMap .get (partitionId );
317- WatermarkOutput partitionOutput = watermarkOutputMultiplexer .getImmediateOutput (outputId );
318- watermarkGenerator .onEvent (tmpRowData , timestamp , partitionOutput );
319- // Emit record with event timestamp
320- sourceContext .collectWithTimestamp (arrowReader .read (i ), timestamp );
321- }
322-
323- // Periodic watermark: only emit if enough time has elapsed since last emit
324- // Controlled by ExecutionConfig.getAutoWatermarkInterval()
325- long currentTime = System .currentTimeMillis ();
326- if (autoWatermarkInterval > 0
327- && (currentTime - lastPeriodicWatermarkTime ) >= autoWatermarkInterval ) {
328- for (Map .Entry <Integer , String > entry : partitionIdToOutputIdMap .entrySet ()) {
329- // Use getDeferredOutput for periodic emit: all partitions update first,
330- // then multiplexer merges and emits once via onPeriodicEmit()
331- WatermarkOutput output = watermarkOutputMultiplexer .getDeferredOutput (entry .getValue ());
332- watermarkGenerator .onPeriodicEmit (output );
264+ final boolean enableWatermark = tableWatermarkGenerator != null ;
265+
266+ AuronCallNativeWrapper wrapper = new AuronCallNativeWrapper (
267+ FlinkArrowUtils .getRootAllocator (),
268+ physicalPlanNode ,
269+ nativeMetric ,
270+ 0 ,
271+ 0 ,
272+ 0 ,
273+ AuronAdaptor .getInstance ().getAuronConfiguration ().getLong (FlinkAuronConfiguration .NATIVE_MEMORY_SIZE ));
274+
275+ if (enableWatermark ) {
276+ // Watermark-enabled path: use table-runtime WatermarkGenerator directly
277+ while (wrapper .loadNextBatch (batch -> {
278+ Map <Integer , Long > tmpOffsets = new HashMap <>(currentOffsets );
279+ FlinkArrowReader arrowReader = FlinkArrowReader .create (batch , auronOutputRowType , 3 );
280+ for (int i = 0 ; i < batch .getRowCount (); i ++) {
281+ AuronColumnarRowData tmpRowData = (AuronColumnarRowData ) arrowReader .read (i );
282+ // Extract kafka meta fields
283+ int partitionId = tmpRowData .getInt (-3 );
284+ long offset = tmpRowData .getLong (-2 );
285+ long kafkaTimestamp = tmpRowData .getLong (-1 );
286+ tmpOffsets .put (partitionId , offset );
287+
288+ try {
289+ // Compute watermark using table-runtime WatermarkGenerator (stateless pure function)
290+ // with local Timezone
291+ Long watermark = tableWatermarkGenerator .currentWatermark (tmpRowData );
292+ // Update per-partition watermark tracking
293+ if (watermark != null ) {
294+ partitionWatermarks .merge (partitionId , watermark , Math ::max );
333295 }
334- // Merge all deferred updates and emit the combined watermark downstream
335- watermarkOutputMultiplexer .onPeriodicEmit ();
336- lastPeriodicWatermarkTime = currentTime ;
296+ } catch (Exception e ) {
297+ throw new RuntimeException ("Generated WatermarkGenerator fails to generate:" , e );
337298 }
299+ // Emit record with kafka timestamp
300+ sourceContext .collectWithTimestamp (tmpRowData , kafkaTimestamp );
301+ }
338302
339- synchronized (lock ) {
340- currentOffsets = tmpOffsets ;
341- }
342- })) {}
343- } else {
344- // No-watermark path: still use collectWithTimestamp with kafka timestamp
345- while (wrapper .loadNextBatch (batch -> {
346- Map <Integer , Long > tmpOffsets = new HashMap <>(currentOffsets );
347- FlinkArrowReader arrowReader = FlinkArrowReader .create (batch , auronOutputRowType , 3 );
348- for (int i = 0 ; i < batch .getRowCount (); i ++) {
349- AuronColumnarRowData tmpRowData = (AuronColumnarRowData ) arrowReader .read (i );
350- int partitionId = tmpRowData .getInt (-3 );
351- long offset = tmpRowData .getLong (-2 );
352- long kafkaTimestamp = tmpRowData .getLong (-1 );
353- tmpOffsets .put (partitionId , offset );
354- sourceContext .collectWithTimestamp (arrowReader .read (i ), kafkaTimestamp );
355- }
356- synchronized (lock ) {
357- currentOffsets = tmpOffsets ;
303+ // After each batch, compute combined watermark (min across all partitions) and emit
304+ if (!partitionWatermarks .isEmpty ()) {
305+ long minWatermark = Collections .min (partitionWatermarks .values ());
306+ if (minWatermark > currentCombinedWatermark ) {
307+ currentCombinedWatermark = minWatermark ;
308+ sourceContext .emitWatermark (new Watermark (minWatermark ));
358309 }
359- })) {}
360- }
310+ }
311+
312+ synchronized (lock ) {
313+ currentOffsets = tmpOffsets ;
314+ }
315+ })) {}
316+ } else {
317+ // No-watermark path: still use collectWithTimestamp with kafka timestamp
318+ while (wrapper .loadNextBatch (batch -> {
319+ Map <Integer , Long > tmpOffsets = new HashMap <>(currentOffsets );
320+ FlinkArrowReader arrowReader = FlinkArrowReader .create (batch , auronOutputRowType , 3 );
321+ for (int i = 0 ; i < batch .getRowCount (); i ++) {
322+ AuronColumnarRowData tmpRowData = (AuronColumnarRowData ) arrowReader .read (i );
323+ int partitionId = tmpRowData .getInt (-3 );
324+ long offset = tmpRowData .getLong (-2 );
325+ long kafkaTimestamp = tmpRowData .getLong (-1 );
326+ tmpOffsets .put (partitionId , offset );
327+ sourceContext .collectWithTimestamp (tmpRowData , kafkaTimestamp );
328+ }
329+ synchronized (lock ) {
330+ currentOffsets = tmpOffsets ;
331+ }
332+ })) {}
361333 }
362334 LOG .info ("Auron kafka source run end" );
363335 }
@@ -376,6 +348,11 @@ public void close() throws Exception {
376348 kafkaConsumer .close ();
377349 }
378350
351+ // Close table-runtime WatermarkGenerator
352+ if (tableWatermarkGenerator != null ) {
353+ tableWatermarkGenerator .close ();
354+ }
355+
379356 super .close ();
380357 }
381358
@@ -478,22 +455,7 @@ public void initializeState(FunctionInitializationContext context) throws Except
478455 }
479456 }
480457
481- public AuronKafkaSourceFunction assignTimestampsAndWatermarks (WatermarkStrategy <RowData > watermarkStrategy ) {
482- checkNotNull (watermarkStrategy );
483- try {
484- ClosureCleaner .clean (watermarkStrategy , ExecutionConfig .ClosureCleanerLevel .RECURSIVE , true );
485- this .watermarkStrategy = new SerializedValue <>(watermarkStrategy );
486- } catch (Exception e ) {
487- throw new IllegalArgumentException ("The given WatermarkStrategy is not serializable" , e );
488- }
489- return this ;
490- }
491-
492- // -------------------------------------------------------------------------
493- // Internal helpers
494- // -------------------------------------------------------------------------
495-
496- private String createOutputId (int partitionId ) {
497- return topic + "-" + partitionId ;
458+ public void setWatermarkStrategy (WatermarkStrategy <RowData > watermarkStrategy ) {
459+ this .watermarkStrategy = watermarkStrategy ;
498460 }
499461}
0 commit comments