CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-apache-spark--spark-streaming-2-12

Spark Streaming - A scalable fault-tolerant streaming processing system that extends Apache Spark

Pending
Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

SecuritybySnyk

Pending

The risk profile of this skill

Overview
Eval results
Files

Apache Spark Streaming

Apache Spark Streaming is a scalable fault-tolerant streaming processing system built on Apache Spark that enables processing of live data streams. It provides high-level abstractions like DStream (discretized stream) that represents a continuous sequence of RDDs, offering fault-tolerance through lineage-based recovery and integration with Spark's batch processing capabilities.

⚠️ Deprecation Notice: Spark Streaming (DStreams) is deprecated as of Apache Spark 3.4.0. Users are strongly encouraged to migrate to Structured Streaming, which provides a more modern streaming API with better performance, late data handling, and exactly-once processing guarantees.

Package Information

  • Package Name: org.apache.spark:spark-streaming_2.12
  • Package Type: Maven
  • Language: Scala
  • Installation: Add to pom.xml:
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.12</artifactId>
  <version>3.5.6</version>
</dependency>

Core Imports

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream._

Basic Usage

import org.apache.spark._
import org.apache.spark.streaming._

// Create StreamingContext with 2 second batch interval
val conf = new SparkConf().setAppName("StreamingApp")
val ssc = new StreamingContext(conf, Seconds(2))

// Create DStream from text files
val lines = ssc.textFileStream("/path/to/directory")

// Transform the data
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)

// Output the results
wordCounts.print()

// Start the streaming context
ssc.start()
ssc.awaitTermination()

Architecture

Spark Streaming is built around several key components:

  • StreamingContext: Main entry point for creating DStreams from various input sources
  • DStream: Discretized stream representing a continuous sequence of RDDs
  • Micro-batching: Divides live streams into small batches processed by Spark engine
  • Fault Tolerance: Lineage-based recovery and checkpointing for stateful operations
  • Write-Ahead Log (WAL): Optional reliability mechanism for receiver-based sources
  • Receivers: Background tasks running on worker nodes to receive external data
  • Rate Controller: Backpressure mechanism to prevent overwhelming the system
  • Block Manager: Manages storage of received data blocks across cluster nodes
  • Integration: Seamless integration with Spark SQL, MLlib, and GraphX

Capabilities

Core Streaming Operations

Primary streaming functionality including StreamingContext creation, DStream transformations, and output operations. Essential for all streaming applications.

class StreamingContext(sparkContext: SparkContext, batchDuration: Duration) {
  def start(): Unit
  def stop(): Unit  
  def awaitTermination(): Unit
  def checkpoint(directory: String): Unit
}

abstract class DStream[T] {
  def map[U](mapFunc: T => U): DStream[U]
  def filter(filterFunc: T => Boolean): DStream[T]
  def print(): Unit
  def foreachRDD(foreachFunc: RDD[T] => Unit): Unit
}

Core Streaming Operations

Input Sources and Data Ingestion

Comprehensive data ingestion capabilities from files, sockets, queues, and custom sources. Supports both receiver-based and direct stream approaches.

class StreamingContext {
  def textFileStream(directory: String): DStream[String]
  def socketTextStream(hostname: String, port: Int): ReceiverInputDStream[String]
  def queueStream[T](queue: Queue[RDD[T]]): InputDStream[T]
  def receiverStream[T](receiver: Receiver[T]): ReceiverInputDStream[T]
}

abstract class Receiver[T](storageLevel: StorageLevel) {
  def onStart(): Unit
  def onStop(): Unit
  def store(dataItem: T): Unit
}

Input Sources and Data Ingestion

Advanced Transformations and Windowing

Windowed operations, stateful transformations, and advanced data processing patterns for complex streaming analytics.

class DStream[T] {
  def window(windowDuration: Duration, slideDuration: Duration): DStream[T]
  def reduceByWindow(reduceFunc: (T, T) => T, windowDuration: Duration, slideDuration: Duration): DStream[T]
}

