
perf: dispatch PubSub notifications concurrently #3162

Merged
csmith49 merged 2 commits into main from fix/3151-pubsub-concurrent-dispatch
May 12, 2026

Collaborator

@csmith49 csmith49 commented May 8, 2026

Summary

PubSub.__call__ awaited each subscriber sequentially. A single slow WebSocket client (congested network, slow consumer) blocked event delivery to all other subscribers on the same conversation.

Fix

Replace the sequential for/await loop with asyncio.gather over per-subscriber error-handling wrappers. Each subscriber still runs in its own try/except so one failure doesn't cancel others.

This matches the pattern already used by PubSub.close() (line 86) in the same file.

Before

for subscriber_id, subscriber in list(self._subscribers.items()):
    try:
        await subscriber(event)  # blocks on slow subscriber
    except Exception as e:
        logger.error(...)

After

async def _notify(subscriber_id, subscriber):
    try:
        await subscriber(event)
    except Exception as e:
        logger.error(...)

await asyncio.gather(*[_notify(sid, sub) for sid, sub in subscribers])
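
For readers who want to run the pattern in isolation, here is a minimal sketch of the concurrent dispatch described above. `MiniPubSub`, its `subscribe` method, and the `demo` helper are hypothetical stand-ins written for illustration; they are not the project's actual `PubSub` class, and the log message is an assumption.

```python
import asyncio
import logging
from collections.abc import Awaitable, Callable
from typing import Any

logger = logging.getLogger(__name__)

Subscriber = Callable[[Any], Awaitable[None]]


class MiniPubSub:
    """Hypothetical stand-in for PubSub; not the project's real class."""

    def __init__(self) -> None:
        self._subscribers: dict[str, Subscriber] = {}

    def subscribe(self, subscriber_id: str, subscriber: Subscriber) -> None:
        self._subscribers[subscriber_id] = subscriber

    async def __call__(self, event: Any) -> None:
        # Snapshot the mapping so changes made during dispatch do not
        # affect the set of subscribers being notified.
        subscribers = list(self._subscribers.items())

        async def _notify(subscriber_id: str, subscriber: Subscriber) -> None:
            try:
                await subscriber(event)
            except Exception:
                # Assumed log message; the PR elides the real one.
                logger.exception("Subscriber %s failed", subscriber_id)

        # Schedule every wrapper at once; gather returns when all finish.
        await asyncio.gather(*(_notify(sid, sub) for sid, sub in subscribers))


async def demo() -> list[str]:
    received: list[str] = []

    async def slow(event: Any) -> None:
        await asyncio.sleep(0.05)
        received.append(f"slow:{event}")

    async def fast(event: Any) -> None:
        received.append(f"fast:{event}")

    pubsub = MiniPubSub()
    pubsub.subscribe("slow", slow)  # registered first on purpose
    pubsub.subscribe("fast", fast)
    await pubsub("hello")
    return received
```

Calling `asyncio.run(demo())` in this sketch records the fast subscriber before the slow one even though the slow one was registered first, which is the behavior the PR is after.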

Verification

  • All 30 test_pub_sub.py tests pass (including new concurrency test)
  • All pre-commit hooks pass (ruff, pyright, etc.)
  • Added TestPubSubConcurrentDispatch::test_slow_subscriber_does_not_block_others — verifies a 200ms subscriber doesn't block a fast subscriber
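
The new test itself is not shown in this conversation. As a rough idea of what such a check could look like, the sketch below measures when a fast subscriber receives the event while a 200ms subscriber is still sleeping. `dispatch_concurrently` and `check_slow_subscriber_does_not_block_others` are hypothetical names, and the dispatch helper is a simplified stand-in for `PubSub.__call__`, not the real method.

```python
import asyncio
import time


async def dispatch_concurrently(subscribers, event):
    """Simplified stand-in for the concurrent PubSub.__call__ (assumption)."""

    async def _notify(subscriber):
        try:
            await subscriber(event)
        except Exception:
            pass  # the real code logs the error here

    await asyncio.gather(*(_notify(s) for s in subscribers))


async def check_slow_subscriber_does_not_block_others() -> float:
    start = time.perf_counter()
    fast_received_at = None

    async def slow(event):
        await asyncio.sleep(0.2)

    async def fast(event):
        nonlocal fast_received_at
        fast_received_at = time.perf_counter() - start

    # The slow subscriber comes first, so a sequential loop would make
    # the fast one wait roughly 0.2s before it sees the event.
    await dispatch_concurrently([slow, fast], "event")
    assert fast_received_at is not None
    return fast_received_at
```

Asserting on the fast subscriber's receipt time, rather than on total dispatch time, is a deliberate choice in this sketch: with a 0ms fast subscriber the total is about 200ms in both the sequential and the concurrent case.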

Fixes #3151

This PR was created by an AI agent (OpenHands) on behalf of @csmith49.


Agent Server images for this PR

GHCR package: https://github.com/OpenHands/agent-sdk/pkgs/container/agent-server

Variants & Base Images

Variant | Architectures | Base Image | Docs / Tags
java | amd64, arm64 | eclipse-temurin:17-jdk | Link
python | amd64, arm64 | nikolaik/python-nodejs:python3.13-nodejs22-slim | Link
golang | amd64, arm64 | golang:1.21-bookworm | Link

Pull (multi-arch manifest)

# Each variant is a multi-arch manifest supporting both amd64 and arm64
docker pull ghcr.io/openhands/agent-server:6da85e4-python

Run

docker run -it --rm \
  -p 8000:8000 \
  --name agent-server-6da85e4-python \
  ghcr.io/openhands/agent-server:6da85e4-python

All tags pushed for this build

ghcr.io/openhands/agent-server:6da85e4-golang-amd64
ghcr.io/openhands/agent-server:6da85e492e269ab5b6533b266c638f480bd5ce18-golang-amd64
ghcr.io/openhands/agent-server:fix-3151-pubsub-concurrent-dispatch-golang-amd64
ghcr.io/openhands/agent-server:6da85e4-golang_tag_1.21-bookworm-amd64
ghcr.io/openhands/agent-server:6da85e4-golang-arm64
ghcr.io/openhands/agent-server:6da85e492e269ab5b6533b266c638f480bd5ce18-golang-arm64
ghcr.io/openhands/agent-server:fix-3151-pubsub-concurrent-dispatch-golang-arm64
ghcr.io/openhands/agent-server:6da85e4-golang_tag_1.21-bookworm-arm64
ghcr.io/openhands/agent-server:6da85e4-java-amd64
ghcr.io/openhands/agent-server:6da85e492e269ab5b6533b266c638f480bd5ce18-java-amd64
ghcr.io/openhands/agent-server:fix-3151-pubsub-concurrent-dispatch-java-amd64
ghcr.io/openhands/agent-server:6da85e4-eclipse-temurin_tag_17-jdk-amd64
ghcr.io/openhands/agent-server:6da85e4-java-arm64
ghcr.io/openhands/agent-server:6da85e492e269ab5b6533b266c638f480bd5ce18-java-arm64
ghcr.io/openhands/agent-server:fix-3151-pubsub-concurrent-dispatch-java-arm64
ghcr.io/openhands/agent-server:6da85e4-eclipse-temurin_tag_17-jdk-arm64
ghcr.io/openhands/agent-server:6da85e4-python-amd64
ghcr.io/openhands/agent-server:6da85e492e269ab5b6533b266c638f480bd5ce18-python-amd64
ghcr.io/openhands/agent-server:fix-3151-pubsub-concurrent-dispatch-python-amd64
ghcr.io/openhands/agent-server:6da85e4-nikolaik_s_python-nodejs_tag_python3.13-nodejs22-slim-amd64
ghcr.io/openhands/agent-server:6da85e4-python-arm64
ghcr.io/openhands/agent-server:6da85e492e269ab5b6533b266c638f480bd5ce18-python-arm64
ghcr.io/openhands/agent-server:fix-3151-pubsub-concurrent-dispatch-python-arm64
ghcr.io/openhands/agent-server:6da85e4-nikolaik_s_python-nodejs_tag_python3.13-nodejs22-slim-arm64
ghcr.io/openhands/agent-server:6da85e4-golang
ghcr.io/openhands/agent-server:6da85e492e269ab5b6533b266c638f480bd5ce18-golang
ghcr.io/openhands/agent-server:fix-3151-pubsub-concurrent-dispatch-golang
ghcr.io/openhands/agent-server:6da85e4-golang_tag_1.21-bookworm
ghcr.io/openhands/agent-server:6da85e4-java
ghcr.io/openhands/agent-server:6da85e492e269ab5b6533b266c638f480bd5ce18-java
ghcr.io/openhands/agent-server:fix-3151-pubsub-concurrent-dispatch-java
ghcr.io/openhands/agent-server:6da85e4-eclipse-temurin_tag_17-jdk
ghcr.io/openhands/agent-server:6da85e4-python
ghcr.io/openhands/agent-server:6da85e492e269ab5b6533b266c638f480bd5ce18-python
ghcr.io/openhands/agent-server:fix-3151-pubsub-concurrent-dispatch-python
ghcr.io/openhands/agent-server:6da85e4-nikolaik_s_python-nodejs_tag_python3.13-nodejs22-slim

About Multi-Architecture Support

  • Each variant tag (e.g., 6da85e4-python) is a multi-arch manifest supporting both amd64 and arm64
  • Docker automatically pulls the correct architecture for your platform
  • Individual architecture tags (e.g., 6da85e4-python-amd64) are also available if needed

PubSub.__call__ awaited each subscriber sequentially, so a single
slow WebSocket client could block event delivery to all other
subscribers on the same conversation.

Replace the sequential loop with asyncio.gather over per-subscriber
error-handling wrappers.  This matches the pattern already used by
PubSub.close() and preserves fault isolation (one subscriber's
exception does not cancel others).

Fixes #3151

Co-authored-by: openhands <openhands@all-hands.dev>
Contributor

github-actions Bot commented May 8, 2026

Python API breakage checks — ✅ PASSED

Result: PASSED

Action log

Contributor

github-actions Bot commented May 8, 2026

REST API breakage checks (OpenAPI) — ✅ PASSED

Result: PASSED

Action log

Collaborator

@all-hands-bot all-hands-bot left a comment

Taste Rating: 🟢 Good taste - Elegant solution that eliminates sequential blocking

Assessment:
Clean performance fix that solves a real problem (slow WebSocket clients blocking event delivery). The concurrent dispatch pattern matches what PubSub.close() already does, and error isolation is preserved. Test coverage verifies the concurrent behavior.

[RISK ASSESSMENT]

  • [Overall PR] ⚠️ Risk Assessment: 🟢 LOW

Performance improvement with no API changes. Error handling is preserved through per-subscriber wrappers. The shift from sequential to concurrent execution is the intended behavior and unlikely to break existing code (pub/sub patterns typically don't guarantee subscriber execution order).

VERDICT:
✅ Worth merging - Straightforward performance improvement with solid test coverage

KEY INSIGHT:
Eliminates head-of-line blocking by applying the same concurrent pattern already used in close() - good internal consistency.

Contributor

github-actions Bot commented May 8, 2026

Coverage

Coverage Report

File | Stmts | Miss | Cover | Missing
openhands-agent-server/openhands/agent_server/pub_sub.py | 40 | 2 | 95% | 78–79
TOTAL | 26680 | 11638 | 56% |

Collaborator

@all-hands-bot all-hands-bot left a comment

✅ QA Report: PASS

Concurrent PubSub dispatch verified through functional testing. Slow subscribers no longer block fast ones, achieving 2.33x performance improvement in tested scenarios.

Does this PR achieve its stated goal?

Yes. The PR successfully fixes the performance issue where slow WebSocket subscribers blocked all other subscribers. Through actual execution with timed subscribers, I verified that:

  1. Concurrent dispatch works: With a slow subscriber (200ms) and a fast subscriber (0ms), the fast subscriber receives the event immediately (0.000s) instead of waiting behind the slow one, and the whole dispatch completes in ~200ms
  2. Performance improvement is significant: Testing with 3 subscribers (0.1s, 0.15s, 0.1s delays) showed:
    • OLD sequential behavior: 0.351s (sum of delays)
    • NEW concurrent behavior: 0.151s (max of delays)
    • Speedup: 2.33x faster (57% time reduction)
  3. Error isolation is maintained: A failing subscriber doesn't prevent others from receiving events

The implementation correctly uses asyncio.gather to dispatch to all subscribers concurrently, matching the pattern already used in PubSub.close().

Phase | Result
Environment Setup | ✅ Dependencies installed via uv
CI Status | ✅ 21 checks passing (including agent-server-tests), 7 pending (Docker builds), 0 failing
Functional Verification | ✅ Concurrent dispatch verified with timing measurements

Functional Verification

Test 1: Slow Subscriber Does Not Block Fast Subscriber

Goal: Verify that a 200ms slow subscriber doesn't block a 0ms fast subscriber.

Execution:
Created a test script that instantiates PubSub with two subscribers:

  • SlowSubscriber: 200ms delay
  • FastSubscriber: 0ms delay

Ran the test:

uv run python test_pubsub_concurrent.py

Result:

=== Testing PubSub Concurrent Dispatch ===

Subscribed 2 subscribers:
  - SlowSubscriber: 0.2s delay
  - FastSubscriber: 0.0s delay

Dispatching event...
  FastSubscriber received event after 0.000s
  SlowSubscriber received event after 0.200s

Total dispatch time: 0.200s

✅ PASS: Concurrent dispatch verified
   Total time (0.200s) is close to slowest subscriber
   (0.2s), not the sum. Slow subscriber did not block fast one.

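Interpretation: The fast subscriber received the event after 0.000s, which shows it did not wait for the slow one. The 0.200s total on its own does not distinguish the two modes, because a sequential run of a 0.2s and a 0.0s subscriber also takes about 0.2s. Under sequential dispatch with the slow subscriber notified first, the fast subscriber would have received the event only after ~0.200s. Instead, both started immediately and completed independently.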


Test 2: Multiple Slow Subscribers Run Concurrently

Goal: Verify concurrent dispatch scales with multiple slow subscribers.

Execution:
Created 3 subscribers, each with 0.15s delay:

Result:

=== Testing Multiple Slow Subscribers ===

Subscribed 3 subscribers, each with 0.15s delay

Dispatching event...
  Subscriber0 received event after 0.150s
  Subscriber1 received event after 0.150s
  Subscriber2 received event after 0.150s

Total dispatch time: 0.151s

✅ PASS: Multiple subscribers processed concurrently
   Total time (0.151s) ≈ single subscriber time (0.15s)
   not sum of all (0.45s)

Interpretation: If sequential, total time would be 0.45s (3 × 0.15s). Actual time of 0.151s confirms all three ran concurrently, with their waits overlapping on the event loop.


Test 3: Error Isolation Maintained

Goal: Verify that errors in one subscriber don't prevent others from receiving events.

Execution:
Subscribed a failing subscriber (raises RuntimeError) and a successful subscriber:

Result:

=== Testing Error Isolation ===

Subscribed 1 failing subscriber and 1 successful subscriber

Dispatching event...
[ERROR log showing: RuntimeError: FailingSubscriber intentional failure]
  SuccessfulSubscriber received event after 0.000s

✅ PASS: Error isolation maintained
   Failing subscriber was called but didn't prevent
   successful subscriber from receiving the event

Interpretation: The error was logged but didn't stop the successful subscriber from completing. This confirms that the per-subscriber try/except wrapper passed to asyncio.gather provides proper fault isolation.
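
The script behind this test is not included in the report. A minimal sketch of the same idea, assuming a simplified `dispatch` helper in place of `PubSub.__call__`, might look like the following. The function names, the logger name, and the log message are all hypothetical.

```python
import asyncio
import logging

logger = logging.getLogger("pubsub_sketch")  # hypothetical logger name


async def dispatch(subscribers: dict, event) -> None:
    """Simplified stand-in for the concurrent PubSub.__call__ (assumption)."""

    async def _notify(subscriber_id, subscriber):
        try:
            await subscriber(event)
        except Exception:
            # The failure is logged and swallowed inside the wrapper,
            # so gather never sees an exception.
            logger.exception("Subscriber %s failed", subscriber_id)

    await asyncio.gather(*(_notify(sid, sub) for sid, sub in subscribers.items()))


async def run_isolation_demo() -> list:
    received = []

    async def failing(event):
        raise RuntimeError("FailingSubscriber intentional failure")

    async def successful(event):
        received.append(event)

    await dispatch({"failing": failing, "successful": successful}, "event-1")
    return received
```

Without the wrapper, `asyncio.gather` in its default mode would re-raise the first exception to the caller of the dispatch, even though the other awaitables keep running. Catching inside each wrapper keeps the dispatch call itself from failing.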


Test 4: Sequential vs Concurrent Performance Comparison

Goal: Demonstrate the performance improvement by comparing old (sequential) vs new (concurrent) behavior.

Execution:
Created a comparison script that simulates the old sequential dispatch alongside the new concurrent dispatch with 3 subscribers (0.1s, 0.15s, 0.1s delays):

uv run python test_sequential_comparison.py

Result:

1. OLD Sequential Dispatch (BEFORE PR):
   Each subscriber blocks the next one

    Subscriber1 completed (delay=0.1s)
    Subscriber2 completed (delay=0.15s)
    Subscriber3 completed (delay=0.1s)
   ⏱️  Total time: 0.351s
   📊 Expected: ~0.35s (sum of all delays)

2. NEW Concurrent Dispatch (AFTER PR):
   All subscribers run in parallel

    Subscriber1 completed (delay=0.1s)
    Subscriber3 completed (delay=0.1s)
    Subscriber2 completed (delay=0.15s)
   ⏱️  Total time: 0.151s
   📊 Expected: ~0.15s (max of all delays)

RESULTS:
Sequential (OLD):  0.351s
Concurrent (NEW):  0.151s
Speedup:           2.33x faster
Time saved:        0.200s (57.1% reduction)

✅ Significant performance improvement confirmed!

Interpretation: This clearly demonstrates the PR's fix:

  • Before: Subscribers processed sequentially, taking sum of all delays (0.351s)
  • After: Subscribers processed concurrently, taking only the max delay (0.151s)
  • Benefit: 2.33x speedup, meaning slow WebSocket clients no longer bottleneck event delivery to all other clients
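
The comparison script is not reproduced in the report, so the following is only a sketch of how the sum-versus-max timing could be measured with the same three delays. All function names are hypothetical, and the subscribers simply sleep rather than doing real WebSocket I/O.

```python
import asyncio
import time

DELAYS = [0.1, 0.15, 0.1]  # same delays as in the report


def make_subscriber(delay: float):
    async def subscriber(event) -> None:
        await asyncio.sleep(delay)  # simulated slow client

    return subscriber


async def dispatch_sequential(subscribers, event) -> None:
    # Old behavior: each subscriber is awaited before the next starts.
    for subscriber in subscribers:
        await subscriber(event)


async def dispatch_concurrent(subscribers, event) -> None:
    # New behavior: all subscribers are awaited together.
    await asyncio.gather(*(s(event) for s in subscribers))


async def timed(dispatch, subscribers) -> float:
    start = time.perf_counter()
    await dispatch(subscribers, "event")
    return time.perf_counter() - start


async def compare() -> tuple[float, float]:
    subscribers = [make_subscriber(d) for d in DELAYS]
    sequential = await timed(dispatch_sequential, subscribers)
    concurrent = await timed(dispatch_concurrent, subscribers)
    return sequential, concurrent
```

In this sketch the sequential time should land near the sum of the delays (0.35s) and the concurrent time near the largest delay (0.15s); exact figures vary with scheduler overhead.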

Issues Found

None.

@csmith49 csmith49 requested a review from VascoSch92 May 11, 2026 22:15
@csmith49 csmith49 merged commit 8b3db8b into main May 12, 2026
36 checks passed
@csmith49 csmith49 deleted the fix/3151-pubsub-concurrent-dispatch branch May 12, 2026 13:07

Development

Successfully merging this pull request may close these issues.

perf: sequential PubSub dispatch blocks all subscribers on slow client

3 participants