From 9933bed214cb110d20d62e52273cf49e4df596bc Mon Sep 17 00:00:00 2001 From: Nicolas Oudard Date: Tue, 15 Apr 2025 10:15:03 +0200 Subject: [PATCH 1/3] =?UTF-8?q?ajouter=20un=20index=20lors=20de=20la=20r?= =?UTF-8?q?=C3=A9solution=20de=20mapping?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../business_logic/keep_acteur_changed.py | 12 ++++ .../test_keep_acteur_changed.py | 69 +++++++++++++++++++ qfdmo/admin/acteur.py | 2 + .../commands/update_external_ids.py | 20 +++++- 4 files changed, 102 insertions(+), 1 deletion(-) diff --git a/dags/sources/tasks/business_logic/keep_acteur_changed.py b/dags/sources/tasks/business_logic/keep_acteur_changed.py index 45e42a3ba..55bfa73b6 100644 --- a/dags/sources/tasks/business_logic/keep_acteur_changed.py +++ b/dags/sources/tasks/business_logic/keep_acteur_changed.py @@ -91,6 +91,18 @@ def retrieve_identifiant_unique_from_existing_acteur( axis=1, ) + # find the duplicated identifiant in df_acteur_from_db and raise if any because + # we can't resolve simply the mapping between source and db + duplicates = df_acteur_from_db[ + df_acteur_from_db.duplicated("identifiant", keep=False) + ] + if not duplicates.empty: + logger.warning( + "Duplicated identifiant in df_acteur_from_db" + f" {duplicates["identifiant"].tolist()}" + ) + raise ValueError("Duplicated identifiant in df_acteur_from_db") + # Replace identifiant_unique (from source) by identifiant (from db) for acteur # which doesn't have corelation between source, external_id and identifiant_unique df_normalized.set_index("identifiant", inplace=True) diff --git a/dags_unit_tests/sources/tasks/business_logic/test_keep_acteur_changed.py b/dags_unit_tests/sources/tasks/business_logic/test_keep_acteur_changed.py index 76857f8cd..3fc087065 100644 --- a/dags_unit_tests/sources/tasks/business_logic/test_keep_acteur_changed.py +++ b/dags_unit_tests/sources/tasks/business_logic/test_keep_acteur_changed.py @@ -991,3 +991,72 @@ def test_keep_acteur_changed_same_acteur_but_different_identifiant_unique(dag_co pd.testing.assert_frame_equal(df_acteur, df_expected, check_dtype=False) pd.testing.assert_frame_equal(df_acteur_from_db, df_expected, check_dtype=False) assert metadata == {} + + +def test_keep_acteur_changed_same_acteur_but_different_identifiant_unique2(dag_config): + df_normalized = pd.DataFrame( + { + "nom": ["nom 1", "nom 2"], + "identifiant_unique": ["source1_id1", "source1_id2"], + "source_code": ["source1", "source1"], + "identifiant_externe": ["id1", "id2"], + "label_codes": [[], []], + "acteur_type_code": [[], []], + "acteur_service_codes": [[], []], + "proposition_service_codes": [[], []], + } + ) + df_acteur_from_db = pd.DataFrame( + { + "nom": ["nom 1", "nom 3"], + "identifiant_unique": ["source1_id_old", "source1_id3"], + "source_code": ["source1", "source1"], + "identifiant_externe": ["id1", "id3"], + "label_codes": [[], []], + "acteur_type_code": [[], []], + "acteur_service_codes": [[], []], + "proposition_service_codes": [[], []], + } + ) + df_expected = pd.DataFrame( + { + "nom": ["nom 2"], + "identifiant_unique": ["source1_id2"], + "source_code": ["source1"], + "identifiant_externe": ["id2"], + "label_codes": [[]], + "acteur_type_code": [[]], + "acteur_service_codes": [[]], + "proposition_service_codes": [[]], + } + ) + df_expected_from_db = pd.DataFrame( + { + "nom": ["nom 3"], + "identifiant_unique": ["source1_id3"], + "source_code": ["source1"], + "identifiant_externe": ["id3"], + "label_codes": [[]], + "acteur_type_code": [[]], + "acteur_service_codes": [[]], + "proposition_service_codes": [[]], + } + ) + + df_acteur, df_acteur_from_db, metadata = keep_acteur_changed( + df_normalized=df_normalized, + df_acteur_from_db=df_acteur_from_db, + dag_config=dag_config, + ) + + pd.testing.assert_frame_equal( + df_acteur.reset_index(drop=True), + df_expected.reset_index(drop=True), + check_dtype=False, + ) + pd.testing.assert_frame_equal( + df_acteur_from_db.reset_index(drop=True), + df_expected_from_db.reset_index(drop=True), + check_dtype=False, + ) + assert metadata == {} diff --git a/qfdmo/admin/acteur.py b/qfdmo/admin/acteur.py index 822f4adc4..864e513e5 100644 --- a/qfdmo/admin/acteur.py +++ b/qfdmo/admin/acteur.py @@ -155,6 +155,7 @@ class BaseActeurAdmin(admin.GISModelAdmin): "siret", "siren", "identifiant_unique", + "identifiant_externe", "code_postal", "ville", "adresse", @@ -163,6 +164,7 @@ class BaseActeurAdmin(admin.GISModelAdmin): search_fields = [ "code_postal", "identifiant_unique", + "identifiant_externe", "nom__unaccent", "siret", "siren", diff --git a/qfdmo/management/commands/update_external_ids.py b/qfdmo/management/commands/update_external_ids.py index 55fc7a28f..643935573 100644 --- a/qfdmo/management/commands/update_external_ids.py +++ b/qfdmo/management/commands/update_external_ids.py @@ -3,7 +3,7 @@ from django.core.management.base import BaseCommand -from qfdmo.models.acteur import Acteur, RevisionActeur +from qfdmo.models.acteur import Acteur, ActeurStatus, RevisionActeur CHUNK = 1000 @@ -53,6 +53,22 @@ def handle(self, *args, **options): ) continue + # test acteur with this new external id already exists + acteur_from_db = Acteur.objects.filter( + identifiant_externe=new_id, source__code=source_code + ).first() + id_index = 0 + while acteur_from_db: + id_index += 1 + new_id_indexed = f"{new_id}_{id_index}" + acteur_from_db = Acteur.objects.filter( + identifiant_externe=new_id_indexed, source__code=source_code + ).first() + statut = ActeurStatus.ACTIF + if id_index: + new_id = f"{new_id}_{id_index}" + statut = ActeurStatus.INACTIF + # Update acteur if exists acteur = Acteur.objects.filter( identifiant_externe=old_id, source__code=source_code @@ -69,6 +85,7 @@ def handle(self, *args, **options): ) ) acteur.identifiant_externe = new_id + acteur.statut = statut acteur.save() else: self.stdout.write( @@ -90,6 +107,7 @@ def handle(self, *args, **options): ) ) revision_acteur.identifiant_externe = new_id + revision_acteur.statut = statut revision_acteur.save() if revision_acteur and dry_run: self.stdout.write( From b07ccd09fe71a4ee3a158260b41cb4942106ee56 Mon Sep 17 00:00:00 2001 From: Nicolas Oudard Date: Fri, 25 Apr 2025 08:35:38 +0200 Subject: [PATCH 2/3] plus pythonesque :) --- qfdmo/management/commands/update_external_ids.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/qfdmo/management/commands/update_external_ids.py b/qfdmo/management/commands/update_external_ids.py index 643935573..0a4c1df0b 100644 --- a/qfdmo/management/commands/update_external_ids.py +++ b/qfdmo/management/commands/update_external_ids.py @@ -64,10 +64,12 @@ def handle(self, *args, **options): acteur_from_db = Acteur.objects.filter( identifiant_externe=new_id_indexed, source__code=source_code ).first() - statut = ActeurStatus.ACTIF - if id_index: - new_id = f"{new_id}_{id_index}" + + try: + new_id = new_id_indexed statut = ActeurStatus.INACTIF + except NameError: + statut = ActeurStatus.ACTIF # Update acteur if exists acteur = Acteur.objects.filter( From e8540d98e7c97b1ce556afe9a92a49ff391db67a Mon Sep 17 00:00:00 2001 From: Nicolas Oudard Date: Fri, 25 Apr 2025 10:57:23 +0200 Subject: [PATCH 3/3] =?UTF-8?q?utilisation=20de=20transaction=20pour=20g?= =?UTF-8?q?=C3=A9rer=20l'option=20dry-run?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../commands/update_external_ids.py | 123 +++++++++--------- 1 file changed, 60 insertions(+), 63 deletions(-) diff --git a/qfdmo/management/commands/update_external_ids.py b/qfdmo/management/commands/update_external_ids.py index 0a4c1df0b..259b99541 100644 --- a/qfdmo/management/commands/update_external_ids.py +++ b/qfdmo/management/commands/update_external_ids.py @@ -2,11 +2,10 @@ import json from django.core.management.base import BaseCommand +from django.db import transaction from qfdmo.models.acteur import Acteur, ActeurStatus, RevisionActeur -CHUNK = 1000 - class Command(BaseCommand): help = "Export Ressources using CSV format" @@ -39,48 +38,54 @@ def handle(self, *args, **options): with open(mapping_file, "r") as f: mapping = json.load(f) - for old_id, new_id in mapping.items(): - # Check if the ids are not empty - if old_id and new_id: - self.stdout.write( - self.style.SUCCESS(f"Updating `{old_id}` to `{new_id}`") - ) - else: - self.stdout.write( - self.style.WARNING( - f"Skipping `{old_id}` to `{new_id}` : one of the ids is empty" + with transaction.atomic(): + for old_id, new_id in mapping.items(): + # Check if the ids are not empty + if old_id and new_id: + self.stdout.write( + self.style.SUCCESS(f"Mapping `{old_id}` to `{new_id}`") ) - ) - continue - - # test acteur with this new external id already exists - acteur_from_db = Acteur.objects.filter( - identifiant_externe=new_id, source__code=source_code - ).first() - id_index = 0 - while acteur_from_db: - id_index += 1 - new_id_indexed = f"{new_id}_{id_index}" - acteur_from_db = Acteur.objects.filter( - identifiant_externe=new_id_indexed, source__code=source_code + else: + self.stdout.write( + self.style.WARNING( + f"Skipping `{old_id}` to `{new_id}` : one of the ids is" + " empty" + ) + ) + continue + + # Update acteur if exists + acteur = Acteur.objects.filter( + identifiant_externe=old_id, source__code=source_code ).first() - try: - new_id = new_id_indexed - statut = ActeurStatus.INACTIF - except NameError: - statut = ActeurStatus.ACTIF + if not acteur: + self.stdout.write(self.style.WARNING(f"Acteur {old_id} not found")) + continue - # Update acteur if exists - acteur = Acteur.objects.filter( - identifiant_externe=old_id, source__code=source_code - ).first() + # test acteur with this new external id already exists + acteur_from_db = Acteur.objects.filter( + identifiant_externe=new_id, source__code=source_code + ).first() + id_index = 0 + while acteur_from_db: + self.stdout.write( + self.style.WARNING( + f"Acteur {acteur_from_db.identifiant_externe} already" + " exists, trying to find an unused id" + ) + ) + id_index += 1 + new_id_indexed = f"{new_id}_{id_index}" + acteur_from_db = Acteur.objects.filter( + identifiant_externe=new_id_indexed, source__code=source_code + ).first() - if not acteur: - self.stdout.write(self.style.WARNING(f"Acteur {old_id} not found")) - continue + statut = ActeurStatus.ACTIF + if id_index: + new_id = f"{new_id}_{id_index}" + statut = ActeurStatus.INACTIF - if not dry_run: self.stdout.write( self.style.SUCCESS( f"Updating acteur {acteur.identifiant_unique} to {new_id}" @@ -89,31 +94,23 @@ def handle(self, *args, **options): acteur.identifiant_externe = new_id acteur.statut = statut acteur.save() - else: - self.stdout.write( - self.style.WARNING( - f"Dry run: would update Acteur {old_id} to {new_id}" - ) - ) - # Update revision acteur if exists - revision_acteur = RevisionActeur.objects.filter( - identifiant_externe=old_id, source__code=source_code - ).first() + # Update revision acteur if exists + revision_acteur = RevisionActeur.objects.filter( + identifiant_externe=old_id, source__code=source_code + ).first() - if revision_acteur and not dry_run: - self.stdout.write( - self.style.SUCCESS( - f"Updating revision acteur {revision_acteur.identifiant_unique}" - f" to {new_id}" - ) - ) - revision_acteur.identifiant_externe = new_id - revision_acteur.statut = statut - revision_acteur.save() - if revision_acteur and dry_run: - self.stdout.write( - self.style.WARNING( - f"Dry run: would update RevisionActeur {old_id} to {new_id}" + if revision_acteur: + self.stdout.write( + self.style.SUCCESS( + "Updating revision acteur " + f"{revision_acteur.identifiant_unique} to {new_id}" + ) ) - ) + revision_acteur.identifiant_externe = new_id + revision_acteur.statut = statut + revision_acteur.save() + + if dry_run: + # Rollback the transaction + raise Exception("Rolling back transaction because of dry run")