Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
29bdd0f
RGPD: anonymiser noms acteurs: init commit
maxcorbeau Mar 12, 2025
1bb1875
ajout template django data
maxcorbeau Mar 12, 2025
140e348
migration Django data
maxcorbeau Mar 12, 2025
fc411fe
refacto dags/rgpd -> dags/enrich, utilisation dbt
maxcorbeau Mar 19, 2025
b7df67b
ajout nouvelle config & test rgpd
maxcorbeau Mar 19, 2025
3952b35
modèle RGPD, tests & migration
maxcorbeau Mar 20, 2025
1579dc3
v1 qui fonctionne
maxcorbeau Mar 24, 2025
43ce388
début refacto pour factoriser RGPD + fermetures
maxcorbeau Mar 31, 2025
cd401c2
début refacto et progrès vers décision métier
maxcorbeau Apr 3, 2025
9f23e3e
suggestions: début de création
maxcorbeau Apr 7, 2025
2c8364f
suggestions: presque fonctionnel
maxcorbeau Apr 7, 2025
670b598
suggestions: tests qui fonctionnent
maxcorbeau Apr 7, 2025
d4cc1bb
refacto du DAG avec dbt & suggestions
maxcorbeau Apr 9, 2025
4c71873
dbt: nettoyage & sampling
maxcorbeau Apr 9, 2025
4683e55
DAG & Admin UI fonctionnels
maxcorbeau Apr 10, 2025
4cc71ff
create_as_child, numéro rue, UI
maxcorbeau Apr 10, 2025
ad80ac5
refacto modèles & tests
maxcorbeau Apr 14, 2025
f5214d0
ajout tolérance échec
maxcorbeau Apr 14, 2025
01a3223
udfs: norma exclusion petits mots, cleanup
maxcorbeau Apr 14, 2025
7d65881
suppression code RGPD
maxcorbeau Apr 16, 2025
c8f7a97
cont. suppression RGPD & move data_reconstruct
maxcorbeau Apr 16, 2025
840982d
recréer migration django + fix imports cassés via rebase
maxcorbeau Apr 16, 2025
9f2fed9
fix imports + data_serialize en doublon
maxcorbeau Apr 16, 2025
0b5abcc
cont. del rgpd, fix acteurs model & tests
maxcorbeau Apr 16, 2025
f35c793
suppression des prints
maxcorbeau Apr 16, 2025
ddf300d
drop changes in restore script
maxcorbeau Apr 16, 2025
985f7a1
cont. fix script import
maxcorbeau Apr 16, 2025
c8b60dc
renommage replace -> suggest
maxcorbeau Apr 21, 2025
dd0d6e9
cohorte: simplification, label uniquement
maxcorbeau Apr 21, 2025
9da7b0d
fix migration after rebase
maxcorbeau Apr 21, 2025
84e5698
gestion contexte + del marts villes
maxcorbeau Apr 21, 2025
441c679
gestion contexte
maxcorbeau Apr 21, 2025
dfca1df
Anonymiser acteurs RGPD
maxcorbeau Apr 16, 2025
d82a38f
intégration dbt & django admin
maxcorbeau Apr 16, 2025
d62223e
tests de suggestions
maxcorbeau Apr 21, 2025
7a390f4
supprimer vieille migration
maxcorbeau Apr 24, 2025
6a999aa
remettre dbt iso prod after rebase
maxcorbeau Apr 24, 2025
30229e5
misc fix modèles dbt
maxcorbeau Apr 24, 2025
257d3ef
migration django
maxcorbeau Apr 24, 2025
4395ea7
del fichier bidon passé inapercu
maxcorbeau Apr 24, 2025
31f2b0e
nettoyage après revue
maxcorbeau Apr 24, 2025
bbed4ed
del doublon import data_reconstruct
maxcorbeau Apr 24, 2025
1eaf544
del code mort
maxcorbeau Apr 24, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion dags/enrich/config/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
from .cohorts import COHORTS # noqa: F401
from .columns import COLS, SUGGEST_PREFIX # noqa: F401
from .dbt import DBT # noqa: F401
from .models import DAG_ID_TO_CONFIG_MODEL, EnrichActeursClosedConfig # noqa: F401
from .models import ( # noqa: F401
DAG_ID_TO_CONFIG_MODEL,
EnrichActeursClosedConfig,
EnrichActeursRGPDConfig,
)
from .tasks import TASKS # noqa: F401
from .xcoms import XCOMS, xcom_pull # noqa: F401
1 change: 1 addition & 0 deletions dags/enrich/config/cohorts.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ class COHORTS:
"🚪 Acteurs Fermés: 🟡 remplacés par SIRET d'un autre SIREN"
)
CLOSED_REP_SAME_SIREN = "🚪 Acteurs Fermés: 🟢 remplacés par SIRET du même SIREN"
RGPD = "Anonymisation RGPD"
15 changes: 14 additions & 1 deletion dags/enrich/config/columns.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,22 @@ class COLS:
ACTEUR_NAF: str = "acteur_naf"
ACTEUR_LONGITUDE: str = "acteur_longitude"
ACTEUR_LATITUDE: str = "acteur_latitude"

