Apache Spark - Unified analytics engine for large-scale data processing
—
Structured Streaming provides real-time stream processing with exactly-once fault-tolerance guarantees. Built on the Spark SQL engine for seamless integration with batch processing. Also includes legacy Spark Streaming (DStreams) for micro-batch processing.
Modern streaming API built on DataFrame/Dataset with continuous processing and exactly-once guarantees.
/**
 * Reader for streaming data sources.
 * Obtained from `spark.readStream`; configuration methods return the
 * reader itself so calls can be chained, ending in a `load` call.
 */
class DataStreamReader {
/** Specify streaming data source format (e.g. "json", "parquet", "kafka") */
def format(source: String): DataStreamReader
/** Add a single input option for the underlying source */
def option(key: String, value: String): DataStreamReader
def option(key: String, value: Boolean): DataStreamReader
def option(key: String, value: Long): DataStreamReader
def option(key: String, value: Double): DataStreamReader
/** Add multiple options at once */
def options(options: Map[String, String]): DataStreamReader
/** Set the expected schema, either as a StructType or a DDL-formatted string */
def schema(schema: StructType): DataStreamReader
def schema(schemaString: String): DataStreamReader
/** Load the streaming DataFrame (optionally from a path) */
def load(): DataFrame
def load(path: String): DataFrame
/** Format-specific shorthand loaders */
def json(path: String): DataFrame
def csv(path: String): DataFrame
def parquet(path: String): DataFrame
def text(path: String): DataFrame
def textFile(path: String): Dataset[String]
// NOTE(review): standard Spark exposes Kafka/socket sources via format(...)
// plus options; confirm these dedicated helper methods exist in this build.
def kafka(options: Map[String, String]): DataFrame
def socket(host: String, port: Int): DataFrame
}
/**
 * Writer for streaming data sinks.
 * Obtained from `dataset.writeStream`; configuration methods return the
 * writer itself so calls can be chained, ending in a `start` call.
 */
class DataStreamWriter[T] {
/** Set output mode (Append, Complete, or Update; string form accepted) */
def outputMode(outputMode: OutputMode): DataStreamWriter[T]
def outputMode(outputMode: String): DataStreamWriter[T]
/** Set the trigger policy controlling when micro-batches are emitted */
def trigger(trigger: Trigger): DataStreamWriter[T]
/** Specify output format */
def format(source: String): DataStreamWriter[T]
/** Add a single output option for the underlying sink */
def option(key: String, value: String): DataStreamWriter[T]
def option(key: String, value: Boolean): DataStreamWriter[T]
def option(key: String, value: Long): DataStreamWriter[T]
def option(key: String, value: Double): DataStreamWriter[T]
/** Add multiple options at once */
def options(options: Map[String, String]): DataStreamWriter[T]
/** Partition the output by the given columns */
def partitionBy(colNames: String*): DataStreamWriter[T]
/** Set the query name (used e.g. as the table name for the memory sink) */
def queryName(queryName: String): DataStreamWriter[T]
/** Start the streaming query and return a handle to it */
def start(): StreamingQuery
def start(path: String): StreamingQuery
/** Format-specific shorthands that configure the sink and start the query */
def console(): StreamingQuery
def json(path: String): StreamingQuery
def csv(path: String): StreamingQuery
def parquet(path: String): StreamingQuery
def text(path: String): StreamingQuery
def kafka(): StreamingQuery
def memory(queryName: String): StreamingQuery
}
/**
 * Output modes controlling which rows of the streaming result table
 * are written to the sink on each trigger.
 */
object OutputMode {
/** Write only rows appended to the result table since the last trigger */
val Append: OutputMode
/** Write the entire result table on every trigger (for aggregation queries) */
val Complete: OutputMode
/** Write only rows that changed since the last trigger */
val Update: OutputMode
}
/**
 * Trigger policies for streaming queries
 */
