diff --git a/balius-runtime/src/lib.rs b/balius-runtime/src/lib.rs index 4190ad1..bcbd089 100644 --- a/balius-runtime/src/lib.rs +++ b/balius-runtime/src/lib.rs @@ -5,7 +5,7 @@ use ledgers::LedgerHost; use logging::LoggerHost; use router::Router; use sign::SignerHost; -use std::{collections::HashMap, io::Read, path::Path, sync::Arc}; +use std::{collections::HashMap, io::Read, path::Path, sync::Arc, time::Instant}; use thiserror::Error; use tokio::sync::{Mutex, RwLock}; use tracing::{debug, info, warn}; @@ -574,7 +574,8 @@ impl Runtime { let cursor = self.store.get_worker_cursor(id)?; debug!(cursor, id, "found cursor for worker"); - self.loaded.write().await.insert( + let mut loaded = self.loaded.write().await; + loaded.insert( id.to_owned(), Mutex::new(LoadedWorker { wasm_store, @@ -584,6 +585,8 @@ impl Runtime { }), ); + self.metrics.workers_loaded(loaded.len() as u64); + Ok(()) } @@ -615,7 +618,12 @@ impl Runtime { } pub async fn remove_worker(&self, id: &str) -> Result<(), Error> { - match self.loaded.write().await.remove(id) { + let mut loaded = self.loaded.write().await; + let removed = loaded.remove(id); + + self.metrics.workers_loaded(loaded.len() as u64); + + match removed { Some(_) => { info!(worker = id, "Successfully removed worker from runtime.") } @@ -632,6 +640,7 @@ impl Runtime { undo_blocks: &Vec, next_block: &Block, ) -> Result<(), Error> { + let start = Instant::now(); info!("applying block"); let log_seq = self.store.write_ahead(undo_blocks, next_block)?; @@ -640,23 +649,35 @@ impl Runtime { let mut store_update = self.store.start_atomic_update(log_seq)?; - let update = async |worker: &Mutex| -> Result { + let update = async |worker: &Mutex| -> Result<(String, f64), Error> { + let worker_start = Instant::now(); let mut lock = worker.lock().await; lock.apply_chain(undo_blocks, next_block).await?; - Ok(lock.wasm_store.data().worker_id.clone()) + Ok(( + lock.wasm_store.data().worker_id.clone(), + worker_start.elapsed().as_secs_f64() * 1000.0, + )) }; let updates = workers.values().map(update).collect_vec(); join_all(updates) .await .into_iter() - .collect::, _>>()? + .collect::, _>>()? .iter() - .try_for_each(|x| store_update.update_worker_cursor(x))?; + .try_for_each(|(x, duration)| { + self.metrics.handle_worker_chain_duration_ms(x, *duration); + store_update.update_worker_cursor(x) + })?; store_update.commit()?; + self.metrics + .handle_chain_duration_ms(start.elapsed().as_secs_f64() * 1000.0); + self.metrics.latest_block_height(next_block.height()); + self.metrics.latest_block_slot(next_block.slot()); + Ok(()) } @@ -666,6 +687,7 @@ impl Runtime { method: &str, params: Vec, ) -> Result { + let start = Instant::now(); let workers = self.loaded.read().await; let mut worker = workers .get(worker_id) @@ -683,6 +705,9 @@ impl Runtime { let result = worker.dispatch_event(channel, &evt).await; self.metrics.request(worker_id, method, result.is_ok()); + let duration_ms = start.elapsed().as_secs_f64() * 1000.0; + self.metrics + .handle_request_duration_ms(worker_id, method, duration_ms); result } } @@ -818,8 +843,11 @@ impl RuntimeBuilder { http, } = this; + let metrics: Arc = Default::default(); + metrics.workers_loaded(0); + Ok(Runtime { - metrics: Default::default(), + metrics, loaded: Default::default(), engine, linker, diff --git a/balius-runtime/src/metrics.rs b/balius-runtime/src/metrics.rs index c148430..60b3549 100644 --- a/balius-runtime/src/metrics.rs +++ b/balius-runtime/src/metrics.rs @@ -1,4 +1,8 @@ -use opentelemetry::{global, metrics::Counter, KeyValue}; +use opentelemetry::{ + global, + metrics::{Counter, Gauge, Histogram}, + KeyValue, +}; use crate::{logging::level_to_string, wit::balius::app::logging::Level}; @@ -17,6 +21,12 @@ pub struct Metrics { ledger_read_utxos: Counter, ledger_search_utxos: Counter, ledger_read_params: Counter, + workers_loaded: Gauge, + handle_chain_duration_ms: Histogram, + handle_request_duration_ms: Histogram, + handle_worker_chain_duration_ms: Histogram, + latest_block_height: Gauge, + latest_block_slot: Gauge, } impl Metrics { @@ -88,6 +98,48 @@ impl Metrics { .with_description("Amount of calls to read_params on the ledger interface.") .build(); + let workers_loaded = meter + .u64_gauge("workers_loaded") + .with_description("Current amount of workers loaded into the runtime.") + .build(); + + let handle_chain_duration_ms = meter + .f64_histogram("handle_chain_duration_ms") + .with_description("Duration to process handle_chain in milliseconds.") + .with_unit("ms") + .with_boundaries(vec![ + 100.0, 250.0, 500.0, 1000.0, 2500.0, 5000.0, 10000.0, 25000.0, 60000.0, 1200000.0, + ]) + .build(); + + let handle_request_duration_ms = meter + .f64_histogram("handle_request_duration_ms") + .with_description("Duration to process handle_request in milliseconds.") + .with_unit("ms") + .with_boundaries(vec![ + 100.0, 250.0, 500.0, 1000.0, 2500.0, 5000.0, 10000.0, 25000.0, 60000.0, 1200000.0, + ]) + .build(); + + let handle_worker_chain_duration_ms = meter + .f64_histogram("handle_worker_chain_duration_ms") + .with_description("Duration for a worker to process apply_chain in milliseconds.") + .with_unit("ms") + .with_boundaries(vec![ + 100.0, 250.0, 500.0, 1000.0, 2500.0, 5000.0, 10000.0, 25000.0, 60000.0, 1200000.0, + ]) + .build(); + + let latest_block_height = meter + .u64_gauge("latest_block_height") + .with_description("Latest block height successfully processed by handle_chain.") + .build(); + + let latest_block_slot = meter + .u64_gauge("latest_block_slot") + .with_description("Latest block slot successfully processed by handle_chain.") + .build(); + Metrics { requests, kv_get, @@ -102,6 +154,12 @@ impl Metrics { ledger_read_utxos, ledger_search_utxos, ledger_read_params, + workers_loaded, + handle_chain_duration_ms, + handle_request_duration_ms, + handle_worker_chain_duration_ms, + latest_block_height, + latest_block_slot, } } @@ -180,6 +238,39 @@ impl Metrics { self.ledger_read_params .add(1, &[KeyValue::new("worker", worker_id.to_owned())]); } + + pub fn workers_loaded(&self, count: u64) { + self.workers_loaded.record(count, &[]); + } + + pub fn handle_chain_duration_ms(&self, duration_ms: f64) { + self.handle_chain_duration_ms.record(duration_ms, &[]); + } + + pub fn handle_request_duration_ms(&self, worker_id: &str, method: &str, duration_ms: f64) { + self.handle_request_duration_ms.record( + duration_ms, + &[ + KeyValue::new("worker", worker_id.to_owned()), + KeyValue::new("method", method.to_owned()), + ], + ); + } + + pub fn latest_block_height(&self, height: u64) { + self.latest_block_height.record(height, &[]); + } + + pub fn latest_block_slot(&self, slot: u64) { + self.latest_block_slot.record(slot, &[]); + } + + pub fn handle_worker_chain_duration_ms(&self, worker_id: &str, duration_ms: f64) { + self.handle_worker_chain_duration_ms.record( + duration_ms, + &[KeyValue::new("worker", worker_id.to_owned())], + ); + } } impl Default for Metrics { diff --git a/examples/asteria-tracker/src/lib.rs b/examples/asteria-tracker/src/lib.rs index 4cc7819..2067f82 100644 --- a/examples/asteria-tracker/src/lib.rs +++ b/examples/asteria-tracker/src/lib.rs @@ -87,15 +87,12 @@ fn handle_utxo(config: sdk::Config, utxo: sdk::Utxo) -> sdk::Work if let Some(datum) = utxo.utxo.datum { let p = datum.payload.unwrap().plutus_data.unwrap(); - match p { - plutus_data::PlutusData::Constr(x) => { - let mut f = x.fields.iter(); + if let plutus_data::PlutusData::Constr(x) = p { + let mut f = x.fields.iter(); - pos_x = integer_plutus_field(f.next()).unwrap(); - pos_y = integer_plutus_field(f.next()).unwrap(); - asset_name = hex::encode(string_plutus_field(f.next()).unwrap()); - } - _ => {} + pos_x = integer_plutus_field(f.next()).unwrap(); + pos_y = integer_plutus_field(f.next()).unwrap(); + asset_name = hex::encode(string_plutus_field(f.next()).unwrap()); } }