Skip to content

Commit 6aa8320

Browse files
authored
1️⃣ Suggestions: migrer vers 1 seul template (#1594)
* Suggestions: migrer vers 1 seul template * fix clustering context & rgpd tests * supprimer template clustering
1 parent f0e2699 commit 6aa8320

File tree

19 files changed

+207
-248
lines changed

19 files changed

+207
-248
lines changed

dags/cluster/tasks/business_logic/cluster_acteurs_suggestions/context.py

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,28 @@
11
"""Generates the data stored in Suggestion.contexte field"""
22

3+
import logging
4+
35
import pandas as pd
46
from cluster.config.constants import COL_PARENT_ID_BEFORE
57
from utils import logging_utils as log
68

9+
logger = logging.getLogger(__name__)
10+
711

812
def suggestion_context_generate(
913
df_cluster: pd.DataFrame,
1014
cluster_fields_exact: list[str],
1115
cluster_fields_fuzzy: list[str],
12-
) -> dict:
16+
) -> dict | None:
1317
"""Generates a dict for use in Suggestion.contexte field"""
1418
from data.models.change import COL_CHANGE_MODEL_NAME
1519
from data.models.changes import ChangeActeurCreateAsParent
1620

17-
clusters_cnt = df_cluster["cluster_id"].nunique()
18-
if clusters_cnt != 1:
19-
msg = f"We create contexte for 1 cluster at a time, got {clusters_cnt}"
21+
cluster_ids = df_cluster["cluster_id"].unique()
22+
if len(cluster_ids) != 1:
23+
msg = f"We create contexte for 1 cluster at a time, got {cluster_ids=}"
2024
raise ValueError(msg)
25+
cluster_id = cluster_ids[0]
2126

