Apache Spark Streaming extension for scalable, fault-tolerant stream processing of live data streams
```bash
npx @tessl/cli install tessl/maven-org-apache-spark--spark-streaming_2-13@3.5.0
```

Apache Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data streams. It uses a micro-batch architecture that divides continuous streams into small batches and provides exactly-once semantics for transformations.
⚠️ Deprecation Notice: Apache Spark Streaming is deprecated as of Spark 3.4.0. Users should migrate to Structured Streaming for new applications.
Add the dependency to your `pom.xml` or `build.sbt`.

Maven:

```xml
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.13</artifactId>
  <version>3.5.6</version>
</dependency>
```

SBT:
```scala
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "3.5.6"
```

Core Scala imports:

```scala
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream._
import org.apache.spark.streaming.receiver._
import org.apache.spark.streaming.scheduler._
```

For the Java API:
```java
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.scheduler.*;
```

A quick-start word count over a socket text stream:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming._
// Create streaming context with 2-second batch interval
val conf = new SparkConf().setAppName("StreamingApp").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(2))
// Create input stream from socket
val lines = ssc.socketTextStream("localhost", 9999)
// Transform and process the stream
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
// Output results
wordCounts.print()
// Start the streaming context
ssc.start()
ssc.awaitTermination()
```

Apache Spark Streaming processes live data streams using micro-batches:

- the input stream is divided into batches of the configured batch interval (two seconds above),
- each batch is processed as an RDD by the Spark engine,
- and results are emitted as a stream of batch outputs.
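To feed the example, run `nc -lk 9999` in a terminal and type words. Window operations build on the same micro-batch model; as a minimal sketch reusing `words` from the quick start, counts can be aggregated over a sliding window whose length and slide are multiples of the batch interval:

```scala
// Count words over the last 30 seconds of data, recomputed every 10 seconds.
// Window and slide durations must be multiples of the batch interval (2s here).
val windowedCounts = words
  .map(word => (word, 1))
  .reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(30), Seconds(10))
windowedCounts.print()
```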
The main streaming abstractions include StreamingContext, DStream operations, and time management classes for building streaming applications (signatures abridged):

```scala
class StreamingContext(sparkContext: SparkContext, batchDuration: Duration) {
  def start(): Unit
  def stop(stopSparkContext: Boolean = true): Unit
  def awaitTermination(): Unit
}

abstract class DStream[T] {
  def map[U](mapFunc: T => U): DStream[U]
  def filter(filterFunc: T => Boolean): DStream[T]
  def foreachRDD(func: RDD[T] => Unit): Unit
}

case class Duration(milliseconds: Long)
object Seconds { def apply(seconds: Long): Duration }
```
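A common output pattern drains each batch with `foreachRDD`, creating any external connection once per partition rather than once per record. A minimal sketch, reusing `wordCounts` from the quick start (the sink is hypothetical):

```scala
wordCounts.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    // Hypothetical sink: open one connection per partition, not per record.
    partition.foreach { case (word, count) =>
      println(s"$word -> $count") // replace println with sink.write(...)
    }
  }
}
```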
Data ingestion capabilities cover reading from various external sources, including files, sockets, and message queues:

```scala
class StreamingContext {
  def socketTextStream(hostname: String, port: Int): DStream[String]
  def textFileStream(directory: String): DStream[String]
  def fileStream[K, V, F <: NewInputFormat[K, V]](directory: String): DStream[(K, V)]
  def queueStream[T](queue: Queue[RDD[T]]): DStream[T]
}
```
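`queueStream` is convenient for tests: RDDs pushed onto the queue are consumed one per batch. A minimal sketch, assuming the `ssc` context from the quick start:

```scala
import scala.collection.mutable.Queue
import org.apache.spark.rdd.RDD

val rddQueue = new Queue[RDD[Int]]()
val testStream = ssc.queueStream(rddQueue)
testStream.map(_ * 2).print()

// Enqueue test data; each RDD becomes one micro-batch.
rddQueue += ssc.sparkContext.parallelize(1 to 100)
```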
Stateful processing capabilities maintain state across batches via the updateStateByKey and mapWithState operations:

```scala
abstract class State[S] {
  def exists(): Boolean
  def get(): S
  def update(newState: S): Unit
  def remove(): Unit
}

abstract class StateSpec[KeyType, ValueType, StateType, MappedType] {
  def initialState(rdd: RDD[(KeyType, StateType)]): StateSpec[KeyType, ValueType, StateType, MappedType]
  def timeout(timeout: Duration): StateSpec[KeyType, ValueType, StateType, MappedType]
}
```
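A running count per key with `mapWithState`; a minimal sketch building on `wordCounts` from the quick start (the checkpoint directory is a placeholder, and checkpointing is required for stateful operations):

```scala
import org.apache.spark.streaming.{Minutes, State, StateSpec}

ssc.checkpoint("/tmp/streaming-checkpoint") // placeholder path

val mappingFunc = (word: String, count: Option[Int], state: State[Int]) => {
  val sum = count.getOrElse(0) + state.getOption.getOrElse(0)
  state.update(sum)
  (word, sum)
}

val runningCounts = wordCounts.mapWithState(
  StateSpec.function(mappingFunc).timeout(Minutes(30))
)
runningCounts.print()
```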
Java-friendly wrappers expose all streaming functionality through familiar Java interfaces and method signatures:

```java
public class JavaStreamingContext {
    public JavaStreamingContext(JavaSparkContext sparkContext, Duration batchDuration);
    public void start();
    public void awaitTermination();
}

public class JavaDStream<T> {
    public <R> JavaDStream<R> map(Function<T, R> f);
    public JavaDStream<T> filter(Function<T, Boolean> f);
    public void foreachRDD(VoidFunction<JavaRDD<T>> foreachFunc);
}
```
A comprehensive event system supports monitoring streaming applications, covering batch processing, receiver status, and performance metrics:

```scala
trait StreamingListener {
  def onStreamingStarted(streamingStarted: StreamingListenerStreamingStarted): Unit
  def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit
  def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted): Unit
}

case class BatchInfo(
  batchTime: Time,
  streamIdToInputInfo: Map[Int, StreamInputInfo],
  submissionTime: Long,
  processingStartTime: Option[Long],
  processingEndTime: Option[Long]
)
```
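Listeners are registered on the context. A minimal sketch that logs per-batch latency (StreamingListener callbacks default to no-ops, so only the needed one is overridden):

```scala
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

class BatchLatencyListener extends StreamingListener {
  override def onBatchCompleted(completed: StreamingListenerBatchCompleted): Unit = {
    val info = completed.batchInfo
    println(s"Batch ${info.batchTime}: ${info.numRecords} records, " +
      s"scheduling delay ${info.schedulingDelay.getOrElse(-1L)} ms, " +
      s"processing time ${info.processingDelay.getOrElse(-1L)} ms")
  }
}

ssc.addStreamingListener(new BatchLatencyListener)
```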
A built-in web interface visualizes streaming application metrics, batch processing status, and receiver information:

```scala
class StreamingTab(parent: SparkUI)
class StreamingPage(parent: StreamingTab)
class StreamingJobProgressListener extends StreamingListener
```
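When a StreamingContext is active, this tab appears in the driver's web UI (port 4040 by default) alongside the Jobs and Stages tabs.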
```scala
// Time and Duration
case class Time(milliseconds: Long) {
  def +(that: Duration): Time
  def -(that: Duration): Time
  def -(that: Time): Duration
}

case class Duration(milliseconds: Long) {
  def +(that: Duration): Duration
  def -(that: Duration): Duration
  def *(times: Int): Duration
  def <(that: Duration): Boolean
  def <=(that: Duration): Boolean
}

case class Interval(beginTime: Time, endTime: Time)

// State Management Types
sealed trait ValidationResult[T]
case class ValidationError(field: String, message: String)

// Streaming Context States
object StreamingContextState extends Enumeration {
  val INITIALIZED, ACTIVE, STOPPED = Value
}

// Input Stream Information
case class StreamInputInfo(
  inputStreamId: Int,
  numRecords: Long,
  metadata: Map[String, Any] = Map.empty
) {
  def metadataDescription: Option[String]
}
```
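Duration arithmetic makes it easy to derive window parameters from the batch interval; a small sketch using the operators above:

```scala
import org.apache.spark.streaming.Seconds

val batchInterval  = Seconds(2)
val windowDuration = batchInterval * 15 // 30 seconds
val slideDuration  = batchInterval * 5  // 10 seconds

// Window and slide durations must be multiples of the batch interval.
assert(windowDuration.milliseconds % batchInterval.milliseconds == 0)
```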