
Fluss binlog source fails checkpoint/savepoint with NPE in FlinkSourceEnumerator.snapshotState #3307

@lgtm4e

Search before asking

  • I searched in the issues and found nothing similar.

Fluss version

0.9.0 (latest release)

Please describe the bug 🐞

Description

When consuming a Fluss binlog table with Flink SQL, the Flink job fails whenever a checkpoint or savepoint is triggered, with a NullPointerException thrown from FlinkSourceEnumerator.snapshotState.

The failure is reproducible for every checkpoint/savepoint operation, including stop-with-savepoint.

According to the exception message, the enumerator's leaseContext field is null when snapshotState() is invoked.
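A minimal Java sketch of the failure pattern the message points to. LeaseNpeSketch, SketchEnumerator and this LeaseContext are hypothetical stand-ins, not the Fluss source. The sketch assumes the lease context is only created when the source reads KV snapshots (which a $binlog read presumably does not), while snapshotState() dereferences it unconditionally. With helpful NullPointerException messages (enabled by default since JDK 15), the second call fails with a message of the same shape as the one reported here.

```java
public class LeaseNpeSketch {

    // Hypothetical stand-in for Fluss's LeaseContext (not the real class).
    static final class LeaseContext {
        private final String kvSnapshotLeaseId;

        LeaseContext(String kvSnapshotLeaseId) {
            this.kvSnapshotLeaseId = kvSnapshotLeaseId;
        }

        String getKvSnapshotLeaseId() {
            return kvSnapshotLeaseId;
        }
    }

    // Hypothetical enumerator. Assumption: the lease context is only created
    // when the source reads KV snapshots, so it stays null for a binlog read.
    static final class SketchEnumerator {
        private final LeaseContext leaseContext;

        SketchEnumerator(boolean readsKvSnapshots) {
            this.leaseContext = readsKvSnapshots ? new LeaseContext("lease-1") : null;
        }

        // Dereferences leaseContext unconditionally, so it throws a
        // NullPointerException whenever the field was never initialized.
        String snapshotState(long checkpointId) {
            return "checkpoint=" + checkpointId + ", lease=" + leaseContext.getKvSnapshotLeaseId();
        }
    }

    public static void main(String[] args) {
        // Snapshot-reading source: the lease exists and snapshotting succeeds.
        System.out.println(new SketchEnumerator(true).snapshotState(1));
        try {
            // Binlog-like source: no lease was ever created.
            new SketchEnumerator(false).snapshotState(2);
        } catch (NullPointerException e) {
            // On JDK 15+ the message reads like:
            // Cannot invoke "...getKvSnapshotLeaseId()" because "this.leaseContext" is null
            System.out.println(e.getMessage());
        }
    }
}
```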


Environment

  • Flink: 1.20.3
  • Fluss: 0.9.0-incubating
  • Connector: fluss-flink-1.20-0.9.0-incubating.jar

Sink Table

CREATE TABLE fluss_catalog.dwd_layer.dwd_cust_info_history (
  change_type STRING,
  change_log_offset BIGINT NOT NULL,
  change_date STRING NOT NULL,
  change_commit_time TIMESTAMP(3),
  tenant_id STRING NOT NULL,
  cust_id BIGINT NOT NULL,
  sex INT,
  birthday DATE,
  age INT,
  org_id BIGINT,
  org_full_name STRING,
  cust_state INT,
  src_crtime TIMESTAMP(3),
  src_uptime TIMESTAMP(3),
  valid_from TIMESTAMP(3),
  etl_time TIMESTAMP(3),
  PRIMARY KEY (change_date, tenant_id, cust_id, change_log_offset) NOT ENFORCED
) PARTITIONED BY (change_date) WITH (
  'table.datalake.enabled' = 'true',
  'table.datalake.freshness' = '1min',
  'bucket.num' = '4'
);

Insert SQL

INSERT INTO fluss_catalog.dwd_layer.dwd_cust_info_history
SELECT
  _change_type AS change_type,
  _log_offset AS change_log_offset,
  DATE_FORMAT(_commit_timestamp, 'yyyyMMdd') AS change_date,
  CAST(_commit_timestamp AS TIMESTAMP(3)) AS change_commit_time,
  `after`.tenant_id,
  `after`.cust_id,
  `after`.sex,
  `after`.birthday,
  CASE
    WHEN `after`.birthday IS NOT NULL AND `after`.birthday <= CURRENT_DATE
      THEN CAST(TIMESTAMPDIFF(YEAR, `after`.birthday, CURRENT_DATE) AS INT)
    WHEN `after`.age IS NOT NULL AND `after`.age > 0 THEN `after`.age
    ELSE NULL
  END AS age,
  `after`.org_id,
  `after`.org_full_name,
  `after`.cust_state,
  `after`.crtime AS src_crtime,
  `after`.uptime AS src_uptime,
  COALESCE(`after`.uptime, `after`.crtime, CAST(_commit_timestamp AS TIMESTAMP(3))) AS valid_from,
  CAST(CURRENT_TIMESTAMP AS TIMESTAMP(3)) AS etl_time
FROM fluss_catalog.ods_mysql.`ods_cust_info$binlog`
/*+ OPTIONS('scan.startup.mode' = 'earliest') */
WHERE `after`.tenant_id IS NOT NULL
  AND `after`.cust_id IS NOT NULL
  AND `after`.cust_id <> -1
  AND _log_offset IS NOT NULL
  AND _commit_timestamp IS NOT NULL
  AND _change_type IN ('insert', 'update');

Error

[ERROR] Could not execute SQL statement. Reason:
java.lang.NullPointerException: java.lang.NullPointerException: Cannot invoke "org.apache.fluss.flink.source.reader.LeaseContext.getKvSnapshotLeaseId()" because "this.leaseContext" is null

Full Stack Trace

2026-05-13 02:36:03,796 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Triggering stop-with-savepoint for job 9894357cf8b86e0a2c5d78b335598c50.
2026-05-13 02:36:03,801 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Triggering checkpoint 2 (type=SavepointType{name='Suspend Savepoint', postCheckpointAction=SUSPEND, formatType=CANONICAL}) @ 1778639763798 for job 9894357cf8b86e0a2c5d78b335598c50.
2026-05-13 02:36:04,412 WARN  org.apache.flink.runtime.checkpoint.CheckpointFailureManager [] - Failed to trigger or complete checkpoint 2 for job 9894357cf8b86e0a2c5d78b335598c50. (0 consecutive failed attempts so far)
org.apache.flink.runtime.checkpoint.CheckpointException: Trigger checkpoint failure.
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.lambda$getCheckpointException$17(CheckpointCoordinator.java:2365) ~[flink-dist-1.20.3.jar:1.20.3]
	at java.base/java.util.Optional.orElseGet(Unknown Source) ~[?:?]
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.getCheckpointException(CheckpointCoordinator.java:2364) ~[flink-dist-1.20.3.jar:1.20.3]
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.onTriggerFailure(CheckpointCoordinator.java:1056) ~[flink-dist-1.20.3.jar:1.20.3]
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.onTriggerFailure(CheckpointCoordinator.java:1034) ~[flink-dist-1.20.3.jar:1.20.3]
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.lambda$startTriggeringCheckpoint$7(CheckpointCoordinator.java:760) ~[flink-dist-1.20.3.jar:1.20.3]
	at java.base/java.util.concurrent.CompletableFuture.uniHandle(Unknown Source) ~[?:?]
	at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(Unknown Source) ~[?:?]
	at java.base/java.util.concurrent.CompletableFuture$Completion.run(Unknown Source) ~[?:?]
	at org.apache.flink.util.MdcUtils.lambda$wrapRunnable$1(MdcUtils.java:70) ~[flink-dist-1.20.3.jar:1.20.3]
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) [?:?]
	at java.base/java.util.concurrent.FutureTask.run(Unknown Source) [?:?]
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source) [?:?]
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:?]
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:?]
	at java.base/java.lang.Thread.run(Unknown Source) [?:?]
Caused by: java.lang.NullPointerException: Cannot invoke "org.apache.fluss.flink.source.reader.LeaseContext.getKvSnapshotLeaseId()" because "this.leaseContext" is null
	at org.apache.fluss.flink.source.enumerator.FlinkSourceEnumerator.snapshotState(FlinkSourceEnumerator.java:1031) ~[fluss-flink-1.20-0.9.0-incubating.jar:0.9.0-incubating]
	at org.apache.fluss.flink.source.enumerator.FlinkSourceEnumerator.snapshotState(FlinkSourceEnumerator.java:100) ~[fluss-flink-1.20-0.9.0-incubating.jar:0.9.0-incubating]
	at org.apache.flink.runtime.source.coordinator.SourceCoordinator.toBytes(SourceCoordinator.java:577) ~[flink-dist-1.20.3.jar:1.20.3]
	at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$checkpointCoordinator$7(SourceCoordinator.java:421) ~[flink-dist-1.20.3.jar:1.20.3]
	at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$runInEventLoop$10(SourceCoordinator.java:530) ~[flink-dist-1.20.3.jar:1.20.3]
	at org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:40) ~[flink-dist-1.20.3.jar:1.20.3]
	... 6 more
2026-05-13 02:36:04,414 INFO  org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Marking checkpoint 2 as aborted for source Source: ods_cust_info$binlog[35].

Additional Observations

  • The job runs normally until a checkpoint or savepoint is triggered.
  • The failure occurs specifically while the enumerator state is being snapshotted for the checkpoint/savepoint.
  • The source table is the Fluss binlog table ods_cust_info$binlog.
  • The issue appears to be related to the lifecycle/initialization of leaseContext in FlinkSourceEnumerator.
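If the lifecycle hypothesis holds, one possible direction for a fix is to make snapshotState() tolerate a missing lease. The sketch below is not the Fluss implementation or an official fix; GuardedSnapshotSketch, GuardedEnumerator, EnumeratorState and this LeaseContext are hypothetical. It assumes the enumerator checkpoint can simply record an absent (null) lease id for sources that never acquired a KV snapshot lease.

```java
public class GuardedSnapshotSketch {

    // Hypothetical stand-in for Fluss's LeaseContext (not the real class).
    static final class LeaseContext {
        private final String kvSnapshotLeaseId;

        LeaseContext(String kvSnapshotLeaseId) {
            this.kvSnapshotLeaseId = kvSnapshotLeaseId;
        }

        String getKvSnapshotLeaseId() {
            return kvSnapshotLeaseId;
        }
    }

    // Hypothetical checkpoint state: kvSnapshotLeaseId is null when the source
    // never acquired a KV snapshot lease (assumed to be the binlog case).
    static final class EnumeratorState {
        final long checkpointId;
        final String kvSnapshotLeaseId;

        EnumeratorState(long checkpointId, String kvSnapshotLeaseId) {
            this.checkpointId = checkpointId;
            this.kvSnapshotLeaseId = kvSnapshotLeaseId;
        }
    }

    static final class GuardedEnumerator {
        private final LeaseContext leaseContext; // may legitimately be null

        GuardedEnumerator(LeaseContext leaseContext) {
            this.leaseContext = leaseContext;
        }

        // Null-safe: records "no lease" instead of dereferencing a null field.
        EnumeratorState snapshotState(long checkpointId) {
            String leaseId = (leaseContext == null) ? null : leaseContext.getKvSnapshotLeaseId();
            return new EnumeratorState(checkpointId, leaseId);
        }
    }

    public static void main(String[] args) {
        EnumeratorState withoutLease = new GuardedEnumerator(null).snapshotState(2);
        EnumeratorState withLease = new GuardedEnumerator(new LeaseContext("lease-1")).snapshotState(3);
        System.out.println(withoutLease.checkpointId + " -> " + withoutLease.kvSnapshotLeaseId);
        System.out.println(withLease.checkpointId + " -> " + withLease.kvSnapshotLeaseId);
    }
}
```

An alternative would be to always construct a LeaseContext so the field is never null. Which option fits depends on how the real enumerator state and its serializer represent an absent lease, which this report does not show.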

Questions

  1. Is this a known issue in Fluss 0.9.0-incubating?
  2. Is there any workaround or configuration to avoid this issue?
  3. Has this issue already been fixed in a newer Fluss version?

Thanks.

Solution

No response

Are you willing to submit a PR?

  • I'm willing to submit a PR!

Metadata

Assignees

No one assigned

    Labels

    No labels

    Type

    No fields configured for Bug.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests
