CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-co-cask-cdap--cdap-etl-api-spark

Java API interfaces and abstract classes for developing Apache Spark-based ETL operations within the CDAP ecosystem

Pending
Overview
Eval results
Files

streaming-etl.mddocs/

Streaming ETL Operations

Real-time data processing capabilities for Spark Streaming pipelines within CDAP. Provides interfaces and abstract classes for streaming sources and windowing operations with DStream support.

Capabilities

StreamingSource

Abstract class for implementing streaming data sources in CDAP ETL pipelines. StreamingSource creates and manages DStreams that provide continuous data input for real-time processing.

/**
 * Source for Spark Streaming pipelines.
 * @param <T> Type of object contained in the stream
 */
@Beta
public abstract class StreamingSource<T> implements PipelineConfigurable, Serializable {
  
  public static final String PLUGIN_TYPE = "streamingsource";
  
  /**
   * Get the DStream to read from for streaming processing.
   * @param context The streaming context for this stage of the pipeline
   * @return JavaDStream providing continuous data input
   */
  public abstract JavaDStream<T> getStream(StreamingContext context) throws Exception;
  
  /**
   * Configure the ETL pipeline by adding required datasets and streams.
   * @param pipelineConfigurer The configurer for adding datasets and streams
   */
  @Override
  public void configurePipeline(PipelineConfigurer pipelineConfigurer) throws IllegalArgumentException {
    // Default no-op implementation
  }
  
  /**
   * Get number of required executors for the streaming source.
   * Override when the DStream is a union of multiple streams.
   * @return Number of executors required (defaults to 1)
   */
  public int getRequiredExecutors() {
    return 1;
  }
}

Usage Example:

import co.cask.cdap.etl.api.streaming.StreamingSource;
import co.cask.cdap.etl.api.streaming.StreamingContext;
import co.cask.cdap.etl.api.PipelineConfigurer;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.kafka.KafkaUtils;
import kafka.serializer.StringDecoder;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.Collections;

public class KafkaStreamingSource extends StreamingSource<String> {
  
  private KafkaConfig config;
  
  @Override
  public JavaDStream<String> getStream(StreamingContext context) throws Exception {
    // Create Kafka DStream
    Map<String, String> kafkaParams = new HashMap<>();
    kafkaParams.put("metadata.broker.list", config.getBrokers());
    kafkaParams.put("auto.offset.reset", "latest");
    
    Set<String> topics = Collections.singleton(config.getTopic());
    
    return KafkaUtils.createDirectStream(
      context.getSparkStreamingContext(),
      String.class,
      String.class,
      StringDecoder.class,
      StringDecoder.class,
      kafkaParams,
      topics
    ).map(tuple -> tuple._2()); // Extract message value
  }
  
  @Override
  public int getRequiredExecutors() {
    return config.getPartitionCount(); // One executor per Kafka partition
  }
}

Windower

Abstract class for implementing time-based windowing operations in streaming pipelines. Windower defines window parameters for aggregating streaming data over time intervals.

/**
 * Windowing plugin for time-based data aggregation.
 */
@Beta
public abstract class Windower implements PipelineConfigurable, Serializable {
  
  public static final String PLUGIN_TYPE = "windower";
  
  /**
   * Configure the ETL pipeline by adding required datasets and streams.
   * @param pipelineConfigurer The configurer for adding datasets and streams
   */
  @Override
  public void configurePipeline(PipelineConfigurer pipelineConfigurer) throws IllegalArgumentException {
    // Default no-op implementation
  }
  
  /**
   * Get the width of the window in seconds.
   * Must be a multiple of the underlying batch interval.
   * @return Window width in seconds
   */
  public abstract long getWidth();
  
  /**
   * Get the slide interval of the window in seconds.
   * Must be a multiple of the underlying batch interval.
   * @return Window slide interval in seconds
   */
  public abstract long getSlideInterval();
}

Usage Example:

import co.cask.cdap.etl.api.streaming.Windower;

public class HourlyWindower extends Windower {
  
  @Override
  public long getWidth() {
    return 3600; // 1 hour window
  }
  
  @Override
  public long getSlideInterval() {
    return 300; // Slide every 5 minutes
  }
}

public class TumblingWindowTenMinutes extends Windower {
  
  @Override
  public long getWidth() {
    return 600; // 10 minute window
  }
  
  @Override
  public long getSlideInterval() {
    return 600; // Tumbling window (no overlap)
  }
}

StreamingContext

Context interface for streaming plugin stages. Provides access to Spark Streaming context, CDAP execution context, and lineage registration capabilities.

/**
 * Context for streaming plugin stages.
 */
@Beta
public interface StreamingContext extends StageContext, Transactional {
  
  /**
   * Get the Spark JavaStreamingContext for the pipeline.
   * @return JavaStreamingContext for creating and managing DStreams
   */
  JavaStreamingContext getSparkStreamingContext();
  
  /**
   * Get the CDAP JavaSparkExecutionContext for the pipeline.
   * @return CDAP execution context for accessing datasets and services
   */
  JavaSparkExecutionContext getSparkExecutionContext();
  
  /**
   * Register lineage for this Spark program using the given reference name.
   * @param referenceName Reference name used for source
   * @throws DatasetManagementException If error creating reference dataset
   * @throws TransactionFailureException If error fetching dataset for usage registration
   */
  void registerLineage(String referenceName) throws DatasetManagementException, TransactionFailureException;
}

Usage Example:

import co.cask.cdap.etl.api.streaming.StreamingContext;
import org.apache.spark.streaming.api.java.JavaDStream;

public class FileStreamingSource extends StreamingSource<String> {
  
  @Override
  public JavaDStream<String> getStream(StreamingContext context) throws Exception {
    // Register data lineage
    context.registerLineage("file-input-source");
    
    // Create file-based DStream
    JavaStreamingContext jssc = context.getSparkStreamingContext();
    return jssc.textFileStream("/path/to/streaming/files");
  }
}

Plugin Types

The streaming ETL package defines two plugin types for real-time processing:

  • streamingsource: For implementing streaming data sources using StreamingSource
  • windower: For implementing time-based windowing operations using Windower

Window Constraints

Both window width and slide interval must be multiples of the underlying Spark Streaming batch interval. Common patterns:

  • Tumbling Windows: Width equals slide interval (no overlap)
  • Sliding Windows: Slide interval is smaller than width (overlapping windows)
  • Session Windows: Not directly supported, requires custom logic in transformations

Integration with Spark Streaming

StreamingSource integrates with Spark Streaming by:

  1. Creating DStreams from external data sources (Kafka, files, sockets, etc.)
  2. Managing executor requirements based on data source characteristics
  3. Providing lifecycle hooks for resource management
  4. Enabling lineage tracking for data governance

Windower integrates by providing temporal parameters that downstream operations can use for time-based aggregations and transformations.

Install with Tessl CLI

npx tessl i tessl/maven-co-cask-cdap--cdap-etl-api-spark

docs

batch-etl.md

execution-context.md

index.md

streaming-etl.md

tile.json