Apache Spark is a unified analytics engine for large-scale data processing with high-level APIs in Scala, Java, Python, and R.
—
Apache Spark provides stream processing capabilities through two APIs: Structured Streaming (the modern API) built on DataFrames/Datasets, and the legacy DStreams API. Structured Streaming is the recommended approach for new applications.
Stream processing functionality is available through:
// Structured Streaming (recommended)
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.functions._
// Legacy DStreams API (maintenance mode)
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream._

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.functions._
// Build (or reuse) a local SparkSession for the example.
val spark = SparkSession.builder()
  .appName("Structured Streaming Example")
  .master("local[*]")
  .getOrCreate()

// Required for the implicit Encoder[String] used by .as[String] / flatMap below;
// without this import the example does not compile.
import spark.implicits._

// Read streaming data from a TCP socket source (intended for testing only)
val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

// Split each line into words and maintain a running count per word
val words = lines.as[String].flatMap(_.split(" "))
val wordCounts = words.groupBy("value").count()

// Emit the complete result table to the console every 10 seconds
val query = wordCounts.writeStream
  .outputMode("complete")
  .format("console")
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .start()
query.awaitTermination()

import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream._
// Streaming context with a 10-second batch interval
// (assumes `sc` is an existing SparkContext, e.g. provided by spark-shell — TODO confirm in context)
val ssc = new StreamingContext(sc, Seconds(10))
// Ingest lines of text from a TCP socket
val lines = ssc.socketTextStream("localhost", 9999)
// Tokenize each line, then count words per batch with the classic map/reduceByKey pattern
val words = lines.flatMap(line => line.split(" "))
val wordCounts = words.map(w => (w, 1)).reduceByKey(_ + _)
// Print a sample of each batch's counts to stdout
wordCounts.print()
// Launch the streaming computation
ssc.start()
ssc.awaitTermination()

The modern streaming API built on the DataFrame/Dataset API, providing end-to-end exactly-once guarantees.
Interface for loading streaming DataFrames.
class DataStreamReader {
// Configuration
def format(source: String): DataStreamReader
def schema(schema: StructType): DataStreamReader
def schema(schemaString: String): DataStreamReader
def option(key: String, value: String): DataStreamReader
def option(key: String, value: Boolean): DataStreamReader
def option(key: String, value: Long): DataStreamReader
def option(key: String, value: Double): DataStreamReader
def options(options: scala.collection.Map[String, String]): DataStreamReader
def options(options: java.util.Map[String, String]): DataStreamReader
// Data sources
def csv(path: String): DataFrame
def json(path: String): DataFrame
def parquet(path: String): DataFrame
def orc(path: String): DataFrame
def text(path: String): DataFrame
def textFile(path: String): Dataset[String]
def table(tableName: String): DataFrame
// Generic load
def load(): DataFrame
def load(path: String): DataFrame
}

Interface for writing streaming DataFrames.
class DataStreamWriter[T] {
// Configuration
def outputMode(outputMode: OutputMode): DataStreamWriter[T]
def outputMode(outputMode: String): DataStreamWriter[T]
def trigger(trigger: Trigger): DataStreamWriter[T]
def format(source: String): DataStreamWriter[T]
def option(key: String, value: String): DataStreamWriter[T]
def option(key: String, value: Boolean): DataStreamWriter[T]
def option(key: String, value: Long): DataStreamWriter[T]
def option(key: String, value: Double): DataStreamWriter[T]
def options(options: scala.collection.Map[String, String]): DataStreamWriter[T]
def options(options: java.util.Map[String, String]): DataStreamWriter[T]
def partitionBy(colNames: String*): DataStreamWriter[T]
def queryName(queryName: String): DataStreamWriter[T]
// Data sinks
def foreach(writer: ForeachWriter[T]): DataStreamWriter[T]
def foreachBatch(function: (Dataset[T], Long) => Unit): DataStreamWriter[T]
def console(): DataStreamWriter[T]
def console(numRows: Int): DataStreamWriter[T]
def console(numRows: Int, truncate: Boolean): DataStreamWriter[T]
// File sinks
def csv(path: String): DataStreamWriter[T]
def json(path: String): DataStreamWriter[T]
def parquet(path: String): DataStreamWriter[T]
def orc(path: String): DataStreamWriter[T]
def text(path: String): DataStreamWriter[T]
// Table sink (Spark 3.1+). Note: toTable starts the query and returns a StreamingQuery.
def toTable(tableName: String): StreamingQuery
// Generic start
def start(): StreamingQuery
def start(path: String): StreamingQuery
}

