Java API interfaces and abstract classes for developing Apache Spark-based ETL operations within the CDAP ecosystem
—
Rich context interfaces providing access to Spark execution environment, CDAP datasets, streams, and runtime configuration. Essential for implementing ETL operations that integrate with CDAP's data platform.
Primary execution context interface for Spark operations. Provides comprehensive access to Spark runtime, CDAP datasets, streams, and platform services during ETL execution.
/**
 * Context passed to spark plugin types during execution.
 *
 * <p>Combines dataset access ({@code DatasetContext}) and transform-stage services
 * ({@code TransformContext}) with Spark-specific facilities such as the
 * {@code JavaSparkContext} and a Scala {@code SparkInterpreter}.
 */
@Beta
public interface SparkExecutionPluginContext extends DatasetContext, TransformContext {
/**
 * Returns the logical start time of the Batch Job.
 *
 * @return Time in milliseconds since epoch (January 1, 1970 UTC)
 */
@Override
long getLogicalStartTime();
/**
 * Returns the runtime arguments of the Batch Job.
 *
 * @return Map of runtime arguments
 */
Map<String, String> getRuntimeArguments();
/**
 * Returns the JavaSparkContext used during execution.
 *
 * @return The Spark Context for creating and operating on RDDs
 */
JavaSparkContext getSparkContext();
/**
 * Returns a Serializable PluginContext that can be used to request plugin
 * instances. Because it is Serializable, it can be captured in Spark
 * closures that execute on remote executors.
 *
 * @return Serializable PluginContext
 */
PluginContext getPluginContext();
/**
 * Creates a new SparkInterpreter for compiling and interpreting Scala code.
 *
 * @return New SparkInterpreter instance
 * @throws IOException If a local directory for the compiled classes could not be created
 */
SparkInterpreter createSparkInterpreter() throws IOException;
}Methods for reading from and writing to CDAP datasets using Spark RDDs.
/**
 * Creates a JavaPairRDD that reads from the given Dataset, using default splits
 * and no extra Dataset arguments (see the overloads below for both).
 *
 * @param datasetName Name of the Dataset
 * @param <K> Key type
 * @param <V> Value type
 * @return JavaPairRDD instance that reads from the Dataset
 * @throws DatasetInstantiationException If the Dataset doesn't exist
 */
<K, V> JavaPairRDD<K, V> fromDataset(String datasetName);
/**
 * Creates a JavaPairRDD that reads from the given Dataset, instantiated with
 * the given set of Dataset arguments.
 *
 * @param datasetName Name of the Dataset
 * @param arguments Dataset arguments
 * @param <K> Key type
 * @param <V> Value type
 * @return JavaPairRDD instance that reads from the Dataset
 * @throws DatasetInstantiationException If the Dataset doesn't exist
 */
<K, V> JavaPairRDD<K, V> fromDataset(String datasetName, Map<String, String> arguments);
/**
 * Creates a JavaPairRDD that reads from the given Dataset with a custom set of
 * input splits controlling how the read is partitioned across Spark tasks.
 *
 * @param datasetName Name of the Dataset
 * @param arguments Dataset arguments
 * @param splits Custom list of splits, or null to use the Dataset's default splits
 * @param <K> Key type
 * @param <V> Value type
 * @return JavaPairRDD instance that reads from the Dataset
 * @throws DatasetInstantiationException If the Dataset doesn't exist
 */
<K, V> JavaPairRDD<K, V> fromDataset(String datasetName, Map<String, String> arguments,
@Nullable Iterable<? extends Split> splits);
/**
 * Saves the given JavaPairRDD to the given Dataset, with no extra Dataset
 * arguments.
 *
 * @param rdd JavaPairRDD to be saved
 * @param datasetName Name of the Dataset
 * @param <K> Key type
 * @param <V> Value type
 * @throws DatasetInstantiationException If the Dataset doesn't exist
 */
<K, V> void saveAsDataset(JavaPairRDD<K, V> rdd, String datasetName);
/**
 * Saves the given JavaPairRDD to the given Dataset, instantiated with the
 * given set of Dataset arguments.
 *
 * @param rdd JavaPairRDD to be saved
 * @param datasetName Name of the Dataset
 * @param arguments Dataset arguments
 * @param <K> Key type
 * @param <V> Value type
 * @throws DatasetInstantiationException If the Dataset doesn't exist
 */
<K, V> void saveAsDataset(JavaPairRDD<K, V> rdd, String datasetName, Map<String, String> arguments);Dataset Usage Example:
import co.cask.cdap.etl.api.batch.SparkExecutionPluginContext;
import org.apache.spark.api.java.JavaPairRDD;

import java.util.HashMap;
import java.util.Map;

/**
 * Example: read a Dataset into a JavaPairRDD, transform it, and save the
 * result to another Dataset with custom Dataset arguments.
 *
 * <p>{@code UserRecord} and {@code normalizeUser} are placeholders for
 * application-defined types/helpers and are not part of the API.
 */
