Core Spark 1.x integration component for the Cask Data Application Platform (CDAP), providing runtime services and execution context for CDAP applications.
```
npx @tessl/cli install tessl/maven-co-cask-cdap--cdap-spark-core@5.1.0
```

CDAP Spark Core is a Java/Scala library that provides Apache Spark 1.x integration capabilities for the Cask Data Application Platform (CDAP). It serves as a runtime provider and execution context for Spark-based applications within the CDAP framework, offering abstractions for data processing, service contexts, and distributed execution while maintaining integration with Hadoop ecosystem components.
The Maven dependency:

```xml
<dependency>
  <groupId>co.cask.cdap</groupId>
  <artifactId>cdap-spark-core</artifactId>
  <version>5.1.2</version>
</dependency>
```

Core imports for Java:

```java
import co.cask.cdap.app.runtime.spark.DefaultSparkExecutionContext;
import co.cask.cdap.app.runtime.spark.Spark1ProgramRuntimeProvider;
import co.cask.cdap.app.runtime.spark.SparkRuntimeContext;
import co.cask.cdap.app.runtime.spark.service.SparkHttpServiceServer;
```

For Scala:

```scala
import co.cask.cdap.app.runtime.spark.DefaultSparkExecutionContext
import co.cask.cdap.app.runtime.spark.service.DefaultSparkHttpServiceContext
import co.cask.cdap.app.runtime.spark.dynamic.{DefaultSparkCompiler, DefaultSparkInterpreter}
```

Basic usage from Java:

```java
// Create Spark runtime provider
Spark1ProgramRuntimeProvider provider = new Spark1ProgramRuntimeProvider();
// SparkRuntimeContext is created internally by CDAP with all required dependencies
// Access CDAP services within Spark programs through the execution context
SparkRuntimeContext runtimeContext = sparkClassLoader.getRuntimeContext();
TransactionSystemClient txClient = runtimeContext.getTransactionSystemClient();
DatasetFramework datasetFramework = runtimeContext.getDatasetFramework();
AuthorizationEnforcer authEnforcer = runtimeContext.getAuthorizationEnforcer();
```

For the Scala execution context:

```scala
// DefaultSparkExecutionContext is created with SparkClassLoader and localized resources
val sparkContext = new DefaultSparkExecutionContext(sparkClassLoader, localizeResources)
// Access runtime information
val spec = sparkContext.getSpecification
val startTime = sparkContext.getLogicalStartTime
val args = sparkContext.getRuntimeArguments
val admin = sparkContext.getAdmin
val messaging = sparkContext.getMessagingContext
```

CDAP Spark Core is built around several key components:
Core runtime integration enables Spark 1.x programs to run within the CDAP platform with full lifecycle management and resource allocation.

```java
@ProgramRuntimeProvider.SupportedProgramType(ProgramType.SPARK)
public class Spark1ProgramRuntimeProvider extends SparkProgramRuntimeProvider {
  public Spark1ProgramRuntimeProvider();
}

public abstract class SparkProgramRuntimeProvider implements ProgramRuntimeProvider {
  public ProgramRunner createProgramRunner(ProgramType type, Mode mode, Injector injector);
  public boolean isSupported(ProgramType programType, CConfiguration cConf);
}
```
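A minimal sketch of driving the provider directly, assuming a Guice `Injector` carrying the CDAP runtime bindings and a loaded `CConfiguration`; the package locations of `ProgramType`, `CConfiguration`, and the `Mode` enum follow the usual CDAP layout and should be treated as assumptions, since CDAP normally instantiates the provider itself through its extension mechanism:

```scala
import co.cask.cdap.app.runtime.ProgramRuntimeProvider.Mode  // assumed location of the Mode enum
import co.cask.cdap.app.runtime.spark.Spark1ProgramRuntimeProvider
import co.cask.cdap.common.conf.CConfiguration               // assumed location
import co.cask.cdap.proto.ProgramType                        // assumed location
import com.google.inject.Injector

// Sketch only: in practice the platform drives these calls.
def sparkRunnerFor(injector: Injector, cConf: CConfiguration) = {
  val provider = new Spark1ProgramRuntimeProvider()
  require(provider.isSupported(ProgramType.SPARK, cConf), "Spark 1.x runtime is not available")
  // Mode.LOCAL runs the program in-process; distributed mode targets YARN.
  provider.createProgramRunner(ProgramType.SPARK, Mode.LOCAL, injector)
}
```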
Execution contexts provide access to CDAP services, metadata, and configuration within Spark applications, enabling seamless integration with the broader CDAP ecosystem.

```java
public final class SparkRuntimeContext extends AbstractContext implements Metrics {
  SparkRuntimeContext(Configuration hConf, Program program, ProgramOptions programOptions,
                      CConfiguration cConf, String hostname, TransactionSystemClient txClient,
                      DatasetFramework datasetFramework, DiscoveryServiceClient discoveryServiceClient,
                      MetricsCollectionService metricsCollectionService, StreamAdmin streamAdmin,
                      WorkflowProgramInfo workflowProgramInfo, PluginInstantiator pluginInstantiator,
                      SecureStore secureStore, SecureStoreManager secureStoreManager,
                      AuthorizationEnforcer authorizationEnforcer, AuthenticationContext authenticationContext,
                      MessagingService messagingService, ServiceAnnouncer serviceAnnouncer,
                      PluginFinder pluginFinder, LocationFactory locationFactory,
                      MetadataReader metadataReader, MetadataPublisher metadataPublisher);

  public SparkSpecification getSpecification();
  public long getLogicalStartTime();
  public Map<String, String> getRuntimeArguments();
  public DiscoveryServiceClient getDiscoveryServiceClient();
  public LocationFactory getLocationFactory();
  public TransactionSystemClient getTransactionSystemClient();
  public AuthorizationEnforcer getAuthorizationEnforcer();
  public AuthenticationContext getAuthenticationContext();
  public String getHostname();
  public Configuration getHConf();
  public CConfiguration getCConf();
}
```

The Scala execution context classes:

```scala
class DefaultSparkExecutionContext(sparkClassLoader: SparkClassLoader, localizeResources: util.Map[String, File])
  extends AbstractSparkExecutionContext(sparkClassLoader, localizeResources) {

  protected def saveAsNewAPIHadoopDataset[K: ClassManifest, V: ClassManifest](sc: SparkContext,
                                                                              conf: Configuration,
                                                                              rdd: RDD[(K, V)]): Unit

  protected def createInterpreter(settings: Settings, classDir: File,
                                  urlAdder: URLAdder, onClose: () => Unit): SparkInterpreter

  protected def createSparkMetricsWriterFactory(): (TaskContext) => SparkMetricsWriter
}

abstract class AbstractSparkExecutionContext(sparkClassLoader: SparkClassLoader,
                                             localizeResources: util.Map[String, File])
  extends SparkExecutionContext with AutoCloseable {

  def getSpecification: SparkSpecification
  def getLogicalStartTime: Long
  def getRuntimeArguments: Map[String, String]
  def getAdmin: Admin
  def getDatasetFramework: DatasetFramework
  def getMessagingContext: MessagingContext
  def getTransactionSystemClient: TransactionSystemClient
}
```
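A hedged sketch of reading runtime information through these contexts from inside a running Spark program; obtaining the `SparkClassLoader` from the current thread's context class loader is an assumption for illustration, since CDAP normally wires this up itself:

```scala
import co.cask.cdap.app.runtime.spark.{SparkClassLoader, SparkRuntimeContext}

// Assumption: inside a CDAP-launched Spark program, the context class loader
// is (or delegates to) the SparkClassLoader set up by the runtime.
val sparkClassLoader = Thread.currentThread().getContextClassLoader match {
  case scl: SparkClassLoader => scl
  case other => throw new IllegalStateException(s"Not running under a SparkClassLoader: $other")
}

val runtimeContext: SparkRuntimeContext = sparkClassLoader.getRuntimeContext

// All of these accessors are part of the API shown above.
val spec      = runtimeContext.getSpecification
val startTime = runtimeContext.getLogicalStartTime
val args      = runtimeContext.getRuntimeArguments
val txClient  = runtimeContext.getTransactionSystemClient
```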
Data processing utilities and Spark SQL data sources enable efficient access to CDAP datasets and streams with full type safety and performance optimization.

```scala
class DatumScannerIterator[T](scanner: Scanner[T]) extends Iterator[T] {
  def hasNext: Boolean
  def next(): T
  def close(): Unit
}

object DatasetRelationProvider extends RelationProvider {
  def shortName(): String
  def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation
}
```
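As an illustration of the scanner wrapper, here is a hedged sketch that drains part of a dataset scan into a Scala collection; it assumes you already hold an open `Scanner[T]` from a dataset read and that `DatumScannerIterator` and `Scanner` are imported from this module:

```scala
// Sketch only: the Scanner[T] is assumed to come from a dataset read performed
// inside a CDAP transaction; DatumScannerIterator is just a scala Iterator.
def firstRecords[T](scanner: Scanner[T], limit: Int): Seq[T] = {
  val it = new DatumScannerIterator[T](scanner)
  try {
    it.take(limit).toVector   // standard Iterator combinators apply
  } finally {
    it.close()                // release the underlying scanner
  }
}
```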
An HTTP service framework allows Spark applications to expose web endpoints and REST APIs while maintaining full integration with CDAP's service discovery and security model.

```java
public class SparkHttpServiceServer {
  public void startAndWait() throws Exception;
  public void stopAndWait() throws Exception;
  public InetSocketAddress getBindAddress();
}
```

The Scala service context:

```scala
class DefaultSparkHttpServiceContext extends SparkHttpServiceContext {
  def getSpecification: SparkSpecification
  def getInstanceId: Int
  def getInstanceCount: Int
  def getLogicalStartTime: Long
}
```
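A hedged lifecycle sketch for the HTTP server, assuming a `SparkHttpServiceServer` instance has already been constructed by the runtime (its constructor is not part of the surface shown here):

```scala
import co.cask.cdap.app.runtime.spark.service.SparkHttpServiceServer

// `server` is assumed to be provided by the CDAP Spark runtime.
def serveWhile(server: SparkHttpServiceServer)(body: java.net.InetSocketAddress => Unit): Unit = {
  server.startAndWait()            // blocks until the service is up
  try {
    body(server.getBindAddress)    // e.g. announce or log the bound host/port
  } finally {
    server.stopAndWait()           // blocks until the service has fully stopped
  }
}
```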
Dynamic Scala compilation and interpretation capabilities enable runtime code generation, interactive development, and flexible modification of application behavior.

```scala
class DefaultSparkCompiler extends SparkCompiler {
  def compile(code: String): Option[Class[_]]
  def compileClass(className: String, code: String): Option[Class[_]]
}

class DefaultSparkInterpreter extends SparkInterpreter {
  def interpret(code: String): Unit
  def bind(name: String, value: Any): Unit
  def reset(): Unit
}
```
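A minimal sketch of the interpreter surface shown above; how the `SparkInterpreter` instance is obtained is left abstract here (in this module it is created internally, see `createInterpreter` on the execution context), and the assumption that a bound value is visible to interpreted code under its bound name is exactly that, an assumption:

```scala
// `interpreter` is assumed to be handed out by the runtime; this only
// exercises the interpret/bind/reset methods listed above.
def evaluateSnippet(interpreter: SparkInterpreter, greeting: String): Unit = {
  interpreter.bind("greeting", greeting)                        // expose a value to interpreted code
  interpreter.interpret("""println("Bound value: " + greeting)""")
  interpreter.reset()                                           // discard accumulated state
}
```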
Transaction handling capabilities enable ACID properties for Spark operations within the CDAP platform, providing consistent data access across distributed Spark executors.

```java
public class SparkTransactionHandler {
  public void jobStarted(Integer jobId, Set<Integer> stageIds);
  public void jobStarted(Integer jobId, Set<Integer> stageIds, SparkTransactional.TransactionInfo txInfo);
  public void jobEnded(Integer jobId, boolean succeeded);
  public void stageSubmitted(Integer stageId);
  public void stageCompleted(Integer stageId);
}

public class SparkTransactional {
  public TransactionInfo getTransactionInfo(String key);
  public void execute(TransactionType type, TxRunnable runnable) throws TransactionFailureException;
  public <T> T execute(TransactionType type, TxCallable<T> callable) throws TransactionFailureException;
}
```
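A hedged sketch of running work under a CDAP transaction through `SparkTransactional`; how the instance is obtained, which `TransactionType` constant to use, and the assumption that `TransactionType` is nested in `SparkTransactional` (as `TransactionInfo` is) are all left to the runtime and marked in the comments:

```scala
import co.cask.cdap.api.TxRunnable                         // standard CDAP callback interface
import co.cask.cdap.api.data.DatasetContext
import co.cask.cdap.app.runtime.spark.SparkTransactional   // assumed package

// Sketch only: `transactional` and `txType` are assumed to be supplied by the
// Spark runtime; this demonstrates the execute(TransactionType, TxRunnable) surface above.
def writeTransactionally(transactional: SparkTransactional,
                         txType: SparkTransactional.TransactionType): Unit = {
  transactional.execute(txType, new TxRunnable {
    override def run(datasetContext: DatasetContext): Unit = {
      // Every dataset operation issued here participates in a single CDAP
      // transaction, so either all writes commit or none do.
    }
  })
}
```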
Custom class loading infrastructure provides proper isolation and dependency management for Spark applications within the CDAP runtime environment.

```java
public class SparkClassLoader extends URLClassLoader {
  public SparkRuntimeContext getRuntimeContext();
  public void addURL(URL url);
  public Class<?> loadClass(String name) throws ClassNotFoundException;
}

public class SparkRunnerClassLoader extends FilterClassLoader {
  public static SparkRunnerClassLoader create();
  protected boolean includePackage(String packageName);
}
```
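A hedged sketch of working with the class loaders above; the way the `SparkClassLoader` is located (via the loader of the calling class) is an assumption for illustration:

```scala
import java.io.File
import co.cask.cdap.app.runtime.spark.SparkClassLoader

// Illustrative only: assumes the calling class was itself loaded by the
// SparkClassLoader, which is the usual situation inside a CDAP-launched Spark program.
def loadUserClass(name: String, extraJar: Option[File] = None): Class[_] = {
  val loader = getClass.getClassLoader match {
    case scl: SparkClassLoader => scl
    case other => throw new IllegalStateException(s"Unexpected class loader: $other")
  }
  extraJar.foreach(jar => loader.addURL(jar.toURI.toURL))  // make an additional jar visible
  loader.loadClass(name)
}
```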
A distributed execution framework built on Apache Twill provides scalable, fault-tolerant Spark application deployment across YARN clusters with proper resource management.

```java
public class SparkExecutionService {
  public ListenableFuture<ProgramController> submit(ProgramRunId programRunId, ProgramOptions programOptions);
  public void stop();
}

public class SparkTwillRunnable implements TwillRunnable {
  public void run();
  public void stop();
  public void handleCommand(Command command) throws Exception;
}
```
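A hedged sketch of submitting a run through `SparkExecutionService`; constructing the service and the `ProgramRunId`/`ProgramOptions` values is assumed to be done by the platform (their types are listed in the reference section below), so they are taken as parameters here:

```scala
import com.google.common.util.concurrent.{FutureCallback, Futures, ListenableFuture}
// CDAP types (SparkExecutionService, ProgramRunId, ProgramOptions, ProgramController)
// are assumed to be imported from their CDAP packages.

def runAndWatch(service: SparkExecutionService,
                runId: ProgramRunId,
                options: ProgramOptions): ListenableFuture[ProgramController] = {
  val future = service.submit(runId, options)
  Futures.addCallback(future, new FutureCallback[ProgramController] {
    override def onSuccess(controller: ProgramController): Unit =
      println(s"Spark run ${runId.getRun} is in state ${controller.getState}")
    override def onFailure(t: Throwable): Unit =
      println(s"Spark run ${runId.getRun} failed to start: ${t.getMessage}")
  })
  future
}
```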
```java
// Core runtime types
interface ProgramRuntimeProvider {
  ProgramController createProgramController(ProgramRunId programRunId, ProgramOptions programOptions);
  Runnable createRunnable(ProgramRunId programRunId, ProgramOptions programOptions, InetAddress hostname);
}

interface ProgramController {
  ListenableFuture<ProgramController> command(String command, Object... args);
  ListenableFuture<ProgramController> stop();
  State getState();
}

// Spark specification and context types
class SparkSpecification {
  String getName();
  String getDescription();
  String getMainClassName();
  Resources getDriverResources();
  Resources getExecutorResources();
}

class Resources {
  int getMemoryMB();
  int getVirtualCores();
}

// Program execution metadata
class ProgramRunId {
  String getNamespace();
  String getApplication();
  ProgramType getType();
  String getProgram();
  String getRun();
}

class ProgramOptions {
  Map<String, String> getArguments();
  Map<String, String> getUserArguments();
  boolean isDebug();
}
```
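As a small usage sketch of these types, assuming a `SparkSpecification` and `ProgramRunId` already obtained from a runtime context; the report format is purely illustrative:

```scala
// Summarize a run using only the accessors listed above.
def describeRun(spec: SparkSpecification, runId: ProgramRunId): String = {
  val driver   = spec.getDriverResources
  val executor = spec.getExecutorResources
  s"""Spark program ${spec.getName} (main class ${spec.getMainClassName})
     |  run:       ${runId.getNamespace}.${runId.getApplication}.${runId.getProgram} [${runId.getRun}]
     |  driver:    ${driver.getMemoryMB} MB / ${driver.getVirtualCores} vcores
     |  executors: ${executor.getMemoryMB} MB / ${executor.getVirtualCores} vcores""".stripMargin
}
```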
```scala
// Scala-specific context types
trait SparkExecutionContext {
  def getSpecification: SparkSpecification
  def getLogicalStartTime: Long
  def getRuntimeArguments: Map[String, String]
  def getAdmin: Admin
  def getDatasetFramework: DatasetFramework
  def getMessagingContext: MessagingContext
}

trait SparkHttpServiceContext {
  def getSpecification: SparkSpecification
  def getInstanceId: Int
  def getInstanceCount: Int
  def getLogicalStartTime: Long
}

// Dynamic compilation types
trait SparkCompiler {
  def compile(code: String): Option[Class[_]]
  def compileClass(className: String, code: String): Option[Class[_]]
}

trait SparkInterpreter {
  def interpret(code: String): Unit
  def bind(name: String, value: Any): Unit
  def reset(): Unit
}
```
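A closing sketch of how the Scala context traits above might be consumed; partitioning work by instance id and count is illustrative, not a pattern mandated by the library:

```scala
// Assign each HTTP service instance a disjoint slice of some keyed workload,
// using only the getInstanceId/getInstanceCount accessors from the trait above.
def ownedBy(context: SparkHttpServiceContext, keys: Seq[String]): Seq[String] = {
  val instances = math.max(context.getInstanceCount, 1)
  keys.filter(key => math.abs(key.hashCode % instances) == context.getInstanceId)
}
```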