0
# Window Operations
1
2
Window operations enable time-based and count-based processing of streaming data. Flink supports several window types for different analytical scenarios: tumbling, sliding, and session group windows, as well as over windows.
3
4
## Capabilities
5
6
### Tumbling Windows
7
8
Fixed-size, non-overlapping windows that partition data into distinct time segments.
9
10
```java { .api }
11
public final class Tumble {
12
/**
13
* Creates a tumbling window with the specified size
14
* @param size Window size expression (duration)
15
* @return TumbleWithSize for further configuration
16
*/
17
public static TumbleWithSize over(Expression size);
18
}
19
20
public final class TumbleWithSize {
21
/**
22
* Specifies the time attribute for the tumbling window
23
* @param timeField Time attribute column
24
* @return TumbleWithSizeOnTime for alias configuration
25
*/
26
public TumbleWithSizeOnTime on(Expression timeField);
27
}
28
29
public final class TumbleWithSizeOnTime {
30
/**
31
* Assigns an alias to the window for reference in selection
32
* @param alias Window alias name
33
* @return GroupWindow that can be used with groupBy
34
*/
35
public GroupWindow as(String alias);
36
}
37
```
38
39
**Usage Examples:**
40
41
```java
42
import static org.apache.flink.table.api.Expressions.*;
43
44
// 1-hour tumbling window
45
GroupWindow hourlyWindow = Tumble
46
.over(lit(1).hour())
47
.on($("event_time"))
48
.as("hourly_window");
49
50
Table hourlyStats = sourceTable
51
.window(hourlyWindow)
52
.groupBy($("hourly_window"), $("category"))
53
.select(
54
$("category"),
55
$("hourly_window").start().as("window_start"),
56
$("hourly_window").end().as("window_end"),
57
count($("*")).as("event_count"),
58
sum($("amount")).as("total_amount")
59
);
60
61
// 15-minute tumbling window
62
GroupWindow quarterHourWindow = Tumble
63
.over(lit(15).minute())
64
.on($("process_time"))
65
.as("quarter_hour");
66
67
Table frequentStats = sourceTable
68
.window(quarterHourWindow)
69
.groupBy($("quarter_hour"))
70
.select(
71
$("quarter_hour").start().as("period_start"),
72
count($("*")).as("transaction_count"),
73
avg($("value")).as("avg_transaction_value")
74
);
75
```
76
77
### Sliding Windows
78
79
Fixed-size windows that advance by a specified slide interval. When the slide interval is smaller than the window size, consecutive windows overlap, which makes them useful for moving averages and trend analysis.
80
81
```java { .api }
82
public final class Slide {
83
/**
84
* Creates a sliding window with the specified size
85
* @param size Window size expression (duration)
86
* @return SlideWithSize for slide interval configuration
87
*/
88
public static SlideWithSize over(Expression size);
89
}
90
91
public final class SlideWithSize {
92
/**
93
* Specifies the slide interval for the window
94
* @param slide Slide interval expression (duration)
95
* @return SlideWithSizeAndSlide for time field configuration
96
*/
97
public SlideWithSizeAndSlide every(Expression slide);
98
}
99
100
public final class SlideWithSizeAndSlide {
101
/**
102
* Specifies the time attribute for the sliding window
103
* @param timeField Time attribute column
104
* @return SlideWithSizeAndSlideOnTime for alias configuration
105
*/
106
public SlideWithSizeAndSlideOnTime on(Expression timeField);
107
}
108
109
public final class SlideWithSizeAndSlideOnTime {
110
/**
111
* Assigns an alias to the sliding window
112
* @param alias Window alias name
113
* @return GroupWindow for use with groupBy
114
*/
115
public GroupWindow as(String alias);
116
}
117
```
118
119
**Usage Examples:**
120
121
```java
122
// 1-hour window sliding every 15 minutes
123
GroupWindow slidingWindow = Slide
124
.over(lit(1).hour())
125
.every(lit(15).minute())
126
.on($("event_time"))
127
.as("sliding_window");
128
129
Table movingAverages = sourceTable
130
.window(slidingWindow)
131
.groupBy($("sliding_window"), $("sensor_id"))
132
.select(
133
$("sensor_id"),
134
$("sliding_window").start().as("window_start"),
135
$("sliding_window").end().as("window_end"),
136
avg($("temperature")).as("avg_temperature"),
137
count($("*")).as("reading_count")
138
);
139
140
// 30-minute window sliding every 5 minutes for real-time monitoring
141
GroupWindow realtimeWindow = Slide
142
.over(lit(30).minute())
143
.every(lit(5).minute())
144
.on($("processing_time"))
145
.as("realtime_window");
146
147
Table realtimeMetrics = sourceTable
148
.window(realtimeWindow)
149
.groupBy($("realtime_window"))
150
.select(
151
$("realtime_window").start().as("period_start"),
152
count($("*")).as("event_rate"),
153
sum($("bytes")).as("total_bytes"),
154
max($("latency")).as("max_latency")
155
);
156
```
157
158
### Session Windows
159
160
Dynamically sized windows that group events into sessions of activity. A session window closes once no event has arrived for a configurable gap of inactivity.
161
162
```java { .api }
163
public final class Session {
164
/**
165
* Creates a session window with the specified gap
166
* @param gap Session gap expression (duration of inactivity)
167
* @return SessionWithGap for time field configuration
168
*/
169
public static SessionWithGap withGap(Expression gap);
170
}
171
172
public final class SessionWithGap {
173
/**
174
* Specifies the time attribute for the session window
175
* @param timeField Time attribute column
176
* @return SessionWithGapOnTime for alias configuration
177
*/
178
public SessionWithGapOnTime on(Expression timeField);
179
}
180
181
public final class SessionWithGapOnTime {
182
/**
183
* Assigns an alias to the session window
184
* @param alias Window alias name
185
* @return GroupWindow for use with groupBy
186
*/
187
public GroupWindow as(String alias);
188
}
189
```
190
191
**Usage Examples:**
192
193
```java
194
// Session window with 30-minute inactivity gap
195
GroupWindow userSession = Session
196
.withGap(lit(30).minute())
197
.on($("event_time"))
198
.as("user_session");
199
200
Table sessionAnalysis = sourceTable
201
.window(userSession)
202
.groupBy($("user_session"), $("user_id"))
203
.select(
204
$("user_id"),
205
$("user_session").start().as("session_start"),
206
$("user_session").end().as("session_end"),
207
count($("*")).as("actions_in_session"),
208
sum($("duration")).as("total_session_time"),
209
max($("page_views")).as("max_page_views")
210
);
211
212
// Short session window for detecting bursts of activity
213
GroupWindow activityBurst = Session
214
.withGap(lit(2).minute())
215
.on($("event_time"))
216
.as("activity_burst");
217
218
Table burstDetection = sourceTable
219
.window(activityBurst)
220
.groupBy($("activity_burst"), $("device_id"))
221
.select(
222
$("device_id"),
223
$("activity_burst").start().as("burst_start"),
224
$("activity_burst").end().as("burst_end"),
225
count($("*")).as("burst_event_count")
226
)
227
.filter($("burst_event_count").isGreater(10)); // Only high-activity bursts
228
```
229
230
### Over Windows
231
232
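Over windows compute a result for every input row across a bounded or unbounded range of preceding rows. They support analytical functions such as ranking, cumulative sums, and moving averages without collapsing rows through `groupBy`.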
233
234
```java { .api }
235
public final class Over {
236
/**
237
* Creates an Over window partitioned by specified fields
238
* @param partitionBy Fields to partition the window by
239
* @return OverWindowPartitioned for ordering configuration
240
*/
241
public static OverWindowPartitioned partitionBy(Expression... partitionBy);
242
243
/**
244
* Creates an Over window with ordering but no partitioning
245
* @param orderBy Fields to order the window by
246
* @return OverWindowPartitionedOrdered for range/rows configuration
247
*/
248
public static OverWindowPartitionedOrdered orderBy(Expression... orderBy);
249
}
250
251
public interface OverWindowPartitioned {
252
/**
253
* Specifies ordering for the partitioned over window
254
* @param orderBy Ordering expressions
255
* @return OverWindowPartitionedOrdered for range/rows configuration
256
*/
257
OverWindowPartitionedOrdered orderBy(Expression... orderBy);
258
}
259
260
public interface OverWindowPartitionedOrdered {
261
/**
262
* Creates a preceding rows window
263
* @param preceding Number of preceding rows
264
* @return OverWindowPartitionedOrderedPreceding for alias configuration
265
*/
266
OverWindowPartitionedOrderedPreceding preceding(Expression preceding);
267
268
/**
269
* Creates an unbounded preceding window
270
* @return OverWindow for alias configuration
271
*/
272
OverWindow unboundedPreceding();
273
274
/**
275
* Creates a current row window
276
* @return OverWindow for alias configuration
277
*/
278
OverWindow currentRow();
279
}
280
281
public interface OverWindow {
282
/**
283
* Assigns an alias to the over window
284
* @param alias Window alias name
285
* @return OverWindow with alias
286
*/
287
OverWindow as(String alias);
288
}
289
```
290
291
**Usage Examples:**
292
293
```java
294
// Running totals and cumulative calculations
295
OverWindow cumulativeWindow = Over
296
.partitionBy($("customer_id"))
297
.orderBy($("order_date"))
298
.unboundedPreceding()
299
.as("cumulative");
300
301
Table runningTotals = sourceTable
302
.window(cumulativeWindow)
303
.select(
304
$("customer_id"),
305
$("order_date"),
306
$("amount"),
307
sum($("amount")).over($("cumulative")).as("running_total"),
308
count($("*")).over($("cumulative")).as("order_sequence"),
309
rowNumber().over($("cumulative")).as("order_rank")
310
);
311
312
// Moving averages with bounded windows
313
OverWindow movingAvgWindow = Over
314
.partitionBy($("product_id"))
315
.orderBy($("sale_date"))
316
.preceding(lit(6)) // 7-row moving window (6 preceding rows + current row); spans 7 days only if there is exactly one row per day
317
.as("weekly_window");
318
319
Table movingAverages = sourceTable
320
.window(movingAvgWindow)
321
.select(
322
$("product_id"),
323
$("sale_date"),
324
$("daily_sales"),
325
avg($("daily_sales")).over($("weekly_window")).as("weekly_avg_sales"),
326
sum($("daily_sales")).over($("weekly_window")).as("weekly_total_sales")
327
);
328
329
// Ranking and analytical functions
330
OverWindow rankingWindow = Over
331
.partitionBy($("department"))
332
.orderBy($("salary").desc())
333
.currentRow()
334
.as("dept_ranking");
335
336
Table employeeRanking = sourceTable
337
.window(rankingWindow)
338
.select(
339
$("employee_id"),
340
$("name"),
341
$("department"),
342
$("salary"),
343
rowNumber().over($("dept_ranking")).as("salary_rank"),
344
denseRank().over($("dept_ranking")).as("salary_dense_rank"),
345
lag($("salary"), 1).over($("dept_ranking")).as("next_highest_salary")
346
);
347
```
348
349
### Window Functions and Expressions
350
351
Special functions available for window operations and time manipulation.
352
353
```java { .api }
354
// Time interval expressions
355
public static Expression lit(long value).year();
356
public static Expression lit(long value).month();
357
public static Expression lit(long value).day();
358
public static Expression lit(long value).hour();
359
public static Expression lit(long value).minute();
360
public static Expression lit(long value).second();
361
public static Expression lit(long value).milli();
362
363
// Window functions
364
public static Expression rowNumber();
365
public static Expression rank();
366
public static Expression denseRank();
367
public static Expression lag(Expression field, int offset);
368
public static Expression lead(Expression field, int offset);
369
public static Expression firstValue(Expression field);
370
public static Expression lastValue(Expression field);
371
372
// Window start/end functions (for group windows)
373
// Available as methods on window alias expressions
374
// $("window_alias").start()
375
// $("window_alias").end()
376
```
377
378
**Usage Examples:**
379
380
```java
381
// Time interval examples
382
GroupWindow dailyWindow = Tumble
383
.over(lit(1).day())
384
.on($("event_time"))
385
.as("daily");
386
387
GroupWindow weeklySliding = Slide
388
.over(lit(7).day())
389
.every(lit(1).day())
390
.on($("event_time"))
391
.as("weekly_sliding");
392
393
// Analytical window functions
394
OverWindow analyticalWindow = Over
395
.partitionBy($("category"))
396
.orderBy($("created_date"))
397
.unboundedPreceding()
398
.as("analytical");
399
400
Table analyticalResults = sourceTable
401
.window(analyticalWindow)
402
.select(
403
$("id"),
404
$("category"),
405
$("value"),
406
$("created_date"),
407
rowNumber().over($("analytical")).as("row_num"),
408
rank().over($("analytical")).as("rank"),
409
lag($("value"), 1).over($("analytical")).as("prev_value"),
410
lead($("value"), 1).over($("analytical")).as("next_value"),
411
firstValue($("value")).over($("analytical")).as("first_in_category"),
412
lastValue($("value")).over($("analytical")).as("current_last")
413
);
414
```
415
416
### WindowGroupedTable Operations
417
418
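A `WindowGroupedTable` is a windowed table to which `groupBy` has been applied. It supports `select` with aggregate expressions that are evaluated per window and grouping key.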
419
420
```java { .api }
421
public interface WindowGroupedTable {
422
/**
423
* Performs selection with aggregation on windowed data
424
* @param fields Selection expressions including aggregates and window functions
425
* @return Table with windowed aggregation results
426
*/
427
Table select(Expression... fields);
428
}
429
```
430
431
### Complex Window Scenarios
432
433
Advanced windowing patterns for complex analytical use cases.
434
435
**Usage Examples:**
436
437
```java
438
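// Multi-level windowing - hourly stats with daily rollups
// NOTE(review): the second window must be defined on a time attribute; start()/end() appear to yield plain timestamps, so the first query may need to expose $("hourly").rowtime() instead - confirm against the Flink docs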
439
Table hourlyStats = sourceTable
440
.window(Tumble.over(lit(1).hour()).on($("event_time")).as("hourly"))
441
.groupBy($("hourly"), $("region"))
442
.select(
443
$("region"),
444
$("hourly").start().as("hour_start"),
445
count($("*")).as("hourly_count"),
446
sum($("revenue")).as("hourly_revenue")
447
);
448
449
Table dailyRollup = hourlyStats
450
.window(Tumble.over(lit(1).day()).on($("hour_start")).as("daily"))
451
.groupBy($("daily"), $("region"))
452
.select(
453
$("region"),
454
$("daily").start().as("day_start"),
455
sum($("hourly_count")).as("daily_count"),
456
sum($("hourly_revenue")).as("daily_revenue"),
457
avg($("hourly_revenue")).as("avg_hourly_revenue")
458
);
459
460
// Session analysis with user behavior patterns
461
GroupWindow userActivitySession = Session
462
.withGap(lit(20).minute())
463
.on($("event_time"))
464
.as("session");
465
466
Table sessionBehavior = sourceTable
467
.window(userActivitySession)
468
.groupBy($("session"), $("user_id"))
469
.select(
470
$("user_id"),
471
$("session").start().as("session_start"),
472
$("session").end().as("session_end"),
473
count($("*")).as("total_actions"),
474
countDistinct($("page_id")).as("unique_pages"),
475
sum($("time_spent")).as("total_time"),
476
first($("entry_page")).as("landing_page"),
477
last($("page_id")).as("exit_page"),
478
// Calculate session duration manually
479
$("session").end().minus($("session").start()).as("session_duration")
480
);
481
```
482
483
## Window Time Attributes
484
485
```java { .api }
486
// Processing time attribute (system time when record is processed)
487
// Usually defined in DDL or table descriptor
488
// PROCTIME() AS processing_time
489
490
// Event time attribute (timestamp from the data)
491
// Defined with watermark strategy in DDL
492
// event_time TIMESTAMP(3),
493
// WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
494
495
// Window functions for accessing time attributes
496
Expression start(); // Window start time
497
Expression end(); // Window end time
498
Expression proctime(); // Processing time
499
Expression rowtime(); // Event-time (rowtime) attribute, declared together with a watermark
500
```