diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHListenerApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHListenerApi.scala index 2d21a08ca0b..41772f265c4 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHListenerApi.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHListenerApi.scala @@ -90,8 +90,7 @@ class CHListenerApi extends ListenerApi with Logging { // Add configs import org.apache.gluten.backendsapi.clickhouse.CHConfig._ conf.setCHConfig( - "timezone" -> conf.get("spark.sql.session.timeZone", TimeZone.getDefault.getID), - "local_engine.settings.log_processors_profiles" -> "true") + "timezone" -> conf.get("spark.sql.session.timeZone", TimeZone.getDefault.getID)) conf.setCHSettings("spark_version", SPARK_VERSION) if (!conf.contains(RuntimeSettings.ENABLE_MEMORY_SPILL_SCHEDULER.key)) { // Enable adaptive memory spill scheduler for native by default diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.cpp b/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.cpp index 55a53eeda2e..48b67c8337c 100644 --- a/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.cpp +++ b/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.cpp @@ -82,6 +82,7 @@ class ParquetInputFormat : public FormatFile::InputFormat const Block outputHeader; std::unique_ptr rowRangesProvider; std::optional row_index_reader; + const UInt64 max_block_size; public: ParquetInputFormat( @@ -89,7 +90,8 @@ class ParquetInputFormat : public FormatFile::InputFormat const InputFormatPtr & input_, std::unique_ptr provider, const Block & readHeader_, - const Block & outputHeader_) + const Block & outputHeader_, + const UInt64 max_block_size_) : InputFormat(std::move(read_buffer_), input_) , readHeader(readHeader_) , outputHeader(outputHeader_) @@ -98,6 +100,7 @@ class ParquetInputFormat : public FormatFile::InputFormat outputHeader.columns() > readHeader.columns() ? std::make_optional(*rowRangesProvider, getMetaColumnType(outputHeader)) : std::nullopt) + , max_block_size(max_block_size_) { } @@ -107,8 +110,7 @@ class ParquetInputFormat : public FormatFile::InputFormat { assert(outputHeader.columns()); assert(row_index_reader); - // TODO: rebase-25.12, format_settings_.parquet.max_block_size - Columns cols{row_index_reader->readBatch(8192)}; + Columns cols{row_index_reader->readBatch(max_block_size)}; size_t rows = cols[0]->size(); return Chunk(std::move(cols), rows); } @@ -178,6 +180,11 @@ ParquetFormatFile::createInputFormat(const Block & header, const std::shared_ptr metaBuilder.build(*in, *read_header, column_index_filter_.get(), should_include_row_group); } + // remote_read_min_bytes_for_seek and input_format_parquet_local_file_min_bytes_for_seek in settings + const size_t min_bytes_for_seek = read_buffer_builder->isRemote() ? context->getReadSettings().remote_read_min_bytes_for_seek : format_settings.parquet.local_read_min_bytes_for_seek; + // input_format_parquet_max_block_size + const UInt64 max_block_size = format_settings.parquet.max_block_size; + column_index_filter_.reset(); if (metaBuilder.readRowGroups.empty()) @@ -188,7 +195,7 @@ ParquetFormatFile::createInputFormat(const Block & header, const std::shared_ptr auto createVectorizedFormat = [&]() -> InputFormatPtr { auto input = std::make_shared(*read_buffer_, read_header, *provider, format_settings); - return std::make_shared(std::move(read_buffer_), input, std::move(provider), *read_header, header); + return std::make_shared(std::move(read_buffer_), input, std::move(provider), *read_header, header, max_block_size); }; auto createParquetBlockInputFormat = [&]() -> InputFormatPtr @@ -212,8 +219,8 @@ ParquetFormatFile::createInputFormat(const Block & header, const std::shared_ptr auto parser_group = std::make_shared(filter_actions_dag, context, nullptr, nullptr, nullptr); auto parser_shared_resources = std::make_shared(context->getSettingsRef(), /*num_streams_=*/1); - size_t min_bytes_for_seek = format_settings.parquet.local_read_min_bytes_for_seek; - // TODO: check whether support complex types + // TODO: rebase-25.12, support complex types when there is a nullable type + // for example: parquet type is Array, requested type is Nullable(Array(Nullable(String))) if (format_settings.parquet.use_native_reader_v3 && !readRowIndex && onlyFlatType) { LOG_TRACE( @@ -228,7 +235,7 @@ ParquetFormatFile::createInputFormat(const Block & header, const std::shared_ptr for (const auto & rg : metaBuilder.readRowGroups) row_group_ids.push_back(static_cast(rg.index)); input->setBucketsToRead(std::make_shared(row_group_ids)); - return std::make_shared(std::move(read_buffer_), input, std::move(provider), *read_header, header); + return std::make_shared(std::move(read_buffer_), input, std::move(provider), *read_header, header, max_block_size); } else { @@ -236,7 +243,7 @@ ParquetFormatFile::createInputFormat(const Block & header, const std::shared_ptr &Poco::Logger::get("ParquetFormatFile"), "Using native parquet reader"); auto input = std::make_shared(*read_buffer_, read_header, format_settings, parser_shared_resources, parser_group, min_bytes_for_seek); - return std::make_shared(std::move(read_buffer_), input, std::move(provider), *read_header, header); + return std::make_shared(std::move(read_buffer_), input, std::move(provider), *read_header, header, max_block_size); } };