Core Spark 1.x integration component for the Cask Data Application Platform providing runtime services and execution context for CDAP applications
—
Distributed execution framework built on Apache Twill that provides scalable, fault-tolerant Spark application deployment across YARN clusters with proper resource management, lifecycle control, and integration with CDAP's distributed application infrastructure.
Service for managing distributed Spark execution with full lifecycle management and resource allocation across cluster nodes.
/**
 * Service for managing distributed Spark execution.
 * Provides scalable deployment and management of Spark applications across
 * clusters. Method signatures only; bodies are provided by the library.
 */
public class SparkExecutionService {
/**
 * Submits a Spark program for distributed execution. Submission is
 * asynchronous; failures are surfaced through the returned future.
 * @param programRunId Unique identifier for the program run
 * @param programOptions Configuration options for program execution
 * @return Future containing the program controller for managing execution
 * @throws ExecutionException if submission fails (reported when the future is resolved)
 */
public ListenableFuture<ProgramController> submit(ProgramRunId programRunId, ProgramOptions programOptions);
/**
 * Stops the execution service and all running programs.
 * Gracefully shuts down all managed Spark applications.
 */
public void stop();
/**
 * Gets the current state of the execution service.
 * @return ServiceState indicating current service status
 */
public ServiceState getState();
/**
 * Gets information about running programs.
 * @return Set of ProgramRunId for currently running programs
 */
public Set<ProgramRunId> getRunningPrograms();
/**
 * Gets the program controller for a specific run.
 * @param programRunId Program run identifier
 * @return ProgramController for the specified run, or {@code null} if not found
 */
public ProgramController getProgramController(ProgramRunId programRunId);
}

Twill runnable implementation that enables Spark applications to run as distributed applications with proper resource management and fault tolerance.
/**
 * Twill runnable for distributed Spark execution.
 * Enables a Spark application to run as a distributed service with fault
 * tolerance. Per the TwillRunnable contract, initialize() is called before
 * run(), and destroy() after run() returns.
 */
public class SparkTwillRunnable implements TwillRunnable {
/**
 * Main execution method for the runnable; blocks for the lifetime of the
 * Spark application and manages its lifecycle.
 */
public void run();
/**
 * Stops the running Spark application gracefully.
 * Ensures proper cleanup of resources and state.
 */
public void stop();
/**
 * Handles commands sent to the running application.
 * @param command Command to execute
 * @throws Exception if command execution fails
 */
public void handleCommand(Command command) throws Exception;
/**
 * Initializes the runnable with its runtime context; invoked before run().
 * @param context Twill runtime context
 */
public void initialize(TwillContext context);
/**
 * Destroys the runnable and cleans up resources; invoked after run() returns.
 */
public void destroy();
/**
 * Gets the Twill context supplied at initialization.
 * @return TwillContext for accessing runtime information
 */
protected TwillContext getContext();
}

Program controller implementation for managing distributed Spark execution through the Twill framework.
/**
 * Program controller for distributed Spark execution via Twill.
 * Provides lifecycle management and a command interface for distributed
 * Spark programs by delegating to an underlying {@link TwillController}.
 */
public class SparkTwillProgramController implements ProgramController {
/**
 * Sends a command to the distributed Spark program.
 * @param command Command name to execute
 * @param args Command arguments (interpretation depends on the command)
 * @return Future representing the command execution result
 * @throws Exception if command execution fails
 */
public ListenableFuture<ProgramController> command(String command, Object... args) throws Exception;
/**
 * Stops the distributed Spark program gracefully.
 * @return Future that completes when the stop operation finishes
 * @throws Exception if stop operation fails
 */
public ListenableFuture<ProgramController> stop() throws Exception;
/**
 * Kills the distributed Spark program forcefully, without graceful shutdown.
 * @return Future representing the kill operation
 */
public ListenableFuture<ProgramController> kill();
/**
 * Gets the current state of the program.
 * @return Current program state
 */
public State getState();
/**
 * Gets the program run ID.
 * @return ProgramRunId identifying this program run
 */
public ProgramRunId getProgramRunId();
/**
 * Gets the Twill controller for low-level operations.
 * @return TwillController for direct Twill operations
 */
public TwillController getTwillController();
/**
 * Gets the resource report for the running program.
 * @return ResourceReport containing resource usage information
 */
public ResourceReport getResourceReport();
/**
 * Adds a listener for program state changes.
 * @param listener Listener to be notified of state changes
 */
public void addListener(Listener listener);
}

Context for distributed execution that provides access to cluster information and resource management.
/**
 * Context for distributed Spark execution.
 * Provides access to cluster information and distributed resources:
 * executor/driver allocation, YARN identifiers, and executor placement.
 */
