
Commit 3466421

Prevent shutdown from blocking broker operator (#40)
This change adds timeouts to the `Otel::shutdown()` logic. There is a bug where, when the server is not available, attempting to flush the otel logs causes the shutdown task to hang indefinitely, which stalls the broker operator.

Testing:
- Locally consuming this change in Azure-MQ no longer causes the broker operator pod/statefulset to get stuck in a non-ready state when it is installed with otel and gRPC arguments. Testing the same broker images off of `Otel-lib/main` reproduces the issue.
1 parent 665535f

1 file changed: src/lib.rs (57 additions & 8 deletions)
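The fix, visible in the first hunk below, wraps each blocking flush and shutdown call in `tokio::task::spawn_blocking` and bounds the wait with `tokio::time::timeout`. As a rough sketch of that pattern only, the repeated blocks could be factored into a helper like the hypothetical `run_with_timeout` below; this helper is not part of the commit:

```rust
use std::time::Duration;

/// Hypothetical helper (not in this commit): run a blocking call on
/// tokio's blocking pool and stop waiting for it after `timeout`, so an
/// unreachable collector cannot stall shutdown indefinitely.
async fn run_with_timeout<T, F>(label: &str, timeout: Duration, f: F) -> Option<T>
where
    F: FnOnce() -> T + Send + 'static,
    T: Send + 'static,
{
    match tokio::time::timeout(timeout, tokio::task::spawn_blocking(f)).await {
        // The deadline elapsed before the blocking task finished.
        Err(_elapsed) => {
            eprintln!("{label}: timed out after {timeout:?}");
            None
        }
        // The blocking task panicked or was aborted.
        Ok(Err(join_err)) => {
            eprintln!("{label}: task failed: {join_err:?}");
            None
        }
        // The blocking call returned within the deadline.
        Ok(Ok(value)) => Some(value),
    }
}
```

Note that `tokio::time::timeout` does not cancel a blocking task; it only stops waiting for it, leaving the task to finish (or keep hanging) on the blocking pool. During process shutdown, which is the situation this commit targets, that trade-off is acceptable.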
```diff
@@ -160,16 +160,61 @@ impl Otel {
         // Graceful shutdown that flushes any pending metrics and logs to the exporter.
         info!("shutting down otel component");
 
-        if let Err(metrics_error) = self.meter_provider.force_flush() {
-            warn!("encountered error while flushing metrics: {metrics_error:?}");
+        // Use a timeout for flush/shutdown operations to prevent hanging
+        // when the server is unavailable
+        let shutdown_timeout = Duration::from_secs(10);
+
+        let flush_result = tokio::time::timeout(
+            shutdown_timeout,
+            tokio::task::spawn_blocking({
+                let meter_provider = self.meter_provider.clone();
+                move || meter_provider.force_flush()
+            })
+        ).await;
+
+        match flush_result {
+            Err(_) => warn!("meter provider force_flush timed out"),
+            Ok(Err(e)) => warn!("meter provider force_flush task failed: {e:?}"),
+            Ok(Ok(Err(e))) => warn!("encountered error while flushing metrics: {e:?}"),
+            Ok(Ok(Ok(()))) => debug!("meter provider force_flush completed"),
         }
-        if let Err(metrics_error) = self.meter_provider.shutdown() {
-            warn!("encountered error while shutting down meter provider: {metrics_error:?}");
+
+        let shutdown_result = tokio::time::timeout(
+            shutdown_timeout,
+            tokio::task::spawn_blocking({
+                let meter_provider = self.meter_provider.clone();
+                move || meter_provider.shutdown()
+            })
+        ).await;
+
+        match shutdown_result {
+            Err(_) => warn!("meter provider shutdown timed out"),
+            Ok(Err(e)) => warn!("meter provider shutdown task failed: {e:?}"),
+            Ok(Ok(Err(e))) => warn!("encountered error while shutting down meter provider: {e:?}"),
+            Ok(Ok(Ok(()))) => debug!("meter provider shutdown completed"),
         }
 
         if let Some(logger_provider) = self.logger_provider.clone() {
-            logger_provider.force_flush();
-            let _ = logger_provider.shutdown();
+            let flush_result = tokio::time::timeout(
+                shutdown_timeout,
+                tokio::task::spawn_blocking({
+                    let lp = logger_provider.clone();
+                    move || lp.force_flush()
+                })
+            ).await;
+
+            if let Err(_) | Ok(Err(_)) = flush_result {
+                warn!("logger provider force_flush timed out or failed");
+            }
+
+            let shutdown_result = tokio::time::timeout(
+                shutdown_timeout,
+                tokio::task::spawn_blocking(move || logger_provider.shutdown())
+            ).await;
+
+            if let Err(_) | Ok(Err(_)) = shutdown_result {
+                warn!("logger provider shutdown timed out or failed");
+            }
         }
 
     }
@@ -243,7 +288,7 @@ impl TemporalitySelector for DeltaTemporalitySelector {
 /// setup the stdout metrics writer if enabled, and initializes STATIC Metrics.
 ///
 /// Returns the Prometheus Registry or None if Prometheus was disabled.
-///
+#[allow(clippy::too_many_lines)]
 fn init_metrics(config: Config) -> (Option<PrometheusRegistry>, SdkMeterProvider) {
     let mut keys = vec![KeyValue::new(SERVICE_NAME_KEY, config.service_name.clone())];
     if let Some(resource_attributes) = config.resource_attributes {
@@ -333,6 +378,7 @@ fn init_metrics(config: Config) -> (Option<PrometheusRegistry>, SdkMeterProvider
 
         let reader = PeriodicReader::builder(exporter, runtime::Tokio)
             .with_interval(Duration::from_secs(export_target.interval_secs))
+            .with_timeout(Duration::from_secs(export_target.timeout))
             .build();
         meter_provider_builder = meter_provider_builder.with_reader(reader);
     }
```
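Beyond the shutdown path, the two `with_timeout` additions bound how long any single periodic export attempt may run before the reader abandons it. For context, a minimal standalone sketch of the same builder chain; the stdout exporter, the 60s/30s values, and the exact crate paths (based on the 0.2x-era opentelemetry SDKs this code appears to use) are assumptions, with the stdout exporter standing in for the crate's real OTLP exporter:

```rust
use std::time::Duration;
use opentelemetry_sdk::{
    metrics::{PeriodicReader, SdkMeterProvider},
    runtime,
};

// Standalone sketch, not from this repo: a PeriodicReader that exports
// every 60 seconds and gives up on any single export attempt after 30
// seconds instead of blocking indefinitely.
fn build_meter_provider() -> SdkMeterProvider {
    // Stand-in exporter; the real code builds an OTLP/gRPC exporter.
    let exporter = opentelemetry_stdout::MetricsExporterBuilder::default().build();
    let reader = PeriodicReader::builder(exporter, runtime::Tokio)
        .with_interval(Duration::from_secs(60)) // how often to export
        .with_timeout(Duration::from_secs(30)) // per-export deadline
        .build();
    SdkMeterProvider::builder().with_reader(reader).build()
}
```

Pairing a per-export deadline with the shutdown deadline above means neither periodic exports nor the final flush can pin a worker forever.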
```diff
@@ -348,7 +394,9 @@ fn init_metrics(config: Config) -> (Option<PrometheusRegistry>, SdkMeterProvider
         })
         .build();
 
-        let reader = PeriodicReader::builder(exporter, runtime::Tokio).build();
+        let reader = PeriodicReader::builder(exporter, runtime::Tokio)
+            .with_timeout(Duration::from_secs(30))
+            .build();
         meter_provider_builder = meter_provider_builder.with_reader(reader);
     }
 
@@ -471,6 +519,7 @@ fn handle_tls(
         }
     });
     let channel = tonic_endpoint
+        .connect_timeout(timeout)
         .timeout(timeout)
         .connect_with_connector_lazy(custom_connector);
     Ok(exporter_builder.with_channel(channel))
```
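The last hunk is easy to overlook but matters: tonic's `.timeout(...)` only bounds requests on an already-established channel, while `.connect_timeout(...)` bounds connection establishment itself, the phase that hangs when the collector never answers. And because the code connects with `connect_with_connector_lazy`, dialing happens on first use, which is exactly when `connect_timeout` takes effect. A minimal sketch of the same endpoint configuration; the address and the 10-second value are illustrative, not from this commit:

```rust
use std::time::Duration;
use tonic::transport::Endpoint;

// Illustrative endpoint setup mirroring the hunk above; the address
// and timeout value are examples, not taken from the commit.
fn build_endpoint() -> Endpoint {
    let timeout = Duration::from_secs(10);
    Endpoint::from_static("https://collector.example:4317")
        // Bound how long dialing (TCP/TLS handshake) may take.
        .connect_timeout(timeout)
        // Bound each request once the channel is connected.
        .timeout(timeout)
}
```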
