# Watermark Strategies

Watermark strategies in the Flink Table API Java Bridge enable event-time processing by defining how to handle out-of-order events and when to trigger time-based operations. These strategies are essential for windowed operations and temporal joins in streaming applications.

## Overview

Watermarks are timestamps that flow as part of the data stream and indicate the progress of event time. A watermark with timestamp `t` declares that no more events with a timestamp at or below `t` are expected, so once the watermark passes the end of a window, Flink can emit that window's result. Events that still arrive with a timestamp at or below the current watermark are late.
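
A minimal plain-Java sketch of these rules, with no Flink dependency: it replays timestamps in arrival order, keeps the watermark at the largest timestamp seen so far minus a fixed delay (the rule `BoundedOutOfOrderTimestamps` applies), and reports every event whose timestamp is at or below the watermark at the moment it arrives. The class name is illustrative, and recomputing the watermark after every event is a simplification; Flink's periodic strategies emit it on a timer.

```java
import java.util.ArrayList;
import java.util.List;

public class LatenessSketch {

    /** Returns the timestamps that arrive at or below the watermark current at their arrival. */
    public static List<Long> lateEvents(long[] timestampsInArrivalOrder, long delayMs) {
        List<Long> late = new ArrayList<>();
        long maxSeen = Long.MIN_VALUE;
        long watermark = Long.MIN_VALUE;
        for (long ts : timestampsInArrivalOrder) {
            if (ts <= watermark) {
                late.add(ts); // Watermark(t) promised no more events with timestamp <= t
            }
            maxSeen = Math.max(maxSeen, ts);
            watermark = maxSeen - delayMs; // maxSeen is a real timestamp here, so no overflow
        }
        return late;
    }

    public static void main(String[] args) {
        long[] arrivals = {10_000L, 12_000L, 9_000L, 16_000L, 8_000L};
        System.out.println(lateEvents(arrivals, 5_000L)); // [8000]
    }
}
```

With a 9-second delay the same input produces no late events, at the cost of every event-time result being emitted 4 seconds later.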

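The bridge provides several watermark assignment strategies (package `org.apache.flink.table.sources.wmstrategies`) for legacy table sources. A strategy is attached to a rowtime attribute through a `RowtimeAttributeDescriptor`, together with a `TimestampExtractor` that reads the timestamp from each row; the strategy itself only turns those timestamps into watermarks.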

## Base Classes

### PeriodicWatermarkAssigner

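Abstract base class for strategies that generate watermarks periodically. The runtime passes each row's extracted rowtime to `nextTimestamp` and calls `getWatermark` at the configured auto-watermark interval: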

```java { .api }
@PublicEvolving
public abstract class PeriodicWatermarkAssigner extends WatermarkStrategy {

    public abstract void nextTimestamp(long timestamp);

    public abstract Watermark getWatermark();

    // Inherited from WatermarkStrategy
    public abstract Map<String, String> toProperties();

    public abstract boolean equals(Object obj);

    public abstract int hashCode();
}
```

**Usage Pattern:**

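```java
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.table.sources.wmstrategies.PeriodicWatermarkAssigner;

import java.util.Collections;
import java.util.Map;

public class MyPeriodicWatermarkAssigner extends PeriodicWatermarkAssigner {

    private final long maxOutOfOrderness;
    private long maxTimestamp;

    public MyPeriodicWatermarkAssigner(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
        // Start at MIN_VALUE + bound so getWatermark() cannot overflow before the first row
        this.maxTimestamp = Long.MIN_VALUE + maxOutOfOrderness;
    }

    // Called for every row with its rowtime. Extracting that timestamp from the row is not
    // this class's job: it is configured by the TimestampExtractor (e.g. ExistingField) of
    // the table source's RowtimeAttributeDescriptor.
    @Override
    public void nextTimestamp(long timestamp) {
        maxTimestamp = Math.max(maxTimestamp, timestamp);
    }

    // Called periodically, at the configured auto-watermark interval
    @Override
    public Watermark getWatermark() {
        return new Watermark(maxTimestamp - maxOutOfOrderness);
    }

    @Override
    public Map<String, String> toProperties() {
        // Placeholder (assumption): only needed if the strategy is serialized into descriptor properties
        return Collections.emptyMap();
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof MyPeriodicWatermarkAssigner
                && ((MyPeriodicWatermarkAssigner) o).maxOutOfOrderness == maxOutOfOrderness;
    }

    @Override
    public int hashCode() {
        return Long.hashCode(maxOutOfOrderness);
    }
}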
```
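
The calling contract can be modeled without Flink: one `nextTimestamp` call per row, a periodic `getWatermark` poll, and only strictly increasing watermarks forwarded downstream. In this sketch `PeriodicAssigner` is a hypothetical stand-in interface with the same two methods (watermarks are plain `long`s), and the timer is approximated by polling after every `pollEvery` rows.

```java
import java.util.ArrayList;
import java.util.List;

public class PeriodicDriverSketch {

    /** Hypothetical stand-in for the two abstract methods of PeriodicWatermarkAssigner. */
    public interface PeriodicAssigner {
        void nextTimestamp(long timestamp);
        long getWatermark();
    }

    /** Bounded-delay assigner; starts at MIN_VALUE + delay so the subtraction cannot overflow. */
    public static PeriodicAssigner bounded(long delay) {
        return new PeriodicAssigner() {
            private long max = Long.MIN_VALUE + delay;
            public void nextTimestamp(long ts) { max = Math.max(max, ts); }
            public long getWatermark() { return max - delay; }
        };
    }

    /** Feeds rows to the assigner and polls it after every pollEvery rows (stand-in for the timer). */
    public static List<Long> run(PeriodicAssigner assigner, long[] rowtimes, int pollEvery) {
        List<Long> emitted = new ArrayList<>();
        long current = Long.MIN_VALUE;
        for (int i = 0; i < rowtimes.length; i++) {
            assigner.nextTimestamp(rowtimes[i]);
            if ((i + 1) % pollEvery == 0) {
                long wm = assigner.getWatermark();
                if (wm > current) { // only strictly increasing watermarks are forwarded
                    current = wm;
                    emitted.add(wm);
                }
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        long[] rowtimes = {1_000L, 3_000L, 2_000L, 2_500L, 7_000L, 6_000L};
        System.out.println(run(bounded(1_000L), rowtimes, 2)); // [2000, 6000]
    }
}
```

The second poll returns 2000 again and is not forwarded.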

### PunctuatedWatermarkAssigner

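Abstract base class for strategies that decide per row whether to emit a watermark, for example when a special marker event arrives. `getWatermark` receives the row and its extracted rowtime and returns either a watermark or `null` (no watermark):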

```java { .api }
@PublicEvolving
public abstract class PunctuatedWatermarkAssigner extends WatermarkStrategy {

    public abstract Watermark getWatermark(Row row, long timestamp);

    // Inherited from WatermarkStrategy
    public abstract Map<String, String> toProperties();

    public abstract boolean equals(Object obj);

    public abstract int hashCode();
}
```

**Usage Pattern:**

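```java
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.table.sources.wmstrategies.PunctuatedWatermarkAssigner;
import org.apache.flink.types.Row;
import java.util.Collections;
import java.util.Map;

public class MyPunctuatedWatermarkAssigner extends PunctuatedWatermarkAssigner {

    // Called for every row; `timestamp` is the rowtime already extracted by the
    // TimestampExtractor of the table source's RowtimeAttributeDescriptor
    @Override
    public Watermark getWatermark(Row row, long timestamp) {
        // Generate a watermark only for special marker events (event type in field 1)
        String eventType = (String) row.getField(1);
        if ("WATERMARK_EVENT".equals(eventType)) {
            return new Watermark(timestamp);
        }
        return null; // No watermark for regular events
    }

    @Override
    public Map<String, String> toProperties() {
        // Placeholder (assumption): only needed if the strategy is serialized into descriptor properties
        return Collections.emptyMap();
    }

    @Override
    public boolean equals(Object o) { return o instanceof MyPunctuatedWatermarkAssigner; }

    @Override
    public int hashCode() { return MyPunctuatedWatermarkAssigner.class.hashCode(); }
}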
```
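
The per-row contract, including `null` as "no watermark", can be replayed in plain Java. Here `PunctuatedAssigner` is a hypothetical stand-in for `getWatermark(Row, long)` that receives only the event type, `MARKER_ONLY` mirrors `MyPunctuatedWatermarkAssigner`, and a marker whose timestamp does not advance past the last emitted watermark is dropped.

```java
import java.util.ArrayList;
import java.util.List;

public class PunctuatedDriverSketch {

    /** Hypothetical stand-in for PunctuatedWatermarkAssigner#getWatermark(Row, long). */
    public interface PunctuatedAssigner {
        Long getWatermark(String eventType, long timestamp); // null = no watermark
    }

    /** Emits a watermark only for marker events, like MyPunctuatedWatermarkAssigner. */
    public static final PunctuatedAssigner MARKER_ONLY =
            (eventType, timestamp) -> "WATERMARK_EVENT".equals(eventType) ? timestamp : null;

    public static List<Long> run(PunctuatedAssigner assigner, String[] types, long[] timestamps) {
        List<Long> emitted = new ArrayList<>();
        long current = Long.MIN_VALUE;
        for (int i = 0; i < types.length; i++) {
            Long wm = assigner.getWatermark(types[i], timestamps[i]);
            if (wm != null && wm > current) { // skip null and non-advancing watermarks
                current = wm;
                emitted.add(wm);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        String[] types = {"CLICK", "WATERMARK_EVENT", "CLICK", "WATERMARK_EVENT", "WATERMARK_EVENT"};
        long[] ts = {100L, 200L, 250L, 400L, 300L};
        System.out.println(run(MARKER_ONLY, types, ts)); // [200, 400]
    }
}
```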

## Built-in Strategies

### AscendingTimestamps

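Watermark strategy for rowtime attributes whose timestamps are ascending (non-decreasing). It emits the maximum timestamp observed so far minus 1 ms, so rows that repeat the current maximum timestamp are not late: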

```java { .api }
@PublicEvolving
public class AscendingTimestamps extends PeriodicWatermarkAssigner {
    public void nextTimestamp(long timestamp);
    public Watermark getWatermark();
}
```

**Usage Example:**

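```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.sources.DefinedRowtimeAttributes;
import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.table.sources.tsextractors.ExistingField;
import org.apache.flink.table.sources.wmstrategies.AscendingTimestamps;
import org.apache.flink.types.Row;

import java.util.Collections;
import java.util.List;

// For streams where timestamps are guaranteed to be ascending. The strategy is attached to
// the rowtime attribute of a legacy table source; it is not a DataStream WatermarkStrategy,
// so it cannot be passed to assignTimestampsAndWatermarks().
public class MyTableSource implements StreamTableSource<Row>, DefinedRowtimeAttributes {

    @Override
    public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
        return execEnv.addSource(new MySourceFunction()); // emits rows with an event_time field
    }

    @Override
    public List<RowtimeAttributeDescriptor> getRowtimeAttributeDescriptors() {
        return Collections.singletonList(new RowtimeAttributeDescriptor(
                "event_time",                    // rowtime attribute in the table schema
                new ExistingField("event_time"), // read the timestamp from this field
                new AscendingTimestamps()));
    }

    // getTableSchema() and getProducedDataType() omitted for brevity
}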
```
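
`AscendingTimestamps` emits the largest timestamp seen so far minus 1 ms. The `- 1` keeps rows that repeat the current maximum timestamp on time, because a watermark `t` only declares timestamps `<= t` complete. A plain-Java sketch of that rule (illustrative class, modeled on Flink's legacy `AscendingTimestamps`; the exact arithmetic is worth checking against your Flink version):

```java
public class AscendingSketch {

    private long maxTimestamp = Long.MIN_VALUE + 1; // +1 so that maxTimestamp - 1 cannot overflow

    public void nextTimestamp(long timestamp) {
        maxTimestamp = Math.max(maxTimestamp, timestamp);
    }

    public long getWatermark() {
        return maxTimestamp - 1;
    }

    /** A row is late if its timestamp is at or below the current watermark. */
    public boolean isLate(long timestamp) {
        return timestamp <= getWatermark();
    }

    public static void main(String[] args) {
        AscendingSketch assigner = new AscendingSketch();
        assigner.nextTimestamp(5_000L);
        System.out.println(assigner.getWatermark()); // 4999
        System.out.println(assigner.isLate(5_000L)); // false: same timestamp as the maximum
        System.out.println(assigner.isLate(4_000L)); // true: timestamps went backwards
    }
}
```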

### BoundedOutOfOrderTimestamps

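Watermark strategy for rowtime attributes whose rows are out of order by at most a fixed delay. The constructor argument is that delay in milliseconds; the strategy emits the maximum timestamp observed so far minus the delay: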

```java { .api }
@PublicEvolving
public class BoundedOutOfOrderTimestamps extends PeriodicWatermarkAssigner {
    public BoundedOutOfOrderTimestamps(long maxOutOfOrderness);
    public void nextTimestamp(long timestamp);
    public Watermark getWatermark();
}
```

**Usage Example:**

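```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.sources.DefinedRowtimeAttributes;
import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.table.sources.tsextractors.ExistingField;
import org.apache.flink.table.sources.wmstrategies.BoundedOutOfOrderTimestamps;
import org.apache.flink.types.Row;

import java.util.Collections;
import java.util.List;

// For streams where events can arrive up to 5 seconds out of order. As with
// AscendingTimestamps, the strategy belongs to the rowtime attribute descriptor.
public class EventTableSource implements StreamTableSource<Row>, DefinedRowtimeAttributes {

    @Override
    public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
        return execEnv.addSource(new EventSourceFunction());
    }

    @Override
    public List<RowtimeAttributeDescriptor> getRowtimeAttributeDescriptors() {
        return Collections.singletonList(new RowtimeAttributeDescriptor(
                "event_time",
                new ExistingField("event_time"),          // event timestamp field
                new BoundedOutOfOrderTimestamps(5000L))); // 5 seconds max out-of-order, in ms
    }

    // getTableSchema() and getProducedDataType() omitted for brevity
}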
```
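
Flink has two bounded-out-of-orderness generators whose watermarks differ by one millisecond: this legacy `BoundedOutOfOrderTimestamps(delay)` emits `max - delay`, while the DataStream API's `WatermarkStrategy.forBoundedOutOfOrderness(Duration)` emits `max - delay - 1`. Both formulas are taken from the Flink sources of recent releases and are worth verifying for your version. The plain-Java sketch below (illustrative names) computes both for the same input:

```java
public class BoundedSketch {

    /** Watermark rule of the legacy table-source BoundedOutOfOrderTimestamps. */
    public static long legacyWatermark(long maxTimestamp, long delayMs) {
        return maxTimestamp - delayMs;
    }

    /** Watermark rule of the DataStream API's WatermarkStrategy.forBoundedOutOfOrderness. */
    public static long dataStreamWatermark(long maxTimestamp, long delayMs) {
        return maxTimestamp - delayMs - 1;
    }

    public static void main(String[] args) {
        long[] rowtimes = {10_000L, 14_000L, 12_000L};
        long max = Long.MIN_VALUE;
        for (long ts : rowtimes) {
            max = Math.max(max, ts);
        }
        System.out.println(legacyWatermark(max, 5_000L));     // 9000
        System.out.println(dataStreamWatermark(max, 5_000L)); // 8999
    }
}
```

A row stamped exactly 9000 is therefore late under the legacy rule and on time under the DataStream rule.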

## Modern Watermark Integration

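With modern table sources, watermarks are typically declared in the table schema: `watermark(column, expression)` makes `column` the event-time attribute and derives the watermark from a SQL expression over it: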

```java
// Define watermark in table schema
Schema schema = Schema.newBuilder()
    .column("user_id", DataTypes.STRING())
    .column("event_type", DataTypes.STRING())
    .column("amount", DataTypes.DOUBLE()) // used by the window queries below
    .column("event_time", DataTypes.TIMESTAMP_LTZ(3))
    .column("processing_time", DataTypes.TIMESTAMP_LTZ(3))
    // event_time becomes the rowtime attribute; events up to 5 seconds out of order are tolerated
    .watermark("event_time", "event_time - INTERVAL '5' SECOND")
    .build();

// Create table with watermark ("my-connector" stands in for a real connector identifier)
tableEnv.createTable("events_with_watermark",
    TableDescriptor.forConnector("my-connector")
        .schema(schema)
        .build());
```

## Legacy Integration Patterns

### With StreamTableSource

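```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.sources.DefinedRowtimeAttributes;
import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.table.sources.tsextractors.ExistingField;
import org.apache.flink.table.sources.wmstrategies.BoundedOutOfOrderTimestamps;
import org.apache.flink.types.Row;

import java.util.Collections;
import java.util.List;

public class WatermarkedStreamTableSource
        implements StreamTableSource<Row>, DefinedRowtimeAttributes {

    private final long maxOutOfOrderness;

    public WatermarkedStreamTableSource(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
    }

    @Override
    public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
        // Emit plain rows; the planner assigns timestamps and watermarks from the descriptor below
        return execEnv.addSource(new MySourceFunction());
    }

    @Override
    public List<RowtimeAttributeDescriptor> getRowtimeAttributeDescriptors() {
        return Collections.singletonList(new RowtimeAttributeDescriptor(
                "event_time",                    // rowtime attribute
                new ExistingField("event_time"), // extract event time from this field
                new BoundedOutOfOrderTimestamps(maxOutOfOrderness)));
    }

    @Override
    public TableSchema getTableSchema() {
        return TableSchema.builder()
                .field("id", DataTypes.BIGINT())
                .field("data", DataTypes.STRING())
                .field("event_time", DataTypes.TIMESTAMP(3))
                .build();
    }

    // A real source also declares the type of the emitted rows (getProducedDataType()); omitted here
}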
```

### With DataStream Conversion

```java
// Convert DataStream with watermarks to Table
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// Create DataStream with watermarks. The legacy BoundedOutOfOrderTimestamps is not a DataStream
// strategy; this uses org.apache.flink.api.common.eventtime.WatermarkStrategy instead.
// Event and EventSource are your own types; getEventTime() returns epoch milliseconds.
DataStream<Event> eventStream = env
    .addSource(new EventSource())
    .assignTimestampsAndWatermarks(
        WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
            .withTimestampAssigner((event, recordTimestamp) -> event.getEventTime()));

// Convert to Table while preserving watermarks: physical columns are derived from Event,
// and the stream record's timestamp is exposed as the "rowtime" metadata column
Schema schema = Schema.newBuilder()
    .columnByMetadata("rowtime", "TIMESTAMP_LTZ(3)")
    .watermark("rowtime", "SOURCE_WATERMARK()") // Preserve existing watermarks
    .build();

Table eventTable = tableEnv.fromDataStream(eventStream, schema);
```

## Windowed Operations with Watermarks

### Tumbling Windows

```java
// Use watermarks with tumbling windows
Table windowedResult = tableEnv.sqlQuery("""
    SELECT
        user_id,
        COUNT(*) as event_count,
        SUM(amount) as total_amount,
        TUMBLE_START(event_time, INTERVAL '1' MINUTE) as window_start,
        TUMBLE_END(event_time, INTERVAL '1' MINUTE) as window_end
    FROM events_with_watermark
    GROUP BY
        user_id,
        TUMBLE(event_time, INTERVAL '1' MINUTE)
    """);
```
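
Two rules sit behind `TUMBLE`: a row belongs to the single window `[start, start + size)` whose `start` is its timestamp rounded down to a multiple of the size, and an event-time window can fire once the watermark reaches `end - 1`, the window's last millisecond. A plain-Java sketch of both, assuming a zero window offset and non-negative timestamps (names illustrative):

```java
public class TumblingSketch {

    /** Start of the tumbling window that contains the timestamp (offset 0, timestamp >= 0). */
    public static long windowStart(long timestamp, long sizeMs) {
        return timestamp - (timestamp % sizeMs);
    }

    /** An event-time window [start, end) can fire once the watermark reaches end - 1. */
    public static boolean canFire(long windowEnd, long watermark) {
        return watermark >= windowEnd - 1;
    }

    public static void main(String[] args) {
        long size = 60_000L; // INTERVAL '1' MINUTE
        long start = windowStart(125_000L, size);
        long end = start + size;
        System.out.println(start + " " + end);      // 120000 180000
        System.out.println(canFire(end, 179_998L)); // false
        System.out.println(canFire(end, 179_999L)); // true
    }
}
```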

### Sliding Windows

```java
// Sliding windows with watermarks: a 2-minute window that starts every 30 seconds.
// `value` is a reserved keyword in Flink SQL, so the aggregated column is `amount`.
Table slidingResult = tableEnv.sqlQuery("""
    SELECT
        user_id,
        AVG(amount) as avg_amount,
        HOP_START(event_time, INTERVAL '30' SECOND, INTERVAL '2' MINUTE) as window_start,
        HOP_END(event_time, INTERVAL '30' SECOND, INTERVAL '2' MINUTE) as window_end
    FROM events_with_watermark
    GROUP BY
        user_id,
        HOP(event_time, INTERVAL '30' SECOND, INTERVAL '2' MINUTE)
    """);
```
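
In `HOP`, the first interval is the slide (how often a window starts) and the second the size (how long it lasts), so each row belongs to `size / slide` overlapping windows: four for a 30-second slide and 2-minute size. The plain-Java sketch below enumerates them with a loop modeled on Flink's `SlidingEventTimeWindows` assigner, assuming a zero offset and non-negative timestamps (names illustrative):

```java
import java.util.ArrayList;
import java.util.List;

public class SlidingSketch {

    /** Start times of all sliding windows [start, start + size) that contain the timestamp. */
    public static List<Long> windowStarts(long timestamp, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        long lastStart = timestamp - (timestamp % slideMs); // latest window starting at or before ts
        for (long start = lastStart; start > timestamp - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // HOP(event_time, INTERVAL '30' SECOND, INTERVAL '2' MINUTE)
        System.out.println(windowStarts(100_000L, 120_000L, 30_000L)); // [90000, 60000, 30000, 0]
    }
}
```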

### Session Windows

```java
// Session windows with watermarks: a session closes after 10 minutes without events
Table sessionResult = tableEnv.sqlQuery("""
    SELECT
        user_id,
        COUNT(*) as session_events,
        MIN(event_time) as session_start,
        MAX(event_time) as session_end,
        SESSION_START(event_time, INTERVAL '10' MINUTE) as session_window_start,
        SESSION_END(event_time, INTERVAL '10' MINUTE) as session_window_end
    FROM events_with_watermark
    GROUP BY
        user_id,
        SESSION(event_time, INTERVAL '10' MINUTE)
    """);
```
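
A session window has no fixed length: it keeps extending while rows arrive within the gap of each other and ends at the last row's timestamp plus the gap (10 minutes above). The plain-Java sketch below groups already-sorted timestamps that way. It simplifies Flink, which builds sessions by merging per-row windows as rows arrive in any order; treating a gap of exactly the configured length as joining the session is an assumption, and the example data avoids that boundary. Names are illustrative.

```java
import java.util.ArrayList;
import java.util.List;

public class SessionSketch {

    /** Groups ascending timestamps into sessions; returns {start, end} pairs, end = last + gap. */
    public static List<long[]> sessions(long[] sortedTimestamps, long gapMs) {
        List<long[]> result = new ArrayList<>();
        if (sortedTimestamps.length == 0) {
            return result;
        }
        long start = sortedTimestamps[0];
        long last = start;
        for (int i = 1; i < sortedTimestamps.length; i++) {
            long ts = sortedTimestamps[i];
            if (ts - last > gapMs) { // gap exceeded: close the current session
                result.add(new long[] {start, last + gapMs});
                start = ts;
            }
            last = ts;
        }
        result.add(new long[] {start, last + gapMs});
        return result;
    }

    public static void main(String[] args) {
        long gap = 600_000L; // INTERVAL '10' MINUTE
        long[] ts = {0L, 300_000L, 800_000L, 2_000_000L};
        for (long[] s : sessions(ts, gap)) {
            System.out.println(s[0] + " - " + s[1]);
        }
        // 0 - 1400000
        // 2000000 - 2600000
    }
}
```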

## Custom Watermark Strategies

### Business Logic Based Watermarks

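```java
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.table.sources.wmstrategies.PeriodicWatermarkAssigner;

import java.util.Calendar;
import java.util.Collections;
import java.util.Map;

public class BusinessLogicWatermarkAssigner extends PeriodicWatermarkAssigner {

    private final long gracePeriod;
    private long maxTimestamp = Long.MIN_VALUE;

    public BusinessLogicWatermarkAssigner(long gracePeriodMs) {
        this.gracePeriod = gracePeriodMs;
    }

    // Receives each row's rowtime; extraction is configured by the source's TimestampExtractor
    @Override
    public void nextTimestamp(long timestamp) {
        maxTimestamp = Math.max(maxTimestamp, timestamp);
    }

    @Override
    public Watermark getWatermark() {
        if (maxTimestamp == Long.MIN_VALUE) {
            return new Watermark(Long.MIN_VALUE); // no rows yet; avoids overflow below
        }
        // Business rule: extend the grace period by 30% on weekends (wall-clock day, default time zone)
        int dayOfWeek = Calendar.getInstance().get(Calendar.DAY_OF_WEEK);
        long adjustedGracePeriod = gracePeriod;
        if (dayOfWeek == Calendar.SATURDAY || dayOfWeek == Calendar.SUNDAY) {
            adjustedGracePeriod = (long) (gracePeriod * 1.3);
        }
        // When the weekend starts this value can drop below an already emitted watermark;
        // Flink only forwards advancing watermarks, so event time stalls until it catches up
        return new Watermark(maxTimestamp - adjustedGracePeriod);
    }

    @Override
    public Map<String, String> toProperties() {
        return Collections.emptyMap(); // placeholder (assumption): not serialized into descriptor properties
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof BusinessLogicWatermarkAssigner
                && ((BusinessLogicWatermarkAssigner) o).gracePeriod == gracePeriod;
    }

    @Override
    public int hashCode() {
        return Long.hashCode(gracePeriod);
    }
}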
```

### Multi-Source Watermark Coordination

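```java
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.table.sources.wmstrategies.PunctuatedWatermarkAssigner;
import org.apache.flink.types.Row;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Tracks the highest rowtime per logical source (source ID in field 0) and emits the minimum
// across sources minus a lag, so the slowest source holds event time back. It extends
// PunctuatedWatermarkAssigner because only that variant sees the row; a periodic assigner
// receives nothing but the timestamp in nextTimestamp(long).
public class CoordinatedWatermarkAssigner extends PunctuatedWatermarkAssigner {

    private final Map<String, Long> maxTimestampPerSource = new HashMap<>();
    private final long defaultLag;
    private long lastEmitted = Long.MIN_VALUE;

    public CoordinatedWatermarkAssigner(long defaultLagMs) {
        this.defaultLag = defaultLagMs;
    }

    @Override
    public Watermark getWatermark(Row row, long timestamp) {
        String sourceId = (String) row.getField(0);
        maxTimestampPerSource.merge(sourceId, timestamp, Math::max);

        // Minimum over all sources seen so far; a source that stops sending holds this back.
        // The map lives only in this instance and grows with the number of source IDs.
        long candidate = Collections.min(maxTimestampPerSource.values()) - defaultLag;
        if (candidate <= lastEmitted) {
            return null; // no progress, no watermark
        }
        lastEmitted = candidate;
        return new Watermark(candidate);
    }

    @Override
    public Map<String, String> toProperties() {
        return Collections.emptyMap(); // placeholder (assumption): not serialized into descriptor properties
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof CoordinatedWatermarkAssigner
                && ((CoordinatedWatermarkAssigner) o).defaultLag == defaultLag;
    }

    @Override
    public int hashCode() {
        return Long.hashCode(defaultLag);
    }
}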
```

## Best Practices

### Watermark Configuration

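1. **Choose appropriate lag**: A larger out-of-orderness bound makes fewer events late but delays every event-time result by the same amount.
2. **Monitor late events**: Track events whose timestamp is at or below the current watermark, for example through a side output as in the example below.
3. **Consider business requirements**: Different domains tolerate different amounts of lateness, so choose the bound and the allowed lateness per use case.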

```java
// Monitor late events. eventsTable needs a rowtime attribute: toDataStream() then carries
// the rowtime as the record timestamp and forwards the watermarks.
DataStream<Row> eventStream = tableEnv.toDataStream(eventsTable);
OutputTag<Row> lateEventsTag = new OutputTag<Row>("late-events") {};

SingleOutputStreamOperator<Row> processedStream = eventStream
    .keyBy(row -> row.getField(0))                         // key by the first column
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .allowedLateness(Time.minutes(1))                      // allow 1 minute lateness
    .sideOutputLateData(lateEventsTag)                     // rows later than that go here
    .apply(new MyWindowFunction());                        // MyWindowFunction: your WindowFunction

// Handle late events separately
DataStream<Row> lateEvents = processedStream.getSideOutput(lateEventsTag);
lateEvents.addSink(new LateEventsSink());                  // LateEventsSink: your sink
```
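
The window configuration above gives each row one of three outcomes, decided by the current watermark. The plain-Java sketch below models that decision for a tumbling window, following the checks in Flink's `WindowOperator` with the default event-time trigger (worth verifying against your Flink version): a row is on time before the watermark reaches the window's last millisecond, triggers a late firing while the watermark is within the allowed lateness after that, and goes to the side output once the watermark has passed both. The enum and method names are illustrative, and a zero offset with non-negative timestamps is assumed.

```java
public class LatenessDecisionSketch {

    public enum Fate { ON_TIME, LATE_FIRING, SIDE_OUTPUT }

    /** Decides what happens to a row in a tumbling event-time window, given the current watermark. */
    public static Fate classify(long timestamp, long sizeMs, long allowedLatenessMs, long watermark) {
        long windowStart = timestamp - (timestamp % sizeMs); // offset 0, timestamp >= 0
        long windowMaxTimestamp = windowStart + sizeMs - 1;  // last millisecond of the window
        if (windowMaxTimestamp + allowedLatenessMs <= watermark) {
            return Fate.SIDE_OUTPUT; // window state already cleaned up
        }
        if (windowMaxTimestamp <= watermark) {
            return Fate.LATE_FIRING; // window already fired; it fires again with this row
        }
        return Fate.ON_TIME;
    }

    public static void main(String[] args) {
        long size = 300_000L;    // Time.minutes(5)
        long lateness = 60_000L; // Time.minutes(1)
        System.out.println(classify(100_000L, size, lateness, 200_000L)); // ON_TIME
        System.out.println(classify(100_000L, size, lateness, 320_000L)); // LATE_FIRING
        System.out.println(classify(100_000L, size, lateness, 360_000L)); // SIDE_OUTPUT
    }
}
```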

### Performance Optimization

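1. **Periodic interval**: Periodic assigners are polled every auto-watermark interval (`pipeline.auto-watermark-interval`, 200 ms by default); a shorter interval lowers result latency at the cost of more watermark traffic.
2. **Timestamp extraction**: Extraction runs once per record, so keep it cheap and avoid parsing strings or allocating objects on this path.
3. **Memory usage**: Keep state in custom watermark assigners small; per-key or per-source maps grow with the number of keys or sources they track.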

```java
// Configure watermark interval
env.getConfig().setAutoWatermarkInterval(1000L); // 1 second

// Efficient timestamp extraction. Extraction belongs to the DataStream API's WatermarkStrategy
// (org.apache.flink.api.common.eventtime); the legacy BoundedOutOfOrderTimestamps only
// receives timestamps that have already been extracted.
WatermarkStrategy<Row> optimizedStrategy = WatermarkStrategy
    .<Row>forBoundedOutOfOrderness(Duration.ofSeconds(5))
    .withTimestampAssigner((row, recordTimestamp) ->
        // Use the record's existing timestamp when available to avoid field access
        recordTimestamp != TimestampAssigner.NO_TIMESTAMP
            ? recordTimestamp
            : (Long) row.getField(2));
```

### Error Handling

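```java
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Timestamp;
import java.time.Instant;

// Timestamp assigner for a DataStream WatermarkStrategy, e.g.
// WatermarkStrategy.<Row>forBoundedOutOfOrderness(...).withTimestampAssigner(new RobustTimestampAssigner())
public class RobustTimestampAssigner implements SerializableTimestampAssigner<Row> {

    private static final Logger LOG = LoggerFactory.getLogger(RobustTimestampAssigner.class);

    @Override
    public long extractTimestamp(Row element, long recordTimestamp) {
        try {
            Object timestampField = element.getField(2);
            if (timestampField instanceof Long) {
                return (Long) timestampField;
            } else if (timestampField instanceof Timestamp) {
                return ((Timestamp) timestampField).getTime();
            } else if (timestampField instanceof Instant) { // TIMESTAMP_LTZ columns
                return ((Instant) timestampField).toEpochMilli();
            }
            LOG.warn("Invalid timestamp field: {}", timestampField); // also covers null
        } catch (RuntimeException e) {
            LOG.error("Error extracting timestamp", e);
        }
        // Fall back to processing time. Caution: if event time lags behind wall-clock time,
        // this pushes the watermark forward and can make subsequent rows late.
        return System.currentTimeMillis();
    }
}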