0
# Windowing and Time-Based Processing
1
2
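Windowing in Flink enables processing of unbounded streams by grouping elements into finite sets called windows. Flink supports both keyed and non-keyed windowing with various window types and time semantics. The examples below use the legacy Scala DataStream API. `timeWindow`/`timeWindowAll`, `setStreamTimeCharacteristic` and `BoundedOutOfOrdernessTimestampExtractor` are deprecated in recent Flink releases. Their replacements are explicit window assigners (e.g. `TumblingEventTimeWindows`) and `WatermarkStrategy`.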
3
4
## Keyed Windowing
5
6
### Time-Based Windows
7
8
```scala { .api }
9
class KeyedStream[T, K] {

  /** Tumbling time windows: consecutive, non-overlapping windows of length `size` per key. */
  def timeWindow(size: Time): WindowedStream[T, K, TimeWindow]

  /** Sliding time windows of length `size`, a new window starting every `slide`. */
  def timeWindow(size: Time, slide: Time): WindowedStream[T, K, TimeWindow]
}
13
```
14
15
Create time-based windows for keyed streams:
16
17
```scala
18
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

val env = StreamExecutionEnvironment.getExecutionEnvironment

case class SensorReading(sensorId: String, temperature: Double, timestamp: Long)

val sensorData = env.fromElements(
  SensorReading("sensor1", 20.0, 1000),
  SensorReading("sensor1", 22.0, 2000),
  SensorReading("sensor2", 18.0, 1500),
  SensorReading("sensor2", 25.0, 2500)
).keyBy(_.sensorId)

// Tumbling time windows (non-overlapping)
val tumblingWindows = sensorData
  .timeWindow(Time.minutes(5)) // 5-minute tumbling windows

// Sliding time windows (overlapping)
val slidingWindows = sensorData
  .timeWindow(Time.minutes(10), Time.minutes(2)) // 10-minute windows, sliding every 2 minutes

// Process windows: emit (sensorId, average temperature, window start, window end).
// The lambda's parameter types are written out because `apply` is overloaded; with them
// the compiler can pick the overload and infer the output type of the resulting stream.
tumblingWindows
  .apply {
    (key: String,
     window: TimeWindow,
     readings: Iterable[SensorReading],
     out: Collector[(String, Double, Long, Long)]) =>
      val avgTemp = readings.map(_.temperature).sum / readings.size
      out.collect((key, avgTemp, window.getStart, window.getEnd))
  }
  .print()
48
```
49
50
### Count-Based Windows
51
52
```scala { .api }
53
class KeyedStream[T, K] {

  /** Tumbling count windows: a key's window fires once it holds `size` elements. */
  def countWindow(size: Long): WindowedStream[T, K, GlobalWindow]

  /** Sliding count windows: fires every `slide` elements, over the key's last `size` elements. */
  def countWindow(size: Long, slide: Long): WindowedStream[T, K, GlobalWindow]
}
57
```
58
59
Create count-based windows:
60
61
```scala
62
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Key by parity so that each key actually accumulates several elements
// (odd and even keys each receive five values). Keying by `identity` would give every
// value its own key holding a single element, so no count window would ever fill up.
val events = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
  .keyBy(_ % 2)

// Tumbling count windows: fire (and purge) every 3 elements per key
val tumblingCountWindows = events
  .countWindow(3)

// Sliding count windows: fire every 2 elements per key, over that key's last 5 elements
val slidingCountWindows = events
  .countWindow(5, 2)

// Odd key: 1+3+5 = 9; even key: 2+4+6 = 12.
// The remaining two elements per key never complete a window and are not emitted.
tumblingCountWindows
  .sum(0)
  .print("Tumbling Count")

// Odd key: 1+3 = 4, then 1+3+5+7 = 16; even key: 2+4 = 6, then 2+4+6+8 = 20.
slidingCountWindows
  .sum(0)
  .print("Sliding Count")
83
```
84
85
### Custom Window Assigners
86
87
```scala { .api }
88
class KeyedStream[T, K] {

  /** Windows the keyed stream with an arbitrary (built-in or user-defined) window assigner. */
  def window[W <: Window](assigner: WindowAssigner[_ >: T, W]): WindowedStream[T, K, W]
}
91
```
92
93
Pass any window assigner, built-in or custom, to `window()`:
94
95
```scala
96
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners._
import org.apache.flink.streaming.api.windowing.time.Time

val env = StreamExecutionEnvironment.getExecutionEnvironment
val keyedStream = env.fromElements(1, 2, 3, 4, 5).keyBy(identity)

// Session windows: a key's window closes after 10 minutes without new elements
val sessionWindows = keyedStream
  .window(EventTimeSessionWindows.withGap(Time.minutes(10)))

// Global windows: one window per key holding all its elements. The default trigger
// never fires, so a custom trigger has to be set with .trigger(...)
val globalWindows = keyedStream
  .window(GlobalWindows.create())

// Processing time sliding windows (10 minutes long, a new one every 5 minutes)
val processingTimeWindows = keyedStream
  .window(SlidingProcessingTimeWindows.of(Time.minutes(10), Time.minutes(5)))

// Event time tumbling windows (1 hour)
val eventTimeWindows = keyedStream
  .window(TumblingEventTimeWindows.of(Time.hours(1)))
117
```
118
119
## Non-Keyed Windowing (All Windows)
120
121
### Time-Based All Windows
122
123
```scala { .api }
124
class DataStream[T] {

  /** Non-keyed tumbling time windows of length `size`; all elements share the same windows. */
  def timeWindowAll(size: Time): AllWindowedStream[T, TimeWindow]

  /** Non-keyed sliding time windows of length `size`, a new window starting every `slide`. */
  def timeWindowAll(size: Time, slide: Time): AllWindowedStream[T, TimeWindow]
}
128
```
129
130
Apply windows to the entire, non-keyed stream (all elements). Non-keyed windows are evaluated by a single task, i.e. with parallelism 1:
131
132
```scala
133
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

val env = StreamExecutionEnvironment.getExecutionEnvironment
val allData = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

// All elements in 5-minute tumbling windows
val allTumbling = allData
  .timeWindowAll(Time.minutes(5))

// All elements in 10-minute windows, sliding every 2 minutes
val allSliding = allData
  .timeWindowAll(Time.minutes(10), Time.minutes(2))

allTumbling
  .sum(0)
  .print("All Tumbling")

allSliding
  .sum(0)
  .print("All Sliding")
147
```
148
149
### Count-Based All Windows
150
151
```scala { .api }
152
class DataStream[T] {

  /** Non-keyed tumbling count windows: fires once `size` elements have arrived. */
  def countWindowAll(size: Long): AllWindowedStream[T, GlobalWindow]

  /** Non-keyed sliding count windows: fires every `slide` elements, over the last `size` elements. */
  def countWindowAll(size: Long, slide: Long): AllWindowedStream[T, GlobalWindow]
}
156
```
157
158
Count windows over all elements:
159
160
```scala
161
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Tumbling count window over the whole stream: fires every 4 elements.
// Emits 1+2+3+4 = 10 and 5+6+7+8 = 26; 9 and 10 never fill a window and are not emitted.
val allCountWindows = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
  .countWindowAll(4)

allCountWindows
  .sum(0)
  .print("All Count Windows")
167
```
168
169
### Custom All Window Assigners
170
171
```scala { .api }
172
class DataStream[T] {

  /** Windows the whole (non-keyed) stream with an arbitrary window assigner. */
  def windowAll[W <: Window](assigner: WindowAssigner[_ >: T, W]): AllWindowedStream[T, W]
}
175
```
176
177
## Window Configuration
178
179
### Triggers
180
181
```scala { .api }
182
class WindowedStream[T, K, W <: Window] {

  /** Sets the trigger that decides when a window fires; it replaces the assigner's default trigger. */
  def trigger(trigger: Trigger[_ >: T, _ >: W]): WindowedStream[T, K, W]
}

class AllWindowedStream[T, W <: Window] {

  /** Non-keyed counterpart of `WindowedStream.trigger`. */
  def trigger(trigger: Trigger[_ >: T, _ >: W]): AllWindowedStream[T, W]
}
189
```
190
191
Control when windows fire. Setting a trigger replaces the window assigner's default trigger rather than adding to it:
192
193
```scala
194
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val keyedStream = env.fromElements(1, 2, 3, 4, 5).keyBy(identity)

// Note: .trigger(...) REPLACES the window assigner's default trigger; it does not add to it.

// Count trigger: fires every 10 elements of a window. Because it replaces the default
// time-based trigger, reaching the end of the window no longer fires the window.
// CountTrigger does not purge, so every firing sees all elements collected so far.
val customTrigger = keyedStream
  .timeWindow(Time.minutes(5))
  .trigger(CountTrigger.of(10))

// Processing time trigger: fires when the processing-time clock passes the window end
val processingTrigger = keyedStream
  .timeWindow(Time.minutes(5))
  .trigger(ProcessingTimeTrigger.create())

// Purging trigger: wraps another trigger and clears the window contents after each firing
// (here: emit and clear every 5 elements)
val purgingTrigger = keyedStream
  .timeWindow(Time.minutes(5))
  .trigger(PurgingTrigger.of(CountTrigger.of(5)))
213
```
214
215
### Evictors
216
217
```scala { .api }
218
class WindowedStream[T, K, W <: Window] {

  /** Sets an evictor that removes elements from a window before or after the window function runs. */
  def evictor(evictor: Evictor[_ >: T, _ >: W]): WindowedStream[T, K, W]
}
221
```
222
223
Remove elements from a window before or after the window function is applied (the built-in evictors evict before by default):
224
225
```scala
226
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.evictors._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

val env = StreamExecutionEnvironment.getExecutionEnvironment
val keyedStream = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).keyBy(identity)

// Count evictor: before the window function runs, keep only the last 5 elements (arrival order)
val countEvictor = keyedStream
  .timeWindow(Time.minutes(5))
  .evictor(CountEvictor.of(5))

// Time evictor: keep only elements whose timestamp lies within 2 minutes of the newest
// timestamp in the window. It only evicts when elements carry timestamps
// (e.g. event time with assigned timestamps).
val timeEvictor = keyedStream
  .timeWindow(Time.minutes(5))
  .evictor(TimeEvictor.of(Time.minutes(2)))

// Delta evictor: drop elements whose delta to the last element of the window is >= 5.0.
// Explicit type arguments pin the element and window types of the generic Java factory.
val deltaEvictor = keyedStream
  .timeWindow(Time.minutes(5))
  .evictor(DeltaEvictor.of[Int, TimeWindow](5.0, (a: Int, b: Int) => Math.abs(a - b).toDouble))
245
```
246
247
### Allowed Lateness
248
249
```scala { .api }
250
class WindowedStream[T, K, W <: Window] {

  /** How long after the watermark passes a window's end late elements are still accepted (default: 0). */
  def allowedLateness(lateness: Time): WindowedStream[T, K, W]
}
253
```
254
255
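Keep event-time windows open for late-arriving data, i.e. elements that arrive after the watermark has passed the end of their window: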
256
257
```scala
258
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

val env = StreamExecutionEnvironment.getExecutionEnvironment
// Lateness only exists for event-time windows; processing-time windows never see late data.
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val lateDataStream = env.fromElements(1, 2, 3, 4, 5)
  .assignAscendingTimestamps(_.toLong) // sample event time: the value itself, in milliseconds
  .keyBy(identity)

// Allow 1 minute of lateness: late elements are still added until the watermark passes
// the window end plus 1 minute, and each such element fires the window again.
val windowWithLateness = lateDataStream
  .timeWindow(Time.minutes(5))
  .allowedLateness(Time.minutes(1))
  .sum(0)
266
```
267
268
### Side Output for Late Data
269
270
```scala { .api }
271
class WindowedStream[T, K, W <: Window] {

  /** Emits elements that are too late even for the allowed lateness to the side output `outputTag`. */
  def sideOutputLateData(outputTag: OutputTag[T]): WindowedStream[T, K, W]
}
274
```
275
276
Route data that arrives after the allowed lateness has expired to a side output instead of dropping it:
277
278
```scala
279
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.OutputTag
import org.apache.flink.streaming.api.windowing.time.Time

val env = StreamExecutionEnvironment.getExecutionEnvironment
// Late data only exists for event-time windows.
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

// Elements arriving after the allowed lateness has expired go to this tag instead of being dropped.
// Its type must match the window's input type.
val lateDataTag = OutputTag[Int]("late-data")

val mainResult = env.fromElements(1, 2, 3, 4, 5)
  .assignAscendingTimestamps(_.toLong) // sample event time: the value itself, in milliseconds
  .keyBy(identity)
  .timeWindow(Time.minutes(5))
  .allowedLateness(Time.minutes(1))
  .sideOutputLateData(lateDataTag)
  .sum(0)

// Process late data separately
val lateData = mainResult.getSideOutput(lateDataTag)
lateData.print("Late Data")
293
```
294
295
## Time Characteristics and Watermarks
296
297
### Time Characteristics
298
299
```scala
300
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Choose exactly ONE time characteristic: each call overwrites the previous one.

// Event time processing (use timestamps in data)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

// Alternatives:
// Processing time (use the system clock of the machine processing the element)
// env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

// Ingestion time (use time when elements enter Flink)
// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
312
```
313
314
### Watermark Assignment
315
316
```scala
317
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

case class TimestampedEvent(data: String, eventTime: Long)

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

// Note: the sample is out of order (event3 at 1500 arrives after event2 at 2000).
val timestampedStream = env.fromElements(
  TimestampedEvent("event1", 1000),
  TimestampedEvent("event2", 2000),
  TimestampedEvent("event3", 1500)
)

// Assign watermarks with bounded out-of-orderness: the watermark trails the highest
// timestamp seen so far by 5 seconds, so event3 is still on time.
val watermarkedStream = timestampedStream
  .assignTimestampsAndWatermarks(
    new BoundedOutOfOrdernessTimestampExtractor[TimestampedEvent](Time.seconds(5)) {
      override def extractTimestamp(element: TimestampedEvent): Long = element.eventTime
    }
  )

// Ascending timestamps: only correct when timestamps never decrease.
// With the sample above, event3 may end up behind the watermark and be treated as late.
val ascendingStream = timestampedStream
  .assignAscendingTimestamps(_.eventTime)
342
```
343
344
## Window Functions
345
346
### Built-in Aggregations
347
348
```scala { .api }
349
class WindowedStream[T, K, W <: Window] {
  // Every aggregation is offered by field position (tuples) or by field name (case classes, POJOs).

  /** Sum of the given field over each window. */
  def sum(position: Int): DataStream[T]
  def sum(field: String): DataStream[T]

  /** Minimum / maximum of the given field; only that field is guaranteed to hold the aggregate. */
  def min(position: Int): DataStream[T]
  def min(field: String): DataStream[T]
  def max(position: Int): DataStream[T]
  def max(field: String): DataStream[T]

  /** The element that holds the minimum / maximum value of the given field. */
  def minBy(position: Int): DataStream[T]
  def minBy(field: String): DataStream[T]
  def maxBy(position: Int): DataStream[T]
  def maxBy(field: String): DataStream[T]
}
361
```
362
363
### Custom Window Functions
364
365
```scala { .api }
366
class WindowedStream[T, K, W <: Window] {

  /** Applies a `WindowFunction` to all elements of a (key, window) pair when the window fires. */
  def apply[R: TypeInformation](function: WindowFunction[T, R, K, W]): DataStream[R]

  /** Same as above, taking a Scala function `(key, window, elements, collector) => Unit`. */
  def apply[R: TypeInformation](function: (K, W, Iterable[T], Collector[R]) => Unit): DataStream[R]
}
370
```
371
372
Apply custom functions to windows:
373
374
```scala
375
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// Uses the SensorReading case class defined in the first example of this document.

case class WindowResult(key: String, count: Int, avg: Double, window: String)

/**
 * Emits one WindowResult per key and window: number of readings, average temperature,
 * and the window range formatted as "start-end".
 */
class StatisticsWindowFunction extends WindowFunction[SensorReading, WindowResult, String, TimeWindow] {
  override def apply(
      key: String,
      window: TimeWindow,
      readings: Iterable[SensorReading],
      out: Collector[WindowResult]
  ): Unit = {
    val temperatures = readings.map(_.temperature).toList
    val count = temperatures.size
    val avg = temperatures.sum / count
    val windowInfo = s"${window.getStart}-${window.getEnd}"

    out.collect(WindowResult(key, count, avg, windowInfo))
  }
}

val env = StreamExecutionEnvironment.getExecutionEnvironment
val sensorData = env.fromElements(
  SensorReading("sensor1", 20.0, 1000),
  SensorReading("sensor1", 25.0, 2000)
).keyBy(_.sensorId)

// Apply custom window function
val windowResults = sensorData
  .timeWindow(Time.minutes(5))
  .apply(new StatisticsWindowFunction)

// Lambda-based window function. Parameter types are spelled out so the compiler can
// choose the `apply` overload and infer the output type.
val lambdaResults = sensorData
  .timeWindow(Time.minutes(5))
  .apply {
    (key: String,
     window: TimeWindow,
     readings: Iterable[SensorReading],
     out: Collector[(String, Double, Long)]) =>
      val maxTemp = readings.map(_.temperature).max
      out.collect((key, maxTemp, window.getStart))
  }
414
```
415
416
## Advanced Windowing Patterns
417
418
### Session Windows
419
420
```scala
421
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

case class UserActivity(userId: String, activity: String, timestamp: Long)

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val userActivities = env.fromElements(
  UserActivity("user1", "login", 1000),
  UserActivity("user1", "click", 2000),
  UserActivity("user1", "scroll", 3000)
)

// Session windows with a 10-minute inactivity gap. Emits
// (userId, number of activities, comma-separated activities, window length in ms).
// A session window spans [first timestamp, last timestamp + gap), so the reported
// length includes the trailing 10-minute gap.
val sessionWindows = userActivities
  .assignAscendingTimestamps(_.timestamp)
  .keyBy(_.userId)
  .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
  .apply {
    (userId: String,
     window: TimeWindow,
     activities: Iterable[UserActivity],
     out: Collector[(String, Int, String, Long)]) =>
      val sessionSummary = (
        userId,
        activities.size,
        activities.map(_.activity).mkString(","),
        window.getEnd - window.getStart
      )
      out.collect(sessionSummary)
  }
448
```
449
450
### Custom Window Assigner
451
452
```scala
453
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger

/**
 * Custom window assigner for business hours (by default 9 AM to 5 PM, UTC).
 *
 * An element whose timestamp falls inside business hours is assigned to a single window
 * covering that day's business hours, [startHour:00, endHour:00). Elements outside
 * business hours are assigned to no window and are therefore dropped.
 *
 * @param startHour first hour of the business day (inclusive), 0-23; defaults to 9
 * @param endHour   hour at which the business day ends (exclusive), up to 24; defaults to 17
 */
class BusinessHoursWindowAssigner(startHour: Int = 9, endHour: Int = 17)
    extends WindowAssigner[Object, TimeWindow] {

  require(
    0 <= startHour && startHour < endHour && endHour <= 24,
    s"invalid business hours: [$startHour, $endHour)"
  )

  override def assignWindows(
      element: Object,
      timestamp: Long,
      context: WindowAssigner.WindowAssignerContext
  ): java.util.Collection[TimeWindow] = {
    import BusinessHoursWindowAssigner.{MillisPerDay, MillisPerHour}

    // floorMod keeps the time of day non-negative for pre-1970 (negative) timestamps,
    // where `%` would yield a negative remainder and a wrong hour/day.
    val millisOfDay = Math.floorMod(timestamp, MillisPerDay)
    val hour = millisOfDay / MillisPerHour // hour of day, UTC

    if (hour >= startHour && hour < endHour) {
      val startOfDay = timestamp - millisOfDay
      val startOfBusinessHours = startOfDay + startHour * MillisPerHour
      val endOfBusinessHours = startOfDay + endHour * MillisPerHour

      java.util.Collections.singletonList(new TimeWindow(startOfBusinessHours, endOfBusinessHours))
    } else {
      java.util.Collections.emptyList[TimeWindow]()
    }
  }

  override def getDefaultTrigger(env: org.apache.flink.streaming.api.environment.StreamExecutionEnvironment): org.apache.flink.streaming.api.windowing.triggers.Trigger[Object, TimeWindow] = {
    EventTimeTrigger.create()
  }

  override def getWindowSerializer(executionConfig: org.apache.flink.api.common.ExecutionConfig): org.apache.flink.api.common.typeutils.TypeSerializer[TimeWindow] = {
    new TimeWindow.Serializer()
  }

  override def isEventTime: Boolean = true
}

object BusinessHoursWindowAssigner {
  private val MillisPerHour: Long = 3600000L
  private val MillisPerDay: Long = 24 * MillisPerHour
}
487
```
488
489
## Complete Example: Real-Time Analytics Dashboard
490
491
```scala
492
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.util.Collector

case class WebEvent(
  userId: String,
  pageUrl: String,
  action: String,
  timestamp: Long,
  sessionId: String
)

case class PageViewStats(
  pageUrl: String,
  windowStart: Long,
  windowEnd: Long,
  uniqueUsers: Int,
  totalViews: Int,
  avgSessionDuration: Double
)

object RealTimeAnalytics {

  /**
   * Per page and window: distinct users, total events, and the average session duration.
   * A session's duration is the time between its first and last event inside this window (ms).
   */
  class PageViewAnalytics extends WindowFunction[WebEvent, PageViewStats, String, TimeWindow] {
    override def apply(
        pageUrl: String,
        window: TimeWindow,
        events: Iterable[WebEvent],
        out: Collector[PageViewStats]
    ): Unit = {
      val eventList = events.toList
      val uniqueUsers = eventList.map(_.userId).distinct.size
      val totalViews = eventList.size

      // Calculate average session duration (simplified): one duration per session in this window.
      // Built strictly with `.values.map(...).toList` (keeps duplicate durations) instead of
      // `mapValues`, which is a lazy view in Scala 2.12 and deprecated in 2.13.
      val sessionDurations = eventList
        .groupBy(_.sessionId)
        .values
        .map { sessionEvents =>
          val timestamps = sessionEvents.map(_.timestamp)
          timestamps.max - timestamps.min
        }
        .toList

      val avgSessionDuration =
        if (sessionDurations.isEmpty) 0.0
        else sessionDurations.sum.toDouble / sessionDurations.size

      out.collect(PageViewStats(
        pageUrl,
        window.getStart,
        window.getEnd,
        uniqueUsers,
        totalViews,
        avgSessionDuration
      ))
    }
  }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    // Sample web events
    val webEvents = env.fromElements(
      WebEvent("user1", "/home", "view", 1000, "session1"),
      WebEvent("user2", "/home", "view", 1500, "session2"),
      WebEvent("user1", "/products", "view", 2000, "session1"),
      WebEvent("user3", "/home", "view", 2500, "session3"),
      WebEvent("user2", "/checkout", "view", 3000, "session2")
    )

    // Assign watermarks (tolerating up to 10 seconds of out-of-orderness)
    val watermarkedEvents = webEvents
      .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor[WebEvent](Time.seconds(10)) {
          override def extractTimestamp(event: WebEvent): Long = event.timestamp
        }
      )

    // Real-time page view analytics
    val pageViewAnalytics = watermarkedEvents
      .keyBy(_.pageUrl)
      .timeWindow(Time.minutes(5)) // 5-minute windows
      .apply(new PageViewAnalytics)

    // User activity patterns (sliding windows). Lambda parameter types are spelled out so
    // the compiler can choose the overloaded `apply` and infer the output type.
    val userActivityPatterns = watermarkedEvents
      .keyBy(_.userId)
      .timeWindow(Time.minutes(10), Time.minutes(2)) // 10-minute windows, slide every 2 minutes
      .apply {
        (userId: String,
         window: TimeWindow,
         events: Iterable[WebEvent],
         out: Collector[(String, Long, Double, Int)]) =>
          // Derive the window length from the window itself so the rate stays correct
          // if the window size above is changed.
          val windowMinutes = (window.getEnd - window.getStart) / 60000.0
          val actionsPerMinute = events.size / windowMinutes
          val uniquePages = events.map(_.pageUrl).toSet.size
          out.collect((userId, window.getStart, actionsPerMinute, uniquePages))
      }

    // Global statistics (all events)
    val globalStats = watermarkedEvents
      .timeWindowAll(Time.minutes(1)) // 1-minute global windows
      .apply {
        (window: TimeWindow,
         events: Iterable[WebEvent],
         out: Collector[(Long, Int, Int, Int)]) =>
          val totalEvents = events.size
          val uniqueUsers = events.map(_.userId).toSet.size
          val uniquePages = events.map(_.pageUrl).toSet.size
          out.collect((window.getStart, totalEvents, uniqueUsers, uniquePages))
      }

    // Print results
    pageViewAnalytics.print("Page View Analytics")
    userActivityPatterns.print("User Activity")
    globalStats.print("Global Stats")

    env.execute("Real-Time Web Analytics")
  }
}
608
```