perf: dispatch PubSub notifications concurrently#3162
Conversation
PubSub.__call__ awaited each subscriber sequentially, so a single slow WebSocket client could block event delivery to all other subscribers on the same conversation. Replace the sequential loop with asyncio.gather over per-subscriber error-handling wrappers. This matches the pattern already used by PubSub.close() and preserves fault isolation (one subscriber's exception does not cancel others). Fixes #3151 Co-authored-by: openhands <openhands@all-hands.dev>
Python API breakage checks — ✅ PASSEDResult: ✅ PASSED |
REST API breakage checks (OpenAPI) — ✅ PASSEDResult: ✅ PASSED |
all-hands-bot
left a comment
There was a problem hiding this comment.
Taste Rating: 🟢 Good taste - Elegant solution that eliminates sequential blocking
Assessment:
Clean performance fix that solves a real problem (slow WebSocket clients blocking event delivery). The concurrent dispatch pattern matches what PubSub.close() already does, and error isolation is preserved. Test coverage verifies the concurrent behavior.
[RISK ASSESSMENT]
- [Overall PR]
⚠️ Risk Assessment: 🟢 LOW
Performance improvement with no API changes. Error handling is preserved through per-subscriber wrappers. The shift from sequential to concurrent execution is the intended behavior and unlikely to break existing code (pub/sub patterns typically don't guarantee subscriber execution order).
VERDICT:
✅ Worth merging - Straightforward performance improvement with solid test coverage
KEY INSIGHT:
Eliminates head-of-line blocking by applying the same concurrent pattern already used in close() - good internal consistency.
all-hands-bot
left a comment
There was a problem hiding this comment.
✅ QA Report: PASS
Concurrent PubSub dispatch verified through functional testing. Slow subscribers no longer block fast ones, achieving 2.33x performance improvement in tested scenarios.
Does this PR achieve its stated goal?
Yes. The PR successfully fixes the performance issue where slow WebSocket subscribers blocked all other subscribers. Through actual execution with timed subscribers, I verified that:
- Concurrent dispatch works: A slow subscriber (200ms) and fast subscriber (0ms) complete in ~200ms total (concurrent), not >200ms (sequential)
- Performance improvement is significant: Testing with 3 subscribers (0.1s, 0.15s, 0.1s delays) showed:
- OLD sequential behavior: 0.351s (sum of delays)
- NEW concurrent behavior: 0.151s (max of delays)
- Speedup: 2.33x faster (57% time reduction)
- Error isolation is maintained: A failing subscriber doesn't prevent others from receiving events
The implementation correctly uses asyncio.gather to dispatch to all subscribers concurrently, matching the pattern already used in PubSub.close().
| Phase | Result |
|---|---|
| Environment Setup | ✅ Dependencies installed via uv |
| CI Status | ✅ 21 checks passing (including agent-server-tests), 7 pending (Docker builds), 0 failing |
| Functional Verification | ✅ Concurrent dispatch verified with timing measurements |
Functional Verification
Test 1: Slow Subscriber Does Not Block Fast Subscriber
Goal: Verify that a 200ms slow subscriber doesn't block a 0ms fast subscriber.
Execution:
Created a test script that instantiates PubSub with two subscribers:
- SlowSubscriber: 200ms delay
- FastSubscriber: 0ms delay
Ran the test:
uv run python test_pubsub_concurrent.pyResult:
=== Testing PubSub Concurrent Dispatch ===
Subscribed 2 subscribers:
- SlowSubscriber: 0.2s delay
- FastSubscriber: 0.0s delay
Dispatching event...
FastSubscriber received event after 0.000s
SlowSubscriber received event after 0.200s
Total dispatch time: 0.200s
✅ PASS: Concurrent dispatch verified
Total time (0.200s) is close to slowest subscriber
(0.2s), not the sum. Slow subscriber did not block fast one.
Interpretation: The total time of 0.200s proves concurrent execution. If subscribers ran sequentially, the fast subscriber would have been delayed by the slow one. Instead, both started immediately and completed independently.
Test 2: Multiple Slow Subscribers Run Concurrently
Goal: Verify concurrent dispatch scales with multiple slow subscribers.
Execution:
Created 3 subscribers, each with 0.15s delay:
Result:
=== Testing Multiple Slow Subscribers ===
Subscribed 3 subscribers, each with 0.15s delay
Dispatching event...
Subscriber0 received event after 0.150s
Subscriber1 received event after 0.150s
Subscriber2 received event after 0.150s
Total dispatch time: 0.151s
✅ PASS: Multiple subscribers processed concurrently
Total time (0.151s) ≈ single subscriber time (0.15s)
not sum of all (0.45s)
Interpretation: If sequential, total time would be 0.45s (3 × 0.15s). Actual time of 0.151s confirms all three ran in parallel.
Test 3: Error Isolation Maintained
Goal: Verify that errors in one subscriber don't prevent others from receiving events.
Execution:
Subscribed a failing subscriber (raises RuntimeError) and a successful subscriber:
Result:
=== Testing Error Isolation ===
Subscribed 1 failing subscriber and 1 successful subscriber
Dispatching event...
[ERROR log showing: RuntimeError: FailingSubscriber intentional failure]
SuccessfulSubscriber received event after 0.000s
✅ PASS: Error isolation maintained
Failing subscriber was called but didn't prevent
successful subscriber from receiving the event
Interpretation: The error was logged but didn't stop the successful subscriber from completing. This confirms the try/except wrapper in asyncio.gather provides proper fault isolation.
Test 4: Sequential vs Concurrent Performance Comparison
Goal: Demonstrate the performance improvement by comparing old (sequential) vs new (concurrent) behavior.
Execution:
Created a comparison script that simulates the old sequential dispatch alongside the new concurrent dispatch with 3 subscribers (0.1s, 0.15s, 0.1s delays):
uv run python test_sequential_comparison.pyResult:
1. OLD Sequential Dispatch (BEFORE PR):
Each subscriber blocks the next one
Subscriber1 completed (delay=0.1s)
Subscriber2 completed (delay=0.15s)
Subscriber3 completed (delay=0.1s)
⏱️ Total time: 0.351s
📊 Expected: ~0.35s (sum of all delays)
2. NEW Concurrent Dispatch (AFTER PR):
All subscribers run in parallel
Subscriber1 completed (delay=0.1s)
Subscriber3 completed (delay=0.1s)
Subscriber2 completed (delay=0.15s)
⏱️ Total time: 0.151s
📊 Expected: ~0.15s (max of all delays)
RESULTS:
Sequential (OLD): 0.351s
Concurrent (NEW): 0.151s
Speedup: 2.33x faster
Time saved: 0.200s (57.1% reduction)
✅ Significant performance improvement confirmed!
Interpretation: This clearly demonstrates the PR's fix:
- Before: Subscribers processed sequentially, taking sum of all delays (0.351s)
- After: Subscribers processed concurrently, taking only the max delay (0.151s)
- Benefit: 2.33x speedup, meaning slow WebSocket clients no longer bottleneck event delivery to all other clients
Issues Found
None.
Summary
PubSub.__call__awaited each subscriber sequentially. A single slow WebSocket client (congested network, slow consumer) blocked event delivery to all other subscribers on the same conversation.Fix
Replace the sequential
for/awaitloop withasyncio.gatherover per-subscriber error-handling wrappers. Each subscriber still runs in its own try/except so one failure doesn't cancel others.This matches the pattern already used by
PubSub.close()(line 86) in the same file.Before
After
Verification
test_pub_sub.pytests pass (including new concurrency test)TestPubSubConcurrentDispatch::test_slow_subscriber_does_not_block_others— verifies a 200ms subscriber doesn't block a fast subscriberFixes #3151
This PR was created by an AI agent (OpenHands) on behalf of @csmith49.
Agent Server images for this PR
• GHCR package: https://github.com/OpenHands/agent-sdk/pkgs/container/agent-server
Variants & Base Images
eclipse-temurin:17-jdknikolaik/python-nodejs:python3.13-nodejs22-slimgolang:1.21-bookwormPull (multi-arch manifest)
# Each variant is a multi-arch manifest supporting both amd64 and arm64 docker pull ghcr.io/openhands/agent-server:6da85e4-pythonRun
All tags pushed for this build
About Multi-Architecture Support
6da85e4-python) is a multi-arch manifest supporting both amd64 and arm646da85e4-python-amd64) are also available if needed