Java API interfaces and abstract classes for developing Apache Spark-based ETL operations within the CDAP ecosystem.
Real-time data processing capabilities for Spark Streaming pipelines within CDAP. Provides interfaces and abstract classes for streaming sources and windowing operations with DStream support.
Abstract class for implementing streaming data sources in CDAP ETL pipelines. StreamingSource creates and manages DStreams that provide continuous data input for real-time processing.
/**
 * Source stage for Spark Streaming pipelines.
 *
 * <p>Implementations create a {@link JavaDStream} that supplies a continuous
 * stream of records to the rest of the pipeline.</p>
 *
 * @param <T> Type of object contained in the stream
 */
@Beta
public abstract class StreamingSource<T> implements PipelineConfigurable, Serializable {

  public static final String PLUGIN_TYPE = "streamingsource";

  /**
   * Configure the ETL pipeline by adding any datasets and streams this source requires.
   * The default implementation registers nothing.
   *
   * @param pipelineConfigurer The configurer used to register datasets and streams
   */
  @Override
  public void configurePipeline(PipelineConfigurer pipelineConfigurer) throws IllegalArgumentException {
    // no-op by default; subclasses override to register resources
  }

  /**
   * Get the DStream that provides continuous input for streaming processing.
   *
   * @param context The streaming context for this stage of the pipeline
   * @return JavaDStream providing continuous data input
   * @throws Exception if the stream cannot be created
   */
  public abstract JavaDStream<T> getStream(StreamingContext context) throws Exception;

  /**
   * Get the number of executors this source requires.
   * Override when the DStream returned by {@link #getStream} is a union of
   * multiple streams.
   *
   * @return Number of executors required (defaults to 1)
   */
  public int getRequiredExecutors() {
    return 1;
  }
}
Usage Example:
import co.cask.cdap.etl.api.streaming.StreamingSource;
import co.cask.cdap.etl.api.streaming.StreamingContext;
import co.cask.cdap.etl.api.PipelineConfigurer;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.kafka.KafkaUtils;
import kafka.serializer.StringDecoder;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.Collections;

/**
 * Example StreamingSource that reads String messages from Kafka using the
 * Spark Streaming direct-stream API for Kafka 0.8.
 */
public class KafkaStreamingSource extends StreamingSource<String> {

  // NOTE(review): config is never assigned in this example; a real plugin would
  // receive it from the plugin framework or a constructor — confirm before use.
  private KafkaConfig config;

  /**
   * Create a direct Kafka DStream and strip it down to the message values.
   *
   * @param context The streaming context for this stage of the pipeline
   * @return JavaDStream of message payloads from the configured topic
   * @throws Exception if the Kafka stream cannot be created
   */
  @Override
  public JavaDStream<String> getStream(StreamingContext context) throws Exception {
    Map<String, String> kafkaParams = new HashMap<>();
    kafkaParams.put("metadata.broker.list", config.getBrokers());
    // The Kafka 0.8 consumer used by spark-streaming-kafka accepts only
    // "smallest" or "largest" here; "latest" is a 0.9+ new-consumer value
    // and is rejected at runtime by the 0.8 SimpleConsumer config.
    kafkaParams.put("auto.offset.reset", "largest");
    Set<String> topics = Collections.singleton(config.getTopic());
    return KafkaUtils.createDirectStream(
        context.getSparkStreamingContext(),
        String.class,         // key class
        String.class,         // value class
        StringDecoder.class,  // key decoder
        StringDecoder.class,  // value decoder
        kafkaParams,
        topics
    ).map(tuple -> tuple._2()); // keep only the message value, drop the key
  }

  /**
   * One executor per Kafka partition, since the direct stream creates one
   * RDD partition per Kafka partition.
   *
   * @return Number of executors required
   */
  @Override
  public int getRequiredExecutors() {
    return config.getPartitionCount();
  }
}
Abstract class for implementing time-based windowing operations in streaming pipelines. Windower defines window parameters for aggregating streaming data over time intervals.
/**
 * Windowing plugin that defines time-based windows for aggregating streaming data.
 */
@Beta
public abstract class Windower implements PipelineConfigurable, Serializable {

  public static final String PLUGIN_TYPE = "windower";

  /**
   * Get the width of the window in seconds.
   * Must be a multiple of the underlying batch interval.
   *
   * @return Window width in seconds
   */
  public abstract long getWidth();

