or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

application-submission.md cluster-management.md configuration.md feature-steps.md index.md pod-management.md utilities.md

docs/pod-management.md

0

# Pod Management

1

2

The pod management system provides comprehensive monitoring and lifecycle management of executor pods in Kubernetes clusters, using a snapshot-based architecture for real-time state tracking and automated pod lifecycle operations.

3

4

## Core Architecture

5

6

### ExecutorPodsSnapshot { .api }

7

8

Immutable snapshot representing the current state of all executor pods in the cluster:

9

10

```scala

11

case class ExecutorPodsSnapshot(

12

executorPods: Map[Long, ExecutorPodState]

13

) {

14

def withUpdate(updatedPod: Pod): ExecutorPodsSnapshot

15

def withoutExecutor(executorId: Long): ExecutorPodsSnapshot

16

}

17

```

18

19

**Key Features**:

20

- **Immutable Design**: Snapshots are immutable, ensuring thread-safe access

21

- **Efficient Updates**: Each update produces a new snapshot while keeping additional object allocation to a minimum

22

- **State Consistency**: Provides a consistent view of cluster state at a single point in time

23

24

**Usage Patterns**:

25

```scala

26

// Get current cluster state

27

val snapshot = snapshotsStore.currentSnapshot

28

29

// Query running executors

30

val runningExecutors = snapshot.executorPods.values.collect {

31

case PodRunning(pod) => pod.getMetadata.getName

32

}

33

34

// Check executor status

35

snapshot.executorPods.get(executorId) match {

36

case Some(PodRunning(pod)) => println(s"Executor $executorId is running")

37

case Some(PodFailed(pod)) => println(s"Executor $executorId has failed")

38

case None => println(s"Executor $executorId not found")

39

}

40

```

41

42

## Pod State Management

43

44

### ExecutorPodState Hierarchy { .api }

45

46

Sealed trait hierarchy representing all possible executor pod states:

47

48

```scala

49

sealed trait ExecutorPodState {

50

def pod: Pod

51

}

52

53

// Active states

54

case class PodPending(pod: Pod) extends ExecutorPodState

55

case class PodRunning(pod: Pod) extends ExecutorPodState

56

case class PodUnknown(pod: Pod) extends ExecutorPodState

57

58

// Final states (terminal)

59

sealed trait FinalPodState extends ExecutorPodState

60

61

case class PodSucceeded(pod: Pod) extends FinalPodState

62

case class PodFailed(pod: Pod) extends FinalPodState

63

case class PodDeleted(pod: Pod) extends FinalPodState

64

```

65

66

**State Transitions**:

67

```text

68

// Pod lifecycle state machine

69

PodPending → PodRunning → PodSucceeded/PodFailed

70

PodPending → PodFailed

71

Any State → PodDeleted (explicit deletion)

72

Any State → PodUnknown (temporary loss of connectivity)

73

```

74

75

**State Classification**:

76

- **Active States**: Pod is still being managed and may transition to other states

77

- **Final States**: Terminal states indicating pod lifecycle completion

78

- **Unknown State**: Temporary state when pod status cannot be determined

79

80

### Pod State Utilities

81

82

```scala

83

object ExecutorPodState {

84

85

def fromPod(pod: Pod): ExecutorPodState = {

86

Option(pod.getStatus).map(_.getPhase) match {

87

case Some("Pending") => PodPending(pod)

88

case Some("Running") => PodRunning(pod)

89

case Some("Succeeded") => PodSucceeded(pod)

90

case Some("Failed") => PodFailed(pod)

91

case _ => PodUnknown(pod)

92

}

93

}

94

95

def isActive(state: ExecutorPodState): Boolean = state match {

96

case _: FinalPodState => false

97

case _ => true

98

}

99

100

def shouldRetry(state: ExecutorPodState): Boolean = state match {

101

case PodFailed(pod) =>

102

val restartPolicy = pod.getSpec.getRestartPolicy

103

restartPolicy != "Never"

104

case _ => false

105

}

106

}

107

```

108

109

## Snapshot Management

110

111

### ExecutorPodsSnapshotsStore { .api }

