Apache Spark Kubernetes resource manager that enables running Spark applications on Kubernetes clusters
```bash
npx @tessl/cli install tessl/maven-org-apache-spark--spark-kubernetes-2-12@3.0.0
```

A comprehensive Kubernetes resource manager for Apache Spark that enables running Spark applications natively on Kubernetes clusters with full integration of Kubernetes features and APIs.
Group ID: org.apache.spark
Artifact ID: spark-kubernetes_2.12
Version: 3.0.1
Maven Dependency:
```xml
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-kubernetes_2.12</artifactId>
  <version>3.0.1</version>
</dependency>
```

SBT Dependency:

```scala
libraryDependencies += "org.apache.spark" %% "spark-kubernetes" % "3.0.1"
```

Core imports:

```scala
import org.apache.spark.deploy.k8s._
import org.apache.spark.deploy.k8s.submit._
import org.apache.spark.deploy.k8s.features._
import org.apache.spark.scheduler.cluster.k8s._
```

The Apache Spark Kubernetes resource manager provides native integration between Apache Spark and Kubernetes, enabling seamless deployment and execution of Spark applications in containerized environments. It implements a complete cluster manager that schedules Spark driver and executor pods, manages dynamic resource allocation, and provides fault tolerance through Kubernetes-native capabilities.
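For example, dynamic executor allocation on Kubernetes is commonly enabled with the standard Spark 3.0 properties shown in this sketch; the values are placeholders, and shuffle tracking is what allows executors to be released without an external shuffle service:

```scala
import org.apache.spark.SparkConf

// Sketch: standard Spark 3.0 settings for dynamic executor allocation on Kubernetes.
// Shuffle tracking lets idle executors be released without an external shuffle service.
val allocationConf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.dynamicAllocation.shuffleTracking.enabled", "true")
  .set("spark.dynamicAllocation.maxExecutors", "10")
```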
The Kubernetes resource manager follows a layered architecture; its core components are described below.
KubernetesClusterManager is the primary entry point for Kubernetes cluster operations:
```scala
class KubernetesClusterManager extends ExternalClusterManager {
  def canCreate(masterURL: String): Boolean
  def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler
  def createSchedulerBackend(
      sc: SparkContext,
      masterURL: String,
      scheduler: TaskScheduler
  ): SchedulerBackend
  def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit
}
```

Usage: Automatically registered with Spark when using `k8s://` master URLs. Handles cluster manager lifecycle and component creation.
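As a purely illustrative sketch of that selection check, `canCreate` (from the interface above) reports whether a given master URL is handled by this cluster manager; in practice Spark constructs and queries the cluster manager internally during SparkContext startup, so the direct instantiation below is for illustration only:

```scala
import org.apache.spark.scheduler.cluster.k8s.KubernetesClusterManager

// Sketch: Spark asks each registered ExternalClusterManager whether it can handle
// the configured master URL; only k8s:// URLs match this one.
val manager = new KubernetesClusterManager()
manager.canCreate("k8s://https://kubernetes.example.com:443") // true
manager.canCreate("yarn")                                     // false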
KubernetesClientApplication is the main entry point for application submission:
```scala
class KubernetesClientApplication extends SparkApplication {
  def start(args: Array[String], conf: SparkConf): Unit
}
```

Usage: Invoked by spark-submit when using Kubernetes cluster mode. Manages the complete application submission workflow.
KubernetesConf is the base configuration class for Kubernetes operations:
```scala
abstract class KubernetesConf(
    val sparkConf: SparkConf,
    val appId: String,
    val resourceNamePrefix: String,
    val appName: String,
    val namespace: String,
    val labels: Map[String, String],
    val environment: Map[String, String],
    val annotations: Map[String, String],
    val secretEnvNamesToKeyRefs: Map[String, String],
    val secretNamesToMountPaths: Map[String, String],
    val volumes: Seq[KubernetesVolumeSpec],
    val imagePullPolicy: String,
    val nodeSelector: Map[String, String]
)

class KubernetesDriverConf extends KubernetesConf {
  def serviceAnnotations: Map[String, String]
}

class KubernetesExecutorConf extends KubernetesConf
```
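For illustration, most of these fields are populated from standard `spark.kubernetes.*` properties. The sketch below maps a few common properties to the fields they feed; the property names follow the Spark on Kubernetes configuration reference, and the values are placeholders:

```scala
import org.apache.spark.SparkConf

// Sketch: properties whose values surface in KubernetesConf fields.
val k8sProps = new SparkConf()
  .set("spark.kubernetes.namespace", "spark-apps")                     // -> namespace
  .set("spark.kubernetes.driver.label.team", "data-platform")          // -> labels
  .set("spark.kubernetes.driver.annotation.owner", "spark-team")       // -> annotations
  .set("spark.kubernetes.node.selector.disktype", "ssd")               // -> nodeSelector
  .set("spark.kubernetes.container.image.pullPolicy", "IfNotPresent")  // -> imagePullPolicy
```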
Using spark-submit with Kubernetes cluster mode:

```bash
spark-submit \
  --master k8s://https://kubernetes.example.com:443 \
  --deploy-mode cluster \
  --name spark-pi \
  --class org.apache.spark.examples.SparkPi \
  --conf spark.kubernetes.container.image=spark:latest \
  --conf spark.kubernetes.namespace=spark \
  local:///opt/spark/examples/jars/spark-examples.jar
```

Configuring the application programmatically:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.deploy.k8s.Config._

val conf = new SparkConf()
  .setAppName("MySparkApp")
  .setMaster("k8s://https://kubernetes.example.com:443")
  .set(CONTAINER_IMAGE, "my-spark:latest")
  .set(KUBERNETES_NAMESPACE, "spark-apps")
  .set(KUBERNETES_DRIVER_LIMIT_CORES, "2")
  .set(KUBERNETES_EXECUTOR_INSTANCES, "4")

val sc = new SparkContext(conf)
```

This library provides comprehensive Kubernetes integration through several key capability areas. Each area has its own detailed documentation.
The Kubernetes cluster manager automatically registers with Spark's cluster manager registry:
```scala
import org.apache.spark.sql.SparkSession

// Automatic registration for k8s:// URLs
val spark = SparkSession.builder()
  .appName("MyApp")
  .master("k8s://https://my-cluster:443")
  .getOrCreate()
```

The feature step system allows modular configuration of pods:
```scala
// Feature steps are automatically applied based on configuration
val steps: Seq[KubernetesFeatureConfigStep] = Seq(
  new BasicDriverFeatureStep(conf),
  new DriverServiceFeatureStep(conf),
  new MountVolumesFeatureStep(conf)
)
```
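As a sketch of how a pod builder applies these steps, each step transforms a `SparkPod` in turn; this assumes the `configurePod(SparkPod): SparkPod` contract on `KubernetesFeatureConfigStep` and an initial empty pod from `SparkPod.initialPod()`:

```scala
import org.apache.spark.deploy.k8s.SparkPod

// Sketch: fold every feature step over an initial pod spec; each step returns
// a new immutable SparkPod with its configuration applied.
val configuredPod: SparkPod = steps.foldLeft(SparkPod.initialPod()) {
  (pod, step) => step.configurePod(pod)
}
```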
Executor pods are monitored through a snapshot-based system:

```scala
// Automatic snapshot updates via Kubernetes API
val snapshot: ExecutorPodsSnapshot = snapshotStore.currentSnapshot
val runningExecutors = snapshot.executorPods.values.collect {
  case PodRunning(pod) => pod
}
```
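Under the same snapshot model, pods can be grouped by lifecycle state. This sketch assumes the `ExecutorPodState` case classes (`PodPending`, `PodRunning`, `PodFailed`, ...) that accompany `ExecutorPodsSnapshot` in the scheduler package:

```scala
// Sketch: classify executor pods in a snapshot by state.
val pendingCount = snapshot.executorPods.values.count {
  case PodPending(_) => true
  case _ => false
}
val failedExecutorIds = snapshot.executorPods.collect {
  case (execId, PodFailed(_)) => execId
}.toSeq
```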
The library is designed for concurrent use in Spark's multi-threaded environment: `ExecutorPodsSnapshot` and `SparkPod` are immutable, so they can be shared safely across threads. Comprehensive error handling and fault tolerance mechanisms build on Kubernetes-native capabilities.
For detailed implementation guidance, see the specific capability documentation linked above.