CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-org-apache-spark--spark-parent-2-12

Apache Spark - Unified analytics engine for large-scale data processing

Pending
Overview
Eval results
Files

docs/deployment.md

Configuration and Deployment

Programmatic interfaces for launching, monitoring, and managing Spark applications across different cluster managers and deployment modes. Includes configuration management, application launchers, and cluster-specific deployment utilities.

Capabilities

Application Configuration

Configuration management for Spark applications with support for various deployment scenarios.

/**
 * Configuration for a Spark application: key/value string settings used to
 * parameterize the runtime. All setters return this SparkConf so calls chain.
 *
 * @param loadDefaults whether to load values from JVM system properties on creation
 */
class SparkConf(loadDefaults: Boolean = true) {
  /** Set configuration property */
  def set(key: String, value: String): SparkConf
  /** Set master URL */
  def setMaster(master: String): SparkConf
  /** Set application name */
  def setAppName(name: String): SparkConf
  /** Set JAR files to distribute */
  def setJars(jars: Seq[String]): SparkConf
  /** Set a single executor environment variable */
  def setExecutorEnv(variable: String, value: String): SparkConf
  /** Set multiple executor environment variables from a sequence of (name, value) pairs */
  def setExecutorEnv(variables: Seq[(String, String)]): SparkConf
  /** Set Spark home directory */
  def setSparkHome(home: String): SparkConf
  /** Set all properties from iterable */
  def setAll(settings: Iterable[(String, String)]): SparkConf
  /** Set multiple executor environment variables from an array of (name, value) pairs */
  def setExecutorEnv(variables: Array[(String, String)]): SparkConf

  /** Get configuration value */
  def get(key: String): String
  /** Get configuration value with default */
  def get(key: String, defaultValue: String): String
  /** Get all configuration settings */
  def getAll: Array[(String, String)]
  /** Get boolean configuration value */
  def getBoolean(key: String, defaultValue: Boolean): Boolean
  /** Get integer configuration value */
  def getInt(key: String, defaultValue: Int): Int
  /** Get long configuration value */
  def getLong(key: String, defaultValue: Long): Long
  /** Get double configuration value */
  def getDouble(key: String, defaultValue: Double): Double
  /** Get size configuration value in bytes (accepts suffixed strings such as "1g") */
  def getSizeAsBytes(key: String, defaultValue: String): Long
  /** Get size configuration value in KB */
  def getSizeAsKb(key: String, defaultValue: String): Long
  /** Get size configuration value in MB */
  def getSizeAsMb(key: String, defaultValue: String): Long
  /** Get size configuration value in GB */
  def getSizeAsGb(key: String, defaultValue: String): Long
  /** Get time configuration value in milliseconds */
  def getTimeAsMs(key: String, defaultValue: String): Long
  /** Get time configuration value in seconds */
  def getTimeAsSeconds(key: String, defaultValue: String): Long

  /** Remove configuration property */
  def remove(key: String): SparkConf
  /** Check if configuration contains key */
  def contains(key: String): Boolean
  /** Clone configuration */
  def clone(): SparkConf
  /** Render the full configuration as a human-readable string for logging/debugging */
  def toDebugString: String
}

Usage Examples:

import org.apache.spark.SparkConf

// Basic configuration — each setter returns the SparkConf, so calls chain
val conf = new SparkConf()
  .setAppName("MySparkApplication")
  .setMaster("local[4]")
  .set("spark.executor.memory", "2g")
  .set("spark.executor.cores", "2")
  .set("spark.sql.adaptive.enabled", "true")
  .set("spark.sql.adaptive.coalescePartitions.enabled", "true")

// Environment variables visible to executor processes
conf.setExecutorEnv("JAVA_HOME", "/usr/lib/jvm/java-11")
conf.setExecutorEnv(Array(
  ("PYTHONPATH", "/path/to/python/libs"),
  ("HADOOP_CONF_DIR", "/etc/hadoop/conf")
))

