Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
55 commits
Select commit Hold shift + click to select a range
55262ab
Set up Cgroup, CpuStats, and CpuMetricsCollector structs, and cgroup …
kathiehuang Feb 13, 2026
4484e33
Add cpu collector into loop with dogstatsd
kathiehuang Feb 13, 2026
f745a5d
Fix license
kathiehuang Feb 13, 2026
43e00ca
Move metrics_collector into its own crate
kathiehuang Feb 23, 2026
306fab2
Submit cpu usage and limit metrics and fix units
kathiehuang Feb 23, 2026
7224dfd
Test more precise time interval, add instance ID as a tag
kathiehuang Feb 25, 2026
7efc04f
Refactor to make CpuMetricsCollector, CpuStats, and metrics submissio…
kathiehuang Feb 26, 2026
8121d06
Testing different cpu collection methods
kathiehuang Mar 4, 2026
c36a683
Clean up and emit cpu usage and host-level cpu usage metrics
kathiehuang Mar 4, 2026
aeaa146
Clean up and emit cpu usage and host-level cpu usage metrics
kathiehuang Mar 4, 2026
5b30445
Add tags to metrics
kathiehuang Mar 5, 2026
6878059
Ensure tags match cloud integration metrics
kathiehuang Mar 6, 2026
14d4276
Separate Windows CPU metrics collection into separate PR
kathiehuang Mar 6, 2026
0dc5714
Separate CPU host usage metrics collection into separate PR
kathiehuang Mar 6, 2026
f9598da
Remove functionname tag
kathiehuang Mar 6, 2026
e8261ac
Send enhanced metrics even if custom metrics are turned off
kathiehuang Mar 6, 2026
72a28e2
Pull out building metrics tags into function
kathiehuang Mar 7, 2026
a1b74e0
Add unit tests
kathiehuang Mar 7, 2026
6970196
Clean up
kathiehuang Mar 7, 2026
72c3cfb
Refactor
kathiehuang Mar 7, 2026
938c2c2
Remove last_collection_time
kathiehuang Mar 7, 2026
35df85d
Only send enhanced metrics for Azure Functions
kathiehuang Mar 7, 2026
cde19fc
Add back last_collection_time
kathiehuang Mar 7, 2026
81071a8
Only enable enhanced metrics for Azure Functions
kathiehuang Mar 9, 2026
f53a24e
Only create CPUMetricsCollector when metrics flusher is successfully …
kathiehuang Mar 9, 2026
ec2f5c7
Create windows-enhanced-metrics feature for Windows-specific logic
kathiehuang Mar 10, 2026
5183da6
Add unit to collection interval variable
kathiehuang Mar 10, 2026
a3429d6
Make last_usage_ns an Option and keep CPU total as u64 until f64 is n…
kathiehuang Mar 10, 2026
9a09748
Change collection interval to 1 for precision and remove unneeded logs
kathiehuang Mar 11, 2026
158bf12
Move tag building logic from datadog-serverless-compat to datadog-met…
kathiehuang Mar 11, 2026
e01df82
Remove unused dependencies from datadog-trace-agent
kathiehuang Mar 11, 2026
be6cf6c
Turn off DD_ENHANCED_METRICS in Windows for now to prevent metrics co…
kathiehuang Mar 30, 2026
720b659
Handle malformed cpuset.cpus file
kathiehuang Mar 31, 2026
bba3ac7
Skip collection when elapsed_secs is less than or equal to 0
kathiehuang Mar 31, 2026
64953c2
Update comments to clarify that Windows is not supported yet
kathiehuang Mar 31, 2026
bbca537
Add unit test for metric classification
kathiehuang Apr 1, 2026
c98f961
Log when scheduler quota can't be parsed
kathiehuang Apr 1, 2026
ed8dc8a
nit: address clippy warning
kathiehuang Apr 1, 2026
7b505fa
Remove resource_id tag and add check for invalid CPU set range
kathiehuang Apr 1, 2026
b4d6274
Move log flusher into flush_interval.tick()
kathiehuang May 1, 2026
e26aa74
Clean up
kathiehuang May 4, 2026
9a21537
Use azure_tags::build_enhanced_metrics_tags
kathiehuang May 4, 2026
c2dd52e
Add DD_ENHANCED_METRICS_ENABLED to EnabledMetricsComponents
kathiehuang May 4, 2026
4837866
Gate windows-enhanced-metrics feature explicitly by OS
kathiehuang May 4, 2026
c596511
Update license
kathiehuang May 4, 2026
a44bbad
Fix Windows CI checks
kathiehuang May 4, 2026
4f84924
Remove CPU limit metric
kathiehuang May 5, 2026
ed0e58f
Remove CgroupStats struct
kathiehuang May 5, 2026
b36b000
Rename Azure-specific files
kathiehuang May 5, 2026
1ac8c68
Update comments
kathiehuang May 5, 2026
5a27f63
Guard CPU collection loop if cpu_collector is None
kathiehuang May 5, 2026
f2273f0
Add unit tests for LinuxCpuStatsReader logic
kathiehuang May 12, 2026
0430a66
Add unit tests for CpuMetricsCollector logic
kathiehuang May 13, 2026
24924b0
Move Unix timestamp capture to right after Instant capture
kathiehuang May 13, 2026
72f8068
Minor refactor
kathiehuang May 13, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/build-datadog-serverless-compat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ jobs:
retention-days: 3
- if: ${{ inputs.runner == 'windows-2022' }}
shell: bash
run: cargo build --release -p datadog-serverless-compat --features windows-pipes
run: cargo build --release -p datadog-serverless-compat --features windows-pipes,windows-enhanced-metrics
- if: ${{ inputs.runner == 'windows-2022' }}
uses: actions/upload-artifact@ea165f8d65b6e75b540449e92b4886f43607fa02 # 4.6.2
with:
Expand All @@ -69,7 +69,7 @@ jobs:
rustup target add i686-pc-windows-msvc
cargo build --release -p datadog-serverless-compat \
--target i686-pc-windows-msvc \
--features windows-pipes
--features windows-pipes,windows-enhanced-metrics
- if: ${{ inputs.runner == 'windows-2022' }}
uses: actions/upload-artifact@ea165f8d65b6e75b540449e92b4886f43607fa02 # 4.6.2
with:
Expand Down
16 changes: 13 additions & 3 deletions .github/workflows/cargo.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,12 @@ jobs:
shell: bash
run: chmod +x ./scripts/install-protoc.sh && ./scripts/install-protoc.sh $HOME
- shell: bash
run: cargo check --workspace
run: |
if [[ "${{ inputs.runner }}" == "windows-2022" ]]; then
cargo check --workspace --features datadog-serverless-compat/windows-pipes,datadog-serverless-compat/windows-enhanced-metrics
else
cargo check --workspace
fi

