Spark Streaming - A scalable, fault-tolerant stream processing system that extends Apache Spark
npx @tessl/cli install tessl/maven-org-apache-spark--spark-streaming_2-12@3.5.0

Apache Spark Streaming is a scalable, fault-tolerant stream processing system built on Apache Spark that enables processing of live data streams. It provides high-level abstractions such as the DStream (discretized stream), which represents a continuous sequence of RDDs, and offers fault tolerance through lineage-based recovery along with integration with Spark's batch processing capabilities.
⚠️ Deprecation Notice: Spark Streaming (DStreams) is deprecated as of Apache Spark 3.4.0. Users are strongly encouraged to migrate to Structured Streaming, which provides a more modern streaming API with better performance, late data handling, and exactly-once processing guarantees.
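For teams planning that migration, a minimal Structured Streaming sketch of the word-count example shown below might look like the following (the socket host and port are illustrative assumptions):

import org.apache.spark.sql.SparkSession

// Sketch of a Structured Streaming word count (host and port are illustrative)
val spark = SparkSession.builder.appName("StructuredStreamingApp").getOrCreate()
import spark.implicits._

// Read lines from a socket source
val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

// Split lines into words and compute running counts
val words = lines.as[String].flatMap(_.split(" "))
val wordCounts = words.groupBy("value").count()

// Print the running counts to the console
val query = wordCounts.writeStream
  .outputMode("complete")
  .format("console")
  .start()

query.awaitTermination()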
pom.xml:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.12</artifactId>
<version>3.5.6</version>
</dependency>

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream._

import org.apache.spark._
import org.apache.spark.streaming._
// Create StreamingContext with 2 second batch interval
val conf = new SparkConf().setAppName("StreamingApp")
val ssc = new StreamingContext(conf, Seconds(2))
// Create DStream from text files
val lines = ssc.textFileStream("/path/to/directory")
// Transform the data
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
// Output the results
wordCounts.print()
// Start the streaming context
ssc.start()
ssc.awaitTermination()

Spark Streaming is built around several key components:
Primary streaming functionality including StreamingContext creation, DStream transformations, and output operations. Essential for all streaming applications.
class StreamingContext(sparkContext: SparkContext, batchDuration: Duration) {
def start(): Unit
def stop(): Unit
def awaitTermination(): Unit
def checkpoint(directory: String): Unit
}
abstract class DStream[T] {
def map[U](mapFunc: T => U): DStream[U]
def filter(filterFunc: T => Boolean): DStream[T]
def print(): Unit
def foreachRDD(foreachFunc: RDD[T] => Unit): Unit
}
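As a quick illustration of the output API above, the following sketch applies foreachRDD and checkpointing to the wordCounts stream from the quick-start example (the checkpoint directory is an illustrative placeholder):

// Enable checkpointing for fault tolerance (directory is illustrative)
ssc.checkpoint("/path/to/checkpoint")

// Process each micro-batch produced for a batch interval
wordCounts.foreachRDD { rdd =>
  // rdd holds the data of one batch; print a small sample
  rdd.take(10).foreach(println)
}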
Comprehensive data ingestion capabilities from files, sockets, queues, and custom sources. Supports both receiver-based and direct stream approaches.

class StreamingContext {
def textFileStream(directory: String): DStream[String]
def socketTextStream(hostname: String, port: Int): ReceiverInputDStream[String]
def queueStream[T](queue: Queue[RDD[T]]): InputDStream[T]
def receiverStream[T](receiver: Receiver[T]): ReceiverInputDStream[T]
}
abstract class Receiver[T](storageLevel: StorageLevel) {
def onStart(): Unit
def onStop(): Unit
def store(dataItem: T): Unit
}

Input Sources and Data Ingestion
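A minimal sketch of a custom receiver built on the Receiver API above, modeled on the standard socket-receiver pattern (the host, port, and threading details are illustrative):

import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import java.nio.charset.StandardCharsets
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Custom receiver that reads lines from a socket (host and port are illustrative)
class SocketLineReceiver(host: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  def onStart(): Unit = {
    // Receive on a separate thread so onStart returns quickly
    new Thread("Socket Line Receiver") {
      override def run(): Unit = receive()
    }.start()
  }

  def onStop(): Unit = {
    // The receiving loop stops itself once isStopped() returns true
  }

  private def receive(): Unit = {
    try {
      val socket = new Socket(host, port)
      val reader = new BufferedReader(
        new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
      var line = reader.readLine()
      while (!isStopped() && line != null) {
        store(line)              // hand each record to Spark
        line = reader.readLine()
      }
      reader.close()
      socket.close()
      restart("Trying to connect again")
    } catch {
      case t: Throwable => restart("Error receiving data", t)
    }
  }
}

// Usage: plug the receiver into a stream
// val lines = ssc.receiverStream(new SocketLineReceiver("localhost", 9999))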
Windowed operations, stateful transformations, and advanced data processing patterns for complex streaming analytics.
class DStream[T] {
def window(windowDuration: Duration, slideDuration: Duration): DStream[T]
def reduceByWindow(reduceFunc: (T, T) => T, windowDuration: Duration, slideDuration: Duration): DStream[T]
}
class PairDStreamFunctions[K, V] {
def updateStateByKey[S](updateFunc: (Seq[V], Option[S]) => Option[S]): DStream[(K, S)]
def mapWithState[StateType, MappedType](spec: StateSpec[K, V, StateType, MappedType]): MapWithStateDStream[K, V, StateType, MappedType]
}

Advanced Transformations and Windowing
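A short sketch of these operations, assuming the words DStream from the quick-start example (window and slide durations are illustrative; the stateful operation requires checkpointing, e.g. ssc.checkpoint as shown earlier):

// Count words over a 30-second window that slides every 10 seconds
val windowedCounts = words.map(word => (word, 1))
  .window(Seconds(30), Seconds(10))
  .reduceByKey(_ + _)

// Maintain a running total per word across all batches
val runningCounts = words.map(word => (word, 1)).updateStateByKey[Int] {
  (newValues: Seq[Int], state: Option[Int]) => Some(newValues.sum + state.getOrElse(0))
}

windowedCounts.print()
runningCounts.print()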
Java-friendly wrappers providing seamless integration for Java applications with lambda expressions and Java collections support.
class JavaStreamingContext(sparkContext: JavaSparkContext, batchDuration: Duration) {
def socketTextStream(hostname: String, port: Int): JavaReceiverInputDStream[String]
def textFileStream(directory: String): JavaDStream[String]
}
class JavaDStream[T] {
def map[U](f: JFunction[T, U]): JavaDStream[U]
def filter(f: JFunction[T, Boolean]): JavaDStream[T]
}

case class Duration(milliseconds: Long) {
def +(that: Duration): Duration
def -(that: Duration): Duration
def *(times: Int): Duration
def isMultipleOf(that: Duration): Boolean
}
object Seconds {
def apply(seconds: Long): Duration
}
object Minutes {
def apply(minutes: Long): Duration
}
case class Time(milliseconds: Long) {
def +(that: Duration): Time
def -(that: Time): Duration
def floor(that: Duration): Time
}
case class Interval(beginTime: Time, endTime: Time) {
def duration(): Duration
}
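A few lines showing how these duration helpers compose (the values are arbitrary):

import org.apache.spark.streaming.{Minutes, Seconds}

// Duration arithmetic with arbitrary example values
val batchInterval = Seconds(2)
val windowLength  = Minutes(1)                               // 60000 ms
val slideInterval = batchInterval * 5                        // 10 seconds
val aligned       = windowLength.isMultipleOf(batchInterval) // true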
sealed trait StreamingContextState
object StreamingContextState {
case object INITIALIZED extends StreamingContextState
case object ACTIVE extends StreamingContextState
case object STOPPED extends StreamingContextState
}
sealed abstract class StorageLevel extends Serializable {
def useDisk: Boolean
def useMemory: Boolean
def useOffHeap: Boolean
def deserialized: Boolean
def replication: Int
}
object StorageLevel {
val NONE: StorageLevel
val DISK_ONLY: StorageLevel
val DISK_ONLY_2: StorageLevel
val MEMORY_ONLY: StorageLevel
val MEMORY_ONLY_2: StorageLevel
val MEMORY_ONLY_SER: StorageLevel
val MEMORY_ONLY_SER_2: StorageLevel
val MEMORY_AND_DISK: StorageLevel
val MEMORY_AND_DISK_2: StorageLevel
val MEMORY_AND_DISK_SER: StorageLevel
val MEMORY_AND_DISK_SER_2: StorageLevel
val OFF_HEAP: StorageLevel
}
abstract class StreamingListener {
def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted): Unit = {}
def onReceiverError(receiverError: StreamingListenerReceiverError): Unit = {}
def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped): Unit = {}
def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted): Unit = {}
def onBatchStarted(batchStarted: StreamingListenerBatchStarted): Unit = {}
def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {}
def onOutputOperationStarted(outputOperationStarted: StreamingListenerOutputOperationStarted): Unit = {}
def onOutputOperationCompleted(outputOperationCompleted: StreamingListenerOutputOperationCompleted): Unit = {}
def onStreamingStarted(streamingStarted: StreamingListenerStreamingStarted): Unit = {}
}
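A small sketch of a listener that logs batch completion, registered through StreamingContext.addStreamingListener (the log format is illustrative):

import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

// Logs basic metrics for each completed batch
class BatchMonitor extends StreamingListener {
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    val info = batchCompleted.batchInfo
    println(s"Batch at ${info.batchTime}: ${info.numRecords} records, " +
      s"processing delay ${info.processingDelay.getOrElse(-1L)} ms")
  }
}

// Register on an existing StreamingContext:
// ssc.addStreamingListener(new BatchMonitor)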
Spark Streaming applications can handle errors through:

- Receiver.reportError() for custom receiver failures
- StreamingContext.getOrCreate() for driver failure recovery
- DStream.foreachRDD() with try-catch blocks

Common exceptions include:
- IllegalArgumentException: Invalid configuration parameters
- SparkException: General Spark runtime errors
- StreamingQueryException: Streaming-specific runtime errors
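For driver failure recovery specifically, the usual pattern is to rebuild the StreamingContext from a checkpoint via getOrCreate; a minimal sketch, with illustrative paths, host, and port:

import org.apache.spark._
import org.apache.spark.streaming._

// Builds a fresh context when no checkpoint exists (paths, host, and port are illustrative)
def createContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("RecoverableStreamingApp")
  val ssc = new StreamingContext(conf, Seconds(2))
  ssc.checkpoint("/path/to/checkpoint")

  val lines = ssc.socketTextStream("localhost", 9999)
  lines.foreachRDD { rdd =>
    try {
      rdd.foreach(println)   // stand-in for real per-batch output logic
    } catch {
      case e: Exception => println(s"Batch failed: ${e.getMessage}")
    }
  }
  ssc
}

// Recovers the context (and pending work) from the checkpoint after a driver failure
val ssc = StreamingContext.getOrCreate("/path/to/checkpoint", createContext _)
ssc.start()
ssc.awaitTermination()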