Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dags/enrich/config/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
DAG_ID_TO_CONFIG_MODEL,
EnrichActeursClosedConfig,
EnrichActeursRGPDConfig,
EnrichDbtModelsRefreshConfig,
)
from .tasks import TASKS # noqa: F401
from .xcoms import XCOMS, xcom_pull # noqa: F401
8 changes: 8 additions & 0 deletions dags/enrich/config/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,15 @@ class EnrichActeursRGPDConfig(EnrichBaseConfig):
)


class EnrichDbtModelsRefreshConfig(BaseModel):
dbt_models_refresh_commands: list[str] = Field(
default=[],
description="🔄 Liste de commandes DBT à exécuter pour rafraîchir les modèles",
)


DAG_ID_TO_CONFIG_MODEL = {
"enrich_acteurs_closed": EnrichActeursClosedConfig,
"enrich_acteurs_rgpd": EnrichActeursRGPDConfig,
"enrich_dbt_models_refresh": EnrichDbtModelsRefreshConfig,
}
55 changes: 55 additions & 0 deletions dags/enrich/dags/enrich_dbt_models_refresh.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
"""DAG to refresh DBT models needed for enrich DAGs"""

import re

from airflow import DAG
from airflow.models.baseoperator import chain
from airflow.operators.bash import BashOperator
from airflow.utils.trigger_rule import TriggerRule
from enrich.config import EnrichDbtModelsRefreshConfig
from shared.config import CATCHUPS, SCHEDULES, START_DATES, config_to_airflow_params

with DAG(
dag_id="enrich_dbt_models_refresh",
dag_display_name="🔄 Enrichir - Rafraîchir les modèles DBT",
default_args={
"owner": "airflow",
"depends_on_past": False,
"email_on_failure": False,
"email_on_retry": False,
"retries": 0,
},
description=(
"Un DAG pour rafraîchir les modèles DBT nécessaires"
"à l'enrichissement des acteurs"
),
tags=["dbt", "models", "refresh", "enrich"],
schedule=SCHEDULES.DAILY,
catchup=CATCHUPS.AWLAYS_FALSE,
start_date=START_DATES.YESTERDAY,
params=config_to_airflow_params(
EnrichDbtModelsRefreshConfig(
dbt_models_refresh_commands=[
"dbt build --select +int_ae_unite_legale",
"dbt build --select +int_ae_etablissement",
"dbt build --select +int_ban_adresses",
"dbt build --select int_ban_villes",
],
)
),
) as dag:
tasks = []
for cmd in dag.params.get("dbt_models_refresh_commands", []):
cmd = cmd.strip()
if not cmd:
continue
cmd_id = re.sub(r"__+", "_", re.sub(r"[^a-zA-Z0-9]+", "_", cmd))
cmd += " --debug --threads 1"
tasks.append(
BashOperator(
task_id=f"enrich_{cmd_id}",
bash_command=cmd,
trigger_rule=TriggerRule.ALL_DONE,
)
)
chain(*tasks)
3 changes: 2 additions & 1 deletion dags/shared/config/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
bool: "boolean",
str: "string",
typing.Optional[str]: ["null", "string"],
list[str]: "array",
}


Expand All @@ -27,7 +28,7 @@ def config_to_airflow_params(model_instance: BaseModel) -> dict[str, Param]:
model_cls = model_instance.__class__
for field_name, field_info in model_cls.model_fields.items():
field_value = getattr(model_instance, field_name) # Get value from instance

assert field_info.annotation is not None
params[field_name] = Param(
field_value,
type=PYDANTIC_TYPE_TO_AIRFLOW_TYPE[field_info.annotation],
Expand Down
9 changes: 9 additions & 0 deletions dags_unit_tests/shared/config/test_shared_config_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ class MyModel(BaseModel):
default=None,
description="OPT STRING CHANGED",
)
some_list: list[str] = Field(
default=["foo", "bar"],
description="SOME LIST",
)


class TestConfigModelToAirflowParams:
Expand All @@ -45,6 +49,11 @@ def test_string(self, params):
assert param.value == "foo"
assert param.schema["type"] == "string"

def test_list(self, params):
param = params["some_list"]
assert param.value == ["foo", "bar"]
assert param.schema["type"] == "array"

def test_opt_string_untouched(self, params):
param = params["opt_string_untouched"]
assert param.value is None
Expand Down
Loading