format:
name: Format
Expand Down Expand Up @@ -72,7 +77,12 @@ jobs:
shell: bash
run: chmod +x ./scripts/install-protoc.sh && ./scripts/install-protoc.sh $HOME
- shell: bash
run: cargo build --all
run: |
if [[ "${{ inputs.runner }}" == "windows-2022" ]]; then
cargo build --all --features datadog-serverless-compat/windows-pipes,datadog-serverless-compat/windows-enhanced-metrics
else
cargo build --all
fi

build-datadog-serverless-compat:
name: Build Datadog Serverless Compat
Expand All @@ -95,7 +105,7 @@ jobs:
- shell: bash
run: |
if [[ "${{ inputs.runner }}" == "windows-2022" ]]; then
cargo nextest run --workspace --features datadog-serverless-compat/windows-pipes
cargo nextest run --workspace --features datadog-serverless-compat/windows-pipes,datadog-serverless-compat/windows-enhanced-metrics
else
cargo nextest run --workspace
fi
11 changes: 6 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions crates/datadog-metrics-collector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,9 @@ description = "Collector to read, compute, and submit enhanced metrics in Server
dogstatsd = { path = "../dogstatsd", default-features = true }
tracing = { version = "0.1", default-features = false }
libdd-common = { git = "https://github.com/DataDog/libdatadog", rev = "4ae8ebe252451374c292efd159ce254c3f5a72e0", default-features = false }

