Skip to content

Commit 04f1a0a

Browse files
authored
🚪 [ENRICH] Cloturer acteurs cessés/fermés (#1454)
* RGPD: anonymiser noms acteurs: init commit * ajout template django data * migration Django data * refacto dags/rgpd -> dags/enrich, utilisation dbt * ajout nouvelle config & test rgpd * modèle RGPD, tests & migration * v1 qui fonctionne * début refacto pour factoriser RGPD + fermetures * début refacto et progrès vers décision métier * utilisation constantes de config dans DAG * suggestions: début de création * suggestions: presque fonctionnel * suggestions: tests qui fonctionnent * refacto du DAG avec dbt & suggestions * dbt: nettoyage & sampling * DAG & Admin UI fonctionnels * create_as_child, numéro rue, UI * refacto modèles & tests * ajout tolérance échec * udfs: norma exclusion petits mots, cleanup * fin de résolution de conflits * suppression code RGPD * cont. suppression RGPD & move data_reconstruct * recréer migration django + fix imports cassés via rebase * fix imports + data_serialize en doublon * cont. del rgpd, fix acteurs model & tests * suppresion migration RGPD * suppression des prints * drop changes in restore script * cont. fix script import * fix typo sur dbt_models_refresh_command * renommage replace -> suggest * cohorte: simplification, label uniquement * fix migration after rebase * gestion contexte + del marts villes * gestion contexte * renommage fichier test * regroupement logique code * automatiser row -> suggest data * profiter pour corriger les siren * commande dbt au mauvais endroit * renommage/déplacement row to data * migration: suppresion RGPD à venir via autre PR * rebase: déplace migrations de 148 à 150 * suppression fichiers pas utilisés * fix type sur encode_base57 * del champs opt. _code (conflit compute_acteurs)
1 parent 1f17c1d commit 04f1a0a

File tree

74 files changed

+2293
-250
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

74 files changed

+2293
-250
lines changed

dags/cluster/tasks/business_logic/cluster_acteurs_parents_choose_data.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,22 @@
1+
import logging
12
from typing import Any
23

34
import pandas as pd
45
from cluster.config.constants import COL_PARENT_DATA_NEW, FIELDS_PARENT_DATA_EXCLUDED
56
from django.forms.models import model_to_dict
6-
from rich import print
77
from utils.django import django_setup_full
88

99
django_setup_full()
10+
1011
from data.models.change import COL_CHANGE_MODEL_NAME # noqa: E402
1112
from data.models.changes import ( # noqa: E402
1213
ChangeActeurCreateAsParent,
1314
ChangeActeurKeepAsParent,
1415
)
1516
from qfdmo.models.acteur import Acteur, DisplayedActeur, RevisionActeur # noqa: E402
1617

18+
logger = logging.getLogger(__name__)
19+
1720

1821
def fields_to_include_clean(
1922
fields_to_include: list[str],
@@ -63,7 +66,7 @@ def field_pick_value(
6366
"""
6467
return value
6568
except Exception as e:
66-
print(f"Invalid value for field {field}: {value}: {e}")
69+
logging.error(f"Invalid value for field {field}: {value}: {e}")
6770
pass
6871
return None
6972

dags/enrich/config/__init__.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
from .cohorts import COHORTS # noqa: F401
2+
from .columns import COLS, SUGGEST_PREFIX # noqa: F401
3+
from .dbt import DBT # noqa: F401
4+
from .models import DAG_ID_TO_CONFIG_MODEL, EnrichActeursClosedConfig # noqa: F401
5+
from .tasks import TASKS # noqa: F401
6+
from .xcoms import XCOMS, xcom_pull # noqa: F401

dags/enrich/config/cohorts.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
"""Cohorts for enrich DAGs"""
2+
3+
from dataclasses import dataclass
4+
5+
6+
@dataclass(frozen=True)
7+
class COHORTS:
8+
CLOSED_NOT_REPLACED = "🚪 Acteurs Fermés: 🔴 non remplacés"
9+
CLOSED_REP_OTHER_SIREN = (
10+
"🚪 Acteurs Fermés: 🟡 remplacés par SIRET d'un autre SIREN"
11+
)
12+
CLOSED_REP_SAME_SIREN = "🚪 Acteurs Fermés: 🟢 remplacés par SIRET du même SIREN"

dags/enrich/config/columns.py

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
"""Column names enrichment DAGs"""
2+
3+
from dataclasses import dataclass
4+
5+
# All values we want to suggest via our enrichment DBT models
6+
# should start with this prefix
7+
SUGGEST_PREFIX = "suggest"
8+
9+
10+
@dataclass(frozen=True)
11+
class COLS:
12+
13+
# Acteurs
14+
ACTEUR_ID: str = "acteur_id"
15+
ACTEUR_TYPE_ID: str = "acteur_type_id"
16+
ACTEUR_TYPE_CODE: str = "acteur_type_code"
17+
ACTEUR_SOURCE_ID: str = "acteur_source_id"
18+
ACTEUR_SOURCE_CODE: str = "acteur_source_code"
19+
ACTEUR_SIRET: str = "acteur_siret"
20+
ACTEUR_NOM: str = "acteur_nom"
21+
ACTEUR_NOMS_ORIGINE: str = "acteur_noms_origine"
22+
ACTEUR_NOMS_NORMALISES: str = "acteur_noms_normalises"
23+
ACTEUR_COMMENTAIRES: str = "acteur_commentaires"
24+
ACTEUR_ADRESSE: str = "acteur_adresse"
25+
ACTEUR_CODE_POSTAL: str = "acteur_code_postal"
26+
ACTEUR_VILLE: str = "acteur_ville"
27+
ACTEUR_NAF: str = "acteur_naf"
28+
ACTEUR_LONGITUDE: str = "acteur_longitude"
29+
ACTEUR_LATITUDE: str = "acteur_latitude"
30+
31+
# Annuaire Entreprise
32+
AE_DIRIGEANTS_NOMS: str = "ae_dirigeants_noms_prenoms"
33+
34+
# Suggestions
35+
SUGGEST_COHORT: str = f"{SUGGEST_PREFIX}_cohort"
36+
SUGGEST_SIRET: str = f"{SUGGEST_PREFIX}_siret"
37+
SUGGEST_SIREN: str = f"{SUGGEST_PREFIX}_siren"
38+
SUGGEST_NOM: str = f"{SUGGEST_PREFIX}_nom"
39+
SUGGEST_ADRESSE: str = f"{SUGGEST_PREFIX}_adresse"
40+
SUGGEST_CODE_POSTAL: str = f"{SUGGEST_PREFIX}_code_postal"
41+
SUGGEST_VILLE: str = f"{SUGGEST_PREFIX}_ville"
42+
SUGGEST_NAF: str = f"{SUGGEST_PREFIX}_naf_principal"
43+
SUGGEST_LONGITUDE: str = f"{SUGGEST_PREFIX}_longitude"
44+
SUGGEST_LATITUDE: str = f"{SUGGEST_PREFIX}_latitude"
45+
SUGGEST_ACTEUR_TYPE_ID: str = f"{SUGGEST_PREFIX}_acteur_type_id"
46+
# Matching
47+
MATCH_WORDS: str = "match_words"
48+
MATCH_SCORE: str = "match_score"

dags/enrich/config/dbt.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
"""DBT models used in the enrich DAGs"""
2+
3+
from dataclasses import dataclass
4+
5+
6+
@dataclass(frozen=True)
7+
class DBT:
8+
MARTS_ENRICH_AE_CLOSED_CANDIDATES: str = "marts_enrich_acteurs_closed_candidates"
9+
MARTS_ENRICH_AE_CLOSED_REPLACED_SAME_SIREN: str = (
10+
"marts_enrich_acteurs_closed_suggest_replaced_same_siren"
11+
)
12+
MARTS_ENRICH_AE_CLOSED_REPLACED_OTHER_SIREN: str = (
13+
"marts_enrich_acteurs_closed_suggest_replaced_other_siren"
14+
)
15+
MARTS_ENRICH_AE_CLOSED_NOT_REPLACED: str = (
16+
"marts_enrich_acteurs_closed_suggest_not_replaced"
17+
)

dags/enrich/config/models.py

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
"""Configuration models enrich DAG"""
2+
3+
import re
4+
from typing import Optional
5+
6+
from pydantic import BaseModel, Field, computed_field
7+
8+
SEPARATOR_FILTER_FIELD = "__"
9+
10+
11+
def filters_get(model: BaseModel, prefix: str, operator: str) -> list[dict[str, str]]:
12+
"""Utility to get list of filters (field, value) to apply to the data,
13+
used 2 ways:
14+
- generate the Airflow params for the UI from field names only
15+
- read Airflow params to generate filters with values
16+
17+
Thus we have a dynamic Airflow UI controlled by and always aligned with
18+
our config model by only maintaining the latter.
19+
"""
20+
filters = []
21+
for field in model.model_fields:
22+
value = getattr(model, field)
23+
if re.fullmatch(f"{prefix}{SEPARATOR_FILTER_FIELD}[a-z_]+", field):
24+
25+
# Skipping None if it's not exclitely is_null operator
26+
if value is None and operator != "is_null":
27+
continue
28+
29+
filters.append(
30+
{
31+
"field": field.replace(f"{prefix}{SEPARATOR_FILTER_FIELD}", ""),
32+
"operator": operator,
33+
"value": value,
34+
}
35+
)
36+
return filters
37+
38+
39+
class EnrichBaseConfig(BaseModel):
40+
dry_run: bool = Field(
41+
default=True,
42+
description="🚱 Si coché, aucune tâche d'écriture ne sera effectuée",
43+
)
44+
dbt_models_refresh: bool = Field(
45+
default=True,
46+
description="""🔄 Si coché, les modèles DBT seront rafraîchis.
47+
🔴 Désactiver uniquement pour des tests.""",
48+
)
49+
dbt_models_refresh_command: str = Field(
50+
default="",
51+
description="🔄 Commande DBT à exécuter pour rafraîchir les modèles",
52+
)
53+
filter_contains__acteur_commentaires: Optional[str] = Field(
54+
default=None,
55+
description="🔍 Filtre sur **acteur_commentaires**",
56+
)
57+
filter_contains__acteur_nom: Optional[str] = Field(
58+
default=None,
59+
description="🔍 Filtre sur **acteur_nom**",
60+
)
61+
filter_equals__acteur_statut: Optional[str] = Field(
62+
default=None,
63+
description="🔍 Filtre sur **acteur_statut**",
64+
)
65+
66+
def filters_contains(self) -> list[dict[str, str]]:
67+
return filters_get(self, "filter_contains", "contains")
68+
69+
def filters_equals(self) -> list[dict[str, str]]:
70+
return filters_get(self, "filter_equals", "equals")
71+
72+
@computed_field
73+
@property
74+
def filters(self) -> list[dict[str, str]]:
75+
return self.filters_contains() + self.filters_equals()
76+
77+
78+
class EnrichActeursClosedConfig(EnrichBaseConfig):
79+
filter_contains__etab_naf: Optional[str] = Field(
80+
default=None,
81+
description="🔍 Filtre sur **NAF AE Etablissement**",
82+
)
83+
84+
85+
DAG_ID_TO_CONFIG_MODEL = {
86+
"enrich_acteurs_closed": EnrichActeursClosedConfig,
87+
}

dags/enrich/config/tasks.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
"""Task IDs for enrichment DAGs"""
2+
3+
from dataclasses import dataclass
4+
5+
6+
@dataclass(frozen=True)
7+
class TASKS:
8+
# Config
9+
CONFIG_CREATE: str = "enrich_config_create"
10+
11+
# Read tasks
12+
ENRICH_CLOSED_REPLACED_SAME_SIREN: str = "enrich_acteurs_closed_replaced_same_siren"
13+
ENRICH_CLOSED_REPLACED_OTHER_SIREN: str = (
14+
"enrich_acteurs_closed_replaced_other_siren"
15+
)
16+
ENRICH_CLOSED_NOT_REPLACED: str = "enrich_acteurs_closed_not_replaced"
17+
ENRICH_CLOSED_SUGGESTIONS_SAME_SIREN: str = (
18+
"enrich_acteurs_closed_suggestions_same_siren"
19+
)
20+
ENRICH_CLOSED_SUGGESTIONS_OTHER_SIREN: str = (
21+
"enrich_acteurs_closed_suggestions_other_siren"
22+
)
23+
ENRICH_CLOSED_SUGGESTIONS_NOT_REPLACED: str = (
24+
"enrich_acteurs_closed_suggestions_not_replaced"
25+
)
26+
ENRICH_DBT_MODELS_REFRESH: str = "enrich_dbt_models_refresh"

dags/enrich/config/xcoms.py

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
"""Constants and helpers to configure XCom for Crawl DAG,
2+
so we are more reliable & concise in our XCOM usage
3+
(so easy to typo a key or pull from wrong task and Airflow
4+
happily gives None without complaining)"""
5+
6+
from dataclasses import dataclass
7+
from typing import Any
8+
9+
import pandas as pd
10+
from airflow.exceptions import AirflowSkipException
11+
from airflow.models.taskinstance import TaskInstance
12+
from enrich.config.tasks import TASKS
13+
from utils import logging_utils as log
14+
15+
16+
@dataclass(frozen=True)
17+
class XCOMS:
18+
CONFIG: str = "config"
19+
DF_READ: str = "df_read"
20+
DF_MATCH: str = "df_match"
21+
22+
DF_CLOSED_REPLACED_SAME_SIREN: str = "df_acteurs_closed_replaced_same_siren"
23+
DF_CLOSED_REPLACED_OTHER_SIREN: str = "df_acteurs_closed_replaced_other_siren"
24+
DF_CLOSED_NOT_REPLACED: str = "df_acteurs_closed_not_replaced"
25+
26+
27+
def xcom_pull(ti: TaskInstance, key: str, skip_if_empty: bool = False) -> Any:
28+
"""For pulls, we create a helper to constrain keys
29+
to specific task ids to guarantee consistent pulls"""
30+
31+
# Init
32+
msg = f"XCOM from {ti.task_id=} pulling {key=}:" # For logging
33+
34+
# Reading values
35+
if key == XCOMS.CONFIG:
36+
value = ti.xcom_pull(key=key, task_ids=TASKS.CONFIG_CREATE)
37+
elif key == XCOMS.DF_CLOSED_REPLACED_SAME_SIREN:
38+
value = ti.xcom_pull(key=key, task_ids=TASKS.ENRICH_CLOSED_REPLACED_SAME_SIREN)
39+
elif key == XCOMS.DF_CLOSED_REPLACED_OTHER_SIREN:
40+
value = ti.xcom_pull(key=key, task_ids=TASKS.ENRICH_CLOSED_REPLACED_OTHER_SIREN)
41+
elif key == XCOMS.DF_CLOSED_NOT_REPLACED:
42+
value = ti.xcom_pull(key=key, task_ids=TASKS.ENRICH_CLOSED_NOT_REPLACED)
43+
else:
44+
raise ValueError(f"{msg} key inconnue")
45+
46+
# Skip if empty
47+
if skip_if_empty and (
48+
value is None or (isinstance(value, pd.DataFrame) and value.empty)
49+
):
50+
raise AirflowSkipException(f"✋ {msg} est vide, on s'arrête là")
51+
52+
# Logging
53+
log.preview(f"{msg} value = ", value)
54+
55+
return value
56+
57+
58+
# We don't have an helper for xcom_push because
59+
# it can be done via the TaskInstance easily
60+
# as ti.xcom_push(key=..., value=...)
61+
# and we don't neet to align keys with task ids
62+
# (task id is automatically that of the pushing task)
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
"""
2+
DAG to anonymize QFDMO acteur which names
3+
contains people from Annuaire Entreprise (AE)
4+
"""
5+
6+
from airflow import DAG
7+
from enrich.config import (
8+
COHORTS,
9+
DBT,
10+
TASKS,
11+
EnrichActeursClosedConfig,
12+
)
13+
from enrich.tasks.airflow_logic.enrich_config_create_task import (
14+
enrich_config_create_task,
15+
)
16+
from enrich.tasks.airflow_logic.enrich_dbt_model_suggest_task import (
17+
enrich_dbt_model_suggest_task,
18+
)
19+
from enrich.tasks.airflow_logic.enrich_dbt_models_refresh_task import (
20+
enrich_dbt_models_refresh_task,
21+
)
22+
from shared.config import CATCHUPS, SCHEDULES, START_DATES, config_to_airflow_params
23+
24+
with DAG(
25+
dag_id="enrich_acteurs_closed",
26+
dag_display_name="🚪 Enrichir - Acteurs Fermés",
27+
default_args={
28+
"owner": "airflow",
29+
"depends_on_past": False,
30+
"email_on_failure": False,
31+
"email_on_retry": False,
32+
"retries": 0,
33+
},
34+
description=(
35+
"Un DAG pour détécter et remplacer les acteurs fermés"
36+
"dans l'Annuaire Entreprises (AE)"
37+
),
38+
tags=["annuaire", "entreprises", "ae", "siren", "siret", "acteurs", "fermés"],
39+
schedule=SCHEDULES.NONE,
40+
catchup=CATCHUPS.AWLAYS_FALSE,
41+
start_date=START_DATES.YESTERDAY,
42+
params=config_to_airflow_params(
43+
EnrichActeursClosedConfig(
44+
dbt_models_refresh=False,
45+
dbt_models_refresh_command=(
46+
"dbt build --select tag:marts,tag:enrich,tag:closed"
47+
),
48+
filter_equals__acteur_statut="ACTIF",
49+
)
50+
),
51+
) as dag:
52+
# Instantiation
53+
config = enrich_config_create_task(dag)
54+
dbt_refresh = enrich_dbt_models_refresh_task(dag)
55+
suggest_not_replaced = enrich_dbt_model_suggest_task(
56+
dag,
57+
task_id=TASKS.ENRICH_CLOSED_SUGGESTIONS_NOT_REPLACED,
58+
cohort=COHORTS.CLOSED_NOT_REPLACED,
59+
dbt_model_name=DBT.MARTS_ENRICH_AE_CLOSED_NOT_REPLACED,
60+
)
61+
suggest_other_siren = enrich_dbt_model_suggest_task(
62+
dag,
63+
task_id=TASKS.ENRICH_CLOSED_SUGGESTIONS_OTHER_SIREN,
64+
cohort=COHORTS.CLOSED_REP_OTHER_SIREN,
65+
dbt_model_name=DBT.MARTS_ENRICH_AE_CLOSED_REPLACED_OTHER_SIREN,
66+
)
67+
suggest_same_siren = enrich_dbt_model_suggest_task(
68+
dag,
69+
task_id=TASKS.ENRICH_CLOSED_SUGGESTIONS_SAME_SIREN,
70+
cohort=COHORTS.CLOSED_REP_SAME_SIREN,
71+
dbt_model_name=DBT.MARTS_ENRICH_AE_CLOSED_REPLACED_SAME_SIREN,
72+
)
73+
74+
# Graph
75+
config >> dbt_refresh # type: ignore
76+
dbt_refresh >> suggest_not_replaced # type: ignore
77+
dbt_refresh >> suggest_other_siren # type: ignore
78+
dbt_refresh >> suggest_same_siren # type: ignore

0 commit comments

Comments
 (0)