Skip to content

Commit 40f02e4

Browse files
committed
dag ariflow
1 parent 7469d2b commit 40f02e4

File tree

8 files changed

+113
-1
lines changed

8 files changed

+113
-1
lines changed

dags/enrich/config/__init__.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
11
from .cohorts import COHORTS # noqa: F401
22
from .columns import COLS # noqa: F401
33
from .dbt import DBT # noqa: F401
4-
from .models import DAG_ID_TO_CONFIG_MODEL, EnrichActeursClosedConfig # noqa: F401
4+
from .models import ( # noqa: F401
5+
DAG_ID_TO_CONFIG_MODEL,
6+
EnrichActeursClosedConfig,
7+
EnrichActeursVillesConfig,
8+
)
59
from .paths import DIR_SQL_READ # noqa: F401
610
from .tasks import TASKS # noqa: F401
711
from .xcoms import XCOMS, xcom_pull # noqa: F401

dags/enrich/config/cohorts.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,3 +10,5 @@ class COHORTS:
1010
CLOSED_NOT_REPLACED = f"{CLOSED} 🔴 non remplacés"
1111
CLOSED_REP_OTHER_SIREN = f"{CLOSED} 🟡 remplacés par SIRET d'un autre SIREN"
1212
CLOSED_REP_SAME_SIREN = f"{CLOSED} 🟢 remplacés par SIRET du même SIREN"
13+
ACTEURS_VILLES_TYPO = "🌆 Changement de ville: 🟢 variation d'ortographe"
14+
ACTEURS_VILLES_NEW = "🌆 Changement de ville: 🟡 ancienne -> nouvelle"

dags/enrich/config/dbt.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,3 +15,5 @@ class DBT:
1515
MARTS_ENRICH_AE_CLOSED_NOT_REPLACED: str = (
1616
"marts_enrich_acteurs_closed_suggest_not_replaced"
1717
)
18+
MARTS_ENRICH_ACTEURS_VILLES_TYPO: str = "marts_enrich_acteurs_villes_suggest_typo"
19+
MARTS_ENRICH_ACTEURS_VILLES_NEW: str = "marts_enrich_acteurs_villes_suggest_new"

dags/enrich/config/models.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,11 @@ class EnrichActeursClosedConfig(EnrichBaseConfig):
8282
)
8383

8484

85+
class EnrichActeursVillesConfig(EnrichBaseConfig):
86+
pass
87+
88+
8589
DAG_ID_TO_CONFIG_MODEL = {
8690
"enrich_acteurs_closed": EnrichActeursClosedConfig,
91+
"enrich_acteurs_villes": EnrichActeursVillesConfig,
8792
}

dags/enrich/config/tasks.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,3 +24,7 @@ class TASKS:
2424
"enrich_acteurs_closed_suggestions_not_replaced"
2525
)
2626
ENRICH_DBT_MODELS_REFRESH: str = "enrich_dbt_models_refresh"
27+
28+
# Villes
29+
ENRICH_ACTEURS_VILLES_TYPO: str = "enrich_acteurs_villes_typo"
30+
ENRICH_ACTEURS_VILLES_NEW: str = "enrich_acteurs_villes_new"
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
"""DAG to anonymize QFDMO acteurs for RGPD"""
2+
3+
from airflow import DAG
4+
from enrich.config import (
5+
COHORTS,
6+
DBT,
7+
TASKS,
8+
EnrichActeursVillesConfig,
9+
)
10+
from enrich.tasks.airflow_logic.enrich_config_create_task import (
11+
enrich_config_create_task,
12+
)
13+
from enrich.tasks.airflow_logic.enrich_dbt_model_suggest_task import (
14+
enrich_dbt_model_suggest_task,
15+
)
16+
from enrich.tasks.airflow_logic.enrich_dbt_models_refresh_task import (
17+
enrich_dbt_models_refresh_task,
18+
)
19+
from shared.config import CATCHUPS, SCHEDULES, START_DATES, config_to_airflow_params
20+
21+
with DAG(
22+
dag_id="enrich_acteurs_villes",
23+
dag_display_name="🌆 Enrichir - Acteurs Villes",
24+
default_args={
25+
"owner": "airflow",
26+
"depends_on_past": False,
27+
"email_on_failure": False,
28+
"email_on_retry": False,
29+
"retries": 0,
30+
},
31+
description=("Un DAG pour anonymiser les acteurs vs. RGPD"),
32+
tags=["annuaire", "entreprises", "ae", "rgpd", "acteurs", "juridique"],
33+
schedule=SCHEDULES.NONE,
34+
catchup=CATCHUPS.AWLAYS_FALSE,
35+
start_date=START_DATES.YESTERDAY,
36+
params=config_to_airflow_params(
37+
EnrichActeursVillesConfig(
38+
dbt_models_refresh=True,
39+
dbt_models_refresh_command=(
40+
"dbt build --select tag:marts,tag:enrich,tag:villes"
41+
),
42+
)
43+
),
44+
) as dag:
45+
# Instantiation
46+
config = enrich_config_create_task(dag)
47+
dbt_refresh = enrich_dbt_models_refresh_task(dag)
48+
suggest_typo = enrich_dbt_model_suggest_task(
49+
dag,
50+
task_id=TASKS.ENRICH_ACTEURS_VILLES_TYPO,
51+
cohort=COHORTS.ACTEURS_VILLES_TYPO,
52+
dbt_model_name=DBT.MARTS_ENRICH_ACTEURS_VILLES_TYPO,
53+
)
54+
suggest_new = enrich_dbt_model_suggest_task(
55+
dag,
56+
task_id=TASKS.ENRICH_ACTEURS_VILLES_NEW,
57+
cohort=COHORTS.ACTEURS_VILLES_NEW,
58+
dbt_model_name=DBT.MARTS_ENRICH_ACTEURS_VILLES_NEW,
59+
)
60+
config >> dbt_refresh # type: ignore
61+
dbt_refresh >> suggest_typo # type: ignore
62+
dbt_refresh >> suggest_new # type: ignore

