Primary interface for monitoring and controlling a launched Spark application, providing state-based lifecycle tracking, control operations, and event notifications.
/**
* Handle to a running Spark application providing runtime information and control actions
*/
public interface SparkAppHandle {
/** Add listener for state and info change notifications */
void addListener(Listener l);
/** Get current application state */
State getState();
/** Get application ID (may return null if not yet known) */
String getAppId();
/** Request application to stop gracefully (best-effort) */
void stop();
/** Force kill the underlying application process */
void kill();
/** Disconnect from application without stopping it */
void disconnect();
}

Usage Examples:
import org.apache.spark.launcher.SparkLauncher;
import org.apache.spark.launcher.SparkAppHandle;
// Launch application and get handle
SparkAppHandle handle = new SparkLauncher()
.setAppResource("/apps/long-running-job.jar")
.setMainClass("com.company.LongRunningJob")
.setMaster("yarn")
.setDeployMode("cluster")
.setAppName("Long Running Analytics")
.startApplication();
// Monitor application state
System.out.println("Initial state: " + handle.getState());
System.out.println("Application ID: " + handle.getAppId());
// Wait for application to start running
while (handle.getState() != SparkAppHandle.State.RUNNING &&
       !handle.getState().isFinal()) {
Thread.sleep(1000);
System.out.println("Current state: " + handle.getState());
}
if (handle.getState() == SparkAppHandle.State.RUNNING) {
System.out.println("Application is running with ID: " + handle.getAppId());
// Application control examples
// Graceful shutdown after some condition
if (shouldStopApplication()) {
System.out.println("Requesting application stop...");
handle.stop();
// Wait for graceful shutdown with timeout
long deadline = System.currentTimeMillis() + 30000; // 30-second grace period
while (!handle.getState().isFinal() && System.currentTimeMillis() < deadline) {
Thread.sleep(1000);
}
// Force kill if graceful shutdown failed
if (!handle.getState().isFinal()) {
System.err.println("Graceful shutdown timed out, force killing...");
handle.kill();
}
}
} else if (handle.getState() == SparkAppHandle.State.FAILED) {
System.err.println("Application failed to start");
}
// Disconnect from application (application continues running)
// handle.disconnect();
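The polling loop above works, but a listener can replace it entirely. A minimal sketch, assuming the same jar, main class, and cluster settings as the example above and an arbitrary two-hour wait; the latch is released once the application reaches any terminal state.

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.spark.launcher.SparkAppHandle;
import org.apache.spark.launcher.SparkLauncher;

// Latch released by the listener when the application reaches a final state
CountDownLatch finished = new CountDownLatch(1);
SparkAppHandle handle = new SparkLauncher()
    .setAppResource("/apps/long-running-job.jar")
    .setMainClass("com.company.LongRunningJob")
    .setMaster("yarn")
    .setDeployMode("cluster")
    .startApplication(new SparkAppHandle.Listener() {
        @Override
        public void stateChanged(SparkAppHandle h) {
            if (h.getState().isFinal()) {
                finished.countDown();
            }
        }
        @Override
        public void infoChanged(SparkAppHandle h) {
            // Only state transitions matter for this wait
        }
    });
// Block without polling; await returns false if the timeout elapses first
if (!finished.await(2, TimeUnit.HOURS)) {
    System.err.println("Application still not finished, current state: " + handle.getState());
}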
Comprehensive state enumeration with final state detection for application lifecycle tracking.

/**
 * Application state enumeration with final state indicators
 */
public enum State {
/** Application has not reported back yet */
UNKNOWN(false),
/** Application has connected to the handle */
CONNECTED(false),
/** Application has been submitted to cluster */
SUBMITTED(false),
/** Application is running */
RUNNING(false),
/** Application finished with successful status (final) */
FINISHED(true),
/** Application finished with failed status (final) */
FAILED(true),
/** Application was killed (final) */
KILLED(true),
/** Spark Submit JVM exited with unknown status (final) */
LOST(true);
/** Returns true if this is a final state (application not running anymore) */
public boolean isFinal();
}

Usage Examples:
import org.apache.spark.launcher.SparkAppHandle.State;
// State monitoring and decision making
SparkAppHandle handle = launcher.startApplication();
// Check for specific states
if (handle.getState() == State.UNKNOWN) {
System.out.println("Application hasn't reported back yet, waiting...");
}
if (handle.getState() == State.RUNNING) {
System.out.println("Application is actively running");
performRuntimeOperations();
}
// Check for final states
if (handle.getState().isFinal()) {
System.out.println("Application has completed");
switch (handle.getState()) {
case FINISHED:
System.out.println("Application completed successfully");
processSuccessfulCompletion();
break;
case FAILED:
System.err.println("Application failed");
handleFailure();
break;
case KILLED:
System.out.println("Application was killed");
handleKilledApplication();
break;
case LOST:
System.err.println("Lost connection to application");
handleLostConnection();
break;
}
}
// State transition logic
State previousState = State.UNKNOWN;
while (!handle.getState().isFinal()) {
State currentState = handle.getState();
if (currentState != previousState) {
System.out.println("State transition: " + previousState + " -> " + currentState);
// Handle specific transitions
if (previousState == State.SUBMITTED && currentState == State.RUNNING) {
System.out.println("Application started successfully");
onApplicationStarted();
}
previousState = currentState;
}
Thread.sleep(2000);
}
// Final state handling
System.out.println("Final application state: " + handle.getState());
if (handle.getState() == State.FINISHED) {
generateSuccessReport();
} else {
generateErrorReport();
}

Callback interface for receiving real-time notifications about application state changes and information updates.
/**
* Listener interface for application handle events
*/
public interface Listener {
/** Called when application state changes */
void stateChanged(SparkAppHandle handle);
/** Called when application information changes (not state) */
void infoChanged(SparkAppHandle handle);
}

Usage Examples:
import org.apache.spark.launcher.SparkAppHandle;
// Custom listener implementation
public class ApplicationMonitor implements SparkAppHandle.Listener {
private long startTime;
private String applicationName;
public ApplicationMonitor(String applicationName) {
this.applicationName = applicationName;
this.startTime = System.currentTimeMillis();
}
@Override
public void stateChanged(SparkAppHandle handle) {
long elapsed = System.currentTimeMillis() - startTime;
System.out.printf("[%s] %s - State changed to: %s (elapsed: %d ms)%n",
new java.util.Date(), applicationName, handle.getState(), elapsed);
switch (handle.getState()) {
case CONNECTED:
System.out.println("Application connected to launcher");
break;
case SUBMITTED:
System.out.println("Application submitted to cluster");
break;
case RUNNING:
System.out.println("Application is now running with ID: " + handle.getAppId());
sendNotification("Application started successfully");
break;
case FINISHED:
System.out.println("Application completed successfully");
sendNotification("Application finished");
break;
case FAILED:
System.err.println("Application failed!");
sendAlert("Application failure detected");
break;
case KILLED:
System.out.println("Application was terminated");
sendNotification("Application killed");
break;
case LOST:
System.err.println("Lost connection to application");
sendAlert("Connection lost to application");
break;
}
}
@Override
public void infoChanged(SparkAppHandle handle) {
System.out.printf("[%s] %s - Info updated for application: %s%n",
new java.util.Date(), applicationName, handle.getAppId());
}
private void sendNotification(String message) {
// Implementation for notifications
System.out.println("NOTIFICATION: " + message);
}
private void sendAlert(String message) {
// Implementation for alerts
System.err.println("ALERT: " + message);
}
}
// Using the custom listener
SparkAppHandle handle = new SparkLauncher()
.setAppResource("/apps/critical-job.jar")
.setMainClass("com.company.CriticalJob")
.setMaster("yarn")
.setDeployMode("cluster")
.setAppName("Critical Production Job")
.startApplication(new ApplicationMonitor("Critical Production Job"));
// Multiple listeners
handle.addListener(new SparkAppHandle.Listener() {
@Override
public void stateChanged(SparkAppHandle handle) {
if (handle.getState().isFinal()) {
logFinalState(handle);
cleanupResources();
}
}
@Override
public void infoChanged(SparkAppHandle handle) {
updateDashboard(handle);
}
});
// Anonymous listener for simple cases
handle.addListener(new SparkAppHandle.Listener() {
@Override
public void stateChanged(SparkAppHandle handle) {
if (handle.getState() == SparkAppHandle.State.FAILED) {
restartApplication();
}
}
@Override
public void infoChanged(SparkAppHandle handle) {
// No-op for info changes
}
});

Application manager pattern that launches Spark applications, tracks their handles through listeners, and runs periodic health checks on a scheduler.

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.spark.launcher.SparkAppHandle;
import org.apache.spark.launcher.SparkLauncher;

public class SparkApplicationManager {
private final Map<String, SparkAppHandle> runningApps = new ConcurrentHashMap<>();
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
public void launchAndMonitor(String appName, SparkLauncher launcher) {
try {
SparkAppHandle handle = launcher.setAppName(appName)
.startApplication(new ApplicationLifecycleListener(appName));
runningApps.put(appName, handle);
// Schedule periodic health checks
scheduler.scheduleWithFixedDelay(() -> {
checkApplicationHealth(appName, handle);
}, 30, 30, TimeUnit.SECONDS);
} catch (IOException e) {
System.err.println("Failed to launch application " + appName + ": " + e.getMessage());
}
}
private void checkApplicationHealth(String appName, SparkAppHandle handle) {
if (handle.getState().isFinal()) {
runningApps.remove(appName);
System.out.println("Removed completed application: " + appName);
} else if (handle.getState() == SparkAppHandle.State.UNKNOWN) {
// Handle stuck applications
System.err.println("Application " + appName + " appears stuck in UNKNOWN state");
}
}
public void stopAllApplications() {
runningApps.values().forEach(handle -> {
if (!handle.getState().isFinal()) {
handle.stop();
}
});
}
public void emergencyKillAll() {
runningApps.values().forEach(SparkAppHandle::kill);
}
private class ApplicationLifecycleListener implements SparkAppHandle.Listener {
private final String appName;
public ApplicationLifecycleListener(String appName) {
this.appName = appName;
}
@Override
public void stateChanged(SparkAppHandle handle) {
if (handle.getState().isFinal()) {
handleApplicationCompletion(appName, handle);
}
}
@Override
public void infoChanged(SparkAppHandle handle) {
updateApplicationMetrics(appName, handle);
}
}
private void handleApplicationCompletion(String appName, SparkAppHandle handle) {
if (handle.getState() == SparkAppHandle.State.FAILED) {
// Implement retry logic or failure notifications
scheduleRetry(appName);
}
}
private void updateApplicationMetrics(String appName, SparkAppHandle handle) {
// Update monitoring dashboard or metrics system
}
private void scheduleRetry(String appName) {
// Implement application retry logic
}
}
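A minimal sketch of driving the manager above; the jar path, main class, and job name are illustrative assumptions.

import org.apache.spark.launcher.SparkLauncher;

SparkApplicationManager manager = new SparkApplicationManager();

// Launch a job; the manager registers a lifecycle listener and periodic health checks
manager.launchAndMonitor("nightly-aggregation", new SparkLauncher()
    .setAppResource("/apps/nightly-aggregation.jar")
    .setMainClass("com.company.NightlyAggregation")
    .setMaster("yarn")
    .setDeployMode("cluster"));

// Request a graceful stop of everything still running when the JVM shuts down
Runtime.getRuntime().addShutdownHook(new Thread(manager::stopAllApplications));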
Batch coordination pattern that submits multiple Spark jobs, waits for all of them with a CountDownLatch, and aggregates their final states.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.spark.launcher.SparkAppHandle;
import org.apache.spark.launcher.SparkLauncher;

public class BatchProcessingCoordinator {
private final List<SparkAppHandle> batchJobs = new ArrayList<>();
private final CountDownLatch completionLatch;
public BatchProcessingCoordinator(int jobCount) {
this.completionLatch = new CountDownLatch(jobCount);
}
public void submitBatchJob(SparkLauncher launcher, String jobName) {
try {
SparkAppHandle handle = launcher.setAppName(jobName)
.startApplication(new BatchJobListener(jobName));
batchJobs.add(handle);
} catch (IOException e) {
System.err.println("Failed to submit batch job " + jobName + ": " + e.getMessage());
completionLatch.countDown(); // Count failed jobs as completed
}
}
public boolean waitForAllJobs(long timeout, TimeUnit unit) throws InterruptedException {
return completionLatch.await(timeout, unit);
}
public BatchResults getResults() {
long successful = batchJobs.stream()
.filter(handle -> handle.getState() == SparkAppHandle.State.FINISHED)
.count();
long failed = batchJobs.stream()
.filter(handle -> handle.getState() == SparkAppHandle.State.FAILED)
.count();
return new BatchResults(successful, failed, batchJobs.size());
}
private class BatchJobListener implements SparkAppHandle.Listener {
private final String jobName;
public BatchJobListener(String jobName) {
this.jobName = jobName;
}
@Override
public void stateChanged(SparkAppHandle handle) {
if (handle.getState().isFinal()) {
System.out.println("Batch job " + jobName + " completed with state: " + handle.getState());
completionLatch.countDown();
}
}
@Override
public void infoChanged(SparkAppHandle handle) {
// Log info changes for batch tracking
}
}
public static class BatchResults {
public final long successful;
public final long failed;
public final long total;
public BatchResults(long successful, long failed, long total) {
this.successful = successful;
this.failed = failed;
this.total = total;
}
public boolean allSuccessful() {
return successful == total;
}
public double successRate() {
return total > 0 ? (double) successful / total : 0.0;
}
}
}
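A short usage sketch for the coordinator above, assuming three hypothetical datasets processed by the same application jar; paths, class name, and timeout are illustrative.

import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import org.apache.spark.launcher.SparkLauncher;

BatchProcessingCoordinator coordinator = new BatchProcessingCoordinator(3);
for (String dataset : Arrays.asList("sales", "inventory", "returns")) {
    // One batch job per dataset; addAppArgs forwards the dataset name to the driver
    coordinator.submitBatchJob(new SparkLauncher()
        .setAppResource("/apps/batch-etl.jar")
        .setMainClass("com.company.BatchEtl")
        .setMaster("yarn")
        .setDeployMode("cluster")
        .addAppArgs(dataset), "etl-" + dataset);
}
// Wait for all jobs to reach a final state, then summarize outcomes
if (coordinator.waitForAllJobs(2, TimeUnit.HOURS)) {
    BatchProcessingCoordinator.BatchResults results = coordinator.getResults();
    System.out.printf("Batch finished: %d/%d successful (%.0f%%)%n",
        results.successful, results.total, results.successRate() * 100);
} else {
    System.err.println("Batch did not complete within the timeout");
}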
Error handling pattern with automatic retry: the launcher is re-run up to a fixed number of attempts, and stuck applications are killed before retrying.

import java.io.IOException;
import org.apache.spark.launcher.SparkAppHandle;
import org.apache.spark.launcher.SparkLauncher;

public class ApplicationErrorHandler {
public void handleApplicationWithRecovery(SparkLauncher launcher, String appName) {
int maxRetries = 3;
int retryCount = 0;
while (retryCount < maxRetries) {
try {
SparkAppHandle handle = launcher.setAppName(appName + "-attempt-" + (retryCount + 1))
.startApplication(new RetryListener(appName, retryCount));
// Wait for completion
while (!handle.getState().isFinal()) {
Thread.sleep(5000);
// Check for stuck states
if (isApplicationStuck(handle)) {
System.err.println("Application appears stuck, killing and retrying...");
handle.kill();
break;
}
}
if (handle.getState() == SparkAppHandle.State.FINISHED) {
System.out.println("Application completed successfully");
return; // Success, exit retry loop
} else {
System.err.println("Application failed with state: " + handle.getState());
}
} catch (IOException e) {
System.err.println("Failed to launch application: " + e.getMessage());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
retryCount++;
if (retryCount < maxRetries) {
System.out.println("Retrying application launch (" + retryCount + "/" + maxRetries + ")");
try {
Thread.sleep(10000); // Wait before retry
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
}
}
System.err.println("Application failed after " + maxRetries + " attempts");
}
private boolean isApplicationStuck(SparkAppHandle handle) {
// Implement logic to detect stuck applications
// e.g., application in SUBMITTED state for too long
return false;
}
private class RetryListener implements SparkAppHandle.Listener {
private final String appName;
private final int attempt;
public RetryListener(String appName, int attempt) {
this.appName = appName;
this.attempt = attempt;
}
@Override
public void stateChanged(SparkAppHandle handle) {
System.out.println(String.format("[%s-attempt-%d] State: %s",
appName, attempt + 1, handle.getState()));
}
@Override
public void infoChanged(SparkAppHandle handle) {
System.out.println(String.format("[%s-attempt-%d] Info updated: %s",
appName, attempt + 1, handle.getAppId()));
}
}
}

Best practices:
- Prefer listeners over frequent getState() calls when tracking application progress
- Call disconnect() when monitoring is no longer needed
- Use kill() only as a last resort for cleanup
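To illustrate the disconnect guidance above, a small sketch (jar path and main class are illustrative): wait for the cluster to assign an application ID, then detach the launcher while the job keeps running.

import org.apache.spark.launcher.SparkAppHandle;
import org.apache.spark.launcher.SparkLauncher;

SparkAppHandle handle = new SparkLauncher()
    .setAppResource("/apps/long-running-job.jar")
    .setMainClass("com.company.LongRunningJob")
    .setMaster("yarn")
    .setDeployMode("cluster")
    .startApplication();

// Wait until an application ID is reported (or the application terminates early)
while (handle.getAppId() == null && !handle.getState().isFinal()) {
    Thread.sleep(1000);
}
System.out.println("Tracked as " + handle.getAppId() + "; detaching launcher");

// Close the launcher connection; the application itself keeps running
handle.disconnect();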