Skip to content

Commit 61e18ff

Browse files
committed
Issue #366 also cover job_options in synchronous mode
1 parent 787f2c8 commit 61e18ff

File tree

3 files changed

+78
-3
lines changed

3 files changed

+78
-3
lines changed

openeo_driver/views.py

+7
Original file line numberDiff line numberDiff line change
@@ -658,9 +658,16 @@ def result(user: User):
658658
budget = post_data.get("budget")
659659
plan = post_data.get("plan")
660660
log_level = post_data.get("log_level", DEFAULT_LOG_LEVEL_PROCESSING)
661+
661662
job_options = _extract_job_options(
662663
post_data, to_ignore=["process", "process_graph", "budget", "plan", "log_level"]
663664
)
665+
job_option_defaults = extract_default_job_options_from_process_graph(
666+
process_graph=process_graph, processing_mode="synchronous"
667+
)
668+
if job_option_defaults:
669+
_log.info(f"Extending {job_options=} with extracted {job_option_defaults=}")
670+
job_options = {**job_option_defaults, **(job_options or {})}
664671

665672
request_id = FlaskRequestCorrelationIdLogging.get_request_id()
666673

tests/test_views.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -1328,7 +1328,7 @@ def test_create_job_100_with_job_options_and_defaults_from_remote_process_defini
13281328
process_definition = {
13291329
"id": "add3",
13301330
"process_graph": {
1331-
"add": {"process_id": "add", "arguments": {"x": {"from_parameter": "x", "y": 3}, "result": True}}
1331+
"add": {"process_id": "add", "arguments": {"x": {"from_parameter": "x"}, "y": 3}, "result": True}
13321332
},
13331333
"parameters": [
13341334
{"name": "x", "schema": {"type": "number"}},

tests/test_views_execute.py

+70-2
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
custom_process,
3535
custom_process_from_process_graph,
3636
collect,
37+
ENV_DRY_RUN_TRACER,
3738
)
3839
from openeo_driver.testing import (
3940
TEST_USER,
@@ -4657,9 +4658,11 @@ def custom_process_registry(backend_implementation) -> ProcessRegistry:
46574658
)
46584659
def test_synchronous_processing_job_options(api, custom_process_registry, post_data_base, expected_job_options):
46594660
"""Test job options handling in synchronous processing in EvalEnv"""
4660-
4661+
actual_job_options = []
46614662
def i_spy_with_my_little_eye(args: ProcessArgs, env: EvalEnv):
4662-
assert env.get("job_options") == expected_job_options
4663+
nonlocal actual_job_options
4664+
if not env.get(ENV_DRY_RUN_TRACER):
4665+
actual_job_options.append(env.get("job_options"))
46634666
return args.get("x")
46644667

46654668
custom_process_registry.add_function(i_spy_with_my_little_eye, spec={"id": "i_spy_with_my_little_eye"})
@@ -4672,6 +4675,71 @@ def i_spy_with_my_little_eye(args: ProcessArgs, env: EvalEnv):
46724675
api.ensure_auth_header()
46734676
res = api.post(path="/result", json=post_data)
46744677
assert res.assert_status_code(200).json == 123
4678+
assert actual_job_options == [expected_job_options]
4679+
4680+
4681+
@pytest.mark.parametrize(
4682+
["default_job_options", "given_job_options", "expected_job_options"],
4683+
[
4684+
(None, None, None),
4685+
({}, {}, {}),
4686+
({"cpu": "yellow"}, {}, {"cpu": "yellow"}),
4687+
({}, {"cpu": "yellow"}, {"cpu": "yellow"}),
4688+
({"cpu": "yellow"}, {"cpu": "blue"}, {"cpu": "blue"}),
4689+
(
4690+
{"memory": "2GB", "cpu": "yellow"},
4691+
{"memory": "4GB", "queue": "fast"},
4692+
{"cpu": "yellow", "memory": "4GB", "queue": "fast"},
4693+
),
4694+
],
4695+
)
4696+
def test_synchronous_processing_job_options_and_defaults_from_remote_process_definition(
4697+
api, custom_process_registry, requests_mock, default_job_options, given_job_options, expected_job_options
4698+
):
4699+
process_definition = {
4700+
"id": "add3",
4701+
"process_graph": {
4702+
"add": {"process_id": "add", "arguments": {"x": {"from_parameter": "x"}, "y": 3}, "result": True}
4703+
},
4704+
"parameters": [
4705+
{"name": "x", "schema": {"type": "number"}},
4706+
],
4707+
"returns": {"schema": {"type": "number"}},
4708+
}
4709+
if default_job_options is not None:
4710+
process_definition["default_synchronous_parameters"] = default_job_options
4711+
requests_mock.get("https://share.test/add3.json", json=process_definition)
4712+
4713+
actual_job_options = []
4714+
4715+
def i_spy_with_my_little_eye(args: ProcessArgs, env: EvalEnv):
4716+
nonlocal actual_job_options
4717+
if not env.get(ENV_DRY_RUN_TRACER):
4718+
actual_job_options.append(env.get("job_options"))
4719+
return args.get("x")
4720+
4721+
custom_process_registry.add_function(i_spy_with_my_little_eye, spec={"id": "i_spy_with_my_little_eye"})
4722+
custom_process_registry.add_process(name="add", function=lambda args, env: args.get("x") + args.get("y"))
4723+
4724+
pg = {
4725+
"ispy": {"process_id": "i_spy_with_my_little_eye", "arguments": {"x": 123}},
4726+
"add3": {
4727+
"process_id": "add3",
4728+
"namespace": "https://share.test/add3.json",
4729+
"arguments": {"x": {"from_node": "ispy"}},
4730+
"result": True,
4731+
},
4732+
}
4733+
post_data = {
4734+
"process": {"process_graph": pg},
4735+
}
4736+
if given_job_options is not None:
4737+
post_data["job_options"] = given_job_options
4738+
4739+
api.ensure_auth_header()
4740+
res = api.post(path="/result", json=post_data)
4741+
assert res.assert_status_code(200).json == 126
4742+
assert actual_job_options == [expected_job_options]
46754743

46764744

46774745
def test_load_collection_property_from_parameter(api, udp_registry):

0 commit comments

Comments
 (0)