0
# Watermark Strategies
1
2
Time-based event processing with configurable watermark assignment strategies for handling out-of-order events in streaming applications. These strategies ensure proper event-time semantics and enable accurate windowing operations.
3
4
## Capabilities
5
6
### Periodic Watermark Assigner
7
8
Base class for watermark strategies that emit watermarks at regular intervals.
9
10
```java { .api }
11
/**
12
* Base class for periodic watermark assignment strategies
13
* Watermarks are emitted periodically based on processed timestamps
14
*/
15
public abstract class PeriodicWatermarkAssigner {
16
17
/**
18
* Process the next timestamp from incoming events
19
* @param timestamp Event timestamp to process
20
*/
21
public abstract void nextTimestamp(long timestamp);
22
23
/**
24
* Get the current watermark based on processed timestamps
25
* @return Current watermark
26
*/
27
public abstract Watermark getWatermark();
28
29
/**
30
* Convert watermark strategy to properties for descriptor usage
31
* @return Properties map for table descriptor configuration
32
*/
33
public abstract Map<String, String> toProperties();
34
}
35
```
36
37
### Bounded Out-of-Order Timestamps
38
39
Watermark strategy for handling events that arrive out-of-order within a bounded time interval.
40
41
```java { .api }
42
/**
43
* Watermark strategy for rowtime attributes which are out-of-order by a bounded time interval
44
* Emits watermarks which are the maximum observed timestamp minus the specified delay
45
*/
46
public final class BoundedOutOfOrderTimestamps extends PeriodicWatermarkAssigner {
47
48
/**
49
* Create bounded out-of-order watermark strategy
50
* @param delay The delay by which watermarks are behind the maximum observed timestamp
51
*/
52
public BoundedOutOfOrderTimestamps(long delay);
53
54
@Override
55
public void nextTimestamp(long timestamp);
56
57
@Override
58
public Watermark getWatermark();
59
60
@Override
61
public Map<String, String> toProperties();
62
}
63
```
64
65
**Usage Examples:**
66
67
```java
68
import org.apache.flink.table.sources.wmstrategies.BoundedOutOfOrderTimestamps;
69
import org.apache.flink.streaming.api.watermark.Watermark;
70
71
// Create watermark strategy with 5-second delay for out-of-order events
72
BoundedOutOfOrderTimestamps watermarkStrategy = new BoundedOutOfOrderTimestamps(5000L);
73
74
// Process timestamps (simulating event processing)
75
watermarkStrategy.nextTimestamp(1000L);
76
watermarkStrategy.nextTimestamp(2000L);
77
watermarkStrategy.nextTimestamp(1500L); // Out-of-order event
78
79
// Get current watermark (max timestamp - delay = 2000 - 5000 = -3000; no clamping is applied)
80
Watermark currentWatermark = watermarkStrategy.getWatermark();
81
System.out.println("Current watermark: " + currentWatermark.getTimestamp());
82
83
// Use in table descriptor
84
Map<String, String> properties = watermarkStrategy.toProperties();
85
// Properties will contain watermark type and delay configuration
86
```
87
88
### Ascending Timestamps
89
90
Watermark strategy for strictly ascending timestamps where events arrive in order.
91
92
```java { .api }
93
/**
94
* Watermark strategy for strictly ascending timestamps
95
* Suitable when events are guaranteed to arrive in timestamp order
96
*/
97
public final class AscendingTimestamps extends PeriodicWatermarkAssigner {
98
99
/**
100
* Create ascending timestamp watermark strategy
101
*/
102
public AscendingTimestamps();
103
104
@Override
105
public void nextTimestamp(long timestamp);
106
107
@Override
108
public Watermark getWatermark();
109
110
@Override
111
public Map<String, String> toProperties();
112
}
113
```
114
115
**Usage Examples:**
116
117
```java
118
import org.apache.flink.table.sources.wmstrategies.AscendingTimestamps;
119
120
// Create watermark strategy for ascending timestamps
121
AscendingTimestamps watermarkStrategy = new AscendingTimestamps();
122
123
// Process strictly ascending timestamps
124
watermarkStrategy.nextTimestamp(1000L);
125
watermarkStrategy.nextTimestamp(2000L);
126
watermarkStrategy.nextTimestamp(3000L);
127
128
// Watermark is the maximum seen timestamp minus 1 (3000L - 1 = 2999L), so rows equal to the max are not late
129
Watermark watermark = watermarkStrategy.getWatermark();
130
System.out.println("Watermark: " + watermark.getTimestamp()); // 2999
131
```
132
133
### Punctuated Watermark Assigner
134
135
Base class for watermark strategies that emit watermarks based on special marker events.
136
137
```java { .api }
138
/**
139
* Base class for punctuated watermark assignment strategies
140
* Watermarks are emitted when special marker events are encountered
141
*/
142
public abstract class PunctuatedWatermarkAssigner {
143
144
/**
145
* Extract watermark from the current event if it contains watermark information
146
* @param timestamp Current event timestamp
147
* @return Watermark if event triggers watermark emission, null otherwise
148
*/
149
public abstract Watermark getWatermark(long timestamp);
150
151
/**
152
* Convert watermark strategy to properties for descriptor usage
153
* @return Properties map for table descriptor configuration
154
*/
155
public abstract Map<String, String> toProperties();
156
}
157
```
158
159
## Integration with Table API
160
161
### Schema-based Watermark Configuration
162
163
Configure watermarks using the modern Schema API.
164
165
```java
166
import org.apache.flink.table.api.Schema;
167
import org.apache.flink.streaming.api.datastream.DataStream;
168
import org.apache.flink.types.Row;
169
170
// Schema with watermark strategy
171
Schema schemaWithWatermarksSchema = Schema.newBuilder()
172
.column("user_id", "STRING")
173
.column("event_data", "STRING")
174
.column("event_time", "TIMESTAMP(3)")
175
.watermark("event_time", "event_time - INTERVAL '5' SECOND") // 5-second delay
176
.build();
177
178
// Apply to DataStream conversion
179
DataStream<Row> eventStream = env.fromElements(/* data */);
180
Table table = tableEnv.fromDataStream(eventStream, schemaWithWatermarksSchema);
181
182
// Watermark propagation from DataStream
183
Schema sourceWatermarkSchema = Schema.newBuilder()
184
.column("user_id", "STRING")
185
.column("event_data", "STRING")
186
.columnByMetadata("event_time", "TIMESTAMP_LTZ(3)")
187
.watermark("event_time", "SOURCE_WATERMARK()") // Propagate from DataStream
188
.build();
189
```
190
191
### SQL DDL Watermark Configuration
192
193
Configure watermarks in SQL table definitions.
194
195
```sql { .api }
196
-- Table with computed watermark
197
CREATE TABLE events (
198
user_id STRING,
199
event_data STRING,
200
event_time TIMESTAMP(3),
201
WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
202
) WITH (
203
'connector' = 'datagen',
204
'fields.event_time.kind' = 'random',
205
'fields.event_time.max-past' = '1h'
206
);
207
208
-- Table with source watermark propagation
209
CREATE TABLE kafka_events (
210
user_id STRING,
211
event_data STRING,
212
event_time TIMESTAMP_LTZ(3) METADATA,
213
WATERMARK FOR event_time AS SOURCE_WATERMARK()
214
) WITH (
215
'connector' = 'kafka',
216
'topic' = 'events'
217
);
218
```
219
220
## Advanced Watermark Patterns
221
222
### Custom Watermark Strategy Implementation
223
224
Implement custom watermark strategies for specific business requirements.
225
226
```java
227
public class BusinessHoursWatermarkStrategy extends PeriodicWatermarkAssigner {
228
private long maxTimestamp = Long.MIN_VALUE;
229
private final long businessHourDelay = 30000L; // 30 seconds during business hours
230
private final long offHourDelay = 300000L; // 5 minutes during off hours
231
232
@Override
233
public void nextTimestamp(long timestamp) {
234
if (timestamp > maxTimestamp) {
235
maxTimestamp = timestamp;
236
}
237
}
238
239
@Override
240
public Watermark getWatermark() {
241
if (maxTimestamp == Long.MIN_VALUE) {
242
return new Watermark(Long.MIN_VALUE);
243
}
244
245
// Determine if current time is during business hours (simplified)
246
long currentHour = (System.currentTimeMillis() / (1000 * 60 * 60)) % 24;
247
boolean isBusinessHours = currentHour >= 9 && currentHour <= 17;
248
249
long delay = isBusinessHours ? businessHourDelay : offHourDelay;
250
return new Watermark(maxTimestamp - delay);
251
}
252
253
@Override
254
public Map<String, String> toProperties() {
255
Map<String, String> properties = new HashMap<>();
256
properties.put("watermark.strategy", "business-hours");
257
return properties;
258
}
259
}
260
```
261
262
### Windowing with Watermarks
263
264
Use watermark strategies with windowing operations.
265
266
```java
267
// Create table with watermark strategy
268
tableEnv.executeSql(
269
"CREATE TABLE sensor_readings (" +
270
" sensor_id STRING," +
271
" temperature DOUBLE," +
272
" reading_time TIMESTAMP(3)," +
273
" WATERMARK FOR reading_time AS reading_time - INTERVAL '30' SECOND" +
274
") WITH (" +
275
" 'connector' = 'datagen'," +
276
" 'fields.temperature.min' = '15.0'," +
277
" 'fields.temperature.max' = '35.0'" +
278
")"
279
);
280
281
// Windowed aggregation with watermarks
282
Table windowedAggregates = tableEnv.sqlQuery(
283
"SELECT " +
284
" sensor_id, " +
285
" window_start, " +
286
" window_end, " +
287
" AVG(temperature) as avg_temp, " +
288
" MAX(temperature) as max_temp " +
289
"FROM TABLE(" +
290
" TUMBLE(TABLE sensor_readings, DESCRIPTOR(reading_time), INTERVAL '1' MINUTE)" +
291
") " +
292
"GROUP BY sensor_id, window_start, window_end"
293
);
294
```
295
296
### Late Data Handling
297
298
Configure late data handling with allowed lateness.
299
300
```sql
301
-- Table with allowed lateness configuration
302
CREATE TABLE late_events (
303
event_id STRING,
304
event_time TIMESTAMP(3),
305
event_data STRING,
306
WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
307
) WITH (
308
'connector' = 'kafka',
309
'topic' = 'events',
310
-- NOTE(review): the option name below is illustrative; confirm that the connector in use actually supports an allowed-lateness setting
311
'scan.watermark.allowed-lateness' = '1min'
312
);
313
```
314
315
## Watermark Monitoring
316
317
### Watermark Debugging
318
319
Monitor watermark progress in streaming applications.
320
321
```java
322
// Enable watermark debugging
323
Configuration config = new Configuration();
324
config.setString("metrics.reporters", "jmx");
325
config.setBoolean("metrics.latency.tracking", true);
326
327
StreamExecutionEnvironment env = StreamExecutionEnvironment
328
.getExecutionEnvironment(config);
329
330
// Monitor watermarks in processing
331
DataStream<Row> monitoredStream = tableEnv.toChangelogStream(table)
332
.map(new RichMapFunction<Row, Row>() {
333
private transient MetricGroup metricGroup;
334
private transient Gauge<Long> watermarkGauge;
335
336
@Override
337
public void open(Configuration parameters) {
338
metricGroup = getRuntimeContext().getMetricGroup();
339
watermarkGauge = metricGroup.gauge("currentWatermark",
340
() -> getCurrentWatermark());
341
}
342
343
@Override
344
public Row map(Row row) {
345
// Log watermark progress periodically
346
if (System.currentTimeMillis() % 10000 == 0) {
347
System.out.println("Current watermark: " + getCurrentWatermark());
348
}
349
return row;
350
}
351
352
private long getCurrentWatermark() {
353
// NOTE(review): RuntimeContext exposes no getCurrentWatermark(); read it in a ProcessFunction via ctx.timerService().currentWatermark()
354
return getRuntimeContext().getCurrentWatermark();
355
}
356
});
357
```
358
359
## Types
360
361
### Core Watermark Types
362
363
```java { .api }
364
import org.apache.flink.streaming.api.watermark.Watermark;
365
import org.apache.flink.table.sources.wmstrategies.PeriodicWatermarkAssigner;
366
import org.apache.flink.table.sources.wmstrategies.PunctuatedWatermarkAssigner;
367
import org.apache.flink.table.sources.wmstrategies.BoundedOutOfOrderTimestamps;
368
import org.apache.flink.table.sources.wmstrategies.AscendingTimestamps;
369
```
370
371
### Legacy Descriptor Types
372
373
```java { .api }
374
import org.apache.flink.table.legacy.descriptors.Rowtime;
375
import java.util.Map;
376
import java.util.HashMap;
377
```
378
379
### Schema Integration Types
380
381
```java { .api }
382
import org.apache.flink.table.api.Schema;
383
import org.apache.flink.table.expressions.Expression;
384
```
385
386
## Migration from Legacy APIs
387
388
### Descriptor to Schema Migration
389
390
Migrate from legacy descriptor-based watermarks to modern Schema API.
391
392
```java
393
// Legacy approach (deprecated)
394
Rowtime rowtimeDescriptor = new Rowtime()
395
.timestampsFromField("event_time")
396
.watermarksPeriodicBounded(5000L);
397
398
// Modern approach
399
Schema modernSchema = Schema.newBuilder()
400
.column("event_id", "STRING")
401
.column("event_time", "TIMESTAMP(3)")
402
.column("event_data", "STRING")
403
.watermark("event_time", "event_time - INTERVAL '5' SECOND")
404
.build();
405
406
// Apply to DataStream
407
Table modernTable = tableEnv.fromDataStream(dataStream, modernSchema);
408
```