Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
1a5fc78
add fct get_yearly_dataset_infos
moreaupascal56 Feb 2, 2025
dcb5fa5
parametrize process_sise_eaux_dataset_2024
moreaupascal56 Feb 2, 2025
2da6cdf
recreate cache folder by default in clear_cache()
moreaupascal56 Feb 2, 2025
5b3de67
add check_table_existence function
moreaupascal56 Feb 2, 2025
b46a69c
parametrize process_sise_eaux_dataset_2024()
moreaupascal56 Feb 2, 2025
e222562
format _common.py
moreaupascal56 Feb 3, 2025
b58894f
add process_sise_eaux_dataset() controller and rename process_sise_ea…
moreaupascal56 Feb 3, 2025
dcc29a3
upd docstrings, formatting
moreaupascal56 Feb 3, 2025
c628c8e
upd logs and add a clear_cache() in process_sise_eaux_dataset
moreaupascal56 Feb 3, 2025
69516eb
reorganize file
moreaupascal56 Feb 3, 2025
40e85e3
add notebook to preview data
jereze Feb 3, 2025
1ff4f23
fix Incompatible types in assignment
moreaupascal56 Feb 3, 2025
c0c9997
rename SISE to EDC
moreaupascal56 Feb 3, 2025
d6e1ab9
rename annee_prelevement to de_partition
moreaupascal56 Feb 3, 2025
077150e
catch and raise error if refresh_type not in the allowed values
moreaupascal56 Feb 3, 2025
700eb94
format
moreaupascal56 Feb 3, 2025
42950fa
fix typo
moreaupascal56 Feb 3, 2025
df03a14
Add cli argument to request by specific years
SprinTech Feb 3, 2025
23b5d3b
Merge branch 'main' into feature/request-by-specific-year
SprinTech Feb 5, 2025
47c1d84
Improve documentation from CLI
SprinTech Feb 5, 2025
64ba8a0
Merge branch 'main' into feature/request-by-specific-year
SprinTech Feb 7, 2025
9991395
Adapt run command to task
SprinTech Feb 7, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,22 @@ Tout le code dans pipelines sera installé en tant que package python automatiqu
### Comment construire la database

Une fois l'environnement python setup avec uv, vous pouvez lancer data_pipeline/run.py pour remplir la database
Il suffit de lancer

Le téléchargement des données peut se faire de plusieurs manières :
* 1. Téléchargement des données de la dernière année
```bash
uv run pipelines/run.py run build_database --refresh-type last
```

* 2. Téléchargement de toutes les données (par défaut)

```bash
uv run pipelines/run.py run build_database --refresh-type all
```

* 3. Téléchargement de données d'années spécifiques
```bash
uv run pipelines/run.py run build_database --refresh-type custom --custom-years 2018,2024,...
```

### Comment télécharger la database depuis S3
Expand Down
3 changes: 1 addition & 2 deletions pipelines/config/.env.example
Original file line number Diff line number Diff line change
@@ -1,3 +1,2 @@
SCW_ACCESS_KEY=MyKey
SCW_SECRET_KEY=MySecret
ENV=dev
SCW_SECRET_KEY=MySecret
90 changes: 71 additions & 19 deletions pipelines/run.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import importlib
import logging
import os
import sys

import click

Expand All @@ -26,29 +25,82 @@ def cli():
def list():
"""List all available tasks."""
tasks_dir = os.path.join(os.path.dirname(__file__), "tasks")
click.echo("Available tasks:")
for filename in os.listdir(tasks_dir):

for filename in sorted(os.listdir(tasks_dir)):
if filename.endswith(".py") and not filename.startswith("_"):
module_name = filename[:-3]
module = importlib.import_module(f"tasks.{module_name}")
description = module.__doc__ or "No description"
description = description.strip().split("\n")[0]
click.echo(f"- {module_name}: {description}")

doc = module.__doc__ or "No description"
doc_lines = doc.strip().split("\n")
while doc_lines and not doc_lines[0].strip():
doc_lines.pop(0)
while doc_lines and not doc_lines[-1].strip():
doc_lines.pop()

@cli.command()
@click.argument("task_name")
def run(task_name):
"""Run a specified task."""
try:
module = importlib.import_module(f"tasks.{task_name}")
task_func = getattr(module, "execute")
logging.info(f"Starting task {task_name}...")
task_func()
logging.info(f"Task {task_name} completed.")
except (ModuleNotFoundError, AttributeError):
logging.error(f"Task {task_name} not found.")
sys.exit(1)
click.echo(f"\n{module_name}:")
for line in doc_lines:
click.echo(f" {line}")


@cli.group()
def run():
"""Run tasks."""
pass


