The application submission layer provides complete integration with Spark's spark-submit tool, enabling seamless deployment of Spark applications to Kubernetes clusters using familiar Spark interfaces.
The main entry point for application submission that integrates with Spark's application framework:
class KubernetesClientApplication extends SparkApplication {
def start(args: Array[String], conf: SparkConf): Unit
}

Integration Points:

- Invoked by spark-submit when the application is launched with --master k8s:// and --deploy-mode cluster (see the dispatch sketch below)
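A minimal sketch of the dispatch decision spark-submit makes; the helper name is illustrative, only the fully qualified class name and the flag semantics come from Spark:

// Hypothetical helper: with --master k8s:// and --deploy-mode cluster, spark-submit
// runs KubernetesClientApplication in-process instead of the user's main class.
def submissionEntryPoint(master: String, deployMode: String, userMainClass: String): String =
  if (master.startsWith("k8s://") && deployMode == "cluster") {
    "org.apache.spark.deploy.k8s.submit.KubernetesClientApplication"
  } else {
    userMainClass
  }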
Workflow Overview:

1. Parse the command-line arguments into ClientArguments
2. Build a KubernetesDriverConf from the Spark configuration
3. Construct the driver pod specification with KubernetesDriverBuilder
4. Submit the application through Client
5. Monitor the driver pod with LoggingPodStatusWatcher

Encapsulates and validates arguments passed to the submission client:
case class ClientArguments(
mainAppResource: MainAppResource,
mainClass: String,
driverArgs: Array[String]
)

Companion Object:
object ClientArguments {
def fromCommandLineArgs(args: Array[String]): ClientArguments
}

Supported Resource Types:
// Java/Scala applications
JavaMainAppResource(primaryResource: Option[String])
// Python applications
PythonMainAppResource(primaryResource: String)
// R applications
RMainAppResource(primaryResource: String)

Usage Example:
// Parsing spark-submit arguments
val args = Array(
"--class", "org.apache.spark.examples.SparkPi",
"--conf", "spark.kubernetes.container.image=spark:latest",
"local:///opt/spark/examples/jars/spark-examples.jar",
"1000"
)
val clientArgs = ClientArguments.fromCommandLineArgs(args)
// Results in:
// - mainClass: "org.apache.spark.examples.SparkPi"
// - mainAppResource: JavaMainAppResource(Some("local:///opt/spark/examples/jars/spark-examples.jar"))
// - driverArgs: Array("1000")

Manages the actual submission process and lifecycle of the Spark application:
class Client(
conf: KubernetesDriverConf,
builder: KubernetesDriverBuilder,
kubernetesClient: KubernetesClient,
watcher: LoggingPodStatusWatcher
)

Key Methods:

def run(): Unit

Submission Process:

1. Uses KubernetesDriverBuilder to create the complete driver pod specification
2. Creates the driver pod and its supporting Kubernetes resources through the Kubernetes API
3. Attaches owner references so supporting resources are garbage-collected with the driver pod
4. Waits for the application to complete via the status watcher (see the sketch below)
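A condensed sketch of what run() does, pieced together from the components described in this section; it is not a verbatim excerpt of Spark's Client, and the Kubernetes client calls are illustrative:

def run(): Unit = {
  // 1. Build the full driver specification (pod, extra resources, system properties)
  val driverSpec: KubernetesDriverSpec = builder.buildFromFeatures(conf, kubernetesClient)

  // 2. Create the driver pod through the Kubernetes API
  val driverPod = kubernetesClient.pods().create(driverSpec.pod.pod)

  // 3. Create supporting resources (ConfigMaps, services, ...); in the real client these
  //    receive an owner reference to driverPod so they are cleaned up with it
  driverSpec.driverKubernetesResources.foreach { resource =>
    kubernetesClient.resource(resource).createOrReplace()
  }

  // 4. Wait for the driver pod to reach a terminal state, logging status along the way
  watcher.watchUntilCompletion()
}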
Sealed trait representing different types of application resources:

sealed trait MainAppResource
// Base for non-JVM languages
sealed trait NonJVMResource extends MainAppResource
// JVM-based applications (Java/Scala)
case class JavaMainAppResource(primaryResource: Option[String]) extends MainAppResource
// Python applications
case class PythonMainAppResource(primaryResource: String) extends NonJVMResource
// R applications
case class RMainAppResource(primaryResource: String) extends NonJVMResource

Resource Resolution:

- local:// URLs reference files already present inside the container image
- http://, https://, and s3a:// URLs are downloaded at runtime (see the sketch below)
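A hypothetical helper (not part of Spark) that captures the resolution rule above:

// Illustrative only: local:// URIs become in-container paths; every other scheme is
// left untouched and fetched by the driver at startup.
def resolvePrimaryResource(uri: String): String =
  if (uri.startsWith("local://")) {
    uri.stripPrefix("local://") // e.g. /opt/spark/examples/jars/spark-examples.jar
  } else {
    uri // http://, https://, s3a://, ... downloaded at runtime
  }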
Constructs complete driver pod specifications using the feature step pattern:

class KubernetesDriverBuilder {
def buildFromFeatures(
conf: KubernetesDriverConf,
client: KubernetesClient
): KubernetesDriverSpec
}

Feature Step Integration:
// Automatic feature step selection based on configuration
val featureSteps: Seq[KubernetesFeatureConfigStep] = Seq(
new BasicDriverFeatureStep(conf),
new DriverServiceFeatureStep(conf),
new DriverCommandFeatureStep(conf),
new DriverKubernetesCredentialsFeatureStep(conf),
new MountSecretsFeatureStep(conf),
new EnvSecretsFeatureStep(conf),
new MountVolumesFeatureStep(conf),
new LocalDirsFeatureStep(conf)
)
// Apply all feature steps to build complete specification
val finalSpec = featureSteps.foldLeft(initialSpec) { (spec, step) =>
  spec.copy(
    pod = step.configurePod(spec.pod),
    driverKubernetesResources = spec.driverKubernetesResources ++ step.getAdditionalKubernetesResources(),
    systemProperties = spec.systemProperties ++ step.getAdditionalPodSystemProperties())
}
Complete specification for driver pod and associated resources:

case class KubernetesDriverSpec(
pod: SparkPod,
driverKubernetesResources: Seq[HasMetadata],
systemProperties: Map[String, String]
)

Components:

- pod: the driver SparkPod (pod definition plus main container) to create
- driverKubernetesResources: additional Kubernetes resources (for example ConfigMaps and services) created alongside the driver pod
- systemProperties: system properties passed through to the driver

The end-to-end submission flow ties these pieces together:
// 1. Entry Point - spark-submit invokes KubernetesClientApplication
class KubernetesClientApplication extends SparkApplication {
def start(args: Array[String], conf: SparkConf): Unit = {
// 2. Parse command-line arguments
val clientArgs = ClientArguments.fromCommandLineArgs(args)
// 3. Create driver configuration
val appName = conf.get("spark.app.name", "spark")
val kubernetesConf = KubernetesConf.createDriverConf(
conf,
appName = appName,
appResourceNamePrefix = s"$appName-${UUID.randomUUID().toString.take(8)}",
appId = s"spark-${UUID.randomUUID().toString.replaceAll("-", "")}",
mainAppResource = clientArgs.mainAppResource,
mainClass = clientArgs.mainClass,
appArgs = clientArgs.driverArgs
)
// 4. Create Kubernetes client
val kubernetesClient = SparkKubernetesClientFactory.createKubernetesClient(
ClientType.Submission,
kubernetesConf
)
// 5. Build driver specification
val builder = new KubernetesDriverBuilder()
val driverSpec = builder.buildFromFeatures(kubernetesConf, kubernetesClient)
// 6. Create status watcher
val watcher = new LoggingPodStatusWatcherImpl(kubernetesConf, kubernetesClient, new SystemClock())
// 7. Submit application
val client = new Client(kubernetesConf, builder, kubernetesClient, watcher)
client.run()
}
}

Monitors application status and provides real-time feedback:
trait LoggingPodStatusWatcher {
def watchUntilCompletion(): Unit
def stop(): Unit
}
class LoggingPodStatusWatcherImpl(
conf: KubernetesDriverConf,
kubernetesClient: KubernetesClient,
clock: Clock
) extends LoggingPodStatusWatcher

Monitoring Capabilities:

- Logs driver pod state transitions (pending, running, succeeded, failed) as they are reported by the API server
- Surfaces container status and error details when the driver pod fails to start
- Blocks the submission client until the application finishes when spark.kubernetes.submission.waitAppCompletion is enabled (see the sketch below)
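As an illustration of the kind of summary it logs on each update, a hypothetical helper (not Spark's actual code):

import io.fabric8.kubernetes.api.model.Pod
import scala.collection.JavaConverters._

// Summarize the driver pod's phase and container readiness for log output
def describeDriverPod(pod: Pod): String = {
  val status = Option(pod.getStatus)
  val phase = status.flatMap(s => Option(s.getPhase)).getOrElse("unknown")
  val containers = status
    .map(_.getContainerStatuses.asScala.toSeq)
    .getOrElse(Seq.empty)
    .map(c => s"${c.getName}(ready=${c.getReady})")
  s"Driver pod phase=$phase, containers=[${containers.mkString(", ")}]"
}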
Example spark-submit invocations:

# Basic submission
spark-submit \
--master k8s://https://kubernetes.example.com:443 \
--deploy-mode cluster \
--name spark-pi \
--class org.apache.spark.examples.SparkPi \
--conf spark.kubernetes.container.image=spark:latest \
local:///opt/spark/examples/jars/spark-examples.jar
# Python application
spark-submit \
--master k8s://https://kubernetes.example.com:443 \
--deploy-mode cluster \
--name pyspark-job \
--conf spark.kubernetes.container.image=spark-py:latest \
local:///opt/spark/examples/src/main/python/pi.py
# With additional configuration
spark-submit \
--master k8s://https://kubernetes.example.com:443 \
--deploy-mode cluster \
--name my-spark-app \
--class com.example.MyApp \
--conf spark.kubernetes.container.image=my-org/spark:v3.0.1 \
--conf spark.kubernetes.namespace=production \
--conf spark.driver.memory=2g \
--conf spark.executor.instances=4 \
--conf spark.executor.memory=4g \
--conf spark.executor.cores=2 \
s3a://my-bucket/jars/my-app.jar \
--input s3a://my-bucket/data/ \
--output s3a://my-bucket/results/

spark-submit options are automatically mapped to Spark configuration properties:
// spark-submit option -> Spark configuration property
--name -> spark.app.name
--class -> main class handed to the driver (ClientArguments.mainClass)
--driver-memory -> spark.driver.memory
--executor-memory -> spark.executor.memory
--executor-cores -> spark.executor.cores
--num-executors -> spark.executor.instances
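The same settings can also be supplied programmatically on a SparkConf instead of as spark-submit flags (values are illustrative):

import org.apache.spark.SparkConf

val submitConf = new SparkConf()
  .setAppName("my-spark-app")           // --name
  .set("spark.driver.memory", "2g")     // --driver-memory
  .set("spark.executor.memory", "4g")   // --executor-memory
  .set("spark.executor.cores", "2")     // --executor-cores
  .set("spark.executor.instances", "4") // --num-executors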
Comprehensive validation of submission parameters:

// Configuration validation
def validateSubmission(conf: KubernetesDriverConf): Unit = {
// Required configurations
require(conf.get(CONTAINER_IMAGE).nonEmpty,
"Container image must be specified")
require(conf.namespace.nonEmpty,
"Kubernetes namespace must be specified")
// Resource validation
val driverMemory = conf.get(KUBERNETES_DRIVER_MEMORY)
require(driverMemory > 0, "Driver memory must be positive")
// Image pull policy validation
val pullPolicy = conf.imagePullPolicy
require(Set("Always", "Never", "IfNotPresent").contains(pullPolicy),
s"Invalid image pull policy: $pullPolicy")
}

Common submission failure modes, modeled here as illustrative case classes (not Spark classes):

// Image pull failures
case class ImagePullError(imageName: String, reason: String)
// Insufficient resources
case class InsufficientResourcesError(requested: String, available: String)
// RBAC permission issues
case class PermissionError(operation: String, resource: String)
// Network connectivity issues
case class ApiServerConnectionError(endpoint: String, cause: Throwable)

// Collect diagnostic information for troubleshooting
def collectDiagnostics(conf: KubernetesDriverConf, kubernetesClient: KubernetesClient): Map[String, Any] = Map(
"kubernetesVersion" -> kubernetesClient.getVersion.getGitVersion,
"namespace" -> conf.namespace,
"serviceAccount" -> conf.get(KUBERNETES_SERVICE_ACCOUNT_NAME),
"containerImage" -> conf.get(CONTAINER_IMAGE),
"driverPodName" -> conf.resourceNamePrefix + "-driver",
"submissionTime" -> System.currentTimeMillis(),
"sparkVersion" -> org.apache.spark.SPARK_VERSION
)

// Advanced driver pod customization
spark.conf.set("spark.kubernetes.driver.podTemplateFile", "/templates/driver.yaml")
spark.conf.set("spark.kubernetes.driver.annotation.prometheus.io/scrape", "true")
spark.conf.set("spark.kubernetes.driver.label.version", "v1.2.3")
// Resource limits and requests
spark.conf.set("spark.kubernetes.driver.limit.cores", "2")
spark.conf.set("spark.kubernetes.driver.request.cores", "1")
spark.conf.set("spark.kubernetes.driver.memory", "2g")
spark.conf.set("spark.kubernetes.driver.memoryOverhead", "512m")// Remote dependency resolution
spark.conf.set("spark.jars", "s3a://my-bucket/jars/dependency1.jar,s3a://my-bucket/jars/dependency2.jar")
spark.conf.set("spark.files", "s3a://my-bucket/conf/app.properties")
// Python package dependencies
spark.conf.set("spark.kubernetes.pyspark.pythonVersion", "3.8")
spark.conf.set("spark.submit.pyFiles", "s3a://my-bucket/python/modules.zip")// Service account and RBAC
spark.conf.set("spark.kubernetes.authenticate.driver.serviceAccountName", "spark-driver-sa")
// Secret mounting
spark.conf.set("spark.kubernetes.driver.secrets.mysecret", "/opt/spark/secrets")
spark.conf.set("spark.kubernetes.executor.secrets.mysecret", "/opt/spark/secrets")
// Environment variable secrets
spark.conf.set("spark.kubernetes.driver.secretKeyRef.DB_PASSWORD", "db-secret:password")// Enable driver log collection
spark.conf.set("spark.kubernetes.driver.log.persistentVolumeClaim.claimName", "driver-logs")
spark.conf.set("spark.kubernetes.driver.log.persistentVolumeClaim.mountPath", "/opt/spark/logs")
// Metrics configuration
spark.conf.set("spark.sql.streaming.metricsEnabled", "true")
spark.conf.set("spark.kubernetes.driver.annotation.prometheus.io/scrape", "true")
spark.conf.set("spark.kubernetes.driver.annotation.prometheus.io/port", "4040")// Container image best practices
// 1. Use multi-stage builds for smaller images
// 2. Include all dependencies in the image
// 3. Set appropriate USER directive
// 4. Use specific version tags
// Dockerfile example
FROM spark:3.0.1 AS base
USER spark
COPY target/my-app.jar /opt/spark/jars/

// Right-sizing resources
val conf = new SparkConf()
// Driver sizing for application coordination
.set("spark.kubernetes.driver.memory", "2g")
.set("spark.kubernetes.driver.limit.cores", "1")
// Executor sizing for parallel processing
.set("spark.kubernetes.executor.memory", "4g")
.set("spark.kubernetes.executor.cores", "2")
.set("spark.kubernetes.executor.instances", "10")
// Memory overhead for Kubernetes pod management
.set("spark.kubernetes.memoryOverheadFactor", "0.1")// Service and networking
spark.conf.set("spark.kubernetes.driver.pod.name", "my-app-driver")
spark.conf.set("spark.kubernetes.driver.service.type", "ClusterIP")
// Port configuration for external access
spark.conf.set("spark.kubernetes.driver.port", "7077")
spark.conf.set("spark.kubernetes.driver.blockManager.port", "7078")The application submission layer provides a seamless bridge between Spark's familiar submission interface and Kubernetes-native deployment, enabling developers to leverage Kubernetes benefits without changing their existing Spark workflows.