Handle to a running streaming query.
trait StreamingQuery {
// Query control (queries are started via DataStreamWriter.start();
// StreamingQuery itself has no start() method)
def stop(): Unit
def awaitTermination(): Unit
def awaitTermination(timeoutMs: Long): Boolean
def processAllAvailable(): Unit
// Query properties
def id: UUID
def runId: UUID
def name: String
def explain(): Unit
def explain(extended: Boolean): Unit
// Query status
def isActive: Boolean
def exception: Option[StreamingQueryException]
def status: StreamingQueryStatus
def recentProgress: Array[StreamingQueryProgress]
def lastProgress: StreamingQueryProgress
}

Manager for streaming queries.
class StreamingQueryManager {
// Query management
def active: Array[StreamingQuery]
def get(id: UUID): StreamingQuery
def get(id: String): StreamingQuery
def resetTerminated(): Unit
// Waiting
def awaitAnyTermination(): Unit
def awaitAnyTermination(timeoutMs: Long): Boolean
// Listeners
def addListener(listener: StreamingQueryListener): Unit
def removeListener(listener: StreamingQueryListener): Unit
}

object OutputMode {
val Append: OutputMode
val Complete: OutputMode
val Update: OutputMode
}

sealed trait Trigger
object Trigger {
def ProcessingTime(interval: String): Trigger
def ProcessingTime(interval: Duration): Trigger
def ProcessingTime(interval: Long, unit: TimeUnit): Trigger
def Once(): Trigger
def Continuous(interval: String): Trigger
def Continuous(interval: Duration): Trigger
def AvailableNow(): Trigger
}

Custom sink for streaming data.
abstract class ForeachWriter[T] extends Serializable {
def open(partitionId: Long, epochId: Long): Boolean
def process(value: T): Unit
def close(errorOrNull: Throwable): Unit
}

Usage example:
// Example ForeachWriter that prints every row it receives.
val customWriter = new ForeachWriter[Row] {
  // Called once per partition/epoch; returning true means this partition's data will be processed
  override def open(partitionId: Long, epochId: Long): Boolean = {
    // Initialize connection
    true
  }

  // Invoked once for each row of the partition
  override def process(value: Row): Unit = {
    println(s"Processing: ${value.toString}")
  }

  // Always invoked at the end; errorOrNull is non-null if processing failed
  override def close(errorOrNull: Throwable): Unit = {
    // Clean up resources
  }
}
val query = df.writeStream
.foreach(customWriter)
.start()

// Available through functions object
def window(timeColumn: Column, windowDuration: String): Column
def window(timeColumn: Column, windowDuration: String, slideDuration: String): Column
def window(timeColumn: Column, windowDuration: String, slideDuration: String, startTime: String): Column
def session_window(timeColumn: Column, gapDuration: String): Column
def session_window(timeColumn: Column, gapDuration: Column): Column

Usage example:
import org.apache.spark.sql.functions._
// Tumbling window
val windowedCounts = df
.withWatermark("timestamp", "10 minutes")
.groupBy(window(col("timestamp"), "10 minutes"))
.count()
// Sliding window
val slidingWindowCounts = df
.withWatermark("timestamp", "10 minutes")
.groupBy(window(col("timestamp"), "10 minutes", "5 minutes"))
.count()
// Session window
val sessionCounts = df
.withWatermark("timestamp", "10 minutes")
.groupBy(session_window(col("timestamp"), "5 minutes"))
.count()

// Available on Dataset
def withWatermark(eventTime: String, delayThreshold: String): Dataset[T]

Usage example:
val watermarkedDF = df
.withWatermark("eventTime", "2 minutes")
.groupBy(window($"eventTime", "10 minutes"))
.count()

