Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,7 @@ class CHListenerApi extends ListenerApi with Logging {
// Add configs
import org.apache.gluten.backendsapi.clickhouse.CHConfig._
conf.setCHConfig(
"timezone" -> conf.get("spark.sql.session.timeZone", TimeZone.getDefault.getID),
"local_engine.settings.log_processors_profiles" -> "true")
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The default value of the log_processors_profiles is true, don't need to set again.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better leave this here.

"timezone" -> conf.get("spark.sql.session.timeZone", TimeZone.getDefault.getID))
conf.setCHSettings("spark_version", SPARK_VERSION)
Comment on lines 90 to 94
Copy link

Copilot AI Apr 29, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The PR description/title is about passing correct values to ParquetInputFormat, but this hunk also removes the default CH setting local_engine.settings.log_processors_profiles = true. If this behavior change is intended, it should be called out in the PR description (or moved to a separate PR) since it changes runtime logging/profiling defaults for the ClickHouse backend.

Copilot uses AI. Check for mistakes.
if (!conf.contains(RuntimeSettings.ENABLE_MEMORY_SPILL_SCHEDULER.key)) {
// Enable adaptive memory spill scheduler for native by default
Expand Down
23 changes: 15 additions & 8 deletions cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,14 +82,16 @@ class ParquetInputFormat : public FormatFile::InputFormat
const Block outputHeader;
std::unique_ptr<ColumnIndexRowRangesProvider> rowRangesProvider;
std::optional<VirtualColumnRowIndexReader> row_index_reader;
const UInt64 max_block_size;

public:
ParquetInputFormat(
std::unique_ptr<ReadBuffer> read_buffer_,
const InputFormatPtr & input_,
std::unique_ptr<ColumnIndexRowRangesProvider> provider,
const Block & readHeader_,
const Block & outputHeader_)
const Block & outputHeader_,
const UInt64 max_block_size_)
: InputFormat(std::move(read_buffer_), input_)
, readHeader(readHeader_)
, outputHeader(outputHeader_)
Expand All @@ -98,6 +100,7 @@ class ParquetInputFormat : public FormatFile::InputFormat
outputHeader.columns() > readHeader.columns()
? std::make_optional<VirtualColumnRowIndexReader>(*rowRangesProvider, getMetaColumnType(outputHeader))
: std::nullopt)
, max_block_size(max_block_size_)
{
}

Expand All @@ -107,8 +110,7 @@ class ParquetInputFormat : public FormatFile::InputFormat
{
assert(outputHeader.columns());
assert(row_index_reader);
// TODO: rebase-25.12, format_settings_.parquet.max_block_size
Columns cols{row_index_reader->readBatch(8192)};
Columns cols{row_index_reader->readBatch(max_block_size)};
size_t rows = cols[0]->size();
return Chunk(std::move(cols), rows);
Comment on lines 111 to 115
Copy link

Copilot AI Apr 29, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change alters the batch size used when only meta columns are requested (readHeader.columns()==0) to depend on format_settings.parquet.max_block_size. There doesn’t appear to be a unit/integration test covering that row-index-only path with a non-default max_block_size; adding one would help prevent regressions (e.g., ensuring chunk sizes follow the setting rather than a hardcoded constant).

Copilot uses AI. Check for mistakes.
}
Expand Down Expand Up @@ -178,6 +180,11 @@ ParquetFormatFile::createInputFormat(const Block & header, const std::shared_ptr
metaBuilder.build(*in, *read_header, column_index_filter_.get(), should_include_row_group);
}

// remote_read_min_bytes_for_seek and input_format_parquet_local_file_min_bytes_for_seek in settings
const size_t min_bytes_for_seek = read_buffer_builder->isRemote() ? context->getReadSettings().remote_read_min_bytes_for_seek : format_settings.parquet.local_read_min_bytes_for_seek;
// input_format_parquet_max_block_size
const UInt64 max_block_size = format_settings.parquet.max_block_size;

column_index_filter_.reset();

if (metaBuilder.readRowGroups.empty())
Expand All @@ -188,7 +195,7 @@ ParquetFormatFile::createInputFormat(const Block & header, const std::shared_ptr
auto createVectorizedFormat = [&]() -> InputFormatPtr
{
auto input = std::make_shared<VectorizedParquetBlockInputFormat>(*read_buffer_, read_header, *provider, format_settings);
return std::make_shared<ParquetInputFormat>(std::move(read_buffer_), input, std::move(provider), *read_header, header);
return std::make_shared<ParquetInputFormat>(std::move(read_buffer_), input, std::move(provider), *read_header, header, max_block_size);
};

auto createParquetBlockInputFormat = [&]() -> InputFormatPtr
Expand All @@ -212,8 +219,8 @@ ParquetFormatFile::createInputFormat(const Block & header, const std::shared_ptr
auto parser_group = std::make_shared<FormatFilterInfo>(filter_actions_dag, context, nullptr, nullptr, nullptr);
auto parser_shared_resources = std::make_shared<FormatParserSharedResources>(context->getSettingsRef(), /*num_streams_=*/1);

size_t min_bytes_for_seek = format_settings.parquet.local_read_min_bytes_for_seek;
// TODO: check whether support complex types
// TODO: rebase-25.12, support complex types when there is a nullable type
// for example: parquet type is Array, requested type is Nullable(Array(Nullable(String)))
if (format_settings.parquet.use_native_reader_v3 && !readRowIndex && onlyFlatType)
{
LOG_TRACE(
Expand All @@ -228,15 +235,15 @@ ParquetFormatFile::createInputFormat(const Block & header, const std::shared_ptr
for (const auto & rg : metaBuilder.readRowGroups)
row_group_ids.push_back(static_cast<size_t>(rg.index));
input->setBucketsToRead(std::make_shared<ParquetFileBucketInfo>(row_group_ids));
return std::make_shared<ParquetInputFormat>(std::move(read_buffer_), input, std::move(provider), *read_header, header);
return std::make_shared<ParquetInputFormat>(std::move(read_buffer_), input, std::move(provider), *read_header, header, max_block_size);
}
else
{
LOG_TRACE(
&Poco::Logger::get("ParquetFormatFile"),
"Using native parquet reader");
auto input = std::make_shared<ParquetBlockInputFormat>(*read_buffer_, read_header, format_settings, parser_shared_resources, parser_group, min_bytes_for_seek);
return std::make_shared<ParquetInputFormat>(std::move(read_buffer_), input, std::move(provider), *read_header, header);
return std::make_shared<ParquetInputFormat>(std::move(read_buffer_), input, std::move(provider), *read_header, header, max_block_size);
}
};

Expand Down
Loading