Skip to content

Commit a3288d8

Browse files
authored
Merge pull request #5385 from bcgov/feat/5124
Feat/5124
2 parents f300eeb + 58979c4 commit a3288d8

File tree

6 files changed

+200
-1
lines changed

6 files changed

+200
-1
lines changed

helm/tools/dags/_msgraph.py

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
import requests
2+
from urllib.parse import quote
3+
4+
5+
class MsGraph:
6+
def __init__(self, tenant_id, client_id, client_secret):
7+
self.tenant_id = tenant_id
8+
self.client_id = client_id
9+
self.client_secret = client_secret
10+
self.token_url = f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token"
11+
self.scope = "https://graph.microsoft.com/.default"
12+
self.access_token = self._get_access_token()
13+
self.extension_attribute = "extension_85cc52e9286540fcb1f97ed86114a0e5_bcgovGUID" # pragma: allowlist secret
14+
15+
def _get_access_token(self):
16+
payload = {
17+
"client_id": self.client_id,
18+
"client_secret": self.client_secret,
19+
"scope": self.scope,
20+
"grant_type": "client_credentials",
21+
}
22+
response = requests.post(self.token_url, data=payload)
23+
response.raise_for_status()
24+
return response.json().get("access_token", "")
25+
26+
def fetch_azure_user(self, matching_email, retry=True):
27+
headers = {
28+
"Authorization": f"Bearer {self.access_token}",
29+
"ConsistencyLevel": "eventual",
30+
}
31+
32+
escaped_email = matching_email.replace("'", "''")
33+
filter_value = f"mail eq '{escaped_email}'"
34+
params = {
35+
"$filter": filter_value,
36+
"$select": f"officeLocation,jobTitle,userPrincipalName,id,displayName,givenName,surname,mail,onPremisesSamAccountName,{self.extension_attribute}",
37+
"$top": "1",
38+
}
39+
40+
try:
41+
response = requests.get("https://graph.microsoft.com/v1.0/users", params=params, headers=headers)
42+
43+
if response.status_code == 401 and retry:
44+
self.access_token = self._get_access_token()
45+
return self.fetch_azure_user(matching_email, retry=False)
46+
47+
if response.status_code == 200:
48+
users = response.json().get("value", [])
49+
else:
50+
response.raise_for_status()
51+
return {**users[0], "idirGuid": users[0].get(self.extension_attribute)} if users else None
52+
53+
except requests.exceptions.RequestException as e:
54+
raise Exception(f"Failed to fetch user: {str(e)}")
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
from _projects import get_mongo_db
2+
from helm.tools.dags._msgraph import MsGraph
3+
4+
5+
def parse_ministry_from_display_name(display_name: str):
6+
ministry = ""
7+
if display_name and len(display_name) > 0:
8+
divided_string = display_name.split()
9+
if len(divided_string) >= 2:
10+
ministry = divided_string[-1].split(":", 1)[0]
11+
return ministry
12+
13+
14+
def sync_db_users_with_azure_ad(
15+
mongo_conn_id, ms_graph_api_tenant_id, ms_graph_api_client_id, ms_graph_api_client_secret
16+
):
17+
try:
18+
db = get_mongo_db(mongo_conn_id)
19+
projection = {"image": 0, "lastseen": 0, "updatedAt": 0, "createdAt": 0}
20+
users_collection = db["User"]
21+
db_users = users_collection.find({"archived": {"$eq": False}}, projection)
22+
23+
ms_graph = MsGraph(ms_graph_api_tenant_id, ms_graph_api_client_id, ms_graph_api_client_secret)
24+
25+
for db_user in db_users:
26+
db_user_email = db_user["email"].lower()
27+
azure_user = ms_graph.fetch_azure_user(db_user_email)
28+
if azure_user is not None:
29+
update_data = {
30+
"officeLocation": azure_user.get("officeLocation", ""),
31+
"jobTitle": azure_user.get("jobTitle", ""),
32+
"upn": azure_user.get("userPrincipalName", ""),
33+
"providerUserId": azure_user.get("id", ""),
34+
"ministry": parse_ministry_from_display_name(
35+
azure_user.get("displayName", ""),
36+
),
37+
"firstName": azure_user.get("givenName", ""),
38+
"lastName": azure_user.get("surname", ""),
39+
"email": azure_user.get("mail", "").lower(),
40+
"idirGuid": azure_user.get("idirGuid", ""),
41+
"idir": azure_user.get("onPremisesSamAccountName", ""),
42+
}
43+
44+
update_data = {k: v for k, v in update_data.items() if v is not None}
45+
46+
users_collection.update_one({"_id": db_user["_id"]}, {"$set": update_data})
47+
else:
48+
users_collection.update_one({"_id": db_user["_id"]}, {"$set": {"archived": True}})
49+
50+
except Exception as e:
51+
print(f"[sync_users_in_db_and_azure_ad] Error: {e}")

helm/tools/dags/_task_failure_callback.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ def send_alert(context, dag):
1313
airflow_dag_logs = f"https://secdash-airflow.apps.silver.devops.gov.bc.ca/dags/{dag}/grid"
1414

