A configuration system that combines predefined constants for common Spark settings with a fluent configuration API.
The predefined constants cover common Spark configuration keys, replacing typo-prone string literals and preventing misconfigured submissions.
/**
* Master and deployment configuration
*/
public static final String SPARK_MASTER = "spark.master";
public static final String DEPLOY_MODE = "spark.submit.deployMode";
/**
* Driver configuration keys
*/
public static final String DRIVER_MEMORY = "spark.driver.memory";
public static final String DRIVER_EXTRA_CLASSPATH = "spark.driver.extraClassPath";
public static final String DRIVER_EXTRA_JAVA_OPTIONS = "spark.driver.extraJavaOptions";
public static final String DRIVER_EXTRA_LIBRARY_PATH = "spark.driver.extraLibraryPath";
/**
* Executor configuration keys
*/
public static final String EXECUTOR_MEMORY = "spark.executor.memory";
public static final String EXECUTOR_EXTRA_CLASSPATH = "spark.executor.extraClassPath";
public static final String EXECUTOR_EXTRA_JAVA_OPTIONS = "spark.executor.extraJavaOptions";
public static final String EXECUTOR_EXTRA_LIBRARY_PATH = "spark.executor.extraLibraryPath";
public static final String EXECUTOR_CORES = "spark.executor.cores";
/**
* Special configuration values and launcher settings
*/
public static final String NO_RESOURCE = "spark-internal";
public static final String CHILD_PROCESS_LOGGER_NAME = "spark.launcher.childProcLoggerName";
public static final String CHILD_CONNECTION_TIMEOUT = "spark.launcher.childConectionTimeout"; // "Conection" spelling matches the value in Spark's source

Usage Examples:
import org.apache.spark.launcher.SparkLauncher;
// Driver configuration using constants
SparkLauncher launcher = new SparkLauncher()
.setConf(SparkLauncher.DRIVER_MEMORY, "4g")
.setConf(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS, "-XX:+UseG1GC -XX:MaxGCPauseMillis=200")
.setConf(SparkLauncher.DRIVER_EXTRA_CLASSPATH, "/libs/mysql-connector.jar:/libs/custom.jar");
// Executor configuration using constants
launcher.setConf(SparkLauncher.EXECUTOR_MEMORY, "2g")
.setConf(SparkLauncher.EXECUTOR_CORES, "4")
.setConf(SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS, "-XX:+UseG1GC")
.setConf(SparkLauncher.EXECUTOR_EXTRA_CLASSPATH, "/libs/shared-utils.jar");
// Master and deployment mode
launcher.setConf(SparkLauncher.SPARK_MASTER, "yarn")
.setConf(SparkLauncher.DEPLOY_MODE, "cluster");
// Special resource handling
launcher.setAppResource(SparkLauncher.NO_RESOURCE) // No primary jar; the main class must already be on the classpath
.setMainClass("com.company.ClasspathApp");
// Launcher-specific configuration
launcher.setConf(SparkLauncher.CHILD_PROCESS_LOGGER_NAME, "com.company.spark.launcher")
.setConf(SparkLauncher.CHILD_CONNECTION_TIMEOUT, "60000"); // 60 seconds
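When CHILD_PROCESS_LOGGER_NAME is set and the application is started via startApplication(), the launcher redirects the child process's output to a java.util.logging logger with that name. A minimal sketch of attaching a handler (the logger name matches the value set above):

import java.util.logging.ConsoleHandler;
import java.util.logging.Logger;

// The logger name must match the CHILD_PROCESS_LOGGER_NAME value set on the launcher
Logger childLog = Logger.getLogger("com.company.spark.launcher");
childLog.setUseParentHandlers(false);      // keep child output out of the root logger
childLog.addHandler(new ConsoleHandler()); // or a FileHandler, custom handler, etc.

Fluent configuration API providing type-safe methods for setting application parameters and Spark configuration.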
/**
* Configuration methods available on all launcher types
*/
/** Set custom properties file with Spark configuration */
public T setPropertiesFile(String path);
/** Set single configuration value (key must start with "spark.") */
public T setConf(String key, String value);
/** Set application name displayed in Spark UI */
public T setAppName(String appName);
/** Set Spark master URL (local, yarn, mesos, k8s, spark://) */
public T setMaster(String master);
/** Set deploy mode (client or cluster) */
public T setDeployMode(String mode);
/** Set main application resource (jar for Java/Scala, python script for PySpark) */
public T setAppResource(String resource);
/** Set main class name for Java/Scala applications */
public T setMainClass(String mainClass);
/** Add jar file to be submitted with application */
public T addJar(String jar);
/** Add file to be submitted with application */
public T addFile(String file);
/** Add Python file/zip/egg to be submitted with application */
public T addPyFile(String file);
/** Add command line arguments for the application */
public T addAppArgs(String... args);
/** Add no-value argument to Spark invocation */
public T addSparkArg(String arg);
/** Add argument with value to Spark invocation */
public T addSparkArg(String name, String value);
/** Enable verbose reporting for SparkSubmit */
public T setVerbose(boolean verbose);

Usage Examples:
import org.apache.spark.launcher.SparkLauncher;
// Comprehensive application configuration
SparkLauncher launcher = new SparkLauncher()
// Basic application setup
.setAppName("Production ETL Pipeline")
.setMaster("yarn")
.setDeployMode("cluster")
.setAppResource("/apps/etl-pipeline-1.0.jar")
.setMainClass("com.company.etl.ETLMain")
// Spark configuration using constants
.setConf(SparkLauncher.DRIVER_MEMORY, "8g")
.setConf(SparkLauncher.EXECUTOR_MEMORY, "4g")
.setConf(SparkLauncher.EXECUTOR_CORES, "4")
// Additional Spark configuration
.setConf("spark.sql.adaptive.enabled", "true")
.setConf("spark.sql.adaptive.coalescePartitions.enabled", "true")
.setConf("spark.sql.adaptive.skewJoin.enabled", "true")
.setConf("spark.sql.shuffle.partitions", "400")
.setConf("spark.dynamicAllocation.enabled", "true")
.setConf("spark.dynamicAllocation.minExecutors", "2")
.setConf("spark.dynamicAllocation.maxExecutors", "20")
// Dependencies and resources
.addJar("/libs/mysql-connector-java-8.0.25.jar")
.addJar("/libs/spark-avro_2.11-2.4.8.jar")
.addFile("/config/application.conf")
.addFile("/config/log4j.properties")
// Application arguments
.addAppArgs("--input-path", "/data/raw/2023/12/01")
.addAppArgs("--output-path", "/data/processed/2023/12/01")
.addAppArgs("--config-file", "application.conf")
.addAppArgs("--partition-count", "100")
// Enable verbose output for debugging
.setVerbose(true);
// Properties file configuration
launcher.setPropertiesFile("/etc/spark/spark-defaults.conf");
// Advanced Spark arguments
launcher.addSparkArg("--archives", "python-env.zip#env")
.addSparkArg("--py-files", "utils.py,transforms.py")
.addSparkArg("--queue", "production")
.addSparkArg("--principal", "spark@COMPANY.COM")
.addSparkArg("--keytab", "/etc/security/spark.keytab");External configuration file support for managing complex Spark configurations.
Usage Examples:
// Using external properties file
SparkLauncher launcher = new SparkLauncher()
.setPropertiesFile("/config/spark-production.conf")
.setAppResource("/apps/myapp.jar")
.setMainClass("com.company.MyApp");
// setConf() calls override values loaded from the properties file
launcher.setConf("spark.app.name", "Override App Name") // Overrides the file's setting
.setConf("spark.driver.memory", "16g"); // Overrides the file's setting

Example properties file (spark-production.conf):
spark.master=yarn
spark.submit.deployMode=cluster
spark.driver.memory=8g
spark.executor.memory=4g
spark.executor.cores=4
spark.executor.instances=10
spark.sql.adaptive.enabled=true
spark.sql.adaptive.coalescePartitions.enabled=true
spark.serializer=org.apache.spark.serializer.KryoSerializer
spark.hadoop.hive.metastore.uris=thrift://metastore:9083
spark.hadoop.fs.defaultFS=hdfs://namenode:8020
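Properties files can also be assembled at runtime. A minimal sketch using java.util.Properties (the output path is illustrative):

import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Properties;

Properties props = new Properties();
props.setProperty("spark.master", "yarn");
props.setProperty("spark.driver.memory", "8g");
try (FileOutputStream out = new FileOutputStream("/tmp/spark-generated.conf")) {
    props.store(out, "Generated Spark configuration"); // writes key=value lines, as above
} catch (IOException e) {
    throw new RuntimeException("Failed to write Spark configuration file", e);
}
launcher.setPropertiesFile("/tmp/spark-generated.conf");

Built-in validation for configuration parameters to prevent common errors.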
/**
 * Configuration validation rules:
 * - setConf() keys must start with "spark."
 * - Parameters cannot be null
 * - Known spark-submit arguments are checked for whether they expect a value
 */

Usage Examples:
try {
// Valid configuration calls
launcher.setConf("spark.driver.memory", "4g"); // Valid: starts with "spark."
launcher.setConf("spark.custom.property", "value"); // Valid: custom spark property
launcher.setMaster("yarn"); // Valid: known master type
launcher.setDeployMode("cluster"); // Valid: known deploy mode
// Invalid configuration calls (will throw IllegalArgumentException)
launcher.setConf("invalid.key", "value"); // Invalid: doesn't start with "spark."
launcher.setConf("spark.driver.memory", null); // Invalid: null value
launcher.setMaster(null); // Invalid: null master
launcher.setAppName(null); // Invalid: null app name
} catch (IllegalArgumentException e) {
System.err.println("Configuration error: " + e.getMessage());
}
// Spark argument validation
try {
launcher.addSparkArg("--master", "yarn"); // Valid: known argument with value
launcher.addSparkArg("--version"); // Valid: known no-value argument
launcher.addSparkArg("--custom-arg", "value"); // Valid: unknown args allowed for compatibility
launcher.addSparkArg("--master"); // Invalid: --master expects a value
} catch (IllegalArgumentException e) {
System.err.println("Spark argument error: " + e.getMessage());
}

public class DevelopmentConfig {
public static SparkLauncher createDevelopmentLauncher() {
return new SparkLauncher()
.setMaster("local[*]") // Use all local cores
.setDeployMode("client") // Client mode for development
.setConf(SparkLauncher.DRIVER_MEMORY, "2g")
.setConf("spark.sql.adaptive.enabled", "false") // Disable for predictable behavior
.setConf("spark.sql.shuffle.partitions", "4") // Small partition count
.setVerbose(true); // Enable verbose logging
}
}
// Usage
SparkLauncher devLauncher = DevelopmentConfig.createDevelopmentLauncher()
.setAppResource("/target/myapp-dev.jar")
.setMainClass("com.company.DevApp")
.setAppName("Development Test");public class ProductionConfig {
public static SparkLauncher createProductionLauncher() {
return new SparkLauncher()
.setMaster("yarn")
.setDeployMode("cluster")
.setPropertiesFile("/etc/spark/production.conf")
// Production-specific overrides
.setConf(SparkLauncher.DRIVER_MEMORY, "8g")
.setConf(SparkLauncher.EXECUTOR_MEMORY, "6g")
.setConf(SparkLauncher.EXECUTOR_CORES, "5")
// Performance tuning
.setConf("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.setConf("spark.sql.adaptive.enabled", "true")
.setConf("spark.sql.adaptive.coalescePartitions.enabled", "true")
.setConf("spark.sql.adaptive.skewJoin.enabled", "true")
// Resource management
.setConf("spark.dynamicAllocation.enabled", "true")
.setConf("spark.dynamicAllocation.minExecutors", "5")
.setConf("spark.dynamicAllocation.maxExecutors", "50")
.setConf("spark.dynamicAllocation.initialExecutors", "10")
// Logging and monitoring
.setConf("spark.eventLog.enabled", "true")
.setConf("spark.eventLog.dir", "hdfs://logs/spark-events")
.setConf("spark.history.fs.logDirectory", "hdfs://logs/spark-events");
}
}

public class ConfigurationTemplates {
// Template for memory-intensive applications
public static void configureMemoryIntensive(SparkLauncher launcher) {
launcher.setConf(SparkLauncher.DRIVER_MEMORY, "16g")
.setConf(SparkLauncher.EXECUTOR_MEMORY, "8g")
.setConf("spark.executor.memoryFraction", "0.8")
.setConf("spark.storage.memoryFraction", "0.6")
.setConf("spark.sql.shuffle.partitions", "800");
}
// Template for CPU-intensive applications
public static void configureCpuIntensive(SparkLauncher launcher) {
launcher.setConf(SparkLauncher.EXECUTOR_CORES, "6")
.setConf(SparkLauncher.EXECUTOR_MEMORY, "4g")
.setConf("spark.task.cpus", "1")
.setConf("spark.sql.shuffle.partitions", "1200");
}
// Template for streaming applications
public static void configureStreaming(SparkLauncher launcher) {
launcher.setConf("spark.streaming.backpressure.enabled", "true")
.setConf("spark.streaming.dynamicAllocation.enabled", "true")
.setConf("spark.streaming.receiver.maxRate", "10000")
.setConf("spark.streaming.kafka.maxRatePerPartition", "1000");
}
// Template for machine learning workloads
public static void configureMachineLearning(SparkLauncher launcher) {
launcher.setConf("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.setConf("spark.kryo.unsafe", "true")
.setConf("spark.sql.execution.arrow.pyspark.enabled", "true")
.setConf("spark.sql.adaptive.enabled", "true")
.setConf("spark.sql.adaptive.coalescePartitions.enabled", "true");
}
}
// Usage
SparkLauncher launcher = new SparkLauncher()
.setAppResource("/apps/ml-pipeline.jar")
.setMainClass("com.company.ml.Pipeline")
.setMaster("yarn")
.setDeployMode("cluster");
ConfigurationTemplates.configureMachineLearning(launcher);
ConfigurationTemplates.configureMemoryIntensive(launcher); // later templates overwrite keys already set

public class EnvironmentConfiguration {
public enum Environment {
DEVELOPMENT, TESTING, STAGING, PRODUCTION
}
public static SparkLauncher configureForEnvironment(SparkLauncher launcher, Environment env) {
switch (env) {
case DEVELOPMENT:
return launcher.setMaster("local[*]")
.setDeployMode("client")
.setConf(SparkLauncher.DRIVER_MEMORY, "2g")
.setVerbose(true);
case TESTING:
return launcher.setMaster("local[4]")
.setDeployMode("client")
.setConf(SparkLauncher.DRIVER_MEMORY, "4g")
.setConf("spark.sql.shuffle.partitions", "8");
case STAGING:
return launcher.setMaster("yarn")
.setDeployMode("cluster")
.setConf(SparkLauncher.DRIVER_MEMORY, "4g")
.setConf(SparkLauncher.EXECUTOR_MEMORY, "2g")
.setPropertiesFile("/etc/spark/staging.conf");
case PRODUCTION:
return launcher.setMaster("yarn")
.setDeployMode("cluster")
.setPropertiesFile("/etc/spark/production.conf")
.setConf(SparkLauncher.CHILD_PROCESS_LOGGER_NAME, "production.spark.launcher");
default:
throw new IllegalArgumentException("Unknown environment: " + env);
}
}
}
// Usage
SparkLauncher launcher = new SparkLauncher()
.setAppResource("/apps/etl.jar")
.setMainClass("com.company.ETL");
EnvironmentConfiguration.Environment currentEnv =
    EnvironmentConfiguration.Environment.valueOf(System.getProperty("env", "DEVELOPMENT"));
launcher = EnvironmentConfiguration.configureForEnvironment(launcher, currentEnv);

public class DynamicConfiguration {
// DataCharacteristics is an illustrative helper type; see the sketch after this class
public static void configureBasedOnData(SparkLauncher launcher, DataCharacteristics data) {
// Adjust partitions based on data size
int partitions = calculateOptimalPartitions(data.getSizeGB());
launcher.setConf("spark.sql.shuffle.partitions", String.valueOf(partitions));
// Adjust memory based on data complexity
String executorMemory = data.isComplexProcessing() ? "8g" : "4g";
launcher.setConf(SparkLauncher.EXECUTOR_MEMORY, executorMemory);
// Enable adaptive features for large datasets
if (data.getSizeGB() > 100) {
launcher.setConf("spark.sql.adaptive.enabled", "true")
.setConf("spark.sql.adaptive.skewJoin.enabled", "true");
}
}
private static int calculateOptimalPartitions(double sizeGB) {
// Rule of thumb: 128MB per partition
return Math.max(4, (int) Math.ceil(sizeGB * 1024 / 128));
}
}
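DataCharacteristics in the example above is not a Spark class; a minimal, hypothetical version with the shape the example assumes:

// Hypothetical helper describing input data; not part of Spark
public class DataCharacteristics {
    private final double sizeGB;
    private final boolean complexProcessing;

    public DataCharacteristics(double sizeGB, boolean complexProcessing) {
        this.sizeGB = sizeGB;
        this.complexProcessing = complexProcessing;
    }

    public double getSizeGB() { return sizeGB; }
    public boolean isComplexProcessing() { return complexProcessing; }
}

public class ConfigurationBuilder {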
private SparkLauncher launcher;
public ConfigurationBuilder(SparkLauncher launcher) {
this.launcher = launcher;
}
public ConfigurationBuilder withBaseConfiguration() {
launcher.setConf("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.setConf("spark.sql.adaptive.enabled", "true");
return this;
}
public ConfigurationBuilder withHighMemory() {
launcher.setConf(SparkLauncher.DRIVER_MEMORY, "16g")
.setConf(SparkLauncher.EXECUTOR_MEMORY, "8g");
return this;
}
public ConfigurationBuilder withHighCpu() {
launcher.setConf(SparkLauncher.EXECUTOR_CORES, "8");
return this;
}
public ConfigurationBuilder withCustom(String key, String value) {
launcher.setConf(key, value);
return this;
}
public SparkLauncher build() {
return launcher;
}
}
// Usage
SparkLauncher launcher = new ConfigurationBuilder(new SparkLauncher())
.withBaseConfiguration()
.withHighMemory()
.withCustom("spark.sql.shuffle.partitions", "1000")
.build();

// Good: Balanced memory allocation
launcher.setConf(SparkLauncher.DRIVER_MEMORY, "4g")
.setConf(SparkLauncher.EXECUTOR_MEMORY, "6g")
.setConf("spark.executor.memoryFraction", "0.8");
// Avoid: Overallocating driver memory for non-collect operations
// launcher.setConf(SparkLauncher.DRIVER_MEMORY, "32g"); // Usually unnecessary
// Good: Balance storage vs. execution memory (unified memory manager settings)
launcher.setConf("spark.memory.storageFraction", "0.5")
.setConf("spark.memory.fraction", "0.8");

// Calculate executor count based on cluster resources
// ClusterResources is an illustrative helper type; see the sketch after this method
public static void configureExecutors(SparkLauncher launcher, ClusterResources cluster) {
int coresPerNode = cluster.getCoresPerNode();
int memoryPerNodeGB = cluster.getMemoryPerNodeGB();
int nodeCount = cluster.getNodeCount();
// Leave 1 core for OS, aim for 2-5 cores per executor
int coresPerExecutor = Math.min(5, Math.max(2, coresPerNode / 2));
int executorsPerNode = (coresPerNode - 1) / coresPerExecutor;
int totalExecutors = executorsPerNode * nodeCount;
// Leave memory for OS and other processes
int executorMemoryGB = (memoryPerNodeGB - 2) / executorsPerNode;
launcher.setConf("spark.executor.instances", String.valueOf(totalExecutors))
.setConf(SparkLauncher.EXECUTOR_CORES, String.valueOf(coresPerExecutor))
.setConf(SparkLauncher.EXECUTOR_MEMORY, executorMemoryGB + "g");
}
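ClusterResources is likewise an illustrative type; a minimal, hypothetical version supporting the method above:

// Hypothetical cluster description; not part of Spark
public class ClusterResources {
    private final int coresPerNode;
    private final int memoryPerNodeGB;
    private final int nodeCount;

    public ClusterResources(int coresPerNode, int memoryPerNodeGB, int nodeCount) {
        this.coresPerNode = coresPerNode;
        this.memoryPerNodeGB = memoryPerNodeGB;
        this.nodeCount = nodeCount;
    }

    public int getCoresPerNode() { return coresPerNode; }
    public int getMemoryPerNodeGB() { return memoryPerNodeGB; }
    public int getNodeCount() { return nodeCount; }
}

// Good: Use constants to prevent typos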
launcher.setConf(SparkLauncher.DRIVER_MEMORY, "4g");
// Avoid: String literals prone to typos
// launcher.setConf("spark.driver.memory", "4g");
// Good: Validate configuration values
public static void setMemoryWithValidation(SparkLauncher launcher, String memory) {
if (!memory.matches("\\d+[gG]")) {
throw new IllegalArgumentException("Memory must be in format '4g' or '4G'");
}
launcher.setConf(SparkLauncher.DRIVER_MEMORY, memory);
}
// Good: Use type-safe configuration methods when available
launcher.setMaster("yarn") // Type-safe method
.setDeployMode("cluster") // Type-safe method
.setVerbose(true); // Type-safe method
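A quick exercise of setMemoryWithValidation from above (note the pattern accepts only whole-gigabyte values, while Spark itself also accepts suffixes such as "m" for megabytes):

setMemoryWithValidation(launcher, "8g");    // accepted
setMemoryWithValidation(launcher, "8192m"); // throws IllegalArgumentException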