1
1
import base64
2
2
import json
3
3
import logging
4
- from typing import Optional
4
+ from typing import cast , Optional
5
5
from uuid import UUID
6
6
7
7
from fastapi import Depends , HTTPException , Request
8
8
from fastapi .exceptions import HTTPException
9
9
from fastapi .security .utils import get_authorization_scheme_param
10
10
from web3login .auth import to_checksum_address , verify
11
- from web3login .exceptions import Web3VerificationError
11
+ from web3login .exceptions import (
12
+ Web3AuthorizationExpired ,
13
+ Web3AuthorizationWrongApplication ,
14
+ Web3VerificationError ,
15
+ )
12
16
from web3login .middlewares .fastapi import OAuth2BearerOrWeb3
13
17
14
18
from . import actions , data
15
19
from .db import yield_db_read_only_session
16
20
from .settings import (
17
- APPLICATION_NAME ,
18
21
BOT_INSTALLATION_TOKEN ,
19
22
BOT_INSTALLATION_TOKEN_HEADER ,
23
+ BUGOUT_WEB3_SIGNATURE_APPLICATION_HEADER ,
20
24
)
21
25
22
26
logger = logging .getLogger (__name__ )
@@ -41,13 +45,27 @@ async def get_current_user(
41
45
if token is None or token == "" :
42
46
raise HTTPException (status_code = 404 , detail = "Access token not found" )
43
47
48
+ signature_application : str = request .headers .get (
49
+ BUGOUT_WEB3_SIGNATURE_APPLICATION_HEADER
50
+ )
51
+ application_id = None
52
+ if signature_application is not None :
53
+ try :
54
+ application_id = cast (UUID , signature_application )
55
+ except Exception :
56
+ raise HTTPException (
57
+ status_code = 403 , detail = "Wrong Web3 signature application provided"
58
+ )
59
+
44
60
try :
45
61
if scheme == "web3" :
46
62
payload_json = base64 .decodebytes (str (token ).encode ()).decode ("utf-8" )
47
63
payload = json .loads (payload_json )
48
64
verified = verify (
49
65
authorization_payload = payload ,
50
- application_to_check = APPLICATION_NAME ,
66
+ application_to_check = str (application_id )
67
+ if application_id is not None
68
+ else "" ,
51
69
)
52
70
if not verified :
53
71
logger .info ("Web3 verification error" )
@@ -57,7 +75,11 @@ async def get_current_user(
57
75
logger .error ("Web3 address in payload could not be None" )
58
76
raise Exception ()
59
77
web3_address = to_checksum_address (web3_address )
60
- user = actions .get_user (session = db_session , web3_address = web3_address )
78
+ user = actions .get_user (
79
+ session = db_session ,
80
+ web3_address = web3_address ,
81
+ application_id = application_id ,
82
+ )
61
83
62
84
elif scheme == "bearer" :
63
85
is_token_active , user = actions .get_current_user_by_token (
@@ -82,6 +104,10 @@ async def get_current_user(
82
104
except actions .UserInvalidParameters as e :
83
105
logger .info (e )
84
106
raise HTTPException (status_code = 500 )
107
+ except Web3AuthorizationExpired :
108
+ raise HTTPException (status_code = 403 , detail = "Signature not verified" )
109
+ except Web3AuthorizationWrongApplication :
110
+ raise HTTPException (status_code = 403 , detail = "Signature not verified" )
85
111
except Web3VerificationError :
86
112
raise HTTPException (status_code = 403 , detail = "Signature not verified" )
87
113
except Exception :
@@ -117,13 +143,27 @@ async def get_current_user_with_groups(
117
143
if token is None or token == "" :
118
144
raise HTTPException (status_code = 404 , detail = "Access token not found" )
119
145
146
+ signature_application : str = request .headers .get (
147
+ BUGOUT_WEB3_SIGNATURE_APPLICATION_HEADER
148
+ )
149
+ application_id = None
150
+ if signature_application is not None :
151
+ try :
152
+ application_id = cast (UUID , signature_application )
153
+ except Exception :
154
+ raise HTTPException (
155
+ status_code = 403 , detail = "Wrong Web3 signature application provided"
156
+ )
157
+
120
158
try :
121
159
if scheme == "web3" :
122
160
payload_json = base64 .decodebytes (str (token ).encode ()).decode ("utf-8" )
123
161
payload = json .loads (payload_json )
124
162
verified = verify (
125
163
authorization_payload = payload ,
126
- application_to_check = APPLICATION_NAME ,
164
+ application_to_check = str (application_id )
165
+ if application_id is not None
166
+ else "" ,
127
167
)
128
168
if not verified :
129
169
logger .info ("Web3 authorization verification error" )
@@ -134,7 +174,9 @@ async def get_current_user_with_groups(
134
174
raise Exception ()
135
175
web3_address = to_checksum_address (web3_address )
136
176
user_extended = actions .get_user_with_groups (
137
- session = db_session , web3_address = web3_address
177
+ session = db_session ,
178
+ web3_address = web3_address ,
179
+ application_id = application_id ,
138
180
)
139
181
140
182
elif scheme == "bearer" :
@@ -163,6 +205,10 @@ async def get_current_user_with_groups(
163
205
except actions .UserInvalidParameters as e :
164
206
logger .info (e )
165
207
raise HTTPException (status_code = 500 )
208
+ except Web3AuthorizationExpired :
209
+ raise HTTPException (status_code = 403 , detail = "Signature not verified" )
210
+ except Web3AuthorizationWrongApplication :
211
+ raise HTTPException (status_code = 403 , detail = "Signature not verified" )
166
212
except Web3VerificationError :
167
213
raise HTTPException (status_code = 403 , detail = "Signature not verified" )
168
214
except Exception :
0 commit comments