diff --git a/.agents/skills/manage-evals/SKILL.md b/.agents/skills/manage-evals/SKILL.md index 566baaa6dd..3904c65f19 100644 --- a/.agents/skills/manage-evals/SKILL.md +++ b/.agents/skills/manage-evals/SKILL.md @@ -70,6 +70,7 @@ done | `commit0` | Commit0 — commit generation tasks | | `swebenchmultimodal` | SWE-bench Multimodal — tasks with images | | `terminalbench` | TerminalBench — terminal interaction tasks | +| `programbench` | ProgramBench — program-repair tasks against gold-standard test binaries | ### Trigger Options @@ -130,7 +131,7 @@ Each line is a run path. Match by benchmark and model to find the run. ### Step 2: Identify the Run Path Components A run path has three components: -- **benchmark**: `swebench`, `swebenchpro`, `gaia`, `swtbench`, `commit0`, `swebenchmultimodal`, `terminalbench` +- **benchmark**: `swebench`, `swebenchpro`, `gaia`, `swtbench`, `commit0`, `swebenchmultimodal`, `terminalbench`, `programbench` - **model_slug**: Derived from model name with `/:@.` replaced by `-` (e.g., `litellm_proxy-claude-sonnet-4-5-20250929`) - **run_id**: The GitHub Actions workflow run ID from the `OpenHands/evaluation` repo diff --git a/.agents/skills/manage-evals/references/eval-infrastructure.md b/.agents/skills/manage-evals/references/eval-infrastructure.md index d64318e121..dbbb6cf6d8 100644 --- a/.agents/skills/manage-evals/references/eval-infrastructure.md +++ b/.agents/skills/manage-evals/references/eval-infrastructure.md @@ -42,7 +42,7 @@ and served via CDN at `https://results.eval.all-hands.dev/`. {benchmark}/{model_slug}/{github_run_id}/ ``` -- **benchmark**: `swebench`, `swebenchpro`, `gaia`, `swtbench`, `commit0`, `swebenchmultimodal`, `terminalbench` +- **benchmark**: `swebench`, `swebenchpro`, `gaia`, `swtbench`, `commit0`, `swebenchmultimodal`, `terminalbench`, `programbench` - **model_slug**: Model name with `/:@.` replaced by `-` - Example: `litellm_proxy/claude-sonnet-4-5-20250929` → `litellm_proxy-claude-sonnet-4-5-20250929` - **github_run_id**: The GitHub Actions run ID from the `OpenHands/evaluation` repo diff --git a/.agents/skills/manage-evals/scripts/manage_evals.py b/.agents/skills/manage-evals/scripts/manage_evals.py index 8a75172240..ae9de88406 100755 --- a/.agents/skills/manage-evals/scripts/manage_evals.py +++ b/.agents/skills/manage-evals/scripts/manage_evals.py @@ -43,6 +43,7 @@ "commit0", "swebenchmultimodal", "terminalbench", + "programbench", ] TOOL_PRESETS = ["default", "gemini", "gpt5", "planning"] AGENT_TYPES = ["default", "acp-claude", "acp-codex"] diff --git a/.agents/skills/run-eval.md b/.agents/skills/run-eval.md index 68cd61f560..53c5a9db7a 100644 --- a/.agents/skills/run-eval.md +++ b/.agents/skills/run-eval.md @@ -31,7 +31,7 @@ curl -X POST \ ``` **Key parameters:** -- `benchmark`: `swebench`, `swebenchpro`, `swebenchmultimodal`, `gaia`, `swtbench`, `commit0`, `multiswebench`, `terminalbench` +- `benchmark`: `swebench`, `swebenchpro`, `swebenchmultimodal`, `gaia`, `swtbench`, `commit0`, `multiswebench`, `terminalbench`, `programbench` - `eval_limit`: Any positive integer (e.g., `1`, `10`, `50`, `200`) - `model_ids`: See `.github/run-eval/resolve_model_config.py` for available models - `benchmarks_branch`: Use feature branch from the benchmarks repo to test benchmark changes before merging diff --git a/.github/workflows/run-eval.yml b/.github/workflows/run-eval.yml index eaeb918ee1..da0f3e80b5 100644 --- a/.github/workflows/run-eval.yml +++ b/.github/workflows/run-eval.yml @@ -22,6 +22,7 @@ on: - commit0 - swebenchmultimodal - terminalbench + - programbench sdk_ref: description: SDK commit/ref to evaluate (must be a semantic version like v1.0.0 unless 'Allow unreleased branches' is checked) required: true diff --git a/AGENTS.md b/AGENTS.md index 9df0c84929..d28bba99c0 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -383,4 +383,9 @@ Note: This is separate from `persistence_dir` which is used for conversation sta - Repository guidance lives in the project root AGENTS.md (loaded as a third-party skill file). + +- **`RemoteConversation._wait_for_run_completion` and stop hooks**: Per-field WebSocket `FINISHED` status events are *hints*, not authoritative termination. The server-side `LocalConversation.run` loop releases its state lock at the end of each iteration, so a `FINISHED` status set by `agent.step()` is visible to clients before the *next* loop iteration runs stop hooks (`hook_processor.run_stop`). If a stop hook returns rc=2 (denying the stop), status flips back to RUNNING and the agent gets another iteration. The client's `_wait_for_run_completion` therefore must **not** return on the first WS-delivered FINISHED. Instead, post-run full-state WebSocket snapshots are authoritative; if that snapshot is missing, the time-based hard-fallback path (`TERMINAL_HARD_FALLBACK_SECS = 30.0`) accepts REST-confirmed terminal status after 30 continuous seconds. ERROR/STUCK still raise immediately through `_handle_conversation_status`. Empirically this caused agents to consume just 0–1 iterations after a hook block on programbench retry-16; fix shipped in `feat/programbench`. +- **Hook events vs `state.events`**: `HookExecutionEvent` is emitted via `hook_processor.original_callback` (the chained `_on_event`), so it *should* land in `state.events` when the run is allowed to complete. But because the WS-FINISHED race above used to make the client snapshot `list(conversation.state.events)` *before* the server-side hook eval ran, `output.jsonl` history could miss hook events while on-disk persisted events under `/workspace/conversations/.../events/` had them — useful as a forensic signal that the race fired. + + diff --git a/openhands-sdk/openhands/sdk/conversation/impl/remote_conversation.py b/openhands-sdk/openhands/sdk/conversation/impl/remote_conversation.py index a7328af06e..6bed779716 100644 --- a/openhands-sdk/openhands/sdk/conversation/impl/remote_conversation.py +++ b/openhands-sdk/openhands/sdk/conversation/impl/remote_conversation.py @@ -641,7 +641,8 @@ class RemoteConversation(BaseConversation): workspace: RemoteWorkspace _client: httpx.Client _cleanup_initiated: bool - _terminal_status_queue: Queue[str] # Thread-safe queue for terminal status from WS + _terminal_status_queue: Queue[str] + _run_armed: threading.Event _conversation_info_base_path: str _conversation_action_base_path: str delete_on_close: bool = False @@ -706,6 +707,7 @@ def __init__( self._conversation_action_base_path = LEGACY_CONVERSATIONS_PATH self._cleanup_initiated = False self._terminal_status_queue: Queue[str] = Queue() + self._run_armed = threading.Event() should_create = conversation_id is None if conversation_id is not None: @@ -832,17 +834,47 @@ def __init__( # No visualization (visualizer is None) self._visualizer = None - # Add a callback that signals when run completes via WebSocket - # This ensures we wait for all events to be delivered before run() returns + # Add a callback that signals when run completes via WebSocket. + # The server's post-run full-state snapshot is the only authoritative + # WebSocket success signal. Per-field FINISHED is a hint because stop + # hooks can still revert it; per-field ERROR/STUCK remain immediate. def run_complete_callback(event: Event) -> None: - if isinstance(event, ConversationStateUpdateEvent): - if event.key == "execution_status": - try: - status = ConversationExecutionStatus(event.value) - if status.is_terminal(): - self._terminal_status_queue.put(event.value) - except ValueError: - pass # Unknown status value, ignore + if not isinstance(event, ConversationStateUpdateEvent): + return + + if event.key == "execution_status": + try: + status = ConversationExecutionStatus(event.value) + except ValueError: + return + if status in ( + ConversationExecutionStatus.ERROR, + ConversationExecutionStatus.STUCK, + ): + self._terminal_status_queue.put(status.value) + return + + if event.key != FULL_STATE_KEY: + return + + # Only accept full-state snapshots as run-completion signals when a + # run is actually in progress. The WS subscription delivers an + # initial full-state snapshot during connect(); if that snapshot + # carries a non-RUNNING status (e.g. "idle"), it could be picked up + # by _wait_for_run_completion() as the completion signal for the + # *next* run() invocation, causing blocking=True to return before the + # server has actually finished. + if not self._run_armed.is_set(): + return + + raw_status = event.value.get("execution_status") + try: + status = ConversationExecutionStatus(raw_status) + except ValueError: + return + + if status != ConversationExecutionStatus.RUNNING: + self._terminal_status_queue.put(status.value) # Compose all callbacks into a single callback all_callbacks = self._callbacks + [run_complete_callback] @@ -985,13 +1017,12 @@ def run( Raises: ConversationRunError: If the run fails or times out. """ - # Drain any stale terminal status events from previous runs. - # This prevents stale events from causing early returns. - while True: - try: - self._terminal_status_queue.get_nowait() - except Empty: - break + # Disarm and drain any stale terminal status events from previous runs + # before arming for the new one. _run_armed gates full-state WS snapshots + # in run_complete_callback; clearing it here prevents the initial WS + # subscription snapshot from being mistaken for the post-run snapshot. + self._run_armed.clear() + self._drain_terminal_status_queue() # Trigger a run on the server using the dedicated run endpoint. # Let the server tell us if it's already running (409), avoiding an extra GET. @@ -1013,7 +1044,13 @@ def run( logger.info(f"run() triggered successfully: {resp}") if blocking: - self._wait_for_run_completion(poll_interval, timeout) + # Arm after POST so that only WS full-state snapshots arriving + # after the run was triggered are treated as run-completion signals. + self._run_armed.set() + try: + self._wait_for_run_completion(poll_interval, timeout) + finally: + self._run_armed.clear() def _wait_for_run_completion( self, @@ -1028,9 +1065,9 @@ def _wait_for_run_completion( status before WebSocket delivers the final events. As a fallback, it also polls the server periodically. If the WebSocket - is delayed or disconnected, we return after multiple consecutive polls - show a terminal status, and reconcile events to catch any that were - missed via WebSocket. + is delayed or disconnected, polling still surfaces ERROR/STUCK promptly. + A REST-only FINISHED status is not authoritative because stop hooks can + still revert it to RUNNING before the server-side run task exits. Args: poll_interval: Time in seconds between status polls (fallback). @@ -1042,14 +1079,19 @@ def _wait_for_run_completion( responses are retried until timeout. """ start_time = time.monotonic() - consecutive_terminal_polls = 0 - # Return after this many consecutive terminal polls (fallback for WS issues). - # We use 3 polls to balance latency vs reliability: - # - 1 poll could be a transient state during shutdown - # - 2 polls might still catch a race condition - # - 3 polls (with default 1s interval = 3s total) provides high confidence - # that the run is truly complete while keeping fallback latency reasonable - TERMINAL_POLL_THRESHOLD = 3 + terminal_poll_count = 0 + # Log a warning after this many consecutive REST terminal polls without a + # post-run WS snapshot. This is a health signal, not a return path — + # returning immediately on REST FINISHED would reintroduce the stop-hook race. + TERMINAL_POLL_WARNING_THRESHOLD = 3 + # Time-based hard fallback: accept REST-confirmed terminal status after + # the server has been reporting terminal for at least this many seconds + # without a post-run WS snapshot. Stop hooks are fast (seconds); 30 s + # is a safe bound regardless of poll_interval. This prevents an + # indefinite hang when the WS snapshot is never delivered (e.g., socket + # dropped after the run finishes on the server). + TERMINAL_HARD_FALLBACK_SECS = 30.0 + terminal_first_seen_at: float | None = None while True: elapsed = time.monotonic() - start_time @@ -1063,20 +1105,23 @@ def _wait_for_run_completion( ) # Wait for either: - # 1. WebSocket delivers terminal status event (preferred) - # 2. Poll interval expires (fallback - check status via REST) + # 1. WebSocket delivers a run-completion signal + # 2. Poll interval expires (fall through to REST poll) try: ws_status = self._terminal_status_queue.get(timeout=poll_interval) - # Handle ERROR/STUCK states - raises ConversationRunError + # Raises ConversationRunError on ERROR/STUCK; no-op otherwise. self._handle_conversation_status(ws_status) - logger.info( - "Run completed via WebSocket notification " + "Run completed via post-run WebSocket state update " "(status: %s, elapsed: %.1fs)", ws_status, elapsed, ) - self._state.refresh_from_server() + # The server publishes the full ConversationStateUpdateEvent after + # conversation.run()/arun() exits and pending events are flushed, + # so non-running statuses from that snapshot are authoritative + # run-complete signals. + self._state.events.reconcile() return except Empty: pass # Queue.get() timed out, fall through to REST polling @@ -1088,36 +1133,65 @@ def _wait_for_run_completion( status = self._poll_status_once() except Exception as exc: self._handle_poll_exception(exc) - consecutive_terminal_polls = 0 # Reset on error + # Reset on error: we cannot confirm the server is still in a + # terminal state after a failed poll, so conservatively restart + # the hard-fallback timer. In the degenerate case where polls + # alternate between terminal and exception, the 30 s threshold + # slides; this is intentional — we prefer a false-negative wait + # over a false-positive early return. + terminal_poll_count = 0 + terminal_first_seen_at = None else: # Raises ConversationRunError for ERROR/STUCK states self._handle_conversation_status(status) - # Track consecutive terminal polls as a fallback for WS issues. - # If WebSocket is delayed/disconnected, we return after multiple - # consecutive polls confirm the terminal status. if status and ConversationExecutionStatus(status).is_terminal(): - consecutive_terminal_polls += 1 - if consecutive_terminal_polls >= TERMINAL_POLL_THRESHOLD: - logger.info( - "Run completed via REST fallback after %d consecutive " - "terminal polls (status: %s, elapsed: %.1fs). " - "Refreshing final state and reconciling events...", - consecutive_terminal_polls, + # ERROR/STUCK have already been handled above. FINISHED from + # REST is advisory because stop hooks can still veto it; + # prefer waiting for the server's post-run WebSocket state update. + terminal_poll_count += 1 + now = time.monotonic() + if terminal_first_seen_at is None: + terminal_first_seen_at = now + if terminal_poll_count == TERMINAL_POLL_WARNING_THRESHOLD: + logger.warning( + "REST has reported terminal status %s for %d polls " + "without a post-run WebSocket snapshot; continuing " + "to wait for the authoritative snapshot " + "(elapsed: %.1fs)", status, + terminal_poll_count, elapsed, ) - final_info = self._state.refresh_from_server() - self._handle_conversation_status( - final_info.get("execution_status") + terminal_secs = now - terminal_first_seen_at + if terminal_secs >= TERMINAL_HARD_FALLBACK_SECS: + logger.warning( + "REST has reported terminal status %s for %.0fs " + "with no post-run WebSocket snapshot; accepting as " + "final to avoid an indefinite hang (elapsed: %.1fs). " + "This may indicate a WebSocket delivery issue.", + status, + terminal_secs, + elapsed, ) - # Reconcile events to catch any that were missed via WS. - # This is only called in the fallback path, so it doesn't - # add overhead in the common case where WS works. + self._state.refresh_from_server() self._state.events.reconcile() return else: - consecutive_terminal_polls = 0 + terminal_poll_count = 0 + terminal_first_seen_at = None + + def _drain_terminal_status_queue(self) -> None: + """Empty the WS terminal-status hint queue. + + Called at the start of run() (before arming) to discard any stale + signals left over from a previous run invocation. + """ + while True: + try: + self._terminal_status_queue.get_nowait() + except Empty: + break def _poll_status_once(self) -> str | None: """Fetch the current execution status from the remote conversation.""" diff --git a/tests/sdk/conversation/remote/test_remote_conversation.py b/tests/sdk/conversation/remote/test_remote_conversation.py index 53d14f7a6f..6251d63323 100644 --- a/tests/sdk/conversation/remote/test_remote_conversation.py +++ b/tests/sdk/conversation/remote/test_remote_conversation.py @@ -1,5 +1,6 @@ """Tests for RemoteConversation.""" +import time import uuid from unittest.mock import Mock, patch @@ -14,6 +15,10 @@ from openhands.sdk.conversation.secret_registry import SecretValue from openhands.sdk.conversation.visualizer import DefaultConversationVisualizer from openhands.sdk.event import MessageEvent +from openhands.sdk.event.conversation_state import ( + FULL_STATE_KEY, + ConversationStateUpdateEvent, +) from openhands.sdk.event.llm_completion_log import LLMCompletionLogEvent from openhands.sdk.llm import LLM, Message, TextContent from openhands.sdk.security.confirmation_policy import AlwaysConfirm @@ -120,6 +125,47 @@ def create_mock_events_response(self, events: list | None = None): } return mock_response + @staticmethod + def full_state_event(status: str, **values): + return ConversationStateUpdateEvent( + key=FULL_STATE_KEY, + value={"execution_status": status, **values}, + ) + + def install_post_run_full_state( + self, + mock_client_instance, + conversation_id: str, + status: str = "finished", + **values, + ): + """Install a side effect that fires a full-state WS event on the first + REST GET poll after POST /run. + + The event is fired from the GET side effect (inside + _wait_for_run_completion, after _run_armed is set) rather than from the + POST side effect. Firing from POST races _run_armed.set(), which follows + the POST return, so the event would be silently discarded by + run_complete_callback's arming guard. + """ + ws_callback = [lambda event: None] + original_side_effect = mock_client_instance.request.side_effect + fired = [False] + + def custom_side_effect(method, url, **kwargs): + resp = original_side_effect(method, url, **kwargs) + if ( + not fired[0] + and method == "GET" + and url == f"/api/conversations/{conversation_id}" + ): + fired[0] = True + ws_callback[0](self.full_state_event(status, **values)) + return resp + + mock_client_instance.request.side_effect = custom_side_effect + return ws_callback + @patch( "openhands.sdk.conversation.impl.remote_conversation.WebSocketCallbackClient" ) @@ -645,12 +691,16 @@ def test_remote_conversation_run(self, mock_ws_client): # Setup mocks conversation_id = str(uuid.uuid4()) mock_client_instance = self.setup_mock_client(conversation_id=conversation_id) + ws_callback = self.install_post_run_full_state( + mock_client_instance, conversation_id + ) mock_ws_instance = Mock() mock_ws_client.return_value = mock_ws_instance # Create conversation and run conversation = RemoteConversation(agent=self.agent, workspace=self.workspace) + ws_callback[0] = mock_ws_client.call_args.kwargs["callback"] conversation.run() # Verify run API call @@ -670,9 +720,13 @@ def test_remote_conversation_run_already_running(self, mock_ws_client): # Setup mocks conversation_id = str(uuid.uuid4()) mock_client_instance = self.setup_mock_client(conversation_id=conversation_id) + ws_callback = [lambda event: None] - # Override the default request side_effect to return 409 for /run endpoint + # Override the default request side_effect to return 409 for /run endpoint. + # The full-state completion event fires on the first GET poll (after arming) + # rather than inline with the POST, since _run_armed is set after POST returns. original_side_effect = mock_client_instance.request.side_effect + fired = [False] def custom_side_effect(method, url, **kwargs): if method == "POST" and "/run" in url: @@ -680,7 +734,15 @@ def custom_side_effect(method, url, **kwargs): mock_run_response.status_code = 409 # Already running mock_run_response.raise_for_status.return_value = None return mock_run_response - return original_side_effect(method, url, **kwargs) + resp = original_side_effect(method, url, **kwargs) + if ( + not fired[0] + and method == "GET" + and url == f"/api/conversations/{conversation_id}" + ): + fired[0] = True + ws_callback[0](self.full_state_event("finished")) + return resp mock_client_instance.request.side_effect = custom_side_effect @@ -689,6 +751,7 @@ def custom_side_effect(method, url, **kwargs): # Create conversation and run conversation = RemoteConversation(agent=self.agent, workspace=self.workspace) + ws_callback[0] = mock_ws_client.call_args.kwargs["callback"] # With blocking=True (default), it will poll until finished conversation.run() # Should not raise an exception @@ -744,11 +807,10 @@ def test_remote_conversation_run_non_blocking(self, mock_ws_client): def test_remote_conversation_run_blocking_polls_until_finished( self, mock_ws_client ): - """Test that blocking=True polls until status is not running. + """Test that blocking=True waits for the post-run state snapshot. - The implementation waits for WebSocket to deliver terminal status, but falls - back to REST polling if WebSocket doesn't deliver. The fallback requires 3 - consecutive terminal polls (TERMINAL_POLL_THRESHOLD) before returning. + REST FINISHED is only a health signal; the server's full-state + ConversationStateUpdateEvent is the authoritative run-complete signal. """ # Setup mocks conversation_id = str(uuid.uuid4()) @@ -757,6 +819,7 @@ def test_remote_conversation_run_blocking_polls_until_finished( # Track poll count and return "running" for first 2 polls, then "finished" poll_count = [0] original_side_effect = mock_client_instance.request.side_effect + ws_callback = [lambda event: None] def custom_side_effect(method, url, **kwargs): if method == "GET" and url == f"/api/conversations/{conversation_id}": @@ -773,6 +836,8 @@ def custom_side_effect(method, url, **kwargs): "id": conversation_id, "execution_status": "finished", } + if poll_count[0] == 5: + ws_callback[0](self.full_state_event("finished")) return response return original_side_effect(method, url, **kwargs) @@ -783,24 +848,383 @@ def custom_side_effect(method, url, **kwargs): # Create conversation and run with blocking=True conversation = RemoteConversation(agent=self.agent, workspace=self.workspace) + ws_callback[0] = mock_ws_client.call_args.kwargs["callback"] conversation.run(blocking=True, poll_interval=0.01) # Fast polling for test - # Verify polling happened multiple times - # With the fallback mechanism, we need 3 consecutive terminal polls, - # plus one final authoritative state refresh before returning: - # 2 running + 3 finished + 1 refresh = 6 total GETs. - assert poll_count[0] == 6, ( - f"Should have polled 6 times (2 running + 3 finished + 1 final refresh), " - f"got {poll_count[0]}" + # Verify REST FINISHED alone did not complete the run; the run returned + # only after the post-run full-state snapshot was delivered. + assert poll_count[0] == 5, ( + f"Should have polled until the post-run snapshot arrived, got " + f"{poll_count[0]} poll(s)" ) @patch( "openhands.sdk.conversation.impl.remote_conversation.WebSocketCallbackClient" ) - def test_remote_conversation_run_rest_fallback_refreshes_final_state( + def test_remote_conversation_run_returns_on_waiting_for_confirmation_snapshot( self, mock_ws_client ): - """REST fallback refreshes cached state before run() returns.""" + """A post-run non-running full-state snapshot completes blocking run().""" + conversation_id = str(uuid.uuid4()) + mock_client_instance = self.setup_mock_client(conversation_id=conversation_id) + ws_callback = [lambda event: None] + original_side_effect = mock_client_instance.request.side_effect + first_poll = [True] + + def custom_side_effect(method, url, **kwargs): + if method == "GET" and url == f"/api/conversations/{conversation_id}": + if first_poll[0]: + # Fire the full-state event on the first REST poll, which runs + # inside _wait_for_run_completion() after _run_armed is set. + first_poll[0] = False + ws_callback[0](self.full_state_event("waiting_for_confirmation")) + response = Mock() + response.status_code = 200 + response.raise_for_status.return_value = None + response.json.return_value = { + "id": conversation_id, + "execution_status": "waiting_for_confirmation", + "stats": {"usage_to_metrics": {}}, + } + return response + return original_side_effect(method, url, **kwargs) + + mock_client_instance.request.side_effect = custom_side_effect + mock_ws_instance = Mock() + mock_ws_client.return_value = mock_ws_instance + + conversation = RemoteConversation(agent=self.agent, workspace=self.workspace) + ws_callback[0] = mock_ws_client.call_args.kwargs["callback"] + + conversation.run(blocking=True, poll_interval=0.01) + + assert conversation.state.execution_status.value == "waiting_for_confirmation" + + @patch( + "openhands.sdk.conversation.impl.remote_conversation.WebSocketCallbackClient" + ) + def test_remote_conversation_run_preserves_post_run_snapshot_after_running_poll( + self, mock_ws_client + ): + """An in-flight RUNNING REST poll must not discard a post-run snapshot.""" + conversation_id = str(uuid.uuid4()) + mock_client_instance = self.setup_mock_client(conversation_id=conversation_id) + ws_callback = [lambda event: None] + poll_count = [0] + original_side_effect = mock_client_instance.request.side_effect + + def custom_side_effect(method, url, **kwargs): + if method == "GET" and url == f"/api/conversations/{conversation_id}": + poll_count[0] += 1 + ws_callback[0](self.full_state_event("finished")) + response = Mock() + response.status_code = 200 + response.raise_for_status.return_value = None + response.json.return_value = { + "id": conversation_id, + "execution_status": "running", + "stats": {"usage_to_metrics": {}}, + } + return response + return original_side_effect(method, url, **kwargs) + + mock_client_instance.request.side_effect = custom_side_effect + mock_ws_instance = Mock() + mock_ws_client.return_value = mock_ws_instance + + conversation = RemoteConversation(agent=self.agent, workspace=self.workspace) + ws_callback[0] = mock_ws_client.call_args.kwargs["callback"] + + conversation.run(blocking=True, poll_interval=0.01, timeout=0.1) + + assert poll_count[0] == 1 + + @patch( + "openhands.sdk.conversation.impl.remote_conversation.WebSocketCallbackClient" + ) + def test_remote_conversation_run_ws_finished_is_only_a_hint_not_terminal( + self, mock_ws_client + ): + """A WS-delivered FINISHED status must NOT terminate ``run()`` on its own. + + Regression test for the stop-hook race we observed in retry-16 + (run 25497962453, conversation dd86d184…, agourlay/zip-password-finder): + + Server-side timeline within a single ``LocalConversation.run`` loop: + 1. ``agent.step()`` sets ``execution_status = FINISHED``; that + status update event is broadcast over the WebSocket. + 2. **Lock released** at end of iteration. Client observes + FINISHED via WS. + 3. Next loop iteration acquires lock, runs stop hooks, hook + returns rc=2, status reverts to RUNNING, ``continue``. + + With the old implementation, step 2 caused the client's + ``_wait_for_run_completion`` to ``return`` immediately on the + WS-delivered FINISHED — racing the server's hook eval and tearing + down the agent-server pod (via ``workspace_keepalive`` exit) before + the agent could consume its iteration budget. + + The fix: per-field WS FINISHED is ignored for completion. Only the + post-run full-state snapshot is authoritative. + """ + conversation_id = str(uuid.uuid4()) + mock_client_instance = self.setup_mock_client(conversation_id=conversation_id) + + # REST poll script: the first 3 polls show the server has flipped + # *back* to RUNNING (the stop-hook revert); subsequent polls show + # the agent's second finish, which should be honored. + rest_script = [ + "running", + "running", + "running", + "finished", + "finished", + "finished", + "finished", + ] + poll_count = [0] + original_side_effect = mock_client_instance.request.side_effect + ws_callback = [lambda event: None] + + def custom_side_effect(method, url, **kwargs): + if method == "POST" and url == f"/api/conversations/{conversation_id}/run": + response = original_side_effect(method, url, **kwargs) + ws_callback[0]( + ConversationStateUpdateEvent( + key="execution_status", value="finished" + ) + ) + return response + if method == "GET" and url == f"/api/conversations/{conversation_id}": + idx = min(poll_count[0], len(rest_script) - 1) + status = rest_script[idx] + poll_count[0] += 1 + response = Mock() + response.status_code = 200 + response.raise_for_status.return_value = None + response.json.return_value = { + "id": conversation_id, + "execution_status": status, + "stats": {"usage_to_metrics": {}}, + } + if poll_count[0] >= len(rest_script): + ws_callback[0](self.full_state_event("finished")) + return response + return original_side_effect(method, url, **kwargs) + + mock_client_instance.request.side_effect = custom_side_effect + mock_ws_instance = Mock() + mock_ws_client.return_value = mock_ws_instance + + conversation = RemoteConversation(agent=self.agent, workspace=self.workspace) + ws_callback[0] = mock_ws_client.call_args.kwargs["callback"] + + conversation.run(blocking=True, poll_interval=0.01) + + # Must have polled past the 3 RUNNING REST responses (race window), + # then waited for the post-run full-state snapshot. Pre-fix this would + # have returned on the WS FINISHED injected after the /run trigger with + # poll_count == 0. + assert poll_count[0] == len(rest_script), ( + f"Run() returned before the post-run snapshot. poll_count={poll_count[0]}" + ) + + @patch( + "openhands.sdk.conversation.impl.remote_conversation.WebSocketCallbackClient" + ) + def test_remote_conversation_run_rest_finished_revert_waits_for_full_state( + self, mock_ws_client + ): + """Do not return from REST FINISHED when a hook can still veto it.""" + conversation_id = str(uuid.uuid4()) + mock_client_instance = self.setup_mock_client(conversation_id=conversation_id) + + rest_script = [ + "finished", + "finished", + "finished", + "running", + "running", + "finished", + "finished", + "finished", + "finished", + ] + poll_count = [0] + original_side_effect = mock_client_instance.request.side_effect + + def custom_side_effect(method, url, **kwargs): + if method == "GET" and url == f"/api/conversations/{conversation_id}": + idx = min(poll_count[0], len(rest_script) - 1) + status = rest_script[idx] + poll_count[0] += 1 + response = Mock() + response.status_code = 200 + response.raise_for_status.return_value = None + response.json.return_value = { + "id": conversation_id, + "execution_status": status, + "stats": {"usage_to_metrics": {}}, + } + if poll_count[0] >= len(rest_script): + ws_callback[0](self.full_state_event("finished")) + return response + return original_side_effect(method, url, **kwargs) + + mock_client_instance.request.side_effect = custom_side_effect + mock_ws_instance = Mock() + mock_ws_client.return_value = mock_ws_instance + ws_callback = [lambda event: None] + + conversation = RemoteConversation(agent=self.agent, workspace=self.workspace) + ws_callback[0] = mock_ws_client.call_args.kwargs["callback"] + + conversation.run(blocking=True, poll_interval=0.01) + + assert poll_count[0] >= len(rest_script), ( + f"Run() returned before the post-run full-state snapshot. " + f"poll_count={poll_count[0]}" + ) + + @patch( + "openhands.sdk.conversation.impl.remote_conversation.WebSocketCallbackClient" + ) + def test_remote_conversation_run_ws_error_still_terminates_immediately( + self, mock_ws_client + ): + """ERROR via WS still raises immediately (not subject to hook reverts).""" + conversation_id = str(uuid.uuid4()) + mock_client_instance = self.setup_mock_client(conversation_id=conversation_id) + + mock_ws_client.return_value = Mock() + conversation = RemoteConversation(agent=self.agent, workspace=self.workspace) + conversation._get_last_error_detail = Mock(return_value="boom") + ws_callback = mock_ws_client.call_args.kwargs["callback"] + + original_side_effect = mock_client_instance.request.side_effect + + def post_run_seeds_error(method, url, **kwargs): + resp = original_side_effect(method, url, **kwargs) + if method == "POST" and url.endswith("/run"): + ws_callback( + ConversationStateUpdateEvent(key="execution_status", value="error") + ) + return resp + + mock_client_instance.request.side_effect = post_run_seeds_error + + with pytest.raises(Exception) as excinfo: + conversation.run(blocking=True, poll_interval=10.0) + + assert "boom" in str(excinfo.value) or "error" in str(excinfo.value).lower() + + @patch( + "openhands.sdk.conversation.impl.remote_conversation.WebSocketCallbackClient" + ) + def test_remote_conversation_run_stale_pre_run_snapshot_is_ignored( + self, mock_ws_client + ): + """A full-state WS snapshot received before run() POST must not complete run(). + + The WS subscription delivers an initial full-state snapshot during + connect(). If that snapshot carries a non-RUNNING status (e.g. "idle"), + it must NOT be treated as the post-run completion signal — _run_armed + is not yet set at that point. run() should only complete once a + full-state snapshot arrives after the POST /run call. + """ + conversation_id = str(uuid.uuid4()) + mock_client_instance = self.setup_mock_client(conversation_id=conversation_id) + ws_callback = [lambda event: None] + original_side_effect = mock_client_instance.request.side_effect + poll_count = [0] + + def custom_side_effect(method, url, **kwargs): + resp = original_side_effect(method, url, **kwargs) + if method == "GET" and url == f"/api/conversations/{conversation_id}": + poll_count[0] += 1 + if poll_count[0] == 1: + # Fire the real post-run snapshot on the first REST poll (armed). + ws_callback[0](self.full_state_event("finished")) + return resp + + mock_client_instance.request.side_effect = custom_side_effect + mock_ws_instance = Mock() + mock_ws_client.return_value = mock_ws_instance + + conversation = RemoteConversation(agent=self.agent, workspace=self.workspace) + ws_callback[0] = mock_ws_client.call_args.kwargs["callback"] + + # Inject a stale "idle" snapshot directly into the queue as if it + # arrived from the initial subscription, before run() is called. + # _run_armed is not set yet, so run_complete_callback would discard it, + # but simulating a direct queue put lets us verify the guard works end-to-end. + ws_callback[0](self.full_state_event("idle")) + assert conversation._terminal_status_queue.empty(), ( + "Stale pre-run snapshot must not enter the queue (_run_armed not set)" + ) + + # run() should complete via the first REST poll's full-state event, not + # the stale pre-run snapshot. + conversation.run(blocking=True, poll_interval=0.01) + assert poll_count[0] >= 1 + + @patch( + "openhands.sdk.conversation.impl.remote_conversation.WebSocketCallbackClient" + ) + def test_remote_conversation_run_rest_hard_fallback_when_ws_silent( + self, mock_ws_client + ): + """run() completes via REST hard-fallback when WS snapshot never arrives. + + When the post-run WS full-state snapshot is never delivered (e.g. socket + dropped after the run finished), the client should not hang until the + overall timeout. After TERMINAL_HARD_FALLBACK_SECS of consecutive REST + terminal polls it must accept the status and return. + + time.monotonic is patched to advance 10 s per call so the 30 s threshold + is crossed after ~3 REST polls (real wall time ~poll_interval * 3). + """ + conversation_id = str(uuid.uuid4()) + mock_client_instance = self.setup_mock_client(conversation_id=conversation_id) + poll_count = [0] + original_side_effect = mock_client_instance.request.side_effect + + def custom_side_effect(method, url, **kwargs): + resp = original_side_effect(method, url, **kwargs) + if method == "GET" and url == f"/api/conversations/{conversation_id}": + poll_count[0] += 1 + # Never fire the post-run WS snapshot — simulate silent socket. + return resp + + mock_client_instance.request.side_effect = custom_side_effect + mock_ws_client.return_value = Mock() + + conversation = RemoteConversation(agent=self.agent, workspace=self.workspace) + + # Patch time.monotonic to advance 10 s per call so the 30 s hard-fallback + # threshold is crossed after ~3 REST polls. + call_counter = [0] + base = time.monotonic() + + def fast_monotonic() -> float: + call_counter[0] += 1 + return base + call_counter[0] * 10.0 + + with patch( + "openhands.sdk.conversation.impl.remote_conversation.time.monotonic", + side_effect=fast_monotonic, + ): + conversation.run(blocking=True, poll_interval=0.01) + + assert poll_count[0] >= 1, f"Expected at least 1 REST poll, got {poll_count[0]}" + + @patch( + "openhands.sdk.conversation.impl.remote_conversation.WebSocketCallbackClient" + ) + def test_remote_conversation_run_full_state_updates_cached_state( + self, mock_ws_client + ): + """Post-run full-state snapshots update cached state before run() returns.""" conversation_id = str(uuid.uuid4()) mock_client_instance = self.setup_mock_client(conversation_id=conversation_id) @@ -835,6 +1259,7 @@ def test_remote_conversation_run_rest_fallback_refreshes_final_state( poll_count = [0] original_side_effect = mock_client_instance.request.side_effect + ws_callback = [lambda event: None] def custom_side_effect(method, url, **kwargs): if method == "GET" and url == f"/api/conversations/{conversation_id}": @@ -848,10 +1273,16 @@ def custom_side_effect(method, url, **kwargs): "execution_status": "running", "stats": {"usage_to_metrics": {}}, } - elif poll_count[0] <= 5: + elif poll_count[0] <= 4: response.json.return_value = stale_info else: response.json.return_value = final_info + ws_callback[0]( + ConversationStateUpdateEvent( + key=FULL_STATE_KEY, + value=final_info, + ) + ) return response return original_side_effect(method, url, **kwargs) @@ -861,6 +1292,7 @@ def custom_side_effect(method, url, **kwargs): mock_ws_client.return_value = mock_ws_instance conversation = RemoteConversation(agent=self.agent, workspace=self.workspace) + ws_callback[0] = mock_ws_client.call_args.kwargs["callback"] conversation.state._cached_state = { "id": conversation_id, "execution_status": "running", @@ -869,12 +1301,11 @@ def custom_side_effect(method, url, **kwargs): conversation.run(blocking=True, poll_interval=0.01) - assert poll_count[0] == 6 - assert conversation.state._cached_state == final_info - assert ( - conversation.conversation_stats.get_combined_metrics().accumulated_cost - == pytest.approx(1.25) - ) + assert poll_count[0] >= 1 + assert "test-llm" in conversation.state.stats.usage_to_metrics + assert conversation.state.stats.usage_to_metrics[ + "test-llm" + ].accumulated_cost == pytest.approx(1.25) @patch( "openhands.sdk.conversation.impl.remote_conversation.WebSocketCallbackClient"