Window Operations

Window operations in Spark Streaming allow you to apply transformations over a sliding window of data. These operations are essential for time-based aggregations, trend analysis, and processing data across multiple batches.

Capabilities

Basic Window Operations

Core windowing functionality for creating time-based data windows.

/**
 * Create windowed DStream with default slide duration
 * @param windowDuration - Width of the window (must be multiple of batch duration)
 * @returns DStream containing data from the specified window
 */
def window(windowDuration: Duration): DStream[T]

/**
 * Create windowed DStream with custom slide duration  
 * @param windowDuration - Width of the window (must be multiple of batch duration)
 * @param slideDuration - Sliding interval (must be multiple of batch duration)
 * @returns DStream containing data from the specified sliding window
 */
def window(windowDuration: Duration, slideDuration: Duration): DStream[T]

Usage Examples:

val lines = ssc.socketTextStream("localhost", 9999)

// 30-second window, sliding every batch (1 second)
val windowedLines = lines.window(Seconds(30))

// 30-second window, sliding every 10 seconds
val slidingWindow = lines.window(Seconds(30), Seconds(10))
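
These examples assume a StreamingContext with a 1-second batch interval; for context, a minimal (hypothetical) setup sketch:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setMaster("local[2]").setAppName("WindowOps")
val ssc = new StreamingContext(conf, Seconds(1))  // 1-second batches

val lines = ssc.socketTextStream("localhost", 9999)
lines.window(Seconds(30)).print()  // Print a sample of each window's contents

ssc.start()
ssc.awaitTermination()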

Counting Operations

Window operations that count elements over time windows.

/**
 * Count elements over a sliding window
 * @param windowDuration - Width of the window
 * @param slideDuration - Sliding interval of the window
 * @returns DStream of Long values representing element counts
 */
def countByWindow(windowDuration: Duration, slideDuration: Duration): DStream[Long]

/**
 * Count occurrences of each unique element over a sliding window
 * @param windowDuration - Width of the window
 * @param slideDuration - Sliding interval of the window
 * @param numPartitions - Number of partitions for result (optional)
 * @returns DStream of (element, count) pairs over the window
 */
def countByValueAndWindow(
  windowDuration: Duration,
  slideDuration: Duration, 
  numPartitions: Int = ssc.sc.defaultParallelism
): DStream[(T, Long)]

/**
 * Count occurrences with timeout for inactive elements
 * @param windowDuration - Width of the window
 * @param slideDuration - Sliding interval of the window
 * @param numPartitions - Number of partitions for result
 * @param timeout - Timeout duration for inactive elements
 * @returns DStream of (element, count) pairs with timeout handling
 */
def countByValueAndWindow(
  windowDuration: Duration,
  slideDuration: Duration,
  numPartitions: Int,
  timeout: Duration
): DStream[(T, Long)]
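
Usage Examples (a sketch; `lines` is the socket stream from the basic examples above):

// Both counting operations compute incrementally with inverse functions,
// so they need a checkpoint directory (see Checkpointing Requirements)
val words = lines.flatMap(_.split(" "))

// Total number of elements in each 30-second window, every 10 seconds
val wordTotals = words.countByWindow(Seconds(30), Seconds(10))

// Per-element occurrence counts over the same window
val wordFrequencies = words.countByValueAndWindow(Seconds(30), Seconds(10))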

Reduction Operations

Window operations that reduce/aggregate data over time windows.

/**
 * Reduce elements over a sliding window using associative function
 * @param reduceFunc - Associative and commutative reduce function  
 * @param windowDuration - Width of the window
 * @param slideDuration - Sliding interval of the window
 * @returns DStream with reduced results over windows
 */
def reduceByWindow(
  reduceFunc: (T, T) => T,
  windowDuration: Duration, 
  slideDuration: Duration
): DStream[T]

/**
 * Efficient reduce over window with inverse function
 * @param reduceFunc - Associative reduce function
 * @param invReduceFunc - Inverse of the reduce function for removing old values
 * @param windowDuration - Width of the window
 * @param slideDuration - Sliding interval of the window
 * @returns DStream with reduced results using incremental computation
 */
def reduceByWindow(
  reduceFunc: (T, T) => T,
  invReduceFunc: (T, T) => T, 
  windowDuration: Duration,
  slideDuration: Duration
): DStream[T]

Usage Examples:

val numbers = ssc.socketTextStream("localhost", 9999).map(_.toInt)

// Count numbers in 1-minute windows every 10 seconds
val windowCounts = numbers.countByWindow(Minutes(1), Seconds(10))

// Sum numbers over windows
val windowSums = numbers.reduceByWindow(_ + _, Minutes(1), Seconds(10))

// Efficient windowed sum with inverse function (incremental variants
// like this require a checkpoint directory; see Checkpointing Requirements)
val efficientSums = numbers.reduceByWindow(
  _ + _,              // Add new values
  _ - _,              // Subtract old values
  Minutes(1),         // 1-minute window
  Seconds(10)         // Slide every 10 seconds
)

Key-Value Window Operations

Specialized window operations for key-value pair DStreams.

/**
 * Group values by key over a sliding window
 * @param windowDuration - Width of the window
 * @param slideDuration - Sliding interval of the window (optional)
 * @returns DStream of (key, iterable of values) over windows
 */
def groupByKeyAndWindow(windowDuration: Duration): DStream[(K, Iterable[V])]
def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration): DStream[(K, Iterable[V])]

/**
 * Group by key with custom partitioning
 * @param windowDuration - Width of the window
 * @param slideDuration - Sliding interval of the window
 * @param numPartitions - Number of partitions for result
 * @returns DStream of (key, iterable of values) over windows
 */
def groupByKeyAndWindow(
  windowDuration: Duration,
  slideDuration: Duration, 
  numPartitions: Int
): DStream[(K, Iterable[V])]

/**
 * Reduce values by key over sliding window
 * @param reduceFunc - Associative function to combine values
 * @param windowDuration - Width of the window  
 * @param slideDuration - Sliding interval of the window (optional)
 * @returns DStream of (key, reduced value) over windows
 */
def reduceByKeyAndWindow(reduceFunc: (V, V) => V, windowDuration: Duration): DStream[(K, V)]
def reduceByKeyAndWindow(
  reduceFunc: (V, V) => V, 
  windowDuration: Duration,
  slideDuration: Duration
): DStream[(K, V)]

/**
 * Efficient reduce by key with inverse function for incremental computation
 * @param reduceFunc - Associative function to combine values
 * @param invReduceFunc - Inverse function to remove old values
 * @param windowDuration - Width of the window
 * @param slideDuration - Sliding interval of the window
 * @param numPartitions - Number of partitions for result (optional)
 * @param filterFunc - Function to filter results (optional)
 * @returns DStream of (key, reduced value) over windows
 */
def reduceByKeyAndWindow(
  reduceFunc: (V, V) => V,
  invReduceFunc: (V, V) => V,
  windowDuration: Duration, 
  slideDuration: Duration,
  numPartitions: Int = ssc.sc.defaultParallelism,
  filterFunc: ((K, V)) => Boolean = null
): DStream[(K, V)]

Usage Examples:

val wordPairs = lines.flatMap(_.split(" ")).map((_, 1))

// Word count over 30-second windows
val windowedWordCounts = wordPairs.reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10))

// Efficient windowed word count with inverse function; the filter prunes
// keys whose count drops to zero, which would otherwise linger in state
val efficientWordCounts = wordPairs.reduceByKeyAndWindow(
  _ + _,                    // Add new counts
  _ - _,                    // Subtract old counts
  Seconds(30),              // 30-second window
  Seconds(10),              // Slide every 10 seconds
  2,                        // Use 2 partitions
  _._2 > 0                  // Filter out zero counts
)

// Group words by length over windows
val wordsByLength = lines
  .flatMap(_.split(" "))
  .map(word => (word.length, word))
  .groupByKeyAndWindow(Seconds(60), Seconds(20))
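
The grouped result can be post-processed per key; for example, counting how many words of each length appeared in the window (a brief sketch):

// Number of words of each length seen in the current window
val lengthHistogram = wordsByLength.mapValues(_.size)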

Window Operation Characteristics

Memory Requirements

Window operations require more memory as they maintain data across multiple batches:

  • Window Size Impact: Larger windows require more memory to store historical data
  • Slide Duration Impact: Smaller slide durations create more overlapping windows
  • Checkpointing: Incremental window operations (those using inverse functions, plus countByWindow and countByValueAndWindow) require checkpointing for fault tolerance

Performance Considerations

// Performance characteristics of different window operations

// Memory efficient - only stores aggregated results
val efficientCount = numbers.reduceByWindow(_ + _, _ - _, Minutes(5), Seconds(30))

// Memory intensive - stores all windowed data
val memoryIntensive = numbers.window(Minutes(5), Seconds(30)).reduce(_ + _)

// Optimal partitioning for windowed operations
val optimizedWindow = wordPairs.reduceByKeyAndWindow(
  _ + _, _ - _, 
  windowDuration = Minutes(5),
  slideDuration = Seconds(30),
  numPartitions = ssc.sparkContext.defaultParallelism * 2  // Increase parallelism
)

Checkpointing Requirements

Window operations that compute incrementally (those taking an inverse function, plus countByWindow and countByValueAndWindow) must be checkpointed for fault tolerance:

// Incremental window operations require checkpointing
ssc.checkpoint("hdfs://checkpoint-dir")

val windowedData = stream.reduceByWindow(_ + _, _ - _, Minutes(10), Minutes(1))
// This would fail at startup without a checkpoint directory set
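
The checkpoint interval of a windowed stream can also be set explicitly; a brief sketch (the interval should be a multiple of the stream's slide duration):

// Checkpoint the windowed stream every 5 minutes rather than the default
windowedData.checkpoint(Minutes(5))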

Window Duration Constraints

Window and slide durations must be multiples of the batch duration:

val ssc = new StreamingContext(conf, Seconds(5))  // 5-second batches

// Valid - multiples of batch duration
val validWindow = stream.window(Seconds(30), Seconds(10))   // 6x and 2x batch duration

// Invalid - not multiples of batch duration  
// val invalidWindow = stream.window(Seconds(7), Seconds(3))  // Would throw exception
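
Because durations reduce to millisecond counts (see Types below), the constraint can be checked up front; a sketch with a hypothetical helper:

// Hypothetical helper: does duration `d` divide evenly into whole batches?
def isValidFor(d: Duration, batch: Duration): Boolean =
  d.milliseconds % batch.milliseconds == 0

isValidFor(Seconds(30), Seconds(5))  // true  (6 batches)
isValidFor(Seconds(7), Seconds(5))   // false (1.4 batches)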

Advanced Window Patterns

Custom Window Aggregations

Complex aggregations using transform with window operations. This sketch assumes a hypothetical `events` stream of type DStream[(String, Double)], i.e. (category, value) pairs:

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions._

val customAggregation = events.window(Minutes(5), Minutes(1)).transform { rdd =>
  val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
  import sqlContext.implicits._

  rdd.toDF("category", "value")
    .groupBy("category")
    .agg(
      count("*").as("count"),
      avg("value").as("average"),
      stddev_pop("value").as("stddev")  // population stddev avoids nulls for single-element groups
    )
    .rdd
    .map(row => (row.getString(0), (row.getLong(1), row.getDouble(2), row.getDouble(3))))
}

Multi-Level Windows

Combining different window sizes for hierarchical analysis:

import org.apache.spark.rdd.RDD

val stream = ssc.socketTextStream("localhost", 9999).map(_.toInt)

// Short-term trends (1-minute window)
val shortTerm = stream.reduceByWindow(_ + _, Minutes(1), Minutes(1))

// Long-term trends (10-minute window)
val longTerm = stream.reduceByWindow(_ + _, Minutes(10), Minutes(1))

// Combine for trend analysis; transformWith requires both streams to
// share the same slide duration (1 minute here)
val trendAnalysis = shortTerm.transformWith(longTerm, (short: RDD[Int], long: RDD[Int]) => {
  val shortAvg = if (short.isEmpty()) 0.0 else short.collect().head.toDouble
  // Normalize the 10-minute sum to a per-minute figure for comparison
  val longAvg = if (long.isEmpty()) 0.0 else long.collect().head.toDouble / 10.0

  short.sparkContext.parallelize(Seq((shortAvg, longAvg, shortAvg - longAvg)))
})

Sliding Window Joins

Joining data from different time windows:

val stream1 = ssc.socketTextStream("host1", 9999).map(line => (line.split(",")(0), line))
val stream2 = ssc.socketTextStream("host2", 9999).map(line => (line.split(",")(0), line))

// Join data within 30-second windows
val windowedJoin = stream1
  .window(Seconds(30), Seconds(10))
  .join(stream2.window(Seconds(30), Seconds(10)))

// Join with different window sizes (valid because both windows slide
// at the default batch interval, which join requires to match)
val asymmetricJoin = stream1
  .window(Seconds(60))                 // 1-minute window for stream1
  .join(stream2.window(Seconds(30)))   // 30-second window for stream2

Types

// Window operation related types
case class Duration(milliseconds: Long) {
  def +(other: Duration): Duration
  def *(factor: Int): Duration
  def /(divisor: Int): Duration
}

case class Time(milliseconds: Long) {
  def floor(duration: Duration): Time
  def until(endTime: Time, interval: Duration): Seq[Time]
}

// Helper objects for common durations
object Seconds {
  def apply(seconds: Long): Duration = Duration(seconds * 1000)
}

object Minutes {
  def apply(minutes: Long): Duration = Duration(minutes * 60 * 1000)  
}

object Hours {
  def apply(hours: Long): Duration = Duration(hours * 60 * 60 * 1000)
}
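
For illustration, a few expressions using these helpers (a sketch based on the definitions above):

val windowLen = Minutes(2)             // Duration(120000)
val slideLen  = Seconds(30)            // Duration(30000)

val extended = windowLen + slideLen    // Duration(150000)
val doubled  = windowLen * 2           // Duration(240000)
val halved   = windowLen / 2           // Duration(60000)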