[flink] Fix batch fallback generating mixed split types for primary-key tables #3296
matrixsparse wants to merge 1 commit into
Hi @luoyuxia, this is a follow-up fix for the mixed split types issue you mentioned in #3208. Could you PTAL? Thanks! cc @fresh-borzoni
fresh-borzoni left a comment
@matrixsparse Ty, LGTM in general, one comment:
It matches the Spark logic, but this scenario is a bit dangerous: say we have a big table that was never tiered to the lake, then we decide to tier it and run a batch query through this fallback. Instead of using kv_snapshot and replaying the log on top, we read from the earliest offset, which is potentially a lot of records. So it's not very efficient and potentially OOM-prone.
Shall we file an issue to address this separately for Spark and Flink?
cc @luoyuxia WDYT about this plan?
luoyuxia left a comment
@matrixsparse Thanks for the quick fix. Left one comment. PTAL.
// Use log-only splits to avoid generating mixed split
// types (HybridSnapshotLogSplit + LogSplit) for
// primary-key tables, which is not supported.
splits = this.initLogTablePartitionSplits(partitions);
Note that it'll just generate log splits without a stopping offset, which will then never stop.
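To make the concern above concrete, here is a minimal sketch of why a batch read needs a stopping offset on every log split. All names in it (`BoundedLogSplitSketch`, `SketchLogSplit`, `boundedSplit`, `unboundedSplit`) are hypothetical stand-ins for illustration, not the connector's real classes or signatures. It assumes the stopping offset would be the latest log offset captured at planning time.

```java
import java.util.OptionalLong;

// Hypothetical model of a log split; NOT the connector's real LogSplit class.
final class BoundedLogSplitSketch {

    static final class SketchLogSplit {
        final int bucket;
        final long startingOffset;
        final OptionalLong stoppingOffset; // empty means "read forever"

        SketchLogSplit(int bucket, long startingOffset, OptionalLong stoppingOffset) {
            this.bucket = bucket;
            this.startingOffset = startingOffset;
            this.stoppingOffset = stoppingOffset;
        }

        boolean isBounded() {
            return stoppingOffset.isPresent();
        }

        // A reader can finish the split only if a stopping offset exists
        // and the next offset to read has reached it.
        boolean isFinished(long nextOffset) {
            return stoppingOffset.isPresent() && nextOffset >= stoppingOffset.getAsLong();
        }
    }

    // Batch-friendly split: reads [earliest, latestAtPlanningTime) and then ends.
    // Assumption: the latest offset is looked up when splits are planned.
    static SketchLogSplit boundedSplit(int bucket, long earliest, long latestAtPlanningTime) {
        return new SketchLogSplit(bucket, earliest, OptionalLong.of(latestAtPlanningTime));
    }

    // Streaming-style split: no stopping offset, so a batch job would never complete.
    static SketchLogSplit unboundedSplit(int bucket, long earliest) {
        return new SketchLogSplit(bucket, earliest, OptionalLong.empty());
    }

    public static void main(String[] args) {
        SketchLogSplit bounded = boundedSplit(0, 0L, 100L);
        SketchLogSplit unbounded = unboundedSplit(0, 0L);
        System.out.println(bounded.isFinished(100L));             // true
        System.out.println(unbounded.isFinished(Long.MAX_VALUE)); // false
    }
}
```

In this model the unbounded split never reports completion regardless of how far the reader advances, which is the behavior flagged in the review comment.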
Summary
Follow-up fix for #3208.
For primary-key tables in batch mode, when no lake snapshot exists, the previous fallback logic called initPartitionedSplits() / initNonPartitionedSplits(), which internally invokes getSnapshotAndLogSplits(). This method may produce mixed split types (HybridSnapshotLogSplit for buckets with KV snapshots and LogSplit for buckets without), which the Flink connector does not support merging in batch mode.

This fix replaces the fallback path with initLogTablePartitionSplits() / getLogSplit() to generate uniform LogSplit for all buckets, avoiding the mixed split type issue.

Changes
initPartitionedSplits() → initLogTablePartitionSplits()
initNonPartitionedSplits() → getLogSplit(null, null)
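
The difference between the two fallback paths can be illustrated with a small model. This is only a sketch under stated assumptions: `UniformSplitSketch`, `SketchSplit`, `SplitKind`, `snapshotAndLogSplits`, `logOnlySplits`, and `isUniform` are hypothetical names invented here, and the logic mirrors the behavior described in the summary rather than the connector's actual implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical model of split planning; NOT the connector's real code.
final class UniformSplitSketch {

    enum SplitKind { HYBRID_SNAPSHOT_LOG, LOG }

    static final class SketchSplit {
        final int bucket;
        final SplitKind kind;

        SketchSplit(int bucket, SplitKind kind) {
            this.bucket = bucket;
            this.kind = kind;
        }
    }

    // Models the previous fallback: buckets that have a KV snapshot get a
    // hybrid split, the others get a plain log split, so kinds can be mixed.
    static List<SketchSplit> snapshotAndLogSplits(int bucketCount, Set<Integer> bucketsWithSnapshot) {
        List<SketchSplit> splits = new ArrayList<>();
        for (int bucket = 0; bucket < bucketCount; bucket++) {
            SplitKind kind = bucketsWithSnapshot.contains(bucket)
                    ? SplitKind.HYBRID_SNAPSHOT_LOG
                    : SplitKind.LOG;
            splits.add(new SketchSplit(bucket, kind));
        }
        return splits;
    }

    // Models the new fallback: every bucket gets a log split.
    static List<SketchSplit> logOnlySplits(int bucketCount) {
        List<SketchSplit> splits = new ArrayList<>();
        for (int bucket = 0; bucket < bucketCount; bucket++) {
            splits.add(new SketchSplit(bucket, SplitKind.LOG));
        }
        return splits;
    }

    // True when all splits share one kind (or the list is empty).
    static boolean isUniform(List<SketchSplit> splits) {
        return splits.stream().map(s -> s.kind).distinct().count() <= 1;
    }

    public static void main(String[] args) {
        // Buckets 0 and 2 have KV snapshots; buckets 1 and 3 do not.
        List<SketchSplit> mixed = snapshotAndLogSplits(4, Set.of(0, 2));
        List<SketchSplit> uniform = logOnlySplits(4);
        System.out.println(isUniform(mixed));   // false
        System.out.println(isUniform(uniform)); // true
    }
}
```

Note that the old path is only mixed when some, but not all, buckets have a KV snapshot; the log-only path is uniform by construction.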