Java API interfaces and abstract classes for developing Apache Spark-based ETL operations within the CDAP ecosystem
npx @tessl/cli install tessl/maven-co-cask-cdap--cdap-etl-api-spark@5.1.0

CDAP ETL API Spark provides Java API interfaces and abstract classes for developing Apache Spark-based ETL (Extract, Transform, Load) operations within the CDAP (Cask Data Application Platform) ecosystem. It enables developers to create custom Spark transformations, sinks, and streaming components that integrate seamlessly with CDAP's data processing pipelines.
Maven:
<dependency>
<groupId>co.cask.cdap</groupId>
<artifactId>cdap-etl-api-spark</artifactId>
<version>5.1.2</version>
</dependency>

Gradle:

implementation 'co.cask.cdap:cdap-etl-api-spark:5.1.2'

import co.cask.cdap.etl.api.batch.SparkSink;
import co.cask.cdap.etl.api.batch.SparkCompute;
import co.cask.cdap.etl.api.batch.SparkPluginContext;
import co.cask.cdap.etl.api.batch.SparkExecutionPluginContext;
import co.cask.cdap.etl.api.streaming.StreamingContext;
import co.cask.cdap.etl.api.streaming.StreamingSource;
import co.cask.cdap.etl.api.streaming.Windower;

import co.cask.cdap.etl.api.batch.SparkSink;
import co.cask.cdap.etl.api.batch.SparkPluginContext;
import co.cask.cdap.etl.api.batch.SparkExecutionPluginContext;
import co.cask.cdap.etl.api.PipelineConfigurer;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaPairRDD;
import scala.Tuple2;
// Example Spark sink implementation
public class MyDataSink extends SparkSink<MyRecord> {
@Override
public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
// Configure pipeline datasets if needed
super.configurePipeline(pipelineConfigurer);
}
@Override
public void prepareRun(SparkPluginContext context) throws Exception {
// Prepare for batch run - configure Spark settings if needed
}
@Override
public void run(SparkExecutionPluginContext context, JavaRDD<MyRecord> input) throws Exception {
// Persist RDD data to storage
JavaPairRDD<String, MyRecord> keyedData = input.mapToPair(record ->
new Tuple2<>(record.getKey(), record));
context.saveAsDataset(keyedData, "output-dataset");
}
@Override
public void onRunFinish(boolean succeeded, SparkPluginContext context) {
// Cleanup after batch run completes
super.onRunFinish(succeeded, context);
}
}

CDAP ETL API Spark is built around several key components:
Core batch processing capabilities for Spark-based ETL pipelines. Provides abstract classes for implementing data sinks and compute transformations with full access to Spark RDDs and CDAP datasets.
// Spark sink for data persistence
public abstract class SparkSink<IN> extends BatchConfigurable<SparkPluginContext> {
public abstract void run(SparkExecutionPluginContext context, JavaRDD<IN> input) throws Exception;
}
// Spark compute for data transformations
public abstract class SparkCompute<IN, OUT> implements PipelineConfigurable {
public abstract JavaRDD<OUT> transform(SparkExecutionPluginContext context, JavaRDD<IN> input) throws Exception;
public void initialize(SparkExecutionPluginContext context) throws Exception;
}

Real-time data processing capabilities for Spark Streaming pipelines. Provides interfaces and abstract classes for streaming sources and windowing operations with DStream support.
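Before turning to the streaming interfaces, here is a minimal sketch of a batch compute stage following the SparkCompute contract above. MyRecord and its isValid()/normalize() methods are assumptions carried over from the sink example, not part of the API.

import co.cask.cdap.etl.api.PipelineConfigurer;
import co.cask.cdap.etl.api.batch.SparkCompute;
import co.cask.cdap.etl.api.batch.SparkExecutionPluginContext;
import org.apache.spark.api.java.JavaRDD;

// Example Spark compute: drops invalid records and normalizes the rest (a sketch)
public class CleanRecordsCompute extends SparkCompute<MyRecord, MyRecord> {
  @Override
  public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
    // No datasets or streams need to be registered for this transformation
  }

  @Override
  public JavaRDD<MyRecord> transform(SparkExecutionPluginContext context, JavaRDD<MyRecord> input) throws Exception {
    // Pure RDD-to-RDD transformation: filter out invalid records, then normalize
    return input
        .filter(record -> record.isValid())   // isValid() is an assumed accessor on MyRecord
        .map(record -> record.normalize());   // normalize() is an assumed helper on MyRecord
  }
}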
// Streaming source for real-time data
public abstract class StreamingSource<T> implements PipelineConfigurable {
public abstract JavaDStream<T> getStream(StreamingContext context) throws Exception;
public int getRequiredExecutors();
}
// Windowing for time-based aggregations
public abstract class Windower implements PipelineConfigurable {
public abstract long getWidth();
public abstract long getSlideInterval();
}

Rich context interfaces providing access to the Spark execution environment, CDAP datasets, streams, and runtime configuration. Essential for implementing ETL operations that integrate with CDAP's data platform.
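Returning to the streaming interfaces sketched above, a streaming source builds a JavaDStream from the StreamingContext. The sketch below assumes StreamingContext exposes the underlying JavaStreamingContext via getSparkStreamingContext(); the queue-backed stream is purely illustrative.

import java.util.Arrays;
import java.util.LinkedList;
import java.util.Queue;

import co.cask.cdap.etl.api.PipelineConfigurer;
import co.cask.cdap.etl.api.streaming.StreamingContext;
import co.cask.cdap.etl.api.streaming.StreamingSource;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

// Example streaming source that emits a fixed batch of numbers (a sketch)
public class NumbersStreamingSource extends StreamingSource<Long> {
  @Override
  public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
    // Nothing to register at configure time
  }

  @Override
  public JavaDStream<Long> getStream(StreamingContext context) throws Exception {
    // getSparkStreamingContext() is assumed to return the JavaStreamingContext for this run
    JavaStreamingContext jsc = context.getSparkStreamingContext();
    Queue<JavaRDD<Long>> queue = new LinkedList<>();
    queue.add(jsc.sparkContext().parallelize(Arrays.asList(1L, 2L, 3L)));
    // A queue-backed DStream keeps the example self-contained; a real source would
    // typically wrap a receiver or an external system such as Kafka
    return jsc.queueStream(queue);
  }
}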
// Primary execution context for Spark operations
public interface SparkExecutionPluginContext extends DatasetContext, TransformContext {
JavaSparkContext getSparkContext();
long getLogicalStartTime();
Map<String, String> getRuntimeArguments();
<K, V> JavaPairRDD<K, V> fromDataset(String datasetName);
<K, V> void saveAsDataset(JavaPairRDD<K, V> rdd, String datasetName);
JavaRDD<StreamEvent> fromStream(String streamName);
}

/**
* Base class for Batch run configuration methods.
* @param <T> batch execution context
*/
@Beta
public abstract class BatchConfigurable<T extends BatchContext> implements PipelineConfigurable, SubmitterLifecycle<T> {
@Override
public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
// Default no-op implementation
}
/**
* Prepare the Batch run. Used to configure the job before starting the run.
* @param context batch execution context
* @throws Exception if there's an error during this method invocation
*/
@Override
public abstract void prepareRun(T context) throws Exception;
/**
* Invoked after the Batch run finishes. Used to perform any end of the run logic.
* @param succeeded defines the result of batch execution: true if run succeeded, false otherwise
* @param context batch execution context
*/
@Override
public void onRunFinish(boolean succeeded, T context) {
// Default no-op implementation
}
}
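Tying the BatchConfigurable lifecycle above to the SparkExecutionPluginContext methods listed earlier, a sink can read a reference dataset and a runtime argument before persisting its output. The dataset names, the "output.dataset" argument key, and MyRecord are assumptions for illustration.

import co.cask.cdap.etl.api.batch.SparkExecutionPluginContext;
import co.cask.cdap.etl.api.batch.SparkPluginContext;
import co.cask.cdap.etl.api.batch.SparkSink;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import scala.Tuple2;

// Example sink that enriches records against a lookup dataset before saving (a sketch)
public class EnrichingSink extends SparkSink<MyRecord> {
  @Override
  public void prepareRun(SparkPluginContext context) throws Exception {
    // No preparation needed for this sketch
  }

  @Override
  public void run(SparkExecutionPluginContext context, JavaRDD<MyRecord> input) throws Exception {
    // Read a reference dataset as a pair RDD ("label-lookup" is an assumed dataset name)
    JavaPairRDD<String, String> labels = context.fromDataset("label-lookup");

    // Resolve the output dataset from runtime arguments, falling back to a default
    String output = context.getRuntimeArguments().getOrDefault("output.dataset", "enriched-records");

    // Keep only records that have a matching label, then persist them
    JavaPairRDD<String, MyRecord> keyed = input.mapToPair(r -> new Tuple2<>(r.getKey(), r));
    JavaPairRDD<String, MyRecord> matched =
        keyed.join(labels).mapToPair(e -> new Tuple2<>(e._1(), e._2()._1()));
    context.saveAsDataset(matched, output);
  }
}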
/**
* Context passed to Batch Source and Sink.
*/
@Beta
public interface BatchContext extends DatasetContext, TransformContext {
/**
* Create a new dataset instance.
* @param datasetName the name of the new dataset
* @param typeName the type of the dataset to create
* @param properties the properties for the new dataset
* @throws InstanceConflictException if the dataset already exists
* @throws DatasetManagementException for any issues encountered in the dataset system
*/
void createDataset(String datasetName, String typeName, DatasetProperties properties)
throws DatasetManagementException;
/**
* Check whether a dataset exists in the current namespace.
* @param datasetName the name of the dataset
* @return whether a dataset of that name exists
* @throws DatasetManagementException for any issues encountered in the dataset system
*/
boolean datasetExists(String datasetName) throws DatasetManagementException;
/**
* Returns settable pipeline arguments.
* @return settable pipeline arguments
*/
@Override
SettableArguments getArguments();
}
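Because SparkSink extends BatchConfigurable<SparkPluginContext> and BatchConfigurable requires a BatchContext, the methods above are available in prepareRun and can be used to create the output dataset on demand. The dataset name and the "keyValueTable" type name below are assumptions for illustration.

import co.cask.cdap.api.dataset.DatasetProperties;
import co.cask.cdap.etl.api.batch.SparkExecutionPluginContext;
import co.cask.cdap.etl.api.batch.SparkPluginContext;
import co.cask.cdap.etl.api.batch.SparkSink;
import org.apache.spark.api.java.JavaRDD;
import scala.Tuple2;

// Example sink that creates its output dataset on first use (a sketch)
public class DatasetCreatingSink extends SparkSink<MyRecord> {
  private static final String OUTPUT = "output-dataset"; // assumed dataset name

  @Override
  public void prepareRun(SparkPluginContext context) throws Exception {
    // Create the output dataset if it does not exist yet;
    // "keyValueTable" as the dataset type name is an assumption for illustration
    if (!context.datasetExists(OUTPUT)) {
      context.createDataset(OUTPUT, "keyValueTable", DatasetProperties.EMPTY);
    }
  }

  @Override
  public void run(SparkExecutionPluginContext context, JavaRDD<MyRecord> input) throws Exception {
    context.saveAsDataset(input.mapToPair(r -> new Tuple2<>(r.getKey(), r)), OUTPUT);
  }
}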
/**
* Interface for configuring ETL pipelines.
*/
public interface PipelineConfigurable {
/**
* Configure a pipeline.
* @param pipelineConfigurer the configurer used to add required datasets and streams
* @throws IllegalArgumentException if the given config is invalid
*/
void configurePipeline(PipelineConfigurer pipelineConfigurer) throws IllegalArgumentException;
}
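As a standalone illustration of this contract, the sketch below implements PipelineConfigurable directly and rejects an invalid configuration at deployment time. In practice plugins inherit the method through SparkSink, SparkCompute, StreamingSource, or Windower; the outputDataset property is an assumption for illustration.

import co.cask.cdap.etl.api.PipelineConfigurable;
import co.cask.cdap.etl.api.PipelineConfigurer;

// Example stage that validates its configuration when the pipeline is deployed (a sketch)
public class ValidatingStage implements PipelineConfigurable {
  private final String outputDataset;

  public ValidatingStage(String outputDataset) {
    this.outputDataset = outputDataset;
  }

  @Override
  public void configurePipeline(PipelineConfigurer pipelineConfigurer) throws IllegalArgumentException {
    // Fail fast at deployment time, before any Spark job is launched
    if (outputDataset == null || outputDataset.isEmpty()) {
      throw new IllegalArgumentException("outputDataset must be set");
    }
  }
}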