@run.command("build_database")
@click.option(
    "--refresh-type",
    type=click.Choice(["all", "last", "custom"]),
    default="all",
    help="Type of refresh to perform",
)
@click.option(
    "--custom-years",
    type=str,
    help="Comma-separated list of years to process (for custom refresh type)",
)
def run_build_database(refresh_type, custom_years):
    """Run build_database task."""
    # Import the task lazily so other CLI commands do not pay for its
    # import-time side effects.
    task_module = importlib.import_module("tasks.build_database")

    # Turn the comma-separated CLI value into a list of year strings;
    # keep None when the option was not supplied so the task can tell
    # "not given" apart from "given but empty".
    years = None
    if custom_years:
        years = [part.strip() for part in custom_years.split(",")]

    task_module.execute(refresh_type=refresh_type, custom_years=years)


@run.command("download_database")
@click.option(
    "--env",
    type=click.Choice(["dev", "prod"]),
    default="prod",
    help="Environment to download from",
)
def run_download_database(env):
    """Download database from S3."""
    # Set ENVIRONMENT before loading the task: the task's configuration
    # is resolved from this variable.
    os.environ["ENVIRONMENT"] = env
    task_module = importlib.import_module("tasks.download_database")
    getattr(task_module, "execute")()


@run.command("upload_database")
@click.option(
    "--env",
    type=click.Choice(["dev", "prod"]),
    default="dev",
    help="Environment to upload to",
)
def run_upload_database(env):
    """Upload database to S3."""
    # Set ENVIRONMENT before loading the task: the task's configuration
    # is resolved from this variable.
    os.environ["ENVIRONMENT"] = env
    task_module = importlib.import_module("tasks.upload_database")
    getattr(task_module, "execute")()


if __name__ == "__main__":
Expand Down
31 changes: 27 additions & 4 deletions pipelines/tasks/build_database.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,14 @@
"""
Consolidate data into the database.

Args:
- refresh-type (str): Type of refresh to perform ("all", "last", or "custom")
- custom-years (str): List of years to process when refresh_type is "custom"

Examples:
- build_database --refresh-type all : Process all years
- build_database --refresh-type last : Process last year only
- build_database --refresh-type custom --custom-years 2018,2024 : Process only the years 2018 and 2024
"""

import logging
Expand Down Expand Up @@ -41,7 +50,6 @@ def download_extract_insert_yearly_edc_data(year: str):
:return: Create or replace the associated tables in the duckcb database.
It adds the column "de_partition" based on year as an integer.
"""

# Dataset specific constants
DATA_URL = (
edc_config["source"]["base_url"]
Expand Down Expand Up @@ -127,7 +135,16 @@ def process_edc_datasets(
years_to_update = available_years[-1:]
elif refresh_type == "custom":
if custom_years:
years_to_update = list(set(custom_years).intersection(available_years))
# Check if every year provided are available
invalid_years = set(custom_years) - set(available_years)
if invalid_years:
raise ValueError(
f"Invalid years provided: {sorted(invalid_years)}. Years must be among: {available_years}"
)
# Filtering and sorting of valid years
years_to_update = sorted(
list(set(custom_years).intersection(available_years))
)
else:
raise ValueError(
""" custom_years parameter needs to be specified if refresh_type="custom" """
Expand All @@ -147,5 +164,11 @@ def process_edc_datasets(
return True


def execute():
process_edc_datasets()
def execute(refresh_type: str = "all", custom_years: List[str] = None):
    """
    Execute the EDC dataset processing with specified parameters.

    Entry point invoked by the CLI (``run build_database``); it only
    forwards its arguments to ``process_edc_datasets``.

    :param refresh_type: Type of refresh to perform ("all", "last", or "custom")
    :param custom_years: List of years to process when refresh_type is "custom"
        (ignored otherwise; defaults to None)
    """
    # NOTE(review): the annotation should be Optional[List[str]] since the
    # default is None — confirm `Optional` is imported before changing.
    process_edc_datasets(refresh_type=refresh_type, custom_years=custom_years)
11 changes: 11 additions & 0 deletions pipelines/tasks/download_database.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,14 @@
"""
Download database from S3 storage.

Args:
- env (str): Environment to download from ("dev" or "prod")

Examples:
- download_database --env prod : Download database from production environment
- download_database --env dev : Download database from development environment
"""

import logging

from pipelines.config.config import get_environment, get_s3_path
Expand Down
11 changes: 11 additions & 0 deletions pipelines/tasks/upload_database.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,14 @@
"""
Upload database to S3 storage.

Args:
- env (str): Environment to upload to ("dev" or "prod")

Examples:
- upload_database --env dev : Upload database to development environment
- upload_database --env prod : Upload database to production environment
"""

import logging

from pipelines.config.config import get_environment, get_s3_path
Expand Down