The cluster management layer provides the core Kubernetes integration for Apache Spark, implementing Spark's external cluster manager interface to enable native execution on Kubernetes clusters.
KubernetesClusterManager is the main entry point for Kubernetes cluster management and plugs into Spark's cluster manager framework:
class KubernetesClusterManager extends ExternalClusterManager {
  def canCreate(masterURL: String): Boolean
  def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler
  def createSchedulerBackend(
      sc: SparkContext,
      masterURL: String,
      scheduler: TaskScheduler
  ): SchedulerBackend
  def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit
}

Key Responsibilities:
- Recognizes Kubernetes master URLs (canCreate accepts URLs of the form k8s://https://kubernetes-api:port)
- Creates the Kubernetes-specific TaskScheduler and SchedulerBackend for a SparkContext
- Wires the scheduler and backend together during initialize

Usage Example:
// Automatic registration when using a k8s:// master URL
val spark = SparkSession.builder()
  .appName("MyKubernetesApp")
  .master("k8s://https://my-cluster.example.com:443")
  .config("spark.kubernetes.container.image", "my-spark:latest")
  .getOrCreate()

KubernetesClusterSchedulerBackend is the Kubernetes-specific implementation of Spark's scheduler backend; it manages the executor lifecycle:
class KubernetesClusterSchedulerBackend(
    scheduler: TaskSchedulerImpl,
    sc: SparkContext,
    kubernetesClient: KubernetesClient,
    executorService: ScheduledExecutorService,
    snapshotsStore: ExecutorPodsSnapshotsStore,
    podAllocator: ExecutorPodsAllocator,
    lifecycleEventHandler: ExecutorPodsLifecycleManager,
    watchEvents: ExecutorPodsWatchSnapshotSource,
    pollEvents: ExecutorPodsPollingSnapshotSource
) extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv)

Core Features:
- Extends CoarseGrainedSchedulerBackend with Kubernetes-specific executor management
- Observes executor pods through a shared snapshot store fed by both a watch-based and a polling-based source
- Delegates pod creation to ExecutorPodsAllocator and pod state handling to ExecutorPodsLifecycleManager
Key Methods:
override def start(): Unit
override def stop(): Unit
override def doRequestTotalExecutors(requestedTotal: Int, localityAwareTasks: Int, hostToLocalTaskCount: Map[String, Int], nodeBlacklist: Set[String]): Future[Boolean]
override def doKillExecutors(executorIds: Seq[String]): Future[Boolean]
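The important design point behind these methods is the division of labor: for new executors the backend only moves the allocator's target, while killing executors maps to deleting the corresponding pods. The sketch below is hypothetical code, not Spark's source; AllocatorLike and BackendSketch are stand-ins introduced here for illustration.

import scala.concurrent.Future

// Hypothetical stand-in for the ExecutorPodsAllocator dependency injected
// through the backend's constructor.
trait AllocatorLike {
  def setTotalExpectedExecutors(newTotal: Int): Unit
}

class BackendSketch(podAllocator: AllocatorLike) {
  // Requesting executors does not create pods directly; it only raises or
  // lowers the allocator's target and acknowledges the request.
  def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = {
    podAllocator.setTotalExpectedExecutors(requestedTotal)
    Future.successful(true)
  }

  // Killing executors corresponds to deleting their pods; the real backend
  // issues those deletes through the driver's KubernetesClient.
  def doKillExecutors(executorIds: Seq[String]): Future[Boolean] =
    Future.successful(true)
}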
ExecutorPodsAllocator manages the allocation and creation of new executor pods:

class ExecutorPodsAllocator(
    conf: SparkConf,
    secMgr: SecurityManager,
    executorBuilder: KubernetesExecutorBuilder,
    kubernetesClient: KubernetesClient,
    snapshotsStore: ExecutorPodsSnapshotsStore,
    clock: Clock
)

Responsibilities:
- Tracks the total number of executors the scheduler currently expects
- Compares that target against running and pending executor pods on each snapshot update
- Requests missing executor pods in batches, building their specs through KubernetesExecutorBuilder
Key Operations:
def setTotalExpectedExecutors(newTotal: Int): Unit
def start(applicationId: String): Unit
def stop(): Unit
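The sizing decision behind setTotalExpectedExecutors is easy to state in isolation. The sketch below is hypothetical code, not the ExecutorPodsAllocator source; batchSize stands in for spark.kubernetes.allocation.batch.size, which caps how many pods are requested per allocation round.

object AllocationMath {
  // Hypothetical sketch of the allocator's core deficit calculation; the real
  // allocator also tracks pods it has requested but not yet seen in a snapshot.
  def podsToRequest(totalExpected: Int, knownOrPending: Int, batchSize: Int): Int =
    math.max(0, math.min(batchSize, totalExpected - knownOrPending))

  def main(args: Array[String]): Unit = {
    // Target of 10 executors, 6 already running or pending, batch size 5
    // => request 4 new executor pods in this round.
    println(podsToRequest(totalExpected = 10, knownOrPending = 6, batchSize = 5)) // 4
  }
}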
ExecutorPodsLifecycleManager manages the complete lifecycle of executor pods from creation to termination:

class ExecutorPodsLifecycleManager(
    conf: SparkConf,
    kubernetesClient: KubernetesClient,
    snapshotsStore: ExecutorPodsSnapshotsStore
)

Lifecycle Operations:
- Subscribes to executor pod snapshots and detects pods that have failed, succeeded, or been deleted
- Reports lost executors back to the scheduler backend, with an exit reason derived from the pod status where possible
- Deletes terminated pods unless spark.kubernetes.executor.deleteOnTermination is set to false
State Management:
def start(applicationId: String): Unit
def stop(): Unit
def onFinalNonDeletedState(podState: FinalPodState): Unit
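To make the terminal-state handling concrete, here is a self-contained sketch with a hypothetical pod-state model (Spark's real ExecutorPodState hierarchy differs in detail): pods that reach a terminal phase without being deleted are the ones passed to onFinalNonDeletedState.

// Hypothetical pod-state model used only for this sketch.
sealed trait PodState
case object PodPending extends PodState
case object PodRunning extends PodState
case object PodSucceeded extends PodState
final case class PodFailed(exitCode: Option[Int]) extends PodState
case object PodDeleted extends PodState

object LifecycleSketch {
  // Terminal states not caused by an explicit delete are the ones that would
  // trigger onFinalNonDeletedState in the real lifecycle manager.
  def isFinalNonDeleted(state: PodState): Boolean = state match {
    case PodSucceeded | PodFailed(_) => true
    case _                           => false
  }

  def main(args: Array[String]): Unit = {
    val executorPods = Map(1L -> PodRunning, 2L -> PodFailed(Some(137)), 3L -> PodDeleted)
    val finalNonDeleted = executorPods.filter { case (_, s) => isFinalNonDeleted(s) }
    println(finalNonDeleted.keys.toList) // List(2)
  }
}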
The cluster manager follows a specific initialization sequence:

// 1. Cluster Manager Creation
val clusterManager = new KubernetesClusterManager()
// 2. Task Scheduler Creation
val taskScheduler = clusterManager.createTaskScheduler(sparkContext, masterURL)
// 3. Scheduler Backend Creation
val schedulerBackend = clusterManager.createSchedulerBackend(
  sparkContext, masterURL, taskScheduler
)
// 4. Component Initialization
clusterManager.initialize(taskScheduler, schedulerBackend)

Dynamic resource allocation follows this pattern:
// 1. Scheduler requests additional executors
scheduler.requestTotalExecutors(targetNum, localityAwareTasks, hostToLocalTaskCount)
// 2. Backend processes the request
schedulerBackend.doRequestTotalExecutors(requestedTotal, localityAwareTasks, hostToLocalTaskCount, nodeBlacklist)
// 3. Allocator creates new pods
executorPodsAllocator.setTotalExpectedExecutors(newTotal)
// 4. Lifecycle manager monitors pod states
lifecycleManager.onFinalNonDeletedState(podState)

The cluster manager integrates with Spark's configuration system:
// These string keys correspond to the typed entries in the internal
// org.apache.spark.deploy.k8s.Config object
val conf = new SparkConf()
  // Cluster configuration
  .set("spark.kubernetes.namespace", "spark-cluster")
  .set("spark.kubernetes.authenticate.driver.serviceAccountName", "spark-service-account")
  // Resource limits
  .set("spark.kubernetes.executor.limit.cores", "2")
  .set("spark.kubernetes.driver.limit.cores", "1")
  // Dynamic allocation (on Kubernetes this also needs shuffle tracking,
  // since there is no external shuffle service)
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.dynamicAllocation.shuffleTracking.enabled", "true")
  .set("spark.dynamicAllocation.maxExecutors", "10")
  .set("spark.dynamicAllocation.minExecutors", "1")

Kubernetes master URLs follow a specific format:
// Standard format
k8s://https://kubernetes-api-server:port
// Examples
k8s://https://my-cluster.example.com:443
k8s://https://kubernetes.default.svc.cluster.local:443
k8s://https://10.0.0.100:6443
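For illustration, the sketch below is hypothetical helper code (not Spark's own parser) that mirrors this format, including the documented behaviour that a URL without an explicit scheme, such as k8s://10.0.0.100:6443, is treated as https.

object MasterUrl {
  private val Prefix = "k8s://"

  // Mirrors the check KubernetesClusterManager.canCreate performs.
  def canCreate(masterURL: String): Boolean = masterURL.startsWith(Prefix)

  // Strip the k8s:// prefix and default to https when no scheme is present.
  def toApiServerUrl(masterURL: String): String = {
    require(canCreate(masterURL), s"Not a Kubernetes master URL: $masterURL")
    val rest = masterURL.stripPrefix(Prefix)
    if (rest.contains("://")) rest else s"https://$rest"
  }

  def main(args: Array[String]): Unit = {
    println(toApiServerUrl("k8s://https://my-cluster.example.com:443"))
    println(toApiServerUrl("k8s://10.0.0.100:6443")) // https://10.0.0.100:6443
  }
}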
The cluster manager provides failure recovery for executors. The snippet below is an illustrative sketch rather than a real Spark class: in practice ExecutorPodsLifecycleManager detects the failed pod and reports the lost executor to the scheduler, while ExecutorPodsAllocator notices the shortfall against its expected total and requests a replacement pod.

// Illustrative sketch of automatic executor replacement on pod failure
class PodFailureHandler(scheduler: TaskSchedulerImpl) {
  def handlePodFailure(executorId: String, failedPod: Pod, reason: String): Unit = {
    logWarning(s"Executor pod ${failedPod.getMetadata.getName} failed: $reason")
    // Remove the failed executor from the scheduler
    scheduler.executorLost(executorId, SlaveLost(reason))
    // No explicit replacement request is needed: the allocator's expected
    // total now exceeds the number of live executors, so it creates a new
    // pod on its next allocation round
  }
}
Connectivity to the Kubernetes API is made resilient through client timeouts and through the dual snapshot sources: the watch-based source reacts to events in real time, while the polling source periodically re-lists executor pods so that missed or dropped watch events are eventually reconciled.

val clientConf = new SparkConf()
  // Driver-side Kubernetes client timeouts (milliseconds)
  .set("spark.kubernetes.driver.connectionTimeout", "10000")
  .set("spark.kubernetes.driver.requestTimeout", "10000")
  // How often the polling snapshot source re-lists executor pods
  .set("spark.kubernetes.executor.apiPollingInterval", "30s")
The cluster manager exposes information useful for monitoring:

// Executor information from the driver's status tracker
sparkContext.statusTracker.getExecutorInfos.foreach { executor =>
  println(s"Executor at ${executor.host}:${executor.port}: " +
    s"${executor.numRunningTasks} running tasks, ${executor.cacheSize} bytes cached")
}
// Pod state tracking lives inside the driver: each ExecutorPodsSnapshot maps
// executor IDs to pod states, which can be summarized like this
// (driver-internal API, not callable from application code)
def podStateCounts(snapshot: ExecutorPodsSnapshot): Map[String, Int] =
  snapshot.executorPods.values
    .groupBy(_.getClass.getSimpleName)
    .map { case (state, pods) => state -> pods.size }
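For a view from outside the driver, executor pods can also be inspected directly through the Kubernetes API. The sketch below makes several assumptions: a fabric8 client 6.x (where KubernetesClientBuilder is available), Scala 2.13's CollectionConverters, the spark-role=executor label that Spark applies to executor pods, and a hypothetical namespace name.

import io.fabric8.kubernetes.client.KubernetesClientBuilder
import scala.jdk.CollectionConverters._

object ExecutorPodPhases {
  def main(args: Array[String]): Unit = {
    // Connects using the local kubeconfig or the in-cluster service account.
    val client = new KubernetesClientBuilder().build()
    try {
      val executorPods = client.pods()
        .inNamespace("spark-cluster")          // hypothetical namespace
        .withLabel("spark-role", "executor")   // label Spark puts on executor pods
        .list()
        .getItems
        .asScala

      executorPods
        .groupBy(_.getStatus.getPhase)         // Pending / Running / Succeeded / Failed
        .foreach { case (phase, pods) => println(s"$phase: ${pods.size} pods") }
    } finally {
      client.close()
    }
  }
}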
Comprehensive logging for troubleshooting:

// Keep terminated executor pods around so their logs remain inspectable
spark.conf.set("spark.kubernetes.executor.deleteOnTermination", "false") // For debugging
// Log levels
spark.sparkContext.setLogLevel("INFO") // General logs
// Raise the level for the Kubernetes scheduler components
// (Log4j 1.x API; with Log4j 2 configure this in conf/log4j2.properties)
import org.apache.log4j.{Level, Logger}
Logger.getLogger("org.apache.spark.scheduler.cluster.k8s").setLevel(Level.DEBUG)
Support for custom pod templates:

// Using pod template files
spark.conf.set("spark.kubernetes.driver.podTemplateFile", "/path/to/driver-template.yaml")
spark.conf.set("spark.kubernetes.executor.podTemplateFile", "/path/to/executor-template.yaml")
// Internally, the template is applied during pod creation (driver-side code,
// not callable from applications); containerName is an Option[String]
val templatePod = KubernetesUtils.loadPodFromTemplate(
  kubernetesClient,
  templateFile,
  containerName
)
RBAC integration for secure cluster access:

// Service account configuration
spark.conf.set("spark.kubernetes.authenticate.driver.serviceAccountName", "spark-driver")
spark.conf.set("spark.kubernetes.authenticate.executor.serviceAccountName", "spark-executor")
// RBAC permissions typically required by the driver's service account:
// - pods: create, delete, get, list, watch
// - services: create, delete, get
// - configmaps: create, delete, get, list
Advanced scheduling capabilities:

// Node selector (spark.kubernetes.node.selector.<key> becomes a nodeSelector entry)
spark.conf.set("spark.kubernetes.node.selector.node-type", "compute")
// Pod affinity, anti-affinity, and tolerations are not exposed as Spark
// configuration keys; supply them through the driver and executor pod
// template files described above.
// Proper resource configuration
val conf = new SparkConf()
  .set("spark.kubernetes.driver.limit.cores", "1")
  .set("spark.kubernetes.driver.request.cores", "0.5")
  .set("spark.kubernetes.executor.limit.cores", "2")
  .set("spark.kubernetes.executor.request.cores", "1")
  .set("spark.driver.memory", "2g")     // pod memory request = this value plus memory overhead
  .set("spark.executor.memory", "4g")
spark.conf.set("spark.kubernetes.apiserver.host", "https://k8s-api-1:6443,https://k8s-api-2:6443")
// Enable driver restart policy
spark.conf.set("spark.kubernetes.driver.restartPolicy", "OnFailure")// TLS configuration
spark.conf.set("spark.kubernetes.apiserver.caCertFile", "/path/to/ca.crt")
spark.conf.set("spark.kubernetes.apiserver.clientCertFile", "/path/to/client.crt")
spark.conf.set("spark.kubernetes.apiserver.clientKeyFile", "/path/to/client.key")
// OAuth token authentication
spark.conf.set("spark.kubernetes.apiserver.oauthToken", "your-oauth-token")The cluster management layer provides a robust foundation for running Spark applications on Kubernetes, with comprehensive integration of Kubernetes-native features and enterprise-grade reliability.