Skip to content

Commit c54769c

Browse files
Feature/collecte de plusieurs annees pour le dataset edc (#9)
* add fct get_yearly_dataset_infos * parametrize process_sise_eaux_dataset_2024 * recreate cache folder by default in clear_cache() * add check_table_existence function * parametrize process_sise_eaux_dataset_2024() * format _common.py * add process_sise_eaux_dataset() controller and rename process_sise_eaux_dataset_2024 to download_extract_insert_yearly_SISE_data * upd docstrings, formatting * upd logs and add a clear_cache() in process_sise_eaux_dataset * reorganize file * add notebook to preview data * fix Incompatible types in assignment * rename SISE to EDC * rename annee_prelevement to de_partition * catch and raise error if refresh_type not in the allowed values * format * fix typo * add _config_edc.py and use it accordingly * add de_ingestion_date * make refresh_type="last" as default * fix example notebooks with new sql table names * delete test_plusierus_annees.ipynb --------- Co-authored-by: Jeremy Greze <jereze@users.noreply.github.com>
1 parent ab6efee commit c54769c

File tree

4 files changed

+226
-31
lines changed

4 files changed

+226
-31
lines changed

analytics/notebooks/exemple.ipynb

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
"cell_type": "markdown",
55
"metadata": {},
66
"source": [
7-
"# Exemple de notebook - premières analyses des données SISE-Eaux\n"
7+
"# Exemple de notebook - premières analyses des données EDC (Eau Distribuée par Commune)\n"
88
]
99
},
1010
{
@@ -197,7 +197,7 @@
197197
"source": [
198198
"# Affichons les tables\n",
199199
"\n",
200-
"con.table(\"sise_communes\").df()"
200+
"con.table(\"edc_communes\").df()"
201201
]
202202
},
203203
{
@@ -584,7 +584,7 @@
584584
}
585585
],
586586
"source": [
587-
"con.table(\"sise_prelevements\").df()"
587+
"con.table(\"edc_prelevements\").df()"
588588
]
589589
},
590590
{
@@ -1155,7 +1155,7 @@
11551155
}
11561156
],
11571157
"source": [
1158-
"con.table(\"sise_resultats\").df().head(20)"
1158+
"con.table(\"edc_resultats\").df().head(20)"
11591159
]
11601160
},
11611161
{
@@ -1172,9 +1172,9 @@
11721172
}
11731173
],
11741174
"source": [
1175-
"# Chargeons la table sise_communes dans un pandas dataframe, et calculons le nombre de communes\n",
1175+
"# Chargeons la table edc_communes dans un pandas dataframe, et calculons le nombre de communes\n",
11761176
"\n",
1177-
"communes = con.table(\"sise_communes\").to_df()\n",
1177+
"communes = con.table(\"edc_communes\").to_df()\n",
11781178
"nombre_de_communes = communes.nunique()[\"inseecommune\"]\n",
11791179
"print(f\"nombre_de_communes = {nombre_de_communes}\")"
11801180
]
@@ -1228,7 +1228,7 @@
12281228
"\n",
12291229
"con.sql(\"\"\"\n",
12301230
" SELECT libmajparametre, COUNT(*) as count\n",
1231-
" FROM sise_resultats\n",
1231+
" FROM edc_resultats\n",
12321232
" GROUP BY libmajparametre\n",
12331233
" ORDER BY count DESC\n",
12341234
"\"\"\").show()"
@@ -1361,7 +1361,7 @@
13611361
"\n",
13621362
"# ...et faisons la même requête SQL en utilisant l'extension SQL pour Jupyter\n",
13631363
"\n",
1364-
"%sql SELECT libmajparametre, COUNT(*) as count FROM sise_resultats GROUP BY libmajparametre ORDER BY count DESC;"
1364+
"%sql SELECT libmajparametre, COUNT(*) as count FROM edc_resultats GROUP BY libmajparametre ORDER BY count DESC;"
13651365
]
13661366
},
13671367
{
@@ -1414,7 +1414,7 @@
14141414
"\n",
14151415
"con.sql(f\"\"\"\n",
14161416
" SELECT *\n",
1417-
" FROM sise_prelevements\n",
1417+
" FROM edc_prelevements\n",
14181418
" WHERE nomcommuneprinc = '{nomcommune}'\n",
14191419
"\"\"\").show()"
14201420
]

pipelines/tasks/_common.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111
os.makedirs(DATABASE_FOLDER, exist_ok=True)
1212

1313