ACTEUR_STATUT: str = "acteur_statut"
# Annuaire Entreprise
AE_DIRIGEANTS_NOMS: str = "ae_dirigeants_noms_prenoms"

# Suggestions
SUGGEST_COHORT_CODE: str = "suggestion_cohorte_code"
SUGGEST_COHORT: str = "suggest_cohort"

# Replacements
SUGGEST_SIRET: str = "suggest_siret"
SUGGEST_NOM: str = "suggest_nom"
SUGGEST_ADRESSE: str = "suggest_adresse"
SUGGEST_CODE_POSTAL: str = "suggest_code_postal"
SUGGEST_VILLE: str = "suggest_ville"
SUGGEST_NAF: str = "suggest_naf"

# Suggestions
SUGGEST_COHORT: str = f"{SUGGEST_PREFIX}_cohort"
SUGGEST_SIRET: str = f"{SUGGEST_PREFIX}_siret"
Expand All @@ -46,3 +58,4 @@ class COLS:
# Matching
MATCH_WORDS: str = "match_words"
MATCH_SCORE: str = "match_score"
MATCH_THRESHOLD: str = "match_threshold"
5 changes: 5 additions & 0 deletions dags/enrich/config/dbt.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@

@dataclass(frozen=True)
class DBT:
# RGPD
MARTS_ENRICH_RGPD_SUGGESTIONS: str = "marts_enrich_acteurs_rgpd_suggest"

