Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 68 additions & 0 deletions src/middleware/security.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,12 @@ def _should_skip_auth(self, request: Request, scope: dict) -> bool:
For trusted-network and disabled-auth bypasses we seed scope state
with anonymous markers so downstream code that reads
``request.state.api_key_hash`` / ``is_env_key`` does not raise.
We *also* attempt best-effort identity extraction (verified JWT
sub, then User-Id header) so file-ownership checks in the
orchestrator still work when the request is bypassed but the
caller did identify itself. Without this, LibreChat bash_tool
calls from a CIDR-trusted pod can't reach the user's uploaded
files because we have no user_id to match against.
"""
path = request.url.path
is_admin_path = path.startswith("/api/v1/admin") or path.startswith("/admin-dashboard")
Expand All @@ -203,12 +209,14 @@ def _should_skip_auth(self, request: Request, scope: dict) -> bool:
# Trusted-network bypass — only applies to user-facing paths.
if self._trusted_networks and self._is_trusted_network(request):
self._grant_anonymous_access(scope)
self._extract_best_effort_identity(request, scope)
return True

# Operator-controlled bypass for trusted-boundary deployments
# (e.g. mTLS sidecar, VPC ingress). Never applies to admin paths.
if not settings.auth_enabled:
self._grant_anonymous_access(scope)
self._extract_best_effort_identity(request, scope)
return True

return False
Expand All @@ -230,6 +238,66 @@ def _grant_anonymous_access(scope: dict) -> None:
scope_state.setdefault("is_env_key", False)
scope["state"] = scope_state

def _extract_best_effort_identity(self, request: Request, scope: dict) -> None:
"""Populate ``scope.state.user_id`` from any identity signal present.

Even when auth is bypassed (CIDR trust, AUTH_ENABLED=false) we
still want to know *who* is calling, so the orchestrator's
cross-user file-isolation checks can find the user's session.
Without an identity, every bypassed request looks like a brand-
new anonymous user and prior uploads become unreachable.

Sources, in order:
1. ``Authorization: Bearer <jwt>`` — verified if CODEAPI_JWT
is enabled. Failures here are non-fatal (the bypass already
allowed the request); we just log and continue without
user_id.
2. ``User-Id`` / ``X-User-Id`` header — unsigned, only trusted
because the bypass already trusted the network boundary
that delivered the request.
"""
scope_state = scope.get("state") or {}
if scope_state.get("user_id"):
return # already set by a prior helper

# JWT path
if settings.codeapi_jwt_enabled:
jwt_token = self._extract_bearer_jwt(request)
if jwt_token:
from ..services.codeapi_jwt import (
CodeApiJwtConfigurationError,
CodeApiJwtError,
verify,
)

try:
claims = verify(jwt_token)
scope_state["user_id"] = claims.sub
scope_state["auth_principal_source"] = "codeapi_jwt_bypassed"
if claims.tenant_id and settings.codeapi_jwt_trust_tenant_id:
scope_state["tenant_id"] = claims.tenant_id
scope["state"] = scope_state
return
except CodeApiJwtConfigurationError as exc:
# Operator told us JWT is on but didn't configure a key.
# In bypass mode we can't 500 — log loudly and fall back.
logger.error(
"CodeAPI JWT misconfigured (auth bypassed, identity unknown)",
error=str(exc),
)
except CodeApiJwtError as exc:
logger.info(
"CodeAPI JWT rejected during bypass (continuing anonymous)",
error=str(exc),
)

# Header path
header_user_id = request.headers.get("user-id") or request.headers.get("x-user-id")
if header_user_id:
scope_state["user_id"] = header_user_id
scope_state.setdefault("auth_principal_source", "header_bypassed")
scope["state"] = scope_state

@staticmethod
def _parse_trusted_networks(raw: str) -> list[ipaddress.IPv4Network | ipaddress.IPv6Network]:
"""Parse comma-separated CIDR strings into network objects."""
Expand Down
200 changes: 200 additions & 0 deletions tests/unit/test_security_middleware.py
Original file line number Diff line number Diff line change
Expand Up @@ -838,3 +838,203 @@ async def send(message):
start = sent[0]
assert start["type"] == "http.response.start"
assert start["status"] == 401


class TestBypassWithIdentityExtraction:
"""When auth is bypassed (CIDR trust / AUTH_ENABLED=false), we still
populate scope.state.user_id from any identity signal present.

