11
11
import requests
12
12
13
13
from ._common import CACHE_FOLDER , DUCKDB_FILE , clear_cache
14
+ from ._config_edc import get_edc_config , create_edc_yearly_filename
14
15
15
16
logger = logging .getLogger (__name__ )
17
+ edc_config = get_edc_config ()
16
18
17
19
18
20
def check_table_existence (conn : duckdb .DuckDBPyConnection , table_name : str ) -> bool :
@@ -31,30 +33,6 @@ def check_table_existence(conn: duckdb.DuckDBPyConnection, table_name: str) -> b
31
33
return list (conn .fetchone ())[0 ] == 1
32
34
33
35
34
- def get_yearly_edc_infos (year : str ) -> Dict [str , str ]:
35
- """
36
- Returns information for yearly dataset extract of the EDC (Eau distribuée par commune) datasets.
37
- The data comes from https://www.data.gouv.fr/fr/datasets/resultats-du-controle-sanitaire-de-leau-distribuee-commune-par-commune/
38
- For each year a dataset is downloadable on a URL like this (ex. 2024):
39
- https://www.data.gouv.fr/fr/datasets/r/84a67a3b-08a7-4001-98e6-231c74a98139
40
- The id of the dataset is the last part of this URL
41
- The name of the dataset is dis-YEAR.zip (but the format could potentially change).
42
- :param year: The year from which we want to get the dataset information.
43
- :return: A dict with the id and name of the dataset.
44
- """
45
- edc_dis_files_info_by_year = {
46
- "2024" : {"id" : "84a67a3b-08a7-4001-98e6-231c74a98139" , "name" : "dis-2024.zip" },
47
- "2023" : {"id" : "c89dec4a-d985-447c-a102-75ba814c398e" , "name" : "dis-2023.zip" },
48
- "2022" : {"id" : "a97b6074-c4dd-4ef2-8922-b0cf04dbff9a" , "name" : "dis-2022.zip" },
49
- "2021" : {"id" : "d2b432cc-3761-44d3-8e66-48bc15300bb5" , "name" : "dis-2021.zip" },
50
- "2020" : {"id" : "a6cb4fea-ef8c-47a5-acb3-14e49ccad801" , "name" : "dis-2020.zip" },
51
- "2019" : {"id" : "861f2a7d-024c-4bf0-968b-9e3069d9de07" , "name" : "dis-2019.zip" },
52
- "2018" : {"id" : "0513b3c0-dc18-468d-a969-b3508f079792" , "name" : "dis-2018.zip" },
53
- "2017" : {"id" : "5785427b-3167-49fa-a581-aef835f0fb04" , "name" : "dis-2017.zip" },
54
- "2016" : {"id" : "483c84dd-7912-483b-b96f-4fa5e1d8651f" , "name" : "dis-2016.zip" },
55
- }
56
- return edc_dis_files_info_by_year [year ]
57
-
58
36
59
37
def download_extract_insert_yearly_edc_data (year : str ):
60
38
"""
@@ -64,31 +42,16 @@ def download_extract_insert_yearly_edc_data(year: str):
64
42
:return: Create or replace the associated tables in the duckdb database.
65
43
It adds the column "de_partition" based on year as an integer.
66
44
"""
67
-
68
- yearly_dataset_info = get_yearly_edc_infos (year = year )
69
-
70
45
# Dataset specific constants
71
- DATA_URL = f"https://www.data.gouv.fr/fr/datasets/r/{ yearly_dataset_info ['id' ]} "
72
- ZIP_FILE = os .path .join (CACHE_FOLDER , yearly_dataset_info ["name" ])
46
+ DATA_URL = (
47
+ edc_config ["source" ]["base_url" ]
48
+ + edc_config ["source" ]["yearly_files_infos" ][year ]["id" ]
49
+ )
50
+ ZIP_FILE = os .path .join (
51
+ CACHE_FOLDER , edc_config ["source" ]["yearly_files_infos" ][year ]["zipfile" ]
52
+ )
73
53
EXTRACT_FOLDER = os .path .join (CACHE_FOLDER , f"raw_data_{ year } " )
74
-
75
- FILES = {
76
- "communes" : {
77
- "filename_prefix" : f"DIS_COM_UDI_" ,
78
- "file_extension" : ".txt" ,
79
- "table_name" : f"edc_communes" ,
80
- },
81
- "prelevements" : {
82
- "filename_prefix" : f"DIS_PLV_" ,
83
- "file_extension" : ".txt" ,
84
- "table_name" : f"edc_prelevements" ,
85
- },
86
- "resultats" : {
87
- "filename_prefix" : f"DIS_RESULT_" ,
88
- "file_extension" : ".txt" ,
89
- "table_name" : f"edc_resultats" ,
90
- },
91
- }
54
+ FILES = edc_config ["files" ]
92
55
93
56
logger .info (f"Processing EDC dataset for { year } ..." )
94
57
response = requests .get (DATA_URL , stream = True )
@@ -106,7 +69,11 @@ def download_extract_insert_yearly_edc_data(year: str):
106
69
for file_info in FILES .values ():
107
70
filepath = os .path .join (
108
71
EXTRACT_FOLDER ,
109
- f"{ file_info ['filename_prefix' ]} { year } { file_info ['file_extension' ]} " ,
72
+ create_edc_yearly_filename (
73
+ file_name_prefix = file_info ["file_name_prefix" ],
74
+ file_extension = file_info ["file_extension" ],
75
+ year = year ,
76
+ ),
110
77
)
111
78
112
79
if check_table_existence (conn = conn , table_name = f"{ file_info ['table_name' ]} " ):
@@ -124,7 +91,8 @@ def download_extract_insert_yearly_edc_data(year: str):
124
91
query_select = f"""
125
92
SELECT
126
93
*,
127
- CAST({ year } as INTEGER) AS de_partition
94
+ CAST({ year } AS INTEGER) AS de_partition,
95
+ current_date AS de_ingestion_date
128
96
FROM read_csv('{ filepath } ', header=true, delim=',');
129
97
"""
130
98
@@ -139,7 +107,7 @@ def download_extract_insert_yearly_edc_data(year: str):
139
107
140
108
141
109
def process_edc_datasets (
142
- refresh_type : Literal ["all" , "last" , "custom" ] = "all " ,
110
+ refresh_type : Literal ["all" , "last" , "custom" ] = "last " ,
143
111
custom_years : List [str ] = None ,
144
112
):
145
113
"""
@@ -151,17 +119,7 @@ def process_edc_datasets(
151
119
:param custom_years: years to update
152
120
:return:
153
121
"""
154
- available_years = [
155
- "2016" ,
156
- "2017" ,
157
- "2018" ,
158
- "2019" ,
159
- "2020" ,
160
- "2021" ,
161
- "2022" ,
162
- "2023" ,
163
- "2024" ,
164
- ]
122
+ available_years = edc_config ["source" ]["available_years" ]
165
123
166
124
if refresh_type == "all" :
167
125
years_to_update = available_years
@@ -181,7 +139,6 @@ def process_edc_datasets(
181
139
raise ValueError (
182
140
""" custom_years parameter needs to be specified if refresh_type="custom" """
183
141
)
184
-
185
142
else :
186
143
raise ValueError (
187
144
f""" refresh_type needs to be one of ["all", "last", "custom"], it can't be: { refresh_type } """
@@ -196,7 +153,6 @@ def process_edc_datasets(
196
153
clear_cache (recreate_folder = False )
197
154
return True
198
155
199
-
200
156
def execute (refresh_type : str = "all" , custom_years : List [str ] = None ):
201
157
"""
202
158
Execute the EDC dataset processing with specified parameters.
0 commit comments