# Application Launchers

Comprehensive launcher implementations for starting Spark applications in different execution modes with extensive configuration options.

## Capabilities

### SparkLauncher

Primary launcher for Spark applications executed as child processes with full monitoring and output control capabilities.

```java { .api }
/**
 * Launcher for Spark applications as child processes using builder pattern
 */
public class SparkLauncher extends AbstractLauncher<SparkLauncher> {
  /** Default constructor */
  public SparkLauncher();

  /** Constructor with environment variables for child process */
  public SparkLauncher(Map<String, String> env);

  /** Set custom JAVA_HOME for launching the Spark application */
  public SparkLauncher setJavaHome(String javaHome);

  /** Set custom Spark installation location */
  public SparkLauncher setSparkHome(String sparkHome);

  /** Set working directory for spark-submit */
  public SparkLauncher directory(File dir);

  /** Redirect stderr to stdout */
  public SparkLauncher redirectError();

  /** Redirect error output to specified target */
  public SparkLauncher redirectError(ProcessBuilder.Redirect to);

  /** Redirect standard output to specified target */
  public SparkLauncher redirectOutput(ProcessBuilder.Redirect to);

  /** Redirect error output to file */
  public SparkLauncher redirectError(File errFile);

  /** Redirect standard output to file */
  public SparkLauncher redirectOutput(File outFile);

  /** Redirect all output to logger with specified name */
  public SparkLauncher redirectToLog(String loggerName);

  /** Launch as raw child process (manual management required) */
  public Process launch() throws IOException;

  /** Launch with monitoring and control capabilities */
  public SparkAppHandle startApplication(SparkAppHandle.Listener... listeners) throws IOException;

  /** Set launcher library configuration (affects launcher behavior, not Spark app) */
  public static void setConfig(String name, String value);
}
```
58
59
**Usage Examples:**
60
61
```java
62
import org.apache.spark.launcher.SparkLauncher;
63
import org.apache.spark.launcher.SparkAppHandle;
64
import java.io.File;
65
import java.util.HashMap;
66
import java.util.Map;
67
68
// Basic configuration with monitoring
69
SparkAppHandle handle = new SparkLauncher()
70
.setAppResource("/opt/myapp/target/myapp-1.0.jar")
71
.setMainClass("com.company.MySparkApplication")
72
.setMaster("yarn")
73
.setDeployMode("cluster")
74
.setConf(SparkLauncher.DRIVER_MEMORY, "4g")
75
.setConf(SparkLauncher.EXECUTOR_MEMORY, "2g")
76
.setConf(SparkLauncher.EXECUTOR_CORES, "2")
77
.setAppName("Production Data Pipeline")
78
.addAppArgs("--input-path", "/data/input", "--output-path", "/data/output")
79
.startApplication();
80
81
// Custom environment and paths
82
Map<String, String> env = new HashMap<>();
83
env.put("HADOOP_CONF_DIR", "/etc/hadoop/conf");
84
env.put("YARN_CONF_DIR", "/etc/hadoop/conf");
85
86
SparkLauncher launcher = new SparkLauncher(env)
87
.setJavaHome("/usr/lib/jvm/java-8-openjdk")
88
.setSparkHome("/opt/spark-2.4.8")
89
.directory(new File("/tmp/spark-work"))
90
.setAppResource("/apps/analytics.jar")
91
.setMainClass("com.analytics.ETLPipeline")
92
.setMaster("local[4]");
93
94
// Output redirection options
95
launcher.redirectOutput(new File("/logs/spark-output.log"))
96
.redirectError(new File("/logs/spark-error.log"));
97
98
// Alternative: redirect to logger
99
launcher.redirectToLog("com.company.spark.launcher");
100
101
SparkAppHandle handle = launcher.startApplication();
102
103
// Raw process launch (manual management)
104
Process sparkProcess = new SparkLauncher()
105
.setAppResource("/apps/batch-job.jar")
106
.setMainClass("com.company.BatchProcessor")
107
.setMaster("yarn")
108
.setDeployMode("cluster")
109
.addJar("/libs/external-lib.jar")
110
.addFile("/config/app.properties")
111
.setVerbose(true)
112
.launch();
113
114
// Wait for completion
115
int exitCode = sparkProcess.waitFor();
116
if (exitCode == 0) {
117
System.out.println("Spark application completed successfully");
118
} else {
119
System.err.println("Spark application failed with exit code: " + exitCode);
120
}
121
```
### InProcessLauncher

Launcher for running Spark applications within the same JVM process, recommended only for cluster mode deployments.

