tessl/maven-org-apache-spark--spark-streaming_2-11

Scalable, high-throughput, fault-tolerant stream processing library for real-time data processing on Apache Spark

Workspace: tessl
Visibility: Public
Describes: pkg:maven/org.apache.spark/spark-streaming_2.11@2.4.x (Maven)

To install, run

npx @tessl/cli install tessl/maven-org-apache-spark--spark-streaming_2-11@2.4.0


Apache Spark Streaming

Apache Spark Streaming is a scalable, high-throughput, fault-tolerant stream processing library built on Apache Spark Core that enables processing of live data streams from sources like Kafka, Flume, Kinesis, or TCP sockets. It provides high-level APIs in Scala, Java, and Python for building streaming applications using functional transformations like map, reduce, join, and window operations on DStreams (discretized streams).

Package Information

  • Package Name: org.apache.spark:spark-streaming_2.11
  • Package Type: Maven
  • Language: Scala (with Java API)
  • Installation: Add the dependency to your Maven pom.xml or SBT build.sbt

Maven:

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.11</artifactId>
  <version>2.4.8</version>
</dependency>

SBT:

"org.apache.spark" %% "spark-streaming" % "2.4.8"

Core Imports

Scala:

import org.apache.spark.streaming.{StreamingContext, Duration, Seconds, Minutes}
import org.apache.spark.streaming.dstream.{DStream, PairDStreamFunctions}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

Java:

import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.Durations;
import org.apache.spark.SparkConf;
import org.apache.spark.storage.StorageLevel;

Basic Usage

Scala Example:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{StreamingContext, Seconds}

// Create streaming context; local mode needs at least two threads
// (one for the receiver, one for processing)
val conf = new SparkConf().setMaster("local[2]").setAppName("StreamingExample")
val ssc = new StreamingContext(conf, Seconds(1))

// Create input stream from TCP socket
val lines = ssc.socketTextStream("localhost", 9999)

// Transform and process data
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)

// Output results
wordCounts.print()

// Start streaming
ssc.start()
ssc.awaitTermination()

Java Example:

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.Durations;
import java.util.Arrays;
import scala.Tuple2;

// Create streaming context; local mode needs at least two threads
// (one for the receiver, one for processing)
SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("StreamingExample");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));

// Create input stream from TCP socket
JavaDStream<String> lines = jssc.socketTextStream("localhost", 9999);

// Transform and process data
JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(x.split(" ")).iterator());
JavaPairDStream<String, Integer> wordCounts = words.mapToPair(s -> new Tuple2<>(s, 1))
                                                   .reduceByKey((i1, i2) -> i1 + i2);

// Output results
wordCounts.print();

// Start streaming
jssc.start();
jssc.awaitTermination();

Architecture

Apache Spark Streaming is built around several key components:

  • StreamingContext: Main entry point that coordinates the streaming application and manages the execution
  • DStream (Discretized Stream): Core abstraction representing a continuous data stream as a sequence of time-sliced micro-batch RDDs
  • Input Sources: Various data sources like sockets, files, Kafka, Flume for ingesting streaming data
  • Transformations: Functional operations like map, filter, reduce that transform DStreams
  • Output Operations: Actions that send processed data to external systems or storage
  • Checkpointing: Fault tolerance mechanism that periodically saves state to persistent storage
  • Receivers: Components that collect data from streaming sources and feed it into Spark

Capabilities

Streaming Context Management

Entry point for creating and managing Spark Streaming applications. Handles initialization, configuration, and lifecycle management.

class StreamingContext(sparkContext: SparkContext, batchDuration: Duration)
class StreamingContext(conf: SparkConf, batchDuration: Duration)

// Lifecycle methods
def start(): Unit
def stop(stopSparkContext: Boolean = true): Unit
def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit
def awaitTermination(): Unit
def awaitTerminationOrTimeout(timeout: Long): Boolean

// Configuration
def checkpoint(directory: String): Unit
def remember(duration: Duration): Unit
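
A minimal lifecycle sketch, assuming a local master and a placeholder checkpoint directory; StreamingContext.getOrCreate rebuilds the context from checkpoint data after a driver restart:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// The whole pipeline is defined inside the factory so that
// getOrCreate can recreate it from the checkpoint after a failure.
def createContext(): StreamingContext = {
  val conf = new SparkConf().setMaster("local[2]").setAppName("ContextLifecycle")
  val ssc = new StreamingContext(conf, Seconds(5))
  ssc.checkpoint("/tmp/streaming-checkpoint") // placeholder path
  ssc.socketTextStream("localhost", 9999).print()
  ssc
}

val ssc = StreamingContext.getOrCreate("/tmp/streaming-checkpoint", createContext _)
ssc.start()
// Run for up to a minute, then stop gracefully, draining in-flight
// batches and shutting down the underlying SparkContext as well.
if (!ssc.awaitTerminationOrTimeout(60000)) {
  ssc.stop(stopSparkContext = true, stopGracefully = true)
}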

Streaming Context

Data Stream Operations

Core functionality for creating, transforming, and processing continuous data streams with functional programming paradigms.

abstract class DStream[T] {
  // Transformations
  def map[U](mapFunc: T => U): DStream[U]
  def flatMap[U](flatMapFunc: T => TraversableOnce[U]): DStream[U]
  def filter(filterFunc: T => Boolean): DStream[T]
  def transform[U](transformFunc: RDD[T] => RDD[U]): DStream[U]
  def union(that: DStream[T]): DStream[T]
  
  // Window operations
  def window(windowDuration: Duration, slideDuration: Duration): DStream[T]
  def reduceByWindow(reduceFunc: (T, T) => T, windowDuration: Duration, slideDuration: Duration): DStream[T]
  
  // Output operations
  def foreachRDD(foreachFunc: RDD[T] => Unit): Unit
  def print(num: Int = 10): Unit
}
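
As a sketch of how these compose (reusing the StreamingContext ssc from Basic Usage): transform drops down to the RDD API once per micro-batch, and foreachRDD is the generic output operation.

val lines = ssc.socketTextStream("localhost", 9999)

// transform exposes each micro-batch as an RDD, so any RDD method
// (here sortBy) can be applied batch by batch.
val sortedPairs = lines
  .flatMap(_.split(" "))
  .map(word => (word, 1))
  .transform(rdd => rdd.sortBy(_._1))

// foreachRDD runs on the driver once per batch; RDD actions inside
// it trigger the actual distributed computation.
sortedPairs.foreachRDD { rdd =>
  rdd.take(5).foreach(println)
}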

Data Streams

Input Sources

Various mechanisms for ingesting data from external sources including sockets, files, message queues, and custom receivers.

// Socket streams
def socketTextStream(hostname: String, port: Int, storageLevel: StorageLevel = MEMORY_AND_DISK_SER_2): ReceiverInputDStream[String]
def rawSocketStream[T](hostname: String, port: Int, storageLevel: StorageLevel = MEMORY_AND_DISK_SER_2): ReceiverInputDStream[T]

// File streams  
def textFileStream(directory: String): DStream[String]
def fileStream[K, V, F <: NewInputFormat[K, V]](directory: String): InputDStream[(K, V)]

// Queue and custom streams
def queueStream[T](queue: Queue[RDD[T]], oneAtATime: Boolean = true): InputDStream[T]
def receiverStream[T](receiver: Receiver[T]): ReceiverInputDStream[T]
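
queueStream is particularly handy in tests because each queued RDD becomes one micro-batch; a small sketch, assuming an existing StreamingContext ssc as in Basic Usage:

import scala.collection.mutable
import org.apache.spark.rdd.RDD

// With oneAtATime = true, each queued RDD is consumed as exactly
// one micro-batch, which makes test input deterministic.
val rddQueue = new mutable.Queue[RDD[Int]]()
ssc.queueStream(rddQueue, oneAtATime = true).map(_ * 2).print()

ssc.start()
// Feed three batches after the context has started.
for (_ <- 1 to 3) {
  rddQueue.synchronized { rddQueue += ssc.sparkContext.makeRDD(1 to 10) }
  Thread.sleep(1000)
}
ssc.stop()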

Input Sources

Key-Value Operations

Specialized operations for key-value pair streams including aggregations, joins, and state management.

class PairDStreamFunctions[K, V] {
  // Aggregations
  def groupByKey(): DStream[(K, Iterable[V])]
  def reduceByKey(reduceFunc: (V, V) => V): DStream[(K, V)]
  def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiner: (C, C) => C, partitioner: Partitioner): DStream[(K, C)]
  
  // Window aggregations
  def reduceByKeyAndWindow(reduceFunc: (V, V) => V, windowDuration: Duration, slideDuration: Duration): DStream[(K, V)]
  
  // State management
  def updateStateByKey[S](updateFunc: (Seq[V], Option[S]) => Option[S]): DStream[(K, S)]
  def mapWithState[StateType, MappedType](spec: StateSpec[K, V, StateType, MappedType]): MapWithStateDStream[K, V, StateType, MappedType]
  
  // Joins
  def join[W](other: DStream[(K, W)]): DStream[(K, (V, W))]
  def leftOuterJoin[W](other: DStream[(K, W)]): DStream[(K, (V, Option[W]))]
}
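
For example, a running word count with updateStateByKey; a sketch reusing the StreamingContext ssc from Basic Usage (stateful operators require a checkpoint directory, and the path here is a placeholder):

ssc.checkpoint("/tmp/streaming-checkpoint") // required for stateful ops

val pairs = ssc.socketTextStream("localhost", 9999)
  .flatMap(_.split(" "))
  .map(word => (word, 1))

// Merge this batch's counts into the running total per key.
val updateCount = (newValues: Seq[Int], state: Option[Int]) =>
  Some(newValues.sum + state.getOrElse(0))

val runningCounts = pairs.updateStateByKey[Int](updateCount)
runningCounts.print()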

Key-Value Operations

Window Operations

Time-based windowing operations for aggregating data across multiple time intervals.

// Basic windowing
def window(windowDuration: Duration, slideDuration: Duration): DStream[T]
def countByWindow(windowDuration: Duration, slideDuration: Duration): DStream[Long]

// Windowed reductions
def reduceByWindow(reduceFunc: (T, T) => T, windowDuration: Duration, slideDuration: Duration): DStream[T] 
def reduceByWindow(reduceFunc: (T, T) => T, invReduceFunc: (T, T) => T, windowDuration: Duration, slideDuration: Duration): DStream[T]
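
For instance, counting lines over the last 30 seconds, recomputed every 10 seconds; a sketch reusing ssc and the lines stream from above (countByWindow uses an inverse reduce internally, so checkpointing must be enabled):

ssc.checkpoint("/tmp/streaming-checkpoint") // placeholder path

// One count per slide interval, covering the preceding 30 seconds.
val lineCounts = lines.countByWindow(Seconds(30), Seconds(10))
lineCounts.print()

// Equivalent view via window(): each emitted batch contains all
// elements of the full 30-second window.
lines.window(Seconds(30), Seconds(10)).count().print()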

Window Operations

State Management

Stateful processing capabilities for maintaining state across streaming batches with fault tolerance.

// StateSpec for mapWithState
object StateSpec {
  def function[KeyType, ValueType, StateType, MappedType](
    mappingFunction: (KeyType, Option[ValueType], State[StateType]) => MappedType
  ): StateSpec[KeyType, ValueType, StateType, MappedType]
}

abstract class State[S] {
  def exists(): Boolean
  def get(): S
  def update(newState: S): Unit
  def remove(): Unit
  def isTimingOut(): Boolean
}
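
A sketch of mapWithState, reusing the pairs stream from the key-value example; mapWithState also requires checkpointing, and the timeout is illustrative:

import org.apache.spark.streaming.{Minutes, State, StateSpec}

// Mapping function: fold this batch's value into the stored state
// and emit the updated (word, count) pair for downstream operators.
val mappingFunc = (word: String, one: Option[Int], state: State[Int]) => {
  val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
  state.update(sum)
  (word, sum)
}

val spec = StateSpec.function(mappingFunc)
  .timeout(Minutes(10)) // drop state for keys idle longer than 10 minutes

val stateful = pairs.mapWithState(spec)
stateful.print()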

State Management

Java API

Java-friendly wrappers providing the same functionality with Java-compatible interfaces and type system.

public class JavaStreamingContext {
  public JavaStreamingContext(SparkConf conf, Duration batchDuration);
  public JavaReceiverInputDStream<String> socketTextStream(String hostname, int port);
  public void start();
  public void awaitTermination();
}

public class JavaDStream<T> {
  public <R> JavaDStream<R> map(Function<T, R> f);
  public JavaDStream<T> filter(Function<T, Boolean> f);
  public <R> JavaDStream<R> flatMap(FlatMapFunction<T, R> f);
  public void foreachRDD(VoidFunction<JavaRDD<T>> foreachFunc);
}

Java API

Event Monitoring

Listener system for monitoring streaming application performance, batch processing, and receiver status.

trait StreamingListener {
  def onStreamingStarted(streamingStarted: StreamingListenerStreamingStarted): Unit
  def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted): Unit  
  def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit
  def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted): Unit
  def onReceiverError(receiverError: StreamingListenerReceiverError): Unit
}
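
A sketch of a listener that logs batch statistics; since the trait provides no-op defaults, only the callback of interest is overridden:

import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

class BatchLogger extends StreamingListener {
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    val info = batchCompleted.batchInfo
    println(s"Batch ${info.batchTime}: ${info.numRecords} records, " +
      s"processed in ${info.processingDelay.getOrElse(-1L)} ms")
  }
}

// Register on the StreamingContext; events are delivered asynchronously.
ssc.addStreamingListener(new BatchLogger)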

Event Monitoring

Types

// Time-related types
case class Duration(milliseconds: Long) {
  def +(that: Duration): Duration
  def -(that: Duration): Duration  
  def *(times: Int): Duration
  def /(that: Duration): Double
}

case class Time(milliseconds: Long) {
  def +(that: Duration): Time
  def -(that: Duration): Time
  def -(that: Time): Duration
}

case class Interval(beginTime: Time, endTime: Time) {
  def duration(): Duration
}

// Helper objects
object Seconds {
  def apply(seconds: Long): Duration
}

object Minutes {
  def apply(minutes: Long): Duration  
}

object Milliseconds {
  def apply(milliseconds: Long): Duration
}
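
These types compose with ordinary arithmetic, which keeps batch and window math explicit; a small sketch:

import org.apache.spark.streaming.{Minutes, Seconds}

val batchInterval = Seconds(10)
val windowLength  = batchInterval * 6        // 60 seconds
val slideInterval = Minutes(1) - Seconds(50) // 10 seconds

// Window and slide durations must be multiples of the batch interval.
assert(windowLength.isMultipleOf(batchInterval))
assert(slideInterval.isMultipleOf(batchInterval))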