def clear_cache(recreate_folder: bool = True):
    """
    Clear the cache folder.

    :param recreate_folder: if True (default), recreate an empty cache folder
        after deletion so subsequent downloads have a place to write to.
    """
    # ignore_errors=True: clearing an already-missing cache folder is not an error,
    # which keeps this safe to call repeatedly (e.g. once per processed year).
    shutil.rmtree(CACHE_FOLDER, ignore_errors=True)
    if recreate_folder:
        os.makedirs(CACHE_FOLDER, exist_ok=True)

pipelines/tasks/_config_edc.py

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
from typing import Dict
2+
3+
def get_edc_config() -> Dict:
    """
    Return the configuration used to process the EDC (Eau distribuée par commune) datasets.

    The data comes from
    https://www.data.gouv.fr/fr/datasets/resultats-du-controle-sanitaire-de-leau-distribuee-commune-par-commune/
    For each year a dataset is downloadable on a URL like this (ex. 2024):
    https://www.data.gouv.fr/fr/datasets/r/84a67a3b-08a7-4001-98e6-231c74a98139

    :return: A dict with the config used for processing.
        The "source" part is related to the data.gouv datasource.
        The "files" part is related to the extracted files information and sql table names.
    """
    # data.gouv.fr resource ids, keyed by vintage year.
    dataset_ids = {
        "2016": "483c84dd-7912-483b-b96f-4fa5e1d8651f",
        "2017": "5785427b-3167-49fa-a581-aef835f0fb04",
        "2018": "0513b3c0-dc18-468d-a969-b3508f079792",
        "2019": "861f2a7d-024c-4bf0-968b-9e3069d9de07",
        "2020": "a6cb4fea-ef8c-47a5-acb3-14e49ccad801",
        "2021": "d2b432cc-3761-44d3-8e66-48bc15300bb5",
        "2022": "a97b6074-c4dd-4ef2-8922-b0cf04dbff9a",
        "2023": "c89dec4a-d985-447c-a102-75ba814c398e",
        "2024": "84a67a3b-08a7-4001-98e6-231c74a98139",
    }

    return {
        "source": {
            "base_url": "https://www.data.gouv.fr/fr/datasets/r/",
            # Ascending chronological order; callers rely on [-1] being the latest year.
            "available_years": sorted(dataset_ids),
            # Every yearly archive follows the "dis-<year>.zip" naming scheme.
            "yearly_files_infos": {
                year: {"id": resource_id, "zipfile": f"dis-{year}.zip"}
                for year, resource_id in dataset_ids.items()
            },
        },
        "files": {
            "communes": {
                "file_name_prefix": "DIS_COM_UDI_",
                "file_extension": ".txt",
                "table_name": "edc_communes",
            },
            "prelevements": {
                "file_name_prefix": "DIS_PLV_",
                "file_extension": ".txt",
                "table_name": "edc_prelevements",
            },
            "resultats": {
                "file_name_prefix": "DIS_RESULT_",
                "file_extension": ".txt",
                "table_name": "edc_resultats",
            },
        },
    }
88+
89+
def create_edc_yearly_filename(
    file_name_prefix: str, file_extension: str, year: str
) -> str:
    """
    Build the name of one extracted EDC file for a given year.

    Intended for use with the entries of edc_config["files"]; for example the
    2024 "communes" file is named DIS_COM_UDI_2024.txt.

    :param file_name_prefix: prefix of the filename
    :param file_extension: extension of the file (including the dot)
    :param year: year of the needed file
    :return: the yearly filename as a string
    """
    return f"{file_name_prefix}{year}{file_extension}"

pipelines/tasks/build_database.py

Lines changed: 111 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -4,58 +4,148 @@
44

55
import logging
import os
from typing import List, Literal, Optional
from zipfile import ZipFile

import duckdb
import requests

from ._common import CACHE_FOLDER, DUCKDB_FILE, clear_cache
from ._config_edc import create_edc_yearly_filename, get_edc_config
1315

1416
logger = logging.getLogger(__name__)
17+
edc_config = get_edc_config()
1518

1619

