Pod Management

The pod management system monitors executor pods in Kubernetes clusters and manages their full lifecycle. It uses a snapshot-based architecture to track cluster state in real time and to drive automated lifecycle operations from creation to cleanup.

Core Architecture

ExecutorPodsSnapshot { .api }

Immutable snapshot representing the current state of all executor pods in the cluster:

case class ExecutorPodsSnapshot(
  executorPods: Map[Long, ExecutorPodState]
) {
  def withUpdate(updatedPod: Pod): ExecutorPodsSnapshot
  def withoutExecutor(executorId: Long): ExecutorPodsSnapshot
}

Key Features:

  • Immutable Design: Snapshots are immutable, ensuring thread-safe access
  • Efficient Updates: New snapshots are created with minimal object creation (see the sketch below)
  • State Consistency: Provides consistent view of cluster state at a point in time
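
A minimal sketch of how the two update methods can be implemented. Here extractExecutorId is a hypothetical helper that reads the executor ID label from the pod's metadata, and fromPod is defined under Pod State Utilities below:

// Sketch: immutable updates return a new snapshot via copy
case class ExecutorPodsSnapshot(
  executorPods: Map[Long, ExecutorPodState]
) {
  def withUpdate(updatedPod: Pod): ExecutorPodsSnapshot = {
    val executorId = extractExecutorId(updatedPod) // hypothetical helper
    copy(executorPods = executorPods + (executorId -> ExecutorPodState.fromPod(updatedPod)))
  }

  def withoutExecutor(executorId: Long): ExecutorPodsSnapshot =
    copy(executorPods = executorPods - executorId)
}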

Usage Patterns:

// Get current cluster state
val snapshot = snapshotsStore.currentSnapshot

// Query running executors
val runningExecutors = snapshot.executorPods.values.collect {
  case PodRunning(pod) => pod.getMetadata.getName
}

// Check executor status
snapshot.executorPods.get(executorId) match {
  case Some(PodRunning(pod)) => println(s"Executor $executorId is running")
  case Some(PodFailed(pod)) => println(s"Executor $executorId has failed")
  case Some(otherState) => println(s"Executor $executorId is in state $otherState")
  case None => println(s"Executor $executorId not found")
}

Pod State Management

ExecutorPodState Hierarchy { .api }

Sealed trait hierarchy representing all possible executor pod states:

sealed trait ExecutorPodState {
  def pod: Pod
}

// Active states
case class PodPending(pod: Pod) extends ExecutorPodState
case class PodRunning(pod: Pod) extends ExecutorPodState  
case class PodUnknown(pod: Pod) extends ExecutorPodState

// Final states (terminal)
sealed trait FinalPodState extends ExecutorPodState

case class PodSucceeded(pod: Pod) extends FinalPodState
case class PodFailed(pod: Pod) extends FinalPodState
case class PodDeleted(pod: Pod) extends FinalPodState

State Transitions:

// Pod lifecycle state machine
PodPending → PodRunning → PodSucceeded/PodFailed
PodPending → PodFailed
Any State → PodDeleted (explicit deletion)
Any State → PodUnknown (temporary loss of connectivity)

State Classification:

  • Active States: Pod is still being managed and may transition to other states
  • Final States: Terminal states indicating pod lifecycle completion
  • Unknown State: Temporary state when pod status cannot be determined (see the partitioning example below)
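
A short example of partitioning a snapshot by this classification, assuming the hierarchy above:

// Split a snapshot's pods into terminal and still-active groups
val (finalPods, activePods) = snapshot.executorPods.partition {
  case (_, state) => state.isInstanceOf[FinalPodState]
}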

Pod State Utilities

object ExecutorPodState {
  
  def fromPod(pod: Pod): ExecutorPodState = {
    Option(pod.getStatus).map(_.getPhase) match {
      case Some("Pending") => PodPending(pod)
      case Some("Running") => PodRunning(pod)
      case Some("Succeeded") => PodSucceeded(pod)
      case Some("Failed") => PodFailed(pod)
      case _ => PodUnknown(pod)
    }
  }
  
  def isActive(state: ExecutorPodState): Boolean = state match {
    case _: FinalPodState => false
    case _ => true
  }
  
  def shouldRetry(state: ExecutorPodState): Boolean = state match {
    case PodFailed(pod) => 
      val restartPolicy = pod.getSpec.getRestartPolicy
      restartPolicy != "Never"
    case _ => false
  }
}
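
A quick usage example combining these utilities:

// Classify a pod, then decide whether to keep tracking or replace it
val state = ExecutorPodState.fromPod(pod)
if (ExecutorPodState.isActive(state)) {
  // keep monitoring
} else if (ExecutorPodState.shouldRetry(state)) {
  // request a replacement executor
}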

Snapshot Management

ExecutorPodsSnapshotsStore { .api }

Interface for storing and retrieving executor pod snapshots:

trait ExecutorPodsSnapshotsStore {
  def addSubscriber(processBatchIntervalMillis: Long)
    (onNewSnapshots: Seq[ExecutorPodsSnapshot] => Unit): Unit
  
  def notifySubscribers(): Unit
  def currentSnapshot: ExecutorPodsSnapshot
  def stop(): Unit
}

ExecutorPodsSnapshotsStoreImpl { .api }

Concrete implementation providing thread-safe snapshot management:

class ExecutorPodsSnapshotsStoreImpl(
  subscribersExecutor: ScheduledExecutorService
) extends ExecutorPodsSnapshotsStore {
  
  private val currentSnapshotState = new AtomicReference(ExecutorPodsSnapshot(Map.empty))
  private val subscribers = new ConcurrentLinkedQueue[SnapshotSubscriber]()
  
  def replaceSnapshot(newSnapshot: ExecutorPodsSnapshot): Unit
  def updatePod(updatedPod: Pod): Unit  
  def removePod(deletedPod: Pod): Unit
}

Key Operations:

// Update snapshot with new pod state
snapshotsStore.updatePod(updatedPod)

// Remove terminated pod from tracking
snapshotsStore.removePod(terminatedPod)

// Replace entire snapshot (bulk updates)
val newSnapshot = ExecutorPodsSnapshot(newExecutorMap)
snapshotsStore.replaceSnapshot(newSnapshot)

// Subscribe to snapshot changes
snapshotsStore.addSubscriber(1000) { snapshots =>
  snapshots.foreach(processSnapshotUpdate)
}
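
As a concrete illustration of the subscriber callback, a sketch that logs executors newly observed as running between batches (illustrative only; the processSnapshotUpdate callback above would do similar work):

// Track which executors became running since the last batch
var previouslyRunning = Set.empty[Long]

snapshotsStore.addSubscriber(1000) { snapshots =>
  snapshots.lastOption.foreach { latest =>
    val nowRunning = latest.executorPods.collect {
      case (id, PodRunning(_)) => id
    }.toSet
    (nowRunning -- previouslyRunning).foreach { id =>
      println(s"Executor $id is now running")
    }
    previouslyRunning = nowRunning
  }
}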

Snapshot Sources

ExecutorPodsWatchSnapshotSource { .api }

Provides real-time snapshot updates via Kubernetes Watch API:

class ExecutorPodsWatchSnapshotSource(
  snapshotsStore: ExecutorPodsSnapshotsStore,
  kubernetesClient: KubernetesClient,
  labels: Map[String, String]
) {
  
  def start(applicationId: String): Unit
  def stop(): Unit
}

Watch API Integration:

// Watch for pod changes in real time
import io.fabric8.kubernetes.client.{Watcher, WatcherException}
import io.fabric8.kubernetes.client.Watcher.Action

val watch = kubernetesClient.pods()
  .inNamespace(namespace)
  .withLabels(selectorLabels)
  .watch(new Watcher[Pod] {
    override def eventReceived(action: Action, pod: Pod): Unit = {
      action match {
        case Action.ADDED | Action.MODIFIED => snapshotsStore.updatePod(pod)
        case Action.DELETED => snapshotsStore.removePod(pod)
        case Action.ERROR => logWarning("Watch error received")
        case _ => // Ignore other events (e.g. BOOKMARK)
      }
    }

    override def onClose(cause: WatcherException): Unit = {
      if (cause != null) {
        logError("Watch connection closed", cause)
        scheduleReconnect()
      }
    }
  })

Benefits:

  • Real-Time Updates: Immediate notification of pod state changes
  • Low Latency: Minimal delay between Kubernetes events and application response
  • Efficient: No polling overhead, events pushed from Kubernetes API server

ExecutorPodsPollingSnapshotSource { .api }

Alternative snapshot source using periodic API server polling:

class ExecutorPodsPollingSnapshotSource(
  snapshotsStore: ExecutorPodsSnapshotsStore,
  kubernetesClient: KubernetesClient,
  labels: Map[String, String],
  pollingInterval: Long
) {
  
  def start(applicationId: String): Unit
  def stop(): Unit
}

Polling Implementation:

// Periodic snapshot refresh
import java.util.concurrent.TimeUnit
import scala.collection.JavaConverters._ // for asScala

val pollingTask = scheduledExecutor.scheduleAtFixedRate(() => {
  try {
    val pods = kubernetesClient.pods()
      .inNamespace(namespace)
      .withLabels(selectorLabels)
      .list()
      .getItems
    
    val executorPods = pods.asScala
      .map(pod => extractExecutorId(pod) -> ExecutorPodState.fromPod(pod))
      .toMap
    
    val newSnapshot = ExecutorPodsSnapshot(executorPods)
    snapshotsStore.replaceSnapshot(newSnapshot)
    
  } catch {
    case e: Exception => 
      logWarning("Failed to poll executor pods", e)
  }
}, 0, pollingInterval, TimeUnit.MILLISECONDS)

Use Cases:

  • Network Reliability: More resilient to temporary network issues
  • Firewall Restrictions: Works when watch API is blocked
  • Debugging: Predictable polling intervals aid in troubleshooting
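
The two sources are complementary in practice: the watch source delivers low-latency events, while a polling source provides a periodic full resync that corrects any missed events. A sketch of running both against the same store (a common belt-and-braces setup; the 30-second interval is an arbitrary choice):

// Feed one store from both sources: watch for latency, polling for resync
val watchSource = new ExecutorPodsWatchSnapshotSource(snapshotsStore, kubernetesClient, labels)
val pollingSource = new ExecutorPodsPollingSnapshotSource(snapshotsStore, kubernetesClient, labels, 30000L)

watchSource.start(applicationId)
pollingSource.start(applicationId)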

Lifecycle Management

ExecutorPodsLifecycleManager { .api }

Manages complete lifecycle of executor pods from creation to cleanup:

class ExecutorPodsLifecycleManager(
  conf: SparkConf,
  kubernetesClient: KubernetesClient,
  snapshotsStore: ExecutorPodsSnapshotsStore
) {
  
  def start(applicationId: String): Unit
  def stop(): Unit
  def onFinalNonDeletedState(podState: FinalPodState): Unit
}

Lifecycle Operations:

Pod Creation and Monitoring

// Subscribe to snapshot updates for lifecycle management
snapshotsStore.addSubscriber(batchIntervalMillis) { snapshots =>
  snapshots.foreach { snapshot =>
    snapshot.executorPods.values.foreach {
      case PodDeleted(_) => // Already removed from the cluster; nothing to do
      case finalState: FinalPodState =>
        onFinalNonDeletedState(finalState)
      case _ => // Continue monitoring active pods
    }
  }
}

Failure Handling

def onFinalNonDeletedState(podState: FinalPodState): Unit = {
  podState match {
    case PodFailed(pod) =>
      val executorId = getExecutorId(pod)
      logWarning(s"Executor $executorId failed")
      
      // Notify scheduler of executor loss
      schedulerBackend.removeExecutor(executorId, 
        SlaveLost(s"Pod ${pod.getMetadata.getName} failed"))
      
      // Clean up failed pod if configured
      if (conf.get(DELETE_FAILED_EXECUTOR_PODS)) {
        deleteExecutorPod(pod)
      }
      
    case PodSucceeded(pod) =>
      val executorId = getExecutorId(pod)
      logInfo(s"Executor $executorId completed successfully")
      
      // Clean up completed pod
      if (conf.get(DELETE_SUCCESSFUL_EXECUTOR_PODS)) {
        deleteExecutorPod(pod)
      }

    case PodDeleted(_) =>
      // Already deleted from the cluster; no further cleanup needed
  }
}

Resource Cleanup

private def deleteExecutorPod(pod: Pod): Unit = {
  val podName = pod.getMetadata.getName
  try {
    kubernetesClient.pods()
      .inNamespace(pod.getMetadata.getNamespace)
      .withName(podName)
      .delete()
    
    logInfo(s"Deleted executor pod: $podName")
  } catch {
    case e: Exception =>
      logWarning(s"Failed to delete executor pod $podName", e)
  }
}

ExecutorPodsAllocator { .api }

Handles allocation of new executor pods based on scheduler requests:

class ExecutorPodsAllocator(
  conf: SparkConf,
  secMgr: SecurityManager,
  executorBuilder: KubernetesExecutorBuilder,
  kubernetesClient: KubernetesClient,
  snapshotsStore: ExecutorPodsSnapshotsStore,
  clock: Clock
) {
  
  def setTotalExpectedExecutors(newTotal: Int): Unit
  def start(applicationId: String): Unit  
  def stop(): Unit
}

Allocation Logic:

def setTotalExpectedExecutors(newTotal: Int): Unit = {
  val currentSnapshot = snapshotsStore.currentSnapshot
  val currentTotal = currentSnapshot.executorPods.size
  
  if (newTotal > currentTotal) {
    val podsToCreate = newTotal - currentTotal
    logInfo(s"Requesting $podsToCreate new executor pods")
    
    (1 to podsToCreate).foreach { _ =>
      val newExecutorId = generateExecutorId()
      val executorConf = createExecutorConf(newExecutorId)
      val executorPod = executorBuilder.buildFromFeatures(executorConf, secMgr, kubernetesClient)
      
      createExecutorPod(executorPod, newExecutorId)
    }
  }
}

private def createExecutorPod(pod: SparkPod, executorId: String): Unit = {
  try {
    val createdPod = kubernetesClient.pods()
      .inNamespace(namespace)
      .create(pod.pod)
    
    logInfo(s"Created executor pod ${createdPod.getMetadata.getName} for executor $executorId")
  } catch {
    case e: Exception =>
      logError(s"Failed to create executor pod for executor $executorId", e)
  }
}
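
Note that the sketch above compares the requested total only against pods already visible in the snapshot, so pods that were created but not yet observed would be requested again. A fuller implementation tracks outstanding requests; a sketch of the bookkeeping (the names here are assumptions):

import scala.collection.mutable

// Remember requested-but-unseen executors so repeated calls to
// setTotalExpectedExecutors don't over-allocate
private val newlyCreatedExecutors = mutable.LinkedHashMap.empty[String, Long] // executorId -> request time

private def knownOrPendingExecutors(snapshot: ExecutorPodsSnapshot): Int =
  snapshot.executorPods.size + newlyCreatedExecutors.size

// After a successful create:  newlyCreatedExecutors(executorId) = clock.getTimeMillis()
// Once the executor appears in a snapshot (or times out):  newlyCreatedExecutors.remove(executorId)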

Advanced Pod Management

Pod Affinity and Anti-Affinity

// Configure pod anti-affinity for executor distribution
val podAntiAffinity = new PodAntiAffinityBuilder()
  .addNewPreferredDuringSchedulingIgnoredDuringExecution()
    .withWeight(100)
    .withNewPodAffinityTerm()
      .withNewLabelSelector()
        .addToMatchLabels(SPARK_APP_ID_LABEL, applicationId)
        .addToMatchLabels(SPARK_ROLE_LABEL, EXECUTOR_ROLE)
      .endLabelSelector()
      .withTopologyKey("kubernetes.io/hostname")
    .endPodAffinityTerm()
  .endPreferredDuringSchedulingIgnoredDuringExecution()
  .build()
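
For the term to take effect it must be attached to the executor pod's spec; a sketch in the same fabric8 builder style (executorPod is an assumed Pod value):

// Attach the anti-affinity term to the executor pod's spec
val podWithAffinity = new PodBuilder(executorPod)
  .editOrNewSpec()
    .withNewAffinity()
      .withPodAntiAffinity(podAntiAffinity)
    .endAffinity()
  .endSpec()
  .build()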

Resource Monitoring

// Monitor resource usage across executor pods
def collectExecutorMetrics(snapshot: ExecutorPodsSnapshot): ExecutorMetrics = {
  val runningPods = snapshot.executorPods.values.collect {
    case PodRunning(pod) => pod
  }
  
  ExecutorMetrics(
    totalPods = runningPods.size,
    totalCpuRequest = runningPods.map(getCpuRequest).sum,
    totalMemoryRequest = runningPods.map(getMemoryRequest).sum,
    nodeDistribution = runningPods.groupBy(getNodeName).mapValues(_.size)
  )
}
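
getCpuRequest, getMemoryRequest, and getNodeName are assumed helpers; a sketch of the first using fabric8's Quantity parsing (Quantity.getAmountInBytes parses "500m" as 0.5):

import io.fabric8.kubernetes.api.model.{Pod, Quantity}
import scala.collection.JavaConverters._

// Sum the CPU requests of all containers in the pod
def getCpuRequest(pod: Pod): Double =
  pod.getSpec.getContainers.asScala
    .flatMap(c => Option(c.getResources).flatMap(r => Option(r.getRequests)))
    .flatMap(_.asScala.get("cpu"))
    .map(q => Quantity.getAmountInBytes(q).doubleValue)
    .sum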

Pod Template Integration

// Merge pod template with runtime configuration  
def applyPodTemplate(
  templatePod: Pod, 
  executorConf: KubernetesExecutorConf
): SparkPod = {
  
  val mergedPod = new PodBuilder(templatePod)
    // Override container image from configuration
    .editFirstContainer()
      .withImage(executorConf.get(CONTAINER_IMAGE))
      .withResources(buildResourceRequirements(executorConf))
    .endContainer()
    // Add Spark-specific labels and annotations
    .editMetadata()
      .addToLabels(SPARK_APP_ID_LABEL, executorConf.appId)
      .addToLabels(SPARK_ROLE_LABEL, EXECUTOR_ROLE)
      .addToAnnotations(executorConf.annotations.asJava)
    .endMetadata()
    .build()
    
  SparkPod(mergedPod, extractMainContainer(mergedPod))
}
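
extractMainContainer is an assumed helper; a minimal version simply selects the first container (real implementations may select by a configured container name):

import io.fabric8.kubernetes.api.model.{Container, Pod}
import scala.collection.JavaConverters._

// Minimal sketch: treat the first container as the main executor container
def extractMainContainer(pod: Pod): Container =
  pod.getSpec.getContainers.asScala.head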

Error Handling and Recovery

Pod Failure Detection

def diagnosePodFailure(failedPod: Pod): FailureDiagnosis = {
  val podStatus = failedPod.getStatus
  val containerStatuses = podStatus.getContainerStatuses.asScala
  
  containerStatuses.find(_.getName == EXECUTOR_CONTAINER_NAME) match {
    case Some(containerStatus) =>
      Option(containerStatus.getState.getTerminated) match {
        case Some(terminated) =>
          FailureDiagnosis(
            reason = terminated.getReason,
            exitCode = terminated.getExitCode,
            message = terminated.getMessage,
            failureType = classifyFailureType(terminated)
          )
        case None =>
          FailureDiagnosis.unknown("Container not terminated")
      }
    case None =>
      FailureDiagnosis.unknown("Executor container not found")
  }
}
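
classifyFailureType and the FailureType values are assumptions that line up with the recovery strategies below; one plausible classification based on common Kubernetes termination reasons and exit codes:

import io.fabric8.kubernetes.api.model.ContainerStateTerminated

// Hypothetical classification; note that image-pull problems usually
// surface in the container's waiting state rather than the terminated state
def classifyFailureType(terminated: ContainerStateTerminated): FailureType =
  (Option(terminated.getReason), Option(terminated.getExitCode).map(_.intValue)) match {
    case (Some("OOMKilled"), _) => OutOfMemory
    case (Some("Error"), Some(137)) => OutOfMemory // SIGKILL, often memory pressure
    case (_, Some(code)) if code != 0 => ApplicationError
    case _ => UnknownFailure // assumed catch-all value
  }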

Automatic Recovery Strategies

def handleExecutorFailure(
  executorId: String, 
  failureDiagnosis: FailureDiagnosis
): RecoveryAction = {
  
  failureDiagnosis.failureType match {
    case OutOfMemory =>
      RecoveryAction.IncreaseMemory(executorId)
      
    case ImagePullError =>
      RecoveryAction.RetryWithBackoff(executorId, maxRetries = 3)
      
    case NodeFailure =>
      RecoveryAction.RescheduleOnDifferentNode(executorId)
      
    case ApplicationError =>
      RecoveryAction.NoRetry(executorId)
      
    case _ =>
      RecoveryAction.StandardRetry(executorId)
  }
}
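
Wiring the diagnosis and recovery pieces together (illustrative; acting on each RecoveryAction is left to the scheduler side):

// Diagnose a failed pod and pick a recovery action
val diagnosis = diagnosePodFailure(failedPod)
val action = handleExecutorFailure(executorId, diagnosis)
logInfo(s"Recovery action for executor $executorId: $action")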

Health Monitoring

// Implement health checks for executor pods
def performHealthCheck(pod: Pod): HealthStatus = {
  val podName = pod.getMetadata.getName

  try {
    // Forward a local port to the pod's health endpoint (port 8080)
    val portForward = kubernetesClient.pods()
      .inNamespace(pod.getMetadata.getNamespace)
      .withName(podName)
      .portForward(8080)

    try {
      // Perform HTTP health check against the forwarded local port
      performHttpHealthCheck(portForward.getLocalPort) match {
        case Success(_) => HealthStatus.Healthy
        case Failure(reason) => HealthStatus.Unhealthy(reason.getMessage)
      }
    } finally {
      portForward.close()
    }
  } catch {
    case e: Exception =>
      HealthStatus.Unknown(s"Health check failed: ${e.getMessage}")
  }
}

Performance Optimization

Batch Processing

// Process snapshot updates in batches for efficiency
import scala.collection.mutable

class BatchingSnapshotProcessor(
  snapshotsStore: ExecutorPodsSnapshotsStore,
  batchSize: Int,
  batchTimeout: Long
) {
  
  private val pendingUpdates = new ConcurrentLinkedQueue[PodUpdate]()
  
  def processBatch(): Unit = {
    val batch = mutable.ArrayBuffer[PodUpdate]()
    
    // Drain up to batchSize pending updates; ConcurrentLinkedQueue is a
    // Java collection, so test with isEmpty rather than nonEmpty
    while (batch.size < batchSize && !pendingUpdates.isEmpty) {
      Option(pendingUpdates.poll()).foreach(batch += _)
    }

    if (batch.nonEmpty) {
      // Process all updates in a single snapshot operation
      val updatedSnapshot = batch.foldLeft(snapshotsStore.currentSnapshot) { (snapshot, update) =>
        applyUpdate(snapshot, update)
      }
      }
      
      snapshotsStore.replaceSnapshot(updatedSnapshot)
    }
  }
}
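
A sketch of driving the processor on the shared scheduled executor (PodUpdate, applyUpdate, and the processor instance are assumed from the class above):

// Run batches on a fixed delay so bursts of pod updates coalesce
scheduledExecutor.scheduleWithFixedDelay(
  () => processor.processBatch(),
  0, batchTimeout, TimeUnit.MILLISECONDS)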

Memory Efficiency

// Optimize memory usage for large clusters. The ExecutorPodsSnapshot case
// class above cannot be subclassed, so a compact variant stores states in
// a flat array and converts to the map-based snapshot on demand.
class CompactExecutorPodsSnapshot(
  private val podStates: Array[ExecutorPodState],
  private val executorIdToIndex: Map[Long, Int]
) {

  // Arrays avoid per-entry overhead for large executor counts; the map
  // view is materialized only when callers need it
  def toSnapshot: ExecutorPodsSnapshot =
    ExecutorPodsSnapshot(executorIdToIndex.map { case (id, index) =>
      id -> podStates(index)
    })
}

The pod management system provides a robust, scalable foundation for monitoring and managing executor pods in Kubernetes environments, with comprehensive state tracking, automatic recovery, and performance optimizations for large-scale deployments.