CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/maven-org-apache-spark--spark-parent-2-13

Apache Spark is a unified analytics engine for large-scale data processing with high-level APIs in Scala, Java, Python, and R.

Pending
Overview
Eval results
Files

docs/stream-processing.md

Stream Processing

Apache Spark provides stream processing capabilities through two APIs: Structured Streaming (the modern API) built on DataFrames/Datasets, and the legacy DStreams API. Structured Streaming is the recommended approach for new applications.

Package Information

Stream processing functionality is available through:

// Structured Streaming (recommended)
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.functions._

// Legacy DStreams API (maintenance mode)
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream._

Basic Usage

Structured Streaming

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.functions._

// Create (or reuse) a local SparkSession for the example
val spark = SparkSession.builder()
  .appName("Structured Streaming Example")
  .master("local[*]")
  .getOrCreate()

// Brings the implicit encoders into scope; required by `lines.as[String]`
// and by the typed `flatMap` below. Must come after `spark` is defined.
import spark.implicits._

// Read streaming data
val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

// Process streaming data: split each line into words, then count per word
val words = lines.as[String].flatMap(_.split(" "))
val wordCounts = words.groupBy("value").count()

// Write streaming results to the console every 10 seconds
val query = wordCounts.writeStream
  .outputMode("complete")
  .format("console")
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .start()

// Block the calling thread until the query terminates
query.awaitTermination()

DStreams (Legacy)

import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream._

// Streaming context with a 10-second batch duration, built on an existing SparkContext `sc`
val ssc = new StreamingContext(sc, Seconds(10))

// Text lines read from a socket on localhost:9999
val lines = ssc.socketTextStream("localhost", 9999)

// Split lines into words, pair each word with 1, then sum the ones per word
val words = lines.flatMap(line => line.split(" "))
val wordCounts = words
  .map(word => word -> 1)
  .reduceByKey((left, right) => left + right)

// Output results
wordCounts.print()

// Start the streaming computation and wait for it to terminate
ssc.start()
ssc.awaitTermination()

Capabilities

Structured Streaming

The modern streaming API built on the DataFrame/Dataset API. It provides end-to-end exactly-once guarantees when used with replayable sources and idempotent sinks.

DataStreamReader

Interface for loading streaming DataFrames.

/**
 * Interface for loading streaming DataFrames.
 *
 * Every configuration method returns a DataStreamReader, so calls can be
 * chained before one of the loading methods produces the result.
 */
class DataStreamReader {
  // Configuration: source format, schema (StructType or string form), and options.
  // `option` is overloaded for String, Boolean, Long and Double values.
  def format(source: String): DataStreamReader
  def schema(schema: StructType): DataStreamReader
  def schema(schemaString: String): DataStreamReader
  def option(key: String, value: String): DataStreamReader
  def option(key: String, value: Boolean): DataStreamReader
  def option(key: String, value: Long): DataStreamReader
  def option(key: String, value: Double): DataStreamReader
  def options(options: scala.collection.Map[String, String]): DataStreamReader
  def options(options: java.util.Map[String, String]): DataStreamReader
  
  // Data sources: path- or table-based shortcuts. All return a DataFrame
  // except textFile, which returns a Dataset[String].
  def csv(path: String): DataFrame
  def json(path: String): DataFrame
  def parquet(path: String): DataFrame
  def orc(path: String): DataFrame
  def text(path: String): DataFrame
  def textFile(path: String): Dataset[String]
  def table(tableName: String): DataFrame
  
  // Generic load: uses the source chosen via format(...), with or without a path
  def load(): DataFrame
  def load(path: String): DataFrame
}

DataStreamWriter

Interface for writing streaming DataFrames.

/**
 * Interface for writing streaming DataFrames.
 *
 * Every configuration method returns the writer, so calls can be chained.
 * The query only begins running when `start(...)` or `toTable(...)` is
 * called; both return a StreamingQuery handle.
 *
 * @tparam T element type of the Dataset being written
 */
