# Windowing Operations

`WindowedStream` enables bounded computations on unbounded streams by grouping elements into finite windows based on time or element count. Windowing is what makes aggregations and other batch-style computations possible on streaming data.

## Capabilities

### Window Configuration

Configure how late-arriving data is handled, when a window fires (trigger), and which elements are removed from a window around its evaluation (evictor).

```scala { .api }
class WindowedStream[T, K, W <: Window] {

  /**
   * Keeps a window's state for an extra `lateness` period after the watermark
   * has passed the end of the window. Elements arriving within that period are
   * still added to the window instead of being dropped.
   *
   * @param lateness how long past the end of a window late elements are still accepted
   * @return this WindowedStream, configured with the given allowed lateness
   */
  def allowedLateness(lateness: Time): WindowedStream[T, K, W]

  /**
   * Routes elements that arrive too late to be added to their window
   * to a side output, instead of discarding them.
   *
   * @param outputTag identifies the side output that receives the late elements
   * @return this WindowedStream, emitting late data under `outputTag`
   */
  def sideOutputLateData(outputTag: OutputTag[T]): WindowedStream[T, K, W]

  /**
   * Replaces the window assigner's default trigger. The trigger decides
   * when a window is evaluated and/or purged.
   *
   * @param trigger the trigger that controls window firing
   * @return this WindowedStream, firing windows according to `trigger`
   */
  def trigger(trigger: Trigger[_ >: T, _ >: W]): WindowedStream[T, K, W]

  /**
   * Installs an evictor that may remove elements from a window before
   * and/or after the window function runs.
   *
   * @param evictor the evictor applied to each window's contents
   * @return this WindowedStream, evicting elements with `evictor`
   */
  def evictor(evictor: Evictor[_ >: T, _ >: W]): WindowedStream[T, K, W]
}
```

### Reduction Operations

Apply reduction functions that incrementally combine the elements of each window, two at a time, into a single value of the same type.

```scala { .api }
class WindowedStream[T, K, W <: Window] {

  /**
   * Reduce elements in each window using a ReduceFunction.
   *
   * @param function ReduceFunction that combines two elements into one
   * @return DataStream with one reduced element per window
   */
  def reduce(function: ReduceFunction[T]): DataStream[T]

  /**
   * Reduce elements in each window using a Scala function.
   *
   * @param function function that combines two elements into one
   * @return DataStream with one reduced element per window
   */
  def reduce(function: (T, T) => T): DataStream[T]

  /**
   * Reduce with pre-aggregation and a WindowFunction.
   * The window function receives the single pre-aggregated element of the window.
   *
   * @param preAggregator ReduceFunction for incremental pre-aggregation
   * @param function WindowFunction for final processing (key and window metadata available)
   * @tparam R type of the records emitted by the window function
   * @return DataStream with window function results
   */
  def reduce[R: TypeInformation](
    preAggregator: ReduceFunction[T],
    function: WindowFunction[T, R, K, W]
  ): DataStream[R]

  /**
   * Reduce with a pre-aggregation function and a window closure.
   * The closure is called once per window with `(key, window, elements, collector)`,
   * where `elements` holds only the single pre-aggregated element.
   *
   * @param preAggregator function for incremental pre-aggregation
   * @param windowFunction closure for final processing (key and window metadata available)
   * @tparam R type of the records emitted through the collector
   * @return DataStream with window function results
   */
  def reduce[R: TypeInformation](
    preAggregator: (T, T) => T,
    windowFunction: (K, W, Iterable[T], Collector[R]) => Unit
  ): DataStream[R]

  /**
   * Reduce with pre-aggregation and a ProcessWindowFunction.
   * The process function receives the single pre-aggregated element of the window.
   *
   * @param preAggregator ReduceFunction for incremental pre-aggregation
   * @param function ProcessWindowFunction for final processing (context with state and side outputs)
   * @tparam R type of the records emitted by the process window function
   * @return DataStream with process window function results
   */
  def reduce[R: TypeInformation](
    preAggregator: ReduceFunction[T],
    function: ProcessWindowFunction[T, R, K, W]
  ): DataStream[R]
}
```

**Usage Examples:**

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

case class SensorReading(sensorId: String, temperature: Double, timestamp: Long)

// `env` is a StreamExecutionEnvironment created outside this snippet
val readings = env.fromElements(
  SensorReading("sensor1", 20.0, 1000L),
  SensorReading("sensor1", 25.0, 2000L),
  SensorReading("sensor1", 22.0, 3000L)
)

val keyedReadings = readings
  .assignAscendingTimestamps(_.timestamp)
  .keyBy(_.sensorId)

// Simple reduction - get maximum temperature per window
val maxTemps = keyedReadings
  .window(TumblingEventTimeWindows.of(Time.seconds(10)))
  .reduce((r1, r2) => if (r1.temperature > r2.temperature) r1 else r2)

// Reduction with window function - add window info.
// The reduce step runs first, so the window function's iterable holds exactly
// one element: the reading with the highest temperature in that window.
val maxTempsWithWindow = keyedReadings
  .window(TumblingEventTimeWindows.of(Time.seconds(10)))
  .reduce(
    (r1: SensorReading, r2: SensorReading) => if (r1.temperature > r2.temperature) r1 else r2,
    (key: String, window: TimeWindow, maxReadings: Iterable[SensorReading], out: Collector[(String, Double, Long, Long)]) => {
      val maxReading = maxReadings.head
      out.collect((key, maxReading.temperature, window.getStart, window.getEnd))
    }
  )
```

### Aggregation Operations

Apply an `AggregateFunction` to compute window results incrementally. Unlike `reduce`, the accumulator and result types may differ from the input type.

```scala { .api }
class WindowedStream[T, K, W <: Window] {

  /**
   * Apply an AggregateFunction to window elements.
   *
   * @param aggregateFunction function for incremental aggregation
   * @tparam ACC accumulator type
   * @tparam R result type produced by `aggregateFunction.getResult`
   * @return DataStream with one aggregation result per window
   */
  def aggregate[ACC: TypeInformation, R: TypeInformation](
    aggregateFunction: AggregateFunction[T, ACC, R]
  ): DataStream[R]

  /**
   * Apply incremental aggregation followed by a WindowFunction.
   * The window function receives the single aggregated value `V` of the window,
   * together with its key and window metadata, and may emit a different type `R`.
   *
   * @param aggregateFunction function for incremental aggregation
   * @param windowFunction function for final window processing
   * @tparam ACC accumulator type
   * @tparam V aggregate result type passed to the window function
   * @tparam R type of the records emitted by the window function
   * @return DataStream with window function results
   */
  def aggregate[ACC: TypeInformation, V: TypeInformation, R: TypeInformation](
    aggregateFunction: AggregateFunction[T, ACC, V],
    windowFunction: WindowFunction[V, R, K, W]
  ): DataStream[R]

  /**
   * Apply incremental aggregation followed by a ProcessWindowFunction.
   * The process function receives the single aggregated value `V` of the window.
   *
   * @param aggregateFunction function for incremental aggregation
   * @param windowFunction ProcessWindowFunction for final processing
   * @tparam ACC accumulator type
   * @tparam V aggregate result type passed to the process function
   * @tparam R type of the records emitted by the process function
   * @return DataStream with process window function results
   */
  def aggregate[ACC: TypeInformation, V: TypeInformation, R: TypeInformation](
    aggregateFunction: AggregateFunction[T, ACC, V],
    windowFunction: ProcessWindowFunction[V, R, K, W]
  ): DataStream[R]
}
```

**Usage Examples:**

```scala
import org.apache.flink.api.common.functions.AggregateFunction

// Average temperature per window. The accumulator is a (running sum, element count)
// pair, so two partial accumulators can be merged by adding their fields.
class AverageAggregateFunction extends AggregateFunction[SensorReading, (Double, Int), Double] {

  override def createAccumulator(): (Double, Int) = (0.0, 0)

  override def add(value: SensorReading, accumulator: (Double, Int)): (Double, Int) = {
    val (sum, count) = accumulator
    (sum + value.temperature, count + 1)
  }

  override def getResult(accumulator: (Double, Int)): Double = {
    val (sum, count) = accumulator
    sum / count
  }

  override def merge(a: (Double, Int), b: (Double, Int)): (Double, Int) = {
    val (sumA, countA) = a
    val (sumB, countB) = b
    (sumA + sumB, countA + countB)
  }
}

// One-minute tumbling windows, each emitting the average temperature of one sensor
val avgTemps = keyedReadings
  .window(TumblingEventTimeWindows.of(Time.minutes(1)))
  .aggregate(new AverageAggregateFunction)
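```

### Window Functions

Apply functions that receive the complete contents of a window at once, when the window fires. The window's elements are buffered until then, so prefer `reduce` or `aggregate` when the computation can be done incrementally.

```scala { .api }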
class WindowedStream[T, K, W <: Window] {

  /**
   * Evaluates every window by passing all of its elements to a WindowFunction.
   *
   * @param function receives the key, the window, and the window's elements
   * @tparam R type of the records emitted by `function`
   * @return a DataStream of everything `function` emits
   */
  def apply[R: TypeInformation](function: WindowFunction[T, R, K, W]): DataStream[R]

  /**
   * Closure-based form of `apply`: `function` is invoked once per window
   * as `(key, window, elements, collector)`.
   *
   * @param function closure that emits results through the collector
   * @tparam R type of the records emitted through the collector
   * @return a DataStream of everything `function` emits
   */
  def apply[R: TypeInformation](function: (K, W, Iterable[T], Collector[R]) => Unit): DataStream[R]

  /**
   * Evaluates every window with a ProcessWindowFunction. In addition to the
   * elements, the function gets a context exposing the window, processing time,
   * watermark, per-window and global keyed state, and side outputs.
   *
   * @param function the ProcessWindowFunction to run per window
   * @tparam R type of the records emitted by `function`
   * @return a DataStream of everything `function` emits
   */
  def process[R: TypeInformation](function: ProcessWindowFunction[T, R, K, W]): DataStream[R]
}
```

**Usage Examples:**

```scala
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// Per key and one-minute window: every temperature seen, plus the window bounds
val allReadingsWithWindow = keyedReadings
  .window(TumblingEventTimeWindows.of(Time.minutes(1)))
  .apply(new WindowFunction[SensorReading, (String, List[Double], Long, Long), String, TimeWindow] {
    override def apply(
      key: String,
      window: TimeWindow,
      input: Iterable[SensorReading],
      out: Collector[(String, List[Double], Long, Long)]
    ): Unit = {
      val temps: List[Double] = input.iterator.map(_.temperature).toList
      val (start, end) = (window.getStart, window.getEnd)
      out.collect((key, temps, start, end))
    }
  })

// The closure form of apply: number of readings per key and window
val readingCounts = keyedReadings
  .window(TumblingEventTimeWindows.of(Time.minutes(1)))
  .apply { (key: String, window: TimeWindow, readings: Iterable[SensorReading], out: Collector[(String, Int)]) =>
    val count = readings.size
    out.collect(key -> count)
  }
```

## Window Assigners

Window assigners determine which window (or windows) each element is assigned to.

```scala { .api }
// Tumbling time windows: fixed-size, non-overlapping.
// The optional `offset` shifts the alignment of window boundaries.
object TumblingEventTimeWindows {
  def of(size: Time): TumblingEventTimeWindows
  def of(size: Time, offset: Time): TumblingEventTimeWindows
}

object TumblingProcessingTimeWindows {
  def of(size: Time): TumblingProcessingTimeWindows
  def of(size: Time, offset: Time): TumblingProcessingTimeWindows
}

// Sliding time windows: fixed-size windows starting every `slide`;
// they overlap when `slide` < `size`.
object SlidingEventTimeWindows {
  def of(size: Time, slide: Time): SlidingEventTimeWindows
  def of(size: Time, slide: Time, offset: Time): SlidingEventTimeWindows
}

object SlidingProcessingTimeWindows {
  def of(size: Time, slide: Time): SlidingProcessingTimeWindows
  def of(size: Time, slide: Time, offset: Time): SlidingProcessingTimeWindows
}

// Session windows: a session closes after a period of inactivity (the gap).
// `withDynamicGap` derives the gap from each element and returns a dynamic
// assigner typed by the element type T.
object EventTimeSessionWindows {
  def withGap(sessionTimeout: Time): EventTimeSessionWindows
  def withDynamicGap[T](sessionWindowTimeGapExtractor: SessionWindowTimeGapExtractor[T]): DynamicEventTimeSessionWindows[T]
}

object ProcessingTimeSessionWindows {
  def withGap(sessionTimeout: Time): ProcessingTimeSessionWindows
  def withDynamicGap[T](sessionWindowTimeGapExtractor: SessionWindowTimeGapExtractor[T]): DynamicProcessingTimeSessionWindows[T]
}

// Global windows: a single window per key holding all of its elements (used for
// count-based operations). Its default trigger never fires, so it must be
// combined with a custom trigger.
object GlobalWindows {
  def create(): GlobalWindows
}
```

**Usage Examples:**

```scala
import org.apache.flink.streaming.api.windowing.assigners._
import org.apache.flink.streaming.api.windowing.time.Time

// Tumbling: non-overlapping, fixed-size 5-minute windows
val tumblingAssigner = TumblingEventTimeWindows.of(Time.minutes(5))
val tumblingWindow = keyedReadings.window(tumblingAssigner)

// Sliding: 10-minute windows that start every 2 minutes, so each element falls into 5 windows
val slidingAssigner = SlidingEventTimeWindows.of(Time.minutes(10), Time.minutes(2))
val slidingWindow = keyedReadings.window(slidingAssigner)

// Session: a per-key window that closes after 30 minutes without new readings
val sessionAssigner = EventTimeSessionWindows.withGap(Time.minutes(30))
val sessionWindow = keyedReadings.window(sessionAssigner)
```

## Types

```scala { .api }
// Window types

/** Base type of all windows. */
abstract class Window {
  /** Largest timestamp that still belongs to this window. */
  def maxTimestamp(): Long
}

/**
 * A time window covering [start, end): `start` is inclusive and `end` is exclusive,
 * which is why the last timestamp that belongs to the window is `end - 1`.
 */
class TimeWindow(start: Long, end: Long) extends Window {
  def getStart: Long = start
  def getEnd: Long = end
  def maxTimestamp(): Long = end - 1
}

/** The window used by GlobalWindows; it spans all timestamps. */
class GlobalWindow extends Window {
  def maxTimestamp(): Long = Long.MaxValue
}

// Aggregate function interface

/**
 * Incremental aggregation. Each IN element is folded into an ACC accumulator,
 * and the OUT result is extracted from the accumulator when the window fires.
 */
trait AggregateFunction[IN, ACC, OUT] {
  /** Creates a new, empty accumulator. */
  def createAccumulator(): ACC
  /** Adds one element to the accumulator and returns the updated accumulator. */
  def add(value: IN, accumulator: ACC): ACC
  /** Computes the final result from an accumulator. */
  def getResult(accumulator: ACC): OUT
  /** Combines two accumulators into one (needed when windows merge, e.g. sessions). */
  def merge(a: ACC, b: ACC): ACC
}

// Window function interfaces

/** Called once per window with the key, the window, and all of its elements. */
trait WindowFunction[IN, OUT, KEY, W <: Window] {
  def apply(key: KEY, window: W, input: Iterable[IN], out: Collector[OUT]): Unit
}

/**
 * Like WindowFunction, but receives a Context that exposes window metadata,
 * time, keyed state, and side outputs.
 */
abstract class ProcessWindowFunction[IN, OUT, KEY, W <: Window] {
  def process(key: KEY, context: Context, elements: Iterable[IN], out: Collector[OUT]): Unit
  /** Called when a window is purged; clean up per-window state here. No-op by default. */
  def clear(context: Context): Unit = {}

  abstract class Context {
    def window: W
    def currentProcessingTime: Long
    def currentWatermark: Long
    /** Keyed state scoped to the current key and window. */
    def windowState: KeyedStateStore
    /** Keyed state scoped to the current key, shared across all of its windows. */
    def globalState: KeyedStateStore
    /** Emits `value` to the side output identified by `outputTag`. */
    def output[X](outputTag: OutputTag[X], value: X): Unit
  }
}

// Trigger interface for custom window firing

/** Decides, per element and per timer, whether a window should fire and/or be purged. */
abstract class Trigger[T, W <: Window] {
  def onElement(element: T, timestamp: Long, window: W, ctx: TriggerContext): TriggerResult
  def onProcessingTime(time: Long, window: W, ctx: TriggerContext): TriggerResult
  def onEventTime(time: Long, window: W, ctx: TriggerContext): TriggerResult

  /**
   * Whether this trigger supports merging windows. It must return true for
   * triggers used with merging assigners such as session windows.
   */
  def canMerge(): Boolean = false

  /** Called when windows are merged; re-registers timers for the merged window. */
  def onMerge(window: W, ctx: OnMergeContext): Unit =
    throw new UnsupportedOperationException("This trigger does not support merging.")

  /** Removes any state and timers this trigger holds for `window`. */
  def clear(window: W, ctx: TriggerContext): Unit
}

// Evictor interface for removing elements

/** Removes elements from a window before and/or after the window function is applied. */
trait Evictor[T, W <: Window] {
  def evictBefore(elements: java.lang.Iterable[TimestampedValue[T]], size: Int, window: W, evictorContext: EvictorContext): Unit
  def evictAfter(elements: java.lang.Iterable[TimestampedValue[T]], size: Int, window: W, evictorContext: EvictorContext): Unit
}

// Trigger results
sealed trait TriggerResult
object TriggerResult {
  /** Do nothing with the window. */
  case object CONTINUE extends TriggerResult
  /** Evaluate the window and keep its contents. */
  case object FIRE extends TriggerResult
  /** Discard the window's contents without evaluating it. */
  case object PURGE extends TriggerResult
  /** Evaluate the window, then discard its contents. */
  case object FIRE_AND_PURGE extends TriggerResult
}
```