Skip to content

Commit 3595672

Browse files
committed
Get several geojson for UDIs; rename it as atlasante source; update dbt models
1 parent 13df7cc commit 3595672

12 files changed

Lines changed: 180 additions & 90 deletions

dbt_/models/sources/__sources.yml

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,9 @@ sources:
1515
- name: laposte_communes
1616
- name: cog_communes
1717
- name: opendatasoft_communes
18-
- name: udi
18+
- name: atlasante
1919
database: data
2020
schema: main
21-
tables:
22-
- name: atlasante_udi
21+
tables:
22+
- name: atlasante_udi_2023
23+
- name: atlasante_udi_corse
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
version: 2
2+
3+
models:
4+
- name: stg_atlasante_udi_2023
5+
description: "Unités de distribution (UDI) de la France métropolitaine"
6+
columns:
7+
- name: gid
8+
type: INTEGER
9+
- name: code_udi
10+
type: VARCHAR
11+
- name: ins_nom
12+
type: VARCHAR
13+
- name: uge_nom
14+
type: VARCHAR
15+
- name: udi_pop
16+
type: VARCHAR
17+
- name: geom
18+
type: GEOMETRY
19+
- name: ingestion_date
20+
type: DATE
21+
- name: stg_atlasante_udi_corse
22+
description: "Unités de distribution (UDI) de la Corse"
23+
columns:
24+
- name: gid
25+
type: INTEGER
26+
- name: cn_udi
27+
type: VARCHAR
28+
- name: nom_udi
29+
type: VARCHAR
30+
- name: geom
31+
type: GEOMETRY
32+
- name: ingestion_date
33+
type: DATE
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
SELECT
2+
gid::INTEGER AS gid,
3+
code_udi::VARCHAR AS code_udi,
4+
ins_nom::VARCHAR AS ins_nom,
5+
uge_nom::VARCHAR AS uge_nom,
6+
udi_pop::VARCHAR AS udi_pop,
7+
geom::GEOMETRY AS geom,
8+
ingestion_date::DATE AS ingestion_date
9+
FROM {{ source('atlasante', 'atlasante_udi_2023') }}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
SELECT
2+
gid::INTEGER AS gid,
3+
cn_udi::VARCHAR AS cn_udi,
4+
nom_udi::VARCHAR AS nom_udi,
5+
geom::GEOMETRY AS geom,
6+
ingestion_date::DATE AS ingestion_date
7+
FROM {{ source('atlasante', 'atlasante_udi_corse') }}

dbt_/models/staging/udi/_udi_models.yml

Lines changed: 0 additions & 20 deletions
This file was deleted.

dbt_/models/staging/udi/stg_udi.sql

Lines changed: 0 additions & 7 deletions
This file was deleted.

dbt_/models/staging/udi/stg_udi_json.sql

Lines changed: 0 additions & 7 deletions
This file was deleted.

pipelines/tasks/build_database.py

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,13 @@
66
- custom-years (str) : List of years to process when refresh_type is "custom"
77
- drop-tables : Drop all table before ingestion if flag is added
88
- check-update : For edc, only refresh table if source was updated since last refresh
9-
- refresh-table (str): Source to refresh ("all", "edc", "commune", "udi")
9+
- refresh-table (str): Source to refresh ("all", "edc", "commune", "atlasante")
1010
1111
Examples:
1212
- build_database --refresh-table all : refresh all tables
13-
- build_database --refresh-table edc : only refresh edc table
14-
- build_database --refresh-table commune : only refresh commune table
15-
- build_database --refresh-table udi : only refresh udi table
13+
- build_database --refresh-table edc : only refresh edc tables
14+
- build_database --refresh-table commune : only refresh commune tables
15+
- build_database --refresh-table atlasante : only refresh tables from atlasante
1616
- build_database --refresh-type all : Process all years
1717
- build_database --refresh-type last : Process last year only
1818
- build_database --refresh-type custom --custom-years 2018,2024 : Process only the years 2018 and 2024
@@ -28,10 +28,10 @@
2828
from pipelines.tasks.client.core.duckdb_client import DuckDBClient
2929
from pipelines.tasks.client.datagouv_client import DataGouvClient
3030
from pipelines.tasks.client.opendatasoft_client import OpenDataSoftClient
31-
from pipelines.tasks.client.udi_client import UDIClient
31+
from pipelines.tasks.client.uploaded_geojson_client import UploadedGeoJSONClient
3232
from pipelines.tasks.config.config_insee import get_insee_config
3333
from pipelines.tasks.config.config_laposte import get_laposte_config
34-
from pipelines.tasks.config.config_udi import get_udi_config
34+
from pipelines.tasks.config.config_uploaded_geojson import uploaded_geojson_config
3535
from pipelines.utils.logger import get_logger
3636

