Cluster Management

The cluster management layer provides the core Kubernetes integration for Apache Spark, implementing Spark's external cluster manager interface to enable native execution on Kubernetes clusters.

Core Components

KubernetesClusterManager { .api }

The main entry point for Kubernetes cluster management that integrates with Spark's cluster manager framework:

class KubernetesClusterManager extends ExternalClusterManager {
  def canCreate(masterURL: String): Boolean
  def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler
  def createSchedulerBackend(
    sc: SparkContext, 
    masterURL: String, 
    scheduler: TaskScheduler
  ): SchedulerBackend
  def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit
}

Key Responsibilities:

  • Validates Kubernetes master URLs (format: k8s://https://kubernetes-api:port)
  • Creates Kubernetes-specific task schedulers and scheduler backends
  • Manages the initialization lifecycle of cluster components
  • Integrates with Spark's cluster manager registry

Usage Example:

// Automatic registration when using k8s:// master URL
val spark = SparkSession.builder()
  .appName("MyKubernetesApp")
  .master("k8s://https://my-cluster.example.com:443")
  .config("spark.kubernetes.container.image", "my-spark:latest")
  .getOrCreate()

KubernetesClusterSchedulerBackend { .api }

Kubernetes-specific implementation of Spark's scheduler backend that manages executor lifecycle:

class KubernetesClusterSchedulerBackend(
  scheduler: TaskSchedulerImpl,
  sc: SparkContext,
  kubernetesClient: KubernetesClient,
  executorService: ScheduledExecutorService,
  snapshotsStore: ExecutorPodsSnapshotsStore,
  podAllocator: ExecutorPodsAllocator,
  lifecycleEventHandler: ExecutorPodsLifecycleManager,
  watchEvents: ExecutorPodsWatchSnapshotSource,
  pollEvents: ExecutorPodsPollingSnapshotSource
) extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv)

Core Features:

  • Dynamic Resource Allocation: Supports adding/removing executors based on workload
  • Pod Lifecycle Management: Handles creation, monitoring, and cleanup of executor pods
  • State Synchronization: Maintains consistency between Spark scheduler state and Kubernetes pod state
  • Fault Tolerance: Automatically handles pod failures and rescheduling

Key Methods:

override def start(): Unit
override def stop(): Unit
override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean]
override def doKillExecutors(executorIds: Seq[String]): Future[Boolean]
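
In normal operation these hooks are driven by Spark's dynamic allocation machinery rather than called directly. A minimal sketch of a configuration that exercises them (the image name and sizing values are placeholders):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("DynamicAllocationOnK8s")
  .master("k8s://https://my-cluster.example.com:443")
  .config("spark.kubernetes.container.image", "my-spark:latest")
  // Scale executors with the workload; shuffle tracking removes the need for
  // an external shuffle service on Kubernetes
  .config("spark.dynamicAllocation.enabled", "true")
  .config("spark.dynamicAllocation.shuffleTracking.enabled", "true")
  .config("spark.dynamicAllocation.minExecutors", "1")
  .config("spark.dynamicAllocation.maxExecutors", "10")
  .getOrCreate()

// As stages run, the allocation manager adjusts the target executor count,
// which reaches the backend through doRequestTotalExecutors/doKillExecutors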

Executor Management Components

ExecutorPodsAllocator { .api }

Manages the allocation and creation of new executor pods:

class ExecutorPodsAllocator(
  conf: SparkConf,
  secMgr: SecurityManager,
  executorBuilder: KubernetesExecutorBuilder,
  kubernetesClient: KubernetesClient,
  snapshotsStore: ExecutorPodsSnapshotsStore,
  clock: Clock
)

Responsibilities:

  • Creates new executor pods when requested by the scheduler
  • Applies resource configurations (CPU, memory, storage)
  • Handles pod template application and customization
  • Manages allocation timeouts and retries

Key Operations:

def setTotalExpectedExecutors(newTotal: Int): Unit
def start(applicationId: String): Unit
def stop(): Unit
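
Pod creation is batched rather than performed one pod at a time; the batch size and the delay between allocation rounds are tunable (the values below are the defaults):

import org.apache.spark.SparkConf

// Number of executor pods requested per allocation round, and the pause
// between rounds
val conf = new SparkConf()
  .set("spark.kubernetes.allocation.batch.size", "5")
  .set("spark.kubernetes.allocation.batch.delay", "1s")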

ExecutorPodsLifecycleManager { .api }

Manages the complete lifecycle of executor pods from creation to termination:

class ExecutorPodsLifecycleManager(
  conf: SparkConf,
  kubernetesClient: KubernetesClient,
  snapshotsStore: ExecutorPodsSnapshotsStore
)

Lifecycle Operations:

  • Pod Creation: Coordinates with allocator for new executor creation
  • Health Monitoring: Tracks pod health and readiness
  • Failure Handling: Detects and responds to pod failures
  • Resource Cleanup: Ensures proper cleanup of terminated pods

State Management:

def start(applicationId: String): Unit
def stop(): Unit
def onFinalNonDeletedState(podState: FinalPodState): Unit
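
Terminal pods are represented in the snapshot model as FinalPodState subtypes such as PodFailed, PodSucceeded, and PodDeleted. A hedged sketch of branching on them (the helper below is illustrative, not part of the public API, and the state classes themselves are internal to Spark):

// Illustrative helper over the snapshot state model
def describeFinalState(state: FinalPodState): String = state match {
  case PodFailed(pod)    => s"pod ${pod.getMetadata.getName} failed"
  case PodSucceeded(pod) => s"pod ${pod.getMetadata.getName} exited cleanly"
  case PodDeleted(pod)   => s"pod ${pod.getMetadata.getName} was deleted"
}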

Cluster Lifecycle Management

Initialization Sequence

The cluster manager follows a specific initialization sequence, performed internally by SparkContext when the master URL uses the k8s:// scheme:

// 1. Cluster Manager Creation
val clusterManager = new KubernetesClusterManager()

// 2. Task Scheduler Creation
val taskScheduler = clusterManager.createTaskScheduler(sparkContext, masterURL)

// 3. Scheduler Backend Creation
val schedulerBackend = clusterManager.createSchedulerBackend(
  sparkContext, masterURL, taskScheduler
)

// 4. Component Initialization
clusterManager.initialize(taskScheduler, schedulerBackend)

Resource Allocation Flow

Dynamic resource allocation follows this pattern:

// 1. Dynamic allocation (or the application, via SparkContext) requests a new executor total
sc.requestTotalExecutors(targetNum, localityAwareTasks, hostToLocalTaskCount)

// 2. The backend translates the request into a Kubernetes-specific target
schedulerBackend.doRequestTotalExecutors(requestedTotal)

// 3. Allocator creates new pods
executorPodsAllocator.setTotalExpectedExecutors(newTotal)

// 4. Lifecycle manager monitors pod states
lifecycleManager.onFinalNonDeletedState(podState)
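
The same path can be exercised manually through SparkContext's developer API, which is occasionally useful when testing allocation behavior (the executor ID below is a placeholder):

// Ask for two additional executors, then release one by ID
sc.requestExecutors(2)
sc.killExecutors(Seq("3"))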

Configuration Integration

Kubernetes-Specific Settings

The cluster manager integrates with Spark's configuration system:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  // Cluster configuration
  .set("spark.kubernetes.namespace", "spark-cluster")
  .set("spark.kubernetes.authenticate.driver.serviceAccountName", "spark-service-account")

  // Resource limits
  .set("spark.kubernetes.executor.limit.cores", "2")
  .set("spark.kubernetes.driver.limit.cores", "1")

  // Dynamic allocation
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.dynamicAllocation.maxExecutors", "10")
  .set("spark.dynamicAllocation.minExecutors", "1")
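
To confirm which Kubernetes settings are actually in effect, the resolved configuration can be inspected at runtime (assuming an active SparkSession named spark):

// List every effective spark.kubernetes.* setting
spark.sparkContext.getConf.getAll
  .filter { case (key, _) => key.startsWith("spark.kubernetes.") }
  .foreach { case (key, value) => println(s"$key = $value") }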

Master URL Format

Kubernetes master URLs follow a specific format:

// Standard format
k8s://https://kubernetes-api-server:port

// Examples
k8s://https://my-cluster.example.com:443
k8s://https://kubernetes.default.svc.cluster.local:443
k8s://https://10.0.0.100:6443
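
The k8s:// prefix is what KubernetesClusterManager.canCreate keys on when deciding whether it handles a given master URL; a simplified sketch of the check (not the actual implementation):

// Conceptually, canCreate just inspects the URL scheme
def canCreate(masterURL: String): Boolean = masterURL.startsWith("k8s://")

canCreate("k8s://https://my-cluster.example.com:443") // true
canCreate("spark://spark-master:7077")                // false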

Error Handling and Fault Tolerance

Pod Failure Recovery

The cluster manager recovers from executor pod failures automatically. A simplified sketch of the recovery flow (the handler class below is illustrative, not an actual Spark API):

// Illustrative sketch of the recovery flow (not an actual Spark class)
class PodFailureHandler {
  def handlePodFailure(failedPod: Pod, reason: String): Unit = {
    logWarning(s"Executor pod ${failedPod.getMetadata.getName} failed: $reason")

    // Executor IDs are attached to pods via the spark-exec-id label
    val executorId = failedPod.getMetadata.getLabels.get("spark-exec-id")

    // Remove the failed executor from the scheduler
    scheduler.executorLost(executorId, SlaveLost(reason))

    // The allocator creates a replacement if the target count is no longer met
    if (shouldReplace(failedPod)) {
      allocator.setTotalExpectedExecutors(targetExecutors)
    }
  }
}

Network Resilience

Connectivity to the Kubernetes API is made resilient by combining two snapshot sources: ExecutorPodsWatchSnapshotSource reacts to pod events as they arrive, while ExecutorPodsPollingSnapshotSource periodically re-lists executor pods so that events missed during a dropped watch connection are reconciled. The polling and event-processing cadence is configurable:

// Reconciliation re-list interval and snapshot batch-processing interval
val conf = new SparkConf()
  .set("spark.kubernetes.executor.apiPollingInterval", "30s")
  .set("spark.kubernetes.executor.eventProcessingInterval", "1s")

Monitoring and Observability

Metrics Integration

The cluster manager exposes metrics for monitoring:

// Executor metrics via the status tracker
sparkContext.statusTracker.getExecutorInfos.foreach { executor =>
  println(s"Executor at ${executor.host}:${executor.port}, running ${executor.numRunningTasks} tasks")
}

// Pod state counts (illustrative: the snapshots store is internal to the
// scheduler backend and not a public API)
val snapshot = snapshotsStore.currentSnapshot
val podCounts = snapshot.executorPods.values.groupBy(_.getClass.getSimpleName)
podCounts.foreach { case (state, pods) =>
  println(s"$state: ${pods.size} pods")
}

Logging Configuration

Comprehensive logging for troubleshooting:

// Keep terminated executor pods around for post-mortem debugging
spark.conf.set("spark.kubernetes.executor.deleteOnTermination", "false")

// Log levels
spark.sparkContext.setLogLevel("INFO") // General logs

// Raise verbosity for the Kubernetes scheduler components (log4j 1.x API)
import org.apache.log4j.{Level, Logger}
Logger.getLogger("org.apache.spark.scheduler.cluster.k8s").setLevel(Level.DEBUG)

Advanced Features

Custom Pod Templates

Support for custom pod templates:

// Using pod template file
spark.conf.set("spark.kubernetes.driver.podTemplateFile", "/path/to/driver-template.yaml")
spark.conf.set("spark.kubernetes.executor.podTemplateFile", "/path/to/executor-template.yaml")

// Template applied during pod creation
val templatePod = KubernetesUtils.loadPodFromTemplate(
  kubernetesClient, 
  templateFile, 
  containerName
)
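
Template settings are read when the driver and executor pods are built, so they must be in place before the application starts; in cluster mode the driver template is therefore usually passed via spark-submit --conf. A minimal builder-based sketch with placeholder paths:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("k8s://https://my-cluster.example.com:443")
  .config("spark.kubernetes.executor.podTemplateFile", "/path/to/executor-template.yaml")
  .getOrCreate()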

Service Account Configuration

RBAC integration for secure cluster access:

// Service account configuration
spark.conf.set("spark.kubernetes.authenticate.driver.serviceAccountName", "spark-driver")
spark.conf.set("spark.kubernetes.authenticate.executor.serviceAccountName", "spark-executor")

// RBAC permissions required:
// - pods: create, delete, get, list, watch
// - services: create, delete, get
// - configmaps: create, delete, get, list

Node Selection and Affinity

Advanced scheduling capabilities:

// Node selector (keys after the prefix become pod nodeSelector entries)
spark.conf.set("spark.kubernetes.node.selector.node-type", "compute")

// Affinity rules and tolerations are not expressible as annotations; supply
// them through pod template files instead
spark.conf.set("spark.kubernetes.executor.podTemplateFile", "/path/to/executor-template.yaml")

Best Practices

Resource Management

// Proper resource configuration
val conf = new SparkConf()
  .set("spark.kubernetes.driver.limit.cores", "1")
  .set("spark.kubernetes.driver.request.cores", "0.5")
  .set("spark.kubernetes.executor.limit.cores", "2")
  .set("spark.kubernetes.executor.request.cores", "1")
  .set("spark.driver.memory", "2g")
  .set("spark.executor.memory", "4g")

High Availability

// Point the k8s:// master at a load-balanced API server endpoint;
// API-server failover is handled by Kubernetes itself, not by Spark
val spark = SparkSession.builder()
  .master("k8s://https://k8s-api.example.com:6443")
  .getOrCreate()

// Spark does not restart a failed driver pod on its own; use a Kubernetes Job
// or the Spark Operator when automatic driver recovery is required

Security Configuration

// TLS configuration for Kubernetes API access
spark.conf.set("spark.kubernetes.authenticate.driver.caCertFile", "/path/to/ca.crt")
spark.conf.set("spark.kubernetes.authenticate.driver.clientCertFile", "/path/to/client.crt")
spark.conf.set("spark.kubernetes.authenticate.driver.clientKeyFile", "/path/to/client.key")

// OAuth token authentication
spark.conf.set("spark.kubernetes.authenticate.driver.oauthToken", "your-oauth-token")

The cluster management layer provides a robust foundation for running Spark applications on Kubernetes, with comprehensive integration of Kubernetes-native features and enterprise-grade reliability.