Core Spark 1.x integration component for the Cask Data Application Platform providing runtime services and execution context for CDAP applications
—
Execution contexts that provide access to CDAP services, metadata, and configuration within Spark applications, enabling seamless integration with the broader CDAP ecosystem including datasets, messaging, and service discovery.
Java-based runtime context that provides access to core CDAP services and configuration for Spark program execution.
/**
* Runtime context for Spark program execution within CDAP
* Provides access to CDAP services, configuration, and program metadata
*/
public class SparkRuntimeContext extends AbstractContext {
/**
* Gets the Spark program specification
* @return SparkSpecification containing program metadata
*/
public SparkSpecification getSpecification();
/**
* Gets the logical start time of the program
* @return Start time in milliseconds since epoch
*/
public long getLogicalStartTime();
/**
* Gets runtime arguments provided to the program
* @return Map of argument key-value pairs
*/
public Map<String, String> getRuntimeArguments();
/**
* Gets the service discovery client for finding services
* @return DiscoveryServiceClient for service discovery
*/
public DiscoveryServiceClient getDiscoveryServiceClient();
/**
* Gets the location factory for accessing file systems
* @return LocationFactory for creating file system locations
*/
public LocationFactory getLocationFactory();
/**
* Gets the dataset framework for dataset operations
* @return DatasetFramework for dataset access
*/
public DatasetFramework getDatasetFramework();
/**
* Gets the admin interface for CDAP operations
* @return Admin interface for administrative operations
*/
public Admin getAdmin();
/**
* Gets the messaging context for pub-sub operations
* @return MessagingContext for messaging operations
*/
public MessagingContext getMessagingContext();
}

Scala-based execution context implementation that provides typed access to CDAP services with functional programming patterns.
/**
* Default implementation of Spark execution context for CDAP applications
* Provides access to CDAP services with Scala-friendly interfaces
*/
class DefaultSparkExecutionContext(runtimeContext: SparkRuntimeContext) extends SparkExecutionContext {
/**
* Gets the Spark program specification
* @return SparkSpecification containing program metadata
*/
def getSpecification: SparkSpecification
/**
* Gets the logical start time of the program
* @return Start time in milliseconds since epoch
*/
def getLogicalStartTime: Long
/**
* Gets runtime arguments as a Scala map
* @return Map of argument key-value pairs
*/
def getRuntimeArguments: Map[String, String]
/**
* Gets the admin interface for CDAP operations
* @return Admin interface for administrative operations
*/
def getAdmin: Admin
/**
* Gets the dataset framework for dataset operations
* @return DatasetFramework for dataset access
*/
def getDatasetFramework: DatasetFramework
/**
* Gets the messaging context for pub-sub operations
* @return MessagingContext for messaging operations
*/
def getMessagingContext: MessagingContext
/**
* Gets the location factory for accessing file systems
* @return LocationFactory for creating file system locations
*/
def getLocationFactory: LocationFactory
/**
* Gets the service discovery client
* @return DiscoveryServiceClient for service discovery
*/
def getDiscoveryServiceClient: DiscoveryServiceClient
}

Base class providing common functionality for all CDAP program contexts.
/**
* Abstract base class for CDAP program contexts
* Provides common functionality and service access patterns
*/
public abstract class AbstractContext {
/**
* Gets the program run ID
* @return ProgramRunId identifying this program run
*/
public ProgramRunId getProgramRunId();
/**
* Gets the namespace of the program
* @return Namespace name
*/
public String getNamespace();
/**
* Gets the application name
* @return Application name
*/
public String getApplicationName();
/**
* Gets the program name
* @return Program name
*/
public String getProgramName();
/**
* Gets the run ID
* @return Run ID string
*/
public String getRunId();
/**
* Gets the CDAP configuration
* @return CConfiguration containing CDAP settings
*/
protected CConfiguration getCConfiguration();
/**
* Gets the Hadoop configuration
* @return Configuration containing Hadoop settings
*/
protected Configuration getConfiguration();
}

Java Runtime Context Usage:
import co.cask.cdap.app.runtime.spark.SparkRuntimeContext;
import co.cask.cdap.data2.dataset2.DatasetFramework;
import co.cask.cdap.proto.id.DatasetId;
// Access dataset framework
SparkRuntimeContext context = // ... obtained from runtime
DatasetFramework datasetFramework = context.getDatasetFramework();
// Access a dataset
DatasetId datasetId = new DatasetId("namespace", "my-dataset");
Dataset dataset = datasetFramework.getDataset(datasetId, Collections.emptyMap(), null);
// Get runtime arguments
Map<String, String> args = context.getRuntimeArguments();
String configValue = args.get("config.key");
// Access location factory for file operations
LocationFactory locationFactory = context.getLocationFactory();
Location fileLocation = locationFactory.create("/path/to/file");

