Core Spark 1.x integration component for the Cask Data Application Platform providing runtime services and execution context for CDAP applications
—
Data processing utilities and Spark SQL data sources that enable efficient access to CDAP datasets and streams with full type safety, performance optimization, and seamless integration with Apache Spark's DataFrame and RDD APIs.
Iterator implementation for efficiently scanning data from CDAP datasets within Spark applications, providing type-safe access to dataset records.
/**
* Iterator for scanning data from CDAP datasets in Spark applications
* Provides efficient, type-safe access to dataset records
* @tparam T Type of data items being scanned
*/
class DatumScannerIterator[T](scanner: Scanner[T]) extends Iterator[T] with Closeable {
/**
* Checks if there are more items to scan
* @return true if more items are available
*/
def hasNext: Boolean
/**
* Gets the next item from the scanner
* @return Next data item of type T
* @throws NoSuchElementException if no more items available
*/
def next(): T
/**
* Closes the underlying scanner and releases resources
*/
def close(): Unit
}Serializable wrapper for CDAP stream events that can be efficiently processed in distributed Spark operations.
/**
* Serializable wrapper for stream events in Spark processing
* Enables efficient distribution of stream data across Spark executors
*/
public class SerializableStreamEvent implements Serializable {
/**
* Gets the underlying stream event
* @return StreamEvent containing the actual event data
*/
public StreamEvent getStreamEvent();
/**
* Gets the timestamp of the stream event
* @return Event timestamp in milliseconds since epoch
*/
public long getTimestamp();
/**
* Gets the event headers
* @return Map of header key-value pairs
*/
public Map<String, String> getHeaders();
/**
* Gets the event body
* @return ByteBuffer containing the event body data
*/
public ByteBuffer getBody();
/**
* Gets the event body as a byte array
* @return Byte array containing the event body
*/
public byte[] getBodyBytes();
/**
* Gets the event body as a UTF-8 string
* @return String representation of the event body
*/
public String getBodyAsString();
}Spark SQL data source provider for CDAP datasets, enabling SQL queries against CDAP datasets using DataFrame API.
/**
* Spark SQL data source provider for CDAP datasets
* Enables SQL queries and DataFrame operations on CDAP datasets
*/
object DatasetRelationProvider extends RelationProvider with SchemaRelationProvider {
/**
* Gets the short name for this data source
* @return "dataset" as the data source identifier
*/
def shortName(): String
/**
* Creates a relation for the specified dataset
* @param sqlContext Spark SQL context
* @param parameters Data source parameters including dataset name and namespace
* @return BaseRelation for querying the dataset
*/
def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation
/**
* Creates a relation with a specified schema
* @param sqlContext Spark SQL context
* @param parameters Data source parameters
* @param schema Expected schema for the dataset
* @return BaseRelation with the specified schema
*/
def createRelation(sqlContext: SQLContext,
parameters: Map[String, String],
schema: StructType): BaseRelation
}Spark SQL data source provider for CDAP streams, enabling SQL queries against stream data using DataFrame API.
/**
* Spark SQL data source provider for CDAP streams
* Enables SQL queries and DataFrame operations on CDAP streams
*/
object StreamRelationProvider extends RelationProvider with SchemaRelationProvider {
/**
* Gets the short name for this data source
* @return "stream" as the data source identifier
*/
def shortName(): String
/**
* Creates a relation for the specified stream
* @param sqlContext Spark SQL context
* @param parameters Data source parameters including stream name and time range
* @return BaseRelation for querying the stream
*/
def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation
/**
* Creates a relation with a specified schema
* @param sqlContext Spark SQL context
* @param parameters Data source parameters
* @param schema Expected schema for the stream data
* @return BaseRelation with the specified schema
*/
def createRelation(sqlContext: SQLContext,
parameters: Map[String, String],
schema: StructType): BaseRelation
}Base relation implementation for CDAP streams that provides efficient scanning and querying capabilities.
/**
* Stream-based relation for Spark SQL queries
* Provides efficient access to CDAP stream data through DataFrame operations
*/
class StreamRelation(parameters: Map[String, String],
userSchema: Option[StructType])
(implicit sqlContext: SQLContext) extends BaseRelation with TableScan with PrunedScan {
/**
* Gets the schema for this relation
* @return StructType defining the stream data schema
*/
def schema: StructType
/**
* Builds a scan over the entire stream
* @return RDD[Row] containing all stream events
*/
def buildScan(): RDD[Row]
/**
* Builds a scan with column pruning
* @param requiredColumns Array of column names to include in the scan
* @return RDD[Row] containing only the required columns
*/
def buildScan(requiredColumns: Array[String]): RDD[Row]
/**
* Gets the SQL context
* @return SQLContext used for this relation
*/
def sqlContext: SQLContext
}Dataset Scanner Iterator Usage:
import co.cask.cdap.app.runtime.spark.data.DatumScannerIterator
import co.cask.cdap.api.dataset.Dataset
// Create a scanner for a dataset
val dataset: Dataset = // ... obtain dataset instance
val scanner = dataset.scan()
val iterator = new DatumScannerIterator(scanner)
// Use iterator to process data
try {
while (iterator.hasNext) {
val record = iterator.next()
// Process record
println(s"Processing record: $record")
}
} finally {
iterator.close()
}
// Use with Spark RDD
val rdd = sparkContext.parallelize(Seq(iterator))
val processedRDD = rdd.flatMap(_.toSeq)Serializable Stream Event Usage:
import co.cask.cdap.app.runtime.spark.SerializableStreamEvent;
import java.util.Map;
// Process serializable stream events in Spark
JavaRDD<SerializableStreamEvent> streamRDD = // ... obtain from stream source
JavaRDD<String> processedRDD = streamRDD.map(event -> {
// Access event metadata
long timestamp = event.getTimestamp();
Map<String, String> headers = event.getHeaders();
// Process event body
String body = event.getBodyAsString();
return "Processed at " + timestamp + ": " + body;
});Dataset SQL Data Source Usage:
import org.apache.spark.sql.SQLContext
// Create SQL context
val sqlContext = new SQLContext(sparkContext)
// Read from CDAP dataset using SQL
val datasetDF = sqlContext.read
.format("dataset")
.option("dataset.name", "my-dataset")
.option("dataset.namespace", "default")
.load()
// Query the dataset
datasetDF.createOrReplaceTempView("my_table")
val results = sqlContext.sql("SELECT * FROM my_table WHERE age > 21")
results.show()
// Use DataFrame API
val filteredDF = datasetDF.filter($"age" > 21).select($"name", $"age")
filteredDF.collect().foreach(println)Stream SQL Data Source Usage:
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types._
// Define stream schema
val streamSchema = StructType(Seq(
StructField("timestamp", LongType, nullable = false),
StructField("headers", MapType(StringType, StringType), nullable = true),
StructField("body", StringType, nullable = true)
))
// Read from CDAP stream
val streamDF = sqlContext.read
.format("stream")
.option("stream.name", "my-stream")
.option("stream.namespace", "default")
.option("stream.start.time", "2023-01-01T00:00:00Z")
.option("stream.end.time", "2023-12-31T23:59:59Z")
.schema(streamSchema)
.load()
// Query stream data
streamDF.createOrReplaceTempView("stream_events")
val recentEvents = sqlContext.sql(
"SELECT * FROM stream_events WHERE timestamp > unix_timestamp() - 3600"
)/**
* Interface for dataset scanners
* @tparam T Type of data items being scanned
*/
trait Scanner[T] extends Closeable {
/**
* Checks if there are more items to scan
* @return true if more items are available
*/
def hasNext: Boolean
/**
* Gets the next item from the scanner
* @return Next data item of type T
*/
def next(): T
/**
* Closes the scanner and releases resources
*/
def close(): Unit
}
/**
* Base relation interface for Spark SQL data sources
*/
trait BaseRelation {
/**
* Gets the SQL context
* @return SQLContext for this relation
*/
def sqlContext: SQLContext
/**
* Gets the schema for this relation
* @return StructType defining the data schema
*/
def schema: StructType
}
/**
* Interface for relations that support full table scans
*/
trait TableScan {
/**
* Builds a scan over the entire relation
* @return RDD[Row] containing all data
*/
def buildScan(): RDD[Row]
}
/**
* Interface for relations that support column pruning
*/
trait PrunedScan {
/**
* Builds a scan with column pruning
* @param requiredColumns Array of column names to include
* @return RDD[Row] containing only the required columns
*/
def buildScan(requiredColumns: Array[String]): RDD[Row]
}/**
* Interface for CDAP stream events
*/
public interface StreamEvent {
/**
* Gets the event timestamp
* @return Timestamp in milliseconds since epoch
*/
long getTimestamp();
/**
* Gets the event headers
* @return Map of header key-value pairs
*/
Map<String, String> getHeaders();
/**
* Gets the event body
* @return ByteBuffer containing the event data
*/
ByteBuffer getBody();
}
/**
* Data source parameters for dataset access
*/
public class DatasetParameters {
public static final String DATASET_NAME = "dataset.name";
public static final String DATASET_NAMESPACE = "dataset.namespace";
public static final String DATASET_ARGUMENTS = "dataset.arguments";
}
/**
* Data source parameters for stream access
*/
public class StreamParameters {
public static final String STREAM_NAME = "stream.name";
public static final String STREAM_NAMESPACE = "stream.namespace";
public static final String STREAM_START_TIME = "stream.start.time";
public static final String STREAM_END_TIME = "stream.end.time";
public static final String STREAM_BATCH_SIZE = "stream.batch.size";
}Install with Tessl CLI
npx tessl i tessl/maven-co-cask-cdap--cdap-spark-core