# Closed
MARTS_ENRICH_AE_CLOSED_CANDIDATES: str = "marts_enrich_acteurs_closed_candidates"
MARTS_ENRICH_AE_CLOSED_REPLACED_SAME_SIREN: str = (
"marts_enrich_acteurs_closed_suggest_replaced_same_siren"
Expand All @@ -15,3 +19,4 @@ class DBT:
MARTS_ENRICH_AE_CLOSED_NOT_REPLACED: str = (
"marts_enrich_acteurs_closed_suggest_not_replaced"
)
MARTS_ENRICH_AE_RGPD: str = "marts_enrich_ae_rgpd"
12 changes: 12 additions & 0 deletions dags/enrich/config/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,24 @@ def filters(self) -> list[dict[str, str]]:


class EnrichActeursClosedConfig(EnrichBaseConfig):
dbt_models_refresh_command: str = Field(
default="dbt build --select tag:marts,tag:enrich,tag:closed",
description="🔄 Commande DBT à exécuter pour rafraîchir les modèles",
)
filter_contains__etab_naf: Optional[str] = Field(
default=None,
description="🔍 Filtre sur **NAF AE Etablissement**",
)


class EnrichActeursRGPDConfig(EnrichBaseConfig):
dbt_models_refresh_command: str = Field(
default="dbt build --select tag:marts,tag:enrich,tag:rgpd",
description="🔄 Commande DBT à exécuter pour rafraîchir les modèles",
)


DAG_ID_TO_CONFIG_MODEL = {
"enrich_acteurs_closed": EnrichActeursClosedConfig,
"enrich_acteurs_rgpd": EnrichActeursRGPDConfig,
}
4 changes: 4 additions & 0 deletions dags/enrich/config/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ class TASKS:
# Config
CONFIG_CREATE: str = "enrich_config_create"

# RGPD
ENRICH_RGPD_SUGGESTIONS: str = "enrich_acteurs_rgpd_suggestions"

# Read tasks
ENRICH_CLOSED_REPLACED_SAME_SIREN: str = "enrich_acteurs_closed_replaced_same_siren"
ENRICH_CLOSED_REPLACED_OTHER_SIREN: str = (
Expand All @@ -24,3 +27,4 @@ class TASKS:
"enrich_acteurs_closed_suggestions_not_replaced"
)
ENRICH_DBT_MODELS_REFRESH: str = "enrich_dbt_models_refresh"
READ_AE_RGPD: str = "enrich_ae_rgpd_read"
5 changes: 3 additions & 2 deletions dags/enrich/config/xcoms.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,14 @@
@dataclass(frozen=True)
class XCOMS:
CONFIG: str = "config"
DF_READ: str = "df_read"
DF_MATCH: str = "df_match"

DF_CLOSED_REPLACED_SAME_SIREN: str = "df_acteurs_closed_replaced_same_siren"
DF_CLOSED_REPLACED_OTHER_SIREN: str = "df_acteurs_closed_replaced_other_siren"
DF_CLOSED_NOT_REPLACED: str = "df_acteurs_closed_not_replaced"

DF_READ: str = "df_read"
DF_MATCH: str = "df_match"


def xcom_pull(ti: TaskInstance, key: str, skip_if_empty: bool = False) -> Any:
"""For pulls, we create a helper to constrain keys
Expand Down
52 changes: 52 additions & 0 deletions dags/enrich/dags/enrich_acteurs_rgpd.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
"""DAG to anonymize QFDMO acteurs for RGPD"""

from airflow import DAG
from enrich.config import (
COHORTS,
DBT,
TASKS,
EnrichActeursRGPDConfig,
)
from enrich.tasks.airflow_logic.enrich_config_create_task import (
enrich_config_create_task,
)
from enrich.tasks.airflow_logic.enrich_dbt_model_suggest_task import (
enrich_dbt_model_suggest_task,
)
from enrich.tasks.airflow_logic.enrich_dbt_models_refresh_task import (
enrich_dbt_models_refresh_task,
)
from shared.config import CATCHUPS, SCHEDULES, START_DATES, config_to_airflow_params

with DAG(
dag_id="enrich_acteurs_rgpd",
dag_display_name="🕵️ Enrichir - Acteurs RGPD",
default_args={
"owner": "airflow",
"depends_on_past": False,
"email_on_failure": False,
"email_on_retry": False,
"retries": 0,
},
description=("Un DAG pour anonymiser les acteurs vs. RGPD"),
tags=["annuaire", "entreprises", "ae", "rgpd", "acteurs", "juridique"],
schedule=SCHEDULES.NONE,
catchup=CATCHUPS.AWLAYS_FALSE,
start_date=START_DATES.YESTERDAY,
params=config_to_airflow_params(
EnrichActeursRGPDConfig(
dbt_models_refresh=True,
filter_equals__acteur_statut="ACTIF",
)
),
) as dag:
# Instantiation
config = enrich_config_create_task(dag)
dbt_refresh = enrich_dbt_models_refresh_task(dag)
suggest_rgpd = enrich_dbt_model_suggest_task(
dag,
task_id=TASKS.ENRICH_RGPD_SUGGESTIONS,
cohort=COHORTS.RGPD,
dbt_model_name=DBT.MARTS_ENRICH_RGPD_SUGGESTIONS,
)
config >> dbt_refresh >> suggest_rgpd # type: ignore
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,36 @@ def changes_prepare(
).model_dump()


def changes_prepare_rgpd(
row: dict,
) -> tuple[list[dict], dict]:
"""Prepare suggestions for RGPD cohorts"""
from data.models.changes import ChangeActeurRgpdAnonymize

changes = []
model_params = {
"id": row[COLS.ACTEUR_ID],
}
changes.append(
changes_prepare(
model=ChangeActeurRgpdAnonymize,
model_params=model_params,
order=1,
reason="🕵 Anonymisation RGPD",
entity_type="acteur_displayed",
)
)
contexte = {
"statut": row[COLS.ACTEUR_STATUT],
"noms d'origine": row[COLS.ACTEUR_NOMS_ORIGINE],
}
return changes, contexte


def changes_prepare_closed_not_replaced(
row: dict,
) -> tuple[list[dict], dict]:
"""Prepare suggestion changes for closed not replaced cohorts"""
"""Prepare suggestions for closed not replaced cohorts"""
from data.models.changes import ChangeActeurUpdateData
from qfdmo.models import ActeurStatus

Expand Down Expand Up @@ -157,6 +183,7 @@ def changes_prepare_closed_replaced(
COHORTS.CLOSED_NOT_REPLACED: changes_prepare_closed_not_replaced,
COHORTS.CLOSED_REP_OTHER_SIREN: changes_prepare_closed_replaced,
COHORTS.CLOSED_REP_SAME_SIREN: changes_prepare_closed_replaced,
COHORTS.RGPD: changes_prepare_rgpd,
}


Expand All @@ -177,6 +204,7 @@ def enrich_dbt_model_to_suggestions(
COHORTS.CLOSED_NOT_REPLACED: SuggestionAction.ENRICH_ACTEURS_CLOSED,
COHORTS.CLOSED_REP_OTHER_SIREN: SuggestionAction.ENRICH_ACTEURS_CLOSED,
COHORTS.CLOSED_REP_SAME_SIREN: SuggestionAction.ENRICH_ACTEURS_CLOSED,
COHORTS.RGPD: SuggestionAction.ENRICH_ACTEURS_RGPD,
}

# Validation
Expand Down
81 changes: 81 additions & 0 deletions dags_unit_tests/enrich/tasks/test_enrich_suggestions_rgpd.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
import pandas as pd
import pytest
from enrich.config import COHORTS, COLS
from enrich.tasks.business_logic.enrich_dbt_model_to_suggestions import (
enrich_dbt_model_to_suggestions,
)

from data.models.changes.acteur_rgpd_anonymize import ACTEUR_FIELDS_TO_ANONYMIZE


@pytest.mark.django_db
class TestEnrichSuggestionsRgpd:

@pytest.fixture
def df_rgpd(self):
return pd.DataFrame(
{
COLS.SUGGEST_COHORT: [COHORTS.RGPD] * 2,
COLS.ACTEUR_ID: ["rgpd1", "rgpd2"],
COLS.ACTEUR_STATUT: ["ACTIF"] * 2,
COLS.ACTEUR_NOMS_ORIGINE: ["nom rgpd1", "nom rgpd2"],
COLS.ACTEUR_NOM: ["nom rgpd1", "nom rgpd2"],
}
)

@pytest.fixture
def acteurs(self, df_rgpd):
# The point of the RGPD pipeline is to anonymize the data
# everywhere in our DB, thus it won't follow the usual pattern
# of creating/updating a revision only, it overwrites acteurs
# wherever they are found WITHOUT creating revivisions
from unit_tests.qfdmo.acteur_factory import ActeurFactory, RevisionActeurFactory

# rgpd1 only in base
ActeurFactory(identifiant_unique="rgpd1", nom="nom rgpd1")
# rgpd2 in both base and revision
ActeurFactory(identifiant_unique="rgpd2", nom="nom rgpd2")
RevisionActeurFactory(pk="rgpd2", nom="nom rgpd2")

@pytest.fixture
def suggestions_applied(self, acteurs, df_rgpd):
"""Generating and applying suggestions"""
from data.models.suggestion import Suggestion, SuggestionCohorte

enrich_dbt_model_to_suggestions(
df=df_rgpd,
cohort=COHORTS.RGPD,
identifiant_action="test_rgpd",
dry_run=False,
)
cohort = SuggestionCohorte.objects.get(identifiant_action="test_rgpd")
suggestions = Suggestion.objects.filter(suggestion_cohorte=cohort)
assert len(suggestions) == 2
# Apply suggestions
for suggestion in suggestions:
suggestion.apply()

def test_changed_only_in_base(self, suggestions_applied):
# Test that the acteur in base is changed WITHOUT creating a revision
from qfdmo.models import Acteur

rgpd1 = Acteur.objects.get(pk="rgpd1")
assert all(
getattr(rgpd1, field) == value
for field, value in ACTEUR_FIELDS_TO_ANONYMIZE.items()
)

def test_changed_in_both(self, suggestions_applied):
# Test that the acteur in both base and revision is changed
from qfdmo.models import Acteur, RevisionActeur

rgpd2 = Acteur.objects.get(pk="rgpd2")
assert all(
getattr(rgpd2, field) == value
for field, value in ACTEUR_FIELDS_TO_ANONYMIZE.items()
)
rgpd2_rev = RevisionActeur.objects.get(pk="rgpd2")
assert all(
getattr(rgpd2_rev, field) == value
for field, value in ACTEUR_FIELDS_TO_ANONYMIZE.items()
)
33 changes: 33 additions & 0 deletions data/migrations/0012_alter_suggestioncohorte_type_action.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# Generated by Django 5.1.6 on 2025-04-24 12:12

from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
("data", "0011_alter_suggestioncohorte_type_action"),
]

operations = [
migrations.AlterField(
model_name="suggestioncohorte",
name="type_action",
field=models.CharField(
blank=True,
choices=[
("CRAWL_URLS", "🔗 URLs scannées"),
("ENRICH_ACTEURS_CLOSED", "🚪 Acteurs fermés"),
("ENRICH_ACTEURS_RGPD", "🕵 Anonymisation RGPD"),
("CLUSTERING", "regroupement/déduplication des acteurs"),
("SOURCE_AJOUT", "ingestion de source de données - nouveau acteur"),
(
"SOURCE_MODIFICATION",
"ingestion de source de données - modification d'acteur existant",
),
("SOURCE_SUPRESSION", "ingestion de source de données"),
],
max_length=50,
),
),
]
2 changes: 2 additions & 0 deletions data/models/changes/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@
from .acteur_create_as_parent import ChangeActeurCreateAsParent
from .acteur_delete_as_parent import ChangeActeurDeleteAsParent
from .acteur_keep_as_parent import ChangeActeurKeepAsParent
from .acteur_rgpd_anonymize import ChangeActeurRgpdAnonymize
from .acteur_update_data import ChangeActeurUpdateData
from .acteur_update_parent_id import ChangeActeurUpdateParentId
from .acteur_verify_in_revision import ChangeActeurVerifyRevision
from .sample_model_do_nothing import SampleModelDoNothing

CHANGE_MODELS = {
ChangeActeurRgpdAnonymize.name(): ChangeActeurRgpdAnonymize,
ChangeActeurUpdateData.name(): ChangeActeurUpdateData,
ChangeActeurCreateAsChild.name(): ChangeActeurCreateAsChild,
ChangeActeurCreateAsParent.name(): ChangeActeurCreateAsParent,
Expand Down
Loading