// File distribution — JARs shipped to every executor
conf.setJars(Seq("hdfs://path/to/app.jar", "hdfs://path/to/lib.jar"))

// Different master configurations
val localConf = new SparkConf().setMaster("local[*]")  // All available cores
val clusterConf = new SparkConf().setMaster("spark://master:7077")  // Standalone
val yarnConf = new SparkConf().setMaster("yarn")  // YARN cluster
val k8sConf = new SparkConf().setMaster("k8s://https://kubernetes.example.com:443")  // Kubernetes

// Reading values back with typed getters and defaults
val memoryValue = conf.getSizeAsMb("spark.executor.memory", "1g")
val coresValue = conf.getInt("spark.executor.cores", 1)
val adaptiveEnabled = conf.getBoolean("spark.sql.adaptive.enabled", false)

Application Launcher (Java API)

Programmatic interface for launching Spark applications from Java applications.

/**
 * Programmatic interface for launching Spark applications from JVM code
 * (an alternative to invoking spark-submit from a shell). Setters return
 * this launcher so calls chain.
 */
public class SparkLauncher {
  /** Set application name */
  public SparkLauncher setAppName(String appName);
  /** Set master URL */
  public SparkLauncher setMaster(String master);
  /** Set main application resource (JAR file) */
  public SparkLauncher setAppResource(String resource);
  /** Set main class */
  public SparkLauncher setMainClass(String mainClass);
  /** Add application arguments */
  public SparkLauncher addAppArgs(String... args);
  /** Set Spark configuration */
  public SparkLauncher setConf(String key, String value);
  /** Set Spark home directory */
  public SparkLauncher setSparkHome(String sparkHome);
  /** Set properties file */
  public SparkLauncher setPropertiesFile(String path);
  /** Set deploy mode (client/cluster) */
  public SparkLauncher setDeployMode(String mode);
  /** Set verbose logging */
  public SparkLauncher setVerbose(boolean verbose);

  /** Launch and return the raw child Process; caller manages its streams and exit code */
  public Process launch() throws IOException;
  /** Launch and return a handle for monitoring/controlling the application */
  public SparkAppHandle startApplication() throws IOException;
  /** Launch and return a handle, registering the given state-change listeners up front */
  public SparkAppHandle startApplication(SparkAppHandle.Listener... listeners) throws IOException;
}

/**
 * Handle to monitor and control a Spark application launched through
 * {@code SparkLauncher.startApplication()}. The handle reports state
 * transitions to registered listeners and lets the caller kill the
 * application or detach from it.
 */
public interface SparkAppHandle {
  /** Get current application state */
  State getState();
  /** Get application ID (may be null before the application is submitted) */
  String getAppId();
  /** Kill the application */
  void kill();
  /** Disconnect from the application without stopping it */
  void disconnect();
  /** Add state change listener */
  void addListener(Listener listener);

  /** Application states */
  enum State {
    UNKNOWN, CONNECTED, SUBMITTED, RUNNING, FINISHED, FAILED, KILLED, LOST;

    /**
     * Whether this state is terminal (the application has exited).
     * Added so that the documented {@code handle.getState().isFinal()}
     * usage matches the declared API, mirroring the real launcher API.
     */
    public boolean isFinal() {
      return this == FINISHED || this == FAILED || this == KILLED || this == LOST;
    }
  }

  /** Listener for application state changes */
  interface Listener {
    /** Invoked whenever the application's state changes. */
    void stateChanged(SparkAppHandle handle);
    /** Invoked whenever other application info (e.g. the app ID) changes. */
    void infoChanged(SparkAppHandle handle);
  }
}

Usage Examples:

import org.apache.spark.launcher.*;

// Basic application launch — builder-style configuration
SparkLauncher launcher = new SparkLauncher()
    .setAppName("MyApp")
    .setMaster("local[2]")
    .setAppResource("path/to/my-app.jar")
    .setMainClass("com.example.MyMainClass")
    .addAppArgs("arg1", "arg2", "arg3");

// Launch and wait for the child process to exit
Process process = launcher.launch();
int exitCode = process.waitFor();

