|
| 1 | +""" |
| 2 | +The configuration file would look like this: |
| 3 | +
|
| 4 | +{ |
| 5 | + "authority": "https://login.microsoftonline.com/organizations", |
| 6 | + "client_id": "your_client_id", |
| 7 | + "username": "your_username@your_tenant.com", |
| 8 | + "scope": ["User.ReadBasic.All"], |
| 9 | + // You can find the other permission names from this document |
| 10 | + // https://docs.microsoft.com/en-us/graph/permissions-reference |
| 11 | + "endpoint": "https://graph.microsoft.com/v1.0/users" |
| 12 | + // You can find more Microsoft Graph API endpoints from Graph Explorer |
| 13 | + // https://developer.microsoft.com/en-us/graph/graph-explorer |
| 14 | +} |
| 15 | +
|
| 16 | +You can then run this sample with a JSON configuration file: |
| 17 | +
|
| 18 | + python sample.py parameters.json |
| 19 | +""" |
| 20 | + |
| 21 | +import sys # For simplicity, we'll read config file from 1st CLI param sys.argv[1] |
| 22 | +import json |
| 23 | +import logging |
| 24 | +import time |
| 25 | + |
| 26 | +import requests |
| 27 | +import msal |
| 28 | + |
| 29 | + |
| 30 | +# Optional logging |
| 31 | +# logging.basicConfig(level=logging.DEBUG) # Enable DEBUG log for entire script |
| 32 | +# logging.getLogger("msal").setLevel(logging.INFO) # Optionally disable MSAL DEBUG logs |
| 33 | + |
| 34 | +config = json.load(open(sys.argv[1])) |
| 35 | + |
| 36 | +# If for whatever reason you plan to recreate same ClientApplication periodically, |
| 37 | +# you shall create one global token cache and reuse it by each ClientApplication |
| 38 | +global_token_cache = msal.TokenCache() # The TokenCache() is in-memory. |
| 39 | +# See more options in https://msal-python.readthedocs.io/en/latest/#tokencache |
| 40 | + |
| 41 | +# Create a preferably long-lived app instance, to avoid the overhead of app creation |
| 42 | +global_app = msal.PublicClientApplication( |
| 43 | + config["client_id"], authority=config["authority"], |
| 44 | + client_credential=config.get("client_secret"), |
| 45 | + token_cache=global_token_cache, # Let this app (re)use an existing token cache. |
| 46 | + # If absent, ClientApplication will create its own empty token cache |
| 47 | +) |
| 48 | + |
| 49 | + |
| 50 | +def acquire_and_use_token(): |
| 51 | + # The pattern to acquire a token looks like this. |
| 52 | + result = None |
| 53 | + |
| 54 | + # Firstly, check the cache to see if this end user has signed in before |
| 55 | + accounts = global_app.get_accounts(username=config["username"]) |
| 56 | + if accounts: |
| 57 | + logging.info("Account(s) exists in cache, probably with token too. Let's try.") |
| 58 | + result = global_app.acquire_token_silent(config["scope"], account=accounts[0]) |
| 59 | + |
| 60 | + if not result: |
| 61 | + logging.info("No suitable token exists in cache. Let's get a new one from AAD.") |
| 62 | + # See this page for constraints of Username Password Flow. |
| 63 | + # https://github.com/AzureAD/microsoft-authentication-library-for-python/wiki/Username-Password-Authentication |
| 64 | + result = global_app.acquire_token_integrated_windows_auth( |
| 65 | + config["username"], scopes=config["scope"]) |
| 66 | + |
| 67 | + if "access_token" in result: |
| 68 | + print("Token was obtained from:", result["token_source"]) # Since MSAL 1.25 |
| 69 | + # Calling graph using the access token |
| 70 | + graph_data = requests.get( # Use token to call downstream service |
| 71 | + config["endpoint"], |
| 72 | + headers={'Authorization': 'Bearer ' + result['access_token']},).json() |
| 73 | + print("Graph API call result: %s" % json.dumps(graph_data, indent=2)) |
| 74 | + else: |
| 75 | + print("Token acquisition failed") # Examine result["error_description"] etc. to diagnose error |
| 76 | + print(result) |
| 77 | + if 65001 in result.get("error_codes", []): # Not mean to be coded programatically, but... |
| 78 | + raise RuntimeError( |
| 79 | + "AAD requires user consent for U/P flow to succeed. " |
| 80 | + "Run acquire_token_interactive() instead.") |
| 81 | + |
| 82 | + |
| 83 | +while True: # Here we mimic a long-lived daemon |
| 84 | + acquire_and_use_token() |
| 85 | + print("Press Ctrl-C to stop.") |
| 86 | + time.sleep(5) # Let's say your app would run a workload every X minutes. |
| 87 | + |
0 commit comments