CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-co-cask-cdap--cdap-spark-core

Core Spark 1.x integration component for the Cask Data Application Platform providing runtime services and execution context for CDAP applications

Pending
Overview
Eval results
Files

execution-contexts.mddocs/

Execution Contexts

Execution contexts that provide access to CDAP services, metadata, and configuration within Spark applications, enabling seamless integration with the broader CDAP ecosystem including datasets, messaging, and service discovery.

Capabilities

Spark Runtime Context

Java-based runtime context that provides access to core CDAP services and configuration for Spark program execution.

/**
 * Runtime context for Spark program execution within CDAP
 * Provides access to CDAP services, configuration, and program metadata
 */
public class SparkRuntimeContext extends AbstractContext {
    /**
     * Gets the Spark program specification
     * @return SparkSpecification containing program metadata
     */
    public SparkSpecification getSpecification();
    
    /**
     * Gets the logical start time of the program
     * @return Start time in milliseconds since epoch
     */
    public long getLogicalStartTime();
    
    /**
     * Gets runtime arguments provided to the program
     * @return Map of argument key-value pairs
     */
    public Map<String, String> getRuntimeArguments();
    
    /**
     * Gets the service discovery client for finding services
     * @return DiscoveryServiceClient for service discovery
     */
    public DiscoveryServiceClient getDiscoveryServiceClient();
    
    /**
     * Gets the location factory for accessing file systems
     * @return LocationFactory for creating file system locations
     */
    public LocationFactory getLocationFactory();
    
    /**
     * Gets the dataset framework for dataset operations
     * @return DatasetFramework for dataset access
     */
    public DatasetFramework getDatasetFramework();
    
    /**
     * Gets the admin interface for CDAP operations
     * @return Admin interface for administrative operations
     */
    public Admin getAdmin();
    
    /**
     * Gets the messaging context for pub-sub operations
     * @return MessagingContext for messaging operations
     */
    public MessagingContext getMessagingContext();
}

Default Spark Execution Context (Scala)

Scala-based execution context implementation that provides typed access to CDAP services with functional programming patterns.

/**
 * Default implementation of Spark execution context for CDAP applications
 * Provides access to CDAP services with Scala-friendly interfaces
 */
class DefaultSparkExecutionContext(runtimeContext: SparkRuntimeContext) extends SparkExecutionContext {
    /**
     * Gets the Spark program specification
     * @return SparkSpecification containing program metadata
     */
    def getSpecification: SparkSpecification
    
    /**
     * Gets the logical start time of the program
     * @return Start time in milliseconds since epoch
     */
    def getLogicalStartTime: Long
    
    /**
     * Gets runtime arguments as a Scala map
     * @return Map of argument key-value pairs
     */
    def getRuntimeArguments: Map[String, String]
    
    /**
     * Gets the admin interface for CDAP operations
     * @return Admin interface for administrative operations
     */
    def getAdmin: Admin
    
    /**
     * Gets the dataset framework for dataset operations
     * @return DatasetFramework for dataset access
     */
    def getDatasetFramework: DatasetFramework
    
    /**
     * Gets the messaging context for pub-sub operations
     * @return MessagingContext for messaging operations
     */
    def getMessagingContext: MessagingContext
    
    /**
     * Gets the location factory for accessing file systems
     * @return LocationFactory for creating file system locations
     */
    def getLocationFactory: LocationFactory
    
    /**
     * Gets the service discovery client
     * @return DiscoveryServiceClient for service discovery
     */
    def getDiscoveryServiceClient: DiscoveryServiceClient
}

Abstract Context Base

Base class providing common functionality for all CDAP program contexts.

/**
 * Abstract base class for CDAP program contexts
 * Provides common functionality and service access patterns
 */
public abstract class AbstractContext {
    /**
     * Gets the program run ID
     * @return ProgramRunId identifying this program run
     */
    public ProgramRunId getProgramRunId();
    
    /**
     * Gets the namespace of the program
     * @return Namespace name
     */
    public String getNamespace();
    
    /**
     * Gets the application name
     * @return Application name
     */
    public String getApplicationName();
    
    /**
     * Gets the program name
     * @return Program name
     */
    public String getProgramName();
    
    /**
     * Gets the run ID
     * @return Run ID string
     */
    public String getRunId();
    
    /**
     * Gets the CDAP configuration
     * @return CConfiguration containing CDAP settings
     */
    protected CConfiguration getCConfiguration();
    
    /**
     * Gets the Hadoop configuration
     * @return Configuration containing Hadoop settings
     */
    protected Configuration getConfiguration();
}

Usage Examples

Java Runtime Context Usage:

import co.cask.cdap.app.runtime.spark.SparkRuntimeContext;
import co.cask.cdap.data2.dataset2.DatasetFramework;
import co.cask.cdap.proto.id.DatasetId;

// Access dataset framework
SparkRuntimeContext context = // ... obtained from runtime
DatasetFramework datasetFramework = context.getDatasetFramework();

// Access a dataset
DatasetId datasetId = new DatasetId("namespace", "my-dataset");
Dataset dataset = datasetFramework.getDataset(datasetId, Collections.emptyMap(), null);

// Get runtime arguments
Map<String, String> args = context.getRuntimeArguments();
String configValue = args.get("config.key");

// Access location factory for file operations
LocationFactory locationFactory = context.getLocationFactory();
Location fileLocation = locationFactory.create("/path/to/file");

Scala Execution Context Usage:

