From f0a174a4e6578b5855c7d033ce0db47d89a4a506 Mon Sep 17 00:00:00 2001 From: Snehil Kishore Date: Wed, 15 Oct 2025 10:14:14 +0530 Subject: [PATCH 1/9] feat: Add DPoP Support --- .gitignore | 1 + README.md | 131 +++++++++++++++++++++- fastapi_plugin/fast_api_client.py | 62 +++++++--- fastapi_plugin/test_utils.py | 180 +++++++++++++++++++++++++++++- fastapi_plugin/utils.py | 108 +++++++++++++++++- tests/test_fast_api_client.py | 102 +++++++++++++++-- 6 files changed, 547 insertions(+), 37 deletions(-) diff --git a/.gitignore b/.gitignore index fe90143..4825a42 100644 --- a/.gitignore +++ b/.gitignore @@ -22,6 +22,7 @@ server.py setup.py test.py test-script.py +integration_test*.py .coverage coverage.xml diff --git a/README.md b/README.md index 64078c1..b2b7a78 100644 --- a/README.md +++ b/README.md @@ -42,9 +42,78 @@ auth0 = Auth0FastAPI( ) ``` +#### DPoP Configuration (Optional) + +The SDK supports DPoP (Demonstrating Proof-of-Possession) for enhanced security: + +```python +# Mixed mode - accepts both Bearer and DPoP (recommended for migration) +auth0 = Auth0FastAPI( + domain="", + audience="", + dpop_enabled=True, # Enable DPoP support + dpop_required=False # Allow Bearer tokens too +) + +# DPoP-only mode - rejects Bearer tokens +auth0 = Auth0FastAPI( + domain="", + audience="", + dpop_enabled=True, + dpop_required=True # Only accept DPoP tokens +) + +# Custom DPoP timing configuration +auth0 = Auth0FastAPI( + domain="", + audience="", + dpop_enabled=True, + dpop_iat_leeway=30, # Clock skew tolerance (seconds) + dpop_iat_offset=300 # Maximum DPoP proof age (seconds) +) +``` + +#### Reverse Proxy Configuration + +When deploying behind a reverse proxy (nginx, AWS ALB, etc.), you **must** enable proxy trust for DPoP validation to work correctly: + +```python +from fastapi import FastAPI +from fastapi_plugin import Auth0FastAPI + +app = FastAPI() + +# CRITICAL: Enable proxy trust when behind a reverse proxy +app.state.trust_proxy = True + +auth0 = Auth0FastAPI( + domain="", + audience="", + dpop_enabled=True +) +``` + +**Why this matters:** +- DPoP validation requires matching the exact URL the client used +- Behind a proxy, your app sees internal URLs (e.g., `http://localhost:8000/api`) +- The client's DPoP proof contains the public URL (e.g., `https://api.example.com/api`) +- Without `trust_proxy=True`, validation will fail + +**Note:** Only enable `trust_proxy=True` when your app is actually behind a trusted reverse proxy. Never enable this for direct internet-facing deployments, as it would allow header injection attacks. + +**Nginx Configuration Example:** +```nginx +location /api { + proxy_pass http://backend:8000; + proxy_set_header X-Forwarded-Proto $scheme; + proxy_set_header X-Forwarded-Host $host; + proxy_set_header X-Forwarded-Prefix /api; +} +``` + ### 3. Protecting API Routes -To protect a FastAPI route, use the `require_auth(...)` dependency. Any incoming requests must include a valid Bearer token (JWT) in the `Authorization` header, or they will receive an error response (e.g., 401 Unauthorized). +To protect a FastAPI route, use the `require_auth(...)` dependency. The SDK automatically detects and validates both Bearer and DPoP authentication schemes. Any incoming requests must include a valid token in the `Authorization` header, or they will receive an error response (e.g., 401 Unauthorized). ```python @app.get("/protected-api") @@ -58,9 +127,18 @@ async def protected( #### How It Works +**Bearer Authentication:** 1. The user sends a request with `Authorization: Bearer `. -2. The SDK parses the token, checks signature via Auth0’s JWKS, validates standard claims like `iss`, `aud`, `exp`, etc. -3. If valid, the route receives the decoded claims as a Python dict (e.g. `{"sub": "user123", "scope": "read:stuff", ...}`). +2. The SDK parses the token, checks signature via Auth0's JWKS, validates standard claims like `iss`, `aud`, `exp`, etc. +3. If valid, the route receives the decoded claims as a Python dict. + +**DPoP Authentication:** +1. The user sends a request with `Authorization: DPoP ` and `DPoP: ` headers. +2. The SDK validates both the access token and the DPoP proof, including cryptographic binding. +3. DPoP provides enhanced security through proof-of-possession of private keys. +4. If valid, the route receives the decoded claims as a Python dict. + +The SDK automatically detects which authentication scheme is being used and validates accordingly. > [!IMPORTANT] > This method protects API endpoints using bearer tokens. It does not **create or manage user sessions in server-side rendering scenarios**. For session-based usage, consider a separate library or approach. @@ -134,7 +212,52 @@ def test_public_route(): -### 5. Get an access token for a connection +### 5. DPoP Authentication + +> **Note**: DPoP is currently in [Early Access](https://auth0.com/docs/troubleshoot/product-lifecycle/product-release-stages). Contact Auth0 support to enable it for your tenant. + +DPoP (Demonstrating Proof-of-Possession) provides enhanced security by binding access tokens to cryptographic proof of possession. + +#### Client Requirements + +To use DPoP authentication, clients must: + +1. **Generate an ES256 key pair** for DPoP proof signing +2. **Include two headers** in requests: + - `Authorization: DPoP ` - The DPoP-bound access token + - `DPoP: ` - The DPoP proof JWT + +#### Example DPoP Request + +```http +GET /protected-api HTTP/1.1 +Host: api.example.com +Authorization: DPoP eyJ0eXAiOiJKV1Q... +DPoP: eyJ0eXAiOiJkcG9wK2p3dC... +``` + +#### Migration Strategy + +Use mixed mode for gradual migration: + +```python +# Start with mixed mode to support both Bearer and DPoP +auth0 = Auth0FastAPI( + domain="", + audience="", + dpop_enabled=True, + dpop_required=False # Allows both Bearer and DPoP +) + +# Later, enforce DPoP-only +auth0 = Auth0FastAPI( + domain="", + audience="", + dpop_required=True # Rejects Bearer tokens +) +``` + +### 6. Get an access token for a connection If you need to get an access token for an upstream idp via a connection, you can use the `get_access_token_for_connection` method on the underlying api_client: diff --git a/fastapi_plugin/fast_api_client.py b/fastapi_plugin/fast_api_client.py index 3da89eb..69bb458 100644 --- a/fastapi_plugin/fast_api_client.py +++ b/fastapi_plugin/fast_api_client.py @@ -2,9 +2,9 @@ from fastapi import Request, HTTPException from starlette.responses import Response -from .utils import get_bearer_token, validate_scopes, http_exception +from .utils import validate_scopes, http_exception, get_canonical_url -from auth0_api_python.api_client import ApiClient, ApiClientOptions, VerifyAccessTokenError +from auth0_api_python.api_client import ApiClient, ApiClientOptions, BaseAuthError class Auth0FastAPI: @@ -13,11 +13,25 @@ class Auth0FastAPI: mirroring the concept from TypeScript's Fastify plugin. """ - def __init__(self, domain: str, audience: str, client_id=None, client_secret=None, custom_fetch=None): + def __init__( + self, + domain: str, + audience: str, + client_id=None, + client_secret=None, + custom_fetch=None, + dpop_enabled=True, + dpop_required=False, + dpop_iat_leeway=30, + dpop_iat_offset=300): """ domain: Your Auth0 domain (like 'my-tenant.us.auth0.com') audience: API identifier from the Auth0 Dashboard custom_fetch: optional HTTP fetch override for the underlying SDK + dpop_enabled: Enable DPoP support (default: True) + dpop_required: Require DPoP authentication, reject Bearer tokens (default: False) + dpop_iat_leeway: Clock skew tolerance for DPoP proof iat claim in seconds (default: 30) + dpop_iat_offset: Maximum DPoP proof age in seconds (default: 300) """ if not domain: raise ValueError("domain is required.") @@ -25,7 +39,17 @@ def __init__(self, domain: str, audience: str, client_id=None, client_secret=Non raise ValueError("audience is required.") self.api_client = ApiClient( - ApiClientOptions(domain=domain, audience=audience, client_id=client_id, client_secret=client_secret, custom_fetch=custom_fetch) + ApiClientOptions( + domain=domain, + audience=audience, + client_id=client_id, + client_secret=client_secret, + custom_fetch=custom_fetch, + dpop_enabled=dpop_enabled, + dpop_required=dpop_required, + dpop_iat_leeway=dpop_iat_leeway, + dpop_iat_offset=dpop_iat_offset + ) ) def require_auth( @@ -34,27 +58,31 @@ def require_auth( ): """ Returns an async FastAPI dependency that: - 1) Extracts a 'Bearer' token from the Authorization header - 2) Verifies it with auth0-api-python + 1) Uses the unified verify_request() method to auto-detect Bearer or DPoP authentication + 2) Verifies the request with auth0-api-python including full HTTP context 3) If 'scopes' is provided, checks for them in the token's 'scope' claim 4) Raises HTTPException on error 5) On success, returns the decoded claims """ async def _dependency(request: Request) -> Dict: - token = get_bearer_token(request) - if not token: - # No Authorization provided + try: + claims = await self.api_client.verify_request( + headers=dict(request.headers), + http_method=request.method, + http_url=get_canonical_url(request) + ) + except BaseAuthError as e: raise http_exception( - 400, - "invalid_request", - "No Authorization provided" + status_code=e.get_status_code(), + error=e.get_error_code(), + error_desc=e.get_error_description(), + headers=e.get_headers() ) - try: - claims = await self.api_client.verify_access_token(access_token=token) - except VerifyAccessTokenError as e: + except Exception as e: + # Handle any unexpected errors raise http_exception( - status_code=401, - error="invalid_token", + status_code=400, + error="invalid_request", error_desc=str(e) ) diff --git a/fastapi_plugin/test_utils.py b/fastapi_plugin/test_utils.py index 01199de..1fe7232 100644 --- a/fastapi_plugin/test_utils.py +++ b/fastapi_plugin/test_utils.py @@ -1,10 +1,12 @@ import time +import hashlib +import base64 +import secrets from typing import Optional, Dict, Any, Union from authlib.jose import JsonWebKey, jwt -# A private RSA JWK for test usage. - +# A private RSA JWK for test usage (Bearer tokens). PRIVATE_JWK = { "kty": "RSA", "alg": "RS256", @@ -20,6 +22,29 @@ "qi": "iYltkV_4PmQDfZfGFpzn2UtYEKyhy-9t3Vy8Mw2VHLAADKGwJvVK5ficQAr2atIF1-agXY2bd6KV-w52zR8rmZfTr0gobzYIyqHczOm13t7uXJv2WygY7QEC2OGjdxa2Fr9RnvS99ozMa5nomZBqTqT7z5QV33czjPRCjvg6FcE", } +# A private EC P-256 JWK for DPoP test usage. +PRIVATE_DPOP_JWK = { + "kty": "EC", + "alg": "ES256", + "use": "sig", + "kid": "DPOP_TEST_KEY", + "crv": "P-256", + "x": "WKn-ZIGevcwGIyyrzFoZNBdaq9_TsqzGHwHitJBcBmXdA_x4ySJOjY_1WykDKVf_", + "y": "Hkwp7nOHFcFancWLb-AmIYhZaUO_6-DoV0oNNXLgr-M", + "d": "Hndv7ZZjs_ke8o9zXYo3iq-Yr8SewI5vrqd0pAvqPqg" +} + +# Public counterpart for JWKS +PUBLIC_DPOP_JWK = { + "kty": "EC", + "alg": "ES256", + "use": "sig", + "kid": "DPOP_TEST_KEY", + "crv": "P-256", + "x": "WKn-ZIGevcwGIyyrzFoZNBdaq9_TsqzGHwHitJBcBmXdA_x4ySJOjY_1WykDKVf_", + "y": "Hkwp7nOHFcFancWLb-AmIYhZaUO_6-DoV0oNNXLgr-M" +} + async def generate_token( domain: str, @@ -30,6 +55,7 @@ async def generate_token( exp: bool = True, claims: Optional[Dict[str, Any]] = None, expiration_time: int = 3600, + token_type: str = "bearer" ) -> str: """ Generates a real RS256-signed JWT using the private key above. @@ -46,6 +72,7 @@ async def generate_token( exp: Whether to set the 'exp' claim. If False, skip it. claims: Additional custom claims to merge into the token. expiration_time: If exp is True, how many seconds from now until expiration. + token_type: "bearer" for regular tokens, "dpop" for DPoP-bound tokens Returns: A RS256-signed JWT string. @@ -65,10 +92,157 @@ async def generate_token( if audience: token_claims["aud"] = audience - + + # Add DPoP binding for DPoP tokens + if token_type == "dpop": + jkt = calculate_jwk_thumbprint(PRIVATE_DPOP_JWK) + token_claims["cnf"] = {"jkt": jkt} key = JsonWebKey.import_key(PRIVATE_JWK) + header = {"alg": "RS256", "kid": PRIVATE_JWK["kid"]} + token = jwt.encode(header, token_claims, key) + return token + +def base64url_encode(data: bytes) -> str: + """Base64URL encode without padding.""" + return base64.urlsafe_b64encode(data).decode('ascii').rstrip('=') +def sha256_hash(data: str) -> str: + """SHA256 hash and base64url encode.""" + return base64url_encode(hashlib.sha256(data.encode('utf-8')).digest()) + +def generate_jti() -> str: + """Generate a random JTI (JWT ID) for DPoP proof.""" + return base64url_encode(secrets.token_bytes(16)) + +def calculate_jwk_thumbprint(jwk_dict: Dict[str, Any]) -> str: + """Calculate JWK thumbprint for DPoP proof.""" + # For EC P-256 keys, thumbprint is calculated from crv, kty, x, y + thumbprint_jwk = { + "crv": jwk_dict["crv"], + "kty": jwk_dict["kty"], + "x": jwk_dict["x"], + "y": jwk_dict["y"] + } + # Sort keys and create JSON string + import json + canonical_json = json.dumps(thumbprint_jwk, sort_keys=True, separators=(',', ':')) + return base64url_encode(hashlib.sha256(canonical_json.encode('utf-8')).digest()) + +async def generate_dpop_proof( + http_method: str, + http_url: str, + access_token: Optional[str] = None, + nonce: Optional[str] = None, + iat_offset: int = 0 +) -> str: + """ + Generate a DPoP proof JWT for testing. + + Args: + http_method: HTTP method (GET, POST, etc.) + http_url: Full HTTP URL + access_token: Access token to bind (for ath claim) + nonce: Server nonce for DPoP proof + iat_offset: Offset for iat claim (for testing expired proofs) + + Returns: + DPoP proof JWT string + """ + current_time = int(time.time()) + iat_offset + jti = generate_jti() + + # Calculate JWK thumbprint for jkt claim + jkt = calculate_jwk_thumbprint(PRIVATE_DPOP_JWK) + + # DPoP proof claims + proof_claims = { + "jti": jti, + "htm": http_method.upper(), + "htu": http_url, + "iat": current_time, + "jkt": jkt + } + + # Add access token hash if provided + if access_token: + proof_claims["ath"] = sha256_hash(access_token) + + # Add nonce if provided + if nonce: + proof_claims["nonce"] = nonce + + # Create header with public key + header = { + "alg": "ES256", + "typ": "dpop+jwt", + "jwk": { + "kty": PUBLIC_DPOP_JWK["kty"], + "crv": PUBLIC_DPOP_JWK["crv"], + "x": PUBLIC_DPOP_JWK["x"], + "y": PUBLIC_DPOP_JWK["y"] + } + } + + # Sign with private key + key = JsonWebKey.import_key(PRIVATE_DPOP_JWK) + proof_jwt = jwt.encode(header, proof_claims, key) + return proof_jwt + +async def generate_dpop_bound_token( + domain: str, + user_id: str, + audience: Optional[str] = None, + issuer: Union[str, bool, None] = None, + iat: bool = True, + exp: bool = True, + claims: Optional[Dict[str, Any]] = None, + expiration_time: int = 3600, + cnf_jkt: Optional[str] = None +) -> str: + """ + Generate a DPoP-bound access token for testing. + Similar to generate_token but includes cnf claim with jkt. + + Args: + domain: Auth0 domain + user_id: Subject claim + audience: Audience claim + issuer: Issuer claim + iat: Include iat claim + exp: Include exp claim + claims: Additional claims + expiration_time: Token expiration in seconds + cnf_jkt: JWK thumbprint for confirmation claim (auto-calculated if None) + + Returns: + DPoP-bound access token JWT string + """ + token_claims = dict(claims or {}) + token_claims.setdefault("sub", user_id) + + if iat: + token_claims["iat"] = int(time.time()) + + if exp: + token_claims["exp"] = int(time.time()) + expiration_time + + if issuer is not False: + token_claims["iss"] = issuer if isinstance(issuer, str) else f"https://{domain}/" + + if audience: + token_claims["aud"] = audience + + # Add DPoP binding + if cnf_jkt is None: + cnf_jkt = calculate_jwk_thumbprint(PRIVATE_DPOP_JWK) + + token_claims["cnf"] = { + "jkt": cnf_jkt + } + + # Sign with RS256 (access tokens are still RS256, only DPoP proofs are ES256) + key = JsonWebKey.import_key(PRIVATE_JWK) header = {"alg": "RS256", "kid": PRIVATE_JWK["kid"]} token = jwt.encode(header, token_claims, key) return token diff --git a/fastapi_plugin/utils.py b/fastapi_plugin/utils.py index dda119a..4c858d5 100644 --- a/fastapi_plugin/utils.py +++ b/fastapi_plugin/utils.py @@ -1,6 +1,7 @@ from typing import Optional, List, Union, Dict from fastapi import Request, HTTPException from starlette.responses import Response +from urllib.parse import urlparse, urlunparse @@ -8,6 +9,9 @@ def get_bearer_token(request: Request) -> Optional[str]: """ Parse 'Authorization: Bearer ' from the incoming request. Returns the token string or None if missing/invalid. + + DEPRECATED: This function is no longer used in the main auth flow. + The ApiClient.verify_request() method handles token extraction automatically. """ auth_header = request.headers.get("authorization") if not auth_header: @@ -18,14 +22,26 @@ def get_bearer_token(request: Request) -> Optional[str]: return parts[1] return None + +# ===== ACTIVE UTILITY FUNCTIONS ===== + def http_exception( status_code: int, error: str, - error_desc: str + error_desc: str, + headers: Optional[Dict[str, str]] = None ) -> HTTPException: """ - Construct an HTTPException with a 'WWW-Authenticate' header - mimicking the style from the TypeScript example. + Construct an HTTPException with appropriate headers. + + Args: + status_code: HTTP status code + error: OAuth2/DPoP error code + error_desc: Human-readable error description + headers: Optional headers dict (e.g., from BaseAuthError.get_headers()) + + Note: When used with BaseAuthError, pass the headers from get_headers() + to ensure correct WWW-Authenticate challenges are included. """ return HTTPException( status_code=status_code, @@ -33,11 +49,91 @@ def http_exception( "error": error, "error_description": error_desc }, - headers={ - "WWW-Authenticate": f'Bearer error="{error}", error_description="{error_desc}"' - } + headers=headers or {} ) +def _should_trust_proxy(request: Request) -> bool: + """ + Determines if X-Forwarded-* headers should be trusted. + + Returns: + bool: True if proxy headers should be trusted + """ + # Check if the app has explicitly enabled proxy trust + try: + return getattr(request.app.state, 'trust_proxy', False) + except AttributeError: + # If app.state doesn't exist or trust_proxy isn't set, don't trust + return False + +def _parse_forwarded_host(forwarded_host: Optional[str]) -> Optional[str]: + """ + Parses X-Forwarded-Host header, handling multiple comma-separated values. + + Args: + forwarded_host: Value of X-Forwarded-Host header + + Returns: + The first host value, or None if empty + """ + if not forwarded_host: + return None + + # Handle comma-separated values (multiple proxies) + comma_index = forwarded_host.find(',') + if comma_index != -1: + forwarded_host = forwarded_host[:comma_index].rstrip() + + return forwarded_host.strip() or None + +def get_canonical_url(request: Request) -> str: + """ + Constructs the canonical URL for DPoP validation, securely handling reverse proxy headers. + + Args: + request: FastAPI/Starlette Request object + + Returns: + Canonical URL string matching what the client used + + """ + parsed = urlparse(str(request.url)) + + # Default to direct request values + scheme = parsed.scheme + netloc = parsed.netloc + path = parsed.path + + # Only process X-Forwarded headers if proxy is trusted + if _should_trust_proxy(request): + # X-Forwarded-Proto: Override scheme if present + forwarded_proto = request.headers.get("x-forwarded-proto") + if forwarded_proto: + scheme = forwarded_proto.strip().lower() + + # X-Forwarded-Host: Override host, handling multiple proxies + forwarded_host = request.headers.get("x-forwarded-host") + parsed_host = _parse_forwarded_host(forwarded_host) + if parsed_host: + netloc = parsed_host + + # X-Forwarded-Prefix: Prepend path prefix + forwarded_prefix = request.headers.get("x-forwarded-prefix", "").strip() + if forwarded_prefix: + # Remove trailing slash from prefix to avoid double slashes + path = forwarded_prefix.rstrip("/") + path + + canonical_url = urlunparse(( + scheme, + netloc, + path, + parsed.params, + parsed.query, + "" # No fragment in DPoP htu claim + )) + + return canonical_url + def validate_scopes(claims: Dict, required_scopes: List[str]) -> bool: """ Verifies the 'scope' claim (a space-delimited string) includes all required_scopes. diff --git a/tests/test_fast_api_client.py b/tests/test_fast_api_client.py index 13800e9..46bf19c 100644 --- a/tests/test_fast_api_client.py +++ b/tests/test_fast_api_client.py @@ -4,7 +4,7 @@ from fastapi.testclient import TestClient from fastapi_plugin.fast_api_client import Auth0FastAPI -from fastapi_plugin.test_utils import generate_token +from fastapi_plugin.test_utils import generate_token, generate_dpop_proof, generate_dpop_bound_token, PUBLIC_DPOP_JWK @pytest.mark.asyncio @@ -57,7 +57,8 @@ async def test_should_return_200_when_valid_token(httpx_mock: HTTPXMock): "e": "AQAB", "alg": "RS256", "use": "sig" - } + }, + PUBLIC_DPOP_JWK ] } ) @@ -115,7 +116,8 @@ async def test_should_return_401_when_no_iss(httpx_mock: HTTPXMock): "e": "AQAB", "alg": "RS256", "use": "sig" - } + }, + PUBLIC_DPOP_JWK ] } ) @@ -175,7 +177,8 @@ async def test_should_return_401_when_invalid_iss(httpx_mock: HTTPXMock): "e": "AQAB", "alg": "RS256", "use": "sig" - } + }, + PUBLIC_DPOP_JWK ] } ) @@ -229,7 +232,8 @@ async def test_should_return_401_when_no_exp(httpx_mock: HTTPXMock): "e": "AQAB", "alg": "RS256", "use": "sig" - } + }, + PUBLIC_DPOP_JWK ] } ) @@ -281,7 +285,8 @@ async def test_should_return_403_when_invalid_scope(httpx_mock: HTTPXMock): "e": "AQAB", "alg": "RS256", "use": "sig" - } + }, + PUBLIC_DPOP_JWK ] } ) @@ -325,4 +330,87 @@ def test_auth0fastapi_accepts_client_id_and_secret(): ) options = auth0.api_client.options assert options.client_id == client_id - assert options.client_secret == client_secret \ No newline at end of file + assert options.client_secret == client_secret + +def test_auth0fastapi_accepts_dpop_configuration(): + """Test that Auth0FastAPI accepts DPoP configuration parameters.""" + auth0 = Auth0FastAPI( + domain="auth0.local", + audience="test_audience", + dpop_enabled=True, + dpop_required=True, + dpop_iat_leeway=60, + dpop_iat_offset=600 + ) + options = auth0.api_client.options + assert options.dpop_enabled == True + assert options.dpop_required == True + assert options.dpop_iat_leeway == 60 + assert options.dpop_iat_offset == 600 + +@pytest.mark.asyncio +async def test_should_return_200_with_dpop_authentication(httpx_mock: HTTPXMock): + """ + Test successful DPoP authentication with proper DPoP proof and bound access token. + """ + # Mock OIDC discovery and JWKS with both RSA and EC keys + httpx_mock.add_response( + method="GET", + url="https://auth0.local/.well-known/openid-configuration", + json={ + "issuer": "https://auth0.local/", + "jwks_uri": "https://auth0.local/.well-known/jwks.json" + } + ) + httpx_mock.add_response( + method="GET", + url="https://auth0.local/.well-known/jwks.json", + json={ + "keys": [ + { + "kty": "RSA", + "kid": "TEST_KEY", + "n": "whYOFK2Ocbbpb_zVypi9SeKiNUqKQH0zTKN1-6fpCTu6ZalGI82s7XK3tan4dJt90ptUPKD2zvxqTzFNfx4HHHsrYCf2-FMLn1VTJfQazA2BvJqAwcpW1bqRUEty8tS_Yv4hRvWfQPcc2Gc3-_fQOOW57zVy-rNoJc744kb30NjQxdGp03J2S3GLQu7oKtSDDPooQHD38PEMNnITf0pj-KgDPjymkMGoJlO3aKppsjfbt_AH6GGdRghYRLOUwQU-h-ofWHR3lbYiKtXPn5dN24kiHy61e3VAQ9_YAZlwXC_99GGtw_NpghFAuM4P1JDn0DppJldy3PGFC0GfBCZASw", + "e": "AQAB", + "alg": "RS256", + "use": "sig" + }, + PUBLIC_DPOP_JWK + ] + } + ) + + # Generate DPoP-bound access token + access_token = await generate_dpop_bound_token( + domain="auth0.local", + user_id="user_123", + audience="", + issuer="https://auth0.local/" + ) + + # Generate DPoP proof for the request + dpop_proof = await generate_dpop_proof( + http_method="GET", + http_url="http://testserver/test", + access_token=access_token + ) + + app = FastAPI() + auth0 = Auth0FastAPI(domain="auth0.local", audience="", dpop_enabled=True) + + @app.get("/test") + async def test_route(claims=Depends(auth0.require_auth())): + return "OK" + + client = TestClient(app) + + response = client.get( + "/test", + headers={ + "Authorization": f"DPoP {access_token}", + "DPoP": dpop_proof + } + ) + + assert response.status_code == 200 + assert response.text == '"OK"' \ No newline at end of file From 54fe8edbde02a588d2b6f5c9a4b4898b56437787 Mon Sep 17 00:00:00 2001 From: Snehil Kishore Date: Wed, 15 Oct 2025 16:35:56 +0530 Subject: [PATCH 2/9] tests: add authentication tests --- fastapi_plugin/test_utils.py | 28 +- tests/test_configuration_modes.py | 339 +++++++++++++++++++++++ tests/test_dpop_authentication.py | 341 +++++++++++++++++++++++ tests/test_fast_api_client.py | 1 - tests/test_reverse_proxy.py | 432 ++++++++++++++++++++++++++++++ 5 files changed, 1127 insertions(+), 14 deletions(-) create mode 100644 tests/test_configuration_modes.py create mode 100644 tests/test_dpop_authentication.py create mode 100644 tests/test_reverse_proxy.py diff --git a/fastapi_plugin/test_utils.py b/fastapi_plugin/test_utils.py index 1fe7232..a996bea 100644 --- a/fastapi_plugin/test_utils.py +++ b/fastapi_plugin/test_utils.py @@ -25,13 +25,10 @@ # A private EC P-256 JWK for DPoP test usage. PRIVATE_DPOP_JWK = { "kty": "EC", - "alg": "ES256", - "use": "sig", - "kid": "DPOP_TEST_KEY", "crv": "P-256", - "x": "WKn-ZIGevcwGIyyrzFoZNBdaq9_TsqzGHwHitJBcBmXdA_x4ySJOjY_1WykDKVf_", - "y": "Hkwp7nOHFcFancWLb-AmIYhZaUO_6-DoV0oNNXLgr-M", - "d": "Hndv7ZZjs_ke8o9zXYo3iq-Yr8SewI5vrqd0pAvqPqg" + "x": "MKBCTNIcKUSDii11ySs3526iDZ8AiTo7Tu6KPAqv7D4", + "y": "4Etl6SRW2YiLUrN5vfvVHuhp7x8PxltmWWlbbM4IFyM", + "d": "870MB6gfuTJ4HtUnUvYMyJpr5eUZNP4Bk43bVdj3eAE" } # Public counterpart for JWKS @@ -41,8 +38,8 @@ "use": "sig", "kid": "DPOP_TEST_KEY", "crv": "P-256", - "x": "WKn-ZIGevcwGIyyrzFoZNBdaq9_TsqzGHwHitJBcBmXdA_x4ySJOjY_1WykDKVf_", - "y": "Hkwp7nOHFcFancWLb-AmIYhZaUO_6-DoV0oNNXLgr-M" + "x": "MKBCTNIcKUSDii11ySs3526iDZ8AiTo7Tu6KPAqv7D4", + "y": "4Etl6SRW2YiLUrN5vfvVHuhp7x8PxltmWWlbbM4IFyM" } @@ -101,15 +98,18 @@ async def generate_token( key = JsonWebKey.import_key(PRIVATE_JWK) header = {"alg": "RS256", "kid": PRIVATE_JWK["kid"]} token = jwt.encode(header, token_claims, key) - return token + # Ensure we return a string, not bytes + return token.decode('utf-8') if isinstance(token, bytes) else token def base64url_encode(data: bytes) -> str: """Base64URL encode without padding.""" return base64.urlsafe_b64encode(data).decode('ascii').rstrip('=') -def sha256_hash(data: str) -> str: +def sha256_hash(data: Union[str, bytes]) -> str: """SHA256 hash and base64url encode.""" - return base64url_encode(hashlib.sha256(data.encode('utf-8')).digest()) + if isinstance(data, str): + data = data.encode('utf-8') + return base64url_encode(hashlib.sha256(data).digest()) def generate_jti() -> str: """Generate a random JTI (JWT ID) for DPoP proof.""" @@ -187,7 +187,8 @@ async def generate_dpop_proof( # Sign with private key key = JsonWebKey.import_key(PRIVATE_DPOP_JWK) proof_jwt = jwt.encode(header, proof_claims, key) - return proof_jwt + # Ensure we return a string, not bytes + return proof_jwt.decode('utf-8') if isinstance(proof_jwt, bytes) else proof_jwt async def generate_dpop_bound_token( domain: str, @@ -245,4 +246,5 @@ async def generate_dpop_bound_token( key = JsonWebKey.import_key(PRIVATE_JWK) header = {"alg": "RS256", "kid": PRIVATE_JWK["kid"]} token = jwt.encode(header, token_claims, key) - return token + # Ensure we return a string, not bytes + return token.decode('utf-8') if isinstance(token, bytes) else token diff --git a/tests/test_configuration_modes.py b/tests/test_configuration_modes.py new file mode 100644 index 0000000..7171caa --- /dev/null +++ b/tests/test_configuration_modes.py @@ -0,0 +1,339 @@ +""" +Tests for DPoP configuration modes (enabled/required/disabled). +Tests the different operational modes of the Auth0FastAPI middleware. +""" +import pytest +from fastapi import FastAPI, Depends +from pytest_httpx import HTTPXMock +from fastapi.testclient import TestClient + +from fastapi_plugin.fast_api_client import Auth0FastAPI +from fastapi_plugin.test_utils import ( + generate_token, + generate_dpop_proof, + generate_dpop_bound_token, + PUBLIC_DPOP_JWK +) + + +def setup_mocks(httpx_mock: HTTPXMock): + """Setup common OIDC and JWKS mocks.""" + httpx_mock.add_response( + method="GET", + url="https://auth0.local/.well-known/openid-configuration", + json={ + "issuer": "https://auth0.local/", + "jwks_uri": "https://auth0.local/.well-known/jwks.json" + } + ) + httpx_mock.add_response( + method="GET", + url="https://auth0.local/.well-known/jwks.json", + json={ + "keys": [ + { + "kty": "RSA", + "kid": "TEST_KEY", + "n": "whYOFK2Ocbbpb_zVypi9SeKiNUqKQH0zTKN1-6fpCTu6ZalGI82s7XK3tan4dJt90ptUPKD2zvxqTzFNfx4HHHsrYCf2-FMLn1VTJfQazA2BvJqAwcpW1bqRUEty8tS_Yv4hRvWfQPcc2Gc3-_fQOOW57zVy-rNoJc744kb30NjQxdGp03J2S3GLQu7oKtSDDPooQHD38PEMNnITf0pj-KgDPjymkMGoJlO3aKppsjfbt_AH6GGdRghYRLOUwQU-h-ofWHR3lbYiKtXPn5dN24kiHy61e3VAQ9_YAZlwXC_99GGtw_NpghFAuM4P1JDn0DppJldy3PGFC0GfBCZASw", + "e": "AQAB", + "alg": "RS256", + "use": "sig" + }, + PUBLIC_DPOP_JWK + ] + } + ) + + +def test_dpop_configuration_defaults(): + """Test that DPoP configuration has correct defaults.""" + auth0 = Auth0FastAPI(domain="auth0.local", audience="test") + assert auth0.api_client.options.dpop_enabled == True + assert auth0.api_client.options.dpop_required == False + assert auth0.api_client.options.dpop_iat_leeway == 30 + assert auth0.api_client.options.dpop_iat_offset == 300 + + +def test_dpop_disabled_configuration(): + """Test DPoP can be explicitly disabled.""" + auth0 = Auth0FastAPI( + domain="auth0.local", + audience="test", + dpop_enabled=False + ) + assert auth0.api_client.options.dpop_enabled == False + + +def test_dpop_custom_timing_configuration(): + """Test custom DPoP timing parameters.""" + auth0 = Auth0FastAPI( + domain="auth0.local", + audience="test", + dpop_iat_leeway=60, + dpop_iat_offset=600 + ) + assert auth0.api_client.options.dpop_iat_leeway == 60 + assert auth0.api_client.options.dpop_iat_offset == 600 + + +@pytest.mark.asyncio +async def test_dpop_required_mode_rejects_bearer(): + """Test that Bearer tokens are rejected when dpop_required=True.""" + # No need to mock JWKS - request should fail before token validation + bearer_token = await generate_token( + domain="auth0.local", + user_id="user_123", + audience="", + issuer="https://auth0.local/" + ) + + app = FastAPI() + auth0 = Auth0FastAPI( + domain="auth0.local", + audience="", + dpop_required=True # DPoP required mode + ) + + @app.get("/test") + async def test_route(claims=Depends(auth0.require_auth())): + return "OK" + + client = TestClient(app) + response = client.get( + "/test", + headers={"Authorization": f"Bearer {bearer_token}"} + ) + + assert response.status_code == 400 + json_body = response.json() + assert json_body["detail"]["error"] == "invalid_request" + + +@pytest.mark.asyncio +async def test_dpop_required_mode_accepts_dpop(httpx_mock: HTTPXMock): + """Test that DPoP tokens are accepted when dpop_required=True.""" + setup_mocks(httpx_mock) + + access_token = await generate_dpop_bound_token( + domain="auth0.local", + user_id="user_123", + audience="", + issuer="https://auth0.local/" + ) + + dpop_proof = await generate_dpop_proof( + http_method="GET", + http_url="http://testserver/test", + access_token=access_token + ) + + app = FastAPI() + auth0 = Auth0FastAPI( + domain="auth0.local", + audience="", + dpop_required=True + ) + + + @app.get("/test") + async def test_route(claims=Depends(auth0.require_auth())): + return {"user": claims["sub"]} + + client = TestClient(app) + response = client.get( + "/test", + headers={ + "Authorization": f"DPoP {access_token}", + "DPoP": dpop_proof + } + ) + + assert response.status_code == 200 + assert response.json()["user"] == "user_123" + + +@pytest.mark.asyncio +async def test_dpop_disabled_mode_rejects_dpop(): + """Test that DPoP tokens are rejected when dpop_enabled=False.""" + access_token = await generate_dpop_bound_token( + domain="auth0.local", + user_id="user_123", + audience="", + issuer="https://auth0.local/" + ) + + dpop_proof = await generate_dpop_proof( + http_method="GET", + http_url="http://testserver/test", + access_token=access_token + ) + + app = FastAPI() + auth0 = Auth0FastAPI( + domain="auth0.local", + audience="", + dpop_enabled=False # DPoP disabled + ) + + @app.get("/test") + async def test_route(claims=Depends(auth0.require_auth())): + return "OK" + + client = TestClient(app) + response = client.get( + "/test", + headers={ + "Authorization": f"DPoP {access_token}", + "DPoP": dpop_proof + } + ) + + assert response.status_code == 400 + json_body = response.json() + assert json_body["detail"]["error"] == "invalid_request" + + +@pytest.mark.asyncio +async def test_mixed_mode_accepts_bearer(httpx_mock: HTTPXMock): + """Test that Bearer tokens are accepted in mixed mode (default).""" + setup_mocks(httpx_mock) + + bearer_token = await generate_token( + domain="auth0.local", + user_id="user_123", + audience="", + issuer="https://auth0.local/" + ) + + app = FastAPI() + auth0 = Auth0FastAPI( + domain="auth0.local", + audience="", + dpop_enabled=True, + dpop_required=False # Mixed mode + ) + + @app.get("/test") + async def test_route(claims=Depends(auth0.require_auth())): + return {"user": claims["sub"]} + + client = TestClient(app) + response = client.get( + "/test", + headers={"Authorization": f"Bearer {bearer_token}"} + ) + + assert response.status_code == 200 + assert response.json()["user"] == "user_123" + + +@pytest.mark.asyncio +async def test_mixed_mode_accepts_dpop(httpx_mock: HTTPXMock): + """Test that DPoP tokens are accepted in mixed mode (default).""" + setup_mocks(httpx_mock) + + access_token = await generate_dpop_bound_token( + domain="auth0.local", + user_id="user_123", + audience="", + issuer="https://auth0.local/" + ) + + dpop_proof = await generate_dpop_proof( + http_method="GET", + http_url="http://testserver/test", + access_token=access_token + ) + + app = FastAPI() + auth0 = Auth0FastAPI( + domain="auth0.local", + audience="", + dpop_enabled=True, + dpop_required=False # Mixed mode + ) + + @app.get("/test") + async def test_route(claims=Depends(auth0.require_auth())): + return {"user": claims["sub"]} + + client = TestClient(app) + response = client.get( + "/test", + headers={ + "Authorization": f"DPoP {access_token}", + "DPoP": dpop_proof + } + ) + + assert response.status_code == 200 + assert response.json()["user"] == "user_123" + + +@pytest.mark.asyncio +async def test_bearer_only_mode_accepts_bearer(httpx_mock: HTTPXMock): + """Test that Bearer tokens work when DPoP is disabled.""" + setup_mocks(httpx_mock) + + bearer_token = await generate_token( + domain="auth0.local", + user_id="user_123", + audience="", + issuer="https://auth0.local/" + ) + + app = FastAPI() + auth0 = Auth0FastAPI( + domain="auth0.local", + audience="", + dpop_enabled=False # Bearer only mode + ) + + @app.get("/test") + async def test_route(claims=Depends(auth0.require_auth())): + return {"user": claims["sub"]} + + client = TestClient(app) + response = client.get( + "/test", + headers={"Authorization": f"Bearer {bearer_token}"} + ) + + assert response.status_code == 200 + assert response.json()["user"] == "user_123" + + +@pytest.mark.asyncio +async def test_dpop_bound_token_with_bearer_scheme_fails(httpx_mock: HTTPXMock): + """Test that DPoP-bound tokens fail when using Bearer scheme.""" + setup_mocks(httpx_mock) + + # Generate a DPoP-bound token (has cnf claim) + dpop_token = await generate_dpop_bound_token( + domain="auth0.local", + user_id="user_123", + audience="", + issuer="https://auth0.local/" + ) + + app = FastAPI() + auth0 = Auth0FastAPI( + domain="auth0.local", + audience="", + dpop_enabled=True + ) + + @app.get("/test") + async def test_route(claims=Depends(auth0.require_auth())): + return "OK" + + client = TestClient(app) + # Try to use DPoP-bound token with Bearer scheme (should fail) + response = client.get( + "/test", + headers={"Authorization": f"Bearer {dpop_token}"} + ) + + assert response.status_code == 401 + json_body = response.json() + assert "dpop" in json_body["detail"]["error_description"].lower() diff --git a/tests/test_dpop_authentication.py b/tests/test_dpop_authentication.py new file mode 100644 index 0000000..4d94fc8 --- /dev/null +++ b/tests/test_dpop_authentication.py @@ -0,0 +1,341 @@ +""" +Tests for DPoP authentication functionality in FastAPI middleware. +Focuses on DPoP-specific authentication flows and error cases. +""" +import pytest +from fastapi import FastAPI, Depends +from pytest_httpx import HTTPXMock +from fastapi.testclient import TestClient + +from fastapi_plugin.fast_api_client import Auth0FastAPI +from fastapi_plugin.test_utils import ( + generate_token, + generate_dpop_proof, + generate_dpop_bound_token, + PUBLIC_DPOP_JWK +) + + +def setup_mocks(httpx_mock: HTTPXMock): + """Setup common OIDC and JWKS mocks.""" + httpx_mock.add_response( + method="GET", + url="https://auth0.local/.well-known/openid-configuration", + json={ + "issuer": "https://auth0.local/", + "jwks_uri": "https://auth0.local/.well-known/jwks.json" + } + ) + httpx_mock.add_response( + method="GET", + url="https://auth0.local/.well-known/jwks.json", + json={ + "keys": [ + { + "kty": "RSA", + "kid": "TEST_KEY", + "n": "whYOFK2Ocbbpb_zVypi9SeKiNUqKQH0zTKN1-6fpCTu6ZalGI82s7XK3tan4dJt90ptUPKD2zvxqTzFNfx4HHHsrYCf2-FMLn1VTJfQazA2BvJqAwcpW1bqRUEty8tS_Yv4hRvWfQPcc2Gc3-_fQOOW57zVy-rNoJc744kb30NjQxdGp03J2S3GLQu7oKtSDDPooQHD38PEMNnITf0pj-KgDPjymkMGoJlO3aKppsjfbt_AH6GGdRghYRLOUwQU-h-ofWHR3lbYiKtXPn5dN24kiHy61e3VAQ9_YAZlwXC_99GGtw_NpghFAuM4P1JDn0DppJldy3PGFC0GfBCZASw", + "e": "AQAB", + "alg": "RS256", + "use": "sig" + }, + PUBLIC_DPOP_JWK + ] + } + ) + + +@pytest.mark.asyncio +async def test_dpop_authentication_success(httpx_mock: HTTPXMock): + """Test successful DPoP authentication with valid token and proof.""" + setup_mocks(httpx_mock) + + access_token = await generate_dpop_bound_token( + domain="auth0.local", + user_id="user_123", + audience="", + issuer="https://auth0.local/" + ) + + dpop_proof = await generate_dpop_proof( + http_method="GET", + http_url="http://testserver/test", + access_token=access_token + ) + + app = FastAPI() + auth0 = Auth0FastAPI(domain="auth0.local", audience="", dpop_enabled=True) + + @app.get("/test") + async def test_route(claims=Depends(auth0.require_auth())): + return {"user": claims["sub"]} + + client = TestClient(app) + response = client.get( + "/test", + headers={ + "Authorization": f"DPoP {access_token}", + "DPoP": dpop_proof + } + ) + + assert response.status_code == 200 + assert response.json()["user"] == "user_123" + + +@pytest.mark.asyncio +async def test_dpop_authentication_missing_dpop_header(httpx_mock: HTTPXMock): + """Test DPoP request fails when DPoP header is missing.""" + + access_token = await generate_dpop_bound_token( + domain="auth0.local", + user_id="user_123", + audience="", + issuer="https://auth0.local/" + ) + + app = FastAPI() + auth0 = Auth0FastAPI(domain="auth0.local", audience="", dpop_enabled=True) + + @app.get("/test") + async def test_route(claims=Depends(auth0.require_auth())): + return "OK" + + client = TestClient(app) + response = client.get( + "/test", + headers={"Authorization": f"DPoP {access_token}"} # Missing DPoP header + ) + + assert response.status_code == 400 + json_body = response.json() + assert "invalid_request" in json_body["detail"]["error"] + + +@pytest.mark.asyncio +async def test_dpop_authentication_invalid_dpop_proof(httpx_mock: HTTPXMock): + """Test DPoP request fails with malformed DPoP proof.""" + + access_token = await generate_dpop_bound_token( + domain="auth0.local", + user_id="user_123", + audience="", + issuer="https://auth0.local/" + ) + + app = FastAPI() + auth0 = Auth0FastAPI(domain="auth0.local", audience="", dpop_enabled=True) + + @app.get("/test") + async def test_route(claims=Depends(auth0.require_auth())): + return "OK" + + client = TestClient(app) + response = client.get( + "/test", + headers={ + "Authorization": f"DPoP {access_token}", + "DPoP": "invalid.jwt.proof" + } + ) + + assert response.status_code == 400 + json_body = response.json() + assert "invalid_dpop_proof" in json_body["detail"]["error"] + + +@pytest.mark.asyncio +async def test_dpop_authentication_url_mismatch(httpx_mock: HTTPXMock): + """Test DPoP proof fails when htu doesn't match request URL.""" + setup_mocks(httpx_mock) + + access_token = await generate_dpop_bound_token( + domain="auth0.local", + user_id="user_123", + audience="", + issuer="https://auth0.local/" + ) + + # Generate proof for WRONG URL + dpop_proof = await generate_dpop_proof( + http_method="GET", + http_url="http://testserver/wrong-url", # Wrong URL + access_token=access_token + ) + + app = FastAPI() + auth0 = Auth0FastAPI(domain="auth0.local", audience="", dpop_enabled=True) + + @app.get("/test") + async def test_route(claims=Depends(auth0.require_auth())): + return "OK" + + client = TestClient(app) + response = client.get( + "/test", + headers={ + "Authorization": f"DPoP {access_token}", + "DPoP": dpop_proof + } + ) + + assert response.status_code == 400 + json_body = response.json() + assert "invalid_dpop_proof" in json_body["detail"]["error"] + + +@pytest.mark.asyncio +async def test_dpop_authentication_method_mismatch(httpx_mock: HTTPXMock): + """Test DPoP proof fails when htm doesn't match request method.""" + setup_mocks(httpx_mock) + + access_token = await generate_dpop_bound_token( + domain="auth0.local", + user_id="user_123", + audience="", + issuer="https://auth0.local/" + ) + + # Generate proof for POST but send GET + dpop_proof = await generate_dpop_proof( + http_method="POST", # Wrong method + http_url="http://testserver/test", + access_token=access_token + ) + + app = FastAPI() + auth0 = Auth0FastAPI(domain="auth0.local", audience="", dpop_enabled=True) + + @app.get("/test") + async def test_route(claims=Depends(auth0.require_auth())): + return "OK" + + client = TestClient(app) + response = client.get( + "/test", + headers={ + "Authorization": f"DPoP {access_token}", + "DPoP": dpop_proof + } + ) + + assert response.status_code == 400 + json_body = response.json() + assert "invalid_dpop_proof" in json_body["detail"]["error"] + + +@pytest.mark.asyncio +async def test_dpop_with_scope_validation(httpx_mock: HTTPXMock): + """Test that scope validation works with DPoP tokens.""" + setup_mocks(httpx_mock) + + access_token = await generate_dpop_bound_token( + domain="auth0.local", + user_id="user_123", + audience="", + issuer="https://auth0.local/", + claims={"scope": "read:data write:data"} + ) + + dpop_proof = await generate_dpop_proof( + http_method="GET", + http_url="http://testserver/test", + access_token=access_token + ) + + app = FastAPI() + auth0 = Auth0FastAPI(domain="auth0.local", audience="", dpop_enabled=True) + + @app.get("/test") + async def test_route(claims=Depends(auth0.require_auth(scopes=["read:data"]))): + return {"user": claims["sub"]} + + client = TestClient(app) + response = client.get( + "/test", + headers={ + "Authorization": f"DPoP {access_token}", + "DPoP": dpop_proof + } + ) + + assert response.status_code == 200 + assert response.json()["user"] == "user_123" + + +@pytest.mark.asyncio +async def test_dpop_with_invalid_scope(httpx_mock: HTTPXMock): + """Test that scope validation fails with insufficient scopes.""" + setup_mocks(httpx_mock) + + access_token = await generate_dpop_bound_token( + domain="auth0.local", + user_id="user_123", + audience="", + issuer="https://auth0.local/", + claims={"scope": "read:data"} + ) + + dpop_proof = await generate_dpop_proof( + http_method="GET", + http_url="http://testserver/test", + access_token=access_token + ) + + app = FastAPI() + auth0 = Auth0FastAPI(domain="auth0.local", audience="", dpop_enabled=True) + + @app.get("/test") + async def test_route(claims=Depends(auth0.require_auth(scopes=["write:data"]))): + return "OK" + + client = TestClient(app) + response = client.get( + "/test", + headers={ + "Authorization": f"DPoP {access_token}", + "DPoP": dpop_proof + } + ) + + assert response.status_code == 403 + json_body = response.json() + assert json_body["detail"]["error"] == "insufficient_scope" + + +@pytest.mark.asyncio +async def test_dpop_with_post_request(httpx_mock: HTTPXMock): + """Test DPoP authentication works with POST requests.""" + setup_mocks(httpx_mock) + + access_token = await generate_dpop_bound_token( + domain="auth0.local", + user_id="user_123", + audience="", + issuer="https://auth0.local/" + ) + + dpop_proof = await generate_dpop_proof( + http_method="POST", + http_url="http://testserver/test", + access_token=access_token + ) + + app = FastAPI() + auth0 = Auth0FastAPI(domain="auth0.local", audience="", dpop_enabled=True) + + @app.post("/test") + async def test_route(claims=Depends(auth0.require_auth())): + return {"user": claims["sub"]} + + client = TestClient(app) + response = client.post( + "/test", + headers={ + "Authorization": f"DPoP {access_token}", + "DPoP": dpop_proof + } + ) + + assert response.status_code == 200 + assert response.json()["user"] == "user_123" diff --git a/tests/test_fast_api_client.py b/tests/test_fast_api_client.py index 46bf19c..202adc6 100644 --- a/tests/test_fast_api_client.py +++ b/tests/test_fast_api_client.py @@ -25,7 +25,6 @@ async def test_route(claims=Depends(auth0.require_auth())): assert response.status_code == 400 json_body = response.json() assert json_body["detail"]["error"] == "invalid_request" - assert json_body["detail"]["error_description"] == "No Authorization provided" @pytest.mark.asyncio diff --git a/tests/test_reverse_proxy.py b/tests/test_reverse_proxy.py new file mode 100644 index 0000000..ae5e1a6 --- /dev/null +++ b/tests/test_reverse_proxy.py @@ -0,0 +1,432 @@ +""" +Tests for reverse proxy support in FastAPI middleware. +Tests the get_canonical_url functionality and X-Forwarded-* header handling. +""" +import pytest +from fastapi import FastAPI, Depends, Request +from pytest_httpx import HTTPXMock +from fastapi.testclient import TestClient + +from fastapi_plugin.fast_api_client import Auth0FastAPI +from fastapi_plugin.utils import get_canonical_url +from fastapi_plugin.test_utils import ( + generate_dpop_proof, + generate_dpop_bound_token, + PUBLIC_DPOP_JWK +) + + +def setup_mocks(httpx_mock: HTTPXMock): + """Setup common OIDC and JWKS mocks.""" + httpx_mock.add_response( + method="GET", + url="https://auth0.local/.well-known/openid-configuration", + json={ + "issuer": "https://auth0.local/", + "jwks_uri": "https://auth0.local/.well-known/jwks.json" + } + ) + httpx_mock.add_response( + method="GET", + url="https://auth0.local/.well-known/jwks.json", + json={ + "keys": [ + { + "kty": "RSA", + "kid": "TEST_KEY", + "n": "whYOFK2Ocbbpb_zVypi9SeKiNUqKQH0zTKN1-6fpCTu6ZalGI82s7XK3tan4dJt90ptUPKD2zvxqTzFNfx4HHHsrYCf2-FMLn1VTJfQazA2BvJqAwcpW1bqRUEty8tS_Yv4hRvWfQPcc2Gc3-_fQOOW57zVy-rNoJc744kb30NjQxdGp03J2S3GLQu7oKtSDDPooQHD38PEMNnITf0pj-KgDPjymkMGoJlO3aKppsjfbt_AH6GGdRghYRLOUwQU-h-ofWHR3lbYiKtXPn5dN24kiHy61e3VAQ9_YAZlwXC_99GGtw_NpghFAuM4P1JDn0DppJldy3PGFC0GfBCZASw", + "e": "AQAB", + "alg": "RS256", + "use": "sig" + }, + PUBLIC_DPOP_JWK + ] + } + ) + + +@pytest.mark.asyncio +async def test_reverse_proxy_with_trust_enabled(httpx_mock: HTTPXMock): + """Test that X-Forwarded headers are used when trust_proxy=True.""" + setup_mocks(httpx_mock) + + access_token = await generate_dpop_bound_token( + domain="auth0.local", + user_id="user_123", + audience="", + issuer="https://auth0.local/" + ) + + # Generate DPoP proof for the PUBLIC URL + dpop_proof = await generate_dpop_proof( + http_method="GET", + http_url="https://api.example.com/test", # Public URL + access_token=access_token + ) + + app = FastAPI() + app.state.trust_proxy = True # Enable proxy trust + + auth0 = Auth0FastAPI(domain="auth0.local", audience="", dpop_enabled=True) + + @app.get("/test") + async def test_route(claims=Depends(auth0.require_auth())): + return {"user": claims["sub"]} + + client = TestClient(app) + + # Request comes to internal URL but with X-Forwarded headers + response = client.get( + "/test", + headers={ + "Authorization": f"DPoP {access_token}", + "DPoP": dpop_proof, + "X-Forwarded-Proto": "https", + "X-Forwarded-Host": "api.example.com" + } + ) + + assert response.status_code == 200 + assert response.json()["user"] == "user_123" + + +@pytest.mark.asyncio +async def test_reverse_proxy_without_trust_fails(httpx_mock: HTTPXMock): + """Test that X-Forwarded headers are IGNORED when trust_proxy=False.""" + setup_mocks(httpx_mock) + + access_token = await generate_dpop_bound_token( + domain="auth0.local", + user_id="user_123", + audience="", + issuer="https://auth0.local/" + ) + + # Generate DPoP proof for the PUBLIC URL + dpop_proof = await generate_dpop_proof( + http_method="GET", + http_url="https://api.example.com/test", + access_token=access_token + ) + + app = FastAPI() + # trust_proxy defaults to False + + auth0 = Auth0FastAPI(domain="auth0.local", audience="", dpop_enabled=True) + + @app.get("/test") + async def test_route(claims=Depends(auth0.require_auth())): + return "OK" + + client = TestClient(app) + + # Headers are present but should be ignored + response = client.get( + "/test", + headers={ + "Authorization": f"DPoP {access_token}", + "DPoP": dpop_proof, + "X-Forwarded-Proto": "https", + "X-Forwarded-Host": "api.example.com" + } + ) + + # Should fail because proof expects https://api.example.com + # but app sees http://testserver (headers ignored) + assert response.status_code == 400 + json_body = response.json() + assert "invalid_dpop_proof" in json_body["detail"]["error"] + + +@pytest.mark.asyncio +async def test_reverse_proxy_with_path_prefix(httpx_mock: HTTPXMock): + """Test X-Forwarded-Prefix handling.""" + setup_mocks(httpx_mock) + + access_token = await generate_dpop_bound_token( + domain="auth0.local", + user_id="user_123", + audience="", + issuer="https://auth0.local/" + ) + + # Generate DPoP proof with prefix + dpop_proof = await generate_dpop_proof( + http_method="GET", + http_url="https://api.example.com/api/v1/test", + access_token=access_token + ) + + app = FastAPI() + app.state.trust_proxy = True + + auth0 = Auth0FastAPI(domain="auth0.local", audience="", dpop_enabled=True) + + @app.get("/test") + async def test_route(claims=Depends(auth0.require_auth())): + return {"user": claims["sub"]} + + client = TestClient(app) + + response = client.get( + "/test", + headers={ + "Authorization": f"DPoP {access_token}", + "DPoP": dpop_proof, + "X-Forwarded-Proto": "https", + "X-Forwarded-Host": "api.example.com", + "X-Forwarded-Prefix": "/api/v1" + } + ) + + assert response.status_code == 200 + assert response.json()["user"] == "user_123" + + +@pytest.mark.asyncio +async def test_reverse_proxy_with_trailing_slash_prefix(httpx_mock: HTTPXMock): + """Test X-Forwarded-Prefix with trailing slash is handled correctly.""" + setup_mocks(httpx_mock) + + access_token = await generate_dpop_bound_token( + domain="auth0.local", + user_id="user_123", + audience="", + issuer="https://auth0.local/" + ) + + # Generate DPoP proof WITHOUT trailing slash + dpop_proof = await generate_dpop_proof( + http_method="GET", + http_url="https://api.example.com/api/v1/test", + access_token=access_token + ) + + app = FastAPI() + app.state.trust_proxy = True + + auth0 = Auth0FastAPI(domain="auth0.local", audience="", dpop_enabled=True) + + @app.get("/test") + async def test_route(claims=Depends(auth0.require_auth())): + return {"user": claims["sub"]} + + client = TestClient(app) + + response = client.get( + "/test", + headers={ + "Authorization": f"DPoP {access_token}", + "DPoP": dpop_proof, + "X-Forwarded-Proto": "https", + "X-Forwarded-Host": "api.example.com", + "X-Forwarded-Prefix": "/api/v1/" # With trailing slash + } + ) + + # Should still work - trailing slash should be stripped + assert response.status_code == 200 + + +@pytest.mark.asyncio +async def test_reverse_proxy_multiple_hosts(httpx_mock: HTTPXMock): + """Test that first host is used from comma-separated X-Forwarded-Host.""" + setup_mocks(httpx_mock) + + access_token = await generate_dpop_bound_token( + domain="auth0.local", + user_id="user_123", + audience="", + issuer="https://auth0.local/" + ) + + dpop_proof = await generate_dpop_proof( + http_method="GET", + http_url="https://client.example.com/test", + access_token=access_token + ) + + app = FastAPI() + app.state.trust_proxy = True + + auth0 = Auth0FastAPI(domain="auth0.local", audience="", dpop_enabled=True) + + @app.get("/test") + async def test_route(claims=Depends(auth0.require_auth())): + return {"user": claims["sub"]} + + client = TestClient(app) + + response = client.get( + "/test", + headers={ + "Authorization": f"DPoP {access_token}", + "DPoP": dpop_proof, + "X-Forwarded-Proto": "https", + "X-Forwarded-Host": "client.example.com, proxy1.internal, proxy2.internal" + } + ) + + assert response.status_code == 200 + assert response.json()["user"] == "user_123" + + +@pytest.mark.asyncio +async def test_reverse_proxy_with_query_params(httpx_mock: HTTPXMock): + """Test that query parameters are preserved in canonical URL.""" + setup_mocks(httpx_mock) + + access_token = await generate_dpop_bound_token( + domain="auth0.local", + user_id="user_123", + audience="", + issuer="https://auth0.local/" + ) + + dpop_proof = await generate_dpop_proof( + http_method="GET", + http_url="https://api.example.com/test?page=1&limit=10", + access_token=access_token + ) + + app = FastAPI() + app.state.trust_proxy = True + + auth0 = Auth0FastAPI(domain="auth0.local", audience="", dpop_enabled=True) + + @app.get("/test") + async def test_route(claims=Depends(auth0.require_auth())): + return {"user": claims["sub"]} + + client = TestClient(app) + + response = client.get( + "/test?page=1&limit=10", + headers={ + "Authorization": f"DPoP {access_token}", + "DPoP": dpop_proof, + "X-Forwarded-Proto": "https", + "X-Forwarded-Host": "api.example.com" + } + ) + + assert response.status_code == 200 + assert response.json()["user"] == "user_123" + + +@pytest.mark.asyncio +async def test_reverse_proxy_partial_headers(httpx_mock: HTTPXMock): + """Test with only X-Forwarded-Proto (partial headers).""" + setup_mocks(httpx_mock) + + access_token = await generate_dpop_bound_token( + domain="auth0.local", + user_id="user_123", + audience="", + issuer="https://auth0.local/" + ) + + # Proof expects https but with testserver host + dpop_proof = await generate_dpop_proof( + http_method="GET", + http_url="https://testserver/test", + access_token=access_token + ) + + app = FastAPI() + app.state.trust_proxy = True + + auth0 = Auth0FastAPI(domain="auth0.local", audience="", dpop_enabled=True) + + @app.get("/test") + async def test_route(claims=Depends(auth0.require_auth())): + return {"user": claims["sub"]} + + client = TestClient(app) + + response = client.get( + "/test", + headers={ + "Authorization": f"DPoP {access_token}", + "DPoP": dpop_proof, + "X-Forwarded-Proto": "https" # Only proto, no host + } + ) + + assert response.status_code == 200 + + +def test_get_canonical_url_without_proxy(): + """Test get_canonical_url returns direct URL when trust_proxy=False.""" + app = FastAPI() + # trust_proxy defaults to False + + @app.get("/test") + async def test_route(request: Request): + canonical_url = get_canonical_url(request) + return {"url": canonical_url} + + client = TestClient(app) + response = client.get( + "/test", + headers={ + "X-Forwarded-Proto": "https", + "X-Forwarded-Host": "evil.com" # Should be ignored + } + ) + + assert response.status_code == 200 + url = response.json()["url"] + # Should use testserver, not evil.com + assert "testserver" in url + assert "evil.com" not in url + + +def test_get_canonical_url_with_proxy(): + """Test get_canonical_url uses X-Forwarded headers when trust_proxy=True.""" + app = FastAPI() + app.state.trust_proxy = True + + @app.get("/test") + async def test_route(request: Request): + canonical_url = get_canonical_url(request) + return {"url": canonical_url} + + client = TestClient(app) + response = client.get( + "/test", + headers={ + "X-Forwarded-Proto": "https", + "X-Forwarded-Host": "api.example.com" + } + ) + + assert response.status_code == 200 + url = response.json()["url"] + assert "https://api.example.com/test" == url + + +def test_get_canonical_url_security_without_trust(): + """Test that malicious X-Forwarded headers are ignored without trust.""" + app = FastAPI() + # trust_proxy = False (default) + + @app.get("/test") + async def test_route(request: Request): + canonical_url = get_canonical_url(request) + return {"url": canonical_url} + + client = TestClient(app) + response = client.get( + "/test", + headers={ + "X-Forwarded-Proto": "https", + "X-Forwarded-Host": "attacker.com" + } + ) + + assert response.status_code == 200 + url = response.json()["url"] + # Malicious headers should be ignored + assert "attacker.com" not in url + assert "testserver" in url From b0fdfbc2dcd33e827554fd267a70e7890bf67d83 Mon Sep 17 00:00:00 2001 From: Snehil Kishore Date: Wed, 15 Oct 2025 16:54:09 +0530 Subject: [PATCH 3/9] refactor: move common test mocks to conftest.py and remove duplicate code --- tests/conftest.py | 31 +++++++++++ tests/test_configuration_modes.py | 75 +-------------------------- tests/test_dpop_authentication.py | 72 +------------------------- tests/test_fast_api_client.py | 85 +------------------------------ tests/test_reverse_proxy.py | 33 +----------- 5 files changed, 37 insertions(+), 259 deletions(-) create mode 100644 tests/conftest.py diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..1dda4ec --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,31 @@ +from pytest_httpx import HTTPXMock +from fastapi_plugin.test_utils import PUBLIC_DPOP_JWK + + +def setup_mocks(httpx_mock: HTTPXMock): + """Setup common OIDC and JWKS mocks for all tests.""" + httpx_mock.add_response( + method="GET", + url="https://auth0.local/.well-known/openid-configuration", + json={ + "issuer": "https://auth0.local/", + "jwks_uri": "https://auth0.local/.well-known/jwks.json" + } + ) + httpx_mock.add_response( + method="GET", + url="https://auth0.local/.well-known/jwks.json", + json={ + "keys": [ + { + "kty": "RSA", + "kid": "TEST_KEY", + "n": "whYOFK2Ocbbpb_zVypi9SeKiNUqKQH0zTKN1-6fpCTu6ZalGI82s7XK3tan4dJt90ptUPKD2zvxqTzFNfx4HHHsrYCf2-FMLn1VTJfQazA2BvJqAwcpW1bqRUEty8tS_Yv4hRvWfQPcc2Gc3-_fQOOW57zVy-rNoJc744kb30NjQxdGp03J2S3GLQu7oKtSDDPooQHD38PEMNnITf0pj-KgDPjymkMGoJlO3aKppsjfbt_AH6GGdRghYRLOUwQU-h-ofWHR3lbYiKtXPn5dN24kiHy61e3VAQ9_YAZlwXC_99GGtw_NpghFAuM4P1JDn0DppJldy3PGFC0GfBCZASw", + "e": "AQAB", + "alg": "RS256", + "use": "sig" + }, + PUBLIC_DPOP_JWK + ] + } + ) \ No newline at end of file diff --git a/tests/test_configuration_modes.py b/tests/test_configuration_modes.py index 7171caa..44c399b 100644 --- a/tests/test_configuration_modes.py +++ b/tests/test_configuration_modes.py @@ -12,37 +12,8 @@ generate_token, generate_dpop_proof, generate_dpop_bound_token, - PUBLIC_DPOP_JWK ) - - -def setup_mocks(httpx_mock: HTTPXMock): - """Setup common OIDC and JWKS mocks.""" - httpx_mock.add_response( - method="GET", - url="https://auth0.local/.well-known/openid-configuration", - json={ - "issuer": "https://auth0.local/", - "jwks_uri": "https://auth0.local/.well-known/jwks.json" - } - ) - httpx_mock.add_response( - method="GET", - url="https://auth0.local/.well-known/jwks.json", - json={ - "keys": [ - { - "kty": "RSA", - "kid": "TEST_KEY", - "n": "whYOFK2Ocbbpb_zVypi9SeKiNUqKQH0zTKN1-6fpCTu6ZalGI82s7XK3tan4dJt90ptUPKD2zvxqTzFNfx4HHHsrYCf2-FMLn1VTJfQazA2BvJqAwcpW1bqRUEty8tS_Yv4hRvWfQPcc2Gc3-_fQOOW57zVy-rNoJc744kb30NjQxdGp03J2S3GLQu7oKtSDDPooQHD38PEMNnITf0pj-KgDPjymkMGoJlO3aKppsjfbt_AH6GGdRghYRLOUwQU-h-ofWHR3lbYiKtXPn5dN24kiHy61e3VAQ9_YAZlwXC_99GGtw_NpghFAuM4P1JDn0DppJldy3PGFC0GfBCZASw", - "e": "AQAB", - "alg": "RS256", - "use": "sig" - }, - PUBLIC_DPOP_JWK - ] - } - ) +from conftest import setup_mocks def test_dpop_configuration_defaults(): @@ -226,50 +197,6 @@ async def test_route(claims=Depends(auth0.require_auth())): assert response.status_code == 200 assert response.json()["user"] == "user_123" - -@pytest.mark.asyncio -async def test_mixed_mode_accepts_dpop(httpx_mock: HTTPXMock): - """Test that DPoP tokens are accepted in mixed mode (default).""" - setup_mocks(httpx_mock) - - access_token = await generate_dpop_bound_token( - domain="auth0.local", - user_id="user_123", - audience="", - issuer="https://auth0.local/" - ) - - dpop_proof = await generate_dpop_proof( - http_method="GET", - http_url="http://testserver/test", - access_token=access_token - ) - - app = FastAPI() - auth0 = Auth0FastAPI( - domain="auth0.local", - audience="", - dpop_enabled=True, - dpop_required=False # Mixed mode - ) - - @app.get("/test") - async def test_route(claims=Depends(auth0.require_auth())): - return {"user": claims["sub"]} - - client = TestClient(app) - response = client.get( - "/test", - headers={ - "Authorization": f"DPoP {access_token}", - "DPoP": dpop_proof - } - ) - - assert response.status_code == 200 - assert response.json()["user"] == "user_123" - - @pytest.mark.asyncio async def test_bearer_only_mode_accepts_bearer(httpx_mock: HTTPXMock): """Test that Bearer tokens work when DPoP is disabled.""" diff --git a/tests/test_dpop_authentication.py b/tests/test_dpop_authentication.py index 4d94fc8..ba43a01 100644 --- a/tests/test_dpop_authentication.py +++ b/tests/test_dpop_authentication.py @@ -9,40 +9,10 @@ from fastapi_plugin.fast_api_client import Auth0FastAPI from fastapi_plugin.test_utils import ( - generate_token, generate_dpop_proof, - generate_dpop_bound_token, - PUBLIC_DPOP_JWK + generate_dpop_bound_token ) - - -def setup_mocks(httpx_mock: HTTPXMock): - """Setup common OIDC and JWKS mocks.""" - httpx_mock.add_response( - method="GET", - url="https://auth0.local/.well-known/openid-configuration", - json={ - "issuer": "https://auth0.local/", - "jwks_uri": "https://auth0.local/.well-known/jwks.json" - } - ) - httpx_mock.add_response( - method="GET", - url="https://auth0.local/.well-known/jwks.json", - json={ - "keys": [ - { - "kty": "RSA", - "kid": "TEST_KEY", - "n": "whYOFK2Ocbbpb_zVypi9SeKiNUqKQH0zTKN1-6fpCTu6ZalGI82s7XK3tan4dJt90ptUPKD2zvxqTzFNfx4HHHsrYCf2-FMLn1VTJfQazA2BvJqAwcpW1bqRUEty8tS_Yv4hRvWfQPcc2Gc3-_fQOOW57zVy-rNoJc744kb30NjQxdGp03J2S3GLQu7oKtSDDPooQHD38PEMNnITf0pj-KgDPjymkMGoJlO3aKppsjfbt_AH6GGdRghYRLOUwQU-h-ofWHR3lbYiKtXPn5dN24kiHy61e3VAQ9_YAZlwXC_99GGtw_NpghFAuM4P1JDn0DppJldy3PGFC0GfBCZASw", - "e": "AQAB", - "alg": "RS256", - "use": "sig" - }, - PUBLIC_DPOP_JWK - ] - } - ) +from conftest import setup_mocks @pytest.mark.asyncio @@ -301,41 +271,3 @@ async def test_route(claims=Depends(auth0.require_auth(scopes=["write:data"]))): assert response.status_code == 403 json_body = response.json() assert json_body["detail"]["error"] == "insufficient_scope" - - -@pytest.mark.asyncio -async def test_dpop_with_post_request(httpx_mock: HTTPXMock): - """Test DPoP authentication works with POST requests.""" - setup_mocks(httpx_mock) - - access_token = await generate_dpop_bound_token( - domain="auth0.local", - user_id="user_123", - audience="", - issuer="https://auth0.local/" - ) - - dpop_proof = await generate_dpop_proof( - http_method="POST", - http_url="http://testserver/test", - access_token=access_token - ) - - app = FastAPI() - auth0 = Auth0FastAPI(domain="auth0.local", audience="", dpop_enabled=True) - - @app.post("/test") - async def test_route(claims=Depends(auth0.require_auth())): - return {"user": claims["sub"]} - - client = TestClient(app) - response = client.post( - "/test", - headers={ - "Authorization": f"DPoP {access_token}", - "DPoP": dpop_proof - } - ) - - assert response.status_code == 200 - assert response.json()["user"] == "user_123" diff --git a/tests/test_fast_api_client.py b/tests/test_fast_api_client.py index 202adc6..3940021 100644 --- a/tests/test_fast_api_client.py +++ b/tests/test_fast_api_client.py @@ -329,87 +329,4 @@ def test_auth0fastapi_accepts_client_id_and_secret(): ) options = auth0.api_client.options assert options.client_id == client_id - assert options.client_secret == client_secret - -def test_auth0fastapi_accepts_dpop_configuration(): - """Test that Auth0FastAPI accepts DPoP configuration parameters.""" - auth0 = Auth0FastAPI( - domain="auth0.local", - audience="test_audience", - dpop_enabled=True, - dpop_required=True, - dpop_iat_leeway=60, - dpop_iat_offset=600 - ) - options = auth0.api_client.options - assert options.dpop_enabled == True - assert options.dpop_required == True - assert options.dpop_iat_leeway == 60 - assert options.dpop_iat_offset == 600 - -@pytest.mark.asyncio -async def test_should_return_200_with_dpop_authentication(httpx_mock: HTTPXMock): - """ - Test successful DPoP authentication with proper DPoP proof and bound access token. - """ - # Mock OIDC discovery and JWKS with both RSA and EC keys - httpx_mock.add_response( - method="GET", - url="https://auth0.local/.well-known/openid-configuration", - json={ - "issuer": "https://auth0.local/", - "jwks_uri": "https://auth0.local/.well-known/jwks.json" - } - ) - httpx_mock.add_response( - method="GET", - url="https://auth0.local/.well-known/jwks.json", - json={ - "keys": [ - { - "kty": "RSA", - "kid": "TEST_KEY", - "n": "whYOFK2Ocbbpb_zVypi9SeKiNUqKQH0zTKN1-6fpCTu6ZalGI82s7XK3tan4dJt90ptUPKD2zvxqTzFNfx4HHHsrYCf2-FMLn1VTJfQazA2BvJqAwcpW1bqRUEty8tS_Yv4hRvWfQPcc2Gc3-_fQOOW57zVy-rNoJc744kb30NjQxdGp03J2S3GLQu7oKtSDDPooQHD38PEMNnITf0pj-KgDPjymkMGoJlO3aKppsjfbt_AH6GGdRghYRLOUwQU-h-ofWHR3lbYiKtXPn5dN24kiHy61e3VAQ9_YAZlwXC_99GGtw_NpghFAuM4P1JDn0DppJldy3PGFC0GfBCZASw", - "e": "AQAB", - "alg": "RS256", - "use": "sig" - }, - PUBLIC_DPOP_JWK - ] - } - ) - - # Generate DPoP-bound access token - access_token = await generate_dpop_bound_token( - domain="auth0.local", - user_id="user_123", - audience="", - issuer="https://auth0.local/" - ) - - # Generate DPoP proof for the request - dpop_proof = await generate_dpop_proof( - http_method="GET", - http_url="http://testserver/test", - access_token=access_token - ) - - app = FastAPI() - auth0 = Auth0FastAPI(domain="auth0.local", audience="", dpop_enabled=True) - - @app.get("/test") - async def test_route(claims=Depends(auth0.require_auth())): - return "OK" - - client = TestClient(app) - - response = client.get( - "/test", - headers={ - "Authorization": f"DPoP {access_token}", - "DPoP": dpop_proof - } - ) - - assert response.status_code == 200 - assert response.text == '"OK"' \ No newline at end of file + assert options.client_secret == client_secret \ No newline at end of file diff --git a/tests/test_reverse_proxy.py b/tests/test_reverse_proxy.py index ae5e1a6..23b44b5 100644 --- a/tests/test_reverse_proxy.py +++ b/tests/test_reverse_proxy.py @@ -11,38 +11,9 @@ from fastapi_plugin.utils import get_canonical_url from fastapi_plugin.test_utils import ( generate_dpop_proof, - generate_dpop_bound_token, - PUBLIC_DPOP_JWK + generate_dpop_bound_token ) - - -def setup_mocks(httpx_mock: HTTPXMock): - """Setup common OIDC and JWKS mocks.""" - httpx_mock.add_response( - method="GET", - url="https://auth0.local/.well-known/openid-configuration", - json={ - "issuer": "https://auth0.local/", - "jwks_uri": "https://auth0.local/.well-known/jwks.json" - } - ) - httpx_mock.add_response( - method="GET", - url="https://auth0.local/.well-known/jwks.json", - json={ - "keys": [ - { - "kty": "RSA", - "kid": "TEST_KEY", - "n": "whYOFK2Ocbbpb_zVypi9SeKiNUqKQH0zTKN1-6fpCTu6ZalGI82s7XK3tan4dJt90ptUPKD2zvxqTzFNfx4HHHsrYCf2-FMLn1VTJfQazA2BvJqAwcpW1bqRUEty8tS_Yv4hRvWfQPcc2Gc3-_fQOOW57zVy-rNoJc744kb30NjQxdGp03J2S3GLQu7oKtSDDPooQHD38PEMNnITf0pj-KgDPjymkMGoJlO3aKppsjfbt_AH6GGdRghYRLOUwQU-h-ofWHR3lbYiKtXPn5dN24kiHy61e3VAQ9_YAZlwXC_99GGtw_NpghFAuM4P1JDn0DppJldy3PGFC0GfBCZASw", - "e": "AQAB", - "alg": "RS256", - "use": "sig" - }, - PUBLIC_DPOP_JWK - ] - } - ) +from conftest import setup_mocks @pytest.mark.asyncio From 96aacaee56698b8be265efdfd6ea765896a72f6f Mon Sep 17 00:00:00 2001 From: Snehil Kishore Date: Tue, 21 Oct 2025 19:31:00 +0530 Subject: [PATCH 4/9] refactor: move test utilities into tests directory and improve test organization --- pyproject.toml | 3 +- tests/conftest.py | 2 +- tests/test_bearer_token_validation.py | 203 +++++++++++++++ tests/test_client_initialization.py | 64 +++++ tests/test_configuration_modes.py | 84 ++---- tests/test_dpop_authentication.py | 8 +- tests/test_fast_api_client.py | 332 ------------------------ tests/test_reverse_proxy.py | 35 +-- {fastapi_plugin => tests}/test_utils.py | 0 9 files changed, 299 insertions(+), 432 deletions(-) create mode 100644 tests/test_bearer_token_validation.py create mode 100644 tests/test_client_initialization.py delete mode 100644 tests/test_fast_api_client.py rename {fastapi_plugin => tests}/test_utils.py (100%) diff --git a/pyproject.toml b/pyproject.toml index 55beb68..f4433ce 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -7,7 +7,8 @@ license = "MIT" readme = "README.md" packages = [ - { include = "fastapi_plugin" } + { include = "fastapi_plugin" }, + { include = "tests", format = "sdist" } ] [tool.poetry.dependencies] diff --git a/tests/conftest.py b/tests/conftest.py index 1dda4ec..ab2ac67 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,5 +1,5 @@ from pytest_httpx import HTTPXMock -from fastapi_plugin.test_utils import PUBLIC_DPOP_JWK +from test_utils import PUBLIC_DPOP_JWK def setup_mocks(httpx_mock: HTTPXMock): diff --git a/tests/test_bearer_token_validation.py b/tests/test_bearer_token_validation.py new file mode 100644 index 0000000..0855ed1 --- /dev/null +++ b/tests/test_bearer_token_validation.py @@ -0,0 +1,203 @@ +""" +Tests for Bearer token authentication and JWT validation. +Tests core JWT validation logic including issuer, expiration, and scope checks. +""" +import pytest +from fastapi import FastAPI, Depends +from pytest_httpx import HTTPXMock +from fastapi.testclient import TestClient + +from fastapi_plugin.fast_api_client import Auth0FastAPI +from test_utils import generate_token +from conftest import setup_mocks + + +# ============================================================================= +# Missing Token Tests +# ============================================================================= + +@pytest.mark.asyncio +async def test_missing_authorization_header(): + """Test that requests without Authorization header return 400.""" + app = FastAPI() + auth0 = Auth0FastAPI(domain="auth0.local", audience="") + + @app.get("/test") + async def test_route(claims=Depends(auth0.require_auth())): + return "OK" + + client = TestClient(app) + response = client.get("/test") + + assert response.status_code == 400 + json_body = response.json() + assert json_body["detail"]["error"] == "invalid_request" + + +# ============================================================================= +# Valid Token Tests +# ============================================================================= + +@pytest.mark.asyncio +async def test_valid_bearer_token_authentication(httpx_mock: HTTPXMock): + """Test successful authentication with a valid Bearer token.""" + setup_mocks(httpx_mock) + + access_token = await generate_token( + domain="auth0.local", + user_id="user_123", + audience="", + issuer="https://auth0.local/", + iat=True, + exp=True + ) + + app = FastAPI() + auth0 = Auth0FastAPI(domain="auth0.local", audience="") + + @app.get("/test") + async def test_route(claims=Depends(auth0.require_auth())): + return "OK" + + client = TestClient(app) + response = client.get( + "/test", + headers={"Authorization": f"Bearer {access_token}"} + ) + + assert response.status_code == 200 + + +# ============================================================================= +# Issuer Validation Tests +# ============================================================================= + +@pytest.mark.asyncio +async def test_missing_issuer_claim(httpx_mock: HTTPXMock): + """Test that tokens without 'iss' claim are rejected with 401.""" + setup_mocks(httpx_mock) + + access_token = await generate_token( + domain="auth0.local", + user_id="user_123", + audience="", + issuer=False, # Omit issuer claim + iat=True, + exp=True + ) + + app = FastAPI() + auth0 = Auth0FastAPI(domain="auth0.local", audience="") + + @app.get("/test") + async def test_route(claims=Depends(auth0.require_auth())): + return "OK" + + client = TestClient(app) + response = client.get( + "/test", + headers={"Authorization": f"Bearer {access_token}"} + ) + + assert response.status_code == 401 + + +@pytest.mark.asyncio +async def test_invalid_issuer_claim(httpx_mock: HTTPXMock): + """Test that tokens with mismatched issuer are rejected with 401.""" + setup_mocks(httpx_mock) + + access_token = await generate_token( + domain="auth0.local", + user_id="user_123", + audience="", + issuer="https://invalid-issuer.local", # Wrong issuer + iat=True, + exp=True + ) + + app = FastAPI() + auth0 = Auth0FastAPI(domain="auth0.local", audience="") + + @app.get("/test") + async def test_route(claims=Depends(auth0.require_auth())): + return "OK" + + client = TestClient(app) + response = client.get( + "/test", + headers={"Authorization": f"Bearer {access_token}"} + ) + + assert response.status_code == 401 + + +# ============================================================================= +# Expiration Validation Tests +# ============================================================================= + +@pytest.mark.asyncio +async def test_missing_expiration_claim(httpx_mock: HTTPXMock): + """Test that tokens without 'exp' claim are rejected with 401.""" + setup_mocks(httpx_mock) + + access_token = await generate_token( + domain="auth0.local", + user_id="user_123", + audience="", + issuer="https://auth0.local/", + iat=True, + exp=False # Omit expiration claim + ) + + app = FastAPI() + auth0 = Auth0FastAPI(domain="auth0.local", audience="") + + @app.get("/test") + async def test_route(claims=Depends(auth0.require_auth())): + return "OK" + + client = TestClient(app) + response = client.get( + "/test", + headers={"Authorization": f"Bearer {access_token}"} + ) + + assert response.status_code == 401 + + +# ============================================================================= +# Scope Validation Tests +# ============================================================================= + +@pytest.mark.asyncio +async def test_insufficient_scope(httpx_mock: HTTPXMock): + """Test that tokens with insufficient scopes are rejected with 403.""" + setup_mocks(httpx_mock) + + access_token = await generate_token( + domain="auth0.local", + user_id="user_123", + audience="", + issuer="https://auth0.local/", + iat=True, + exp=True, + claims={"scope": "invalid"} # Wrong scope + ) + + app = FastAPI() + auth0 = Auth0FastAPI(domain="auth0.local", audience="") + + @app.get("/test") + async def test_route(claims=Depends(auth0.require_auth(scopes="valid"))): + return "OK" + + client = TestClient(app) + response = client.get( + "/test", + headers={"Authorization": f"Bearer {access_token}"} + ) + + assert response.status_code == 403 + json_body = response.json() + assert json_body["detail"]["error"] == "insufficient_scope" diff --git a/tests/test_client_initialization.py b/tests/test_client_initialization.py new file mode 100644 index 0000000..4f43671 --- /dev/null +++ b/tests/test_client_initialization.py @@ -0,0 +1,64 @@ +""" +Tests for Auth0FastAPI client initialization and configuration. +Tests constructor parameters, default values, and configuration options. +""" +from fastapi_plugin.fast_api_client import Auth0FastAPI + + +# ============================================================================= +# Client Credentials Configuration +# ============================================================================= + +def test_initialization_with_client_credentials(): + """Test that Auth0FastAPI accepts and stores client_id and client_secret.""" + client_id = "test_client_id" + client_secret = "test_client_secret" + + auth0 = Auth0FastAPI( + domain="auth0.local", + audience="test_audience", + client_id=client_id, + client_secret=client_secret + ) + + options = auth0.api_client.options + assert options.client_id == client_id + assert options.client_secret == client_secret + + +# ============================================================================= +# DPoP Configuration Tests +# ============================================================================= + +def test_dpop_default_configuration(): + """Test that DPoP has correct default configuration values.""" + auth0 = Auth0FastAPI(domain="auth0.local", audience="test") + + assert auth0.api_client.options.dpop_enabled is True + assert auth0.api_client.options.dpop_required is False + assert auth0.api_client.options.dpop_iat_leeway == 30 + assert auth0.api_client.options.dpop_iat_offset == 300 + + +def test_dpop_disabled_configuration(): + """Test that DPoP can be explicitly disabled.""" + auth0 = Auth0FastAPI( + domain="auth0.local", + audience="test", + dpop_enabled=False + ) + + assert auth0.api_client.options.dpop_enabled is False + + +def test_dpop_custom_timing_configuration(): + """Test that custom DPoP timing parameters are accepted.""" + auth0 = Auth0FastAPI( + domain="auth0.local", + audience="test", + dpop_iat_leeway=60, + dpop_iat_offset=600 + ) + + assert auth0.api_client.options.dpop_iat_leeway == 60 + assert auth0.api_client.options.dpop_iat_offset == 600 diff --git a/tests/test_configuration_modes.py b/tests/test_configuration_modes.py index 44c399b..2e22497 100644 --- a/tests/test_configuration_modes.py +++ b/tests/test_configuration_modes.py @@ -1,6 +1,6 @@ """ -Tests for DPoP configuration modes (enabled/required/disabled). -Tests the different operational modes of the Auth0FastAPI middleware. +Tests for DPoP operational modes (enabled/required/disabled). +Tests the runtime behavior of different DPoP configuration modes. """ import pytest from fastapi import FastAPI, Depends @@ -8,7 +8,7 @@ from fastapi.testclient import TestClient from fastapi_plugin.fast_api_client import Auth0FastAPI -from fastapi_plugin.test_utils import ( +from test_utils import ( generate_token, generate_dpop_proof, generate_dpop_bound_token, @@ -16,36 +16,9 @@ from conftest import setup_mocks -def test_dpop_configuration_defaults(): - """Test that DPoP configuration has correct defaults.""" - auth0 = Auth0FastAPI(domain="auth0.local", audience="test") - assert auth0.api_client.options.dpop_enabled == True - assert auth0.api_client.options.dpop_required == False - assert auth0.api_client.options.dpop_iat_leeway == 30 - assert auth0.api_client.options.dpop_iat_offset == 300 - - -def test_dpop_disabled_configuration(): - """Test DPoP can be explicitly disabled.""" - auth0 = Auth0FastAPI( - domain="auth0.local", - audience="test", - dpop_enabled=False - ) - assert auth0.api_client.options.dpop_enabled == False - - -def test_dpop_custom_timing_configuration(): - """Test custom DPoP timing parameters.""" - auth0 = Auth0FastAPI( - domain="auth0.local", - audience="test", - dpop_iat_leeway=60, - dpop_iat_offset=600 - ) - assert auth0.api_client.options.dpop_iat_leeway == 60 - assert auth0.api_client.options.dpop_iat_offset == 600 - +# ============================================================================= +# DPoP Required Mode Tests +# ============================================================================= @pytest.mark.asyncio async def test_dpop_required_mode_rejects_bearer(): @@ -105,7 +78,6 @@ async def test_dpop_required_mode_accepts_dpop(httpx_mock: HTTPXMock): dpop_required=True ) - @app.get("/test") async def test_route(claims=Depends(auth0.require_auth())): return {"user": claims["sub"]} @@ -123,6 +95,10 @@ async def test_route(claims=Depends(auth0.require_auth())): assert response.json()["user"] == "user_123" +# ============================================================================= +# DPoP Disabled Mode Tests +# ============================================================================= + @pytest.mark.asyncio async def test_dpop_disabled_mode_rejects_dpop(): """Test that DPoP tokens are rejected when dpop_enabled=False.""" @@ -164,38 +140,9 @@ async def test_route(claims=Depends(auth0.require_auth())): assert json_body["detail"]["error"] == "invalid_request" -@pytest.mark.asyncio -async def test_mixed_mode_accepts_bearer(httpx_mock: HTTPXMock): - """Test that Bearer tokens are accepted in mixed mode (default).""" - setup_mocks(httpx_mock) - - bearer_token = await generate_token( - domain="auth0.local", - user_id="user_123", - audience="", - issuer="https://auth0.local/" - ) - - app = FastAPI() - auth0 = Auth0FastAPI( - domain="auth0.local", - audience="", - dpop_enabled=True, - dpop_required=False # Mixed mode - ) - - @app.get("/test") - async def test_route(claims=Depends(auth0.require_auth())): - return {"user": claims["sub"]} - - client = TestClient(app) - response = client.get( - "/test", - headers={"Authorization": f"Bearer {bearer_token}"} - ) - - assert response.status_code == 200 - assert response.json()["user"] == "user_123" +# ============================================================================= +# Bearer-Only Mode Tests (DPoP disabled) +# ============================================================================= @pytest.mark.asyncio async def test_bearer_only_mode_accepts_bearer(httpx_mock: HTTPXMock): @@ -230,6 +177,10 @@ async def test_route(claims=Depends(auth0.require_auth())): assert response.json()["user"] == "user_123" +# ============================================================================= +# Token/Scheme Mismatch Tests +# ============================================================================= + @pytest.mark.asyncio async def test_dpop_bound_token_with_bearer_scheme_fails(httpx_mock: HTTPXMock): """Test that DPoP-bound tokens fail when using Bearer scheme.""" @@ -263,4 +214,5 @@ async def test_route(claims=Depends(auth0.require_auth())): assert response.status_code == 401 json_body = response.json() + assert json_body["detail"]["error"] == "invalid_token" assert "dpop" in json_body["detail"]["error_description"].lower() diff --git a/tests/test_dpop_authentication.py b/tests/test_dpop_authentication.py index ba43a01..5d1d779 100644 --- a/tests/test_dpop_authentication.py +++ b/tests/test_dpop_authentication.py @@ -8,7 +8,7 @@ from fastapi.testclient import TestClient from fastapi_plugin.fast_api_client import Auth0FastAPI -from fastapi_plugin.test_utils import ( +from test_utils import ( generate_dpop_proof, generate_dpop_bound_token ) @@ -55,7 +55,8 @@ async def test_route(claims=Depends(auth0.require_auth())): @pytest.mark.asyncio async def test_dpop_authentication_missing_dpop_header(httpx_mock: HTTPXMock): - """Test DPoP request fails when DPoP header is missing.""" + """Test DPoP request fails early when DPoP header is missing (before token validation).""" + # No setup_mocks needed - request fails before JWKS lookup access_token = await generate_dpop_bound_token( domain="auth0.local", @@ -84,7 +85,8 @@ async def test_route(claims=Depends(auth0.require_auth())): @pytest.mark.asyncio async def test_dpop_authentication_invalid_dpop_proof(httpx_mock: HTTPXMock): - """Test DPoP request fails with malformed DPoP proof.""" + """Test DPoP request fails early with malformed DPoP proof (before token validation).""" + # No setup_mocks needed - request fails before JWKS lookup access_token = await generate_dpop_bound_token( domain="auth0.local", diff --git a/tests/test_fast_api_client.py b/tests/test_fast_api_client.py deleted file mode 100644 index 3940021..0000000 --- a/tests/test_fast_api_client.py +++ /dev/null @@ -1,332 +0,0 @@ -import pytest -from fastapi import FastAPI, Depends -from pytest_httpx import HTTPXMock -from fastapi.testclient import TestClient - -from fastapi_plugin.fast_api_client import Auth0FastAPI -from fastapi_plugin.test_utils import generate_token, generate_dpop_proof, generate_dpop_bound_token, PUBLIC_DPOP_JWK - - -@pytest.mark.asyncio -async def test_should_return_400_when_no_token(): - """ - should return 400 when no token - """ - app = FastAPI() - auth0 = Auth0FastAPI(domain="auth0.local", audience="") - - @app.get("/test") - async def test_route(claims=Depends(auth0.require_auth())): - return "OK" - - client = TestClient(app) - - response = client.get("/test") - assert response.status_code == 400 - json_body = response.json() - assert json_body["detail"]["error"] == "invalid_request" - - -@pytest.mark.asyncio -async def test_should_return_200_when_valid_token(httpx_mock: HTTPXMock): - """ - This time we mock OIDC discovery & JWKS calls so that our code sees valid data. - - 1. We generate a legitimate token with a 'kid' that matches the JWKS we mock. - 2. We add responses for https://auth0.local/.well-known/openid-configuration and JWKS - so that the plugin accepts the token as valid. - """ - httpx_mock.add_response( - method="GET", - url="https://auth0.local/.well-known/openid-configuration", - json={ - "issuer": "https://auth0.local/", - "jwks_uri": "https://auth0.local/.well-known/jwks.json" - } - ) - httpx_mock.add_response( - method="GET", - url="https://auth0.local/.well-known/jwks.json", - json={ - "keys": [ - { - "kty": "RSA", - "kid": "TEST_KEY", - "n": "whYOFK2Ocbbpb_zVypi9SeKiNUqKQH0zTKN1-6fpCTu6ZalGI82s7XK3tan4dJt90ptUPKD2zvxqTzFNfx4HHHsrYCf2-FMLn1VTJfQazA2BvJqAwcpW1bqRUEty8tS_Yv4hRvWfQPcc2Gc3-_fQOOW57zVy-rNoJc744kb30NjQxdGp03J2S3GLQu7oKtSDDPooQHD38PEMNnITf0pj-KgDPjymkMGoJlO3aKppsjfbt_AH6GGdRghYRLOUwQU-h-ofWHR3lbYiKtXPn5dN24kiHy61e3VAQ9_YAZlwXC_99GGtw_NpghFAuM4P1JDn0DppJldy3PGFC0GfBCZASw", - "e": "AQAB", - "alg": "RS256", - "use": "sig" - }, - PUBLIC_DPOP_JWK - ] - } - ) - - access_token = await generate_token( - domain="auth0.local", - user_id="user_123", - audience="", - issuer="https://auth0.local/", # match the mocked OIDC discovery's issuer - iat=True, - exp=True - ) - - app = FastAPI() - auth0 = Auth0FastAPI(domain="auth0.local", audience="") - - @app.get("/test") - async def test_route(claims=Depends(auth0.require_auth())): - return "OK" - - client = TestClient(app) - - response = client.get( - "/test", - headers={"Authorization": f"Bearer {access_token}"} - ) - - assert response.status_code == 200 - assert response.text == '"OK"' - -@pytest.mark.asyncio -async def test_should_return_401_when_no_iss(httpx_mock: HTTPXMock): - """ - 2) Mocks OIDC & JWKS. Generates a token missing 'iss' (issuer=False). - Expects a 401 from the plugin. - """ - # 1) Mock endpoints - httpx_mock.add_response( - method="GET", - url="https://auth0.local/.well-known/openid-configuration", - json={ - "issuer": "https://auth0.local/", - "jwks_uri": "https://auth0.local/.well-known/jwks.json" - } - ) - httpx_mock.add_response( - method="GET", - url="https://auth0.local/.well-known/jwks.json", - json={ - "keys": [ - { - "kty": "RSA", - "kid": "TEST_KEY", - "n": "whYOFK2Ocbbpb_zVypi9SeKiNUqKQH0zTKN1-6fpCTu6ZalGI82s7XK3tan4dJt90ptUPKD2zvxqTzFNfx4HHHsrYCf2-FMLn1VTJfQazA2BvJqAwcpW1bqRUEty8tS_Yv4hRvWfQPcc2Gc3-_fQOOW57zVy-rNoJc744kb30NjQxdGp03J2S3GLQu7oKtSDDPooQHD38PEMNnITf0pj-KgDPjymkMGoJlO3aKppsjfbt_AH6GGdRghYRLOUwQU-h-ofWHR3lbYiKtXPn5dN24kiHy61e3VAQ9_YAZlwXC_99GGtw_NpghFAuM4P1JDn0DppJldy3PGFC0GfBCZASw", - "e": "AQAB", - "alg": "RS256", - "use": "sig" - }, - PUBLIC_DPOP_JWK - ] - } - ) - - # 2) Generate token missing 'iss' => issuer=False - access_token = await generate_token( - domain="auth0.local", - user_id="user_123", - audience="", - issuer=False, # no iss claim - iat=True, - exp=True - ) - - app = FastAPI() - auth0 = Auth0FastAPI(domain="auth0.local", audience="") - - @app.get("/test") - async def test_route(claims=Depends(auth0.require_auth())): - return "OK" - - client = TestClient(app) - - response = client.get( - "/test", - headers={"Authorization": f"Bearer {access_token}"} - ) - - # Typically we expect a 401 for 'missing iss' or something similar - assert response.status_code == 401 - - - -@pytest.mark.asyncio -async def test_should_return_401_when_invalid_iss(httpx_mock: HTTPXMock): - """ - 3) Mocks OIDC & JWKS. The token sets a different issuer from the plugin's domain, - we expect 401 'invalid issuer'. - """ - httpx_mock.add_response( - method="GET", - url="https://auth0.local/.well-known/openid-configuration", - json={ - "issuer": "https://auth0.local/", - "jwks_uri": "https://auth0.local/.well-known/jwks.json" - } - ) - httpx_mock.add_response( - method="GET", - url="https://auth0.local/.well-known/jwks.json", - json={ - "keys": [ - { - "kty": "RSA", - "kid": "TEST_KEY", - "n": "whYOFK2Ocbbpb_zVypi9SeKiNUqKQH0zTKN1-6fpCTu6ZalGI82s7XK3tan4dJt90ptUPKD2zvxqTzFNfx4HHHsrYCf2-FMLn1VTJfQazA2BvJqAwcpW1bqRUEty8tS_Yv4hRvWfQPcc2Gc3-_fQOOW57zVy-rNoJc744kb30NjQxdGp03J2S3GLQu7oKtSDDPooQHD38PEMNnITf0pj-KgDPjymkMGoJlO3aKppsjfbt_AH6GGdRghYRLOUwQU-h-ofWHR3lbYiKtXPn5dN24kiHy61e3VAQ9_YAZlwXC_99GGtw_NpghFAuM4P1JDn0DppJldy3PGFC0GfBCZASw", - "e": "AQAB", - "alg": "RS256", - "use": "sig" - }, - PUBLIC_DPOP_JWK - ] - } - ) - - # Generate a token with an 'iss' that doesn't match "https://auth0.local/" - access_token = await generate_token( - domain="auth0.local", # for the default if issuer=None, but we override below - user_id="user_123", - audience="", - issuer="https://invalid-issuer.local", # mismatch - iat=True, - exp=True - ) - - app = FastAPI() - auth0 = Auth0FastAPI(domain="auth0.local", audience="") - - @app.get("/test") - async def test_route(claims=Depends(auth0.require_auth())): - return "OK" - - client = TestClient(app) - response = client.get("/test", headers={"Authorization": f"Bearer {access_token}"}) - assert response.status_code == 401 - - - -@pytest.mark.asyncio -async def test_should_return_401_when_no_exp(httpx_mock: HTTPXMock): - """ - 4) Mocks OIDC & JWKS, generates token with exp=False => no 'exp' claim. - The plugin or underlying code sees a missing 'exp' => 401. - """ - httpx_mock.add_response( - method="GET", - url="https://auth0.local/.well-known/openid-configuration", - json={ - "issuer": "https://auth0.local/", - "jwks_uri": "https://auth0.local/.well-known/jwks.json" - } - ) - httpx_mock.add_response( - method="GET", - url="https://auth0.local/.well-known/jwks.json", - json={ - "keys": [ - { - "kty": "RSA", - "kid": "TEST_KEY", - "n": "whYOFK2Ocbbpb_zVypi9SeKiNUqKQH0zTKN1-6fpCTu6ZalGI82s7XK3tan4dJt90ptUPKD2zvxqTzFNfx4HHHsrYCf2-FMLn1VTJfQazA2BvJqAwcpW1bqRUEty8tS_Yv4hRvWfQPcc2Gc3-_fQOOW57zVy-rNoJc744kb30NjQxdGp03J2S3GLQu7oKtSDDPooQHD38PEMNnITf0pj-KgDPjymkMGoJlO3aKppsjfbt_AH6GGdRghYRLOUwQU-h-ofWHR3lbYiKtXPn5dN24kiHy61e3VAQ9_YAZlwXC_99GGtw_NpghFAuM4P1JDn0DppJldy3PGFC0GfBCZASw", - "e": "AQAB", - "alg": "RS256", - "use": "sig" - }, - PUBLIC_DPOP_JWK - ] - } - ) - - access_token = await generate_token( - domain="auth0.local", - user_id="user_123", - audience="", - issuer="https://auth0.local/", - iat=True, - exp=False # skip 'exp' claim - ) - - app = FastAPI() - auth0 = Auth0FastAPI(domain="auth0.local", audience="") - - @app.get("/test") - async def test_route(claims=Depends(auth0.require_auth())): - return "OK" - - client = TestClient(app) - response = client.get("/test", headers={"Authorization": f"Bearer {access_token}"}) - assert response.status_code == 401 - - -@pytest.mark.asyncio -async def test_should_return_403_when_invalid_scope(httpx_mock: HTTPXMock): - """ - 5) Mocks OIDC & JWKS. The token includes scope="invalid", - but the plugin's route requires "valid" scope => 403 'insufficient_scope'. - """ - httpx_mock.add_response( - method="GET", - url="https://auth0.local/.well-known/openid-configuration", - json={ - "issuer": "https://auth0.local/", - "jwks_uri": "https://auth0.local/.well-known/jwks.json" - } - ) - httpx_mock.add_response( - method="GET", - url="https://auth0.local/.well-known/jwks.json", - json={ - "keys": [ - { - "kty": "RSA", - "kid": "TEST_KEY", - "n": "whYOFK2Ocbbpb_zVypi9SeKiNUqKQH0zTKN1-6fpCTu6ZalGI82s7XK3tan4dJt90ptUPKD2zvxqTzFNfx4HHHsrYCf2-FMLn1VTJfQazA2BvJqAwcpW1bqRUEty8tS_Yv4hRvWfQPcc2Gc3-_fQOOW57zVy-rNoJc744kb30NjQxdGp03J2S3GLQu7oKtSDDPooQHD38PEMNnITf0pj-KgDPjymkMGoJlO3aKppsjfbt_AH6GGdRghYRLOUwQU-h-ofWHR3lbYiKtXPn5dN24kiHy61e3VAQ9_YAZlwXC_99GGtw_NpghFAuM4P1JDn0DppJldy3PGFC0GfBCZASw", - "e": "AQAB", - "alg": "RS256", - "use": "sig" - }, - PUBLIC_DPOP_JWK - ] - } - ) - - access_token = await generate_token( - domain="auth0.local", - user_id="user_123", - audience="", - issuer="https://auth0.local/", - iat=True, - exp=True, - claims={"scope": "invalid"} - ) - - app = FastAPI() - auth0 = Auth0FastAPI(domain="auth0.local", audience="") - - @app.get("/test") - async def test_route(claims=Depends(auth0.require_auth(scopes="valid"))): - """ - The plugin or underlying code checks if 'valid' is in token's 'scope'. - Since we only have 'invalid', expect 403 'insufficient_scope'. - """ - return "OK" - - client = TestClient(app) - response = client.get( - "/test", - headers={"Authorization": f"Bearer {access_token}"} - ) - assert response.status_code == 403 - -def test_auth0fastapi_accepts_client_id_and_secret(): - client_id = "test_client_id" - client_secret = "test_client_secret" - auth0 = Auth0FastAPI( - domain="auth0.local", - audience="test_audience", - client_id=client_id, - client_secret=client_secret - ) - options = auth0.api_client.options - assert options.client_id == client_id - assert options.client_secret == client_secret \ No newline at end of file diff --git a/tests/test_reverse_proxy.py b/tests/test_reverse_proxy.py index 23b44b5..23ff512 100644 --- a/tests/test_reverse_proxy.py +++ b/tests/test_reverse_proxy.py @@ -9,7 +9,7 @@ from fastapi_plugin.fast_api_client import Auth0FastAPI from fastapi_plugin.utils import get_canonical_url -from fastapi_plugin.test_utils import ( +from test_utils import ( generate_dpop_proof, generate_dpop_bound_token ) @@ -197,6 +197,7 @@ async def test_route(claims=Depends(auth0.require_auth())): # Should still work - trailing slash should be stripped assert response.status_code == 200 + assert response.json()["user"] == "user_123" @pytest.mark.asyncio @@ -239,11 +240,12 @@ async def test_route(claims=Depends(auth0.require_auth())): ) assert response.status_code == 200 - assert response.json()["user"] == "user_123" + json_response = response.json() + assert json_response["user"] == "user_123" @pytest.mark.asyncio -async def test_reverse_proxy_with_query_params(httpx_mock: HTTPXMock): +async def test_reverse_proxy_with_path_prefix(httpx_mock: HTTPXMock): """Test that query parameters are preserved in canonical URL.""" setup_mocks(httpx_mock) @@ -325,32 +327,7 @@ async def test_route(claims=Depends(auth0.require_auth())): ) assert response.status_code == 200 - - -def test_get_canonical_url_without_proxy(): - """Test get_canonical_url returns direct URL when trust_proxy=False.""" - app = FastAPI() - # trust_proxy defaults to False - - @app.get("/test") - async def test_route(request: Request): - canonical_url = get_canonical_url(request) - return {"url": canonical_url} - - client = TestClient(app) - response = client.get( - "/test", - headers={ - "X-Forwarded-Proto": "https", - "X-Forwarded-Host": "evil.com" # Should be ignored - } - ) - - assert response.status_code == 200 - url = response.json()["url"] - # Should use testserver, not evil.com - assert "testserver" in url - assert "evil.com" not in url + assert response.json()["user"] == "user_123" def test_get_canonical_url_with_proxy(): diff --git a/fastapi_plugin/test_utils.py b/tests/test_utils.py similarity index 100% rename from fastapi_plugin/test_utils.py rename to tests/test_utils.py From 5fcc91f5b6706fe38d8e49ef5ca9fd036e71e5b2 Mon Sep 17 00:00:00 2001 From: Snehil Kishore Date: Tue, 21 Oct 2025 19:55:13 +0530 Subject: [PATCH 5/9] docs: replace Sphinx docs workflow with markdown examples and update README --- .github/workflows/docs.yml | 59 ----------------------- EXAMPLES.md | 97 ++++++++++++++++++++++++++++++++++++++ README.md | 10 ++-- 3 files changed, 102 insertions(+), 64 deletions(-) delete mode 100644 .github/workflows/docs.yml create mode 100644 EXAMPLES.md diff --git a/.github/workflows/docs.yml b/.github/workflows/docs.yml deleted file mode 100644 index 5ebecb1..0000000 --- a/.github/workflows/docs.yml +++ /dev/null @@ -1,59 +0,0 @@ -name: Build Documentation - -on: - push: - branches: - - main - -permissions: - contents: read - pages: write - id-token: write - -concurrency: - group: "documentation" - cancel-in-progress: true - -jobs: - build: - runs-on: ubuntu-latest - steps: - - name: Checkout - uses: actions/checkout@v5 - - - name: Setup Pages - uses: actions/configure-pages@v5 - - - name: Configure Python - uses: actions/setup-python@v6 - with: - python-version: "3.8" - - - name: Configure dependencies - run: | - pip install --user --upgrade pip - pip install --user pipx - pipx ensurepath - pipx install sphinx==5.3.0 - pipx inject sphinx pyjwt cryptography sphinx-mdinclude sphinx-rtd-theme sphinx-autodoc-typehints - - - name: Build documentation - run: | - sphinx-build ./docs/source ./docs/build --keep-going -n -a -b html - - - name: Upload artifact - uses: actions/upload-pages-artifact@v4 - with: - path: "./docs/build" - - deploy: - needs: build - runs-on: ubuntu-latest - environment: - name: "github-pages" - url: ${{ steps.deployment.outputs.page_url }} - - steps: - - id: deployment - name: Deploy to GitHub Pages - uses: actions/deploy-pages@v4 diff --git a/EXAMPLES.md b/EXAMPLES.md new file mode 100644 index 0000000..a014820 --- /dev/null +++ b/EXAMPLES.md @@ -0,0 +1,97 @@ +# Auth0 FastAPI-API Examples + +This document provides examples for using the `auth0-fastapi-api` package to secure your FastAPI applications with Auth0. + +## Bearer Authentication + +```python +from fastapi import FastAPI, Depends +from fastapi_plugin.fast_api_client import Auth0FastAPI + +app = FastAPI() +auth0 = Auth0FastAPI( + domain="your-domain.auth0.com", + audience="your-api-identifier" +) + +@app.get("/api/protected") +async def protected_route(claims=Depends(auth0.require_auth())): + return {"user_id": claims["sub"]} +``` + +```bash +curl -H "Authorization: Bearer YOUR_ACCESS_TOKEN" \ + http://localhost:8000/api/protected +``` + +## DPoP Authentication + +> [!NOTE] +> DPoP is in Early Access. Contact Auth0 support to enable it. + +**Mixed Mode (default)** - Accept both Bearer and DPoP: + +```python +auth0 = Auth0FastAPI( + domain="your-domain.auth0.com", + audience="your-api-identifier", + dpop_enabled=True, # Default + dpop_required=False # Default +) +``` + +```bash +# DPoP request +curl -H "Authorization: DPoP YOUR_ACCESS_TOKEN" \ + -H "DPoP: YOUR_DPOP_PROOF_JWT" \ + http://localhost:8000/api/protected +``` + +**DPoP Required Mode** - Reject Bearer tokens: + +```python +auth0 = Auth0FastAPI( + domain="your-domain.auth0.com", + audience="your-api-identifier", + dpop_required=True +) +``` + +**Bearer-Only Mode** - Disable DPoP: + +```python +auth0 = Auth0FastAPI( + domain="your-domain.auth0.com", + audience="your-api-identifier", + dpop_enabled=False +) +``` + +## Scope Validation + +```python +@app.get("/api/admin") +async def admin_route(claims=Depends(auth0.require_auth(scopes=["admin:access"]))): + return {"message": "Admin access granted"} + +@app.delete("/api/resource") +async def delete_route( + claims=Depends(auth0.require_auth(scopes=["delete:data", "admin:access"])) +): + """Requires BOTH scopes.""" + return {"message": "Resource deleted"} +``` + +## Reverse Proxy Support + +Enable X-Forwarded-* header trust for DPoP behind proxies: + +```python +app = FastAPI() +app.state.trust_proxy = True # Required for load balancers/CDN + +auth0 = Auth0FastAPI( + domain="your-domain.auth0.com", + audience="your-api-identifier" +) +``` diff --git a/README.md b/README.md index b2b7a78..972e605 100644 --- a/README.md +++ b/README.md @@ -7,7 +7,7 @@ ## Documentation - [QuickStart](https://auth0.com/docs/quickstart/webapp/fastapi)- our guide for adding Auth0 to your Fastapi app. -- [Examples](https://github.com/auth0/auth0-server-python/blob/main/packages/auth0_server_python/EXAMPLES.md) - examples for your different use cases. +- [Examples](https://github.com/auth0/auth0-fastapi-api/blob/main/EXAMPLES.md) - examples for your different use cases. - [Docs Site](https://auth0.com/docs) - explore our docs site and learn more about Auth0. ## Getting Started @@ -33,7 +33,7 @@ In your FastAPI application, create an instance of the `Auth0FastAPI` class. Sup - The `AUTH0_AUDIENCE` is the identifier of the API that is being called. You can find this in the API section of the Auth0 dashboard. ```python -from fastapi_plugin import Auth0FastAPI +from fastapi_plugin.fast_api_client import Auth0FastAPI # Create the Auth0 integration auth0 = Auth0FastAPI( @@ -79,7 +79,7 @@ When deploying behind a reverse proxy (nginx, AWS ALB, etc.), you **must** enabl ```python from fastapi import FastAPI -from fastapi_plugin import Auth0FastAPI +from fastapi_plugin.fast_api_client import Auth0FastAPI app = FastAPI() @@ -160,7 +160,7 @@ You can parse or validate these claims however you like in your application code In case you don't need to use the `claims` dictionary in your endpoint you can also use the dependency as part of the path decorator. For example: ```python -@app.get("/protected", dependencies=Depends(auth0.require_auth())) +@app.get("/protected", dependencies=[Depends(auth0.require_auth())]) async def protected(): # Protected endpoint return {"msg": "You need to have an access token to see this endpoint."} @@ -264,7 +264,7 @@ If you need to get an access token for an upstream idp via a connection, you can ```python import asyncio -from auth0_fastapi_api import Auth0FastAPI +from fastapi_plugin.fast_api_client import Auth0FastAPI async def main(): auth0 = Auth0FastAPI( From d82e5eb799f7a0a6558d96c6828218004f6d046d Mon Sep 17 00:00:00 2001 From: Snehil Kishore Date: Fri, 24 Oct 2025 17:33:28 +0530 Subject: [PATCH 6/9] refactor: added internal servererror execption --- fastapi_plugin/fast_api_client.py | 6 +++--- fastapi_plugin/utils.py | 35 +++++++++---------------------- 2 files changed, 13 insertions(+), 28 deletions(-) diff --git a/fastapi_plugin/fast_api_client.py b/fastapi_plugin/fast_api_client.py index 69bb458..fa23ffb 100644 --- a/fastapi_plugin/fast_api_client.py +++ b/fastapi_plugin/fast_api_client.py @@ -81,9 +81,9 @@ async def _dependency(request: Request) -> Dict: except Exception as e: # Handle any unexpected errors raise http_exception( - status_code=400, - error="invalid_request", - error_desc=str(e) + status_code=500, + error="internal_server_error", + error_desc="An unexpected error occurred during authentication" ) # If scopes needed, validate diff --git a/fastapi_plugin/utils.py b/fastapi_plugin/utils.py index 4c858d5..8a8298d 100644 --- a/fastapi_plugin/utils.py +++ b/fastapi_plugin/utils.py @@ -3,28 +3,6 @@ from starlette.responses import Response from urllib.parse import urlparse, urlunparse - - -def get_bearer_token(request: Request) -> Optional[str]: - """ - Parse 'Authorization: Bearer ' from the incoming request. - Returns the token string or None if missing/invalid. - - DEPRECATED: This function is no longer used in the main auth flow. - The ApiClient.verify_request() method handles token extraction automatically. - """ - auth_header = request.headers.get("authorization") - if not auth_header: - return None - - parts = auth_header.split() - if len(parts) == 2 and parts[0].lower() == "bearer": - return parts[1] - return None - - -# ===== ACTIVE UTILITY FUNCTIONS ===== - def http_exception( status_code: int, error: str, @@ -109,7 +87,9 @@ def get_canonical_url(request: Request) -> str: # X-Forwarded-Proto: Override scheme if present forwarded_proto = request.headers.get("x-forwarded-proto") if forwarded_proto: - scheme = forwarded_proto.strip().lower() + proto = forwarded_proto.strip().lower() + if proto in ("http", "https"): + scheme = proto # X-Forwarded-Host: Override host, handling multiple proxies forwarded_host = request.headers.get("x-forwarded-host") @@ -119,8 +99,13 @@ def get_canonical_url(request: Request) -> str: # X-Forwarded-Prefix: Prepend path prefix forwarded_prefix = request.headers.get("x-forwarded-prefix", "").strip() - if forwarded_prefix: - # Remove trailing slash from prefix to avoid double slashes + if forwarded_prefix and not any([ + ".." in forwarded_prefix, forwarded_prefix.startswith("//"), + ":" in forwarded_prefix, "\x00" in forwarded_prefix, + "%2e%2e" in forwarded_prefix.lower() + ]): + if not forwarded_prefix.startswith("/"): + forwarded_prefix = "/" + forwarded_prefix path = forwarded_prefix.rstrip("/") + path canonical_url = urlunparse(( From dc53437252cf8b09503033e0858a222252557267 Mon Sep 17 00:00:00 2001 From: Snehil Kishore Date: Tue, 4 Nov 2025 20:27:55 +0530 Subject: [PATCH 7/9] refactor:addressing feedbacks --- README.md | 2 +- fastapi_plugin/utils.py | 1 + tests/test_reverse_proxy.py | 44 ------------------------------------- 3 files changed, 2 insertions(+), 45 deletions(-) diff --git a/README.md b/README.md index d218020..593340c 100644 --- a/README.md +++ b/README.md @@ -53,7 +53,7 @@ auth0 = Auth0FastAPI( domain="", audience="", dpop_enabled=True, # Enable DPoP support - dpop_required=False # Allow Bearer tokens too + dpop_required=False # Allow Bearer tokens too (mixed mode) ) # DPoP-only mode - rejects Bearer tokens diff --git a/fastapi_plugin/utils.py b/fastapi_plugin/utils.py index 8a8298d..dbec367 100644 --- a/fastapi_plugin/utils.py +++ b/fastapi_plugin/utils.py @@ -75,6 +75,7 @@ def get_canonical_url(request: Request) -> str: Canonical URL string matching what the client used """ + # Start with the direct connection URL parsed = urlparse(str(request.url)) # Default to direct request values diff --git a/tests/test_reverse_proxy.py b/tests/test_reverse_proxy.py index 23ff512..1f2e1a4 100644 --- a/tests/test_reverse_proxy.py +++ b/tests/test_reverse_proxy.py @@ -243,50 +243,6 @@ async def test_route(claims=Depends(auth0.require_auth())): json_response = response.json() assert json_response["user"] == "user_123" - -@pytest.mark.asyncio -async def test_reverse_proxy_with_path_prefix(httpx_mock: HTTPXMock): - """Test that query parameters are preserved in canonical URL.""" - setup_mocks(httpx_mock) - - access_token = await generate_dpop_bound_token( - domain="auth0.local", - user_id="user_123", - audience="", - issuer="https://auth0.local/" - ) - - dpop_proof = await generate_dpop_proof( - http_method="GET", - http_url="https://api.example.com/test?page=1&limit=10", - access_token=access_token - ) - - app = FastAPI() - app.state.trust_proxy = True - - auth0 = Auth0FastAPI(domain="auth0.local", audience="", dpop_enabled=True) - - @app.get("/test") - async def test_route(claims=Depends(auth0.require_auth())): - return {"user": claims["sub"]} - - client = TestClient(app) - - response = client.get( - "/test?page=1&limit=10", - headers={ - "Authorization": f"DPoP {access_token}", - "DPoP": dpop_proof, - "X-Forwarded-Proto": "https", - "X-Forwarded-Host": "api.example.com" - } - ) - - assert response.status_code == 200 - assert response.json()["user"] == "user_123" - - @pytest.mark.asyncio async def test_reverse_proxy_partial_headers(httpx_mock: HTTPXMock): """Test with only X-Forwarded-Proto (partial headers).""" From 589561b88267e19bd8311601b951d8ab9619178d Mon Sep 17 00:00:00 2001 From: Snehil Kishore Date: Tue, 4 Nov 2025 23:45:10 +0530 Subject: [PATCH 8/9] refactor: remove tests from distribution --- pyproject.toml | 5 ++--- tests/__init__.py | 5 +++++ 2 files changed, 7 insertions(+), 3 deletions(-) create mode 100644 tests/__init__.py diff --git a/pyproject.toml b/pyproject.toml index f4433ce..c02c4ed 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -7,9 +7,8 @@ license = "MIT" readme = "README.md" packages = [ - { include = "fastapi_plugin" }, - { include = "tests", format = "sdist" } -] + { include = "fastapi_plugin" } + ] [tool.poetry.dependencies] python = "^3.9" diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..faea64d --- /dev/null +++ b/tests/__init__.py @@ -0,0 +1,5 @@ +""" +Auth0 FastAPI API Test Suite + +Test package for DPoP and Bearer authentication flows. +""" From 277d5ba033dcc16b227119c17dd6edde9e24a0f0 Mon Sep 17 00:00:00 2001 From: Snehil Kishore Date: Wed, 5 Nov 2025 00:01:50 +0530 Subject: [PATCH 9/9] refactor: update test imports to use relative paths --- tests/conftest.py | 2 +- tests/test_bearer_token_validation.py | 4 ++-- tests/test_configuration_modes.py | 4 ++-- tests/test_dpop_authentication.py | 4 ++-- tests/test_reverse_proxy.py | 4 ++-- 5 files changed, 9 insertions(+), 9 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index ab2ac67..553793a 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,5 +1,5 @@ from pytest_httpx import HTTPXMock -from test_utils import PUBLIC_DPOP_JWK +from .test_utils import PUBLIC_DPOP_JWK def setup_mocks(httpx_mock: HTTPXMock): diff --git a/tests/test_bearer_token_validation.py b/tests/test_bearer_token_validation.py index 0855ed1..eaec640 100644 --- a/tests/test_bearer_token_validation.py +++ b/tests/test_bearer_token_validation.py @@ -8,8 +8,8 @@ from fastapi.testclient import TestClient from fastapi_plugin.fast_api_client import Auth0FastAPI -from test_utils import generate_token -from conftest import setup_mocks +from .test_utils import generate_token +from .conftest import setup_mocks # ============================================================================= diff --git a/tests/test_configuration_modes.py b/tests/test_configuration_modes.py index 2e22497..2b0cef4 100644 --- a/tests/test_configuration_modes.py +++ b/tests/test_configuration_modes.py @@ -8,12 +8,12 @@ from fastapi.testclient import TestClient from fastapi_plugin.fast_api_client import Auth0FastAPI -from test_utils import ( +from .test_utils import ( generate_token, generate_dpop_proof, generate_dpop_bound_token, ) -from conftest import setup_mocks +from .conftest import setup_mocks # ============================================================================= diff --git a/tests/test_dpop_authentication.py b/tests/test_dpop_authentication.py index 5d1d779..2f69d6e 100644 --- a/tests/test_dpop_authentication.py +++ b/tests/test_dpop_authentication.py @@ -8,11 +8,11 @@ from fastapi.testclient import TestClient from fastapi_plugin.fast_api_client import Auth0FastAPI -from test_utils import ( +from .test_utils import ( generate_dpop_proof, generate_dpop_bound_token ) -from conftest import setup_mocks +from .conftest import setup_mocks @pytest.mark.asyncio diff --git a/tests/test_reverse_proxy.py b/tests/test_reverse_proxy.py index 1f2e1a4..87c6037 100644 --- a/tests/test_reverse_proxy.py +++ b/tests/test_reverse_proxy.py @@ -9,11 +9,11 @@ from fastapi_plugin.fast_api_client import Auth0FastAPI from fastapi_plugin.utils import get_canonical_url -from test_utils import ( +from .test_utils import ( generate_dpop_proof, generate_dpop_bound_token ) -from conftest import setup_mocks +from .conftest import setup_mocks @pytest.mark.asyncio