# Configuration Management

The configuration management system provides a comprehensive, type-safe approach to configuring Kubernetes deployments for Spark applications. It extends Spark's native configuration framework with Kubernetes-specific options and validation.

## Configuration Architecture

### KubernetesConf Hierarchy { .api }

Base abstract class containing all metadata needed for Kubernetes pod creation and management:

```scala
abstract class KubernetesConf(val sparkConf: SparkConf) {

  val resourceNamePrefix: String
  def labels: Map[String, String]
  def environment: Map[String, String]
  def annotations: Map[String, String]
  def secretEnvNamesToKeyRefs: Map[String, String]
  def secretNamesToMountPaths: Map[String, String]
  def volumes: Seq[KubernetesVolumeSpec]

  def appName: String = get("spark.app.name", "spark")
  def namespace: String = get(KUBERNETES_NAMESPACE)
  def imagePullPolicy: String = get(CONTAINER_IMAGE_PULL_POLICY)
  def nodeSelector: Map[String, String] =
    KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_NODE_SELECTOR_PREFIX)

  // Utility methods delegating to the underlying SparkConf
  def contains(config: ConfigEntry[_]): Boolean = sparkConf.contains(config)
  def get[T](config: ConfigEntry[T]): T = sparkConf.get(config)
  def get(conf: String): String = sparkConf.get(conf)
  def get(conf: String, defaultValue: String): String = sparkConf.get(conf, defaultValue)
  def getOption(key: String): Option[String] = sparkConf.getOption(key)
}
```

**Core Properties**:
- **Resource Naming**: `resourceNamePrefix` ensures unique Kubernetes resource names
- **Metadata**: `labels` and `annotations` for pod identification and configuration
- **Environment**: Environment variables and secret references
- **Storage**: Volume specifications and secret mount paths
- **Scheduling**: Node selectors and image pull policies

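
Several of these properties are derived by scanning the `SparkConf` for a key prefix. The sketch below shows how `nodeSelector` can be resolved this way; the `parsePrefixedKeyValuePairs` re-implementation is hypothetical and shown only to illustrate the mechanism (the real helper lives in `KubernetesUtils`):

```scala
import org.apache.spark.SparkConf

// Hypothetical re-implementation of the prefix scan used by nodeSelector:
// every key under the prefix is collected into a map with the prefix stripped.
def parsePrefixedKeyValuePairs(conf: SparkConf, prefix: String): Map[String, String] =
  conf.getAllWithPrefix(prefix).toMap

val conf = new SparkConf()
  .set("spark.kubernetes.node.selector.disktype", "ssd")
  .set("spark.kubernetes.node.selector.zone", "us-east-1a")

// Map(disktype -> ssd, zone -> us-east-1a)
val nodeSelector = parsePrefixedKeyValuePairs(conf, "spark.kubernetes.node.selector.")
```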
42
43
### KubernetesDriverConf { .api }
44
45
Driver-specific configuration extending the base configuration:
46
47
```scala
48
class KubernetesDriverConf(
49
sparkConf: SparkConf,
50
val appId: String,
51
val mainAppResource: MainAppResource,
52
val mainClass: String,
53
val appArgs: Array[String]
54
) extends KubernetesConf(sparkConf) {
55
56
def serviceAnnotations: Map[String, String]
57
}
58
```
59
60
**Driver-Specific Features**:
61
- **Application Resources**: Main class, application JAR, and command-line arguments
62
- **Service Configuration**: Annotations for Kubernetes service creation
63
- **Network Configuration**: Driver port and service type settings
64
65
**Creation Pattern**:
66
```scala
67
val driverConf = KubernetesConf.createDriverConf(
68
sparkConf = conf,
69
appName = "my-spark-app",
70
appResourceNamePrefix = s"spark-${UUID.randomUUID().toString.take(8)}",
71
appId = "app-123456789",
72
mainAppResource = JavaMainAppResource(Some("local:///opt/spark/jars/my-app.jar")),
73
mainClass = "com.example.MySparkApp",
74
appArgs = Array("--input", "/data", "--output", "/results")
75
)
76
```
77
78
### KubernetesExecutorConf { .api }
79
80
Executor-specific configuration for worker pod creation:
81
82
```scala
83
class KubernetesExecutorConf(
84
sparkConf: SparkConf,
85
val appId: String,
86
val executorId: String,
87
val driverPod: Option[Pod]
88
) extends KubernetesConf(sparkConf)
89
```
90
91
**Executor-Specific Features**:
92
- **Executor Identity**: Unique executor ID for tracking and management
93
- **Driver Connection**: Reference to driver pod for communication setup
94
- **Resource Allocation**: Executor-specific CPU, memory, and storage configuration
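
Executor configurations are built through a parallel factory on the companion object. The call below is a sketch that mirrors the driver creation pattern and the field list of `KubernetesExecutorConf`; the exact parameter list may differ across Spark versions:

```scala
// Sketch: one executor conf per requested executor pod. `conf` is the
// application SparkConf and `driverPod` is the running driver's Pod object.
val executorConf = KubernetesConf.createExecutorConf(
  sparkConf = conf,
  executorId = "1",
  appId = "app-123456789",
  driverPod = Some(driverPod)
)
```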
## Configuration Definitions

### Config Object { .api }

Centralized configuration definitions using Spark's ConfigBuilder pattern:

```scala
object Config {
  // Container Images
  val CONTAINER_IMAGE = ConfigBuilder("spark.kubernetes.container.image")
    .doc("Container image to use for Spark pods")
    .stringConf
    .createOptional

  val DRIVER_CONTAINER_IMAGE = ConfigBuilder("spark.kubernetes.driver.container.image")
    .doc("Container image to use for the driver pod")
    .fallbackConf(CONTAINER_IMAGE)

  val EXECUTOR_CONTAINER_IMAGE = ConfigBuilder("spark.kubernetes.executor.container.image")
    .doc("Container image to use for executor pods")
    .fallbackConf(CONTAINER_IMAGE)

  // Namespace and Authentication
  val KUBERNETES_NAMESPACE = ConfigBuilder("spark.kubernetes.namespace")
    .doc("Kubernetes namespace for Spark pods")
    .stringConf
    .createWithDefault("default")

  val KUBERNETES_CONTEXT = ConfigBuilder("spark.kubernetes.context")
    .doc("Kubernetes context to use")
    .stringConf
    .createOptional

  val KUBERNETES_SERVICE_ACCOUNT_NAME =
    ConfigBuilder("spark.kubernetes.authenticate.driver.serviceAccountName")
      .doc("Service account used by the driver pod")
      .stringConf
      .createOptional

  // Resource Limits
  val KUBERNETES_DRIVER_LIMIT_CORES = ConfigBuilder("spark.kubernetes.driver.limit.cores")
    .doc("CPU limit for the driver pod")
    .stringConf
    .createOptional

  val KUBERNETES_EXECUTOR_LIMIT_CORES = ConfigBuilder("spark.kubernetes.executor.limit.cores")
    .doc("CPU limit for executor pods")
    .stringConf
    .createOptional

  // Dynamic Allocation (core Spark settings, shown here for context)
  val DYN_ALLOCATION_ENABLED = ConfigBuilder("spark.dynamicAllocation.enabled")
    .doc("Enable dynamic allocation of executors")
    .booleanConf
    .createWithDefault(false)

  val DYN_ALLOCATION_MIN_EXECUTORS = ConfigBuilder("spark.dynamicAllocation.minExecutors")
    .doc("Minimum number of executors")
    .intConf
    .createWithDefault(0)

  val DYN_ALLOCATION_MAX_EXECUTORS = ConfigBuilder("spark.dynamicAllocation.maxExecutors")
    .doc("Maximum number of executors")
    .intConf
    .createWithDefault(Int.MaxValue)

  // Networking
  val KUBERNETES_DRIVER_POD_NAME = ConfigBuilder("spark.kubernetes.driver.pod.name")
    .doc("Name of the driver pod")
    .stringConf
    .createOptional

  val KUBERNETES_DRIVER_SERVICE_TYPE = ConfigBuilder("spark.kubernetes.driver.service.type")
    .doc("Service type for the driver service")
    .stringConf
    .createWithDefault("ClusterIP")

  // Volume Configuration (prefix-based; see "Volume Configuration" below)
  val KUBERNETES_DRIVER_VOLUMES_PREFIX = "spark.kubernetes.driver.volumes."
  val KUBERNETES_EXECUTOR_VOLUMES_PREFIX = "spark.kubernetes.executor.volumes."

  // Pod Templates
  val KUBERNETES_DRIVER_PODTEMPLATE_FILE = ConfigBuilder("spark.kubernetes.driver.podTemplateFile")
    .doc("Path to the driver pod template file")
    .stringConf
    .createOptional

  val KUBERNETES_EXECUTOR_PODTEMPLATE_FILE = ConfigBuilder("spark.kubernetes.executor.podTemplateFile")
    .doc("Path to the executor pod template file")
    .stringConf
    .createOptional
}
```
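
The `fallbackConf` entries mean a role-specific image wins when set, while the shared `spark.kubernetes.container.image` is used otherwise. A quick sketch of the resolution order, assuming the entries above:

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.kubernetes.container.image", "my-org/spark:3.0.1")

// No driver-specific image set: the driver entry falls back to the shared image.
// conf.get(DRIVER_CONTAINER_IMAGE) == Some("my-org/spark:3.0.1")

conf.set("spark.kubernetes.driver.container.image", "my-org/spark-driver:3.0.1")
// The role-specific key now takes precedence.
// conf.get(DRIVER_CONTAINER_IMAGE) == Some("my-org/spark-driver:3.0.1")
```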

### Configuration Categories

#### Authentication and Security
```scala
// Service Account Configuration
val KUBERNETES_SERVICE_ACCOUNT_NAME =
  ConfigBuilder("spark.kubernetes.authenticate.driver.serviceAccountName")

// OAuth Token Authentication
val OAUTH_TOKEN = ConfigBuilder("spark.kubernetes.authenticate.oauthToken")
val OAUTH_TOKEN_FILE = ConfigBuilder("spark.kubernetes.authenticate.oauthTokenFile")

// Certificate Authentication
val CLIENT_KEY_FILE = ConfigBuilder("spark.kubernetes.authenticate.clientKeyFile")
val CLIENT_CERT_FILE = ConfigBuilder("spark.kubernetes.authenticate.clientCertFile")
val CA_CERT_FILE = ConfigBuilder("spark.kubernetes.authenticate.caCertFile")
```

#### Resource Management
```scala
// CPU Configuration
val KUBERNETES_DRIVER_REQUEST_CORES = ConfigBuilder("spark.kubernetes.driver.request.cores")
val KUBERNETES_EXECUTOR_REQUEST_CORES = ConfigBuilder("spark.kubernetes.executor.request.cores")

// Memory Configuration (driver and executor memory are core Spark settings)
val DRIVER_MEMORY = ConfigBuilder("spark.driver.memory")
val EXECUTOR_MEMORY = ConfigBuilder("spark.executor.memory")
val MEMORY_OVERHEAD_FACTOR = ConfigBuilder("spark.kubernetes.memoryOverheadFactor")

// Instance Configuration (core Spark setting)
val EXECUTOR_INSTANCES = ConfigBuilder("spark.executor.instances")
```

#### Storage and Volumes
```scala
// Local Directory Configuration
val KUBERNETES_LOCAL_DIRS_TMPFS = ConfigBuilder("spark.kubernetes.local.dirs.tmpfs")

// Volume type names used under the role-scoped volume prefixes
val KUBERNETES_VOLUMES_HOSTPATH_TYPE = "hostPath"
val KUBERNETES_VOLUMES_PVC_TYPE = "persistentVolumeClaim"
val KUBERNETES_VOLUMES_EMPTYDIR_TYPE = "emptyDir"
```

## Constants and Defaults

### Constants Object { .api }

Kubernetes-specific constants and default values:

```scala
object Constants {
  // Labels
  val SPARK_APP_ID_LABEL = "spark-app-selector"
  val SPARK_EXECUTOR_ID_LABEL = "spark-exec-id"
  val SPARK_ROLE_LABEL = "spark-role"
  val SPARK_VERSION_LABEL = "spark-version"

  // Label Values
  val SPARK_APP_NAME_LABEL = "spark-app-name"
  val DRIVER_ROLE = "driver"
  val EXECUTOR_ROLE = "executor"

  // Ports
  val DEFAULT_DRIVER_PORT = 7078
  val DEFAULT_BLOCKMANAGER_PORT = 7079
  val DEFAULT_UI_PORT = 4040

  // Environment Variables
  val ENV_DRIVER_URL = "SPARK_DRIVER_URL"
  val ENV_EXECUTOR_CORES = "SPARK_EXECUTOR_CORES"
  val ENV_EXECUTOR_MEMORY = "SPARK_EXECUTOR_MEMORY"
  val ENV_APPLICATION_ID = "SPARK_APPLICATION_ID"
  val ENV_EXECUTOR_ID = "SPARK_EXECUTOR_ID"

  // Paths
  val SPARK_CONF_DIR_INTERNAL = "/opt/spark/conf"
  val SPARK_CONF_FILE_NAME = "spark.properties"
  val HADOOP_CONF_DIR_PATH = "/opt/hadoop/conf"
  val KERBEROS_CONF_DIR_PATH = "/opt/kerberos"

  // Resource Types
  val APP_RESOURCE_TYPE_JAVA = "java"
  val APP_RESOURCE_TYPE_PYTHON = "python"
  val APP_RESOURCE_TYPE_R = "r"

  // Container Names
  val DRIVER_CONTAINER_NAME = "spark-kubernetes-driver"
  val EXECUTOR_CONTAINER_NAME = "spark-kubernetes-executor"

  // Annotations
  val CREATED_BY_ANNOTATION = "created-by"
  val SPARK_APP_NAME_ANNOTATION = "spark-app-name"
}
```
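
The label constants combine into the selector set stamped on every pod. A small sketch of the executor label map, built from the constants above (the real assembly happens in the pod feature steps):

```scala
// Sketch: the standard label set attached to executor pods. The
// spark-app-selector label is what the scheduler backend uses to find
// all executors belonging to one application.
def executorLabels(appId: String, executorId: String): Map[String, String] = Map(
  SPARK_APP_ID_LABEL      -> appId,
  SPARK_EXECUTOR_ID_LABEL -> executorId,
  SPARK_ROLE_LABEL        -> EXECUTOR_ROLE
)

// executorLabels("app-123456789", "1") yields:
// Map(spark-app-selector -> app-123456789, spark-exec-id -> 1, spark-role -> executor)
```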

### Default Values and Validation

```scala
// Image Pull Policies
object ImagePullPolicy extends Enumeration {
  val Always, Never, IfNotPresent = Value
}

// Service Types
object ServiceType extends Enumeration {
  val ClusterIP, NodePort, LoadBalancer = Value
}

// Restart Policies
object RestartPolicy extends Enumeration {
  val Always, OnFailure, Never = Value
}
```
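
A user-supplied string can be checked against one of these enumerations before it ever reaches the Kubernetes API. A minimal sketch:

```scala
// Sketch: fail fast on an invalid pull policy instead of letting the
// Kubernetes API server reject the pod later.
def parsePullPolicy(value: String): ImagePullPolicy.Value =
  ImagePullPolicy.values.find(_.toString == value).getOrElse(
    throw new IllegalArgumentException(
      s"Invalid image pull policy '$value'; expected one of ${ImagePullPolicy.values.mkString(", ")}"))
```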

## Volume Configuration

### KubernetesVolumeSpec { .api }

Comprehensive volume specification supporting multiple Kubernetes volume types:

```scala
case class KubernetesVolumeSpec(
  volumeName: String,
  mountPath: String,
  mountSubPath: String = "",
  mountReadOnly: Boolean = false,
  volumeConf: KubernetesVolumeSpecificConf
)
```

### Volume Types { .api }

```scala
// Host Path Volumes
case class KubernetesHostPathVolumeConf(
  hostPath: String
) extends KubernetesVolumeSpecificConf

// Persistent Volume Claims
case class KubernetesPVCVolumeConf(
  claimName: String
) extends KubernetesVolumeSpecificConf

// Empty Directory Volumes
case class KubernetesEmptyDirVolumeConf(
  medium: Option[String] = None,
  sizeLimit: Option[String] = None
) extends KubernetesVolumeSpecificConf

// NFS Volumes
case class KubernetesNFSVolumeConf(
  server: String,
  path: String
) extends KubernetesVolumeSpecificConf
```
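
Putting the two pieces together, a spec can be assembled by hand. This is a sketch; in practice the spec is parsed from `spark.kubernetes.*.volumes.*` keys rather than built manually:

```scala
// Sketch: a PVC-backed volume mounted read-write at /checkpoints, relying
// on the defaults for mountSubPath ("") and mountReadOnly (false).
val checkpointVolume = KubernetesVolumeSpec(
  volumeName = "checkpoint",
  mountPath = "/checkpoints",
  volumeConf = KubernetesPVCVolumeConf(claimName = "spark-checkpoints")
)
```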

### Volume Configuration Examples

Volume keys follow the pattern `spark.kubernetes.<role>.volumes.<type>.<name>.<property>`:

```scala
// Host path volume
spark.conf.set("spark.kubernetes.driver.volumes.hostPath.data.options.path", "/host/data")
spark.conf.set("spark.kubernetes.driver.volumes.hostPath.data.mount.path", "/data")
spark.conf.set("spark.kubernetes.driver.volumes.hostPath.data.mount.readOnly", "true")

// PVC volume
spark.conf.set("spark.kubernetes.driver.volumes.persistentVolumeClaim.storage.options.claimName", "spark-pvc")
spark.conf.set("spark.kubernetes.driver.volumes.persistentVolumeClaim.storage.mount.path", "/storage")

// Empty dir with size limit, backed by memory
spark.conf.set("spark.kubernetes.driver.volumes.emptyDir.tmp.options.medium", "Memory")
spark.conf.set("spark.kubernetes.driver.volumes.emptyDir.tmp.options.sizeLimit", "1Gi")
spark.conf.set("spark.kubernetes.driver.volumes.emptyDir.tmp.mount.path", "/tmp")
```
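
These keys are turned into `KubernetesVolumeSpec` values by a prefix parser. The following is a heavily simplified sketch of that parsing, under the assumption that keys always have the `<type>.<name>.<property>` shape; the real implementation lives in `KubernetesVolumeUtils` and does full option handling and error reporting:

```scala
import org.apache.spark.SparkConf

// Simplified sketch of prefix-based volume parsing.
def parseVolumesWithPrefix(conf: SparkConf, prefix: String): Seq[KubernetesVolumeSpec] = {
  // e.g. "hostPath.data.mount.path" -> "/data"
  val properties = conf.getAllWithPrefix(prefix).toMap
  // Group keys by (volumeType, volumeName), the first two segments of each key.
  properties.keys.map(_.split('.').take(2)).collect { case Array(volType, name) => (volType, name) }
    .toSeq.distinct.map { case (volType, name) =>
      def opt(suffix: String) = properties.get(s"$volType.$name.$suffix")
      KubernetesVolumeSpec(
        volumeName = name,
        mountPath = opt("mount.path").getOrElse(""),
        mountReadOnly = opt("mount.readOnly").exists(_.toBoolean),
        volumeConf = volType match {
          case "hostPath" => KubernetesHostPathVolumeConf(opt("options.path").getOrElse(""))
          case "persistentVolumeClaim" => KubernetesPVCVolumeConf(opt("options.claimName").getOrElse(""))
          case "emptyDir" => KubernetesEmptyDirVolumeConf(opt("options.medium"), opt("options.sizeLimit"))
          case "nfs" => KubernetesNFSVolumeConf(opt("options.server").getOrElse(""), opt("options.path").getOrElse(""))
        }
      )
    }
}
```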

## Configuration Validation

### Validation Framework

```scala
object ConfigValidation {

  def validateDriverConfig(conf: KubernetesDriverConf): Unit = {
    validateBaseConfig(conf)
    validateDriverSpecific(conf)
  }

  def validateExecutorConfig(conf: KubernetesExecutorConf): Unit = {
    validateBaseConfig(conf)
    validateExecutorSpecific(conf)
  }

  private def validateBaseConfig(conf: KubernetesConf): Unit = {
    // Required configurations
    require(conf.get(CONTAINER_IMAGE).nonEmpty,
      "Container image must be specified with spark.kubernetes.container.image")

    require(conf.namespace.nonEmpty,
      "Kubernetes namespace must be specified")

    // Resource validation
    validateResourceLimits(conf)
    validateVolumeConfiguration(conf)
    validateNetworkConfiguration(conf)
  }

  private def validateResourceLimits(conf: KubernetesConf): Unit = {
    // CPU validation
    conf.get(KUBERNETES_DRIVER_LIMIT_CORES).foreach { cores =>
      require(cores.toDouble > 0, "Driver CPU limit must be positive")
    }

    // Memory validation (spark.driver.memory is a core Spark setting)
    conf.getOption("spark.driver.memory").foreach { memory =>
      require(Utils.byteStringAsBytes(memory) > 0, "Driver memory must be positive")
    }
  }

  private def validateVolumeConfiguration(conf: KubernetesConf): Unit = {
    conf.volumes.foreach { volume =>
      require(volume.volumeName.nonEmpty, "Volume name cannot be empty")
      require(volume.mountPath.nonEmpty, "Mount path cannot be empty")
      require(volume.mountPath.startsWith("/"), "Mount path must be absolute")
    }
  }

  // Driver-, executor-, and network-specific checks are elided here.
}
```
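
Validation is meant to run once, before any Kubernetes objects are created. A usage sketch, assuming the `ConfigValidation` object above and the `driverConf` from the earlier creation pattern:

```scala
// Sketch: validate eagerly at submission time so misconfiguration fails
// before any pods are created, with a readable error message.
try {
  ConfigValidation.validateDriverConfig(driverConf)
} catch {
  case e: IllegalArgumentException =>
    // require(...) failures surface here, e.g. a missing container image.
    System.err.println(s"Invalid Spark-on-Kubernetes configuration: ${e.getMessage}")
    sys.exit(1)
}
```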

### Common Validation Rules

```scala
// Image name validation (ALLOW_LATEST_IMAGE_TAG is an opt-in escape hatch)
def validateImageName(image: String): Unit = {
  require(image.nonEmpty, "Container image cannot be empty")
  require(!image.endsWith(":latest") || sparkConf.get(ALLOW_LATEST_IMAGE_TAG),
    "The 'latest' image tag is not allowed unless explicitly enabled")
}

// Namespace validation
def validateNamespace(namespace: String): Unit = {
  require(namespace.matches("[a-z0-9]([-a-z0-9]*[a-z0-9])?"),
    "Namespace must be a valid DNS-1123 label")
  require(namespace.length <= 63, "Namespace must be 63 characters or less")
}

// Resource name validation (DNS-1123 subdomain: dot-separated labels)
def validateResourceName(name: String): Unit = {
  require(name.matches("[a-z0-9]([-a-z0-9]*[a-z0-9])?(\\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*"),
    "Resource name must be a valid DNS-1123 subdomain")
  require(name.length <= 253, "Resource name must be 253 characters or less")
}
```
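
For instance, `validateNamespace` rejects anything with uppercase letters or underscores before the API server ever sees it:

```scala
validateNamespace("spark-prod")   // passes: lowercase alphanumerics and hyphens
validateNamespace("Spark_Prod")   // throws IllegalArgumentException:
                                  // "Namespace must be a valid DNS-1123 label"
```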

## Configuration Usage Patterns

### Dynamic Configuration

```scala
// Environment-based configuration
val conf = new SparkConf()
  .set("spark.kubernetes.namespace", sys.env.getOrElse("SPARK_NAMESPACE", "default"))
  .set("spark.kubernetes.container.image", sys.env("SPARK_IMAGE"))
  .set("spark.kubernetes.authenticate.driver.serviceAccountName",
    sys.env.getOrElse("SERVICE_ACCOUNT", "spark"))

// Conditional configuration
if (conf.getBoolean("spark.dynamicAllocation.enabled", defaultValue = false)) {
  conf.set("spark.dynamicAllocation.minExecutors", "2")
  conf.set("spark.dynamicAllocation.maxExecutors", "20")
}
```

### Configuration Templates

```scala
// Development configuration template
def createDevConfig(): SparkConf = new SparkConf()
  .set("spark.kubernetes.namespace", "spark-dev")
  .set("spark.kubernetes.container.image", "spark:3.0.1-dev")
  .set("spark.driver.memory", "1g")
  .set("spark.executor.instances", "2")
  .set("spark.executor.memory", "2g")

// Production configuration template
def createProdConfig(): SparkConf = new SparkConf()
  .set("spark.kubernetes.namespace", "spark-prod")
  .set("spark.kubernetes.container.image", "my-org/spark:3.0.1-stable")
  .set("spark.driver.memory", "4g")
  .set("spark.executor.instances", "10")
  .set("spark.executor.memory", "8g")
  .set("spark.kubernetes.driver.limit.cores", "2")
  .set("spark.kubernetes.executor.limit.cores", "4")
```

### Configuration Inheritance

```scala
// Base configuration
val baseConf = new SparkConf()
  .set("spark.kubernetes.namespace", "spark")
  .set("spark.kubernetes.container.image", "spark:3.0.1")

// Driver-specific additions
val driverConf = baseConf.clone()
  .set("spark.driver.memory", "2g")
  .set("spark.kubernetes.driver.service.type", "LoadBalancer")

// Executor-specific additions
val executorConf = baseConf.clone()
  .set("spark.executor.memory", "4g")
  .set("spark.executor.instances", "5")
```

## Advanced Configuration Features

### Prefix-Based Configuration

```scala
// Volume configuration using prefixes
spark.conf.set("spark.kubernetes.driver.volumes.hostPath.data.options.path", "/host/data")
spark.conf.set("spark.kubernetes.driver.volumes.persistentVolumeClaim.logs.options.claimName", "logs-pvc")

// Secret volume mounts using prefixes (secret name -> mount path)
spark.conf.set("spark.kubernetes.driver.secrets.db-secret", "/opt/secrets/db")
spark.conf.set("spark.kubernetes.executor.secrets.api-key", "/opt/secrets/api")

// Environment variable secrets (env var name -> secretName:key)
spark.conf.set("spark.kubernetes.driver.secretKeyRef.DB_PASSWORD", "db-secret:password")
spark.conf.set("spark.kubernetes.executor.secretKeyRef.API_TOKEN", "api-secret:token")
```
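
Each prefix family is read back with the same prefix scan shown in the first section; the secret mounts above surface on `KubernetesConf` as `secretNamesToMountPaths`. A sketch, reusing the hypothetical `parsePrefixedKeyValuePairs` helper from earlier:

```scala
// Sketch: recovering the driver's secret mounts from the prefix.
// Yields Map("db-secret" -> "/opt/secrets/db") for the settings above.
val secretNamesToMountPaths =
  parsePrefixedKeyValuePairs(sparkConf, "spark.kubernetes.driver.secrets.")
```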

### Pod Template Integration

```scala
// Pod template file configuration
spark.conf.set("spark.kubernetes.driver.podTemplateFile", "/templates/driver-pod.yaml")
spark.conf.set("spark.kubernetes.executor.podTemplateFile", "/templates/executor-pod.yaml")

// Template loading (the client parses the YAML into a pod specification)
val templatePod = KubernetesUtils.loadPodFromTemplate(
  kubernetesClient,
  new File("/templates/driver-pod.yaml"),
  Some(Constants.DRIVER_CONTAINER_NAME)
)

// Configuration takes precedence over the template
// (illustrative helper: the feature steps apply configured values on top)
val finalPod = mergePodTemplateWithConfig(templatePod, driverConf)
```
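
The precedence rule is simple: when the template and the configuration set the same field, the configured value wins. A hypothetical sketch of that merge for pod labels (`mergeLabels` is not a real API):

```scala
// Hypothetical sketch of the precedence rule: configuration-derived values
// overwrite template values on key collisions (right-hand side wins).
def mergeLabels(fromTemplate: Map[String, String],
                fromConf: Map[String, String]): Map[String, String] =
  fromTemplate ++ fromConf

// mergeLabels(Map("spark-role" -> "driver", "team" -> "data"),
//             Map("spark-role" -> "driver", "spark-app-selector" -> "app-123"))
// keeps "team" from the template and everything from the configuration.
```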
The configuration management system provides a robust, extensible foundation for customizing every aspect of Kubernetes deployments while maintaining compatibility with Spark's existing configuration patterns.