Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
1a5fc78
add fct get_yearly_dataset_infos
moreaupascal56 Feb 2, 2025
dcb5fa5
parametrize process_sise_eaux_dataset_2024
moreaupascal56 Feb 2, 2025
2da6cdf
recreate cache folder by default in clear_cache()
moreaupascal56 Feb 2, 2025
5b3de67
add check_table_existence function
moreaupascal56 Feb 2, 2025
b46a69c
parametrize process_sise_eaux_dataset_2024()
moreaupascal56 Feb 2, 2025
e222562
format _common.py
moreaupascal56 Feb 3, 2025
b58894f
add process_sise_eaux_dataset() controller and rename process_sise_ea…
moreaupascal56 Feb 3, 2025
dcc29a3
upd docstrings, formatting
moreaupascal56 Feb 3, 2025
c628c8e
upd logs and add a clear_cache() in process_sise_eaux_dataset
moreaupascal56 Feb 3, 2025
69516eb
reorganize file
moreaupascal56 Feb 3, 2025
40e85e3
add notebook to preview data
jereze Feb 3, 2025
1ff4f23
fix Incompatible types in assignment
moreaupascal56 Feb 3, 2025
c0c9997
rename SISE to EDC
moreaupascal56 Feb 3, 2025
d6e1ab9
rename annee_prelevement to de_partition
moreaupascal56 Feb 3, 2025
077150e
catch and raise error if refresh_type not in the allowed values
moreaupascal56 Feb 3, 2025
700eb94
format
moreaupascal56 Feb 3, 2025
42950fa
fix typo
moreaupascal56 Feb 3, 2025
5358514
add _config_edc.py and use it accordingly
moreaupascal56 Feb 4, 2025
a5a19dd
add de_ingestion_date
moreaupascal56 Feb 4, 2025
c3470dd
make refresh_type="last" as default
moreaupascal56 Feb 4, 2025
5e901f8
fix example notebooks with new sql table names
moreaupascal56 Feb 5, 2025
1529dfb
delete test_plusierus_annees.ipynb
moreaupascal56 Feb 5, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo on filename and file is empty, add code or remove the notebook

Copy link
Collaborator Author

@moreaupascal56 moreaupascal56 Feb 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jereze je peux supprimer ce notebook? je crois que tu l'as rajouté mais il a du y avoir une erreur lors de l'import ou du commit je ne sais pas (commit)

Empty file.
4 changes: 3 additions & 1 deletion pipelines/tasks/_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
os.makedirs(DATABASE_FOLDER, exist_ok=True)


def clear_cache(recreate_folder: bool = True):
    """
    Clear the cache folder.

    :param recreate_folder: If True (default), recreate an empty cache
        folder after deleting it, so subsequent downloads have a target.
    """
    shutil.rmtree(CACHE_FOLDER)
    if recreate_folder:
        os.makedirs(CACHE_FOLDER, exist_ok=True)
169 changes: 149 additions & 20 deletions pipelines/tasks/build_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import logging
import os
from typing import Dict, List, Literal, Optional
from zipfile import ZipFile

import duckdb
Expand All @@ -14,48 +15,176 @@
logger = logging.getLogger(__name__)


def process_sise_eaux_dataset_2024():
"""Process SISE-Eaux dataset for 2024."""
def check_table_existence(conn: duckdb.DuckDBPyConnection, table_name: str) -> bool:
    """
    Check if a table exists in the duckdb database.

    :param conn: The duckdb connection to use.
    :param table_name: The table name to check existence of.
    :return: True if the table exists, False otherwise.
    """
    # Use a prepared-statement parameter instead of string interpolation
    # so the table name cannot break or inject into the SQL.
    query = """
        SELECT COUNT(*)
        FROM information_schema.tables
        WHERE table_name = ?
    """
    conn.execute(query, [table_name])
    return conn.fetchone()[0] == 1


def get_yearly_dataset_infos(year: str) -> Dict[str, str]:
    """
    Return information for the yearly dataset extract of the SISE Eaux datasets.

    The data comes from
    https://www.data.gouv.fr/fr/datasets/resultats-du-controle-sanitaire-de-leau-distribuee-commune-par-commune/
    For each year a dataset is downloadable on a URL like this (ex. 2024):
    https://www.data.gouv.fr/fr/datasets/r/84a67a3b-08a7-4001-98e6-231c74a98139
    The id of the dataset is the last part of this URL.
    The name of the dataset is dis-YEAR.zip (but the format could potentially change).

    :param year: The year for which we want the dataset information.
    :return: A dict with the "id" and "name" of the dataset.
    :raises ValueError: If no dataset is known for the given year.
    """
    dis_files_info_by_year = {
        "2024": {"id": "84a67a3b-08a7-4001-98e6-231c74a98139", "name": "dis-2024.zip"},
        "2023": {"id": "c89dec4a-d985-447c-a102-75ba814c398e", "name": "dis-2023.zip"},
        "2022": {"id": "a97b6074-c4dd-4ef2-8922-b0cf04dbff9a", "name": "dis-2022.zip"},
        "2021": {"id": "d2b432cc-3761-44d3-8e66-48bc15300bb5", "name": "dis-2021.zip"},
        "2020": {"id": "a6cb4fea-ef8c-47a5-acb3-14e49ccad801", "name": "dis-2020.zip"},
        "2019": {"id": "861f2a7d-024c-4bf0-968b-9e3069d9de07", "name": "dis-2019.zip"},
        "2018": {"id": "0513b3c0-dc18-468d-a969-b3508f079792", "name": "dis-2018.zip"},
        "2017": {"id": "5785427b-3167-49fa-a581-aef835f0fb04", "name": "dis-2017.zip"},
        "2016": {"id": "483c84dd-7912-483b-b96f-4fa5e1d8651f", "name": "dis-2016.zip"},
    }
    try:
        return dis_files_info_by_year[year]
    except KeyError:
        # Fail with an actionable message instead of a bare KeyError.
        raise ValueError(
            f"No dataset known for year {year!r}; "
            f"available years: {sorted(dis_files_info_by_year)}"
        ) from None


