Application Handles

Comprehensive interface for monitoring and controlling running Spark applications with state-based lifecycle management and event notifications.

Capabilities

SparkAppHandle Interface

Primary interface for interacting with launched Spark applications, providing state monitoring, control operations, and event handling.

/**
 * Handle to a running Spark application providing runtime information and control actions
 */
public interface SparkAppHandle {
    /** Add listener for state and info change notifications */
    void addListener(Listener l);
    
    /** Get current application state */
    State getState();
    
    /** Get application ID (may return null if not yet known) */
    String getAppId();
    
    /** Request application to stop gracefully (best-effort) */
    void stop();
    
    /** Force kill the underlying application process */
    void kill();
    
    /** Disconnect from application without stopping it */
    void disconnect();
}

Usage Examples:

import org.apache.spark.launcher.SparkLauncher;
import org.apache.spark.launcher.SparkAppHandle;

// Launch application and get handle
SparkAppHandle handle = new SparkLauncher()
    .setAppResource("/apps/long-running-job.jar")
    .setMainClass("com.company.LongRunningJob")
    .setMaster("yarn")
    .setDeployMode("cluster")
    .setAppName("Long Running Analytics")
    .startApplication();

// Monitor application state
System.out.println("Initial state: " + handle.getState());
System.out.println("Application ID: " + handle.getAppId());

// Wait for application to start running
while (handle.getState() == SparkAppHandle.State.UNKNOWN || 
       handle.getState() == SparkAppHandle.State.CONNECTED ||
       handle.getState() == SparkAppHandle.State.SUBMITTED) {
    Thread.sleep(1000);
    System.out.println("Current state: " + handle.getState());
}

if (handle.getState() == SparkAppHandle.State.RUNNING) {
    System.out.println("Application is running with ID: " + handle.getAppId());
    
    // Application control examples
    // Graceful shutdown after some condition
    if (shouldStopApplication()) {
        System.out.println("Requesting application stop...");
        handle.stop();
        
        // Wait for graceful shutdown with timeout
        long timeout = System.currentTimeMillis() + 30000; // 30 seconds
        while (!handle.getState().isFinal() && System.currentTimeMillis() < timeout) {
            Thread.sleep(1000);
        }
        
        // Force kill if graceful shutdown failed
        if (!handle.getState().isFinal()) {
            System.err.println("Graceful shutdown timed out, force killing...");
            handle.kill();
        }
    }
} else if (handle.getState() == SparkAppHandle.State.FAILED) {
    System.err.println("Application failed to start");
}

// Disconnect from application (application continues running)
// handle.disconnect();

Application State Management

State enumeration with final-state detection for tracking the application lifecycle.

/**
 * Application state enumeration with final state indicators
 */
public enum State {
    /** Application has not reported back yet */
    UNKNOWN(false),
    
    /** Application has connected to the handle */
    CONNECTED(false),
    
    /** Application has been submitted to cluster */
    SUBMITTED(false),
    
    /** Application is running */
    RUNNING(false),
    
    /** Application finished with successful status (final) */
    FINISHED(true),
    
    /** Application finished with failed status (final) */
    FAILED(true),
    
    /** Application was killed (final) */
    KILLED(true),
    
    /** Spark Submit JVM exited with unknown status (final) */
    LOST(true);
    
    private final boolean isFinal;
    
    State(boolean isFinal) {
        this.isFinal = isFinal;
    }
    
    /** Returns true if this is a final state (application not running anymore) */
    public boolean isFinal() {
        return isFinal;
    }
}

Usage Examples:

import org.apache.spark.launcher.SparkAppHandle.State;

// State monitoring and decision making
SparkAppHandle handle = launcher.startApplication();

// Check for specific states
if (handle.getState() == State.UNKNOWN) {
    System.out.println("Application hasn't reported back yet, waiting...");
}

if (handle.getState() == State.RUNNING) {
    System.out.println("Application is actively running");
    performRuntimeOperations();
}

// Check for final states
if (handle.getState().isFinal()) {
    System.out.println("Application has completed");
    
    switch (handle.getState()) {
        case FINISHED:
            System.out.println("Application completed successfully");
            processSuccessfulCompletion();
            break;
        case FAILED:
            System.err.println("Application failed");
            handleFailure();
            break;
        case KILLED:
            System.out.println("Application was killed");
            handleKilledApplication();
            break;
        case LOST:
            System.err.println("Lost connection to application");
            handleLostConnection();
            break;
    }
}

// State transition logic
State previousState = State.UNKNOWN;
while (!handle.getState().isFinal()) {
    State currentState = handle.getState();
    
    if (currentState != previousState) {
        System.out.println("State transition: " + previousState + " -> " + currentState);
        
        // Handle specific transitions
        if (previousState == State.SUBMITTED && currentState == State.RUNNING) {
            System.out.println("Application started successfully");
            onApplicationStarted();
        }
        
        previousState = currentState;
    }
    
    Thread.sleep(2000);
}

// Final state handling
System.out.println("Final application state: " + handle.getState());
if (handle.getState() == State.FINISHED) {
    generateSuccessReport();
} else {
    generateErrorReport();
}

Event Listener Interface

Callback interface for receiving real-time notifications about application state changes and information updates.

/**
 * Listener interface for application handle events
 */
public interface Listener {
    /** Called when application state changes */
    void stateChanged(SparkAppHandle handle);
    
    /** Called when application information changes (not state) */
    void infoChanged(SparkAppHandle handle);
}

Usage Examples:

import org.apache.spark.launcher.SparkAppHandle;

// Custom listener implementation
public class ApplicationMonitor implements SparkAppHandle.Listener {
    private long startTime;
    private String applicationName;
    
    public ApplicationMonitor(String applicationName) {
        this.applicationName = applicationName;
        this.startTime = System.currentTimeMillis();
    }
    
    @Override
    public void stateChanged(SparkAppHandle handle) {
        long elapsed = System.currentTimeMillis() - startTime;
        System.out.printf("[%s] %s - State changed to: %s (elapsed: %d ms)%n", 
            new java.util.Date(), applicationName, handle.getState(), elapsed);
        
        switch (handle.getState()) {
            case CONNECTED:
                System.out.println("Application connected to launcher");
                break;
            case SUBMITTED:
                System.out.println("Application submitted to cluster");
                break;
            case RUNNING:
                System.out.println("Application is now running with ID: " + handle.getAppId());
                sendNotification("Application started successfully");
                break;
            case FINISHED:
                System.out.println("Application completed successfully");
                sendNotification("Application finished");
                break;
            case FAILED:
                System.err.println("Application failed!");
                sendAlert("Application failure detected");
                break;
            case KILLED:
                System.out.println("Application was terminated");
                sendNotification("Application killed");
                break;
            case LOST:
                System.err.println("Lost connection to application");
                sendAlert("Connection lost to application");
                break;
        }
    }
    
    @Override
    public void infoChanged(SparkAppHandle handle) {
        System.out.printf("[%s] %s - Info updated for application: %s%n", 
            new java.util.Date(), applicationName, handle.getAppId());
    }
    
    private void sendNotification(String message) {
        // Implementation for notifications
        System.out.println("NOTIFICATION: " + message);
    }
    
    private void sendAlert(String message) {
        // Implementation for alerts
        System.err.println("ALERT: " + message);
    }
}

// Using the custom listener
SparkAppHandle handle = new SparkLauncher()
    .setAppResource("/apps/critical-job.jar")
    .setMainClass("com.company.CriticalJob")
    .setMaster("yarn")
    .setDeployMode("cluster")
    .setAppName("Critical Production Job")
    .startApplication(new ApplicationMonitor("Critical Production Job"));

// Multiple listeners
handle.addListener(new SparkAppHandle.Listener() {
    @Override
    public void stateChanged(SparkAppHandle handle) {
        if (handle.getState().isFinal()) {
            logFinalState(handle);
            cleanupResources();
        }
    }
    
    @Override
    public void infoChanged(SparkAppHandle handle) {
        updateDashboard(handle);
    }
});

// Anonymous listener for simple cases
handle.addListener(new SparkAppHandle.Listener() {
    @Override
    public void stateChanged(SparkAppHandle handle) {
        if (handle.getState() == SparkAppHandle.State.FAILED) {
            restartApplication();
        }
    }
    
    @Override
    public void infoChanged(SparkAppHandle handle) {
        // No-op for info changes
    }
});

Advanced Monitoring Patterns

Application Lifecycle Manager

import org.apache.spark.launcher.SparkAppHandle;
import org.apache.spark.launcher.SparkLauncher;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class SparkApplicationManager {
    private final Map<String, SparkAppHandle> runningApps = new ConcurrentHashMap<>();
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
    
    public void launchAndMonitor(String appName, SparkLauncher launcher) {
        try {
            SparkAppHandle handle = launcher.setAppName(appName)
                .startApplication(new ApplicationLifecycleListener(appName));
            
            runningApps.put(appName, handle);
            
            // Schedule periodic health checks
            scheduler.scheduleWithFixedDelay(() -> {
                checkApplicationHealth(appName, handle);
            }, 30, 30, TimeUnit.SECONDS);
            
        } catch (IOException e) {
            System.err.println("Failed to launch application " + appName + ": " + e.getMessage());
        }
    }
    
    private void checkApplicationHealth(String appName, SparkAppHandle handle) {
        if (handle.getState().isFinal()) {
            runningApps.remove(appName);
            System.out.println("Removed completed application: " + appName);
        } else if (handle.getState() == SparkAppHandle.State.UNKNOWN) {
            // Handle stuck applications
            System.err.println("Application " + appName + " appears stuck in UNKNOWN state");
        }
    }
    
    public void stopAllApplications() {
        runningApps.values().forEach(handle -> {
            if (!handle.getState().isFinal()) {
                handle.stop();
            }
        });
    }
    
    public void emergencyKillAll() {
        runningApps.values().forEach(SparkAppHandle::kill);
    }
    
    private class ApplicationLifecycleListener implements SparkAppHandle.Listener {
        private final String appName;
        
        public ApplicationLifecycleListener(String appName) {
            this.appName = appName;
        }
        
        @Override
        public void stateChanged(SparkAppHandle handle) {
            if (handle.getState().isFinal()) {
                handleApplicationCompletion(appName, handle);
            }
        }
        
        @Override
        public void infoChanged(SparkAppHandle handle) {
            updateApplicationMetrics(appName, handle);
        }
    }
    
    private void handleApplicationCompletion(String appName, SparkAppHandle handle) {
        if (handle.getState() == SparkAppHandle.State.FAILED) {
            // Implement retry logic or failure notifications
            scheduleRetry(appName);
        }
    }
    
    private void updateApplicationMetrics(String appName, SparkAppHandle handle) {
        // Update monitoring dashboard or metrics system
    }
    
    private void scheduleRetry(String appName) {
        // Implement application retry logic
    }
}
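
A minimal usage sketch for the manager above. The jar path, main class, and shutdown hook are illustrative assumptions, not part of the launcher API.

// Illustrative only: paths, class names, and the shutdown hook are assumptions
SparkApplicationManager manager = new SparkApplicationManager();

manager.launchAndMonitor("nightly-etl", new SparkLauncher()
    .setAppResource("/apps/nightly-etl.jar")      // hypothetical jar path
    .setMainClass("com.company.NightlyEtl")       // hypothetical main class
    .setMaster("yarn")
    .setDeployMode("cluster"));

// Ask the manager to stop everything it launched when this JVM exits
Runtime.getRuntime().addShutdownHook(new Thread(manager::stopAllApplications));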

Batch Processing Coordinator

import org.apache.spark.launcher.SparkAppHandle;
import org.apache.spark.launcher.SparkLauncher;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class BatchProcessingCoordinator {
    private final List<SparkAppHandle> batchJobs = new ArrayList<>();
    private final CountDownLatch completionLatch;
    
    public BatchProcessingCoordinator(int jobCount) {
        this.completionLatch = new CountDownLatch(jobCount);
    }
    
    public void submitBatchJob(SparkLauncher launcher, String jobName) {
        try {
            SparkAppHandle handle = launcher.setAppName(jobName)
                .startApplication(new BatchJobListener(jobName));
            
            batchJobs.add(handle);
            
        } catch (IOException e) {
            System.err.println("Failed to submit batch job " + jobName + ": " + e.getMessage());
            completionLatch.countDown(); // Count failed jobs as completed
        }
    }
    
    public boolean waitForAllJobs(long timeout, TimeUnit unit) throws InterruptedException {
        return completionLatch.await(timeout, unit);
    }
    
    public BatchResults getResults() {
        long successful = batchJobs.stream()
            .mapToLong(handle -> handle.getState() == SparkAppHandle.State.FINISHED ? 1 : 0)
            .sum();
        
        long failed = batchJobs.stream()
            .mapToLong(handle -> handle.getState() == SparkAppHandle.State.FAILED ? 1 : 0)
            .sum();
        
        return new BatchResults(successful, failed, batchJobs.size());
    }
    
    private class BatchJobListener implements SparkAppHandle.Listener {
        private final String jobName;
        
        public BatchJobListener(String jobName) {
            this.jobName = jobName;
        }
        
        @Override
        public void stateChanged(SparkAppHandle handle) {
            if (handle.getState().isFinal()) {
                System.out.println("Batch job " + jobName + " completed with state: " + handle.getState());
                completionLatch.countDown();
            }
        }
        
        @Override
        public void infoChanged(SparkAppHandle handle) {
            // Log info changes for batch tracking
        }
    }
    
    public static class BatchResults {
        public final long successful;
        public final long failed;
        public final long total;
        
        public BatchResults(long successful, long failed, long total) {
            this.successful = successful;
            this.failed = failed;
            this.total = total;
        }
        
        public boolean allSuccessful() {
            return successful == total;
        }
        
        public double successRate() {
            return total > 0 ? (double) successful / total : 0.0;
        }
    }
}
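
A usage sketch for the coordinator above. The jar paths, job names, and two-hour timeout are illustrative assumptions, and the snippet assumes a calling method that propagates InterruptedException.

// Illustrative only: jar paths and job names are assumptions
BatchProcessingCoordinator coordinator = new BatchProcessingCoordinator(3);

for (int i = 1; i <= 3; i++) {
    coordinator.submitBatchJob(new SparkLauncher()
        .setAppResource("/apps/batch-job-" + i + ".jar")   // hypothetical jar path
        .setMainClass("com.company.BatchJob")               // hypothetical main class
        .setMaster("yarn")
        .setDeployMode("cluster"),
        "batch-job-" + i);
}

// Block until every job reaches a final state, or give up after two hours
if (!coordinator.waitForAllJobs(2, TimeUnit.HOURS)) {
    System.err.println("Timed out waiting for batch jobs to finish");
}

BatchProcessingCoordinator.BatchResults results = coordinator.getResults();
System.out.printf("Batch complete: %d/%d successful (%.0f%% success rate)%n",
    results.successful, results.total, results.successRate() * 100);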

Error Handling and Recovery

State-Based Error Detection

import org.apache.spark.launcher.SparkAppHandle;
import org.apache.spark.launcher.SparkLauncher;

import java.io.IOException;

public class ApplicationErrorHandler {
    
    public void handleApplicationWithRecovery(SparkLauncher launcher, String appName) {
        int maxRetries = 3;
        int retryCount = 0;
        
        while (retryCount < maxRetries) {
            try {
                SparkAppHandle handle = launcher.setAppName(appName + "-attempt-" + (retryCount + 1))
                    .startApplication(new RetryListener(appName, retryCount));
                
                // Wait for completion
                while (!handle.getState().isFinal()) {
                    Thread.sleep(5000);
                    
                    // Check for stuck states
                    if (isApplicationStuck(handle)) {
                        System.err.println("Application appears stuck, killing and retrying...");
                        handle.kill();
                        break;
                    }
                }
                
                if (handle.getState() == SparkAppHandle.State.FINISHED) {
                    System.out.println("Application completed successfully");
                    return; // Success, exit retry loop
                } else {
                    System.err.println("Application failed with state: " + handle.getState());
                }
                
            } catch (IOException e) {
                System.err.println("Failed to launch application: " + e.getMessage());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            
            retryCount++;
            if (retryCount < maxRetries) {
                System.out.println("Retrying application launch (" + retryCount + "/" + maxRetries + ")");
                try {
                    Thread.sleep(10000); // Wait before retry
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
        
        System.err.println("Application failed after " + maxRetries + " attempts");
    }
    
    private boolean isApplicationStuck(SparkAppHandle handle) {
        // Implement logic to detect stuck applications
        // e.g., application in SUBMITTED state for too long
        return false;
    }
    
    private class RetryListener implements SparkAppHandle.Listener {
        private final String appName;
        private final int attempt;
        
        public RetryListener(String appName, int attempt) {
            this.appName = appName;
            this.attempt = attempt;
        }
        
        @Override
        public void stateChanged(SparkAppHandle handle) {
            System.out.println(String.format("[%s-attempt-%d] State: %s", 
                appName, attempt + 1, handle.getState()));
        }
        
        @Override
        public void infoChanged(SparkAppHandle handle) {
            System.out.println(String.format("[%s-attempt-%d] Info updated: %s", 
                appName, attempt + 1, handle.getAppId()));
        }
    }
}
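
A usage sketch for the error handler above; the launcher configuration (jar path, main class) is an illustrative assumption.

// Illustrative only: the jar path and main class are assumptions
SparkLauncher launcher = new SparkLauncher()
    .setAppResource("/apps/flaky-ingest.jar")     // hypothetical jar path
    .setMainClass("com.company.FlakyIngest")      // hypothetical main class
    .setMaster("yarn")
    .setDeployMode("cluster");

new ApplicationErrorHandler().handleApplicationWithRecovery(launcher, "flaky-ingest");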

Performance Considerations

Listener Thread Safety

  • Listeners are called from background threads processing application updates
  • Avoid blocking operations in listener callbacks
  • Use thread-safe data structures for shared state
  • Consider using executor services for heavy processing in listeners (see the sketch after this list)
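
As referenced above, a minimal sketch of keeping callbacks non-blocking by handing heavy work to a single-threaded executor; the metric-recording methods are hypothetical placeholders.

import org.apache.spark.launcher.SparkAppHandle;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class NonBlockingListener implements SparkAppHandle.Listener {
    // Heavy work runs here instead of on the launcher's callback thread
    private final ExecutorService worker = Executors.newSingleThreadExecutor();

    @Override
    public void stateChanged(SparkAppHandle handle) {
        SparkAppHandle.State state = handle.getState();   // read cheaply on the callback thread
        worker.submit(() -> recordStateMetric(state));     // process off-thread
    }

    @Override
    public void infoChanged(SparkAppHandle handle) {
        String appId = handle.getAppId();
        worker.submit(() -> recordAppId(appId));
    }

    private void recordStateMetric(SparkAppHandle.State state) {
        // Hypothetical placeholder for slow work (metrics, HTTP calls, database writes)
    }

    private void recordAppId(String appId) {
        // Hypothetical placeholder
    }
}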

State Polling vs Event-Driven

  • Use listeners for reactive programming patterns
  • Avoid busy-waiting on getState() calls; prefer an event-driven wait (see the sketch after this list)
  • Combine listeners with periodic health checks for robust monitoring
  • Handle listener exceptions to prevent callback chain failures
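
As referenced above, one way to wait for completion without polling is to pair a listener with a CountDownLatch. This is a minimal sketch: it assumes launcher is an already configured SparkLauncher, that java.util.concurrent.CountDownLatch and TimeUnit are imported, and that the surrounding method propagates IOException and InterruptedException.

// Assumes "launcher" is an already configured SparkLauncher
CountDownLatch done = new CountDownLatch(1);

SparkAppHandle handle = launcher.startApplication(new SparkAppHandle.Listener() {
    @Override
    public void stateChanged(SparkAppHandle h) {
        if (h.getState().isFinal()) {
            done.countDown();   // release the waiting thread once a final state is reached
        }
    }

    @Override
    public void infoChanged(SparkAppHandle h) {
        // No-op: only state transitions matter here
    }
});

// This thread simply blocks until completion; no getState() polling loop needed
if (!done.await(1, TimeUnit.HOURS)) {
    System.err.println("Application did not reach a final state within the timeout");
}
System.out.println("Final state: " + handle.getState());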

Resource Management

  • Always handle final states to clean up resources
  • Use disconnect() when monitoring is no longer needed
  • Implement timeouts for long-running operations
  • Consider using kill() as a last resort for cleanup (see the sketch below)
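
A sketch tying these points together: request a graceful stop, bound the wait, and fall back to kill(). The shutDown helper name and the 60-second budget are illustrative assumptions.

// Minimal shutdown sketch: graceful stop first, bounded wait, kill() as a last resort
void shutDown(SparkAppHandle handle) throws InterruptedException {
    if (handle.getState().isFinal()) {
        return;   // nothing to clean up
    }

    handle.stop();   // best-effort graceful stop

    long deadline = System.currentTimeMillis() + 60_000;   // illustrative 60-second budget
    while (!handle.getState().isFinal() && System.currentTimeMillis() < deadline) {
        Thread.sleep(500);
    }

    if (!handle.getState().isFinal()) {
        handle.kill();   // last resort: terminate the child process
    }

    // If the application should instead keep running unmanaged, call handle.disconnect()
}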