class DataStreamWriter[T] {
  // Configuration
  def outputMode(outputMode: OutputMode): DataStreamWriter[T]
  def outputMode(outputMode: String): DataStreamWriter[T]
  def trigger(trigger: Trigger): DataStreamWriter[T]
  def format(source: String): DataStreamWriter[T]
  def option(key: String, value: String): DataStreamWriter[T]
  def option(key: String, value: Boolean): DataStreamWriter[T]
  def option(key: String, value: Long): DataStreamWriter[T]
  def option(key: String, value: Double): DataStreamWriter[T]
  def options(options: scala.collection.Map[String, String]): DataStreamWriter[T]
  def options(options: java.util.Map[String, String]): DataStreamWriter[T]
  def partitionBy(colNames: String*): DataStreamWriter[T]
  def queryName(queryName: String): DataStreamWriter[T]

  // Custom sinks
  def foreach(writer: ForeachWriter[T]): DataStreamWriter[T]
  def foreachBatch(function: (Dataset[T], Long) => Unit): DataStreamWriter[T]

  // Built-in sinks have no dedicated methods on DataStreamWriter. Select them
  // with format(...), e.g. format("console"), format("csv"), format("json"),
  // format("parquet"), format("orc") or format("text"). Pass the output
  // location to start(path) or option("path", ...); console settings such as
  // "numRows" and "truncate" are passed through option(...).

  // Start the query (generic sink selected via format)
  def start(): StreamingQuery
  def start(path: String): StreamingQuery

  // Start the query, writing its output to the named table
  def toTable(tableName: String): StreamingQuery
}

StreamingQuery

Handle to a running streaming query.

/**
 * Handle to a running streaming query.
 *
 * A query is started through DataStreamWriter.start(...), which returns this
 * handle; the handle itself has no start() method.
 */
trait StreamingQuery {
  // Query control
  def stop(): Unit
  def awaitTermination(): Unit
  def awaitTermination(timeoutMs: Long): Boolean
  def processAllAvailable(): Unit

  // Query properties
  def id: UUID
  def runId: UUID
  def name: String
  def explain(): Unit
  def explain(extended: Boolean): Unit

  // Query status
  def isActive: Boolean
  def exception: Option[StreamingQueryException]
  def status: StreamingQueryStatus
  def recentProgress: Array[StreamingQueryProgress]
  def lastProgress: StreamingQueryProgress
}

StreamingQueryManager

Manager for streaming queries.

/**
 * Manager for streaming queries.
 *
 * Exposes the active queries, lookup by id (UUID or String), waiting on
 * termination, and listener registration.
 */
class StreamingQueryManager {
  // Query management
  def active: Array[StreamingQuery]
  def get(id: UUID): StreamingQuery
  def get(id: String): StreamingQuery
  def resetTerminated(): Unit
  
  // Waiting: the overload taking a timeout in milliseconds returns a Boolean
  // (presumably whether a query terminated within the timeout; confirm against the Spark API docs)
  def awaitAnyTermination(): Unit
  def awaitAnyTermination(timeoutMs: Long): Boolean
  
  // Listeners
  def addListener(listener: StreamingQueryListener): Unit
  def removeListener(listener: StreamingQueryListener): Unit
}

Output Modes

/**
 * Factory methods for the available output modes.
 *
 * These are nullary methods rather than vals, so they are written
 * `OutputMode.Append()`, `OutputMode.Complete()` and `OutputMode.Update()`.
 * The string forms "append", "complete" and "update" can be passed to
 * DataStreamWriter.outputMode(String) instead.
 */
object OutputMode {
  def Append(): OutputMode
  def Complete(): OutputMode
  def Update(): OutputMode
}

Triggers

/** Policy that determines when a streaming query processes data. */
sealed trait Trigger

/** Factory methods for the available triggers. */
object Trigger {
  // Micro-batches at a fixed processing-time interval
  def ProcessingTime(interval: String): Trigger
  def ProcessingTime(interval: Duration): Trigger
  def ProcessingTime(interval: Long, unit: TimeUnit): Trigger

  // One-shot processing; superseded by AvailableNow()
  @deprecated("Use Trigger.AvailableNow() instead", "3.4.0")
  def Once(): Trigger

  // Continuous processing with the given checkpoint interval
  def Continuous(interval: String): Trigger
  def Continuous(interval: Duration): Trigger

