or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

builtin-connectors.mdchangelog-processing.mddatastream-connectors.mdindex.mdprocedures.mdstatement-sets.mdstream-table-environment.mdwatermark-strategies.md

watermark-strategies.mddocs/

# Watermark Strategies

Event-time processing with configurable watermark assignment strategies for handling out-of-order events in streaming applications. A watermark tells the runtime that no more rows with an earlier timestamp are expected. These strategies therefore ensure proper event-time semantics and allow windowing operations to produce accurate results.

## Capabilities

### Periodic Watermark Assigner

Base class for watermark strategies that emit watermarks at regular intervals. Every row's timestamp is passed to `nextTimestamp`, and `getWatermark` is polled periodically.

```java { .api }

11

/**

12

* Base class for periodic watermark assignment strategies

13

* Watermarks are emitted periodically based on processed timestamps

14

*/

15

public abstract class PeriodicWatermarkAssigner {

16

17

/**

18

* Process the next timestamp from incoming events

19

* @param timestamp Event timestamp to process

20

*/

21

public abstract void nextTimestamp(long timestamp);

22

23

/**

24

* Get the current watermark based on processed timestamps

25

* @return Current watermark

26

*/

27

public abstract Watermark getWatermark();

28

29

/**

30

* Convert watermark strategy to properties for descriptor usage

31

* @return Properties map for table descriptor configuration

32

*/

33

public abstract Map<String, String> toProperties();

34

}

35

```

### Bounded Out-of-Order Timestamps

Watermark strategy for events that may arrive out of order by at most a fixed delay. The watermark trails the maximum observed timestamp by that delay.

```java { .api }

42

/**

43

* Watermark strategy for rowtime attributes which are out-of-order by a bounded time interval

44

* Emits watermarks which are the maximum observed timestamp minus the specified delay

45

*/

46

public final class BoundedOutOfOrderTimestamps extends PeriodicWatermarkAssigner {

47

48

/**

49

* Create bounded out-of-order watermark strategy

50

* @param delay The delay by which watermarks are behind the maximum observed timestamp

51

*/

52

public BoundedOutOfOrderTimestamps(long delay);

53

54

@Override

55

public void nextTimestamp(long timestamp);

56

57

@Override

58

public Watermark getWatermark();

59

60

@Override

61

public Map<String, String> toProperties();

62

}

63

```

**Usage Examples:**

```java

68

import org.apache.flink.table.sources.wmstrategies.BoundedOutOfOrderTimestamps;

69

import org.apache.flink.streaming.api.watermark.Watermark;

70

71

// Create watermark strategy with 5-second delay for out-of-order events

72

BoundedOutOfOrderTimestamps watermarkStrategy = new BoundedOutOfOrderTimestamps(5000L);

73

74

// Process timestamps (simulating event processing)

75

watermarkStrategy.nextTimestamp(1000L);

76

watermarkStrategy.nextTimestamp(2000L);

77

watermarkStrategy.nextTimestamp(1500L); // Out-of-order event

78

79

// Get current watermark (max timestamp - delay = 2000 - 5000 = -3000, but clamped)

80

Watermark currentWatermark = watermarkStrategy.getWatermark();

81

System.out.println("Current watermark: " + currentWatermark.getTimestamp());

82

83

// Use in table descriptor

84

Map<String, String> properties = watermarkStrategy.toProperties();

85

// Properties will contain watermark type and delay configuration

86

```

### Ascending Timestamps

Watermark strategy for ascending (non-decreasing) timestamps, i.e. when events arrive in timestamp order.

```java { .api }

93

/**

94

* Watermark strategy for strictly ascending timestamps

95

* Suitable when events are guaranteed to arrive in timestamp order

96

*/

97

public final class AscendingTimestamps extends PeriodicWatermarkAssigner {

98

99

/**

100

* Create ascending timestamp watermark strategy

101

*/

102

public AscendingTimestamps();

103

104

@Override

105

public void nextTimestamp(long timestamp);

106

107

@Override

108

public Watermark getWatermark();

109

110

@Override

111

public Map<String, String> toProperties();

112

}

113

```

**Usage Examples:**

```java

118

import org.apache.flink.table.sources.wmstrategies.AscendingTimestamps;

119

120

// Create watermark strategy for ascending timestamps

121

AscendingTimestamps watermarkStrategy = new AscendingTimestamps();

122

123

// Process strictly ascending timestamps

124

watermarkStrategy.nextTimestamp(1000L);

125

watermarkStrategy.nextTimestamp(2000L);

126

watermarkStrategy.nextTimestamp(3000L);

127

128

// Watermark will be the maximum seen timestamp (3000L)

129

Watermark watermark = watermarkStrategy.getWatermark();

130

System.out.println("Watermark: " + watermark.getTimestamp()); // 3000

131

```

### Punctuated Watermark Assigner

Base class for watermark strategies that decide for each row whether to emit a watermark, for example when a special marker event is encountered.

```java { .api }

138

/**

139

* Base class for punctuated watermark assignment strategies

140

* Watermarks are emitted when special marker events are encountered

141

*/

142

public abstract class PunctuatedWatermarkAssigner {

143

144

/**

145

* Extract watermark from the current event if it contains watermark information

146

* @param timestamp Current event timestamp

147

* @return Watermark if event triggers watermark emission, null otherwise

148

*/

149

public abstract Watermark getWatermark(long timestamp);

150

151

/**

152

* Convert watermark strategy to properties for descriptor usage

153

* @return Properties map for table descriptor configuration

154

*/

155

public abstract Map<String, String> toProperties();

156

}

157

```

## Integration with Table API

### Schema-based Watermark Configuration

Configure watermarks with the `Schema` API when converting a `DataStream` into a `Table`.

```java

166

import org.apache.flink.table.api.Schema;

167

import org.apache.flink.streaming.api.datastream.DataStream;

168

import org.apache.flink.types.Row;

169

170

// Schema with watermark strategy

171

Schema schemaWithWatermarksSchema = Schema.newBuilder()

172

.column("user_id", "STRING")

173

.column("event_data", "STRING")

174

.column("event_time", "TIMESTAMP(3)")

175

.watermark("event_time", "event_time - INTERVAL '5' SECOND") // 5-second delay

176

.build();

177

178

// Apply to DataStream conversion

179

DataStream<Row> eventStream = env.fromElements(/* data */);

180

Table table = tableEnv.fromDataStream(eventStream, schemaWithWatermarksSchema);

181

182

// Watermark propagation from DataStream

183

Schema sourceWatermarkSchema = Schema.newBuilder()

184

.column("user_id", "STRING")

185

.column("event_data", "STRING")

186

.columnByMetadata("event_time", "TIMESTAMP_LTZ(3)")

187

.watermark("event_time", "SOURCE_WATERMARK()") // Propagate from DataStream

188

.build();

189

```

### SQL DDL Watermark Configuration

Configure watermarks in SQL `CREATE TABLE` statements with a `WATERMARK FOR` clause.

```sql { .api }

-- Table with a computed watermark: rows may arrive up to 10 seconds out of order
CREATE TABLE events (
  user_id STRING,
  event_data STRING,
  event_time TIMESTAMP(3),
  WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
) WITH (
  'connector' = 'datagen',
  'fields.event_time.kind' = 'random',
  'fields.event_time.max-past' = '1h'
);

-- Table with source watermark propagation.
-- The Kafka record timestamp is exposed through the 'timestamp' metadata key.
-- NOTE: SOURCE_WATERMARK() requires a connector that implements SupportsSourceWatermark;
-- check that your Kafka connector version does, otherwise use an expression such as
-- event_time - INTERVAL '5' SECOND.
CREATE TABLE kafka_events (
  user_id STRING,
  event_data STRING,
  event_time TIMESTAMP_LTZ(3) METADATA FROM 'timestamp',
  WATERMARK FOR event_time AS SOURCE_WATERMARK()
) WITH (
  'connector' = 'kafka',
  'topic' = 'events',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'json'
);

```

## Advanced Watermark Patterns

### Custom Watermark Strategy Implementation

When the built-in strategies do not fit specific business requirements, implement a custom watermark strategy by extending `PeriodicWatermarkAssigner`.

```java

227

public class BusinessHoursWatermarkStrategy extends PeriodicWatermarkAssigner {

228

private long maxTimestamp = Long.MIN_VALUE;

229

private final long businessHourDelay = 30000L; // 30 seconds during business hours

230

private final long offHourDelay = 300000L; // 5 minutes during off hours

231

232

@Override

233

public void nextTimestamp(long timestamp) {

234

if (timestamp > maxTimestamp) {

235

maxTimestamp = timestamp;

236

}

237

}

238

239

@Override

240

public Watermark getWatermark() {

241

if (maxTimestamp == Long.MIN_VALUE) {

242

return new Watermark(Long.MIN_VALUE);

243

}

244

245

// Determine if current time is during business hours (simplified)

246

long currentHour = (System.currentTimeMillis() / (1000 * 60 * 60)) % 24;

247

boolean isBusinessHours = currentHour >= 9 && currentHour <= 17;

248

249

long delay = isBusinessHours ? businessHourDelay : offHourDelay;

250

return new Watermark(maxTimestamp - delay);

251

}

252

253

@Override

254

public Map<String, String> toProperties() {

255

Map<String, String> properties = new HashMap<>();

256

properties.put("watermark.strategy", "business-hours");

257

return properties;

258

}

259

}

260

```

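### Windowing with Watermarks

Use watermark strategies with windowing operations. Windowed aggregations over an event-time attribute rely on the watermark to decide when a window is complete and its result can be emitted.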

```java

267

// Create table with watermark strategy

268

tableEnv.executeSql(

269

"CREATE TABLE sensor_readings (" +

270

" sensor_id STRING," +

271

" temperature DOUBLE," +

272

" reading_time TIMESTAMP(3)," +

273

" WATERMARK FOR reading_time AS reading_time - INTERVAL '30' SECOND" +

274

") WITH (" +

275

" 'connector' = 'datagen'," +

276

" 'fields.temperature.min' = '15.0'," +

277

" 'fields.temperature.max' = '35.0'" +

278

")"

279

);

280

281

// Windowed aggregation with watermarks

282

Table windowedAggregates = tableEnv.sqlQuery(

283

"SELECT " +

284

" sensor_id, " +

285

" window_start, " +

286

" window_end, " +

287

" AVG(temperature) as avg_temp, " +

288

" MAX(temperature) as max_temp " +

289

"FROM TABLE(" +

290

" TUMBLE(TABLE sensor_readings, DESCRIPTOR(reading_time), INTERVAL '1' MINUTE)" +

291

") " +

292

"GROUP BY sensor_id, window_start, window_end"

293

);

294

```

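### Late Data Handling

A row is late if its timestamp is already behind the current watermark. In Flink SQL, the delay in the `WATERMARK` expression controls how much out-of-order data is tolerated. A larger delay accepts more out-of-order rows, at the cost of higher result latency.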

```sql

-- A row is late if its event_time is behind the watermark, which trails the largest
-- event_time seen so far by 10 seconds. The Kafka connector has no
-- 'scan.watermark.allowed-lateness' option; to tolerate more out-of-order data,
-- increase the delay in the WATERMARK expression instead.
CREATE TABLE late_events (
  event_id STRING,
  event_time TIMESTAMP(3),
  event_data STRING,
  WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'events',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'json'
);

```

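## Watermark Monitoring

### Watermark Debugging

Monitor watermark progress in streaming applications. Flink reports the `currentInputWatermark` and `currentOutputWatermark` metrics for each operator, which can be exported through a metric reporter.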

```java

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

// Flink already reports watermark metrics for every operator (currentInputWatermark,
// currentOutputWatermark). Register a metric reporter to export them, e.g. via JMX.
Configuration config = new Configuration();
config.setString("metrics.reporter.jmx.factory.class",
        "org.apache.flink.metrics.jmx.JMXReporterFactory");

StreamExecutionEnvironment env = StreamExecutionEnvironment
        .getExecutionEnvironment(config);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// Any table with an event-time attribute, e.g. sensor_readings from the windowing example
Table table = tableEnv.from("sensor_readings");

// To inspect watermarks in user code: RuntimeContext has no watermark accessor,
// but a ProcessFunction can read the current watermark from its timer service.
DataStream<Row> monitoredStream = tableEnv.toChangelogStream(table)
        .process(new ProcessFunction<Row, Row>() {
            private transient long lastLogTime;

            @Override
            public void processElement(Row row, Context ctx, Collector<Row> out) {
                long now = System.currentTimeMillis();
                // Log at most once every 10 seconds; an exact-millisecond check would almost never fire
                if (now - lastLogTime >= 10_000L) {
                    lastLogTime = now;
                    System.out.println(
                            "Current watermark: " + ctx.timerService().currentWatermark());
                }
                out.collect(row);
            }
        });

```

## Types

### Core Watermark Types

```java { .api }

364

import org.apache.flink.streaming.api.watermark.Watermark;

365

import org.apache.flink.table.sources.wmstrategies.PeriodicWatermarkAssigner;

366

import org.apache.flink.table.sources.wmstrategies.PunctuatedWatermarkAssigner;

367

import org.apache.flink.table.sources.wmstrategies.BoundedOutOfOrderTimestamps;

368

import org.apache.flink.table.sources.wmstrategies.AscendingTimestamps;

369

```

### Legacy Descriptor Types

```java { .api }

374

import org.apache.flink.table.legacy.descriptors.Rowtime;

375

import java.util.Map;

376

import java.util.HashMap;

377

```

### Schema Integration Types

```java { .api }

382

import org.apache.flink.table.api.Schema;

383

import org.apache.flink.table.expressions.Expression;

384

```

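## Migration from Legacy APIs

### Descriptor to Schema Migration

Migrate from the deprecated descriptor-based watermark configuration to the `Schema` API. A bounded watermark delay of `5000L` milliseconds corresponds to the expression `event_time - INTERVAL '5' SECOND`.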

```java

393

// Legacy approach (deprecated)

394

Rowtime rowtimeDescriptor = new Rowtime()

395

.timestampsFromField("event_time")

396

.watermarksPeriodicBounded(5000L);

397

398

// Modern approach

399

Schema modernSchema = Schema.newBuilder()

400

.column("event_id", "STRING")

401

.column("event_time", "TIMESTAMP(3)")

402

.column("event_data", "STRING")

403

.watermark("event_time", "event_time - INTERVAL '5' SECOND")

404

.build();

405

406

// Apply to DataStream

407

Table modernTable = tableEnv.fromDataStream(dataStream, modernSchema);

408

```