Apache Spark - Unified analytics engine for large-scale data processing
—
Programmatic interfaces for launching, monitoring, and managing Spark applications across different cluster managers and deployment modes. Includes configuration management, application launchers, and cluster-specific deployment utilities.
Configuration management for Spark applications with support for various deployment scenarios.
/**
* Configuration for Spark applications
*/
class SparkConf(loadDefaults: Boolean = true) {
/** Set configuration property */
def set(key: String, value: String): SparkConf
/** Set master URL */
def setMaster(master: String): SparkConf
/** Set application name */
def setAppName(name: String): SparkConf
/** Set JAR files to distribute */
def setJars(jars: Seq[String]): SparkConf
/** Set executor environment variable */
def setExecutorEnv(variable: String, value: String): SparkConf
def setExecutorEnv(variables: Seq[(String, String)]): SparkConf
/** Set Spark home directory */
def setSparkHome(home: String): SparkConf
/** Set all properties from iterable */
def setAll(settings: Iterable[(String, String)]): SparkConf
/** Set multiple executor environment variables from an array */
def setExecutorEnv(variables: Array[(String, String)]): SparkConf
/** Get configuration value */
def get(key: String): String
/** Get configuration value with default */
def get(key: String, defaultValue: String): String
/** Get all configuration settings */
def getAll: Array[(String, String)]
/** Get boolean configuration value */
def getBoolean(key: String, defaultValue: Boolean): Boolean
/** Get integer configuration value */
def getInt(key: String, defaultValue: Int): Int
/** Get long configuration value */
def getLong(key: String, defaultValue: Long): Long
/** Get double configuration value */
def getDouble(key: String, defaultValue: Double): Double
/** Get size configuration value in bytes */
def getSizeAsBytes(key: String, defaultValue: String): Long
/** Get size configuration value in KB */
def getSizeAsKb(key: String, defaultValue: String): Long
/** Get size configuration value in MB */
def getSizeAsMb(key: String, defaultValue: String): Long
/** Get size configuration value in GB */
def getSizeAsGb(key: String, defaultValue: String): Long
/** Get time configuration value in milliseconds */
def getTimeAsMs(key: String, defaultValue: String): Long
/** Get time configuration value in seconds */
def getTimeAsSeconds(key: String, defaultValue: String): Long
/** Remove configuration property */
def remove(key: String): SparkConf
/** Check if configuration contains key */
def contains(key: String): Boolean
/** Clone configuration */
def clone(): SparkConf
/** Render the configuration as a human-readable string for debugging */
def toDebugString: String
}

Usage Examples:
import org.apache.spark.SparkConf
// Basic configuration
val conf = new SparkConf()
.setAppName("MySparkApplication")
.setMaster("local[4]")
.set("spark.executor.memory", "2g")
.set("spark.executor.cores", "2")
.set("spark.sql.adaptive.enabled", "true")
.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
// Environment variables
conf.setExecutorEnv("JAVA_HOME", "/usr/lib/jvm/java-11")
conf.setExecutorEnv(Array(
("PYTHONPATH", "/path/to/python/libs"),
("HADOOP_CONF_DIR", "/etc/hadoop/conf")
))
// File distribution
conf.setJars(Seq("hdfs://path/to/app.jar", "hdfs://path/to/lib.jar"))
// Different master configurations
val localConf = new SparkConf().setMaster("local[*]") // All available cores
val clusterConf = new SparkConf().setMaster("spark://master:7077") // Standalone
val yarnConf = new SparkConf().setMaster("yarn") // YARN cluster
val k8sConf = new SparkConf().setMaster("k8s://https://kubernetes.example.com:443") // Kubernetes
// Configuration validation
val memoryValue = conf.getSizeAsMb("spark.executor.memory", "1g")
val coresValue = conf.getInt("spark.executor.cores", 1)
val adaptiveEnabled = conf.getBoolean("spark.sql.adaptive.enabled", false)

