Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
298a443
feat: Add Agent Framework to A2A bridge support
Shubham-Kumar-2000 Nov 23, 2025
828b733
fix: Update references from agent_thread_storage to _agent_thread_sto…
Shubham-Kumar-2000 Nov 23, 2025
255ee71
Refactor A2A agent framework and improve code structure
Shubham-Kumar-2000 Nov 23, 2025
1e600dc
fix: Lint fix new line added
Shubham-Kumar-2000 Nov 23, 2025
c7a8b80
test: Add unit tests for AgentThreadStorage and InMemoryAgentThreadSt…
Shubham-Kumar-2000 Nov 23, 2025
db5bdf3
refactor: Update type hints to use new syntax for Union and List
Shubham-Kumar-2000 Nov 23, 2025
a3a5c48
fix: Validate RequestContext for context_id and message before execution
Shubham-Kumar-2000 Nov 23, 2025
4c43248
Refactor tests and remove A2aExecutionContext references
Shubham-Kumar-2000 Nov 23, 2025
cc2df5c
Refactor A2AExecutor tests and remove event adapter
Shubham-Kumar-2000 Nov 25, 2025
c732a05
refactor: Remove AgentThreadStorage and InMemoryAgentThreadStorage cl…
Shubham-Kumar-2000 Nov 30, 2025
64c1cfb
feat: A2AExecutor to have its own override able save and get threads …
Shubham-Kumar-2000 Nov 30, 2025
1f8a002
fix: linter bugs
Shubham-Kumar-2000 Nov 30, 2025
293f25d
Merge branch 'main' of https://github.com/Shubham-Kumar-2000/agent-fr…
Shubham-Kumar-2000 Nov 30, 2025
e3bb889
removed unnecessary changes form core package
Shubham-Kumar-2000 Nov 30, 2025
960bb22
new line added
Shubham-Kumar-2000 Nov 30, 2025
ed01679
Merge branch 'main' of https://github.com/microsoft/agent-framework i…
Shubham-Kumar-2000 Feb 26, 2026
f8ee82b
Refactor A2AExecutor tests and update imports
Shubham-Kumar-2000 Feb 27, 2026
f14e7c4
Update A2A documentation: enhance usage examples for A2AAgent and A2A…
Shubham-Kumar-2000 Feb 27, 2026
9d84791
Updated uv lock
Shubham-Kumar-2000 Feb 27, 2026
6e61d13
Fix metadata assertion in TestA2AExecutorHandleEvents and reorder loa…
Shubham-Kumar-2000 Feb 27, 2026
14b17d8
Update agent card configuration: add default input and output modes, …
Shubham-Kumar-2000 Feb 27, 2026
ae34dcf
Fix assertion for metadata in TestA2AExecutorHandleEvents
Shubham-Kumar-2000 Feb 27, 2026
4674494
Fix formatting issues in TestA2AExecutorExecute and TestA2AExecutorIn…
Shubham-Kumar-2000 Feb 27, 2026
7022f7f
Enhance A2AExecutor documentation with examples and clarify agent exe…
Shubham-Kumar-2000 Feb 27, 2026
1c7a196
Merge branch 'main' into agent_framework_to_a2a
Shubham-Kumar-2000 Feb 27, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 32 additions & 4 deletions python/packages/a2a/AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,48 @@ Agent-to-Agent (A2A) protocol support for inter-agent communication.

## Main Classes

- **`A2AAgent`** - Agent wrapper that exposes an agent via the A2A protocol
- **`A2AAgent`** - Client to connect to remote A2A-compliant agents.
- **`A2AExecutor`** - Bridge to expose local agents via the A2A protocol.

## Usage

### A2AAgent (Client)

```python
from agent_framework.a2a import A2AAgent

a2a_agent = A2AAgent(agent=my_agent)
# Connect to a remote A2A agent
a2a_agent = A2AAgent(url="http://remote-agent/a2a")
response = await a2a_agent.run("Hello!")
```

### A2AExecutor (Server/Bridge)

