Skip to content

feat: Add Adaptive Kafka watermark generator#346

Open
ferenc-csaky wants to merge 3 commits into
mainfrom
feat/adaptive-watermark-gen
Open

feat: Add Adaptive Kafka watermark generator#346
ferenc-csaky wants to merge 3 commits into
mainfrom
feat/adaptive-watermark-gen

Conversation

@ferenc-csaky
Copy link
Copy Markdown
Collaborator

@ferenc-csaky ferenc-csaky commented Jun 1, 2026

Summary

Adds adaptive Kafka source watermarks for SOURCE_WATERMARK() and introduces opt-in idle watermark advancement for sparse Kafka streams.

The adaptive source watermark strategy estimates out-of-orderness from observed Kafka record timestamps and emits source watermarks once enough records have been observed. Idle advancement is disabled by default and can be enabled explicitly for sources where it is safe to advance event time during periods with no records.

Changes

  • Added adaptive source-watermark configuration:

    • scan.source-watermark.min-records
    • scan.source-watermark.min-out-of-orderness
    • scan.source-watermark.max-out-of-orderness
    • scan.source-watermark.out-of-orderness-quantile
  • Added opt-in idle source-watermark advancement:

    • scan.source-watermark.idle-advance-timeout
    • scan.source-watermark.idle-advance-safety-margin
    • scan.source-watermark.idle-advance-broker-check-timeout
    • scan.source-watermark.idle-advance-broker-check-ttl
  • Idle advancement remains disabled by default.

  • Idle advancement only proceeds when the Kafka readiness check confirms all explicit source topics use LogAppendTime.

  • Broker/topic readiness checks fail on errors, missing configs, topic-pattern sources, empty topic lists, or non-LogAppendTime topics.

  • Readiness checks are cached for 10 s by default to avoid checking Kafka every watermark period.

  • Reuses one lazy transient Kafka AdminClient per source checker/subtask to avoid repeated cold AdminClient bootstrap overhead.

  • Applies Flink withIdleness(...) only when idle advancement is disabled. When idle advancement is enabled, the source watermark generator must remain active so it can emit wall-clock-derived watermarks.

  • Exposes source-watermark options for both:

    • kafka-safe
    • upsert-kafka-safe
  • Added focused unit test coverage for:

    • adaptive watermark behavior
    • idle watermark advancement
    • readiness-check TTL caching
  • Added Kafka source-watermark integration test coverage for:

    • normal adaptive source watermarks
    • sparse-stream idle advancement
    • sparse-stream behavior without idle advancement

Safety Notes

Idle advancement is intentionally opt-in because advancing event time during source silence can mark later bursty traffic as late.

When enabled, idle advancement emits conservative wall-clock-derived watermarks using:

watermark = maxTimestamp + idleDuration - outOfOrderness - safetyMargin - 1

The final -1 preserves Flink’s exclusive watermark boundary so records exactly on the boundary remain on time.

Validation

Passed:

mvn -pl connectors/kafka-safe-connector clean test -Dtest=KafkaRecordTimestampWatermarkStrategyTest,KafkaAdminIdleAdvanceReadinessCheckerTest
mvn -pl flink-sql-runner -am -Dtest=NoSuchTest -Dit.test=KafkaSourceWatermarkIT verify

@ferenc-csaky ferenc-csaky force-pushed the feat/adaptive-watermark-gen branch from f5dd621 to 0c3a726 Compare June 1, 2026 14:50
@ferenc-csaky ferenc-csaky force-pushed the feat/adaptive-watermark-gen branch from 95f82a6 to 7125130 Compare June 2, 2026 15:07
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants