# Application Launcher

The Spark Launcher provides a programmatic way to launch Spark applications from Java or Scala code. This is useful for building tools and applications that need to submit Spark jobs programmatically.

## SparkLauncher

The main class for launching Spark applications programmatically.

```java { .api }
public class SparkLauncher {
    // Constructors
    public SparkLauncher()
    public SparkLauncher(Map<String, String> env)

    // Application configuration
    public SparkLauncher setAppName(String appName)
    public SparkLauncher setMaster(String master)
    public SparkLauncher setAppResource(String resource)
    public SparkLauncher setMainClass(String mainClass)
    public SparkLauncher addAppArgs(String... args)
    public SparkLauncher addJar(String jar)
    public SparkLauncher addPyFile(String file)
    public SparkLauncher addFile(String file)

    // Spark configuration
    public SparkLauncher setConf(String key, String value)
    public SparkLauncher setPropertiesFile(String path)
    public SparkLauncher setVerbose(boolean verbose)

    // Environment configuration
    public SparkLauncher setSparkHome(String sparkHome)
    public SparkLauncher redirectError()
    public SparkLauncher redirectError(ProcessBuilder.Redirect redirect)
    public SparkLauncher redirectOutput(ProcessBuilder.Redirect redirect)
    public SparkLauncher redirectToLog(String loggerName)

    // Launching methods
    public Process launch() throws IOException
    public SparkAppHandle startApplication() throws IOException
    public SparkAppHandle startApplication(SparkAppHandle.Listener... listeners) throws IOException

    // Constants for configuration keys
    public static final String SPARK_HOME = "spark.home"
    public static final String EXECUTOR_MEMORY = "spark.executor.memory"
    public static final String EXECUTOR_CORES = "spark.executor.cores"
    public static final String EXECUTOR_INSTANCES = "spark.executor.instances"
    public static final String DRIVER_MEMORY = "spark.driver.memory"
    public static final String DRIVER_CORES = "spark.driver.cores"
    public static final String DRIVER_CLASS_PATH = "spark.driver.extraClassPath"
    public static final String DRIVER_JAVA_OPTIONS = "spark.driver.extraJavaOptions"
    public static final String DRIVER_LIBRARY_PATH = "spark.driver.extraLibraryPath"
    public static final String EXECUTOR_EXTRA_CLASSPATH = "spark.executor.extraClassPath"
    public static final String EXECUTOR_EXTRA_JAVA_OPTIONS = "spark.executor.extraJavaOptions"
    public static final String EXECUTOR_EXTRA_LIBRARY_PATH = "spark.executor.extraLibraryPath"
    public static final String PYSPARK_DRIVER_PYTHON = "spark.pyspark.driver.python"
    public static final String PYSPARK_PYTHON = "spark.pyspark.python"

    // Child process management
    public static class Builder {
        public Builder setSparkHome(String sparkHome)
        public Builder setPropertiesFile(String path)
        public Builder setConf(String key, String value)
        public Builder setAppName(String name)
        public Builder setMaster(String master)
        public Builder setMainClass(String mainClass)
        public Builder setAppResource(String resource)
        public Builder addAppArgs(String... args)
        public Builder addJar(String jar)
        public Builder addPyFile(String file)
        public Builder addFile(String file)
        public Builder setVerbose(boolean verbose)
        public SparkLauncher build()
    }
}
```

### Usage Examples

```java
import org.apache.spark.launcher.SparkLauncher;
import org.apache.spark.launcher.SparkAppHandle;

// Basic launcher setup
SparkLauncher launcher = new SparkLauncher()
    .setAppName("MySparkApp")
    .setMaster("yarn")
    .setAppResource("/path/to/my-app.jar")
    .setMainClass("com.example.MyMainClass")
    .addAppArgs("arg1", "arg2", "arg3")
    .setConf(SparkLauncher.DRIVER_MEMORY, "2g")
    .setConf(SparkLauncher.EXECUTOR_MEMORY, "4g")
    .setConf(SparkLauncher.EXECUTOR_CORES, "4")
    .setConf(SparkLauncher.EXECUTOR_INSTANCES, "10")
    .setVerbose(true);

// Launch and get handle
SparkAppHandle handle = launcher.startApplication();

// Launch as process
Process process = launcher.launch();

// With environment variables
Map<String, String> env = new HashMap<>();
env.put("JAVA_HOME", "/usr/lib/jvm/java-8-openjdk");
SparkLauncher envLauncher = new SparkLauncher(env);
```

```scala
// Scala usage
import org.apache.spark.launcher.{SparkLauncher, SparkAppHandle}
import scala.collection.JavaConverters._

val launcher = new SparkLauncher()
  .setAppName("MySparkApp")
  .setMaster("local[*]")
  .setAppResource("/path/to/my-app.jar")
  .setMainClass("com.example.MyMainClass")
  .addAppArgs("arg1", "arg2")
  .setConf("spark.sql.adaptive.enabled", "true")
  .setConf("spark.sql.adaptive.coalescePartitions.enabled", "true")

val handle = launcher.startApplication()
```

## SparkAppHandle

Interface for monitoring and controlling launched Spark applications.

```java { .api }
public interface SparkAppHandle {
    // Application information
    String getAppId()
    State getState()

    // Control operations
    void kill()
    void disconnect()

    // Monitoring
    void addListener(Listener listener)

    // State enumeration
    enum State {
        UNKNOWN,    // The application state is not known.
        CONNECTED,  // The application has connected back to the launcher handle.
        SUBMITTED,  // The application has been submitted to the cluster manager.
        RUNNING,    // The application is running.
        FINISHED,   // The application finished with a successful status.
        FAILED,     // The application finished with a failed status.
        KILLED,     // The application was killed.
        LOST        // The Spark Launcher is not able to contact the application.
    }

    // Listener interface for state changes
    interface Listener {
        void stateChanged(SparkAppHandle handle)
        void infoChanged(SparkAppHandle handle)
    }
}
```

### Usage Examples

```java
import org.apache.spark.launcher.SparkAppHandle;

// Launch application and monitor
SparkAppHandle handle = launcher.startApplication();

// Add listener for state changes
handle.addListener(new SparkAppHandle.Listener() {
    @Override
    public void stateChanged(SparkAppHandle handle) {
        System.out.println("Application state changed to: " + handle.getState());
        if (handle.getState().isFinal()) {
            System.out.println("Application " + handle.getAppId() + " finished.");
        }
    }

    @Override
    public void infoChanged(SparkAppHandle handle) {
        System.out.println("Application info changed for: " + handle.getAppId());
    }
});

// Monitor application state
SparkAppHandle.State state = handle.getState();
while (!state.isFinal()) {
    Thread.sleep(1000);
    state = handle.getState();
    System.out.println("Current state: " + state);
}

// Get final application ID
String appId = handle.getAppId();
System.out.println("Application ID: " + appId);

// Kill application if needed
if (someCondition) {
    handle.kill();
}

// Disconnect when done monitoring
handle.disconnect();
```

```scala
// Scala usage with a Future
import scala.concurrent.{Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global

def launchAndWait(launcher: SparkLauncher): Future[SparkAppHandle.State] = {
  val promise = Promise[SparkAppHandle.State]()
  val handle = launcher.startApplication()

  handle.addListener(new SparkAppHandle.Listener {
    override def stateChanged(handle: SparkAppHandle): Unit = {
      val state = handle.getState
      println(s"State changed to: $state")
      if (state.isFinal) {
        // trySuccess: safe if the listener fires again after a final state
        promise.trySuccess(state)
      }
    }

    override def infoChanged(handle: SparkAppHandle): Unit = {
      println(s"Info changed for app: ${handle.getAppId}")
    }
  })

  promise.future
}

// Usage
val finalState = launchAndWait(launcher)
finalState.foreach { state =>
  println(s"Application finished with state: $state")
}
```

## InProcessLauncher

For launching applications in the same JVM process (useful for testing).

```java { .api }
public class InProcessLauncher extends SparkLauncher {
    public InProcessLauncher()
    public InProcessLauncher(Map<String, String> env)

    // Override launch methods to run in-process
    @Override
    public SparkAppHandle startApplication() throws IOException
    @Override
    public SparkAppHandle startApplication(SparkAppHandle.Listener... listeners) throws IOException
}
```

### Usage Examples

```java
// For testing or embedding Spark applications
import org.apache.spark.launcher.InProcessLauncher;

InProcessLauncher inProcessLauncher = new InProcessLauncher()
    .setAppName("TestApp")
    .setMaster("local[2]")
    .setAppResource("local:/path/to/app.jar")
    .setMainClass("com.example.TestApp");

SparkAppHandle handle = inProcessLauncher.startApplication();
```

## Launcher Configuration

Common configuration patterns and utilities.

### Environment Variables

```java
// Setting up environment for launcher
Map<String, String> env = new HashMap<>();
env.put("SPARK_HOME", "/opt/spark");
env.put("JAVA_HOME", "/usr/lib/jvm/java-8-openjdk");
env.put("HADOOP_HOME", "/opt/hadoop");
env.put("YARN_CONF_DIR", "/etc/hadoop/conf");

SparkLauncher launcher = new SparkLauncher(env);
```

### Resource Configuration

```java
// Memory and CPU configuration
launcher
    .setConf(SparkLauncher.DRIVER_MEMORY, "4g")
    .setConf(SparkLauncher.DRIVER_CORES, "2")
    .setConf(SparkLauncher.EXECUTOR_MEMORY, "8g")
    .setConf(SparkLauncher.EXECUTOR_CORES, "4")
    .setConf(SparkLauncher.EXECUTOR_INSTANCES, "20");

// Dynamic allocation
launcher
    .setConf("spark.dynamicAllocation.enabled", "true")
    .setConf("spark.dynamicAllocation.minExecutors", "5")
    .setConf("spark.dynamicAllocation.maxExecutors", "50")
    .setConf("spark.dynamicAllocation.initialExecutors", "10");
```

### Cluster Configuration

```java
// YARN configuration
launcher
    .setMaster("yarn")
    .setConf("spark.submit.deployMode", "cluster")
    .setConf("spark.yarn.queue", "production")
    .setConf("spark.yarn.jars", "hdfs://namenode:port/spark/jars/*");

// Kubernetes configuration
launcher
    .setMaster("k8s://https://kubernetes-api-server:443")
    .setConf("spark.kubernetes.container.image", "spark:latest")
    .setConf("spark.kubernetes.namespace", "spark-jobs")
    .setConf("spark.executor.instances", "10");

// Standalone cluster
launcher
    .setMaster("spark://master-host:7077")
    .setConf("spark.cores.max", "100")
    .setConf("spark.executor.memory", "4g");
```

### Dependency Management

```java
// Adding JARs and files
launcher
    .addJar("hdfs://namenode:port/path/to/dependency1.jar")
    .addJar("hdfs://namenode:port/path/to/dependency2.jar")
    .addFile("hdfs://namenode:port/path/to/config.properties")
    .addPyFile("hdfs://namenode:port/path/to/module.py");

// Python-specific configuration
launcher
    .setConf(SparkLauncher.PYSPARK_DRIVER_PYTHON, "/usr/bin/python3")
    .setConf(SparkLauncher.PYSPARK_PYTHON, "/usr/bin/python3")
    .addPyFile("hdfs://path/to/dependencies.zip");
```

### Error Handling and Monitoring

```java
public class SparkJobManager {
    public void launchWithRetry(SparkLauncher launcher, int maxRetries) {
        int attempts = 0;
        while (attempts < maxRetries) {
            try {
                SparkAppHandle handle = launcher.startApplication();

                handle.addListener(new SparkAppHandle.Listener() {
                    @Override
                    public void stateChanged(SparkAppHandle handle) {
                        SparkAppHandle.State state = handle.getState();
                        System.out.println("State: " + state + " for app: " + handle.getAppId());

                        switch (state) {
                            case FAILED:
                                System.err.println("Application failed: " + handle.getAppId());
                                break;
                            case KILLED:
                                System.out.println("Application killed: " + handle.getAppId());
                                break;
                            case FINISHED:
                                System.out.println("Application completed successfully: " + handle.getAppId());
                                break;
                        }
                    }

                    @Override
                    public void infoChanged(SparkAppHandle handle) {
                        // Handle info changes if needed
                    }
                });

                // Wait for completion
                SparkAppHandle.State finalState = waitForCompletion(handle);
                if (finalState == SparkAppHandle.State.FINISHED) {
                    return; // Success
                }

            } catch (IOException e) {
                System.err.println("Launch attempt " + (attempts + 1) + " failed: " + e.getMessage());
            }

            attempts++;
            if (attempts < maxRetries) {
                try {
                    Thread.sleep(5000); // Wait before retry
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }

        throw new RuntimeException("Failed to launch application after " + maxRetries + " attempts");
    }

    private SparkAppHandle.State waitForCompletion(SparkAppHandle handle) {
        SparkAppHandle.State state = handle.getState();
        while (!state.isFinal()) {
            try {
                Thread.sleep(1000);
                state = handle.getState();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return state;
            }
        }
        return state;
    }
}
```

## Builder Pattern Usage

Alternative fluent API for creating launchers.

```java
// Using builder pattern
SparkLauncher launcher = new SparkLauncher.Builder()
    .setSparkHome("/opt/spark")
    .setAppName("MyApp")
    .setMaster("yarn")
    .setMainClass("com.example.MyApp")
    .setAppResource("hdfs://namenode:port/path/to/app.jar")
    .setConf("spark.executor.memory", "4g")
    .setConf("spark.executor.cores", "2")
    .addAppArgs("--input", "/data/input", "--output", "/data/output")
    .setVerbose(true)
    .build();

SparkAppHandle handle = launcher.startApplication();
```

This comprehensive documentation covers all the major aspects of programmatically launching and monitoring Spark applications using the Launcher API.