Apache Spark Streaming extension for scalable, fault-tolerant stream processing of live data streams
—
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Pending
The risk profile of this skill
Apache Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data streams. It provides a micro-batching architecture for processing continuous streams with exactly-once semantics.
⚠️ Deprecation Notice: Apache Spark Streaming is deprecated as of Spark 3.4.0. Users should migrate to Structured Streaming for new applications.
Add the dependency to your pom.xml or build.sbt.
Maven:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.13</artifactId>
<version>3.5.6</version>
</dependency>
SBT:
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "3.5.6"
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream._
import org.apache.spark.streaming.receiver._
import org.apache.spark.streaming.scheduler._
For Java API:
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.scheduler.*;
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming._
// Quick-start word count: read text from a socket, count words, print the counts.
// Create streaming context with 2-second batch interval.
// "local[2]" runs locally with two threads; a receiver-based source such as the
// socket stream below occupies one of them, so at least one more is needed for
// processing (see the Spark Streaming programming guide).
val conf = new SparkConf().setAppName("StreamingApp").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(2))
// Create input stream (a DStream[String]) from a socket on localhost:9999
val lines = ssc.socketTextStream("localhost", 9999)
// Split every line on single spaces, pair each word with 1, then sum the 1s per word
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
// Output operation: print the word counts
wordCounts.print()
// Start the streaming context
ssc.start()
ssc.awaitTermination()
Apache Spark Streaming processes live data streams using micro-batches.
Main streaming abstractions including StreamingContext, DStream operations, and time management classes for building streaming applications.
class StreamingContext(sparkContext: SparkContext, batchDuration: Duration) {
def start(): Unit
def stop(stopSparkContext: Boolean = true): Unit
def awaitTermination(): Unit
}
abstract class DStream[T] {
def map[U](mapFunc: T => U): DStream[U]
def filter(filterFunc: T => Boolean): DStream[T]
def foreachRDD(func: RDD[T] => Unit): Unit
}
case class Duration(milliseconds: Long)
object Seconds { def apply(seconds: Long): Duration }
Data ingestion capabilities for reading from various external sources including files, sockets, and message queues.
class StreamingContext {
def socketTextStream(hostname: String, port: Int): DStream[String]
def textFileStream(directory: String): DStream[String]
def fileStream[K, V, F <: NewInputFormat[K, V]](directory: String): DStream[(K, V)]
def queueStream[T](queue: Queue[RDD[T]]): DStream[T]
}
Stateful processing capabilities for maintaining state across batches, including updateStateByKey and mapWithState operations.
abstract class State[S] {
def exists(): Boolean
def get(): S
def update(newState: S): Unit
def remove(): Unit
}
abstract class StateSpec[KeyType, ValueType, StateType, MappedType] {
def initialState(rdd: RDD[(KeyType, StateType)]): StateSpec[KeyType, ValueType, StateType, MappedType]
def timeout(timeout: Duration): StateSpec[KeyType, ValueType, StateType, MappedType]
}
Java-friendly wrappers for all streaming functionality, providing familiar Java interfaces and method signatures.
public class JavaStreamingContext {
public JavaStreamingContext(JavaSparkContext sparkContext, Duration batchDuration);
public void start();
public void awaitTermination();
}
public class JavaDStream<T> {
public <R> JavaDStream<R> map(Function<T, R> f);
public JavaDStream<T> filter(Function<T, Boolean> f);
public void foreachRDD(VoidFunction<JavaRDD<T>> foreachFunc);
}
Comprehensive event system for monitoring streaming applications including batch processing, receiver status, and performance metrics.
trait StreamingListener {
def onStreamingStarted(streamingStarted: StreamingListenerStreamingStarted): Unit
def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit
def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted): Unit
}
case class BatchInfo(
batchTime: Time,
streamIdToInputInfo: Map[Int, StreamInputInfo],
submissionTime: Long,
processingStartTime: Option[Long],
processingEndTime: Option[Long]
)
Built-in web interface for visualizing streaming application metrics, batch processing status, and receiver information.
class StreamingTab(parent: SparkUI)
class StreamingPage(parent: StreamingTab)
class StreamingJobProgressListener extends StreamingListener
// Time and Duration
case class Time(milliseconds: Long) {
def +(that: Duration): Time
def -(that: Duration): Time
def -(that: Time): Duration
}
case class Duration(milliseconds: Long) {
def +(that: Duration): Duration
def -(that: Duration): Duration
def *(times: Int): Duration
def <(that: Duration): Boolean
def <=(that: Duration): Boolean
}
case class Interval(beginTime: Time, endTime: Time)
// State Management Types
// NOTE(review): ValidationResult and ValidationError are not referenced by any other
// listing in this document (the State and StateSpec listings do not use them) —
// confirm that they are actually part of the Spark Streaming API before relying on them.
sealed trait ValidationResult[T]
case class ValidationError(field: String, message: String)
// Streaming Context States
object StreamingContextState extends Enumeration {
val INITIALIZED, ACTIVE, STOPPED = Value
}
// Input Stream Information
case class StreamInputInfo(
inputStreamId: Int,
numRecords: Long,
metadata: Map[String, Any] = Map.empty
) {
def metadataDescription: Option[String]
}