# Pod Management

The pod management system monitors executor pods in a Kubernetes cluster and manages their lifecycle. It is built around immutable snapshots of executor pod state. Snapshot sources (a watch or a poller) keep the snapshots current, and subscribers such as the lifecycle manager and the allocator react to each new snapshot.

## Core Architecture

### ExecutorPodsSnapshot { .api }

Immutable snapshot of the state of all executor pods in the cluster, keyed by executor ID:

```scala
case class ExecutorPodsSnapshot(
    executorPods: Map[Long, ExecutorPodState]
) {
  def withUpdate(updatedPod: Pod): ExecutorPodsSnapshot
  def withoutExecutor(executorId: Long): ExecutorPodsSnapshot
}
```

**Key Features**:
- **Immutable Design**: Snapshots are never modified in place, so they can be shared across threads without locking
- **Efficient Updates**: `withUpdate` and `withoutExecutor` return a new snapshot that shares the unchanged entries of the underlying immutable map
- **State Consistency**: Each snapshot is a consistent view of cluster state at a single point in time

**Usage Patterns**:
```scala
// Get current cluster state
val snapshot = snapshotsStore.currentSnapshot

// Query running executors
val runningExecutors = snapshot.executorPods.values.collect {
  case PodRunning(pod) => pod.getMetadata.getName
}

// Check executor status
snapshot.executorPods.get(executorId) match {
  case Some(PodRunning(_)) => println(s"Executor $executorId is running")
  case Some(PodFailed(_))  => println(s"Executor $executorId has failed")
  case Some(other)         => println(s"Executor $executorId is in state ${other.getClass.getSimpleName}")
  case None                => println(s"Executor $executorId not found")
}
```
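The immutable-update behavior described above can be illustrated with a small, self-contained sketch. It substitutes a hypothetical `PodInfo` case class for the fabric8 `Pod` (carrying the executor ID directly) and uses simplified state names; none of these types are Spark's. Each update returns a new `Snapshot`, so code holding an older snapshot keeps seeing exactly what it read.

```scala
object SnapshotSketch {
  // Hypothetical stand-in for the fabric8 Pod type: only the fields this sketch needs
  final case class PodInfo(name: String, executorId: Long, phase: String)

  sealed trait PodState { def pod: PodInfo }
  final case class Pending(pod: PodInfo) extends PodState
  final case class Running(pod: PodInfo) extends PodState
  final case class Failed(pod: PodInfo) extends PodState
  final case class Unknown(pod: PodInfo) extends PodState

  private def toState(pod: PodInfo): PodState = pod.phase match {
    case "Pending" => Pending(pod)
    case "Running" => Running(pod)
    case "Failed"  => Failed(pod)
    case _         => Unknown(pod)
  }

  // Immutable snapshot: every "modification" returns a new instance
  final case class Snapshot(executorPods: Map[Long, PodState] = Map.empty) {
    def withUpdate(pod: PodInfo): Snapshot =
      copy(executorPods = executorPods.updated(pod.executorId, toState(pod)))

    def withoutExecutor(executorId: Long): Snapshot =
      copy(executorPods = executorPods - executorId)

    def runningExecutorNames: List[String] =
      executorPods.values.collect { case Running(p) => p.name }.toList
  }
}
```

Scala's immutable `HashMap` shares unchanged structure between versions, so for large snapshots each update allocates a small number of new nodes rather than a full copy.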
## Pod State Management

### ExecutorPodState Hierarchy { .api }

Sealed trait hierarchy representing all possible executor pod states:

```scala
sealed trait ExecutorPodState {
  def pod: Pod
}

// Active states
case class PodPending(pod: Pod) extends ExecutorPodState
case class PodRunning(pod: Pod) extends ExecutorPodState
case class PodUnknown(pod: Pod) extends ExecutorPodState

// Final states (terminal)
sealed trait FinalPodState extends ExecutorPodState

case class PodSucceeded(pod: Pod) extends FinalPodState
case class PodFailed(pod: Pod) extends FinalPodState
case class PodDeleted(pod: Pod) extends FinalPodState
```

**State Transitions**:
```text
Pod lifecycle state machine:
PodPending → PodRunning → PodSucceeded/PodFailed
PodPending → PodFailed
Any State  → PodDeleted  (explicit deletion)
Any State  → PodUnknown  (status cannot be obtained, typically because the node is unreachable)
```

**State Classification**:
- **Active States**: `PodPending` and `PodRunning`; the pod is still being managed and may transition to other states
- **Final States**: `PodSucceeded`, `PodFailed`, and `PodDeleted`; terminal states indicating that the pod's lifecycle is complete
- **Unknown State**: `PodUnknown`, a temporary state used when the pod's status cannot be determined
### Pod State Utilities

```scala
import io.fabric8.kubernetes.api.model.Pod

object ExecutorPodState {

  // Map the Kubernetes pod phase to an executor pod state; a missing status
  // or an unrecognized phase maps to PodUnknown
  def fromPod(pod: Pod): ExecutorPodState = {
    Option(pod.getStatus).map(_.getPhase) match {
      case Some("Pending")   => PodPending(pod)
      case Some("Running")   => PodRunning(pod)
      case Some("Succeeded") => PodSucceeded(pod)
      case Some("Failed")    => PodFailed(pod)
      case _                 => PodUnknown(pod)
    }
  }

  def isActive(state: ExecutorPodState): Boolean = state match {
    case _: FinalPodState => false
    case _                => true
  }

  def shouldRetry(state: ExecutorPodState): Boolean = state match {
    case PodFailed(pod) =>
      val restartPolicy = pod.getSpec.getRestartPolicy
      restartPolicy != "Never"
    case _ => false
  }
}
```
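The transitions listed under **State Transitions** can also be checked mechanically. The sketch below encodes them as a validator over a simplified state type. The state names mirror the hierarchy above, but `isValidTransition`, the acceptance of repeated same-state updates, and the assumption that `Unknown` may resolve to any state are illustrative choices, not Spark behavior.

```scala
object TransitionSketch {
  sealed trait State
  case object Pending extends State
  case object Running extends State
  case object Succeeded extends State
  case object Failed extends State
  case object Deleted extends State
  case object Unknown extends State

  // Terminal states, as in the FinalPodState branch of the hierarchy
  def isFinal(s: State): Boolean = s match {
    case Succeeded | Failed | Deleted => true
    case _                            => false
  }

  // Encodes the documented edges. Same-state updates (repeated MODIFIED events)
  // are accepted, and Unknown is assumed to resolve to any state.
  def isValidTransition(from: State, to: State): Boolean = (from, to) match {
    case (a, b) if a == b     => true
    case (_, Deleted)         => true
    case (_, Unknown)         => true
    case (Unknown, _)         => true
    case (Pending, Running)   => true
    case (Pending, Failed)    => true
    case (Running, Succeeded) => true
    case (Running, Failed)    => true
    case _                    => false
  }
}
```

Because an observer can miss intermediate phases (a short-lived pod may be seen going straight from `Pending` to `Succeeded`), a check like this is better suited to logging unexpected jumps than to rejecting updates.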
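## Snapshot Management

### ExecutorPodsSnapshotsStore { .api }

Interface for storing, updating, and publishing executor pod snapshots:

```scala
trait ExecutorPodsSnapshotsStore {
  // Register a callback that is invoked every processBatchIntervalMillis with
  // the snapshots accumulated since its previous invocation
  def addSubscriber(processBatchIntervalMillis: Long)
      (onNewSnapshots: Seq[ExecutorPodsSnapshot] => Unit): Unit

  // Deliver pending snapshots to all subscribers immediately
  def notifySubscribers(): Unit
  def currentSnapshot: ExecutorPodsSnapshot

  // Mutation methods used by the snapshot sources
  def updatePod(updatedPod: Pod): Unit
  def removePod(deletedPod: Pod): Unit
  def replaceSnapshot(newSnapshot: ExecutorPodsSnapshot): Unit

  def stop(): Unit
}
```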
### ExecutorPodsSnapshotsStoreImpl { .api }

Concrete implementation providing thread-safe snapshot management:

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, ScheduledExecutorService}
import java.util.concurrent.atomic.AtomicReference

class ExecutorPodsSnapshotsStoreImpl(
    subscribersExecutor: ScheduledExecutorService
) extends ExecutorPodsSnapshotsStore {

  // Latest snapshot, starting from an empty one; readers never block
  private val currentSnapshotState =
    new AtomicReference(ExecutorPodsSnapshot(Map.empty[Long, ExecutorPodState]))
  private val subscribers = new ConcurrentLinkedQueue[SnapshotSubscriber]()

  def replaceSnapshot(newSnapshot: ExecutorPodsSnapshot): Unit
  def updatePod(updatedPod: Pod): Unit
  def removePod(deletedPod: Pod): Unit
}
```

**Key Operations**:
```scala
// Update snapshot with new pod state
snapshotsStore.updatePod(updatedPod)

// Stop tracking a pod that was deleted from the cluster
snapshotsStore.removePod(deletedPod)

// Replace entire snapshot (bulk updates)
val newSnapshot = ExecutorPodsSnapshot(newExecutorMap)
snapshotsStore.replaceSnapshot(newSnapshot)

// Subscribe to snapshot changes, receiving a batch roughly every 1000 ms
snapshotsStore.addSubscriber(1000) { snapshots =>
  snapshots.foreach(processSnapshotUpdate)
}
```
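The subscriber contract (snapshots accumulate per subscriber and are delivered in ordered batches on a schedule) can be sketched using only the JDK. `SketchSnapshotsStore` is a hypothetical simplification, not Spark's `ExecutorPodsSnapshotsStoreImpl`: the snapshot type is a type parameter, and a single `update(f)` method stands in for `updatePod`, `removePod`, and `replaceSnapshot`. `notifySubscribers()` flushes synchronously, which makes the behavior easy to test.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, CopyOnWriteArrayList, ScheduledExecutorService, TimeUnit}
import java.util.concurrent.atomic.AtomicReference
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

// Simplified snapshots store; S is the snapshot type (any immutable value)
final class SketchSnapshotsStore[S](initial: S, executor: ScheduledExecutorService) {
  private val current = new AtomicReference[S](initial)

  private final class Subscriber(onNewSnapshots: Seq[S] => Unit) {
    val buffer = new ConcurrentLinkedQueue[S]()

    // Drain everything buffered so far and deliver it as one ordered batch
    def flush(): Unit = synchronized {
      val batch = ArrayBuffer.empty[S]
      var next = buffer.poll()
      while (next != null) {
        batch += next
        next = buffer.poll()
      }
      if (batch.nonEmpty) onNewSnapshots(batch.toList)
    }
  }

  private val subscribers = new CopyOnWriteArrayList[Subscriber]()

  // Lock-free read of the latest snapshot
  def currentSnapshot: S = current.get()

  // Derive the next snapshot and buffer it for every subscriber; the lock keeps
  // snapshots in the same order for all subscribers
  def update(f: S => S): Unit = synchronized {
    val next = f(current.get())
    current.set(next)
    val it = subscribers.iterator()
    while (it.hasNext) it.next().buffer.offer(next)
  }

  def addSubscriber(processBatchIntervalMillis: Long)(onNewSnapshots: Seq[S] => Unit): Unit = {
    val sub = new Subscriber(onNewSnapshots)
    synchronized {
      sub.buffer.offer(current.get()) // a new subscriber first sees the current state
      subscribers.add(sub)
    }
    // Catch failures so one bad callback does not cancel future scheduled runs
    val task: Runnable = () => {
      try sub.flush()
      catch { case NonFatal(e) => System.err.println(s"Snapshot subscriber failed: $e") }
    }
    executor.scheduleWithFixedDelay(task, 0L, processBatchIntervalMillis, TimeUnit.MILLISECONDS)
  }

  // Flush all subscribers now instead of waiting for the next interval
  def notifySubscribers(): Unit = {
    val it = subscribers.iterator()
    while (it.hasNext) it.next().flush()
  }

  def stop(): Unit = {
    executor.shutdownNow()
    notifySubscribers() // deliver anything still buffered
  }
}
```

Batching decouples subscribers from the event rate: a burst of watch events yields many snapshots, but each subscriber's callback runs once per interval (or per explicit flush) and receives all of them in order.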
## Snapshot Sources

### ExecutorPodsWatchSnapshotSource { .api }

Pushes pod changes into the snapshot store in near real time using the Kubernetes Watch API:

```scala
class ExecutorPodsWatchSnapshotSource(
    snapshotsStore: ExecutorPodsSnapshotsStore,
    kubernetesClient: KubernetesClient,
    labels: Map[String, String]
) {

  def start(applicationId: String): Unit
  def stop(): Unit
}
```

**Watch API Integration**:
```scala
import io.fabric8.kubernetes.api.model.Pod
import io.fabric8.kubernetes.client.{Watcher, WatcherException}
import io.fabric8.kubernetes.client.Watcher.Action
import scala.jdk.CollectionConverters._

// Watch for executor pod changes in real time; selectorLabels would
// typically select this application's executor pods
val watch = kubernetesClient.pods()
  .inNamespace(namespace)
  .withLabels(selectorLabels.asJava)
  .watch(new Watcher[Pod] {
    override def eventReceived(action: Action, pod: Pod): Unit = {
      action match {
        case Action.ADDED | Action.MODIFIED => snapshotsStore.updatePod(pod)
        case Action.DELETED => snapshotsStore.removePod(pod)
        case Action.ERROR => logWarning("Watch error received")
        case _ => // other event types (e.g. bookmarks) carry no pod state change
      }
    }

    override def onClose(cause: WatcherException): Unit = {
      if (cause != null) {
        logError("Watch connection closed", cause)
        // Placeholder: re-establish the watch
        scheduleReconnect()
      }
    }
  })
```
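The watcher's event handling is essentially a fold of events into the current snapshot, which can be tested without a cluster. In this sketch, `Action` and `PodInfo` are hypothetical stand-ins for fabric8's `Watcher.Action` and `Pod`, with action names taken from the Kubernetes watch event types. `resync` reflects why the `scheduleReconnect()` placeholder above should be paired with a full re-listing: events that occur while the watch is down may never be delivered, so the snapshot is rebuilt from a list call rather than patched.

```scala
object WatchSketch {
  // Hypothetical stand-ins for the fabric8 Watcher.Action enum and the Pod type;
  // the action names mirror the Kubernetes watch event types
  sealed trait Action
  case object ADDED extends Action
  case object MODIFIED extends Action
  case object DELETED extends Action
  case object ERROR extends Action
  case object BOOKMARK extends Action

  final case class PodInfo(executorId: Long, phase: String)

  // Apply one watch event to an immutable executorId -> phase snapshot
  def applyEvent(snapshot: Map[Long, String], action: Action, pod: PodInfo): Map[Long, String] =
    action match {
      case ADDED | MODIFIED => snapshot.updated(pod.executorId, pod.phase)
      case DELETED          => snapshot - pod.executorId
      case ERROR | BOOKMARK => snapshot // no pod state change to record
    }

  // After a reconnect, events may have been missed: rebuild from a full listing
  def resync(listed: Seq[PodInfo]): Map[Long, String] =
    listed.map(p => p.executorId -> p.phase).toMap
}
```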
**Benefits**:
- **Near Real-Time Updates**: Pod state changes are pushed to the application as they happen
- **Low Latency**: Minimal delay between a Kubernetes event and the application's response
- **Efficient**: No repeated full listings; the API server streams only the changes over a long-lived connection
### ExecutorPodsPollingSnapshotSource { .api }

Alternative snapshot source that periodically lists executor pods from the API server and replaces the whole snapshot:

```scala
class ExecutorPodsPollingSnapshotSource(
    snapshotsStore: ExecutorPodsSnapshotsStore,
    kubernetesClient: KubernetesClient,
    labels: Map[String, String],
    pollingInterval: Long // milliseconds
) {

  def start(applicationId: String): Unit
  def stop(): Unit
}
```

**Polling Implementation**:
```scala
import java.util.concurrent.TimeUnit
import scala.jdk.CollectionConverters._

// Periodic snapshot refresh: list all matching pods and replace the snapshot.
// extractExecutorId is a helper (not shown) that reads the executor ID from the pod's labels.
val pollingTask = scheduledExecutor.scheduleAtFixedRate(() => {
  try {
    val pods = kubernetesClient.pods()
      .inNamespace(namespace)
      .withLabels(selectorLabels.asJava)
      .list()
      .getItems

    val executorPods = pods.asScala
      .map(pod => extractExecutorId(pod) -> ExecutorPodState.fromPod(pod))
      .toMap

    val newSnapshot = ExecutorPodsSnapshot(executorPods)
    snapshotsStore.replaceSnapshot(newSnapshot)
  } catch {
    // An exception escaping the task would suppress all future runs, so catch it here
    case e: Exception =>
      logWarning("Failed to poll executor pods", e)
  }
}, 0, pollingInterval, TimeUnit.MILLISECONDS)
```
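Because each poll replaces the entire snapshot, a consumer that wants to react to individual changes has to compare consecutive snapshots. The generic helper below is an illustrative sketch, not part of Spark; with the types above, `K` would be the executor ID and `V` the `ExecutorPodState`. An executor missing from the new listing lands in `removed`, which is how a polling source notices deletions.

```scala
object SnapshotDiffSketch {
  final case class SnapshotDiff[K, V](
      added: Map[K, V],
      removed: Set[K],
      changed: Map[K, (V, V)] // key -> (previous value, current value)
  )

  // Compare two full snapshots, e.g. the results of consecutive polls
  def diff[K, V](previous: Map[K, V], current: Map[K, V]): SnapshotDiff[K, V] = {
    val added = current.filter { case (k, _) => !previous.contains(k) }
    val removed = previous.keySet -- current.keySet
    val changed = current.collect {
      case (k, v) if previous.get(k).exists(_ != v) => (k, (previous(k), v))
    }
    SnapshotDiff(added, removed, changed)
  }
}
```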
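**Use Cases**:
- **Network Reliability**: Each poll is an independent request, so a transient network failure only delays the next refresh instead of breaking a long-lived connection
- **Firewall Restrictions**: Works where proxies or firewalls block or drop long-lived watch connections
- **Debugging**: Predictable polling intervals make the timing of snapshot updates easier to reason about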
## Lifecycle Management

### ExecutorPodsLifecycleManager { .api }

Tracks executor pods through their lifecycle and handles pods that reach a final state, from notifying the scheduler to cleaning up the pod (pod creation is handled by `ExecutorPodsAllocator`):

```scala
class ExecutorPodsLifecycleManager(
    conf: SparkConf,
    kubernetesClient: KubernetesClient,
    snapshotsStore: ExecutorPodsSnapshotsStore
) {

  def start(applicationId: String): Unit
  def stop(): Unit
  def onFinalNonDeletedState(podState: FinalPodState): Unit
}
```

**Lifecycle Operations**:

#### Pod Monitoring
```scala
// Subscribe to snapshot updates for lifecycle management
snapshotsStore.addSubscriber(batchIntervalMillis) { snapshots =>
  snapshots.foreach { snapshot =>
    snapshot.executorPods.values.foreach {
      case _: PodDeleted =>
        // Pod was already deleted; not handled by onFinalNonDeletedState
      case finalState: FinalPodState =>
        onFinalNonDeletedState(finalState)
      case _ =>
        // Active or unknown state: keep monitoring
    }
  }
}
```
#### Failure Handling
```scala
def onFinalNonDeletedState(podState: FinalPodState): Unit = {
  podState match {
    case PodFailed(pod) =>
      val executorId = getExecutorId(pod)
      logWarning(s"Executor $executorId failed")

      // Notify scheduler of executor loss (SlaveLost was renamed
      // ExecutorProcessLost in later Spark releases)
      schedulerBackend.removeExecutor(executorId,
        SlaveLost(s"Pod ${pod.getMetadata.getName} failed"))

      // Clean up failed pod if configured
      if (conf.get(DELETE_FAILED_EXECUTOR_PODS)) {
        deleteExecutorPod(pod)
      }

    case PodSucceeded(pod) =>
      val executorId = getExecutorId(pod)
      logInfo(s"Executor $executorId completed successfully")

      // Clean up completed pod
      if (conf.get(DELETE_SUCCESSFUL_EXECUTOR_PODS)) {
        deleteExecutorPod(pod)
      }

    case PodDeleted(_) =>
      // Not expected here: deleted pods are filtered out before this method is called
  }
}
```
#### Resource Cleanup
```scala
private def deleteExecutorPod(pod: Pod): Unit = {
  val podName = pod.getMetadata.getName
  try {
    kubernetesClient.pods()
      .inNamespace(pod.getMetadata.getNamespace)
      .withName(podName)
      .delete()

    logInfo(s"Deleted executor pod: $podName")
  } catch {
    case e: Exception =>
      logWarning(s"Failed to delete executor pod $podName", e)
  }
}
```
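One subtlety in the monitoring loop above: a pod in a final state stays in every later snapshot until it is removed (indefinitely, if deletion is disabled), so `onFinalNonDeletedState` would run again for the same executor on every batch, repeating the scheduler notification and the delete call. The sketch below shows one remedy, remembering which executors were already handled. The state and action types and `CleanupPolicy` are hypothetical simplifications of the API above, and treating `Deleted` as an executor loss is an assumption.

```scala
import scala.collection.mutable

object LifecycleSketch {
  // Hypothetical simplified final states and the actions they trigger
  sealed trait FinalState { def executorId: Long }
  final case class Succeeded(executorId: Long) extends FinalState
  final case class Failed(executorId: Long, exitCode: Int) extends FinalState
  final case class Deleted(executorId: Long) extends FinalState

  sealed trait Action
  final case class NotifyExecutorLost(executorId: Long, reason: String) extends Action
  final case class DeletePod(executorId: Long) extends Action

  // Stand-in for the DELETE_FAILED_EXECUTOR_PODS / DELETE_SUCCESSFUL_EXECUTOR_PODS settings
  final case class CleanupPolicy(deleteFailed: Boolean, deleteSucceeded: Boolean)

  // Not thread-safe: meant to be called from a single subscriber callback
  final class FinalStateHandler(policy: CleanupPolicy) {
    // Executors already handled; a production version would bound this set
    private val handled = mutable.Set.empty[Long]

    def onFinalState(state: FinalState): Seq[Action] =
      if (!handled.add(state.executorId)) Seq.empty // already handled in an earlier snapshot
      else state match {
        case Failed(id, code) =>
          val cleanup = if (policy.deleteFailed) Seq(DeletePod(id)) else Seq.empty
          NotifyExecutorLost(id, s"exit code $code") +: cleanup
        case Succeeded(id) =>
          if (policy.deleteSucceeded) Seq(DeletePod(id)) else Seq.empty
        case Deleted(id) =>
          Seq(NotifyExecutorLost(id, "pod deleted"))
      }
  }
}
```

Returning actions instead of performing them keeps the decision logic testable; a caller would map `NotifyExecutorLost` to the `removeExecutor` call and `DeletePod` to `deleteExecutorPod`.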
### ExecutorPodsAllocator { .api }

Creates new executor pods in response to the scheduler's requested executor count:

```scala
class ExecutorPodsAllocator(
    conf: SparkConf,
    secMgr: SecurityManager,
    executorBuilder: KubernetesExecutorBuilder,
    kubernetesClient: KubernetesClient,
    snapshotsStore: ExecutorPodsSnapshotsStore,
    clock: Clock
) {

  def setTotalExpectedExecutors(newTotal: Int): Unit
  def start(applicationId: String): Unit
  def stop(): Unit
}
```

**Allocation Logic**:
```scala
import io.fabric8.kubernetes.api.model.PodBuilder

def setTotalExpectedExecutors(newTotal: Int): Unit = {
  val currentSnapshot = snapshotsStore.currentSnapshot
  // Count only active pods: failed or succeeded pods that are still in the
  // snapshot no longer provide executors
  val currentTotal = currentSnapshot.executorPods.values.count(ExecutorPodState.isActive)

  if (newTotal > currentTotal) {
    val podsToCreate = newTotal - currentTotal
    logInfo(s"Requesting $podsToCreate new executor pods")

    (1 to podsToCreate).foreach { _ =>
      val newExecutorId = generateExecutorId()
      val executorConf = createExecutorConf(newExecutorId)
      val executorPod = executorBuilder.buildFromFeatures(executorConf, secMgr, kubernetesClient)

      createExecutorPod(executorPod, newExecutorId)
    }
  }
}

private def createExecutorPod(pod: SparkPod, executorId: String): Unit = {
  try {
    // SparkPod keeps the executor container separate from the pod spec,
    // so attach it before submitting the pod
    val podWithContainer = new PodBuilder(pod.pod)
      .editOrNewSpec()
        .addToContainers(pod.container)
      .endSpec()
      .build()

    val createdPod = kubernetesClient.pods()
      .inNamespace(namespace)
      .create(podWithContainer)

    logInfo(s"Created executor pod ${createdPod.getMetadata.getName} for executor $executorId")
  } catch {
    case e: Exception =>
      logError(s"Failed to create executor pod for executor $executorId", e)
  }
}
```
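The allocation arithmetic can be isolated as a pure function. This sketch counts only active (pending or running) pods toward the target, so terminal pods that linger in the snapshot get replaced, and it caps each round at a batch size so a large target does not trigger a flood of simultaneous pod creations. The state model and the `batchSize` parameter are assumptions for illustration; Spark exposes a comparable setting as `spark.kubernetes.allocation.batch.size`.

```scala
object AllocationSketch {
  sealed trait PodState
  case object Pending extends PodState
  case object Running extends PodState
  case object Succeeded extends PodState
  case object Failed extends PodState
  case object Deleted extends PodState

  private def isActive(s: PodState): Boolean = s match {
    case Pending | Running => true
    case _                 => false
  }

  // Number of new pods to request this round: only active pods count toward
  // the target, and at most batchSize pods are created per round
  def podsToRequest(target: Int, snapshot: Map[Long, PodState], batchSize: Int): Int = {
    val active = snapshot.values.count(isActive)
    math.max(0, math.min(batchSize, target - active))
  }
}
```

When the target is at or below the active count, the function returns zero; scaling down is a separate decision. A real allocator must also count pods it has already requested but not yet seen in a snapshot, or back-to-back calls will over-allocate.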
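## Advanced Pod Management

### Pod Affinity and Anti-Affinity

A preferred (soft) anti-affinity rule asks the Kubernetes scheduler to spread an application's executors across nodes when it can, without blocking scheduling when it cannot:

```scala
import io.fabric8.kubernetes.api.model.PodAntiAffinityBuilder

// Configure pod anti-affinity for executor distribution: prefer nodes
// (topology key kubernetes.io/hostname) that do not already run an executor
// of this application. Attach it through the pod spec's affinity, e.g.
// new AffinityBuilder().withPodAntiAffinity(podAntiAffinity).build()
val podAntiAffinity = new PodAntiAffinityBuilder()
  .addNewPreferredDuringSchedulingIgnoredDuringExecution()
    .withWeight(100)
    .withNewPodAffinityTerm()
      .withNewLabelSelector()
        .addToMatchLabels(SPARK_APP_ID_LABEL, applicationId)
        .addToMatchLabels(SPARK_ROLE_LABEL, EXECUTOR_ROLE)
      .endLabelSelector()
      .withTopologyKey("kubernetes.io/hostname")
    .endPodAffinityTerm()
  .endPreferredDuringSchedulingIgnoredDuringExecution()
  .build()
```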
### Resource Monitoring

```scala
// Summarize the resources requested by running executor pods (requests, not
// actual usage). ExecutorMetrics here is a document-local summary type, not
// Spark's org.apache.spark.executor.ExecutorMetrics; getCpuRequest,
// getMemoryRequest, and getNodeName are helpers (not shown) that read the pod.
def collectExecutorMetrics(snapshot: ExecutorPodsSnapshot): ExecutorMetrics = {
  val runningPods = snapshot.executorPods.values.collect {
    case PodRunning(pod) => pod
  }

  ExecutorMetrics(
    totalPods = runningPods.size,
    totalCpuRequest = runningPods.map(getCpuRequest).sum,
    totalMemoryRequest = runningPods.map(getMemoryRequest).sum,
    nodeDistribution = runningPods.groupBy(getNodeName).map { case (node, pods) => node -> pods.size }
  )
}
```
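The `getCpuRequest` and `getMemoryRequest` helpers above must turn Kubernetes quantity strings such as `500m` or `4Gi` into numbers before they can be summed. A minimal parser for the common suffixes follows. It is an illustrative sketch that handles plain numbers, `m`, `k`/`M`/`G`/`T`, and `Ki`/`Mi`/`Gi`/`Ti`, but not exponent notation or the rarer suffixes, so production code should use a complete implementation.

```scala
object QuantitySketch {
  // Binary (power-of-two) suffixes first, so "Mi" is never mistaken for "M"
  private val suffixes: Seq[(String, BigDecimal)] = Seq(
    "Ki" -> BigDecimal(1024L),
    "Mi" -> BigDecimal(1024L * 1024),
    "Gi" -> BigDecimal(1024L * 1024 * 1024),
    "Ti" -> BigDecimal(1024L * 1024 * 1024 * 1024),
    "m"  -> BigDecimal("0.001"),
    "k"  -> BigDecimal(1000L),
    "M"  -> BigDecimal(1000000L),
    "G"  -> BigDecimal(1000000000L),
    "T"  -> BigDecimal(1000000000000L))

  // Parse a quantity such as "500m", "2", "512Mi" or "1.5Gi" into base units
  // (cores for CPU, bytes for memory); throws NumberFormatException on bad input
  def parse(quantity: String): BigDecimal = {
    val q = quantity.trim
    suffixes.find { case (s, _) => q.endsWith(s) } match {
      case Some((s, factor)) => BigDecimal(q.dropRight(s.length)) * factor
      case None              => BigDecimal(q)
    }
  }
}
```

CPU values come back in cores (`500m` is 0.5) and memory values in bytes (`1Gi` is 1073741824), so sums stay in consistent units.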
### Pod Template Integration

```scala
import io.fabric8.kubernetes.api.model.{Pod, PodBuilder}
import scala.jdk.CollectionConverters._

// Merge a pod template with runtime configuration
def applyPodTemplate(
    templatePod: Pod,
    executorConf: KubernetesExecutorConf
): SparkPod = {

  val mergedPod = new PodBuilder(templatePod)
    // Override the first container's image and resources from configuration
    // (containers are edited through the pod spec)
    .editOrNewSpec()
      .editFirstContainer()
        .withImage(executorConf.get(CONTAINER_IMAGE))
        .withResources(buildResourceRequirements(executorConf))
      .endContainer()
    .endSpec()
    // Add Spark-specific labels and annotations
    .editOrNewMetadata()
      .addToLabels(SPARK_APP_ID_LABEL, executorConf.appId)
      .addToLabels(SPARK_ROLE_LABEL, EXECUTOR_ROLE)
      .addToAnnotations(executorConf.annotations.asJava)
    .endMetadata()
    .build()

  // extractMainContainer is a helper (not shown) that separates the executor
  // container from the pod, matching SparkPod's (pod, container) structure
  SparkPod(mergedPod, extractMainContainer(mergedPod))
}
```
## Error Handling and Recovery

### Pod Failure Detection

```scala
import scala.jdk.CollectionConverters._

def diagnosePodFailure(failedPod: Pod): FailureDiagnosis = {
  // The status and its container statuses may be missing, so guard against nulls
  val containerStatuses = Option(failedPod.getStatus)
    .flatMap(status => Option(status.getContainerStatuses))
    .map(_.asScala.toSeq)
    .getOrElse(Seq.empty)

  containerStatuses.find(_.getName == EXECUTOR_CONTAINER_NAME) match {
    case Some(containerStatus) =>
      Option(containerStatus.getState).flatMap(state => Option(state.getTerminated)) match {
        case Some(terminated) =>
          FailureDiagnosis(
            reason = terminated.getReason,
            exitCode = terminated.getExitCode,
            message = terminated.getMessage,
            failureType = classifyFailureType(terminated)
          )
        case None =>
          FailureDiagnosis.unknown("Container not terminated")
      }
    case None =>
      FailureDiagnosis.unknown("Executor container not found")
  }
}
```
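`classifyFailureType` is not defined above. One plausible classification, based on the reason and exit code Kubernetes reports for a terminated container, is sketched below. The `FailureType` cases are hypothetical and only partly match those used in the recovery example. Exit codes above 128 follow the usual convention of 128 plus the terminating signal number, and 52 is the exit code Spark's uncaught-exception handler uses for an `OutOfMemoryError`.

```scala
object FailureClassificationSketch {
  sealed trait FailureType
  case object OutOfMemory extends FailureType
  final case class KilledBySignal(signal: Int) extends FailureType
  case object ApplicationError extends FailureType
  case object Unknown extends FailureType

  // Classify a terminated container from its reported reason and exit code
  def classify(reason: Option[String], exitCode: Int): FailureType =
    (reason, exitCode) match {
      case (Some("OOMKilled"), _)  => OutOfMemory // container exceeded its memory limit
      case (_, 52)                 => OutOfMemory // Spark's exit code for OutOfMemoryError
      case (_, code) if code > 128 => KilledBySignal(code - 128) // e.g. 137 = SIGKILL, 143 = SIGTERM
      case (_, code) if code != 0  => ApplicationError
      case _                       => Unknown
    }
}
```

`OOMKilled` (the container exceeded its memory limit) and a JVM `OutOfMemoryError` are different failures that both map to `OutOfMemory` here; a recovery policy may want to treat them separately, since they point at different memory settings.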
### Automatic Recovery Strategies

```scala
def handleExecutorFailure(
    executorId: String,
    failureDiagnosis: FailureDiagnosis
): RecoveryAction = {

  failureDiagnosis.failureType match {
    case OutOfMemory =>
      RecoveryAction.IncreaseMemory(executorId)

    // Image pull failures leave the pod Pending with a Waiting container state
    // (ErrImagePull / ImagePullBackOff) rather than a Terminated one, so they must
    // be detected from waiting states, not from the terminated state that
    // diagnosePodFailure inspects
    case ImagePullError =>
      RecoveryAction.RetryWithBackoff(executorId, maxRetries = 3)

    case NodeFailure =>
      RecoveryAction.RescheduleOnDifferentNode(executorId)

    case ApplicationError =>
      RecoveryAction.NoRetry(executorId)

    case _ =>
      RecoveryAction.StandardRetry(executorId)
  }
}
```
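`RetryWithBackoff(executorId, maxRetries = 3)` leaves the delay schedule open. Capped exponential backoff is a common choice; the sketch below computes one such schedule, with `baseDelayMs` and `maxDelayMs` as assumed parameters rather than Spark settings.

```scala
object BackoffSketch {
  // Delay before retry number `attempt` (0-based), doubling each time up to
  // maxDelayMs; returns None once maxRetries attempts have been used
  def retryDelayMs(attempt: Int, maxRetries: Int, baseDelayMs: Long, maxDelayMs: Long): Option[Long] =
    if (attempt < 0 || attempt >= maxRetries) None
    else {
      // Clamp the exponent so the multiplier stays small
      val multiplier = 1L << math.min(attempt, 30)
      Some(math.min(maxDelayMs, baseDelayMs * multiplier))
    }
}
```

Production schedules often add random jitter to each delay so that many executors failing at once do not retry in lockstep.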
### Health Monitoring

```scala
import scala.util.{Failure, Success}

// Health check for an executor pod. Assumes the executor image serves an HTTP
// health endpoint on container port 8080 (Spark executors do not serve one by
// default) and that performHttpHealthCheck returns a Try[Unit].
def performHealthCheck(pod: Pod): HealthStatus = {
  val podName = pod.getMetadata.getName

  try {
    // Forward a local port to the pod's health port; closed after the check
    val portForward = kubernetesClient.pods()
      .inNamespace(pod.getMetadata.getNamespace)
      .withName(podName)
      .portForward(8080)

    try {
      performHttpHealthCheck(portForward.getLocalPort) match {
        case Success(_)      => HealthStatus.Healthy
        case Failure(reason) => HealthStatus.Unhealthy(reason.getMessage)
      }
    } finally {
      portForward.close()
    }
  } catch {
    case e: Exception =>
      HealthStatus.Unknown(s"Health check failed: ${e.getMessage}")
  }
}
```
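## Performance Optimization

### Batch Processing

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import scala.collection.mutable

// Process snapshot updates in batches for efficiency. PodUpdate and
// applyUpdate stand for the update type and the function that applies one
// update to a snapshot.
class BatchingSnapshotProcessor(
    snapshotsStore: ExecutorPodsSnapshotsStoreImpl,
    batchSize: Int,
    batchTimeout: Long // not used below; intended as the interval (ms) for scheduling processBatch()
) {

  private val pendingUpdates = new ConcurrentLinkedQueue[PodUpdate]()

  def enqueue(update: PodUpdate): Unit = pendingUpdates.offer(update)

  def processBatch(): Unit = {
    val batch = mutable.ArrayBuffer[PodUpdate]()

    // Collect up to batchSize pending updates (poll() returns null if another
    // thread emptied the queue in the meantime)
    while (batch.size < batchSize && !pendingUpdates.isEmpty) {
      Option(pendingUpdates.poll()).foreach(batch += _)
    }

    if (batch.nonEmpty) {
      // Fold all updates into one snapshot and publish it once. Assumes this
      // processor is the only writer; a concurrent updatePod call between the
      // read and replaceSnapshot would be overwritten.
      val updatedSnapshot = batch.foldLeft(snapshotsStore.currentSnapshot) { (snapshot, update) =>
        applyUpdate(snapshot, update)
      }

      snapshotsStore.replaceSnapshot(updatedSnapshot)
    }
  }
}
```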
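### Memory Efficiency

```scala
import java.util.Arrays

// Optimize memory usage for large clusters: store executor IDs in a sorted
// primitive Array[Long] with a parallel array of states instead of a Map,
// avoiding per-entry map nodes and boxed keys; lookups use binary search.
final class CompactExecutorPodsSnapshot private (
    executorIds: Array[Long], // sorted ascending
    podStates: Array[ExecutorPodState]
) {
  def get(executorId: Long): Option[ExecutorPodState] = {
    val i = Arrays.binarySearch(executorIds, executorId)
    if (i >= 0) Some(podStates(i)) else None
  }

  // Materialize a regular Map-based snapshot only when an API requires one
  def toSnapshot: ExecutorPodsSnapshot =
    ExecutorPodsSnapshot(executorIds.indices.map(i => executorIds(i) -> podStates(i)).toMap)
}

object CompactExecutorPodsSnapshot {
  def apply(snapshot: ExecutorPodsSnapshot): CompactExecutorPodsSnapshot = {
    val sorted = snapshot.executorPods.toArray.sortBy(_._1)
    new CompactExecutorPodsSnapshot(sorted.map(_._1), sorted.map(_._2))
  }
}
```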
Together, these components give the pod management system a consistent, snapshot-based view of executor pods in Kubernetes: watch and polling sources keep snapshots current, the lifecycle manager and allocator react to them, and the diagnosis, recovery, and batching techniques above extend the same model to failure handling and large-scale deployments.