// Launch with monitoring via listener callbacks
SparkAppHandle handle = launcher.startApplication(new SparkAppHandle.Listener() {
    @Override
    public void stateChanged(SparkAppHandle handle) {
        System.out.println("State changed to: " + handle.getState());
        if (handle.getState().isFinal()) {
            System.out.println("Application finished with state: " + handle.getState());
        }
    }

    @Override
    public void infoChanged(SparkAppHandle handle) {
        System.out.println("App ID: " + handle.getAppId());
    }
});

// Poll until a terminal state; in production prefer the listener callbacks over polling
while (!handle.getState().isFinal()) {
    Thread.sleep(1000);
}

// Cluster mode launch: driver runs inside the cluster, not in this JVM
SparkLauncher clusterLauncher = new SparkLauncher()
    .setAppName("ClusterApp")
    .setMaster("yarn")
    .setDeployMode("cluster")
    .setAppResource("hdfs://path/to/app.jar")
    .setMainClass("com.example.ClusterApp")
    .setConf("spark.executor.instances", "10")
    .setConf("spark.executor.memory", "4g")
    .setConf("spark.executor.cores", "2");

SparkAppHandle clusterHandle = clusterLauncher.startApplication();

Runtime Configuration

Runtime configuration management for active Spark sessions.

/**
 * Runtime configuration interface for an active SparkSession.
 * NOTE(review): some properties are fixed at session startup and cannot be
 * changed here — use isModifiable to check before calling set.
 */
class RuntimeConfig {
  /** Set configuration value */
  def set(key: String, value: String): Unit
  def set(key: String, value: Boolean): Unit
  def set(key: String, value: Long): Unit
  /** Get configuration value */
  def get(key: String): String
  def get(key: String, default: String): String
  /** Get all configuration values */
  def getAll: Map[String, String]
  /** Unset configuration value */
  def unset(key: String): Unit
  /** Check whether the given key can be modified at runtime */
  def isModifiable(key: String): Boolean
}

Cluster Manager Integration

Support for different cluster managers and deployment modes.

/**
 * Well-known default network ports for the Spark standalone cluster manager.
 */
object StandaloneClusterManager {
  /** Port the standalone master listens on for submissions (spark://host:7077). */
  val DEFAULT_MASTER_PORT: Int = 7077
  /** Port a standalone worker listens on. */
  val DEFAULT_WORKER_PORT: Int = 7078
  /** Port serving the master's web UI. */
  val DEFAULT_MASTER_WEBUI_PORT: Int = 8080
}

/**
 * Deploy-mode identifiers for running Spark on Hadoop YARN.
 */
object YarnClusterManager {
  /** Driver runs in the submitting client process. */
  val CLIENT_MODE: String = "client"
  /** Driver runs inside a container managed by YARN. */
  val CLUSTER_MODE: String = "cluster"
}

/**
 * Default settings for Spark on Kubernetes.
 */
object KubernetesClusterManager {
  /** Namespace used when no explicit namespace is configured. */
  val DEFAULT_NAMESPACE: String = "default"
  /** Service account used by pods unless overridden. */
  val DEFAULT_SERVICE_ACCOUNT: String = "default"
}

Application Monitoring

Interfaces for monitoring application status and metrics.

/**
 * Low-overhead status tracker for monitoring a running Spark application's
 * jobs, stages, and executors.
 *
 * @param sc the SparkContext whose status is reported
 */
class SparkStatusTracker(sc: SparkContext) {
  /** Get info for all known executors */
  def getExecutorInfos: Array[SparkExecutorInfo]
  /** Get ids of currently active stages */
  def getActiveStageIds: Array[Int]
  /** Get ids of currently active jobs */
  def getActiveJobIds: Array[Int]
  /** Get stage info; None if the stage is unknown or has been garbage-collected */
  def getStageInfo(stageId: Int): Option[SparkStageInfo]
  /** Get job info; None if the job is unknown */
  def getJobInfo(jobId: Int): Option[SparkJobInfo]
  /** Get job ids belonging to the named job group */
  def getJobIdsForGroup(jobGroup: String): Array[Int]
}

