1
- import re
1
+ try :
2
+ from urllib .parse import urlparse
3
+ except ImportError : # Fall back to Python 2
4
+ from urlparse import urlparse
2
5
import logging
3
6
4
7
import requests
15
18
'login.microsoftonline.us' ,
16
19
'login.microsoftonline.de' ,
17
20
])
18
-
21
+ WELL_KNOWN_B2C_HOSTS = [
22
+ "b2clogin.com" ,
23
+ "b2clogin.cn" ,
24
+ "b2clogin.us" ,
25
+ "b2clogin.de" ,
26
+ ]
19
27
20
28
class Authority (object ):
21
29
"""This class represents an (already-validated) authority.
22
30
23
31
Once constructed, it contains members named "*_endpoint" for this instance.
24
32
TODO: It will also cache the previously-validated authority instances.
25
33
"""
34
+ _domains_without_user_realm_discovery = set ([])
35
+
26
36
def __init__ (self , authority_url , validate_authority = True ,
27
37
verify = True , proxies = None , timeout = None ,
28
38
):
@@ -37,18 +47,30 @@ def __init__(self, authority_url, validate_authority=True,
37
47
self .verify = verify
38
48
self .proxies = proxies
39
49
self .timeout = timeout
40
- canonicalized , self .instance , tenant = canonicalize (authority_url )
41
- tenant_discovery_endpoint = (
42
- 'https://{}/{}{}/.well-known/openid-configuration' .format (
43
- self .instance ,
44
- tenant ,
45
- "" if tenant == "adfs" else "/v2.0" # the AAD v2 endpoint
46
- ))
47
- if (tenant != "adfs" and validate_authority
50
+ authority , self .instance , tenant = canonicalize (authority_url )
51
+ is_b2c = any (self .instance .endswith ("." + d ) for d in WELL_KNOWN_B2C_HOSTS )
52
+ if (tenant != "adfs" and (not is_b2c ) and validate_authority
48
53
and self .instance not in WELL_KNOWN_AUTHORITY_HOSTS ):
49
- tenant_discovery_endpoint = instance_discovery (
50
- canonicalized + "/oauth2/v2.0/authorize" ,
54
+ payload = instance_discovery (
55
+ "https://{}{}/oauth2/v2.0/authorize" .format (
56
+ self .instance , authority .path ),
51
57
verify = verify , proxies = proxies , timeout = timeout )
58
+ if payload .get ("error" ) == "invalid_instance" :
59
+ raise ValueError (
60
+ "invalid_instance: "
61
+ "The authority you provided, %s, is not whitelisted. "
62
+ "If it is indeed your legit customized domain name, "
63
+ "you can turn off this check by passing in "
64
+ "validate_authority=False"
65
+ % authority_url )
66
+ tenant_discovery_endpoint = payload ['tenant_discovery_endpoint' ]
67
+ else :
68
+ tenant_discovery_endpoint = (
69
+ 'https://{}{}{}/.well-known/openid-configuration' .format (
70
+ self .instance ,
71
+ authority .path , # In B2C scenario, it is "/tenant/policy"
72
+ "" if tenant == "adfs" else "/v2.0" # the AAD v2 endpoint
73
+ ))
52
74
openid_config = tenant_discovery (
53
75
tenant_discovery_endpoint ,
54
76
verify = verify , proxies = proxies , timeout = timeout )
@@ -58,42 +80,44 @@ def __init__(self, authority_url, validate_authority=True,
58
80
_ , _ , self .tenant = canonicalize (self .token_endpoint ) # Usually a GUID
59
81
self .is_adfs = self .tenant .lower () == 'adfs'
60
82
61
- def user_realm_discovery (self , username ):
62
- resp = requests .get (
63
- "https://{netloc}/common/userrealm/{username}?api-version=1.0" .format (
64
- netloc = self .instance , username = username ),
65
- headers = {'Accept' :'application/json' },
66
- verify = self .verify , proxies = self .proxies , timeout = self .timeout )
67
- resp .raise_for_status ()
68
- return resp .json ()
69
- # It will typically contain "ver", "account_type",
83
+ def user_realm_discovery (self , username , response = None ):
84
+ # It will typically return a dict containing "ver", "account_type",
70
85
# "federation_protocol", "cloud_audience_urn",
71
86
# "federation_metadata_url", "federation_active_auth_url", etc.
87
+ if self .instance not in self .__class__ ._domains_without_user_realm_discovery :
88
+ resp = response or requests .get (
89
+ "https://{netloc}/common/userrealm/{username}?api-version=1.0" .format (
90
+ netloc = self .instance , username = username ),
91
+ headers = {'Accept' :'application/json' },
92
+ verify = self .verify , proxies = self .proxies , timeout = self .timeout )
93
+ if resp .status_code != 404 :
94
+ resp .raise_for_status ()
95
+ return resp .json ()
96
+ self .__class__ ._domains_without_user_realm_discovery .add (self .instance )
97
+ return {} # This can guide the caller to fall back normal ROPC flow
98
+
72
99
73
- def canonicalize (url ):
74
- # Returns (canonicalized_url, netloc, tenant). Raises ValueError on errors.
75
- match_object = re . match ( r'https://([^/]+)/([^/?#]+)' , url . lower () )
76
- if not match_object :
100
+ def canonicalize (authority_url ):
101
+ authority = urlparse ( authority_url )
102
+ parts = authority . path . split ( "/" )
103
+ if authority . scheme != "https" or len ( parts ) < 2 or not parts [ 1 ] :
77
104
raise ValueError (
78
105
"Your given address (%s) should consist of "
79
106
"an https url with a minimum of one segment in a path: e.g. "
80
- "https://login.microsoftonline.com/<tenant_name>" % url )
81
- return match_object .group (0 ), match_object .group (1 ), match_object .group (2 )
107
+ "https://login.microsoftonline.com/<tenant> "
108
+ "or https://<tenant_name>.b2clogin.com/<tenant_name>.onmicrosoft.com/policy"
109
+ % authority_url )
110
+ return authority , authority .netloc , parts [1 ]
82
111
83
- def instance_discovery (url , response = None , ** kwargs ):
84
- # Returns tenant discovery endpoint
85
- resp = requests .get ( # Note: This URL seemingly returns V1 endpoint only
112
+ def instance_discovery (url , ** kwargs ):
113
+ return requests .get ( # Note: This URL seemingly returns V1 endpoint only
86
114
'https://{}/common/discovery/instance' .format (
87
115
WORLD_WIDE # Historically using WORLD_WIDE. Could use self.instance too
88
116
# See https://github.com/AzureAD/microsoft-authentication-library-for-dotnet/blob/4.0.0/src/Microsoft.Identity.Client/Instance/AadInstanceDiscovery.cs#L101-L103
89
117
# and https://github.com/AzureAD/microsoft-authentication-library-for-dotnet/blob/4.0.0/src/Microsoft.Identity.Client/Instance/AadAuthority.cs#L19-L33
90
118
),
91
119
params = {'authorization_endpoint' : url , 'api-version' : '1.0' },
92
- ** kwargs )
93
- payload = response or resp .json ()
94
- if 'tenant_discovery_endpoint' not in payload :
95
- raise MsalServiceError (status_code = resp .status_code , ** payload )
96
- return payload ['tenant_discovery_endpoint' ]
120
+ ** kwargs ).json ()
97
121
98
122
def tenant_discovery (tenant_discovery_endpoint , ** kwargs ):
99
123
# Returns Openid Configuration
0 commit comments