1
1
import logging
2
+ from typing import List
3
+
2
4
import azure .functions as func
3
5
import jwt
4
6
import json
@@ -63,10 +65,17 @@ def blob_trigger(inbound: func.InputStream, outbound: func.Out[str]):
63
65
return f"Error: { str (e )} "
64
66
65
67
66
- def validate_jwt (token : str , audience : str ) -> bool :
68
+ def validate_jwt (token : str , audience : str , required_scopes : List [ str ] ) -> bool :
67
69
try :
68
70
decoded = jwt .decode (token , audience = audience , options = {"verify_signature" : False })
69
- # Optionally check claims like roles or scopes
71
+
72
+ # Check if the required scopes are present
73
+ token_scopes = decoded .get ("scp" , "" ).split (" " )
74
+ if not all (scope in token_scopes for scope in required_scopes ):
75
+ logging .error (f"Required scopes { required_scopes } not found in token scopes { token_scopes } " )
76
+ return False
77
+
78
+ logging .info ("Required scopes found in token: %s" , required_scopes )
70
79
return True
71
80
except Exception as e :
72
81
logging .error (f"JWT validation failed: { e } " )
@@ -86,10 +95,13 @@ def upload_csv(req: func.HttpRequest, outbound: func.Out[str]) -> HttpResponse:
86
95
return func .HttpResponse ("Missing auth header" , status_code = 401 )
87
96
88
97
token = auth_header .split (" " )[1 ] # Extract Bearer token
89
- if not validate_jwt ( token , audience = os .environ .get ("FUNCTION_APP_CLIENT_ID" )):
90
- return func . HttpResponse ( "Unauthorized" , status_code = 401 )
98
+ audience = os .environ .get ("FUNCTION_APP_CLIENT_ID" )
99
+ required_scopes = [ "Csv.Write" ]
91
100
92
- logging .info ("Received HTTP request to upload CSV" )
101
+ if not validate_jwt (token , audience , required_scopes ):
102
+ return HttpResponse ("Unauthorized" , status_code = 401 )
103
+
104
+ logging .info ("Successfully validated JWT token" )
93
105
94
106
# Parse raw bytes derived from request body to string
95
107
string_body = req .get_body ().decode ("utf-8" )
0 commit comments