# Application Handles

Comprehensive interface for monitoring and controlling running Spark applications, with state-based lifecycle management and event notifications.

## Capabilities

### SparkAppHandle Interface

Primary interface for interacting with launched Spark applications, providing state monitoring, control operations, and event handling.

```java { .api }
11
/**
12
* Handle to a running Spark application providing runtime information and control actions
13
*/
14
public interface SparkAppHandle {
15
/** Add listener for state and info change notifications */
16
void addListener(Listener l);
17
18
/** Get current application state */
19
State getState();
20
21
/** Get application ID (may return null if not yet known) */
22
String getAppId();
23
24
/** Request application to stop gracefully (best-effort) */
25
void stop();
26
27
/** Force kill the underlying application process */
28
void kill();
29
30
/** Disconnect from application without stopping it */
31
void disconnect();
32
}
33
```

**Usage Examples:**

```java
38
import org.apache.spark.launcher.SparkLauncher;
39
import org.apache.spark.launcher.SparkAppHandle;
40
41
// Launch application and get handle
42
SparkAppHandle handle = new SparkLauncher()
43
.setAppResource("/apps/long-running-job.jar")
44
.setMainClass("com.company.LongRunningJob")
45
.setMaster("yarn")
46
.setDeployMode("cluster")
47
.setAppName("Long Running Analytics")
48
.startApplication();
49
50
// Monitor application state
51
System.out.println("Initial state: " + handle.getState());
52
System.out.println("Application ID: " + handle.getAppId());
53
54
// Wait for application to start running
55
while (handle.getState() == SparkAppHandle.State.UNKNOWN ||
56
handle.getState() == SparkAppHandle.State.SUBMITTED) {
57
Thread.sleep(1000);
58
System.out.println("Current state: " + handle.getState());
59
}
60
61
if (handle.getState() == SparkAppHandle.State.RUNNING) {
62
System.out.println("Application is running with ID: " + handle.getAppId());
63
64
// Application control examples
65
// Graceful shutdown after some condition
66
if (shouldStopApplication()) {
67
System.out.println("Requesting application stop...");
68
handle.stop();
69
70
// Wait for graceful shutdown with timeout
71
long timeout = System.currentTimeMillis() + 30000; // 30 seconds
72
while (!handle.getState().isFinal() && System.currentTimeMillis() < timeout) {
73
Thread.sleep(1000);
74
}
75
76
// Force kill if graceful shutdown failed
77
if (!handle.getState().isFinal()) {
78
System.err.println("Graceful shutdown timed out, force killing...");
79
handle.kill();
80
}
81
}
82
} else if (handle.getState() == SparkAppHandle.State.FAILED) {
83
System.err.println("Application failed to start");
84
}
85
86
// Disconnect from application (application continues running)
87
// handle.disconnect();
88
```

### Application State Management

Comprehensive state enumeration with final-state detection for tracking the application lifecycle.

```java { .api }
/**
 * Application state enumeration with final state indicators.
 *
 * <p>Each constant records, via its constructor argument, whether it is a final state.
 */
public enum State {
    /** Application has not reported back yet */
    UNKNOWN(false),

    /** Application has connected to the handle */
    CONNECTED(false),

    /** Application has been submitted to cluster */
    SUBMITTED(false),

    /** Application is running */
    RUNNING(false),

    /** Application finished with successful status (final) */
    FINISHED(true),

    /** Application finished with failed status (final) */
    FAILED(true),

    /** Application was killed (final) */
    KILLED(true),

    /** Spark Submit JVM exited with unknown status (final) */
    LOST(true);

    /** Whether this state is final; fixed per constant by the constructor argument. */
    private final boolean isFinal;

    State(boolean isFinal) {
        this.isFinal = isFinal;
    }

    /**
     * Returns whether this is a final state, i.e. the application is not running anymore.
     *
     * @return {@code true} for FINISHED, FAILED, KILLED and LOST; {@code false} otherwise
     */
    public boolean isFinal() {
        return isFinal;
    }
}
```

**Usage Examples:**

```java
131
import org.apache.spark.launcher.SparkAppHandle.State;
132
133
// State monitoring and decision making
134
SparkAppHandle handle = launcher.startApplication();
135
136
// Check for specific states
137
if (handle.getState() == State.UNKNOWN) {
138
System.out.println("Application hasn't reported back yet, waiting...");
139
}
140
141
if (handle.getState() == State.RUNNING) {
142
System.out.println("Application is actively running");
143
performRuntimeOperations();
144
}
145
146
// Check for final states
147
if (handle.getState().isFinal()) {
148
System.out.println("Application has completed");
149
150
switch (handle.getState()) {
151
case FINISHED:
152
System.out.println("Application completed successfully");
153
processSuccessfulCompletion();
154
break;
155
case FAILED:
156
System.err.println("Application failed");
157
handleFailure();
158
break;
159
case KILLED:
160
System.out.println("Application was killed");
161
handleKilledApplication();
162
break;
163
case LOST:
164
System.err.println("Lost connection to application");
165
handleLostConnection();
166
break;
167
}
168
}
169
170
// State transition logic
171
State previousState = State.UNKNOWN;
172
while (!handle.getState().isFinal()) {
173
State currentState = handle.getState();
174
175
if (currentState != previousState) {
176
System.out.println("State transition: " + previousState + " -> " + currentState);
177
178
// Handle specific transitions
179
if (previousState == State.SUBMITTED && currentState == State.RUNNING) {
180
System.out.println("Application started successfully");
181
onApplicationStarted();
182
}
183
184
previousState = currentState;
185
}
186
187
Thread.sleep(2000);
188
}
189
190
// Final state handling
191
System.out.println("Final application state: " + handle.getState());
192
if (handle.getState() == State.FINISHED) {
193
generateSuccessReport();
194
} else {
195
generateErrorReport();
196
}
197
```
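
### Event Listener Interface

Callback interface for receiving real-time notifications about application state changes and information updates. Each callback receives only the handle, so query it (for example with `getState()` or `getAppId()`) to read the current values.
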
```java { .api }
204
/**
205
* Listener interface for application handle events
206
*/
207
public interface Listener {
208
/** Called when application state changes */
209
void stateChanged(SparkAppHandle handle);
210
211
/** Called when application information changes (not state) */
212
void infoChanged(SparkAppHandle handle);
213
}
214
```

**Usage Examples:**

```java
219
import org.apache.spark.launcher.SparkAppHandle;
220
221
// Custom listener implementation
222
public class ApplicationMonitor implements SparkAppHandle.Listener {
223
private long startTime;
224
private String applicationName;
225
226
public ApplicationMonitor(String applicationName) {
227
this.applicationName = applicationName;
228
this.startTime = System.currentTimeMillis();
229
}
230
231
@Override
232
public void stateChanged(SparkAppHandle handle) {
233
long elapsed = System.currentTimeMillis() - startTime;
234
System.out.printf("[%s] %s - State changed to: %s (elapsed: %d ms)%n",
235
new java.util.Date(), applicationName, handle.getState(), elapsed);
236
237
switch (handle.getState()) {
238
case CONNECTED:
239
System.out.println("Application connected to launcher");
240
break;
241
case SUBMITTED:
242
System.out.println("Application submitted to cluster");
243
break;
244
case RUNNING:
245
System.out.println("Application is now running with ID: " + handle.getAppId());
246
sendNotification("Application started successfully");
247
break;
248
case FINISHED:
249
System.out.println("Application completed successfully");
250
sendNotification("Application finished");
251
break;
252
case FAILED:
253
System.err.println("Application failed!");
254
sendAlert("Application failure detected");
255
break;
256
case KILLED:
257
System.out.println("Application was terminated");
258
sendNotification("Application killed");
259
break;
260
case LOST:
261
System.err.println("Lost connection to application");
262
sendAlert("Connection lost to application");
263
break;
264
}
265
}
266
267
@Override
268
public void infoChanged(SparkAppHandle handle) {
269
System.out.printf("[%s] %s - Info updated for application: %s%n",
270
new java.util.Date(), applicationName, handle.getAppId());
271
}
272
273
private void sendNotification(String message) {
274
// Implementation for notifications
275
System.out.println("NOTIFICATION: " + message);
276
}
277
278
private void sendAlert(String message) {
279
// Implementation for alerts
280
System.err.println("ALERT: " + message);
281
}
282
}

// Using the custom listener
String jobName = "Critical Production Job";
SparkAppHandle handle = new SparkLauncher()
    .setAppResource("/apps/critical-job.jar")
    .setMainClass("com.company.CriticalJob")
    .setMaster("yarn")
    .setDeployMode("cluster")
    .setAppName(jobName)
    .startApplication(new ApplicationMonitor(jobName));

// Multiple listeners: register an additional listener on the same handle
handle.addListener(new SparkAppHandle.Listener() {
    @Override
    public void stateChanged(SparkAppHandle appHandle) {
        if (!appHandle.getState().isFinal()) {
            return; // only final states need cleanup
        }
        logFinalState(appHandle);
        cleanupResources();
    }

    @Override
    public void infoChanged(SparkAppHandle appHandle) {
        updateDashboard(appHandle);
    }
});

// Anonymous listener for simple cases
handle.addListener(new SparkAppHandle.Listener() {
    @Override
    public void stateChanged(SparkAppHandle appHandle) {
        boolean failed = appHandle.getState() == SparkAppHandle.State.FAILED;
        if (failed) {
            restartApplication();
        }
    }

    @Override
    public void infoChanged(SparkAppHandle appHandle) {
        // Info changes are intentionally ignored here
    }
});
```

## Advanced Monitoring Patterns

### Application Lifecycle Manager

```java
330
public class SparkApplicationManager {
331
private final Map<String, SparkAppHandle> runningApps = new ConcurrentHashMap<>();
332
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
333
334
public void launchAndMonitor(String appName, SparkLauncher launcher) {
335
try {
336
SparkAppHandle handle = launcher.setAppName(appName)
337
.startApplication(new ApplicationLifecycleListener(appName));
338
339
runningApps.put(appName, handle);
340
341
// Schedule periodic health checks
342
scheduler.scheduleWithFixedDelay(() -> {
343
checkApplicationHealth(appName, handle);
344
}, 30, 30, TimeUnit.SECONDS);
345
346
} catch (IOException e) {
347
System.err.println("Failed to launch application " + appName + ": " + e.getMessage());
348
}
349
}
350
351
private void checkApplicationHealth(String appName, SparkAppHandle handle) {
352
if (handle.getState().isFinal()) {
353
runningApps.remove(appName);
354
System.out.println("Removed completed application: " + appName);
355
} else if (handle.getState() == SparkAppHandle.State.UNKNOWN) {
356
// Handle stuck applications
357
System.err.println("Application " + appName + " appears stuck in UNKNOWN state");
358
}
359
}
360
361
public void stopAllApplications() {
362
runningApps.values().forEach(handle -> {
363
if (!handle.getState().isFinal()) {
364
handle.stop();
365
}
366
});
367
}
368
369
public void emergencyKillAll() {
370
runningApps.values().forEach(SparkAppHandle::kill);
371
}
372
373
private class ApplicationLifecycleListener implements SparkAppHandle.Listener {
374
private final String appName;
375
376
public ApplicationLifecycleListener(String appName) {
377
this.appName = appName;
378
}
379
380
@Override
381
public void stateChanged(SparkAppHandle handle) {
382
if (handle.getState().isFinal()) {
383
handleApplicationCompletion(appName, handle);
384
}
385
}
386
387
@Override
388
public void infoChanged(SparkAppHandle handle) {
389
updateApplicationMetrics(appName, handle);
390
}
391
}
392
393
private void handleApplicationCompletion(String appName, SparkAppHandle handle) {
394
if (handle.getState() == SparkAppHandle.State.FAILED) {
395
// Implement retry logic or failure notifications
396
scheduleRetry(appName);
397
}
398
}
399
400
private void updateApplicationMetrics(String appName, SparkAppHandle handle) {
401
// Update monitoring dashboard or metrics system
402
}
403
404
private void scheduleRetry(String appName) {
405
// Implement application retry logic
406
}
407
}
408
```

### Batch Processing Coordinator

```java
413
public class BatchProcessingCoordinator {
414
private final List<SparkAppHandle> batchJobs = new ArrayList<>();
415
private final CountDownLatch completionLatch;
416
417
public BatchProcessingCoordinator(int jobCount) {
418
this.completionLatch = new CountDownLatch(jobCount);
419
}
420
421
public void submitBatchJob(SparkLauncher launcher, String jobName) {
422
try {
423
SparkAppHandle handle = launcher.setAppName(jobName)
424
.startApplication(new BatchJobListener(jobName));
425
426
batchJobs.add(handle);
427
428
} catch (IOException e) {
429
System.err.println("Failed to submit batch job " + jobName + ": " + e.getMessage());
430
completionLatch.countDown(); // Count failed jobs as completed
431
}
432
}
433
434
public boolean waitForAllJobs(long timeout, TimeUnit unit) throws InterruptedException {
435
return completionLatch.await(timeout, unit);
436
}
437
438
public BatchResults getResults() {
439
long successful = batchJobs.stream()
440
.mapToLong(handle -> handle.getState() == SparkAppHandle.State.FINISHED ? 1 : 0)
441
.sum();
442
443
long failed = batchJobs.stream()
444
.mapToLong(handle -> handle.getState() == SparkAppHandle.State.FAILED ? 1 : 0)
445
.sum();
446
447
return new BatchResults(successful, failed, batchJobs.size());
448
}
449
450
private class BatchJobListener implements SparkAppHandle.Listener {
451
private final String jobName;
452
453
public BatchJobListener(String jobName) {
454
this.jobName = jobName;
455
}
456
457
@Override
458
public void stateChanged(SparkAppHandle handle) {
459
if (handle.getState().isFinal()) {
460
System.out.println("Batch job " + jobName + " completed with state: " + handle.getState());
461
completionLatch.countDown();
462
}
463
}
464
465
@Override
466
public void infoChanged(SparkAppHandle handle) {
467
// Log info changes for batch tracking
468
}
469
}
470
471
public static class BatchResults {
472
public final long successful;
473
public final long failed;
474
public final long total;
475
476
public BatchResults(long successful, long failed, long total) {
477
this.successful = successful;
478
this.failed = failed;
479
this.total = total;
480
}
481
482
public boolean allSuccessful() {
483
return successful == total;
484
}
485
486
public double successRate() {
487
return total > 0 ? (double) successful / total : 0.0;
488
}
489
}
490
}
491
```

## Error Handling and Recovery

### State-Based Error Detection

```java
498
public class ApplicationErrorHandler {
499
500
public void handleApplicationWithRecovery(SparkLauncher launcher, String appName) {
501
int maxRetries = 3;
502
int retryCount = 0;
503
504
while (retryCount < maxRetries) {
505
try {
506
SparkAppHandle handle = launcher.setAppName(appName + "-attempt-" + (retryCount + 1))
507
.startApplication(new RetryListener(appName, retryCount));
508
509
// Wait for completion
510
while (!handle.getState().isFinal()) {
511
Thread.sleep(5000);
512
513
// Check for stuck states
514
if (isApplicationStuck(handle)) {
515
System.err.println("Application appears stuck, killing and retrying...");
516
handle.kill();
517
break;
518
}
519
}
520
521
if (handle.getState() == SparkAppHandle.State.FINISHED) {
522
System.out.println("Application completed successfully");
523
return; // Success, exit retry loop
524
} else {
525
System.err.println("Application failed with state: " + handle.getState());
526
}
527
528
} catch (IOException e) {
529
System.err.println("Failed to launch application: " + e.getMessage());
530
} catch (InterruptedException e) {
531
Thread.currentThread().interrupt();
532
return;
533
}
534
535
retryCount++;
536
if (retryCount < maxRetries) {
537
System.out.println("Retrying application launch (" + retryCount + "/" + maxRetries + ")");
538
try {
539
Thread.sleep(10000); // Wait before retry
540
} catch (InterruptedException e) {
541
Thread.currentThread().interrupt();
542
return;
543
}
544
}
545
}
546
547
System.err.println("Application failed after " + maxRetries + " attempts");
548
}
549
550
private boolean isApplicationStuck(SparkAppHandle handle) {
551
// Implement logic to detect stuck applications
552
// e.g., application in SUBMITTED state for too long
553
return false;
554
}
555
556
private class RetryListener implements SparkAppHandle.Listener {
557
private final String appName;
558
private final int attempt;
559
560
public RetryListener(String appName, int attempt) {
561
this.appName = appName;
562
this.attempt = attempt;
563
}
564
565
@Override
566
public void stateChanged(SparkAppHandle handle) {
567
System.out.println(String.format("[%s-attempt-%d] State: %s",
568
appName, attempt + 1, handle.getState()));
569
}
570
571
@Override
572
public void infoChanged(SparkAppHandle handle) {
573
System.out.println(String.format("[%s-attempt-%d] Info updated: %s",
574
appName, attempt + 1, handle.getAppId()));
575
}
576
}
577
}
578
```

## Performance Considerations

### Listener Thread Safety
- Listeners are called from background threads that process application updates
- Avoid blocking operations in listener callbacks
- Use thread-safe data structures for any state shared between listeners and other threads
- Hand heavy processing in listeners off to an executor service

### State Polling vs Event-Driven
- Prefer listeners for reactive, event-driven monitoring
- Avoid busy-waiting on `getState()`; if you must poll, sleep between checks and stop once the state is final
- Polling can miss short-lived intermediate states (for example `SUBMITTED`), so base decisions on the current state rather than on one exact transition
- Combine listeners with periodic health checks for robust monitoring
- Catch exceptions inside listener callbacks so that one failing listener does not disrupt notification handling

### Resource Management
- Always handle final states to clean up resources
- Use `disconnect()` when monitoring is no longer needed
- Implement timeouts for long-running operations
- Use `kill()` only as a last resort, when a graceful `stop()` has not ended the application