Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
268 changes: 155 additions & 113 deletions src/praisonai/praisonai/sandbox/sandlock.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,21 @@ def __init__(
"sandlock package required for SandlockSandbox. "
"Install with: pip install 'praisonai[sandbox]' or pip install sandlock"
)

# Fail loud if Landlock isn't supported on this kernel.
try:
abi = self._sandlock.landlock_abi_version()
except Exception as e:
raise RuntimeError(
f"failed to query Landlock ABI version: {e}"
) from e
if abi < 1:
raise RuntimeError(
"SandlockSandbox requires Landlock support (Linux kernel "
">= 6.12 with CONFIG_SECURITY_LANDLOCK=y). This kernel "
f"reports Landlock ABI version {abi}. Use SubprocessSandbox "
"explicitly if weaker isolation is acceptable."
)
Comment on lines +86 to +91
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Incorrect minimum kernel version in error message

The error message states Linux kernel >= 6.12, but Landlock ABI version 1 was introduced in Linux 5.13 (not 6.12). The PR description itself mentions "running on a kernel <5.13" in the breaking-change section, so the discrepancy is likely a typo. Users on kernels between 5.13 and 6.12 who encounter this error will be told their kernel is too old when it is actually sufficient.

Suggested change
raise RuntimeError(
"SandlockSandbox requires Landlock support (Linux kernel "
">= 6.12 with CONFIG_SECURITY_LANDLOCK=y). This kernel "
f"reports Landlock ABI version {abi}. Use SubprocessSandbox "
"explicitly if weaker isolation is acceptable."
)
if abi < 1:
raise RuntimeError(
"SandlockSandbox requires Landlock support (Linux kernel "
">= 5.13 with CONFIG_SECURITY_LANDLOCK=y). This kernel "
f"reports Landlock ABI version {abi}. Use SubprocessSandbox "
"explicitly if weaker isolation is acceptable."
)

Comment on lines +85 to +91
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Incorrect kernel version in error message.

The error message states Landlock requires "Linux kernel >= 6.12", but Landlock ABI v1 was introduced in Linux kernel 5.13 (released 2021). Kernel 6.12 is not accurate and may mislead users into thinking they need a much newer kernel than actually required.

