0
# Windowing System
1
2
Complete windowing system for time-based and count-based data aggregation, supporting event time and processing time semantics with customizable triggers and evictors.
3
4
## Capabilities
5
6
### Window Assigners
7
8
Window assigners determine which windows elements belong to.
9
10
```java { .api }
11
/**
12
* Base window assigner class
13
* @param <T> Element type
14
* @param <W> Window type
15
*/
16
abstract class WindowAssigner<T, W extends Window> {
17
/**
18
* Assign windows to element
19
* @param element Element
20
* @param timestamp Element timestamp
21
* @param context Window assigner context
22
* @return Collection of windows
23
*/
24
public abstract Collection<W> assignWindows(T element, long timestamp, WindowAssignerContext context);
25
26
/**
27
* Get default trigger
28
* @param env Stream execution environment
29
* @return Default trigger
30
*/
31
public abstract Trigger<T, W> getDefaultTrigger(StreamExecutionEnvironment env);
32
33
/**
34
* Get window serializer
35
* @param executionConfig Execution configuration
36
* @return Window serializer
37
*/
38
public abstract TypeSerializer<W> getWindowSerializer(ExecutionConfig executionConfig);
39
}
40
41
/**
42
* Tumbling event time windows
43
*/
44
class TumblingEventTimeWindows extends WindowAssigner<Object, TimeWindow> {
45
/**
46
* Create tumbling windows with size
47
* @param size Window size
48
* @return Window assigner
49
*/
50
public static TumblingEventTimeWindows of(Time size);
51
52
/**
53
* Create tumbling windows with size and offset
54
* @param size Window size
55
* @param offset Window offset
56
* @return Window assigner
57
*/
58
public static TumblingEventTimeWindows of(Time size, Time offset);
59
}
60
61
/**
62
* Sliding event time windows
63
*/
64
class SlidingEventTimeWindows extends WindowAssigner<Object, TimeWindow> {
65
/**
66
* Create sliding windows
67
* @param size Window size
68
* @param slide Slide interval
69
* @return Window assigner
70
*/
71
public static SlidingEventTimeWindows of(Time size, Time slide);
72
73
/**
74
* Create sliding windows with offset
75
* @param size Window size
76
* @param slide Slide interval
77
* @param offset Window offset
78
* @return Window assigner
79
*/
80
public static SlidingEventTimeWindows of(Time size, Time slide, Time offset);
81
}
82
83
/**
84
* Event time session windows
85
*/
86
class EventTimeSessionWindows extends WindowAssigner<Object, TimeWindow> {
87
/**
88
* Create session windows with gap
89
* @param sessionTimeout Session timeout
90
* @return Window assigner
91
*/
92
public static EventTimeSessionWindows withGap(Time sessionTimeout);
93
94
/**
95
* Create dynamic session windows
96
* @param sessionWindowTimeGapExtractor Gap extractor function
97
* @param <T> Element type
98
* @return Window assigner
99
*/
100
public static <T> EventTimeSessionWindows withDynamicGap(SessionWindowTimeGapExtractor<T> sessionWindowTimeGapExtractor);
101
}
102
```
103
104
### Window Triggers
105
106
Triggers determine when window computation should be performed.
107
108
```java { .api }
109
/**
110
* Base trigger class
111
* @param <T> Element type
112
* @param <W> Window type
113
*/
114
abstract class Trigger<T, W extends Window> {
115
/**
116
* Called when element is added to window
117
* @param element Element
118
* @param timestamp Element timestamp
119
* @param window Window
120
* @param ctx Trigger context
121
* @return Trigger result
122
* @throws Exception
123
*/
124
public abstract TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception;
125
126
/**
127
* Called when processing time timer fires
128
* @param time Timer timestamp
129
* @param window Window
130
* @param ctx Trigger context
131
* @return Trigger result
132
* @throws Exception
133
*/
134
public abstract TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception;
135
136
/**
137
* Called when event time timer fires
138
* @param time Timer timestamp
139
* @param window Window
140
* @param ctx Trigger context
141
* @return Trigger result
142
* @throws Exception
143
*/
144
public abstract TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception;
145
146
/**
147
* Clear trigger state
148
* @param window Window
149
* @param ctx Trigger context
150
* @throws Exception
151
*/
152
public abstract void clear(W window, TriggerContext ctx) throws Exception;
153
}
154
155
/**
156
* Trigger result enumeration
157
*/
158
enum TriggerResult {
159
/** Continue without action */
160
CONTINUE,
161
/** Fire window computation */
162
FIRE,
163
/** Purge window contents */
164
PURGE,
165
/** Fire and purge */
166
FIRE_AND_PURGE
167
}
168
169
/**
170
* Continuous processing time trigger
171
*/
172
class ContinuousProcessingTimeTrigger<W extends Window> extends Trigger<Object, W> {
173
/**
174
* Create trigger with interval
175
* @param interval Trigger interval
176
* @param <W> Window type
177
* @return Trigger instance
178
*/
179
public static <W extends Window> ContinuousProcessingTimeTrigger<W> of(Time interval);
180
}
181
182
/**
183
* Delta trigger fires when element differs from last by threshold
184
* @param <T> Element type
185
* @param <W> Window type
186
*/
187
class DeltaTrigger<T, W extends Window> extends Trigger<T, W> {
188
/**
189
* Create delta trigger
190
* @param threshold Delta threshold
191
* @param deltaFunction Delta calculation function
192
* @param typeInfo Type information
193
* @param <T> Element type
194
* @param <W> Window type
195
* @return Trigger instance
196
*/
197
public static <T, W extends Window> DeltaTrigger<T, W> of(double threshold, DeltaFunction<T> deltaFunction, TypeInformation<T> typeInfo);
198
}
199
```
200
201
### Window Evictors
202
203
Evictors remove elements from windows before or after window computation.
204
205
```java { .api }
206
/**
207
* Base evictor interface
208
* @param <T> Element type
209
* @param <W> Window type
210
*/
211
interface Evictor<T, W extends Window> {
212
/**
213
* Evict elements before window function
214
* @param elements Window elements
215
* @param size Number of elements
216
* @param window Window
217
* @param evictorContext Evictor context
218
*/
219
void evictBefore(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);
220
221
/**
222
* Evict elements after window function
223
* @param elements Window elements
224
* @param size Number of elements
225
* @param window Window
226
* @param evictorContext Evictor context
227
*/
228
void evictAfter(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);
229
}
230
231
/**
232
* Time-based evictor
233
* @param <W> Window type
234
*/
235
class TimeEvictor<W extends Window> implements Evictor<Object, W> {
236
/**
237
* Create time evictor
238
* @param windowSize Window size to keep
239
* @return Time evictor
240
*/
241
public static <W extends Window> TimeEvictor<W> of(Time windowSize);
242
243
/**
244
* Create time evictor, choosing whether eviction runs after the window function
245
* @param windowSize Window size
246
* @param doEvictAfter Whether to evict after window function
247
* @return Time evictor
248
*/
249
public static <W extends Window> TimeEvictor<W> of(Time windowSize, boolean doEvictAfter);
250
}
251
252
/**
253
* Count-based evictor
254
* @param <W> Window type
255
*/
256
class CountEvictor<W extends Window> implements Evictor<Object, W> {
257
/**
258
* Create count evictor
259
* @param maxCount Maximum elements to keep
260
* @return Count evictor
261
*/
262
public static <W extends Window> CountEvictor<W> of(long maxCount);
263
264
/**
265
* Create count evictor, choosing whether eviction runs after the window function
266
* @param maxCount Maximum elements
267
* @param doEvictAfter Whether to evict after window function
268
* @return Count evictor
269
*/
270
public static <W extends Window> CountEvictor<W> of(long maxCount, boolean doEvictAfter);
271
}
272
273
/**
274
* Delta-based evictor
275
* @param <T> Element type
276
* @param <W> Window type
277
*/
278
class DeltaEvictor<T, W extends Window> implements Evictor<T, W> {
279
/**
280
* Create delta evictor
281
* @param threshold Delta threshold
282
* @param deltaFunction Delta function
283
* @return Delta evictor
284
*/
285
public static <T, W extends Window> DeltaEvictor<T, W> of(double threshold, DeltaFunction<T> deltaFunction);
286
}
287
```
288
289
### Window Functions
290
291
Functions for processing windowed data.
292
293
```java { .api }
294
/**
295
* Window function interface
296
* @param <IN> Input type
297
* @param <OUT> Output type
298
* @param <KEY> Key type
299
* @param <W> Window type
300
*/
301
interface WindowFunction<IN, OUT, KEY, W extends Window> {
302
/**
303
* Process window contents
304
* @param key Window key
305
* @param window Window
306
* @param input Window elements
307
* @param out Result collector
308
* @throws Exception
309
*/
310
void apply(KEY key, W window, Iterable<IN> input, Collector<OUT> out) throws Exception;
311
}
312
313
/**
314
* Process window function with context
315
* @param <IN> Input type
316
* @param <OUT> Output type
317
* @param <KEY> Key type
318
* @param <W> Window type
319
*/
320
abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> {
321
/**
322
* Process window with context
323
* @param key Window key
324
* @param context Process context
325
* @param elements Window elements
326
* @param out Result collector
327
* @throws Exception
328
*/
329
public abstract void process(KEY key, Context context, Iterable<IN> elements, Collector<OUT> out) throws Exception;
330
331
/**
332
* Process window context
333
*/
334
public abstract class Context {
335
/**
336
* Get current window
337
* @return Current window
338
*/
339
public abstract W window();
340
341
/**
342
* Get current processing time
343
* @return Processing time
344
*/
345
public abstract long currentProcessingTime();
346
347
/**
348
* Get current event-time watermark
349
* @return Current watermark
350
*/
351
public abstract long currentWatermark();
352
353
/**
354
* Get global state
355
* @param stateDescriptor State descriptor
356
* @param <S> State type
357
* @return Global state
358
*/
359
public abstract <S extends State> S globalState(StateDescriptor<S, ?> stateDescriptor);
360
361
/**
362
* Get window state
363
* @param stateDescriptor State descriptor
364
* @param <S> State type
365
* @return Window state
366
*/
367
public abstract <S extends State> S windowState(StateDescriptor<S, ?> stateDescriptor);
368
}
369
}
370
```
371
372
### Time Window
373
374
Time-based window implementation.
375
376
```java { .api }
377
/**
378
* Time window implementation
379
*/
380
class TimeWindow extends Window {
381
/**
382
* Create time window
383
* @param start Start timestamp
384
* @param end End timestamp
385
*/
386
public TimeWindow(long start, long end);
387
388
/**
389
* Get window start time
390
* @return Start timestamp
391
*/
392
public long getStart();
393
394
/**
395
* Get window end time
396
* @return End timestamp
397
*/
398
public long getEnd();
399
400
/**
401
* Get maximum timestamp in window
402
* @return Maximum timestamp
403
*/
404
public long maxTimestamp();
405
406
/**
407
* Check if this window intersects another window
408
* @param other Window to check against
409
* @return true if the two windows intersect
410
*/
411
public boolean intersects(TimeWindow other);
412
413
/**
414
* Get window center timestamp
415
* @return Center timestamp
416
*/
417
public long getCenter();
418
}
419
```
420
421
### Context Types
422
423
Context and utility types used by the windowing system.
424
425
```java { .api }
426
/**
427
* Window assigner context
428
*/
429
interface WindowAssignerContext {
430
/**
431
* Get current processing time
432
* @return Current processing time
433
*/
434
long getCurrentProcessingTime();
435
}
436
437
/**
438
* Trigger context interface
439
*/
440
interface TriggerContext {
441
/**
442
* Get current processing time
443
* @return Current processing time
444
*/
445
long getCurrentProcessingTime();
446
447
/**
448
* Get current watermark
449
* @return Current watermark
450
*/
451
long getCurrentWatermark();
452
453
/**
454
* Register processing time timer
455
* @param time Timer timestamp
456
*/
457
void registerProcessingTimeTimer(long time);
458
459
/**
460
* Register event time timer
461
* @param time Timer timestamp
462
*/
463
void registerEventTimeTimer(long time);
464
465
/**
466
* Delete processing time timer
467
* @param time Timer timestamp
468
*/
469
void deleteProcessingTimeTimer(long time);
470
471
/**
472
* Delete event time timer
473
* @param time Timer timestamp
474
*/
475
void deleteEventTimeTimer(long time);
476
477
/**
478
* Get partitioned state
479
* @param stateDescriptor State descriptor
480
* @param <S> State type
481
* @return Partitioned state
482
*/
483
<S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor);
484
}
485
486
/**
487
* Evictor context interface
488
*/
489
interface EvictorContext {
490
/**
491
* Get current processing time
492
* @return Current processing time
493
*/
494
long getCurrentProcessingTime();
495
496
/**
497
* Get current watermark
498
* @return Current watermark
499
*/
500
long getCurrentWatermark();
501
}
502
503
/**
504
* Timestamped value wrapper
505
* @param <T> Value type
506
*/
507
class TimestampedValue<T> {
508
/**
509
* Create timestamped value
510
* @param value Value
511
* @param timestamp Timestamp
512
*/
513
public TimestampedValue(T value, long timestamp);
514
515
/**
516
* Get value
517
* @return Value
518
*/
519
public T getValue();
520
521
/**
522
* Get timestamp
523
* @return Timestamp
524
*/
525
public long getTimestamp();
526
527
/**
528
* Check if has timestamp
529
* @return true if has timestamp
530
*/
531
public boolean hasTimestamp();
532
}
533
534
/**
535
* Delta function interface
536
* @param <DATA> Data type
537
*/
538
interface DeltaFunction<DATA> {
539
/**
540
* Calculate delta between two data points
541
* @param oldDataPoint Old data point
542
* @param newDataPoint New data point
543
* @return Delta value
544
*/
545
double getDelta(DATA oldDataPoint, DATA newDataPoint);
546
}
547
548
/**
549
* Session window time gap extractor
550
* @param <T> Element type
551
*/
552
interface SessionWindowTimeGapExtractor<T> {
553
/**
554
* Extract session timeout for element
555
* @param element Element
556
* @return Session timeout in milliseconds
557
*/
558
long extract(T element);
559
}
560
```
561
562
### Base Window Class
563
564
Base window class for all window types.
565
566
```java { .api }
567
/**
568
* Base window class
569
*/
570
abstract class Window {
571
/**
572
* Get maximum timestamp that belongs to this window
573
* @return Maximum timestamp
574
*/
575
public abstract long maxTimestamp();
576
}
577
```