From bf793219f858d543ac96f26eb33800db3585f5c8 Mon Sep 17 00:00:00 2001 From: zhangjunfan Date: Sat, 9 May 2026 14:47:28 +0800 Subject: [PATCH 1/4] [flink] introduce batched splits assignment mechanism --- .../fluss/flink/FlinkConnectorOptions.java | 9 ++ .../flink/catalog/FlinkTableFactory.java | 10 ++ .../flink/source/BinlogFlinkTableSource.java | 27 ++++ .../source/ChangelogFlinkTableSource.java | 27 ++++ .../fluss/flink/source/FlinkSource.java | 73 ++++++++++ .../fluss/flink/source/FlinkTableSource.java | 44 ++++++ .../fluss/flink/source/FlussSource.java | 30 ++++ .../flink/source/FlussSourceBuilder.java | 20 +++ .../enumerator/FlinkSourceEnumerator.java | 130 +++++++++++++++++- .../enumerator/FlinkSourceEnumeratorTest.java | 68 +++++++++ 10 files changed, 437 insertions(+), 1 deletion(-) diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/FlinkConnectorOptions.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/FlinkConnectorOptions.java index c7a3b44c28..23d8f0b2e9 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/FlinkConnectorOptions.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/FlinkConnectorOptions.java @@ -150,6 +150,15 @@ public class FlinkConnectorOptions { + "as a small value would cause frequent requests and increase server load. In the future, " + "once list partitions is optimized, the default value of this parameter can be reduced."); + public static final ConfigOption SCAN_SPLIT_ASSIGNMENT_BATCH_SIZE = + ConfigOptions.key("scan.split.assignment.batch-size") + .intType() + .defaultValue(Integer.MAX_VALUE) + .withDescription( + "The maximum number of Fluss source splits assigned to a reader in " + + "one assignment request. The value must be positive. By default, " + + "all pending splits for a reader are assigned in one request."); + public static final ConfigOption SINK_IGNORE_DELETE = ConfigOptions.key("sink.ignore-delete") .booleanType() diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java index 408a703058..1b082855e6 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java @@ -146,6 +146,8 @@ public DynamicTableSource createDynamicTableSource(Context context) { tableOptions .get(FlinkConnectorOptions.SCAN_PARTITION_DISCOVERY_INTERVAL) .toMillis(); + int splitAssignmentBatchSize = + tableOptions.get(FlinkConnectorOptions.SCAN_SPLIT_ASSIGNMENT_BATCH_SIZE); LeaseContext leaseContext = LeaseContext.fromConf(tableOptions); return new FlinkTableSource( @@ -163,6 +165,7 @@ public DynamicTableSource createDynamicTableSource(Context context) { tableOptions.get(FlinkConnectorOptions.LOOKUP_INSERT_IF_NOT_EXISTS), cache, partitionDiscoveryIntervalMs, + splitAssignmentBatchSize, tableOptions.get(toFlinkOption(ConfigOptions.TABLE_DATALAKE_ENABLED)), tableOptions.get(toFlinkOption(ConfigOptions.TABLE_MERGE_ENGINE)), context.getCatalogTable().getOptions(), @@ -234,6 +237,7 @@ public Set> optionalOptions() { FlinkConnectorOptions.SCAN_STARTUP_MODE, FlinkConnectorOptions.SCAN_STARTUP_TIMESTAMP, FlinkConnectorOptions.SCAN_PARTITION_DISCOVERY_INTERVAL, + FlinkConnectorOptions.SCAN_SPLIT_ASSIGNMENT_BATCH_SIZE, FlinkConnectorOptions.SCAN_KV_SNAPSHOT_LEASE_ID, FlinkConnectorOptions.SCAN_KV_SNAPSHOT_LEASE_DURATION, FlinkConnectorOptions.LOOKUP_ASYNC, @@ -365,6 +369,8 @@ private DynamicTableSource createChangelogTableSource( tableOptions .get(FlinkConnectorOptions.SCAN_PARTITION_DISCOVERY_INTERVAL) .toMillis(); + int splitAssignmentBatchSize = + tableOptions.get(FlinkConnectorOptions.SCAN_SPLIT_ASSIGNMENT_BATCH_SIZE); return new ChangelogFlinkTableSource( TablePath.of(tableIdentifier.getDatabaseName(), baseTableName), @@ -374,6 +380,7 @@ private DynamicTableSource createChangelogTableSource( isStreamingMode, startupOptions, partitionDiscoveryIntervalMs, + splitAssignmentBatchSize, catalogTableOptions); } @@ -412,6 +419,8 @@ private DynamicTableSource createBinlogTableSource( tableOptions .get(FlinkConnectorOptions.SCAN_PARTITION_DISCOVERY_INTERVAL) .toMillis(); + int splitAssignmentBatchSize = + tableOptions.get(FlinkConnectorOptions.SCAN_SPLIT_ASSIGNMENT_BATCH_SIZE); return new BinlogFlinkTableSource( TablePath.of(tableIdentifier.getDatabaseName(), baseTableName), @@ -421,6 +430,7 @@ private DynamicTableSource createBinlogTableSource( isStreamingMode, startupOptions, partitionDiscoveryIntervalMs, + splitAssignmentBatchSize, catalogTableOptions); } } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/BinlogFlinkTableSource.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/BinlogFlinkTableSource.java index 901995a657..93261e8430 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/BinlogFlinkTableSource.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/BinlogFlinkTableSource.java @@ -19,6 +19,7 @@ import org.apache.fluss.client.initializer.OffsetsInitializer; import org.apache.fluss.config.Configuration; +import org.apache.fluss.flink.FlinkConnectorOptions; import org.apache.fluss.flink.source.deserializer.BinlogDeserializationSchema; import org.apache.fluss.flink.source.reader.LeaseContext; import org.apache.fluss.flink.utils.FlinkConnectorOptionsUtils; @@ -51,6 +52,7 @@ public class BinlogFlinkTableSource implements ScanTableSource { private final boolean streaming; private final FlinkConnectorOptionsUtils.StartupOptions startupOptions; private final long scanPartitionDiscoveryIntervalMs; + private final int splitPerAssignmentBatchSize; private final Map tableOptions; // Projection pushdown @@ -68,6 +70,28 @@ public BinlogFlinkTableSource( FlinkConnectorOptionsUtils.StartupOptions startupOptions, long scanPartitionDiscoveryIntervalMs, Map tableOptions) { + this( + tablePath, + flussConfig, + binlogOutputType, + isPartitioned, + streaming, + startupOptions, + scanPartitionDiscoveryIntervalMs, + FlinkConnectorOptions.SCAN_SPLIT_ASSIGNMENT_BATCH_SIZE.defaultValue(), + tableOptions); + } + + public BinlogFlinkTableSource( + TablePath tablePath, + Configuration flussConfig, + org.apache.flink.table.types.logical.RowType binlogOutputType, + boolean isPartitioned, + boolean streaming, + FlinkConnectorOptionsUtils.StartupOptions startupOptions, + long scanPartitionDiscoveryIntervalMs, + int splitPerAssignmentBatchSize, + Map tableOptions) { this.tablePath = tablePath; this.flussConfig = flussConfig; this.binlogOutputType = binlogOutputType; @@ -75,6 +99,7 @@ public BinlogFlinkTableSource( this.streaming = streaming; this.startupOptions = startupOptions; this.scanPartitionDiscoveryIntervalMs = scanPartitionDiscoveryIntervalMs; + this.splitPerAssignmentBatchSize = splitPerAssignmentBatchSize; this.tableOptions = tableOptions; // Extract data columns from the 'before' nested ROW type (index 3) @@ -129,6 +154,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { null, offsetsInitializer, scanPartitionDiscoveryIntervalMs, + splitPerAssignmentBatchSize, new BinlogDeserializationSchema(), streaming, partitionFilters, @@ -148,6 +174,7 @@ public DynamicTableSource copy() { streaming, startupOptions, scanPartitionDiscoveryIntervalMs, + splitPerAssignmentBatchSize, tableOptions); copy.producedDataType = producedDataType; copy.projectedFields = projectedFields; diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/ChangelogFlinkTableSource.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/ChangelogFlinkTableSource.java index f58fac3df9..4304b271b2 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/ChangelogFlinkTableSource.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/ChangelogFlinkTableSource.java @@ -19,6 +19,7 @@ import org.apache.fluss.client.initializer.OffsetsInitializer; import org.apache.fluss.config.Configuration; +import org.apache.fluss.flink.FlinkConnectorOptions; import org.apache.fluss.flink.source.deserializer.ChangelogDeserializationSchema; import org.apache.fluss.flink.source.reader.LeaseContext; import org.apache.fluss.flink.utils.FlinkConnectorOptionsUtils; @@ -57,6 +58,7 @@ public class ChangelogFlinkTableSource implements ScanTableSource { private final boolean streaming; private final FlinkConnectorOptionsUtils.StartupOptions startupOptions; private final long scanPartitionDiscoveryIntervalMs; + private final int splitPerAssignmentBatchSize; private final Map tableOptions; // Projection pushdown @@ -81,6 +83,28 @@ public ChangelogFlinkTableSource( FlinkConnectorOptionsUtils.StartupOptions startupOptions, long scanPartitionDiscoveryIntervalMs, Map tableOptions) { + this( + tablePath, + flussConfig, + changelogOutputType, + partitionKeyIndexes, + streaming, + startupOptions, + scanPartitionDiscoveryIntervalMs, + FlinkConnectorOptions.SCAN_SPLIT_ASSIGNMENT_BATCH_SIZE.defaultValue(), + tableOptions); + } + + public ChangelogFlinkTableSource( + TablePath tablePath, + Configuration flussConfig, + org.apache.flink.table.types.logical.RowType changelogOutputType, + int[] partitionKeyIndexes, + boolean streaming, + FlinkConnectorOptionsUtils.StartupOptions startupOptions, + long scanPartitionDiscoveryIntervalMs, + int splitPerAssignmentBatchSize, + Map tableOptions) { this.tablePath = tablePath; this.flussConfig = flussConfig; // The changelogOutputType already includes metadata columns from FlinkCatalog @@ -89,6 +113,7 @@ public ChangelogFlinkTableSource( this.streaming = streaming; this.startupOptions = startupOptions; this.scanPartitionDiscoveryIntervalMs = scanPartitionDiscoveryIntervalMs; + this.splitPerAssignmentBatchSize = splitPerAssignmentBatchSize; this.tableOptions = tableOptions; // Extract data columns by filtering out metadata columns by name @@ -166,6 +191,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { null, offsetsInitializer, scanPartitionDiscoveryIntervalMs, + splitPerAssignmentBatchSize, new ChangelogDeserializationSchema(), streaming, partitionFilters, @@ -185,6 +211,7 @@ public DynamicTableSource copy() { streaming, startupOptions, scanPartitionDiscoveryIntervalMs, + splitPerAssignmentBatchSize, tableOptions); copy.producedDataType = producedDataType; copy.projectedFields = projectedFields; diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkSource.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkSource.java index efa426e5f9..8535a881a8 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkSource.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkSource.java @@ -19,6 +19,7 @@ import org.apache.fluss.client.initializer.OffsetsInitializer; import org.apache.fluss.config.Configuration; +import org.apache.fluss.flink.FlinkConnectorOptions; import org.apache.fluss.flink.source.deserializer.DeserializerInitContextImpl; import org.apache.fluss.flink.source.deserializer.FlussDeserializationSchema; import org.apache.fluss.flink.source.emitter.FlinkRecordEmitter; @@ -67,6 +68,7 @@ public class FlinkSource @Nullable private final int[] projectedFields; protected final OffsetsInitializer offsetsInitializer; protected final long scanPartitionDiscoveryIntervalMs; + protected final int splitPerAssignmentBatchSize; private final boolean streaming; private final FlussDeserializationSchema deserializationSchema; @Nullable private final Predicate partitionFilters; @@ -99,6 +101,7 @@ public FlinkSource( logRecordBatchFilter, offsetsInitializer, scanPartitionDiscoveryIntervalMs, + FlinkConnectorOptions.SCAN_SPLIT_ASSIGNMENT_BATCH_SIZE.defaultValue(), deserializationSchema, streaming, partitionFilters, @@ -121,6 +124,73 @@ public FlinkSource( @Nullable Predicate partitionFilters, @Nullable LakeSource lakeSource, LeaseContext leaseContext) { + this( + flussConf, + tablePath, + hasPrimaryKey, + isPartitioned, + sourceOutputType, + projectedFields, + logRecordBatchFilter, + offsetsInitializer, + scanPartitionDiscoveryIntervalMs, + FlinkConnectorOptions.SCAN_SPLIT_ASSIGNMENT_BATCH_SIZE.defaultValue(), + deserializationSchema, + streaming, + partitionFilters, + lakeSource, + leaseContext); + } + + public FlinkSource( + Configuration flussConf, + TablePath tablePath, + boolean hasPrimaryKey, + boolean isPartitioned, + RowType sourceOutputType, + @Nullable int[] projectedFields, + @Nullable Predicate logRecordBatchFilter, + OffsetsInitializer offsetsInitializer, + long scanPartitionDiscoveryIntervalMs, + int splitPerAssignmentBatchSize, + FlussDeserializationSchema deserializationSchema, + boolean streaming, + @Nullable Predicate partitionFilters, + LeaseContext leaseContext) { + this( + flussConf, + tablePath, + hasPrimaryKey, + isPartitioned, + sourceOutputType, + projectedFields, + logRecordBatchFilter, + offsetsInitializer, + scanPartitionDiscoveryIntervalMs, + splitPerAssignmentBatchSize, + deserializationSchema, + streaming, + partitionFilters, + null, + leaseContext); + } + + public FlinkSource( + Configuration flussConf, + TablePath tablePath, + boolean hasPrimaryKey, + boolean isPartitioned, + RowType sourceOutputType, + @Nullable int[] projectedFields, + @Nullable Predicate logRecordBatchFilter, + OffsetsInitializer offsetsInitializer, + long scanPartitionDiscoveryIntervalMs, + int splitPerAssignmentBatchSize, + FlussDeserializationSchema deserializationSchema, + boolean streaming, + @Nullable Predicate partitionFilters, + @Nullable LakeSource lakeSource, + LeaseContext leaseContext) { this.flussConf = flussConf; this.tablePath = tablePath; this.hasPrimaryKey = hasPrimaryKey; @@ -130,6 +200,7 @@ public FlinkSource( this.logRecordBatchFilter = logRecordBatchFilter; this.offsetsInitializer = offsetsInitializer; this.scanPartitionDiscoveryIntervalMs = scanPartitionDiscoveryIntervalMs; + this.splitPerAssignmentBatchSize = splitPerAssignmentBatchSize; this.deserializationSchema = deserializationSchema; this.streaming = streaming; this.partitionFilters = partitionFilters; @@ -153,6 +224,7 @@ public SplitEnumerator createEnumerator( splitEnumeratorContext, offsetsInitializer, scanPartitionDiscoveryIntervalMs, + splitPerAssignmentBatchSize, streaming, partitionFilters, lakeSource, @@ -175,6 +247,7 @@ public SplitEnumerator restoreEnumerator sourceEnumeratorState.getRemainingHybridLakeFlussSplits(), offsetsInitializer, scanPartitionDiscoveryIntervalMs, + splitPerAssignmentBatchSize, streaming, partitionFilters, lakeSource, diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java index 0da1ba7668..859ef7cfae 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java @@ -139,6 +139,7 @@ public class FlinkTableSource @Nullable private final LookupCache cache; private final long scanPartitionDiscoveryIntervalMs; + private final int splitPerAssignmentBatchSize; private final boolean isDataLakeEnabled; private final LeaseContext leaseContext; @@ -194,6 +195,46 @@ public FlinkTableSource( @Nullable MergeEngineType mergeEngineType, Map tableOptions, LeaseContext leaseContext) { + this( + tablePath, + flussConfig, + tableConfig, + tableOutputType, + primaryKeyIndexes, + bucketKeyIndexes, + partitionKeyIndexes, + streaming, + startupOptions, + lookupAsync, + insertIfNotExists, + cache, + scanPartitionDiscoveryIntervalMs, + FlinkConnectorOptions.SCAN_SPLIT_ASSIGNMENT_BATCH_SIZE.defaultValue(), + isDataLakeEnabled, + mergeEngineType, + tableOptions, + leaseContext); + } + + public FlinkTableSource( + TablePath tablePath, + Configuration flussConfig, + TableConfig tableConfig, + org.apache.flink.table.types.logical.RowType tableOutputType, + int[] primaryKeyIndexes, + int[] bucketKeyIndexes, + int[] partitionKeyIndexes, + boolean streaming, + FlinkConnectorOptionsUtils.StartupOptions startupOptions, + boolean lookupAsync, + boolean insertIfNotExists, + @Nullable LookupCache cache, + long scanPartitionDiscoveryIntervalMs, + int splitPerAssignmentBatchSize, + boolean isDataLakeEnabled, + @Nullable MergeEngineType mergeEngineType, + Map tableOptions, + LeaseContext leaseContext) { this.tablePath = tablePath; this.flussConfig = flussConfig; this.tableOutputType = tableOutputType; @@ -209,6 +250,7 @@ public FlinkTableSource( this.cache = cache; this.scanPartitionDiscoveryIntervalMs = scanPartitionDiscoveryIntervalMs; + this.splitPerAssignmentBatchSize = splitPerAssignmentBatchSize; this.isDataLakeEnabled = isDataLakeEnabled; this.leaseContext = leaseContext; this.mergeEngineType = mergeEngineType; @@ -370,6 +412,7 @@ public boolean isBounded() { logRecordBatchFilter, offsetsInitializer, scanPartitionDiscoveryIntervalMs, + splitPerAssignmentBatchSize, new RowDataDeserializationSchema(), streaming, partitionFilters, @@ -478,6 +521,7 @@ public DynamicTableSource copy() { insertIfNotExists, cache, scanPartitionDiscoveryIntervalMs, + splitPerAssignmentBatchSize, isDataLakeEnabled, mergeEngineType, tableOptions, diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlussSource.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlussSource.java index 53cabcca79..795e850821 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlussSource.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlussSource.java @@ -20,6 +20,7 @@ import org.apache.fluss.annotation.VisibleForTesting; import org.apache.fluss.client.initializer.OffsetsInitializer; import org.apache.fluss.config.Configuration; +import org.apache.fluss.flink.FlinkConnectorOptions; import org.apache.fluss.flink.source.deserializer.FlussDeserializationSchema; import org.apache.fluss.flink.source.reader.LeaseContext; import org.apache.fluss.metadata.TablePath; @@ -71,6 +72,34 @@ public class FlussSource extends FlinkSource { long scanPartitionDiscoveryIntervalMs, FlussDeserializationSchema deserializationSchema, boolean streaming) { + this( + flussConf, + tablePath, + hasPrimaryKey, + isPartitioned, + sourceOutputType, + projectedFields, + logRecordBatchFilter, + offsetsInitializer, + scanPartitionDiscoveryIntervalMs, + FlinkConnectorOptions.SCAN_SPLIT_ASSIGNMENT_BATCH_SIZE.defaultValue(), + deserializationSchema, + streaming); + } + + FlussSource( + Configuration flussConf, + TablePath tablePath, + boolean hasPrimaryKey, + boolean isPartitioned, + RowType sourceOutputType, + @Nullable int[] projectedFields, + @Nullable Predicate logRecordBatchFilter, + OffsetsInitializer offsetsInitializer, + long scanPartitionDiscoveryIntervalMs, + int splitPerAssignmentBatchSize, + FlussDeserializationSchema deserializationSchema, + boolean streaming) { // TODO: Support partition pushDown in datastream super( flussConf, @@ -82,6 +111,7 @@ public class FlussSource extends FlinkSource { logRecordBatchFilter, offsetsInitializer, scanPartitionDiscoveryIntervalMs, + splitPerAssignmentBatchSize, deserializationSchema, streaming, null, diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlussSourceBuilder.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlussSourceBuilder.java index 87efc01cd2..1e65491342 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlussSourceBuilder.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlussSourceBuilder.java @@ -71,6 +71,7 @@ public class FlussSourceBuilder { private String[] projectedFieldNames; private Predicate logRecordBatchFilter; private Long scanPartitionDiscoveryIntervalMs; + private Integer splitPerAssignmentBatchSize; private OffsetsInitializer offsetsInitializer; private FlussDeserializationSchema deserializationSchema; @@ -133,6 +134,20 @@ public FlussSourceBuilder setScanPartitionDiscoveryIntervalMs( return this; } + /** + * Sets the maximum number of splits assigned to a reader in one assignment request. + * + *

If not specified, the default value from {@link + * FlinkConnectorOptions#SCAN_SPLIT_ASSIGNMENT_BATCH_SIZE} is used. + * + * @param splitPerAssignmentBatchSize maximum splits per assignment request + * @return this builder + */ + public FlussSourceBuilder setSplitPerAssignmentBatchSize(int splitPerAssignmentBatchSize) { + this.splitPerAssignmentBatchSize = splitPerAssignmentBatchSize; + return this; + } + /** * Sets the starting offsets strategy for the Fluss source. * @@ -241,6 +256,10 @@ public FlussSource build() { .defaultValue() .toMillis(); } + if (splitPerAssignmentBatchSize == null) { + splitPerAssignmentBatchSize = + FlinkConnectorOptions.SCAN_SPLIT_ASSIGNMENT_BATCH_SIZE.defaultValue(); + } if (this.flussConf == null) { this.flussConf = new Configuration(); @@ -317,6 +336,7 @@ public FlussSource build() { logRecordBatchFilter, offsetsInitializer, scanPartitionDiscoveryIntervalMs, + splitPerAssignmentBatchSize, deserializationSchema, true); } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java index 1a6521f57e..6cd5da3b39 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java @@ -29,6 +29,7 @@ import org.apache.fluss.config.ConfigOptions; import org.apache.fluss.config.Configuration; import org.apache.fluss.exception.UnsupportedVersionException; +import org.apache.fluss.flink.FlinkConnectorOptions; import org.apache.fluss.flink.lake.LakeSplitGenerator; import org.apache.fluss.flink.lake.split.LakeSnapshotAndFlussLogSplit; import org.apache.fluss.flink.lake.split.LakeSnapshotSplit; @@ -52,6 +53,7 @@ import org.apache.fluss.row.BinaryString; import org.apache.fluss.row.GenericRow; import org.apache.fluss.row.InternalRow; +import org.apache.fluss.shaded.guava32.com.google.common.collect.Lists; import org.apache.fluss.utils.ExceptionUtils; import org.apache.flink.annotation.Internal; @@ -83,6 +85,7 @@ import java.util.TreeMap; import java.util.stream.Collectors; +import static org.apache.fluss.utils.Preconditions.checkArgument; import static org.apache.fluss.utils.Preconditions.checkNotNull; import static org.apache.fluss.utils.Preconditions.checkState; @@ -183,6 +186,37 @@ public class FlinkSourceEnumerator @Nullable private final LakeSource lakeSource; + private final int splitPerAssignmentBatchSize; + + public FlinkSourceEnumerator( + TablePath tablePath, + Configuration flussConf, + boolean hasPrimaryKey, + boolean isPartitioned, + SplitEnumeratorContext context, + OffsetsInitializer startingOffsetsInitializer, + long scanPartitionDiscoveryIntervalMs, + boolean streaming, + @Nullable Predicate partitionFilters, + @Nullable LakeSource lakeSource, + LeaseContext leaseContext, + boolean checkpointTriggeredBefore) { + this( + tablePath, + flussConf, + hasPrimaryKey, + isPartitioned, + context, + startingOffsetsInitializer, + scanPartitionDiscoveryIntervalMs, + FlinkConnectorOptions.SCAN_SPLIT_ASSIGNMENT_BATCH_SIZE.defaultValue(), + streaming, + partitionFilters, + lakeSource, + leaseContext, + checkpointTriggeredBefore); + } + public FlinkSourceEnumerator( TablePath tablePath, Configuration flussConf, @@ -191,6 +225,7 @@ public FlinkSourceEnumerator( SplitEnumeratorContext context, OffsetsInitializer startingOffsetsInitializer, long scanPartitionDiscoveryIntervalMs, + int splitPerAssignmentBatchSize, boolean streaming, @Nullable Predicate partitionFilters, @Nullable LakeSource lakeSource, @@ -207,6 +242,7 @@ public FlinkSourceEnumerator( null, startingOffsetsInitializer, scanPartitionDiscoveryIntervalMs, + splitPerAssignmentBatchSize, streaming, partitionFilters, lakeSource, @@ -241,6 +277,43 @@ public FlinkSourceEnumerator( pendingHybridLakeFlussSplits, startingOffsetsInitializer, scanPartitionDiscoveryIntervalMs, + FlinkConnectorOptions.SCAN_SPLIT_ASSIGNMENT_BATCH_SIZE.defaultValue(), + streaming, + partitionFilters, + lakeSource, + leaseContext, + checkpointTriggeredBefore); + } + + public FlinkSourceEnumerator( + TablePath tablePath, + Configuration flussConf, + boolean hasPrimaryKey, + boolean isPartitioned, + SplitEnumeratorContext context, + Set assignedTableBuckets, + Map assignedPartitions, + List pendingHybridLakeFlussSplits, + OffsetsInitializer startingOffsetsInitializer, + long scanPartitionDiscoveryIntervalMs, + int splitPerAssignmentBatchSize, + boolean streaming, + @Nullable Predicate partitionFilters, + @Nullable LakeSource lakeSource, + LeaseContext leaseContext, + boolean checkpointTriggeredBefore) { + this( + tablePath, + flussConf, + hasPrimaryKey, + isPartitioned, + context, + assignedTableBuckets, + assignedPartitions, + pendingHybridLakeFlussSplits, + startingOffsetsInitializer, + scanPartitionDiscoveryIntervalMs, + splitPerAssignmentBatchSize, streaming, partitionFilters, lakeSource, @@ -266,6 +339,48 @@ public FlinkSourceEnumerator( WorkerExecutor workerExecutor, LeaseContext leaseContext, boolean checkpointTriggeredBefore) { + this( + tablePath, + flussConf, + hasPrimaryKey, + isPartitioned, + context, + assignedTableBuckets, + assignedPartitions, + pendingHybridLakeFlussSplits, + startingOffsetsInitializer, + scanPartitionDiscoveryIntervalMs, + FlinkConnectorOptions.SCAN_SPLIT_ASSIGNMENT_BATCH_SIZE.defaultValue(), + streaming, + partitionFilters, + lakeSource, + workerExecutor, + leaseContext, + checkpointTriggeredBefore); + } + + FlinkSourceEnumerator( + TablePath tablePath, + Configuration flussConf, + boolean hasPrimaryKey, + boolean isPartitioned, + SplitEnumeratorContext context, + Set assignedTableBuckets, + Map assignedPartitions, + List pendingHybridLakeFlussSplits, + OffsetsInitializer startingOffsetsInitializer, + long scanPartitionDiscoveryIntervalMs, + int splitPerAssignmentBatchSize, + boolean streaming, + @Nullable Predicate partitionFilters, + @Nullable LakeSource lakeSource, + WorkerExecutor workerExecutor, + LeaseContext leaseContext, + boolean checkpointTriggeredBefore) { + checkArgument( + splitPerAssignmentBatchSize > 0, + "Split assignment batch size must be positive, but was %s.", + splitPerAssignmentBatchSize); this.tablePath = checkNotNull(tablePath); this.flussConf = checkNotNull(flussConf); this.hasPrimaryKey = hasPrimaryKey; @@ -288,6 +403,7 @@ public FlinkSourceEnumerator( this.workerExecutor = workerExecutor; this.leaseContext = leaseContext; this.checkpointTriggeredBefore = checkpointTriggeredBefore; + this.splitPerAssignmentBatchSize = splitPerAssignmentBatchSize; } @Override @@ -927,7 +1043,19 @@ private void assignPendingSplits(Set pendingReaders) { // Assign pending splits to readers if (!incrementalAssignment.isEmpty()) { LOG.info("Assigning splits to readers {}", incrementalAssignment); - context.assignSplits(new SplitsAssignment<>(incrementalAssignment)); + for (Map.Entry> entry : + incrementalAssignment.entrySet()) { + int readerId = entry.getKey(); + List splits = entry.getValue(); + Lists.partition(splits, splitPerAssignmentBatchSize).stream() + .forEach( + batchSplits -> { + context.assignSplits( + new SplitsAssignment<>( + Collections.singletonMap( + readerId, batchSplits))); + }); + } } if (noMoreNewSplits) { diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java index ca4722c9e6..956e9252bf 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java @@ -79,6 +79,7 @@ import static org.apache.fluss.record.TestData.DEFAULT_REMOTE_DATA_DIR; import static org.apache.fluss.testutils.DataTestUtils.row; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Unit tests for {@link FlinkSourceEnumerator}. */ class FlinkSourceEnumeratorTest extends FlinkTestBase { @@ -143,6 +144,73 @@ void testPkTableNoSnapshotSplits() throws Throwable { } } + @Test + void testSplitAssignmentBatchSize() throws Throwable { + long tableId = createTable(DEFAULT_TABLE_PATH, DEFAULT_PK_TABLE_DESCRIPTOR); + try (MockSplitEnumeratorContext context = + new MockSplitEnumeratorContext<>(1)) { + FlinkSourceEnumerator enumerator = + new FlinkSourceEnumerator( + DEFAULT_TABLE_PATH, + flussConf, + true, + false, + context, + OffsetsInitializer.full(), + DEFAULT_SCAN_PARTITION_DISCOVERY_INTERVAL_MS, + 2, + streaming, + null, + null, + LeaseContext.DEFAULT, + false); + + enumerator.start(); + registerReader(context, enumerator, 0); + context.runNextOneTimeCallable(); + + List> assignments = + context.getSplitsAssignmentSequence(); + assertThat(assignments).hasSize(2); + assertThat(assignments.get(0).assignment().get(0)).hasSize(2); + assertThat(assignments.get(1).assignment().get(0)).hasSize(1); + + List assignedSplits = new ArrayList<>(); + assignments.forEach( + assignment -> assignedSplits.addAll(assignment.assignment().get(0))); + assertThat(assignedSplits) + .containsExactly( + genLogSplit(tableId, 0), + genLogSplit(tableId, 1), + genLogSplit(tableId, 2)); + } + } + + @Test + void testInvalidSplitAssignmentBatchSize() throws Exception { + try (MockSplitEnumeratorContext context = + new MockSplitEnumeratorContext<>(1)) { + assertThatThrownBy( + () -> + new FlinkSourceEnumerator( + DEFAULT_TABLE_PATH, + flussConf, + true, + false, + context, + OffsetsInitializer.full(), + DEFAULT_SCAN_PARTITION_DISCOVERY_INTERVAL_MS, + 0, + streaming, + null, + null, + LeaseContext.DEFAULT, + false)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Split assignment batch size must be positive"); + } + } + @Test void testPkTableWithSnapshotSplits() throws Throwable { long tableId = createTable(DEFAULT_TABLE_PATH, DEFAULT_PK_TABLE_DESCRIPTOR); From 4a6b803221ebc3828b7e79afdf318515aef01765 Mon Sep 17 00:00:00 2001 From: zhangjunfan Date: Sat, 9 May 2026 15:47:20 +0800 Subject: [PATCH 2/4] ci failure --- .../enumerator/FlinkSourceEnumeratorTest.java | 29 ++++++++++++------- 1 file changed, 19 insertions(+), 10 deletions(-) diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java index 956e9252bf..6d239f965b 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java @@ -138,8 +138,7 @@ void testPkTableNoSnapshotSplits() throws Throwable { expectedAssignment.put(i, Collections.singletonList(genLogSplit(tableId, i))); } - Map> actualAssignment = - getLastReadersAssignments(context); + Map> actualAssignment = getReadersAssignments(context); assertThat(actualAssignment).isEqualTo(expectedAssignment); } } @@ -245,8 +244,7 @@ void testPkTableWithSnapshotSplits() throws Throwable { // make enumerate to get splits and assign context.runNextOneTimeCallable(); - Map> actualAssignment = - getLastReadersAssignments(context); + Map> actualAssignment = getReadersAssignments(context); Map> expectedAssignment = new HashMap<>(); @@ -332,8 +330,7 @@ void testNonPkTable() throws Throwable { new LogSplit(new TableBucket(tableId, i), null, -2L))); } - Map> actualAssignment = - getLastReadersAssignments(context); + Map> actualAssignment = getReadersAssignments(context); assertThat(actualAssignment).isEqualTo(expectedAssignment); } } @@ -545,10 +542,11 @@ void testDiscoverPartitionsPeriodically(boolean isPrimaryKeyTable) throws Throwa createPartitions(zooKeeperClient, DEFAULT_TABLE_PATH, newPartitions); /// invoke partition discovery callable again and there should assignments. + int assignmentStart = context.getSplitsAssignmentSequence().size(); runPeriodicPartitionDiscovery(workExecutor); expectedAssignment = expectAssignments(enumerator, tableId, newPartitionNameIds); - actualAssignments = getLastReadersAssignments(context); + actualAssignments = getReadersAssignments(context, assignmentStart); checkAssignmentIgnoreOrder(actualAssignments, expectedAssignment); // drop + create partitions; @@ -561,6 +559,7 @@ void testDiscoverPartitionsPeriodically(boolean isPrimaryKeyTable) throws Throwa createPartitions(zooKeeperClient, DEFAULT_TABLE_PATH, newPartitions); // invoke partition discovery callable again + assignmentStart = context.getSplitsAssignmentSequence().size(); runPeriodicPartitionDiscovery(workExecutor); // there should be partition removed events @@ -581,7 +580,7 @@ void testDiscoverPartitionsPeriodically(boolean isPrimaryKeyTable) throws Throwa // check new assignments. expectedAssignment = expectAssignments(enumerator, tableId, newPartitionNameIds); - actualAssignments = getLastReadersAssignments(context); + actualAssignments = getReadersAssignments(context, assignmentStart); checkAssignmentIgnoreOrder(actualAssignments, expectedAssignment); Map assignedPartitions = @@ -984,11 +983,21 @@ private LogSplit genLogSplit(long tableId, int bucketId) { private Map> getReadersAssignments( MockSplitEnumeratorContext context) { + return getReadersAssignments(context, 0); + } + + private Map> getReadersAssignments( + MockSplitEnumeratorContext context, int startIndex) { List> splitsAssignments = context.getSplitsAssignmentSequence(); Map> assignment = new HashMap<>(); - for (SplitsAssignment splitAssignment : splitsAssignments) { - assignment.putAll(splitAssignment.assignment()); + for (int i = startIndex; i < splitsAssignments.size(); i++) { + for (Map.Entry> splitAssignment : + splitsAssignments.get(i).assignment().entrySet()) { + assignment + .computeIfAbsent(splitAssignment.getKey(), key -> new ArrayList<>()) + .addAll(splitAssignment.getValue()); + } } return assignment; } From fb4480d1316593c37ee1fb326206c8054f483560 Mon Sep 17 00:00:00 2001 From: zhangjunfan Date: Mon, 11 May 2026 13:51:54 +0800 Subject: [PATCH 3/4] add option check --- .../flink/utils/FlinkConnectorOptionsUtils.java | 12 ++++++++++++ .../flink/catalog/FlinkTableFactoryTest.java | 12 ++++++++++++ .../utils/FlinkConnectorOptionsUtilTest.java | 16 ++++++++++++++++ 3 files changed, 40 insertions(+) diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConnectorOptionsUtils.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConnectorOptionsUtils.java index b9306e53af..249e1be089 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConnectorOptionsUtils.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConnectorOptionsUtils.java @@ -43,6 +43,7 @@ import static org.apache.flink.configuration.CoreOptions.TMP_DIRS; import static org.apache.fluss.config.ConfigOptions.CLIENT_SCANNER_IO_TMP_DIR; +import static org.apache.fluss.flink.FlinkConnectorOptions.SCAN_SPLIT_ASSIGNMENT_BATCH_SIZE; import static org.apache.fluss.flink.FlinkConnectorOptions.SCAN_STARTUP_MODE; import static org.apache.fluss.flink.FlinkConnectorOptions.SCAN_STARTUP_TIMESTAMP; import static org.apache.fluss.flink.FlinkConnectorOptions.ScanStartupMode.TIMESTAMP; @@ -61,6 +62,7 @@ public static ZoneId getLocalTimeZone(String timeZone) { public static void validateTableSourceOptions(ReadableConfig tableOptions) { validateScanStartupMode(tableOptions); + validateScanSplitAssignmentBatchSize(tableOptions); } /** @@ -154,6 +156,16 @@ private static void validateScanStartupMode(ReadableConfig tableOptions) { } } + private static void validateScanSplitAssignmentBatchSize(ReadableConfig tableOptions) { + int batchSize = tableOptions.get(SCAN_SPLIT_ASSIGNMENT_BATCH_SIZE); + if (batchSize <= 0) { + throw new ValidationException( + String.format( + "'%s' must be positive, but was %s.", + SCAN_SPLIT_ASSIGNMENT_BATCH_SIZE.key(), batchSize)); + } + } + /** * Parses timestamp String to Long. * diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkTableFactoryTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkTableFactoryTest.java index c1d136cb4c..57bf8e8da6 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkTableFactoryTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkTableFactoryTest.java @@ -93,6 +93,18 @@ void testTableSourceOptions() { FlinkConnectorOptions.SCAN_STARTUP_TIMESTAMP.key(), "2023-12-09 23:09:12"); createTableSource(schema, scanModeProperties); + // test split assignment batch size + Map splitAssignmentBatchProperties = getBasicOptions(); + splitAssignmentBatchProperties.put( + FlinkConnectorOptions.SCAN_SPLIT_ASSIGNMENT_BATCH_SIZE.key(), "0"); + assertThatThrownBy(() -> createTableSource(schema, splitAssignmentBatchProperties)) + .isInstanceOf(ValidationException.class) + .hasMessageContaining( + "'scan.split.assignment.batch-size' must be positive, but was 0."); + splitAssignmentBatchProperties.put( + FlinkConnectorOptions.SCAN_SPLIT_ASSIGNMENT_BATCH_SIZE.key(), "1"); + createTableSource(schema, splitAssignmentBatchProperties); + // test datalake options Map datalakeProperties = getBasicOptions(); datalakeProperties.put("table.datalake.format", "paimon"); diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkConnectorOptionsUtilTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkConnectorOptionsUtilTest.java index 259a8ce6eb..0e06dc1019 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkConnectorOptionsUtilTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkConnectorOptionsUtilTest.java @@ -27,6 +27,7 @@ import static org.apache.flink.configuration.CoreOptions.TMP_DIRS; import static org.apache.fluss.config.ConfigOptions.CLIENT_SCANNER_IO_TMP_DIR; +import static org.apache.fluss.flink.FlinkConnectorOptions.SCAN_SPLIT_ASSIGNMENT_BATCH_SIZE; import static org.apache.fluss.flink.FlinkConnectorOptions.SCAN_STARTUP_TIMESTAMP; import static org.apache.fluss.flink.utils.FlinkConnectorOptionsUtils.parseTimestamp; import static org.assertj.core.api.Assertions.assertThat; @@ -63,6 +64,21 @@ void testParseTimestamp() { + "You can config like: '2023-12-09 23:09:12' or '1678883047356'."); } + @Test + void testValidateSplitAssignmentBatchSize() { + org.apache.flink.configuration.Configuration tableOptions = + new org.apache.flink.configuration.Configuration(); + + tableOptions.set(SCAN_SPLIT_ASSIGNMENT_BATCH_SIZE, 1); + FlinkConnectorOptionsUtils.validateTableSourceOptions(tableOptions); + + tableOptions.set(SCAN_SPLIT_ASSIGNMENT_BATCH_SIZE, 0); + assertThatThrownBy( + () -> FlinkConnectorOptionsUtils.validateTableSourceOptions(tableOptions)) + .isInstanceOf(ValidationException.class) + .hasMessage("'scan.split.assignment.batch-size' must be positive, but was 0."); + } + @Test void testGetClientScannerIoTmpDir() { Configuration flussConfig = From 0c3c6ae38ad113ec4558c6fab512f622d8622891 Mon Sep 17 00:00:00 2001 From: zhangjunfan Date: Mon, 11 May 2026 13:55:00 +0800 Subject: [PATCH 4/4] fix log --- .../enumerator/FlinkSourceEnumerator.java | 20 ++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java index 6cd5da3b39..e04989cbb2 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java @@ -1042,7 +1042,25 @@ private void assignPendingSplits(Set pendingReaders) { // Assign pending splits to readers if (!incrementalAssignment.isEmpty()) { - LOG.info("Assigning splits to readers {}", incrementalAssignment); + int totalSplits = incrementalAssignment.values().stream().mapToInt(List::size).sum(); + Map batchesPerReader = + incrementalAssignment.entrySet().stream() + .collect( + Collectors.toMap( + Map.Entry::getKey, + entry -> + (entry.getValue().size() + + splitPerAssignmentBatchSize + - 1) + / splitPerAssignmentBatchSize)); + LOG.info( + "Assigning splits to {} readers: totalSplits={}, batchesPerReader={}", + incrementalAssignment.size(), + totalSplits, + batchesPerReader); + if (LOG.isDebugEnabled()) { + LOG.debug("Assigning splits to readers {}", incrementalAssignment); + } for (Map.Entry> entry : incrementalAssignment.entrySet()) { int readerId = entry.getKey();