tessl install tessl/maven-org-apache-spark--spark-streaming_2-11@1.6.0

Apache Spark Streaming library for processing live data streams with fault tolerance and high throughput.
Apache Spark Streaming is a scalable, fault-tolerant stream processing library built on Apache Spark Core for processing live data streams. It provides high-level APIs in Scala, Java, and Python for building streaming applications that continuously ingest data from a variety of sources and process it with complex algorithms expressed through high-level operations such as map, reduce, join, and window. The library offers a micro-batch architecture with configurable batch intervals, automatic fault recovery through checkpointing and write-ahead logs, exactly-once processing semantics for supported sources, and seamless integration with other Spark components.
org.apache.spark:spark-streaming_2.11:1.6.3

Scala:
import org.apache.spark.streaming.{StreamingContext, Seconds, Duration}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.{SparkConf, SparkContext}

Java:
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.Duration;
import org.apache.spark.SparkConf;

Python:
from pyspark.streaming import StreamingContext
from pyspark import SparkContext, SparkConf

Scala:
import org.apache.spark.streaming.{StreamingContext, Seconds}
import org.apache.spark.SparkConf
// Create streaming context
val conf = new SparkConf().setAppName("StreamingApp").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(1))
// Create input stream from socket
val lines = ssc.socketTextStream("localhost", 9999)
// Transform and process
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
// Output results
wordCounts.print()
// Start processing
ssc.start()
ssc.awaitTermination()

Java:
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;
import java.util.Arrays;
// Create streaming context
SparkConf conf = new SparkConf().setAppName("StreamingApp").setMaster("local[2]");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
// Create input stream from socket
JavaDStream<String> lines = jssc.socketTextStream("localhost", 9999);
// Transform and process (in Spark 1.6 the Java flatMap function returns an Iterable, not an Iterator)
JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(x.split(" ")));
JavaPairDStream<String, Integer> pairs = words.mapToPair(s -> new Tuple2<>(s, 1));
JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey((i1, i2) -> i1 + i2);
// Output results
wordCounts.print();
// Start processing
jssc.start();
jssc.awaitTermination();

Apache Spark Streaming is built around several key components:
Core entry point for creating and managing streaming applications. Provides lifecycle management, checkpointing, and input stream creation.
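For example, a minimal sketch of lifecycle management with checkpoint-based recovery. The checkpoint directory, host, and port below are placeholders, and it uses StreamingContext.getOrCreate (available in Spark 1.6, not shown in the listing below) to rebuild the context from the checkpoint if one exists:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

def createContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("CheckpointedApp").setMaster("local[2]")
  val ssc = new StreamingContext(conf, Seconds(5))
  ssc.checkpoint("/tmp/streaming-checkpoint")          // placeholder checkpoint directory
  // DStreams must be defined inside the creating function so they are restored on recovery
  ssc.socketTextStream("localhost", 9999).count().print()
  ssc
}

val ssc = StreamingContext.getOrCreate("/tmp/streaming-checkpoint", createContext _)
ssc.start()
ssc.awaitTermination()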
class StreamingContext(sparkContext: SparkContext, batchDuration: Duration)
class StreamingContext(conf: SparkConf, batchDuration: Duration)
class StreamingContext(master: String, appName: String, batchDuration: Duration, ...)
def start(): Unit
def stop(stopSparkContext: Boolean = true, stopGracefully: Boolean = false): Unit
def awaitTermination(): Unit
def awaitTermination(timeout: Long): Unit
def remember(duration: Duration): Unit
def checkpoint(directory: String): Unit

Core abstraction for discretized streams with comprehensive transformation and action operations for stream processing.
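For example, a sketch of common transformations and output operations, assuming an existing StreamingContext `ssc` and a placeholder output path:

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.Time

val lines = ssc.socketTextStream("localhost", 9999)

// Drop empty lines and count how often each distinct line appears per batch
val nonEmpty = lines.filter(_.nonEmpty)
val lineCounts = nonEmpty.countByValue()

// Persist each batch under a timestamped directory and inspect it programmatically
lineCounts.saveAsTextFiles("/tmp/line-counts", "txt")   // placeholder output prefix
lineCounts.foreachRDD { (rdd: RDD[(String, Long)], time: Time) =>
  println(s"Batch $time contained ${rdd.count()} distinct lines")
}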
abstract class DStream[T](ssc: StreamingContext)
// Core transformations
def map[U](f: T => U): DStream[U]
def flatMap[U](f: T => TraversableOnce[U]): DStream[U]
def filter(f: T => Boolean): DStream[T]
def union(that: DStream[T]): DStream[T]
def repartition(numPartitions: Int): DStream[T]
// Reduction operations
def reduce(f: (T, T) => T): DStream[T]
def count(): DStream[Long]
def countByValue(): DStream[(T, Long)]
// Output operations
def print(): Unit
def print(num: Int): Unit
def saveAsTextFiles(prefix: String, suffix: String): Unit
def foreachRDD(f: RDD[T] => Unit): Unit
def foreachRDD(f: (RDD[T], Time) => Unit): Unit

Various connectors for ingesting streaming data from external systems including sockets, files, queues, and custom receivers.
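For example, a sketch of the file- and queue-based sources, assuming an existing StreamingContext `ssc`, its underlying SparkContext `sc`, and a placeholder input directory:

import scala.collection.mutable.Queue
import org.apache.spark.rdd.RDD

// Process new text files as they appear in a monitored directory
val fileLines = ssc.textFileStream("/tmp/incoming")     // placeholder directory

// Feed a stream from a queue of pre-built RDDs, which is convenient for testing
val rddQueue = new Queue[RDD[Int]]()
rddQueue += sc.parallelize(1 to 100)
val queued = ssc.queueStream(rddQueue)

fileLines.count().print()
queued.reduce(_ + _).print()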
// Socket streams
def socketTextStream(hostname: String, port: Int, storageLevel: StorageLevel = MEMORY_AND_DISK_SER_2): ReceiverInputDStream[String]
def socketStream[T](hostname: String, port: Int, converter: (InputStream) => Iterator[T], storageLevel: StorageLevel): ReceiverInputDStream[T]
// File streams
def textFileStream(directory: String): DStream[String]
def fileStream[K, V, F <: NewInputFormat[K, V]](directory: String): InputDStream[(K, V)]
// Queue and receiver streams
def queueStream[T](queue: Queue[RDD[T]], oneAtATime: Boolean = true): InputDStream[T]
def receiverStream[T](receiver: Receiver[T]): ReceiverInputDStream[T]

Advanced operations for DStreams of key-value pairs including joins, grouping, state management, and window-based aggregations.
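For example, a sketch of a running word count with updateStateByKey, assuming an existing StreamingContext `ssc`; stateful operations require a checkpoint directory (the path below is a placeholder):

ssc.checkpoint("/tmp/streaming-checkpoint")

val pairs = ssc.socketTextStream("localhost", 9999)
  .flatMap(_.split(" "))
  .map(word => (word, 1))

// Merge each batch's new counts into the running total kept per key
val runningCounts = pairs.updateStateByKey[Int] { (newValues: Seq[Int], state: Option[Int]) =>
  Some(newValues.sum + state.getOrElse(0))
}
runningCounts.print()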
// Key-based operations
def groupByKey(): DStream[(K, Iterable[V])]
def reduceByKey(func: (V, V) => V): DStream[(K, V)]
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiner: (C, C) => C): DStream[(K, C)]
// Join operations
def join[W](other: DStream[(K, W)]): DStream[(K, (V, W))]
def leftOuterJoin[W](other: DStream[(K, W)]): DStream[(K, (V, Option[W]))]
def rightOuterJoin[W](other: DStream[(K, W)]): DStream[(K, (Option[V], W))]
def fullOuterJoin[W](other: DStream[(K, W)]): DStream[(K, (Option[V], Option[W]))]
// State operations
def updateStateByKey[S](updateFunc: (Seq[V], Option[S]) => Option[S]): DStream[(K, S)]
def mapWithState[StateType, MappedType](spec: StateSpec[K, V, StateType, MappedType]): MapWithStateDStream[K, V, StateType, MappedType]

Time-based operations that aggregate data over sliding windows, enabling temporal analytics and batch processing over streaming data.
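For example, a sketch of a word count over a 30-second window sliding every 10 seconds, assuming an existing StreamingContext `ssc`; the incremental variant with an inverse reduce function requires checkpointing (placeholder path below):

import org.apache.spark.streaming.Seconds

ssc.checkpoint("/tmp/streaming-checkpoint")

val pairs = ssc.socketTextStream("localhost", 9999)
  .flatMap(_.split(" "))
  .map(word => (word, 1))

// Recompute the whole window on every slide
val windowedCounts = pairs.reduceByKeyAndWindow(
  (a: Int, b: Int) => a + b, Seconds(30), Seconds(10))

// Incrementally add entering batches and subtract leaving ones
val incrementalCounts = pairs.reduceByKeyAndWindow(
  (a: Int, b: Int) => a + b, (a: Int, b: Int) => a - b, Seconds(30), Seconds(10))

windowedCounts.print()
incrementalCounts.print()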
def window(windowDuration: Duration, slideDuration: Duration): DStream[T]
def reduceByWindow(reduceFunc: (T, T) => T, windowDuration: Duration, slideDuration: Duration): DStream[T]
def reduceByWindow(reduceFunc: (T, T) => T, invReduceFunc: (T, T) => T, windowDuration: Duration, slideDuration: Duration): DStream[T]
def countByWindow(windowDuration: Duration, slideDuration: Duration): DStream[Long]
// Window operations for paired DStreams
def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration): DStream[(K, Iterable[V])]
def reduceByKeyAndWindow(func: (V, V) => V, windowDuration: Duration, slideDuration: Duration): DStream[(K, V)]
def countByKeyAndWindow(windowDuration: Duration, slideDuration: Duration): DStream[(K, Long)]

Complete Java-friendly API with type-safe wrappers for all Scala functionality, providing seamless integration for Java developers.
class JavaStreamingContext(SparkConf conf, Duration batchDuration)
class JavaStreamingContext(JavaSparkContext sparkContext, Duration batchDuration)
// Input methods
JavaDStream<String> socketTextStream(String hostname, int port)
JavaDStream<String> textFileStream(String directory)
<T> JavaInputDStream<T> receiverStream(Receiver<T> receiver)
// DStream classes
class JavaDStream<T>
class JavaPairDStream<K, V>
class JavaInputDStream<T>
class JavaReceiverInputDStream<T>

Extensible framework for building custom data ingestion components with lifecycle management, error handling, and fault tolerance.
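For example, a sketch of a custom receiver that reads lines from a TCP socket, following the standard receiver pattern; the host, port, and storage level are placeholders, and `ssc` is assumed to exist:

import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import java.nio.charset.StandardCharsets
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class SocketLineReceiver(host: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER_2) {

  def onStart(): Unit = {
    // Receive on a dedicated thread so onStart() returns immediately
    new Thread("Socket Line Receiver") {
      override def run(): Unit = receive()
    }.start()
  }

  def onStop(): Unit = {
    // Nothing to clean up: the receiving thread exits once isStopped() becomes true
  }

  private def receive(): Unit = {
    try {
      val socket = new Socket(host, port)
      val reader = new BufferedReader(
        new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
      var line = reader.readLine()
      while (!isStopped && line != null) {
        store(line)                        // hand each record to Spark for storage and replication
        line = reader.readLine()
      }
      reader.close()
      socket.close()
      restart("Trying to connect again")   // ask the framework to schedule a reconnect
    } catch {
      case e: java.net.ConnectException => restart(s"Could not connect to $host:$port", e)
      case t: Throwable                 => restart("Error receiving data", t)
    }
  }
}

// Plug the receiver into a streaming context
val customLines = ssc.receiverStream(new SocketLineReceiver("localhost", 9999))
customLines.print()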
abstract class Receiver[T](storageLevel: StorageLevel) extends Serializable
def onStart(): Unit
def onStop(): Unit
def store(data: T): Unit
def store(data: ArrayBuffer[T]): Unit
def restart(message: String): Unit
def restart(message: String, error: Throwable): Unit
def stop(message: String): Unit
def stop(message: String, error: Throwable): Unit

Essential utility classes for time handling, state management, and duration operations in streaming applications.
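For example, a brief sketch of the duration helpers and their arithmetic:

import org.apache.spark.streaming.{Milliseconds, Minutes, Seconds}

val batchInterval = Seconds(10)
val window = Minutes(1)

window.isMultipleOf(batchInterval)    // true: 60000 ms is a multiple of 10000 ms
(window - batchInterval).prettyPrint  // human-readable rendering of 50000 ms
batchInterval.max(Milliseconds(500))  // Duration of 10000 ms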
case class Duration(millis: Long)
case class Time(millis: Long)
abstract class State[S]
class StateSpec[K, V, S, T]
// Duration helpers
object Milliseconds { def apply(milliseconds: Long): Duration }
object Seconds { def apply(seconds: Long): Duration }
object Minutes { def apply(minutes: Long): Duration }

Event-driven monitoring system for tracking streaming application health, performance metrics, and operational status.
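For example, a sketch of a listener that logs per-batch record counts and delays, assuming an existing StreamingContext `ssc`; the listener and event classes live in org.apache.spark.streaming.scheduler:

import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

class DelayLoggingListener extends StreamingListener {
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    val info = batchCompleted.batchInfo
    val records = info.streamIdToInputInfo.values.map(_.numRecords).sum
    println(s"Batch ${info.batchTime}: $records records, " +
      s"scheduling delay ${info.schedulingDelay.getOrElse(-1L)} ms, " +
      s"processing delay ${info.processingDelay.getOrElse(-1L)} ms")
  }
}

ssc.addStreamingListener(new DelayLoggingListener)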
trait StreamingListener {
def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted): Unit
def onReceiverError(receiverError: StreamingListenerReceiverError): Unit
def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped): Unit
def onBatchStarted(batchStarted: StreamingListenerBatchStarted): Unit
def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit
def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted): Unit
}

// Core streaming types
class StreamingContext
abstract class DStream[T]
abstract class InputDStream[T] extends DStream[T]
class ReceiverInputDStream[T] extends InputDStream[T]
class QueueInputDStream[T] extends InputDStream[T]
class FileInputDStream[K, V, F <: NewInputFormat[K, V]] extends InputDStream[(K, V)]
class SocketInputDStream[T] extends ReceiverInputDStream[T]
class RawInputDStream[T] extends ReceiverInputDStream[T]
class PluggableInputDStream[T] extends InputDStream[T]
// Specialized DStream types
class MappedDStream[T, U] extends DStream[U]
class FlatMappedDStream[T, U] extends DStream[U]
class FilteredDStream[T] extends DStream[T]
class GlommedDStream[T] extends DStream[Array[T]]
class MapPartitionedDStream[T, U] extends DStream[U]
class UnionDStream[T] extends DStream[T]
class TransformedDStream[T, U] extends DStream[U]
class WindowedDStream[T] extends DStream[T]
class ShuffledDStream[K, V, C] extends DStream[(K, C)]
class StateDStream[K, V, S] extends DStream[(K, S)]
class MapWithStateDStream[K, V, S, E] extends DStream[E]
class ForEachDStream[T] extends DStream[T]
// Time and duration
case class Duration(private val millis: Long) {
def +(that: Duration): Duration
def -(that: Duration): Duration
def *(times: Int): Duration
def /(that: Duration): Double
def <(that: Duration): Boolean
def >(that: Duration): Boolean
def <=(that: Duration): Boolean
def >=(that: Duration): Boolean
def milliseconds: Long
def prettyPrint: String
def isMultipleOf(that: Duration): Boolean
def min(that: Duration): Duration
def max(that: Duration): Duration
}
case class Time(private val millis: Long) {
def +(duration: Duration): Time
def -(duration: Duration): Time
def -(that: Time): Duration
def <(that: Time): Boolean
def >(that: Time): Boolean
def <=(that: Time): Boolean
def >=(that: Time): Boolean
def milliseconds: Long
def floor(duration: Duration): Time
def until(endTime: Time, duration: Duration): Seq[Time]
}
case class Interval(beginTime: Time, endTime: Time) {
def duration: Duration
def +(time: Duration): Interval
def move(duration: Duration): Interval
def size: Duration
def intersect(other: Interval): Interval
def union(other: Interval): Interval
def covers(time: Time): Boolean
def covers(other: Interval): Boolean
}
// Duration factory objects
object Milliseconds {
def apply(milliseconds: Long): Duration
}
object Seconds {
def apply(seconds: Long): Duration
}
object Minutes {
def apply(minutes: Long): Duration
}
// State management
abstract class State[S] {
def exists(): Boolean
def get(): S
def update(newState: S): Unit
def remove(): Unit
def isTimingOut(): Boolean
}
class StateSpec[K, V, S, T] {
def function(mappingFunction: (K, Option[V], State[S]) => Option[T]): StateSpec[K, V, S, T]
def initialState(rdd: RDD[(K, S)]): StateSpec[K, V, S, T]
def numPartitions(numPartitions: Int): StateSpec[K, V, S, T]
def timeout(duration: Duration): StateSpec[K, V, S, T]
def partitioner(partitioner: Partitioner): StateSpec[K, V, S, T]
}
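For example, a sketch of mapWithState with an initial state and a timeout, assuming an existing StreamingContext `ssc` and SparkContext `sc`; note that the three-argument form of StateSpec.function used here returns the mapped value directly (the Option-returning form additionally takes the batch Time), and mapWithState requires a checkpoint directory (placeholder path below):

import org.apache.spark.streaming.{Minutes, State, StateSpec}

ssc.checkpoint("/tmp/streaming-checkpoint")

val pairs = ssc.socketTextStream("localhost", 9999)
  .flatMap(_.split(" "))
  .map(word => (word, 1))

// Emit (word, runningCount) for each record while keeping the running count in state
val trackCount = (word: String, one: Option[Int], state: State[Int]) => {
  val sum = state.getOption.getOrElse(0) + one.getOrElse(0)
  state.update(sum)
  (word, sum)
}

val spec = StateSpec.function(trackCount)
  .initialState(sc.parallelize(Seq(("hello", 1))))
  .timeout(Minutes(30))                              // evict keys idle for 30 minutes

val statefulCounts = pairs.mapWithState(spec)
statefulCounts.print()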
// Receiver framework
abstract class Receiver[T](storageLevel: StorageLevel) extends Serializable {
def onStart(): Unit
def onStop(): Unit
def store(data: T): Unit
def store(data: ArrayBuffer[T]): Unit
def store(data: Iterator[T]): Unit
def restart(message: String): Unit
def restart(message: String, error: Throwable): Unit
def stop(message: String): Unit
def stop(message: String, error: Throwable): Unit
def isStopped(): Boolean
def isStarted(): Boolean
}
trait ActorReceiver {
def store(data: Any): Unit
}
case class ReceiverInfo(
streamId: Int,
name: String,
active: Boolean,
location: String,
executorId: String,
lastErrorMessage: String = "",
lastError: String = "",
lastErrorTime: Long = -1L
)
// Storage and checkpointing
class Checkpoint
sealed abstract class StorageLevel
object StorageLevel {
val NONE: StorageLevel
val DISK_ONLY: StorageLevel
val DISK_ONLY_2: StorageLevel
val MEMORY_ONLY: StorageLevel
val MEMORY_ONLY_2: StorageLevel
val MEMORY_ONLY_SER: StorageLevel
val MEMORY_ONLY_SER_2: StorageLevel
val MEMORY_AND_DISK: StorageLevel
val MEMORY_AND_DISK_2: StorageLevel
val MEMORY_AND_DISK_SER: StorageLevel
val MEMORY_AND_DISK_SER_2: StorageLevel
val OFF_HEAP: StorageLevel
}
// Batch and job information
case class BatchInfo(
batchTime: Time,
streamIdToInputInfo: Map[Int, StreamInputInfo],
submissionTime: Long,
processingStartTime: Option[Long] = None,
processingEndTime: Option[Long] = None
) {
def processingDelay: Option[Long]
def schedulingDelay: Option[Long]
def totalDelay: Option[Long]
}
case class StreamInputInfo(
inputStreamId: Int,
numRecords: Long,
metadata: Map[String, Any] = Map.empty
)
case class OutputOperationInfo(
batchTime: Time,
id: Int,
name: String,
description: String,
startTime: Option[Long],
endTime: Option[Long],
failureReason: Option[String]
) {
def duration: Option[Long] = for (s <- startTime; e <- endTime) yield e - s
}
// Listener events
trait StreamingListener {
def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted): Unit = {}
def onReceiverError(receiverError: StreamingListenerReceiverError): Unit = {}
def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped): Unit = {}
def onBatchStarted(batchStarted: StreamingListenerBatchStarted): Unit = {}
def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {}
def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted): Unit = {}
def onOutputOperationStarted(outputOperationStarted: StreamingListenerOutputOperationStarted): Unit = {}
def onOutputOperationCompleted(outputOperationCompleted: StreamingListenerOutputOperationCompleted): Unit = {}
}
sealed trait StreamingListenerEvent
case class StreamingListenerReceiverStarted(receiverInfo: ReceiverInfo) extends StreamingListenerEvent
case class StreamingListenerReceiverError(receiverInfo: ReceiverInfo) extends StreamingListenerEvent
case class StreamingListenerReceiverStopped(receiverInfo: ReceiverInfo) extends StreamingListenerEvent
case class StreamingListenerBatchStarted(batchInfo: BatchInfo) extends StreamingListenerEvent
case class StreamingListenerBatchCompleted(batchInfo: BatchInfo) extends StreamingListenerEvent
case class StreamingListenerBatchSubmitted(batchInfo: BatchInfo) extends StreamingListenerEvent
case class StreamingListenerOutputOperationStarted(outputOperationInfo: OutputOperationInfo) extends StreamingListenerEvent
case class StreamingListenerOutputOperationCompleted(outputOperationInfo: OutputOperationInfo) extends StreamingListenerEvent
// Context state
sealed abstract class StreamingContextState
object StreamingContextState {
case object INITIALIZED extends StreamingContextState
case object ACTIVE extends StreamingContextState
case object STOPPED extends StreamingContextState
}
// Java API types
import org.apache.spark.streaming.api.java._
class JavaStreamingContext
class JavaDStream[T]
class JavaPairDStream[K, V]
class JavaInputDStream[T]
class JavaPairInputDStream[K, V]
class JavaReceiverInputDStream[T]
class JavaPairReceiverInputDStream[K, V]
class JavaMapWithStateDStream[K, V, S, E]
abstract class JavaStreamingListener
class JavaStreamingListenerWrapper