  // Process all data available at query start, then stop
  def AvailableNow(): Trigger
}

ForeachWriter

Custom sink for streaming data.

/**
 * Custom sink for streaming data.
 *
 * Implementations provide three callbacks: `open` (e.g. to initialize a
 * connection), `process` (called with each value), and `close` (e.g. to
 * clean up resources).
 *
 * @tparam T type of the values handed to `process`
 */
abstract class ForeachWriter[T] extends Serializable {
  // Receives a partition id and an epoch id and returns a Boolean
  // (presumably whether the partition should be processed; confirm against the Spark API docs)
  def open(partitionId: Long, epochId: Long): Boolean
  // Handles a single value
  def process(value: T): Unit
  // Receives a Throwable; the parameter name indicates it is null when no error occurred
  def close(errorOrNull: Throwable): Unit
}

Usage example:

// Row sink that prints every record it is given
val customWriter = new ForeachWriter[Row] {
  // Initialize connection; nothing to set up in this example, so just return true
  override def open(partitionId: Long, epochId: Long): Boolean = true

  // Process each row
  override def process(value: Row): Unit =
    println(s"Processing: ${value.toString}")

  // Clean up resources (none are held in this example)
  override def close(errorOrNull: Throwable): Unit = ()
}

// Use the writer as the sink of the streaming query and start it
val query = df.writeStream.foreach(customWriter).start()

Windowing Operations

// Available through functions object

// Time windows over `timeColumn`. Durations are given as strings
// (the usage examples pass values such as "10 minutes" and "5 minutes").
// Overloads add an optional slide duration and an optional start time.
def window(timeColumn: Column, windowDuration: String): Column
def window(timeColumn: Column, windowDuration: String, slideDuration: String): Column
def window(timeColumn: Column, windowDuration: String, slideDuration: String, startTime: String): Column

// Session windows over `timeColumn`; the gap duration is either a string or a Column
def session_window(timeColumn: Column, gapDuration: String): Column
def session_window(timeColumn: Column, gapDuration: Column): Column

Usage example:

import org.apache.spark.sql.functions._

// Shared pieces: the event-time column and the input with a 10-minute watermark
val eventTime = col("timestamp")
val watermarked = df.withWatermark("timestamp", "10 minutes")

// Tumbling window: window duration of 10 minutes
val windowedCounts =
  watermarked.groupBy(window(eventTime, "10 minutes")).count()

// Sliding window: window duration of 10 minutes, slide duration of 5 minutes
val slidingWindowCounts =
  watermarked.groupBy(window(eventTime, "10 minutes", "5 minutes")).count()

// Session window: gap duration of 5 minutes
val sessionCounts =
  watermarked.groupBy(session_window(eventTime, "5 minutes")).count()

Watermarking

// Available on Dataset
// `eventTime` is passed as a column name and `delayThreshold` as a duration
// string in the usage examples (e.g. "timestamp" / "10 minutes").
// Returns a Dataset of the same element type T.
def withWatermark(eventTime: String, delayThreshold: String): Dataset[T]

Usage example:

// Declare a 2-minute watermark on `eventTime`, then count rows per 10-minute window.
// Uses col(...) from org.apache.spark.sql.functions rather than the $"..." syntax,
// which would additionally require `import spark.implicits._`.
val watermarkedDF = df
  .withWatermark("eventTime", "2 minutes")
  .groupBy(window(col("eventTime"), "10 minutes"))
  .count()

Legacy DStreams API

The original streaming API based on RDDs (now in maintenance mode).

StreamingContext

The main entry point for DStreams functionality.

/**
 * The main entry point for DStreams functionality.
 *
 * @param sparkContext  the SparkContext the streaming context is built on
 * @param batchDuration the batch duration of the streaming computation
 */
class StreamingContext(sparkContext: SparkContext, batchDuration: Duration) {
  // Alternative constructors: from a SparkConf, or from a checkpoint path
  def this(conf: SparkConf, batchDuration: Duration)
  def this(path: String, hadoopConf: Configuration = new Configuration())

