0
# Window Functions
1
2
Window functions are specialized functions for processing windowed data with access to window metadata, state, and the complete window contents. They are essential for complex windowed computations and analytics.
3
4
## Capabilities
5
6
### WindowFunction
7
8
Basic window function interface for processing all elements in a window.
9
10
```scala { .api }
11
/**
 * Basic interface for evaluating the complete contents of a keyed window.
 *
 * @tparam IN  type of the elements in the window
 * @tparam OUT type of the emitted results
 * @tparam KEY type of the window key
 * @tparam W   type of the window
 */
trait WindowFunction[IN, OUT, KEY, W <: Window] {

  /**
   * Processes all elements of one window and emits results through `out`.
   *
   * @param key    the key of the window
   * @param window the window metadata
   * @param input  all elements in the window
   * @param out    collector for emitting results
   */
  def apply(key: KEY, window: W, input: Iterable[IN], out: Collector[OUT]): Unit
}
21
```
22
23
### ProcessWindowFunction
24
25
Advanced window function with access to context and state.
26
27
```scala { .api }
28
/**
 * Window function that, besides the window contents, receives a [[Context]]
 * exposing the window, time information, state stores and side outputs.
 *
 * @tparam IN  type of the elements in the window
 * @tparam OUT type of the emitted results
 * @tparam KEY type of the window key
 * @tparam W   type of the window
 */
abstract class ProcessWindowFunction[IN, OUT, KEY, W <: Window] {

  /**
   * Processes all elements of one window with access to its context.
   *
   * @param key      the key of the window
   * @param context  context providing window info and state access
   * @param elements all elements in the window
   * @param out      collector for emitting results
   */
  def process(key: KEY, context: Context, elements: Iterable[IN], out: Collector[OUT]): Unit

  /**
   * Hook for clearing per-window state. Overriding is optional: the default
   * implementation does nothing.
   *
   * @param context context for accessing the state to clear
   */
  def clear(context: Context): Unit = {}

  /** Information and services available while a window is being processed. */
  abstract class Context {

    /** The window being processed. */
    def window: W

    /** The current processing time. */
    def currentProcessingTime: Long

    /** The current watermark. */
    def currentWatermark: Long

    /** State store for per-window state. */
    def windowState: KeyedStateStore

    /** State store for global state, as opposed to per-window state. */
    def globalState: KeyedStateStore

    /** Emits `value` to the side output identified by `outputTag`. */
    def output[X](outputTag: OutputTag[X], value: X): Unit
  }
}
53
```
54
55
**Usage Examples:**
56
57
```scala
58
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.metrics.Counter
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.{ProcessWindowFunction, RichWindowFunction, WindowFunction}
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
64
65
/** A single temperature reading from a sensor; `timestamp` is used as the event timestamp. */
case class SensorReading(
    sensorId: String,
    temperature: Double,
    timestamp: Long
)

/** Per-sensor statistics over one window: its bounds, element count and avg/min/max temperature. */
case class WindowStats(
    sensorId: String,
    windowStart: Long,
    windowEnd: Long,
    count: Int,
    avgTemp: Double,
    minTemp: Double,
    maxTemp: Double
)
68
69
// WindowFunction example - calculate window statistics
70
/**
 * WindowFunction example: reduces all readings of one sensor in a window to a
 * single [[WindowStats]] record (count, average, minimum and maximum temperature).
 */
class SensorStatsWindowFunction
    extends WindowFunction[SensorReading, WindowStats, String, TimeWindow] {

  /**
   * @param key    sensor id the window is keyed by
   * @param window time window being evaluated
   * @param input  all readings in the window
   * @param out    collector receiving the single statistics record
   */
  override def apply(
      key: String,
      window: TimeWindow,
      input: Iterable[SensorReading],
      out: Collector[WindowStats]
  ): Unit = {
    // Only the temperatures are needed, so project them out once.
    val temps = input.iterator.map(_.temperature).toList
    val n = temps.size
    val mean = temps.sum / n
    val lowest = temps.min
    val highest = temps.max

    val stats = WindowStats(
      sensorId = key,
      windowStart = window.getStart,
      windowEnd = window.getEnd,
      count = n,
      avgTemp = mean,
      minTemp = lowest,
      maxTemp = highest
    )
    out.collect(stats)
  }
}
90
91
// ProcessWindowFunction example - with state and side outputs
92
/**
 * ProcessWindowFunction example: emits per-window statistics, raises a
 * side-output alert when the maximum temperature exceeds 80.0, and keeps a
 * count of processed windows in global state.
 */
class AdvancedSensorStatsFunction
    extends ProcessWindowFunction[SensorReading, WindowStats, String, TimeWindow] {

  import AdvancedSensorStatsFunction._

  /**
   * @param key      sensor id the window is keyed by
   * @param context  gives access to the window, the global state and side outputs
   * @param elements all readings in the window
   * @param out      collector for the main [[WindowStats]] output
   */
  override def process(
      key: String,
      context: Context,
      elements: Iterable[SensorReading],
      out: Collector[WindowStats]
  ): Unit = {
    val readings = elements.toList
    val count = readings.size
    val temperatures = readings.map(_.temperature)
    val avgTemp = temperatures.sum / count
    val minTemp = temperatures.min
    val maxTemp = temperatures.max

    // Emit main result
    out.collect(WindowStats(
      key, context.window.getStart, context.window.getEnd,
      count, avgTemp, minTemp, maxTemp
    ))

    // Emit to side output if temperature is too high
    if (maxTemp > 80.0) {
      context.output(
        HighTempAlerts,
        s"Sensor $key exceeded 80°C in window ${context.window.getStart}-${context.window.getEnd}"
      )
    }

    // Update global state (count of processed windows)
    val globalCounter = context.globalState.getState(WindowCountDescriptor)
    val currentCount = Option(globalCounter.value()).getOrElse(0L)
    globalCounter.update(currentCount + 1)
  }
}

object AdvancedSensorStatsFunction {

  /**
   * Tag of the high-temperature alert side output. Defined once so the function
   * does not rebuild it per window and consumers can reference it instead of
   * repeating the id string.
   */
  val HighTempAlerts: OutputTag[String] = OutputTag[String]("high-temp-alerts")

  // Descriptor of the global "processed windows" counter, created once and reused.
  private val WindowCountDescriptor =
    new ValueStateDescriptor[Long]("window-count", classOf[Long])
}
129
130
// Apply window functions
131
// Execution environment the example pipeline is built on
val env = StreamExecutionEnvironment.getExecutionEnvironment

val readings = env
  .fromElements(
    SensorReading("sensor1", 20.0, 1000L),
    SensorReading("sensor1", 25.0, 2000L),
    SensorReading("sensor1", 30.0, 3000L)
  )
  .assignAscendingTimestamps(_.timestamp)

val keyedReadings = readings.keyBy(_.sensorId)

// Using WindowFunction
val windowStats = keyedReadings
  .window(TumblingEventTimeWindows.of(Time.seconds(10)))
  .apply(new SensorStatsWindowFunction)

// Using ProcessWindowFunction
val advancedStats = keyedReadings
  .window(TumblingEventTimeWindows.of(Time.seconds(10)))
  .process(new AdvancedSensorStatsFunction)

// Get side output (the tag id must match the one the function emits to)
val highTempAlerts = advancedStats.getSideOutput(OutputTag[String]("high-temp-alerts"))
151
```
152
153
### AllWindowFunction
154
155
Window function for non-keyed streams (all-window operations).
156
157
```scala { .api }
158
/**
 * Basic interface for evaluating the complete contents of a window on a
 * non-keyed stream.
 *
 * @tparam IN  type of the elements in the window
 * @tparam OUT type of the emitted results
 * @tparam W   type of the window
 */
trait AllWindowFunction[IN, OUT, W <: Window] {

  /**
   * Processes all elements of one (non-keyed) window and emits results through `out`.
   *
   * @param window the window metadata
   * @param input  all elements in the window
   * @param out    collector for emitting results
   */
  def apply(window: W, input: Iterable[IN], out: Collector[OUT]): Unit
}
167
```
168
169
### ProcessAllWindowFunction
170
171
Advanced all-window function with context access.
172
173
```scala { .api }
174
/**
 * All-window (non-keyed) function that, besides the window contents, receives
 * a [[Context]] exposing the window, state stores and side outputs.
 *
 * @tparam IN  type of the elements in the window
 * @tparam OUT type of the emitted results
 * @tparam W   type of the window
 */
abstract class ProcessAllWindowFunction[IN, OUT, W <: Window] {

  /**
   * Processes all elements of one (non-keyed) window with access to its context.
   *
   * @param context  context providing window info and state access
   * @param elements all elements in the window
   * @param out      collector for emitting results
   */
  def process(context: Context, elements: Iterable[IN], out: Collector[OUT]): Unit

  /**
   * Hook for clearing per-window state. Overriding is optional: the default
   * implementation does nothing.
   *
   * @param context context for accessing the state to clear
   */
  def clear(context: Context): Unit = {}

  /** Information and services available while a window is being processed. */
  abstract class Context {

    /** The window being processed. */
    def window: W

    // NOTE(review): confirm against the Flink version in use that the all-window
    // context exposes the two time accessors below.
    /** The current processing time. */
    def currentProcessingTime: Long

    /** The current watermark. */
    def currentWatermark: Long

    /** State store for per-window state. */
    def windowState: KeyedStateStore

    /** State store for global state, as opposed to per-window state. */
    def globalState: KeyedStateStore

    /** Emits `value` to the side output identified by `outputTag`. */
    def output[X](outputTag: OutputTag[X], value: X): Unit
  }
}
198
```
199
200
**Usage Examples:**
201
202
```scala
203
import org.apache.flink.streaming.api.scala.function.{AllWindowFunction, ProcessAllWindowFunction}
204
205
/** Statistics over all readings of one window: its bounds, element count and average value. */
case class GlobalStats(
    windowStart: Long,
    windowEnd: Long,
    totalCount: Int,
    avgValue: Double
)
206
207
// AllWindowFunction example - global statistics
208
/**
 * AllWindowFunction example: reduces every reading in a window, regardless of
 * sensor, to one [[GlobalStats]] record.
 */
class GlobalStatsFunction
    extends AllWindowFunction[SensorReading, GlobalStats, TimeWindow] {

  /**
   * @param window time window being evaluated
   * @param input  all readings in the window
   * @param out    collector receiving the single statistics record
   */
  override def apply(
      window: TimeWindow,
      input: Iterable[SensorReading],
      out: Collector[GlobalStats]
  ): Unit = {
    val temps = input.iterator.map(_.temperature).toList
    val n = temps.size
    val mean = temps.sum / n

    val stats = GlobalStats(
      windowStart = window.getStart,
      windowEnd = window.getEnd,
      totalCount = n,
      avgValue = mean
    )
    out.collect(stats)
  }
}
221
222
// ProcessAllWindowFunction example - with side outputs
223
/**
 * ProcessAllWindowFunction example: emits [[GlobalStats]] for each window and
 * reports an anomaly on a side output when the average temperature exceeds 50.0.
 */
class AdvancedGlobalStatsFunction
    extends ProcessAllWindowFunction[SensorReading, GlobalStats, TimeWindow] {

  /**
   * @param context  gives access to the window and to side outputs
   * @param elements all readings in the window
   * @param out      collector for the main [[GlobalStats]] output
   */
  override def process(
      context: Context,
      elements: Iterable[SensorReading],
      out: Collector[GlobalStats]
  ): Unit = {
    val temps = elements.iterator.map(_.temperature).toList
    val n = temps.size
    val avgTemp = temps.sum / n

    // Main output
    val stats = GlobalStats(
      windowStart = context.window.getStart,
      windowEnd = context.window.getEnd,
      totalCount = n,
      avgValue = avgTemp
    )
    out.collect(stats)

    // Side output for anomalies
    if (avgTemp > 50.0) {
      val anomalies = OutputTag[String]("global-anomalies")
      context.output(anomalies, s"Global average temperature ${avgTemp}°C is above threshold")
    }
  }
}
244
245
// Apply all-window functions
246
// AllWindowFunction over one-minute tumbling event-time windows of the whole stream
val globalStats =
  readings
    .windowAll(TumblingEventTimeWindows.of(Time.minutes(1)))
    .apply(new GlobalStatsFunction)

// ProcessAllWindowFunction over the same kind of window
val advancedGlobalStats =
  readings
    .windowAll(TumblingEventTimeWindows.of(Time.minutes(1)))
    .process(new AdvancedGlobalStatsFunction)
253
```
254
255
### Rich Window Functions
256
257
Rich versions of window functions with lifecycle methods and runtime context access.
258
259
```scala { .api }
260
/**
 * Rich variant of [[WindowFunction]]: adds the `open`/`close` lifecycle hooks
 * (no-ops unless overridden) and runtime-context accessors.
 */
abstract class RichWindowFunction[IN, OUT, KEY, W <: Window]
    extends WindowFunction[IN, OUT, KEY, W]
    with RichFunction {

  /** Lifecycle hook; does nothing by default. */
  override def open(parameters: Configuration): Unit = {}

  /** Lifecycle hook; does nothing by default. */
  override def close(): Unit = {}

  /** The runtime context of this function. */
  def getRuntimeContext: RuntimeContext

  /** Sets the runtime context of this function. */
  def setRuntimeContext(t: RuntimeContext): Unit
}
268
269
/**
 * Rich variant of [[ProcessWindowFunction]]: adds the `open`/`close` lifecycle
 * hooks (no-ops unless overridden) and runtime-context accessors.
 */
abstract class RichProcessWindowFunction[IN, OUT, KEY, W <: Window]
    extends ProcessWindowFunction[IN, OUT, KEY, W]
    with RichFunction {

  /** Lifecycle hook; does nothing by default. */
  override def open(parameters: Configuration): Unit = {}

  /** Lifecycle hook; does nothing by default. */
  override def close(): Unit = {}

  /** The runtime context of this function. */
  def getRuntimeContext: RuntimeContext

  /** Sets the runtime context of this function. */
  def setRuntimeContext(t: RuntimeContext): Unit
}
277
278
/**
 * Rich variant of [[AllWindowFunction]]: adds the `open`/`close` lifecycle
 * hooks (no-ops unless overridden) and runtime-context accessors.
 */
abstract class RichAllWindowFunction[IN, OUT, W <: Window]
    extends AllWindowFunction[IN, OUT, W]
    with RichFunction {

  /** Lifecycle hook; does nothing by default. */
  override def open(parameters: Configuration): Unit = {}

  /** Lifecycle hook; does nothing by default. */
  override def close(): Unit = {}

  /** The runtime context of this function. */
  def getRuntimeContext: RuntimeContext

  /** Sets the runtime context of this function. */
  def setRuntimeContext(t: RuntimeContext): Unit
}
286
287
/**
 * Rich variant of [[ProcessAllWindowFunction]]: adds the `open`/`close`
 * lifecycle hooks (no-ops unless overridden) and runtime-context accessors.
 */
abstract class RichProcessAllWindowFunction[IN, OUT, W <: Window]
    extends ProcessAllWindowFunction[IN, OUT, W]
    with RichFunction {

  /** Lifecycle hook; does nothing by default. */
  override def open(parameters: Configuration): Unit = {}

  /** Lifecycle hook; does nothing by default. */
  override def close(): Unit = {}

  /** The runtime context of this function. */
  def getRuntimeContext: RuntimeContext

  /** Sets the runtime context of this function. */
  def setRuntimeContext(t: RuntimeContext): Unit
}
295
```
296
297
**Usage Examples:**
298
299
```scala
300
import org.apache.flink.configuration.Configuration
301
302
/**
 * RichWindowFunction example: computes per-window sensor statistics and counts
 * every processed window in the "processed-windows" metric.
 */
class MetricsWindowFunction
    extends RichWindowFunction[SensorReading, WindowStats, String, TimeWindow] {

  // Assigned in open(); transient so the metric handle is never part of the
  // serialized function instance.
  @transient private var processedWindows: Counter = _

  /** Registers the "processed-windows" counter with the function's metric group. */
  override def open(parameters: Configuration): Unit = {
    processedWindows = getRuntimeContext
      .getMetricGroup
      .counter("processed-windows")
  }

  /**
   * @param key    sensor id the window is keyed by
   * @param window time window being evaluated
   * @param input  all readings in the window
   * @param out    collector receiving the single statistics record
   */
  override def apply(
      key: String,
      window: TimeWindow,
      input: Iterable[SensorReading],
      out: Collector[WindowStats]
  ): Unit = {
    // Increment metric
    processedWindows.inc()

    // Process window as usual
    val temperatures = input.map(_.temperature).toList
    val count = temperatures.size
    val avgTemp = temperatures.sum / count
    // Report the real extremes rather than placeholder zeros.
    val minTemp = temperatures.min
    val maxTemp = temperatures.max

    out.collect(WindowStats(key, window.getStart, window.getEnd, count, avgTemp, minTemp, maxTemp))
  }
}
328
```
329
330
## Types
331
332
```scala { .api }
333
// Window types
334
/** Base type of all windows. */
abstract class Window {

  /** The maximum timestamp of this window. */
  def maxTimestamp(): Long
}
337
338
/**
 * Window described by a `start` and an `end` timestamp.
 *
 * @param start start timestamp of the window
 * @param end   end timestamp of the window
 */
class TimeWindow(start: Long, end: Long) extends Window {

  /** The start timestamp passed at construction. */
  def getStart: Long = start

  /** The end timestamp passed at construction. */
  def getEnd: Long = end

  /** One less than `end`. */
  def maxTimestamp(): Long = end - 1
}
343
344
/** Window whose maximum timestamp is `Long.MaxValue`. */
class GlobalWindow extends Window {

  /** Always `Long.MaxValue`. */
  def maxTimestamp(): Long = Long.MaxValue
}
347
348
// State store for window functions
349
/**
 * State store handed to window functions. Each accessor takes the descriptor
 * of the requested state and returns the matching typed state handle.
 */
trait KeyedStateStore {

  /**
   * Value state for the given descriptor. Typed as `ValueState[T]` so callers
   * can use `value()` / `update(...)` on the result, consistent with the other
   * accessors, which also return typed handles.
   */
  def getState[T](stateDescriptor: ValueStateDescriptor[T]): ValueState[T]

  /** List state for the given descriptor. */
  def getListState[T](stateDescriptor: ListStateDescriptor[T]): ListState[T]

  /** Reducing state for the given descriptor. */
  def getReducingState[T](stateDescriptor: ReducingStateDescriptor[T]): ReducingState[T]

  /** Aggregating state for the given descriptor. */
  def getAggregatingState[IN, ACC, OUT](
      stateDescriptor: AggregatingStateDescriptor[IN, ACC, OUT]
  ): AggregatingState[IN, OUT]

  /** Map state for the given descriptor. */
  def getMapState[UK, UV](stateDescriptor: MapStateDescriptor[UK, UV]): MapState[UK, UV]
}
356
357
// State descriptors
358
/** Descriptor of a value state: its name and the class of the stored value. */
class ValueStateDescriptor[T](name: String, typeClass: Class[T])

/** Descriptor of a list state: its name and the class of the list elements. */
class ListStateDescriptor[T](name: String, typeClass: Class[T])

/** Descriptor of a reducing state: its name, the reduce function and the value class. */
class ReducingStateDescriptor[T](
    name: String,
    reduceFunction: ReduceFunction[T],
    typeClass: Class[T]
)

/** Descriptor of a map state: its name and the classes of keys and values. */
class MapStateDescriptor[K, V](name: String, keyClass: Class[K], valueClass: Class[V])
362
363
// Collector interface
364
/** Sink through which functions emit records of type `T`. */
trait Collector[T] {

  /** Emits one record. */
  def collect(record: T): Unit

  /** Closes the collector. */
  def close(): Unit
}
368
369
// Output tag for side outputs
370
/**
 * Identifies a side output by `id` and carries the type information of the
 * records emitted to it.
 *
 * @param id identifier of the side output
 * @tparam T type of the side-output records; requires an implicit `TypeInformation[T]`
 */
case class OutputTag[T: TypeInformation](id: String) {

  /** The type information supplied by the `T: TypeInformation` context bound. */
  def getTypeInfo: TypeInformation[T] = implicitly[TypeInformation[T]]
}
373
374
// Rich function base
375
/** Base of rich functions: lifecycle hooks plus access to the runtime context. */
trait RichFunction {

  /** Lifecycle hook receiving the function's configuration parameters. */
  def open(parameters: Configuration): Unit

  /** Lifecycle hook counterpart of `open`. */
  def close(): Unit

  /** The runtime context of this function. */
  def getRuntimeContext: RuntimeContext

  /** Sets the runtime context of this function. */
  def setRuntimeContext(t: RuntimeContext): Unit
}
381
382
// Runtime context
383
/** Runtime information available to a rich function. */
trait RuntimeContext {

  /** Name of the task. */
  def getTaskName: String

  /** Metric group used to register metrics such as counters. */
  def getMetricGroup: MetricGroup

  /** Number of parallel subtasks. */
  def getNumberOfParallelSubtasks: Int

  /** Index of this subtask. */
  def getIndexOfThisSubtask: Int
}
389
390
// Metrics
391
/** Counter metric. */
trait Counter {

  /** Increments the counter. */
  def inc(): Unit

  /** Increments the counter by `n`. */
  def inc(n: Long): Unit

  /** The current count. */
  def getCount: Long
}
396
397
/** Registry through which functions obtain and register metrics by name. */
trait MetricGroup {

  /** Returns the counter registered under `name`. */
  def counter(name: String): Counter

  /** Registers `gauge` under `name` and returns a gauge. */
  def gauge[T](name: String, gauge: Gauge[T]): Gauge[T]

  /** Registers `histogram` under `name` and returns a histogram. */
  def histogram(name: String, histogram: Histogram): Histogram
}
402
```