Data Streams

DStream (Discretized Stream) is the core abstraction in Spark Streaming representing a continuous stream of data. Internally, a DStream is represented as a sequence of RDDs (Resilient Distributed Datasets), where each RDD contains data for a specific time interval.
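
The examples in this document assume a running StreamingContext named ssc. A minimal setup, shown here as a sketch using the standard Spark Streaming API (the application name and master are placeholders), looks like this:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Create a StreamingContext with a 1-second batch interval
val conf = new SparkConf().setAppName("DataStreamsExample").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(1))

// ... define DStreams and output operations here ...

ssc.start()             // Start receiving and processing data
ssc.awaitTermination()  // Block until the streaming computation stops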

Capabilities

Basic Transformations

Core transformation operations that modify each element or the structure of the DStream.

/**
 * Transform each element using a function
 * @param mapFunc - Function to apply to each element
 * @returns New DStream with transformed elements
 */
def map[U: ClassTag](mapFunc: T => U): DStream[U]

/**
 * Transform each element to zero or more elements
 * @param flatMapFunc - Function returning a collection for each element
 * @returns New DStream with flattened results
 */
def flatMap[U: ClassTag](flatMapFunc: T => TraversableOnce[U]): DStream[U] 

/**
 * Filter elements based on a predicate
 * @param filterFunc - Function returning true for elements to keep
 * @returns New DStream with filtered elements
 */
def filter(filterFunc: T => Boolean): DStream[T]

/**
 * Coalesce the elements within each partition into a single array
 * @returns DStream where each RDD contains one array per partition
 */
def glom(): DStream[Array[T]]

/**
 * Transform each partition using a function
 * @param mapPartFunc - Function to transform an iterator of elements
 * @param preservePartitioning - Whether to preserve the partitioner
 * @returns New DStream with transformed partitions
 */
def mapPartitions[U: ClassTag](
  mapPartFunc: Iterator[T] => Iterator[U], 
  preservePartitioning: Boolean = false
): DStream[U]

Usage Examples:

val lines = ssc.socketTextStream("localhost", 9999)

// Map transformation
val lengths = lines.map(_.length)

// FlatMap transformation  
val words = lines.flatMap(_.split(" "))

// Filter transformation
val longLines = lines.filter(_.length > 10)

// Combined transformations
val wordCounts = lines
  .flatMap(_.split(" "))
  .map((_, 1))
  .reduceByKey(_ + _)

Advanced Transformations

More complex transformation operations including custom RDD transformations.

/**
 * Transform each RDD using a custom function
 * @param transformFunc - Function to transform each RDD
 * @returns New DStream with transformed RDDs
 */
def transform[U: ClassTag](transformFunc: RDD[T] => RDD[U]): DStream[U]

/**
 * Transform each RDD with access to time information
 * @param transformFunc - Function receiving RDD and time
 * @returns New DStream with transformed RDDs
 */
def transform[U: ClassTag](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U]

/**
 * Transform two DStreams together
 * @param other - Another DStream to combine with
 * @param transformFunc - Function to transform both RDDs together
 * @returns New DStream with combined transformation
 */
def transformWith[U: ClassTag, V: ClassTag](
  other: DStream[U], 
  transformFunc: (RDD[T], RDD[U]) => RDD[V]
): DStream[V]

/**
 * Transform two DStreams with time information
 * @param other - Another DStream to combine with  
 * @param transformFunc - Function receiving both RDDs and time
 * @returns New DStream with combined transformation
 */
def transformWith[U: ClassTag, V: ClassTag](
  other: DStream[U],
  transformFunc: (RDD[T], RDD[U], Time) => RDD[V]  
): DStream[V]

/**
 * Repartition each RDD of this DStream to the specified number of partitions
 * @param numPartitions - Number of partitions for output
 * @returns Repartitioned DStream
 */
def repartition(numPartitions: Int): DStream[T]

/**
 * Union this DStream with another DStream of the same type
 * @param that - DStream to union with
 * @returns Combined DStream containing data from both streams
 */
def union(that: DStream[T]): DStream[T]
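
A short usage sketch (the blacklist contents and host names are illustrative; words is the stream built in the basic examples above):

// Join each batch against a static RDD using transform
val blacklist = ssc.sparkContext.parallelize(Seq("spam1", "spam2"))
val cleaned = words.transform { rdd =>
  rdd.subtract(blacklist)  // Drop blacklisted words from every batch
}

// Merge two streams of the same type and control the output parallelism
val streamA = ssc.socketTextStream("hostA", 9999)
val streamB = ssc.socketTextStream("hostB", 9999)
val merged = streamA.union(streamB).repartition(8)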

Persistence and Caching

Control how DStream data is stored and cached across operations.

/**
 * Persist DStream RDDs with specified storage level
 * @param level - Storage level (MEMORY_ONLY, MEMORY_AND_DISK, etc.)
 * @returns This DStream for method chaining
 */
def persist(level: StorageLevel): DStream[T]

/**
 * Cache DStream RDDs in memory
 * @returns This DStream for method chaining  
 */
def cache(): DStream[T]

/**
 * Enable checkpointing for this DStream
 * @param interval - Interval between checkpoints
 * @returns This DStream for method chaining
 */
def checkpoint(interval: Duration): DStream[T]
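
A brief usage sketch; note that DStream checkpointing also requires a checkpoint directory to be set on the StreamingContext (the HDFS path below is a placeholder):

import org.apache.spark.storage.StorageLevel

// Required before any DStream can be checkpointed
ssc.checkpoint("hdfs:///checkpoints/streaming-app")

// Keep this stream's RDDs in memory, spilling to disk when needed
val persistedWords = words.persist(StorageLevel.MEMORY_AND_DISK)

// cache() is shorthand for persist() with the default storage level
val cachedLines = lines.cache()

// Checkpoint this DStream every 30 seconds
words.checkpoint(Seconds(30))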

Aggregation Operations

Operations that aggregate data within each batch.

/**
 * Reduce elements in each RDD using a function
 * @param reduceFunc - Associative and commutative reduce function
 * @returns DStream with one element per RDD (the reduced result)
 */
def reduce(reduceFunc: (T, T) => T): DStream[T]

/**
 * Count elements in each RDD
 * @returns DStream of Long values representing counts
 */
def count(): DStream[Long]

/**
 * Count occurrences of each unique element
 * @param numPartitions - Number of partitions for the result
 * @returns DStream of (element, count) pairs
 */
def countByValue(numPartitions: Int = ssc.sc.defaultParallelism): DStream[(T, Long)]
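
Usage sketch, continuing with the lines and words streams from the earlier examples:

// Longest line length in each batch
val maxLineLength = lines.map(_.length).reduce((a, b) => math.max(a, b))

// Number of words in each batch
val wordsPerBatch = words.count()

// Occurrences of each distinct word in each batch
val wordFrequencies = words.countByValue()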

Window Operations

Time-based windowing operations for analyzing data across multiple batches.

/**
 * Create windowed DStream
 * @param windowDuration - Width of the window
 * @param slideDuration - Sliding interval of the window (optional, defaults to batch interval)
 * @returns DStream containing data from the specified window
 */
def window(windowDuration: Duration): DStream[T]
def window(windowDuration: Duration, slideDuration: Duration): DStream[T]

/**
 * Count elements over a sliding window
 * @param windowDuration - Width of the window
 * @param slideDuration - Sliding interval of the window
 * @returns DStream of window counts
 */
def countByWindow(windowDuration: Duration, slideDuration: Duration): DStream[Long]

/**
 * Count occurrences of each element over a sliding window
 * @param windowDuration - Width of the window
 * @param slideDuration - Sliding interval of the window
 * @param numPartitions - Number of partitions for result (optional)
 * @returns DStream of (element, count) pairs over the window
 */
def countByValueAndWindow(
  windowDuration: Duration,
  slideDuration: Duration, 
  numPartitions: Int = ssc.sc.defaultParallelism
): DStream[(T, Long)]

/**
 * Reduce elements over a sliding window
 * @param reduceFunc - Associative and commutative reduce function
 * @param windowDuration - Width of the window
 * @param slideDuration - Sliding interval of the window
 * @returns DStream with reduced results over windows
 */
def reduceByWindow(
  reduceFunc: (T, T) => T,
  windowDuration: Duration, 
  slideDuration: Duration
): DStream[T]

/**
 * Reduce elements over sliding window with inverse function for efficiency
 * @param reduceFunc - Associative reduce function
 * @param invReduceFunc - Inverse of the reduce function
 * @param windowDuration - Width of the window
 * @param slideDuration - Sliding interval of the window
 * @returns DStream with reduced results over windows
 */
def reduceByWindow(
  reduceFunc: (T, T) => T,
  invReduceFunc: (T, T) => T, 
  windowDuration: Duration,
  slideDuration: Duration
): DStream[T]
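
A sketch contrasting the two reduceByWindow forms; the variant with an inverse function updates the previous window incrementally instead of recomputing it, but requires checkpointing to be enabled (the checkpoint path is a placeholder):

val charsPerBatch = lines.map(_.length)

// Recompute the entire 30-second window every 10 seconds
val windowTotal = charsPerBatch.reduceByWindow(_ + _, Seconds(30), Seconds(10))

// Incrementally add new batches and subtract batches that fall out of the window
ssc.checkpoint("hdfs:///checkpoints/streaming-app")
val incrementalTotal =
  charsPerBatch.reduceByWindow(_ + _, _ - _, Seconds(30), Seconds(10))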

Output Operations

Actions that send data to external systems or trigger computation.

/**
 * Apply function to each RDD in the DStream
 * @param foreachFunc - Function to apply to each RDD
 */
def foreachRDD(foreachFunc: RDD[T] => Unit): Unit

/**
 * Apply function to each RDD with time information
 * @param foreachFunc - Function receiving RDD and time
 */
def foreachRDD(foreachFunc: (RDD[T], Time) => Unit): Unit

/**
 * Print the first num elements of each RDD to the console
 * @param num - Number of elements to print (default 10)
 */
def print(): Unit
def print(num: Int): Unit

/**
 * Save DStream as text files
 * @param prefix - Prefix for output file names
 * @param suffix - Suffix for output file names (optional)
 */
def saveAsTextFiles(prefix: String, suffix: String = ""): Unit

/**
 * Save DStream as object files (serialized)
 * @param prefix - Prefix for output file names  
 * @param suffix - Suffix for output file names (optional)
 */
def saveAsObjectFiles(prefix: String, suffix: String = ""): Unit

Usage Examples:

val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" "))

// Window operations
val windowedWords = words.window(Seconds(30), Seconds(10))
val windowedCounts = words.countByWindow(Seconds(30), Seconds(10))

// Reduce over windows
val windowedWordCount = words
  .map((_, 1))
  .reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10))

// Output operations
words.foreachRDD { rdd =>
  rdd.collect().foreach(println)
}

windowedWordCount.print()
windowedWordCount.saveAsTextFiles("output/wordcount")

Utility Operations

Helper methods for accessing DStream data and metadata.

/**
 * Get RDDs for a specific time range
 * @param fromTime - Start time (inclusive)
 * @param toTime - End time (inclusive)
 * @returns Sequence of RDDs in the time range
 */
def slice(fromTime: Time, toTime: Time): Seq[RDD[T]]

/**
 * Get RDDs for a specific time interval
 * @param interval - Time interval to retrieve
 * @returns Sequence of RDDs in the interval
 */
def slice(interval: Interval): Seq[RDD[T]]

/**
 * Compute the RDD for a specific time
 * @param validTime - Time for which to compute the RDD
 * @returns Some(RDD) with data for the specified time, or None if no data is available
 */
def compute(validTime: Time): Option[RDD[T]]
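
A hypothetical sketch of slice; the requested times should align with the stream's slide (batch) interval, and only RDDs the stream still remembers (see rememberDuration below) can be retrieved:

import org.apache.spark.streaming.{Minutes, Time}

// Inspect the batches generated over the last minute (illustrative only)
val now = Time(System.currentTimeMillis())
val recentRDDs = words.slice(now - Minutes(1), now)
recentRDDs.foreach(rdd => println(s"Batch contained ${rdd.count()} words"))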

Key Properties

// DStream properties
def slideDuration: Duration         // Sliding interval of the DStream
def dependencies: List[DStream[_]]  // Parent DStreams this DStream depends on  
def generatedRDDs: HashMap[Time, RDD[T]] // Cache of generated RDDs
def rememberDuration: Duration      // How long to remember RDDs
def storageLevel: StorageLevel      // Storage level for persistence
def mustCheckpoint: Boolean         // Whether this DStream requires checkpointing
def checkpointDuration: Duration    // Checkpoint interval

Advanced Usage:

// Custom transformation with RDD operations (per-batch DataFrame filtering)
import org.apache.spark.sql.SQLContext

val processed = lines.transform { rdd =>
  val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
  import sqlContext.implicits._
  
  val df = rdd.toDF("line")
  df.filter($"line".contains("ERROR"))
    .select($"line")
    .rdd
    .map(_.getString(0))
}

// Complex windowed operations
val complexWindow = words
  .map((_, 1))
  .reduceByKeyAndWindow(
    (a: Int, b: Int) => a + b,    // Add new values
    (a: Int, b: Int) => a - b,    // Remove old values  
    Seconds(30),                   // Window duration
    Seconds(10)                    // Slide duration
  )
  .filter(_._2 > 5)               // Filter low counts

// Combining multiple streams with transformWith  
val stream1 = ssc.socketTextStream("host1", 9999)
val stream2 = ssc.socketTextStream("host2", 9999)  

val combined = stream1.transformWith(stream2, (rdd1: RDD[String], rdd2: RDD[String]) => {
  rdd1.intersection(rdd2)  // Find common elements
})