112

113

Interface for storing and retrieving executor pod snapshots:

114

115

```scala

116

trait ExecutorPodsSnapshotsStore {

117

def addSubscriber(processBatchIntervalMillis: Long)

118

(onNewSnapshots: Seq[ExecutorPodsSnapshot] => Unit): Unit

119

120

def notifySubscribers(): Unit

121

def currentSnapshot: ExecutorPodsSnapshot

122

def stop(): Unit

123

}

124

```

125

126

### ExecutorPodsSnapshotsStoreImpl { .api }

127

128

Concrete implementation providing thread-safe snapshot management:

129

130

```scala

131

class ExecutorPodsSnapshotsStoreImpl(

132

subscribersExecutor: ScheduledExecutorService

133

) extends ExecutorPodsSnapshotsStore {

134

135

private val currentSnapshotState = new AtomicReference(ExecutorPodsSnapshot())

136

private val subscribers = new ConcurrentLinkedQueue[SnapshotSubscriber]()

137

138

def replaceSnapshot(newSnapshot: ExecutorPodsSnapshot): Unit

139

def updatePod(updatedPod: Pod): Unit

140

def removePod(deletedPod: Pod): Unit

141

}

142

```

143

144

**Key Operations**:

145

```scala

146

// Update snapshot with new pod state

147

snapshotsStore.updatePod(updatedPod)

148

149

// Remove terminated pod from tracking

150

snapshotsStore.removePod(terminatedPod)

151

152

// Replace entire snapshot (bulk updates)

153

val newSnapshot = ExecutorPodsSnapshot(newExecutorMap)

154

snapshotsStore.replaceSnapshot(newSnapshot)

155

156

// Subscribe to snapshot changes

157

snapshotsStore.addSubscriber(1000) { snapshots =>

158

snapshots.foreach(processSnapshotUpdate)

159

}

160

```

161

162

## Snapshot Sources

163

164

### ExecutorPodsWatchSnapshotSource { .api }

165

166

Provides real-time snapshot updates via Kubernetes Watch API:

167

168

```scala

169

class ExecutorPodsWatchSnapshotSource(

170

snapshotsStore: ExecutorPodsSnapshotsStore,

171

kubernetesClient: KubernetesClient,

172

labels: Map[String, String]

173

) {

174

175

def start(applicationId: String): Unit

176

def stop(): Unit

177

}

178

```

179

180

**Watch API Integration**:

181

```scala

182

// Watch for pod changes in real-time

183

val watch = kubernetesClient.pods()

184

.inNamespace(namespace)

185

.withLabels(selectorLabels)

186

.watch(new Watcher[Pod] {

187

override def eventReceived(action: Action, pod: Pod): Unit = {

188

action match {

189

case ADDED | MODIFIED => snapshotsStore.updatePod(pod)

190

case DELETED => snapshotsStore.removePod(pod)

191

case ERROR => logWarning("Watch error received")

192

}

193

}

194

195

override def onClose(cause: WatcherException): Unit = {

196

if (cause != null) {

197

logError("Watch connection closed", cause)

198

scheduleReconnect()

199

}

200

}

201

})

202

```

203

204

**Benefits**:

205

- **Real-Time Updates**: Immediate notification of pod state changes

206

- **Low Latency**: Minimal delay between Kubernetes events and application response

207

- **Efficient**: No polling overhead, events pushed from Kubernetes API server

208

209

### ExecutorPodsPollingSnapshotSource { .api }

210

211

Alternative snapshot source using periodic API server polling:

212

213

```scala

214

class ExecutorPodsPollingSnapshotSource(

215

snapshotsStore: ExecutorPodsSnapshotsStore,

216

kubernetesClient: KubernetesClient,

217

labels: Map[String, String],

218

pollingInterval: Long

219

) {

220

221

def start(applicationId: String): Unit

222

def stop(): Unit

223

}

224

```

225

226

**Polling Implementation**:

227

