Search before asking
Description
Follow-up to #3296.
In batch mode on a lake-enabled PK table with no lake snapshot yet, the fallback reads every bucket's log from EARLIEST, ignoring existing Fluss KV snapshots. This is OOM-prone on large never-tiered tables.
Spark fixed the equivalent in #3317 with per-bucket dispatch (snapshot+tail where KV snapshot exists, log-only otherwise).
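A minimal sketch of what per-bucket dispatch could look like, assuming the planner knows, for each bucket, the log offset covered by its latest KV snapshot. The names `BucketDispatchSketch`, `SplitKind`, `Split`, `planSplits` and the `EARLIEST` sentinel value are hypothetical and for illustration only; they are not Fluss or Spark connector APIs.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class BucketDispatchSketch {
    /** Hypothetical split kinds; these are not Fluss class names. */
    enum SplitKind { SNAPSHOT_AND_LOG_TAIL, LOG_ONLY }

    /** Hypothetical sentinel meaning "start from the earliest log offset". */
    static final long EARLIEST = -2L;

    /** Hypothetical description of the work planned for one bucket. */
    static final class Split {
        final int bucket;
        final SplitKind kind;
        final long logStartOffset;

        Split(int bucket, SplitKind kind, long logStartOffset) {
            this.bucket = bucket;
            this.kind = kind;
            this.logStartOffset = logStartOffset;
        }
    }

    /**
     * Plans one split per bucket.
     *
     * @param snapshotLogOffsets bucket id -> log offset covered by the latest
     *        KV snapshot; buckets without a snapshot are absent from the map
     */
    static List<Split> planSplits(int bucketCount, Map<Integer, Long> snapshotLogOffsets) {
        List<Split> splits = new ArrayList<>();
        for (int bucket = 0; bucket < bucketCount; bucket++) {
            Long snapshotOffset = snapshotLogOffsets.get(bucket);
            if (snapshotOffset != null) {
                // KV snapshot exists: read it, then only the log tail after it.
                splits.add(new Split(bucket, SplitKind.SNAPSHOT_AND_LOG_TAIL, snapshotOffset));
            } else {
                // No KV snapshot: fall back to the full log for this bucket only.
                splits.add(new Split(bucket, SplitKind.LOG_ONLY, EARLIEST));
            }
        }
        return splits;
    }

    public static void main(String[] args) {
        Map<Integer, Long> offsets = new HashMap<>();
        offsets.put(1, 42L); // only bucket 1 has a KV snapshot
        for (Split s : planSplits(3, offsets)) {
            System.out.println(s.bucket + " " + s.kind + " " + s.logStartOffset);
        }
    }
}
```

The point of the dispatch is that the full-log read is confined to buckets that have no KV snapshot, rather than being applied to every bucket.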
Flink cannot port this one-to-one because FlinkSourceSplitReader has no sort-merge on the Fluss-only path. One option is to reuse LakeSnapshotAndLogSplitScanner (already used by Flink's lake reader, and it already sort-merges per PK) for the fallback, or to do something similar.
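To illustrate the kind of per-PK sort-merge the fallback would need, here is a simplified sketch. It is not the actual logic of LakeSnapshotAndLogSplitScanner; the class `SnapshotLogMergeSketch`, the `merge` method, the use of string keys and values, and the convention that a `null` value marks a delete are all assumptions made for the example.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class SnapshotLogMergeSketch {
    /**
     * Merges snapshot rows (sorted by PK) with log-tail changes.
     *
     * @param snapshot rows from the KV snapshot, keyed and sorted by PK
     * @param logTail  changes after the snapshot offset, in log order;
     *                 each entry is {pk, value}, and a null value is a delete
     * @return the merged rows in PK order
     */
    static Map<String, String> merge(TreeMap<String, String> snapshot, List<String[]> logTail) {
        // Fold the tail so that only the latest change per PK survives.
        TreeMap<String, String> latest = new TreeMap<>();
        for (String[] change : logTail) {
            latest.put(change[0], change[1]);
        }

        Map<String, String> out = new LinkedHashMap<>();
        Iterator<Map.Entry<String, String>> snapIt = snapshot.entrySet().iterator();
        Iterator<Map.Entry<String, String>> logIt = latest.entrySet().iterator();
        Map.Entry<String, String> snap = snapIt.hasNext() ? snapIt.next() : null;
        Map.Entry<String, String> log = logIt.hasNext() ? logIt.next() : null;

        // Two-pointer merge over the two PK-sorted inputs.
        while (snap != null || log != null) {
            int cmp;
            if (snap == null) {
                cmp = 1;
            } else if (log == null) {
                cmp = -1;
            } else {
                cmp = snap.getKey().compareTo(log.getKey());
            }

            if (cmp < 0) {
                // PK appears only in the snapshot: emit it unchanged.
                out.put(snap.getKey(), snap.getValue());
                snap = snapIt.hasNext() ? snapIt.next() : null;
            } else {
                // PK appears in the log tail: the log change wins; null means deleted.
                if (log.getValue() != null) {
                    out.put(log.getKey(), log.getValue());
                }
                if (cmp == 0) {
                    snap = snapIt.hasNext() ? snapIt.next() : null;
                }
                log = logIt.hasNext() ? logIt.next() : null;
            }
        }
        return out;
    }
}
```

In this sketch only the log tail after the snapshot offset is folded in memory, not the bucket's entire log, which is the property that matters for the OOM concern above.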
Willingness to contribute