@@ -91,7 +91,6 @@ public SplitEnumerator<TieringSplit, TieringSourceEnumeratorState> createEnumera
public SplitEnumerator<TieringSplit, TieringSourceEnumeratorState> restoreEnumerator(
SplitEnumeratorContext<TieringSplit> splitEnumeratorContext,
TieringSourceEnumeratorState tieringSourceEnumeratorState) {
// stateless operator
return new TieringSourceEnumerator(
flussConf, splitEnumeratorContext, pollTieringTableIntervalMs);
}
@@ -23,14 +23,17 @@
import org.apache.fluss.client.admin.Admin;
import org.apache.fluss.client.metadata.MetadataUpdater;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.exception.UnsupportedVersionException;
import org.apache.fluss.flink.metrics.FlinkMetricRegistry;
import org.apache.fluss.flink.tiering.event.FailedTieringEvent;
import org.apache.fluss.flink.tiering.event.FinishedTieringEvent;
import org.apache.fluss.flink.tiering.event.TieringReachMaxDurationEvent;
import org.apache.fluss.flink.tiering.source.split.TieringSnapshotSplit;
import org.apache.fluss.flink.tiering.source.split.TieringSplit;
import org.apache.fluss.flink.tiering.source.split.TieringSplitGenerator;
import org.apache.fluss.flink.tiering.source.state.TieringSourceEnumeratorState;
import org.apache.fluss.lake.committer.TieringStats;
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.metadata.TableInfo;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.rpc.GatewayClientProxy;
@@ -42,6 +45,7 @@
import org.apache.fluss.rpc.messages.PbLakeTieringStats;
import org.apache.fluss.rpc.messages.PbLakeTieringTableInfo;
import org.apache.fluss.rpc.metrics.ClientMetricGroup;
import org.apache.fluss.utils.ExceptionUtils;

import org.apache.flink.api.connector.source.ReaderInfo;
import org.apache.flink.api.connector.source.SourceEvent;
@@ -56,6 +60,7 @@
import javax.annotation.Nullable;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -64,6 +69,7 @@
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
@@ -96,6 +102,15 @@ public class TieringSourceEnumerator

private static final Logger LOG = LoggerFactory.getLogger(TieringSourceEnumerator.class);

/**
* KV snapshot lease duration for the whole tiering job. One lease covers the entire job
* lifecycle; it is renewed implicitly by every {@code acquireSnapshots} call, so a relatively
* long duration is safe and also bounds the worst-case leaked-lease lifetime if the job dies
* abnormally. Tiering rounds are typically minute-level today, so a 6-hour lease is more than
* sufficient for most cases.
*/
private static final long KV_SNAPSHOT_LEASE_DURATION_MS = Duration.ofHours(6).toMillis();

private final Configuration flussConf;
private final SplitEnumeratorContext<TieringSplit> context;
private final ScheduledExecutorService timerService;
@@ -109,6 +124,18 @@ public class TieringSourceEnumerator
private final Map<Long, TieringFinishInfo> finishedTables;
private final Set<Long> tieringReachMaxDurationsTables;

/**
* Buckets whose kv snapshots are currently held under the lease, grouped by tableId. Used to
* release the correct bucket subset when a table finishes/fails or when a failover happens.
*/
private final Map<Long, Set<TableBucket>> leasedBucketsByTable;

/**
* A unique lease id for this tiering job. Reused across all tables to keep a single renewal and
* bookkeeping entry on the server side, aligned with the normal Flink Source design.
*/
private final String kvSnapshotLeaseId;

// lazily instantiated
private RpcClient rpcClient;
private CoordinatorGateway coordinatorGateway;
@@ -138,6 +165,10 @@ public TieringSourceEnumerator(
this.finishedTables = new ConcurrentHashMap<>();
this.failedTableEpochs = new ConcurrentHashMap<>();
this.tieringReachMaxDurationsTables = Collections.synchronizedSet(new TreeSet<>());
// Thread safety: outer map is ConcurrentHashMap, values are ConcurrentHashMap-backed
// Sets. Reads/writes are safe across the coordinator thread and the timer thread.
this.leasedBucketsByTable = new ConcurrentHashMap<>();
this.kvSnapshotLeaseId = "tiering-" + UUID.randomUUID();
}
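The thread-safety note in the constructor above can be exercised in isolation: a ConcurrentHashMap whose values are ConcurrentHashMap-backed key sets tolerates concurrent mutation from a timer thread and the coordinator thread without external locking. A small self-contained sketch, hypothetical and merely mirroring the leasedBucketsByTable pattern (Integer stands in for TableBucket to keep it runnable):

import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class LeasedBucketsConcurrencySketch {
    public static void main(String[] args) throws InterruptedException {
        Map<Long, Set<Integer>> leasedBucketsByTable = new ConcurrentHashMap<>();
        ExecutorService pool = Executors.newFixedThreadPool(2);

        // "Coordinator thread": records newly leased buckets per table.
        pool.execute(() -> {
            for (int b = 0; b < 1000; b++) {
                leasedBucketsByTable
                        .computeIfAbsent(1L, k -> ConcurrentHashMap.newKeySet())
                        .add(b);
            }
        });
        // "Timer thread": concurrently releases another table's buckets wholesale.
        pool.execute(() -> leasedBucketsByTable.remove(2L));

        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.SECONDS);
        System.out.println("table 1 leased buckets: " + leasedBucketsByTable.get(1L).size());
    }
}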

@Override
@@ -184,6 +215,14 @@ public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname
LOG.info("TieringSourceReader {} requests split.", subtaskId);
readersAwaitingSplit.add(subtaskId);

// Skip the synchronous split request path during failover. Otherwise we may immediately
// re-acquire a kv snapshot lease for the same table that was just released by the
// failover handling, leaving stale lease state until the next periodic poll. The
// periodic callAsync task will resume normal split generation once failover completes.
if (isFailOvering) {
return;
}

// If pending splits exist, assign them directly to the requesting reader
if (!pendingSplits.isEmpty()) {
assignSplits();
@@ -249,7 +288,13 @@ public void addReader(int subtaskId) {
context.currentParallelism(),
globalMaxAttempt,
context.registeredReadersOfAttempts());
isFailOvering = false;
// Defer clearing isFailOvering until the next periodic poll runs in
// the coordinator thread. This guarantees that any synchronous
// handleSplitRequest invoked right after addReader still observes
// isFailOvering=true and skips the synchronous fetch path, so we
// don't immediately re-acquire a kv snapshot lease that was just
// released by handleSourceReaderFailOver().
context.runInCoordinatorThread(() -> isFailOvering = false);
}
}
}
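The deferred reset above works because SplitEnumeratorContext#runInCoordinatorThread enqueues the flag flip onto the same single coordinator thread that executes handleSplitRequest, so any request already dispatched during failover is serialized before the reset. A minimal stand-alone analogy using a plain single-threaded executor as a hypothetical stand-in for Flink's coordinator thread:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class CoordinatorOrderingSketch {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService coordinator = Executors.newSingleThreadExecutor();
        AtomicBoolean isFailOvering = new AtomicBoolean(true);

        // A split request queued while failover is still in progress observes
        // the flag and skips the synchronous fetch path...
        coordinator.execute(() -> {
            if (isFailOvering.get()) {
                System.out.println("skipping synchronous split fetch during failover");
            }
        });

        // ...because the reset runs strictly after it on the same thread.
        coordinator.execute(() -> isFailOvering.set(false));

        coordinator.shutdown();
        coordinator.awaitTermination(1, TimeUnit.SECONDS);
    }
}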
@@ -279,6 +324,8 @@ public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
TieringFinishInfo.from(
tieringEpoch, isForceFinished, finishedTieringEvent.getStats()));
}
// release the kv snapshot lease held for this table (if any).
maybeReleaseKvSnapshotLease(finishedTableId);
}

if (sourceEvent instanceof FailedTieringEvent) {
@@ -297,6 +344,8 @@ public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
} else {
failedTableEpochs.put(failedTableId, tieringEpoch);
}
// release the kv snapshot lease held for this table (if any).
maybeReleaseKvSnapshotLease(failedTableId);
}

if (!finishedTables.isEmpty() || !failedTableEpochs.isEmpty()) {
Expand All @@ -317,6 +366,11 @@ private void handleSourceReaderFailOver() {
tieringReachMaxDurationsTables.clear();
// also clean all pending splits since we mark all as failed
pendingSplits.clear();
// Release leases for all currently tracked tables to avoid leaking server-side
// snapshot references when those tables are marked failed during failover.
for (Long tableId : new HashSet<>(leasedBucketsByTable.keySet())) {
maybeReleaseKvSnapshotLease(tableId);
}
if (!failedTableEpochs.isEmpty()) {
// call one round of heartbeat to notify table has been finished or failed
this.context.callAsync(
@@ -464,6 +518,10 @@ private void generateTieringSplits(Tuple3<Long, Long, TablePath> tieringTable)
finishedTables.put(tieringTable.f0, TieringFinishInfo.from(tieringTable.f1));
} else {
tieringTableEpochs.put(tieringTable.f0, tieringTable.f1);
// Acquire kv snapshot lease for all snapshot splits of this table before they
// are assigned to readers, so that snapshots referenced by these splits will not
// be garbage-collected by the Fluss server while tiering is in progress.
maybeAcquireKvSnapshotLease(tieringTable.f0, tieringSplits);
pendingSplits.addAll(tieringSplits);

timerService.schedule(
@@ -481,7 +539,12 @@ private void generateTieringSplits(Tuple3<Long, Long, TablePath> tieringTable)
}
} catch (Exception e) {
LOG.warn("Fail to generate Tiering splits for table {}.", tieringTable.f2, e);
// Remove from tieringTableEpochs in case it was already added before the failure.
tieringTableEpochs.remove(tieringTable.f0);
failedTableEpochs.put(tieringTable.f0, tieringTable.f1);
// Release any lease that was partially acquired before the failure so the
// server-side snapshot references are not held unnecessarily until lease expiry.
maybeReleaseKvSnapshotLease(tieringTable.f0);
}
}

Expand All @@ -494,7 +557,6 @@ private List<TieringSplit> populateNumberOfTieringSplits(List<TieringSplit> tier

@Override
public TieringSourceEnumeratorState snapshotState(long checkpointId) throws Exception {
// do nothing, the downstream lake committer will snapshot the state to Fluss Cluster
return new TieringSourceEnumeratorState();
}

@@ -515,6 +577,16 @@ public void close() throws IOException {
LOG.error("Failed to close Tiering Source enumerator.", e);
}
}
// Release any remaining leases held by this enumerator. The Fluss cluster will also
// expire stale leases naturally after KV_SNAPSHOT_LEASE_DURATION_MS, so a best-effort
// release here is sufficient.
for (Long tableId : new HashSet<>(leasedBucketsByTable.keySet())) {
try {
maybeReleaseKvSnapshotLease(tableId);
} catch (Exception e) {
LOG.warn("Failed to release kv snapshot lease for table {} on close.", tableId, e);
}
}
try {
if (flussAdmin != null) {
LOG.info("Closing Fluss Admin client...");
@@ -533,6 +605,126 @@
}
}

/**
* Acquire kv snapshot lease for all {@link TieringSnapshotSplit}s of the given table so that
* snapshots referenced by these splits will not be cleaned up by the Fluss server during
* tiering. Bucket-snapshot mappings are remembered in {@link #leasedBucketsByTable} so they can
* be released on finish/fail.
*
* <p>Falls back to a warning (no exception thrown) when the server does not support the kv
* snapshot lease API, to preserve compatibility with older Fluss clusters.
*/
private void maybeAcquireKvSnapshotLease(long tableId, List<TieringSplit> tieringSplits) {
Map<TableBucket, Long> bucketsToLease = new HashMap<>();
for (TieringSplit split : tieringSplits) {
if (split.isTieringSnapshotSplit()) {
TieringSnapshotSplit snapshotSplit = split.asTieringSnapshotSplit();
bucketsToLease.put(snapshotSplit.getTableBucket(), snapshotSplit.getSnapshotId());
}
}
if (bucketsToLease.isEmpty()) {
return;
}
LOG.info(
"Try to acquire kv snapshot lease {} for tiering table {} with {} buckets.",
kvSnapshotLeaseId,
tableId,
bucketsToLease.size());
try {
Set<TableBucket> unavailableBuckets =
flussAdmin
.createKvSnapshotLease(kvSnapshotLeaseId, KV_SNAPSHOT_LEASE_DURATION_MS)
.acquireSnapshots(bucketsToLease)
.get()
.getUnavailableTableBucketSet();
// Only record successfully leased buckets so we don't later try to release
// buckets that were never actually acquired (e.g. snapshots already GC'ed or
// missing on the server).
Set<TableBucket> acquiredBuckets = new HashSet<>(bucketsToLease.keySet());
if (!unavailableBuckets.isEmpty()) {
LOG.warn(
"Failed to acquire kv snapshot lease for {} of {} buckets of tiering "
+ "table {}: {}. The corresponding snapshots may have already "
+ "been garbage-collected; tiering for those buckets may fail "
+ "later when the snapshots are accessed.",
unavailableBuckets.size(),
bucketsToLease.size(),
tableId,
unavailableBuckets);
acquiredBuckets.removeAll(unavailableBuckets);
}
if (!acquiredBuckets.isEmpty()) {
leasedBucketsByTable
.computeIfAbsent(tableId, k -> ConcurrentHashMap.newKeySet())
.addAll(acquiredBuckets);
}
} catch (Exception e) {
if (ExceptionUtils.findThrowable(e, UnsupportedVersionException.class).isPresent()) {
LOG.warn(
"Failed to acquire kv snapshot lease for tiering table {} because the "
+ "server does not support kv snapshot lease API. Snapshots may "
+ "be cleaned up earlier than expected. Please upgrade the Fluss "
+ "server to version 0.9 or later.",
tableId,
e);
} else {
LOG.error(
"Failed to acquire kv snapshot lease for tiering table {}. "
+ "Tiering will proceed without snapshot protection; the "
+ "snapshot may be garbage-collected while tiering is in progress.",
tableId,
e);
}
}
}

/**
* Release the kv snapshot lease held for a specific table. Called when a table finishes
* tiering, fails, or is abandoned due to failover. Missing leases (log-only tables, or tables
* for which acquire failed) are handled as no-ops.
*/
private void maybeReleaseKvSnapshotLease(long tableId) {
Set<TableBucket> bucketsToRelease = leasedBucketsByTable.remove(tableId);
if (bucketsToRelease == null || bucketsToRelease.isEmpty()) {
return;
}
LOG.info(
"Try to release kv snapshot lease {} for tiering table {} with {} buckets.",
kvSnapshotLeaseId,
tableId,
bucketsToRelease.size());
try {
flussAdmin
.createKvSnapshotLease(kvSnapshotLeaseId, KV_SNAPSHOT_LEASE_DURATION_MS)
.releaseSnapshots(bucketsToRelease)
.get();
} catch (Exception e) {
if (ExceptionUtils.findThrowable(e, UnsupportedVersionException.class).isPresent()) {
LOG.warn(
"Failed to release kv snapshot lease for tiering table {} because the "
+ "server does not support kv snapshot lease API.",
tableId,
e);
} else {
LOG.error(
"Failed to release kv snapshot lease for tiering table {}. The lease "
+ "will expire naturally on the server side.",
tableId,
e);
}
}
}

@VisibleForTesting
String getKvSnapshotLeaseId() {
return kvSnapshotLeaseId;
}

@VisibleForTesting
Map<Long, Set<TableBucket>> getLeasedBucketsByTable() {
return leasedBucketsByTable;
}

/**
* Report failed table to Fluss coordinator via HeartBeat, this method should be called when
* {@link TieringSourceEnumerator} is closed or receives failed table from downstream lake
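Taken together, the two helpers bracket each table's tiering round with an acquire/release pair. A condensed sketch of that flow, reusing the Admin calls exactly as they appear above; the TableBucket constructor shape and the literal tableId/bucket/snapshotId values are assumptions for illustration only:

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import org.apache.fluss.client.admin.Admin;
import org.apache.fluss.metadata.TableBucket;

class KvSnapshotLeaseLifecycleSketch {
    static void tierWithLease(Admin admin, String leaseId, long leaseDurationMs) throws Exception {
        // Bucket -> snapshotId pairs collected from the table's snapshot splits.
        Map<TableBucket, Long> bucketsToLease = new HashMap<>();
        bucketsToLease.put(new TableBucket(1L, 0), 42L); // illustrative values

        // 1) Acquire before splits are assigned; unavailable buckets indicate
        //    snapshots that the server has already garbage-collected.
        Set<TableBucket> unavailable =
                admin.createKvSnapshotLease(leaseId, leaseDurationMs)
                        .acquireSnapshots(bucketsToLease)
                        .get()
                        .getUnavailableTableBucketSet();
        bucketsToLease.keySet().removeAll(unavailable);

        // ... readers tier the leased snapshots here ...

        // 2) Release when the table finishes, fails, or is dropped by failover;
        //    a missed release only pins snapshots until the lease expires.
        if (!bucketsToLease.isEmpty()) {
            admin.createKvSnapshotLease(leaseId, leaseDurationMs)
                    .releaseSnapshots(bucketsToLease.keySet())
                    .get();
        }
    }
}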
@@ -17,25 +17,8 @@

package org.apache.fluss.flink.tiering.source.state;

import org.apache.fluss.flink.tiering.source.enumerator.TieringSourceEnumerator;

/** The marker class of stateless component {@link TieringSourceEnumerator}. */
/** The state of the tiering source enumerator. Currently a stateless marker. */
public class TieringSourceEnumeratorState {
[Contributor review comment] It will break the stateless design of the tiering service. I think we can just ignore the kvSnapshotLeaseId and delegate snapshot TTL to the Fluss cluster.

public TieringSourceEnumeratorState() {}

@Override
public boolean equals(Object that) {
if (this == that) {
return true;
}
if (that != null) {
return this.toString().equals(that.toString());
}
return false;
}

@Override
public String toString() {
return "SourceEnumeratorState{}";
}
public TieringSourceEnumeratorState() {}
}
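For context on the review comment above: under a pure server-side TTL the job would keep no lease id at all and never call releaseSnapshots; the cluster alone would reclaim snapshot references once a lease's TTL lapses without renewal. An entirely hypothetical sketch of such a sweep, not Fluss code:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class LeaseTtlSweepSketch {
    // leaseId -> last renewal timestamp, maintained by the (hypothetical) server.
    private final Map<String, Long> lastRenewalMsByLeaseId = new ConcurrentHashMap<>();
    private final long ttlMs;

    LeaseTtlSweepSketch(long ttlMs) {
        this.ttlMs = ttlMs;
    }

    void onAcquireOrRenew(String leaseId, long nowMs) {
        lastRenewalMsByLeaseId.put(leaseId, nowMs);
    }

    // Periodic sweep: every lease whose TTL lapsed without renewal is dropped,
    // releasing the snapshot references it pinned.
    void sweep(long nowMs) {
        lastRenewalMsByLeaseId.entrySet().removeIf(e -> nowMs - e.getValue() > ttlMs);
    }
}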