tessl/maven-org-apache-spark--spark-streaming_2-11

tessl install tessl/maven-org-apache-spark--spark-streaming_2-11@1.6.0

Apache Spark Streaming library for processing live data streams with fault tolerance and high throughput.


Streaming Context

The StreamingContext is the main entry point for all Spark Streaming functionality. It provides methods to create DStreams from various input sources, manages the streaming application lifecycle, and coordinates the execution of streaming computations.

Core Imports

import org.apache.spark.streaming.{StreamingContext, Duration, Seconds, Minutes}
import org.apache.spark.{SparkConf, SparkContext}

StreamingContext Creation

Constructor with SparkContext

class StreamingContext(sparkContext: SparkContext, batchDuration: Duration)

Create a StreamingContext using an existing SparkContext.

Parameters:

  • sparkContext - Existing SparkContext to use
  • batchDuration - Time interval at which streaming data will be divided into batches

Example:

val conf = new SparkConf().setAppName("StreamingApp")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(1))

Constructor with SparkConf

class StreamingContext(conf: SparkConf, batchDuration: Duration)

Create a StreamingContext by providing configuration for a new SparkContext.

Parameters:

  • conf - SparkConf object specifying Spark parameters
  • batchDuration - Time interval for batch processing

Example:

val conf = new SparkConf().setAppName("StreamingApp").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(1))

Constructor with Master and App Name

class StreamingContext(
  master: String,
  appName: String, 
  batchDuration: Duration,
  sparkHome: String = null,
  jars: Seq[String] = Nil,
  environment: Map[String, String] = Map()
)

Create a StreamingContext by providing connection details for a new SparkContext.

Parameters:

  • master - Cluster URL to connect to (e.g. "local[2]", "spark://host:port")
  • appName - Name for the application, displayed in cluster web UI
  • batchDuration - Time interval for batch processing
  • sparkHome - SPARK_HOME directory on slave nodes (optional)
  • jars - JAR files to distribute to cluster (optional)
  • environment - Environment variables for worker nodes (optional)

Example:

val ssc = new StreamingContext("local[2]", "MyStreamingApp", Seconds(1))

Lifecycle Management

Starting and Stopping

def start(): Unit
def stop(stopSparkContext: Boolean = true, stopGracefully: Boolean = false): Unit
def awaitTermination(): Unit
def awaitTermination(timeout: Long): Unit

start() - Start the execution of streams. Must be called after all transformations and output operations are defined.

stop() - Stop the execution of streams.

  • stopSparkContext - Whether to stop the underlying SparkContext (default: true)
  • stopGracefully - Whether to stop gracefully by waiting for processing of all received data (default: false)

awaitTermination() - Wait for the execution to stop. Blocks the current thread.

awaitTermination(timeout) - Wait for execution to stop with timeout in milliseconds.

Example:

ssc.start()
ssc.awaitTermination() // Wait until killed manually
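
To shut down without losing received data, call stop with the graceful flag, typically from a shutdown hook or a separate control thread. A minimal sketch of that call:

// Stop the streaming computation and the underlying SparkContext,
// waiting for all received data to be processed first
ssc.stop(stopSparkContext = true, stopGracefully = true)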

State and Context Information

def sparkContext: SparkContext
def remember(duration: Duration): Unit
def checkpoint(directory: String): Unit

sparkContext - Get the associated SparkContext.

remember(duration) - Set the duration for which DStreams will remember each RDD created. This must be set before any input streams are created.

checkpoint(directory) - Set the checkpoint directory for fault tolerance. Required for stateful operations like updateStateByKey.

Example:

ssc.checkpoint("hdfs://checkpoints/my-app")
ssc.remember(Minutes(2))

Input Stream Creation

Socket Streams

def socketTextStream(
  hostname: String, 
  port: Int, 
  storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): ReceiverInputDStream[String]

def socketStream[T: ClassTag](
  hostname: String,
  port: Int, 
  converter: (InputStream) => Iterator[T],
  storageLevel: StorageLevel
): ReceiverInputDStream[T]

def rawSocketStream[T: ClassTag](
  hostname: String,
  port: Int,
  storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): ReceiverInputDStream[T]

socketTextStream - Create input stream from TCP source that reads text data.

socketStream - Create input stream from TCP source with custom converter function.

rawSocketStream - Create input stream from TCP source for binary data.

Example:

import java.nio.ByteBuffer

val lines = ssc.socketTextStream("localhost", 9999)
val binaryData = ssc.rawSocketStream[ByteBuffer]("localhost", 8888)
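
socketStream has no example above; it needs a converter that turns the raw InputStream into an iterator of records. A minimal sketch, assuming a newline-delimited UTF-8 protocol (lineConverter and port 7777 are illustrative):

import java.io.{BufferedReader, InputStream, InputStreamReader}
import java.nio.charset.StandardCharsets
import org.apache.spark.storage.StorageLevel

// Read UTF-8 lines from the socket until it closes
def lineConverter(in: InputStream): Iterator[String] = {
  val reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))
  Iterator.continually(reader.readLine()).takeWhile(_ != null)
}

val customLines = ssc.socketStream("localhost", 7777, lineConverter, StorageLevel.MEMORY_AND_DISK_SER_2)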

File Streams

def textFileStream(directory: String): DStream[String]

def fileStream[K, V, F <: NewInputFormat[K, V]](
  directory: String,
  kClass: Class[K],
  vClass: Class[V], 
  fClass: Class[F]
): InputDStream[(K, V)]

def binaryRecordsStream(
  directory: String,
  recordLength: Int
): DStream[Array[Byte]]

textFileStream - Monitor directory for new text files and read them as a stream.

fileStream - Monitor directory for new files using Hadoop InputFormat.

binaryRecordsStream - Monitor directory for new binary files with fixed record length.

Example:

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
val textFiles = ssc.textFileStream("hdfs://data/input")
val csvFiles = ssc.fileStream[LongWritable, Text, TextInputFormat]("/data/csv")
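
binaryRecordsStream is not shown above; a minimal sketch, assuming each record in the monitored files is exactly 512 bytes (the path and record length are illustrative):

// Each DStream element is one 512-byte Array[Byte] record
val records = ssc.binaryRecordsStream("hdfs://data/binary", 512)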

Queue and Receiver Streams

def queueStream[T: ClassTag](
  queue: Queue[RDD[T]],
  oneAtATime: Boolean = true,
  defaultRDD: RDD[T] = null
): InputDStream[T]

def receiverStream[T: ClassTag](receiver: Receiver[T]): ReceiverInputDStream[T]

def actorStream[T: ClassTag](
  props: Props,
  name: String,
  storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2,
  supervisorStrategy: SupervisorStrategy = ActorSupervisorStrategy.defaultStrategy
): ReceiverInputDStream[T]

queueStream - Create input stream from a queue of RDDs, useful for testing.

receiverStream - Create input stream using a custom Receiver.

actorStream - Create input stream using an Akka Actor as the receiver.

Example:

import scala.collection.mutable.Queue
import org.apache.spark.rdd.RDD

val rddQueue = new Queue[RDD[Int]]()
rddQueue += ssc.sparkContext.makeRDD(1 to 100)
val queueStream = ssc.queueStream(rddQueue)

val customReceiver = new MyCustomReceiver()
val receiverStream = ssc.receiverStream(customReceiver)
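
MyCustomReceiver above is a placeholder; see receiver-framework.md for the full Receiver API. A minimal sketch of such a receiver, assuming it simply emits a constant record once per second:

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class MyCustomReceiver extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER_2) {
  def onStart(): Unit = {
    // Receive on a background thread so onStart returns immediately
    new Thread("Custom Receiver") {
      override def run(): Unit = {
        while (!isStopped()) {
          store("sample record") // hand the record to Spark for storage
          Thread.sleep(1000)
        }
      }
    }.start()
  }

  def onStop(): Unit = {
    // Nothing to clean up: the isStopped() check ends the receiving thread
  }
}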

Stream Union

def union[T: ClassTag](streams: Seq[DStream[T]]): DStream[T]

Create a unified DStream from multiple input streams of the same type.

Example:

val stream1 = ssc.socketTextStream("host1", 9999)
val stream2 = ssc.socketTextStream("host2", 9999) 
val combinedStream = ssc.union(Seq(stream1, stream2))

Static Context Methods

Checkpoint Recovery

object StreamingContext {
  def getOrCreate(
    checkpointPath: String,
    creatingFunc: () => StreamingContext,
    hadoopConf: Configuration = SparkHadoopUtil.get.conf,
    createOnError: Boolean = false
  ): StreamingContext
  
  def getActive(): Option[StreamingContext]
  
  def getActiveOrCreate(
    checkpointPath: String,
    creatingFunc: () => StreamingContext,
    hadoopConf: Configuration = SparkHadoopUtil.get.conf,
    createOnError: Boolean = false
  ): StreamingContext
}

getOrCreate - Get StreamingContext from checkpoint or create new one if checkpoint doesn't exist.

getActive - Get the currently active StreamingContext, if any.

getActiveOrCreate - Get active StreamingContext or create from checkpoint/function.

Example:

def createContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("RecoverableApp")
  val ssc = new StreamingContext(conf, Seconds(1))
  ssc.checkpoint("hdfs://checkpoints")
  // Define your streaming computation
  ssc
}

val ssc = StreamingContext.getOrCreate("hdfs://checkpoints", createContext _)
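
getActive and getActiveOrCreate are useful when a context may already be running in the JVM; for example:

// Check for an already-running StreamingContext
StreamingContext.getActive() match {
  case Some(active) => println("Reusing active context: " + active)
  case None         => println("No active StreamingContext")
}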

Usage Examples

Basic Application Setup

import org.apache.spark.streaming.{StreamingContext, Seconds}
import org.apache.spark.SparkConf

// Create and configure StreamingContext
val conf = new SparkConf()
  .setAppName("MyStreamingApp")
  .setMaster("local[2]")
  .set("spark.streaming.blockInterval", "200ms")

val ssc = new StreamingContext(conf, Seconds(1))
ssc.checkpoint("checkpoints")

// Create input streams
val lines = ssc.socketTextStream("localhost", 9999)

// Define transformations
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)

// Define output
wordCounts.print()

// Start and wait
ssc.start()
ssc.awaitTermination()

Fault Tolerant Application

import org.apache.spark.streaming.{StreamingContext, Seconds}
import org.apache.spark.SparkConf

def createStreamingContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("FaultTolerantApp")
  val ssc = new StreamingContext(conf, Seconds(1))
  
  // Set checkpoint directory
  ssc.checkpoint("hdfs://checkpoints/fault-tolerant")
  
  // Define your streaming computation
  val lines = ssc.socketTextStream("localhost", 9999)
  val wordCounts = lines
    .flatMap(_.split(" "))
    .map((_, 1))
    .reduceByKey(_ + _)
  
  wordCounts.print()
  ssc
}

// Recover from checkpoint or create new
val ssc = StreamingContext.getOrCreate(
  "hdfs://checkpoints/fault-tolerant",
  createStreamingContext _
)

ssc.start()
ssc.awaitTermination()

Multiple Input Sources

val ssc = new StreamingContext(conf, Seconds(1))

// Multiple input sources
val kafkaStream = createKafkaStream(ssc) // Custom Kafka stream
val socketStream = ssc.socketTextStream("localhost", 9999)
val fileStream = ssc.textFileStream("hdfs://input")

// Union multiple streams
val allStreams = ssc.union(Seq(kafkaStream, socketStream, fileStream))

// Process unified stream
val processed = allStreams
  .filter(_.nonEmpty)
  .map(processMessage)

processed.print()

ssc.start()
ssc.awaitTermination()

Configuration Options

Key Spark Streaming configuration parameters that can be set in SparkConf:

  • spark.streaming.blockInterval - Interval for receiver block generation (default: 200ms)
  • spark.streaming.receiver.maxRate - Maximum records per second per receiver (default: unlimited)
  • spark.streaming.backpressure.enabled - Enable backpressure mechanism (default: false)
  • spark.streaming.kafka.maxRatePerPartition - Max records per second per Kafka partition
  • spark.streaming.stopGracefullyOnShutdown - Enable graceful shutdown (default: false)
  • spark.streaming.unpersist - Force unpersisting of old RDDs (default: true)

Example:

val conf = new SparkConf()
  .setAppName("ConfiguredApp")
  .set("spark.streaming.blockInterval", "100ms")
  .set("spark.streaming.receiver.maxRate", "1000")
  .set("spark.streaming.backpressure.enabled", "true")
  .set("spark.streaming.stopGracefullyOnShutdown", "true")