feat(trace_buffer): adopt libdatadog's TraceBuffer & borrow the host tokio runtime#226
feat(trace_buffer): adopt libdatadog's TraceBuffer & borrow the host tokio runtime#226Aaalibaba42 wants to merge 6 commits into
Conversation
This comment has been minimized.
This comment has been minimized.
3ba789c to
e6304ac
Compare
3fdeef4 to
167d226
Compare
paullegranddc
left a comment
There was a problem hiding this comment.
Overall, the way this currently works , inheriting the runtime and trying to perform a shutdown in the caller tokio context seems dodgy.
I think it would be cleaner and less brittle if we
- always created a new runtime (possibly a current thread one)
- sent it in a separate thread
- create the trace exporter and spawn tasks in the new thread
- wait on a shutdown signal, where we shutdown the shared runtime and drop it.
| # Local development uses sibling path deps so changes in ../libdatadog are picked up | ||
| # without a roundtrip through GitHub. CI / publish must switch this back to the `git` | ||
| # form (see commit `3e5ed24` for the precedent). Once the borrowed-runtime work in | ||
| # libdatadog lands and is pushed, replace these with the corresponding git refs. | ||
| libdd-data-pipeline = { path = "../libdatadog/libdd-data-pipeline", default-features = false, features = ["telemetry"] } | ||
| libdd-trace-utils = { path = "../libdatadog/libdd-trace-utils", default-features = false } | ||
| libdd-telemetry = { path = "../libdatadog/libdd-telemetry", default-features = false } | ||
| libdd-common = { path = "../libdatadog/libdd-common", default-features = false } | ||
| libdd-tinybytes = { path = "../libdatadog/libdd-tinybytes", default-features = false } | ||
| libdd-library-config = { path = "../libdatadog/libdd-library-config", default-features = false } | ||
| libdd-shared-runtime = { path = "../libdatadog/libdd-shared-runtime", default-features = false } | ||
| libdd-capabilities-impl = { path = "../libdatadog/libdd-capabilities-impl", default-features = false } |
There was a problem hiding this comment.
Just a reminder that this shouldn't be merged
| @@ -0,0 +1,28 @@ | |||
| [[ | |||
There was a problem hiding this comment.
Why have the snapshot test changed?
| let runtime = match tokio::runtime::Handle::try_current() { | ||
| Ok(handle) => Arc::new(SharedRuntime::from_handle(handle)), | ||
| Err(_) => Arc::new( | ||
| SharedRuntime::new().expect("failed to create SharedRuntime for trace exporter"), | ||
| ), | ||
| }; |
There was a problem hiding this comment.
DIscussed IRL but I would prefer if we always created a new runtime for the SharedRuntime, and sent it in a brand new thread, not risking interacting with it from the customer side
| // Drive the async builder to completion. The future's only suspension point is | ||
| // a single non-blocking mpsc `send` into a freshly-created telemetry-worker | ||
| // channel, which is always Ready on first poll. We deliberately use | ||
| // [`poll_to_completion`] instead of `SharedRuntime::block_on` (which errors in | ||
| // borrowed mode) or `tokio::Handle::block_on` (which panics from inside a tokio | ||
| // worker thread). See [`poll_to_completion`]'s docs for the contract. |
There was a problem hiding this comment.
This seems like a very brittle expectations
| // alive SharedRuntime) before we tear the workers down. Without this, fast-finishing | ||
| // applications would shut the workers down before the stats worker ever ran, dropping | ||
| // the stats payload and breaking expectations downstream. | ||
| let flush_result = if self.runtime.is_borrowed() && tokio_is_multi_thread() { |
There was a problem hiding this comment.
So if the runtime is not multi threaded we just block?
This is not correct as the shutdown is an async operation, so if the shared runtime inherited the same current thread runtime as the app, we will block it and the trace buffer flush won't be able to make progress
| // trace-buffer worker's flush while we park on the Condvar inside | ||
| // `flush_and_wait`. Without this, the Condvar wait would block the only | ||
| // tokio worker thread available to make progress. | ||
| tokio::task::block_in_place(|| self.trace_buffer.flush_and_wait(remaining())) |
There was a problem hiding this comment.
Also shutting down the trace buffer should already be flushing lefotver spans in the pipe. No need to flush and wait here
| } | ||
| } | ||
|
|
||
| fn poll_to_completion<F: std::future::Future>(future: F) -> F::Output { |
There was a problem hiding this comment.
I'd prefer if we used https://docs.rs/futures/latest/futures/executor/fn.block_on.html which does the same
|
Hey @Aaalibaba42, Hope you’re doing well! I have a PR that’s currently blocked on this change, so I wanted to check in and see if you have a rough ETA for when this might be merged. No rush if you’re still working through it, just trying to plan around the dependency 😄 |
|
Hey @mabdinur, this might still be a few days. The changes here lead to discussions and changes in libdatadog that must be settled before this is merged, I'm hoping to have this merged before the end of next week but it's the last PR of the effort :/. |
What?
Replace the in-tree
AsyncTraceExporter(buffering, flushing, shutdownorchestration) with libdatadog's
TraceBuffer+SharedRuntime, the sameimplementation that was upstreamed from this repo. The
DatadogExporteris nowa thin wrapper that owns a
TraceBufferand aSharedRuntimeinstead ofmanaging its own background thread, batch queue, and condvar synchronisation.
When the exporter is constructed from inside a tokio runtime — the common case
for an async application booting up dd-trace-rs from its
#[tokio::main]— theunderlying
SharedRuntimenow borrows the host runtime instead of spawninga second one. Borrowed mode is what makes
Dropandshutdownwork cleanlyfrom a host worker thread without nested-runtime panics.
Why?
language tracers. Keeping a duplicate copy here means diverging bug fixes,
duplicated tests, and extra maintenance. Adopting the libdatadog version
lets us delete ~860 lines of
exporter/mod.rsand benefit from upstreamimprovements (e.g. runtime-aware shutdown) for free.
both panicked with
Cannot start a runtime from within a runtimewhen theuser owned a tokio runtime — which is the only sane way to run a modern
Rust web service. Borrowed-mode
SharedRuntimeis the proper fix.How?
1. Adopt
TraceBuffer+SharedRuntimedatadog-opentelemetry/src/exporter/mod.rsentirely.DatadogExporterinspan_exporter.rsto construct aTraceExporterviaTraceExporterBuilderand wrap it in aTraceBuffer.MapperExporterimplementsExport<SpanData>for the OTel→DD conversionand lives alongside the new
DatadogExporter.2. Pick the right runtime backing at construction time
If we are constructed from inside a tokio runtime, borrow it; otherwise spin
up our own. Borrowed mode gives up fork-safety in exchange for letting
Drop/shutdownwork withoutblock_on— see the borrowed-runtime work inlibdd-shared-runtime.3. Drive the async builder from a sync constructor
DatadogExporter::newis sync (-> Self) but the newTraceExporterBuilder::build_asyncis async. CallingSharedRuntime::block_onin borrowed mode returnsUnsupported; callingtokio::runtime::Handle::block_onfrom a tokio workerpanics. Solution: a small
poll_to_completionhelper — a hand-rolled,thread-park-based executor — that drives
build_asyncto completion. This issafe because
build_async's only suspension point is a single non-blockingmpsc send into a freshly-created telemetry-worker channel, which is always
Readyon first poll. The contract is documented at the helper's call site.4. Non-blocking shutdown
DatadogExporter::shutdownnow usesSharedRuntime::trigger_shutdown_signal+wait_shutdown_done(Condvar-based,no
block_on). To avoid deadlocking the single worker thread of acurrent_threadhost runtime, the wait is conditional on the host runtime'sflavor:
tokio::task::block_in_placearoundwait_shutdown_doneso other workers can keep driving the shutdown tasks.current_threadhost — fire-and-forgettrigger_shutdown_signalandskip the Condvar wait entirely; blocking the only worker thread would
prevent the shutdown tasks from ever running. This is best-effort; an
async-aware
shutdown_asyncAPI is the long-term fix.A small
tokio_is_multi_thread()helper does the flavor detection.5.
DropforDatadogExporterCallers that forget to call
shutdown()now get a best-effort teardown with a1s timeout. The impl is idempotent: a prior explicit shutdown makes the
Dropa no-op. Regression tests exercise drop-inside-tokio anddrop-after-explicit-shutdown, both asserting the borrowed-runtime path
(
exporter.runtime.is_borrowed()).Tests
test_drop_inside_tokio_runtimeandtest_drop_after_explicit_shutdown_is_noop, which used to be#[ignore]dwhile the tokio-in-tokio panic was open. Both now run under
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]and assertexporter.runtime.is_borrowed().test_new_outside_tokio_picks_owned_runtimefor the owned-runtimefallback.
global-state tests are unchanged.
Additional notes
Cargo.tomlcurrently point to thejwiriath/tracebuffer-sharedruntime-rusttracerbranch via path deps forlocal development; the workspace comment documents that these need to flip
back to git refs (or pinned versions) once the libdatadog PR lands and is
published.
test-utilsfeature gate exposes await_readyhook onMapperExporterso snapshot tests can wait for agent info before assertingon stats-derived metrics.
DatadogExporterAPI surface:send_chunk,force_flush,shutdown, andqueue_metricsall keep their existingsignatures.