"""GCP Cloud Run service-to-service authentication module.""" import logging from typing import Annotated from fastapi import Depends, HTTPException, Request from google.auth.transport import requests as google_requests from google.oauth2 import id_token from app.config import settings logger = logging.getLogger(__name__) def verify_gcp_identity_token(token: str, audience: str) -> dict: """Verify a GCP identity token and return the decoded claims. Args: token: The identity token to verify. audience: The expected audience (backend Cloud Run URL). Returns: The decoded token claims. Raises: ValueError: If the token is invalid or verification fails. """ try: claims = id_token.verify_oauth2_token( token, google_requests.Request(), audience=audience, ) return claims except Exception as e: logger.warning(f"Token verification failed: {e}") raise ValueError(f"Token verification failed: {e}") async def verify_service_auth(request: Request) -> dict | None: """FastAPI dependency to verify GCP service-to-service authentication. Returns None if auth is disabled (local dev), otherwise verifies the identity token and checks the service account allowlist. Returns: The decoded token claims, or None if auth is disabled. Raises: HTTPException: 401 if authentication fails. """ # Skip auth if disabled (local development) if not settings.auth_enabled: logger.debug("Authentication disabled, skipping verification") return None # Extract token from Authorization header auth_header = request.headers.get("Authorization") if not auth_header: logger.warning("Missing Authorization header") raise HTTPException( status_code=401, detail="Missing Authorization header", ) if not auth_header.startswith("Bearer "): logger.warning("Invalid Authorization header format") raise HTTPException( status_code=401, detail="Invalid Authorization header format. Expected 'Bearer '", ) token = auth_header[7:] # Remove "Bearer " prefix # Verify the token if not settings.auth_audience: logger.error("AUTH_AUDIENCE not configured") raise HTTPException( status_code=500, detail="Server authentication not properly configured", ) try: claims = verify_gcp_identity_token(token, settings.auth_audience) except ValueError as e: logger.warning(f"Token verification failed: {e}") raise HTTPException( status_code=401, detail="Invalid or expired token", ) # Check service account allowlist if configured allowed_accounts = settings.allowed_service_accounts_list if allowed_accounts: email = claims.get("email", "") if email not in allowed_accounts: logger.warning( f"Service account '{email}' not in allowlist", extra={"allowed": allowed_accounts}, ) raise HTTPException( status_code=403, detail="Service account not authorized", ) logger.info( "Service authentication successful", extra={"service_account": claims.get("email", "unknown")}, ) return claims # Type alias for clean dependency injection ServiceAuthDependency = Annotated[dict | None, Depends(verify_service_auth)]