/**
 * Snapshot of a single executor's identity and cumulative metrics.
 *
 * @param executorId        unique executor identifier
 * @param executorHost      host the executor runs on
 * @param totalCores        cores allocated to the executor
 * @param maxTasks          maximum concurrent tasks
 * @param activeTasks       tasks currently running
 * @param failedTasks       cumulative failed task count
 * @param completedTasks    cumulative completed task count
 * @param totalTasks        cumulative task count (all outcomes)
 * @param totalDuration     cumulative task time (presumably milliseconds — confirm)
 * @param totalGCTime       cumulative JVM GC time (presumably milliseconds — confirm)
 * @param totalInputBytes   cumulative input bytes read
 * @param totalShuffleRead  cumulative shuffle bytes read
 * @param totalShuffleWrite cumulative shuffle bytes written
 * @param isActive          whether the executor is still alive
 * @param maxMemory         maximum storage memory available, in bytes
 * @param addTime           when the executor registered
 * @param removeTime        when the executor was removed, if it was
 * @param removeReason      why the executor was removed, if known
 */
case class SparkExecutorInfo(
  executorId: String,
  executorHost: String,
  totalCores: Int,
  maxTasks: Int,
  activeTasks: Int,
  failedTasks: Int,
  completedTasks: Int,
  totalTasks: Int,
  totalDuration: Long,
  totalGCTime: Long,
  totalInputBytes: Long,
  totalShuffleRead: Long,
  totalShuffleWrite: Long,
  isActive: Boolean,
  maxMemory: Long,
  addTime: java.util.Date,
  removeTime: Option[java.util.Date],
  removeReason: Option[String]
)

Environment and System Information

Utilities for accessing environment and system information.

/**
 * Utilities for resolving files distributed with the application
 * (presumably files added via SparkContext.addFile — confirm).
 */
object SparkFiles {
  /** Get absolute local path of an added file, by its original filename */
  def get(filename: String): String
  /** Get the local root directory that added files are downloaded into */
  def getRootDirectory(): String
}

/**
 * Holder for a Spark instance's runtime services (serializers, shuffle,
 * block management, metrics). Doc stub — accessors only.
 * NOTE(review): cacheManager is a legacy component removed in later Spark
 * versions — confirm against the target Spark version.
 */
class SparkEnv {
  /** Get serializer used for data */
  def serializer: Serializer
  /** Get serializer used for closures */
  def closureSerializer: Serializer
  /** Get cache manager */
  def cacheManager: CacheManager
  /** Get map output tracker (shuffle map output locations) */
  def mapOutputTracker: MapOutputTracker
  /** Get shuffle manager */
  def shuffleManager: ShuffleManager
  /** Get broadcast manager */
  def broadcastManager: BroadcastManager
  /** Get block manager */
  def blockManager: BlockManager
  /** Get security manager */
  def securityManager: SecurityManager
  /** Get metrics system */
  def metricsSystem: MetricsSystem
  /** Get this JVM's executor ID ("driver" on the driver — confirm) */
  def executorId: String
}

Deployment Modes

Configuration for different deployment scenarios.

/**
 * Deploy modes: where the driver process runs relative to the cluster.
 */
object DeployMode extends Enumeration {
  type DeployMode = Value
  /** Driver runs in the submitting client process. */
  val CLIENT: Value = Value
  /** Driver runs on a node inside the cluster. */
  val CLUSTER: Value = Value
}

/**
 * Builders for the master URL strings accepted by SparkConf.setMaster.
 */
