Skip to content

Commit e8198c8

Browse files
SprinTechmoreaupascal56jereze
authored
Feature/request by specific year (#11)
* add fct get_yearly_dataset_infos * parametrize process_sise_eaux_dataset_2024 * recreate cache folder by default in clear_cache() * add check_table_existence function * parametrize process_sise_eaux_dataset_2024() * format _common.py * add process_sise_eaux_dataset() controller and rename process_sise_eaux_dataset_2024 to download_extract_insert_yearly_SISE_data * upd docstrings, formatting * upd logs and add a clear_cache() in process_sise_eaux_dataset * reorganize file * add notebook to preview data * fix Incompatible types in assignment * rename SISE to EDC * rename annee_prelevement to de_partition * catch and raise error if refresh_type not in the allowed values * format * fix typo * Add cli argument to request by specific years * Improve documentation from CLI * Adapt run command to task --------- Co-authored-by: moreaupascal56 <moreaupascal56@gmail.com> Co-authored-by: Jeremy Greze <jereze@users.noreply.github.com>
1 parent 36c4887 commit e8198c8

File tree

6 files changed

+135
-27
lines changed

6 files changed

+135
-27
lines changed

README.md

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,10 +80,22 @@ Tout le code dans pipelines sera installé en tant que package python automatiqu
8080
### Comment construire la database
8181

8282
Une fois l'environnement python setup avec uv, vous pouvez lancer data_pipeline/run.py pour remplir la database
83-
Il suffit de lancer
8483

84+
Le téléchargement des données peut se faire de plusieurs manières :
85+
* 1. Téléchargement des données de la dernière année (par défaut)
8586
```bash
86-
uv run pipelines/run.py run build_database
87+
uv run pipelines/run.py run build_database --refresh-type last
88+
```
89+
90+
* 2. Téléchargement de toutes les données
91+
92+
```bash
93+
uv run pipelines/run.py run build_database --refresh-type all
94+
```
95+
96+
* 3. Téléchargement de données d'années spécifiques
97+
```bash
98+
uv run pipelines/run.py run build_database --refresh-type custom --custom-years 2018,2024,...
8799
```
88100

89101
### Comment télécharger la database depuis S3

pipelines/config/.env.example

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,2 @@
11
SCW_ACCESS_KEY=MyKey
2-
SCW_SECRET_KEY=MySecret
3-
ENV=dev
2+
SCW_SECRET_KEY=MySecret

pipelines/run.py

Lines changed: 71 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import importlib
22
import logging
33
import os
4-
import sys
54

65
import click
76

@@ -26,29 +25,82 @@ def cli():
2625
def list():
2726
"""List all available tasks."""
2827
tasks_dir = os.path.join(os.path.dirname(__file__), "tasks")
29-
click.echo("Available tasks:")
30-
for filename in os.listdir(tasks_dir):
28+
29+
for filename in sorted(os.listdir(tasks_dir)):
3130
if filename.endswith(".py") and not filename.startswith("_"):
3231
module_name = filename[:-3]
3332
module = importlib.import_module(f"tasks.{module_name}")
34-
description = module.__doc__ or "No description"
35-
description = description.strip().split("\n")[0]
36-
click.echo(f"- {module_name}: {description}")
3733

34+
doc = module.__doc__ or "No description"
35+
doc_lines = doc.strip().split("\n")
36+
while doc_lines and not doc_lines[0].strip():
37+
doc_lines.pop(0)
38+
while doc_lines and not doc_lines[-1].strip():
39+
doc_lines.pop()
3840

39-
@cli.command()
40-
@click.argument("task_name")
41-
def run(task_name):
42-
"""Run a specified task."""
43-
try:
44-
module = importlib.import_module(f"tasks.{task_name}")
45-
task_func = getattr(module, "execute")
46-
logging.info(f"Starting task {task_name}...")
47-
task_func()
48-
logging.info(f"Task {task_name} completed.")
49-
except (ModuleNotFoundError, AttributeError):
50-
logging.error(f"Task {task_name} not found.")
51-
sys.exit(1)
41+
click.echo(f"\n{module_name}:")
42+
for line in doc_lines:
43+
click.echo(f" {line}")
44+
45+
46+
@cli.group()
47+
def run():
48+
"""Run tasks."""
49+
pass
50+
51+
52+
@run.command("build_database")
53+
@click.option(
54+
"--refresh-type",
55+
type=click.Choice(["all", "last", "custom"]),
56+
default="all",
57+
help="Type of refresh to perform",
58+
)
59+
@click.option(
60+
"--custom-years",
61+
type=str,
62+
help="Comma-separated list of years to process (for custom refresh type)",
63+
)
64+
def run_build_database(refresh_type, custom_years):
65+
"""Run build_database task."""
66+
module = importlib.import_module("tasks.build_database")
67+
task_func = getattr(module, "execute")
68+
69+
custom_years_list = None
70+
if custom_years:
71+
custom_years_list = [year.strip() for year in custom_years.split(",")]
72+
73+
task_func(refresh_type=refresh_type, custom_years=custom_years_list)
74+
75+
76+
@run.command("download_database")
77+
@click.option(
78+
"--env",
79+
type=click.Choice(["dev", "prod"]),
80+
default="prod",
81+
help="Environment to download from",
82+
)
83+
def run_download_database(env):
84+
"""Download database from S3."""
85+
os.environ["ENVIRONMENT"] = env
86+
module = importlib.import_module("tasks.download_database")
87+
task_func = getattr(module, "execute")
88+
task_func()
89+
90+
91+
@run.command("upload_database")
92+
@click.option(
93+
"--env",
94+
type=click.Choice(["dev", "prod"]),
95+
default="dev",
96+
help="Environment to upload to",
97+
)
98+
def run_upload_database(env):
99+
"""Upload database to S3."""
100+
os.environ["ENVIRONMENT"] = env
101+
module = importlib.import_module("tasks.upload_database")
102+
task_func = getattr(module, "execute")
103+
task_func()
52104

53105

54106
if __name__ == "__main__":

pipelines/tasks/build_database.py

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,14 @@
11
"""
22
Consolidate data into the database.
3+
4+
Args:
5+
- refresh-type (str): Type of refresh to perform ("all", "last", or "custom")
6+
- custom-years (str): List of years to process when refresh_type is "custom"
7+
8+
Examples:
9+
- build_database --refresh-type all : Process all years
10+
- build_database --refresh-type last : Process last year only
11+
- build_database --refresh-type custom --custom-years 2018,2024 : Process only the years 2018 and 2024
312
"""
413

514
import logging
@@ -41,7 +50,6 @@ def download_extract_insert_yearly_edc_data(year: str):
4150
:return: Create or replace the associated tables in the duckdb database.
4251
It adds the column "de_partition" based on year as an integer.
4352
"""
44-
4553
# Dataset specific constants
4654
DATA_URL = (
4755
edc_config["source"]["base_url"]
@@ -127,7 +135,16 @@ def process_edc_datasets(
127135
years_to_update = available_years[-1:]
128136
elif refresh_type == "custom":
129137
if custom_years:
130-
years_to_update = list(set(custom_years).intersection(available_years))
138+
# Check that every year provided is available
139+
invalid_years = set(custom_years) - set(available_years)
140+
if invalid_years:
141+
raise ValueError(
142+
f"Invalid years provided: {sorted(invalid_years)}. Years must be among: {available_years}"
143+
)
144+
# Filtering and sorting of valid years
145+
years_to_update = sorted(
146+
list(set(custom_years).intersection(available_years))
147+
)
131148
else:
132149
raise ValueError(
133150
""" custom_years parameter needs to be specified if refresh_type="custom" """
@@ -147,5 +164,11 @@ def process_edc_datasets(
147164
return True
148165

149166

150-
def execute():
151-
process_edc_datasets()
167+
def execute(refresh_type: str = "all", custom_years: List[str] = None):
168+
"""
169+
Execute the EDC dataset processing with specified parameters.
170+
171+
:param refresh_type: Type of refresh to perform ("all", "last", or "custom")
172+
:param custom_years: List of years to process when refresh_type is "custom"
173+
"""
174+
process_edc_datasets(refresh_type=refresh_type, custom_years=custom_years)

pipelines/tasks/download_database.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,14 @@
1+
"""
2+
Download database from S3 storage.
3+
4+
Args:
5+
- env (str): Environment to download from ("dev" or "prod")
6+
7+
Examples:
8+
- download_database --env prod : Download database from production environment
9+
- download_database --env dev : Download database from development environment
10+
"""
11+
112
import logging
213

314
from pipelines.config.config import get_environment, get_s3_path

pipelines/tasks/upload_database.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,14 @@
1+
"""
2+
Upload database to S3 storage.
3+
4+
Args:
5+
- env (str): Environment to upload to ("dev" or "prod")
6+
7+
Examples:
8+
- upload_database --env dev : Upload database to development environment
9+
- upload_database --env prod : Upload database to production environment
10+
"""
11+
112
import logging
213

314
from pipelines.config.config import get_environment, get_s3_path

0 commit comments

Comments (0)