# Window Operations

Window operations provide specialized functionality for reading and writing window state. The State Processor API supports both regular windows and evicting windows, and can read state produced by reduce, aggregate, or process window functions.

## Window Reader Overview

The `WindowReader` class provides entry points for reading window state from savepoints.

```java { .api }
public class WindowReader<W extends Window> {
10
public EvictingWindowReader<W> evictor();
11
12
public <T, K> DataSource<T> reduce(
13
String uid,
14
ReduceFunction<T> function,
15
TypeInformation<K> keyType,
16
TypeInformation<T> reduceType
17
) throws IOException;
18
19
public <K, T, OUT> DataSource<OUT> reduce(
20
String uid,
21
ReduceFunction<T> function,
22
WindowReaderFunction<T, OUT, K, W> readerFunction,
23
TypeInformation<K> keyType,
24
TypeInformation<T> reduceType,
25
TypeInformation<OUT> outputType
26
) throws IOException;
27
28
public <K, T, ACC, R> DataSource<R> aggregate(
29
String uid,
30
AggregateFunction<T, ACC, R> aggregateFunction,
31
TypeInformation<K> keyType,
32
TypeInformation<ACC> accType,
33
TypeInformation<R> outputType
34
) throws IOException;
35
36
public <K, T, ACC, R, OUT> DataSource<OUT> aggregate(
37
String uid,
38
AggregateFunction<T, ACC, R> aggregateFunction,
39
WindowReaderFunction<R, OUT, K, W> readerFunction,
40
TypeInformation<K> keyType,
41
TypeInformation<ACC> accType,
42
TypeInformation<OUT> outputType
43
) throws IOException;
44
45
public <K, T, OUT> DataSource<OUT> process(
46
String uid,
47
WindowReaderFunction<T, OUT, K, W> readerFunction,
48
TypeInformation<K> keyType,
49
TypeInformation<T> stateType,
50
TypeInformation<OUT> outputType
51
) throws IOException;
52
}
```

## Creating Window Readers

### Using Window Assigners

```java
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
61
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
62
import org.apache.flink.streaming.api.windowing.assigners.SessionWindows;
63
64
// Create window reader with tumbling windows
65
WindowReader<TimeWindow> tumblingReader = savepoint.window(
66
TumblingEventTimeWindows.of(Duration.ofMinutes(5))
67
);
68
69
// Create window reader with sliding windows
70
WindowReader<TimeWindow> slidingReader = savepoint.window(
71
SlidingEventTimeWindows.of(Duration.ofMinutes(10), Duration.ofMinutes(2))
72
);
73
74
// Create window reader with session windows
75
WindowReader<TimeWindow> sessionReader = savepoint.window(
76
SessionWindows.withGap(Duration.ofMinutes(30))
77
);
```

### Using Window Serializers

```java
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
84
85
// Create window reader with explicit serializer
86
TypeSerializer<TimeWindow> windowSerializer = new TimeWindow.Serializer();
87
WindowReader<TimeWindow> reader = savepoint.window(windowSerializer);
```

## Reading Reduce Windows

### Simple Reduce Reading

```java
// Read window state created with ReduceFunction
96
DataSource<Integer> windowSums = reader.reduce(
97
"sum-window-operator",
98
new SumReduceFunction(), // The original reduce function
99
Types.STRING, // Key type
100
Types.INT // Value type
101
);
102
103
windowSums.print();
```

### Reduce with Window Reader Function

```java
public class WindowSummaryReader implements WindowReaderFunction<Integer, WindowSummary, String, TimeWindow> {
110
111
@Override
112
public void readWindow(
113
String key,
114
Context context,
115
Iterable<Integer> elements,
116
Collector<WindowSummary> out
117
) throws Exception {
118
119
TimeWindow window = context.window();
120
121
// Process reduced result (should be single element for reduce)
122
Integer sum = elements.iterator().next();
123
124
WindowSummary summary = new WindowSummary(
125
key,
126
window.getStart(),
127
window.getEnd(),
128
sum,
129
1 // Reduce produces single value
130
);
131
132
out.collect(summary);
133
}
134
}
135
136
// Use with reduce reader
137
DataSource<WindowSummary> summaries = reader.reduce(
138
"sum-window-operator",
139
new SumReduceFunction(),
140
new WindowSummaryReader(),
141
Types.STRING, // Key type
142
Types.INT, // Reduce type
143
TypeInformation.of(WindowSummary.class) // Output type
144
);
```

## Reading Aggregate Windows

### Simple Aggregate Reading

```java
public class AverageAggregateFunction implements AggregateFunction<Double, AverageAccumulator, Double> {
153
154
@Override
155
public AverageAccumulator createAccumulator() {
156
return new AverageAccumulator();
157
}
158
159
@Override
160
public AverageAccumulator add(Double value, AverageAccumulator accumulator) {
161
accumulator.sum += value;
162
accumulator.count++;
163
return accumulator;
164
}
165
166
@Override
167
public Double getResult(AverageAccumulator accumulator) {
168
return accumulator.count == 0 ? 0.0 : accumulator.sum / accumulator.count;
169
}
170
171
@Override
172
public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {
173
a.sum += b.sum;
174
a.count += b.count;
175
return a;
176
}
177
}
178
179
// Read aggregated window results
180
DataSource<Double> windowAverages = reader.aggregate(
181
"average-window-operator",
182
new AverageAggregateFunction(),
183
Types.STRING, // Key type
184
TypeInformation.of(AverageAccumulator.class), // Accumulator type
185
Types.DOUBLE // Result type
186
);
```

### Aggregate with Window Reader Function

```java
public class AggregateWindowReader implements WindowReaderFunction<Double, AggregateResult, String, TimeWindow> {
193
194
@Override
195
public void readWindow(
196
String key,
197
Context context,
198
Iterable<Double> elements,
199
Collector<AggregateResult> out
200
) throws Exception {
201
202
TimeWindow window = context.window();
203
Double average = elements.iterator().next(); // Single aggregated result
204
205
AggregateResult result = new AggregateResult(
206
key,
207
window.getStart(),
208
window.getEnd(),
209
average,
210
"AVERAGE"
211
);
212
213
out.collect(result);
214
}
215
}
216
217
// Use with aggregate reader
218
DataSource<AggregateResult> results = reader.aggregate(
219
"average-window-operator",
220
new AverageAggregateFunction(),
221
new AggregateWindowReader(),
222
Types.STRING, // Key type
223
TypeInformation.of(AverageAccumulator.class), // Accumulator type
224
TypeInformation.of(AggregateResult.class) // Output type
225
);
```

## Reading Process Windows

Process windows store their raw contents without pre-aggregation, so the reader function receives every element buffered in the window.

```java
public class ProcessWindowReader implements WindowReaderFunction<SensorReading, WindowAnalysis, String, TimeWindow> {
234
235
@Override
236
public void readWindow(
237
String sensorId,
238
Context context,
239
Iterable<SensorReading> readings,
240
Collector<WindowAnalysis> out
241
) throws Exception {
242
243
TimeWindow window = context.window();
244
List<SensorReading> readingList = new ArrayList<>();
245
readings.forEach(readingList::add);
246
247
if (!readingList.isEmpty()) {
248
// Analyze raw window contents
249
double min = readingList.stream().mapToDouble(SensorReading::getValue).min().orElse(0.0);
250
double max = readingList.stream().mapToDouble(SensorReading::getValue).max().orElse(0.0);
251
double avg = readingList.stream().mapToDouble(SensorReading::getValue).average().orElse(0.0);
252
253
WindowAnalysis analysis = new WindowAnalysis(
254
sensorId,
255
window.getStart(),
256
window.getEnd(),
257
readingList.size(),
258
min,
259
max,
260
avg,
261
calculateTrend(readingList)
262
);
263
264
out.collect(analysis);
265
}
266
}
267
268
private String calculateTrend(List<SensorReading> readings) {
269
if (readings.size() < 2) return "STABLE";
270
271
double first = readings.get(0).getValue();
272
double last = readings.get(readings.size() - 1).getValue();
273
274
if (last > first * 1.1) return "INCREASING";
275
if (last < first * 0.9) return "DECREASING";
276
return "STABLE";
277
}
278
}
279
280
// Read process window state
281
DataSource<WindowAnalysis> analyses = reader.process(
282
"sensor-window-operator",
283
new ProcessWindowReader(),
284
Types.STRING, // Key type (sensor ID)
285
TypeInformation.of(SensorReading.class), // Element type
286
TypeInformation.of(WindowAnalysis.class) // Output type
287
);
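```

## Evicting Window Operations

Evicting windows use an evictor to remove elements from a window before or after the window function is applied. Because the evictor must see individual elements, evicting window operators keep every element in state instead of a pre-aggregated value. Obtain a reader for them with `WindowReader#evictor()`.

```java { .api }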
public class EvictingWindowReader<W extends Window> {
296
public <T, K> DataSource<T> reduce(
297
String uid,
298
ReduceFunction<T> function,
299
TypeInformation<K> keyType,
300
TypeInformation<T> reduceType
301
) throws IOException;
302
303
public <K, T, OUT> DataSource<OUT> reduce(
304
String uid,
305
ReduceFunction<T> function,
306
WindowReaderFunction<Iterable<T>, OUT, K, W> readerFunction,
307
TypeInformation<K> keyType,
308
TypeInformation<T> reduceType,
309
TypeInformation<OUT> outputType
310
) throws IOException;
311
312
public <K, T, ACC, R> DataSource<R> aggregate(
313
String uid,
314
AggregateFunction<T, ACC, R> aggregateFunction,
315
TypeInformation<K> keyType,
316
TypeInformation<ACC> accType,
317
TypeInformation<R> outputType
318
) throws IOException;
319
320
public <K, T, ACC, R, OUT> DataSource<OUT> aggregate(
321
String uid,
322
AggregateFunction<T, ACC, R> aggregateFunction,
323
WindowReaderFunction<Iterable<R>, OUT, K, W> readerFunction,
324
TypeInformation<K> keyType,
325
TypeInformation<ACC> accType,
326
TypeInformation<OUT> outputType
327
) throws IOException;
328
329
public <K, T, OUT> DataSource<OUT> process(
330
String uid,
331
WindowReaderFunction<Iterable<T>, OUT, K, W> readerFunction,
332
TypeInformation<K> keyType,
333
TypeInformation<T> stateType,
334
TypeInformation<OUT> outputType
335
) throws IOException;
336
}
```

### Reading Evicting Windows

```java
// Get evicting window reader
343
EvictingWindowReader<TimeWindow> evictingReader = reader.evictor();
344
345
// Read evicting window with process function
346
public class EvictingProcessReader implements WindowReaderFunction<Iterable<Event>, EventSummary, String, TimeWindow> {
347
348
@Override
349
public void readWindow(
350
String key,
351
Context context,
352
Iterable<Iterable<Event>> elements, // Note: Iterable of Iterable for evicting windows
353
Collector<EventSummary> out
354
) throws Exception {
355
356
TimeWindow window = context.window();
357
List<Event> allEvents = new ArrayList<>();
358
359
// Flatten the nested iterables
360
for (Iterable<Event> eventGroup : elements) {
361
eventGroup.forEach(allEvents::add);
362
}
363
364
if (!allEvents.isEmpty()) {
365
EventSummary summary = new EventSummary(
366
key,
367
window.getStart(),
368
window.getEnd(),
369
allEvents.size(),
370
allEvents
371
);
372
373
out.collect(summary);
374
}
375
}
376
}
377
378
DataSource<EventSummary> evictingSummaries = evictingReader.process(
379
"evicting-window-operator",
380
new EvictingProcessReader(),
381
Types.STRING, // Key type
382
TypeInformation.of(Event.class), // Element type
383
TypeInformation.of(EventSummary.class) // Output type
384
);
```

## Window Bootstrap Operations

While the main focus is on reading windows, you can also bootstrap window state.

### Creating Window Bootstrap Transformation

```java
public class WindowBootstrapFunction extends KeyedStateBootstrapFunction<String, TimestampedEvent> {
395
private WindowState<TimeWindow> windowState;
396
397
@Override
398
public void open(Configuration parameters) throws Exception {
399
super.open(parameters);
400
401
// Register window state
402
WindowStateDescriptor<TimeWindow> windowDesc = new WindowStateDescriptor<>(
403
"window", new TimeWindow.Serializer()
404
);
405
windowState = getRuntimeContext().getWindowState(windowDesc);
406
}
407
408
@Override
409
public void processElement(TimestampedEvent event, Context ctx) throws Exception {
410
// Assign to window based on timestamp
411
long timestamp = event.getTimestamp();
412
long windowStart = timestamp - (timestamp % Duration.ofMinutes(5).toMillis());
413
long windowEnd = windowStart + Duration.ofMinutes(5).toMillis();
414
415
TimeWindow window = new TimeWindow(windowStart, windowEnd);
416
417
// Add event to window
418
windowState.add(window, event);
419
420
// Set timer for window end
421
ctx.timerService().registerEventTimeTimer(windowEnd);
422
}
423
}
```

## Complex Window Analysis

### Multi-Window Analysis

```java
public class MultiWindowAnalyzer implements WindowReaderFunction<MetricEvent, MultiWindowResult, String, TimeWindow> {
432
433
@Override
434
public void readWindow(
435
String metricName,
436
Context context,
437
Iterable<MetricEvent> events,
438
Collector<MultiWindowResult> out
439
) throws Exception {
440
441
TimeWindow window = context.window();
442
List<MetricEvent> eventList = new ArrayList<>();
443
events.forEach(eventList::add);
444
445
if (eventList.isEmpty()) return;
446
447
// Calculate multiple statistics
448
DoubleSummaryStatistics stats = eventList.stream()
449
.mapToDouble(MetricEvent::getValue)
450
.summaryStatistics();
451
452
// Calculate percentiles
453
List<Double> sortedValues = eventList.stream()
454
.mapToDouble(MetricEvent::getValue)
455
.sorted()
456
.boxed()
457
.collect(Collectors.toList());
458
459
double p50 = calculatePercentile(sortedValues, 0.5);
460
double p95 = calculatePercentile(sortedValues, 0.95);
461
double p99 = calculatePercentile(sortedValues, 0.99);
462
463
// Detect anomalies
464
List<MetricEvent> anomalies = detectAnomalies(eventList, stats.getAverage(), Math.sqrt(stats.getAverage()));
465
466
MultiWindowResult result = new MultiWindowResult(
467
metricName,
468
window.getStart(),
469
window.getEnd(),
470
eventList.size(),
471
stats.getMin(),
472
stats.getMax(),
473
stats.getAverage(),
474
p50, p95, p99,
475
anomalies
476
);
477
478
out.collect(result);
479
}
480
481
private double calculatePercentile(List<Double> sortedValues, double percentile) {
482
int index = (int) Math.ceil(percentile * sortedValues.size()) - 1;
483
return sortedValues.get(Math.max(0, Math.min(index, sortedValues.size() - 1)));
484
}
485
486
private List<MetricEvent> detectAnomalies(List<MetricEvent> events, double mean, double stdDev) {
487
double threshold = 2.0 * stdDev;
488
return events.stream()
489
.filter(event -> Math.abs(event.getValue() - mean) > threshold)
490
.collect(Collectors.toList());
491
}
492
}
```

## Error Handling in Window Operations

### Robust Window Reading

```java
public class RobustWindowReader implements WindowReaderFunction<DataPoint, WindowResult, String, TimeWindow> {
501
private static final Logger LOG = LoggerFactory.getLogger(RobustWindowReader.class);
502
503
@Override
504
public void readWindow(
505
String key,
506
Context context,
507
Iterable<DataPoint> elements,
508
Collector<WindowResult> out
509
) throws Exception {
510
511
try {
512
TimeWindow window = context.window();
513
List<DataPoint> points = new ArrayList<>();
514
515
// Safely iterate over elements
516
for (DataPoint point : elements) {
517
if (point != null && isValidDataPoint(point)) {
518
points.add(point);
519
} else {
520
LOG.warn("Invalid data point in window for key: {}", key);
521
}
522
}
523
524
if (!points.isEmpty()) {
525
WindowResult result = processDataPoints(key, window, points);
526
out.collect(result);
527
} else {
528
LOG.debug("No valid data points in window for key: {}", key);
529
}
530
531
} catch (Exception e) {
532
LOG.error("Error processing window for key: {}", key, e);
533
// Could emit error result instead of failing
534
// out.collect(new WindowResult(key, context.window(), "ERROR", e.getMessage()));
535
throw e; // Re-throw to fail the job
536
}
537
}
538
539
private boolean isValidDataPoint(DataPoint point) {
540
return point.getValue() >= 0 && point.getTimestamp() > 0;
541
}
542
543
private WindowResult processDataPoints(String key, TimeWindow window, List<DataPoint> points) {
544
// Safe processing logic
545
double average = points.stream().mapToDouble(DataPoint::getValue).average().orElse(0.0);
546
return new WindowResult(key, window, "SUCCESS", average);
547
}
548
}
```

### Window Type Safety

```java
// Ensure proper type handling for different window types
555
public class TypeSafeWindowReader<W extends Window> implements WindowReaderFunction<TypedEvent, TypedResult, String, W> {
556
private final Class<W> windowClass;
557
558
public TypeSafeWindowReader(Class<W> windowClass) {
559
this.windowClass = windowClass;
560
}
561
562
@Override
563
public void readWindow(
564
String key,
565
Context context,
566
Iterable<TypedEvent> elements,
567
Collector<TypedResult> out
568
) throws Exception {
569
570
W window = context.window();
571
572
// Type-safe window handling
573
if (windowClass.isInstance(window)) {
574
TypedResult result = processTypedWindow(key, windowClass.cast(window), elements);
575
out.collect(result);
576
} else {
577
throw new IllegalArgumentException("Unexpected window type: " + window.getClass());
578
}
579
}
580
581
private TypedResult processTypedWindow(String key, W window, Iterable<TypedEvent> elements) {
582
// Type-specific processing
583
return new TypedResult(key, window.toString(), elements);
584
}
585
}
```

## Performance Optimization

### Efficient Window Processing

```java
public class OptimizedWindowReader implements WindowReaderFunction<LargeEvent, CompactResult, String, TimeWindow> {
594
595
@Override
596
public void readWindow(
597
String key,
598
Context context,
599
Iterable<LargeEvent> elements,
600
Collector<CompactResult> out
601
) throws Exception {
602
603
TimeWindow window = context.window();
604
605
// Process in streaming fashion to avoid loading all into memory
606
double sum = 0.0;
607
int count = 0;
608
double min = Double.MAX_VALUE;
609
double max = Double.MIN_VALUE;
610
611
for (LargeEvent event : elements) {
612
double value = event.getValue();
613
sum += value;
614
count++;
615
min = Math.min(min, value);
616
max = Math.max(max, value);
617
}
618
619
if (count > 0) {
620
CompactResult result = new CompactResult(
621
key,
622
window.getStart(),
623
window.getEnd(),
624
count,
625
sum / count,
626
min,
627
max
628
);
629
630
out.collect(result);
631
}
632
}
633
}