4
4
from fastapi .requests import Request
5
5
from fastapi .security import SecurityScopes
6
6
from fastapi .websockets import WebSocket
7
- from jose import jwt
7
+ from joserfc import jwt
8
+ from joserfc .errors import BadSignatureError
8
9
9
10
from goosebit .settings import PWD_CXT , SECRET , USERS
10
11
@@ -29,46 +30,49 @@ async def authenticate_user(request: Request):
29
30
return user
30
31
31
32
32
- sessions = {}
33
-
34
-
35
- def create_session (email : str ) -> str :
36
- token = jwt .encode ({"email" : email }, SECRET )
37
- sessions [token ] = email
38
- return token
33
+ def create_session (username : str ) -> str :
34
+ return jwt .encode (
35
+ header = {"alg" : "HS256" }, claims = {"username" : username }, key = SECRET
36
+ )
39
37
40
38
41
39
def authenticate_session (request : Request ):
42
40
session_id = request .cookies .get ("session_id" )
43
- if session_id is None or session_id not in sessions :
41
+ if session_id is None :
44
42
raise HTTPException (
45
43
status_code = 302 ,
46
44
headers = {"location" : str (request .url_for ("login" ))},
47
45
detail = "Invalid session ID" ,
48
46
)
49
47
user = get_user_from_session (session_id )
48
+ if user is None :
49
+ raise HTTPException (
50
+ status_code = 302 ,
51
+ headers = {"location" : str (request .url_for ("login" ))},
52
+ detail = "Invalid username" ,
53
+ )
50
54
return user
51
55
52
56
53
57
def authenticate_api_session (request : Request ):
54
58
session_id = request .cookies .get ("session_id" )
55
- if session_id is None or session_id not in sessions :
56
- raise HTTPException (status_code = 401 , detail = "Not logged in" )
57
59
user = get_user_from_session (session_id )
60
+ if user is None :
61
+ raise HTTPException (status_code = 401 , detail = "Not logged in" )
58
62
return user
59
63
60
64
61
65
def authenticate_ws_session (websocket : WebSocket ):
62
66
session_id = websocket .cookies .get ("session_id" )
63
- if session_id is None or session_id not in sessions :
64
- raise HTTPException (status_code = 401 , detail = "Not logged in" )
65
67
user = get_user_from_session (session_id )
68
+ if user is None :
69
+ raise HTTPException (status_code = 401 , detail = "Not logged in" )
66
70
return user
67
71
68
72
69
73
def auto_redirect (request : Request ):
70
74
session_id = request .cookies .get ("session_id" )
71
- if session_id is None or session_id not in sessions :
75
+ if get_user_from_session ( session_id ) is None :
72
76
return request
73
77
raise HTTPException (
74
78
status_code = 302 ,
@@ -78,16 +82,20 @@ def auto_redirect(request: Request):
78
82
79
83
80
84
def get_user_from_session (session_id : str ):
81
- for username in USERS :
82
- if username == sessions .get (session_id ):
83
- return username
85
+ if session_id is None :
86
+ return
87
+ try :
88
+ session_data = jwt .decode (session_id , SECRET )
89
+ return session_data .claims ["username" ]
90
+ except (BadSignatureError , LookupError ):
91
+ pass
84
92
85
93
86
94
def get_current_user (request : Request ):
87
95
session_id = request .cookies .get ("session_id" )
88
- if session_id is None or session_id not in sessions :
89
- return None
90
96
user = get_user_from_session (session_id )
97
+ if user is None :
98
+ return None
91
99
return USERS [user ]
92
100
93
101
0 commit comments