fix(voice/#498): surface recv-loop termination as attributable PipecatRecvError#692
fix(voice/#498): surface recv-loop termination as attributable PipecatRecvError#692drewdrewthis wants to merge 3 commits into
Conversation
…tRecvError The PipecatAgentAdapter 2nd-turn hang traced to a swallowed background-task exit: when `_recv_loop` crashed (decode/transport error) or the bot closed the WebSocket, the loop only logged a warning and the inbound queue went silent, so the next `recv_audio` blocked the full `response_timeout` then raised an empty-bodied `asyncio.TimeoutError` — hiding the real cause. Diagnostic fix (the primary timeout's true cause is environment-specific and cannot be reproduced creds-free in this worktree — see PR body): - New generic `AgentStreamEndedError` (base adapter): a recv transport that TERMINATED, distinct from a transient `asyncio.TimeoutError`. `_drain` lets it propagate on the first chunk (names the real cause) and treats it as a normal end-of-turn on tail chunks. - New `PipecatRecvError(AgentStreamEndedError)`: `_recv_loop` records a crash (no longer swallows) and a `finally` marks the loop done + enqueues a sentinel; `recv_audio` fails FAST with an attributable message, chaining the real crash via `__cause__` or naming the clean close. This disambiguates the issue's hypotheses at the raise site: a dead loop now fails loud (crash cause or "bot closed the WebSocket"), while a genuinely silent agent still raises the existing `FirstChunkTimeoutError` (phase + timeout). Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
WalkthroughIntroduces ChangesTerminal Stream Error Surfacing
Suggested labels
Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 3
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@python/scenario/voice/adapters/pipecat.py`:
- Line 45: The sentinel constant _RECV_LOOP_DONE and the inbound queue type
annotation currently lack proper typing. Add Final and TypeAlias to the imports
at the top of the file, then annotate the _RECV_LOOP_DONE constant as
Final[object] to indicate it should not be reassigned. Create a new type alias
called _InboundQueueItem defined as AudioChunk | object to represent the union
of items that can be in the queue. Finally, locate the queue field definition
that currently uses Any and update its type annotation to use the
_InboundQueueItem type alias instead.
In `@python/tests/voice/test_drain_timeout_surfacing.py`:
- Around line 236-375: The test adapter classes _StreamEndedOnFirstAdapter and
_StreamEndedAfterFirstChunkAdapter have unannotated class attributes
(capabilities, _SENTINEL_MSG, _CAUSE, FIRST_CHUNK), and the async test functions
test_first_chunk_stream_ended_propagates_unchanged and
test_tail_stream_ended_ends_drain_without_raising lack explicit return type
annotations. To fix this, first import ClassVar and Final from typing at the top
of the file. Then annotate capabilities with ClassVar[AdapterCapabilities],
_SENTINEL_MSG and _CAUSE with Final[str] and Final[ConnectionResetError]
respectively in _StreamEndedOnFirstAdapter, and FIRST_CHUNK with
Final[AudioChunk] in _StreamEndedAfterFirstChunkAdapter. Finally, add -> None
return type annotations to both test function definitions.
In `@python/tests/voice/test_pipecat_recv_loop_surfacing.py`:
- Around line 49-351: Add missing type annotations to ensure the test module
passes strict type checking. Update the sentinel constant at the module level to
use `Final[object]`, change the asyncio.Queue initialization in
_ScriptedFakeWebSocket.__init__ from `asyncio.Queue[Any]` to `asyncio.Queue[str
| object]`, update the __anext__ method return type from `Any` to `str`, add
parameter and return type annotations to the scripted_ws fixture to accept
`pytest.MonkeyPatch` and return `_ScriptedFakeWebSocket`, add type hints to the
inner _fake_connect function to accept `str` and `**object` and return
`_ScriptedFakeWebSocket`, and finally add explicit parameter type
`_ScriptedFakeWebSocket` to the scripted_ws argument in all test functions
(test_recv_loop_crash_surfaces_attributable_error,
test_recv_loop_clean_close_surfaces_attributable_error,
test_audio_then_close_returns_turn_then_surfaces_close, and
test_stop_event_terminates_loop_and_surfaces_recv_error) along with return type
`None` for each.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: a2f1261d-3564-444e-87f3-29768c5305d9
📒 Files selected for processing (6)
python/scenario/voice/__init__.pypython/scenario/voice/adapter.pypython/scenario/voice/adapters/__init__.pypython/scenario/voice/adapters/pipecat.pypython/tests/voice/test_drain_timeout_surfacing.pypython/tests/voice/test_pipecat_recv_loop_surfacing.py
…, tidy test - Export PipecatRecvError from scenario.voice (top-level catchable, consistent with FirstChunkTimeoutError / AgentStreamEndedError) instead of only via scenario.voice.adapters.pipecat. [hygiene-reviewer] - Drop a redundant `# noqa: F841` dereference in the tail-stream-ended test; the adapter's recv_audio already references AgentStreamEndedError, so a missing contract symbol still fails the test RED. [hygiene + test reviewers] Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Review verdict: READYReviewed by principles, hygiene, security, and test reviewers (4 agents) on the #498 diagnostic fix. Zero must-fix blockers. The two worthwhile findings were applied in Non-blocking (prose only — does not gate)
Personas / design-soundness were run against the substantive code (no blockers). The post-review deltas — The four CodeRabbit / github-code-quality review threads are resolved: the one real defect (unused |
github-code-quality flagged _SENTINEL_CRASH as an unused global. The crash path is driven by _ScriptedFakeWebSocket._crash_with (an exception instance), not a sentinel; _SENTINEL_CRASH was vestigial from an earlier design. Only _SENTINEL_CLOSE is live (close/end_stream/__anext__). Dead-code removal only. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
|
🐕 PR Hound review brief — assigned reviewer @0xdeafcafe Review mode: Deep Review Inspection targets (max 3, each names a file or behavior):
Author questions (optional, max 3):
|
|
Automated low-risk assessment This PR was evaluated against the repository's Low-Risk Pull Requests procedure and does not qualify as low risk.
This PR requires a manual review before merging. |
Why
Closes #498
PipecatAgentAdapteranswered the first audio turn but the scripted second turn died with a bareRuntimeError: [PipecatAgentAdapter]— no cause. Root mechanism: a background_recv_loopexit (a crash, or the bot closing the WebSocket) was swallowed (warning-only), so the inbound queue went silent and the nextrecv_audioblocked the fullresponse_timeoutbefore raising an empty-bodiedasyncio.TimeoutError. Operators got no signal about what actually failed.What changed
_recv_loopnow records a crash, and afinallymarks the loop done + enqueues a sentinel, sorecv_audiofails fast with an attributablePipecatRecvError— chaining the real crash via__cause__, or naming the clean close ("bot closed the WebSocket … hung up") — instead of a blind 60s timeout. Chose a queue sentinel + done-flag over polling because there is no pre-3.13Queue.shutdownand a waitingget()must be unblocked, not polled.AgentStreamEndedErroron the base adapter; transport-specificPipecatRecvErrorsubclass. The base_drain_agent_responseneeds a polymorphic type: surface a terminated stream loudly on the first chunk (the real 2nd-turn cause), but treat it as a normal end-of-turn on tail chunks (the peer closed after the agent finished). Kept distinct fromasyncio.TimeoutError, which stays "transient — agent may still be thinking" → the existingFirstChunkTimeoutError.FirstChunkTimeoutError(phase + configured timeout).Test plan
All creds-free and deterministic. Each new test goes RED on
main(missingAgentStreamEndedError/PipecatRecvError) and GREEN with the fix. The surfacing tests wraprecv_audio(timeout=30s)inasyncio.wait_for(5s)so a blind timeout (the regression) fails the test instead of passing slowly.tests/voice/test_pipecat_recv_loop_surfacing.py(new, 5 tests):PipecatRecvError,__cause__ isthe crash, message names it.PipecatRecvError,__cause__ is None, "closed"/"hung up".stopevent →PipecatRecvError(clean-close branch).websockets.servesocket (no mock): close after turn-1 → turn-2 raises — pins the fix against genuinewebsockets16.0 close semantics (normal close ends async-iteration cleanly), not only the mock.tests/voice/test_drain_timeout_surfacing.py(+2 tests): base drain propagatesAgentStreamEndedErrorunchanged on the first chunk (not relabeledFirstChunkTimeoutError), and ends the turn (returns collected audio) on a tail stream-end.Human verification
Backend-only, no UI surface. This change lives entirely in the voice recv-loop control path (
PipecatAgentAdapter._recv_loop/recv_audioand the base_drain_agent_response); it changes which exception surfaces when a Pipecat bot's WebSocket dies mid-conversation. There is no page, component, or rendered output to screenshot.A skeptical reviewer can independently re-verify, creds-free and without a live bot:
main— check outmain(or a fresh worktree) and run the two test files. They fail at contract resolution (PipecatRecvError/AgentStreamEndedErrordo not exist onmain); that is the regression: a terminated recv-loop yields a bareRuntimeErrorwith no cause after a fullresponse_timeout.asyncio.wait_for(5s)guard aroundrecv_audio(timeout=30s)means a blind timeout (the regression) fails the test instead of passing slowly — so green proves both "attributable" and "fast-fail".PipecatRecvErrormessages against a real in-process websocket (crash → chained__cause__; clean close → named "hung up").How I can prove I was successful
The user-observable surface here is the error an operator sees when the 2nd turn fails. Driving the real
PipecatAgentAdapteragainst a real in-process websocket (no creds, no mock) that terminates after turn 1 — captured live:Before this fix, both produced
RuntimeError: [PipecatAgentAdapter]— empty body, no cause, after a full 60sresponse_timeoutblock. The crash path additionally chains the realConnectionClosedErrorvia__cause__and logs the full traceback (recv loop crashed,exc_info=True).Backing regression suite (each FAILS on
main— ImportError/AttributeError on the contract symbols, or theasyncio.wait_for(5s)speed-guard catching a blind timeout — and PASSES with the fix, including one assertion over a real websocket). See Test plan.Anything surprising?
examples/voice/angry_customer.pyneedsOPENAI_API_KEY(+ ElevenLabs) for the user-sim TTS / judge and a live event endpoint, and the actual trigger (which hypothesis fired in the original run) is environment- and timing-specific. This PR delivers the diagnostic half in full: the next time the 2nd turn fails, the error will name the cause instead of an emptyRuntimeError. Hypotheses to confirm once an instrumented run is available:PipecatRecvError(this was the swallowed path).FirstChunkTimeoutError(loop alive, agent silent).response_timeouttoo short — unlikely (default is already 60s); would also surface asFirstChunkTimeoutErrornaming the configured value.recv_audio" shape exists in sibling adapters (openai_realtime.py,elevenlabs.py,gemini_live.py,livekit.py). Worth a sweep to adoptAgentStreamEndedErrorthere.