```scala

228

// Periodic snapshot refresh

229

val pollingTask = scheduledExecutor.scheduleAtFixedRate(() => {

230

try {

231

val pods = kubernetesClient.pods()

232

.inNamespace(namespace)

233

.withLabels(selectorLabels)

234

.list()

235

.getItems

236

237

val executorPods = pods.asScala

238

.map(pod => extractExecutorId(pod) -> ExecutorPodState.fromPod(pod))

239

.toMap

240

241

val newSnapshot = ExecutorPodsSnapshot(executorPods)

242

snapshotsStore.replaceSnapshot(newSnapshot)

243

244

} catch {

245

case e: Exception =>

246

logWarning("Failed to poll executor pods", e)

247

}

248

}, 0, pollingInterval, TimeUnit.MILLISECONDS)

249

```

250

251

**Use Cases**:

252

- **Network Reliability**: More resilient to temporary network issues

253

- **Firewall Restrictions**: Works when watch API is blocked

254

- **Debugging**: Predictable polling intervals aid in troubleshooting

255

256

## Lifecycle Management

257

258

### ExecutorPodsLifecycleManager { .api }

259

260

Manages complete lifecycle of executor pods from creation to cleanup:

261

262

```scala

263

class ExecutorPodsLifecycleManager(

264

conf: SparkConf,

265

kubernetesClient: KubernetesClient,

266

snapshotsStore: ExecutorPodsSnapshotsStore

267

) {

268

269

def start(applicationId: String): Unit

270

def stop(): Unit

271

def onFinalNonDeletedState(podState: FinalPodState): Unit

272

}

273

```

274

275

**Lifecycle Operations**:

276

277

#### Pod Creation and Monitoring

278

```scala

279

// Subscribe to snapshot updates for lifecycle management

280

snapshotsStore.addSubscriber(batchIntervalMillis) { snapshots =>

281

snapshots.foreach { snapshot =>

282

snapshot.executorPods.values.foreach {

283

case finalState: FinalPodState if !finalState.isDeleted =>

284

onFinalNonDeletedState(finalState)

285

case _ => // Continue monitoring active pods

286

}

287

}

288

}

289

```

290

291

#### Failure Handling

292

```scala

293

def onFinalNonDeletedState(podState: FinalPodState): Unit = {

294

podState match {

295

case PodFailed(pod) =>

296

val executorId = getExecutorId(pod)

297

logWarning(s"Executor $executorId failed")

298

299

// Notify scheduler of executor loss

300

schedulerBackend.removeExecutor(executorId,

301

SlaveLost(s"Pod ${pod.getMetadata.getName} failed"))

302

303

// Clean up failed pod if configured

304

if (conf.get(DELETE_FAILED_EXECUTOR_PODS)) {

305

deleteExecutorPod(pod)

306

}

307

308

case PodSucceeded(pod) =>

309

val executorId = getExecutorId(pod)

310

logInfo(s"Executor $executorId completed successfully")

311

312

// Clean up completed pod

313

if (conf.get(DELETE_SUCCESSFUL_EXECUTOR_PODS)) {

314

deleteExecutorPod(pod)

315

}

316

}

317

}

318

```

319

320

#### Resource Cleanup

321

```scala

322

private def deleteExecutorPod(pod: Pod): Unit = {

323

val podName = pod.getMetadata.getName

324

try {

325

kubernetesClient.pods()

326

.inNamespace(pod.getMetadata.getNamespace)

327

.withName(podName)

328

.delete()

329

330

logInfo(s"Deleted executor pod: $podName")

331

} catch {

332

case e: Exception =>

333

logWarning(s"Failed to delete executor pod $podName", e)

334

}

335

}

336

```

337

338

### ExecutorPodsAllocator { .api }

339

340

Handles allocation of new executor pods based on scheduler requests:

341

342

```scala

343

class ExecutorPodsAllocator(

344

conf: SparkConf,

345

secMgr: SecurityManager,

346

executorBuilder: KubernetesExecutorBuilder,

347

kubernetesClient: KubernetesClient,

348

snapshotsStore: ExecutorPodsSnapshotsStore,

349

clock: Clock

350

) {

351

352

def setTotalExpectedExecutors(newTotal: Int): Unit

353

def start(applicationId: String): Unit

354

def stop(): Unit

355

}

356

```

