diff --git a/quadrants/runtime/amdgpu/kernel_launcher.cpp b/quadrants/runtime/amdgpu/kernel_launcher.cpp index 7d803c382a..d63a46712f 100644 --- a/quadrants/runtime/amdgpu/kernel_launcher.cpp +++ b/quadrants/runtime/amdgpu/kernel_launcher.cpp @@ -309,26 +309,42 @@ void KernelLauncher::launch_llvm_kernel(Handle handle, LaunchContextBuilder &ctx if (ctx.result_buffer_size > 0) { ctx.get_context().result_buffer = (uint64 *)device_result_buffer; } + // Same explicit-stream race avoidance as the CUDA launcher: when active_stream != nullptr, allocate per-call + // ephemeral buffers so concurrent launches on different streams can't clobber each other. + const bool use_persistent_scratch = (active_stream == nullptr); char *device_arg_buffer = nullptr; + void *ephemeral_arg_buffer = nullptr; if (ctx.arg_buffer_size > 0) { - if (ctx.arg_buffer_size > launcher_ctx.arg_buffer_capacity) { - if (launcher_ctx.arg_buffer_dev_ptr != nullptr) { - AMDGPUDriver::get_instance().mem_free_async(launcher_ctx.arg_buffer_dev_ptr, nullptr); + if (use_persistent_scratch) { + if (ctx.arg_buffer_size > launcher_ctx.arg_buffer_capacity) { + if (launcher_ctx.arg_buffer_dev_ptr != nullptr) { + AMDGPUDriver::get_instance().mem_free_async(launcher_ctx.arg_buffer_dev_ptr, nullptr); + } + const std::size_t new_cap = std::max(ctx.arg_buffer_size, 2 * launcher_ctx.arg_buffer_capacity); + AMDGPUDriver::get_instance().malloc_async(&launcher_ctx.arg_buffer_dev_ptr, new_cap, nullptr); + launcher_ctx.arg_buffer_capacity = new_cap; } - const std::size_t new_cap = std::max(ctx.arg_buffer_size, 2 * launcher_ctx.arg_buffer_capacity); - AMDGPUDriver::get_instance().malloc_async(&launcher_ctx.arg_buffer_dev_ptr, new_cap, nullptr); - launcher_ctx.arg_buffer_capacity = new_cap; + device_arg_buffer = static_cast(launcher_ctx.arg_buffer_dev_ptr); + } else { + AMDGPUDriver::get_instance().malloc_async(&ephemeral_arg_buffer, ctx.arg_buffer_size, active_stream); + device_arg_buffer = static_cast(ephemeral_arg_buffer); } - device_arg_buffer = static_cast(launcher_ctx.arg_buffer_dev_ptr); AMDGPUDriver::get_instance().memcpy_host_to_device_async(device_arg_buffer, ctx.get_context().arg_buffer, ctx.arg_buffer_size, active_stream); ctx.get_context().arg_buffer = device_arg_buffer; } int arg_size = sizeof(RuntimeContext *); - if (launcher_ctx.runtime_context_dev_ptr == nullptr) { - AMDGPUDriver::get_instance().malloc_async(&launcher_ctx.runtime_context_dev_ptr, sizeof(RuntimeContext), nullptr); + void *ephemeral_context_ptr = nullptr; + void *context_pointer = nullptr; + if (use_persistent_scratch) { + if (launcher_ctx.runtime_context_dev_ptr == nullptr) { + AMDGPUDriver::get_instance().malloc_async(&launcher_ctx.runtime_context_dev_ptr, sizeof(RuntimeContext), nullptr); + } + context_pointer = launcher_ctx.runtime_context_dev_ptr; + } else { + AMDGPUDriver::get_instance().malloc_async(&ephemeral_context_ptr, sizeof(RuntimeContext), active_stream); + context_pointer = ephemeral_context_ptr; } - void *context_pointer = launcher_ctx.runtime_context_dev_ptr; AMDGPUDriver::get_instance().memcpy_host_to_device_async(context_pointer, &ctx.get_context(), sizeof(RuntimeContext), active_stream); @@ -342,7 +358,9 @@ void KernelLauncher::launch_llvm_kernel(Handle handle, LaunchContextBuilder &ctx launch_offloaded_tasks(ctx, amdgpu_module, offloaded_tasks, context_pointer, arg_size); } QD_TRACE("Launching kernel"); - // Persistent scratch: no per-launch free for arg_buffer. The scratch lives until the launcher is destroyed. + // Persistent scratch (default-stream path): no per-launch free for the per-handle `arg_buffer` / `runtime_context` + // or the launcher-global `result_buffer`. All live until launcher destruction; the dtor handles the final + // `mem_free_async`. Ephemeral buffers (explicit-stream path) are freed below. if (ctx.result_buffer_size > 0) { AMDGPUDriver::get_instance().memcpy_device_to_host_async(host_result_buffer, device_result_buffer, ctx.result_buffer_size, active_stream); @@ -363,6 +381,13 @@ void KernelLauncher::launch_llvm_kernel(Handle handle, LaunchContextBuilder &ctx } // Persistent scratch: no per-launch free for the per-handle `arg_buffer` / `runtime_context` or the launcher-global // `result_buffer`. All three live until the launcher is destroyed; the dtor handles the final `mem_free_async`. + // Ephemeral buffers (explicit-stream path) are freed here. + if (ephemeral_arg_buffer != nullptr) { + AMDGPUDriver::get_instance().mem_free_async(ephemeral_arg_buffer, active_stream); + } + if (ephemeral_context_ptr != nullptr) { + AMDGPUDriver::get_instance().mem_free_async(ephemeral_context_ptr, active_stream); + } } KernelLauncher::~KernelLauncher() { diff --git a/quadrants/runtime/amdgpu/kernel_launcher.h b/quadrants/runtime/amdgpu/kernel_launcher.h index 0b12bba660..08e061ec93 100644 --- a/quadrants/runtime/amdgpu/kernel_launcher.h +++ b/quadrants/runtime/amdgpu/kernel_launcher.h @@ -1,5 +1,7 @@ #pragma once +#include + #include "quadrants/codegen/llvm/compiled_kernel_data.h" #include "quadrants/runtime/llvm/kernel_launcher.h" @@ -49,7 +51,11 @@ class KernelLauncher : public LLVM::KernelLauncher { // child completes, before the parent kernel that would be the next reader). Grown amortised-doubling. void *persistent_result_buffer_dev_ptr_{nullptr}; std::size_t persistent_result_buffer_capacity_{0}; - std::vector contexts_; + // std::deque (not std::vector): `publish_adstack_metadata`'s host-eval branch recursively registers snode-reader + // kernels via this same launcher, calling `contexts_.resize()` while a parent `launch_llvm_kernel` frame still + // holds a reference into the container. std::deque never invalidates references on push_back / resize, so the + // parent's `launcher_ctx` reference survives the child's registration. + std::deque contexts_; public: ~KernelLauncher() override; diff --git a/quadrants/runtime/cuda/kernel_launcher.cpp b/quadrants/runtime/cuda/kernel_launcher.cpp index 6c48087396..4856ce8aa9 100644 --- a/quadrants/runtime/cuda/kernel_launcher.cpp +++ b/quadrants/runtime/cuda/kernel_launcher.cpp @@ -365,17 +365,28 @@ void KernelLauncher::launch_llvm_kernel(Handle handle, LaunchContextBuilder &ctx if (ctx.result_buffer_size > 0) { ctx.get_context().result_buffer = (uint64 *)device_result_buffer; } + // When launching on an explicit stream (active_stream != nullptr), two calls to the same kernel on different streams + // would race on the shared per-handle arg_buffer: the second call's memcpy can overwrite the buffer while the first + // kernel is still reading it. Allocate a per-call ephemeral buffer in that case; the stream-ordered free below + // ensures the memory stays live until the kernel finishes. + const bool use_persistent_scratch = (active_stream == nullptr); char *device_arg_buffer = nullptr; + void *ephemeral_arg_buffer = nullptr; if (ctx.arg_buffer_size > 0) { - if (ctx.arg_buffer_size > launcher_ctx.arg_buffer_capacity) { - if (launcher_ctx.arg_buffer_dev_ptr != nullptr) { - CUDADriver::get_instance().mem_free_async(launcher_ctx.arg_buffer_dev_ptr, nullptr); + if (use_persistent_scratch) { + if (ctx.arg_buffer_size > launcher_ctx.arg_buffer_capacity) { + if (launcher_ctx.arg_buffer_dev_ptr != nullptr) { + CUDADriver::get_instance().mem_free_async(launcher_ctx.arg_buffer_dev_ptr, nullptr); + } + const std::size_t new_cap = std::max(ctx.arg_buffer_size, 2 * launcher_ctx.arg_buffer_capacity); + CUDADriver::get_instance().malloc_async(&launcher_ctx.arg_buffer_dev_ptr, new_cap, nullptr); + launcher_ctx.arg_buffer_capacity = new_cap; } - const std::size_t new_cap = std::max(ctx.arg_buffer_size, 2 * launcher_ctx.arg_buffer_capacity); - CUDADriver::get_instance().malloc_async(&launcher_ctx.arg_buffer_dev_ptr, new_cap, nullptr); - launcher_ctx.arg_buffer_capacity = new_cap; + device_arg_buffer = static_cast(launcher_ctx.arg_buffer_dev_ptr); + } else { + CUDADriver::get_instance().malloc_async(&ephemeral_arg_buffer, ctx.arg_buffer_size, active_stream); + device_arg_buffer = static_cast(ephemeral_arg_buffer); } - device_arg_buffer = static_cast(launcher_ctx.arg_buffer_dev_ptr); CUDADriver::get_instance().memcpy_host_to_device_async(device_arg_buffer, ctx.get_context().arg_buffer, ctx.arg_buffer_size, active_stream); ctx.get_context().arg_buffer = device_arg_buffer; @@ -401,11 +412,17 @@ void KernelLauncher::launch_llvm_kernel(Handle handle, LaunchContextBuilder &ctx } needs_sizer_device_ctx = needs_sizer_device_ctx && !CUDAContext::get_instance().supports_pageable_memory_access(); void *device_context_ptr = nullptr; + void *ephemeral_context_ptr = nullptr; if (needs_sizer_device_ctx) { - if (launcher_ctx.runtime_context_dev_ptr == nullptr) { - CUDADriver::get_instance().malloc_async(&launcher_ctx.runtime_context_dev_ptr, sizeof(RuntimeContext), nullptr); + if (use_persistent_scratch) { + if (launcher_ctx.runtime_context_dev_ptr == nullptr) { + CUDADriver::get_instance().malloc_async(&launcher_ctx.runtime_context_dev_ptr, sizeof(RuntimeContext), nullptr); + } + device_context_ptr = launcher_ctx.runtime_context_dev_ptr; + } else { + CUDADriver::get_instance().malloc_async(&ephemeral_context_ptr, sizeof(RuntimeContext), active_stream); + device_context_ptr = ephemeral_context_ptr; } - device_context_ptr = launcher_ctx.runtime_context_dev_ptr; CUDADriver::get_instance().memcpy_host_to_device_async(device_context_ptr, &ctx.get_context(), sizeof(RuntimeContext), active_stream); } @@ -419,8 +436,9 @@ void KernelLauncher::launch_llvm_kernel(Handle handle, LaunchContextBuilder &ctx } else { launch_offloaded_tasks(ctx, cuda_module, offloaded_tasks, device_context_ptr); } - // Persistent scratch: no per-launch free for the per-handle `arg_buffer` / `runtime_context` or the launcher- - // global `result_buffer`. All live until launcher destruction; the dtor handles the final `mem_free_async`. + // Persistent scratch (default-stream path): no per-launch free for the per-handle `arg_buffer` / `runtime_context` + // or the launcher-global `result_buffer`. All live until launcher destruction; the dtor handles the final + // `mem_free_async`. Ephemeral buffers (explicit-stream path) are freed below. if (ctx.result_buffer_size > 0) { CUDADriver::get_instance().memcpy_device_to_host_async(host_result_buffer, device_result_buffer, ctx.result_buffer_size, active_stream); @@ -440,6 +458,14 @@ void KernelLauncher::launch_llvm_kernel(Handle handle, LaunchContextBuilder &ctx } else if (ctx.result_buffer_size > 0) { CUDADriver::get_instance().stream_synchronize(active_stream); } + // Free per-call ephemeral buffers (explicit-stream path). The free is stream-ordered: it won't execute until all + // preceding work on active_stream (including the kernel reads) has completed. + if (ephemeral_arg_buffer != nullptr) { + CUDADriver::get_instance().mem_free_async(ephemeral_arg_buffer, active_stream); + } + if (ephemeral_context_ptr != nullptr) { + CUDADriver::get_instance().mem_free_async(ephemeral_context_ptr, active_stream); + } } KernelLauncher::~KernelLauncher() { diff --git a/quadrants/runtime/cuda/kernel_launcher.h b/quadrants/runtime/cuda/kernel_launcher.h index e56f064857..eb5b4763df 100644 --- a/quadrants/runtime/cuda/kernel_launcher.h +++ b/quadrants/runtime/cuda/kernel_launcher.h @@ -1,5 +1,6 @@ #pragma once +#include #include #include @@ -55,7 +56,11 @@ class KernelLauncher : public LLVM::KernelLauncher { const std::vector &offloaded_tasks, void *device_context_ptr); - std::vector contexts_; + // std::deque (not std::vector): `publish_adstack_metadata`'s host-eval branch recursively registers snode-reader + // kernels via this same launcher, calling `contexts_.resize()` while a parent `launch_llvm_kernel` frame still + // holds a reference into the container. std::deque never invalidates references on push_back / resize, so the + // parent's `launcher_ctx` reference survives the child's registration. + std::deque contexts_; GraphManager graph_manager_; // `result_buffer` stays launcher-global: kernels write to it, the host reads it back synchronously before any // other kernel runs as a reader, so recursive snode-reader launches that reuse the buffer cannot smuggle stale diff --git a/quadrants/runtime/llvm/llvm_adstack_lazy_claim.cpp b/quadrants/runtime/llvm/llvm_adstack_lazy_claim.cpp index 914e64d5ba..f43f858e30 100644 --- a/quadrants/runtime/llvm/llvm_adstack_lazy_claim.cpp +++ b/quadrants/runtime/llvm/llvm_adstack_lazy_claim.cpp @@ -628,13 +628,13 @@ void LlvmRuntimeExecutor::ensure_adstack_heap_float(std::size_t needed_bytes) { } void LlvmRuntimeExecutor::check_adstack_overflow() { - // Called from `synchronize()` on every sync, plus other Quadrants Python entry points wired in - // `Program::check_adstack_overflow_and_raise`. The flag lives in pinned host memory (allocated at - // `materialize_runtime`); polling is a relaxed atomic exchange on the cached host pointer via - // `std::atomic` reinterpret_cast - no DtoH, no JIT call, no sync drain. Available on all backends because - // the pinned-host memory is in the host process address space regardless of where the kernel that wrote it ran. - // The reinterpret_cast is portable because `std::atomic` is layout-compatible with `int64_t` on every - // target (verified by the static_assert below); see also Itanium ABI / MSVC ABI lock-free guarantees. + // Called from `synchronize_and_assert()` on every qd.sync(), plus per-launch from `Program::launch_kernel`. The + // flag lives in pinned host memory (allocated at `materialize_runtime`); polling is a relaxed atomic load/exchange + // on the cached host pointer via `std::atomic` reinterpret_cast - no DtoH, no JIT call, no sync drain. + // Available on all backends because the pinned-host memory is in the host process address space regardless of + // where the kernel that wrote it ran. The reinterpret_cast is portable because `std::atomic` is + // layout-compatible with `int64_t` on every target (verified by the static_assert below); see also Itanium ABI / + // MSVC ABI lock-free guarantees. // // Returns early when the slot has not been allocated yet (e.g. a C++ test that constructs Program without // materializing the runtime and then triggers `Program::finalize -> synchronize`). @@ -643,15 +643,24 @@ void LlvmRuntimeExecutor::check_adstack_overflow() { if (adstack_overflow_flag_host_ptr_ == nullptr) { return; } + // Peek first: a relaxed load is cheaper than an exchange and avoids consuming the flag when the companion task_id + // slot has not yet been flushed from the device. The per-launch call site does NOT synchronize before polling, so + // the device's two atomic writes (flag OR, then task_id cmpxchg) may arrive at the host out of order. If we + // consumed the flag here but the task_id hadn't landed, the diagnostic would lack the kernel name and the later + // qd.sync() would see both slots clean — losing the identity forever. int64_t flag = - reinterpret_cast *>(adstack_overflow_flag_host_ptr_)->exchange(0, std::memory_order_relaxed); + reinterpret_cast *>(adstack_overflow_flag_host_ptr_)->load(std::memory_order_relaxed); if (flag == 0) { return; } - // Drain the companion task-id slot in the same poll. Both slots cleared so the next overflow records a fresh - // identity. `task_id == 0` means the kernel that overflowed pre-dates the registry wiring or its - // `ad_stack.registry_id` was unset for any reason (e.g. a deserialised offline-cache task that has not yet been - // re-registered); the diagnose helper falls through to the generic dual-cause message in that case. + // Flag is set — drain the default stream so that the companion task_id write is guaranteed to be host-visible + // before we read it. This sync only fires on the rare overflow path, so it has zero cost on the fast path. + synchronize(); + // Now consume both slots. Both cleared so the next overflow records a fresh identity. `task_id == 0` means the + // kernel that overflowed pre-dates the registry wiring or its `ad_stack.registry_id` was unset for any reason + // (e.g. a deserialised offline-cache task that has not yet been re-registered); the diagnose helper falls through + // to the generic dual-cause message in that case. + reinterpret_cast *>(adstack_overflow_flag_host_ptr_)->store(0, std::memory_order_relaxed); uint32_t task_id = 0; if (adstack_overflow_task_id_host_ptr_ != nullptr) { int64_t recorded = reinterpret_cast *>(adstack_overflow_task_id_host_ptr_) @@ -665,9 +674,9 @@ void LlvmRuntimeExecutor::check_adstack_overflow() { diagnostic = std::move(diag.message); // Auto-invalidate the per-task metadata caches when the synchronous sizer rerun confirmed the cache is stale // (DLPack-bypass cause). The current run is corrupted (we are about to raise), but the next launch's sizer - // reruns from scratch against the live (mutated) state and the kernel runs to completion without further - // user intervention. Unknown / Quadrants-bug cases skip the invalidation so a real sizer bug is not masked - // by silent recompute. + // reruns from scratch against the live (mutated) state and the kernel runs to completion without further user + // intervention. Unknown / Quadrants-bug cases skip the invalidation so a real sizer bug is not masked by + // silent recompute. if (diag.confirmed_invalid_cache) { prog->adstack_cache().invalidate_all_per_task(); }