diff --git a/Makefile b/Makefile
index 5a8c5082a..554457f01 100644
--- a/Makefile
+++ b/Makefile
@@ -57,13 +57,10 @@ run-airflow:
.PHONY: run-django
run-django:
- rm -rf .parcel-cache
- honcho start -f Procfile.dev
+ honcho start -f Procfile.django.dev
run-all:
- docker compose --profile airflow up -d
- rm -rf .parcel-cache
- honcho start -f Procfile.dev
+ honcho start -f Procfile.all.dev
# Local django operations
.PHONY: migrate
diff --git a/Procfile.all.dev b/Procfile.all.dev
new file mode 100644
index 000000000..64359960c
--- /dev/null
+++ b/Procfile.all.dev
@@ -0,0 +1,3 @@
+services: docker compose --profile airflow up
+web: poetry run python manage.py runserver 0.0.0.0:8000
+npm: rm -rf .parcel-cache; npm run watch
diff --git a/Procfile.dev b/Procfile.django.dev
similarity index 100%
rename from Procfile.dev
rename to Procfile.django.dev
diff --git a/dags/acteurs/dags/compute_acteur.py b/dags/acteurs/dags/compute_acteur.py
index fbdbe4b4f..b3a5685db 100755
--- a/dags/acteurs/dags/compute_acteur.py
+++ b/dags/acteurs/dags/compute_acteur.py
@@ -36,90 +36,70 @@
) as dag:
dbt_run_base_acteurs = BashOperator(
task_id="dbt_run_base_acteurs",
- bash_command=("cd /opt/airflow/dbt/ && dbt run --models base.acteurs"),
+ bash_command=("dbt run --models base.acteurs"),
)
dbt_test_base_acteurs = BashOperator(
task_id="dbt_test_base_acteurs",
- bash_command=("cd /opt/airflow/dbt/ && dbt test --models base.acteurs"),
+ bash_command=("dbt test --models base.acteurs"),
)
dbt_run_intermediate_acteurs = BashOperator(
task_id="dbt_run_intermediate_acteurs",
- bash_command=("cd /opt/airflow/dbt/ && dbt run --models intermediate.acteurs"),
+ bash_command=("dbt run --models intermediate.acteurs"),
)
dbt_test_intermediate_acteurs = BashOperator(
task_id="dbt_test_intermediate_acteurs",
- bash_command=("cd /opt/airflow/dbt/ && dbt test --models intermediate.acteurs"),
+ bash_command=("dbt test --models intermediate.acteurs"),
)
dbt_run_marts_acteurs_exhaustive = BashOperator(
task_id="dbt_run_marts_acteurs_exhaustive",
- bash_command=(
- "cd /opt/airflow/dbt/ && dbt run --models marts.acteurs.exhaustive"
- ),
+ bash_command=("dbt run --models marts.acteurs.exhaustive"),
)
dbt_test_marts_acteurs_exhaustive = BashOperator(
task_id="dbt_test_marts_acteurs_exhaustive",
- bash_command=(
- "cd /opt/airflow/dbt/ && dbt test --models marts.acteurs.exhaustive"
- ),
+ bash_command=("dbt test --models marts.acteurs.exhaustive"),
)
dbt_run_exposure_acteurs_exhaustive = BashOperator(
task_id="dbt_run_exposure_acteurs_exhaustive",
- bash_command=(
- "cd /opt/airflow/dbt/ && dbt run --models exposure.acteurs.exhaustive"
- ),
+ bash_command=("dbt run --models exposure.acteurs.exhaustive"),
)
dbt_test_exposure_acteurs_exhaustive = BashOperator(
task_id="dbt_test_exposure_acteurs_exhaustive",
- bash_command=(
- "cd /opt/airflow/dbt/ && dbt test --models exposure.acteurs.exhaustive"
- ),
+ bash_command=("dbt test --models exposure.acteurs.exhaustive"),
)
dbt_run_marts_acteurs_carte = BashOperator(
task_id="dbt_run_marts_acteurs_carte",
- bash_command=("cd /opt/airflow/dbt/ && dbt run --models marts.acteurs.carte"),
+ bash_command=("dbt run --models marts.acteurs.carte"),
)
dbt_test_marts_acteurs_carte = BashOperator(
task_id="dbt_test_marts_acteurs_carte",
- bash_command=("cd /opt/airflow/dbt/ && dbt test --models marts.acteurs.carte"),
+ bash_command=("dbt test --models marts.acteurs.carte"),
)
dbt_run_exposure_acteurs_carte = BashOperator(
task_id="dbt_run_exposure_acteurs_carte",
- bash_command=(
- "cd /opt/airflow/dbt/ && dbt run --models exposure.acteurs.carte"
- ),
+ bash_command=("dbt run --models exposure.acteurs.carte"),
)
dbt_test_exposure_acteurs_carte = BashOperator(
task_id="dbt_test_exposure_acteurs_carte",
- bash_command=(
- "cd /opt/airflow/dbt/ && dbt test --models exposure.acteurs.carte"
- ),
+ bash_command=("dbt test --models exposure.acteurs.carte"),
)
dbt_run_marts_acteurs_opendata = BashOperator(
task_id="dbt_run_marts_acteurs_opendata",
- bash_command=(
- "cd /opt/airflow/dbt/ && dbt run --models marts.acteurs.opendata"
- ),
+ bash_command=("dbt run --models marts.acteurs.opendata"),
)
dbt_test_marts_acteurs_opendata = BashOperator(
task_id="dbt_test_marts_acteurs_opendata",
- bash_command=(
- "cd /opt/airflow/dbt/ && dbt test --models marts.acteurs.opendata"
- ),
+ bash_command=("dbt test --models marts.acteurs.opendata"),
)
dbt_run_exposure_acteurs_opendata = BashOperator(
task_id="dbt_run_exposure_acteurs_opendata",
- bash_command=(
- "cd /opt/airflow/dbt/ && dbt run --models exposure.acteurs.opendata"
- ),
+ bash_command=("dbt run --models exposure.acteurs.opendata"),
)
dbt_test_exposure_acteurs_opendata = BashOperator(
task_id="dbt_test_exposure_acteurs_opendata",
- bash_command=(
- "cd /opt/airflow/dbt/ && dbt test --models exposure.acteurs.opendata"
- ),
+ bash_command=("dbt test --models exposure.acteurs.opendata"),
)
check_model_table_displayedacteur_task = check_model_table_consistency_task(
diff --git a/dags/clone/config/sql/creation/tables/create_table_ban_lieux_dits.sql b/dags/clone/config/sql/creation/tables/create_table_ban_lieux_dits.sql
index 9b4b1bcde..c0b2f2253 100644
--- a/dags/clone/config/sql/creation/tables/create_table_ban_lieux_dits.sql
+++ b/dags/clone/config/sql/creation/tables/create_table_ban_lieux_dits.sql
@@ -4,7 +4,7 @@ Schema generated by scripts/db_schema_create_from_csv.py
CREATE TABLE {{table_name}} (
"id" VARCHAR(12), -- 🟡 on reste scrict (min/max=10/12)
- "nom_lieu_dit" VARCHAR(125), -- 98 -> 125
+ "nom_lieu_dit" VARCHAR(255), -- 98 -> 255
"code_postal" CHAR(5), -- 🟡 on reste scrict (min/max=5)
"code_insee" CHAR(5), -- 🟡 on reste scrict (min/max=5)
"nom_commune" VARCHAR(60), -- 45 -> 60
diff --git a/dags/clone/tasks/business_logic/clone_table_validate.py b/dags/clone/tasks/business_logic/clone_table_validate.py
index 18ba6a8cf..011ab0900 100644
--- a/dags/clone/tasks/business_logic/clone_table_validate.py
+++ b/dags/clone/tasks/business_logic/clone_table_validate.py
@@ -2,6 +2,7 @@
from clone.config import DIR_SQL_VALIDATION
from django.db import connection
+
from utils import logging_utils as log
logger = logging.getLogger(__name__)
@@ -23,7 +24,6 @@ def clone_table_validate(table_kind: str, table_name: str, dry_run: bool) -> Non
logger.info("Mode dry-run, on ne valide pas")
continue
with connection.cursor() as cursor:
-
# Running validation and getting results
cursor.execute(sql)
row = cursor.fetchone()
diff --git a/dags/crawl/tasks/business_logic/crawl_urls_suggest.py b/dags/crawl/tasks/business_logic/crawl_urls_suggest.py
index f878a4b24..4d286ec61 100644
--- a/dags/crawl/tasks/business_logic/crawl_urls_suggest.py
+++ b/dags/crawl/tasks/business_logic/crawl_urls_suggest.py
@@ -3,6 +3,7 @@
import pandas as pd
from crawl.config.columns import COLS
from crawl.config.constants import LABEL_URL_ORIGINE, LABEL_URL_PROPOSEE
+
from utils import logging_utils as log
from utils.dataframes import (
df_col_assert_get_unique,
@@ -39,7 +40,7 @@ def suggestions_prepare(
- df_crawl_ok_diff = successful AND different = propose
- df_crawl_fail = failed = propose None"""
from data.models.change import SuggestionChange
- from data.models.changes import ChangeActeurUpdateData
+ from data.models.changes import ChangeActeurUpdateRevision
if df_none_or_empty(df):
return []
@@ -54,7 +55,7 @@ def suggestions_prepare(
order=1,
reason=row[COLS.COHORT],
entity_type="acteur_displayed",
- model_name=ChangeActeurUpdateData.name(),
+ model_name=ChangeActeurUpdateRevision.name(),
model_params={
"id": acteur[COLS.ID],
"data": {COLS.URL_DB: row[COLS.SUGGEST_VALUE]},
diff --git a/dags/enrich/config/columns.py b/dags/enrich/config/columns.py
index 1ec997172..4f1df52b1 100644
--- a/dags/enrich/config/columns.py
+++ b/dags/enrich/config/columns.py
@@ -12,6 +12,8 @@ class COLS:
# Acteurs
ACTEUR_ID: str = "acteur_id"
+ ACTEUR_ID_EXTERNE: str = "acteur_id_externe"
+ ACTEUR_PARENT_ID: str = "acteur_parent_id"
ACTEUR_TYPE_ID: str = "acteur_type_id"
ACTEUR_SOURCE_ID: str = "acteur_source_id"
ACTEUR_SIRET: str = "acteur_siret"
diff --git a/dags/enrich/dags/enrich_dbt_models_refresh.py b/dags/enrich/dags/enrich_dbt_models_refresh.py
index 03bc9c41f..6394972a9 100644
--- a/dags/enrich/dags/enrich_dbt_models_refresh.py
+++ b/dags/enrich/dags/enrich_dbt_models_refresh.py
@@ -1,5 +1,6 @@
"""DAG to refresh DBT models needed for enrich DAGs"""
+import logging
import re
from airflow import DAG
@@ -9,6 +10,8 @@
from enrich.config.models import EnrichDbtModelsRefreshConfig
from shared.config import CATCHUPS, SCHEDULES, START_DATES, config_to_airflow_params
+logger = logging.getLogger(__name__)
+
with DAG(
dag_id="enrich_dbt_models_refresh",
dag_display_name="🔄 Enrichir - Rafraîchir les modèles DBT",
@@ -42,7 +45,7 @@
if not cmd:
continue
cmd_id = re.sub(r"__+", "_", re.sub(r"[^a-zA-Z0-9]+", "_", cmd))
- cmd += " --debug --threads 1"
+ logger.info(f"Create bash operator with command: {cmd}")
tasks.append(
BashOperator(
task_id=f"enrich_{cmd_id}",
diff --git a/dags/enrich/tasks/business_logic/enrich_dbt_model_to_suggestions.py b/dags/enrich/tasks/business_logic/enrich_dbt_model_to_suggestions.py
index 7354d8ef0..74997e0e0 100644
--- a/dags/enrich/tasks/business_logic/enrich_dbt_model_to_suggestions.py
+++ b/dags/enrich/tasks/business_logic/enrich_dbt_model_to_suggestions.py
@@ -2,17 +2,11 @@
from datetime import datetime, timezone
import pandas as pd
-from cluster.tasks.business_logic.cluster_acteurs_parents_choose_new import (
- parent_id_generate,
-)
from enrich.config.cohorts import COHORTS
from enrich.config.columns import COLS
-from enrich.tasks.business_logic.enrich_dbt_model_row_to_suggest_data import (
- dbt_model_row_to_suggest_data,
-)
-from utils import logging_utils as log
from data.models.changes.acteur_rgpd_anonymize import rgpd_data_get
+from utils import logging_utils as log
logger = logging.getLogger(__name__)
@@ -40,7 +34,7 @@ def changes_prepare(
def changes_prepare_villes(row: dict) -> tuple[list[dict], dict]:
"""Prepare suggestions for villes cohorts"""
- from data.models.changes import ChangeActeurUpdateData
+ from data.models.changes import ChangeActeurUpdateRevision
changes = []
model_params = {
@@ -51,7 +45,7 @@ def changes_prepare_villes(row: dict) -> tuple[list[dict], dict]:
}
changes.append(
changes_prepare(
- model=ChangeActeurUpdateData,
+ model=ChangeActeurUpdateRevision,
model_params=model_params,
order=1,
reason="On fait confiance à la BAN",
@@ -98,24 +92,26 @@ def changes_prepare_closed_not_replaced(
row: dict,
) -> tuple[list[dict], dict]:
"""Prepare suggestions for closed not replaced cohorts"""
- from data.models.changes import ChangeActeurUpdateData
+ from data.models.changes import ChangeActeurUpdateRevision
from qfdmo.models import ActeurStatus
changes = []
+ today = datetime.now(timezone.utc).strftime("%Y-%m-%d")
model_params = {
"id": row[COLS.ACTEUR_ID],
"data": {
"statut": ActeurStatus.INACTIF,
- "siret": row[COLS.ACTEUR_SIRET],
- "siren": row[COLS.ACTEUR_SIRET][:9],
"siret_is_closed": True,
- "acteur_type": row[COLS.ACTEUR_TYPE_ID],
- "source": row[COLS.ACTEUR_SOURCE_ID],
+ "parent_reason": (
+ f"Modifications de l'acteur le {today}: "
+ f"SIRET {row[COLS.ACTEUR_SIRET]} détecté comme fermé dans AE,"
+ " Pas de remplacement"
+ ),
},
}
changes.append(
changes_prepare(
- model=ChangeActeurUpdateData,
+ model=ChangeActeurUpdateRevision,
model_params=model_params,
order=1,
reason="SIRET & SIREN fermés, 0 remplacement trouvé",
@@ -130,79 +126,34 @@ def changes_prepare_closed_replaced(
row: dict,
) -> tuple[list[dict], dict]:
"""Prepare suggestion changes for closed replaced cohorts"""
- from data.models.changes import (
- ChangeActeurCreateAsChild,
- ChangeActeurCreateAsParent,
- ChangeActeurUpdateData,
- )
- from qfdmo.models import ActeurStatus
+ from data.models.changes import ChangeActeurUpdateRevision
changes = []
today = datetime.now(timezone.utc).strftime("%Y-%m-%d")
- # Parent
- parent_id = parent_id_generate([str(row[COLS.SUGGEST_SIRET])])
- parent_data = dbt_model_row_to_suggest_data(row)
- parent_data["source"] = None
- parent_data["statut"] = ActeurStatus.ACTIF
- params_parent = {
- "id": parent_id,
- "data": parent_data,
+
+ update_revision = {
+ "id": row[COLS.ACTEUR_ID],
+ "data": {
+ "siret": row[COLS.SUGGEST_SIRET],
+ "siren": row[COLS.SUGGEST_SIRET][:9],
+ "siret_is_closed": False,
+ "parent_reason": (
+ f"Modifications de l'acteur le {today}: "
+ f"SIRET {row[COLS.ACTEUR_SIRET]} détecté comme fermé dans AE, "
+ f"remplacé par le SIRET {row[COLS.SUGGEST_SIRET]}"
+ ),
+ },
}
- changes.append(
+ changes = [
changes_prepare(
- model=ChangeActeurCreateAsParent,
- model_params=params_parent,
+ model=ChangeActeurUpdateRevision,
+ model_params=update_revision,
order=1,
- reason="besoin d'un parent pour rattaché acteur fermé",
- entity_type="acteur_displayed",
- )
- )
-
- # New child to hold the reference data as standalone
- # as parents are surrogates (e.g. they can be deleted
- # during clustering)
- now = datetime.now(timezone.utc).strftime("%Y%m%d%H%M%S")
- child_new_id = f"{row[COLS.ACTEUR_ID]}_{row[COLS.ACTEUR_SIRET]}_{now}"
- params_child_new = params_parent.copy()
- params_child_new["id"] = child_new_id
- params_child_new["data"]["source"] = row[COLS.ACTEUR_SOURCE_ID]
- params_child_new["data"]["parent"] = parent_id
- params_child_new["data"]["parent_reason"] = (
- f"Nouvel enfant pour conserver les données suite à: "
- f"SIRET {row[COLS.ACTEUR_SIRET]} "
- f"détecté le {today} comme fermé dans AE, "
- f"remplacé par SIRET {row[COLS.SUGGEST_SIRET]}"
- )
- changes.append(
- changes_prepare(
- model=ChangeActeurCreateAsChild,
- model_params=params_child_new,
- order=2,
- reason="besoin nouvel enfant pour conserver les données",
+ reason="Modification du SIRET",
entity_type="acteur_displayed",
)
- )
+ ]
- # Existing Child
- params_child_old = params_child_new.copy()
- params_child_old["id"] = row[COLS.ACTEUR_ID]
- params_child_old["data"]["parent"] = parent_id
- params_child_old["data"]["parent_reason"] = (
- f"SIRET {row[COLS.ACTEUR_SIRET]} "
- f"détecté le {today} comme fermé dans AE, "
- f"remplacé par SIRET {row[COLS.SUGGEST_SIRET]}"
- )
- params_child_old["data"]["siret_is_closed"] = True
- params_child_old["data"]["statut"] = ActeurStatus.INACTIF
- changes.append(
- changes_prepare(
- model=ChangeActeurUpdateData,
- model_params=params_child_old,
- order=3,
- reason="rattacher enfant fermé à un parent",
- entity_type="acteur_displayed",
- )
- )
contexte = {} # changes are self-explanatory
return changes, contexte
@@ -259,7 +210,9 @@ def enrich_dbt_model_to_suggestions(
row = dict(row)
try:
- changes, contexte = COHORTS_TO_PREPARE_CHANGES[cohort](row)
+ logger.info(f"Interprétation de {cohort=}")
+ change_preparation_function = COHORTS_TO_PREPARE_CHANGES[cohort]
+ changes, contexte = change_preparation_function(row)
suggestion = {
"contexte": contexte,
"suggestion": {"title": cohort, "changes": changes},
diff --git a/dags_unit_tests/cluster/tasks/business_logic/test_cluster_acteurs_clusters.py b/dags_unit_tests/cluster/tasks/business_logic/test_cluster_acteurs_clusters.py
index 5fac44131..72a34031f 100644
--- a/dags_unit_tests/cluster/tasks/business_logic/test_cluster_acteurs_clusters.py
+++ b/dags_unit_tests/cluster/tasks/business_logic/test_cluster_acteurs_clusters.py
@@ -7,7 +7,6 @@
score_tuples_to_clusters,
similarity_matrix_to_tuples,
)
-from rich import print
def df_clusters_to_dict(df: pd.DataFrame) -> dict[str, list[str]]:
@@ -169,11 +168,9 @@ def test_cols_group_fuzzy_single(self, df_cols_group_fuzzy):
cluster_fields_fuzzy=["nom"],
cluster_fuzzy_threshold=0.7,
)
- print(df_clusters.to_dict(orient="records"))
assert df_clusters["cluster_id"].nunique() == 3
assert len(df_clusters) == 6
clusters = df_clusters_to_dict(df_clusters)
- print(clusters)
assert clusters == {
"10000_nom_3": [
"id0", # "centre commercial auchan"
diff --git a/dags_unit_tests/cluster/tasks/business_logic/test_cluster_acteurs_parents_choose_data.py b/dags_unit_tests/cluster/tasks/business_logic/test_cluster_acteurs_parents_choose_data.py
index c27b8d9fb..362fad4ba 100644
--- a/dags_unit_tests/cluster/tasks/business_logic/test_cluster_acteurs_parents_choose_data.py
+++ b/dags_unit_tests/cluster/tasks/business_logic/test_cluster_acteurs_parents_choose_data.py
@@ -2,8 +2,6 @@
import pandas as pd
import pytest
from django.contrib.gis.geos import Point
-from rich import print
-from utils.django import django_setup_full
from dags.cluster.config.constants import COL_PARENT_DATA_NEW
from dags.cluster.tasks.business_logic.cluster_acteurs_parents_choose_data import (
@@ -11,6 +9,7 @@
field_pick_value,
parent_choose_data,
)
+from utils.django import django_setup_full
django_setup_full()
@@ -209,9 +208,7 @@ def df_clusters_parent_create(self, parent, acteurs_revision, acteurs_base):
ChangeActeurUpdateParentId.name()
)
df["cluster_id"] = "c1"
- print("df_clusters_parent_create before drops", df.to_dict(orient="records"))
df = df.drop_duplicates(subset=["identifiant_unique"], keep="first")
- print("df_clusters_parent_create", df.to_dict(orient="records"))
return df
@pytest.fixture
@@ -261,7 +258,6 @@ def test_cluster_acteurs_parents_choose_data(
df_before = dfs[scenario]
DisplayedActeur(**parent).save()
- print("BEFORE", df_before.to_dict(orient="records"))
df_after = cluster_acteurs_parents_choose_data(
df_clusters=df_before,
fields_to_include=["nom", "siret", "email"],
@@ -269,7 +265,6 @@ def test_cluster_acteurs_parents_choose_data(
prioritize_source_ids=[10, 20],
keep_empty=EMPTY_IGNORE,
)
- print("AFTER", df_after.to_dict(orient="records"))
filter_parent = df_after["identifiant_unique"] == "p1"
parent_data = df_after[filter_parent][COL_PARENT_DATA_NEW].values[0]
df_children = df_after[~filter_parent]
@@ -293,7 +288,6 @@ def test_parent_create(
df_before = df_clusters_parent_create
DisplayedActeur(**parent).save()
- print("BEFORE", df_before.to_dict(orient="records"))
df_after = cluster_acteurs_parents_choose_data(
df_clusters=df_before,
fields_to_include=["nom", "siret", "email"],
@@ -301,7 +295,6 @@ def test_parent_create(
prioritize_source_ids=[10, 20],
keep_empty=EMPTY_IGNORE,
)
- print("AFTER", df_after.to_dict(orient="records"))
filter_parent = df_after["identifiant_unique"] == "p1"
parent_data = df_after[filter_parent][COL_PARENT_DATA_NEW].values[0]
df_children = df_after[~filter_parent]
diff --git a/dags_unit_tests/cluster/tasks/business_logic/test_cluster_acteurs_parents_choose_new.py b/dags_unit_tests/cluster/tasks/business_logic/test_cluster_acteurs_parents_choose_new.py
index 15e74c22b..248946f1b 100755
--- a/dags_unit_tests/cluster/tasks/business_logic/test_cluster_acteurs_parents_choose_new.py
+++ b/dags_unit_tests/cluster/tasks/business_logic/test_cluster_acteurs_parents_choose_new.py
@@ -22,7 +22,6 @@
cluster_acteurs_parents_choose_new,
parent_id_generate,
)
-from rich import print
from data.models.change import (
COL_CHANGE_ENTITY_TYPE,
@@ -150,7 +149,6 @@ def test_case_no_parent(self, df_no_parent, parent_id_new):
],
ignore_index=True,
)
- print(f"{df=}")
df = cluster_acteurs_one_cluster_parent_changes_mark(
df, parent_id_new, CHANGE_CREATE, "Nouveau parent"
)
@@ -259,7 +257,6 @@ def test_working_overall_changes(self, df_working, parent_id_new, df_combined):
# On veut aucune valeur nan résultant des divers manipulations
# On ne peut pas juste faire df.isna().any().any() car isna
# inclut les None qu'on tolère
- print(df_working.to_dict(orient="records"))
df_nan = df_working.map(lambda x: 1 if pd.isna(x) and x is not None else 0)
assert df_nan.values.sum() == 0
diff --git a/dags_unit_tests/cluster/tasks/business_logic/test_cluster_acteurs_read_orphans.py b/dags_unit_tests/cluster/tasks/business_logic/test_cluster_acteurs_read_orphans.py
index 8a646e88d..0722b111c 100644
--- a/dags_unit_tests/cluster/tasks/business_logic/test_cluster_acteurs_read_orphans.py
+++ b/dags_unit_tests/cluster/tasks/business_logic/test_cluster_acteurs_read_orphans.py
@@ -160,10 +160,6 @@ def df_ideal(self, ideal_scenario_apply_function):
"""La dataframe correspondant au cas idéal
de fonctionnement de la fonction"""
df = ideal_scenario_apply_function[0]
- # Pour faciliter le debug si besoin, bcp plus simple
- # d'utiliser les dicts que du print df potentiellement
- # tronqué
- # print("contenu df_ideal", df.to_dict(orient="records"))
return df
@pytest.fixture
diff --git a/dags_unit_tests/crawl/tasks/business_logic/test_crawl_urls_suggest_prepare.py b/dags_unit_tests/crawl/tasks/business_logic/test_crawl_urls_suggest_prepare.py
index 03a545108..0daff1784 100644
--- a/dags_unit_tests/crawl/tasks/business_logic/test_crawl_urls_suggest_prepare.py
+++ b/dags_unit_tests/crawl/tasks/business_logic/test_crawl_urls_suggest_prepare.py
@@ -2,11 +2,11 @@
import pytest
from crawl.fixtures import acteurs_create, df_syntax_fail # noqa
from sources.config.shared_constants import EMPTY_ACTEUR_FIELD
-from utils import logging_utils as log
from dags.crawl.config.cohorts import COHORTS
from dags.crawl.config.columns import COLS
from dags.crawl.tasks.business_logic.crawl_urls_suggest import suggestions_prepare
+from utils import logging_utils as log
@pytest.mark.django_db
@@ -30,7 +30,6 @@ def test_json_serializable(self, suggestions):
pass
def test_raise_if_acteurs_missing(self):
- from qfdmo.models.acteur import Acteur
df = pd.DataFrame(
{
@@ -42,5 +41,5 @@ def test_raise_if_acteurs_missing(self):
COLS.SUGGEST_VALUE: [EMPTY_ACTEUR_FIELD],
}
)
- with pytest.raises(Acteur.DoesNotExist):
+ with pytest.raises(ValueError):
suggestions_prepare(df=df)
diff --git a/dags_unit_tests/e2e/test_e2e_dag_cluster_acteurs_airflow.py b/dags_unit_tests/e2e/test_e2e_dag_cluster_acteurs_airflow.py
index 71c455590..720fe7b81 100644
--- a/dags_unit_tests/e2e/test_e2e_dag_cluster_acteurs_airflow.py
+++ b/dags_unit_tests/e2e/test_e2e_dag_cluster_acteurs_airflow.py
@@ -4,7 +4,6 @@
import pytest
from airflow.utils.state import State
from django.contrib.gis.geos import Point
-from rich import print
from dags.cluster.config.tasks import TASKS
from dags.shared.config import START_DATES
@@ -132,8 +131,6 @@ def test_up_to_selection_and_normalize(self, db_sources_acteur_types, conf):
dag.test(execution_date=START_DATES.YESTERDAY, run_conf=conf)
tis = dag.get_task_instances()
- for ti in tis:
- print(f"{ti.task_id}", f"{ti.state=}")
# Tasks which should have completed successfully
assert ti_get(tis, TASKS.CONFIG_CREATE).state == State.SUCCESS
diff --git a/dags_unit_tests/enrich/tasks/test_enrich_suggestions_closed.py b/dags_unit_tests/enrich/tasks/test_enrich_suggestions_closed.py
index 27f1e7d28..7341903f0 100644
--- a/dags_unit_tests/enrich/tasks/test_enrich_suggestions_closed.py
+++ b/dags_unit_tests/enrich/tasks/test_enrich_suggestions_closed.py
@@ -2,9 +2,6 @@
import pandas as pd
import pytest
-from cluster.tasks.business_logic.cluster_acteurs_parents_choose_new import (
- parent_id_generate,
-)
from django.contrib.gis.geos import Point
from enrich.config.cohorts import COHORTS
from enrich.config.columns import COLS
@@ -12,6 +9,14 @@
enrich_dbt_model_to_suggestions,
)
+from data.models.suggestion import Suggestion, SuggestionCohorte
+from qfdmo.models.acteur import Acteur, ActeurStatus, RevisionActeur
+from unit_tests.qfdmo.acteur_factory import (
+ ActeurFactory,
+ ActeurTypeFactory,
+ SourceFactory,
+)
+
TODAY = datetime.now(timezone.utc).strftime("%Y-%m-%d")
@@ -20,15 +25,11 @@ class TestEnrichActeursClosedSuggestions:
@pytest.fixture
def source(self):
- from qfdmo.models import Source
-
- return Source.objects.create(code="s1")
+ return SourceFactory(code="s1")
@pytest.fixture
def atype(self):
- from qfdmo.models import ActeurType
-
- return ActeurType.objects.create(code="at1")
+ return ActeurTypeFactory(code="at1")
@pytest.fixture
def df_not_replaced(self, atype, source):
@@ -36,6 +37,7 @@ def df_not_replaced(self, atype, source):
{
# Acteurs data
COLS.ACTEUR_ID: ["a01", "a02"],
+ COLS.ACTEUR_ID_EXTERNE: ["ext_a01", "ext_a02"],
COLS.ACTEUR_SIRET: ["00000000000001", "00000000000002"],
COLS.ACTEUR_NOM: ["AVANT a01", "AVANT a02"],
COLS.ACTEUR_TYPE_ID: [atype.pk, atype.pk],
@@ -50,6 +52,9 @@ def df_replaced(self, atype, source):
{
# Acteurs data
COLS.ACTEUR_ID: ["a1", "a2", "a3"],
+ COLS.ACTEUR_ID_EXTERNE: ["ext_a1", "ext_a2", "ext_a3"],
+ # We test 1 acteur of each cohort with a parent, and
+ # the case with no parent
COLS.ACTEUR_SIRET: [
"11111111100001",
"22222222200001",
@@ -105,18 +110,28 @@ def df_replaced_autre_siret(self, df_replaced):
@pytest.fixture
def acteurs(self, df_not_replaced, df_replaced, atype, source):
# Creating acteurs as presence required to apply changes
- from qfdmo.models import Acteur
df_concat = pd.concat([df_not_replaced, df_replaced])
- acteur_ids = df_concat[COLS.ACTEUR_ID].tolist()
- for acteur_id in acteur_ids:
- Acteur.objects.create(
- identifiant_unique=acteur_id,
- nom=f"AVANT {acteur_id}",
+ for _, row in df_concat.iterrows():
+ acteur = ActeurFactory(
+ identifiant_unique=row[COLS.ACTEUR_ID],
+ identifiant_externe=row[COLS.ACTEUR_ID_EXTERNE],
+ nom=f"AVANT {row[COLS.ACTEUR_ID]}",
+ siret=row[COLS.ACTEUR_SIRET],
+ siren=row[COLS.ACTEUR_SIRET][:9],
acteur_type=atype,
source=source,
- location=Point(x=0, y=0),
+ location=Point(
+ x=row[COLS.ACTEUR_LONGITUDE], y=row[COLS.ACTEUR_LATITUDE]
+ ),
)
+ acteur.save()
+ # Check that acteur has been properly created
+ acteur = Acteur.objects.get(identifiant_unique=row[COLS.ACTEUR_ID])
+ assert acteur.identifiant_externe == row[COLS.ACTEUR_ID_EXTERNE]
+ assert acteur.nom == f"AVANT {row[COLS.ACTEUR_ID]}"
+ assert acteur.acteur_type == atype
+ assert acteur.source == source
def test_cohorte_not_replaced(self, acteurs, df_not_replaced):
from data.models.suggestion import Suggestion, SuggestionCohorte
@@ -139,23 +154,20 @@ def test_cohorte_not_replaced(self, acteurs, df_not_replaced):
for suggestion in suggestions:
suggestion.apply()
- # Verify changes
- # 2 revisions should be created but not parent
- a01 = RevisionActeur.objects.get(pk="a01")
- assert a01.statut == ActeurStatus.INACTIF
- assert a01.parent is None
- assert a01.parent_reason == "" # consequence of empty strings in DB
- assert a01.siret_is_closed is True
-
- a02 = RevisionActeur.objects.get(pk="a02")
- assert a02.statut == ActeurStatus.INACTIF
- assert a02.parent is None
- assert a02.parent_reason == ""
- assert a02.siret_is_closed is True
+ # Verify changes: both acteurs are closed with only relevant
+ # fields updated
+ for id in ["a01", "a02"]:
+ closed = RevisionActeur.objects.get(pk=id)
+ assert closed.statut == ActeurStatus.INACTIF
+ assert closed.parent is None
+ assert closed.parent_reason == (
+ f"Modifications de l'acteur le {TODAY}: "
+ f"SIRET {'00000000000001' if id == 'a01' else '00000000000002'} détecté"
+ " comme fermé dans AE, Pas de remplacement"
+ )
+ assert closed.siret_is_closed is True
- def test_cohorte_meme_siren(self, acteurs, atype, source, df_replaced_meme_siret):
- from data.models.suggestion import Suggestion, SuggestionCohorte
- from qfdmo.models import ActeurStatus, RevisionActeur
+ def test_cohorte_meme_siren(self, acteurs, df_replaced_meme_siret):
# Write suggestions to DB
enrich_dbt_model_to_suggestions(
@@ -174,36 +186,24 @@ def test_cohorte_meme_siren(self, acteurs, atype, source, df_replaced_meme_siret
for suggestion in suggestions:
suggestion.apply()
- # Verify changes
- # 1 parent should be created in revision with replacement data
- # 1 child should be created in revision with status=INACT and parent_id pointing
- parent_id = parent_id_generate(["11111111100002"])
- parent = RevisionActeur.objects.get(pk=parent_id)
- assert parent.pk == parent_id
- assert parent.nom == "APRES a1"
- assert parent.adresse == "Adresse1"
- assert parent.code_postal == "12345"
- assert parent.ville == "Ville1"
- assert parent.naf_principal == "naf1"
- assert parent.acteur_type == atype
- assert parent.source is None
- assert parent.location.x == 1
- assert parent.location.y == 11
-
- child = RevisionActeur.objects.get(pk="a1")
- assert child.statut == ActeurStatus.INACTIF
- assert child.parent == parent
- assert child.parent_reason == (
- f"SIRET 11111111100001 détecté le {TODAY} comme fermé dans AE, "
- f"remplacé par SIRET 11111111100002"
+ acteur = Acteur.objects.get(pk="a1")
+ assert acteur.statut == ActeurStatus.ACTIF
+ assert acteur.siret == "11111111100001"
+ assert acteur.siren == "111111111"
+
+ revision = RevisionActeur.objects.get(pk="a1")
+ assert revision.statut == ActeurStatus.ACTIF
+ assert revision.siret == "11111111100002"
+ assert acteur.siren == "111111111"
+ assert revision.parent is None
+ assert revision.parent_reason == (
+ f"Modifications de l'acteur le {TODAY}: SIRET 11111111100001 détecté comme"
+ " fermé dans AE, remplacé par le SIRET 11111111100002"
)
- assert child.siret_is_closed is True
- assert child.location.x == 1
- assert child.location.y == 11
+ assert revision.siret_is_closed is False
def test_cohorte_autre_siren(self, acteurs, df_replaced_autre_siret):
from data.models.suggestion import Suggestion, SuggestionCohorte
- from qfdmo.models import ActeurStatus, RevisionActeur
# Write suggestions to DB
enrich_dbt_model_to_suggestions(
@@ -216,52 +216,25 @@ def test_cohorte_autre_siren(self, acteurs, df_replaced_autre_siret):
# Check suggestions have been written to DB
cohort = SuggestionCohorte.objects.get(identifiant_action="test_autre_siren")
suggestions = Suggestion.objects.filter(suggestion_cohorte=cohort)
- assert len(suggestions) == 2 # (1 parent + 1 child) x 2 acteurs fermés
+ # 2 suggestions containing 2 changes (inactive + new)
+ assert len(suggestions) == 2
# Apply suggestions
for suggestion in suggestions:
suggestion.apply()
- # Verify changes
- # 1 parent should be created in revision with replacement data
- # 1 child should be created in revision with status=INACT and parent_id pointing
- parent_id = parent_id_generate(["33333333300001"])
- parent = RevisionActeur.objects.get(pk=parent_id)
- assert parent.nom == "APRES a2"
- assert parent.adresse == "Adresse2"
- assert parent.code_postal == "67890"
- assert parent.ville == "Ville2"
- assert parent.naf_principal == "naf2"
- assert parent.location.x == 2
- assert parent.location.y == 22
-
- child = RevisionActeur.objects.get(pk="a2")
- assert child.statut == ActeurStatus.INACTIF
- assert child.parent == parent
- assert child.parent_reason == (
- f"SIRET 22222222200001 détecté le {TODAY} comme fermé dans AE, "
- f"remplacé par SIRET 33333333300001"
- )
- assert child.siret_is_closed is True
- assert child.location.x == 2
- assert child.location.y == 22
-
- parent_id = parent_id_generate(["55555555500001"])
- parent = RevisionActeur.objects.get(pk=parent_id)
- assert parent.nom == "APRES a3"
- assert parent.adresse == "Adresse3"
- assert parent.code_postal == "12345"
- assert parent.ville == "Ville3"
- assert parent.naf_principal == "naf3"
- assert parent.location.x == 3
- assert parent.location.y == 33
-
- child = RevisionActeur.objects.get(pk="a3")
- assert child.statut == ActeurStatus.INACTIF
- assert child.parent == parent
- assert child.parent_reason == (
- f"SIRET 44444444400001 détecté le {TODAY} comme fermé dans AE, "
- f"remplacé par SIRET 55555555500001"
+ acteur = Acteur.objects.get(pk="a2")
+ assert acteur.statut == ActeurStatus.ACTIF
+ assert acteur.siret == "22222222200001"
+ assert acteur.siren == "222222222"
+
+ revision = RevisionActeur.objects.get(pk="a2")
+ assert revision.statut == ActeurStatus.ACTIF
+ assert revision.siret == "33333333300001"
+ assert revision.siren == "333333333"
+ assert revision.parent is None
+ assert revision.parent_reason == (
+ f"Modifications de l'acteur le {TODAY}: SIRET 22222222200001 détecté comme"
+ " fermé dans AE, remplacé par le SIRET 33333333300001"
)
- assert child.location.x == 3
- assert child.location.y == 33
+ assert revision.siret_is_closed is False
diff --git a/dags_unit_tests/shared/tasks/airflow_logic/test_packages_compatibility.py b/dags_unit_tests/shared/tasks/airflow_logic/test_packages_compatibility.py
index 030d9bcfb..b7e1d6eff 100644
--- a/dags_unit_tests/shared/tasks/airflow_logic/test_packages_compatibility.py
+++ b/dags_unit_tests/shared/tasks/airflow_logic/test_packages_compatibility.py
@@ -26,7 +26,6 @@ def test_pandas_read_sql_table():
- mettre à jour ce teste pour qu'il fonctionne
- mettre à jour les codes DAGs pour qu'ils fonctionnent également
"""
- print(f"Pandas version: {pd.__version__}")
engine = create_engine("sqlite:///:memory:")
engine.execute("CREATE TABLE my_table (id INT, name TEXT)")
pd.read_sql_table("my_table", engine)
diff --git a/dags_unit_tests/sources/tasks/transform/test_transform_df.py b/dags_unit_tests/sources/tasks/transform/test_transform_df.py
index 5570a89a0..874a624a2 100644
--- a/dags_unit_tests/sources/tasks/transform/test_transform_df.py
+++ b/dags_unit_tests/sources/tasks/transform/test_transform_df.py
@@ -790,7 +790,6 @@ def test_compute_location(self, latitude, longitude, expected_location):
result = compute_location(
pd.Series({"latitude": latitude, "longitude": longitude}), None
)
- print(result["location"])
assert result["location"] == expected_location
diff --git a/dags_unit_tests/utils/test_data_serialize_reconstruct.py b/dags_unit_tests/utils/test_data_serialize_reconstruct.py
index dc3af503a..478719e6b 100644
--- a/dags_unit_tests/utils/test_data_serialize_reconstruct.py
+++ b/dags_unit_tests/utils/test_data_serialize_reconstruct.py
@@ -2,8 +2,6 @@
import pytest
from django.contrib.gis.geos import Point
-from rich import print
-from utils.data_serialize_reconstruct import data_serialize
from data.models.changes.utils import data_reconstruct
from qfdmo.models.acteur import RevisionActeur
@@ -12,6 +10,7 @@
ActionFactory,
SourceFactory,
)
+from utils.data_serialize_reconstruct import data_serialize
DATETIME = datetime(2023, 10, 1, 14, 30, 4)
POINT = Point(1, 2)
@@ -53,7 +52,6 @@ def test_data_reconstructed(self, data_reconstructed):
assert isinstance(data["cree_le"], str)
def test_data_reconstructed_compatible_with_model(self, data_reconstructed):
- print("test_data_is_compatible", data_reconstructed)
rev = RevisionActeur(**data_reconstructed)
rev.save()
# FIXME: setting cree_le doesn't work the 1st time due
diff --git a/dags_unit_tests/utils/test_django.py b/dags_unit_tests/utils/test_django.py
index 58ee69816..9124eff72 100644
--- a/dags_unit_tests/utils/test_django.py
+++ b/dags_unit_tests/utils/test_django.py
@@ -1,5 +1,6 @@
import pandas as pd
import pytest
+
from utils.django import (
django_model_fields_get,
django_model_queryset_generate,
@@ -59,7 +60,7 @@ def test_django_model_queryset_generate():
)
# Convert queryset to SQL string
- sql = django_model_queryset_to_sql(queryset)
+ django_model_queryset_to_sql(queryset)
# Expected SQL string (adjust to match your backend, structure may vary slightly)
expected_sql = r"""
@@ -69,7 +70,6 @@ def test_django_model_queryset_generate():
AND NOT ("adresse" IS NULL) AND NOT ("adresse" = ''))
AND NOT (("siret" IS NOT NULL) OR ("siret" != ''))
"""
- print(sql)
# TODO: activer ce test une fois qu'on est content
# avec le résultat final via Airflow
# assert sql.strip() == expected_sql.strip()
diff --git a/data/models/changes/__init__.py b/data/models/changes/__init__.py
index 6d1b5e002..e43e0500a 100644
--- a/data/models/changes/__init__.py
+++ b/data/models/changes/__init__.py
@@ -1,23 +1,21 @@
from .acteur_change_nothing_in_base import ChangeActeurNothingBase
-from .acteur_create_as_child import ChangeActeurCreateAsChild
from .acteur_create_as_parent import ChangeActeurCreateAsParent
from .acteur_delete_as_parent import ChangeActeurDeleteAsParent
from .acteur_keep_as_parent import ChangeActeurKeepAsParent
from .acteur_rgpd_anonymize import ChangeActeurRgpdAnonymize
-from .acteur_update_data import ChangeActeurUpdateData
from .acteur_update_parent_id import ChangeActeurUpdateParentId
+from .acteur_update_revision import ChangeActeurUpdateRevision
from .acteur_verify_in_revision import ChangeActeurVerifyRevision
from .sample_model_do_nothing import SampleModelDoNothing
CHANGE_MODELS = {
- ChangeActeurUpdateData.name(): ChangeActeurUpdateData,
- ChangeActeurCreateAsChild.name(): ChangeActeurCreateAsChild,
ChangeActeurCreateAsParent.name(): ChangeActeurCreateAsParent,
ChangeActeurDeleteAsParent.name(): ChangeActeurDeleteAsParent,
+ ChangeActeurKeepAsParent.name(): ChangeActeurKeepAsParent,
+ ChangeActeurNothingBase.name(): ChangeActeurNothingBase,
+ ChangeActeurRgpdAnonymize.name(): ChangeActeurRgpdAnonymize,
ChangeActeurUpdateParentId.name(): ChangeActeurUpdateParentId,
+ ChangeActeurUpdateRevision.name(): ChangeActeurUpdateRevision,
ChangeActeurVerifyRevision.name(): ChangeActeurVerifyRevision,
- ChangeActeurNothingBase.name(): ChangeActeurNothingBase,
- ChangeActeurKeepAsParent.name(): ChangeActeurKeepAsParent,
SampleModelDoNothing.name(): SampleModelDoNothing,
- ChangeActeurRgpdAnonymize.name(): ChangeActeurRgpdAnonymize,
}
diff --git a/data/models/changes/acteur_create_as_child.py b/data/models/changes/acteur_create_as_child.py
deleted file mode 100644
index 74282eb89..000000000
--- a/data/models/changes/acteur_create_as_child.py
+++ /dev/null
@@ -1,63 +0,0 @@
-from pydantic import BaseModel
-
-from data.models.changes.utils import data_reconstruct
-
-
-class ChangeActeurCreateAsChild(BaseModel):
- id: str
- data: dict = {}
-
- @classmethod
- def name(cls) -> str:
- return "acteur_create_as_child"
-
- def validate(self):
- from qfdmo.models import Acteur, DisplayedActeur, RevisionActeur
-
- # Parent field must be SET (but we can't check if parent exists yet
- # as it could be a new parent to be created)
- for field in ["parent", "parent_reason"]:
- if not self.data.get(field):
- msg = f"Création d'enfant: champ '{field}' à renseigner {self.data}"
- raise ValueError(msg)
-
- # Ensure child exists nowhere
- for model in [Acteur, RevisionActeur, DisplayedActeur]:
- obj = model.objects.filter(pk=self.id)
- if obj.exists():
- msg = (
- f"Création d'enfant: '{self.id}' existe déjà dans {model.__name__}"
- )
- raise ValueError(msg)
-
- def apply(self):
- self.validate()
- from qfdmo.models import Acteur, RevisionActeur
-
- # Ensure parent exists in RevisionActeur
- parent = RevisionActeur.objects.get(pk=self.data["parent"])
-
- # Reconstruct data from RevisionActeur
- data = data_reconstruct(RevisionActeur, self.data)
-
- # Create child in Acteur to hold data
- data_base = data.copy()
- del data_base["parent"]
- del data_base["parent_reason"]
- # TODO: if we flatten our pydantic models, then we wouldn't
- if "identifiant_unique" in data_base:
- del data_base["identifiant_unique"]
- Acteur.objects.create(
- identifiant_unique=self.id,
- **data_base,
- )
-
- # Create child in RevisionActeur to hold reference to parent
- RevisionActeur.objects.create(
- identifiant_unique=self.id,
- parent_reason=data["parent_reason"],
- parent=parent,
- statut="ACTIF",
- source=data["source"],
- acteur_type=data["acteur_type"],
- )
diff --git a/data/models/changes/acteur_update_data.py b/data/models/changes/acteur_update_data.py
deleted file mode 100644
index d629e3603..000000000
--- a/data/models/changes/acteur_update_data.py
+++ /dev/null
@@ -1,36 +0,0 @@
-"""Generic change model to update an acteur's data. If your use-case
-is very specific (e.g. RGPD), create dedicated model for more clarity/reliability."""
-
-from data.models.changes.acteur_abstract import ChangeActeurAbstract
-from data.models.changes.utils import data_reconstruct
-from qfdmo.models import Acteur, RevisionActeur
-
-
-class ChangeActeurUpdateData(ChangeActeurAbstract):
- @classmethod
- def name(cls) -> str:
- return "acteur_update_data"
-
- def validate(self) -> Acteur | RevisionActeur:
- if not self.data:
- raise ValueError("No data provided")
- # The parent should already exist in revision or base
- # We tolerate absence from revision
- result = RevisionActeur.objects.filter(pk=self.id).first()
- if not result:
- # But if not in revision, must be in base
- result = Acteur.objects.get(pk=self.id)
- return result
-
- def apply(self):
- acteur = self.validate()
- # If acteur is only in base, we need to create a revision
- if isinstance(acteur, Acteur):
- acteur = RevisionActeur(
- identifiant_unique=acteur.identifiant_unique,
- acteur_type=acteur.acteur_type,
- )
- data = data_reconstruct(RevisionActeur, self.data)
- for key, value in data.items():
- setattr(acteur, key, value)
- acteur.save()
diff --git a/data/models/changes/acteur_update_revision.py b/data/models/changes/acteur_update_revision.py
new file mode 100644
index 000000000..8b1819982
--- /dev/null
+++ b/data/models/changes/acteur_update_revision.py
@@ -0,0 +1,31 @@
+"""Generic change model to update an acteur's data. If your use-case
+is very specific (e.g. RGPD), create dedicated model for more clarity/reliability."""
+
+from data.models.changes.acteur_abstract import ChangeActeurAbstract
+from qfdmo.models import Acteur
+
+
+class ChangeActeurUpdateRevision(ChangeActeurAbstract):
+ @classmethod
+ def name(cls) -> str:
+ return "acteur_update_revision"
+
+ def validate(self) -> Acteur:
+ if not self.data:
+ raise ValueError("Aucune donnée fournie")
+ # The parent should already exist in revision or base
+ # We tolerate absence from revision
+ try:
+ acteur = Acteur.objects.get(pk=self.id)
+ except Acteur.DoesNotExist:
+ raise ValueError(f"L'acteur cible {self.id} n'existe pas")
+ return acteur
+
+ def apply(self):
+ acteur = self.validate()
+
+ revision = acteur.get_or_create_revision()
+
+ for key, value in self.data.items():
+ setattr(revision, key, value)
+ revision.save()
diff --git a/dbt/models/marts/enrich/marts_enrich_acteurs_closed_candidates.sql b/dbt/models/marts/enrich/marts_enrich_acteurs_closed_candidates.sql
index 51d775377..6062d74e8 100644
--- a/dbt/models/marts/enrich/marts_enrich_acteurs_closed_candidates.sql
+++ b/dbt/models/marts/enrich/marts_enrich_acteurs_closed_candidates.sql
@@ -16,6 +16,7 @@ WITH acteurs_with_siret AS (
SELECT
-- Acteur columns
identifiant_unique AS acteur_id,
+ identifiant_externe AS acteur_id_externe,
siret AS acteur_siret,
LEFT(siret,9) AS acteur_siren,
nom AS acteur_nom,
@@ -27,10 +28,11 @@ WITH acteurs_with_siret AS (
adresse AS acteur_adresse,
code_postal AS acteur_code_postal,
ville AS acteur_ville,
- location AS acteur_location
+ location AS acteur_location,
+ parent_id AS acteur_parent_id
- FROM {{ ref('marts_carte_acteur') }}
- WHERE siret IS NOT NULL AND siret != '' AND LENGTH(siret) = 14
+ FROM {{ source('enrich', 'qfdmo_vueacteur') }} AS acteurs
+ WHERE siret IS NOT NULL AND siret != '' AND LENGTH(siret) = 14/* AND (acteurs.source_id is null or acteurs.source_id in (45, 252)) */
),
/* Filtering on etab closed (NOT etab.est_actif) BUT
not on unite closed (NOT unite_est_actif) because
@@ -39,6 +41,7 @@ etab_closed_candidates AS (
SELECT
-- acteurs
acteurs.acteur_id,
+ acteurs.acteur_id_externe,
acteurs.acteur_siret,
acteurs.acteur_siren,
acteurs.acteur_type_id,
@@ -50,6 +53,7 @@ SELECT
acteurs.acteur_adresse,
acteurs.acteur_code_postal,
acteurs.acteur_ville,
+ acteurs.acteur_parent_id,
CASE WHEN acteurs.acteur_location IS NULL THEN NULL ELSE ST_X(acteurs.acteur_location) END AS acteur_longitude,
CASE WHEN acteurs.acteur_location IS NULL THEN NULL ELSE ST_Y(acteurs.acteur_location) END AS acteur_latitude,
diff --git a/dbt/models/marts/enrich/marts_enrich_acteurs_rgpd_suggest.sql b/dbt/models/marts/enrich/marts_enrich_acteurs_rgpd_suggest.sql
index 5177fc57c..a1a2bb323 100644
--- a/dbt/models/marts/enrich/marts_enrich_acteurs_rgpd_suggest.sql
+++ b/dbt/models/marts/enrich/marts_enrich_acteurs_rgpd_suggest.sql
@@ -25,12 +25,12 @@ WITH acteurs_with_siren AS (
udf_normalize_string_for_match(CONCAT(nom || ' ' || nom_officiel || ' ' || nom_commercial)) AS noms_normalises,
commentaires,
statut
- FROM {{ ref('marts_carte_acteur') }}
+ FROM {{ source('enrich', 'qfdmo_vueacteur') }} AS acteurs
/*
We have normalization issues with our SIREN field in our DB
and we obtain better matching by reconstructing SIREN via SIRET
*/
- WHERE siret IS NOT NULL AND siret != '' AND LENGTH(siret) = 14
+ WHERE siret IS NOT NULL AND siret != '' AND LENGTH(siret) = 14/* AND (acteurs.source_id is null or acteurs.source_id in (45, 252)) */
), unite_matching_acteurs_on_siren AS (
SELECT
acteurs.id AS acteur_id,
diff --git a/dbt/models/marts/enrich/marts_enrich_acteurs_villes_candidates.sql b/dbt/models/marts/enrich/marts_enrich_acteurs_villes_candidates.sql
index d5835192c..025995ee3 100644
--- a/dbt/models/marts/enrich/marts_enrich_acteurs_villes_candidates.sql
+++ b/dbt/models/marts/enrich/marts_enrich_acteurs_villes_candidates.sql
@@ -17,9 +17,9 @@ SELECT
ban.ville AS ban_ville,
ban.code_postal AS ban_code_postal,
ban.ville AS suggest_ville
-FROM {{ ref('marts_carte_acteur') }} AS acteurs
+FROM {{ source('enrich', 'qfdmo_vueacteur') }} AS acteurs
JOIN {{ ref('int_ban_villes') }} AS ban ON ban.code_postal = acteurs.code_postal
-WHERE acteurs.statut = 'ACTIF'
+WHERE acteurs.statut = 'ACTIF'/* AND (acteurs.source_id is null or acteurs.source_id in (45, 252)) */
AND acteurs.code_postal IS NOT NULL and acteurs.code_postal != '' and LENGTH(acteurs.code_postal) = 5
/* Only suggest if 1 difference */
AND (
diff --git a/dbt/models/source/source_enrich.yml b/dbt/models/source/source_enrich.yml
new file mode 100644
index 000000000..504215622
--- /dev/null
+++ b/dbt/models/source/source_enrich.yml
@@ -0,0 +1,8 @@
+version: 2
+
+sources:
+ - name: enrich
+ description: "Enrich"
+ schema: public
+ tables:
+ - name: qfdmo_vueacteur
diff --git a/docs/explications/data/enrich/README.md b/docs/explications/data/enrich/README.md
new file mode 100644
index 000000000..9b79d6681
--- /dev/null
+++ b/docs/explications/data/enrich/README.md
@@ -0,0 +1,48 @@
+# Enrichissement de données
+
+Le principe de l'enrichissement de données est d'hidrater et/ou corriger les données de «Longue vie aux objets» grâce à des sources partenaires ou exécutant des scripts de cohérence.
+
+## Enrichissements via des sources partenaires
+
+Les sources aujourd'hui utilisées sont :
+
+* [Annuaire entreprise](https://annuaire-entreprises.data.gouv.fr/) : agrégateur de données sur les entreprises en France
+* [La BAN : Banque d'adresse nationnale](https://adresse.data.gouv.fr/) : référencement et géolocalisation de toutes les adresses en France
+
+### Comment ça marche
+
+Plusieurs étapes :
+
+1. Téléchargement de la base de données partenaire et copie sur notre propre base de données (DAG Airflow)
+ * Cloner - AE - Etablissement
+ * Cloner - AE - Unite Legale
+ * Cloner - BAN - Adresses
+ * Cloner - BAN - Lieux-dits
+1. Préparation de la donnée (Airflow + DBT) :
+ * DBT - Rafraîchir les acteurs affichés
+ * 🔄 Enrichir - Rafraîchir les modèles DBT
+1. Création des suggestions (Airflow + DBT) :
+ * 🚪 Enrichir - Acteurs Fermés
+
+```mermaid
+graph LR
+ A[Cloner - AE - Etablissement]
+ B[Cloner - AE - Unite Legale]
+ C[Cloner - BAN - Adresses]
+ D[Cloner - BAN - Lieux-dits]
+ E[DBT - Rafraîchir les acteurs affichés]
+ F[🔄 Enrichir - Rafraîchir les modèles DBT]
+ G[🚪 Enrichir - Acteurs Fermés]
+ A --> F
+ B --> F
+ C --> F
+ D --> F
+ F --> G
+ E --> G
+```
+
+## Script de cohérence
+
+### Vérification des URLs
+
+le DAG `🔗 Crawl - URLs - Suggestions` collecte les URLs des acteurs et parcourt ces URL pour vérifier qu'elles sont valident
diff --git a/qfdmo/models/acteur.py b/qfdmo/models/acteur.py
index 84fac26d9..3d9bcda77 100644
--- a/qfdmo/models/acteur.py
+++ b/qfdmo/models/acteur.py
@@ -638,6 +638,41 @@ def commentaires_ajouter(self, added):
self.save()
+ def instance_copy(
+ self,
+ overriden_fields={
+ "identifiant_unique": None,
+ "identifiant_externe": None,
+ "source": None,
+ },
+ ):
+ if isinstance(self, RevisionActeur) and self.is_parent:
+ raise Exception("Impossible de dupliquer un acteur parent")
+
+ if isinstance(self, RevisionActeur):
+ acteur = Acteur.objects.get(identifiant_unique=self.identifiant_unique)
+ acteur.instance_copy(overriden_fields=overriden_fields)
+
+ new_instance = deepcopy(self)
+
+ for field, value in overriden_fields.items():
+ setattr(new_instance, field, value)
+
+ new_instance.save()
+ new_instance.labels.set(self.labels.all())
+ new_instance.acteur_services.set(self.acteur_services.all())
+
+ # recreate proposition_services for the new revision_acteur
+ for proposition_service in self.proposition_services.all():
+ new_proposition_service = proposition_service.__class__.objects.create(
+ acteur=new_instance,
+ action=proposition_service.action,
+ )
+ new_proposition_service.sous_categories.set(
+ proposition_service.sous_categories.all()
+ )
+ return new_instance
+
def clean_parent(parent):
try:
@@ -895,7 +930,14 @@ def create_parent(self):
self.save()
return revision_acteur_parent
- def duplicate(self):
+ def duplicate(
+ self,
+ fields_to_reset={
+ "identifiant_unique": None,
+ "identifiant_externe": None,
+ "source": None,
+ },
+ ):
if self.is_parent:
raise Exception("Impossible de dupliquer un acteur parent")
@@ -903,11 +945,6 @@ def duplicate(self):
acteur = Acteur.objects.get(identifiant_unique=self.identifiant_unique)
- fields_to_reset = [
- "identifiant_unique",
- "identifiant_externe",
- "source",
- ]
fields_to_ignore = [
"labels",
"acteur_services",
@@ -916,12 +953,12 @@ def duplicate(self):
"parent_reason",
]
- for field in fields_to_reset:
- setattr(revision_acteur, field, None)
+ for field, value in fields_to_reset.items():
+ setattr(revision_acteur, field, value)
for field in revision_acteur._meta.fields:
if (
not getattr(revision_acteur, field.name)
- and field.name not in fields_to_reset
+ and field.name not in fields_to_reset.keys()
and field.name not in fields_to_ignore
):
setattr(revision_acteur, field.name, getattr(acteur, field.name))
@@ -931,11 +968,9 @@ def duplicate(self):
# recreate proposition_services for the new revision_acteur
for proposition_service in self.proposition_services.all():
- revision_proposition_service = revision_proposition_service = (
- RevisionPropositionService.objects.create(
- acteur=revision_acteur,
- action=proposition_service.action,
- )
+ revision_proposition_service = RevisionPropositionService.objects.create(
+ acteur=revision_acteur,
+ action=proposition_service.action,
)
revision_proposition_service.sous_categories.set(
proposition_service.sous_categories.all()
diff --git a/templates/data/_partials/value_details.html b/templates/data/_partials/value_details.html
index 693887542..88315ed1e 100644
--- a/templates/data/_partials/value_details.html
+++ b/templates/data/_partials/value_details.html
@@ -47,7 +47,7 @@
{% elif key == "identifiant_unique" or key == "id" %}
{{ value }}
(base,
- rev,
+ rev,
disp)
{% elif key == "statut" %}
{{ value }}
diff --git a/unit_tests/data/models/changes/test_acteur_create_as_child.py b/unit_tests/data/models/changes/test_acteur_create_as_child.py
deleted file mode 100644
index 325b5bcdf..000000000
--- a/unit_tests/data/models/changes/test_acteur_create_as_child.py
+++ /dev/null
@@ -1,72 +0,0 @@
-import pytest
-from django.contrib.gis.geos import Point
-
-from data.models.changes.acteur_create_as_child import ChangeActeurCreateAsChild
-from qfdmo.models import Acteur, RevisionActeur
-from unit_tests.qfdmo.acteur_factory import (
- ActeurFactory,
- ActeurTypeFactory,
- SourceFactory,
-)
-
-
-@pytest.mark.django_db
-class TestChangeActeurCreateAsChild:
- @pytest.mark.parametrize(
- "data,missing",
- [({"parent": "456"}, "parent_reason"), ({"parent_reason": "test"}, "parent")],
- )
- def test_raise_if_missing_params(self, data, missing):
- change = ChangeActeurCreateAsChild(id="123", data=data)
- with pytest.raises(ValueError, match=f"champ '{missing}' à renseigner"):
- change.apply()
-
- def test_raise_if_acteur_exists(self):
- ActeurFactory(identifiant_unique="123")
- change = ChangeActeurCreateAsChild(
- id="123", data={"parent": "456", "parent_reason": "test"}
- )
- with pytest.raises(ValueError, match="existe déjà"):
- change.apply()
-
- def test_working(self):
- # Create parent
- source = SourceFactory(code="source1")
- atype = ActeurTypeFactory(code="atype1")
- parent = RevisionActeur.objects.create(
- identifiant_unique="parent1",
- source=source,
- acteur_type=atype,
- statut="ACTIF",
- location=Point(1, 1),
- )
- # Create child
- change = ChangeActeurCreateAsChild(
- id="child1",
- data={
- "nom": "my child1",
- "source": source,
- "acteur_type": atype,
- "statut": "ACFIF",
- "location": Point(1, 1),
- "parent": parent,
- "parent_reason": "test",
- },
- )
- change.apply()
-
- # Acteur created in base to hold the core data
- base = Acteur.objects.get(pk="child1")
- assert base.identifiant_unique == "child1"
- assert base.nom == "my child1"
- assert base.source.pk == source.pk
- assert base.acteur_type.pk == atype.pk
- assert base.statut == "ACFIF"
- assert base.location.x == 1
- assert base.location.y == 1
-
- # Acteur created in revision to hold the parent reference
- revision = RevisionActeur.objects.get(pk="child1")
- assert revision.parent.pk == parent.pk
- assert revision.parent_reason == "test"
- assert not revision.nom
diff --git a/unit_tests/data/models/changes/test_acteur_update_data.py b/unit_tests/data/models/changes/test_acteur_update_data.py
deleted file mode 100644
index 4ffd48a8d..000000000
--- a/unit_tests/data/models/changes/test_acteur_update_data.py
+++ /dev/null
@@ -1,82 +0,0 @@
-"""
-Test file for the ChangeActeurUpdateData model.
-
-"""
-
-import pytest
-from django.contrib.gis.geos import Point
-from pydantic import ValidationError
-
-from dags.sources.config.shared_constants import EMPTY_ACTEUR_FIELD
-from data.models.changes.acteur_update_data import ChangeActeurUpdateData
-from qfdmo.models.acteur import Acteur, ActeurType, RevisionActeur
-
-
-@pytest.mark.django_db
-class TestChangeActeurUpdateData:
- def test_name(self):
- assert ChangeActeurUpdateData.name() == "acteur_update_data"
-
- def test_raise_if_no_data_provided(self):
- change = ChangeActeurUpdateData(id="dummy", data={})
- with pytest.raises(ValueError, match="No data provided"):
- change.apply()
-
- @pytest.mark.parametrize("data", [None, True])
- def test_raise_if_data_wrong_type(self, data):
- with pytest.raises(ValidationError):
- ChangeActeurUpdateData(id="dummy", data=data)
-
- def test_raise_if_acteur_does_not_exist(self):
- change = ChangeActeurUpdateData(id="dummy", data={"nom": "test"})
- with pytest.raises(Acteur.DoesNotExist):
- change.apply()
-
- def test_working_case(self):
- # We start by creating acteur only in base
- at1 = ActeurType.objects.create(code="at1")
- at2 = ActeurType.objects.create(code="at2")
- base = Acteur.objects.create(nom="test", acteur_type=at1, location=Point(1, 2))
-
- # We check that acteur isn't in revision yet
- assert RevisionActeur.objects.filter(pk=base.pk).count() == 0
-
- # Then we modify its data, notably some Python and Django types
- data = {"nom": "test2", "acteur_type": at2.pk, "location": Point(2, 3)}
- ChangeActeurUpdateData(id=base.pk, data=data).apply()
-
- # We check that acteur was created in revision with the right data
- rev = RevisionActeur.objects.get(pk=base.pk)
- assert rev.nom == "test2"
- assert rev.acteur_type == at2
- assert rev.location.x == 2
- assert rev.location.y == 3
-
- # We check that acteur in base wasn't modified
- base = Acteur.objects.get(pk=base.pk)
- assert base.nom == "test"
- assert base.acteur_type == at1
- assert base.location.x == 1
- assert base.location.y == 2
-
- # If we perform the some changes again, revision is again
- # updated AND there is no conflit / creation of a new revision
- data = {"nom": "test3"}
- ChangeActeurUpdateData(id=base.pk, data=data).apply()
- rev = RevisionActeur.objects.get(pk=base.pk)
- assert rev.nom == "test3"
- assert rev.acteur_type == at2
- assert RevisionActeur.objects.filter(pk=base.pk).count() == 1
-
- def test_set_to_empty(self):
- # This is a special case for the Crawl URLs DAG
- # which prompted to create this change model, so we
- # need to ensure that specific case works
- at1 = ActeurType.objects.create(code="at1")
- base = Acteur.objects.create(
- nom="test", acteur_type=at1, location=Point(1, 2), url="https://example.com"
- )
- data = {"url": EMPTY_ACTEUR_FIELD}
- ChangeActeurUpdateData(id=base.pk, data=data).apply()
- rev = RevisionActeur.objects.get(pk=base.pk)
- assert rev.url == EMPTY_ACTEUR_FIELD
diff --git a/unit_tests/data/models/changes/test_acteur_update_revision.py b/unit_tests/data/models/changes/test_acteur_update_revision.py
new file mode 100644
index 000000000..92486a6da
--- /dev/null
+++ b/unit_tests/data/models/changes/test_acteur_update_revision.py
@@ -0,0 +1,58 @@
+from unittest.mock import patch
+
+import pytest
+
+from data.models.changes.acteur_update_revision import ChangeActeurUpdateRevision
+from qfdmo.models import ActeurStatus, RevisionActeur
+from unit_tests.qfdmo.acteur_factory import ActeurFactory, PropositionServiceFactory
+
+
+@pytest.mark.django_db
+class TestChangeActeurUpdateRsevision:
+ def test_model_name(self):
+ assert ChangeActeurUpdateRevision.name() == "acteur_update_revision"
+
+ def test_raise_if_acteur_does_not_exist(self):
+ change = ChangeActeurUpdateRevision(
+ id="dummy", data={"identifiant_unique": "new_id"}
+ )
+ with pytest.raises(ValueError, match="L'acteur cible dummy n'existe pas"):
+ change.validate()
+
+ def test_raise_if_no_data_provided(self):
+ acteur = ActeurFactory()
+ change = ChangeActeurUpdateRevision(id=acteur.pk, data={})
+ with pytest.raises(ValueError, match="Aucune donnée fournie"):
+ change.validate()
+
+ @patch.object(ChangeActeurUpdateRevision, "validate")
+ def test_validate_is_called_by_apply(self, mock_validate):
+ acteur = ActeurFactory(statut=ActeurStatus.INACTIF)
+ change = ChangeActeurUpdateRevision(id=acteur.pk, data={"statut": "ACTIF"})
+ change.apply()
+ mock_validate.assert_called_once()
+
+ def test_without_revision_ok(self):
+ # Création d'un acteur inactif avec des données de base
+ acteur = ActeurFactory(
+ nom="foo",
+ statut=ActeurStatus.ACTIF,
+ )
+ proposition = PropositionServiceFactory()
+ acteur.proposition_services.add(proposition)
+
+ change = ChangeActeurUpdateRevision(
+ id=acteur.pk,
+ data={"nom": "bar", "statut": ActeurStatus.INACTIF},
+ )
+ change.apply()
+
+ assert acteur.statut == ActeurStatus.ACTIF
+ assert acteur.nom == "foo"
+ assert acteur.proposition_services.count() == 1
+
+ revision = RevisionActeur.objects.get(pk=acteur.pk)
+
+ assert revision.statut == ActeurStatus.INACTIF
+ assert revision.nom == "bar"
+ assert revision.proposition_services.count() == 1