Skip to content

Commit cc6fb79

Browse files
authored
🔄 [ENRICH] - DAG de rafraichissment DBT (#1560)
DAG pour rafraichir dbt enrich
1 parent 6c21c54 commit cc6fb79

File tree

5 files changed

+75
-1
lines changed

5 files changed

+75
-1
lines changed

dags/enrich/config/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
DAG_ID_TO_CONFIG_MODEL,
66
EnrichActeursClosedConfig,
77
EnrichActeursRGPDConfig,
8+
EnrichDbtModelsRefreshConfig,
89
)
910
from .tasks import TASKS # noqa: F401
1011
from .xcoms import XCOMS, xcom_pull # noqa: F401

dags/enrich/config/models.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,15 @@ class EnrichActeursRGPDConfig(EnrichBaseConfig):
9393
)
9494

9595

96+
class EnrichDbtModelsRefreshConfig(BaseModel):
97+
dbt_models_refresh_commands: list[str] = Field(
98+
default=[],
99+
description="🔄 Liste de commandes DBT à exécuter pour rafraîchir les modèles",
100+
)
101+
102+
96103
DAG_ID_TO_CONFIG_MODEL = {
97104
"enrich_acteurs_closed": EnrichActeursClosedConfig,
98105
"enrich_acteurs_rgpd": EnrichActeursRGPDConfig,
106+
"enrich_dbt_models_refresh": EnrichDbtModelsRefreshConfig,
99107
}
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
"""DAG to refresh DBT models needed for enrich DAGs"""
2+
3+
import re
4+
5+
from airflow import DAG
6+
from airflow.models.baseoperator import chain
7+
from airflow.operators.bash import BashOperator
8+
from airflow.utils.trigger_rule import TriggerRule
9+
from enrich.config import EnrichDbtModelsRefreshConfig
10+
from shared.config import CATCHUPS, SCHEDULES, START_DATES, config_to_airflow_params
11+
12+
with DAG(
13+
dag_id="enrich_dbt_models_refresh",
14+
dag_display_name="🔄 Enrichir - Rafraîchir les modèles DBT",
15+
default_args={
16+
"owner": "airflow",
17+
"depends_on_past": False,
18+
"email_on_failure": False,
19+
"email_on_retry": False,
20+
"retries": 0,
21+
},
22+
description=(
23+
"Un DAG pour rafraîchir les modèles DBT nécessaires"
24+
"à l'enrichissement des acteurs"
25+
),
26+
tags=["dbt", "models", "refresh", "enrich"],
27+
schedule=SCHEDULES.DAILY,
28+
catchup=CATCHUPS.AWLAYS_FALSE,
29+
start_date=START_DATES.YESTERDAY,
30+
params=config_to_airflow_params(
31+
EnrichDbtModelsRefreshConfig(
32+
dbt_models_refresh_commands=[
33+
"dbt build --select +int_ae_unite_legale",
34+
"dbt build --select +int_ae_etablissement",
35+
"dbt build --select +int_ban_adresses",
36+
"dbt build --select int_ban_villes",
37+
],
38+
)
39+
),
40+
) as dag:
41+
tasks = []
42+
for cmd in dag.params.get("dbt_models_refresh_commands", []):
43+
cmd = cmd.strip()
44+
if not cmd:
45+
continue
46+
cmd_id = re.sub(r"__+", "_", re.sub(r"[^a-zA-Z0-9]+", "_", cmd))
47+
cmd += " --debug --threads 1"
48+
tasks.append(
49+
BashOperator(
50+
task_id=f"enrich_{cmd_id}",
51+
bash_command=cmd,
52+
trigger_rule=TriggerRule.ALL_DONE,
53+
)
54+
)
55+
chain(*tasks)

dags/shared/config/models.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
bool: "boolean",
88
str: "string",
99
typing.Optional[str]: ["null", "string"],
10+
list[str]: "array",
1011
}
1112

1213

@@ -27,7 +28,7 @@ def config_to_airflow_params(model_instance: BaseModel) -> dict[str, Param]:
2728
model_cls = model_instance.__class__
2829
for field_name, field_info in model_cls.model_fields.items():
2930
field_value = getattr(model_instance, field_name) # Get value from instance
30-
31+
assert field_info.annotation is not None
3132
params[field_name] = Param(
3233
field_value,
3334
type=PYDANTIC_TYPE_TO_AIRFLOW_TYPE[field_info.annotation],

dags_unit_tests/shared/config/test_shared_config_models.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,10 @@ class MyModel(BaseModel):
2323
default=None,
2424
description="OPT STRING CHANGED",
2525
)
26+
some_list: list[str] = Field(
27+
default=["foo", "bar"],
28+
description="SOME LIST",
29+
)
2630

2731

2832
class TestConfigModelToAirflowParams:
@@ -45,6 +49,11 @@ def test_string(self, params):
4549
assert param.value == "foo"
4650
assert param.schema["type"] == "string"
4751

52+
def test_list(self, params):
53+
param = params["some_list"]
54+
assert param.value == ["foo", "bar"]
55+
assert param.schema["type"] == "array"
56+
4857
def test_opt_string_untouched(self, params):
4958
param = params["opt_string_untouched"]
5059
assert param.value is None

0 commit comments

Comments
 (0)