Commit 91d6298

feat: Add Example for Apache Beam on Spark on EKS (#852)

2 parents d220d8e + 4767dc7, commit 91d6298

File tree: 10 files changed, +2413 −19 lines
analytics/terraform/spark-k8s-operator/addons.tf

Lines changed: 1 addition & 1 deletion
@@ -959,7 +959,7 @@ resource "random_password" "grafana" {

 #tfsec:ignore:aws-ssm-secret-use-customer-key
 resource "aws_secretsmanager_secret" "grafana" {
-  name = "${local.name}-grafana"
+  name_prefix = "${local.name}-grafana-"
   recovery_window_in_days = 0 # Set to zero for this example to force delete during Terraform destroy
 }
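With name_prefix, Terraform generates a unique secret name on each apply instead of reusing a fixed one; this presumably avoids name collisions when a secret with the same name still exists, or is still pending deletion, from a previous deployment of the example.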

Lines changed: 50 additions & 0 deletions
@@ -0,0 +1,50 @@
FROM apache/spark:3.5.6-scala2.12-java17-python3-ubuntu

ARG PYTHON_VERSION=3.11.3
ARG BEAM_VERSION=2.58.0
ARG HADOOP_VERSION=3.4.1
ARG AWS_SDK_VERSION=2.29.0
ARG SPARK_UID=185

ENV SPARK_HOME=/opt/spark

# Set up as root to install dependencies and tools
USER root

# Remove any old Hadoop libraries to avoid conflicts
RUN rm -f ${SPARK_HOME}/jars/hadoop-client-* && \
    rm -f ${SPARK_HOME}/jars/hadoop-yarn-server-web-proxy-*.jar

# Add Hadoop AWS connector and related Hadoop dependencies
RUN cd ${SPARK_HOME}/jars && \
    wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/${HADOOP_VERSION}/hadoop-aws-${HADOOP_VERSION}.jar && \
    wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-client-api/${HADOOP_VERSION}/hadoop-client-api-${HADOOP_VERSION}.jar && \
    wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-client-runtime/${HADOOP_VERSION}/hadoop-client-runtime-${HADOOP_VERSION}.jar && \
    wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-common/${HADOOP_VERSION}/hadoop-common-${HADOOP_VERSION}.jar && \
    wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-yarn-server-web-proxy/${HADOOP_VERSION}/hadoop-yarn-server-web-proxy-${HADOOP_VERSION}.jar && \
    wget https://repo1.maven.org/maven2/software/amazon/awssdk/bundle/${AWS_SDK_VERSION}/bundle-${AWS_SDK_VERSION}.jar

RUN apt-get update && \
    apt-get install -y gcc libssl-dev lzma liblzma-dev libbz2-dev libffi-dev tar gzip wget make && \
    wget https://www.python.org/ftp/python/${PYTHON_VERSION}/Python-${PYTHON_VERSION}.tgz && \
    tar xzf Python-${PYTHON_VERSION}.tgz && \
    cd Python-${PYTHON_VERSION} && \
    ./configure --enable-optimizations && \
    make install
ENV VIRTUAL_ENV=/opt/venv
RUN python3 -m venv $VIRTUAL_ENV --copies
RUN cp -r /usr/local/lib/python3.11/* $VIRTUAL_ENV/lib/python3.11
ENV PATH="$VIRTUAL_ENV/bin:$PATH"
RUN python3 -m pip install --upgrade pip && \
    python3 -m pip install apache_beam==${BEAM_VERSION} \
    s3fs \
    boto3
ENV PYSPARK_PYTHON="/opt/venv/bin/python3"
ENV PYSPARK_DRIVER_PYTHON="/opt/venv/bin/python3"
ENV RUN_PYTHON_SDK_IN_DEFAULT_ENVIRONMENT=1
COPY --from=apache/beam_python3.11_sdk:2.58.0 /opt/apache/beam /opt/apache/beam

# Set working directory
WORKDIR ${SPARK_HOME}

USER ${SPARK_UID}
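To run the Beam example below, this image needs to be available at the ECR URI referenced by the SparkApplication (beam-spark-repo:eks-beam-image in us-east-1). A minimal build-and-push sketch, assuming the Dockerfile above is in the current directory and the ECR repository does not exist yet:

export ACCOUNT_ID=$(aws sts get-caller-identity --query Account --output text)
export AWS_REGION=us-east-1
aws ecr create-repository --repository-name beam-spark-repo --region $AWS_REGION
aws ecr get-login-password --region $AWS_REGION | docker login --username AWS --password-stdin $ACCOUNT_ID.dkr.ecr.$AWS_REGION.amazonaws.com
docker build -t $ACCOUNT_ID.dkr.ecr.$AWS_REGION.amazonaws.com/beam-spark-repo:eks-beam-image .
docker push $ACCOUNT_ID.dkr.ecr.$AWS_REGION.amazonaws.com/beam-spark-repo:eks-beam-image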
Lines changed: 55 additions & 0 deletions
@@ -0,0 +1,55 @@
---
apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  name: beam-wc
  namespace: spark-team-a
spec:
  type: Python
  pythonVersion: "3"
  # Beam runtime image
  image: "$ACCOUNT_ID.dkr.ecr.us-east-1.amazonaws.com/beam-spark-repo:eks-beam-image"
  imagePullPolicy: Always
  mainClass: org.apache.beam.runners.spark.SparkPipelineRunner
  mainApplicationFile: "s3://$S3_BUCKET/app/wordcountApp.jar"
  sparkConf:
    spark.local.dir: "/data"
    spark.speculation: "false"
    spark.network.timeout: "2400"
    spark.hadoop.fs.s3a.connection.timeout: "1200000"
    spark.hadoop.fs.s3a.path.style.access: "true"
    spark.hadoop.fs.s3a.connection.maximum: "200"
    spark.hadoop.fs.s3a.fast.upload: "true"
    spark.hadoop.fs.s3a.readahead.range: "256K"
    spark.hadoop.fs.s3a.input.fadvise: "random"
    spark.hadoop.fs.s3a.aws.credentials.provider.mapping: "com.amazonaws.auth.WebIdentityTokenCredentialsProvider=software.amazon.awssdk.auth.credentials.WebIdentityTokenFileCredentialsProvider"
    spark.hadoop.fs.s3a.aws.credentials.provider: "software.amazon.awssdk.auth.credentials.WebIdentityTokenFileCredentialsProvider" # AWS SDK V2 https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/aws_sdk_upgrade.html
    spark.hadoop.fs.s3.impl: "org.apache.hadoop.fs.s3a.S3AFileSystem"
    spark.eventLog.enabled: "true"
    spark.eventLog.dir: "s3://$S3_BUCKET/spark-event-logs/"
    spark.app.name: "beam-wc"
    spark.kubernetes.executor.podNamePrefix: "beam-spark"
    spark.kubernetes.driver.pod.name: beam-spark-driver
    # Required for EMR Runtime and Glue Catalogue
    spark.sql.parquet.fs.optimized.committer.optimization-enabled: "true"
    spark.executor.defaultJavaOptions: -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+UseParallelGC -XX:InitiatingHeapOccupancyPercent=70 -XX:OnOutOfMemoryError="kill -9 %p"
    spark.driver.defaultJavaOptions: -XX:OnOutOfMemoryError="kill -9 %p" -XX:+UseParallelGC -XX:InitiatingHeapOccupancyPercent=70
    spark.pyspark.python: /opt/venv/bin/python3
    spark.pyspark.driver.python: /opt/venv/bin/python3
  sparkVersion: "3.5.6"
  restartPolicy:
    type: Never
  driver:
    cores: 1
    memory: "4g"
    serviceAccount: spark-team-a
    nodeSelector:
      NodeGroupType: "SparkComputeOptimized"
      karpenter.sh/capacity-type: "on-demand"
  executor:
    cores: 1
    instances: 4
    memory: "4g"
    serviceAccount: spark-team-a
    nodeSelector:
      NodeGroupType: "SparkComputeOptimized"
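A sketch of submitting the manifest above, assuming the Spark Operator and the spark-team-a namespace/service account are already provisioned by the blueprint, and that the manifest is saved locally as beam-wordcount.yaml (a hypothetical filename); envsubst fills in the $ACCOUNT_ID and $S3_BUCKET placeholders before the apply:

export ACCOUNT_ID=$(aws sts get-caller-identity --query Account --output text)
export S3_BUCKET=<your-bucket-name>
envsubst < beam-wordcount.yaml | kubectl apply -f -
kubectl get sparkapplication beam-wc -n spark-team-a
kubectl logs -f beam-spark-driver -n spark-team-a

The driver pod name used in the last command (beam-spark-driver) comes from spark.kubernetes.driver.pod.name in the sparkConf above.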
Lines changed: 116 additions & 0 deletions
@@ -0,0 +1,116 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""A word-counting workflow."""

# pytype: skip-file

# beam-playground:
#   name: WordCount
#   description: An example that counts words in Shakespeare's works.
#   multifile: false
#   pipeline_options: --output output.txt
#   context_line: 87
#   categories:
#     - Combiners
#     - Options
#     - Quickstart
#   complexity: MEDIUM
#   tags:
#     - options
#     - count
#     - combine
#     - strings

import argparse
import logging
import re

import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.runners.runner import PipelineResult


class WordExtractingDoFn(beam.DoFn):
  """Parse each line of input text into words."""
  def process(self, element):
    """Returns an iterator over the words of this element.

    The element is a line of text. If the line is blank, note that, too.

    Args:
      element: the element being processed

    Returns:
      The processed element.
    """
    return re.findall(r'[\w\']+', element, re.UNICODE)


def run(argv=None, save_main_session=True) -> PipelineResult:
  """Main entry point; defines and runs the wordcount pipeline."""
  parser = argparse.ArgumentParser()
  parser.add_argument(
      '--input',
      dest='input',
      default='gs://dataflow-samples/shakespeare/kinglear.txt',
      help='Input file to process.')
  parser.add_argument(
      '--output',
      dest='output',
      required=True,
      help='Output file to write results to.')
  known_args, pipeline_args = parser.parse_known_args(argv)

  # We use the save_main_session option because one or more DoFn's in this
  # workflow rely on global context (e.g., a module imported at module level).
  pipeline_options = PipelineOptions(pipeline_args)
  pipeline_options.view_as(SetupOptions).save_main_session = save_main_session

  pipeline = beam.Pipeline(options=pipeline_options)

  # Read the text file[pattern] into a PCollection.
  lines = pipeline | 'Read' >> ReadFromText(known_args.input)

  counts = (
      lines
      | 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str))
      | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
      | 'GroupAndSum' >> beam.CombinePerKey(sum))

  # Format the counts into a PCollection of strings.
  def format_result(word, count):
    return '%s: %d' % (word, count)

  output = counts | 'Format' >> beam.MapTuple(format_result)

  # Write the output using a "Write" transform that has side effects.
  # pylint: disable=expression-not-assigned
  output | 'Write' >> WriteToText(known_args.output)

  # Execute the pipeline and return the result.
  result = pipeline.run()
  result.wait_until_finish()
  return result


if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run()
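The SparkApplication above points mainApplicationFile at a pre-built wordcountApp.jar; Beam's portable Spark runner can emit such a self-contained jar from a Python pipeline (for example via the --output_executable_path pipeline option), though the exact packaging step is not part of this commit. Independently of that, the pipeline can be smoke-tested on Beam's default DirectRunner; a sketch, assuming the script is saved locally as wordcount.py and /tmp/sample.txt is any text file:

python3 -m pip install apache_beam==2.58.0
python3 wordcount.py --input /tmp/sample.txt --output /tmp/counts
head /tmp/counts-00000-of-00001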

distributed-databases/trino/helm-values/trino.yaml

Lines changed: 12 additions & 12 deletions
@@ -47,7 +47,7 @@ coordinator:
   config:
     query:
       #maxMemoryPerNode + (maxHeapSize * 0.3) < maxHeapSize
-      maxMemoryPerNode: "22GB" # ~70% of maxHeapSize
+      maxMemoryPerNode: "6GB" # ~70% of maxHeapSize
       minWorkers: 1
       initialHashPartitions: 100
   resources:
@@ -60,7 +60,7 @@ coordinator:
   annotations:
     karpenter.sh/do-not-disrupt: "true"
   nodeSelector:
-    NodePool: trino-sql-karpenter
+    NodePool: trino-control-karpenter
     karpenter.sh/capacity-type: on-demand
   topologySpreadConstraints:
     - maxSkew: 1
@@ -72,7 +72,7 @@ coordinator:

 worker:
   jvm:
-    maxHeapSize: "89G" # ~80% of container memory (110Gi)
+    maxHeapSize: "12G" # ~80% of container memory (14Gi)
   extraArguments:
     - "-XX:+UseG1GC"
     - "-XX:G1HeapRegionSize=32M"
@@ -83,14 +83,14 @@ worker:
     - "-XX:+UseContainerSupport"
   config:
     query:
-      maxMemoryPerNode: "71GB" # ~80% of maxHeapSize
+      maxMemoryPerNode: "6GB" # ~80% of maxHeapSize
   resources:
     requests:
-      cpu: "12000m" # Leave 3000m for system/DaemonSets
-      memory: 112Gi # Leave 16Gi for system/DaemonSets
+      cpu: "3000m"
+      memory: 14Gi
     limits:
-      cpu: "14000m"
-      memory: 112Gi
+      cpu: "6000m"
+      memory: 14Gi
   nodeSelector:
     NodePool: trino-sql-karpenter
     karpenter.sh/capacity-type: on-demand
@@ -107,9 +107,9 @@ additionalConfigProperties:
   - "exchange.compression-enabled=true"
   - "query.remote-task.max-error-duration=1m"
   - "query.max-hash-partition-count=100" # Updated from query.hash-partition-count
-  - "spill-enabled=true" # Updated from experimental.spill-enabled
-  - "spiller-spill-path=/tmp/spill" # Chagne this to SSD mount for faster
-  - "memory.heap-headroom-per-node=9.6GB"
+  - "spill-enabled=false" # Updated from experimental.spill-enabled
+  - "spiller-spill-path=/tmp/spill" # Change this to SSD mount for faster
+  - "memory.heap-headroom-per-node=1.6GB"
   - "optimizer.join-reordering-strategy=AUTOMATIC" # Updated from join-reordering-strategy
   - "query.max-history=100"
   - "query.client.timeout=30m"
@@ -186,7 +186,7 @@ serviceMonitor:
   enabled: true
   labels:
     prometheus: kube-prometheus
-  interval: "15s"
+  interval: "5s"
   coordinator:
     enabled: true
     labels:
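As a rough check of the new worker sizing against the rule noted in the coordinator section (maxMemoryPerNode + maxHeapSize * 0.3 < maxHeapSize): with 14Gi containers the worker heap is 12G, and 6GB + 0.3 * 12G = 9.6G, which stays below the 12G heap; memory.heap-headroom-per-node is reduced from 9.6GB to 1.6GB to fit the smaller heap.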

distributed-databases/trino/karpenter.tf

Lines changed: 49 additions & 2 deletions
@@ -94,7 +94,7 @@ resource "kubectl_manifest" "karpenter_node_class" {
 }

 # Create a Karpenter NodePool using the AL2023 NodeClass
-resource "kubectl_manifest" "karpenter_node_pool" {
+resource "kubectl_manifest" "karpenter_worker_pool" {
   yaml_body = <<-YAML
     apiVersion: karpenter.sh/v1
     kind: NodePool
@@ -138,4 +138,51 @@ resource "kubectl_manifest" "karpenter_node_pool" {
   depends_on = [
     kubectl_manifest.karpenter_node_class
   ]
-}
+}
+
+# Create a Karpenter NodePool using the AL2023 NodeClass
+resource "kubectl_manifest" "karpenter_ctl_pool" {
+  yaml_body = <<-YAML
+    apiVersion: karpenter.sh/v1
+    kind: NodePool
+    metadata:
+      name: trino-control-karpenter
+    spec:
+      template:
+        metadata:
+          labels:
+            NodePool: trino-control-karpenter
+        spec:
+          nodeClassRef:
+            group: karpenter.k8s.aws
+            kind: EC2NodeClass
+            name: trino-karpenter
+          requirements:
+            - key: "karpenter.sh/capacity-type"
+              operator: In
+              values: ["on-demand"]
+            - key: "kubernetes.io/arch"
+              operator: In
+              values: ["arm64"]
+            - key: "karpenter.k8s.aws/instance-category"
+              operator: In
+              values: ["r"]
+            - key: "karpenter.k8s.aws/instance-family"
+              operator: In
+              values: ["r6g", "r7g", "r8g"]
+            - key: "karpenter.k8s.aws/instance-size"
+              operator: In
+              values: ["2xlarge", "4xlarge"]
+      disruption:
+        consolidationPolicy: WhenEmpty
+        consolidateAfter: 60s
+      limits:
+        cpu: "128"
+        memory: 256Gi
+      weight: 10
+  YAML
+
+  depends_on = [
+    kubectl_manifest.karpenter_node_class
+  ]
+}
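Once applied, the new NodePool and the coordinator placement (its nodeSelector was switched to trino-control-karpenter in the Helm values above) can be checked with kubectl; a sketch, assuming cluster access and that Trino runs in the trino namespace (an assumption; the namespace is not shown in this diff):

kubectl get nodepools trino-control-karpenter
kubectl get nodes -l NodePool=trino-control-karpenter
kubectl get pods -n trino -o wide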
