Search before asking
Fluss version
0.9.0 (latest release)
Please describe the bug 🐞
Description
When consuming a Fluss binlog table with Flink SQL, the Flink job fails whenever a checkpoint or savepoint is triggered, with a NullPointerException in FlinkSourceEnumerator.snapshotState.
The exception occurs consistently for every checkpoint/savepoint operation, including stop-with-savepoint.
According to the stack trace, leaseContext is null when snapshotState() is invoked.
Environment
- Flink: 1.20.3
- Fluss: 0.9.0-incubating
- Connector: fluss-flink-1.20-0.9.0-incubating.jar
Source Table
CREATE TABLE fluss_catalog.dwd_layer.dwd_cust_info_history (
change_type STRING,
change_log_offset BIGINT NOT NULL,
change_date STRING NOT NULL,
change_commit_time TIMESTAMP(3),
tenant_id STRING NOT NULL,
cust_id BIGINT NOT NULL,
sex INT,
birthday DATE,
age INT,
org_id BIGINT,
org_full_name STRING,
cust_state INT,
src_crtime TIMESTAMP(3),
src_uptime TIMESTAMP(3),
valid_from TIMESTAMP(3),
etl_time TIMESTAMP(3),
PRIMARY KEY (change_date, tenant_id, cust_id, change_log_offset) NOT ENFORCED
) PARTITIONED BY (change_date) WITH (
'table.datalake.enabled' = 'true',
'table.datalake.freshness' = '1min',
'bucket.num' = '4'
);
Insert SQL
INSERT INTO fluss_catalog.dwd_layer.dwd_cust_info_history
SELECT
_change_type AS change_type,
_log_offset AS change_log_offset,
DATE_FORMAT(_commit_timestamp, 'yyyyMMdd') AS change_date,
CAST(_commit_timestamp AS TIMESTAMP(3)) AS change_commit_time,
`after`.tenant_id,
`after`.cust_id,
`after`.sex,
`after`.birthday,
CASE
WHEN `after`.birthday IS NOT NULL AND `after`.birthday <= CURRENT_DATE
THEN CAST(TIMESTAMPDIFF(YEAR, `after`.birthday, CURRENT_DATE) AS INT)
WHEN `after`.age IS NOT NULL AND `after`.age > 0 THEN `after`.age
ELSE NULL
END AS age,
`after`.org_id,
`after`.org_full_name,
`after`.cust_state,
`after`.crtime AS src_crtime,
`after`.uptime AS src_uptime,
COALESCE(`after`.uptime, `after`.crtime, CAST(_commit_timestamp AS TIMESTAMP(3))) AS valid_from,
CAST(CURRENT_TIMESTAMP AS TIMESTAMP(3)) AS etl_time
FROM fluss_catalog.ods_mysql.`ods_cust_info$binlog`
/*+ OPTIONS('scan.startup.mode' = 'earliest') */
WHERE `after`.tenant_id IS NOT NULL
AND `after`.cust_id IS NOT NULL
AND `after`.cust_id <> -1
AND _log_offset IS NOT NULL
AND _commit_timestamp IS NOT NULL
AND _change_type IN ('insert', 'update');
Error
[ERROR] Could not execute SQL statement. Reason:
java.lang.NullPointerException: java.lang.NullPointerException: Cannot invoke "org.apache.fluss.flink.source.reader.LeaseContext.getKvSnapshotLeaseId()" because "this.leaseContext" is null
Full Stack Trace
2026-05-13 02:36:03,796 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Triggering stop-with-savepoint for job 9894357cf8b86e0a2c5d78b335598c50.
2026-05-13 02:36:03,801 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 2 (type=SavepointType{name='Suspend Savepoint', postCheckpointAction=SUSPEND, formatType=CANONICAL}) @ 1778639763798 for job 9894357cf8b86e0a2c5d78b335598c50.
2026-05-13 02:36:04,412 WARN org.apache.flink.runtime.checkpoint.CheckpointFailureManager [] - Failed to trigger or complete checkpoint 2 for job 9894357cf8b86e0a2c5d78b335598c50. (0 consecutive failed attempts so far)
org.apache.flink.runtime.checkpoint.CheckpointException: Trigger checkpoint failure.
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.lambda$getCheckpointException$17(CheckpointCoordinator.java:2365) ~[flink-dist-1.20.3.jar:1.20.3]
at java.base/java.util.Optional.orElseGet(Unknown Source) ~[?:?]
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.getCheckpointException(CheckpointCoordinator.java:2364) ~[flink-dist-1.20.3.jar:1.20.3]
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.onTriggerFailure(CheckpointCoordinator.java:1056) ~[flink-dist-1.20.3.jar:1.20.3]
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.onTriggerFailure(CheckpointCoordinator.java:1034) ~[flink-dist-1.20.3.jar:1.20.3]
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.lambda$startTriggeringCheckpoint$7(CheckpointCoordinator.java:760) ~[flink-dist-1.20.3.jar:1.20.3]
at java.base/java.util.concurrent.CompletableFuture.uniHandle(Unknown Source) ~[?:?]
at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(Unknown Source) ~[?:?]
at java.base/java.util.concurrent.CompletableFuture$Completion.run(Unknown Source) ~[?:?]
at org.apache.flink.util.MdcUtils.lambda$wrapRunnable$1(MdcUtils.java:70) ~[flink-dist-1.20.3.jar:1.20.3]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) [?:?]
at java.base/java.util.concurrent.FutureTask.run(Unknown Source) [?:?]
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source) [?:?]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:?]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:?]
at java.base/java.lang.Thread.run(Unknown Source) [?:?]
Caused by: java.lang.NullPointerException: Cannot invoke "org.apache.fluss.flink.source.reader.LeaseContext.getKvSnapshotLeaseId()" because "this.leaseContext" is null
at org.apache.fluss.flink.source.enumerator.FlinkSourceEnumerator.snapshotState(FlinkSourceEnumerator.java:1031) ~[fluss-flink-1.20-0.9.0-incubating.jar:0.9.0-incubating]
at org.apache.fluss.flink.source.enumerator.FlinkSourceEnumerator.snapshotState(FlinkSourceEnumerator.java:100) ~[fluss-flink-1.20-0.9.0-incubating.jar:0.9.0-incubating]
at org.apache.flink.runtime.source.coordinator.SourceCoordinator.toBytes(SourceCoordinator.java:577) ~[flink-dist-1.20.3.jar:1.20.3]
at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$checkpointCoordinator$7(SourceCoordinator.java:421) ~[flink-dist-1.20.3.jar:1.20.3]
at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$runInEventLoop$10(SourceCoordinator.java:530) ~[flink-dist-1.20.3.jar:1.20.3]
at org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:40) ~[flink-dist-1.20.3.jar:1.20.3]
... 6 more
2026-05-13 02:36:04,414 INFO org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Marking checkpoint 2 as aborted for source Source: ods_cust_info$binlog[35].
Additional Observations
- The job itself runs normally until a checkpoint or savepoint is triggered.
- The failure happens specifically while the source enumerator state is being snapshotted for the checkpoint/savepoint.
- The source table is a Fluss binlog table: ods_cust_info$binlog
- The issue appears to be related to the lifecycle/initialization of leaseContext inside FlinkSourceEnumerator.
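The suspected failure mode can be illustrated with a minimal, hypothetical Java model. The classes below are simplified stand-ins, not the real Fluss classes: only the names leaseContext and getKvSnapshotLeaseId() are taken from the stack trace, while the String return type and the assumption that a binlog source never acquires a KV snapshot lease (so the field stays null) are guesses for illustration. The sketch contrasts an unconditional dereference, like the one reported at FlinkSourceEnumerator.java:1031, with a null-guarded variant.

```java
import java.util.Objects;

/**
 * Hypothetical, simplified model of the reported NPE. NOT the real Fluss code:
 * only the names leaseContext / getKvSnapshotLeaseId() come from the stack trace.
 * The String lease id type and the "binlog source never acquires a lease"
 * behavior are assumptions made for illustration.
 */
public class LeaseContextNpeSketch {

    // Stand-in for org.apache.fluss.flink.source.reader.LeaseContext (assumed shape).
    static final class LeaseContext {
        private final String kvSnapshotLeaseId;

        LeaseContext(String kvSnapshotLeaseId) {
            this.kvSnapshotLeaseId = Objects.requireNonNull(kvSnapshotLeaseId);
        }

        String getKvSnapshotLeaseId() {
            return kvSnapshotLeaseId;
        }
    }

    // Stand-in for the enumerator. Assumption: leaseContext is only set when a
    // KV snapshot lease is acquired, so a log-only ($binlog) source leaves it null.
    static final class Enumerator {
        private LeaseContext leaseContext; // may remain null

        void acquireLease(String leaseId) {
            this.leaseContext = new LeaseContext(leaseId);
        }

        // Mirrors the failing pattern: dereferences leaseContext unconditionally.
        String snapshotStateUnsafe() {
            return leaseContext.getKvSnapshotLeaseId();
        }

        // Hypothetical defensive variant: a missing lease yields no lease id.
        String snapshotStateSafe() {
            return leaseContext == null ? null : leaseContext.getKvSnapshotLeaseId();
        }
    }

    public static void main(String[] args) {
        Enumerator binlogEnumerator = new Enumerator(); // no lease ever acquired
        try {
            binlogEnumerator.snapshotStateUnsafe();
            System.out.println("no exception");
        } catch (NullPointerException e) {
            System.out.println("NPE, as in the reported stack trace");
        }
        System.out.println("safe: " + binlogEnumerator.snapshotStateSafe());

        Enumerator kvEnumerator = new Enumerator();
        kvEnumerator.acquireLease("lease-1");
        System.out.println("safe with lease: " + kvEnumerator.snapshotStateSafe());
    }
}
```

A null guard like this would only be an appropriate fix if "no lease" is a legitimate state for log-only sources; whether that holds in Fluss, and whether the lease should instead be initialized earlier, is for the maintainers to confirm.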
Questions
- Is this a known issue in Fluss 0.9.0-incubating?
- Is there any workaround or configuration to avoid this issue?
- Has this issue already been fixed in a newer Fluss version?
Thanks.
Solution
No response
Are you willing to submit a PR?