# Window Operations

The Flink Table API provides comprehensive windowing support for time-based and count-based aggregations. Windows split a (potentially unbounded) stream into finite groups of rows, either by time interval or by row count, so that an aggregate can be computed for each group.

## Capabilities

### Time Windows

Group windows assign rows to tumbling, sliding, or session windows. Each window is defined on a time attribute and must be given an alias before it can be used.

```scala { .api }
/**
 * Tumbling window with fixed size and no overlap.
 *
 * @param size Window size expression: a time interval (e.g. `10.minutes`); Flink also
 *             accepts a row-count interval (e.g. `10.rows`)
 */
case class TumbleWithSize(size: Expression) {

  /**
   * Specifies the time attribute for the window.
   *
   * @param timeField Time field expression (rowtime or proctime)
   * @return Window specification with time attribute
   */
  def on(timeField: Expression): TumbleWithSizeOnTime
}

/**
 * Tumbling window with size and time attribute set; an alias is still required.
 *
 * @param size      Window size expression
 * @param timeField Time attribute the window is defined on
 */
case class TumbleWithSizeOnTime(size: Expression, timeField: Expression) {

  /**
   * Assigns an alias to the window.
   *
   * @param alias Window alias, used to reference window properties (e.g. `'w.start`)
   *              and to group by the window (e.g. `groupBy('w, ...)`)
   * @return Complete tumbling window specification
   */
  def as(alias: Expression): TumbleWithSizeOnTimeWithAlias
}

/**
 * Sliding window with fixed size and slide interval. Windows overlap when the slide
 * is smaller than the size.
 *
 * @param size Window size expression (time interval)
 */
case class SlideWithSize(size: Expression) {

  /**
   * Specifies the slide interval, i.e. the distance between the starts of consecutive windows.
   *
   * @param slide Slide interval expression (time interval)
   * @return Window specification with slide
   */
  def every(slide: Expression): SlideWithSizeAndSlide
}

/**
 * Sliding window with size and slide set; a time attribute is still required.
 *
 * @param size  Window size expression
 * @param slide Slide interval expression
 */
case class SlideWithSizeAndSlide(size: Expression, slide: Expression) {

  /**
   * Specifies the time attribute for the window.
   *
   * @param timeField Time field expression (rowtime or proctime)
   * @return Window specification with time attribute
   */
  def on(timeField: Expression): SlideWithSizeAndSlideOnTime
}

/**
 * Sliding window with size, slide and time attribute set; an alias is still required.
 *
 * @param size      Window size expression
 * @param slide     Slide interval expression
 * @param timeField Time attribute the window is defined on
 */
case class SlideWithSizeAndSlideOnTime(size: Expression, slide: Expression, timeField: Expression) {

  /**
   * Assigns an alias to the window.
   *
   * @param alias Window alias for referencing window properties
   * @return Complete sliding window specification
   */
  def as(alias: Expression): SlideWithSizeAndSlideOnTimeWithAlias
}

/**
 * Session window. A session closes once no rows arrive for longer than the gap, so
 * the length of each window depends on data activity rather than being fixed.
 *
 * @param gap Session gap expression (time interval of inactivity that closes a session)
 */
case class SessionWithGap(gap: Expression) {

  /**
   * Specifies the time attribute for the window.
   *
   * @param timeField Time field expression (rowtime or proctime)
   * @return Window specification with time attribute
   */
  def on(timeField: Expression): SessionWithGapOnTime
}

/**
 * Session window with gap and time attribute set; an alias is still required.
 *
 * @param gap       Session gap expression
 * @param timeField Time attribute the window is defined on
 */
case class SessionWithGapOnTime(gap: Expression, timeField: Expression) {

  /**
   * Assigns an alias to the window.
   *
   * @param alias Window alias for referencing window properties
   * @return Complete session window specification
   */
  def as(alias: Expression): SessionWithGapOnTimeWithAlias
}
```

**Usage Examples:**

```scala
import org.apache.flink.table.api.Tumble
import org.apache.flink.table.api.Slide
import org.apache.flink.table.api.Session
// NOTE(review): the expression DSL used below (`10.minutes`, `'w.start`, `'amount.sum`) also
// needs Flink's Scala expression implicits in scope; the import for them varies by Flink version.

// Tumbling window - 10 minute non-overlapping windows
val tumblingResult = table
  .window(Tumble over 10.minutes on 'rowtime as 'w)
  .groupBy('w, 'userId)
  .select('userId, 'w.start, 'w.end, 'amount.sum)

// Sliding window - 10 minute windows sliding every 5 minutes (each row lands in 2 windows)
val slidingResult = table
  .window(Slide over 10.minutes every 5.minutes on 'rowtime as 'w)
  .groupBy('w, 'userId)
  .select('userId, 'w.start, 'w.end, 'amount.avg)

// Session window - sessions with 15 minute inactivity gap
val sessionResult = table
  .window(Session withGap 15.minutes on 'rowtime as 'w)
  .groupBy('w, 'userId)
  .select('userId, 'w.start, 'w.end, 'eventCount.count)

// Processing time windows (using proctime)
val proctimeResult = table
  .window(Tumble over 1.hour on 'proctime as 'w)
  .groupBy('w, 'category)
  .select('category, 'w.start, 'amount.max)
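```

### Window Properties

Window properties expose each window's metadata (its start, its end, and its time attributes) inside windowed aggregations. They are accessed through the window alias, for example `'w.start`.

```scala { .api }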
/**
 * Window properties available in windowed table operations, accessed through the
 * window alias (e.g. `'w.start`, `'w.end`).
 */
trait WindowProperty extends Expression {

  /** Start timestamp of the window (inclusive lower bound). */
  def start: Expression

  /** End timestamp of the window (exclusive upper bound). */
  def end: Expression

  /**
   * Rowtime timestamp of the window (for event time windows): the inclusive upper bound,
   * usable as a rowtime attribute in subsequent time-based operations.
   */
  def rowtime: Expression

  /**
   * Processing time timestamp of the window (for processing time windows), usable as a
   * proctime attribute in subsequent time-based operations.
   */
  def proctime: Expression
}
```

**Usage Examples:**

```scala
// Access window properties in aggregations
val windowedStats = table
  .window(Tumble over 1.hour on 'rowtime as 'w)
  .groupBy('w, 'department)
  .select(
    'department,
    'w.start as 'windowStart,
    'w.end as 'windowEnd,
    'w.rowtime as 'windowTime, // rowtime attribute of the window, reusable by later time-based operations
    'salary.avg as 'avgSalary,
    'employee.count as 'employeeCount
  )

// Use window properties in filtering
val recentWindows = windowedStats
  .filter('windowEnd > (currentTimestamp() - 2.hours))
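```

### Over Windows

Over windows compute an aggregate for every input row over a frame of neighboring rows, bounded by a row count or a time range. This enables running totals, moving averages, and similar analytical calculations.

```scala { .api }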
/**
 * Over window specification for analytical functions. An over window yields one result
 * per input row, aggregated over a frame of neighboring rows.
 *
 * @param partitionBy Partitioning expressions (empty when the window is not partitioned)
 * @param orderBy     Ordering expression
 * @param preceding   Frame start (rows or range before the current row)
 * @param following   Frame end (rows or range after the current row)
 */
case class OverWindow(
    partitionBy: Seq[Expression],
    orderBy: Expression,
    preceding: Expression,
    following: Expression
)

/**
 * Entry point for building over windows (the Table API counterpart of SQL's OVER clause).
 */
object Over {

  /**
   * Partitions the over window by the given fields.
   *
   * @param fields Partitioning field expressions
   * @return Partitioned over window specification; ordering is specified next
   */
  def partitionBy(fields: Expression*): OverWindowWithPartitioning

  /**
   * Orders an unpartitioned over window by the given field.
   *
   * @param field Ordering field expression
   * @return Ordered over window specification; the frame start is specified next
   */
  def orderBy(field: Expression): OverWindowWithOrdering
}

/**
 * Over window with partitioning set; ordering is still required.
 *
 * @param partitionBy Partitioning expressions
 */
case class OverWindowWithPartitioning(partitionBy: Seq[Expression]) {

  /**
   * Orders the partitioned over window.
   *
   * @param field Ordering field expression
   * @return Over window with partitioning and ordering
   */
  def orderBy(field: Expression): OverWindowWithPartitioningAndOrdering
}

/**
 * Unpartitioned over window with ordering set (created by `Over orderBy ...`).
 *
 * @param orderBy Ordering expression
 */
case class OverWindowWithOrdering(orderBy: Expression) {

  /**
   * Specifies the preceding frame boundary.
   *
   * @param preceding Frame start boundary
   * @return Over window with ordering and preceding
   */
  def preceding(preceding: Expression): OverWindowWithPreceding
}

/**
 * Over window with partitioning and ordering set; the frame start is specified next.
 *
 * @param partitionBy Partitioning expressions
 * @param orderBy     Ordering expression
 */
case class OverWindowWithPartitioningAndOrdering(partitionBy: Seq[Expression], orderBy: Expression) {

  /**
   * Specifies the preceding frame boundary.
   *
   * @param preceding Frame start boundary
   * @return Over window with partitioning, ordering, and preceding
   */
  def preceding(preceding: Expression): OverWindowWithPreceding
}

/**
 * Over window with its frame start set. The frame end is optional; an alias is still
 * required. (Fields elided in this listing.)
 */
case class OverWindowWithPreceding(/* fields */) {

  /**
   * Specifies the following frame boundary.
   *
   * Returns this builder stage (not a finished [[OverWindow]]) so that chains such as
   * `... preceding UNBOUNDED_RANGE following CURRENT_RANGE as 'w` type-check.
   *
   * @param following Frame end boundary (e.g. `CURRENT_ROW` or `CURRENT_RANGE`)
   * @return Over window specification with frame end, ready for `as`
   */
  def following(following: Expression): OverWindowWithPreceding

  /**
   * Assigns an alias to the over window.
   *
   * @param alias Window alias, referenced as `expr over 'alias` in `select`
   * @return Complete over window specification
   */
  def as(alias: Expression): OverWindow
}

// Frame boundary constants
object FrameBoundary {

  /** Unbounded frame start for row-count (ROWS) frames. */
  val UNBOUNDED_ROW: Expression = ???

  /** Unbounded frame start for time-range (RANGE) frames. */
  val UNBOUNDED_RANGE: Expression = ???

  /** Frame boundary at the current row (ROWS frames). */
  val CURRENT_ROW: Expression = ???

  /** Frame boundary at the current row and its peers with an equal ordering value (RANGE frames). */
  val CURRENT_RANGE: Expression = ???

  /** Kept for compatibility; Flink distinguishes row-count and range frames. */
  @deprecated("Use UNBOUNDED_ROW or UNBOUNDED_RANGE, matching the frame type", "")
  val UNBOUNDED_PRECEDING: Expression = ???

  /** Kept for compatibility; frames ending after the current row are not supported by Flink. */
  @deprecated("Frames following the current row are unsupported; use CURRENT_ROW or CURRENT_RANGE", "")
  val UNBOUNDED_FOLLOWING: Expression = ???
}
```

**Usage Examples:**

```scala
import org.apache.flink.table.api.Over
import org.apache.flink.table.api.FrameBoundary._

// NOTE(review): confirm these constraints against the targeted Flink version and execution mode:
// - on streaming tables, over windows must be ordered by an ascending time attribute, so the
//   salary/date/hireDate orderings below presumably apply to batch execution only;
// - Table.window(...) may accept only a single over window per call;
// - row_number() is not declared in this API listing; Flink offers ROW_NUMBER in SQL.

// Running sum over all rows in the partition up to the current salary (peers with equal salary included)
val runningSum = table
  .window(Over partitionBy 'department orderBy 'salary.desc preceding UNBOUNDED_RANGE following CURRENT_RANGE as 'w)
  .select('employee, 'department, 'salary, 'salary.sum over 'w as 'runningSalary)

// Moving average over the last 3 rows (2 preceding rows plus the current row)
val movingAvg = table
  .window(Over partitionBy 'department orderBy 'date.asc preceding 2.rows following CURRENT_ROW as 'w)
  .select('employee, 'date, 'sales, 'sales.avg over 'w as 'movingAvgSales)

// Ranking within partition
val ranking = table
  .window(Over partitionBy 'department orderBy 'salary.desc preceding UNBOUNDED_RANGE following CURRENT_RANGE as 'w)
  .select('employee, 'department, 'salary, row_number() over 'w as 'rank)

// Multiple over windows
val analytics = table
  .window(
    Over partitionBy 'department orderBy 'salary.desc preceding UNBOUNDED_RANGE following CURRENT_RANGE as 'salaryWindow,
    Over partitionBy 'department orderBy 'hireDate.asc preceding UNBOUNDED_RANGE following CURRENT_RANGE as 'timeWindow
  )
  .select(
    'employee,
    'salary,
    // Running total up to the current salary (frame ends at CURRENT_RANGE), not the overall department total.
    'salary.sum over 'salaryWindow as 'totalDeptSalary,
    row_number() over 'salaryWindow as 'salaryRank,
    row_number() over 'timeWindow as 'seniorityRank
  )
```

### Windowed Table Operations

These operations are available on a table after a window has been applied. A group window is followed by `groupBy` and then `select`; an over window is followed directly by `select`.

```scala { .api }
/**
 * Table with an applied group window specification (tumble, slide or session).
 */
class WindowedTable {

  /**
   * Groups the windowed table by the given fields.
   *
   * @param fields Grouping field expressions; the window alias must be among them
   *               (e.g. `groupBy('w, 'userId)`)
   * @return Grouped windowed table, ready for aggregation
   */
  def groupBy(fields: Expression*): WindowGroupedTable
}

/**
 * Windowed table after grouping, ready for aggregation.
 */
class WindowGroupedTable {

  /**
   * Selects aggregated results from windowed groups.
   *
   * @param fields Field expressions including aggregations and window properties (e.g. `'w.start`)
   * @return Aggregated table
   */
  def select(fields: Expression*): Table
}

/**
 * Table with applied over windows.
 */
class OverWindowedTable {

  /**
   * Selects fields with over window functions applied.
   *
   * @param fields Field expressions, including aggregates of the form `expr over 'alias`
   * @return Table with over window calculations, one row per input row
   */
  def select(fields: Expression*): Table
}
```

**Usage Examples:**

```scala
// Complex windowed aggregation
// NOTE(review): 'eventTime must be declared as a time attribute of `table` for this window.
val complexWindowed = table
  .window(Tumble over 15.minutes on 'eventTime as 'w)
  .groupBy('w, 'userId, 'category)
  .select(
    'userId,
    'category,
    'w.start as 'windowStart,
    'w.end as 'windowEnd,
    'amount.sum as 'totalAmount,
    'amount.avg as 'avgAmount,
    'amount.min as 'minAmount,
    'amount.max as 'maxAmount,
    'transactionId.count as 'transactionCount,
    // Distinct aggregation is expressed by applying `.distinct` to the aggregate.
    'transactionId.count.distinct as 'uniqueTransactions
  )

// Over window with multiple analytical functions
// NOTE(review): row_number/rank/dense_rank/percent_rank are not declared in this API listing,
// and streaming over windows must be ordered by an ascending time attribute; confirm support
// in the targeted Flink version (Flink SQL offers ranking via ROW_NUMBER/RANK).
val analyticalResult = table
  .window(Over partitionBy 'category orderBy 'amount.desc preceding UNBOUNDED_RANGE following CURRENT_RANGE as 'w)
  .select(
    'transactionId,
    'category,
    'amount,
    // Running values up to the current amount (frame ends at CURRENT_RANGE), not whole-category figures.
    'amount.sum over 'w as 'categoryTotal,
    'amount.avg over 'w as 'categoryAvg,
    row_number() over 'w as 'amountRank,
    rank() over 'w as 'amountRankWithTies,
    dense_rank() over 'w as 'amountDenseRank,
    percent_rank() over 'w as 'amountPercentRank
  )
```

### Time Attributes

Time attributes are special timestamp fields that mark a column as event time (rowtime) or processing time (proctime). Time-based windows must be defined on one of them.

```scala { .api }
/**
 * Methods for defining time attributes in table schemas.
 *
 * Time attributes are declared when a table is created (e.g. on DataStream-to-Table
 * conversion or by the table source); they cannot be introduced by a later projection.
 */
object TimeAttributes {

  /**
   * Defines a rowtime attribute for event time processing.
   *
   * @param field Timestamp field expression
   * @return Rowtime attribute expression
   */
  def rowtime(field: Expression): Expression

  /**
   * Defines a processing time attribute.
   *
   * @return Processing time attribute expression
   */
  def proctime(): Expression
}
```

**Usage Examples:**

```scala
// Define time attributes while converting a DataStream into a Table. Time attributes must be
// declared at conversion time (or by the table source); a later select() cannot create them.
// NOTE(review): assumes `dataStream` exposes fields userId, amount and eventTimestamp and already
// has timestamps/watermarks assigned; confirm for the stream in use.
val tableWithTime = tEnv.fromDataStream(
  dataStream,
  'userId,
  'amount,
  'eventTimestamp.rowtime as 'rowtime,
  'proctime.proctime
)

// Register table source with time attributes
// NOTE(review): with the legacy TableSource API, listing the columns in the schema is not enough
// for them to act as time attributes; the source must also implement DefinedRowtimeAttributes /
// DefinedProctimeAttribute. The source and watermark placeholders are left to be filled in.
val sourceWithTime = new StreamTableSource[Row] {
  override def getTableSchema: TableSchema = {
    new TableSchema(
      Array("userId", "amount", "eventTime", "proctime"),
      Array(Types.LONG, Types.DOUBLE, Types.SQL_TIMESTAMP, Types.SQL_TIMESTAMP)
    )
  }

  override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
    execEnv.addSource(/* source */)
      .assignTimestampsAndWatermarks(/* watermark strategy */)
  }
}

// Use time attributes in window operations
val eventTimeWindows = tableWithTime
  .window(Tumble over 1.hour on 'rowtime as 'w)
  .groupBy('w, 'userId)
  .select('userId, 'w.start, 'amount.sum)

val procTimeWindows = tableWithTime
  .window(Tumble over 1.hour on 'proctime as 'w)
  .groupBy('w, 'userId)
  .select('userId, 'w.start, 'amount.count)
```

### Window SQL Support

The same windowing features are available in SQL: group windows through `TUMBLE`, `TUMBLE_START`, and `TUMBLE_END`, and over windows through `OVER (...)` clauses.

```scala { .api }
// Available window functions in SQL
// NOTE(review): TUMBLE/TUMBLE_START/TUMBLE_END is the group-window function syntax; newer Flink
// versions also offer windowing table-valued functions. Check which one the target version prefers.
val sqlWindowed = tEnv.sqlQuery("""
SELECT
userId,
TUMBLE_START(rowtime, INTERVAL '1' HOUR) as window_start,
TUMBLE_END(rowtime, INTERVAL '1' HOUR) as window_end,
SUM(amount) as total_amount,
COUNT(*) as transaction_count
FROM Transactions
GROUP BY
userId,
TUMBLE(rowtime, INTERVAL '1' HOUR)
""")

// NOTE(review): in streaming mode Flink requires all OVER aggregates in one SELECT to share the same
// window, ordered by an ascending time attribute. The ROW_NUMBER() window below (ordered by amount)
// therefore presumably runs only in batch mode, or in streaming as part of a Top-N query.
val sqlOverWindow = tEnv.sqlQuery("""
SELECT
userId,
amount,
SUM(amount) OVER (
PARTITION BY userId
ORDER BY eventTime
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
) as running_total,
ROW_NUMBER() OVER (
PARTITION BY userId
ORDER BY amount DESC
) as amount_rank
FROM Transactions
""")
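```

## Watermarks and Late Data Handling

This section covers configuration for out-of-order events and late arrivals. Watermarks tell event-time operators how far event time has progressed, and therefore when a window can be considered complete.

```scala { .api }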
/**
 * Watermark strategies for event time processing (simplified view).
 *
 * NOTE(review): in Flink's DataStream API, `WatermarkStrategy` is a factory that creates a
 * `TimestampAssigner` (which defines `extractTimestamp`) and a `WatermarkGenerator` (which
 * defines `onEvent` and `onPeriodicEmit`); this listing folds both into one trait.
 */
trait WatermarkStrategy {

  /**
   * Extracts the event-time timestamp of an element.
   *
   * @param element         Incoming row
   * @param recordTimestamp Timestamp already attached to the record, if any
   * @return Event-time timestamp of the element, in epoch milliseconds
   */
  def extractTimestamp(element: Row, recordTimestamp: Long): Long

  /**
   * Called periodically to give the strategy a chance to emit a new watermark.
   *
   * @param watermarkOutput Output through which watermarks are emitted
   */
  def onPeriodicEmit(watermarkOutput: WatermarkOutput): Unit
}

/**
 * Configuration for late data handling.
 *
 * NOTE(review): these settings mirror the DataStream API window options
 * (`WindowedStream.allowedLateness` and `sideOutputLateData`); the Table API window DSL does
 * not accept them directly. The type name presumably means "lateness"; it is kept for compatibility.
 *
 * @param allowedLateness How long after the watermark passes the window end late rows are still accepted
 * @param sideOutputTag   Tag of the side output that receives rows arriving even later
 */
case class LatentConfig(
    allowedLateness: Time,
    sideOutputTag: OutputTag[Row]
)
```

**Usage Examples:**

```scala
// Configure watermarks and late data handling.
// The rowtime attribute must be declared when the table is created (here: DataStream conversion).
// NOTE(review): assumes `dataStream` has fields userId, amount, eventTime and already carries
// timestamps and watermarks.
val tableWithWatermarks = tEnv
  .fromDataStream(dataStream, 'userId, 'amount, 'eventTime.rowtime as 'rowtime)
  .where('rowtime > (currentTimestamp() - 1.day)) // Filter very old events; eventTime is exposed as 'rowtime

// Tumbling event-time window.
// NOTE(review): after window(...) the Table API offers only groupBy; there is no allowedLateness(...)
// call. Allowed lateness and late-data side outputs are DataStream API features. Some Flink versions
// expose experimental table config options instead (e.g. `table.exec.emit.allow-lateness`); verify
// before relying on them.
val lateDataHandling = tableWithWatermarks
  .window(Tumble over 1.hour on 'rowtime as 'w)
  .groupBy('w, 'userId)
  .select('userId, 'w.start, 'amount.sum)
```

## Types

```scala { .api }
// Group window specifications, one case class per builder stage. Only the *WithAlias classes are
// complete (size/slide/gap, time attribute and alias all set); the others are intermediate stages
// of expressions such as `Tumble over 10.minutes on 'rowtime as 'w`.
sealed trait Window
case class TumbleWithSize(size: Expression) extends Window
case class TumbleWithSizeOnTime(size: Expression, timeField: Expression) extends Window
case class TumbleWithSizeOnTimeWithAlias(size: Expression, timeField: Expression, alias: Expression) extends Window

case class SlideWithSize(size: Expression) extends Window
case class SlideWithSizeAndSlide(size: Expression, slide: Expression) extends Window
case class SlideWithSizeAndSlideOnTime(size: Expression, slide: Expression, timeField: Expression) extends Window
case class SlideWithSizeAndSlideOnTimeWithAlias(size: Expression, slide: Expression, timeField: Expression, alias: Expression) extends Window

case class SessionWithGap(gap: Expression) extends Window
case class SessionWithGapOnTime(gap: Expression, timeField: Expression) extends Window
case class SessionWithGapOnTimeWithAlias(gap: Expression, timeField: Expression, alias: Expression) extends Window

// Over windows are not part of the sealed Window hierarchy.
case class OverWindow(partitionBy: Seq[Expression], orderBy: Expression, preceding: Expression, following: Expression)
case class OverWindowWithAlias(window: OverWindow, alias: Expression)

// Tables produced by applying windows.
class WindowedTable
class WindowGroupedTable
class OverWindowedTable

trait WindowProperty extends Expression
trait WatermarkStrategy
case class LatentConfig(allowedLateness: Time, sideOutputTag: OutputTag[Row])
```