Scala Execution Context Usage:
import co.cask.cdap.app.runtime.spark.DefaultSparkExecutionContext
import co.cask.cdap.api.data.DatasetContext
// Create execution context
val sparkContext = new DefaultSparkExecutionContext(runtimeContext)
// Access program metadata
val spec = sparkContext.getSpecification
val startTime = sparkContext.getLogicalStartTime
val args = sparkContext.getRuntimeArguments
// Access CDAP services
val admin = sparkContext.getAdmin
val datasetFramework = sparkContext.getDatasetFramework
val messagingContext = sparkContext.getMessagingContext
// Use functional patterns with runtime arguments
val configPrefix = "spark.config."
val sparkConfigs = args.filter(_._1.startsWith(configPrefix))
  .map { case (key, value) => key.substring(configPrefix.length) -> value }

Service Discovery Usage:
import org.apache.twill.discovery.DiscoveryServiceClient;
import org.apache.twill.discovery.Discoverable;
import org.apache.twill.discovery.ServiceDiscovered;
// Get service discovery client
DiscoveryServiceClient discoveryClient = context.getDiscoveryServiceClient();
// Discover a service
ServiceDiscovered serviceDiscovered = discoveryClient.discover("my-service");
// ServiceDiscovered implements Iterable<Discoverable>; iterate it directly
Iterable<Discoverable> discoverables = serviceDiscovered;
// Use the discovered service
for (Discoverable discoverable : discoverables) {
InetSocketAddress address = discoverable.getSocketAddress();
// Connect to service at address
}

/**
* Specification for Spark programs containing metadata and resource requirements
*/
public class SparkSpecification {
/**
* Gets the program name
* @return Name of the Spark program
*/
public String getName();
/**
* Gets the program description
* @return Description of the Spark program
*/
public String getDescription();
/**
* Gets the main class name
* @return Fully qualified main class name
*/
public String getMainClassName();
/**
* Gets the driver resource requirements
* @return Resources required for the Spark driver
*/
public Resources getDriverResources();
/**
* Gets the executor resource requirements
* @return Resources required for Spark executors
*/
public Resources getExecutorResources();
/**
* Gets the client resource requirements
* @return Resources required for the Spark client
*/
public Resources getClientResources();
/**
* Gets additional properties
* @return Map of additional program properties
*/
public Map<String, String> getProperties();
}
/**
* Resource specification for program components
*/
public class Resources {
/**
* Gets the memory requirement in MB
* @return Memory in megabytes
*/
public int getMemoryMB();
/**
* Gets the virtual core requirement
* @return Number of virtual cores
*/
public int getVirtualCores();
}
/**
* Interface for accessing CDAP administrative operations
*/
public interface Admin {
/**
* Checks if a dataset exists
* @param datasetInstanceId Dataset identifier
* @return true if dataset exists
*/
boolean datasetExists(DatasetId datasetInstanceId) throws DatasetManagementException;
/**
* Gets dataset properties
* @param datasetInstanceId Dataset identifier
* @return Dataset properties
*/
DatasetProperties getDatasetProperties(DatasetId datasetInstanceId) throws DatasetManagementException;
}
/**
* Interface for dataset framework operations
*/
public interface DatasetFramework {
/**
* Gets a dataset instance
* @param datasetInstanceId Dataset identifier
* @param arguments Dataset arguments
* @param classLoader Class loader for dataset classes
* @return Dataset instance
*/
<T extends Dataset> T getDataset(DatasetId datasetInstanceId,
Map<String, String> arguments,
ClassLoader classLoader) throws DatasetInstantiationException;
}
/**
* Interface for messaging operations
*/
public interface MessagingContext {
/**
* Gets a message publisher for a topic
* @param topicId Topic identifier
* @return MessagePublisher for publishing messages
*/
MessagePublisher getMessagePublisher(TopicId topicId);
/**
* Gets a message fetcher for a topic
* @param topicId Topic identifier
* @return MessageFetcher for fetching messages
*/
MessageFetcher getMessageFetcher(TopicId topicId);
}

/**
* Trait defining the Spark execution context interface
*/
trait SparkExecutionContext {
/**
* Gets the Spark program specification
* @return SparkSpecification containing program metadata
*/
def getSpecification: SparkSpecification
/**
* Gets the logical start time of the program
* @return Start time in milliseconds since epoch
*/
def getLogicalStartTime: Long
/**
* Gets runtime arguments as a Scala map
* @return Map of argument key-value pairs
*/
def getRuntimeArguments: Map[String, String]
/**
* Gets the admin interface for CDAP operations
* @return Admin interface for administrative operations
*/
def getAdmin: Admin
/**
* Gets the dataset framework for dataset operations
* @return DatasetFramework for dataset access
*/
def getDatasetFramework: DatasetFramework
/**
* Gets the messaging context for pub-sub operations
* @return MessagingContext for messaging operations
*/
def getMessagingContext: MessagingContext
}

Install with Tessl CLI
npx tessl i tessl/maven-co-cask-cdap--cdap-spark-core