def download_extract_insert_yearly_SISE_data(year: str):
    """
    Download from www.data.gouv.fr the SISE-Eaux dataset for one year,
    extract the files and insert their content into duckdb.

    Tables are created on first run; on later runs the rows for the given
    year are deleted then re-inserted, making the per-year refresh
    idempotent. A column "annee_prelevement" (the year as an integer) is
    added to every inserted row.

    :param year: The year for which we want to download the dataset.
    :return: True once the year has been processed.
    """
    yearly_dataset_info = get_yearly_dataset_infos(year=year)

    # Dataset specific constants
    DATA_URL = f"https://www.data.gouv.fr/fr/datasets/r/{yearly_dataset_info['id']}"
    ZIP_FILE = os.path.join(CACHE_FOLDER, yearly_dataset_info["name"])
    EXTRACT_FOLDER = os.path.join(CACHE_FOLDER, f"raw_data_{year}")

    # Each file inside the zip is named <filename_prefix><year><file_extension>.
    FILES = {
        "communes": {
            "filename_prefix": "DIS_COM_UDI_",
            "file_extension": ".txt",
            "table_name": "sise_communes",
        },
        "prelevements": {
            "filename_prefix": "DIS_PLV_",
            "file_extension": ".txt",
            "table_name": "sise_prelevements",
        },
        "resultats": {
            "filename_prefix": "DIS_RESULT_",
            "file_extension": ".txt",
            "table_name": "sise_resultats",
        },
    }

    logger.info(f"Processing SISE-Eaux dataset for {year}...")
    response = requests.get(DATA_URL, stream=True)
    # Fail early on a bad download instead of trying to unzip an error page.
    response.raise_for_status()
    with open(ZIP_FILE, "wb") as f:
        for chunk in response.iter_content(chunk_size=8192):
            f.write(chunk)

    logger.info("   Extracting files...")
    with ZipFile(ZIP_FILE, "r") as zip_ref:
        zip_ref.extractall(EXTRACT_FOLDER)

    logger.info("   Creating or updating tables in the database...")
    conn = duckdb.connect(DUCKDB_FILE)

    for file_info in FILES.values():
        filepath = os.path.join(
            EXTRACT_FOLDER,
            f"{file_info['filename_prefix']}{year}{file_info['file_extension']}",
        )
        table_name = file_info["table_name"]

        if check_table_existence(conn=conn, table_name=table_name):
            # Idempotent refresh: drop this year's rows before re-inserting.
            conn.execute(
                f"""
                DELETE FROM {table_name}
                WHERE annee_prelevement = CAST({year} AS INTEGER)
                ;
                """
            )
            query_start = f"INSERT INTO {table_name} "
        else:
            query_start = f"CREATE TABLE {table_name} AS "

        query_select = f"""
            SELECT
                *,
                CAST({year} AS INTEGER) AS annee_prelevement
            FROM read_csv('{filepath}', header=true, delim=',');
        """
        conn.execute(query_start + query_select)

    conn.close()

    logger.info("   Cleaning up cache...")
    clear_cache()

    return True


def process_sise_eaux_dataset(
    refresh_type: Literal["all", "last", "custom"] = "all",
    custom_years: Optional[List[str]] = None,
):
    """
    Process the SISE eaux dataset.

    :param refresh_type: Refresh type to run:
        - "all": Refresh the data for every available year
        - "last": Refresh the data only for the most recent available year
        - "custom": Refresh the data for the years listed in custom_years
    :param custom_years: Years to update; required when refresh_type="custom".
    :raises ValueError: If refresh_type is not an allowed value, or if
        custom_years is missing when refresh_type="custom".
    :return: True once every requested year has been processed.
    """
    available_years = [
        "2016",
        "2017",
        "2018",
        "2019",
        "2020",
        "2021",
        "2022",
        "2023",
        "2024",
    ]

    if refresh_type == "all":
        years_to_update = available_years
    elif refresh_type == "last":
        # Wrap in a list: iterating the bare string would yield its characters.
        years_to_update = [available_years[-1]]
    elif refresh_type == "custom":
        if not custom_years:
            raise ValueError(
                """ custom_years parameter needs to be specified if refresh_type="custom" """
            )
        # Ignore requested years we have no dataset for.
        years_to_update = list(set(custom_years).intersection(available_years))
    else:
        raise ValueError(
            f'refresh_type must be one of "all", "last", "custom"; got: {refresh_type!r}'
        )

    logger.info(
        f"Launching processing of SISE-Eaux dataset for years: {years_to_update}"
    )

    for year in years_to_update:
        download_extract_insert_yearly_SISE_data(year=year)

    logger.info("Cleaning up cache...")
    clear_cache(recreate_folder=False)
    return True


def execute():
    """Entry point: run the full SISE-Eaux dataset processing (default refresh)."""
    process_sise_eaux_dataset()