```python
from agent_framework.a2a import A2AExecutor
from a2a.server.apps import A2AStarletteApplication
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore

# Create an A2A executor for your agent
executor = A2AExecutor(agent=my_agent)

# Set up the request handler and server application
request_handler = DefaultRequestHandler(
agent_executor=executor,
task_store=InMemoryTaskStore(),
)

app = A2AStarletteApplication(
agent_card=my_agent_card,
http_handler=request_handler,
).build()
```

## Import Path

```python
from agent_framework.a2a import A2AAgent
from agent_framework.a2a import A2AAgent, A2AExecutor
# or directly:
from agent_framework_a2a import A2AAgent
from agent_framework_a2a import A2AAgent, A2AExecutor
```
38 changes: 38 additions & 0 deletions python/packages/a2a/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,49 @@ pip install agent-framework-a2a --pre

The A2A agent integration enables communication with remote A2A-compliant agents using the standardized A2A protocol. This allows your Agent Framework applications to connect to agents running on different platforms, languages, or services.

### A2AAgent (Client)

The `A2AAgent` class is a client that wraps an A2A Client to connect the Agent Framework with external A2A-compliant agents.

```python
from agent_framework.a2a import A2AAgent

# Connect to a remote A2A agent
a2a_agent = A2AAgent(url="http://remote-agent/a2a")
response = await a2a_agent.run("Hello!")
```

### A2AExecutor (Hosting)

The `A2AExecutor` class bridges local AI agents built with the `agent_framework` library to the A2A protocol, allowing them to be hosted and accessed by other A2A-compliant clients.

```python
from agent_framework.a2a import A2AExecutor
from a2a.server.apps import A2AStarletteApplication
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore

# Create an A2A executor for your agent
executor = A2AExecutor(agent=my_agent)

# Set up the request handler and server application
request_handler = DefaultRequestHandler(
agent_executor=executor,
task_store=InMemoryTaskStore(),
)

app = A2AStarletteApplication(
agent_card=my_agent_card,
http_handler=request_handler,
).build()
```

### Basic Usage Example

See the [A2A agent examples](../../samples/04-hosting/a2a/) which demonstrate:

- Connecting to remote A2A agents
- Hosting local agents via A2A protocol
- Sending messages and receiving responses
- Handling different content types (text, files, data)
- Streaming responses and real-time interaction
2 changes: 2 additions & 0 deletions python/packages/a2a/agent_framework_a2a/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import importlib.metadata

from ._agent import A2AAgent, A2AContinuationToken
from ._a2a_executor import A2AExecutor

try:
__version__ = importlib.metadata.version(__name__)
Expand All @@ -12,5 +13,6 @@
__all__ = [
"A2AAgent",
"A2AContinuationToken",
"A2AExecutor",
"__version__",
]
198 changes: 198 additions & 0 deletions python/packages/a2a/agent_framework_a2a/_a2a_executor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
# Copyright (c) Microsoft. All rights reserved.

from asyncio import CancelledError

from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.events import EventQueue
from a2a.server.tasks import TaskUpdater
from a2a.types import FilePart, FileWithBytes, FileWithUri, Part, TaskState, TextPart
from a2a.utils import new_task
from agent_framework import (
Agent,
Content,
Message,
WorkflowAgent,
)
from typing_extensions import override


class A2AExecutor(AgentExecutor):
"""Execute AI agents using the A2A (Agent-to-Agent) protocol.

The A2AExecutor bridges AI agents built with the agent_framework library and the A2A protocol,
enabling structured agent execution with event-driven communication. It handles execution
contexts, delegates history management to the agent's session, and converts agent
responses into A2A protocol events.

The executor supports executing an Agent or WorkflowAgent. It provides comprehensive
error handling with task status updates and supports various content types including text,
binary data, and URI-based content.

Example:
.. code-block:: python

from a2a.server.apps import A2AStarletteApplication
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore
from a2a.types import AgentCapabilities, AgentCard
from agent_framework.a2a import A2AExecutor
from agent_framework.openai import OpenAIResponsesClient

public_agent_card = AgentCard(
name='Food Agent',
description='A simple agent that provides food-related information.',
url='http://localhost:9999/',
version='1.0.0',
defaultInputModes=['text'],
defaultOutputModes=['text'],
capabilities=AgentCapabilities(streaming=True),
skills=[],
)

# Create an agent
agent = OpenAIResponsesClient().as_agent(
name="Food Agent",
instructions="A simple agent that provides food-related information.",
)

# Set up the A2A server with the A2AExecutor
request_handler = DefaultRequestHandler(
agent_executor=A2AExecutor(agent),
task_store=InMemoryTaskStore(),
)

server = A2AStarletteApplication(
agent_card=public_agent_card,
http_handler=request_handler,
).build()

Args:
agent: The AI agent to execute.
"""

