Streaming Context

The StreamingContext is the main entry point for Apache Spark Streaming functionality. It coordinates the streaming application, schedules and executes the streaming jobs, and provides methods for creating input streams and configuring the streaming environment.

Capabilities

StreamingContext Creation

Create a StreamingContext with various configuration options.

/**
 * Create StreamingContext from SparkContext
 * @param sparkContext - Existing SparkContext instance
 * @param batchDuration - Time interval at which streaming data will be divided into batches
 */
class StreamingContext(sparkContext: SparkContext, batchDuration: Duration)

/**
 * Create StreamingContext from SparkConf
 * @param conf - Spark configuration
 * @param batchDuration - Time interval at which streaming data will be divided into batches  
 */
class StreamingContext(conf: SparkConf, batchDuration: Duration)

/**
 * Create StreamingContext with master and app name
 * @param master - Cluster URL to connect to
 * @param appName - Name for your application
 * @param batchDuration - Time interval at which streaming data will be divided into batches
 * @param sparkHome - Spark installation directory on the worker nodes (optional)
 * @param jars - JAR files to send to the cluster (optional)
 * @param environment - Environment variables (optional)
 */
class StreamingContext(
  master: String,
  appName: String, 
  batchDuration: Duration,
  sparkHome: String = null,
  jars: Seq[String] = Nil,
  environment: Map[String, String] = Map()
)

Usage Examples:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{StreamingContext, Seconds}

// From SparkConf
val conf = new SparkConf().setAppName("MyStreamingApp").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(1))

// Direct creation
val ssc2 = new StreamingContext("local[2]", "MyApp", Seconds(5))

Lifecycle Management

Control the streaming application lifecycle with start, stop, and termination methods.

/**
 * Start the streaming context and begin processing
 */
def start(): Unit

/**
 * Stop the streaming context
 * @param stopSparkContext - Whether to stop the underlying SparkContext
 * @param stopGracefully - Whether to stop gracefully by waiting for data to be processed
 */
def stop(stopSparkContext: Boolean = true, stopGracefully: Boolean = false): Unit

/**
 * Wait for the streaming to terminate
 */
def awaitTermination(): Unit

/**
 * Wait for termination or timeout
 * @param timeout - Maximum time to wait in milliseconds
 * @returns true if terminated within timeout, false if timeout occurred
 */
def awaitTerminationOrTimeout(timeout: Long): Boolean

/**
 * Get the current state of the streaming context
 * @returns StreamingContextState (INITIALIZED, ACTIVE, or STOPPED)
 */
def getState(): StreamingContextState
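
Usage Example:

A minimal lifecycle sketch, assuming conf is the SparkConf from the creation example above; the socket source and the 60-second timeout are illustrative choices.

import org.apache.spark.streaming.{Seconds, StreamingContext, StreamingContextState}

val ssc = new StreamingContext(conf, Seconds(1))
ssc.socketTextStream("localhost", 9999).print()

ssc.start()

// Block for up to 60 seconds; returns false if the context is still running
if (!ssc.awaitTerminationOrTimeout(60000)) {
  // Keep the SparkContext alive and let in-flight batches finish
  ssc.stop(stopSparkContext = false, stopGracefully = true)
}
assert(ssc.getState() == StreamingContextState.STOPPED)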

Configuration Methods

Configure checkpointing, data retention, and streaming behavior.

/**
 * Set checkpoint directory for fault tolerance
 * @param directory - HDFS-compatible directory path for checkpoints
 */
def checkpoint(directory: String): Unit

/**
 * Set how long each DStream remembers the RDDs it generated
 * @param duration - Minimum duration for which generated RDDs are retained (useful for querying older data)
 */
def remember(duration: Duration): Unit

/**
 * Add streaming listener for monitoring
 * @param streamingListener - Listener to receive streaming events
 */
def addStreamingListener(streamingListener: StreamingListener): Unit
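
Usage Example:

A sketch of all three configuration hooks, assuming ssc is an existing StreamingContext; the HDFS path is a placeholder.

import org.apache.spark.streaming.Minutes
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

// Checkpointing is required for stateful operations and driver recovery
ssc.checkpoint("hdfs://namenode:8020/checkpoints/my-app")

// Keep generated RDDs available for five minutes
ssc.remember(Minutes(5))

// Print each completed batch's processing delay
ssc.addStreamingListener(new StreamingListener {
  override def onBatchCompleted(batch: StreamingListenerBatchCompleted): Unit =
    println(s"Processing delay: ${batch.batchInfo.processingDelay.getOrElse(-1L)} ms")
})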

Socket Input Streams

Create input streams from TCP sockets.

/**
 * Create text input stream from TCP socket
 * @param hostname - Hostname to connect to
 * @param port - Port number to connect to  
 * @param storageLevel - Storage level for received data
 * @returns ReceiverInputDStream of strings
 */
def socketTextStream(
  hostname: String, 
  port: Int, 
  storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): ReceiverInputDStream[String]

/**
 * Create binary input stream from TCP socket
 * @param hostname - Hostname to connect to
 * @param port - Port number to connect to
 * @param converter - Function to convert InputStream to Iterator[T]
 * @param storageLevel - Storage level for received data
 * @returns ReceiverInputDStream of converted type
 */
def socketStream[T: ClassTag](
  hostname: String,
  port: Int, 
  converter: (InputStream) => Iterator[T],
  storageLevel: StorageLevel
): ReceiverInputDStream[T]

/**
 * Create raw TCP socket stream where data is received as serialized blocks
 * @param hostname - Hostname to connect to
 * @param port - Port number to connect to
 * @param storageLevel - Storage level for received data
 * @returns ReceiverInputDStream of values deserialized to type T
 */
def rawSocketStream[T: ClassTag](
  hostname: String,
  port: Int,
  storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): ReceiverInputDStream[T]
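
Usage Example:

A sketch pairing the built-in text stream with a hand-rolled converter for socketStream; lineIterator is an illustrative helper, not part of the Spark API.

import java.io.{BufferedReader, InputStream, InputStreamReader}
import java.nio.charset.StandardCharsets
import org.apache.spark.storage.StorageLevel

// Text lines over TCP (test with: nc -lk 9999)
val lines = ssc.socketTextStream("localhost", 9999)

// Custom converter: decode the raw byte stream as UTF-8 lines ourselves
def lineIterator(in: InputStream): Iterator[String] = {
  val reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))
  Iterator.continually(reader.readLine()).takeWhile(_ != null)
}

val decoded = ssc.socketStream("localhost", 9999, lineIterator, StorageLevel.MEMORY_AND_DISK_SER_2)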

File Input Streams

Create input streams that monitor file systems for new files.

/**
 * Create input stream from text files in a directory
 * @param directory - Directory path to monitor
 * @returns DStream of strings (file contents line by line)
 */
def textFileStream(directory: String): DStream[String]

/**
 * Create input stream from binary files  
 * @param directory - Directory path to monitor
 * @param recordLength - Length of each record in bytes
 * @returns DStream of byte arrays
 */
def binaryRecordsStream(directory: String, recordLength: Int): DStream[Array[Byte]]

/**
 * Create generic file input stream
 * @param directory - Directory path to monitor
 * @returns InputDStream of key-value pairs based on input format
 */
def fileStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K, V]: ClassTag](
  directory: String
): InputDStream[(K, V)]
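
Usage Example:

A sketch of the three file-based sources; the directory paths and the 128-byte record length are placeholders.

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

// Each file that appears in the directory is read in the next batch
val logs = ssc.textFileStream("hdfs://namenode:8020/incoming/logs")

// Fixed-length binary records, 128 bytes each
val records = ssc.binaryRecordsStream("hdfs://namenode:8020/incoming/bin", 128)

// Generic form with an explicit Hadoop InputFormat; this instantiation mirrors textFileStream
val pairs = ssc.fileStream[LongWritable, Text, TextInputFormat]("hdfs://namenode:8020/incoming/logs")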

Queue and Custom Input Streams

Create input streams from RDD queues or custom receivers.

/**
 * Create input stream from queue of RDDs
 * @param queue - Queue containing RDDs to process
 * @param oneAtATime - Whether to process one RDD per batch
 * @param defaultRDD - Default RDD when queue is empty
 * @returns InputDStream of queue elements
 */
def queueStream[T: ClassTag](
  queue: Queue[RDD[T]],
  oneAtATime: Boolean = true,
  defaultRDD: RDD[T] = null
): InputDStream[T]

/**
 * Create input stream from custom receiver
 * @param receiver - Custom receiver implementation
 * @returns ReceiverInputDStream from the receiver
 */
def receiverStream[T: ClassTag](receiver: Receiver[T]): ReceiverInputDStream[T]
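
Usage Example:

A sketch of both sources; ConstantReceiver is a hypothetical minimal receiver written for illustration, not a Spark class.

import scala.collection.mutable.Queue
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// queueStream is handy in tests: each queued RDD becomes one batch
val rddQueue = new Queue[RDD[Int]]()
val queued = ssc.queueStream(rddQueue)
rddQueue += ssc.sparkContext.parallelize(1 to 100)

// Minimal custom receiver that emits "tick" once per second
class ConstantReceiver extends Receiver[String](StorageLevel.MEMORY_ONLY) {
  def onStart(): Unit = new Thread("tick-source") {
    override def run(): Unit = while (!isStopped()) { store("tick"); Thread.sleep(1000) }
  }.start()
  def onStop(): Unit = () // the polling thread exits once isStopped() is true
}
val ticks = ssc.receiverStream(new ConstantReceiver)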

Stream Combination Operations

Combine multiple streams into unified streams.

/**
 * Union multiple DStreams of the same type
 * @param streams - Sequence of DStreams to union
 * @returns Single DStream containing data from all input streams
 */
def union[T: ClassTag](streams: Seq[DStream[T]]): DStream[T]

/**
 * Transform multiple DStreams using a custom function
 * @param dstreams - Sequence of input DStreams
 * @param transformFunc - Function to transform RDDs from all streams
 * @returns DStream with transformed data
 */
def transform[T: ClassTag](
  dstreams: Seq[DStream[_]], 
  transformFunc: (Seq[RDD[_]], Time) => RDD[T]
): DStream[T]

Usage Examples:

// Basic lifecycle
val ssc = new StreamingContext(conf, Seconds(1))
ssc.checkpoint("hdfs://checkpoint")

val lines = ssc.socketTextStream("localhost", 9999)
lines.print()

ssc.start()
ssc.awaitTermination()

// Combining streams
val stream1 = ssc.socketTextStream("host1", 9999)  
val stream2 = ssc.socketTextStream("host2", 9999)
val combined = ssc.union(Seq(stream1, stream2))

// Custom transformation
val transformed = ssc.transform(Seq(stream1, stream2), (rdds, time) => {
  val rdd1 = rdds(0).asInstanceOf[RDD[String]]
  val rdd2 = rdds(1).asInstanceOf[RDD[String]]  
  rdd1.union(rdd2).filter(_.nonEmpty)
})

Context Factory Methods

Factory methods for creating StreamingContext from checkpoints.

object StreamingContext {
  /**
   * Recreate StreamingContext from checkpoint data, or create a new one
   * @param path - Path to checkpoint directory
   * @param creatingFunc - Function that builds a new StreamingContext when no checkpoint exists
   * @param hadoopConf - Hadoop configuration for reading the checkpoint (optional)
   * @param createOnError - Whether to create a new context if the checkpoint cannot be read (optional)
   * @returns StreamingContext restored from checkpoint, or a newly created one
   */
  def getOrCreate(
    path: String,
    creatingFunc: () => StreamingContext,
    hadoopConf: Configuration = new Configuration(),
    createOnError: Boolean = false
  ): StreamingContext
  
  /**
   * Get the currently active StreamingContext, if any
   * @returns Some(context) if a StreamingContext is active, None otherwise
   */
  def getActive(): Option[StreamingContext]
}
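
Usage Example:

A sketch of the standard driver-recovery pattern, assuming conf is a SparkConf defined elsewhere; the checkpoint path is a placeholder.

val checkpointDir = "hdfs://namenode:8020/checkpoints/my-app"

// Only invoked when no checkpoint exists yet
def createContext(): StreamingContext = {
  val ssc = new StreamingContext(conf, Seconds(1))
  ssc.checkpoint(checkpointDir)
  ssc.socketTextStream("localhost", 9999).print()
  ssc
}

// First run builds a fresh context; after a driver restart it is rebuilt from the checkpoint
val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
ssc.start()
ssc.awaitTermination()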

Types

// Possible states of a StreamingContext (defined in Spark as a Java enum,
// org.apache.spark.streaming.StreamingContextState)
// INITIALIZED - context created, start() not yet called
// ACTIVE      - start() called; batches are being processed
// STOPPED     - stop() called; the context cannot be restarted