Comprehensive launcher implementations for starting Spark applications in different execution modes with extensive configuration options.
Primary launcher for Spark applications executed as child processes with full monitoring and output control capabilities.
/**
* Launcher for Spark applications as child processes using builder pattern
*/
public class SparkLauncher extends AbstractLauncher<SparkLauncher> {
/** Default constructor */
public SparkLauncher();
/** Constructor with environment variables for child process */
public SparkLauncher(Map<String, String> env);
/** Set custom JAVA_HOME for launching the Spark application */
public SparkLauncher setJavaHome(String javaHome);
/** Set custom Spark installation location */
public SparkLauncher setSparkHome(String sparkHome);
/** Set working directory for spark-submit */
public SparkLauncher directory(File dir);
/** Redirect stderr to stdout */
public SparkLauncher redirectError();
/** Redirect error output to specified target */
public SparkLauncher redirectError(ProcessBuilder.Redirect to);
/** Redirect standard output to specified target */
public SparkLauncher redirectOutput(ProcessBuilder.Redirect to);
/** Redirect error output to file */
public SparkLauncher redirectError(File errFile);
/** Redirect standard output to file */
public SparkLauncher redirectOutput(File outFile);
/** Redirect all output to logger with specified name */
public SparkLauncher redirectToLog(String loggerName);
/** Launch as raw child process (manual management required) */
public Process launch() throws IOException;
/** Launch with monitoring and control capabilities */
public SparkAppHandle startApplication(SparkAppHandle.Listener... listeners) throws IOException;
/** Set launcher library configuration (affects launcher behavior, not Spark app) */
public static void setConfig(String name, String value);
}
Usage Examples:
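Before the full launch examples, note the static setConfig hook, which configures the launcher library itself rather than the launched application. A minimal sketch, assuming the connection timeout is expressed in milliseconds (the value here is illustrative):
import org.apache.spark.launcher.SparkLauncher;
// Launcher-library setting: governs how the library manages child applications;
// it is not passed to the Spark application's own configuration.
SparkLauncher.setConfig(SparkLauncher.CHILD_CONNECTION_TIMEOUT, "10000");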
import org.apache.spark.launcher.SparkLauncher;
import org.apache.spark.launcher.SparkAppHandle;
import java.io.File;
import java.util.HashMap;
import java.util.Map;
// Basic configuration with monitoring
SparkAppHandle handle = new SparkLauncher()
.setAppResource("/opt/myapp/target/myapp-1.0.jar")
.setMainClass("com.company.MySparkApplication")
.setMaster("yarn")
.setDeployMode("cluster")
.setConf(SparkLauncher.DRIVER_MEMORY, "4g")
.setConf(SparkLauncher.EXECUTOR_MEMORY, "2g")
.setConf(SparkLauncher.EXECUTOR_CORES, "2")
.setAppName("Production Data Pipeline")
.addAppArgs("--input-path", "/data/input", "--output-path", "/data/output")
.startApplication();
// Custom environment and paths
Map<String, String> env = new HashMap<>();
env.put("HADOOP_CONF_DIR", "/etc/hadoop/conf");
env.put("YARN_CONF_DIR", "/etc/hadoop/conf");
SparkLauncher launcher = new SparkLauncher(env)
.setJavaHome("/usr/lib/jvm/java-8-openjdk")
.setSparkHome("/opt/spark-2.4.8")
.directory(new File("/tmp/spark-work"))
.setAppResource("/apps/analytics.jar")
.setMainClass("com.analytics.ETLPipeline")
.setMaster("local[4]");
// Output redirection options
launcher.redirectOutput(new File("/logs/spark-output.log"))
.redirectError(new File("/logs/spark-error.log"));
// Alternative: redirect to logger
launcher.redirectToLog("com.company.spark.launcher");
SparkAppHandle handle = launcher.startApplication();
// Raw process launch (manual management)
Process sparkProcess = new SparkLauncher()
.setAppResource("/apps/batch-job.jar")
.setMainClass("com.company.BatchProcessor")
.setMaster("yarn")
.setDeployMode("cluster")
.addJar("/libs/external-lib.jar")
.addFile("/config/app.properties")
.setVerbose(true)
.launch();
// Wait for completion
int exitCode = sparkProcess.waitFor();
if (exitCode == 0) {
System.out.println("Spark application completed successfully");
} else {
System.err.println("Spark application failed with exit code: " + exitCode);
}
Launcher for running Spark applications within the same JVM process, recommended only for cluster mode deployments.
/**
* In-process launcher for Spark applications within the same JVM
* Recommended only for cluster mode due to SparkContext limitations
*/
public class InProcessLauncher extends AbstractLauncher<InProcessLauncher> {
/** Start application in-process with monitoring */
public SparkAppHandle startApplication(SparkAppHandle.Listener... listeners) throws IOException;
}
Usage Examples:
import org.apache.spark.launcher.InProcessLauncher;
import org.apache.spark.launcher.SparkAppHandle;
// In-process launch for cluster mode (recommended usage)
SparkAppHandle handle = new InProcessLauncher()
.setAppResource("/opt/myapp/analytics.jar")
.setMainClass("com.company.ClusterAnalytics")
.setMaster("yarn")
.setDeployMode("cluster") // Cluster mode recommended
.setConf("spark.sql.adaptive.enabled", "true")
.setConf("spark.sql.adaptive.coalescePartitions.enabled", "true")
.setConf("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.setAppName("In-Process Analytics Job")
.addAppArgs("--config", "/config/analytics.conf")
.startApplication();
// Add listener for state monitoring
handle.addListener(new SparkAppHandle.Listener() {
@Override
public void stateChanged(SparkAppHandle handle) {
System.out.println("Application state changed to: " + handle.getState());
if (handle.getState() == SparkAppHandle.State.RUNNING) {
System.out.println("Application is now running with ID: " + handle.getAppId());
} else if (handle.getState().isFinal()) {
System.out.println("Application completed with final state: " + handle.getState());
if (handle.getState() == SparkAppHandle.State.FAILED) {
System.err.println("Application failed!");
}
}
}
@Override
public void infoChanged(SparkAppHandle handle) {
System.out.println("Application info updated: " + handle.getAppId());
}
});
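Where the caller only needs to block until the application finishes, the same callback can drive a latch. A minimal sketch reusing the handle from the example above (CountDownLatch comes from java.util.concurrent; the variable names are illustrative):
import java.util.concurrent.CountDownLatch;
// Count down once the application reaches any final state
CountDownLatch done = new CountDownLatch(1);
handle.addListener(new SparkAppHandle.Listener() {
    @Override
    public void stateChanged(SparkAppHandle h) {
        if (h.getState().isFinal()) {
            done.countDown();
        }
    }
    @Override
    public void infoChanged(SparkAppHandle h) { }
});
try {
    done.await(); // blocks until FINISHED, FAILED, KILLED or LOST
    System.out.println("Final state: " + handle.getState());
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
}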
// Client mode warning (not recommended but possible)
SparkAppHandle clientHandle = new InProcessLauncher()
.setAppResource("/apps/client-app.jar")
.setMainClass("com.company.ClientApp")
.setMaster("local[2]") // Local mode for client
.setDeployMode("client")
.setAppName("Client Mode App")
.startApplication();
// Warning will be logged: "It's not recommended to run client-mode applications using InProcessLauncher"
Base class providing common configuration functionality shared by both launcher implementations.
/**
* Base class for launcher implementations with fluent configuration API
*/
public abstract class AbstractLauncher<T extends AbstractLauncher<T>> {
/** Set custom properties file with Spark configuration */
public T setPropertiesFile(String path);
/** Set single configuration value (key must start with "spark.") */
public T setConf(String key, String value);
/** Set application name */
public T setAppName(String appName);
/** Set Spark master (local, yarn, mesos, k8s, spark://) */
public T setMaster(String master);
/** Set deploy mode (client or cluster) */
public T setDeployMode(String mode);
/** Set main application resource (jar for Java/Scala, python script for PySpark) */
public T setAppResource(String resource);
/** Set main class name for Java/Scala applications */
public T setMainClass(String mainClass);
/** Add no-value argument to Spark invocation */
public T addSparkArg(String arg);
/** Add argument with value to Spark invocation */
public T addSparkArg(String name, String value);
/** Add command line arguments for the application */
public T addAppArgs(String... args);
/** Add jar file to be submitted with application */
public T addJar(String jar);
/** Add file to be submitted with application */
public T addFile(String file);
/** Add Python file/zip/egg to be submitted with application */
public T addPyFile(String file);
/** Enable verbose reporting for SparkSubmit */
public T setVerbose(boolean verbose);
/** Start Spark application with monitoring (implemented by subclasses) */
public abstract SparkAppHandle startApplication(SparkAppHandle.Listener... listeners) throws IOException;
}
Usage Examples:
// All launcher types support these common configuration methods
// Basic application configuration
launcher.setAppName("ETL Pipeline")
.setMaster("yarn")
.setDeployMode("cluster")
.setAppResource("/apps/etl-pipeline.jar")
.setMainClass("com.company.etl.ETLMain");
// Spark configuration
launcher.setConf("spark.sql.shuffle.partitions", "400")
.setConf("spark.sql.adaptive.enabled", "true")
.setConf("spark.sql.adaptive.skewJoin.enabled", "true")
.setConf("spark.dynamicAllocation.enabled", "true");
// Resources and dependencies
launcher.addJar("/libs/mysql-connector.jar")
.addJar("/libs/custom-utils.jar")
.addFile("/config/database.properties")
.addFile("/config/log4j.properties");
// Application arguments
launcher.addAppArgs("--input-format", "parquet")
.addAppArgs("--output-format", "delta")
.addAppArgs("--parallelism", "100");
// Advanced Spark arguments
launcher.addSparkArg("--archives", "env.zip#myenv")
.addSparkArg("--py-files", "utils.py,helpers.py");
// Properties file configuration
launcher.setPropertiesFile("/config/spark-defaults.conf");
// Enable verbose output for debugging
launcher.setVerbose(true);
The two launcher implementations compare as follows:
| Feature | SparkLauncher | InProcessLauncher |
|---|---|---|
| Execution | Child process | Same JVM |
| Monitoring | Full handle control | Full handle control |
| Resource Isolation | Complete | Shared JVM resources |
| Output Control | Extensive redirection | Inherited from parent |
| Recommended Mode | Any (client/cluster) | Cluster only |
| Setup Requirements | SPARK_HOME needed | Spark jars in classpath |
| Process Management | Manual or automatic | Automatic |
| Performance | Process overhead | Faster startup |
| Debugging | Separate process logs | Shared logging |
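Because both launchers extend AbstractLauncher, shared submission logic can be written once against the base type and reused with either execution model. A minimal sketch; the LauncherSupport helper, paths, and class names are illustrative, not part of the Spark API:
import java.io.IOException;
import org.apache.spark.launcher.AbstractLauncher;
import org.apache.spark.launcher.InProcessLauncher;
import org.apache.spark.launcher.SparkAppHandle;
import org.apache.spark.launcher.SparkLauncher;

final class LauncherSupport {
    // Shared configuration written once against the base type; the recursive
    // generic parameter keeps the fluent chain type-safe for either subclass.
    static <T extends AbstractLauncher<T>> SparkAppHandle submit(T launcher) throws IOException {
        return launcher
            .setMaster("yarn")
            .setDeployMode("cluster")
            .setConf("spark.executor.memory", "2g")
            .setAppResource("/apps/etl-pipeline.jar")
            .setMainClass("com.company.etl.ETLMain")
            .startApplication();
    }
}

// Same submission path for both execution models:
SparkAppHandle childProcess = LauncherSupport.submit(new SparkLauncher());
SparkAppHandle inProcess = LauncherSupport.submit(new InProcessLauncher());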
SparkLauncher provides predefined constants for common configuration keys:
// Master and deployment
public static final String SPARK_MASTER = "spark.master";
public static final String DEPLOY_MODE = "spark.submit.deployMode";
// Driver configuration
public static final String DRIVER_MEMORY = "spark.driver.memory";
public static final String DRIVER_EXTRA_CLASSPATH = "spark.driver.extraClassPath";
public static final String DRIVER_EXTRA_JAVA_OPTIONS = "spark.driver.extraJavaOptions";
public static final String DRIVER_EXTRA_LIBRARY_PATH = "spark.driver.extraLibraryPath";
// Executor configuration
public static final String EXECUTOR_MEMORY = "spark.executor.memory";
public static final String EXECUTOR_EXTRA_CLASSPATH = "spark.executor.extraClassPath";
public static final String EXECUTOR_EXTRA_JAVA_OPTIONS = "spark.executor.extraJavaOptions";
public static final String EXECUTOR_EXTRA_LIBRARY_PATH = "spark.executor.extraLibraryPath";
public static final String EXECUTOR_CORES = "spark.executor.cores";
// Special values and settings
public static final String NO_RESOURCE = "spark-internal";
public static final String CHILD_PROCESS_LOGGER_NAME = "spark.launcher.childProcLoggerName";
public static final String CHILD_CONNECTION_TIMEOUT = "spark.launcher.childConectionTimeout";
Launch failures, invalid configuration values, and runaway child processes can be handled as follows:
try {
SparkAppHandle handle = new SparkLauncher()
.setAppResource("/apps/myapp.jar")
.setMainClass("com.company.App")
.setMaster("yarn")
.startApplication();
} catch (IOException e) {
System.err.println("Failed to launch Spark application: " + e.getMessage());
// Handle launch failure (missing files, invalid configuration, etc.)
}
try {
launcher.setConf("invalid.key", "value"); // Must start with "spark."
} catch (IllegalArgumentException e) {
System.err.println("Invalid configuration key: " + e.getMessage());
}
try {
launcher.setMainClass(null); // Null validation
} catch (IllegalArgumentException e) {
System.err.println("Null parameter not allowed: " + e.getMessage());
}
import java.io.IOException;
import java.util.concurrent.TimeUnit;
SparkLauncher launcher = new SparkLauncher()
.setAppResource("/apps/unreliable-app.jar")
.setMainClass("com.company.UnreliableApp")
.setMaster("local[*]");
try {
    Process process = launcher.launch();
    // Wait up to five minutes for the child process to finish
    boolean finished = process.waitFor(300, TimeUnit.SECONDS);
    if (!finished) {
        System.err.println("Process timed out, killing...");
        process.destroyForcibly();
    } else {
        // Only read the exit value once the process has actually finished
        int exitCode = process.exitValue();
        if (exitCode != 0) {
            System.err.println("Process failed with exit code: " + exitCode);
        }
    }
} catch (IOException e) {
    System.err.println("Failed to launch Spark application: " + e.getMessage());
} catch (InterruptedException e) {
    System.err.println("Process interrupted: " + e.getMessage());
    Thread.currentThread().interrupt();
}
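As an alternative to managing the raw Process, the handle returned by startApplication() can drive shutdown directly via stop() and kill(). A minimal sketch, assuming appHandle was obtained earlier from startApplication() and the application has already connected back to the launcher; the 30-second grace period is illustrative:
// Ask the application to stop gracefully (best effort)
if (!appHandle.getState().isFinal()) {
    appHandle.stop();
}
try {
    // Grace period: poll until a final state or the deadline passes
    long deadline = System.currentTimeMillis() + 30_000;
    while (!appHandle.getState().isFinal() && System.currentTimeMillis() < deadline) {
        Thread.sleep(1_000);
    }
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
}
// Forcibly terminate if the application never reached a final state
if (!appHandle.getState().isFinal()) {
    appHandle.kill();
}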