diff --git a/Makefile b/Makefile index 5a8c5082a..554457f01 100644 --- a/Makefile +++ b/Makefile @@ -57,13 +57,10 @@ run-airflow: .PHONY: run-django run-django: - rm -rf .parcel-cache - honcho start -f Procfile.dev + honcho start -f Procfile.django.dev run-all: - docker compose --profile airflow up -d - rm -rf .parcel-cache - honcho start -f Procfile.dev + honcho start -f Procfile.all.dev # Local django operations .PHONY: migrate diff --git a/Procfile.all.dev b/Procfile.all.dev new file mode 100644 index 000000000..64359960c --- /dev/null +++ b/Procfile.all.dev @@ -0,0 +1,3 @@ +services: docker compose --profile airflow up +web: poetry run python manage.py runserver 0.0.0.0:8000 +npm: rm -rf .parcel-cache; npm run watch diff --git a/Procfile.dev b/Procfile.django.dev similarity index 100% rename from Procfile.dev rename to Procfile.django.dev diff --git a/dags/acteurs/dags/compute_acteur.py b/dags/acteurs/dags/compute_acteur.py index fbdbe4b4f..b3a5685db 100755 --- a/dags/acteurs/dags/compute_acteur.py +++ b/dags/acteurs/dags/compute_acteur.py @@ -36,90 +36,70 @@ ) as dag: dbt_run_base_acteurs = BashOperator( task_id="dbt_run_base_acteurs", - bash_command=("cd /opt/airflow/dbt/ && dbt run --models base.acteurs"), + bash_command=("dbt run --models base.acteurs"), ) dbt_test_base_acteurs = BashOperator( task_id="dbt_test_base_acteurs", - bash_command=("cd /opt/airflow/dbt/ && dbt test --models base.acteurs"), + bash_command=("dbt test --models base.acteurs"), ) dbt_run_intermediate_acteurs = BashOperator( task_id="dbt_run_intermediate_acteurs", - bash_command=("cd /opt/airflow/dbt/ && dbt run --models intermediate.acteurs"), + bash_command=("dbt run --models intermediate.acteurs"), ) dbt_test_intermediate_acteurs = BashOperator( task_id="dbt_test_intermediate_acteurs", - bash_command=("cd /opt/airflow/dbt/ && dbt test --models intermediate.acteurs"), + bash_command=("dbt test --models intermediate.acteurs"), ) dbt_run_marts_acteurs_exhaustive = BashOperator( task_id="dbt_run_marts_acteurs_exhaustive", - bash_command=( - "cd /opt/airflow/dbt/ && dbt run --models marts.acteurs.exhaustive" - ), + bash_command=("dbt run --models marts.acteurs.exhaustive"), ) dbt_test_marts_acteurs_exhaustive = BashOperator( task_id="dbt_test_marts_acteurs_exhaustive", - bash_command=( - "cd /opt/airflow/dbt/ && dbt test --models marts.acteurs.exhaustive" - ), + bash_command=("dbt test --models marts.acteurs.exhaustive"), ) dbt_run_exposure_acteurs_exhaustive = BashOperator( task_id="dbt_run_exposure_acteurs_exhaustive", - bash_command=( - "cd /opt/airflow/dbt/ && dbt run --models exposure.acteurs.exhaustive" - ), + bash_command=("dbt run --models exposure.acteurs.exhaustive"), ) dbt_test_exposure_acteurs_exhaustive = BashOperator( task_id="dbt_test_exposure_acteurs_exhaustive", - bash_command=( - "cd /opt/airflow/dbt/ && dbt test --models exposure.acteurs.exhaustive" - ), + bash_command=("dbt test --models exposure.acteurs.exhaustive"), ) dbt_run_marts_acteurs_carte = BashOperator( task_id="dbt_run_marts_acteurs_carte", - bash_command=("cd /opt/airflow/dbt/ && dbt run --models marts.acteurs.carte"), + bash_command=("dbt run --models marts.acteurs.carte"), ) dbt_test_marts_acteurs_carte = BashOperator( task_id="dbt_test_marts_acteurs_carte", - bash_command=("cd /opt/airflow/dbt/ && dbt test --models marts.acteurs.carte"), + bash_command=("dbt test --models marts.acteurs.carte"), ) dbt_run_exposure_acteurs_carte = BashOperator( task_id="dbt_run_exposure_acteurs_carte", - bash_command=( - "cd /opt/airflow/dbt/ && dbt run --models exposure.acteurs.carte" - ), + bash_command=("dbt run --models exposure.acteurs.carte"), ) dbt_test_exposure_acteurs_carte = BashOperator( task_id="dbt_test_exposure_acteurs_carte", - bash_command=( - "cd /opt/airflow/dbt/ && dbt test --models exposure.acteurs.carte" - ), + bash_command=("dbt test --models exposure.acteurs.carte"), ) dbt_run_marts_acteurs_opendata = BashOperator( task_id="dbt_run_marts_acteurs_opendata", - bash_command=( - "cd /opt/airflow/dbt/ && dbt run --models marts.acteurs.opendata" - ), + bash_command=("dbt run --models marts.acteurs.opendata"), ) dbt_test_marts_acteurs_opendata = BashOperator( task_id="dbt_test_marts_acteurs_opendata", - bash_command=( - "cd /opt/airflow/dbt/ && dbt test --models marts.acteurs.opendata" - ), + bash_command=("dbt test --models marts.acteurs.opendata"), ) dbt_run_exposure_acteurs_opendata = BashOperator( task_id="dbt_run_exposure_acteurs_opendata", - bash_command=( - "cd /opt/airflow/dbt/ && dbt run --models exposure.acteurs.opendata" - ), + bash_command=("dbt run --models exposure.acteurs.opendata"), ) dbt_test_exposure_acteurs_opendata = BashOperator( task_id="dbt_test_exposure_acteurs_opendata", - bash_command=( - "cd /opt/airflow/dbt/ && dbt test --models exposure.acteurs.opendata" - ), + bash_command=("dbt test --models exposure.acteurs.opendata"), ) check_model_table_displayedacteur_task = check_model_table_consistency_task( diff --git a/dags/clone/config/sql/creation/tables/create_table_ban_lieux_dits.sql b/dags/clone/config/sql/creation/tables/create_table_ban_lieux_dits.sql index 9b4b1bcde..c0b2f2253 100644 --- a/dags/clone/config/sql/creation/tables/create_table_ban_lieux_dits.sql +++ b/dags/clone/config/sql/creation/tables/create_table_ban_lieux_dits.sql @@ -4,7 +4,7 @@ Schema generated by scripts/db_schema_create_from_csv.py CREATE TABLE {{table_name}} ( "id" VARCHAR(12), -- 🟡 on reste scrict (min/max=10/12) - "nom_lieu_dit" VARCHAR(125), -- 98 -> 125 + "nom_lieu_dit" VARCHAR(255), -- 98 -> 255 "code_postal" CHAR(5), -- 🟡 on reste scrict (min/max=5) "code_insee" CHAR(5), -- 🟡 on reste scrict (min/max=5) "nom_commune" VARCHAR(60), -- 45 -> 60 diff --git a/dags/clone/tasks/business_logic/clone_table_validate.py b/dags/clone/tasks/business_logic/clone_table_validate.py index 18ba6a8cf..011ab0900 100644 --- a/dags/clone/tasks/business_logic/clone_table_validate.py +++ b/dags/clone/tasks/business_logic/clone_table_validate.py @@ -2,6 +2,7 @@ from clone.config import DIR_SQL_VALIDATION from django.db import connection + from utils import logging_utils as log logger = logging.getLogger(__name__) @@ -23,7 +24,6 @@ def clone_table_validate(table_kind: str, table_name: str, dry_run: bool) -> Non logger.info("Mode dry-run, on ne valide pas") continue with connection.cursor() as cursor: - # Running validation and getting results cursor.execute(sql) row = cursor.fetchone() diff --git a/dags/crawl/tasks/business_logic/crawl_urls_suggest.py b/dags/crawl/tasks/business_logic/crawl_urls_suggest.py index f878a4b24..4d286ec61 100644 --- a/dags/crawl/tasks/business_logic/crawl_urls_suggest.py +++ b/dags/crawl/tasks/business_logic/crawl_urls_suggest.py @@ -3,6 +3,7 @@ import pandas as pd from crawl.config.columns import COLS from crawl.config.constants import LABEL_URL_ORIGINE, LABEL_URL_PROPOSEE + from utils import logging_utils as log from utils.dataframes import ( df_col_assert_get_unique, @@ -39,7 +40,7 @@ def suggestions_prepare( - df_crawl_ok_diff = successful AND different = propose - df_crawl_fail = failed = propose None""" from data.models.change import SuggestionChange - from data.models.changes import ChangeActeurUpdateData + from data.models.changes import ChangeActeurUpdateRevision if df_none_or_empty(df): return [] @@ -54,7 +55,7 @@ def suggestions_prepare( order=1, reason=row[COLS.COHORT], entity_type="acteur_displayed", - model_name=ChangeActeurUpdateData.name(), + model_name=ChangeActeurUpdateRevision.name(), model_params={ "id": acteur[COLS.ID], "data": {COLS.URL_DB: row[COLS.SUGGEST_VALUE]}, diff --git a/dags/enrich/config/columns.py b/dags/enrich/config/columns.py index 1ec997172..4f1df52b1 100644 --- a/dags/enrich/config/columns.py +++ b/dags/enrich/config/columns.py @@ -12,6 +12,8 @@ class COLS: # Acteurs ACTEUR_ID: str = "acteur_id" + ACTEUR_ID_EXTERNE: str = "acteur_id_externe" + ACTEUR_PARENT_ID: str = "acteur_parent_id" ACTEUR_TYPE_ID: str = "acteur_type_id" ACTEUR_SOURCE_ID: str = "acteur_source_id" ACTEUR_SIRET: str = "acteur_siret" diff --git a/dags/enrich/dags/enrich_dbt_models_refresh.py b/dags/enrich/dags/enrich_dbt_models_refresh.py index 03bc9c41f..6394972a9 100644 --- a/dags/enrich/dags/enrich_dbt_models_refresh.py +++ b/dags/enrich/dags/enrich_dbt_models_refresh.py @@ -1,5 +1,6 @@ """DAG to refresh DBT models needed for enrich DAGs""" +import logging import re from airflow import DAG @@ -9,6 +10,8 @@ from enrich.config.models import EnrichDbtModelsRefreshConfig from shared.config import CATCHUPS, SCHEDULES, START_DATES, config_to_airflow_params +logger = logging.getLogger(__name__) + with DAG( dag_id="enrich_dbt_models_refresh", dag_display_name="🔄 Enrichir - Rafraîchir les modèles DBT", @@ -42,7 +45,7 @@ if not cmd: continue cmd_id = re.sub(r"__+", "_", re.sub(r"[^a-zA-Z0-9]+", "_", cmd)) - cmd += " --debug --threads 1" + logger.info(f"Create bash operator with command: {cmd}") tasks.append( BashOperator( task_id=f"enrich_{cmd_id}", diff --git a/dags/enrich/tasks/business_logic/enrich_dbt_model_to_suggestions.py b/dags/enrich/tasks/business_logic/enrich_dbt_model_to_suggestions.py index 7354d8ef0..74997e0e0 100644 --- a/dags/enrich/tasks/business_logic/enrich_dbt_model_to_suggestions.py +++ b/dags/enrich/tasks/business_logic/enrich_dbt_model_to_suggestions.py @@ -2,17 +2,11 @@ from datetime import datetime, timezone import pandas as pd -from cluster.tasks.business_logic.cluster_acteurs_parents_choose_new import ( - parent_id_generate, -) from enrich.config.cohorts import COHORTS from enrich.config.columns import COLS -from enrich.tasks.business_logic.enrich_dbt_model_row_to_suggest_data import ( - dbt_model_row_to_suggest_data, -) -from utils import logging_utils as log from data.models.changes.acteur_rgpd_anonymize import rgpd_data_get +from utils import logging_utils as log logger = logging.getLogger(__name__) @@ -40,7 +34,7 @@ def changes_prepare( def changes_prepare_villes(row: dict) -> tuple[list[dict], dict]: """Prepare suggestions for villes cohorts""" - from data.models.changes import ChangeActeurUpdateData + from data.models.changes import ChangeActeurUpdateRevision changes = [] model_params = { @@ -51,7 +45,7 @@ def changes_prepare_villes(row: dict) -> tuple[list[dict], dict]: } changes.append( changes_prepare( - model=ChangeActeurUpdateData, + model=ChangeActeurUpdateRevision, model_params=model_params, order=1, reason="On fait confiance à la BAN", @@ -98,24 +92,26 @@ def changes_prepare_closed_not_replaced( row: dict, ) -> tuple[list[dict], dict]: """Prepare suggestions for closed not replaced cohorts""" - from data.models.changes import ChangeActeurUpdateData + from data.models.changes import ChangeActeurUpdateRevision from qfdmo.models import ActeurStatus changes = [] + today = datetime.now(timezone.utc).strftime("%Y-%m-%d") model_params = { "id": row[COLS.ACTEUR_ID], "data": { "statut": ActeurStatus.INACTIF, - "siret": row[COLS.ACTEUR_SIRET], - "siren": row[COLS.ACTEUR_SIRET][:9], "siret_is_closed": True, - "acteur_type": row[COLS.ACTEUR_TYPE_ID], - "source": row[COLS.ACTEUR_SOURCE_ID], + "parent_reason": ( + f"Modifications de l'acteur le {today}: " + f"SIRET {row[COLS.ACTEUR_SIRET]} détecté comme fermé dans AE," + " Pas de remplacement" + ), }, } changes.append( changes_prepare( - model=ChangeActeurUpdateData, + model=ChangeActeurUpdateRevision, model_params=model_params, order=1, reason="SIRET & SIREN fermés, 0 remplacement trouvé", @@ -130,79 +126,34 @@ def changes_prepare_closed_replaced( row: dict, ) -> tuple[list[dict], dict]: """Prepare suggestion changes for closed replaced cohorts""" - from data.models.changes import ( - ChangeActeurCreateAsChild, - ChangeActeurCreateAsParent, - ChangeActeurUpdateData, - ) - from qfdmo.models import ActeurStatus + from data.models.changes import ChangeActeurUpdateRevision changes = [] today = datetime.now(timezone.utc).strftime("%Y-%m-%d") - # Parent - parent_id = parent_id_generate([str(row[COLS.SUGGEST_SIRET])]) - parent_data = dbt_model_row_to_suggest_data(row) - parent_data["source"] = None - parent_data["statut"] = ActeurStatus.ACTIF - params_parent = { - "id": parent_id, - "data": parent_data, + + update_revision = { + "id": row[COLS.ACTEUR_ID], + "data": { + "siret": row[COLS.SUGGEST_SIRET], + "siren": row[COLS.SUGGEST_SIRET][:9], + "siret_is_closed": False, + "parent_reason": ( + f"Modifications de l'acteur le {today}: " + f"SIRET {row[COLS.ACTEUR_SIRET]} détecté comme fermé dans AE, " + f"remplacé par le SIRET {row[COLS.SUGGEST_SIRET]}" + ), + }, } - changes.append( + changes = [ changes_prepare( - model=ChangeActeurCreateAsParent, - model_params=params_parent, + model=ChangeActeurUpdateRevision, + model_params=update_revision, order=1, - reason="besoin d'un parent pour rattaché acteur fermé", - entity_type="acteur_displayed", - ) - ) - - # New child to hold the reference data as standalone - # as parents are surrogates (e.g. they can be deleted - # during clustering) - now = datetime.now(timezone.utc).strftime("%Y%m%d%H%M%S") - child_new_id = f"{row[COLS.ACTEUR_ID]}_{row[COLS.ACTEUR_SIRET]}_{now}" - params_child_new = params_parent.copy() - params_child_new["id"] = child_new_id - params_child_new["data"]["source"] = row[COLS.ACTEUR_SOURCE_ID] - params_child_new["data"]["parent"] = parent_id - params_child_new["data"]["parent_reason"] = ( - f"Nouvel enfant pour conserver les données suite à: " - f"SIRET {row[COLS.ACTEUR_SIRET]} " - f"détecté le {today} comme fermé dans AE, " - f"remplacé par SIRET {row[COLS.SUGGEST_SIRET]}" - ) - changes.append( - changes_prepare( - model=ChangeActeurCreateAsChild, - model_params=params_child_new, - order=2, - reason="besoin nouvel enfant pour conserver les données", + reason="Modification du SIRET", entity_type="acteur_displayed", ) - ) + ] - # Existing Child - params_child_old = params_child_new.copy() - params_child_old["id"] = row[COLS.ACTEUR_ID] - params_child_old["data"]["parent"] = parent_id - params_child_old["data"]["parent_reason"] = ( - f"SIRET {row[COLS.ACTEUR_SIRET]} " - f"détecté le {today} comme fermé dans AE, " - f"remplacé par SIRET {row[COLS.SUGGEST_SIRET]}" - ) - params_child_old["data"]["siret_is_closed"] = True - params_child_old["data"]["statut"] = ActeurStatus.INACTIF - changes.append( - changes_prepare( - model=ChangeActeurUpdateData, - model_params=params_child_old, - order=3, - reason="rattacher enfant fermé à un parent", - entity_type="acteur_displayed", - ) - ) contexte = {} # changes are self-explanatory return changes, contexte @@ -259,7 +210,9 @@ def enrich_dbt_model_to_suggestions( row = dict(row) try: - changes, contexte = COHORTS_TO_PREPARE_CHANGES[cohort](row) + logger.info(f"Interprétation de {cohort=}") + change_preparation_function = COHORTS_TO_PREPARE_CHANGES[cohort] + changes, contexte = change_preparation_function(row) suggestion = { "contexte": contexte, "suggestion": {"title": cohort, "changes": changes}, diff --git a/dags_unit_tests/cluster/tasks/business_logic/test_cluster_acteurs_clusters.py b/dags_unit_tests/cluster/tasks/business_logic/test_cluster_acteurs_clusters.py index 5fac44131..72a34031f 100644 --- a/dags_unit_tests/cluster/tasks/business_logic/test_cluster_acteurs_clusters.py +++ b/dags_unit_tests/cluster/tasks/business_logic/test_cluster_acteurs_clusters.py @@ -7,7 +7,6 @@ score_tuples_to_clusters, similarity_matrix_to_tuples, ) -from rich import print def df_clusters_to_dict(df: pd.DataFrame) -> dict[str, list[str]]: @@ -169,11 +168,9 @@ def test_cols_group_fuzzy_single(self, df_cols_group_fuzzy): cluster_fields_fuzzy=["nom"], cluster_fuzzy_threshold=0.7, ) - print(df_clusters.to_dict(orient="records")) assert df_clusters["cluster_id"].nunique() == 3 assert len(df_clusters) == 6 clusters = df_clusters_to_dict(df_clusters) - print(clusters) assert clusters == { "10000_nom_3": [ "id0", # "centre commercial auchan" diff --git a/dags_unit_tests/cluster/tasks/business_logic/test_cluster_acteurs_parents_choose_data.py b/dags_unit_tests/cluster/tasks/business_logic/test_cluster_acteurs_parents_choose_data.py index c27b8d9fb..362fad4ba 100644 --- a/dags_unit_tests/cluster/tasks/business_logic/test_cluster_acteurs_parents_choose_data.py +++ b/dags_unit_tests/cluster/tasks/business_logic/test_cluster_acteurs_parents_choose_data.py @@ -2,8 +2,6 @@ import pandas as pd import pytest from django.contrib.gis.geos import Point -from rich import print -from utils.django import django_setup_full from dags.cluster.config.constants import COL_PARENT_DATA_NEW from dags.cluster.tasks.business_logic.cluster_acteurs_parents_choose_data import ( @@ -11,6 +9,7 @@ field_pick_value, parent_choose_data, ) +from utils.django import django_setup_full django_setup_full() @@ -209,9 +208,7 @@ def df_clusters_parent_create(self, parent, acteurs_revision, acteurs_base): ChangeActeurUpdateParentId.name() ) df["cluster_id"] = "c1" - print("df_clusters_parent_create before drops", df.to_dict(orient="records")) df = df.drop_duplicates(subset=["identifiant_unique"], keep="first") - print("df_clusters_parent_create", df.to_dict(orient="records")) return df @pytest.fixture @@ -261,7 +258,6 @@ def test_cluster_acteurs_parents_choose_data( df_before = dfs[scenario] DisplayedActeur(**parent).save() - print("BEFORE", df_before.to_dict(orient="records")) df_after = cluster_acteurs_parents_choose_data( df_clusters=df_before, fields_to_include=["nom", "siret", "email"], @@ -269,7 +265,6 @@ def test_cluster_acteurs_parents_choose_data( prioritize_source_ids=[10, 20], keep_empty=EMPTY_IGNORE, ) - print("AFTER", df_after.to_dict(orient="records")) filter_parent = df_after["identifiant_unique"] == "p1" parent_data = df_after[filter_parent][COL_PARENT_DATA_NEW].values[0] df_children = df_after[~filter_parent] @@ -293,7 +288,6 @@ def test_parent_create( df_before = df_clusters_parent_create DisplayedActeur(**parent).save() - print("BEFORE", df_before.to_dict(orient="records")) df_after = cluster_acteurs_parents_choose_data( df_clusters=df_before, fields_to_include=["nom", "siret", "email"], @@ -301,7 +295,6 @@ def test_parent_create( prioritize_source_ids=[10, 20], keep_empty=EMPTY_IGNORE, ) - print("AFTER", df_after.to_dict(orient="records")) filter_parent = df_after["identifiant_unique"] == "p1" parent_data = df_after[filter_parent][COL_PARENT_DATA_NEW].values[0] df_children = df_after[~filter_parent] diff --git a/dags_unit_tests/cluster/tasks/business_logic/test_cluster_acteurs_parents_choose_new.py b/dags_unit_tests/cluster/tasks/business_logic/test_cluster_acteurs_parents_choose_new.py index 15e74c22b..248946f1b 100755 --- a/dags_unit_tests/cluster/tasks/business_logic/test_cluster_acteurs_parents_choose_new.py +++ b/dags_unit_tests/cluster/tasks/business_logic/test_cluster_acteurs_parents_choose_new.py @@ -22,7 +22,6 @@ cluster_acteurs_parents_choose_new, parent_id_generate, ) -from rich import print from data.models.change import ( COL_CHANGE_ENTITY_TYPE, @@ -150,7 +149,6 @@ def test_case_no_parent(self, df_no_parent, parent_id_new): ], ignore_index=True, ) - print(f"{df=}") df = cluster_acteurs_one_cluster_parent_changes_mark( df, parent_id_new, CHANGE_CREATE, "Nouveau parent" ) @@ -259,7 +257,6 @@ def test_working_overall_changes(self, df_working, parent_id_new, df_combined): # On veut aucune valeur nan résultant des divers manipulations # On ne peut pas juste faire df.isna().any().any() car isna # inclut les None qu'on tolère - print(df_working.to_dict(orient="records")) df_nan = df_working.map(lambda x: 1 if pd.isna(x) and x is not None else 0) assert df_nan.values.sum() == 0 diff --git a/dags_unit_tests/cluster/tasks/business_logic/test_cluster_acteurs_read_orphans.py b/dags_unit_tests/cluster/tasks/business_logic/test_cluster_acteurs_read_orphans.py index 8a646e88d..0722b111c 100644 --- a/dags_unit_tests/cluster/tasks/business_logic/test_cluster_acteurs_read_orphans.py +++ b/dags_unit_tests/cluster/tasks/business_logic/test_cluster_acteurs_read_orphans.py @@ -160,10 +160,6 @@ def df_ideal(self, ideal_scenario_apply_function): """La dataframe correspondant au cas idéal de fonctionnement de la fonction""" df = ideal_scenario_apply_function[0] - # Pour faciliter le debug si besoin, bcp plus simple - # d'utiliser les dicts que du print df potentiellement - # tronqué - # print("contenu df_ideal", df.to_dict(orient="records")) return df @pytest.fixture diff --git a/dags_unit_tests/crawl/tasks/business_logic/test_crawl_urls_suggest_prepare.py b/dags_unit_tests/crawl/tasks/business_logic/test_crawl_urls_suggest_prepare.py index 03a545108..0daff1784 100644 --- a/dags_unit_tests/crawl/tasks/business_logic/test_crawl_urls_suggest_prepare.py +++ b/dags_unit_tests/crawl/tasks/business_logic/test_crawl_urls_suggest_prepare.py @@ -2,11 +2,11 @@ import pytest from crawl.fixtures import acteurs_create, df_syntax_fail # noqa from sources.config.shared_constants import EMPTY_ACTEUR_FIELD -from utils import logging_utils as log from dags.crawl.config.cohorts import COHORTS from dags.crawl.config.columns import COLS from dags.crawl.tasks.business_logic.crawl_urls_suggest import suggestions_prepare +from utils import logging_utils as log @pytest.mark.django_db @@ -30,7 +30,6 @@ def test_json_serializable(self, suggestions): pass def test_raise_if_acteurs_missing(self): - from qfdmo.models.acteur import Acteur df = pd.DataFrame( { @@ -42,5 +41,5 @@ def test_raise_if_acteurs_missing(self): COLS.SUGGEST_VALUE: [EMPTY_ACTEUR_FIELD], } ) - with pytest.raises(Acteur.DoesNotExist): + with pytest.raises(ValueError): suggestions_prepare(df=df) diff --git a/dags_unit_tests/e2e/test_e2e_dag_cluster_acteurs_airflow.py b/dags_unit_tests/e2e/test_e2e_dag_cluster_acteurs_airflow.py index 71c455590..720fe7b81 100644 --- a/dags_unit_tests/e2e/test_e2e_dag_cluster_acteurs_airflow.py +++ b/dags_unit_tests/e2e/test_e2e_dag_cluster_acteurs_airflow.py @@ -4,7 +4,6 @@ import pytest from airflow.utils.state import State from django.contrib.gis.geos import Point -from rich import print from dags.cluster.config.tasks import TASKS from dags.shared.config import START_DATES @@ -132,8 +131,6 @@ def test_up_to_selection_and_normalize(self, db_sources_acteur_types, conf): dag.test(execution_date=START_DATES.YESTERDAY, run_conf=conf) tis = dag.get_task_instances() - for ti in tis: - print(f"{ti.task_id}", f"{ti.state=}") # Tasks which should have completed successfully assert ti_get(tis, TASKS.CONFIG_CREATE).state == State.SUCCESS diff --git a/dags_unit_tests/enrich/tasks/test_enrich_suggestions_closed.py b/dags_unit_tests/enrich/tasks/test_enrich_suggestions_closed.py index 27f1e7d28..7341903f0 100644 --- a/dags_unit_tests/enrich/tasks/test_enrich_suggestions_closed.py +++ b/dags_unit_tests/enrich/tasks/test_enrich_suggestions_closed.py @@ -2,9 +2,6 @@ import pandas as pd import pytest -from cluster.tasks.business_logic.cluster_acteurs_parents_choose_new import ( - parent_id_generate, -) from django.contrib.gis.geos import Point from enrich.config.cohorts import COHORTS from enrich.config.columns import COLS @@ -12,6 +9,14 @@ enrich_dbt_model_to_suggestions, ) +from data.models.suggestion import Suggestion, SuggestionCohorte +from qfdmo.models.acteur import Acteur, ActeurStatus, RevisionActeur +from unit_tests.qfdmo.acteur_factory import ( + ActeurFactory, + ActeurTypeFactory, + SourceFactory, +) + TODAY = datetime.now(timezone.utc).strftime("%Y-%m-%d") @@ -20,15 +25,11 @@ class TestEnrichActeursClosedSuggestions: @pytest.fixture def source(self): - from qfdmo.models import Source - - return Source.objects.create(code="s1") + return SourceFactory(code="s1") @pytest.fixture def atype(self): - from qfdmo.models import ActeurType - - return ActeurType.objects.create(code="at1") + return ActeurTypeFactory(code="at1") @pytest.fixture def df_not_replaced(self, atype, source): @@ -36,6 +37,7 @@ def df_not_replaced(self, atype, source): { # Acteurs data COLS.ACTEUR_ID: ["a01", "a02"], + COLS.ACTEUR_ID_EXTERNE: ["ext_a01", "ext_a02"], COLS.ACTEUR_SIRET: ["00000000000001", "00000000000002"], COLS.ACTEUR_NOM: ["AVANT a01", "AVANT a02"], COLS.ACTEUR_TYPE_ID: [atype.pk, atype.pk], @@ -50,6 +52,9 @@ def df_replaced(self, atype, source): { # Acteurs data COLS.ACTEUR_ID: ["a1", "a2", "a3"], + COLS.ACTEUR_ID_EXTERNE: ["ext_a1", "ext_a2", "ext_a3"], + # We test 1 acteur of each cohort with a parent, and + # the case with no parent COLS.ACTEUR_SIRET: [ "11111111100001", "22222222200001", @@ -105,18 +110,28 @@ def df_replaced_autre_siret(self, df_replaced): @pytest.fixture def acteurs(self, df_not_replaced, df_replaced, atype, source): # Creating acteurs as presence required to apply changes - from qfdmo.models import Acteur df_concat = pd.concat([df_not_replaced, df_replaced]) - acteur_ids = df_concat[COLS.ACTEUR_ID].tolist() - for acteur_id in acteur_ids: - Acteur.objects.create( - identifiant_unique=acteur_id, - nom=f"AVANT {acteur_id}", + for _, row in df_concat.iterrows(): + acteur = ActeurFactory( + identifiant_unique=row[COLS.ACTEUR_ID], + identifiant_externe=row[COLS.ACTEUR_ID_EXTERNE], + nom=f"AVANT {row[COLS.ACTEUR_ID]}", + siret=row[COLS.ACTEUR_SIRET], + siren=row[COLS.ACTEUR_SIRET][:9], acteur_type=atype, source=source, - location=Point(x=0, y=0), + location=Point( + x=row[COLS.ACTEUR_LONGITUDE], y=row[COLS.ACTEUR_LATITUDE] + ), ) + acteur.save() + # Check that acteur has been properly created + acteur = Acteur.objects.get(identifiant_unique=row[COLS.ACTEUR_ID]) + assert acteur.identifiant_externe == row[COLS.ACTEUR_ID_EXTERNE] + assert acteur.nom == f"AVANT {row[COLS.ACTEUR_ID]}" + assert acteur.acteur_type == atype + assert acteur.source == source def test_cohorte_not_replaced(self, acteurs, df_not_replaced): from data.models.suggestion import Suggestion, SuggestionCohorte @@ -139,23 +154,20 @@ def test_cohorte_not_replaced(self, acteurs, df_not_replaced): for suggestion in suggestions: suggestion.apply() - # Verify changes - # 2 revisions should be created but not parent - a01 = RevisionActeur.objects.get(pk="a01") - assert a01.statut == ActeurStatus.INACTIF - assert a01.parent is None - assert a01.parent_reason == "" # consequence of empty strings in DB - assert a01.siret_is_closed is True - - a02 = RevisionActeur.objects.get(pk="a02") - assert a02.statut == ActeurStatus.INACTIF - assert a02.parent is None - assert a02.parent_reason == "" - assert a02.siret_is_closed is True + # Verify changes: both acteurs are closed with only relevant + # fields updated + for id in ["a01", "a02"]: + closed = RevisionActeur.objects.get(pk=id) + assert closed.statut == ActeurStatus.INACTIF + assert closed.parent is None + assert closed.parent_reason == ( + f"Modifications de l'acteur le {TODAY}: " + f"SIRET {'00000000000001' if id == 'a01' else '00000000000002'} détecté" + " comme fermé dans AE, Pas de remplacement" + ) + assert closed.siret_is_closed is True - def test_cohorte_meme_siren(self, acteurs, atype, source, df_replaced_meme_siret): - from data.models.suggestion import Suggestion, SuggestionCohorte - from qfdmo.models import ActeurStatus, RevisionActeur + def test_cohorte_meme_siren(self, acteurs, df_replaced_meme_siret): # Write suggestions to DB enrich_dbt_model_to_suggestions( @@ -174,36 +186,24 @@ def test_cohorte_meme_siren(self, acteurs, atype, source, df_replaced_meme_siret for suggestion in suggestions: suggestion.apply() - # Verify changes - # 1 parent should be created in revision with replacement data - # 1 child should be created in revision with status=INACT and parent_id pointing - parent_id = parent_id_generate(["11111111100002"]) - parent = RevisionActeur.objects.get(pk=parent_id) - assert parent.pk == parent_id - assert parent.nom == "APRES a1" - assert parent.adresse == "Adresse1" - assert parent.code_postal == "12345" - assert parent.ville == "Ville1" - assert parent.naf_principal == "naf1" - assert parent.acteur_type == atype - assert parent.source is None - assert parent.location.x == 1 - assert parent.location.y == 11 - - child = RevisionActeur.objects.get(pk="a1") - assert child.statut == ActeurStatus.INACTIF - assert child.parent == parent - assert child.parent_reason == ( - f"SIRET 11111111100001 détecté le {TODAY} comme fermé dans AE, " - f"remplacé par SIRET 11111111100002" + acteur = Acteur.objects.get(pk="a1") + assert acteur.statut == ActeurStatus.ACTIF + assert acteur.siret == "11111111100001" + assert acteur.siren == "111111111" + + revision = RevisionActeur.objects.get(pk="a1") + assert revision.statut == ActeurStatus.ACTIF + assert revision.siret == "11111111100002" + assert acteur.siren == "111111111" + assert revision.parent is None + assert revision.parent_reason == ( + f"Modifications de l'acteur le {TODAY}: SIRET 11111111100001 détecté comme" + " fermé dans AE, remplacé par le SIRET 11111111100002" ) - assert child.siret_is_closed is True - assert child.location.x == 1 - assert child.location.y == 11 + assert revision.siret_is_closed is False def test_cohorte_autre_siren(self, acteurs, df_replaced_autre_siret): from data.models.suggestion import Suggestion, SuggestionCohorte - from qfdmo.models import ActeurStatus, RevisionActeur # Write suggestions to DB enrich_dbt_model_to_suggestions( @@ -216,52 +216,25 @@ def test_cohorte_autre_siren(self, acteurs, df_replaced_autre_siret): # Check suggestions have been written to DB cohort = SuggestionCohorte.objects.get(identifiant_action="test_autre_siren") suggestions = Suggestion.objects.filter(suggestion_cohorte=cohort) - assert len(suggestions) == 2 # (1 parent + 1 child) x 2 acteurs fermés + # 2 suggestions containing 2 changes (inactive + new) + assert len(suggestions) == 2 # Apply suggestions for suggestion in suggestions: suggestion.apply() - # Verify changes - # 1 parent should be created in revision with replacement data - # 1 child should be created in revision with status=INACT and parent_id pointing - parent_id = parent_id_generate(["33333333300001"]) - parent = RevisionActeur.objects.get(pk=parent_id) - assert parent.nom == "APRES a2" - assert parent.adresse == "Adresse2" - assert parent.code_postal == "67890" - assert parent.ville == "Ville2" - assert parent.naf_principal == "naf2" - assert parent.location.x == 2 - assert parent.location.y == 22 - - child = RevisionActeur.objects.get(pk="a2") - assert child.statut == ActeurStatus.INACTIF - assert child.parent == parent - assert child.parent_reason == ( - f"SIRET 22222222200001 détecté le {TODAY} comme fermé dans AE, " - f"remplacé par SIRET 33333333300001" - ) - assert child.siret_is_closed is True - assert child.location.x == 2 - assert child.location.y == 22 - - parent_id = parent_id_generate(["55555555500001"]) - parent = RevisionActeur.objects.get(pk=parent_id) - assert parent.nom == "APRES a3" - assert parent.adresse == "Adresse3" - assert parent.code_postal == "12345" - assert parent.ville == "Ville3" - assert parent.naf_principal == "naf3" - assert parent.location.x == 3 - assert parent.location.y == 33 - - child = RevisionActeur.objects.get(pk="a3") - assert child.statut == ActeurStatus.INACTIF - assert child.parent == parent - assert child.parent_reason == ( - f"SIRET 44444444400001 détecté le {TODAY} comme fermé dans AE, " - f"remplacé par SIRET 55555555500001" + acteur = Acteur.objects.get(pk="a2") + assert acteur.statut == ActeurStatus.ACTIF + assert acteur.siret == "22222222200001" + assert acteur.siren == "222222222" + + revision = RevisionActeur.objects.get(pk="a2") + assert revision.statut == ActeurStatus.ACTIF + assert revision.siret == "33333333300001" + assert revision.siren == "333333333" + assert revision.parent is None + assert revision.parent_reason == ( + f"Modifications de l'acteur le {TODAY}: SIRET 22222222200001 détecté comme" + " fermé dans AE, remplacé par le SIRET 33333333300001" ) - assert child.location.x == 3 - assert child.location.y == 33 + assert revision.siret_is_closed is False diff --git a/dags_unit_tests/shared/tasks/airflow_logic/test_packages_compatibility.py b/dags_unit_tests/shared/tasks/airflow_logic/test_packages_compatibility.py index 030d9bcfb..b7e1d6eff 100644 --- a/dags_unit_tests/shared/tasks/airflow_logic/test_packages_compatibility.py +++ b/dags_unit_tests/shared/tasks/airflow_logic/test_packages_compatibility.py @@ -26,7 +26,6 @@ def test_pandas_read_sql_table(): - mettre à jour ce teste pour qu'il fonctionne - mettre à jour les codes DAGs pour qu'ils fonctionnent également """ - print(f"Pandas version: {pd.__version__}") engine = create_engine("sqlite:///:memory:") engine.execute("CREATE TABLE my_table (id INT, name TEXT)") pd.read_sql_table("my_table", engine) diff --git a/dags_unit_tests/sources/tasks/transform/test_transform_df.py b/dags_unit_tests/sources/tasks/transform/test_transform_df.py index 5570a89a0..874a624a2 100644 --- a/dags_unit_tests/sources/tasks/transform/test_transform_df.py +++ b/dags_unit_tests/sources/tasks/transform/test_transform_df.py @@ -790,7 +790,6 @@ def test_compute_location(self, latitude, longitude, expected_location): result = compute_location( pd.Series({"latitude": latitude, "longitude": longitude}), None ) - print(result["location"]) assert result["location"] == expected_location diff --git a/dags_unit_tests/utils/test_data_serialize_reconstruct.py b/dags_unit_tests/utils/test_data_serialize_reconstruct.py index dc3af503a..478719e6b 100644 --- a/dags_unit_tests/utils/test_data_serialize_reconstruct.py +++ b/dags_unit_tests/utils/test_data_serialize_reconstruct.py @@ -2,8 +2,6 @@ import pytest from django.contrib.gis.geos import Point -from rich import print -from utils.data_serialize_reconstruct import data_serialize from data.models.changes.utils import data_reconstruct from qfdmo.models.acteur import RevisionActeur @@ -12,6 +10,7 @@ ActionFactory, SourceFactory, ) +from utils.data_serialize_reconstruct import data_serialize DATETIME = datetime(2023, 10, 1, 14, 30, 4) POINT = Point(1, 2) @@ -53,7 +52,6 @@ def test_data_reconstructed(self, data_reconstructed): assert isinstance(data["cree_le"], str) def test_data_reconstructed_compatible_with_model(self, data_reconstructed): - print("test_data_is_compatible", data_reconstructed) rev = RevisionActeur(**data_reconstructed) rev.save() # FIXME: setting cree_le doesn't work the 1st time due diff --git a/dags_unit_tests/utils/test_django.py b/dags_unit_tests/utils/test_django.py index 58ee69816..9124eff72 100644 --- a/dags_unit_tests/utils/test_django.py +++ b/dags_unit_tests/utils/test_django.py @@ -1,5 +1,6 @@ import pandas as pd import pytest + from utils.django import ( django_model_fields_get, django_model_queryset_generate, @@ -59,7 +60,7 @@ def test_django_model_queryset_generate(): ) # Convert queryset to SQL string - sql = django_model_queryset_to_sql(queryset) + django_model_queryset_to_sql(queryset) # Expected SQL string (adjust to match your backend, structure may vary slightly) expected_sql = r""" @@ -69,7 +70,6 @@ def test_django_model_queryset_generate(): AND NOT ("adresse" IS NULL) AND NOT ("adresse" = '')) AND NOT (("siret" IS NOT NULL) OR ("siret" != '')) """ - print(sql) # TODO: activer ce test une fois qu'on est content # avec le résultat final via Airflow # assert sql.strip() == expected_sql.strip() diff --git a/data/models/changes/__init__.py b/data/models/changes/__init__.py index 6d1b5e002..e43e0500a 100644 --- a/data/models/changes/__init__.py +++ b/data/models/changes/__init__.py @@ -1,23 +1,21 @@ from .acteur_change_nothing_in_base import ChangeActeurNothingBase -from .acteur_create_as_child import ChangeActeurCreateAsChild from .acteur_create_as_parent import ChangeActeurCreateAsParent from .acteur_delete_as_parent import ChangeActeurDeleteAsParent from .acteur_keep_as_parent import ChangeActeurKeepAsParent from .acteur_rgpd_anonymize import ChangeActeurRgpdAnonymize -from .acteur_update_data import ChangeActeurUpdateData from .acteur_update_parent_id import ChangeActeurUpdateParentId +from .acteur_update_revision import ChangeActeurUpdateRevision from .acteur_verify_in_revision import ChangeActeurVerifyRevision from .sample_model_do_nothing import SampleModelDoNothing CHANGE_MODELS = { - ChangeActeurUpdateData.name(): ChangeActeurUpdateData, - ChangeActeurCreateAsChild.name(): ChangeActeurCreateAsChild, ChangeActeurCreateAsParent.name(): ChangeActeurCreateAsParent, ChangeActeurDeleteAsParent.name(): ChangeActeurDeleteAsParent, + ChangeActeurKeepAsParent.name(): ChangeActeurKeepAsParent, + ChangeActeurNothingBase.name(): ChangeActeurNothingBase, + ChangeActeurRgpdAnonymize.name(): ChangeActeurRgpdAnonymize, ChangeActeurUpdateParentId.name(): ChangeActeurUpdateParentId, + ChangeActeurUpdateRevision.name(): ChangeActeurUpdateRevision, ChangeActeurVerifyRevision.name(): ChangeActeurVerifyRevision, - ChangeActeurNothingBase.name(): ChangeActeurNothingBase, - ChangeActeurKeepAsParent.name(): ChangeActeurKeepAsParent, SampleModelDoNothing.name(): SampleModelDoNothing, - ChangeActeurRgpdAnonymize.name(): ChangeActeurRgpdAnonymize, } diff --git a/data/models/changes/acteur_create_as_child.py b/data/models/changes/acteur_create_as_child.py deleted file mode 100644 index 74282eb89..000000000 --- a/data/models/changes/acteur_create_as_child.py +++ /dev/null @@ -1,63 +0,0 @@ -from pydantic import BaseModel - -from data.models.changes.utils import data_reconstruct - - -class ChangeActeurCreateAsChild(BaseModel): - id: str - data: dict = {} - - @classmethod - def name(cls) -> str: - return "acteur_create_as_child" - - def validate(self): - from qfdmo.models import Acteur, DisplayedActeur, RevisionActeur - - # Parent field must be SET (but we can't check if parent exists yet - # as it could be a new parent to be created) - for field in ["parent", "parent_reason"]: - if not self.data.get(field): - msg = f"Création d'enfant: champ '{field}' à renseigner {self.data}" - raise ValueError(msg) - - # Ensure child exists nowhere - for model in [Acteur, RevisionActeur, DisplayedActeur]: - obj = model.objects.filter(pk=self.id) - if obj.exists(): - msg = ( - f"Création d'enfant: '{self.id}' existe déjà dans {model.__name__}" - ) - raise ValueError(msg) - - def apply(self): - self.validate() - from qfdmo.models import Acteur, RevisionActeur - - # Ensure parent exists in RevisionActeur - parent = RevisionActeur.objects.get(pk=self.data["parent"]) - - # Reconstruct data from RevisionActeur - data = data_reconstruct(RevisionActeur, self.data) - - # Create child in Acteur to hold data - data_base = data.copy() - del data_base["parent"] - del data_base["parent_reason"] - # TODO: if we flatten our pydantic models, then we wouldn't - if "identifiant_unique" in data_base: - del data_base["identifiant_unique"] - Acteur.objects.create( - identifiant_unique=self.id, - **data_base, - ) - - # Create child in RevisionActeur to hold reference to parent - RevisionActeur.objects.create( - identifiant_unique=self.id, - parent_reason=data["parent_reason"], - parent=parent, - statut="ACTIF", - source=data["source"], - acteur_type=data["acteur_type"], - ) diff --git a/data/models/changes/acteur_update_data.py b/data/models/changes/acteur_update_data.py deleted file mode 100644 index d629e3603..000000000 --- a/data/models/changes/acteur_update_data.py +++ /dev/null @@ -1,36 +0,0 @@ -"""Generic change model to update an acteur's data. If your use-case -is very specific (e.g. RGPD), create dedicated model for more clarity/reliability.""" - -from data.models.changes.acteur_abstract import ChangeActeurAbstract -from data.models.changes.utils import data_reconstruct -from qfdmo.models import Acteur, RevisionActeur - - -class ChangeActeurUpdateData(ChangeActeurAbstract): - @classmethod - def name(cls) -> str: - return "acteur_update_data" - - def validate(self) -> Acteur | RevisionActeur: - if not self.data: - raise ValueError("No data provided") - # The parent should already exist in revision or base - # We tolerate absence from revision - result = RevisionActeur.objects.filter(pk=self.id).first() - if not result: - # But if not in revision, must be in base - result = Acteur.objects.get(pk=self.id) - return result - - def apply(self): - acteur = self.validate() - # If acteur is only in base, we need to create a revision - if isinstance(acteur, Acteur): - acteur = RevisionActeur( - identifiant_unique=acteur.identifiant_unique, - acteur_type=acteur.acteur_type, - ) - data = data_reconstruct(RevisionActeur, self.data) - for key, value in data.items(): - setattr(acteur, key, value) - acteur.save() diff --git a/data/models/changes/acteur_update_revision.py b/data/models/changes/acteur_update_revision.py new file mode 100644 index 000000000..8b1819982 --- /dev/null +++ b/data/models/changes/acteur_update_revision.py @@ -0,0 +1,31 @@ +"""Generic change model to update an acteur's data. If your use-case +is very specific (e.g. RGPD), create dedicated model for more clarity/reliability.""" + +from data.models.changes.acteur_abstract import ChangeActeurAbstract +from qfdmo.models import Acteur + + +class ChangeActeurUpdateRevision(ChangeActeurAbstract): + @classmethod + def name(cls) -> str: + return "acteur_update_revision" + + def validate(self) -> Acteur: + if not self.data: + raise ValueError("Aucune donnée fournie") + # The parent should already exist in revision or base + # We tolerate absence from revision + try: + acteur = Acteur.objects.get(pk=self.id) + except Acteur.DoesNotExist: + raise ValueError(f"L'acteur cible {self.id} n'existe pas") + return acteur + + def apply(self): + acteur = self.validate() + + revision = acteur.get_or_create_revision() + + for key, value in self.data.items(): + setattr(revision, key, value) + revision.save() diff --git a/dbt/models/marts/enrich/marts_enrich_acteurs_closed_candidates.sql b/dbt/models/marts/enrich/marts_enrich_acteurs_closed_candidates.sql index 51d775377..6062d74e8 100644 --- a/dbt/models/marts/enrich/marts_enrich_acteurs_closed_candidates.sql +++ b/dbt/models/marts/enrich/marts_enrich_acteurs_closed_candidates.sql @@ -16,6 +16,7 @@ WITH acteurs_with_siret AS ( SELECT -- Acteur columns identifiant_unique AS acteur_id, + identifiant_externe AS acteur_id_externe, siret AS acteur_siret, LEFT(siret,9) AS acteur_siren, nom AS acteur_nom, @@ -27,10 +28,11 @@ WITH acteurs_with_siret AS ( adresse AS acteur_adresse, code_postal AS acteur_code_postal, ville AS acteur_ville, - location AS acteur_location + location AS acteur_location, + parent_id AS acteur_parent_id - FROM {{ ref('marts_carte_acteur') }} - WHERE siret IS NOT NULL AND siret != '' AND LENGTH(siret) = 14 + FROM {{ source('enrich', 'qfdmo_vueacteur') }} AS acteurs + WHERE siret IS NOT NULL AND siret != '' AND LENGTH(siret) = 14/* AND (acteurs.source_id is null or acteurs.source_id in (45, 252)) */ ), /* Filtering on etab closed (NOT etab.est_actif) BUT not on unite closed (NOT unite_est_actif) because @@ -39,6 +41,7 @@ etab_closed_candidates AS ( SELECT -- acteurs acteurs.acteur_id, + acteurs.acteur_id_externe, acteurs.acteur_siret, acteurs.acteur_siren, acteurs.acteur_type_id, @@ -50,6 +53,7 @@ SELECT acteurs.acteur_adresse, acteurs.acteur_code_postal, acteurs.acteur_ville, + acteurs.acteur_parent_id, CASE WHEN acteurs.acteur_location IS NULL THEN NULL ELSE ST_X(acteurs.acteur_location) END AS acteur_longitude, CASE WHEN acteurs.acteur_location IS NULL THEN NULL ELSE ST_Y(acteurs.acteur_location) END AS acteur_latitude, diff --git a/dbt/models/marts/enrich/marts_enrich_acteurs_rgpd_suggest.sql b/dbt/models/marts/enrich/marts_enrich_acteurs_rgpd_suggest.sql index 5177fc57c..a1a2bb323 100644 --- a/dbt/models/marts/enrich/marts_enrich_acteurs_rgpd_suggest.sql +++ b/dbt/models/marts/enrich/marts_enrich_acteurs_rgpd_suggest.sql @@ -25,12 +25,12 @@ WITH acteurs_with_siren AS ( udf_normalize_string_for_match(CONCAT(nom || ' ' || nom_officiel || ' ' || nom_commercial)) AS noms_normalises, commentaires, statut - FROM {{ ref('marts_carte_acteur') }} + FROM {{ source('enrich', 'qfdmo_vueacteur') }} AS acteurs /* We have normalization issues with our SIREN field in our DB and we obtain better matching by reconstructing SIREN via SIRET */ - WHERE siret IS NOT NULL AND siret != '' AND LENGTH(siret) = 14 + WHERE siret IS NOT NULL AND siret != '' AND LENGTH(siret) = 14/* AND (acteurs.source_id is null or acteurs.source_id in (45, 252)) */ ), unite_matching_acteurs_on_siren AS ( SELECT acteurs.id AS acteur_id, diff --git a/dbt/models/marts/enrich/marts_enrich_acteurs_villes_candidates.sql b/dbt/models/marts/enrich/marts_enrich_acteurs_villes_candidates.sql index d5835192c..025995ee3 100644 --- a/dbt/models/marts/enrich/marts_enrich_acteurs_villes_candidates.sql +++ b/dbt/models/marts/enrich/marts_enrich_acteurs_villes_candidates.sql @@ -17,9 +17,9 @@ SELECT ban.ville AS ban_ville, ban.code_postal AS ban_code_postal, ban.ville AS suggest_ville -FROM {{ ref('marts_carte_acteur') }} AS acteurs +FROM {{ source('enrich', 'qfdmo_vueacteur') }} AS acteurs JOIN {{ ref('int_ban_villes') }} AS ban ON ban.code_postal = acteurs.code_postal -WHERE acteurs.statut = 'ACTIF' +WHERE acteurs.statut = 'ACTIF'/* AND (acteurs.source_id is null or acteurs.source_id in (45, 252)) */ AND acteurs.code_postal IS NOT NULL and acteurs.code_postal != '' and LENGTH(acteurs.code_postal) = 5 /* Only suggest if 1 difference */ AND ( diff --git a/dbt/models/source/source_enrich.yml b/dbt/models/source/source_enrich.yml new file mode 100644 index 000000000..504215622 --- /dev/null +++ b/dbt/models/source/source_enrich.yml @@ -0,0 +1,8 @@ +version: 2 + +sources: + - name: enrich + description: "Enrich" + schema: public + tables: + - name: qfdmo_vueacteur diff --git a/docs/explications/data/enrich/README.md b/docs/explications/data/enrich/README.md new file mode 100644 index 000000000..9b79d6681 --- /dev/null +++ b/docs/explications/data/enrich/README.md @@ -0,0 +1,48 @@ +# Enrichissement de données + +Le principe de l'enrichissement de données est d'hidrater et/ou corriger les données de «Longue vie aux objets» grâce à des sources partenaires ou exécutant des scripts de cohérence. + +## Enrichissements via des sources partenaires + +Les sources aujourd'hui utilisées sont : + +* [Annuaire entreprise](https://annuaire-entreprises.data.gouv.fr/) : agrégateur de données sur les entreprises en France +* [La BAN : Banque d'adresse nationnale](https://adresse.data.gouv.fr/) : référencement et géolocalisation de toutes les adresses en France + +### Comment ça marche + +Plusieurs étapes : + +1. Téléchargement de la base de données partenaire et copie sur notre propre base de données (DAG Airflow) + * Cloner - AE - Etablissement + * Cloner - AE - Unite Legale + * Cloner - BAN - Adresses + * Cloner - BAN - Lieux-dits +1. Préparation de la donnée (Airflow + DBT) : + * DBT - Rafraîchir les acteurs affichés + * 🔄 Enrichir - Rafraîchir les modèles DBT +1. Création des suggestions (Airflow + DBT) : + * 🚪 Enrichir - Acteurs Fermés + +```mermaid +graph LR + A[Cloner - AE - Etablissement] + B[Cloner - AE - Unite Legale] + C[Cloner - BAN - Adresses] + D[Cloner - BAN - Lieux-dits] + E[DBT - Rafraîchir les acteurs affichés] + F[🔄 Enrichir - Rafraîchir les modèles DBT] + G[🚪 Enrichir - Acteurs Fermés] + A --> F + B --> F + C --> F + D --> F + F --> G + E --> G +``` + +## Script de cohérence + +### Vérification des URLs + +le DAG `🔗 Crawl - URLs - Suggestions` collecte les URLs des acteurs et parcourt ces URL pour vérifier qu'elles sont valident diff --git a/qfdmo/models/acteur.py b/qfdmo/models/acteur.py index 84fac26d9..3d9bcda77 100644 --- a/qfdmo/models/acteur.py +++ b/qfdmo/models/acteur.py @@ -638,6 +638,41 @@ def commentaires_ajouter(self, added): self.save() + def instance_copy( + self, + overriden_fields={ + "identifiant_unique": None, + "identifiant_externe": None, + "source": None, + }, + ): + if isinstance(self, RevisionActeur) and self.is_parent: + raise Exception("Impossible de dupliquer un acteur parent") + + if isinstance(self, RevisionActeur): + acteur = Acteur.objects.get(identifiant_unique=self.identifiant_unique) + acteur.instance_copy(overriden_fields=overriden_fields) + + new_instance = deepcopy(self) + + for field, value in overriden_fields.items(): + setattr(new_instance, field, value) + + new_instance.save() + new_instance.labels.set(self.labels.all()) + new_instance.acteur_services.set(self.acteur_services.all()) + + # recreate proposition_services for the new revision_acteur + for proposition_service in self.proposition_services.all(): + new_proposition_service = proposition_service.__class__.objects.create( + acteur=new_instance, + action=proposition_service.action, + ) + new_proposition_service.sous_categories.set( + proposition_service.sous_categories.all() + ) + return new_instance + def clean_parent(parent): try: @@ -895,7 +930,14 @@ def create_parent(self): self.save() return revision_acteur_parent - def duplicate(self): + def duplicate( + self, + fields_to_reset={ + "identifiant_unique": None, + "identifiant_externe": None, + "source": None, + }, + ): if self.is_parent: raise Exception("Impossible de dupliquer un acteur parent") @@ -903,11 +945,6 @@ def duplicate(self): acteur = Acteur.objects.get(identifiant_unique=self.identifiant_unique) - fields_to_reset = [ - "identifiant_unique", - "identifiant_externe", - "source", - ] fields_to_ignore = [ "labels", "acteur_services", @@ -916,12 +953,12 @@ def duplicate(self): "parent_reason", ] - for field in fields_to_reset: - setattr(revision_acteur, field, None) + for field, value in fields_to_reset.items(): + setattr(revision_acteur, field, value) for field in revision_acteur._meta.fields: if ( not getattr(revision_acteur, field.name) - and field.name not in fields_to_reset + and field.name not in fields_to_reset.keys() and field.name not in fields_to_ignore ): setattr(revision_acteur, field.name, getattr(acteur, field.name)) @@ -931,11 +968,9 @@ def duplicate(self): # recreate proposition_services for the new revision_acteur for proposition_service in self.proposition_services.all(): - revision_proposition_service = revision_proposition_service = ( - RevisionPropositionService.objects.create( - acteur=revision_acteur, - action=proposition_service.action, - ) + revision_proposition_service = RevisionPropositionService.objects.create( + acteur=revision_acteur, + action=proposition_service.action, ) revision_proposition_service.sous_categories.set( proposition_service.sous_categories.all() diff --git a/templates/data/_partials/value_details.html b/templates/data/_partials/value_details.html index 693887542..88315ed1e 100644 --- a/templates/data/_partials/value_details.html +++ b/templates/data/_partials/value_details.html @@ -47,7 +47,7 @@ {% elif key == "identifiant_unique" or key == "id" %} {{ value }} (base, - rev, + rev, disp) {% elif key == "statut" %} {{ value }} diff --git a/unit_tests/data/models/changes/test_acteur_create_as_child.py b/unit_tests/data/models/changes/test_acteur_create_as_child.py deleted file mode 100644 index 325b5bcdf..000000000 --- a/unit_tests/data/models/changes/test_acteur_create_as_child.py +++ /dev/null @@ -1,72 +0,0 @@ -import pytest -from django.contrib.gis.geos import Point - -from data.models.changes.acteur_create_as_child import ChangeActeurCreateAsChild -from qfdmo.models import Acteur, RevisionActeur -from unit_tests.qfdmo.acteur_factory import ( - ActeurFactory, - ActeurTypeFactory, - SourceFactory, -) - - -@pytest.mark.django_db -class TestChangeActeurCreateAsChild: - @pytest.mark.parametrize( - "data,missing", - [({"parent": "456"}, "parent_reason"), ({"parent_reason": "test"}, "parent")], - ) - def test_raise_if_missing_params(self, data, missing): - change = ChangeActeurCreateAsChild(id="123", data=data) - with pytest.raises(ValueError, match=f"champ '{missing}' à renseigner"): - change.apply() - - def test_raise_if_acteur_exists(self): - ActeurFactory(identifiant_unique="123") - change = ChangeActeurCreateAsChild( - id="123", data={"parent": "456", "parent_reason": "test"} - ) - with pytest.raises(ValueError, match="existe déjà"): - change.apply() - - def test_working(self): - # Create parent - source = SourceFactory(code="source1") - atype = ActeurTypeFactory(code="atype1") - parent = RevisionActeur.objects.create( - identifiant_unique="parent1", - source=source, - acteur_type=atype, - statut="ACTIF", - location=Point(1, 1), - ) - # Create child - change = ChangeActeurCreateAsChild( - id="child1", - data={ - "nom": "my child1", - "source": source, - "acteur_type": atype, - "statut": "ACFIF", - "location": Point(1, 1), - "parent": parent, - "parent_reason": "test", - }, - ) - change.apply() - - # Acteur created in base to hold the core data - base = Acteur.objects.get(pk="child1") - assert base.identifiant_unique == "child1" - assert base.nom == "my child1" - assert base.source.pk == source.pk - assert base.acteur_type.pk == atype.pk - assert base.statut == "ACFIF" - assert base.location.x == 1 - assert base.location.y == 1 - - # Acteur created in revision to hold the parent reference - revision = RevisionActeur.objects.get(pk="child1") - assert revision.parent.pk == parent.pk - assert revision.parent_reason == "test" - assert not revision.nom diff --git a/unit_tests/data/models/changes/test_acteur_update_data.py b/unit_tests/data/models/changes/test_acteur_update_data.py deleted file mode 100644 index 4ffd48a8d..000000000 --- a/unit_tests/data/models/changes/test_acteur_update_data.py +++ /dev/null @@ -1,82 +0,0 @@ -""" -Test file for the ChangeActeurUpdateData model. - -""" - -import pytest -from django.contrib.gis.geos import Point -from pydantic import ValidationError - -from dags.sources.config.shared_constants import EMPTY_ACTEUR_FIELD -from data.models.changes.acteur_update_data import ChangeActeurUpdateData -from qfdmo.models.acteur import Acteur, ActeurType, RevisionActeur - - -@pytest.mark.django_db -class TestChangeActeurUpdateData: - def test_name(self): - assert ChangeActeurUpdateData.name() == "acteur_update_data" - - def test_raise_if_no_data_provided(self): - change = ChangeActeurUpdateData(id="dummy", data={}) - with pytest.raises(ValueError, match="No data provided"): - change.apply() - - @pytest.mark.parametrize("data", [None, True]) - def test_raise_if_data_wrong_type(self, data): - with pytest.raises(ValidationError): - ChangeActeurUpdateData(id="dummy", data=data) - - def test_raise_if_acteur_does_not_exist(self): - change = ChangeActeurUpdateData(id="dummy", data={"nom": "test"}) - with pytest.raises(Acteur.DoesNotExist): - change.apply() - - def test_working_case(self): - # We start by creating acteur only in base - at1 = ActeurType.objects.create(code="at1") - at2 = ActeurType.objects.create(code="at2") - base = Acteur.objects.create(nom="test", acteur_type=at1, location=Point(1, 2)) - - # We check that acteur isn't in revision yet - assert RevisionActeur.objects.filter(pk=base.pk).count() == 0 - - # Then we modify its data, notably some Python and Django types - data = {"nom": "test2", "acteur_type": at2.pk, "location": Point(2, 3)} - ChangeActeurUpdateData(id=base.pk, data=data).apply() - - # We check that acteur was created in revision with the right data - rev = RevisionActeur.objects.get(pk=base.pk) - assert rev.nom == "test2" - assert rev.acteur_type == at2 - assert rev.location.x == 2 - assert rev.location.y == 3 - - # We check that acteur in base wasn't modified - base = Acteur.objects.get(pk=base.pk) - assert base.nom == "test" - assert base.acteur_type == at1 - assert base.location.x == 1 - assert base.location.y == 2 - - # If we perform the some changes again, revision is again - # updated AND there is no conflit / creation of a new revision - data = {"nom": "test3"} - ChangeActeurUpdateData(id=base.pk, data=data).apply() - rev = RevisionActeur.objects.get(pk=base.pk) - assert rev.nom == "test3" - assert rev.acteur_type == at2 - assert RevisionActeur.objects.filter(pk=base.pk).count() == 1 - - def test_set_to_empty(self): - # This is a special case for the Crawl URLs DAG - # which prompted to create this change model, so we - # need to ensure that specific case works - at1 = ActeurType.objects.create(code="at1") - base = Acteur.objects.create( - nom="test", acteur_type=at1, location=Point(1, 2), url="https://example.com" - ) - data = {"url": EMPTY_ACTEUR_FIELD} - ChangeActeurUpdateData(id=base.pk, data=data).apply() - rev = RevisionActeur.objects.get(pk=base.pk) - assert rev.url == EMPTY_ACTEUR_FIELD diff --git a/unit_tests/data/models/changes/test_acteur_update_revision.py b/unit_tests/data/models/changes/test_acteur_update_revision.py new file mode 100644 index 000000000..92486a6da --- /dev/null +++ b/unit_tests/data/models/changes/test_acteur_update_revision.py @@ -0,0 +1,58 @@ +from unittest.mock import patch + +import pytest + +from data.models.changes.acteur_update_revision import ChangeActeurUpdateRevision +from qfdmo.models import ActeurStatus, RevisionActeur +from unit_tests.qfdmo.acteur_factory import ActeurFactory, PropositionServiceFactory + + +@pytest.mark.django_db +class TestChangeActeurUpdateRsevision: + def test_model_name(self): + assert ChangeActeurUpdateRevision.name() == "acteur_update_revision" + + def test_raise_if_acteur_does_not_exist(self): + change = ChangeActeurUpdateRevision( + id="dummy", data={"identifiant_unique": "new_id"} + ) + with pytest.raises(ValueError, match="L'acteur cible dummy n'existe pas"): + change.validate() + + def test_raise_if_no_data_provided(self): + acteur = ActeurFactory() + change = ChangeActeurUpdateRevision(id=acteur.pk, data={}) + with pytest.raises(ValueError, match="Aucune donnée fournie"): + change.validate() + + @patch.object(ChangeActeurUpdateRevision, "validate") + def test_validate_is_called_by_apply(self, mock_validate): + acteur = ActeurFactory(statut=ActeurStatus.INACTIF) + change = ChangeActeurUpdateRevision(id=acteur.pk, data={"statut": "ACTIF"}) + change.apply() + mock_validate.assert_called_once() + + def test_without_revision_ok(self): + # Création d'un acteur inactif avec des données de base + acteur = ActeurFactory( + nom="foo", + statut=ActeurStatus.ACTIF, + ) + proposition = PropositionServiceFactory() + acteur.proposition_services.add(proposition) + + change = ChangeActeurUpdateRevision( + id=acteur.pk, + data={"nom": "bar", "statut": ActeurStatus.INACTIF}, + ) + change.apply() + + assert acteur.statut == ActeurStatus.ACTIF + assert acteur.nom == "foo" + assert acteur.proposition_services.count() == 1 + + revision = RevisionActeur.objects.get(pk=acteur.pk) + + assert revision.statut == ActeurStatus.INACTIF + assert revision.nom == "bar" + assert revision.proposition_services.count() == 1