From 1a5fc78bfad921b4a31d3cab8978b3f9a116a11b Mon Sep 17 00:00:00 2001 From: moreaupascal56 Date: Sun, 2 Feb 2025 22:16:27 +0100 Subject: [PATCH 01/22] add fct get_yearly_dataset_infos --- pipelines/tasks/build_database.py | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/pipelines/tasks/build_database.py b/pipelines/tasks/build_database.py index a70de57f..75d5484b 100644 --- a/pipelines/tasks/build_database.py +++ b/pipelines/tasks/build_database.py @@ -4,6 +4,7 @@ import logging import os +from typing import Dict from zipfile import ZipFile import duckdb @@ -13,6 +14,30 @@ logger = logging.getLogger(__name__) +def get_yearly_dataset_infos(year:str) -> Dict[str,str]: + """ + Returns information for yearly dataset extract of the SISE Eaux datasets. + The data comes from https://www.data.gouv.fr/fr/datasets/resultats-du-controle-sanitaire-de-leau-distribuee-commune-par-commune/ + For each year a dataset is downloadable on a URL like this (ex. 2024): + https://www.data.gouv.fr/fr/datasets/r/84a67a3b-08a7-4001-98e6-231c74a98139 + The id of the dataset is the last part of this URL + The name of the dataset is dis-YEAR.zip (but the format could potentially change). + :param year: The year from which we want to get the dataset information. + :return: A dict with the id and name of the dataset. + """ + dis_files_info_by_year = { + "2024": {"id": "84a67a3b-08a7-4001-98e6-231c74a98139", "name" : "dis-2024.zip"}, + "2023": {"id":"c89dec4a-d985-447c-a102-75ba814c398e", "name" : "dis-2023.zip"}, + "2022": {"id":"a97b6074-c4dd-4ef2-8922-b0cf04dbff9a", "name" : "dis-2022.zip"}, + "2021": {"id":"d2b432cc-3761-44d3-8e66-48bc15300bb5", "name" : "dis-2021.zip"}, + "2020": {"id":"a6cb4fea-ef8c-47a5-acb3-14e49ccad801", "name" : "dis-2020.zip"}, + "2019": {"id":"861f2a7d-024c-4bf0-968b-9e3069d9de07", "name" : "dis-2019.zip"}, + "2018": {"id":"0513b3c0-dc18-468d-a969-b3508f079792", "name" : "dis-2018.zip"}, + "2017": {"id":"5785427b-3167-49fa-a581-aef835f0fb04", "name" : "dis-2017.zip"}, + "2016": {"id":"483c84dd-7912-483b-b96f-4fa5e1d8651f", "name" : "dis-2016.zip"} + } + return dis_files_info_by_year[year] + def process_sise_eaux_dataset_2024(): """Process SISE-Eaux dataset for 2024.""" From dcb5fa500205b329789df855afba749c255c8dd9 Mon Sep 17 00:00:00 2001 From: moreaupascal56 Date: Sun, 2 Feb 2025 22:17:37 +0100 Subject: [PATCH 02/22] parametrize process_sise_eaux_dataset_2024 --- pipelines/tasks/build_database.py | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/pipelines/tasks/build_database.py b/pipelines/tasks/build_database.py index 75d5484b..aec49ce9 100644 --- a/pipelines/tasks/build_database.py +++ b/pipelines/tasks/build_database.py @@ -39,23 +39,24 @@ def get_yearly_dataset_infos(year:str) -> Dict[str,str]: return dis_files_info_by_year[year] -def process_sise_eaux_dataset_2024(): +def process_sise_eaux_dataset_2024(year:str): """Process SISE-Eaux dataset for 2024.""" + yearly_dataset_info = get_yearly_dataset_infos(year = year) # Dataset specific constants DATA_URL = ( - "https://www.data.gouv.fr/fr/datasets/r/84a67a3b-08a7-4001-98e6-231c74a98139" + f"https://www.data.gouv.fr/fr/datasets/r/{yearly_dataset_info["id"]}" ) - ZIP_FILE = os.path.join(CACHE_FOLDER, "dis-2024.zip") - EXTRACT_FOLDER = os.path.join(CACHE_FOLDER, "raw_data_2024") + ZIP_FILE = os.path.join(CACHE_FOLDER, yearly_dataset_info["name"]) + EXTRACT_FOLDER = os.path.join(CACHE_FOLDER, f"raw_data_{year}") FILES = { - "communes": {"filename": "DIS_COM_UDI_2024.txt", "table": "sise_communes"}, - "prelevements": {"filename": "DIS_PLV_2024.txt", "table": "sise_prelevements"}, - "resultats": {"filename": "DIS_RESULT_2024.txt", "table": "sise_resultats"}, + "communes": {"filename": f"DIS_COM_UDI_{year}.txt", "table": f"sise_communes_{year}"}, + "prelevements": {"filename": f"DIS_PLV_{year}.txt", "table": f"sise_prelevements_{year}"}, + "resultats": {"filename": f"DIS_RESULT_{year}.txt", "table": f"sise_resultats_{year}"}, } - logger.info("Downloading and extracting SISE-Eaux dataset for 2024...") + logger.info(f"Downloading and extracting SISE-Eaux dataset for {year}...") response = requests.get(DATA_URL, stream=True) with open(ZIP_FILE, "wb") as f: for chunk in response.iter_content(chunk_size=8192): @@ -83,4 +84,4 @@ def process_sise_eaux_dataset_2024(): def execute(): - process_sise_eaux_dataset_2024() + process_sise_eaux_dataset_2024(year="2024") From 2da6cdf6f5107ec8d5d96b38f3508b8c65541004 Mon Sep 17 00:00:00 2001 From: moreaupascal56 Date: Mon, 3 Feb 2025 00:38:32 +0100 Subject: [PATCH 03/22] recreate cache folder by default in clear_cache() --- pipelines/tasks/_common.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pipelines/tasks/_common.py b/pipelines/tasks/_common.py index f296669b..81641952 100644 --- a/pipelines/tasks/_common.py +++ b/pipelines/tasks/_common.py @@ -11,6 +11,8 @@ os.makedirs(DATABASE_FOLDER, exist_ok=True) -def clear_cache(): +def clear_cache(recreate_folder:bool=True): """Clear the cache folder.""" shutil.rmtree(CACHE_FOLDER) + if recreate_folder: + os.makedirs(CACHE_FOLDER, exist_ok=True) \ No newline at end of file From 5b3de6769298fe0bc92903f477f343bea7564bd6 Mon Sep 17 00:00:00 2001 From: moreaupascal56 Date: Mon, 3 Feb 2025 00:46:51 +0100 Subject: [PATCH 04/22] add check_table_existence function --- pipelines/tasks/build_database.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/pipelines/tasks/build_database.py b/pipelines/tasks/build_database.py index aec49ce9..b9e38379 100644 --- a/pipelines/tasks/build_database.py +++ b/pipelines/tasks/build_database.py @@ -83,5 +83,15 @@ def process_sise_eaux_dataset_2024(year:str): return True +def check_table_existence(conn, table_name): + query = f""" + SELECT COUNT(*) + FROM information_schema.tables + WHERE table_name = '{table_name}' + """ + conn.execute(query) + return list(conn.fetchone())[0] == 1 + + def execute(): process_sise_eaux_dataset_2024(year="2024") From b46a69c8da4cf990b85fe367fbe0b309db4d88ea Mon Sep 17 00:00:00 2001 From: moreaupascal56 Date: Mon, 3 Feb 2025 00:48:23 +0100 Subject: [PATCH 05/22] parametrize process_sise_eaux_dataset_2024() --- pipelines/tasks/build_database.py | 65 ++++++++++++++++++++++++------- 1 file changed, 51 insertions(+), 14 deletions(-) diff --git a/pipelines/tasks/build_database.py b/pipelines/tasks/build_database.py index b9e38379..74338f00 100644 --- a/pipelines/tasks/build_database.py +++ b/pipelines/tasks/build_database.py @@ -39,21 +39,37 @@ def get_yearly_dataset_infos(year:str) -> Dict[str,str]: return dis_files_info_by_year[year] -def process_sise_eaux_dataset_2024(year:str): - """Process SISE-Eaux dataset for 2024.""" +def process_sise_eaux_dataset_2024(year: str): + """ + Downloads from www.data.gouv.fr the SISE-Eaux dataset for one year + :param year: The year from which we want to download the dataset + :return: Create or replace the associated tables in the duckcb database. + It adds the column "annee_prelevement" based on year as an integer. + """ + + yearly_dataset_info = get_yearly_dataset_infos(year=year) - yearly_dataset_info = get_yearly_dataset_infos(year = year) # Dataset specific constants - DATA_URL = ( - f"https://www.data.gouv.fr/fr/datasets/r/{yearly_dataset_info["id"]}" - ) + DATA_URL = f"https://www.data.gouv.fr/fr/datasets/r/{yearly_dataset_info['id']}" ZIP_FILE = os.path.join(CACHE_FOLDER, yearly_dataset_info["name"]) EXTRACT_FOLDER = os.path.join(CACHE_FOLDER, f"raw_data_{year}") FILES = { - "communes": {"filename": f"DIS_COM_UDI_{year}.txt", "table": f"sise_communes_{year}"}, - "prelevements": {"filename": f"DIS_PLV_{year}.txt", "table": f"sise_prelevements_{year}"}, - "resultats": {"filename": f"DIS_RESULT_{year}.txt", "table": f"sise_resultats_{year}"}, + "communes": { + "filename_prefix": f"DIS_COM_UDI_", + "file_extension": ".txt", + "table_name": f"sise_communes", + }, + "prelevements": { + "filename_prefix": f"DIS_PLV_", + "file_extension": ".txt", + "table_name": f"sise_prelevements", + }, + "resultats": { + "filename_prefix": f"DIS_RESULT_", + "file_extension": ".txt", + "table_name": f"sise_resultats", + }, } logger.info(f"Downloading and extracting SISE-Eaux dataset for {year}...") @@ -68,13 +84,34 @@ def process_sise_eaux_dataset_2024(year:str): logger.info("Creating tables in the database...") conn = duckdb.connect(DUCKDB_FILE) + for file_info in FILES.values(): - filepath = os.path.join(EXTRACT_FOLDER, file_info["filename"]) - query = f""" - CREATE OR REPLACE TABLE {file_info["table"]} AS - SELECT * FROM read_csv('{filepath}', header=true, delim=','); + filepath = os.path.join( + EXTRACT_FOLDER, + f"{file_info['filename_prefix']}{year}{file_info['file_extension']}", + ) + + if check_table_existence(conn=conn, table_name=f"{file_info['table_name']}"): + query = f""" + DELETE FROM {f"{file_info['table_name']}"} + WHERE annee_prelevement = {year} + ; + """ + conn.execute(query) + query_start = f"INSERT INTO {f'{file_info["table_name"]}'} " + + else: + query_start = f"CREATE TABLE {f'{file_info["table_name"]}'} AS " + + query_select = f""" + SELECT + *, + CAST({year} as INTEGER) AS annee_prelevement + FROM read_csv('{filepath}', header=true, delim=','); """ - conn.execute(query) + + conn.execute(query_start + query_select) + conn.close() logger.info("Cleaning up...") From e22256204a5db38702c86c0af08b4c3e1087c6eb Mon Sep 17 00:00:00 2001 From: moreaupascal56 Date: Mon, 3 Feb 2025 01:07:57 +0100 Subject: [PATCH 06/22] format _common.py --- pipelines/tasks/_common.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pipelines/tasks/_common.py b/pipelines/tasks/_common.py index 81641952..9b8cedb8 100644 --- a/pipelines/tasks/_common.py +++ b/pipelines/tasks/_common.py @@ -11,8 +11,8 @@ os.makedirs(DATABASE_FOLDER, exist_ok=True) -def clear_cache(recreate_folder:bool=True): +def clear_cache(recreate_folder: bool = True): """Clear the cache folder.""" shutil.rmtree(CACHE_FOLDER) if recreate_folder: - os.makedirs(CACHE_FOLDER, exist_ok=True) \ No newline at end of file + os.makedirs(CACHE_FOLDER, exist_ok=True) From b58894fde3328cbcfa56b54d1fcc6130fb826854 Mon Sep 17 00:00:00 2001 From: moreaupascal56 Date: Mon, 3 Feb 2025 01:09:06 +0100 Subject: [PATCH 07/22] add process_sise_eaux_dataset() controller and rename process_sise_eaux_dataset_2024 to download_extract_insert_yearly_SISE_data --- pipelines/tasks/build_database.py | 49 +++++++++++++++++++++++++++++-- 1 file changed, 46 insertions(+), 3 deletions(-) diff --git a/pipelines/tasks/build_database.py b/pipelines/tasks/build_database.py index 74338f00..31478923 100644 --- a/pipelines/tasks/build_database.py +++ b/pipelines/tasks/build_database.py @@ -39,9 +39,9 @@ def get_yearly_dataset_infos(year:str) -> Dict[str,str]: return dis_files_info_by_year[year] -def process_sise_eaux_dataset_2024(year: str): +def download_extract_insert_yearly_SISE_data(year: str): """ - Downloads from www.data.gouv.fr the SISE-Eaux dataset for one year + Downloads from www.data.gouv.fr the SISE-Eaux dataset for one year, extract the files and insert into duckdb :param year: The year from which we want to download the dataset :return: Create or replace the associated tables in the duckcb database. It adds the column "annee_prelevement" based on year as an integer. @@ -94,7 +94,7 @@ def process_sise_eaux_dataset_2024(year: str): if check_table_existence(conn=conn, table_name=f"{file_info['table_name']}"): query = f""" DELETE FROM {f"{file_info['table_name']}"} - WHERE annee_prelevement = {year} + WHERE annee_prelevement = CAST({year} as INTEGER) ; """ conn.execute(query) @@ -130,5 +130,48 @@ def check_table_existence(conn, table_name): return list(conn.fetchone())[0] == 1 +def process_sise_eaux_dataset( + refresh_type: Literal["all", "last", "custom"] = "all", + custom_years: List[str] = None, +): + """ + Process the SISE eaux dataset. + :param refresh_type: Refresh type to run + - "all": Refresh the data for every possible year + - "last": Refresh the data only for the last available year + - "custom": Refresh the data for the years specified in the list custom_years + :param custom_years: years to update + :return: + """ + available_years = [ + "2016", + "2017", + "2018", + "2019", + "2020", + "2021", + "2022", + "2023", + "2024", + ] + + if refresh_type == "all": + years_to_update = available_years + elif refresh_type == "last": + years_to_update = available_years[-1] + elif refresh_type == "custom": + if custom_years: + years_to_update = list(set(custom_years).intersection(available_years)) + else: + raise ValueError( + """ custom_years parameter needs to be specified if refresh_type="custom" """ + ) + + for year in years_to_update: + download_extract_insert_yearly_SISE_data(year=year) + + return True + + def execute(): process_sise_eaux_dataset_2024(year="2024") From dcc29a389842d9f019d04b302b8c441742af2ae6 Mon Sep 17 00:00:00 2001 From: moreaupascal56 Date: Mon, 3 Feb 2025 01:09:58 +0100 Subject: [PATCH 08/22] upd docstrings, formatting --- pipelines/tasks/build_database.py | 33 +++++++++++++++++++------------ 1 file changed, 20 insertions(+), 13 deletions(-) diff --git a/pipelines/tasks/build_database.py b/pipelines/tasks/build_database.py index 31478923..b1fd9daf 100644 --- a/pipelines/tasks/build_database.py +++ b/pipelines/tasks/build_database.py @@ -4,7 +4,7 @@ import logging import os -from typing import Dict +from typing import Dict, List, Literal from zipfile import ZipFile import duckdb @@ -14,7 +14,8 @@ logger = logging.getLogger(__name__) -def get_yearly_dataset_infos(year:str) -> Dict[str,str]: + +def get_yearly_dataset_infos(year: str) -> Dict[str, str]: """ Returns information for yearly dataset extract of the SISE Eaux datasets. The data comes from https://www.data.gouv.fr/fr/datasets/resultats-du-controle-sanitaire-de-leau-distribuee-commune-par-commune/ @@ -26,15 +27,15 @@ def get_yearly_dataset_infos(year:str) -> Dict[str,str]: :return: A dict with the id and name of the dataset. """ dis_files_info_by_year = { - "2024": {"id": "84a67a3b-08a7-4001-98e6-231c74a98139", "name" : "dis-2024.zip"}, - "2023": {"id":"c89dec4a-d985-447c-a102-75ba814c398e", "name" : "dis-2023.zip"}, - "2022": {"id":"a97b6074-c4dd-4ef2-8922-b0cf04dbff9a", "name" : "dis-2022.zip"}, - "2021": {"id":"d2b432cc-3761-44d3-8e66-48bc15300bb5", "name" : "dis-2021.zip"}, - "2020": {"id":"a6cb4fea-ef8c-47a5-acb3-14e49ccad801", "name" : "dis-2020.zip"}, - "2019": {"id":"861f2a7d-024c-4bf0-968b-9e3069d9de07", "name" : "dis-2019.zip"}, - "2018": {"id":"0513b3c0-dc18-468d-a969-b3508f079792", "name" : "dis-2018.zip"}, - "2017": {"id":"5785427b-3167-49fa-a581-aef835f0fb04", "name" : "dis-2017.zip"}, - "2016": {"id":"483c84dd-7912-483b-b96f-4fa5e1d8651f", "name" : "dis-2016.zip"} + "2024": {"id": "84a67a3b-08a7-4001-98e6-231c74a98139", "name": "dis-2024.zip"}, + "2023": {"id": "c89dec4a-d985-447c-a102-75ba814c398e", "name": "dis-2023.zip"}, + "2022": {"id": "a97b6074-c4dd-4ef2-8922-b0cf04dbff9a", "name": "dis-2022.zip"}, + "2021": {"id": "d2b432cc-3761-44d3-8e66-48bc15300bb5", "name": "dis-2021.zip"}, + "2020": {"id": "a6cb4fea-ef8c-47a5-acb3-14e49ccad801", "name": "dis-2020.zip"}, + "2019": {"id": "861f2a7d-024c-4bf0-968b-9e3069d9de07", "name": "dis-2019.zip"}, + "2018": {"id": "0513b3c0-dc18-468d-a969-b3508f079792", "name": "dis-2018.zip"}, + "2017": {"id": "5785427b-3167-49fa-a581-aef835f0fb04", "name": "dis-2017.zip"}, + "2016": {"id": "483c84dd-7912-483b-b96f-4fa5e1d8651f", "name": "dis-2016.zip"}, } return dis_files_info_by_year[year] @@ -120,7 +121,13 @@ def download_extract_insert_yearly_SISE_data(year: str): return True -def check_table_existence(conn, table_name): +def check_table_existence(conn: duckdb.DuckDBPyConnection, table_name: str) -> bool: + """ + Check if a table exists in the duckdb database + :param conn: The duckdb connection to use + :param table_name: The table name to check existence + :return: True if the table exists, False if not + """ query = f""" SELECT COUNT(*) FROM information_schema.tables @@ -174,4 +181,4 @@ def process_sise_eaux_dataset( def execute(): - process_sise_eaux_dataset_2024(year="2024") + process_sise_eaux_dataset() From c628c8e101359800d9067ff8fc4ec1870e5d803f Mon Sep 17 00:00:00 2001 From: moreaupascal56 Date: Mon, 3 Feb 2025 01:15:47 +0100 Subject: [PATCH 09/22] upd logs and add a clear_cache() in process_sise_eaux_dataset --- pipelines/tasks/build_database.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/pipelines/tasks/build_database.py b/pipelines/tasks/build_database.py index b1fd9daf..90dc6142 100644 --- a/pipelines/tasks/build_database.py +++ b/pipelines/tasks/build_database.py @@ -73,17 +73,17 @@ def download_extract_insert_yearly_SISE_data(year: str): }, } - logger.info(f"Downloading and extracting SISE-Eaux dataset for {year}...") + logger.info(f"Processing SISE-Eaux dataset for {year}...") response = requests.get(DATA_URL, stream=True) with open(ZIP_FILE, "wb") as f: for chunk in response.iter_content(chunk_size=8192): f.write(chunk) - logger.info("Extracting files...") + logger.info(" Extracting files...") with ZipFile(ZIP_FILE, "r") as zip_ref: zip_ref.extractall(EXTRACT_FOLDER) - logger.info("Creating tables in the database...") + logger.info(" Creating or updating tables in the database...") conn = duckdb.connect(DUCKDB_FILE) for file_info in FILES.values(): @@ -115,7 +115,7 @@ def download_extract_insert_yearly_SISE_data(year: str): conn.close() - logger.info("Cleaning up...") + logger.info(" Cleaning up cache...") clear_cache() return True @@ -174,9 +174,15 @@ def process_sise_eaux_dataset( """ custom_years parameter needs to be specified if refresh_type="custom" """ ) + logger.info( + f"Launching processing of SISE-Eaux dataset for years: {years_to_update}" + ) + for year in years_to_update: download_extract_insert_yearly_SISE_data(year=year) + logger.info("Cleaning up cache...") + clear_cache(recreate_folder=False) return True From 69516eb59c0f8a06f847370ec2f02633f4d13153 Mon Sep 17 00:00:00 2001 From: moreaupascal56 Date: Mon, 3 Feb 2025 02:04:31 +0100 Subject: [PATCH 10/22] reorganize file --- pipelines/tasks/build_database.py | 32 +++++++++++++++---------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/pipelines/tasks/build_database.py b/pipelines/tasks/build_database.py index 90dc6142..a8127228 100644 --- a/pipelines/tasks/build_database.py +++ b/pipelines/tasks/build_database.py @@ -15,6 +15,22 @@ logger = logging.getLogger(__name__) +def check_table_existence(conn: duckdb.DuckDBPyConnection, table_name: str) -> bool: + """ + Check if a table exists in the duckdb database + :param conn: The duckdb connection to use + :param table_name: The table name to check existence + :return: True if the table exists, False if not + """ + query = f""" + SELECT COUNT(*) + FROM information_schema.tables + WHERE table_name = '{table_name}' + """ + conn.execute(query) + return list(conn.fetchone())[0] == 1 + + def get_yearly_dataset_infos(year: str) -> Dict[str, str]: """ Returns information for yearly dataset extract of the SISE Eaux datasets. @@ -121,22 +137,6 @@ def download_extract_insert_yearly_SISE_data(year: str): return True -def check_table_existence(conn: duckdb.DuckDBPyConnection, table_name: str) -> bool: - """ - Check if a table exists in the duckdb database - :param conn: The duckdb connection to use - :param table_name: The table name to check existence - :return: True if the table exists, False if not - """ - query = f""" - SELECT COUNT(*) - FROM information_schema.tables - WHERE table_name = '{table_name}' - """ - conn.execute(query) - return list(conn.fetchone())[0] == 1 - - def process_sise_eaux_dataset( refresh_type: Literal["all", "last", "custom"] = "all", custom_years: List[str] = None, From 40e85e3b0fb9f2a46b1d7678aa4db9a941240592 Mon Sep 17 00:00:00 2001 From: Jeremy Greze Date: Mon, 3 Feb 2025 13:06:13 +0100 Subject: [PATCH 11/22] add notebook to preview data --- pipelines/notebooks/test_plusierus_annees.ipynb | 0 1 file changed, 0 insertions(+), 0 deletions(-) create mode 100644 pipelines/notebooks/test_plusierus_annees.ipynb diff --git a/pipelines/notebooks/test_plusierus_annees.ipynb b/pipelines/notebooks/test_plusierus_annees.ipynb new file mode 100644 index 00000000..e69de29b From 1ff4f23d78f10133d92b24ed5d0f1cb23fc07798 Mon Sep 17 00:00:00 2001 From: moreaupascal56 Date: Mon, 3 Feb 2025 14:44:58 +0100 Subject: [PATCH 12/22] fix Incompatible types in assignment --- pipelines/tasks/build_database.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pipelines/tasks/build_database.py b/pipelines/tasks/build_database.py index a8127228..e51e665e 100644 --- a/pipelines/tasks/build_database.py +++ b/pipelines/tasks/build_database.py @@ -165,7 +165,7 @@ def process_sise_eaux_dataset( if refresh_type == "all": years_to_update = available_years elif refresh_type == "last": - years_to_update = available_years[-1] + years_to_update = available_years[-1:] elif refresh_type == "custom": if custom_years: years_to_update = list(set(custom_years).intersection(available_years)) From c0c99976f8376e6e88db966d9f8e10c81fbc2ccb Mon Sep 17 00:00:00 2001 From: moreaupascal56 Date: Mon, 3 Feb 2025 15:05:20 +0100 Subject: [PATCH 13/22] rename SISE to EDC --- pipelines/tasks/build_database.py | 35 +++++++++++++++++-------------- 1 file changed, 19 insertions(+), 16 deletions(-) diff --git a/pipelines/tasks/build_database.py b/pipelines/tasks/build_database.py index e51e665e..72e1e42b 100644 --- a/pipelines/tasks/build_database.py +++ b/pipelines/tasks/build_database.py @@ -31,9 +31,9 @@ def check_table_existence(conn: duckdb.DuckDBPyConnection, table_name: str) -> b return list(conn.fetchone())[0] == 1 -def get_yearly_dataset_infos(year: str) -> Dict[str, str]: +def get_yearly_edc_infos(year: str) -> Dict[str, str]: """ - Returns information for yearly dataset extract of the SISE Eaux datasets. + Returns information for yearly dataset extract of the EDC (Eau distribuée par commune) datasets. The data comes from https://www.data.gouv.fr/fr/datasets/resultats-du-controle-sanitaire-de-leau-distribuee-commune-par-commune/ For each year a dataset is downloadable on a URL like this (ex. 2024): https://www.data.gouv.fr/fr/datasets/r/84a67a3b-08a7-4001-98e6-231c74a98139 @@ -42,7 +42,7 @@ def get_yearly_dataset_infos(year: str) -> Dict[str, str]: :param year: The year from which we want to get the dataset information. :return: A dict with the id and name of the dataset. """ - dis_files_info_by_year = { + edc_dis_files_info_by_year = { "2024": {"id": "84a67a3b-08a7-4001-98e6-231c74a98139", "name": "dis-2024.zip"}, "2023": {"id": "c89dec4a-d985-447c-a102-75ba814c398e", "name": "dis-2023.zip"}, "2022": {"id": "a97b6074-c4dd-4ef2-8922-b0cf04dbff9a", "name": "dis-2022.zip"}, @@ -53,18 +53,19 @@ def get_yearly_dataset_infos(year: str) -> Dict[str, str]: "2017": {"id": "5785427b-3167-49fa-a581-aef835f0fb04", "name": "dis-2017.zip"}, "2016": {"id": "483c84dd-7912-483b-b96f-4fa5e1d8651f", "name": "dis-2016.zip"}, } - return dis_files_info_by_year[year] + return edc_dis_files_info_by_year[year] -def download_extract_insert_yearly_SISE_data(year: str): +def download_extract_insert_yearly_edc_data(year: str): """ - Downloads from www.data.gouv.fr the SISE-Eaux dataset for one year, extract the files and insert into duckdb + Downloads from www.data.gouv.fr the EDC (Eau distribuée par commune) dataset for one year, + extracts the files and insert the data into duckdb :param year: The year from which we want to download the dataset :return: Create or replace the associated tables in the duckcb database. It adds the column "annee_prelevement" based on year as an integer. """ - yearly_dataset_info = get_yearly_dataset_infos(year=year) + yearly_dataset_info = get_yearly_edc_infos(year=year) # Dataset specific constants DATA_URL = f"https://www.data.gouv.fr/fr/datasets/r/{yearly_dataset_info['id']}" @@ -75,21 +76,21 @@ def download_extract_insert_yearly_SISE_data(year: str): "communes": { "filename_prefix": f"DIS_COM_UDI_", "file_extension": ".txt", - "table_name": f"sise_communes", + "table_name": f"edc_communes", }, "prelevements": { "filename_prefix": f"DIS_PLV_", "file_extension": ".txt", - "table_name": f"sise_prelevements", + "table_name": f"edc_prelevements", }, "resultats": { "filename_prefix": f"DIS_RESULT_", "file_extension": ".txt", - "table_name": f"sise_resultats", + "table_name": f"edc_resultats", }, } - logger.info(f"Processing SISE-Eaux dataset for {year}...") + logger.info(f"Processing EDC dataset for {year}...") response = requests.get(DATA_URL, stream=True) with open(ZIP_FILE, "wb") as f: for chunk in response.iter_content(chunk_size=8192): @@ -137,12 +138,12 @@ def download_extract_insert_yearly_SISE_data(year: str): return True -def process_sise_eaux_dataset( +def process_edc_datasets( refresh_type: Literal["all", "last", "custom"] = "all", custom_years: List[str] = None, ): """ - Process the SISE eaux dataset. + Process the EDC datasets. :param refresh_type: Refresh type to run - "all": Refresh the data for every possible year - "last": Refresh the data only for the last available year @@ -175,11 +176,11 @@ def process_sise_eaux_dataset( ) logger.info( - f"Launching processing of SISE-Eaux dataset for years: {years_to_update}" + f"Launching processing of EDC datasets for years: {years_to_update}" ) for year in years_to_update: - download_extract_insert_yearly_SISE_data(year=year) + download_extract_insert_yearly_edc_data(year=year) logger.info("Cleaning up cache...") clear_cache(recreate_folder=False) @@ -187,4 +188,6 @@ def process_sise_eaux_dataset( def execute(): - process_sise_eaux_dataset() + process_edc_datasets() + + conn = duckdb.connect(DUCKDB_FILE) From d6e1ab9142ee4fa8f6b3ffbf6df8d0d324f86b7f Mon Sep 17 00:00:00 2001 From: moreaupascal56 Date: Mon, 3 Feb 2025 15:06:33 +0100 Subject: [PATCH 14/22] rename annee_prelevement to de_partition --- pipelines/tasks/build_database.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pipelines/tasks/build_database.py b/pipelines/tasks/build_database.py index 72e1e42b..97713cbe 100644 --- a/pipelines/tasks/build_database.py +++ b/pipelines/tasks/build_database.py @@ -62,7 +62,7 @@ def download_extract_insert_yearly_edc_data(year: str): extracts the files and insert the data into duckdb :param year: The year from which we want to download the dataset :return: Create or replace the associated tables in the duckcb database. - It adds the column "annee_prelevement" based on year as an integer. + It adds the column "de_partition" based on year as an integer. """ yearly_dataset_info = get_yearly_edc_infos(year=year) @@ -112,7 +112,7 @@ def download_extract_insert_yearly_edc_data(year: str): if check_table_existence(conn=conn, table_name=f"{file_info['table_name']}"): query = f""" DELETE FROM {f"{file_info['table_name']}"} - WHERE annee_prelevement = CAST({year} as INTEGER) + WHERE de_partition = CAST({year} as INTEGER) ; """ conn.execute(query) @@ -124,7 +124,7 @@ def download_extract_insert_yearly_edc_data(year: str): query_select = f""" SELECT *, - CAST({year} as INTEGER) AS annee_prelevement + CAST({year} as INTEGER) AS de_partition FROM read_csv('{filepath}', header=true, delim=','); """ From 077150e1a4b715c127dc1c7885c386f4027b7127 Mon Sep 17 00:00:00 2001 From: moreaupascal56 Date: Mon, 3 Feb 2025 15:06:55 +0100 Subject: [PATCH 15/22] catch and raise error if refresh_type not in the allowed values --- pipelines/tasks/build_database.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pipelines/tasks/build_database.py b/pipelines/tasks/build_database.py index 97713cbe..d359331a 100644 --- a/pipelines/tasks/build_database.py +++ b/pipelines/tasks/build_database.py @@ -174,6 +174,10 @@ def process_edc_datasets( raise ValueError( """ custom_years parameter needs to be specified if refresh_type="custom" """ ) + else: + raise ValueError( + f""" refresh_type needs to be one of ["all", "last", "custom"], it can't be: {refresh_type}""" + ) logger.info( f"Launching processing of EDC datasets for years: {years_to_update}" From 700eb94542d4961f76c711cda71be5f34dff41b5 Mon Sep 17 00:00:00 2001 From: moreaupascal56 Date: Mon, 3 Feb 2025 15:11:52 +0100 Subject: [PATCH 16/22] format --- pipelines/tasks/build_database.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/pipelines/tasks/build_database.py b/pipelines/tasks/build_database.py index d359331a..f8f45f54 100644 --- a/pipelines/tasks/build_database.py +++ b/pipelines/tasks/build_database.py @@ -179,9 +179,7 @@ def process_edc_datasets( f""" refresh_type needs to be one of ["all", "last", "custom"], it can't be: {refresh_type}""" ) - logger.info( - f"Launching processing of EDC datasets for years: {years_to_update}" - ) + logger.info(f"Launching processing of EDC datasets for years: {years_to_update}") for year in years_to_update: download_extract_insert_yearly_edc_data(year=year) From 42950fa6a87392909a032cfe4c6beb5cf01c52e0 Mon Sep 17 00:00:00 2001 From: moreaupascal56 Date: Mon, 3 Feb 2025 15:45:43 +0100 Subject: [PATCH 17/22] fix typo --- pipelines/tasks/build_database.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/pipelines/tasks/build_database.py b/pipelines/tasks/build_database.py index f8f45f54..d6938c6f 100644 --- a/pipelines/tasks/build_database.py +++ b/pipelines/tasks/build_database.py @@ -191,5 +191,3 @@ def process_edc_datasets( def execute(): process_edc_datasets() - - conn = duckdb.connect(DUCKDB_FILE) From 5358514719cd658d98920a5c392f0013f50860dc Mon Sep 17 00:00:00 2001 From: moreaupascal56 Date: Tue, 4 Feb 2025 10:43:35 +0100 Subject: [PATCH 18/22] add _config_edc.py and use it accordingly --- pipelines/tasks/_config_edc.py | 103 ++++++++++++++++++++++++++++++ pipelines/tasks/build_database.py | 77 +++++----------------- 2 files changed, 120 insertions(+), 60 deletions(-) create mode 100644 pipelines/tasks/_config_edc.py diff --git a/pipelines/tasks/_config_edc.py b/pipelines/tasks/_config_edc.py new file mode 100644 index 00000000..30b78c45 --- /dev/null +++ b/pipelines/tasks/_config_edc.py @@ -0,0 +1,103 @@ +from typing import Dict + + +def get_edc_config() -> Dict: + """ + Returns various configuration for processing the EDC (Eau distribuée par commune) datasets. + The data comes from https://www.data.gouv.fr/fr/datasets/resultats-du-controle-sanitaire-de-leau-distribuee-commune-par-commune/ + For each year a dataset is downloadable on a URL like this (ex. 2024): + https://www.data.gouv.fr/fr/datasets/r/84a67a3b-08a7-4001-98e6-231c74a98139 + :return: A dict with the config used for processing. + The "source" part is related to the data.gouv datasource + The "files" part is related to the extracted files information and sql table names + """ + + edc_config = { + "source": { + "base_url": "https://www.data.gouv.fr/fr/datasets/r/", + "available_years": [ + "2016", + "2017", + "2018", + "2019", + "2020", + "2021", + "2022", + "2023", + "2024", + ], + "yearly_files_infos": { + "2024": { + "id": "84a67a3b-08a7-4001-98e6-231c74a98139", + "zipfile": "dis-2024.zip", + }, + "2023": { + "id": "c89dec4a-d985-447c-a102-75ba814c398e", + "zipfile": "dis-2023.zip", + }, + "2022": { + "id": "a97b6074-c4dd-4ef2-8922-b0cf04dbff9a", + "zipfile": "dis-2022.zip", + }, + "2021": { + "id": "d2b432cc-3761-44d3-8e66-48bc15300bb5", + "zipfile": "dis-2021.zip", + }, + "2020": { + "id": "a6cb4fea-ef8c-47a5-acb3-14e49ccad801", + "zipfile": "dis-2020.zip", + }, + "2019": { + "id": "861f2a7d-024c-4bf0-968b-9e3069d9de07", + "zipfile": "dis-2019.zip", + }, + "2018": { + "id": "0513b3c0-dc18-468d-a969-b3508f079792", + "zipfile": "dis-2018.zip", + }, + "2017": { + "id": "5785427b-3167-49fa-a581-aef835f0fb04", + "zipfile": "dis-2017.zip", + }, + "2016": { + "id": "483c84dd-7912-483b-b96f-4fa5e1d8651f", + "zipfile": "dis-2016.zip", + }, + }, + }, + "files": { + "communes": { + "file_name_prefix": "DIS_COM_UDI_", + "file_extension": ".txt", + "table_name": "edc_communes", + }, + "prelevements": { + "file_name_prefix": "DIS_PLV_", + "file_extension": ".txt", + "table_name": "edc_prelevements", + }, + "resultats": { + "file_name_prefix": "DIS_RESULT_", + "file_extension": ".txt", + "table_name": "edc_resultats", + }, + }, + } + + return edc_config + + +def create_edc_yearly_filename( + file_name_prefix: str, file_extension: str, year: str +) -> str: + """ + This function is used to recreate the yearly filenames of the extracted files. + It is intended for use with the edc_config["files"] data above. + For example in 2024 the file name for communes should be: + DIS_COM_UDI_2024.txt + :param file_name_prefix: prefix of the filename + :param file_extension: extension of the file + :param year: year of the needed file + :return: the yearly filename as a string + """ + return file_name_prefix + year + file_extension diff --git a/pipelines/tasks/build_database.py b/pipelines/tasks/build_database.py index d6938c6f..b5eae5ec 100644 --- a/pipelines/tasks/build_database.py +++ b/pipelines/tasks/build_database.py @@ -4,15 +4,17 @@ import logging import os -from typing import Dict, List, Literal +from typing import List, Literal from zipfile import ZipFile import duckdb import requests from ._common import CACHE_FOLDER, DUCKDB_FILE, clear_cache +from ._config_edc import get_edc_config, create_edc_yearly_filename logger = logging.getLogger(__name__) +edc_config = get_edc_config() def check_table_existence(conn: duckdb.DuckDBPyConnection, table_name: str) -> bool: @@ -31,31 +33,6 @@ def check_table_existence(conn: duckdb.DuckDBPyConnection, table_name: str) -> b return list(conn.fetchone())[0] == 1 -def get_yearly_edc_infos(year: str) -> Dict[str, str]: - """ - Returns information for yearly dataset extract of the EDC (Eau distribuée par commune) datasets. - The data comes from https://www.data.gouv.fr/fr/datasets/resultats-du-controle-sanitaire-de-leau-distribuee-commune-par-commune/ - For each year a dataset is downloadable on a URL like this (ex. 2024): - https://www.data.gouv.fr/fr/datasets/r/84a67a3b-08a7-4001-98e6-231c74a98139 - The id of the dataset is the last part of this URL - The name of the dataset is dis-YEAR.zip (but the format could potentially change). - :param year: The year from which we want to get the dataset information. - :return: A dict with the id and name of the dataset. - """ - edc_dis_files_info_by_year = { - "2024": {"id": "84a67a3b-08a7-4001-98e6-231c74a98139", "name": "dis-2024.zip"}, - "2023": {"id": "c89dec4a-d985-447c-a102-75ba814c398e", "name": "dis-2023.zip"}, - "2022": {"id": "a97b6074-c4dd-4ef2-8922-b0cf04dbff9a", "name": "dis-2022.zip"}, - "2021": {"id": "d2b432cc-3761-44d3-8e66-48bc15300bb5", "name": "dis-2021.zip"}, - "2020": {"id": "a6cb4fea-ef8c-47a5-acb3-14e49ccad801", "name": "dis-2020.zip"}, - "2019": {"id": "861f2a7d-024c-4bf0-968b-9e3069d9de07", "name": "dis-2019.zip"}, - "2018": {"id": "0513b3c0-dc18-468d-a969-b3508f079792", "name": "dis-2018.zip"}, - "2017": {"id": "5785427b-3167-49fa-a581-aef835f0fb04", "name": "dis-2017.zip"}, - "2016": {"id": "483c84dd-7912-483b-b96f-4fa5e1d8651f", "name": "dis-2016.zip"}, - } - return edc_dis_files_info_by_year[year] - - def download_extract_insert_yearly_edc_data(year: str): """ Downloads from www.data.gouv.fr the EDC (Eau distribuée par commune) dataset for one year, @@ -65,30 +42,16 @@ def download_extract_insert_yearly_edc_data(year: str): It adds the column "de_partition" based on year as an integer. """ - yearly_dataset_info = get_yearly_edc_infos(year=year) - # Dataset specific constants - DATA_URL = f"https://www.data.gouv.fr/fr/datasets/r/{yearly_dataset_info['id']}" - ZIP_FILE = os.path.join(CACHE_FOLDER, yearly_dataset_info["name"]) + DATA_URL = ( + edc_config["source"]["base_url"] + + edc_config["source"]["yearly_files_infos"][year]["id"] + ) + ZIP_FILE = os.path.join( + CACHE_FOLDER, edc_config["source"]["yearly_files_infos"][year]["zipfile"] + ) EXTRACT_FOLDER = os.path.join(CACHE_FOLDER, f"raw_data_{year}") - - FILES = { - "communes": { - "filename_prefix": f"DIS_COM_UDI_", - "file_extension": ".txt", - "table_name": f"edc_communes", - }, - "prelevements": { - "filename_prefix": f"DIS_PLV_", - "file_extension": ".txt", - "table_name": f"edc_prelevements", - }, - "resultats": { - "filename_prefix": f"DIS_RESULT_", - "file_extension": ".txt", - "table_name": f"edc_resultats", - }, - } + FILES = edc_config["files"] logger.info(f"Processing EDC dataset for {year}...") response = requests.get(DATA_URL, stream=True) @@ -106,7 +69,11 @@ def download_extract_insert_yearly_edc_data(year: str): for file_info in FILES.values(): filepath = os.path.join( EXTRACT_FOLDER, - f"{file_info['filename_prefix']}{year}{file_info['file_extension']}", + create_edc_yearly_filename( + file_name_prefix=file_info["file_name_prefix"], + file_extension=file_info["file_extension"], + year=year, + ), ) if check_table_existence(conn=conn, table_name=f"{file_info['table_name']}"): @@ -151,17 +118,7 @@ def process_edc_datasets( :param custom_years: years to update :return: """ - available_years = [ - "2016", - "2017", - "2018", - "2019", - "2020", - "2021", - "2022", - "2023", - "2024", - ] + available_years = edc_config["source"]["available_years"] if refresh_type == "all": years_to_update = available_years From a5a19dde065dfda472f134048dbf957eacdd96fd Mon Sep 17 00:00:00 2001 From: moreaupascal56 Date: Tue, 4 Feb 2025 10:51:56 +0100 Subject: [PATCH 19/22] add de_ingestion_date --- pipelines/tasks/build_database.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pipelines/tasks/build_database.py b/pipelines/tasks/build_database.py index b5eae5ec..1d8c9891 100644 --- a/pipelines/tasks/build_database.py +++ b/pipelines/tasks/build_database.py @@ -91,7 +91,8 @@ def download_extract_insert_yearly_edc_data(year: str): query_select = f""" SELECT *, - CAST({year} as INTEGER) AS de_partition + CAST({year} AS INTEGER) AS de_partition, + current_date AS de_ingestion_date FROM read_csv('{filepath}', header=true, delim=','); """ From c3470ddb2c956ec61d807025feafeec2dd39e3e7 Mon Sep 17 00:00:00 2001 From: moreaupascal56 Date: Tue, 4 Feb 2025 10:52:15 +0100 Subject: [PATCH 20/22] make refresh_type="last" as default --- pipelines/tasks/build_database.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pipelines/tasks/build_database.py b/pipelines/tasks/build_database.py index 1d8c9891..5afe0815 100644 --- a/pipelines/tasks/build_database.py +++ b/pipelines/tasks/build_database.py @@ -107,7 +107,7 @@ def download_extract_insert_yearly_edc_data(year: str): def process_edc_datasets( - refresh_type: Literal["all", "last", "custom"] = "all", + refresh_type: Literal["all", "last", "custom"] = "last", custom_years: List[str] = None, ): """ From 5e901f8fbb34aad5d8c75767cee31056a2f2760f Mon Sep 17 00:00:00 2001 From: moreaupascal56 Date: Wed, 5 Feb 2025 11:10:31 +0100 Subject: [PATCH 21/22] fix example notebooks with new sql table names --- analytics/notebooks/exemple.ipynb | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/analytics/notebooks/exemple.ipynb b/analytics/notebooks/exemple.ipynb index 4d9f06bb..55bfcfcb 100644 --- a/analytics/notebooks/exemple.ipynb +++ b/analytics/notebooks/exemple.ipynb @@ -4,7 +4,7 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "# Exemple de notebook - premières analyses des données SISE-Eaux\n" + "# Exemple de notebook - premières analyses des données EDC (Eau Distribuée par Commune)\n" ] }, { @@ -197,7 +197,7 @@ "source": [ "# Affichons les tables\n", "\n", - "con.table(\"sise_communes\").df()" + "con.table(\"edc_communes\").df()" ] }, { @@ -584,7 +584,7 @@ } ], "source": [ - "con.table(\"sise_prelevements\").df()" + "con.table(\"edc_prelevements\").df()" ] }, { @@ -1155,7 +1155,7 @@ } ], "source": [ - "con.table(\"sise_resultats\").df().head(20)" + "con.table(\"edc_resultats\").df().head(20)" ] }, { @@ -1172,9 +1172,9 @@ } ], "source": [ - "# Chargeons la table sise_communes dans un pandas dataframe, et calculons le nombre de communes\n", + "# Chargeons la table edc_communes dans un pandas dataframe, et calculons le nombre de communes\n", "\n", - "communes = con.table(\"sise_communes\").to_df()\n", + "communes = con.table(\"edc_communes\").to_df()\n", "nombre_de_communes = communes.nunique()[\"inseecommune\"]\n", "print(f\"nombre_de_communes = {nombre_de_communes}\")" ] @@ -1228,7 +1228,7 @@ "\n", "con.sql(\"\"\"\n", " SELECT libmajparametre, COUNT(*) as count\n", - " FROM sise_resultats\n", + " FROM edc_resultats\n", " GROUP BY libmajparametre\n", " ORDER BY count DESC\n", "\"\"\").show()" @@ -1361,7 +1361,7 @@ "\n", "# ...et faisons la même requête SQL en utilisant l'extension SQL pour Jupyter\n", "\n", - "%sql SELECT libmajparametre, COUNT(*) as count FROM sise_resultats GROUP BY libmajparametre ORDER BY count DESC;" + "%sql SELECT libmajparametre, COUNT(*) as count FROM edc_resultats GROUP BY libmajparametre ORDER BY count DESC;" ] }, { @@ -1414,7 +1414,7 @@ "\n", "con.sql(f\"\"\"\n", " SELECT *\n", - " FROM sise_prelevements\n", + " FROM edc_prelevements\n", " WHERE nomcommuneprinc = '{nomcommune}'\n", "\"\"\").show()" ] From 1529dfb8d758ffed0d0972a0b52306f95d2d3f63 Mon Sep 17 00:00:00 2001 From: moreaupascal56 Date: Wed, 5 Feb 2025 11:17:55 +0100 Subject: [PATCH 22/22] delete test_plusierus_annees.ipynb --- pipelines/notebooks/test_plusierus_annees.ipynb | 0 1 file changed, 0 insertions(+), 0 deletions(-) delete mode 100644 pipelines/notebooks/test_plusierus_annees.ipynb diff --git a/pipelines/notebooks/test_plusierus_annees.ipynb b/pipelines/notebooks/test_plusierus_annees.ipynb deleted file mode 100644 index e69de29b..00000000