
Commit ea260f6

razvan and NickLarsenNZ authored

feat: expose services with listener classes (#562)

* add listener.rs and replace usages of CurrentlySupportedListenerClasses
* expose history pods via listener classes
* expose spark connect pods via listener classes
* update changelog
* connect: use persistent volumes for listener and move from deployment to stateful set. Decided to move to a stateful set because deployments don't support volume claim templates.
* history: use persistent volumes for listener volumes
* history: create group listeners and update crd
* history: remove services created by the operator and update test
* connect: create server listeners and update crd
* history: refactor to use listener::build_listener
* connect: remove traces of deployment
* history: refactor rbac as per todo
* docs: update
* chore: remove set command to make jenkins test work
* Update CHANGELOG.md
* Update rust/operator-binary/src/connect/server.rs
* Update rust/operator-binary/src/crd/history.rs
* doc: mention cluster_config usage status
* review feedback

Co-authored-by: Nick <10092581+NickLarsenNZ@users.noreply.github.com>
1 parent 9416378 commit ea260f6

File tree

16 files changed

+418
-393
lines changed


CHANGELOG.md

+2
@@ -11,6 +11,7 @@ All notable changes to this project will be documented in this file.
 - Use `--file-log-max-files` (or `FILE_LOG_MAX_FILES`) to limit the number of log files kept.
 - Use `--file-log-rotation-period` (or `FILE_LOG_ROTATION_PERIOD`) to configure the frequency of rotation.
 - Use `--console-log-format` (or `CONSOLE_LOG_FORMAT`) to set the format to `plain` (default) or `json`.
+- Expose history and connect services via listener classes ([#562]).

 ### Changed

@@ -35,6 +36,7 @@ All notable changes to this project will be documented in this file.
 [#554]: https://github.com/stackabletech/spark-k8s-operator/pull/554
 [#559]: https://github.com/stackabletech/spark-k8s-operator/pull/559
 [#560]: https://github.com/stackabletech/spark-k8s-operator/pull/560
+[#562]: https://github.com/stackabletech/spark-k8s-operator/pull/562

 ## [25.3.0] - 2025-03-21

deploy/helm/spark-k8s-operator/crds/crds.yaml

+18-40
@@ -1118,27 +1118,11 @@ spec:
       description: A Spark cluster history server component. This resource is managed by the Stackable operator for Apache Spark. Find more information on how to use it in the [operator documentation](https://docs.stackable.tech/home/nightly/spark-k8s/usage-guide/history-server).
       properties:
         clusterConfig:
-          default:
-            listenerClass: cluster-internal
-          description: Global Spark history server configuration that applies to all roles and role groups.
-          properties:
-            listenerClass:
-              default: cluster-internal
-              description: |-
-                This field controls which type of Service the Operator creates for this HistoryServer:
-
-                * cluster-internal: Use a ClusterIP service
-
-                * external-unstable: Use a NodePort service
-
-                * external-stable: Use a LoadBalancer service
-
-                This is a temporary solution with the goal to keep yaml manifests forward compatible. In the future, this setting will control which ListenerClass <https://docs.stackable.tech/home/stable/listener-operator/listenerclass.html> will be used to expose the service, and ListenerClass names will stay the same, allowing for a non-breaking change.
-              enum:
-              - cluster-internal
-              - external-unstable
-              - external-stable
-              type: string
+          default: {}
+          description: |-
+            Global Spark history server configuration that applies to all roles.
+
+            This was previously used to hold the listener configuration, which has since moved to the role configuration.
           type: object
         image:
           anyOf:
@@ -1395,6 +1379,9 @@ spec:
               cleaner:
                 nullable: true
                 type: boolean
+              listenerClass:
+                nullable: true
+                type: string
               logging:
                 default:
                   containers: {}
@@ -1641,6 +1628,9 @@ spec:
               cleaner:
                 nullable: true
                 type: boolean
+              listenerClass:
+                nullable: true
+                type: string
               logging:
                 default:
                   containers: {}
@@ -1879,27 +1869,11 @@ spec:
                 type: string
               type: array
         clusterConfig:
-          default:
-            listenerClass: external-unstable
-          description: Global Spark Connect server configuration that applies to all roles.
-          properties:
-            listenerClass:
-              default: external-unstable
-              description: |-
-                This field controls which type of Service the Operator creates for this ConnectServer:
-
-                * cluster-internal: Use a ClusterIP service
-
-                * external-unstable: Use a NodePort service
-
-                * external-stable: Use a LoadBalancer service
-
-                This is a temporary solution with the goal to keep yaml manifests forward compatible. In the future, this setting will control which ListenerClass <https://docs.stackable.tech/home/stable/listener-operator/listenerclass.html> will be used to expose the service, and ListenerClass names will stay the same, allowing for a non-breaking change.
-              enum:
-              - cluster-internal
-              - external-unstable
-              - external-stable
-              type: string
+          default: {}
+          description: |-
+            Global Spark Connect server configuration that applies to all roles.
+
+            This was previously used to hold the listener configuration, which has since moved to the server configuration.
           type: object
         clusterOperation:
           default:
@@ -2191,6 +2165,10 @@ spec:
             config:
              default: {}
              properties:
+                listenerClass:
+                  description: This field controls which [ListenerClass](https://docs.stackable.tech/home/nightly/listener-operator/listenerclass.html) is used to expose the Spark services.
+                  nullable: true
+                  type: string
                logging:
                  default:
                    containers: {}
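The schema changes above add `listenerClass` at both the role and role-group config levels of the history server. A hedged sketch of a manifest using both levels (the field layout is inferred from the schema diff, not taken from shipped documentation):

```yaml
apiVersion: spark.stackable.tech/v1alpha1
kind: SparkHistoryServer
metadata:
  name: spark-history
spec:
  nodes:
    config:
      listenerClass: cluster-internal      # role-level default
    roleGroups:
      default:
        config:
          listenerClass: external-unstable # role-group override
```

The role-group value would be expected to override the role-level setting, following the usual Stackable role/role-group merge semantics.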

deploy/helm/spark-k8s-operator/templates/roles.yaml

+11
@@ -133,3 +133,14 @@ rules:
       - bind
     resourceNames:
       - {{ include "operator.name" . }}-clusterrole
+  - apiGroups:
+      - listeners.stackable.tech
+    resources:
+      - listeners
+    verbs:
+      - get
+      - list
+      - watch
+      - patch
+      - create
+      - delete
@@ -1,18 +1,26 @@
-= Service exposition with ListenerClasses
+= Service exposition with listener classes
+:description: Configure the Spark connect and history services exposure with listener classes: cluster-internal, external-unstable, or external-stable.

-The Spark operator deploys SparkApplications, and does not offer a UI or other API, so no services are exposed.
-However, the operator can also deploy HistoryServers, which do offer a UI and API.
-The operator deploys a service called `<name>-historyserver` (where `<name>` is the name of the Spark application) through which the HistoryServer can be reached.
+== History services

-This service can have three different types: `cluster-internal`, `external-unstable` and `external-stable`.
-Read more about the types in the xref:concepts:service-exposition.adoc[service exposition] documentation at platform level.
-
-This is how the ListenerClass is configured:
+The operator deploys a xref:listener-operator:listener.adoc[Listener] for each Spark history pod.
+By default it is only accessible from within the Kubernetes cluster, but this can be changed by setting `.spec.nodes.config.listenerClass`:

 [source,yaml]
 ----
+apiVersion: spark.stackable.tech/v1alpha1
+kind: SparkHistoryServer
+metadata:
+  name: spark-history
 spec:
-  clusterConfig:
-    listenerClass: cluster-internal # <1>
+  nodes:
+    config:
+      listenerClass: external-unstable # <1>
 ----
-<1> The default `cluster-internal` setting.
+<1> Specify one of `external-stable`, `external-unstable` or `cluster-internal` (the default is `cluster-internal`).
+
+For the example above, the listener operator creates a service named `spark-history-node-default`, where `spark-history` is the name of the SparkHistoryServer, `node` is the service role (the only role available for history servers) and `default` is the role group.
+
+== Connect services
+
+Connect pods can be exposed using listener classes in exactly the same fashion as history servers.
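Based on the connect CRD changes in this commit, a SparkConnectServer manifest might set the listener class as sketched below; the `server` role name and exact field path are assumptions inferred from the schema diff, not confirmed by this docs page:

```yaml
apiVersion: spark.stackable.tech/v1alpha1
kind: SparkConnectServer
metadata:
  name: spark-connect
spec:
  server:               # role name is an assumption
    config:
      listenerClass: external-unstable
```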

rust/operator-binary/src/connect/controller.rs

+33-25
@@ -11,8 +11,8 @@ use stackable_operator::{
     },
     logging::controller::ReconcilerError,
     status::condition::{
-        compute_conditions, deployment::DeploymentConditionBuilder,
-        operations::ClusterOperationsConditionBuilder,
+        compute_conditions, operations::ClusterOperationsConditionBuilder,
+        statefulset::StatefulSetConditionBuilder,
     },
     time::Duration,
 };
@@ -29,6 +29,14 @@ use crate::{
 #[strum_discriminants(derive(IntoStaticStr))]
 #[allow(clippy::enum_variant_names)]
 pub enum Error {
+    #[snafu(display("failed to build spark connect listener"))]
+    BuildListener { source: server::Error },
+
+    #[snafu(display("failed to apply spark connect listener"))]
+    ApplyListener {
+        source: stackable_operator::cluster_resources::Error,
+    },
+
     #[snafu(display("failed to serialize connect properties"))]
     SerializeProperties { source: common::Error },

@@ -50,8 +58,8 @@ pub enum Error {
     #[snafu(display("failed to build spark connect server config map for {name}"))]
     BuildServerConfigMap { source: server::Error, name: String },

-    #[snafu(display("failed to build spark connect deployment"))]
-    BuildServerDeployment { source: server::Error },
+    #[snafu(display("failed to build spark connect stateful set"))]
+    BuildServerStatefulSet { source: server::Error },

     #[snafu(display("failed to update status of spark connect server {name}"))]
     ApplyStatus {
@@ -62,8 +70,8 @@ pub enum Error {
     #[snafu(display("spark connect object has no namespace"))]
     ObjectHasNoNamespace,

-    #[snafu(display("failed to update the connect server deployment"))]
-    ApplyDeployment {
+    #[snafu(display("failed to update the connect server stateful set"))]
+    ApplyStatefulSet {
         source: stackable_operator::cluster_resources::Error,
     },

@@ -192,21 +200,9 @@ pub async fn reconcile(
         .await
         .context(ApplyRoleBindingSnafu)?;

-    // Expose connect server to the outside world
-    let service = server::build_service(scs, &resolved_product_image.app_version_label, None)
-        .context(BuildServiceSnafu)?;
-    cluster_resources
-        .add(client, service.clone())
-        .await
-        .context(ApplyServiceSnafu)?;
-
     // Headless service used by executors connect back to the driver
-    let service = server::build_service(
-        scs,
-        &resolved_product_image.app_version_label,
-        Some("None".to_string()),
-    )
-    .context(BuildServiceSnafu)?;
+    let service = server::build_internal_service(scs, &resolved_product_image.app_version_label)
+        .context(BuildServiceSnafu)?;

     cluster_resources
         .add(client, service.clone())
@@ -275,24 +271,36 @@ pub async fn reconcile(
         name: scs.name_unchecked(),
     })?;

+    // ========================================
+    // Server stateful set
     let args = server::command_args(&scs.spec.args);
-    let deployment = server::build_deployment(
+    let stateful_set = server::build_stateful_set(
         scs,
         &server_config,
         &resolved_product_image,
         &service_account,
         &server_config_map,
         args,
     )
-    .context(BuildServerDeploymentSnafu)?;
+    .context(BuildServerStatefulSetSnafu)?;
+
+    // ========================================
+    // Server listener
+    let listener = server::build_listener(scs, &server_config, &resolved_product_image)
+        .context(BuildListenerSnafu)?;
+
+    cluster_resources
+        .add(client, listener)
+        .await
+        .context(ApplyListenerSnafu)?;

-    let mut ss_cond_builder = DeploymentConditionBuilder::default();
+    let mut ss_cond_builder = StatefulSetConditionBuilder::default();

     ss_cond_builder.add(
         cluster_resources
-            .add(client, deployment)
+            .add(client, stateful_set)
             .await
-            .context(ApplyDeploymentSnafu)?,
+            .context(ApplyStatefulSetSnafu)?,
     );

     cluster_resources
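The commit message explains the Deployment-to-StatefulSet move with the fact that Deployments do not support volume claim templates. The mechanism being relied on is the standard Kubernetes `volumeClaimTemplates` field on StatefulSets; a minimal, hypothetical sketch (names and image are placeholders, not taken from the operator):

```yaml
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: spark-connect-server        # hypothetical name
spec:
  serviceName: spark-connect-server-headless
  replicas: 1
  selector:
    matchLabels:
      app: spark-connect
  template:
    metadata:
      labels:
        app: spark-connect
    spec:
      containers:
        - name: spark
          image: example/spark:latest  # placeholder image
          volumeMounts:
            - name: listener           # persistent listener volume
              mountPath: /stackable/listener
  volumeClaimTemplates:                # this field has no Deployment equivalent
    - metadata:
        name: listener
      spec:
        accessModes: ["ReadWriteOnce"]
        resources:
          requests:
            storage: 1Gi
```

Each replica gets its own PersistentVolumeClaim stamped from the template, which is what lets a listener volume stick to a pod across restarts.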

rust/operator-binary/src/connect/crd.rs

+9-38
@@ -79,6 +79,9 @@ pub mod versioned {
     pub image: ProductImage,

     /// Global Spark Connect server configuration that applies to all roles.
+    ///
+    /// This was previously used to hold the listener configuration, which has since moved
+    /// to the server configuration.
     #[serde(default)]
     pub cluster_config: v1alpha1::SparkConnectServerClusterConfig,

@@ -106,21 +109,7 @@ pub mod versioned {

 #[derive(Clone, Deserialize, Debug, Default, Eq, JsonSchema, PartialEq, Serialize)]
 #[serde(rename_all = "camelCase")]
-pub struct SparkConnectServerClusterConfig {
-    /// This field controls which type of Service the Operator creates for this ConnectServer:
-    ///
-    /// * cluster-internal: Use a ClusterIP service
-    ///
-    /// * external-unstable: Use a NodePort service
-    ///
-    /// * external-stable: Use a LoadBalancer service
-    ///
-    /// This is a temporary solution with the goal to keep yaml manifests forward compatible.
-    /// In the future, this setting will control which ListenerClass <https://docs.stackable.tech/home/stable/listener-operator/listenerclass.html>
-    /// will be used to expose the service, and ListenerClass names will stay the same, allowing for a non-breaking change.
-    #[serde(default)]
-    pub listener_class: CurrentlySupportedListenerClasses,
-}
+pub struct SparkConnectServerClusterConfig {}

 #[derive(Clone, Debug, Default, JsonSchema, PartialEq, Fragment)]
 #[fragment_attrs(
@@ -147,6 +136,10 @@ pub mod versioned {
     /// This can be shortened by the `maxCertificateLifetime` setting on the SecretClass issuing the TLS certificate.
     #[fragment_attrs(serde(default))]
     pub requested_secret_lifetime: Option<Duration>,
+
+    /// This field controls which [ListenerClass](DOCS_BASE_URL_PLACEHOLDER/listener-operator/listenerclass.html) is used to expose the Spark services.
+    #[serde(default)]
+    pub listener_class: String,
 }

 #[derive(Clone, Debug, Default, JsonSchema, PartialEq, Fragment)]
@@ -178,29 +171,6 @@ pub mod versioned {
     }
 }

-// TODO: Temporary solution until listener-operator is finished
-#[derive(Clone, Debug, Default, Display, Deserialize, Eq, JsonSchema, PartialEq, Serialize)]
-#[serde(rename_all = "PascalCase")]
-pub(crate) enum CurrentlySupportedListenerClasses {
-    #[serde(rename = "cluster-internal")]
-    ClusterInternal,
-    #[default]
-    #[serde(rename = "external-unstable")]
-    ExternalUnstable,
-    #[serde(rename = "external-stable")]
-    ExternalStable,
-}
-
-impl CurrentlySupportedListenerClasses {
-    pub fn k8s_service_type(&self) -> String {
-        match self {
-            CurrentlySupportedListenerClasses::ClusterInternal => "ClusterIP".to_string(),
-            CurrentlySupportedListenerClasses::ExternalUnstable => "NodePort".to_string(),
-            CurrentlySupportedListenerClasses::ExternalStable => "LoadBalancer".to_string(),
-        }
-    }
-}
-
 #[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, Debug, Default, JsonSchema, PartialEq, Fragment)]
 #[fragment_attrs(
@@ -258,6 +228,7 @@ impl v1alpha1::ServerConfig {
         },
         logging: product_logging::spec::default_logging(),
         requested_secret_lifetime: Some(Self::DEFAULT_CONNECT_SECRET_LIFETIME),
+        listener_class: Some("cluster-internal".into()),
     }
 }
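The crd.rs diff replaces the closed `CurrentlySupportedListenerClasses` enum (which mapped each value to a Kubernetes Service type) with a free-form `listener_class` string defaulting to `cluster-internal`. A tiny standalone Rust sketch of that defaulting behavior; `resolve_listener_class` is an illustrative helper, not a function from the operator:

```rust
// Hypothetical sketch: a free-form listener class string with the same
// default that ServerConfig::default_config() sets in the diff.
fn resolve_listener_class(configured: Option<&str>) -> String {
    // Any ListenerClass name is now accepted; only the default is fixed.
    configured.unwrap_or("cluster-internal").to_string()
}

fn main() {
    assert_eq!(resolve_listener_class(None), "cluster-internal");
    assert_eq!(
        resolve_listener_class(Some("external-unstable")),
        "external-unstable"
    );
    println!("ok");
}
```

Because the field is an open string rather than an enum, custom ListenerClasses can be referenced without another CRD change.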
