Core Streaming

Core streaming abstractions including StreamingContext for coordinating stream processing, DStream operations for data transformations, and time management classes.

Capabilities

StreamingContext

Main entry point for Spark Streaming functionality, responsible for creating input streams, managing the streaming lifecycle, and coordinating batch processing.

/**
 * Main entry point for all streaming operations
 * @param sparkContext - Spark context for cluster communication
 * @param batchDuration - Time interval for micro-batch processing
 */
class StreamingContext(sparkContext: SparkContext, batchDuration: Duration) {
  
  /** Alternative constructor using SparkConf */
  def this(conf: SparkConf, batchDuration: Duration)
  
  /** Constructor with master URL and app name */
  def this(
    master: String,
    appName: String,
    batchDuration: Duration,
    sparkHome: String = null,
    jars: Seq[String] = Nil,
    environment: Map[String, String] = Map()
  )
  
  /** Constructor for recreating from checkpoint directory */
  def this(path: String, hadoopConf: Configuration)
  
  /** Constructor for recreating from checkpoint directory (simplified) */
  def this(path: String)
  
  // Lifecycle management
  /** Start the streaming context and begin processing */
  def start(): Unit
  
  /** Stop the streaming context */
  def stop(stopSparkContext: Boolean = true): Unit
  
  /** Wait for the streaming context to terminate */
  def awaitTermination(): Unit
  
  /** Wait for termination or timeout */
  def awaitTerminationOrTimeout(timeout: Long): Boolean
  
  // Configuration
  /** Set checkpoint directory for fault tolerance */
  def checkpoint(directory: String): Unit
  
  /** Set remember duration for caching intermediate RDDs */
  def remember(duration: Duration): Unit
  
  // State and properties
  /** Get current state of the streaming context */
  def getState(): StreamingContextState
  
  /** Access to underlying Spark context */
  def sparkContext: SparkContext
  
  // Input stream creation (detailed in Input Sources doc)
  def socketTextStream(hostname: String, port: Int): DStream[String]
  def textFileStream(directory: String): DStream[String]
  def queueStream[T](queue: Queue[RDD[T]], oneAtATime: Boolean = true): DStream[T]
  
  // Listeners
  /** Add a streaming listener for monitoring */
  def addStreamingListener(listener: StreamingListener): Unit
  
  /** Remove a streaming listener */
  def removeStreamingListener(listener: StreamingListener): Unit
}

Usage Examples:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming._

// Create from SparkContext
val sc = new SparkContext(new SparkConf().setAppName("StreamingApp"))
val ssc = new StreamingContext(sc, Seconds(2))

// Create from SparkConf directly
val ssc2 = new StreamingContext(
  new SparkConf().setAppName("StreamingApp").setMaster("local[2]"), 
  Seconds(1)
)

// Configure checkpointing for fault tolerance
ssc.checkpoint("hdfs://namenode:9000/checkpoints")

// Create input streams and transformations
val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" "))

// Start processing
ssc.start()
ssc.awaitTermination()
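
The checkpoint constructors above are rarely called directly. The usual entry point is the companion method StreamingContext.getOrCreate, which restores the context (including DStream lineage and pending state) from the checkpoint directory when one exists and otherwise builds a fresh context from the supplied factory. A minimal sketch; the path and app name are placeholders:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val checkpointDir = "hdfs://namenode:9000/checkpoints"

// Factory invoked only when no checkpoint exists; all input streams and
// transformations must be defined here, before start()
def createContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("RecoverableApp")
  val context = new StreamingContext(conf, Seconds(2))
  context.checkpoint(checkpointDir)
  context
}

val recovered = StreamingContext.getOrCreate(checkpointDir, createContext _)
recovered.start()
recovered.awaitTermination()

For lightweight monitoring, a listener can be attached before start(). This sketch uses only the StreamingListener trait and the batch-completed event from org.apache.spark.streaming.scheduler:

import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

// Log the processing delay after each completed batch
recovered.addStreamingListener(new StreamingListener {
  override def onBatchCompleted(batch: StreamingListenerBatchCompleted): Unit = {
    batch.batchInfo.processingDelay.foreach(d => println(s"processing delay: $d ms"))
  }
})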

DStream Operations

Core abstraction representing a continuous sequence of RDDs with transformation and action operations.

/**
 * Discretized Stream - represents a continuous sequence of RDDs
 */
abstract class DStream[T] {
  
  // Core properties
  /** Duration of each batch */
  def slideDuration: Duration
  
  /** Dependencies on other DStreams */
  def dependencies: List[DStream[_]]
  
  /** Access to streaming context */
  def context: StreamingContext
  
  // Transformations - create new DStreams
  /** Transform each element using a function */
  def map[U](mapFunc: T => U): DStream[U]
  
  /** Transform each element to zero or more elements */
  def flatMap[U](flatMapFunc: T => Iterable[U]): DStream[U]
  
  /** Filter elements based on a predicate */
  def filter(filterFunc: T => Boolean): DStream[T]
  
  /** Group elements into arrays for each batch */
  def glom(): DStream[Array[T]]
  
  /** Repartition the DStream */
  def repartition(numPartitions: Int): DStream[T]
  
  /** Union with another DStream */
  def union(that: DStream[T]): DStream[T]
  
  /** Cache the DStream RDDs at default storage level */
  def cache(): DStream[T]
  
  /** Persist the DStream RDDs at specified storage level */
  def persist(level: StorageLevel): DStream[T]
  
  // Window operations
  /** Create windowed DStream */
  def window(windowDuration: Duration, slideDuration: Duration): DStream[T]
  
  /** Reduce elements in a sliding window */
  def reduceByWindow(
    reduceFunc: (T, T) => T,
    windowDuration: Duration,
    slideDuration: Duration
  ): DStream[T]
  
  /** Count elements in a sliding window */
  def countByWindow(windowDuration: Duration, slideDuration: Duration): DStream[Long]
  
  // Note: Stateful operations (updateStateByKey, mapWithState) are only available 
  // on DStream[(K, V)] through PairDStreamFunctions - see that section below
  
  // Actions - trigger computation
  /** Print first 10 elements of each batch */
  def print(): Unit
  
  /** Print first num elements of each batch */
  def print(num: Int): Unit
  
  /** Apply a function to each RDD in the DStream */
  def foreachRDD(func: RDD[T] => Unit): Unit
  
  /** Apply a function to each RDD with timestamp */
  def foreachRDD(func: (RDD[T], Time) => Unit): Unit
  
  // Output operations
  /** Save as text files with prefix */
  def saveAsTextFiles(prefix: String, suffix: String = ""): Unit
  
  /** Save as sequence files of serialized objects */
  def saveAsObjectFiles(prefix: String, suffix: String = ""): Unit
  
  // Note: saveAsHadoopFiles and saveAsNewAPIHadoopFiles operate on key-value
  // pairs and are available on DStream[(K, V)] via PairDStreamFunctions
}

Usage Examples:

// Basic transformations
val lines: DStream[String] = ssc.socketTextStream("localhost", 9999)
val words: DStream[String] = lines.flatMap(_.split(" "))
val filtered: DStream[String] = words.filter(_.length > 3)

// Window operations
val windowedWords = words.window(Seconds(10), Seconds(2))
val wordCounts = words.map((_, 1))
                     .reduceByKeyAndWindow(_ + _, Seconds(10), Seconds(2))

// Actions
words.print(20)  // Print first 20 elements each batch
words.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    println(s"Batch has ${rdd.count()} elements")
  }
}

// Save to files
words.saveAsTextFiles("hdfs://namenode/output", "txt")
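
One detail worth spelling out for foreachRDD: the closure passed to it runs on the driver once per batch, while per-record work must happen inside an RDD action so it executes on the executors. A sketch of the usual connection-per-partition output pattern; the connection factory and send call are hypothetical stand-ins for a real storage client:

// foreachRDD executes on the driver; foreachPartition ships work to executors
words.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    // val connection = createConnection()   // hypothetical: open one client
    //                                       // per partition, never on the driver
    partition.foreach { record =>
      // connection.send(record)             // hypothetical write call
    }
    // connection.close()
  }
}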

PairDStreamFunctions

Operations available only on DStreams of key-value pairs, providing aggregation and join capabilities.

/**
 * Additional operations on DStreams of (K, V) pairs
 * Available via implicit conversion when DStream contains tuples
 */
class PairDStreamFunctions[K, V](self: DStream[(K, V)]) {
  
  // Key-based aggregations
  /** Group values by key */
  def groupByKey(): DStream[(K, Iterable[V])]
  
  /** Reduce values by key using associative function */
  def reduceByKey(func: (V, V) => V): DStream[(K, V)]
  
  /** Combine values by key using combiner functions */
  def combineByKey[C](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiner: (C, C) => C
  ): DStream[(K, C)]
  
  /** Count occurrences of each key */
  def countByKey(): DStream[(K, Long)]
  
  // Joins with other pair DStreams
  /** Inner join with another pair DStream */
  def join[W](other: DStream[(K, W)]): DStream[(K, (V, W))]
  
  /** Left outer join */
  def leftOuterJoin[W](other: DStream[(K, W)]): DStream[(K, (V, Option[W]))]
  
  /** Right outer join */
  def rightOuterJoin[W](other: DStream[(K, W)]): DStream[(K, (Option[V], W))]
  
  /** Full outer join */
  def fullOuterJoin[W](other: DStream[(K, W)]): DStream[(K, (Option[V], Option[W]))]
  
  // Windowed operations
  /** Group by key within a sliding window */
  def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration): DStream[(K, Iterable[V])]
  
  /** Reduce by key within a sliding window */
  def reduceByKeyAndWindow(
    func: (V, V) => V,
    windowDuration: Duration,
    slideDuration: Duration
  ): DStream[(K, V)]
  
  // Stateful operations
  /** Update state by key using a function */
  def updateStateByKey[S](
    updateFunc: (Seq[V], Option[S]) => Option[S]
  ): DStream[(K, S)]
  
  /** Update state by key with custom partitioner */
  def updateStateByKey[S](
    updateFunc: (Seq[V], Option[S]) => Option[S],
    partitioner: Partitioner
  ): DStream[(K, S)]
  
  /** Update state by key with number of partitions */
  def updateStateByKey[S](
    updateFunc: (Seq[V], Option[S]) => Option[S],
    numPartitions: Int
  ): DStream[(K, S)]
  
  /** Map with state using StateSpec */
  def mapWithState[StateType, MappedType](
    spec: StateSpec[K, V, StateType, MappedType]
  ): MapWithStateDStream[K, V, StateType, MappedType]
}
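
Usage Examples:

These sketches reuse the words stream from the earlier examples and assume ssc.checkpoint(...) has been set, which the stateful operators require:

// Pair operations become available on DStream[(K, V)] via implicit conversion
val pairs: DStream[(String, Int)] = words.map((_, 1))

// Per-batch aggregation
val counts = pairs.reduceByKey(_ + _)

// Running count per key across batches
val runningCounts = pairs.updateStateByKey[Int] { (newValues, state) =>
  Some(newValues.sum + state.getOrElse(0))
}

// mapWithState: incremental per-key state driven by a StateSpec
val spec = StateSpec.function { (key: String, value: Option[Int], state: State[Int]) =>
  val sum = value.getOrElse(0) + state.getOption.getOrElse(0)
  state.update(sum)
  (key, sum)
}
val stateful = pairs.mapWithState(spec)

// Per-batch inner join of two pair DStreams
val lengths: DStream[(String, Int)] = words.map(w => (w, w.length))
val joined: DStream[(String, (Int, Int))] = pairs.join(lengths)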

Time and Duration Classes

Time management classes for specifying batch intervals, window durations, and timeout periods.

/**
 * Represents a duration for streaming operations
 */
case class Duration(milliseconds: Long) {
  // Arithmetic operations
  def +(other: Duration): Duration
  def -(other: Duration): Duration
  def *(times: Int): Duration
  def /(other: Duration): Double
  
  // Comparison operations
  def <(other: Duration): Boolean
  def <=(other: Duration): Boolean
  def >(other: Duration): Boolean
  def >=(other: Duration): Boolean
  
  // Utility methods
  def isMultipleOf(other: Duration): Boolean
  def min(other: Duration): Duration
  def max(other: Duration): Duration
  def isZero: Boolean
  def prettyPrint: String
}

/**
 * Represents an absolute point in time
 */
case class Time(milliseconds: Long) {
  // Arithmetic with Duration
  def +(duration: Duration): Time
  def -(duration: Duration): Time
  def -(other: Time): Duration
  
  // Utility methods
  def floor(duration: Duration): Time
  def isMultipleOf(duration: Duration): Boolean
  def until(endTime: Time, stepSize: Duration): Seq[Time]
  def to(endTime: Time, stepSize: Duration): Seq[Time]
}

/**
 * Factory objects for creating durations
 */
object Milliseconds {
  def apply(milliseconds: Long): Duration
}

object Seconds {
  def apply(seconds: Long): Duration
}

object Minutes {
  def apply(minutes: Long): Duration
}

/**
 * Java-friendly duration factories
 */
object Durations {
  def milliseconds(milliseconds: Long): Duration
  def seconds(seconds: Long): Duration  
  def minutes(minutes: Long): Duration
}

/**
 * Represents a time interval with start and end (private[streaming] in Spark; shown for completeness)
 */
case class Interval(beginTime: Time, endTime: Time) {
  def duration: Duration = endTime - beginTime
  def contains(time: Time): Boolean
}

Usage Examples:

// Creating durations
val batchInterval = Seconds(2)
val windowSize = Minutes(10) 
val slideInterval = Seconds(30)

// Time arithmetic
val start = Time(System.currentTimeMillis())
val future = start + Minutes(5)
val elapsed: Duration = future - start  // a Duration of 5 minutes

// Using in streaming operations (given some dstream: DStream[String])
val ssc = new StreamingContext(conf, Seconds(1))
val windowed = dstream.window(Minutes(5), Seconds(30))
val reduced = dstream.reduceByWindow(_ + _, Minutes(2), Seconds(10))
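
A constraint worth validating up front: a DStream's window and slide durations must be integer multiples of its batch (slide) duration, otherwise graph validation fails at start(). isMultipleOf makes the check explicit; the values below are arbitrary:

// Checking window parameters against the batch interval
val batch = Seconds(2)
val window = Seconds(10)
val slide = Seconds(4)
assert(window.isMultipleOf(batch))
assert(slide.isMultipleOf(batch))

// Duration arithmetic composes as expected
val doubled = window * 2     // 20 seconds
val padded = window + slide  // 14 seconds
println(padded.prettyPrint)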