Skip to content

Commit c1e88ce

Browse files
authored
Ajout de tests de cohérences et de tags (#1611)
1 parent 4587281 commit c1e88ce

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

47 files changed

+547
-36
lines changed

dags/acteurs/dags/compute_acteur.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
from airflow import DAG
1111
from airflow.operators.bash import BashOperator
1212
from shared.config.schedules import SCHEDULES
13+
from shared.config.tags import TAGS
1314

1415
default_args = {
1516
"owner": "airflow",
@@ -31,6 +32,7 @@
3132
" (vue exhaustive des acteurs), par la carte (vue des acteurs affichés) et"
3233
" par l'export des acteurs en open-data."
3334
),
35+
tags=[TAGS.COMPUTE, TAGS.ACTEURS, TAGS.CARTE, TAGS.OPENDATA, TAGS.DBT],
3436
schedule=SCHEDULES.DAILY,
3537
max_active_runs=1,
3638
) as dag:

dags/acteurs/dags/export_opendata.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from airflow import DAG
88
from decouple import config
99
from shared.config.schedules import SCHEDULES
10+
from shared.config.tags import TAGS
1011

1112
ENVIRONMENT = config("ENVIRONMENT", default="development")
1213

@@ -29,6 +30,7 @@
2930
"Ce DAG export les acteurs disponibles en opendata précédemment générés dans la"
3031
" table `exposure_opendata_acteur` de la base de données."
3132
),
33+
tags=[TAGS.COMPUTE, TAGS.EXPORT, TAGS.ACTEURS, TAGS.OPENDATA, TAGS.S3],
3234
params={
3335
"bucket_name": "lvao-opendata",
3436
"remote_dir": "acteurs" if ENVIRONMENT == "prod" else f"acteurs-{ENVIRONMENT}",

dags/clone/dags/clone_ae_etablissement.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from airflow.models.param import Param
99
from clone.tasks.airflow_logic.chain_tasks import chain_tasks
1010
from shared.config import CATCHUPS, SCHEDULES, START_DATES
11+
from shared.config.tags import TAGS
1112

1213
with DAG(
1314
dag_id="clone_ae_etablissement",
@@ -25,7 +26,13 @@
2526
description=(
2627
"Clone la table 'etablissement' de l'Annuaire Entreprises (AE) dans notre DB"
2728
),
28-
tags=["clone", "annuaire", "entreprise", "etablissement", "siret", "ae"],
29+
tags=[
30+
TAGS.ENRICH,
31+
TAGS.CLONE,
32+
TAGS.ANNAIRE_ENTREPRISE,
33+
TAGS.ETABLISSEMENT,
34+
TAGS.SIRET,
35+
],
2936
params={
3037
"dry_run": Param(
3138
False,

dags/clone/dags/clone_ae_unite_legale.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from airflow.models.param import Param
99
from clone.tasks.airflow_logic.chain_tasks import chain_tasks
1010
from shared.config import CATCHUPS, SCHEDULES, START_DATES
11+
from shared.config.tags import TAGS
1112

1213
with DAG(
1314
dag_id="clone_ae_unite_legale",
@@ -25,7 +26,13 @@
2526
description=(
2627
"Clone la table 'unite_legale' de l'Annuaire Entreprises (AE) dans notre DB"
2728
),
28-
tags=["clone", "annuaire", "entreprise", "unite_legale", "siren", "ae"],
29+
tags=[
30+
TAGS.ENRICH,
31+
TAGS.CLONE,
32+
TAGS.ANNAIRE_ENTREPRISE,
33+
TAGS.UNITE_LEGALE,
34+
TAGS.SIREN,
35+
],
2936
params={
3037
"dry_run": Param(
3138
False,

dags/clone/dags/clone_ban_adresses.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from airflow.models.param import Param
99
from clone.tasks.airflow_logic.chain_tasks import chain_tasks
1010
from shared.config import CATCHUPS, SCHEDULES, START_DATES
11+
from shared.config.tags import TAGS
1112

1213
with DAG(
1314
dag_id="clone_ban_adresses",
@@ -25,7 +26,7 @@
2526
description=(
2627
"Clone la table 'adresses' de la Base Adresse Nationale (BAN) dans notre DB"
2728
),
28-
tags=["clone", "ban", "adresses"],
29+
tags=[TAGS.ENRICH, TAGS.CLONE, TAGS.BAN, TAGS.ADRESSES],
2930
params={
3031
"dry_run": Param(
3132
False,

dags/clone/dags/clone_ban_lieux_dits.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from airflow.models.param import Param
99
from clone.tasks.airflow_logic.chain_tasks import chain_tasks
1010
from shared.config import CATCHUPS, SCHEDULES, START_DATES
11+
from shared.config.tags import TAGS
1112

1213
with DAG(
1314
dag_id="clone_ban_lieux_dits",
@@ -25,7 +26,7 @@
2526
description=(
2627
"Clone la table 'lieux_dits' de la Base Adresse Nationale (BAN) dans notre DB"
2728
),
28-
tags=["clone", "ban", "adresses", "lieux_dits"],
29+
tags=[TAGS.ENRICH, TAGS.CLONE, TAGS.BAN, TAGS.LIEUX_DITS],
2930
params={
3031
"dry_run": Param(
3132
False,

dags/cluster/dags/cluster_acteur_suggestions.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77
from cluster.tasks.airflow_logic.chain_tasks import chain_tasks
88
from cluster.ui import params_separators as UI_PARAMS_SEPARATORS
99
from shared.config import CATCHUPS, SCHEDULES, START_DATES
10+
from shared.config.tags import TAGS
11+
1012
from utils.airflow_params import airflow_params_dropdown_from_mapping
1113
from utils.django import django_model_fields_get, django_setup_full
1214

@@ -290,7 +292,7 @@
290292
catchup=CATCHUPS.AWLAYS_FALSE,
291293
schedule=SCHEDULES.NONE,
292294
description=("Un DAG pour générer des suggestions de clustering pour les acteurs"),
293-
tags=["cluster", "acteurs", "suggestions"],
295+
tags=[TAGS.CLUSTER, TAGS.ACTEURS, TAGS.SUGGESTIONS, TAGS.CLUSTERING],
294296
params=PARAMS,
295297
) as dag:
296298
chain_tasks(dag)

dags/compute_acteurs/dags/create_final_actors.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
deduplicate_labels_task,
1515
deduplicate_propositionservices_task,
1616
)
17+
from shared.config.tags import TAGS
1718
from shared.tasks.database_logic.db_tasks import read_data_from_postgres
1819

1920
default_args = {
@@ -40,6 +41,7 @@
4041
" plusieurs sources en cumulant leur services, sources et propositions"
4142
" services."
4243
),
44+
tags=[TAGS.COMPUTE, TAGS.ACTEURS, TAGS.CARTE, TAGS.DEPRECATED],
4345
max_active_runs=1,
4446
schedule=None,
4547
)

dags/crawl/dags/crawl_urls.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
from crawl.tasks.airflow_logic.crawl_urls_suggest_syntax_fail_task import (
2626
crawl_urls_suggest_syntax_fail_task,
2727
)
28+
from shared.config.tags import TAGS
2829

2930
UI_PARAMS_SEPARATOR_SELECTION = r"""
3031
@@ -50,7 +51,7 @@
5051
catchup=False,
5152
schedule_interval=None,
5253
description=("Un DAG pour parcourir des URLs et suggérer des corrections"),
53-
tags=["crawl", "acteurs", "url", "suggestions"],
54+
tags=[TAGS.ENRICH, TAGS.CRAWL, TAGS.ACTEURS, TAGS.URL, TAGS.SUGGESTIONS],
5455
params={
5556
"dry_run": Param(
5657
True,

dags/enrich/dags/enrich_acteurs_closed.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
enrich_dbt_models_refresh_task,
1919
)
2020
from shared.config import CATCHUPS, SCHEDULES, START_DATES, config_to_airflow_params
21+
from shared.config.tags import TAGS
2122

2223
with DAG(
2324
dag_id="enrich_acteurs_closed",
@@ -33,7 +34,14 @@
3334
"Un DAG pour détécter et remplacer les acteurs fermés"
3435
"dans l'Annuaire Entreprises (AE)"
3536
),
36-
tags=["annuaire", "entreprises", "ae", "siren", "siret", "acteurs", "fermés"],
37+
tags=[
38+
TAGS.ENRICH,
39+
TAGS.ANNAIRE_ENTREPRISE,
40+
TAGS.SIREN,
41+
TAGS.SIRET,
42+
TAGS.ACTEURS,
43+
TAGS.CLOSED,
44+
],
3745
schedule=SCHEDULES.NONE,
3846
catchup=CATCHUPS.AWLAYS_FALSE,
3947
start_date=START_DATES.YESTERDAY,

0 commit comments

Comments
 (0)