diff --git a/.my-shell.nix b/.my-shell.nix new file mode 120000 index 0000000..c05445c --- /dev/null +++ b/.my-shell.nix @@ -0,0 +1 @@ +/home/freesig/holochain/my-shell.nix \ No newline at end of file diff --git a/Cargo.toml b/Cargo.toml index 882ae2f..47a8183 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,7 +8,7 @@ readme = "README.md" license-file = "LICENSE" [features] -default = ["opentelemetry-on"] +default = [] # Allows across thread and process tracing opentelemetry-on = ["opentelemetry", "tracing-opentelemetry", "holochain_serialized_bytes", "serde", "serde_bytes"] channels = ["tokio", "shrinkwraprs"] @@ -19,13 +19,13 @@ derive_more = "0.99.3" inferno = "0.10.0" serde_json = { version = "1.0.51", features = [ "preserve_order" ] } thiserror = "1.0.22" -tracing = "0.1.21" -tracing-core = "0.1.17" -tracing-serde = "0.1.2" -tracing-subscriber = "0.2.15" +tracing = "0.1.29" +tracing-core = "0.1.21" +tracing-serde = "0.1" +tracing-subscriber = { version = "0.3", features = ["default", "env-filter", "std", "fmt", "time", "json"] } -opentelemetry = { version = "0.8", default-features = false, features = ["trace", "serialize"], optional = true } -tracing-opentelemetry = { version = "0.8.0", optional = true } +opentelemetry = { version = "0.16", default-features = false, features = ["trace", "serialize"], optional = true } +tracing-opentelemetry = { version = "0.15.0", optional = true } holochain_serialized_bytes = {version = "0.0", optional = true } serde = { version = "1", optional = true } serde_bytes = { version = "0.11", optional = true } diff --git a/examples/channels.rs b/examples/channels.rs index 5ca9971..319941a 100644 --- a/examples/channels.rs +++ b/examples/channels.rs @@ -1,83 +1,85 @@ -use std::error::Error; +// use std::error::Error; -use observability::{span_context, MsgWrap}; -use tokio::sync::mpsc; -use tracing::*; +// use observability::{span_context, MsgWrap}; +// use tokio::sync::mpsc; +// use tracing::*; -#[derive(Debug)] -struct Foo; +// #[derive(Debug)] +// struct Foo; -struct MyChannel { - rx: mpsc::Receiver>, - tx: mpsc::Sender>, -} +// struct MyChannel { +// rx: mpsc::Receiver>, +// tx: mpsc::Sender>, +// } -impl MyChannel { - fn new(tx: mpsc::Sender>, rx: mpsc::Receiver>) -> Self { - Self { rx, tx } - } -} +// impl MyChannel { +// fn new(tx: mpsc::Sender>, rx: mpsc::Receiver>) -> Self { +// Self { rx, tx } +// } +// } -#[tokio::main] -async fn main() -> Result<(), Box> { - observability::test_run_open().ok(); - span_context!(); - span_context!(current, Level::DEBUG); - let (tx1, rx2) = mpsc::channel(10); - let (tx2, rx1) = mpsc::channel(10); - let c1 = MyChannel::new(tx1.clone(), rx1); - let c2 = MyChannel::new(tx2, rx2); - let (tx4, rx4) = mpsc::channel(10); - let (_, dead) = mpsc::channel(1); - let c3 = MyChannel::new(tx1, rx4); - let c4 = MyChannel::new(tx4, dead); - let mut jh = Vec::new(); - jh.push(tokio::task::spawn(async { a(c1).await.unwrap() })); - jh.push(tokio::task::spawn(async { b(c2, c4).await.unwrap() })); - jh.push(tokio::task::spawn(async { c(c3).await.unwrap() })); +// #[tokio::main] +// async fn main() -> Result<(), Box> { +// observability::test_run_open().ok(); +// span_context!(); +// span_context!(current, Level::DEBUG); +// let (tx1, rx2) = mpsc::channel(10); +// let (tx2, rx1) = mpsc::channel(10); +// let c1 = MyChannel::new(tx1.clone(), rx1); +// let c2 = MyChannel::new(tx2, rx2); +// let (tx4, rx4) = mpsc::channel(10); +// let (_, dead) = mpsc::channel(1); +// let c3 = MyChannel::new(tx1, rx4); +// let c4 = MyChannel::new(tx4, dead); +// let mut jh = Vec::new(); +// jh.push(tokio::task::spawn(async { a(c1).await.unwrap() })); +// jh.push(tokio::task::spawn(async { b(c2, c4).await.unwrap() })); +// jh.push(tokio::task::spawn(async { c(c3).await.unwrap() })); - for h in jh { - h.await?; - } - Ok(()) -} +// for h in jh { +// h.await?; +// } +// Ok(()) +// } -#[instrument(skip(channel))] -async fn a(mut channel: MyChannel) -> Result<(), Box> { - for _ in 0..10 { - span_context!(Span::current()); - channel.tx.send(Foo.into()).await?; - if let Some(r) = channel.rx.recv().await { - r.inner(); - } - } - tokio::time::delay_for(std::time::Duration::from_millis(500)).await; - Ok(()) -} +// #[instrument(skip(channel))] +// async fn a(mut channel: MyChannel) -> Result<(), Box> { +// for _ in 0..10 { +// span_context!(Span::current()); +// channel.tx.send(Foo.into()).await?; +// if let Some(r) = channel.rx.recv().await { +// r.inner(); +// } +// } +// tokio::time::delay_for(std::time::Duration::from_millis(500)).await; +// Ok(()) +// } -#[instrument(skip(channel, to_c))] -async fn b(mut channel: MyChannel, mut to_c: MyChannel) -> Result<(), Box> { - for _ in 0..10 { - span_context!(Span::current()); - if let Some(r) = channel.rx.recv().await { - r.inner(); - } - channel.tx.send(Foo.into()).await?; - to_c.tx.send(Foo.into()).await?; - } - tokio::time::delay_for(std::time::Duration::from_millis(500)).await; - Ok(()) -} +// #[instrument(skip(channel, to_c))] +// async fn b(mut channel: MyChannel, mut to_c: MyChannel) -> Result<(), Box> { +// for _ in 0..10 { +// span_context!(Span::current()); +// if let Some(r) = channel.rx.recv().await { +// r.inner(); +// } +// channel.tx.send(Foo.into()).await?; +// to_c.tx.send(Foo.into()).await?; +// } +// tokio::time::delay_for(std::time::Duration::from_millis(500)).await; +// Ok(()) +// } -#[instrument(skip(from_b_to_a))] -async fn c(mut from_b_to_a: MyChannel) -> Result<(), Box> { - for _ in 0..10 { - span_context!(Span::current()); - if let Some(r) = from_b_to_a.rx.recv().await { - r.inner(); - } - from_b_to_a.tx.send(Foo.into()).await?; - } - tokio::time::delay_for(std::time::Duration::from_millis(500)).await; - Ok(()) -} +// #[instrument(skip(from_b_to_a))] +// async fn c(mut from_b_to_a: MyChannel) -> Result<(), Box> { +// for _ in 0..10 { +// span_context!(Span::current()); +// if let Some(r) = from_b_to_a.rx.recv().await { +// r.inner(); +// } +// from_b_to_a.tx.send(Foo.into()).await?; +// } +// tokio::time::delay_for(std::time::Duration::from_millis(500)).await; +// Ok(()) +// } + +fn main() {} diff --git a/examples/socket_client.rs b/examples/socket_client.rs index aaecf62..4594e54 100644 --- a/examples/socket_client.rs +++ b/examples/socket_client.rs @@ -1,54 +1,56 @@ -use observability::{span_context, OpenSpanExt}; -use std::{env, error::Error, net::SocketAddr}; -use tokio::net::UdpSocket; -use tracing::*; +// use observability::{span_context, OpenSpanExt}; +// use std::{env, error::Error, net::SocketAddr}; +// use tokio::net::UdpSocket; +// use tracing::*; -#[tokio::main] -async fn main() -> Result<(), Box> { - observability::test_run_open().ok(); - let remote_addr: SocketAddr = env::args() - .nth(1) - .unwrap_or_else(|| "127.0.0.1:8080".into()) - .parse()?; - let local_addr: SocketAddr = if remote_addr.is_ipv4() { - "0.0.0.0:0" - } else { - "[::]:0" - } - .parse()?; +// #[tokio::main] +// async fn main() -> Result<(), Box> { +// observability::test_run_open().ok(); +// let remote_addr: SocketAddr = env::args() +// .nth(1) +// .unwrap_or_else(|| "127.0.0.1:8080".into()) +// .parse()?; +// let local_addr: SocketAddr = if remote_addr.is_ipv4() { +// "0.0.0.0:0" +// } else { +// "[::]:0" +// } +// .parse()?; - let mut socket = UdpSocket::bind(local_addr).await?; - const MAX_DATAGRAM_SIZE: usize = 65_507; - socket.connect(&remote_addr).await?; - { - let span = debug_span!("client send"); - let _g = span.enter(); - span_context!(span, Level::DEBUG); - let data = span.get_context_bytes(); +// let mut socket = UdpSocket::bind(local_addr).await?; +// const MAX_DATAGRAM_SIZE: usize = 65_507; +// socket.connect(&remote_addr).await?; +// { +// let span = debug_span!("client send"); +// let _g = span.enter(); +// span_context!(span, Level::DEBUG); +// let data = span.get_context_bytes(); - socket.send(data.as_ref()).await?; - } - { - let mut data = vec![0u8; MAX_DATAGRAM_SIZE]; - let len = socket.recv(&mut data).await?; - let data = data[..len].to_vec(); - let span = debug_span!("client recv"); - let _g = span.enter(); - span.set_from_bytes(data); - span_context!(span, Level::DEBUG); +// socket.send(data.as_ref()).await?; +// } +// { +// let mut data = vec![0u8; MAX_DATAGRAM_SIZE]; +// let len = socket.recv(&mut data).await?; +// let data = data[..len].to_vec(); +// let span = debug_span!("client recv"); +// let _g = span.enter(); +// span.set_from_bytes(data); +// span_context!(span, Level::DEBUG); - let data = span.get_context_bytes(); +// let data = span.get_context_bytes(); - socket.send(data.as_ref()).await?; - } - { - let mut data = vec![0u8; MAX_DATAGRAM_SIZE]; - let len = socket.recv(&mut data).await?; - let data = data[..len].to_vec(); - let span = debug_span!("client recv 2"); - let _g = span.enter(); - span.set_from_bytes(data); - span_context!(span, Level::DEBUG); - } - Ok(()) -} +// socket.send(data.as_ref()).await?; +// } +// { +// let mut data = vec![0u8; MAX_DATAGRAM_SIZE]; +// let len = socket.recv(&mut data).await?; +// let data = data[..len].to_vec(); +// let span = debug_span!("client recv 2"); +// let _g = span.enter(); +// span.set_from_bytes(data); +// span_context!(span, Level::DEBUG); +// } +// Ok(()) +// } + +fn main() {} \ No newline at end of file diff --git a/examples/socket_server.rs b/examples/socket_server.rs index 78411f6..6cb1dba 100644 --- a/examples/socket_server.rs +++ b/examples/socket_server.rs @@ -1,48 +1,50 @@ -use observability::{span_context, OpenSpanExt}; -use std::{env, error::Error}; -use tokio::net::UdpSocket; -use tracing::*; +// use observability::{span_context, OpenSpanExt}; +// use std::{env, error::Error}; +// use tokio::net::UdpSocket; +// use tracing::*; -#[tokio::main] -async fn main() -> Result<(), Box> { - observability::test_run_open().ok(); - let addr = env::args() - .nth(1) - .unwrap_or_else(|| "127.0.0.1:8080".to_string()); +// #[tokio::main] +// async fn main() -> Result<(), Box> { +// observability::test_run_open().ok(); +// let addr = env::args() +// .nth(1) +// .unwrap_or_else(|| "127.0.0.1:8080".to_string()); - let mut socket = UdpSocket::bind(&addr).await?; - println!("Listening on: {}", socket.local_addr()?); +// let mut socket = UdpSocket::bind(&addr).await?; +// println!("Listening on: {}", socket.local_addr()?); - { - let mut buf = vec![0; 1024]; - let (size, peer) = socket.recv_from(&mut buf).await?; +// { +// let mut buf = vec![0; 1024]; +// let (size, peer) = socket.recv_from(&mut buf).await?; - let data = buf[..size].to_vec(); - let span = debug_span!("server recv"); - span.set_from_bytes(data); - let _g = span.enter(); - span_context!(span, Level::DEBUG); - let span = debug_span!("inner 1"); - let _g = span.enter(); - span_context!(span, Level::DEBUG); - let span = debug_span!("inner 2"); - let _g = span.enter(); - span_context!(span, Level::DEBUG); - let data = span.get_context_bytes(); - let _amt = socket.send_to(&data[..], &peer).await?; - } +// let data = buf[..size].to_vec(); +// let span = debug_span!("server recv"); +// span.set_from_bytes(data); +// let _g = span.enter(); +// span_context!(span, Level::DEBUG); +// let span = debug_span!("inner 1"); +// let _g = span.enter(); +// span_context!(span, Level::DEBUG); +// let span = debug_span!("inner 2"); +// let _g = span.enter(); +// span_context!(span, Level::DEBUG); +// let data = span.get_context_bytes(); +// let _amt = socket.send_to(&data[..], &peer).await?; +// } - { - let mut buf = vec![0; 1024]; - let (size, peer) = socket.recv_from(&mut buf).await?; +// { +// let mut buf = vec![0; 1024]; +// let (size, peer) = socket.recv_from(&mut buf).await?; - let data = buf[..size].to_vec(); - let span = debug_span!("server recv 2"); - span.set_from_bytes(data); - let _g = span.enter(); - span_context!(span, Level::DEBUG); - let data = span.get_context_bytes(); - let _amt = socket.send_to(&data[..], &peer).await?; - } - Ok(()) -} +// let data = buf[..size].to_vec(); +// let span = debug_span!("server recv 2"); +// span.set_from_bytes(data); +// let _g = span.enter(); +// span_context!(span, Level::DEBUG); +// let data = span.get_context_bytes(); +// let _amt = socket.send_to(&data[..], &peer).await?; +// } +// Ok(()) +// } + +fn main() {} \ No newline at end of file diff --git a/src/fmt.rs b/src/fmt.rs index 0da9600..091b037 100644 --- a/src/fmt.rs +++ b/src/fmt.rs @@ -4,7 +4,7 @@ use tracing_core::field::Field; use tracing_serde::AsSerde; use tracing_subscriber::{ field::Visit, - fmt::{FmtContext, FormatFields}, + fmt::{format::Writer, FmtContext, FormatFields}, registry::LookupSpan, }; @@ -36,7 +36,7 @@ impl Visit for EventFieldVisitor { // Formatting the events for json pub(crate) fn format_event( ctx: &FmtContext<'_, S, N>, - writer: &mut dyn std::fmt::Write, + mut writer: Writer<'_>, event: &Event<'_>, ) -> std::fmt::Result where @@ -75,7 +75,7 @@ where // Formatting the events for json pub(crate) fn format_event_flame( ctx: &FmtContext<'_, S, N>, - writer: &mut dyn std::fmt::Write, + mut writer: Writer<'_>, event: &Event<'_>, ) -> std::fmt::Result where @@ -97,7 +97,7 @@ where // Formatting the events for json pub(crate) fn format_event_ice( ctx: &FmtContext<'_, S, N>, - writer: &mut dyn std::fmt::Write, + mut writer: Writer<'_>, event: &Event<'_>, ) -> std::fmt::Result where diff --git a/src/lib.rs b/src/lib.rs index 2e05382..1898d7e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -75,13 +75,18 @@ use tracing::{Event, Subscriber}; use tracing_subscriber::{ - filter::EnvFilter, - fmt::{format::FmtSpan, time::ChronoUtc, FmtContext}, + filter::{Directive, EnvFilter}, + fmt::{ + format::{DefaultFields, FmtSpan, Format, Full, Writer}, + time::UtcTime, + FmtContext, Formatter, + }, registry::LookupSpan, + reload::Handle, FmtSubscriber, }; -use std::{str::FromStr, sync::Once}; +use std::{io::Stderr, str::FromStr, sync::Once}; use flames::{toml_path, FlameTimed}; use fmt::*; @@ -93,12 +98,14 @@ mod open; #[cfg(all(feature = "opentelemetry-on", feature = "channels"))] pub use open::channel; -#[cfg(feature = "opentelemetry-on")] -pub use open::should_run; +// #[cfg(feature = "opentelemetry-on")] +// pub use open::should_run; pub use open::{Config, Context, MsgWrap, OpenSpanExt}; - pub use tracing; + +thread_local!(static HANDLE: std::cell::RefCell, fn() -> Stderr>>>> = std::cell::RefCell::new(None)); + #[derive(Debug, Clone)] /// Sets the kind of structured logging output you want pub enum Output { @@ -116,12 +123,40 @@ pub enum Output { FlameTimed, /// Creates a flamegraph from timed spans using idle time IceTimed, - /// Opentelemetry tracing - OpenTel, + // /// Opentelemetry tracing + // OpenTel, + /// Regular logging but can be the filters can be changed at runtime. + Dynamic, /// No logging to console None, } +/// A handle to dynamically reload the current filter. +pub struct DynFilter( + Option, fn() -> Stderr>>>, + Option, +); + +impl DynFilter { + /// Reload the current filter by adding a new directive. + /// You can overwrite a previous directive to disable it. + pub fn reload(&mut self, filter: &str) -> Result<(), Box> { + let filter: Directive = filter.parse()?; + let mut empty_env = self.1.take().unwrap_or_default(); + let mut empty_option = None; + self.0.as_ref().map(|h| { + h.modify(|a| { + std::mem::swap(a, &mut empty_env); + let mut new_env = empty_env.add_directive(filter); + std::mem::swap(a, &mut new_env); + empty_option = Some(new_env); + }) + }); + self.1 = empty_option; + Ok(()) + } +} + /// ParseError is a String pub type ParseError = String; @@ -138,7 +173,7 @@ impl FromStr for Output { "LogTimed" => Ok(Output::LogTimed), "FlameTimed" => Ok(Output::FlameTimed), "Compact" => Ok(Output::Compact), - "OpenTel" => Ok(Output::OpenTel), + // "OpenTel" => Ok(Output::OpenTel), "None" => Ok(Output::None), _ => Err("Could not parse log output type".into()), } @@ -155,15 +190,29 @@ pub fn test_run() -> Result<(), errors::TracingError> { init_fmt(Output::Log) } -/// Run tracing in a test that uses open telemetry to -/// send span contexts across process and thread boundaries. -pub fn test_run_open() -> Result<(), errors::TracingError> { +/// Run logging in a unit test +/// RUST_LOG or CUSTOM_FILTER must be set or +/// this is a no-op +pub fn dyn_test_run() -> Result { if std::env::var_os("RUST_LOG").is_none() { - return Ok(()); + return Ok(DynFilter(None, None)); } - init_fmt(Output::OpenTel) + init_fmt(Output::Dynamic)?; + let handle = HANDLE + .with(|h| h.borrow_mut().take()) + .ok_or(errors::TracingError::DynamicHandle)?; + Ok(DynFilter(Some(handle), None)) } +/// Run tracing in a test that uses open telemetry to +/// send span contexts across process and thread boundaries. +// pub fn test_run_open() -> Result<(), errors::TracingError> { +// if std::env::var_os("RUST_LOG").is_none() { +// return Ok(()); +// } +// init_fmt(Output::OpenTel) +// } + /// Same as test_run but with timed spans pub fn test_run_timed() -> Result<(), errors::TracingError> { if std::env::var_os("RUST_LOG").is_none() { @@ -235,31 +284,22 @@ pub fn init_fmt(output: Output) -> Result<(), errors::TracingError> { }) .ok(); } - let fm: fn( - ctx: &FmtContext<'_, _, _>, - &mut dyn std::fmt::Write, - &Event<'_>, - ) -> std::fmt::Result = format_event; - let fm_flame: fn( - ctx: &FmtContext<'_, _, _>, - &mut dyn std::fmt::Write, - &Event<'_>, - ) -> std::fmt::Result = format_event_flame; - let fm_ice: fn( - ctx: &FmtContext<'_, _, _>, - &mut dyn std::fmt::Write, - &Event<'_>, - ) -> std::fmt::Result = format_event_ice; + let fm: fn(ctx: &FmtContext<'_, _, _>, Writer<'_>, &Event<'_>) -> std::fmt::Result = + format_event; + let fm_flame: fn(ctx: &FmtContext<'_, _, _>, Writer<'_>, &Event<'_>) -> std::fmt::Result = + format_event_flame; + let fm_ice: fn(ctx: &FmtContext<'_, _, _>, Writer<'_>, &Event<'_>) -> std::fmt::Result = + format_event_ice; let subscriber = FmtSubscriber::builder() - .with_writer(std::io::stderr) + .with_writer(std::io::stderr as fn() -> Stderr) .with_target(true); match output { Output::Json => { let subscriber = subscriber .with_env_filter(filter) - .with_timer(ChronoUtc::rfc3339()) + .with_timer(UtcTime::rfc_3339()) .json() .event_format(fm); finish(subscriber.finish()) @@ -268,7 +308,7 @@ pub fn init_fmt(output: Output) -> Result<(), errors::TracingError> { let subscriber = subscriber .with_span_events(FmtSpan::CLOSE) .with_env_filter(filter) - .with_timer(ChronoUtc::rfc3339()) + .with_timer(UtcTime::rfc_3339()) .json() .event_format(fm); finish(subscriber.finish()) @@ -282,7 +322,7 @@ pub fn init_fmt(output: Output) -> Result<(), errors::TracingError> { let subscriber = subscriber .with_span_events(FmtSpan::CLOSE) .with_env_filter(filter) - .with_timer(ChronoUtc::rfc3339()) + .with_timer(UtcTime::rfc_3339()) .event_format(fm_flame); finish(subscriber.finish()) } @@ -290,7 +330,7 @@ pub fn init_fmt(output: Output) -> Result<(), errors::TracingError> { let subscriber = subscriber .with_span_events(FmtSpan::CLOSE) .with_env_filter(filter) - .with_timer(ChronoUtc::rfc3339()) + .with_timer(UtcTime::rfc_3339()) .event_format(fm_ice); finish(subscriber.finish()) } @@ -298,29 +338,37 @@ pub fn init_fmt(output: Output) -> Result<(), errors::TracingError> { let subscriber = subscriber.compact(); finish(subscriber.with_env_filter(filter).finish()) } - Output::OpenTel => { - #[cfg(feature = "opentelemetry-on")] - { - use open::OPEN_ON; - use opentelemetry::api::Provider; - OPEN_ON.store(true, std::sync::atomic::Ordering::SeqCst); - use tracing_subscriber::prelude::*; - open::init(); - let tracer = opentelemetry::sdk::Provider::default().get_tracer("component_name"); - let telemetry = tracing_opentelemetry::layer().with_tracer(tracer); - finish( - subscriber - .with_env_filter(filter) - .finish() - .with(telemetry) - .with(open::OpenLayer), - ) - } - #[cfg(not(feature = "opentelemetry-on"))] - { - Ok(()) - } + Output::Dynamic => { + let subscriber = subscriber.with_env_filter(filter).with_filter_reloading(); + let handle = subscriber.reload_handle(); + HANDLE.with(|f| { + *f.borrow_mut() = Some(handle); + }); + finish(subscriber.finish()) } + // Output::OpenTel => { + // #[cfg(feature = "opentelemetry-on")] + // { + // use open::OPEN_ON; + // use opentelemetry::api::Provider; + // OPEN_ON.store(true, std::sync::atomic::Ordering::SeqCst); + // use tracing_subscriber::prelude::*; + // open::init(); + // let tracer = opentelemetry::sdk::Provider::default().get_tracer("component_name"); + // let telemetry = tracing_opentelemetry::layer().with_tracer(tracer); + // finish( + // subscriber + // .with_env_filter(filter) + // .finish() + // .with(telemetry) + // .with(open::OpenLayer), + // ) + // } + // #[cfg(not(feature = "opentelemetry-on"))] + // { + // Ok(()) + // } + // } Output::None => Ok(()), } } @@ -349,6 +397,8 @@ pub mod errors { SetGlobal(#[from] tracing::subscriber::SetGlobalDefaultError), #[error("Failed to setup tracing flame")] TracingFlame, + #[error("Failed to get the dynamic handle")] + DynamicHandle, #[error(transparent)] BadDirective(#[from] tracing_subscriber::filter::ParseError), } diff --git a/src/open.rs b/src/open.rs index 2c72d31..909f64a 100644 --- a/src/open.rs +++ b/src/open.rs @@ -404,7 +404,7 @@ mod on { where S: Subscriber + for<'span> LookupSpan<'span>, { - fn new_span( + fn on_new_span( &self, attrs: &Attributes<'_>, id: &tracing::span::Id, diff --git a/tests/dynamic.rs b/tests/dynamic.rs new file mode 100644 index 0000000..8562b39 --- /dev/null +++ b/tests/dynamic.rs @@ -0,0 +1,14 @@ +use tracing::*; + +#[test] +fn reload_filter() { + let mut handle = observability::dyn_test_run().unwrap(); + + let span = debug_span!("span"); + span.in_scope(|| debug!("test")); + + handle.reload("debug").unwrap(); + + let span = debug_span!("span"); + span.in_scope(|| debug!("test")); +} diff --git a/tests/open_telemetry.rs b/tests/open_telemetry.rs index 20b1bb6..e9cb685 100644 --- a/tests/open_telemetry.rs +++ b/tests/open_telemetry.rs @@ -1,81 +1,81 @@ -use observability::{span_context, Context, OpenSpanExt}; -use tokio::sync::mpsc; -use tracing::*; +// use observability::{span_context, Context, OpenSpanExt}; +// use tokio::sync::mpsc; +// use tracing::*; -#[tokio::test(threaded_scheduler)] -async fn same_thread_test() { - observability::test_run_open().ok(); - let span = debug_span!("span a"); - let context = span.get_context(); - let _g = span.enter(); +// #[tokio::test(threaded_scheduler)] +// async fn same_thread_test() { +// observability::test_run_open().ok(); +// let span = debug_span!("span a"); +// let context = span.get_context(); +// let _g = span.enter(); - span_context!(span, Level::DEBUG); - debug!(msg = "in span a"); +// span_context!(span, Level::DEBUG); +// debug!(msg = "in span a"); - let span = debug_span!("span b"); - let _g = span.enter(); - debug!("in span b"); - span_context!(span, Level::DEBUG); +// let span = debug_span!("span b"); +// let _g = span.enter(); +// debug!("in span b"); +// span_context!(span, Level::DEBUG); - let span = debug_span!("span c"); - span.set_context(context); - span_context!(span, Level::DEBUG); - let _g = span.enter(); - debug!("in span c"); -} +// let span = debug_span!("span c"); +// span.set_context(context); +// span_context!(span, Level::DEBUG); +// let _g = span.enter(); +// debug!("in span c"); +// } -#[tokio::test(threaded_scheduler)] -async fn cross_thread_test() { - observability::test_run_open().ok(); - let (mut tx1, rx1) = mpsc::channel(100); - let (tx2, mut rx2) = mpsc::channel(100); - tokio::task::spawn(across_thread(rx1, tx2)); - { - let span = debug_span!("from original thread"); - let context = span.get_context(); - let _g = span.enter(); - span_context!(span, Level::DEBUG); - tx1.send(context).await.unwrap(); - } - { - let context = rx2.recv().await.unwrap(); +// #[tokio::test(threaded_scheduler)] +// async fn cross_thread_test() { +// observability::test_run_open().ok(); +// let (mut tx1, rx1) = mpsc::channel(100); +// let (tx2, mut rx2) = mpsc::channel(100); +// tokio::task::spawn(across_thread(rx1, tx2)); +// { +// let span = debug_span!("from original thread"); +// let context = span.get_context(); +// let _g = span.enter(); +// span_context!(span, Level::DEBUG); +// tx1.send(context).await.unwrap(); +// } +// { +// let context = rx2.recv().await.unwrap(); - let span = debug_span!("original thread"); - span.set_context(context); - span_context!(span, Level::DEBUG); - let _g = span.enter(); - let span = debug_span!("inner"); - let _g = span.enter(); - span_context!(span, Level::DEBUG); - } - { - let context = rx2.recv().await.unwrap(); +// let span = debug_span!("original thread"); +// span.set_context(context); +// span_context!(span, Level::DEBUG); +// let _g = span.enter(); +// let span = debug_span!("inner"); +// let _g = span.enter(); +// span_context!(span, Level::DEBUG); +// } +// { +// let context = rx2.recv().await.unwrap(); - let span = debug_span!("original thread"); - span.set_context(context); - span_context!(span, Level::DEBUG); - let _g = span.enter(); - } -} +// let span = debug_span!("original thread"); +// span.set_context(context); +// span_context!(span, Level::DEBUG); +// let _g = span.enter(); +// } +// } -async fn across_thread(mut rx: mpsc::Receiver, mut tx: mpsc::Sender) { - { - let context = rx.recv().await.unwrap(); - let span = debug_span!("across thread"); - span.set_context(context); - span_context!(span, Level::DEBUG); - let _g = span.enter(); - let span = debug_span!("inner"); - let _g = span.enter(); - span_context!(span, Level::DEBUG); - tx.send(span.get_context()).await.unwrap(); - } - tokio::time::delay_for(std::time::Duration::from_millis(100)).await; - { - let span = debug_span!("from another thread"); - let context = span.get_context(); - let _g = span.enter(); - span_context!(span, Level::DEBUG); - tx.send(context).await.unwrap(); - } -} +// async fn across_thread(mut rx: mpsc::Receiver, mut tx: mpsc::Sender) { +// { +// let context = rx.recv().await.unwrap(); +// let span = debug_span!("across thread"); +// span.set_context(context); +// span_context!(span, Level::DEBUG); +// let _g = span.enter(); +// let span = debug_span!("inner"); +// let _g = span.enter(); +// span_context!(span, Level::DEBUG); +// tx.send(span.get_context()).await.unwrap(); +// } +// tokio::time::delay_for(std::time::Duration::from_millis(100)).await; +// { +// let span = debug_span!("from another thread"); +// let context = span.get_context(); +// let _g = span.enter(); +// span_context!(span, Level::DEBUG); +// tx.send(context).await.unwrap(); +// } +// }