diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/Scan.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/Scan.java
index 233d2d358d..0ca7b84935 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/Scan.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/Scan.java
@@ -21,6 +21,7 @@
 import org.apache.fluss.client.table.scanner.batch.BatchScanner;
 import org.apache.fluss.client.table.scanner.log.LogScanner;
 import org.apache.fluss.client.table.scanner.log.TypedLogScanner;
+import org.apache.fluss.lake.source.LakeSource;
 import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.predicate.Predicate;
 
@@ -72,6 +73,8 @@
      */
     Scan filter(@Nullable Predicate predicate);
 
+    Scan lakeSource(@Nullable LakeSource lakeSource);
+
     /**
      * Creates a {@link LogScanner} to continuously read log data for this scan.
      *
diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/TableScan.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/TableScan.java
index 6502b7946f..b737fe7ca0 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/TableScan.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/TableScan.java
@@ -30,6 +30,7 @@
 import org.apache.fluss.client.table.scanner.log.TypedLogScannerImpl;
 import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.exception.FlussRuntimeException;
+import org.apache.fluss.lake.source.LakeSource;
 import org.apache.fluss.metadata.LogFormat;
 import org.apache.fluss.metadata.PartitionInfo;
 import org.apache.fluss.metadata.SchemaGetter;
@@ -60,8 +61,11 @@ public class TableScan implements Scan {
     /** The record batch filter to apply. No filter if is null. */
     @Nullable private final Predicate recordBatchFilter;
 
+    /** The lake source to read from as a fallback. No lake source if is null. */
+    @Nullable private final LakeSource lakeSource;
+
     public TableScan(FlussConnection conn, TableInfo tableInfo, SchemaGetter schemaGetter) {
-        this(conn, tableInfo, schemaGetter, null, null, null);
+        this(conn, tableInfo, schemaGetter, null, null, null, null);
     }
 
     private TableScan(
@@ -70,19 +74,27 @@ private TableScan(
             SchemaGetter schemaGetter,
             @Nullable int[] projectedColumns,
             @Nullable Integer limit,
-            @Nullable Predicate recordBatchFilter) {
+            @Nullable Predicate recordBatchFilter,
+            @Nullable LakeSource lakeSource) {
         this.conn = conn;
         this.tableInfo = tableInfo;
         this.projectedColumns = projectedColumns;
         this.limit = limit;
         this.schemaGetter = schemaGetter;
         this.recordBatchFilter = recordBatchFilter;
+        this.lakeSource = lakeSource;
     }
 
     @Override
     public Scan project(@Nullable int[] projectedColumns) {
         return new TableScan(
-                conn, tableInfo, schemaGetter, projectedColumns, limit, recordBatchFilter);
+                conn,
+                tableInfo,
+                schemaGetter,
+                projectedColumns,
+                limit,
+                recordBatchFilter,
+                lakeSource);
     }
 
     @Override
@@ -102,18 +114,37 @@ public Scan project(List<String> projectedColumnNames) {
             columnIndexes[i] = index;
         }
         return new TableScan(
-                conn, tableInfo, schemaGetter, columnIndexes, limit, recordBatchFilter);
+                conn, tableInfo, schemaGetter, columnIndexes, limit, recordBatchFilter, lakeSource);
     }
 
     @Override
     public Scan limit(int rowNumber) {
         return new TableScan(
-                conn, tableInfo, schemaGetter, projectedColumns, rowNumber, recordBatchFilter);
+                conn,
+                tableInfo,
+                schemaGetter,
+                projectedColumns,
+                rowNumber,
+                recordBatchFilter,
+                lakeSource);
     }
 
     @Override
     public Scan filter(@Nullable Predicate predicate) {
-        return new TableScan(conn, tableInfo, schemaGetter, projectedColumns, limit, predicate);
+        return new TableScan(
+                conn, tableInfo, schemaGetter, projectedColumns, limit, predicate, lakeSource);
+    }
+
+    @Override
+    public Scan lakeSource(@Nullable LakeSource lakeSource) {
+        return new TableScan(
+                conn,
+                tableInfo,
+                schemaGetter,
+                projectedColumns,
+                limit,
+                recordBatchFilter,
+                lakeSource);
     }
 
     @Override
@@ -140,6 +171,7 @@ public LogScanner createLogScanner() {
                 conn.getMetadataUpdater(),
                 conn.getClientMetricGroup(),
                 conn.getOrCreateRemoteFileDownloader(),
+                lakeSource,
                 projectedColumns,
                 schemaGetter,
                 recordBatchFilter);
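
The new lakeSource(...) hook above composes with the existing project/filter/limit builder calls and is the only client-visible opt-in for the lake fallback read. A minimal usage sketch follows; the Connection type and its getTable(...) accessor are assumed from the client API (they are not part of this patch), and the LakeSource would be built as in the integration tests later in the patch:

    import java.time.Duration;

    import org.apache.fluss.client.Connection; // assumed client entry point, not shown in this patch
    import org.apache.fluss.client.table.Table;
    import org.apache.fluss.client.table.scanner.log.LogScanner;
    import org.apache.fluss.client.table.scanner.log.ScanRecords;
    import org.apache.fluss.lake.source.LakeSource;
    import org.apache.fluss.metadata.TablePath;

    class LakeFallbackScanSketch {
        // Polls once with the lake fallback wired in: offsets already tiered to
        // the lake are served through LakeCompletedFetch, while live offsets
        // keep streaming from the tablet servers.
        static void pollWithLakeFallback(
                Connection conn, TablePath tablePath, LakeSource lakeSource) throws Exception {
            try (Table table = conn.getTable(tablePath);
                    LogScanner scanner =
                            table.newScan().lakeSource(lakeSource).createLogScanner()) {
                scanner.subscribeFromBeginning(0); // bucket 0, from the earliest offset
                ScanRecords records = scanner.poll(Duration.ofSeconds(5));
                records.forEach(r -> System.out.println(r.logOffset()));
            }
        }
    }
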
diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LakeCompletedFetch.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LakeCompletedFetch.java
new file mode 100644
index 0000000000..249c2665e8
--- /dev/null
+++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LakeCompletedFetch.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.client.table.scanner.log;
+
+import org.apache.fluss.client.table.scanner.ScanRecord;
+import org.apache.fluss.exception.FetchException;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.record.LogRecord;
+import org.apache.fluss.record.LogRecordReadContext;
+import org.apache.fluss.rpc.protocol.ApiError;
+import org.apache.fluss.utils.CloseableIterator;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/** A completed fetch backed by log records read from a lake snapshot. */
+class LakeCompletedFetch extends CompletedFetch {
+
+    private final CloseableIterator<LogRecord> records;
+    private final long endOffset;
+
+    private long nextFetchOffset;
+    private boolean consumed;
+
+    LakeCompletedFetch(
+            TableBucket tableBucket,
+            CloseableIterator<LogRecord> records,
+            long fetchOffset,
+            long endOffset,
+            long highWatermark,
+            LogRecordReadContext readContext,
+            LogScannerStatus logScannerStatus) {
+        super(
+                tableBucket,
+                ApiError.NONE,
+                0,
+                highWatermark,
+                Collections.emptyIterator(),
+                readContext,
+                logScannerStatus,
+                false,
+                fetchOffset,
+                NO_FILTERED_END_OFFSET);
+        this.records = records;
+        this.nextFetchOffset = fetchOffset;
+        this.endOffset = endOffset;
+    }
+
+    @Override
+    boolean isConsumed() {
+        return consumed;
+    }
+
+    @Override
+    long nextFetchOffset() {
+        return nextFetchOffset;
+    }
+
+    @Override
+    void drain() {
+        if (!consumed) {
+            records.close();
+            consumed = true;
+        }
+        super.drain();
+    }
+
+    @Override
+    public List<ScanRecord> fetchRecords(int maxRecords) {
+        if (consumed) {
+            return Collections.emptyList();
+        }
+
+        List<ScanRecord> scanRecords = new ArrayList<>();
+        try {
+            for (int i = 0; i < maxRecords && records.hasNext(); ) {
+                LogRecord record = records.next();
+                if (record.logOffset() < nextFetchOffset) {
+                    continue;
+                }
+                if (record.logOffset() >= endOffset) {
+                    nextFetchOffset = Math.max(nextFetchOffset, endOffset);
+                    drain();
+                    break;
+                }
+
+                scanRecords.add(toScanRecord(record));
+                nextFetchOffset = record.logOffset() + 1;
+                i++;
+            }
+
+            if (!records.hasNext()) {
+                nextFetchOffset = Math.max(nextFetchOffset, endOffset);
+                drain();
+            }
+        } catch (Exception e) {
+            if (scanRecords.isEmpty()) {
+                throw new FetchException(
+                        "Received exception when fetching lake records from " + tableBucket, e);
+            }
+        }
+
+        return scanRecords;
+    }
+}
diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java
index 2625716cde..4723e3b6f9 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java
@@ -26,17 +26,24 @@
 import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.config.Configuration;
 import org.apache.fluss.exception.ApiException;
+import org.apache.fluss.exception.FlussRuntimeException;
 import org.apache.fluss.exception.InvalidMetadataException;
 import org.apache.fluss.exception.LeaderNotAvailableException;
 import org.apache.fluss.exception.PartitionNotExistException;
 import org.apache.fluss.fs.FsPath;
+import org.apache.fluss.lake.source.LakeLogFetchInfo;
+import org.apache.fluss.lake.source.LakeSource;
+import org.apache.fluss.lake.source.LakeSplit;
+import org.apache.fluss.lake.source.RecordReader;
 import org.apache.fluss.metadata.PhysicalTablePath;
+import org.apache.fluss.metadata.ResolvedPartitionSpec;
 import org.apache.fluss.metadata.SchemaGetter;
 import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.metadata.TableInfo;
 import org.apache.fluss.metadata.TablePartition;
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.predicate.Predicate;
+import org.apache.fluss.record.LogRecord;
 import org.apache.fluss.record.LogRecordReadContext;
 import org.apache.fluss.record.LogRecords;
 import org.apache.fluss.record.MemoryLogRecords;
@@ -55,6 +62,7 @@
 import org.apache.fluss.rpc.util.PredicateMessageUtils;
 import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.ChunkedAllocationManager;
 import org.apache.fluss.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.fluss.utils.CloseableIterator;
 import org.apache.fluss.utils.IOUtils;
 import org.apache.fluss.utils.Projection;
 
@@ -70,6 +78,7 @@
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -108,6 +117,7 @@ public class LogFetcher implements Closeable {
     private final LogFetchBuffer logFetchBuffer;
     private final LogFetchCollector logFetchCollector;
     private final RemoteLogDownloader remoteLogDownloader;
+    @Nullable private final LakeSource lakeSource;
 
     @GuardedBy("this")
     private final Set<Integer> nodesWithPendingFetchRequests;
@@ -127,6 +137,7 @@ public LogFetcher(
             MetadataUpdater metadataUpdater,
             ScannerMetricGroup scannerMetricGroup,
             RemoteFileDownloader remoteFileDownloader,
+            @Nullable LakeSource lakeSource,
             SchemaGetter schemaGetter) {
         this.tablePath = tableInfo.getTablePath();
         this.isPartitioned = tableInfo.isPartitioned();
@@ -165,6 +176,7 @@ public LogFetcher(
         this.scannerMetricGroup = scannerMetricGroup;
         this.remoteLogDownloader =
                 new RemoteLogDownloader(tablePath, conf, remoteFileDownloader, scannerMetricGroup);
+        this.lakeSource = lakeSource;
         remoteLogDownloader.start();
     }
 
@@ -404,6 +416,12 @@ private synchronized void handleFetchLogResponse(
                             fetchResultForBucket.remoteLogFetchInfo(),
                             fetchOffset,
                             fetchResultForBucket.getHighWatermark());
+                } else if (fetchResultForBucket.fetchFromLake()) {
+                    addLakeFetches(
+                            tb,
+                            fetchResultForBucket.lakeLogFetchInfo(),
+                            fetchOffset,
+                            fetchResultForBucket.getHighWatermark());
                 } else {
                     LogRecords logRecords = fetchResultForBucket.recordsOrEmpty();
                     boolean hasRecords = !MemoryLogRecords.EMPTY.equals(logRecords);
@@ -496,6 +514,106 @@ private void pendRemoteFetches(
         }
     }
 
+    private void addLakeFetches(
+            TableBucket tableBucket,
+            LakeLogFetchInfo lakeLogFetchInfo,
+            long firstFetchOffset,
+            long highWatermark) {
+        checkNotNull(lakeLogFetchInfo, "LakeLogFetchInfo is null");
+        try {
+            List<LakeSplit> lakeSplits = planLakeSplits(tableBucket, lakeLogFetchInfo);
+            logFetchBuffer.add(
+                    new LakeCompletedFetch(
+                            tableBucket,
+                            createLakeRecordIterator(lakeSplits),
+                            firstFetchOffset,
+                            lakeLogFetchInfo.endOffset(),
+                            highWatermark,
+                            readContext,
+                            logScannerStatus));
+        } catch (Exception e) {
+            throw new FlussRuntimeException(e);
+        }
+    }
+
+    private List<LakeSplit> planLakeSplits(
+            TableBucket tableBucket, LakeLogFetchInfo lakeLogFetchInfo) throws IOException {
+        List<LakeSplit> plannedSplits =
+                checkNotNull(lakeSource).createPlanner(() -> lakeLogFetchInfo.snapshotId()).plan();
+        List<LakeSplit> lakeSplits = new ArrayList<>();
+        for (LakeSplit lakeSplit : plannedSplits) {
+            // todo: partition + bucket filter and offset column filter could be supported in the
+            // planning stage
+            if (isLakeSplitForBucket(lakeSplit, tableBucket, lakeLogFetchInfo.partitionName())) {
+                lakeSplits.add(lakeSplit);
+            }
+        }
+        return lakeSplits;
+    }
+
+    private boolean isLakeSplitForBucket(
+            LakeSplit lakeSplit, TableBucket tableBucket, @Nullable String partitionName) {
+        if (lakeSplit.bucket() != tableBucket.getBucket()) {
+            return false;
+        }
+        if (partitionName == null) {
+            return lakeSplit.partition() == null || lakeSplit.partition().isEmpty();
+        }
+        return partitionName.equals(
+                String.join(ResolvedPartitionSpec.PARTITION_SPEC_SEPARATOR, lakeSplit.partition()));
+    }
+
+    private CloseableIterator<LogRecord> createLakeRecordIterator(List<LakeSplit> lakeSplits)
+            throws IOException {
+        List<CloseableIterator<LogRecord>> iterators = new ArrayList<>(lakeSplits.size());
+        try {
+            for (LakeSplit lakeSplit : lakeSplits) {
+                RecordReader recordReader =
+                        checkNotNull(lakeSource)
+                                .createRecordReader((LakeSource.ReaderContext) () -> lakeSplit);
+                iterators.add(recordReader.read());
+            }
+            return new CompositeCloseableIterator<>(iterators);
+        } catch (IOException | RuntimeException e) {
+            IOUtils.closeAllQuietly(iterators);
+            throw e;
+        }
+    }
+
+    private static class CompositeCloseableIterator<T> implements CloseableIterator<T> {
+
+        private final List<CloseableIterator<T>> iterators;
+        private final Iterator<CloseableIterator<T>> iterator;
+        private CloseableIterator<T> currentIterator;
+
+        private CompositeCloseableIterator(List<CloseableIterator<T>> iterators) {
+            this.iterators = iterators;
+            this.iterator = iterators.iterator();
+        }
+
+        @Override
+        public boolean hasNext() {
+            while ((currentIterator == null || !currentIterator.hasNext()) && iterator.hasNext()) {
+                if (currentIterator != null) {
+                    currentIterator.close();
+                }
+                currentIterator = iterator.next();
+            }
+            return currentIterator != null && currentIterator.hasNext();
+        }
+
+        @Override
+        public T next() {
+            return currentIterator.next();
+        }
+
+        @Override
+        public void close() {
+            IOUtils.closeAllQuietly(iterators);
+        }
+    }
+
     @VisibleForTesting
     Map<Integer, FetchLogRequest> prepareFetchLogRequests() {
         Map<Integer, List<PbFetchLogReqForTable>> fetchLogReqForBuckets = new HashMap<>();
diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogScannerImpl.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogScannerImpl.java
index 9a2dbf0b4c..cbe0ff5b64 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogScannerImpl.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogScannerImpl.java
@@ -23,6 +23,8 @@
 import org.apache.fluss.client.table.scanner.RemoteFileDownloader;
 import org.apache.fluss.config.Configuration;
 import org.apache.fluss.exception.WakeupException;
+import org.apache.fluss.lake.source.LakeSource;
+import org.apache.fluss.lake.source.LakeSplit;
 import org.apache.fluss.metadata.SchemaGetter;
 import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.metadata.TableInfo;
@@ -81,6 +83,7 @@ public LogScannerImpl(
             MetadataUpdater metadataUpdater,
             ClientMetricGroup clientMetricGroup,
             RemoteFileDownloader remoteFileDownloader,
+            @Nullable LakeSource lakeSource,
             @Nullable int[] projectedFields,
             SchemaGetter schemaGetter,
             @Nullable Predicate recordBatchFilter) {
@@ -103,6 +106,7 @@ public LogScannerImpl(
                         metadataUpdater,
                         scannerMetricGroup,
                         remoteFileDownloader,
+                        lakeSource,
                         schemaGetter);
     }
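
A note on the CompositeCloseableIterator above: it advances lazily across the per-split readers and closes each reader as soon as it is drained, so at most one split holds its resources at a time, while close() sweeps the whole list to also cover early termination (e.g. a fetch drained at endOffset mid-split). The same pattern as a self-contained sketch, independent of Fluss's CloseableIterator type; the class and method names here are illustrative:

    import java.util.Iterator;
    import java.util.List;

    // Chains per-split iterators, closing each one eagerly once it is exhausted.
    final class ChainedIterators<T, I extends Iterator<T> & AutoCloseable>
            implements Iterator<T>, AutoCloseable {

        private final List<I> all; // kept for the final safety-net close()
        private final Iterator<I> pending; // iterators not yet started
        private I current;

        ChainedIterators(List<I> iterators) {
            this.all = iterators;
            this.pending = iterators.iterator();
        }

        @Override
        public boolean hasNext() {
            try {
                while ((current == null || !current.hasNext()) && pending.hasNext()) {
                    if (current != null) {
                        current.close(); // drained: release this split's resources eagerly
                    }
                    current = pending.next();
                }
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
            return current != null && current.hasNext();
        }

        @Override
        public T next() {
            return current.next();
        }

        @Override
        public void close() throws Exception {
            for (I it : all) {
                it.close(); // assumes idempotent close, as with Fluss's CloseableIterator
            }
        }
    }
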
diff --git a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LakeCompletedFetchTest.java b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LakeCompletedFetchTest.java
new file mode 100644
index 0000000000..a8c975c297
--- /dev/null
+++ b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LakeCompletedFetchTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.client.table.scanner.log;
+
+import org.apache.fluss.client.metadata.TestingClientSchemaGetter;
+import org.apache.fluss.client.metadata.TestingMetadataUpdater;
+import org.apache.fluss.client.table.scanner.ScanRecord;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.metadata.SchemaInfo;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.record.ChangeType;
+import org.apache.fluss.record.LogRecord;
+import org.apache.fluss.record.LogRecordReadContext;
+import org.apache.fluss.row.GenericRow;
+import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.ChunkedAllocationManager;
+import org.apache.fluss.utils.CloseableIterator;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.fluss.record.TestData.DATA1_SCHEMA;
+import static org.apache.fluss.record.TestData.DATA1_TABLE_ID;
+import static org.apache.fluss.record.TestData.DATA1_TABLE_INFO;
+import static org.apache.fluss.record.TestData.DATA1_TABLE_PATH;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link LakeCompletedFetch}. */
+class LakeCompletedFetchTest {
+
+    @Test
+    void testFilterLakeRecordsByRequestedOffsetRange() {
+        TableBucket tableBucket = new TableBucket(DATA1_TABLE_ID, 0);
+        LogScannerStatus status = new LogScannerStatus();
+        status.assignScanBuckets(Collections.singletonMap(tableBucket, 5L));
+
+        List<LogRecord> records = Arrays.asList(record(3L), record(5L), record(6L), record(7L));
+        LogRecordReadContext readContext =
+                LogRecordReadContext.createReadContext(
+                        DATA1_TABLE_INFO,
+                        false,
+                        null,
+                        new TestingClientSchemaGetter(
+                                DATA1_TABLE_PATH,
+                                new SchemaInfo(DATA1_SCHEMA, 1),
+                                new TestingMetadataUpdater(
+                                        Collections.singletonMap(
+                                                DATA1_TABLE_PATH, DATA1_TABLE_INFO)),
+                                new Configuration()),
+                        new ChunkedAllocationManager.ChunkedFactory());
+
+        LakeCompletedFetch fetch =
+                new LakeCompletedFetch(
+                        tableBucket,
+                        CloseableIterator.wrap(records.iterator()),
+                        5L,
+                        7L,
+                        10L,
+                        readContext,
+                        status);
+
+        List<ScanRecord> scanRecords = fetch.fetchRecords(10);
+
+        assertThat(scanRecords).extracting(ScanRecord::logOffset).containsExactly(5L, 6L);
+        assertThat(fetch.nextFetchOffset()).isEqualTo(7L);
+        assertThat(fetch.isConsumed()).isTrue();
+    }
+
+    private static ScanRecord record(long offset) {
+        return new ScanRecord(offset, offset, ChangeType.APPEND_ONLY, new GenericRow(2));
+    }
+}
diff --git a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherFilterITCase.java b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherFilterITCase.java
index ac64fbdbdb..f38c614a77 100644
--- a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherFilterITCase.java
+++ b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherFilterITCase.java
@@ -117,6 +117,7 @@ protected void setup() throws Exception {
                         metadataUpdater,
                         scannerMetricGroup,
                         new RemoteFileDownloader(1),
+                        null,
                         TEST_SCHEMA_GETTER);
     }
 
@@ -439,6 +440,7 @@ private LogFetcher createFetcherWithBuckets(Map<TableBucket, Long> scanBuckets)
                         metadataUpdater,
                         scannerMetricGroup,
                         new RemoteFileDownloader(1),
+                        null,
                         TEST_SCHEMA_GETTER);
     }
diff --git a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherITCase.java b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherITCase.java
index 50addfcfe0..cecc14639e 100644
--- a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherITCase.java
+++ b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherITCase.java
@@ -109,6 +109,7 @@ protected void setup() throws Exception {
                         metadataUpdater,
                         TestingScannerMetricGroup.newInstance(),
                         new RemoteFileDownloader(1),
+                        null,
                         clientSchemaGetter);
     }
 
@@ -184,6 +185,7 @@ void testFetchWithSchemaChange() throws Exception {
                         metadataUpdater,
                         TestingScannerMetricGroup.newInstance(),
                         new RemoteFileDownloader(1),
+                        null,
                         clientSchemaGetter);
         newSchemaLogFetcher.sendFetches();
         // The fetcher is async to fetch data, so we need to wait the result write to the
@@ -283,6 +285,7 @@ void testFetchWhenDestinationIsNullInMetadata() throws Exception {
                         metadataUpdater,
                         TestingScannerMetricGroup.newInstance(),
                         new RemoteFileDownloader(1),
+                        null,
                         clientSchemaGetter);
 
         // send fetches to fetch data, should have no available fetch.
@@ -324,6 +327,7 @@ void testFetchWithInvalidTableOrPartitions() throws Exception {
                         metadataUpdater1,
                         TestingScannerMetricGroup.newInstance(),
                         new RemoteFileDownloader(1),
+                        null,
                         clientSchemaGetter);
 
         ExecutorService executor = Executors.newSingleThreadExecutor();
diff --git a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherTest.java b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherTest.java
index 576eab5dae..e7d19a982d 100644
--- a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherTest.java
+++ b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherTest.java
@@ -85,6 +85,7 @@ public void setup() {
                         metadataUpdater,
                         TestingScannerMetricGroup.newInstance(),
                         new RemoteFileDownloader(1),
+                        null,
                         clientSchemaGetter);
     }
diff --git a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogScannerITCase.java b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogScannerITCase.java
index cfe215fb0b..ca22b28dc5 100644
--- a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogScannerITCase.java
+++ b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogScannerITCase.java
@@ -18,26 +18,46 @@
 package org.apache.fluss.client.table.scanner.log;
 
 import org.apache.fluss.client.admin.ClientToServerITCaseBase;
+import org.apache.fluss.client.admin.OffsetSpec;
 import org.apache.fluss.client.table.Table;
 import org.apache.fluss.client.table.scanner.ScanRecord;
 import org.apache.fluss.client.table.writer.AppendWriter;
 import org.apache.fluss.client.table.writer.UpsertWriter;
+import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.exception.FetchException;
+import org.apache.fluss.lake.source.LakeSource;
+import org.apache.fluss.lake.source.LakeSplit;
+import org.apache.fluss.lake.source.RecordReader;
+import org.apache.fluss.lake.source.TestingLakeSource;
+import org.apache.fluss.metadata.PartitionInfo;
+import org.apache.fluss.metadata.ResolvedPartitionSpec;
 import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.metadata.TableDescriptor;
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.record.ChangeType;
+import org.apache.fluss.record.LogRecord;
+import org.apache.fluss.record.TestData;
 import org.apache.fluss.row.GenericRow;
 import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.rpc.gateway.CoordinatorGateway;
+import org.apache.fluss.rpc.messages.CommitLakeTableSnapshotRequest;
+import org.apache.fluss.rpc.messages.PbLakeTableOffsetForBucket;
+import org.apache.fluss.rpc.messages.PbLakeTableSnapshotInfo;
+import org.apache.fluss.server.log.LogTablet;
+import org.apache.fluss.server.replica.Replica;
 import org.apache.fluss.types.DataTypes;
 import org.apache.fluss.types.RowType;
+import org.apache.fluss.utils.CloseableIterator;
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 
+import java.io.IOException;
 import java.time.Duration;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
@@ -49,6 +69,7 @@
 import static org.apache.fluss.record.TestData.DATA1_TABLE_DESCRIPTOR;
 import static org.apache.fluss.record.TestData.DATA1_TABLE_PATH;
 import static org.apache.fluss.testutils.DataTestUtils.row;
+import static org.apache.fluss.testutils.common.CommonTestUtils.retry;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
@@ -467,4 +488,135 @@ void testSubscribeOutOfRangeLog() throws Exception {
             }
         }
     }
+
+    /**
+     * When the local log no longer contains the subscribed offset but the bucket can be read from
+     * the lake, the tablet returns lake fetch metadata after a {@code
+     * LogOffsetOutOfRangeException}; the client then reads via {@link LakeCompletedFetch} using a
+     * {@link TestingLakeSource}-based implementation (split planning from the parent, non-empty
+     * records from the override).
+     */
+    @Test
+    void testReadFromLakeAfterLocalLogOutOfRange() throws Exception {
+        TablePath tablePath = TablePath.of("test_db_1", "test_log_scanner_lake_oore_fallback");
+        TableDescriptor tableDescriptor =
+                TableDescriptor.builder()
+                        .schema(DATA1_SCHEMA)
+                        .distributedBy(1, "a")
+                        .property(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true")
+                        .build();
+        long tableId = createTable(tablePath, tableDescriptor, false);
+        FLUSS_CLUSTER_EXTENSION.waitUntilTableReady(tableId);
+
+        TableBucket tb = new TableBucket(tableId, 0);
+        FLUSS_CLUSTER_EXTENSION.waitUntilAllReplicaReady(tb);
+
+        try (Table writeTable = conn.getTable(tablePath)) {
+            AppendWriter appendWriter = writeTable.newAppend().createWriter();
+            for (Object[] tuple : TestData.DATA1) {
+                appendWriter.append(row((Integer) tuple[0], (String) tuple[1])).get();
+            }
+            appendWriter.flush();
+        }
+
+        Map<Integer, Long> latestOffsets =
+                admin.listOffsets(
+                                tablePath,
+                                Collections.singletonList(0),
+                                new OffsetSpec.LatestSpec())
+                        .all()
+                        .get();
+        long lakeLogEndOffset = latestOffsets.get(0);
+        long snapshotId = 1L;
+        CoordinatorGateway coordinator = FLUSS_CLUSTER_EXTENSION.newCoordinatorClient();
+        coordinator
+                .commitLakeTableSnapshot(
+                        newCommitLakeTableSnapshotRequest(tableId, snapshotId, lakeLogEndOffset))
+                .get();
+
+        retry(
+                Duration.ofMinutes(2),
+                () -> {
+                    Replica leader = FLUSS_CLUSTER_EXTENSION.waitAndGetLeaderReplica(tb);
+                    LogTablet logTablet = leader.getLogTablet();
+                    assertThat(logTablet.canFetchFromLakeLog(0L)).isTrue();
+                    assertThat(logTablet.getLakeLogEndOffset()).isEqualTo(lakeLogEndOffset);
+                });
+
+        Replica leaderReplica = FLUSS_CLUSTER_EXTENSION.waitAndGetLeaderReplica(tb);
+        leaderReplica.truncateFullyAndStartAt(lakeLogEndOffset);
+
+        LakeSource<LakeSplit> lakeSource = new DataLogTestingLakeSource();
+
+        List<InternalRow> actual = new ArrayList<>();
+        try (Table table = conn.getTable(tablePath);
+                LogScanner logScanner =
+                        table.newScan().lakeSource(lakeSource).createLogScanner()) {
+            logScanner.subscribeFromBeginning(0);
+            while (actual.size() < TestData.DATA1.size()) {
+                ScanRecords scanRecords = logScanner.poll(Duration.ofSeconds(5));
+                for (ScanRecord scanRecord : scanRecords) {
+                    assertThat(scanRecord.getChangeType()).isEqualTo(ChangeType.APPEND_ONLY);
+                    InternalRow internalRow = scanRecord.getRow();
+                    actual.add(row(internalRow.getInt(0), internalRow.getString(1)));
+                }
+            }
+        }
+
+        List<InternalRow> expected = new ArrayList<>();
+        for (Object[] tuple : TestData.DATA1) {
+            expected.add(row((Integer) tuple[0], (String) tuple[1]));
+        }
+        assertThat(actual).containsExactlyInAnyOrderElementsOf(expected);
+    }
+
+    private static CommitLakeTableSnapshotRequest newCommitLakeTableSnapshotRequest(
+            long tableId, long snapshotId, long logEndOffset) {
+        CommitLakeTableSnapshotRequest request = new CommitLakeTableSnapshotRequest();
+        PbLakeTableSnapshotInfo tableReq = request.addTablesReq();
+        tableReq.setTableId(tableId);
+        tableReq.setSnapshotId(snapshotId);
+        PbLakeTableOffsetForBucket bucketReq = tableReq.addBucketsReq();
+        bucketReq.setBucketId(0);
+        bucketReq.setLogEndOffset(logEndOffset);
+        bucketReq.setMaxTimestamp(System.currentTimeMillis());
+        return request;
+    }
+
+    /**
+     * Uses {@link TestingLakeSource} for planner and split serialization; overrides {@link
+     * RecordReader} so the lake fallback returns the same rows as {@link TestData#DATA1}.
+     */
+    private static final class DataLogTestingLakeSource extends TestingLakeSource {
+
+        DataLogTestingLakeSource() {
+            super(
+                    1,
+                    Collections.singletonList(
+                            new PartitionInfo(
+                                    -1L,
+                                    new ResolvedPartitionSpec(
+                                            Collections.emptyList(), Collections.emptyList()),
+                                    null)));
+        }
+
+        @Override
+        public RecordReader createRecordReader(LakeSource.ReaderContext context)
+                throws IOException {
+            return () -> lakeTierRowsAsLogIterator();
+        }
+
+        private static CloseableIterator<LogRecord> lakeTierRowsAsLogIterator() {
+            List<LogRecord> list = new ArrayList<>();
+            long offset = 0L;
+            for (Object[] tuple : TestData.DATA1) {
+                list.add(
+                        new ScanRecord(
+                                offset,
+                                offset,
+                                ChangeType.APPEND_ONLY,
+                                row((Integer) tuple[0], (String) tuple[1])));
+                offset++;
+            }
+            return CloseableIterator.wrap(list.iterator());
+        }
+    }
 }
diff --git a/fluss-common/src/main/java/org/apache/fluss/lake/source/LakeLogFetchInfo.java b/fluss-common/src/main/java/org/apache/fluss/lake/source/LakeLogFetchInfo.java
new file mode 100644
index 0000000000..b2d6ae9a36
--- /dev/null
+++ b/fluss-common/src/main/java/org/apache/fluss/lake/source/LakeLogFetchInfo.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.source;
+
+import javax.annotation.Nullable;
+
+/** Metadata for fetching a log offset range from a readable lake snapshot. */
+public class LakeLogFetchInfo {
+
+    private final long snapshotId;
+    private final @Nullable String partitionName;
+    private final long startOffset;
+    private final long endOffset;
+
+    public LakeLogFetchInfo(
+            long snapshotId, @Nullable String partitionName, long startOffset, long endOffset) {
+        this.snapshotId = snapshotId;
+        this.partitionName = partitionName;
+        this.startOffset = startOffset;
+        this.endOffset = endOffset;
+    }
+
+    public long snapshotId() {
+        return snapshotId;
+    }
+
+    @Nullable
+    public String partitionName() {
+        return partitionName;
+    }
+
+    public long startOffset() {
+        return startOffset;
+    }
+
+    public long endOffset() {
+        return endOffset;
+    }
+
+    @Override
+    public String toString() {
+        return "LakeLogFetchInfo{"
+                + "snapshotId="
+                + snapshotId
+                + ", partitionName='"
+                + partitionName
+                + '\''
+                + ", startOffset="
+                + startOffset
+                + ", endOffset="
+                + endOffset
+                + '}';
+    }
+}
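
Range semantics of the class above, as consumed by LakeCompletedFetch earlier in this patch: startOffset is inclusive and endOffset is exclusive, so the readable lake range is [startOffset, endOffset). A small illustration with made-up values:

    // Hypothetical values: snapshot 1, non-partitioned bucket, offsets 5 and 6 readable.
    LakeLogFetchInfo info = new LakeLogFetchInfo(1L, null, 5L, 7L);
    // A LakeCompletedFetch driven by this info skips records below offset 5,
    // emits offsets 5 and 6, drops anything at offset >= 7, and reports 7 as
    // nextFetchOffset so the client resumes from the local log there.
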
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/emitter/FlinkRecordEmitter.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/emitter/FlinkRecordEmitter.java
index ba4c9d0131..73ce1dded2 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/emitter/FlinkRecordEmitter.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/emitter/FlinkRecordEmitter.java
@@ -23,6 +23,7 @@
 import org.apache.fluss.flink.source.reader.FlinkSourceReader;
 import org.apache.fluss.flink.source.reader.RecordAndPos;
 import org.apache.fluss.flink.source.split.HybridSnapshotLogSplitState;
+import org.apache.fluss.flink.source.split.LogSplitState;
 import org.apache.fluss.flink.source.split.SourceSplitState;
 
 import org.apache.flink.api.connector.source.SourceOutput;
@@ -73,6 +74,7 @@ public void emitRecord(
             }
             processAndEmitRecord(scanRecord, sourceOutput);
         } else if (splitState.isLogSplitState()) {
+            LogSplitState logSplitState = splitState.asLogSplitState();
             // Attempt to process and emit the record.
             // For $binlog, this returns true only when a complete row (or the final part of
             // a split) is emitted.
@@ -83,9 +85,7 @@ public void emitRecord(
                 // This ensures that if a crash occurs mid-update (between BEFORE and AFTER),
                 // the source will re-read the same log offset upon recovery,
                 // allowing the BinlogDeserializationSchema to correctly reconstruct the state.
-                splitState
-                        .asLogSplitState()
-                        .setNextOffset(recordAndPosition.record().logOffset() + 1);
+                logSplitState.setNextOffset(recordAndPosition.record().logOffset() + 1);
             }
         } else if (splitState.isLakeSplit()) {
             if (lakeRecordRecordEmitter == null) {
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/BoundedSplitReader.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/BoundedSplitReader.java
index 0f842da069..3ec67490ae 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/BoundedSplitReader.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/BoundedSplitReader.java
@@ -200,7 +200,10 @@ public boolean hasNext() {
 
         @Override
         public RecordAndPos next() {
-            recordAndPosition.setRecord(records.next(), ++currentReadRecordsCount);
+            recordAndPosition.setRecord(
+                    records.next(),
+                    ++currentReadRecordsCount,
+                    recordAndPosition.getCurrentSplitIndex());
             return recordAndPosition;
         }
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReader.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReader.java
index 90f2b6e9a6..13314a097d 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReader.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReader.java
@@ -141,6 +141,7 @@ public FlinkSourceSplitReader(
                 table.newScan()
                         .project(projectedFields)
                         .filter(logRecordBatchFilter)
+                        .lakeSource(lakeSource)
                         .createLogScanner();
         this.stoppingOffsets = new HashMap<>();
         this.emptyLogSplits = new HashSet<>();
diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/reader/BoundedSplitReaderTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/reader/BoundedSplitReaderTest.java
index 2ecb48ea19..757bd84949 100644
--- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/reader/BoundedSplitReaderTest.java
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/reader/BoundedSplitReaderTest.java
@@ -206,7 +206,10 @@ private List<RecordAndPos> collectRecords(BoundedSplitReader reader) throws IOException {
             while (recordIter.hasNext()) {
                 RecordAndPos recordAndPos = recordIter.next();
                 records.add(
-                        new RecordAndPos(recordAndPos.scanRecord, recordAndPos.readRecordsCount));
+                        new RecordAndPos(
+                                recordAndPos.scanRecord,
+                                recordAndPos.readRecordsCount,
+                                recordAndPos.currentSplitIndex));
             }
             recordIter.close();
         }
diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReaderTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReaderTest.java
index c170518995..836113e6f3 100644
--- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReaderTest.java
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReaderTest.java
@@ -113,7 +113,7 @@ void testSanityCheck() throws Exception {
                 .isInstanceOf(IllegalArgumentException.class)
                 .hasMessage("Field name2 does not exist in the row type.");
 
-        FlinkSourceSplitReader flinkSourceSplitReader =
+        try (FlinkSourceSplitReader flinkSourceSplitReader =
                 new FlinkSourceSplitReader(
                         clientConf,
                         tablePath1,
@@ -123,8 +123,9 @@ void testSanityCheck() throws Exception {
                         null,
                         null,
                         null,
-                        createMockSourceReaderMetrics());
-        assertThat(flinkSourceSplitReader.getProjectedFields()).isNull();
+                        createMockSourceReaderMetrics())) {
+            assertThat(flinkSourceSplitReader.getProjectedFields()).isNull();
+        }
     }
 
     @Test
diff --git a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/LogScannerIcebergLakeFallbackITCase.java b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/LogScannerIcebergLakeFallbackITCase.java
new file mode 100644
index 0000000000..5fbe791087
--- /dev/null
+++ b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/LogScannerIcebergLakeFallbackITCase.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.iceberg.tiering;
+
+import org.apache.fluss.client.table.Table;
+import org.apache.fluss.client.table.scanner.ScanRecord;
+import org.apache.fluss.client.table.scanner.log.LogScanner;
+import org.apache.fluss.client.table.scanner.log.ScanRecords;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.lake.iceberg.source.IcebergLakeSource;
+import org.apache.fluss.lake.iceberg.testutils.FlinkIcebergTieringTestBase;
+import org.apache.fluss.lake.source.LakeSource;
+import org.apache.fluss.lake.source.LakeSplit;
+import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.record.ChangeType;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.server.replica.Replica;
+import org.apache.fluss.types.DataTypes;
+
+import org.apache.flink.core.execution.JobClient;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.fluss.testutils.DataTestUtils.row;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Integration test for log data tiered to Iceberg and then read via {@link LogScanner} with a real
+ * {@link IcebergLakeSource}, after local log truncation forces lake fallback for historical
+ * offsets.
+ */
+class LogScannerIcebergLakeFallbackITCase extends FlinkIcebergTieringTestBase {
+
+    private static final String DEFAULT_DB = "fluss";
+
+    private static final Schema LOG_SCHEMA =
+            Schema.newBuilder()
+                    .column("f_int", DataTypes.INT())
+                    .column("f_str", DataTypes.STRING())
+                    .build();
+
+    @Test
+    void testReadTieredLogFromIcebergAfterLocalTruncate() throws Exception {
+        TablePath tablePath = TablePath.of(DEFAULT_DB, "log_scanner_iceberg_lake_fallback");
+        long tableId = createLogTable(tablePath, 1, false, LOG_SCHEMA);
+        TableBucket tableBucket = new TableBucket(tableId, 0);
+
+        List<InternalRow> batchA =
+                Arrays.asList(row(1, "a0"), row(2, "a1"), row(3, "a2"), row(4, "a3"), row(5, "a4"));
+        writeRows(tablePath, batchA, true);
+
+        JobClient jobClient = buildTieringJob(execEnv);
+        try {
+            assertReplicaStatus(tableBucket, batchA.size());
+            checkDataInIcebergAppendOnlyTable(tablePath, batchA, 0L);
+        } finally {
+            jobClient.cancel().get();
+        }
+
+        long lakeLogEndOffset = getLeaderReplica(tableBucket).getLakeLogEndOffset();
+        assertThat(lakeLogEndOffset).isEqualTo(batchA.size());
+
+        Replica leader = getLeaderReplica(tableBucket);
+        leader.truncateFullyAndStartAt(lakeLogEndOffset);
+
+        Configuration icebergConf = Configuration.fromMap(getIcebergCatalogConf());
+        IcebergLakeSource icebergLakeSource = new IcebergLakeSource(icebergConf, tablePath);
+        @SuppressWarnings("unchecked")
+        LakeSource<LakeSplit> lakeSource =
+                (LakeSource<LakeSplit>) (LakeSource<?>) icebergLakeSource;
+
+        List<InternalRow> polledResult = new ArrayList<>();
+        try (Table table = conn.getTable(tablePath);
+                LogScanner lakeScanner =
+                        table.newScan().lakeSource(lakeSource).createLogScanner()) {
+            lakeScanner.subscribeFromBeginning(0);
+            pollScanRecordsUntilCount(
+                    lakeScanner, batchA.size(), polledResult, Duration.ofMinutes(2));
+            assertThat(polledResult).containsExactlyInAnyOrderElementsOf(batchA);
+        }
+    }
+
+    /**
+     * Polls until {@code targetNewCount} additional rows have been collected (relative to the
+     * list size at invocation).
+     */
+    private static void pollScanRecordsUntilCount(
+            LogScanner logScanner,
+            int targetNewCount,
+            List<InternalRow> sink,
+            Duration overallTimeout)
+            throws Exception {
+        int startSize = sink.size();
+        long deadlineNanos = System.nanoTime() + overallTimeout.toNanos();
+        while (sink.size() - startSize < targetNewCount) {
+            if (System.nanoTime() > deadlineNanos) {
+                throw new AssertionError(
+                        String.format(
+                                "Timed out after %s waiting for %d scan records (got %d)",
+                                overallTimeout, targetNewCount, sink.size() - startSize));
+            }
+            ScanRecords scanRecords = logScanner.poll(Duration.ofSeconds(5));
+            for (ScanRecord scanRecord : scanRecords) {
+                assertThat(scanRecord.getChangeType())
+                        .isIn(ChangeType.APPEND_ONLY, ChangeType.INSERT);
+                InternalRow row = scanRecord.getRow();
+                sink.add(row);
+            }
+        }
+    }
+}
diff --git a/fluss-rpc/src/main/java/org/apache/fluss/rpc/entity/FetchLogResultForBucket.java b/fluss-rpc/src/main/java/org/apache/fluss/rpc/entity/FetchLogResultForBucket.java
index e239d8e052..29e2bf8a8d 100644
--- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/entity/FetchLogResultForBucket.java
+++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/entity/FetchLogResultForBucket.java
@@ -18,6 +18,7 @@
 package org.apache.fluss.rpc.entity;
 
 import org.apache.fluss.annotation.Internal;
+import org.apache.fluss.lake.source.LakeLogFetchInfo;
 import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.record.LogRecords;
 import org.apache.fluss.record.MemoryLogRecords;
@@ -33,6 +34,7 @@
 @Internal
 public class FetchLogResultForBucket extends ResultForBucket {
     private final @Nullable RemoteLogFetchInfo remoteLogFetchInfo;
+    private final @Nullable LakeLogFetchInfo lakeLogFetchInfo;
     private final @Nullable LogRecords records;
     private final long highWatermark;
     private final long filteredEndOffset;
@@ -42,6 +44,7 @@ public FetchLogResultForBucket(
         this(
                 tableBucket,
                 null,
+                null,
                 checkNotNull(records, "records can not be null"),
                 highWatermark,
                 -1L,
@@ -56,6 +59,7 @@ public FetchLogResultForBucket(
         this(
                 tableBucket,
                 null,
+                null,
                 checkNotNull(records, "records can not be null"),
                 highWatermark,
                 filteredEndOffset,
@@ -63,7 +67,7 @@ public FetchLogResultForBucket(
     }
 
     public FetchLogResultForBucket(TableBucket tableBucket, ApiError error) {
-        this(tableBucket, null, null, -1L, -1L, error);
+        this(tableBucket, null, null, null, -1L, -1L, error);
     }
 
     public FetchLogResultForBucket(
@@ -72,6 +76,19 @@ public FetchLogResultForBucket(
                 tableBucket,
                 checkNotNull(remoteLogFetchInfo, "remote log fetch info can not be null"),
                 null,
+                null,
+                highWatermark,
+                -1L,
+                ApiError.NONE);
+    }
+
+    public FetchLogResultForBucket(
+            TableBucket tableBucket, LakeLogFetchInfo lakeLogFetchInfo, long highWatermark) {
+        this(
+                tableBucket,
+                null,
+                checkNotNull(lakeLogFetchInfo, "lake log fetch info can not be null"),
+                null,
                 highWatermark,
                 -1L,
                 ApiError.NONE);
@@ -84,18 +101,20 @@ public FetchLogResultForBucket(
      */
     public FetchLogResultForBucket(
             TableBucket tableBucket, long highWatermark, long filteredEndOffset) {
-        this(tableBucket, null, null, highWatermark, filteredEndOffset, ApiError.NONE);
+        this(tableBucket, null, null, null, highWatermark, filteredEndOffset, ApiError.NONE);
     }
 
     private FetchLogResultForBucket(
             TableBucket tableBucket,
             @Nullable RemoteLogFetchInfo remoteLogFetchInfo,
+            @Nullable LakeLogFetchInfo lakeLogFetchInfo,
             @Nullable LogRecords records,
             long highWatermark,
             long filteredEndOffset,
             ApiError error) {
         super(tableBucket, error);
         this.remoteLogFetchInfo = remoteLogFetchInfo;
+        this.lakeLogFetchInfo = lakeLogFetchInfo;
         this.records = records;
         this.highWatermark = highWatermark;
         this.filteredEndOffset = filteredEndOffset;
@@ -103,8 +122,8 @@ private FetchLogResultForBucket(
 
     /**
      * The fetch result currently supporting only fetch from remote or fetch from local. It means
-     * that if remoteLogFetchInfo is not null, the records should be null. Otherwise, the records
-     * should not be null.
+     * that if remoteLogFetchInfo/lakeLogFetchInfo is not null, the records should be null.
+     * Otherwise, the records should not be null.
      *
      * @return {@code true} if the log is fetched from remote.
     */
@@ -112,6 +131,11 @@ public boolean fetchFromRemote() {
         return remoteLogFetchInfo != null;
     }
 
+    /** Returns {@code true} if the log should be fetched from lake. */
+    public boolean fetchFromLake() {
+        return lakeLogFetchInfo != null;
+    }
+
     public @Nullable LogRecords records() {
         return records;
     }
@@ -128,6 +152,10 @@ public LogRecords recordsOrEmpty() {
         return remoteLogFetchInfo;
     }
 
+    public @Nullable LakeLogFetchInfo lakeLogFetchInfo() {
+        return lakeLogFetchInfo;
+    }
+
     public long getHighWatermark() {
         return highWatermark;
     }
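
With the new field, each result carries exactly one of three mutually exclusive payloads: remote log fetch info, lake log fetch info, or raw records. A condensed sketch of the consumer-side dispatch, mirroring LogFetcher.handleFetchLogResponse earlier in this patch (handler bodies elided, the wrapper class and method name are illustrative, and RemoteLogFetchInfo's package is assumed to match FetchLogResultForBucket's):

    import org.apache.fluss.lake.source.LakeLogFetchInfo;
    import org.apache.fluss.record.LogRecords;
    import org.apache.fluss.rpc.entity.FetchLogResultForBucket;
    import org.apache.fluss.rpc.entity.RemoteLogFetchInfo;

    class FetchResultDispatchSketch {
        static void dispatch(FetchLogResultForBucket result) {
            if (result.fetchFromRemote()) {
                RemoteLogFetchInfo remote = result.remoteLogFetchInfo(); // non-null in this branch
                // ... schedule remote log segment downloads ...
            } else if (result.fetchFromLake()) {
                LakeLogFetchInfo lake = result.lakeLogFetchInfo(); // non-null in this branch
                // ... plan lake splits and read [lake.startOffset(), lake.endOffset()) ...
            } else {
                LogRecords records = result.recordsOrEmpty(); // plain local-log records
                // ... decode record batches ...
            }
        }
    }
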
diff --git a/fluss-rpc/src/main/java/org/apache/fluss/rpc/util/CommonRpcMessageUtils.java b/fluss-rpc/src/main/java/org/apache/fluss/rpc/util/CommonRpcMessageUtils.java
index 2cf783a118..a076a7ffff 100644
--- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/util/CommonRpcMessageUtils.java
+++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/util/CommonRpcMessageUtils.java
@@ -17,6 +17,7 @@
 
 package org.apache.fluss.rpc.util;
 
+import org.apache.fluss.lake.source.LakeLogFetchInfo;
 import org.apache.fluss.metadata.PhysicalTablePath;
 import org.apache.fluss.metadata.ResolvedPartitionSpec;
 import org.apache.fluss.metadata.TableBucket;
@@ -30,6 +31,7 @@
 import org.apache.fluss.rpc.messages.PbAclInfo;
 import org.apache.fluss.rpc.messages.PbFetchLogRespForBucket;
 import org.apache.fluss.rpc.messages.PbKeyValue;
+import org.apache.fluss.rpc.messages.PbLakeLogFetchInfo;
 import org.apache.fluss.rpc.messages.PbPartitionSpec;
 import org.apache.fluss.rpc.messages.PbRemoteLogFetchInfo;
 import org.apache.fluss.rpc.messages.PbRemoteLogSegment;
@@ -196,6 +198,19 @@ public static FetchLogResultForBucket getFetchLogResultForBucket(
             fetchLogResultForBucket =
                     new FetchLogResultForBucket(
                             tb, rlFetchInfo, respForBucket.getHighWatermark());
+        } else if (respForBucket.hasLakeLogFetchInfo()) {
+            PbLakeLogFetchInfo pbLakeLogFetchInfo = respForBucket.getLakeLogFetchInfo();
+            LakeLogFetchInfo lakeLogFetchInfo =
+                    new LakeLogFetchInfo(
+                            pbLakeLogFetchInfo.getSnapshotId(),
+                            pbLakeLogFetchInfo.hasPartitionName()
+                                    ? pbLakeLogFetchInfo.getPartitionName()
+                                    : null,
+                            pbLakeLogFetchInfo.getStartOffset(),
+                            pbLakeLogFetchInfo.getEndOffset());
+            fetchLogResultForBucket =
+                    new FetchLogResultForBucket(
+                            tb, lakeLogFetchInfo, respForBucket.getHighWatermark());
         } else {
             ByteBuffer recordsBuffer = toByteBuffer(respForBucket.getRecordsSlice());
             LogRecords records =
diff --git a/fluss-rpc/src/main/proto/FlussApi.proto b/fluss-rpc/src/main/proto/FlussApi.proto
index e8381215fc..8885812693 100644
--- a/fluss-rpc/src/main/proto/FlussApi.proto
+++ b/fluss-rpc/src/main/proto/FlussApi.proto
@@ -885,6 +885,7 @@ message PbFetchLogRespForBucket {
   // filter. When set (>= 0), the records field MUST be empty — the two fields are mutually
   // exclusive. The client should use this offset as the starting offset for the next fetch.
   optional int64 filtered_end_offset = 9;
+  optional PbLakeLogFetchInfo lake_log_fetch_info = 10;
 }
 
 message PbPutKvReqForBucket {
@@ -1057,6 +1058,13 @@ message PbRemoteLogFetchInfo {
   optional int32 first_start_pos = 4;
 }
 
+message PbLakeLogFetchInfo {
+  required int64 snapshot_id = 1;
+  optional string partition_name = 2;
+  required int64 start_offset = 3;
+  required int64 end_offset = 4;
+}
+
 message PbRemoteLogSegment {
   required string remote_log_segment_id = 1;
   required int64 remote_log_start_offset = 2;
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java b/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java
index 23c60c82e1..b36edf3c8c 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java
@@ -261,6 +261,10 @@ public boolean isDataLakeEnabled() {
         return isDataLakeEnabled;
     }
 
+    public boolean isChangeLog() {
+        return isChangeLog;
+    }
+
     public long getLakeTableSnapshotId() {
         return lakeTableSnapshotId;
     }
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
index 71fd233b68..6526946269 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
@@ -34,6 +34,7 @@
 import org.apache.fluss.exception.UnknownTableOrBucketException;
 import org.apache.fluss.exception.UnsupportedVersionException;
 import org.apache.fluss.fs.FsPath;
+import org.apache.fluss.lake.source.LakeLogFetchInfo;
 import org.apache.fluss.metadata.LogFormat;
 import org.apache.fluss.metadata.PhysicalTablePath;
 import org.apache.fluss.metadata.Schema;
@@ -154,6 +155,8 @@ public class ReplicaManager implements ServerReconfigurable {
 
     private static final Logger LOG = LoggerFactory.getLogger(ReplicaManager.class);
 
+    private static final long INITIAL_LAKE_LOG_START_OFFSET = 0L;
+
     public static final String HIGH_WATERMARK_CHECKPOINT_FILE_NAME = "high-watermark-checkpoint";
     private final Configuration conf;
     private final Scheduler scheduler;
@@ -1079,13 +1082,16 @@ public void notifyLakeTableOffset(
                     LogTablet logTablet = getReplicaOrException(tb).getLogTablet();
                     logTablet.updateLakeTableSnapshotId(lakeBucketOffset.getSnapshotId());
 
-                    lakeBucketOffset
-                            .getLogStartOffset()
-                            .ifPresent(logTablet::updateLakeLogStartOffset);
+                    Optional<Long> logStartOffset = lakeBucketOffset.getLogStartOffset();
+                    Optional<Long> logEndOffset = lakeBucketOffset.getLogEndOffset();
+                    if (!logTablet.isChangeLog()) {
+                        logStartOffset.ifPresent(logTablet::updateLakeLogStartOffset);
+                        if (!logStartOffset.isPresent() && logEndOffset.isPresent()) {
+                            logTablet.updateLakeLogStartOffset(INITIAL_LAKE_LOG_START_OFFSET);
+                        }
+                    }
 
-                    lakeBucketOffset
-                            .getLogEndOffset()
-                            .ifPresent(logTablet::updateLakeLogEndOffset);
+                    logEndOffset.ifPresent(logTablet::updateLakeLogEndOffset);
 
                     lakeBucketOffset
                             .getMaxTimestamp()
@@ -1146,10 +1152,13 @@ private void updateWithLakeTableSnapshot(Replica replica) throws Exception {
         if (optLakeTableSnapshot.isPresent()) {
             LakeTableSnapshot lakeTableSnapshot = optLakeTableSnapshot.get();
             long snapshotId = optLakeTableSnapshot.get().getSnapshotId();
-            replica.getLogTablet().updateLakeTableSnapshotId(snapshotId);
-            lakeTableSnapshot
-                    .getLogEndOffset(tb)
-                    .ifPresent(replica.getLogTablet()::updateLakeLogEndOffset);
+            LogTablet logTablet = replica.getLogTablet();
+            logTablet.updateLakeTableSnapshotId(snapshotId);
+            Optional<Long> logEndOffset = lakeTableSnapshot.getLogEndOffset(tb);
+            if (!logTablet.isChangeLog() && logEndOffset.isPresent()) {
+                logTablet.updateLakeLogStartOffset(INITIAL_LAKE_LOG_START_OFFSET);
+            }
+            logEndOffset.ifPresent(logTablet::updateLakeLogEndOffset);
         }
     }
 
@@ -1481,7 +1490,8 @@ public Map<TableBucket, FetchLogResultForBucket> readFromLog(
                 FetchLogResultForBucket result;
                 if (replica != null && e instanceof LogOffsetOutOfRangeException) {
-                    result = handleFetchOutOfRangeException(replica, fetchOffset, e);
+                    result =
+                            handleFetchOutOfRangeException(replica, fetchOffset, e, isFromFollower);
                 } else {
                     result = new FetchLogResultForBucket(tb, ApiError.fromThrowable(e));
                 }
@@ -1493,18 +1503,15 @@
     }
 
     private FetchLogResultForBucket handleFetchOutOfRangeException(
-            Replica replica, long fetchOffset, Exception e) {
+            Replica replica, long fetchOffset, Exception e, boolean isFromFollower) {
         TableBucket tb = replica.getTableBucket();
         if (fetchOffset == FetchParams.FETCH_FROM_EARLIEST_OFFSET) {
             fetchOffset = replica.getLogStartOffset();
         }
 
-        if (canFetchFromLakeLog(replica, fetchOffset)) {
-            // todo: currently, we just return empty records directly
-            // need to return the info of datalake to make client can fetch
-            // from datalake directly
-            return new FetchLogResultForBucket(
-                    tb, MemoryLogRecords.EMPTY, replica.getLogHighWatermark());
+        if (!isFromFollower && canFetchFromLakeLog(replica, fetchOffset)) {
+            LakeLogFetchInfo lakeLogFetchInfo = fetchLogFromLake(replica, fetchOffset);
+            return new FetchLogResultForBucket(tb, lakeLogFetchInfo, replica.getLogHighWatermark());
         }
 
         // Once we get a fetch out of range exception from local storage, we need to check whether
         // the log segment already upload to the remote storage. If uploaded, we will return a list
@@ -1540,6 +1547,14 @@ private boolean canFetchFromRemoteLog(Replica replica, long fetchOffset) {
         return replica.getLogTablet().canFetchFromRemoteLog(fetchOffset);
     }
 
+    private LakeLogFetchInfo fetchLogFromLake(Replica replica, long fetchOffset) {
+        return new LakeLogFetchInfo(
+                replica.getLogTablet().getLakeTableSnapshotId(),
+                replica.getPhysicalTablePath().getPartitionName(),
+                fetchOffset,
+                replica.getLogTablet().getLakeLogEndOffset());
+    }
+
     private @Nullable RemoteLogFetchInfo fetchLogFromRemote(Replica replica, long fetchOffset) {
         List<RemoteLogSegment> remoteLogSegmentList =
                 remoteLogManager.relevantRemoteLogSegments(replica.getTableBucket(), fetchOffset);
@@ -1696,7 +1711,8 @@ private void maybeAddDelayedFetchLog(
                 }
             }
 
-            if (!fetchLogResultForBucket.fetchFromRemote()) {
+            if (!fetchLogResultForBucket.fetchFromRemote()
+                    && !fetchLogResultForBucket.fetchFromLake()) {
                 hasFetchFromLocal = true;
                 bytesReadable += fetchLogResultForBucket.recordsOrEmpty().sizeInBytes();
             }
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/replica/delay/DelayedFetchLog.java b/fluss-server/src/main/java/org/apache/fluss/server/replica/delay/DelayedFetchLog.java
index e17b7e1ccb..388b585168 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/replica/delay/DelayedFetchLog.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/delay/DelayedFetchLog.java
@@ -90,7 +90,8 @@ public void onComplete() {
                 fetchBucketStatusMap.entrySet()) {
             FetchBucketStatus fetchBucketStatus = fetchBucketStatusEntry.getValue();
             TableBucket tb = fetchBucketStatusEntry.getKey();
-            if (fetchBucketStatus.previousFetchLogResultForBucket.fetchFromRemote()) {
+            if (fetchBucketStatus.previousFetchLogResultForBucket.fetchFromRemote()
+                    || fetchBucketStatus.previousFetchLogResultForBucket.fetchFromLake()) {
                 result.put(tb, fetchBucketStatus.previousFetchLogResultForBucket);
             } else {
                 reFetchBuckets.put(tb, fetchBucketStatus.fetchReqInfo);
@@ -127,6 +128,7 @@ public boolean tryComplete() {
             LogOffsetMetadata fetchOffset = fetchBucketStatus.startOffsetMetadata;
             try {
                 if (!fetchBucketStatus.previousFetchLogResultForBucket.fetchFromRemote()
+                        && !fetchBucketStatus.previousFetchLogResultForBucket.fetchFromLake()
                         && fetchOffset != LogOffsetMetadata.UNKNOWN_OFFSET_METADATA) {
                     Replica replica = replicaManager.getReplicaOrException(tb);
                     LogOffsetSnapshot logOffsetSnapshot =
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java b/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java
index f7a89828ab..d444c7cd14 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java
@@ -30,6 +30,7 @@
 import org.apache.fluss.fs.FsPath;
 import org.apache.fluss.fs.token.ObtainedSecurityToken;
 import org.apache.fluss.lake.committer.LakeCommitResult;
+import org.apache.fluss.lake.source.LakeLogFetchInfo;
 import org.apache.fluss.metadata.DatabaseChange;
 import org.apache.fluss.metadata.DatabaseSummary;
 import org.apache.fluss.metadata.PartitionSpec;
@@ -1019,6 +1020,19 @@ public static FetchLogResponse makeFetchLogResponse(
                             .setRemoteLogFetchInfo()
                             .setPartitionName(rlfInfo.partitionName());
                 }
+            } else if (bucketResult.fetchFromLake()) {
+                LakeLogFetchInfo lakeLogFetchInfo = bucketResult.lakeLogFetchInfo();
+                checkNotNull(lakeLogFetchInfo, "Lake log fetch info is null.");
+                fetchLogRespForBucket
+                        .setLakeLogFetchInfo()
+                        .setSnapshotId(lakeLogFetchInfo.snapshotId())
+                        .setStartOffset(lakeLogFetchInfo.startOffset())
+                        .setEndOffset(lakeLogFetchInfo.endOffset());
+                if (lakeLogFetchInfo.partitionName() != null) {
+                    fetchLogRespForBucket
+                            .setLakeLogFetchInfo()
+                            .setPartitionName(lakeLogFetchInfo.partitionName());
+                }
             } else {
                 // set records
                 LogRecords records = bucketResult.recordsOrEmpty();
diff --git a/fluss-server/src/test/java/org/apache/fluss/server/replica/CommitLakeTableSnapshotITCase.java b/fluss-server/src/test/java/org/apache/fluss/server/replica/CommitLakeTableSnapshotITCase.java
index e5788f32b6..253c0b51c4 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/replica/CommitLakeTableSnapshotITCase.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/replica/CommitLakeTableSnapshotITCase.java
@@ -121,8 +121,10 @@ void testCommitDataLakeData() throws Exception {
                 Duration.ofMinutes(2),
                 () -> {
                     LogTablet logTablet = replica.getLogTablet();
+                    assertThat(logTablet.getLakeLogStartOffset()).isEqualTo(0L);
                     assertThat(logTablet.getLakeLogEndOffset()).isEqualTo(dataLakeLogEndOffset);
                     assertThat(logTablet.getLakeMaxTimestamp()).isEqualTo(dataLakeMaxTimestamp);
+                    assertThat(logTablet.canFetchFromLakeLog(0L)).isTrue();
                 });
     }
diff --git a/fluss-server/src/test/java/org/apache/fluss/server/replica/NotifyReplicaLakeTableOffsetTest.java b/fluss-server/src/test/java/org/apache/fluss/server/replica/NotifyReplicaLakeTableOffsetTest.java
index 0d0b25204c..d8aefff6ac 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/replica/NotifyReplicaLakeTableOffsetTest.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/replica/NotifyReplicaLakeTableOffsetTest.java
@@ -23,6 +23,7 @@
 import org.apache.fluss.server.entity.NotifyLakeTableOffsetData;
 
 import org.assertj.core.api.AssertionsForClassTypes;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 
@@ -48,6 +49,27 @@ void testNotifyWithOutRemoteLog(boolean partitionedTable) throws Exception {
         notifyAndVerify(tb, replica, 2, 20L, 30L, System.currentTimeMillis());
     }
 
+    @Test
+    void testNotifyWithoutLakeLogStartOffset() throws Exception {
+        TableBucket tb = makeTableBucket(false);
+        makeLogTableAsLeader(tb, false);
+        Replica replica = replicaManager.getReplicaOrException(tb);
+
+        NotifyLakeTableOffsetData notifyLakeTableOffsetData =
+                new NotifyLakeTableOffsetData(
+                        1,
+                        Collections.singletonMap(
+                                tb,
+                                new LakeBucketOffset(1, null, 20L, System.currentTimeMillis())));
+        CompletableFuture future = new CompletableFuture<>();
+        replicaManager.notifyLakeTableOffset(notifyLakeTableOffsetData, future::complete);
+        future.get();
+
+        AssertionsForClassTypes.assertThat(replica.getLogTablet().getLakeLogStartOffset())
+                .isEqualTo(0L);
+        AssertionsForClassTypes.assertThat(replica.getLogTablet().canFetchFromLakeLog(0L)).isTrue();
+    }
+
     private void notifyAndVerify(
             TableBucket tb,
             Replica replica,