Without this, LibreChat bash_tool calls from a CIDR-trusted pod reach
a "fresh" execution session every time because the orchestrator's
cross-user file-isolation needs a user_id to match against the upload
session. The bypass keeps the user OUT of the auth queue; the identity
extraction keeps the user IN the session-ownership graph.
"""

@pytest.mark.asyncio
async def test_trusted_network_bypass_extracts_jwt_sub(self, security_middleware):
"""CIDR-trusted call carrying a valid JWT: bypass auth-key check
BUT still extract sub for ownership."""
from unittest.mock import patch as _patch

from src.services.codeapi_jwt import JwtClaims

request = MagicMock()
request.url.path = "/exec"
request.method = "POST"
request.client.host = "10.0.0.5"
request.headers.get = lambda name, default=None: {
"authorization": "Bearer aaaa.bbbb.cccc",
"user-id": None,
"x-user-id": None,
}.get(name, default)
scope: dict = {}

import ipaddress

security_middleware._trusted_networks = [ipaddress.ip_network("10.0.0.0/8")]

with (
_patch("src.middleware.security.settings") as mock_settings,
_patch(
"src.services.codeapi_jwt.verify",
return_value=JwtClaims(
sub="user-from-jwt",
tenant_id=None,
role=None,
principal_source=None,
jti=None,
),
),
):
mock_settings.auth_enabled = True
mock_settings.codeapi_jwt_enabled = True
mock_settings.codeapi_jwt_trust_tenant_id = False
mock_settings.max_file_size_mb = 10

assert security_middleware._should_skip_auth(request, scope) is True

assert scope["state"]["user_id"] == "user-from-jwt"
assert scope["state"]["auth_principal_source"] == "codeapi_jwt_bypassed"
# Anonymous markers still seeded so metrics don't crash.
assert scope["state"]["api_key_hash"] == "anonymous"

@pytest.mark.asyncio
async def test_trusted_network_bypass_falls_back_to_user_id_header(self, security_middleware):
"""No JWT, but LibreChat /upload sent User-Id: extract that for ownership."""
from unittest.mock import patch as _patch

request = MagicMock()
request.url.path = "/upload"
request.method = "POST"
request.client.host = "10.0.0.5"
request.headers.get = lambda name, default=None: {
"authorization": None,
"user-id": "lc-user-42",
"x-user-id": None,
}.get(name, default)
scope: dict = {}

import ipaddress

security_middleware._trusted_networks = [ipaddress.ip_network("10.0.0.0/8")]

with _patch("src.middleware.security.settings") as mock_settings:
mock_settings.auth_enabled = True
mock_settings.codeapi_jwt_enabled = False
mock_settings.max_file_size_mb = 10

assert security_middleware._should_skip_auth(request, scope) is True

assert scope["state"]["user_id"] == "lc-user-42"
assert scope["state"]["auth_principal_source"] == "header_bypassed"

@pytest.mark.asyncio
async def test_auth_disabled_bypass_extracts_jwt_sub(self, security_middleware):
"""AUTH_ENABLED=false + JWT enabled + valid JWT → bypass + identity."""
from unittest.mock import patch as _patch

from src.services.codeapi_jwt import JwtClaims

request = MagicMock()
request.url.path = "/exec"
request.method = "POST"
request.headers.get = lambda name, default=None: {
"authorization": "Bearer aaaa.bbbb.cccc",
"user-id": None,
"x-user-id": None,
}.get(name, default)
scope: dict = {}

with (
_patch("src.middleware.security.settings") as mock_settings,
_patch(
"src.services.codeapi_jwt.verify",
return_value=JwtClaims(
sub="user-from-jwt",
tenant_id=None,
role=None,
principal_source=None,
jti=None,
),
),
):
mock_settings.auth_enabled = False
mock_settings.codeapi_jwt_enabled = True
mock_settings.codeapi_jwt_trust_tenant_id = False
mock_settings.auth_trusted_networks = ""
mock_settings.max_file_size_mb = 10

assert security_middleware._should_skip_auth(request, scope) is True

assert scope["state"]["user_id"] == "user-from-jwt"

@pytest.mark.asyncio
async def test_bypass_with_invalid_jwt_falls_back_to_anonymous(self, security_middleware):
"""Bypassed request with an INVALID JWT: don't 401 (bypass already
allowed it), don't set user_id, log and continue."""
from unittest.mock import patch as _patch

from src.services.codeapi_jwt import CodeApiJwtError

request = MagicMock()
request.url.path = "/exec"
request.method = "POST"
request.client.host = "10.0.0.5"
request.headers.get = lambda name, default=None: {
"authorization": "Bearer aaaa.bbbb.cccc",
"user-id": None,
"x-user-id": None,
}.get(name, default)
scope: dict = {}

import ipaddress

security_middleware._trusted_networks = [ipaddress.ip_network("10.0.0.0/8")]

with (
_patch("src.middleware.security.settings") as mock_settings,
_patch("src.services.codeapi_jwt.verify", side_effect=CodeApiJwtError("expired")),
):
mock_settings.auth_enabled = True
mock_settings.codeapi_jwt_enabled = True
mock_settings.codeapi_jwt_trust_tenant_id = False
mock_settings.max_file_size_mb = 10

# Bypass still returns True; we don't 401 on a bypassed request.
assert security_middleware._should_skip_auth(request, scope) is True

# No user_id extracted, but anonymous state still seeded.
assert "user_id" not in scope["state"]
assert scope["state"]["api_key_hash"] == "anonymous"

@pytest.mark.asyncio
async def test_bypass_with_no_identity_signals_stays_anonymous(self, security_middleware):
"""No JWT, no User-Id header: bypass leaves user_id unset.
Orchestrator will then treat the request as truly anonymous (no
session reuse, no file mounting). Documents the behavior."""
from unittest.mock import patch as _patch

request = MagicMock()
request.url.path = "/exec"
request.method = "POST"
request.client.host = "10.0.0.5"
request.headers.get = lambda name, default=None: {
"authorization": None,
"user-id": None,
"x-user-id": None,
}.get(name, default)
scope: dict = {}

import ipaddress

security_middleware._trusted_networks = [ipaddress.ip_network("10.0.0.0/8")]

with _patch("src.middleware.security.settings") as mock_settings:
mock_settings.auth_enabled = True
mock_settings.codeapi_jwt_enabled = False
mock_settings.max_file_size_mb = 10

assert security_middleware._should_skip_auth(request, scope) is True

assert "user_id" not in scope["state"]
Loading