perf: dedicated thread pool for conversation execution #3169
`EventService.run()` and the fire-and-forget run path in `send_message()`
dispatch `conversation.run()` via `loop.run_in_executor(None, ...)`, which
uses asyncio's default executor (`min(32, cpu_count + 4)` threads shared
across all async operations). Long-running agent step loops starve
short I/O operations and silently queue when the pool is exhausted.
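For reference, the cap on asyncio's default executor mentioned above can be computed directly (this is CPython's `ThreadPoolExecutor` default worker count since 3.8):

```python
import os

# asyncio's default executor is a ThreadPoolExecutor created lazily on the
# first loop.run_in_executor(None, ...) call; with max_workers unset, its
# size defaults to min(32, os.cpu_count() + 4), and every offloaded call
# in the process shares that one pool.
default_workers = min(32, (os.cpu_count() or 1) + 4)
print(default_workers)
```

On an 8-core machine, for example, the pool has 12 threads, so ten long-running conversation runs would leave only two threads for every other I/O offload.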
Create a dedicated `ThreadPoolExecutor` for conversation execution:
- Config gains `max_concurrent_runs` (default 10, configurable)
- `ConversationService` creates a shared executor in `__aenter__()`,
  shuts it down in `__aexit__()`, passes config via `get_instance()`
- Each `EventService` receives the shared executor via `_run_executor`;
  `conversation.run()` dispatches to it instead of the default pool
- Short I/O operations (`search_events`, `get_state`, etc.) continue
  using the default executor, preventing starvation
When `_run_executor` is `None` (standalone `EventService`), `run_in_executor`
falls back to the default pool for backward compatibility.
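The dispatch-and-fallback pattern described above can be sketched as follows (a minimal sketch, not the PR's actual code; the class and attribute names follow the description, the rest is illustrative):

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

class EventService:
    def __init__(self, run_executor=None):
        # Shared executor injected by ConversationService; None when standalone.
        self._run_executor = run_executor

    async def run(self, conversation_run):
        loop = asyncio.get_running_loop()
        # Passing None falls back to asyncio's default executor,
        # preserving the old behavior for standalone EventServices.
        return await loop.run_in_executor(self._run_executor, conversation_run)

async def main():
    # ConversationService.__aenter__() creates the shared pool.
    executor = ThreadPoolExecutor(
        max_workers=10,  # config.max_concurrent_runs
        thread_name_prefix="conversation-run",
    )
    try:
        service = EventService(run_executor=executor)
        result = await service.run(lambda: "step-loop finished")
        print(result)
    finally:
        executor.shutdown(wait=True)  # ConversationService.__aexit__()

asyncio.run(main())
```

Because the pool is created and torn down by the service's async context manager, each `EventService` only ever borrows it and never owns the lifecycle.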
Fixes #3143
Co-authored-by: openhands <openhands@all-hands.dev>
Python API breakage checks — ✅ PASSED
REST API breakage checks (OpenAPI) — ✅ PASSED
all-hands-bot left a comment
Taste Rating: 🟢 Good taste - Clean isolation of long-running operations
VERDICT: ✅ Worth merging
KEY INSIGHT: Dedicated executor pool prevents conversation step threads from starving short I/O operations - clean resource isolation with proper backward compatibility.
[RISK ASSESSMENT]
- [Overall PR]
⚠️ Risk Assessment: 🟢 LOW
Infrastructure change for thread pool isolation. Does not modify agent behavior, prompts, or tool execution logic. Conservative default (10 concurrent runs) with proper cleanup and backward compatibility. All 872 tests pass.
all-hands-bot left a comment
✅ QA Report: PASS
Verified the dedicated thread pool implementation works as designed. All functional tests pass.
Does this PR achieve its stated goal?
Yes. The PR successfully creates a dedicated thread pool for conversation execution with configurable size (default 10 workers), isolating long-running agent step loops from short I/O operations. Testing confirms: (1) the config field exists and works, (2) the dedicated executor is created with the correct thread count and shared across EventServices, (3) conversation.run() calls execute in the dedicated pool (verified via thread name prefix "conversation-run"), and (4) cleanup works properly on shutdown. Backward compatibility is maintained — when _run_executor is None, the code falls back to the default asyncio executor.
| Phase | Result |
|---|---|
| Environment Setup | ✅ Dependencies installed successfully |
| CI Status | ✅ All checks passing (pre-commit, SDK tests, tools tests, REST API, etc.) |
| Functional Verification | ✅ All 6 functional tests passed |
Functional Verification
Test 1: Config Field Verification
Verified the new `max_concurrent_runs` config field exists with the correct default:

```python
from openhands.agent_server.config import Config

# Default value
config = Config()
assert config.max_concurrent_runs == 10

# Custom value
config_custom = Config(max_concurrent_runs=5)
assert config_custom.max_concurrent_runs == 5
```

Result: ✅ Config field exists with default value 10, accepts custom values
Test 2-5: Executor Lifecycle Verification
Created a `ConversationService` with custom `max_concurrent_runs=7` and verified:

Before `__aenter__()` (initialization):

```python
service = ConversationService(
    conversations_dir=tmp_conversations,
    max_concurrent_runs=7,
)
assert service._run_executor is None  # Not created yet
```

After `__aenter__()` (service started):

```python
async with service:
    # Executor created
    assert service._run_executor is not None
    assert isinstance(service._run_executor, ThreadPoolExecutor)
    assert service._run_executor._max_workers == 7  # Correct size
    assert service._run_executor._thread_name_prefix == "conversation-run"

    # Created a conversation
    info, _ = await service.start_conversation(request)
    event_service = service._event_services[info.id]

    # EventService shares the same executor instance
    assert event_service._run_executor is service._run_executor
```

After `__aexit__()` (service stopped):

```python
# Executor cleaned up
assert service._run_executor is None
assert executor_ref._shutdown is True  # Properly shut down
```

Result: ✅ Executor lifecycle managed correctly: created on `__aenter__`, shared across EventServices, cleaned up on `__aexit__`
Test 6: Executor Usage Verification
Verified that `conversation.run()` actually executes in the dedicated thread pool:

```python
# Track which thread executes conversation.run()
def tracked_run():
    thread_name = threading.current_thread().name
    executed_threads.append(thread_name)

conversation.run = tracked_run

# Execute via the dedicated executor
loop = asyncio.get_running_loop()
await loop.run_in_executor(event_service._run_executor, conversation.run)

# Verify the thread name indicates it came from our dedicated pool
thread_name = executed_threads[0]
assert "conversation-run" in thread_name
# Output: conversation-run_0
```

Result: ✅ `conversation.run()` executes in a dedicated pool thread (name: "conversation-run_0")
Complete Test Output
```
============================================================
QA Test: Dedicated Thread Pool for Conversation Execution
============================================================
============================================================
Test 1: Config field verification
============================================================
✓ Config has max_concurrent_runs field: True
✓ Default value: 10
✓ Custom value (5): 5
✅ Config field test PASSED
============================================================
Test 2-5: Executor lifecycle verification
============================================================
✓ Created ConversationService with max_concurrent_runs=7
✓ Before __aenter__: _run_executor is None: True
✓ After __aenter__: _run_executor exists: True
✓ Executor thread pool size: 7
✓ Executor thread name prefix: conversation-run
✓ Started conversation: f7d4aa8e-018d-4a45-a8d7-d1c28217828e
✓ EventService has _run_executor: True
✓ EventService shares same executor: True
✓ After __aexit__: _run_executor is None: True
✓ Executor shutdown flag: True
✅ Executor lifecycle test PASSED
============================================================
Test 6: Executor usage verification
============================================================
✓ conversation.run() executed in thread: conversation-run_0
✓ Thread name: conversation-run_0
✅ Executor usage test PASSED
============================================================
✅ ALL TESTS PASSED
============================================================
```
PR Test Verification
Ran the PR's new test `test_event_services_share_dedicated_run_executor`:

```shell
uv run pytest tests/agent_server/test_conversation_service.py::test_event_services_share_dedicated_run_executor -v
```

Result: ✅ PASSED
Issues Found
None.
We have parallel tool calls inside the conversation. Could this change introduce a deadlock?
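For context on the question above: the classic hazard with a bounded pool is re-entrant submission, where a task running on a pool thread blocks waiting for another task queued to the same pool; with no free worker, neither can make progress. Whether that applies here depends on where parallel tool calls execute — per the PR description, only `conversation.run()` is dispatched to the dedicated pool, while tool and short I/O work stays on the default executor. A minimal illustration of the hazard itself (pool size 1 for clarity, with a timeout so the snippet terminates rather than hanging):

```python
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeout

pool = ThreadPoolExecutor(max_workers=1)

def inner():
    return "done"

def outer():
    # Submits back into the same pool, then blocks on the result.
    # With the only worker occupied by outer(), inner() never starts.
    future = pool.submit(inner)
    try:
        return future.result(timeout=1)
    except FutureTimeout:
        return "would deadlock"

result = pool.submit(outer).result()
pool.shutdown(wait=False, cancel_futures=True)
print(result)
```

Without the timeout, `outer()` would wait forever: the question is essentially whether any work dispatched to the `conversation-run` pool can itself block on another task queued to that same pool.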
Summary
`EventService.run()` and the fire-and-forget run path in `send_message()` dispatch `conversation.run()` via `loop.run_in_executor(None, ...)`, which uses asyncio's default shared executor (capped at `min(32, cpu_count + 4)` threads). All 22+ `run_in_executor` calls in `EventService` share this single pool. Long-running agent step loops can exhaust it, starving short I/O operations (event search, status checks, pause, etc.) and silently queuing new conversation runs with no visibility.

Fix

Create a dedicated `ThreadPoolExecutor` for conversation execution, separate from the default pool used for short I/O operations:
- `config.py`: `max_concurrent_runs: int = 10` — configurable upper bound on simultaneous agent step threads
- `conversation_service.py`: creates `ThreadPoolExecutor(max_workers=max_concurrent_runs)` in `__aenter__()`; shuts it down in `__aexit__()`; passes it to each `EventService` via `_run_executor`; reads config via `get_instance()`
- `event_service.py`: dispatches `conversation.run()` to the dedicated executor

Isolation
- `conversation.run()` (long-running agent loop) → dedicated pool (`max_concurrent_runs` threads)
- `search_events`, `count_events`, `get_state`, `pause`, etc. (short I/O) → default executor

Backward compatibility
- When `_run_executor` is `None` (standalone `EventService` without a `ConversationService`), `run_in_executor(None, ...)` falls back to the default pool
- `max_concurrent_runs=10` is conservative; operators can tune it via config

Verification
- `test_event_services_share_dedicated_run_executor` — verifies executor creation, sharing, cleanup

Fixes #3143
This PR was created by an AI agent (OpenHands) on behalf of @csmith49.
Agent Server images for this PR
• GHCR package: https://github.com/OpenHands/agent-sdk/pkgs/container/agent-server
Variants & Base Images
- `eclipse-temurin:17-jdk`
- `nikolaik/python-nodejs:python3.13-nodejs22-slim`
- `golang:1.21-bookworm`

Pull (multi-arch manifest)

```shell
# Each variant is a multi-arch manifest supporting both amd64 and arm64
docker pull ghcr.io/openhands/agent-server:681c844-python
```

Run

All tags pushed for this build

About Multi-Architecture Support
- Each variant tag (e.g. `681c844-python`) is a multi-arch manifest supporting both amd64 and arm64
- Architecture-specific tags (e.g. `681c844-python-amd64`) are also available if needed