# Windowing Examples

Advanced windowing patterns from the Flink streaming examples: time windows, session windows, and global windows with custom triggers, using both event-time and processing-time semantics. The examples cover the main window types, eviction policies, and trigger mechanisms.

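## Capabilities

### TopSpeedWindowing

Car speed monitoring with a custom trigger and evictor. Each car's readings go into a global window, and a delta trigger emits the car's top speed every time the car has traveled a set distance, considering only readings from a recent time span.
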
```java { .api }
/**
 * Grouped stream windowing with custom eviction and trigger policies.
 * Monitors car speeds and emits each car's top speed every x meters traveled.
 * @param args Command line arguments (--input path, --output path)
 */
public class TopSpeedWindowing {
    public static void main(String[] args) throws Exception;
}
```

**Usage Example:**

```bash
# Run with the built-in sample car data generator
java -cp flink-examples-streaming_2.10-1.3.3.jar \
  org.apache.flink.streaming.examples.windowing.TopSpeedWindowing

# Run with file input
java -cp flink-examples-streaming_2.10-1.3.3.jar \
  org.apache.flink.streaming.examples.windowing.TopSpeedWindowing \
  --input /path/to/car-data.txt --output /path/to/results.txt
```

### SessionWindowing

Session-based windowing for user activity analysis with configurable session gaps.

```java { .api }
/**
 * Session windowing example for analyzing user activity sessions.
 * Groups events into sessions based on activity gaps.
 * @param args Command line arguments (--input path, --output path)
 */
public class SessionWindowing {
    public static void main(String[] args) throws Exception;
}
```

### WindowWordCount

Basic windowed word count over sliding count windows (`countWindow(windowSize, slideSize)`). For each word, a window covers the most recent `windowSize` occurrences and fires after every `slideSize` new occurrences.

```java { .api }
/**
 * Word count with sliding count windows.
 * Demonstrates basic windowing concepts on a keyed stream.
 * @param args Command line arguments (--input path, --output path)
 */
public class WindowWordCount {
    public static void main(String[] args) throws Exception;
}
```

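`WindowWordCount` counts words over sliding count windows (`countWindow(windowSize, slideSize)`). Per key, such a window holds at most the last `windowSize` elements and fires after every `slideSize` new ones. The plain-Java sketch below reproduces that behavior for a single word's stream of `1` counts, so the emitted sums can be inspected without a Flink cluster. It is a simulation under that assumption, not Flink's implementation, and the class and method names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical simulation of a per-key sliding count window (not Flink code).
final class CountWindowSketch {

    /** Sums of the last windowSize values, emitted after every slideSize arrivals. */
    static List<Integer> slidingCountSums(List<Integer> values, int windowSize, int slideSize) {
        Deque<Integer> buffer = new ArrayDeque<>();
        List<Integer> results = new ArrayList<>();
        int sinceLastFire = 0;
        for (int v : values) {
            buffer.addLast(v);
            sinceLastFire++;
            if (sinceLastFire == slideSize) {
                // Evict the oldest elements so that at most windowSize remain, then emit the sum
                while (buffer.size() > windowSize) {
                    buffer.removeFirst();
                }
                int sum = 0;
                for (int x : buffer) {
                    sum += x;
                }
                results.add(sum);
                sinceLastFire = 0;
            }
        }
        return results;
    }

    public static void main(String[] args) {
        // 20 occurrences of one word, each counted as 1
        List<Integer> ones = new ArrayList<>();
        for (int i = 0; i < 20; i++) {
            ones.add(1);
        }
        System.out.println(slidingCountSums(ones, 10, 5)); // [5, 10, 10, 10]
    }
}
```

The first firing sees only 5 elements. After that, every firing sees the full 10, because older elements are evicted before each sum.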
### GroupedProcessingTimeWindowExample

High-throughput processing-time windows with sliding window patterns and parallel data generation.

```java { .api }
/**
 * Processing-time sliding windows over a keyed stream.
 * Performance benchmark with 20M elements across 10K keys.
 * @param args Command line arguments
 */
public class GroupedProcessingTimeWindowExample {
    public static void main(String[] args) throws Exception;
}
```

**Usage Example:**

```bash
# Run the high-throughput windowing benchmark
java -cp flink-examples-streaming_2.10-1.3.3.jar \
  org.apache.flink.streaming.examples.windowing.GroupedProcessingTimeWindowExample
```

### High-Throughput Data Source

A custom parallel source for performance testing, defined as an anonymous `RichParallelSourceFunction<Tuple2<Long, Long>>` subclass and passed to `env.addSource(...)`.

```java { .api }
/**
 * High-throughput parallel source generating Tuple2<Long, Long> data.
 * Each parallel instance emits up to 20,000,000 elements.
 */
new RichParallelSourceFunction<Tuple2<Long, Long>>() {
    private volatile boolean running = true;

    /**
     * Main data generation loop; stops when the element limit is reached or cancel() is called.
     * @param ctx Source context for element emission
     */
    public void run(SourceContext<Tuple2<Long, Long>> ctx) throws Exception;

    /**
     * Cancels source execution by clearing the running flag.
     */
    public void cancel();
}
```

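The generation loop can be modeled without Flink. This plain-Java sketch assumes that keys cycle through `1..numKeys` and that every element carries the value `1`, which is consistent with the "10K keys" benchmark description and the summing functions below but is not taken from the example's code. A `BiConsumer` stands in for `SourceContext.collect`, and the class name is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical stand-in for the parallel source's run()/cancel() pair (not Flink code).
final class KeyCyclingGenerator {
    private volatile boolean running = true;

    /** Emits up to numElements (key, 1) pairs, cycling keys 1..numKeys, until cancelled. */
    void run(long numElements, long numKeys, BiConsumer<Long, Long> collect) {
        long key = 1L;
        long count = 0L;
        while (running && count < numElements) {
            count++;
            collect.accept(key++, 1L);
            if (key > numKeys) {
                key = 1L; // wrap around to the first key
            }
        }
    }

    /** Called from another thread to stop the loop; volatile makes the change visible to run(). */
    void cancel() {
        running = false;
    }

    public static void main(String[] args) {
        Map<Long, Long> perKey = new HashMap<>();
        new KeyCyclingGenerator().run(25, 10, (k, v) -> perKey.merge(k, v, Long::sum));
        // keys 1-5 were emitted 3 times, keys 6-10 twice
        System.out.println(perKey);
    }
}
```

The `volatile` flag is the usual way to let `cancel()`, which runs on a different thread, stop a busy `run()` loop promptly.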
### Key Selector Function

Extracts keys from tuple types for grouping operations.

```java { .api }
/**
 * Generic key extractor for tuple types.
 * @param <Type> Tuple type extending Tuple
 * @param <Key> Key type for grouping
 */
public static class FirstFieldKeyExtractor<Type extends Tuple, Key>
        implements KeySelector<Type, Key> {

    /**
     * Extracts the key from the tuple's first field.
     * @param value Input tuple
     * @return Key for grouping (first field of the tuple)
     */
    public Key getKey(Type value);
}
```

### Window Functions

#### SummingReducer

Pre-aggregating reduce function. Each window keeps a single running sum per key instead of buffering every element.

```java { .api }
/**
 * Efficient reduce function for summing tuple values.
 * Pre-aggregates values incrementally as they arrive in the window.
 */
public static class SummingReducer implements ReduceFunction<Tuple2<Long, Long>> {
    /**
     * Combines two tuples by summing their second field.
     * @param value1 First tuple
     * @param value2 Second tuple
     * @return Combined tuple with the summed second field
     */
    public Tuple2<Long, Long> reduce(Tuple2<Long, Long> value1, Tuple2<Long, Long> value2);
}
```

#### SummingWindowFunction

Non-pre-aggregating window function for custom aggregation logic. The window buffers all of its elements and passes them to `apply()` when it fires.

```java { .api }
/**
 * Window function that processes all elements at window trigger time.
 * Demonstrates the non-pre-aggregating pattern, in contrast to a reduce function.
 */
public static class SummingWindowFunction
        implements WindowFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Long, Window> {

    /**
     * Processes all window elements when the window triggers.
     * @param key Window key
     * @param window Window metadata
     * @param values All elements in the window
     * @param out Output collector
     */
    public void apply(Long key, Window window,
                      Iterable<Tuple2<Long, Long>> values,
                      Collector<Tuple2<Long, Long>> out);
}
```

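The difference between `SummingReducer` and `SummingWindowFunction` comes down to window state. The plain-Java sketch below is a simplified model, not Flink's window operator. The pre-aggregating path folds each arrival into one running value, as a `ReduceFunction` allows. The buffering path stores every element until the window fires and then sums them, as a plain `WindowFunction` requires. Class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of per-window state for one key (not Flink code).
final class AggregationStyles {

    /** Pre-aggregating: state is a single running sum, updated on every arrival. */
    static final class ReducingWindow {
        private Long sum; // null until the first element arrives

        void add(long value) {
            sum = (sum == null) ? value : sum + value; // like SummingReducer.reduce
        }

        long fire() {
            return sum == null ? 0L : sum;
        }

        int storedElements() {
            return sum == null ? 0 : 1;
        }
    }

    /** Non-pre-aggregating: state is every element, summed only when the window fires. */
    static final class BufferingWindow {
        private final List<Long> buffer = new ArrayList<>();

        void add(long value) {
            buffer.add(value);
        }

        long fire() {
            long total = 0L; // like a window function iterating over all buffered values
            for (long v : buffer) {
                total += v;
            }
            return total;
        }

        int storedElements() {
            return buffer.size();
        }
    }

    public static void main(String[] args) {
        ReducingWindow reducing = new ReducingWindow();
        BufferingWindow buffering = new BufferingWindow();
        for (long i = 0; i < 1000; i++) {
            reducing.add(1L);
            buffering.add(1L);
        }
        // Same result, very different amounts of stored state
        System.out.println(reducing.fire() + " vs " + buffering.fire());                     // 1000 vs 1000
        System.out.println(reducing.storedElements() + " vs " + buffering.storedElements()); // 1 vs 1000
    }
}
```

This is why the reduce variant scales better for large windows. The window-function variant is only needed when the logic must see all elements together (or the window metadata) at firing time.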
### TopSpeedWindowing (Scala)

Scala implementation of car speed windowing using the functional API and case classes.

```scala { .api }
/**
 * Scala version of car top speed windowing
 * @param args Command line arguments
 */
object TopSpeedWindowing {
  def main(args: Array[String]): Unit
}
```

## Key Window Patterns

### Global Windows with Custom Triggers

```java
// Event time must be enabled so that the timestamps assigned by CarTimestamp drive windowing
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

int evictionSec = 10;       // example value: keep only the last 10 seconds of readings
double triggerMeters = 50;  // example value: fire every 50 meters traveled

// Global windows with time evictor and delta trigger
DataStream<Tuple4<Integer, Integer, Double, Long>> topSpeeds = carData
        .assignTimestampsAndWatermarks(new CarTimestamp()) // timestamps taken from field f3
        .keyBy(0) // group by car ID
        .window(GlobalWindows.create())
        .evictor(TimeEvictor.of(Time.of(evictionSec, TimeUnit.SECONDS)))
        .trigger(DeltaTrigger.of(triggerMeters,
                new DeltaFunction<Tuple4<Integer, Integer, Double, Long>>() {
                    @Override
                    public double getDelta(
                            Tuple4<Integer, Integer, Double, Long> oldDataPoint,
                            Tuple4<Integer, Integer, Double, Long> newDataPoint) {
                        return newDataPoint.f2 - oldDataPoint.f2; // distance delta
                    }
                }, carData.getType().createSerializer(env.getConfig())))
        .maxBy(1); // emit the reading with the highest speed (field f1)
```

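To see when such a pipeline emits results, the plain-Java sketch below replays one car's readings. It is a simplified model, not Flink's `DeltaTrigger` or `TimeEvictor`. It assumes that the trigger compares each reading with the last reading at which it fired (initially the first reading), that eviction drops readings not newer than the latest timestamp minus the span, and that timestamps are ascending. On each firing it reports the maximum speed among the remaining readings, in the spirit of `maxBy(1)`. The `Reading` class and all method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical single-car replay of global window + time eviction + delta trigger + max speed (not Flink code).
final class TopSpeedSketch {

    static final class Reading {
        final int speed;        // km/h (f1)
        final double distance;  // meters (f2)
        final long timestamp;   // milliseconds (f3)

        Reading(int speed, double distance, long timestamp) {
            this.speed = speed;
            this.distance = distance;
            this.timestamp = timestamp;
        }
    }

    /** Returns the top speed emitted at each firing. */
    static List<Integer> replay(List<Reading> readings, double triggerMeters, long evictionMillis) {
        List<Reading> window = new ArrayList<>();
        List<Integer> emitted = new ArrayList<>();
        Reading lastFired = null; // reference point for the distance delta
        for (Reading r : readings) {
            window.add(r);
            if (lastFired == null) {
                lastFired = r; // the first reading only sets the reference point
                continue;
            }
            if (r.distance - lastFired.distance > triggerMeters) {
                lastFired = r;
                // Time eviction: with ascending timestamps, r carries the latest timestamp
                long cutoff = r.timestamp - evictionMillis;
                window.removeIf(x -> x.timestamp <= cutoff);
                int top = 0;
                for (Reading x : window) {
                    top = Math.max(top, x.speed);
                }
                emitted.add(top);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Reading> readings = new ArrayList<>();
        readings.add(new Reading(50, 0, 0));
        readings.add(new Reading(60, 20, 1_000));
        readings.add(new Reading(70, 45, 2_000));
        readings.add(new Reading(40, 60, 3_000));
        readings.add(new Reading(30, 75, 4_000));
        readings.add(new Reading(80, 120, 5_000));
        // Fire every 50 m, considering only the last 2.5 s of readings
        System.out.println(replay(readings, 50, 2_500)); // [70, 80]
    }
}
```

The second firing reports 80 rather than 70 because the 60 and 70 km/h readings are evicted by then, so the window never holds more than the recent time span.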
### Session Windows

```java
// Session windows with a configurable inactivity gap (processing time).
// keySelector and sessionWindowFunction are placeholders for your own KeySelector and WindowFunction.
dataStream
        .keyBy(keySelector)
        .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
        .apply(sessionWindowFunction);
```

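Session windows can be understood as window merging. Each element first gets its own window `[timestamp, timestamp + gap)`, and overlapping windows are merged into one session. The plain-Java sketch below applies that rule to one key's timestamps. It is a simplified model of the idea, not Flink's merging window assigner; it also merges windows that merely touch, and the names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of session windows for a single key (not Flink code).
final class SessionSketch {

    /** Returns sessions as {start, end} pairs; each element contributes the window [ts, ts + gap). */
    static List<long[]> sessions(long[] timestamps, long gapMillis) {
        long[] sorted = timestamps.clone();
        Arrays.sort(sorted);
        List<long[]> result = new ArrayList<>();
        long[] current = null;
        for (long ts : sorted) {
            long end = ts + gapMillis;
            if (current != null && ts <= current[1]) {
                current[1] = Math.max(current[1], end); // overlaps the open session: extend it
            } else {
                current = new long[] {ts, end};          // gap exceeded: start a new session
                result.add(current);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        long[] clicks = {0, 2 * minute, 5 * minute, 30 * minute, 31 * minute};
        for (long[] s : sessions(clicks, 10 * minute)) {
            System.out.println("session [" + s[0] + ", " + s[1] + ")");
        }
    }
}
```

With a 10-minute gap, the clicks at 0, 2, and 5 minutes form one session `[0, 900000)`, and the clicks at 30 and 31 minutes form a second session `[1800000, 2460000)`.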
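### Sliding Time Windows

```java
// Sliding windows with size and slide interval (processing time, the default time characteristic).
// Requires: import static java.util.concurrent.TimeUnit.MILLISECONDS;
// keySelector is a placeholder, e.g. new FirstFieldKeyExtractor<Tuple2<Long, Long>, Long>()
dataStream
        .keyBy(keySelector)
        .timeWindow(Time.of(2500, MILLISECONDS), Time.of(500, MILLISECONDS)) // 2.5 s window, 0.5 s slide
        .reduce(new SummingReducer());

// Alternative with apply function (non-pre-aggregating).
// apply() expects the assigner's window type (TimeWindow here), so SummingWindowFunction
// must be declared with TimeWindow instead of Window for this to compile.
dataStream
        .keyBy(keySelector)
        .timeWindow(Time.milliseconds(2500), Time.milliseconds(500))
        .apply(new SummingWindowFunction());
```
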
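With a 2.5 s size and a 0.5 s slide, every element belongs to 2500 / 500 = 5 overlapping windows. The plain-Java sketch below computes those windows for one timestamp. It assumes windows aligned to the epoch (no offset) and half-open `[start, end)` intervals. It follows the usual sliding-window arithmetic but is not Flink's assigner code, and the names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sliding-window assignment for epoch-aligned windows (not Flink code).
final class SlidingWindowSketch {

    /** Returns the {start, end} of every window of the given size and slide that contains ts. */
    static List<long[]> assign(long ts, long size, long slide) {
        List<long[]> windows = new ArrayList<>();
        long lastStart = ts - Math.floorMod(ts, slide); // latest window start at or before ts
        for (long start = lastStart; start > ts - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }

    public static void main(String[] args) {
        for (long[] w : assign(1_200L, 2_500L, 500L)) {
            System.out.println("[" + w[0] + ", " + w[1] + ")");
        }
    }
}
```

For timestamp 1200, this yields five windows starting at 1000, 500, 0, -500, and -1000. Windows with negative starts simply cover time before the epoch and appear only for very early timestamps.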
### Tumbling Time Windows

```java
// Fixed-size, non-overlapping 30-second windows.
// keySelector and aggregationFunction (a ReduceFunction) are placeholders.
dataStream
        .keyBy(keySelector)
        .timeWindow(Time.seconds(30))
        .reduce(aggregationFunction);
```

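A tumbling window assigner puts each element into exactly one window, whose start is the timestamp rounded down to a multiple of the window size. The plain-Java sketch below applies that rule and sums values per key and window. This approximates what `.keyBy(...).timeWindow(Time.seconds(30)).reduce(...)` computes with a summing reducer. It assumes epoch-aligned windows, and the names are hypothetical.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical tumbling-window sum over (key, timestamp, value) events (not Flink code).
final class TumblingWindowSketch {

    /** Start of the epoch-aligned tumbling window of the given size that contains ts. */
    static long windowStart(long ts, long size) {
        return ts - Math.floorMod(ts, size);
    }

    /** Sums values per "key@windowStart" for events given as {key, timestamp, value} rows. */
    static Map<String, Long> sumPerWindow(long[][] events, long size) {
        Map<String, Long> sums = new TreeMap<>();
        for (long[] e : events) {
            String slot = e[0] + "@" + windowStart(e[1], size);
            sums.merge(slot, e[2], Long::sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        long size = 30_000L; // 30-second windows
        long[][] events = {
            {1, 1_000, 5}, {1, 29_999, 5}, {1, 30_000, 7}, {2, 45_000, 1}
        };
        System.out.println(sumPerWindow(events, size)); // {1@0=10, 1@30000=7, 2@30000=1}
    }
}
```

The events at 29,999 ms and 30,000 ms land in different windows: tumbling windows are half-open, so each boundary belongs to the next window.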
### Event Time Processing

```java
// Enable event time processing
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

// Assign timestamps and watermarks. assignTimestampsAndWatermarks returns a new stream,
// so downstream windowing must use the returned stream. T is a placeholder for your event type.
DataStream<T> withTimestamps = dataStream.assignTimestampsAndWatermarks(
        new AscendingTimestampExtractor<T>() {
            @Override
            public long extractAscendingTimestamp(T element) {
                return element.getTimestamp(); // placeholder accessor on your event type
            }
        });
```

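With ascending timestamps, a watermark can simply trail the highest timestamp seen so far by one millisecond. An event-time window `[start, end)` is then complete once the watermark reaches its last timestamp, `end - 1`. The plain-Java sketch below models that rule. It is a simplified illustration under those assumptions, not the code of `AscendingTimestampExtractor` or Flink's event-time trigger, and the class name is hypothetical.

```java
// Hypothetical model of ascending-timestamp watermarks and event-time window firing (not Flink code).
final class WatermarkSketch {
    private long maxTimestamp = Long.MIN_VALUE;

    /** Records an element's timestamp; timestamps are assumed to be ascending. */
    void onElement(long timestamp) {
        maxTimestamp = Math.max(maxTimestamp, timestamp);
    }

    /** Watermark: elements with a timestamp at or below this value are no longer expected. */
    long currentWatermark() {
        return maxTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : maxTimestamp - 1;
    }

    /** A window [start, end) can fire once the watermark has reached its last timestamp, end - 1. */
    boolean canFire(long windowEnd) {
        return currentWatermark() >= windowEnd - 1;
    }

    public static void main(String[] args) {
        WatermarkSketch wm = new WatermarkSketch();
        long windowEnd = 30_000L; // the 30-second window [0, 30000)
        for (long ts : new long[] {1_000L, 2_000L, 30_000L}) {
            wm.onElement(ts);
            System.out.println("ts=" + ts + " watermark=" + wm.currentWatermark()
                    + " fire=" + wm.canFire(windowEnd));
        }
    }
}
```

The window `[0, 30000)` can fire only when the element at 30,000 ms arrives, because that element is what advances the watermark to 29,999. Event-time results are therefore driven by the data, not by the wall clock.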
## Window Configuration Options

### Time Characteristics
- **Processing Time**: System time of the machine when elements are processed
- **Event Time**: Time when events actually occurred (embedded in the data)
- **Ingestion Time**: Time when events enter Flink, assigned at the source

### Window Types
- **Tumbling Windows**: Fixed-size, non-overlapping windows
- **Sliding Windows**: Fixed-size, overlapping windows
- **Session Windows**: Variable-size windows separated by gaps of inactivity
- **Global Windows**: All elements of a key in a single window; a custom trigger is required because the default trigger never fires

### Triggers
- **Time Triggers**: Fire based on processing or event time
- **Count Triggers**: Fire after a specific number of elements
- **Delta Triggers**: Fire based on value changes (such as distance traveled)
- **Custom Triggers**: User-defined triggering logic

### Evictors
- **Time Evictor**: Removes elements older than a specified time span
- **Count Evictor**: Keeps only the most recent N elements
- **Delta Evictor**: Removes elements based on value differences

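The time and count evictors differ only in which buffered elements they drop before the window function runs. The plain-Java sketch below models both on a list of timestamps. The time variant keeps elements strictly newer than the latest timestamp minus the span; the count variant keeps the last N in arrival order. This is an assumption-level model, not the code of Flink's `TimeEvictor` or `CountEvictor`, and the names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical models of time- and count-based eviction (not Flink code).
final class EvictorSketch {

    /** Keeps timestamps strictly newer than (max timestamp - spanMillis). */
    static List<Long> evictByTime(List<Long> timestamps, long spanMillis) {
        long max = Long.MIN_VALUE;
        for (long ts : timestamps) {
            max = Math.max(max, ts);
        }
        long cutoff = max - spanMillis;
        List<Long> kept = new ArrayList<>();
        for (long ts : timestamps) {
            if (ts > cutoff) {
                kept.add(ts);
            }
        }
        return kept;
    }

    /** Keeps only the last n elements in arrival order. */
    static <T> List<T> evictByCount(List<T> elements, int n) {
        int from = Math.max(0, elements.size() - n);
        return new ArrayList<>(elements.subList(from, elements.size()));
    }

    public static void main(String[] args) {
        List<Long> times = Arrays.asList(1_000L, 4_000L, 9_000L, 12_000L);
        System.out.println(evictByTime(times, 10_000L)); // [4000, 9000, 12000]
        System.out.println(evictByCount(times, 2));      // [9000, 12000]
    }
}
```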
## Car Data Processing Example

### Data Format
```java
// Car telemetry tuple: (carId, speed, distance, timestamp)
Tuple4<Integer, Integer, Double, Long> carData;
// f0: Car ID
// f1: Current speed (km/h)
// f2: Total distance traveled (meters)
// f3: Timestamp (milliseconds)
```

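When data comes from `--input`, each text line has to become such a tuple. The sketch below assumes a hypothetical line format of four comma-separated fields, optionally wrapped in parentheses, such as `(0,55,15.27,1424951918630)`. That format is an assumption, not taken from the example, so check it against your actual input before relying on it. A small holder class stands in for `Tuple4`, so the block runs without Flink; the class names are hypothetical.

```java
// Hypothetical parser for one car telemetry line (the line format is an assumption).
final class CarLineParser {

    static final class CarReading {
        final int carId;        // f0
        final int speedKmh;     // f1
        final double distanceM; // f2
        final long timestampMs; // f3

        CarReading(int carId, int speedKmh, double distanceM, long timestampMs) {
            this.carId = carId;
            this.speedKmh = speedKmh;
            this.distanceM = distanceM;
            this.timestampMs = timestampMs;
        }
    }

    /** Parses "carId,speed,distance,timestamp", optionally wrapped in parentheses. */
    static CarReading parse(String line) {
        String s = line.trim();
        if (s.startsWith("(") && s.endsWith(")")) {
            s = s.substring(1, s.length() - 1);
        }
        String[] f = s.split(",");
        if (f.length != 4) {
            throw new IllegalArgumentException("Expected 4 fields but got " + f.length + ": " + line);
        }
        return new CarReading(
                Integer.parseInt(f[0].trim()),
                Integer.parseInt(f[1].trim()),
                Double.parseDouble(f[2].trim()),
                Long.parseLong(f[3].trim()));
    }

    public static void main(String[] args) {
        CarReading r = parse("(0,55,15.27,1424951918630)");
        System.out.println(r.carId + " " + r.speedKmh + " " + r.distanceM + " " + r.timestampMs);
    }
}
```

Rejecting lines with the wrong number of fields, rather than silently defaulting, makes malformed input visible early instead of producing misleading speeds downstream.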
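### Sample Data Generation
```java
// Nested inside TopSpeedWindowing; needs java.util.Arrays, java.util.Random,
// org.apache.flink.api.java.tuple.Tuple4 and
// org.apache.flink.streaming.api.functions.source.SourceFunction
private static class CarSource implements SourceFunction<Tuple4<Integer, Integer, Double, Long>> {
    private final int[] speeds;
    private final double[] distances;
    private final Random rand = new Random();
    private volatile boolean isRunning = true;

    private CarSource(int numOfCars) {
        speeds = new int[numOfCars];
        distances = new double[numOfCars]; // every car starts at distance 0
        Arrays.fill(speeds, 50);           // illustrative starting speed of 50 km/h
    }

    @Override
    public void run(SourceContext<Tuple4<Integer, Integer, Double, Long>> ctx) throws Exception {
        while (isRunning) {
            Thread.sleep(100); // 100 ms between rounds of readings
            for (int carId = 0; carId < speeds.length; carId++) {
                // Randomly accelerate or brake by 5 km/h, clamped to [0, 100]
                if (rand.nextBoolean()) {
                    speeds[carId] = Math.min(100, speeds[carId] + 5);
                } else {
                    speeds[carId] = Math.max(0, speeds[carId] - 5);
                }
                // Update distance traveled: speed converted from km/h to m/s
                distances[carId] += speeds[carId] / 3.6d;

                Tuple4<Integer, Integer, Double, Long> record = new Tuple4<>(
                        carId, speeds[carId], distances[carId], System.currentTimeMillis());
                ctx.collect(record);
            }
        }
    }

    @Override
    public void cancel() {
        isRunning = false; // stops the run() loop
    }
}
```
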
## Session Window Configuration

### Session Gap Configuration
```java
// Configure the session gap to match expected user activity
ProcessingTimeSessionWindows processingSessions =
        ProcessingTimeSessionWindows.withGap(Time.minutes(30)); // 30-minute inactivity gap
EventTimeSessionWindows eventSessions =
        EventTimeSessionWindows.withGap(Time.seconds(60));      // 1-minute gap in event time
```

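### Dynamic Session Gaps
```java
// Variable session gaps derived from each element.
// Note: withDynamicGap and SessionWindowTimeGapExtractor were added in a Flink release after 1.3
// and are not available in the 1.3.3 version listed under Dependencies.
// T and getUserType() are placeholders for your own event type and accessor.
EventTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor<T>() {
    @Override
    public long extract(T element) {
        // Gap in milliseconds: 10 minutes for premium users, 5 minutes otherwise
        return element.getUserType().equals("PREMIUM") ? 600000L : 300000L;
    }
});
```
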
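With a dynamic gap, each element opens a window whose length depends on the element itself, and overlapping windows still merge into one session. The plain-Java sketch below models that using a `ToLongFunction` in place of a gap extractor. The `Event` class and its `premium` flag are hypothetical, and the sketch models the idea rather than Flink's implementation.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.ToLongFunction;

// Hypothetical model of event-time sessions with per-element gaps (not Flink code).
final class DynamicGapSketch {

    static final class Event {
        final long timestamp;
        final boolean premium;

        Event(long timestamp, boolean premium) {
            this.timestamp = timestamp;
            this.premium = premium;
        }
    }

    /** Each event contributes [ts, ts + gap(event)); overlapping windows are merged. */
    static List<long[]> sessions(List<Event> events, ToLongFunction<Event> gap) {
        List<Event> sorted = new ArrayList<>(events);
        sorted.sort(Comparator.comparingLong(e -> e.timestamp));
        List<long[]> result = new ArrayList<>();
        long[] current = null;
        for (Event e : sorted) {
            long end = e.timestamp + gap.applyAsLong(e);
            if (current != null && e.timestamp <= current[1]) {
                current[1] = Math.max(current[1], end); // inside the open session: extend it
            } else {
                current = new long[] {e.timestamp, end}; // start a new session
                result.add(current);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // 10 minutes for premium users, 5 minutes otherwise
        ToLongFunction<Event> gap = e -> e.premium ? 600_000L : 300_000L;
        List<Event> events = new ArrayList<>();
        events.add(new Event(0L, true));        // premium: session runs to 10 min
        events.add(new Event(420_000L, false)); // 7 min: inside, end becomes max(10, 12) = 12 min
        events.add(new Event(900_000L, false)); // 15 min: after 12 min, so a new session
        for (long[] s : sessions(events, gap)) {
            System.out.println("[" + s[0] + ", " + s[1] + ")");
        }
    }
}
```

This yields the sessions `[0, 720000)` and `[900000, 1200000)`. The premium user's longer gap kept the 7-minute event in the first session, and that event's shorter gap then extended the session to 12 minutes.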
## Dependencies

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.10</artifactId>
    <version>1.3.3</version>
</dependency>
```

## Required Imports

```java
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.DeltaTrigger;
```