@@ -3,6 +3,7 @@ use std::path::PathBuf;
33
44use anyhow:: { Context , Result } ;
55use clap:: { Parser , Subcommand } ;
6+ use tokio:: runtime:: Runtime ;
67use tycho_core:: global_config:: GlobalConfig ;
78use tycho_util:: cli:: logger:: { init_logger, set_abort_with_tracing} ;
89use tycho_util:: cli:: { resolve_public_ip, signal} ;
@@ -75,27 +76,24 @@ impl CmdRun {
7576 . stack_size ( 8 * 1024 * 1024 )
7677 . thread_name ( |_| "rayon_worker" . to_string ( ) )
7778 . num_threads ( node_config. threads . rayon_threads )
78- . build_global ( )
79- . unwrap ( ) ;
80-
81- tokio:: runtime:: Builder :: new_multi_thread ( )
82- . enable_all ( )
83- . worker_threads ( node_config. threads . tokio_workers )
84- . build ( ) ?
85- . block_on ( async move {
86- let run_fut = tokio:: spawn ( self . run_impl ( args, node_config) ) ;
87- let stop_fut = signal:: any_signal ( signal:: TERMINATION_SIGNALS ) ;
88- tokio:: select! {
89- res = run_fut => res. unwrap( ) ,
90- signal = stop_fut => match signal {
91- Ok ( signal) => {
92- tracing:: info!( ?signal, "received termination signal" ) ;
93- Ok ( ( ) )
94- }
95- Err ( e) => Err ( e. into( ) ) ,
79+ . build_global ( ) ?;
80+
81+ let rt = build_tokio_runtime ( & node_config) ?;
82+
83+ rt. block_on ( async move {
84+ let run_fut = tokio:: spawn ( self . run_impl ( args, node_config) ) ;
85+ let stop_fut = signal:: any_signal ( signal:: TERMINATION_SIGNALS ) ;
86+ tokio:: select! {
87+ res = run_fut => res. unwrap( ) ,
88+ signal = stop_fut => match signal {
89+ Ok ( signal) => {
90+ tracing:: info!( ?signal, "received termination signal" ) ;
91+ Ok ( ( ) )
9692 }
93+ Err ( e) => Err ( e. into( ) ) ,
9794 }
98- } )
95+ }
96+ } )
9997 }
10098
10199 async fn run_impl ( self , args : BaseArgs , node_config : NodeConfig ) -> Result < ( ) > {
@@ -157,6 +155,146 @@ impl CmdRun {
157155 }
158156}
159157
158+ fn build_tokio_runtime ( node_config : & NodeConfig ) -> Result < Runtime > {
159+ #[ cfg( all( feature = "tokio-metrics" , tokio_unstable) ) ]
160+ use std:: time:: Duration ;
161+
162+ #[ cfg( all( feature = "tokio-metrics" , tokio_unstable) ) ]
163+ use tokio:: runtime:: { HistogramConfiguration , LogHistogram } ;
164+
165+ let mut rt = tokio:: runtime:: Builder :: new_multi_thread ( ) ;
166+
167+ let num_workers = node_config. threads . tokio_workers ;
168+ rt. enable_all ( ) . worker_threads ( num_workers) ;
169+
170+ #[ cfg( all( feature = "tokio-metrics" , tokio_unstable) ) ]
171+ let hist_params = LogHistogram :: builder ( )
172+ . min_value ( Duration :: from_micros ( 100 ) )
173+ . max_value ( Duration :: from_secs_f64 ( 3.2 ) )
174+ . max_buckets ( NUM_BUCKETS ) ?;
175+
176+ #[ cfg( all( feature = "tokio-metrics" , tokio_unstable) ) ]
177+ const NUM_BUCKETS : usize = 16 ;
178+ #[ cfg( all( feature = "tokio-metrics" , tokio_unstable) ) ]
179+ {
180+ rt. enable_metrics_poll_time_histogram ( )
181+ . metrics_poll_time_histogram_configuration ( HistogramConfiguration :: log ( hist_params) ) ;
182+ }
183+
184+ let rt = rt. build ( ) ?;
185+
186+ #[ cfg( all( feature = "tokio-metrics" , tokio_unstable) ) ]
187+ rt. spawn ( async move {
188+ const fn fill_log_buckets ( ) -> [ f64 ; NUM_BUCKETS ] {
189+ let mut boundaries = [ 0.0 ; NUM_BUCKETS ] ;
190+ let mut i = 0 ;
191+ let mut current_micros = 100.0 ;
192+
193+ while i < NUM_BUCKETS {
194+ boundaries[ i] = current_micros / 1_000_000.0 ;
195+ current_micros *= 2.0 ;
196+ i += 1 ;
197+ }
198+
199+ boundaries
200+ }
201+ const LOG_BUCKETS : [ f64 ; NUM_BUCKETS ] = fill_log_buckets ( ) ;
202+
203+
204+ // we can use histogram when https://github.com/metrics-rs/metrics/issues/509 is resolved
205+ // otherwise it will burn CPU and memory
206+ let handle = tokio:: runtime:: Handle :: current ( ) ;
207+ let runtime_monitor = tokio_metrics:: RuntimeMonitor :: new ( & handle) ;
208+
209+ const METRIC_NAME : & str = "tycho_tokio_poll_count_time_bucket" ;
210+ const METRIC_SUM : & str = "tycho_tokio_poll_count_time_sum" ;
211+ const METRIC_COUNT : & str = "tycho_tokio_poll_count_time_count" ;
212+
213+ for interval in runtime_monitor. intervals ( ) {
214+ let histogram = interval. poll_count_histogram ;
215+
216+ let mut cumulative_count = 0 ;
217+ let mut sum = 0.0 ;
218+
219+ // poll time histogram via gauages
220+ for ( idx, value) in histogram. iter ( ) . enumerate ( ) {
221+ let bucket = LOG_BUCKETS [ idx] ;
222+ cumulative_count += * value;
223+ let le = format ! ( "{:.6}" , bucket) ;
224+ metrics:: gauge!( METRIC_NAME , "le" => le) . set ( cumulative_count as f64 ) ;
225+ sum += bucket * ( * value as f64 ) ;
226+ }
227+ // Add sum and count
228+ metrics:: gauge!( METRIC_SUM ) . set ( sum) ;
229+ metrics:: gauge!( METRIC_COUNT ) . set ( cumulative_count as f64 ) ;
230+ // Add +Inf bucket
231+ metrics:: gauge!( METRIC_NAME , "le" => "+Inf" ) . set ( cumulative_count as f64 ) ;
232+
233+ let mean_poll_time = interval. mean_poll_duration . as_secs_f64 ( ) ;
234+ metrics:: gauge!( "tycho_tokio_mean_poll_time" ) . set ( mean_poll_time) ;
235+
236+ let max_poll_time = interval. mean_poll_duration_worker_max . as_secs_f64 ( ) ;
237+ metrics:: gauge!( "tycho_tokio_max_poll_time" ) . set ( max_poll_time) ;
238+
239+ let metrics = handle. metrics ( ) ;
240+ metrics:: gauge!( "tycho_tokio_num_alive_tasks" ) . set ( metrics. num_alive_tasks ( ) as f64 ) ;
241+
242+ let global_queue_depth = metrics. global_queue_depth ( ) ;
243+ metrics:: gauge!( "tycho_tokio_global_queue_depth" ) . set ( global_queue_depth as f64 ) ;
244+
245+ let num_blocking_threads = metrics. num_blocking_threads ( ) ;
246+ metrics:: gauge!( "tycho_tokio_num_blocking_threads" ) . set ( num_blocking_threads as f64 ) ;
247+
248+ let spawned_tasks = metrics. spawned_tasks_count ( ) ;
249+ metrics:: gauge!( "tycho_tokio_spawned_tasks_count" ) . set ( spawned_tasks as f64 ) ;
250+
251+
252+ metrics:: gauge!( "tycho_tokio_num_idle_blocking_threads" )
253+ . set ( metrics. num_idle_blocking_threads ( ) as f64 ) ;
254+
255+ metrics:: gauge!( "tycho_tokio_injection_queue_depth" )
256+ . set ( metrics. global_queue_depth ( ) as f64 ) ;
257+
258+ let blocking_queue_length = metrics. blocking_queue_depth ( ) ;
259+ metrics:: gauge!( "tycho_tokio_blocking_queue_depth" ) . set ( blocking_queue_length as f64 ) ;
260+
261+ for worker_id in 0 ..num_workers {
262+ let park_count = metrics. worker_park_count ( worker_id) ;
263+ metrics:: gauge!( "tycho_tokio_worker_park_count" , "worker_id" => format!( "{worker_id}" ) ) . set ( park_count as f64 ) ;
264+
265+ let worker_noop_count = metrics. worker_noop_count ( worker_id) ;
266+ metrics:: gauge!( "tycho_tokio_worker_noop_count" , "worker_id" => format!( "{worker_id}" ) ) . set ( worker_noop_count as f64 ) ;
267+
268+ let worker_steal_count = metrics. worker_steal_count ( worker_id) ;
269+ metrics:: gauge!( "tycho_tokio_worker_steal_count" , "worker_id" => format!( "{worker_id}" ) ) . set ( worker_steal_count as f64 ) ;
270+
271+ let worker_steal_operations = metrics. worker_steal_operations ( worker_id) ;
272+ metrics:: gauge!( "tycho_tokio_worker_steal_operations" , "worker_id" => format!( "{worker_id}" ) ) . set ( worker_steal_operations as f64 ) ;
273+
274+ let worker_local_queue_depth = metrics. worker_local_queue_depth ( worker_id) ;
275+ metrics:: gauge!( "tycho_tokio_worker_local_queue_depth" , "worker_id" => format!( "{worker_id}" ) ) . set ( worker_local_queue_depth as f64 ) ;
276+
277+ let worker_mean_poll_time = metrics. worker_mean_poll_time ( worker_id) . as_secs_f64 ( ) ;
278+ metrics:: gauge!( "tycho_tokio_worker_mean_poll_time" , "worker_id" => format!( "{worker_id}" ) ) . set ( worker_mean_poll_time) ;
279+
280+ let worker_busy_time = metrics. worker_total_busy_duration ( worker_id) . as_secs_f64 ( ) ;
281+ metrics:: gauge!( "tycho_tokio_worker_busy_time" , "worker_id" => format!( "{worker_id}" ) ) . set ( worker_busy_time) ;
282+ }
283+ metrics:: gauge!( "tycho_tokio_io_driver_fd_registered_count" ) . set ( metrics. io_driver_fd_registered_count ( ) as f64 ) ;
284+ metrics:: gauge!( "tycho_tokio_io_driver_fd_deregistered_count" ) . set ( metrics. io_driver_fd_deregistered_count ( ) as f64 ) ;
285+
286+ metrics:: gauge!( "tycho_tokio_remote_schedule_count" ) . set ( metrics. remote_schedule_count ( ) as f64 ) ;
287+
288+ metrics:: gauge!( "tycho_tokio_budget_forced_yield_count" ) . set ( metrics. budget_forced_yield_count ( ) as f64 ) ;
289+
290+
291+ tokio:: time:: sleep ( Duration :: from_millis ( 5000 ) ) . await ;
292+ }
293+ } ) ;
294+
295+ Ok ( rt)
296+ }
297+
160298fn init_metrics ( config : & MetricsConfig ) -> Result < ( ) > {
161299 use metrics_exporter_prometheus:: Matcher ;
162300 const EXPONENTIAL_SECONDS : & [ f64 ] = & [
0 commit comments