Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .my-shell.nix
14 changes: 7 additions & 7 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ readme = "README.md"
license-file = "LICENSE"

[features]
default = ["opentelemetry-on"]
default = []
# Allows across thread and process tracing
opentelemetry-on = ["opentelemetry", "tracing-opentelemetry", "holochain_serialized_bytes", "serde", "serde_bytes"]
channels = ["tokio", "shrinkwraprs"]
Expand All @@ -19,13 +19,13 @@ derive_more = "0.99.3"
inferno = "0.10.0"
serde_json = { version = "1.0.51", features = [ "preserve_order" ] }
thiserror = "1.0.22"
tracing = "0.1.21"
tracing-core = "0.1.17"
tracing-serde = "0.1.2"
tracing-subscriber = "0.2.15"
tracing = "0.1.29"
tracing-core = "0.1.21"
tracing-serde = "0.1"
tracing-subscriber = { version = "0.3", features = ["default", "env-filter", "std", "fmt", "time", "json"] }

opentelemetry = { version = "0.8", default-features = false, features = ["trace", "serialize"], optional = true }
tracing-opentelemetry = { version = "0.8.0", optional = true }
opentelemetry = { version = "0.16", default-features = false, features = ["trace", "serialize"], optional = true }
tracing-opentelemetry = { version = "0.15.0", optional = true }
holochain_serialized_bytes = {version = "0.0", optional = true }
serde = { version = "1", optional = true }
serde_bytes = { version = "0.11", optional = true }
Expand Down
150 changes: 76 additions & 74 deletions examples/channels.rs
Original file line number Diff line number Diff line change
@@ -1,83 +1,85 @@
use std::error::Error;
// use std::error::Error;

use observability::{span_context, MsgWrap};
use tokio::sync::mpsc;
use tracing::*;
// use observability::{span_context, MsgWrap};
// use tokio::sync::mpsc;
// use tracing::*;

#[derive(Debug)]
struct Foo;
// #[derive(Debug)]
// struct Foo;

struct MyChannel {
rx: mpsc::Receiver<MsgWrap<Foo>>,
tx: mpsc::Sender<MsgWrap<Foo>>,
}
// struct MyChannel {
// rx: mpsc::Receiver<MsgWrap<Foo>>,
// tx: mpsc::Sender<MsgWrap<Foo>>,
// }

impl MyChannel {
fn new(tx: mpsc::Sender<MsgWrap<Foo>>, rx: mpsc::Receiver<MsgWrap<Foo>>) -> Self {
Self { rx, tx }
}
}
// impl MyChannel {
// fn new(tx: mpsc::Sender<MsgWrap<Foo>>, rx: mpsc::Receiver<MsgWrap<Foo>>) -> Self {
// Self { rx, tx }
// }
// }

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
observability::test_run_open().ok();
span_context!();
span_context!(current, Level::DEBUG);
let (tx1, rx2) = mpsc::channel(10);
let (tx2, rx1) = mpsc::channel(10);
let c1 = MyChannel::new(tx1.clone(), rx1);
let c2 = MyChannel::new(tx2, rx2);
let (tx4, rx4) = mpsc::channel(10);
let (_, dead) = mpsc::channel(1);
let c3 = MyChannel::new(tx1, rx4);
let c4 = MyChannel::new(tx4, dead);
let mut jh = Vec::new();
jh.push(tokio::task::spawn(async { a(c1).await.unwrap() }));
jh.push(tokio::task::spawn(async { b(c2, c4).await.unwrap() }));
jh.push(tokio::task::spawn(async { c(c3).await.unwrap() }));
// #[tokio::main]
// async fn main() -> Result<(), Box<dyn Error>> {
// observability::test_run_open().ok();
// span_context!();
// span_context!(current, Level::DEBUG);
// let (tx1, rx2) = mpsc::channel(10);
// let (tx2, rx1) = mpsc::channel(10);
// let c1 = MyChannel::new(tx1.clone(), rx1);
// let c2 = MyChannel::new(tx2, rx2);
// let (tx4, rx4) = mpsc::channel(10);
// let (_, dead) = mpsc::channel(1);
// let c3 = MyChannel::new(tx1, rx4);
// let c4 = MyChannel::new(tx4, dead);
// let mut jh = Vec::new();
// jh.push(tokio::task::spawn(async { a(c1).await.unwrap() }));
// jh.push(tokio::task::spawn(async { b(c2, c4).await.unwrap() }));
// jh.push(tokio::task::spawn(async { c(c3).await.unwrap() }));

for h in jh {
h.await?;
}
Ok(())
}
// for h in jh {
// h.await?;
// }
// Ok(())
// }

