[client] Refactor RemoteLogDownloader to use chunked file append instead of downloading whole log file#3263
Conversation
fresh-borzoni
left a comment
@swuferhong Ty for the PR, that's a big improvement!
Now I want to backport it to the Rust part as well, so I left some comments.
I hope you find them useful, PTAL
| /** | ||
| * Tests chunked download with a small chunk size, verifying that: 1. A large segment is split | ||
| * into multiple chunks. 2. Chunks can be consumed while subsequent chunks are still being | ||
| * downloaded (边读边下载, i.e. read while downloading). 3. Flow control (maxPrefetchChunks) pauses downloading when unconsumed |
nit: mix of languages here
| * Request to read a remote log segment in chunks starting from the given position. This method | ||
| * is non-blocking and returns a future for the first chunk. | ||
| */ | ||
| public RemoteLogDownloadFuture requestRemoteLog( |
I think we introduced a race:
requestRemoteLog() adds the request to segmentsToFetch and returns. The caller then installs the next-chunk callback on the returned future. But the download thread is already running, so it can poll the request, read chunk 1, and call tryScheduleNextChunk(), which builds chunk 2's future by copying request.downloadFuture.getNextChunkCallback(), which is still null at that moment. Chunk 2 is read and completed, but nothing is listening, so the bucket silently stops at chunk 1.
We could probably install the callback before publishing the request.
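A minimal sketch of that ordering, with hypothetical simplified names (`DownloadFutureSketch`, `DownloaderSketch` stand in for the real classes): the callback is supplied to `requestRemoteLog` and stored on the future before the request is enqueued, so the download thread can never observe a null callback:

```java
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

// Hypothetical simplified stand-in for RemoteLogDownloadFuture.
class DownloadFutureSketch {
    final CompletableFuture<String> chunkFuture = new CompletableFuture<>();
    volatile Consumer<DownloadFutureSketch> nextChunkCallback;
}

// Hypothetical simplified stand-in for RemoteLogDownloader.
class DownloaderSketch {
    final Queue<DownloadFutureSketch> segmentsToFetch = new ConcurrentLinkedQueue<>();

    // The caller passes the callback in, so it is installed BEFORE the
    // request becomes visible to the download thread.
    DownloadFutureSketch requestRemoteLog(Consumer<DownloadFutureSketch> onChunkReady) {
        DownloadFutureSketch future = new DownloadFutureSketch();
        future.nextChunkCallback = onChunkReady; // installed first
        segmentsToFetch.add(future);             // then published
        return future;
    }
}
```

With this shape there is no window in which the request is pollable but the callback is missing.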
| private final CompletableFuture<LogRecords> chunkFuture; | ||
| private final Runnable recycleCallback; | ||
| private Consumer<RemoteLogDownloadFuture> nextChunkCallback; |
| } | ||
| @Override | ||
| public void close() throws IOException { |
A request that's been polled and partially read but is paused is in neither segmentsToFetch nor continuationQueue.
close() only walks those two queues, so the open FSDataInputStream and the remote-chunk tmp file are left behind.
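One possible fix, sketched with hypothetical names: keep every live request in a single registry, and let close() walk that registry instead of only the two queues, so paused requests are released as well:

```java
import java.io.Closeable;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for a pending download request; close() would
// close the open input stream and delete the remote-chunk tmp file.
class PendingRequest implements Closeable {
    boolean closed;
    @Override public void close() { closed = true; }
}

// Hypothetical simplified downloader: every request is registered on
// creation and stays registered until finished, whatever queue it is in.
class DownloaderCloseSketch implements Closeable {
    final Set<PendingRequest> liveRequests = ConcurrentHashMap.newKeySet();

    PendingRequest register() {
        PendingRequest r = new PendingRequest();
        liveRequests.add(r);
        return r;
    }

    @Override
    public void close() {
        // Walk the registry, not just segmentsToFetch/continuationQueue,
        // so a polled-but-paused request is also released.
        for (PendingRequest r : liveRequests) {
            r.close();
        }
        liveRequests.clear();
    }
}
```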
| + "A larger chunk size reduces the number of remote I/O requests but " | ||
| + "increases memory usage per chunk read. The default setting is 8MB."); | ||
| public static final ConfigOption<Integer> CLIENT_SCANNER_REMOTE_LOG_MAX_PREFETCH_CHUNKS = |
I wonder what will happen if it's 0?
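If 0 would make the downloader silently never prefetch (or stall), one option is to reject it at configuration time. A hedged sketch, assuming the option key shown here matches the real config name only approximately:

```java
// Hypothetical guard: fail fast on a non-positive prefetch limit instead
// of letting the downloader silently stop scheduling chunks.
final class PrefetchConfigCheck {
    static int validateMaxPrefetchChunks(int maxPrefetchChunks) {
        if (maxPrefetchChunks < 1) {
            throw new IllegalArgumentException(
                    "max-prefetch-chunks must be >= 1, got " + maxPrefetchChunks);
        }
        return maxPrefetchChunks;
    }
}
```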
| long startTime = System.currentTimeMillis(); | ||
| // download the remote file to local | ||
| remoteFileDownloader | ||
| .downloadFileAsync(fsPathAndFileName, localLogDir) |
The old fetchOnce dispatched the download to RemoteFileDownloader's thread pool (default 3) and returned immediately, so multiple segments could be downloading at once. The new processChunkRead does the fs.open + read inline on the dispatcher, so only one chunk is ever in flight.
This looks like it serializes remote reads; is it intentional, or am I missing something?
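If it is not intentional, the blocking open + read could be handed off to a small I/O pool, mirroring the old behavior where RemoteFileDownloader's pool (default 3 threads) let several segments download concurrently. A sketch with hypothetical names:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

// Hypothetical dispatcher: the blocking fs.open + read runs on a small
// fixed pool, so chunk reads for different segments can overlap instead
// of serializing on the single dispatcher thread.
class ChunkReadDispatcher {
    private final ExecutorService ioPool = Executors.newFixedThreadPool(3);

    <T> CompletableFuture<T> submitChunkRead(Supplier<T> blockingRead) {
        return CompletableFuture.supplyAsync(blockingRead, ioPool);
    }

    void shutdown() {
        ioPool.shutdown();
    }
}
```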
| // If toCompletedFetch() fails (e.g. the underlying chunk | ||
| // future completed exceptionally), discard this entry so | ||
| // the queue is not blocked. The bucket will become fetchable | ||
| // again and the server can re-issue a remote fetch. |
nit: I'm not sure: what if the issue is permanent? It would be an infinite loop with only a warning. Should we retry with backoff and then surface an error in the scanner?
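The retry-with-backoff idea could look roughly like this (a sketch, not the project's code; names are hypothetical): bounded attempts with exponential backoff, and the last exception surfaced to the caller instead of looping forever:

```java
import java.util.function.Supplier;

final class FetchRetry {
    // Hypothetical helper: retry a transient failure with exponential
    // backoff (1x, 2x, 4x, ...), then rethrow so the scanner sees the
    // error instead of an endless warn loop.
    static <T> T fetchWithBackoff(Supplier<T> fetch, int maxAttempts, long baseBackoffMs)
            throws InterruptedException {
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return fetch.get();
            } catch (RuntimeException e) {
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(baseBackoffMs << (attempt - 1));
                }
            }
        }
        throw last; // permanent failure surfaces to the caller
    }
}
```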
| TableBucket tb = new TableBucket(DATA1_TABLE_ID, 0); | ||
| // Build a large segment with multiple records so multiple chunks are produced. | ||
| List<RemoteLogSegment> remoteLogSegments = |
It doesn't look big, or at least not larger than the 8MB chunk size:
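To make the test deterministic, the record count could be derived from the chunk size rather than hard-coded. A small hypothetical helper (names are illustrative, not from the PR):

```java
final class SegmentSizing {
    // Hypothetical helper: how many fixed-size records are needed so the
    // segment strictly exceeds minChunks full chunks, guaranteeing the
    // chunked path produces multiple chunks.
    static int recordsNeeded(long chunkSizeBytes, int recordSizeBytes, int minChunks) {
        long targetBytes = chunkSizeBytes * minChunks + 1;
        // ceiling division
        return (int) ((targetBytes + recordSizeBytes - 1) / recordSizeBytes);
    }
}
```

For an 8MB chunk and 1KiB records, spanning at least two chunks needs 16385 records.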
Purpose
Linked issue: close #3262
Refactor RemoteLogDownloader from whole-file download to chunked streaming I/O. This significantly reduces time-to-first-byte latency, saves bandwidth when consumers stop mid-segment, and adds fine-grained flow control at the chunk level.
Brief change log
Tests
API and Format
Documentation