public class DistributedExecutionContext {
/**
 * Gets the number of executor instances.
 * @return Number of Spark executor instances
 */
public int getExecutorInstances();
/**
 * Gets the per-executor resource allocation.
 * @return Resources allocated to each executor
 */
public Resources getExecutorResources();
/**
 * Gets the driver resource allocation.
 * @return Resources allocated to the driver
 */
public Resources getDriverResources();
/**
 * Gets the cluster configuration.
 * @return Configuration for the target cluster
 */
public Configuration getClusterConfiguration();
/**
 * Gets the YARN application ID (if running on YARN).
 * @return Application ID, or {@code null} if not running on YARN
 */
public ApplicationId getYarnApplicationId();
/**
 * Gets the set of hosts currently running executors.
 * @return Set of hostnames running executors
 */
public Set<String> getExecutorHosts();
/**
 * Scales the number of executors asynchronously.
 * @param targetExecutors Desired number of executors
 * @return Future indicating completion (and success) of the scaling operation
 */
public ListenableFuture<Boolean> scaleExecutors(int targetExecutors);
}

Basic Distributed Execution:
import co.cask.cdap.app.runtime.spark.distributed.SparkExecutionService;
import co.cask.cdap.app.runtime.ProgramController;
import co.cask.cdap.proto.id.ProgramRunId;
// Create the execution service (CDAP config, location factory, and
// discovery client are assumed to be available in the surrounding code)
SparkExecutionService executionService = new SparkExecutionService(
cConf, locationFactory, discoveryServiceClient
);
// Submit a Spark program for distributed execution
ProgramRunId runId = new ProgramRunId("namespace", "app", ProgramType.SPARK, "program", "run-1");
ListenableFuture<ProgramController> future = executionService.submit(runId, programOptions);
// Block until submission completes and a controller is available
ProgramController controller = future.get();
// Monitor program state
System.out.println("Program state: " + controller.getState());
// Send a command to the running program (here: scale to 10 executors)
controller.command("scale-executors", 10).get();
// Stop program gracefully
controller.stop().get();

