
[client] Refactor RemoteLogDownloader to use chunked file append instead of downloading whole log file #3263

Open
swuferhong wants to merge 1 commit into apache:main from swuferhong:remote-log-download-slice

Conversation

@swuferhong
Contributor

Purpose

Linked issue: close #3262

Refactor RemoteLogDownloader from whole-file download to chunked streaming I/O. This significantly reduces time-to-first-byte latency, saves bandwidth when consumers stop mid-segment, and adds fine-grained flow control at the chunk level.
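The idea can be sketched as follows (a minimal illustration of chunked append, not the actual Fluss code; all names here are made up):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;

// Illustrative sketch only: consume a "remote" stream in fixed-size chunks,
// appending each chunk to the local sink as it arrives instead of waiting
// for the whole file to finish downloading.
public class ChunkedAppendSketch {

    // Reads up to chunkSize bytes per iteration and appends them to the sink.
    // Returns the number of chunks written.
    static int downloadInChunks(InputStream remote, ByteArrayOutputStream localSink, int chunkSize)
            throws IOException {
        byte[] buf = new byte[chunkSize];
        int chunks = 0;
        int n;
        while ((n = remote.read(buf)) != -1) {
            localSink.write(buf, 0, n); // this chunk is usable by the consumer immediately
            chunks++;
        }
        return chunks;
    }

    public static void main(String[] args) throws IOException {
        byte[] segment = new byte[20];
        ByteArrayOutputStream local = new ByteArrayOutputStream();
        int chunks = downloadInChunks(new ByteArrayInputStream(segment), local, 8);
        System.out.println(chunks); // 20 bytes with 8-byte chunks -> 3 chunks
    }
}
```

Because the first chunk is available as soon as it is read, time-to-first-byte no longer depends on the total segment size, and a consumer that stops mid-segment never pays for the remaining chunks.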

Brief change log

Tests

API and Format

Documentation

Member

@fresh-borzoni fresh-borzoni left a comment


@swuferhong Thanks for the PR, that's a big improvement!

Now I want to backport it to the Rust part as well, so I left some comments.
I hope you find them useful, PTAL

/**
* Tests chunked download with a small chunk size, verifying that: 1. A large segment is split
* into multiple chunks. 2. Chunks can be consumed while subsequent chunks are still being
* downloaded (边读边下载). 3. Flow control (maxPrefetchChunks) pauses downloading when unconsumed


nit: mix of languages here

* Request to read a remote log segment in chunks starting from the given position. This method
* is non-blocking and returns a future for the first chunk.
*/
public RemoteLogDownloadFuture requestRemoteLog(


I think we introduced a race:
requestRemoteLog() adds the request to segmentsToFetch and returns. The caller then installs the next-chunk callback on the returned future. But the download thread is already running, so it can poll the request, read chunk 1, and call tryScheduleNextChunk(), which builds chunk 2's future by copying request.downloadFuture.getNextChunkCallback(), still null at that moment. Chunk 2 is read and completed, but nothing is listening, so the bucket silently stops at chunk 1.
Probably we can install the callback before publishing the request.
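The suggested fix could look roughly like this (a hypothetical sketch; Request, segmentsToFetch, and the callback field only approximate the PR's types):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

// Illustrative sketch of the suggested fix: fully initialize the request,
// including its next-chunk callback, BEFORE publishing it to the queue the
// download thread polls, so the callback can never be observed as null.
public class PublishOrderSketch {

    static class Request {
        final CompletableFuture<byte[]> firstChunk = new CompletableFuture<>();
        volatile Consumer<byte[]> nextChunkCallback; // installed before publish
    }

    final ConcurrentLinkedQueue<Request> segmentsToFetch = new ConcurrentLinkedQueue<>();

    Request requestRemoteLog(Consumer<byte[]> onNextChunk) {
        Request request = new Request();
        // 1. Install the callback first ...
        request.nextChunkCallback = onNextChunk;
        // 2. ... then publish; the download thread may poll the request
        //    immediately after this line, but the callback is already visible.
        segmentsToFetch.add(request);
        return request;
    }
}
```

The key property is the happens-before edge: the add to the concurrent queue publishes the fully constructed request, so any thread that polls it sees the callback.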


private final CompletableFuture<LogRecords> chunkFuture;
private final Runnable recycleCallback;
private Consumer<RemoteLogDownloadFuture> nextChunkCallback;


volatile?

}

@Override
public void close() throws IOException {


A request that's been polled and partially read but is paused is in neither segmentsToFetch nor continuationQueue.

close() only walks those two queues, so the open FSDataInputStream and the remote-chunk tmp file are left behind.
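One possible shape of the fix, sketched with hypothetical names (the real RemoteLogDownloader holds different types; the point is only that the in-flight request needs its own slot that close() also drains):

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicReference;

// Illustrative sketch: besides draining the two queues, close() also releases
// the request currently held by the download thread, which lives in neither
// queue while it is polled, partially read, or paused.
public class CloseAllSketch implements Closeable {

    final ConcurrentLinkedQueue<Closeable> segmentsToFetch = new ConcurrentLinkedQueue<>();
    final ConcurrentLinkedQueue<Closeable> continuationQueue = new ConcurrentLinkedQueue<>();
    // The partially-read, paused request is tracked here by the download thread.
    final AtomicReference<Closeable> inFlight = new AtomicReference<>();

    @Override
    public void close() throws IOException {
        for (Closeable c; (c = segmentsToFetch.poll()) != null; ) c.close();
        for (Closeable c; (c = continuationQueue.poll()) != null; ) c.close();
        Closeable current = inFlight.getAndSet(null);
        if (current != null) {
            current.close(); // closes the open input stream / removes the tmp file
        }
    }
}
```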

+ "A larger chunk size reduces the number of remote I/O requests but "
+ "increases memory usage per chunk read. The default setting is 8MB.");

public static final ConfigOption<Integer> CLIENT_SCANNER_REMOTE_LOG_MAX_PREFETCH_CHUNKS =


I wonder what will happen if it's 0?
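If a prefetch budget of 0 would mean no chunk can ever be scheduled, a minimal up-front guard could look like this (the semantics and the lower bound are assumptions, not taken from the PR):

```java
// Illustrative guard: reject a prefetch budget that could stall the download
// loop before it starts. Whether 0 should be rejected or mean "no prefetch"
// is exactly the question the reviewer raises.
public class PrefetchConfigSketch {
    static int validateMaxPrefetchChunks(int value) {
        if (value < 1) {
            throw new IllegalArgumentException(
                    "max prefetch chunks must be >= 1, got: " + value);
        }
        return value;
    }
}
```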

long startTime = System.currentTimeMillis();
// download the remote file to local
remoteFileDownloader
.downloadFileAsync(fsPathAndFileName, localLogDir)


The old fetchOnce dispatched the download to RemoteFileDownloader's thread pool (default 3) and returned immediately, so multiple segments could be downloading at once. The new processChunkRead does the fs.open + read inline on the dispatcher, so only one chunk is ever in flight.

this looks like it serializes remote reads; is it intentional, or am I missing something?
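The old behavior the comment describes could be restored along these lines (illustrative only; the pool size of 3 mirrors the stated RemoteFileDownloader default, everything else is made up):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Illustrative sketch of the reviewer's point: dispatch each chunk read to a
// bounded pool, as the old whole-file path did, so several segments can be in
// flight at once instead of reading inline on the single dispatcher thread.
public class ParallelChunkReadSketch {
    static final ExecutorService DOWNLOAD_POOL = Executors.newFixedThreadPool(3);

    static CompletableFuture<Integer> readChunkAsync(int chunkIndex) {
        // The real code would fs.open() and read the chunk here; this sketch
        // just returns the index to show the dispatch pattern.
        return CompletableFuture.supplyAsync(() -> chunkIndex, DOWNLOAD_POOL);
    }
}
```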

// If toCompletedFetch() fails (e.g. the underlying chunk
// future completed exceptionally), discard this entry so
// the queue is not blocked. The bucket will become fetchable
// again and the server can re-issue a remote fetch.

@fresh-borzoni fresh-borzoni May 13, 2026


nit: I'm not sure: what if the issue is permanent? It would be an infinite loop with only a warning. Should we have retry with backoff and then surface an error in the scanner?
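The suggested retry-with-backoff could be sketched like this (hypothetical helper; the attempt count and backoff values are placeholders):

```java
import java.util.function.Supplier;

// Illustrative sketch: retry a transient failure with exponential backoff,
// then rethrow so the error surfaces to the caller (e.g. the scanner)
// instead of looping forever with only a warning.
public class RetryBackoffSketch {
    static <T> T retryWithBackoff(Supplier<T> op, int maxAttempts, long baseBackoffMs)
            throws InterruptedException {
        RuntimeException last = null;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            try {
                return op.get();
            } catch (RuntimeException e) {
                last = e;
                Thread.sleep(baseBackoffMs << attempt); // 1x, 2x, 4x, ...
            }
        }
        throw last; // permanent failure: surface it instead of retrying forever
    }
}
```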


TableBucket tb = new TableBucket(DATA1_TABLE_ID, 0);
// Build a large segment with multiple records so multiple chunks are produced.
List<RemoteLogSegment> remoteLogSegments =


It doesn't look big, or at least not bigger than the 8MB chunk size:

public static final List<Object[]> DATA1 =
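A test-data builder that guarantees the segment spans multiple chunks might look like this (a sketch with made-up names; the byte budget would need to exceed the configured chunk size, e.g. more than 8MB):

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch: build enough records to exceed a target byte budget,
// so a "multiple chunks" test actually produces multiple chunks rather than
// fitting inside the first one.
public class LargeSegmentDataSketch {
    static List<byte[]> buildRecords(long targetBytes, int recordSize) {
        List<byte[]> records = new ArrayList<>();
        long total = 0;
        while (total < targetBytes) {
            records.add(new byte[recordSize]);
            total += recordSize;
        }
        return records;
    }
}
```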

Development

Successfully merging this pull request may close these issues.

[client] Refactor RemoteLogDownloader to use chunked streaming instead of downloading whole segment file