public class DataProcessor {
public void processData(SparkExecutionPluginContext context) throws Exception {
// Read from input dataset
JavaPairRDD<String, UserRecord> inputData = context.fromDataset("user-input");
// Keep only active users, then normalize each record
JavaPairRDD<String, UserRecord> processedData = inputData
.filter(tuple -> tuple._2().isActive())
.mapValues(user -> normalizeUser(user));
// Save to output dataset with custom Dataset arguments
Map<String, String> outputArgs = new HashMap<>();
outputArgs.put("compression", "snappy");
outputArgs.put("format", "parquet");
context.saveAsDataset(processedData, "user-output", outputArgs);
}
}
Methods for reading from CDAP streams with various decoding options.
/**
 * Creates a JavaRDD representing all events from the given stream.
 *
 * @param streamName Name of the stream
 * @return JavaRDD of StreamEvent objects
 * @throws DatasetInstantiationException If the Stream doesn't exist
 */
JavaRDD<StreamEvent> fromStream(String streamName);
/**
 * Creates a JavaRDD representing events from the stream within the given
 * half-open time range [startTime, endTime).
 *
 * @param streamName Name of the stream
 * @param startTime Starting time in milliseconds (inclusive)
 * @param endTime Ending time in milliseconds (exclusive)
 * @return JavaRDD of StreamEvent objects
 * @throws DatasetInstantiationException If the Stream doesn't exist
 */
JavaRDD<StreamEvent> fromStream(String streamName, long startTime, long endTime);
/**
 * Creates a JavaPairRDD with event timestamps as keys and decoded stream
 * bodies as values.
 * Supports Text, String, and ByteWritable value types.
 * NOTE(review): "ByteWritable" here is likely Hadoop's BytesWritable (the
 * byte-array type) rather than the single-byte ByteWritable — confirm
 * against the implementation.
 *
 * @param streamName Name of the stream
 * @param valueType Type of the stream body to decode to
 * @param <V> Value type
 * @return JavaPairRDD with timestamp keys and decoded values
 * @throws DatasetInstantiationException If the Stream doesn't exist
 */
<V> JavaPairRDD<Long, V> fromStream(String streamName, Class<V> valueType);
/**
 * Creates a JavaPairRDD with decoded stream events from the half-open time
 * range [startTime, endTime), keyed by event timestamp.
 *
 * @param streamName Name of the stream
 * @param startTime Starting time in milliseconds (inclusive)
 * @param endTime Ending time in milliseconds (exclusive)
 * @param valueType Type of the stream body to decode to
 * @param <V> Value type
 * @return JavaPairRDD with timestamp keys and decoded values
 * @throws DatasetInstantiationException If the Stream doesn't exist
 */
<V> JavaPairRDD<Long, V> fromStream(String streamName, long startTime, long endTime, Class<V> valueType);
/**
 * Creates a JavaPairRDD whose keys and values are produced by a caller-supplied
 * StreamEventDecoder, for stream bodies that need custom decoding.
 *
 * @param streamName Name of the stream
 * @param startTime Starting time in milliseconds (inclusive)
 * @param endTime Ending time in milliseconds (exclusive)
 * @param decoderClass StreamEventDecoder class for decoding events
 * @param keyType Decoded key type
 * @param valueType Decoded value type
 * @param <K> Key type
 * @param <V> Value type
 * @return JavaPairRDD with custom decoded keys and values
 * @throws DatasetInstantiationException If the Stream doesn't exist
 */
<K, V> JavaPairRDD<K, V> fromStream(String streamName, long startTime, long endTime,
Class<? extends StreamEventDecoder<K, V>> decoderClass,
Class<K> keyType, Class<V> valueType);Stream Usage Example:
import co.cask.cdap.etl.api.batch.SparkExecutionPluginContext;
import co.cask.cdap.api.flow.flowlet.StreamEvent;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.hadoop.io.Text;
import scala.Tuple2;

/**
 * Example: read stream events from the last 24 hours, parse them, and save the
 * parsed records to a Dataset.
 *
 * <p>{@code UserEvent} and {@code parseUserEvent} are placeholders for
 * application-defined types/helpers and are not part of the API.
 */
public class StreamProcessor {
public void processStreamData(SparkExecutionPluginContext context) throws Exception {
long startTime = System.currentTimeMillis() - (24 * 60 * 60 * 1000); // 24 hours ago
long endTime = System.currentTimeMillis();
// Read raw stream events
JavaRDD<StreamEvent> rawEvents = context.fromStream("user-events", startTime, endTime);
// Read the same stream with Text decoding (shown for illustration only;
// textEvents is not used below)
JavaPairRDD<Long, Text> textEvents = context.fromStream("user-events", startTime, endTime, Text.class);
// Parse events, dropping any that fail to parse
JavaRDD<UserEvent> userEvents = rawEvents
.map(event -> parseUserEvent(event.getBody()))
.filter(event -> event != null);
// Key by user id and save to the output dataset
JavaPairRDD<String, UserEvent> keyedEvents = userEvents
.mapToPair(event -> new Tuple2<>(event.getUserId(), event));
context.saveAsDataset(keyedEvents, "processed-events");
}
}
SparkExecutionPluginContext extends several parent interfaces that provide additional capabilities:
These inherited capabilities — dataset access via DatasetContext and transform-stage services via TransformContext — enable comprehensive integration with CDAP's data platform.
Common exceptions thrown by context methods include DatasetInstantiationException (when a named Dataset or Stream does not exist) and IOException (when a SparkInterpreter's local class directory cannot be created).
Install with Tessl CLI
npx tessl i tessl/maven-co-cask-cdap--cdap-etl-api-spark