```java { .api }
/**
 * In-process launcher for Spark applications within the same JVM
 * Recommended only for cluster mode due to SparkContext limitations
 */
public class InProcessLauncher extends AbstractLauncher<InProcessLauncher> {
  /** Start application in-process with monitoring */
  public SparkAppHandle startApplication(SparkAppHandle.Listener... listeners) throws IOException;
}
```
**Usage Examples:**

```java
import org.apache.spark.launcher.InProcessLauncher;
import org.apache.spark.launcher.SparkAppHandle;

// In-process launch for cluster mode (recommended usage)
SparkAppHandle handle = new InProcessLauncher()
    .setAppResource("/opt/myapp/analytics.jar")
    .setMainClass("com.company.ClusterAnalytics")
    .setMaster("yarn")
    .setDeployMode("cluster") // Cluster mode recommended
    .setConf("spark.sql.adaptive.enabled", "true")
    .setConf("spark.sql.adaptive.coalescePartitions.enabled", "true")
    .setConf("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .setAppName("In-Process Analytics Job")
    .addAppArgs("--config", "/config/analytics.conf")
    .startApplication();

// Add listener for state monitoring
handle.addListener(new SparkAppHandle.Listener() {
  @Override
  public void stateChanged(SparkAppHandle handle) {
    System.out.println("Application state changed to: " + handle.getState());

    if (handle.getState() == SparkAppHandle.State.RUNNING) {
      System.out.println("Application is now running with ID: " + handle.getAppId());
    } else if (handle.getState().isFinal()) {
      System.out.println("Application completed with final state: " + handle.getState());
      if (handle.getState() == SparkAppHandle.State.FAILED) {
        System.err.println("Application failed!");
      }
    }
  }

  @Override
  public void infoChanged(SparkAppHandle handle) {
    System.out.println("Application info updated: " + handle.getAppId());
  }
});

// Client mode warning (not recommended but possible)
SparkAppHandle clientHandle = new InProcessLauncher()
    .setAppResource("/apps/client-app.jar")
    .setMainClass("com.company.ClientApp")
    .setMaster("local[2]") // Local mode for client
    .setDeployMode("client")
    .setAppName("Client Mode App")
    .startApplication();
// Warning will be logged: "It's not recommended to run client-mode applications using InProcessLauncher"
```
### AbstractLauncher

Base class providing common configuration functionality shared by both launcher implementations.

```java { .api }
/**
 * Base class for launcher implementations with fluent configuration API
 */
public abstract class AbstractLauncher<T extends AbstractLauncher<T>> {
  /** Set custom properties file with Spark configuration */
  public T setPropertiesFile(String path);

  /** Set single configuration value (key must start with "spark.") */
  public T setConf(String key, String value);

  /** Set application name */
  public T setAppName(String appName);

  /** Set Spark master (local, yarn, mesos, k8s, spark://) */
  public T setMaster(String master);

  /** Set deploy mode (client or cluster) */
  public T setDeployMode(String mode);

  /** Set main application resource (jar for Java/Scala, python script for PySpark) */
  public T setAppResource(String resource);

  /** Set main class name for Java/Scala applications */
  public T setMainClass(String mainClass);

  /** Add no-value argument to Spark invocation */
  public T addSparkArg(String arg);

  /** Add argument with value to Spark invocation */
  public T addSparkArg(String name, String value);

  /** Add command line arguments for the application */
  public T addAppArgs(String... args);

  /** Add jar file to be submitted with application */
  public T addJar(String jar);

  /** Add file to be submitted with application */
  public T addFile(String file);

  /** Add Python file/zip/egg to be submitted with application */
  public T addPyFile(String file);

  /** Enable verbose reporting for SparkSubmit */
  public T setVerbose(boolean verbose);

  /** Start Spark application with monitoring (implemented by subclasses) */
  public abstract SparkAppHandle startApplication(SparkAppHandle.Listener... listeners) throws IOException;
}
```
**Usage Examples:**

```java
// All launcher types support these common configuration methods

// Basic application configuration
launcher.setAppName("ETL Pipeline")
    .setMaster("yarn")
    .setDeployMode("cluster")
    .setAppResource("/apps/etl-pipeline.jar")
    .setMainClass("com.company.etl.ETLMain");

// Spark configuration
launcher.setConf("spark.sql.shuffle.partitions", "400")
    .setConf("spark.sql.adaptive.enabled", "true")
    .setConf("spark.sql.adaptive.skewJoin.enabled", "true")
    .setConf("spark.dynamicAllocation.enabled", "true");

// Resources and dependencies
launcher.addJar("/libs/mysql-connector.jar")
    .addJar("/libs/custom-utils.jar")
    .addFile("/config/database.properties")
    .addFile("/config/log4j.properties");

// Application arguments
launcher.addAppArgs("--input-format", "parquet")
    .addAppArgs("--output-format", "delta")
    .addAppArgs("--parallelism", "100");

// Advanced Spark arguments
launcher.addSparkArg("--archives", "env.zip#myenv")
    .addSparkArg("--py-files", "utils.py,helpers.py");

// Properties file configuration
launcher.setPropertiesFile("/config/spark-defaults.conf");

// Enable verbose output for debugging
launcher.setVerbose(true);
```
## Launch Mode Comparison

