CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-co-cask-cdap--cdap-spark-core

Core Spark 1.x integration component for the Cask Data Application Platform providing runtime services and execution context for CDAP applications

Pending
Overview
Eval results
Files

docs/runtime-providers.md

Runtime Providers and Program Execution

This module provides the core runtime integration that lets Spark 1.x programs run within the CDAP platform, covering lifecycle management, resource allocation, and integration with CDAP's program execution framework.

Capabilities

Spark 1.x Program Runtime Provider

The main runtime provider integrates Spark 1.x applications with CDAP's program execution lifecycle, handling deployment, execution, and cleanup.

/**
 * Runtime provider for Spark 1.x programs in CDAP.
 *
 * <p>Extends {@code SparkProgramRuntimeProvider} with Spark 1.x specific
 * compatibility. This class declares only a constructor, so runner creation
 * and the support check come from the base class. The
 * {@code SupportedProgramType} annotation names {@code ProgramType.SPARK} as
 * the program type this provider handles.
 *
 * <p>NOTE(review): this listing is an API summary; the constructor is shown as
 * a bodiless signature and is not compilable as written.
 */
@ProgramRuntimeProvider.SupportedProgramType(ProgramType.SPARK)
public class Spark1ProgramRuntimeProvider extends SparkProgramRuntimeProvider {
    /**
     * Creates a Spark 1.x runtime provider with SPARK1_2_10 compatibility.
     * Presumably this is the value later reported by {@code getSparkCompat()}
     * on the base class; confirm against the implementation.
     */
    public Spark1ProgramRuntimeProvider();
}

/**
 * Abstract base class for Spark program runtime providers.
 *
 * <p>Implements {@code ProgramRuntimeProvider}: it creates program runners,
 * reports whether a program type is supported, and exposes the Spark
 * compatibility version to subclasses. Handles class loading, runner creation,
 * and program lifecycle management.
 *
 * <p>NOTE(review): methods are listed as bodiless signatures (API summary).
 * The class-loading behavior is stated by this summary but is not visible in
 * the signatures themselves.
 */
public abstract class SparkProgramRuntimeProvider implements ProgramRuntimeProvider {
    /**
     * Creates a program runner for the specified program type and execution mode.
     *
     * @param type The type of program (must be SPARK)
     * @param mode Execution mode (LOCAL or DISTRIBUTED)
     * @param injector Guice injector for dependency injection
     * @return ProgramRunner instance for executing the program
     * @throws IllegalArgumentException if program type is not SPARK
     */
    public ProgramRunner createProgramRunner(ProgramType type, Mode mode, Injector injector);
    
    /**
     * Checks if the specified program type is supported.
     * This is the only support check declared on the provider.
     *
     * @param programType The program type to check
     * @param cConf CDAP configuration
     * @return true if program type is SPARK
     */
    public boolean isSupported(ProgramType programType, CConfiguration cConf);
    
    /**
     * Gets the Spark compatibility version for this provider.
     * Declared {@code protected}, so it is visible to subclasses and
     * same-package code only, not to general callers.
     *
     * @return SparkCompat enum value (SPARK1_2_10, SPARK2_2_11, etc.)
     */
    protected SparkCompat getSparkCompat();
}

Program Controller Interface

Interface for controlling Spark program execution, providing lifecycle management and command handling capabilities.

/**
 * Controller interface for managing program execution lifecycle.
 *
 * <p>A controller is returned by {@code ProgramRunner.run(...)}. The
 * state-changing operations ({@code command}, {@code stop}, {@code suspend},
 * {@code resume}) return a {@code ListenableFuture<ProgramController>} rather
 * than a direct result. The two getters return their values directly.
 */
public interface ProgramController {
    /**
     * Sends a command to the running program.
     *
     * @param name Command name
     * @param value Command value
     * @return Future representing the command execution result
     */
    ListenableFuture<ProgramController> command(String name, Object value);
    
    /**
     * Stops the running program gracefully.
     *
     * @return Future representing the stop operation
     */
    ListenableFuture<ProgramController> stop();
    
    /**
     * Suspends the running program.
     *
     * @return Future representing the suspend operation
     */
    ListenableFuture<ProgramController> suspend();
    
    /**
     * Resumes a suspended program.
     *
     * @return Future representing the resume operation
     */
    ListenableFuture<ProgramController> resume();
    
    /**
     * Gets the current state of the program.
     *
     * @return Current program state
     */
    State getState();
    
    /**
     * Gets the program run ID.
     *
     * @return ProgramRunId for this execution
     */
    ProgramRunId getProgramRunId();
}

Usage Examples

Basic Runtime Provider Setup:

import co.cask.cdap.app.runtime.spark.Spark1ProgramRuntimeProvider;
import co.cask.cdap.proto.ProgramType;
import co.cask.cdap.app.runtime.ProgramRunner;

// Create runtime provider
Spark1ProgramRuntimeProvider provider = new Spark1ProgramRuntimeProvider();

// Verify it handles Spark programs
ProgramType supportedType = provider.getProgramType();
assert supportedType == ProgramType.SPARK;

// Check if program type is supported
boolean isSupported = provider.isSupported(ProgramType.SPARK, cConf);

// Create program runner
ProgramRunner runner = provider.createProgramRunner(ProgramType.SPARK, Mode.DISTRIBUTED, injector);

Program Lifecycle Management:

import co.cask.cdap.app.runtime.ProgramController;
import co.cask.cdap.app.runtime.ProgramController.State;

