Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
0f405be
RGPD: anonymiser noms acteurs: init commit
maxcorbeau Mar 12, 2025
6125181
ajout template django data
maxcorbeau Mar 12, 2025
f7f3136
migration Django data
maxcorbeau Mar 12, 2025
95a6f3d
refacto dags/rgpd -> dags/enrich, utilisation dbt
maxcorbeau Mar 19, 2025
0edfa4a
ajout nouvelle config & test rgpd
maxcorbeau Mar 19, 2025
a806be6
modèle RGPD, tests & migration
maxcorbeau Mar 20, 2025
4633021
v1 qui fonctionne
maxcorbeau Mar 24, 2025
1d8a11d
début refacto pour factoriser RGPD + fermetures
maxcorbeau Mar 31, 2025
b746d10
début refacto et progrès vers décision métier
maxcorbeau Apr 3, 2025
4c7c03b
suggestions: début de création
maxcorbeau Apr 7, 2025
d48b1e3
suggestions: presque fonctionnel
maxcorbeau Apr 7, 2025
4a45d91
suggestions: tests qui fonctionnent
maxcorbeau Apr 7, 2025
b75ccaa
refacto du DAG avec dbt & suggestions
maxcorbeau Apr 9, 2025
4dcc24b
dbt: nettoyage & sampling
maxcorbeau Apr 9, 2025
2713d89
DAG & Admin UI fonctionnels
maxcorbeau Apr 10, 2025
3cb369e
create_as_child, numéro rue, UI
maxcorbeau Apr 10, 2025
7f7dde2
refacto modèles & tests
maxcorbeau Apr 14, 2025
2a84f56
ajout tolérance échec
maxcorbeau Apr 14, 2025
de99c1a
udfs: norma exclusion petits mots, cleanup
maxcorbeau Apr 14, 2025
1ddd4ee
suppression code RGPD
maxcorbeau Apr 16, 2025
9d221c8
cont. suppression RGPD & move data_reconstruct
maxcorbeau Apr 16, 2025
55a72d9
recréer migration django + fix imports cassés via rebase
maxcorbeau Apr 16, 2025
89f468d
fix imports + data_serialize en doublon
maxcorbeau Apr 16, 2025
91295e2
cont. del rgpd, fix acteurs model & tests
maxcorbeau Apr 16, 2025
354a541
suppression des prints
maxcorbeau Apr 16, 2025
16aede7
drop changes in restore script
maxcorbeau Apr 16, 2025
836d577
cont. fix script import
maxcorbeau Apr 16, 2025
6a15cc6
renommage replace -> suggest
maxcorbeau Apr 21, 2025
fc01b2b
cohorte: simplification, label uniquement
maxcorbeau Apr 21, 2025
b67bb03
fix migration after rebase
maxcorbeau Apr 21, 2025
c92cb7f
gestion contexte + del marts villes
maxcorbeau Apr 21, 2025
5d7482b
gestion contexte
maxcorbeau Apr 21, 2025
30d0d2c
intégration dbt & django admin
maxcorbeau Apr 16, 2025
0318022
BAN: modèles dbt début
maxcorbeau Apr 16, 2025
0b79a29
fix filtre sur base
maxcorbeau Apr 17, 2025
ba803a7
dbt marts & dag airflow
maxcorbeau Apr 17, 2025
ed1608d
dag ariflow
maxcorbeau Apr 17, 2025
f8736bd
rebase + ajout migration & tests
maxcorbeau Apr 21, 2025
2409b1a
fix after rebase
maxcorbeau Apr 24, 2025
2a2b129
fix after rebase 2
maxcorbeau Apr 24, 2025
c9cd15c
fix après rebase 3
maxcorbeau Apr 24, 2025
33315cb
fix après rebase 4
maxcorbeau Apr 24, 2025
8df023d
del fichier bidon passé inapercu
maxcorbeau Apr 24, 2025
f6258bc
fix after rebase
maxcorbeau Apr 28, 2025
1c10dfc
fix after rebase (rgpd)
maxcorbeau Apr 28, 2025
bf5e989
migration django
maxcorbeau Apr 28, 2025
a0c9c09
petite amélioration commande dbt
maxcorbeau Apr 28, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dags/enrich/config/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
DAG_ID_TO_CONFIG_MODEL,
EnrichActeursClosedConfig,
EnrichActeursRGPDConfig,
EnrichActeursVillesConfig,
EnrichDbtModelsRefreshConfig,
)
from .tasks import TASKS # noqa: F401
Expand Down
2 changes: 2 additions & 0 deletions dags/enrich/config/cohorts.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,5 @@ class COHORTS:
)
CLOSED_REP_SAME_SIREN = "🚪 Acteurs Fermés: 🟢 remplacés par SIRET du même SIREN"
RGPD = "Anonymisation RGPD"
VILLES_TYPO = "🌆 Changement de ville: 🟢 variation d'ortographe"
VILLES_NEW = "🌆 Changement de ville: 🟡 ancienne -> nouvelle"
14 changes: 12 additions & 2 deletions dags/enrich/config/columns.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,7 @@ class COLS:
# Acteurs
ACTEUR_ID: str = "acteur_id"
ACTEUR_TYPE_ID: str = "acteur_type_id"
ACTEUR_TYPE_CODE: str = "acteur_type_code"
ACTEUR_SOURCE_ID: str = "acteur_source_id"
ACTEUR_SOURCE_CODE: str = "acteur_source_code"
ACTEUR_SIRET: str = "acteur_siret"
ACTEUR_NOM: str = "acteur_nom"
ACTEUR_NOMS_ORIGINE: str = "acteur_noms_origine"
Expand Down Expand Up @@ -43,6 +41,18 @@ class COLS:
SUGGEST_VILLE: str = "suggest_ville"
SUGGEST_NAF: str = "suggest_naf"

