Files: docs/ (advanced-transformations.md, core-streaming.md, index.md, input-sources.md, java-api.md), tile.json

tessl/maven-org-apache-spark--spark-streaming_2-12

Spark Streaming - A scalable, fault-tolerant stream processing system that extends Apache Spark

Workspace: tessl
Visibility: Public
Describes: Maven package pkg:maven/org.apache.spark/spark-streaming_2.12@3.5.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-spark--spark-streaming_2-12@3.5.0


Apache Spark Streaming

Apache Spark Streaming is a scalable, fault-tolerant stream processing system built on Apache Spark that enables processing of live data streams. Its central high-level abstraction is the DStream (discretized stream), a continuous sequence of RDDs, which provides fault tolerance through lineage-based recovery and integrates with Spark's batch processing capabilities.

⚠️ Deprecation Notice: Spark Streaming (DStreams) is deprecated as of Apache Spark 3.4.0. Users are strongly encouraged to migrate to Structured Streaming, which provides a more modern streaming API with better performance, late data handling, and exactly-once processing guarantees.

Package Information

  • Package Name: org.apache.spark:spark-streaming_2.12
  • Package Type: Maven
  • Language: Scala
  • Installation: Add to pom.xml:
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.12</artifactId>
  <version>3.5.6</version>
</dependency>
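
For sbt-based projects, the dependency is typically declared as follows. This is a minimal sketch; marking it "provided" (to match a spark-submit deployment) is an assumption, not a requirement.

// build.sbt (sketch; align the version with your cluster's Spark version)
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "3.5.6" % "provided"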

Core Imports

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream._

Basic Usage

import org.apache.spark._
import org.apache.spark.streaming._

// Create StreamingContext with 2 second batch interval
val conf = new SparkConf().setAppName("StreamingApp")
val ssc = new StreamingContext(conf, Seconds(2))

// Create DStream from text files
val lines = ssc.textFileStream("/path/to/directory")

// Transform the data
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)

// Output the results
wordCounts.print()

// Start the streaming context
ssc.start()
ssc.awaitTermination()
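
Given the deprecation notice above, the same word count translates fairly directly to Structured Streaming. The sketch below assumes a socket source on localhost:9999 (rather than the file directory above) and requires the spark-sql module.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("StructuredStreamingApp").getOrCreate()
import spark.implicits._

// Read lines from a socket source (host/port are placeholders)
val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

// Split lines into words and count occurrences
val wordCounts = lines.as[String]
  .flatMap(_.split(" "))
  .groupBy("value")
  .count()

// Print the complete counts to the console on every trigger
val query = wordCounts.writeStream
  .outputMode("complete")
  .format("console")
  .start()

query.awaitTermination()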

Architecture

Spark Streaming is built around several key components:

  • StreamingContext: Main entry point for creating DStreams from various input sources
  • DStream: Discretized stream representing a continuous sequence of RDDs
  • Micro-batching: Divides live streams into small batches processed by the Spark engine
  • Fault Tolerance: Lineage-based recovery and checkpointing for stateful operations
  • Write-Ahead Log (WAL): Optional reliability mechanism for receiver-based sources (see the configuration sketch after this list)
  • Receivers: Background tasks running on worker nodes to receive external data
  • Rate Controller: Backpressure mechanism to prevent overwhelming the system
  • Block Manager: Manages storage of received data blocks across cluster nodes
  • Integration: Seamless integration with Spark SQL, MLlib, and GraphX
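
As a rough sketch of how checkpointing, the write-ahead log, and backpressure are wired together: the checkpoint path below is a placeholder, while spark.streaming.receiver.writeAheadLog.enable and spark.streaming.backpressure.enabled are the standard configuration keys.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("ReliableStreamingApp")
  // Persist received blocks to a write-ahead log before acknowledging them
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")
  // Let the rate controller throttle receivers based on batch processing times
  .set("spark.streaming.backpressure.enabled", "true")

val ssc = new StreamingContext(conf, Seconds(2))

// Checkpointing is required for the WAL and for stateful operations;
// the directory should live on a fault-tolerant file system such as HDFS
ssc.checkpoint("hdfs:///checkpoints/streaming-app")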

Capabilities

Core Streaming Operations

Primary streaming functionality including StreamingContext creation, DStream transformations, and output operations. Essential for all streaming applications.

class StreamingContext(sparkContext: SparkContext, batchDuration: Duration) {
  def start(): Unit
  def stop(): Unit  
  def awaitTermination(): Unit
  def checkpoint(directory: String): Unit
}

abstract class DStream[T] {
  def map[U](mapFunc: T => U): DStream[U]
  def filter(filterFunc: T => Boolean): DStream[T]
  def print(): Unit
  def foreachRDD(foreachFunc: RDD[T] => Unit): Unit
}
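
A brief usage sketch of these operations, using foreachRDD to push each micro-batch to a sink; the socket source and the println sink are placeholders for illustration.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("CoreOpsApp")
val ssc = new StreamingContext(conf, Seconds(5))

val events = ssc.socketTextStream("localhost", 9999)

// Keep only non-empty lines and tag each with its length
val tagged = events.filter(_.nonEmpty).map(line => (line.length, line))

// foreachRDD exposes the underlying RDD of every micro-batch
tagged.foreachRDD { rdd =>
  // Placeholder sink: just log the batch size on the driver
  println(s"Batch contained ${rdd.count()} records")
}

ssc.start()
ssc.awaitTermination()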

Input Sources and Data Ingestion

Comprehensive data ingestion capabilities from files, sockets, queues, and custom sources. Supports both receiver-based and direct stream approaches.

class StreamingContext {
  def textFileStream(directory: String): DStream[String]
  def socketTextStream(hostname: String, port: Int): ReceiverInputDStream[String]
  def queueStream[T](queue: Queue[RDD[T]]): InputDStream[T]
  def receiverStream[T](receiver: Receiver[T]): ReceiverInputDStream[T]
}

abstract class Receiver[T](storageLevel: StorageLevel) {
  def onStart(): Unit
  def onStop(): Unit
  def store(dataItem: T): Unit
}
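
A minimal custom receiver sketch against this API; the background thread generating synthetic records is a stand-in for a real external source.

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class CounterReceiver extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  // Called once when the receiver starts on a worker; must not block
  def onStart(): Unit = {
    new Thread("counter-receiver") {
      override def run(): Unit = {
        var i = 0L
        while (!isStopped()) {
          store(s"record-$i")   // hand data to Spark for storage and replication
          i += 1
          Thread.sleep(100)
        }
      }
    }.start()
  }

  // Nothing to clean up; the receiving thread exits once isStopped() is true
  def onStop(): Unit = {}
}

// Usage: plug the receiver into a StreamingContext
// val records = ssc.receiverStream(new CounterReceiver)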

Advanced Transformations and Windowing

Windowed operations, stateful transformations, and advanced data processing patterns for complex streaming analytics.

class DStream[T] {
  def window(windowDuration: Duration, slideDuration: Duration): DStream[T]
  def reduceByWindow(reduceFunc: (T, T) => T, windowDuration: Duration, slideDuration: Duration): DStream[T]
}

class PairDStreamFunctions[K, V] {
  def updateStateByKey[S](updateFunc: (Seq[V], Option[S]) => Option[S]): DStream[(K, S)]
  def mapWithState[StateType, MappedType](spec: StateSpec[K, V, StateType, MappedType]): MapWithStateDStream[K, V, StateType, MappedType]
}
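
A sketch combining a sliding window with a running per-key count via updateStateByKey. Stateful operations require a checkpoint directory; the host, port, path, and durations below are placeholders.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("StatefulApp")
val ssc = new StreamingContext(conf, Seconds(10))
ssc.checkpoint("hdfs:///checkpoints/stateful-app")   // required by updateStateByKey

val words = ssc.socketTextStream("localhost", 9999).flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))

// Counts over the last 30 seconds, recomputed every 10 seconds
val windowedCounts = pairs.window(Seconds(30), Seconds(10)).reduceByKey(_ + _)

// Running total per word across the lifetime of the stream
val runningCounts = pairs.updateStateByKey[Int] { (newValues: Seq[Int], state: Option[Int]) =>
  Some(newValues.sum + state.getOrElse(0))
}

windowedCounts.print()
runningCounts.print()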

Java API Integration

Java-friendly wrappers providing seamless integration for Java applications with lambda expressions and Java collections support.

class JavaStreamingContext(sparkContext: JavaSparkContext, batchDuration: Duration) {
  def socketTextStream(hostname: String, port: Int): JavaReceiverInputDStream[String]
  def textFileStream(directory: String): JavaDStream[String]
}

class JavaDStream[T] {
  def map[U](f: JFunction[T, U]): JavaDStream[U]
  def filter(f: JFunction[T, Boolean]): JavaDStream[T]
}

Core Types

case class Duration(milliseconds: Long) {
  def +(that: Duration): Duration
  def -(that: Duration): Duration
  def *(times: Int): Duration
  def isMultipleOf(that: Duration): Boolean
}

object Seconds {
  def apply(seconds: Long): Duration
}

object Minutes {
  def apply(minutes: Long): Duration  
}

case class Time(milliseconds: Long) {
  def +(that: Duration): Time
  def -(that: Time): Duration
  def floor(that: Duration): Time
}

case class Interval(beginTime: Time, endTime: Time) {
  def duration(): Duration
}
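
A small illustration of how these duration helpers compose; the specific values are arbitrary.

import org.apache.spark.streaming.{Minutes, Seconds}

val batchInterval = Seconds(10)
val windowLength  = Minutes(1)          // 60000 ms
val slideInterval = batchInterval * 3   // 30000 ms

// Window and slide durations must be multiples of the batch interval
assert(windowLength.isMultipleOf(batchInterval))
assert(slideInterval.isMultipleOf(batchInterval))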

sealed trait StreamingContextState
object StreamingContextState {
  case object INITIALIZED extends StreamingContextState
  case object ACTIVE extends StreamingContextState  
  case object STOPPED extends StreamingContextState
}

sealed abstract class StorageLevel extends Serializable {
  def useDisk: Boolean
  def useMemory: Boolean  
  def useOffHeap: Boolean
  def deserialized: Boolean
  def replication: Int
}

object StorageLevel {
  val NONE: StorageLevel
  val DISK_ONLY: StorageLevel
  val DISK_ONLY_2: StorageLevel
  val MEMORY_ONLY: StorageLevel
  val MEMORY_ONLY_2: StorageLevel
  val MEMORY_ONLY_SER: StorageLevel
  val MEMORY_ONLY_SER_2: StorageLevel
  val MEMORY_AND_DISK: StorageLevel
  val MEMORY_AND_DISK_2: StorageLevel
  val MEMORY_AND_DISK_SER: StorageLevel
  val MEMORY_AND_DISK_SER_2: StorageLevel
  val OFF_HEAP: StorageLevel
}
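
For example, receiver-based sources can be given an explicit storage level. This sketch assumes the overload of socketTextStream that takes a StorageLevel and reuses the ssc StreamingContext from the earlier examples.

import org.apache.spark.storage.StorageLevel

// Store received blocks serialized, in memory with spill to disk, replicated twice
val lines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER_2)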

abstract class StreamingListener {
  def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted): Unit = {}
  def onReceiverError(receiverError: StreamingListenerReceiverError): Unit = {}
  def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped): Unit = {}
  def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted): Unit = {}
  def onBatchStarted(batchStarted: StreamingListenerBatchStarted): Unit = {}
  def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {}
  def onOutputOperationStarted(outputOperationStarted: StreamingListenerOutputOperationStarted): Unit = {}
  def onOutputOperationCompleted(outputOperationCompleted: StreamingListenerOutputOperationCompleted): Unit = {}
  def onStreamingStarted(streamingStarted: StreamingListenerStreamingStarted): Unit = {}
}
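
A sketch of a listener that logs per-batch metrics and is registered on the context via addStreamingListener; the BatchInfo fields used here (numRecords, processingDelay) follow the scheduler API.

import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

class BatchMetricsListener extends StreamingListener {
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    val info = batchCompleted.batchInfo
    // numRecords and processingDelay come from the batch's BatchInfo
    println(s"Batch at ${info.batchTime}: ${info.numRecords} records, " +
      s"processing delay ${info.processingDelay.getOrElse(-1L)} ms")
  }
}

// Register before starting the context
// ssc.addStreamingListener(new BatchMetricsListener)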

Error Handling

Spark Streaming applications can handle errors through:

  • Receiver Error Handling: Receiver.reportError() for custom receiver failures
  • DStream Failure Recovery: Automatic lineage-based RDD recovery
  • Checkpoint Recovery: StreamingContext.getOrCreate() for driver failure recovery
  • Custom Error Processing: Using DStream.foreachRDD() with try-catch blocks (this and checkpoint recovery are sketched below)
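
A recovery-oriented sketch tying these together: getOrCreate rebuilds the context from a checkpoint after a driver failure, and a try-catch inside foreachRDD guards per-batch output. The paths, host, and println sink are placeholders.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val checkpointDir = "hdfs:///checkpoints/resilient-app"

// Factory used only when no checkpoint exists yet; on driver restart the
// context (including DStream lineage and pending batches) is rebuilt from the checkpoint
def createContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("ResilientApp")
  val ssc = new StreamingContext(conf, Seconds(5))
  ssc.checkpoint(checkpointDir)

  val lines = ssc.socketTextStream("localhost", 9999)
  lines.foreachRDD { rdd =>
    try {
      rdd.foreach(record => println(record))   // placeholder sink
    } catch {
      case e: Exception =>
        // Log and continue rather than letting one failed batch terminate the application
        System.err.println(s"Failed to process batch: ${e.getMessage}")
    }
  }
  ssc
}

val ssc = StreamingContext.getOrCreate(checkpointDir, () => createContext())
ssc.start()
ssc.awaitTermination()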

Common exceptions include:

  • IllegalArgumentException: Invalid configuration parameters
  • SparkException: General Spark runtime errors
  • StreamingQueryException: Streaming-specific runtime errors (note: this exception belongs to Structured Streaming rather than the DStream API)