The pod management system provides comprehensive monitoring and lifecycle management of executor pods in Kubernetes clusters, using a snapshot-based architecture for real-time state tracking and automated lifecycle operations.
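As an orientation, the components described below are typically wired together at scheduler start-up. The sketch below uses only the constructors and start methods shown in this section; values such as conf, kubernetesClient, labels, secMgr, executorBuilder, clock, pollingIntervalMs, initialExecutorCount, and applicationId are assumed to be in scope.
// Sketch: assemble the snapshot store, both snapshot sources, the lifecycle
// manager, and the allocator, then start them for the current application.
val snapshotsStore = new ExecutorPodsSnapshotsStoreImpl(
  Executors.newSingleThreadScheduledExecutor())

val watchSource = new ExecutorPodsWatchSnapshotSource(
  snapshotsStore, kubernetesClient, labels)
val pollingSource = new ExecutorPodsPollingSnapshotSource(
  snapshotsStore, kubernetesClient, labels, pollingIntervalMs)

val lifecycleManager = new ExecutorPodsLifecycleManager(conf, kubernetesClient, snapshotsStore)
val allocator = new ExecutorPodsAllocator(
  conf, secMgr, executorBuilder, kubernetesClient, snapshotsStore, clock)

watchSource.start(applicationId)
pollingSource.start(applicationId)
lifecycleManager.start(applicationId)
allocator.start(applicationId)
allocator.setTotalExpectedExecutors(initialExecutorCount)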
Immutable snapshot representing the current state of all executor pods in the cluster:
case class ExecutorPodsSnapshot(
executorPods: Map[Long, ExecutorPodState]
) {
def withUpdate(updatedPod: Pod): ExecutorPodsSnapshot
def withoutExecutor(executorId: Long): ExecutorPodsSnapshot
}
Key Features: the snapshot is an immutable value keyed by executor ID; withUpdate and withoutExecutor return new snapshots rather than mutating shared state, so readers always see a consistent view of the cluster.
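The two update methods are declared without bodies above. A minimal sketch of how they might be implemented, assuming the executor ID is carried in a pod label (the label name spark-exec-id is an assumption) and using the ExecutorPodState.fromPod classifier shown later in this section:
case class ExecutorPodsSnapshot(executorPods: Map[Long, ExecutorPodState]) {
  // Assumed label carrying the numeric executor ID.
  private val ExecutorIdLabel = "spark-exec-id"

  def withUpdate(updatedPod: Pod): ExecutorPodsSnapshot = {
    val executorId = updatedPod.getMetadata.getLabels.get(ExecutorIdLabel).toLong
    copy(executorPods = executorPods + (executorId -> ExecutorPodState.fromPod(updatedPod)))
  }

  def withoutExecutor(executorId: Long): ExecutorPodsSnapshot =
    copy(executorPods = executorPods - executorId)
}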
Usage Patterns:
// Get current cluster state
val snapshot = snapshotsStore.currentSnapshot
// Query running executors
val runningExecutors = snapshot.executorPods.values.collect {
case PodRunning(pod) => pod.getMetadata.getName
}
// Check executor status
snapshot.executorPods.get(executorId) match {
case Some(PodRunning(pod)) => println(s"Executor $executorId is running")
case Some(PodFailed(pod)) => println(s"Executor $executorId has failed")
case Some(otherState) => println(s"Executor $executorId is in state ${otherState.getClass.getSimpleName}")
case None => println(s"Executor $executorId not found")
}
Sealed trait hierarchy representing all possible executor pod states:
sealed trait ExecutorPodState {
def pod: Pod
}
// Active states
case class PodPending(pod: Pod) extends ExecutorPodState
case class PodRunning(pod: Pod) extends ExecutorPodState
case class PodUnknown(pod: Pod) extends ExecutorPodState
// Final states (terminal)
sealed trait FinalPodState extends ExecutorPodState
case class PodSucceeded(pod: Pod) extends FinalPodState
case class PodFailed(pod: Pod) extends FinalPodState
case class PodDeleted(pod: Pod) extends FinalPodState
State Transitions:
// Pod lifecycle state machine
PodPending → PodRunning → PodSucceeded/PodFailed
PodPending → PodFailed
Any State → PodDeleted (explicit deletion)
Any State → PodUnknown (temporary loss of connectivity)
State Classification:
object ExecutorPodState {
def fromPod(pod: Pod): ExecutorPodState = {
Option(pod.getStatus).map(_.getPhase) match {
case Some("Pending") => PodPending(pod)
case Some("Running") => PodRunning(pod)
case Some("Succeeded") => PodSucceeded(pod)
case Some("Failed") => PodFailed(pod)
case _ => PodUnknown(pod)
}
}
def isActive(state: ExecutorPodState): Boolean = state match {
case _: FinalPodState => false
case _ => true
}
def shouldRetry(state: ExecutorPodState): Boolean = state match {
case PodFailed(pod) =>
val restartPolicy = pod.getSpec.getRestartPolicy
restartPolicy != "Never"
case _ => false
}
}
Interface for storing and retrieving executor pod snapshots:
trait ExecutorPodsSnapshotsStore {
def addSubscriber(processBatchIntervalMillis: Long)
(onNewSnapshots: Seq[ExecutorPodsSnapshot] => Unit): Unit
def notifySubscribers(): Unit
def currentSnapshot: ExecutorPodsSnapshot
def stop(): Unit
}
Concrete implementation providing thread-safe snapshot management:
class ExecutorPodsSnapshotsStoreImpl(
subscribersExecutor: ScheduledExecutorService
) extends ExecutorPodsSnapshotsStore {
private val currentSnapshotState = new AtomicReference(ExecutorPodsSnapshot(Map.empty))
private val subscribers = new ConcurrentLinkedQueue[SnapshotSubscriber]()
def replaceSnapshot(newSnapshot: ExecutorPodsSnapshot): Unit
def updatePod(updatedPod: Pod): Unit
def removePod(deletedPod: Pod): Unit
}
Key Operations:
// Update snapshot with new pod state
snapshotsStore.updatePod(updatedPod)
// Remove terminated pod from tracking
snapshotsStore.removePod(terminatedPod)
// Replace entire snapshot (bulk updates)
val newSnapshot = ExecutorPodsSnapshot(newExecutorMap)
snapshotsStore.replaceSnapshot(newSnapshot)
// Subscribe to snapshot changes
snapshotsStore.addSubscriber(1000) { snapshots =>
snapshots.foreach(processSnapshotUpdate)
}
Provides real-time snapshot updates via the Kubernetes Watch API:
class ExecutorPodsWatchSnapshotSource(
snapshotsStore: ExecutorPodsSnapshotsStore,
kubernetesClient: KubernetesClient,
labels: Map[String, String]
) {
def start(applicationId: String): Unit
def stop(): Unit
}
Watch API Integration:
// Watch for pod changes in real-time
val watch = kubernetesClient.pods()
.inNamespace(namespace)
.withLabels(selectorLabels)
.watch(new Watcher[Pod] {
override def eventReceived(action: Action, pod: Pod): Unit = {
action match {
case Action.ADDED | Action.MODIFIED => snapshotsStore.updatePod(pod)
case Action.DELETED => snapshotsStore.removePod(pod)
case Action.ERROR => logWarning(s"Watch error received for pod ${pod.getMetadata.getName}")
}
}
override def onClose(cause: WatcherException): Unit = {
if (cause != null) {
logError("Watch connection closed", cause)
scheduleReconnect()
}
}
})
Benefits: pod state changes propagate to the snapshot store as soon as the API server emits the corresponding watch events, avoiding the latency and API load of periodic polling; the trade-off is that a dropped watch connection can miss events until it is re-established.
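The onClose handler above calls a scheduleReconnect() helper that is not shown. A minimal sketch, assuming a ScheduledExecutorService named scheduler, a backoff value watchBackoffMs, and a restartWatch() method that re-registers the watcher (all of these names are assumptions):
private def scheduleReconnect(): Unit = {
  // Re-establish the watch after a short backoff; retry again on failure.
  scheduler.schedule(new Runnable {
    override def run(): Unit = {
      try {
        restartWatch()
      } catch {
        case e: Exception =>
          logError("Watch reconnect failed, retrying", e)
          scheduleReconnect()
      }
    }
  }, watchBackoffMs, TimeUnit.MILLISECONDS)
}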
Alternative snapshot source using periodic API server polling:
class ExecutorPodsPollingSnapshotSource(
snapshotsStore: ExecutorPodsSnapshotsStore,
kubernetesClient: KubernetesClient,
labels: Map[String, String],
pollingInterval: Long
) {
def start(applicationId: String): Unit
def stop(): Unit
}
Polling Implementation:
// Periodic snapshot refresh
val pollingTask = scheduledExecutor.scheduleAtFixedRate(() => {
try {
val pods = kubernetesClient.pods()
.inNamespace(namespace)
.withLabels(selectorLabels)
.list()
.getItems
val executorPods = pods.asScala
.map(pod => extractExecutorId(pod) -> ExecutorPodState.fromPod(pod))
.toMap
val newSnapshot = ExecutorPodsSnapshot(executorPods)
snapshotsStore.replaceSnapshot(newSnapshot)
} catch {
case e: Exception =>
logWarning("Failed to poll executor pods", e)
}
}, 0, pollingInterval, TimeUnit.MILLISECONDS)
Use Cases: polling complements the watch-based source by periodically reconciling the full pod list from the API server, which recovers state missed during dropped watch connections or delayed events, at the cost of additional API server load.
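The polling loop above relies on an extractExecutorId helper that is not defined in this section. A minimal sketch, assuming (as in the withUpdate sketch earlier) that the executor ID is carried in a pod label:
// Sketch: recover the numeric executor ID from the pod's labels.
// The label name spark-exec-id is an assumption and must match whatever
// label the allocator sets when it creates executor pods.
private def extractExecutorId(pod: Pod): Long =
  pod.getMetadata.getLabels.get("spark-exec-id").toLong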
Manages complete lifecycle of executor pods from creation to cleanup:
class ExecutorPodsLifecycleManager(
conf: SparkConf,
kubernetesClient: KubernetesClient,
snapshotsStore: ExecutorPodsSnapshotsStore
) {
def start(applicationId: String): Unit
def stop(): Unit
def onFinalNonDeletedState(podState: FinalPodState): Unit
}
Lifecycle Operations:
// Subscribe to snapshot updates for lifecycle management
snapshotsStore.addSubscriber(batchIntervalMillis) { snapshots =>
snapshots.foreach { snapshot =>
snapshot.executorPods.values.foreach {
case finalState: FinalPodState if !finalState.isInstanceOf[PodDeleted] =>
onFinalNonDeletedState(finalState)
case _ => // Continue monitoring active pods
}
}
}
def onFinalNonDeletedState(podState: FinalPodState): Unit = {
podState match {
case PodFailed(pod) =>
val executorId = getExecutorId(pod)
logWarning(s"Executor $executorId failed")
// Notify scheduler of executor loss
schedulerBackend.removeExecutor(executorId,
SlaveLost(s"Pod ${pod.getMetadata.getName} failed"))
// Clean up failed pod if configured
if (conf.get(DELETE_FAILED_EXECUTOR_PODS)) {
deleteExecutorPod(pod)
}
case PodSucceeded(pod) =>
val executorId = getExecutorId(pod)
logInfo(s"Executor $executorId completed successfully")
// Clean up completed pod
if (conf.get(DELETE_SUCCESSFUL_EXECUTOR_PODS)) {
deleteExecutorPod(pod)
}
}
}
private def deleteExecutorPod(pod: Pod): Unit = {
val podName = pod.getMetadata.getName
try {
kubernetesClient.pods()
.inNamespace(pod.getMetadata.getNamespace)
.withName(podName)
.delete()
logInfo(s"Deleted executor pod: $podName")
} catch {
case e: Exception =>
logWarning(s"Failed to delete executor pod $podName", e)
}
}
Handles allocation of new executor pods based on scheduler requests:
class ExecutorPodsAllocator(
conf: SparkConf,
secMgr: SecurityManager,
executorBuilder: KubernetesExecutorBuilder,
kubernetesClient: KubernetesClient,
snapshotsStore: ExecutorPodsSnapshotsStore,
clock: Clock
) {
def setTotalExpectedExecutors(newTotal: Int): Unit
def start(applicationId: String): Unit
def stop(): Unit
}
Allocation Logic:
def setTotalExpectedExecutors(newTotal: Int): Unit = {
val currentSnapshot = snapshotsStore.currentSnapshot
val currentTotal = currentSnapshot.executorPods.size
if (newTotal > currentTotal) {
val podsToCreate = newTotal - currentTotal
logInfo(s"Requesting $podsToCreate new executor pods")
(1 to podsToCreate).foreach { _ =>
val newExecutorId = generateExecutorId()
val executorConf = createExecutorConf(newExecutorId)
val executorPod = executorBuilder.buildFromFeatures(executorConf, secMgr, kubernetesClient)
createExecutorPod(executorPod, newExecutorId)
}
}
}
private def createExecutorPod(pod: SparkPod, executorId: String): Unit = {
try {
val createdPod = kubernetesClient.pods()
.inNamespace(namespace)
.create(pod.pod)
logInfo(s"Created executor pod ${createdPod.getMetadata.getName} for executor $executorId")
} catch {
case e: Exception =>
logError(s"Failed to create executor pod for executor $executorId", e)
}
}
// Configure pod anti-affinity for executor distribution
val podAntiAffinity = new PodAntiAffinityBuilder()
.addNewPreferredDuringSchedulingIgnoredDuringExecution()
.withWeight(100)
.withNewPodAffinityTerm()
.withNewLabelSelector()
.addToMatchLabels(SPARK_APP_ID_LABEL, applicationId)
.addToMatchLabels(SPARK_ROLE_LABEL, EXECUTOR_ROLE)
.endLabelSelector()
.withTopologyKey("kubernetes.io/hostname")
.endPodAffinityTerm()
.endPreferredDuringSchedulingIgnoredDuringExecution()
.build()
// Monitor resource usage across executor pods
def collectExecutorMetrics(snapshot: ExecutorPodsSnapshot): ExecutorMetrics = {
val runningPods = snapshot.executorPods.values.collect {
case PodRunning(pod) => pod
}
ExecutorMetrics(
totalPods = runningPods.size,
totalCpuRequest = runningPods.map(getCpuRequest).sum,
totalMemoryRequest = runningPods.map(getMemoryRequest).sum,
nodeDistribution = runningPods.groupBy(getNodeName).mapValues(_.size)
)
}
// Merge pod template with runtime configuration
def applyPodTemplate(
templatePod: Pod,
executorConf: KubernetesExecutorConf
): SparkPod = {
val mergedPod = new PodBuilder(templatePod)
// Override container image from configuration
.editSpec()
.editFirstContainer()
.withImage(executorConf.get(CONTAINER_IMAGE))
.withResources(buildResourceRequirements(executorConf))
.endContainer()
.endSpec()
// Add Spark-specific labels and annotations
.editMetadata()
.addToLabels(SPARK_APP_ID_LABEL, executorConf.appId)
.addToLabels(SPARK_ROLE_LABEL, EXECUTOR_ROLE)
.addToAnnotations(executorConf.annotations.asJava)
.endMetadata()
.build()
SparkPod(mergedPod, extractMainContainer(mergedPod))
}
def diagnosePodFailure(failedPod: Pod): FailureDiagnosis = {
val podStatus = failedPod.getStatus
val containerStatuses = podStatus.getContainerStatuses.asScala
containerStatuses.find(_.getName == EXECUTOR_CONTAINER_NAME) match {
case Some(containerStatus) =>
Option(containerStatus.getState.getTerminated) match {
case Some(terminated) =>
FailureDiagnosis(
reason = terminated.getReason,
exitCode = terminated.getExitCode,
message = terminated.getMessage,
failureType = classifyFailureType(terminated)
)
case None =>
FailureDiagnosis.unknown("Container not terminated")
}
case None =>
FailureDiagnosis.unknown("Executor container not found")
}
}
def handleExecutorFailure(
executorId: String,
failureDiagnosis: FailureDiagnosis
): RecoveryAction = {
failureDiagnosis.failureType match {
case OutOfMemory =>
RecoveryAction.IncreaseMemory(executorId)
case ImagePullError =>
RecoveryAction.RetryWithBackoff(executorId, maxRetries = 3)
case NodeFailure =>
RecoveryAction.RescheduleOnDifferentNode(executorId)
case ApplicationError =>
RecoveryAction.NoRetry(executorId)
case _ =>
RecoveryAction.StandardRetry(executorId)
}
}
// Implement health checks for executor pods
def performHealthCheck(pod: Pod): HealthStatus = {
val podName = pod.getMetadata.getName
try {
// Open a port-forward to the pod's assumed health endpoint on port 8080
val portForward = kubernetesClient.pods()
  .inNamespace(pod.getMetadata.getNamespace)
  .withName(podName)
  .portForward(8080)
try {
  // Perform an HTTP health check through the forwarded local port;
  // performHttpHealthCheck is assumed to return a scala.util.Try
  performHttpHealthCheck(portForward.getLocalPort) match {
    case scala.util.Success(_) => HealthStatus.Healthy
    case scala.util.Failure(reason) => HealthStatus.Unhealthy(reason.getMessage)
  }
} finally {
  portForward.close()
}
} catch {
case e: Exception =>
HealthStatus.Unknown(s"Health check failed: ${e.getMessage}")
}
}
// Process snapshot updates in batches for efficiency
class BatchingSnapshotProcessor(
snapshotsStore: ExecutorPodsSnapshotsStore,
batchSize: Int,
batchTimeout: Long
) {
private val pendingUpdates = new ConcurrentLinkedQueue[PodUpdate]()
def processBatch(): Unit = {
val batch = mutable.ArrayBuffer[PodUpdate]()
// Collect batch of updates
while (batch.size < batchSize && !pendingUpdates.isEmpty) {
Option(pendingUpdates.poll()).foreach(batch += _)
}
if (batch.nonEmpty) {
// Process all updates in single snapshot operation
val updatedSnapshot = batch.foldLeft(snapshotsStore.currentSnapshot) { (snapshot, update) =>
applyUpdate(snapshot, update)
}
snapshotsStore.replaceSnapshot(updatedSnapshot)
}
}
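// Sketch of the applyUpdate helper referenced above (an assumption, not an
// existing API): PodUpdate is taken to be a small ADT with an upsert case and
// a removal case, folded one at a time into the immutable snapshot.
private def applyUpdate(
    snapshot: ExecutorPodsSnapshot,
    update: PodUpdate): ExecutorPodsSnapshot = update match {
  case PodUpserted(pod) => snapshot.withUpdate(pod)
  case PodRemoved(executorId) => snapshot.withoutExecutor(executorId)
}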
}
// Optimize memory usage for large clusters
class CompactExecutorPodsSnapshot(
private val podStates: Array[ExecutorPodState],
private val executorIdToIndex: Map[Long, Int]
) {
// Use arrays instead of maps for memory efficiency with large executor counts.
// ExecutorPodsSnapshot above is a case class and cannot be extended, so this
// compact form is a separate representation that exposes the same map view on demand.
def executorPods: Map[Long, ExecutorPodState] = {
executorIdToIndex.map { case (id, index) =>
id -> podStates(index)
}
}
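// Sketch: a direct lookup that avoids materializing the full map when only a
// single executor's state is needed.
def getExecutorState(executorId: Long): Option[ExecutorPodState] =
  executorIdToIndex.get(executorId).map(index => podStates(index))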
}
The pod management system provides a robust, scalable foundation for monitoring and managing executor pods in Kubernetes environments, with comprehensive state tracking, automatic recovery, and performance optimizations for large-scale deployments.