# Suggestions
SUGGEST_COHORT_CODE: str = "suggestion_cohorte_code"
SUGGEST_COHORT: str = "suggest_cohort"

# Replacements
SUGGEST_SIRET: str = "suggest_siret"
SUGGEST_NOM: str = "suggest_nom"
SUGGEST_ADRESSE: str = "suggest_adresse"
SUGGEST_CODE_POSTAL: str = "suggest_code_postal"
SUGGEST_VILLE: str = "suggest_ville"
SUGGEST_NAF: str = "suggest_naf"

# Suggestions
SUGGEST_COHORT: str = f"{SUGGEST_PREFIX}_cohort"
SUGGEST_SIRET: str = f"{SUGGEST_PREFIX}_siret"
Expand Down
3 changes: 2 additions & 1 deletion dags/enrich/config/dbt.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,5 @@ class DBT:
MARTS_ENRICH_AE_CLOSED_NOT_REPLACED: str = (
"marts_enrich_acteurs_closed_suggest_not_replaced"
)
MARTS_ENRICH_AE_RGPD: str = "marts_enrich_ae_rgpd"
MARTS_ENRICH_VILLES_TYPO: str = "marts_enrich_acteurs_villes_suggest_typo"
MARTS_ENRICH_VILLES_NEW: str = "marts_enrich_acteurs_villes_suggest_new"
5 changes: 5 additions & 0 deletions dags/enrich/config/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,13 @@ class EnrichDbtModelsRefreshConfig(BaseModel):
)


class EnrichActeursVillesConfig(EnrichBaseConfig):
pass


DAG_ID_TO_CONFIG_MODEL = {
"enrich_acteurs_closed": EnrichActeursClosedConfig,
"enrich_acteurs_rgpd": EnrichActeursRGPDConfig,
"enrich_dbt_models_refresh": EnrichDbtModelsRefreshConfig,
"enrich_acteurs_villes": EnrichActeursVillesConfig,
}
4 changes: 4 additions & 0 deletions dags/enrich/config/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,7 @@ class TASKS:
)
ENRICH_DBT_MODELS_REFRESH: str = "enrich_dbt_models_refresh"
READ_AE_RGPD: str = "enrich_ae_rgpd_read"

