Skip to content

Commit ae21632

Browse files
authored
🔎 DAGs d'enrichissement: plus de filtres (#1595)
1 parent 0ded8cf commit ae21632

21 files changed

+162
-83
lines changed

dags/enrich/config/__init__.py

Lines changed: 0 additions & 12 deletions
This file was deleted.

dags/enrich/config/models.py

Lines changed: 38 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,12 @@
33
import re
44
from typing import Optional
55

6-
from pydantic import BaseModel, Field, computed_field
6+
from pydantic import BaseModel, Field, computed_field, field_validator
7+
from utils.airflow_params import airflow_params_dropdown_codes_to_ids
78

89
SEPARATOR_FILTER_FIELD = "__"
10+
MAPPING_ACTEUR_TYPE = airflow_params_dropdown_codes_to_ids("ActeurType")
11+
MAPPING_SOURCE = airflow_params_dropdown_codes_to_ids("Source")
912

1013

1114
def filters_get(model: BaseModel, prefix: str, operator: str) -> list[dict[str, str]]:
@@ -25,6 +28,8 @@ def filters_get(model: BaseModel, prefix: str, operator: str) -> list[dict[str,
2528
# Skipping None if it's not exclitely is_null operator
2629
if value is None and operator != "is_null":
2730
continue
31+
if operator == "in" and value is None or len(value) == 0:
32+
continue
2833

2934
filters.append(
3035
{
@@ -62,17 +67,43 @@ class EnrichBaseConfig(BaseModel):
6267
default=None,
6368
description="🔍 Filtre sur **acteur_statut**",
6469
)
70+
filter_in__acteur_type_id: Optional[list[int]] = Field(
71+
default=[],
72+
description="🔍 Filtre sur **acteur_type**",
73+
examples=list(MAPPING_ACTEUR_TYPE.keys()), # type: ignore
74+
json_schema_extra={
75+
"values_display": MAPPING_ACTEUR_TYPE, # type: ignore
76+
},
77+
)
6578

66-
def filters_contains(self) -> list[dict[str, str]]:
67-
return filters_get(self, "filter_contains", "contains")
68-
69-
def filters_equals(self) -> list[dict[str, str]]:
70-
return filters_get(self, "filter_equals", "equals")
79+
filter_in__acteur_source_id: Optional[list[int]] = Field(
80+
default=[],
81+
description="🔍 Filtre sur **source**",
82+
examples=list(MAPPING_SOURCE.keys()), # type: ignore
83+
json_schema_extra={
84+
"values_display": MAPPING_SOURCE, # type: ignore
85+
},
86+
)
7187

7288
@computed_field
7389
@property
7490
def filters(self) -> list[dict[str, str]]:
75-
return self.filters_contains() + self.filters_equals()
91+
return (
92+
filters_get(self, "filter_contains", "contains")
93+
+ filters_get(self, "filter_equals", "equals")
94+
+ filters_get(self, "filter_in", "in")
95+
)
96+
97+
@field_validator("filter_in__acteur_type_id", mode="before")
98+
def validate_filter_in_acteur_type_id(cls, v) -> list[int]:
99+
# Due to Airflow Params dropdowns via dict values_display,
100+
# values coming back from Airflow will be strings to convert back to int
101+
return [int(x) for x in v] if v else []
102+
103+
@field_validator("filter_in__acteur_source_id", mode="before")
104+
def validate_filter_in_acteur_source_id(cls, v) -> list[int]:
105+
# Same as above
106+
return [int(x) for x in v] if v else []
76107

77108

78109
class EnrichActeursClosedConfig(EnrichBaseConfig):

dags/enrich/dags/enrich_acteurs_closed.py

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,10 @@
44
"""
55

66
from airflow import DAG
7-
from enrich.config import (
8-
COHORTS,
9-
DBT,
10-
TASKS,
11-
EnrichActeursClosedConfig,
12-
)
7+
from enrich.config.cohorts import COHORTS
8+
from enrich.config.dbt import DBT
9+
from enrich.config.models import EnrichActeursClosedConfig
10+
from enrich.config.tasks import TASKS
1311
from enrich.tasks.airflow_logic.enrich_config_create_task import (
1412
enrich_config_create_task,
1513
)

dags/enrich/dags/enrich_acteurs_rgpd.py

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,10 @@
11
"""DAG to anonymize QFDMO acteurs for RGPD"""
22

33
from airflow import DAG
4-
from enrich.config import (
5-
COHORTS,
6-
DBT,
7-
TASKS,
8-
EnrichActeursRGPDConfig,
9-
)
4+
from enrich.config.cohorts import COHORTS
5+
from enrich.config.dbt import DBT
6+
from enrich.config.models import EnrichActeursRGPDConfig
7+
from enrich.config.tasks import TASKS
108
from enrich.tasks.airflow_logic.enrich_config_create_task import (
119
enrich_config_create_task,
1210
)

dags/enrich/dags/enrich_acteurs_villes.py

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,10 @@
11
"""DAG to suggestion city corrections based on BAN data"""
22

33
from airflow import DAG
4-
from enrich.config import (
5-
COHORTS,
6-
DBT,
7-
TASKS,
8-
EnrichActeursVillesConfig,
9-
)
4+
from enrich.config.cohorts import COHORTS
5+
from enrich.config.dbt import DBT
6+
from enrich.config.models import EnrichActeursVillesConfig
7+
from enrich.config.tasks import TASKS
108
from enrich.tasks.airflow_logic.enrich_config_create_task import (
119
enrich_config_create_task,
1210
)

dags/enrich/dags/enrich_dbt_models_refresh.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from airflow.models.baseoperator import chain
77
from airflow.operators.bash import BashOperator
88
from airflow.utils.trigger_rule import TriggerRule
9-
from enrich.config import EnrichDbtModelsRefreshConfig
9+
from enrich.config.models import EnrichDbtModelsRefreshConfig
1010
from shared.config import CATCHUPS, SCHEDULES, START_DATES, config_to_airflow_params
1111

1212
with DAG(

dags/enrich/tasks/airflow_logic/enrich_config_create_task.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@
44

55
from airflow import DAG
66
from airflow.operators.python import PythonOperator
7-
from enrich.config import DAG_ID_TO_CONFIG_MODEL, TASKS, XCOMS
7+
from enrich.config.models import DAG_ID_TO_CONFIG_MODEL
8+
from enrich.config.tasks import TASKS
9+
from enrich.config.xcoms import XCOMS
810

911
logger = logging.getLogger(__name__)
1012

dags/enrich/tasks/airflow_logic/enrich_dbt_model_suggest_task.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
from airflow.models.taskinstance import TaskInstance
88
from airflow.operators.python import PythonOperator
99
from airflow.utils.trigger_rule import TriggerRule
10-
from enrich.config import XCOMS, xcom_pull
10+
from enrich.config.xcoms import XCOMS, xcom_pull
1111
from enrich.tasks.business_logic.enrich_dbt_model_suggest import (
1212
enrich_dbt_model_suggest,
1313
)

dags/enrich/tasks/airflow_logic/enrich_dbt_models_refresh_task.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,9 @@
66
from airflow.exceptions import AirflowSkipException
77
from airflow.operators.bash import BashOperator
88
from airflow.operators.python import PythonOperator
9-
from enrich.config import TASKS, XCOMS, xcom_pull
109
from enrich.config.models import EnrichBaseConfig
10+
from enrich.config.tasks import TASKS
11+
from enrich.config.xcoms import XCOMS, xcom_pull
1112

1213
logger = logging.getLogger(__name__)
1314

dags/enrich/tasks/business_logic/enrich_dbt_model_row_to_suggest_data.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from enrich.config import SUGGEST_PREFIX
1+
from enrich.config.columns import SUGGEST_PREFIX
22

33

44
def dbt_model_row_to_suggest_data(row: dict) -> dict:

0 commit comments

Comments
 (0)