8
8
from airflow import DAG
9
9
from airflow .exceptions import AirflowFailException
10
10
from airflow .operators .python import PythonOperator
11
+
11
12
from utils import logging_utils as log
12
13
13
14
logger = logging .getLogger (__name__ )
@@ -30,30 +31,41 @@ def task_info_get(model_name, table_name):
30
31
31
32
32
33
def check_model_table_consistency_wrapper (
33
- ti , params , * , model_name : str , table_name : str
34
+ ti , params , * , django_app : str , model_name : str , db_schema : str , table_name : str
34
35
) -> None :
35
36
# model_name = "DisplayedActeur"
36
37
# table_name = "exposure_carte_acteur"
37
38
logger .info (task_info_get (model_name , table_name ))
38
39
39
40
log .preview ("Modèle Django" , model_name )
40
41
log .preview ("Table" , table_name )
41
- if not check_model_table_consistency (model_name = model_name , table_name = table_name ):
42
+ if not check_model_table_consistency (
43
+ django_app = django_app ,
44
+ model_name = model_name ,
45
+ db_schema = db_schema ,
46
+ table_name = table_name ,
47
+ ):
42
48
raise AirflowFailException (
43
49
f"le modèle { model_name } ne correspond pas à la table { table_name } "
44
50
)
45
51
46
52
47
53
def check_model_table_consistency_task (
48
- dag : DAG , model_name : str , table_name : str
54
+ dag : DAG ,
55
+ django_app : str ,
56
+ model_name : str ,
57
+ db_schema : str ,
58
+ table_name : str ,
49
59
) -> PythonOperator :
50
60
task_name = f"check_{ model_name } _vs_{ table_name } _consistency"
51
61
return PythonOperator (
52
62
task_id = task_name ,
53
63
python_callable = check_model_table_consistency_wrapper ,
54
64
dag = dag ,
55
65
op_kwargs = {
66
+ "django_app" : django_app ,
56
67
"model_name" : model_name ,
68
+ "db_schema" : db_schema ,
57
69
"table_name" : table_name ,
58
70
},
59
71
)
0 commit comments