Window operations in Spark Streaming allow you to apply transformations over a sliding window of data. These operations are essential for time-based aggregations, trend analysis, and processing data across multiple batches.
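For orientation, here is a minimal end-to-end sketch; the socket source, host/port, checkpoint path, and the 10-second batch interval are illustrative choices, not requirements of the API described below:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
val conf = new SparkConf().setMaster("local[2]").setAppName("WindowExample") // local[2]: one thread for the receiver, one for processing
val ssc = new StreamingContext(conf, Seconds(10))   // batch interval: 10 seconds
ssc.checkpoint("/tmp/spark-checkpoint")             // needed below: countByWindow uses an inverse reduce internally
val lines = ssc.socketTextStream("localhost", 9999)
val counts = lines.countByWindow(Seconds(60), Seconds(20)) // count lines seen in the last 60s, recomputed every 20s
counts.print()
ssc.start()
ssc.awaitTermination()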
Core windowing functionality for creating time-based data windows.
/**
* Create windowed DStream with default slide duration
* @param windowDuration - Width of the window (must be multiple of batch duration)
* @returns DStream containing data from the specified window
*/
def window(windowDuration: Duration): DStream[T]
/**
* Create windowed DStream with custom slide duration
* @param windowDuration - Width of the window (must be multiple of batch duration)
* @param slideDuration - Sliding interval (must be multiple of batch duration)
* @returns DStream containing data from the specified sliding window
*/
def window(windowDuration: Duration, slideDuration: Duration): DStream[T]
Usage Examples:
val lines = ssc.socketTextStream("localhost", 9999)
// 30-second window, sliding every batch interval (the default slide duration)
val windowedLines = lines.window(Seconds(30))
// 30-second window, sliding every 10 seconds
val slidingWindow = lines.window(Seconds(30), Seconds(10))
Window operations that count elements over time windows.
/**
* Count elements over a sliding window
* @param windowDuration - Width of the window
* @param slideDuration - Sliding interval of the window
* @returns DStream of Long values representing element counts
*/
def countByWindow(windowDuration: Duration, slideDuration: Duration): DStream[Long]
/**
* Count occurrences of each unique element over a sliding window
* @param windowDuration - Width of the window
* @param slideDuration - Sliding interval of the window
* @param numPartitions - Number of partitions for result (optional)
* @returns DStream of (element, count) pairs over the window
*/
def countByValueAndWindow(
windowDuration: Duration,
slideDuration: Duration,
numPartitions: Int = ssc.sc.defaultParallelism
): DStream[(T, Long)]
/**
* Count occurrences with timeout for inactive elements
* @param windowDuration - Width of the window
* @param slideDuration - Sliding interval of the window
* @param numPartitions - Number of partitions for result
* @param timeout - Timeout duration for inactive elements
* @returns DStream of (element, count) pairs with timeout handling
*/
def countByValueAndWindow(
windowDuration: Duration,
slideDuration: Duration,
numPartitions: Int,
timeout: Duration
): DStream[(T, Long)]
Window operations that reduce/aggregate data over time windows.
/**
* Reduce elements over a sliding window using associative function
* @param reduceFunc - Associative and commutative reduce function
* @param windowDuration - Width of the window
* @param slideDuration - Sliding interval of the window
* @returns DStream with reduced results over windows
*/
def reduceByWindow(
reduceFunc: (T, T) => T,
windowDuration: Duration,
slideDuration: Duration
): DStream[T]
/**
* Efficient reduce over window with inverse function
* @param reduceFunc - Associative reduce function
* @param invReduceFunc - Inverse of the reduce function for removing old values
* @param windowDuration - Width of the window
* @param slideDuration - Sliding interval of the window
* @returns DStream with reduced results using incremental computation
*/
def reduceByWindow(
reduceFunc: (T, T) => T,
invReduceFunc: (T, T) => T,
windowDuration: Duration,
slideDuration: Duration
): DStream[T]
Usage Examples:
val numbers = ssc.socketTextStream("localhost", 9999).map(_.toInt)
// Count numbers in 1-minute windows every 10 seconds
val windowCounts = numbers.countByWindow(Minutes(1), Seconds(10))
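// Count how often each distinct number appears over the same window
// (a sketch using countByValueAndWindow, documented above; yields (value, count) pairs
// and, like countByWindow, requires checkpointing to be enabled -- see the checkpointing note below)
val perValueCounts = numbers.countByValueAndWindow(Minutes(1), Seconds(10))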
// Sum numbers over windows
val windowSums = numbers.reduceByWindow(_ + _, Minutes(1), Seconds(10))
// Efficient windowed sum with inverse function
val efficientSums = numbers.reduceByWindow(
_ + _, // Add new values
_ - _, // Subtract old values
Minutes(1), // 1-minute window
Seconds(10) // Slide every 10 seconds
)
Specialized window operations for key-value pair DStreams.
/**
* Group values by key over a sliding window
* @param windowDuration - Width of the window
* @param slideDuration - Sliding interval of the window (optional)
* @returns DStream of (key, iterable of values) over windows
*/
def groupByKeyAndWindow(windowDuration: Duration): DStream[(K, Iterable[V])]
def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration): DStream[(K, Iterable[V])]
/**
* Group by key with custom partitioning
* @param windowDuration - Width of the window
* @param slideDuration - Sliding interval of the window
* @param numPartitions - Number of partitions for result
* @returns DStream of (key, iterable of values) over windows
*/
def groupByKeyAndWindow(
windowDuration: Duration,
slideDuration: Duration,
numPartitions: Int
): DStream[(K, Iterable[V])]
/**
* Reduce values by key over sliding window
* @param reduceFunc - Associative function to combine values
* @param windowDuration - Width of the window
* @param slideDuration - Sliding interval of the window (optional)
* @returns DStream of (key, reduced value) over windows
*/
def reduceByKeyAndWindow(reduceFunc: (V, V) => V, windowDuration: Duration): DStream[(K, V)]
def reduceByKeyAndWindow(
reduceFunc: (V, V) => V,
windowDuration: Duration,
slideDuration: Duration
): DStream[(K, V)]
/**
* Efficient reduce by key with inverse function for incremental computation
* @param reduceFunc - Associative function to combine values
* @param invReduceFunc - Inverse function to remove old values
* @param windowDuration - Width of the window
* @param slideDuration - Sliding interval of the window
* @param numPartitions - Number of partitions for result (optional)
* @param filterFunc - Function to filter results (optional)
* @returns DStream of (key, reduced value) over windows
*/
def reduceByKeyAndWindow(
reduceFunc: (V, V) => V,
invReduceFunc: (V, V) => V,
windowDuration: Duration,
slideDuration: Duration,
numPartitions: Int = ssc.sc.defaultParallelism,
filterFunc: ((K, V)) => Boolean = null
): DStream[(K, V)]
Usage Examples:
val wordPairs = lines.flatMap(_.split(" ")).map((_, 1))
// Word count over 30-second windows
val windowedWordCounts = wordPairs.reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10))
// Efficient windowed word count with inverse
val efficientWordCounts = wordPairs.reduceByKeyAndWindow(
_ + _, // Add new counts
_ - _, // Subtract old counts
Seconds(30), // 30-second window
Seconds(10), // Slide every 10 seconds
2, // Use 2 partitions
_._2 > 0 // Filter out zero counts
)
// Group words by length over windows
val wordsByLength = lines
.flatMap(_.split(" "))
.map(word => (word.length, word))
.groupByKeyAndWindow(Seconds(60), Seconds(20))
Window operations require more memory as they maintain data across multiple batches:
// Performance characteristics of different window operations
// Memory efficient - only stores aggregated results
val efficientCount = numbers.reduceByWindow(_ + _, _ - _, Minutes(5), Seconds(30))
// Memory intensive - stores all windowed data
val memoryIntensive = numbers.window(Minutes(5), Seconds(30)).reduce(_ + _)
// Optimal partitioning for windowed operations
val optimizedWindow = wordPairs.reduceByKeyAndWindow(
_ + _, _ - _,
windowDuration = Minutes(5),
slideDuration = Seconds(30),
numPartitions = ssc.sparkContext.defaultParallelism * 2 // Increase parallelism
)
Window operations that use an inverse reduce function (including countByWindow and countByValueAndWindow, which rely on one internally) must be checkpointed for fault tolerance:
// Windowed reductions with an inverse function require a checkpoint directory
ssc.checkpoint("hdfs://checkpoint-dir")
val windowedData = stream.reduceByWindow(_ + _, _ - _, Minutes(10), Minutes(1))
// Without ssc.checkpoint(...) this DStream fails validation when the StreamingContext starts
Window and slide durations must be multiples of the batch duration:
val ssc = new StreamingContext(conf, Seconds(5)) // 5-second batches
// Valid - multiples of batch duration
val validWindow = stream.window(Seconds(30), Seconds(10)) // 6x and 2x batch duration
// Invalid - not multiples of batch duration
// val invalidWindow = stream.window(Seconds(7), Seconds(3)) // Would throw exception
Complex aggregations using transform with window operations:
// Assumes `stream` is a DStream[Int]; uses the pre-2.0 SQLContext API for the DataFrame aggregation
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions._
val customAggregation = stream.window(Minutes(5), Minutes(1)).transform { rdd =>
  val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
  import sqlContext.implicits._
  rdd.toDF("value")
    .groupBy("value")
    .agg(
      count("*").as("count"),
      avg("value").as("average"),
      stddev_pop("value").as("stddev") // population stddev is never null, even for single-row groups
    )
    .rdd
    .map(row => (row.getInt(0), (row.getLong(1), row.getDouble(2), row.getDouble(3))))
}
Combining different window sizes for hierarchical analysis:
val stream = ssc.socketTextStream("localhost", 9999).map(_.toInt)
// Short-term trends (1 minute)
val shortTerm = stream.reduceByWindow(_ + _, Seconds(60), Seconds(10))
// Long-term trends (10 minutes)
val longTerm = stream.reduceByWindow(_ + _, Minutes(10), Minutes(1))
// Combine for trend analysis
import org.apache.spark.rdd.RDD
val trendAnalysis = shortTerm.transformWith(longTerm, (short: RDD[Int], long: RDD[Int]) => {
  // Each reduceByWindow output RDD holds at most one element: the window sum
  val shortRate = if (short.isEmpty()) 0.0 else short.first().toDouble       // sum over the last minute
  val longRate = if (long.isEmpty()) 0.0 else long.first().toDouble / 10.0   // per-minute average over the last 10 minutes
  // Use the RDD's own SparkContext rather than capturing ssc in the closure
  short.sparkContext.parallelize(Seq((shortRate, longRate, shortRate - longRate)))
})
Joining data from different time windows:
val stream1 = ssc.socketTextStream("host1", 9999).map(line => (line.split(",")(0), line))
val stream2 = ssc.socketTextStream("host2", 9999).map(line => (line.split(",")(0), line))
// Join data within 30-second windows
val windowedJoin = stream1
.window(Seconds(30), Seconds(10))
.join(stream2.window(Seconds(30), Seconds(10)))
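// Outer joins over the same windows follow the same pattern (a sketch using the
// standard pair-DStream leftOuterJoin)
val windowedLeftJoin = stream1
  .window(Seconds(30), Seconds(10))
  .leftOuterJoin(stream2.window(Seconds(30), Seconds(10)))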
// Join with different window sizes
val asymmetricJoin = stream1
.window(Seconds(60)) // 1-minute window for stream1
.join(stream2.window(Seconds(30))) // 30-second window for stream2
// Window operation related types
case class Duration(milliseconds: Long) {
  def +(other: Duration): Duration
  def -(other: Duration): Duration
  def *(factor: Int): Duration
  def /(other: Duration): Double
  def isMultipleOf(other: Duration): Boolean
}
case class Time(milliseconds: Long) {
def floor(duration: Duration): Time
def until(endTime: Time, interval: Duration): Seq[Time]
}
// Helper objects for common durations
object Seconds {
def apply(seconds: Long): Duration = Duration(seconds * 1000)
}
object Minutes {
def apply(minutes: Long): Duration = Duration(minutes * 60 * 1000)
}
object Hours {
def apply(hours: Long): Duration = Duration(hours * 60 * 60 * 1000)
}
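A short sketch of how these helpers and operators compose; the values in the comments follow from the definitions above:
val batchInterval = Seconds(5)                        // Duration(5000)
val windowLength = Minutes(1)                         // Duration(60000)
val slideInterval = batchInterval * 2                 // Duration(10000)
val extendedWindow = windowLength + Seconds(30)       // Duration(90000)
val batchesPerWindow = windowLength / batchInterval   // 12.0
windowLength.isMultipleOf(batchInterval)              // true: a valid window for a 5-second batch interval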