The original streaming API based on RDDs (now in maintenance mode).
The main entry point for DStreams functionality.
class StreamingContext(sparkContext: SparkContext, batchDuration: Duration) {
def this(conf: SparkConf, batchDuration: Duration)
def this(path: String, hadoopConf: Configuration = new Configuration())
// Input sources
def socketTextStream(hostname: String, port: Int, storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2): ReceiverInputDStream[String]
def textFileStream(directory: String): DStream[String]
def fileStream[K, V, F <: NewInputFormat[K, V]](directory: String): InputDStream[(K, V)]
def queueStream[T: ClassTag](queue: Queue[RDD[T]], oneAtATime: Boolean = true): InputDStream[T]
def receiverStream[T: ClassTag](receiver: Receiver[T]): ReceiverInputDStream[T]
// Union
def union[T: ClassTag](streams: Seq[DStream[T]]): DStream[T]
// Control
def start(): Unit
def stop(stopSparkContext: Boolean = true, stopGracefully: Boolean = false): Unit
def awaitTermination(): Unit
def awaitTerminationOrTimeout(timeout: Long): Boolean
// State
def remember(duration: Duration): Unit
def checkpoint(directory: String): Unit
// Properties
def sparkContext: SparkContext
def graph: DStreamGraph
def getState(): StreamingContextState
}
object StreamingContext {
def getOrCreate(checkpointPath: String, creatingFunc: () => StreamingContext): StreamingContext
def getActive(): Option[StreamingContext]
def getActiveOrCreate(checkpointPath: String, creatingFunc: () => StreamingContext): StreamingContext
}

The fundamental abstraction in Spark Streaming representing a continuous stream of data.
abstract class DStream[T: ClassTag] extends Serializable {
// Transformations
def map[U: ClassTag](mapFunc: T => U): DStream[U]
def flatMap[U: ClassTag](flatMapFunc: T => TraversableOnce[U]): DStream[U]
def filter(filterFunc: T => Boolean): DStream[T]
def glom(): DStream[Array[T]]
def repartition(numPartitions: Int): DStream[T]
def union(that: DStream[T]): DStream[T]
def count(): DStream[Long]
def countByValue(): DStream[(T, Long)]
def reduce(reduceFunc: (T, T) => T): DStream[T]
def countByWindow(windowDuration: Duration, slideDuration: Duration): DStream[Long]
def reduceByWindow(reduceFunc: (T, T) => T, windowDuration: Duration, slideDuration: Duration): DStream[T]
def reduceByWindow(reduceFunc: (T, T) => T, invReduceFunc: (T, T) => T, windowDuration: Duration, slideDuration: Duration): DStream[T]
// Window operations
def window(windowDuration: Duration): DStream[T]
def window(windowDuration: Duration, slideDuration: Duration): DStream[T]
// Actions
def print(): Unit
def print(num: Int): Unit
def saveAsTextFiles(prefix: String, suffix: String = ""): Unit
def saveAsObjectFiles(prefix: String, suffix: String = ""): Unit
def foreachRDD(foreachFunc: RDD[T] => Unit): Unit
def foreachRDD(foreachFunc: (RDD[T], Time) => Unit): Unit
// State operations such as updateStateByKey are key-value specific and are
// provided by PairDStreamFunctions (see below), not by DStream[T] itself
// Persistence
def cache(): DStream[T]
def persist(): DStream[T]
def persist(level: StorageLevel): DStream[T]
// Output
def register(): DStream[T]
// Properties
def context: StreamingContext
def ssc: StreamingContext
def slideDuration: Duration
}

