Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
50 commits
Select commit Hold shift + click to select a range
943ee94
RGPD: anonymiser noms acteurs: init commit
maxcorbeau Mar 12, 2025
eac97dc
ajout template django data
maxcorbeau Mar 12, 2025
3a11ae5
migration Django data
maxcorbeau Mar 12, 2025
03b13a6
refacto dags/rgpd -> dags/enrich, utilisation dbt
maxcorbeau Mar 19, 2025
e6528f8
ajout nouvelle config & test rgpd
maxcorbeau Mar 19, 2025
6b5b3df
modèle RGPD, tests & migration
maxcorbeau Mar 20, 2025
61a2d7d
v1 qui fonctionne
maxcorbeau Mar 24, 2025
94405be
début refacto pour factoriser RGPD + fermetures
maxcorbeau Mar 31, 2025
92123d0
début refacto et progrès vers décision métier
maxcorbeau Apr 3, 2025
f1d4915
utilisation constantes de config dans DAG
maxcorbeau Apr 7, 2025
a37de14
suggestions: début de création
maxcorbeau Apr 7, 2025
bc34343
suggestions: presque fonctionnel
maxcorbeau Apr 7, 2025
3622d36
suggestions: tests qui fonctionnent
maxcorbeau Apr 7, 2025
126498d
refacto du DAG avec dbt & suggestions
maxcorbeau Apr 9, 2025
abc46b0
dbt: nettoyage & sampling
maxcorbeau Apr 9, 2025
be74405
DAG & Admin UI fonctionnels
maxcorbeau Apr 10, 2025
82de0f6
create_as_child, numéro rue, UI
maxcorbeau Apr 10, 2025
3a1184f
refacto modèles & tests
maxcorbeau Apr 14, 2025
4731a3c
ajout tolérance échec
maxcorbeau Apr 14, 2025
d892112
udfs: norma exclusion petits mots, cleanup
maxcorbeau Apr 14, 2025
658f31f
fin de résolution de conflits
maxcorbeau Apr 16, 2025
df8a074
suppression code RGPD
maxcorbeau Apr 16, 2025
38036eb
cont. suppression RGPD & move data_reconstruct
maxcorbeau Apr 16, 2025
524821e
recréer migration django + fix imports cassés via rebase
maxcorbeau Apr 16, 2025
135305e
fix imports + data_serialize en doublon
maxcorbeau Apr 16, 2025
6c3f754
cont. del rgpd, fix acteurs model & tests
maxcorbeau Apr 16, 2025
ff9f58b
suppresion migration RGPD
maxcorbeau Apr 16, 2025
390b480
suppression des prints
maxcorbeau Apr 16, 2025
fef7876
drop changes in restore script
maxcorbeau Apr 16, 2025
8c0cbaf
cont. fix script import
maxcorbeau Apr 16, 2025
3139b7a
fix typo sur dbt_models_refresh_command
maxcorbeau Apr 16, 2025
a5f3627
renommage replace -> suggest
maxcorbeau Apr 21, 2025
6e8b4aa
cohorte: simplification, label uniquement
maxcorbeau Apr 21, 2025
a476233
fix migration after rebase
maxcorbeau Apr 21, 2025
6251333
gestion contexte + del marts villes
maxcorbeau Apr 21, 2025
1e82cc7
gestion contexte
maxcorbeau Apr 21, 2025
a13e3f7
renommage fichier test
maxcorbeau Apr 21, 2025
0d74a1e
regroupement logique code
maxcorbeau Apr 21, 2025
040ce57
automatiser row -> suggest data
maxcorbeau Apr 21, 2025
6c77d16
profiter pour corriger les siren
maxcorbeau Apr 21, 2025
035c6d1
commande dbt au mauvais endroit
maxcorbeau Apr 21, 2025
cd13fb4
renommage/déplacement row to data
maxcorbeau Apr 21, 2025
eeb4b50
migration: suppresion RGPD à venir via autre PR
maxcorbeau Apr 23, 2025
385ff09
DAG de rafraichissement
maxcorbeau Apr 23, 2025
0773bc6
spécifier les modèles dbt
maxcorbeau Apr 23, 2025
6a689c7
changer param dag en liste
maxcorbeau Apr 23, 2025
63225b0
éviter problème dépendances
maxcorbeau Apr 23, 2025
60b373f
aider le linter
maxcorbeau Apr 23, 2025
9346c6f
fix nouvelle cmd pas utilisée
maxcorbeau Apr 23, 2025
47c6c5c
1 cmd en échec bloque pas les autres
maxcorbeau Apr 24, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file added 1
Empty file.
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
import logging
from typing import Any