object Trigger {
/** Run micro-batches periodically at the given processing-time interval;
 * an interval of 0 means run batches as fast as possible */
def ProcessingTime(interval: String): Trigger
def ProcessingTime(interval: Long, unit: TimeUnit): Trigger
/** Process all data available at start in one batch, then stop the query */
def Once(): Trigger
/** Continuous processing with low latency; the interval is how often
 * progress is checkpointed */
def Continuous(interval: String): Trigger
def Continuous(interval: Long, unit: TimeUnit): Trigger
}

Usage Examples:
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.functions._
// Read from a file source (one row per line of text)
val lines = spark.readStream
.format("text")
.option("path", "input-directory")
.load()
// Word count example: split each line into words and count occurrences
val words = lines
.select(explode(split($"value", " ")).as("word"))
.groupBy("word")
.count()
// Write running counts to the console every 10 seconds
val query = words.writeStream
.outputMode("complete") // emit the full counts table on every trigger
.format("console")
.trigger(Trigger.ProcessingTime("10 seconds"))
.start()
query.awaitTermination() // block until the query stops or fails
// Kafka source and sink
val kafkaDF = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "topic1")
.load()
// Parse the Kafka value as JSON and aggregate by category
val processedDF = kafkaDF
.select(from_json($"value".cast("string"), schema).as("data"))
.select("data.*")
.groupBy("category")
.count()
val output = processedDF.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "output-topic")
.outputMode("update") // write only categories whose counts changed
.start()

Interface for managing and monitoring streaming queries.
/**
 * Handle to a running streaming query, used for monitoring and management
 */
abstract class StreamingQuery {
/** Unique identifier for the query */
def id: UUID
/** User-specified name of the query (set via queryName) */
def name: String
/** Whether the query is currently running */
def isActive: Boolean
/** Current status of the query */
def status: StreamingQueryStatus
/** Recent progress updates, one per completed trigger */
def recentProgress: Array[StreamingQueryProgress]
/** Most recent progress update */
def lastProgress: StreamingQueryProgress
/** Block until the query terminates; the timed variant returns true
 * if the query terminated within the timeout */
def awaitTermination(): Unit
def awaitTermination(timeoutMs: Long): Boolean
/** Stop the query */
def stop(): Unit
/** Exception that caused the query to fail, if any */
def exception: Option[StreamingQueryException]
/** Print the (optionally extended) streaming query plan */
def explain(): Unit
def explain(extended: Boolean): Unit
}
/**
 * Registry for the streaming queries started in a session
 */
class StreamingQueryManager {
/** Currently active queries */
def active: Array[StreamingQuery]
/** Look up a query by its id (UUID or its string form) */
def get(id: UUID): StreamingQuery
def get(id: String): StreamingQuery
/** Block until ANY one of the active queries terminates; the timed variant
 * returns true if a query terminated within the timeout */
def awaitAnyTermination(): Unit
def awaitAnyTermination(timeoutMs: Long): Boolean
/** Stop all active queries */
def stopAll(): Unit
/** Register a listener for query lifecycle/progress events */
def addListener(listener: StreamingQueryListener): Unit
/** Unregister a previously added listener */
def removeListener(listener: StreamingQueryListener): Unit
}
/**
 * Per-trigger progress snapshot for a streaming query
 */
case class StreamingQueryProgress(
id: UUID, // query identifier
runId: UUID, // identifier of the current run; presumably changes on restart — confirm
name: String, // user-specified query name, if any
timestamp: String, // trigger timestamp
batchId: Long, // id of the processed micro-batch
batchDuration: Long, // total batch duration (presumably milliseconds — confirm)
durationMs: Map[String, Long], // per-phase durations in milliseconds
eventTime: Map[String, String], // event-time statistics (e.g. watermark)
stateOperators: Array[StateOperatorProgress], // progress of stateful operators
sources: Array[SourceProgress], // per-source progress
sink: SinkProgress // sink progress
)

Time-based windowing for streaming aggregations.
/**
 * Window functions for event-time based grouping of streaming data
 */
