From 4847b132da37681bf44e1f8582e213115162c19c Mon Sep 17 00:00:00 2001 From: Cong Wang Date: Sun, 12 Apr 2026 17:13:51 -0700 Subject: [PATCH 1/2] fix(sandbox): correct sandlock integration semantics and fail loud MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The SandlockSandbox wrapper had several latent correctness issues that could cause it to silently run with weaker isolation than intended, or drop resource limits on the floor. Fixes: * Network policy intent is now explicit. sandlock's net_allow_hosts uses tri-state semantics (None=unrestricted, []=deny all, [..]= allowlist) and network-enabled=True previously passed None to a Sequence[str] field. Rewritten to pass net_connect=["0-65535"] when network is enabled, or net_allow_hosts=[] to block DNS when disabled. TCP defaults ([] = deny all) handle the rest. * stdout/stderr are now str, not bytes. sandlock returns bytes from Sandbox.run(); PraisonAI's SandboxResult is typed as str. Added a _decode() helper with errors="replace" so downstream consumers never see binary artefacts or crash on .lower() / .split(). * max_cpu is now actually passed to the Policy. Previously limits.cpu_percent was silently ignored. * execute_file() passes the script by path, not via `python3 -c `. Large scripts no longer hit ARG_MAX, and the script's parent directory is added to the Landlock read allowlist via the new extra_readable parameter on _create_policy. * Timeout detection is authoritative: we now inspect result.error for "timed out" rather than heuristically comparing wall-clock duration against limits.timeout_seconds. A process that happens to finish just under the timeout no longer gets mis-classified. * Sandbox handles are now managed via `with ... as sb:` so cleanup runs on exception. * fs_readable is filtered to paths that actually exist. Landlock fails at spawn time if any allowlisted path is missing, so the hardcoded list (which included /usr/local/lib/python3) caused sandlock_spawn failures on most hosts. Now we filter with os.path.isdir before constructing the policy. Breaking change — silent fallback removed: SandlockSandbox used to fall back to SubprocessSandbox whenever landlock_abi_version() < 1, logging only a warning. This violates the caller's explicit choice of kernel-level isolation: a SandlockSandbox that isn't actually using Landlock is a security footgun, and a warning in the logs is not a consent mechanism. __init__ now raises RuntimeError if Landlock support is missing. Callers who want graceful degradation should catch ImportError / RuntimeError and construct SubprocessSandbox explicitly, e.g.: try: sb = SandlockSandbox(cfg) except (ImportError, RuntimeError): sb = SubprocessSandbox(cfg) The equivalent fallback branches in execute(), run_command(), and execute_file() are removed. Tests updated: - test_raises_when_landlock_unavailable replaces the two fallback tests and asserts RuntimeError is raised at construction time. - test_sandlock_execution_timeout now mocks result.error instead of patching time.time. - test_sandlock_execution_failure sets result.error=None explicitly. - test_policy_creation_with_minimal_limits strengthened to check max_cpu, the new net_allow_hosts=[] deny-all semantics, and that net_connect is left unset (defaults to deny-all). All 10 unit tests pass, including the real-sandlock integration test (which was failing on baseline because of the /usr/local/lib/python3 hardcoded path). --- src/praisonai/praisonai/sandbox/sandlock.py | 270 ++++++++++-------- .../unit/sandbox/test_sandlock_sandbox.py | 83 +++--- 2 files changed, 191 insertions(+), 162 deletions(-) diff --git a/src/praisonai/praisonai/sandbox/sandlock.py b/src/praisonai/praisonai/sandbox/sandlock.py index 03359e371..3af57be55 100644 --- a/src/praisonai/praisonai/sandbox/sandlock.py +++ b/src/praisonai/praisonai/sandbox/sandlock.py @@ -74,6 +74,21 @@ def __init__( "sandlock package required for SandlockSandbox. " "Install with: pip install 'praisonai[sandbox]' or pip install sandlock" ) + + # Fail loud if Landlock isn't supported on this kernel. + try: + abi = self._sandlock.landlock_abi_version() + except Exception as e: + raise RuntimeError( + f"failed to query Landlock ABI version: {e}" + ) from e + if abi < 1: + raise RuntimeError( + "SandlockSandbox requires Landlock support (Linux kernel " + ">= 6.12 with CONFIG_SECURITY_LANDLOCK=y). This kernel " + f"reports Landlock ABI version {abi}. Use SubprocessSandbox " + "explicitly if weaker isolation is acceptable." + ) @property def is_available(self) -> bool: @@ -112,20 +127,24 @@ def _create_policy( self, limits: ResourceLimits, working_dir: Optional[str] = None, + extra_readable: Optional[List[str]] = None, ) -> Any: """Create sandlock policy from resource limits. - + Args: limits: Resource limits configuration - working_dir: Working directory for execution - - Returns: - Sandlock Policy object + working_dir: Working directory for execution (added to the + writable allowlist). + extra_readable: Additional directories to add to the Landlock + read allowlist (e.g. the parent of a script passed to + ``execute_file``). """ Policy = self._sandlock.Policy - - # Determine allowed paths - allowed_read_paths = [ + + # Landlock requires every path in the allowlist to exist at rule- + # attach time; passing a missing directory makes sandlock_spawn + # fail outright. Filter to paths that actually exist on this host. + _candidate_read_paths = [ "/usr/lib/python3", "/usr/local/lib/python3", "/lib", @@ -133,37 +152,77 @@ def _create_policy( "/bin", "/usr/bin", ] - + allowed_read_paths = [p for p in _candidate_read_paths if os.path.isdir(p)] + if extra_readable: + allowed_read_paths.extend( + p for p in extra_readable if os.path.isdir(p) + ) + allowed_write_paths = [] if working_dir: allowed_write_paths.append(working_dir) if self._temp_dir: allowed_write_paths.append(self._temp_dir) - + # Add any configured allowed paths from security policy if hasattr(self.config, 'security_policy') and self.config.security_policy: allowed_write_paths.extend(self.config.security_policy.allowed_paths) - # Create policy with kernel-level restrictions + # Network policy. + # + # sandlock uses tri-state semantics for net_allow_hosts: + # None -> unrestricted (real /etc/hosts visible) + # [] -> deny all hosts (empty virtual /etc/hosts) + # ["host", ...] -> allowlist + # + # TCP connectivity is governed separately by net_connect/net_bind, + # both of which default to [] = deny all. To enable network we must + # explicitly open TCP ports; to block network we can rely on the + # defaults AND additionally block DNS via net_allow_hosts=[]. + if limits.network_enabled: + net_kwargs: Dict[str, Any] = { + # Allow outbound TCP to any port; leave net_allow_hosts at + # its default (None = /etc/hosts unrestricted). + "net_connect": ["0-65535"], + } + else: + net_kwargs = { + # Empty allowlist -> no host resolvable via virtual /etc/hosts. + # net_bind/net_connect default to [] = deny all TCP. + "net_allow_hosts": [], + } + policy = Policy( # Filesystem restrictions (Landlock) fs_readable=allowed_read_paths, fs_writable=allowed_write_paths, - - # Network restrictions - net_allow_hosts=[] if not limits.network_enabled else None, - + # Resource limits max_memory=f"{limits.memory_mb}M", max_processes=limits.max_processes, max_open_files=limits.max_open_files, - - # Note: CPU throttle percentage, not time limit - # Execution timeout is handled via Sandbox.run(timeout=...) + # max_cpu is a throttle percentage of one core, not a time budget. + # Execution timeout is handled via Sandbox.run(timeout=...). + max_cpu=limits.cpu_percent, + + **net_kwargs, ) - + return policy + @staticmethod + def _decode(buf: Any) -> str: + """Decode a sandlock Result stdout/stderr buffer to str. + + sandlock returns ``bytes`` from ``Sandbox.run()``; PraisonAI's + ``SandboxResult`` uses ``str`` throughout. Invalid UTF-8 is + replaced rather than raised so downstream consumers never see + binary artefacts. + """ + if isinstance(buf, bytes): + return buf.decode("utf-8", errors="replace") + return buf or "" + def _safe_sandbox_path(self, path: str) -> Optional[str]: """Resolve a caller-supplied path to an absolute path inside _temp_dir. @@ -190,6 +249,7 @@ async def _run_sandlocked( limits: ResourceLimits, env: Optional[Dict[str, str]], working_dir: Optional[str], + extra_readable: Optional[List[str]] = None, ) -> SandboxResult: """Execute *cmd* inside a sandlock Sandbox and return a SandboxResult. @@ -202,87 +262,67 @@ async def _run_sandlocked( prior to sandbox creation. ``working_dir`` is applied via the policy (added to the writable-path allow-list). """ - policy = self._create_policy(limits, working_dir) + policy = self._create_policy(limits, working_dir, extra_readable) started_at = time.time() - try: - sandbox = self._sandlock.Sandbox(policy) - - result = await asyncio.get_running_loop().run_in_executor( - None, - lambda: sandbox.run(cmd, timeout=limits.timeout_seconds) - ) - - completed_at = time.time() - duration = completed_at - started_at - - # Check result.success and exit_code to determine actual status - if not result.success: - # Check if this was a timeout based on duration - if duration >= limits.timeout_seconds: - return SandboxResult( - execution_id=execution_id, - status=SandboxStatus.TIMEOUT, - error=f"Execution timed out after {limits.timeout_seconds}s", - exit_code=result.exit_code, - stdout=result.stdout, - stderr=result.stderr, - duration_seconds=duration, - started_at=started_at, - completed_at=completed_at, - ) - else: - # Non-zero exit or other failure - return SandboxResult( - execution_id=execution_id, - status=SandboxStatus.FAILED, - error=f"Execution failed with exit code {result.exit_code}: {result.stderr}", - exit_code=result.exit_code, - stdout=result.stdout, - stderr=result.stderr, - duration_seconds=duration, - started_at=started_at, - completed_at=completed_at, - ) - - return SandboxResult( - execution_id=execution_id, - status=SandboxStatus.COMPLETED, - exit_code=result.exit_code, - stdout=result.stdout, - stderr=result.stderr, - duration_seconds=duration, - started_at=started_at, - completed_at=completed_at, - metadata={ - "sandbox_type": "sandlock", - "landlock_enabled": True, - "seccomp_enabled": True, - }, - ) + def _run() -> Any: + # Context manager ensures the sandbox handle is released even + # if .run() raises partway through. + with self._sandlock.Sandbox(policy) as sb: + return sb.run(cmd, timeout=limits.timeout_seconds) + try: + result = await asyncio.get_running_loop().run_in_executor(None, _run) except Exception as e: completed_at = time.time() - duration = completed_at - started_at - if duration >= limits.timeout_seconds: - return SandboxResult( - execution_id=execution_id, - status=SandboxStatus.TIMEOUT, - error=f"Execution timed out after {limits.timeout_seconds}s", - duration_seconds=duration, - started_at=started_at, - completed_at=completed_at, - ) return SandboxResult( execution_id=execution_id, status=SandboxStatus.FAILED, error=f"Execution failed: {e}", - duration_seconds=duration, + duration_seconds=completed_at - started_at, started_at=started_at, completed_at=completed_at, ) + completed_at = time.time() + duration = completed_at - started_at + stdout = self._decode(result.stdout) + stderr = self._decode(result.stderr) + + # sandlock surfaces timeouts via result.error containing "timed out". + # This is authoritative — wall-clock guesses are unreliable because a + # process can legitimately finish just under the limit. + err_text = (result.error or "") if not result.success else "" + is_timeout = "timed out" in err_text.lower() or "timeout" in err_text.lower() + + if result.success: + status = SandboxStatus.COMPLETED + error = None + elif is_timeout: + status = SandboxStatus.TIMEOUT + error = f"Execution timed out after {limits.timeout_seconds}s" + else: + status = SandboxStatus.FAILED + error = f"Execution failed with exit code {result.exit_code}: {stderr or err_text}" + + return SandboxResult( + execution_id=execution_id, + status=status, + exit_code=result.exit_code, + stdout=stdout, + stderr=stderr, + duration_seconds=duration, + started_at=started_at, + completed_at=completed_at, + error=error, + metadata={ + "sandbox_type": "sandlock", + "landlock_enabled": True, + "seccomp_enabled": True, + }, + ) + async def execute( self, code: str, @@ -294,14 +334,7 @@ async def execute( """Execute code in the sandlock-isolated sandbox.""" if not self._is_running: await self.start() - - if not self.is_available: - # Fallback to subprocess sandbox if sandlock not available - logger.warning("Sandlock not available, falling back to subprocess") - from .subprocess import SubprocessSandbox - fallback = SubprocessSandbox(self.config) - return await fallback.execute(code, language, limits, env, working_dir) - + limits = limits or self.config.resource_limits execution_id = str(uuid.uuid4()) @@ -325,22 +358,39 @@ async def execute_file( limits: Optional[ResourceLimits] = None, env: Optional[Dict[str, str]] = None, ) -> SandboxResult: - """Execute a file in the sandbox.""" + """Execute a file in the sandbox. + + The script is passed to the interpreter by path rather than slurped + through ``-c``, so large scripts don't hit ARG_MAX. The file's + parent directory is added to the Landlock read allowlist for this + run so the sandboxed process can actually open it. + """ + if not self._is_running: + await self.start() + if not os.path.exists(file_path): return SandboxResult( status=SandboxStatus.FAILED, error=f"File not found: {file_path}", ) - - with open(file_path, "r") as f: - code = f.read() - - # Determine language from file extension - language = "python" - if file_path.endswith(".sh") or file_path.endswith(".bash"): - language = "bash" - - return await self.execute(code, language=language, limits=limits, env=env) + + limits = limits or self.config.resource_limits + execution_id = str(uuid.uuid4()) + + abs_path = os.path.realpath(file_path) + interp = "bash" if file_path.endswith((".sh", ".bash")) else "python3" + cmd: List[str] = [interp, abs_path] + if args: + cmd.extend(args) + + return await self._run_sandlocked( + cmd, + execution_id=execution_id, + limits=limits, + env=env, + working_dir=self._temp_dir, + extra_readable=[os.path.dirname(abs_path)], + ) async def run_command( self, @@ -352,13 +402,7 @@ async def run_command( """Run a shell command in the sandbox.""" if not self._is_running: await self.start() - - if not self.is_available: - logger.warning("Sandlock not available, falling back to subprocess") - from .subprocess import SubprocessSandbox - fallback = SubprocessSandbox(self.config) - return await fallback.run_command(command, limits, env, working_dir) - + limits = limits or self.config.resource_limits execution_id = str(uuid.uuid4()) @@ -462,4 +506,4 @@ async def cleanup(self) -> None: async def reset(self) -> None: """Reset sandbox to initial state.""" await self.stop() - await self.start() \ No newline at end of file + await self.start() diff --git a/src/praisonai/tests/unit/sandbox/test_sandlock_sandbox.py b/src/praisonai/tests/unit/sandbox/test_sandlock_sandbox.py index 206099ca2..87d2e4764 100644 --- a/src/praisonai/tests/unit/sandbox/test_sandlock_sandbox.py +++ b/src/praisonai/tests/unit/sandbox/test_sandlock_sandbox.py @@ -42,43 +42,25 @@ def test_import_without_sandlock(self): with pytest.raises(ImportError, match="sandlock package required"): SandlockSandbox() - def test_fallback_to_subprocess_when_unavailable(self): - """Test fallback to subprocess when sandlock is not available.""" - mock_sandlock = Mock() - mock_sandlock.landlock_abi_version.return_value = 0 # < 1, so unavailable - - sandbox = _make_sandbox(mock_sandlock) - assert not sandbox.is_available - assert sandbox.sandbox_type == "sandlock" + def test_raises_when_landlock_unavailable(self): + """Instantiation must fail loud on kernels without Landlock support. - @pytest.mark.asyncio - async def test_fallback_execution(self): - """Test that execution falls back to subprocess when sandlock unavailable.""" + Silent degradation to SubprocessSandbox would violate the caller's + explicit choice of kernel-level isolation — a SandlockSandbox that + isn't actually using Landlock is a security footgun. + """ mock_sandlock = Mock() - mock_sandlock.landlock_abi_version.return_value = 0 # < 1, so unavailable - - sandbox = _make_sandbox(mock_sandlock) - - mock_subprocess_instance = AsyncMock() - mock_subprocess_instance.execute.return_value = Mock( - status=SandboxStatus.COMPLETED, - exit_code=0, - stdout="Hello, World!", - stderr="", - ) + mock_sandlock.landlock_abi_version.return_value = 0 # unsupported - with patch("praisonai.sandbox.subprocess.SubprocessSandbox") as mock_subprocess: - mock_subprocess.return_value = mock_subprocess_instance - result = await sandbox.execute("print('Hello, World!')") - - mock_subprocess.assert_called_once() - mock_subprocess_instance.execute.assert_called_once() + with pytest.raises(RuntimeError, match="requires Landlock"): + _make_sandbox(mock_sandlock) def test_policy_creation_with_minimal_limits(self): """Test policy creation with minimal resource limits.""" mock_sandlock = Mock() mock_policy = Mock() mock_sandlock.Policy.return_value = mock_policy + mock_sandlock.landlock_abi_version.return_value = 6 # supported sandbox = _make_sandbox(mock_sandlock) @@ -90,10 +72,13 @@ def test_policy_creation_with_minimal_limits(self): assert "fs_readable" in call_kwargs assert "fs_writable" in call_kwargs - assert "max_memory" in call_kwargs assert call_kwargs["max_memory"] == "128M" # From minimal limits assert call_kwargs["max_processes"] == 5 - assert call_kwargs["net_allow_hosts"] == [] # Network disabled + assert call_kwargs["max_cpu"] == 50 # From minimal limits + # Network disabled → deny all hosts (empty allowlist). + assert call_kwargs["net_allow_hosts"] == [] + # net_connect must NOT be set (defaults to [] = deny all TCP). + assert "net_connect" not in call_kwargs def test_status_reporting(self): """Test sandbox status reporting.""" @@ -140,54 +125,54 @@ async def test_sandlock_execution_success(self): @pytest.mark.asyncio async def test_sandlock_execution_timeout(self): - """Test timeout handling in sandlock execution.""" + """Timeout is detected via result.error, not wall-clock heuristics.""" mock_sandlock = Mock() mock_sandlock.Policy.return_value = Mock() mock_sandlock.Sandbox.return_value = Mock() - mock_sandlock.landlock_abi_version.return_value = 6 # >= 6, so available + mock_sandlock.landlock_abi_version.return_value = 6 sandbox = _make_sandbox(mock_sandlock) - # Create a mock Result object indicating timeout mock_timeout_result = Mock() mock_timeout_result.success = False - mock_timeout_result.exit_code = 124 # Common timeout exit code - mock_timeout_result.stdout = "" - mock_timeout_result.stderr = "Process timed out" + mock_timeout_result.exit_code = 124 + mock_timeout_result.stdout = b"" + mock_timeout_result.stderr = b"" + # sandlock surfaces timeout via the error field. + mock_timeout_result.error = "process timed out after 10s" - # Simulate timeout by returning failed Result after enough time - with patch("asyncio.get_running_loop") as mock_loop, \ - patch("time.time", side_effect=[0, 11, 11]): # Started at 0, ended at 11s (> 10s timeout) + with patch("asyncio.get_running_loop") as mock_loop: mock_loop.return_value.run_in_executor = AsyncMock( return_value=mock_timeout_result ) await sandbox.start() - result = await sandbox.execute("import time; time.sleep(100)", limits=ResourceLimits(timeout_seconds=10)) + result = await sandbox.execute( + "import time; time.sleep(100)", + limits=ResourceLimits(timeout_seconds=10), + ) assert result.status == SandboxStatus.TIMEOUT assert "timed out" in result.error.lower() @pytest.mark.asyncio async def test_sandlock_execution_failure(self): - """Test general execution failure handling.""" + """Non-timeout failures keep the FAILED status and surface stderr.""" mock_sandlock = Mock() mock_sandlock.Policy.return_value = Mock() mock_sandlock.Sandbox.return_value = Mock() - mock_sandlock.landlock_abi_version.return_value = 6 # >= 6, so available + mock_sandlock.landlock_abi_version.return_value = 6 sandbox = _make_sandbox(mock_sandlock) - # Create a mock Result object indicating non-zero exit mock_failed_result = Mock() mock_failed_result.success = False - mock_failed_result.exit_code = 1 # Non-zero exit code - mock_failed_result.stdout = "" - mock_failed_result.stderr = "Permission denied" + mock_failed_result.exit_code = 1 + mock_failed_result.stdout = b"" + mock_failed_result.stderr = b"Permission denied" + mock_failed_result.error = None # not a timeout - # Simulate execution failure (not timeout) - with patch("asyncio.get_running_loop") as mock_loop, \ - patch("time.time", side_effect=[0, 2, 2]): # Started at 0, ended at 2s (< 10s timeout) + with patch("asyncio.get_running_loop") as mock_loop: mock_loop.return_value.run_in_executor = AsyncMock( return_value=mock_failed_result ) From 8b959b36d0a72c0070b66776c65c84528793c13f Mon Sep 17 00:00:00 2001 From: Cong Wang Date: Sun, 12 Apr 2026 17:22:52 -0700 Subject: [PATCH 2/2] fix(sandbox): use sandlock's exit_code == -1 sentinel for timeout MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Sandbox.run() does not populate result.error on timeout — only the Pipeline path does. String-matching on result.error was therefore unreliable for the common single-sandbox case: a real timeout from Sandbox.run() returns success=False, exit_code=-1, empty stderr, and error=None, which my previous logic mis-classified as FAILED. Switch to the structural signal: sandlock's ExitStatus::Timeout is exposed as exit_code == -1 (see sandlock's _sdk.py around line 1475). This matches how sandlock itself detects pipeline timeouts and works uniformly across Sandbox.run() and any future execution paths. Verified end-to-end with a real forced timeout against real sandlock: status: SandboxStatus.TIMEOUT exit: -1 error: Execution timed out after 1s Test updated to match: mock_timeout_result.exit_code = -1 and error = None (reflecting actual Sandbox.run() behavior). --- src/praisonai/praisonai/sandbox/sandlock.py | 14 ++++++-------- .../tests/unit/sandbox/test_sandlock_sandbox.py | 9 +++++---- 2 files changed, 11 insertions(+), 12 deletions(-) diff --git a/src/praisonai/praisonai/sandbox/sandlock.py b/src/praisonai/praisonai/sandbox/sandlock.py index 3af57be55..f14c43d43 100644 --- a/src/praisonai/praisonai/sandbox/sandlock.py +++ b/src/praisonai/praisonai/sandbox/sandlock.py @@ -290,21 +290,19 @@ def _run() -> Any: stdout = self._decode(result.stdout) stderr = self._decode(result.stderr) - # sandlock surfaces timeouts via result.error containing "timed out". - # This is authoritative — wall-clock guesses are unreliable because a - # process can legitimately finish just under the limit. - err_text = (result.error or "") if not result.success else "" - is_timeout = "timed out" in err_text.lower() or "timeout" in err_text.lower() - + # sandlock uses exit_code == -1 as the ExitStatus::Timeout sentinel + # (see sandlock's python/src/sandlock/_sdk.py). This is a + # structural signal — Sandbox.run() doesn't populate result.error + # for timeouts, so string-matching on it is unreliable. if result.success: status = SandboxStatus.COMPLETED error = None - elif is_timeout: + elif result.exit_code == -1: status = SandboxStatus.TIMEOUT error = f"Execution timed out after {limits.timeout_seconds}s" else: status = SandboxStatus.FAILED - error = f"Execution failed with exit code {result.exit_code}: {stderr or err_text}" + error = f"Execution failed with exit code {result.exit_code}: {stderr}" return SandboxResult( execution_id=execution_id, diff --git a/src/praisonai/tests/unit/sandbox/test_sandlock_sandbox.py b/src/praisonai/tests/unit/sandbox/test_sandlock_sandbox.py index 87d2e4764..1c2c7b64b 100644 --- a/src/praisonai/tests/unit/sandbox/test_sandlock_sandbox.py +++ b/src/praisonai/tests/unit/sandbox/test_sandlock_sandbox.py @@ -125,7 +125,7 @@ async def test_sandlock_execution_success(self): @pytest.mark.asyncio async def test_sandlock_execution_timeout(self): - """Timeout is detected via result.error, not wall-clock heuristics.""" + """Timeout is detected via exit_code == -1 (ExitStatus::Timeout).""" mock_sandlock = Mock() mock_sandlock.Policy.return_value = Mock() mock_sandlock.Sandbox.return_value = Mock() @@ -135,11 +135,12 @@ async def test_sandlock_execution_timeout(self): mock_timeout_result = Mock() mock_timeout_result.success = False - mock_timeout_result.exit_code = 124 + # sandlock's timeout sentinel — Sandbox.run() does not populate + # result.error on timeout, so we rely on the exit_code instead. + mock_timeout_result.exit_code = -1 mock_timeout_result.stdout = b"" mock_timeout_result.stderr = b"" - # sandlock surfaces timeout via the error field. - mock_timeout_result.error = "process timed out after 10s" + mock_timeout_result.error = None with patch("asyncio.get_running_loop") as mock_loop: mock_loop.return_value.run_in_executor = AsyncMock(