Programmatic interface for launching Spark applications from Java applications.
/**
* Programmatic interface for launching Spark applications
*/
public class SparkLauncher {
/** Set application name */
public SparkLauncher setAppName(String appName);
/** Set master URL */
public SparkLauncher setMaster(String master);
/** Set main application resource (JAR file) */
public SparkLauncher setAppResource(String resource);
/** Set main class */
public SparkLauncher setMainClass(String mainClass);
/** Add application arguments */
public SparkLauncher addAppArgs(String... args);
/** Set Spark configuration */
public SparkLauncher setConf(String key, String value);
/** Set Spark home directory */
public SparkLauncher setSparkHome(String sparkHome);
/** Set properties file */
public SparkLauncher setPropertiesFile(String path);
/** Set deploy mode (client/cluster) */
public SparkLauncher setDeployMode(String mode);
/** Set verbose logging */
public SparkLauncher setVerbose(boolean verbose);
/** Launch and return Process */
public Process launch() throws IOException;
/** Launch and return application handle */
public SparkAppHandle startApplication() throws IOException;
/** Launch and return application handle with listener */
public SparkAppHandle startApplication(SparkAppHandle.Listener... listeners) throws IOException;
}
/**
* Handle to monitor and control Spark applications
*/
public interface SparkAppHandle {
/** Get current application state */
State getState();
/** Get application ID */
String getAppId();
/** Kill the application */
void kill();
/** Disconnect from application */
void disconnect();
/** Add state change listener */
void addListener(Listener listener);
/** Application states */
enum State {
UNKNOWN, CONNECTED, SUBMITTED, RUNNING, FINISHED, FAILED, KILLED, LOST
}
/** Listener for application state changes */
interface Listener {
void stateChanged(SparkAppHandle handle);
void infoChanged(SparkAppHandle handle);
}
}

Usage Examples:
import org.apache.spark.launcher.*;
// Basic application launch
SparkLauncher launcher = new SparkLauncher()
.setAppName("MyApp")
.setMaster("local[2]")
.setAppResource("path/to/my-app.jar")
.setMainClass("com.example.MyMainClass")
.addAppArgs("arg1", "arg2", "arg3");
// Launch and wait
Process process = launcher.launch();
int exitCode = process.waitFor();
// Launch with monitoring
SparkAppHandle handle = launcher.startApplication(new SparkAppHandle.Listener() {
@Override
public void stateChanged(SparkAppHandle handle) {
System.out.println("State changed to: " + handle.getState());
if (handle.getState().isFinal()) {
System.out.println("Application finished with state: " + handle.getState());
}
}
@Override
public void infoChanged(SparkAppHandle handle) {
System.out.println("App ID: " + handle.getAppId());
}
});
// Monitor application
while (!handle.getState().isFinal()) {
Thread.sleep(1000);
}
// Cluster mode launch
SparkLauncher clusterLauncher = new SparkLauncher()
.setAppName("ClusterApp")
.setMaster("yarn")
.setDeployMode("cluster")
.setAppResource("hdfs://path/to/app.jar")
.setMainClass("com.example.ClusterApp")
.setConf("spark.executor.instances", "10")
.setConf("spark.executor.memory", "4g")
.setConf("spark.executor.cores", "2");
SparkAppHandle clusterHandle = clusterLauncher.startApplication();

Runtime configuration management for active Spark sessions.
/**
* Runtime configuration interface for SparkSession
*/
class RuntimeConfig {
/** Set configuration value */
def set(key: String, value: String): Unit
def set(key: String, value: Boolean): Unit
def set(key: String, value: Long): Unit
/** Get configuration value */
def get(key: String): String
def get(key: String, default: String): String
/** Get all configuration values */
def getAll: Map[String, String]
/** Unset configuration value */
def unset(key: String): Unit
/** Check if configuration is modifiable */
def isModifiable(key: String): Boolean
}

Support for different cluster managers and deployment modes.
/**
* Standalone cluster configuration
*/
object StandaloneClusterManager {
/** Default master port */
val DEFAULT_MASTER_PORT = 7077
/** Default worker port */
val DEFAULT_WORKER_PORT = 7078
/** Default web UI port */
val DEFAULT_MASTER_WEBUI_PORT = 8080
}
/**
* YARN cluster configuration
*/
object YarnClusterManager {
/** YARN client mode */
val CLIENT_MODE = "client"
/** YARN cluster mode */
val CLUSTER_MODE = "cluster"
}
/**
* Kubernetes cluster configuration
*/
object KubernetesClusterManager {
/** Default namespace */
val DEFAULT_NAMESPACE = "default"
/** Default service account */
val DEFAULT_SERVICE_ACCOUNT = "default"
}

