Skip to content

Commit df03a14

Browse files
committed
Add cli argument to request by specific years
1 parent 42950fa commit df03a14

File tree

2 files changed

+38
-5
lines changed

2 files changed

+38
-5
lines changed

pipelines/run.py

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,13 +33,32 @@ def list():
3333

3434
@cli.command()
3535
@click.argument("task_name")
36-
def run(task_name):
36+
@click.option(
37+
"--refresh-type",
38+
type=click.Choice(["all", "last", "custom"]),
39+
default="all",
40+
help="Type of refresh to perform",
41+
)
42+
@click.option(
43+
"--custom-years",
44+
type=str,
45+
help="Comma-separated list of years to process (for custom refresh type)",
46+
)
47+
def run(task_name, refresh_type, custom_years):
3748
"""Run a specified task."""
3849
try:
3950
module = importlib.import_module(f"tasks.{task_name}")
4051
task_func = getattr(module, "execute")
4152
logging.info(f"Starting task {task_name}...")
42-
task_func()
53+
54+
# Parse custom years if provided
55+
custom_years_list = None
56+
if custom_years:
57+
custom_years_list = [year.strip() for year in custom_years.split(",")]
58+
59+
# Call the task function with parameters
60+
task_func(refresh_type=refresh_type, custom_years=custom_years_list)
61+
4362
logging.info(f"Task {task_name} completed.")
4463
except (ModuleNotFoundError, AttributeError):
4564
logging.error(f"Task {task_name} not found.")

pipelines/tasks/build_database.py

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -169,11 +169,19 @@ def process_edc_datasets(
169169
years_to_update = available_years[-1:]
170170
elif refresh_type == "custom":
171171
if custom_years:
172-
years_to_update = list(set(custom_years).intersection(available_years))
172+
# Check that every provided year is available
173+
invalid_years = set(custom_years) - set(available_years)
174+
if invalid_years:
175+
raise ValueError(
176+
f"Invalid years provided: {sorted(invalid_years)}. Years must be among: {available_years}"
177+
)
178+
# Filter and sort the valid years
179+
years_to_update = sorted(list(set(custom_years).intersection(available_years)))
173180
else:
174181
raise ValueError(
175182
""" custom_years parameter needs to be specified if refresh_type="custom" """
176183
)
184+
177185
else:
178186
raise ValueError(
179187
f""" refresh_type needs to be one of ["all", "last", "custom"], it can't be: {refresh_type}"""
@@ -189,5 +197,11 @@ def process_edc_datasets(
189197
return True
190198

191199

192-
def execute():
193-
process_edc_datasets()
200+
def execute(refresh_type: str = "all", custom_years: List[str] = None):
201+
"""
202+
Execute the EDC dataset processing with specified parameters.
203+
204+
:param refresh_type: Type of refresh to perform ("all", "last", or "custom")
205+
:param custom_years: List of years to process when refresh_type is "custom"
206+
"""
207+
process_edc_datasets(refresh_type=refresh_type, custom_years=custom_years)

0 commit comments

Comments
 (0)