diff --git a/terraform/eks/daemon/otel/main.tf b/terraform/eks/daemon/otel/main.tf index 22b3d5de..ff95795e 100644 --- a/terraform/eks/daemon/otel/main.tf +++ b/terraform/eks/daemon/otel/main.tf @@ -140,6 +140,7 @@ resource "helm_release" "aws_observability" { { name = "clusterName", value = aws_eks_cluster.this.name }, { name = "region", value = var.region }, { name = "otelContainerInsights.enabled", value = "true" }, + { name = "otelContainerInsights.logs.enabled", value = "true" } ] depends_on = [ @@ -360,8 +361,8 @@ resource "null_resource" "validator" { echo "Running OTEL standard cluster integration tests" cd ../../../.. - echo "Waiting 3 minutes for metrics to propagate..." - sleep 180 + echo "Waiting 5 minutes for metrics and logs to propagate..." + sleep 300 go test -tags integration -timeout 1h -v ${var.test_dir} \ -eksClusterName=${aws_eks_cluster.this.name} \ diff --git a/test/otel/standard/logs_app_test.go b/test/otel/standard/logs_app_test.go new file mode 100644 index 00000000..22cef68d --- /dev/null +++ b/test/otel/standard/logs_app_test.go @@ -0,0 +1,215 @@ +//go:build integration + +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: MIT + +package standard + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/aws/amazon-cloudwatch-agent-test/util/otelmetrics" +) + +// --------------------------------------------------------------------------- +// TestAppLogsExist — application logs are flowing to the expected log group. 
+// ---------------------------------------------------------------------------
+
+func TestAppLogsExist(t *testing.T) {
+	// The pipeline counts as healthy only if the query succeeds AND returns
+	// at least one record.
+	results, err := logQueryCache.Get(context.Background(), appLogGroup(), pipelineAppLogs)
+	require.NoError(t, err, "querying application logs")
+	require.NotEmpty(t, results, "no application logs found in %s", appLogGroup())
+}
+
+// ---------------------------------------------------------------------------
+// TestAppLogsResourceAttrs — application logs have all expected resource attrs.
+//
+// Core attributes (otelmetrics.ContainerResourceAttrs) must be present and
+// non-empty on every record. Workload attributes (otelmetrics.WorkloadAttrs
+// plus service.name) must be present on a strict majority of records.
+// ---------------------------------------------------------------------------
+
+func TestAppLogsResourceAttrs(t *testing.T) {
+	results, err := logQueryCache.Get(context.Background(), appLogGroup(), pipelineAppLogs)
+	require.NoError(t, err)
+	require.NotEmpty(t, results)
+
+	// Core attrs (node + pod + container) must be on ALL logs.
+	for _, attr := range otelmetrics.ContainerResourceAttrs {
+		t.Run(attr, func(t *testing.T) {
+			for _, r := range results {
+				v, ok := r.Resource[attr]
+				require.True(t, ok, "app log missing resource.attributes.%s", attr)
+				require.NotEmpty(t, v, "app log has empty resource.attributes.%s", attr)
+			}
+		})
+	}
+
+	// Workload attrs are present on most logs (pods with an owning workload),
+	// but bare pods / static pods won't have them, so require a strict
+	// majority (count > len/2) instead of every record. The list is built
+	// from the shared expectations so log and metric tests cannot drift apart.
+	workloadAttrs := append(append([]string(nil), otelmetrics.WorkloadAttrs...), "service.name")
+	for _, attr := range workloadAttrs {
+		t.Run(attr, func(t *testing.T) {
+			count := 0
+			for _, r := range results {
+				if v, ok := r.Resource[attr]; ok && v != "" {
+					count++
+				}
+			}
+			require.True(t, count > len(results)/2,
+				"app log resource.attributes.%s present on only %d/%d logs (expected majority)",
+				attr, count, len(results))
+		})
+	}
+}
+
+// ---------------------------------------------------------------------------
+// TestAppLogsScopeAttrs — application logs have correct scope attributes.
+// ---------------------------------------------------------------------------
+
+func TestAppLogsScopeAttrs(t *testing.T) {
+	records, err := logQueryCache.Get(context.Background(), appLogGroup(), pipelineAppLogs)
+	require.NoError(t, err)
+	require.NotEmpty(t, records)
+
+	// One subtest per expected scope attribute: every record must carry the
+	// attribute with exactly the expected value.
+	for name, expected := range otelmetrics.AppLogScopeAttrs {
+		t.Run(name, func(t *testing.T) {
+			for i := range records {
+				actual, present := records[i].Scope[name]
+				require.True(t, present, "app log missing scope.attributes.%s", name)
+				require.Equal(t, expected, actual, "app log scope.attributes.%s", name)
+			}
+		})
+	}
+}
+
+// ---------------------------------------------------------------------------
+// TestAppLogsClusterIdentity — k8s.cluster.name matches test cluster.
+// ---------------------------------------------------------------------------
+
+func TestAppLogsClusterIdentity(t *testing.T) {
+	records, err := logQueryCache.Get(context.Background(), appLogGroup(), pipelineAppLogs)
+	require.NoError(t, err)
+	require.NotEmpty(t, records)
+
+	// Every record must name the cluster under test.
+	for i := range records {
+		cluster := records[i].Resource["k8s.cluster.name"]
+		require.Equal(t, cfg.ClusterName, cluster,
+			"app log k8s.cluster.name mismatch")
+	}
+}
+
+// ---------------------------------------------------------------------------
+// TestAppLogsCloudProvider — cloud.provider == "aws".
+// ---------------------------------------------------------------------------
+
+func TestAppLogsCloudProvider(t *testing.T) {
+	records, err := logQueryCache.Get(context.Background(), appLogGroup(), pipelineAppLogs)
+	require.NoError(t, err)
+	require.NotEmpty(t, records)
+
+	for i := range records {
+		provider := records[i].Resource["cloud.provider"]
+		require.Equal(t, "aws", provider, "app log cloud.provider")
+	}
+}
+
+// ---------------------------------------------------------------------------
+// TestAppLogsHostType — host.type matches expected node group instance types.
+// ---------------------------------------------------------------------------
+
+func TestAppLogsHostType(t *testing.T) {
+	records, err := logQueryCache.Get(context.Background(), appLogGroup(), pipelineAppLogs)
+	require.NoError(t, err)
+	require.NotEmpty(t, records)
+
+	// Set of instance types declared by the cluster's node groups.
+	validTypes := make(map[string]bool, len(clusterNodeGroups))
+	for _, group := range clusterNodeGroups {
+		validTypes[group.InstanceType] = true
+	}
+
+	for i := range records {
+		hostType := records[i].Resource["host.type"]
+		// Only "true" is ever stored, so key presence equals validity.
+		_, known := validTypes[hostType]
+		require.True(t, known,
+			"app log host.type=%q not in expected node groups %v", hostType, validTypes)
+	}
+}
+
+// ---------------------------------------------------------------------------
+// TestAppLogsBody — log body is non-empty.
+// ---------------------------------------------------------------------------
+
+func TestAppLogsBody(t *testing.T) {
+	records, err := logQueryCache.Get(context.Background(), appLogGroup(), pipelineAppLogs)
+	require.NoError(t, err)
+	require.NotEmpty(t, records)
+
+	for i := range records {
+		require.NotEmpty(t, records[i].Body, "app log has empty body")
+	}
+}
+
+// ---------------------------------------------------------------------------
+// TestAppLogsRecordAttrs — log record attributes (log.iostream, log.file.path).
+// ---------------------------------------------------------------------------
+
+func TestAppLogsRecordAttrs(t *testing.T) {
+	records, err := logQueryCache.Get(context.Background(), appLogGroup(), pipelineAppLogs)
+	require.NoError(t, err)
+	require.NotEmpty(t, records)
+
+	// log.iostream is optional per record, but when present it must be
+	// stdout or stderr, and a strict majority of records must carry it.
+	t.Run("log.iostream", func(t *testing.T) {
+		seen := 0
+		for i := range records {
+			if stream, present := records[i].Attributes["log.iostream"]; present {
+				seen++
+				require.True(t, stream == "stdout" || stream == "stderr",
+					"app log attributes.log.iostream=%q, want stdout or stderr", stream)
+			}
+		}
+		require.True(t, seen > len(records)/2,
+			"log.iostream present on only %d/%d logs", seen, len(records))
+	})
+
+	// log.file.path is mandatory and must mention the container log directory.
+	t.Run("log.file.path", func(t *testing.T) {
+		for i := range records {
+			filePath, present := records[i].Attributes["log.file.path"]
+			require.True(t, present, "app log missing attributes.log.file.path")
+			require.Contains(t, filePath, "/var/log/containers/",
+				"app log log.file.path should be under /var/log/containers/")
+		}
+	})
+}
+
+// ---------------------------------------------------------------------------
+// TestAppLogsWorkloadDerivation — workload name/type set correctly.
+// ---------------------------------------------------------------------------
+
+func TestAppLogsWorkloadDerivation(t *testing.T) {
+	records, err := logQueryCache.Get(context.Background(), appLogGroup(), pipelineAppLogs)
+	require.NoError(t, err)
+	require.NotEmpty(t, records)
+
+	derived := 0 // records that carry a workload type
+	for i := range records {
+		attrs := records[i].Resource
+		kind := attrs["k8s.workload.type"]
+		if kind == "" {
+			// bare pod / static pod — no workload owner
+			continue
+		}
+		derived++
+
+		known := false
+		switch kind {
+		case "Deployment", "StatefulSet", "DaemonSet", "Job", "CronJob", "ReplicaSet":
+			known = true
+		}
+		require.True(t, known,
+			"app log k8s.workload.type=%q not a valid workload type", kind)
+
+		// service.name should match workload name
+		require.Equal(t, attrs["k8s.workload.name"], attrs["service.name"],
+			"app log service.name should equal k8s.workload.name")
+	}
+	require.True(t, derived > 0,
+		"no app logs have k8s.workload.type — workload derivation may have failed entirely")
+}
diff --git a/test/otel/standard/logs_cross_telemetry_test.go b/test/otel/standard/logs_cross_telemetry_test.go
new file mode 100644
index 00000000..c7a27888
--- /dev/null
+++ b/test/otel/standard/logs_cross_telemetry_test.go
@@ -0,0 +1,82 @@
+//go:build integration
+
+// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+// SPDX-License-Identifier: MIT
+
+package standard
+
+import (
+	"context"
+	"strings"
+	"testing"
+
+	"github.com/stretchr/testify/require"
+)
+
+// ---------------------------------------------------------------------------
+// TestCrossTelemetryHostID — for a given k8s.node.name, host.id in logs
+// matches host.id in metrics.
+// ---------------------------------------------------------------------------
+
+func TestCrossTelemetryHostID(t *testing.T) {
+	ctx := context.Background()
+
+	// Get host.id from metrics (node_exporter is node-scoped, always has host.id).
+	series, err := queryCache.Get(ctx, "node_cpu_seconds_total")
+	require.NoError(t, err)
+	require.NotEmpty(t, series)
+
+	// k8s.node.name → host.id, as reported by metrics. Series missing either
+	// attribute are ignored.
+	hostIDByNode := make(map[string]string)
+	for _, s := range series {
+		nodeName := s.Labels.Resource["k8s.node.name"]
+		id := s.Labels.Resource["host.id"]
+		if nodeName == "" || id == "" {
+			continue
+		}
+		hostIDByNode[nodeName] = id
+	}
+	require.NotEmpty(t, hostIDByNode, "no metrics with both k8s.node.name and host.id")
+
+	// Get app logs and verify host.id matches for the same node.
+	records, err := logQueryCache.Get(ctx, appLogGroup(), pipelineAppLogs)
+	require.NoError(t, err)
+	require.NotEmpty(t, records)
+
+	compared := 0
+	for i := range records {
+		nodeName := records[i].Resource["k8s.node.name"]
+		logHostID := records[i].Resource["host.id"]
+		if nodeName == "" || logHostID == "" {
+			continue
+		}
+		metricHostID, known := hostIDByNode[nodeName]
+		if !known {
+			// Node produced logs but no metrics in the window; nothing to compare.
+			continue
+		}
+		compared++
+		require.Equal(t, metricHostID, logHostID,
+			"host.id mismatch for node %s: metrics=%s logs=%s",
+			nodeName, metricHostID, logHostID)
+	}
+	require.True(t, compared > 0,
+		"no app logs matched a node from metrics — cannot verify cross-telemetry consistency")
+}
+
+// ---------------------------------------------------------------------------
+// TestCrossTelemetryCloudResourceID — cloud.resource_id on application logs
+// has the EKS cluster ARN shape: it starts with "arn:aws:eks:" and ends with
+// ":cluster/<cluster name>".
+// NOTE(review): only logs are inspected here; metrics are not queried, so no
+// logs-vs-metrics comparison is actually performed — confirm whether one is
+// intended.
+// ---------------------------------------------------------------------------
+
+func TestCrossTelemetryCloudResourceID(t *testing.T) {
+	const arnPrefix = "arn:aws:eks:"
+	arnSuffix := ":cluster/" + cfg.ClusterName
+
+	// Check app logs.
+	records, err := logQueryCache.Get(context.Background(), appLogGroup(), pipelineAppLogs)
+	require.NoError(t, err)
+	require.NotEmpty(t, records)
+
+	for i := range records {
+		resourceID := records[i].Resource["cloud.resource_id"]
+		require.True(t, strings.HasPrefix(resourceID, arnPrefix),
+			"app log cloud.resource_id should start with %q, got %q", arnPrefix, resourceID)
+		require.True(t, strings.HasSuffix(resourceID, arnSuffix),
+			"app log cloud.resource_id should end with %q, got %q", arnSuffix, resourceID)
+	}
+}
diff --git a/test/otel/standard/logs_setup_test.go b/test/otel/standard/logs_setup_test.go
new file mode 100644
index 00000000..f0cdbb66
--- /dev/null
+++ b/test/otel/standard/logs_setup_test.go
@@ -0,0 +1,50 @@
+//go:build integration
+
+// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+// SPDX-License-Identifier: MIT
+
+package standard
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	"github.com/aws/amazon-cloudwatch-agent-test/util/otellogs"
+)
+
+// appLogGroup returns the application log group for the cluster under test.
+// Log group names follow the convention set in the helm chart:
+// /aws/otel/containerinsights/{clusterName}/{pipeline}
+func appLogGroup() string {
+	const format = "/aws/otel/containerinsights/%s/application"
+	return fmt.Sprintf(format, cfg.ClusterName)
+}
+
+// Pipeline identifiers matching scope.attributes.cloudwatch.pipeline values.
+const (
+	pipelineAppLogs = "application-logs"
+)
+
+// Package-level handles shared by the log tests; the client and cache are
+// assigned by initLogsClient.
+var (
+	logsClient    *otellogs.OtelLogsClient
+	logQueryCache *otellogs.LogQueryCache
+	logsLookback  = 10 * time.Minute
+)
+
+// initLogsClient is called from TestMain (setup_test.go) after cfg is populated.
+func initLogsClient(ctx context.Context) error {
+	client, err := otellogs.NewClient(ctx, otellogs.LogsConfig{
+		Region:         cfg.Region,
+		ClusterName:    cfg.ClusterName,
+		AccountID:      cfg.AccountID,
+		LookbackWindow: logsLookback,
+	})
+	if err != nil {
+		return fmt.Errorf("creating logs client: %w", err)
+	}
+
+	// Publish the package-level handles used by the log tests.
+	logsClient = client
+	logQueryCache = otellogs.NewLogQueryCache(client, cfg.ClusterName, logsLookback)
+	return nil
+}
diff --git a/test/otel/standard/setup_test.go b/test/otel/standard/setup_test.go
index f597be16..c7c523a3 100644
--- a/test/otel/standard/setup_test.go
+++ b/test/otel/standard/setup_test.go
@@ -102,5 +102,11 @@ func TestMain(m *testing.M) {
 		otelmetrics.WithSourceRegistry(registry),
 	)
 
+	// Initialize logs client for OTLP log validation tests.
+	if err := initLogsClient(ctx); err != nil {
+		fmt.Fprintf(os.Stderr, "Logs client error: %v\n", err)
+		os.Exit(1)
+	}
+
 	os.Exit(m.Run())
 }
