@@ -207,6 +207,18 @@ class BatchJobSubmitArgs:
     Passed to `--conf spark.executor.pyspark.memory={python_max_memory}b`.
     See https://spark.apache.org/docs/latest/configuration.html
     """
+
+    spark_eventlog_dir: str
+    """Directory for Spark event logs"""
+
+    spark_history_fs_logdirectory: str
+    """Directory for Spark history server logs"""
+
+    spark_yarn_historyserver_address: str
+    """Address of the Spark history server"""
+
+    yarn_container_runtime_docker_client_config: str
+    """Path to the Docker client configuration for YARN containers (optional)"""

     def to_args_list(self) -> list[str]:
         """Convert the dataclass to a list of string arguments for subprocess."""
@@ -249,6 +261,10 @@ def to_args_list(self) -> list[str]:
             self.udf_python_dependencies_archive_path,
             self.propagatable_web_app_driver_envars,
             self.python_max_memory,
+            self.spark_eventlog_dir,
+            self.spark_history_fs_logdirectory,
+            self.spark_yarn_historyserver_address,
+            self.yarn_container_runtime_docker_client_config,
         ]

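The diff only shows the producing side; how the launcher consumes the four extra positional arguments is not part of this change. A plausible sketch of the receiving end, assuming the values are forwarded to `spark-submit` (the Spark property names `spark.eventLog.dir`, `spark.history.fs.logDirectory`, and `spark.yarn.historyServer.address` are standard Spark configuration keys; the unpacking and flag-building code is hypothetical):

```python
import sys

# Hypothetical consumer: positional args arrive in the exact order that
# BatchJobSubmitArgs.to_args_list() emits them, with the new values last.
(
    *_earlier_args,
    spark_eventlog_dir,
    spark_history_fs_logdirectory,
    spark_yarn_historyserver_address,
    yarn_docker_client_config,
) = sys.argv[1:]

conf_flags = [
    # Event logging typically also requires spark.eventLog.enabled=true.
    "--conf", "spark.eventLog.enabled=true",
    "--conf", f"spark.eventLog.dir={spark_eventlog_dir}",
    "--conf", f"spark.history.fs.logDirectory={spark_history_fs_logdirectory}",
    "--conf", f"spark.yarn.historyServer.address={spark_yarn_historyserver_address}",
]
if yarn_docker_client_config:  # optional: may arrive as an empty string
    conf_flags += [
        "--conf",
        "spark.executorEnv.YARN_CONTAINER_RUNTIME_DOCKER_CLIENT_CONFIG="
        + yarn_docker_client_config,
    ]
```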
@@ -438,6 +454,15 @@ def as_boolean_arg(job_option_key: str, default_value: str) -> str:
         principal_value = "no_principal"
         keytab_value = "no_keytab"

+        backend_config = get_backend_config()
+
+        if not backend_config.batch_spark_eventlog_dir:
+            raise InternalException("batch_spark_eventlog_dir must be configured in backend config")
+        if not backend_config.batch_spark_history_fs_logdirectory:
+            raise InternalException("batch_spark_history_fs_logdirectory must be configured in backend config")
+        if not backend_config.batch_spark_yarn_historyserver_address:
+            raise InternalException("batch_spark_yarn_historyserver_address must be configured in backend config")
+
         # Create structured arguments using dataclass
         submit_args = BatchJobSubmitArgs(
             script_location=script_location,
@@ -471,13 +496,17 @@ def as_boolean_arg(job_option_key: str, default_value: str) -> str:
             log_level=options.log_level,
             openeo_backend_config=os.environ.get(ConfigGetter.OPENEO_BACKEND_CONFIG, ""),
             udf_python_dependencies_folder_path=str(job_work_dir / UDF_PYTHON_DEPENDENCIES_FOLDER_NAME),
-            ejr_api=get_backend_config().ejr_api or "",
-            ejr_backend_id=get_backend_config().ejr_backend_id,
+            ejr_api=backend_config.ejr_api or "",
+            ejr_backend_id=backend_config.ejr_backend_id,
             ejr_oidc_client_credentials=os.environ.get("OPENEO_EJR_OIDC_CLIENT_CREDENTIALS", ""),
             docker_mounts=docker_mounts,
             udf_python_dependencies_archive_path=str(job_work_dir / UDF_PYTHON_DEPENDENCIES_ARCHIVE_NAME),
             propagatable_web_app_driver_envars=os.environ.get("OPENEO_PROPAGATABLE_WEB_APP_DRIVER_ENVARS", ""),
             python_max_memory=str(byte_string_as(options.python_memory)) if options.python_memory else "-1",
+            spark_eventlog_dir=backend_config.batch_spark_eventlog_dir,
+            spark_history_fs_logdirectory=backend_config.batch_spark_history_fs_logdirectory,
+            spark_yarn_historyserver_address=backend_config.batch_spark_yarn_historyserver_address,
+            yarn_container_runtime_docker_client_config=backend_config.batch_yarn_container_runtime_docker_client_config,
         )
         args = submit_args.to_args_list()
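Note that the new fail-fast checks cover only the three history-server related values; `batch_yarn_container_runtime_docker_client_config` is left unvalidated, consistent with its "(optional)" docstring. A sketch of the backend-config side these checks depend on (only the attribute names come from this diff; the config class, values, and exception type below are illustrative):

```python
from dataclasses import dataclass


@dataclass
class BackendConfigSketch:
    # Attribute names taken from the diff; the class itself is hypothetical.
    batch_spark_eventlog_dir: str = ""
    batch_spark_history_fs_logdirectory: str = ""
    batch_spark_yarn_historyserver_address: str = ""
    batch_yarn_container_runtime_docker_client_config: str = ""  # optional


config = BackendConfigSketch(
    batch_spark_eventlog_dir="hdfs:///spark-logs",
    batch_spark_history_fs_logdirectory="hdfs:///spark-logs",
    batch_spark_yarn_historyserver_address="historyserver.example.org:18080",
)

# Mirrors the fail-fast checks in the diff (which raise InternalException):
# required values must be non-empty before the job is submitted.
for name in (
    "batch_spark_eventlog_dir",
    "batch_spark_history_fs_logdirectory",
    "batch_spark_yarn_historyserver_address",
):
    if not getattr(config, name):
        raise ValueError(f"{name} must be configured in backend config")
```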