diff --git a/src/middleware/security.py b/src/middleware/security.py index 29db3d7..515cffa 100644 --- a/src/middleware/security.py +++ b/src/middleware/security.py @@ -188,6 +188,12 @@ def _should_skip_auth(self, request: Request, scope: dict) -> bool: For trusted-network and disabled-auth bypasses we seed scope state with anonymous markers so downstream code that reads ``request.state.api_key_hash`` / ``is_env_key`` does not raise. + We *also* attempt best-effort identity extraction (verified JWT + sub, then User-Id header) so file-ownership checks in the + orchestrator still work when the request is bypassed but the + caller did identify itself. Without this, LibreChat bash_tool + calls from a CIDR-trusted pod can't reach the user's uploaded + files because we have no user_id to match against. """ path = request.url.path is_admin_path = path.startswith("/api/v1/admin") or path.startswith("/admin-dashboard") @@ -203,12 +209,14 @@ def _should_skip_auth(self, request: Request, scope: dict) -> bool: # Trusted-network bypass — only applies to user-facing paths. if self._trusted_networks and self._is_trusted_network(request): self._grant_anonymous_access(scope) + self._extract_best_effort_identity(request, scope) return True # Operator-controlled bypass for trusted-boundary deployments # (e.g. mTLS sidecar, VPC ingress). Never applies to admin paths. if not settings.auth_enabled: self._grant_anonymous_access(scope) + self._extract_best_effort_identity(request, scope) return True return False @@ -230,6 +238,66 @@ def _grant_anonymous_access(scope: dict) -> None: scope_state.setdefault("is_env_key", False) scope["state"] = scope_state + def _extract_best_effort_identity(self, request: Request, scope: dict) -> None: + """Populate ``scope.state.user_id`` from any identity signal present. + + Even when auth is bypassed (CIDR trust, AUTH_ENABLED=false) we + still want to know *who* is calling, so the orchestrator's + cross-user file-isolation checks can find the user's session. + Without an identity, every bypassed request looks like a brand- + new anonymous user and prior uploads become unreachable. + + Sources, in order: + 1. ``Authorization: Bearer `` — verified if CODEAPI_JWT + is enabled. Failures here are non-fatal (the bypass already + allowed the request); we just log and continue without + user_id. + 2. ``User-Id`` / ``X-User-Id`` header — unsigned, only trusted + because the bypass already trusted the network boundary + that delivered the request. + """ + scope_state = scope.get("state") or {} + if scope_state.get("user_id"): + return # already set by a prior helper + + # JWT path + if settings.codeapi_jwt_enabled: + jwt_token = self._extract_bearer_jwt(request) + if jwt_token: + from ..services.codeapi_jwt import ( + CodeApiJwtConfigurationError, + CodeApiJwtError, + verify, + ) + + try: + claims = verify(jwt_token) + scope_state["user_id"] = claims.sub + scope_state["auth_principal_source"] = "codeapi_jwt_bypassed" + if claims.tenant_id and settings.codeapi_jwt_trust_tenant_id: + scope_state["tenant_id"] = claims.tenant_id + scope["state"] = scope_state + return + except CodeApiJwtConfigurationError as exc: + # Operator told us JWT is on but didn't configure a key. + # In bypass mode we can't 500 — log loudly and fall back. + logger.error( + "CodeAPI JWT misconfigured (auth bypassed, identity unknown)", + error=str(exc), + ) + except CodeApiJwtError as exc: + logger.info( + "CodeAPI JWT rejected during bypass (continuing anonymous)", + error=str(exc), + ) + + # Header path + header_user_id = request.headers.get("user-id") or request.headers.get("x-user-id") + if header_user_id: + scope_state["user_id"] = header_user_id + scope_state.setdefault("auth_principal_source", "header_bypassed") + scope["state"] = scope_state + @staticmethod def _parse_trusted_networks(raw: str) -> list[ipaddress.IPv4Network | ipaddress.IPv6Network]: """Parse comma-separated CIDR strings into network objects.""" diff --git a/tests/unit/test_security_middleware.py b/tests/unit/test_security_middleware.py index 259bf42..e89dece 100644 --- a/tests/unit/test_security_middleware.py +++ b/tests/unit/test_security_middleware.py @@ -838,3 +838,203 @@ async def send(message): start = sent[0] assert start["type"] == "http.response.start" assert start["status"] == 401 + + +class TestBypassWithIdentityExtraction: + """When auth is bypassed (CIDR trust / AUTH_ENABLED=false), we still + populate scope.state.user_id from any identity signal present. + + Without this, LibreChat bash_tool calls from a CIDR-trusted pod reach + a "fresh" execution session every time because the orchestrator's + cross-user file-isolation needs a user_id to match against the upload + session. The bypass keeps the user OUT of the auth queue; the identity + extraction keeps the user IN the session-ownership graph. + """ + + @pytest.mark.asyncio + async def test_trusted_network_bypass_extracts_jwt_sub(self, security_middleware): + """CIDR-trusted call carrying a valid JWT: bypass auth-key check + BUT still extract sub for ownership.""" + from unittest.mock import patch as _patch + + from src.services.codeapi_jwt import JwtClaims + + request = MagicMock() + request.url.path = "/exec" + request.method = "POST" + request.client.host = "10.0.0.5" + request.headers.get = lambda name, default=None: { + "authorization": "Bearer aaaa.bbbb.cccc", + "user-id": None, + "x-user-id": None, + }.get(name, default) + scope: dict = {} + + import ipaddress + + security_middleware._trusted_networks = [ipaddress.ip_network("10.0.0.0/8")] + + with ( + _patch("src.middleware.security.settings") as mock_settings, + _patch( + "src.services.codeapi_jwt.verify", + return_value=JwtClaims( + sub="user-from-jwt", + tenant_id=None, + role=None, + principal_source=None, + jti=None, + ), + ), + ): + mock_settings.auth_enabled = True + mock_settings.codeapi_jwt_enabled = True + mock_settings.codeapi_jwt_trust_tenant_id = False + mock_settings.max_file_size_mb = 10 + + assert security_middleware._should_skip_auth(request, scope) is True + + assert scope["state"]["user_id"] == "user-from-jwt" + assert scope["state"]["auth_principal_source"] == "codeapi_jwt_bypassed" + # Anonymous markers still seeded so metrics don't crash. + assert scope["state"]["api_key_hash"] == "anonymous" + + @pytest.mark.asyncio + async def test_trusted_network_bypass_falls_back_to_user_id_header(self, security_middleware): + """No JWT, but LibreChat /upload sent User-Id: extract that for ownership.""" + from unittest.mock import patch as _patch + + request = MagicMock() + request.url.path = "/upload" + request.method = "POST" + request.client.host = "10.0.0.5" + request.headers.get = lambda name, default=None: { + "authorization": None, + "user-id": "lc-user-42", + "x-user-id": None, + }.get(name, default) + scope: dict = {} + + import ipaddress + + security_middleware._trusted_networks = [ipaddress.ip_network("10.0.0.0/8")] + + with _patch("src.middleware.security.settings") as mock_settings: + mock_settings.auth_enabled = True + mock_settings.codeapi_jwt_enabled = False + mock_settings.max_file_size_mb = 10 + + assert security_middleware._should_skip_auth(request, scope) is True + + assert scope["state"]["user_id"] == "lc-user-42" + assert scope["state"]["auth_principal_source"] == "header_bypassed" + + @pytest.mark.asyncio + async def test_auth_disabled_bypass_extracts_jwt_sub(self, security_middleware): + """AUTH_ENABLED=false + JWT enabled + valid JWT → bypass + identity.""" + from unittest.mock import patch as _patch + + from src.services.codeapi_jwt import JwtClaims + + request = MagicMock() + request.url.path = "/exec" + request.method = "POST" + request.headers.get = lambda name, default=None: { + "authorization": "Bearer aaaa.bbbb.cccc", + "user-id": None, + "x-user-id": None, + }.get(name, default) + scope: dict = {} + + with ( + _patch("src.middleware.security.settings") as mock_settings, + _patch( + "src.services.codeapi_jwt.verify", + return_value=JwtClaims( + sub="user-from-jwt", + tenant_id=None, + role=None, + principal_source=None, + jti=None, + ), + ), + ): + mock_settings.auth_enabled = False + mock_settings.codeapi_jwt_enabled = True + mock_settings.codeapi_jwt_trust_tenant_id = False + mock_settings.auth_trusted_networks = "" + mock_settings.max_file_size_mb = 10 + + assert security_middleware._should_skip_auth(request, scope) is True + + assert scope["state"]["user_id"] == "user-from-jwt" + + @pytest.mark.asyncio + async def test_bypass_with_invalid_jwt_falls_back_to_anonymous(self, security_middleware): + """Bypassed request with an INVALID JWT: don't 401 (bypass already + allowed it), don't set user_id, log and continue.""" + from unittest.mock import patch as _patch + + from src.services.codeapi_jwt import CodeApiJwtError + + request = MagicMock() + request.url.path = "/exec" + request.method = "POST" + request.client.host = "10.0.0.5" + request.headers.get = lambda name, default=None: { + "authorization": "Bearer aaaa.bbbb.cccc", + "user-id": None, + "x-user-id": None, + }.get(name, default) + scope: dict = {} + + import ipaddress + + security_middleware._trusted_networks = [ipaddress.ip_network("10.0.0.0/8")] + + with ( + _patch("src.middleware.security.settings") as mock_settings, + _patch("src.services.codeapi_jwt.verify", side_effect=CodeApiJwtError("expired")), + ): + mock_settings.auth_enabled = True + mock_settings.codeapi_jwt_enabled = True + mock_settings.codeapi_jwt_trust_tenant_id = False + mock_settings.max_file_size_mb = 10 + + # Bypass still returns True; we don't 401 on a bypassed request. + assert security_middleware._should_skip_auth(request, scope) is True + + # No user_id extracted, but anonymous state still seeded. + assert "user_id" not in scope["state"] + assert scope["state"]["api_key_hash"] == "anonymous" + + @pytest.mark.asyncio + async def test_bypass_with_no_identity_signals_stays_anonymous(self, security_middleware): + """No JWT, no User-Id header: bypass leaves user_id unset. + Orchestrator will then treat the request as truly anonymous (no + session reuse, no file mounting). Documents the behavior.""" + from unittest.mock import patch as _patch + + request = MagicMock() + request.url.path = "/exec" + request.method = "POST" + request.client.host = "10.0.0.5" + request.headers.get = lambda name, default=None: { + "authorization": None, + "user-id": None, + "x-user-id": None, + }.get(name, default) + scope: dict = {} + + import ipaddress + + security_middleware._trusted_networks = [ipaddress.ip_network("10.0.0.0/8")] + + with _patch("src.middleware.security.settings") as mock_settings: + mock_settings.auth_enabled = True + mock_settings.codeapi_jwt_enabled = False + mock_settings.max_file_size_mb = 10 + + assert security_middleware._should_skip_auth(request, scope) is True + + assert "user_id" not in scope["state"]