Application Submission

The application submission layer integrates with Spark's spark-submit tool, so Spark applications can be deployed to Kubernetes clusters through the same interfaces used with any other cluster manager.

Core Components

KubernetesClientApplication { .api }

The main entry point for application submission that integrates with Spark's application framework:

class KubernetesClientApplication extends SparkApplication {
  def start(args: Array[String], conf: SparkConf): Unit
}

Integration Points:

  • Automatically invoked by spark-submit when using --master k8s:// and --deploy-mode cluster
  • Parses command-line arguments and converts them to Kubernetes-specific configuration
  • Orchestrates the complete application submission workflow

Workflow Overview:

  1. Parse command-line arguments using ClientArguments
  2. Create KubernetesDriverConf from Spark configuration
  3. Build driver pod specification using KubernetesDriverBuilder
  4. Submit driver pod to Kubernetes using Client
  5. Monitor application status using LoggingPodStatusWatcher

ClientArguments { .api }

Encapsulates and validates arguments passed to the submission client:

case class ClientArguments(
  mainAppResource: MainAppResource,
  mainClass: String,
  driverArgs: Array[String]
)

Companion Object:

object ClientArguments {
  def fromCommandLineArgs(args: Array[String]): ClientArguments
}

Supported Resource Types:

// Java/Scala applications
JavaMainAppResource(primaryResource: Option[String])

// Python applications  
PythonMainAppResource(primaryResource: String)

// R applications
RMainAppResource(primaryResource: String)

Usage Example:

// spark-submit converts its user-facing options into child arguments
// ("--main-class", "--primary-java-resource", "--arg", ...) before
// invoking the submission client; those child arguments are what
// ClientArguments parses
val args = Array(
  "--main-class", "org.apache.spark.examples.SparkPi",
  "--primary-java-resource", "local:///opt/spark/examples/jars/spark-examples.jar",
  "--arg", "1000"
)

val clientArgs = ClientArguments.fromCommandLineArgs(args)
// Results in:
// - mainClass: "org.apache.spark.examples.SparkPi"
// - mainAppResource: JavaMainAppResource(Some("local:///opt/spark/examples/jars/spark-examples.jar"))
// - driverArgs: Array("1000")

Client { .api }

Manages the actual submission process and lifecycle of the Spark application:

class Client(
  conf: KubernetesDriverConf,
  builder: KubernetesDriverBuilder,
  kubernetesClient: KubernetesClient,
  watcher: LoggingPodStatusWatcher
)

Key Methods:

def run(): Unit

Submission Process:

  1. Driver Spec Building: Uses KubernetesDriverBuilder to create complete pod specification
  2. Resource Creation: Submits driver pod and associated Kubernetes resources
  3. Status Monitoring: Watches pod status and logs application progress
  4. Cleanup: Handles cleanup of resources on completion or failure
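
run() executes these four stages in order. A simplified sketch of that flow, assuming the Fabric8 client API and the field names shown above (the real implementation also ships driver system properties to the pod, omitted here):

import scala.util.control.NonFatal

def run(): Unit = {
  // 1. Build the complete driver pod specification via the feature steps
  val driverSpec = builder.buildFromFeatures(conf, kubernetesClient)

  // 2. Create the driver pod first, then the resources it owns
  val driverPod = kubernetesClient.pods().create(driverSpec.pod.pod)
  try {
    kubernetesClient.resourceList(driverSpec.driverKubernetesResources: _*).createOrReplace()

    // 3. Block until the watcher reports a terminal application state
    watcher.watchUntilCompletion()
  } catch {
    // 4. Roll back the driver pod if submission fails partway through
    case NonFatal(e) =>
      kubernetesClient.pods().delete(driverPod)
      throw e
  }
}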

Application Resource Types

MainAppResource Hierarchy { .api }

Sealed trait representing different types of application resources:

sealed trait MainAppResource

// Base for non-JVM languages
sealed trait NonJVMResource extends MainAppResource

// JVM-based applications (Java/Scala)
case class JavaMainAppResource(primaryResource: Option[String]) extends MainAppResource

// Python applications
case class PythonMainAppResource(primaryResource: String) extends NonJVMResource

// R applications  
case class RMainAppResource(primaryResource: String) extends NonJVMResource

Resource Resolution:

  • Local Resources: local:// URLs reference files within the container image
  • Remote Resources: http://, https://, s3a:// URLs are downloaded at runtime
  • No Resource: For applications bundled directly in the container image
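
A sketch of how a primary resource string could map onto these types (a hypothetical helper for illustration only; in practice the choice is driven by the application type spark-submit detects, not just the file extension):

def toMainAppResource(resource: String): MainAppResource = resource match {
  case r if r.endsWith(".py") => PythonMainAppResource(r)   // e.g. local:///opt/app/main.py
  case r if r.endsWith(".R")  => RMainAppResource(r)        // e.g. s3a://bucket/job.R
  case ""                     => JavaMainAppResource(None)  // class bundled in the container image
  case r                      => JavaMainAppResource(Some(r))
}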

Driver Builder System

KubernetesDriverBuilder { .api }

Constructs complete driver pod specifications using the feature step pattern:

class KubernetesDriverBuilder {
  def buildFromFeatures(
    conf: KubernetesDriverConf, 
    client: KubernetesClient
  ): KubernetesDriverSpec
}

Feature Step Integration:

// Automatic feature step selection based on configuration
val featureSteps: Seq[KubernetesFeatureConfigStep] = Seq(
  new BasicDriverFeatureStep(conf),
  new DriverServiceFeatureStep(conf),
  new DriverCommandFeatureStep(conf),
  new DriverKubernetesCredentialsFeatureStep(conf),
  new MountSecretsFeatureStep(conf),
  new EnvSecretsFeatureStep(conf),
  new MountVolumesFeatureStep(conf),
  new LocalDirsFeatureStep(conf)
)

// Apply all feature steps to build the complete specification; each step
// returns a reconfigured pod and may contribute extra Kubernetes resources
// and system properties, so the fold must rebuild the whole spec
val finalSpec = featureSteps.foldLeft(initialSpec) { (spec, step) =>
  spec.copy(
    pod = step.configurePod(spec.pod),
    driverKubernetesResources =
      spec.driverKubernetesResources ++ step.getAdditionalKubernetesResources(),
    systemProperties = spec.systemProperties ++ step.getAdditionalPodSystemProperties()
  )
}

KubernetesDriverSpec { .api }

Complete specification for driver pod and associated resources:

case class KubernetesDriverSpec(
  pod: SparkPod,
  driverKubernetesResources: Seq[HasMetadata],
  systemProperties: Map[String, String]
)

Components:

  • Pod: Complete driver pod specification with container, volumes, and metadata
  • Resources: Additional Kubernetes resources (Services, ConfigMaps, Secrets)
  • System Properties: JVM system properties to be passed to the driver
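
For orientation, a minimal value of this type, as the builder's fold above might start from (assuming SparkPod.initialPod() yields an empty pod-and-container pair):

val initialSpec = KubernetesDriverSpec(
  pod = SparkPod.initialPod(),            // empty pod plus empty driver container
  driverKubernetesResources = Seq.empty,  // feature steps append Services, ConfigMaps, Secrets
  systemProperties = Map.empty            // feature steps append driver JVM properties
)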

Submission Workflow

Complete Submission Process

import java.util.UUID
import org.apache.spark.util.SystemClock

// 1. Entry Point - spark-submit invokes KubernetesClientApplication
class KubernetesClientApplication extends SparkApplication {
  def start(args: Array[String], conf: SparkConf): Unit = {

    // 2. Parse command-line arguments
    val clientArgs = ClientArguments.fromCommandLineArgs(args)

    // 3. Create driver configuration
    val appName = conf.get("spark.app.name", "spark")
    val kubernetesConf = KubernetesConf.createDriverConf(
      conf,
      appName = appName,
      appResourceNamePrefix = s"$appName-${UUID.randomUUID().toString.take(8)}",
      appId = s"spark-${UUID.randomUUID().toString.replaceAll("-", "")}",
      mainAppResource = clientArgs.mainAppResource,
      mainClass = clientArgs.mainClass,
      appArgs = clientArgs.driverArgs
    )

    // 4. Create Kubernetes client
    val kubernetesClient = SparkKubernetesClientFactory.createKubernetesClient(
      ClientType.Submission,
      kubernetesConf
    )

    // 5. Create the driver spec builder (Client invokes it during run())
    val builder = new KubernetesDriverBuilder()

    // 6. Create status watcher (constructor shown under Status Monitoring below)
    val watcher = new LoggingPodStatusWatcherImpl(kubernetesConf, kubernetesClient, new SystemClock())

    // 7. Submit application
    val client = new Client(kubernetesConf, builder, kubernetesClient, watcher)
    client.run()
  }
}

Status Monitoring

LoggingPodStatusWatcher { .api }

Monitors application status and provides real-time feedback:

trait LoggingPodStatusWatcher {
  def watchUntilCompletion(): Unit
  def stop(): Unit
}

class LoggingPodStatusWatcherImpl(
  conf: KubernetesDriverConf,
  kubernetesClient: KubernetesClient,
  clock: Clock
) extends LoggingPodStatusWatcher

Monitoring Capabilities:

  • Real-time pod status updates
  • Driver log streaming
  • Application completion detection
  • Error condition reporting
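
An assumed wiring for how Client attaches the watcher to the driver pod via the Fabric8 watch API (this presumes the watcher also implements Fabric8's Watcher[Pod]; Utils.tryWithResource is Spark's loan-pattern helper, and driverPodName stands for the submitted pod's name):

Utils.tryWithResource(
  kubernetesClient
    .pods()
    .withName(driverPodName) // driverPodName: name of the submitted driver pod
    .watch(watcher)) { _ =>
  // The watch feeds pod status updates to the watcher; block until completion
  watcher.watchUntilCompletion()
}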

spark-submit Integration

Command Line Usage

# Basic submission
spark-submit \
  --master k8s://https://kubernetes.example.com:443 \
  --deploy-mode cluster \
  --name spark-pi \
  --class org.apache.spark.examples.SparkPi \
  --conf spark.kubernetes.container.image=spark:latest \
  local:///opt/spark/examples/jars/spark-examples.jar

# Python application
spark-submit \
  --master k8s://https://kubernetes.example.com:443 \
  --deploy-mode cluster \
  --name pyspark-job \
  --conf spark.kubernetes.container.image=spark-py:latest \
  local:///opt/spark/examples/src/main/python/pi.py

# With additional configuration
spark-submit \
  --master k8s://https://kubernetes.example.com:443 \
  --deploy-mode cluster \
  --name my-spark-app \
  --class com.example.MyApp \
  --conf spark.kubernetes.container.image=my-org/spark:v3.0.1 \
  --conf spark.kubernetes.namespace=production \
  --conf spark.driver.memory=2g \
  --conf spark.executor.instances=4 \
  --conf spark.executor.memory=4g \
  --conf spark.executor.cores=2 \
  s3a://my-bucket/jars/my-app.jar \
  --input s3a://my-bucket/data/ \
  --output s3a://my-bucket/results/

Configuration Mapping

spark-submit options are automatically mapped to standard Spark configuration keys, which the Kubernetes backend reads at submission time:

// spark-submit option -> Spark configuration key
--name -> spark.app.name
--driver-memory -> spark.driver.memory
--executor-memory -> spark.executor.memory
--executor-cores -> spark.executor.cores
--num-executors -> spark.executor.instances

// --class does not map to a configuration key; it is forwarded to the
// submission client as the main class (see ClientArguments above)
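
The same settings can be applied programmatically when building the SparkConf that spark-submit would otherwise populate:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("my-spark-app")            // --name
  .set("spark.driver.memory", "2g")      // --driver-memory
  .set("spark.executor.memory", "4g")    // --executor-memory
  .set("spark.executor.cores", "2")      // --executor-cores
  .set("spark.executor.instances", "4")  // --num-executors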

Error Handling and Diagnostics

Submission Validation

Submission parameters are validated before any resources are created on the cluster; a representative sketch of such checks:

// Configuration validation
def validateSubmission(conf: KubernetesDriverConf): Unit = {
  // Required configurations
  require(conf.get(CONTAINER_IMAGE).nonEmpty, 
    "Container image must be specified")
  require(conf.namespace.nonEmpty, 
    "Kubernetes namespace must be specified")
  
  // Resource validation
  val driverMemory = conf.get(KUBERNETES_DRIVER_MEMORY)
  require(driverMemory > 0, "Driver memory must be positive")
  
  // Image pull policy validation
  val pullPolicy = conf.imagePullPolicy
  require(Set("Always", "Never", "IfNotPresent").contains(pullPolicy),
    s"Invalid image pull policy: $pullPolicy")
}
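
Calling such a check up front turns misconfiguration into an immediate, readable failure rather than a driver pod stuck in an error state (validateSubmission is the illustrative helper above):

import org.apache.spark.SparkException

// Fail fast before any cluster resources are created
try {
  validateSubmission(kubernetesConf)
} catch {
  case e: IllegalArgumentException =>
    // require() failures surface as IllegalArgumentException
    throw new SparkException(s"Invalid Kubernetes submission: ${e.getMessage}", e)
}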

Common Error Scenarios

// Image pull failures
case class ImagePullError(imageName: String, reason: String)

// Insufficient resources
case class InsufficientResourcesError(requested: String, available: String)

// RBAC permission issues
case class PermissionError(operation: String, resource: String)

// Network connectivity issues  
case class ApiServerConnectionError(endpoint: String, cause: Throwable)
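
A hypothetical reporter over these scenarios, showing the kind of actionable message each should carry (the case classes above share no parent type, so this matches structurally):

def describe(error: Any): String = error match {
  case ImagePullError(image, reason) =>
    s"Cannot pull image '$image': $reason (check registry access and the tag)"
  case InsufficientResourcesError(requested, available) =>
    s"Requested $requested but only $available is schedulable"
  case PermissionError(operation, resource) =>
    s"RBAC denied '$operation' on '$resource' for the submission service account"
  case ApiServerConnectionError(endpoint, cause) =>
    s"Cannot reach the API server at $endpoint: ${cause.getMessage}"
  case other =>
    s"Unexpected submission error: $other"
}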

Diagnostic Information

// Collect diagnostic information for troubleshooting
def collectDiagnostics(conf: KubernetesDriverConf): Map[String, Any] = Map(
  "kubernetesVersion" -> kubernetesClient.getVersion.getGitVersion,
  "namespace" -> conf.namespace,
  "serviceAccount" -> conf.get(KUBERNETES_SERVICE_ACCOUNT_NAME),
  "containerImage" -> conf.get(CONTAINER_IMAGE),
  "driverPodName" -> conf.resourceNamePrefix + "-driver",
  "submissionTime" -> System.currentTimeMillis(),
  "sparkVersion" -> org.apache.spark.SPARK_VERSION
)

Advanced Submission Features

Custom Driver Configuration

// Advanced driver pod customization
spark.conf.set("spark.kubernetes.driver.podTemplateFile", "/templates/driver.yaml")
spark.conf.set("spark.kubernetes.driver.annotation.prometheus.io/scrape", "true")
spark.conf.set("spark.kubernetes.driver.label.version", "v1.2.3")

// Resource limits and requests
spark.conf.set("spark.kubernetes.driver.limit.cores", "2")
spark.conf.set("spark.kubernetes.driver.request.cores", "1") 
spark.conf.set("spark.kubernetes.driver.memory", "2g")
spark.conf.set("spark.kubernetes.driver.memoryOverhead", "512m")

Dependency Management

// Remote dependency resolution
spark.conf.set("spark.jars", "s3a://my-bucket/jars/dependency1.jar,s3a://my-bucket/jars/dependency2.jar")
spark.conf.set("spark.files", "s3a://my-bucket/conf/app.properties")

// Python package dependencies
spark.conf.set("spark.kubernetes.pyspark.pythonVersion", "3.8")
spark.conf.set("spark.submit.pyFiles", "s3a://my-bucket/python/modules.zip")

Security Configuration

// Service account and RBAC
spark.conf.set("spark.kubernetes.authenticate.driver.serviceAccountName", "spark-driver-sa")

// Secret mounting
spark.conf.set("spark.kubernetes.driver.secrets.mysecret", "/opt/spark/secrets")
spark.conf.set("spark.kubernetes.executor.secrets.mysecret", "/opt/spark/secrets")

// Environment variable secrets
spark.conf.set("spark.kubernetes.driver.secretKeyRef.DB_PASSWORD", "db-secret:password")

Monitoring and Logging

// Persist driver logs by mounting a PersistentVolumeClaim ("logs" is the volume name)
spark.conf.set("spark.kubernetes.driver.volumes.persistentVolumeClaim.logs.options.claimName", "driver-logs")
spark.conf.set("spark.kubernetes.driver.volumes.persistentVolumeClaim.logs.mount.path", "/opt/spark/logs")

// Metrics configuration
spark.conf.set("spark.sql.streaming.metricsEnabled", "true")
spark.conf.set("spark.kubernetes.driver.annotation.prometheus.io/scrape", "true")
spark.conf.set("spark.kubernetes.driver.annotation.prometheus.io/port", "4040")

Best Practices

Application Packaging

# Container image best practices:
# 1. Use multi-stage builds for smaller images
# 2. Include all dependencies in the image
# 3. Set an appropriate USER directive
# 4. Use specific version tags

# Dockerfile example
FROM spark:3.0.1
COPY target/my-app.jar /opt/spark/jars/
USER spark

Resource Planning

// Right-sizing resources
val conf = new SparkConf()
  // Driver sizing for application coordination
  .set("spark.driver.memory", "2g")
  .set("spark.kubernetes.driver.limit.cores", "1")
  // Executor sizing for parallel processing
  .set("spark.executor.memory", "4g")
  .set("spark.executor.cores", "2")
  .set("spark.executor.instances", "10")
  // Memory overhead for the pod beyond the JVM heap
  .set("spark.kubernetes.memoryOverheadFactor", "0.1")

Network Configuration

// Driver pod naming
spark.conf.set("spark.kubernetes.driver.pod.name", "my-app-driver")

// Port configuration; DriverServiceFeatureStep creates a headless
// ClusterIP service exposing these ports to the executors
spark.conf.set("spark.driver.port", "7077")
spark.conf.set("spark.driver.blockManager.port", "7078")

The application submission layer provides a seamless bridge between Spark's familiar submission interface and Kubernetes-native deployment, enabling developers to leverage Kubernetes benefits without changing their existing Spark workflows.