object functions {
/** Tumbling (fixed, non-overlapping) time window of the given duration */
def window(
timeColumn: Column,
windowDuration: String
): Column
/** Sliding time window: windows of `windowDuration`, starting every
 * `slideDuration` (windows overlap when the slide is shorter) */
def window(
timeColumn: Column,
windowDuration: String,
slideDuration: String
): Column
/** Sliding time window whose first window is offset by `startTime` */
def window(
timeColumn: Column,
windowDuration: String,
slideDuration: String,
startTime: String
): Column
/** Session window: a session closes after `gapDuration` of inactivity */
def session_window(
timeColumn: Column,
gapDuration: String
): Column
}

Usage Examples:
import org.apache.spark.sql.functions._
// Tumbling window: fixed, non-overlapping 10-minute windows
val windowed = events
.withWatermark("timestamp", "10 minutes") // tolerate up to 10 min of late data
.groupBy(
window($"timestamp", "10 minutes"),
$"category"
)
.count()
// Sliding window: 10-minute windows starting every 5 minutes (overlapping)
val sliding = events
.withWatermark("timestamp", "10 minutes")
.groupBy(
window($"timestamp", "10 minutes", "5 minutes"),
$"userId"
)
.agg(sum("amount").as("total"))
// Session window: a session ends after a 20-minute gap in activity
val sessions = events
.withWatermark("timestamp", "30 minutes")
.groupBy(
$"userId",
session_window($"timestamp", "20 minutes")
)
.count()

Handling late-arriving data with watermarks.
/**
 * Watermarking for handling late data (provided via implicit conversion)
 */
implicit class DatasetWatermark[T](ds: Dataset[T]) {
/** Define a watermark on the event-time column: rows arriving more than
 * `delayThreshold` behind the observed event time may be dropped */
def withWatermark(eventTime: String, delayThreshold: String): Dataset[T]
}

Usage Examples:
// Handle late data with watermarking
val result = inputStream
.withWatermark("eventTime", "10 minutes") // allow up to 10 minutes of late data
.groupBy(
window($"eventTime", "5 minutes"),
$"deviceId"
)
.count()
// The watermark also bounds how long per-key aggregation state is retained
val stateStream = inputStream
.withWatermark("eventTime", "1 hour")
.groupBy($"sessionId")
.agg(
min("eventTime").as("sessionStart"),
max("eventTime").as("sessionEnd"),
count("*").as("eventCount")
)

Original streaming API using discretized streams (micro-batches).
/**
 * Main entry point for Spark Streaming (legacy DStream API).
 * `batchDuration` is the interval at which input is divided into micro-batches.
 */
class StreamingContext(conf: SparkConf, batchDuration: Duration) {
/** Create a DStream of text lines read from a TCP socket */
def socketTextStream(
hostname: String,
port: Int,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): ReceiverInputDStream[String]
/** Monitor a directory for new text files */
def textFileStream(directory: String): DStream[String]
/** Monitor a directory for new files readable with the given input format */
def fileStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K, V]: ClassTag](
directory: String
): InputDStream[(K, V)]
/** Create a DStream from a queue of RDDs (useful for testing);
 * `oneAtATime` consumes one queued RDD per batch */
def queueStream[T: ClassTag](
queue: Queue[RDD[T]],
oneAtATime: Boolean = true
): InputDStream[T]
/** Union multiple DStreams of the same element type */
def union[T: ClassTag](streams: Seq[DStream[T]]): DStream[T]
/** Start the streaming computation */
def start(): Unit
/** Stop streaming, optionally stopping the SparkContext as well and/or
 * finishing in-flight data gracefully */
def stop(stopSparkContext: Boolean = true, stopGracefully: Boolean = false): Unit
/** Block until the computation terminates; the timed variant returns
 * true if it terminated within the timeout */
def awaitTermination(): Unit
def awaitTermination(timeout: Long): Boolean
/** Set how long generated RDDs are remembered before being released */
def remember(duration: Duration): Unit
/** Set directory for checkpoint data (required for stateful operations) */
def checkpoint(directory: String): Unit
}
/**
 * Discretized Stream: a continuous sequence of RDDs, one per batch interval
 */
