0
# Window Operations and Time Processing
1
2
This document covers time-based and count-based windowing operations for streaming data analysis with the Apache Flink Table API and SQL (`flink-table-uber-blink`).
3
4
## Time Attributes
5
6
### Processing Time
7
8
```java { .api }
9
// In Table API
10
Table table = tEnv.fromDataStream(dataStream,
11
$("user_id"),
12
$("data"),
13
$("proc_time").proctime()
14
);
15
16
// In SQL DDL
17
tEnv.executeSql(
18
"CREATE TABLE events (" +
19
" user_id BIGINT," +
20
" data STRING," +
21
" proc_time AS PROCTIME()" +
22
") WITH (...)"
23
);
24
```
25
26
### Event Time
27
28
```java { .api }
29
// With watermark in DDL
30
tEnv.executeSql(
31
"CREATE TABLE events (" +
32
" user_id BIGINT," +
33
" event_time TIMESTAMP(3)," +
34
" data STRING," +
35
" WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND" +
36
") WITH (...)"
37
);
38
39
// In Table API
40
Table table = tEnv.fromDataStream(watermarkedStream,
41
$("user_id"),
42
$("event_time").rowtime(),
43
$("data")
44
);
45
```
46
47
## Group Windows (Table API)
48
49
### Tumbling Windows
50
51
```java { .api }
52
class Tumble {
53
static TumbleWithSize over(Expression size);
54
}
55
56
interface TumbleWithSize {
57
TumbleWithSizeOnTime on(Expression timeField);
58
}
59
60
interface TumbleWithSizeOnTime {
61
GroupWindow as(String alias);
62
}
63
```
64
65
**Usage:**
66
67
```java
68
Table result = table
69
.window(Tumble.over(lit(5).minutes()).on($("event_time")).as("w"))
70
.groupBy($("user_id"), $("w"))
71
.select($("user_id"), $("w").start(), $("w").end(), $("data").count());
72
```
73
74
### Sliding Windows
75
76
```java { .api }
77
class Slide {
78
static SlideWithSize over(Expression size);
79
}
80
81
interface SlideWithSize {
82
SlideWithSizeAndSlide every(Expression slide);
83
}
84
85
interface SlideWithSizeAndSlide {
86
SlideWithSizeAndSlideOnTime on(Expression timeField);
87
}
88
```
89
90
**Usage:**
91
92
```java
93
Table result = table
94
.window(Slide.over(lit(10).minutes()).every(lit(5).minutes()).on($("event_time")).as("w"))
95
.groupBy($("user_id"), $("w"))
96
.select($("user_id"), $("w").start(), $("w").end(), $("data").count());
97
```
98
99
### Session Windows
100
101
```java { .api }
102
class Session {
103
static SessionWithGap withGap(Expression gap);
104
}
105
106
interface SessionWithGap {
107
SessionWithGapOnTime on(Expression timeField);
108
}
109
```
110
111
**Usage:**
112
113
```java
114
Table result = table
115
.window(Session.withGap(lit(30).minutes()).on($("event_time")).as("w"))
116
.groupBy($("user_id"), $("w"))
117
.select($("user_id"), $("w").start(), $("w").end(), $("data").count());
118
```
119
120
## Window SQL Functions
121
122
### Tumbling Window Functions
123
124
```sql
125
-- TUMBLE function
126
SELECT
127
user_id,
128
TUMBLE_START(event_time, INTERVAL '5' MINUTE) as window_start,
129
TUMBLE_END(event_time, INTERVAL '5' MINUTE) as window_end,
130
COUNT(*) as event_count
131
FROM events
132
GROUP BY user_id, TUMBLE(event_time, INTERVAL '5' MINUTE);
133
134
-- TUMBLE_ROWTIME and TUMBLE_PROCTIME
135
SELECT
136
user_id,
137
TUMBLE_ROWTIME(event_time, INTERVAL '5' MINUTE) as window_rowtime,
138
TUMBLE_PROCTIME(event_time, INTERVAL '5' MINUTE) as window_proctime,
139
COUNT(*) as event_count
140
FROM events
141
GROUP BY user_id, TUMBLE(event_time, INTERVAL '5' MINUTE);
142
```
143
144
### Sliding Window Functions
145
146
```sql
147
-- HOP function (sliding window)
148
SELECT
149
user_id,
150
HOP_START(event_time, INTERVAL '2' MINUTE, INTERVAL '5' MINUTE) as window_start,
151
HOP_END(event_time, INTERVAL '2' MINUTE, INTERVAL '5' MINUTE) as window_end,
152
COUNT(*) as event_count
153
FROM events
154
GROUP BY user_id, HOP(event_time, INTERVAL '2' MINUTE, INTERVAL '5' MINUTE);
155
```
156
157
### Session Window Functions
158
159
```sql
160
-- SESSION function
161
SELECT
162
user_id,
163
SESSION_START(event_time, INTERVAL '30' MINUTE) as session_start,
164
SESSION_END(event_time, INTERVAL '30' MINUTE) as session_end,
165
COUNT(*) as event_count
166
FROM events
167
GROUP BY user_id, SESSION(event_time, INTERVAL '30' MINUTE);
168
```
169
170
## Over Windows (Window Aggregations)
171
172
### Unbounded Over Windows
173
174
```sql
175
-- OVER clause with unbounded preceding
176
SELECT
177
user_id,
178
event_time,
179
data,
180
COUNT(*) OVER (
181
PARTITION BY user_id
182
ORDER BY event_time
183
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
184
) as running_count,
185
SUM(amount) OVER (
186
PARTITION BY user_id
187
ORDER BY event_time
188
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
189
) as running_sum
190
FROM events;
191
```
192
193
### Bounded Over Windows
194
195
```sql
196
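-- Bounded OVER windows: row-count based (ROWS) and time-range based (RANGE).
-- Note: "10 PRECEDING AND CURRENT ROW" spans up to 11 rows (10 earlier rows plus the current one).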
197
SELECT
198
user_id,
199
event_time,
200
data,
201
COUNT(*) OVER (
202
PARTITION BY user_id
203
ORDER BY event_time
204
ROWS BETWEEN 10 PRECEDING AND CURRENT ROW
205
) as count_last_10,
206
AVG(amount) OVER (
207
PARTITION BY user_id
208
ORDER BY event_time
209
RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
210
) as avg_last_hour
211
FROM events;
212
```
213
214
### Over Window in Table API
215
216
```java { .api }
217
interface Table {
218
OverWindowedTable window(OverWindow overWindow);
219
}
220
221
class Over {
222
static OverWindowPartitioned partitionBy(Expression... fields);
223
static OverWindowPartitionedOrdered orderBy(Expression field);
224
}
225
```
226
227
**Usage:**
228
229
```java
230
Table result = table
231
.window(Over.partitionBy($("user_id")).orderBy($("event_time")).preceding(UNBOUNDED_ROW).as("w"))
232
.select($("user_id"), $("event_time"), $("data"), $("data").count().over($("w")));
233
```
234
235
## Window TVF (Table-Valued Functions)
236
237
### Tumble TVF
238
239
```sql
240
-- TUMBLE TVF (Flink 1.13+)
241
SELECT
242
window_start,
243
window_end,
244
user_id,
245
COUNT(*) as event_count
246
FROM TABLE(TUMBLE(TABLE events, DESCRIPTOR(event_time), INTERVAL '5' MINUTE))
247
GROUP BY window_start, window_end, user_id;
248
```
249
250
### Hop TVF
251
252
```sql
253
-- HOP TVF
254
SELECT
255
window_start,
256
window_end,
257
user_id,
258
COUNT(*) as event_count
259
FROM TABLE(HOP(TABLE events, DESCRIPTOR(event_time), INTERVAL '2' MINUTE, INTERVAL '5' MINUTE))
260
GROUP BY window_start, window_end, user_id;
261
```
262
263
### Session TVF
264
265
```sql
266
-- SESSION TVF
267
SELECT
268
window_start,
269
window_end,
270
user_id,
271
COUNT(*) as event_count
272
FROM TABLE(SESSION(TABLE events, DESCRIPTOR(event_time), DESCRIPTOR(user_id), INTERVAL '30' MINUTE))
273
GROUP BY window_start, window_end, user_id;
274
```
275
276
## Time-based Joins
277
278
### Interval Joins
279
280
```sql
281
-- Time-based interval join
282
SELECT
283
o.order_id,
284
o.user_id,
285
o.order_time,
286
p.payment_id,
287
p.payment_time
288
FROM orders o
289
JOIN payments p ON o.order_id = p.order_id
290
AND p.payment_time BETWEEN o.order_time - INTERVAL '1' HOUR
291
AND o.order_time + INTERVAL '1' HOUR;
292
```
293
294
### Temporal Joins
295
296
```java
297
// Register temporal table function
298
tEnv.createTemporarySystemFunction("rates_temporal",
299
rates.createTemporalTableFunction($("update_time"), $("currency")));
300
301
// Temporal table function join in SQL
302
Table result = tEnv.sqlQuery(
303
"SELECT " +
304
" o.order_id, " +
305
" o.amount, " +
306
" o.currency, " +
307
" r.rate, " +
308
" o.amount * r.rate as amount_usd " +
309
"FROM orders o, " +
310
"LATERAL TABLE (rates_temporal(o.order_time)) AS r " +
311
"WHERE o.currency = r.currency"
312
);
313
```
314
315
## Watermarks and Late Data
316
317
### Watermark Strategies
318
319
```java
320
// Bounded out-of-orderness
321
WatermarkStrategy<Event> strategy = WatermarkStrategy
322
.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
323
.withTimestampAssigner((event, timestamp) -> event.getTimestamp());
324
325
// Monotonous timestamps
326
WatermarkStrategy<Event> monotonous = WatermarkStrategy
327
.<Event>forMonotonousTimestamps()
328
.withTimestampAssigner((event, timestamp) -> event.getTimestamp());
329
330
// Custom watermark generator
331
WatermarkStrategy<Event> custom = WatermarkStrategy
332
.<Event>forGenerator(ctx -> new CustomWatermarkGenerator())
333
.withTimestampAssigner((event, timestamp) -> event.getTimestamp());
334
```
335
336
### Late Data Handling
337
338
```java
339
// Configure late data handling
340
Configuration config = tEnv.getConfig().getConfiguration();
341
config.setString("table.exec.emit.late-fire.enabled", "true");
342
config.setString("table.exec.emit.late-fire.delay", "5 s");
343
344
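// Side output for late data.
// NOTE: process() returns the main (on-time) stream, so `lateEvents` below does not yet contain
// the late records. To read them, keep the result as a SingleOutputStreamOperator and call
// getSideOutput(...) with the same OutputTag (declare the tag outside the ProcessFunction).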
345
DataStream<Event> lateEvents = mainStream
346
.assignTimestampsAndWatermarks(watermarkStrategy)
347
.process(new ProcessFunction<Event, Event>() {
348
private OutputTag<Event> lateOutputTag = new OutputTag<Event>("late-data"){};
349
350
@Override
351
public void processElement(Event event, Context ctx, Collector<Event> out) {
352
if (event.getTimestamp() < ctx.timerService().currentWatermark()) {
353
ctx.output(lateOutputTag, event);
354
} else {
355
out.collect(event);
356
}
357
}
358
});
359
```
360
361
## Window Aggregations
362
363
### Built-in Aggregation Functions
364
365
```sql
366
-- Count, sum, average, minimum, maximum
367
SELECT
368
TUMBLE_START(event_time, INTERVAL '5' MINUTE) as window_start,
369
COUNT(*) as event_count,
370
SUM(amount) as total_amount,
371
AVG(amount) as avg_amount,
372
MIN(amount) as min_amount,
373
MAX(amount) as max_amount
374
FROM events
375
GROUP BY TUMBLE(event_time, INTERVAL '5' MINUTE);
376
377
-- Statistical and collection functions
378
SELECT
379
TUMBLE_START(event_time, INTERVAL '5' MINUTE) as window_start,
380
STDDEV_POP(amount) as stddev,
381
VAR_SAMP(amount) as variance,
382
COLLECT(user_id) as user_list,
383
LISTAGG(event_type, ',') as event_types
384
FROM events
385
GROUP BY TUMBLE(event_time, INTERVAL '5' MINUTE);
386
```
387
388
## Types
389
390
```java { .api }
391
interface GroupWindow {
392
Expression getTimeField();
393
Expression getSize();
394
String getAlias();
395
}
396
397
class TumbleWithSize implements GroupWindow;
398
class SlideWithSizeAndSlide implements GroupWindow;
399
class SessionWithGap implements GroupWindow;
400
401
interface OverWindow {
402
Expression getPartitioning();
403
Expression getOrder();
404
Expression getPreceding();
405
Expression getFollowing();
406
String getAlias();
407
}
408
409
interface WindowGroupedTable extends Table {
410
Table select(Expression... fields);
411
AggregatedTable aggregate(Expression aggregateFunction);
412
FlatAggregateTable flatAggregate(Expression tableAggregateFunction);
413
}
414
415
interface OverWindowedTable extends Table {
416
Table select(Expression... fields);
417
}
418
419
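// Window bounds (in Flink these are exposed as static constants on Expressions,
// e.g. Expressions.UNBOUNDED_ROW, rather than as standalone classes)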
420
class UNBOUNDED_ROW;
421
class UNBOUNDED_RANGE;
422
class CURRENT_ROW;
423
class CURRENT_RANGE;
424
```