Describes: pkg:maven/org.apache.spark/spark-streaming_2.11@1.6.x

tessl/maven-org-apache-spark--spark-streaming_2-11

tessl install tessl/maven-org-apache-spark--spark-streaming_2-11@1.6.0

Apache Spark Streaming library for processing live data streams with fault-tolerance and high throughput.

Apache Spark Streaming

Apache Spark Streaming is a scalable, fault-tolerant stream-processing library built on Spark Core for processing live data streams. It provides high-level APIs in Scala, Java, and Python for building applications that continuously ingest data from a variety of sources and process it with operators such as map, reduce, join, and window. The library uses a micro-batch architecture with configurable batch intervals and offers automatic fault recovery through checkpointing and write-ahead logs, exactly-once processing semantics for supported sources, and seamless integration with other Spark components.

Package Information

  • Package Name: spark-streaming_2.11
  • Package Type: maven
  • Language: Scala (with Java and Python APIs)
  • Coordinates: org.apache.spark:spark-streaming_2.11:1.6.3
  • Installation: Add to your Maven dependencies or use the jars bundled with an Apache Spark distribution

Core Imports

Scala:

import org.apache.spark.streaming.{StreamingContext, Seconds, Duration}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.{SparkConf, SparkContext}

Java:

import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.Duration;
import org.apache.spark.SparkConf;

Python:

from pyspark.streaming import StreamingContext
from pyspark import SparkContext, SparkConf

Basic Usage

Scala:

import org.apache.spark.streaming.{StreamingContext, Seconds}
import org.apache.spark.SparkConf

// Create streaming context
val conf = new SparkConf().setAppName("StreamingApp").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(1))

// Create input stream from socket
val lines = ssc.socketTextStream("localhost", 9999)

// Transform and process
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)

// Output results
wordCounts.print()

// Start processing
ssc.start()
ssc.awaitTermination()

Java:

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;

import java.util.Arrays;

// Create streaming context
SparkConf conf = new SparkConf().setAppName("StreamingApp").setMaster("local[2]");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));

// Create input stream
JavaDStream<String> lines = jssc.socketTextStream("localhost", 9999);

// Transform and process
JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(x.split(" "))); // Spark 1.6's FlatMapFunction returns an Iterable
JavaPairDStream<String, Integer> pairs = words.mapToPair(s -> new Tuple2<>(s, 1));
JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey((i1, i2) -> i1 + i2);

// Output results
wordCounts.print();

// Start processing
jssc.start();
jssc.awaitTermination();

Architecture

Apache Spark Streaming is built around several key components:

  • Streaming Context: Entry point that manages the streaming application lifecycle and creates input streams
  • DStreams: Discretized streams that represent continuous streams as sequences of RDDs
  • Input Sources: Various connectors for ingesting data from external systems (sockets, files, Kafka, etc.)
  • Transformations: Operations like map, filter, reduce that transform one DStream into another
  • Window Operations: Time-based operations that work over sliding windows of data
  • Output Operations: Actions like print, save, and custom output functions that write results
  • Checkpointing: Fault tolerance mechanism that saves application state for recovery (see the recovery sketch after this list)
  • Receivers: Components responsible for continuously receiving data from external sources
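
The checkpointing component above is what enables driver recovery. A minimal sketch, assuming an illustrative local checkpoint path (use a reliable store such as HDFS in production) and a hypothetical createContext helper: on restart, the driver rebuilds the StreamingContext from checkpoint data if it exists, otherwise constructs a fresh one.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val checkpointDir = "/tmp/streaming-checkpoint"   // illustrative path

def createContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("RecoverableApp").setMaster("local[2]")
  val ssc = new StreamingContext(conf, Seconds(1))
  ssc.checkpoint(checkpointDir)                   // enable checkpointing
  ssc.socketTextStream("localhost", 9999).print()
  ssc
}

// Rebuild from checkpoint data after a failure, or create fresh on first run.
val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
ssc.start()
ssc.awaitTermination()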

Capabilities

Streaming Context

Core entry point for creating and managing streaming applications. Provides lifecycle management, checkpointing, and input stream creation.

class StreamingContext(sparkContext: SparkContext, batchDuration: Duration)
class StreamingContext(conf: SparkConf, batchDuration: Duration) 
class StreamingContext(master: String, appName: String, batchDuration: Duration, ...)

def start(): Unit
def stop(stopSparkContext: Boolean = true, stopGracefully: Boolean = false): Unit
def awaitTermination(): Unit
def awaitTermination(timeout: Long): Unit
def remember(duration: Duration): Unit
def checkpoint(directory: String): Unit
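
A minimal lifecycle sketch, assuming a socket source on localhost:9999: start the context, block for a bounded time, then stop gracefully so in-flight batches finish.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(
  new SparkConf().setAppName("LifecycleDemo").setMaster("local[2]"), Seconds(1))
ssc.socketTextStream("localhost", 9999).print()

ssc.start()
ssc.awaitTermination(30000)                              // wait at most 30 seconds
ssc.stop(stopSparkContext = true, stopGracefully = true) // finish in-flight batches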

DStream Operations

Core abstraction for discretized streams with comprehensive transformation and action operations for stream processing.

abstract class DStream[T](ssc: StreamingContext)

// Core transformations
def map[U](f: T => U): DStream[U]
def flatMap[U](f: T => TraversableOnce[U]): DStream[U]
def filter(f: T => Boolean): DStream[T]
def union(that: DStream[T]): DStream[T]
def repartition(numPartitions: Int): DStream[T]

// Reduction operations
def reduce(f: (T, T) => T): DStream[T]
def count(): DStream[Long]
def countByValue(): DStream[(T, Long)]

// Output operations
def print(): Unit
def print(num: Int): Unit
def saveAsTextFiles(prefix: String, suffix: String): Unit
def foreachRDD(f: RDD[T] => Unit): Unit
def foreachRDD(f: (RDD[T], Time) => Unit): Unit
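
A short sketch combining a transformation with foreachRDD, the escape hatch to the full RDD API; it reuses the StreamingContext ssc from Basic Usage.

val lines = ssc.socketTextStream("localhost", 9999)
val longLines = lines.filter(_.length > 80)

// foreachRDD hands each micro-batch to arbitrary RDD code, together with its batch time.
longLines.foreachRDD { (rdd, time) =>
  println(s"Batch at $time contained ${rdd.count()} long lines")
}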

Input Sources

Various connectors for ingesting streaming data from external systems including sockets, files, queues, and custom receivers.

// Socket streams
def socketTextStream(hostname: String, port: Int, storageLevel: StorageLevel = MEMORY_AND_DISK_SER_2): ReceiverInputDStream[String]
def socketStream[T](hostname: String, port: Int, converter: (InputStream) => Iterator[T], storageLevel: StorageLevel): ReceiverInputDStream[T]

// File streams  
def textFileStream(directory: String): DStream[String]
def fileStream[K, V, F <: NewInputFormat[K, V]](directory: String): InputDStream[(K, V)]

// Queue and receiver streams
def queueStream[T](queue: Queue[RDD[T]], oneAtATime: Boolean = true): InputDStream[T]
def receiverStream[T](receiver: Receiver[T]): ReceiverInputDStream[T]
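
queueStream is convenient for testing without an external system; a sketch, assuming a StreamingContext ssc as in Basic Usage:

import scala.collection.mutable.Queue
import org.apache.spark.rdd.RDD

val rddQueue = new Queue[RDD[Int]]()
val inputStream = ssc.queueStream(rddQueue)  // consumes one queued RDD per batch by default
inputStream.reduce(_ + _).print()

ssc.start()
// Feed synthetic micro-batches into the queue while the stream runs.
for (_ <- 1 to 5) {
  rddQueue.synchronized { rddQueue += ssc.sparkContext.makeRDD(1 to 100, 2) }
  Thread.sleep(1000)
}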

Paired DStream Operations

Advanced operations for DStreams of key-value pairs including joins, grouping, state management, and window-based aggregations.

// Key-based operations
def groupByKey(): DStream[(K, Iterable[V])]
def reduceByKey(func: (V, V) => V): DStream[(K, V)]
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiner: (C, C) => C, partitioner: Partitioner): DStream[(K, C)]

// Join operations
def join[W](other: DStream[(K, W)]): DStream[(K, (V, W))]
def leftOuterJoin[W](other: DStream[(K, W)]): DStream[(K, (V, Option[W]))]
def rightOuterJoin[W](other: DStream[(K, W)]): DStream[(K, (Option[V], W))]
def fullOuterJoin[W](other: DStream[(K, W)]): DStream[(K, (Option[V], Option[W]))]

// State operations
def updateStateByKey[S](updateFunc: (Seq[V], Option[S]) => Option[S]): DStream[(K, S)]
def mapWithState[StateType, MappedType](spec: StateSpec[K, V, StateType, MappedType]): MapWithStateDStream[K, V, StateType, MappedType]
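
A running word count with updateStateByKey, as a minimal sketch assuming a StreamingContext ssc as in Basic Usage; stateful operations require a checkpoint directory, and the path here is illustrative.

val words = ssc.socketTextStream("localhost", 9999).flatMap(_.split(" "))
val pairs = words.map((_, 1))

// Merge this batch's counts into the running total for each key.
val updateFunc = (newValues: Seq[Int], runningCount: Option[Int]) =>
  Some(runningCount.getOrElse(0) + newValues.sum)

ssc.checkpoint("/tmp/state-checkpoint")   // required for stateful transformations
val totals = pairs.updateStateByKey[Int](updateFunc)
totals.print()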

Window Operations

Time-based operations that aggregate data over sliding windows, enabling temporal analytics and batch processing over streaming data.

def window(windowDuration: Duration, slideDuration: Duration): DStream[T]
def reduceByWindow(reduceFunc: (T, T) => T, windowDuration: Duration, slideDuration: Duration): DStream[T]
def reduceByWindow(reduceFunc: (T, T) => T, invReduceFunc: (T, T) => T, windowDuration: Duration, slideDuration: Duration): DStream[T]
def countByWindow(windowDuration: Duration, slideDuration: Duration): DStream[Long]

// Window operations for paired DStreams
def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration): DStream[(K, Iterable[V])]
def reduceByKeyAndWindow(func: (V, V) => V, windowDuration: Duration, slideDuration: Duration): DStream[(K, V)]
def reduceByKeyAndWindow(func: (V, V) => V, invFunc: (V, V) => V, windowDuration: Duration, slideDuration: Duration): DStream[(K, V)]
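
A sliding word count over the last 30 seconds, recomputed every 10 seconds; a sketch assuming the pairs DStream from the previous example. The inverse-function variant maintains the window incrementally and therefore needs checkpointing.

// Full recomputation over each 30-second window, sliding every 10 seconds.
val counts = pairs.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(30), Seconds(10))

// Incremental variant: add counts entering the window, subtract counts leaving it.
ssc.checkpoint("/tmp/window-checkpoint")
val incremental = pairs.reduceByKeyAndWindow(
  (a: Int, b: Int) => a + b, (a: Int, b: Int) => a - b, Seconds(30), Seconds(10))
counts.print()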

Java API

Complete Java-friendly API with type-safe wrappers for all Scala functionality, providing seamless integration for Java developers.

JavaStreamingContext(SparkConf conf, Duration batchDuration)
JavaStreamingContext(JavaSparkContext sparkContext, Duration batchDuration)

// Input methods
JavaReceiverInputDStream<String> socketTextStream(String hostname, int port)
JavaDStream<String> textFileStream(String directory)
<T> JavaReceiverInputDStream<T> receiverStream(Receiver<T> receiver)

// DStream classes
class JavaDStream<T>
class JavaPairDStream<K, V>
class JavaInputDStream<T>
class JavaReceiverInputDStream<T>

Receiver Framework

Extensible framework for building custom data ingestion components with lifecycle management, error handling, and fault tolerance.

abstract class Receiver[T](storageLevel: StorageLevel) extends Serializable

def onStart(): Unit
def onStop(): Unit
def store(data: T): Unit
def store(data: ArrayBuffer[T]): Unit
def restart(message: String): Unit
def restart(message: String, error: Throwable): Unit
def stop(message: String): Unit
def stop(message: String, error: Throwable): Unit
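
A minimal custom receiver sketch: the hypothetical TickReceiver below generates one record per second on a background thread; a real receiver would read from an external system instead.

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class TickReceiver extends Receiver[String](StorageLevel.MEMORY_ONLY) {
  def onStart(): Unit = {
    // Receiving must happen on its own thread; onStart must not block.
    new Thread("tick-receiver") {
      override def run(): Unit = {
        var n = 0
        while (!isStopped()) {
          store(s"tick $n")   // hand one record to Spark
          n += 1
          Thread.sleep(1000)
        }
      }
    }.start()
  }
  def onStop(): Unit = ()     // the loop above exits once isStopped() returns true
}

// Usage: val ticks = ssc.receiverStream(new TickReceiver)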

Utility Classes

Essential utility classes for time handling, state management, and duration operations in streaming applications.

case class Duration(millis: Long)
case class Time(millis: Long)
abstract class State[S]
class StateSpec[K, V, S, T]

// Duration helpers
object Milliseconds { def apply(milliseconds: Long): Duration }
object Seconds { def apply(seconds: Long): Duration }
object Minutes { def apply(minutes: Long): Duration }
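
Durations compose arithmetically, which helps derive window and slide intervals from a base batch interval; a small sketch:

import org.apache.spark.streaming.Seconds

val batchInterval = Seconds(2)
val window = batchInterval * 15              // 30 seconds
val slide = Seconds(10)

// Window and slide durations must be multiples of the batch interval.
assert(window.isMultipleOf(batchInterval))
assert(slide.isMultipleOf(batchInterval))
println(window.prettyPrint)                  // "30 seconds"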

Monitoring and Listeners

Event-driven monitoring system for tracking streaming application health, performance metrics, and operational status.

trait StreamingListener {
  def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted): Unit
  def onReceiverError(receiverError: StreamingListenerReceiverError): Unit  
  def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped): Unit
  def onBatchStarted(batchStarted: StreamingListenerBatchStarted): Unit
  def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit
  def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted): Unit
}
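
A sketch of a hypothetical DelayLogger listener that logs per-batch processing delay, registered on a StreamingContext ssc via addStreamingListener:

import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

class DelayLogger extends StreamingListener {
  override def onBatchCompleted(batch: StreamingListenerBatchCompleted): Unit = {
    val info = batch.batchInfo
    println(s"Batch ${info.batchTime}: ${info.numRecords} records, " +
      s"processing delay ${info.processingDelay.getOrElse(-1L)} ms")
  }
}

ssc.addStreamingListener(new DelayLogger)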

Types

// Core streaming types
class StreamingContext
abstract class DStream[T]
abstract class InputDStream[T] extends DStream[T]
class ReceiverInputDStream[T] extends InputDStream[T]
class QueueInputDStream[T] extends InputDStream[T]
class FileInputDStream[K, V, F <: NewInputFormat[K, V]] extends InputDStream[(K, V)]
class SocketInputDStream[T] extends ReceiverInputDStream[T]
class RawInputDStream[T] extends ReceiverInputDStream[T]
class PluggableInputDStream[T] extends InputDStream[T]

// Specialized DStream types
class MappedDStream[T, U] extends DStream[U]
class FlatMappedDStream[T, U] extends DStream[U]
class FilteredDStream[T] extends DStream[T]
class GlommedDStream[T] extends DStream[Array[T]]
class MapPartitionedDStream[T, U] extends DStream[U]
class UnionDStream[T] extends DStream[T]
class TransformedDStream[T, U] extends DStream[U]
class WindowedDStream[T] extends DStream[T]
class ShuffledDStream[K, V, C] extends DStream[(K, C)]
class StateDStream[K, V, S] extends DStream[(K, S)]
class MapWithStateDStream[K, V, S, E] extends DStream[E]
class ForEachDStream[T] extends DStream[T]

// Time and duration
case class Duration(private val millis: Long) {
  def +(that: Duration): Duration
  def -(that: Duration): Duration
  def *(times: Int): Duration
  def /(that: Duration): Double
  def <(that: Duration): Boolean
  def >(that: Duration): Boolean
  def <=(that: Duration): Boolean
  def >=(that: Duration): Boolean
  def milliseconds: Long
  def prettyPrint: String
  def isMultipleOf(that: Duration): Boolean
  def min(that: Duration): Duration
  def max(that: Duration): Duration
}

case class Time(private val millis: Long) {
  def +(duration: Duration): Time
  def -(duration: Duration): Time
  def -(that: Time): Duration
  def <(that: Time): Boolean
  def >(that: Time): Boolean
  def <=(that: Time): Boolean
  def >=(that: Time): Boolean
  def milliseconds: Long
  def floor(duration: Duration): Time
  def until(endTime: Time, duration: Duration): Seq[Time]
}

case class Interval(beginTime: Time, endTime: Time) {
  def duration: Duration
  def +(time: Duration): Interval
  def move(duration: Duration): Interval
  def size: Duration
  def intersect(other: Interval): Interval
  def union(other: Interval): Interval
  def covers(time: Time): Boolean
  def covers(other: Interval): Boolean
}

// Duration factory objects
object Milliseconds {
  def apply(milliseconds: Long): Duration
}
object Seconds {
  def apply(seconds: Long): Duration
}
object Minutes {
  def apply(minutes: Long): Duration
}

// State management
abstract class State[S] {
  def exists(): Boolean
  def get(): S
  def update(newState: S): Unit
  def remove(): Unit
  def isTimingOut(): Boolean
}

class StateSpec[K, V, S, T] {
  def function(mappingFunction: (K, Option[V], State[S]) => Option[T]): StateSpec[K, V, S, T]
  def initialState(rdd: RDD[(K, S)]): StateSpec[K, V, S, T]
  def numPartitions(numPartitions: Int): StateSpec[K, V, S, T]
  def timeout(duration: Duration): StateSpec[K, V, S, T]
  def partitioner(partitioner: Partitioner): StateSpec[K, V, S, T]
}
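
// A running-count sketch with mapWithState and StateSpec, assuming a paired
// DStream `pairs` of (word, 1) and checkpointing enabled on the context.
val mappingFunc = (word: String, one: Option[Int], state: State[Int]) => {
  val previous = if (state.exists()) state.get() else 0
  val total = previous + one.getOrElse(0)
  state.update(total)   // persist the new running total for this key
  (word, total)         // emit (key, newTotal) for this record
}
val runningCounts = pairs.mapWithState(StateSpec.function(mappingFunc))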

// Receiver framework
abstract class Receiver[T](storageLevel: StorageLevel) extends Serializable {
  def onStart(): Unit
  def onStop(): Unit
  def store(data: T): Unit
  def store(data: ArrayBuffer[T]): Unit
  def store(data: Iterator[T]): Unit
  def restart(message: String): Unit
  def restart(message: String, error: Throwable): Unit
  def stop(message: String): Unit
  def stop(message: String, error: Throwable): Unit
  def isStopped(): Boolean
  def isStarted(): Boolean
}

trait ActorReceiver {
  def store(data: Any): Unit
}

case class ReceiverInfo(
  streamId: Int,
  name: String,
  active: Boolean,
  location: String,
  executorId: String,
  lastErrorMessage: String = "",
  lastError: String = "",
  lastErrorTime: Long = -1L
)

// Storage and checkpointing
class Checkpoint
class StorageLevel  // defined in org.apache.spark.storage
object StorageLevel {
  val NONE: StorageLevel
  val DISK_ONLY: StorageLevel
  val DISK_ONLY_2: StorageLevel
  val MEMORY_ONLY: StorageLevel
  val MEMORY_ONLY_2: StorageLevel
  val MEMORY_ONLY_SER: StorageLevel
  val MEMORY_ONLY_SER_2: StorageLevel
  val MEMORY_AND_DISK: StorageLevel
  val MEMORY_AND_DISK_2: StorageLevel
  val MEMORY_AND_DISK_SER: StorageLevel
  val MEMORY_AND_DISK_SER_2: StorageLevel
  val OFF_HEAP: StorageLevel
}

// Batch and job information
case class BatchInfo(
  batchTime: Time,
  streamIdToInputInfo: Map[Int, StreamInputInfo],
  submissionTime: Long,
  processingStartTime: Option[Long] = None,
  processingEndTime: Option[Long] = None
) {
  def processingDelay: Option[Long]
  def schedulingDelay: Option[Long]
  def totalDelay: Option[Long]
}

case class StreamInputInfo(
  inputStreamId: Int,
  numRecords: Long,
  metadata: Map[String, Any] = Map.empty
)

case class OutputOperationInfo(
  batchTime: Time,
  id: Int,
  name: String,
  description: String,
  startTime: Option[Long],
  endTime: Option[Long],
  failureReason: Option[String]
) {
  def duration: Option[Long] = for (s <- startTime; e <- endTime) yield e - s
}

// Listener events
trait StreamingListener {
  def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted): Unit = {}
  def onReceiverError(receiverError: StreamingListenerReceiverError): Unit = {}
  def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped): Unit = {}
  def onBatchStarted(batchStarted: StreamingListenerBatchStarted): Unit = {}
  def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {}
  def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted): Unit = {}
  def onOutputOperationStarted(outputOperationStarted: StreamingListenerOutputOperationStarted): Unit = {}
  def onOutputOperationCompleted(outputOperationCompleted: StreamingListenerOutputOperationCompleted): Unit = {}
}

sealed trait StreamingListenerEvent
case class StreamingListenerReceiverStarted(receiverInfo: ReceiverInfo) extends StreamingListenerEvent
case class StreamingListenerReceiverError(receiverInfo: ReceiverInfo) extends StreamingListenerEvent
case class StreamingListenerReceiverStopped(receiverInfo: ReceiverInfo) extends StreamingListenerEvent
case class StreamingListenerBatchStarted(batchInfo: BatchInfo) extends StreamingListenerEvent
case class StreamingListenerBatchCompleted(batchInfo: BatchInfo) extends StreamingListenerEvent
case class StreamingListenerBatchSubmitted(batchInfo: BatchInfo) extends StreamingListenerEvent
case class StreamingListenerOutputOperationStarted(outputOperationInfo: OutputOperationInfo) extends StreamingListenerEvent
case class StreamingListenerOutputOperationCompleted(outputOperationInfo: OutputOperationInfo) extends StreamingListenerEvent

// Context state
sealed abstract class StreamingContextState
object StreamingContextState {
  case object INITIALIZED extends StreamingContextState
  case object ACTIVE extends StreamingContextState
  case object STOPPED extends StreamingContextState
}

// Java API types
import org.apache.spark.streaming.api.java._
class JavaStreamingContext
class JavaDStream[T]
class JavaPairDStream[K, V]
class JavaInputDStream[T]
class JavaPairInputDStream[K, V]
class JavaReceiverInputDStream[T]
class JavaPairReceiverInputDStream[K, V]
class JavaMapWithStateDStream[K, V, S, E]
abstract class JavaStreamingListener
class JavaStreamingListenerWrapper