import pandas as pd
from cluster.config.constants import COL_PARENT_DATA_NEW, FIELDS_PARENT_DATA_EXCLUDED
from django.forms.models import model_to_dict
from rich import print
from utils.django import django_setup_full

django_setup_full()

from data.models.change import COL_CHANGE_MODEL_NAME # noqa: E402
from data.models.changes import ( # noqa: E402
ChangeActeurCreateAsParent,
ChangeActeurKeepAsParent,
)
from qfdmo.models.acteur import Acteur, DisplayedActeur, RevisionActeur # noqa: E402

logger = logging.getLogger(__name__)


def fields_to_include_clean(
fields_to_include: list[str],
Expand Down Expand Up @@ -63,7 +66,7 @@ def field_pick_value(
"""
return value
except Exception as e:
print(f"Invalid value for field {field}: {value}: {e}")
logging.error(f"Invalid value for field {field}: {value}: {e}")
pass
return None

Expand Down
11 changes: 11 additions & 0 deletions dags/enrich/config/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from .cohorts import COHORTS # noqa: F401
from .columns import COLS, SUGGEST_PREFIX # noqa: F401
from .dbt import DBT # noqa: F401
from .models import ( # noqa: F401
DAG_ID_TO_CONFIG_MODEL,
EnrichActeursClosedConfig,
EnrichDbtModelsRefreshConfig,
)
from .paths import DIR_SQL_READ # noqa: F401
from .tasks import TASKS # noqa: F401
from .xcoms import XCOMS, xcom_pull # noqa: F401
12 changes: 12 additions & 0 deletions dags/enrich/config/cohorts.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
"""Cohorts for enrich DAGs"""

from dataclasses import dataclass

CLOSED = "🚪 Acteurs Fermés:"


@dataclass(frozen=True)
class COHORTS:
CLOSED_NOT_REPLACED = f"{CLOSED} 🔴 non remplacés"
CLOSED_REP_OTHER_SIREN = f"{CLOSED} 🟡 remplacés par SIRET d'un autre SIREN"
CLOSED_REP_SAME_SIREN = f"{CLOSED} 🟢 remplacés par SIRET du même SIREN"
48 changes: 48 additions & 0 deletions dags/enrich/config/columns.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
"""Column names enrichment DAGs"""

from dataclasses import dataclass

# All values we want to suggest via our enrichment DBT models
# should start with this prefix
SUGGEST_PREFIX = "suggest"


@dataclass(frozen=True)
class COLS:

# Acteurs
ACTEUR_ID: str = "acteur_id"
ACTEUR_TYPE_ID: str = "acteur_type_id"
ACTEUR_TYPE_CODE: str = "acteur_type_code"
ACTEUR_SOURCE_ID: str = "acteur_source_id"
ACTEUR_SOURCE_CODE: str = "acteur_source_code"
ACTEUR_SIRET: str = "acteur_siret"
ACTEUR_NOM: str = "acteur_nom"
ACTEUR_NOMS_ORIGINE: str = "acteur_noms_origine"
ACTEUR_NOMS_NORMALISES: str = "acteur_noms_normalises"
ACTEUR_COMMENTAIRES: str = "acteur_commentaires"
ACTEUR_ADRESSE: str = "acteur_adresse"
ACTEUR_CODE_POSTAL: str = "acteur_code_postal"
ACTEUR_VILLE: str = "acteur_ville"
ACTEUR_NAF: str = "acteur_naf"
ACTEUR_LONGITUDE: str = "acteur_longitude"
ACTEUR_LATITUDE: str = "acteur_latitude"

# Annuaire Entreprise
AE_DIRIGEANTS_NOMS: str = "ae_dirigeants_noms_prenoms"

# Suggestions
SUGGEST_COHORT: str = f"{SUGGEST_PREFIX}_cohort"
SUGGEST_SIRET: str = f"{SUGGEST_PREFIX}_siret"
SUGGEST_SIREN: str = f"{SUGGEST_PREFIX}_siren"
SUGGEST_NOM: str = f"{SUGGEST_PREFIX}_nom"
SUGGEST_ADRESSE: str = f"{SUGGEST_PREFIX}_adresse"
SUGGEST_CODE_POSTAL: str = f"{SUGGEST_PREFIX}_code_postal"
SUGGEST_VILLE: str = f"{SUGGEST_PREFIX}_ville"
SUGGEST_NAF: str = f"{SUGGEST_PREFIX}_naf_principal"
SUGGEST_LONGITUDE: str = f"{SUGGEST_PREFIX}_longitude"
SUGGEST_LATITUDE: str = f"{SUGGEST_PREFIX}_latitude"
SUGGEST_ACTEUR_TYPE_ID: str = f"{SUGGEST_PREFIX}_acteur_type_id"
# Matching
MATCH_WORDS: str = "match_words"
MATCH_SCORE: str = "match_score"
17 changes: 17 additions & 0 deletions dags/enrich/config/dbt.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
"""DBT models used in the enrich DAGs"""

from dataclasses import dataclass


@dataclass(frozen=True)
class DBT:
MARTS_ENRICH_AE_CLOSED_CANDIDATES: str = "marts_enrich_acteurs_closed_candidates"
MARTS_ENRICH_AE_CLOSED_REPLACED_SAME_SIREN: str = (
"marts_enrich_acteurs_closed_suggest_replaced_same_siren"
)
MARTS_ENRICH_AE_CLOSED_REPLACED_OTHER_SIREN: str = (
"marts_enrich_acteurs_closed_suggest_replaced_other_siren"
)
MARTS_ENRICH_AE_CLOSED_NOT_REPLACED: str = (
"marts_enrich_acteurs_closed_suggest_not_replaced"
)
94 changes: 94 additions & 0 deletions dags/enrich/config/models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
"""Configuration models enrich DAG"""

import re
from typing import Optional

from pydantic import BaseModel, Field, computed_field

SEPARATOR_FILTER_FIELD = "__"


def filters_get(model: BaseModel, prefix: str, operator: str) -> list[dict[str, str]]:
"""Utility to get list of filters (field, value) to apply to the data,
used 2 ways:
- generate the Airflow params for the UI from field names only
- read Airflow params to generate filters with values

Thus we have a dynamic Airflow UI controlled by and always aligned with
our config model by only maintaining the latter.
"""
filters = []
for field in model.model_fields:
value = getattr(model, field)
if re.fullmatch(f"{prefix}{SEPARATOR_FILTER_FIELD}[a-z_]+", field):

# Skipping None if it's not exclitely is_null operator
if value is None and operator != "is_null":
continue

filters.append(
{
"field": field.replace(f"{prefix}{SEPARATOR_FILTER_FIELD}", ""),
"operator": operator,
"value": value,
}
)
return filters


class EnrichBaseConfig(BaseModel):
dry_run: bool = Field(
default=True,
description="🚱 Si coché, aucune tâche d'écriture ne sera effectuée",
)
dbt_models_refresh: bool = Field(
default=True,
description="""🔄 Si coché, les modèles DBT seront rafraîchis.
🔴 Désactiver uniquement pour des tests.""",
)
dbt_models_refresh_command: str = Field(
default="",
description="🔄 Commande DBT à exécuter pour rafraîchir les modèles",
)
filter_contains__acteur_commentaires: Optional[str] = Field(
default=None,
description="🔍 Filtre sur **acteur_commentaires**",
)
filter_contains__acteur_nom: Optional[str] = Field(
default=None,
description="🔍 Filtre sur **acteur_nom**",
)
filter_equals__acteur_statut: Optional[str] = Field(
default=None,
description="🔍 Filtre sur **acteur_statut**",
)

def filters_contains(self) -> list[dict[str, str]]:
return filters_get(self, "filter_contains", "contains")

def filters_equals(self) -> list[dict[str, str]]:
return filters_get(self, "filter_equals", "equals")

@computed_field
@property
def filters(self) -> list[dict[str, str]]:
return self.filters_contains() + self.filters_equals()


class EnrichActeursClosedConfig(EnrichBaseConfig):
filter_contains__etab_naf: Optional[str] = Field(
default=None,
description="🔍 Filtre sur **NAF AE Etablissement**",
)


class EnrichDbtModelsRefreshConfig(BaseModel):
dbt_models_refresh_commands: list[str] = Field(
default=[],
description="🔄 Liste de commandes DBT à exécuter pour rafraîchir les modèles",
)


DAG_ID_TO_CONFIG_MODEL = {
"enrich_acteurs_closed": EnrichActeursClosedConfig,
}
6 changes: 6 additions & 0 deletions dags/enrich/config/paths.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
"""Constants for file paths"""

from pathlib import Path

DIR_CURRENT = Path(__file__).resolve()
DIR_SQL_READ = DIR_CURRENT.parent / "sql" / "read"
26 changes: 26 additions & 0 deletions dags/enrich/config/tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
"""Task IDs for enrichment DAGs"""

from dataclasses import dataclass


@dataclass(frozen=True)
class TASKS:
# Config
CONFIG_CREATE: str = "enrich_config_create"

# Read tasks
ENRICH_CLOSED_REPLACED_SAME_SIREN: str = "enrich_acteurs_closed_replaced_same_siren"
ENRICH_CLOSED_REPLACED_OTHER_SIREN: str = (
"enrich_acteurs_closed_replaced_other_siren"
)
ENRICH_CLOSED_NOT_REPLACED: str = "enrich_acteurs_closed_not_replaced"
ENRICH_CLOSED_SUGGESTIONS_SAME_SIREN: str = (
"enrich_acteurs_closed_suggestions_same_siren"
)
ENRICH_CLOSED_SUGGESTIONS_OTHER_SIREN: str = (
"enrich_acteurs_closed_suggestions_other_siren"
)
ENRICH_CLOSED_SUGGESTIONS_NOT_REPLACED: str = (
"enrich_acteurs_closed_suggestions_not_replaced"
)
ENRICH_DBT_MODELS_REFRESH: str = "enrich_dbt_models_refresh"
62 changes: 62 additions & 0 deletions dags/enrich/config/xcoms.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
"""Constants and helpers to configure XCom for Crawl DAG,
so we are more reliable & concise in our XCOM usage
(so easy to typo a key or pull from wrong task and Airflow
happily gives None without complaining)"""

from dataclasses import dataclass
from typing import Any

import pandas as pd
from airflow.exceptions import AirflowSkipException
from airflow.models.taskinstance import TaskInstance
from enrich.config.tasks import TASKS
from utils import logging_utils as log


@dataclass(frozen=True)
class XCOMS:
CONFIG: str = "config"
DF_READ: str = "df_read"
DF_MATCH: str = "df_match"

DF_CLOSED_REPLACED_SAME_SIREN: str = "df_acteurs_closed_replaced_same_siren"
DF_CLOSED_REPLACED_OTHER_SIREN: str = "df_acteurs_closed_replaced_other_siren"
DF_CLOSED_NOT_REPLACED: str = "df_acteurs_closed_not_replaced"


def xcom_pull(ti: TaskInstance, key: str, skip_if_empty: bool = False) -> Any:
"""For pulls, we create a helper to constrain keys
to specific task ids to guarantee consistent pulls"""

# Init
msg = f"XCOM from {ti.task_id=} pulling {key=}:" # For logging

# Reading values
if key == XCOMS.CONFIG:
value = ti.xcom_pull(key=key, task_ids=TASKS.CONFIG_CREATE)
elif key == XCOMS.DF_CLOSED_REPLACED_SAME_SIREN:
value = ti.xcom_pull(key=key, task_ids=TASKS.ENRICH_CLOSED_REPLACED_SAME_SIREN)
elif key == XCOMS.DF_CLOSED_REPLACED_OTHER_SIREN:
value = ti.xcom_pull(key=key, task_ids=TASKS.ENRICH_CLOSED_REPLACED_OTHER_SIREN)
elif key == XCOMS.DF_CLOSED_NOT_REPLACED:
value = ti.xcom_pull(key=key, task_ids=TASKS.ENRICH_CLOSED_NOT_REPLACED)
else:
raise ValueError(f"{msg} key inconnue")

# Skip if empty
if skip_if_empty and (
value is None or (isinstance(value, pd.DataFrame) and value.empty)
):
raise AirflowSkipException(f"✋ {msg} est vide, on s'arrête là")

# Logging
log.preview(f"{msg} value = ", value)

return value


# We don't have an helper for xcom_push because
# it can be done via the TaskInstance easily
# as ti.xcom_push(key=..., value=...)
# and we don't neet to align keys with task ids
# (task id is automatically that of the pushing task)
78 changes: 78 additions & 0 deletions dags/enrich/dags/enrich_acteurs_closed.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
"""
DAG to anonymize QFDMO acteur which names
contains people from Annuaire Entreprise (AE)
"""

from airflow import DAG
from enrich.config import (
COHORTS,
DBT,
TASKS,
EnrichActeursClosedConfig,
)
from enrich.tasks.airflow_logic.enrich_config_create_task import (
enrich_config_create_task,
)
from enrich.tasks.airflow_logic.enrich_dbt_model_suggest_task import (
enrich_dbt_model_suggest_task,
)
from enrich.tasks.airflow_logic.enrich_dbt_models_refresh_task import (
enrich_dbt_models_refresh_task,
)
from shared.config import CATCHUPS, SCHEDULES, START_DATES, config_to_airflow_params

with DAG(
dag_id="enrich_acteurs_closed",
dag_display_name="🚪 Enrichir - Acteurs Fermés",
default_args={
"owner": "airflow",
"depends_on_past": False,
"email_on_failure": False,
"email_on_retry": False,
"retries": 0,
},
description=(
"Un DAG pour détécter et remplacer les acteurs fermés"
"dans l'Annuaire Entreprises (AE)"
),
tags=["annuaire", "entreprises", "ae", "siren", "siret", "acteurs", "fermés"],
schedule=SCHEDULES.NONE,
catchup=CATCHUPS.AWLAYS_FALSE,
start_date=START_DATES.YESTERDAY,
params=config_to_airflow_params(
EnrichActeursClosedConfig(
dbt_models_refresh=False,
dbt_models_refresh_command=(
"dbt build --select tag:marts,tag:enrich,tag:closed"
),
filter_equals__acteur_statut="ACTIF",
)
),
) as dag:
# Instantiation
config = enrich_config_create_task(dag)
dbt_refresh = enrich_dbt_models_refresh_task(dag)
suggest_not_replaced = enrich_dbt_model_suggest_task(
dag,
task_id=TASKS.ENRICH_CLOSED_SUGGESTIONS_NOT_REPLACED,
cohort=COHORTS.CLOSED_NOT_REPLACED,
dbt_model_name=DBT.MARTS_ENRICH_AE_CLOSED_NOT_REPLACED,
)
suggest_other_siren = enrich_dbt_model_suggest_task(
dag,
task_id=TASKS.ENRICH_CLOSED_SUGGESTIONS_OTHER_SIREN,
cohort=COHORTS.CLOSED_REP_OTHER_SIREN,
dbt_model_name=DBT.MARTS_ENRICH_AE_CLOSED_REPLACED_OTHER_SIREN,
)
suggest_same_siren = enrich_dbt_model_suggest_task(
dag,
task_id=TASKS.ENRICH_CLOSED_SUGGESTIONS_SAME_SIREN,
cohort=COHORTS.CLOSED_REP_SAME_SIREN,
dbt_model_name=DBT.MARTS_ENRICH_AE_CLOSED_REPLACED_SAME_SIREN,
)

# Graph
config >> dbt_refresh # type: ignore
dbt_refresh >> suggest_not_replaced # type: ignore
dbt_refresh >> suggest_other_siren # type: ignore
dbt_refresh >> suggest_same_siren # type: ignore
Loading
Loading