[SVLS-8351] Add CPU Enhanced Metrics in Linux Azure Functions #77
Merged
Changes from all commits (55 commits)
All commits are by kathiehuang:

55262ab  Set up Cgroup, CpuStats, and CpuMetricsCollector structs, and cgroup …
4484e33  Add cpu collector into loop with dogstatsd
f745a5d  Fix license
43e00ca  Move metrics_collector into its own crate
306fab2  Submit cpu usage and limit metrics and fix units
7224dfd  Test more precise time interval, add instance ID as a tag
7efc04f  Refactor to make CpuMetricsCollector, CpuStats, and metrics submissio…
8121d06  Testing different cpu collection methods
c36a683  Clean up and emit cpu usage and host-level cpu usage metrics
aeaa146  Clean up and emit cpu usage and host-level cpu usage metrics
5b30445  Add tags to metrics
6878059  Ensure tags match cloud integration metrics
14d4276  Separate Windows CPU metrics collection into separate PR
0dc5714  Separate CPU host usage metrics collection into separate PR
f9598da  Remove functionname tag
e8261ac  Send enhanced metrics even if custom metrics are turned off
72a28e2  Pull out building metrics tags into function
a1b74e0  Add unit tests
6970196  Clean up
72c3cfb  Refactor
938c2c2  Remove last_collection_time
35df85d  Only send enhanced metrics for Azure Functions
cde19fc  Add back last_collection_time
81071a8  Only enable enhanced metrics for Azure Functions
f53a24e  Only create CPUMetricsCollector when metrics flusher is successfully …
ec2f5c7  Create windows-enhanced-metrics feature for Windows-specific logic
5183da6  Add unit to collection interval variable
a3429d6  Make last_usage_ns an Option and keep CPU total as u64 until f64 is n…
9a09748  Change collection interval to 1 for precision and remove unneeded logs
158bf12  Move tag building logic from datadog-serverless-compat to datadog-met…
e01df82  Remove unused dependencies from datadog-trace-agent
be6cf6c  Turn off DD_ENHANCED_METRICS in Windows for now to prevent metrics co…
720b659  Handle malformed cpuset.cpus file
bba3ac7  Skip collection when elapsed_secs is less than or equal to 0
64953c2  Update comments to clarify that Windows is not supported yet
bbca537  Add unit test for metric classification
c98f961  Log when scheduler quota can't be parsed
ed8dc8a  nit: address clippy warning
7b505fa  Remove resource_id tag and add check for invalid CPU set range
b4d6274  Move log flusher into flush_interval.tick()
e26aa74  Clean up
9a21537  Use azure_tags::build_enhanced_metrics_tags
c2dd52e  Add DD_ENHANCED_METRICS_ENABLED to EnabledMetricsComponents
4837866  Gate windows-enhanced-metrics feature explicitly by OS
c596511  Update license
a44bbad  Fix Windows CI checks
4f84924  Remove CPU limit metric
ed0e58f  Remove CgroupStats struct
b36b000  Rename Azure-specific files
1ac8c68  Update comments
5a27f63  Guard CPU collection loop if cpu_collector is None
f2273f0  Add unit tests for LinuxCpuStatsReader logic
0430a66  Add unit tests for CpuMetricsCollector logic
24924b0  Move Unix timestamp capture to right after Instant capture
72f8068  Minor refactor
New file, 277 lines added (@@ -0,0 +1,277 @@):

```rust
// Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0

//! CPU metrics collector for Azure Functions
//!
//! This module provides OS-agnostic CPU stats collection, CPU usage
//! computation, and metrics submission to Datadog.
//!
//! All CPU metrics are reported in nanocores (1 core = 1,000,000,000 nanocores).

use dogstatsd::aggregator::AggregatorHandle;
use dogstatsd::metric::{Metric, MetricValue, SortedTags};
use tracing::{debug, error};

const CPU_USAGE_METRIC: &str = "azure.functions.enhanced.cpu.usage";

/// Computed CPU total usage metric
pub struct CpuStats {
    pub total: u64, // Cumulative CPU usage in nanoseconds
}

pub trait CpuStatsReader {
    fn read(&self) -> Option<CpuStats>;
}

pub struct CpuMetricsCollector {
    reader: Box<dyn CpuStatsReader>,
    aggregator: AggregatorHandle,
    tags: Option<SortedTags>,
    last_usage_ns: Option<u64>,
    last_collection_time: std::time::Instant,
}

impl CpuMetricsCollector {
    /// Creates a new CpuMetricsCollector
    ///
    /// # Arguments
    ///
    /// * `aggregator` - The aggregator handle to submit metrics to
    /// * `tags` - Optional tags to attach to all metrics
    pub fn new(aggregator: AggregatorHandle, tags: Option<SortedTags>) -> Self {
        #[cfg(all(windows, feature = "windows-enhanced-metrics"))]
        let reader: Box<dyn CpuStatsReader> = Box::new(crate::azure_windows::WindowsCpuStatsReader);
        #[cfg(not(windows))]
        let reader: Box<dyn CpuStatsReader> = Box::new(crate::azure_linux::LinuxCpuStatsReader);

        Self {
            reader,
            aggregator,
            tags,
            last_usage_ns: None,
            last_collection_time: std::time::Instant::now(),
        }
    }

    pub fn collect_and_submit(&mut self) {
        let Some(cpu_stats) = self.reader.read() else {
            debug!(
                "Skipping CPU enhanced metrics collection - could not find data to generate CPU usage metrics"
            );
            return;
        };

        let current_usage_ns = cpu_stats.total;
        let now_instant = std::time::Instant::now();
        let now = std::time::UNIX_EPOCH
            .elapsed()
            .map(|d| d.as_secs())
            .unwrap_or(0)
            .try_into()
            .unwrap_or(0);

        // Skip first collection
        let Some(last_usage_ns) = self.last_usage_ns else {
            debug!("First CPU collection, skipping interval");
            self.last_usage_ns = Some(current_usage_ns);
            self.last_collection_time = now_instant;
            return;
        };

        let elapsed_secs = now_instant
            .duration_since(self.last_collection_time)
            .as_secs_f64();

        // Update state so the next collection always compares against the most recent reading, even if the interval is skipped
        self.last_usage_ns = Some(current_usage_ns);
        self.last_collection_time = now_instant;

        let Some(usage_rate_nc) =
            compute_usage_rate_nc(current_usage_ns, last_usage_ns, elapsed_secs)
        else {
            return;
        };

        let usage_metric = Metric::new(
            CPU_USAGE_METRIC.into(),
            MetricValue::distribution(usage_rate_nc),
            self.tags.clone(),
            Some(now),
        );

        if let Err(e) = self.aggregator.insert_batch(vec![usage_metric]) {
            error!("Failed to insert CPU usage metric: {}", e);
        }
    }
}

fn compute_usage_rate_nc(
    current_usage_ns: u64,
    last_usage_ns: u64,
    elapsed_secs: f64,
) -> Option<f64> {
    if current_usage_ns < last_usage_ns {
        debug!("Current CPU usage is less than last usage, skipping interval");
        return None;
    }
    if elapsed_secs <= 0.0 {
        debug!("Elapsed time is less than or equal to 0, skipping interval");
        return None;
    }
    let delta_ns = (current_usage_ns - last_usage_ns) as f64;
    Some(delta_ns / elapsed_secs)
}
```
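The rate computation above can be checked against the module's nanocore convention (1 core = 1,000,000,000 nanocores): a function that burns 0.5 s of CPU time over a 1 s window should report half a core. A minimal self-contained sketch of the same arithmetic, with the crate-internal logging removed so it compiles on its own:

```rust
// Standalone version of the rate computation above, without the
// dogstatsd/tracing dependencies. Rate is in nanocores: ns of CPU
// time consumed per second of wall-clock time.
fn compute_usage_rate_nc(current_usage_ns: u64, last_usage_ns: u64, elapsed_secs: f64) -> Option<f64> {
    if current_usage_ns < last_usage_ns || elapsed_secs <= 0.0 {
        // Counter went backwards, or no time elapsed: skip the interval.
        return None;
    }
    Some((current_usage_ns - last_usage_ns) as f64 / elapsed_secs)
}

fn main() {
    // 0.5 s of CPU time over a 1 s window -> 500,000,000 nanocores (half a core).
    let rate = compute_usage_rate_nc(1_500_000_000, 1_000_000_000, 1.0);
    assert_eq!(rate, Some(500_000_000.0));
    println!("{rate:?}");
}
```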
```rust
// For testing only since CpuMetricsCollector::new() hardcodes the reader based on OS
#[cfg(test)]
impl CpuMetricsCollector {
    pub(crate) fn with_reader(
        reader: Box<dyn CpuStatsReader>,
        aggregator: AggregatorHandle,
        tags: Option<SortedTags>,
    ) -> Self {
        Self {
            reader,
            aggregator,
            tags,
            last_usage_ns: None,
            last_collection_time: std::time::Instant::now(),
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use dogstatsd::aggregator::AggregatorService;
    use dogstatsd::metric::EMPTY_TAGS;
    use std::cell::Cell;

    #[test]
    fn test_normal_delta() {
        assert_eq!(
            compute_usage_rate_nc(2_000_000_000, 1_000_000_000, 1.0),
            Some(1_000_000_000.0)
        );
    }

    #[test]
    fn test_current_usage_less_than_last_usage_returns_none() {
        assert!(compute_usage_rate_nc(1_000_000_000, 2_000_000_000, 1.0).is_none());
    }

    #[test]
    fn test_zero_elapsed_time_returns_none() {
        assert!(compute_usage_rate_nc(2_000_000_000, 1_000_000_000, 0.0).is_none());
    }

    struct MockCpuStatsReader {
        idx: Cell<usize>,
        values: Vec<Option<u64>>,
    }

    impl MockCpuStatsReader {
        fn new(values: Vec<Option<u64>>) -> Self {
            Self {
                idx: Cell::new(0),
                values,
            }
        }
    }

    impl CpuStatsReader for MockCpuStatsReader {
        fn read(&self) -> Option<CpuStats> {
            let i = self.idx.get();
            self.idx.set(i + 1);
            self.values
                .get(i)
                .and_then(|v| v.map(|total| CpuStats { total }))
        }
    }

    #[tokio::test]
    async fn test_first_collection_skipped() {
        let (service, handle) =
            AggregatorService::new(EMPTY_TAGS, 1000).expect("Aggregator creation failed");
        let task = tokio::spawn(service.run());

        let mut collector = CpuMetricsCollector::with_reader(
            Box::new(MockCpuStatsReader::new(vec![Some(1_000_000_000)])),
            handle.clone(),
            None,
        );
        collector.collect_and_submit();

        let response = handle.flush().await.expect("flush failed");
        assert!(
            response.distributions.is_empty(),
            "Expected no batches flushed on first collection, got {:?}",
            response.distributions.len()
        );

        handle.shutdown().expect("Shutdown failed");
        task.await.expect("Service task panicked");
    }

    #[tokio::test]
    async fn test_second_collection_submits_metric() {
        let (service, handle) =
            AggregatorService::new(EMPTY_TAGS, 1000).expect("Aggregator creation failed");
        let task = tokio::spawn(service.run());

        let mut collector = CpuMetricsCollector::with_reader(
            Box::new(MockCpuStatsReader::new(vec![
                Some(1_000_000_000),
                Some(2_000_000_000),
            ])),
            handle.clone(),
            None,
        );
        collector.collect_and_submit();
        collector.collect_and_submit();

        let response = handle.flush().await.expect("Flush failed");
        assert_eq!(
            response.distributions.len(),
            1,
            "Expected 1 batch to be flushed, got {:?}",
            response.distributions.len()
        );
        assert_eq!(
            response.distributions[0].sketches.len(),
            1,
            "Expected 1 metric, got {:?}",
            response.distributions[0].sketches.len()
        );

        handle.shutdown().expect("Shutdown failed");
        task.await.expect("Service task panicked");
    }

    #[tokio::test]
    async fn test_current_usage_less_than_last_usage_skips_interval() {
        let (service, handle) =
            AggregatorService::new(EMPTY_TAGS, 1000).expect("Aggregator creation failed");
        let task = tokio::spawn(service.run());

        let mut collector = CpuMetricsCollector::with_reader(
            Box::new(MockCpuStatsReader::new(vec![
                Some(2_000_000_000),
                Some(1_000_000_000),
            ])),
            handle.clone(),
            None,
        );
        collector.collect_and_submit();
        collector.collect_and_submit();

        let response = handle.flush().await.expect("Flush failed");
        assert!(
            response.distributions.is_empty(),
            "Expected no batches flushed when current usage is less than last usage, got {:?}",
            response.distributions.len()
        );

        handle.shutdown().expect("Shutdown failed");
        task.await.expect("Service task panicked");
    }
}
```
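The LinuxCpuStatsReader that supplies CpuStats.total lives in a sibling file not shown in this diff. As a rough illustration only (an assumption about the general technique, not the PR's actual reader), a cumulative CPU total in nanoseconds can be obtained on Linux from cgroup v2's cpu.stat (the "usage_usec" field, in microseconds) or cgroup v1's cpuacct.usage (already in nanoseconds):

```rust
use std::fs;

/// Hypothetical helper (NOT the PR's code): parse the "usage_usec" field of a
/// cgroup v2 cpu.stat file and convert microseconds to nanoseconds.
fn parse_usage_usec_ns(cpu_stat: &str) -> Option<u64> {
    cpu_stat
        .lines()
        .find_map(|line| line.strip_prefix("usage_usec "))?
        .trim()
        .parse::<u64>()
        .ok()
        .map(|usec| usec * 1_000)
}

/// Hypothetical reader sketch: try cgroup v2 first, fall back to cgroup v1.
fn read_cgroup_cpu_usage_ns() -> Option<u64> {
    if let Ok(stat) = fs::read_to_string("/sys/fs/cgroup/cpu.stat") {
        if let Some(ns) = parse_usage_usec_ns(&stat) {
            return Some(ns);
        }
    }
    // cgroup v1: cpuacct.usage holds cumulative nanoseconds directly.
    fs::read_to_string("/sys/fs/cgroup/cpuacct/cpuacct.usage")
        .ok()?
        .trim()
        .parse()
        .ok()
}

fn main() {
    let sample = "usage_usec 250000\nuser_usec 200000\nsystem_usec 50000";
    assert_eq!(parse_usage_usec_ns(sample), Some(250_000_000));
    // Environment-dependent; None outside a cgroup-managed Linux host.
    println!("container usage: {:?}", read_cgroup_cpu_usage_ns());
}
```

Whatever the real reader does, it only needs to return a monotonically growing total; the collector's interval-skipping logic above handles resets and malformed reads.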
Review comment: should we have azure in the path name for this, in the directory or the filename?
Author reply: Since this crate is only used in Azure Functions environments right now, I'm thinking of leaving the name as-is and renaming it in the future if we expand enhanced metrics to Cloud Functions Gen 1.
Edit: NVM, renamed in ae8b3c7 to make it clear that these are Azure-specific without having to open up the file.