Skip to content
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,14 @@ run-airflow:

.PHONY: run-django
run-django:
rm -rf .parcel-cache
honcho start -f Procfile.dev
docker compose --profile airflow down
docker compose --profile lvao down
honcho start -f Procfile.django.dev

run-all:
docker compose --profile airflow up -d
rm -rf .parcel-cache
honcho start -f Procfile.dev
docker compose --profile airflow down
docker compose --profile lvao down
honcho start -f Procfile.all.dev

# Local django operations
.PHONY: migrate
Expand Down
3 changes: 3 additions & 0 deletions Procfile.all.dev
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
services: docker compose --profile airflow up
web: poetry run python manage.py runserver 0.0.0.0:8000
npm: rm -rf .parcel-cache; npm run watch
File renamed without changes.
52 changes: 16 additions & 36 deletions dags/acteurs/dags/compute_acteur.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,90 +36,70 @@
) as dag:
dbt_run_base_acteurs = BashOperator(
task_id="dbt_run_base_acteurs",
bash_command=("cd /opt/airflow/dbt/ && dbt run --models base.acteurs"),
bash_command=("dbt run --models base.acteurs"),
)
dbt_test_base_acteurs = BashOperator(
task_id="dbt_test_base_acteurs",
bash_command=("cd /opt/airflow/dbt/ && dbt test --models base.acteurs"),
bash_command=("dbt test --models base.acteurs"),
)
dbt_run_intermediate_acteurs = BashOperator(
task_id="dbt_run_intermediate_acteurs",
bash_command=("cd /opt/airflow/dbt/ && dbt run --models intermediate.acteurs"),
bash_command=("dbt run --models intermediate.acteurs"),
)
dbt_test_intermediate_acteurs = BashOperator(
task_id="dbt_test_intermediate_acteurs",
bash_command=("cd /opt/airflow/dbt/ && dbt test --models intermediate.acteurs"),
bash_command=("dbt test --models intermediate.acteurs"),
)

dbt_run_marts_acteurs_exhaustive = BashOperator(
task_id="dbt_run_marts_acteurs_exhaustive",
bash_command=(
"cd /opt/airflow/dbt/ && dbt run --models marts.acteurs.exhaustive"
),
bash_command=("dbt run --models marts.acteurs.exhaustive"),
)
dbt_test_marts_acteurs_exhaustive = BashOperator(
task_id="dbt_test_marts_acteurs_exhaustive",
bash_command=(
"cd /opt/airflow/dbt/ && dbt test --models marts.acteurs.exhaustive"
),
bash_command=("dbt test --models marts.acteurs.exhaustive"),
)
dbt_run_exposure_acteurs_exhaustive = BashOperator(
task_id="dbt_run_exposure_acteurs_exhaustive",
bash_command=(
"cd /opt/airflow/dbt/ && dbt run --models exposure.acteurs.exhaustive"
),
bash_command=("dbt run --models exposure.acteurs.exhaustive"),
)
dbt_test_exposure_acteurs_exhaustive = BashOperator(
task_id="dbt_test_exposure_acteurs_exhaustive",
bash_command=(
"cd /opt/airflow/dbt/ && dbt test --models exposure.acteurs.exhaustive"
),
bash_command=("dbt test --models exposure.acteurs.exhaustive"),
)

dbt_run_marts_acteurs_carte = BashOperator(
task_id="dbt_run_marts_acteurs_carte",
bash_command=("cd /opt/airflow/dbt/ && dbt run --models marts.acteurs.carte"),
bash_command=("dbt run --models marts.acteurs.carte"),
)
dbt_test_marts_acteurs_carte = BashOperator(
task_id="dbt_test_marts_acteurs_carte",
bash_command=("cd /opt/airflow/dbt/ && dbt test --models marts.acteurs.carte"),
bash_command=("dbt test --models marts.acteurs.carte"),
)
dbt_run_exposure_acteurs_carte = BashOperator(
task_id="dbt_run_exposure_acteurs_carte",
bash_command=(
"cd /opt/airflow/dbt/ && dbt run --models exposure.acteurs.carte"
),
bash_command=("dbt run --models exposure.acteurs.carte"),
)
dbt_test_exposure_acteurs_carte = BashOperator(
task_id="dbt_test_exposure_acteurs_carte",
bash_command=(
"cd /opt/airflow/dbt/ && dbt test --models exposure.acteurs.carte"
),
bash_command=("dbt test --models exposure.acteurs.carte"),
)

