Skip to content

Commit a9c9ee7

Browse files
committed
dag ariflow
1 parent 7bf65ab commit a9c9ee7

File tree

6 files changed

+108
-1
lines changed

6 files changed

+108
-1
lines changed

dags/enrich/config/dbt.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,3 +19,5 @@ class DBT:
1919
MARTS_ENRICH_AE_CLOSED_NOT_REPLACED: str = (
2020
"marts_enrich_acteurs_closed_suggest_not_replaced"
2121
)
22+
MARTS_ENRICH_ACTEURS_VILLES_TYPO: str = "marts_enrich_acteurs_villes_suggest_typo"
23+
MARTS_ENRICH_ACTEURS_VILLES_NEW: str = "marts_enrich_acteurs_villes_suggest_new"

dags/enrich/config/models.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,12 @@ class EnrichActeursRGPDConfig(EnrichBaseConfig):
9393
)
9494

9595

96+
class EnrichActeursVillesConfig(EnrichBaseConfig):
97+
pass
98+
99+
96100
DAG_ID_TO_CONFIG_MODEL = {
97101
"enrich_acteurs_closed": EnrichActeursClosedConfig,
98102
"enrich_acteurs_rgpd": EnrichActeursRGPDConfig,
103+
"enrich_acteurs_villes": EnrichActeursVillesConfig,
99104
}

dags/enrich/config/tasks.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,3 +27,7 @@ class TASKS:
2727
"enrich_acteurs_closed_suggestions_not_replaced"
2828
)
2929
ENRICH_DBT_MODELS_REFRESH: str = "enrich_dbt_models_refresh"
30+
31+
# Villes
32+
ENRICH_ACTEURS_VILLES_TYPO: str = "enrich_acteurs_villes_typo"
33+
ENRICH_ACTEURS_VILLES_NEW: str = "enrich_acteurs_villes_new"
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
"""DAG to anonymize QFDMO acteurs for RGPD"""
2+
3+
from airflow import DAG
4+
from enrich.config import (
5+
COHORTS,
6+
DBT,
7+
TASKS,
8+
EnrichActeursVillesConfig,
9+
)
10+
from enrich.tasks.airflow_logic.enrich_config_create_task import (
11+
enrich_config_create_task,
12+
)
13+
from enrich.tasks.airflow_logic.enrich_dbt_model_suggest_task import (
14+
enrich_dbt_model_suggest_task,
15+
)
16+
from enrich.tasks.airflow_logic.enrich_dbt_models_refresh_task import (
17+
enrich_dbt_models_refresh_task,
18+
)
19+
from shared.config import CATCHUPS, SCHEDULES, START_DATES, config_to_airflow_params
20+
21+
with DAG(
22+
dag_id="enrich_acteurs_villes",
23+
dag_display_name="🌆 Enrichir - Acteurs Villes",
24+
default_args={
25+
"owner": "airflow",
26+
"depends_on_past": False,
27+
"email_on_failure": False,
28+
"email_on_retry": False,
29+
"retries": 0,
30+
},
31+
description=("Un DAG pour anonymiser les acteurs vs. RGPD"),
32+
tags=["annuaire", "entreprises", "ae", "rgpd", "acteurs", "juridique"],
33+
schedule=SCHEDULES.NONE,
34+
catchup=CATCHUPS.AWLAYS_FALSE,
35+
start_date=START_DATES.YESTERDAY,
36+
params=config_to_airflow_params(
37+
EnrichActeursVillesConfig(
38+
dbt_models_refresh=True,
39+
dbt_models_refresh_command=(
40+
"dbt build --select tag:marts,tag:enrich,tag:villes"
41+
),
42+
)
43+
),
44+
) as dag:
45+
# Instantiation
46+
config = enrich_config_create_task(dag)
47+
dbt_refresh = enrich_dbt_models_refresh_task(dag)
48+
suggest_typo = enrich_dbt_model_suggest_task(
49+
dag,
50+
task_id=TASKS.ENRICH_ACTEURS_VILLES_TYPO,
51+
cohort=COHORTS.ACTEURS_VILLES_TYPO,
52+
dbt_model_name=DBT.MARTS_ENRICH_ACTEURS_VILLES_TYPO,
53+
)
54+
suggest_new = enrich_dbt_model_suggest_task(
55+
dag,
56+
task_id=TASKS.ENRICH_ACTEURS_VILLES_NEW,
57+
cohort=COHORTS.ACTEURS_VILLES_NEW,
58+
dbt_model_name=DBT.MARTS_ENRICH_ACTEURS_VILLES_NEW,
59+
)
60+
config >> dbt_refresh # type: ignore
61+
dbt_refresh >> suggest_typo # type: ignore
62+
dbt_refresh >> suggest_new # type: ignore