  /**
   * Get the slide interval of the window in seconds.
   * Must be a multiple of the underlying batch interval.
   *
   * @return Window slide interval in seconds
   */
  public abstract long getSlideInterval();

  /**
   * Configure the ETL pipeline by adding any datasets and streams this plugin requires.
   * The default implementation registers nothing.
   *
   * @param pipelineConfigurer The configurer used to register datasets and streams
   */
  @Override
  public void configurePipeline(PipelineConfigurer pipelineConfigurer) throws IllegalArgumentException {
    // no-op by default; subclasses override to register resources
  }
}
Usage Example:
import co.cask.cdap.etl.api.streaming.Windower;

/**
 * Sliding-window example: a one-hour window that advances every five minutes.
 */
public class HourlyWindower extends Windower {

  /** Window width: 3600 seconds (one hour). */
  @Override
  public long getWidth() {
    return 60 * 60;
  }

  /** Slide interval: 300 seconds (five minutes). */
  @Override
  public long getSlideInterval() {
    return 5 * 60;
  }
}
/**
 * Tumbling-window example: ten-minute windows with no overlap
 * (slide interval equals window width).
 */
public class TumblingWindowTenMinutes extends Windower {

  /** Window width: 600 seconds (ten minutes). */
  @Override
  public long getWidth() {
    return 10 * 60;
  }

  /** Slide interval equals the width, producing non-overlapping windows. */
  @Override
  public long getSlideInterval() {
    return 10 * 60;
  }
}
Context interface for streaming plugin stages. Provides access to Spark Streaming context, CDAP execution context, and lineage registration capabilities.
/**
 * Context for streaming plugin stages.
 *
 * <p>Gives a streaming plugin access to the Spark streaming machinery
 * ({@link JavaStreamingContext}), the CDAP Spark execution context, and
 * lineage registration for this stage.</p>
 */
@Beta
public interface StreamingContext extends StageContext, Transactional {
/**
 * Get the Spark JavaStreamingContext for the pipeline.
 * Sources use this to create the DStreams they return.
 * @return JavaStreamingContext for creating and managing DStreams
 */
JavaStreamingContext getSparkStreamingContext();
/**
 * Get the CDAP JavaSparkExecutionContext for the pipeline.
 * @return CDAP execution context for accessing datasets and services
 */
JavaSparkExecutionContext getSparkExecutionContext();
/**
 * Register lineage for this Spark program using the given reference name.
 * @param referenceName Reference name used for the source in lineage records
 * @throws DatasetManagementException If error creating reference dataset
 * @throws TransactionFailureException If error fetching dataset for usage registration
 */
void registerLineage(String referenceName) throws DatasetManagementException, TransactionFailureException;
}Usage Example:
import co.cask.cdap.etl.api.streaming.StreamingSource;
import co.cask.cdap.etl.api.streaming.StreamingContext;
import org.apache.spark.streaming.api.java.JavaDStream;
// Was missing: JavaStreamingContext is used below but was not imported.
import org.apache.spark.streaming.api.java.JavaStreamingContext;

/**
 * Example StreamingSource that monitors a directory for new text files and
 * registers dataset lineage for the source.
 */
public class FileStreamingSource extends StreamingSource<String> {

  /**
   * Register lineage, then create a file-monitoring DStream.
   *
   * @param context The streaming context for this stage of the pipeline
   * @return JavaDStream of lines from newly arriving files
   * @throws Exception if lineage registration or stream creation fails
   */
  @Override
  public JavaDStream<String> getStream(StreamingContext context) throws Exception {
    // Register data lineage under a stable reference name.
    context.registerLineage("file-input-source");
    // Monitor the directory; each new file's lines become stream elements.
    JavaStreamingContext jssc = context.getSparkStreamingContext();
    return jssc.textFileStream("/path/to/streaming/files");
  }
}
The streaming ETL package defines two plugin types for real-time processing:
The two plugin types are StreamingSource and Windower. For Windower, both the window width and the slide interval must be multiples of the underlying Spark Streaming batch interval; common patterns include sliding windows (slide interval smaller than the width) and tumbling windows (slide interval equal to the width).
StreamingSource integrates with Spark Streaming by creating and returning the JavaDStream that feeds continuous data into the pipeline.
Windower integrates by providing temporal parameters that downstream operations can use for time-based aggregations and transformations.
Install with Tessl CLI
npx tessl i tessl/maven-co-cask-cdap--cdap-etl-api-spark