| Feature | SparkLauncher | InProcessLauncher |
|---------|---------------|-------------------|
| **Execution** | Child process | Same JVM |
| **Monitoring** | Full handle control | Full handle control |
| **Resource Isolation** | Complete | Shared JVM resources |
| **Output Control** | Extensive redirection | Inherited from parent |
| **Recommended Mode** | Any (client/cluster) | Cluster only |
| **Setup Requirements** | SPARK_HOME needed | Spark jars in classpath |
| **Process Management** | Manual or automatic | Automatic |
| **Performance** | Process overhead | Faster startup |
| **Debugging** | Separate process logs | Shared logging |
## Configuration Constants
301
302
SparkLauncher provides predefined constants for common configuration keys:
303
304
```java { .api }
305
// Master and deployment
306
public static final String SPARK_MASTER = "spark.master";
307
public static final String DEPLOY_MODE = "spark.submit.deployMode";
308
309
// Driver configuration
310
public static final String DRIVER_MEMORY = "spark.driver.memory";
311
public static final String DRIVER_EXTRA_CLASSPATH = "spark.driver.extraClassPath";
312
public static final String DRIVER_EXTRA_JAVA_OPTIONS = "spark.driver.extraJavaOptions";
313
public static final String DRIVER_EXTRA_LIBRARY_PATH = "spark.driver.extraLibraryPath";
314
315
// Executor configuration
316
public static final String EXECUTOR_MEMORY = "spark.executor.memory";
317
public static final String EXECUTOR_EXTRA_CLASSPATH = "spark.executor.extraClassPath";
318
public static final String EXECUTOR_EXTRA_JAVA_OPTIONS = "spark.executor.extraJavaOptions";
319
public static final String EXECUTOR_EXTRA_LIBRARY_PATH = "spark.executor.extraLibraryPath";
320
public static final String EXECUTOR_CORES = "spark.executor.cores";
321
322
// Special values and settings
323
public static final String NO_RESOURCE = "spark-internal";
324
public static final String CHILD_PROCESS_LOGGER_NAME = "spark.launcher.childProcLoggerName";
325
public static final String CHILD_CONNECTION_TIMEOUT = "spark.launcher.childConectionTimeout";
326
```
## Error Handling

### Launch Failures

```java
try {
  SparkAppHandle handle = new SparkLauncher()
      .setAppResource("/apps/myapp.jar")
      .setMainClass("com.company.App")
      .setMaster("yarn")
      .startApplication();
} catch (IOException e) {
  System.err.println("Failed to launch Spark application: " + e.getMessage());
  // Handle launch failure (missing files, invalid configuration, etc.)
}
```

### Configuration Validation

```java
try {
  launcher.setConf("invalid.key", "value"); // Must start with "spark."
} catch (IllegalArgumentException e) {
  System.err.println("Invalid configuration key: " + e.getMessage());
}

try {
  launcher.setMainClass(null); // Null validation
} catch (IllegalArgumentException e) {
  System.err.println("Null parameter not allowed: " + e.getMessage());
}
```

### Process Management

```java
import java.util.concurrent.TimeUnit;

SparkLauncher launcher = new SparkLauncher()
    .setAppResource("/apps/unreliable-app.jar")
    .setMainClass("com.company.UnreliableApp")
    .setMaster("local[*]");

try {
  Process process = launcher.launch();

  // Set timeout for process completion
  boolean finished = process.waitFor(300, TimeUnit.SECONDS);
  if (!finished) {
    System.err.println("Process timed out, killing...");
    process.destroyForcibly();
  }

  int exitCode = process.exitValue();
  if (exitCode != 0) {
    System.err.println("Process failed with exit code: " + exitCode);
  }
} catch (IOException e) {
  // launch() is declared to throw IOException, so it must be handled here
  System.err.println("Failed to launch process: " + e.getMessage());
} catch (InterruptedException e) {
  System.err.println("Process interrupted: " + e.getMessage());
  Thread.currentThread().interrupt();
}
```