1515
payload = {
16-
"text": f":warning: Airlow: **{dag}**",
16+
"text": f":warning: Airflow: **{dag}**",
1717
"attachments": [
1818
{
1919
"color": "#d03ae8",
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
import os
2+
from airflow import DAG
3+
from airflow.operators.python_operator import PythonOperator
4+
from datetime import datetime, timedelta
5+
from _task_failure_callback import send_alert
6+
from helm.tools.dags._sync_azure_ad_with_db import sync_db_users_with_azure_ad
7+
8+
MONGO_CONN_ID = "pltsvc-dev"
9+
MS_GRAPH_API_TENANT_ID = os.getenv("MS_GRAPH_API_TENANT_ID")
10+
MS_GRAPH_API_CLIENT_ID = os.getenv("MS_GRAPH_API_CLIENT_ID")
11+
MS_GRAPH_API_CLIENT_SECRET = os.getenv("MS_GRAPH_API_CLIENT_SECRET")
12+
13+
with DAG(
14+
dag_id="sync_user_dbs_dev",
15+
schedule_interval="0 0 * * *",
16+
start_date=datetime.now() - timedelta(days=1),
17+
catchup=False,
18+
) as dag:
19+
t1 = PythonOperator(
20+
task_id="sync-db-users-with-azure-ad-dev",
21+
python_callable=sync_db_users_with_azure_ad,
22+
op_kwargs={
23+
"mongo_conn_id": MONGO_CONN_ID,
24+
"ms_graph_api_tenant_id": MS_GRAPH_API_TENANT_ID,
25+
"ms_graph_api_client_id": MS_GRAPH_API_CLIENT_ID,
26+
"ms_graph_api_client_secret": MS_GRAPH_API_CLIENT_SECRET,
27+
},
28+
provide_context=True,
29+
on_failure_callback=lambda context: send_alert(context, "sync_user_dbs_dev"),
30+
dag=dag,
31+
)
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
import os
2+
from airflow import DAG
3+
from airflow.operators.python_operator import PythonOperator
4+
from datetime import datetime, timedelta
5+
from _task_failure_callback import send_alert
6+
from helm.tools.dags._sync_azure_ad_with_db import sync_db_users_with_azure_ad
7+
8+
MONGO_CONN_ID = "pltsvc-prod"
9+
MS_GRAPH_API_TENANT_ID = os.getenv("MS_GRAPH_API_TENANT_ID")
10+
MS_GRAPH_API_CLIENT_ID = os.getenv("MS_GRAPH_API_CLIENT_ID")
11+
MS_GRAPH_API_CLIENT_SECRET = os.getenv("MS_GRAPH_API_CLIENT_SECRET")
12+
13+
with DAG(
14+
dag_id="sync_user_dbs_prod",
15+
schedule_interval="30 0 * * *",
16+
start_date=datetime.now() - timedelta(days=1),
17+
catchup=False,
18+
is_paused_upon_creation=True,
19+
) as dag:
20+
t1 = PythonOperator(
21+
task_id="sync-db-users-with-azure-ad-prod",
22+
python_callable=sync_db_users_with_azure_ad,
23+
op_kwargs={
24+
"mongo_conn_id": MONGO_CONN_ID,
25+
"ms_graph_api_tenant_id": MS_GRAPH_API_TENANT_ID,
26+
"ms_graph_api_client_id": MS_GRAPH_API_CLIENT_ID,
27+
"ms_graph_api_client_secret": MS_GRAPH_API_CLIENT_SECRET,
28+
},
29+
provide_context=True,
30+
on_failure_callback=lambda context: send_alert(context, "sync_user_dbs_prod"),
31+
dag=dag,
32+
)
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
import os
2+
from airflow import DAG
3+
from airflow.operators.python_operator import PythonOperator
4+
from datetime import datetime, timedelta
5+
from _task_failure_callback import send_alert
6+
from helm.tools.dags._sync_azure_ad_with_db import sync_db_users_with_azure_ad
7+
8+
MONGO_CONN_ID = "pltsvc-test"
9+
MS_GRAPH_API_TENANT_ID = os.getenv("MS_GRAPH_API_TENANT_ID")
10+
MS_GRAPH_API_CLIENT_ID = os.getenv("MS_GRAPH_API_CLIENT_ID")
11+
MS_GRAPH_API_CLIENT_SECRET = os.getenv("MS_GRAPH_API_CLIENT_SECRET")
12+
13+
with DAG(
14+
dag_id="sync_user_dbs_test",
15+
schedule_interval="0 1 * * *",
16+
start_date=datetime.now() - timedelta(days=1),
17+
catchup=False,
18+
) as dag:
19+
t1 = PythonOperator(
20+
task_id="sync-db-users-with-azure-ad-test",
21+
python_callable=sync_db_users_with_azure_ad,
22+
op_kwargs={
23+
"mongo_conn_id": MONGO_CONN_ID,
24+
"ms_graph_api_tenant_id": MS_GRAPH_API_TENANT_ID,
25+
"ms_graph_api_client_id": MS_GRAPH_API_CLIENT_ID,
26+
"ms_graph_api_client_secret": MS_GRAPH_API_CLIENT_SECRET,
27+
},
28+
provide_context=True,
29+
on_failure_callback=lambda context: send_alert(context, "sync_user_dbs_test"),
30+
dag=dag,
31+
)

0 commit comments

Comments
 (0)