Skip to content

Commit 8d13331

Browse files
kolok and fabienheureux authored
Dédupliquer les lignes qui ont plusieurs EO et prefixé ocab (#1610)
Co-authored-by: Fabien Le Frapper <contact@fabienlefrapper.me>
1 parent 56e183b commit 8d13331

File tree

4 files changed

+177 -7 lines changed

dags/sources/dags/source_ocab.py

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@
2424
"destination": "nom",
2525
},
2626
{
27-
"origin": "enseigne_commerciale",
28-
"destination": "nom_commercial",
27+
"origin": "consignes_dacces",
28+
"destination": "description",
2929
},
3030
{
3131
"origin": "longitudewgs84",
@@ -56,6 +56,11 @@
5656
"transformation": "clean_sous_categorie_codes",
5757
"destination": "sous_categorie_codes",
5858
},
59+
{
60+
"origin": "horaires_douverture",
61+
"transformation": "clean_horaires_osm",
62+
"destination": "horaires_osm",
63+
},
5964
# 3. Ajout des colonnes avec une valeur par défaut
6065
{
6166
"column": "statut",
@@ -94,10 +99,15 @@
9499
"transformation": "clean_adresse",
95100
"destination": ["adresse", "code_postal", "ville"],
96101
},
102+
{
103+
"origin": ["telephone", "code_postal"],
104+
"transformation": "clean_telephone",
105+
"destination": ["telephone"],
106+
},
97107
{
98108
"origin": [
99-
# "point_dapport_de_service_reparation",
100-
# "point_de_reparation",
109+
"point_dapport_de_service_reparation",
110+
"point_de_reparation",
101111
"point_dapport_pour_reemploi",
102112
"point_de_collecte_ou_de_reprise_des_dechets",
103113
],
@@ -106,8 +116,8 @@
106116
},
107117
{
108118
"origin": [
109-
# "point_dapport_de_service_reparation",
110-
# "point_de_reparation",
119+
"point_dapport_de_service_reparation",
120+
"point_de_reparation",
111121
"point_dapport_pour_reemploi",
112122
"point_de_collecte_ou_de_reprise_des_dechets",
113123
],
@@ -131,12 +141,18 @@
131141
{"remove": "id_point_apport_ou_reparation"},
132142
{"remove": "point_de_collecte_ou_de_reprise_des_dechets"},
133143
{"remove": "point_dapport_pour_reemploi"},
144+
{"remove": "point_de_reparation"},
145+
{"remove": "point_dapport_de_service_reparation"},
134146
# 6. Colonnes à garder (rien à faire, utilisé pour le controle)
135147
],
136148
"endpoint": (
137149
"https://data.pointsapport.ademe.fr/data-fair/api/v1/datasets/"
138150
"donnees-eo-ocab/lines?size=10000"
139151
),
152+
"oca": {
153+
"prefix": "ocab",
154+
"deduplication_source": True,
155+
},
140156
"validate_address_with_ban": False,
141157
"product_mapping": get_mapping_config(),
142158
},

dags/sources/tasks/airflow_logic/config_management.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,11 @@ class NormalizationColumnKeep(BaseModel):
4343
keep: str
4444

4545

46+
class OCAConfig(BaseModel):
    """Optional per-source settings stored under the ``oca`` key of a DAG config.

    Both fields have defaults, so an empty ``oca`` mapping is valid.
    """

    # Text placed in front of each source code, joined with "_", when set.
    prefix: Optional[str] = None
    # When true, a row whose source_code lists several codes separated by "|"
    # is split into one row per code during normalization.
    deduplication_source: bool = False
