Skip to content

fix(voice/#498): surface recv-loop termination as attributable PipecatRecvError#692

Open
drewdrewthis wants to merge 3 commits into
mainfrom
fix/498-pipecat-timeout
Open

fix(voice/#498): surface recv-loop termination as attributable PipecatRecvError#692
drewdrewthis wants to merge 3 commits into
mainfrom
fix/498-pipecat-timeout

Conversation

@drewdrewthis

@drewdrewthis drewdrewthis commented Jun 20, 2026

Copy link
Copy Markdown
Collaborator

Why

Closes #498

PipecatAgentAdapter answered the first audio turn but the scripted second turn died with a bare RuntimeError: [PipecatAgentAdapter] — no cause. Root mechanism: a background _recv_loop exit (a crash, or the bot closing the WebSocket) was swallowed (warning-only), so the inbound queue went silent and the next recv_audio blocked the full response_timeout before raising an empty-bodied asyncio.TimeoutError. Operators got no signal about what actually failed.

What changed

  • Stopped swallowing recv-loop death. _recv_loop now records a crash, and a finally marks the loop done + enqueues a sentinel, so recv_audio fails fast with an attributable PipecatRecvError — chaining the real crash via __cause__, or naming the clean close ("bot closed the WebSocket … hung up") — instead of a blind 60s timeout. Chose a queue sentinel + done-flag over polling because there is no pre-3.13 Queue.shutdown and a waiting get() must be unblocked, not polled.
  • Generic AgentStreamEndedError on the base adapter; transport-specific PipecatRecvError subclass. The base _drain_agent_response needs a polymorphic type: surface a terminated stream loudly on the first chunk (the real 2nd-turn cause), but treat it as a normal end-of-turn on tail chunks (the peer closed after the agent finished). Kept distinct from asyncio.TimeoutError, which stays "transient — agent may still be thinking" → the existing FirstChunkTimeoutError.
  • Net effect: the two PipecatAgentAdapter times out on second turn — TimeoutError with empty exception body hides root cause #498 hypotheses are disambiguated at the raise site — a dead loop fails loud with its cause; a genuinely silent agent still raises FirstChunkTimeoutError (phase + configured timeout).

Test plan

All creds-free and deterministic. Each new test goes RED on main (missing AgentStreamEndedError / PipecatRecvError) and GREEN with the fix. The surfacing tests wrap recv_audio(timeout=30s) in asyncio.wait_for(5s) so a blind timeout (the regression) fails the test instead of passing slowly.

  • tests/voice/test_pipecat_recv_loop_surfacing.py (new, 5 tests):
    • recv-loop crash → PipecatRecvError, __cause__ is the crash, message names it.
    • clean WS close (no audio) → PipecatRecvError, __cause__ is None, "closed"/"hung up".
    • audio-then-close → turn-1 audio returns; turn-2 raises (the PipecatAgentAdapter times out on second turn — TimeoutError with empty exception body hides root cause #498 shape); turn-3 hits the fail-fast path.
    • stop event → PipecatRecvError (clean-close branch).
    • real websockets.serve socket (no mock): close after turn-1 → turn-2 raises — pins the fix against genuine websockets 16.0 close semantics (normal close ends async-iteration cleanly), not only the mock.
  • tests/voice/test_drain_timeout_surfacing.py (+2 tests): base drain propagates AgentStreamEndedError unchanged on the first chunk (not relabeled FirstChunkTimeoutError), and ends the turn (returns collected audio) on a tail stream-end.
cd python
uv run pytest tests/voice/test_pipecat_recv_loop_surfacing.py tests/voice/test_drain_timeout_surfacing.py -q   # 9 passed
uv run pyright scenario/voice/adapters/pipecat.py scenario/voice/adapter.py                                    # 0 errors
# full non-e2e voice suite: 421 passed (only test_agent_wait_false.py excluded — a pre-existing
# interactive-input hang, confirmed identical on main, unrelated to this change)

Human verification

Backend-only, no UI surface. This change lives entirely in the voice recv-loop control path (PipecatAgentAdapter._recv_loop / recv_audio and the base _drain_agent_response); it changes which exception surfaces when a Pipecat bot's WebSocket dies mid-conversation. There is no page, component, or rendered output to screenshot.

A skeptical reviewer can independently re-verify, creds-free and without a live bot:

  1. Reproduce the PipecatAgentAdapter times out on second turn — TimeoutError with empty exception body hides root cause #498 shape on main — check out main (or a fresh worktree) and run the two test files. They fail at contract resolution (PipecatRecvError / AgentStreamEndedError do not exist on main); that is the regression: a terminated recv-loop yields a bare RuntimeError with no cause after a full response_timeout.
  2. Confirm the fix on this branch:
    cd python
    uv run pytest tests/voice/test_pipecat_recv_loop_surfacing.py tests/voice/test_drain_timeout_surfacing.py -q   # 9 passed
    uv run pyright scenario/voice/adapters/pipecat.py scenario/voice/adapter.py                                    # 0 errors
    Expect 9 passed. The asyncio.wait_for(5s) guard around recv_audio(timeout=30s) means a blind timeout (the regression) fails the test instead of passing slowly — so green proves both "attributable" and "fast-fail".
  3. See the operator-facing surface directly — the proof section below captures the live PipecatRecvError messages against a real in-process websocket (crash → chained __cause__; clean close → named "hung up").

How I can prove I was successful

The user-observable surface here is the error an operator sees when the 2nd turn fails. Driving the real PipecatAgentAdapter against a real in-process websocket (no creds, no mock) that terminates after turn 1 — captured live:

=== AFTER: bot closes WS after turn 1 (clean "hung up") ===
  adapter raises : PipecatRecvError
  message        : pipecat bot closed the WebSocket before producing audio — the bot hung up or its pipeline stopped without responding
  __cause__      : None
  operator sees  : RuntimeError: [PipecatAgentAdapter] pipecat bot closed the WebSocket before producing audio — the bot hung up or its pipeline stopped without responding

=== AFTER: recv-loop transport crash after turn 1 (WS 1011) ===
  adapter raises : PipecatRecvError
  message        : pipecat recv loop crashed before the agent produced audio: ConnectionClosedError: received 1011 (internal error) pipeline error; ...
  __cause__      : ConnectionClosedError(Close(code=1011, reason='pipeline error'), ...)
  operator sees  : RuntimeError: [PipecatAgentAdapter] pipecat recv loop crashed before the agent produced audio: ConnectionClosedError: ...

Before this fix, both produced RuntimeError: [PipecatAgentAdapter] — empty body, no cause, after a full 60s response_timeout block. The crash path additionally chains the real ConnectionClosedError via __cause__ and logs the full traceback (recv loop crashed, exc_info=True).

Backing regression suite (each FAILS on main — ImportError/AttributeError on the contract symbols, or the asyncio.wait_for(5s) speed-guard catching a blind timeout — and PASSES with the fix, including one assertion over a real websocket). See Test plan.

Anything surprising?

  • e2e-verification GAP (flagged, not hidden). The primary second-turn timeout from the report cannot be reproduced creds-free here — examples/voice/angry_customer.py needs OPENAI_API_KEY (+ ElevenLabs) for the user-sim TTS / judge and a live event endpoint, and the actual trigger (which hypothesis fired in the original run) is environment- and timing-specific. This PR delivers the diagnostic half in full: the next time the 2nd turn fails, the error will name the cause instead of an empty RuntimeError. Hypotheses to confirm once an instrumented run is available:
    1. Bot pipeline crash / WS close after turn 1 — now surfaced directly as PipecatRecvError (this was the swallowed path).
    2. VAD / end-of-speech not firing on synthetic TTS audio → bot never generates turn 2 → surfaces as FirstChunkTimeoutError (loop alive, agent silent).
    3. response_timeout too short — unlikely (default is already 60s); would also surface as FirstChunkTimeoutError naming the configured value.
  • Follow-up: the same "background recv loop feeds a queue; a dead loop silently starves recv_audio" shape exists in sibling adapters (openai_realtime.py, elevenlabs.py, gemini_live.py, livekit.py). Worth a sweep to adopt AgentStreamEndedError there.

…tRecvError

The PipecatAgentAdapter 2nd-turn hang traced to a swallowed background-task
exit: when `_recv_loop` crashed (decode/transport error) or the bot closed the
WebSocket, the loop only logged a warning and the inbound queue went silent, so
the next `recv_audio` blocked the full `response_timeout` then raised an
empty-bodied `asyncio.TimeoutError` — hiding the real cause.

Diagnostic fix (the primary timeout's true cause is environment-specific and
cannot be reproduced creds-free in this worktree — see PR body):
- New generic `AgentStreamEndedError` (base adapter): a recv transport that
  TERMINATED, distinct from a transient `asyncio.TimeoutError`. `_drain` lets it
  propagate on the first chunk (names the real cause) and treats it as a normal
  end-of-turn on tail chunks.
- New `PipecatRecvError(AgentStreamEndedError)`: `_recv_loop` records a crash
  (no longer swallows) and a `finally` marks the loop done + enqueues a sentinel;
  `recv_audio` fails FAST with an attributable message, chaining the real crash
  via `__cause__` or naming the clean close.

This disambiguates the issue's hypotheses at the raise site: a dead loop now
fails loud (crash cause or "bot closed the WebSocket"), while a genuinely silent
agent still raises the existing `FirstChunkTimeoutError` (phase + timeout).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@drewdrewthis drewdrewthis self-assigned this Jun 20, 2026
@coderabbitai

coderabbitai Bot commented Jun 20, 2026

Copy link
Copy Markdown

Review Change Stack

Walkthrough

Introduces AgentStreamEndedError as a new base exception for terminal agent-audio transport shutdowns and PipecatRecvError as its Pipecat-specific subclass. The PipecatAgentAdapter receive loop is reworked to push a sentinel into the inbound queue on termination (crash or clean close), enabling recv_audio() to fail fast with an attributed error. The base _drain_agent_response is updated to propagate terminal errors before the first chunk and absorb them after. Both layers gain tests.

Changes

Terminal Stream Error Surfacing

Layer / File(s) Summary
AgentStreamEndedError and PipecatRecvError definitions and public exports
python/scenario/voice/adapter.py, python/scenario/voice/adapters/pipecat.py, python/scenario/voice/__init__.py, python/scenario/voice/adapters/__init__.py
AgentStreamEndedError is defined in adapter.py to represent a terminal transport end; PipecatRecvError subclasses it in pipecat.py to name recv-loop–specific failures. Both types are imported and added to __all__ in their respective package initializers.
PipecatAgentAdapter sentinel-based fail-fast recv loop
python/scenario/voice/adapters/pipecat.py
PipecatAgentAdapter.__init__ gains _recv_loop_done, _recv_loop_exc, and an inbound queue typed to Any to accommodate a _RECV_LOOP_DONE sentinel. connect() resets those fields per session. _recv_loop now records crash exceptions into _recv_loop_exc, flushes buffered audio on stop, and in finally sets _recv_loop_done = True and enqueues the sentinel. recv_audio() is reworked to raise PipecatRecvError immediately when the sentinel is dequeued or the loop is already done with an empty queue, via a _recv_loop_ended_error() helper that chains the stored exception.
VoiceAgentAdapter._drain_agent_response terminal error handling
python/scenario/voice/adapter.py
The first-chunk timeout path is annotated to explicitly not catch AgentStreamEndedError, preserving the terminal cause for diagnostics. A new except AgentStreamEndedError branch in the post-first-chunk loop breaks out and returns the accumulated audio as a normal end-of-turn.
Tests for _drain_agent_response terminal stream behavior
python/tests/voice/test_drain_timeout_surfacing.py
Two adapter stubs and two async tests are added: one asserts AgentStreamEndedError propagates with __cause__ intact and is not relabeled as FirstChunkTimeoutError when the transport ends before the first chunk; the other asserts _drain_agent_response returns collected audio normally when the transport ends after the first chunk.
Tests for PipecatAgentAdapter recv-loop termination surfacing
python/tests/voice/test_pipecat_recv_loop_surfacing.py
A fake WebSocket, Twilio frame helpers, and a monkeypatch fixture are introduced. Five async tests cover recv-loop crash attribution, clean peer-close surfacing, multi-call behavior after shutdown, stop-event termination, and a real in-process WebSocket integration scenario, all guarded by asyncio.wait_for to verify fail-fast behavior.

Suggested labels

ai-reviewed, prove-it-clean

Suggested reviewers

  • sergioestebance
  • Aryansharma28
  • rogeriochaves

Poem

🐇 A rabbit hops into the queue,
finds a sentinel waiting — brand new!
No more silent timeout despair,
PipecatRecvError loud and clear,
the cause is chained, the truth shines through! 🎉

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 42.86% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Title check ✅ Passed The title clearly and specifically summarizes the main change: introducing PipecatRecvError as an attributable exception to surface recv-loop termination instead of generic timeouts.
Linked Issues check ✅ Passed The PR fully addresses issue #498 by introducing AgentStreamEndedError and PipecatRecvError to surface recv-loop termination with attributable causes instead of empty TimeoutError exceptions.
Out of Scope Changes check ✅ Passed All changes are directly scoped to #498: new exception types, recv-loop termination detection, fast-fail semantics, and comprehensive tests validating the fix.
Description check ✅ Passed The pull request description clearly and comprehensively explains the problem, changes, testing approach, and verification strategy, directly addressing issue #498 about unhelpful error messages on second audio turns.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch fix/498-pipecat-timeout

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands.

Comment thread python/tests/voice/test_drain_timeout_surfacing.py Fixed
Comment thread python/tests/voice/test_pipecat_recv_loop_surfacing.py Fixed

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@python/scenario/voice/adapters/pipecat.py`:
- Line 45: The sentinel constant _RECV_LOOP_DONE and the inbound queue type
annotation currently lack proper typing. Add Final and TypeAlias to the imports
at the top of the file, then annotate the _RECV_LOOP_DONE constant as
Final[object] to indicate it should not be reassigned. Create a new type alias
called _InboundQueueItem defined as AudioChunk | object to represent the union
of items that can be in the queue. Finally, locate the queue field definition
that currently uses Any and update its type annotation to use the
_InboundQueueItem type alias instead.

In `@python/tests/voice/test_drain_timeout_surfacing.py`:
- Around line 236-375: The test adapter classes _StreamEndedOnFirstAdapter and
_StreamEndedAfterFirstChunkAdapter have unannotated class attributes
(capabilities, _SENTINEL_MSG, _CAUSE, FIRST_CHUNK), and the async test functions
test_first_chunk_stream_ended_propagates_unchanged and
test_tail_stream_ended_ends_drain_without_raising lack explicit return type
annotations. To fix this, first import ClassVar and Final from typing at the top
of the file. Then annotate capabilities with ClassVar[AdapterCapabilities],
_SENTINEL_MSG and _CAUSE with Final[str] and Final[ConnectionResetError]
respectively in _StreamEndedOnFirstAdapter, and FIRST_CHUNK with
Final[AudioChunk] in _StreamEndedAfterFirstChunkAdapter. Finally, add -> None
return type annotations to both test function definitions.

In `@python/tests/voice/test_pipecat_recv_loop_surfacing.py`:
- Around line 49-351: Add missing type annotations to ensure the test module
passes strict type checking. Update the sentinel constant at the module level to
use `Final[object]`, change the asyncio.Queue initialization in
_ScriptedFakeWebSocket.__init__ from `asyncio.Queue[Any]` to `asyncio.Queue[str
| object]`, update the __anext__ method return type from `Any` to `str`, add
parameter and return type annotations to the scripted_ws fixture to accept
`pytest.MonkeyPatch` and return `_ScriptedFakeWebSocket`, add type hints to the
inner _fake_connect function to accept `str` and `**object` and return
`_ScriptedFakeWebSocket`, and finally add explicit parameter type
`_ScriptedFakeWebSocket` to the scripted_ws argument in all test functions
(test_recv_loop_crash_surfaces_attributable_error,
test_recv_loop_clean_close_surfaces_attributable_error,
test_audio_then_close_returns_turn_then_surfaces_close, and
test_stop_event_terminates_loop_and_surfaces_recv_error) along with return type
`None` for each.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: a2f1261d-3564-444e-87f3-29768c5305d9

📥 Commits

Reviewing files that changed from the base of the PR and between b819849 and 35042cc.

📒 Files selected for processing (6)
  • python/scenario/voice/__init__.py
  • python/scenario/voice/adapter.py
  • python/scenario/voice/adapters/__init__.py
  • python/scenario/voice/adapters/pipecat.py
  • python/tests/voice/test_drain_timeout_surfacing.py
  • python/tests/voice/test_pipecat_recv_loop_surfacing.py

Comment thread python/scenario/voice/adapters/pipecat.py
Comment thread python/tests/voice/test_drain_timeout_surfacing.py
Comment thread python/tests/voice/test_pipecat_recv_loop_surfacing.py
…, tidy test

- Export PipecatRecvError from scenario.voice (top-level catchable, consistent
  with FirstChunkTimeoutError / AgentStreamEndedError) instead of only via
  scenario.voice.adapters.pipecat. [hygiene-reviewer]
- Drop a redundant `# noqa: F841` dereference in the tail-stream-ended test; the
  adapter's recv_audio already references AgentStreamEndedError, so a missing
  contract symbol still fails the test RED. [hygiene + test reviewers]

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@drewdrewthis

drewdrewthis commented Jun 20, 2026

Copy link
Copy Markdown
Collaborator Author

Review verdict: READY

Reviewed by principles, hygiene, security, and test reviewers (4 agents) on the #498 diagnostic fix. Zero must-fix blockers. The two worthwhile findings were applied in 98041112 (export PipecatRecvError top-level from scenario.voice; drop a redundant test dereference). Correctness of the cancellation / finally-sentinel / disconnect() paths was explicitly traced and confirmed by the principles reviewer.

Non-blocking (prose only — does not gate)

  • [security] Forward-defense. PipecatRecvError interpolates the transport exception text and chains __cause__ — a future caller adding auth headers to connect(), or a bot echoing a token in a WebSocket close reason, could surface it in logs/errors. Low risk today: connect() passes no auth and a close reason is protocol-bounded to 123 bytes, and surfacing the cause is the diagnostic point of this PR. Scrub/bound the cause text if auth headers are ever added to connect().
  • [hygiene][followup] Sibling adapters (websocket.py, openai_realtime.py, elevenlabs.py, gemini_live.py, livekit.py) share the "background recv loop feeds a queue; a dead loop starves recv_audio" shape and could adopt AgentStreamEndedError. Noted in the PR body as a follow-up sweep.
  • [hygiene][followup] Top-level scenario package does not re-export the voice error types (FirstChunkTimeoutError/AgentStreamEndedError/PipecatRecvError) — pre-existing, broader than this PR.
  • [principles] Naming nit. _recv_loop_ended_error() is a factory (returns, never raises); _recv_loop_ended_exc reads better at raise ... from. Cosmetic.
  • [test] Concurrent recv_audio callers aren't tested (single-consumer is the current contract; relies on single-threaded asyncio ordering of the flag+sentinel). Follow-up only if concurrent consumers are ever added.

Personas / design-soundness were run against the substantive code (no blockers). The post-review deltas — 98041112 (export + test-tidy) and fff6f1a0 (remove unused _SENTINEL_CRASH test sentinel; a github-code-quality finding) — are cosmetic/test-only with no production-logic change, so personas/design-soundness/drift were not re-run. The fff6f1a0 delta was re-reviewed by principles, hygiene, security, and test on the scoped diff: all clean, zero concerns (grep confirmed zero remaining _SENTINEL_CRASH references; no assertion/fixture depended on it).

The four CodeRabbit / github-code-quality review threads are resolved: the one real defect (unused _SENTINEL_CRASH) was fixed in fff6f1a0; the three pyright --strict annotation suggestions were declined with rationale — this repo enforces typeCheckingMode: "standard" (pyrightconfig.json, run as uv run pyright .), not --strict, and the suggested Anyobject/str narrowing would break reportReturnType (the queue value is returned as AudioChunk after the sentinel check).

github-code-quality flagged _SENTINEL_CRASH as an unused global. The crash
path is driven by _ScriptedFakeWebSocket._crash_with (an exception instance),
not a sentinel; _SENTINEL_CRASH was vestigial from an earlier design. Only
_SENTINEL_CLOSE is live (close/end_stream/__anext__). Dead-code removal only.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@langwatch-agent

Copy link
Copy Markdown
Contributor

🐕 PR Hound review brief — assigned reviewer @0xdeafcafe

Review mode: Deep Review
Scariest prod risk: concurrency in the recv-loop teardown. The fix relies on _recv_loop's finally setting a done-flag and enqueuing a sentinel to unblock a waiting recv_audio get(). If the sentinel is enqueued but a concurrent consumer races the done-flag, or the sentinel is consumed by turn N and turn N+1 blocks again, recv_audio could still hang the full timeout or raise on a live connection.

Inspection targets (max 3, each names a file or behavior):

  • [Must Check] adapters/pipecat.py _recv_loop finally + sentinel/done-flag — verify the sentinel unblocks exactly one waiter and that after the loop is done every subsequent recv_audio fails fast (does not consume real audio as sentinel, and does not re-block on an empty queue).
  • [Must Check] adapter.py _drain_agent_response — the new base-class AgentStreamEndedError handling changes behavior for ALL voice adapters, not just Pipecat: on first chunk it propagates, on tail chunks it ends the turn. Confirm no other adapter's normal end-of-turn now raises where it previously returned audio.
  • [Probably Fine] PipecatRecvError chaining via __cause__ for the crash vs clean-close branches; test-covered.

Author questions (optional, max 3):

  • Can the enqueued sentinel be mistaken for a real audio chunk by recv_audio, and is one sentinel enough when multiple turns await the same queue after loop death?

@langwatch-agent langwatch-agent added the review: deep PR Hound review mode label Jul 3, 2026
@github-actions

github-actions Bot commented Jul 3, 2026

Copy link
Copy Markdown
Contributor

Automated low-risk assessment

This PR was evaluated against the repository's Low-Risk Pull Requests procedure and does not qualify as low risk.

The PR changes transport error handling for the Pipecat WebSocket adapter and introduces new exception types that alter how external-transport failures are surfaced to callers. Because this affects an integration with an external/third-party transport (the Pipecat WebSocket) and changes runtime behavior visible to operators, it does not meet the policy's "no changes to integrations with third‑party systems or external APIs" requirement for low-risk changes.

This PR requires a manual review before merging.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

hound-checked Triaged by the pr-hound agent at the current head SHA review: deep PR Hound review mode

Projects

None yet

Development

Successfully merging this pull request may close these issues.

PipecatAgentAdapter times out on second turn — TimeoutError with empty exception body hides root cause

3 participants