3737
logger = get_logger(__name__)
@@ -48,7 +48,7 @@ def execute(
4848
Execute the EDC dataset processing with specified parameters.
4949
5050
:param refresh_type: Type of refresh to perform ("all", "last", or "custom")
51-
:param refresh_table: which table to refresh ("all", "edc","commune", "udi")
51+
:param refresh_table: which tables to refresh ("all", "edc", "commune", "atlasante")
5252
:param custom_years: List of years to process when refresh_type is "custom"
5353
:param drop_tables: Whether to drop edc tables in the database before data insertion.
5454
"""
@@ -74,8 +74,8 @@ def execute(
7474
laposte.process_datasets()
7575
opendatasoft = OpenDataSoftClient(duckdb_client)
7676
opendatasoft.process_datasets()
77-
if refresh_table == "all" or refresh_table == "udi":
78-
udi_client = UDIClient(get_udi_config(), duckdb_client)
79-
udi_client.process_datasets()
77+
if refresh_table == "all" or refresh_table == "atlasante":
78+
geojson_client = UploadedGeoJSONClient(uploaded_geojson_config, duckdb_client)
79+
geojson_client.process_datasets()
8080

8181
duckdb_client.close()

pipelines/tasks/client/udi_client.py

Lines changed: 0 additions & 30 deletions
This file was deleted.
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
import os
2+
from pathlib import Path
3+
4+
from pipelines.tasks.client.core.duckdb_client import DuckDBClient
5+
from pipelines.tasks.config.common import (
6+
CACHE_FOLDER,
7+
logger,
8+
)
9+
from pipelines.utils.storage_client import ObjectStorageClient
10+
11+
12+
class UploadedGeoJSONClient:
13+
"""Client pour télécharger et ingérer plusieurs fichiers GeoJSON uploadés préalablement manuellement sur S3"""
14+
15+
def __init__(self, config, duckdb_client: DuckDBClient):
16+
self.config = config
17+
self.duckdb_client = duckdb_client
18+
self.storage_client = ObjectStorageClient()
19+
20+
if "files" not in self.config:
21+
raise ValueError(
22+
"Configuration must contain a 'files' list with the GeoJSON files to process"
23+
)
24+
25+
self.files_config = self.config["files"]
26+
logger.info(
27+
f"UploadedGeoJSONClient initialized with {len(self.files_config)} file(s)"
28+
)
29+
30+
def process_datasets(self):
31+
logger.info(f"Processing {self.__class__.__name__} data")
32+
self._download_data()
33+
self._ingest_to_duckdb()
34+
logger.info(f"Finishing processing {self.__class__.__name__} data")
35+
36+
def _download_data(self):
37+
os.makedirs(CACHE_FOLDER, exist_ok=True)
38+
39+
for file_config in self.files_config:
40+
s3_key = (
41+
f"{self.config['source'].get('prefix', 'upload')}/{file_config['path']}"
42+
)
43+
local_path = Path(CACHE_FOLDER, file_config["local_file_name"])
44+
logger.info(f"Downloading {s3_key} to {local_path}")
45+
self.storage_client.download_object(
46+
file_key=s3_key, local_path=str(local_path)
47+
)
48+
49+
def _ingest_to_duckdb(self):
50+
logger.info(
51+
f"Ingesting {len(self.files_config)} uploaded GeoJSON file(s) into DuckDB"
52+
)
53+
54+
# Collect all table names for dropping
55+
table_names = [file_config["table_name"] for file_config in self.files_config]
56+
self.duckdb_client.drop_tables(table_names=table_names)
57+
58+
# Ingest each file
59+
for file_config in self.files_config:
60+
logger.info(
61+
f"Ingesting {file_config['local_file_name']} into table {file_config['table_name']}"
62+
)
63+
self.duckdb_client.ingest_from_geojson(
64+
table_name=file_config["table_name"],
65+
filepath=Path(CACHE_FOLDER, file_config["local_file_name"]),
66+
)
67+
logger.info(
68+
f"✅ {file_config['local_file_name']} has been ingested into table {file_config['table_name']}"
69+
)
70+
71+
logger.info("✅ All uploaded GeoJSON files have been ingested in DB")

0 commit comments

Comments
 (0)