Scalable, high-throughput, fault-tolerant stream processing library for real-time data processing on Apache Spark
npx @tessl/cli install tessl/maven-org-apache-spark--spark-streaming_2-11@2.4.0
Apache Spark Streaming is a scalable, high-throughput, fault-tolerant stream processing library built on Apache Spark Core that enables processing of live data streams from sources like Kafka, Flume, Kinesis, or TCP sockets. It provides high-level APIs in Scala, Java, and Python for building streaming applications using functional transformations like map, reduce, join, and window operations on DStreams (discretized streams).
Add the dependency to your Maven pom.xml or SBT build.sbt.
Maven:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.4.8</version>
</dependency>
SBT:
"org.apache.spark" %% "spark-streaming" % "2.4.8"Scala:
import org.apache.spark.streaming.{StreamingContext, Duration, Seconds, Minutes}
import org.apache.spark.streaming.dstream.{DStream, PairDStreamFunctions}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel
Java:
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.Durations;
import org.apache.spark.SparkConf;
import org.apache.spark.storage.StorageLevel;
Scala Example:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{StreamingContext, Seconds}
// Create streaming context
val conf = new SparkConf().setAppName("StreamingExample")
val ssc = new StreamingContext(conf, Seconds(1))
// Create input stream from TCP socket
val lines = ssc.socketTextStream("localhost", 9999)
// Transform and process data
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
// Output results
wordCounts.print()
// Start streaming
ssc.start()
ssc.awaitTermination()
Java Example:
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.Durations;
import java.util.Arrays;
import scala.Tuple2;
// Create streaming context
SparkConf conf = new SparkConf().setAppName("StreamingExample");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
// Create input stream from TCP socket
JavaDStream<String> lines = jssc.socketTextStream("localhost", 9999);
// Transform and process data
JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(x.split(" ")).iterator());
JavaPairDStream<String, Integer> wordCounts = words.mapToPair(s -> new Tuple2<>(s, 1))
.reduceByKey((i1, i2) -> i1 + i2);
// Output results
wordCounts.print();
// Start streaming
jssc.start();
jssc.awaitTermination();
Apache Spark Streaming is built around several key components:
Entry point for creating and managing Spark Streaming applications. Handles initialization, configuration, and lifecycle management.
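The following lifecycle sketch shows typical use of these methods; the application name, checkpoint directory, batch interval, and timeout are illustrative values, and the stream definitions themselves are elided:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}

val conf = new SparkConf().setAppName("LifecycleSketch")
// Batch interval of 5 seconds: each transformation runs once per 5-second batch
val ssc = new StreamingContext(conf, Seconds(5))

// Enable checkpointing (required for stateful and inverse-window operations)
ssc.checkpoint("/tmp/streaming-checkpoints")

// Retain each batch's RDDs for at least 2 minutes beyond what the DAG requires
ssc.remember(Minutes(2))

// ... define input streams, transformations, and output operations here ...

ssc.start()
// Block for up to one hour; if the timeout elapses first, shut down,
// also stopping the underlying SparkContext
if (!ssc.awaitTerminationOrTimeout(60 * 60 * 1000L)) {
  ssc.stop(stopSparkContext = true)
}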
class StreamingContext(sparkContext: SparkContext, batchDuration: Duration)
class StreamingContext(conf: SparkConf, batchDuration: Duration)
// Lifecycle methods
def start(): Unit
def stop(stopSparkContext: Boolean = true): Unit
def awaitTermination(): Unit
def awaitTerminationOrTimeout(timeout: Long): Boolean
// Configuration
def checkpoint(directory: String): Unit
def remember(duration: Duration): Unit
Core functionality for creating, transforming, and processing continuous data streams with functional programming paradigms.
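As a rough sketch of how these transformations compose, assuming `lines` is an existing DStream[String] obtained from one of the input sources described below:
// Assumes an existing DStream[String] named `lines`
val words = lines
  .filter(_.nonEmpty)          // drop empty lines
  .flatMap(_.split("\\s+"))    // split each line into words
  .map(_.toLowerCase)

// transform() exposes each batch's underlying RDD, here used to de-duplicate per batch
val distinctPerBatch = words.transform(rdd => rdd.distinct())

// Output operation: print a handful of elements from every batch
distinctPerBatch.print()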
abstract class DStream[T] {
// Transformations
def map[U](mapFunc: T => U): DStream[U]
def flatMap[U](flatMapFunc: T => TraversableOnce[U]): DStream[U]
def filter(filterFunc: T => Boolean): DStream[T]
def transform[U](transformFunc: RDD[T] => RDD[U]): DStream[U]
def union(that: DStream[T]): DStream[T]
// Window operations
def window(windowDuration: Duration, slideDuration: Duration): DStream[T]
def reduceByWindow(reduceFunc: (T, T) => T, windowDuration: Duration, slideDuration: Duration): DStream[T]
// Output operations
def foreachRDD(foreachFunc: RDD[T] => Unit): Unit
def print(num: Int = 10): Unit
}
Various mechanisms for ingesting data from external sources including sockets, files, message queues, and custom receivers.
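A short sketch of two common ingestion paths, assuming an existing StreamingContext `ssc`; the HDFS directory is an illustrative placeholder:
import scala.collection.mutable
import org.apache.spark.rdd.RDD

// File stream: picks up new text files atomically moved into the monitored directory
val fileLines = ssc.textFileStream("hdfs:///data/incoming")
fileLines.count().print()

// Queue stream: useful for testing; with oneAtATime = true, one queued RDD is consumed per batch
val rddQueue = mutable.Queue[RDD[Int]]()
rddQueue += ssc.sparkContext.parallelize(1 to 100)
val testStream = ssc.queueStream(rddQueue)
testStream.reduce(_ + _).print()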
// Socket streams
def socketTextStream(hostname: String, port: Int, storageLevel: StorageLevel = MEMORY_AND_DISK_SER_2): ReceiverInputDStream[String]
def rawSocketStream[T](hostname: String, port: Int, storageLevel: StorageLevel = MEMORY_AND_DISK_SER_2): ReceiverInputDStream[T]
// File streams
def textFileStream(directory: String): DStream[String]
def fileStream[K, V, F <: NewInputFormat[K, V]](directory: String): InputDStream[(K, V)]
// Queue and custom streams
def queueStream[T](queue: Queue[RDD[T]], oneAtATime: Boolean = true): InputDStream[T]
def receiverStream[T](receiver: Receiver[T]): ReceiverInputDStream[T]
Specialized operations for key-value pair streams including aggregations, joins, and state management.
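These methods become available on any DStream of key-value pairs through an implicit conversion. A sketch, assuming `events` is an existing DStream[String] of comma-separated `user,amount` records and `regions` is an existing DStream[(String, String)] of `(user, region)` pairs:
// Parse "user,amount" lines into (user, amount) pairs; malformed lines are dropped
val pairs = events.flatMap { line =>
  line.split(",") match {
    case Array(user, amount) => Seq((user, amount.toDouble))
    case _                   => Seq.empty
  }
}

// Per-batch total per user
val totals = pairs.reduceByKey(_ + _)

// Enrich each total with the user's region via a per-batch join
val enriched = totals.join(regions)
enriched.print()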
class PairDStreamFunctions[K, V] {
// Aggregations
def groupByKey(): DStream[(K, Iterable[V])]
def reduceByKey(reduceFunc: (V, V) => V): DStream[(K, V)]
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiner: (C, C) => C): DStream[(K, C)]
// Window aggregations
def reduceByKeyAndWindow(reduceFunc: (V, V) => V, windowDuration: Duration, slideDuration: Duration): DStream[(K, V)]
// State management
def updateStateByKey[S](updateFunc: (Seq[V], Option[S]) => Option[S]): DStream[(K, S)]
def mapWithState[StateType, MappedType](spec: StateSpec[K, V, StateType, MappedType]): MapWithStateDStream[K, V, StateType, MappedType]
// Joins
def join[W](other: DStream[(K, W)]): DStream[(K, (V, W))]
def leftOuterJoin[W](other: DStream[(K, W)]): DStream[(K, (V, Option[W]))]
}
Time-based windowing operations for aggregating data across multiple time intervals.
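A windowed word-count sketch; it assumes `words` is an existing DStream[String], that the window and slide durations are multiples of the batch interval, and (for the incremental variant) that checkpointing has been enabled on the context:
import org.apache.spark.streaming.Seconds

val pairs = words.map(word => (word, 1))

// Count words over the last 30 seconds, recomputed every 10 seconds
val windowedCounts = pairs.reduceByKeyAndWindow(
  (a: Int, b: Int) => a + b, Seconds(30), Seconds(10))

// Incremental variant: add counts entering the window, subtract counts leaving it
// (more efficient for large windows; requires checkpointing)
val incrementalCounts = pairs.reduceByKeyAndWindow(
  (a: Int, b: Int) => a + b,
  (a: Int, b: Int) => a - b,
  Seconds(30), Seconds(10))

windowedCounts.print()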
// Basic windowing
def window(windowDuration: Duration, slideDuration: Duration): DStream[T]
def countByWindow(windowDuration: Duration, slideDuration: Duration): DStream[Long]
// Windowed reductions
def reduceByWindow(reduceFunc: (T, T) => T, windowDuration: Duration, slideDuration: Duration): DStream[T]
def reduceByWindow(reduceFunc: (T, T) => T, invReduceFunc: (T, T) => T, windowDuration: Duration, slideDuration: Duration): DStream[T]
Stateful processing capabilities for maintaining state across streaming batches with fault tolerance.
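A running-total sketch built on mapWithState; it assumes `pairs` is an existing DStream[(String, Int)] and that checkpointing has been enabled on the context, which mapWithState requires. The 30-minute timeout is illustrative:
import org.apache.spark.streaming.{Minutes, State, StateSpec}

// Map each incoming (key, count) record to (key, runningTotal)
val trackTotal = (key: String, value: Option[Int], state: State[Int]) => {
  if (state.isTimingOut()) {
    // Key has been idle past the timeout; its state is being dropped, so don't update it
    (key, if (state.exists()) state.get() else 0)
  } else {
    val newTotal = (if (state.exists()) state.get() else 0) + value.getOrElse(0)
    state.update(newTotal)
    (key, newTotal)
  }
}

// Drop state for keys that receive no data for 30 minutes
val spec = StateSpec.function(trackTotal).timeout(Minutes(30))

val runningTotals = pairs.mapWithState(spec)
runningTotals.print()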
// StateSpec for mapWithState
object StateSpec {
def function[KeyType, ValueType, StateType, MappedType](
mappingFunction: (KeyType, Option[ValueType], State[StateType]) => MappedType
): StateSpec[KeyType, ValueType, StateType, MappedType]
}
abstract class State[S] {
def exists(): Boolean
def get(): S
def update(newState: S): Unit
def remove(): Unit
def isTimingOut(): Boolean
}
Java-friendly wrappers providing the same functionality with Java-compatible interfaces and types.
public class JavaStreamingContext {
public JavaStreamingContext(SparkConf conf, Duration batchDuration);
public JavaReceiverInputDStream<String> socketTextStream(String hostname, int port);
public void start();
public void awaitTermination();
}
public class JavaDStream<T> {
public <R> JavaDStream<R> map(Function<T, R> f);
public JavaDStream<T> filter(Function<T, Boolean> f);
public <R> JavaDStream<R> flatMap(FlatMapFunction<T, R> f);
public void foreachRDD(VoidFunction<JavaRDD<T>> foreachFunc);
}
Listener system for monitoring streaming application performance, batch processing, and receiver status.
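A monitoring sketch that logs batch statistics and receiver errors, registered on an existing StreamingContext `ssc` via addStreamingListener:
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted, StreamingListenerReceiverError}

// Only the callbacks of interest need to be overridden; the others default to no-ops
val monitor = new StreamingListener {
  override def onBatchCompleted(batch: StreamingListenerBatchCompleted): Unit = {
    val info = batch.batchInfo
    println(s"Batch ${info.batchTime}: ${info.numRecords} records, " +
      s"total delay ${info.totalDelay.getOrElse(-1L)} ms")
  }

  override def onReceiverError(error: StreamingListenerReceiverError): Unit = {
    println(s"Receiver error: ${error.receiverInfo}")
  }
}

ssc.addStreamingListener(monitor)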
trait StreamingListener {
def onStreamingStarted(streamingStarted: StreamingListenerStreamingStarted): Unit
def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted): Unit
def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit
def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted): Unit
def onReceiverError(receiverError: StreamingListenerReceiverError): Unit
}
// Time-related types
case class Duration(milliseconds: Long) {
def +(that: Duration): Duration
def -(that: Duration): Duration
def *(times: Int): Duration
def /(that: Duration): Double
}
case class Time(milliseconds: Long) {
def +(that: Duration): Time
def -(that: Duration): Time
def -(that: Time): Duration
}
case class Interval(beginTime: Time, endTime: Time) {
def duration(): Duration
}
// Helper objects
object Seconds {
def apply(seconds: Long): Duration
}
object Minutes {
def apply(minutes: Long): Duration
}
object Milliseconds {
def apply(milliseconds: Long): Duration
}
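Duration values compose arithmetically, which is convenient when deriving window and slide intervals from a base batch interval; a small sketch:
import org.apache.spark.streaming.{Milliseconds, Minutes, Seconds}

val batchInterval = Seconds(5)        // Duration of 5000 ms
val window = batchInterval * 6        // 30 seconds
val slide = Seconds(10)

// Window and slide durations should be whole multiples of the batch interval
val ratio = window / batchInterval    // 6.0

val total = Minutes(1) + Seconds(30) - Milliseconds(500)
println(total)                        // prints the duration in milliseconds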