Additional operations for DStreams of key-value pairs.
class PairDStreamFunctions[K, V](self: DStream[(K, V)])(implicit kt: ClassTag[K], vt: ClassTag[V]) {
// Transformations
def groupByKey(): DStream[(K, Iterable[V])]
def groupByKey(numPartitions: Int): DStream[(K, Iterable[V])]
def groupByKey(partitioner: Partitioner): DStream[(K, Iterable[V])]
def reduceByKey(func: (V, V) => V): DStream[(K, V)]
def reduceByKey(func: (V, V) => V, numPartitions: Int): DStream[(K, V)]
def reduceByKey(func: (V, V) => V, partitioner: Partitioner): DStream[(K, V)]
def combineByKey[C: ClassTag](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiner: (C, C) => C): DStream[(K, C)]
def combineByKey[C: ClassTag](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiner: (C, C) => C, numPartitions: Int): DStream[(K, C)]
def combineByKey[C: ClassTag](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiner: (C, C) => C, partitioner: Partitioner): DStream[(K, C)]
// Window operations
def groupByKeyAndWindow(windowDuration: Duration): DStream[(K, Iterable[V])]
def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration): DStream[(K, Iterable[V])]
def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration, numPartitions: Int): DStream[(K, Iterable[V])]
def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration, partitioner: Partitioner): DStream[(K, Iterable[V])]
def reduceByKeyAndWindow(reduceFunc: (V, V) => V, windowDuration: Duration, slideDuration: Duration): DStream[(K, V)]
def reduceByKeyAndWindow(reduceFunc: (V, V) => V, invReduceFunc: (V, V) => V, windowDuration: Duration, slideDuration: Duration): DStream[(K, V)]
def reduceByKeyAndWindow(reduceFunc: (V, V) => V, invReduceFunc: (V, V) => V, windowDuration: Duration, slideDuration: Duration, numPartitions: Int, filterFunc: ((K, V)) => Boolean = null): DStream[(K, V)]
// Join operations
def join[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, W))]
def join[W: ClassTag](other: DStream[(K, W)], numPartitions: Int): DStream[(K, (V, W))]
def join[W: ClassTag](other: DStream[(K, W)], partitioner: Partitioner): DStream[(K, (V, W))]
def leftOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, Option[W]))]
def rightOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Option[V], W))]
def fullOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Option[V], Option[W]))]
def cogroup[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Iterable[V], Iterable[W]))]
// State operations
def updateStateByKey[S: ClassTag](updateFunc: (Seq[V], Option[S]) => Option[S]): DStream[(K, S)]
def updateStateByKey[S: ClassTag](updateFunc: (Seq[V], Option[S]) => Option[S], numPartitions: Int): DStream[(K, S)]
def updateStateByKey[S: ClassTag](updateFunc: (Seq[V], Option[S]) => Option[S], partitioner: Partitioner): DStream[(K, S)]
def mapWithState[StateType: ClassTag, MappedType: ClassTag](spec: StateSpec[K, V, StateType, MappedType]): MapWithStateDStream[K, V, StateType, MappedType]
// Output
def saveAsHadoopFiles[F <: OutputFormat[K, V]](prefix: String, suffix: String = ""): Unit
def saveAsNewAPIHadoopFiles[F <: NewOutputFormat[K, V]](prefix: String, suffix: String = ""): Unit
}

Various built-in input sources for DStreams.
// File-based sources
object StreamingContext {
def textFileStream(directory: String): DStream[String]
def binaryRecordsStream(directory: String, recordLength: Int): DStream[Array[Byte]]
}
// Network sources
object StreamingContext {
def socketTextStream(hostname: String, port: Int, storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2): ReceiverInputDStream[String]
def socketStream[T: ClassTag](hostname: String, port: Int, converter: (InputStream) => Iterator[T], storageLevel: StorageLevel): ReceiverInputDStream[T]
}
// Queue source (for testing)
object StreamingContext {
def queueStream[T: ClassTag](queue: Queue[RDD[T]], oneAtATime: Boolean = true, defaultRDD: RDD[T] = null): InputDStream[T]
}

abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable {
def onStart(): Unit
def onStop(): Unit
def store(dataItem: T): Unit
def store(dataBuffer: ArrayBuffer[T]): Unit
def store(dataIterator: Iterator[T]): Unit
def store(dataBuffer: ArrayBuffer[T], metadata: Any): Unit
def store(dataIterator: Iterator[T], metadata: Any): Unit
def reportError(message: String, throwable: Throwable): Unit
def restart(message: String): Unit
def restart(message: String, error: Throwable): Unit
def restart(message: String, error: Throwable, millisecond: Int): Unit
def stop(message: String): Unit
def stop(message: String, error: Throwable): Unit
def isStarted(): Boolean
def isStopped(): Boolean
def hasStopped(): Boolean
}

Usage examples:
// Structured Streaming: File source
val fileStream = spark.readStream
.format("json")
.option("path", "/path/to/json/files")
.load()
val query = fileStream.writeStream
.format("console")
.outputMode("append")
.start()
// Structured Streaming: Kafka source
val kafkaStream = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "topic1")
.load()
val parsedStream = kafkaStream
.select(from_json(col("value").cast("string"), schema).as("data"))
.select("data.*")
// DStreams: Custom receiver
class CustomReceiver extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {
def onStart() {
// Start receiving data
new Thread("Custom Receiver") {
override def run() { receive() }
}.start()
}
def onStop() {
// Stop receiving data
}
private def receive() {
while (!isStopped()) {
// Receive data and store it
val data = // ... get data from somewhere
store(data)
}
}
}
val customStream = ssc.receiverStream(new CustomReceiver())

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-spark--spark-parent-2-13