abstract class DStream[T: ClassTag] {
/** Transform each element */
def map[U: ClassTag](mapFunc: T => U): DStream[U]
/** Transform each element into zero or more elements and flatten */
def flatMap[U: ClassTag](flatMapFunc: T => TraversableOnce[U]): DStream[U]
/** Keep only elements satisfying the predicate */
def filter(filterFunc: T => Boolean): DStream[T]
/** Reduce the elements of each batch RDD to a single value */
def reduce(reduceFunc: (T, T) => T): DStream[T]
/** Count the elements in each batch RDD */
def count(): DStream[Long]
/** Union with another DStream of the same element type */
def union(that: DStream[T]): DStream[T]
/** Apply an arbitrary RDD-to-RDD function to each batch
 * (the second variant also receives the batch time) */
def transform[U: ClassTag](transformFunc: RDD[T] => RDD[U]): DStream[U]
def transform[U: ClassTag](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U]
/** Windowed view over the last `windowDuration` of data, emitted every
 * `slideDuration` */
def window(windowDuration: Duration): DStream[T]
def window(windowDuration: Duration, slideDuration: Duration): DStream[T]
/** Reduce over a sliding window */
def reduceByWindow(
reduceFunc: (T, T) => T,
windowDuration: Duration,
slideDuration: Duration
): DStream[T]
/** Count elements over a sliding window */
def countByWindow(windowDuration: Duration, slideDuration: Duration): DStream[Long]
/** Apply a side-effecting function to each batch RDD (a primary output op) */
def foreachRDD(foreachFunc: RDD[T] => Unit): Unit
def foreachRDD(foreachFunc: (RDD[T], Time) => Unit): Unit
/** Print the first `num` elements of each batch RDD */
def print(num: Int = 10): Unit
/** Save each batch RDD as a text file named from prefix/suffix */
def saveAsTextFiles(prefix: String, suffix: String = ""): Unit
/** Cache the stream's RDDs */
def cache(): DStream[T]
/** Persist the stream's RDDs at the given storage level */
def persist(level: StorageLevel = StorageLevel.MEMORY_ONLY_SER): DStream[T]
}

Additional operations for DStreams of key-value pairs.
/**
 * Extra operations available on DStreams of key-value pairs
 * (provided via implicit conversion)
 */
implicit class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K, V)]) {
/** Group values by key within each batch */
def groupByKey(): DStream[(K, Iterable[V])]
def groupByKey(numPartitions: Int): DStream[(K, Iterable[V])]
/** Reduce values by key within each batch */
def reduceByKey(func: (V, V) => V): DStream[(K, V)]
def reduceByKey(func: (V, V) => V, numPartitions: Int): DStream[(K, V)]
/** Reduce values by key over a sliding window */
def reduceByKeyAndWindow(
reduceFunc: (V, V) => V,
windowDuration: Duration,
slideDuration: Duration
): DStream[(K, V)]
/** Inner join with another pair DStream, per batch */
def join[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, W))]
def join[W: ClassTag](other: DStream[(K, W)], numPartitions: Int): DStream[(K, (V, W))]
/** Left outer join with another pair DStream, per batch */
def leftOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, Option[W]))]
/** Cogroup with another pair DStream, per batch */
def cogroup[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Iterable[V], Iterable[W]))]
/** Maintain arbitrary per-key state across batches: `updateFunc` receives
 * the batch's values and the previous state; returning None drops the key */
def updateStateByKey[S: ClassTag](
updateFunc: (Seq[V], Option[S]) => Option[S]
): DStream[(K, S)]
/** Stateful map driven by a StateSpec */
def mapWithState[StateType: ClassTag, MappedType: ClassTag](
spec: StateSpec[K, V, StateType, MappedType]
): MapWithStateDStream[K, V, StateType, MappedType]
}