357

358

**Allocation Logic**:

359

```scala

360

def setTotalExpectedExecutors(newTotal: Int): Unit = {

361

val currentSnapshot = snapshotsStore.currentSnapshot

362

val currentTotal = currentSnapshot.executorPods.size

363

364

if (newTotal > currentTotal) {

365

val podsToCreate = newTotal - currentTotal

366

logInfo(s"Requesting $podsToCreate new executor pods")

367

368

(1 to podsToCreate).foreach { _ =>

369

val newExecutorId = generateExecutorId()

370

val executorConf = createExecutorConf(newExecutorId)

371

val executorPod = executorBuilder.buildFromFeatures(executorConf, secMgr, kubernetesClient)

372

373

createExecutorPod(executorPod, newExecutorId)

374

}

375

}

376

}

377

378

private def createExecutorPod(pod: SparkPod, executorId: String): Unit = {

379

try {

380

val createdPod = kubernetesClient.pods()

381

.inNamespace(namespace)

382

.create(pod.pod)

383

384

logInfo(s"Created executor pod ${createdPod.getMetadata.getName} for executor $executorId")

385

} catch {

386

case e: Exception =>

387

logError(s"Failed to create executor pod for executor $executorId", e)

388

}

389

}

390

```

391

392

## Advanced Pod Management

393

394

### Pod Affinity and Anti-Affinity

395

396

```scala

397

// Configure pod anti-affinity for executor distribution

398

val podAntiAffinity = new PodAntiAffinityBuilder()

399

.addNewPreferredDuringSchedulingIgnoredDuringExecution()

400

.withWeight(100)

401

.withNewPodAffinityTerm()

402

.withNewLabelSelector()

403

.addToMatchLabels(SPARK_APP_ID_LABEL, applicationId)

404

.addToMatchLabels(SPARK_ROLE_LABEL, EXECUTOR_ROLE)

405

.endLabelSelector()

406

.withTopologyKey("kubernetes.io/hostname")

407

.endPodAffinityTerm()

408

.endPreferredDuringSchedulingIgnoredDuringExecution()

409

.build()

410

```

411

412

### Resource Monitoring

413

414

```scala

415

// Monitor resource usage across executor pods

416

def collectExecutorMetrics(snapshot: ExecutorPodsSnapshot): ExecutorMetrics = {

417

val runningPods = snapshot.executorPods.values.collect {

418

case PodRunning(pod) => pod

419

}

420

421

ExecutorMetrics(

422

totalPods = runningPods.size,

423

totalCpuRequest = runningPods.map(getCpuRequest).sum,

424

totalMemoryRequest = runningPods.map(getMemoryRequest).sum,

425

nodeDistribution = runningPods.groupBy(getNodeName).mapValues(_.size)

426

)

427

}

428

```

429

430

### Pod Template Integration

431

432

```scala

433

// Merge pod template with runtime configuration

434

def applyPodTemplate(

435

templatePod: Pod,

436

executorConf: KubernetesExecutorConf

437

): SparkPod = {

438

439

val mergedPod = new PodBuilder(templatePod)

440

// Override container image from configuration

441

.editFirstContainer()

442

.withImage(executorConf.get(CONTAINER_IMAGE))

443

.withResources(buildResourceRequirements(executorConf))

444

.endContainer()

445

// Add Spark-specific labels and annotations

446

.editMetadata()

447

.addToLabels(SPARK_APP_ID_LABEL, executorConf.appId)

448

.addToLabels(SPARK_ROLE_LABEL, EXECUTOR_ROLE)

449

.addToAnnotations(executorConf.annotations.asJava)

450

.endMetadata()

451

.build()

452

453

SparkPod(mergedPod, extractMainContainer(mergedPod))

454

}

455

```

456

457

## Error Handling and Recovery

458

459

### Pod Failure Detection

460

461