# Villes
ENRICH_VILLES_TYPO: str = "enrich_acteurs_villes_typo"
ENRICH_VILLES_NEW: str = "enrich_acteurs_villes_new"
63 changes: 63 additions & 0 deletions dags/enrich/dags/enrich_acteurs_villes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
"""DAG to suggestion city corrections based on BAN data"""

from airflow import DAG
from enrich.config import (
COHORTS,
DBT,
TASKS,
EnrichActeursVillesConfig,
)
from enrich.tasks.airflow_logic.enrich_config_create_task import (
enrich_config_create_task,
)
from enrich.tasks.airflow_logic.enrich_dbt_model_suggest_task import (
enrich_dbt_model_suggest_task,
)
from enrich.tasks.airflow_logic.enrich_dbt_models_refresh_task import (
enrich_dbt_models_refresh_task,
)
from shared.config import CATCHUPS, SCHEDULES, START_DATES, config_to_airflow_params

with DAG(
dag_id="enrich_acteurs_villes",
dag_display_name="🌆 Enrichir - Acteurs Villes",
default_args={
"owner": "airflow",
"depends_on_past": False,
"email_on_failure": False,
"email_on_retry": False,
"retries": 0,
},
description=("Un DAG pour suggérer des corrections de villes"),
tags=["annuaire", "entreprises", "ae", "acteurs", "juridique"],
schedule=SCHEDULES.NONE,
catchup=CATCHUPS.AWLAYS_FALSE,
start_date=START_DATES.YESTERDAY,
params=config_to_airflow_params(
EnrichActeursVillesConfig(
dbt_models_refresh=True,
dbt_models_refresh_command=(
"dbt build --select tag:marts,tag:enrich,tag:villes"
),
filter_equals__acteur_statut="ACTIF",
)
),
) as dag:
# Instantiation
config = enrich_config_create_task(dag)
dbt_refresh = enrich_dbt_models_refresh_task(dag)
suggest_typo = enrich_dbt_model_suggest_task(
dag,
task_id=TASKS.ENRICH_VILLES_TYPO,
cohort=COHORTS.VILLES_TYPO,
dbt_model_name=DBT.MARTS_ENRICH_VILLES_TYPO,
)
suggest_new = enrich_dbt_model_suggest_task(
dag,
task_id=TASKS.ENRICH_VILLES_NEW,
cohort=COHORTS.VILLES_NEW,
dbt_model_name=DBT.MARTS_ENRICH_VILLES_NEW,
)
config >> dbt_refresh # type: ignore
dbt_refresh >> suggest_typo # type: ignore
dbt_refresh >> suggest_new # type: ignore
6 changes: 2 additions & 4 deletions dags/enrich/dags/enrich_dbt_models_refresh.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,8 @@
params=config_to_airflow_params(
EnrichDbtModelsRefreshConfig(
dbt_models_refresh_commands=[
"dbt build --select +int_ae_unite_legale",
"dbt build --select +int_ae_etablissement",
"dbt build --select +int_ban_adresses",
"dbt build --select int_ban_villes",
"dbt build --select +tag:intermediate,tag:ae",
"dbt build --select +tag:intermediate,tag:ban",
],
)
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,30 @@ def changes_prepare(
).model_dump()


def changes_prepare_villes(row: dict) -> tuple[list[dict], dict]:
"""Prepare suggestions for villes cohorts"""
from data.models.changes import ChangeActeurUpdateData

changes = []
model_params = {
"id": row[COLS.ACTEUR_ID],
"data": {
"ville": row[COLS.SUGGEST_VILLE],
},
}
changes.append(
changes_prepare(
model=ChangeActeurUpdateData,
model_params=model_params,
order=1,
reason="On fait confiance à la BAN",
entity_type="acteur_displayed",
)
)
contexte = {} # changes are self-explanatory
return changes, contexte


def changes_prepare_rgpd(
row: dict,
) -> tuple[list[dict], dict]:
Expand Down Expand Up @@ -184,6 +208,8 @@ def changes_prepare_closed_replaced(
COHORTS.CLOSED_REP_OTHER_SIREN: changes_prepare_closed_replaced,
COHORTS.CLOSED_REP_SAME_SIREN: changes_prepare_closed_replaced,
COHORTS.RGPD: changes_prepare_rgpd,
COHORTS.VILLES_TYPO: changes_prepare_villes,
COHORTS.VILLES_NEW: changes_prepare_villes,
}


Expand All @@ -200,11 +226,17 @@ def enrich_dbt_model_to_suggestions(
SuggestionStatut,
)

# TODO: once all suggestions have been migrated to pydantic, we no
# longer need SuggestionCohorte.type_action and any of the following
# identifiant_execution = cohort AND pydantic models take care of
# handling the specifics
COHORTS_TO_SUGGESTION_ACTION = {
COHORTS.CLOSED_NOT_REPLACED: SuggestionAction.ENRICH_ACTEURS_CLOSED,
COHORTS.CLOSED_REP_OTHER_SIREN: SuggestionAction.ENRICH_ACTEURS_CLOSED,
COHORTS.CLOSED_REP_SAME_SIREN: SuggestionAction.ENRICH_ACTEURS_CLOSED,
COHORTS.RGPD: SuggestionAction.ENRICH_ACTEURS_RGPD,
COHORTS.VILLES_TYPO: SuggestionAction.ENRICH_ACTEURS_VILLES_TYPO,
COHORTS.VILLES_NEW: SuggestionAction.ENRICH_ACTEURS_VILLES_NEW,
}

# Validation
Expand All @@ -223,12 +255,12 @@ def enrich_dbt_model_to_suggestions(

try:
changes, contexte = COHORTS_TO_PREPARE_CHANGES[cohort](row)
suggestions.append(
{
"contexte": contexte,
"suggestion": {"title": cohort, "changes": changes},
}
)
suggestion = {
"contexte": contexte,
"suggestion": {"title": cohort, "changes": changes},
}
log.preview("🔢 Suggestion", suggestion)
suggestions.append(suggestion)

# We tolerate some errors
except Exception as e:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import pytest
from enrich.config.models import EnrichActeursClosedConfig
from dags.enrich.config.models import EnrichActeursClosedConfig


class TestEnrichClosedConfig:
Expand Down
100 changes: 100 additions & 0 deletions dags_unit_tests/enrich/tasks/test_enrich_suggestions_cities.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
import pandas as pd
import pytest
from enrich.config import COHORTS, COLS
from enrich.tasks.business_logic.enrich_dbt_model_to_suggestions import (
enrich_dbt_model_to_suggestions,
)


@pytest.mark.django_db
class TestEnrichSuggestionsCities:

@pytest.fixture
def df_new(self):
return pd.DataFrame(
{
COLS.SUGGEST_COHORT: [COHORTS.VILLES_NEW] * 2,
COLS.SUGGEST_VILLE: ["new town 1", "new town 2"],
COLS.ACTEUR_ID: ["new1", "new2"],
COLS.ACTEUR_VILLE: ["old town 1", "old town 2"],
}
)

@pytest.fixture
def df_typo(self):
return pd.DataFrame(
{
COLS.SUGGEST_COHORT: [COHORTS.VILLES_TYPO] * 2,
COLS.SUGGEST_VILLE: ["Paris", "Laval"],
COLS.ACTEUR_ID: ["typo1", "typo2"],
COLS.ACTEUR_VILLE: ["Pâris", "Lâval"],
}
)

@pytest.fixture
def acteurs(self, df_new, df_typo):
# Creating acteurs as presence required to apply changes
from unit_tests.qfdmo.acteur_factory import ActeurFactory

for _, row in pd.concat([df_new, df_typo]).iterrows():
ActeurFactory(
identifiant_unique=row[COLS.ACTEUR_ID],
ville=row[COLS.ACTEUR_VILLE],
)

def test_cohort_new(self, acteurs, df_new):
from data.models.suggestion import Suggestion, SuggestionCohorte
from qfdmo.models import RevisionActeur

# Write suggestions to DB
enrich_dbt_model_to_suggestions(
df=df_new,
cohort=COHORTS.VILLES_NEW,
identifiant_action="test_new",
dry_run=False,
)

# Check suggestions have been written to DB
cohort = SuggestionCohorte.objects.get(identifiant_action="test_new")
suggestions = Suggestion.objects.filter(suggestion_cohorte=cohort)
assert len(suggestions) == 2

# Apply suggestions
for suggestion in suggestions:
suggestion.apply()

# Verify changes
# 2 revisions should be created but not parent
new1 = RevisionActeur.objects.get(pk="new1")
assert new1.ville == "new town 1"

new2 = RevisionActeur.objects.get(pk="new2")
assert new2.ville == "new town 2"

def test_cohort_typo(self, acteurs, df_typo):
from data.models.suggestion import Suggestion, SuggestionCohorte
from qfdmo.models import RevisionActeur

# Write suggestions to DB
enrich_dbt_model_to_suggestions(
df=df_typo,
cohort=COHORTS.VILLES_TYPO,
identifiant_action="test_typo",
dry_run=False,
)

# Check suggestions have been written to DB
cohort = SuggestionCohorte.objects.get(identifiant_action="test_typo")
suggestions = Suggestion.objects.filter(suggestion_cohorte=cohort)
assert len(suggestions) == 2

# Apply suggestions
for suggestion in suggestions:
suggestion.apply()

# Verify changes
typo1 = RevisionActeur.objects.get(pk="typo1")
assert typo1.ville == "Paris"

typo2 = RevisionActeur.objects.get(pk="typo2")
assert typo2.ville == "Laval"
35 changes: 35 additions & 0 deletions data/migrations/0013_alter_suggestioncohorte_type_action.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Generated by Django 5.1.6 on 2025-04-28 05:22

from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
("data", "0012_alter_suggestioncohorte_type_action"),
]

operations = [
migrations.AlterField(
model_name="suggestioncohorte",
name="type_action",
field=models.CharField(
blank=True,
choices=[
("CRAWL_URLS", "🔗 URLs scannées"),
("ENRICH_ACTEURS_CLOSED", "🚪 Acteurs fermés"),
("ENRICH_ACTEURS_RGPD", "🕵 Anonymisation RGPD"),
("ENRICH_ACTEURS_VILLES_TYPO", "🏙️ Acteurs villes typographiques"),
("ENRICH_ACTEURS_VILLES_NEW", "🏙️ Acteurs villes nouvelles"),
("CLUSTERING", "regroupement/déduplication des acteurs"),
("SOURCE_AJOUT", "ingestion de source de données - nouveau acteur"),
(
"SOURCE_MODIFICATION",
"ingestion de source de données - modification d'acteur existant",
),
("SOURCE_SUPRESSION", "ingestion de source de données"),
],
max_length=50,
),
),
]
2 changes: 1 addition & 1 deletion data/models/changes/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
from .sample_model_do_nothing import SampleModelDoNothing

CHANGE_MODELS = {
ChangeActeurRgpdAnonymize.name(): ChangeActeurRgpdAnonymize,
ChangeActeurUpdateData.name(): ChangeActeurUpdateData,
ChangeActeurCreateAsChild.name(): ChangeActeurCreateAsChild,
ChangeActeurCreateAsParent.name(): ChangeActeurCreateAsParent,
Expand All @@ -20,4 +19,5 @@
ChangeActeurNothingBase.name(): ChangeActeurNothingBase,
ChangeActeurKeepAsParent.name(): ChangeActeurKeepAsParent,
SampleModelDoNothing.name(): SampleModelDoNothing,
ChangeActeurRgpdAnonymize.name(): ChangeActeurRgpdAnonymize,
}
Loading