dags/enrich/tasks/business_logic/enrich_dbt_model_to_suggestions.py

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,29 @@ def changes_prepare_rgpd(
6161
return changes, contexte
6262

6363

64+
def changes_prepare_villes(row: dict) -> list[dict]:
65+
"""Prepare suggestions for villes cohorts"""
66+
from data.models.changes import ChangeActeurUpdateData
67+
68+
changes = []
69+
model_params = {
70+
"id": row[COLS.ACTEUR_ID],
71+
"data": {
72+
"ville": row[COLS.SUGGEST_VILLE],
73+
},
74+
}
75+
changes.append(
76+
changes_prepare(
77+
model=ChangeActeurUpdateData,
78+
model_params=model_params,
79+
order=1,
80+
reason="On fait confiance à la BAN",
81+
entity_type="acteur_displayed",
82+
)
83+
)
84+
return changes
85+
86+
6487
def changes_prepare_closed_not_replaced(
6588
row: dict,
6689
) -> tuple[list[dict], dict]:
@@ -184,6 +207,8 @@ def changes_prepare_closed_replaced(
184207
COHORTS.CLOSED_REP_OTHER_SIREN: changes_prepare_closed_replaced,
185208
COHORTS.CLOSED_REP_SAME_SIREN: changes_prepare_closed_replaced,
186209
COHORTS.RGPD: changes_prepare_rgpd,
210+
COHORTS.ACTEURS_VILLES_TYPO: changes_prepare_villes,
211+
COHORTS.ACTEURS_VILLES_NEW: changes_prepare_villes,
187212
}
188213

189214

@@ -204,7 +229,8 @@ def enrich_dbt_model_to_suggestions(
204229
COHORTS.CLOSED_NOT_REPLACED: SuggestionAction.ENRICH_ACTEURS_CLOSED,
205230
COHORTS.CLOSED_REP_OTHER_SIREN: SuggestionAction.ENRICH_ACTEURS_CLOSED,
206231
COHORTS.CLOSED_REP_SAME_SIREN: SuggestionAction.ENRICH_ACTEURS_CLOSED,
207-
COHORTS.RGPD: SuggestionAction.ENRICH_ACTEURS_RGPD,
232+
COHORTS.ACTEURS_VILLES_TYPO: SuggestionAction.ENRICH_ACTEURS_VILLES_TYPO,
233+
COHORTS.ACTEURS_VILLES_NEW: SuggestionAction.ENRICH_ACTEURS_VILLES_NEW,
208234
}
209235

210236
# Validation

data/models/suggestion.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,14 @@ class SuggestionAction(models.TextChoices):
5656
CRAWL_URLS = SUGGESTION_CRAWL_URLS, "🔗 URLs scannées"
5757
ENRICH_ACTEURS_CLOSED = "ENRICH_ACTEURS_CLOSED", "🚪 Acteurs fermés"
5858
ENRICH_ACTEURS_RGPD = "ENRICH_ACTEURS_RGPD", "🕵 Anonymisation RGPD"
59+
ENRICH_ACTEURS_VILLES_TYPO = (
60+
"ENRICH_ACTEURS_VILLES_TYPO",
61+
"🏙️ Acteurs villes typographiques",
62+
)
63+
ENRICH_ACTEURS_VILLES_NEW = (
64+
"ENRICH_ACTEURS_VILLES_NEW",
65+
"🏙️ Acteurs villes nouvelles",
66+
)
5967
CLUSTERING = SUGGESTION_CLUSTERING, "regroupement/déduplication des acteurs"
6068
SOURCE_AJOUT = (
6169
SUGGESTION_SOURCE_AJOUT,

0 commit comments

Comments
 (0)