Build and deploy event-driven AI agents with simple Python decorators. dispatchagents.ai — currently in public preview.
uv add git+ssh://git@github.com/datadog-labs/dispatch_agents_sdk.git@vX.Y.ZReplace vX.Y.Z with the latest tag:
git ls-remote --tags git@github.com:datadog-labs/dispatch_agents_sdk.git 'v*' \
| sort -t'/' -k3 -V | tail -1 | awk -F'/' '{print $3}'Handle events published to a topic. The payload is automatically validated against your Pydantic model before your handler is called.
from dispatch_agents import BasePayload, on
class OrderPayload(BasePayload):
order_id: str
customer_email: str
class OrderResult(BasePayload):
status: str
@on(topic="orders.process")
async def process_order(payload: OrderPayload) -> OrderResult:
await send_confirmation(payload.customer_email, payload.order_id)
return OrderResult(status="confirmed")Subscribe to GitHub events by passing a GitHub event class:
from dispatch_agents import on
from dispatch_agents.integrations.github.events import PullRequestOpened
@on(github_event=PullRequestOpened)
async def handle_pr(payload: PullRequestOpened) -> None:
print(f"PR opened: {payload.pull_request.title}")Register a function that other agents can call directly by name using invoke().
from dispatch_agents import BasePayload, fn
class WeatherRequest(BasePayload):
city: str
class WeatherResponse(BasePayload):
temperature: float
conditions: str
@fn()
async def get_weather(request: WeatherRequest) -> WeatherResponse:
data = await fetch_weather_api(request.city)
return WeatherResponse(temperature=data["temp"], conditions=data["sky"])Runs once before any events are processed. Use for connecting to databases, loading models, or initializing shared state.
from dispatch_agents import BasePayload, init, on
_client = None
@init
async def setup():
global _client
_client = await connect_to_database()
class QueryPayload(BasePayload):
user_id: str
@on(topic="users.query")
async def query_users(payload: QueryPayload) -> QueryResult:
rows = await _client.fetch(
"select * from users where id = $1",
payload.user_id,
)
return QueryResult(rows=rows)Publish an event without waiting for handlers to process it.
from dispatch_agents import emit_event, on
@on(topic="orders.process")
async def process_order(payload: OrderPayload) -> OrderResult:
await fulfill(payload)
# Notify other agents — returns immediately
await emit_event("notifications.send", {"email": payload.customer_email})
return OrderResult(status="fulfilled")Call a @fn-decorated function on another agent and wait for the result.
from dispatch_agents import invoke, on
@on(topic="reports.request")
async def handle_report(payload: ReportPayload) -> ReportResult:
weather = await invoke(
agent_name="weather-service",
function_name="get_weather",
payload={"city": payload.city},
response_model=WeatherResponse,
timeout=30.0,
)
return ReportResult(weather=weather.conditions)OSErrorand subclasses → automatic retry with exponential backoffValueErrorand subclasses → terminal failure, not retried
from dispatch_agents import on
@on(topic="data.fetch")
async def fetch(payload: FetchPayload) -> FetchResult:
if not payload.url.startswith("https://"):
raise ValueError("Only HTTPS URLs allowed") # not retried
# TimeoutError (an OSError subclass) will be retried automatically
response = await httpx_client.get(payload.url, timeout=10.0)
return FetchResult(data=response.json())All handler inputs and outputs must inherit from BasePayload:
from dispatch_agents import BasePayload
from typing import Optional
class TaskPayload(BasePayload):
task_id: str
priority: int = 1
tags: list[str] = []
metadata: Optional[dict[str, str]] = Nonehello_world— typed payloads, error handling, persistent storageweather-service+weather-assistant— inter-agentinvoke()conversational-agent— multi-turn LLM chat with memorydaily-digest— scheduled agentsdeep-research— parallel supervisor/researcher patternmulti-framework— Claude SDK, OpenAI Agents, LangGraph
GitHub Issues: github.com/datadog-labs/dispatch_agents_sdk/issues