#[instrument(skip(channel))]
async fn a(mut channel: MyChannel) -> Result<(), Box<dyn Error>> {
for _ in 0..10 {
span_context!(Span::current());
channel.tx.send(Foo.into()).await?;
if let Some(r) = channel.rx.recv().await {
r.inner();
}
}
tokio::time::delay_for(std::time::Duration::from_millis(500)).await;
Ok(())
}
// #[instrument(skip(channel))]
// async fn a(mut channel: MyChannel) -> Result<(), Box<dyn Error>> {
// for _ in 0..10 {
// span_context!(Span::current());
// channel.tx.send(Foo.into()).await?;
// if let Some(r) = channel.rx.recv().await {
// r.inner();
// }
// }
// tokio::time::delay_for(std::time::Duration::from_millis(500)).await;
// Ok(())
// }

#[instrument(skip(channel, to_c))]
async fn b(mut channel: MyChannel, mut to_c: MyChannel) -> Result<(), Box<dyn Error>> {
for _ in 0..10 {
span_context!(Span::current());
if let Some(r) = channel.rx.recv().await {
r.inner();
}
channel.tx.send(Foo.into()).await?;
to_c.tx.send(Foo.into()).await?;
}
tokio::time::delay_for(std::time::Duration::from_millis(500)).await;
Ok(())
}
// #[instrument(skip(channel, to_c))]
// async fn b(mut channel: MyChannel, mut to_c: MyChannel) -> Result<(), Box<dyn Error>> {
// for _ in 0..10 {
// span_context!(Span::current());
// if let Some(r) = channel.rx.recv().await {
// r.inner();
// }
// channel.tx.send(Foo.into()).await?;
// to_c.tx.send(Foo.into()).await?;
// }
// tokio::time::delay_for(std::time::Duration::from_millis(500)).await;
// Ok(())
// }

#[instrument(skip(from_b_to_a))]
async fn c(mut from_b_to_a: MyChannel) -> Result<(), Box<dyn Error>> {
for _ in 0..10 {
span_context!(Span::current());
if let Some(r) = from_b_to_a.rx.recv().await {
r.inner();
}
from_b_to_a.tx.send(Foo.into()).await?;
}
tokio::time::delay_for(std::time::Duration::from_millis(500)).await;
Ok(())
}
// #[instrument(skip(from_b_to_a))]
// async fn c(mut from_b_to_a: MyChannel) -> Result<(), Box<dyn Error>> {
// for _ in 0..10 {
// span_context!(Span::current());
// if let Some(r) = from_b_to_a.rx.recv().await {
// r.inner();
// }
// from_b_to_a.tx.send(Foo.into()).await?;
// }
// tokio::time::delay_for(std::time::Duration::from_millis(500)).await;
// Ok(())
// }

fn main() {}
100 changes: 51 additions & 49 deletions examples/socket_client.rs
Original file line number Diff line number Diff line change
@@ -1,54 +1,56 @@
use observability::{span_context, OpenSpanExt};
use std::{env, error::Error, net::SocketAddr};
use tokio::net::UdpSocket;
use tracing::*;
// use observability::{span_context, OpenSpanExt};
// use std::{env, error::Error, net::SocketAddr};
// use tokio::net::UdpSocket;
// use tracing::*;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
observability::test_run_open().ok();
let remote_addr: SocketAddr = env::args()
.nth(1)
.unwrap_or_else(|| "127.0.0.1:8080".into())
.parse()?;
let local_addr: SocketAddr = if remote_addr.is_ipv4() {
"0.0.0.0:0"
} else {
"[::]:0"
}
.parse()?;
// #[tokio::main]
// async fn main() -> Result<(), Box<dyn Error>> {
// observability::test_run_open().ok();
// let remote_addr: SocketAddr = env::args()
// .nth(1)
// .unwrap_or_else(|| "127.0.0.1:8080".into())
// .parse()?;
// let local_addr: SocketAddr = if remote_addr.is_ipv4() {
// "0.0.0.0:0"
// } else {
// "[::]:0"
// }
// .parse()?;