dags/enrich/tasks/business_logic/enrich_dbt_model_to_suggestions.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,29 @@ def changes_prepare(
3232
).model_dump()
3333

3434

35+
def changes_prepare_villes(row: dict) -> list[dict]:
36+
"""Prepare suggestions for villes cohorts"""
37+
from data.models.changes import ChangeActeurUpdateData
38+
39+
changes = []
40+
model_params = {
41+
"id": row[COLS.ACTEUR_ID],
42+
"data": {
43+
"ville": row[COLS.SUGGEST_VILLE],
44+
},
45+
}
46+
changes.append(
47+
changes_prepare(
48+
model=ChangeActeurUpdateData,
49+
model_params=model_params,
50+
order=1,
51+
reason="On fait confiance à la BAN",
52+
entity_type="acteur_displayed",
53+
)
54+
)
55+
return changes
56+
57+
3558
def changes_prepare_closed_not_replaced(
3659
row: dict,
3760
) -> tuple[list[dict], dict]:
@@ -186,6 +209,8 @@ def enrich_dbt_model_to_suggestions(
186209
COHORTS.CLOSED_NOT_REPLACED: SuggestionAction.ENRICH_ACTEURS_CLOSED,
187210
COHORTS.CLOSED_REP_OTHER_SIREN: SuggestionAction.ENRICH_ACTEURS_CLOSED,
188211
COHORTS.CLOSED_REP_SAME_SIREN: SuggestionAction.ENRICH_ACTEURS_CLOSED,
212+
COHORTS.ACTEURS_VILLES_TYPO: SuggestionAction.ENRICH_ACTEURS_VILLES_TYPO,
213+
COHORTS.ACTEURS_VILLES_NEW: SuggestionAction.ENRICH_ACTEURS_VILLES_NEW,
189214
}
190215

191216
# Validation

data/models/suggestion.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,14 @@ class SuggestionCohorteStatut(models.TextChoices):
5555
class SuggestionAction(models.TextChoices):
5656
CRAWL_URLS = SUGGESTION_CRAWL_URLS, "🔗 URLs scannées"
5757
ENRICH_ACTEURS_CLOSED = "ENRICH_ACTEURS_CLOSED", "🚪 Acteurs fermés"
58+
ENRICH_ACTEURS_VILLES_TYPO = (
59+
"ENRICH_ACTEURS_VILLES_TYPO",
60+
"🏙️ Acteurs villes typographiques",
61+
)
62+
ENRICH_ACTEURS_VILLES_NEW = (
63+
"ENRICH_ACTEURS_VILLES_NEW",
64+
"🏙️ Acteurs villes nouvelles",
65+
)
5866
CLUSTERING = SUGGESTION_CLUSTERING, "regroupement/déduplication des acteurs"
5967
SOURCE_AJOUT = (
6068
SUGGESTION_SOURCE_AJOUT,

0 commit comments

Comments
 (0)