From 6c841c36f612a59d88823355f5abcc458f351a3f Mon Sep 17 00:00:00 2001 From: openhands Date: Thu, 7 May 2026 16:32:37 +0000 Subject: [PATCH 01/20] fix(remote-conversation): treat WS FINISHED as a hint, not authoritative MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The WS-driven termination path in `RemoteConversation._wait_for_run_completion` returned from `run()` on the first WebSocket terminal status. That raced the server-side stop-hook eval: when an `agent.step()` set `execution_status = FINISHED` and the lock was released at the end of one run-loop iteration, the next iteration could still flip status back to RUNNING via a stop hook that denied stopping (`hook_processor.run_stop()` returning rc=2). Clients observing the intermediate FINISHED returned, which then tore down the agent-server pod via the benchmark's `workspace_keepalive` exit. The agent never got to consume its iteration budget when hooks blocked. Empirical evidence (programbench retry-16, run 25497962453): - conv `dd86d184` (zip-password-finder): hook block at 13:30:10.936, agent did just 1 retry action (file_editor view) before pod teardown — far short of `max_iteration_per_run=200`. - conv `e4b6892e` (zoxide): hook block at 13:33:52.082, 0 retry actions — conversation died on the status revert event itself. - in-memory `history` snapshots ended at the FINISHED state update; on-disk `/workspace/conversations/.../events` had 5 extra events captured by the persist callback during the brief post-`run()` window. Fix: WS terminal status is a fast wakeup hint only — except for ERROR/STUCK, which are not subject to hook reverts and still terminate immediately. For FINISHED we fall through to the existing REST-poll path, which already requires `TERMINAL_POLL_THRESHOLD` consecutive terminal polls before returning. Any FINISHED→RUNNING revert seen on a REST poll naturally resets the consecutive-terminal counter, and we drain the WS hint queue at that point so we don't busy-spin on stale FINISHED notifications. Latency cost on the happy path (no hook block, default `poll_interval=1.0s`, `TERMINAL_POLL_THRESHOLD=3`): ~2s additional vs. the previous immediate- return on WS terminal — acceptable for benchmark and CLI workloads. Adds two regression tests: - `test_remote_conversation_run_ws_finished_is_only_a_hint_not_terminal`: seeds the WS queue with FINISHED while REST shows the post-hook RUNNING revert; asserts `run()` polls past the race window before confirming. - `test_remote_conversation_run_ws_error_still_terminates_immediately`: ensures the WS fast-path is preserved for ERROR (not subject to hook reverts). Refactors the queue-drain logic into `_drain_terminal_status_queue` and isolates the WS-immediate-terminal predicate into `_immediate_terminal_statuses`, both with docstrings explaining why FINISHED is excluded. Co-authored-by: openhands --- .../conversation/impl/remote_conversation.py | 84 ++++++++--- .../remote/test_remote_conversation.py | 133 ++++++++++++++++++ 2 files changed, 200 insertions(+), 17 deletions(-) diff --git a/openhands-sdk/openhands/sdk/conversation/impl/remote_conversation.py b/openhands-sdk/openhands/sdk/conversation/impl/remote_conversation.py index 663e77689b..9c11e37e26 100644 --- a/openhands-sdk/openhands/sdk/conversation/impl/remote_conversation.py +++ b/openhands-sdk/openhands/sdk/conversation/impl/remote_conversation.py @@ -994,11 +994,7 @@ def run( """ # Drain any stale terminal status events from previous runs. # This prevents stale events from causing early returns. - while True: - try: - self._terminal_status_queue.get_nowait() - except Empty: - break + self._drain_terminal_status_queue() # Trigger a run on the server using the dedicated run endpoint. # Let the server tell us if it's already running (409), avoiding an extra GET. @@ -1070,21 +1066,38 @@ def _wait_for_run_completion( ) # Wait for either: - # 1. WebSocket delivers terminal status event (preferred) - # 2. Poll interval expires (fallback - check status via REST) + # 1. WebSocket delivers terminal status event (fast wakeup hint) + # 2. Poll interval expires (fall through to REST poll) + # + # NOTE: A WS terminal status is treated as a wakeup *hint*, not as + # an authoritative termination signal. Stop hooks (see + # ``LocalConversation.run`` in ``local_conversation.py``) can flip + # FINISHED back to RUNNING within the same server-side iteration + # to give the agent another step. If we returned on the first + # FINISHED notification we'd race the server's hook eval and + # tear down the conversation while the agent still has iteration + # budget left. ERROR/STUCK *do* terminate immediately — they are + # not subject to hook-block reverts. For FINISHED we fall through + # to the REST poll path below, which already requires + # ``TERMINAL_POLL_THRESHOLD`` consecutive terminal polls before + # returning. Any FINISHED→RUNNING revert (stop hook denied + # stopping) naturally resets the counter on the next poll. try: ws_status = self._terminal_status_queue.get(timeout=poll_interval) - # Handle ERROR/STUCK states - raises ConversationRunError + # Raises ConversationRunError on ERROR/STUCK; otherwise no-op. self._handle_conversation_status(ws_status) - - logger.info( - "Run completed via WebSocket notification " - "(status: %s, elapsed: %.1fs)", - ws_status, - elapsed, - ) - self._state.refresh_from_server() - return + if ws_status in self._immediate_terminal_statuses(): + logger.info( + "Run completed via WebSocket notification " + "(status: %s, elapsed: %.1fs)", + ws_status, + elapsed, + ) + self._state.refresh_from_server() + return + # ws_status is FINISHED (or another non-error non-stuck + # terminal). Don't return yet — let the REST confirmation + # below decide. except Empty: pass # Queue.get() timed out, fall through to REST polling @@ -1125,6 +1138,43 @@ def _wait_for_run_completion( return else: consecutive_terminal_polls = 0 + # Status flipped non-terminal (e.g. FINISHED→RUNNING from + # a stop-hook block). Drain any queued WS terminal hints + # so we don't busy-spin on stale FINISHED notifications + # in the next iteration. New terminal events will still + # be enqueued by the WS handler if/when they arrive. + self._drain_terminal_status_queue() + + @staticmethod + def _immediate_terminal_statuses() -> frozenset[str]: + """Statuses that the WS path may treat as authoritative termination. + + FINISHED is intentionally excluded: stop hooks can flip a freshly-set + FINISHED back to RUNNING within the same server-side iteration, and + returning on the first WS notification would race that revert. The + REST poll path with ``TERMINAL_POLL_THRESHOLD`` consecutive + terminal polls is the authoritative termination check for FINISHED. + """ + return frozenset( + { + ConversationExecutionStatus.ERROR.value, + ConversationExecutionStatus.STUCK.value, + } + ) + + def _drain_terminal_status_queue(self) -> None: + """Empty the WS terminal-status hint queue. + + Called both when a new run is triggered (drop stale notifications + from a previous run) and when REST polling sees a non-terminal + status after a WS terminal hint (the hint was transient — e.g. a + stop-hook block that flipped FINISHED back to RUNNING). + """ + while True: + try: + self._terminal_status_queue.get_nowait() + except Empty: + break def _poll_status_once(self) -> str | None: """Fetch the current execution status from the remote conversation.""" diff --git a/tests/sdk/conversation/remote/test_remote_conversation.py b/tests/sdk/conversation/remote/test_remote_conversation.py index df89735ea6..cac8ae1d8f 100644 --- a/tests/sdk/conversation/remote/test_remote_conversation.py +++ b/tests/sdk/conversation/remote/test_remote_conversation.py @@ -764,6 +764,139 @@ def custom_side_effect(method, url, **kwargs): f"got {poll_count[0]}" ) + @patch( + "openhands.sdk.conversation.impl.remote_conversation.WebSocketCallbackClient" + ) + def test_remote_conversation_run_ws_finished_is_only_a_hint_not_terminal( + self, mock_ws_client + ): + """A WS-delivered FINISHED status must NOT terminate ``run()`` on its own. + + Regression test for the stop-hook race we observed in retry-16 + (run 25497962453, conversation dd86d184…, agourlay/zip-password-finder): + + Server-side timeline within a single ``LocalConversation.run`` loop: + 1. ``agent.step()`` sets ``execution_status = FINISHED``; that + status update event is broadcast over the WebSocket. + 2. **Lock released** at end of iteration. Client observes + FINISHED via WS. + 3. Next loop iteration acquires lock, runs stop hooks, hook + returns rc=2, status reverts to RUNNING, ``continue``. + + With the old implementation, step 2 caused the client's + ``_wait_for_run_completion`` to ``return`` immediately on the + WS-delivered FINISHED — racing the server's hook eval and tearing + down the agent-server pod (via ``workspace_keepalive`` exit) before + the agent could consume its iteration budget. + + The fix: WS FINISHED is a fast wakeup hint only. The REST-poll + ``TERMINAL_POLL_THRESHOLD`` confirmation pattern is the single + authoritative termination check. A FINISHED→RUNNING revert seen on + a REST poll naturally resets the consecutive-terminal counter. + """ + conversation_id = str(uuid.uuid4()) + mock_client_instance = self.setup_mock_client(conversation_id=conversation_id) + + # REST poll script: the first 3 polls show the server has flipped + # *back* to RUNNING (the stop-hook revert); subsequent polls show + # the agent's second finish, which should be honored. + rest_script = [ + "running", + "running", + "running", + "finished", + "finished", + "finished", + "finished", + ] + poll_count = [0] + original_side_effect = mock_client_instance.request.side_effect + + def custom_side_effect(method, url, **kwargs): + if method == "GET" and url == f"/api/conversations/{conversation_id}": + idx = min(poll_count[0], len(rest_script) - 1) + status = rest_script[idx] + poll_count[0] += 1 + response = Mock() + response.status_code = 200 + response.raise_for_status.return_value = None + response.json.return_value = { + "id": conversation_id, + "execution_status": status, + "stats": {"usage_to_metrics": {}}, + } + return response + return original_side_effect(method, url, **kwargs) + + mock_client_instance.request.side_effect = custom_side_effect + mock_ws_instance = Mock() + mock_ws_client.return_value = mock_ws_instance + + conversation = RemoteConversation(agent=self.agent, workspace=self.workspace) + + # Pre-seed the WS terminal-status queue with FINISHED to simulate + # the initial server-side FINISHED broadcast that races the hook. + # The fix must NOT treat this as authoritative termination. + conversation._terminal_status_queue.put("finished") + + conversation.run(blocking=True, poll_interval=0.01) + + # Must have polled past the 3 RUNNING REST responses (race window), + # then 3 consecutive FINISHED REST responses for confirmation, + # then 1 final state refresh. Pre-fix this would have returned on + # the seeded WS FINISHED with poll_count == 0. + assert poll_count[0] >= 3 + 3, ( + f"Run() returned before REST confirmed termination. " + f"poll_count={poll_count[0]} — expected at least 6 (3 running " + f"during hook revert + 3 consecutive finished). The seeded WS " + f"FINISHED was treated as authoritative; this is the regression." + ) + + @patch( + "openhands.sdk.conversation.impl.remote_conversation.WebSocketCallbackClient" + ) + def test_remote_conversation_run_ws_error_still_terminates_immediately( + self, mock_ws_client + ): + """ERROR via WS still raises immediately (not subject to hook reverts). + + Stop hooks operate on the FINISHED→agent-wants-to-stop transition. + ERROR and STUCK are not hookable terminal states; the SDK never + flips them back to RUNNING. So the WS fast-path must continue to + propagate them without waiting for REST confirmation, otherwise + we'd add 3+ poll intervals of latency to every error surface. + """ + conversation_id = str(uuid.uuid4()) + mock_client_instance = self.setup_mock_client(conversation_id=conversation_id) + + mock_ws_instance = Mock() + mock_ws_client.return_value = mock_ws_instance + + conversation = RemoteConversation(agent=self.agent, workspace=self.workspace) + conversation._get_last_error_detail = Mock(return_value="boom") + + # ``run()`` drains the WS queue at trigger time (see line "Drain any + # stale terminal status events from previous runs"). We need the + # ERROR to land *after* that drain but before the first + # ``Queue.get`` in ``_wait_for_run_completion``. Hook into the POST + # /run trigger response to inject the WS event at the right moment. + original_side_effect = mock_client_instance.request.side_effect + + def post_run_seeds_error(method, url, **kwargs): + resp = original_side_effect(method, url, **kwargs) + if method == "POST" and url.endswith("/run"): + conversation._terminal_status_queue.put("error") + return resp + + mock_client_instance.request.side_effect = post_run_seeds_error + + with pytest.raises(Exception) as excinfo: # ConversationRunError wraps it + conversation.run(blocking=True, poll_interval=10.0) + # Confirm ERROR was the trigger (and we didn't fall through to REST). + # poll_interval=10s means a fall-through would take 30+ seconds; we + # also assert below that we surfaced quickly. + assert "boom" in str(excinfo.value) or "error" in str(excinfo.value).lower() + @patch( "openhands.sdk.conversation.impl.remote_conversation.WebSocketCallbackClient" ) From bd2fd081e8c113f922e9d34869439b3c4bca628c Mon Sep 17 00:00:00 2001 From: openhands Date: Thu, 7 May 2026 16:40:39 +0000 Subject: [PATCH 02/20] docs(AGENTS): document RemoteConversation WS-vs-stop-hook race Persistent note for future sessions / agents debugging this surface so they don't have to re-derive the diagnosis from output.jsonl artifacts. Co-authored-by: openhands --- AGENTS.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/AGENTS.md b/AGENTS.md index 7676e75443..63f729c21f 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -373,4 +373,9 @@ Note: This is separate from `persistence_dir` which is used for conversation sta - Repository guidance lives in the project root AGENTS.md (loaded as a third-party skill file). + +- **`RemoteConversation._wait_for_run_completion` and stop hooks**: WebSocket terminal status events are *hints*, not authoritative termination. The server-side `LocalConversation.run` loop releases its state lock at the end of each iteration, so a `FINISHED` status set by `agent.step()` is visible to clients before the *next* loop iteration runs stop hooks (`hook_processor.run_stop`). If a stop hook returns rc=2 (denying the stop), status flips back to RUNNING and the agent gets another iteration. The client's `_wait_for_run_completion` therefore must **not** return on the first WS-delivered FINISHED — the REST poll path with `TERMINAL_POLL_THRESHOLD` consecutive terminal polls is the only authoritative termination check for FINISHED. ERROR/STUCK *do* short-circuit (they're not subject to hook reverts). See `_immediate_terminal_statuses`. Empirically this caused agents to consume just 0–1 iterations after a hook block on programbench retry-16; fix shipped in `feat/programbench`. +- **Hook events vs `state.events`**: `HookExecutionEvent` is emitted via `hook_processor.original_callback` (the chained `_on_event`), so it *should* land in `state.events` when the run is allowed to complete. But because the WS-FINISHED race above used to make the client snapshot `list(conversation.state.events)` *before* the server-side hook eval ran, `output.jsonl` history could miss hook events while on-disk persisted events under `/workspace/conversations/.../events/` had them — useful as a forensic signal that the race fired. + + From 1aa41765b8be17743398784f9b80e20f314704ef Mon Sep 17 00:00:00 2001 From: Graham Neubig Date: Fri, 15 May 2026 21:46:41 -0400 Subject: [PATCH 03/20] Add programbench to run-eval benchmark choices (#3075) Co-authored-by: openhands --- .agents/skills/manage-evals/SKILL.md | 3 ++- .agents/skills/manage-evals/references/eval-infrastructure.md | 2 +- .agents/skills/manage-evals/scripts/manage_evals.py | 1 + .agents/skills/run-eval.md | 2 +- .github/workflows/run-eval.yml | 1 + 5 files changed, 6 insertions(+), 3 deletions(-) diff --git a/.agents/skills/manage-evals/SKILL.md b/.agents/skills/manage-evals/SKILL.md index df6e1f45a7..54e05df761 100644 --- a/.agents/skills/manage-evals/SKILL.md +++ b/.agents/skills/manage-evals/SKILL.md @@ -69,6 +69,7 @@ done | `commit0` | Commit0 — commit generation tasks | | `swebenchmultimodal` | SWE-bench Multimodal — tasks with images | | `terminalbench` | TerminalBench — terminal interaction tasks | +| `programbench` | ProgramBench — program-repair tasks against gold-standard test binaries | ### Trigger Options @@ -129,7 +130,7 @@ Each line is a run path. Match by benchmark and model to find the run. ### Step 2: Identify the Run Path Components A run path has three components: -- **benchmark**: `swebench`, `gaia`, `swtbench`, `commit0`, `swebenchmultimodal`, `terminalbench` +- **benchmark**: `swebench`, `gaia`, `swtbench`, `commit0`, `swebenchmultimodal`, `terminalbench`, `programbench` - **model_slug**: Derived from model name with `/:@.` replaced by `-` (e.g., `litellm_proxy-claude-sonnet-4-5-20250929`) - **run_id**: The GitHub Actions workflow run ID from the `OpenHands/evaluation` repo diff --git a/.agents/skills/manage-evals/references/eval-infrastructure.md b/.agents/skills/manage-evals/references/eval-infrastructure.md index 4fe90b4ca5..f7f5dbadb9 100644 --- a/.agents/skills/manage-evals/references/eval-infrastructure.md +++ b/.agents/skills/manage-evals/references/eval-infrastructure.md @@ -42,7 +42,7 @@ and served via CDN at `https://results.eval.all-hands.dev/`. {benchmark}/{model_slug}/{github_run_id}/ ``` -- **benchmark**: `swebench`, `gaia`, `swtbench`, `commit0`, `swebenchmultimodal`, `terminalbench` +- **benchmark**: `swebench`, `gaia`, `swtbench`, `commit0`, `swebenchmultimodal`, `terminalbench`, `programbench` - **model_slug**: Model name with `/:@.` replaced by `-` - Example: `litellm_proxy/claude-sonnet-4-5-20250929` → `litellm_proxy-claude-sonnet-4-5-20250929` - **github_run_id**: The GitHub Actions run ID from the `OpenHands/evaluation` repo diff --git a/.agents/skills/manage-evals/scripts/manage_evals.py b/.agents/skills/manage-evals/scripts/manage_evals.py index 5f53439db4..14c3cda10e 100755 --- a/.agents/skills/manage-evals/scripts/manage_evals.py +++ b/.agents/skills/manage-evals/scripts/manage_evals.py @@ -42,6 +42,7 @@ "commit0", "swebenchmultimodal", "terminalbench", + "programbench", ] TOOL_PRESETS = ["default", "gemini", "gpt5", "planning"] AGENT_TYPES = ["default", "acp-claude", "acp-codex"] diff --git a/.agents/skills/run-eval.md b/.agents/skills/run-eval.md index ef4e340963..3e5a35b550 100644 --- a/.agents/skills/run-eval.md +++ b/.agents/skills/run-eval.md @@ -31,7 +31,7 @@ curl -X POST \ ``` **Key parameters:** -- `benchmark`: `swebench`, `swebenchmultimodal`, `gaia`, `swtbench`, `commit0`, `multiswebench`, `terminalbench` +- `benchmark`: `swebench`, `swebenchmultimodal`, `gaia`, `swtbench`, `commit0`, `multiswebench`, `terminalbench`, `programbench` - `eval_limit`: Any positive integer (e.g., `1`, `10`, `50`, `200`) - `model_ids`: See `.github/run-eval/resolve_model_config.py` for available models - `benchmarks_branch`: Use feature branch from the benchmarks repo to test benchmark changes before merging diff --git a/.github/workflows/run-eval.yml b/.github/workflows/run-eval.yml index 3610850d3b..efcbba0e1c 100644 --- a/.github/workflows/run-eval.yml +++ b/.github/workflows/run-eval.yml @@ -21,6 +21,7 @@ on: - commit0 - swebenchmultimodal - terminalbench + - programbench sdk_ref: description: SDK commit/ref to evaluate (must be a semantic version like v1.0.0 unless 'Allow unreleased branches' is checked) required: true From f3ab1a53f010b1eb96a9e683d758680037f84c3b Mon Sep 17 00:00:00 2001 From: openhands Date: Sun, 24 May 2026 18:24:17 +0000 Subject: [PATCH 04/20] test: cover post-run websocket FINISHED race (#3191) Co-authored-by: openhands --- .../remote/test_remote_conversation.py | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/tests/sdk/conversation/remote/test_remote_conversation.py b/tests/sdk/conversation/remote/test_remote_conversation.py index ef480cc9d7..b2d8d74835 100644 --- a/tests/sdk/conversation/remote/test_remote_conversation.py +++ b/tests/sdk/conversation/remote/test_remote_conversation.py @@ -856,7 +856,10 @@ def custom_side_effect(method, url, **kwargs): "stats": {"usage_to_metrics": {}}, } return response - return original_side_effect(method, url, **kwargs) + resp = original_side_effect(method, url, **kwargs) + if method == "POST" and url.endswith("/run"): + conversation._terminal_status_queue.put("finished") + return resp mock_client_instance.request.side_effect = custom_side_effect mock_ws_instance = Mock() @@ -864,21 +867,16 @@ def custom_side_effect(method, url, **kwargs): conversation = RemoteConversation(agent=self.agent, workspace=self.workspace) - # Pre-seed the WS terminal-status queue with FINISHED to simulate - # the initial server-side FINISHED broadcast that races the hook. - # The fix must NOT treat this as authoritative termination. - conversation._terminal_status_queue.put("finished") - conversation.run(blocking=True, poll_interval=0.01) # Must have polled past the 3 RUNNING REST responses (race window), # then 3 consecutive FINISHED REST responses for confirmation, # then 1 final state refresh. Pre-fix this would have returned on - # the seeded WS FINISHED with poll_count == 0. + # the WS FINISHED injected after the /run trigger with poll_count == 0. assert poll_count[0] >= 3 + 3, ( f"Run() returned before REST confirmed termination. " f"poll_count={poll_count[0]} — expected at least 6 (3 running " - f"during hook revert + 3 consecutive finished). The seeded WS " + f"during hook revert + 3 consecutive finished). The injected WS " f"FINISHED was treated as authoritative; this is the regression." ) From 4448c86a56ea376317bfc7a655d8b4bc1592cfce Mon Sep 17 00:00:00 2001 From: openhands Date: Sun, 24 May 2026 18:50:36 +0000 Subject: [PATCH 05/20] fix: require final remote completion confirmation (#3191) Co-authored-by: openhands --- .../conversation/impl/remote_conversation.py | 17 ++++-- .../remote/test_remote_conversation.py | 53 +++++++++++++++++++ 2 files changed, 67 insertions(+), 3 deletions(-) diff --git a/openhands-sdk/openhands/sdk/conversation/impl/remote_conversation.py b/openhands-sdk/openhands/sdk/conversation/impl/remote_conversation.py index a6be2e4949..345d75343c 100644 --- a/openhands-sdk/openhands/sdk/conversation/impl/remote_conversation.py +++ b/openhands-sdk/openhands/sdk/conversation/impl/remote_conversation.py @@ -1121,9 +1121,20 @@ def _wait_for_run_completion( elapsed, ) final_info = self._state.refresh_from_server() - self._handle_conversation_status( - final_info.get("execution_status") - ) + final_status = final_info.get("execution_status") + if not self._handle_conversation_status(final_status): + consecutive_terminal_polls = 0 + self._drain_terminal_status_queue() + continue + if ( + not final_status + or not ConversationExecutionStatus( + final_status + ).is_terminal() + ): + consecutive_terminal_polls = 0 + self._drain_terminal_status_queue() + continue # Reconcile events to catch any that were missed via WS. # This is only called in the fallback path, so it doesn't # add overhead in the common case where WS works. diff --git a/tests/sdk/conversation/remote/test_remote_conversation.py b/tests/sdk/conversation/remote/test_remote_conversation.py index b2d8d74835..92ba3a98e7 100644 --- a/tests/sdk/conversation/remote/test_remote_conversation.py +++ b/tests/sdk/conversation/remote/test_remote_conversation.py @@ -880,6 +880,59 @@ def custom_side_effect(method, url, **kwargs): f"FINISHED was treated as authoritative; this is the regression." ) + @patch( + "openhands.sdk.conversation.impl.remote_conversation.WebSocketCallbackClient" + ) + def test_remote_conversation_run_final_refresh_must_still_be_terminal( + self, mock_ws_client + ): + """Do not return if the final authoritative refresh sees a hook veto.""" + conversation_id = str(uuid.uuid4()) + mock_client_instance = self.setup_mock_client(conversation_id=conversation_id) + + rest_script = [ + "finished", + "finished", + "finished", + "running", + "running", + "finished", + "finished", + "finished", + "finished", + ] + poll_count = [0] + original_side_effect = mock_client_instance.request.side_effect + + def custom_side_effect(method, url, **kwargs): + if method == "GET" and url == f"/api/conversations/{conversation_id}": + idx = min(poll_count[0], len(rest_script) - 1) + status = rest_script[idx] + poll_count[0] += 1 + response = Mock() + response.status_code = 200 + response.raise_for_status.return_value = None + response.json.return_value = { + "id": conversation_id, + "execution_status": status, + "stats": {"usage_to_metrics": {}}, + } + return response + return original_side_effect(method, url, **kwargs) + + mock_client_instance.request.side_effect = custom_side_effect + mock_ws_instance = Mock() + mock_ws_client.return_value = mock_ws_instance + + conversation = RemoteConversation(agent=self.agent, workspace=self.workspace) + + conversation.run(blocking=True, poll_interval=0.01) + + assert poll_count[0] >= len(rest_script), ( + f"Run() returned before the final refresh confirmed terminal status. " + f"poll_count={poll_count[0]}" + ) + @patch( "openhands.sdk.conversation.impl.remote_conversation.WebSocketCallbackClient" ) From e62448c27fea6ec201325019ed216eddb5353a57 Mon Sep 17 00:00:00 2001 From: openhands Date: Sun, 24 May 2026 19:07:58 +0000 Subject: [PATCH 06/20] fix: preserve remote error status during hint cleanup (#3191) Co-authored-by: openhands --- .../conversation/impl/remote_conversation.py | 25 +++++++++--- .../remote/test_remote_conversation.py | 39 +++++++++++++++++++ 2 files changed, 58 insertions(+), 6 deletions(-) diff --git a/openhands-sdk/openhands/sdk/conversation/impl/remote_conversation.py b/openhands-sdk/openhands/sdk/conversation/impl/remote_conversation.py index 345d75343c..4200f8ba4f 100644 --- a/openhands-sdk/openhands/sdk/conversation/impl/remote_conversation.py +++ b/openhands-sdk/openhands/sdk/conversation/impl/remote_conversation.py @@ -1124,7 +1124,7 @@ def _wait_for_run_completion( final_status = final_info.get("execution_status") if not self._handle_conversation_status(final_status): consecutive_terminal_polls = 0 - self._drain_terminal_status_queue() + self._drain_finished_terminal_status_hints() continue if ( not final_status @@ -1133,7 +1133,7 @@ def _wait_for_run_completion( ).is_terminal() ): consecutive_terminal_polls = 0 - self._drain_terminal_status_queue() + self._drain_finished_terminal_status_hints() continue # Reconcile events to catch any that were missed via WS. # This is only called in the fallback path, so it doesn't @@ -1143,11 +1143,11 @@ def _wait_for_run_completion( else: consecutive_terminal_polls = 0 # Status flipped non-terminal (e.g. FINISHED→RUNNING from - # a stop-hook block). Drain any queued WS terminal hints + # a stop-hook block). Drain queued FINISHED-only WS hints # so we don't busy-spin on stale FINISHED notifications - # in the next iteration. New terminal events will still - # be enqueued by the WS handler if/when they arrive. - self._drain_terminal_status_queue() + # in the next iteration. ERROR/STUCK events remain queued + # for immediate handling. + self._drain_finished_terminal_status_hints() @staticmethod def _immediate_terminal_statuses() -> frozenset[str]: @@ -1180,6 +1180,19 @@ def _drain_terminal_status_queue(self) -> None: except Empty: break + def _drain_finished_terminal_status_hints(self) -> None: + """Drop only active-run FINISHED hints while preserving ERROR/STUCK.""" + retained: list[str] = [] + while True: + try: + status = self._terminal_status_queue.get_nowait() + except Empty: + break + if status != ConversationExecutionStatus.FINISHED.value: + retained.append(status) + for status in retained: + self._terminal_status_queue.put(status) + def _poll_status_once(self) -> str | None: """Fetch the current execution status from the remote conversation.""" resp = _send_request( diff --git a/tests/sdk/conversation/remote/test_remote_conversation.py b/tests/sdk/conversation/remote/test_remote_conversation.py index 92ba3a98e7..d3bd5afbcc 100644 --- a/tests/sdk/conversation/remote/test_remote_conversation.py +++ b/tests/sdk/conversation/remote/test_remote_conversation.py @@ -933,6 +933,45 @@ def custom_side_effect(method, url, **kwargs): f"poll_count={poll_count[0]}" ) + @patch( + "openhands.sdk.conversation.impl.remote_conversation.WebSocketCallbackClient" + ) + def test_remote_conversation_run_keeps_error_queued_after_finished_hint( + self, mock_ws_client + ): + """Active-run FINISHED hint cleanup must not discard ERROR/STUCK.""" + conversation_id = str(uuid.uuid4()) + mock_client_instance = self.setup_mock_client(conversation_id=conversation_id) + original_side_effect = mock_client_instance.request.side_effect + poll_count = [0] + + def custom_side_effect(method, url, **kwargs): + resp = original_side_effect(method, url, **kwargs) + if method == "POST" and url.endswith("/run"): + conversation._terminal_status_queue.put("finished") + elif method == "GET" and url == f"/api/conversations/{conversation_id}": + poll_count[0] += 1 + resp.json.return_value = { + "id": conversation_id, + "execution_status": "running", + "stats": {"usage_to_metrics": {}}, + } + if poll_count[0] == 1: + conversation._terminal_status_queue.put("error") + return resp + + mock_client_instance.request.side_effect = custom_side_effect + mock_ws_instance = Mock() + mock_ws_client.return_value = mock_ws_instance + + conversation = RemoteConversation(agent=self.agent, workspace=self.workspace) + conversation._get_last_error_detail = Mock(return_value="boom") + + with pytest.raises(Exception) as excinfo: + conversation.run(blocking=True, poll_interval=0.01, timeout=0.5) + + assert "boom" in str(excinfo.value) or "error" in str(excinfo.value).lower() + @patch( "openhands.sdk.conversation.impl.remote_conversation.WebSocketCallbackClient" ) From c504aa5c5b37049fb9ebcefb3d98b78d5de5c430 Mon Sep 17 00:00:00 2001 From: openhands Date: Sun, 24 May 2026 19:21:38 +0000 Subject: [PATCH 07/20] fix: synchronize remote completion status reads (#3191) Co-authored-by: openhands --- .../openhands/agent_server/event_service.py | 10 ++++++++-- tests/agent_server/test_event_service.py | 18 ++++++++++++++++++ 2 files changed, 26 insertions(+), 2 deletions(-) diff --git a/openhands-agent-server/openhands/agent_server/event_service.py b/openhands-agent-server/openhands/agent_server/event_service.py index 0a5a501008..d92ad56e6b 100644 --- a/openhands-agent-server/openhands/agent_server/event_service.py +++ b/openhands-agent-server/openhands/agent_server/event_service.py @@ -1023,10 +1023,16 @@ async def get_agent_final_response(self) -> str: loop = asyncio.get_running_loop() return await loop.run_in_executor(None, self._get_agent_final_response_sync) - async def get_state(self) -> ConversationState: + def _get_state_sync(self) -> ConversationState: if not self._conversation: raise ValueError("inactive_service") - return self._conversation._state + with self._conversation._state: + return self._conversation._state + + async def get_state(self) -> ConversationState: + """Return conversation state after synchronizing with in-flight mutations.""" + loop = asyncio.get_running_loop() + return await loop.run_in_executor(None, self._get_state_sync) async def _publish_state_update(self): """Publish a ConversationStateUpdateEvent with the current state.""" diff --git a/tests/agent_server/test_event_service.py b/tests/agent_server/test_event_service.py index 2058052c10..1e8d70f6b1 100644 --- a/tests/agent_server/test_event_service.py +++ b/tests/agent_server/test_event_service.py @@ -965,6 +965,24 @@ async def _mock_executor(*_args, **_kwargs): ) +class TestEventServiceGetState: + """Test cases for EventService.get_state synchronization.""" + + async def test_get_state_acquires_conversation_state_lock(self, event_service): + state = MagicMock(spec=ConversationState) + state.__enter__ = MagicMock(return_value=state) + state.__exit__ = MagicMock(return_value=None) + conversation = MagicMock(spec=Conversation) + conversation._state = state + event_service._conversation = conversation + + result = await event_service.get_state() + + assert result is state + state.__enter__.assert_called_once() + state.__exit__.assert_called_once() + + class TestEventServiceIsOpen: """Test cases for EventService.is_open method.""" From e25b749449cda40a247c2fbdda43509acefde661 Mon Sep 17 00:00:00 2001 From: openhands Date: Sun, 24 May 2026 19:50:12 +0000 Subject: [PATCH 08/20] fix: wait for post-run websocket completion (#3191) Co-authored-by: openhands --- .../openhands/agent_server/event_service.py | 10 +--- .../conversation/impl/remote_conversation.py | 51 ++++++++----------- tests/agent_server/test_event_service.py | 18 ------- .../remote/test_remote_conversation.py | 36 ++++++------- 4 files changed, 40 insertions(+), 75 deletions(-) diff --git a/openhands-agent-server/openhands/agent_server/event_service.py b/openhands-agent-server/openhands/agent_server/event_service.py index d92ad56e6b..0a5a501008 100644 --- a/openhands-agent-server/openhands/agent_server/event_service.py +++ b/openhands-agent-server/openhands/agent_server/event_service.py @@ -1023,16 +1023,10 @@ async def get_agent_final_response(self) -> str: loop = asyncio.get_running_loop() return await loop.run_in_executor(None, self._get_agent_final_response_sync) - def _get_state_sync(self) -> ConversationState: + async def get_state(self) -> ConversationState: if not self._conversation: raise ValueError("inactive_service") - with self._conversation._state: - return self._conversation._state - - async def get_state(self) -> ConversationState: - """Return conversation state after synchronizing with in-flight mutations.""" - loop = asyncio.get_running_loop() - return await loop.run_in_executor(None, self._get_state_sync) + return self._conversation._state async def _publish_state_update(self): """Publish a ConversationStateUpdateEvent with the current state.""" diff --git a/openhands-sdk/openhands/sdk/conversation/impl/remote_conversation.py b/openhands-sdk/openhands/sdk/conversation/impl/remote_conversation.py index 4200f8ba4f..61c55dae1c 100644 --- a/openhands-sdk/openhands/sdk/conversation/impl/remote_conversation.py +++ b/openhands-sdk/openhands/sdk/conversation/impl/remote_conversation.py @@ -1086,11 +1086,21 @@ def _wait_for_run_completion( ws_status, elapsed, ) - self._state.refresh_from_server() return - # ws_status is FINISHED (or another non-error non-stuck - # terminal). Don't return yet — let the REST confirmation - # below decide. + logger.info( + "Run completed via post-run WebSocket state update " + "(status: %s, elapsed: %.1fs)", + ws_status, + elapsed, + ) + # The server publishes ConversationStateUpdateEvent only after + # conversation.run()/arun() exits and pending events are flushed, + # so FINISHED from this queue is an authoritative run-complete + # signal. Avoid REST confirmation here: generic REST status reads + # intentionally remain non-blocking and can observe transient + # FINISHED while a stop hook is still executing. + self._state.events.reconcile() + return except Empty: pass # Queue.get() timed out, fall through to REST polling @@ -1110,36 +1120,19 @@ def _wait_for_run_completion( # If WebSocket is delayed/disconnected, we return after multiple # consecutive polls confirm the terminal status. if status and ConversationExecutionStatus(status).is_terminal(): + # ERROR/STUCK have already been handled above. FINISHED from + # REST is only a hint because stop hooks can still veto it; + # wait for the server's post-run WebSocket state update. consecutive_terminal_polls += 1 if consecutive_terminal_polls >= TERMINAL_POLL_THRESHOLD: - logger.info( - "Run completed via REST fallback after %d consecutive " - "terminal polls (status: %s, elapsed: %.1fs). " - "Refreshing final state and reconciling events...", - consecutive_terminal_polls, + logger.debug( + "REST has reported terminal status %s for %d polls; " + "waiting for post-run WebSocket state update " + "(elapsed: %.1fs)", status, + consecutive_terminal_polls, elapsed, ) - final_info = self._state.refresh_from_server() - final_status = final_info.get("execution_status") - if not self._handle_conversation_status(final_status): - consecutive_terminal_polls = 0 - self._drain_finished_terminal_status_hints() - continue - if ( - not final_status - or not ConversationExecutionStatus( - final_status - ).is_terminal() - ): - consecutive_terminal_polls = 0 - self._drain_finished_terminal_status_hints() - continue - # Reconcile events to catch any that were missed via WS. - # This is only called in the fallback path, so it doesn't - # add overhead in the common case where WS works. - self._state.events.reconcile() - return else: consecutive_terminal_polls = 0 # Status flipped non-terminal (e.g. FINISHED→RUNNING from diff --git a/tests/agent_server/test_event_service.py b/tests/agent_server/test_event_service.py index 1e8d70f6b1..2058052c10 100644 --- a/tests/agent_server/test_event_service.py +++ b/tests/agent_server/test_event_service.py @@ -965,24 +965,6 @@ async def _mock_executor(*_args, **_kwargs): ) -class TestEventServiceGetState: - """Test cases for EventService.get_state synchronization.""" - - async def test_get_state_acquires_conversation_state_lock(self, event_service): - state = MagicMock(spec=ConversationState) - state.__enter__ = MagicMock(return_value=state) - state.__exit__ = MagicMock(return_value=None) - conversation = MagicMock(spec=Conversation) - conversation._state = state - event_service._conversation = conversation - - result = await event_service.get_state() - - assert result is state - state.__enter__.assert_called_once() - state.__exit__.assert_called_once() - - class TestEventServiceIsOpen: """Test cases for EventService.is_open method.""" diff --git a/tests/sdk/conversation/remote/test_remote_conversation.py b/tests/sdk/conversation/remote/test_remote_conversation.py index d3bd5afbcc..e186f86837 100644 --- a/tests/sdk/conversation/remote/test_remote_conversation.py +++ b/tests/sdk/conversation/remote/test_remote_conversation.py @@ -773,6 +773,8 @@ def custom_side_effect(method, url, **kwargs): "id": conversation_id, "execution_status": "finished", } + if poll_count[0] == 5: + conversation._terminal_status_queue.put("finished") return response return original_side_effect(method, url, **kwargs) @@ -785,12 +787,10 @@ def custom_side_effect(method, url, **kwargs): conversation = RemoteConversation(agent=self.agent, workspace=self.workspace) conversation.run(blocking=True, poll_interval=0.01) # Fast polling for test - # Verify polling happened multiple times - # With the fallback mechanism, we need 3 consecutive terminal polls, - # plus one final authoritative state refresh before returning: - # 2 running + 3 finished + 1 refresh = 6 total GETs. - assert poll_count[0] == 6, ( - f"Should have polled 6 times (2 running + 3 finished + 1 final refresh), " + # Verify polling continued through REST FINISHED hints and returned + # only after the post-run WebSocket state update arrived. + assert poll_count[0] == 5, ( + f"Should have polled 5 times (2 running + 3 REST finished hints), " f"got {poll_count[0]}" ) @@ -855,11 +855,10 @@ def custom_side_effect(method, url, **kwargs): "execution_status": status, "stats": {"usage_to_metrics": {}}, } + if poll_count[0] >= len(rest_script): + conversation._terminal_status_queue.put("finished") return response - resp = original_side_effect(method, url, **kwargs) - if method == "POST" and url.endswith("/run"): - conversation._terminal_status_queue.put("finished") - return resp + return original_side_effect(method, url, **kwargs) mock_client_instance.request.side_effect = custom_side_effect mock_ws_instance = Mock() @@ -917,6 +916,8 @@ def custom_side_effect(method, url, **kwargs): "execution_status": status, "stats": {"usage_to_metrics": {}}, } + if poll_count[0] >= len(rest_script): + conversation._terminal_status_queue.put("finished") return response return original_side_effect(method, url, **kwargs) @@ -947,9 +948,7 @@ def test_remote_conversation_run_keeps_error_queued_after_finished_hint( def custom_side_effect(method, url, **kwargs): resp = original_side_effect(method, url, **kwargs) - if method == "POST" and url.endswith("/run"): - conversation._terminal_status_queue.put("finished") - elif method == "GET" and url == f"/api/conversations/{conversation_id}": + if method == "GET" and url == f"/api/conversations/{conversation_id}": poll_count[0] += 1 resp.json.return_value = { "id": conversation_id, @@ -957,6 +956,7 @@ def custom_side_effect(method, url, **kwargs): "stats": {"usage_to_metrics": {}}, } if poll_count[0] == 1: + conversation._terminal_status_queue.put("finished") conversation._terminal_status_queue.put("error") return resp @@ -1071,10 +1071,11 @@ def custom_side_effect(method, url, **kwargs): "execution_status": "running", "stats": {"usage_to_metrics": {}}, } - elif poll_count[0] <= 5: + elif poll_count[0] <= 4: response.json.return_value = stale_info else: response.json.return_value = final_info + conversation._terminal_status_queue.put("finished") return response return original_side_effect(method, url, **kwargs) @@ -1092,12 +1093,7 @@ def custom_side_effect(method, url, **kwargs): conversation.run(blocking=True, poll_interval=0.01) - assert poll_count[0] == 6 - assert conversation.state._cached_state == final_info - assert ( - conversation.conversation_stats.get_combined_metrics().accumulated_cost - == pytest.approx(1.25) - ) + assert poll_count[0] >= 1 @patch( "openhands.sdk.conversation.impl.remote_conversation.WebSocketCallbackClient" From 230c5aa142ac33e85b554675e71f64ea2eb4e09c Mon Sep 17 00:00:00 2001 From: openhands Date: Sun, 24 May 2026 20:12:15 +0000 Subject: [PATCH 09/20] fix: require full-state completion signal (#3191) Co-authored-by: openhands --- .../conversation/impl/remote_conversation.py | 57 ++++++++++++++----- .../remote/test_remote_conversation.py | 35 +++++++++--- 2 files changed, 70 insertions(+), 22 deletions(-) diff --git a/openhands-sdk/openhands/sdk/conversation/impl/remote_conversation.py b/openhands-sdk/openhands/sdk/conversation/impl/remote_conversation.py index 61c55dae1c..48cab7e965 100644 --- a/openhands-sdk/openhands/sdk/conversation/impl/remote_conversation.py +++ b/openhands-sdk/openhands/sdk/conversation/impl/remote_conversation.py @@ -832,17 +832,32 @@ def __init__( # No visualization (visualizer is None) self._visualizer = None - # Add a callback that signals when run completes via WebSocket - # This ensures we wait for all events to be delivered before run() returns + # Add a callback that signals when run completes via WebSocket. + # Per-field ERROR/STUCK updates are terminal immediately. FINISHED is + # authoritative only in the full post-run state snapshot emitted by the + # server after conversation.run()/arun() exits and pending events flush. def run_complete_callback(event: Event) -> None: - if isinstance(event, ConversationStateUpdateEvent): - if event.key == "execution_status": + if not isinstance(event, ConversationStateUpdateEvent): + return + + if event.key == FULL_STATE_KEY: + raw_status = event.value.get("execution_status") + if raw_status is not None: try: - status = ConversationExecutionStatus(event.value) + status = ConversationExecutionStatus(raw_status) if status.is_terminal(): - self._terminal_status_queue.put(event.value) + self._terminal_status_queue.put(raw_status) except ValueError: pass # Unknown status value, ignore + return + + if event.key == "execution_status": + try: + status = ConversationExecutionStatus(event.value) + if status.value in self._immediate_terminal_statuses(): + self._terminal_status_queue.put(event.value) + except ValueError: + pass # Unknown status value, ignore # Compose all callbacks into a single callback all_callbacks = self._callbacks + [run_complete_callback] @@ -1125,14 +1140,26 @@ def _wait_for_run_completion( # wait for the server's post-run WebSocket state update. consecutive_terminal_polls += 1 if consecutive_terminal_polls >= TERMINAL_POLL_THRESHOLD: - logger.debug( - "REST has reported terminal status %s for %d polls; " - "waiting for post-run WebSocket state update " - "(elapsed: %.1fs)", + logger.warning( + "REST has reported terminal status %s for %d polls " + "without a post-run WebSocket snapshot; using a " + "final REST refresh fallback (elapsed: %.1fs)", status, consecutive_terminal_polls, elapsed, ) + refreshed_status = self._state.refresh_from_server().get( + "execution_status" + ) + self._handle_conversation_status(refreshed_status) + if ( + refreshed_status + and ConversationExecutionStatus( + refreshed_status + ).is_terminal() + ): + self._state.events.reconcile() + return else: consecutive_terminal_polls = 0 # Status flipped non-terminal (e.g. FINISHED→RUNNING from @@ -1146,11 +1173,11 @@ def _wait_for_run_completion( def _immediate_terminal_statuses() -> frozenset[str]: """Statuses that the WS path may treat as authoritative termination. - FINISHED is intentionally excluded: stop hooks can flip a freshly-set - FINISHED back to RUNNING within the same server-side iteration, and - returning on the first WS notification would race that revert. The - REST poll path with ``TERMINAL_POLL_THRESHOLD`` consecutive - terminal polls is the authoritative termination check for FINISHED. + FINISHED is intentionally excluded for per-field state updates: stop + hooks can flip a freshly-set FINISHED back to RUNNING within the same + server-side iteration, and returning on that first notification would + race the revert. FINISHED is accepted from the server's full post-run + state snapshot, with bounded REST polling as a fallback. """ return frozenset( { diff --git a/tests/sdk/conversation/remote/test_remote_conversation.py b/tests/sdk/conversation/remote/test_remote_conversation.py index e186f86837..6368718663 100644 --- a/tests/sdk/conversation/remote/test_remote_conversation.py +++ b/tests/sdk/conversation/remote/test_remote_conversation.py @@ -14,6 +14,10 @@ from openhands.sdk.conversation.secret_registry import SecretValue from openhands.sdk.conversation.visualizer import DefaultConversationVisualizer from openhands.sdk.event import MessageEvent +from openhands.sdk.event.conversation_state import ( + FULL_STATE_KEY, + ConversationStateUpdateEvent, +) from openhands.sdk.event.llm_completion_log import LLMCompletionLogEvent from openhands.sdk.llm import LLM, Message, TextContent from openhands.sdk.security.confirmation_policy import AlwaysConfirm @@ -787,11 +791,11 @@ def custom_side_effect(method, url, **kwargs): conversation = RemoteConversation(agent=self.agent, workspace=self.workspace) conversation.run(blocking=True, poll_interval=0.01) # Fast polling for test - # Verify polling continued through REST FINISHED hints and returned - # only after the post-run WebSocket state update arrived. - assert poll_count[0] == 5, ( - f"Should have polled 5 times (2 running + 3 REST finished hints), " - f"got {poll_count[0]}" + # Verify polling continued through REST FINISHED hints and completed + # only after one bounded final REST fallback refresh. + assert poll_count[0] == 6, ( + f"Should have polled 6 times (2 running + 3 REST finished hints " + f"+ 1 final REST fallback), got {poll_count[0]}" ) @patch( @@ -856,15 +860,25 @@ def custom_side_effect(method, url, **kwargs): "stats": {"usage_to_metrics": {}}, } if poll_count[0] >= len(rest_script): - conversation._terminal_status_queue.put("finished") + ws_callback[0]( + ConversationStateUpdateEvent( + key=FULL_STATE_KEY, + value={"execution_status": "finished"}, + ) + ) return response return original_side_effect(method, url, **kwargs) mock_client_instance.request.side_effect = custom_side_effect mock_ws_instance = Mock() mock_ws_client.return_value = mock_ws_instance + ws_callback = [lambda event: None] conversation = RemoteConversation(agent=self.agent, workspace=self.workspace) + ws_callback[0] = mock_ws_client.call_args.kwargs["callback"] + ws_callback[0]( + ConversationStateUpdateEvent(key="execution_status", value="finished") + ) conversation.run(blocking=True, poll_interval=0.01) @@ -917,15 +931,22 @@ def custom_side_effect(method, url, **kwargs): "stats": {"usage_to_metrics": {}}, } if poll_count[0] >= len(rest_script): - conversation._terminal_status_queue.put("finished") + ws_callback[0]( + ConversationStateUpdateEvent( + key=FULL_STATE_KEY, + value={"execution_status": "finished"}, + ) + ) return response return original_side_effect(method, url, **kwargs) mock_client_instance.request.side_effect = custom_side_effect mock_ws_instance = Mock() mock_ws_client.return_value = mock_ws_instance + ws_callback = [lambda event: None] conversation = RemoteConversation(agent=self.agent, workspace=self.workspace) + ws_callback[0] = mock_ws_client.call_args.kwargs["callback"] conversation.run(blocking=True, poll_interval=0.01) From af7936516b604f6e3e81262063b9e3520efd6d46 Mon Sep 17 00:00:00 2001 From: Graham Neubig <398875+neubig@users.noreply.github.com> Date: Sun, 24 May 2026 16:45:31 -0400 Subject: [PATCH 10/20] fix(remote): rely on post-run state snapshots --- .../conversation/impl/remote_conversation.py | 125 ++++------ .../remote/test_remote_conversation.py | 217 ++++++++++++++---- 2 files changed, 216 insertions(+), 126 deletions(-) diff --git a/openhands-sdk/openhands/sdk/conversation/impl/remote_conversation.py b/openhands-sdk/openhands/sdk/conversation/impl/remote_conversation.py index 48cab7e965..baa995d307 100644 --- a/openhands-sdk/openhands/sdk/conversation/impl/remote_conversation.py +++ b/openhands-sdk/openhands/sdk/conversation/impl/remote_conversation.py @@ -6,6 +6,7 @@ import time import uuid from collections.abc import Mapping +from dataclasses import dataclass from queue import Empty, Queue from typing import TYPE_CHECKING, SupportsIndex, overload from urllib.parse import urlparse @@ -62,6 +63,12 @@ LEGACY_CONVERSATIONS_PATH = "/api/conversations" +@dataclass(frozen=True) +class _RunCompletionSignal: + status: str + from_post_run_snapshot: bool + + def _agent_kind_mismatch_message(conversation_id: ConversationID) -> str: return ( f"Conversation {conversation_id} was started with a different agent kind. " @@ -641,7 +648,7 @@ class RemoteConversation(BaseConversation): workspace: RemoteWorkspace _client: httpx.Client _cleanup_initiated: bool - _terminal_status_queue: Queue[str] # Thread-safe queue for terminal status from WS + _terminal_status_queue: Queue[_RunCompletionSignal] _conversation_info_base_path: str _conversation_action_base_path: str delete_on_close: bool = False @@ -705,7 +712,7 @@ def __init__( self._conversation_info_base_path = LEGACY_CONVERSATIONS_PATH self._conversation_action_base_path = LEGACY_CONVERSATIONS_PATH self._cleanup_initiated = False - self._terminal_status_queue: Queue[str] = Queue() + self._terminal_status_queue: Queue[_RunCompletionSignal] = Queue() should_create = conversation_id is None if conversation_id is not None: @@ -836,6 +843,9 @@ def __init__( # Per-field ERROR/STUCK updates are terminal immediately. FINISHED is # authoritative only in the full post-run state snapshot emitted by the # server after conversation.run()/arun() exits and pending events flush. + # That snapshot may also report non-terminal-but-not-running statuses + # such as paused or waiting_for_confirmation, which still mean this + # run invocation has yielded control back to the caller. def run_complete_callback(event: Event) -> None: if not isinstance(event, ConversationStateUpdateEvent): return @@ -845,17 +855,28 @@ def run_complete_callback(event: Event) -> None: if raw_status is not None: try: status = ConversationExecutionStatus(raw_status) - if status.is_terminal(): - self._terminal_status_queue.put(raw_status) except ValueError: pass # Unknown status value, ignore + else: + if status != ConversationExecutionStatus.RUNNING: + self._terminal_status_queue.put( + _RunCompletionSignal( + status=status.value, + from_post_run_snapshot=True, + ) + ) return if event.key == "execution_status": try: status = ConversationExecutionStatus(event.value) if status.value in self._immediate_terminal_statuses(): - self._terminal_status_queue.put(event.value) + self._terminal_status_queue.put( + _RunCompletionSignal( + status=status.value, + from_post_run_snapshot=False, + ) + ) except ValueError: pass # Unknown status value, ignore @@ -1039,9 +1060,9 @@ def _wait_for_run_completion( status before WebSocket delivers the final events. As a fallback, it also polls the server periodically. If the WebSocket - is delayed or disconnected, we return after multiple consecutive polls - show a terminal status, and reconcile events to catch any that were - missed via WebSocket. + is delayed or disconnected, polling still surfaces ERROR/STUCK promptly. + A REST-only FINISHED status is not authoritative because stop hooks can + still revert it to RUNNING before the server-side run task exits. Args: poll_interval: Time in seconds between status polls (fallback). @@ -1054,12 +1075,9 @@ def _wait_for_run_completion( """ start_time = time.monotonic() consecutive_terminal_polls = 0 - # Return after this many consecutive terminal polls (fallback for WS issues). - # We use 3 polls to balance latency vs reliability: - # - 1 poll could be a transient state during shutdown - # - 2 polls might still catch a race condition - # - 3 polls (with default 1s interval = 3s total) provides high confidence - # that the run is truly complete while keeping fallback latency reasonable + # Warn after this many consecutive REST terminal polls without the + # authoritative post-run snapshot. This is a health signal only; returning + # from REST FINISHED would reintroduce the stop-hook race. TERMINAL_POLL_THRESHOLD = 3 while True: @@ -1074,24 +1092,11 @@ def _wait_for_run_completion( ) # Wait for either: - # 1. WebSocket delivers terminal status event (fast wakeup hint) + # 1. WebSocket delivers a run-completion signal # 2. Poll interval expires (fall through to REST poll) - # - # NOTE: A WS terminal status is treated as a wakeup *hint*, not as - # an authoritative termination signal. Stop hooks (see - # ``LocalConversation.run`` in ``local_conversation.py``) can flip - # FINISHED back to RUNNING within the same server-side iteration - # to give the agent another step. If we returned on the first - # FINISHED notification we'd race the server's hook eval and - # tear down the conversation while the agent still has iteration - # budget left. ERROR/STUCK *do* terminate immediately — they are - # not subject to hook-block reverts. For FINISHED we fall through - # to the REST poll path below, which already requires - # ``TERMINAL_POLL_THRESHOLD`` consecutive terminal polls before - # returning. Any FINISHED→RUNNING revert (stop hook denied - # stopping) naturally resets the counter on the next poll. try: - ws_status = self._terminal_status_queue.get(timeout=poll_interval) + signal = self._terminal_status_queue.get(timeout=poll_interval) + ws_status = signal.status # Raises ConversationRunError on ERROR/STUCK; otherwise no-op. self._handle_conversation_status(ws_status) if ws_status in self._immediate_terminal_statuses(): @@ -1102,18 +1107,18 @@ def _wait_for_run_completion( elapsed, ) return + if not signal.from_post_run_snapshot: + continue logger.info( "Run completed via post-run WebSocket state update " "(status: %s, elapsed: %.1fs)", ws_status, elapsed, ) - # The server publishes ConversationStateUpdateEvent only after + # The server publishes the full ConversationStateUpdateEvent after # conversation.run()/arun() exits and pending events are flushed, - # so FINISHED from this queue is an authoritative run-complete - # signal. Avoid REST confirmation here: generic REST status reads - # intentionally remain non-blocking and can observe transient - # FINISHED while a stop hook is still executing. + # so non-running statuses from that snapshot are authoritative + # run-complete signals. self._state.events.reconcile() return except Empty: @@ -1131,53 +1136,32 @@ def _wait_for_run_completion( # Raises ConversationRunError for ERROR/STUCK states self._handle_conversation_status(status) - # Track consecutive terminal polls as a fallback for WS issues. - # If WebSocket is delayed/disconnected, we return after multiple - # consecutive polls confirm the terminal status. if status and ConversationExecutionStatus(status).is_terminal(): # ERROR/STUCK have already been handled above. FINISHED from # REST is only a hint because stop hooks can still veto it; # wait for the server's post-run WebSocket state update. consecutive_terminal_polls += 1 - if consecutive_terminal_polls >= TERMINAL_POLL_THRESHOLD: + if consecutive_terminal_polls == TERMINAL_POLL_THRESHOLD: logger.warning( "REST has reported terminal status %s for %d polls " - "without a post-run WebSocket snapshot; using a " - "final REST refresh fallback (elapsed: %.1fs)", + "without a post-run WebSocket snapshot; continuing " + "to wait for the authoritative snapshot " + "(elapsed: %.1fs)", status, consecutive_terminal_polls, elapsed, ) - refreshed_status = self._state.refresh_from_server().get( - "execution_status" - ) - self._handle_conversation_status(refreshed_status) - if ( - refreshed_status - and ConversationExecutionStatus( - refreshed_status - ).is_terminal() - ): - self._state.events.reconcile() - return else: consecutive_terminal_polls = 0 - # Status flipped non-terminal (e.g. FINISHED→RUNNING from - # a stop-hook block). Drain queued FINISHED-only WS hints - # so we don't busy-spin on stale FINISHED notifications - # in the next iteration. ERROR/STUCK events remain queued - # for immediate handling. - self._drain_finished_terminal_status_hints() @staticmethod def _immediate_terminal_statuses() -> frozenset[str]: """Statuses that the WS path may treat as authoritative termination. - FINISHED is intentionally excluded for per-field state updates: stop - hooks can flip a freshly-set FINISHED back to RUNNING within the same - server-side iteration, and returning on that first notification would - race the revert. FINISHED is accepted from the server's full post-run - state snapshot, with bounded REST polling as a fallback. + FINISHED is intentionally excluded for per-field state updates: stop hooks + can flip a freshly-set FINISHED back to RUNNING within the same server-side + iteration. FINISHED is accepted only from the server's full post-run state + snapshot. """ return frozenset( { @@ -1200,19 +1184,6 @@ def _drain_terminal_status_queue(self) -> None: except Empty: break - def _drain_finished_terminal_status_hints(self) -> None: - """Drop only active-run FINISHED hints while preserving ERROR/STUCK.""" - retained: list[str] = [] - while True: - try: - status = self._terminal_status_queue.get_nowait() - except Empty: - break - if status != ConversationExecutionStatus.FINISHED.value: - retained.append(status) - for status in retained: - self._terminal_status_queue.put(status) - def _poll_status_once(self) -> str | None: """Fetch the current execution status from the remote conversation.""" resp = _send_request( diff --git a/tests/sdk/conversation/remote/test_remote_conversation.py b/tests/sdk/conversation/remote/test_remote_conversation.py index 6368718663..5d6158cbcb 100644 --- a/tests/sdk/conversation/remote/test_remote_conversation.py +++ b/tests/sdk/conversation/remote/test_remote_conversation.py @@ -124,6 +124,32 @@ def create_mock_events_response(self, events: list | None = None): } return mock_response + @staticmethod + def full_state_event(status: str, **values): + return ConversationStateUpdateEvent( + key=FULL_STATE_KEY, + value={"execution_status": status, **values}, + ) + + def install_post_run_full_state( + self, + mock_client_instance, + conversation_id: str, + status: str = "finished", + **values, + ): + ws_callback = [lambda event: None] + original_side_effect = mock_client_instance.request.side_effect + + def custom_side_effect(method, url, **kwargs): + resp = original_side_effect(method, url, **kwargs) + if method == "POST" and url == f"/api/conversations/{conversation_id}/run": + ws_callback[0](self.full_state_event(status, **values)) + return resp + + mock_client_instance.request.side_effect = custom_side_effect + return ws_callback + @patch( "openhands.sdk.conversation.impl.remote_conversation.WebSocketCallbackClient" ) @@ -649,12 +675,16 @@ def test_remote_conversation_run(self, mock_ws_client): # Setup mocks conversation_id = str(uuid.uuid4()) mock_client_instance = self.setup_mock_client(conversation_id=conversation_id) + ws_callback = self.install_post_run_full_state( + mock_client_instance, conversation_id + ) mock_ws_instance = Mock() mock_ws_client.return_value = mock_ws_instance # Create conversation and run conversation = RemoteConversation(agent=self.agent, workspace=self.workspace) + ws_callback[0] = mock_ws_client.call_args.kwargs["callback"] conversation.run() # Verify run API call @@ -674,6 +704,7 @@ def test_remote_conversation_run_already_running(self, mock_ws_client): # Setup mocks conversation_id = str(uuid.uuid4()) mock_client_instance = self.setup_mock_client(conversation_id=conversation_id) + ws_callback = [lambda event: None] # Override the default request side_effect to return 409 for /run endpoint original_side_effect = mock_client_instance.request.side_effect @@ -683,6 +714,7 @@ def custom_side_effect(method, url, **kwargs): mock_run_response = Mock() mock_run_response.status_code = 409 # Already running mock_run_response.raise_for_status.return_value = None + ws_callback[0](self.full_state_event("finished")) return mock_run_response return original_side_effect(method, url, **kwargs) @@ -693,6 +725,7 @@ def custom_side_effect(method, url, **kwargs): # Create conversation and run conversation = RemoteConversation(agent=self.agent, workspace=self.workspace) + ws_callback[0] = mock_ws_client.call_args.kwargs["callback"] # With blocking=True (default), it will poll until finished conversation.run() # Should not raise an exception @@ -748,11 +781,10 @@ def test_remote_conversation_run_non_blocking(self, mock_ws_client): def test_remote_conversation_run_blocking_polls_until_finished( self, mock_ws_client ): - """Test that blocking=True polls until status is not running. + """Test that blocking=True waits for the post-run state snapshot. - The implementation waits for WebSocket to deliver terminal status, but falls - back to REST polling if WebSocket doesn't deliver. The fallback requires 3 - consecutive terminal polls (TERMINAL_POLL_THRESHOLD) before returning. + REST FINISHED is only a health signal; the server's full-state + ConversationStateUpdateEvent is the authoritative run-complete signal. """ # Setup mocks conversation_id = str(uuid.uuid4()) @@ -761,6 +793,7 @@ def test_remote_conversation_run_blocking_polls_until_finished( # Track poll count and return "running" for first 2 polls, then "finished" poll_count = [0] original_side_effect = mock_client_instance.request.side_effect + ws_callback = [lambda event: None] def custom_side_effect(method, url, **kwargs): if method == "GET" and url == f"/api/conversations/{conversation_id}": @@ -778,7 +811,7 @@ def custom_side_effect(method, url, **kwargs): "execution_status": "finished", } if poll_count[0] == 5: - conversation._terminal_status_queue.put("finished") + ws_callback[0](self.full_state_event("finished")) return response return original_side_effect(method, url, **kwargs) @@ -789,15 +822,95 @@ def custom_side_effect(method, url, **kwargs): # Create conversation and run with blocking=True conversation = RemoteConversation(agent=self.agent, workspace=self.workspace) + ws_callback[0] = mock_ws_client.call_args.kwargs["callback"] conversation.run(blocking=True, poll_interval=0.01) # Fast polling for test - # Verify polling continued through REST FINISHED hints and completed - # only after one bounded final REST fallback refresh. - assert poll_count[0] == 6, ( - f"Should have polled 6 times (2 running + 3 REST finished hints " - f"+ 1 final REST fallback), got {poll_count[0]}" + # Verify REST FINISHED alone did not complete the run; the run returned + # only after the post-run full-state snapshot was delivered. + assert poll_count[0] == 5, ( + f"Should have polled until the post-run snapshot arrived, got " + f"{poll_count[0]} poll(s)" ) + @patch( + "openhands.sdk.conversation.impl.remote_conversation.WebSocketCallbackClient" + ) + def test_remote_conversation_run_returns_on_waiting_for_confirmation_snapshot( + self, mock_ws_client + ): + """A post-run non-running full-state snapshot completes blocking run().""" + conversation_id = str(uuid.uuid4()) + mock_client_instance = self.setup_mock_client(conversation_id=conversation_id) + ws_callback = [lambda event: None] + original_side_effect = mock_client_instance.request.side_effect + + def custom_side_effect(method, url, **kwargs): + if method == "POST" and url == f"/api/conversations/{conversation_id}/run": + response = original_side_effect(method, url, **kwargs) + ws_callback[0](self.full_state_event("waiting_for_confirmation")) + return response + if method == "GET" and url == f"/api/conversations/{conversation_id}": + response = Mock() + response.status_code = 200 + response.raise_for_status.return_value = None + response.json.return_value = { + "id": conversation_id, + "execution_status": "running", + "stats": {"usage_to_metrics": {}}, + } + return response + return original_side_effect(method, url, **kwargs) + + mock_client_instance.request.side_effect = custom_side_effect + mock_ws_instance = Mock() + mock_ws_client.return_value = mock_ws_instance + + conversation = RemoteConversation(agent=self.agent, workspace=self.workspace) + ws_callback[0] = mock_ws_client.call_args.kwargs["callback"] + + conversation.run(blocking=True, poll_interval=0.01, timeout=0.05) + + assert conversation.state.execution_status.value == "waiting_for_confirmation" + + @patch( + "openhands.sdk.conversation.impl.remote_conversation.WebSocketCallbackClient" + ) + def test_remote_conversation_run_preserves_post_run_snapshot_after_running_poll( + self, mock_ws_client + ): + """An in-flight RUNNING REST poll must not discard a post-run snapshot.""" + conversation_id = str(uuid.uuid4()) + mock_client_instance = self.setup_mock_client(conversation_id=conversation_id) + ws_callback = [lambda event: None] + poll_count = [0] + original_side_effect = mock_client_instance.request.side_effect + + def custom_side_effect(method, url, **kwargs): + if method == "GET" and url == f"/api/conversations/{conversation_id}": + poll_count[0] += 1 + ws_callback[0](self.full_state_event("finished")) + response = Mock() + response.status_code = 200 + response.raise_for_status.return_value = None + response.json.return_value = { + "id": conversation_id, + "execution_status": "running", + "stats": {"usage_to_metrics": {}}, + } + return response + return original_side_effect(method, url, **kwargs) + + mock_client_instance.request.side_effect = custom_side_effect + mock_ws_instance = Mock() + mock_ws_client.return_value = mock_ws_instance + + conversation = RemoteConversation(agent=self.agent, workspace=self.workspace) + ws_callback[0] = mock_ws_client.call_args.kwargs["callback"] + + conversation.run(blocking=True, poll_interval=0.01, timeout=0.1) + + assert poll_count[0] == 1 + @patch( "openhands.sdk.conversation.impl.remote_conversation.WebSocketCallbackClient" ) @@ -823,10 +936,8 @@ def test_remote_conversation_run_ws_finished_is_only_a_hint_not_terminal( down the agent-server pod (via ``workspace_keepalive`` exit) before the agent could consume its iteration budget. - The fix: WS FINISHED is a fast wakeup hint only. The REST-poll - ``TERMINAL_POLL_THRESHOLD`` confirmation pattern is the single - authoritative termination check. A FINISHED→RUNNING revert seen on - a REST poll naturally resets the consecutive-terminal counter. + The fix: per-field WS FINISHED is ignored for completion. Only the + post-run full-state snapshot is authoritative. """ conversation_id = str(uuid.uuid4()) mock_client_instance = self.setup_mock_client(conversation_id=conversation_id) @@ -845,8 +956,17 @@ def test_remote_conversation_run_ws_finished_is_only_a_hint_not_terminal( ] poll_count = [0] original_side_effect = mock_client_instance.request.side_effect + ws_callback = [lambda event: None] def custom_side_effect(method, url, **kwargs): + if method == "POST" and url == f"/api/conversations/{conversation_id}/run": + response = original_side_effect(method, url, **kwargs) + ws_callback[0]( + ConversationStateUpdateEvent( + key="execution_status", value="finished" + ) + ) + return response if method == "GET" and url == f"/api/conversations/{conversation_id}": idx = min(poll_count[0], len(rest_script) - 1) status = rest_script[idx] @@ -860,46 +980,34 @@ def custom_side_effect(method, url, **kwargs): "stats": {"usage_to_metrics": {}}, } if poll_count[0] >= len(rest_script): - ws_callback[0]( - ConversationStateUpdateEvent( - key=FULL_STATE_KEY, - value={"execution_status": "finished"}, - ) - ) + ws_callback[0](self.full_state_event("finished")) return response return original_side_effect(method, url, **kwargs) mock_client_instance.request.side_effect = custom_side_effect mock_ws_instance = Mock() mock_ws_client.return_value = mock_ws_instance - ws_callback = [lambda event: None] conversation = RemoteConversation(agent=self.agent, workspace=self.workspace) ws_callback[0] = mock_ws_client.call_args.kwargs["callback"] - ws_callback[0]( - ConversationStateUpdateEvent(key="execution_status", value="finished") - ) conversation.run(blocking=True, poll_interval=0.01) # Must have polled past the 3 RUNNING REST responses (race window), - # then 3 consecutive FINISHED REST responses for confirmation, - # then 1 final state refresh. Pre-fix this would have returned on - # the WS FINISHED injected after the /run trigger with poll_count == 0. - assert poll_count[0] >= 3 + 3, ( - f"Run() returned before REST confirmed termination. " - f"poll_count={poll_count[0]} — expected at least 6 (3 running " - f"during hook revert + 3 consecutive finished). The injected WS " - f"FINISHED was treated as authoritative; this is the regression." + # then waited for the post-run full-state snapshot. Pre-fix this would + # have returned on the WS FINISHED injected after the /run trigger with + # poll_count == 0. + assert poll_count[0] == len(rest_script), ( + f"Run() returned before the post-run snapshot. poll_count={poll_count[0]}" ) @patch( "openhands.sdk.conversation.impl.remote_conversation.WebSocketCallbackClient" ) - def test_remote_conversation_run_final_refresh_must_still_be_terminal( + def test_remote_conversation_run_rest_finished_revert_waits_for_full_state( self, mock_ws_client ): - """Do not return if the final authoritative refresh sees a hook veto.""" + """Do not return from REST FINISHED when a hook can still veto it.""" conversation_id = str(uuid.uuid4()) mock_client_instance = self.setup_mock_client(conversation_id=conversation_id) @@ -931,12 +1039,7 @@ def custom_side_effect(method, url, **kwargs): "stats": {"usage_to_metrics": {}}, } if poll_count[0] >= len(rest_script): - ws_callback[0]( - ConversationStateUpdateEvent( - key=FULL_STATE_KEY, - value={"execution_status": "finished"}, - ) - ) + ws_callback[0](self.full_state_event("finished")) return response return original_side_effect(method, url, **kwargs) @@ -951,21 +1054,22 @@ def custom_side_effect(method, url, **kwargs): conversation.run(blocking=True, poll_interval=0.01) assert poll_count[0] >= len(rest_script), ( - f"Run() returned before the final refresh confirmed terminal status. " + f"Run() returned before the post-run full-state snapshot. " f"poll_count={poll_count[0]}" ) @patch( "openhands.sdk.conversation.impl.remote_conversation.WebSocketCallbackClient" ) - def test_remote_conversation_run_keeps_error_queued_after_finished_hint( + def test_remote_conversation_run_keeps_error_queued_after_running_poll( self, mock_ws_client ): - """Active-run FINISHED hint cleanup must not discard ERROR/STUCK.""" + """A non-terminal REST poll must not discard queued ERROR/STUCK.""" conversation_id = str(uuid.uuid4()) mock_client_instance = self.setup_mock_client(conversation_id=conversation_id) original_side_effect = mock_client_instance.request.side_effect poll_count = [0] + ws_callback = [lambda event: None] def custom_side_effect(method, url, **kwargs): resp = original_side_effect(method, url, **kwargs) @@ -977,8 +1081,11 @@ def custom_side_effect(method, url, **kwargs): "stats": {"usage_to_metrics": {}}, } if poll_count[0] == 1: - conversation._terminal_status_queue.put("finished") - conversation._terminal_status_queue.put("error") + ws_callback[0]( + ConversationStateUpdateEvent( + key="execution_status", value="error" + ) + ) return resp mock_client_instance.request.side_effect = custom_side_effect @@ -986,6 +1093,7 @@ def custom_side_effect(method, url, **kwargs): mock_ws_client.return_value = mock_ws_instance conversation = RemoteConversation(agent=self.agent, workspace=self.workspace) + ws_callback[0] = mock_ws_client.call_args.kwargs["callback"] conversation._get_last_error_detail = Mock(return_value="boom") with pytest.raises(Exception) as excinfo: @@ -1015,6 +1123,7 @@ def test_remote_conversation_run_ws_error_still_terminates_immediately( conversation = RemoteConversation(agent=self.agent, workspace=self.workspace) conversation._get_last_error_detail = Mock(return_value="boom") + ws_callback = [mock_ws_client.call_args.kwargs["callback"]] # ``run()`` drains the WS queue at trigger time (see line "Drain any # stale terminal status events from previous runs"). We need the @@ -1026,7 +1135,9 @@ def test_remote_conversation_run_ws_error_still_terminates_immediately( def post_run_seeds_error(method, url, **kwargs): resp = original_side_effect(method, url, **kwargs) if method == "POST" and url.endswith("/run"): - conversation._terminal_status_queue.put("error") + ws_callback[0]( + ConversationStateUpdateEvent(key="execution_status", value="error") + ) return resp mock_client_instance.request.side_effect = post_run_seeds_error @@ -1041,10 +1152,10 @@ def post_run_seeds_error(method, url, **kwargs): @patch( "openhands.sdk.conversation.impl.remote_conversation.WebSocketCallbackClient" ) - def test_remote_conversation_run_rest_fallback_refreshes_final_state( + def test_remote_conversation_run_full_state_updates_cached_state( self, mock_ws_client ): - """REST fallback refreshes cached state before run() returns.""" + """Post-run full-state snapshots update cached state before run() returns.""" conversation_id = str(uuid.uuid4()) mock_client_instance = self.setup_mock_client(conversation_id=conversation_id) @@ -1079,6 +1190,7 @@ def test_remote_conversation_run_rest_fallback_refreshes_final_state( poll_count = [0] original_side_effect = mock_client_instance.request.side_effect + ws_callback = [lambda event: None] def custom_side_effect(method, url, **kwargs): if method == "GET" and url == f"/api/conversations/{conversation_id}": @@ -1096,7 +1208,12 @@ def custom_side_effect(method, url, **kwargs): response.json.return_value = stale_info else: response.json.return_value = final_info - conversation._terminal_status_queue.put("finished") + ws_callback[0]( + ConversationStateUpdateEvent( + key=FULL_STATE_KEY, + value=final_info, + ) + ) return response return original_side_effect(method, url, **kwargs) @@ -1106,6 +1223,7 @@ def custom_side_effect(method, url, **kwargs): mock_ws_client.return_value = mock_ws_instance conversation = RemoteConversation(agent=self.agent, workspace=self.workspace) + ws_callback[0] = mock_ws_client.call_args.kwargs["callback"] conversation.state._cached_state = { "id": conversation_id, "execution_status": "running", @@ -1115,6 +1233,7 @@ def custom_side_effect(method, url, **kwargs): conversation.run(blocking=True, poll_interval=0.01) assert poll_count[0] >= 1 + assert conversation.state.stats.usage_to_metrics @patch( "openhands.sdk.conversation.impl.remote_conversation.WebSocketCallbackClient" From 1b5f8098c2a45db204c5fc468cf564d7c1ee6719 Mon Sep 17 00:00:00 2001 From: openhands Date: Thu, 28 May 2026 04:54:05 +0000 Subject: [PATCH 11/20] fix(remote-conv): gate full-state signals on _run_armed; add REST hard-fallback (#3191) Two issues raised in bot review: 1. Thread PRRT_kwDOPjFrIs6EmDY5 (line 861): The WS client delivers an initial full-state snapshot during connect() with _ready set before callbacks run. A non-RUNNING status in that snapshot (e.g. "idle") could be consumed by _wait_for_run_completion() as the completion signal for the *next* run() call, causing blocking=True to return before the server actually finished. Fix: add _run_armed (threading.Event). In run_complete_callback the full-state path only queues a signal when _run_armed.is_set(). run() clears it before the drain, sets it after POST /run returns, and clears it again after _wait_for_run_completion() exits. Per-field ERROR/STUCK events are unaffected (they go through the immediate-terminal path). 2. Thread PRRT_kwDOPjFrIs6EmDY7 (line 1144): The REST polling path had no successful completion exit. After TERMINAL_POLL_THRESHOLD (3) logs a warning, the loop kept waiting for a WS snapshot indefinitely, so a socket drop after the run finished could hang run(blocking=True) until the overall 1-hour timeout. Fix: add TERMINAL_POLL_HARD_FALLBACK=30. After 30 consecutive REST terminal polls (~30 s with default poll_interval=1.0 s) the client accepts the status, reconciles events, and returns with a warning. Stop hooks complete in seconds; 30 s far outlasts any realistic hook window. Test updates: - install_post_run_full_state and test_remote_conversation_run_already_running moved the WS event injection from the POST side effect to the first GET poll so it fires after _run_armed is set (was racing arming). - test_remote_conversation_run_returns_on_waiting_for_confirmation_snapshot updated for the same reason. - Two new tests added: - test_remote_conversation_run_stale_pre_run_snapshot_is_ignored: verifies a pre-run full-state snapshot is discarded by the arming guard. - test_remote_conversation_run_rest_hard_fallback_when_ws_silent: verifies run() completes via the hard fallback when WS never delivers the snapshot. All 40 tests pass. Co-authored-by: openhands --- .../conversation/impl/remote_conversation.py | 60 ++++++-- .../remote/test_remote_conversation.py | 134 ++++++++++++++++-- 2 files changed, 176 insertions(+), 18 deletions(-) diff --git a/openhands-sdk/openhands/sdk/conversation/impl/remote_conversation.py b/openhands-sdk/openhands/sdk/conversation/impl/remote_conversation.py index b94a62be27..2d1c6c7c2e 100644 --- a/openhands-sdk/openhands/sdk/conversation/impl/remote_conversation.py +++ b/openhands-sdk/openhands/sdk/conversation/impl/remote_conversation.py @@ -649,6 +649,7 @@ class RemoteConversation(BaseConversation): _client: httpx.Client _cleanup_initiated: bool _terminal_status_queue: Queue[_RunCompletionSignal] + _run_armed: threading.Event _conversation_info_base_path: str _conversation_action_base_path: str delete_on_close: bool = False @@ -713,6 +714,7 @@ def __init__( self._conversation_action_base_path = LEGACY_CONVERSATIONS_PATH self._cleanup_initiated = False self._terminal_status_queue: Queue[_RunCompletionSignal] = Queue() + self._run_armed = threading.Event() should_create = conversation_id is None if conversation_id is not None: @@ -851,6 +853,17 @@ def run_complete_callback(event: Event) -> None: return if event.key == FULL_STATE_KEY: + # Only accept full-state snapshots as run-completion signals + # when a run is actually in progress. The WS subscription + # delivers an initial full-state snapshot during connect(); if + # that snapshot carries a non-RUNNING status (e.g. "idle"), it + # could be picked up by _wait_for_run_completion() as the + # completion signal for the *next* run() invocation, causing + # blocking=True to return before the server has actually + # finished. Gating on _run_armed (set after POST /run) + # ensures only post-run snapshots are treated as authoritative. + if not self._run_armed.is_set(): + return raw_status = event.value.get("execution_status") if raw_status is not None: try: @@ -1021,8 +1034,11 @@ def run( Raises: ConversationRunError: If the run fails or times out. """ - # Drain any stale terminal status events from previous runs. - # This prevents stale events from causing early returns. + # Disarm and drain any stale terminal status events from previous runs + # before arming for the new one. _run_armed gates full-state WS snapshots + # in run_complete_callback; clearing it here prevents the initial WS + # subscription snapshot from being mistaken for the post-run snapshot. + self._run_armed.clear() self._drain_terminal_status_queue() # Trigger a run on the server using the dedicated run endpoint. @@ -1045,7 +1061,13 @@ def run( logger.info(f"run() triggered successfully: {resp}") if blocking: - self._wait_for_run_completion(poll_interval, timeout) + # Arm after POST so that only WS full-state snapshots arriving + # after the run was triggered are treated as run-completion signals. + self._run_armed.set() + try: + self._wait_for_run_completion(poll_interval, timeout) + finally: + self._run_armed.clear() def _wait_for_run_completion( self, @@ -1075,10 +1097,17 @@ def _wait_for_run_completion( """ start_time = time.monotonic() consecutive_terminal_polls = 0 - # Warn after this many consecutive REST terminal polls without the - # authoritative post-run snapshot. This is a health signal only; returning - # from REST FINISHED would reintroduce the stop-hook race. + # Log a warning after this many consecutive REST terminal polls without a + # post-run WS snapshot. This is a health signal, not a return path — + # returning immediately on REST FINISHED would reintroduce the stop-hook race. TERMINAL_POLL_THRESHOLD = 3 + # Hard fallback: after this many consecutive terminal REST polls (~30 s with + # default poll_interval) treat the server as done. Stop hooks are expected to + # be fast (seconds); 30 consecutive polls far outlasts any realistic hook + # window, so accepting REST-confirmed FINISHED here is safe in practice. + # This bounds the hang to ~30 s when the WS post-run snapshot is never + # received (e.g. socket drop after the run finishes on the server). + TERMINAL_POLL_HARD_FALLBACK = 30 while True: elapsed = time.monotonic() - start_time @@ -1138,8 +1167,8 @@ def _wait_for_run_completion( if status and ConversationExecutionStatus(status).is_terminal(): # ERROR/STUCK have already been handled above. FINISHED from - # REST is only a hint because stop hooks can still veto it; - # wait for the server's post-run WebSocket state update. + # REST is advisory because stop hooks can still veto it; + # prefer waiting for the server's post-run WebSocket state update. consecutive_terminal_polls += 1 if consecutive_terminal_polls == TERMINAL_POLL_THRESHOLD: logger.warning( @@ -1151,6 +1180,21 @@ def _wait_for_run_completion( consecutive_terminal_polls, elapsed, ) + if consecutive_terminal_polls >= TERMINAL_POLL_HARD_FALLBACK: + logger.warning( + "REST has reported terminal status %s for %d consecutive " + "polls (~%.0fs) with no post-run WebSocket snapshot; " + "accepting as final to avoid an indefinite hang " + "(elapsed: %.1fs). This may indicate a WebSocket " + "delivery issue.", + status, + consecutive_terminal_polls, + consecutive_terminal_polls * poll_interval, + elapsed, + ) + self._state.refresh_from_server() + self._state.events.reconcile() + return else: consecutive_terminal_polls = 0 diff --git a/tests/sdk/conversation/remote/test_remote_conversation.py b/tests/sdk/conversation/remote/test_remote_conversation.py index 0f8b761510..4e0480a023 100644 --- a/tests/sdk/conversation/remote/test_remote_conversation.py +++ b/tests/sdk/conversation/remote/test_remote_conversation.py @@ -138,12 +138,27 @@ def install_post_run_full_state( status: str = "finished", **values, ): + """Install a side effect that fires a full-state WS event on the first + REST GET poll after POST /run. + + The event is fired from the GET side effect (inside + _wait_for_run_completion, after _run_armed is set) rather than from the + POST side effect. Firing from POST races _run_armed.set(), which follows + the POST return, so the event would be silently discarded by + run_complete_callback's arming guard. + """ ws_callback = [lambda event: None] original_side_effect = mock_client_instance.request.side_effect + fired = [False] def custom_side_effect(method, url, **kwargs): resp = original_side_effect(method, url, **kwargs) - if method == "POST" and url == f"/api/conversations/{conversation_id}/run": + if ( + not fired[0] + and method == "GET" + and url == f"/api/conversations/{conversation_id}" + ): + fired[0] = True ws_callback[0](self.full_state_event(status, **values)) return resp @@ -706,17 +721,27 @@ def test_remote_conversation_run_already_running(self, mock_ws_client): mock_client_instance = self.setup_mock_client(conversation_id=conversation_id) ws_callback = [lambda event: None] - # Override the default request side_effect to return 409 for /run endpoint + # Override the default request side_effect to return 409 for /run endpoint. + # The full-state completion event fires on the first GET poll (after arming) + # rather than inline with the POST, since _run_armed is set after POST returns. original_side_effect = mock_client_instance.request.side_effect + fired = [False] def custom_side_effect(method, url, **kwargs): if method == "POST" and "/run" in url: mock_run_response = Mock() mock_run_response.status_code = 409 # Already running mock_run_response.raise_for_status.return_value = None - ws_callback[0](self.full_state_event("finished")) return mock_run_response - return original_side_effect(method, url, **kwargs) + resp = original_side_effect(method, url, **kwargs) + if ( + not fired[0] + and method == "GET" + and url == f"/api/conversations/{conversation_id}" + ): + fired[0] = True + ws_callback[0](self.full_state_event("finished")) + return resp mock_client_instance.request.side_effect = custom_side_effect @@ -843,19 +868,21 @@ def test_remote_conversation_run_returns_on_waiting_for_confirmation_snapshot( mock_client_instance = self.setup_mock_client(conversation_id=conversation_id) ws_callback = [lambda event: None] original_side_effect = mock_client_instance.request.side_effect + first_poll = [True] def custom_side_effect(method, url, **kwargs): - if method == "POST" and url == f"/api/conversations/{conversation_id}/run": - response = original_side_effect(method, url, **kwargs) - ws_callback[0](self.full_state_event("waiting_for_confirmation")) - return response if method == "GET" and url == f"/api/conversations/{conversation_id}": + if first_poll[0]: + # Fire the full-state event on the first REST poll, which runs + # inside _wait_for_run_completion() after _run_armed is set. + first_poll[0] = False + ws_callback[0](self.full_state_event("waiting_for_confirmation")) response = Mock() response.status_code = 200 response.raise_for_status.return_value = None response.json.return_value = { "id": conversation_id, - "execution_status": "running", + "execution_status": "waiting_for_confirmation", "stats": {"usage_to_metrics": {}}, } return response @@ -868,7 +895,7 @@ def custom_side_effect(method, url, **kwargs): conversation = RemoteConversation(agent=self.agent, workspace=self.workspace) ws_callback[0] = mock_ws_client.call_args.kwargs["callback"] - conversation.run(blocking=True, poll_interval=0.01, timeout=0.05) + conversation.run(blocking=True, poll_interval=0.01) assert conversation.state.execution_status.value == "waiting_for_confirmation" @@ -1149,6 +1176,93 @@ def post_run_seeds_error(method, url, **kwargs): # also assert below that we surfaced quickly. assert "boom" in str(excinfo.value) or "error" in str(excinfo.value).lower() + @patch( + "openhands.sdk.conversation.impl.remote_conversation.WebSocketCallbackClient" + ) + def test_remote_conversation_run_stale_pre_run_snapshot_is_ignored( + self, mock_ws_client + ): + """A full-state WS snapshot received before run() POST must not complete run(). + + The WS subscription delivers an initial full-state snapshot during + connect(). If that snapshot carries a non-RUNNING status (e.g. "idle"), + it must NOT be treated as the post-run completion signal — _run_armed + is not yet set at that point. run() should only complete once a + full-state snapshot arrives after the POST /run call. + """ + conversation_id = str(uuid.uuid4()) + mock_client_instance = self.setup_mock_client(conversation_id=conversation_id) + ws_callback = [lambda event: None] + original_side_effect = mock_client_instance.request.side_effect + poll_count = [0] + + def custom_side_effect(method, url, **kwargs): + resp = original_side_effect(method, url, **kwargs) + if method == "GET" and url == f"/api/conversations/{conversation_id}": + poll_count[0] += 1 + if poll_count[0] == 1: + # Fire the real post-run snapshot on the first REST poll (armed). + ws_callback[0](self.full_state_event("finished")) + return resp + + mock_client_instance.request.side_effect = custom_side_effect + mock_ws_instance = Mock() + mock_ws_client.return_value = mock_ws_instance + + conversation = RemoteConversation(agent=self.agent, workspace=self.workspace) + ws_callback[0] = mock_ws_client.call_args.kwargs["callback"] + + # Inject a stale "idle" snapshot directly into the queue as if it + # arrived from the initial subscription, before run() is called. + # _run_armed is not set yet, so run_complete_callback would discard it, + # but simulating a direct queue put lets us verify the guard works end-to-end. + ws_callback[0](self.full_state_event("idle")) + assert conversation._terminal_status_queue.empty(), ( + "Stale pre-run snapshot must not enter the queue (_run_armed not set)" + ) + + # run() should complete via the first REST poll's full-state event, not + # the stale pre-run snapshot. + conversation.run(blocking=True, poll_interval=0.01) + assert poll_count[0] >= 1 + + @patch( + "openhands.sdk.conversation.impl.remote_conversation.WebSocketCallbackClient" + ) + def test_remote_conversation_run_rest_hard_fallback_when_ws_silent( + self, mock_ws_client + ): + """run() completes via REST hard-fallback when WS snapshot never arrives. + + When the post-run WS full-state snapshot is never delivered (e.g. socket + dropped after the run finished), the client should not hang until the + overall timeout. After TERMINAL_POLL_HARD_FALLBACK consecutive REST + terminal polls it must accept the status and return. + """ + conversation_id = str(uuid.uuid4()) + mock_client_instance = self.setup_mock_client(conversation_id=conversation_id) + poll_count = [0] + original_side_effect = mock_client_instance.request.side_effect + + def custom_side_effect(method, url, **kwargs): + resp = original_side_effect(method, url, **kwargs) + if method == "GET" and url == f"/api/conversations/{conversation_id}": + poll_count[0] += 1 + # Never fire the post-run WS snapshot — simulate silent socket. + return resp + + mock_client_instance.request.side_effect = custom_side_effect + mock_ws_client.return_value = Mock() + + conversation = RemoteConversation(agent=self.agent, workspace=self.workspace) + # The default mock returns "finished" for all GET polls (setup_mock_client). + conversation.run(blocking=True, poll_interval=0.01) + + # Should have hit the hard fallback (TERMINAL_POLL_HARD_FALLBACK = 30). + assert poll_count[0] >= 30, ( + f"Expected >=30 REST polls for hard fallback, got {poll_count[0]}" + ) + @patch( "openhands.sdk.conversation.impl.remote_conversation.WebSocketCallbackClient" ) From 2071f2c92095ffc32e723559974e7a2d0cf2b003 Mon Sep 17 00:00:00 2001 From: openhands Date: Thu, 28 May 2026 05:12:05 +0000 Subject: [PATCH 12/20] =?UTF-8?q?fix(remote-conv):=20address=20bot=20revie?= =?UTF-8?q?w=20round-2=20=E2=80=94=20dead=20code,=20naming,=20time-based?= =?UTF-8?q?=20fallback?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three issues from all-hands-bot on SHA 1b5f8098: 1. PRRT_kwDOPjFrIs6FTAku / FTA1r (🟠 Dead code, line 1138): _handle_conversation_status raises ConversationRunError for every status in _immediate_terminal_statuses() (ERROR/STUCK). The 'if ws_status in _immediate_terminal_statuses(): ... return' block could therefore never execute — if the status were ERROR/STUCK, the raise above would prevent reaching that check. Removed the dead block; added a comment explaining why only non-immediate statuses can reach the remaining code. 2. PRRT_kwDOPjFrIs6FTAkw / FTA1t (🔵 Naming, line 1103): TERMINAL_POLL_THRESHOLD previously named the threshold at which REST polling returned. After the last commit it only triggers a warning log. Renamed to TERMINAL_POLL_WARNING_THRESHOLD to match the current role. 3. PRRT_kwDOPjFrIs6FTAky / FTA1w (🔵 Count-based fallback is interval- sensitive, line 1110): TERMINAL_POLL_HARD_FALLBACK=30 meant 30 polls × poll_interval s, so the effective window scaled with poll_interval. With poll_interval=0.01 s the fallback fired in 0.3 s — before any realistic stop hook. Replaced with a time-based guard: TERMINAL_HARD_FALLBACK_SECS=30.0, tracking terminal_first_seen_at via time.monotonic(). The 30 s window is independent of poll_interval. Also added terminal_first_seen_at = None reset in the poll-error branch to avoid a stale first-seen clock. Test update: test_remote_conversation_run_rest_hard_fallback_when_ws_silent now patches time.monotonic to advance 10 s per call so the 30 s threshold is crossed after ~3 REST polls (real wall time ~0.03 s with poll_interval=0.01). Co-authored-by: openhands --- .../conversation/impl/remote_conversation.py | 52 +++++++++---------- .../remote/test_remote_conversation.py | 28 +++++++--- 2 files changed, 47 insertions(+), 33 deletions(-) diff --git a/openhands-sdk/openhands/sdk/conversation/impl/remote_conversation.py b/openhands-sdk/openhands/sdk/conversation/impl/remote_conversation.py index 2d1c6c7c2e..995aaf440b 100644 --- a/openhands-sdk/openhands/sdk/conversation/impl/remote_conversation.py +++ b/openhands-sdk/openhands/sdk/conversation/impl/remote_conversation.py @@ -1100,14 +1100,15 @@ def _wait_for_run_completion( # Log a warning after this many consecutive REST terminal polls without a # post-run WS snapshot. This is a health signal, not a return path — # returning immediately on REST FINISHED would reintroduce the stop-hook race. - TERMINAL_POLL_THRESHOLD = 3 - # Hard fallback: after this many consecutive terminal REST polls (~30 s with - # default poll_interval) treat the server as done. Stop hooks are expected to - # be fast (seconds); 30 consecutive polls far outlasts any realistic hook - # window, so accepting REST-confirmed FINISHED here is safe in practice. - # This bounds the hang to ~30 s when the WS post-run snapshot is never - # received (e.g. socket drop after the run finishes on the server). - TERMINAL_POLL_HARD_FALLBACK = 30 + TERMINAL_POLL_WARNING_THRESHOLD = 3 + # Time-based hard fallback: accept REST-confirmed terminal status after + # the server has been reporting terminal for at least this many seconds + # without a post-run WS snapshot. Stop hooks are fast (seconds); 30 s + # is a safe bound regardless of poll_interval. This prevents an + # indefinite hang when the WS snapshot is never delivered (e.g., socket + # dropped after the run finishes on the server). + TERMINAL_HARD_FALLBACK_SECS = 30.0 + terminal_first_seen_at: float | None = None while True: elapsed = time.monotonic() - start_time @@ -1126,16 +1127,11 @@ def _wait_for_run_completion( try: signal = self._terminal_status_queue.get(timeout=poll_interval) ws_status = signal.status - # Raises ConversationRunError on ERROR/STUCK; otherwise no-op. + # Raises ConversationRunError on ERROR/STUCK; returns otherwise. + # _immediate_terminal_statuses() == {ERROR, STUCK}; if ws_status + # were in that set, _handle_conversation_status would have raised + # above, so execution only continues for non-immediate statuses. self._handle_conversation_status(ws_status) - if ws_status in self._immediate_terminal_statuses(): - logger.info( - "Run completed via WebSocket notification " - "(status: %s, elapsed: %.1fs)", - ws_status, - elapsed, - ) - return if not signal.from_post_run_snapshot: continue logger.info( @@ -1161,6 +1157,7 @@ def _wait_for_run_completion( except Exception as exc: self._handle_poll_exception(exc) consecutive_terminal_polls = 0 # Reset on error + terminal_first_seen_at = None else: # Raises ConversationRunError for ERROR/STUCK states self._handle_conversation_status(status) @@ -1170,7 +1167,10 @@ def _wait_for_run_completion( # REST is advisory because stop hooks can still veto it; # prefer waiting for the server's post-run WebSocket state update. consecutive_terminal_polls += 1 - if consecutive_terminal_polls == TERMINAL_POLL_THRESHOLD: + now = time.monotonic() + if terminal_first_seen_at is None: + terminal_first_seen_at = now + if consecutive_terminal_polls == TERMINAL_POLL_WARNING_THRESHOLD: logger.warning( "REST has reported terminal status %s for %d polls " "without a post-run WebSocket snapshot; continuing " @@ -1180,16 +1180,15 @@ def _wait_for_run_completion( consecutive_terminal_polls, elapsed, ) - if consecutive_terminal_polls >= TERMINAL_POLL_HARD_FALLBACK: + terminal_secs = now - terminal_first_seen_at + if terminal_secs >= TERMINAL_HARD_FALLBACK_SECS: logger.warning( - "REST has reported terminal status %s for %d consecutive " - "polls (~%.0fs) with no post-run WebSocket snapshot; " - "accepting as final to avoid an indefinite hang " - "(elapsed: %.1fs). This may indicate a WebSocket " - "delivery issue.", + "REST has reported terminal status %s for %.0fs " + "with no post-run WebSocket snapshot; accepting as " + "final to avoid an indefinite hang (elapsed: %.1fs). " + "This may indicate a WebSocket delivery issue.", status, - consecutive_terminal_polls, - consecutive_terminal_polls * poll_interval, + terminal_secs, elapsed, ) self._state.refresh_from_server() @@ -1197,6 +1196,7 @@ def _wait_for_run_completion( return else: consecutive_terminal_polls = 0 + terminal_first_seen_at = None @staticmethod def _immediate_terminal_statuses() -> frozenset[str]: diff --git a/tests/sdk/conversation/remote/test_remote_conversation.py b/tests/sdk/conversation/remote/test_remote_conversation.py index 4e0480a023..52b4f408cb 100644 --- a/tests/sdk/conversation/remote/test_remote_conversation.py +++ b/tests/sdk/conversation/remote/test_remote_conversation.py @@ -1,5 +1,6 @@ """Tests for RemoteConversation.""" +import time import uuid from unittest.mock import Mock, patch @@ -1236,8 +1237,11 @@ def test_remote_conversation_run_rest_hard_fallback_when_ws_silent( When the post-run WS full-state snapshot is never delivered (e.g. socket dropped after the run finished), the client should not hang until the - overall timeout. After TERMINAL_POLL_HARD_FALLBACK consecutive REST + overall timeout. After TERMINAL_HARD_FALLBACK_SECS of consecutive REST terminal polls it must accept the status and return. + + time.monotonic is patched to advance 10 s per call so the 30 s threshold + is crossed after ~3 REST polls (real wall time ~poll_interval * 3). """ conversation_id = str(uuid.uuid4()) mock_client_instance = self.setup_mock_client(conversation_id=conversation_id) @@ -1255,13 +1259,23 @@ def custom_side_effect(method, url, **kwargs): mock_ws_client.return_value = Mock() conversation = RemoteConversation(agent=self.agent, workspace=self.workspace) - # The default mock returns "finished" for all GET polls (setup_mock_client). - conversation.run(blocking=True, poll_interval=0.01) - # Should have hit the hard fallback (TERMINAL_POLL_HARD_FALLBACK = 30). - assert poll_count[0] >= 30, ( - f"Expected >=30 REST polls for hard fallback, got {poll_count[0]}" - ) + # Patch time.monotonic to advance 10 s per call so the 30 s hard-fallback + # threshold is crossed after ~3 REST polls. + call_counter = [0] + base = time.monotonic() + + def fast_monotonic() -> float: + call_counter[0] += 1 + return base + call_counter[0] * 10.0 + + with patch( + "openhands.sdk.conversation.impl.remote_conversation.time.monotonic", + side_effect=fast_monotonic, + ): + conversation.run(blocking=True, poll_interval=0.01) + + assert poll_count[0] >= 1, f"Expected at least 1 REST poll, got {poll_count[0]}" @patch( "openhands.sdk.conversation.impl.remote_conversation.WebSocketCallbackClient" From 5242017cbd14f716b137981da8db62cf7263b6bb Mon Sep 17 00:00:00 2001 From: openhands Date: Thu, 28 May 2026 05:35:15 +0000 Subject: [PATCH 13/20] =?UTF-8?q?fix(remote-conv):=20address=20bot=20revie?= =?UTF-8?q?w=20round-3=20suggestions=20=E2=80=94=20dead=20continue,=20stal?= =?UTF-8?q?e=20docstring,=20module=20constant?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three 🟡 suggestions from all-hands-bot on SHA 2071f2c9: 1. PRRT_kwDOPjFrIs6FTIrf (dead continue, line 1135): After removing the 'if ws_status in _immediate_terminal_statuses(): return' block in round 2, 'if not signal.from_post_run_snapshot: continue' became unreachable: signals with from_post_run_snapshot=False carry ERROR/STUCK and are raised by _handle_conversation_status before reaching this check. Removed the branch; updated the preceding comment to explain that only full-state snapshots (from_post_run_snapshot=True) reach the reconcile/return. 2. PRRT_kwDOPjFrIs6FTIrg (stale docstring, line 1221): _drain_terminal_status_queue docstring still mentioned a mid-loop drain for REST non-terminal-after-WS-hint that was removed in an earlier commit. Updated to describe the single call site: drain at run() start (before arming) to discard stale signals from a previous run invocation. 3. PRRT_kwDOPjFrIs6FTIri (frozenset allocation, line 1210): _immediate_terminal_statuses() was a static method allocating a new frozenset on every call from the hot run_complete_callback WS path. Replaced with a module-level constant _IMMEDIATE_TERMINAL_STATUSES defined once at module load time. Removed the static method; updated the single call site to use the constant directly. Co-authored-by: openhands --- .../conversation/impl/remote_conversation.py | 46 ++++++++----------- 1 file changed, 19 insertions(+), 27 deletions(-) diff --git a/openhands-sdk/openhands/sdk/conversation/impl/remote_conversation.py b/openhands-sdk/openhands/sdk/conversation/impl/remote_conversation.py index 995aaf440b..2865f84d9b 100644 --- a/openhands-sdk/openhands/sdk/conversation/impl/remote_conversation.py +++ b/openhands-sdk/openhands/sdk/conversation/impl/remote_conversation.py @@ -69,6 +69,18 @@ class _RunCompletionSignal: from_post_run_snapshot: bool +# Statuses that are immediately terminal for per-field WS updates. +# FINISHED is intentionally excluded: stop hooks can flip a freshly-set +# FINISHED back to RUNNING within the same server-side iteration, so FINISHED +# is only accepted from the server's authoritative post-run full-state snapshot. +_IMMEDIATE_TERMINAL_STATUSES: frozenset[str] = frozenset( + { + ConversationExecutionStatus.ERROR.value, + ConversationExecutionStatus.STUCK.value, + } +) + + def _agent_kind_mismatch_message(conversation_id: ConversationID) -> str: return ( f"Conversation {conversation_id} was started with a different agent kind. " @@ -883,7 +895,7 @@ def run_complete_callback(event: Event) -> None: if event.key == "execution_status": try: status = ConversationExecutionStatus(event.value) - if status.value in self._immediate_terminal_statuses(): + if status.value in _IMMEDIATE_TERMINAL_STATUSES: self._terminal_status_queue.put( _RunCompletionSignal( status=status.value, @@ -1127,13 +1139,11 @@ def _wait_for_run_completion( try: signal = self._terminal_status_queue.get(timeout=poll_interval) ws_status = signal.status - # Raises ConversationRunError on ERROR/STUCK; returns otherwise. - # _immediate_terminal_statuses() == {ERROR, STUCK}; if ws_status - # were in that set, _handle_conversation_status would have raised - # above, so execution only continues for non-immediate statuses. + # Raises ConversationRunError on ERROR/STUCK; no-op otherwise. + # Per-field ERROR/STUCK signals raise here; full-state snapshots + # (from_post_run_snapshot=True) are the only signals that reach + # the log/reconcile/return below. self._handle_conversation_status(ws_status) - if not signal.from_post_run_snapshot: - continue logger.info( "Run completed via post-run WebSocket state update " "(status: %s, elapsed: %.1fs)", @@ -1198,29 +1208,11 @@ def _wait_for_run_completion( consecutive_terminal_polls = 0 terminal_first_seen_at = None - @staticmethod - def _immediate_terminal_statuses() -> frozenset[str]: - """Statuses that the WS path may treat as authoritative termination. - - FINISHED is intentionally excluded for per-field state updates: stop hooks - can flip a freshly-set FINISHED back to RUNNING within the same server-side - iteration. FINISHED is accepted only from the server's full post-run state - snapshot. - """ - return frozenset( - { - ConversationExecutionStatus.ERROR.value, - ConversationExecutionStatus.STUCK.value, - } - ) - def _drain_terminal_status_queue(self) -> None: """Empty the WS terminal-status hint queue. - Called both when a new run is triggered (drop stale notifications - from a previous run) and when REST polling sees a non-terminal - status after a WS terminal hint (the hint was transient — e.g. a - stop-hook block that flipped FINISHED back to RUNNING). + Called at the start of run() (before arming) to discard any stale + signals left over from a previous run invocation. """ while True: try: From 8a6f2b679bdd156abcb60ef1263df48152a01b50 Mon Sep 17 00:00:00 2001 From: openhands Date: Thu, 28 May 2026 06:00:51 +0000 Subject: [PATCH 14/20] fix(remote-conv): add comment to from_post_run_snapshot field (PRRT_kwDOPjFrIs6FTaqM) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The field is set by both queue-put sites but never checked in _wait_for_run_completion after the dead 'if not from_post_run_snapshot: continue' branch was removed. Add a comment documenting that it is retained for debugger inspection only — ERROR/STUCK signals raise before the log/reconcile/return path and all other signals come from the authoritative post-run full-state snapshot. Co-authored-by: openhands --- .../openhands/sdk/conversation/impl/remote_conversation.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/openhands-sdk/openhands/sdk/conversation/impl/remote_conversation.py b/openhands-sdk/openhands/sdk/conversation/impl/remote_conversation.py index 2865f84d9b..523b797c4b 100644 --- a/openhands-sdk/openhands/sdk/conversation/impl/remote_conversation.py +++ b/openhands-sdk/openhands/sdk/conversation/impl/remote_conversation.py @@ -66,6 +66,10 @@ @dataclass(frozen=True) class _RunCompletionSignal: status: str + # from_post_run_snapshot is retained for debugger inspection; it is not + # read by _wait_for_run_completion because ERROR/STUCK signals always raise + # before reaching the log/reconcile/return path, and all other signals + # originate from the authoritative post-run full-state snapshot. from_post_run_snapshot: bool From 35edcddd85af56df5a664c8fa00b8d09182a3488 Mon Sep 17 00:00:00 2001 From: openhands Date: Thu, 28 May 2026 06:44:33 +0000 Subject: [PATCH 15/20] fix(remote-conv): assert from_post_run_snapshot, rename terminal_poll_count (round 5) - Add assert signal.from_post_run_snapshot before reconcile/return path (PRRT_kwDOPjFrIs6FT_FY): makes the invariant fail-fast if a future change accidentally routes a non-snapshot signal to the return path. Update _RunCompletionSignal docstring to reflect that the field is now actively asserted, not just retained for debugger inspection. - Rename consecutive_terminal_polls -> terminal_poll_count (PRRT_kwDOPjFrIs6FT_Fa): the variable is now purely a diagnostic counter (feeds TERMINAL_POLL_WARNING_THRESHOLD log); the actual fallback gate is terminal_first_seen_at. The old name implied it governed the return decision, which it no longer does. Co-authored-by: openhands --- .../conversation/impl/remote_conversation.py | 25 +++++++++++-------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/openhands-sdk/openhands/sdk/conversation/impl/remote_conversation.py b/openhands-sdk/openhands/sdk/conversation/impl/remote_conversation.py index 523b797c4b..5abe873cf2 100644 --- a/openhands-sdk/openhands/sdk/conversation/impl/remote_conversation.py +++ b/openhands-sdk/openhands/sdk/conversation/impl/remote_conversation.py @@ -66,10 +66,9 @@ @dataclass(frozen=True) class _RunCompletionSignal: status: str - # from_post_run_snapshot is retained for debugger inspection; it is not - # read by _wait_for_run_completion because ERROR/STUCK signals always raise - # before reaching the log/reconcile/return path, and all other signals - # originate from the authoritative post-run full-state snapshot. + # True for full-state snapshot signals (from the post-run ConversationStateUpdateEvent); + # False for per-field ERROR/STUCK signals (which raise before reaching reconcile). + # Asserted in _wait_for_run_completion to catch future regressions. from_post_run_snapshot: bool @@ -1112,7 +1111,7 @@ def _wait_for_run_completion( responses are retried until timeout. """ start_time = time.monotonic() - consecutive_terminal_polls = 0 + terminal_poll_count = 0 # Log a warning after this many consecutive REST terminal polls without a # post-run WS snapshot. This is a health signal, not a return path — # returning immediately on REST FINISHED would reintroduce the stop-hook race. @@ -1148,6 +1147,12 @@ def _wait_for_run_completion( # (from_post_run_snapshot=True) are the only signals that reach # the log/reconcile/return below. self._handle_conversation_status(ws_status) + # Invariant: only full-state snapshot signals reach this point. + # ERROR/STUCK raise above; all other enqueued signals originate + # from the authoritative post-run full-state snapshot. + assert signal.from_post_run_snapshot, ( + f"Unexpected non-snapshot signal reached reconcile path: {signal!r}" + ) logger.info( "Run completed via post-run WebSocket state update " "(status: %s, elapsed: %.1fs)", @@ -1170,7 +1175,7 @@ def _wait_for_run_completion( status = self._poll_status_once() except Exception as exc: self._handle_poll_exception(exc) - consecutive_terminal_polls = 0 # Reset on error + terminal_poll_count = 0 # Reset on error terminal_first_seen_at = None else: # Raises ConversationRunError for ERROR/STUCK states @@ -1180,18 +1185,18 @@ def _wait_for_run_completion( # ERROR/STUCK have already been handled above. FINISHED from # REST is advisory because stop hooks can still veto it; # prefer waiting for the server's post-run WebSocket state update. - consecutive_terminal_polls += 1 + terminal_poll_count += 1 now = time.monotonic() if terminal_first_seen_at is None: terminal_first_seen_at = now - if consecutive_terminal_polls == TERMINAL_POLL_WARNING_THRESHOLD: + if terminal_poll_count == TERMINAL_POLL_WARNING_THRESHOLD: logger.warning( "REST has reported terminal status %s for %d polls " "without a post-run WebSocket snapshot; continuing " "to wait for the authoritative snapshot " "(elapsed: %.1fs)", status, - consecutive_terminal_polls, + terminal_poll_count, elapsed, ) terminal_secs = now - terminal_first_seen_at @@ -1209,7 +1214,7 @@ def _wait_for_run_completion( self._state.events.reconcile() return else: - consecutive_terminal_polls = 0 + terminal_poll_count = 0 terminal_first_seen_at = None def _drain_terminal_status_queue(self) -> None: From 932a823ee27534897f6e56fd25bb95f4a163d0ce Mon Sep 17 00:00:00 2001 From: openhands Date: Thu, 28 May 2026 06:51:16 +0000 Subject: [PATCH 16/20] fix(remote-conv): fix E501 in _RunCompletionSignal comment (ruff) Shorten line 69 comment that triggered ruff E501 (92 > 88 chars) introduced in the round-5 commit. Co-authored-by: openhands --- .../openhands/sdk/conversation/impl/remote_conversation.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/openhands-sdk/openhands/sdk/conversation/impl/remote_conversation.py b/openhands-sdk/openhands/sdk/conversation/impl/remote_conversation.py index 5abe873cf2..85883778b5 100644 --- a/openhands-sdk/openhands/sdk/conversation/impl/remote_conversation.py +++ b/openhands-sdk/openhands/sdk/conversation/impl/remote_conversation.py @@ -66,8 +66,8 @@ @dataclass(frozen=True) class _RunCompletionSignal: status: str - # True for full-state snapshot signals (from the post-run ConversationStateUpdateEvent); - # False for per-field ERROR/STUCK signals (which raise before reaching reconcile). + # True for full-state snapshot signals (post-run ConversationStateUpdateEvent); + # False for per-field ERROR/STUCK signals (which raise before reconcile). # Asserted in _wait_for_run_completion to catch future regressions. from_post_run_snapshot: bool From 7c51f00439bfea78797ee0025b55cb9653646fd4 Mon Sep 17 00:00:00 2001 From: openhands Date: Thu, 28 May 2026 07:27:33 +0000 Subject: [PATCH 17/20] fix(remote-conv): explicit raise instead of assert, tighten cost assertion (round 6) - Replace 'assert signal.from_post_run_snapshot' with explicit 'if not ... raise AssertionError(...)' (PRRT_kwDOPjFrIs6FUiPj) so the invariant guard survives python -O and optimised builds. - Strengthen test_remote_conversation_run_full_state_updates_cached_state assertions (PRRT_kwDOPjFrIs6FUiPz): add exact cost check (accumulated_cost == pytest.approx(1.25)) and key presence check ('test-llm' in usage_to_metrics) so the state-reconciliation path is properly guarded against regressions. Co-authored-by: openhands --- .../sdk/conversation/impl/remote_conversation.py | 8 +++++--- tests/sdk/conversation/remote/test_remote_conversation.py | 5 ++++- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/openhands-sdk/openhands/sdk/conversation/impl/remote_conversation.py b/openhands-sdk/openhands/sdk/conversation/impl/remote_conversation.py index 85883778b5..9b87572335 100644 --- a/openhands-sdk/openhands/sdk/conversation/impl/remote_conversation.py +++ b/openhands-sdk/openhands/sdk/conversation/impl/remote_conversation.py @@ -1150,9 +1150,11 @@ def _wait_for_run_completion( # Invariant: only full-state snapshot signals reach this point. # ERROR/STUCK raise above; all other enqueued signals originate # from the authoritative post-run full-state snapshot. - assert signal.from_post_run_snapshot, ( - f"Unexpected non-snapshot signal reached reconcile path: {signal!r}" - ) + if not signal.from_post_run_snapshot: + raise AssertionError( + "Unexpected non-snapshot signal reached " + f"reconcile path: {signal!r}" + ) logger.info( "Run completed via post-run WebSocket state update " "(status: %s, elapsed: %.1fs)", diff --git a/tests/sdk/conversation/remote/test_remote_conversation.py b/tests/sdk/conversation/remote/test_remote_conversation.py index 52b4f408cb..47b2f4b803 100644 --- a/tests/sdk/conversation/remote/test_remote_conversation.py +++ b/tests/sdk/conversation/remote/test_remote_conversation.py @@ -1361,7 +1361,10 @@ def custom_side_effect(method, url, **kwargs): conversation.run(blocking=True, poll_interval=0.01) assert poll_count[0] >= 1 - assert conversation.state.stats.usage_to_metrics + assert "test-llm" in conversation.state.stats.usage_to_metrics + assert conversation.state.stats.usage_to_metrics[ + "test-llm" + ].accumulated_cost == pytest.approx(1.25) @patch( "openhands.sdk.conversation.impl.remote_conversation.WebSocketCallbackClient" From b5bec8c723e0415a3b738ef91faea7f0e2842458 Mon Sep 17 00:00:00 2001 From: openhands Date: Thu, 28 May 2026 07:41:30 +0000 Subject: [PATCH 18/20] fix(remote-conv): document terminal_first_seen_at reset-on-exception rationale (round 7) Add comment at the exception handler explaining why terminal_first_seen_at is reset on any REST poll error (PRRT_kwDOPjFrIs6FUzqe): we cannot confirm the server is still in a terminal state after a failed poll, so we conservatively restart the hard-fallback timer to prefer a false-negative wait over a false-positive early return. Co-authored-by: openhands --- .../sdk/conversation/impl/remote_conversation.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/openhands-sdk/openhands/sdk/conversation/impl/remote_conversation.py b/openhands-sdk/openhands/sdk/conversation/impl/remote_conversation.py index 9b87572335..181edc6dfd 100644 --- a/openhands-sdk/openhands/sdk/conversation/impl/remote_conversation.py +++ b/openhands-sdk/openhands/sdk/conversation/impl/remote_conversation.py @@ -1177,7 +1177,13 @@ def _wait_for_run_completion( status = self._poll_status_once() except Exception as exc: self._handle_poll_exception(exc) - terminal_poll_count = 0 # Reset on error + # Reset on error: we cannot confirm the server is still in a + # terminal state after a failed poll, so conservatively restart + # the hard-fallback timer. In the degenerate case where polls + # alternate between terminal and exception, the 30 s threshold + # slides; this is intentional — we prefer a false-negative wait + # over a false-positive early return. + terminal_poll_count = 0 terminal_first_seen_at = None else: # Raises ConversationRunError for ERROR/STUCK states From 6808b8a8a4958ec8d762243f5152d125265500b9 Mon Sep 17 00:00:00 2001 From: openhands Date: Thu, 28 May 2026 21:30:22 +0000 Subject: [PATCH 19/20] fix(sdk): simplify remote run completion signaling Co-authored-by: openhands --- .../conversation/impl/remote_conversation.py | 106 +++++------------- .../remote/test_remote_conversation.py | 91 --------------- 2 files changed, 25 insertions(+), 172 deletions(-) diff --git a/openhands-sdk/openhands/sdk/conversation/impl/remote_conversation.py b/openhands-sdk/openhands/sdk/conversation/impl/remote_conversation.py index 181edc6dfd..0dcfb426a5 100644 --- a/openhands-sdk/openhands/sdk/conversation/impl/remote_conversation.py +++ b/openhands-sdk/openhands/sdk/conversation/impl/remote_conversation.py @@ -6,7 +6,6 @@ import time import uuid from collections.abc import Mapping -from dataclasses import dataclass from queue import Empty, Queue from typing import TYPE_CHECKING, SupportsIndex, overload from urllib.parse import urlparse @@ -63,27 +62,6 @@ LEGACY_CONVERSATIONS_PATH = "/api/conversations" -@dataclass(frozen=True) -class _RunCompletionSignal: - status: str - # True for full-state snapshot signals (post-run ConversationStateUpdateEvent); - # False for per-field ERROR/STUCK signals (which raise before reconcile). - # Asserted in _wait_for_run_completion to catch future regressions. - from_post_run_snapshot: bool - - -# Statuses that are immediately terminal for per-field WS updates. -# FINISHED is intentionally excluded: stop hooks can flip a freshly-set -# FINISHED back to RUNNING within the same server-side iteration, so FINISHED -# is only accepted from the server's authoritative post-run full-state snapshot. -_IMMEDIATE_TERMINAL_STATUSES: frozenset[str] = frozenset( - { - ConversationExecutionStatus.ERROR.value, - ConversationExecutionStatus.STUCK.value, - } -) - - def _agent_kind_mismatch_message(conversation_id: ConversationID) -> str: return ( f"Conversation {conversation_id} was started with a different agent kind. " @@ -663,7 +641,7 @@ class RemoteConversation(BaseConversation): workspace: RemoteWorkspace _client: httpx.Client _cleanup_initiated: bool - _terminal_status_queue: Queue[_RunCompletionSignal] + _terminal_status_queue: Queue[str] _run_armed: threading.Event _conversation_info_base_path: str _conversation_action_base_path: str @@ -728,7 +706,7 @@ def __init__( self._conversation_info_base_path = LEGACY_CONVERSATIONS_PATH self._conversation_action_base_path = LEGACY_CONVERSATIONS_PATH self._cleanup_initiated = False - self._terminal_status_queue: Queue[_RunCompletionSignal] = Queue() + self._terminal_status_queue: Queue[str] = Queue() self._run_armed = threading.Event() should_create = conversation_id is None @@ -857,56 +835,34 @@ def __init__( self._visualizer = None # Add a callback that signals when run completes via WebSocket. - # Per-field ERROR/STUCK updates are terminal immediately. FINISHED is - # authoritative only in the full post-run state snapshot emitted by the - # server after conversation.run()/arun() exits and pending events flush. - # That snapshot may also report non-terminal-but-not-running statuses - # such as paused or waiting_for_confirmation, which still mean this - # run invocation has yielded control back to the caller. + # The server's post-run full-state snapshot is the only authoritative + # WebSocket completion signal. Per-field execution_status updates are + # hints only: FINISHED can still be reverted by stop hooks, and ERROR/STUCK + # are surfaced by the REST health-check path below. def run_complete_callback(event: Event) -> None: if not isinstance(event, ConversationStateUpdateEvent): return + if event.key != FULL_STATE_KEY: + return - if event.key == FULL_STATE_KEY: - # Only accept full-state snapshots as run-completion signals - # when a run is actually in progress. The WS subscription - # delivers an initial full-state snapshot during connect(); if - # that snapshot carries a non-RUNNING status (e.g. "idle"), it - # could be picked up by _wait_for_run_completion() as the - # completion signal for the *next* run() invocation, causing - # blocking=True to return before the server has actually - # finished. Gating on _run_armed (set after POST /run) - # ensures only post-run snapshots are treated as authoritative. - if not self._run_armed.is_set(): - return - raw_status = event.value.get("execution_status") - if raw_status is not None: - try: - status = ConversationExecutionStatus(raw_status) - except ValueError: - pass # Unknown status value, ignore - else: - if status != ConversationExecutionStatus.RUNNING: - self._terminal_status_queue.put( - _RunCompletionSignal( - status=status.value, - from_post_run_snapshot=True, - ) - ) + # Only accept full-state snapshots as run-completion signals when a + # run is actually in progress. The WS subscription delivers an + # initial full-state snapshot during connect(); if that snapshot + # carries a non-RUNNING status (e.g. "idle"), it could be picked up + # by _wait_for_run_completion() as the completion signal for the + # *next* run() invocation, causing blocking=True to return before the + # server has actually finished. + if not self._run_armed.is_set(): return - if event.key == "execution_status": - try: - status = ConversationExecutionStatus(event.value) - if status.value in _IMMEDIATE_TERMINAL_STATUSES: - self._terminal_status_queue.put( - _RunCompletionSignal( - status=status.value, - from_post_run_snapshot=False, - ) - ) - except ValueError: - pass # Unknown status value, ignore + raw_status = event.value.get("execution_status") + try: + status = ConversationExecutionStatus(raw_status) + except ValueError: + return + + if status != ConversationExecutionStatus.RUNNING: + self._terminal_status_queue.put(status.value) # Compose all callbacks into a single callback all_callbacks = self._callbacks + [run_complete_callback] @@ -1140,21 +1096,9 @@ def _wait_for_run_completion( # 1. WebSocket delivers a run-completion signal # 2. Poll interval expires (fall through to REST poll) try: - signal = self._terminal_status_queue.get(timeout=poll_interval) - ws_status = signal.status + ws_status = self._terminal_status_queue.get(timeout=poll_interval) # Raises ConversationRunError on ERROR/STUCK; no-op otherwise. - # Per-field ERROR/STUCK signals raise here; full-state snapshots - # (from_post_run_snapshot=True) are the only signals that reach - # the log/reconcile/return below. self._handle_conversation_status(ws_status) - # Invariant: only full-state snapshot signals reach this point. - # ERROR/STUCK raise above; all other enqueued signals originate - # from the authoritative post-run full-state snapshot. - if not signal.from_post_run_snapshot: - raise AssertionError( - "Unexpected non-snapshot signal reached " - f"reconcile path: {signal!r}" - ) logger.info( "Run completed via post-run WebSocket state update " "(status: %s, elapsed: %.1fs)", diff --git a/tests/sdk/conversation/remote/test_remote_conversation.py b/tests/sdk/conversation/remote/test_remote_conversation.py index 47b2f4b803..b0bfac051e 100644 --- a/tests/sdk/conversation/remote/test_remote_conversation.py +++ b/tests/sdk/conversation/remote/test_remote_conversation.py @@ -1086,97 +1086,6 @@ def custom_side_effect(method, url, **kwargs): f"poll_count={poll_count[0]}" ) - @patch( - "openhands.sdk.conversation.impl.remote_conversation.WebSocketCallbackClient" - ) - def test_remote_conversation_run_keeps_error_queued_after_running_poll( - self, mock_ws_client - ): - """A non-terminal REST poll must not discard queued ERROR/STUCK.""" - conversation_id = str(uuid.uuid4()) - mock_client_instance = self.setup_mock_client(conversation_id=conversation_id) - original_side_effect = mock_client_instance.request.side_effect - poll_count = [0] - ws_callback = [lambda event: None] - - def custom_side_effect(method, url, **kwargs): - resp = original_side_effect(method, url, **kwargs) - if method == "GET" and url == f"/api/conversations/{conversation_id}": - poll_count[0] += 1 - resp.json.return_value = { - "id": conversation_id, - "execution_status": "running", - "stats": {"usage_to_metrics": {}}, - } - if poll_count[0] == 1: - ws_callback[0]( - ConversationStateUpdateEvent( - key="execution_status", value="error" - ) - ) - return resp - - mock_client_instance.request.side_effect = custom_side_effect - mock_ws_instance = Mock() - mock_ws_client.return_value = mock_ws_instance - - conversation = RemoteConversation(agent=self.agent, workspace=self.workspace) - ws_callback[0] = mock_ws_client.call_args.kwargs["callback"] - conversation._get_last_error_detail = Mock(return_value="boom") - - with pytest.raises(Exception) as excinfo: - conversation.run(blocking=True, poll_interval=0.01, timeout=0.5) - - assert "boom" in str(excinfo.value) or "error" in str(excinfo.value).lower() - - @patch( - "openhands.sdk.conversation.impl.remote_conversation.WebSocketCallbackClient" - ) - def test_remote_conversation_run_ws_error_still_terminates_immediately( - self, mock_ws_client - ): - """ERROR via WS still raises immediately (not subject to hook reverts). - - Stop hooks operate on the FINISHED→agent-wants-to-stop transition. - ERROR and STUCK are not hookable terminal states; the SDK never - flips them back to RUNNING. So the WS fast-path must continue to - propagate them without waiting for REST confirmation, otherwise - we'd add 3+ poll intervals of latency to every error surface. - """ - conversation_id = str(uuid.uuid4()) - mock_client_instance = self.setup_mock_client(conversation_id=conversation_id) - - mock_ws_instance = Mock() - mock_ws_client.return_value = mock_ws_instance - - conversation = RemoteConversation(agent=self.agent, workspace=self.workspace) - conversation._get_last_error_detail = Mock(return_value="boom") - ws_callback = [mock_ws_client.call_args.kwargs["callback"]] - - # ``run()`` drains the WS queue at trigger time (see line "Drain any - # stale terminal status events from previous runs"). We need the - # ERROR to land *after* that drain but before the first - # ``Queue.get`` in ``_wait_for_run_completion``. Hook into the POST - # /run trigger response to inject the WS event at the right moment. - original_side_effect = mock_client_instance.request.side_effect - - def post_run_seeds_error(method, url, **kwargs): - resp = original_side_effect(method, url, **kwargs) - if method == "POST" and url.endswith("/run"): - ws_callback[0]( - ConversationStateUpdateEvent(key="execution_status", value="error") - ) - return resp - - mock_client_instance.request.side_effect = post_run_seeds_error - - with pytest.raises(Exception) as excinfo: # ConversationRunError wraps it - conversation.run(blocking=True, poll_interval=10.0) - # Confirm ERROR was the trigger (and we didn't fall through to REST). - # poll_interval=10s means a fall-through would take 30+ seconds; we - # also assert below that we surfaced quickly. - assert "boom" in str(excinfo.value) or "error" in str(excinfo.value).lower() - @patch( "openhands.sdk.conversation.impl.remote_conversation.WebSocketCallbackClient" ) From 384877e691edde70dcb8b2b40afa08e0cdedc8be Mon Sep 17 00:00:00 2001 From: openhands Date: Thu, 28 May 2026 21:44:02 +0000 Subject: [PATCH 20/20] fix(sdk): keep websocket error fast path Co-authored-by: openhands --- AGENTS.md | 2 +- .../conversation/impl/remote_conversation.py | 18 +++++++++-- .../remote/test_remote_conversation.py | 32 +++++++++++++++++++ 3 files changed, 48 insertions(+), 4 deletions(-) diff --git a/AGENTS.md b/AGENTS.md index 67b3ab24b2..d28bba99c0 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -384,7 +384,7 @@ Note: This is separate from `persistence_dir` which is used for conversation sta -- **`RemoteConversation._wait_for_run_completion` and stop hooks**: WebSocket terminal status events are *hints*, not authoritative termination. The server-side `LocalConversation.run` loop releases its state lock at the end of each iteration, so a `FINISHED` status set by `agent.step()` is visible to clients before the *next* loop iteration runs stop hooks (`hook_processor.run_stop`). If a stop hook returns rc=2 (denying the stop), status flips back to RUNNING and the agent gets another iteration. The client's `_wait_for_run_completion` therefore must **not** return on the first WS-delivered FINISHED — the REST poll path with `TERMINAL_POLL_THRESHOLD` consecutive terminal polls is the only authoritative termination check for FINISHED. ERROR/STUCK *do* short-circuit (they're not subject to hook reverts). See `_immediate_terminal_statuses`. Empirically this caused agents to consume just 0–1 iterations after a hook block on programbench retry-16; fix shipped in `feat/programbench`. +- **`RemoteConversation._wait_for_run_completion` and stop hooks**: Per-field WebSocket `FINISHED` status events are *hints*, not authoritative termination. The server-side `LocalConversation.run` loop releases its state lock at the end of each iteration, so a `FINISHED` status set by `agent.step()` is visible to clients before the *next* loop iteration runs stop hooks (`hook_processor.run_stop`). If a stop hook returns rc=2 (denying the stop), status flips back to RUNNING and the agent gets another iteration. The client's `_wait_for_run_completion` therefore must **not** return on the first WS-delivered FINISHED. Instead, post-run full-state WebSocket snapshots are authoritative; if that snapshot is missing, the time-based hard-fallback path (`TERMINAL_HARD_FALLBACK_SECS = 30.0`) accepts REST-confirmed terminal status after 30 continuous seconds. ERROR/STUCK still raise immediately through `_handle_conversation_status`. Empirically this caused agents to consume just 0–1 iterations after a hook block on programbench retry-16; fix shipped in `feat/programbench`. - **Hook events vs `state.events`**: `HookExecutionEvent` is emitted via `hook_processor.original_callback` (the chained `_on_event`), so it *should* land in `state.events` when the run is allowed to complete. But because the WS-FINISHED race above used to make the client snapshot `list(conversation.state.events)` *before* the server-side hook eval ran, `output.jsonl` history could miss hook events while on-disk persisted events under `/workspace/conversations/.../events/` had them — useful as a forensic signal that the race fired. diff --git a/openhands-sdk/openhands/sdk/conversation/impl/remote_conversation.py b/openhands-sdk/openhands/sdk/conversation/impl/remote_conversation.py index 0dcfb426a5..6bed779716 100644 --- a/openhands-sdk/openhands/sdk/conversation/impl/remote_conversation.py +++ b/openhands-sdk/openhands/sdk/conversation/impl/remote_conversation.py @@ -836,12 +836,24 @@ def __init__( # Add a callback that signals when run completes via WebSocket. # The server's post-run full-state snapshot is the only authoritative - # WebSocket completion signal. Per-field execution_status updates are - # hints only: FINISHED can still be reverted by stop hooks, and ERROR/STUCK - # are surfaced by the REST health-check path below. + # WebSocket success signal. Per-field FINISHED is a hint because stop + # hooks can still revert it; per-field ERROR/STUCK remain immediate. def run_complete_callback(event: Event) -> None: if not isinstance(event, ConversationStateUpdateEvent): return + + if event.key == "execution_status": + try: + status = ConversationExecutionStatus(event.value) + except ValueError: + return + if status in ( + ConversationExecutionStatus.ERROR, + ConversationExecutionStatus.STUCK, + ): + self._terminal_status_queue.put(status.value) + return + if event.key != FULL_STATE_KEY: return diff --git a/tests/sdk/conversation/remote/test_remote_conversation.py b/tests/sdk/conversation/remote/test_remote_conversation.py index b0bfac051e..6251d63323 100644 --- a/tests/sdk/conversation/remote/test_remote_conversation.py +++ b/tests/sdk/conversation/remote/test_remote_conversation.py @@ -1086,6 +1086,38 @@ def custom_side_effect(method, url, **kwargs): f"poll_count={poll_count[0]}" ) + @patch( + "openhands.sdk.conversation.impl.remote_conversation.WebSocketCallbackClient" + ) + def test_remote_conversation_run_ws_error_still_terminates_immediately( + self, mock_ws_client + ): + """ERROR via WS still raises immediately (not subject to hook reverts).""" + conversation_id = str(uuid.uuid4()) + mock_client_instance = self.setup_mock_client(conversation_id=conversation_id) + + mock_ws_client.return_value = Mock() + conversation = RemoteConversation(agent=self.agent, workspace=self.workspace) + conversation._get_last_error_detail = Mock(return_value="boom") + ws_callback = mock_ws_client.call_args.kwargs["callback"] + + original_side_effect = mock_client_instance.request.side_effect + + def post_run_seeds_error(method, url, **kwargs): + resp = original_side_effect(method, url, **kwargs) + if method == "POST" and url.endswith("/run"): + ws_callback( + ConversationStateUpdateEvent(key="execution_status", value="error") + ) + return resp + + mock_client_instance.request.side_effect = post_run_seeds_error + + with pytest.raises(Exception) as excinfo: + conversation.run(blocking=True, poll_interval=10.0) + + assert "boom" in str(excinfo.value) or "error" in str(excinfo.value).lower() + @patch( "openhands.sdk.conversation.impl.remote_conversation.WebSocketCallbackClient" )