let mut socket = UdpSocket::bind(local_addr).await?;
const MAX_DATAGRAM_SIZE: usize = 65_507;
socket.connect(&remote_addr).await?;
{
let span = debug_span!("client send");
let _g = span.enter();
span_context!(span, Level::DEBUG);
let data = span.get_context_bytes();
// let mut socket = UdpSocket::bind(local_addr).await?;
// const MAX_DATAGRAM_SIZE: usize = 65_507;
// socket.connect(&remote_addr).await?;
// {
// let span = debug_span!("client send");
// let _g = span.enter();
// span_context!(span, Level::DEBUG);
// let data = span.get_context_bytes();

socket.send(data.as_ref()).await?;
}
{
let mut data = vec![0u8; MAX_DATAGRAM_SIZE];
let len = socket.recv(&mut data).await?;
let data = data[..len].to_vec();
let span = debug_span!("client recv");
let _g = span.enter();
span.set_from_bytes(data);
span_context!(span, Level::DEBUG);
// socket.send(data.as_ref()).await?;
// }
// {
// let mut data = vec![0u8; MAX_DATAGRAM_SIZE];
// let len = socket.recv(&mut data).await?;
// let data = data[..len].to_vec();
// let span = debug_span!("client recv");
// let _g = span.enter();
// span.set_from_bytes(data);
// span_context!(span, Level::DEBUG);

let data = span.get_context_bytes();
// let data = span.get_context_bytes();

socket.send(data.as_ref()).await?;
}
{
let mut data = vec![0u8; MAX_DATAGRAM_SIZE];
let len = socket.recv(&mut data).await?;
let data = data[..len].to_vec();
let span = debug_span!("client recv 2");
let _g = span.enter();
span.set_from_bytes(data);
span_context!(span, Level::DEBUG);
}
Ok(())
}
// socket.send(data.as_ref()).await?;
// }
// {
// let mut data = vec![0u8; MAX_DATAGRAM_SIZE];
// let len = socket.recv(&mut data).await?;
// let data = data[..len].to_vec();
// let span = debug_span!("client recv 2");
// let _g = span.enter();
// span.set_from_bytes(data);
// span_context!(span, Level::DEBUG);
// }
// Ok(())
// }

fn main() {}
86 changes: 44 additions & 42 deletions examples/socket_server.rs
Original file line number Diff line number Diff line change
@@ -1,48 +1,50 @@
use observability::{span_context, OpenSpanExt};
use std::{env, error::Error};
use tokio::net::UdpSocket;
use tracing::*;
// use observability::{span_context, OpenSpanExt};
// use std::{env, error::Error};
// use tokio::net::UdpSocket;
// use tracing::*;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
observability::test_run_open().ok();
let addr = env::args()
.nth(1)
.unwrap_or_else(|| "127.0.0.1:8080".to_string());
// #[tokio::main]
// async fn main() -> Result<(), Box<dyn Error>> {
// observability::test_run_open().ok();
// let addr = env::args()
// .nth(1)
// .unwrap_or_else(|| "127.0.0.1:8080".to_string());

let mut socket = UdpSocket::bind(&addr).await?;
println!("Listening on: {}", socket.local_addr()?);
// let mut socket = UdpSocket::bind(&addr).await?;
// println!("Listening on: {}", socket.local_addr()?);

{
let mut buf = vec![0; 1024];
let (size, peer) = socket.recv_from(&mut buf).await?;
// {
// let mut buf = vec![0; 1024];
// let (size, peer) = socket.recv_from(&mut buf).await?;

let data = buf[..size].to_vec();
let span = debug_span!("server recv");
span.set_from_bytes(data);
let _g = span.enter();
span_context!(span, Level::DEBUG);
let span = debug_span!("inner 1");
let _g = span.enter();
span_context!(span, Level::DEBUG);
let span = debug_span!("inner 2");
let _g = span.enter();
span_context!(span, Level::DEBUG);
let data = span.get_context_bytes();
let _amt = socket.send_to(&data[..], &peer).await?;
}
// let data = buf[..size].to_vec();
// let span = debug_span!("server recv");
// span.set_from_bytes(data);
// let _g = span.enter();
// span_context!(span, Level::DEBUG);
// let span = debug_span!("inner 1");
// let _g = span.enter();
// span_context!(span, Level::DEBUG);
// let span = debug_span!("inner 2");
// let _g = span.enter();
// span_context!(span, Level::DEBUG);
// let data = span.get_context_bytes();
// let _amt = socket.send_to(&data[..], &peer).await?;
// }

{
let mut buf = vec![0; 1024];
let (size, peer) = socket.recv_from(&mut buf).await?;
// {
// let mut buf = vec![0; 1024];
// let (size, peer) = socket.recv_from(&mut buf).await?;

let data = buf[..size].to_vec();
let span = debug_span!("server recv 2");
span.set_from_bytes(data);
let _g = span.enter();
span_context!(span, Level::DEBUG);
let data = span.get_context_bytes();
let _amt = socket.send_to(&data[..], &peer).await?;
}
Ok(())
}
// let data = buf[..size].to_vec();
// let span = debug_span!("server recv 2");
// span.set_from_bytes(data);
// let _g = span.enter();
// span_context!(span, Level::DEBUG);
// let data = span.get_context_bytes();
// let _amt = socket.send_to(&data[..], &peer).await?;
// }
// Ok(())
// }

fn main() {}
Loading