CDAP ETL API Spark

CDAP ETL API Spark provides Java API interfaces and abstract classes for developing Apache Spark-based ETL (Extract, Transform, Load) operations within the CDAP (Cask Data Application Platform) ecosystem. It enables developers to create custom Spark transformations, sinks, and streaming components that integrate seamlessly with CDAP's data processing pipelines.

Package Information

  • Package Name: cdap-etl-api-spark
  • Package Type: maven
  • Language: Java
  • Group ID: co.cask.cdap
  • Artifact ID: cdap-etl-api-spark
  • Installation:

Maven:

<dependency>
  <groupId>co.cask.cdap</groupId>
  <artifactId>cdap-etl-api-spark</artifactId>
  <version>5.1.2</version>
</dependency>

Gradle:

implementation 'co.cask.cdap:cdap-etl-api-spark:5.1.2'

Core Imports

import co.cask.cdap.etl.api.batch.SparkSink;
import co.cask.cdap.etl.api.batch.SparkCompute;
import co.cask.cdap.etl.api.batch.SparkPluginContext;
import co.cask.cdap.etl.api.batch.SparkExecutionPluginContext;
import co.cask.cdap.etl.api.streaming.StreamingContext;
import co.cask.cdap.etl.api.streaming.StreamingSource;
import co.cask.cdap.etl.api.streaming.Windower;

Basic Usage

import co.cask.cdap.api.annotation.Name;
import co.cask.cdap.api.annotation.Plugin;
import co.cask.cdap.etl.api.batch.SparkSink;
import co.cask.cdap.etl.api.batch.SparkPluginContext;
import co.cask.cdap.etl.api.batch.SparkExecutionPluginContext;
import co.cask.cdap.etl.api.PipelineConfigurer;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaPairRDD;
import scala.Tuple2;

// Example Spark sink implementation; MyRecord is a placeholder for your domain type
@Plugin(type = SparkSink.PLUGIN_TYPE)
@Name("MyDataSink")
public class MyDataSink extends SparkSink<MyRecord> {
  
  @Override
  public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
    // Configure pipeline datasets if needed
    super.configurePipeline(pipelineConfigurer);
  }
  
  @Override
  public void prepareRun(SparkPluginContext context) throws Exception {
    // Prepare for batch run - configure Spark settings if needed
  }
  
  @Override
  public void run(SparkExecutionPluginContext context, JavaRDD<MyRecord> input) throws Exception {
    // Persist RDD data to storage
    JavaPairRDD<String, MyRecord> keyedData = input.mapToPair(record -> 
      new Tuple2<>(record.getKey(), record));
    context.saveAsDataset(keyedData, "output-dataset");
  }
  
  @Override
  public void onRunFinish(boolean succeeded, SparkPluginContext context) {
    // Cleanup after batch run completes
    super.onRunFinish(succeeded, context);
  }
}

Architecture

CDAP ETL API Spark is built around several key components:

  • Batch Processing: SparkSink and SparkCompute for batch ETL operations with RDD support
  • Streaming Processing: StreamingSource and Windower for real-time data processing with DStream support
  • Context Management: Rich context interfaces providing access to datasets, streams, metrics, and configuration
  • Plugin Framework: Abstract base classes implementing CDAP's plugin lifecycle and configuration patterns
  • Integration Layer: Seamless integration with CDAP's data platform, including lineage tracking and transactional operations

Capabilities

Batch ETL Operations

Core batch processing capabilities for Spark-based ETL pipelines. Provides abstract classes for implementing data sinks and compute transformations with full access to Spark RDDs and CDAP datasets.

// Spark sink for data persistence
public abstract class SparkSink<IN> extends BatchConfigurable<SparkPluginContext> {
  public abstract void run(SparkExecutionPluginContext context, JavaRDD<IN> input) throws Exception;
}

// Spark compute for data transformations  
public abstract class SparkCompute<IN, OUT> implements PipelineConfigurable {
  public abstract JavaRDD<OUT> transform(SparkExecutionPluginContext context, JavaRDD<IN> input) throws Exception;
  public void initialize(SparkExecutionPluginContext context) throws Exception;
}
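
For illustration, a minimal SparkCompute implementation might look like the following sketch; MyRecord and its isValid() method are hypothetical placeholder types, and the plugin name is arbitrary:

import co.cask.cdap.api.annotation.Name;
import co.cask.cdap.api.annotation.Plugin;
import co.cask.cdap.etl.api.batch.SparkCompute;
import co.cask.cdap.etl.api.batch.SparkExecutionPluginContext;
import org.apache.spark.api.java.JavaRDD;

// Hypothetical compute plugin that drops records failing validation;
// MyRecord and isValid() are placeholder names, not part of this API
@Plugin(type = SparkCompute.PLUGIN_TYPE)
@Name("FilterInvalid")
public class FilterInvalidCompute extends SparkCompute<MyRecord, MyRecord> {

  @Override
  public JavaRDD<MyRecord> transform(SparkExecutionPluginContext context, JavaRDD<MyRecord> input) throws Exception {
    // transform() runs on the driver; the filter lambda executes on Spark executors
    return input.filter(MyRecord::isValid);
  }
}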

Full reference: docs/batch-etl.md

Streaming ETL Operations

Real-time data processing capabilities for Spark Streaming pipelines. Provides interfaces and abstract classes for streaming sources and windowing operations with DStream support.

// Streaming source for real-time data
public abstract class StreamingSource<T> implements PipelineConfigurable {
  public abstract JavaDStream<T> getStream(StreamingContext context) throws Exception;
  public int getRequiredExecutors();
}

// Windowing for time-based aggregations  
public abstract class Windower implements PipelineConfigurable {
  public abstract long getWidth();
  public abstract long getSlideInterval();
}
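
A minimal concrete Windower might look like this sketch; the width and slide values are illustrative and are assumed to be expressed in seconds, as multiples of the pipeline's batch interval:

import co.cask.cdap.etl.api.streaming.Windower;

// Hypothetical windower: 60-second windows that slide forward every 30 seconds
// (both values assumed to be in seconds)
public class MinuteWindower extends Windower {

  @Override
  public long getWidth() {
    return 60;
  }

  @Override
  public long getSlideInterval() {
    return 30;
  }
}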

Full reference: docs/streaming-etl.md

Execution Context

Rich context interfaces providing access to Spark execution environment, CDAP datasets, streams, and runtime configuration. Essential for implementing ETL operations that integrate with CDAP's data platform.

// Primary execution context for Spark operations
public interface SparkExecutionPluginContext extends DatasetContext, TransformContext {
  JavaSparkContext getSparkContext();
  long getLogicalStartTime();
  Map<String, String> getRuntimeArguments();
  <K, V> JavaPairRDD<K, V> fromDataset(String datasetName);
  <K, V> void saveAsDataset(JavaPairRDD<K, V> rdd, String datasetName);
  JavaRDD<StreamEvent> fromStream(String streamName);
}
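
As an illustration, a SparkSink.run() body (continuing the Basic Usage example above, with the same imports) can combine these context methods to filter input against an existing dataset before writing; "lookup-dataset" and the output.dataset runtime argument are hypothetical names:

// Sketch of a SparkSink.run() body using the execution context
@Override
public void run(SparkExecutionPluginContext context, JavaRDD<MyRecord> input) throws Exception {
  // Load an existing dataset as a pair RDD ("lookup-dataset" is a hypothetical name)
  JavaPairRDD<String, String> lookup = context.fromDataset("lookup-dataset");

  // Key the incoming records so they can be joined against the lookup data
  JavaPairRDD<String, MyRecord> keyed = input.mapToPair(r -> new Tuple2<>(r.getKey(), r));

  // Keep only records whose key appears in the lookup dataset
  JavaPairRDD<String, MyRecord> matched = keyed.join(lookup).mapValues(pair -> pair._1());

  // Runtime arguments can parameterize the output dataset per run
  String output = context.getRuntimeArguments().getOrDefault("output.dataset", "output-dataset");
  context.saveAsDataset(matched, output);
}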

Full reference: docs/execution-context.md

Type Definitions

Core Parent Types

/**
 * Base class for Batch run configuration methods.
 * @param <T> batch execution context
 */
@Beta
public abstract class BatchConfigurable<T extends BatchContext> implements PipelineConfigurable, SubmitterLifecycle<T> {
  
  @Override
  public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
    // Default no-op implementation
  }
  
  /**
   * Prepare the Batch run. Used to configure the job before starting the run.
   * @param context batch execution context
   * @throws Exception if there's an error during this method invocation
   */
  @Override
  public abstract void prepareRun(T context) throws Exception;
  
  /**
   * Invoked after the Batch run finishes. Used to perform any end of the run logic.
   * @param succeeded defines the result of batch execution: true if run succeeded, false otherwise
   * @param context batch execution context
   */
  @Override
  public void onRunFinish(boolean succeeded, T context) {
    // Default no-op implementation
  }
}

/**
 * Context passed to Batch Source and Sink.
 */
@Beta
public interface BatchContext extends DatasetContext, TransformContext {
  
  /**
   * Create a new dataset instance.
   * @param datasetName the name of the new dataset
   * @param typeName the type of the dataset to create
   * @param properties the properties for the new dataset
   * @throws InstanceConflictException if the dataset already exists
   * @throws DatasetManagementException for any issues encountered in the dataset system
   */
  void createDataset(String datasetName, String typeName, DatasetProperties properties)
    throws DatasetManagementException;
  
  /**
   * Check whether a dataset exists in the current namespace.
   * @param datasetName the name of the dataset
   * @return whether a dataset of that name exists
   * @throws DatasetManagementException for any issues encountered in the dataset system
   */
  boolean datasetExists(String datasetName) throws DatasetManagementException;
  
  /**
   * Returns settable pipeline arguments.
   * @return settable pipeline arguments
   */
  @Override
  SettableArguments getArguments();
}
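
Because SparkSink extends BatchConfigurable&lt;SparkPluginContext&gt;, and that type parameter is bounded by BatchContext, prepareRun can use these methods directly. A minimal sketch, assuming CDAP's standard KeyValueTable dataset type and an illustrative dataset name:

import co.cask.cdap.api.dataset.DatasetProperties;
import co.cask.cdap.api.dataset.lib.KeyValueTable;
import co.cask.cdap.etl.api.batch.SparkPluginContext;

// Sketch: inside a SparkSink, ensure the output dataset exists before the run starts
@Override
public void prepareRun(SparkPluginContext context) throws Exception {
  if (!context.datasetExists("output-dataset")) {
    // KeyValueTable is a standard CDAP dataset type; the dataset name is illustrative
    context.createDataset("output-dataset", KeyValueTable.class.getName(), DatasetProperties.EMPTY);
  }
}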

/**
 * Interface for configuring ETL pipelines.
 */
public interface PipelineConfigurable {
  
  /**
   * Configure a pipeline.
   * @param pipelineConfigurer the configurer used to add required datasets and streams
   * @throws IllegalArgumentException if the given config is invalid
   */
  void configurePipeline(PipelineConfigurer pipelineConfigurer) throws IllegalArgumentException;
}
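
Plugins typically use configurePipeline for fail-fast validation at deployment time, before any run starts. A minimal sketch, assuming a hypothetical config object that holds this plugin's properties:

// Sketch: reject an invalid configuration when the pipeline is deployed;
// 'config' and getOutputDataset() are hypothetical plugin properties
@Override
public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
  if (config.getOutputDataset() == null || config.getOutputDataset().isEmpty()) {
    throw new IllegalArgumentException("The 'outputDataset' property must be set");
  }
}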