dbt_run_marts_acteurs_opendata = BashOperator(
task_id="dbt_run_marts_acteurs_opendata",
bash_command=(
"cd /opt/airflow/dbt/ && dbt run --models marts.acteurs.opendata"
),
bash_command=("dbt run --models marts.acteurs.opendata"),
)
dbt_test_marts_acteurs_opendata = BashOperator(
task_id="dbt_test_marts_acteurs_opendata",
bash_command=(
"cd /opt/airflow/dbt/ && dbt test --models marts.acteurs.opendata"
),
bash_command=("dbt test --models marts.acteurs.opendata"),
)
dbt_run_exposure_acteurs_opendata = BashOperator(
task_id="dbt_run_exposure_acteurs_opendata",
bash_command=(
"cd /opt/airflow/dbt/ && dbt run --models exposure.acteurs.opendata"
),
bash_command=("dbt run --models exposure.acteurs.opendata"),
)
dbt_test_exposure_acteurs_opendata = BashOperator(
task_id="dbt_test_exposure_acteurs_opendata",
bash_command=(
"cd /opt/airflow/dbt/ && dbt test --models exposure.acteurs.opendata"
),
bash_command=("dbt test --models exposure.acteurs.opendata"),
)

check_model_table_displayedacteur_task = check_model_table_consistency_task(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ Schema generated by scripts/db_schema_create_from_csv.py

CREATE TABLE {{table_name}} (
"id" VARCHAR(12), -- 🟡 on reste scrict (min/max=10/12)
"nom_lieu_dit" VARCHAR(125), -- 98 -> 125
"nom_lieu_dit" VARCHAR(255), -- 98 -> 255
"code_postal" CHAR(5), -- 🟡 on reste scrict (min/max=5)
"code_insee" CHAR(5), -- 🟡 on reste scrict (min/max=5)
"nom_commune" VARCHAR(60), -- 45 -> 60
Expand Down
2 changes: 1 addition & 1 deletion dags/clone/tasks/business_logic/clone_table_validate.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from clone.config import DIR_SQL_VALIDATION
from django.db import connection

from utils import logging_utils as log

logger = logging.getLogger(__name__)
Expand All @@ -23,7 +24,6 @@ def clone_table_validate(table_kind: str, table_name: str, dry_run: bool) -> Non
logger.info("Mode dry-run, on ne valide pas")
continue
with connection.cursor() as cursor:

# Running validation and getting results
cursor.execute(sql)
row = cursor.fetchone()
Expand Down
5 changes: 3 additions & 2 deletions dags/crawl/tasks/business_logic/crawl_urls_suggest.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import pandas as pd
from crawl.config.columns import COLS
from crawl.config.constants import LABEL_URL_ORIGINE, LABEL_URL_PROPOSEE

from utils import logging_utils as log
from utils.dataframes import (
df_col_assert_get_unique,
Expand Down Expand Up @@ -39,7 +40,7 @@ def suggestions_prepare(
- df_crawl_ok_diff = successful AND different = propose
- df_crawl_fail = failed = propose None"""
from data.models.change import SuggestionChange
from data.models.changes import ChangeActeurUpdateData
from data.models.changes import ChangeActeurUpdateRevision

if df_none_or_empty(df):
return []
Expand All @@ -54,7 +55,7 @@ def suggestions_prepare(
order=1,
reason=row[COLS.COHORT],
entity_type="acteur_displayed",
model_name=ChangeActeurUpdateData.name(),
model_name=ChangeActeurUpdateRevision.name(),
model_params={
"id": acteur[COLS.ID],
"data": {COLS.URL_DB: row[COLS.SUGGEST_VALUE]},
Expand Down
2 changes: 2 additions & 0 deletions dags/enrich/config/columns.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ class COLS:

# Acteurs
ACTEUR_ID: str = "acteur_id"
ACTEUR_ID_EXTERNE: str = "acteur_id_externe"
ACTEUR_PARENT_ID: str = "acteur_parent_id"
ACTEUR_TYPE_ID: str = "acteur_type_id"
ACTEUR_SOURCE_ID: str = "acteur_source_id"
ACTEUR_SIRET: str = "acteur_siret"
Expand Down
5 changes: 4 additions & 1 deletion dags/enrich/dags/enrich_dbt_models_refresh.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""DAG to refresh DBT models needed for enrich DAGs"""

import logging
import re

from airflow import DAG
Expand All @@ -9,6 +10,8 @@
from enrich.config.models import EnrichDbtModelsRefreshConfig
from shared.config import CATCHUPS, SCHEDULES, START_DATES, config_to_airflow_params

logger = logging.getLogger(__name__)

with DAG(
dag_id="enrich_dbt_models_refresh",
dag_display_name="🔄 Enrichir - Rafraîchir les modèles DBT",
Expand Down Expand Up @@ -42,7 +45,7 @@
if not cmd:
continue
cmd_id = re.sub(r"__+", "_", re.sub(r"[^a-zA-Z0-9]+", "_", cmd))
cmd += " --debug --threads 1"
logger.info(f"Create bash operator with command: {cmd}")
tasks.append(
BashOperator(
task_id=f"enrich_{cmd_id}",
Expand Down
113 changes: 33 additions & 80 deletions dags/enrich/tasks/business_logic/enrich_dbt_model_to_suggestions.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,11 @@
from datetime import datetime, timezone

import pandas as pd
from cluster.tasks.business_logic.cluster_acteurs_parents_choose_new import (
parent_id_generate,
)
from enrich.config.cohorts import COHORTS
from enrich.config.columns import COLS
from enrich.tasks.business_logic.enrich_dbt_model_row_to_suggest_data import (
dbt_model_row_to_suggest_data,
)
from utils import logging_utils as log

from data.models.changes.acteur_rgpd_anonymize import rgpd_data_get
from utils import logging_utils as log

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -40,7 +34,7 @@ def changes_prepare(

def changes_prepare_villes(row: dict) -> tuple[list[dict], dict]:
"""Prepare suggestions for villes cohorts"""
from data.models.changes import ChangeActeurUpdateData
from data.models.changes import ChangeActeurUpdateRevision

changes = []
model_params = {
Expand All @@ -51,7 +45,7 @@ def changes_prepare_villes(row: dict) -> tuple[list[dict], dict]:
}
changes.append(
changes_prepare(
model=ChangeActeurUpdateData,
model=ChangeActeurUpdateRevision,
model_params=model_params,
order=1,
reason="On fait confiance à la BAN",
Expand Down Expand Up @@ -98,24 +92,26 @@ def changes_prepare_closed_not_replaced(
row: dict,
) -> tuple[list[dict], dict]:
"""Prepare suggestions for closed not replaced cohorts"""
from data.models.changes import ChangeActeurUpdateData
from data.models.changes import ChangeActeurUpdateRevision
from qfdmo.models import ActeurStatus

changes = []
today = datetime.now(timezone.utc).strftime("%Y-%m-%d")
model_params = {
"id": row[COLS.ACTEUR_ID],
"data": {
"statut": ActeurStatus.INACTIF,
"siret": row[COLS.ACTEUR_SIRET],
"siren": row[COLS.ACTEUR_SIRET][:9],
"siret_is_closed": True,
"acteur_type": row[COLS.ACTEUR_TYPE_ID],
"source": row[COLS.ACTEUR_SOURCE_ID],
"parent_reason": (
f"Modifications de l'acteur le {today}: "
f"SIRET {row[COLS.ACTEUR_SIRET]} détecté comme fermé dans AE,"
" Pas de remplacement"
),
},
}
changes.append(
changes_prepare(
model=ChangeActeurUpdateData,
model=ChangeActeurUpdateRevision,
model_params=model_params,
order=1,
reason="SIRET & SIREN fermés, 0 remplacement trouvé",
Expand All @@ -130,79 +126,34 @@ def changes_prepare_closed_replaced(
row: dict,
) -> tuple[list[dict], dict]:
"""Prepare suggestion changes for closed replaced cohorts"""
from data.models.changes import (
ChangeActeurCreateAsChild,
ChangeActeurCreateAsParent,
ChangeActeurUpdateData,
)
from qfdmo.models import ActeurStatus
from data.models.changes import ChangeActeurUpdateRevision

changes = []
today = datetime.now(timezone.utc).strftime("%Y-%m-%d")
# Parent
parent_id = parent_id_generate([str(row[COLS.SUGGEST_SIRET])])
parent_data = dbt_model_row_to_suggest_data(row)
parent_data["source"] = None
parent_data["statut"] = ActeurStatus.ACTIF
params_parent = {
"id": parent_id,
"data": parent_data,

update_revision = {
"id": row[COLS.ACTEUR_ID],
"data": {
"siret": row[COLS.SUGGEST_SIRET],
"siren": row[COLS.SUGGEST_SIRET][:9],
"siret_is_closed": False,
"parent_reason": (
f"Modifications de l'acteur le {today}: "
f"SIRET {row[COLS.ACTEUR_SIRET]} détecté comme fermé dans AE, "
f"remplacé par le SIRET {row[COLS.SUGGEST_SIRET]}"
),
},
}
changes.append(
changes = [
changes_prepare(
model=ChangeActeurCreateAsParent,
model_params=params_parent,
model=ChangeActeurUpdateRevision,
model_params=update_revision,
order=1,
reason="besoin d'un parent pour rattaché acteur fermé",
entity_type="acteur_displayed",
)
)

# New child to hold the reference data as standalone
# as parents are surrogates (e.g. they can be deleted
# during clustering)
now = datetime.now(timezone.utc).strftime("%Y%m%d%H%M%S")
child_new_id = f"{row[COLS.ACTEUR_ID]}_{row[COLS.ACTEUR_SIRET]}_{now}"
params_child_new = params_parent.copy()
params_child_new["id"] = child_new_id
params_child_new["data"]["source"] = row[COLS.ACTEUR_SOURCE_ID]
params_child_new["data"]["parent"] = parent_id
params_child_new["data"]["parent_reason"] = (
f"Nouvel enfant pour conserver les données suite à: "
f"SIRET {row[COLS.ACTEUR_SIRET]} "
f"détecté le {today} comme fermé dans AE, "
f"remplacé par SIRET {row[COLS.SUGGEST_SIRET]}"
)
changes.append(
changes_prepare(
model=ChangeActeurCreateAsChild,
model_params=params_child_new,
order=2,
reason="besoin nouvel enfant pour conserver les données",
reason="Modification du SIRET",
entity_type="acteur_displayed",
)
)
]

# Existing Child
params_child_old = params_child_new.copy()
params_child_old["id"] = row[COLS.ACTEUR_ID]
params_child_old["data"]["parent"] = parent_id
params_child_old["data"]["parent_reason"] = (
f"SIRET {row[COLS.ACTEUR_SIRET]} "
f"détecté le {today} comme fermé dans AE, "
f"remplacé par SIRET {row[COLS.SUGGEST_SIRET]}"
)
params_child_old["data"]["siret_is_closed"] = True
params_child_old["data"]["statut"] = ActeurStatus.INACTIF
changes.append(
changes_prepare(
model=ChangeActeurUpdateData,
model_params=params_child_old,
order=3,
reason="rattacher enfant fermé à un parent",
entity_type="acteur_displayed",
)
)
contexte = {} # changes are self-explanatory
return changes, contexte

Expand Down Expand Up @@ -259,7 +210,9 @@ def enrich_dbt_model_to_suggestions(
row = dict(row)

try:
changes, contexte = COHORTS_TO_PREPARE_CHANGES[cohort](row)
logger.info(f"Interprétation de {cohort=}")
change_preparation_function = COHORTS_TO_PREPARE_CHANGES[cohort]
changes, contexte = change_preparation_function(row)
suggestion = {
"contexte": contexte,
"suggestion": {"title": cohort, "changes": changes},
Expand Down
Loading