// Get the program controller by running the program.
// runner is the ProgramRunner returned by createProgramRunner(...), and
// programOptions is the ProgramOptions describing this run.
ProgramController controller = runner.run(programOptions);

// Monitor program state
State currentState = controller.getState();
System.out.println("Program state: " + currentState);

// Each lifecycle call below returns a ListenableFuture. get() blocks until the
// operation finishes and throws InterruptedException / ExecutionException, so
// the enclosing method must declare or handle both.

// Send command to program
controller.command("checkpoint", "/path/to/checkpoint").get();

// Suspend program
controller.suspend().get();

// Resume program
controller.resume().get();

// Stop program gracefully
controller.stop().get();

Types

/**
 * Enumeration of possible program states, as returned by
 * {@code ProgramController.getState()}.
 *
 * <p>The usage example imports this type as
 * {@code co.cask.cdap.app.runtime.ProgramController.State}, i.e. nested inside
 * {@code ProgramController}; it is listed at top level here for readability.
 */
public enum State {
    STARTING,     // Program is initializing
    ALIVE,        // Program is running normally
    SUSPENDING,   // Program is being suspended
    SUSPENDED,    // Program is suspended
    RESUMING,     // Program is resuming from suspension
    STOPPING,     // Program is shutting down
    COMPLETED,    // Program completed successfully
    KILLED,       // Program was forcibly terminated
    ERROR         // Program encountered an error
}

/**
 * Program type enumeration.
 *
 * <p>Imported in the usage example as {@code co.cask.cdap.proto.ProgramType}.
 * Of the values listed, {@code SPARK} is the one the Spark runtime provider
 * documents as supported.
 */
public enum ProgramType {
    MAPREDUCE,   // MapReduce programs
    WORKFLOW,    // Workflow programs
    SERVICE,     // Service programs
    SPARK,       // Spark programs
    WORKER       // Worker programs
}

/**
 * Unique identifier for a program run, as returned by
 * {@code ProgramController.getProgramRunId()}.
 *
 * <p>Exposes five components: namespace, application, program type, program
 * name and run ID.
 *
 * <p>NOTE(review): accessors are listed as bodiless signatures (API summary);
 * constructors and any other members are not shown here.
 */
public class ProgramRunId {
    /**
     * Gets the namespace containing the program.
     *
     * @return Namespace name
     */
    public String getNamespace();
    
    /**
     * Gets the application containing the program.
     *
     * @return Application name
     */
    public String getApplication();
    
    /**
     * Gets the program type.
     *
     * @return ProgramType of this program
     */
    public ProgramType getType();
    
    /**
     * Gets the program name.
     *
     * @return Program name
     */
    public String getProgram();
    
    /**
     * Gets the run ID.
     *
     * @return Unique run identifier
     */
    public String getRun();
}

/**
 * Configuration options for program execution; this is the argument passed to
 * {@code ProgramRunner.run(...)}.
 *
 * <p>NOTE(review): accessors are listed as bodiless signatures (API summary).
 * The {@code Location} type is not defined in this document; presumably it is
 * a filesystem location abstraction from a CDAP dependency. Confirm.
 */
public class ProgramOptions {
    /**
     * Gets runtime arguments for the program.
     *
     * @return Map of argument key-value pairs
     */
    public Map<String, String> getArguments();
    
    /**
     * Gets user-provided arguments, reported separately from
     * {@link #getArguments()}.
     *
     * @return Map of user argument key-value pairs
     */
    public Map<String, String> getUserArguments();
    
    /**
     * Checks if debug mode is enabled.
     *
     * @return true if debug mode is enabled
     */
    public boolean isDebug();
    
    /**
     * Gets the program JAR location.
     *
     * @return Location of the program JAR file
     */
    public Location getProgramJarLocation();
}

/**
 * Interface for program runners that execute CDAP programs.
 *
 * <p>Instances are obtained from a runtime provider's
 * {@code createProgramRunner(...)}. The usage example imports this type as
 * {@code co.cask.cdap.app.runtime.ProgramRunner}.
 */
public interface ProgramRunner {
    /**
     * Runs a program with the specified options.
     *
     * @param programOptions Configuration for the program execution
     * @return ProgramController for managing the running program
     */
    ProgramController run(ProgramOptions programOptions);
}

/**
 * Execution mode enumeration for programs; passed as the {@code mode} argument
 * of {@code createProgramRunner(...)}.
 *
 * <p>NOTE(review): the package or enclosing type of {@code Mode} is not shown
 * in this document, so the import needed by the usage example must be
 * confirmed against the CDAP sources.
 */
public enum Mode {
    LOCAL,        // Local execution mode
    DISTRIBUTED   // Distributed execution mode
}

/**
 * Guice injector interface for dependency injection; passed as the
 * {@code injector} argument of {@code createProgramRunner(...)}.
 *
 * <p>NOTE(review): only {@code getInstance(Class)} is listed here. This looks
 * like an abbreviated view of Guice's own {@code Injector} rather than a type
 * defined by this module; confirm.
 */
public interface Injector {
    /**
     * Gets an instance of the specified type.
     *
     * @param type Class type to instantiate
     * @param <T> Type parameter
     * @return Instance of the specified type
     */
    <T> T getInstance(Class<T> type);
}

Install with Tessl CLI

npx tessl i tessl/maven-co-cask-cdap--cdap-spark-core

docs

data-processing.md

distributed-execution.md

dynamic-compilation.md

execution-contexts.md

http-services.md

index.md

runtime-providers.md

transaction-management.md

tile.json