def __init__(
self,
agent: Agent | WorkflowAgent
):
"""Initialize the A2AExecutor with the specified agent.

Example:
.. code-block:: python

# Set up the A2A server with the A2AExecutor
request_handler = DefaultRequestHandler(
agent_executor=A2AExecutor(agent),
task_store=InMemoryTaskStore(),
)

Args:
agent: The AI agent or workflow to execute.
"""
super().__init__()
self._agent: Agent | WorkflowAgent = agent

@override
async def cancel(self, context: RequestContext, event_queue: EventQueue) -> None:
"""Cancel agent execution.

Cancellation is primarily managed by the A2A protocol layer.
"""
pass

@override
async def execute(self, context: RequestContext, event_queue: EventQueue) -> None:
"""Execute the agent with the given context and event queue.

Orchestrates the agent execution process: sets up the agent session,
executes the agent, processes response messages, and handles errors with appropriate task status updates.
"""
if context.context_id is None:
raise ValueError("Context ID must be provided in the RequestContext")
if context.message is None:
raise ValueError("Message must be provided in the RequestContext")

query = context.get_user_input()
task = context.current_task

if not task:
task = new_task(context.message)
await event_queue.enqueue_event(task)

updater = TaskUpdater(event_queue, task.id, context.context_id)
await updater.submit()

try:
await updater.start_work()

session = self._agent.create_session(session_id=task.context_id)

# Create a Message from the user query
user_message = Message(role="user", contents=[Content.from_text(text=query)])

# Run the agent with the message list
response = await self._agent.run(user_message, session=session)

response_messages = response.messages
if not isinstance(response_messages, list):
response_messages = [response_messages]

for message in response_messages:
await self.handle_events(message, updater)

# Mark as complete
await updater.complete()
except CancelledError:
await updater.update_status(state=TaskState.canceled, final=True)
except Exception as e:
await updater.update_status(
state=TaskState.failed,
final=True,
message=updater.new_agent_message([Part(root=TextPart(text=str(e.args)))]),
)

async def handle_events(self, message: Message, updater: TaskUpdater) -> None:
"""Convert agent response messages to A2A protocol events and update task status.

Processes Message objects returned by the agent and converts them into A2A protocol format.
Handles text, data, and URI content. USER role messages are skipped.

Users can override this method in a subclass to implement custom transformations
from their agent's Message format to A2A protocol events.

Example:
.. code-block:: python

class CustomA2AExecutor(A2AExecutor):
async def handle_events(self, message: Message, updater: TaskUpdater) -> None:
# Custom logic to transform message contents
if message.role == "assistant" and message.contents:
parts = [Part(root=TextPart(text=f"Custom: {message.contents[0].text}"))]
await updater.update_status(
state=TaskState.working,
message=updater.new_agent_message(parts=parts),
)
else:
await super().handle_events(message, updater)
"""
if message.role == "user":
# This is a user message, we can ignore it in the context of task updates
return

parts: list[Part] = []
metadata = getattr(message, "additional_properties", None)

for content in message.contents:
if content.type == "text" and content.text:
parts.append(Part(root=TextPart(text=content.text)))
elif content.type == "data":
base64_str = content.uri
parts.append(Part(root=FilePart(file=FileWithBytes(bytes=base64_str, mime_type=content.media_type))))
elif content.type == "uri":
parts.append(Part(root=FilePart(file=FileWithUri(uri=content.uri, mime_type=content.media_type))))
# Silently skip unsupported content types

if parts:
await updater.update_status(
state=TaskState.working,
message=updater.new_agent_message(parts=parts, metadata=metadata),
)
Loading