📝 Suggested fix
         if abi < 1:
             raise RuntimeError(
                 "SandlockSandbox requires Landlock support (Linux kernel "
-                ">= 6.12 with CONFIG_SECURITY_LANDLOCK=y).  This kernel "
+                ">= 5.13 with CONFIG_SECURITY_LANDLOCK=y).  This kernel "
                 f"reports Landlock ABI version {abi}.  Use SubprocessSandbox "
                 "explicitly if weaker isolation is acceptable."
             )
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if abi < 1:
raise RuntimeError(
"SandlockSandbox requires Landlock support (Linux kernel "
">= 6.12 with CONFIG_SECURITY_LANDLOCK=y). This kernel "
f"reports Landlock ABI version {abi}. Use SubprocessSandbox "
"explicitly if weaker isolation is acceptable."
)
if abi < 1:
raise RuntimeError(
"SandlockSandbox requires Landlock support (Linux kernel "
">= 5.13 with CONFIG_SECURITY_LANDLOCK=y). This kernel "
f"reports Landlock ABI version {abi}. Use SubprocessSandbox "
"explicitly if weaker isolation is acceptable."
)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/praisonai/praisonai/sandbox/sandlock.py` around lines 85 - 91, The
RuntimeError message raised in SandlockSandbox when abi < 1 contains an
incorrect kernel version; update the text in the raise RuntimeError (in
sandlock.py around the SandlockSandbox check using the variable abi) to state
the correct minimum kernel that introduced Landlock ABI v1 (Linux kernel >=
5.13) while preserving the rest of the guidance (CONFIG_SECURITY_LANDLOCK=y and
suggestion to use SubprocessSandbox).


@property
def is_available(self) -> bool:
Expand Down Expand Up @@ -112,58 +127,102 @@ def _create_policy(
self,
limits: ResourceLimits,
working_dir: Optional[str] = None,
extra_readable: Optional[List[str]] = None,
) -> Any:
"""Create sandlock policy from resource limits.

Args:
limits: Resource limits configuration
working_dir: Working directory for execution

Returns:
Sandlock Policy object
working_dir: Working directory for execution (added to the
writable allowlist).
extra_readable: Additional directories to add to the Landlock
read allowlist (e.g. the parent of a script passed to
``execute_file``).
"""
Policy = self._sandlock.Policy

# Determine allowed paths
allowed_read_paths = [

# Landlock requires every path in the allowlist to exist at rule-
# attach time; passing a missing directory makes sandlock_spawn
# fail outright. Filter to paths that actually exist on this host.
_candidate_read_paths = [
"/usr/lib/python3",
"/usr/local/lib/python3",
"/lib",
"/usr/lib",
"/bin",
"/usr/bin",
]

allowed_read_paths = [p for p in _candidate_read_paths if os.path.isdir(p)]
if extra_readable:
allowed_read_paths.extend(
p for p in extra_readable if os.path.isdir(p)
)
Comment on lines +155 to +159
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The current implementation of _create_policy has two issues regarding the read allowlist:

  1. It only allows directories in extra_readable due to the os.path.isdir check. This prevents allowlisting individual files, which is necessary for the execute_file method to work securely.
  2. The sandbox's temporary directory (self._temp_dir) and the working_dir are added to the writable allowlist but not the readable one. In Landlock, write access does not implicitly grant read access. The sandboxed process will likely fail to read its own working directory or files written to the sandbox unless they are explicitly added to fs_readable.
Suggested change
allowed_read_paths = [p for p in _candidate_read_paths if os.path.isdir(p)]
if extra_readable:
allowed_read_paths.extend(
p for p in extra_readable if os.path.isdir(p)
)
allowed_read_paths = [p for p in _candidate_read_paths if os.path.isdir(p)]
if extra_readable:
allowed_read_paths.extend(p for p in extra_readable if os.path.exists(p))
for p in [self._temp_dir, working_dir]:
if p and os.path.isdir(p) and p not in allowed_read_paths:
allowed_read_paths.append(p)


allowed_write_paths = []
if working_dir:
allowed_write_paths.append(working_dir)
if self._temp_dir:
allowed_write_paths.append(self._temp_dir)

# Add any configured allowed paths from security policy
if hasattr(self.config, 'security_policy') and self.config.security_policy:
allowed_write_paths.extend(self.config.security_policy.allowed_paths)

# Create policy with kernel-level restrictions
# Network policy.
#
# sandlock uses tri-state semantics for net_allow_hosts:
# None -> unrestricted (real /etc/hosts visible)
# [] -> deny all hosts (empty virtual /etc/hosts)
# ["host", ...] -> allowlist
#
# TCP connectivity is governed separately by net_connect/net_bind,
# both of which default to [] = deny all. To enable network we must
# explicitly open TCP ports; to block network we can rely on the
# defaults AND additionally block DNS via net_allow_hosts=[].
if limits.network_enabled:
net_kwargs: Dict[str, Any] = {
# Allow outbound TCP to any port; leave net_allow_hosts at
# its default (None = /etc/hosts unrestricted).
"net_connect": ["0-65535"],
}
else:
net_kwargs = {
# Empty allowlist -> no host resolvable via virtual /etc/hosts.
# net_bind/net_connect default to [] = deny all TCP.
"net_allow_hosts": [],
}

policy = Policy(
# Filesystem restrictions (Landlock)
fs_readable=allowed_read_paths,
fs_writable=allowed_write_paths,

# Network restrictions
net_allow_hosts=[] if not limits.network_enabled else None,


# Resource limits
max_memory=f"{limits.memory_mb}M",
max_processes=limits.max_processes,
max_open_files=limits.max_open_files,

# Note: CPU throttle percentage, not time limit
# Execution timeout is handled via Sandbox.run(timeout=...)
# max_cpu is a throttle percentage of one core, not a time budget.
# Execution timeout is handled via Sandbox.run(timeout=...).
max_cpu=limits.cpu_percent,

**net_kwargs,
)

return policy

@staticmethod
def _decode(buf: Any) -> str:
"""Decode a sandlock Result stdout/stderr buffer to str.

sandlock returns ``bytes`` from ``Sandbox.run()``; PraisonAI's
``SandboxResult`` uses ``str`` throughout. Invalid UTF-8 is
replaced rather than raised so downstream consumers never see
binary artefacts.
"""
if isinstance(buf, bytes):
return buf.decode("utf-8", errors="replace")
return buf or ""

def _safe_sandbox_path(self, path: str) -> Optional[str]:
"""Resolve a caller-supplied path to an absolute path inside _temp_dir.

Expand All @@ -190,6 +249,7 @@ async def _run_sandlocked(
limits: ResourceLimits,
env: Optional[Dict[str, str]],
working_dir: Optional[str],
extra_readable: Optional[List[str]] = None,
) -> SandboxResult:
"""Execute *cmd* inside a sandlock Sandbox and return a SandboxResult.

Expand All @@ -202,87 +262,65 @@ async def _run_sandlocked(
prior to sandbox creation. ``working_dir`` is applied via the policy
(added to the writable-path allow-list).
"""
policy = self._create_policy(limits, working_dir)
policy = self._create_policy(limits, working_dir, extra_readable)

started_at = time.time()

try:
sandbox = self._sandlock.Sandbox(policy)

result = await asyncio.get_running_loop().run_in_executor(
None,
lambda: sandbox.run(cmd, timeout=limits.timeout_seconds)
)

completed_at = time.time()
duration = completed_at - started_at

# Check result.success and exit_code to determine actual status
if not result.success:
# Check if this was a timeout based on duration
if duration >= limits.timeout_seconds:
return SandboxResult(
execution_id=execution_id,
status=SandboxStatus.TIMEOUT,
error=f"Execution timed out after {limits.timeout_seconds}s",
exit_code=result.exit_code,
stdout=result.stdout,
stderr=result.stderr,
duration_seconds=duration,
started_at=started_at,
completed_at=completed_at,
)
else:
# Non-zero exit or other failure
return SandboxResult(
execution_id=execution_id,
status=SandboxStatus.FAILED,
error=f"Execution failed with exit code {result.exit_code}: {result.stderr}",
exit_code=result.exit_code,
stdout=result.stdout,
stderr=result.stderr,
duration_seconds=duration,
started_at=started_at,
completed_at=completed_at,
)

return SandboxResult(
execution_id=execution_id,
status=SandboxStatus.COMPLETED,
exit_code=result.exit_code,
stdout=result.stdout,
stderr=result.stderr,
duration_seconds=duration,
started_at=started_at,
completed_at=completed_at,
metadata={
"sandbox_type": "sandlock",
"landlock_enabled": True,
"seccomp_enabled": True,
},
)
def _run() -> Any:
# Context manager ensures the sandbox handle is released even
# if .run() raises partway through.
with self._sandlock.Sandbox(policy) as sb:
return sb.run(cmd, timeout=limits.timeout_seconds)

try:
result = await asyncio.get_running_loop().run_in_executor(None, _run)
except Exception as e:
completed_at = time.time()
duration = completed_at - started_at
if duration >= limits.timeout_seconds:
return SandboxResult(
execution_id=execution_id,
status=SandboxStatus.TIMEOUT,
error=f"Execution timed out after {limits.timeout_seconds}s",
duration_seconds=duration,
started_at=started_at,
completed_at=completed_at,
)
return SandboxResult(
execution_id=execution_id,
status=SandboxStatus.FAILED,
error=f"Execution failed: {e}",
duration_seconds=duration,
duration_seconds=completed_at - started_at,
started_at=started_at,
completed_at=completed_at,
)

completed_at = time.time()
duration = completed_at - started_at
stdout = self._decode(result.stdout)
stderr = self._decode(result.stderr)

# sandlock uses exit_code == -1 as the ExitStatus::Timeout sentinel
# (see sandlock's python/src/sandlock/_sdk.py). This is a
# structural signal — Sandbox.run() doesn't populate result.error
# for timeouts, so string-matching on it is unreliable.
if result.success:
status = SandboxStatus.COMPLETED
error = None
elif result.exit_code == -1:
status = SandboxStatus.TIMEOUT
error = f"Execution timed out after {limits.timeout_seconds}s"
else:
status = SandboxStatus.FAILED
error = f"Execution failed with exit code {result.exit_code}: {stderr}"

return SandboxResult(
execution_id=execution_id,
status=status,
exit_code=result.exit_code,
stdout=stdout,
stderr=stderr,
duration_seconds=duration,
started_at=started_at,
completed_at=completed_at,
error=error,
metadata={
"sandbox_type": "sandlock",
"landlock_enabled": True,
"seccomp_enabled": True,
},
)

async def execute(
self,
code: str,
Expand All @@ -294,14 +332,7 @@ async def execute(
"""Execute code in the sandlock-isolated sandbox."""
if not self._is_running:
await self.start()

if not self.is_available:
# Fallback to subprocess sandbox if sandlock not available
logger.warning("Sandlock not available, falling back to subprocess")
from .subprocess import SubprocessSandbox
fallback = SubprocessSandbox(self.config)
return await fallback.execute(code, language, limits, env, working_dir)


limits = limits or self.config.resource_limits
execution_id = str(uuid.uuid4())

Expand All @@ -325,22 +356,39 @@ async def execute_file(
limits: Optional[ResourceLimits] = None,
env: Optional[Dict[str, str]] = None,
) -> SandboxResult:
"""Execute a file in the sandbox."""
"""Execute a file in the sandbox.

The script is passed to the interpreter by path rather than slurped
through ``-c``, so large scripts don't hit ARG_MAX. The file's
parent directory is added to the Landlock read allowlist for this
run so the sandboxed process can actually open it.
"""
if not self._is_running:
await self.start()

if not os.path.exists(file_path):
return SandboxResult(
status=SandboxStatus.FAILED,
error=f"File not found: {file_path}",
)

with open(file_path, "r") as f:
code = f.read()

# Determine language from file extension
language = "python"
if file_path.endswith(".sh") or file_path.endswith(".bash"):
language = "bash"

return await self.execute(code, language=language, limits=limits, env=env)

limits = limits or self.config.resource_limits
execution_id = str(uuid.uuid4())

abs_path = os.path.realpath(file_path)
interp = "bash" if file_path.endswith((".sh", ".bash")) else "python3"
cmd: List[str] = [interp, abs_path]
if args:
cmd.extend(args)

return await self._run_sandlocked(
cmd,
execution_id=execution_id,
limits=limits,
env=env,
working_dir=self._temp_dir,
extra_readable=[os.path.dirname(abs_path)],
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

security-high high

Allowlisting the entire parent directory of the script is overly permissive and could expose sensitive files on the host if the script is located outside the sandbox's temporary directory. It is safer to only allowlist the script file itself. Note that this requires updating _create_policy to support files in extra_readable (by using os.path.exists instead of os.path.isdir).

Suggested change
extra_readable=[os.path.dirname(abs_path)],
extra_readable=[abs_path],

)
Comment on lines +378 to +391
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Consider edge case when script is at filesystem root.

If abs_path resolves to a file directly under / (e.g., /script.py), then os.path.dirname(abs_path) returns "/", which would add the entire root filesystem to the read allowlist. While this is an unlikely edge case, it could weaken isolation.

🛡️ Suggested guard
         abs_path = os.path.realpath(file_path)
+        script_dir = os.path.dirname(abs_path)
+        extra_read = [script_dir] if script_dir and script_dir != "/" else []
         interp = "bash" if file_path.endswith((".sh", ".bash")) else "python3"
         cmd: List[str] = [interp, abs_path]
         if args:
             cmd.extend(args)
 
         return await self._run_sandlocked(
             cmd,
             execution_id=execution_id,
             limits=limits,
             env=env,
             working_dir=self._temp_dir,
-            extra_readable=[os.path.dirname(abs_path)],
+            extra_readable=extra_read,
         )
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
abs_path = os.path.realpath(file_path)
interp = "bash" if file_path.endswith((".sh", ".bash")) else "python3"
cmd: List[str] = [interp, abs_path]
if args:
cmd.extend(args)
return await self._run_sandlocked(
cmd,
execution_id=execution_id,
limits=limits,
env=env,
working_dir=self._temp_dir,
extra_readable=[os.path.dirname(abs_path)],
)
abs_path = os.path.realpath(file_path)
script_dir = os.path.dirname(abs_path)
extra_read = [script_dir] if script_dir and script_dir != "/" else []
interp = "bash" if file_path.endswith((".sh", ".bash")) else "python3"
cmd: List[str] = [interp, abs_path]
if args:
cmd.extend(args)
return await self._run_sandlocked(
cmd,
execution_id=execution_id,
limits=limits,
env=env,
working_dir=self._temp_dir,
extra_readable=extra_read,
)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/praisonai/praisonai/sandbox/sandlock.py` around lines 378 - 391, The code
unconditionally adds os.path.dirname(abs_path) to extra_readable which can be
"/" for files at filesystem root and would expose the entire root; change the
logic before calling _run_sandlocked to detect this edge case (abs_path from
os.path.realpath) and if os.path.dirname(abs_path) == os.path.sep, pass the file
path itself (abs_path) or a more restrictive path list instead of "/" as the
extra_readable entry; update the call site that constructs extra_readable (in
the method that builds cmd and calls self._run_sandlocked) to use this
conditional selection so only the file (or a non-root directory) is added.


async def run_command(
self,
Expand All @@ -352,13 +400,7 @@ async def run_command(
"""Run a shell command in the sandbox."""
if not self._is_running:
await self.start()

if not self.is_available:
logger.warning("Sandlock not available, falling back to subprocess")
from .subprocess import SubprocessSandbox
fallback = SubprocessSandbox(self.config)
return await fallback.run_command(command, limits, env, working_dir)


limits = limits or self.config.resource_limits
execution_id = str(uuid.uuid4())

Expand Down Expand Up @@ -462,4 +504,4 @@ async def cleanup(self) -> None:
async def reset(self) -> None:
"""Reset sandbox to initial state."""
await self.stop()
await self.start()
await self.start()
Loading
Loading