[flink] currentFetchEventTimeLag is misleading when subscribing multiple partitions/buckets with uneven lag #3304

@loserwang1024

Description

Search before asking

  • I searched in the issues and found nothing similar.

Fluss version

0.9.0 (latest release)

Please describe the bug 🐞

When a Flink source subscribes to multiple partitions (or multiple buckets) whose consumption progress differs significantly, the reported currentFetchEventTimeLag metric is much smaller than the actual currentEmitEventTimeLag. This makes the metric misleading for monitoring and alerting, because:

  • currentFetchEventTimeLag stays near 0, suggesting the source has caught up.
  • currentEmitEventTimeLag reports a large lag (e.g. 2 hours), suggesting records are piling up.
  • Yet the Flink job has no backpressure, so the gap cannot be explained by downstream slowness.

Root Cause

In FlinkSourceSplitReader#forLogRecords:

maxConsumerRecordTimestampInFetch =
        Math.max(maxConsumerRecordTimestampInFetch, lastRecord.timestamp());
...
flinkSourceReaderMetrics.reportRecordEventTime(
        fetchTimestamp - maxConsumerRecordTimestampInFetch);

The aggregated timestamp is the MAX across all buckets in the fetch. Because the lag is computed as fetchTimestamp minus that timestamp, the reported lag is effectively the MIN lag across buckets.

Example scenario

  • Partition A is 2 hours behind → its last record timestamp is 2 hours old.
  • Partition B is reading the latest data → its last record timestamp ≈ now.

Every logScanner.poll(POLL_TIMEOUT) tends to return partition B's fresh data (since it is always locally ready as a CompletedFetch). The Math.max across buckets then picks partition B's near-now timestamp, so:

fetchTimestamp - maxTimestamp ≈ 0   → currentFetchEventTimeLag reports ~0

Meanwhile, emit lag is computed per record and aggregated as max, so partition A's 2-hour-old records correctly inflate currentEmitEventTimeLag. This is why the two metrics diverge dramatically even though there is no backpressure.
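
The scenario above can be reproduced with plain arithmetic. The following is a minimal, standalone sketch, not the Fluss code: the class name FetchLagAggregation, both method names, and the timestamps are hypothetical and chosen only for illustration. It contrasts aggregating by the newest record timestamp (the behaviour quoted above) with aggregating by the oldest one.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class FetchLagAggregation {

    /** Lag derived from the newest last-record timestamp in the fetch (yields the MIN lag). */
    public static long lagFromMaxTimestamp(long fetchTimestamp, Map<String, Long> lastRecordTimestamps) {
        // Assumes a non-empty map; an empty fetch would need separate handling.
        long maxTimestamp = Long.MIN_VALUE;
        for (long ts : lastRecordTimestamps.values()) {
            maxTimestamp = Math.max(maxTimestamp, ts);
        }
        return fetchTimestamp - maxTimestamp;
    }

    /** Lag derived from the oldest last-record timestamp in the fetch (yields the MAX lag). */
    public static long lagFromMinTimestamp(long fetchTimestamp, Map<String, Long> lastRecordTimestamps) {
        // Assumes a non-empty map; note the initial value must be Long.MAX_VALUE for Math.min.
        long minTimestamp = Long.MAX_VALUE;
        for (long ts : lastRecordTimestamps.values()) {
            minTimestamp = Math.min(minTimestamp, ts);
        }
        return fetchTimestamp - minTimestamp;
    }

    public static void main(String[] args) {
        long now = 10_000_000L;                 // hypothetical fetch time in millis
        long twoHours = 2 * 60 * 60 * 1000L;    // 7,200,000 ms

        Map<String, Long> lastTs = new LinkedHashMap<>();
        lastTs.put("partitionA", now - twoHours); // 2 hours behind
        lastTs.put("partitionB", now);            // caught up

        System.out.println(lagFromMaxTimestamp(now, lastTs)); // prints 0
        System.out.println(lagFromMinTimestamp(now, lastTs)); // prints 7200000
    }
}
```

With both partitions present in one fetch, the max-timestamp aggregation hides partition A's 2-hour lag entirely, while the min-timestamp aggregation surfaces it.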

Solution

  1. Align fetch lag semantics with emit lag (max-lag across buckets):
    Change the aggregation in forLogRecords from Math.max to Math.min on the record timestamp (equivalent to taking the max of the lag), so that currentFetchEventTimeLag reports the worst-case lag across the buckets in the fetch.

  2. Expose per-bucket / per-split fetch lag:
    Add a new gauge currentFetchEventTimeLag under the existing per-bucket metric group (next to currentOffset):

    • Non-partitioned: fluss.reader.bucket.{bucket_id}.currentFetchEventTimeLag
    • Partitioned: fluss.reader.partition.{partition_id}.bucket.{bucket_id}.currentFetchEventTimeLag

    This allows users to observe exactly which partition/bucket is lagging, making diagnosis like the scenario above straightforward.
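
To make the two proposals concrete, here is a rough, framework-free sketch of per-bucket lag tracking. It is an assumption-laden illustration, not the Fluss or Flink metrics API: the class BucketFetchLagTracker and its methods are hypothetical, and a plain map stands in for registered gauges. Only the metric name patterns are taken from the proposal above.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class BucketFetchLagTracker {

    // Key: full metric name of the bucket; value: most recently reported fetch lag in millis.
    private final Map<String, Long> lagByBucket = new ConcurrentHashMap<>();

    /** Builds the metric name from the patterns proposed above; partitionId is null for non-partitioned tables. */
    public static String metricName(Long partitionId, int bucketId) {
        String scope = (partitionId == null)
                ? "fluss.reader.bucket." + bucketId
                : "fluss.reader.partition." + partitionId + ".bucket." + bucketId;
        return scope + ".currentFetchEventTimeLag";
    }

    /** Records the lag of one bucket, based on the last record it returned in a fetch. */
    public void report(Long partitionId, int bucketId, long fetchTimestamp, long lastRecordTimestamp) {
        lagByBucket.put(metricName(partitionId, bucketId), fetchTimestamp - lastRecordTimestamp);
    }

    /** Returns the last reported lag of a bucket, or -1 if nothing has been reported yet. */
    public long lagOf(Long partitionId, int bucketId) {
        Long lag = lagByBucket.get(metricName(partitionId, bucketId));
        return lag == null ? -1L : lag;
    }

    /** Worst (largest) last-known lag across all buckets seen so far, or -1 if none. */
    public long worstLag() {
        long worst = -1L;
        for (long lag : lagByBucket.values()) {
            worst = Math.max(worst, lag);
        }
        return worst;
    }

    public static void main(String[] args) {
        BucketFetchLagTracker tracker = new BucketFetchLagTracker();
        long now = 10_000_000L; // hypothetical fetch time in millis
        tracker.report(1L, 0, now, now - 7_200_000L); // partition 1 is 2 hours behind
        tracker.report(2L, 0, now, now);              // partition 2 is caught up
        System.out.println(metricName(1L, 0) + " = " + tracker.lagOf(1L, 0));
        System.out.println(metricName(2L, 0) + " = " + tracker.lagOf(2L, 0));
        System.out.println("worst = " + tracker.worstLag());
    }
}
```

One design note: this sketch keeps the last-known lag per bucket, so worstLag() also covers buckets that were absent from the latest fetch. That is slightly broader than the "worst-case across buckets in the fetch" semantics of proposal 1, and whether it is desirable is a judgment call for the actual implementation.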

Are you willing to submit a PR?

  • I'm willing to submit a PR!
