diff --git a/dags/enrich/config/__init__.py b/dags/enrich/config/__init__.py index 07d90673d..e133b6e5e 100644 --- a/dags/enrich/config/__init__.py +++ b/dags/enrich/config/__init__.py @@ -1,6 +1,10 @@ from .cohorts import COHORTS # noqa: F401 from .columns import COLS, SUGGEST_PREFIX # noqa: F401 from .dbt import DBT # noqa: F401 -from .models import DAG_ID_TO_CONFIG_MODEL, EnrichActeursClosedConfig # noqa: F401 +from .models import ( # noqa: F401 + DAG_ID_TO_CONFIG_MODEL, + EnrichActeursClosedConfig, + EnrichActeursRGPDConfig, +) from .tasks import TASKS # noqa: F401 from .xcoms import XCOMS, xcom_pull # noqa: F401 diff --git a/dags/enrich/config/cohorts.py b/dags/enrich/config/cohorts.py index 0cd206ecd..ae4f73d4d 100644 --- a/dags/enrich/config/cohorts.py +++ b/dags/enrich/config/cohorts.py @@ -10,3 +10,4 @@ class COHORTS: "🚪 Acteurs Fermés: 🟡 remplacés par SIRET d'un autre SIREN" ) CLOSED_REP_SAME_SIREN = "🚪 Acteurs Fermés: 🟢 remplacés par SIRET du même SIREN" + RGPD = "Anonymisation RGPD" diff --git a/dags/enrich/config/columns.py b/dags/enrich/config/columns.py index e9c48b924..b6cbdbc2b 100644 --- a/dags/enrich/config/columns.py +++ b/dags/enrich/config/columns.py @@ -27,10 +27,22 @@ class COLS: ACTEUR_NAF: str = "acteur_naf" ACTEUR_LONGITUDE: str = "acteur_longitude" ACTEUR_LATITUDE: str = "acteur_latitude" - + ACTEUR_STATUT: str = "acteur_statut" # Annuaire Entreprise AE_DIRIGEANTS_NOMS: str = "ae_dirigeants_noms_prenoms" + # Suggestions + SUGGEST_COHORT_CODE: str = "suggestion_cohorte_code" + SUGGEST_COHORT: str = "suggest_cohort" + + # Replacements + SUGGEST_SIRET: str = "suggest_siret" + SUGGEST_NOM: str = "suggest_nom" + SUGGEST_ADRESSE: str = "suggest_adresse" + SUGGEST_CODE_POSTAL: str = "suggest_code_postal" + SUGGEST_VILLE: str = "suggest_ville" + SUGGEST_NAF: str = "suggest_naf" + # Suggestions SUGGEST_COHORT: str = f"{SUGGEST_PREFIX}_cohort" SUGGEST_SIRET: str = f"{SUGGEST_PREFIX}_siret" @@ -46,3 +58,4 @@ class COLS: # Matching MATCH_WORDS: str = "match_words" MATCH_SCORE: str = "match_score" + MATCH_THRESHOLD: str = "match_threshold" diff --git a/dags/enrich/config/dbt.py b/dags/enrich/config/dbt.py index 44e3af97e..bf1a6fb83 100644 --- a/dags/enrich/config/dbt.py +++ b/dags/enrich/config/dbt.py @@ -5,6 +5,10 @@ @dataclass(frozen=True) class DBT: + # RGPD + MARTS_ENRICH_RGPD_SUGGESTIONS: str = "marts_enrich_acteurs_rgpd_suggest" + + # Closed MARTS_ENRICH_AE_CLOSED_CANDIDATES: str = "marts_enrich_acteurs_closed_candidates" MARTS_ENRICH_AE_CLOSED_REPLACED_SAME_SIREN: str = ( "marts_enrich_acteurs_closed_suggest_replaced_same_siren" @@ -15,3 +19,4 @@ class DBT: MARTS_ENRICH_AE_CLOSED_NOT_REPLACED: str = ( "marts_enrich_acteurs_closed_suggest_not_replaced" ) + MARTS_ENRICH_AE_RGPD: str = "marts_enrich_ae_rgpd" diff --git a/dags/enrich/config/models.py b/dags/enrich/config/models.py index 3ec5d7eab..23b75907b 100644 --- a/dags/enrich/config/models.py +++ b/dags/enrich/config/models.py @@ -76,12 +76,24 @@ def filters(self) -> list[dict[str, str]]: class EnrichActeursClosedConfig(EnrichBaseConfig): + dbt_models_refresh_command: str = Field( + default="dbt build --select tag:marts,tag:enrich,tag:closed", + description="🔄 Commande DBT à exécuter pour rafraîchir les modèles", + ) filter_contains__etab_naf: Optional[str] = Field( default=None, description="🔍 Filtre sur **NAF AE Etablissement**", ) +class EnrichActeursRGPDConfig(EnrichBaseConfig): + dbt_models_refresh_command: str = Field( + default="dbt build --select tag:marts,tag:enrich,tag:rgpd", + description="🔄 Commande DBT à exécuter pour rafraîchir les modèles", + ) + + DAG_ID_TO_CONFIG_MODEL = { "enrich_acteurs_closed": EnrichActeursClosedConfig, + "enrich_acteurs_rgpd": EnrichActeursRGPDConfig, } diff --git a/dags/enrich/config/tasks.py b/dags/enrich/config/tasks.py index a56767fa6..a3dc8cc45 100644 --- a/dags/enrich/config/tasks.py +++ b/dags/enrich/config/tasks.py @@ -8,6 +8,9 @@ class TASKS: # Config CONFIG_CREATE: str = "enrich_config_create" + # RGPD + ENRICH_RGPD_SUGGESTIONS: str = "enrich_acteurs_rgpd_suggestions" + # Read tasks ENRICH_CLOSED_REPLACED_SAME_SIREN: str = "enrich_acteurs_closed_replaced_same_siren" ENRICH_CLOSED_REPLACED_OTHER_SIREN: str = ( @@ -24,3 +27,4 @@ class TASKS: "enrich_acteurs_closed_suggestions_not_replaced" ) ENRICH_DBT_MODELS_REFRESH: str = "enrich_dbt_models_refresh" + READ_AE_RGPD: str = "enrich_ae_rgpd_read" diff --git a/dags/enrich/config/xcoms.py b/dags/enrich/config/xcoms.py index eb07c88c2..c2b829629 100644 --- a/dags/enrich/config/xcoms.py +++ b/dags/enrich/config/xcoms.py @@ -16,13 +16,14 @@ @dataclass(frozen=True) class XCOMS: CONFIG: str = "config" - DF_READ: str = "df_read" - DF_MATCH: str = "df_match" DF_CLOSED_REPLACED_SAME_SIREN: str = "df_acteurs_closed_replaced_same_siren" DF_CLOSED_REPLACED_OTHER_SIREN: str = "df_acteurs_closed_replaced_other_siren" DF_CLOSED_NOT_REPLACED: str = "df_acteurs_closed_not_replaced" + DF_READ: str = "df_read" + DF_MATCH: str = "df_match" + def xcom_pull(ti: TaskInstance, key: str, skip_if_empty: bool = False) -> Any: """For pulls, we create a helper to constrain keys diff --git a/dags/enrich/dags/enrich_acteurs_rgpd.py b/dags/enrich/dags/enrich_acteurs_rgpd.py new file mode 100644 index 000000000..12d13d8fa --- /dev/null +++ b/dags/enrich/dags/enrich_acteurs_rgpd.py @@ -0,0 +1,52 @@ +"""DAG to anonymize QFDMO acteurs for RGPD""" + +from airflow import DAG +from enrich.config import ( + COHORTS, + DBT, + TASKS, + EnrichActeursRGPDConfig, +) +from enrich.tasks.airflow_logic.enrich_config_create_task import ( + enrich_config_create_task, +) +from enrich.tasks.airflow_logic.enrich_dbt_model_suggest_task import ( + enrich_dbt_model_suggest_task, +) +from enrich.tasks.airflow_logic.enrich_dbt_models_refresh_task import ( + enrich_dbt_models_refresh_task, +) +from shared.config import CATCHUPS, SCHEDULES, START_DATES, config_to_airflow_params + +with DAG( + dag_id="enrich_acteurs_rgpd", + dag_display_name="🕵️ Enrichir - Acteurs RGPD", + default_args={ + "owner": "airflow", + "depends_on_past": False, + "email_on_failure": False, + "email_on_retry": False, + "retries": 0, + }, + description=("Un DAG pour anonymiser les acteurs vs. RGPD"), + tags=["annuaire", "entreprises", "ae", "rgpd", "acteurs", "juridique"], + schedule=SCHEDULES.NONE, + catchup=CATCHUPS.AWLAYS_FALSE, + start_date=START_DATES.YESTERDAY, + params=config_to_airflow_params( + EnrichActeursRGPDConfig( + dbt_models_refresh=True, + filter_equals__acteur_statut="ACTIF", + ) + ), +) as dag: + # Instantiation + config = enrich_config_create_task(dag) + dbt_refresh = enrich_dbt_models_refresh_task(dag) + suggest_rgpd = enrich_dbt_model_suggest_task( + dag, + task_id=TASKS.ENRICH_RGPD_SUGGESTIONS, + cohort=COHORTS.RGPD, + dbt_model_name=DBT.MARTS_ENRICH_RGPD_SUGGESTIONS, + ) + config >> dbt_refresh >> suggest_rgpd # type: ignore diff --git a/dags/enrich/tasks/business_logic/enrich_dbt_model_to_suggestions.py b/dags/enrich/tasks/business_logic/enrich_dbt_model_to_suggestions.py index 3a93a1bc9..533ebc1d0 100644 --- a/dags/enrich/tasks/business_logic/enrich_dbt_model_to_suggestions.py +++ b/dags/enrich/tasks/business_logic/enrich_dbt_model_to_suggestions.py @@ -35,10 +35,36 @@ def changes_prepare( ).model_dump() +def changes_prepare_rgpd( + row: dict, +) -> tuple[list[dict], dict]: + """Prepare suggestions for RGPD cohorts""" + from data.models.changes import ChangeActeurRgpdAnonymize + + changes = [] + model_params = { + "id": row[COLS.ACTEUR_ID], + } + changes.append( + changes_prepare( + model=ChangeActeurRgpdAnonymize, + model_params=model_params, + order=1, + reason="🕵 Anonymisation RGPD", + entity_type="acteur_displayed", + ) + ) + contexte = { + "statut": row[COLS.ACTEUR_STATUT], + "noms d'origine": row[COLS.ACTEUR_NOMS_ORIGINE], + } + return changes, contexte + + def changes_prepare_closed_not_replaced( row: dict, ) -> tuple[list[dict], dict]: - """Prepare suggestion changes for closed not replaced cohorts""" + """Prepare suggestions for closed not replaced cohorts""" from data.models.changes import ChangeActeurUpdateData from qfdmo.models import ActeurStatus @@ -157,6 +183,7 @@ def changes_prepare_closed_replaced( COHORTS.CLOSED_NOT_REPLACED: changes_prepare_closed_not_replaced, COHORTS.CLOSED_REP_OTHER_SIREN: changes_prepare_closed_replaced, COHORTS.CLOSED_REP_SAME_SIREN: changes_prepare_closed_replaced, + COHORTS.RGPD: changes_prepare_rgpd, } @@ -177,6 +204,7 @@ def enrich_dbt_model_to_suggestions( COHORTS.CLOSED_NOT_REPLACED: SuggestionAction.ENRICH_ACTEURS_CLOSED, COHORTS.CLOSED_REP_OTHER_SIREN: SuggestionAction.ENRICH_ACTEURS_CLOSED, COHORTS.CLOSED_REP_SAME_SIREN: SuggestionAction.ENRICH_ACTEURS_CLOSED, + COHORTS.RGPD: SuggestionAction.ENRICH_ACTEURS_RGPD, } # Validation diff --git a/dags_unit_tests/enrich/tasks/test_enrich_suggestions_rgpd.py b/dags_unit_tests/enrich/tasks/test_enrich_suggestions_rgpd.py new file mode 100644 index 000000000..69e9c74d2 --- /dev/null +++ b/dags_unit_tests/enrich/tasks/test_enrich_suggestions_rgpd.py @@ -0,0 +1,81 @@ +import pandas as pd +import pytest +from enrich.config import COHORTS, COLS +from enrich.tasks.business_logic.enrich_dbt_model_to_suggestions import ( + enrich_dbt_model_to_suggestions, +) + +from data.models.changes.acteur_rgpd_anonymize import ACTEUR_FIELDS_TO_ANONYMIZE + + +@pytest.mark.django_db +class TestEnrichSuggestionsRgpd: + + @pytest.fixture + def df_rgpd(self): + return pd.DataFrame( + { + COLS.SUGGEST_COHORT: [COHORTS.RGPD] * 2, + COLS.ACTEUR_ID: ["rgpd1", "rgpd2"], + COLS.ACTEUR_STATUT: ["ACTIF"] * 2, + COLS.ACTEUR_NOMS_ORIGINE: ["nom rgpd1", "nom rgpd2"], + COLS.ACTEUR_NOM: ["nom rgpd1", "nom rgpd2"], + } + ) + + @pytest.fixture + def acteurs(self, df_rgpd): + # The point of the RGPD pipeline is to anonymize the data + # everywhere in our DB, thus it won't follow the usual pattern + # of creating/updating a revision only, it overwrites acteurs + # wherever they are found WITHOUT creating revivisions + from unit_tests.qfdmo.acteur_factory import ActeurFactory, RevisionActeurFactory + + # rgpd1 only in base + ActeurFactory(identifiant_unique="rgpd1", nom="nom rgpd1") + # rgpd2 in both base and revision + ActeurFactory(identifiant_unique="rgpd2", nom="nom rgpd2") + RevisionActeurFactory(pk="rgpd2", nom="nom rgpd2") + + @pytest.fixture + def suggestions_applied(self, acteurs, df_rgpd): + """Generating and applying suggestions""" + from data.models.suggestion import Suggestion, SuggestionCohorte + + enrich_dbt_model_to_suggestions( + df=df_rgpd, + cohort=COHORTS.RGPD, + identifiant_action="test_rgpd", + dry_run=False, + ) + cohort = SuggestionCohorte.objects.get(identifiant_action="test_rgpd") + suggestions = Suggestion.objects.filter(suggestion_cohorte=cohort) + assert len(suggestions) == 2 + # Apply suggestions + for suggestion in suggestions: + suggestion.apply() + + def test_changed_only_in_base(self, suggestions_applied): + # Test that the acteur in base is changed WITHOUT creating a revision + from qfdmo.models import Acteur + + rgpd1 = Acteur.objects.get(pk="rgpd1") + assert all( + getattr(rgpd1, field) == value + for field, value in ACTEUR_FIELDS_TO_ANONYMIZE.items() + ) + + def test_changed_in_both(self, suggestions_applied): + # Test that the acteur in both base and revision is changed + from qfdmo.models import Acteur, RevisionActeur + + rgpd2 = Acteur.objects.get(pk="rgpd2") + assert all( + getattr(rgpd2, field) == value + for field, value in ACTEUR_FIELDS_TO_ANONYMIZE.items() + ) + rgpd2_rev = RevisionActeur.objects.get(pk="rgpd2") + assert all( + getattr(rgpd2_rev, field) == value + for field, value in ACTEUR_FIELDS_TO_ANONYMIZE.items() + ) diff --git a/data/migrations/0012_alter_suggestioncohorte_type_action.py b/data/migrations/0012_alter_suggestioncohorte_type_action.py new file mode 100644 index 000000000..01048ae19 --- /dev/null +++ b/data/migrations/0012_alter_suggestioncohorte_type_action.py @@ -0,0 +1,33 @@ +# Generated by Django 5.1.6 on 2025-04-24 12:12 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("data", "0011_alter_suggestioncohorte_type_action"), + ] + + operations = [ + migrations.AlterField( + model_name="suggestioncohorte", + name="type_action", + field=models.CharField( + blank=True, + choices=[ + ("CRAWL_URLS", "🔗 URLs scannées"), + ("ENRICH_ACTEURS_CLOSED", "🚪 Acteurs fermés"), + ("ENRICH_ACTEURS_RGPD", "🕵 Anonymisation RGPD"), + ("CLUSTERING", "regroupement/déduplication des acteurs"), + ("SOURCE_AJOUT", "ingestion de source de données - nouveau acteur"), + ( + "SOURCE_MODIFICATION", + "ingestion de source de données - modification d'acteur existant", + ), + ("SOURCE_SUPRESSION", "ingestion de source de données"), + ], + max_length=50, + ), + ), + ] diff --git a/data/models/changes/__init__.py b/data/models/changes/__init__.py index e4fdc890c..ab172c0c0 100644 --- a/data/models/changes/__init__.py +++ b/data/models/changes/__init__.py @@ -3,12 +3,14 @@ from .acteur_create_as_parent import ChangeActeurCreateAsParent from .acteur_delete_as_parent import ChangeActeurDeleteAsParent from .acteur_keep_as_parent import ChangeActeurKeepAsParent +from .acteur_rgpd_anonymize import ChangeActeurRgpdAnonymize from .acteur_update_data import ChangeActeurUpdateData from .acteur_update_parent_id import ChangeActeurUpdateParentId from .acteur_verify_in_revision import ChangeActeurVerifyRevision from .sample_model_do_nothing import SampleModelDoNothing CHANGE_MODELS = { + ChangeActeurRgpdAnonymize.name(): ChangeActeurRgpdAnonymize, ChangeActeurUpdateData.name(): ChangeActeurUpdateData, ChangeActeurCreateAsChild.name(): ChangeActeurCreateAsChild, ChangeActeurCreateAsParent.name(): ChangeActeurCreateAsParent, diff --git a/data/models/changes/acteur_rgpd_anonymize.py b/data/models/changes/acteur_rgpd_anonymize.py new file mode 100644 index 000000000..00d29f163 --- /dev/null +++ b/data/models/changes/acteur_rgpd_anonymize.py @@ -0,0 +1,62 @@ +"""Special change model dedicated to RGPD because: + +- NORMALLY we version data through RevisionActeur + + consequence: we create a Revision if it doesn't exist + +- HOWEVER WITH RGPD we don't do data versioning, we overwrite + the data so it disappears from our DB + = consequence: we don't create a Revision if it doesn't exist + (again we are not versioning, just overwriting) + +Since the approach to RGPD should be consistent, we don't +expect the model to take any other input than the ID of the acteur +we are changing, and the model takes care of the rest +""" + +from datetime import datetime, timezone + +from data.models.changes.acteur_abstract import ChangeActeurAbstract +from qfdmo.models import Acteur, ActeurStatus, RevisionActeur + +VALUE_ANONYMIZED = "ANONYMISE POUR RAISON RGPD" +ACTEUR_FIELDS_TO_ANONYMIZE = { + "nom": VALUE_ANONYMIZED, + "nom_officiel": VALUE_ANONYMIZED, + "nom_commercial": VALUE_ANONYMIZED, + "email": "", # Consequence of forcing empty strings in DB + "telephone": VALUE_ANONYMIZED, + "adresse": VALUE_ANONYMIZED, + "adresse_complement": VALUE_ANONYMIZED, + "statut": ActeurStatus.INACTIF, +} + + +class ChangeActeurRgpdAnonymize(ChangeActeurAbstract): + @classmethod + def name(cls) -> str: + return "acteur_rgpd_anonymize" + + def validate(self) -> list[Acteur | RevisionActeur]: + if self.data: + raise ValueError("Pour RGPD ne pas fournir de data, le modèle efface") + # The parent should already exist in revision or base + # and we return all its instances to overwrite them all + instances = [] + rev = RevisionActeur.objects.filter(pk=self.id).first() + if rev: + instances.append(rev) + instances.append(Acteur.objects.get(pk=self.id)) + return instances + + def apply(self): + # For each instance found + instances = self.validate() + for instance in instances: + # We anonymize the fields + for key, value in ACTEUR_FIELDS_TO_ANONYMIZE.items(): + setattr(instance, key, value) + + # Special case for comments + now = datetime.now(timezone.utc).strftime("le %Y-%m-%d à %H:%M:%S UTC") + instance.commentaires_ajouter(f"{VALUE_ANONYMIZED} {now}") + instance.save() diff --git a/data/models/suggestion.py b/data/models/suggestion.py index 383a5eee0..b71955c73 100644 --- a/data/models/suggestion.py +++ b/data/models/suggestion.py @@ -55,6 +55,7 @@ class SuggestionCohorteStatut(models.TextChoices): class SuggestionAction(models.TextChoices): CRAWL_URLS = SUGGESTION_CRAWL_URLS, "🔗 URLs scannées" ENRICH_ACTEURS_CLOSED = "ENRICH_ACTEURS_CLOSED", "🚪 Acteurs fermés" + ENRICH_ACTEURS_RGPD = "ENRICH_ACTEURS_RGPD", "🕵 Anonymisation RGPD" CLUSTERING = SUGGESTION_CLUSTERING, "regroupement/déduplication des acteurs" SOURCE_AJOUT = ( SUGGESTION_SOURCE_AJOUT, @@ -81,6 +82,8 @@ class Meta: verbose_name="Identifiant de l'execution", help_text="(ex : run_id pour Airflow)", ) + # TODO: once all suggestions migrated to pydantic, we should be able to remove this + # field as all changes will be done generically through changes. apply() method type_action = models.CharField( choices=SuggestionAction.choices, max_length=50, @@ -177,6 +180,7 @@ def display_contexte_details(self): if self.suggestion_cohorte.type_action in [ SuggestionAction.CLUSTERING, SuggestionAction.CRAWL_URLS, + SuggestionAction.ENRICH_ACTEURS_RGPD, ]: context["details_open"] = True @@ -197,6 +201,7 @@ def display_suggestion_details(self): template_name = "data/_partials/crawl_urls_suggestion_details.html" elif self.suggestion_cohorte.type_action in [ SuggestionAction.ENRICH_ACTEURS_CLOSED, + SuggestionAction.ENRICH_ACTEURS_RGPD, ]: template_name = "data/_partials/suggestion_details_changes.html" template_context = self.suggestion @@ -234,6 +239,8 @@ def display_suggestion_details(self): and isinstance(self.suggestion, dict) ): template_name = "data/_partials/ajout_suggestion_details.html" + elif self.suggestion_cohorte.type_action == SuggestionAction.CRAWL_URLS: + template_name = "data/_partials/crawl_urls_suggestion_details.html" return render_to_string(template_name, template_context) @@ -320,6 +327,7 @@ def apply(self): SuggestionAction.CLUSTERING, SuggestionAction.CRAWL_URLS, SuggestionAction.ENRICH_ACTEURS_CLOSED, + SuggestionAction.ENRICH_ACTEURS_RGPD, ]: changes = self.suggestion["changes"] changes.sort(key=lambda x: x["order"]) diff --git a/dbt/models/intermediate/acteurs/int_acteur.sql b/dbt/models/intermediate/acteurs/int_acteur.sql index 45dac5061..4fc0c8508 100644 --- a/dbt/models/intermediate/acteurs/int_acteur.sql +++ b/dbt/models/intermediate/acteurs/int_acteur.sql @@ -35,4 +35,4 @@ SELECT ra.identifiant_unique IS NOT NULL AS revision_existe FROM {{ ref('base_acteur') }} AS a FULL JOIN {{ ref('base_revisionacteur') }} AS ra - ON a.identifiant_unique = ra.identifiant_unique + ON a.identifiant_unique = ra.identifiant_unique \ No newline at end of file diff --git a/dbt/models/intermediate/ae_annuaire_entreprises/int_ae_etablissement.sql b/dbt/models/intermediate/ae_annuaire_entreprises/int_ae_etablissement.sql index b313e9eac..df63e0467 100644 --- a/dbt/models/intermediate/ae_annuaire_entreprises/int_ae_etablissement.sql +++ b/dbt/models/intermediate/ae_annuaire_entreprises/int_ae_etablissement.sql @@ -67,4 +67,4 @@ ON unite.siren = LEFT(etab.siret,9) /* Here we keep unavailable names as int_ models aren't responsible for business logic. Keeping allows investigating AND nom != {{ value_unavailable() }} -*/ \ No newline at end of file +*/ diff --git a/dbt/models/marts/enrich/marts_enrich_acteurs_closed_replaced.sql b/dbt/models/marts/enrich/marts_enrich_acteurs_closed_replaced.sql index 94c964759..4ce20c003 100644 --- a/dbt/models/marts/enrich/marts_enrich_acteurs_closed_replaced.sql +++ b/dbt/models/marts/enrich/marts_enrich_acteurs_closed_replaced.sql @@ -67,4 +67,4 @@ WITH potential_replacements AS ( SELECT * FROM potential_replacements WHERE replacement_priority=1 /* We don't want to propose suggests with unavailable names */ -AND suggest_nom != {{ value_unavailable() }} \ No newline at end of file +AND suggest_nom != {{ value_unavailable() }} diff --git a/dbt/models/marts/enrich/marts_enrich_acteurs_rgpd_suggest.sql b/dbt/models/marts/enrich/marts_enrich_acteurs_rgpd_suggest.sql new file mode 100644 index 000000000..b2ed9819b --- /dev/null +++ b/dbt/models/marts/enrich/marts_enrich_acteurs_rgpd_suggest.sql @@ -0,0 +1,87 @@ +/* +Model to find entries from AE's unite legal which directors names +around found inside our acteurs names. + +Notes: + - 🧹 Pre-matching/filtering at SQL level to reduce data size (13M rows) +*/ +{{ + config( + materialized = 'view', + tags=['marts', 'enrich', 'ae', 'annuaire_entreprises', 'unite_legale', 'rgpd'], + ) +}} + +WITH acteurs_with_siren AS ( + SELECT + -- Extract SIREN from SIRET as we have SIREN issues in our DB + LEFT(siret,9) AS siren, + identifiant_unique AS id, + TRIM(REGEXP_REPLACE( + CONCAT(nom || ', ' || nom_officiel || ', ' || nom_commercial), + ', , ', + '') + ) AS noms_origine, + udf_normalize_string_for_match(CONCAT(nom || ' ' || nom_officiel || ' ' || nom_commercial)) AS noms_normalises, + commentaires, + statut + FROM {{ ref('marts_carte_acteur') }} + /* + We have normalization issues with our SIREN field in our DB + and we obtain better matching by reconstructing SIREN via SIRET + */ + WHERE siret IS NOT NULL AND siret != '' AND LENGTH(siret) = 14 +), unite_matching_acteurs_on_siren AS ( + SELECT + acteurs.id AS acteur_id, + acteurs.siren AS acteur_siren, + acteurs.noms_origine AS acteur_noms_origine, + acteurs.noms_normalises AS acteur_noms_normalises, + acteurs.commentaires AS acteur_commentaires, + acteurs.statut AS acteur_statut, + -- Unite legale fields + /* + We don't care which one is which, we aggregate to + reduce data size and we will perform a more precise + post-match in Python + */ + udf_normalize_string_for_match(dirigeant_nom) AS unite_dirigeant_nom_normalise, + udf_normalize_string_for_match(dirigeant_nom_usage) AS unite_dirigeant_nom_usage_normalise, + udf_normalize_string_for_match(dirigeant_pseudonyme) AS unite_dirigeant_pseudonyme_normalise, + udf_normalize_string_for_match(dirigeant_prenom1) AS unite_dirigeant_prenom1_normalise, + udf_normalize_string_for_match(dirigeant_prenom2) AS unite_dirigeant_prenom2_normalise, + udf_normalize_string_for_match(dirigeant_prenom3) AS unite_dirigeant_prenom3_normalise, + udf_normalize_string_for_match(dirigeant_prenom4) AS unite_dirigeant_prenom4_normalise, + udf_columns_concat_unique_non_empty( + dirigeant_nom, + dirigeant_nom_usage, + dirigeant_pseudonyme, + dirigeant_prenom1, + dirigeant_prenom2, + dirigeant_prenom3, + dirigeant_prenom4 + ) AS unite_dirigeants_noms_prenoms + FROM {{ ref('int_ae_unite_legale') }} AS unite + LEFT JOIN acteurs_with_siren AS acteurs ON acteurs.siren = unite.siren + WHERE + unite.est_actif IS FALSE -- we only anonymize inactive acteurs + AND a_dirigeant_noms_ou_prenoms_non_null -- unite with any directors names available + AND acteurs.siren IS NOT NULL +), suggestions_with_minimum_matching_words AS ( + SELECT + * + FROM unite_matching_acteurs_on_siren + WHERE ( -- Any of the directors names appear in the acteur names + position(unite_dirigeant_nom_normalise IN acteur_noms_normalises) > 0 + OR position(unite_dirigeant_nom_usage_normalise IN acteur_noms_normalises) > 0 + OR position(unite_dirigeant_pseudonyme_normalise IN acteur_noms_normalises) > 0 + OR position(unite_dirigeant_prenom1_normalise IN acteur_noms_normalises) > 0 + OR position(unite_dirigeant_prenom2_normalise IN acteur_noms_normalises) > 0 + OR position(unite_dirigeant_prenom3_normalise IN acteur_noms_normalises) > 0 + OR position(unite_dirigeant_prenom4_normalise IN acteur_noms_normalises) > 0 + ) +) +SELECT + 'Anonymisation RGPD' AS suggest_cohort, + * +FROM suggestions_with_minimum_matching_words \ No newline at end of file diff --git a/unit_tests/data/models/changes/test_acteur_rgpd_anonymize.py b/unit_tests/data/models/changes/test_acteur_rgpd_anonymize.py new file mode 100644 index 000000000..a1e8a2b94 --- /dev/null +++ b/unit_tests/data/models/changes/test_acteur_rgpd_anonymize.py @@ -0,0 +1,116 @@ +import json +import re + +import pytest +from django.contrib.gis.geos import Point + +from data.models.changes.acteur_rgpd_anonymize import ( + ChangeActeurRgpdAnonymize, +) +from qfdmo.models.acteur import Acteur, ActeurStatus, ActeurType, RevisionActeur + +TEST_DATA = { + "location": Point(1, 2), + "nom": "🟠 not anonymized", + "nom_officiel": "🟠 not anonymized", + "nom_commercial": "🟠 not anonymized", + "description": "🟠 not anonymized", + "email": "me@myself.com", + "telephone": "🟠 not anonymized", + "adresse": "🟠 not anonymized", + "adresse_complement": "🟠 not anonymized", + "statut": ActeurStatus.ACTIF, + "commentaires": " ", +} + +# Intentionally replicating & hardcoding the expected +# changes to prevent accidental modification to model +# without updating the tests +CHANGE_ANON = "ANONYMISE POUR RAISON RGPD" +CHANGES_EXPECTED = { + "nom": CHANGE_ANON, + "nom_officiel": CHANGE_ANON, + "nom_commercial": CHANGE_ANON, + "email": "", # Consequence of allowing empty strings in DB + "telephone": CHANGE_ANON, + "adresse": CHANGE_ANON, + "adresse_complement": CHANGE_ANON, + "statut": ActeurStatus.INACTIF, +} +COMMENT_PATTERN = CHANGE_ANON + r" le \d{4}-\d{2}-\d{2} à \d{2}:\d{2}:\d{2} UTC" + + +@pytest.mark.django_db +class TestChangeActeurRgpdAnonymize: + def test_name(self): + assert ChangeActeurRgpdAnonymize.name() == "acteur_rgpd_anonymize" + + def test_raise_if_data_provided(self): + change = ChangeActeurRgpdAnonymize(id="dummy", data={"nom": "dummy"}) + with pytest.raises(ValueError, match="Pour RGPD ne pas fournir de data"): + change.apply() + + def test_raise_if_acteur_does_not_exist(self): + change = ChangeActeurRgpdAnonymize(id="dummy") + with pytest.raises(Acteur.DoesNotExist): + change.apply() + + def test_working_only_in_base(self): + # We start by creating acteur only in base + at1 = ActeurType.objects.create(code="at1") + id1 = "id1" + data = TEST_DATA.copy() + data["acteur_type"] = at1 + data["identifiant_unique"] = id1 + Acteur.objects.create(**data) + + # We check that acteur isn't in revision yet + assert RevisionActeur.objects.filter(pk=id1).count() == 0 + + # Since RGPD changes are to owerwrite consistently, we don't + # pass any data to the model, only the ID of the acteur + # and the model takes care of the rest + ChangeActeurRgpdAnonymize(id=id1).apply() + + # We check that no revision was created because we overwrite + # hence don't want Revisions meants for versioning + assert not RevisionActeur.objects.filter(pk=id1).exists() + + # We check that acteur in base was anonymized + base = Acteur.objects.get(pk=id1) + for key, value in CHANGES_EXPECTED.items(): + assert getattr(base, key) == value + + # Comments + comments = json.loads(base.commentaires) + assert re.match(COMMENT_PATTERN, comments[0]["message"]) + + # We check that other fields were not modified + assert base.description == "🟠 not anonymized" + + def test_working_both_base_and_revision(self): + # We start by creating acteur BOTH in base and revision + at1 = ActeurType.objects.create(code="at1") + id2 = "id2" + data = TEST_DATA.copy() + data["acteur_type"] = at1 + data["identifiant_unique"] = id2 + Acteur.objects.create(**data) + RevisionActeur.objects.create(**data) + + # Same remark as previous test on not having to pass data + ChangeActeurRgpdAnonymize(id=id2).apply() + + # In this case we check that all instances were anonymized + instances = [ + Acteur.objects.get(pk=id2), + RevisionActeur.objects.get(pk=id2), + ] + for instance in instances: + for key, value in CHANGES_EXPECTED.items(): + assert getattr(instance, key) == value + assert instance.description == "🟠 not anonymized" + + # Comments + comments = json.loads(instance.commentaires) + assert re.match(COMMENT_PATTERN, comments[0]["message"])