  // Input sources
  def socketTextStream(hostname: String, port: Int, storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2): ReceiverInputDStream[String]
  def textFileStream(directory: String): DStream[String]
  // K, V and F all require a ClassTag in scope
  def fileStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K, V]: ClassTag](directory: String): InputDStream[(K, V)]
  def queueStream[T: ClassTag](queue: Queue[RDD[T]], oneAtATime: Boolean = true): InputDStream[T]
  def receiverStream[T: ClassTag](receiver: Receiver[T]): ReceiverInputDStream[T]

  // Union
  def union[T: ClassTag](streams: Seq[DStream[T]]): DStream[T]

  // Control
  def start(): Unit
  def stop(stopSparkContext: Boolean = true, stopGracefully: Boolean = false): Unit
  def awaitTermination(): Unit
  def awaitTerminationOrTimeout(timeout: Long): Boolean

  // State
  def remember(duration: Duration): Unit
  def checkpoint(directory: String): Unit

  // Properties (the internal DStreamGraph is not part of the public API and is omitted)
  def sparkContext: SparkContext
  def getState(): StreamingContextState
}

/**
 * Companion-object helpers for obtaining a StreamingContext.
 *
 * `getOrCreate` and `getActiveOrCreate` take a checkpoint path and a function
 * that builds a new context (presumably invoked only when no existing context
 * can be reused or recovered; confirm against the Spark API docs).
 * `getActive` returns an Option, so there may be no active context.
 */
object StreamingContext {
  def getOrCreate(checkpointPath: String, creatingFunc: () => StreamingContext): StreamingContext
  def getActive(): Option[StreamingContext]
  def getActiveOrCreate(checkpointPath: String, creatingFunc: () => StreamingContext): StreamingContext
}

DStream

The fundamental abstraction in Spark Streaming representing a continuous stream of data.

/**
 * The fundamental abstraction in Spark Streaming representing a continuous
 * stream of data.
 *
 * @tparam T element type of the stream
 */
abstract class DStream[T: ClassTag] extends Serializable {
  // Transformations
  def map[U: ClassTag](mapFunc: T => U): DStream[U]
  def flatMap[U: ClassTag](flatMapFunc: T => TraversableOnce[U]): DStream[U]
  def filter(filterFunc: T => Boolean): DStream[T]
  def glom(): DStream[Array[T]]
  def repartition(numPartitions: Int): DStream[T]
  def union(that: DStream[T]): DStream[T]
  def count(): DStream[Long]
  def countByValue(): DStream[(T, Long)]
  def reduce(reduceFunc: (T, T) => T): DStream[T]
  def countByWindow(windowDuration: Duration, slideDuration: Duration): DStream[Long]
  def reduceByWindow(reduceFunc: (T, T) => T, windowDuration: Duration, slideDuration: Duration): DStream[T]
  def reduceByWindow(reduceFunc: (T, T) => T, invReduceFunc: (T, T) => T, windowDuration: Duration, slideDuration: Duration): DStream[T]

  // Window operations
  def window(windowDuration: Duration): DStream[T]
  def window(windowDuration: Duration, slideDuration: Duration): DStream[T]

  // Actions
  def print(): Unit
  def print(num: Int): Unit
  def saveAsTextFiles(prefix: String, suffix: String = ""): Unit
  def saveAsObjectFiles(prefix: String, suffix: String = ""): Unit
  def foreachRDD(foreachFunc: RDD[T] => Unit): Unit
  def foreachRDD(foreachFunc: (RDD[T], Time) => Unit): Unit

  // State operations: updateStateByKey needs key and value types (K, V), which
  // DStream[T] does not have. It is defined on PairDStreamFunctions and is
  // available on streams of pairs, i.e. DStream[(K, V)].

  // Persistence
  def cache(): DStream[T]
  def persist(): DStream[T]
  def persist(level: StorageLevel): DStream[T]

  // Output
  def register(): DStream[T]

  // Properties
  def context: StreamingContext
  def ssc: StreamingContext
  def slideDuration: Duration
}

PairDStreamFunctions

Additional operations for DStreams of key-value pairs.

/**
 * Additional operations for DStreams of key-value pairs.
 *
 * Wraps a `DStream[(K, V)]` and requires ClassTags for both K and V.
 * Many operations come in three flavours: default partitioning, an explicit
 * number of partitions, or an explicit Partitioner.
 *
 * @tparam K key type of the pairs
 * @tparam V value type of the pairs
 */