[dev-dependencies]
tokio = { version = "1", default-features = false, features = ["macros", "rt-multi-thread"] }

[features]
windows-enhanced-metrics = []
277 changes: 277 additions & 0 deletions crates/datadog-metrics-collector/src/azure_cpu.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,277 @@
// Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0

//! CPU metrics collector for Azure Functions
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we have azure in the path name for this, in the directory or the filename?

Copy link
Copy Markdown
Contributor Author

@kathiehuang kathiehuang Mar 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this crate is only used in Azure Functions environments right now, I'm thinking of leaving the name as-is and renaming it in the future if we expand enhanced metrics to Cloud Functions Gen 1
Edit: NVM, renamed in ae8b3c7 to make it clear that these are Azure-specific without having to open up the file

//!
//! This module provides OS-agnostic CPU stats collection, CPU usage
//! computation, and metrics submission to Datadog.
//!
//! All CPU metrics are reported in nanocores (1 core = 1,000,000,000 nanocores).

use dogstatsd::aggregator::AggregatorHandle;
use dogstatsd::metric::{Metric, MetricValue, SortedTags};
use tracing::{debug, error};

// Name of the enhanced CPU usage metric emitted for Azure Functions,
// reported as a distribution in nanocores (see module docs for units).
const CPU_USAGE_METRIC: &str = "azure.functions.enhanced.cpu.usage";

/// Computed CPU total usage metric.
///
/// A single cumulative reading produced by a [`CpuStatsReader`];
/// `CpuMetricsCollector` diffs two consecutive readings to derive a rate.
pub struct CpuStats {
    pub total: u64, // Cumulative CPU usage in nanoseconds
}

/// Source of cumulative CPU usage readings, abstracting over the OS.
pub trait CpuStatsReader {
    /// Returns the current cumulative CPU usage, or `None` when the
    /// underlying data source is unavailable (collection is then skipped).
    fn read(&self) -> Option<CpuStats>;
}

/// Samples cumulative CPU usage and submits a usage-rate metric (in
/// nanocores) to the dogstatsd aggregator on each collection cycle.
pub struct CpuMetricsCollector {
    // OS-specific reader supplying cumulative CPU usage readings.
    reader: Box<dyn CpuStatsReader>,
    // Handle used to insert metrics into the aggregator.
    aggregator: AggregatorHandle,
    // Optional tags attached to every submitted metric.
    tags: Option<SortedTags>,
    // Cumulative usage (ns) at the previous collection; `None` until the
    // first successful reading, which only establishes a baseline.
    last_usage_ns: Option<u64>,
    // Instant of the previous collection, used to measure the elapsed
    // interval for the rate computation.
    last_collection_time: std::time::Instant,
}

impl CpuMetricsCollector {
    /// Creates a new CpuMetricsCollector
    ///
    /// # Arguments
    ///
    /// * `aggregator` - The aggregator handle to submit metrics to
    /// * `tags` - Optional tags to attach to all metrics
    pub fn new(aggregator: AggregatorHandle, tags: Option<SortedTags>) -> Self {
        // Select the platform-appropriate stats reader at compile time.
        // NOTE(review): on Windows without the `windows-enhanced-metrics`
        // feature neither cfg applies, which is a deliberate compile-time
        // guard — the feature is gated by OS in the build scripts.
        #[cfg(all(windows, feature = "windows-enhanced-metrics"))]
        let reader: Box<dyn CpuStatsReader> = Box::new(crate::azure_windows::WindowsCpuStatsReader);
        #[cfg(not(windows))]
        let reader: Box<dyn CpuStatsReader> = Box::new(crate::azure_linux::LinuxCpuStatsReader);

        Self {
            reader,
            aggregator,
            tags,
            last_usage_ns: None,
            last_collection_time: std::time::Instant::now(),
        }
    }

