# tyndale-ai-service/app/auth.py
#
# 115 lines
# 3.5 KiB
# Python

"""GCP Cloud Run service-to-service authentication module."""
import asyncio
import logging
from typing import Annotated

from fastapi import Depends, HTTPException, Request
from google.auth.transport import requests as google_requests
from google.oauth2 import id_token

from app.config import settings
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
def verify_gcp_identity_token(token: str, audience: str) -> dict:
    """Verify a GCP identity token and return the decoded claims.

    Delegates to ``id_token.verify_oauth2_token`` with a freshly created
    ``google.auth.transport.requests.Request`` transport. Any exception
    raised by that call is logged at WARNING level and re-raised as a
    ``ValueError`` so callers only have one exception type to handle.

    Args:
        token: The identity token to verify.
        audience: The expected audience (backend Cloud Run URL).

    Returns:
        The decoded token claims.

    Raises:
        ValueError: If the token is invalid or verification fails. The
            underlying exception is attached as ``__cause__``.
    """
    try:
        return id_token.verify_oauth2_token(
            token,
            google_requests.Request(),
            audience=audience,
        )
    except Exception as e:
        # Deliberately broad: this is the boundary that normalises every
        # verification failure into ValueError for the caller.
        logger.warning("Token verification failed: %s", e)
        # Chain explicitly so the original failure stays visible in tracebacks.
        raise ValueError(f"Token verification failed: {e}") from e
async def verify_service_auth(request: Request) -> dict | None:
    """FastAPI dependency to verify GCP service-to-service authentication.

    Returns None if auth is disabled (local dev); otherwise extracts the
    bearer token from the ``Authorization`` header, verifies it against the
    configured audience, and checks the service account allowlist when one
    is configured.

    Args:
        request: The incoming request whose ``Authorization`` header is read.

    Returns:
        The decoded token claims, or None if auth is disabled.

    Raises:
        HTTPException: 401 if the Authorization header is missing or
            malformed, or if the token fails verification; 403 if the
            token's ``email`` claim is not in the configured allowlist;
            500 if auth is enabled but no audience is configured.
    """
    # Skip auth if disabled (local development)
    if not settings.auth_enabled:
        logger.debug("Authentication disabled, skipping verification")
        return None

    # Every 401 carries a Bearer challenge, as HTTP requires for that status.
    challenge = {"WWW-Authenticate": "Bearer"}

    # Extract token from Authorization header
    auth_header = request.headers.get("Authorization")
    if not auth_header:
        logger.warning("Missing Authorization header")
        raise HTTPException(
            status_code=401,
            detail="Missing Authorization header",
            headers=challenge,
        )

    # The auth scheme is case-insensitive, so accept "bearer"/"BEARER" too.
    scheme, _, credentials = auth_header.partition(" ")
    token = credentials.strip()
    if scheme.lower() != "bearer" or not token:
        logger.warning("Invalid Authorization header format")
        raise HTTPException(
            status_code=401,
            detail="Invalid Authorization header format. Expected 'Bearer <token>'",
            headers=challenge,
        )

    # Verify the token
    if not settings.auth_audience:
        logger.error("AUTH_AUDIENCE not configured")
        raise HTTPException(
            status_code=500,
            detail="Server authentication not properly configured",
        )

    try:
        # The verifier is synchronous and uses a `requests`-based transport,
        # so it may perform blocking network I/O; run it in a worker thread
        # to keep the event loop free for other requests.
        claims = await asyncio.to_thread(
            verify_gcp_identity_token, token, settings.auth_audience
        )
    except ValueError as e:
        # verify_gcp_identity_token already logs the failure reason; logging
        # it again here would only repeat the same message.
        raise HTTPException(
            status_code=401,
            detail="Invalid or expired token",
            headers=challenge,
        ) from e

    # Check service account allowlist if configured
    allowed_accounts = settings.allowed_service_accounts_list
    if allowed_accounts:
        # A token without an email claim is treated as "" and rejected.
        email = claims.get("email", "")
        if email not in allowed_accounts:
            logger.warning(
                "Service account '%s' not in allowlist",
                email,
                extra={"allowed": allowed_accounts},
            )
            raise HTTPException(
                status_code=403,
                detail="Service account not authorized",
            )

    logger.info(
        "Service authentication successful",
        extra={"service_account": claims.get("email", "unknown")},
    )
    return claims
# Type alias for clean dependency injection: annotate a route parameter with
# ``ServiceAuthDependency`` to have FastAPI resolve it via
# ``verify_service_auth``. The injected value is whatever that dependency
# returns: the decoded token claims, or ``None`` when auth is disabled.
ServiceAuthDependency = Annotated[dict | None, Depends(verify_service_auth)]