Java API interfaces and abstract classes for developing Apache Spark-based ETL operations within the CDAP ecosystem.

Core batch processing capabilities for Spark-based ETL pipelines within CDAP. Provides abstract classes for implementing data sinks and compute transformations with full access to Spark RDDs and CDAP datasets.
Abstract class for implementing the final stage of a batch ETL pipeline. SparkSink performs RDD operations and is responsible for persisting data to external storage systems or CDAP datasets.
/**
 * Composes the final, optional stage of a Batch ETL Pipeline. Besides taking part in
 * configuring the Batch run, a SparkSink can operate directly on the key-value pairs
 * produced by the run and is responsible for persisting them.
 *
 * The {@link SparkSink#run} method executes inside the Batch run itself, whereas
 * {@link SparkSink#prepareRun} and {@link SparkSink#onRunFinish} execute on the client
 * that launches the Batch run — immediately before it starts and after it finishes,
 * respectively.
 *
 * @param <IN> the type of input record this sink consumes
 */
@Beta
public abstract class SparkSink<IN> extends BatchConfigurable<SparkPluginContext> implements Serializable {

  public static final String PLUGIN_TYPE = "sparksink";

  private static final long serialVersionUID = -8600555200583639593L;

  /**
   * The user-supplied Spark job; executed during the Batch run and responsible for
   * persisting any data.
   *
   * @param context the {@link SparkExecutionPluginContext} for this job
   * @param input the records emitted by the previous stages of the Batch run
   */
  public abstract void run(SparkExecutionPluginContext context, JavaRDD<IN> input) throws Exception;

  // The members below refine contracts inherited from BatchConfigurable<SparkPluginContext>.

  /**
   * Configures the ETL pipeline, e.g. by registering the datasets and streams it needs.
   *
   * @param pipelineConfigurer the configurer used to register datasets and streams
   */
  @Override
  public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
    // no-op by default; subclasses override when they need pipeline-time setup
  }

  /**
   * Prepares the Batch run; invoked before the run starts so the job can be configured.
   *
   * @param context the batch execution context
   * @throws Exception if preparation fails
   */
  @Override
  public abstract void prepareRun(SparkPluginContext context) throws Exception;

  /**
   * Invoked once the Batch run has finished, for any end-of-run logic.
   *
   * @param succeeded the result of the batch execution: true if the run succeeded, false otherwise
   * @param context the batch execution context
   */
  @Override
  public void onRunFinish(boolean succeeded, SparkPluginContext context) {
    // no-op by default; subclasses override when they need post-run cleanup
  }
}

Usage Example:
import co.cask.cdap.etl.api.batch.SparkSink;
import co.cask.cdap.etl.api.batch.SparkPluginContext;
import co.cask.cdap.etl.api.batch.SparkExecutionPluginContext;
import co.cask.cdap.etl.api.PipelineConfigurer;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.SparkConf;
import scala.Tuple2;
/**
 * Example sink that keys incoming user records by id and writes them to a CDAP dataset.
 */
public class DatabaseSink extends SparkSink<UserRecord> {

  @Override
  public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
    // No extra datasets or streams required here; defer to the default behavior.
    super.configurePipeline(pipelineConfigurer);
  }

  @Override
  public void prepareRun(SparkPluginContext context) throws Exception {
    // Tune Spark for this particular run before it is launched.
    SparkConf conf = new SparkConf();
    conf.set("spark.executor.memory", "2g");
    conf.set("spark.executor.cores", "2");
    context.setSparkConf(conf);
  }

  @Override
  public void run(SparkExecutionPluginContext context, JavaRDD<UserRecord> input) throws Exception {
    // Key each record by its id, then persist the resulting pairs to the target dataset.
    JavaPairRDD<String, UserRecord> byId =
        input.mapToPair(user -> new Tuple2<>(user.getId(), user));
    context.saveAsDataset(byId, "user-database");
  }

  @Override
  public void onRunFinish(boolean succeeded, SparkPluginContext context) {
    // Nothing to clean up; defer to the default behavior.
    super.onRunFinish(succeeded, context);
  }
}

Abstract class for implementing data transformation stages in a batch ETL pipeline. SparkCompute transforms input RDDs into output RDDs with different types and structures.
/**
 * Spark Compute stage for data transformations: turns the RDD produced by the previous
 * pipeline stage into the RDD consumed by the next one.
 *
 * @param <IN> Type of input object
 * @param <OUT> Type of output object
 */
@Beta
public abstract class SparkCompute<IN, OUT> implements PipelineConfigurable, Serializable {

  public static final String PLUGIN_TYPE = "sparkcompute";

  // Explicit serialVersionUID so serialization compatibility does not depend on the
  // compiler-generated default (this class is Serializable, and its sibling SparkSink
  // declares one). NOTE(review): confirm the value against the released API.
  private static final long serialVersionUID = 1L;

