0
# Event Time and Watermarks
1
2
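Apache Flink supports event-time processing, in which results are computed from the timestamps carried by the events themselves rather than from the time at which they happen to be processed. Watermarks track the progress of event time: a watermark with timestamp *t* declares that no more events with a timestamp ≤ *t* are expected. This lets Flink tolerate out-of-order events, decide when time-based computations such as windows are complete, and identify late arrivals — all essential for accurate time-based results in streaming applications.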
3
4
## Event Time Concepts
5
6
### Timestamp Assignment
7
8
Assign timestamps to events for event-time processing.
9
10
```java { .api }
11
import org.apache.flink.api.common.eventtime.TimestampAssigner;
12
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
13
14
// Simple timestamp assigner
15
public class EventTimestampAssigner implements SerializableTimestampAssigner<Event> {
16
@Override
17
public long extractTimestamp(Event element, long recordTimestamp) {
18
// Extract timestamp from the event
19
return element.getTimestamp();
20
}
21
}
22
23
// Custom timestamp extraction logic
24
public class CustomTimestampAssigner implements SerializableTimestampAssigner<LogEntry> {
25
@Override
26
public long extractTimestamp(LogEntry element, long recordTimestamp) {
27
// Parse timestamp from log format
28
return parseTimestamp(element.getTimestampString());
29
}
30
31
private long parseTimestamp(String timestampStr) {
32
// Custom timestamp parsing logic
33
return Instant.parse(timestampStr).toEpochMilli();
34
}
35
}
36
37
// Using ingestion time
38
public class IngestionTimeAssigner implements SerializableTimestampAssigner<Event> {
39
@Override
40
public long extractTimestamp(Event element, long recordTimestamp) {
41
// Use processing time as event time
42
return System.currentTimeMillis();
43
}
44
}
45
```
46
47
### Watermark Strategies
48
49
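A `WatermarkStrategy` combines a timestamp assigner with a watermark generator. It defines how event timestamps are extracted and how far watermarks trail the highest timestamp seen. That lag determines how much out-of-orderness is tolerated before events are treated as late.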
50
51
```java { .api }
52
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
53
import org.apache.flink.api.common.eventtime.TimestampAssigner;
54
import java.time.Duration;
55
56
// Monotonic timestamps (events arrive in order)
57
WatermarkStrategy<Event> ascendingStrategy =
58
WatermarkStrategy.<Event>forMonotonousTimestamps()
59
.withTimestampAssigner((event, timestamp) -> event.getTimestamp());
60
61
// Bounded out-of-orderness
62
WatermarkStrategy<Event> boundedOutOfOrderStrategy =
63
WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
64
.withTimestampAssigner((event, timestamp) -> event.getTimestamp());
65
66
// Custom watermark strategy with idleness detection
67
WatermarkStrategy<Event> customStrategy =
68
WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(10))
69
.withTimestampAssigner((event, timestamp) -> event.getTimestamp())
70
.withIdleness(Duration.ofMinutes(1)); // Mark source idle after 1 minute
71
72
// Generator-based strategy
73
WatermarkStrategy<Event> generatorStrategy =
74
WatermarkStrategy.<Event>forGenerator(context -> new CustomWatermarkGenerator())
75
.withTimestampAssigner((event, timestamp) -> event.getTimestamp());
76
```
77
78
## Watermark Generators
79
80
### Built-in Generators
81
82
```java { .api }
83
import org.apache.flink.api.common.eventtime.AscendingTimestampsWatermarks;
84
import org.apache.flink.api.common.eventtime.BoundedOutOfOrdernessWatermarks;
85
86
// Ascending timestamps generator
87
public class AscendingWatermarkExample {
88
public static WatermarkStrategy<Event> createStrategy() {
89
return WatermarkStrategy.<Event>forGenerator(context ->
90
new AscendingTimestampsWatermarks<>())
91
.withTimestampAssigner((event, timestamp) -> event.getTimestamp());
92
}
93
}
94
95
// Bounded out-of-orderness generator
96
public class BoundedOutOfOrderExample {
97
public static WatermarkStrategy<Event> createStrategy() {
98
Duration maxOutOfOrderness = Duration.ofSeconds(5);
99
return WatermarkStrategy.<Event>forGenerator(context ->
100
new BoundedOutOfOrdernessWatermarks<>(maxOutOfOrderness))
101
.withTimestampAssigner((event, timestamp) -> event.getTimestamp());
102
}
103
}
104
```
105
106
### Custom Watermark Generators
107
108
```java { .api }
109
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
110
import org.apache.flink.api.common.eventtime.WatermarkOutput;
111
import org.apache.flink.api.common.eventtime.Watermark;
112
113
// Periodic watermark generator
114
public class PeriodicWatermarkGenerator implements WatermarkGenerator<Event> {
115
private long maxTimestamp = Long.MIN_VALUE;
116
private final long maxOutOfOrderness = 5000; // 5 seconds
117
118
@Override
119
public void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {
120
// Update max timestamp seen so far
121
maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
122
}
123
124
@Override
125
public void onPeriodicEmit(WatermarkOutput output) {
126
// Emit watermark periodically (every few hundred milliseconds)
127
if (maxTimestamp != Long.MIN_VALUE) {
128
output.emitWatermark(new Watermark(maxTimestamp - maxOutOfOrderness - 1));
129
}
130
}
131
}
132
133
// Punctuated watermark generator
134
public class PunctuatedWatermarkGenerator implements WatermarkGenerator<Event> {
135
136
@Override
137
public void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {
138
// Emit watermark on special events
139
if (event.hasWatermarkMarker()) {
140
output.emitWatermark(new Watermark(eventTimestamp));
141
}
142
}
143
144
@Override
145
public void onPeriodicEmit(WatermarkOutput output) {
146
// No periodic watermarks needed
147
}
148
}
149
150
// Adaptive watermark generator
151
public class AdaptiveWatermarkGenerator implements WatermarkGenerator<SensorReading> {
152
private long maxTimestamp = Long.MIN_VALUE;
153
private long baseDelayMs = 1000; // 1 second base delay
154
private long adaptiveDelayMs = 1000;
155
156
@Override
157
public void onEvent(SensorReading reading, long eventTimestamp, WatermarkOutput output) {
158
maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
159
160
// Adapt delay based on out-of-orderness observed
161
long currentTime = System.currentTimeMillis();
162
long eventDelay = currentTime - eventTimestamp;
163
164
if (eventDelay > adaptiveDelayMs) {
165
// Increase delay if we see more out-of-order events
166
adaptiveDelayMs = Math.min(eventDelay * 2, 30000); // Max 30 seconds
167
} else {
168
// Gradually decrease delay when events are more in order
169
adaptiveDelayMs = Math.max(baseDelayMs, adaptiveDelayMs * 0.95);
170
}
171
}
172
173
@Override
174
public void onPeriodicEmit(WatermarkOutput output) {
175
if (maxTimestamp != Long.MIN_VALUE) {
176
output.emitWatermark(new Watermark(maxTimestamp - adaptiveDelayMs));
177
}
178
}
179
}
180
```
181
182
### Generator with Idleness Detection
183
184
```java { .api }
185
import org.apache.flink.api.common.eventtime.IdlenessTimer;
186
187
// Wrapper generator with idleness detection
188
public class IdleAwareWatermarkGenerator implements WatermarkGenerator<Event> {
189
private final WatermarkGenerator<Event> delegate;
190
private final IdlenessTimer idlenessTimer;
191
192
public IdleAwareWatermarkGenerator(WatermarkGenerator<Event> delegate,
193
Duration idleTimeout) {
194
this.delegate = delegate;
195
this.idlenessTimer = new IdlenessTimer(idleTimeout);
196
}
197
198
@Override
199
public void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {
200
// Mark as active and delegate
201
idlenessTimer.activity();
202
delegate.onEvent(event, eventTimestamp, output);
203
}
204
205
@Override
206
public void onPeriodicEmit(WatermarkOutput output) {
207
// Check for idleness and emit watermarks
208
if (idlenessTimer.checkIfIdle(output)) {
209
// Source is idle, watermark advancement is paused
210
return;
211
}
212
213
delegate.onPeriodicEmit(output);
214
}
215
}
216
```
217
218
## Advanced Watermark Patterns
219
220
### Multi-Stream Watermarks
221
222
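Handle watermarks from multiple input streams. An operator with several inputs (for example a `CoProcessFunction`) tracks a single current watermark: the minimum of the watermarks received on its inputs.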
223
224
```java { .api }
225
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
226
import org.apache.flink.util.Collector;
227
228
public class MultiStreamWatermarkFunction extends CoProcessFunction<Event1, Event2, CombinedEvent> {
229
230
@Override
231
public void processElement1(Event1 event1, Context ctx, Collector<CombinedEvent> out)
232
throws Exception {
233
234
// Current watermark for this stream
235
long watermark1 = ctx.timerService().currentWatermark();
236
237
// Process event considering watermark
238
if (event1.getTimestamp() <= watermark1) {
239
// Event is within watermark bounds
240
out.collect(new CombinedEvent(event1, null));
241
}
242
}
243
244
@Override
245
public void processElement2(Event2 event2, Context ctx, Collector<CombinedEvent> out)
246
throws Exception {
247
248
long watermark2 = ctx.timerService().currentWatermark();
249
250
if (event2.getTimestamp() <= watermark2) {
251
out.collect(new CombinedEvent(null, event2));
252
}
253
}
254
}
255
```
256
257
### Custom Watermark Alignment
258
259
```java { .api }
260
import org.apache.flink.streaming.api.watermark.Watermark;
261
262
public class WatermarkAlignmentFunction extends ProcessFunction<Event, Event> {
263
private long lastWatermark = Long.MIN_VALUE;
264
private final long alignmentThreshold = 1000; // 1 second
265
266
@Override
267
public void processElement(Event event, Context ctx, Collector<Event> out)
268
throws Exception {
269
270
long currentWatermark = ctx.timerService().currentWatermark();
271
272
// Align processing based on watermark progression
273
if (currentWatermark - lastWatermark >= alignmentThreshold) {
274
// Watermark has advanced significantly
275
performPeriodicCleanup();
276
lastWatermark = currentWatermark;
277
}
278
279
out.collect(event);
280
}
281
282
private void performPeriodicCleanup() {
283
// Cleanup old state, flush buffers, etc.
284
}
285
}
286
```
287
288
## Working with Late Events
289
290
### Allowed Lateness
291
292
Configure how to handle late events in windowed operations.
293
294
```java { .api }
295
import org.apache.flink.streaming.api.windowing.time.Time;
296
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
297
import org.apache.flink.util.OutputTag;
298
299
public class LateEventHandling {
300
301
// Output tag for late events
302
private static final OutputTag<Event> LATE_EVENTS_TAG =
303
new OutputTag<Event>("late-events") {};
304
305
public static void handleLateEvents(DataStream<Event> input) {
306
SingleOutputStreamOperator<WindowedResult> mainOutput = input
307
.assignTimestampsAndWatermarks(
308
WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
309
.withTimestampAssigner((event, timestamp) -> event.getTimestamp())
310
)
311
.keyBy(Event::getKey)
312
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
313
.allowedLateness(Time.minutes(1)) // Allow 1 minute lateness
314
.sideOutputLateData(LATE_EVENTS_TAG) // Collect late events
315
.process(new WindowProcessFunction<Event, WindowedResult, String, TimeWindow>() {
316
@Override
317
public void process(String key, Context context,
318
Iterable<Event> elements,
319
Collector<WindowedResult> out) throws Exception {
320
321
// Process events in window
322
long count = StreamSupport.stream(elements.spliterator(), false).count();
323
out.collect(new WindowedResult(key, count, context.window()));
324
}
325
});
326
327
// Handle late events separately
328
DataStream<Event> lateEvents = mainOutput.getSideOutput(LATE_EVENTS_TAG);
329
lateEvents.process(new ProcessFunction<Event, Void>() {
330
@Override
331
public void processElement(Event event, Context ctx, Collector<Void> out)
332
throws Exception {
333
// Log, store, or reprocess late events
334
System.out.println("Late event: " + event +
335
" arrived " + (ctx.timestamp() - event.getTimestamp()) + "ms late");
336
}
337
});
338
}
339
}
340
```
341
342
### Late Event Reprocessing
343
344
```java { .api }
345
public class LateEventReprocessor extends ProcessFunction<Event, ProcessedEvent> {
346
private final long maxLateness = 60000; // 1 minute
347
348
@Override
349
public void processElement(Event event, Context ctx, Collector<ProcessedEvent> out)
350
throws Exception {
351
352
long currentWatermark = ctx.timerService().currentWatermark();
353
long eventTime = event.getTimestamp();
354
355
if (eventTime <= currentWatermark) {
356
// Event is late
357
long lateness = currentWatermark - eventTime;
358
359
if (lateness <= maxLateness) {
360
// Acceptable lateness - reprocess
361
out.collect(new ProcessedEvent(event, true, lateness));
362
} else {
363
// Too late - handle specially
364
handleTooLateEvent(event, lateness, ctx);
365
}
366
} else {
367
// Event is on time
368
out.collect(new ProcessedEvent(event, false, 0));
369
}
370
}
371
372
private void handleTooLateEvent(Event event, long lateness, Context ctx) {
373
// Store in external system, alert, or discard
374
System.out.println("Event too late by " + lateness + "ms: " + event);
375
}
376
}
377
```
378
379
## Watermark Monitoring and Debugging
380
381
### Watermark Metrics
382
383
```java { .api }
384
public class WatermarkMetricsGenerator implements WatermarkGenerator<Event> {
385
private long maxTimestamp = Long.MIN_VALUE;
386
private final long maxOutOfOrderness = 5000;
387
388
// Metrics
389
private Counter watermarkEmissions;
390
private Gauge<Long> currentWatermark;
391
private Histogram eventLateness;
392
393
public void setMetrics(MetricGroup metricGroup) {
394
this.watermarkEmissions = metricGroup.counter("watermark-emissions");
395
this.currentWatermark = metricGroup.gauge("current-watermark", () ->
396
maxTimestamp - maxOutOfOrderness);
397
this.eventLateness = metricGroup.histogram("event-lateness");
398
}
399
400
@Override
401
public void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {
402
maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
403
404
// Track lateness metrics
405
long currentTime = System.currentTimeMillis();
406
long lateness = currentTime - eventTimestamp;
407
eventLateness.update(lateness);
408
}
409
410
@Override
411
public void onPeriodicEmit(WatermarkOutput output) {
412
if (maxTimestamp != Long.MIN_VALUE) {
413
watermarkEmissions.inc();
414
output.emitWatermark(new Watermark(maxTimestamp - maxOutOfOrderness - 1));
415
}
416
}
417
}
418
```
419
420
### Watermark Debugging
421
422
```java { .api }
423
public class WatermarkDebugger implements WatermarkGenerator<Event> {
424
private final WatermarkGenerator<Event> delegate;
425
private final String sourceName;
426
427
public WatermarkDebugger(WatermarkGenerator<Event> delegate, String sourceName) {
428
this.delegate = delegate;
429
this.sourceName = sourceName;
430
}
431
432
@Override
433
public void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {
434
System.out.println(String.format(
435
"[%s] Event received - timestamp: %d, extracted: %d, delay: %dms",
436
sourceName, event.getTimestamp(), eventTimestamp,
437
System.currentTimeMillis() - eventTimestamp
438
));
439
440
delegate.onEvent(event, eventTimestamp, new WatermarkOutputWrapper(output));
441
}
442
443
@Override
444
public void onPeriodicEmit(WatermarkOutput output) {
445
delegate.onPeriodicEmit(new WatermarkOutputWrapper(output));
446
}
447
448
private class WatermarkOutputWrapper implements WatermarkOutput {
449
private final WatermarkOutput delegate;
450
451
public WatermarkOutputWrapper(WatermarkOutput delegate) {
452
this.delegate = delegate;
453
}
454
455
@Override
456
public void emitWatermark(Watermark watermark) {
457
System.out.println(String.format(
458
"[%s] Watermark emitted: %d (%s)",
459
sourceName, watermark.getTimestamp(),
460
new Date(watermark.getTimestamp())
461
));
462
delegate.emitWatermark(watermark);
463
}
464
465
@Override
466
public void markIdle() {
467
System.out.println(String.format("[%s] Source marked as idle", sourceName));
468
delegate.markIdle();
469
}
470
471
@Override
472
public void markActive() {
473
System.out.println(String.format("[%s] Source marked as active", sourceName));
474
delegate.markActive();
475
}
476
}
477
}
478
```
479
480
## Timestamp and Watermark Utilities
481
482
### Time Extraction Utilities
483
484
```java { .api }
485
public class TimeExtractionUtils {
486
487
// Extract from JSON timestamp field
488
public static SerializableTimestampAssigner<JsonNode> jsonTimestampExtractor(String timestampField) {
489
return (element, recordTimestamp) -> {
490
JsonNode timestampNode = element.get(timestampField);
491
return timestampNode != null ? timestampNode.asLong() : recordTimestamp;
492
};
493
}
494
495
// Extract from formatted string
496
public static SerializableTimestampAssigner<String> formatTimestampExtractor(String pattern) {
497
DateTimeFormatter formatter = DateTimeFormatter.ofPattern(pattern);
498
return (element, recordTimestamp) -> {
499
try {
500
LocalDateTime dateTime = LocalDateTime.parse(element, formatter);
501
return dateTime.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
502
} catch (Exception e) {
503
return recordTimestamp; // Fallback to record timestamp
504
}
505
};
506
}
507
508
// Extract from CSV with timestamp column
509
public static SerializableTimestampAssigner<String> csvTimestampExtractor(int timestampColumn,
510
String delimiter) {
511
return (element, recordTimestamp) -> {
512
String[] fields = element.split(delimiter);
513
if (fields.length > timestampColumn) {
514
try {
515
return Long.parseLong(fields[timestampColumn]);
516
} catch (NumberFormatException e) {
517
return recordTimestamp;
518
}
519
}
520
return recordTimestamp;
521
};
522
}
523
}
524
```
525
526
### Watermark Strategy Builders
527
528
```java { .api }
529
public class WatermarkStrategyBuilder {
530
531
public static <T> WatermarkStrategy<T> createRobustStrategy(
532
SerializableTimestampAssigner<T> timestampAssigner,
533
Duration maxOutOfOrderness,
534
Duration idlenessTimeout) {
535
536
return WatermarkStrategy.<T>forBoundedOutOfOrderness(maxOutOfOrderness)
537
.withTimestampAssigner(timestampAssigner)
538
.withIdleness(idlenessTimeout);
539
}
540
541
public static <T> WatermarkStrategy<T> createAdaptiveStrategy(
542
SerializableTimestampAssigner<T> timestampAssigner) {
543
544
return WatermarkStrategy.<T>forGenerator(context ->
545
new AdaptiveWatermarkGenerator<>())
546
.withTimestampAssigner(timestampAssigner);
547
}
548
549
public static <T> WatermarkStrategy<T> createDebugStrategy(
550
SerializableTimestampAssigner<T> timestampAssigner,
551
Duration maxOutOfOrderness,
552
String sourceName) {
553
554
return WatermarkStrategy.<T>forGenerator(context ->
555
new WatermarkDebugger<>(
556
new BoundedOutOfOrdernessWatermarks<>(maxOutOfOrderness),
557
sourceName
558
))
559
.withTimestampAssigner(timestampAssigner);
560
}
561
}
562
```
563
564
## Best Practices
565
566
### Choosing Watermark Strategies
567
568
```java { .api }
569
public class WatermarkBestPractices {
570
571
// For mostly ordered streams with occasional late events
572
public static <T> WatermarkStrategy<T> forMostlyOrdered(
573
SerializableTimestampAssigner<T> timestampAssigner) {
574
575
return WatermarkStrategy.<T>forBoundedOutOfOrderness(Duration.ofSeconds(5))
576
.withTimestampAssigner(timestampAssigner)
577
.withIdleness(Duration.ofMinutes(1));
578
}
579
580
// For highly out-of-order streams
581
public static <T> WatermarkStrategy<T> forHighlyDisordered(
582
SerializableTimestampAssigner<T> timestampAssigner) {
583
584
return WatermarkStrategy.<T>forBoundedOutOfOrderness(Duration.ofMinutes(5))
585
.withTimestampAssigner(timestampAssigner)
586
.withIdleness(Duration.ofMinutes(2));
587
}
588
589
// For streams with unpredictable patterns
590
public static <T> WatermarkStrategy<T> forUnpredictableStreams(
591
SerializableTimestampAssigner<T> timestampAssigner) {
592
593
return WatermarkStrategy.<T>forGenerator(context ->
594
new AdaptiveWatermarkGenerator<>())
595
.withTimestampAssigner(timestampAssigner)
596
.withIdleness(Duration.ofMinutes(5));
597
}
598
}
599
600
// Performance considerations
601
public class PerformanceOptimizedWatermarkGenerator implements WatermarkGenerator<Event> {
602
private long maxTimestamp = Long.MIN_VALUE;
603
private final long maxOutOfOrderness;
604
private long lastWatermarkTime = 0;
605
private final long watermarkInterval = 1000; // Emit at most once per second
606
607
public PerformanceOptimizedWatermarkGenerator(Duration maxOutOfOrderness) {
608
this.maxOutOfOrderness = maxOutOfOrderness.toMillis();
609
}
610
611
@Override
612
public void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {
613
// Only update if timestamp is actually newer
614
if (eventTimestamp > maxTimestamp) {
615
maxTimestamp = eventTimestamp;
616
617
// Emit watermark immediately for significant advances
618
long currentTime = System.currentTimeMillis();
619
if (currentTime - lastWatermarkTime > watermarkInterval) {
620
output.emitWatermark(new Watermark(maxTimestamp - maxOutOfOrderness));
621
lastWatermarkTime = currentTime;
622
}
623
}
624
}
625
626
@Override
627
public void onPeriodicEmit(WatermarkOutput output) {
628
// Periodic emission as backup
629
if (maxTimestamp != Long.MIN_VALUE) {
630
output.emitWatermark(new Watermark(maxTimestamp - maxOutOfOrderness));
631
lastWatermarkTime = System.currentTimeMillis();
632
}
633
}
634
}
635
```
636
637
Apache Flink's event-time processing and watermark system provides robust support for handling temporal aspects of streaming data. By understanding these concepts and applying appropriate strategies, you can build applications that accurately process time-based computations even with out-of-order and late-arriving events.