17-
def process_sise_eaux_dataset_2024():
18-
"""Process SISE-Eaux dataset for 2024."""
20+
def check_table_existence(conn: duckdb.DuckDBPyConnection, table_name: str) -> bool:
21+
"""
22+
Check if a table exists in the duckdb database
23+
:param conn: The duckdb connection to use
24+
:param table_name: The table name to check existence
25+
:return: True if the table exists, False if not
26+
"""
27+
query = f"""
28+
SELECT COUNT(*)
29+
FROM information_schema.tables
30+
WHERE table_name = '{table_name}'
31+
"""
32+
conn.execute(query)
33+
return list(conn.fetchone())[0] == 1
34+
35+
def download_extract_insert_yearly_edc_data(year: str):
    """
    Downloads from www.data.gouv.fr the EDC (Eau distribuée par commune) dataset for one year,
    extracts the files and inserts the data into duckdb.

    :param year: The year from which we want to download the dataset
    :return: Create or replace the associated tables in the duckdb database.
        It adds the column "de_partition" based on year as an integer.
    :raises requests.HTTPError: if the dataset download fails.
    """

    # Dataset specific constants
    year_infos = edc_config["source"]["yearly_files_infos"][year]
    DATA_URL = edc_config["source"]["base_url"] + year_infos["id"]
    ZIP_FILE = os.path.join(CACHE_FOLDER, year_infos["zipfile"])
    EXTRACT_FOLDER = os.path.join(CACHE_FOLDER, f"raw_data_{year}")
    FILES = edc_config["files"]

    logger.info(f"Processing EDC dataset for {year}...")
    response = requests.get(DATA_URL, stream=True)
    # Fail fast on HTTP errors instead of silently writing an error page to the zip.
    response.raise_for_status()
    with open(ZIP_FILE, "wb") as f:
        for chunk in response.iter_content(chunk_size=8192):
            f.write(chunk)

    logger.info(" Extracting files...")
    with ZipFile(ZIP_FILE, "r") as zip_ref:
        zip_ref.extractall(EXTRACT_FOLDER)

    logger.info(" Creating or updating tables in the database...")
    conn = duckdb.connect(DUCKDB_FILE)
    try:
        for file_info in FILES.values():
            filepath = os.path.join(
                EXTRACT_FOLDER,
                create_edc_yearly_filename(
                    file_name_prefix=file_info["file_name_prefix"],
                    file_extension=file_info["file_extension"],
                    year=year,
                ),
            )
            table_name = file_info["table_name"]

            if check_table_existence(conn=conn, table_name=table_name):
                # Idempotent refresh: drop rows previously ingested for this year
                # before inserting them again.
                conn.execute(
                    f"""
                    DELETE FROM {table_name}
                    WHERE de_partition = CAST({year} as INTEGER)
                    ;
                    """
                )
                # Note: plain f-string here — the original nested an f-string
                # reusing the outer quote character, a SyntaxError before Python 3.12.
                query_start = f"INSERT INTO {table_name} "
            else:
                query_start = f"CREATE TABLE {table_name} AS "

            query_select = f"""
                SELECT
                    *,
                    CAST({year} AS INTEGER) AS de_partition,
                    current_date AS de_ingestion_date
                FROM read_csv('{filepath}', header=true, delim=',');
            """
            conn.execute(query_start + query_select)
    finally:
        # Always release the duckdb handle, even if ingestion fails mid-year.
        conn.close()

    logger.info(" Cleaning up cache...")
    clear_cache()

    return True
58107

59108

def process_edc_datasets(
    refresh_type: Literal["all", "last", "custom"] = "last",
    custom_years: Optional[List[str]] = None,
):
    """
    Process the EDC datasets.

    :param refresh_type: Refresh type to run
        - "all": Refresh the data for every possible year
        - "last": Refresh the data only for the last available year
        - "custom": Refresh the data for the years specified in the list custom_years
    :param custom_years: years to update (required when refresh_type="custom")
    :return: True once every requested year has been processed
    :raises ValueError: if refresh_type is invalid, or "custom" is requested
        without custom_years.
    """
    available_years = edc_config["source"]["available_years"]

    if refresh_type == "all":
        years_to_update = available_years
    elif refresh_type == "last":
        years_to_update = available_years[-1:]
    elif refresh_type == "custom":
        if custom_years:
            # Filter available_years (rather than set.intersection alone) so the
            # processing order stays chronological and deterministic, while still
            # ignoring unknown years and de-duplicating the input.
            wanted = set(custom_years)
            years_to_update = [year for year in available_years if year in wanted]
        else:
            raise ValueError(
                """ custom_years parameter needs to be specified if refresh_type="custom" """
            )
    else:
        raise ValueError(
            f""" refresh_type needs to be one of ["all", "last", "custom"], it can't be: {refresh_type}"""
        )

    logger.info(f"Launching processing of EDC datasets for years: {years_to_update}")

    for year in years_to_update:
        download_extract_insert_yearly_edc_data(year=year)

    logger.info("Cleaning up cache...")
    # The pipeline is finished: remove the cache folder without recreating it.
    clear_cache(recreate_folder=False)
    return True
148+
149+
60150
def execute():
    """Entry point: refresh the EDC datasets with the default strategy ("last")."""
    process_edc_datasets()

0 commit comments

Comments
 (0)