    /// Reads the current CPU stats, derives the usage rate over the
    /// elapsed interval, and submits it as a distribution metric.
    ///
    /// The first successful call only records a baseline; later calls
    /// emit a metric unless the reading or interval is invalid.
    pub fn collect_and_submit(&mut self) {
        let stats = match self.reader.read() {
            Some(s) => s,
            None => {
                debug!(
                    "Skipping CPU enhanced metrics collection - could not find data to generate CPU usage metrics"
                );
                return;
            }
        };

        let current_usage_ns = stats.total;
        // Capture the monotonic instant and the Unix timestamp together so
        // the metric timestamp matches the sampled interval as closely as
        // possible; fall back to 0 if the system clock is unusable.
        let now_instant = std::time::Instant::now();
        let timestamp = std::time::UNIX_EPOCH
            .elapsed()
            .map(|d| d.as_secs())
            .unwrap_or(0)
            .try_into()
            .unwrap_or(0);

        let previous_usage_ns = match self.last_usage_ns {
            Some(v) => v,
            None => {
                // Skip first collection: no previous reading to diff against.
                debug!("First CPU collection, skipping interval");
                self.last_usage_ns = Some(current_usage_ns);
                self.last_collection_time = now_instant;
                return;
            }
        };

        let elapsed_secs = now_instant
            .duration_since(self.last_collection_time)
            .as_secs_f64();

        // Update state so the next collection always compares against the
        // most recent reading, even if the interval is skipped
        self.last_usage_ns = Some(current_usage_ns);
        self.last_collection_time = now_instant;

        let usage_rate_nc =
            match compute_usage_rate_nc(current_usage_ns, previous_usage_ns, elapsed_secs) {
                Some(rate) => rate,
                None => return,
            };

        let metric = Metric::new(
            CPU_USAGE_METRIC.into(),
            MetricValue::distribution(usage_rate_nc),
            self.tags.clone(),
            Some(timestamp),
        );

        if let Err(e) = self.aggregator.insert_batch(vec![metric]) {
            error!("Failed to insert CPU usage metric: {}", e);
        }
    }
}

/// Converts two cumulative CPU usage readings (in nanoseconds) separated
/// by `elapsed_secs` seconds into a usage rate in nanocores.
///
/// Returns `None` when the counter moved backwards (e.g. after a reset)
/// or the interval is not positive, so the caller skips the interval
/// instead of emitting a bogus value.
fn compute_usage_rate_nc(
    current_usage_ns: u64,
    last_usage_ns: u64,
    elapsed_secs: f64,
) -> Option<f64> {
    // checked_sub makes the "counter went backwards" case explicit.
    let delta_ns = match current_usage_ns.checked_sub(last_usage_ns) {
        Some(delta) => delta,
        None => {
            debug!("Current CPU usage is less than last usage, skipping interval");
            return None;
        }
    };
    if elapsed_secs <= 0.0 {
        debug!("Elapsed time is less than or equal to 0, skipping interval");
        return None;
    }
    Some(delta_ns as f64 / elapsed_secs)
}