2227
# Exclude parents-to-be-created as non-existing thus not part of clustering context
2328
df_cluster = df_cluster[
@@ -33,6 +38,14 @@ def suggestion_context_generate(
3338
# - TODO: we will need to make the below exclusion conditional on feature activation
3439
df_cluster = df_cluster[df_cluster[COL_PARENT_ID_BEFORE].isnull()]
3540

41+
# Can happen when the DAG to refresh displayed acteurs wasn't run:
42+
# - we can have data in displayed but in reality underlying acteurs
43+
# are no longer ACTIVE or present (e.g. removed parents)
44+
if df_cluster.empty:
45+
msg = f"{cluster_id=} vide: bien rafraîchir les acteurs affichés"
46+
logger.warning(msg)
47+
return None
48+
3649
# Ensuring we have 1 exact group:
3750
exacts = df_cluster.groupby(cluster_fields_exact)
3851
groups = list(exacts.groups.keys())
@@ -42,9 +55,10 @@ def suggestion_context_generate(
4255
log.preview("cluster problématique", df_cluster)
4356
raise ValueError(msg)
4457

45-
result = {}
46-
result["exact_match"] = dict(zip(cluster_fields_exact, groups[0])) # type: ignore
4758
cols = ["identifiant_unique"] + cluster_fields_fuzzy
48-
result["fuzzy_details"] = df_cluster[cols].to_dict(orient="records")
59+
context = {}
60+
context["cluster_id"] = cluster_id
61+
context["exact_match"] = dict(zip(cluster_fields_exact, groups[0])) # type: ignore
62+
context["fuzzy_details"] = df_cluster[cols].to_dict(orient="records")
4963

50-
return result
64+
return context

dags/cluster/tasks/business_logic/cluster_acteurs_suggestions/prepare.py

Lines changed: 19 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ def cluster_changes_get(cluster: pd.DataFrame) -> list[dict]:
1717
COL_CHANGE_ENTITY_TYPE,
1818
COL_CHANGE_MODEL_NAME,
1919
COL_CHANGE_MODEL_PARAMS,
20-
COL_CHANGE_NAMESPACE,
2120
COL_CHANGE_ORDER,
2221
COL_CHANGE_REASON,
2322
SuggestionChange,
@@ -31,13 +30,16 @@ def cluster_changes_get(cluster: pd.DataFrame) -> list[dict]:
3130
)
3231
from qfdmo.models.acteur import RevisionActeur
3332

33+
# Building changes for each acteur in the cluster
3434
changes = []
3535
for _, row in cluster.iterrows():
36+
37+
# Common params
3638
row = row.to_dict()
3739
model_name = row[COL_CHANGE_MODEL_NAME]
38-
# All acteur-related changes have an "identifiant_unique"
3940
model_params = {"id": row["identifiant_unique"]}
40-
# Then params data depends on the type of changes
41+
42+
# Params specific to the type of change
4143
if model_name == ChangeActeurUpdateParentId.name():
4244
model_params["data"] = {"parent_id": row["parent_id"]}
4345
elif model_name in [
@@ -46,11 +48,9 @@ def cluster_changes_get(cluster: pd.DataFrame) -> list[dict]:
4648
]:
4749
model_params["data"] = row[COL_PARENT_DATA_NEW]
4850
elif model_name == ChangeActeurVerifyRevision.name():
49-
# no extra params to pass for this one
50-
pass
51+
pass # no additional param, we just check presence
5152
elif model_name == ChangeActeurDeleteAsParent.name():
52-
# to delete parents we already have their id
53-
pass
53+
pass # no additional param, we just delete using id
5454
else:
5555
raise ValueError(f"Unexpected model_name: {model_name}")
5656

@@ -60,21 +60,20 @@ def cluster_changes_get(cluster: pd.DataFrame) -> list[dict]:
6060

6161
# Validation
6262
row[COL_CHANGE_MODEL_PARAMS] = model_params
63-
change = {
64-
x.replace(COL_CHANGE_NAMESPACE, ""): row[x]
65-
for x in [
66-
COL_CHANGE_ORDER,
67-
COL_CHANGE_REASON,
68-
COL_CHANGE_ENTITY_TYPE,
69-
COL_CHANGE_MODEL_NAME,
70-
COL_CHANGE_MODEL_PARAMS,
71-
]
72-
}
7363
try:
64+
change = {
65+
"order": row[COL_CHANGE_ORDER],
66+
"reason": row[COL_CHANGE_REASON],
67+
"entity_type": row[COL_CHANGE_ENTITY_TYPE],
68+
"model_name": row[COL_CHANGE_MODEL_NAME],
69+
"model_params": row[COL_CHANGE_MODEL_PARAMS],
70+
}
7471
SuggestionChange(**change)
7572
except Exception as e:
7673
log.preview("🔴 Broken change", change)
7774
raise e
75+
76+
# Appending
7877
changes.append(change)
7978
return changes
8079

@@ -92,12 +91,10 @@ def cluster_acteurs_suggestions_prepare(
9291
# whenever we encounter issues
9392
try:
9493
changes = cluster_changes_get(cluster)
95-
suggestion = {"cluster_id": cluster_id, "changes": changes}
96-
cluster_id = suggestion["cluster_id"]
94+
title = "📦 Cluster/dédup"
95+
suggestion = {"title": title, "cluster_id": cluster_id, "changes": changes}
9796
df_changes = pd.DataFrame(suggestion["changes"])
98-
log.preview_df_as_markdown(
99-
f"Suggestion pour cluster_id={cluster_id}", df_changes
100-
)
97+
log.preview_df_as_markdown(f"Suggestion pour {cluster_id=}", df_changes)
10198
working.append(suggestion)
10299
except Exception:
103100
import traceback

dags/cluster/tasks/business_logic/cluster_acteurs_suggestions/to_db.py

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -18,17 +18,11 @@ def cluster_acteurs_suggestions_to_db(
1818
suggestions: list[dict],
1919
identifiant_action: str,
2020
identifiant_execution: str,
21+
# For context purposes only
2122
cluster_fields_exact: list[str],
2223
cluster_fields_fuzzy: list[str],
2324
) -> None:
24-
"""Writing suggestions to DB
25-
26-
Args:
27-
df_clusters (pd.DataFrame): clusters for metadata purposes
28-
suggestions (list[dict]): suggestions, 1 per cluster
29-
identifiant_action (str): ex: Airflow dag_id
30-
identifiant_execution (str): ex: Airflow run_id
31-
"""
25+
"""Writing suggestions to DB"""
3226

3327
logger.info(f"{identifiant_action=}")
3428
logger.info(f"{identifiant_execution=}")
@@ -50,6 +44,10 @@ def cluster_acteurs_suggestions_to_db(
5044
cohorte.save()
5145

5246
for sugg_dict in suggestions:
47+
# TODO: refactor this DAG to generate suggestions & context
48+
# at the same time like we do for latest DAGs (BAN, AE etc..)
49+
# which would allow getting all data from context and not mismatch
50+
# context & changes
5351
cluster_id = sugg_dict["cluster_id"]
5452
logging.info(f"suggestion {cluster_id=}")
5553
df_cluster = df_clusters[df_clusters["cluster_id"] == cluster_id]
@@ -59,18 +57,20 @@ def cluster_acteurs_suggestions_to_db(
5957
msg = "Cluster vide = présent en suggestion mais plus dans df_clusters!!!"
6058
raise ValueError(msg)
6159

60+
# TODO: do the context + suggestion changes generation at once
61+
# just like for latest DAGs (BAN, AE etc..) to avoid mismatch
62+
# and hacks like the one below
63+
contexte = suggestion_context_generate(
64+
df_cluster=df_cluster,
65+
cluster_fields_exact=cluster_fields_exact,
66+
cluster_fields_fuzzy=cluster_fields_fuzzy,
67+
)
68+
if contexte is None:
69+
continue
6270
sugg_obj = Suggestion(
6371
suggestion_cohorte=cohorte,
6472
statut=SuggestionStatut.AVALIDER,
65-
# FIXME:
66-
# This is causing serialization issues due to certain values
67-
# being of Django type (e.g. ActeurType)
68-
# contexte=cluster.to_dict(orient="records"),
69-
contexte=suggestion_context_generate(
70-
df_cluster=df_cluster,
71-
cluster_fields_exact=cluster_fields_exact,
72-
cluster_fields_fuzzy=cluster_fields_fuzzy,
73-
),
73+
contexte=contexte,
7474
suggestion=sugg_dict,
7575
)
7676
log.preview(f"{cluster_id=} suggestion dict", sugg_dict)

dags/crawl/config/constants.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,5 @@
66
COLS.URLS_TO_TRY,
77
]
88

9-
LABEL_SCENARIO = "Scénario"
109
LABEL_URL_ORIGINE = "URL d'origine"
1110
LABEL_URL_PROPOSEE = "URL proposée"

dags/crawl/tasks/business_logic/crawl_urls_suggest.py

Lines changed: 5 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
import pandas as pd
44
from crawl.config.columns import COLS
5-
from crawl.config.constants import LABEL_SCENARIO, LABEL_URL_ORIGINE, LABEL_URL_PROPOSEE
5+
from crawl.config.constants import LABEL_URL_ORIGINE, LABEL_URL_PROPOSEE
66
from utils import logging_utils as log
77
from utils.dataframes import (
88
df_col_assert_get_unique,
@@ -70,14 +70,11 @@ def suggestions_prepare(
7070
suggestions.append(
7171
{
7272
"contexte": {
73-
LABEL_SCENARIO: row[COLS.COHORT],
7473
LABEL_URL_ORIGINE: url_original,
7574
LABEL_URL_PROPOSEE: url_proposed,
7675
},
7776
"suggestion": {
78-
"scenario": row[COLS.COHORT],
79-
"url_original": url_original,
80-
"url_proposed": url_proposed,
77+
"title": row[COLS.COHORT],
8178
"changes": changes,
8279
},
8380
}
@@ -86,9 +83,9 @@ def suggestions_prepare(
8683
logging.info(log.banner_string(f"🏁 Résultat pour {cohorte=}"))
8784
logger.info(f"Suggestion générées: {len(suggestions)}")
8885
for s in suggestions:
89-
scenario = s["contexte"]["Scénario"]
86+
title = s["suggestion"]["title"]
9087
url = s["contexte"][LABEL_URL_ORIGINE]
91-
log.preview(f"{scenario}: {url=}", s)
88+
log.preview(f"{title}: {url=}", s)
9289

9390
return suggestions
9491

@@ -99,14 +96,7 @@ def crawl_urls_suggestions_to_db(
9996
identifiant_action: str,
10097
identifiant_execution: str,
10198
) -> None:
102-
"""Writing suggestions to DB
103-
104-
Args:
105-
df_clusters (pd.DataFrame): clusters for metadata purposes
106-
suggestions (list[dict]): suggestions, 1 per cluster
107-
identifiant_action (str): ex: Airflow dag_id
108-
identifiant_execution (str): ex: Airflow run_id
109-
"""
99+
"""Writing suggestions to DB"""
110100

111101
logger.info(f"{identifiant_action=}")
112102
logger.info(f"{identifiant_execution=}")

dags/enrich/tasks/business_logic/enrich_dbt_model_to_suggestions.py

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111
)
1212
from utils import logging_utils as log
1313

14+
from data.models.changes.acteur_rgpd_anonymize import rgpd_data_get
15+
1416
logger = logging.getLogger(__name__)
1517

1618

@@ -55,7 +57,12 @@ def changes_prepare_villes(row: dict) -> tuple[list[dict], dict]:
5557
entity_type="acteur_displayed",
5658
)
5759
)
58-
contexte = {} # changes are self-explanatory
60+
contexte = {
61+
"statut": row[COLS.ACTEUR_STATUT],
62+
"adresse": row[COLS.ACTEUR_ADRESSE],
63+
"ville": row[COLS.ACTEUR_VILLE],
64+
"code_postal": row[COLS.ACTEUR_CODE_POSTAL],
65+
}
5966
return changes, contexte
6067

6168

@@ -68,6 +75,7 @@ def changes_prepare_rgpd(
6875
changes = []
6976
model_params = {
7077
"id": row[COLS.ACTEUR_ID],
78+
"data": rgpd_data_get(),
7179
}
7280
changes.append(
7381
changes_prepare(
@@ -96,7 +104,6 @@ def changes_prepare_closed_not_replaced(
96104
model_params = {
97105
"id": row[COLS.ACTEUR_ID],
98106
"data": {
99-
"identifiant_unique": row[COLS.ACTEUR_ID],
100107
"statut": ActeurStatus.INACTIF,
101108
"siret": row[COLS.ACTEUR_SIRET],
102109
"siren": row[COLS.ACTEUR_SIRET][:9],
@@ -134,7 +141,6 @@ def changes_prepare_closed_replaced(
134141
# Parent
135142
parent_id = parent_id_generate([str(row[COLS.SUGGEST_SIRET])])
136143
parent_data = dbt_model_row_to_suggest_data(row)
137-
parent_data["identifiant_unique"] = parent_id
138144
parent_data["source"] = None
139145
parent_data["statut"] = ActeurStatus.ACTIF
140146
params_parent = {
@@ -158,7 +164,6 @@ def changes_prepare_closed_replaced(
158164
child_new_id = f"{row[COLS.ACTEUR_ID]}_{row[COLS.ACTEUR_SIRET]}_{now}"
159165
params_child_new = params_parent.copy()
160166
params_child_new["id"] = child_new_id
161-
params_child_new["data"]["identifiant_unique"] = child_new_id
162167
params_child_new["data"]["source"] = row[COLS.ACTEUR_SOURCE_ID]
163168
params_child_new["data"]["parent"] = parent_id
164169
params_child_new["data"]["parent_reason"] = (
@@ -180,7 +185,6 @@ def changes_prepare_closed_replaced(
180185
# Existing Child
181186
params_child_old = params_child_new.copy()
182187
params_child_old["id"] = row[COLS.ACTEUR_ID]
183-
params_child_old["data"]["identifiant_unique"] = row[COLS.ACTEUR_ID]
184188
params_child_old["data"]["parent"] = parent_id
185189
params_child_old["data"]["parent_reason"] = (
186190
f"SIRET {row[COLS.ACTEUR_SIRET]} "

dags_unit_tests/cluster/tasks/business_logic/test_cluster_acteurs_suggestions_contexte_generate.py

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -44,13 +44,7 @@ def working(self, df_cluster) -> dict:
4444
def test_working_structure(self, working, df_cluster):
4545
assert isinstance(working, dict)
4646
assert sorted(list(working.keys())) == sorted(
47-
[
48-
# By definition there is only 1 exact match
49-
# per cluster
50-
"exact_match",
51-
# And multiple fuzzy entries
52-
"fuzzy_details",
53-
]
47+
["cluster_id", "exact_match", "fuzzy_details"]
5448
)
5549
# Parent to be created should not be in fuzzy details
5650
assert len(working["fuzzy_details"]) == len(df_cluster) - 1

dags_unit_tests/cluster/tasks/business_logic/test_cluster_acteurs_suggestions_prepare.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -142,10 +142,7 @@ def failing(self, suggestions):
142142
def test_structure_and_type(self, working):
143143
assert isinstance(working, list)
144144
assert isinstance(working[0], dict)
145-
assert list(working[0].keys()) == ["cluster_id", "changes"]
146-
assert isinstance(working[0]["cluster_id"], str)
147-
assert isinstance(working[0]["changes"], list)
148-
assert isinstance(working[0]["changes"][0], dict)
145+
assert list(working[0].keys()) == ["title", "cluster_id", "changes"]
149146

150147
def test_one_suggestion_per_cluster(self, df_clusters, working):
151148
# 1 suggestion per cluster EXCEPT for failing c4

dags_unit_tests/cluster/tasks/business_logic/test_cluster_acteurs_suggestions_to_db_and_apply.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -320,7 +320,7 @@ def test_each_suggestion_has_one_change_per_acteur(self, df, suggestions_from_db
320320
# Going through each suggestion to test its content
321321
cluster_ids_found = set()
322322
for s in suggestions_from_db:
323-
cluster_id = s.suggestion["cluster_id"]
323+
cluster_id = s.contexte["cluster_id"]
324324
changes = s.suggestion["changes"]
325325
cluster_ids_found.add(cluster_id)
326326

dags_unit_tests/enrich/tasks/test_enrich_suggestions_cities.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ def df_new(self):
2525
COLS.SUGGEST_COHORT: [COHORTS.VILLES_NEW] * 3,
2626
COLS.SUGGEST_VILLE: ["new town 1", "new town 2", "closed"],
2727
COLS.ACTEUR_ID: ["new1", "new2", "closed 1"],
28+
COLS.ACTEUR_ADRESSE: ["1 rue", "2 rue", "3 rue"],
29+
COLS.ACTEUR_CODE_POSTAL: ["12345", "67890", "54321"],
2830
COLS.ACTEUR_VILLE: ["old town 1", "old town 2", "closed"],
2931
COLS.ACTEUR_STATUT: ["ACTIF", "ACTIF", "INACTIF"],
3032
}
@@ -38,6 +40,8 @@ def df_typo(self):
3840
COLS.SUGGEST_COHORT: [COHORTS.VILLES_TYPO] * 3,
3941
COLS.SUGGEST_VILLE: ["Paris", "Laval", "closed"],
4042
COLS.ACTEUR_ID: ["typo1", "typo2", "closed 2"],
43+
COLS.ACTEUR_ADRESSE: ["1 rue", "2 rue", "3 rue"],
44+
COLS.ACTEUR_CODE_POSTAL: ["12345", "67890", "54321"],
4145
COLS.ACTEUR_VILLE: ["Pâris", "Lâval", "closed"],
4246
COLS.ACTEUR_STATUT: ["ACTIF", "ACTIF", "INACTIF"],
4347
}

0 commit comments

Comments
 (0)