[server] Optimize RemoteLogFetcher with async prefetch for recovery#3132

Open
Kaixuan-Duan wants to merge 8 commits into apache:main from Kaixuan-Duan:remote-log-fetcher-prefetch

Conversation

@Kaixuan-Duan (Contributor) commented Apr 19, 2026

Purpose

Linked issue: close #3091
This PR improves KV recovery performance by reducing wait time between remote log segments in RemoteLogFetcher.

Brief change log

  • Async prefetch window in RemoteLogFetcher: added a sliding window that downloads the next N remote log segments in the background while the consumer reads the current one, replacing the previous strictly-serial download → open → read → next loop.
  • Bounded sliding-window prefetch (default depth 4, configurable via kv.recover.remote-log.prefetch-num):
    uses a Future[] ring buffer indexed by seq % prefetchNum to cap downloaded-but-not-consumed segments, bounding local-disk usage. A dedicated fixed-thread-pool (default 3, configurable via kv.recover.remote-log.download-threads) runs the actual downloads in parallel. As the consumer advances, consumed slots are freed and back-filled, overlapping network I/O with local iteration.
  • Exponential backoff retry: failed downloads retry up to 5 times with backoff 100ms → 5s and 0.25 jitter (ExponentialBackoff), preventing thrash on transient remote-storage failures while still surfacing persistent failures to the recovery driver.
  • Failure handling: when a prefetched future is cancelled (e.g. by close()), fetchSegmentFile() falls back to a synchronous downloadSegmentWithRetry so a single cancelled task does not stall recovery. When a prefetched future fails with ExecutionException — meaning the async worker has already exhausted the exponential-backoff retry budget — the failure is propagated directly as IOException and the prefetch window is drained. No redundant sync retry round is attempted.
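The sliding-window bookkeeping described above can be sketched roughly as follows. Class and method names here are illustrative, not the PR's actual API; only the seq % prefetchNum indexing and the window bound come from the change log.

```java
// Illustrative sketch of the bounded sliding-window bookkeeping.
class PrefetchWindow {
    private final int prefetchNum;  // kv.recover.remote-log.prefetch-num
    private long nextConsumeIndex;  // oldest downloaded-but-not-consumed segment
    private long nextDownloadIndex; // next segment sequence to hand to the pool

    PrefetchWindow(int prefetchNum) {
        this.prefetchNum = prefetchNum;
    }

    /** Ring-buffer slot for a segment sequence number. */
    int slotFor(long seq) {
        return (int) (seq % prefetchNum);
    }

    /** A new background download may start only while the window is not full. */
    boolean canPrefetch() {
        return nextDownloadIndex - nextConsumeIndex < prefetchNum;
    }

    void downloadStarted() {
        nextDownloadIndex++;
    }

    /** Consuming a segment frees its slot, letting the next download back-fill it. */
    void segmentConsumed() {
        nextConsumeIndex++;
    }
}
```

With the default depth of 4, at most four segments can sit on local disk at once, and each consumed segment immediately frees a slot for the next download.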

Tests

  • ./mvnw -pl fluss-server -am clean install -DskipTests
  • ./mvnw -pl fluss-server spotless:apply -Dtest='RemoteLogFetcherTest' -DfailIfNoTests=false test

API and Format

  • kv.recover.remote-log.prefetch-num int (default 4) : Max remote log segments downloaded but not yet consumed during KV recovery. 1 preserves the legacy one-step-prefetch behavior.
  • kv.recover.remote-log.download-threads (default 3) : Threads used to download remote log segments during KV recovery. Should be ≤ prefetch-num.
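For reference, a hypothetical server configuration fragment using the new keys might look like this (values shown are the defaults):

```yaml
# Keys introduced by this PR; values are the documented defaults.
kv.recover.remote-log.prefetch-num: 4
kv.recover.remote-log.download-threads: 3
```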

No breaking changes to user-facing public APIs.

Documentation

Added kv.recover.remote-log.prefetch-num and kv.recover.remote-log.download-threads to website/docs/maintenance/configuration.md.

@fresh-borzoni (Member) left a comment:

@Kaixuan-Duan Thanks for the contribution. I will help to review this PR

One process point: this issue was already assigned and I was actively working on it. In that situation, please coordinate on the issue before opening an overlapping PR. Assignment is not exclusive ownership, but it is an important coordination signal, and skipping it usually leads to duplicated effort and fragmented review.

We can evaluate this PR on its merits, but for future cases please check on the issue first.

@fresh-borzoni (Member) left a comment:

Ty, the direction is right. I left some comments, PTAL


private void cancelPrefetch() {
if (nextDownloadedSegmentFuture != null) {
nextDownloadedSegmentFuture.cancel(true);
Member:
cancel(true) on an already-completed future is a no-op and drops the reference to the downloaded File, which then lives in tempDir until fetcher-level close()

Contributor (Author):
Thanks — I have updated the logic to handle this explicitly.

activeIterator = null;
}
} finally {
downloadExecutor.shutdownNow();
Member:
shutdownNow() doesn't wait - if a prefetch is mid-flush, it can write to tempDir after deleteDirectoryQuietly runs. Either call downloadExecutor.awaitTermination() with a short timeout before deletion, or make downloadSegment interruption-aware (most S3 SDKs don't honor Thread.isInterrupted() during socket reads, so the interrupt from shutdownNow is effectively decorative)

Contributor (Author):
Thanks for pointing this out. I updated close() to call awaitTermination() after shutdownNow() before deleting the temp directory. If the download executor does not terminate within the timeout, the cleanup is skipped to avoid racing with an in-progress download that may still write into tempDir.
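That close() sequence can be sketched as below; the method and helper names are illustrative, only the shutdownNow → awaitTermination → conditional-cleanup ordering comes from the reply.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

class ShutdownSketch {
    /**
     * Stop the download pool, wait briefly for in-flight workers, and only
     * delete the temp directory once no worker can still write into it.
     * Returns whether the pool terminated in time (and cleanup ran).
     */
    static boolean shutdownAndMaybeClean(
            ExecutorService downloadExecutor, Runnable deleteTempDir)
            throws InterruptedException {
        downloadExecutor.shutdownNow(); // best-effort interrupt of in-flight downloads
        boolean terminated = downloadExecutor.awaitTermination(5, TimeUnit.SECONDS);
        if (terminated) {
            deleteTempDir.run(); // safe: no worker is left to race the deletion
        }
        return terminated;
    }
}
```

A later review round relaxes this further (best-effort cleanup of the fetcher's own directory regardless of termination), but the ordering above is the core of the race fix.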

}

@Override
public boolean hasNext() {
Member:
If fetch() is called twice, the first Iterable still wraps the now-closed iterator and iterating it re-enters advance() on a closed instance, downloading into the shared tempDir, racing with the new iterator

Contributor (Author):
I updated the closed iterator to stop immediately by marking it finished, clearing the pending batch, and making hasNext() return false after close.

}

private File fetchSegmentFile(RemoteLogSegment segment) throws IOException {
if (segment.equals(prefetchedSegment) && nextDownloadedSegmentFuture != null) {
Member:
This depends on RemoteLogSegment having value-based equals(), or on both references coming from the same segments list (reference equality). Works today, but safer to compare by segment id tbh.

Contributor (Author):
I changed the prefetch match to compare remoteLogSegmentId() instead of relying on RemoteLogSegment.equals().

if (segment.equals(prefetchedSegment) && nextDownloadedSegmentFuture != null) {
try {
return nextDownloadedSegmentFuture.get();
} catch (InterruptedException e) {
Member:
Also catch CancellationException - it's unchecked (extends RuntimeException) and CompletableFuture.get() throws it on a cancelled future. Not a live bug in the current state machine (every cancelPrefetch nulls the field) but cheap defense-in-depth, especially given closed is volatile.

Contributor (Author):
I added this as a defensive guard: if the prefetched future was cancelled and get() throws CancellationException, fetchSegmentFile() now falls back to a synchronous download.

@@ -28,10 +28,13 @@
import org.junit.jupiter.api.Test;
Member:
non-blocking: Two of the three new tests inject state via reflection (setPrivateField) instead of exercising a real async prefetch - they cover the branches in fetchSegmentFile, but not close-during-real-in-flight-download or the orphan-file cleanup.

Consider one integration-style test with a real slow/failing download source.

Contributor (Author):
Ty, addressed the comments. PTAL

"Prefetched segment {} failed, fallback to sync download.",
segment.remoteLogSegmentId(),
e.getCause());
return downloadSegment(segment);
Member:
Non-blocking: No retry on transient S3 failure - one flaky segment fails the entire recovery. In fluss-rust we added exponential backoff (100ms -> 5s with jitter) for this.

Contributor (Author):
Thanks. The retry design in fluss-rust is solid, so I followed the same idea here and added retries around synchronous segment downloads with exponential backoff and jitter, so transient remote storage failures are retried before failing the recovery.
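The backoff schedule mentioned (100ms → 5s) can be sketched as below. This is a deterministic illustration only: jitter is omitted, and the actual PR uses Fluss's ExponentialBackoff utility with 0.25 jitter rather than this hypothetical helper.

```java
class BackoffSketch {
    /** Exponential delay for the given attempt, doubling from initialMs, capped at maxMs. */
    static long backoffMs(int attempt, long initialMs, long maxMs) {
        long base = initialMs << Math.min(attempt, 30); // 100, 200, 400, ...
        return Math.min(base, maxMs);
    }
}
```

In practice a small random jitter (here 0.25) is added on top of each delay so that many recovering buckets do not hammer remote storage in lockstep.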

return downloadSegment(segment);
}

private void prefetchNextSegment() {
Member:
Prefetch depth is hardcoded to 1. If the S3 p99 download time exceeds the consume time for a segment, the downloader sits idle and the optimization is only half-realized. On the Rust side (fluss-rust #187) we landed on a configurable depth with default 4 for exactly this reason. Since this is KV recovery, depth = 1 might be fine, but it's still better to make it configurable and reason about it explicitly.

Contributor (Author):
You're right, I optimized the prefetch depth, PTAL.

@Kaixuan-Duan (Contributor, Author) commented Apr 24, 2026:

@fresh-borzoni Thanks for the review, and sorry for not coordinating on the issue beforehand. I didn’t realize it was already being actively worked on.
The testing code and prefetch depth optimization are still progressing, and I will learn the implementation approach of fluss-rust #187.

@Kaixuan-Duan (Contributor, Author):

@fresh-borzoni Thank you for the review. Addressed, PTAL.

@Kaixuan-Duan Kaixuan-Duan deleted the remote-log-fetcher-prefetch branch April 29, 2026 02:28
@Kaixuan-Duan Kaixuan-Duan restored the remote-log-fetcher-prefetch branch April 29, 2026 02:29
@Kaixuan-Duan Kaixuan-Duan reopened this Apr 29, 2026
@fresh-borzoni (Member) left a comment:

@Kaixuan-Duan Ty for the changes, left some comments, PTAL


final RemoteLogSegment target = segment;
CompletableFuture<File> future;
try {
Member:
The async download calls downloadSegment(target), while retry only exists in the sync fallback. So a transient remote-storage failure in the prefetch worker isn't retried in the background - instead the consumer later hits the failed future and runs the retry loop synchronously. This pushes retry latency back onto the recovery thread and largely defeats the point of prefetching. The async worker should call downloadSegmentWithRetry(target).

Contributor (Author):
The async worker now calls downloadSegmentWithRetry(target), so transient failures are retried in the worker thread instead of falling back to the recovery thread.

// no local file to clean up
}
} else {
future.cancel(true);
Member:
CompletableFuture.cancel(true) flips the future state to CANCELLED but does not propagate an interrupt to the underlying supplyAsync task. The worker keeps running, finishes the download, and writes the file to disk after we thought we cancelled. Even when cancel(true) returns true, the task may still complete. The isDone() check added here only handles the "already-completed when drained" case.

Switch to executor.submit() and store the Future in PrefetchEntry; then FutureTask.cancel(true) actually interrupts the worker.

Contributor (Author):
Replaced supplyAsync with downloadExecutor.submit(Callable) and store the Future in the slot array. cancel(true) on the FutureTask now actually interrupts the worker.
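The difference is observable directly: with executor.submit(), cancel(true) delivers a real interrupt to the worker thread, which CompletableFuture.supplyAsync does not. A small self-contained demo (not PR code) of the submitted-task behavior:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class CancelDemo {
    /** Returns true iff cancel(true) actually interrupted the running worker. */
    static boolean cancelInterruptsWorker() throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            CountDownLatch started = new CountDownLatch(1);
            CountDownLatch interrupted = new CountDownLatch(1);
            Future<?> f = pool.submit(() -> {
                started.countDown();
                try {
                    Thread.sleep(60_000); // stands in for a long blocking download
                } catch (InterruptedException e) {
                    interrupted.countDown(); // the interrupt really reached the worker
                }
            });
            started.await();
            f.cancel(true); // FutureTask.cancel(true) interrupts the worker thread
            return interrupted.await(5, TimeUnit.SECONDS);
        } finally {
            pool.shutdownNow();
        }
    }
}
```

Run the same experiment with supplyAsync and the sleep keeps running to completion even though the future reports CANCELLED.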

// exceptionally (an interrupted Thread.sleep surfaces as InterruptedException).
for (CompletableFuture<File> f : inflight) {
assertThat(f.isDone()).isTrue();
assertThat(f.isCancelled() || f.isCompletedExceptionally()).isTrue();
Member:
f.isCancelled() is just API state and says nothing about whether the download stopped. After the submit/Future switch, rewrite using something like PR #2786's BlockingFileDownloader + latch pattern to assert real interruption.

Contributor (Author):
Done. Added testCloseInterruptsOngoingDownload using PR #2786's barrier+latch pattern. The release latch is never counted down by the test, so the only way finished.countDown() can fire is if Thread.interrupt() reached the worker inside the barrier. assertThat(finished.await(10s)).isTrue() is the actual interruption proof — isCancelled() is kept only as a secondary API-state check.

* <p><b>Note:</b> This class is NOT thread-safe. Each instance should be used by a single thread
* only.
*/
@NotThreadSafe
Member:
@NotThreadSafe plus volatile plus async workers is a contradiction. KV recovery is single-threaded per bucket in practice, so we should prefer strictly single-threaded usage here. Also, with this hybrid design I believe there is a possible race condition on the ArrayDeque.

WDYT?

Contributor (Author):
Agreed. Removed volatile/synchronized/Concurrent*/Semaphore/ArrayDeque. The consumer is now strictly single-threaded; the only cross-thread handoff is the per-slot Future, whose visibility is guaranteed by Future.get().

* respectively. When a prefetched segment is consumed, its permit is released and the fetcher
* immediately tries to start the next download, giving the window the behavior of Rust's RAII
* {@code PrefetchPermit} + recycle-notify pattern in <a
* href="https://github.com/apache/fluss-rust/pull/187">fluss-rust PR#187</a>.
@fresh-borzoni (Member) commented Apr 29, 2026:

I wouldn't reference fluss-rust here - cross-repo Javadoc links go stale (especially because fluss-rust is planned to be ported into this repo), and the description should stand on its own.

Also: this pattern shines in async Rust where awaiting tasks need explicit notification on permit release (and RAII gives you deterministic permit drop). Java's sync consumer makes the permit-recycle machinery redundant - a Future[] indexed by seq % depth is fine as well.

Not a blocker, just worth considering.

Contributor (Author):
You're right.
Removed the fluss-rust reference and RAII wording from Javadoc. Window is now a fixed Future[] prefetchSlots indexed by seq % prefetchNum.

@Kaixuan-Duan (Contributor, Author):

@fresh-borzoni Thank you for the review. I have already modified it. PTAL

@fresh-borzoni (Member) left a comment:

@Kaixuan-Duan Ty for the fixes, overall LGTM, left a couple of comments, PTAL

@swuferhong Can you take a look as well?

}

@Test
void testPrefetchFailureFallsBackToSyncDownload() throws Exception {
Member:
Shall we use the fetchLogDataBarrier hook here as well?

Contributor (Author):
Fixed, thanks for the feedback.

}

@Test
void testCloseCancelsPendingPrefetchFuture() throws Exception {
Member:
ditto

}

@Test
void testCloseCancelsAllPendingPrefetchFutures() throws Exception {
Member:
ditto

}
}

// Defensive fallback: this branch is logically unreachable in production.
Member:
Once remaining tests migrate to the fetchLogDataBarrier pattern, this defensive branch and injectPrefetchEntryForTest can both be deleted:

Contributor (Author):
Fixed, thanks for the feedback.

return;
}
} else {
future.cancel(true);
Member:
There is a race: future completes between our isDone() check and cancel(true).
Cancel returns false, but we ignore the return - the file's already on disk and we never call get() to retrieve it.

Fix: if cancel returns false, we need to call cleanupUnusedPrefetchedFile(future.get()), following much the same pattern as above.

Contributor (Author):
Done — extracted a cleanupCompletedFuture(Future<File>) helper, used by both the isDone() branch and the !cancel(true) fallback. No more silent leaks when the task completes between the two checks.
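A sketch of the resulting drain helper; the names are hypothetical, and only the isDone() / !cancel(true) double-check comes from the review thread:

```java
import java.io.File;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.Consumer;

class DrainSketch {
    /** Drain one prefetch slot on close, reclaiming any file the task produced. */
    static void drainSlot(Future<File> future, Consumer<File> cleanupUnusedPrefetchedFile) {
        try {
            if (future.isDone() || !future.cancel(true)) {
                // Either the task was already finished, or it finished between the
                // isDone() check and cancel(): retrieve the file and delete it
                // instead of leaking it in tempDir.
                cleanupUnusedPrefetchedFile.accept(future.get());
            }
        } catch (CancellationException | ExecutionException e) {
            // cancelled in time, or the download itself failed: nothing on disk to reclaim
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

The key point is that the !cancel(true) branch falls through to the same get()-and-cleanup path, so the file is reclaimed in both orderings of the race.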

throw new IOException(
"Failed to download remote log segment: " + segment.remoteLogSegmentId(), e);
} finally {
if (!success) {
Member:
Let's tighten this: add || Thread.currentThread().isInterrupted(). Most network/file reads don't honor interrupts mid-syscall, so the interrupt flag is worth checking explicitly here.

Contributor (Author):
Done — added || Thread.currentThread().isInterrupted() to the finally. Drops partial/interrupted temp files that IOUtils.copyBytes would otherwise return as success=true.

@Kaixuan-Duan (Contributor, Author):

@fresh-borzoni Thank you for the review. Addressed, PTAL

@Kaixuan-Duan (Contributor, Author):

@fresh-borzoni @swuferhong Hi, could you please help review this PR when you have time?
Thanks!

@fresh-borzoni (Member) left a comment:

@Kaixuan-Duan Sorry, I missed this, Ty for the changes, left some comments, PTAL

}
}

private void closeCurrentFileLogRecords() {
Member:
The new config bounds “downloaded but not yet consumed” segments, but consumed segment files are not deleted when we move to the next segment. closeCurrentFileLogRecords() closes FileLogRecords, but the underlying .log file remains in tempDir until fetcher close.

Could we track the currently opened local segment file and delete it after closing FileLogRecords? Something like currentLocalFile, set after fetchSegmentFile(), then delete it in closeCurrentFileLogRecords().

Contributor (Author):
Thanks for the suggestion. Added currentLocalFile field to track the currently opened segment file; closeCurrentFileLogRecords() now deletes it after closing FileLogRecords.

e);
}

if (terminated) {
Member:
This cleanup path seems risky in two ways.

First, if awaitTermination() times out, we skip cleanup entirely, so one uncooperative download can leave the recovery directory behind.

Second, tempDir.getParent() is the shared tmp/ directory under the log dir, not just this fetcher's recovery directory. That is broader than what this RemoteLogFetcher owns.

Could we delete tempDir itself, not its parent, and attempt that best-effort regardless of whether the executor terminated? If deletion fails because a file is still open, logging that is fine, but I don't think we should skip the cleanup attempt.

Contributor (Author):
Good catch. Removed tempDir.getParent() deletion; now close() always attempts best-effort cleanup of its own UUID subdirectory regardless of executor termination, with a warning log if files are still in use.

// the ring head is *always* the segment advance() is asking for.
int headSlot = nextConsumeIndex % prefetchNum;
RemoteLogSegment headSegment = prefetchSegments[headSlot];
assert isSameSegmentId(segment, headSegment)
Member:
This invariant is important enough that I don't think assert is the right guard. Assertions are normally disabled in production.
Could we make this an explicit runtime check, drain the prefetch window, and throw IOException with the requested/head segment ids?

Contributor (Author):
Agreed. Fixed.
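A sketch of what such an explicit runtime guard might look like (the method name and message fields are illustrative):

```java
import java.io.IOException;
import java.util.UUID;

class InvariantSketch {
    /** Explicit runtime guard; a plain `assert` is disabled unless the JVM runs with -ea. */
    static void checkHeadSegment(UUID requested, UUID head) throws IOException {
        if (!requested.equals(head)) {
            throw new IOException(
                    "Prefetch window out of sync: requested segment " + requested
                            + " but window head is " + head);
        }
    }
}
```

Throwing IOException here lets the recovery driver handle the out-of-sync window like any other fetch failure, instead of silently reading the wrong segment when assertions are off.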

// success=true while the worker has been interrupted by close()/cancel(true).
// Treat "interrupt observed during the copy" as a failure for cleanup purposes
// so we don't leave a stale segment file behind in tempDir.
if (!success || Thread.currentThread().isInterrupted()) {
Member:
In downloadSegment(), if the copy succeeds but the thread has been interrupted, the finally block deletes localFile but the method can still return that File object.

Usually Future.cancel(true) masks this because the Future is cancelled, but interrupt races can still make this path observable. Could we throw after deleting on the interrupted path rather than returning a file that may no longer exist?

Contributor (Author):
Fixed. Added || Thread.currentThread().isInterrupted() to the finally block: if the copy succeeds but the thread was interrupted, the file is deleted and IOException is thrown instead of returning a dangling reference.
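Sketched, the interrupted-path behavior now looks like this (names hypothetical; only the delete-then-throw ordering on the interrupted path comes from the reply):

```java
import java.io.File;
import java.io.IOException;

class FinishSketch {
    /**
     * After the copy: on failure OR an observed interrupt, delete the temp file
     * and throw instead of returning a File reference that may already be gone.
     */
    static File finishDownload(File localFile, boolean success) throws IOException {
        if (!success || Thread.currentThread().isInterrupted()) {
            localFile.delete(); // drop the partial/interrupted temp file
            throw new IOException(
                    "Download of " + localFile + " failed or was interrupted");
        }
        return localFile;
    }
}
```

Callers therefore never see a File object whose backing file was deleted on the interrupted path.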

@fresh-borzoni (Member):

@Kaixuan-Duan Can you rebase as well, pls?

segment.remoteLogSegmentId(),
e);
return downloadSegmentWithRetry(segment);
} catch (ExecutionException e) {
Member:
The sync fallback retry in fetchSegmentFile is now a leftover from when retries lived only in the sync path. Could we drop the catch retry call and just propagate the failure?
The async path already exhausts the retry budget

Contributor (Author):
You're right — the async worker already exhausts retries via downloadSegmentWithRetry(). Removed the redundant sync retry in catch (ExecutionException); now propagates directly as IOException.

@Kaixuan-Duan Kaixuan-Duan force-pushed the remote-log-fetcher-prefetch branch from f66bca7 to c6ef6dd Compare May 12, 2026 05:47
@Kaixuan-Duan (Contributor, Author):

@fresh-borzoni Thank you for the review! All 5 comments addressed.
Also rebased onto latest main. PTAL

@fresh-borzoni (Member) left a comment:

@Kaixuan-Duan Ty for the changes, LGTM.
Can you please, update docs as well, as we introduced new config options?

@Kaixuan-Duan (Contributor, Author):

@fresh-borzoni Thanks for the review! I've updated the docs. PTAL

@fresh-borzoni (Member):

@Kaixuan-Duan Thank you 👍

@loserwang1024 @swuferhong Can you take a quick look when you have time?



Development

Successfully merging this pull request may close these issues.

[server] RemoteLogFetcher optimize to async downloading

2 participants