Skip to content

Commit 85b93bb

Browse files
committed
DAG de rafraichissement
1 parent 2063613 commit 85b93bb

File tree

1 file changed

+58
-0
lines changed

1 file changed

+58
-0
lines changed
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
"""
2+
DAG to anonymize QFDMO acteur which names
3+
contains people from Annuaire Entreprise (AE)
4+
"""
5+
6+
import re
7+
8+
from airflow import DAG
9+
from airflow.models.baseoperator import chain
10+
from airflow.operators.bash import BashOperator
11+
from enrich.config import (
12+
EnrichActeursClosedConfig,
13+
)
14+
from shared.config import CATCHUPS, SCHEDULES, START_DATES, config_to_airflow_params
15+
16+
with DAG(
17+
dag_id="enrich_dbt_models_refresh",
18+
dag_display_name="🔄 Enrichir - Rafraîchir les modèles DBT",
19+
default_args={
20+
"owner": "airflow",
21+
"depends_on_past": False,
22+
"email_on_failure": False,
23+
"email_on_retry": False,
24+
"retries": 0,
25+
},
26+
description=(
27+
"Un DAG pour rafraîchir les modèles DBT nécessaires"
28+
"à l'enrichissement des acteurs"
29+
),
30+
tags=["dbt", "annuaire", "entreprises", "ae", "ban", "marts"],
31+
schedule=SCHEDULES.DAILY,
32+
catchup=CATCHUPS.AWLAYS_FALSE,
33+
start_date=START_DATES.YESTERDAY,
34+
params=config_to_airflow_params(
35+
EnrichActeursClosedConfig(
36+
dbt_models_refresh=True,
37+
dbt_models_refresh_command="""
38+
dbt build --select model1
39+
dbt build --select model2
40+
""",
41+
)
42+
),
43+
) as dag:
44+
commands = [
45+
x.strip()
46+
for x in dag.params.get("dbt_models_refresh_command").split("\n") # type: ignore
47+
if x.strip()
48+
]
49+
ops = []
50+
for command in commands:
51+
cmd_id = re.sub(r"__+", "_", re.sub(r"[^a-zA-Z0-9]+", "_", command))
52+
ops.append(
53+
BashOperator(
54+
task_id=f"enrich_{cmd_id}",
55+
bash_command=command,
56+
)
57+
)
58+
chain(*ops)

0 commit comments

Comments
 (0)