
tessl/maven-org-apache-spark--spark-streaming_2-13

Apache Spark Streaming extension for scalable, fault-tolerant stream processing of live data streams

Workspace: tessl
Visibility: Public
Describes: pkg:maven/org.apache.spark/spark-streaming_2.13@3.5.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-spark--spark-streaming_2-13@3.5.0


Apache Spark Streaming

Apache Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data streams. It uses a micro-batching architecture to process continuous streams, providing exactly-once semantics for transformations (end-to-end guarantees also depend on the output operation and sink).

⚠️ Deprecation Notice: Apache Spark Streaming is deprecated as of Spark 3.4.0. Users should migrate to Structured Streaming for new applications.

Package Information

  • Package Name: spark-streaming_2.13
  • Package Type: maven
  • Language: Scala (with Java API)
  • Version: 3.5.6
  • Installation: Add to pom.xml or build.sbt

Maven:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.13</artifactId>
    <version>3.5.6</version>
</dependency>

SBT:

libraryDependencies += "org.apache.spark" %% "spark-streaming" % "3.5.6"

Core Imports

import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream._
import org.apache.spark.streaming.receiver._
import org.apache.spark.streaming.scheduler._

For Java API:

import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.scheduler.*;

Basic Usage

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming._

// Create streaming context with 2-second batch interval
val conf = new SparkConf().setAppName("StreamingApp").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(2))

// Create input stream from socket
val lines = ssc.socketTextStream("localhost", 9999)

// Transform and process the stream
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)

// Output results
wordCounts.print()

// Start the streaming context
ssc.start()
ssc.awaitTermination()

Architecture

Apache Spark Streaming processes live data streams using micro-batches:

  • StreamingContext: Main entry point that coordinates stream processing
  • DStream: Fundamental abstraction representing a continuous sequence of RDDs
  • Receivers: Components that ingest data from external sources (Kafka, Flume, etc.)
  • Transformations: Operations that create new DStreams from existing ones
  • Output Operations: Actions that write DStream data to external systems
  • Checkpointing: Fault tolerance mechanism for stateful operations (see the sketch after this list)
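
To make the output-operation and checkpointing pieces concrete, here is a minimal sketch that writes each batch with foreachRDD and recovers the StreamingContext from checkpoint data on restart. The checkpoint directory and the localhost:9999 socket source are placeholders; in production the checkpoint directory should live on a fault-tolerant store such as HDFS.

import org.apache.spark.SparkConf
import org.apache.spark.streaming._

// Hypothetical checkpoint directory (use HDFS or similar in production)
val checkpointDir = "/tmp/streaming-checkpoint"

def createContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("CheckpointedApp").setMaster("local[2]")
  val ssc = new StreamingContext(conf, Seconds(2))
  ssc.checkpoint(checkpointDir)  // enables fault tolerance for stateful operations

  val counts = ssc.socketTextStream("localhost", 9999)
    .flatMap(_.split(" "))
    .map(word => (word, 1))
    .reduceByKey(_ + _)

  // Output operation: push each micro-batch to an external system (stdout here)
  counts.foreachRDD { (rdd, time) =>
    println(s"Batch at $time: ${rdd.count()} distinct words")
  }
  ssc
}

// Rebuild from checkpoint data if present, otherwise create a fresh context
val ssc = StreamingContext.getOrCreate(checkpointDir, () => createContext())
ssc.start()
ssc.awaitTermination()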

Capabilities

Core Streaming

Main streaming abstractions including StreamingContext, DStream operations, and time management classes for building streaming applications.

class StreamingContext(sparkContext: SparkContext, batchDuration: Duration) {
  def start(): Unit
  def stop(stopSparkContext: Boolean = true): Unit
  def awaitTermination(): Unit
}

abstract class DStream[T] {
  def map[U](mapFunc: T => U): DStream[U]
  def filter(filterFunc: T => Boolean): DStream[T]
  def foreachRDD(func: RDD[T] => Unit): Unit
}

case class Duration(milliseconds: Long)
object Seconds { def apply(seconds: Long): Duration }
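
As a usage sketch of these classes, the following windowed word count (reusing the hypothetical localhost:9999 socket source from the Basic Usage example) counts words over a sliding 30-second window that advances every 10 seconds; both durations are multiples of the 2-second batch interval.

import org.apache.spark.SparkConf
import org.apache.spark.streaming._

val conf = new SparkConf().setAppName("WindowedCounts").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(2))

val pairs = ssc.socketTextStream("localhost", 9999)
  .flatMap(_.split(" "))
  .map(word => (word, 1))

// Reduce over the last 30 seconds of data, recomputed every 10 seconds
val windowedCounts = pairs.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(30), Seconds(10))
windowedCounts.print()

ssc.start()
ssc.awaitTermination()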

See docs/core-streaming.md.

Input Sources

Data ingestion capabilities for reading from various external sources including files, sockets, and message queues.

class StreamingContext {
  def socketTextStream(hostname: String, port: Int): DStream[String]
  def textFileStream(directory: String): DStream[String]
  def fileStream[K, V, F <: NewInputFormat[K, V]](directory: String): DStream[(K, V)]
  def queueStream[T](queue: Queue[RDD[T]]): DStream[T]
}
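
queueStream is convenient for local testing because each RDD pushed onto the queue is consumed as one micro-batch. A minimal sketch (batch interval, RDD contents, and sleep interval are arbitrary choices):

import scala.collection.mutable
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming._

val conf = new SparkConf().setAppName("QueueStreamTest").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(1))

// Queue of RDDs that backs the input stream
val rddQueue = new mutable.Queue[RDD[Int]]()
val stream = ssc.queueStream(rddQueue)

stream.map(_ * 2).print()
ssc.start()

// Feed a few RDDs; each becomes one batch
for (_ <- 1 to 3) {
  rddQueue.synchronized {
    rddQueue += ssc.sparkContext.makeRDD(1 to 100, numSlices = 2)
  }
  Thread.sleep(1000)
}

ssc.stop()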

See docs/input-sources.md.

State Management

Stateful processing capabilities for maintaining state across batches, including updateStateByKey and mapWithState operations.

abstract class State[S] {
  def exists(): Boolean
  def get(): S
  def update(newState: S): Unit
  def remove(): Unit
}

abstract class StateSpec[KeyType, ValueType, StateType, MappedType] {
  def initialState(rdd: RDD[(KeyType, StateType)]): StateSpec[KeyType, ValueType, StateType, MappedType]
  def timeout(timeout: Duration): StateSpec[KeyType, ValueType, StateType, MappedType]
}
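
A minimal sketch of mapWithState keeping a running count per word, following the pattern of Spark's StatefulNetworkWordCount example. The checkpoint directory and socket source are placeholders, and the 10-minute timeout is an arbitrary choice.

import org.apache.spark.SparkConf
import org.apache.spark.streaming._

val conf = new SparkConf().setAppName("StatefulCounts").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(2))
ssc.checkpoint("/tmp/state-checkpoint")  // hypothetical path; required for stateful operations

val pairs = ssc.socketTextStream("localhost", 9999)
  .flatMap(_.split(" "))
  .map(word => (word, 1))

// Add the batch's value to the running count stored in State, emit the new total
val mappingFunc = (word: String, one: Option[Int], state: State[Int]) => {
  val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
  state.update(sum)
  (word, sum)
}

// Keys idle for 10 minutes are timed out and removed from the state store
val runningCounts = pairs.mapWithState(StateSpec.function(mappingFunc).timeout(Minutes(10)))
runningCounts.print()

ssc.start()
ssc.awaitTermination()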

See docs/state-management.md.

Java API

Java-friendly wrappers for all streaming functionality, providing familiar Java interfaces and method signatures.

public class JavaStreamingContext {
  public JavaStreamingContext(JavaSparkContext sparkContext, Duration batchDuration);
  public void start();
  public void awaitTermination();
}

public class JavaDStream<T> {
  public <R> JavaDStream<R> map(Function<T, R> f);
  public JavaDStream<T> filter(Function<T, Boolean> f);
  public void foreachRDD(VoidFunction<JavaRDD<T>> foreachFunc);
}

See docs/java-api.md.

Monitoring and Listeners

Comprehensive event system for monitoring streaming applications including batch processing, receiver status, and performance metrics.

trait StreamingListener {
  def onStreamingStarted(streamingStarted: StreamingListenerStreamingStarted): Unit
  def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit
  def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted): Unit
}

case class BatchInfo(
  batchTime: Time,
  streamIdToInputInfo: Map[Int, StreamInputInfo],
  submissionTime: Long,
  processingStartTime: Option[Long],
  processingEndTime: Option[Long]
)
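
A minimal sketch of a custom listener that logs each completed batch; the class name is illustrative, and it is registered with addStreamingListener before the context is started.

import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.scheduler._

// Logs record count and processing delay for every completed batch
class BatchDelayListener extends StreamingListener {
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    val info = batchCompleted.batchInfo
    val delayMs = info.processingDelay.getOrElse(-1L)
    println(s"Batch at ${info.batchTime}: ${info.numRecords} records, processed in $delayMs ms")
  }
}

val conf = new SparkConf().setAppName("MonitoredApp").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(2))
ssc.addStreamingListener(new BatchDelayListener())  // register before ssc.start()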

See docs/monitoring-listeners.md.

Web UI

Built-in web interface for visualizing streaming application metrics, batch processing status, and receiver information.

class StreamingTab(parent: SparkUI)
class StreamingPage(parent: StreamingTab)
class StreamingJobProgressListener extends StreamingListener

See docs/web-ui.md.

Types

// Time and Duration
case class Time(milliseconds: Long) {
  def +(that: Duration): Time
  def -(that: Duration): Time
  def -(that: Time): Duration
}

case class Duration(milliseconds: Long) {
  def +(that: Duration): Duration
  def -(that: Duration): Duration
  def *(times: Int): Duration
  def <(that: Duration): Boolean
  def <=(that: Duration): Boolean
}

case class Interval(beginTime: Time, endTime: Time)

// State Management Types
sealed trait ValidationResult[T]
case class ValidationError(field: String, message: String)

// Streaming Context States
object StreamingContextState extends Enumeration {
  val INITIALIZED, ACTIVE, STOPPED = Value
}

// Input Stream Information
case class StreamInputInfo(
  inputStreamId: Int,
  numRecords: Long,
  metadata: Map[String, Any] = Map.empty
) {
  def metadataDescription: Option[String]
}
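
A small sketch of Duration and Time arithmetic (the interval sizes and timestamp are arbitrary); window and slide durations are typically expressed as multiples of the batch interval.

import org.apache.spark.streaming._

val batchInterval  = Seconds(2)
val windowDuration = batchInterval * 15   // 30 seconds
val slideDuration  = batchInterval * 5    // 10 seconds

// Window and slide durations must be multiples of the batch interval
assert(windowDuration.isMultipleOf(batchInterval))
assert(slideDuration < windowDuration)

// Adding a Duration to a Time yields the end of that window
val batchTime = Time(1700000000000L)
val windowEnd = batchTime + windowDuration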