diff --git a/be/src/cloud/cloud_internal_service.cpp b/be/src/cloud/cloud_internal_service.cpp
index db0c709827355e..e453bc7ee443a2 100644
--- a/be/src/cloud/cloud_internal_service.cpp
+++ b/be/src/cloud/cloud_internal_service.cpp
@@ -522,12 +522,15 @@ void CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* c
             }
         };
+        // Use rs_meta.fs() instead of storage_resource.value()->fs to support packed files.
+        // PackedFileSystem wrapper in rs_meta.fs() handles the index_map lookup and
+        // reads from the correct packed file.
         io::DownloadFileMeta download_meta {
                 .path = storage_resource.value()->remote_segment_path(rs_meta, segment_id),
                 .file_size = segment_size,
                 .offset = 0,
                 .download_size = segment_size,
-                .file_system = storage_resource.value()->fs,
+                .file_system = rs_meta.fs(),
                 .ctx = {.is_index_data = false,
                         .expiration_time = expiration_time,
                         .is_dryrun = config::enable_reader_dryrun_when_download_file_cache,
@@ -543,8 +546,9 @@ void CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* c
         _engine.file_cache_block_downloader().submit_download_task(download_meta);
     }
+
+    // Use rs_meta.fs() to support packed files for inverted index download.
    auto download_inverted_index = [&, tablet](std::string index_path, uint64_t idx_size) {
-        auto storage_resource = rs_meta.remote_storage_resource();
        auto download_done = [=, version = rs_meta.version()](Status st) {
            DBUG_EXECUTE_IF(
                    "CloudInternalServiceImpl::warm_up_rowset.download_inverted_idx", {
@@ -600,7 +604,7 @@ void CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* c
        io::DownloadFileMeta download_meta {
                .path = io::Path(index_path),
                .file_size = static_cast<int64_t>(idx_size),
-                .file_system = storage_resource.value()->fs,
+                .file_system = rs_meta.fs(),
                .ctx = {.is_index_data = false, // DORIS-20877
                        .expiration_time = expiration_time,
                        .is_dryrun = config::enable_reader_dryrun_when_download_file_cache,
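The comments above describe rs_meta.fs() returning a PackedFileSystem wrapper that resolves a logical segment path through an index_map before reading the correct packed file. Below is a minimal, self-contained sketch of that idea only; the names here (PackedIndexEntry, PackedFsSketch) are hypothetical and do not reflect the real PackedFileSystem API, which is not shown in this patch.

```cpp
// Illustrative sketch only: a wrapper "file system" that redirects reads of small
// logical files to (packed_file, offset, size) locations recorded in an index map.
// All names below (PackedIndexEntry, PackedFsSketch) are hypothetical.
#include <cstdint>
#include <iostream>
#include <optional>
#include <string>
#include <unordered_map>

struct PackedIndexEntry {
    std::string packed_file; // physical object that actually holds the bytes
    uint64_t offset = 0;     // where the logical file starts inside the packed file
    uint64_t size = 0;       // length of the logical file
};

class PackedFsSketch {
public:
    void add_entry(const std::string& logical_path, PackedIndexEntry entry) {
        _index_map[logical_path] = std::move(entry);
    }

    // Resolve a logical path; files that were never packed fall through unchanged.
    std::optional<PackedIndexEntry> resolve(const std::string& logical_path) const {
        auto it = _index_map.find(logical_path);
        if (it == _index_map.end()) {
            return std::nullopt; // not packed: read logical_path directly
        }
        return it->second; // packed: read [offset, offset + size) from packed_file
    }

private:
    std::unordered_map<std::string, PackedIndexEntry> _index_map;
};

int main() {
    PackedFsSketch fs;
    fs.add_entry("data/10001_0.dat", PackedIndexEntry{"data/pack_123.bin", 4096, 512});
    if (auto loc = fs.resolve("data/10001_0.dat")) {
        std::cout << "read " << loc->size << " bytes at offset " << loc->offset
                  << " from " << loc->packed_file << "\n";
    }
    return 0;
}
```

Downloading through the raw storage fs would miss this indirection, which is why both the segment and inverted-index warm-up paths above switch to rs_meta.fs().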
diff --git a/be/src/cloud/cloud_warm_up_manager.cpp b/be/src/cloud/cloud_warm_up_manager.cpp
index 89d69647cb0041..85f350ef0b0cd6 100644
--- a/be/src/cloud/cloud_warm_up_manager.cpp
+++ b/be/src/cloud/cloud_warm_up_manager.cpp
@@ -251,12 +251,14 @@ void CloudWarmUpManager::handle_jobs() {
            }
            for (int64_t seg_id = 0; seg_id < rs->num_segments(); seg_id++) {
                // 1st. download segment files
+                // Use rs->fs() instead of storage_resource.value()->fs to support packed
+                // files. PackedFileSystem wrapper in RowsetMeta::fs() handles the index_map
+                // lookup and reads from the correct packed file.
                if (!config::file_cache_enable_only_warm_up_idx) {
                    submit_download_tasks(
                            storage_resource.value()->remote_segment_path(*rs, seg_id),
-                            rs->segment_file_size(cast_set(seg_id)),
-                            storage_resource.value()->fs, expiration_time, wait, false,
-                            [tablet, rs, seg_id](Status st) {
+                            rs->segment_file_size(cast_set(seg_id)), rs->fs(),
+                            expiration_time, wait, false, [tablet, rs, seg_id](Status st) {
                                VLOG_DEBUG << "warmup rowset " << rs->version() << " segment "
                                           << seg_id << " completed";
                                if (tablet->complete_rowset_segment_warmup(
@@ -301,8 +303,8 @@ void CloudWarmUpManager::handle_jobs() {
                            tablet->update_rowset_warmup_state_inverted_idx_num(
                                    WarmUpTriggerSource::JOB, rs->rowset_id(), 1);
                            submit_download_tasks(
-                                    idx_path, file_size, storage_resource.value()->fs,
-                                    expiration_time, wait, true, [=](Status st) {
+                                    idx_path, file_size, rs->fs(), expiration_time, wait, true,
+                                    [=](Status st) {
                                        VLOG_DEBUG << "warmup rowset " << rs->version()
                                                   << " segment " << seg_id
                                                   << "inverted idx:" << idx_path << " completed";
@@ -324,8 +326,8 @@ void CloudWarmUpManager::handle_jobs() {
                            tablet->update_rowset_warmup_state_inverted_idx_num(
                                    WarmUpTriggerSource::JOB, rs->rowset_id(), 1);
                            submit_download_tasks(
-                                    idx_path, file_size, storage_resource.value()->fs,
-                                    expiration_time, wait, true, [=](Status st) {
+                                    idx_path, file_size, rs->fs(), expiration_time, wait, true,
+                                    [=](Status st) {
                                        VLOG_DEBUG << "warmup rowset " << rs->version()
                                                   << " segment " << seg_id
                                                   << "inverted idx:" << idx_path << " completed";
diff --git a/be/src/io/fs/packed_file_manager.cpp b/be/src/io/fs/packed_file_manager.cpp
index 53a445502c7df5..75da15649d8d40 100644
--- a/be/src/io/fs/packed_file_manager.cpp
+++ b/be/src/io/fs/packed_file_manager.cpp
@@ -121,7 +121,7 @@ Status append_packed_info_trailer(FileWriter* writer, const std::string& packed_

 // write small file data to file cache
 void do_write_to_file_cache(const std::string& small_file_path, const std::string& data,
-                            int64_t tablet_id) {
+                            int64_t tablet_id, uint64_t expiration_time) {
     if (data.empty()) {
         return;
     }
@@ -132,7 +132,8 @@ void do_write_to_file_cache(const std::string& small_file_path, const std::strin
     VLOG_DEBUG << "packed_file_cache_write: path=" << small_file_path
                << " filename=" << path.filename().native() << " hash=" << cache_hash.to_string()
-               << " size=" << data.size() << " tablet_id=" << tablet_id;
+               << " size=" << data.size() << " tablet_id=" << tablet_id
+               << " expiration_time=" << expiration_time;

     BlockFileCache* file_cache = FileCacheFactory::instance()->get_by_path(cache_hash);
     if (file_cache == nullptr) {
@@ -141,7 +142,8 @@ void do_write_to_file_cache(const std::string& small_file_path, const std::strin

     // Allocate cache blocks
     CacheContext ctx;
-    ctx.cache_type = FileCacheType::NORMAL;
+    ctx.cache_type = expiration_time > 0 ? FileCacheType::TTL : FileCacheType::NORMAL;
+    ctx.expiration_time = expiration_time;
     ReadStatistics stats;
     ctx.stats = &stats;
@@ -178,7 +180,7 @@ void do_write_to_file_cache(const std::string& small_file_path, const std::strin
 // the async task execution. The original Slice may reference a buffer that gets
 // reused or freed before the async task runs.
 void write_small_file_to_cache_async(const std::string& small_file_path, const Slice& data,
-                                     int64_t tablet_id) {
+                                     int64_t tablet_id, uint64_t expiration_time) {
     if (!config::enable_file_cache || data.size == 0) {
         return;
     }
@@ -191,7 +193,7 @@ void write_small_file_to_cache_async(const std::string& small_file_path, const S
     auto* thread_pool = ExecEnv::GetInstance()->s3_file_upload_thread_pool();
     if (thread_pool == nullptr) {
         // Fallback to sync write if thread pool not available
-        do_write_to_file_cache(small_file_path, data_copy, tablet_id);
+        do_write_to_file_cache(small_file_path, data_copy, tablet_id, expiration_time);
         return;
     }
@@ -199,13 +201,13 @@ void write_small_file_to_cache_async(const std::string& small_file_path, const S
     g_packed_file_cache_async_write_count << 1;
     g_packed_file_cache_async_write_bytes << static_cast<int64_t>(data_size);

-    Status st = thread_pool->submit_func(
-            [path = small_file_path, data = std::move(data_copy), tablet_id, data_size]() {
-                do_write_to_file_cache(path, data, tablet_id);
-                // Decrement async write count after completion
-                g_packed_file_cache_async_write_count << -1;
-                g_packed_file_cache_async_write_bytes << -static_cast<int64_t>(data_size);
-            });
+    Status st = thread_pool->submit_func([path = small_file_path, data = std::move(data_copy),
+                                          tablet_id, data_size, expiration_time]() {
+        do_write_to_file_cache(path, data, tablet_id, expiration_time);
+        // Decrement async write count after completion
+        g_packed_file_cache_async_write_count << -1;
+        g_packed_file_cache_async_write_bytes << -static_cast<int64_t>(data_size);
+    });

     if (!st.ok()) {
         // Revert metrics since task was not submitted
@@ -368,7 +370,7 @@ Status PackedFileManager::append_small_file(const std::string& path, const Slice
     // Async write data to file cache using small file path as cache key.
     // This ensures cache key matches the cleanup key in Rowset::clear_cache(),
     // allowing proper cache cleanup when stale rowsets are removed.
-    write_small_file_to_cache_async(path, data, info.tablet_id);
+    write_small_file_to_cache_async(path, data, info.tablet_id, info.expiration_time);

     // Update index
     PackedSliceLocation location;
diff --git a/be/src/io/fs/packed_file_manager.h b/be/src/io/fs/packed_file_manager.h
index bfa83a240e0a86..8a9758bf314eb7 100644
--- a/be/src/io/fs/packed_file_manager.h
+++ b/be/src/io/fs/packed_file_manager.h
@@ -58,6 +58,7 @@ struct PackedAppendContext {
     int64_t tablet_id = 0;
     std::string rowset_id;
     int64_t txn_id = 0;
+    uint64_t expiration_time = 0; // TTL expiration time in seconds since epoch, 0 means no TTL
 };

 // Global object that manages packing small files into larger files for S3 optimization
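The hunks above keep the existing pattern of copying the Slice into an owned std::string before handing it to the thread pool, and now also capture expiration_time in the lambda. The sketch below shows only why that copy-then-capture pattern is needed: the async task must own its bytes because the caller's buffer may be reused before the task runs. The names fake_thread_pool and submit_async_cache_write are illustrative stand-ins, not the ExecEnv/submit_func API used in the real code.

```cpp
// Sketch: capture an owned copy of the payload (plus the TTL expiration) by value,
// so the async task is independent of the caller's transient buffer.
// fake_thread_pool and submit_async_cache_write are illustrative stand-ins.
#include <cstdint>
#include <functional>
#include <iostream>
#include <string>
#include <string_view>
#include <vector>

std::vector<std::function<void()>> fake_thread_pool; // stands in for a real thread pool queue

void submit_async_cache_write(const std::string& path, std::string_view data,
                              int64_t tablet_id, uint64_t expiration_time) {
    // Copy now: `data` may point into a buffer the caller reuses immediately.
    std::string data_copy(data);
    fake_thread_pool.emplace_back([path, data = std::move(data_copy), tablet_id,
                                   expiration_time]() {
        // The task owns `data`; it is safe to run at any later time.
        std::cout << "cache write: path=" << path << " bytes=" << data.size()
                  << " tablet=" << tablet_id << " expiration=" << expiration_time << "\n";
    });
}

int main() {
    std::string buffer = "segment bytes";
    submit_async_cache_write("data/10001_0.dat", buffer, /*tablet_id=*/10001,
                             /*expiration_time=*/1700003600);
    buffer.assign("reused for something else"); // caller reuses its buffer
    for (auto& task : fake_thread_pool) {
        task(); // the task still sees the original bytes
    }
    return 0;
}
```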
diff --git a/be/src/olap/rowset/rowset_writer_context.h b/be/src/olap/rowset/rowset_writer_context.h
index 0978a093a42062..7fba87606024fb 100644
--- a/be/src/olap/rowset/rowset_writer_context.h
+++ b/be/src/olap/rowset/rowset_writer_context.h
@@ -224,6 +224,9 @@ struct RowsetWriterContext {
        append_info.tablet_id = tablet_id;
        append_info.rowset_id = rowset_id.to_string();
        append_info.txn_id = txn_id;
+        append_info.expiration_time = file_cache_ttl_sec > 0 && newest_write_timestamp > 0
+                                              ? newest_write_timestamp + file_cache_ttl_sec
+                                              : 0;
        fs = std::make_shared<io::PackedFileSystem>(fs, append_info);
    }
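For reference, the expiration value written into PackedAppendContext follows the ternary above: an absolute seconds-since-epoch deadline when both a table TTL and a write timestamp are present, otherwise 0 (no TTL), and a non-zero value later selects FileCacheType::TTL in do_write_to_file_cache(). A small self-contained sketch of that mapping; the enum and function names here are illustrative mirrors of the diff, not the real headers.

```cpp
// Sketch of the TTL plumbing: derive an absolute expiration time from the rowset's
// newest write timestamp and the table's file_cache_ttl_sec, then pick the cache type.
#include <cstdint>
#include <iostream>

enum class SketchCacheType { NORMAL, TTL };

uint64_t derive_expiration_time(int64_t file_cache_ttl_sec, int64_t newest_write_timestamp) {
    // Mirrors the ternary added in rowset_writer_context.h: only rowsets with both a
    // positive TTL and a known write timestamp get an absolute expiration.
    return file_cache_ttl_sec > 0 && newest_write_timestamp > 0
                   ? static_cast<uint64_t>(newest_write_timestamp + file_cache_ttl_sec)
                   : 0;
}

SketchCacheType pick_cache_type(uint64_t expiration_time) {
    // Mirrors the change in do_write_to_file_cache(): a non-zero expiration selects TTL.
    return expiration_time > 0 ? SketchCacheType::TTL : SketchCacheType::NORMAL;
}

int main() {
    // e.g. a rowset written at t=1700000000 on a table with file_cache_ttl_seconds=3600
    uint64_t exp = derive_expiration_time(/*file_cache_ttl_sec=*/3600,
                                          /*newest_write_timestamp=*/1700000000);
    std::cout << "expiration=" << exp // 1700003600
              << " type=" << (pick_cache_type(exp) == SketchCacheType::TTL ? "TTL" : "NORMAL")
              << "\n";
    return 0;
}
```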
diff --git a/regression-test/suites/cloud_p0/cache/multi_cluster/warm_up/cluster/test_packed_file_warm_up_cluster_event.groovy b/regression-test/suites/cloud_p0/cache/multi_cluster/warm_up/cluster/test_packed_file_warm_up_cluster_event.groovy
new file mode 100644
index 00000000000000..f293741aa5f862
--- /dev/null
+++ b/regression-test/suites/cloud_p0/cache/multi_cluster/warm_up/cluster/test_packed_file_warm_up_cluster_event.groovy
@@ -0,0 +1,205 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+import groovy.json.JsonSlurper
+
+suite('test_packed_file_warm_up_cluster_event', 'docker') {
+    def options = new ClusterOptions()
+    options.feConfigs += [
+        'cloud_cluster_check_interval_second=1',
+    ]
+    options.beConfigs += [
+        'file_cache_enter_disk_resource_limit_mode_percent=99',
+        'enable_evict_file_cache_in_advance=false',
+        'file_cache_background_monitor_interval_ms=1000',
+        'enable_packed_file=true',
+        'disable_auto_compaction=true'
+    ]
+    options.cloudMode = true
+
+    def clearFileCache = {ip, port ->
+        def url = "http://${ip}:${port}/api/file_cache?op=clear&sync=true"
+        def response = new URL(url).text
+        def json = new JsonSlurper().parseText(response)
+
+        // Check the status
+        if (json.status != "OK") {
+            throw new RuntimeException("Clear cache on ${ip}:${port} failed: ${json.status}")
+        }
+    }
+
+    def clearFileCacheOnAllBackends = {
+        def backends = sql """SHOW BACKENDS"""
+
+        for (be in backends) {
+            def ip = be[1]
+            def port = be[4]
+            clearFileCache(ip, port)
+        }
+
+        // clear file cache is async, wait it done
+        sleep(10000)
+    }
+
+    def getBrpcMetrics = {ip, port, name ->
+        def url = "http://${ip}:${port}/brpc_metrics"
+        if ((context.config.otherConfigs.get("enableTLS")?.toString()?.equalsIgnoreCase("true")) ?: false) {
+            url = url.replace("http://", "https://") + " --cert " + context.config.otherConfigs.get("trustCert") + " --cacert " + context.config.otherConfigs.get("trustCACert") + " --key " + context.config.otherConfigs.get("trustCAKey")
+        }
+        def metrics = new URL(url).text
+        def matcher = metrics =~ ~"${name}\\s+(\\d+)"
+        if (matcher.find()) {
+            return matcher[0][1] as long
+        } else {
+            throw new RuntimeException("${name} not found for ${ip}:${port}")
+        }
+    }
+
+    def logFileCacheDownloadMetrics = { cluster ->
+        def backends = sql """SHOW BACKENDS"""
+        def cluster_bes = backends.findAll { it[19].contains("""\"compute_group_name\" : \"${cluster}\"""") }
+        for (be in cluster_bes) {
+            def ip = be[1]
+            def port = be[5]
+            def submitted = getBrpcMetrics(ip, port, "file_cache_download_submitted_num")
+            def finished = getBrpcMetrics(ip, port, "file_cache_download_finished_num")
+            def failed = getBrpcMetrics(ip, port, "file_cache_download_failed_num")
+            logger.info("${cluster} be ${ip}:${port}, downloader submitted=${submitted}" +
+                    ", finished=${finished}, failed=${failed}")
+        }
+    }
+
+    def logWarmUpRowsetMetrics = { cluster ->
+        def backends = sql """SHOW BACKENDS"""
+        def cluster_bes = backends.findAll { it[19].contains("""\"compute_group_name\" : \"${cluster}\"""") }
+        for (be in cluster_bes) {
+            def ip = be[1]
+            def port = be[5]
+            def submitted_segment = getBrpcMetrics(ip, port, "file_cache_event_driven_warm_up_submitted_segment_num")
+            def finished_segment = getBrpcMetrics(ip, port, "file_cache_event_driven_warm_up_finished_segment_num")
+            def failed_segment = getBrpcMetrics(ip, port, "file_cache_event_driven_warm_up_failed_segment_num")
+            def submitted_index = getBrpcMetrics(ip, port, "file_cache_event_driven_warm_up_submitted_index_num")
+            def finished_index = getBrpcMetrics(ip, port, "file_cache_event_driven_warm_up_finished_index_num")
+            def failed_index = getBrpcMetrics(ip, port, "file_cache_event_driven_warm_up_failed_index_num")
+            logger.info("${cluster} be ${ip}:${port}, submitted_segment=${submitted_segment}" +
+                    ", finished_segment=${finished_segment}, failed_segment=${failed_segment}" +
+                    ", submitted_index=${submitted_index}" +
+                    ", finished_index=${finished_index}" +
+                    ", failed_index=${failed_index}")
+        }
+    }
+
+    def getTTLCacheSize = { ip, port ->
+        return getBrpcMetrics(ip, port, "ttl_cache_size")
+    }
+
+    def checkTTLCacheSizeSumEqual = { cluster1, cluster2 ->
+        def backends = sql """SHOW BACKENDS"""
+
+        def srcBes = backends.findAll { it[19].contains("""\"compute_group_name\" : \"${cluster1}\"""") }
+        def tgtBes = backends.findAll { it[19].contains("""\"compute_group_name\" : \"${cluster2}\"""") }
+
+        long srcSum = 0
+        for (src in srcBes) {
+            def ip = src[1]
+            def port = src[5]
+            srcSum += getTTLCacheSize(ip, port)
+        }
+
+        long tgtSum = 0
+        for (tgt in tgtBes) {
+            def ip = tgt[1]
+            def port = tgt[5]
+            tgtSum += getTTLCacheSize(ip, port)
+        }
+
+        logger.info("ttl_cache_size: src=${srcSum} dst=${tgtSum}")
+        assertTrue(srcSum > 0, "ttl_cache_size should > 0")
+        assertEquals(srcSum, tgtSum)
+    }
+
+    docker(options) {
+        def clusterName1 = "warmup_source"
+        def clusterName2 = "warmup_target"
+
+        // Add two clusters
+        cluster.addBackend(1, clusterName1)
+        cluster.addBackend(1, clusterName2)
+
+        def tag1 = getCloudBeTagByName(clusterName1)
+        def tag2 = getCloudBeTagByName(clusterName2)
+
+        logger.info("Cluster tag1: {}", tag1)
+        logger.info("Cluster tag2: {}", tag2)
+
+        def jsonSlurper = new JsonSlurper()
+        def clusterId1 = jsonSlurper.parseText(tag1).compute_group_id
+        def clusterId2 = jsonSlurper.parseText(tag2).compute_group_id
+
+        def getJobState = { jobId ->
+            def jobStateResult = sql """SHOW WARM UP JOB WHERE ID = ${jobId}"""
+            return jobStateResult[0][3]
+        }
+
+        // Ensure we are in source cluster
+        sql """use @${clusterName1}"""
+
+        // Simple setup to simulate data load and access
+        sql """CREATE TABLE IF NOT EXISTS customer (id INT, name STRING) DUPLICATE KEY(id) DISTRIBUTED BY HASH(id) BUCKETS 3 PROPERTIES ("file_cache_ttl_seconds" = "3600")"""
+
+        // Start warm up job
+        def jobId_ = sql """
+            WARM UP CLUSTER ${clusterName2} WITH CLUSTER ${clusterName1}
+            PROPERTIES (
+                "sync_mode" = "event_driven",
+                "sync_event" = "load"
+            )
+        """
+
+        def jobId = jobId_[0][0]
+        logger.info("Warm-up job ID: ${jobId}")
+        clearFileCacheOnAllBackends()
+
+        sleep(15000)
+
+        for (int i = 0; i < 100; i++) {
+            sql """INSERT INTO customer VALUES (1, 'A'), (2, 'B'), (3, 'C')"""
+        }
+        sleep(15000)
+        logWarmUpRowsetMetrics(clusterName2)
+        logFileCacheDownloadMetrics(clusterName2)
+        checkTTLCacheSizeSumEqual(clusterName1, clusterName2)
+
+        def jobInfo = sql """SHOW WARM UP JOB WHERE ID = ${jobId}"""
+        assertEquals(jobInfo[0][0], jobId)
+        assertEquals(jobInfo[0][1], clusterName1)
+        assertEquals(jobInfo[0][2], clusterName2)
+        assertEquals(jobInfo[0][4], "CLUSTER")
+        assertTrue(jobInfo[0][3] in ["RUNNING", "PENDING"],
+                "JobState is ${jobInfo[0][3]}, expected RUNNING or PENDING")
+        assertEquals(jobInfo[0][5], "EVENT_DRIVEN (LOAD)")
+
+        // Cancel job and confirm
+        sql """CANCEL WARM UP JOB WHERE ID = ${jobId}"""
+        def cancelInfo = sql """SHOW WARM UP JOB WHERE ID = ${jobId}"""
+        assertEquals(cancelInfo[0][3], "CANCELLED")
+
+        // Clean up
+        sql """DROP TABLE IF EXISTS customer"""
+    }
+}