```scala

462

def diagnosePodFailure(failedPod: Pod): FailureDiagnosis = {

463

val podStatus = failedPod.getStatus

464

val containerStatuses = podStatus.getContainerStatuses.asScala

465

466

containerStatuses.find(_.getName == EXECUTOR_CONTAINER_NAME) match {

467

case Some(containerStatus) =>

468

Option(containerStatus.getState.getTerminated) match {

469

case Some(terminated) =>

470

FailureDiagnosis(

471

reason = terminated.getReason,

472

exitCode = terminated.getExitCode,

473

message = terminated.getMessage,

474

failureType = classifyFailureType(terminated)

475

)

476

case None =>

477

FailureDiagnosis.unknown("Container not terminated")

478

}

479

case None =>

480

FailureDiagnosis.unknown("Executor container not found")

481

}

482

}

483

```

484

485

### Automatic Recovery Strategies

486

487

```scala

488

def handleExecutorFailure(

489

executorId: String,

490

failureDiagnosis: FailureDiagnosis

491

): RecoveryAction = {

492

493

failureDiagnosis.failureType match {

494

case OutOfMemory =>

495

RecoveryAction.IncreaseMemory(executorId)

496

497

case ImagePullError =>

498

RecoveryAction.RetryWithBackoff(executorId, maxRetries = 3)

499

500

case NodeFailure =>

501

RecoveryAction.RescheduleOnDifferentNode(executorId)

502

503

case ApplicationError =>

504

RecoveryAction.NoRetry(executorId)

505

506

case _ =>

507

RecoveryAction.StandardRetry(executorId)

508

}

509

}

510

```

511

512

### Health Monitoring

513

514

```scala

515

// Implement health checks for executor pods

516

def performHealthCheck(pod: Pod): HealthStatus = {

517

val podName = pod.getMetadata.getName

518

519

try {

520

// Check if pod is responding to health endpoint

521

val healthResponse = kubernetesClient.pods()

522

.inNamespace(pod.getMetadata.getNamespace)

523

.withName(podName)

524

.portForward(8080)

525

.connect()

526

527

// Perform HTTP health check

528

val healthCheck = performHttpHealthCheck(healthResponse.getLocalPort)

529

530

healthCheck match {

531

case Success => HealthStatus.Healthy

532

case Failure(reason) => HealthStatus.Unhealthy(reason)

533

}

534

535

} catch {

536

case e: Exception =>

537

HealthStatus.Unknown(s"Health check failed: ${e.getMessage}")

538

}

539

}

540

```

541

542

## Performance Optimization

543

544

### Batch Processing

545

546

```scala

547

// Process snapshot updates in batches for efficiency

548

class BatchingSnapshotProcessor(

549

batchSize: Int,

550

batchTimeout: Long

551

) {

552

553

private val pendingUpdates = new ConcurrentLinkedQueue[PodUpdate]()

554

555

def processBatch(): Unit = {

556

val batch = mutable.ArrayBuffer[PodUpdate]()

557

558

// Collect batch of updates

559

while (batch.size < batchSize && pendingUpdates.nonEmpty) {

560

Option(pendingUpdates.poll()).foreach(batch += _)

561

}

562

563

if (batch.nonEmpty) {

564

// Process all updates in single snapshot operation

565

val updatedSnapshot = batch.foldLeft(currentSnapshot) { (snapshot, update) =>

566

applyUpdate(snapshot, update)

567

}

568

569

snapshotsStore.replaceSnapshot(updatedSnapshot)

570

}

571

}

572

}

573

```

574

575

### Memory Efficiency

576

577

```scala

578

// Optimize memory usage for large clusters

579

class CompactExecutorPodsSnapshot(

580

private val podStates: Array[ExecutorPodState],

581

private val executorIdToIndex: Map[Long, Int]

582

) extends ExecutorPodsSnapshot {

583

584

// Pod states live in a flat array addressed through executorIdToIndex; the Map view below is rebuilt on every call

585

override def executorPods: Map[Long, ExecutorPodState] = {

586

executorIdToIndex.map { case (id, index) =>

587

id -> podStates(index)

588

}

589

}

590

}

591

```

592

593

The pod management system provides a robust, scalable foundation for monitoring and managing executor pods in Kubernetes environments, with comprehensive state tracking, automatic recovery, and performance optimizations for large-scale deployments.