object MasterURLs {
  /** Local mode running `n` worker threads (default 1). */
  def local(n: Int = 1): String = "local[" + n + "]"
  /** Local mode using every available core. */
  def localAll: String = "local[*]"
  /** Standalone cluster master at the given host and port (default 7077). */
  def standalone(host: String, port: Int = 7077): String = "spark://" + host + ":" + port
  /** Hadoop YARN; resource manager location comes from the Hadoop configuration. */
  def yarn: String = "yarn"
  /** Mesos master at the given host and port (default 5050). */
  def mesos(host: String, port: Int = 5050): String = "mesos://" + host + ":" + port
  /** Kubernetes API server, e.g. k8s://https://host:port. */
  def kubernetes(apiServer: String): String = "k8s://" + apiServer
}

Usage Examples:

// Different deployment configurations
val localConf = new SparkConf()
  .setMaster(MasterURLs.localAll)
  .setAppName("LocalApp")

val standaloneConf = new SparkConf()
  .setMaster(MasterURLs.standalone("spark-master.example.com"))
  .setAppName("StandaloneApp")
  .set("spark.executor.instances", "4")
  .set("spark.executor.memory", "2g")
  .set("spark.executor.cores", "2")

val yarnConf = new SparkConf()
  .setMaster(MasterURLs.yarn)
  .setAppName("YarnApp")
  .set("spark.submit.deployMode", "cluster")
  .set("spark.executor.instances", "10")
  .set("spark.executor.memory", "4g")
  .set("spark.executor.cores", "3")
  .set("spark.yarn.queue", "production")

val k8sConf = new SparkConf()
  .setMaster(MasterURLs.kubernetes("https://k8s.example.com:443"))
  .setAppName("K8sApp")
  .set("spark.kubernetes.container.image", "spark:3.5.6")
  .set("spark.executor.instances", "5")
  .set("spark.kubernetes.namespace", "spark-jobs")
  .set("spark.kubernetes.executor.request.cores", "1")
  .set("spark.kubernetes.executor.limit.cores", "2")

// Application monitoring (`conf` is one of the SparkConf values built above)
val sc = new SparkContext(conf)
val statusTracker = sc.statusTracker

// Monitor executors
val executors = statusTracker.getExecutorInfos
executors.foreach { executor =>
  println(s"Executor ${executor.executorId}: ${executor.activeTasks} active tasks")
}

// Monitor jobs and stages
val activeJobs = statusTracker.getActiveJobIds
val activeStages = statusTracker.getActiveStageIds

// getJobInfo returns Option; foreach runs only when the job is known
activeJobs.foreach { jobId =>
  statusTracker.getJobInfo(jobId).foreach { jobInfo =>
    println(s"Job $jobId: ${jobInfo.status}")
  }
}

Configuration Properties

Key configuration properties for different aspects of Spark applications.

/**
 * Frequently used Spark configuration property keys, grouped by subsystem.
 */
object SparkConfigs {
  // Application properties
  val APP_NAME = "spark.app.name"
  val MASTER = "spark.master"
  val DEPLOY_MODE = "spark.submit.deployMode"
  val DRIVER_MEMORY = "spark.driver.memory"
  val DRIVER_CORES = "spark.driver.cores"
  val EXECUTOR_MEMORY = "spark.executor.memory"
  val EXECUTOR_CORES = "spark.executor.cores"
  val EXECUTOR_INSTANCES = "spark.executor.instances"

  // Dynamic allocation
  val DYNAMIC_ALLOCATION_ENABLED = "spark.dynamicAllocation.enabled"
  val DYNAMIC_ALLOCATION_MIN_EXECUTORS = "spark.dynamicAllocation.minExecutors"
  val DYNAMIC_ALLOCATION_MAX_EXECUTORS = "spark.dynamicAllocation.maxExecutors"
  val DYNAMIC_ALLOCATION_INITIAL_EXECUTORS = "spark.dynamicAllocation.initialExecutors"

  // Serialization
  val SERIALIZER = "spark.serializer"
  val KRYO_REGISTRATOR = "spark.kryo.registrator"
  val KRYO_UNSAFE = "spark.kryo.unsafe"