Interfaces for monitoring application status and metrics.
/**
* Status tracker for monitoring Spark applications
*/
class SparkStatusTracker(sc: SparkContext) {
/** Get executor infos */
def getExecutorInfos: Array[SparkExecutorInfo]
/** Get active stage ids */
def getActiveStageIds: Array[Int]
/** Get active job ids */
def getActiveJobIds: Array[Int]
/** Get stage info */
def getStageInfo(stageId: Int): Option[SparkStageInfo]
/** Get job info */
def getJobInfo(jobId: Int): Option[SparkJobInfo]
/** Get job ids for job group */
def getJobIdsForGroup(jobGroup: String): Array[Int]
}
/**
* Executor information
*/
case class SparkExecutorInfo(
executorId: String,
executorHost: String,
totalCores: Int,
maxTasks: Int,
activeTasks: Int,
failedTasks: Int,
completedTasks: Int,
totalTasks: Int,
totalDuration: Long,
totalGCTime: Long,
totalInputBytes: Long,
totalShuffleRead: Long,
totalShuffleWrite: Long,
isActive: Boolean,
maxMemory: Long,
addTime: java.util.Date,
removeTime: Option[java.util.Date],
removeReason: Option[String]
)

Utilities for accessing environment and system information.
/**
* Utilities for accessing files distributed with application
*/
object SparkFiles {
/** Get absolute path of added file */
def get(filename: String): String
/** Get root directory of added files */
def getRootDirectory(): String
}
/**
* Environment information and utilities
*/
class SparkEnv {
/** Get serializer */
def serializer: Serializer
/** Get closure serializer */
def closureSerializer: Serializer
/** Get cache manager */
def cacheManager: CacheManager
/** Get map output tracker */
def mapOutputTracker: MapOutputTracker
/** Get shuffle manager */
def shuffleManager: ShuffleManager
/** Get broadcast manager */
def broadcastManager: BroadcastManager
/** Get block manager */
def blockManager: BlockManager
/** Get security manager */
def securityManager: SecurityManager
/** Get metrics system */
def metricsSystem: MetricsSystem
/** Get executor ID */
def executorId: String
}

Configuration for different deployment scenarios.
/**
* Deploy modes
*/
object DeployMode extends Enumeration {
type DeployMode = Value
val CLIENT, CLUSTER = Value
}
/**
* Master URLs for different cluster types
*/
object MasterURLs {
/** Local mode with n threads */
def local(n: Int = 1): String = s"local[$n]"
/** Local mode with all available cores */
def localAll: String = "local[*]"
/** Standalone cluster */
def standalone(host: String, port: Int = 7077): String = s"spark://$host:$port"
/** YARN cluster */
def yarn: String = "yarn"
/** Mesos cluster */
def mesos(host: String, port: Int = 5050): String = s"mesos://$host:$port"
/** Kubernetes cluster */
def kubernetes(apiServer: String): String = s"k8s://$apiServer"
}

Usage Examples:
// Different deployment configurations
val localConf = new SparkConf()
.setMaster(MasterURLs.localAll)
.setAppName("LocalApp")
val standaloneConf = new SparkConf()
.setMaster(MasterURLs.standalone("spark-master.example.com"))
.setAppName("StandaloneApp")
.set("spark.executor.instances", "4")
.set("spark.executor.memory", "2g")
.set("spark.executor.cores", "2")
val yarnConf = new SparkConf()
.setMaster(MasterURLs.yarn)
.setAppName("YarnApp")
.set("spark.submit.deployMode", "cluster")
.set("spark.executor.instances", "10")
.set("spark.executor.memory", "4g")
.set("spark.executor.cores", "3")
.set("spark.yarn.queue", "production")
val k8sConf = new SparkConf()
.setMaster(MasterURLs.kubernetes("https://k8s.example.com:443"))
.setAppName("K8sApp")
.set("spark.kubernetes.container.image", "spark:3.5.6")
.set("spark.executor.instances", "5")
.set("spark.kubernetes.namespace", "spark-jobs")
.set("spark.kubernetes.executor.request.cores", "1")
.set("spark.kubernetes.executor.limit.cores", "2")
// Application monitoring
val sc = new SparkContext(conf)
val statusTracker = sc.statusTracker
// Monitor executors
val executors = statusTracker.getExecutorInfos
executors.foreach { executor =>
println(s"Executor ${executor.executorId}: ${executor.activeTasks} active tasks")
}
// Monitor jobs and stages
val activeJobs = statusTracker.getActiveJobIds
val activeStages = statusTracker.getActiveStageIds
activeJobs.foreach { jobId =>
statusTracker.getJobInfo(jobId).foreach { jobInfo =>
println(s"Job $jobId: ${jobInfo.status}")
}
}

