Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions agentex/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,19 @@ services:
retries: 5
start_period: 5s

# OpenTelemetry Collector sidecar for local development.
# Receives OTLP telemetry from the agentex services and re-exports it
# (see otel/otel-collector-config.yaml for the pipeline definition).
agentex-otel-collector:
  container_name: agentex-otel-collector
  # Pin the contrib image so local behavior is reproducible.
  image: otel/opentelemetry-collector-contrib:0.101.0
  command: ["--config=/etc/otel-collector-config.yaml"]
  volumes:
    # Mount the collector config read-only so the container cannot mutate it.
    - ./otel/otel-collector-config.yaml:/etc/otel-collector-config.yaml:ro
  ports:
    - "4317:4317"   # OTLP gRPC
    - "4318:4318"   # OTLP HTTP
    - "8889:8889"   # Prometheus metrics endpoint
    - "13133:13133" # Health check endpoint
  networks:
    - agentex-network
agentex:
container_name: agentex
build:
Expand All @@ -152,6 +165,8 @@ services:
- ENABLE_HEALTH_CHECK_WORKFLOW=true
- AGENTEX_SERVER_TASK_QUEUE=agentex-server
- ALLOWED_ORIGINS=http://localhost:3000
- OTEL_EXPORTER_OTLP_ENDPOINT=http://agentex-otel-collector:4317
- OTEL_SERVICE_NAME=agentex-api
ports:
- "5003:5003"
volumes:
Expand All @@ -165,6 +180,8 @@ services:
condition: service_healthy
agentex-mongodb:
condition: service_healthy
agentex-otel-collector:
condition: service_started
networks:
- agentex-network
command: |
Expand Down
41 changes: 41 additions & 0 deletions agentex/otel/otel-collector-config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# OpenTelemetry Collector configuration for local development.
# Receives OTLP metrics and exports them to the console (debug) and to a
# Prometheus scrape endpoint.

receivers:
  otlp:
    protocols:
      grpc:
        endpoint: 0.0.0.0:4317
      http:
        endpoint: 0.0.0.0:4318

processors:
  # Batch to reduce export calls; flush every 10s or at 1024 data points.
  batch:
    timeout: 10s
    send_batch_size: 1024

exporters:
  # Log metrics to console for debugging. Sampling keeps the log volume
  # manageable: first 5 batches verbatim, then 1 in 200.
  debug:
    verbosity: detailed
    sampling_initial: 5
    sampling_thereafter: 200

  # Expose a Prometheus endpoint for scraping; metrics are prefixed with
  # the "agentex" namespace and dropped 5 minutes after their last update.
  prometheus:
    endpoint: 0.0.0.0:8889
    namespace: agentex
    send_timestamps: true
    metric_expiration: 5m

extensions:
  # Liveness endpoint used by docker-compose / orchestrators.
  health_check:
    endpoint: 0.0.0.0:13133

service:
  extensions: [health_check]
  pipelines:
    metrics:
      receivers: [otlp]
      processors: [batch]
      exporters: [debug, prometheus]
3 changes: 3 additions & 0 deletions agentex/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ dependencies = [
"ddtrace>=3.13.0",
"json_log_formatter>=1.1.1",
"datadog>=0.52.1",
"opentelemetry-api>=1.28.0",
"opentelemetry-sdk>=1.28.0",
"opentelemetry-exporter-otlp>=1.28.0",
]

[dependency-groups]
Expand Down
19 changes: 18 additions & 1 deletion agentex/src/api/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,14 @@
tasks,
)
from src.config import dependencies
from src.config.dependencies import resolve_environment_variable_dependency
from src.config.dependencies import (
GlobalDependencies,
resolve_environment_variable_dependency,
)
from src.config.environment_variables import EnvVarKeys
from src.domain.exceptions import GenericException
from src.utils.logging import make_logger
from src.utils.otel_metrics import init_otel_metrics, shutdown_otel_metrics

logger = make_logger(__name__)

