Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions terraform/eks/daemon/otel/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ resource "helm_release" "aws_observability" {
{ name = "clusterName", value = aws_eks_cluster.this.name },
{ name = "region", value = var.region },
{ name = "otelContainerInsights.enabled", value = "true" },
{ name = "otelContainerInsights.logs.enabled", value = "true" }
]

depends_on = [
Expand Down Expand Up @@ -360,8 +361,8 @@ resource "null_resource" "validator" {
echo "Running OTEL standard cluster integration tests"
cd ../../../..

echo "Waiting 3 minutes for metrics to propagate..."
sleep 180
echo "Waiting 5 minutes for metrics and logs to propagate..."
sleep 300

go test -tags integration -timeout 1h -v ${var.test_dir} \
-eksClusterName=${aws_eks_cluster.this.name} \
Expand Down
215 changes: 215 additions & 0 deletions test/otel/standard/logs_app_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
//go:build integration

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT

package standard

import (
"context"
"testing"

"github.com/stretchr/testify/require"

"github.com/aws/amazon-cloudwatch-agent-test/util/otelmetrics"
)

// ---------------------------------------------------------------------------
// TestAppLogsExist — application logs are flowing to the expected log group.
// ---------------------------------------------------------------------------

func TestAppLogsExist(t *testing.T) {
results, err := logQueryCache.Get(context.Background(), appLogGroup(), pipelineAppLogs)
require.NoError(t, err, "querying application logs")
require.NotEmpty(t, results, "no application logs found in %s", appLogGroup())
}

// ---------------------------------------------------------------------------
// TestAppLogsResourceAttrs — application logs have all expected resource attrs.
// ---------------------------------------------------------------------------

// TestAppLogsResourceAttrs validates resource attributes on application logs.
// Every attribute listed in otelmetrics.ContainerResourceAttrs must be present
// and non-empty on every record; the workload-derived attributes only have to
// be populated on a strict majority (more than half) of the records.
func TestAppLogsResourceAttrs(t *testing.T) {
	records, err := logQueryCache.Get(context.Background(), appLogGroup(), pipelineAppLogs)
	require.NoError(t, err)
	require.NotEmpty(t, records)

	// Mandatory attributes: one subtest per attribute, checked on each record.
	for _, name := range otelmetrics.ContainerResourceAttrs {
		t.Run(name, func(t *testing.T) {
			for _, rec := range records {
				value, present := rec.Resource[name]
				require.True(t, present, "app log missing resource.attributes.%s", name)
				require.NotEmpty(t, value, "app log has empty resource.attributes.%s", name)
			}
		})
	}

	// Pods without an owning workload (bare pods, static pods) carry no
	// workload attributes, so these only need to appear on more than half of
	// the records. quorum is the smallest count that is a strict majority.
	quorum := len(records)/2 + 1
	for _, name := range []string{"k8s.workload.name", "k8s.workload.type", "service.name"} {
		t.Run(name, func(t *testing.T) {
			populated := 0
			for _, rec := range records {
				if rec.Resource[name] != "" {
					populated++
				}
			}
			require.True(t, populated >= quorum,
				"app log resource.attributes.%s present on only %d/%d logs (expected majority)",
				name, populated, len(records))
		})
	}
}

// ---------------------------------------------------------------------------
// TestAppLogsScopeAttrs — application logs have correct scope attributes.
// ---------------------------------------------------------------------------

// TestAppLogsScopeAttrs runs one subtest per entry in
// otelmetrics.AppLogScopeAttrs and requires every application log record to
// carry that scope attribute with exactly the expected value.
func TestAppLogsScopeAttrs(t *testing.T) {
	records, err := logQueryCache.Get(context.Background(), appLogGroup(), pipelineAppLogs)
	require.NoError(t, err)
	require.NotEmpty(t, records)

	for attr, expected := range otelmetrics.AppLogScopeAttrs {
		t.Run(attr, func(t *testing.T) {
			for _, rec := range records {
				actual, present := rec.Scope[attr]
				require.True(t, present, "app log missing scope.attributes.%s", attr)
				require.Equal(t, expected, actual, "app log scope.attributes.%s", attr)
			}
		})
	}
}

// ---------------------------------------------------------------------------
// TestAppLogsClusterIdentity — k8s.cluster.name matches test cluster.
// ---------------------------------------------------------------------------

// TestAppLogsClusterIdentity requires the k8s.cluster.name resource attribute
// of every application log record to equal cfg.ClusterName.
func TestAppLogsClusterIdentity(t *testing.T) {
	records, err := logQueryCache.Get(context.Background(), appLogGroup(), pipelineAppLogs)
	require.NoError(t, err)
	require.NotEmpty(t, records)

	wantCluster := cfg.ClusterName
	for _, rec := range records {
		gotCluster := rec.Resource["k8s.cluster.name"]
		require.Equal(t, wantCluster, gotCluster, "app log k8s.cluster.name mismatch")
	}
}

// ---------------------------------------------------------------------------
// TestAppLogsCloudProvider — cloud.provider == "aws".
// ---------------------------------------------------------------------------

// TestAppLogsCloudProvider requires the cloud.provider resource attribute of
// every application log record to be "aws".
func TestAppLogsCloudProvider(t *testing.T) {
	records, err := logQueryCache.Get(context.Background(), appLogGroup(), pipelineAppLogs)
	require.NoError(t, err)
	require.NotEmpty(t, records)

	for _, rec := range records {
		provider := rec.Resource["cloud.provider"]
		require.Equal(t, "aws", provider, "app log cloud.provider")
	}
}

// ---------------------------------------------------------------------------
// TestAppLogsHostType — host.type matches expected node group instance types.
// ---------------------------------------------------------------------------

func TestAppLogsHostType(t *testing.T) {
validTypes := make(map[string]bool, len(clusterNodeGroups))
for _, ng := range clusterNodeGroups {
validTypes[ng.InstanceType] = true
}

results, err := logQueryCache.Get(context.Background(), appLogGroup(), pipelineAppLogs)
require.NoError(t, err)
require.NotEmpty(t, results)

for _, r := range results {
ht := r.Resource["host.type"]
require.True(t, validTypes[ht],
"app log host.type=%q not in expected node groups %v", ht, validTypes)
}
}

// ---------------------------------------------------------------------------
// TestAppLogsBody — log body is non-empty.
// ---------------------------------------------------------------------------

// TestAppLogsBody requires every application log record to have a non-empty
// Body.
func TestAppLogsBody(t *testing.T) {
	records, err := logQueryCache.Get(context.Background(), appLogGroup(), pipelineAppLogs)
	require.NoError(t, err)
	require.NotEmpty(t, records)

	for _, rec := range records {
		body := rec.Body
		require.NotEmpty(t, body, "app log has empty body")
	}
}

// ---------------------------------------------------------------------------
// TestAppLogsRecordAttrs — log record attributes (log.iostream, log.file.path).
// ---------------------------------------------------------------------------

// TestAppLogsRecordAttrs validates per-record attributes on application logs
// in two subtests:
//   - log.iostream: when present it must be "stdout" or "stderr", and it must
//     be present on more than half of the records;
//   - log.file.path: must be present on every record and contain
//     "/var/log/containers/".
func TestAppLogsRecordAttrs(t *testing.T) {
	records, err := logQueryCache.Get(context.Background(), appLogGroup(), pipelineAppLogs)
	require.NoError(t, err)
	require.NotEmpty(t, records)

	t.Run("log.iostream", func(t *testing.T) {
		seen := 0
		for _, rec := range records {
			stream, present := rec.Attributes["log.iostream"]
			if present {
				seen++
				recognized := stream == "stdout" || stream == "stderr"
				require.True(t, recognized,
					"app log attributes.log.iostream=%q, want stdout or stderr", stream)
			}
		}
		// Smallest count that is a strict majority of the records.
		quorum := len(records)/2 + 1
		require.True(t, seen >= quorum,
			"log.iostream present on only %d/%d logs", seen, len(records))
	})

	t.Run("log.file.path", func(t *testing.T) {
		const containerLogDir = "/var/log/containers/"
		for _, rec := range records {
			filePath, present := rec.Attributes["log.file.path"]
			require.True(t, present, "app log missing attributes.log.file.path")
			require.Contains(t, filePath, containerLogDir,
				"app log log.file.path should be under /var/log/containers/")
		}
	})
}

// ---------------------------------------------------------------------------
// TestAppLogsWorkloadDerivation — workload name/type set correctly.
// ---------------------------------------------------------------------------

// TestAppLogsWorkloadDerivation checks the workload attributes on application
// logs. Records without k8s.workload.type are skipped (bare or static pods have
// no workload owner). For every other record:
//   - k8s.workload.type must be one of the known workload kinds;
//   - k8s.workload.name must be non-empty;
//   - service.name must equal k8s.workload.name.
//
// At least one record must carry a workload type, otherwise the test fails
// because nothing was actually verified.
func TestAppLogsWorkloadDerivation(t *testing.T) {
	results, err := logQueryCache.Get(context.Background(), appLogGroup(), pipelineAppLogs)
	require.NoError(t, err)
	require.NotEmpty(t, results)

	validTypes := map[string]bool{
		"Deployment": true, "StatefulSet": true, "DaemonSet": true,
		"Job": true, "CronJob": true, "ReplicaSet": true,
	}

	withWorkload := 0
	for _, r := range results {
		wType := r.Resource["k8s.workload.type"]
		if wType == "" {
			continue // bare pod / static pod — no workload owner
		}
		withWorkload++
		require.True(t, validTypes[wType],
			"app log k8s.workload.type=%q not a valid workload type", wType)

		// Without this check the equality below would pass vacuously when both
		// k8s.workload.name and service.name are missing from the record.
		wName := r.Resource["k8s.workload.name"]
		require.NotEmpty(t, wName,
			"app log has k8s.workload.type=%q but empty k8s.workload.name", wType)

		// service.name should match workload name
		require.Equal(t, wName, r.Resource["service.name"],
			"app log service.name should equal k8s.workload.name")
	}
	require.True(t, withWorkload > 0,
		"no app logs have k8s.workload.type — workload derivation may have failed entirely")
}
82 changes: 82 additions & 0 deletions test/otel/standard/logs_cross_telemetry_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
//go:build integration

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT

package standard

import (
"context"
"strings"
"testing"

"github.com/stretchr/testify/require"
)

// ---------------------------------------------------------------------------
// TestCrossTelemetryHostID — for a given k8s.node.name, host.id in logs
// matches host.id in metrics.
// ---------------------------------------------------------------------------

// TestCrossTelemetryHostID cross-checks host.id between metrics and logs. It
// builds a k8s.node.name → host.id table from the node_cpu_seconds_total metric
// results, then requires every application log record whose node appears in
// that table to report the same host.id. Records missing either attribute, or
// whose node is unknown to the metrics, are skipped; the test fails if no
// record could be compared at all.
func TestCrossTelemetryHostID(t *testing.T) {
	ctx := context.Background()

	series, err := queryCache.Get(ctx, "node_cpu_seconds_total")
	require.NoError(t, err)
	require.NotEmpty(t, series)

	// Index the metric side by node name; only complete pairs are kept.
	hostIDByNode := map[string]string{}
	for _, s := range series {
		attrs := s.Labels.Resource
		nodeName, id := attrs["k8s.node.name"], attrs["host.id"]
		if nodeName == "" || id == "" {
			continue
		}
		hostIDByNode[nodeName] = id
	}
	require.NotEmpty(t, hostIDByNode, "no metrics with both k8s.node.name and host.id")

	records, err := logQueryCache.Get(ctx, appLogGroup(), pipelineAppLogs)
	require.NoError(t, err)
	require.NotEmpty(t, records)

	compared := 0
	for _, rec := range records {
		nodeName, logID := rec.Resource["k8s.node.name"], rec.Resource["host.id"]
		if nodeName == "" || logID == "" {
			continue
		}
		metricID, known := hostIDByNode[nodeName]
		if !known {
			continue
		}
		compared++
		require.Equal(t, metricID, logID,
			"host.id mismatch for node %s: metrics=%s logs=%s",
			nodeName, metricID, logID)
	}
	require.True(t, compared > 0,
		"no app logs matched a node from metrics — cannot verify cross-telemetry consistency")
}

// ---------------------------------------------------------------------------
// TestCrossTelemetryCloudResourceID — cloud.resource_id on app logs has the
// EKS cluster ARN shape for the test cluster (only logs are checked here).
// ---------------------------------------------------------------------------

// TestCrossTelemetryCloudResourceID requires the cloud.resource_id resource
// attribute of every application log record to start with "arn:aws:eks:" and
// end with ":cluster/" followed by cfg.ClusterName.
func TestCrossTelemetryCloudResourceID(t *testing.T) {
	const wantPrefix = "arn:aws:eks:"
	wantSuffix := ":cluster/" + cfg.ClusterName

	records, err := logQueryCache.Get(context.Background(), appLogGroup(), pipelineAppLogs)
	require.NoError(t, err)
	require.NotEmpty(t, records)

	for _, rec := range records {
		resourceID := rec.Resource["cloud.resource_id"]

		hasPrefix := strings.HasPrefix(resourceID, wantPrefix)
		require.True(t, hasPrefix,
			"app log cloud.resource_id should start with %q, got %q", wantPrefix, resourceID)

		hasSuffix := strings.HasSuffix(resourceID, wantSuffix)
		require.True(t, hasSuffix,
			"app log cloud.resource_id should end with %q, got %q", wantSuffix, resourceID)
	}
}
50 changes: 50 additions & 0 deletions test/otel/standard/logs_setup_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
//go:build integration

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT

package standard

import (
"context"
"fmt"
"time"

"github.com/aws/amazon-cloudwatch-agent-test/util/otellogs"
)

// appLogGroup returns the name of the application log group for the cluster
// under test. Log group names follow the convention set in the helm chart:
//
//	/aws/otel/containerinsights/{clusterName}/{pipeline}
func appLogGroup() string {
	return "/aws/otel/containerinsights/" + cfg.ClusterName + "/application"
}

// pipelineAppLogs is the pipeline identifier for application logs; it matches
// the scope.attributes.cloudwatch.pipeline value.
const pipelineAppLogs = "application-logs"

// Package-level state shared by the log validation tests.
var (
	// logsLookback is the lookback window handed to both the logs client
	// configuration and the query cache.
	logsLookback = time.Minute * 10

	// logsClient is assigned by initLogsClient.
	logsClient *otellogs.OtelLogsClient

	// logQueryCache is assigned by initLogsClient and used by the tests to
	// fetch log records.
	logQueryCache *otellogs.LogQueryCache
)

// initLogsClient creates the package-level logs client and log query cache
// from cfg. It is called from TestMain (setup_test.go) after cfg is populated.
// On failure it returns the client-creation error wrapped with context and
// leaves logQueryCache unset.
func initLogsClient(ctx context.Context) error {
	client, err := otellogs.NewClient(ctx, otellogs.LogsConfig{
		Region:         cfg.Region,
		ClusterName:    cfg.ClusterName,
		AccountID:      cfg.AccountID,
		LookbackWindow: logsLookback,
	})
	// The package-level client is assigned whatever NewClient returned, even
	// when err is non-nil.
	logsClient = client
	if err != nil {
		return fmt.Errorf("creating logs client: %w", err)
	}

	logQueryCache = otellogs.NewLogQueryCache(client, cfg.ClusterName, logsLookback)
	return nil
}
6 changes: 6 additions & 0 deletions test/otel/standard/setup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,5 +102,11 @@ func TestMain(m *testing.M) {
otelmetrics.WithSourceRegistry(registry),
)

// Initialize logs client for OTLP log validation tests.
if err := initLogsClient(ctx); err != nil {
fmt.Fprintf(os.Stderr, "Logs client error: %v\n", err)
os.Exit(1)
}

os.Exit(m.Run())
}
Loading
Loading