diff --git a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala index dbc833f046a..309413e71c6 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala @@ -99,6 +99,12 @@ class VeloxConfig(conf: SQLConf) extends GlutenConfig(conf) { def hashProbeDynamicFilterPushdownEnabled: Boolean = getConf(HASH_PROBE_DYNAMIC_FILTER_PUSHDOWN_ENABLED) + def parallelExecutionEnabled: Boolean = + getConf(PARALLEL_EXECUTION_ENABLED) + + def parallelExecutionThreadPoolSize: Option[Int] = + getConf(PARALLEL_EXECUTION_THREAD_POOL_SIZE) + def valueStreamDynamicFilterEnabled: Boolean = getConf(VALUE_STREAM_DYNAMIC_FILTER_ENABLED) @@ -474,6 +480,23 @@ object VeloxConfig extends ConfigRegistry { .booleanConf .createWithDefault(true) + val PARALLEL_EXECUTION_ENABLED = + buildStaticConf("spark.gluten.sql.columnar.backend.velox.parallelExecution.enabled") + .doc( + "Whether to enable parallel execution of Velox task drivers for whole-stage execution. " + + "Default is false (serial execution).") + .booleanConf + .createWithDefault(false) + + val PARALLEL_EXECUTION_THREAD_POOL_SIZE = + buildStaticConf("spark.gluten.sql.columnar.backend.velox.parallelExecution.threadPoolSize") + .doc( + "Size of the thread pool used for parallel execution of Velox task drivers. 
" + + "If not set, defaults to 2 * spark.gluten.numTaskSlotsPerExecutor.") + .intConf + .checkValue(_ > 0, "must be a positive number") + .createOptional + val VALUE_STREAM_DYNAMIC_FILTER_ENABLED = buildConf("spark.gluten.sql.columnar.backend.velox.valueStream.dynamicFilter.enabled") .doc( diff --git a/backends-velox/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala b/backends-velox/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala index f2c9214b069..c2060af7ea6 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala @@ -274,7 +274,7 @@ object MetricsUtil extends Logging { } ju.updateJoinMetrics(operatorMetrics, metrics.getSingleMetrics, joinParams) case u: UnionMetricsUpdater => - // JoinRel outputs two suites of metrics respectively for hash build and hash probe. + // Union outputs two suites of metrics respectively. // Therefore, fetch one more suite of metrics here. 
operatorMetrics.add(metrics.getOperatorMetrics(curMetricsIdx)) curMetricsIdx -= 1 diff --git a/backends-velox/src/test/java/org/apache/gluten/test/MockVeloxBackend.java b/backends-velox/src/test/java/org/apache/gluten/test/MockVeloxBackend.java index 2c4b813f30c..a02bfa46e08 100644 --- a/backends-velox/src/test/java/org/apache/gluten/test/MockVeloxBackend.java +++ b/backends-velox/src/test/java/org/apache/gluten/test/MockVeloxBackend.java @@ -17,7 +17,8 @@ package org.apache.gluten.test; import org.apache.gluten.config.GlutenConfig; -import org.apache.gluten.config.VeloxConfig$; +import org.apache.gluten.config.GlutenCoreConfig; +import org.apache.gluten.config.VeloxConfig; import com.codahale.metrics.MetricRegistry; import org.apache.spark.SparkConf; @@ -72,7 +73,8 @@ public Object ask(Object message) throws Exception { private static SparkConf newSparkConf() { final SparkConf conf = new SparkConf(); conf.set(GlutenConfig.SPARK_OFFHEAP_SIZE_KEY(), "1g"); - conf.set(VeloxConfig$.MODULE$.COLUMNAR_VELOX_CONNECTOR_IO_THREADS().key(), "0"); + conf.set(GlutenCoreConfig.NUM_TASK_SLOTS_PER_EXECUTOR().key(), "1"); + conf.set(VeloxConfig.COLUMNAR_VELOX_CONNECTOR_IO_THREADS().key(), "0"); return conf; } } diff --git a/cpp/core/CMakeLists.txt b/cpp/core/CMakeLists.txt index e16464b8111..9530d603f5b 100644 --- a/cpp/core/CMakeLists.txt +++ b/cpp/core/CMakeLists.txt @@ -128,6 +128,8 @@ set(SPARK_COLUMNAR_PLUGIN_SRCS memory/MemoryManager.cc memory/ArrowMemoryPool.cc memory/ColumnarBatch.cc + threads/ThreadInitializer.cc + threads/ThreadManager.cc shuffle/Dictionary.cc shuffle/FallbackRangePartitioner.cc shuffle/HashPartitioner.cc diff --git a/cpp/core/compute/Runtime.cc b/cpp/core/compute/Runtime.cc index f5b4ed0f334..36502d206ca 100644 --- a/cpp/core/compute/Runtime.cc +++ b/cpp/core/compute/Runtime.cc @@ -40,9 +40,10 @@ void Runtime::registerFactory(const std::string& kind, Runtime::Factory factory, Runtime* Runtime::create( const std::string& kind, MemoryManager* 
memoryManager, + ThreadManager* threadManager, const std::unordered_map& sessionConf) { auto& factory = runtimeFactories().get(kind); - return factory(kind, std::move(memoryManager), sessionConf); + return factory(kind, std::move(memoryManager), std::move(threadManager), sessionConf); } void Runtime::release(Runtime* runtime) { diff --git a/cpp/core/compute/Runtime.h b/cpp/core/compute/Runtime.h index 9d8315731fc..4acd9be74ce 100644 --- a/cpp/core/compute/Runtime.h +++ b/cpp/core/compute/Runtime.h @@ -30,6 +30,7 @@ #include "shuffle/ShuffleReader.h" #include "shuffle/ShuffleWriter.h" #include "substrait/plan.pb.h" +#include "threads/ThreadManager.h" #include "utils/ObjectStore.h" #include "utils/WholeStageDumper.h" @@ -61,12 +62,14 @@ class Runtime : public std::enable_shared_from_this { using Factory = std::function& sessionConf)>; using Releaser = std::function; static void registerFactory(const std::string& kind, Factory factory, Releaser releaser); static Runtime* create( const std::string& kind, MemoryManager* memoryManager, + ThreadManager* threadManager, const std::unordered_map& sessionConf = {}); static void release(Runtime*); static std::optional* localWriteFilesTempPath(); @@ -75,8 +78,9 @@ class Runtime : public std::enable_shared_from_this { Runtime( const std::string& kind, MemoryManager* memoryManager, + ThreadManager* threadManager, const std::unordered_map& confMap) - : kind_(kind), memoryManager_(memoryManager), confMap_(confMap) {} + : kind_(kind), memoryManager_(memoryManager), threadManager_(threadManager), confMap_(confMap) {} virtual ~Runtime() = default; @@ -126,6 +130,10 @@ class Runtime : public std::enable_shared_from_this { return memoryManager_; }; + virtual ThreadManager* threadManager() { + return threadManager_; + }; + /// This function is used to create certain converter from the format used by /// the backend to Spark unsafe row. 
virtual std::shared_ptr createColumnar2RowConverter(int64_t column2RowMemThreshold) { @@ -184,6 +192,7 @@ class Runtime : public std::enable_shared_from_this { protected: std::string kind_; MemoryManager* memoryManager_; + ThreadManager* threadManager_; std::unique_ptr objStore_ = ObjectStore::create(); std::unordered_map confMap_; // Session conf map diff --git a/cpp/core/jni/JniCommon.cc b/cpp/core/jni/JniCommon.cc index 3c17e1085bb..76a07aa2d88 100644 --- a/cpp/core/jni/JniCommon.cc +++ b/cpp/core/jni/JniCommon.cc @@ -16,6 +16,7 @@ */ #include "JniCommon.h" +#include void gluten::JniCommonState::ensureInitialized(JNIEnv* env) { std::lock_guard lockGuard(mtx_); @@ -120,7 +121,6 @@ std::shared_ptr gluten::JniColumnarBatchIterator::next() std::shared_ptr gluten::JniColumnarBatchIterator::nextInternal() const { JNIEnv* env = nullptr; attachCurrentThreadAsDaemonOrThrow(vm_, &env); - if (!env->CallBooleanMethod(jColumnarBatchItr_, serializedColumnarBatchIteratorHasNext_)) { checkException(env); return nullptr; // stream ended diff --git a/cpp/core/jni/JniCommon.h b/cpp/core/jni/JniCommon.h index a4edd2c57e8..ef8ea6d8721 100644 --- a/cpp/core/jni/JniCommon.h +++ b/cpp/core/jni/JniCommon.h @@ -26,6 +26,7 @@ #include "compute/Runtime.h" #include "memory/AllocationListener.h" #include "shuffle/rss/RssClient.h" +#include "threads/ThreadInitializer.h" #include "utils/Compression.h" #include "utils/Exception.h" #include "utils/ResourceMap.h" @@ -549,3 +550,73 @@ class JavaRssClient : public RssClient { jmethodID javaPushPartitionData_; jbyteArray array_; }; + +class SparkThreadInitializer final : public gluten::ThreadInitializer { + public: + SparkThreadInitializer(JavaVM* vm, jobject jInitializerLocalRef) : vm_(vm) { + JNIEnv* env; + attachCurrentThreadAsDaemonOrThrow(vm_, &env); + jInitializerGlobalRef_ = env->NewGlobalRef(jInitializerLocalRef); + GLUTEN_CHECK(jInitializerGlobalRef_ != nullptr, "Failed to create global reference for native thread initializer."); + 
(void)initializeMethod(env); + } + + SparkThreadInitializer(const SparkThreadInitializer&) = delete; + SparkThreadInitializer(SparkThreadInitializer&&) = delete; + SparkThreadInitializer& operator=(const SparkThreadInitializer&) = delete; + SparkThreadInitializer& operator=(SparkThreadInitializer&&) = delete; + + ~SparkThreadInitializer() override { + JNIEnv* env; + if (vm_->GetEnv(reinterpret_cast(&env), jniVersion) != JNI_OK) { + LOG(WARNING) << "SparkThreadInitializer#~SparkThreadInitializer(): " + << "JNIEnv was not attached to current thread"; + return; + } + env->DeleteGlobalRef(jInitializerGlobalRef_); + } + + void initialize(const std::string& threadName) override { + JNIEnv* env; + attachCurrentThreadAsDaemonOrThrow(vm_, &env); + jstring jThreadName = env->NewStringUTF(threadName.c_str()); + env->CallVoidMethod(jInitializerGlobalRef_, initializeMethod(env), jThreadName); + env->DeleteLocalRef(jThreadName); + checkException(env); + } + + void destroy(const std::string& threadName) override { + // IMPORTANT: Do not call vm_.DetachCurrentThread here, otherwise Java side thread + // object might be dereferenced and garbage-collected, to break the reuse of thread + // resources. 
+ JNIEnv* env; + attachCurrentThreadAsDaemonOrThrow(vm_, &env); + jstring jThreadName = env->NewStringUTF(threadName.c_str()); + env->CallVoidMethod(jInitializerGlobalRef_, destroyMethod(env), jThreadName); + env->DeleteLocalRef(jThreadName); + checkException(env); + } + + private: + jmethodID initializeMethod(JNIEnv* env) { + static jmethodID initializeMethod = + getMethodIdOrError(env, nativeThreadInitializerClass(env), "initialize", "(Ljava/lang/String;)V"); + return initializeMethod; + } + + jmethodID destroyMethod(JNIEnv* env) { + static jmethodID destroyMethod = + getMethodIdOrError(env, nativeThreadInitializerClass(env), "destroy", "(Ljava/lang/String;)V"); + return destroyMethod; + } + + jclass nativeThreadInitializerClass(JNIEnv* env) { + static jclass javaInitializerClass = + createGlobalClassReferenceOrError(env, "Lorg/apache/gluten/threads/NativeThreadInitializer;"); + return javaInitializerClass; + } + + private: + JavaVM* vm_; + jobject jInitializerGlobalRef_; +}; diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc index 9e194be6ea8..da28e1141c4 100644 --- a/cpp/core/jni/JniWrapper.cc +++ b/cpp/core/jni/JniWrapper.cc @@ -181,8 +181,9 @@ class InternalRuntime : public Runtime { InternalRuntime( const std::string& kind, MemoryManager* memoryManager, + ThreadManager* threadManager, const std::unordered_map& confMap) - : Runtime(kind, memoryManager, confMap) {} + : Runtime(kind, memoryManager, threadManager, confMap) {} }; MemoryManager* internalMemoryManagerFactory(const std::string& kind, std::unique_ptr listener) { @@ -193,11 +194,33 @@ void internalMemoryManagerReleaser(MemoryManager* memoryManager) { delete memoryManager; } +class InternalThreadManager : public ThreadManager { + public: + InternalThreadManager(const std::string& kind, std::unique_ptr initializer) + : ThreadManager(kind), initializer_(std::shared_ptr(std::move(initializer))) {} + + ThreadInitializer* getThreadInitializer() override { + return initializer_.get(); + } + 
+ private: + std::shared_ptr initializer_; +}; + +ThreadManager* internalThreadManagerFactory(const std::string& kind, std::unique_ptr initializer) { + return new InternalThreadManager(kind, std::move(initializer)); +} + +void internalThreadManagerReleaser(ThreadManager* threadManager) { + delete threadManager; +} + Runtime* internalRuntimeFactory( const std::string& kind, MemoryManager* memoryManager, + ThreadManager* threadManager, const std::unordered_map& sessionConf) { - return new InternalRuntime(kind, memoryManager, sessionConf); + return new InternalRuntime(kind, memoryManager, threadManager, sessionConf); } void internalRuntimeReleaser(Runtime* runtime) { @@ -252,6 +275,7 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) { getJniErrorState()->ensureInitialized(env); MemoryManager::registerFactory(kInternalBackendKind, internalMemoryManagerFactory, internalMemoryManagerReleaser); + ThreadManager::registerFactory(kInternalBackendKind, internalThreadManagerFactory, internalThreadManagerReleaser); Runtime::registerFactory(kInternalBackendKind, internalRuntimeFactory, internalRuntimeReleaser); byteArrayClass = createGlobalClassReferenceOrError(env, "[B"); @@ -319,14 +343,16 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_runtime_RuntimeJniWrapper_createR jclass, jstring jBackendType, jlong nmmHandle, + jlong ntmHandle, jbyteArray sessionConf) { JNI_METHOD_START MemoryManager* memoryManager = jniCastOrThrow(nmmHandle); + ThreadManager* threadManager = jniCastOrThrow(ntmHandle); auto safeArray = getByteArrayElementsSafe(env, sessionConf); auto sparkConf = parseConfMap(env, safeArray.elems(), safeArray.length()); auto backendType = jStringToCString(env, jBackendType); - auto runtime = Runtime::create(backendType, memoryManager, sparkConf); + auto runtime = Runtime::create(backendType, memoryManager, threadManager, sparkConf); return reinterpret_cast(runtime); JNI_METHOD_END(kInvalidObjectHandle) } @@ -370,6 +396,33 @@ JNIEXPORT jlong JNICALL 
Java_org_apache_gluten_memory_NativeMemoryManagerJniWrap JNI_METHOD_END(-1L) } +JNIEXPORT jlong JNICALL Java_org_apache_gluten_threads_NativeThreadManagerJniWrapper_create( // NOLINT + JNIEnv* env, + jclass, + jstring jBackendType, + jobject jInitializer) { + JNI_METHOD_START + JavaVM* vm; + if (env->GetJavaVM(&vm) != JNI_OK) { + throw GlutenException("Unable to get JavaVM instance"); + } + auto backendType = jStringToCString(env, jBackendType); + std::unique_ptr initializer = std::make_unique(vm, jInitializer); + ThreadManager* tm = ThreadManager::create(backendType, std::move(initializer)); + return reinterpret_cast(tm); + JNI_METHOD_END(-1L) +} + +JNIEXPORT void JNICALL Java_org_apache_gluten_threads_NativeThreadManagerJniWrapper_release( // NOLINT + JNIEnv* env, + jclass, + jlong ntmHandle) { + JNI_METHOD_START + auto* threadManager = jniCastOrThrow(ntmHandle); + ThreadManager::release(threadManager); + JNI_METHOD_END() +} + JNIEXPORT jbyteArray JNICALL Java_org_apache_gluten_memory_NativeMemoryManagerJniWrapper_collectUsage( // NOLINT JNIEnv* env, jclass, diff --git a/cpp/core/threads/ThreadInitializer.cc b/cpp/core/threads/ThreadInitializer.cc new file mode 100644 index 00000000000..b040343f412 --- /dev/null +++ b/cpp/core/threads/ThreadInitializer.cc @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "ThreadInitializer.h" + +namespace gluten { +namespace { + +class NoopThreadInitializer final : public ThreadInitializer { + public: + void initialize(const std::string& threadName) override {} + void destroy(const std::string& threadName) override{}; +}; + +} // namespace + +std::unique_ptr ThreadInitializer::noop() { + return std::make_unique(); +} + +} // namespace gluten diff --git a/cpp/core/threads/ThreadInitializer.h b/cpp/core/threads/ThreadInitializer.h new file mode 100644 index 00000000000..fa0271141bf --- /dev/null +++ b/cpp/core/threads/ThreadInitializer.h @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#pragma once + +#include + +namespace gluten { + +class ThreadInitializer { + public: + static std::unique_ptr noop(); + + virtual ~ThreadInitializer() = default; + + virtual void initialize(const std::string& threadName) = 0; + + virtual void destroy(const std::string& threadName) = 0; + + protected: + ThreadInitializer() = default; +}; + +} // namespace gluten diff --git a/cpp/core/threads/ThreadManager.cc b/cpp/core/threads/ThreadManager.cc new file mode 100644 index 00000000000..20a377b9a03 --- /dev/null +++ b/cpp/core/threads/ThreadManager.cc @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "ThreadManager.h" + +#include "utils/Registry.h" + +namespace gluten { +namespace { + +Registry& threadManagerFactories() { + static Registry registry; + return registry; +} + +Registry& threadManagerReleasers() { + static Registry registry; + return registry; +} + +} // namespace + +void ThreadManager::registerFactory(const std::string& kind, Factory factory, Releaser releaser) { + threadManagerFactories().registerObj(kind, std::move(factory)); + threadManagerReleasers().registerObj(kind, std::move(releaser)); +} + +ThreadManager* ThreadManager::create(const std::string& kind, std::unique_ptr initializer) { + auto& factory = threadManagerFactories().get(kind); + return factory(kind, std::move(initializer)); +} + +void ThreadManager::release(ThreadManager* threadManager) { + const std::string kind = threadManager->kind(); + auto& releaser = threadManagerReleasers().get(kind); + releaser(threadManager); +} + +} // namespace gluten diff --git a/cpp/core/threads/ThreadManager.h b/cpp/core/threads/ThreadManager.h new file mode 100644 index 00000000000..44c32a75270 --- /dev/null +++ b/cpp/core/threads/ThreadManager.h @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#pragma once + +#include +#include +#include + +#include "threads/ThreadInitializer.h" + +namespace gluten { + +class ThreadManager { + public: + using Factory = + std::function initializer)>; + using Releaser = std::function; + + static void registerFactory(const std::string& kind, Factory factory, Releaser releaser); + static ThreadManager* create(const std::string& kind, std::unique_ptr initializer); + static void release(ThreadManager* threadManager); + + explicit ThreadManager(const std::string& kind) : kind_(kind) {} + + virtual ~ThreadManager() = default; + + virtual std::string kind() { + return kind_; + } + + virtual ThreadInitializer* getThreadInitializer() = 0; + + private: + std::string kind_; +}; + +} // namespace gluten diff --git a/cpp/velox/benchmarks/GenericBenchmark.cc b/cpp/velox/benchmarks/GenericBenchmark.cc index 616ba9bcfbd..1e00eadd848 100644 --- a/cpp/velox/benchmarks/GenericBenchmark.cc +++ b/cpp/velox/benchmarks/GenericBenchmark.cc @@ -38,6 +38,7 @@ #include "tests/utils/LocalRssClient.h" #include "tests/utils/TestAllocationListener.h" #include "tests/utils/TestStreamReader.h" +#include "threads/ThreadInitializer.h" #include "utils/Exception.h" #include "utils/StringUtil.h" #include "utils/Timer.h" @@ -382,7 +383,7 @@ void setQueryTraceConfig(std::unordered_map& configs) } } // namespace -using RuntimeFactory = std::function; +using RuntimeFactory = std::function; auto BM_Generic = [](::benchmark::State& state, const std::string& planFile, @@ -398,7 +399,8 @@ auto BM_Generic = [](::benchmark::State& state, auto* listenerPtr = listener.get(); auto* memoryManager = MemoryManager::create(kVeloxBackendKind, std::move(listener)); - auto runtime = runtimeFactory(memoryManager); + auto* threadManager = ThreadManager::create(kVeloxBackendKind, ThreadInitializer::noop()); + auto runtime = runtimeFactory(memoryManager, threadManager); auto plan = getPlanFromFile("Plan", planFile); std::vector splits{}; @@ -507,6 +509,7 @@ auto BM_Generic = 
[](::benchmark::State& state, updateBenchmarkMetrics(state, elapsedTime, readInputTime, writerMetrics, readerMetrics); Runtime::release(runtime); + ThreadManager::release(threadManager); MemoryManager::release(memoryManager); }; @@ -522,7 +525,8 @@ auto BM_ShuffleWriteRead = [](::benchmark::State& state, auto* listenerPtr = listener.get(); auto* memoryManager = MemoryManager::create(kVeloxBackendKind, std::move(listener)); - auto runtime = runtimeFactory(memoryManager); + auto* threadManager = ThreadManager::create(kVeloxBackendKind, ThreadInitializer::noop()); + auto runtime = runtimeFactory(memoryManager, threadManager); const size_t dirIndex = std::hash{}(std::this_thread::get_id()) % localDirs.size(); const auto dataFileDir = @@ -554,6 +558,7 @@ auto BM_ShuffleWriteRead = [](::benchmark::State& state, updateBenchmarkMetrics(state, elapsedTime, readInputTime, writerMetrics, readerMetrics); Runtime::release(runtime); + ThreadManager::release(threadManager); MemoryManager::release(memoryManager); }; @@ -709,8 +714,8 @@ int main(int argc, char** argv) { } } - RuntimeFactory runtimeFactory = [=](MemoryManager* memoryManager) { - return dynamic_cast(Runtime::create(kVeloxBackendKind, memoryManager, sessionConf)); + RuntimeFactory runtimeFactory = [=](MemoryManager* memoryManager, ThreadManager* threadManager) { + return dynamic_cast(Runtime::create(kVeloxBackendKind, memoryManager, threadManager, sessionConf)); }; const auto localDirs = createLocalDirs(); diff --git a/cpp/velox/benchmarks/ParquetWriteBenchmark.cc b/cpp/velox/benchmarks/ParquetWriteBenchmark.cc index c66f6a3a4f0..a534e1f6d43 100644 --- a/cpp/velox/benchmarks/ParquetWriteBenchmark.cc +++ b/cpp/velox/benchmarks/ParquetWriteBenchmark.cc @@ -23,6 +23,7 @@ #include "memory/VeloxMemoryManager.h" #include "operators/reader/ParquetReaderIterator.h" #include "operators/writer/VeloxParquetDataSource.h" +#include "threads/ThreadInitializer.h" #include "utils/VeloxArrowUtils.h" namespace gluten { @@ -52,7 +53,8 
@@ class GoogleBenchmarkVeloxParquetWriteCacheScanBenchmark { // reuse the ParquetWriteConverter for batches caused system % increase a lot auto memoryManager = getDefaultMemoryManager(); - auto runtime = Runtime::create(kVeloxBackendKind, memoryManager); + auto* threadManager = ThreadManager::create(kVeloxBackendKind, ThreadInitializer::noop()); + auto runtime = Runtime::create(kVeloxBackendKind, memoryManager, threadManager); auto veloxPool = memoryManager->getAggregateMemoryPool(); for (auto _ : state) { @@ -98,6 +100,7 @@ class GoogleBenchmarkVeloxParquetWriteCacheScanBenchmark { state.counters["write_time"] = benchmark::Counter(writeTime, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000); Runtime::release(runtime); + ThreadManager::release(threadManager); } private: diff --git a/cpp/velox/compute/VeloxBackend.cc b/cpp/velox/compute/VeloxBackend.cc index 85a8622508a..3440aa37518 100644 --- a/cpp/velox/compute/VeloxBackend.cc +++ b/cpp/velox/compute/VeloxBackend.cc @@ -84,15 +84,37 @@ void veloxMemoryManagerReleaser(MemoryManager* memoryManager) { Runtime* veloxRuntimeFactory( const std::string& kind, MemoryManager* memoryManager, + ThreadManager* threadManager, const std::unordered_map& sessionConf) { auto* vmm = dynamic_cast(memoryManager); GLUTEN_CHECK(vmm != nullptr, "Not a Velox memory manager"); - return new VeloxRuntime(kind, vmm, sessionConf); + return new VeloxRuntime(kind, vmm, threadManager, sessionConf); } void veloxRuntimeReleaser(Runtime* runtime) { delete runtime; } + +class VeloxThreadManager : public ThreadManager { + public: + VeloxThreadManager(const std::string& kind, std::unique_ptr initializer) + : ThreadManager(kind), initializer_(std::shared_ptr(std::move(initializer))) {} + + ThreadInitializer* getThreadInitializer() override { + return initializer_.get(); + } + + private: + std::shared_ptr initializer_; +}; + +ThreadManager* veloxThreadManagerFactory(const std::string& kind, std::unique_ptr initializer) { + return new 
VeloxThreadManager(kind, std::move(initializer)); +} + +void veloxThreadManagerReleaser(ThreadManager* threadManager) { + delete threadManager; +} } // namespace void VeloxBackend::init( @@ -119,6 +141,7 @@ void VeloxBackend::init( // Register factories. MemoryManager::registerFactory(kVeloxBackendKind, veloxMemoryManagerFactory, veloxMemoryManagerReleaser); + ThreadManager::registerFactory(kVeloxBackendKind, veloxThreadManagerFactory, veloxThreadManagerReleaser); Runtime::registerFactory(kVeloxBackendKind, veloxRuntimeFactory, veloxRuntimeReleaser); if (backendConf_->get(kDebugModeEnabled, false)) { @@ -183,17 +206,42 @@ void VeloxBackend::init( } #endif + const int32_t numTaskSlotsPerExecutor = [&]() { + if (!backendConf_->valueExists(kNumTaskSlotsPerExecutor)) { + LOG(WARNING) << kNumTaskSlotsPerExecutor << " is not set. Falling back to 1."; + return 1; + } + return backendConf_->get(kNumTaskSlotsPerExecutor).value(); + }(); + GLUTEN_CHECK( + numTaskSlotsPerExecutor >= 0, + kNumTaskSlotsPerExecutor + " was set to negative number " + std::to_string(numTaskSlotsPerExecutor) + + ", this should not happen."); + + const bool parallelExecutionEnabled = + backendConf_->get(kParallelExecutionEnabled, kParallelExecutionEnabledDefault); + if (parallelExecutionEnabled) { + // Default: 2 * task slots. 
+ const int32_t threadPoolSize = + backendConf_->get(kParallelExecutionThreadPoolSize, 2 * numTaskSlotsPerExecutor); + if (threadPoolSize > 0) { + executor_ = std::make_unique(threadPoolSize); + LOG(INFO) << "Initialized CPUThreadPoolExecutor for parallel execution with thread num: " << threadPoolSize + << " (numTaskSlotsPerExecutor: " << numTaskSlotsPerExecutor << ")"; + } + } + const auto spillThreadNum = backendConf_->get(kSpillThreadNum, kSpillThreadNumDefaultValue); if (spillThreadNum > 0) { spillExecutor_ = std::make_unique(spillThreadNum); } - auto ioThreads = backendConf_->get(kVeloxIOThreads, kVeloxIOThreadsDefault); + + const auto ioThreads = backendConf_->get(kVeloxIOThreads, numTaskSlotsPerExecutor); GLUTEN_CHECK( ioThreads >= 0, kVeloxIOThreads + " was set to negative number " + std::to_string(ioThreads) + ", this should not happen."); if (ioThreads > 0) { - ioExecutor_ = std::make_unique( - ioThreads, std::make_unique>()); + ioExecutor_ = std::make_unique(ioThreads); } initJolFilesystem(); diff --git a/cpp/velox/compute/VeloxRuntime.cc b/cpp/velox/compute/VeloxRuntime.cc index 1e2cc6f3082..2597e455ea4 100644 --- a/cpp/velox/compute/VeloxRuntime.cc +++ b/cpp/velox/compute/VeloxRuntime.cc @@ -73,9 +73,15 @@ namespace { class HookedExecutor final : public folly::Executor { public: - HookedExecutor(folly::Executor* parent, std::string name, bool debug, std::chrono::milliseconds joinTimeout) + HookedExecutor( + folly::Executor* parent, + std::string name, + ThreadInitializer* initializer, + bool debug, + std::chrono::milliseconds joinTimeout) : parent_(parent), name_(std::move(name)), + initializer_(initializer), debug_(debug), joinTimeout_(joinTimeout), state_(std::make_shared()) {} @@ -154,8 +160,10 @@ class HookedExecutor final : public folly::Executor { } folly::Func wrap(folly::Func func, int8_t priority) { + auto initializer = initializer_; auto state = state_; const auto taskId = state->nextTaskId.fetch_add(1, std::memory_order_relaxed); + const 
auto taskName = name_ + std::to_string(taskId); if (debug_) { TaskInfo info{ .enqueueTime = std::chrono::steady_clock::now(), @@ -165,7 +173,7 @@ class HookedExecutor final : public folly::Executor { state->inFlightTasks[taskId] = std::move(info); } const auto debug = debug_; - return [func = std::move(func), state, debug, taskId]() mutable { + return [func = std::move(func), initializer, state, debug, taskId, taskName]() mutable { auto markDone = folly::makeGuard([&] { if (debug) { std::lock_guard lock(state->taskMutex); @@ -176,6 +184,9 @@ class HookedExecutor final : public folly::Executor { state->cv.notify_all(); } }); + GLUTEN_CHECK(initializer != nullptr, "ThreadInitializer is null."); + initializer->initialize(taskName); + auto invokeDestroyer = folly::makeGuard([&] { initializer->destroy(taskName); }); // Destroy the submitted callable and all of its captures before // decrementing inFlight_. Some async tasks capture AsyncLoadHolder, // which keeps a MemoryPool alive until the callable itself is @@ -190,6 +201,7 @@ class HookedExecutor final : public folly::Executor { folly::Executor* parent_; std::string name_; + ThreadInitializer* initializer_; bool debug_; std::chrono::milliseconds joinTimeout_; std::shared_ptr state_; @@ -198,12 +210,13 @@ class HookedExecutor final : public folly::Executor { std::unique_ptr makeHookedExecutor( folly::Executor* parent, const std::string& name, + ThreadInitializer* initializer, bool debug, std::chrono::milliseconds joinTimeout) { if (parent == nullptr) { return nullptr; } - return std::make_unique(parent, name, debug, joinTimeout); + return std::make_unique(parent, name, initializer, debug, joinTimeout); } std::string makeScopedConnectorId(const std::string& base, uint64_t runtimeId) { @@ -222,8 +235,9 @@ VeloxConnectorIds makeScopedConnectorIds(uint64_t runtimeId) { VeloxRuntime::VeloxRuntime( const std::string& kind, VeloxMemoryManager* vmm, + ThreadManager* threadManager, const std::unordered_map& confMap) - : 
Runtime(kind, vmm, confMap) { + : Runtime(kind, vmm, threadManager, confMap) { // Refresh session config. veloxCfg_ = std::make_shared(std::unordered_map(confMap_)); @@ -256,10 +270,13 @@ void VeloxRuntime::initializeExecutors() { const auto timeoutMs = veloxCfg_->get(kVeloxAsyncTimeoutOnTaskStopping, kVeloxAsyncTimeoutOnTaskStoppingDefault); const auto timeout = std::chrono::milliseconds(timeoutMs); - executor_ = makeHookedExecutor(VeloxBackend::get()->executor(), kind_ + ".executor", debugModeEnabled_, timeout); - spillExecutor_ = - makeHookedExecutor(VeloxBackend::get()->spillExecutor(), kind_ + ".spill", debugModeEnabled_, timeout); - ioExecutor_ = makeHookedExecutor(VeloxBackend::get()->ioExecutor(), kind_ + ".io", debugModeEnabled_, timeout); + ThreadInitializer* const initializer = threadManager_->getThreadInitializer(); + executor_ = + makeHookedExecutor(VeloxBackend::get()->executor(), kind_ + ".executor", initializer, debugModeEnabled_, timeout); + spillExecutor_ = makeHookedExecutor( + VeloxBackend::get()->spillExecutor(), kind_ + ".spill", initializer, debugModeEnabled_, timeout); + ioExecutor_ = + makeHookedExecutor(VeloxBackend::get()->ioExecutor(), kind_ + ".io", initializer, debugModeEnabled_, timeout); } void VeloxRuntime::registerConnectors() { diff --git a/cpp/velox/compute/VeloxRuntime.h b/cpp/velox/compute/VeloxRuntime.h index 37f4da33439..c6ee1c462c6 100644 --- a/cpp/velox/compute/VeloxRuntime.h +++ b/cpp/velox/compute/VeloxRuntime.h @@ -42,6 +42,7 @@ class VeloxRuntime final : public Runtime { explicit VeloxRuntime( const std::string& kind, VeloxMemoryManager* vmm, + ThreadManager* threadManager, const std::unordered_map& confMap); ~VeloxRuntime() override; diff --git a/cpp/velox/compute/WholeStageResultIterator.cc b/cpp/velox/compute/WholeStageResultIterator.cc index 2c1effba20c..882722088d4 100644 --- a/cpp/velox/compute/WholeStageResultIterator.cc +++ b/cpp/velox/compute/WholeStageResultIterator.cc @@ -15,6 +15,9 @@ * limitations under the 
License. */ #include "WholeStageResultIterator.h" +#include +#include +#include #include "VeloxBackend.h" #include "VeloxPlanConverter.h" #include "VeloxRuntime.h" @@ -100,28 +103,27 @@ WholeStageResultIterator::WholeStageResultIterator( auto fileSystem = velox::filesystems::getFileSystem(spillDir, nullptr); GLUTEN_CHECK(fileSystem != nullptr, "File System for spilling is null!"); fileSystem->mkdir(spillDir); - velox::common::SpillDiskOptions spillOpts{ - .spillDirPath = spillDir, .spillDirCreated = true, .spillDirCreateCb = nullptr}; - // Create task instance. std::unordered_set emptySet; - velox::core::PlanFragment planFragment{planNode, velox::core::ExecutionStrategy::kUngrouped, 1, emptySet}; - std::shared_ptr queryCtx = createNewVeloxQueryCtx(); - task_ = velox::exec::Task::create( - fmt::format( - "Gluten_Stage_{}_TID_{}_VTID_{}", - std::to_string(taskInfo_.stageId), - std::to_string(taskInfo_.taskId), - std::to_string(taskInfo.vId)), - std::move(planFragment), - 0, - std::move(queryCtx), - velox::exec::Task::ExecutionMode::kSerial, - /*consumer=*/velox::exec::Consumer{}, - /*memoryArbitrationPriority=*/0, - /*spillDiskOpts=*/spillOpts, - /*onError=*/nullptr); - if (!task_->supportSerialExecutionMode()) { + const bool parallelExecutionEnabled = + veloxCfg_->get(kParallelExecutionEnabled, kParallelExecutionEnabledDefault); + const bool serialExecution = !parallelExecutionEnabled; + + facebook::velox::exec::CursorParameters params; + params.planNode = planNode; + params.destination = 0; + params.maxDrivers = 1; + params.queryCtx = createNewVeloxQueryCtx(); + params.executionStrategy = velox::core::ExecutionStrategy::kUngrouped; + params.groupedExecutionLeafNodeIds = std::move(emptySet); + params.numSplitGroups = 1; + params.spillDirectory = spillDir; + params.serialExecution = serialExecution; + params.copyResult = false; + params.outputPool = memoryManager_->getLeafMemoryPool(); + cursor_ = velox::exec::TaskCursor::create(params); + task_ = 
cursor_->task().get(); + if (serialExecution && !task_->supportSerialExecutionMode()) { throw std::runtime_error("Task doesn't support single threaded execution: " + planNode->toString()); } @@ -238,41 +240,24 @@ std::shared_ptr WholeStageResultIterator::createNewVeloxQ } std::shared_ptr WholeStageResultIterator::next() { - if (task_->isFinished()) { - return nullptr; - } - velox::RowVectorPtr vector; while (true) { - auto future = velox::ContinueFuture::makeEmpty(); - auto out = task_->next(&future); - if (!future.valid()) { - // Not need to wait. Break. - vector = std::move(out); - break; + if (!cursor_->moveNext()) { + return nullptr; } - // Velox suggested to wait. This might be because another thread (e.g., background io thread) is spilling the task. - GLUTEN_CHECK(out == nullptr, "Expected to wait but still got non-null output from Velox task"); - VLOG(2) << "Velox task " << task_->taskId() - << " is busy when ::next() is called. Will wait and try again. Task state: " - << taskStateString(task_->state()); - future.wait(); - } - if (vector == nullptr) { - return nullptr; - } - uint64_t numRows = vector->size(); - if (numRows == 0) { - return nullptr; - } - - { - ScopedTimer timer(&loadLazyVectorTime_); - for (auto& child : vector->children()) { - child->loadedVector(); + RowVectorPtr vector = cursor_->current(); + GLUTEN_CHECK(vector != nullptr, "Cursor returned null vector."); + uint64_t numRows = vector->size(); + if (numRows == 0) { + continue; } + { + ScopedTimer timer(&loadLazyVectorTime_); + for (auto& child : vector->children()) { + child->loadedVector(); + } + } + return std::make_shared(vector); } - - return std::make_shared(vector); } int64_t WholeStageResultIterator::spillFixedSize(int64_t size) { @@ -304,9 +289,6 @@ void WholeStageResultIterator::getOrderedNodeIds( std::vector& nodeIds) { bool isProjectNode = (std::dynamic_pointer_cast(planNode) != nullptr); bool isLocalExchangeNode = (std::dynamic_pointer_cast(planNode) != nullptr); - bool 
isUnionNode = isLocalExchangeNode && - std::dynamic_pointer_cast(planNode)->type() == - velox::core::LocalPartitionNode::Type::kGather; const auto& sourceNodes = planNode->sources(); if (isProjectNode) { GLUTEN_CHECK(sourceNodes.size() == 1, "Illegal state"); @@ -322,22 +304,24 @@ void WholeStageResultIterator::getOrderedNodeIds( return; } - if (isUnionNode) { - // FIXME: The whole metrics system in gluten-substrait is magic. Passing metrics trees through JNI with a trivial - // array is possible but requires for a solid design. Apparently we haven't had it. All the code requires complete - // rework. - // Union was interpreted as LocalPartition + LocalExchange + 2 fake projects as children in Velox. So we only fetch - // metrics from the root node. - std::vector> unionChildren{}; + if (isLocalExchangeNode) { + // LocalPartition was interpreted as LocalPartition + LocalExchange + 2 fake projects (optional) as children + // in SubstraitToVeloxPlan. So we only fetch metrics from the root node. for (const auto& source : planNode->sources()) { const auto projectedChild = std::dynamic_pointer_cast(source); - GLUTEN_CHECK(projectedChild != nullptr, "Illegal state"); - const auto projectSources = projectedChild->sources(); - GLUTEN_CHECK(projectSources.size() == 1, "Illegal state"); - const auto projectSource = projectSources.at(0); - getOrderedNodeIds(projectSource, nodeIds); + if (projectedChild != nullptr) { + const auto projectSources = projectedChild->sources(); + GLUTEN_CHECK(projectSources.size() == 1, "Illegal state"); + const auto projectSource = projectSources.at(0); + getOrderedNodeIds(projectSource, nodeIds); + } else { + getOrderedNodeIds(source, nodeIds); + } + } + if (planNode->sources().size() == 2) { + // The LocalPartition maps to a concrete Spark native union transformer operator. 
+ nodeIds.emplace_back(planNode->id()); } - nodeIds.emplace_back(planNode->id()); return; } @@ -399,6 +383,7 @@ void WholeStageResultIterator::noMoreSplits() { for (const auto& streamId : streamIds_) { task_->noMoreSplits(streamId); } + cursor_->setNoMoreSplits(); allSplitsAdded_ = true; } diff --git a/cpp/velox/compute/WholeStageResultIterator.h b/cpp/velox/compute/WholeStageResultIterator.h index 4fcc002ffd2..ebb13d342f0 100644 --- a/cpp/velox/compute/WholeStageResultIterator.h +++ b/cpp/velox/compute/WholeStageResultIterator.h @@ -28,6 +28,7 @@ #include "velox/common/config/Config.h" #include "velox/connectors/hive/iceberg/IcebergSplit.h" #include "velox/core/PlanNode.h" +#include "velox/exec/Cursor.h" #include "velox/exec/Task.h" #ifdef GLUTEN_ENABLE_GPU #include "cudf/GpuLock.h" @@ -57,7 +58,8 @@ class WholeStageResultIterator : public SplitAwareColumnarBatchIterator { task_->requestCancel().wait(); } auto deletionFuture = task_->taskDeletionFuture(); - task_.reset(); + cursor_.reset(); + task_ = nullptr; deletionFuture.wait(); } #ifdef GLUTEN_ENABLE_GPU @@ -85,7 +87,7 @@ class WholeStageResultIterator : public SplitAwareColumnarBatchIterator { /// Get the underlying Velox task for direct manipulation facebook::velox::exec::Task* task() { - return task_.get(); + return task_; } /// Add iterator-based splits from input iterators @@ -137,12 +139,13 @@ class WholeStageResultIterator : public SplitAwareColumnarBatchIterator { #endif const SparkTaskInfo taskInfo_; folly::Executor* executor_; - std::shared_ptr task_; + std::unique_ptr cursor_; + facebook::velox::exec::Task* task_ = nullptr; std::shared_ptr veloxPlan_; /// Spill. 
std::string spillStrategy_; - folly::Executor* spillExecutor_ = nullptr; + folly::Executor* spillExecutor_; VeloxConnectorIds connectorIds_; /// Metrics diff --git a/cpp/velox/config/VeloxConfig.h b/cpp/velox/config/VeloxConfig.h index 7c7f96b8956..ddd951a2dbb 100644 --- a/cpp/velox/config/VeloxConfig.h +++ b/cpp/velox/config/VeloxConfig.h @@ -90,6 +90,12 @@ const std::string kHashProbeDynamicFilterPushdownEnabled = const std::string kHashProbeBloomFilterPushdownMaxSize = "spark.gluten.sql.columnar.backend.velox.hashProbe.bloomFilterPushdown.maxSize"; +const std::string kParallelExecutionEnabled = "spark.gluten.sql.columnar.backend.velox.parallelExecution.enabled"; +const bool kParallelExecutionEnabledDefault = false; + +const std::string kParallelExecutionThreadPoolSize = + "spark.gluten.sql.columnar.backend.velox.parallelExecution.threadPoolSize"; + const std::string kValueStreamDynamicFilterEnabled = "spark.gluten.sql.columnar.backend.velox.valueStream.dynamicFilter.enabled"; const bool kValueStreamDynamicFilterEnabledDefault = false; @@ -145,8 +151,8 @@ const std::string kVeloxSsdCheckSumReadVerificationEnabled = "spark.gluten.sql.columnar.backend.velox.ssdChecksumReadVerificationEnabled"; // async +const std::string kNumTaskSlotsPerExecutor = "spark.gluten.numTaskSlotsPerExecutor"; const std::string kVeloxIOThreads = "spark.gluten.sql.columnar.backend.velox.IOThreads"; -const uint32_t kVeloxIOThreadsDefault = 0; const std::string kVeloxAsyncTimeoutOnTaskStopping = "spark.gluten.sql.columnar.backend.velox.asyncTimeoutOnTaskStopping"; const int32_t kVeloxAsyncTimeoutOnTaskStoppingDefault = 30000; // 30s diff --git a/cpp/velox/jni/VeloxJniWrapper.cc b/cpp/velox/jni/VeloxJniWrapper.cc index e30413d6d35..7b83e84d6bb 100644 --- a/cpp/velox/jni/VeloxJniWrapper.cc +++ b/cpp/velox/jni/VeloxJniWrapper.cc @@ -1121,7 +1121,7 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_HashJoinBuilder_native 
facebook::velox::exec::BaseHashTable::kNoSpillInputStartPartitionBit, hashTableBuilders[0]->joinBuildVectorHasherMaxNumDistinct(), hashTableBuilders[0]->dropDuplicates(), - allowParallelJoinBuild ? VeloxBackend::get()->executor() : nullptr); + allowParallelJoinBuild ? VeloxBackend::get()->ioExecutor() : nullptr); for (int i = 1; i < numThreads; ++i) { if (hashTableBuilders[i]->joinHasNullKeys()) { diff --git a/cpp/velox/operators/plannodes/CudfVectorStream.h b/cpp/velox/operators/plannodes/CudfVectorStream.h index 7a663d252e4..7ec9476ad26 100644 --- a/cpp/velox/operators/plannodes/CudfVectorStream.h +++ b/cpp/velox/operators/plannodes/CudfVectorStream.h @@ -134,6 +134,11 @@ class CudfValueStreamNode final : public facebook::velox::core::PlanNode { std::shared_ptr iterator) : facebook::velox::core::PlanNode(id), outputType_(outputType), iterator_(std::move(iterator)) {} + // Only supports single thread because iterator_ is not guaranteed thread-safe. + bool requiresSingleThread() const override { + return true; + } + const facebook::velox::RowTypePtr& outputType() const override { return outputType_; } diff --git a/cpp/velox/tests/RuntimeTest.cc b/cpp/velox/tests/RuntimeTest.cc index 9772d752b8f..d01f52ae6ca 100644 --- a/cpp/velox/tests/RuntimeTest.cc +++ b/cpp/velox/tests/RuntimeTest.cc @@ -20,6 +20,7 @@ #include #include "compute/VeloxBackend.h" #include "memory.pb.h" +#include "threads/ThreadInitializer.h" namespace gluten { @@ -46,13 +47,26 @@ class DummyMemoryManager final : public MemoryManager { inline static const std::string kDummyBackendKind{"dummy"}; +class DummyThreadManager final : public ThreadManager { + public: + explicit DummyThreadManager(const std::string& kind) : ThreadManager(kind), initializer_(ThreadInitializer::noop()) {} + + ThreadInitializer* getThreadInitializer() override { + return initializer_.get(); + } + + private: + std::shared_ptr initializer_; +}; + class DummyRuntime final : public Runtime { public: DummyRuntime( const std::string& 
kind, DummyMemoryManager* mm, + ThreadManager* tm, const std::unordered_map& conf) - : Runtime(kind, mm, conf) {} + : Runtime(kind, mm, tm, conf) {} void parsePlan(const uint8_t* data, int32_t size) override {} @@ -127,8 +141,9 @@ class DummyRuntime final : public Runtime { static Runtime* dummyRuntimeFactory( const std::string& kind, MemoryManager* mm, + ThreadManager* tm, const std::unordered_map conf) { - return new DummyRuntime(kind, dynamic_cast(mm), conf); + return new DummyRuntime(kind, dynamic_cast(mm), tm, conf); } static void dummyRuntimeReleaser(Runtime* runtime) { @@ -138,7 +153,8 @@ static void dummyRuntimeReleaser(Runtime* runtime) { TEST(TestRuntime, CreateRuntime) { Runtime::registerFactory(kDummyBackendKind, dummyRuntimeFactory, dummyRuntimeReleaser); DummyMemoryManager mm(kDummyBackendKind); - auto runtime = Runtime::create(kDummyBackendKind, &mm); + DummyThreadManager tm(kDummyBackendKind); + auto runtime = Runtime::create(kDummyBackendKind, &mm, &tm); ASSERT_EQ(typeid(*runtime), typeid(DummyRuntime)); Runtime::release(runtime); } @@ -146,14 +162,18 @@ TEST(TestRuntime, CreateRuntime) { TEST(TestRuntime, CreateVeloxRuntime) { VeloxBackend::create(AllocationListener::noop(), {}); auto mm = MemoryManager::create(kVeloxBackendKind, AllocationListener::noop()); - auto runtime = Runtime::create(kVeloxBackendKind, mm); + auto tm = ThreadManager::create(kVeloxBackendKind, ThreadInitializer::noop()); + auto runtime = Runtime::create(kVeloxBackendKind, mm, tm); ASSERT_EQ(typeid(*runtime), typeid(VeloxRuntime)); Runtime::release(runtime); + ThreadManager::release(tm); } TEST(TestRuntime, GetResultIterator) { DummyMemoryManager mm(kDummyBackendKind); - auto runtime = std::make_shared(kDummyBackendKind, &mm, std::unordered_map()); + DummyThreadManager tm(kDummyBackendKind); + auto runtime = + std::make_shared(kDummyBackendKind, &mm, &tm, std::unordered_map()); auto iter = runtime->createResultIterator("/tmp/test-spill", {}); 
runtime->noMoreSplits(iter.get()); ASSERT_TRUE(iter->hasNext()); diff --git a/cpp/velox/tests/Substrait2VeloxPlanConversionTest.cc b/cpp/velox/tests/Substrait2VeloxPlanConversionTest.cc index a1d1cea02b7..0081bf3004e 100644 --- a/cpp/velox/tests/Substrait2VeloxPlanConversionTest.cc +++ b/cpp/velox/tests/Substrait2VeloxPlanConversionTest.cc @@ -28,6 +28,7 @@ #include "velox/type/Type.h" #include "FilePathGenerator.h" +#include "compute/VeloxBackend.h" using namespace facebook::velox; using namespace facebook::velox::test; diff --git a/docs/velox-configuration.md b/docs/velox-configuration.md index a608dfbc450..f0d0e8e5ed3 100644 --- a/docs/velox-configuration.md +++ b/docs/velox-configuration.md @@ -54,6 +54,8 @@ nav_order: 16 | spark.gluten.sql.columnar.backend.velox.memoryUseHugePages | false | Use explicit huge pages for Velox memory allocation. | | spark.gluten.sql.columnar.backend.velox.orc.scan.enabled | true | Enable velox orc scan. If disabled, vanilla spark orc scan will be used. | | spark.gluten.sql.columnar.backend.velox.orcUseColumnNames | true | Maps table field names to file field names using names, not indices for ORC files. | +| spark.gluten.sql.columnar.backend.velox.parallelExecution.enabled | false | Whether to enable parallel execution of Velox task drivers for whole-stage execution. Default is false (serial execution). | +| spark.gluten.sql.columnar.backend.velox.parallelExecution.threadPoolSize | <undefined> | Size of the thread pool used for parallel execution of Velox task drivers. If not set, defaults to 2 * spark.gluten.numTaskSlotsPerExecutor. | | spark.gluten.sql.columnar.backend.velox.parquet.pageSizeBytes | 1MB | The page size in bytes is for compression. | | spark.gluten.sql.columnar.backend.velox.parquetUseColumnNames | true | Maps table field names to file field names using names, not indices for Parquet files. 
| | spark.gluten.sql.columnar.backend.velox.prefetchRowGroups | 1 | Set the prefetch row groups for velox file scan | diff --git a/gluten-arrow/src/main/java/org/apache/gluten/runtime/RuntimeJniWrapper.java b/gluten-arrow/src/main/java/org/apache/gluten/runtime/RuntimeJniWrapper.java index 8c4280a3b53..ed227596722 100644 --- a/gluten-arrow/src/main/java/org/apache/gluten/runtime/RuntimeJniWrapper.java +++ b/gluten-arrow/src/main/java/org/apache/gluten/runtime/RuntimeJniWrapper.java @@ -20,7 +20,8 @@ public class RuntimeJniWrapper { private RuntimeJniWrapper() {} - public static native long createRuntime(String backendType, long nmm, byte[] sessionConf); + public static native long createRuntime( + String backendType, long nmm, long ntm, byte[] sessionConf); public static native void releaseRuntime(long handle); } diff --git a/gluten-arrow/src/main/java/org/apache/gluten/threads/NativeThreadInitializer.java b/gluten-arrow/src/main/java/org/apache/gluten/threads/NativeThreadInitializer.java new file mode 100644 index 00000000000..ca8826e36f8 --- /dev/null +++ b/gluten-arrow/src/main/java/org/apache/gluten/threads/NativeThreadInitializer.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gluten.threads; + +public interface NativeThreadInitializer { + void initialize(String threadName); + + void destroy(String threadName); +} diff --git a/gluten-arrow/src/main/java/org/apache/gluten/threads/NativeThreadManagerJniWrapper.java b/gluten-arrow/src/main/java/org/apache/gluten/threads/NativeThreadManagerJniWrapper.java new file mode 100644 index 00000000000..74f6cd69de7 --- /dev/null +++ b/gluten-arrow/src/main/java/org/apache/gluten/threads/NativeThreadManagerJniWrapper.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gluten.threads; + +public class NativeThreadManagerJniWrapper { + private NativeThreadManagerJniWrapper() {} + + public static native long create(String backendType, NativeThreadInitializer initializer); + + public static native void release(long handle); +} diff --git a/gluten-arrow/src/main/java/org/apache/gluten/threads/TaskChildThreadInitializer.java b/gluten-arrow/src/main/java/org/apache/gluten/threads/TaskChildThreadInitializer.java new file mode 100644 index 00000000000..6f6bf4a7297 --- /dev/null +++ b/gluten-arrow/src/main/java/org/apache/gluten/threads/TaskChildThreadInitializer.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gluten.threads; + +import com.google.common.base.Preconditions; +import org.apache.spark.TaskContext; +import org.apache.spark.util.SparkTaskUtil; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +public class TaskChildThreadInitializer implements NativeThreadInitializer { + private final TaskContext parentTaskContext; + private final Map childThreads = new ConcurrentHashMap<>(); + + public TaskChildThreadInitializer(TaskContext parentTaskContext) { + Preconditions.checkNotNull(parentTaskContext); + this.parentTaskContext = parentTaskContext; + } + + @Override + public void initialize(String threadName) { + final String javaThreadName = Thread.currentThread().getName(); + if (childThreads.put(threadName, javaThreadName) != null) { + throw new IllegalStateException( + String.format( + "Task native child thread %s (Java name: %s) is already initialized", + threadName, javaThreadName)); + } + SparkTaskUtil.setTaskContext(parentTaskContext); + } + + @Override + public void destroy(String threadName) { + if (childThreads.remove(threadName) == null) { + throw new IllegalStateException( + String.format("Task native thread %s is not initialized", threadName)); + } + SparkTaskUtil.unsetTaskContext(); + } +} diff --git a/gluten-arrow/src/main/scala/org/apache/gluten/runtime/Runtime.scala b/gluten-arrow/src/main/scala/org/apache/gluten/runtime/Runtime.scala index e57bec619d0..2e3c4681400 100644 --- a/gluten-arrow/src/main/scala/org/apache/gluten/runtime/Runtime.scala +++ b/gluten-arrow/src/main/scala/org/apache/gluten/runtime/Runtime.scala @@ -19,10 +19,11 @@ package org.apache.gluten.runtime import org.apache.gluten.config.GlutenConfig import org.apache.gluten.exception.GlutenException import org.apache.gluten.memory.NativeMemoryManager +import org.apache.gluten.threads.{NativeThreadManager, TaskChildThreadInitializer} import org.apache.gluten.utils.ConfigUtil import org.apache.spark.sql.internal.{GlutenConfigUtil, SQLConf} 
-import org.apache.spark.task.TaskResource +import org.apache.spark.task.{TaskResource, TaskResources} import java.util import java.util.concurrent.atomic.AtomicBoolean @@ -31,6 +32,7 @@ import scala.collection.JavaConverters._ // for 2.12 trait Runtime { def memoryManager(): NativeMemoryManager + def threadManager(): NativeThreadManager def getHandle(): Long } @@ -51,9 +53,13 @@ object Runtime { with TaskResource { private val nmm: NativeMemoryManager = NativeMemoryManager(backendName, name) + private val ntm: NativeThreadManager = NativeThreadManager( + backendName, + new TaskChildThreadInitializer(TaskResources.getLocalTaskContext())) private val handle = RuntimeJniWrapper.createRuntime( backendName, nmm.getHandle(), + ntm.getHandle(), ConfigUtil.serialize( (GlutenConfig .getNativeSessionConf( @@ -67,6 +73,8 @@ object Runtime { override def memoryManager(): NativeMemoryManager = nmm + override def threadManager(): NativeThreadManager = ntm + override def release(): Unit = { if (!released.compareAndSet(false, true)) { throw new GlutenException( diff --git a/gluten-arrow/src/main/scala/org/apache/gluten/threads/NativeThreadManager.scala b/gluten-arrow/src/main/scala/org/apache/gluten/threads/NativeThreadManager.scala new file mode 100644 index 00000000000..543c23735f1 --- /dev/null +++ b/gluten-arrow/src/main/scala/org/apache/gluten/threads/NativeThreadManager.scala @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.threads + +import org.apache.gluten.exception.GlutenException + +import org.apache.spark.task.{TaskResource, TaskResources} + +import java.util.concurrent.atomic.AtomicBoolean + +trait NativeThreadManager { + def getHandle(): Long +} + +object NativeThreadManager { + private class Impl( + private val backendName: String, + private val initializer: NativeThreadInitializer) + extends NativeThreadManager + with TaskResource { + private val handle = NativeThreadManagerJniWrapper.create(backendName, initializer) + private val released = new AtomicBoolean(false) + + override def getHandle(): Long = handle + + override def release(): Unit = { + if (!released.compareAndSet(false, true)) { + throw new GlutenException( + s"Thread manager instance already released: $handle, ${resourceName()}, ${priority()}") + } + NativeThreadManagerJniWrapper.release(handle) + } + + override def priority(): Int = 20 + + override def resourceName(): String = "ntm" + } + + def apply(backendName: String, initializer: NativeThreadInitializer): NativeThreadManager = { + TaskResources.addAnonymousResource(new Impl(backendName, initializer)) + } +} diff --git a/gluten-core/src/main/scala/org/apache/spark/task/TaskResources.scala b/gluten-core/src/main/scala/org/apache/spark/task/TaskResources.scala index 2c386e52ff1..1b148d9b2a0 100644 --- a/gluten-core/src/main/scala/org/apache/spark/task/TaskResources.scala +++ b/gluten-core/src/main/scala/org/apache/spark/task/TaskResources.scala @@ -140,7 +140,7 @@ object TaskResources extends TaskListener with 
Logging { if (!inSparkTask()) { throw new UnsupportedOperationException( "Not in a Spark task. If the code is running on driver or for testing purpose, " + - "try using TaskResources#runUnsafe") + "try using TaskResources#runUnsafe. Current thread: " + Thread.currentThread().getName) } val tc = getLocalTaskContext() RESOURCE_REGISTRIES.synchronized { diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala index 2f8155ce70e..a6b3ca28a48 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala @@ -518,7 +518,9 @@ object GlutenConfig extends ConfigRegistry { "spark.gluten.sql.columnar.backend.velox.cachePrefetchMinPct", "spark.gluten.sql.columnar.backend.velox.memoryPoolCapacityTransferAcrossTasks", "spark.gluten.sql.columnar.backend.velox.preferredBatchBytes", - "spark.gluten.sql.columnar.backend.velox.cudf.enableTableScan" + "spark.gluten.sql.columnar.backend.velox.cudf.enableTableScan", + "spark.gluten.sql.columnar.backend.velox.parallelExecution.enabled", + "spark.gluten.sql.columnar.backend.velox.parallelExecution.threadPoolSize" ) /** Get dynamic configs. */ @@ -607,10 +609,8 @@ object GlutenConfig extends ConfigRegistry { (SPARK_S3_CONNECTION_MAXIMUM, "15"), ("spark.gluten.velox.fs.s3a.retry.mode", "legacy"), ( - "spark.gluten.sql.columnar.backend.velox.IOThreads", - conf.getOrElse( - GlutenCoreConfig.NUM_TASK_SLOTS_PER_EXECUTOR.key, - GlutenCoreConfig.NUM_TASK_SLOTS_PER_EXECUTOR.defaultValueString)), + GlutenCoreConfig.NUM_TASK_SLOTS_PER_EXECUTOR.key, + GlutenCoreConfig.NUM_TASK_SLOTS_PER_EXECUTOR.defaultValueString), (COLUMNAR_SHUFFLE_CODEC.key, ""), (COLUMNAR_SHUFFLE_CODEC_BACKEND.key, ""), (DEBUG_CUDF.key, DEBUG_CUDF.defaultValueString),