10 changes: 7 additions & 3 deletions be/src/cloud/cloud_internal_service.cpp
@@ -522,12 +522,15 @@ void CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* c
}
};

// Use rs_meta.fs() instead of storage_resource.value()->fs to support packed files.
// PackedFileSystem wrapper in rs_meta.fs() handles the index_map lookup and
// reads from the correct packed file.
io::DownloadFileMeta download_meta {
.path = storage_resource.value()->remote_segment_path(rs_meta, segment_id),
.file_size = segment_size,
.offset = 0,
.download_size = segment_size,
.file_system = storage_resource.value()->fs,
.file_system = rs_meta.fs(),
.ctx = {.is_index_data = false,
.expiration_time = expiration_time,
.is_dryrun = config::enable_reader_dryrun_when_download_file_cache,
@@ -543,8 +546,9 @@ void CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* c

_engine.file_cache_block_downloader().submit_download_task(download_meta);
}

// Use rs_meta.fs() to support packed files for inverted index download.
auto download_inverted_index = [&, tablet](std::string index_path, uint64_t idx_size) {
auto storage_resource = rs_meta.remote_storage_resource();
auto download_done = [=, version = rs_meta.version()](Status st) {
DBUG_EXECUTE_IF(
"CloudInternalServiceImpl::warm_up_rowset.download_inverted_idx", {
@@ -600,7 +604,7 @@ void CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* c
io::DownloadFileMeta download_meta {
.path = io::Path(index_path),
.file_size = static_cast<int64_t>(idx_size),
.file_system = storage_resource.value()->fs,
.file_system = rs_meta.fs(),
.ctx = {.is_index_data = false, // DORIS-20877
.expiration_time = expiration_time,
.is_dryrun = config::enable_reader_dryrun_when_download_file_cache,
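The new comment notes that rs_meta.fs() returns a PackedFileSystem wrapper that performs the index_map lookup and reads from the correct packed file. As a rough illustration of that idea, here is a minimal sketch, assuming hypothetical names (PackedFsSketch, PackedLocation, plain_read) rather than the actual Doris io::PackedFileSystem API:

```cpp
// Hypothetical sketch, not the actual Doris io::PackedFileSystem API: a wrapper
// that resolves a small-file path through an index map and reads the bytes from
// the packed file that physically holds them.
#include <cstdint>
#include <map>
#include <string>

struct PackedLocation {
    std::string packed_path; // large file that physically holds the data
    int64_t offset = 0;      // where the small file's bytes start
    int64_t size = 0;        // how many bytes belong to the small file
};

class PackedFsSketch {
public:
    // If the logical path was packed, translate (path, pos) into a read on the
    // packed file at the recorded offset; otherwise read the file directly.
    bool read_at(const std::string& logical_path, int64_t pos, char* buf, int64_t len) {
        auto it = _index_map.find(logical_path);
        if (it == _index_map.end()) {
            return plain_read(logical_path, pos, buf, len);
        }
        const PackedLocation& loc = it->second;
        if (pos + len > loc.size) {
            return false; // read past the end of the small file
        }
        return plain_read(loc.packed_path, loc.offset + pos, buf, len);
    }

private:
    // Stand-in for a real positional read on the underlying file system.
    bool plain_read(const std::string& /*path*/, int64_t /*pos*/, char* /*buf*/, int64_t /*len*/) {
        return true;
    }

    std::map<std::string, PackedLocation> _index_map;
};
```

The point of the wrapper is that callers keep using the small file's logical path; the translation to (packed file, offset) happens inside the fs, which is why the warm-up path only needs to swap which fs it passes along.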
16 changes: 9 additions & 7 deletions be/src/cloud/cloud_warm_up_manager.cpp
@@ -251,12 +251,14 @@ void CloudWarmUpManager::handle_jobs() {
}
for (int64_t seg_id = 0; seg_id < rs->num_segments(); seg_id++) {
// 1st. download segment files
// Use rs->fs() instead of storage_resource.value()->fs to support packed
// files. PackedFileSystem wrapper in RowsetMeta::fs() handles the index_map
// lookup and reads from the correct packed file.
if (!config::file_cache_enable_only_warm_up_idx) {
submit_download_tasks(
storage_resource.value()->remote_segment_path(*rs, seg_id),
rs->segment_file_size(cast_set<int>(seg_id)),
storage_resource.value()->fs, expiration_time, wait, false,
[tablet, rs, seg_id](Status st) {
rs->segment_file_size(cast_set<int>(seg_id)), rs->fs(),
expiration_time, wait, false, [tablet, rs, seg_id](Status st) {
VLOG_DEBUG << "warmup rowset " << rs->version() << " segment "
<< seg_id << " completed";
if (tablet->complete_rowset_segment_warmup(
@@ -301,8 +303,8 @@ void CloudWarmUpManager::handle_jobs() {
tablet->update_rowset_warmup_state_inverted_idx_num(
WarmUpTriggerSource::JOB, rs->rowset_id(), 1);
submit_download_tasks(
idx_path, file_size, storage_resource.value()->fs,
expiration_time, wait, true, [=](Status st) {
idx_path, file_size, rs->fs(), expiration_time, wait, true,
[=](Status st) {
VLOG_DEBUG << "warmup rowset " << rs->version()
<< " segment " << seg_id
<< "inverted idx:" << idx_path << " completed";
@@ -324,8 +326,8 @@ void CloudWarmUpManager::handle_jobs() {
tablet->update_rowset_warmup_state_inverted_idx_num(
WarmUpTriggerSource::JOB, rs->rowset_id(), 1);
submit_download_tasks(
idx_path, file_size, storage_resource.value()->fs,
expiration_time, wait, true, [=](Status st) {
idx_path, file_size, rs->fs(), expiration_time, wait, true,
[=](Status st) {
VLOG_DEBUG << "warmup rowset " << rs->version()
<< " segment " << seg_id
<< "inverted idx:" << idx_path << " completed";
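Same idea as in cloud_internal_service.cpp: the download task carries the file system it should read through, so the warm-up manager hands it rs->fs() rather than the raw storage fs. A minimal sketch of that dependency, assuming hypothetical types (DownloadTaskSketch, FsIface) that are not the real io::DownloadFileMeta / io::FileSystem:

```cpp
// Minimal sketch with hypothetical types: the task owns a pointer to the fs it
// reads through, so whichever fs the rowset hands over -- possibly a packed-file
// wrapper -- is the one the downloader uses.
#include <cstdint>
#include <functional>
#include <memory>
#include <string>

struct FsIface {
    virtual ~FsIface() = default;
    virtual bool read_file(const std::string& path, std::string* out) = 0;
};

struct DownloadTaskSketch {
    std::string path;               // logical segment or index path
    int64_t file_size = 0;
    std::shared_ptr<FsIface> fs;    // taken from the rowset, not the raw storage fs
    std::function<void(bool)> done; // completion callback
};

void run_download_sketch(const DownloadTaskSketch& task) {
    std::string data;
    bool ok = task.fs->read_file(task.path, &data); // read goes through the wrapper
    if (task.done) {
        task.done(ok);
    }
}
```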
28 changes: 15 additions & 13 deletions be/src/io/fs/packed_file_manager.cpp
@@ -121,7 +121,7 @@ Status append_packed_info_trailer(FileWriter* writer, const std::string& packed_

// write small file data to file cache
void do_write_to_file_cache(const std::string& small_file_path, const std::string& data,
int64_t tablet_id) {
int64_t tablet_id, uint64_t expiration_time) {
if (data.empty()) {
return;
}
@@ -132,7 +132,8 @@ void do_write_to_file_cache(const std::string& small_file_path, const std::strin

VLOG_DEBUG << "packed_file_cache_write: path=" << small_file_path
<< " filename=" << path.filename().native() << " hash=" << cache_hash.to_string()
<< " size=" << data.size() << " tablet_id=" << tablet_id;
<< " size=" << data.size() << " tablet_id=" << tablet_id
<< " expiration_time=" << expiration_time;

BlockFileCache* file_cache = FileCacheFactory::instance()->get_by_path(cache_hash);
if (file_cache == nullptr) {
@@ -141,7 +142,8 @@ void do_write_to_file_cache(const std::string& small_file_path, const std::strin

// Allocate cache blocks
CacheContext ctx;
ctx.cache_type = FileCacheType::NORMAL;
ctx.cache_type = expiration_time > 0 ? FileCacheType::TTL : FileCacheType::NORMAL;
ctx.expiration_time = expiration_time;
ReadStatistics stats;
ctx.stats = &stats;

@@ -178,7 +180,7 @@ void do_write_to_file_cache(const std::string& small_file_path, const std::strin
// the async task execution. The original Slice may reference a buffer that gets
// reused or freed before the async task runs.
void write_small_file_to_cache_async(const std::string& small_file_path, const Slice& data,
int64_t tablet_id) {
int64_t tablet_id, uint64_t expiration_time) {
if (!config::enable_file_cache || data.size == 0) {
return;
}
@@ -191,21 +193,21 @@ void write_small_file_to_cache_async(const std::string& small_file_path, const S
auto* thread_pool = ExecEnv::GetInstance()->s3_file_upload_thread_pool();
if (thread_pool == nullptr) {
// Fallback to sync write if thread pool not available
do_write_to_file_cache(small_file_path, data_copy, tablet_id);
do_write_to_file_cache(small_file_path, data_copy, tablet_id, expiration_time);
return;
}

// Track async write count and bytes
g_packed_file_cache_async_write_count << 1;
g_packed_file_cache_async_write_bytes << static_cast<int64_t>(data_size);

Status st = thread_pool->submit_func(
[path = small_file_path, data = std::move(data_copy), tablet_id, data_size]() {
do_write_to_file_cache(path, data, tablet_id);
// Decrement async write count after completion
g_packed_file_cache_async_write_count << -1;
g_packed_file_cache_async_write_bytes << -static_cast<int64_t>(data_size);
});
Status st = thread_pool->submit_func([path = small_file_path, data = std::move(data_copy),
tablet_id, data_size, expiration_time]() {
do_write_to_file_cache(path, data, tablet_id, expiration_time);
// Decrement async write count after completion
g_packed_file_cache_async_write_count << -1;
g_packed_file_cache_async_write_bytes << -static_cast<int64_t>(data_size);
});

if (!st.ok()) {
// Revert metrics since task was not submitted
@@ -368,7 +370,7 @@ Status PackedFileManager::append_small_file(const std::string& path, const Slice
// Async write data to file cache using small file path as cache key.
// This ensures cache key matches the cleanup key in Rowset::clear_cache(),
// allowing proper cache cleanup when stale rowsets are removed.
write_small_file_to_cache_async(path, data, info.tablet_id);
write_small_file_to_cache_async(path, data, info.tablet_id, info.expiration_time);

// Update index
PackedSliceLocation location;
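Two ideas run through this file's change: the cache context becomes a TTL entry only when an expiration time is present, and the Slice's bytes are copied into an owned string before the write is handed to a thread pool, so the async task never reads a buffer that has been reused or freed. A hedged sketch under those assumptions, with illustrative names (CacheKind, make_cache_ctx, cache_write_async) and std::async standing in for the Doris thread pool:

```cpp
#include <cstddef>
#include <cstdint>
#include <future>
#include <string>

enum class CacheKind { NORMAL, TTL };

struct CacheCtxSketch {
    CacheKind kind = CacheKind::NORMAL;
    uint64_t expiration_time = 0; // seconds since epoch, 0 means no TTL
};

// Pick a TTL entry only when an expiration time is known.
CacheCtxSketch make_cache_ctx(uint64_t expiration_time) {
    CacheCtxSketch ctx;
    ctx.kind = expiration_time > 0 ? CacheKind::TTL : CacheKind::NORMAL;
    ctx.expiration_time = expiration_time;
    return ctx;
}

// Deep-copy the caller's bytes before going async so the task owns its data.
void cache_write_async(const char* data, std::size_t len, uint64_t expiration_time) {
    std::string owned(data, len); // copy now; the caller's buffer may be reused
    auto fut = std::async(std::launch::async,
                          [owned = std::move(owned), expiration_time]() {
                              CacheCtxSketch ctx = make_cache_ctx(expiration_time);
                              // a real implementation would allocate cache blocks
                              // and write `owned` under `ctx`
                              (void)ctx;
                              (void)owned;
                          });
    fut.wait(); // stands in for the thread-pool submit used in Doris
}
```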
1 change: 1 addition & 0 deletions be/src/io/fs/packed_file_manager.h
@@ -58,6 +58,7 @@ struct PackedAppendContext {
int64_t tablet_id = 0;
std::string rowset_id;
int64_t txn_id = 0;
uint64_t expiration_time = 0; // TTL expiration time in seconds since epoch, 0 means no TTL
};

// Global object that manages packing small files into larger files for S3 optimization
3 changes: 3 additions & 0 deletions be/src/olap/rowset/rowset_writer_context.h
@@ -224,6 +224,9 @@ struct RowsetWriterContext {
append_info.tablet_id = tablet_id;
append_info.rowset_id = rowset_id.to_string();
append_info.txn_id = txn_id;
append_info.expiration_time = file_cache_ttl_sec > 0 && newest_write_timestamp > 0
? newest_write_timestamp + file_cache_ttl_sec
: 0;
fs = std::make_shared<io::PackedFileSystem>(fs, append_info);
}

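The new expiration rule is newest_write_timestamp + file_cache_ttl_sec when both are positive, and 0 (no TTL) otherwise. A minimal worked example, with an illustrative helper name (packed_expiration) that is not a Doris function:

```cpp
#include <cstdint>
#include <iostream>

// Absolute expiration for the packed-file append context: only set when both a
// TTL and a newest-write timestamp are known; 0 disables TTL.
uint64_t packed_expiration(int64_t file_cache_ttl_sec, int64_t newest_write_timestamp) {
    return (file_cache_ttl_sec > 0 && newest_write_timestamp > 0)
                   ? static_cast<uint64_t>(newest_write_timestamp + file_cache_ttl_sec)
                   : 0;
}

int main() {
    // a rowset written at t=1700000000 with a 3600s TTL expires at 1700003600
    std::cout << packed_expiration(3600, 1700000000) << "\n"; // 1700003600
    std::cout << packed_expiration(3600, 0) << "\n";          // 0 -> no TTL
    return 0;
}
```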
@@ -0,0 +1,205 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

import org.apache.doris.regression.suite.ClusterOptions
import groovy.json.JsonSlurper

suite('test_packed_file_warm_up_cluster_event', 'docker') {
def options = new ClusterOptions()
options.feConfigs += [
'cloud_cluster_check_interval_second=1',
]
options.beConfigs += [
'file_cache_enter_disk_resource_limit_mode_percent=99',
'enable_evict_file_cache_in_advance=false',
'file_cache_background_monitor_interval_ms=1000',
'enable_packed_file=true',
'disable_auto_compaction=true'
]
options.cloudMode = true

def clearFileCache = {ip, port ->
def url = "http://${ip}:${port}/api/file_cache?op=clear&sync=true"
def response = new URL(url).text
def json = new JsonSlurper().parseText(response)

// Check the status
if (json.status != "OK") {
throw new RuntimeException("Clear cache on ${ip}:${port} failed: ${json.status}")
}
}

def clearFileCacheOnAllBackends = {
def backends = sql """SHOW BACKENDS"""

for (be in backends) {
def ip = be[1]
def port = be[4]
clearFileCache(ip, port)
}

// clearing the file cache is asynchronous, wait for it to finish
sleep(10000)
}

def getBrpcMetrics = {ip, port, name ->
def url = "http://${ip}:${port}/brpc_metrics"
if ((context.config.otherConfigs.get("enableTLS")?.toString()?.equalsIgnoreCase("true")) ?: false) {
url = url.replace("http://", "https://") + " --cert " + context.config.otherConfigs.get("trustCert") + " --cacert " + context.config.otherConfigs.get("trustCACert") + " --key " + context.config.otherConfigs.get("trustCAKey")
}
def metrics = new URL(url).text
def matcher = metrics =~ ~"${name}\\s+(\\d+)"
if (matcher.find()) {
return matcher[0][1] as long
} else {
throw new RuntimeException("${name} not found for ${ip}:${port}")
}
}

def logFileCacheDownloadMetrics = { cluster ->
def backends = sql """SHOW BACKENDS"""
def cluster_bes = backends.findAll { it[19].contains("""\"compute_group_name\" : \"${cluster}\"""") }
for (be in cluster_bes) {
def ip = be[1]
def port = be[5]
def submitted = getBrpcMetrics(ip, port, "file_cache_download_submitted_num")
def finished = getBrpcMetrics(ip, port, "file_cache_download_finished_num")
def failed = getBrpcMetrics(ip, port, "file_cache_download_failed_num")
logger.info("${cluster} be ${ip}:${port}, downloader submitted=${submitted}"
+ ", finished=${finished}, failed=${failed}")
}
}

def logWarmUpRowsetMetrics = { cluster ->
def backends = sql """SHOW BACKENDS"""
def cluster_bes = backends.findAll { it[19].contains("""\"compute_group_name\" : \"${cluster}\"""") }
for (be in cluster_bes) {
def ip = be[1]
def port = be[5]
def submitted_segment = getBrpcMetrics(ip, port, "file_cache_event_driven_warm_up_submitted_segment_num")
def finished_segment = getBrpcMetrics(ip, port, "file_cache_event_driven_warm_up_finished_segment_num")
def failed_segment = getBrpcMetrics(ip, port, "file_cache_event_driven_warm_up_failed_segment_num")
def submitted_index = getBrpcMetrics(ip, port, "file_cache_event_driven_warm_up_submitted_index_num")
def finished_index = getBrpcMetrics(ip, port, "file_cache_event_driven_warm_up_finished_index_num")
def failed_index = getBrpcMetrics(ip, port, "file_cache_event_driven_warm_up_failed_index_num")
logger.info("${cluster} be ${ip}:${port}, submitted_segment=${submitted_segment}"
+ ", finished_segment=${finished_segment}, failed_segment=${failed_segment}"
+ ", submitted_index=${submitted_index}"
+ ", finished_index=${finished_index}"
+ ", failed_index=${failed_index}")
}
}

def getTTLCacheSize = { ip, port ->
return getBrpcMetrics(ip, port, "ttl_cache_size")
}

def checkTTLCacheSizeSumEqual = { cluster1, cluster2 ->
def backends = sql """SHOW BACKENDS"""

def srcBes = backends.findAll { it[19].contains("""\"compute_group_name\" : \"${cluster1}\"""") }
def tgtBes = backends.findAll { it[19].contains("""\"compute_group_name\" : \"${cluster2}\"""") }

long srcSum = 0
for (src in srcBes) {
def ip = src[1]
def port = src[5]
srcSum += getTTLCacheSize(ip, port)
}

long tgtSum = 0
for (tgt in tgtBes) {
def ip = tgt[1]
def port = tgt[5]
tgtSum += getTTLCacheSize(ip, port)
}

logger.info("ttl_cache_size: src=${srcSum} dst=${tgtSum}")
assertTrue(srcSum > 0, "ttl_cache_size should be > 0")
assertEquals(srcSum, tgtSum)
}

docker(options) {
def clusterName1 = "warmup_source"
def clusterName2 = "warmup_target"

// Add two clusters
cluster.addBackend(1, clusterName1)
cluster.addBackend(1, clusterName2)

def tag1 = getCloudBeTagByName(clusterName1)
def tag2 = getCloudBeTagByName(clusterName2)

logger.info("Cluster tag1: {}", tag1)
logger.info("Cluster tag2: {}", tag2)

def jsonSlurper = new JsonSlurper()
def clusterId1 = jsonSlurper.parseText(tag1).compute_group_id
def clusterId2 = jsonSlurper.parseText(tag2).compute_group_id

def getJobState = { jobId ->
def jobStateResult = sql """SHOW WARM UP JOB WHERE ID = ${jobId}"""
return jobStateResult[0][3]
}

// Ensure we are in source cluster
sql """use @${clusterName1}"""

// Simple setup to simulate data load and access
sql """CREATE TABLE IF NOT EXISTS customer (id INT, name STRING) DUPLICATE KEY(id) DISTRIBUTED BY HASH(id) BUCKETS 3 PROPERTIES ("file_cache_ttl_seconds" = "3600")"""

// Start warm up job
def jobId_ = sql """
WARM UP CLUSTER ${clusterName2} WITH CLUSTER ${clusterName1}
PROPERTIES (
"sync_mode" = "event_driven",
"sync_event" = "load"
)
"""

def jobId = jobId_[0][0]
logger.info("Warm-up job ID: ${jobId}")
clearFileCacheOnAllBackends()

sleep(15000)

for (int i = 0; i < 100; i++) {
sql """INSERT INTO customer VALUES (1, 'A'), (2, 'B'), (3, 'C')"""
}
sleep(15000)
logWarmUpRowsetMetrics(clusterName2)
logFileCacheDownloadMetrics(clusterName2)
checkTTLCacheSizeSumEqual(clusterName1, clusterName2)

def jobInfo = sql """SHOW WARM UP JOB WHERE ID = ${jobId}"""
assertEquals(jobInfo[0][0], jobId)
assertEquals(jobInfo[0][1], clusterName1)
assertEquals(jobInfo[0][2], clusterName2)
assertEquals(jobInfo[0][4], "CLUSTER")
assertTrue(jobInfo[0][3] in ["RUNNING", "PENDING"],
"JobState is ${jobInfo[0][3]}, expected RUNNING or PENDING")
assertEquals(jobInfo[0][5], "EVENT_DRIVEN (LOAD)")

// Cancel job and confirm
sql """CANCEL WARM UP JOB WHERE ID = ${jobId}"""
def cancelInfo = sql """SHOW WARM UP JOB WHERE ID = ${jobId}"""
assertEquals(cancelInfo[0][3], "CANCELLED")

// Clean up
sql """DROP TABLE IF EXISTS customer"""
}
}