-
Notifications
You must be signed in to change notification settings - Fork 5
🚪 [ENRICH] Cloturer acteurs cessés/fermés #1454
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
47 commits
Select commit
Hold shift + click to select a range
8e9caab
RGPD: anonymiser noms acteurs: init commit
maxcorbeau ec0bc59
ajout template django data
maxcorbeau b73955a
migration Django data
maxcorbeau d0ec335
refacto dags/rgpd -> dags/enrich, utilisation dbt
maxcorbeau 1832fdb
ajout nouvelle config & test rgpd
maxcorbeau e6479c3
modèle RGPD, tests & migration
maxcorbeau c34820e
v1 qui fonctionne
maxcorbeau c1397f4
début refacto pour factoriser RGPD + fermetures
maxcorbeau 9968ee4
début refacto et progrès vers décision métier
maxcorbeau 413519d
utilisation constantes de config dans DAG
maxcorbeau a9fb543
suggestions: début de création
maxcorbeau 2ee6a08
suggestions: presque fonctionnel
maxcorbeau 1471b22
suggestions: tests qui fonctionnent
maxcorbeau c658bf4
refacto du DAG avec dbt & suggestions
maxcorbeau 734f36d
dbt: nettoyage & sampling
maxcorbeau 6a8aaf1
DAG & Admin UI fonctionnels
maxcorbeau 15bbf59
create_as_child, numéro rue, UI
maxcorbeau 67b03c6
refacto modèles & tests
maxcorbeau 21c1041
ajout tolérance échec
maxcorbeau 7056baf
udfs: norma exclusion petits mots, cleanup
maxcorbeau ebe75e2
fin de résolution de conflits
maxcorbeau 06e82c5
suppression code RGPD
maxcorbeau 7be068d
cont. suppression RGPD & move data_reconstruct
maxcorbeau 0a2db94
recréer migration django + fix imports cassés via rebase
maxcorbeau 66557c7
fix imports + data_serialize en doublon
maxcorbeau fe05284
cont. del rgpd, fix acteurs model & tests
maxcorbeau d1e16ce
suppresion migration RGPD
maxcorbeau b055fe3
suppression des prints
maxcorbeau 8d5de0f
drop changes in restore script
maxcorbeau 420c12c
cont. fix script import
maxcorbeau 07164fa
fix typo sur dbt_models_refresh_command
maxcorbeau b29627c
renommage replace -> suggest
maxcorbeau 4672de9
cohorte: simplification, label uniquement
maxcorbeau f5d2aaa
fix migration after rebase
maxcorbeau b5157eb
gestion contexte + del marts villes
maxcorbeau 1b17955
gestion contexte
maxcorbeau 7d1f275
renommage fichier test
maxcorbeau 7b6e4bf
regroupement logique code
maxcorbeau a969210
automatiser row -> suggest data
maxcorbeau 161b721
profiter pour corriger les siren
maxcorbeau d924ef7
commande dbt au mauvais endroit
maxcorbeau 0516ebe
renommage/déplacement row to data
maxcorbeau 8b371ff
migration: suppresion RGPD à venir via autre PR
maxcorbeau b3a472b
rebase: déplace migrations de 148 à 150
maxcorbeau 26cdce1
suppression fichiers pas utilisés
maxcorbeau fff33a4
fix type sur encode_base57
maxcorbeau 947e530
del champs opt. _code (conflit compute_acteurs)
maxcorbeau File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
from .cohorts import COHORTS # noqa: F401 | ||
from .columns import COLS, SUGGEST_PREFIX # noqa: F401 | ||
from .dbt import DBT # noqa: F401 | ||
from .models import DAG_ID_TO_CONFIG_MODEL, EnrichActeursClosedConfig # noqa: F401 | ||
from .tasks import TASKS # noqa: F401 | ||
from .xcoms import XCOMS, xcom_pull # noqa: F401 |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,12 @@ | ||
"""Cohorts for enrich DAGs""" | ||
|
||
from dataclasses import dataclass | ||
|
||
|
||
@dataclass(frozen=True) | ||
class COHORTS: | ||
CLOSED_NOT_REPLACED = "🚪 Acteurs Fermés: 🔴 non remplacés" | ||
CLOSED_REP_OTHER_SIREN = ( | ||
"🚪 Acteurs Fermés: 🟡 remplacés par SIRET d'un autre SIREN" | ||
) | ||
CLOSED_REP_SAME_SIREN = "🚪 Acteurs Fermés: 🟢 remplacés par SIRET du même SIREN" |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,48 @@ | ||
"""Column names enrichment DAGs""" | ||
|
||
from dataclasses import dataclass | ||
|
||
# All values we want to suggest via our enrichment DBT models | ||
# should start with this prefix | ||
SUGGEST_PREFIX = "suggest" | ||
|
||
|
||
@dataclass(frozen=True) | ||
class COLS: | ||
|
||
# Acteurs | ||
ACTEUR_ID: str = "acteur_id" | ||
ACTEUR_TYPE_ID: str = "acteur_type_id" | ||
ACTEUR_TYPE_CODE: str = "acteur_type_code" | ||
ACTEUR_SOURCE_ID: str = "acteur_source_id" | ||
ACTEUR_SOURCE_CODE: str = "acteur_source_code" | ||
ACTEUR_SIRET: str = "acteur_siret" | ||
ACTEUR_NOM: str = "acteur_nom" | ||
ACTEUR_NOMS_ORIGINE: str = "acteur_noms_origine" | ||
ACTEUR_NOMS_NORMALISES: str = "acteur_noms_normalises" | ||
ACTEUR_COMMENTAIRES: str = "acteur_commentaires" | ||
ACTEUR_ADRESSE: str = "acteur_adresse" | ||
ACTEUR_CODE_POSTAL: str = "acteur_code_postal" | ||
ACTEUR_VILLE: str = "acteur_ville" | ||
ACTEUR_NAF: str = "acteur_naf" | ||
ACTEUR_LONGITUDE: str = "acteur_longitude" | ||
ACTEUR_LATITUDE: str = "acteur_latitude" | ||
|
||
# Annuaire Entreprise | ||
AE_DIRIGEANTS_NOMS: str = "ae_dirigeants_noms_prenoms" | ||
|
||
# Suggestions | ||
SUGGEST_COHORT: str = f"{SUGGEST_PREFIX}_cohort" | ||
SUGGEST_SIRET: str = f"{SUGGEST_PREFIX}_siret" | ||
SUGGEST_SIREN: str = f"{SUGGEST_PREFIX}_siren" | ||
SUGGEST_NOM: str = f"{SUGGEST_PREFIX}_nom" | ||
SUGGEST_ADRESSE: str = f"{SUGGEST_PREFIX}_adresse" | ||
SUGGEST_CODE_POSTAL: str = f"{SUGGEST_PREFIX}_code_postal" | ||
SUGGEST_VILLE: str = f"{SUGGEST_PREFIX}_ville" | ||
SUGGEST_NAF: str = f"{SUGGEST_PREFIX}_naf_principal" | ||
SUGGEST_LONGITUDE: str = f"{SUGGEST_PREFIX}_longitude" | ||
SUGGEST_LATITUDE: str = f"{SUGGEST_PREFIX}_latitude" | ||
SUGGEST_ACTEUR_TYPE_ID: str = f"{SUGGEST_PREFIX}_acteur_type_id" | ||
# Matching | ||
MATCH_WORDS: str = "match_words" | ||
MATCH_SCORE: str = "match_score" |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
"""DBT models used in the enrich DAGs""" | ||
|
||
from dataclasses import dataclass | ||
|
||
|
||
@dataclass(frozen=True) | ||
class DBT: | ||
MARTS_ENRICH_AE_CLOSED_CANDIDATES: str = "marts_enrich_acteurs_closed_candidates" | ||
MARTS_ENRICH_AE_CLOSED_REPLACED_SAME_SIREN: str = ( | ||
"marts_enrich_acteurs_closed_suggest_replaced_same_siren" | ||
) | ||
MARTS_ENRICH_AE_CLOSED_REPLACED_OTHER_SIREN: str = ( | ||
"marts_enrich_acteurs_closed_suggest_replaced_other_siren" | ||
) | ||
MARTS_ENRICH_AE_CLOSED_NOT_REPLACED: str = ( | ||
"marts_enrich_acteurs_closed_suggest_not_replaced" | ||
) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,87 @@ | ||
"""Configuration models enrich DAG""" | ||
|
||
import re | ||
from typing import Optional | ||
|
||
from pydantic import BaseModel, Field, computed_field | ||
|
||
SEPARATOR_FILTER_FIELD = "__" | ||
|
||
|
||
def filters_get(model: BaseModel, prefix: str, operator: str) -> list[dict[str, str]]: | ||
"""Utility to get list of filters (field, value) to apply to the data, | ||
used 2 ways: | ||
- generate the Airflow params for the UI from field names only | ||
- read Airflow params to generate filters with values | ||
|
||
Thus we have a dynamic Airflow UI controlled by and always aligned with | ||
our config model by only maintaining the latter. | ||
""" | ||
filters = [] | ||
for field in model.model_fields: | ||
value = getattr(model, field) | ||
if re.fullmatch(f"{prefix}{SEPARATOR_FILTER_FIELD}[a-z_]+", field): | ||
|
||
# Skipping None if it's not exclitely is_null operator | ||
if value is None and operator != "is_null": | ||
continue | ||
|
||
filters.append( | ||
{ | ||
"field": field.replace(f"{prefix}{SEPARATOR_FILTER_FIELD}", ""), | ||
"operator": operator, | ||
"value": value, | ||
} | ||
) | ||
return filters | ||
|
||
|
||
class EnrichBaseConfig(BaseModel): | ||
dry_run: bool = Field( | ||
default=True, | ||
description="🚱 Si coché, aucune tâche d'écriture ne sera effectuée", | ||
) | ||
dbt_models_refresh: bool = Field( | ||
default=True, | ||
description="""🔄 Si coché, les modèles DBT seront rafraîchis. | ||
🔴 Désactiver uniquement pour des tests.""", | ||
) | ||
dbt_models_refresh_command: str = Field( | ||
default="", | ||
description="🔄 Commande DBT à exécuter pour rafraîchir les modèles", | ||
) | ||
filter_contains__acteur_commentaires: Optional[str] = Field( | ||
default=None, | ||
description="🔍 Filtre sur **acteur_commentaires**", | ||
) | ||
filter_contains__acteur_nom: Optional[str] = Field( | ||
default=None, | ||
description="🔍 Filtre sur **acteur_nom**", | ||
) | ||
filter_equals__acteur_statut: Optional[str] = Field( | ||
default=None, | ||
description="🔍 Filtre sur **acteur_statut**", | ||
) | ||
|
||
def filters_contains(self) -> list[dict[str, str]]: | ||
return filters_get(self, "filter_contains", "contains") | ||
|
||
def filters_equals(self) -> list[dict[str, str]]: | ||
return filters_get(self, "filter_equals", "equals") | ||
|
||
@computed_field | ||
@property | ||
def filters(self) -> list[dict[str, str]]: | ||
return self.filters_contains() + self.filters_equals() | ||
|
||
|
||
class EnrichActeursClosedConfig(EnrichBaseConfig): | ||
filter_contains__etab_naf: Optional[str] = Field( | ||
default=None, | ||
description="🔍 Filtre sur **NAF AE Etablissement**", | ||
) | ||
|
||
|
||
DAG_ID_TO_CONFIG_MODEL = { | ||
"enrich_acteurs_closed": EnrichActeursClosedConfig, | ||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
"""Task IDs for enrichment DAGs""" | ||
|
||
from dataclasses import dataclass | ||
|
||
|
||
@dataclass(frozen=True) | ||
class TASKS: | ||
# Config | ||
CONFIG_CREATE: str = "enrich_config_create" | ||
|
||
# Read tasks | ||
ENRICH_CLOSED_REPLACED_SAME_SIREN: str = "enrich_acteurs_closed_replaced_same_siren" | ||
ENRICH_CLOSED_REPLACED_OTHER_SIREN: str = ( | ||
"enrich_acteurs_closed_replaced_other_siren" | ||
) | ||
ENRICH_CLOSED_NOT_REPLACED: str = "enrich_acteurs_closed_not_replaced" | ||
ENRICH_CLOSED_SUGGESTIONS_SAME_SIREN: str = ( | ||
"enrich_acteurs_closed_suggestions_same_siren" | ||
) | ||
ENRICH_CLOSED_SUGGESTIONS_OTHER_SIREN: str = ( | ||
"enrich_acteurs_closed_suggestions_other_siren" | ||
) | ||
ENRICH_CLOSED_SUGGESTIONS_NOT_REPLACED: str = ( | ||
"enrich_acteurs_closed_suggestions_not_replaced" | ||
) | ||
ENRICH_DBT_MODELS_REFRESH: str = "enrich_dbt_models_refresh" |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,62 @@ | ||
"""Constants and helpers to configure XCom for Crawl DAG, | ||
so we are more reliable & concise in our XCOM usage | ||
(so easy to typo a key or pull from wrong task and Airflow | ||
happily gives None without complaining)""" | ||
|
||
from dataclasses import dataclass | ||
from typing import Any | ||
|
||
import pandas as pd | ||
from airflow.exceptions import AirflowSkipException | ||
from airflow.models.taskinstance import TaskInstance | ||
from enrich.config.tasks import TASKS | ||
from utils import logging_utils as log | ||
|
||
|
||
@dataclass(frozen=True) | ||
class XCOMS: | ||
CONFIG: str = "config" | ||
DF_READ: str = "df_read" | ||
DF_MATCH: str = "df_match" | ||
|
||
DF_CLOSED_REPLACED_SAME_SIREN: str = "df_acteurs_closed_replaced_same_siren" | ||
DF_CLOSED_REPLACED_OTHER_SIREN: str = "df_acteurs_closed_replaced_other_siren" | ||
DF_CLOSED_NOT_REPLACED: str = "df_acteurs_closed_not_replaced" | ||
|
||
|
||
def xcom_pull(ti: TaskInstance, key: str, skip_if_empty: bool = False) -> Any: | ||
"""For pulls, we create a helper to constrain keys | ||
to specific task ids to guarantee consistent pulls""" | ||
|
||
# Init | ||
msg = f"XCOM from {ti.task_id=} pulling {key=}:" # For logging | ||
|
||
# Reading values | ||
if key == XCOMS.CONFIG: | ||
value = ti.xcom_pull(key=key, task_ids=TASKS.CONFIG_CREATE) | ||
elif key == XCOMS.DF_CLOSED_REPLACED_SAME_SIREN: | ||
value = ti.xcom_pull(key=key, task_ids=TASKS.ENRICH_CLOSED_REPLACED_SAME_SIREN) | ||
elif key == XCOMS.DF_CLOSED_REPLACED_OTHER_SIREN: | ||
value = ti.xcom_pull(key=key, task_ids=TASKS.ENRICH_CLOSED_REPLACED_OTHER_SIREN) | ||
elif key == XCOMS.DF_CLOSED_NOT_REPLACED: | ||
value = ti.xcom_pull(key=key, task_ids=TASKS.ENRICH_CLOSED_NOT_REPLACED) | ||
else: | ||
raise ValueError(f"{msg} key inconnue") | ||
|
||
# Skip if empty | ||
if skip_if_empty and ( | ||
value is None or (isinstance(value, pd.DataFrame) and value.empty) | ||
): | ||
raise AirflowSkipException(f"✋ {msg} est vide, on s'arrête là") | ||
|
||
# Logging | ||
log.preview(f"{msg} value = ", value) | ||
|
||
return value | ||
|
||
|
||
# We don't have an helper for xcom_push because | ||
# it can be done via the TaskInstance easily | ||
# as ti.xcom_push(key=..., value=...) | ||
# and we don't neet to align keys with task ids | ||
# (task id is automatically that of the pushing task) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,78 @@ | ||
""" | ||
DAG to anonymize QFDMO acteur which names | ||
contains people from Annuaire Entreprise (AE) | ||
""" | ||
|
||
from airflow import DAG | ||
from enrich.config import ( | ||
COHORTS, | ||
DBT, | ||
TASKS, | ||
EnrichActeursClosedConfig, | ||
) | ||
from enrich.tasks.airflow_logic.enrich_config_create_task import ( | ||
enrich_config_create_task, | ||
) | ||
from enrich.tasks.airflow_logic.enrich_dbt_model_suggest_task import ( | ||
enrich_dbt_model_suggest_task, | ||
) | ||
from enrich.tasks.airflow_logic.enrich_dbt_models_refresh_task import ( | ||
enrich_dbt_models_refresh_task, | ||
) | ||
from shared.config import CATCHUPS, SCHEDULES, START_DATES, config_to_airflow_params | ||
|
||
with DAG( | ||
dag_id="enrich_acteurs_closed", | ||
dag_display_name="🚪 Enrichir - Acteurs Fermés", | ||
default_args={ | ||
"owner": "airflow", | ||
"depends_on_past": False, | ||
"email_on_failure": False, | ||
"email_on_retry": False, | ||
"retries": 0, | ||
}, | ||
description=( | ||
"Un DAG pour détécter et remplacer les acteurs fermés" | ||
"dans l'Annuaire Entreprises (AE)" | ||
), | ||
tags=["annuaire", "entreprises", "ae", "siren", "siret", "acteurs", "fermés"], | ||
schedule=SCHEDULES.NONE, | ||
catchup=CATCHUPS.AWLAYS_FALSE, | ||
start_date=START_DATES.YESTERDAY, | ||
params=config_to_airflow_params( | ||
EnrichActeursClosedConfig( | ||
dbt_models_refresh=False, | ||
dbt_models_refresh_command=( | ||
"dbt build --select tag:marts,tag:enrich,tag:closed" | ||
), | ||
filter_equals__acteur_statut="ACTIF", | ||
) | ||
), | ||
) as dag: | ||
# Instantiation | ||
config = enrich_config_create_task(dag) | ||
dbt_refresh = enrich_dbt_models_refresh_task(dag) | ||
suggest_not_replaced = enrich_dbt_model_suggest_task( | ||
dag, | ||
task_id=TASKS.ENRICH_CLOSED_SUGGESTIONS_NOT_REPLACED, | ||
cohort=COHORTS.CLOSED_NOT_REPLACED, | ||
dbt_model_name=DBT.MARTS_ENRICH_AE_CLOSED_NOT_REPLACED, | ||
) | ||
suggest_other_siren = enrich_dbt_model_suggest_task( | ||
dag, | ||
task_id=TASKS.ENRICH_CLOSED_SUGGESTIONS_OTHER_SIREN, | ||
cohort=COHORTS.CLOSED_REP_OTHER_SIREN, | ||
dbt_model_name=DBT.MARTS_ENRICH_AE_CLOSED_REPLACED_OTHER_SIREN, | ||
) | ||
suggest_same_siren = enrich_dbt_model_suggest_task( | ||
dag, | ||
task_id=TASKS.ENRICH_CLOSED_SUGGESTIONS_SAME_SIREN, | ||
cohort=COHORTS.CLOSED_REP_SAME_SIREN, | ||
dbt_model_name=DBT.MARTS_ENRICH_AE_CLOSED_REPLACED_SAME_SIREN, | ||
) | ||
|
||
# Graph | ||
config >> dbt_refresh # type: ignore | ||
dbt_refresh >> suggest_not_replaced # type: ignore | ||
dbt_refresh >> suggest_other_siren # type: ignore | ||
dbt_refresh >> suggest_same_siren # type: ignore |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.