class PairDStreamFunctions[K, V](self: DStream[(K, V)])(implicit kt: ClassTag[K], vt: ClassTag[V]) {
  // Transformations: per-key grouping, reduction and combining
  def groupByKey(): DStream[(K, Iterable[V])]
  def groupByKey(numPartitions: Int): DStream[(K, Iterable[V])]
  def groupByKey(partitioner: Partitioner): DStream[(K, Iterable[V])]
  def reduceByKey(func: (V, V) => V): DStream[(K, V)]
  def reduceByKey(func: (V, V) => V, numPartitions: Int): DStream[(K, V)]
  def reduceByKey(func: (V, V) => V, partitioner: Partitioner): DStream[(K, V)]
  def combineByKey[C: ClassTag](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiner: (C, C) => C): DStream[(K, C)]
  def combineByKey[C: ClassTag](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiner: (C, C) => C, numPartitions: Int): DStream[(K, C)]
  def combineByKey[C: ClassTag](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiner: (C, C) => C, partitioner: Partitioner): DStream[(K, C)]
  
  // Window operations: per-key grouping/reduction over a window duration and
  // optional slide duration. Some reduceByKeyAndWindow overloads also take an
  // inverse reduce function and an optional filter on (K, V) pairs.
  def groupByKeyAndWindow(windowDuration: Duration): DStream[(K, Iterable[V])]
  def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration): DStream[(K, Iterable[V])]
  def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration, numPartitions: Int): DStream[(K, Iterable[V])]
  def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration, partitioner: Partitioner): DStream[(K, Iterable[V])]
  def reduceByKeyAndWindow(reduceFunc: (V, V) => V, windowDuration: Duration, slideDuration: Duration): DStream[(K, V)]
  def reduceByKeyAndWindow(reduceFunc: (V, V) => V, invReduceFunc: (V, V) => V, windowDuration: Duration, slideDuration: Duration): DStream[(K, V)]
  def reduceByKeyAndWindow(reduceFunc: (V, V) => V, invReduceFunc: (V, V) => V, windowDuration: Duration, slideDuration: Duration, numPartitions: Int, filterFunc: ((K, V)) => Boolean = null): DStream[(K, V)]
  
  // Join operations against another pair DStream with the same key type.
  // Outer joins wrap the possibly-missing side(s) in Option.
  def join[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, W))]
  def join[W: ClassTag](other: DStream[(K, W)], numPartitions: Int): DStream[(K, (V, W))]
  def join[W: ClassTag](other: DStream[(K, W)], partitioner: Partitioner): DStream[(K, (V, W))]
  def leftOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, Option[W]))]
  def rightOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Option[V], W))]
  def fullOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Option[V], Option[W]))]
  def cogroup[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Iterable[V], Iterable[W]))]
  
  // State operations: the update function receives a Seq[V] and an Option[S]
  // and returns an Option[S].
  def updateStateByKey[S: ClassTag](updateFunc: (Seq[V], Option[S]) => Option[S]): DStream[(K, S)]
  def updateStateByKey[S: ClassTag](updateFunc: (Seq[V], Option[S]) => Option[S], numPartitions: Int): DStream[(K, S)]
  def updateStateByKey[S: ClassTag](updateFunc: (Seq[V], Option[S]) => Option[S], partitioner: Partitioner): DStream[(K, S)]
  def mapWithState[StateType: ClassTag, MappedType: ClassTag](spec: StateSpec[K, V, StateType, MappedType]): MapWithStateDStream[K, V, StateType, MappedType]
  
  // Output: save using a Hadoop OutputFormat (old or new API) with a file prefix and optional suffix
  def saveAsHadoopFiles[F <: OutputFormat[K, V]](prefix: String, suffix: String = ""): Unit
  def saveAsNewAPIHadoopFiles[F <: NewOutputFormat[K, V]](prefix: String, suffix: String = ""): Unit
}

Input Sources

Various built-in input sources for DStreams.

