tessl/maven-co-cask-cdap--cdap-spark-core

Core Spark 1.x integration component for the Cask Data Application Platform, providing runtime services and an execution context for CDAP applications

Workspace: tessl
Visibility: Public
Describes: pkg:maven/co.cask.cdap/cdap-spark-core@5.1.x (maven)

To install, run

npx @tessl/cli install tessl/maven-co-cask-cdap--cdap-spark-core@5.1.0


CDAP Spark Core

CDAP Spark Core is a Java/Scala library that provides Apache Spark 1.x integration capabilities for the Cask Data Application Platform (CDAP). It serves as a runtime provider and execution context for Spark-based applications within the CDAP framework, offering abstractions for data processing, service contexts, and distributed execution while maintaining integration with Hadoop ecosystem components.

Package Information

  • Package Name: cdap-spark-core
  • Package Type: maven
  • Group ID: co.cask.cdap
  • Language: Java/Scala
  • Installation: Add to your Maven pom.xml:
<dependency>
    <groupId>co.cask.cdap</groupId>
    <artifactId>cdap-spark-core</artifactId>
    <version>5.1.2</version>
</dependency>

Core Imports

import co.cask.cdap.app.runtime.spark.DefaultSparkExecutionContext;
import co.cask.cdap.app.runtime.spark.Spark1ProgramRuntimeProvider;
import co.cask.cdap.app.runtime.spark.SparkRuntimeContext;
import co.cask.cdap.app.runtime.spark.service.SparkHttpServiceServer;

For Scala:

import co.cask.cdap.app.runtime.spark.DefaultSparkExecutionContext
import co.cask.cdap.app.runtime.spark.service.DefaultSparkHttpServiceContext
import co.cask.cdap.app.runtime.spark.dynamic.{DefaultSparkCompiler, DefaultSparkInterpreter}

Basic Usage

// Create Spark runtime provider
Spark1ProgramRuntimeProvider provider = new Spark1ProgramRuntimeProvider();

// SparkRuntimeContext is created internally by CDAP with all required dependencies
// Access CDAP services within Spark programs through the execution context
SparkRuntimeContext runtimeContext = sparkClassLoader.getRuntimeContext();
TransactionSystemClient txClient = runtimeContext.getTransactionSystemClient();
DatasetFramework datasetFramework = runtimeContext.getDatasetFramework();
AuthorizationEnforcer authEnforcer = runtimeContext.getAuthorizationEnforcer();

For Scala execution context:

// DefaultSparkExecutionContext is created with SparkClassLoader and localized resources
val executionContext = new DefaultSparkExecutionContext(sparkClassLoader, localizeResources)

// Access runtime information
val spec = executionContext.getSpecification
val startTime = executionContext.getLogicalStartTime
val args = executionContext.getRuntimeArguments
val admin = executionContext.getAdmin
val messaging = executionContext.getMessagingContext

Architecture

CDAP Spark Core is built around several key components:

  • Runtime Providers: Service providers that integrate Spark 1.x execution with CDAP program lifecycle
  • Execution Contexts: Abstractions providing access to CDAP services and metadata within Spark applications
  • Data Integration: SQL data sources and scanning utilities for accessing CDAP datasets and streams
  • Service Framework: HTTP service contexts for running web services alongside Spark applications
  • Dynamic Compilation: Scala compiler and interpreter integration for runtime code generation
  • Distributed Execution: Twill-based distributed application support with proper resource management
  • Class Loading: Custom class loaders providing proper isolation and dependency management

Capabilities

Runtime Providers and Program Execution

Core runtime integration that enables Spark 1.x programs to run within the CDAP platform with full lifecycle management and resource allocation.

@ProgramRuntimeProvider.SupportedProgramType(ProgramType.SPARK)
public class Spark1ProgramRuntimeProvider extends SparkProgramRuntimeProvider {
    public Spark1ProgramRuntimeProvider();
}

public abstract class SparkProgramRuntimeProvider implements ProgramRuntimeProvider {
    public ProgramRunner createProgramRunner(ProgramType type, Mode mode, Injector injector);
    public boolean isSupported(ProgramType programType, CConfiguration cConf);
}
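
As a minimal sketch, the platform-side flow looks roughly like the following; the cConf and injector values are supplied by the CDAP runtime, and Mode.LOCAL as well as the import locations of the CDAP-internal interfaces are assumptions here:

import co.cask.cdap.app.runtime.ProgramRuntimeProvider
import co.cask.cdap.app.runtime.spark.Spark1ProgramRuntimeProvider
import co.cask.cdap.common.conf.CConfiguration
import co.cask.cdap.proto.ProgramType
import com.google.inject.Injector

// Check that Spark 1.x programs are supported, then obtain a runner for them (a sketch)
def createSparkRunner(cConf: CConfiguration, injector: Injector) = {
  val provider = new Spark1ProgramRuntimeProvider()
  require(provider.isSupported(ProgramType.SPARK, cConf), "Spark 1.x runtime is not available")
  provider.createProgramRunner(ProgramType.SPARK, ProgramRuntimeProvider.Mode.LOCAL, injector)  // Mode.LOCAL is an assumed value
}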

Runtime Providers

Spark Execution Contexts

Execution contexts that provide access to CDAP services, metadata, and configuration within Spark applications, enabling seamless integration with the broader CDAP ecosystem.

public final class SparkRuntimeContext extends AbstractContext implements Metrics {
    SparkRuntimeContext(Configuration hConf, Program program, ProgramOptions programOptions,
                        CConfiguration cConf, String hostname, TransactionSystemClient txClient,
                        DatasetFramework datasetFramework, DiscoveryServiceClient discoveryServiceClient,
                        MetricsCollectionService metricsCollectionService, StreamAdmin streamAdmin,
                        WorkflowProgramInfo workflowProgramInfo, PluginInstantiator pluginInstantiator,
                        SecureStore secureStore, SecureStoreManager secureStoreManager,
                        AuthorizationEnforcer authorizationEnforcer, AuthenticationContext authenticationContext,
                        MessagingService messagingService, ServiceAnnouncer serviceAnnouncer,
                        PluginFinder pluginFinder, LocationFactory locationFactory,
                        MetadataReader metadataReader, MetadataPublisher metadataPublisher);
    
    public SparkSpecification getSpecification();
    public long getLogicalStartTime();
    public Map<String, String> getRuntimeArguments();
    public DiscoveryServiceClient getDiscoveryServiceClient();
    public LocationFactory getLocationFactory();
    public TransactionSystemClient getTransactionSystemClient();
    public AuthorizationEnforcer getAuthorizationEnforcer();
    public AuthenticationContext getAuthenticationContext();
    public String getHostname();
    public Configuration getHConf();
    public CConfiguration getCConf();
}

class DefaultSparkExecutionContext(sparkClassLoader: SparkClassLoader, localizeResources: util.Map[String, File])
  extends AbstractSparkExecutionContext(sparkClassLoader, localizeResources) {
    
    protected def saveAsNewAPIHadoopDataset[K: ClassManifest, V: ClassManifest](sc: SparkContext,
                                                                               conf: Configuration,
                                                                               rdd: RDD[(K, V)]): Unit
    
    protected def createInterpreter(settings: Settings, classDir: File,
                                   urlAdder: URLAdder, onClose: () => Unit): SparkInterpreter
    
    protected def createSparkMetricsWriterFactory(): (TaskContext) => SparkMetricsWriter
}

abstract class AbstractSparkExecutionContext(sparkClassLoader: SparkClassLoader,
                                           localizeResources: util.Map[String, File])
  extends SparkExecutionContext with AutoCloseable {
    
    def getSpecification: SparkSpecification
    def getLogicalStartTime: Long  
    def getRuntimeArguments: Map[String, String]
    def getAdmin: Admin
    def getDatasetFramework: DatasetFramework
    def getMessagingContext: MessagingContext
    def getTransactionSystemClient: TransactionSystemClient
}
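
As a brief sketch, program code can read its metadata and arguments through the execution context; the sec parameter is assumed to be the context handed to the program by the CDAP runtime, and the trait's import location is an assumption:

import co.cask.cdap.api.spark.SparkExecutionContext

// Log the specification, logical start time, and runtime arguments of the current run
def logRunInfo(sec: SparkExecutionContext): Unit = {
  val spec = sec.getSpecification
  println(s"Running ${spec.getName}, logical start time ${sec.getLogicalStartTime}")
  println(s"Runtime arguments: ${sec.getRuntimeArguments}")
}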

Execution Contexts

Data Processing and SQL Integration

Data processing utilities and Spark SQL data sources for reading CDAP datasets and streams from Spark programs, including a typed scanner iterator and a Spark SQL relation provider.

class DatumScannerIterator[T](scanner: Scanner[T]) extends Iterator[T] {
    def hasNext: Boolean
    def next(): T
    def close(): Unit
}

object DatasetRelationProvider extends RelationProvider {
    def shortName(): String
    def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation
}
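
As a sketch, the scanner iterator can be used to drain a CDAP dataset scan; imports are omitted because the packages of Scanner and DatumScannerIterator are not listed above, and the record type is left generic:

// Count records by draining a CDAP Scanner through DatumScannerIterator (a sketch)
def countRecords[T](scanner: Scanner[T]): Int = {
  val records = new DatumScannerIterator(scanner)
  try records.size    // Iterator.size consumes the iterator, counting every record
  finally records.close()
}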

Data Processing

HTTP Service Framework

HTTP service framework that allows Spark applications to expose web endpoints and REST APIs while maintaining full integration with CDAP's service discovery and security model.

public class SparkHttpServiceServer {
    public void startAndWait() throws Exception;
    public void stopAndWait() throws Exception;
    public InetSocketAddress getBindAddress();
}

class DefaultSparkHttpServiceContext extends SparkHttpServiceContext {
    def getSpecification: SparkSpecification
    def getInstanceId: Int
    def getInstanceCount: Int
    def getLogicalStartTime: Long
}
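
A minimal sketch of the server lifecycle, assuming the SparkHttpServiceServer instance has already been constructed by the CDAP Spark runtime with the program's HTTP handlers registered:

import java.net.InetSocketAddress
import co.cask.cdap.app.runtime.spark.service.SparkHttpServiceServer

// Start the service, hand its bound address to the job body, and always stop it afterwards
def serve(server: SparkHttpServiceServer)(runJob: InetSocketAddress => Unit): Unit = {
  server.startAndWait()
  try runJob(server.getBindAddress)
  finally server.stopAndWait()
}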

HTTP Services

Dynamic Compilation and Interpretation

Dynamic Scala compilation and interpretation capabilities that enable runtime code generation and interactive evaluation of Scala code within Spark programs.

class DefaultSparkCompiler extends SparkCompiler {
    def compile(code: String): Option[Class[_]]
    def compileClass(className: String, code: String): Option[Class[_]]
}

class DefaultSparkInterpreter extends SparkInterpreter {
    def interpret(code: String): Unit
    def bind(name: String, value: Any): Unit
    def reset(): Unit
}
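
A short sketch using only the methods listed above; how the compiler and interpreter instances are obtained is internal to the execution context, and their import locations are omitted here:

// Bind a value, evaluate a snippet against it, and compile a class definition at runtime (a sketch)
def evaluate(compiler: SparkCompiler, interpreter: SparkInterpreter): Unit = {
  interpreter.bind("greeting", "hello from CDAP")
  interpreter.interpret("println(greeting)")
  interpreter.reset()

  // compileClass returns None when compilation fails
  val compiled: Option[Class[_]] = compiler.compileClass(
    "Echo", "class Echo { def apply(s: String): String = s }")
  compiled.foreach(cls => println(s"Compiled ${cls.getName}"))
}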

Dynamic Compilation

Transaction Management

Transaction handling capabilities that enable ACID properties for Spark operations within the CDAP platform, providing consistent data access across distributed Spark executors.

public class SparkTransactionHandler {
    public void jobStarted(Integer jobId, Set<Integer> stageIds);
    public void jobStarted(Integer jobId, Set<Integer> stageIds, SparkTransactional.TransactionInfo txInfo);
    public void jobEnded(Integer jobId, boolean succeeded);
    public void stageSubmitted(Integer stageId);
    public void stageCompleted(Integer stageId);
}

public class SparkTransactional {
    public TransactionInfo getTransactionInfo(String key);
    public void execute(TransactionType type, TxRunnable runnable) throws TransactionFailureException;
    public <T> T execute(TransactionType type, TxCallable<T> callable) throws TransactionFailureException;
}
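
As a sketch, dataset writes can be grouped into a single transaction through SparkTransactional; the transactional instance is assumed to be supplied by the Spark runtime, TransactionType.IMPLICIT is an assumed enum value, the dataset name is hypothetical, and the import for the CDAP-internal SparkTransactional class is omitted:

import co.cask.cdap.api.TxRunnable
import co.cask.cdap.api.data.DatasetContext
import co.cask.cdap.api.dataset.lib.KeyValueTable

// Record a status value inside one transaction; all writes commit or roll back together
def recordStatus(transactional: SparkTransactional): Unit = {
  transactional.execute(SparkTransactional.TransactionType.IMPLICIT, new TxRunnable {
    override def run(context: DatasetContext): Unit = {
      val table = context.getDataset[KeyValueTable]("job-status")   // hypothetical dataset name
      table.write("last-run", "succeeded")
    }
  })
}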

Transaction Management

Class Loading and Runtime Support

Custom class loading infrastructure that provides proper isolation and dependency management for Spark applications within the CDAP runtime environment.

public class SparkClassLoader extends URLClassLoader {
    public SparkRuntimeContext getRuntimeContext();
    public void addURL(URL url);
    public Class<?> loadClass(String name) throws ClassNotFoundException;
}

public class SparkRunnerClassLoader extends FilterClassLoader {
    public static SparkRunnerClassLoader create();
    protected boolean includePackage(String packageName);
}
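
A sketch of retrieving the runtime context from within user code, assuming the current thread's context class loader is the SparkClassLoader installed by the CDAP runtime for the running program, and assuming SparkClassLoader lives in the same package as SparkRuntimeContext:

import co.cask.cdap.app.runtime.spark.SparkClassLoader
import co.cask.cdap.app.runtime.spark.SparkRuntimeContext

// Resolve the SparkRuntimeContext through the program's class loader (a sketch)
def currentRuntimeContext(): SparkRuntimeContext = {
  val loader = Thread.currentThread().getContextClassLoader.asInstanceOf[SparkClassLoader]
  loader.getRuntimeContext
}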

Distributed Execution

Distributed execution framework built on Apache Twill that provides scalable, fault-tolerant Spark application deployment across YARN clusters with proper resource management.

public class SparkExecutionService {
    public ListenableFuture<ProgramController> submit(ProgramRunId programRunId, ProgramOptions programOptions);
    public void stop();
}

public class SparkTwillRunnable implements TwillRunnable {
    public void run();
    public void stop();
    public void handleCommand(Command command) throws Exception;
}
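
A sketch of submitting a program run for distributed execution; the service, run id, and options are assumed to come from the CDAP master process, and imports are omitted because these classes live in CDAP-internal packages not listed above:

// Submit the run and wait for its controller, then report the program state (a sketch)
def launch(service: SparkExecutionService, runId: ProgramRunId, options: ProgramOptions): Unit = {
  val controller = service.submit(runId, options).get()   // blocks until the controller is available
  println(s"Run ${runId.getRun} of ${runId.getProgram} is in state ${controller.getState}")
}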

Distributed Execution

Types

// Core runtime types
interface ProgramRuntimeProvider {
    ProgramController createProgramController(ProgramRunId programRunId, ProgramOptions programOptions);
    Runnable createRunnable(ProgramRunId programRunId, ProgramOptions programOptions, InetAddress hostname);
}

interface ProgramController {
    ListenableFuture<ProgramController> command(String command, Object... args);
    ListenableFuture<ProgramController> stop();
    State getState();
}

// Spark specification and context types
class SparkSpecification {
    String getName();
    String getDescription();
    String getMainClassName();
    Resources getDriverResources();
    Resources getExecutorResources();
}

class Resources {
    int getMemoryMB();
    int getVirtualCores();
}

// Program execution metadata
class ProgramRunId {
    String getNamespace();
    String getApplication();
    ProgramType getType();
    String getProgram();
    String getRun();
}

class ProgramOptions {
    Map<String, String> getArguments();
    Map<String, String> getUserArguments();
    boolean isDebug();
}

// Scala-specific context types
trait SparkExecutionContext {
    def getSpecification: SparkSpecification
    def getLogicalStartTime: Long
    def getRuntimeArguments: Map[String, String]
    def getAdmin: Admin
    def getDatasetFramework: DatasetFramework
    def getMessagingContext: MessagingContext
}

trait SparkHttpServiceContext {
    def getSpecification: SparkSpecification
    def getInstanceId: Int
    def getInstanceCount: Int
    def getLogicalStartTime: Long
}

// Dynamic compilation types
trait SparkCompiler {
    def compile(code: String): Option[Class[_]]
    def compileClass(className: String, code: String): Option[Class[_]]
}

trait SparkInterpreter {
    def interpret(code: String): Unit
    def bind(name: String, value: Any): Unit
    def reset(): Unit
}