diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java index 1a6521f57e..1b4f096052 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java @@ -377,9 +377,12 @@ private void startInBatchMode() { p.getPartitionId(), p.getPartitionName())) .collect(Collectors.toList()); - splits = this.initPartitionedSplits(partitions); + // Use log-only splits to avoid generating mixed split + // types (HybridSnapshotLogSplit + LogSplit) for + // primary-key tables, which is not supported. + splits = this.initLogTablePartitionSplits(partitions); } else { - splits = this.initNonPartitionedSplits(); + splits = this.getLogSplit(null, null); } } return splits; @@ -734,18 +737,21 @@ private List getLogSplit( } if (!bucketsNeedInitOffset.isEmpty()) { - startingOffsetsInitializer - .getBucketOffsets(partitionName, bucketsNeedInitOffset, bucketOffsetsRetriever) - .forEach( - (bucketId, startingOffset) -> - splits.add( - new LogSplit( - new TableBucket( - tableInfo.getTableId(), - partitionId, - bucketId), - partitionName, - startingOffset))); + Map startingOffsets = + startingOffsetsInitializer.getBucketOffsets( + partitionName, bucketsNeedInitOffset, bucketOffsetsRetriever); + Map stoppingOffsets = + stoppingOffsetsInitializer.getBucketOffsets( + partitionName, bucketsNeedInitOffset, bucketOffsetsRetriever); + startingOffsets.forEach( + (bucketId, startingOffset) -> + splits.add( + new LogSplit( + new TableBucket( + tableInfo.getTableId(), partitionId, bucketId), + partitionName, + startingOffset, + stoppingOffsets.get(bucketId)))); } return splits; }