Merged

Changes from all commits (22 commits)
1a5fc78
add fct get_yearly_dataset_infos
moreaupascal56 Feb 2, 2025
dcb5fa5
parametrize process_sise_eaux_dataset_2024
moreaupascal56 Feb 2, 2025
2da6cdf
recreate cache folder by default in clear_cache()
moreaupascal56 Feb 2, 2025
5b3de67
add check_table_existence function
moreaupascal56 Feb 2, 2025
b46a69c
parametrize process_sise_eaux_dataset_2024()
moreaupascal56 Feb 2, 2025
e222562
format _common.py
moreaupascal56 Feb 3, 2025
b58894f
add process_sise_eaux_dataset() controller and rename process_sise_ea…
moreaupascal56 Feb 3, 2025
dcc29a3
upd docstrings, formatting
moreaupascal56 Feb 3, 2025
c628c8e
upd logs and add a clear_cache() in process_sise_eaux_dataset
moreaupascal56 Feb 3, 2025
69516eb
reorganize file
moreaupascal56 Feb 3, 2025
40e85e3
add notebook to preview data
jereze Feb 3, 2025
1ff4f23
fix Incompatible types in assignment
moreaupascal56 Feb 3, 2025
c0c9997
rename SISE to EDC
moreaupascal56 Feb 3, 2025
d6e1ab9
rename annee_prelevement to de_partition
moreaupascal56 Feb 3, 2025
077150e
catch and raise error if refresh_type not in the allowed values
moreaupascal56 Feb 3, 2025
700eb94
format
moreaupascal56 Feb 3, 2025
42950fa
fix typo
moreaupascal56 Feb 3, 2025
5358514
add _config_edc.py and use it accordingly
moreaupascal56 Feb 4, 2025
a5a19dd
add de_ingestion_date
moreaupascal56 Feb 4, 2025
c3470dd
make refresh_type="last" as default
moreaupascal56 Feb 4, 2025
5e901f8
fix example notebooks with new sql table names
moreaupascal56 Feb 5, 2025
1529dfb
delete test_plusierus_annees.ipynb
moreaupascal56 Feb 5, 2025
18 changes: 9 additions & 9 deletions analytics/notebooks/exemple.ipynb
@@ -4,7 +4,7 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"# Exemple de notebook - premières analyses des données SISE-Eaux\n"
"# Exemple de notebook - premières analyses des données EDC (Eau Distribuée par Commune)\n"
]
},
{
@@ -197,7 +197,7 @@
"source": [
"# Affichons les tables\n",
"\n",
"con.table(\"sise_communes\").df()"
"con.table(\"edc_communes\").df()"
]
},
{
@@ -584,7 +584,7 @@
}
],
"source": [
"con.table(\"sise_prelevements\").df()"
"con.table(\"edc_prelevements\").df()"
]
},
{
@@ -1155,7 +1155,7 @@
}
],
"source": [
"con.table(\"sise_resultats\").df().head(20)"
"con.table(\"edc_resultats\").df().head(20)"
]
},
{
@@ -1172,9 +1172,9 @@
}
],
"source": [
"# Chargeons la table sise_communes dans un pandas dataframe, et calculons le nombre de communes\n",
"# Chargeons la table edc_communes dans un pandas dataframe, et calculons le nombre de communes\n",
"\n",
"communes = con.table(\"sise_communes\").to_df()\n",
"communes = con.table(\"edc_communes\").to_df()\n",
"nombre_de_communes = communes.nunique()[\"inseecommune\"]\n",
"print(f\"nombre_de_communes = {nombre_de_communes}\")"
]
@@ -1228,7 +1228,7 @@
"\n",
"con.sql(\"\"\"\n",
" SELECT libmajparametre, COUNT(*) as count\n",
" FROM sise_resultats\n",
" FROM edc_resultats\n",
" GROUP BY libmajparametre\n",
" ORDER BY count DESC\n",
"\"\"\").show()"
@@ -1361,7 +1361,7 @@
"\n",
"# ...et faisons la même requête SQL en utilisant l'extension SQL pour Jupyter\n",
"\n",
"%sql SELECT libmajparametre, COUNT(*) as count FROM sise_resultats GROUP BY libmajparametre ORDER BY count DESC;"
"%sql SELECT libmajparametre, COUNT(*) as count FROM edc_resultats GROUP BY libmajparametre ORDER BY count DESC;"
]
},
{
@@ -1414,7 +1414,7 @@
"\n",
"con.sql(f\"\"\"\n",
" SELECT *\n",
" FROM sise_prelevements\n",
" FROM edc_prelevements\n",
" WHERE nomcommuneprinc = '{nomcommune}'\n",
"\"\"\").show()"
]
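For readers skimming the diff, the renamed tables can be sanity-checked with a few lines of DuckDB. This is a minimal sketch; the database path is an assumption (use whatever DUCKDB_FILE resolves to in pipelines/tasks/_common.py):

import duckdb

con = duckdb.connect("database/data.duckdb")  # hypothetical path; see DUCKDB_FILE in _common.py

# The three tables now use the edc_ prefix instead of sise_
print(con.table("edc_communes").df().head())
print(con.table("edc_prelevements").df().head())
print(con.table("edc_resultats").df().head())
con.close()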
4 changes: 3 additions & 1 deletion pipelines/tasks/_common.py
@@ -11,6 +11,8 @@
os.makedirs(DATABASE_FOLDER, exist_ok=True)


def clear_cache():
def clear_cache(recreate_folder: bool = True):
    """Clear the cache folder."""
    shutil.rmtree(CACHE_FOLDER)
    if recreate_folder:
        os.makedirs(CACHE_FOLDER, exist_ok=True)
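The new flag keeps the historical behaviour by default (wipe the cache, then recreate the empty folder) while letting the final cleanup skip recreation. A small usage sketch, assuming the package layout of this repo:

from pipelines.tasks._common import clear_cache

clear_cache()                       # wipes the cache, then recreates the empty folder
clear_cache(recreate_folder=False)  # wipes the cache without recreating it (end of a run)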
103 changes: 103 additions & 0 deletions pipelines/tasks/_config_edc.py
@@ -0,0 +1,103 @@
from typing import Dict


def get_edc_config() -> Dict:
"""
Returns various configuration for processing the EDC (Eau distribuée par commune) datasets.
The data comes from https://www.data.gouv.fr/fr/datasets/resultats-du-controle-sanitaire-de-leau-distribuee-commune-par-commune/
For each year a dataset is downloadable on a URL like this (ex. 2024):
https://www.data.gouv.fr/fr/datasets/r/84a67a3b-08a7-4001-98e6-231c74a98139
:return: A dict with the config used for processing.
The "source" part is related to the data.gouv datasource
The "files" part is related to the extracted files information and sql table names
"""

    edc_config = {
        "source": {
            "base_url": "https://www.data.gouv.fr/fr/datasets/r/",
            "available_years": [
                "2016",
                "2017",
                "2018",
                "2019",
                "2020",
                "2021",
                "2022",
                "2023",
                "2024",
            ],
            "yearly_files_infos": {
                "2024": {
                    "id": "84a67a3b-08a7-4001-98e6-231c74a98139",
                    "zipfile": "dis-2024.zip",
                },
                "2023": {
                    "id": "c89dec4a-d985-447c-a102-75ba814c398e",
                    "zipfile": "dis-2023.zip",
                },
                "2022": {
                    "id": "a97b6074-c4dd-4ef2-8922-b0cf04dbff9a",
                    "zipfile": "dis-2022.zip",
                },
                "2021": {
                    "id": "d2b432cc-3761-44d3-8e66-48bc15300bb5",
                    "zipfile": "dis-2021.zip",
                },
                "2020": {
                    "id": "a6cb4fea-ef8c-47a5-acb3-14e49ccad801",
                    "zipfile": "dis-2020.zip",
                },
                "2019": {
                    "id": "861f2a7d-024c-4bf0-968b-9e3069d9de07",
                    "zipfile": "dis-2019.zip",
                },
                "2018": {
                    "id": "0513b3c0-dc18-468d-a969-b3508f079792",
                    "zipfile": "dis-2018.zip",
                },
                "2017": {
                    "id": "5785427b-3167-49fa-a581-aef835f0fb04",
                    "zipfile": "dis-2017.zip",
                },
                "2016": {
                    "id": "483c84dd-7912-483b-b96f-4fa5e1d8651f",
                    "zipfile": "dis-2016.zip",
                },
            },
        },
        "files": {
            "communes": {
                "file_name_prefix": "DIS_COM_UDI_",
                "file_extension": ".txt",
                "table_name": "edc_communes",
Contributor:
Since you are renaming the tables, the test notebook (analytics/notebooks/exemple.ipynb) will no longer work. The table names in the notebook need to be updated.

Collaborator (Author):
Fixed in commit 5e901f8
            },
            "prelevements": {
                "file_name_prefix": "DIS_PLV_",
                "file_extension": ".txt",
                "table_name": "edc_prelevements",
            },
            "resultats": {
                "file_name_prefix": "DIS_RESULT_",
                "file_extension": ".txt",
                "table_name": "edc_resultats",
            },
        },
    }

    return edc_config


def create_edc_yearly_filename(
    file_name_prefix: str, file_extension: str, year: str
) -> str:
    """
    This function is used to recreate the yearly filenames of the extracted files.
    It is intended for use with the edc_config["files"] data above.
    For example, in 2024 the file name for communes should be:
        DIS_COM_UDI_2024.txt
    :param file_name_prefix: prefix of the filename
    :param file_extension: extension of the file
    :param year: year of the needed file
    :return: the yearly filename as a string
    """
    return file_name_prefix + year + file_extension
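Taken together, the config and the helper are enough to rebuild both the download URL and the extracted filename for any year. A short sketch of how they combine:

from pipelines.tasks._config_edc import create_edc_yearly_filename, get_edc_config

config = get_edc_config()
year = "2024"

# Rebuild the download URL for a given year
url = (
    config["source"]["base_url"]
    + config["source"]["yearly_files_infos"][year]["id"]
)
# -> https://www.data.gouv.fr/fr/datasets/r/84a67a3b-08a7-4001-98e6-231c74a98139

# Rebuild the name of an extracted file
communes = config["files"]["communes"]
filename = create_edc_yearly_filename(
    file_name_prefix=communes["file_name_prefix"],
    file_extension=communes["file_extension"],
    year=year,
)
# -> DIS_COM_UDI_2024.txt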
132 changes: 111 additions & 21 deletions pipelines/tasks/build_database.py
@@ -4,58 +4,148 @@

import logging
import os
from typing import List, Literal, Optional
from zipfile import ZipFile

import duckdb
import requests

from ._common import CACHE_FOLDER, DUCKDB_FILE, clear_cache
from ._config_edc import get_edc_config, create_edc_yearly_filename

logger = logging.getLogger(__name__)
edc_config = get_edc_config()


def process_sise_eaux_dataset_2024():
    """Process SISE-Eaux dataset for 2024."""
def check_table_existence(conn: duckdb.DuckDBPyConnection, table_name: str) -> bool:
    """
    Check if a table exists in the duckdb database.
    :param conn: The duckdb connection to use
    :param table_name: The table name whose existence is checked
    :return: True if the table exists, False if not
    """
    query = f"""
        SELECT COUNT(*)
        FROM information_schema.tables
        WHERE table_name = '{table_name}'
        """
    conn.execute(query)
    return list(conn.fetchone())[0] == 1
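
# Usage sketch (illustrative only): the check works on any duckdb connection,
# e.g. against an in-memory database:
#     conn = duckdb.connect()
#     conn.execute("CREATE TABLE edc_communes (inseecommune VARCHAR)")
#     check_table_existence(conn, "edc_communes")   # True
#     check_table_existence(conn, "edc_resultats")  # False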


def download_extract_insert_yearly_edc_data(year: str):
    """
    Downloads the EDC (Eau distribuée par commune) dataset for one year from www.data.gouv.fr,
    extracts the files and inserts the data into duckdb.
    :param year: The year for which we want to download the dataset
    :return: Creates or replaces the associated tables in the duckdb database.
        Adds a "de_partition" column based on the year, as an integer.
    """

    # Dataset specific constants
    DATA_URL = (
        "https://www.data.gouv.fr/fr/datasets/r/84a67a3b-08a7-4001-98e6-231c74a98139"
        edc_config["source"]["base_url"]
        + edc_config["source"]["yearly_files_infos"][year]["id"]
    )
    ZIP_FILE = os.path.join(CACHE_FOLDER, "dis-2024.zip")
    EXTRACT_FOLDER = os.path.join(CACHE_FOLDER, "raw_data_2024")

    FILES = {
        "communes": {"filename": "DIS_COM_UDI_2024.txt", "table": "sise_communes"},
        "prelevements": {"filename": "DIS_PLV_2024.txt", "table": "sise_prelevements"},
        "resultats": {"filename": "DIS_RESULT_2024.txt", "table": "sise_resultats"},
    }
    ZIP_FILE = os.path.join(
        CACHE_FOLDER, edc_config["source"]["yearly_files_infos"][year]["zipfile"]
    )
    EXTRACT_FOLDER = os.path.join(CACHE_FOLDER, f"raw_data_{year}")
    FILES = edc_config["files"]

logger.info("Downloading and extracting SISE-Eaux dataset for 2024...")
logger.info(f"Processing EDC dataset for {year}...")
response = requests.get(DATA_URL, stream=True)
with open(ZIP_FILE, "wb") as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)

logger.info("Extracting files...")
logger.info(" Extracting files...")
with ZipFile(ZIP_FILE, "r") as zip_ref:
zip_ref.extractall(EXTRACT_FOLDER)

logger.info("Creating tables in the database...")
logger.info(" Creating or updating tables in the database...")
conn = duckdb.connect(DUCKDB_FILE)

    for file_info in FILES.values():
        filepath = os.path.join(EXTRACT_FOLDER, file_info["filename"])
        query = f"""
            CREATE OR REPLACE TABLE {file_info["table"]} AS
            SELECT * FROM read_csv('{filepath}', header=true, delim=',');
        filepath = os.path.join(
            EXTRACT_FOLDER,
            create_edc_yearly_filename(
                file_name_prefix=file_info["file_name_prefix"],
                file_extension=file_info["file_extension"],
                year=year,
            ),
        )

        if check_table_existence(conn=conn, table_name=file_info["table_name"]):
            query = f"""
                DELETE FROM {file_info["table_name"]}
                WHERE de_partition = CAST({year} AS INTEGER)
                ;
            """
            conn.execute(query)
            query_start = f"INSERT INTO {file_info['table_name']} "

        else:
            query_start = f"CREATE TABLE {file_info['table_name']} AS "

        query_select = f"""
            SELECT
                *,
                CAST({year} AS INTEGER) AS de_partition,
                current_date AS de_ingestion_date
            FROM read_csv('{filepath}', header=true, delim=',');
        """
        conn.execute(query)

        conn.execute(query_start + query_select)

    conn.close()

    logger.info("Cleaning up...")
    logger.info("   Cleaning up cache...")
    clear_cache()

    return True
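
# Usage sketch (illustrative only): a single year can be processed directly,
# e.g. download_extract_insert_yearly_edc_data(year="2024"), though the
# process_edc_datasets() controller below is the intended entry point.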


def process_edc_datasets(
    refresh_type: Literal["all", "last", "custom"] = "last",
    custom_years: Optional[List[str]] = None,
):
    """
    Processes the EDC datasets.
    :param refresh_type: Refresh type to run:
        - "all": Refresh the data for every available year
        - "last": Refresh the data only for the most recent available year
        - "custom": Refresh the data for the years listed in custom_years
    :param custom_years: The years to update (required when refresh_type="custom")
    :return: True once the selected years have been processed.
    """
    available_years = edc_config["source"]["available_years"]

    if refresh_type == "all":
        years_to_update = available_years
    elif refresh_type == "last":
        years_to_update = available_years[-1:]
    elif refresh_type == "custom":
        if custom_years:
            years_to_update = list(set(custom_years).intersection(available_years))
        else:
            raise ValueError(
                """custom_years parameter needs to be specified if refresh_type="custom" """
            )
    else:
        raise ValueError(
            f"""refresh_type needs to be one of ["all", "last", "custom"], it can't be: {refresh_type}"""
        )

logger.info(f"Launching processing of EDC datasets for years: {years_to_update}")

for year in years_to_update:
download_extract_insert_yearly_edc_data(year=year)

logger.info("Cleaning up cache...")
clear_cache(recreate_folder=False)
return True


def execute():
    process_sise_eaux_dataset_2024()
    process_edc_datasets()
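With the controller in place, typical invocations look like this. A sketch (the pipeline itself only calls execute(), so direct calls are mainly useful for ad-hoc refreshes):

from pipelines.tasks.build_database import process_edc_datasets

process_edc_datasets()                    # default: refresh only the last available year (2024)
process_edc_datasets(refresh_type="all")  # rebuild every year from 2016 to 2024
process_edc_datasets(                     # refresh a chosen subset of years
    refresh_type="custom",
    custom_years=["2022", "2023"],
)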