# complycore/backend/auth/jwt.py
#
# 64 lines
# 1.8 KiB
# Python

# auth/jwt.py
import logging
import os

from fastapi import Request, HTTPException, status, Depends
from jose import jwt, JWTError
# Key passed to jwt.decode when verifying tokens. Read once at import time;
# os.getenv returns None when SUPABASE_JWT_SECRET is not set.
JWT_SECRET = os.getenv("SUPABASE_JWT_SECRET")
# The only signing algorithm accepted when decoding tokens.
ALGORITHM = "HS256"
def get_token_from_header(request: Request) -> str:
    """Extract the bearer token from the request's ``Authorization`` header.

    The header must have the form ``Bearer <token>``. The scheme name is
    matched case-insensitively and any run of whitespace may separate it
    from the token, so ``bearer  abc`` yields ``abc``.

    Args:
        request: Incoming request whose headers are inspected.

    Returns:
        The token that follows the ``Bearer`` scheme; never empty.

    Raises:
        HTTPException: 401 if the header is absent, uses another scheme,
            or carries no token.
    """
    auth_header = request.headers.get("Authorization") or ""
    # split() with no argument collapses repeated whitespace, so a doubled
    # space after the scheme (or a bare "Bearer ") can no longer yield an
    # empty token.
    parts = auth_header.split()
    if len(parts) < 2 or parts[0].lower() != "bearer":
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Missing or invalid Authorization header",
            # A 401 response should tell the client which scheme to use.
            headers={"WWW-Authenticate": "Bearer"},
        )
    # Anything after the token is ignored.
    return parts[1]
# def verify_jwt_token(token: str) -> dict:
# try:
# payload = jwt.decode(token, JWT_SECRET, algorithms=[ALGORITHM])
# return payload
# except JWTError as e:
# raise HTTPException(
# status_code=status.HTTP_403_FORBIDDEN,
# detail=f"Invalid JWT token: {str(e)}",
# )
def verify_jwt_token(token: str) -> dict:
    """Decode ``token`` using ``JWT_SECRET`` and ``ALGORITHM``.

    Args:
        token: Encoded JWT string.

    Returns:
        The payload returned by ``jwt.decode``.

    Raises:
        HTTPException: 500 if no signing secret is configured; 403 if
            decoding raises ``JWTError``.
    """
    # Refuse to decode with a missing or empty secret: with an empty HMAC
    # key anyone could mint a token that verifies.
    if not JWT_SECRET:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="JWT secret is not configured",
        )
    try:
        return jwt.decode(
            token,
            JWT_SECRET,
            algorithms=[ALGORITHM],
            # NOTE(review): audience verification is switched off, so the
            # "aud" claim is not checked here — confirm this is intended.
            options={"verify_aud": False},
        )
    except JWTError as e:
        # Chain the original error so the traceback keeps the root cause.
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=f"Invalid JWT token: {str(e)}",
        ) from e
def get_current_user(request: Request) -> dict:
    """Build the current-user dict from the request's bearer JWT.

    Reads the token from the ``Authorization`` header, decodes it, and maps
    selected claims onto a plain dict.

    Args:
        request: Incoming request carrying the ``Authorization`` header.

    Returns:
        A dict with keys ``id`` (the ``sub`` claim), ``email``, ``role`` and
        ``tenant_id``. ``email`` and ``role`` are ``None`` when the claim is
        absent; ``tenant_id`` falls back to ``"unknown"``.

    Raises:
        HTTPException: 403 if the payload has no ``sub`` claim, plus
            whatever ``get_token_from_header`` and ``verify_jwt_token``
            raise.
    """
    token = get_token_from_header(request)
    payload = verify_jwt_token(token)

    user_id = payload.get("sub")
    if not user_id:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="JWT missing 'sub' field",
        )

    # Log identifiers only, at debug level. The full payload carries claims
    # such as email and must not be written to stdout on every request.
    logging.getLogger(__name__).debug(
        "Authenticated JWT for sub=%s role=%s", user_id, payload.get("role")
    )

    return {
        "id": user_id,
        "email": payload.get("email"),
        "role": payload.get("role"),
        # NOTE(review): a token without "tenant_id" is accepted and mapped
        # to "unknown" — confirm callers treat that value safely.
        "tenant_id": payload.get("tenant_id", "unknown"),
    }