|
2 | 2 | """
|
3 | 3 | Script to start a local server. This script can serve as the entry-point for doing spark-submit.
|
4 | 4 | """
|
5 |
| - |
| 5 | +import datetime |
6 | 6 | import logging
|
7 | 7 | import os
|
8 | 8 | import socket
|
@@ -34,7 +34,8 @@ def setup_local_spark(log_dir: Path = Path.cwd(), verbosity=0):
|
34 | 34 | # TODO: make this more reusable (e.g. also see `_setup_local_spark` in tests/conftest.py)
|
35 | 35 | from pyspark import SparkContext, find_spark_home
|
36 | 36 |
|
37 |
| - spark_python = os.path.join(find_spark_home._find_spark_home(), "python") |
| 37 | + spark_home = find_spark_home._find_spark_home() |
| 38 | + spark_python = os.path.join(spark_home, "python") |
38 | 39 | logging.info(f"spark_python: {spark_python}")
|
39 | 40 | py4j = glob(os.path.join(spark_python, "lib", "py4j-*.zip"))[0]
|
40 | 41 | sys.path[:0] = [spark_python, py4j]
|
@@ -86,6 +87,21 @@ def setup_local_spark(log_dir: Path = Path.cwd(), verbosity=0):
|
86 | 87 | conf.set(key="spark.executor.memory", value="2G")
|
87 | 88 | OPENEO_LOCAL_DEBUGGING = smart_bool(os.environ.get("OPENEO_LOCAL_DEBUGGING", "false"))
|
88 | 89 | conf.set("spark.ui.enabled", OPENEO_LOCAL_DEBUGGING)
|
| 90 | + if OPENEO_LOCAL_DEBUGGING: |
| 91 | + events_dir = "/tmp/spark-events" # manually create this folder if you want to keep history |
| 92 | + if os.path.exists(events_dir): |
| 93 | + conf.set("spark.eventLog.enabled", "true") |
| 94 | + _log.info( |
| 95 | + f"Start spark history server with {spark_home}/sbin/start-history-server.sh and open http://localhost:18080/" |
| 96 | + ) |
| 97 | + files = glob(os.path.join(events_dir, "*")) |
| 98 | + for f in files: |
| 99 | + # remove event logs older than 7 days: |
| 100 | + if os.path.getmtime(f) < datetime.datetime.now().timestamp() - 7 * 24 * 3600: |
| 101 | + try: |
| 102 | + os.remove(f) |
| 103 | + except Exception as e: |
| 104 | + _log.warning(f"Failed to remove old spark event log {f}: {e}") |
89 | 105 |
|
90 | 106 | jars = []
|
91 | 107 | more_jars = [] if "GEOPYSPARK_JARS_PATH" not in os.environ else os.environ["GEOPYSPARK_JARS_PATH"].split(":")
|
|
0 commit comments