-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathauthorization_middleware.py
More file actions
66 lines (51 loc) · 2.42 KB
/
authorization_middleware.py
File metadata and controls
66 lines (51 loc) · 2.42 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
from collections.abc import Callable, Awaitable
from datetime import datetime, UTC
from fastapi import FastAPI, Request, Response
from fastapi.responses import JSONResponse
from fastapi_pundra.common.jwt_utils import decode_token
from fastapi_pundra.rest.exceptions import UnauthorizedException
from jose import JWTError
from starlette.middleware.base import BaseHTTPMiddleware
from app.config.authorization import EXCLUDE_PATHS
from app.lib.auth_path_utils import excluded_path_checker
_path_skips_auth = excluded_path_checker(EXCLUDE_PATHS)
class AuthorizationMiddleware(BaseHTTPMiddleware):
"""Authorization middleware."""
def __init__(self, app: FastAPI) -> None:
"""Initialize the middleware."""
super().__init__(app)
def verify_access_token(self, request: Request) -> dict | None:
"""Verify the access token."""
auth_header = request.headers.get("Authorization")
if not auth_header:
raise UnauthorizedException(message="No authorization header")
try:
scheme, token = auth_header.split()
if scheme.lower() != "bearer":
raise UnauthorizedException(message="Invalid authentication scheme")
return decode_token(token)
except JWTError as err:
raise UnauthorizedException(message="Invalid token") from err
async def dispatch(
self,
request: Request,
call_next: Callable[[Request], Awaitable[Response]],
) -> Response:
"""Dispatch the middleware."""
try:
# Skip token verification for configured paths (exact, prefix /*, segment *)
if _path_skips_auth(request.url.path):
return await call_next(request)
# Verify the token and get payload
payload = self.verify_access_token(request)
# Add the payload to request state for use in route handlers
request.state.user = payload
# Set additional state variables
request.state.auth_user_id = payload.get("user_id")
return await call_next(request)
except UnauthorizedException as exc:
error_response = exc.to_dict()
error_response["path"] = request.url.path
error_response["type"] = exc.__class__.__name__
error_response["timestamp"] = datetime.now(UTC).isoformat()
return JSONResponse(content=error_response, status_code=exc.status_code)