diff --git a/.github/workflows/docs.yml b/.github/workflows/docs.yml deleted file mode 100644 index 5ebecb1..0000000 --- a/.github/workflows/docs.yml +++ /dev/null @@ -1,59 +0,0 @@ -name: Build Documentation - -on: - push: - branches: - - main - -permissions: - contents: read - pages: write - id-token: write - -concurrency: - group: "documentation" - cancel-in-progress: true - -jobs: - build: - runs-on: ubuntu-latest - steps: - - name: Checkout - uses: actions/checkout@v5 - - - name: Setup Pages - uses: actions/configure-pages@v5 - - - name: Configure Python - uses: actions/setup-python@v6 - with: - python-version: "3.8" - - - name: Configure dependencies - run: | - pip install --user --upgrade pip - pip install --user pipx - pipx ensurepath - pipx install sphinx==5.3.0 - pipx inject sphinx pyjwt cryptography sphinx-mdinclude sphinx-rtd-theme sphinx-autodoc-typehints - - - name: Build documentation - run: | - sphinx-build ./docs/source ./docs/build --keep-going -n -a -b html - - - name: Upload artifact - uses: actions/upload-pages-artifact@v4 - with: - path: "./docs/build" - - deploy: - needs: build - runs-on: ubuntu-latest - environment: - name: "github-pages" - url: ${{ steps.deployment.outputs.page_url }} - - steps: - - id: deployment - name: Deploy to GitHub Pages - uses: actions/deploy-pages@v4 diff --git a/.gitignore b/.gitignore index fe90143..4825a42 100644 --- a/.gitignore +++ b/.gitignore @@ -22,6 +22,7 @@ server.py setup.py test.py test-script.py +integration_test*.py .coverage coverage.xml diff --git a/EXAMPLES.md b/EXAMPLES.md new file mode 100644 index 0000000..a014820 --- /dev/null +++ b/EXAMPLES.md @@ -0,0 +1,97 @@ +# Auth0 FastAPI-API Examples + +This document provides examples for using the `auth0-fastapi-api` package to secure your FastAPI applications with Auth0. + +## Bearer Authentication + +```python +from fastapi import FastAPI, Depends +from fastapi_plugin.fast_api_client import Auth0FastAPI + +app = FastAPI() +auth0 = Auth0FastAPI( + domain="your-domain.auth0.com", + audience="your-api-identifier" +) + +@app.get("/api/protected") +async def protected_route(claims=Depends(auth0.require_auth())): + return {"user_id": claims["sub"]} +``` + +```bash +curl -H "Authorization: Bearer YOUR_ACCESS_TOKEN" \ + http://localhost:8000/api/protected +``` + +## DPoP Authentication + +> [!NOTE] +> DPoP is in Early Access. Contact Auth0 support to enable it. + +**Mixed Mode (default)** - Accept both Bearer and DPoP: + +```python +auth0 = Auth0FastAPI( + domain="your-domain.auth0.com", + audience="your-api-identifier", + dpop_enabled=True, # Default + dpop_required=False # Default +) +``` + +```bash +# DPoP request +curl -H "Authorization: DPoP YOUR_ACCESS_TOKEN" \ + -H "DPoP: YOUR_DPOP_PROOF_JWT" \ + http://localhost:8000/api/protected +``` + +**DPoP Required Mode** - Reject Bearer tokens: + +```python +auth0 = Auth0FastAPI( + domain="your-domain.auth0.com", + audience="your-api-identifier", + dpop_required=True +) +``` + +**Bearer-Only Mode** - Disable DPoP: + +```python +auth0 = Auth0FastAPI( + domain="your-domain.auth0.com", + audience="your-api-identifier", + dpop_enabled=False +) +``` + +## Scope Validation + +```python +@app.get("/api/admin") +async def admin_route(claims=Depends(auth0.require_auth(scopes=["admin:access"]))): + return {"message": "Admin access granted"} + +@app.delete("/api/resource") +async def delete_route( + claims=Depends(auth0.require_auth(scopes=["delete:data", "admin:access"])) +): + """Requires BOTH scopes.""" + return {"message": "Resource deleted"} +``` + +## Reverse Proxy Support + +Enable X-Forwarded-* header trust for DPoP behind proxies: + +```python +app = FastAPI() +app.state.trust_proxy = True # Required for load balancers/CDN + +auth0 = Auth0FastAPI( + domain="your-domain.auth0.com", + audience="your-api-identifier" +) +``` diff --git a/README.md b/README.md index 574903c..593340c 100644 --- a/README.md +++ b/README.md @@ -8,7 +8,7 @@ ## Documentation - [QuickStart](https://auth0.com/docs/quickstart/webapp/fastapi)- our guide for adding Auth0 to your Fastapi app. -- [Examples](https://github.com/auth0/auth0-server-python/blob/main/packages/auth0_server_python/EXAMPLES.md) - examples for your different use cases. +- [Examples](https://github.com/auth0/auth0-fastapi-api/blob/main/EXAMPLES.md) - examples for your different use cases. - [Docs Site](https://auth0.com/docs) - explore our docs site and learn more about Auth0. ## Getting Started @@ -34,7 +34,7 @@ In your FastAPI application, create an instance of the `Auth0FastAPI` class. Sup - The `AUTH0_AUDIENCE` is the identifier of the API that is being called. You can find this in the API section of the Auth0 dashboard. ```python -from fastapi_plugin import Auth0FastAPI +from fastapi_plugin.fast_api_client import Auth0FastAPI # Create the Auth0 integration auth0 = Auth0FastAPI( @@ -43,9 +43,78 @@ auth0 = Auth0FastAPI( ) ``` +#### DPoP Configuration (Optional) + +The SDK supports DPoP (Demonstrating Proof-of-Possession) for enhanced security: + +```python +# Mixed mode - accepts both Bearer and DPoP (recommended for migration) +auth0 = Auth0FastAPI( + domain="", + audience="", + dpop_enabled=True, # Enable DPoP support + dpop_required=False # Allow Bearer tokens too (mixed mode) +) + +# DPoP-only mode - rejects Bearer tokens +auth0 = Auth0FastAPI( + domain="", + audience="", + dpop_enabled=True, + dpop_required=True # Only accept DPoP tokens +) + +# Custom DPoP timing configuration +auth0 = Auth0FastAPI( + domain="", + audience="", + dpop_enabled=True, + dpop_iat_leeway=30, # Clock skew tolerance (seconds) + dpop_iat_offset=300 # Maximum DPoP proof age (seconds) +) +``` + +#### Reverse Proxy Configuration + +When deploying behind a reverse proxy (nginx, AWS ALB, etc.), you **must** enable proxy trust for DPoP validation to work correctly: + +```python +from fastapi import FastAPI +from fastapi_plugin.fast_api_client import Auth0FastAPI + +app = FastAPI() + +# CRITICAL: Enable proxy trust when behind a reverse proxy +app.state.trust_proxy = True + +auth0 = Auth0FastAPI( + domain="", + audience="", + dpop_enabled=True +) +``` + +**Why this matters:** +- DPoP validation requires matching the exact URL the client used +- Behind a proxy, your app sees internal URLs (e.g., `http://localhost:8000/api`) +- The client's DPoP proof contains the public URL (e.g., `https://api.example.com/api`) +- Without `trust_proxy=True`, validation will fail + +**Note:** Only enable `trust_proxy=True` when your app is actually behind a trusted reverse proxy. Never enable this for direct internet-facing deployments, as it would allow header injection attacks. + +**Nginx Configuration Example:** +```nginx +location /api { + proxy_pass http://backend:8000; + proxy_set_header X-Forwarded-Proto $scheme; + proxy_set_header X-Forwarded-Host $host; + proxy_set_header X-Forwarded-Prefix /api; +} +``` + ### 3. Protecting API Routes -To protect a FastAPI route, use the `require_auth(...)` dependency. Any incoming requests must include a valid Bearer token (JWT) in the `Authorization` header, or they will receive an error response (e.g., 401 Unauthorized). +To protect a FastAPI route, use the `require_auth(...)` dependency. The SDK automatically detects and validates both Bearer and DPoP authentication schemes. Any incoming requests must include a valid token in the `Authorization` header, or they will receive an error response (e.g., 401 Unauthorized). ```python @app.get("/protected-api") @@ -59,9 +128,18 @@ async def protected( #### How It Works +**Bearer Authentication:** 1. The user sends a request with `Authorization: Bearer `. -2. The SDK parses the token, checks signature via Auth0’s JWKS, validates standard claims like `iss`, `aud`, `exp`, etc. -3. If valid, the route receives the decoded claims as a Python dict (e.g. `{"sub": "user123", "scope": "read:stuff", ...}`). +2. The SDK parses the token, checks signature via Auth0's JWKS, validates standard claims like `iss`, `aud`, `exp`, etc. +3. If valid, the route receives the decoded claims as a Python dict. + +**DPoP Authentication:** +1. The user sends a request with `Authorization: DPoP ` and `DPoP: ` headers. +2. The SDK validates both the access token and the DPoP proof, including cryptographic binding. +3. DPoP provides enhanced security through proof-of-possession of private keys. +4. If valid, the route receives the decoded claims as a Python dict. + +The SDK automatically detects which authentication scheme is being used and validates accordingly. > [!IMPORTANT] > This method protects API endpoints using bearer tokens. It does not **create or manage user sessions in server-side rendering scenarios**. For session-based usage, consider a separate library or approach. @@ -83,7 +161,7 @@ You can parse or validate these claims however you like in your application code In case you don't need to use the `claims` dictionary in your endpoint you can also use the dependency as part of the path decorator. For example: ```python -@app.get("/protected", dependencies=Depends(auth0.require_auth())) +@app.get("/protected", dependencies=[Depends(auth0.require_auth())]) async def protected(): # Protected endpoint return {"msg": "You need to have an access token to see this endpoint."} @@ -135,14 +213,59 @@ def test_public_route(): -### 5. Get an access token for a connection +### 5. DPoP Authentication + +> **Note**: DPoP is currently in [Early Access](https://auth0.com/docs/troubleshoot/product-lifecycle/product-release-stages). Contact Auth0 support to enable it for your tenant. + +DPoP (Demonstrating Proof-of-Possession) provides enhanced security by binding access tokens to cryptographic proof of possession. + +#### Client Requirements + +To use DPoP authentication, clients must: + +1. **Generate an ES256 key pair** for DPoP proof signing +2. **Include two headers** in requests: + - `Authorization: DPoP ` - The DPoP-bound access token + - `DPoP: ` - The DPoP proof JWT + +#### Example DPoP Request + +```http +GET /protected-api HTTP/1.1 +Host: api.example.com +Authorization: DPoP eyJ0eXAiOiJKV1Q... +DPoP: eyJ0eXAiOiJkcG9wK2p3dC... +``` + +#### Migration Strategy + +Use mixed mode for gradual migration: + +```python +# Start with mixed mode to support both Bearer and DPoP +auth0 = Auth0FastAPI( + domain="", + audience="", + dpop_enabled=True, + dpop_required=False # Allows both Bearer and DPoP +) + +# Later, enforce DPoP-only +auth0 = Auth0FastAPI( + domain="", + audience="", + dpop_required=True # Rejects Bearer tokens +) +``` + +### 6. Get an access token for a connection If you need to get an access token for an upstream idp via a connection, you can use the `get_access_token_for_connection` method on the underlying api_client: ```python import asyncio -from auth0_fastapi_api import Auth0FastAPI +from fastapi_plugin.fast_api_client import Auth0FastAPI async def main(): auth0 = Auth0FastAPI( diff --git a/fastapi_plugin/fast_api_client.py b/fastapi_plugin/fast_api_client.py index 3da89eb..fa23ffb 100644 --- a/fastapi_plugin/fast_api_client.py +++ b/fastapi_plugin/fast_api_client.py @@ -2,9 +2,9 @@ from fastapi import Request, HTTPException from starlette.responses import Response -from .utils import get_bearer_token, validate_scopes, http_exception +from .utils import validate_scopes, http_exception, get_canonical_url -from auth0_api_python.api_client import ApiClient, ApiClientOptions, VerifyAccessTokenError +from auth0_api_python.api_client import ApiClient, ApiClientOptions, BaseAuthError class Auth0FastAPI: @@ -13,11 +13,25 @@ class Auth0FastAPI: mirroring the concept from TypeScript's Fastify plugin. """ - def __init__(self, domain: str, audience: str, client_id=None, client_secret=None, custom_fetch=None): + def __init__( + self, + domain: str, + audience: str, + client_id=None, + client_secret=None, + custom_fetch=None, + dpop_enabled=True, + dpop_required=False, + dpop_iat_leeway=30, + dpop_iat_offset=300): """ domain: Your Auth0 domain (like 'my-tenant.us.auth0.com') audience: API identifier from the Auth0 Dashboard custom_fetch: optional HTTP fetch override for the underlying SDK + dpop_enabled: Enable DPoP support (default: True) + dpop_required: Require DPoP authentication, reject Bearer tokens (default: False) + dpop_iat_leeway: Clock skew tolerance for DPoP proof iat claim in seconds (default: 30) + dpop_iat_offset: Maximum DPoP proof age in seconds (default: 300) """ if not domain: raise ValueError("domain is required.") @@ -25,7 +39,17 @@ def __init__(self, domain: str, audience: str, client_id=None, client_secret=Non raise ValueError("audience is required.") self.api_client = ApiClient( - ApiClientOptions(domain=domain, audience=audience, client_id=client_id, client_secret=client_secret, custom_fetch=custom_fetch) + ApiClientOptions( + domain=domain, + audience=audience, + client_id=client_id, + client_secret=client_secret, + custom_fetch=custom_fetch, + dpop_enabled=dpop_enabled, + dpop_required=dpop_required, + dpop_iat_leeway=dpop_iat_leeway, + dpop_iat_offset=dpop_iat_offset + ) ) def require_auth( @@ -34,28 +58,32 @@ def require_auth( ): """ Returns an async FastAPI dependency that: - 1) Extracts a 'Bearer' token from the Authorization header - 2) Verifies it with auth0-api-python + 1) Uses the unified verify_request() method to auto-detect Bearer or DPoP authentication + 2) Verifies the request with auth0-api-python including full HTTP context 3) If 'scopes' is provided, checks for them in the token's 'scope' claim 4) Raises HTTPException on error 5) On success, returns the decoded claims """ async def _dependency(request: Request) -> Dict: - token = get_bearer_token(request) - if not token: - # No Authorization provided + try: + claims = await self.api_client.verify_request( + headers=dict(request.headers), + http_method=request.method, + http_url=get_canonical_url(request) + ) + except BaseAuthError as e: raise http_exception( - 400, - "invalid_request", - "No Authorization provided" + status_code=e.get_status_code(), + error=e.get_error_code(), + error_desc=e.get_error_description(), + headers=e.get_headers() ) - try: - claims = await self.api_client.verify_access_token(access_token=token) - except VerifyAccessTokenError as e: + except Exception as e: + # Handle any unexpected errors raise http_exception( - status_code=401, - error="invalid_token", - error_desc=str(e) + status_code=500, + error="internal_server_error", + error_desc="An unexpected error occurred during authentication" ) # If scopes needed, validate diff --git a/fastapi_plugin/test_utils.py b/fastapi_plugin/test_utils.py deleted file mode 100644 index 01199de..0000000 --- a/fastapi_plugin/test_utils.py +++ /dev/null @@ -1,74 +0,0 @@ -import time -from typing import Optional, Dict, Any, Union -from authlib.jose import JsonWebKey, jwt - - -# A private RSA JWK for test usage. - -PRIVATE_JWK = { - "kty": "RSA", - "alg": "RS256", - "use": "sig", - "kid": "TEST_KEY", - "n": "whYOFK2Ocbbpb_zVypi9SeKiNUqKQH0zTKN1-6fpCTu6ZalGI82s7XK3tan4dJt90ptUPKD2zvxqTzFNfx4HHHsrYCf2-FMLn1VTJfQazA2BvJqAwcpW1bqRUEty8tS_Yv4hRvWfQPcc2Gc3-_fQOOW57zVy-rNoJc744kb30NjQxdGp03J2S3GLQu7oKtSDDPooQHD38PEMNnITf0pj-KgDPjymkMGoJlO3aKppsjfbt_AH6GGdRghYRLOUwQU-h-ofWHR3lbYiKtXPn5dN24kiHy61e3VAQ9_YAZlwXC_99GGtw_NpghFAuM4P1JDn0DppJldy3PGFC0GfBCZASw", - "e": "AQAB", - "d": "VuVE_KEP6323WjpbBdAIv7HGahGrgGANvbxZsIhm34lsVOPK0XDegZkhAybMZHjRhp-gwVxX5ChC-J3cUpOBH5FNxElgW6HizD2Jcq6t6LoLYgPSrfEHm71iHg8JsgrqfUnGYFzMJmv88C6WdCtpgG_qJV1K00_Ly1G1QKoBffEs-v4fAMJrCbUdCz1qWto-PU-HLMEo-krfEpGgcmtZeRlDADh8cETMQlgQfQX2VWq_aAP4a1SXmo-j0cvRU4W5Fj0RVwNesIpetX2ZFz4p_JmB5sWFEj_fC7h5z2lq-6Bme2T3BHtXkIxoBW0_pYVnASC8P2puO5FnVxDmWuHDYQ", - "p": "07rgXd_tLUhVRF_g1OaqRZh5uZ8hiLWUSU0vu9coOaQcatSqjQlIwLW8UdKv_38GrmpIfgcEVQjzq6rFBowUm9zWBO9Eq6enpasYJBOeD8EMeDK-nsST57HjPVOCvoVC5ZX-cozPXna3iRNZ1TVYBY3smn0IaxysIK-zxESf4pM", - "q": "6qrE9TPhCS5iNR7QrKThunLu6t4H_8CkYRPLbvOIt2MgZyPLiZCsvdkTVSOX76QQEXt7Y0nTNua69q3K3Jhf-YOkPSJsWTxgrfOnjoDvRKzbW3OExIMm7D99fVBODuNWinjYgUwGSqGAsb_3TKhtI-Gr5ls3fn6B6oEjVL0dpmk", - "dp": "mHqjrFdgelT2OyiFRS3dAAPf3cLxJoAGC4gP0UoQyPocEP-Y17sQ7t-ygIanguubBy65iDFLeGXa_g0cmSt2iAzRAHrDzI8P1-pQl2KdWSEg9ssspjBRh_F_AiJLLSPRWn_b3-jySkhawtfxwO8Kte1QsK1My765Y0zFvJnjPws", - "dq": "KmjaV4YcsVAUp4z-IXVa5htHWmLuByaFjpXJOjABEUN0467wZdgjn9vPRp-8Ia8AyGgMkJES_uUL_PDDrMJM9gb4c6P4-NeUkVtreLGMjFjA-_IQmIMrUZ7XywHsWXx0c2oLlrJqoKo3W-hZhR0bPFTYgDUT_mRWjk7wV6wl46E", - "qi": "iYltkV_4PmQDfZfGFpzn2UtYEKyhy-9t3Vy8Mw2VHLAADKGwJvVK5ficQAr2atIF1-agXY2bd6KV-w52zR8rmZfTr0gobzYIyqHczOm13t7uXJv2WygY7QEC2OGjdxa2Fr9RnvS99ozMa5nomZBqTqT7z5QV33czjPRCjvg6FcE", -} - - -async def generate_token( - domain: str, - user_id: str, - audience: Optional[str] = None, - issuer: Union[str, bool, None] = None, - iat: bool = True, - exp: bool = True, - claims: Optional[Dict[str, Any]] = None, - expiration_time: int = 3600, -) -> str: - """ - Generates a real RS256-signed JWT using the private key above. - - Args: - domain: The Auth0 domain (used if issuer is not False). - user_id: The 'sub' claim in the token. - audience: The 'aud' claim in the token. If omitted, 'aud' won't be included. - issuer: - - If a string, it's placed in 'iss' claim. - - If None, default is f"https://{domain}/". - - If False, skip 'iss' claim entirely. - iat: Whether to set the 'iat' (issued at) claim. If False, skip it. - exp: Whether to set the 'exp' claim. If False, skip it. - claims: Additional custom claims to merge into the token. - expiration_time: If exp is True, how many seconds from now until expiration. - - Returns: - A RS256-signed JWT string. - - """ - token_claims = dict(claims or {}) - token_claims.setdefault("sub", user_id) - - if iat: - token_claims["iat"] = int(time.time()) - - if exp: - token_claims["exp"] = int(time.time()) + expiration_time - - if issuer is not False: - token_claims["iss"] = issuer if isinstance(issuer, str) else f"https://{domain}/" - - if audience: - token_claims["aud"] = audience - - - key = JsonWebKey.import_key(PRIVATE_JWK) - - header = {"alg": "RS256", "kid": PRIVATE_JWK["kid"]} - token = jwt.encode(header, token_claims, key) - return token diff --git a/fastapi_plugin/utils.py b/fastapi_plugin/utils.py index dda119a..dbec367 100644 --- a/fastapi_plugin/utils.py +++ b/fastapi_plugin/utils.py @@ -1,31 +1,25 @@ from typing import Optional, List, Union, Dict from fastapi import Request, HTTPException from starlette.responses import Response - - - -def get_bearer_token(request: Request) -> Optional[str]: - """ - Parse 'Authorization: Bearer ' from the incoming request. - Returns the token string or None if missing/invalid. - """ - auth_header = request.headers.get("authorization") - if not auth_header: - return None - - parts = auth_header.split() - if len(parts) == 2 and parts[0].lower() == "bearer": - return parts[1] - return None +from urllib.parse import urlparse, urlunparse def http_exception( status_code: int, error: str, - error_desc: str + error_desc: str, + headers: Optional[Dict[str, str]] = None ) -> HTTPException: """ - Construct an HTTPException with a 'WWW-Authenticate' header - mimicking the style from the TypeScript example. + Construct an HTTPException with appropriate headers. + + Args: + status_code: HTTP status code + error: OAuth2/DPoP error code + error_desc: Human-readable error description + headers: Optional headers dict (e.g., from BaseAuthError.get_headers()) + + Note: When used with BaseAuthError, pass the headers from get_headers() + to ensure correct WWW-Authenticate challenges are included. """ return HTTPException( status_code=status_code, @@ -33,11 +27,99 @@ def http_exception( "error": error, "error_description": error_desc }, - headers={ - "WWW-Authenticate": f'Bearer error="{error}", error_description="{error_desc}"' - } + headers=headers or {} ) +def _should_trust_proxy(request: Request) -> bool: + """ + Determines if X-Forwarded-* headers should be trusted. + + Returns: + bool: True if proxy headers should be trusted + """ + # Check if the app has explicitly enabled proxy trust + try: + return getattr(request.app.state, 'trust_proxy', False) + except AttributeError: + # If app.state doesn't exist or trust_proxy isn't set, don't trust + return False + +def _parse_forwarded_host(forwarded_host: Optional[str]) -> Optional[str]: + """ + Parses X-Forwarded-Host header, handling multiple comma-separated values. + + Args: + forwarded_host: Value of X-Forwarded-Host header + + Returns: + The first host value, or None if empty + """ + if not forwarded_host: + return None + + # Handle comma-separated values (multiple proxies) + comma_index = forwarded_host.find(',') + if comma_index != -1: + forwarded_host = forwarded_host[:comma_index].rstrip() + + return forwarded_host.strip() or None + +def get_canonical_url(request: Request) -> str: + """ + Constructs the canonical URL for DPoP validation, securely handling reverse proxy headers. + + Args: + request: FastAPI/Starlette Request object + + Returns: + Canonical URL string matching what the client used + + """ + # Start with the direct connection URL + parsed = urlparse(str(request.url)) + + # Default to direct request values + scheme = parsed.scheme + netloc = parsed.netloc + path = parsed.path + + # Only process X-Forwarded headers if proxy is trusted + if _should_trust_proxy(request): + # X-Forwarded-Proto: Override scheme if present + forwarded_proto = request.headers.get("x-forwarded-proto") + if forwarded_proto: + proto = forwarded_proto.strip().lower() + if proto in ("http", "https"): + scheme = proto + + # X-Forwarded-Host: Override host, handling multiple proxies + forwarded_host = request.headers.get("x-forwarded-host") + parsed_host = _parse_forwarded_host(forwarded_host) + if parsed_host: + netloc = parsed_host + + # X-Forwarded-Prefix: Prepend path prefix + forwarded_prefix = request.headers.get("x-forwarded-prefix", "").strip() + if forwarded_prefix and not any([ + ".." in forwarded_prefix, forwarded_prefix.startswith("//"), + ":" in forwarded_prefix, "\x00" in forwarded_prefix, + "%2e%2e" in forwarded_prefix.lower() + ]): + if not forwarded_prefix.startswith("/"): + forwarded_prefix = "/" + forwarded_prefix + path = forwarded_prefix.rstrip("/") + path + + canonical_url = urlunparse(( + scheme, + netloc, + path, + parsed.params, + parsed.query, + "" # No fragment in DPoP htu claim + )) + + return canonical_url + def validate_scopes(claims: Dict, required_scopes: List[str]) -> bool: """ Verifies the 'scope' claim (a space-delimited string) includes all required_scopes. diff --git a/pyproject.toml b/pyproject.toml index 55beb68..c02c4ed 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -8,7 +8,7 @@ readme = "README.md" packages = [ { include = "fastapi_plugin" } -] + ] [tool.poetry.dependencies] python = "^3.9" diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..faea64d --- /dev/null +++ b/tests/__init__.py @@ -0,0 +1,5 @@ +""" +Auth0 FastAPI API Test Suite + +Test package for DPoP and Bearer authentication flows. +""" diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..553793a --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,31 @@ +from pytest_httpx import HTTPXMock +from .test_utils import PUBLIC_DPOP_JWK + + +def setup_mocks(httpx_mock: HTTPXMock): + """Setup common OIDC and JWKS mocks for all tests.""" + httpx_mock.add_response( + method="GET", + url="https://auth0.local/.well-known/openid-configuration", + json={ + "issuer": "https://auth0.local/", + "jwks_uri": "https://auth0.local/.well-known/jwks.json" + } + ) + httpx_mock.add_response( + method="GET", + url="https://auth0.local/.well-known/jwks.json", + json={ + "keys": [ + { + "kty": "RSA", + "kid": "TEST_KEY", + "n": "whYOFK2Ocbbpb_zVypi9SeKiNUqKQH0zTKN1-6fpCTu6ZalGI82s7XK3tan4dJt90ptUPKD2zvxqTzFNfx4HHHsrYCf2-FMLn1VTJfQazA2BvJqAwcpW1bqRUEty8tS_Yv4hRvWfQPcc2Gc3-_fQOOW57zVy-rNoJc744kb30NjQxdGp03J2S3GLQu7oKtSDDPooQHD38PEMNnITf0pj-KgDPjymkMGoJlO3aKppsjfbt_AH6GGdRghYRLOUwQU-h-ofWHR3lbYiKtXPn5dN24kiHy61e3VAQ9_YAZlwXC_99GGtw_NpghFAuM4P1JDn0DppJldy3PGFC0GfBCZASw", + "e": "AQAB", + "alg": "RS256", + "use": "sig" + }, + PUBLIC_DPOP_JWK + ] + } + ) \ No newline at end of file diff --git a/tests/test_bearer_token_validation.py b/tests/test_bearer_token_validation.py new file mode 100644 index 0000000..eaec640 --- /dev/null +++ b/tests/test_bearer_token_validation.py @@ -0,0 +1,203 @@ +""" +Tests for Bearer token authentication and JWT validation. +Tests core JWT validation logic including issuer, expiration, and scope checks. +""" +import pytest +from fastapi import FastAPI, Depends +from pytest_httpx import HTTPXMock +from fastapi.testclient import TestClient + +from fastapi_plugin.fast_api_client import Auth0FastAPI +from .test_utils import generate_token +from .conftest import setup_mocks + + +# ============================================================================= +# Missing Token Tests +# ============================================================================= + +@pytest.mark.asyncio +async def test_missing_authorization_header(): + """Test that requests without Authorization header return 400.""" + app = FastAPI() + auth0 = Auth0FastAPI(domain="auth0.local", audience="") + + @app.get("/test") + async def test_route(claims=Depends(auth0.require_auth())): + return "OK" + + client = TestClient(app) + response = client.get("/test") + + assert response.status_code == 400 + json_body = response.json() + assert json_body["detail"]["error"] == "invalid_request" + + +# ============================================================================= +# Valid Token Tests +# ============================================================================= + +@pytest.mark.asyncio +async def test_valid_bearer_token_authentication(httpx_mock: HTTPXMock): + """Test successful authentication with a valid Bearer token.""" + setup_mocks(httpx_mock) + + access_token = await generate_token( + domain="auth0.local", + user_id="user_123", + audience="", + issuer="https://auth0.local/", + iat=True, + exp=True + ) + + app = FastAPI() + auth0 = Auth0FastAPI(domain="auth0.local", audience="") + + @app.get("/test") + async def test_route(claims=Depends(auth0.require_auth())): + return "OK" + + client = TestClient(app) + response = client.get( + "/test", + headers={"Authorization": f"Bearer {access_token}"} + ) + + assert response.status_code == 200 + + +# ============================================================================= +# Issuer Validation Tests +# ============================================================================= + +@pytest.mark.asyncio +async def test_missing_issuer_claim(httpx_mock: HTTPXMock): + """Test that tokens without 'iss' claim are rejected with 401.""" + setup_mocks(httpx_mock) + + access_token = await generate_token( + domain="auth0.local", + user_id="user_123", + audience="", + issuer=False, # Omit issuer claim + iat=True, + exp=True + ) + + app = FastAPI() + auth0 = Auth0FastAPI(domain="auth0.local", audience="") + + @app.get("/test") + async def test_route(claims=Depends(auth0.require_auth())): + return "OK" + + client = TestClient(app) + response = client.get( + "/test", + headers={"Authorization": f"Bearer {access_token}"} + ) + + assert response.status_code == 401 + + +@pytest.mark.asyncio +async def test_invalid_issuer_claim(httpx_mock: HTTPXMock): + """Test that tokens with mismatched issuer are rejected with 401.""" + setup_mocks(httpx_mock) + + access_token = await generate_token( + domain="auth0.local", + user_id="user_123", + audience="", + issuer="https://invalid-issuer.local", # Wrong issuer + iat=True, + exp=True + ) + + app = FastAPI() + auth0 = Auth0FastAPI(domain="auth0.local", audience="") + + @app.get("/test") + async def test_route(claims=Depends(auth0.require_auth())): + return "OK" + + client = TestClient(app) + response = client.get( + "/test", + headers={"Authorization": f"Bearer {access_token}"} + ) + + assert response.status_code == 401 + + +# ============================================================================= +# Expiration Validation Tests +# ============================================================================= + +@pytest.mark.asyncio +async def test_missing_expiration_claim(httpx_mock: HTTPXMock): + """Test that tokens without 'exp' claim are rejected with 401.""" + setup_mocks(httpx_mock) + + access_token = await generate_token( + domain="auth0.local", + user_id="user_123", + audience="", + issuer="https://auth0.local/", + iat=True, + exp=False # Omit expiration claim + ) + + app = FastAPI() + auth0 = Auth0FastAPI(domain="auth0.local", audience="") + + @app.get("/test") + async def test_route(claims=Depends(auth0.require_auth())): + return "OK" + + client = TestClient(app) + response = client.get( + "/test", + headers={"Authorization": f"Bearer {access_token}"} + ) + + assert response.status_code == 401 + + +# ============================================================================= +# Scope Validation Tests +# ============================================================================= + +@pytest.mark.asyncio +async def test_insufficient_scope(httpx_mock: HTTPXMock): + """Test that tokens with insufficient scopes are rejected with 403.""" + setup_mocks(httpx_mock) + + access_token = await generate_token( + domain="auth0.local", + user_id="user_123", + audience="", + issuer="https://auth0.local/", + iat=True, + exp=True, + claims={"scope": "invalid"} # Wrong scope + ) + + app = FastAPI() + auth0 = Auth0FastAPI(domain="auth0.local", audience="") + + @app.get("/test") + async def test_route(claims=Depends(auth0.require_auth(scopes="valid"))): + return "OK" + + client = TestClient(app) + response = client.get( + "/test", + headers={"Authorization": f"Bearer {access_token}"} + ) + + assert response.status_code == 403 + json_body = response.json() + assert json_body["detail"]["error"] == "insufficient_scope" diff --git a/tests/test_client_initialization.py b/tests/test_client_initialization.py new file mode 100644 index 0000000..4f43671 --- /dev/null +++ b/tests/test_client_initialization.py @@ -0,0 +1,64 @@ +""" +Tests for Auth0FastAPI client initialization and configuration. +Tests constructor parameters, default values, and configuration options. +""" +from fastapi_plugin.fast_api_client import Auth0FastAPI + + +# ============================================================================= +# Client Credentials Configuration +# ============================================================================= + +def test_initialization_with_client_credentials(): + """Test that Auth0FastAPI accepts and stores client_id and client_secret.""" + client_id = "test_client_id" + client_secret = "test_client_secret" + + auth0 = Auth0FastAPI( + domain="auth0.local", + audience="test_audience", + client_id=client_id, + client_secret=client_secret + ) + + options = auth0.api_client.options + assert options.client_id == client_id + assert options.client_secret == client_secret + + +# ============================================================================= +# DPoP Configuration Tests +# ============================================================================= + +def test_dpop_default_configuration(): + """Test that DPoP has correct default configuration values.""" + auth0 = Auth0FastAPI(domain="auth0.local", audience="test") + + assert auth0.api_client.options.dpop_enabled is True + assert auth0.api_client.options.dpop_required is False + assert auth0.api_client.options.dpop_iat_leeway == 30 + assert auth0.api_client.options.dpop_iat_offset == 300 + + +def test_dpop_disabled_configuration(): + """Test that DPoP can be explicitly disabled.""" + auth0 = Auth0FastAPI( + domain="auth0.local", + audience="test", + dpop_enabled=False + ) + + assert auth0.api_client.options.dpop_enabled is False + + +def test_dpop_custom_timing_configuration(): + """Test that custom DPoP timing parameters are accepted.""" + auth0 = Auth0FastAPI( + domain="auth0.local", + audience="test", + dpop_iat_leeway=60, + dpop_iat_offset=600 + ) + + assert auth0.api_client.options.dpop_iat_leeway == 60 + assert auth0.api_client.options.dpop_iat_offset == 600 diff --git a/tests/test_configuration_modes.py b/tests/test_configuration_modes.py new file mode 100644 index 0000000..2b0cef4 --- /dev/null +++ b/tests/test_configuration_modes.py @@ -0,0 +1,218 @@ +""" +Tests for DPoP operational modes (enabled/required/disabled). +Tests the runtime behavior of different DPoP configuration modes. +""" +import pytest +from fastapi import FastAPI, Depends +from pytest_httpx import HTTPXMock +from fastapi.testclient import TestClient + +from fastapi_plugin.fast_api_client import Auth0FastAPI +from .test_utils import ( + generate_token, + generate_dpop_proof, + generate_dpop_bound_token, +) +from .conftest import setup_mocks + + +# ============================================================================= +# DPoP Required Mode Tests +# ============================================================================= + +@pytest.mark.asyncio +async def test_dpop_required_mode_rejects_bearer(): + """Test that Bearer tokens are rejected when dpop_required=True.""" + # No need to mock JWKS - request should fail before token validation + bearer_token = await generate_token( + domain="auth0.local", + user_id="user_123", + audience="", + issuer="https://auth0.local/" + ) + + app = FastAPI() + auth0 = Auth0FastAPI( + domain="auth0.local", + audience="", + dpop_required=True # DPoP required mode + ) + + @app.get("/test") + async def test_route(claims=Depends(auth0.require_auth())): + return "OK" + + client = TestClient(app) + response = client.get( + "/test", + headers={"Authorization": f"Bearer {bearer_token}"} + ) + + assert response.status_code == 400 + json_body = response.json() + assert json_body["detail"]["error"] == "invalid_request" + + +@pytest.mark.asyncio +async def test_dpop_required_mode_accepts_dpop(httpx_mock: HTTPXMock): + """Test that DPoP tokens are accepted when dpop_required=True.""" + setup_mocks(httpx_mock) + + access_token = await generate_dpop_bound_token( + domain="auth0.local", + user_id="user_123", + audience="", + issuer="https://auth0.local/" + ) + + dpop_proof = await generate_dpop_proof( + http_method="GET", + http_url="http://testserver/test", + access_token=access_token + ) + + app = FastAPI() + auth0 = Auth0FastAPI( + domain="auth0.local", + audience="", + dpop_required=True + ) + + @app.get("/test") + async def test_route(claims=Depends(auth0.require_auth())): + return {"user": claims["sub"]} + + client = TestClient(app) + response = client.get( + "/test", + headers={ + "Authorization": f"DPoP {access_token}", + "DPoP": dpop_proof + } + ) + + assert response.status_code == 200 + assert response.json()["user"] == "user_123" + + +# ============================================================================= +# DPoP Disabled Mode Tests +# ============================================================================= + +@pytest.mark.asyncio +async def test_dpop_disabled_mode_rejects_dpop(): + """Test that DPoP tokens are rejected when dpop_enabled=False.""" + access_token = await generate_dpop_bound_token( + domain="auth0.local", + user_id="user_123", + audience="", + issuer="https://auth0.local/" + ) + + dpop_proof = await generate_dpop_proof( + http_method="GET", + http_url="http://testserver/test", + access_token=access_token + ) + + app = FastAPI() + auth0 = Auth0FastAPI( + domain="auth0.local", + audience="", + dpop_enabled=False # DPoP disabled + ) + + @app.get("/test") + async def test_route(claims=Depends(auth0.require_auth())): + return "OK" + + client = TestClient(app) + response = client.get( + "/test", + headers={ + "Authorization": f"DPoP {access_token}", + "DPoP": dpop_proof + } + ) + + assert response.status_code == 400 + json_body = response.json() + assert json_body["detail"]["error"] == "invalid_request" + + +# ============================================================================= +# Bearer-Only Mode Tests (DPoP disabled) +# ============================================================================= + +@pytest.mark.asyncio +async def test_bearer_only_mode_accepts_bearer(httpx_mock: HTTPXMock): + """Test that Bearer tokens work when DPoP is disabled.""" + setup_mocks(httpx_mock) + + bearer_token = await generate_token( + domain="auth0.local", + user_id="user_123", + audience="", + issuer="https://auth0.local/" + ) + + app = FastAPI() + auth0 = Auth0FastAPI( + domain="auth0.local", + audience="", + dpop_enabled=False # Bearer only mode + ) + + @app.get("/test") + async def test_route(claims=Depends(auth0.require_auth())): + return {"user": claims["sub"]} + + client = TestClient(app) + response = client.get( + "/test", + headers={"Authorization": f"Bearer {bearer_token}"} + ) + + assert response.status_code == 200 + assert response.json()["user"] == "user_123" + + +# ============================================================================= +# Token/Scheme Mismatch Tests +# ============================================================================= + +@pytest.mark.asyncio +async def test_dpop_bound_token_with_bearer_scheme_fails(httpx_mock: HTTPXMock): + """Test that DPoP-bound tokens fail when using Bearer scheme.""" + setup_mocks(httpx_mock) + + # Generate a DPoP-bound token (has cnf claim) + dpop_token = await generate_dpop_bound_token( + domain="auth0.local", + user_id="user_123", + audience="", + issuer="https://auth0.local/" + ) + + app = FastAPI() + auth0 = Auth0FastAPI( + domain="auth0.local", + audience="", + dpop_enabled=True + ) + + @app.get("/test") + async def test_route(claims=Depends(auth0.require_auth())): + return "OK" + + client = TestClient(app) + # Try to use DPoP-bound token with Bearer scheme (should fail) + response = client.get( + "/test", + headers={"Authorization": f"Bearer {dpop_token}"} + ) + + assert response.status_code == 401 + json_body = response.json() + assert json_body["detail"]["error"] == "invalid_token" + assert "dpop" in json_body["detail"]["error_description"].lower() diff --git a/tests/test_dpop_authentication.py b/tests/test_dpop_authentication.py new file mode 100644 index 0000000..2f69d6e --- /dev/null +++ b/tests/test_dpop_authentication.py @@ -0,0 +1,275 @@ +""" +Tests for DPoP authentication functionality in FastAPI middleware. +Focuses on DPoP-specific authentication flows and error cases. +""" +import pytest +from fastapi import FastAPI, Depends +from pytest_httpx import HTTPXMock +from fastapi.testclient import TestClient + +from fastapi_plugin.fast_api_client import Auth0FastAPI +from .test_utils import ( + generate_dpop_proof, + generate_dpop_bound_token +) +from .conftest import setup_mocks + + +@pytest.mark.asyncio +async def test_dpop_authentication_success(httpx_mock: HTTPXMock): + """Test successful DPoP authentication with valid token and proof.""" + setup_mocks(httpx_mock) + + access_token = await generate_dpop_bound_token( + domain="auth0.local", + user_id="user_123", + audience="", + issuer="https://auth0.local/" + ) + + dpop_proof = await generate_dpop_proof( + http_method="GET", + http_url="http://testserver/test", + access_token=access_token + ) + + app = FastAPI() + auth0 = Auth0FastAPI(domain="auth0.local", audience="", dpop_enabled=True) + + @app.get("/test") + async def test_route(claims=Depends(auth0.require_auth())): + return {"user": claims["sub"]} + + client = TestClient(app) + response = client.get( + "/test", + headers={ + "Authorization": f"DPoP {access_token}", + "DPoP": dpop_proof + } + ) + + assert response.status_code == 200 + assert response.json()["user"] == "user_123" + + +@pytest.mark.asyncio +async def test_dpop_authentication_missing_dpop_header(httpx_mock: HTTPXMock): + """Test DPoP request fails early when DPoP header is missing (before token validation).""" + # No setup_mocks needed - request fails before JWKS lookup + + access_token = await generate_dpop_bound_token( + domain="auth0.local", + user_id="user_123", + audience="", + issuer="https://auth0.local/" + ) + + app = FastAPI() + auth0 = Auth0FastAPI(domain="auth0.local", audience="", dpop_enabled=True) + + @app.get("/test") + async def test_route(claims=Depends(auth0.require_auth())): + return "OK" + + client = TestClient(app) + response = client.get( + "/test", + headers={"Authorization": f"DPoP {access_token}"} # Missing DPoP header + ) + + assert response.status_code == 400 + json_body = response.json() + assert "invalid_request" in json_body["detail"]["error"] + + +@pytest.mark.asyncio +async def test_dpop_authentication_invalid_dpop_proof(httpx_mock: HTTPXMock): + """Test DPoP request fails early with malformed DPoP proof (before token validation).""" + # No setup_mocks needed - request fails before JWKS lookup + + access_token = await generate_dpop_bound_token( + domain="auth0.local", + user_id="user_123", + audience="", + issuer="https://auth0.local/" + ) + + app = FastAPI() + auth0 = Auth0FastAPI(domain="auth0.local", audience="", dpop_enabled=True) + + @app.get("/test") + async def test_route(claims=Depends(auth0.require_auth())): + return "OK" + + client = TestClient(app) + response = client.get( + "/test", + headers={ + "Authorization": f"DPoP {access_token}", + "DPoP": "invalid.jwt.proof" + } + ) + + assert response.status_code == 400 + json_body = response.json() + assert "invalid_dpop_proof" in json_body["detail"]["error"] + + +@pytest.mark.asyncio +async def test_dpop_authentication_url_mismatch(httpx_mock: HTTPXMock): + """Test DPoP proof fails when htu doesn't match request URL.""" + setup_mocks(httpx_mock) + + access_token = await generate_dpop_bound_token( + domain="auth0.local", + user_id="user_123", + audience="", + issuer="https://auth0.local/" + ) + + # Generate proof for WRONG URL + dpop_proof = await generate_dpop_proof( + http_method="GET", + http_url="http://testserver/wrong-url", # Wrong URL + access_token=access_token + ) + + app = FastAPI() + auth0 = Auth0FastAPI(domain="auth0.local", audience="", dpop_enabled=True) + + @app.get("/test") + async def test_route(claims=Depends(auth0.require_auth())): + return "OK" + + client = TestClient(app) + response = client.get( + "/test", + headers={ + "Authorization": f"DPoP {access_token}", + "DPoP": dpop_proof + } + ) + + assert response.status_code == 400 + json_body = response.json() + assert "invalid_dpop_proof" in json_body["detail"]["error"] + + +@pytest.mark.asyncio +async def test_dpop_authentication_method_mismatch(httpx_mock: HTTPXMock): + """Test DPoP proof fails when htm doesn't match request method.""" + setup_mocks(httpx_mock) + + access_token = await generate_dpop_bound_token( + domain="auth0.local", + user_id="user_123", + audience="", + issuer="https://auth0.local/" + ) + + # Generate proof for POST but send GET + dpop_proof = await generate_dpop_proof( + http_method="POST", # Wrong method + http_url="http://testserver/test", + access_token=access_token + ) + + app = FastAPI() + auth0 = Auth0FastAPI(domain="auth0.local", audience="", dpop_enabled=True) + + @app.get("/test") + async def test_route(claims=Depends(auth0.require_auth())): + return "OK" + + client = TestClient(app) + response = client.get( + "/test", + headers={ + "Authorization": f"DPoP {access_token}", + "DPoP": dpop_proof + } + ) + + assert response.status_code == 400 + json_body = response.json() + assert "invalid_dpop_proof" in json_body["detail"]["error"] + + +@pytest.mark.asyncio +async def test_dpop_with_scope_validation(httpx_mock: HTTPXMock): + """Test that scope validation works with DPoP tokens.""" + setup_mocks(httpx_mock) + + access_token = await generate_dpop_bound_token( + domain="auth0.local", + user_id="user_123", + audience="", + issuer="https://auth0.local/", + claims={"scope": "read:data write:data"} + ) + + dpop_proof = await generate_dpop_proof( + http_method="GET", + http_url="http://testserver/test", + access_token=access_token + ) + + app = FastAPI() + auth0 = Auth0FastAPI(domain="auth0.local", audience="", dpop_enabled=True) + + @app.get("/test") + async def test_route(claims=Depends(auth0.require_auth(scopes=["read:data"]))): + return {"user": claims["sub"]} + + client = TestClient(app) + response = client.get( + "/test", + headers={ + "Authorization": f"DPoP {access_token}", + "DPoP": dpop_proof + } + ) + + assert response.status_code == 200 + assert response.json()["user"] == "user_123" + + +@pytest.mark.asyncio +async def test_dpop_with_invalid_scope(httpx_mock: HTTPXMock): + """Test that scope validation fails with insufficient scopes.""" + setup_mocks(httpx_mock) + + access_token = await generate_dpop_bound_token( + domain="auth0.local", + user_id="user_123", + audience="", + issuer="https://auth0.local/", + claims={"scope": "read:data"} + ) + + dpop_proof = await generate_dpop_proof( + http_method="GET", + http_url="http://testserver/test", + access_token=access_token + ) + + app = FastAPI() + auth0 = Auth0FastAPI(domain="auth0.local", audience="", dpop_enabled=True) + + @app.get("/test") + async def test_route(claims=Depends(auth0.require_auth(scopes=["write:data"]))): + return "OK" + + client = TestClient(app) + response = client.get( + "/test", + headers={ + "Authorization": f"DPoP {access_token}", + "DPoP": dpop_proof + } + ) + + assert response.status_code == 403 + json_body = response.json() + assert json_body["detail"]["error"] == "insufficient_scope" diff --git a/tests/test_fast_api_client.py b/tests/test_fast_api_client.py deleted file mode 100644 index 13800e9..0000000 --- a/tests/test_fast_api_client.py +++ /dev/null @@ -1,328 +0,0 @@ -import pytest -from fastapi import FastAPI, Depends -from pytest_httpx import HTTPXMock -from fastapi.testclient import TestClient - -from fastapi_plugin.fast_api_client import Auth0FastAPI -from fastapi_plugin.test_utils import generate_token - - -@pytest.mark.asyncio -async def test_should_return_400_when_no_token(): - """ - should return 400 when no token - """ - app = FastAPI() - auth0 = Auth0FastAPI(domain="auth0.local", audience="") - - @app.get("/test") - async def test_route(claims=Depends(auth0.require_auth())): - return "OK" - - client = TestClient(app) - - response = client.get("/test") - assert response.status_code == 400 - json_body = response.json() - assert json_body["detail"]["error"] == "invalid_request" - assert json_body["detail"]["error_description"] == "No Authorization provided" - - -@pytest.mark.asyncio -async def test_should_return_200_when_valid_token(httpx_mock: HTTPXMock): - """ - This time we mock OIDC discovery & JWKS calls so that our code sees valid data. - - 1. We generate a legitimate token with a 'kid' that matches the JWKS we mock. - 2. We add responses for https://auth0.local/.well-known/openid-configuration and JWKS - so that the plugin accepts the token as valid. - """ - httpx_mock.add_response( - method="GET", - url="https://auth0.local/.well-known/openid-configuration", - json={ - "issuer": "https://auth0.local/", - "jwks_uri": "https://auth0.local/.well-known/jwks.json" - } - ) - httpx_mock.add_response( - method="GET", - url="https://auth0.local/.well-known/jwks.json", - json={ - "keys": [ - { - "kty": "RSA", - "kid": "TEST_KEY", - "n": "whYOFK2Ocbbpb_zVypi9SeKiNUqKQH0zTKN1-6fpCTu6ZalGI82s7XK3tan4dJt90ptUPKD2zvxqTzFNfx4HHHsrYCf2-FMLn1VTJfQazA2BvJqAwcpW1bqRUEty8tS_Yv4hRvWfQPcc2Gc3-_fQOOW57zVy-rNoJc744kb30NjQxdGp03J2S3GLQu7oKtSDDPooQHD38PEMNnITf0pj-KgDPjymkMGoJlO3aKppsjfbt_AH6GGdRghYRLOUwQU-h-ofWHR3lbYiKtXPn5dN24kiHy61e3VAQ9_YAZlwXC_99GGtw_NpghFAuM4P1JDn0DppJldy3PGFC0GfBCZASw", - "e": "AQAB", - "alg": "RS256", - "use": "sig" - } - ] - } - ) - - access_token = await generate_token( - domain="auth0.local", - user_id="user_123", - audience="", - issuer="https://auth0.local/", # match the mocked OIDC discovery's issuer - iat=True, - exp=True - ) - - app = FastAPI() - auth0 = Auth0FastAPI(domain="auth0.local", audience="") - - @app.get("/test") - async def test_route(claims=Depends(auth0.require_auth())): - return "OK" - - client = TestClient(app) - - response = client.get( - "/test", - headers={"Authorization": f"Bearer {access_token}"} - ) - - assert response.status_code == 200 - assert response.text == '"OK"' - -@pytest.mark.asyncio -async def test_should_return_401_when_no_iss(httpx_mock: HTTPXMock): - """ - 2) Mocks OIDC & JWKS. Generates a token missing 'iss' (issuer=False). - Expects a 401 from the plugin. - """ - # 1) Mock endpoints - httpx_mock.add_response( - method="GET", - url="https://auth0.local/.well-known/openid-configuration", - json={ - "issuer": "https://auth0.local/", - "jwks_uri": "https://auth0.local/.well-known/jwks.json" - } - ) - httpx_mock.add_response( - method="GET", - url="https://auth0.local/.well-known/jwks.json", - json={ - "keys": [ - { - "kty": "RSA", - "kid": "TEST_KEY", - "n": "whYOFK2Ocbbpb_zVypi9SeKiNUqKQH0zTKN1-6fpCTu6ZalGI82s7XK3tan4dJt90ptUPKD2zvxqTzFNfx4HHHsrYCf2-FMLn1VTJfQazA2BvJqAwcpW1bqRUEty8tS_Yv4hRvWfQPcc2Gc3-_fQOOW57zVy-rNoJc744kb30NjQxdGp03J2S3GLQu7oKtSDDPooQHD38PEMNnITf0pj-KgDPjymkMGoJlO3aKppsjfbt_AH6GGdRghYRLOUwQU-h-ofWHR3lbYiKtXPn5dN24kiHy61e3VAQ9_YAZlwXC_99GGtw_NpghFAuM4P1JDn0DppJldy3PGFC0GfBCZASw", - "e": "AQAB", - "alg": "RS256", - "use": "sig" - } - ] - } - ) - - # 2) Generate token missing 'iss' => issuer=False - access_token = await generate_token( - domain="auth0.local", - user_id="user_123", - audience="", - issuer=False, # no iss claim - iat=True, - exp=True - ) - - app = FastAPI() - auth0 = Auth0FastAPI(domain="auth0.local", audience="") - - @app.get("/test") - async def test_route(claims=Depends(auth0.require_auth())): - return "OK" - - client = TestClient(app) - - response = client.get( - "/test", - headers={"Authorization": f"Bearer {access_token}"} - ) - - # Typically we expect a 401 for 'missing iss' or something similar - assert response.status_code == 401 - - - -@pytest.mark.asyncio -async def test_should_return_401_when_invalid_iss(httpx_mock: HTTPXMock): - """ - 3) Mocks OIDC & JWKS. The token sets a different issuer from the plugin's domain, - we expect 401 'invalid issuer'. - """ - httpx_mock.add_response( - method="GET", - url="https://auth0.local/.well-known/openid-configuration", - json={ - "issuer": "https://auth0.local/", - "jwks_uri": "https://auth0.local/.well-known/jwks.json" - } - ) - httpx_mock.add_response( - method="GET", - url="https://auth0.local/.well-known/jwks.json", - json={ - "keys": [ - { - "kty": "RSA", - "kid": "TEST_KEY", - "n": "whYOFK2Ocbbpb_zVypi9SeKiNUqKQH0zTKN1-6fpCTu6ZalGI82s7XK3tan4dJt90ptUPKD2zvxqTzFNfx4HHHsrYCf2-FMLn1VTJfQazA2BvJqAwcpW1bqRUEty8tS_Yv4hRvWfQPcc2Gc3-_fQOOW57zVy-rNoJc744kb30NjQxdGp03J2S3GLQu7oKtSDDPooQHD38PEMNnITf0pj-KgDPjymkMGoJlO3aKppsjfbt_AH6GGdRghYRLOUwQU-h-ofWHR3lbYiKtXPn5dN24kiHy61e3VAQ9_YAZlwXC_99GGtw_NpghFAuM4P1JDn0DppJldy3PGFC0GfBCZASw", - "e": "AQAB", - "alg": "RS256", - "use": "sig" - } - ] - } - ) - - # Generate a token with an 'iss' that doesn't match "https://auth0.local/" - access_token = await generate_token( - domain="auth0.local", # for the default if issuer=None, but we override below - user_id="user_123", - audience="", - issuer="https://invalid-issuer.local", # mismatch - iat=True, - exp=True - ) - - app = FastAPI() - auth0 = Auth0FastAPI(domain="auth0.local", audience="") - - @app.get("/test") - async def test_route(claims=Depends(auth0.require_auth())): - return "OK" - - client = TestClient(app) - response = client.get("/test", headers={"Authorization": f"Bearer {access_token}"}) - assert response.status_code == 401 - - - -@pytest.mark.asyncio -async def test_should_return_401_when_no_exp(httpx_mock: HTTPXMock): - """ - 4) Mocks OIDC & JWKS, generates token with exp=False => no 'exp' claim. - The plugin or underlying code sees a missing 'exp' => 401. - """ - httpx_mock.add_response( - method="GET", - url="https://auth0.local/.well-known/openid-configuration", - json={ - "issuer": "https://auth0.local/", - "jwks_uri": "https://auth0.local/.well-known/jwks.json" - } - ) - httpx_mock.add_response( - method="GET", - url="https://auth0.local/.well-known/jwks.json", - json={ - "keys": [ - { - "kty": "RSA", - "kid": "TEST_KEY", - "n": "whYOFK2Ocbbpb_zVypi9SeKiNUqKQH0zTKN1-6fpCTu6ZalGI82s7XK3tan4dJt90ptUPKD2zvxqTzFNfx4HHHsrYCf2-FMLn1VTJfQazA2BvJqAwcpW1bqRUEty8tS_Yv4hRvWfQPcc2Gc3-_fQOOW57zVy-rNoJc744kb30NjQxdGp03J2S3GLQu7oKtSDDPooQHD38PEMNnITf0pj-KgDPjymkMGoJlO3aKppsjfbt_AH6GGdRghYRLOUwQU-h-ofWHR3lbYiKtXPn5dN24kiHy61e3VAQ9_YAZlwXC_99GGtw_NpghFAuM4P1JDn0DppJldy3PGFC0GfBCZASw", - "e": "AQAB", - "alg": "RS256", - "use": "sig" - } - ] - } - ) - - access_token = await generate_token( - domain="auth0.local", - user_id="user_123", - audience="", - issuer="https://auth0.local/", - iat=True, - exp=False # skip 'exp' claim - ) - - app = FastAPI() - auth0 = Auth0FastAPI(domain="auth0.local", audience="") - - @app.get("/test") - async def test_route(claims=Depends(auth0.require_auth())): - return "OK" - - client = TestClient(app) - response = client.get("/test", headers={"Authorization": f"Bearer {access_token}"}) - assert response.status_code == 401 - - -@pytest.mark.asyncio -async def test_should_return_403_when_invalid_scope(httpx_mock: HTTPXMock): - """ - 5) Mocks OIDC & JWKS. The token includes scope="invalid", - but the plugin's route requires "valid" scope => 403 'insufficient_scope'. - """ - httpx_mock.add_response( - method="GET", - url="https://auth0.local/.well-known/openid-configuration", - json={ - "issuer": "https://auth0.local/", - "jwks_uri": "https://auth0.local/.well-known/jwks.json" - } - ) - httpx_mock.add_response( - method="GET", - url="https://auth0.local/.well-known/jwks.json", - json={ - "keys": [ - { - "kty": "RSA", - "kid": "TEST_KEY", - "n": "whYOFK2Ocbbpb_zVypi9SeKiNUqKQH0zTKN1-6fpCTu6ZalGI82s7XK3tan4dJt90ptUPKD2zvxqTzFNfx4HHHsrYCf2-FMLn1VTJfQazA2BvJqAwcpW1bqRUEty8tS_Yv4hRvWfQPcc2Gc3-_fQOOW57zVy-rNoJc744kb30NjQxdGp03J2S3GLQu7oKtSDDPooQHD38PEMNnITf0pj-KgDPjymkMGoJlO3aKppsjfbt_AH6GGdRghYRLOUwQU-h-ofWHR3lbYiKtXPn5dN24kiHy61e3VAQ9_YAZlwXC_99GGtw_NpghFAuM4P1JDn0DppJldy3PGFC0GfBCZASw", - "e": "AQAB", - "alg": "RS256", - "use": "sig" - } - ] - } - ) - - access_token = await generate_token( - domain="auth0.local", - user_id="user_123", - audience="", - issuer="https://auth0.local/", - iat=True, - exp=True, - claims={"scope": "invalid"} - ) - - app = FastAPI() - auth0 = Auth0FastAPI(domain="auth0.local", audience="") - - @app.get("/test") - async def test_route(claims=Depends(auth0.require_auth(scopes="valid"))): - """ - The plugin or underlying code checks if 'valid' is in token's 'scope'. - Since we only have 'invalid', expect 403 'insufficient_scope'. - """ - return "OK" - - client = TestClient(app) - response = client.get( - "/test", - headers={"Authorization": f"Bearer {access_token}"} - ) - assert response.status_code == 403 - -def test_auth0fastapi_accepts_client_id_and_secret(): - client_id = "test_client_id" - client_secret = "test_client_secret" - auth0 = Auth0FastAPI( - domain="auth0.local", - audience="test_audience", - client_id=client_id, - client_secret=client_secret - ) - options = auth0.api_client.options - assert options.client_id == client_id - assert options.client_secret == client_secret \ No newline at end of file diff --git a/tests/test_reverse_proxy.py b/tests/test_reverse_proxy.py new file mode 100644 index 0000000..87c6037 --- /dev/null +++ b/tests/test_reverse_proxy.py @@ -0,0 +1,336 @@ +""" +Tests for reverse proxy support in FastAPI middleware. +Tests the get_canonical_url functionality and X-Forwarded-* header handling. +""" +import pytest +from fastapi import FastAPI, Depends, Request +from pytest_httpx import HTTPXMock +from fastapi.testclient import TestClient + +from fastapi_plugin.fast_api_client import Auth0FastAPI +from fastapi_plugin.utils import get_canonical_url +from .test_utils import ( + generate_dpop_proof, + generate_dpop_bound_token +) +from .conftest import setup_mocks + + +@pytest.mark.asyncio +async def test_reverse_proxy_with_trust_enabled(httpx_mock: HTTPXMock): + """Test that X-Forwarded headers are used when trust_proxy=True.""" + setup_mocks(httpx_mock) + + access_token = await generate_dpop_bound_token( + domain="auth0.local", + user_id="user_123", + audience="", + issuer="https://auth0.local/" + ) + + # Generate DPoP proof for the PUBLIC URL + dpop_proof = await generate_dpop_proof( + http_method="GET", + http_url="https://api.example.com/test", # Public URL + access_token=access_token + ) + + app = FastAPI() + app.state.trust_proxy = True # Enable proxy trust + + auth0 = Auth0FastAPI(domain="auth0.local", audience="", dpop_enabled=True) + + @app.get("/test") + async def test_route(claims=Depends(auth0.require_auth())): + return {"user": claims["sub"]} + + client = TestClient(app) + + # Request comes to internal URL but with X-Forwarded headers + response = client.get( + "/test", + headers={ + "Authorization": f"DPoP {access_token}", + "DPoP": dpop_proof, + "X-Forwarded-Proto": "https", + "X-Forwarded-Host": "api.example.com" + } + ) + + assert response.status_code == 200 + assert response.json()["user"] == "user_123" + + +@pytest.mark.asyncio +async def test_reverse_proxy_without_trust_fails(httpx_mock: HTTPXMock): + """Test that X-Forwarded headers are IGNORED when trust_proxy=False.""" + setup_mocks(httpx_mock) + + access_token = await generate_dpop_bound_token( + domain="auth0.local", + user_id="user_123", + audience="", + issuer="https://auth0.local/" + ) + + # Generate DPoP proof for the PUBLIC URL + dpop_proof = await generate_dpop_proof( + http_method="GET", + http_url="https://api.example.com/test", + access_token=access_token + ) + + app = FastAPI() + # trust_proxy defaults to False + + auth0 = Auth0FastAPI(domain="auth0.local", audience="", dpop_enabled=True) + + @app.get("/test") + async def test_route(claims=Depends(auth0.require_auth())): + return "OK" + + client = TestClient(app) + + # Headers are present but should be ignored + response = client.get( + "/test", + headers={ + "Authorization": f"DPoP {access_token}", + "DPoP": dpop_proof, + "X-Forwarded-Proto": "https", + "X-Forwarded-Host": "api.example.com" + } + ) + + # Should fail because proof expects https://api.example.com + # but app sees http://testserver (headers ignored) + assert response.status_code == 400 + json_body = response.json() + assert "invalid_dpop_proof" in json_body["detail"]["error"] + + +@pytest.mark.asyncio +async def test_reverse_proxy_with_path_prefix(httpx_mock: HTTPXMock): + """Test X-Forwarded-Prefix handling.""" + setup_mocks(httpx_mock) + + access_token = await generate_dpop_bound_token( + domain="auth0.local", + user_id="user_123", + audience="", + issuer="https://auth0.local/" + ) + + # Generate DPoP proof with prefix + dpop_proof = await generate_dpop_proof( + http_method="GET", + http_url="https://api.example.com/api/v1/test", + access_token=access_token + ) + + app = FastAPI() + app.state.trust_proxy = True + + auth0 = Auth0FastAPI(domain="auth0.local", audience="", dpop_enabled=True) + + @app.get("/test") + async def test_route(claims=Depends(auth0.require_auth())): + return {"user": claims["sub"]} + + client = TestClient(app) + + response = client.get( + "/test", + headers={ + "Authorization": f"DPoP {access_token}", + "DPoP": dpop_proof, + "X-Forwarded-Proto": "https", + "X-Forwarded-Host": "api.example.com", + "X-Forwarded-Prefix": "/api/v1" + } + ) + + assert response.status_code == 200 + assert response.json()["user"] == "user_123" + + +@pytest.mark.asyncio +async def test_reverse_proxy_with_trailing_slash_prefix(httpx_mock: HTTPXMock): + """Test X-Forwarded-Prefix with trailing slash is handled correctly.""" + setup_mocks(httpx_mock) + + access_token = await generate_dpop_bound_token( + domain="auth0.local", + user_id="user_123", + audience="", + issuer="https://auth0.local/" + ) + + # Generate DPoP proof WITHOUT trailing slash + dpop_proof = await generate_dpop_proof( + http_method="GET", + http_url="https://api.example.com/api/v1/test", + access_token=access_token + ) + + app = FastAPI() + app.state.trust_proxy = True + + auth0 = Auth0FastAPI(domain="auth0.local", audience="", dpop_enabled=True) + + @app.get("/test") + async def test_route(claims=Depends(auth0.require_auth())): + return {"user": claims["sub"]} + + client = TestClient(app) + + response = client.get( + "/test", + headers={ + "Authorization": f"DPoP {access_token}", + "DPoP": dpop_proof, + "X-Forwarded-Proto": "https", + "X-Forwarded-Host": "api.example.com", + "X-Forwarded-Prefix": "/api/v1/" # With trailing slash + } + ) + + # Should still work - trailing slash should be stripped + assert response.status_code == 200 + assert response.json()["user"] == "user_123" + + +@pytest.mark.asyncio +async def test_reverse_proxy_multiple_hosts(httpx_mock: HTTPXMock): + """Test that first host is used from comma-separated X-Forwarded-Host.""" + setup_mocks(httpx_mock) + + access_token = await generate_dpop_bound_token( + domain="auth0.local", + user_id="user_123", + audience="", + issuer="https://auth0.local/" + ) + + dpop_proof = await generate_dpop_proof( + http_method="GET", + http_url="https://client.example.com/test", + access_token=access_token + ) + + app = FastAPI() + app.state.trust_proxy = True + + auth0 = Auth0FastAPI(domain="auth0.local", audience="", dpop_enabled=True) + + @app.get("/test") + async def test_route(claims=Depends(auth0.require_auth())): + return {"user": claims["sub"]} + + client = TestClient(app) + + response = client.get( + "/test", + headers={ + "Authorization": f"DPoP {access_token}", + "DPoP": dpop_proof, + "X-Forwarded-Proto": "https", + "X-Forwarded-Host": "client.example.com, proxy1.internal, proxy2.internal" + } + ) + + assert response.status_code == 200 + json_response = response.json() + assert json_response["user"] == "user_123" + +@pytest.mark.asyncio +async def test_reverse_proxy_partial_headers(httpx_mock: HTTPXMock): + """Test with only X-Forwarded-Proto (partial headers).""" + setup_mocks(httpx_mock) + + access_token = await generate_dpop_bound_token( + domain="auth0.local", + user_id="user_123", + audience="", + issuer="https://auth0.local/" + ) + + # Proof expects https but with testserver host + dpop_proof = await generate_dpop_proof( + http_method="GET", + http_url="https://testserver/test", + access_token=access_token + ) + + app = FastAPI() + app.state.trust_proxy = True + + auth0 = Auth0FastAPI(domain="auth0.local", audience="", dpop_enabled=True) + + @app.get("/test") + async def test_route(claims=Depends(auth0.require_auth())): + return {"user": claims["sub"]} + + client = TestClient(app) + + response = client.get( + "/test", + headers={ + "Authorization": f"DPoP {access_token}", + "DPoP": dpop_proof, + "X-Forwarded-Proto": "https" # Only proto, no host + } + ) + + assert response.status_code == 200 + assert response.json()["user"] == "user_123" + + +def test_get_canonical_url_with_proxy(): + """Test get_canonical_url uses X-Forwarded headers when trust_proxy=True.""" + app = FastAPI() + app.state.trust_proxy = True + + @app.get("/test") + async def test_route(request: Request): + canonical_url = get_canonical_url(request) + return {"url": canonical_url} + + client = TestClient(app) + response = client.get( + "/test", + headers={ + "X-Forwarded-Proto": "https", + "X-Forwarded-Host": "api.example.com" + } + ) + + assert response.status_code == 200 + url = response.json()["url"] + assert "https://api.example.com/test" == url + + +def test_get_canonical_url_security_without_trust(): + """Test that malicious X-Forwarded headers are ignored without trust.""" + app = FastAPI() + # trust_proxy = False (default) + + @app.get("/test") + async def test_route(request: Request): + canonical_url = get_canonical_url(request) + return {"url": canonical_url} + + client = TestClient(app) + response = client.get( + "/test", + headers={ + "X-Forwarded-Proto": "https", + "X-Forwarded-Host": "attacker.com" + } + ) + + assert response.status_code == 200 + url = response.json()["url"] + # Malicious headers should be ignored + assert "attacker.com" not in url + assert "testserver" in url diff --git a/tests/test_utils.py b/tests/test_utils.py new file mode 100644 index 0000000..a996bea --- /dev/null +++ b/tests/test_utils.py @@ -0,0 +1,250 @@ +import time +import hashlib +import base64 +import secrets +from typing import Optional, Dict, Any, Union +from authlib.jose import JsonWebKey, jwt + + +# A private RSA JWK for test usage (Bearer tokens). +PRIVATE_JWK = { + "kty": "RSA", + "alg": "RS256", + "use": "sig", + "kid": "TEST_KEY", + "n": "whYOFK2Ocbbpb_zVypi9SeKiNUqKQH0zTKN1-6fpCTu6ZalGI82s7XK3tan4dJt90ptUPKD2zvxqTzFNfx4HHHsrYCf2-FMLn1VTJfQazA2BvJqAwcpW1bqRUEty8tS_Yv4hRvWfQPcc2Gc3-_fQOOW57zVy-rNoJc744kb30NjQxdGp03J2S3GLQu7oKtSDDPooQHD38PEMNnITf0pj-KgDPjymkMGoJlO3aKppsjfbt_AH6GGdRghYRLOUwQU-h-ofWHR3lbYiKtXPn5dN24kiHy61e3VAQ9_YAZlwXC_99GGtw_NpghFAuM4P1JDn0DppJldy3PGFC0GfBCZASw", + "e": "AQAB", + "d": "VuVE_KEP6323WjpbBdAIv7HGahGrgGANvbxZsIhm34lsVOPK0XDegZkhAybMZHjRhp-gwVxX5ChC-J3cUpOBH5FNxElgW6HizD2Jcq6t6LoLYgPSrfEHm71iHg8JsgrqfUnGYFzMJmv88C6WdCtpgG_qJV1K00_Ly1G1QKoBffEs-v4fAMJrCbUdCz1qWto-PU-HLMEo-krfEpGgcmtZeRlDADh8cETMQlgQfQX2VWq_aAP4a1SXmo-j0cvRU4W5Fj0RVwNesIpetX2ZFz4p_JmB5sWFEj_fC7h5z2lq-6Bme2T3BHtXkIxoBW0_pYVnASC8P2puO5FnVxDmWuHDYQ", + "p": "07rgXd_tLUhVRF_g1OaqRZh5uZ8hiLWUSU0vu9coOaQcatSqjQlIwLW8UdKv_38GrmpIfgcEVQjzq6rFBowUm9zWBO9Eq6enpasYJBOeD8EMeDK-nsST57HjPVOCvoVC5ZX-cozPXna3iRNZ1TVYBY3smn0IaxysIK-zxESf4pM", + "q": "6qrE9TPhCS5iNR7QrKThunLu6t4H_8CkYRPLbvOIt2MgZyPLiZCsvdkTVSOX76QQEXt7Y0nTNua69q3K3Jhf-YOkPSJsWTxgrfOnjoDvRKzbW3OExIMm7D99fVBODuNWinjYgUwGSqGAsb_3TKhtI-Gr5ls3fn6B6oEjVL0dpmk", + "dp": "mHqjrFdgelT2OyiFRS3dAAPf3cLxJoAGC4gP0UoQyPocEP-Y17sQ7t-ygIanguubBy65iDFLeGXa_g0cmSt2iAzRAHrDzI8P1-pQl2KdWSEg9ssspjBRh_F_AiJLLSPRWn_b3-jySkhawtfxwO8Kte1QsK1My765Y0zFvJnjPws", + "dq": "KmjaV4YcsVAUp4z-IXVa5htHWmLuByaFjpXJOjABEUN0467wZdgjn9vPRp-8Ia8AyGgMkJES_uUL_PDDrMJM9gb4c6P4-NeUkVtreLGMjFjA-_IQmIMrUZ7XywHsWXx0c2oLlrJqoKo3W-hZhR0bPFTYgDUT_mRWjk7wV6wl46E", + "qi": "iYltkV_4PmQDfZfGFpzn2UtYEKyhy-9t3Vy8Mw2VHLAADKGwJvVK5ficQAr2atIF1-agXY2bd6KV-w52zR8rmZfTr0gobzYIyqHczOm13t7uXJv2WygY7QEC2OGjdxa2Fr9RnvS99ozMa5nomZBqTqT7z5QV33czjPRCjvg6FcE", +} + +# A private EC P-256 JWK for DPoP test usage. +PRIVATE_DPOP_JWK = { + "kty": "EC", + "crv": "P-256", + "x": "MKBCTNIcKUSDii11ySs3526iDZ8AiTo7Tu6KPAqv7D4", + "y": "4Etl6SRW2YiLUrN5vfvVHuhp7x8PxltmWWlbbM4IFyM", + "d": "870MB6gfuTJ4HtUnUvYMyJpr5eUZNP4Bk43bVdj3eAE" +} + +# Public counterpart for JWKS +PUBLIC_DPOP_JWK = { + "kty": "EC", + "alg": "ES256", + "use": "sig", + "kid": "DPOP_TEST_KEY", + "crv": "P-256", + "x": "MKBCTNIcKUSDii11ySs3526iDZ8AiTo7Tu6KPAqv7D4", + "y": "4Etl6SRW2YiLUrN5vfvVHuhp7x8PxltmWWlbbM4IFyM" +} + + +async def generate_token( + domain: str, + user_id: str, + audience: Optional[str] = None, + issuer: Union[str, bool, None] = None, + iat: bool = True, + exp: bool = True, + claims: Optional[Dict[str, Any]] = None, + expiration_time: int = 3600, + token_type: str = "bearer" +) -> str: + """ + Generates a real RS256-signed JWT using the private key above. + + Args: + domain: The Auth0 domain (used if issuer is not False). + user_id: The 'sub' claim in the token. + audience: The 'aud' claim in the token. If omitted, 'aud' won't be included. + issuer: + - If a string, it's placed in 'iss' claim. + - If None, default is f"https://{domain}/". + - If False, skip 'iss' claim entirely. + iat: Whether to set the 'iat' (issued at) claim. If False, skip it. + exp: Whether to set the 'exp' claim. If False, skip it. + claims: Additional custom claims to merge into the token. + expiration_time: If exp is True, how many seconds from now until expiration. + token_type: "bearer" for regular tokens, "dpop" for DPoP-bound tokens + + Returns: + A RS256-signed JWT string. + + """ + token_claims = dict(claims or {}) + token_claims.setdefault("sub", user_id) + + if iat: + token_claims["iat"] = int(time.time()) + + if exp: + token_claims["exp"] = int(time.time()) + expiration_time + + if issuer is not False: + token_claims["iss"] = issuer if isinstance(issuer, str) else f"https://{domain}/" + + if audience: + token_claims["aud"] = audience + + # Add DPoP binding for DPoP tokens + if token_type == "dpop": + jkt = calculate_jwk_thumbprint(PRIVATE_DPOP_JWK) + token_claims["cnf"] = {"jkt": jkt} + + key = JsonWebKey.import_key(PRIVATE_JWK) + header = {"alg": "RS256", "kid": PRIVATE_JWK["kid"]} + token = jwt.encode(header, token_claims, key) + # Ensure we return a string, not bytes + return token.decode('utf-8') if isinstance(token, bytes) else token + +def base64url_encode(data: bytes) -> str: + """Base64URL encode without padding.""" + return base64.urlsafe_b64encode(data).decode('ascii').rstrip('=') + +def sha256_hash(data: Union[str, bytes]) -> str: + """SHA256 hash and base64url encode.""" + if isinstance(data, str): + data = data.encode('utf-8') + return base64url_encode(hashlib.sha256(data).digest()) + +def generate_jti() -> str: + """Generate a random JTI (JWT ID) for DPoP proof.""" + return base64url_encode(secrets.token_bytes(16)) + +def calculate_jwk_thumbprint(jwk_dict: Dict[str, Any]) -> str: + """Calculate JWK thumbprint for DPoP proof.""" + # For EC P-256 keys, thumbprint is calculated from crv, kty, x, y + thumbprint_jwk = { + "crv": jwk_dict["crv"], + "kty": jwk_dict["kty"], + "x": jwk_dict["x"], + "y": jwk_dict["y"] + } + # Sort keys and create JSON string + import json + canonical_json = json.dumps(thumbprint_jwk, sort_keys=True, separators=(',', ':')) + return base64url_encode(hashlib.sha256(canonical_json.encode('utf-8')).digest()) + +async def generate_dpop_proof( + http_method: str, + http_url: str, + access_token: Optional[str] = None, + nonce: Optional[str] = None, + iat_offset: int = 0 +) -> str: + """ + Generate a DPoP proof JWT for testing. + + Args: + http_method: HTTP method (GET, POST, etc.) + http_url: Full HTTP URL + access_token: Access token to bind (for ath claim) + nonce: Server nonce for DPoP proof + iat_offset: Offset for iat claim (for testing expired proofs) + + Returns: + DPoP proof JWT string + """ + current_time = int(time.time()) + iat_offset + jti = generate_jti() + + # Calculate JWK thumbprint for jkt claim + jkt = calculate_jwk_thumbprint(PRIVATE_DPOP_JWK) + + # DPoP proof claims + proof_claims = { + "jti": jti, + "htm": http_method.upper(), + "htu": http_url, + "iat": current_time, + "jkt": jkt + } + + # Add access token hash if provided + if access_token: + proof_claims["ath"] = sha256_hash(access_token) + + # Add nonce if provided + if nonce: + proof_claims["nonce"] = nonce + + # Create header with public key + header = { + "alg": "ES256", + "typ": "dpop+jwt", + "jwk": { + "kty": PUBLIC_DPOP_JWK["kty"], + "crv": PUBLIC_DPOP_JWK["crv"], + "x": PUBLIC_DPOP_JWK["x"], + "y": PUBLIC_DPOP_JWK["y"] + } + } + + # Sign with private key + key = JsonWebKey.import_key(PRIVATE_DPOP_JWK) + proof_jwt = jwt.encode(header, proof_claims, key) + # Ensure we return a string, not bytes + return proof_jwt.decode('utf-8') if isinstance(proof_jwt, bytes) else proof_jwt + +async def generate_dpop_bound_token( + domain: str, + user_id: str, + audience: Optional[str] = None, + issuer: Union[str, bool, None] = None, + iat: bool = True, + exp: bool = True, + claims: Optional[Dict[str, Any]] = None, + expiration_time: int = 3600, + cnf_jkt: Optional[str] = None +) -> str: + """ + Generate a DPoP-bound access token for testing. + Similar to generate_token but includes cnf claim with jkt. + + Args: + domain: Auth0 domain + user_id: Subject claim + audience: Audience claim + issuer: Issuer claim + iat: Include iat claim + exp: Include exp claim + claims: Additional claims + expiration_time: Token expiration in seconds + cnf_jkt: JWK thumbprint for confirmation claim (auto-calculated if None) + + Returns: + DPoP-bound access token JWT string + """ + token_claims = dict(claims or {}) + token_claims.setdefault("sub", user_id) + + if iat: + token_claims["iat"] = int(time.time()) + + if exp: + token_claims["exp"] = int(time.time()) + expiration_time + + if issuer is not False: + token_claims["iss"] = issuer if isinstance(issuer, str) else f"https://{domain}/" + + if audience: + token_claims["aud"] = audience + + # Add DPoP binding + if cnf_jkt is None: + cnf_jkt = calculate_jwk_thumbprint(PRIVATE_DPOP_JWK) + + token_claims["cnf"] = { + "jkt": cnf_jkt + } + + # Sign with RS256 (access tokens are still RS256, only DPoP proofs are ES256) + key = JsonWebKey.import_key(PRIVATE_JWK) + header = {"alg": "RS256", "kid": PRIVATE_JWK["kid"]} + token = jwt.encode(header, token_claims, key) + # Ensure we return a string, not bytes + return token.decode('utf-8') if isinstance(token, bytes) else token