// For testing only since CpuMetricsCollector::new() hardcodes the reader based on OS
#[cfg(test)]
impl CpuMetricsCollector {
    /// Builds a collector with an injected `reader`, bypassing the
    /// compile-time OS selection in `new()`, so tests can feed scripted
    /// CPU readings.
    pub(crate) fn with_reader(
        reader: Box<dyn CpuStatsReader>,
        aggregator: AggregatorHandle,
        tags: Option<SortedTags>,
    ) -> Self {
        Self {
            reader,
            aggregator,
            tags,
            last_usage_ns: None,
            last_collection_time: std::time::Instant::now(),
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use dogstatsd::aggregator::AggregatorService;
    use dogstatsd::metric::EMPTY_TAGS;
    use std::cell::Cell;

    #[test]
    fn test_normal_delta() {
        // 1s of CPU delta over a 1s interval => one full core in nanocores.
        assert_eq!(
            compute_usage_rate_nc(2_000_000_000, 1_000_000_000, 1.0),
            Some(1_000_000_000.0)
        );
    }

    #[test]
    fn test_current_usage_less_than_last_usage_returns_none() {
        // A backwards-moving counter (e.g. after a reset) must be skipped.
        assert!(compute_usage_rate_nc(1_000_000_000, 2_000_000_000, 1.0).is_none());
    }

    #[test]
    fn test_zero_elapsed_time_returns_none() {
        // A zero-length interval would divide by zero; must be skipped.
        assert!(compute_usage_rate_nc(2_000_000_000, 1_000_000_000, 0.0).is_none());
    }

    // Scripted reader: returns one entry from `values` per call; `None`
    // entries (and exhaustion) simulate an unreadable data source.
    // Cell gives interior mutability since CpuStatsReader::read takes &self.
    struct MockCpuStatsReader {
        idx: Cell<usize>,
        values: Vec<Option<u64>>,
    }

    impl MockCpuStatsReader {
        fn new(values: Vec<Option<u64>>) -> Self {
            Self {
                idx: Cell::new(0),
                values,
            }
        }
    }

    impl CpuStatsReader for MockCpuStatsReader {
        fn read(&self) -> Option<CpuStats> {
            let i = self.idx.get();
            self.idx.set(i + 1);
            self.values
                .get(i)
                .and_then(|v| v.map(|total| CpuStats { total }))
        }
    }

    #[tokio::test]
    async fn test_first_collection_skipped() {
        // The first collection only records a baseline; nothing is emitted.
        let (service, handle) =
            AggregatorService::new(EMPTY_TAGS, 1000).expect("Aggregator creation failed");
        let task = tokio::spawn(service.run());

        let mut collector = CpuMetricsCollector::with_reader(
            Box::new(MockCpuStatsReader::new(vec![Some(1_000_000_000)])),
            handle.clone(),
            None,
        );
        collector.collect_and_submit();

        let response = handle.flush().await.expect("flush failed");
        assert!(
            response.distributions.is_empty(),
            "Expected no batches flushed on first collection, got {:?}",
            response.distributions.len()
        );

        // Shut the aggregator down before awaiting the service task so the
        // spawned run() loop terminates.
        handle.shutdown().expect("Shutdown failed");
        task.await.expect("Service task panicked");
    }

    #[tokio::test]
    async fn test_second_collection_submits_metric() {
        // Two increasing readings: the second collection should emit exactly
        // one distribution metric computed from the delta.
        let (service, handle) =
            AggregatorService::new(EMPTY_TAGS, 1000).expect("Aggregator creation failed");
        let task = tokio::spawn(service.run());

        let mut collector = CpuMetricsCollector::with_reader(
            Box::new(MockCpuStatsReader::new(vec![
                Some(1_000_000_000),
                Some(2_000_000_000),
            ])),
            handle.clone(),
            None,
        );
        collector.collect_and_submit();
        collector.collect_and_submit();

        let response = handle.flush().await.expect("Flush failed");
        assert_eq!(
            response.distributions.len(),
            1,
            "Expected 1 batch to be flushed, got {:?}",
            response.distributions.len()
        );
        assert_eq!(
            response.distributions[0].sketches.len(),
            1,
            "Expected 1 metric, got {:?}",
            response.distributions[0].sketches.len()
        );

        handle.shutdown().expect("Shutdown failed");
        task.await.expect("Service task panicked");
    }

    #[tokio::test]
    async fn test_current_usage_less_than_last_usage_skips_interval() {
        // A decreasing reading on the second collection must be skipped,
        // so no metric is flushed.
        let (service, handle) =
            AggregatorService::new(EMPTY_TAGS, 1000).expect("Aggregator creation failed");
        let task = tokio::spawn(service.run());

        let mut collector = CpuMetricsCollector::with_reader(
            Box::new(MockCpuStatsReader::new(vec![
                Some(2_000_000_000),
                Some(1_000_000_000),
            ])),
            handle.clone(),
            None,
        );
        collector.collect_and_submit();
        collector.collect_and_submit();

        let response = handle.flush().await.expect("Flush failed");
        assert!(
            response.distributions.is_empty(),
            "Expected no batches flushed when current usage is less than last usage, got {:?}",
            response.distributions.len()
        );

        handle.shutdown().expect("Shutdown failed");
        task.await.expect("Service task panicked");
    }
}
Loading
Loading