// Input-source methods. These are instance methods of StreamingContext: call
// them on a context (e.g. ssc.textFileStream(...)), not on the companion object.
class StreamingContext {
  // File-based sources
  def textFileStream(directory: String): DStream[String]
  def binaryRecordsStream(directory: String, recordLength: Int): DStream[Array[Byte]]

  // Network sources
  def socketTextStream(hostname: String, port: Int, storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2): ReceiverInputDStream[String]
  def socketStream[T: ClassTag](hostname: String, port: Int, converter: (InputStream) => Iterator[T], storageLevel: StorageLevel): ReceiverInputDStream[T]

  // Queue source (for testing). The overload taking defaultRDD has no default arguments.
  def queueStream[T: ClassTag](queue: Queue[RDD[T]], oneAtATime: Boolean = true): InputDStream[T]
  def queueStream[T: ClassTag](queue: Queue[RDD[T]], oneAtATime: Boolean, defaultRDD: RDD[T]): InputDStream[T]
}

Custom Receivers

/**
 * Base class for custom DStream receivers.
 *
 * Subclasses implement `onStart` and `onStop`, and hand received data to
 * Spark through one of the `store` overloads.
 *
 * @param storageLevel storage level used for the received data
 * @tparam T type of the received items
 */
abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable {
  // Lifecycle callbacks to implement
  def onStart(): Unit
  def onStop(): Unit

  // Storing received data: a single item, a buffer, or an iterator, optionally with metadata
  def store(dataItem: T): Unit
  def store(dataBuffer: ArrayBuffer[T]): Unit
  def store(dataIterator: Iterator[T]): Unit
  def store(dataBuffer: ArrayBuffer[T], metadata: Any): Unit
  def store(dataIterator: Iterator[T], metadata: Any): Unit

  // Error reporting and lifecycle control
  def reportError(message: String, throwable: Throwable): Unit
  def restart(message: String): Unit
  def restart(message: String, error: Throwable): Unit
  def restart(message: String, error: Throwable, millisecond: Int): Unit
  def stop(message: String): Unit
  def stop(message: String, error: Throwable): Unit

  // State checks
  def isStarted(): Boolean
  def isStopped(): Boolean
}

Usage examples:

// Structured Streaming: File source
import org.apache.spark.sql.types._

// Streaming file sources need an explicit schema (schema inference is disabled
// by default). The fields below are placeholders; replace them with the
// actual structure of the JSON files.
val jsonSchema = new StructType()
  .add("id", LongType)
  .add("message", StringType)

val fileStream = spark.readStream
  .format("json")
  .schema(jsonSchema)
  .option("path", "/path/to/json/files")
  .load()

// Print newly arrived rows to the console
val query = fileStream.writeStream
  .format("console")
  .outputMode("append")
  .start()

// Structured Streaming: Kafka source
// Subscribe to `topic1` on the local Kafka broker
val kafkaStream = spark.readStream
  .format("kafka")
  .options(Map(
    "kafka.bootstrap.servers" -> "localhost:9092",
    "subscribe" -> "topic1"
  ))
  .load()

// Cast the Kafka `value` column to a string, parse it as JSON using `schema`
// (assumed to be defined by the caller), then flatten the parsed struct
val payload = col("value").cast("string")
val parsedStream = kafkaStream
  .select(from_json(payload, schema).as("data"))
  .select("data.*")

// DStreams: Custom receiver
/** Example receiver that fetches String records on a background thread and stores them. */
class CustomReceiver extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {
  // Start receiving data on a separate thread so that onStart() returns immediately
  override def onStart(): Unit = {
    new Thread("Custom Receiver") {
      override def run(): Unit = receive()
    }.start()
  }

  // Stop receiving data. Nothing to do here: the receive loop checks
  // isStopped() and exits on its own.
  override def onStop(): Unit = ()

  // Receive data and store it until the receiver is stopped
  private def receive(): Unit = {
    while (!isStopped()) {
      // Placeholder: replace ??? with code that gets data from somewhere
      val data: String = ???
      store(data)
    }
  }
}

val customStream = ssc.receiverStream(new CustomReceiver())

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-spark--spark-parent-2-13

docs

core-engine.md

graph-processing.md

index.md

machine-learning.md

sql-dataframes.md

stream-processing.md

tile.json