# Watermark Strategies

Watermark strategies in the Flink Table API Java Bridge enable event-time processing by defining how out-of-order events are tolerated and when time-based operations fire. They are essential for windowed aggregations and temporal joins in streaming applications.

## Overview

Watermarks are timestamps that flow with the data stream and mark the progress of event time. A watermark with timestamp `t` asserts that no further events with a timestamp at or below `t` are expected, which lets Flink decide when a time window is complete and its result can be emitted.

The bridge provides several watermark assignment strategies that can be used with table sources.
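
To make the idea of event-time progress concrete, the following plain-Java sketch replays a few timestamps and prints the watermark after each one. It is an illustration only and does not use Flink: the class `WatermarkProgressSketch` and its methods are hypothetical names. The sketch assumes a watermark that trails the maximum seen timestamp by a fixed bound, and a window `[start, end)` that may fire once the watermark reaches `end - 1`.

```java
import java.util.ArrayList;
import java.util.List;

final class WatermarkProgressSketch {

    /** Returns the watermark observed after each event for a fixed out-of-orderness bound. */
    static List<Long> watermarksFor(long[] eventTimestamps, long maxOutOfOrderness) {
        List<Long> watermarks = new ArrayList<>();
        // Start at MIN_VALUE + bound so the subtraction below cannot underflow.
        long maxTimestamp = Long.MIN_VALUE + maxOutOfOrderness;
        for (long timestamp : eventTimestamps) {
            maxTimestamp = Math.max(maxTimestamp, timestamp);
            watermarks.add(maxTimestamp - maxOutOfOrderness);
        }
        return watermarks;
    }

    /** Index of the first event after which a window ending at windowEnd (exclusive) may fire, or -1. */
    static int firingIndex(List<Long> watermarks, long windowEnd) {
        for (int i = 0; i < watermarks.size(); i++) {
            if (watermarks.get(i) >= windowEnd - 1) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        long[] events = {1000L, 3000L, 2000L, 7000L, 12000L};
        List<Long> watermarks = watermarksFor(events, 2000L);
        System.out.println(watermarks);                      // [-1000, 1000, 1000, 5000, 10000]
        System.out.println(firingIndex(watermarks, 10000L)); // 4
    }
}
```

The out-of-order event at 2000 does not move the watermark, and the window `[0, 10000)` can only fire after the event at 12000 pushes the watermark to 10000.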

## Base Classes

### PeriodicWatermarkAssigner

Abstract base class for watermark strategies that generate watermarks periodically. `WatermarkStrategy` here is the table-source class `org.apache.flink.table.sources.wmstrategies.WatermarkStrategy`, not the DataStream API interface of the same name:

```java { .api }
@PublicEvolving
public abstract class PeriodicWatermarkAssigner extends WatermarkStrategy {
    public abstract void nextTimestamp(long timestamp);
    public abstract Watermark getWatermark();

    // Inherited from WatermarkStrategy
    public abstract Map<String, String> toProperties();
    public abstract boolean equals(Object obj);
    public abstract int hashCode();
}
```
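
**Usage Pattern:**

```java
import java.util.Collections;
import java.util.Map;

import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.table.sources.wmstrategies.PeriodicWatermarkAssigner;

public class MyPeriodicWatermarkAssigner extends PeriodicWatermarkAssigner {
    private final long maxOutOfOrderness;
    private long maxTimestamp;

    public MyPeriodicWatermarkAssigner(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
        // Start at MIN_VALUE + bound so getWatermark() cannot underflow before the first row.
        this.maxTimestamp = Long.MIN_VALUE + maxOutOfOrderness;
    }

    // Receives the rowtime of each row. The timestamp itself is extracted by the
    // timestamp extractor configured alongside this strategy, not by this class.
    @Override
    public void nextTimestamp(long timestamp) {
        maxTimestamp = Math.max(maxTimestamp, timestamp);
    }

    @Override
    public Watermark getWatermark() {
        return new Watermark(maxTimestamp - maxOutOfOrderness);
    }

    @Override
    public Map<String, String> toProperties() {
        // The property key is illustrative, not a key defined by Flink.
        return Collections.singletonMap(
            "watermark.max-out-of-orderness", Long.toString(maxOutOfOrderness));
    }

    @Override
    public boolean equals(Object obj) {
        return obj instanceof MyPeriodicWatermarkAssigner
            && ((MyPeriodicWatermarkAssigner) obj).maxOutOfOrderness == maxOutOfOrderness;
    }

    @Override
    public int hashCode() {
        return Long.hashCode(maxOutOfOrderness);
    }
}
```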

### PunctuatedWatermarkAssigner

Abstract base class for watermark strategies that generate watermarks based on specific events:

```java { .api }
@PublicEvolving
public abstract class PunctuatedWatermarkAssigner extends WatermarkStrategy {
    public abstract Watermark getWatermark(Row row, long timestamp);

    // Inherited from WatermarkStrategy
    public abstract Map<String, String> toProperties();
    public abstract boolean equals(Object obj);
    public abstract int hashCode();
}
```
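
**Usage Pattern:**

```java
import java.util.Collections;
import java.util.Map;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.table.sources.wmstrategies.PunctuatedWatermarkAssigner;
import org.apache.flink.types.Row;

public class MyPunctuatedWatermarkAssigner extends PunctuatedWatermarkAssigner {

    @Override
    public Watermark getWatermark(Row row, long timestamp) {
        // Emit a watermark only for special marker events; `timestamp` is the row's rowtime
        String eventType = (String) row.getField(1);
        if ("WATERMARK_EVENT".equals(eventType)) {
            return new Watermark(timestamp);
        }
        return null; // No watermark for regular events
    }

    @Override
    public Map<String, String> toProperties() {
        return Collections.emptyMap(); // This sketch exposes no descriptor properties
    }

    @Override
    public boolean equals(Object obj) {
        return obj instanceof MyPunctuatedWatermarkAssigner; // Stateless
    }

    @Override
    public int hashCode() {
        return MyPunctuatedWatermarkAssigner.class.hashCode();
    }
}
```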

## Built-in Strategies

### AscendingTimestamps

Watermark strategy for streams with ascending timestamps. It emits a watermark equal to the maximum observed timestamp minus 1, so rows whose timestamp equals the current maximum are not considered late:

```java { .api }
@PublicEvolving
public final class AscendingTimestamps extends PeriodicWatermarkAssigner {
    public void nextTimestamp(long timestamp);
    public Watermark getWatermark();
}
```

**Usage Example:**

```java
// For streams where timestamps are guaranteed to be ascending.
// A legacy table source declares the strategy through DefinedRowtimeAttributes
// rather than calling assignTimestampsAndWatermarks on the stream.
// The field name "event_time" is an assumption for this example.
public class MyTableSource implements StreamTableSource<Row>, DefinedRowtimeAttributes {

    @Override
    public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
        return execEnv.addSource(new MySourceFunction());
    }

    @Override
    public List<RowtimeAttributeDescriptor> getRowtimeAttributeDescriptors() {
        return Collections.singletonList(
            new RowtimeAttributeDescriptor(
                "event_time",                    // rowtime attribute in the table schema
                new ExistingField("event_time"), // reads the timestamp from an existing field
                new AscendingTimestamps()));     // watermark = max timestamp - 1
    }

    // getTableSchema() omitted for brevity
}
```

### BoundedOutOfOrderTimestamps

Watermark strategy for streams whose events are out of order by a bounded interval. It emits watermarks that trail the maximum observed timestamp by the configured delay in milliseconds:

```java { .api }
@PublicEvolving
public final class BoundedOutOfOrderTimestamps extends PeriodicWatermarkAssigner {
    public BoundedOutOfOrderTimestamps(long maxOutOfOrderness);
    public void nextTimestamp(long timestamp);
    public Watermark getWatermark();
}
```

**Usage Example:**

```java
// For streams where events can arrive up to 5 seconds out of order.
// The strategy is declared through DefinedRowtimeAttributes; the field name
// "event_time" is an assumption for this example.
public class EventTableSource implements StreamTableSource<Row>, DefinedRowtimeAttributes {

    @Override
    public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
        return execEnv.addSource(new EventSourceFunction());
    }

    @Override
    public List<RowtimeAttributeDescriptor> getRowtimeAttributeDescriptors() {
        return Collections.singletonList(
            new RowtimeAttributeDescriptor(
                "event_time",
                new ExistingField("event_time"),          // event timestamp field
                new BoundedOutOfOrderTimestamps(5000L))); // 5 seconds max out-of-order
    }

    // getTableSchema() omitted for brevity
}
```

## Modern Watermark Integration

With modern table sources, watermarks are typically defined in the table schema:

```java
// Define watermark in table schema
Schema schema = Schema.newBuilder()
    .column("user_id", DataTypes.STRING())
    .column("event_type", DataTypes.STRING())
    .column("event_time", DataTypes.TIMESTAMP_LTZ(3))
    .column("processing_time", DataTypes.TIMESTAMP_LTZ(3))
    .watermark("event_time", "event_time - INTERVAL '5' SECONDS")
    .build();

// Create table with watermark
tableEnv.createTable("events_with_watermark",
    TableDescriptor.forConnector("my-connector")
        .schema(schema)
        .build());
```
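
## Legacy Integration Patterns

### With StreamTableSource

Legacy table sources expose a rowtime attribute through `DefinedRowtimeAttributes`, which pairs a timestamp extractor with a watermark strategy:

```java
import java.util.Collections;
import java.util.List;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.sources.DefinedRowtimeAttributes;
import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.table.sources.tsextractors.ExistingField;
import org.apache.flink.table.sources.wmstrategies.BoundedOutOfOrderTimestamps;
import org.apache.flink.types.Row;

public class WatermarkedStreamTableSource
        implements StreamTableSource<Row>, DefinedRowtimeAttributes {
    private final long maxOutOfOrderness;

    public WatermarkedStreamTableSource(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
    }

    @Override
    public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
        return execEnv.addSource(new MySourceFunction());
    }

    @Override
    public List<RowtimeAttributeDescriptor> getRowtimeAttributeDescriptors() {
        // Use the existing "event_time" field as rowtime and let watermarks trail
        // the maximum observed timestamp by maxOutOfOrderness milliseconds.
        return Collections.singletonList(
            new RowtimeAttributeDescriptor(
                "event_time",
                new ExistingField("event_time"),
                new BoundedOutOfOrderTimestamps(maxOutOfOrderness)));
    }

    @Override
    public TableSchema getTableSchema() {
        return TableSchema.builder()
            .field("id", DataTypes.BIGINT())
            .field("data", DataTypes.STRING())
            .field("event_time", DataTypes.TIMESTAMP(3))
            .build();
    }
}
```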

### With DataStream Conversion

```java
// Convert DataStream with watermarks to Table
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// Create DataStream with watermarks. assignTimestampsAndWatermarks expects
// org.apache.flink.api.common.eventtime.WatermarkStrategy, not the table-source
// strategy classes. Event#getEventTime() is assumed to return epoch milliseconds.
DataStream<Event> eventStream = env
    .addSource(new EventSource())
    .assignTimestampsAndWatermarks(
        WatermarkStrategy
            .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
            .withTimestampAssigner((event, recordTimestamp) -> event.getEventTime()));

// Convert to Table while preserving watermarks
Schema schema = Schema.newBuilder()
    .column("id", DataTypes.BIGINT())
    .column("data", DataTypes.STRING())
    .column("event_time", DataTypes.TIMESTAMP_LTZ(3))
    .watermark("event_time", "SOURCE_WATERMARK()") // Preserve existing watermarks
    .build();

Table eventTable = tableEnv.fromDataStream(eventStream, schema);
```

## Windowed Operations with Watermarks

### Tumbling Windows

```java
// Use watermarks with tumbling windows
Table windowedResult = tableEnv.sqlQuery("""
    SELECT
        user_id,
        COUNT(*) as event_count,
        SUM(amount) as total_amount,
        TUMBLE_START(event_time, INTERVAL '1' MINUTE) as window_start,
        TUMBLE_END(event_time, INTERVAL '1' MINUTE) as window_end
    FROM events_with_watermark
    GROUP BY
        user_id,
        TUMBLE(event_time, INTERVAL '1' MINUTE)
    """);
```

### Sliding Windows

```java
// Sliding windows with watermarks: a 2-minute window that advances every 30 seconds.
// `value` is a reserved keyword in Flink SQL, so the column name is quoted with backticks.
Table slidingResult = tableEnv.sqlQuery("""
    SELECT
        user_id,
        AVG(`value`) as avg_value,
        HOP_START(event_time, INTERVAL '30' SECONDS, INTERVAL '2' MINUTES) as window_start,
        HOP_END(event_time, INTERVAL '30' SECONDS, INTERVAL '2' MINUTES) as window_end
    FROM events_with_watermark
    GROUP BY
        user_id,
        HOP(event_time, INTERVAL '30' SECONDS, INTERVAL '2' MINUTES)
    """);
```

### Session Windows

```java
// Session windows with watermarks
Table sessionResult = tableEnv.sqlQuery("""
    SELECT
        user_id,
        COUNT(*) as session_events,
        MIN(event_time) as session_start,
        MAX(event_time) as session_end,
        SESSION_START(event_time, INTERVAL '10' MINUTES) as session_window_start,
        SESSION_END(event_time, INTERVAL '10' MINUTES) as session_window_end
    FROM user_events
    GROUP BY
        user_id,
        SESSION(event_time, INTERVAL '10' MINUTES)
    """);
```

## Custom Watermark Strategies

### Business Logic Based Watermarks

```java
public class BusinessLogicWatermarkAssigner extends PeriodicWatermarkAssigner {
    private final long gracePeriod;
    private long maxTimestamp = Long.MIN_VALUE;

    public BusinessLogicWatermarkAssigner(long gracePeriodMs) {
        this.gracePeriod = gracePeriodMs;
    }

    @Override
    public void nextTimestamp(long timestamp) {
        maxTimestamp = Math.max(maxTimestamp, timestamp);
    }

    @Override
    public Watermark getWatermark() {
        if (maxTimestamp == Long.MIN_VALUE) {
            return new Watermark(Long.MIN_VALUE); // No rows seen yet; avoids underflow
        }

        // Business rule: extend the grace period by 30% on weekends.
        // The decision uses the task's wall clock, not event time.
        Calendar cal = Calendar.getInstance();
        int dayOfWeek = cal.get(Calendar.DAY_OF_WEEK);

        long adjustedGracePeriod = gracePeriod;
        if (dayOfWeek == Calendar.SATURDAY || dayOfWeek == Calendar.SUNDAY) {
            adjustedGracePeriod = (long) (gracePeriod * 1.3);
        }

        return new Watermark(maxTimestamp - adjustedGracePeriod);
    }

    @Override
    public Map<String, String> toProperties() {
        // The property key is illustrative, not a key defined by Flink.
        return Collections.singletonMap("watermark.grace-period-ms", Long.toString(gracePeriod));
    }

    @Override
    public boolean equals(Object obj) {
        return obj instanceof BusinessLogicWatermarkAssigner
            && ((BusinessLogicWatermarkAssigner) obj).gracePeriod == gracePeriod;
    }

    @Override
    public int hashCode() {
        return Long.hashCode(gracePeriod);
    }
}
```

### Multi-Source Watermark Coordination

```java
// Tracks the maximum timestamp per source and emits the minimum across sources.
// A punctuated assigner is used because it receives the row (and with it the source id);
// PeriodicWatermarkAssigner.nextTimestamp only receives the timestamp.
public class CoordinatedWatermarkAssigner extends PunctuatedWatermarkAssigner {
    private final Map<String, Long> sourceMaxTimestamps = new HashMap<>();
    private final long defaultLag;

    public CoordinatedWatermarkAssigner(long defaultLagMs) {
        this.defaultLag = defaultLagMs;
    }

    @Override
    public Watermark getWatermark(Row row, long timestamp) {
        // Update per-source tracking; field 0 is assumed to hold the source id
        String sourceId = (String) row.getField(0);
        sourceMaxTimestamps.merge(sourceId, timestamp, Math::max);

        // Minimum across all sources seen so far. Sources that have not
        // produced a row yet are not represented.
        long minTimestamp = sourceMaxTimestamps.values().stream()
            .mapToLong(Long::longValue)
            .min()
            .orElse(Long.MIN_VALUE);

        return minTimestamp == Long.MIN_VALUE
            ? null
            : new Watermark(minTimestamp - defaultLag);
    }

    @Override
    public Map<String, String> toProperties() {
        // The property key is illustrative, not a key defined by Flink.
        return Collections.singletonMap("watermark.default-lag-ms", Long.toString(defaultLag));
    }

    @Override
    public boolean equals(Object obj) {
        return obj instanceof CoordinatedWatermarkAssigner
            && ((CoordinatedWatermarkAssigner) obj).defaultLag == defaultLag;
    }

    @Override
    public int hashCode() {
        return Long.hashCode(defaultLag);
    }
}
```
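
The minimum-across-sources rule can be tried outside a Flink job. The following plain-Java sketch is an illustration only: `MinWatermarkTracker` and its methods are hypothetical names, not part of the Flink API, and the lag is assumed to be in milliseconds.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

final class MinWatermarkTracker {
    private final Map<String, Long> maxTimestampBySource = new HashMap<>();
    private final long lagMs;

    MinWatermarkTracker(long lagMs) {
        this.lagMs = lagMs;
    }

    /** Records a timestamp for a source, keeping only the largest one seen per source. */
    void observe(String sourceId, long timestamp) {
        maxTimestampBySource.merge(sourceId, timestamp, Math::max);
    }

    /** Minimum of the per-source maxima minus the lag, or empty if no source has reported yet. */
    OptionalLong currentWatermark() {
        OptionalLong min = maxTimestampBySource.values().stream()
            .mapToLong(Long::longValue)
            .min();
        return min.isPresent() ? OptionalLong.of(min.getAsLong() - lagMs) : OptionalLong.empty();
    }

    public static void main(String[] args) {
        MinWatermarkTracker tracker = new MinWatermarkTracker(1000L);
        tracker.observe("a", 5000L);
        tracker.observe("b", 2000L);
        System.out.println(tracker.currentWatermark().getAsLong()); // 1000
        tracker.observe("b", 8000L);
        System.out.println(tracker.currentWatermark().getAsLong()); // 4000
    }
}
```

The slowest source bounds the watermark: after source `b` advances to 8000, source `a` at 5000 becomes the minimum, so the watermark is 4000.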

## Best Practices

### Watermark Configuration

1. **Choose appropriate lag**: A larger lag tolerates more out-of-order events but delays results; a smaller lag does the opposite
2. **Monitor late events**: Track events that arrive behind the watermark
3. **Consider business requirements**: Different domains may need different lateness handling

```java
// Monitor late events
DataStream<Row> eventStream = tableEnv.toDataStream(eventsTable);
OutputTag<Row> lateEventsTag = new OutputTag<Row>("late-events") {};

SingleOutputStreamOperator<Row> processedStream = eventStream
    .keyBy(row -> (String) row.getField(0)) // Assumes field 0 is a String key
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .allowedLateness(Time.minutes(1)) // Allow 1 minute lateness
    .sideOutputLateData(lateEventsTag)
    .apply(new MyWindowFunction());

// Handle late events separately
DataStream<Row> lateEvents = processedStream.getSideOutput(lateEventsTag);
lateEvents.addSink(new LateEventsSink());
```
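
One way to choose a lag is to replay a sample of event timestamps in arrival order and count how many would fall behind the watermark for each candidate value. The plain-Java sketch below is an illustration only: `LagSelectionSketch` and its methods are hypothetical names, and it assumes an idealized watermark that is updated on every event, whereas Flink emits periodic watermarks at a configured interval.

```java
import java.util.Arrays;

final class LagSelectionSketch {

    /** Counts events whose timestamp is at or below (max timestamp seen so far - lag). */
    static int countLateEvents(long[] timestampsInArrivalOrder, long lagMs) {
        int late = 0;
        boolean seenAny = false;
        long maxSeen = 0L;
        for (long timestamp : timestampsInArrivalOrder) {
            if (seenAny && timestamp <= maxSeen - lagMs) {
                late++;
            }
            maxSeen = seenAny ? Math.max(maxSeen, timestamp) : timestamp;
            seenAny = true;
        }
        return late;
    }

    /** Smallest candidate lag that keeps the late-event count within maxLate, or -1 if none does. */
    static long smallestSufficientLag(long[] timestamps, long[] candidateLagsMs, int maxLate) {
        long[] sorted = candidateLagsMs.clone();
        Arrays.sort(sorted);
        for (long lag : sorted) {
            if (countLateEvents(timestamps, lag) <= maxLate) {
                return lag;
            }
        }
        return -1L;
    }

    public static void main(String[] args) {
        long[] sample = {1000L, 5000L, 2000L, 4500L, 9000L, 3000L};
        long[] candidates = {7000L, 1000L, 5000L, 3000L};
        System.out.println(countLateEvents(sample, 1000L));              // 2
        System.out.println(smallestSufficientLag(sample, candidates, 1)); // 5000
        System.out.println(smallestSufficientLag(sample, candidates, 0)); // 7000
    }
}
```

Tolerating a single late event allows a 5 second lag on this sample, while tolerating none requires 7 seconds, which shows the trade-off between lateness tolerance and result timeliness.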

### Performance Optimization

1. **Periodic interval**: A shorter watermark interval lowers result latency but emits more watermarks; tune it to the job's latency needs
2. **Timestamp extraction**: Keep timestamp extraction cheap, since it runs for every record
3. **Memory usage**: Be mindful of state size in custom watermark assigners

```java
// Configure watermark interval
env.getConfig().setAutoWatermarkInterval(1000L); // 1 second

// Efficient timestamp extraction for the DataStream API
// (org.apache.flink.api.common.eventtime.SerializableTimestampAssigner)
public class OptimizedTimestampAssigner implements SerializableTimestampAssigner<Row> {

    @Override
    public long extractTimestamp(Row element, long recordTimestamp) {
        // Use recordTimestamp when available to avoid field access.
        // TimestampAssigner.NO_TIMESTAMP (Long.MIN_VALUE) marks a record without a timestamp.
        if (recordTimestamp != TimestampAssigner.NO_TIMESTAMP) {
            return recordTimestamp;
        }
        return (Long) element.getField(2);
    }
}

// Combine the assigner with a bounded out-of-orderness strategy (5 seconds here)
WatermarkStrategy<Row> strategy = WatermarkStrategy
    .<Row>forBoundedOutOfOrderness(Duration.ofSeconds(5))
    .withTimestampAssigner(new OptimizedTimestampAssigner());
```

### Error Handling

Timestamp extraction should tolerate malformed rows. Falling back to processing time keeps the job running but mixes time domains, so every fallback is logged:

```java
// Timestamp assigner for the DataStream API
// (org.apache.flink.api.common.eventtime.SerializableTimestampAssigner)
public class RobustTimestampAssigner implements SerializableTimestampAssigner<Row> {
    private static final Logger LOG = LoggerFactory.getLogger(RobustTimestampAssigner.class);

    @Override
    public long extractTimestamp(Row element, long recordTimestamp) {
        try {
            Object timestampField = element.getField(2);
            if (timestampField instanceof Long) {
                return (Long) timestampField;
            } else if (timestampField instanceof Timestamp) {
                return ((Timestamp) timestampField).getTime();
            } else {
                // Log warning and use processing time; the field may be null
                LOG.warn("Invalid timestamp field type: {}",
                    timestampField == null ? "null" : timestampField.getClass());
                return System.currentTimeMillis();
            }
        } catch (Exception e) {
            LOG.error("Error extracting timestamp", e);
            return System.currentTimeMillis();
        }
    }
}