class PairDStreamFunctions[K, V] {
  def updateStateByKey[S](updateFunc: (Seq[V], Option[S]) => Option[S]): DStream[(K, S)]
  def mapWithState[StateType, MappedType](spec: StateSpec[K, V, StateType, MappedType]): MapWithStateDStream[K, V, StateType, MappedType]
}

Advanced Transformations and Windowing

Java API Integration

Java-friendly wrappers providing seamless integration for Java applications with lambda expressions and Java collections support.

class JavaStreamingContext(sparkContext: JavaSparkContext, batchDuration: Duration) {
  def socketTextStream(hostname: String, port: Int): JavaReceiverInputDStream[String]
  def textFileStream(directory: String): JavaDStream[String]
}

class JavaDStream[T] {
  def map[U](f: JFunction[T, U]): JavaDStream[U]
  def filter(f: JFunction[T, Boolean]): JavaDStream[T]
}

Java API Integration

Core Types

case class Duration(milliseconds: Long) {
  def +(that: Duration): Duration
  def -(that: Duration): Duration
  def *(times: Int): Duration
  def isMultipleOf(that: Duration): Boolean
}

object Seconds {
  def apply(seconds: Long): Duration
}

object Minutes {
  def apply(minutes: Long): Duration  
}

case class Time(milliseconds: Long) {
  def +(that: Duration): Time
  def -(that: Time): Duration
  def floor(that: Duration): Time
}

case class Interval(beginTime: Time, endTime: Time) {
  def duration(): Duration
}

sealed trait StreamingContextState
object StreamingContextState {
  case object INITIALIZED extends StreamingContextState
  case object ACTIVE extends StreamingContextState  
  case object STOPPED extends StreamingContextState
}

sealed abstract class StorageLevel extends Serializable {
  def useDisk: Boolean
  def useMemory: Boolean  
  def useOffHeap: Boolean
  def deserialized: Boolean
  def replication: Int
}

object StorageLevel {
  val NONE: StorageLevel
  val DISK_ONLY: StorageLevel
  val DISK_ONLY_2: StorageLevel
  val MEMORY_ONLY: StorageLevel
  val MEMORY_ONLY_2: StorageLevel
  val MEMORY_ONLY_SER: StorageLevel
  val MEMORY_ONLY_SER_2: StorageLevel
  val MEMORY_AND_DISK: StorageLevel
  val MEMORY_AND_DISK_2: StorageLevel
  val MEMORY_AND_DISK_SER: StorageLevel
  val MEMORY_AND_DISK_SER_2: StorageLevel
  val OFF_HEAP: StorageLevel
}

abstract class StreamingListener {
  def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted): Unit = {}
  def onReceiverError(receiverError: StreamingListenerReceiverError): Unit = {}
  def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped): Unit = {}
  def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted): Unit = {}
  def onBatchStarted(batchStarted: StreamingListenerBatchStarted): Unit = {}
  def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {}
  def onOutputOperationStarted(outputOperationStarted: StreamingListenerOutputOperationStarted): Unit = {}
  def onOutputOperationCompleted(outputOperationCompleted: StreamingListenerOutputOperationCompleted): Unit = {}
  def onStreamingStarted(streamingStarted: StreamingListenerStreamingStarted): Unit = {}
}

Error Handling

Spark Streaming applications can handle errors through:

  • Receiver Error Handling: Receiver.reportError() for custom receiver failures
  • DStream Failure Recovery: Automatic lineage-based RDD recovery
  • Checkpoint Recovery: StreamingContext.getOrCreate() for driver failure recovery
  • Custom Error Processing: Using DStream.foreachRDD() with try-catch blocks

Common exceptions include:

  • IllegalArgumentException: Invalid configuration parameters
  • SparkException: General Spark runtime errors
  • StreamingQueryException: Streaming-specific runtime errors
Workspace
tessl
Visibility
Public
Created
Last updated
Describes
mavenpkg:maven/org.apache.spark/spark-streaming_2.12@3.5.x
Publish Source
CLI
Badge
tessl/maven-org-apache-spark--spark-streaming-2-12 badge