Key configuration properties for different aspects of Spark applications.
/**
* Common configuration properties
*/
object SparkConfigs {
// Application properties
val APP_NAME = "spark.app.name"
val MASTER = "spark.master"
val DEPLOY_MODE = "spark.submit.deployMode"
val DRIVER_MEMORY = "spark.driver.memory"
val DRIVER_CORES = "spark.driver.cores"
val EXECUTOR_MEMORY = "spark.executor.memory"
val EXECUTOR_CORES = "spark.executor.cores"
val EXECUTOR_INSTANCES = "spark.executor.instances"
// Dynamic allocation
val DYNAMIC_ALLOCATION_ENABLED = "spark.dynamicAllocation.enabled"
val DYNAMIC_ALLOCATION_MIN_EXECUTORS = "spark.dynamicAllocation.minExecutors"
val DYNAMIC_ALLOCATION_MAX_EXECUTORS = "spark.dynamicAllocation.maxExecutors"
val DYNAMIC_ALLOCATION_INITIAL_EXECUTORS = "spark.dynamicAllocation.initialExecutors"
// Serialization
val SERIALIZER = "spark.serializer"
val KRYO_REGISTRATOR = "spark.kryo.registrator"
val KRYO_UNSAFE = "spark.kryo.unsafe"
// SQL properties
val SQL_ADAPTIVE_ENABLED = "spark.sql.adaptive.enabled"
val SQL_ADAPTIVE_COALESCE_PARTITIONS = "spark.sql.adaptive.coalescePartitions.enabled"
val SQL_ADAPTIVE_SKEW_JOIN = "spark.sql.adaptive.skewJoin.enabled"
val SQL_WAREHOUSE_DIR = "spark.sql.warehouse.dir"
// Shuffle properties
val SHUFFLE_COMPRESS = "spark.shuffle.compress"
val SHUFFLE_SPILL_COMPRESS = "spark.shuffle.spill.compress"
val SHUFFLE_SERVICE_ENABLED = "spark.shuffle.service.enabled"
// Storage properties
val STORAGE_LEVEL = "spark.storage.level"
// NOTE: legacy static-memory-manager properties (pre-Spark 1.6); superseded by
// spark.memory.fraction under the unified memory manager — verify against target version
val STORAGE_MEMORY_FRACTION = "spark.storage.memoryFraction"
val STORAGE_SAFETY_FRACTION = "spark.storage.safetyFraction"
}

Configuration Examples:
// Performance tuning configuration
val perfConf = new SparkConf()
.setAppName("HighPerformanceApp")
.set(SparkConfigs.SERIALIZER, "org.apache.spark.serializer.KryoSerializer")
.set(SparkConfigs.SQL_ADAPTIVE_ENABLED, "true")
.set(SparkConfigs.SQL_ADAPTIVE_COALESCE_PARTITIONS, "true")
.set(SparkConfigs.SQL_ADAPTIVE_SKEW_JOIN, "true")
.set(SparkConfigs.DYNAMIC_ALLOCATION_ENABLED, "true")
.set(SparkConfigs.DYNAMIC_ALLOCATION_MIN_EXECUTORS, "2")
.set(SparkConfigs.DYNAMIC_ALLOCATION_MAX_EXECUTORS, "20")
.set(SparkConfigs.DYNAMIC_ALLOCATION_INITIAL_EXECUTORS, "5")
// Memory-intensive configuration
val memoryConf = new SparkConf()
.setAppName("MemoryIntensiveApp")
.set(SparkConfigs.EXECUTOR_MEMORY, "8g")
.set(SparkConfigs.DRIVER_MEMORY, "4g")
.set(SparkConfigs.STORAGE_MEMORY_FRACTION, "0.8")
.set("spark.memory.offHeap.enabled", "true")
.set("spark.memory.offHeap.size", "2g")
// Cluster-specific configurations
val yarnProdConf = new SparkConf()
.setAppName("ProductionYarnApp")
.setMaster("yarn")
.set("spark.submit.deployMode", "cluster")
.set("spark.yarn.queue", "production")
.set("spark.yarn.archive", "hdfs://path/to/spark-archive.zip")
.set("spark.eventLog.enabled", "true")
.set("spark.eventLog.dir", "hdfs://path/to/spark-events")
.set("spark.history.fs.logDirectory", "hdfs://path/to/spark-events")

Common deployment and configuration exceptions:
IllegalArgumentException - Invalid configuration values or parameters
SparkException - General Spark application launch/runtime errors
IOException - File system or network errors during application launch
SecurityException - Security-related configuration or access errors
ClassNotFoundException - Missing application or dependency classes

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-spark--spark-parent-2-12