  /**
   * Configure the ETL pipeline by adding required datasets and streams.
   *
   * @param pipelineConfigurer The configurer for adding datasets and streams
   * @throws IllegalArgumentException if the supplied configuration is invalid
   */
  @Override
  public void configurePipeline(PipelineConfigurer pipelineConfigurer) throws IllegalArgumentException {
    // Default no-op implementation
  }

  /**
   * Initialize the plugin before any transform calls are made.
   *
   * @param context SparkExecutionPluginContext for this job
   * @throws Exception if initialization fails
   */
  public void initialize(SparkExecutionPluginContext context) throws Exception {
    // Default no-op implementation
  }

  /**
   * Transform the input RDD and return the output RDD for the next pipeline stage.
   *
   * @param context SparkExecutionPluginContext for this job
   * @param input Input RDD to be transformed
   * @return Transformed output RDD
   * @throws Exception if the transformation fails
   */
  public abstract JavaRDD<OUT> transform(SparkExecutionPluginContext context, JavaRDD<IN> input) throws Exception;
}

Usage Example:
import co.cask.cdap.etl.api.batch.SparkCompute;
import co.cask.cdap.etl.api.batch.SparkExecutionPluginContext;
import org.apache.spark.api.java.JavaRDD;
/**
 * Example compute stage that filters out invalid records and normalizes the rest.
 */
public class DataCleaner extends SparkCompute<RawRecord, CleanRecord> {

  @Override
  public void initialize(SparkExecutionPluginContext context) throws Exception {
    // Acquire any resources the transform needs (lookup tables, compiled patterns, ...).
    // NOTE(review): the original example assigned `this.config = getConfig()`, but neither
    // the field nor the method exists, so the snippet did not compile. Real plugins
    // typically receive configuration via a PluginConfig injected by the framework.
  }

  @Override
  public JavaRDD<CleanRecord> transform(SparkExecutionPluginContext context, JavaRDD<RawRecord> input) throws Exception {
    return input
        .filter(record -> record.isValid())   // drop invalid records
        .map(this::cleanAndNormalize)         // normalize the survivors
        .filter(record -> record != null);    // drop records that failed cleaning
  }

  /**
   * Cleans and normalizes a single record.
   *
   * @param raw the raw input record
   * @return the cleaned record, or null if a required field is missing
   */
  private CleanRecord cleanAndNormalize(RawRecord raw) {
    // Guard against missing fields so one bad record cannot fail the whole job with a
    // NullPointerException; null results are filtered out by the caller.
    if (raw.getName() == null || raw.getEmail() == null) {
      return null;
    }
    // NOTE(review): consider toLowerCase(Locale.ROOT) to avoid locale-sensitive casing.
    return new CleanRecord(raw.getName().trim(), raw.getAge(), raw.getEmail().toLowerCase());
  }
}

Context interface for Spark plugins during the preparation phase. Provides access to batch context capabilities and Spark configuration management.
/**
 * Context handed to Spark plugin types during the prepare-run phase.
 */
@Beta
public interface SparkPluginContext extends BatchContext {

  /**
   * Sets the {@link SparkConf} to be used for the Spark execution.
   *
   * Prefer {@link PipelineConfigurer#setPipelineProperties} when the configuration is
   * identical for every run of the pipeline; reserve this method for settings that
   * genuinely vary from run to run.
   *
   * Because of limitations in Spark Streaming, this method cannot be used in realtime
   * data pipelines; calling it there throws an {@link UnsupportedOperationException}.
   *
   * @param sparkConf Spark configuration for the execution
   * @throws UnsupportedOperationException in realtime data pipelines
   */
  void setSparkConf(SparkConf sparkConf);
}

Usage Example:
import co.cask.cdap.etl.api.batch.SparkExecutionPluginContext;
import co.cask.cdap.etl.api.batch.SparkPluginContext;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
/**
 * Example sink showing per-run Spark configuration via {@link SparkPluginContext}.
 */
public class MySparkSink extends SparkSink<MyRecord> {

  @Override
  public void prepareRun(SparkPluginContext context) throws Exception {
    // Apply per-run Spark tuning before the job is launched.
    SparkConf conf = new SparkConf();
    conf.set("spark.executor.memory", "2g");
    conf.set("spark.executor.cores", "2");
    conf.set("spark.sql.adaptive.enabled", "true");
    context.setSparkConf(conf);
  }

  @Override
  public void run(SparkExecutionPluginContext context, JavaRDD<MyRecord> input) throws Exception {
    // Spark job implementation goes here.
  }
}

The batch ETL package defines two plugin types that can be used in CDAP ETL pipelines:
SparkSink and SparkCompute. Both inherit lifecycle methods from their parent classes (BatchConfigurable and PipelineConfigurable, respectively).
Install with Tessl CLI
npx tessl i tessl/maven-co-cask-cdap--cdap-etl-api-spark