# Utilities and Helpers

The utilities layer provides essential helper functions, client management, and common operations that support Spark's Kubernetes integration. These utilities handle everything from configuration parsing to client creation and volume management.

## Core Utilities

### KubernetesUtils { .api }

A comprehensive collection of Kubernetes-specific utility functions:

```scala
object KubernetesUtils {

  def parsePrefixedKeyValuePairs(
    sparkConf: SparkConf,
    prefix: String
  ): Map[String, String]

  def loadPodFromTemplate(
    kubernetesClient: KubernetesClient,
    templateFile: File,
    containerName: Option[String]
  ): SparkPod

  def selectSparkContainer(
    pod: Pod,
    containerName: Option[String]
  ): SparkPod

  def requireBothOrNeitherDefined[T](
    opt1: Option[T],
    opt2: Option[T],
    errMessage: String
  ): Unit
}
```

### Configuration Parsing Utilities

#### Prefixed Key-Value Parsing
```scala
// Parse configuration properties with a common prefix
def parsePrefixedKeyValuePairs(
  sparkConf: SparkConf,
  prefix: String
): Map[String, String] = {

  sparkConf.getAllWithPrefix(prefix).filter { case (key, value) =>
    key.nonEmpty && value.nonEmpty
  }.toMap
}

// Usage examples
val driverLabels = KubernetesUtils.parsePrefixedKeyValuePairs(
  sparkConf,
  "spark.kubernetes.driver.label."
)
// Input:  spark.kubernetes.driver.label.app=myapp, spark.kubernetes.driver.label.version=1.0
// Output: Map("app" -> "myapp", "version" -> "1.0")

val executorAnnotations = KubernetesUtils.parsePrefixedKeyValuePairs(
  sparkConf,
  "spark.kubernetes.executor.annotation."
)
// Input:  spark.kubernetes.executor.annotation.prometheus.io/scrape=true
// Output: Map("prometheus.io/scrape" -> "true")
```

#### Validation Utilities
```scala
// Require both options to be defined or both to be None
def requireBothOrNeitherDefined[T](
  opt1: Option[T],
  opt2: Option[T],
  errMessage: String
): Unit = {
  (opt1, opt2) match {
    case (Some(_), None) | (None, Some(_)) =>
      throw new IllegalArgumentException(errMessage)
    case _ => // Both defined or both None - valid
  }
}

// Usage
KubernetesUtils.requireBothOrNeitherDefined(
  conf.get(CLIENT_KEY_FILE),
  conf.get(CLIENT_CERT_FILE),
  "Both client key file and client cert file must be specified for mutual TLS"
)
```

#### Resource Name Generation
```scala
def generateResourceName(
  appName: String,
  appId: String,
  resourceType: String
): String = {
  val sanitizedAppName = appName
    .toLowerCase
    .replaceAll("[^a-z0-9-]", "-")
    .replaceAll("-+", "-")
    .take(30)

  val shortAppId = appId.take(8)

  s"$sanitizedAppName-$shortAppId-$resourceType"
}

// Usage
val driverPodName = KubernetesUtils.generateResourceName(
  "My Spark App",
  "app-123456789",
  "driver"
)
// Result: "my-spark-app-app-1234-driver"
```

### Pod Template Management

#### Template Loading
```scala
def loadPodFromTemplate(
  kubernetesClient: KubernetesClient,
  templateFile: File,
  containerName: Option[String]
): SparkPod = {

  require(templateFile.exists(), s"Pod template file does not exist: ${templateFile.getPath}")

  val podTemplate = try {
    kubernetesClient.pods()
      .load(new FileInputStream(templateFile))
      .get()
  } catch {
    case e: Exception =>
      throw new RuntimeException(s"Failed to load pod template from ${templateFile.getPath}", e)
  }

  selectSparkContainer(podTemplate, containerName)
}

// Usage
val templatePod = KubernetesUtils.loadPodFromTemplate(
  kubernetesClient,
  new File("/templates/driver-pod.yaml"),
  Some(Constants.DRIVER_CONTAINER_NAME)
)
```

#### Container Selection
```scala
def selectSparkContainer(
  pod: Pod,
  containerName: Option[String]
): SparkPod = {

  val containers = pod.getSpec.getContainers.asScala

  val selectedContainer = containerName match {
    case Some(name) =>
      containers.find(_.getName == name).getOrElse {
        throw new RuntimeException(s"Container '$name' not found in pod template")
      }
    case None =>
      if (containers.size == 1) {
        containers.head
      } else {
        throw new RuntimeException(
          s"Pod template contains ${containers.size} containers. " +
          "Must specify containerName when template has multiple containers."
        )
      }
  }

  SparkPod(pod, selectedContainer)
}
```
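
For illustration, here is a minimal sketch of how `selectSparkContainer` behaves with a multi-container template. It assumes fabric8's `PodBuilder` is available; the container and image names are hypothetical.

```scala
import io.fabric8.kubernetes.api.model.PodBuilder

// Hypothetical template pod with a logging sidecar plus the Spark container
val templatePod = new PodBuilder()
  .withNewMetadata().withName("driver-template").endMetadata()
  .withNewSpec()
    .addNewContainer().withName("log-forwarder").withImage("fluentd:v1.16").endContainer()
    .addNewContainer().withName("spark-kubernetes-driver").withImage("spark:3.5.1").endContainer()
  .endSpec()
  .build()

// With two containers in the template, the container name must be supplied explicitly;
// passing None here would throw, as shown in the implementation above.
val sparkPod = KubernetesUtils.selectSparkContainer(templatePod, Some("spark-kubernetes-driver"))
```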

## Client Management

### SparkKubernetesClientFactory { .api }

Factory for creating configured Kubernetes client instances:

```scala
object SparkKubernetesClientFactory {

  def createKubernetesClient(
    clientType: ClientType,
    kubernetesConf: Option[KubernetesConf] = None,
    sparkConf: Option[SparkConf] = None,
    defaultServiceAccountToken: Option[String] = None,
    clientContext: Option[String] = None
  ): KubernetesClient

  sealed trait ClientType
  object ClientType {
    case object Driver extends ClientType
    case object Executor extends ClientType
    case object Submission extends ClientType
  }
}
```

### Client Configuration

#### Basic Client Creation
```scala
// Driver client for executor management
val driverClient = SparkKubernetesClientFactory.createKubernetesClient(
  ClientType.Driver,
  kubernetesConf = Some(driverConf)
)

// Submission client for application deployment
val submissionClient = SparkKubernetesClientFactory.createKubernetesClient(
  ClientType.Submission,
  sparkConf = Some(conf),
  clientContext = Some("my-cluster-context")
)

// Executor client for pod operations
val executorClient = SparkKubernetesClientFactory.createKubernetesClient(
  ClientType.Executor,
  kubernetesConf = Some(executorConf),
  defaultServiceAccountToken = Some(serviceAccountToken)
)
```

#### Authentication Configuration
```scala
def createKubernetesClient(
  clientType: ClientType,
  kubernetesConf: Option[KubernetesConf] = None,
  sparkConf: Option[SparkConf] = None,
  defaultServiceAccountToken: Option[String] = None,
  clientContext: Option[String] = None
): KubernetesClient = {

  // Start from the auto-configured settings for the requested kubeconfig context
  val config = new ConfigBuilder(Config.autoConfigure(clientContext.orNull))
    .withApiVersion("v1")

  // Configure API server URL
  sparkConf.flatMap(_.getOption("spark.kubernetes.apiserver.host")).foreach { host =>
    config.withMasterUrl(host)
  }

  // Configure authentication
  configureAuthentication(config, sparkConf, defaultServiceAccountToken)

  // Configure TLS
  configureTls(config, sparkConf)

  new DefaultKubernetesClient(config.build())
}

private def configureAuthentication(
  config: ConfigBuilder,
  sparkConf: Option[SparkConf],
  defaultServiceAccountToken: Option[String]
): Unit = {

  sparkConf.foreach { conf =>
    // OAuth token authentication: an explicit token takes precedence over a token file
    conf.getOption(OAUTH_TOKEN.key).orElse {
      conf.getOption(OAUTH_TOKEN_FILE.key).map { tokenFile =>
        Files.readAllLines(Paths.get(tokenFile)).asScala.mkString
      }
    }.foreach(config.withOauthToken)

    // Username/password authentication
    conf.getOption("spark.kubernetes.authenticate.username").foreach { username =>
      config.withUsername(username)
      conf.getOption("spark.kubernetes.authenticate.password").foreach { password =>
        config.withPassword(password)
      }
    }

    // Fall back to the default (mounted) service account token when no explicit
    // OAuth token was configured
    if (conf.getOption(OAUTH_TOKEN.key).isEmpty && conf.getOption(OAUTH_TOKEN_FILE.key).isEmpty) {
      defaultServiceAccountToken.foreach(config.withOauthToken)
    }
  }
}

private def configureTls(
  config: ConfigBuilder,
  sparkConf: Option[SparkConf]
): Unit = {

  sparkConf.foreach { conf =>
    // CA certificate
    conf.getOption(CA_CERT_FILE.key).foreach { caCertFile =>
      config.withCaCertFile(caCertFile)
    }

    // Client certificates for mutual TLS
    val clientCertFile = conf.getOption(CLIENT_CERT_FILE.key)
    val clientKeyFile = conf.getOption(CLIENT_KEY_FILE.key)

    (clientCertFile, clientKeyFile) match {
      case (Some(certFile), Some(keyFile)) =>
        config.withClientCertFile(certFile)
        config.withClientKeyFile(keyFile)
      case _ => // No client certificates
    }

    // Trust all certificates (for development only)
    if (conf.getBoolean("spark.kubernetes.apiserver.trustCerts", false)) {
      config.withTrustCerts(true)
    }
  }
}
```
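
For reference, the authentication material consumed by a factory like this is usually supplied through Spark's standard Kubernetes authentication properties. The snippet below shows the submission-client prefix; the file paths and service account name are illustrative.

```scala
import org.apache.spark.SparkConf

// Illustrative authentication settings (the property prefix differs per client type)
val conf = new SparkConf()
  .set("spark.kubernetes.authenticate.submission.caCertFile", "/certs/ca.pem")
  .set("spark.kubernetes.authenticate.submission.clientCertFile", "/certs/client.pem")
  .set("spark.kubernetes.authenticate.submission.clientKeyFile", "/certs/client-key.pem")
  .set("spark.kubernetes.authenticate.submission.oauthTokenFile",
    "/var/run/secrets/kubernetes.io/serviceaccount/token")
  .set("spark.kubernetes.authenticate.driver.serviceAccountName", "spark")
```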

## Volume Utilities

### KubernetesVolumeUtils { .api }

Utilities for parsing and handling Kubernetes volume configurations:

```scala
object KubernetesVolumeUtils {

  def parseVolumesWithPrefix(
    sparkConf: SparkConf,
    prefix: String
  ): Seq[KubernetesVolumeSpec]

  private def parseVolumeSpec(
    volumeName: String,
    volumeConf: Map[String, String]
  ): KubernetesVolumeSpec
}
```

### Volume Parsing Implementation

#### Prefix-Based Volume Configuration
```scala
def parseVolumesWithPrefix(
  sparkConf: SparkConf,
  prefix: String
): Seq[KubernetesVolumeSpec] = {

  val volumeConfigs = sparkConf.getAllWithPrefix(prefix)
    .groupBy { case (key, _) =>
      // Extract the volume name from a prefix-stripped key like "data.hostPath.path"
      key.split('.').headOption.getOrElse("")
    }
    .filter(_._1.nonEmpty)

  volumeConfigs.map { case (volumeName, configs) =>
    val configMap = configs.map { case (key, value) =>
      // Remove the volume name prefix: "data.hostPath.path" -> "hostPath.path"
      key.substring(volumeName.length + 1) -> value
    }.toMap

    parseVolumeSpec(volumeName, configMap)
  }.toSeq
}

// Usage
val driverVolumes = KubernetesVolumeUtils.parseVolumesWithPrefix(
  sparkConf,
  "spark.kubernetes.driver.volume."
)

val executorVolumes = KubernetesVolumeUtils.parseVolumesWithPrefix(
  sparkConf,
  "spark.kubernetes.executor.volume."
)
```

#### Volume Specification Parsing
```scala
private def parseVolumeSpec(
  volumeName: String,
  volumeConf: Map[String, String]
): KubernetesVolumeSpec = {

  val mountPath = volumeConf.getOrElse("mount.path",
    throw new IllegalArgumentException(s"Volume $volumeName missing mount.path"))

  val mountSubPath = volumeConf.getOrElse("mount.subPath", "")
  val mountReadOnly = volumeConf.getOrElse("mount.readOnly", "false").toBoolean

  val volumeType = determineVolumeType(volumeConf)
  val volumeSpecificConf = parseVolumeSpecificConf(volumeType, volumeConf)

  KubernetesVolumeSpec(
    volumeName = volumeName,
    mountPath = mountPath,
    mountSubPath = mountSubPath,
    mountReadOnly = mountReadOnly,
    volumeConf = volumeSpecificConf
  )
}

private def determineVolumeType(volumeConf: Map[String, String]): String = {
  // Volume types handled by parseVolumeSpecificConf below
  val volumeTypes = Set("hostPath", "persistentVolumeClaim", "emptyDir", "nfs")

  val presentTypes = volumeTypes.filter { volumeType =>
    volumeConf.keys.exists(_.startsWith(s"$volumeType."))
  }

  presentTypes.size match {
    case 0 => throw new IllegalArgumentException("No volume type specified")
    case 1 => presentTypes.head
    case _ => throw new IllegalArgumentException(s"Multiple volume types specified: ${presentTypes.mkString(", ")}")
  }
}

private def parseVolumeSpecificConf(
  volumeType: String,
  volumeConf: Map[String, String]
): KubernetesVolumeSpecificConf = {

  volumeType match {
    case "hostPath" =>
      val hostPath = volumeConf.getOrElse("hostPath.path",
        throw new IllegalArgumentException("hostPath volume missing path"))
      KubernetesHostPathVolumeConf(hostPath)

    case "persistentVolumeClaim" =>
      val claimName = volumeConf.getOrElse("persistentVolumeClaim.claimName",
        throw new IllegalArgumentException("PVC volume missing claimName"))
      KubernetesPVCVolumeConf(claimName)

    case "emptyDir" =>
      val medium = volumeConf.get("emptyDir.medium")
      val sizeLimit = volumeConf.get("emptyDir.sizeLimit")
      KubernetesEmptyDirVolumeConf(medium, sizeLimit)

    case "nfs" =>
      val server = volumeConf.getOrElse("nfs.server",
        throw new IllegalArgumentException("NFS volume missing server"))
      val path = volumeConf.getOrElse("nfs.path",
        throw new IllegalArgumentException("NFS volume missing path"))
      KubernetesNFSVolumeConf(server, path)

    case _ =>
      throw new IllegalArgumentException(s"Unsupported volume type: $volumeType")
  }
}
```

### Volume Configuration Examples

```scala
// Host path volume configuration
spark.conf.set("spark.kubernetes.volume.data.hostPath.path", "/host/data")
spark.conf.set("spark.kubernetes.volume.data.mount.path", "/data")
spark.conf.set("spark.kubernetes.volume.data.mount.readOnly", "true")

// PVC volume configuration
spark.conf.set("spark.kubernetes.volume.storage.persistentVolumeClaim.claimName", "spark-pvc")
spark.conf.set("spark.kubernetes.volume.storage.mount.path", "/storage")
spark.conf.set("spark.kubernetes.volume.storage.mount.subPath", "spark-data")

// EmptyDir volume with memory backing
spark.conf.set("spark.kubernetes.volume.tmp.emptyDir.medium", "Memory")
spark.conf.set("spark.kubernetes.volume.tmp.emptyDir.sizeLimit", "1Gi")
spark.conf.set("spark.kubernetes.volume.tmp.mount.path", "/tmp")

// NFS volume configuration
spark.conf.set("spark.kubernetes.volume.shared.nfs.server", "nfs-server.example.com")
spark.conf.set("spark.kubernetes.volume.shared.nfs.path", "/shared/data")
spark.conf.set("spark.kubernetes.volume.shared.mount.path", "/shared")
```
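
To connect these settings back to the parser above, here is a sketch of what the hostPath example would produce, assuming the same properties are present on the `SparkConf` handed to `parseVolumesWithPrefix` and using this document's `KubernetesVolumeSpec` model:

```scala
// Parse the volume settings shown above
val volumes = KubernetesVolumeUtils.parseVolumesWithPrefix(
  sparkConf,
  "spark.kubernetes.volume."
)

// The "data" volume would come back roughly as:
//   KubernetesVolumeSpec(
//     volumeName    = "data",
//     mountPath     = "/data",
//     mountSubPath  = "",
//     mountReadOnly = true,
//     volumeConf    = KubernetesHostPathVolumeConf("/host/data"))
```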

## Resource Management Utilities

### Resource Requirement Building

```scala
object ResourceUtils {

  def buildResourceRequirements(
    conf: KubernetesConf,
    isDriver: Boolean = false
  ): ResourceRequirements = {

    val prefix = if (isDriver) "spark.kubernetes.driver" else "spark.kubernetes.executor"

    val limits = mutable.Map[String, Quantity]()
    val requests = mutable.Map[String, Quantity]()

    // CPU configuration
    conf.sparkConf.getOption(s"$prefix.limit.cores").foreach { cores =>
      limits("cpu") = new Quantity(cores)
    }

    conf.sparkConf.getOption(s"$prefix.request.cores").foreach { cores =>
      requests("cpu") = new Quantity(cores)
    }

    // Memory configuration
    conf.sparkConf.getOption(s"$prefix.memory").foreach { memory =>
      val memoryBytes = Utils.byteStringAsBytes(memory)

      // Add memory overhead (10% of the configured memory by default)
      val overheadBytes = conf.sparkConf.getOption(s"$prefix.memoryOverhead")
        .map(Utils.byteStringAsBytes)
        .getOrElse((memoryBytes * 0.1).toLong)

      val totalMemoryMi = (memoryBytes + overheadBytes) / (1024 * 1024)
      limits("memory") = new Quantity(s"${totalMemoryMi}Mi")
      requests("memory") = new Quantity(s"${totalMemoryMi}Mi")
    }

    new ResourceRequirementsBuilder()
      .withLimits(limits.asJava)
      .withRequests(requests.asJava)
      .build()
  }
}
```
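
A brief usage sketch, assuming a `KubernetesConf` named `kubernetesConf` and a `SparkPod` named `pod` are in scope; the computed requirements are attached to the container with fabric8's `ContainerBuilder`:

```scala
import io.fabric8.kubernetes.api.model.ContainerBuilder

// Build driver resource requirements from the Spark configuration
val driverResources = ResourceUtils.buildResourceRequirements(kubernetesConf, isDriver = true)

// Attach the limits and requests to the driver container
val driverContainer = new ContainerBuilder(pod.container)
  .withResources(driverResources)
  .build()
```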

### Label and Annotation Utilities

```scala
object MetadataUtils {

  def buildLabels(conf: KubernetesConf): Map[String, String] = {
    val baseLabels = Map(
      Constants.SPARK_APP_ID_LABEL -> conf.appId,
      Constants.SPARK_APP_NAME_LABEL -> conf.appName,
      Constants.SPARK_VERSION_LABEL -> org.apache.spark.SPARK_VERSION
    )

    baseLabels ++ conf.labels
  }

  def buildAnnotations(conf: KubernetesConf): Map[String, String] = {
    val baseAnnotations = Map(
      Constants.CREATED_BY_ANNOTATION -> "Apache Spark",
      Constants.SPARK_APP_NAME_ANNOTATION -> conf.appName
    )

    baseAnnotations ++ conf.annotations
  }

  def validateLabelKey(key: String): Unit = {
    require(key.matches("[a-z0-9A-Z]([a-z0-9A-Z._-]*[a-z0-9A-Z])?"),
      s"Invalid label key: $key")
    require(key.length <= 63, s"Label key too long: $key")
  }

  def validateLabelValue(value: String): Unit = {
    require(value.matches("[a-z0-9A-Z]([a-z0-9A-Z._-]*[a-z0-9A-Z])?") || value.isEmpty,
      s"Invalid label value: $value")
    require(value.length <= 63, s"Label value too long: $value")
  }
}
```
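
For illustration, a sketch of validating the computed labels and applying both labels and annotations to a pod's metadata, assuming `kubernetesConf` and a `SparkPod` named `pod` are in scope:

```scala
import io.fabric8.kubernetes.api.model.PodBuilder
import scala.jdk.CollectionConverters._

val labels = MetadataUtils.buildLabels(kubernetesConf)
labels.foreach { case (key, value) =>
  MetadataUtils.validateLabelKey(key)
  MetadataUtils.validateLabelValue(value)
}

// Merge the labels and annotations into the pod metadata
val labeledPod = new PodBuilder(pod.pod)
  .editOrNewMetadata()
    .addToLabels(labels.asJava)
    .addToAnnotations(MetadataUtils.buildAnnotations(kubernetesConf).asJava)
  .endMetadata()
  .build()
```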

## Error Handling Utilities

### Kubernetes Exception Handling

```scala
object KubernetesExceptionUtils {

  def handleKubernetesApiException[T](operation: String)(block: => T): Try[T] = {
    Try(block).recoverWith {
      case e: KubernetesClientException =>
        logError(s"Kubernetes API error during $operation: ${e.getMessage}")
        Failure(new SparkException(s"Failed to $operation: ${e.getMessage}", e))

      case e: IOException =>
        logError(s"Network error during $operation: ${e.getMessage}")
        Failure(new SparkException(s"Network error during $operation: ${e.getMessage}", e))

      case e: Exception =>
        logError(s"Unexpected error during $operation: ${e.getMessage}")
        Failure(new SparkException(s"Unexpected error during $operation: ${e.getMessage}", e))
    }
  }

  def withRetry[T](
    operation: String,
    maxRetries: Int = 3,
    backoffMillis: Long = 1000
  )(block: => T): T = {

    var lastException: Exception = null

    for (attempt <- 1 to maxRetries) {
      try {
        return block
      } catch {
        case e: Exception =>
          lastException = e
          if (attempt < maxRetries) {
            val delayMillis = backoffMillis * attempt // linear backoff
            logWarning(s"$operation failed (attempt $attempt/$maxRetries), retrying in ${delayMillis}ms")
            Thread.sleep(delayMillis)
          }
      }
    }

    throw new SparkException(s"$operation failed after $maxRetries attempts", lastException)
  }
}

// Usage
val pod = KubernetesExceptionUtils.withRetry("create executor pod") {
  kubernetesClient.pods()
    .inNamespace(namespace)
    .create(podSpec)
}
```
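
Because `handleKubernetesApiException` returns a `Try`, callers can branch on the outcome instead of letting client exceptions escape. A minimal sketch, assuming `kubernetesClient`, `namespace`, and `driverPodName` are in scope:

```scala
import scala.util.{Failure, Success}

val result = KubernetesExceptionUtils.handleKubernetesApiException("delete driver pod") {
  kubernetesClient.pods()
    .inNamespace(namespace)
    .withName(driverPodName)
    .delete()
}

result match {
  case Success(_) => logInfo(s"Driver pod $driverPodName deleted")
  case Failure(e) => logWarning(s"Could not delete driver pod: ${e.getMessage}")
}
```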

### Configuration Validation Utilities

```scala
object ConfigValidationUtils {

  def validateImageName(image: String): Unit = {
    require(image.nonEmpty, "Container image cannot be empty")

    // Basic image name validation
    val imagePattern = "^([a-zA-Z0-9._-]+(/[a-zA-Z0-9._-]+)*)(:([a-zA-Z0-9._-]+))?$".r
    require(imagePattern.matches(image), s"Invalid container image format: $image")

    // Warn about the latest tag
    if (image.endsWith(":latest")) {
      logWarning("Using 'latest' tag is discouraged in production environments")
    }
  }

  def validateNamespaceName(namespace: String): Unit = {
    require(namespace.nonEmpty, "Kubernetes namespace cannot be empty")
    require(namespace.matches("[a-z0-9]([-a-z0-9]*[a-z0-9])?"),
      s"Invalid namespace name: $namespace")
    require(namespace.length <= 63, s"Namespace name too long: $namespace")
  }

  def validateResourceName(name: String): Unit = {
    require(name.nonEmpty, "Resource name cannot be empty")
    require(name.matches("[a-z0-9]([-a-z0-9]*[a-z0-9])?"),
      s"Invalid resource name: $name")
    require(name.length <= 253, s"Resource name too long: $name")
  }

  def validateMemoryString(memory: String): Unit = {
    try {
      val bytes = Utils.byteStringAsBytes(memory)
      require(bytes > 0, "Memory must be positive")
    } catch {
      case _: NumberFormatException =>
        throw new IllegalArgumentException(s"Invalid memory format: $memory")
    }
  }
}
```
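
A short pre-submission validation pass built from the helpers above; the property names are Spark's standard keys, and the defaults are illustrative:

```scala
// Validate commonly used settings before building any Kubernetes resources
ConfigValidationUtils.validateImageName(sparkConf.get("spark.kubernetes.container.image"))
ConfigValidationUtils.validateNamespaceName(sparkConf.get("spark.kubernetes.namespace", "default"))
ConfigValidationUtils.validateMemoryString(sparkConf.get("spark.executor.memory", "1g"))
```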

## Integration Patterns

### Utility Composition

```scala
// Common utility operations combined
object KubernetesOperations {

  def createConfiguredPod(
    conf: KubernetesConf,
    podTemplate: Option[File] = None
  ): SparkPod = {

    val client = SparkKubernetesClientFactory.createKubernetesClient(
      ClientType.Driver,
      Some(conf)
    )

    val basePod = podTemplate match {
      case Some(template) =>
        KubernetesUtils.loadPodFromTemplate(client, template, None)
      case None =>
        SparkPod.initialPod()
    }

    val volumes = KubernetesVolumeUtils.parseVolumesWithPrefix(
      conf.sparkConf,
      "spark.kubernetes.volume."
    )

    // Apply volumes and other configurations
    applyVolumesToPod(basePod, volumes)
  }

  private def applyVolumesToPod(
    pod: SparkPod,
    volumes: Seq[KubernetesVolumeSpec]
  ): SparkPod = {
    // Implementation using MountVolumesFeatureStep logic
    new MountVolumesFeatureStep(volumes).configurePod(pod)
  }
}
```

The utilities layer provides the essential building blocks and helper functions that make the Kubernetes integration robust, flexible, and easy to use across all components of the Spark Kubernetes resource manager.