Twill Runnable Implementation:
import co.cask.cdap.app.runtime.spark.distributed.SparkTwillRunnable;
import org.apache.twill.api.TwillContext;
import org.apache.twill.api.Command;
public class MySparkTwillRunnable extends SparkTwillRunnable {
@Override
public void initialize(TwillContext context) {
super.initialize(context);
// Get instance information
int instanceId = context.getInstanceId();
int instanceCount = context.getInstanceCount();
System.out.println(String.format(
"Initializing instance %d of %d", instanceId, instanceCount
));
}
@Override
public void run() {
try {
// Initialize Spark context
SparkContext sparkContext = createSparkContext();
// Run Spark application
runSparkApplication(sparkContext);
// Keep running until stopped
while (!Thread.currentThread().isInterrupted()) {
Thread.sleep(1000);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
cleanup();
}
}
@Override
public void handleCommand(Command command) throws Exception {
String commandName = command.getCommand();
switch (commandName) {
case "scale-executors":
int targetCount = Integer.parseInt(command.getOptions().get("count"));
scaleExecutors(targetCount);
break;
case "checkpoint":
checkpointApplication();
break;
default:
super.handleCommand(command);
}
}
}

Program Controller Usage:
import co.cask.cdap.app.runtime.spark.distributed.SparkTwillProgramController;
import co.cask.cdap.app.runtime.ProgramController.Listener;
// Create a program controller wrapping an existing TwillController
SparkTwillProgramController controller = new SparkTwillProgramController(
twillController, programRunId
);
// Register a listener to observe lifecycle transitions
controller.addListener(new Listener() {
@Override
public void init(State currentState, Throwable cause) {
System.out.println("Program initialized with state: " + currentState);
}
@Override
public void stateChanged(State newState, Throwable cause) {
System.out.println("Program state changed to: " + newState);
// cause is non-null only when the transition was triggered by an error
if (cause != null) {
System.err.println("State change caused by error: " + cause.getMessage());
}
}
});
// Inspect per-instance resource usage from the resource report
ResourceReport report = controller.getResourceReport();
for (TwillRunResources resources : report.getResources()) {
System.out.println(String.format(
"Instance %d: %d cores, %d MB memory",
resources.getInstanceId(),
resources.getVirtualCores(),
resources.getMemoryMB()
));
}
// Send custom commands (command name followed by key/value arguments)
controller.command("scale-executors", "count", "20").get();
controller.command("checkpoint").get();

Cluster Resource Management:
import co.cask.cdap.app.runtime.spark.distributed.DistributedExecutionContext;
// Create the execution context (Spark config plus YARN client handles)
DistributedExecutionContext context = new DistributedExecutionContext(
sparkConf, yarnClient, resourceManager
);
// Read the current resource allocation
int executors = context.getExecutorInstances();
Resources executorResources = context.getExecutorResources();
Resources driverResources = context.getDriverResources();
System.out.println(String.format(
"Current allocation: %d executors, %d MB memory each, %d cores each",
executors,
executorResources.getMemoryMB(),
executorResources.getVirtualCores()
));
// Scale out when the workload exceeds the threshold, capped at maxExecutors;
// get() blocks until the scaling operation completes
if (workloadSize > threshold) {
int targetExecutors = Math.min(workloadSize / batchSize, maxExecutors);
context.scaleExecutors(targetExecutors).get();
}
// Monitor executor distribution across hosts
Set<String> executorHosts = context.getExecutorHosts();
System.out.println("Executors running on hosts: " + executorHosts);

/**
* Service state enumeration for execution services
*/
public enum ServiceState {
STARTING, // Service is starting up and not yet accepting requests
RUNNING, // Service is running and accepting requests
STOPPING, // Service is shutting down gracefully
STOPPED, // Service has stopped normally
FAILED // Service encountered a fatal error and cannot continue
}
/**
 * Resource report containing information about the distributed resources
 * of a running application.
 */
public interface ResourceReport {
/**
 * Gets resources for all instances of the application.
 * @return Collection of TwillRunResources, one per instance
 */
Collection<TwillRunResources> getResources();
/**
 * Gets the application master resources.
 * @return TwillRunResources for the application master
 */
TwillRunResources getAppMasterResources();
/**
 * Gets per-service resource information.
 * @return Map of service names to the resource information of their instances
 */
Map<String, Collection<TwillRunResources>> getServices();
}
/**
 * Resources allocated to a single Twill run instance.
 */
public interface TwillRunResources {
/**
 * Gets the instance ID.
 * @return Instance identifier
 */
int getInstanceId();
/**
 * Gets the number of allocated virtual cores.
 * @return Number of virtual cores
 */
int getVirtualCores();
/**
 * Gets the allocated memory in MB.
 * @return Memory allocation in megabytes
 */
int getMemoryMB();
/**
 * Gets the host name.
 * @return Host where this instance is running
 */
String getHost();
/**
 * Gets the container ID.
 * @return Container identifier assigned by the resource manager (e.g. YARN)
 */
String getContainerId();
}
/**
 * Twill controller interface for low-level operations on a Twill application.
 * All lifecycle operations are asynchronous and report completion via futures.
 */
public interface TwillController {
/**
 * Starts the Twill application.
 * @return Future that completes when the application has started
 */
ListenableFuture<TwillController> start();
/**
 * Stops the Twill application gracefully.
 * @return Future that completes when the application has terminated
 */
ListenableFuture<TwillController> terminate();
/**
 * Kills the Twill application forcefully.
 * @return Future indicating kill completion
 */
ListenableFuture<TwillController> kill();
/**
 * Sends a command to the application.
 * @param command Command to send
 * @return Future indicating command completion
 */
ListenableFuture<TwillController> sendCommand(Command command);
/**
 * Gets the current resource report.
 * @return ResourceReport containing current resource usage
 */
ResourceReport getResourceReport();
}
/**
 * Command sent to a running Twill application, consisting of a name and
 * a set of string options.
 */
public interface Command {
/**
 * Gets the command name.
 * @return Command identifier
 */
String getCommand();
/**
 * Gets the command options.
 * @return Map of option key-value pairs
 */
Map<String, String> getOptions();
}
/**
 * Twill context providing runtime information to a running instance.
 */
public interface TwillContext {
/**
 * Gets the instance ID.
 * @return Instance identifier (0-based)
 */
int getInstanceId();
/**
 * Gets the total instance count.
 * @return Total number of instances
 */
int getInstanceCount();
/**
 * Gets the host information.
 * @return Host where this instance is running
 */
String getHost();
/**
 * Gets the resources allocated to this instance.
 * @return TwillRunResources for this instance
 */
TwillRunResources getResourceAllocation();
/**
 * Announces a service endpoint so other runnables can discover it.
 * @param serviceName Name of the service
 * @param port Port number
 */
void announce(String serviceName, int port);
}
/**
 * YARN application ID wrapper. A YARN application is identified by the
 * ResourceManager's cluster start timestamp plus a sequence number.
 */
public class ApplicationId {
/**
 * Gets the cluster timestamp component.
 * @return Cluster timestamp component
 */
public long getClusterTimestamp();
/**
 * Gets the sequence-number component.
 * @return Application ID component
 */
public int getId();
/**
 * Gets the string representation (YARN format, e.g. "application_&lt;timestamp&gt;_&lt;id&gt;").
 * @return Full application ID string
 */
@Override
public String toString();
}

Install with Tessl CLI
npx tessl i tessl/maven-co-cask-cdap--cdap-spark-core