diff --git a/dags/cluster/tasks/business_logic/cluster_acteurs_parents_choose_data.py b/dags/cluster/tasks/business_logic/cluster_acteurs_parents_choose_data.py
index cc75f4aa7..430b29b27 100644
--- a/dags/cluster/tasks/business_logic/cluster_acteurs_parents_choose_data.py
+++ b/dags/cluster/tasks/business_logic/cluster_acteurs_parents_choose_data.py
@@ -1,12 +1,13 @@
+import logging
 from typing import Any

 import pandas as pd
 from cluster.config.constants import COL_PARENT_DATA_NEW, FIELDS_PARENT_DATA_EXCLUDED
 from django.forms.models import model_to_dict
-from rich import print
 from utils.django import django_setup_full

 django_setup_full()
+
 from data.models.change import COL_CHANGE_MODEL_NAME  # noqa: E402
 from data.models.changes import (  # noqa: E402
     ChangeActeurCreateAsParent,
@@ -14,6 +15,8 @@
 )
 from qfdmo.models.acteur import Acteur, DisplayedActeur, RevisionActeur  # noqa: E402

+logger = logging.getLogger(__name__)
+

 def fields_to_include_clean(
     fields_to_include: list[str],
@@ -63,7 +66,7 @@ def field_pick_value(
         """
             return value
         except Exception as e:
-            print(f"Invalid value for field {field}: {value}: {e}")
+            logger.error(f"Invalid value for field {field}: {value}: {e}")
             pass
     return None
diff --git a/dags/enrich/config/__init__.py b/dags/enrich/config/__init__.py
new file mode 100644
index 000000000..07d90673d
--- /dev/null
+++ b/dags/enrich/config/__init__.py
@@ -0,0 +1,6 @@
+from .cohorts import COHORTS  # noqa: F401
+from .columns import COLS, SUGGEST_PREFIX  # noqa: F401
+from .dbt import DBT  # noqa: F401
+from .models import DAG_ID_TO_CONFIG_MODEL, EnrichActeursClosedConfig  # noqa: F401
+from .tasks import TASKS  # noqa: F401
+from .xcoms import XCOMS, xcom_pull  # noqa: F401
diff --git a/dags/enrich/config/cohorts.py b/dags/enrich/config/cohorts.py
new file mode 100644
index 000000000..0cd206ecd
--- /dev/null
+++ b/dags/enrich/config/cohorts.py
@@ -0,0 +1,12 @@
+"""Cohorts for enrich DAGs"""
+
+from dataclasses import dataclass
+
+
+@dataclass(frozen=True)
+class COHORTS:
+    CLOSED_NOT_REPLACED = "🚪 Acteurs Fermés: 🔴 non remplacés"
+    CLOSED_REP_OTHER_SIREN = (
+        "🚪 Acteurs Fermés: 🟡 remplacés par SIRET d'un autre SIREN"
+    )
+    CLOSED_REP_SAME_SIREN = "🚪 Acteurs Fermés: 🟢 remplacés par SIRET du même SIREN"
diff --git a/dags/enrich/config/columns.py b/dags/enrich/config/columns.py
new file mode 100644
index 000000000..e9c48b924
--- /dev/null
+++ b/dags/enrich/config/columns.py
@@ -0,0 +1,48 @@
+"""Column names for enrichment DAGs"""
+
+from dataclasses import dataclass
+
+# All values we want to suggest via our enrichment DBT models
+# should start with this prefix
+SUGGEST_PREFIX = "suggest"
+
+
+@dataclass(frozen=True)
+class COLS:
+
+    # Acteurs
+    ACTEUR_ID: str = "acteur_id"
+    ACTEUR_TYPE_ID: str = "acteur_type_id"
+    ACTEUR_TYPE_CODE: str = "acteur_type_code"
+    ACTEUR_SOURCE_ID: str = "acteur_source_id"
+    ACTEUR_SOURCE_CODE: str = "acteur_source_code"
+    ACTEUR_SIRET: str = "acteur_siret"
+    ACTEUR_NOM: str = "acteur_nom"
+    ACTEUR_NOMS_ORIGINE: str = "acteur_noms_origine"
+    ACTEUR_NOMS_NORMALISES: str = "acteur_noms_normalises"
+    ACTEUR_COMMENTAIRES: str = "acteur_commentaires"
+    ACTEUR_ADRESSE: str = "acteur_adresse"
+    ACTEUR_CODE_POSTAL: str = "acteur_code_postal"
+    ACTEUR_VILLE: str = "acteur_ville"
+    ACTEUR_NAF: str = "acteur_naf"
+    ACTEUR_LONGITUDE: str = "acteur_longitude"
+    ACTEUR_LATITUDE: str = "acteur_latitude"
+
+    # Annuaire Entreprise
+    AE_DIRIGEANTS_NOMS: str = "ae_dirigeants_noms_prenoms"
+
+    # Suggestions
+    SUGGEST_COHORT: str = f"{SUGGEST_PREFIX}_cohort"
+    SUGGEST_SIRET: str = f"{SUGGEST_PREFIX}_siret"
+    SUGGEST_SIREN: str = f"{SUGGEST_PREFIX}_siren"
+    SUGGEST_NOM: str = f"{SUGGEST_PREFIX}_nom"
+    SUGGEST_ADRESSE: str = f"{SUGGEST_PREFIX}_adresse"
+    SUGGEST_CODE_POSTAL: str = f"{SUGGEST_PREFIX}_code_postal"
+    SUGGEST_VILLE: str = f"{SUGGEST_PREFIX}_ville"
+    SUGGEST_NAF: str = f"{SUGGEST_PREFIX}_naf_principal"
+    SUGGEST_LONGITUDE: str = f"{SUGGEST_PREFIX}_longitude"
+    SUGGEST_LATITUDE: str = f"{SUGGEST_PREFIX}_latitude"
+    SUGGEST_ACTEUR_TYPE_ID: str = f"{SUGGEST_PREFIX}_acteur_type_id"
+    # Matching
+    MATCH_WORDS: str = "match_words"
+    MATCH_SCORE: str = "match_score"
diff --git a/dags/enrich/config/dbt.py b/dags/enrich/config/dbt.py
new file mode 100644
index 000000000..44e3af97e
--- /dev/null
+++ b/dags/enrich/config/dbt.py
@@ -0,0 +1,17 @@
+"""DBT models used in the enrich DAGs"""
+
+from dataclasses import dataclass
+
+
+@dataclass(frozen=True)
+class DBT:
+    MARTS_ENRICH_AE_CLOSED_CANDIDATES: str = "marts_enrich_acteurs_closed_candidates"
+    MARTS_ENRICH_AE_CLOSED_REPLACED_SAME_SIREN: str = (
+        "marts_enrich_acteurs_closed_suggest_replaced_same_siren"
+    )
+    MARTS_ENRICH_AE_CLOSED_REPLACED_OTHER_SIREN: str = (
+        "marts_enrich_acteurs_closed_suggest_replaced_other_siren"
+    )
+    MARTS_ENRICH_AE_CLOSED_NOT_REPLACED: str = (
+        "marts_enrich_acteurs_closed_suggest_not_replaced"
+    )
diff --git a/dags/enrich/config/models.py b/dags/enrich/config/models.py
new file mode 100644
index 000000000..3ec5d7eab
--- /dev/null
+++ b/dags/enrich/config/models.py
@@ -0,0 +1,87 @@
+"""Configuration models for enrich DAGs"""
+
+import re
+from typing import Optional
+
+from pydantic import BaseModel, Field, computed_field
+
+SEPARATOR_FILTER_FIELD = "__"
+
+
+def filters_get(model: BaseModel, prefix: str, operator: str) -> list[dict[str, str]]:
+    """Utility to get the list of filters (field, value) to apply to the data,
+    used in 2 ways:
+    - generate the Airflow params for the UI from field names only
+    - read Airflow params to generate filters with values
+
+    Thus the Airflow UI stays dynamic and always aligned with our
+    config model, which is the only thing we maintain.
+    """
+    filters = []
+    for field in model.model_fields:
+        value = getattr(model, field)
+        if re.fullmatch(f"{prefix}{SEPARATOR_FILTER_FIELD}[a-z_]+", field):
+
+            # Skip None unless the operator is explicitly is_null
+            if value is None and operator != "is_null":
+                continue
+
+            filters.append(
+                {
+                    "field": field.replace(f"{prefix}{SEPARATOR_FILTER_FIELD}", ""),
+                    "operator": operator,
+                    "value": value,
+                }
+            )
+    return filters
+
+
+class EnrichBaseConfig(BaseModel):
+    dry_run: bool = Field(
+        default=True,
+        description="🚱 Si coché, aucune tâche d'écriture ne sera effectuée",
+    )
+    dbt_models_refresh: bool = Field(
+        default=True,
+        description="""🔄 Si coché, les modèles DBT seront rafraîchis.
+        🔴 Désactiver uniquement pour des tests.""",
+    )
+    dbt_models_refresh_command: str = Field(
+        default="",
+        description="🔄 Commande DBT à exécuter pour rafraîchir les modèles",
+    )
+    filter_contains__acteur_commentaires: Optional[str] = Field(
+        default=None,
+        description="🔍 Filtre sur **acteur_commentaires**",
+    )
+    filter_contains__acteur_nom: Optional[str] = Field(
+        default=None,
+        description="🔍 Filtre sur **acteur_nom**",
+    )
+    filter_equals__acteur_statut: Optional[str] = Field(
+        default=None,
+        description="🔍 Filtre sur **acteur_statut**",
+    )
+
+    def filters_contains(self) -> list[dict[str, str]]:
+        return filters_get(self, "filter_contains", "contains")
+
+    def filters_equals(self) -> list[dict[str, str]]:
+        return filters_get(self, "filter_equals", "equals")
+
+    @computed_field
+    @property
+    def filters(self) -> list[dict[str, str]]:
+        return self.filters_contains() + self.filters_equals()
+
+
+class EnrichActeursClosedConfig(EnrichBaseConfig):
+    filter_contains__etab_naf: Optional[str] = Field(
+        default=None,
+        description="🔍 Filtre sur **NAF AE Etablissement**",
+    )
+
+
+DAG_ID_TO_CONFIG_MODEL = {
+    "enrich_acteurs_closed": EnrichActeursClosedConfig,
+}
diff --git a/dags/enrich/config/tasks.py b/dags/enrich/config/tasks.py
new file mode 100644
index 000000000..a56767fa6
--- /dev/null
+++ b/dags/enrich/config/tasks.py
@@ -0,0 +1,26 @@
+"""Task IDs for enrichment DAGs"""
+
+from dataclasses import dataclass
+
+
+@dataclass(frozen=True)
+class TASKS:
+    # Config
+    CONFIG_CREATE: str = "enrich_config_create"
+
+    # Read tasks
+    ENRICH_CLOSED_REPLACED_SAME_SIREN: str = "enrich_acteurs_closed_replaced_same_siren"
+    ENRICH_CLOSED_REPLACED_OTHER_SIREN: str = (
+        "enrich_acteurs_closed_replaced_other_siren"
+    )
+    ENRICH_CLOSED_NOT_REPLACED: str = "enrich_acteurs_closed_not_replaced"
+    ENRICH_CLOSED_SUGGESTIONS_SAME_SIREN: str = (
+        "enrich_acteurs_closed_suggestions_same_siren"
+    )
+    ENRICH_CLOSED_SUGGESTIONS_OTHER_SIREN: str = (
+        "enrich_acteurs_closed_suggestions_other_siren"
+    )
+    ENRICH_CLOSED_SUGGESTIONS_NOT_REPLACED: str = (
+        "enrich_acteurs_closed_suggestions_not_replaced"
+    )
+    ENRICH_DBT_MODELS_REFRESH: str = "enrich_dbt_models_refresh"
diff --git a/dags/enrich/config/xcoms.py b/dags/enrich/config/xcoms.py
new file mode 100644
index 000000000..eb07c88c2
--- /dev/null
+++ b/dags/enrich/config/xcoms.py
@@ -0,0 +1,62 @@
+"""Constants and helpers to configure XCom for enrich DAGs,
+so we are more reliable & concise in our XCOM usage
+(it is so easy to typo a key or pull from the wrong task,
+and Airflow happily gives None without complaining)"""
+
+from dataclasses import dataclass
+from typing import Any
+
+import pandas as pd
+from airflow.exceptions import AirflowSkipException
+from airflow.models.taskinstance import TaskInstance
+from enrich.config.tasks import TASKS
+from utils import logging_utils as log
+
+
+@dataclass(frozen=True)
+class XCOMS:
+    CONFIG: str = "config"
+    DF_READ: str = "df_read"
+    DF_MATCH: str = "df_match"
+
+    DF_CLOSED_REPLACED_SAME_SIREN: str = "df_acteurs_closed_replaced_same_siren"
+    DF_CLOSED_REPLACED_OTHER_SIREN: str = "df_acteurs_closed_replaced_other_siren"
+    DF_CLOSED_NOT_REPLACED: str = "df_acteurs_closed_not_replaced"
+
+
+def xcom_pull(ti: TaskInstance, key: str, skip_if_empty: bool = False) -> Any:
+    """For pulls, we create a helper to constrain keys
+    to specific task ids to guarantee consistent pulls"""
+
+    # Init
+    msg = f"XCOM from {ti.task_id=} pulling {key=}:"  # For logging
+
+    # Reading values
+    if key == XCOMS.CONFIG:
+        value = ti.xcom_pull(key=key, task_ids=TASKS.CONFIG_CREATE)
+    elif key == XCOMS.DF_CLOSED_REPLACED_SAME_SIREN:
+        value = ti.xcom_pull(key=key, task_ids=TASKS.ENRICH_CLOSED_REPLACED_SAME_SIREN)
+    elif key == XCOMS.DF_CLOSED_REPLACED_OTHER_SIREN:
+        value = ti.xcom_pull(key=key, task_ids=TASKS.ENRICH_CLOSED_REPLACED_OTHER_SIREN)
+    elif key == XCOMS.DF_CLOSED_NOT_REPLACED:
+        value = ti.xcom_pull(key=key, task_ids=TASKS.ENRICH_CLOSED_NOT_REPLACED)
+    else:
+        raise ValueError(f"{msg} key inconnue")
+
+    # Skip if empty
+    if skip_if_empty and (
+        value is None or (isinstance(value, pd.DataFrame) and value.empty)
+    ):
+        raise AirflowSkipException(f"✋ {msg} est vide, on s'arrête là")
+
+    # Logging
+    log.preview(f"{msg} value = ", value)
+
+    return value
+
+
+# We don't have a helper for xcom_push because
+# it can be done via the TaskInstance easily
+# as ti.xcom_push(key=..., value=...)
+# and we don't need to align keys with task ids
+# (task id is automatically that of the pushing task)
diff --git a/dags/enrich/dags/enrich_acteurs_closed.py b/dags/enrich/dags/enrich_acteurs_closed.py
new file mode 100644
index 000000000..bb806ae64
--- /dev/null
+++ b/dags/enrich/dags/enrich_acteurs_closed.py
@@ -0,0 +1,78 @@
+"""
+DAG to detect QFDMO acteurs closed in the Annuaire Entreprises (AE)
+and to suggest replacements for them
+"""
+
+from airflow import DAG
+from enrich.config import (
+    COHORTS,
+    DBT,
+    TASKS,
+    EnrichActeursClosedConfig,
+)
+from enrich.tasks.airflow_logic.enrich_config_create_task import (
+    enrich_config_create_task,
+)
+from enrich.tasks.airflow_logic.enrich_dbt_model_suggest_task import (
+    enrich_dbt_model_suggest_task,
+)
+from enrich.tasks.airflow_logic.enrich_dbt_models_refresh_task import (
+    enrich_dbt_models_refresh_task,
+)
+from shared.config import CATCHUPS, SCHEDULES, START_DATES, config_to_airflow_params
+
+with DAG(
+    dag_id="enrich_acteurs_closed",
+    dag_display_name="🚪 Enrichir - Acteurs Fermés",
+    default_args={
+        "owner": "airflow",
+        "depends_on_past": False,
+        "email_on_failure": False,
+        "email_on_retry": False,
+        "retries": 0,
+    },
+    description=(
+        "Un DAG pour détecter et remplacer les acteurs fermés "
+        "dans l'Annuaire Entreprises (AE)"
+    ),
+    tags=["annuaire", "entreprises", "ae", "siren", "siret", "acteurs", "fermés"],
+    schedule=SCHEDULES.NONE,
+    catchup=CATCHUPS.AWLAYS_FALSE,
+    start_date=START_DATES.YESTERDAY,
+    params=config_to_airflow_params(
+        EnrichActeursClosedConfig(
+            dbt_models_refresh=False,
+            dbt_models_refresh_command=(
+                "dbt build --select tag:marts,tag:enrich,tag:closed"
+            ),
+            filter_equals__acteur_statut="ACTIF",
+        )
+    ),
+) as dag:
+    # Instantiation
+    config = enrich_config_create_task(dag)
+    dbt_refresh = enrich_dbt_models_refresh_task(dag)
+    suggest_not_replaced = enrich_dbt_model_suggest_task(
+        dag,
+        task_id=TASKS.ENRICH_CLOSED_SUGGESTIONS_NOT_REPLACED,
+        cohort=COHORTS.CLOSED_NOT_REPLACED,
+        dbt_model_name=DBT.MARTS_ENRICH_AE_CLOSED_NOT_REPLACED,
+    )
+    suggest_other_siren = enrich_dbt_model_suggest_task(
+        dag,
+        task_id=TASKS.ENRICH_CLOSED_SUGGESTIONS_OTHER_SIREN,
+        cohort=COHORTS.CLOSED_REP_OTHER_SIREN,
+        dbt_model_name=DBT.MARTS_ENRICH_AE_CLOSED_REPLACED_OTHER_SIREN,
+    )
+    suggest_same_siren = enrich_dbt_model_suggest_task(
+        dag,
+        task_id=TASKS.ENRICH_CLOSED_SUGGESTIONS_SAME_SIREN,
+        cohort=COHORTS.CLOSED_REP_SAME_SIREN,
+        dbt_model_name=DBT.MARTS_ENRICH_AE_CLOSED_REPLACED_SAME_SIREN,
+    )
+
+    # Graph
+    config >> dbt_refresh  # type: ignore
+    dbt_refresh >> suggest_not_replaced  # type: ignore
+    dbt_refresh >> suggest_other_siren  # type: ignore
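A quick aside for reviewers unfamiliar with the XCom helper introduced above — a minimal usage sketch (the task callable `my_task_wrapper` is hypothetical; `XCOMS`, `xcom_pull` and `skip_if_empty` are the names this diff defines):

```python
from enrich.config import XCOMS, xcom_pull


def my_task_wrapper(ti) -> None:  # hypothetical Airflow task callable
    # The key is mapped internally to TASKS.CONFIG_CREATE, so we always
    # pull the config from the task that actually pushed it
    config = xcom_pull(ti, XCOMS.CONFIG)
    print(config.dry_run)

    # Empty dataframes short-circuit the task via AirflowSkipException
    # instead of silently propagating None downstream
    df = xcom_pull(ti, XCOMS.DF_CLOSED_NOT_REPLACED, skip_if_empty=True)
    print(df.shape)

    # An unknown key fails fast rather than returning Airflow's default None:
    # xcom_pull(ti, "confg")  # -> ValueError: "... key inconnue"
```
+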
dbt_refresh >> suggest_same_siren # type: ignore diff --git a/dags/enrich/tasks/airflow_logic/enrich_config_create_task.py b/dags/enrich/tasks/airflow_logic/enrich_config_create_task.py new file mode 100644 index 000000000..6e3d6452e --- /dev/null +++ b/dags/enrich/tasks/airflow_logic/enrich_config_create_task.py @@ -0,0 +1,41 @@ +"""Generic task to create configuration""" + +import logging + +from airflow import DAG +from airflow.operators.python import PythonOperator +from enrich.config import DAG_ID_TO_CONFIG_MODEL, TASKS, XCOMS + +logger = logging.getLogger(__name__) + + +def task_info_get(): + return f""" + ============================================================ + Description de la tâche "{TASKS.CONFIG_CREATE}" + ============================================================ + 💡 quoi: création de la config + + 🎯 pourquoi: s'assurer qu'elle est OK avant de faire du travail, + réutiliser la config pour les autres tâches + + 🏗️ comment: on ingère les paramètres Airflow dans un modèle pydantic + """ + + +def enrich_config_create_wrapper(ti, dag, params) -> None: + logger.info(task_info_get()) + + config = DAG_ID_TO_CONFIG_MODEL[dag.dag_id](**params) + logger.info(f"📖 Configuration:\n{config.model_dump_json(indent=2)}") + + ti.xcom_push(key=XCOMS.CONFIG, value=config) + + +def enrich_config_create_task(dag: DAG) -> PythonOperator: + return PythonOperator( + task_id=TASKS.CONFIG_CREATE, + python_callable=enrich_config_create_wrapper, + dag=dag, + doc_md="📖 **Création de la config**", + ) diff --git a/dags/enrich/tasks/airflow_logic/enrich_dbt_model_suggest_task.py b/dags/enrich/tasks/airflow_logic/enrich_dbt_model_suggest_task.py new file mode 100644 index 000000000..dfd03366a --- /dev/null +++ b/dags/enrich/tasks/airflow_logic/enrich_dbt_model_suggest_task.py @@ -0,0 +1,67 @@ +"""Generate suggestions for enrichment DAGs""" + +import logging + +from airflow import DAG +from airflow.exceptions import AirflowSkipException +from airflow.models.taskinstance import TaskInstance +from airflow.operators.python import PythonOperator +from airflow.utils.trigger_rule import TriggerRule +from enrich.config import XCOMS, xcom_pull +from enrich.tasks.business_logic.enrich_dbt_model_suggest import ( + enrich_dbt_model_suggest, +) + +logger = logging.getLogger(__name__) + + +def task_info_get(task_id, df_xcom_key): + return f""" + ============================================================ + Description de la tâche "{task_id}" + ============================================================ + 💡 quoi: on génère les suggestions à partir de la df + {df_xcom_key} + + 🎯 pourquoi: le but de ce DAG + + 🏗️ comment: pour chaque acteur fermé, on génère 1 suggestion + """ + + +def enrich_dbt_model_suggest_wrapper( + task_id: str, + cohort: str, + dbt_model_name: str, + ti: TaskInstance, + dag: DAG, +) -> None: + logger.info(task_info_get(task_id, dbt_model_name)) + + # Config + config = xcom_pull(ti, XCOMS.CONFIG) + logger.info(f"📖 Configuration:\n{config.model_dump_json(indent=2)}") + + # Processing + suggestions_written = enrich_dbt_model_suggest( + dbt_model_name=dbt_model_name, + filters=config.filters, + cohort=cohort, + identifiant_action=dag.dag_id, + dry_run=config.dry_run, + ) + if not suggestions_written: + raise AirflowSkipException("Pas de suggestions écrites") + + +def enrich_dbt_model_suggest_task( + dag: DAG, task_id: str, cohort: str, dbt_model_name: str +) -> PythonOperator: + return PythonOperator( + task_id=task_id, + python_callable=enrich_dbt_model_suggest_wrapper, + op_args=[task_id, cohort, 
dbt_model_name],
+        dag=dag,
+        doc_md=f"**Suggestions** pour la cohorte: **{cohort}**",
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
diff --git a/dags/enrich/tasks/airflow_logic/enrich_dbt_models_refresh_task.py b/dags/enrich/tasks/airflow_logic/enrich_dbt_models_refresh_task.py
new file mode 100644
index 000000000..baccb7d2e
--- /dev/null
+++ b/dags/enrich/tasks/airflow_logic/enrich_dbt_models_refresh_task.py
@@ -0,0 +1,56 @@
+"""Refresh DBT models for enrichment DAGs"""
+
+import logging
+
+from airflow import DAG
+from airflow.exceptions import AirflowSkipException
+from airflow.operators.bash import BashOperator
+from airflow.operators.python import PythonOperator
+from enrich.config import TASKS, XCOMS, xcom_pull
+from enrich.config.models import EnrichBaseConfig
+
+logger = logging.getLogger(__name__)
+
+
+def task_info_get(dbt_model_refresh_command: str):
+    return f"""
+    ============================================================
+    Description de la tâche "{TASKS.ENRICH_DBT_MODELS_REFRESH}"
+    ============================================================
+    💡 quoi: rafraîchissement des modèles DBT
+
+    🎯 pourquoi: avoir des suggestions fraîches
+
+    🏗️ comment: via commande: {dbt_model_refresh_command}
+    """
+
+
+def enrich_dbt_models_refresh_wrapper(ti) -> None:
+
+    # Config
+    config: EnrichBaseConfig = xcom_pull(ti, XCOMS.CONFIG)
+
+    logger.info(task_info_get(config.dbt_models_refresh_command))
+    logger.info(f"📖 Configuration:\n{config.model_dump_json(indent=2)}")
+
+    if not config.dbt_models_refresh:
+        raise AirflowSkipException("🚫 Rafraîchissement des modèles DBT désactivé")
+
+    logger.info(
+        f"🔄 Rafraîchissement des modèles DBT: {config.dbt_models_refresh_command}"
+    )
+    bash = BashOperator(
+        task_id=TASKS.ENRICH_DBT_MODELS_REFRESH + "_bash",
+        bash_command=config.dbt_models_refresh_command,
+    )
+    bash.execute(context=ti.get_template_context())
+
+
+def enrich_dbt_models_refresh_task(
+    dag: DAG,
+) -> PythonOperator:
+    return PythonOperator(
+        task_id=TASKS.ENRICH_DBT_MODELS_REFRESH,
+        python_callable=enrich_dbt_models_refresh_wrapper,
+        dag=dag,
+    )
diff --git a/dags/enrich/tasks/business_logic/enrich_dbt_model_read.py b/dags/enrich/tasks/business_logic/enrich_dbt_model_read.py
new file mode 100644
index 000000000..090e45625
--- /dev/null
+++ b/dags/enrich/tasks/business_logic/enrich_dbt_model_read.py
@@ -0,0 +1,58 @@
+"""Read data from DBT models"""
+
+import logging
+
+import numpy as np
+import pandas as pd
+from utils import logging_utils as log
+from utils.django import django_setup_full
+
+django_setup_full()
+
+logger = logging.getLogger(__name__)
+
+
+def enrich_dbt_model_read(
+    dbt_model_name: str, filters: list[dict] | None = None
+) -> pd.DataFrame:
+    """Reads necessary QFDMO acteurs and AE entries from DB"""
+    from django.db import connection
+
+    # Guard against the mutable-default-argument pitfall
+    filters = filters or []
+
+    # Execute SQL query and get data
+    with connection.cursor() as cursor:
+        cursor.execute(f"SELECT * FROM {dbt_model_name}")
+        columns = [col[0] for col in cursor.description]
+        data = cursor.fetchall()
+
+    # Create DataFrame and preview
+    df = pd.DataFrame(data, columns=columns, dtype="object").replace({np.nan: None})
+    log.preview_df_as_markdown(f"Données de {dbt_model_name} SANS filtre", df)
+
+    # Filtering if needed
+    filter_applied = False
+    if not df.empty:
+        for filter in filters:
+
+            # Assignment & info
+            filter_applied = True
+            field = filter["field"]
+            operator = filter["operator"]
+            value = filter["value"]
+            logger.info(f"\n🔽 Filtre sur {field=} {operator=} {value=}")
+            logger.info(f"Avant filtre : {df.shape[0]} lignes")
+
+            # 
Filtering + if filter["operator"] == "equals": + logger.info(f"Filtre sur {field} EQUALS {value}") + df = df[df[field] == value].copy() + elif filter["operator"] == "contains": + df = df[df[field].str.contains(value, regex=True, case=False)].copy() + else: + raise NotImplementedError(f"{filter['operator']=} non implémenté") + + logger.info(f"Après filtre : {df.shape[0]} lignes") + + if filter_applied: + log.preview_df_as_markdown(f"Données de {dbt_model_name} APRES filtre(s)", df) + + return df diff --git a/dags/enrich/tasks/business_logic/enrich_dbt_model_row_to_suggest_data.py b/dags/enrich/tasks/business_logic/enrich_dbt_model_row_to_suggest_data.py new file mode 100644 index 000000000..e22f6bd41 --- /dev/null +++ b/dags/enrich/tasks/business_logic/enrich_dbt_model_row_to_suggest_data.py @@ -0,0 +1,22 @@ +from enrich.config import SUGGEST_PREFIX + + +def dbt_model_row_to_suggest_data(row: dict) -> dict: + """Construct the pydantic model_params.data dict from a + dbt model's row based on fields prefixed with SUGGEST_PREFIX""" + pre = SUGGEST_PREFIX + keys_ok = [k for k in row.keys() if k.startswith(f"{pre}_")] + keys_ok.remove(f"{pre}_cohort") + + # Validation + keys_fail = [ + k + for k in row.keys() + if pre in k and k not in keys_ok and not k.startswith(f"{pre}_") + ] + if keys_fail: + msg = f"Colonnes invalides avec {pre} mais sans {pre}_: {keys_fail}" + raise KeyError(msg) + + # Construct the data dict + return {k.replace(pre + "_", ""): row[k] for k in keys_ok} diff --git a/dags/enrich/tasks/business_logic/enrich_dbt_model_suggest.py b/dags/enrich/tasks/business_logic/enrich_dbt_model_suggest.py new file mode 100644 index 000000000..d17ece76d --- /dev/null +++ b/dags/enrich/tasks/business_logic/enrich_dbt_model_suggest.py @@ -0,0 +1,28 @@ +import logging + +from enrich.tasks.business_logic.enrich_dbt_model_read import enrich_dbt_model_read +from enrich.tasks.business_logic.enrich_dbt_model_to_suggestions import ( + enrich_dbt_model_to_suggestions, +) + +logger = logging.getLogger(__name__) + + +def enrich_dbt_model_suggest( + dbt_model_name: str, + filters: list[dict], + cohort: str, + identifiant_action: str, + dry_run: bool = True, +) -> bool: + """Reads a DBT model and generates suggestions for it""" + df = enrich_dbt_model_read(dbt_model_name, filters) + + if df.empty: + logger.info(f"0 donnée pour {dbt_model_name=} avec filtres {filters}") + return False + + suggestions_written = enrich_dbt_model_to_suggestions( + df, cohort, identifiant_action, dry_run + ) + return suggestions_written diff --git a/dags/enrich/tasks/business_logic/enrich_dbt_model_to_suggestions.py b/dags/enrich/tasks/business_logic/enrich_dbt_model_to_suggestions.py new file mode 100644 index 000000000..3a93a1bc9 --- /dev/null +++ b/dags/enrich/tasks/business_logic/enrich_dbt_model_to_suggestions.py @@ -0,0 +1,242 @@ +import logging +from datetime import datetime, timezone + +import pandas as pd +from cluster.tasks.business_logic.cluster_acteurs_parents_choose_new import ( + parent_id_generate, +) +from enrich.config import COHORTS, COLS +from enrich.tasks.business_logic.enrich_dbt_model_row_to_suggest_data import ( + dbt_model_row_to_suggest_data, +) +from utils import logging_utils as log + +logger = logging.getLogger(__name__) + + +def changes_prepare( + model, + model_params: dict, + order: int, + reason: str, + entity_type: str, +) -> dict: + """Generic utility to prepare, validate and + serialize 1 suggestion change for ANY suggestion types""" + from data.models.change import SuggestionChange + + 
model(**model_params).validate()
+    return SuggestionChange(
+        order=order,
+        reason=reason,
+        entity_type=entity_type,
+        model_name=model.name(),
+        model_params=model_params,
+    ).model_dump()
+
+
+def changes_prepare_closed_not_replaced(
+    row: dict,
+) -> tuple[list[dict], dict]:
+    """Prepare suggestion changes for the closed-not-replaced cohort"""
+    from data.models.changes import ChangeActeurUpdateData
+    from qfdmo.models import ActeurStatus
+
+    changes = []
+    model_params = {
+        "id": row[COLS.ACTEUR_ID],
+        "data": {
+            "identifiant_unique": row[COLS.ACTEUR_ID],
+            "statut": ActeurStatus.INACTIF,
+            "siret": row[COLS.ACTEUR_SIRET],
+            "siren": row[COLS.ACTEUR_SIRET][:9],
+            "siret_is_closed": True,
+            "acteur_type": row[COLS.ACTEUR_TYPE_ID],
+            "source": row[COLS.ACTEUR_SOURCE_ID],
+        },
+    }
+    changes.append(
+        changes_prepare(
+            model=ChangeActeurUpdateData,
+            model_params=model_params,
+            order=1,
+            reason="SIRET & SIREN fermés, 0 remplacement trouvé",
+            entity_type="acteur_displayed",
+        )
+    )
+    contexte = {}  # changes are self-explanatory
+    return changes, contexte
+
+
+def changes_prepare_closed_replaced(
+    row: dict,
+) -> tuple[list[dict], dict]:
+    """Prepare suggestion changes for the closed-replaced cohorts"""
+    from data.models.changes import (
+        ChangeActeurCreateAsChild,
+        ChangeActeurCreateAsParent,
+        ChangeActeurUpdateData,
+    )
+    from qfdmo.models import ActeurStatus
+
+    changes = []
+    today = datetime.now(timezone.utc).strftime("%Y-%m-%d")
+    # Parent
+    parent_id = parent_id_generate([str(row[COLS.SUGGEST_SIRET])])
+    parent_data = dbt_model_row_to_suggest_data(row)
+    parent_data["identifiant_unique"] = parent_id
+    parent_data["source"] = None
+    parent_data["statut"] = ActeurStatus.ACTIF
+    params_parent = {
+        "id": parent_id,
+        "data": parent_data,
+    }
+    changes.append(
+        changes_prepare(
+            model=ChangeActeurCreateAsParent,
+            model_params=params_parent,
+            order=1,
+            reason="besoin d'un parent pour rattacher l'acteur fermé",
+            entity_type="acteur_displayed",
+        )
+    )
+
+    # New child to hold the reference data as standalone
+    # as parents are surrogates (e.g. 
they can be deleted + # during clustering) + now = datetime.now(timezone.utc).strftime("%Y%m%d%H%M%S") + child_new_id = f"{row[COLS.ACTEUR_ID]}_{row[COLS.ACTEUR_SIRET]}_{now}" + params_child_new = params_parent.copy() + params_child_new["id"] = child_new_id + params_child_new["data"]["identifiant_unique"] = child_new_id + params_child_new["data"]["source"] = row[COLS.ACTEUR_SOURCE_ID] + params_child_new["data"]["parent"] = parent_id + params_child_new["data"]["parent_reason"] = ( + f"Nouvel enfant pour conserver les données suite à: " + f"SIRET {row[COLS.ACTEUR_SIRET]} " + f"détecté le {today} comme fermé dans AE, " + f"remplacé par SIRET {row[COLS.SUGGEST_SIRET]}" + ) + changes.append( + changes_prepare( + model=ChangeActeurCreateAsChild, + model_params=params_child_new, + order=2, + reason="besoin nouvel enfant pour conserver les données", + entity_type="acteur_displayed", + ) + ) + + # Existing Child + params_child_old = params_child_new.copy() + params_child_old["id"] = row[COLS.ACTEUR_ID] + params_child_old["data"]["identifiant_unique"] = row[COLS.ACTEUR_ID] + params_child_old["data"]["parent"] = parent_id + params_child_old["data"]["parent_reason"] = ( + f"SIRET {row[COLS.ACTEUR_SIRET]} " + f"détecté le {today} comme fermé dans AE, " + f"remplacé par SIRET {row[COLS.SUGGEST_SIRET]}" + ) + params_child_old["data"]["siret_is_closed"] = True + params_child_old["data"]["statut"] = ActeurStatus.INACTIF + changes.append( + changes_prepare( + model=ChangeActeurUpdateData, + model_params=params_child_old, + order=3, + reason="rattacher enfant fermé à un parent", + entity_type="acteur_displayed", + ) + ) + contexte = {} # changes are self-explanatory + return changes, contexte + + +# Mapping cohorts with their respective changes preparation function +COHORTS_TO_PREPARE_CHANGES = { + COHORTS.CLOSED_NOT_REPLACED: changes_prepare_closed_not_replaced, + COHORTS.CLOSED_REP_OTHER_SIREN: changes_prepare_closed_replaced, + COHORTS.CLOSED_REP_SAME_SIREN: changes_prepare_closed_replaced, +} + + +def enrich_dbt_model_to_suggestions( + df: pd.DataFrame, + cohort: str, + identifiant_action: str, + dry_run: bool = True, +) -> bool: + from data.models.suggestion import ( + Suggestion, + SuggestionAction, + SuggestionCohorte, + SuggestionStatut, + ) + + COHORTS_TO_SUGGESTION_ACTION = { + COHORTS.CLOSED_NOT_REPLACED: SuggestionAction.ENRICH_ACTEURS_CLOSED, + COHORTS.CLOSED_REP_OTHER_SIREN: SuggestionAction.ENRICH_ACTEURS_CLOSED, + COHORTS.CLOSED_REP_SAME_SIREN: SuggestionAction.ENRICH_ACTEURS_CLOSED, + } + + # Validation + if df is None or df.empty: + raise ValueError("df vide: on devrait pas être ici") + + cohorts = list(df[COLS.SUGGEST_COHORT].unique()) + if len(cohorts) != 1 or cohorts[0] != cohort: + msg = f"Problème cohorte: obtenu {cohorts=} vs. 
attendu {cohort=}" + raise ValueError(msg) + + # Creating suggestion + suggestions = [] + for _, row in df.iterrows(): + row = dict(row) + + try: + changes, contexte = COHORTS_TO_PREPARE_CHANGES[cohort](row) + suggestions.append( + { + "contexte": contexte, + "suggestion": {"title": cohort, "changes": changes}, + } + ) + + # We tolerate some errors + except Exception as e: + log.preview("🔴 Suggestion problématique", row) + logger.error(f"Erreur de préparation des changements: {e}") + continue + + # we need some working suggestions, can't have it all fail + if not suggestions: + raise ValueError("Aucune suggestion à écrire, pas normal") + + # ----------------------------------------- + # DRY RUN: STOP HERE + # ----------------------------------------- + if dry_run: + logger.info("✋ Dry run: suggestions pas écrites en base") + suggestions_written = False + return suggestions_written + + # ----------------------------------------- + # SUGGESTION: WRITE TO DB + # ----------------------------------------- + db_cohort = SuggestionCohorte( + identifiant_action=identifiant_action, + identifiant_execution=f"{cohort}", + statut=SuggestionStatut.AVALIDER, + type_action=COHORTS_TO_SUGGESTION_ACTION[cohort], + metadata={"🔢 Nombre de suggestions": len(suggestions)}, + ) + db_cohort.save() + for suggestion in suggestions: + Suggestion( + suggestion_cohorte=db_cohort, + statut=SuggestionStatut.AVALIDER, + contexte=suggestion["contexte"], + suggestion=suggestion["suggestion"], + ).save() + suggestions_written = True + return suggestions_written diff --git a/dags/shared/config/__init__.py b/dags/shared/config/__init__.py index f8f8adb54..712d8484d 100644 --- a/dags/shared/config/__init__.py +++ b/dags/shared/config/__init__.py @@ -1,3 +1,4 @@ from .catchups import CATCHUPS # noqa: F401 +from .models import config_to_airflow_params # noqa: F401 from .schedules import SCHEDULES # noqa: F401 from .start_dates import START_DATES # noqa: F401 diff --git a/dags/shared/config/models.py b/dags/shared/config/models.py new file mode 100644 index 000000000..89740aa14 --- /dev/null +++ b/dags/shared/config/models.py @@ -0,0 +1,36 @@ +import typing + +from airflow.models.param import Param +from pydantic import BaseModel + +PYDANTIC_TYPE_TO_AIRFLOW_TYPE = { + bool: "boolean", + str: "string", + typing.Optional[str]: ["null", "string"], +} + + +def config_to_airflow_params(model_instance: BaseModel) -> dict[str, Param]: + """Generate Airflow params from a pydantic config model instance: + + TODO: to implement recurring/complex types, we can use a mapping + with field_name as entry, and keep the generic fallback below + + if field_name == "complex_field": + params = PARAMS[field_name] + elif: + ... 
+ else: + fallback to current logic + """ + params = {} + model_cls = model_instance.__class__ + for field_name, field_info in model_cls.model_fields.items(): + field_value = getattr(model_instance, field_name) # Get value from instance + + params[field_name] = Param( + field_value, + type=PYDANTIC_TYPE_TO_AIRFLOW_TYPE[field_info.annotation], + description_md=field_info.description, + ) + return params diff --git a/dags_unit_tests/enrich/config/test_enrich_acteurs_closed_config.py b/dags_unit_tests/enrich/config/test_enrich_acteurs_closed_config.py new file mode 100644 index 000000000..cb51d00b2 --- /dev/null +++ b/dags_unit_tests/enrich/config/test_enrich_acteurs_closed_config.py @@ -0,0 +1,26 @@ +import pytest +from enrich.config.models import EnrichActeursClosedConfig + + +class TestEnrichClosedConfig: + + @pytest.fixture + def config(self): + return EnrichActeursClosedConfig( + dry_run=True, + filter_contains__acteur_commentaires="my comment", + filter_contains__acteur_nom=None, + filter_contains__etab_naf="test NAF", + filter_equals__acteur_statut="ACTIF", + ) + + def test_filters_get(self, config): + assert config.filters == [ + { + "field": "acteur_commentaires", + "operator": "contains", + "value": "my comment", + }, + {"field": "etab_naf", "operator": "contains", "value": "test NAF"}, + {"field": "acteur_statut", "operator": "equals", "value": "ACTIF"}, + ] diff --git a/dags_unit_tests/enrich/tasks/test_enrich_dbt_model_row_to_data.py b/dags_unit_tests/enrich/tasks/test_enrich_dbt_model_row_to_data.py new file mode 100644 index 000000000..4c96476e2 --- /dev/null +++ b/dags_unit_tests/enrich/tasks/test_enrich_dbt_model_row_to_data.py @@ -0,0 +1,31 @@ +import pytest +from enrich.tasks.business_logic.enrich_dbt_model_row_to_suggest_data import ( + dbt_model_row_to_suggest_data, +) + + +class TestEnrichDbtModelRowToData: + + def test_row_to_suggest_data(self): + row = { + "suggest_cohort": "cohort", + "suggest_siret": "12345678901234", + "foo": "bar", + } + data = dbt_model_row_to_suggest_data(row) + assert data == {"siret": "12345678901234"} + + @pytest.mark.parametrize( + "key", + ["suggest", "suggestion_siret", "siret_suggest"], + ) + def test_raise_if_inconsistent_suggest_keys(self, key): + row = {"suggest_cohort": "cohort"} # must always be present + row[key] = "12345678901234" + with pytest.raises(KeyError, match="Colonnes invalides"): + dbt_model_row_to_suggest_data(row) + + def test_raise_if_missing_cohort(self): + row = {"suggest_siret": "12345678901234"} + with pytest.raises(ValueError, match="not in list"): + dbt_model_row_to_suggest_data(row) diff --git a/dags_unit_tests/enrich/tasks/test_enrich_suggestions_closed.py b/dags_unit_tests/enrich/tasks/test_enrich_suggestions_closed.py new file mode 100644 index 000000000..9fdb456c3 --- /dev/null +++ b/dags_unit_tests/enrich/tasks/test_enrich_suggestions_closed.py @@ -0,0 +1,266 @@ +from datetime import datetime, timezone + +import pandas as pd +import pytest +from cluster.tasks.business_logic.cluster_acteurs_parents_choose_new import ( + parent_id_generate, +) +from django.contrib.gis.geos import Point +from enrich.config import COHORTS, COLS +from enrich.tasks.business_logic.enrich_dbt_model_to_suggestions import ( + enrich_dbt_model_to_suggestions, +) + +TODAY = datetime.now(timezone.utc).strftime("%Y-%m-%d") + + +@pytest.mark.django_db +class TestEnrichActeursClosedSuggestions: + + @pytest.fixture + def source(self): + from qfdmo.models import Source + + return Source.objects.create(code="s1") + + @pytest.fixture + def 
atype(self): + from qfdmo.models import ActeurType + + return ActeurType.objects.create(code="at1") + + @pytest.fixture + def df_not_replaced(self, atype, source): + return pd.DataFrame( + { + # Acteurs data + COLS.ACTEUR_ID: ["a01", "a02"], + COLS.ACTEUR_SIRET: ["00000000000001", "00000000000002"], + COLS.ACTEUR_NOM: ["AVANT a01", "AVANT a02"], + COLS.ACTEUR_TYPE_ID: [atype.pk, atype.pk], + COLS.ACTEUR_SOURCE_ID: [source.pk, source.pk], + COLS.SUGGEST_COHORT: [COHORTS.CLOSED_NOT_REPLACED] * 2, + } + ) + + @pytest.fixture + def df_replaced(self, atype, source): + return pd.DataFrame( + { + # Acteurs data + COLS.ACTEUR_ID: ["a1", "a2", "a3"], + COLS.ACTEUR_SIRET: [ + "11111111100001", + "22222222200001", + "44444444400001", + ], + COLS.ACTEUR_TYPE_ID: [atype.pk, atype.pk, atype.pk], + COLS.ACTEUR_SOURCE_ID: [source.pk, source.pk, source.pk], + COLS.ACTEUR_LONGITUDE: [1, 1, 1], + COLS.ACTEUR_LATITUDE: [2, 2, 2], + # Replacement data + COLS.SUGGEST_SIRET: [ + "11111111100002", + "33333333300001", + "55555555500001", + ], + COLS.SUGGEST_SIREN: ["111111111", "333333333", "555555555"], + COLS.SUGGEST_NOM: ["APRES a1", "APRES a2", "APRES a3"], + COLS.SUGGEST_COHORT: [ + COHORTS.CLOSED_REP_SAME_SIREN, + COHORTS.CLOSED_REP_OTHER_SIREN, + COHORTS.CLOSED_REP_OTHER_SIREN, + ], + COLS.SUGGEST_ADRESSE: ["Adresse1", "Adresse2", "Adresse3"], + COLS.SUGGEST_CODE_POSTAL: ["12345", "67890", "12345"], + COLS.SUGGEST_VILLE: ["Ville1", "Ville2", "Ville3"], + COLS.SUGGEST_NAF: ["naf1", "naf2", "naf3"], + COLS.SUGGEST_LONGITUDE: [1, 2, 3], + COLS.SUGGEST_LATITUDE: [11, 22, 33], + COLS.SUGGEST_ACTEUR_TYPE_ID: [atype.pk, atype.pk, atype.pk], + } + ) + + def test_df_replaced(self, df_replaced): + assert sorted(df_replaced[COLS.SUGGEST_COHORT].unique()) == sorted( + [ + COHORTS.CLOSED_REP_SAME_SIREN, + COHORTS.CLOSED_REP_OTHER_SIREN, + ] + ) + + @pytest.fixture + def df_replaced_meme_siret(self, df_replaced): + return df_replaced[ + df_replaced[COLS.SUGGEST_COHORT] == COHORTS.CLOSED_REP_SAME_SIREN + ] + + @pytest.fixture + def df_replaced_autre_siret(self, df_replaced): + return df_replaced[ + df_replaced[COLS.SUGGEST_COHORT] == COHORTS.CLOSED_REP_OTHER_SIREN + ] + + @pytest.fixture + def acteurs(self, df_not_replaced, df_replaced, atype, source): + # Creating acteurs as presence required to apply changes + from qfdmo.models import Acteur + + df_concat = pd.concat([df_not_replaced, df_replaced]) + acteur_ids = df_concat[COLS.ACTEUR_ID].tolist() + for acteur_id in acteur_ids: + Acteur.objects.create( + identifiant_unique=acteur_id, + nom=f"AVANT {acteur_id}", + acteur_type=atype, + source=source, + location=Point(x=0, y=0), + ) + + def test_cohorte_not_replaced(self, acteurs, df_not_replaced): + from data.models.suggestion import Suggestion, SuggestionCohorte + from qfdmo.models import ActeurStatus, RevisionActeur + + # Write suggestions to DB + enrich_dbt_model_to_suggestions( + df=df_not_replaced, + cohort=COHORTS.CLOSED_NOT_REPLACED, + identifiant_action="test_not_replaced", + dry_run=False, + ) + + # Check suggestions have been written to DB + cohort = SuggestionCohorte.objects.get(identifiant_action="test_not_replaced") + suggestions = Suggestion.objects.filter(suggestion_cohorte=cohort) + assert len(suggestions) == 2 + + # Apply suggestions + for suggestion in suggestions: + suggestion.apply() + + # Verify changes + # 2 revisions should be created but not parent + a01 = RevisionActeur.objects.get(pk="a01") + assert a01.statut == ActeurStatus.INACTIF + assert a01.parent is None + assert a01.parent_reason == "" # 
consequence of empty strings in DB + assert a01.siret_is_closed is True + + a02 = RevisionActeur.objects.get(pk="a02") + assert a02.statut == ActeurStatus.INACTIF + assert a02.parent is None + assert a02.parent_reason == "" + assert a02.siret_is_closed is True + + def test_cohorte_meme_siren(self, acteurs, atype, source, df_replaced_meme_siret): + from data.models.suggestion import Suggestion, SuggestionCohorte + from qfdmo.models import ActeurStatus, RevisionActeur + + # Write suggestions to DB + enrich_dbt_model_to_suggestions( + df=df_replaced_meme_siret, + cohort=COHORTS.CLOSED_REP_SAME_SIREN, + identifiant_action="test_meme_siren", + dry_run=False, + ) + + # Check suggestions have been written to DB + cohort = SuggestionCohorte.objects.get(identifiant_action="test_meme_siren") + suggestions = Suggestion.objects.filter(suggestion_cohorte=cohort) + assert len(suggestions) == 1 + + # Apply suggestions + for suggestion in suggestions: + suggestion.apply() + + # Verify changes + # 1 parent should be created in revision with replacement data + # 1 child should be created in revision with status=INACT and parent_id pointing + parent_id = parent_id_generate(["11111111100002"]) + parent = RevisionActeur.objects.get(pk=parent_id) + assert parent.pk == parent_id + assert parent.nom == "APRES a1" + assert parent.adresse == "Adresse1" + assert parent.code_postal == "12345" + assert parent.ville == "Ville1" + assert parent.naf_principal == "naf1" + assert parent.acteur_type == atype + assert parent.source is None + assert parent.location.x == 1 + assert parent.location.y == 11 + + child = RevisionActeur.objects.get(pk="a1") + assert child.statut == ActeurStatus.INACTIF + assert child.parent == parent + assert child.parent_reason == ( + f"SIRET 11111111100001 détecté le {TODAY} comme fermé dans AE, " + f"remplacé par SIRET 11111111100002" + ) + assert child.siret_is_closed is True + assert child.location.x == 1 + assert child.location.y == 11 + + def test_cohorte_autre_siren(self, acteurs, df_replaced_autre_siret): + from data.models.suggestion import Suggestion, SuggestionCohorte + from qfdmo.models import ActeurStatus, RevisionActeur + + # Write suggestions to DB + enrich_dbt_model_to_suggestions( + df=df_replaced_autre_siret, + cohort=COHORTS.CLOSED_REP_OTHER_SIREN, + identifiant_action="test_autre_siren", + dry_run=False, + ) + + # Check suggestions have been written to DB + cohort = SuggestionCohorte.objects.get(identifiant_action="test_autre_siren") + suggestions = Suggestion.objects.filter(suggestion_cohorte=cohort) + assert len(suggestions) == 2 # (1 parent + 1 child) x 2 acteurs fermés + + # Apply suggestions + for suggestion in suggestions: + suggestion.apply() + + # Verify changes + # 1 parent should be created in revision with replacement data + # 1 child should be created in revision with status=INACT and parent_id pointing + parent_id = parent_id_generate(["33333333300001"]) + parent = RevisionActeur.objects.get(pk=parent_id) + assert parent.nom == "APRES a2" + assert parent.adresse == "Adresse2" + assert parent.code_postal == "67890" + assert parent.ville == "Ville2" + assert parent.naf_principal == "naf2" + assert parent.location.x == 2 + assert parent.location.y == 22 + + child = RevisionActeur.objects.get(pk="a2") + assert child.statut == ActeurStatus.INACTIF + assert child.parent == parent + assert child.parent_reason == ( + f"SIRET 22222222200001 détecté le {TODAY} comme fermé dans AE, " + f"remplacé par SIRET 33333333300001" + ) + assert child.siret_is_closed is True + assert 
child.location.x == 2 + assert child.location.y == 22 + + parent_id = parent_id_generate(["55555555500001"]) + parent = RevisionActeur.objects.get(pk=parent_id) + assert parent.nom == "APRES a3" + assert parent.adresse == "Adresse3" + assert parent.code_postal == "12345" + assert parent.ville == "Ville3" + assert parent.naf_principal == "naf3" + assert parent.location.x == 3 + assert parent.location.y == 33 + + child = RevisionActeur.objects.get(pk="a3") + assert child.statut == ActeurStatus.INACTIF + assert child.parent == parent + assert child.parent_reason == ( + f"SIRET 44444444400001 détecté le {TODAY} comme fermé dans AE, " + f"remplacé par SIRET 55555555500001" + ) + assert child.location.x == 3 + assert child.location.y == 33 diff --git a/dags_unit_tests/shared/config/test_shared_config_models.py b/dags_unit_tests/shared/config/test_shared_config_models.py new file mode 100644 index 000000000..194b6517f --- /dev/null +++ b/dags_unit_tests/shared/config/test_shared_config_models.py @@ -0,0 +1,56 @@ +from typing import Optional + +import pytest +from pydantic import BaseModel, Field + +from dags.shared.config.models import config_to_airflow_params + + +class MyModel(BaseModel): + dry_run: bool = Field( + default=True, + description="🚱 Si coché...", + ) + some_string: str = Field( + default="foo", + description="SOME STRING", + ) + opt_string_untouched: Optional[str] = Field( + default=None, + description="OPT STRING UNTOUCHED", + ) + opt_string_changed: Optional[str] = Field( + default=None, + description="OPT STRING CHANGED", + ) + + +class TestConfigModelToAirflowParams: + @pytest.fixture + def model_instance(self): + return MyModel(opt_string_changed="bar") + + @pytest.fixture + def params(self, model_instance): + return config_to_airflow_params(model_instance) + + def test_boolean(self, params): + param = params["dry_run"] + assert param.value is True + assert param.schema["type"] == "boolean" + assert param.schema["description_md"] == "🚱 Si coché..." 
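To make the mapping under test concrete, here is a sketch of what `config_to_airflow_params` produces for the `MyModel` instance above (the expected values mirror the assertions in this class; this is illustrative, not an additional test):

```python
from dags.shared.config.models import config_to_airflow_params

params = config_to_airflow_params(MyModel(opt_string_changed="bar"))
# params["dry_run"]              -> Param(True),  schema type "boolean"
# params["some_string"]          -> Param("foo"), schema type "string"
# params["opt_string_untouched"] -> Param(None),  schema type ["null", "string"]
# params["opt_string_changed"]   -> Param("bar"), schema type ["null", "string"]
```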
+
+    def test_string(self, params):
+        param = params["some_string"]
+        assert param.value == "foo"
+        assert param.schema["type"] == "string"
+
+    def test_opt_string_untouched(self, params):
+        param = params["opt_string_untouched"]
+        assert param.value is None
+        assert param.schema["type"] == ["null", "string"]
+
+    def test_opt_string_changed(self, params):
+        param = params["opt_string_changed"]
+        assert param.value == "bar"
+        assert param.schema["type"] == ["null", "string"]
diff --git a/dags_unit_tests/utils/test_data_serialize_reconstruct.py b/dags_unit_tests/utils/test_data_serialize_reconstruct.py
index df2f49eb9..dc3af503a 100644
--- a/dags_unit_tests/utils/test_data_serialize_reconstruct.py
+++ b/dags_unit_tests/utils/test_data_serialize_reconstruct.py
@@ -34,19 +34,16 @@ def data_init(self) -> dict:
             "location": POINT,
             "cree_le": DATETIME,
         }
-        print("data_init", f"{data=}")
         return data

     @pytest.fixture
     def data_serialized(self, data_init) -> dict:
         data = data_serialize(RevisionActeur, data_init)
-        print("data_serialized", f"{data=}")
         return data

     @pytest.fixture
     def data_reconstructed(self, data_serialized) -> dict:
         data = data_reconstruct(RevisionActeur, data_serialized)
-        print("data_reconstructed", f"{data=}")
         return data

     def test_data_reconstructed(self, data_reconstructed):
@@ -94,3 +91,18 @@ def test_none_cases(self, data_init):
         data = {"location": None}
         data = data_reconstruct(RevisionActeur, data)
         assert data == {}
+
+    def test_working_with_id_fields(self, data_init):
+        # When working with DBT, we have foreign keys being
+        # expressed as {field}_id fields (and not {field} like
+        # in Django models), and we test that data_reconstruct
+        # handles this transparently and forces {field} representation
+        data = data_init.copy()
+        # We switch from the Django representation (source) to the
+        # DBT representation (source_id)
+        data["source_id"] = data["source"].id
+        del data["source"]
+        ser = data_serialize(RevisionActeur, data)
+        rec = data_reconstruct(RevisionActeur, ser)
+        # The reconstruction should be in {field} format
+        assert rec["source"].id == data_init["source"].id
diff --git a/data/migrations/0009_alter_suggestioncohorte_type_action.py b/data/migrations/0009_alter_suggestioncohorte_type_action.py
new file mode 100644
index 000000000..17d58a020
--- /dev/null
+++ b/data/migrations/0009_alter_suggestioncohorte_type_action.py
@@ -0,0 +1,32 @@
+# Generated by Django 5.1.7 on 2025-03-20 09:08
+
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+
+    dependencies = [
+        ("data", "0008_alter_suggestioncohorte_type_action"),
+    ]
+
+    operations = [
+        migrations.AlterField(
+            model_name="suggestioncohorte",
+            name="type_action",
+            field=models.CharField(
+                blank=True,
+                choices=[
+                    ("CRAWL_URLS", "🔗 URLs scannées"),
+                    ("RGPD_ANONYMISATION", "🕵️ Anonymisation RGPD"),
+                    ("CLUSTERING", "regroupement/déduplication des acteurs"),
+                    ("SOURCE_AJOUT", "ingestion de source de données - nouveau acteur"),
+                    (
+                        "SOURCE_MODIFICATION",
+                        "ingestion de source de données - modification d'acteur existant",
+                    ),
+                    ("SOURCE_SUPRESSION", "ingestion de source de données"),
+                ],
+                max_length=50,
+            ),
+        ),
+    ]
diff --git a/data/migrations/0010_alter_suggestioncohorte_type_action.py b/data/migrations/0010_alter_suggestioncohorte_type_action.py
new file mode 100644
index 000000000..ac430a116
--- /dev/null
+++ b/data/migrations/0010_alter_suggestioncohorte_type_action.py
@@ -0,0 +1,33 @@
+# Generated by Django 5.1.6 on 2025-04-07 10:01
+
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration): + + dependencies = [ + ("data", "0009_alter_suggestioncohorte_type_action"), + ] + + operations = [ + migrations.AlterField( + model_name="suggestioncohorte", + name="type_action", + field=models.CharField( + blank=True, + choices=[ + ("CRAWL_URLS", "🔗 URLs scannées"), + ("RGPD_ANONYMISATION", "🕵️ Anonymisation RGPD"), + ("ENRICH_ACTEURS_CLOSED", "🚪 Acteurs fermés"), + ("CLUSTERING", "regroupement/déduplication des acteurs"), + ("SOURCE_AJOUT", "ingestion de source de données - nouveau acteur"), + ( + "SOURCE_MODIFICATION", + "ingestion de source de données - modification d'acteur existant", + ), + ("SOURCE_SUPRESSION", "ingestion de source de données"), + ], + max_length=50, + ), + ), + ] diff --git a/data/migrations/0011_alter_suggestioncohorte_type_action.py b/data/migrations/0011_alter_suggestioncohorte_type_action.py new file mode 100644 index 000000000..dbbdecf36 --- /dev/null +++ b/data/migrations/0011_alter_suggestioncohorte_type_action.py @@ -0,0 +1,32 @@ +# Generated by Django 5.1.6 on 2025-04-23 13:35 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("data", "0010_alter_suggestioncohorte_type_action"), + ] + + operations = [ + migrations.AlterField( + model_name="suggestioncohorte", + name="type_action", + field=models.CharField( + blank=True, + choices=[ + ("CRAWL_URLS", "🔗 URLs scannées"), + ("ENRICH_ACTEURS_CLOSED", "🚪 Acteurs fermés"), + ("CLUSTERING", "regroupement/déduplication des acteurs"), + ("SOURCE_AJOUT", "ingestion de source de données - nouveau acteur"), + ( + "SOURCE_MODIFICATION", + "ingestion de source de données - modification d'acteur existant", + ), + ("SOURCE_SUPRESSION", "ingestion de source de données"), + ], + max_length=50, + ), + ), + ] diff --git a/data/models/changes/__init__.py b/data/models/changes/__init__.py index f3503fdb0..e4fdc890c 100644 --- a/data/models/changes/__init__.py +++ b/data/models/changes/__init__.py @@ -1,4 +1,5 @@ from .acteur_change_nothing_in_base import ChangeActeurNothingBase +from .acteur_create_as_child import ChangeActeurCreateAsChild from .acteur_create_as_parent import ChangeActeurCreateAsParent from .acteur_delete_as_parent import ChangeActeurDeleteAsParent from .acteur_keep_as_parent import ChangeActeurKeepAsParent @@ -9,6 +10,7 @@ CHANGE_MODELS = { ChangeActeurUpdateData.name(): ChangeActeurUpdateData, + ChangeActeurCreateAsChild.name(): ChangeActeurCreateAsChild, ChangeActeurCreateAsParent.name(): ChangeActeurCreateAsParent, ChangeActeurDeleteAsParent.name(): ChangeActeurDeleteAsParent, ChangeActeurUpdateParentId.name(): ChangeActeurUpdateParentId, diff --git a/data/models/changes/acteur_abstract.py b/data/models/changes/acteur_abstract.py index a72e66284..83a6fb71b 100644 --- a/data/models/changes/acteur_abstract.py +++ b/data/models/changes/acteur_abstract.py @@ -1,8 +1,4 @@ -""" -change_model to use as template for acteur changes - - -""" +"""change model to use as template for acteur changes""" from pydantic import BaseModel diff --git a/data/models/changes/acteur_change_nothing_in_base.py b/data/models/changes/acteur_change_nothing_in_base.py index f6c01fe36..816e77597 100644 --- a/data/models/changes/acteur_change_nothing_in_base.py +++ b/data/models/changes/acteur_change_nothing_in_base.py @@ -1,5 +1,4 @@ -""" -change_model to make no change to an acteur +"""change model to make no change to an acteur Reason for having such a model is that we can follow the same pattern to be consistent across the board. 
diff --git a/data/models/changes/acteur_create_as_child.py b/data/models/changes/acteur_create_as_child.py
new file mode 100644
index 000000000..74282eb89
--- /dev/null
+++ b/data/models/changes/acteur_create_as_child.py
@@ -0,0 +1,63 @@
+from pydantic import BaseModel
+
+from data.models.changes.utils import data_reconstruct
+
+
+class ChangeActeurCreateAsChild(BaseModel):
+    id: str
+    data: dict = {}
+
+    @classmethod
+    def name(cls) -> str:
+        return "acteur_create_as_child"
+
+    def validate(self):
+        from qfdmo.models import Acteur, DisplayedActeur, RevisionActeur
+
+        # Parent field must be SET (but we can't check if the parent exists yet
+        # as it could be a new parent to be created)
+        for field in ["parent", "parent_reason"]:
+            if not self.data.get(field):
+                msg = f"Création d'enfant: champ '{field}' à renseigner {self.data}"
+                raise ValueError(msg)
+
+        # Ensure child exists nowhere
+        for model in [Acteur, RevisionActeur, DisplayedActeur]:
+            obj = model.objects.filter(pk=self.id)
+            if obj.exists():
+                msg = (
+                    f"Création d'enfant: '{self.id}' existe déjà dans {model.__name__}"
+                )
+                raise ValueError(msg)
+
+    def apply(self):
+        self.validate()
+        from qfdmo.models import Acteur, RevisionActeur
+
+        # Ensure parent exists in RevisionActeur
+        parent = RevisionActeur.objects.get(pk=self.data["parent"])
+
+        # Reconstruct data from RevisionActeur
+        data = data_reconstruct(RevisionActeur, self.data)
+
+        # Create child in Acteur to hold data
+        data_base = data.copy()
+        del data_base["parent"]
+        del data_base["parent_reason"]
+        # TODO: if we flatten our pydantic models, then we wouldn't
+        # need to strip the id from the data dict here
+        if "identifiant_unique" in data_base:
+            del data_base["identifiant_unique"]
+        Acteur.objects.create(
+            identifiant_unique=self.id,
+            **data_base,
+        )
+
+        # Create child in RevisionActeur to hold reference to parent
+        RevisionActeur.objects.create(
+            identifiant_unique=self.id,
+            parent_reason=data["parent_reason"],
+            parent=parent,
+            statut="ACTIF",
+            source=data["source"],
+            acteur_type=data["acteur_type"],
+        )
diff --git a/data/models/changes/acteur_create_as_parent.py b/data/models/changes/acteur_create_as_parent.py
index f30c00daf..6ba6bff61 100644
--- a/data/models/changes/acteur_create_as_parent.py
+++ b/data/models/changes/acteur_create_as_parent.py
@@ -1,8 +1,4 @@
-"""
-change_model to create a parent acteur
-
-
-"""
+"""change model to create a parent acteur"""

 from data.models.changes.acteur_abstract import ChangeActeurAbstract
 from data.models.changes.utils import data_reconstruct
diff --git a/data/models/changes/acteur_delete_as_parent.py b/data/models/changes/acteur_delete_as_parent.py
index 5f51eaf26..886919054 100644
--- a/data/models/changes/acteur_delete_as_parent.py
+++ b/data/models/changes/acteur_delete_as_parent.py
@@ -1,8 +1,4 @@
-"""
-change_model to delete a parent acteur
-
-
-"""
+"""change model to delete a parent"""

 from data.models.changes.acteur_abstract import ChangeActeurAbstract
 from qfdmo.models import RevisionActeur
diff --git a/data/models/changes/acteur_update_data.py b/data/models/changes/acteur_update_data.py
index f740ebc3a..d629e3603 100644
--- a/data/models/changes/acteur_update_data.py
+++ b/data/models/changes/acteur_update_data.py
@@ -1,6 +1,5 @@
-"""Generic change model which should allow updating anything
-about an acteur, taking care of handling Acteur vs. RevisionActeur
-and data reconstruction."""
+"""Generic change model to update an acteur's data. If your use-case
+is very specific (e.g. RGPD), create a dedicated model for more clarity/reliability."""

 from data.models.changes.acteur_abstract import ChangeActeurAbstract
 from data.models.changes.utils import data_reconstruct
@@ -12,19 +11,25 @@ class ChangeActeurUpdateData(ChangeActeurAbstract):
     def name(cls) -> str:
         return "acteur_update_data"

-    def validate(self):
-        # The parent should already exist
+    def validate(self) -> Acteur | RevisionActeur:
         if not self.data:
             raise ValueError("No data provided")
+        # The acteur should already exist in revision or base
+        # We tolerate absence from revision
         result = RevisionActeur.objects.filter(pk=self.id).first()
         if not result:
+            # But if not in revision, it must be in base
             result = Acteur.objects.get(pk=self.id)
         return result

     def apply(self):
         acteur = self.validate()
+        # If acteur is only in base, we need to create a revision
         if isinstance(acteur, Acteur):
-            acteur = RevisionActeur(identifiant_unique=acteur.identifiant_unique)
+            acteur = RevisionActeur(
+                identifiant_unique=acteur.identifiant_unique,
+                acteur_type=acteur.acteur_type,
+            )
         data = data_reconstruct(RevisionActeur, self.data)
         for key, value in data.items():
             setattr(acteur, key, value)
diff --git a/data/models/changes/acteur_update_parent_id.py b/data/models/changes/acteur_update_parent_id.py
index c0426a8d1..cfdfd4dcc 100644
--- a/data/models/changes/acteur_update_parent_id.py
+++ b/data/models/changes/acteur_update_parent_id.py
@@ -1,8 +1,4 @@
-"""
-change_model to update an acteur's parent
-
-
-"""
+"""change model to update an acteur's parent"""

 from data.models.changes.acteur_abstract import ChangeActeurAbstract
 from qfdmo.models import Acteur, RevisionActeur
@@ -15,17 +11,20 @@ def name(cls) -> str:

     def validate(self):
         # - The acteur MUST exist in base
-        Acteur.objects.get(pk=self.id)
+        return Acteur.objects.get(pk=self.id)
         # - It's OK for acteur to not be in revision
         # - Can't test if parent exists as maybe it's to be created

     def apply(self):
+        base = self.validate()
         # By the time we apply changes to update parent_ids, the
         # corresponding parents must exist
         parent = RevisionActeur.objects.get(pk=self.data["parent_id"])
         rev = RevisionActeur.objects.filter(pk=self.id)
         if not rev.exists():
-            rev = RevisionActeur(identifiant_unique=self.id)
+            rev = RevisionActeur(
+                identifiant_unique=self.id, acteur_type=base.acteur_type
+            )
         else:
             rev = rev.first()
         rev.parent = parent  # type: ignore
diff --git a/data/models/changes/acteur_verify_in_revision.py b/data/models/changes/acteur_verify_in_revision.py
index e5a6f3753..3d6e50cb5 100644
--- a/data/models/changes/acteur_verify_in_revision.py
+++ b/data/models/changes/acteur_verify_in_revision.py
@@ -1,16 +1,4 @@
-"""
-change_model to make no change to an acteur
-
-Reason for having such a model is that we can
-follow the same pattern to be consistent across the board.
-
-For instance in the clustering pipeline, we might decide
-that some acteurs do not need to be changed as they already point
-to the chosen parent, yes we want to reflect all decisions made
-in the cluster summary, this model allows us to do just that
-without havint to create messy conditional code in pipelines
-
-"""
+"""change model to verify an acteur's presence in revision"""
 
 from data.models.changes.acteur_abstract import ChangeActeurAbstract
 from qfdmo.models import RevisionActeur
diff --git a/data/models/changes/sample_model_do_nothing.py b/data/models/changes/sample_model_do_nothing.py
index a81a692da..c7d64f094 100644
--- a/data/models/changes/sample_model_do_nothing.py
+++ b/data/models/changes/sample_model_do_nothing.py
@@ -1,5 +1,4 @@
-"""A sample model which does nothing BUT
-helps us create test cases for the
+"""A sample model which does nothing BUT helps us create test cases for the
 overall SuggestionChange model"""
 
 from pydantic import BaseModel
diff --git a/data/models/changes/utils.py b/data/models/changes/utils.py
index a593407f1..44c329fb3 100644
--- a/data/models/changes/utils.py
+++ b/data/models/changes/utils.py
@@ -36,30 +36,37 @@ def data_reconstruct(model: type[models.Model], data_src: dict) -> dict:
     result = {}
     data = data_src.copy()
 
-    try:
-        if "longitude" in data and "latitude" in data:
-            result["location"] = Point(data["longitude"], data["latitude"])
-            # so we don't evaluate in below loop
-            del data["longitude"]
-            del data["latitude"]
-
-        for key, value in data.items():
-            field = model._meta.get_field(key)
-
-            # We don't try to be fancy with None, it's None
-            if value is None:
-                # Same explanation as in data_serialize
-                if key == "location":
-                    continue
-                else:
-                    result[key] = value
-            elif isinstance(field, models.ForeignKey):
-                # If it's a foreign key, fetch the related entity
-                related_instance = field.related_model.objects.get(pk=value)  # type: ignore
-                result[key] = related_instance
+    if "longitude" in data and "latitude" in data:
+        result["location"] = Point(data.pop("longitude"), data.pop("latitude"))
+
+    for key, value in data.items():
+        field = model._meta.get_field(key)
+
+        # We don't try to be fancy with None, it's None
+        if value is None:
+            # Same explanation as in data_serialize
+            if key == "location":
+                continue
             else:
                 result[key] = value
-    except Exception as e:
-        logger.error(f"Error reconstructing for {model.__name__}, {data=}, {e=}")
-        raise e
+        elif isinstance(field, models.ForeignKey):
+            # Normalizing to {field} from {field}_id so all fields are
+            # represented in their Django flavour
+            if key.endswith("_id"):
+                try:
+                    # removesuffix, not rstrip: rstrip("_id") strips any of the
+                    # characters "_", "i", "d" (e.g. "standard_id" -> "standar")
+                    key_no_id = key.removesuffix("_id")
+                    field = model._meta.get_field(key_no_id)
+                    key = key_no_id
+                except Exception:
+                    pass
+
+            # Retrieving the related instance if it's not already an instance
+            if not isinstance(value, field.related_model):  # type: ignore
+                value = field.related_model.objects.get(pk=value)  # type: ignore
+
+            result[key] = value
+
+        else:
+            result[key] = value
+
     return result
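The reworked `data_reconstruct` above now normalizes `{field}_id` keys to their Django flavour and resolves foreign keys to model instances. A sketch of the expected behaviour (the pk value is hypothetical):

```python
from data.models.changes.utils import data_reconstruct
from qfdmo.models import RevisionActeur

# "acteur_type_id" is normalized to "acteur_type" and, unless the value is
# already an instance, resolved via field.related_model.objects.get(pk=...)
data = data_reconstruct(RevisionActeur, {"nom": "test", "acteur_type_id": 1})
assert "acteur_type" in data and "acteur_type_id" not in data
```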
diff --git a/data/models/suggestion.py b/data/models/suggestion.py
index 5ee94a6b2..383a5eee0 100644
--- a/data/models/suggestion.py
+++ b/data/models/suggestion.py
@@ -54,6 +54,7 @@ class SuggestionCohorteStatut(models.TextChoices):
 
 class SuggestionAction(models.TextChoices):
     CRAWL_URLS = SUGGESTION_CRAWL_URLS, "🔗 URLs scannées"
+    ENRICH_ACTEURS_CLOSED = "ENRICH_ACTEURS_CLOSED", "🚪 Acteurs fermés"
     CLUSTERING = SUGGESTION_CLUSTERING, "regroupement/déduplication des acteurs"
     SOURCE_AJOUT = (
         SUGGESTION_SOURCE_AJOUT,
@@ -187,9 +188,20 @@ def display_contexte_details(self):
 
     def display_suggestion_details(self):
         template_name = "data/_partials/suggestion_details.html"
         template_context = {"suggestion": self.suggestion}
+
+        # Suggestions leveraging the PYDANTIC SuggestionChange model
         if self.suggestion_cohorte.type_action == SuggestionAction.CLUSTERING:
             template_context = self.suggestion
             template_name = "data/_partials/clustering_suggestion_details.html"
+        elif self.suggestion_cohorte.type_action == SuggestionAction.CRAWL_URLS:
+            template_name = "data/_partials/crawl_urls_suggestion_details.html"
+        elif self.suggestion_cohorte.type_action in [
+            SuggestionAction.ENRICH_ACTEURS_CLOSED,
+        ]:
+            template_name = "data/_partials/suggestion_details_changes.html"
+            template_context = self.suggestion
+
+        # TODO: suggestions to migrate to PYDANTIC classes
         elif (
             self.suggestion_cohorte.type_action == SuggestionAction.SOURCE_SUPPRESSION
             and isinstance(self.suggestion, dict)
@@ -222,9 +234,6 @@ def display_suggestion_details(self):
             and isinstance(self.suggestion, dict)
         ):
             template_name = "data/_partials/ajout_suggestion_details.html"
-        elif self.suggestion_cohorte.type_action == SuggestionAction.CRAWL_URLS:
-            template_name = "data/_partials/crawl_urls_suggestion_details.html"
-            template_context = self.suggestion.copy()
 
         return render_to_string(template_name, template_context)
 
@@ -304,17 +313,20 @@ def _update_acteur(self):
         self._remove_acteur_linked_objects(acteur)
         self._create_acteur_linked_objects(acteur)
 
-    # FIXME: this acteur management will be reviewed with PYDANTIC classes which will
-    # be used to handle all specificities of suggestions
     def apply(self):
+
+        # Suggestions leveraging the PYDANTIC SuggestionChange model
         if self.suggestion_cohorte.type_action in [
             SuggestionAction.CLUSTERING,
             SuggestionAction.CRAWL_URLS,
+            SuggestionAction.ENRICH_ACTEURS_CLOSED,
         ]:
             changes = self.suggestion["changes"]
             changes.sort(key=lambda x: x["order"])
             for change in changes:
                 SuggestionChange(**change).apply()
+
+        # FIXME: this acteur management will be reviewed with PYDANTIC classes
         elif self.suggestion_cohorte.type_action == SuggestionAction.SOURCE_AJOUT:
             self._create_acteur()
         elif (
diff --git a/dbt/README.md b/dbt/README.md
index 13faf5657..c56288ee9 100644
--- a/dbt/README.md
+++ b/dbt/README.md
@@ -30,6 +30,24 @@ Lancer les tests
 dbt run --select qfdmo.exhaustive_acteurs
 ```
 
+## Sampling
+
+ - 💡 **quoi**: utiliser une sous-partie de la donnée
+ - 🎯 **pourquoi**: itérer plus rapidement
+ - 🤔 **comment**:
+    - **Variable d'environnement** `DBT_SAMPLING` à mettre à `true`
+    - **Liberté par modèle**: d'implémenter du sampling ou pas, ex: `base_ae_etablissement.sql`
+    ```sql
+    {% if env_var('DBT_SAMPLING', 'false') == 'true' %}
+    ORDER BY siret DESC
+    LIMIT 1000000
+    {% endif %}
+    ```
+    - **Appliquer le sampling**: en préfixant la commande dbt
+    ```bash
+    export DBT_SAMPLING='true' && dbt ...
+    ```
+
 ### Resources:
 - Learn more about dbt [in the docs](https://docs.getdbt.com/docs/introduction)
 - Check out [Discourse](https://discourse.getdbt.com/) for commonly asked questions and answers
diff --git a/dbt/dbt_project.yml b/dbt/dbt_project.yml
index 0aeabebd8..021af4962 100644
--- a/dbt/dbt_project.yml
+++ b/dbt/dbt_project.yml
@@ -25,7 +25,9 @@ on-run-start:
   - "{{ create_udf_uuid_to_int() }}"
   - "{{ create_udf_safe_divmod() }}"
   - "{{ create_udf_columns_concat_unique_non_empty() }}"
+  - "{{ create_udf_columns_words_in_common_count() }}"
   - "{{ create_udf_normalize_string_alpha_for_match() }}"
+  - "{{ create_udf_ae_string_cleanup() }}"
 
 clean-targets:
   - "target"
diff --git a/dbt/macros/constants/value_unavailable.sql b/dbt/macros/constants/value_unavailable.sql
new file mode 100644
index 000000000..5aaba79a0
--- /dev/null
+++ b/dbt/macros/constants/value_unavailable.sql
@@ -0,0 +1,5 @@
+/* Constant used to flag a value as unavailable
+so we can detect, test and exclude it more easily,
+vs. values such as "" or NULL which might raise
+doubts about whether the model works */
+{% macro value_unavailable() %}'🔴 VALEUR INDISPONIBLE'{% endmacro %}
\ No newline at end of file
diff --git a/dbt/macros/udf/udf_ae_string_cleanup.sql b/dbt/macros/udf/udf_ae_string_cleanup.sql
new file mode 100644
index 000000000..ddd65e003
--- /dev/null
+++ b/dbt/macros/udf/udf_ae_string_cleanup.sql
@@ -0,0 +1,20 @@
+{% macro create_udf_ae_string_cleanup() %}
+/*
+    Converts string values from Annuaire Entreprises
+    to one consistent format: values we consider empty,
+    such as '' or '[ND]' (= Non disponible), are
+    converted to NULL for easier processing.
+*/
+DROP FUNCTION IF EXISTS {{ target.schema }}.udf_ae_string_cleanup CASCADE;
+CREATE FUNCTION {{ target.schema }}.udf_ae_string_cleanup(val TEXT)
+RETURNS TEXT AS $$
+BEGIN
+    IF TRIM(val) = '' OR TRIM(val) = '[ND]' THEN
+        RETURN NULL;
+    ELSE
+        RETURN TRIM(val);
+    END IF;
+END;
+$$ LANGUAGE plpgsql STRICT;
+{% endmacro %}
\ No newline at end of file
diff --git a/dbt/macros/udf/udf_columns_concat_unique_non_empty.sql b/dbt/macros/udf/udf_columns_concat_unique_non_empty.sql
index 76b32544b..a13f7acee 100644
--- a/dbt/macros/udf/udf_columns_concat_unique_non_empty.sql
+++ b/dbt/macros/udf/udf_columns_concat_unique_non_empty.sql
@@ -1,10 +1,8 @@
 {% macro create_udf_columns_concat_unique_non_empty() %}
 /*
-    Function to concatenate strings from various
-    columns while only retaining non-empty values
+    Concatenate strings from various columns while only retaining non-empty values
 */
-
-DROP FUNCTION IF EXISTS {{ target.schema }}.udf_columns_concat_unique_non_empty(VARIADIC input_columns TEXT[]);
+DROP FUNCTION IF EXISTS {{ target.schema }}.udf_columns_concat_unique_non_empty CASCADE;
 CREATE FUNCTION {{ target.schema }}.udf_columns_concat_unique_non_empty(VARIADIC input_columns TEXT[])
 RETURNS TEXT AS $$
 DECLARE
diff --git a/dbt/macros/udf/udf_columns_words_in_common_count.sql b/dbt/macros/udf/udf_columns_words_in_common_count.sql
new file mode 100644
index 000000000..07534a8ef
--- /dev/null
+++ b/dbt/macros/udf/udf_columns_words_in_common_count.sql
@@ -0,0 +1,24 @@
+{% macro create_udf_columns_words_in_common_count() %}
+/*
+    Count number of words in common between 2 columns
+*/
+DROP FUNCTION IF EXISTS {{ target.schema }}.udf_columns_words_in_common_count CASCADE;
+CREATE FUNCTION {{ target.schema }}.udf_columns_words_in_common_count(col1 text, col2 text)
+RETURNS integer AS $$
+DECLARE
+    word text;
+    count integer := 0;
+BEGIN
+    FOR word IN
+        SELECT unnest(string_to_array(col1, ' '))
+    LOOP
+        -- TODO: accuracy could be improved with REGEXP boundaries to count whole words
+        IF position(word IN col2) > 0 THEN
+            count := count + 1;
+        END IF;
+    END LOOP;
+
+    RETURN count;
+END;
+$$ LANGUAGE plpgsql;
+{% endmacro %}
\ No newline at end of file
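A quick way to sanity-check these new UDFs once dbt's `on-run-start` hooks have created them — a sketch assuming a Django shell connected to the same database and schema:

```python
from django.db import connection

with connection.cursor() as cursor:
    # '[ND]' (Non disponible) is converted to NULL
    cursor.execute("SELECT udf_ae_string_cleanup('[ND]')")
    assert cursor.fetchone() == (None,)

    # Only 'boulangerie' is common to both strings
    cursor.execute(
        "SELECT udf_columns_words_in_common_count(%s, %s)",
        ["boulangerie dupont", "boulangerie martin"],
    )
    assert cursor.fetchone() == (1,)
```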
diff --git a/dbt/macros/udf/udf_encode_base57.sql b/dbt/macros/udf/udf_encode_base57.sql
index b251e509d..e739d455b 100644
--- a/dbt/macros/udf/udf_encode_base57.sql
+++ b/dbt/macros/udf/udf_encode_base57.sql
@@ -1,6 +1,6 @@
 {% macro create_udf_encode_base57() %}
 
-DROP FUNCTION IF EXISTS {{ target.schema }}.encode_base57(uuid);
+DROP FUNCTION IF EXISTS {{ target.schema }}.encode_base57 CASCADE;
 CREATE FUNCTION {{ target.schema }}.encode_base57(uuid UUID)
 RETURNS varchar(22) AS $$
 DECLARE
diff --git a/dbt/macros/udf/udf_normalize_string_alpha_for_match.sql b/dbt/macros/udf/udf_normalize_string_alpha_for_match.sql
index 32d144ba8..44a6d10ed 100644
--- a/dbt/macros/udf/udf_normalize_string_alpha_for_match.sql
+++ b/dbt/macros/udf/udf_normalize_string_alpha_for_match.sql
@@ -1,35 +1,36 @@
 {% macro create_udf_normalize_string_alpha_for_match() %}
 /*
-    Function to normalize strings for the purpose of matching.
+    Normalize strings for the purpose of matching.
 
     For instance for RGPD we want to identify acteurs whose names
     are directors' names, but we can't just pull everything for
     processing in Python because as of 2025-03-17 there are 13M
     unite_legale rows with directors' names, hence normalization
     for a pre-filtering in SQL
 
     E.g. to test this function:
-    SELECT udf_normalize_string_alpha_for_match(' Héllo-Wørld! Ça va? 123 ');
+    SELECT udf_normalize_string_for_match(' Héllo-Wørld! Ça va? 123 ');
 */
-
-DROP FUNCTION IF EXISTS {{ target.schema }}.udf_normalize_string_alpha_for_match(input_text TEXT);
-CREATE FUNCTION {{ target.schema }}.udf_normalize_string_alpha_for_match(input_text TEXT) RETURNS TEXT AS $$
+DROP FUNCTION IF EXISTS {{ target.schema }}.udf_normalize_string_for_match CASCADE;
+CREATE FUNCTION {{ target.schema }}.udf_normalize_string_for_match(input_text TEXT, remove_words_smaller_size INTEGER DEFAULT 2) RETURNS TEXT AS $$
 DECLARE
     normalized TEXT;
+    words TEXT[];
 BEGIN
-    -- Step 1: Transliterate using unaccent
+    -- Step 1: Normalize the string
     normalized := unaccent(input_text);
-
-    -- Step 2: Convert to lowercase
     normalized := lower(normalized);
-
-    -- Step 3: Replace non-alpha characters with space
     normalized := regexp_replace(normalized, '[^a-z]', ' ', 'g');
-
-    -- Step 4: Replace multiple spaces with a single space
     normalized := regexp_replace(normalized, '\s+', ' ', 'g');
-
-    -- Step 5: Trim leading and trailing spaces
     normalized := trim(normalized);
 
+    -- Step 2: Split into words, drop short words, sort alphabetically, rejoin
+    -- (ORDER BY inside string_agg so the ordering is guaranteed)
+    words := string_to_array(normalized, ' ');
+    SELECT string_agg(word, ' ' ORDER BY word) INTO normalized
+    FROM (
+        SELECT unnest(words) AS word
+    ) AS words_sorted
+    WHERE length(word) >= remove_words_smaller_size;
+
     RETURN normalized;
 END;
 $$ LANGUAGE plpgsql;
diff --git a/dbt/macros/udf/udf_safe_divmod.sql b/dbt/macros/udf/udf_safe_divmod.sql
index 14dd02195..a782562fd 100644
--- a/dbt/macros/udf/udf_safe_divmod.sql
+++ b/dbt/macros/udf/udf_safe_divmod.sql
@@ -1,6 +1,5 @@
 {% macro create_udf_safe_divmod() %}
-
-DROP FUNCTION IF EXISTS {{ target.schema }}.safe_divmod(n numeric, d numeric);
+DROP FUNCTION IF EXISTS {{ target.schema }}.safe_divmod CASCADE;
 CREATE FUNCTION {{ target.schema }}.safe_divmod(n numeric, d numeric)
 RETURNS TABLE(quotient numeric, remainder
numeric) AS $$ DECLARE diff --git a/dbt/macros/udf/udf_uuid_to_int.sql b/dbt/macros/udf/udf_uuid_to_int.sql index 87a5ec7cb..6bff1d6ba 100644 --- a/dbt/macros/udf/udf_uuid_to_int.sql +++ b/dbt/macros/udf/udf_uuid_to_int.sql @@ -1,6 +1,5 @@ {% macro create_udf_uuid_to_int() %} - -DROP FUNCTION IF EXISTS {{ target.schema }}.uuid_to_int(uuid UUID); +DROP FUNCTION IF EXISTS {{ target.schema }}.uuid_to_int CASCADE; CREATE FUNCTION {{ target.schema }}.uuid_to_int(uuid UUID) RETURNS numeric AS $$ DECLARE diff --git a/dbt/models/base/acteurs/base_acteur.sql b/dbt/models/base/acteurs/base_acteur.sql index 209b43f2f..321da9544 100644 --- a/dbt/models/base/acteurs/base_acteur.sql +++ b/dbt/models/base/acteurs/base_acteur.sql @@ -1 +1,4 @@ -select * from {{ source('qfdmo', 'qfdmo_acteur') }} \ No newline at end of file +select * from {{ source('qfdmo', 'qfdmo_acteur') }} +{% if env_var('DBT_SAMPLING', 'false') == 'true' %} +TABLESAMPLE SYSTEM (10) +{% endif %} diff --git a/dbt/models/base/acteurs/base_acteur_type.sql b/dbt/models/base/acteurs/base_acteur_type.sql new file mode 100644 index 000000000..cf1d4a247 --- /dev/null +++ b/dbt/models/base/acteurs/base_acteur_type.sql @@ -0,0 +1 @@ +select * from {{ source('qfdmo', 'qfdmo_acteurtype') }} \ No newline at end of file diff --git a/dbt/models/base/acteurs/schema.yml b/dbt/models/base/acteurs/schema.yml index 157b6e544..1e3c5b3b9 100644 --- a/dbt/models/base/acteurs/schema.yml +++ b/dbt/models/base/acteurs/schema.yml @@ -338,3 +338,17 @@ models: description: "The logo_file for this table" - name: licence description: "The licence for this table" + - name: base_acteur_type + description: "Types d'acteurs" + columns: + - name: id + description: "clef primaire" + data_tests: + - not_null + - name: libelle + description: "Nom/description du type d'acteur (ex: Association, entreprise de l'ESS)" + - name: code + description: "Code du type d'acteur (ex: ess)" + data_tests: + - not_null + - unique diff --git a/dbt/models/base/ae_annuaire_entreprises/base_ae_etablissement.sql b/dbt/models/base/ae_annuaire_entreprises/base_ae_etablissement.sql index 3e31ec0bb..2cb1da23f 100644 --- a/dbt/models/base/ae_annuaire_entreprises/base_ae_etablissement.sql +++ b/dbt/models/base/ae_annuaire_entreprises/base_ae_etablissement.sql @@ -14,21 +14,37 @@ Notes: SELECT -- Codes -siret, -activite_principale, +udf_ae_string_cleanup(siret) AS siret, +udf_ae_string_cleanup(activite_principale) AS activite_principale, + +-- Names +udf_ae_string_cleanup(denomination_usuelle) AS denomination_usuelle, -- Status -etat_administratif, +udf_ae_string_cleanup(etat_administratif) AS etat_administratif, -- Address -numero_voie, -complement_adresse, -type_voie, -libelle_voie, -code_postal, -libelle_commune +udf_ae_string_cleanup(numero_voie) AS numero_voie, +udf_ae_string_cleanup(complement_adresse) AS complement_adresse, +udf_ae_string_cleanup(type_voie) AS type_voie, +udf_ae_string_cleanup(libelle_voie) AS libelle_voie, +udf_ae_string_cleanup(code_postal) AS code_postal, +udf_ae_string_cleanup(libelle_commune) AS libelle_commune FROM {{ source('ae', 'clone_ae_etablissement_in_use') }} -- Filtering out foreign establishments as our focus is France -- On 2025-03-17 this allows excluding ~316K rows -WHERE code_pays_etranger IS NULL \ No newline at end of file +WHERE code_pays_etranger IS NULL +{% if env_var('DBT_SAMPLING', 'false') == 'true' %} +/* We can't do random sampling else we risk having +no matching etablissement vs. unite legale. 
Can't +sample on location as not available in unite to match, +falling back to latest SIRET/SIREN as they will give +matches while representing recent data. +*/ +/* TODO: improve sampling by grabbing what we have +in acteurs + a little more if we can suggestion models +to have more data */ +ORDER BY siret DESC +LIMIT 1000000 +{% endif %} diff --git a/dbt/models/base/ae_annuaire_entreprises/base_ae_unite_legale.sql b/dbt/models/base/ae_annuaire_entreprises/base_ae_unite_legale.sql index 6df83ed8c..49c353ae3 100644 --- a/dbt/models/base/ae_annuaire_entreprises/base_ae_unite_legale.sql +++ b/dbt/models/base/ae_annuaire_entreprises/base_ae_unite_legale.sql @@ -15,23 +15,33 @@ Notes: SELECT -- Codes -siren, -activite_principale, +udf_ae_string_cleanup(siren) AS siren, +udf_ae_string_cleanup(activite_principale) AS activite_principale, -- Status -etat_administratif, +udf_ae_string_cleanup(etat_administratif) AS etat_administratif, -- Business names -denomination, +udf_ae_string_cleanup(denomination) AS denomination, -- Director's names -CASE WHEN prenom1 = '[ND]' THEN NULL ELSE prenom1 END AS prenom1, -CASE WHEN prenom2 = '[ND]' THEN NULL ELSE prenom2 END AS prenom2, -CASE WHEN prenom3 = '[ND]' THEN NULL ELSE prenom3 END AS prenom3, -CASE WHEN prenom4 = '[ND]' THEN NULL ELSE prenom4 END AS prenom4, -CASE WHEN prenom_usuel = '[ND]' THEN NULL ELSE prenom_usuel END AS prenom_usuel, -CASE WHEN pseudonyme = '[ND]' THEN NULL ELSE pseudonyme END AS pseudonyme, -CASE WHEN nom = '[ND]' THEN NULL ELSE nom END AS nom, -CASE WHEN nom_usage = '[ND]' THEN NULL ELSE nom_usage END AS nom_usage +udf_ae_string_cleanup(prenom1) AS prenom1, +udf_ae_string_cleanup(prenom2) AS prenom2, +udf_ae_string_cleanup(prenom3) AS prenom3, +udf_ae_string_cleanup(prenom4) AS prenom4, +udf_ae_string_cleanup(prenom_usuel) AS prenom_usuel, +udf_ae_string_cleanup(pseudonyme) AS pseudonyme, +udf_ae_string_cleanup(nom) AS nom, +udf_ae_string_cleanup(nom_usage) AS nom_usage -FROM {{ source('ae', 'clone_ae_unite_legale_in_use') }} \ No newline at end of file +FROM {{ source('ae', 'clone_ae_unite_legale_in_use') }} +/* We can't do random sampling else we risk having +no matching etablissement vs. unite legale. Can't +sample on location as not available in unite to match, +falling back to latest SIRET/SIREN as they will give +matches while representing recent data. 
+*/ +{% if env_var('DBT_SAMPLING', 'false') == 'true' %} +ORDER BY siren DESC +LIMIT 500000 +{% endif %} \ No newline at end of file diff --git a/dbt/models/base/ae_annuaire_entreprises/schema.yml b/dbt/models/base/ae_annuaire_entreprises/schema.yml index 0ea0e6f85..15b9a0cd3 100644 --- a/dbt/models/base/ae_annuaire_entreprises/schema.yml +++ b/dbt/models/base/ae_annuaire_entreprises/schema.yml @@ -52,6 +52,10 @@ models: data_tests: - not_null - unique + - name: activite_principale + description: "Code NAF Rev2" + - name: denomination_usuelle + description: "Nom de l'établissement" - name: etat_administratif description: "A = Actif, F = Fermé" data_type: varchar(1) @@ -59,8 +63,6 @@ models: - not_null - accepted_values: values: ["A", "F"] - - name: activite_principale - description: "Code NAF Rev2" - name: numero_voie description: "Numéro de voie" - name: complement_adresse diff --git a/dbt/models/intermediate/ae_annuaire_entreprises/int_ae_etablissement.sql b/dbt/models/intermediate/ae_annuaire_entreprises/int_ae_etablissement.sql index 7b333a52c..b313e9eac 100644 --- a/dbt/models/intermediate/ae_annuaire_entreprises/int_ae_etablissement.sql +++ b/dbt/models/intermediate/ae_annuaire_entreprises/int_ae_etablissement.sql @@ -11,14 +11,25 @@ Notes: indexes=[ {'columns': ['siret'], 'unique': True}, {'columns': ['est_actif']}, + {'columns': ['code_postal']}, + ], + post_hook=[ + "CREATE INDEX ON {{ this }}(adresse_numero) WHERE adresse_numero IS NOT NULL" ] ) }} SELECT -- Codes - siret, - activite_principale AS naf, -- Making NAF explicit since it's a code + etab.siret, + etab.activite_principale AS naf, -- Making NAF explicit being a well-known code + + -- Names + CASE + WHEN etab.denomination_usuelle IS NOT NULL THEN etab.denomination_usuelle + WHEN etab.denomination_usuelle IS NULL AND unite.denomination IS NOT NULL THEN unite.denomination + ELSE {{ value_unavailable() }} + END AS nom, /* Is active or not: converting this field to BOOLEAN to: @@ -27,19 +38,33 @@ SELECT using different flags - create more efficient data type and index */ - CASE etat_administratif + CASE etab.etat_administratif WHEN 'A' THEN TRUE ELSE FALSE END AS est_actif, + CASE unite.etat_administratif + WHEN 'A' THEN TRUE + ELSE FALSE + END AS unite_est_actif, -- Addresse udf_columns_concat_unique_non_empty( - numero_voie, - type_voie, - libelle_voie + etab.numero_voie, + etab.type_voie, + etab.libelle_voie ) AS adresse, - complement_adresse AS adresse_complement, - code_postal, - libelle_commune AS ville + etab.numero_voie AS adresse_numero, + etab.complement_adresse AS adresse_complement, + etab.code_postal, + etab.libelle_commune AS ville -FROM {{ ref('base_ae_etablissement') }} \ No newline at end of file +FROM {{ ref('base_ae_etablissement') }} AS etab +/* Joining with unite_legale to bring some essential +data from parent unite into each etablissement (saves +us from making expensive JOINS in downstream models) */ +JOIN {{ ref('base_ae_unite_legale') }} AS unite +ON unite.siren = LEFT(etab.siret,9) +/* Here we keep unavailable names as int_ models aren't +responsible for business logic. 
Keeping them allows investigation; to exclude
+them downstream, filter with:
+AND nom != {{ value_unavailable() }}
+*/
\ No newline at end of file
diff --git a/dbt/models/intermediate/ae_annuaire_entreprises/int_ae_unite_legale.sql b/dbt/models/intermediate/ae_annuaire_entreprises/int_ae_unite_legale.sql
index 6a7c8ef18..162f3b84e 100644
--- a/dbt/models/intermediate/ae_annuaire_entreprises/int_ae_unite_legale.sql
+++ b/dbt/models/intermediate/ae_annuaire_entreprises/int_ae_unite_legale.sql
@@ -45,14 +45,14 @@ SELECT
     - normalize to increase chances of matching
     - keep each column separate for a potential substring match
     */
-    udf_normalize_string_alpha_for_match(nom) AS dirigeant_nom,
-    udf_normalize_string_alpha_for_match(nom_usage) AS dirigeant_nom_usage,
-    udf_normalize_string_alpha_for_match(pseudonyme) AS dirigeant_pseudonyme,
-    udf_normalize_string_alpha_for_match(prenom1) AS dirigeant_prenom1,
-    udf_normalize_string_alpha_for_match(prenom2) AS dirigeant_prenom2,
-    udf_normalize_string_alpha_for_match(prenom3) AS dirigeant_prenom3,
-    udf_normalize_string_alpha_for_match(prenom4) AS dirigeant_prenom4,
-    udf_normalize_string_alpha_for_match(prenom_usuel) AS dirigeant_prenom_usuel,
+    nom AS dirigeant_nom,
+    nom_usage AS dirigeant_nom_usage,
+    pseudonyme AS dirigeant_pseudonyme,
+    prenom1 AS dirigeant_prenom1,
+    prenom2 AS dirigeant_prenom2,
+    prenom3 AS dirigeant_prenom3,
+    prenom4 AS dirigeant_prenom4,
+    prenom_usuel AS dirigeant_prenom_usuel,
     -- TRUE if ANY names NOT NULL for more efficient pre-filtering
     COALESCE(
         nom,
diff --git a/dbt/models/intermediate/ae_annuaire_entreprises/schema.yml b/dbt/models/intermediate/ae_annuaire_entreprises/schema.yml
index 5b9e15b70..5e6567c05 100644
--- a/dbt/models/intermediate/ae_annuaire_entreprises/schema.yml
+++ b/dbt/models/intermediate/ae_annuaire_entreprises/schema.yml
@@ -50,13 +50,17 @@ models:
           data_tests:
             - not_null
             - unique
+      - name: activite_principale
+        description: "Code NAF Rev2"
+      - name: nom
+        description: "Nom de l'établissement"
+        data_tests:
+          - not_null
       - name: est_actif
         description: "OUI si A = Actif"
         data_type: boolean
         data_tests:
           - not_null
-      - name: activite_principale
-        description: "Code NAF Rev2"
       - name: numero_voie
         description: "Numéro de voie"
       - name: complement_adresse
diff --git a/dbt/models/marts/acteurs/carte/marts_carte_acteur.sql b/dbt/models/marts/acteurs/carte/marts_carte_acteur.sql
index be48b7bd2..fa0dafeab 100644
--- a/dbt/models/marts/acteurs/carte/marts_carte_acteur.sql
+++ b/dbt/models/marts/acteurs/carte/marts_carte_acteur.sql
@@ -1,4 +1,4 @@
 -- depends_on: {{ ref('marts_carte_filtered_acteur') }}
 -- depends_on: {{ ref('marts_carte_propositionservice')}}
 
-{{ acteur('marts_carte_filtered_acteur', 'marts_carte_propositionservice')}}
+{{ acteur('marts_carte_filtered_acteur', 'marts_carte_propositionservice')}}
\ No newline at end of file
diff --git a/dbt/models/marts/enrich/marts_enrich_acteurs_closed_candidates.sql b/dbt/models/marts/enrich/marts_enrich_acteurs_closed_candidates.sql
new file mode 100644
index 000000000..51d775377
--- /dev/null
+++ b/dbt/models/marts/enrich/marts_enrich_acteurs_closed_candidates.sql
@@ -0,0 +1,70 @@
+/*
+Acteurs whose SIRET is closed in AE's etablissement
+
+Notes:
+ - 📦 Materialized as table but refreshed by DAG enrich_acteurs_closed,
+   as many models/tests depend on each other = a full refresh would take too long
+*/
+{{
+  config(
+    materialized = 'table',
+    tags=['marts', 'enrich', 'closed', 'ae', 'annuaire_entreprises', 'etablissement'],
+  )
+}}
+-- Starting from our acteurs we can match via SIRET
+WITH acteurs_with_siret AS (
+
SELECT + -- Acteur columns + identifiant_unique AS acteur_id, + siret AS acteur_siret, + LEFT(siret,9) AS acteur_siren, + nom AS acteur_nom, + udf_normalize_string_for_match(nom) AS acteur_nom_normalise, + commentaires AS acteur_commentaires, + statut AS acteur_statut, + acteur_type_id, + source_id AS acteur_source_id, + adresse AS acteur_adresse, + code_postal AS acteur_code_postal, + ville AS acteur_ville, + location AS acteur_location + + FROM {{ ref('marts_carte_acteur') }} + WHERE siret IS NOT NULL AND siret != '' AND LENGTH(siret) = 14 +), +/* Filtering on etab closed (NOT etab.est_actif) BUT +not on unite closed (NOT unite_est_actif) because +open unite might bring potential replacements */ +etab_closed_candidates AS ( +SELECT + -- acteurs + acteurs.acteur_id, + acteurs.acteur_siret, + acteurs.acteur_siren, + acteurs.acteur_type_id, + acteurs.acteur_source_id, + acteurs.acteur_statut, + acteurs.acteur_nom, + acteurs.acteur_nom_normalise, + acteurs.acteur_commentaires, + acteurs.acteur_adresse, + acteurs.acteur_code_postal, + acteurs.acteur_ville, + CASE WHEN acteurs.acteur_location IS NULL THEN NULL ELSE ST_X(acteurs.acteur_location) END AS acteur_longitude, + CASE WHEN acteurs.acteur_location IS NULL THEN NULL ELSE ST_Y(acteurs.acteur_location) END AS acteur_latitude, + + -- etablissement + etab.unite_est_actif AS unite_est_actif, + etab.est_actif AS etab_est_actif, + etab.code_postal AS etab_code_postal, + etab.adresse_numero AS etab_adresse_numero, + etab.adresse AS etab_adresse, + etab.adresse_complement AS etab_adresse_complement, + etab.naf AS etab_naf + +FROM acteurs_with_siret AS acteurs +JOIN {{ ref('int_ae_etablissement') }} AS etab ON acteurs.acteur_siret = etab.siret +WHERE etab.est_actif IS FALSE +) + +SELECT * FROM etab_closed_candidates \ No newline at end of file diff --git a/dbt/models/marts/enrich/marts_enrich_acteurs_closed_replaced.sql b/dbt/models/marts/enrich/marts_enrich_acteurs_closed_replaced.sql new file mode 100644 index 000000000..94c964759 --- /dev/null +++ b/dbt/models/marts/enrich/marts_enrich_acteurs_closed_replaced.sql @@ -0,0 +1,70 @@ +{{ + config( + materialized = 'table', + tags=['marts', 'enrich', 'closed', 'ae', 'annuaire_entreprises', 'etablissement'], + ) +}} + +WITH potential_replacements AS ( + SELECT + + -- Candidates acteur data + candidates.*, + + -- Suggestions + acteur_type_id AS suggest_acteur_type_id, + acteur_longitude AS suggest_longitude, + acteur_latitude AS suggest_latitude, + suggests.siret AS suggest_siret, + LEFT(suggests.siret,9) AS suggest_siren, + LEFT(candidates.acteur_siret,9) = LEFT(suggests.siret,9) AS suggest_siret_is_from_same_siren, + suggests.nom AS suggest_nom, + suggests.naf AS suggest_naf, + suggests.ville AS suggest_ville, + suggests.code_postal AS suggest_code_postal, + suggests.adresse AS suggest_adresse, + + -- Matching + udf_columns_words_in_common_count( + candidates.acteur_nom_normalise, + udf_normalize_string_for_match(suggests.nom) + ) AS noms_nombre_mots_commun, + ROW_NUMBER() OVER ( + PARTITION BY candidates.acteur_siret + ORDER BY + -- Prioritize suggests from same company + CASE + WHEN LEFT(candidates.acteur_siret,9) = LEFT(suggests.siret,9) THEN 1 + ELSE 0 + END DESC, + -- Then etablissements with more words in common + udf_columns_words_in_common_count( + candidates.acteur_nom_normalise, + udf_normalize_string_for_match(suggests.nom) + ) DESC + ) AS replacement_priority + /* + JOINS: candidates are our acteurs, suggests are etablissements + with a matching naf, code_postal, adresse and 
adresse_numero + */ + FROM {{ ref('marts_enrich_acteurs_closed_candidates') }} AS candidates + INNER JOIN {{ ref('int_ae_etablissement') }} AS suggests + ON suggests.naf = candidates.etab_naf + AND suggests.code_postal = candidates.etab_code_postal + AND suggests.adresse_numero = candidates.etab_adresse_numero + AND udf_normalize_string_for_match(suggests.adresse) = udf_normalize_string_for_match(candidates.etab_adresse) + WHERE suggests.est_actif + -- Fields which must be non-NULL for a replacement to be considered + AND suggests.code_postal IS NOT NULL + AND suggests.adresse IS NOT NULL + /* To reduce false positives with generic addresses + such as ZA, ZI containing multiple instances of similar + stores (e.g. supermarkets), we force presence + of street number, which later will be used + as condition for matching */ + AND suggests.adresse_numero IS NOT NULL +) +SELECT * FROM potential_replacements +WHERE replacement_priority=1 +/* We don't want to propose suggests with unavailable names */ +AND suggest_nom != {{ value_unavailable() }} \ No newline at end of file diff --git a/dbt/models/marts/enrich/marts_enrich_acteurs_closed_suggest_not_replaced.sql b/dbt/models/marts/enrich/marts_enrich_acteurs_closed_suggest_not_replaced.sql new file mode 100644 index 000000000..5b49c3c15 --- /dev/null +++ b/dbt/models/marts/enrich/marts_enrich_acteurs_closed_suggest_not_replaced.sql @@ -0,0 +1,25 @@ +/* +Acteurs which SIREN & SIRET are closed in AE's etablissement +AND for which we couldn't find replacements +*/ +{{ + config( + materialized = 'table', + tags=['marts', 'enrich', 'closed', 'ae', 'annuaire_entreprises', 'etablissement'], + ) +}} + +SELECT + '🚪 Acteurs Fermés: 🔴 non remplacés' AS suggest_cohort, + * +FROM {{ ref('marts_enrich_acteurs_closed_candidates') }} +WHERE + /* In candidates we already filter on etab_est_actif IS FALSE + but we don't filter on unite_est_actif IS FALSE + because it would prevent us from finding replacements for same unite, + however for acteurs we consider fully closed we do apply that filter */ + unite_est_actif is FALSE + AND acteur_id NOT IN ( + SELECT acteur_id FROM {{ ref('marts_enrich_acteurs_closed_replaced') }} + ) + diff --git a/dbt/models/marts/enrich/marts_enrich_acteurs_closed_suggest_replaced_other_siren.sql b/dbt/models/marts/enrich/marts_enrich_acteurs_closed_suggest_replaced_other_siren.sql new file mode 100644 index 000000000..1cb1d63cb --- /dev/null +++ b/dbt/models/marts/enrich/marts_enrich_acteurs_closed_suggest_replaced_other_siren.sql @@ -0,0 +1,12 @@ +{{ + config( + materialized = 'table', + tags=['marts', 'enrich', 'closed', 'ae', 'annuaire_entreprises', 'etablissement'], + ) +}} + +SELECT + '🚪 Acteurs Fermés: 🟡 remplacés par SIRET d''un autre SIREN' AS suggest_cohort, + * +FROM {{ ref('marts_enrich_acteurs_closed_replaced') }} +WHERE suggest_siret_is_from_same_siren IS FALSE diff --git a/dbt/models/marts/enrich/marts_enrich_acteurs_closed_suggest_replaced_same_siren.sql b/dbt/models/marts/enrich/marts_enrich_acteurs_closed_suggest_replaced_same_siren.sql new file mode 100644 index 000000000..30f6e24cf --- /dev/null +++ b/dbt/models/marts/enrich/marts_enrich_acteurs_closed_suggest_replaced_same_siren.sql @@ -0,0 +1,12 @@ +{{ + config( + materialized = 'table', + tags=['marts', 'enrich', 'closed', 'ae', 'annuaire_entreprises', 'etablissement'], + ) +}} + +SELECT + '🚪 Acteurs Fermés: 🟢 remplacés par SIRET du même SIREN' AS suggest_cohort, + * +FROM {{ ref('marts_enrich_acteurs_closed_replaced') }} +WHERE suggest_siret_is_from_same_siren IS 
TRUE diff --git a/dbt/models/marts/enrich/marts_enrich_ea_rgpd.sql b/dbt/models/marts/enrich/marts_enrich_ea_rgpd.sql deleted file mode 100644 index 6e36bf5f7..000000000 --- a/dbt/models/marts/enrich/marts_enrich_ea_rgpd.sql +++ /dev/null @@ -1,68 +0,0 @@ -/* -Model to find entries from AE's unite legal which directors names -around found inside our acteurs names. - -Notes: - - 🧹 Pre-matching/filtering at SQL level to reduce data size (13M rows) - - 👁️‍🗨️ Keeping as view to always re-evaluate vs. ever changing QFDMO data -*/ -{{ - config( - materialized = 'view', - tags=['marts', 'ae', 'annuaire_entreprises', 'unite_legale', 'rgpd'], - ) -}} - -WITH acteurs_with_siren AS ( - SELECT - LEFT(siret,9) AS siren, - identifiant_unique AS acteur_id, - udf_normalize_string_alpha_for_match(CONCAT(nom || ' ' || nom_officiel || ' ' || nom_commercial)) AS acteur_noms, - commentaires AS acteur_commentaires - FROM {{ ref('marts_carte_acteur') }} - /* - We have normalization issues with our SIREN field in our DB - and we obtain better matching by reconstructing SIREN via SIRET - */ - WHERE siret IS NOT NULL AND siret != '' AND LENGTH(siret) = 14 - AND {{ acteur_status_is_active() }} -) -SELECT - -- Common fields - acteurs.siren, - - -- Acteur fields - acteur_id, - acteur_noms, - acteur_commentaires, - - -- Unite legale fields - /* - We don't care which one is which, we aggregate to - reduce data size and we will perform a more precise - post-match in Python - */ - udf_columns_concat_unique_non_empty( - dirigeant_nom, - dirigeant_nom_usage, - dirigeant_pseudonyme, - dirigeant_prenom1, - dirigeant_prenom2, - dirigeant_prenom3, - dirigeant_prenom4 - ) AS dirigeants_noms_prenoms - -FROM {{ ref('int_ae_unite_legale') }} AS unite -LEFT JOIN acteurs_with_siren AS acteurs ON acteurs.siren = unite.siren -WHERE - acteurs.siren IS NOT NULL -- i.e. we have a match - AND a_dirigeant_noms_ou_prenoms_non_null -- we have some directors names - AND ( -- Any of the directors names appear in the acteur names - position(dirigeant_nom IN acteur_noms) > 0 - OR position(dirigeant_nom_usage IN acteur_noms) > 0 - OR position(dirigeant_pseudonyme IN acteur_noms) > 0 - OR position(dirigeant_prenom1 IN acteur_noms) > 0 - OR position(dirigeant_prenom2 IN acteur_noms) > 0 - OR position(dirigeant_prenom3 IN acteur_noms) > 0 - OR position(dirigeant_prenom4 IN acteur_noms) > 0 - ) \ No newline at end of file diff --git a/dbt/models/marts/enrich/schema.yml b/dbt/models/marts/enrich/schema.yml index d26f7a6a3..2b25e895f 100644 --- a/dbt/models/marts/enrich/schema.yml +++ b/dbt/models/marts/enrich/schema.yml @@ -1,41 +1,83 @@ version: 2 models: - - name: marts_enrich_ea_rgpd - description: Unités légales de l'Annuaire Entreprises (AE) préfiltrés - | et prématchés sur la base des noms/prénoms de dirigeants dont au - | moins 1 apparait dans le nom de nos acteurs (le modèle sera ensuite - | utilisé par un DAG Airflow pour faire du matching plus poussé via - | python et soumettre des suggestions) + - name: marts_enrich_acteurs_closed_replaced + description: Etablissements de l'Annuaire Entreprises (AE) qui ont été + | fermés et remplacés par un autre établissement columns: - - name: siren - description: "Numéro SIREN" - data_type: varchar(9) - data_tests: - - not_null - # Our model is at unite_legale level (no repetition per establishment) - # hence SIREN should be unique. However test failing as of 2025-03-19 - # due to duplicate SIREN in our DB which is potentially OK (i.e. 
multiple
-      # acteur locations belonging to the same parent Acteur SIREN)
-      # - unique
       - name: acteur_id
         description: Identifiant unique de l'acteur
         data_tests:
           - not_null
-      - name: acteur_noms
-        description: Nom, nom officiel et nom commercial de l'acteur
-          | regroupés & normalisés pour réduire la taille de la table, sachant
-          | qu'on fait un matching plus poussés avec python par la suite
-        # Ensuring we are not matching empty strings
+      - name: acteur_statut
+        description: Statut de l'acteur dans QFDMO
+      - name: acteur_siret
+        description: SIRET de l'acteur fermé
+        data_tests:
+          - not_null
+      - name: suggest_siret
+        description: SIRET de l'établissement qui remplace l'acteur fermé
         data_tests:
           - not_null
+      - name: suggest_cohort
+        description: "Si le SIRET de remplacement appartient au même SIREN ou à un autre SIREN"
+        data_tests:
+          - not_null
+        # - accepted_values:
+        #     values: ['siret_du_meme_siren', 'siret_dun_autre_siren']
+      - name: acteur_nom
+        description: Nom de l'acteur fermé
+      - name: suggest_nom
+        description: Nom de l'établissement qui remplace l'acteur fermé
+      - name: noms_nombre_mots_commun
+        description: Nombre de mots en commun entre le nom de l'acteur et celui du remplaçant
+        data_type: integer
       - name: acteur_commentaires
-        description: Commentaires de l'acteur pour debug ET si on veut faire
-          | filtering avec des paramètres de DAG
-      - name: dirigeants_noms_prenoms
-        description: Noms & prénoms de tous les dirigeants
-          | regroupés & normalisés pour réduire la taille de la table, sachant
-          | qu'on fait un matching plus poussés avec python par la suite
-        # If we had a match then we must have at least one director's name
+        description: Commentaires de l'acteur pour debug
+      - name: suggest_naf
+        description: Code NAF de l'établissement remplaçant
+      - name: suggest_ville
+        description: Ville de l'établissement remplaçant
+      - name: suggest_code_postal
+        description: Code postal de l'établissement remplaçant
+      - name: suggest_adresse
+        description: Adresse de l'établissement remplaçant
+      - name: replacement_priority
+        description: "Priorité du remplacement (1 = meilleur match)"
+        data_type: integer
+
+  - name: marts_enrich_acteurs_closed_candidates
+    description: Etablissements fermés de l'Annuaire Entreprises (AE) qui pourraient être remplacés
+    columns:
+      - name: acteur_siret
+        description: SIRET de l'acteur fermé
         data_tests:
-          - not_null
\ No newline at end of file
+          - not_null
+      - name: unite_est_actif
+        description: Si l'unité légale est toujours active
+        data_type: boolean
+      - name: etab_est_actif
+        description: Si l'établissement est toujours actif (toujours FALSE ici)
+        data_type: boolean
+        data_tests:
+          - accepted_values:
+              values: [false]
+      - name: etab_code_postal
+        description: Code postal de l'établissement fermé
+      - name: etab_adresse
+        description: Adresse de l'établissement fermé
+      - name: etab_naf
+        description: Code NAF de l'établissement fermé
+      - name: acteur_id
+        description: Identifiant unique de l'acteur dans QFDMO
+        data_tests:
+          - not_null
+      - name: acteur_statut
+        description: Statut de l'acteur dans QFDMO
+      - name: acteur_nom
+        description: Nom de l'acteur dans QFDMO
+      - name: acteur_nom_normalise
+        description: Nom de l'acteur normalisé pour faciliter les comparaisons
+      - name: acteur_commentaires
+        description: Commentaires de l'acteur pour debug
+
diff --git a/dbt/models/source/source_acteur.yml b/dbt/models/source/source_acteur.yml
index b5d6f20b6..3c8d7fdec 100644
--- a/dbt/models/source/source_acteur.yml
+++ b/dbt/models/source/source_acteur.yml
@@ -7,6 +7,7 @@
sources: - name : qfdmo_acteur - name : qfdmo_acteur_acteur_services - name : qfdmo_acteur_labels + - name : qfdmo_acteurtype - name : qfdmo_propositionservice - name : qfdmo_propositionservice_sous_categories - name : qfdmo_revisionacteur diff --git a/dbt/profiles.yml b/dbt/profiles.yml index edebeb981..ca26c20c1 100644 --- a/dbt/profiles.yml +++ b/dbt/profiles.yml @@ -8,4 +8,4 @@ default: user: "{{ env_var('POSTGRES_USER', 'qfdmo') }}" password: "{{ env_var('POSTGRES_PASSWORD', 'qfdmo') }}" dbname: "{{ env_var('POSTGRES_DB', 'qfdmo') }}" - schema: "{{ env_var('POSTGRES_SCHEMA', 'public') }}" + schema: "{{ env_var('POSTGRES_SCHEMA', 'public') }}" \ No newline at end of file diff --git a/qfdmo/migrations/0150_acteur_siret_is_closed_and_more.py b/qfdmo/migrations/0150_acteur_siret_is_closed_and_more.py new file mode 100644 index 000000000..36cf2ebe3 --- /dev/null +++ b/qfdmo/migrations/0150_acteur_siret_is_closed_and_more.py @@ -0,0 +1,68 @@ +# Generated by Django 5.1.6 on 2025-04-16 08:39 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("qfdmo", "0149_alter_carteconfig_hide_legend"), + ] + + operations = [ + migrations.AddField( + model_name="acteur", + name="siret_is_closed", + field=models.BooleanField( + blank=True, + default=None, + help_text="Indique si le SIRET est fermé ou non dans l'Annuaire Entreprises", + null=True, + verbose_name="SIRET fermé", + ), + ), + migrations.AddField( + model_name="displayedacteur", + name="siret_is_closed", + field=models.BooleanField( + blank=True, + default=None, + help_text="Indique si le SIRET est fermé ou non dans l'Annuaire Entreprises", + null=True, + verbose_name="SIRET fermé", + ), + ), + migrations.AddField( + model_name="displayedacteurtemp", + name="siret_is_closed", + field=models.BooleanField( + blank=True, + default=None, + help_text="Indique si le SIRET est fermé ou non dans l'Annuaire Entreprises", + null=True, + verbose_name="SIRET fermé", + ), + ), + migrations.AddField( + model_name="revisionacteur", + name="parent_reason", + field=models.CharField( + blank=True, + db_default="", + default="", + help_text="Raison du rattachement au parent", + max_length=255, + ), + ), + migrations.AddField( + model_name="revisionacteur", + name="siret_is_closed", + field=models.BooleanField( + blank=True, + default=None, + help_text="Indique si le SIRET est fermé ou non dans l'Annuaire Entreprises", + null=True, + verbose_name="SIRET fermé", + ), + ), + ] diff --git a/qfdmo/models/acteur.py b/qfdmo/models/acteur.py index fde50f37b..ded587031 100644 --- a/qfdmo/models/acteur.py +++ b/qfdmo/models/acteur.py @@ -1,3 +1,4 @@ +import json import logging import random import re @@ -354,6 +355,15 @@ class Meta: siret = models.CharField( max_length=14, blank=True, default="", db_default="", db_index=True ) + # To backfill SIRET status into our DB from AE and avoid having to evaluate + # AE's DB at runtime (which has 40M rows), also helping with Django admin info + siret_is_closed = models.BooleanField( + default=None, # by default we can't assume a SIRET is opened + null=True, + blank=True, + verbose_name="SIRET fermé", + help_text="Indique si le SIRET est fermé ou non dans l'Annuaire Entreprises", + ) source = models.ForeignKey(Source, on_delete=models.CASCADE, blank=True, null=True) identifiant_externe = models.CharField( max_length=255, blank=True, default="", db_default="" @@ -601,6 +611,33 @@ def get_fields_for_clone(cls): "labels", } + def commentaires_ajouter(self, added): + 
"""Historically this field has been defined as TextField + but has contained a mix of free text and JSON data, hence + method to help append data in a JSON format""" + existing = self.commentaires + + # If empty we overwrite + if existing is None or existing.strip() == "": + self.commentaires = json.dumps([{"message": added}]) + else: + try: + # If not empty, trying to parse as JSON + existing_data = json.loads(existing) + if not isinstance(existing_data, list): + raise NotImplementedError( + "Cas de commentaires JSON non-liste pas prévu" + ) + except (json.JSONDecodeError, ValueError): + # If existing not JSON we turn it into a list + existing_data = [{"message": existing}] + + # Appending new data + existing_data.append({"message": added}) + self.commentaires = json.dumps(existing_data) + + self.save() + def clean_parent(parent): try: @@ -702,6 +739,13 @@ class Meta: related_name="duplicats", validators=[clean_parent], ) + parent_reason = models.CharField( + max_length=255, + blank=True, + default="", + db_default="", + help_text="Raison du rattachement au parent", + ) @property def is_parent(self): @@ -794,6 +838,7 @@ def set_default_fields_and_objects_before_save(self) -> Acteur | None: "acteur_services", "labels", "parent", + "parent_reason", "is_parent", ], ) @@ -860,6 +905,7 @@ def duplicate(self): "acteur_services", "proposition_services", "parent", + "parent_reason", ] for field in fields_to_reset: diff --git a/templates/data/_partials/dicts_to_table.html b/templates/data/_partials/dicts_to_table.html new file mode 100644 index 000000000..83405d35d --- /dev/null +++ b/templates/data/_partials/dicts_to_table.html @@ -0,0 +1,33 @@ +{# Turns data:list[dict] into a table #} + + + + {% for key in data.0.keys %} + + {% endfor %} + + + + {% for row in data %} + + {% for value in row.values %} + + {% endfor %} + + {% endfor %} + +
{{ key|capfirst }}
+ {% if value|valuetype == "dict" %} + {% include "data/_partials/value_details.html" with value=value %}

+ {% if 'data' in value %} + {% if "nom" in value.data and "adresse" in value.data and "ville" in value.data and "code_postal" in value.data %} +
+ + 🗺️ Voir sur Google Maps + + {% endif %} + {% endif %} + {% else %} + {{ value }} + {% endif %} +
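The new `dicts_to_table.html` partial turns a `list[dict]` into a table: one header cell per key of the first dict, one row per dict. A rendering sketch (hypothetical rows; the real caller is `suggestion_details_changes.html` below):

```python
from django.template.loader import render_to_string

rows = [
    {"ordre": 1, "modele": "acteur_update_data"},
    {"ordre": 2, "modele": "acteur_update_parent_id"},
]
# Headers come from rows[0].keys(), cells from each row's values
html = render_to_string("data/_partials/dicts_to_table.html", {"data": rows})
```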
diff --git a/templates/data/_partials/suggestion_details_changes.html b/templates/data/_partials/suggestion_details_changes.html new file mode 100644 index 000000000..e91094586 --- /dev/null +++ b/templates/data/_partials/suggestion_details_changes.html @@ -0,0 +1,13 @@ +{# Generic template for suggestions following suggestion.changes:list[SuggestionChange] #} +{% extends "data/_partials/suggestion_details.html" %} +{% load custom_filters %} + +{% block suggestion_title %} +{{ title }} +{% endblock suggestion_title %} + +{% block suggestion_details %} + +

🔢 {{ changes|length }} acteur(s) impacté(s):

+{% include "data/_partials/dicts_to_table.html" with data=changes %} +{% endblock suggestion_details %} diff --git a/templates/data/_partials/value_details.html b/templates/data/_partials/value_details.html index db29adb23..3f30804c4 100644 --- a/templates/data/_partials/value_details.html +++ b/templates/data/_partials/value_details.html @@ -1,15 +1,53 @@ {% load custom_filters %} {% if value|valuetype != "list" and value|valuetype != "dict" %} - {{ value }} + + {# Generic case to highlight specific values #} + {% if value is None %} + NONE + {% elif value == "" %} + EMPTY STRING + + {# Source #} + {% elif key == "source" %} + {{ value }} + + {# Acteur Type #} + {% elif key == "acteur_type" %} + {{ value }} + + {# Acteur #} + {% elif key == "identifiant_unique" %} + {{ value }} + (base, + rev, + disp) + {% elif key == "statut" %} + {{ value }} + {% elif key == "siret_is_closed" %} + {{ value }} + {% elif key == "parent" %} + {{ value }} (futur parent) + + + {# Annuaire Entreprises links #} + {% elif key == "siren" %} + {{ value }} + {% elif key == "siret" %} + {{ value }} + + {# Fallback #} + {% else %} + {{ value }} + {% endif %} {% elif value|valuetype == "list" and value %} {% elif value|valuetype == "dict" %} {% for key, item in value.items %} -

{{ key }} : {% include "data/_partials/value_details.html" with value=item %}

+

{{ key }} : {% include "data/_partials/value_details.html" with key=key value=item %}

    {% endfor %}
 {% endif %}
diff --git a/unit_tests/data/models/changes/test_acteur_create_as_child.py b/unit_tests/data/models/changes/test_acteur_create_as_child.py
new file mode 100644
index 000000000..325b5bcdf
--- /dev/null
+++ b/unit_tests/data/models/changes/test_acteur_create_as_child.py
@@ -0,0 +1,72 @@
+import pytest
+from django.contrib.gis.geos import Point
+
+from data.models.changes.acteur_create_as_child import ChangeActeurCreateAsChild
+from qfdmo.models import Acteur, RevisionActeur
+from unit_tests.qfdmo.acteur_factory import (
+    ActeurFactory,
+    ActeurTypeFactory,
+    SourceFactory,
+)
+
+
+@pytest.mark.django_db
+class TestChangeActeurCreateAsChild:
+    @pytest.mark.parametrize(
+        "data,missing",
+        [({"parent": "456"}, "parent_reason"), ({"parent_reason": "test"}, "parent")],
+    )
+    def test_raise_if_missing_params(self, data, missing):
+        change = ChangeActeurCreateAsChild(id="123", data=data)
+        with pytest.raises(ValueError, match=f"champ '{missing}' à renseigner"):
+            change.apply()
+
+    def test_raise_if_acteur_exists(self):
+        ActeurFactory(identifiant_unique="123")
+        change = ChangeActeurCreateAsChild(
+            id="123", data={"parent": "456", "parent_reason": "test"}
+        )
+        with pytest.raises(ValueError, match="existe déjà"):
+            change.apply()
+
+    def test_working(self):
+        # Create parent
+        source = SourceFactory(code="source1")
+        atype = ActeurTypeFactory(code="atype1")
+        parent = RevisionActeur.objects.create(
+            identifiant_unique="parent1",
+            source=source,
+            acteur_type=atype,
+            statut="ACTIF",
+            location=Point(1, 1),
+        )
+        # Create child
+        change = ChangeActeurCreateAsChild(
+            id="child1",
+            data={
+                "nom": "my child1",
+                "source": source,
+                "acteur_type": atype,
+                "statut": "ACTIF",
+                "location": Point(1, 1),
+                "parent": parent,
+                "parent_reason": "test",
+            },
+        )
+        change.apply()
+
+        # Acteur created in base to hold the core data
+        base = Acteur.objects.get(pk="child1")
+        assert base.identifiant_unique == "child1"
+        assert base.nom == "my child1"
+        assert base.source.pk == source.pk
+        assert base.acteur_type.pk == atype.pk
+        assert base.statut == "ACTIF"
+        assert base.location.x == 1
+        assert base.location.y == 1
+
+        # Acteur created in revision to hold the parent reference
+        revision = RevisionActeur.objects.get(pk="child1")
+        assert revision.parent.pk == parent.pk
+        assert revision.parent_reason == "test"
+        assert not revision.nom
diff --git a/unit_tests/qfdmo/test_acteur_methods.py b/unit_tests/qfdmo/test_acteur_methods.py
new file mode 100644
index 000000000..7e0e7e759
--- /dev/null
+++ b/unit_tests/qfdmo/test_acteur_methods.py
@@ -0,0 +1,28 @@
+"""Test file dedicated to acteur methods"""
+
+import json
+
+import pytest
+
+from unit_tests.qfdmo.acteur_factory import (
+    ActeurFactory,
+)
+
+
+@pytest.mark.django_db
+class TestActeurMethods:
+
+    @pytest.mark.parametrize(
+        "initial,expected",
+        [
+            ("", [{"message": "test"}]),
+            (" ", [{"message": "test"}]),
+            ("foo", [{"message": "foo"}, {"message": "test"}]),
+            ('[{"message": "bar"}]', [{"message": "bar"}, {"message": "test"}]),
+        ],
+    )
+    def test_commentaires_ajouter(self, initial, expected):
+        acteur = ActeurFactory(commentaires=initial)
+        acteur.commentaires_ajouter("test")
+        actual = json.loads(acteur.commentaires)
+        assert actual == expected
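To complement the tests above, a sketch of a test one could add (assumed, not part of this patch; factory defaults assumed) for the new behaviour of `ChangeActeurUpdateData`, which must now carry `acteur_type` over when creating a revision from a base-only acteur:

```python
import pytest

from data.models.changes.acteur_update_data import ChangeActeurUpdateData
from qfdmo.models import RevisionActeur
from unit_tests.qfdmo.acteur_factory import ActeurFactory


@pytest.mark.django_db
def test_update_data_creates_revision_with_acteur_type():
    # Acteur only exists in base: applying the change should create a revision
    acteur = ActeurFactory(identifiant_unique="acteur1", nom="avant")
    ChangeActeurUpdateData(id="acteur1", data={"nom": "après"}).apply()

    revision = RevisionActeur.objects.get(pk="acteur1")
    assert revision.nom == "après"
    assert revision.acteur_type == acteur.acteur_type
```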