# Feature Steps System

The feature steps system provides a modular, extensible architecture for configuring Kubernetes pods through composable configuration steps. Each feature step handles a specific aspect of pod configuration, enabling flexible customization of both driver and executor pods.

## Core Architecture

### KubernetesFeatureConfigStep { .api }

Base interface for all feature configuration steps:

```scala
trait KubernetesFeatureConfigStep {
  def configurePod(pod: SparkPod): SparkPod
  def getAdditionalPodSystemProperties(): Map[String, String]
  def getAdditionalKubernetesResources(): Seq[HasMetadata]
}
```

**Key Responsibilities**:
- **Pod Configuration**: Apply specific modifications to pod specifications
- **System Properties**: Contribute JVM system properties for Spark components
- **Additional Resources**: Create supporting Kubernetes resources (Services, ConfigMaps, etc.)

**Usage Pattern**:
```scala
// Feature steps are applied in sequence during pod building
val featureSteps: Seq[KubernetesFeatureConfigStep] = createFeatureSteps(conf)

val finalPod = featureSteps.foldLeft(initialPod) { (pod, step) =>
  step.configurePod(pod)
}

val allSystemProperties = featureSteps.flatMap(_.getAdditionalPodSystemProperties()).toMap
val additionalResources = featureSteps.flatMap(_.getAdditionalKubernetesResources())
```

## Driver Feature Steps

### BasicDriverFeatureStep { .api }

Core driver pod configuration including essential container setup:

```scala
class BasicDriverFeatureStep(conf: KubernetesDriverConf) extends KubernetesFeatureConfigStep {
  override def configurePod(pod: SparkPod): SparkPod
  override def getAdditionalPodSystemProperties(): Map[String, String]
  override def getAdditionalKubernetesResources(): Seq[HasMetadata]
}
```

**Configuration Applied**:
- **Container Image**: Sets the driver container image from configuration
- **Resource Limits**: Applies CPU and memory requests and limits
- **Basic Labels**: Adds Spark application and role labels
- **Environment Variables**: Sets essential Spark environment variables
- **Port Configuration**: Configures driver ports for communication

**Implementation Example**:
```scala
override def configurePod(pod: SparkPod): SparkPod = {
  val driverContainer = new ContainerBuilder(pod.container)
    .withName(Constants.DRIVER_CONTAINER_NAME)
    .withImage(conf.get(CONTAINER_IMAGE))
    .withImagePullPolicy(conf.imagePullPolicy)
    .addNewPort()
      .withName("driver-rpc-port")
      .withContainerPort(DEFAULT_DRIVER_PORT)
      .withProtocol("TCP")
    .endPort()
    .addNewEnv()
      .withName(ENV_DRIVER_BIND_ADDRESS)
      .withValue("0.0.0.0")
    .endEnv()
    .addNewEnv()
      .withName(ENV_APPLICATION_ID)
      .withValue(conf.appId)
    .endEnv()
    .withResources(buildResourceRequirements(conf))
    .build()

  val driverPod = new PodBuilder(pod.pod)
    .editOrNewMetadata()
      .withName(conf.resourceNamePrefix + "-driver")
      .addToLabels(SPARK_APP_ID_LABEL, conf.appId)
      .addToLabels(SPARK_ROLE_LABEL, DRIVER_ROLE)
    .endMetadata()
    .editOrNewSpec()
      .withRestartPolicy("Never")
      .withServiceAccountName(conf.get(KUBERNETES_SERVICE_ACCOUNT_NAME).orNull)
    .endSpec()
    .build()

  SparkPod(driverPod, driverContainer)
}
```

### DriverServiceFeatureStep { .api }

Creates the Kubernetes service that exposes the driver pod so executors can connect to it:

```scala
class DriverServiceFeatureStep(conf: KubernetesDriverConf) extends KubernetesFeatureConfigStep {
  override def configurePod(pod: SparkPod): SparkPod
  override def getAdditionalKubernetesResources(): Seq[HasMetadata]
}
```

**Service Configuration**:
```scala
override def getAdditionalKubernetesResources(): Seq[HasMetadata] = {
  val service = new ServiceBuilder()
    .withNewMetadata()
      .withName(conf.resourceNamePrefix + "-driver-svc")
      .withNamespace(conf.namespace)
      .addToLabels(SPARK_APP_ID_LABEL, conf.appId)
      .addToAnnotations(conf.serviceAnnotations.asJava)
    .endMetadata()
    .withNewSpec()
      .withType(conf.get(KUBERNETES_DRIVER_SERVICE_TYPE))
      .addToSelector(SPARK_APP_ID_LABEL, conf.appId)
      .addToSelector(SPARK_ROLE_LABEL, DRIVER_ROLE)
      .addNewPort()
        .withName("driver-rpc-port")
        .withPort(DEFAULT_DRIVER_PORT)
        .withTargetPort(new IntOrString(DEFAULT_DRIVER_PORT))
      .endPort()
      .addNewPort()
        .withName("blockmanager")
        .withPort(DEFAULT_BLOCKMANAGER_PORT)
        .withTargetPort(new IntOrString(DEFAULT_BLOCKMANAGER_PORT))
      .endPort()
    .endSpec()
    .build()

  Seq(service)
}
```
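
The service also determines how executors reach the driver. A sketch of the companion system properties this step would publish, assuming the service DNS name follows the standard `<service>.<namespace>.svc` convention (the property keys are real Spark settings; the method body is illustrative):

```scala
override def getAdditionalPodSystemProperties(): Map[String, String] = {
  // Executors resolve the driver through the service's cluster DNS name
  val driverHostname = s"${conf.resourceNamePrefix}-driver-svc.${conf.namespace}.svc"
  Map(
    "spark.driver.host" -> driverHostname,
    "spark.driver.port" -> DEFAULT_DRIVER_PORT.toString,
    "spark.driver.blockManager.port" -> DEFAULT_BLOCKMANAGER_PORT.toString
  )
}
```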

### DriverCommandFeatureStep { .api }

Configures the driver container command and arguments for application execution:

```scala
class DriverCommandFeatureStep(conf: KubernetesDriverConf) extends KubernetesFeatureConfigStep {
  override def configurePod(pod: SparkPod): SparkPod
}
```

**Command Configuration**:
```scala
override def configurePod(pod: SparkPod): SparkPod = {
  val driverContainer = new ContainerBuilder(pod.container)
    .withCommand("/opt/spark/bin/spark-submit")
    .withArgs(buildDriverArgs(conf): _*)
    .build()

  pod.copy(container = driverContainer)
}

private def buildDriverArgs(conf: KubernetesDriverConf): Seq[String] = {
  val baseArgs = Seq(
    "--class", conf.mainClass,
    "--deploy-mode", "client"
  )

  val resourceArgs = conf.mainAppResource match {
    case JavaMainAppResource(Some(resource)) => Seq(resource)
    case PythonMainAppResource(resource) => Seq(resource)
    case RMainAppResource(resource) => Seq(resource)
    case _ => Seq.empty
  }

  val appArgs = conf.appArgs

  baseArgs ++ resourceArgs ++ appArgs
}
```

### DriverKubernetesCredentialsFeatureStep { .api }

Configures Kubernetes API credentials for driver pod operations:

```scala
class DriverKubernetesCredentialsFeatureStep(conf: KubernetesDriverConf) extends KubernetesFeatureConfigStep {
  override def configurePod(pod: SparkPod): SparkPod
  override def getAdditionalPodSystemProperties(): Map[String, String]
}
```

**Credential Configuration**:
```scala
override def configurePod(pod: SparkPod): SparkPod = {
  val credentialVolumes = buildCredentialVolumes(conf)
  val credentialVolumeMounts = buildCredentialVolumeMounts(conf)

  val driverContainer = new ContainerBuilder(pod.container)
    .addAllToVolumeMounts(credentialVolumeMounts.asJava)
    .build()

  val driverPod = new PodBuilder(pod.pod)
    .editOrNewSpec()
      .addAllToVolumes(credentialVolumes.asJava)
    .endSpec()
    .build()

  SparkPod(driverPod, driverContainer)
}

override def getAdditionalPodSystemProperties(): Map[String, String] = {
  // Point the driver's in-cluster Kubernetes client at the mounted credential files
  Map(
    "spark.kubernetes.authenticate.driver.mounted.oauthTokenFile" -> DRIVER_CREDENTIALS_OAUTH_TOKEN_PATH,
    "spark.kubernetes.authenticate.driver.mounted.caCertFile" -> DRIVER_CREDENTIALS_CA_CERT_PATH,
    "spark.kubernetes.authenticate.driver.mounted.clientKeyFile" -> DRIVER_CREDENTIALS_CLIENT_KEY_PATH,
    "spark.kubernetes.authenticate.driver.mounted.clientCertFile" -> DRIVER_CREDENTIALS_CLIENT_CERT_PATH
  )
}
```
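
The `buildCredentialVolumes` helper is assumed above; a minimal sketch, presuming the credentials have already been stored in a Kubernetes secret whose name is held by `credentialsSecretName` (a hypothetical value):

```scala
// Sketch: expose the credentials secret as a single read-only volume
private def buildCredentialVolumes(conf: KubernetesDriverConf): Seq[Volume] = {
  Seq(new VolumeBuilder()
    .withName("kubernetes-credentials")
    .withNewSecret()
      .withSecretName(credentialsSecretName) // hypothetical: name of the pre-created secret
    .endSecret()
    .build())
}
```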

## Executor Feature Steps

### BasicExecutorFeatureStep { .api }

Core executor pod configuration for Spark worker functionality:

```scala
class BasicExecutorFeatureStep(
    conf: KubernetesExecutorConf,
    secMgr: SecurityManager,
    clock: Clock
) extends KubernetesFeatureConfigStep {

  override def configurePod(pod: SparkPod): SparkPod
}
```

**Executor Configuration**:
```scala
override def configurePod(pod: SparkPod): SparkPod = {
  val executorContainer = new ContainerBuilder(pod.container)
    .withName(Constants.EXECUTOR_CONTAINER_NAME)
    .withImage(conf.get(EXECUTOR_CONTAINER_IMAGE))
    .withImagePullPolicy(conf.imagePullPolicy)
    .addNewPort()
      .withName("blockmanager")
      .withContainerPort(DEFAULT_BLOCKMANAGER_PORT)
      .withProtocol("TCP")
    .endPort()
    .addNewEnv()
      .withName(ENV_DRIVER_URL)
      .withValue(buildDriverUrl(conf))
    .endEnv()
    .addNewEnv()
      .withName(ENV_EXECUTOR_CORES)
      .withValue(conf.get(KUBERNETES_EXECUTOR_CORES))
    .endEnv()
    .addNewEnv()
      .withName(ENV_EXECUTOR_MEMORY)
      .withValue(conf.get(KUBERNETES_EXECUTOR_MEMORY))
    .endEnv()
    .addNewEnv()
      .withName(ENV_EXECUTOR_ID)
      .withValue(conf.executorId)
    .endEnv()
    .withCommand("/opt/spark/bin/spark-class")
    .withArgs("org.apache.spark.executor.CoarseGrainedExecutorBackend")
    .withResources(buildExecutorResourceRequirements(conf))
    .build()

  val executorPod = new PodBuilder(pod.pod)
    .editOrNewMetadata()
      .withName(s"${conf.resourceNamePrefix}-exec-${conf.executorId}")
      .addToLabels(SPARK_APP_ID_LABEL, conf.appId)
      .addToLabels(SPARK_ROLE_LABEL, EXECUTOR_ROLE)
      .addToLabels(SPARK_EXECUTOR_ID_LABEL, conf.executorId)
    .endMetadata()
    .build()

  SparkPod(executorPod, executorContainer)
}
```
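
The `buildDriverUrl` helper is assumed above; a sketch of what it could return, using Spark's standard driver RPC endpoint name (the `driverHost` and `driverPort` accessors are illustrative):

```scala
// Sketch: executors connect back to the driver's RPC endpoint, which the
// service created by DriverServiceFeatureStep makes resolvable via DNS
private def buildDriverUrl(conf: KubernetesExecutorConf): String =
  s"spark://CoarseGrainedScheduler@${conf.driverHost}:${conf.driverPort}"
```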

### ExecutorKubernetesCredentialsFeatureStep { .api }

Configures Kubernetes credentials for executor pod operations:

```scala
class ExecutorKubernetesCredentialsFeatureStep(conf: KubernetesExecutorConf) extends KubernetesFeatureConfigStep {
  override def configurePod(pod: SparkPod): SparkPod
}
```
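
A minimal sketch of what this step might apply, assuming only an executor service account is configured (the `KUBERNETES_EXECUTOR_SERVICE_ACCOUNT_NAME` config entry is an assumption):

```scala
override def configurePod(pod: SparkPod): SparkPod = {
  conf.get(KUBERNETES_EXECUTOR_SERVICE_ACCOUNT_NAME) match {
    case Some(serviceAccount) =>
      // Run the executor pod under the configured service account
      val updatedPod = new PodBuilder(pod.pod)
        .editOrNewSpec()
          .withServiceAccountName(serviceAccount)
        .endSpec()
        .build()
      SparkPod(updatedPod, pod.container)
    case None => pod
  }
}
```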

## Shared Feature Steps

### MountSecretsFeatureStep { .api }

Mounts Kubernetes secrets as volumes in pods:

```scala
class MountSecretsFeatureStep(conf: KubernetesConf) extends KubernetesFeatureConfigStep {
  override def configurePod(pod: SparkPod): SparkPod
}
```

**Secret Mounting**:
```scala
override def configurePod(pod: SparkPod): SparkPod = {
  if (conf.secretNamesToMountPaths.nonEmpty) {
    val secretVolumes = conf.secretNamesToMountPaths.map { case (secretName, _) =>
      new VolumeBuilder()
        .withName(s"$secretName-volume")
        .withNewSecret()
          .withSecretName(secretName)
          .withDefaultMode(420) // 420 decimal == 0644 octal
        .endSecret()
        .build()
    }.toSeq

    val secretVolumeMounts = conf.secretNamesToMountPaths.map { case (secretName, mountPath) =>
      new VolumeMountBuilder()
        .withName(s"$secretName-volume")
        .withMountPath(mountPath)
        .withReadOnly(true)
        .build()
    }.toSeq

    val updatedContainer = new ContainerBuilder(pod.container)
      .addAllToVolumeMounts(secretVolumeMounts.asJava)
      .build()

    val updatedPod = new PodBuilder(pod.pod)
      .editOrNewSpec()
        .addAllToVolumes(secretVolumes.asJava)
      .endSpec()
      .build()

    SparkPod(updatedPod, updatedContainer)
  } else {
    pod
  }
}
```
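
Secret mounts are driven entirely by configuration. For example, given an existing `SparkConf` (named `sparkConf` here), the standard Spark properties below populate `secretNamesToMountPaths` so that this step mounts the secret `spark-secret` at `/etc/secrets`:

```scala
// Real Spark configuration keys; one entry per secret to mount
sparkConf.set("spark.kubernetes.driver.secrets.spark-secret", "/etc/secrets")
sparkConf.set("spark.kubernetes.executor.secrets.spark-secret", "/etc/secrets")
```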

### EnvSecretsFeatureStep { .api }

Injects secret values as environment variables using Kubernetes secret references:

```scala
class EnvSecretsFeatureStep(conf: KubernetesConf) extends KubernetesFeatureConfigStep {
  override def configurePod(pod: SparkPod): SparkPod
}
```

**Environment Variable Secrets**:
```scala
override def configurePod(pod: SparkPod): SparkPod = {
  if (conf.secretEnvNamesToKeyRefs.nonEmpty) {
    val secretEnvVars = conf.secretEnvNamesToKeyRefs.map { case (envName, secretKeyRef) =>
      // References are given as "<secret name>:<key within the secret>"
      val Array(secretName, key) = secretKeyRef.split(":", 2)

      new EnvVarBuilder()
        .withName(envName)
        .withNewValueFrom()
          .withNewSecretKeyRef()
            .withName(secretName)
            .withKey(key)
          .endSecretKeyRef()
        .endValueFrom()
        .build()
    }.toSeq

    val updatedContainer = new ContainerBuilder(pod.container)
      .addAllToEnv(secretEnvVars.asJava)
      .build()

    pod.copy(container = updatedContainer)
  } else {
    pod
  }
}
```
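
For example, the standard Spark property below makes this step expose the `password` key of the secret `db-secret` as the environment variable `DB_PASSWORD`:

```scala
// Populates secretEnvNamesToKeyRefs with DB_PASSWORD -> "db-secret:password"
sparkConf.set("spark.kubernetes.driver.secretKeyRef.DB_PASSWORD", "db-secret:password")
```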

### MountVolumesFeatureStep { .api }

Mounts user-specified volumes in pods:

```scala
class MountVolumesFeatureStep(conf: KubernetesConf) extends KubernetesFeatureConfigStep {
  override def configurePod(pod: SparkPod): SparkPod
}
```

**Volume Mounting Implementation**:
```scala
override def configurePod(pod: SparkPod): SparkPod = {
  if (conf.volumes.nonEmpty) {
    val volumes = conf.volumes.map(createVolumeSpec)
    val volumeMounts = conf.volumes.map(createVolumeMountSpec)

    val updatedContainer = new ContainerBuilder(pod.container)
      .addAllToVolumeMounts(volumeMounts.asJava)
      .build()

    val updatedPod = new PodBuilder(pod.pod)
      .editOrNewSpec()
        .addAllToVolumes(volumes.asJava)
      .endSpec()
      .build()

    SparkPod(updatedPod, updatedContainer)
  } else {
    pod
  }
}

private def createVolumeSpec(volumeSpec: KubernetesVolumeSpec): Volume = {
  val volumeBuilder = new VolumeBuilder()
    .withName(volumeSpec.volumeName)

  volumeSpec.volumeConf match {
    case KubernetesHostPathVolumeConf(hostPath) =>
      volumeBuilder.withNewHostPath()
        .withPath(hostPath)
        .withType("Directory")
      .endHostPath()

    case KubernetesPVCVolumeConf(claimName) =>
      volumeBuilder.withNewPersistentVolumeClaim()
        .withClaimName(claimName)
        .withReadOnly(volumeSpec.mountReadOnly)
      .endPersistentVolumeClaim()

    case KubernetesEmptyDirVolumeConf(medium, sizeLimit) =>
      volumeBuilder.withNewEmptyDir()
        .withMedium(medium.orNull)
        .withSizeLimit(sizeLimit.map(new Quantity(_)).orNull)
      .endEmptyDir()
  }

  volumeBuilder.build()
}
```
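
The matching `createVolumeMountSpec` helper is omitted above; a minimal sketch, assuming `KubernetesVolumeSpec` also carries a `mountPath` alongside the `mountReadOnly` flag shown earlier:

```scala
// Sketch: one VolumeMount per user-specified volume
private def createVolumeMountSpec(volumeSpec: KubernetesVolumeSpec): VolumeMount = {
  new VolumeMountBuilder()
    .withName(volumeSpec.volumeName)
    .withMountPath(volumeSpec.mountPath)
    .withReadOnly(volumeSpec.mountReadOnly)
    .build()
}
```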

### LocalDirsFeatureStep { .api }

Configures local directories for Spark scratch space using emptyDir volumes:

```scala
class LocalDirsFeatureStep(conf: KubernetesConf) extends KubernetesFeatureConfigStep {
  override def configurePod(pod: SparkPod): SparkPod
  override def getAdditionalPodSystemProperties(): Map[String, String]
}
```

**Local Directory Configuration**:
```scala
// localDirNames and localDirPaths (referenced below) are resolved from the
// configured local directories (spark.local.dir entries, or a per-app default)

override def configurePod(pod: SparkPod): SparkPod = {
  val localDirVolumes = createLocalDirVolumes(conf)
  val localDirVolumeMounts = createLocalDirVolumeMounts(conf)

  val updatedContainer = new ContainerBuilder(pod.container)
    .addAllToVolumeMounts(localDirVolumeMounts.asJava)
    .addNewEnv()
      .withName("SPARK_LOCAL_DIRS")
      .withValue(localDirPaths.mkString(","))
    .endEnv()
    .build()

  val updatedPod = new PodBuilder(pod.pod)
    .editOrNewSpec()
      .addAllToVolumes(localDirVolumes.asJava)
    .endSpec()
    .build()

  SparkPod(updatedPod, updatedContainer)
}

override def getAdditionalPodSystemProperties(): Map[String, String] = {
  Map("spark.local.dir" -> localDirPaths.mkString(","))
}

private def createLocalDirVolumes(conf: KubernetesConf): Seq[Volume] = {
  val useTemporaryFileSystem = conf.get(KUBERNETES_LOCAL_DIRS_TMPFS)

  localDirNames.map { dirName =>
    new VolumeBuilder()
      .withName(s"spark-local-dir-$dirName")
      .withNewEmptyDir()
        // Back the scratch space with RAM when tmpfs is requested
        .withMedium(if (useTemporaryFileSystem) "Memory" else null)
      .endEmptyDir()
      .build()
  }
}
```
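
The mount helper is omitted above; a sketch under the same assumptions (each name in `localDirNames` pairs with a path in `localDirPaths`):

```scala
// Sketch: mount each emptyDir volume at its configured scratch path
private def createLocalDirVolumeMounts(conf: KubernetesConf): Seq[VolumeMount] = {
  localDirNames.zip(localDirPaths).map { case (dirName, path) =>
    new VolumeMountBuilder()
      .withName(s"spark-local-dir-$dirName")
      .withMountPath(path)
      .build()
  }
}
```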

### HadoopConfDriverFeatureStep { .api }

Propagates Hadoop configuration files to the driver pod:

```scala
class HadoopConfDriverFeatureStep(conf: KubernetesDriverConf) extends KubernetesFeatureConfigStep {
  override def configurePod(pod: SparkPod): SparkPod
  override def getAdditionalPodSystemProperties(): Map[String, String]
}
```
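
A sketch of the typical behavior, assuming the ConfigMap name arrives via the `HADOOP_CONF_CONFIGMAP_NAME` entry used later on this page (the mount path and env name follow Hadoop conventions):

```scala
override def configurePod(pod: SparkPod): SparkPod = {
  // Mount the Hadoop config ConfigMap and point HADOOP_CONF_DIR at it
  val hadoopPod = new PodBuilder(pod.pod)
    .editOrNewSpec()
      .addNewVolume()
        .withName("hadoop-conf")
        .withNewConfigMap()
          .withName(conf.get(HADOOP_CONF_CONFIGMAP_NAME).orNull)
        .endConfigMap()
      .endVolume()
    .endSpec()
    .build()

  val hadoopContainer = new ContainerBuilder(pod.container)
    .addNewVolumeMount()
      .withName("hadoop-conf")
      .withMountPath("/opt/hadoop/conf")
    .endVolumeMount()
    .addNewEnv()
      .withName("HADOOP_CONF_DIR")
      .withValue("/opt/hadoop/conf")
    .endEnv()
    .build()

  SparkPod(hadoopPod, hadoopContainer)
}
```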

### KerberosConfDriverFeatureStep { .api }

Configures Kerberos authentication for the driver pod:

```scala
class KerberosConfDriverFeatureStep(conf: KubernetesDriverConf) extends KubernetesFeatureConfigStep {
  override def configurePod(pod: SparkPod): SparkPod
}
```
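
In broad strokes this step makes a `krb5.conf` available to the driver; a minimal sketch, assuming the ConfigMap name comes from the real `spark.kubernetes.kerberos.krb5.configMapName` setting (the `KUBERNETES_KERBEROS_KRB5_CONFIG_MAP` constant below is an assumption):

```scala
override def configurePod(pod: SparkPod): SparkPod = {
  val krb5Pod = new PodBuilder(pod.pod)
    .editOrNewSpec()
      .addNewVolume()
        .withName("krb5-conf")
        .withNewConfigMap()
          .withName(conf.get(KUBERNETES_KERBEROS_KRB5_CONFIG_MAP).orNull)
        .endConfigMap()
      .endVolume()
    .endSpec()
    .build()

  val krb5Container = new ContainerBuilder(pod.container)
    .addNewVolumeMount()
      .withName("krb5-conf")
      .withMountPath("/etc/krb5.conf")
      .withSubPath("krb5.conf")
    .endVolumeMount()
    .build()

  SparkPod(krb5Pod, krb5Container)
}
```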

### PodTemplateConfigMapStep { .api }

Handles pod template configuration via Kubernetes ConfigMaps:

```scala
class PodTemplateConfigMapStep(conf: KubernetesConf) extends KubernetesFeatureConfigStep {
  override def getAdditionalKubernetesResources(): Seq[HasMetadata]
}
```
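
A sketch of the resource this step creates, assuming the template path is supplied through the real `spark.kubernetes.driver.podTemplateFile` setting (the config constant and ConfigMap naming are illustrative):

```scala
import java.io.File
import java.nio.charset.StandardCharsets
import com.google.common.io.Files

override def getAdditionalKubernetesResources(): Seq[HasMetadata] = {
  conf.get(KUBERNETES_DRIVER_PODTEMPLATE_FILE).map { templateFile =>
    new ConfigMapBuilder()
      .withNewMetadata()
        .withName(s"${conf.resourceNamePrefix}-driver-podspec-conf-map")
      .endMetadata()
      // Ship the local template file contents into the cluster
      .addToData("pod-spec-template.yml",
        Files.asCharSource(new File(templateFile), StandardCharsets.UTF_8).read())
      .build()
  }.toSeq
}
```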

## Feature Step Composition

### Builder Pattern Integration

Feature steps are integrated into the builder pattern for both driver and executor pods:

```scala
// Driver feature step composition
class KubernetesDriverBuilder {
  def buildFromFeatures(
      conf: KubernetesDriverConf,
      client: KubernetesClient): KubernetesDriverSpec = {

    val featureSteps = Seq(
      new BasicDriverFeatureStep(conf),
      new DriverServiceFeatureStep(conf),
      new DriverCommandFeatureStep(conf),
      new DriverKubernetesCredentialsFeatureStep(conf)
    ) ++ createOptionalSteps(conf)

    val initialPod = SparkPod.initialPod()
    val configuredPod = applyFeatureSteps(initialPod, featureSteps)
    val systemProperties = collectSystemProperties(featureSteps)
    val additionalResources = collectAdditionalResources(featureSteps)

    KubernetesDriverSpec(configuredPod, additionalResources, systemProperties)
  }

  private def createOptionalSteps(conf: KubernetesDriverConf): Seq[KubernetesFeatureConfigStep] = {
    val steps = mutable.ArrayBuffer[KubernetesFeatureConfigStep]()

    if (conf.secretNamesToMountPaths.nonEmpty) {
      steps += new MountSecretsFeatureStep(conf)
    }

    if (conf.secretEnvNamesToKeyRefs.nonEmpty) {
      steps += new EnvSecretsFeatureStep(conf)
    }

    if (conf.volumes.nonEmpty) {
      steps += new MountVolumesFeatureStep(conf)
    }

    if (conf.get(HADOOP_CONF_CONFIGMAP_NAME).isDefined) {
      steps += new HadoopConfDriverFeatureStep(conf)
    }

    steps.toSeq
  }
}
```
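
The `applyFeatureSteps`, `collectSystemProperties`, and `collectAdditionalResources` helpers are assumed above; they fold and flatten exactly as in the usage pattern at the top of this page:

```scala
private def applyFeatureSteps(
    pod: SparkPod,
    steps: Seq[KubernetesFeatureConfigStep]): SparkPod =
  steps.foldLeft(pod)((current, step) => step.configurePod(current))

private def collectSystemProperties(
    steps: Seq[KubernetesFeatureConfigStep]): Map[String, String] =
  steps.flatMap(_.getAdditionalPodSystemProperties()).toMap

private def collectAdditionalResources(
    steps: Seq[KubernetesFeatureConfigStep]): Seq[HasMetadata] =
  steps.flatMap(_.getAdditionalKubernetesResources())
```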

### Executor Feature Step Composition

```scala
// Executor feature step composition; the shared steps no-op internally when
// their configuration is absent, so they can be applied unconditionally
class KubernetesExecutorBuilder {
  def buildFromFeatures(
      conf: KubernetesExecutorConf,
      secMgr: SecurityManager,
      client: KubernetesClient): SparkPod = {

    val featureSteps = Seq(
      new BasicExecutorFeatureStep(conf, secMgr, Clock.systemUTC()),
      new ExecutorKubernetesCredentialsFeatureStep(conf),
      new MountSecretsFeatureStep(conf),
      new EnvSecretsFeatureStep(conf),
      new MountVolumesFeatureStep(conf),
      new LocalDirsFeatureStep(conf)
    )

    val initialPod = SparkPod.initialPod()
    applyFeatureSteps(initialPod, featureSteps)
  }
}
```

## Advanced Feature Step Patterns

### Conditional Feature Steps

```scala
// Feature steps can be conditionally applied based on configuration
class ConditionalFeatureStep(
    conf: KubernetesConf,
    condition: KubernetesConf => Boolean,
    step: KubernetesFeatureConfigStep
) extends KubernetesFeatureConfigStep {

  override def configurePod(pod: SparkPod): SparkPod = {
    if (condition(conf)) step.configurePod(pod) else pod
  }

  override def getAdditionalPodSystemProperties(): Map[String, String] = {
    if (condition(conf)) step.getAdditionalPodSystemProperties() else Map.empty
  }

  override def getAdditionalKubernetesResources(): Seq[HasMetadata] = {
    if (condition(conf)) step.getAdditionalKubernetesResources() else Seq.empty
  }
}

// Usage
val conditionalStep = new ConditionalFeatureStep(
  conf,
  c => c.get(ENABLE_MONITORING).getOrElse(false),
  new MonitoringFeatureStep(conf)
)
```

### Composite Feature Steps

```scala
// Combine multiple related feature steps
class CompositeFeatureStep(steps: KubernetesFeatureConfigStep*) extends KubernetesFeatureConfigStep {

  override def configurePod(pod: SparkPod): SparkPod = {
    steps.foldLeft(pod) { (currentPod, step) =>
      step.configurePod(currentPod)
    }
  }

  override def getAdditionalPodSystemProperties(): Map[String, String] = {
    steps.flatMap(_.getAdditionalPodSystemProperties()).toMap
  }

  override def getAdditionalKubernetesResources(): Seq[HasMetadata] = {
    steps.flatMap(_.getAdditionalKubernetesResources())
  }
}

// Security-related feature steps
val securitySteps = new CompositeFeatureStep(
  new MountSecretsFeatureStep(conf),
  new EnvSecretsFeatureStep(conf),
  new KerberosConfDriverFeatureStep(conf)
)
```
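
Note that `toMap` keeps the last value for any duplicate key, so when composed steps contribute the same system property, the later step in the sequence wins.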

### Custom Feature Steps

```scala
// Example: Custom monitoring feature step
class MonitoringFeatureStep(conf: KubernetesConf) extends KubernetesFeatureConfigStep {

  override def configurePod(pod: SparkPod): SparkPod = {
    val monitoringContainer = new ContainerBuilder(pod.container)
      .addNewEnv()
        .withName("PROMETHEUS_ENABLED")
        .withValue("true")
      .endEnv()
      .addNewPort()
        .withName("metrics")
        .withContainerPort(8080)
        .withProtocol("TCP")
      .endPort()
      .build()

    val monitoringPod = new PodBuilder(pod.pod)
      .editOrNewMetadata()
        .addToAnnotations("prometheus.io/scrape", "true")
        .addToAnnotations("prometheus.io/port", "8080")
        .addToAnnotations("prometheus.io/path", "/metrics")
      .endMetadata()
      .build()

    SparkPod(monitoringPod, monitoringContainer)
  }

  override def getAdditionalPodSystemProperties(): Map[String, String] = {
    Map(
      "spark.sql.streaming.metricsEnabled" -> "true",
      "spark.metrics.conf.driver.sink.prometheus.class" -> "org.apache.spark.metrics.sink.PrometheusSink"
    )
  }

  override def getAdditionalKubernetesResources(): Seq[HasMetadata] = {
    val monitoringService = new ServiceBuilder()
      .withNewMetadata()
        .withName(s"${conf.resourceNamePrefix}-metrics")
        .withNamespace(conf.namespace)
      .endMetadata()
      .withNewSpec()
        .addToSelector(SPARK_APP_ID_LABEL, conf.appId)
        .addNewPort()
          .withName("metrics")
          .withPort(8080)
          .withTargetPort(new IntOrString(8080))
        .endPort()
      .endSpec()
      .build()

    Seq(monitoringService)
  }
}
```

The feature steps system provides a powerful, modular approach to pod configuration that enables extensive customization while maintaining clean separation of concerns and reusability across different deployment scenarios.