# Window Operations

Window operations in Spark Streaming allow you to apply transformations over a sliding window of data. These operations are essential for time-based aggregations, trend analysis, and processing data across multiple batches.

## Capabilities

### Basic Window Operations

Core windowing functionality for creating time-based data windows.

```scala { .api }
/**
 * Create a windowed DStream that slides at this DStream's own slide duration
 * (the batch duration, for an input DStream)
 * @param windowDuration - Width of the window (must be a multiple of the batch duration)
 * @returns DStream containing data from the specified window
 */
def window(windowDuration: Duration): DStream[T]

/**
 * Create a windowed DStream with a custom slide duration
 * @param windowDuration - Width of the window (must be a multiple of the batch duration)
 * @param slideDuration - Sliding interval (must be a multiple of the batch duration)
 * @returns DStream containing data from the specified sliding window
 */
def window(windowDuration: Duration, slideDuration: Duration): DStream[T]
```

**Usage Examples:**

```scala
import org.apache.spark.streaming.Seconds

// Assumes an existing StreamingContext `ssc` created with 1-second batches
val lines = ssc.socketTextStream("localhost", 9999)

// 30-second window, sliding every batch (1 second)
val windowedLines = lines.window(Seconds(30))

// 30-second window, sliding every 10 seconds
val slidingWindow = lines.window(Seconds(30), Seconds(10))
```
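
To make the window and slide parameters concrete, the sketch below models `window` in plain Scala, with no Spark dependency, as lists of batch indices. It assumes batches are numbered from 0, that a window fires after every `slide` worth of batches, and that it covers the most recent `window` worth of batches; `WindowModel` and `windowsOver` are hypothetical names used only for illustration.

```scala
// Plain-Scala model of DStream.window semantics (illustrative only, not Spark code).
// Durations are in milliseconds; batch i covers [i * batchMs, (i + 1) * batchMs).
object WindowModel {
  /** For each window that fires, returns the indices of the batches it contains. */
  def windowsOver(numBatches: Int, batchMs: Long, windowMs: Long, slideMs: Long): Seq[Seq[Int]] = {
    require(windowMs % batchMs == 0, "window must be a multiple of the batch duration")
    require(slideMs % batchMs == 0, "slide must be a multiple of the batch duration")
    val batchesPerWindow = (windowMs / batchMs).toInt
    val batchesPerSlide = (slideMs / batchMs).toInt
    // A window fires after every `batchesPerSlide` batches and ends at the batch just completed.
    (batchesPerSlide - 1 until numBatches by batchesPerSlide).map { end =>
      // Early windows are partial because fewer batches than the window width exist yet.
      math.max(0, end - batchesPerWindow + 1) to end
    }
  }
}

// 1-second batches, 3-second window sliding every 2 seconds, over 6 batches
WindowModel.windowsOver(6, 1000, 3000, 2000).foreach(w => println(w.mkString("[", ",", "]")))
// prints [0,1], then [1,2,3], then [3,4,5]
```

Batches 1 and 3 each appear in two consecutive windows: that overlap is what a slide shorter than the window produces. The first window is partial because only two batches exist when it fires.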

### Counting Operations

Window operations that count elements over time windows. Both are implemented with an inverse reduce function, so they need a checkpoint directory (see Checkpointing Requirements).

```scala { .api }
/**
 * Count elements over a sliding window
 * @param windowDuration - Width of the window
 * @param slideDuration - Sliding interval of the window
 * @returns DStream of Long values, one element count per window
 */
def countByWindow(windowDuration: Duration, slideDuration: Duration): DStream[Long]

/**
 * Count occurrences of each distinct element over a sliding window.
 * Elements whose count drops back to zero are filtered out of the result.
 * @param windowDuration - Width of the window
 * @param slideDuration - Sliding interval of the window
 * @param numPartitions - Number of partitions for the result (optional)
 * @returns DStream of (element, count) pairs over the window
 */
def countByValueAndWindow(
    windowDuration: Duration,
    slideDuration: Duration,
    numPartitions: Int = ssc.sc.defaultParallelism
  )(implicit ord: Ordering[T] = null): DStream[(T, Long)]
```
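
As a plain-Scala illustration of what each counting operation returns for a single window (not how Spark computes it, since Spark maintains these counts incrementally), `countByWindow` produces one total while `countByValueAndWindow` produces one `(element, count)` pair per distinct element. The `CountingModel` object and the sample batches are hypothetical.

```scala
// Illustrative model of the output of the windowed counting operations for one window.
object CountingModel {
  /** Total number of elements across the batches of one window (what countByWindow reports). */
  def countWindow[T](window: Seq[Seq[T]]): Long = window.map(_.size.toLong).sum

  /** Occurrences of each distinct element in one window (what countByValueAndWindow reports). */
  def countByValueWindow[T](window: Seq[Seq[T]]): Map[T, Long] =
    window.flatten.groupBy(identity).map { case (value, occurrences) => value -> occurrences.size.toLong }
}

// Three 1-second batches that fall inside one 3-second window (hypothetical data)
val window = Seq(Seq("a", "b"), Seq("a"), Seq("c", "a"))
println(CountingModel.countWindow(window))        // prints 5
println(CountingModel.countByValueWindow(window)) // a -> 3, b -> 1, c -> 1 (map order may vary)
```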

### Reduction Operations

Window operations that reduce/aggregate data over time windows.

```scala { .api }
/**
 * Reduce elements over a sliding window using an associative function
 * @param reduceFunc - Associative and commutative reduce function
 * @param windowDuration - Width of the window
 * @param slideDuration - Sliding interval of the window
 * @returns DStream with reduced results over windows
 */
def reduceByWindow(
    reduceFunc: (T, T) => T,
    windowDuration: Duration,
    slideDuration: Duration
  ): DStream[T]

/**
 * Efficient reduce over a window with an inverse function: each window value is derived
 * from the previous one by reducing in new batches and "inverse reducing" expired ones.
 * Requires a checkpoint directory.
 * @param reduceFunc - Associative and commutative reduce function
 * @param invReduceFunc - Inverse of reduceFunc, such that invReduceFunc(reduceFunc(x, y), x) == y
 * @param windowDuration - Width of the window
 * @param slideDuration - Sliding interval of the window
 * @returns DStream with reduced results using incremental computation
 */
def reduceByWindow(
    reduceFunc: (T, T) => T,
    invReduceFunc: (T, T) => T,
    windowDuration: Duration,
    slideDuration: Duration
  ): DStream[T]
```

**Usage Examples:**

```scala
import org.apache.spark.streaming.{Minutes, Seconds}

// Assumes an existing StreamingContext `ssc` with a checkpoint directory set,
// which countByWindow and the inverse-function variant below require
val numbers = ssc.socketTextStream("localhost", 9999).map(_.toInt)

// Count numbers in 1-minute windows every 10 seconds
val windowCounts = numbers.countByWindow(Minutes(1), Seconds(10))

// Sum numbers over windows
val windowSums = numbers.reduceByWindow(_ + _, Minutes(1), Seconds(10))

// Efficient windowed sum with inverse function
val efficientSums = numbers.reduceByWindow(
  _ + _,      // Add values from batches entering the window
  _ - _,      // Subtract values from batches leaving the window
  Minutes(1), // 1-minute window
  Seconds(10) // Slide every 10 seconds
)
```
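
The inverse-function variant avoids re-reducing the whole window on every slide. The sketch below illustrates the idea in plain Scala (it is not Spark's implementation): it keeps one pre-reduced value per batch, adds the batches that entered the window, subtracts with the inverse function the batches that left, and checks the result against a full recomputation. `IncrementalWindow` and `slidingTotals` are hypothetical names, and seeding the window with `0` assumes integer addition.

```scala
// Sketch of incremental window reduction with an inverse function (illustrative only).
object IncrementalWindow {
  /**
   * batchSums(i) is the already-reduced value of batch i. Returns the window value after each
   * slide, computed as previous result + entering batches - leaving batches.
   */
  def slidingTotals(batchSums: Seq[Int], batchesPerWindow: Int, batchesPerSlide: Int)(
      reduce: (Int, Int) => Int,
      invReduce: (Int, Int) => Int): Seq[Int] = {
    var current = 0  // value of the (initially empty) window; 0 is the identity for +
    var prevEnd = -1 // last batch included in the previous window
    (batchesPerSlide - 1 until batchSums.size by batchesPerSlide).map { end =>
      // Fold in the batches that entered the window since the previous slide.
      for (i <- prevEnd + 1 to end) current = reduce(current, batchSums(i))
      // Undo the batches that dropped out of the window, using the inverse function.
      val oldStart = math.max(0, prevEnd - batchesPerWindow + 1)
      val newStart = math.max(0, end - batchesPerWindow + 1)
      for (i <- oldStart until newStart) current = invReduce(current, batchSums(i))
      prevEnd = end
      current
    }
  }
}

val perBatch = Seq(3, 1, 4, 1, 5, 9, 2, 6) // hypothetical per-batch sums
val incremental =
  IncrementalWindow.slidingTotals(perBatch, batchesPerWindow = 4, batchesPerSlide = 2)(_ + _, _ - _)
// Reference result: reduce every window from scratch
val full = (1 until perBatch.size by 2).map(end => perBatch.slice(math.max(0, end - 3), end + 1).sum)
println(incremental.mkString(", ")) // 4, 9, 19, 22
assert(incremental == full)
```

`_ - _` works here because subtraction exactly undoes addition. A reduction such as `math.max` has no inverse, so it can only use the overload without `invReduceFunc`.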

### Key-Value Window Operations

Window operations for key-value pair DStreams (`DStream[(K, V)]`), available through the implicit conversion to `PairDStreamFunctions`.

```scala { .api }
/**
 * Group values by key over a sliding window
 * @param windowDuration - Width of the window
 * @param slideDuration - Sliding interval of the window (optional; defaults to this DStream's slide duration)
 * @returns DStream of (key, iterable of values) over windows
 */
def groupByKeyAndWindow(windowDuration: Duration): DStream[(K, Iterable[V])]
def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration): DStream[(K, Iterable[V])]

/**
 * Group by key with a custom number of partitions
 * @param windowDuration - Width of the window
 * @param slideDuration - Sliding interval of the window
 * @param numPartitions - Number of partitions for the result
 * @returns DStream of (key, iterable of values) over windows
 */
def groupByKeyAndWindow(
    windowDuration: Duration,
    slideDuration: Duration,
    numPartitions: Int
  ): DStream[(K, Iterable[V])]

/**
 * Reduce values by key over a sliding window
 * @param reduceFunc - Associative and commutative function to combine values
 * @param windowDuration - Width of the window
 * @param slideDuration - Sliding interval of the window (optional; defaults to this DStream's slide duration)
 * @returns DStream of (key, reduced value) over windows
 */
def reduceByKeyAndWindow(reduceFunc: (V, V) => V, windowDuration: Duration): DStream[(K, V)]
def reduceByKeyAndWindow(
    reduceFunc: (V, V) => V,
    windowDuration: Duration,
    slideDuration: Duration
  ): DStream[(K, V)]

/**
 * Efficient reduce by key with an inverse function for incremental computation.
 * Requires a checkpoint directory.
 * @param reduceFunc - Associative and commutative function to combine values
 * @param invReduceFunc - Inverse of reduceFunc, used to remove values of batches leaving the window
 * @param windowDuration - Width of the window
 * @param slideDuration - Sliding interval of the window
 * @param numPartitions - Number of partitions for the result (optional)
 * @param filterFunc - Predicate deciding which (key, value) pairs to keep (optional);
 *                     use it to drop keys whose value has returned to zero
 * @returns DStream of (key, reduced value) over windows
 */
def reduceByKeyAndWindow(
    reduceFunc: (V, V) => V,
    invReduceFunc: (V, V) => V,
    windowDuration: Duration,
    slideDuration: Duration,
    numPartitions: Int = ssc.sc.defaultParallelism,
    filterFunc: ((K, V)) => Boolean = null
  ): DStream[(K, V)]
```

**Usage Examples:**

```scala
import org.apache.spark.streaming.Seconds

// Assumes `lines: DStream[String]` from the earlier example and a checkpoint
// directory set on `ssc` (required by the inverse-function variant)
val wordPairs = lines.flatMap(_.split(" ")).map((_, 1))

// Word count over 30-second windows, sliding every 10 seconds
// (explicit parameter types help Scala pick among the overloads)
val windowedWordCounts =
  wordPairs.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(30), Seconds(10))

// Efficient windowed word count with inverse
val efficientWordCounts = wordPairs.reduceByKeyAndWindow(
  (a: Int, b: Int) => a + b,       // Add counts from batches entering the window
  (a: Int, b: Int) => a - b,       // Subtract counts from batches leaving the window
  Seconds(30),                     // 30-second window
  Seconds(10),                     // Slide every 10 seconds
  2,                               // Use 2 partitions
  (kv: (String, Int)) => kv._2 > 0 // Drop words whose count fell to zero
)

// Group words by length over windows
val wordsByLength = lines
  .flatMap(_.split(" "))
  .map(word => (word.length, word))
  .groupByKeyAndWindow(Seconds(60), Seconds(20))
```
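
The `filterFunc` argument matters because, with incremental updates, a key whose occurrences have all slid out of the window is left with a value of 0 rather than disappearing. This plain-Scala sketch models one slide of per-key window state (it is not Spark's code; `KeyedWindow` and the sample data are hypothetical) and shows the stale key being dropped only when a filter is supplied.

```scala
// Illustrative model of one slide of reduceByKeyAndWindow with an inverse function.
object KeyedWindow {
  /** Applies one slide to per-key window state: add entering pairs, subtract leaving pairs, then filter. */
  def slide(
      state: Map[String, Int],
      entering: Seq[(String, Int)],
      leaving: Seq[(String, Int)],
      filterFunc: ((String, Int)) => Boolean = _ => true): Map[String, Int] = {
    val added = entering.foldLeft(state) { case (acc, (k, v)) => acc.updated(k, acc.getOrElse(k, 0) + v) }
    val removed = leaving.foldLeft(added) { case (acc, (k, v)) => acc.updated(k, acc.getOrElse(k, 0) - v) }
    removed.filter(filterFunc)
  }
}

val previous = Map("spark" -> 2, "flink" -> 1) // counts for the previous window
val entering = Seq("spark" -> 1)               // words from the batch that just arrived
val leaving = Seq("flink" -> 1, "spark" -> 1)  // words from the batch that slid out
println(KeyedWindow.slide(previous, entering, leaving))           // flink stays, with count 0
println(KeyedWindow.slide(previous, entering, leaving, _._2 > 0)) // flink is dropped
```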

## Window Operation Characteristics

### Memory Requirements

Window operations hold on to data across multiple batches, so they need more memory than per-batch transformations:

- **Window Size Impact**: Larger windows keep more batches of historical data
- **Slide Duration Impact**: Smaller slide durations produce more overlapping windows, so each batch takes part in more window computations
- **Checkpointing**: The incremental variants (those taking an inverse function, plus `countByWindow` and `countByValueAndWindow`) require checkpointing for fault tolerance
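
A rough sizing calculation makes these effects concrete. In this sketch (hypothetical `WindowSizing` helper; durations in milliseconds; assumes the window is a multiple of the slide), the number of batches a window spans depends only on the window duration, while the number of overlapping windows each batch contributes to grows as the slide shrinks.

```scala
// Back-of-the-envelope sizing for a window operation (hypothetical helper, not a Spark API).
final case class WindowCost(batchesSpanned: Long, windowsPerBatch: Long)

object WindowSizing {
  def cost(batchMs: Long, windowMs: Long, slideMs: Long): WindowCost = {
    require(windowMs % batchMs == 0 && slideMs % batchMs == 0, "durations must be multiples of the batch")
    require(windowMs % slideMs == 0, "this sketch assumes the window is a multiple of the slide")
    WindowCost(
      batchesSpanned = windowMs / batchMs, // batches of data one window covers
      windowsPerBatch = windowMs / slideMs // overlapping windows each batch is part of
    )
  }
}

// 1-second batches, 5-minute window, sliding every 30 seconds vs every 5 seconds
println(WindowSizing.cost(batchMs = 1000, windowMs = 300000, slideMs = 30000)) // WindowCost(300,10)
println(WindowSizing.cost(batchMs = 1000, windowMs = 300000, slideMs = 5000))  // WindowCost(300,60)
```

Both configurations span 300 batches, but shrinking the slide from 30 seconds to 5 seconds raises the number of window computations each batch takes part in from 10 to 60.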

### Performance Considerations

```scala
import org.apache.spark.streaming.{Minutes, Seconds}

// Performance characteristics of different window operations
// (assumes `numbers: DStream[Int]` and `wordPairs: DStream[(String, Int)]` from the examples above)

// Incremental: keeps per-batch partial results and updates the previous window's value
// (requires checkpointing)
val efficientSum = numbers.reduceByWindow(_ + _, _ - _, Minutes(5), Seconds(30))

// Memory intensive: keeps every raw element of the 5-minute window and re-reduces it on each slide
val memoryIntensive = numbers.window(Minutes(5), Seconds(30)).reduce(_ + _)

// Increase parallelism for large windowed shuffles
val optimizedWindow = wordPairs.reduceByKeyAndWindow(
  (a: Int, b: Int) => a + b,
  (a: Int, b: Int) => a - b,
  windowDuration = Minutes(5),
  slideDuration = Seconds(30),
  numPartitions = ssc.sparkContext.defaultParallelism * 2
)
```
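
### Checkpointing Requirements

Window operations that maintain state incrementally (any variant taking an inverse reduce function, as well as `countByWindow` and `countByValueAndWindow`) need a checkpoint directory for fault tolerance:

```scala
import org.apache.spark.streaming.Minutes

// Set a checkpoint directory before starting the context
ssc.checkpoint("hdfs://checkpoint-dir")

// Assumes `stream: DStream[Int]`; the inverse function makes this operation stateful
val windowedData = stream.reduceByWindow(_ + _, _ - _, Minutes(10), Minutes(1))
// Without a checkpoint directory, ssc.start() fails for this operation
```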

### Window Duration Constraints

Window and slide durations must be multiples of the batch duration (more precisely, of the parent DStream's slide duration, which equals the batch duration for input streams):

```scala
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Assumes an existing SparkConf `conf`
val ssc = new StreamingContext(conf, Seconds(5)) // 5-second batches
val stream = ssc.socketTextStream("localhost", 9999)

// Valid - multiples of batch duration
val validWindow = stream.window(Seconds(30), Seconds(10)) // 6x and 2x batch duration

// Invalid - not multiples of batch duration; creating this windowed DStream throws an exception
// val invalidWindow = stream.window(Seconds(7), Seconds(3))
```
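
The same rule can be written as a small check. This sketch follows the idea behind Spark's `Duration.isMultipleOf` using plain millisecond values and reports the window and slide in batches; `WindowValidation`, `validateWindow`, and the error messages are hypothetical, not Spark's actual API or error text.

```scala
// Illustrative check of the window/slide constraints (not Spark's code or messages).
object WindowValidation {
  /** Returns the window and slide measured in batches, or a description of the violated rule. */
  def validateWindow(batchMs: Long, windowMs: Long, slideMs: Long): Either[String, (Long, Long)] =
    if (windowMs % batchMs != 0) Left(s"window ${windowMs}ms is not a multiple of batch ${batchMs}ms")
    else if (slideMs % batchMs != 0) Left(s"slide ${slideMs}ms is not a multiple of batch ${batchMs}ms")
    else Right((windowMs / batchMs, slideMs / batchMs))
}

// 5-second batches, as in the example above
println(WindowValidation.validateWindow(5000, 30000, 10000)) // Right((6,2))
println(WindowValidation.validateWindow(5000, 7000, 3000))   // Left(window 7000ms is not a multiple of batch 5000ms)
```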
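
## Advanced Window Patterns

### Custom Window Aggregations

Complex aggregations can be expressed by applying `transform` to a windowed DStream and running Spark SQL on each window's RDD:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{avg, count, stddev}
import org.apache.spark.streaming.Minutes

// Assumes `stream: DStream[(String, Double)]` of (key, measurement) pairs
val customAggregation = stream.window(Minutes(5), Minutes(1)).transform { rdd =>
  // The transform function runs on the driver, so a SparkSession can be obtained here
  val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
  import spark.implicits._

  rdd.toDF("key", "value")
    .groupBy("key")
    .agg(
      count("*").as("count"),
      avg("value").as("average"),
      stddev("value").as("stddev") // sample stddev; may be null, e.g. for a single row
    )
    .rdd
    .map { row =>
      val sd = if (row.isNullAt(3)) Double.NaN else row.getDouble(3)
      (row.getString(0), (row.getLong(1), row.getDouble(2), sd))
    }
}
```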
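
For reference, the sketch below computes the same per-key statistics for one window in plain Scala, without Spark SQL (hypothetical `WindowStats` helper and sample data). Spark SQL's `stddev` is the sample standard deviation, which is undefined for a single value; the sketch returns `None` in that case.

```scala
// Plain-Scala model of count / average / sample standard deviation per key for one window.
object WindowStats {
  final case class Stats(count: Long, average: Double, stddev: Option[Double])

  def perKey(window: Seq[(String, Double)]): Map[String, Stats] =
    window.groupBy(_._1).map { case (key, pairs) =>
      val values = pairs.map(_._2)
      val n = values.size
      val mean = values.sum / n
      // Sample standard deviation (n - 1 denominator), undefined for a single value
      val sd =
        if (n < 2) None
        else Some(math.sqrt(values.map(v => (v - mean) * (v - mean)).sum / (n - 1)))
      key -> Stats(n.toLong, mean, sd)
    }
}

val windowData = Seq("a" -> 2.0, "a" -> 4.0, "a" -> 6.0, "b" -> 5.0)
WindowStats.perKey(windowData).foreach(println)
```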
299
300
### Multi-Level Windows
301
302
Combining different window sizes for hierarchical analysis:
303
304
```scala
305
val stream = ssc.socketTextStream("localhost", 9999).map(_.toInt)
306
307
// Short-term trends (1 minute)
308
val shortTerm = stream.reduceByWindow(_ + _, Seconds(60), Seconds(10))
309
310
// Long-term trends (10 minutes)
311
val longTerm = stream.reduceByWindow(_ + _, Minutes(10), Minutes(1))
312
313
// Combine for trend analysis
314
val trendAnalysis = shortTerm.transformWith(longTerm, (short: RDD[Int], long: RDD[Int]) => {
315
val shortAvg = if (short.isEmpty()) 0.0 else short.collect().head.toDouble
316
val longAvg = if (long.isEmpty()) 0.0 else long.collect().head.toDouble / 10.0
317
318
ssc.sparkContext.parallelize(Seq((shortAvg, longAvg, shortAvg - longAvg)))
319
})
320
```
321
322
### Sliding Window Joins
323
324
Joining data from different time windows:
325
326
```scala
327
val stream1 = ssc.socketTextStream("host1", 9999).map(line => (line.split(",")(0), line))
328
val stream2 = ssc.socketTextStream("host2", 9999).map(line => (line.split(",")(0), line))
329
330
// Join data within 30-second windows
331
val windowedJoin = stream1
332
.window(Seconds(30), Seconds(10))
333
.join(stream2.window(Seconds(30), Seconds(10)))
334
335
// Join with different window sizes
336
val asymmetricJoin = stream1
337
.window(Seconds(60)) // 1-minute window for stream1
338
.join(stream2.window(Seconds(30))) // 30-second window for stream2
339
```
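
Because consecutive windows overlap, a matching pair of records can be joined in several successive outputs. This plain-Scala sketch (hypothetical `WindowedJoinModel`; each batch is a list of keyed records) joins the two streams window by window, combining the windows that both sides produce at the same time, and shows one match emitted once per window that still contains it.

```scala
// Illustrative model of joining two windowed keyed streams (not Spark code).
object WindowedJoinModel {
  type Batch = Seq[(String, String)]

  /** Inner join of the records in one window of each stream. */
  def joinWindow(left: Seq[Batch], right: Seq[Batch]): Seq[(String, (String, String))] = {
    val l = left.flatten
    val r = right.flatten
    for ((k1, v1) <- l; (k2, v2) <- r if k1 == k2) yield (k1, (v1, v2))
  }

  /** Joins window by window; windows end every `slide` batches and span `width` batches. */
  def slidingJoin(left: Seq[Batch], right: Seq[Batch], width: Int, slide: Int): Seq[Seq[(String, (String, String))]] =
    (slide - 1 until left.size by slide).map { end =>
      val start = math.max(0, end - width + 1)
      joinWindow(left.slice(start, end + 1), right.slice(start, end + 1))
    }
}

// Both matching records arrive in batch 0; later batches are empty (hypothetical data)
val left: Seq[WindowedJoinModel.Batch] = Seq(Seq("k1" -> "order"), Seq.empty, Seq.empty, Seq.empty)
val right: Seq[WindowedJoinModel.Batch] = Seq(Seq("k1" -> "payment"), Seq.empty, Seq.empty, Seq.empty)
WindowedJoinModel.slidingJoin(left, right, width = 3, slide = 1).foreach(println)
```

With a window three batches wide sliding one batch at a time, the `k1` match appears in three consecutive outputs and then disappears, so consumers of a windowed join should expect repeated results.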

## Types

```scala { .api }
// Window operation related types (simplified signatures)
case class Duration(milliseconds: Long) {
  def +(that: Duration): Duration
  def -(that: Duration): Duration
  def *(times: Int): Duration
  def /(that: Duration): Double // ratio of two durations
  def isMultipleOf(that: Duration): Boolean
}

case class Time(milliseconds: Long) {
  def floor(that: Duration): Time
  def until(that: Time, interval: Duration): Seq[Time]
}

// Helper objects for common durations
object Milliseconds {
  def apply(milliseconds: Long): Duration = Duration(milliseconds)
}

object Seconds {
  def apply(seconds: Long): Duration = Duration(seconds * 1000)
}

object Minutes {
  def apply(minutes: Long): Duration = Duration(minutes * 60 * 1000)
}
```