diff --git a/util/otellogs/client.go b/util/otellogs/client.go
new file mode 100644
index 00000000..a9783d5d
--- /dev/null
+++ b/util/otellogs/client.go
@@ -0,0 +1,149 @@
+// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+// SPDX-License-Identifier: MIT
+
+package otellogs
+
+import (
+	"context"
+	"fmt"
+	"log/slog"
+	"time"
+
+	"github.com/aws/aws-sdk-go-v2/aws"
+	awsconfig "github.com/aws/aws-sdk-go-v2/config"
+	"github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs"
+	"github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs/types"
+)
+
+// LogsConfig holds configuration for the OTEL logs integration test client.
+type LogsConfig struct {
+	Region      string
+	ClusterName string
+	AccountID   string
+	// LookbackWindow is how far back to query logs. NewClient does not read
+	// it; the 10-minute default is applied by NewLogQueryCache when its
+	// lookback argument is zero.
+	LookbackWindow time.Duration
+}
+
+// OtelLogsClient queries CloudWatch Logs Insights for OTLP-ingested logs.
+type OtelLogsClient struct {
+	cwl            *cloudwatchlogs.Client
+	region         string
+	pollInterval   time.Duration // delay between GetQueryResults polls
+	maxPollRetries int           // maximum number of polls per query
+}
+
+// NewClient creates an OtelLogsClient.
+//
+// AWS credentials and settings come from the default SDK configuration chain,
+// pinned to cfg.Region. Only cfg.Region is read here. Queries are polled every
+// 2 seconds for at most 30 attempts.
+func NewClient(ctx context.Context, cfg LogsConfig) (*OtelLogsClient, error) {
+	awsCfg, err := awsconfig.LoadDefaultConfig(ctx, awsconfig.WithRegion(cfg.Region))
+	if err != nil {
+		return nil, fmt.Errorf("loading AWS config: %w", err)
+	}
+	return &OtelLogsClient{
+		cwl:            cloudwatchlogs.NewFromConfig(awsCfg),
+		region:         cfg.Region,
+		pollInterval:   2 * time.Second,
+		maxPollRetries: 30,
+	}, nil
+}
+
+// Query executes a Logs Insights query against the given log group and returns
+// parsed LogResult entries. The query should select fields needed for validation.
+// timeRange specifies how far back to look (from now).
+//
+// The request and polling are delegated to QueryRaw so both entry points share
+// one code path; the raw rows are then routed field-by-field into LogResult
+// values by parseQueryResults.
+func (c *OtelLogsClient) Query(ctx context.Context, logGroup string, query string, timeRange time.Duration) ([]LogResult, error) {
+	slog.Debug("starting logs query", "logGroup", logGroup, "query", query)
+
+	rows, err := c.QueryRaw(ctx, logGroup, query, timeRange)
+	if err != nil {
+		return nil, err
+	}
+
+	results := parseQueryResults(rows)
+	slog.Debug("logs query returned", "count", len(results))
+	return results, nil
+}
+
+// QueryRaw executes a Logs Insights query and returns the raw result fields.
+func (c *OtelLogsClient) QueryRaw(ctx context.Context, logGroup string, query string, timeRange time.Duration) ([][]types.ResultField, error) {
+	end := time.Now()
+	start := end.Add(-timeRange)
+
+	output, err := c.cwl.StartQuery(ctx, &cloudwatchlogs.StartQueryInput{
+		LogGroupNames: []string{logGroup},
+		StartTime:     aws.Int64(start.Unix()),
+		EndTime:       aws.Int64(end.Unix()),
+		QueryString:   aws.String(query),
+		Limit:         aws.Int32(1000),
+	})
+	if err != nil {
+		return nil, fmt.Errorf("StartQuery: %w", err)
+	}
+
+	return c.pollRawResults(ctx, output.QueryId)
+}
+
+// pollResults waits for the query to finish and returns its rows parsed into
+// LogResult values.
+func (c *OtelLogsClient) pollResults(ctx context.Context, queryID *string) ([]LogResult, error) {
+	rows, err := c.pollRawResults(ctx, queryID)
+	if err != nil {
+		return nil, err
+	}
+	return parseQueryResults(rows), nil
+}
+
+// pollRawResults polls GetQueryResults until the query reaches a terminal
+// status, ctx is cancelled, or maxPollRetries polls have been made. It waits
+// pollInterval between polls, not before the first or after the last.
+func (c *OtelLogsClient) pollRawResults(ctx context.Context, queryID *string) ([][]types.ResultField, error) {
+	for attempt := 0; attempt < c.maxPollRetries; attempt++ {
+		if attempt > 0 {
+			if err := waitOrCancel(ctx, c.pollInterval); err != nil {
+				return nil, fmt.Errorf("waiting for query %s: %w", aws.ToString(queryID), err)
+			}
+		}
+
+		resp, err := c.cwl.GetQueryResults(ctx, &cloudwatchlogs.GetQueryResultsInput{
+			QueryId: queryID,
+		})
+		if err != nil {
+			return nil, fmt.Errorf("GetQueryResults: %w", err)
+		}
+
+		switch resp.Status {
+		case types.QueryStatusComplete:
+			return resp.Results, nil
+		case types.QueryStatusFailed, types.QueryStatusCancelled, types.QueryStatusTimeout:
+			return nil, fmt.Errorf("query ended with status: %s", resp.Status)
+		}
+		// Any other status means the query is still in progress: poll again.
+	}
+	return nil, fmt.Errorf("query did not complete after %d attempts", c.maxPollRetries)
+}
+
+// waitOrCancel blocks for d, returning early with ctx.Err() if ctx is done
+// first.
+func waitOrCancel(ctx context.Context, d time.Duration) error {
+	timer := time.NewTimer(d)
+	defer timer.Stop()
+
+	select {
+	case <-ctx.Done():
+		return ctx.Err()
+	case <-timer.C:
+		return nil
+	}
+}
+
+// parseQueryResults converts Logs Insights rows into LogResult values by
+// routing every non-nil field/value pair through LogResult.SetField.
+func parseQueryResults(rows [][]types.ResultField) []LogResult {
+	results := make([]LogResult, 0, len(rows))
+	for _, row := range rows {
+		lr := LogResult{
+			Resource:   make(map[string]string),
+			Scope:      make(map[string]string),
+			Attributes: make(map[string]string),
+		}
+		for _, field := range row {
+			if field.Field == nil || field.Value == nil {
+				continue
+			}
+			lr.SetField(*field.Field, *field.Value)
+		}
+		results = append(results, lr)
+	}
+	return results
+}
diff --git a/util/otellogs/models.go b/util/otellogs/models.go
new file mode 100644
index 00000000..eb3f246e
--- /dev/null
+++ b/util/otellogs/models.go
@@ -0,0 +1,72 @@
+// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+// SPDX-License-Identifier: MIT
+
+package otellogs
+
+import "strings"
+
+// LogResult represents a single OTLP log record as returned by CW Logs Insights.
+// Fields are populated from the Logs Insights query result fields, which use
+// dot-path notation matching the stored OTLP JSON structure:
+//
+//	resource.attributes.<key> → Resource map
+//	scope.attributes.<key>    → Scope map
+//	attributes.<key>          → Attributes map (log record attributes)
+//	body                      → Body
+//	severityText              → SeverityText
+//	severityNumber            → SeverityNumber
+//	timeUnixNano              → TimeUnixNano
+//	observedTimeUnixNano      → ObservedTimeUnixNano
+//	traceId                   → TraceID
+//	spanId                    → SpanID
+type LogResult struct {
+	Resource             map[string]string
+	Scope                map[string]string
+	Attributes           map[string]string
+	Body                 string
+	SeverityText         string
+	SeverityNumber       string
+	TimeUnixNano         string
+	ObservedTimeUnixNano string
+	TraceID              string
+	SpanID               string
+}
+
+// SetField routes a Logs Insights result field into the appropriate LogResult
+// field based on its dot-path prefix.
+func (lr *LogResult) SetField(field, value string) {
+	switch field {
+	case "body":
+		lr.Body = value
+	case "severityText":
+		lr.SeverityText = value
+	case "severityNumber":
+		lr.SeverityNumber = value
+	case "timeUnixNano":
+		lr.TimeUnixNano = value
+	case "observedTimeUnixNano":
+		lr.ObservedTimeUnixNano = value
+	case "traceId":
+		lr.TraceID = value
+	case "spanId":
+		lr.SpanID = value
+	default:
+		lr.setAttribute(field, value)
+	}
+}
+
+// setAttribute stores value in the attribute map selected by field's dot-path
+// prefix, creating the map on first use so a zero-value LogResult does not
+// panic on a nil-map write. Fields with no recognised prefix are ignored.
+func (lr *LogResult) setAttribute(field, value string) {
+	targets := []struct {
+		prefix string
+		attrs  *map[string]string
+	}{
+		{"resource.attributes.", &lr.Resource},
+		{"scope.attributes.", &lr.Scope},
+		{"attributes.", &lr.Attributes},
+	}
+	for _, target := range targets {
+		key, ok := strings.CutPrefix(field, target.prefix)
+		if !ok {
+			continue
+		}
+		if *target.attrs == nil {
+			*target.attrs = make(map[string]string)
+		}
+		(*target.attrs)[key] = value
+		return
+	}
+}
+
+// HasResource returns true if the resource attribute key exists and is non-empty.
+func (lr *LogResult) HasResource(key string) bool {
+	// A missing key reads as "", so one comparison covers both conditions.
+	return lr.Resource[key] != ""
+}
+
+// HasScope returns true if the scope attribute key exists and is non-empty.
+func (lr *LogResult) HasScope(key string) bool {
+	// A missing key reads as "", so one comparison covers both conditions.
+	return lr.Scope[key] != ""
+}
diff --git a/util/otellogs/query_cache.go b/util/otellogs/query_cache.go
new file mode 100644
index 00000000..7fe544f7
--- /dev/null
+++ b/util/otellogs/query_cache.go
@@ -0,0 +1,189 @@
+// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+// SPDX-License-Identifier: MIT
+
+package otellogs
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"sync"
+	"time"
+
+	"github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs/types"
+)
+
+// LogQueryCache provides session-scoped caching of Logs Insights queries.
+// Each unique (logGroup, pipeline) pair is queried once; subsequent calls
+// return cached results.
+type LogQueryCache struct {
+	mu       sync.RWMutex
+	cache    map[string]cacheEntry    // finished fetches, keyed by "logGroup|pipeline"
+	inflight map[string]chan struct{} // closed when the fetch for that key finishes
+	client   *OtelLogsClient
+	cluster  string
+	lookback time.Duration
+}
+
+// cacheEntry is the outcome of one fetch. Errors are cached as well, so a
+// failed query is not retried for the lifetime of the cache.
+type cacheEntry struct {
+	results []LogResult
+	err     error
+}
+
+// NewLogQueryCache creates a cache backed by the given client. A zero or
+// negative lookback falls back to 10 minutes.
+func NewLogQueryCache(client *OtelLogsClient, clusterName string, lookback time.Duration) *LogQueryCache {
+	if lookback <= 0 {
+		lookback = 10 * time.Minute
+	}
+	return &LogQueryCache{
+		cache:    make(map[string]cacheEntry),
+		inflight: make(map[string]chan struct{}),
+		client:   client,
+		cluster:  clusterName,
+		lookback: lookback,
+	}
+}
+
+// Get returns cached log results for a given log group filtered by pipeline.
+//
+// The first caller for a key runs the query; concurrent callers for the same
+// key wait for that fetch instead of issuing their own. A waiting caller
+// returns ctx.Err() if its context is done before the fetch finishes.
+func (c *LogQueryCache) Get(ctx context.Context, logGroup, pipeline string) ([]LogResult, error) {
+	key := logGroup + "|" + pipeline
+
+	// Fast path: the entry is already cached.
+	c.mu.RLock()
+	entry, ok := c.cache[key]
+	c.mu.RUnlock()
+	if ok {
+		return entry.results, entry.err
+	}
+
+	c.mu.Lock()
+	// Re-check under the write lock: another goroutine may have finished or
+	// started the fetch since the read lock was released.
+	if entry, ok := c.cache[key]; ok {
+		c.mu.Unlock()
+		return entry.results, entry.err
+	}
+	if pending, ok := c.inflight[key]; ok {
+		c.mu.Unlock()
+		return c.await(ctx, key, pending)
+	}
+	done := make(chan struct{})
+	c.inflight[key] = done
+	c.mu.Unlock()
+
+	entry = c.fetch(ctx, logGroup, pipeline)
+
+	c.mu.Lock()
+	c.cache[key] = entry
+	delete(c.inflight, key)
+	c.mu.Unlock()
+	close(done) // wake waiters only after the entry is visible in the cache
+
+	return entry.results, entry.err
+}
+
+// await blocks until the in-flight fetch for key signals completion, then
+// returns the cached entry. It gives up with ctx.Err() if ctx is done first.
+func (c *LogQueryCache) await(ctx context.Context, key string, done <-chan struct{}) ([]LogResult, error) {
+	select {
+	case <-done:
+	case <-ctx.Done():
+		return nil, ctx.Err()
+	}
+
+	c.mu.RLock()
+	entry := c.cache[key]
+	c.mu.RUnlock()
+	return entry.results, entry.err
+}
+
+// fetch runs the Logs Insights query for one (logGroup, pipeline) pair and
+// decodes every @message into a LogResult.
+//
+// Individual undecodable rows are skipped, but if rows came back and none of
+// them decoded, the first decode error is returned instead of an empty result
+// set, so a format mismatch is not mistaken for "no logs".
+func (c *LogQueryCache) fetch(ctx context.Context, logGroup, pipeline string) cacheEntry {
+	query := fmt.Sprintf(
+		"fields @message"+
+			" | filter resource.attributes.k8s.cluster.name = %q"+
+			" | filter scope.attributes.cloudwatch.pipeline = %q"+
+			" | limit 200",
+		c.cluster, pipeline,
+	)
+
+	rows, err := c.client.QueryRaw(ctx, logGroup, query, c.lookback)
+	if err != nil {
+		return cacheEntry{err: err}
+	}
+
+	results, failed, firstErr := decodeMessageRows(rows)
+	if len(results) == 0 && firstErr != nil {
+		return cacheEntry{err: fmt.Errorf(
+			"decoding log records from %s: all %d rows with @message failed to parse: %w",
+			logGroup, failed, firstErr)}
+	}
+	return cacheEntry{results: results}
+}
+
+// parseMessageRows extracts @message from each Logs Insights row and parses
+// the OTLP JSON into LogResult structs. Rows without @message and rows that
+// fail to parse are skipped.
+func parseMessageRows(rows [][]types.ResultField) []LogResult {
+	results, _, _ := decodeMessageRows(rows)
+	return results
+}
+
+// decodeMessageRows is parseMessageRows plus bookkeeping: it also returns how
+// many rows had an @message that failed to parse and the first such error
+// (nil when every message parsed).
+func decodeMessageRows(rows [][]types.ResultField) ([]LogResult, int, error) {
+	var (
+		results  []LogResult
+		failed   int
+		firstErr error
+	)
+	for _, row := range rows {
+		msg := extractField(row, "@message")
+		if msg == "" {
+			continue
+		}
+		lr, err := ParseOTLPLogJSON(msg)
+		if err != nil {
+			failed++
+			if firstErr == nil {
+				firstErr = err
+			}
+			continue
+		}
+		results = append(results, lr)
+	}
+	return results, failed, firstErr
+}
+
+// extractField returns the value of the named field in row, or "" when the
+// field is absent or has a nil value.
+func extractField(row []types.ResultField, name string) string {
+	for _, f := range row {
+		if f.Field != nil && *f.Field == name && f.Value != nil {
+			return *f.Value
+		}
+	}
+	return ""
+}
+
+// otlpValue is a JSON value flattened to a string: JSON strings are unquoted,
+// null becomes "", and any other value (number, bool, object, array) keeps
+// its raw JSON text. This lets one non-string attribute value or a structured
+// body decode instead of failing the whole record.
+type otlpValue string
+
+// UnmarshalJSON implements json.Unmarshaler.
+func (v *otlpValue) UnmarshalJSON(data []byte) error {
+	switch {
+	case len(data) == 0, string(data) == "null":
+		// Keep the zero value, as encoding/json does for a null string.
+		return nil
+	case data[0] == '"':
+		var s string
+		if err := json.Unmarshal(data, &s); err != nil {
+			return err
+		}
+		*v = otlpValue(s)
+	default:
+		*v = otlpValue(data)
+	}
+	return nil
+}
+
+// otlpLogJSON mirrors the JSON structure of an OTLP log record as stored in
+// CloudWatch Logs when ingested via the OTLP endpoint.
+// NOTE(review): the layout is assumed from this struct's JSON tags — confirm
+// against a real stored record.
+type otlpLogJSON struct {
+	Resource struct {
+		Attributes map[string]otlpValue `json:"attributes"`
+	} `json:"resource"`
+	Scope struct {
+		Attributes map[string]otlpValue `json:"attributes"`
+	} `json:"scope"`
+	Body                 otlpValue            `json:"body"`
+	Attributes           map[string]otlpValue `json:"attributes"`
+	SeverityText         string               `json:"severityText"`
+	SeverityNumber       json.Number          `json:"severityNumber"`
+	TimeUnixNano         json.Number          `json:"timeUnixNano"`
+	ObservedTimeUnixNano json.Number          `json:"observedTimeUnixNano"`
+	TraceID              string               `json:"traceId"`
+	SpanID               string               `json:"spanId"`
+}
+
+// toStringMap converts decoded attribute values to plain strings. The result
+// is never nil, so callers can index it without a nil check.
+func toStringMap(in map[string]otlpValue) map[string]string {
+	out := make(map[string]string, len(in))
+	for key, value := range in {
+		out[key] = string(value)
+	}
+	return out
+}
+
+// ParseOTLPLogJSON parses a raw OTLP log JSON string (as stored in CW Logs)
+// into a LogResult.
+//
+// String attribute values and string bodies are returned verbatim; non-string
+// values are returned as their raw JSON text. The Resource, Scope and
+// Attributes maps of the result are always non-nil. On malformed JSON a zero
+// LogResult and a wrapped error are returned.
+func ParseOTLPLogJSON(raw string) (LogResult, error) {
+	var j otlpLogJSON
+	if err := json.Unmarshal([]byte(raw), &j); err != nil {
+		return LogResult{}, fmt.Errorf("parsing OTLP log JSON: %w", err)
+	}
+
+	return LogResult{
+		Resource:             toStringMap(j.Resource.Attributes),
+		Scope:                toStringMap(j.Scope.Attributes),
+		Attributes:           toStringMap(j.Attributes),
+		Body:                 string(j.Body),
+		SeverityText:         j.SeverityText,
+		SeverityNumber:       j.SeverityNumber.String(),
+		TimeUnixNano:         j.TimeUnixNano.String(),
+		ObservedTimeUnixNano: j.ObservedTimeUnixNano.String(),
+		TraceID:              j.TraceID,
+		SpanID:               j.SpanID,
+	}, nil
+}
diff --git a/util/otelmetrics/expectations.go b/util/otelmetrics/expectations.go
new file mode 100644
index 00000000..b94f3397
--- /dev/null
+++ b/util/otelmetrics/expectations.go
@@ -0,0 +1,109 @@
+// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+// SPDX-License-Identifier: MIT
+
+package otelmetrics
+
+// Shared resource attribute expectations for OTEL Container Insights telemetry.
+// These define the canonical sets of resource attributes expected at each scope
+// level. Both metrics (PromQL) and logs (Logs Insights) tests should reference
+// these to ensure cross-telemetry consistency.
+
+// NodeResourceAttrs are resource attributes expected on all node-scoped
+// telemetry: node_exporter metrics, host logs, kubeletstats node metrics.
+// These come from resourcedetection/ec2 + k8sattributes/node + cluster name.
+var NodeResourceAttrs = []string{
+	"k8s.cluster.name",
+	"k8s.node.name",
+	"k8s.node.uid",
+	"host.id",
+	"host.type",
+	"host.name",
+	"host.image.id",
+	"cloud.provider",
+	"cloud.platform",
+	"cloud.region",
+	"cloud.availability_zone",
+	"cloud.account.id",
+	"cloud.resource_id",
+}
+
+// cloneAndAppend returns a new slice holding base followed by extra. base is
+// copied, so the derived sets never share a backing array with their parent.
+func cloneAndAppend(base []string, extra ...string) []string {
+	out := make([]string, 0, len(base)+len(extra))
+	out = append(out, base...)
+	return append(out, extra...)
+}
+
+// PodResourceAttrs are resource attributes expected on pod-scoped telemetry:
+// cadvisor metrics, kubeletstats pod metrics, application logs.
+// Superset of NodeResourceAttrs plus pod identity.
+var PodResourceAttrs = cloneAndAppend(NodeResourceAttrs,
+	"k8s.pod.name",
+	"k8s.namespace.name",
+	"k8s.pod.uid",
+)
+
+// ContainerResourceAttrs are resource attributes expected on container-scoped
+// telemetry: cadvisor container metrics, application logs.
+// Superset of PodResourceAttrs plus container identity.
+var ContainerResourceAttrs = cloneAndAppend(PodResourceAttrs,
+	"k8s.container.name",
+)
+
+// WorkloadAttrs are resource attributes set by workload derivation.
+// Present on pod/container-scoped telemetry when the pod belongs to a
+// known workload (Deployment, StatefulSet, DaemonSet, Job, CronJob, ReplicaSet).
+var WorkloadAttrs = []string{
+	"k8s.workload.name",
+	"k8s.workload.type",
+}
+
+// AppLogResourceAttrs are resource attributes expected on application logs.
+// Same as ContainerResourceAttrs + workload + service.name.
+var AppLogResourceAttrs = cloneAndAppend(
+	cloneAndAppend(ContainerResourceAttrs, WorkloadAttrs...),
+	"service.name",
+)
+
+// HostLogResourceAttrs are resource attributes expected on host logs.
+// Node-level only — no pod, container, or workload context.
+var HostLogResourceAttrs = []string{
+	"k8s.cluster.name",
+	"k8s.node.name",
+	"host.id",
+	"host.type",
+	"host.name",
+	"host.image.id",
+	"cloud.provider",
+	"cloud.platform",
+	"cloud.region",
+	"cloud.availability_zone",
+	"cloud.account.id",
+	"cloud.resource_id",
+}
+
+// HostLogAbsentAttrs are resource attributes that must NOT be present on host logs.
+var HostLogAbsentAttrs = []string{
+	"k8s.pod.name",
+	"k8s.pod.uid",
+	"k8s.namespace.name",
+	"k8s.container.name",
+	"k8s.workload.name",
+	"k8s.workload.type",
+	"service.name",
+}
+
+// ScopeAttrs are instrumentation scope attributes expected on all Container
+// Insights telemetry (both metrics and logs).
+var ScopeAttrs = map[string]string{
+	"cloudwatch.source":   "cloudwatch-agent",
+	"cloudwatch.solution": "k8s-otel-container-insights",
+}
+
+// scopeAttrsForPipeline returns a fresh copy of ScopeAttrs with the
+// cloudwatch.pipeline attribute set to pipeline.
+func scopeAttrsForPipeline(pipeline string) map[string]string {
+	attrs := make(map[string]string, len(ScopeAttrs)+1)
+	for key, value := range ScopeAttrs {
+		attrs[key] = value
+	}
+	attrs["cloudwatch.pipeline"] = pipeline
+	return attrs
+}
+
+// AppLogScopeAttrs are scope attributes specific to the application logs pipeline.
+var AppLogScopeAttrs = scopeAttrsForPipeline("application-logs")
+
+// HostLogScopeAttrs are scope attributes specific to the host logs pipeline.
+var HostLogScopeAttrs = scopeAttrsForPipeline("host-logs")