
Add support for Apache Spark (Core module). #20


Open · wants to merge 20 commits into base: spark
10 changes: 9 additions & 1 deletion core/add_project.sh
@@ -48,6 +48,13 @@ function setup_alluxio() {
mvn clean install -DskipTests -Dcheckstyle.skip -Dlicense.skip -Dfindbugs.skip -Dmaven.javadoc.skip=true
}

function setup_spark() {
[ ! -d "app/ctest-spark" ] && git clone https://github.com/ZHLOLin/spark.git app/ctest-spark
cd app/ctest-spark
git fetch && git checkout ctest_enable_intercepting
mvn clean install -pl core -am -DskipTests
}

function usage() {
echo "Usage: add_project.sh <main project>"
exit 1
@@ -63,8 +70,9 @@ function main() {
hadoop) setup_hadoop ;;
hbase) setup_hbase ;;
zookeeper) setup_zookeeper ;;
spark) setup_spark ;;
alluxio) setup_alluxio ;;
*) echo "Unexpected project: $project - only support hadoop, hbase, zookeeper and alluxio." ;;
*) echo "Unexpected project: $project - only support hadoop, hbase, zookeeper, spark and alluxio." ;;
esac
fi
}
11 changes: 10 additions & 1 deletion core/ctest_const.py
@@ -12,18 +12,21 @@
HBASE = "hbase-server"
ZOOKEEPER = "zookeeper-server"
ALLUXIO = "alluxio-core"
SPARK = "spark-core"

CTEST_HADOOP_DIR = os.path.join(APP_DIR, "ctest-hadoop")
CTEST_HBASE_DIR = os.path.join(APP_DIR, "ctest-hbase")
CTEST_ZK_DIR = os.path.join(APP_DIR, "ctest-zookeeper")
CTEST_ALLUXIO_DIR = os.path.join(APP_DIR, "ctest-alluxio")
CTEST_SPARK_DIR = os.path.join(APP_DIR, "ctest-spark")

PROJECT_DIR = {
HCOMMON: CTEST_HADOOP_DIR,
HDFS: CTEST_HADOOP_DIR,
HBASE: CTEST_HBASE_DIR,
ZOOKEEPER: CTEST_ZK_DIR,
ALLUXIO: CTEST_ALLUXIO_DIR,
SPARK: CTEST_SPARK_DIR
}


@@ -34,6 +37,7 @@
HBASE: "hbase-server",
ZOOKEEPER: "zookeeper-server",
ALLUXIO: "core",
SPARK: "core"
}


@@ -58,6 +62,7 @@
os.path.join(CTEST_ALLUXIO_DIR, MODULE_SUBDIR[ALLUXIO], "server/worker", SUREFIRE_SUBDIR),
os.path.join(CTEST_ALLUXIO_DIR, MODULE_SUBDIR[ALLUXIO], "server/master", SUREFIRE_SUBDIR),
],
SPARK: [os.path.join(CTEST_SPARK_DIR, MODULE_SUBDIR[SPARK], SUREFIRE_SUBDIR)]
}

# default or deprecate conf path
@@ -74,7 +79,8 @@
HDFS: os.path.join(DEFAULT_CONF_DIR, HDFS + "-default.tsv"),
HBASE: os.path.join(DEFAULT_CONF_DIR, HBASE + "-default.tsv"),
ALLUXIO: os.path.join(DEFAULT_CONF_DIR, ALLUXIO + "-default.tsv"),
ZOOKEEPER: os.path.join(DEFAULT_CONF_DIR, ZOOKEEPER + "-default.tsv")
ZOOKEEPER: os.path.join(DEFAULT_CONF_DIR, ZOOKEEPER + "-default.tsv"),
SPARK: os.path.join(DEFAULT_CONF_DIR, SPARK + "-default.tsv")
}


@@ -96,6 +102,9 @@
],
ALLUXIO: [
os.path.join(CTEST_ALLUXIO_DIR, "core/alluxio-ctest.properties")
],
SPARK: [
CTEST_SPARK_DIR
]
}

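For orientation, here is how the new constants compose into the concrete paths the runner uses for Spark. A minimal sketch, assuming `APP_DIR` and `SUREFIRE_SUBDIR` carry the values they are given elsewhere in `ctest_const.py`:

```python
import os

# Assumed values; APP_DIR and SUREFIRE_SUBDIR are defined elsewhere in ctest_const.py.
APP_DIR = "app"
SUREFIRE_SUBDIR = "target/surefire-reports/"

SPARK = "spark-core"
CTEST_SPARK_DIR = os.path.join(APP_DIR, "ctest-spark")
MODULE_SUBDIR = {SPARK: "core"}

# Where run_test.py will look for the test report XML:
print(os.path.join(CTEST_SPARK_DIR, MODULE_SUBDIR[SPARK], SUREFIRE_SUBDIR))
# -> app/ctest-spark/core/target/surefire-reports/

# INJECTION_PATH[SPARK] holds the project root; inject.py appends "/pom.xml" itself,
# so the file that gets modified is the root pom, not core/pom.xml:
print(CTEST_SPARK_DIR + "/pom.xml")
# -> app/ctest-spark/pom.xml
```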
365 changes: 365 additions & 0 deletions core/default_configs/spark-core-default.tsv

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions core/generate_ctest/ctest_mapping/ctests-spark-core.json

Large diffs are not rendered by default.

28 changes: 28 additions & 0 deletions core/generate_ctest/inject.py
@@ -1,5 +1,6 @@
"""inject parameter, values into sw config"""

import shutil
import sys
import xml.etree.ElementTree as ET

@@ -35,6 +36,28 @@ def inject_config(param_value_pairs):
file.write(str.encode("<?xml version=\"1.0\"?>\n<?xml-stylesheet type=\"text/xsl\" href=\"configuration.xsl\"?>\n"))
file.write(ET.tostring(conf))
file.close()
elif project in [SPARK]:
for inject_path in INJECTION_PATH[project]:
back_up = inject_path + "/back_up.xml"
inject_path = inject_path + "/pom.xml"
shutil.copyfile(inject_path, back_up)
print(">>>>[ctest_core] injecting into file: {}".format(inject_path))
tree = ET.parse(inject_path)
pom = tree.getroot()
namespace = pom.tag.split('{')[1].split('}')[0]
# for reading
namespace_mapping = {'mvnns': namespace}
# for writing: otherwise 'xmlns:ns0' will be used instead of the standard xml namespace 'xmlns'
ET.register_namespace('', namespace)
ns = "{http://maven.apache.org/POM/4.0.0}"
for child in pom.findall("%sbuild/%spluginManagement/%splugins/%splugin" % (ns, ns, ns, ns)):
gid = child.find("%sgroupId" % ns)
if gid.text == "org.scalatest":
child = child.find("%sconfiguration/%ssystemProperties" % (ns, ns))
for p, v in param_value_pairs.items():
sub = ET.SubElement(child, '%s%s' % (ns, p))
sub.text = v
tree.write(inject_path, encoding='utf-8')
else:
sys.exit(">>>>[ctest_core] value injection for {} is not supported yet".format(project))

@@ -53,5 +76,10 @@ def clean_conf_file(project):
file.write(str.encode("<?xml version=\"1.0\"?>\n<?xml-stylesheet type=\"text/xsl\" href=\"configuration.xsl\"?>\n"))
file.write(ET.tostring(conf))
file.close()
elif project in [SPARK]:
for inject_path in INJECTION_PATH[project]:
back_up = inject_path + "/back_up.xml"
inject_path = inject_path + "/pom.xml"
shutil.copyfile(back_up, inject_path)
else:
sys.exit(">>>>[ctest_core] value injection for {} is not supported yet".format(project))
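For reference, the Spark branch backs up the pom and appends each parameter under the scalatest plugin's `<systemProperties>`, so the values reach the JVM that runs the suites. A standalone sketch of the same ElementTree manipulation on a minimal pom; the parameter/value pair is made up for illustration:

```python
import xml.etree.ElementTree as ET

ns = "{http://maven.apache.org/POM/4.0.0}"
pom_xml = """<project xmlns="http://maven.apache.org/POM/4.0.0">
  <build><pluginManagement><plugins><plugin>
    <groupId>org.scalatest</groupId>
    <configuration><systemProperties/></configuration>
  </plugin></plugins></pluginManagement></build>
</project>"""

# Register the default namespace so the output keeps plain `xmlns=` instead of `ns0:`.
ET.register_namespace("", "http://maven.apache.org/POM/4.0.0")
root = ET.fromstring(pom_xml)

for plugin in root.findall(f"{ns}build/{ns}pluginManagement/{ns}plugins/{ns}plugin"):
    if plugin.find(f"{ns}groupId").text == "org.scalatest":
        props = plugin.find(f"{ns}configuration/{ns}systemProperties")
        # Hypothetical parameter and value, for illustration only.
        sub = ET.SubElement(props, f"{ns}spark.shuffle.io.maxRetries")
        sub.text = "6"

print(ET.tostring(root, encoding="unicode"))
```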
3 changes: 1 addition & 2 deletions core/generate_ctest/main.py
@@ -31,9 +31,8 @@ def test_value_pair(test_input):
for param, values in test_input.items():
tr_file = open(os.path.join(GENCTEST_TR_DIR, project, TR_FILE.format(id=param)), "w")
mt_file = open(os.path.join(GENCTEST_TR_DIR, project, MT_FILE.format(id=param)), "w")

associated_tests = mapping[param] if param in mapping else []
if len(mapping[param]) != 0:
if len(associated_tests) != 0:
for value in values:
tr = run_test_seperate(param, value, associated_tests)

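The main.py change fixes a crash rather than style: the old guard indexed `mapping[param]` directly and raised `KeyError` for any parameter with no associated tests, which the Spark value file readily produces. A minimal illustration with a made-up mapping:

```python
mapping = {"spark.shuffle.io.maxRetries": ["ShuffleSuite @ retry test"]}
param = "spark.eventLog.dir"  # hypothetical param absent from the mapping

# Old code: `if len(mapping[param]) != 0:` raises KeyError for unmapped params.
# New code: default to an empty list, so unmapped params are skipped cleanly.
associated_tests = mapping[param] if param in mapping else []
if len(associated_tests) != 0:
    print("run", associated_tests)
else:
    print("no associated tests; skipping", param)
```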
6 changes: 3 additions & 3 deletions core/generate_ctest/program_input.py
@@ -4,11 +4,11 @@
# run mode
"run_mode": "generate_ctest", # string
# name of the project, i.e. hadoop-common, hadoop-hdfs, see constant.py
"project": "hadoop-common", # string
"project": "spark-core", # string
# path to param -> tests json mapping
"mapping_path": "../../data/ctest_mapping/opensource-hadoop-common.json", # string
"mapping_path": "../../data/ctest_mapping/opensource-spark-core.json", # string
# good values of params tests will be run against
"param_value_tsv": "sample-hadoop-common.tsv", # string
"param_value_tsv": "spark-core-generated-values.tsv", # string
# display the terminal output live, without saving any results
"display_mode": False, # bool
# whether to use mvn test or mvn surefire:test
12 changes: 9 additions & 3 deletions core/generate_ctest/run_test.py
@@ -25,7 +25,7 @@ def run_test_seperate(param, value, associated_tests):
print(">>>>[ctest_core] chdir to {}".format(testing_dir))
start_time = time.time()
for test in associated_tests:
cmd = run_test_utils.maven_cmd(test)
cmd = run_test_utils.maven_cmd(test, project=project)
if display_mode:
os.system(" ".join(cmd))
continue
@@ -40,7 +40,10 @@
# test hanged, treated as failure.
process.kill()
print(">>>>[ctest_core] maven cmd timeout {}".format(e))
clsname, testname = test.split("#")
if project in [SPARK]:
clsname, testname = test.split(" @ ")
else:
clsname, testname = test.split("#")
tr.ran_tests_and_time.add(test + "\t" + str(cmd_timeout))
tr.failed_tests.add(test)
continue
@@ -49,7 +52,10 @@

print_output = run_test_utils.strip_ansi(stdout.decode("ascii", "ignore"))
print(print_output)
clsname, testname = test.split("#")
if project in [SPARK]:
clsname, testname = test.split(" @ ")
else:
clsname, testname = test.split("#")
times, errors = parse_surefire(clsname, [testname])
if testname in times:
tr.ran_tests_and_time.add(test + "\t" + times[testname])
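run_test.py now splits Spark test identifiers differently: the mapping uses `Suite @ test name` for scalatest, whose test names can contain spaces and `#`, while the JUnit-based projects use `Class#method`. A small sketch mirroring the two branches, with illustrative test names:

```python
def split_test_id(test, project):
    # Spark/scalatest ids look like "org.apache.spark.ShuffleSuite @ handles retries";
    # JUnit ids look like "org.apache.hadoop.fs.TestPath#testNormalize".
    if project == "spark-core":
        clsname, testname = test.split(" @ ")
    else:
        clsname, testname = test.split("#")
    return clsname, testname

print(split_test_id("org.apache.spark.ShuffleSuite @ handles retries", "spark-core"))
print(split_test_id("org.apache.hadoop.fs.TestPath#testNormalize", "hadoop-common"))
```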
11 changes: 8 additions & 3 deletions core/generate_ctest/run_test_utils.py
@@ -14,11 +14,16 @@ def __init__(self, ran_tests_and_time=set(), failed_tests=set()):
self.ran_tests_and_time = ran_tests_and_time


def maven_cmd(test, add_time=False):
def maven_cmd(test, add_time=False, project=None):
# surefire:test reuses test build from last compilation
# if you modified the test and want to rerun it, you must use `mvn test`
test_mode = "surefire:test" if use_surefire else "test"
cmd = ["mvn", test_mode, "-Dtest={}".format(test)] + maven_args
cmd = None
if project == SPARK:
test_mode = "scalatest:test" if use_surefire else "test"
cmd = ["mvn", test_mode, "-Dtest=none", "-Dsuites=" + test] + maven_args
else:
test_mode = "surefire:test" if use_surefire else "test"
cmd = ["mvn", test_mode, "-Dtest={}".format(test)] + maven_args
if add_time:
cmd = ["time"] + cmd
print(">>>>[ctest_core] command: " + " ".join(cmd))
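With the maven_cmd change, Spark runs go through the scalatest maven plugin instead of surefire: `-Dtest=none` disables surefire's test selection and `-Dsuites` hands the suite/test pair to scalatest. A sketch of the commands the two branches produce; the suite name and maven_args are illustrative:

```python
maven_args = ["-DfailIfNoTests=false"]  # placeholder; real args come from program_input

def maven_cmd_sketch(test, project=None, use_surefire=True):
    if project == "spark-core":
        # scalatest:test reuses the last build, mirroring surefire:test.
        test_mode = "scalatest:test" if use_surefire else "test"
        return ["mvn", test_mode, "-Dtest=none", "-Dsuites=" + test] + maven_args
    test_mode = "surefire:test" if use_surefire else "test"
    return ["mvn", test_mode, "-Dtest={}".format(test)] + maven_args

print(maven_cmd_sketch("org.apache.spark.ShuffleSuite @ handles retries", "spark-core"))
# ['mvn', 'scalatest:test', '-Dtest=none', '-Dsuites=org.apache.spark.ShuffleSuite @ handles retries', ...]
# Note the -Dsuites value contains spaces, so joining it into a shell string
# (as display_mode does) would need quoting; passing the list to subprocess does not.
```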
30 changes: 30 additions & 0 deletions core/generate_ctest/spark-core-generated-values.tsv
@@ -0,0 +1,30 @@
spark.driver.log.allowErasureCoding true SKIP
spark.driver.userClassPathFirst true SKIP
spark.executor.logs.rolling.enableCompression true SKIP
spark.shuffle.io.maxRetries 1 6
spark.shuffle.accurateBlockThreshold T OOM
spark.shuffle.registration.maxAttempts 1 6
spark.driver.userClassPathFirst true SKIP
spark.executor.logs.rolling.enableCompression true SKIP
spark.shuffle.io.maxRetries 1 6
spark.shuffle.accurateBlockThreshold T OOM
spark.shuffle.registration.maxAttempts 1 6
spark.eventLog.logBlockUpdates.enabled true SKIP
spark.eventLog.dir /valid/dir1 /valid/dir2
spark.eventLog.rolling.enabled true SKIP
spark.eventLog.rolling.maxFileSize 1m 256m
spark.checkpoint.compress true SKIP
spark.cleaner.referenceTracking.cleanCheckpoints true SKIP
spark.broadcast.UDFCompressionThreshold RDD SKIP
spark.files.useFetchCache false SKIP
spark.files.overwrite true SKIP
spark.storage.decommission.shuffleBlocks.enabled false SKIP
spark.storage.decommission.shuffleBlocks.maxThreads 1 16
spark.storage.decommission.rddBlocks.enabled false SKIP
spark.storage.decommission.fallbackStorage.cleanUp true SKIP
spark.eventLog.logStageExecutorMetrics true SKIP
spark.scheduler.excludeOnFailure.unschedulableTaskSetTimeout 1s 240s
spark.excludeOnFailure.timeout 10h 2h
spark.excludeOnFailure.application.maxFailedTasksPerExecutor 1 4
spark.excludeOnFailure.application.maxFailedExecutorsPerNode 1 4
spark.excludeOnFailure.killExcludedExecutors true SKIP
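The generated-values file appears to follow the framework's param_value_tsv layout: one parameter per row followed by two candidate values, with SKIP seemingly marking a slot that has no usable value. A parsing sketch under that assumption:

```python
# Assumed layout: <param>\t<value1>\t<value2>, with "SKIP" marking an unused slot.
with open("spark-core-generated-values.tsv") as f:
    for line in f:
        param, *values = line.rstrip("\n").split("\t")
        usable = [v for v in values if v != "SKIP"]
        print(param, usable)
```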