49+
50+
4651
class DAGConfig(BaseModel):
4752
normalization_rules: list[
4853
Union[
@@ -62,6 +67,7 @@ class DAGConfig(BaseModel):
6267
product_mapping: dict
6368
source_code: Optional[str] = None
6469
validate_address_with_ban: bool = False
70+
oca: OCAConfig | None = None
6571

6672
@field_validator("endpoint")
6773
def validate_endpoint(cls, endpoint):
@@ -112,6 +118,22 @@ def get_expected_columns(self) -> set[str]:
112118
columns -= set(removed_columns)
113119
return columns
114120

121+
@property
def oca_deduplication_source(self) -> bool:
    """Whether the OCA section asks for multi-code rows to be split.

    Returns:
        ``False`` when no ``oca`` section is configured, otherwise the
        boolean value of its ``deduplication_source`` flag.
    """
    return self.oca is not None and bool(self.oca.deduplication_source)
126+
127+
@property
def is_oca(self) -> bool:
    """Whether this DAG config carries an ``oca`` section.

    Returns:
        ``True`` as soon as ``oca`` is set, whatever its field values.
    """
    # Explicit None check, like the sibling oca_* properties, so the answer
    # does not depend on how the model object evaluates in a boolean context.
    return self.oca is not None
130+
131+
@property
def oca_prefix(self) -> str | None:
    """Prefix configured in the ``oca`` section, if any.

    Returns:
        The ``prefix`` of the ``oca`` section, or ``None`` when the section
        is absent (the prefix itself may also be ``None``).
    """
    return self.oca.prefix if self.oca is not None else None
136+
115137

116138
# DEPRECATED
117139
def get_nested_config_parameter(

dags/sources/tasks/business_logic/source_data_normalize.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,25 @@ def _display_warning_about_missing_location(df: pd.DataFrame) -> None:
205205
log.preview("Acteurs sans localisation", df_acteur_sans_loc)
206206

207207

208+
def _manage_oca_config(df: pd.DataFrame, dag_config: DAGConfig) -> pd.DataFrame:
    """Apply the OCA options of ``dag_config`` to the normalized actors.

    Two independent options are honoured:

    * ``oca_deduplication_source``: a row whose ``source_code`` holds several
      codes separated by ``|`` is turned into one row per code;
    * ``oca_prefix``: every ``source_code`` is trimmed, lower-cased and
      rewritten as ``"<prefix>_<code>"``.

    ``identifiant_unique`` is then recomputed from ``identifiant_externe`` and
    the rewritten ``source_code`` using the ``clean_identifiant_unique``
    transformation.

    Args:
        df: Actors dataframe; must contain the ``source_code``,
            ``identifiant_externe`` and ``identifiant_unique`` columns.
        dag_config: Configuration of the running DAG.

    Returns:
        The updated dataframe. When rows are split, the index is renumbered
        so that the new rows do not share index labels.
    """
    # Nothing to rewrite; a row-wise apply on an empty frame may not produce
    # the single column that the assignment at the end expects.
    if df.empty:
        return df

    if dag_config.oca_deduplication_source:
        # ignore_index avoids the duplicated index labels explode() would
        # otherwise leave on the rows created from a single source row.
        df = df.assign(source_code=df["source_code"].str.split("|")).explode(
            "source_code", ignore_index=True
        )

    if oca_prefix := dag_config.oca_prefix:
        # Vectorized string operations keep missing source codes as missing
        # values instead of raising on a non-string entry.
        cleaned_codes = df["source_code"].str.strip().str.lower()
        df["source_code"] = f"{oca_prefix}_" + cleaned_codes

    # Recompute the unique identifier from the rewritten source codes
    normalisation_function = get_transformation_function(
        "clean_identifiant_unique", dag_config
    )
    df[["identifiant_unique"]] = df[["identifiant_externe", "source_code"]].apply(
        normalisation_function, axis=1
    )
    return df
225+
226+
208227
def source_data_normalize(
209228
df_acteur_from_source: pd.DataFrame,
210229
dag_config: DAGConfig,
@@ -239,6 +258,10 @@ def source_data_normalize(
239258
# Merge and delete undesired lines
240259
df, metadata = _remove_undesired_lines(df, dag_config)
241260

261+
# deduplication_on_source_code
262+
if dag_config.is_oca:
263+
df = _manage_oca_config(df, dag_config)
264+
242265
# Check that the dataframe has the expected columns
243266
expected_columns = dag_config.get_expected_columns()
244267

dags_unit_tests/sources/tasks/business_logic/test_source_data_normalize.py

Lines changed: 110 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,7 @@ def test_remove_explicit_null(self, null_value):
215215
"product_mapping": {"product1": "code1"},
216216
}
217217
)
218-
df, metadata = source_data_normalize(
218+
df, _ = source_data_normalize(
219219
dag_config=dag_config,
220220
df_acteur_from_source=pd.DataFrame(
221221
{
@@ -238,6 +238,115 @@ def test_remove_explicit_null(self, null_value):
238238
)
239239

240240

241+
class TestDfApplyOCA:
    """
    Tests of the `oca` options of the DAG config as applied by
    source_data_normalize: splitting of "|"-separated source codes and
    prefixing of the source code.
    """

    # Column order shared by the input frame and every expected frame
    COLUMNS = ("identifiant_unique", "source_code", "identifiant_externe", "nom")

    @pytest.fixture
    def dag_config_kwargs(self):
        kept_columns = ("identifiant_unique", "nom", "source_code", "identifiant_externe")
        return {
            "normalization_rules": [{"keep": column} for column in kept_columns],
            "product_mapping": {},
            "endpoint": "https://example.com/api",
        }

    @pytest.fixture
    def df_acteur(self):
        rows = [
            ("id1", "oca_1|oca_2", "ext1", "nom1"),
            ("id2", "oca_2", "ext2", "nom2"),
        ]
        return pd.DataFrame(rows, columns=list(self.COLUMNS))

    @staticmethod
    def _normalize_with_oca(dag_config_kwargs, df_acteur, oca):
        # Run the normalization with the given `oca` section and return the
        # resulting frame with a fresh index.
        config = DAGConfig.model_validate({**dag_config_kwargs, "oca": oca})
        normalized, _ = source_data_normalize(
            df_acteur_from_source=df_acteur,
            dag_config=config,
            dag_id="dag_id",
        )
        return normalized.reset_index(drop=True)

    def _assert_rows(self, df, expected_rows):
        # Compare the normalized frame with the expected rows, column by column.
        expected_df = pd.DataFrame(expected_rows, columns=list(self.COLUMNS))
        pd.testing.assert_frame_equal(df, expected_df.reset_index(drop=True))

    def test_apply_oca_config(self, dag_config_kwargs, df_acteur):
        # Prefix and split: the first actor is duplicated, one row per code
        df = self._normalize_with_oca(
            dag_config_kwargs,
            df_acteur,
            {"prefix": "ocatest", "deduplication_source": True},
        )

        self._assert_rows(
            df,
            [
                ("ocatest_oca_1_ext1", "ocatest_oca_1", "ext1", "nom1"),
                ("ocatest_oca_2_ext1", "ocatest_oca_2", "ext1", "nom1"),
                ("ocatest_oca_2_ext2", "ocatest_oca_2", "ext2", "nom2"),
            ],
        )

    def test_apply_oca_config_no_deduplication_source(
        self, dag_config_kwargs, df_acteur
    ):
        # Prefix only: the multi-valued source code is kept as a single value
        df = self._normalize_with_oca(
            dag_config_kwargs, df_acteur, {"prefix": "ocatest"}
        )

        self._assert_rows(
            df,
            [
                ("ocatest_oca_1|oca_2_ext1", "ocatest_oca_1|oca_2", "ext1", "nom1"),
                ("ocatest_oca_2_ext2", "ocatest_oca_2", "ext2", "nom2"),
            ],
        )

    def test_apply_oca_config_no_prefix(self, dag_config_kwargs, df_acteur):
        # Split only: codes are kept as they are, without any prefix
        df = self._normalize_with_oca(
            dag_config_kwargs, df_acteur, {"deduplication_source": True}
        )

        self._assert_rows(
            df,
            [
                ("oca_1_ext1", "oca_1", "ext1", "nom1"),
                ("oca_2_ext1", "oca_2", "ext1", "nom1"),
                ("oca_2_ext2", "oca_2", "ext2", "nom2"),
            ],
        )
348+
349+
241350
class TestDfNormalizePharmacie:
242351
"""
243352
Test de la fonction df_normalize_pharmacie

0 commit comments

Comments
 (0)