Expand Down Expand Up @@ -66,14 +70,27 @@ def __init__(

@asynccontextmanager
async def lifespan(_: FastAPI):
    """FastAPI lifespan hook.

    Everything before ``yield`` runs at application startup, everything
    after runs at shutdown. Ordering matters on both sides:

    Startup order:
      1. OTel metrics provider — must exist before dependency startup so
         that instruments registered there attach to the real provider.
      2. Global dependencies (DB engines, clients, etc.).
      3. statsd configuration.
      4. PostgreSQL pool-metrics background collection, if a collector
         was created during dependency startup.

    Shutdown order (reverse-ish):
      1. HTTP clients first, so in-flight requests are drained before
         the resources they depend on go away.
      2. Async then sync dependency shutdown.
      3. OTel metrics last, flushing any remaining data points.
    """
    # Initialize OpenTelemetry metrics first (before dependencies register instruments)
    init_otel_metrics()

    await dependencies.startup_global_dependencies()
    configure_statsd()

    # Start PostgreSQL metrics collection.
    # NOTE(review): assumes GlobalDependencies() returns the same (singleton)
    # instance that startup_global_dependencies() populated — confirm in
    # src/config/dependencies.py.
    global_deps = GlobalDependencies()
    if global_deps.postgres_metrics_collector:
        await global_deps.postgres_metrics_collector.start_collection()

    yield

    # Clean up HTTP clients before other shutdown tasks
    await HttpxGateway.close_clients()
    await dependencies.async_shutdown()
    dependencies.shutdown()

    # Shutdown OTel metrics (flushes remaining data)
    shutdown_otel_metrics()

fastapi_app = FastAPI(
title="Agentex API",
Expand Down
44 changes: 44 additions & 0 deletions agentex/src/config/dependencies.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

from src.config.environment_variables import Environment, EnvironmentVariables
from src.utils.database import async_db_engine_creator
from src.utils.db_metrics import PostgresMetricsCollector
from src.utils.logging import make_logger

logger = make_logger(__name__)
Expand Down Expand Up @@ -47,6 +48,7 @@ def __init__(self):
self.httpx_client: httpx.AsyncClient | None = None
self.redis_pool: redis.ConnectionPool | None = None
self.database_async_read_only_engine: AsyncEngine | None = None
self.postgres_metrics_collector: PostgresMetricsCollector | None = None
self._loaded = False

async def create_temporal_client(self):
Expand Down Expand Up @@ -192,10 +194,46 @@ async def load(self):
pool_recycle=3600,
)

# Initialize PostgreSQL metrics collector
self.postgres_metrics_collector = PostgresMetricsCollector()
environment = self.environment_variables.ENVIRONMENT
service_name = os.environ.get("OTEL_SERVICE_NAME", "agentex")

if self.database_async_read_write_engine:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need each of these engines?

self.postgres_metrics_collector.register_engine(
engine=self.database_async_read_write_engine,
pool_name="main",
db_url=self.environment_variables.DATABASE_URL,
environment=environment,
service_name=service_name,
)

if self.database_async_middleware_read_write_engine:
self.postgres_metrics_collector.register_engine(
engine=self.database_async_middleware_read_write_engine,
pool_name="middleware",
db_url=self.environment_variables.DATABASE_URL,
environment=environment,
service_name=service_name,
)

if self.database_async_read_only_engine:
self.postgres_metrics_collector.register_engine(
engine=self.database_async_read_only_engine,
pool_name="readonly",
db_url=read_only_db_url,
environment=environment,
service_name=service_name,
)

self._loaded = True

async def force_reload(self):
"""Force reload all dependencies with fresh environment variables"""
# Stop metrics collection
if self.postgres_metrics_collector:
await self.postgres_metrics_collector.stop_collection()

# Clear existing connections
if self.database_async_read_write_engine:
await self.database_async_read_write_engine.dispose()
Expand All @@ -215,6 +253,7 @@ async def force_reload(self):
self.docker_client = None
self.mongodb_client = None
self.mongodb_database = None
self.postgres_metrics_collector = None

# Reload with fresh environment variables
EnvironmentVariables.clear_cache()
Expand All @@ -232,6 +271,11 @@ def shutdown():

async def async_shutdown():
global_dependencies = GlobalDependencies()

# Stop PostgreSQL metrics collection
if global_dependencies.postgres_metrics_collector:
await global_dependencies.postgres_metrics_collector.stop_collection()

run_concurrently = []
if global_dependencies.database_async_read_only_engine:
run_concurrently.append(
Expand Down
Loading
Loading