complycore/backend/auth/jwt.py

119 lines
3.6 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# backend/auth/jwt.py
"""
JWT utilities & FastAPI dependency.
• Extracts bearer token
• Verifies signature against SUPABASE_JWT_SECRET
• Verifies the "aud" claim against SUPABASE_JWT_AUD (default: "authenticated")
• Fetches user + tenant rows from Supabase via SQLAlchemy
"""
from fastapi import Request, HTTPException, status, Depends
from jose import jwt, JWTError
from sqlalchemy.orm import Session
from sqlalchemy import text
import os
# --------------------------------------------------------------------------- #
# Environment / constants
# --------------------------------------------------------------------------- #
# Secret used to verify token signatures; None when the variable is unset.
JWT_SECRET = os.environ.get("SUPABASE_JWT_SECRET")
# The only signing algorithm accepted when decoding tokens.
ALGORITHM = "HS256"
# Expected "aud" claim; falls back to the default Supabase aud when unset.
AUDIENCE = os.environ.get("SUPABASE_JWT_AUD", "authenticated")
# Imported here, below the constants, to avoid circular refs (still runs at module import time)
from db.session import get_db
# --------------------------------------------------------------------------- #
# Helpers
# --------------------------------------------------------------------------- #
def _get_token_from_header(request: Request) -> str:
    """
    Extract the bearer token from the request's Authorization header.

    The header must have the form "<scheme> <token>" where the scheme is
    "Bearer" (compared case-insensitively) and the token is non-empty.

    Returns:
        The token with surrounding whitespace removed.

    Raises:
        HTTPException(401): the header is absent, uses another scheme, or
            carries no token.
    """
    auth = request.headers.get("Authorization", "")
    # Split on the first space only; everything after it is the credential.
    scheme, _, token = auth.partition(" ")
    token = token.strip()
    # HTTP auth scheme names are case-insensitive, so "bearer" is as valid as
    # "Bearer". An empty token is rejected here as a malformed header rather
    # than being handed to the JWT decoder.
    if scheme.lower() != "bearer" or not token:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Missing or invalid Authorization header",
            # Tell the client which scheme to authenticate with.
            headers={"WWW-Authenticate": "Bearer"},
        )
    return token
def _verify_jwt_token(token: str) -> dict:
    """
    Decode & validate a JWT and return its claims.

    The signature is checked with JWT_SECRET, only ALGORITHM is accepted, and
    the "aud" claim must match AUDIENCE. To skip audience verification
    instead, replace `audience=AUDIENCE` with `options={"verify_aud": False}`
    (not recommended).

    Returns:
        The decoded payload (claims).

    Raises:
        HTTPException(500): JWT_SECRET is unset or empty.
        HTTPException(403): decoding or validation raised JWTError.
    """
    # Fail closed when the secret is missing or empty instead of relying on
    # how the JWT library reacts to such a key.
    # NOTE(review): an empty key presumably would let tokens signed with an
    # empty secret verify — confirm against python-jose.
    if not JWT_SECRET:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="JWT secret is not configured",
        )
    try:
        return jwt.decode(
            token,
            JWT_SECRET,
            algorithms=[ALGORITHM],
            audience=AUDIENCE,  # enforce the expected "aud" claim
        )
    except JWTError as exc:
        # Chain the original error so logs/tracebacks keep the root cause.
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=f"Invalid JWT token: {exc}",
        ) from exc
# --------------------------------------------------------------------------- #
# Public dependency
# --------------------------------------------------------------------------- #
def get_current_user(
    request: Request,
    db: Session = Depends(get_db),
) -> dict:
    """
    FastAPI dependency resolving the caller from the bearer token.

    Inject with `Depends(get_current_user)`. The returned dict has:
        "user":   the matching `users` row, as a column-name mapping
        "tenant": the matching `tenants` row as a mapping, or None when the
                  user row has no truthy `tenant_id` or no tenant row matches
        "scopes": the JWT "role" claim ([] when the claim is absent)

    Raises:
        HTTPException(403): the token payload has no "sub" claim.
        HTTPException(404): no `users` row has id equal to "sub".
        Any HTTPException raised while extracting or verifying the token
        propagates unchanged.
    """
    claims = _verify_jwt_token(_get_token_from_header(request))

    # ---- look-up user ------------------------------------------------------ #
    subject = claims.get("sub")
    if not subject:
        raise HTTPException(status_code=403, detail="JWT missing 'sub' claim")

    user_query = text("SELECT * FROM users WHERE id = :uid")
    user = db.execute(user_query, {"uid": subject}).mappings().first()
    if not user:
        raise HTTPException(status_code=404, detail="User not found")

    # ---- tenant (optional) ------------------------------------------------- #
    tenant = None
    tenant_id = user.get("tenant_id")
    if tenant_id:
        tenant_query = text("SELECT * FROM tenants WHERE id = :tid")
        tenant = db.execute(tenant_query, {"tid": tenant_id}).mappings().first()

    return {
        "user": user,
        "tenant": tenant,
        "scopes": claims.get("role", []),
    }