Usage Examples:
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream._
// Create a StreamingContext with a 1-second batch interval
val conf = new SparkConf().setAppName("StreamingApp").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(1))
ssc.checkpoint("checkpoint") // required for the stateful operations below
// Socket stream
val lines = ssc.socketTextStream("localhost", 9999)
// Word count
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.print()
// Windowed operations: counts over the last 30 seconds, updated every 10 seconds
val windowedWordCounts = pairs
.reduceByKeyAndWindow(
(a: Int, b: Int) => (a + b),
Seconds(30),
Seconds(10)
)
// Stateful operations: running count per word across batches
val runningCounts = pairs.updateStateByKey[Int] { (values, state) =>
val currentCount = values.sum
val previousCount = state.getOrElse(0)
Some(currentCount + previousCount)
}
// File stream: count first CSV fields of non-empty lines in new files
val textFiles = ssc.textFileStream("input-directory")
val processedFiles = textFiles
.flatMap(_.split("\n"))
.filter(_.nonEmpty)
.map(line => (line.split(",")(0), 1))
.reduceByKey(_ + _)
// Start the streaming computation
ssc.start()
ssc.awaitTermination()

Time handling utilities for streaming applications.
/**
 * A length of time, used for batch, window, and slide intervals
 */
case class Duration(private val millis: Long) {
// Arithmetic
def +(other: Duration): Duration
def -(other: Duration): Duration
def *(times: Int): Duration
def /(that: Duration): Double // ratio of the two durations
// Comparisons
def <(other: Duration): Boolean
def <=(other: Duration): Boolean
def >(other: Duration): Boolean
def >=(other: Duration): Boolean
/** Underlying value in milliseconds */
def milliseconds: Long
}
/** Factory for Duration values from a raw millisecond count */
object Duration {
def apply(length: Long): Duration
}
/** Create a Duration expressed in milliseconds */
object Milliseconds {
def apply(milliseconds: Long): Duration
}
/** Create a Duration expressed in seconds */
object Seconds {
def apply(seconds: Long): Duration
}
/** Create a Duration expressed in minutes */
object Minutes {
def apply(minutes: Long): Duration
}
/**
 * An instant in time, as seen by DStream batch processing
 */
case class Time(private val millis: Long) {
// Shift by a duration; difference of two Times yields a Duration
def +(other: Duration): Time
def -(other: Duration): Time
def -(other: Time): Duration
// Comparisons
def <(other: Time): Boolean
def <=(other: Time): Boolean
def >(other: Time): Boolean
def >=(other: Time): Boolean
/** Underlying value in milliseconds */
def milliseconds: Long
}

Mechanisms for fault tolerance in streaming applications.
/**
 * Companion providing checkpoint-based creation and recovery
 */
object StreamingContext {
/** Recover a StreamingContext from checkpoint data at `checkpointPath`
 * if present, otherwise create a fresh one via `creatingFunc` */
def getOrCreate(
checkpointPath: String,
creatingFunc: () => StreamingContext
): StreamingContext
/** Same as above, reading the checkpoint with the given Hadoop configuration */
def getOrCreate(
checkpointPath: String,
creatingFunc: () => StreamingContext,
hadoopConf: Configuration
): StreamingContext
}

Usage Examples:
// Checkpointing example: the whole stream setup lives inside the factory
// so it can be re-created on a fresh start (and skipped on recovery)
def createStreamingContext(): StreamingContext = {
val conf = new SparkConf().setAppName("CheckpointApp")
val ssc = new StreamingContext(conf, Seconds(5))
ssc.checkpoint("hdfs://checkpoint")
// Define streaming computation
val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc
}
// Recover from the checkpoint if present, otherwise create a fresh context
val ssc = StreamingContext.getOrCreate("hdfs://checkpoint", createStreamingContext _)
ssc.start()
ssc.awaitTermination()

Common streaming exceptions:
- StreamingQueryException - streaming query execution failures
- AnalysisException - invalid streaming operations or configurations
- IllegalStateException - invalid streaming application state
- TimeoutException - query termination timeouts

Install with Tessl CLI:
npx tessl i tessl/maven-org-apache-spark--spark-parent-2-12