47 changes: 36 additions & 11 deletions quadrants/runtime/amdgpu/kernel_launcher.cpp
@@ -309,26 +309,42 @@ void KernelLauncher::launch_llvm_kernel(Handle handle, LaunchContextBuilder &ctx
if (ctx.result_buffer_size > 0) {
ctx.get_context().result_buffer = (uint64 *)device_result_buffer;
}
// Same explicit-stream race avoidance as the CUDA launcher: when active_stream != nullptr, allocate per-call
// ephemeral buffers so concurrent launches on different streams can't clobber each other.
const bool use_persistent_scratch = (active_stream == nullptr);
char *device_arg_buffer = nullptr;
void *ephemeral_arg_buffer = nullptr;
if (ctx.arg_buffer_size > 0) {
if (ctx.arg_buffer_size > launcher_ctx.arg_buffer_capacity) {
if (launcher_ctx.arg_buffer_dev_ptr != nullptr) {
AMDGPUDriver::get_instance().mem_free_async(launcher_ctx.arg_buffer_dev_ptr, nullptr);
if (use_persistent_scratch) {
if (ctx.arg_buffer_size > launcher_ctx.arg_buffer_capacity) {
if (launcher_ctx.arg_buffer_dev_ptr != nullptr) {
AMDGPUDriver::get_instance().mem_free_async(launcher_ctx.arg_buffer_dev_ptr, nullptr);
}
const std::size_t new_cap = std::max<std::size_t>(ctx.arg_buffer_size, 2 * launcher_ctx.arg_buffer_capacity);
AMDGPUDriver::get_instance().malloc_async(&launcher_ctx.arg_buffer_dev_ptr, new_cap, nullptr);
launcher_ctx.arg_buffer_capacity = new_cap;
}
const std::size_t new_cap = std::max<std::size_t>(ctx.arg_buffer_size, 2 * launcher_ctx.arg_buffer_capacity);
AMDGPUDriver::get_instance().malloc_async(&launcher_ctx.arg_buffer_dev_ptr, new_cap, nullptr);
launcher_ctx.arg_buffer_capacity = new_cap;
device_arg_buffer = static_cast<char *>(launcher_ctx.arg_buffer_dev_ptr);
} else {
AMDGPUDriver::get_instance().malloc_async(&ephemeral_arg_buffer, ctx.arg_buffer_size, active_stream);
device_arg_buffer = static_cast<char *>(ephemeral_arg_buffer);
}
device_arg_buffer = static_cast<char *>(launcher_ctx.arg_buffer_dev_ptr);
AMDGPUDriver::get_instance().memcpy_host_to_device_async(device_arg_buffer, ctx.get_context().arg_buffer,
ctx.arg_buffer_size, active_stream);
ctx.get_context().arg_buffer = device_arg_buffer;
}
int arg_size = sizeof(RuntimeContext *);
if (launcher_ctx.runtime_context_dev_ptr == nullptr) {
AMDGPUDriver::get_instance().malloc_async(&launcher_ctx.runtime_context_dev_ptr, sizeof(RuntimeContext), nullptr);
void *ephemeral_context_ptr = nullptr;
void *context_pointer = nullptr;
if (use_persistent_scratch) {
if (launcher_ctx.runtime_context_dev_ptr == nullptr) {
AMDGPUDriver::get_instance().malloc_async(&launcher_ctx.runtime_context_dev_ptr, sizeof(RuntimeContext), nullptr);
}
context_pointer = launcher_ctx.runtime_context_dev_ptr;
} else {
AMDGPUDriver::get_instance().malloc_async(&ephemeral_context_ptr, sizeof(RuntimeContext), active_stream);
context_pointer = ephemeral_context_ptr;
}
void *context_pointer = launcher_ctx.runtime_context_dev_ptr;
AMDGPUDriver::get_instance().memcpy_host_to_device_async(context_pointer, &ctx.get_context(), sizeof(RuntimeContext),
active_stream);

@@ -342,7 +358,9 @@ void KernelLauncher::launch_llvm_kernel(Handle handle, LaunchContextBuilder &ctx
launch_offloaded_tasks(ctx, amdgpu_module, offloaded_tasks, context_pointer, arg_size);
}
QD_TRACE("Launching kernel");
// Persistent scratch: no per-launch free for arg_buffer. The scratch lives until the launcher is destroyed.
// Persistent scratch (default-stream path): no per-launch free for the per-handle `arg_buffer` / `runtime_context`
// or the launcher-global `result_buffer`. All live until launcher destruction; the dtor handles the final
// `mem_free_async`. Ephemeral buffers (explicit-stream path) are freed below.
if (ctx.result_buffer_size > 0) {
AMDGPUDriver::get_instance().memcpy_device_to_host_async(host_result_buffer, device_result_buffer,
ctx.result_buffer_size, active_stream);
@@ -363,6 +381,13 @@ void KernelLauncher::launch_llvm_kernel(Handle handle, LaunchContextBuilder &ctx
}
// Persistent scratch: no per-launch free for the per-handle `arg_buffer` / `runtime_context` or the launcher-global
// `result_buffer`. All three live until the launcher is destroyed; the dtor handles the final `mem_free_async`.
// Ephemeral buffers (explicit-stream path) are freed here.
if (ephemeral_arg_buffer != nullptr) {
AMDGPUDriver::get_instance().mem_free_async(ephemeral_arg_buffer, active_stream);
}
if (ephemeral_context_ptr != nullptr) {
AMDGPUDriver::get_instance().mem_free_async(ephemeral_context_ptr, active_stream);
}
}

KernelLauncher::~KernelLauncher() {
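The grow-on-demand branch above is the textbook amortised-doubling policy. A minimal host-only sketch of just that policy, with illustrative names (`ScratchBuffer`, `ensure`) not taken from this patch, and plain `malloc`/`free` standing in for the driver's async calls:

#include <algorithm>
#include <cstdlib>

// Stand-ins for the AMDGPUDriver wrappers; host malloc/free keep the sketch
// self-contained and runnable.
static void *driver_malloc(std::size_t bytes) { return std::malloc(bytes); }
static void driver_free(void *ptr) { std::free(ptr); }

// Grow-on-demand scratch with amortised doubling: reserve
// max(requested, 2 * capacity), so n monotonically growing requests trigger
// O(log n) reallocations instead of O(n).
struct ScratchBuffer {
  void *dev_ptr = nullptr;
  std::size_t capacity = 0;

  void *ensure(std::size_t needed) {
    if (needed > capacity) {
      if (dev_ptr != nullptr) {
        driver_free(dev_ptr);  // previous launch's reads are already fenced
      }
      const std::size_t new_cap = std::max(needed, 2 * capacity);
      dev_ptr = driver_malloc(new_cap);
      capacity = new_cap;
    }
    return dev_ptr;
  }
};

int main() {
  ScratchBuffer scratch;
  scratch.ensure(100);  // allocates 100
  scratch.ensure(150);  // reallocates to max(150, 200) = 200
  scratch.ensure(180);  // no-op: fits in the 200-byte capacity
  return 0;
}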
8 changes: 7 additions & 1 deletion quadrants/runtime/amdgpu/kernel_launcher.h
@@ -1,5 +1,7 @@
#pragma once

#include <deque>

#include "quadrants/codegen/llvm/compiled_kernel_data.h"
#include "quadrants/runtime/llvm/kernel_launcher.h"

@@ -49,7 +51,11 @@ class KernelLauncher : public LLVM::KernelLauncher {
// child completes, before the parent kernel that would be the next reader). Grown with amortised doubling.
void *persistent_result_buffer_dev_ptr_{nullptr};
std::size_t persistent_result_buffer_capacity_{0};
std::vector<Context> contexts_;
// std::deque (not std::vector): `publish_adstack_metadata`'s host-eval branch recursively registers snode-reader
// kernels via this same launcher, calling `contexts_.resize()` while a parent `launch_llvm_kernel` frame still
// holds a reference into the container. std::deque never invalidates references on push_back / resize, so the
// parent's `launcher_ctx` reference survives the child's registration.
std::deque<Context> contexts_;

public:
~KernelLauncher() override;
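The reference-stability claim in that comment is easy to check in isolation. A minimal sketch, not from this patch, of why a held reference survives reentrant appends with `std::deque` but not with `std::vector`:

#include <cassert>
#include <deque>
#include <string>

struct Context {
  std::string name;
};

int main() {
  std::deque<Context> contexts;
  contexts.push_back({"parent"});

  // A parent launch frame holds a reference into the container...
  Context &parent_ctx = contexts.front();

  // ...while recursive registration appends more entries. With std::vector
  // any of these push_backs may reallocate and leave `parent_ctx` dangling
  // (undefined behaviour); std::deque never relocates existing elements on
  // push_back / resize, so the reference stays valid.
  for (int i = 0; i < 1000; ++i) {
    contexts.push_back({"child"});
  }

  assert(parent_ctx.name == "parent");
  return 0;
}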
50 changes: 38 additions & 12 deletions quadrants/runtime/cuda/kernel_launcher.cpp
@@ -365,17 +365,28 @@ void KernelLauncher::launch_llvm_kernel(Handle handle, LaunchContextBuilder &ctx
if (ctx.result_buffer_size > 0) {
ctx.get_context().result_buffer = (uint64 *)device_result_buffer;
}
// When launching on an explicit stream (active_stream != nullptr), two calls to the same kernel on different streams
// would race on the shared per-handle arg_buffer: the second call's memcpy can overwrite the buffer while the first
// kernel is still reading it. Allocate a per-call ephemeral buffer in that case; the stream-ordered free below
// ensures the memory stays live until the kernel finishes.
const bool use_persistent_scratch = (active_stream == nullptr);
char *device_arg_buffer = nullptr;
void *ephemeral_arg_buffer = nullptr;
if (ctx.arg_buffer_size > 0) {
if (ctx.arg_buffer_size > launcher_ctx.arg_buffer_capacity) {
if (launcher_ctx.arg_buffer_dev_ptr != nullptr) {
CUDADriver::get_instance().mem_free_async(launcher_ctx.arg_buffer_dev_ptr, nullptr);
if (use_persistent_scratch) {
if (ctx.arg_buffer_size > launcher_ctx.arg_buffer_capacity) {
if (launcher_ctx.arg_buffer_dev_ptr != nullptr) {
CUDADriver::get_instance().mem_free_async(launcher_ctx.arg_buffer_dev_ptr, nullptr);
}
const std::size_t new_cap = std::max<std::size_t>(ctx.arg_buffer_size, 2 * launcher_ctx.arg_buffer_capacity);
CUDADriver::get_instance().malloc_async(&launcher_ctx.arg_buffer_dev_ptr, new_cap, nullptr);
launcher_ctx.arg_buffer_capacity = new_cap;
}
const std::size_t new_cap = std::max<std::size_t>(ctx.arg_buffer_size, 2 * launcher_ctx.arg_buffer_capacity);
CUDADriver::get_instance().malloc_async(&launcher_ctx.arg_buffer_dev_ptr, new_cap, nullptr);
launcher_ctx.arg_buffer_capacity = new_cap;
device_arg_buffer = static_cast<char *>(launcher_ctx.arg_buffer_dev_ptr);
} else {
CUDADriver::get_instance().malloc_async(&ephemeral_arg_buffer, ctx.arg_buffer_size, active_stream);
device_arg_buffer = static_cast<char *>(ephemeral_arg_buffer);
}
device_arg_buffer = static_cast<char *>(launcher_ctx.arg_buffer_dev_ptr);
CUDADriver::get_instance().memcpy_host_to_device_async(device_arg_buffer, ctx.get_context().arg_buffer,
ctx.arg_buffer_size, active_stream);
ctx.get_context().arg_buffer = device_arg_buffer;
@@ -401,11 +412,17 @@ void KernelLauncher::launch_llvm_kernel(Handle handle, LaunchContextBuilder &ctx
}
needs_sizer_device_ctx = needs_sizer_device_ctx && !CUDAContext::get_instance().supports_pageable_memory_access();
void *device_context_ptr = nullptr;
void *ephemeral_context_ptr = nullptr;
if (needs_sizer_device_ctx) {
if (launcher_ctx.runtime_context_dev_ptr == nullptr) {
CUDADriver::get_instance().malloc_async(&launcher_ctx.runtime_context_dev_ptr, sizeof(RuntimeContext), nullptr);
if (use_persistent_scratch) {
if (launcher_ctx.runtime_context_dev_ptr == nullptr) {
CUDADriver::get_instance().malloc_async(&launcher_ctx.runtime_context_dev_ptr, sizeof(RuntimeContext), nullptr);
}
device_context_ptr = launcher_ctx.runtime_context_dev_ptr;
} else {
CUDADriver::get_instance().malloc_async(&ephemeral_context_ptr, sizeof(RuntimeContext), active_stream);
device_context_ptr = ephemeral_context_ptr;
}
device_context_ptr = launcher_ctx.runtime_context_dev_ptr;
CUDADriver::get_instance().memcpy_host_to_device_async(device_context_ptr, &ctx.get_context(),
sizeof(RuntimeContext), active_stream);
}
@@ -419,8 +436,9 @@ void KernelLauncher::launch_llvm_kernel(Handle handle, LaunchContextBuilder &ctx
} else {
launch_offloaded_tasks(ctx, cuda_module, offloaded_tasks, device_context_ptr);
}
// Persistent scratch: no per-launch free for the per-handle `arg_buffer` / `runtime_context` or the launcher-
// global `result_buffer`. All live until launcher destruction; the dtor handles the final `mem_free_async`.
// Persistent scratch (default-stream path): no per-launch free for the per-handle `arg_buffer` / `runtime_context`
// or the launcher-global `result_buffer`. All live until launcher destruction; the dtor handles the final
// `mem_free_async`. Ephemeral buffers (explicit-stream path) are freed below.
if (ctx.result_buffer_size > 0) {
CUDADriver::get_instance().memcpy_device_to_host_async(host_result_buffer, device_result_buffer,
ctx.result_buffer_size, active_stream);
@@ -440,6 +458,14 @@ void KernelLauncher::launch_llvm_kernel(Handle handle, LaunchContextBuilder &ctx
} else if (ctx.result_buffer_size > 0) {
CUDADriver::get_instance().stream_synchronize(active_stream);
}
// Free per-call ephemeral buffers (explicit-stream path). The free is stream-ordered: it won't execute until all
// preceding work on active_stream (including the kernel reads) has completed.
if (ephemeral_arg_buffer != nullptr) {
CUDADriver::get_instance().mem_free_async(ephemeral_arg_buffer, active_stream);
}
if (ephemeral_context_ptr != nullptr) {
CUDADriver::get_instance().mem_free_async(ephemeral_context_ptr, active_stream);
}
}

KernelLauncher::~KernelLauncher() {
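The ephemeral-buffer lifecycle described in the comments (stream-ordered allocate, copy, launch, free) maps directly onto CUDA's stream-ordered allocator. A minimal sketch against the raw runtime API (cudaMallocAsync / cudaFreeAsync, CUDA 11.2+), with an illustrative `read_args` kernel; the patch itself goes through the `CUDADriver` wrappers instead. Error handling is elided for brevity.

#include <cstdio>
#include <cuda_runtime.h>

__global__ void read_args(const char *args) {
  printf("first byte: %d\n", args[0]);  // stand-in for the real kernel body
}

int main() {
  cudaStream_t stream;
  cudaStreamCreate(&stream);

  const char host_args[16] = {42};
  void *ephemeral = nullptr;

  // Stream-ordered allocate + fill: neither blocks the host.
  cudaMallocAsync(&ephemeral, sizeof(host_args), stream);
  cudaMemcpyAsync(ephemeral, host_args, sizeof(host_args),
                  cudaMemcpyHostToDevice, stream);

  read_args<<<1, 1, 0, stream>>>(static_cast<const char *>(ephemeral));

  // Stream-ordered free: enqueued after the kernel, so the allocation stays
  // live until the kernel's reads complete. A second launch on another
  // stream gets its own ephemeral buffer, so there is nothing to race on.
  cudaFreeAsync(ephemeral, stream);

  cudaStreamSynchronize(stream);  // only so main() does not exit early
  cudaStreamDestroy(stream);
  return 0;
}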
7 changes: 6 additions & 1 deletion quadrants/runtime/cuda/kernel_launcher.h
@@ -1,5 +1,6 @@
#pragma once

#include <deque>
#include <string>
#include <vector>

@@ -55,7 +56,11 @@ class KernelLauncher : public LLVM::KernelLauncher {
const std::vector<OffloadedTask> &offloaded_tasks,
void *device_context_ptr);

std::vector<Context> contexts_;
// std::deque (not std::vector): `publish_adstack_metadata`'s host-eval branch recursively registers snode-reader
// kernels via this same launcher, calling `contexts_.resize()` while a parent `launch_llvm_kernel` frame still
// holds a reference into the container. std::deque never invalidates references on push_back / resize, so the
// parent's `launcher_ctx` reference survives the child's registration.
std::deque<Context> contexts_;
GraphManager graph_manager_;
// `result_buffer` stays launcher-global: kernels write to it, the host reads it back synchronously before any
// other kernel runs as a reader, so recursive snode-reader launches that reuse the buffer cannot smuggle stale
39 changes: 24 additions & 15 deletions quadrants/runtime/llvm/llvm_adstack_lazy_claim.cpp
@@ -628,13 +628,13 @@ void LlvmRuntimeExecutor::ensure_adstack_heap_float(std::size_t needed_bytes) {
}

void LlvmRuntimeExecutor::check_adstack_overflow() {
// Called from `synchronize()` on every sync, plus other Quadrants Python entry points wired in
// `Program::check_adstack_overflow_and_raise`. The flag lives in pinned host memory (allocated at
// `materialize_runtime`); polling is a relaxed atomic exchange on the cached host pointer via
// `std::atomic<int64_t>` reinterpret_cast - no DtoH, no JIT call, no sync drain. Available on all backends because
// the pinned-host memory is in the host process address space regardless of where the kernel that wrote it ran.
// The reinterpret_cast is portable because `std::atomic<int64_t>` is layout-compatible with `int64_t` on every
// target (verified by the static_assert below); see also Itanium ABI / MSVC ABI lock-free guarantees.
// Called from `synchronize_and_assert()` on every qd.sync(), plus per-launch from `Program::launch_kernel`. The
// flag lives in pinned host memory (allocated at `materialize_runtime`); polling is a relaxed atomic load/exchange
// on the cached host pointer via `std::atomic<int64_t>` reinterpret_cast - no DtoH, no JIT call, no sync drain.
// Available on all backends because the pinned-host memory is in the host process address space regardless of
// where the kernel that wrote it ran. The reinterpret_cast is portable because `std::atomic<int64_t>` is
// layout-compatible with `int64_t` on every target (verified by the static_assert below); see also Itanium ABI /
// MSVC ABI lock-free guarantees.
//
// Returns early when the slot has not been allocated yet (e.g. a C++ test that constructs Program without
// materializing the runtime and then triggers `Program::finalize -> synchronize`).
@@ -643,15 +643,24 @@
if (adstack_overflow_flag_host_ptr_ == nullptr) {
return;
}
// Peek first: a relaxed load is cheaper than an exchange and avoids consuming the flag when the companion task_id
// slot has not yet been flushed from the device. The per-launch call site does NOT synchronize before polling, so
// the device's two atomic writes (flag OR, then task_id cmpxchg) may arrive at the host out of order. If we
// consumed the flag here but the task_id hadn't landed, the diagnostic would lack the kernel name and the later
// qd.sync() would see both slots clean — losing the identity forever.
int64_t flag =
reinterpret_cast<std::atomic<int64_t> *>(adstack_overflow_flag_host_ptr_)->exchange(0, std::memory_order_relaxed);
reinterpret_cast<std::atomic<int64_t> *>(adstack_overflow_flag_host_ptr_)->load(std::memory_order_relaxed);
if (flag == 0) {
return;
}
// Drain the companion task-id slot in the same poll. Both slots cleared so the next overflow records a fresh
// identity. `task_id == 0` means the kernel that overflowed pre-dates the registry wiring or its
// `ad_stack.registry_id` was unset for any reason (e.g. a deserialised offline-cache task that has not yet been
// re-registered); the diagnose helper falls through to the generic dual-cause message in that case.
// Flag is set — drain the default stream so that the companion task_id write is guaranteed to be host-visible
// before we read it. This sync only fires on the rare overflow path, so it has zero cost on the fast path.
synchronize();
// Now consume both slots. Both cleared so the next overflow records a fresh identity. `task_id == 0` means the
// kernel that overflowed pre-dates the registry wiring or its `ad_stack.registry_id` was unset for any reason
// (e.g. a deserialised offline-cache task that has not yet been re-registered); the diagnose helper falls through
// to the generic dual-cause message in that case.
reinterpret_cast<std::atomic<int64_t> *>(adstack_overflow_flag_host_ptr_)->store(0, std::memory_order_relaxed);
uint32_t task_id = 0;
if (adstack_overflow_task_id_host_ptr_ != nullptr) {
int64_t recorded = reinterpret_cast<std::atomic<int64_t> *>(adstack_overflow_task_id_host_ptr_)
@@ -665,9 +674,9 @@
diagnostic = std::move(diag.message);
// Auto-invalidate the per-task metadata caches when the synchronous sizer rerun confirmed the cache is stale
// (DLPack-bypass cause). The current run is corrupted (we are about to raise), but the next launch's sizer
// reruns from scratch against the live (mutated) state and the kernel runs to completion without further
// user intervention. Unknown / Quadrants-bug cases skip the invalidation so a real sizer bug is not masked
// by silent recompute.
// reruns from scratch against the live (mutated) state and the kernel runs to completion without further user
// intervention. Unknown / Quadrants-bug cases skip the invalidation so a real sizer bug is not masked by
// silent recompute.
if (diag.confirmed_invalid_cache) {
prog->adstack_cache().invalidate_all_per_task();
}
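The peek / drain / consume protocol the new comments spell out can be shown host-only. A minimal sketch with illustrative names (`poll_overflow`, `drain`, the two `*_slot` globals, none from this patch); plain int64_t storage models the pinned-host slots that device code writes in the real executor:

#include <atomic>
#include <cstdint>

static_assert(sizeof(std::atomic<int64_t>) == sizeof(int64_t),
              "layout compatibility assumed by the reinterpret_cast");

// The real slots live in pinned host memory that device code writes into;
// plain int64_t storage keeps this sketch host-only.
int64_t overflow_flag_slot = 0;
int64_t task_id_slot = 0;

static void drain() {
  // Stand-in for synchronize(): in the real code this waits until all
  // pending device writes to the pinned slots are host-visible.
}

// Peek with a relaxed load so the fast path costs one cached read; only when
// the flag is set pay for a drain, then consume both slots so the next
// overflow records a fresh identity.
bool poll_overflow(uint32_t &task_id_out) {
  auto *flag = reinterpret_cast<std::atomic<int64_t> *>(&overflow_flag_slot);
  if (flag->load(std::memory_order_relaxed) == 0) {
    return false;  // fast path: no DtoH copy, no sync drain
  }
  drain();  // guarantees the companion task_id write has landed
  flag->store(0, std::memory_order_relaxed);
  auto *tid = reinterpret_cast<std::atomic<int64_t> *>(&task_id_slot);
  task_id_out =
      static_cast<uint32_t>(tid->exchange(0, std::memory_order_relaxed));
  return true;
}

int main() {
  task_id_slot = 7;        // the device's cmpxchg would record the task id
  overflow_flag_slot = 1;  // then set the flag
  uint32_t task_id = 0;
  return poll_overflow(task_id) && task_id == 7 ? 0 : 1;
}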