  // SQL properties
  val SQL_ADAPTIVE_ENABLED = "spark.sql.adaptive.enabled"
  val SQL_ADAPTIVE_COALESCE_PARTITIONS = "spark.sql.adaptive.coalescePartitions.enabled"
  val SQL_ADAPTIVE_SKEW_JOIN = "spark.sql.adaptive.skewJoin.enabled"
  val SQL_WAREHOUSE_DIR = "spark.sql.warehouse.dir"

  // Shuffle properties
  val SHUFFLE_COMPRESS = "spark.shuffle.compress"
  val SHUFFLE_SPILL_COMPRESS = "spark.shuffle.spill.compress"
  val SHUFFLE_SERVICE_ENABLED = "spark.shuffle.service.enabled"

  // Storage properties
  // NOTE(review): "spark.storage.level" is not a standard Spark conf key — verify intent.
  val STORAGE_LEVEL = "spark.storage.level"
  // NOTE(review): memoryFraction/safetyFraction are legacy (pre-1.6 static memory
  // manager) keys, superseded by spark.memory.fraction — confirm target Spark version.
  val STORAGE_MEMORY_FRACTION = "spark.storage.memoryFraction"
  val STORAGE_SAFETY_FRACTION = "spark.storage.safetyFraction"
}

Configuration Examples:

// Performance tuning configuration
val perfConf = new SparkConf()
  .setAppName("HighPerformanceApp")
  .set(SparkConfigs.SERIALIZER, "org.apache.spark.serializer.KryoSerializer")
  .set(SparkConfigs.SQL_ADAPTIVE_ENABLED, "true")
  .set(SparkConfigs.SQL_ADAPTIVE_COALESCE_PARTITIONS, "true")
  .set(SparkConfigs.SQL_ADAPTIVE_SKEW_JOIN, "true")
  .set(SparkConfigs.DYNAMIC_ALLOCATION_ENABLED, "true")
  .set(SparkConfigs.DYNAMIC_ALLOCATION_MIN_EXECUTORS, "2")
  .set(SparkConfigs.DYNAMIC_ALLOCATION_MAX_EXECUTORS, "20")
  .set(SparkConfigs.DYNAMIC_ALLOCATION_INITIAL_EXECUTORS, "5")

// Memory-intensive configuration
val memoryConf = new SparkConf()
  .setAppName("MemoryIntensiveApp")
  .set(SparkConfigs.EXECUTOR_MEMORY, "8g")
  .set(SparkConfigs.DRIVER_MEMORY, "4g")
  // NOTE(review): spark.storage.memoryFraction is a legacy (pre-1.6) key; modern
  // Spark uses spark.memory.fraction — confirm the target version before relying on it.
  .set(SparkConfigs.STORAGE_MEMORY_FRACTION, "0.8")
  // Fixed: the documented off-heap keys are spark.memory.offHeap.*, not
  // spark.executor.memoryOffHeap.* (which Spark would silently ignore).
  .set("spark.memory.offHeap.enabled", "true")
  .set("spark.memory.offHeap.size", "2g")

// Cluster-specific configurations
val yarnProdConf = new SparkConf()
  .setAppName("ProductionYarnApp")
  .setMaster("yarn")
  .set("spark.submit.deployMode", "cluster")
  .set("spark.yarn.queue", "production")
  .set("spark.yarn.archive", "hdfs://path/to/spark-archive.zip")
  .set("spark.eventLog.enabled", "true")
  .set("spark.eventLog.dir", "hdfs://path/to/spark-events")
  // NOTE(review): spark.history.fs.logDirectory configures the History Server,
  // not the application — it is usually set in the history server's own conf.
  .set("spark.history.fs.logDirectory", "hdfs://path/to/spark-events")

Error Handling

Common deployment and configuration exceptions:

  • IllegalArgumentException - Invalid configuration values or parameters
  • SparkException - General Spark application launch/runtime errors
  • IOException - File system or network errors during application launch
  • SecurityException - Security-related configuration or access errors
  • ClassNotFoundException - Missing application or dependency classes

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-spark--spark-parent-2-12

docs

core.md

deployment.md

graphx.md

index.md

ml.md

sql.md

streaming.md

tile.json