import co.cask.cdap.app.runtime.spark.DefaultSparkExecutionContext
import co.cask.cdap.api.data.DatasetContext

// Create execution context
val sparkContext = new DefaultSparkExecutionContext(runtimeContext)

// Access program metadata
val spec = sparkContext.getSpecification
val startTime = sparkContext.getLogicalStartTime
val args = sparkContext.getRuntimeArguments

// Access CDAP services
val admin = sparkContext.getAdmin
val datasetFramework = sparkContext.getDatasetFramework
val messagingContext = sparkContext.getMessagingContext

// Use functional patterns with runtime arguments
val configPrefix = "spark.config."
val sparkConfigs = args.filter(_._1.startsWith(configPrefix))
  .map { case (key, value) => key.substring(configPrefix.length) -> value }

Service Discovery Usage:

import co.cask.cdap.common.discovery.DiscoveryServiceClient;
import org.apache.twill.discovery.Discoverable;
import org.apache.twill.discovery.ServiceDiscovered;

// Get service discovery client
DiscoveryServiceClient discoveryClient = context.getDiscoveryServiceClient();

// Discover a service
ServiceDiscovered serviceDiscovered = discoveryClient.discover("my-service");
Iterable<Discoverable> discoverables = serviceDiscovered.discover();

// Use the discovered service
for (Discoverable discoverable : discoverables) {
    InetSocketAddress address = discoverable.getSocketAddress();
    // Connect to service at address
}

Types

/**
 * Specification for Spark programs containing metadata and resource requirements
 */
public class SparkSpecification {
    /**
     * Gets the program name
     * @return Name of the Spark program
     */
    public String getName();
    
    /**
     * Gets the program description
     * @return Description of the Spark program
     */
    public String getDescription();
    
    /**
     * Gets the main class name
     * @return Fully qualified main class name
     */
    public String getMainClassName();
    
    /**
     * Gets the driver resource requirements
     * @return Resources required for the Spark driver
     */
    public Resources getDriverResources();
    
    /**
     * Gets the executor resource requirements
     * @return Resources required for Spark executors
     */
    public Resources getExecutorResources();
    
    /**
     * Gets the client resource requirements
     * @return Resources required for the Spark client
     */
    public Resources getClientResources();
    
    /**
     * Gets additional properties
     * @return Map of additional program properties
     */
    public Map<String, String> getProperties();
}

/**
 * Resource specification for program components
 */
public class Resources {
    /**
     * Gets the memory requirement in MB
     * @return Memory in megabytes
     */
    public int getMemoryMB();
    
    /**
     * Gets the virtual core requirement
     * @return Number of virtual cores
     */
    public int getVirtualCores();
}

/**
 * Interface for accessing CDAP administrative operations
 */
public interface Admin {
    /**
     * Checks if a dataset exists
     * @param datasetInstanceId Dataset identifier
     * @return true if dataset exists
     */
    boolean datasetExists(DatasetId datasetInstanceId) throws DatasetManagementException;
    
    /**
     * Gets dataset properties
     * @param datasetInstanceId Dataset identifier
     * @return Dataset properties
     */
    DatasetProperties getDatasetProperties(DatasetId datasetInstanceId) throws DatasetManagementException;
}

/**
 * Interface for dataset framework operations
 */
public interface DatasetFramework {
    /**
     * Gets a dataset instance
     * @param datasetInstanceId Dataset identifier
     * @param arguments Dataset arguments
     * @param classLoader Class loader for dataset classes
     * @return Dataset instance
     */
    <T extends Dataset> T getDataset(DatasetId datasetInstanceId, 
                                   Map<String, String> arguments,
                                   ClassLoader classLoader) throws DatasetInstantiationException;
}

/**
 * Interface for messaging operations
 */
public interface MessagingContext {
    /**
     * Gets a message publisher for a topic
     * @param topicId Topic identifier
     * @return MessagePublisher for publishing messages
     */
    MessagePublisher getMessagePublisher(TopicId topicId);
    
    /**
     * Gets a message fetcher for a topic
     * @param topicId Topic identifier
     * @return MessageFetcher for fetching messages
     */
    MessageFetcher getMessageFetcher(TopicId topicId);
}
/**
 * Trait defining the Spark execution context interface
 */
trait SparkExecutionContext {
    /**
     * Gets the Spark program specification
     * @return SparkSpecification containing program metadata
     */
    def getSpecification: SparkSpecification
    
    /**
     * Gets the logical start time of the program
     * @return Start time in milliseconds since epoch
     */
    def getLogicalStartTime: Long
    
    /**
     * Gets runtime arguments as a Scala map
     * @return Map of argument key-value pairs
     */
    def getRuntimeArguments: Map[String, String]
    
    /**
     * Gets the admin interface for CDAP operations
     * @return Admin interface for administrative operations
     */
    def getAdmin: Admin
    
    /**
     * Gets the dataset framework for dataset operations
     * @return DatasetFramework for dataset access
     */
    def getDatasetFramework: DatasetFramework
    
    /**
     * Gets the messaging context for pub-sub operations
     * @return MessagingContext for messaging operations
     */
    def getMessagingContext: MessagingContext
}

Install with Tessl CLI

npx tessl i tessl/maven-co-cask-cdap--cdap-spark-core

docs

data-processing.md

distributed-execution.md

dynamic-compilation.md

execution-contexts.md

http-services.md

index.md

runtime-providers.md

transaction-management.md

tile.json