or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

configuration.md, connectors.md, event-time-watermarks.md, execution-jobs.md, functions-and-operators.md, index.md, state-management.md, type-system-serialization.md, utilities.md

docs/event-time-watermarks.md

0

# Event Time and Watermarks

1

2

Apache Flink provides sophisticated support for event-time processing, enabling applications to handle out-of-order events and late arrivals through watermarks. This capability is essential for accurate time-based computations in streaming applications.

3

4

## Event Time Concepts

5

6

### Timestamp Assignment

7

8

Assign timestamps to events for event-time processing.

9

10

```java { .api }

11

import org.apache.flink.api.common.eventtime.TimestampAssigner;

12

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;

13

14

// Simple timestamp assigner

15

public class EventTimestampAssigner implements SerializableTimestampAssigner<Event> {

16

@Override

17

public long extractTimestamp(Event element, long recordTimestamp) {

18

// Extract timestamp from the event

19

return element.getTimestamp();

20

}

21

}

22

23

// Custom timestamp extraction logic

24

public class CustomTimestampAssigner implements SerializableTimestampAssigner<LogEntry> {

25

@Override

26

public long extractTimestamp(LogEntry element, long recordTimestamp) {

27

// Parse timestamp from log format

28

return parseTimestamp(element.getTimestampString());

29

}

30

31

private long parseTimestamp(String timestampStr) {

32

// Custom timestamp parsing logic

33

return Instant.parse(timestampStr).toEpochMilli();

34

}

35

}

36

37

// Using ingestion time

38

public class IngestionTimeAssigner implements SerializableTimestampAssigner<Event> {

39

@Override

40

public long extractTimestamp(Event element, long recordTimestamp) {

41

// Use processing time as event time

42

return System.currentTimeMillis();

43

}

44

}

45

```

46

47

### Watermark Strategies

48

49

Define how watermarks are generated for handling late events.

50

51

```java { .api }

52

import org.apache.flink.api.common.eventtime.WatermarkStrategy;

53

import org.apache.flink.api.common.eventtime.TimestampAssigner;

54

import java.time.Duration;

55

56

// Monotonic timestamps (events arrive in order)

57

WatermarkStrategy<Event> ascendingStrategy =

58

WatermarkStrategy.<Event>forMonotonousTimestamps()

59

.withTimestampAssigner((event, timestamp) -> event.getTimestamp());

60

61

// Bounded out-of-orderness

62

WatermarkStrategy<Event> boundedOutOfOrderStrategy =

63

WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))

64

.withTimestampAssigner((event, timestamp) -> event.getTimestamp());

65

66

// Custom watermark strategy with idleness detection

67

WatermarkStrategy<Event> customStrategy =

68

WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(10))

69

.withTimestampAssigner((event, timestamp) -> event.getTimestamp())

70

.withIdleness(Duration.ofMinutes(1)); // Mark source idle after 1 minute

71

72

// Generator-based strategy

73

WatermarkStrategy<Event> generatorStrategy =

74

WatermarkStrategy.<Event>forGenerator(context -> new CustomWatermarkGenerator())

75

.withTimestampAssigner((event, timestamp) -> event.getTimestamp());

76

```

77

78

## Watermark Generators

79

80

### Built-in Generators

81

82

```java { .api }

83

import org.apache.flink.api.common.eventtime.AscendingTimestampsWatermarks;

84

import org.apache.flink.api.common.eventtime.BoundedOutOfOrdernessWatermarks;

85

86

// Ascending timestamps generator

87

public class AscendingWatermarkExample {

88

public static WatermarkStrategy<Event> createStrategy() {

89

return WatermarkStrategy.<Event>forGenerator(context ->

90

new AscendingTimestampsWatermarks<>())

91

.withTimestampAssigner((event, timestamp) -> event.getTimestamp());

92

}

93

}

94

95

// Bounded out-of-orderness generator

96

public class BoundedOutOfOrderExample {

97

public static WatermarkStrategy<Event> createStrategy() {

98

Duration maxOutOfOrderness = Duration.ofSeconds(5);

99

return WatermarkStrategy.<Event>forGenerator(context ->

100

new BoundedOutOfOrdernessWatermarks<>(maxOutOfOrderness))

101

.withTimestampAssigner((event, timestamp) -> event.getTimestamp());

102

}

103

}

104

```

105

106

### Custom Watermark Generators

107

108

```java { .api }

109

import org.apache.flink.api.common.eventtime.WatermarkGenerator;

110

import org.apache.flink.api.common.eventtime.WatermarkOutput;

111

import org.apache.flink.api.common.eventtime.Watermark;

112

113

// Periodic watermark generator

114

public class PeriodicWatermarkGenerator implements WatermarkGenerator<Event> {

115

private long maxTimestamp = Long.MIN_VALUE;

116

private final long maxOutOfOrderness = 5000; // 5 seconds

117

118

@Override

119

public void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {

120

// Update max timestamp seen so far

121

maxTimestamp = Math.max(maxTimestamp, eventTimestamp);

122

}

123

124

@Override

125

public void onPeriodicEmit(WatermarkOutput output) {

126

// Emit watermark periodically (every few hundred milliseconds)

127

if (maxTimestamp != Long.MIN_VALUE) {

128

output.emitWatermark(new Watermark(maxTimestamp - maxOutOfOrderness - 1));

129

}

130

}

131

}

132

133

// Punctuated watermark generator

134

public class PunctuatedWatermarkGenerator implements WatermarkGenerator<Event> {

135

136

@Override

137

public void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {

138

// Emit watermark on special events

139

if (event.hasWatermarkMarker()) {

140

output.emitWatermark(new Watermark(eventTimestamp));

141

}

142

}

143

144

@Override

145

public void onPeriodicEmit(WatermarkOutput output) {

146

// No periodic watermarks needed

147

}

148

}

149

150

// Adaptive watermark generator

151

public class AdaptiveWatermarkGenerator implements WatermarkGenerator<SensorReading> {

152

private long maxTimestamp = Long.MIN_VALUE;

153

private long baseDelayMs = 1000; // 1 second base delay

154

private long adaptiveDelayMs = 1000;

155

156

@Override

157

public void onEvent(SensorReading reading, long eventTimestamp, WatermarkOutput output) {

158

maxTimestamp = Math.max(maxTimestamp, eventTimestamp);

159

160

// Adapt delay based on out-of-orderness observed

161

long currentTime = System.currentTimeMillis();

162

long eventDelay = currentTime - eventTimestamp;

163

164

if (eventDelay > adaptiveDelayMs) {

165

// Increase delay if we see more out-of-order events

166

adaptiveDelayMs = Math.min(eventDelay * 2, 30000); // Max 30 seconds

167

} else {

168

// Gradually decrease delay when events are more in order

169

adaptiveDelayMs = Math.max(baseDelayMs, adaptiveDelayMs * 0.95);

170

}

171

}

172

173

@Override

174

public void onPeriodicEmit(WatermarkOutput output) {

175

if (maxTimestamp != Long.MIN_VALUE) {

176

output.emitWatermark(new Watermark(maxTimestamp - adaptiveDelayMs));

177

}

178

}

179

}

180

```

181

182

### Generator with Idleness Detection

183

184

```java { .api }

185

import org.apache.flink.api.common.eventtime.IdlenessTimer;

186

187

// Wrapper generator with idleness detection

188

public class IdleAwareWatermarkGenerator implements WatermarkGenerator<Event> {

189

private final WatermarkGenerator<Event> delegate;

190

private final IdlenessTimer idlenessTimer;

191

192

public IdleAwareWatermarkGenerator(WatermarkGenerator<Event> delegate,

193

Duration idleTimeout) {

194

this.delegate = delegate;

195

this.idlenessTimer = new IdlenessTimer(idleTimeout);

196

}

197

198

@Override

199

public void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {

200

// Mark as active and delegate

201

idlenessTimer.activity();

202

delegate.onEvent(event, eventTimestamp, output);

203

}

204

205

@Override

206

public void onPeriodicEmit(WatermarkOutput output) {

207

// Check for idleness and emit watermarks

208

if (idlenessTimer.checkIfIdle(output)) {

209

// Source is idle, watermark advancement is paused

210

return;

211

}

212

213

delegate.onPeriodicEmit(output);

214

}

215

}

216

```

217

218

## Advanced Watermark Patterns

219

220

### Multi-Stream Watermarks

221

222

Handle watermarks from multiple input streams.

223

224

```java { .api }

225

import org.apache.flink.streaming.api.functions.co.CoProcessFunction;

226

import org.apache.flink.util.Collector;

227

228

public class MultiStreamWatermarkFunction extends CoProcessFunction<Event1, Event2, CombinedEvent> {

229

230

@Override

231

public void processElement1(Event1 event1, Context ctx, Collector<CombinedEvent> out)

232

throws Exception {

233

234

// Current watermark for this stream

235

long watermark1 = ctx.timerService().currentWatermark();

236

237

// Process event considering watermark

238

if (event1.getTimestamp() <= watermark1) {

239

// Event is within watermark bounds

240

out.collect(new CombinedEvent(event1, null));

241

}

242

}

243

244

@Override

245

public void processElement2(Event2 event2, Context ctx, Collector<CombinedEvent> out)

246

throws Exception {

247

248

long watermark2 = ctx.timerService().currentWatermark();

249

250

if (event2.getTimestamp() <= watermark2) {

251

out.collect(new CombinedEvent(null, event2));

252

}

253

}

254

}

255

```

256

257

### Custom Watermark Alignment

258

259

```java { .api }

260

import org.apache.flink.streaming.api.watermark.Watermark;

261

262

public class WatermarkAlignmentFunction extends ProcessFunction<Event, Event> {

263

private long lastWatermark = Long.MIN_VALUE;

264

private final long alignmentThreshold = 1000; // 1 second

265

266

@Override

267

public void processElement(Event event, Context ctx, Collector<Event> out)

268

throws Exception {

269

270

long currentWatermark = ctx.timerService().currentWatermark();

271

272

// Align processing based on watermark progression

273

if (currentWatermark - lastWatermark >= alignmentThreshold) {

274

// Watermark has advanced significantly

275

performPeriodicCleanup();

276

lastWatermark = currentWatermark;

277

}

278

279

out.collect(event);

280

}

281

282

private void performPeriodicCleanup() {

283

// Cleanup old state, flush buffers, etc.

284

}

285

}

286

```

287

288

## Working with Late Events

289

290

### Allowed Lateness

291

292

Configure how to handle late events in windowed operations.

293

294

```java { .api }

295

import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.OutputTag;

298

299

public class LateEventHandling {

300

301

// Output tag for late events

302

private static final OutputTag<Event> LATE_EVENTS_TAG =

303

new OutputTag<Event>("late-events") {};

304

305

public static void handleLateEvents(DataStream<Event> input) {

306

SingleOutputStreamOperator<WindowedResult> mainOutput = input

307

.assignTimestampsAndWatermarks(

308

WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))

309

.withTimestampAssigner((event, timestamp) -> event.getTimestamp())

310

)

311

.keyBy(Event::getKey)

312

.window(TumblingEventTimeWindows.of(Time.minutes(5)))

313

.allowedLateness(Time.minutes(1)) // Allow 1 minute lateness

314

.sideOutputLateData(LATE_EVENTS_TAG) // Collect late events

315

.process(new WindowProcessFunction<Event, WindowedResult, String, TimeWindow>() {

316

@Override

317

public void process(String key, Context context,

318

Iterable<Event> elements,

319

Collector<WindowedResult> out) throws Exception {

320

321

// Process events in window

322

long count = StreamSupport.stream(elements.spliterator(), false).count();

323

out.collect(new WindowedResult(key, count, context.window()));

324

}

325

});

326

327

// Handle late events separately

328

DataStream<Event> lateEvents = mainOutput.getSideOutput(LATE_EVENTS_TAG);

329

lateEvents.process(new ProcessFunction<Event, Void>() {

330

@Override

331

public void processElement(Event event, Context ctx, Collector<Void> out)

332

throws Exception {

333

// Log, store, or reprocess late events

334

System.out.println("Late event: " + event +

335

" arrived " + (ctx.timestamp() - event.getTimestamp()) + "ms late");

336

}

337

});

338

}

339

}

340

```

341

342

### Late Event Reprocessing

343

344

```java { .api }

345

public class LateEventReprocessor extends ProcessFunction<Event, ProcessedEvent> {

346

private final long maxLateness = 60000; // 1 minute

347

348

@Override

349

public void processElement(Event event, Context ctx, Collector<ProcessedEvent> out)

350

throws Exception {

351

352

long currentWatermark = ctx.timerService().currentWatermark();

353

long eventTime = event.getTimestamp();

354

355

if (eventTime <= currentWatermark) {

356

// Event is late

357

long lateness = currentWatermark - eventTime;

358

359

if (lateness <= maxLateness) {

360

// Acceptable lateness - reprocess

361

out.collect(new ProcessedEvent(event, true, lateness));

362

} else {

363

// Too late - handle specially

364

handleTooLateEvent(event, lateness, ctx);

365

}

366

} else {

367

// Event is on time

368

out.collect(new ProcessedEvent(event, false, 0));

369

}

370

}

371

372

private void handleTooLateEvent(Event event, long lateness, Context ctx) {

373

// Store in external system, alert, or discard

374

System.out.println("Event too late by " + lateness + "ms: " + event);

375

}

376

}

377

```

378

379

## Watermark Monitoring and Debugging

380

381

### Watermark Metrics

382

383

```java { .api }

384

public class WatermarkMetricsGenerator implements WatermarkGenerator<Event> {

385

private long maxTimestamp = Long.MIN_VALUE;

386

private final long maxOutOfOrderness = 5000;

387

388

// Metrics

389

private Counter watermarkEmissions;

390

private Gauge<Long> currentWatermark;

391

private Histogram eventLateness;

392

393

public void setMetrics(MetricGroup metricGroup) {

394

this.watermarkEmissions = metricGroup.counter("watermark-emissions");

395

this.currentWatermark = metricGroup.gauge("current-watermark", () ->

396

maxTimestamp - maxOutOfOrderness);

397

this.eventLateness = metricGroup.histogram("event-lateness");

398

}

399

400

@Override

401

public void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {

402

maxTimestamp = Math.max(maxTimestamp, eventTimestamp);

403

404

// Track lateness metrics

405

long currentTime = System.currentTimeMillis();

406

long lateness = currentTime - eventTimestamp;

407

eventLateness.update(lateness);

408

}

409

410

@Override

411

public void onPeriodicEmit(WatermarkOutput output) {

412

if (maxTimestamp != Long.MIN_VALUE) {

413

watermarkEmissions.inc();

414

output.emitWatermark(new Watermark(maxTimestamp - maxOutOfOrderness - 1));

415

}

416

}

417

}

418

```

419

420

### Watermark Debugging

421

422

```java { .api }

423

public class WatermarkDebugger implements WatermarkGenerator<Event> {

424

private final WatermarkGenerator<Event> delegate;

425

private final String sourceName;

426

427

public WatermarkDebugger(WatermarkGenerator<Event> delegate, String sourceName) {

428

this.delegate = delegate;

429

this.sourceName = sourceName;

430

}

431

432

@Override

433

public void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {

434

System.out.println(String.format(

435

"[%s] Event received - timestamp: %d, extracted: %d, delay: %dms",

436

sourceName, event.getTimestamp(), eventTimestamp,

437

System.currentTimeMillis() - eventTimestamp

438

));

439

440

delegate.onEvent(event, eventTimestamp, new WatermarkOutputWrapper(output));

441

}

442

443

@Override

444

public void onPeriodicEmit(WatermarkOutput output) {

445

delegate.onPeriodicEmit(new WatermarkOutputWrapper(output));

446

}

447

448

private class WatermarkOutputWrapper implements WatermarkOutput {

449

private final WatermarkOutput delegate;

450

451

public WatermarkOutputWrapper(WatermarkOutput delegate) {

452

this.delegate = delegate;

453

}

454

455

@Override

456

public void emitWatermark(Watermark watermark) {

457

System.out.println(String.format(

458

"[%s] Watermark emitted: %d (%s)",

459

sourceName, watermark.getTimestamp(),

460

new Date(watermark.getTimestamp())

461

));

462

delegate.emitWatermark(watermark);

463

}

464

465

@Override

466

public void markIdle() {

467

System.out.println(String.format("[%s] Source marked as idle", sourceName));

468

delegate.markIdle();

469

}

470

471

@Override

472

public void markActive() {

473

System.out.println(String.format("[%s] Source marked as active", sourceName));

474

delegate.markActive();

475

}

476

}

477

}

478

```

479

480

## Timestamp and Watermark Utilities

481

482

### Time Extraction Utilities

483

484

```java { .api }

485

public class TimeExtractionUtils {

486

487

// Extract from JSON timestamp field

488

public static SerializableTimestampAssigner<JsonNode> jsonTimestampExtractor(String timestampField) {

489

return (element, recordTimestamp) -> {

490

JsonNode timestampNode = element.get(timestampField);

491

return timestampNode != null ? timestampNode.asLong() : recordTimestamp;

492

};

493

}

494

495

// Extract from formatted string

496

public static SerializableTimestampAssigner<String> formatTimestampExtractor(String pattern) {

497

DateTimeFormatter formatter = DateTimeFormatter.ofPattern(pattern);

498

return (element, recordTimestamp) -> {

499

try {

500

LocalDateTime dateTime = LocalDateTime.parse(element, formatter);

501

return dateTime.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();

502

} catch (Exception e) {

503

return recordTimestamp; // Fallback to record timestamp

504

}

505

};

506

}

507

508

// Extract from CSV with timestamp column

509

public static SerializableTimestampAssigner<String> csvTimestampExtractor(int timestampColumn,

510

String delimiter) {

511

return (element, recordTimestamp) -> {

512

String[] fields = element.split(delimiter);

513

if (fields.length > timestampColumn) {

514

try {

515

return Long.parseLong(fields[timestampColumn]);

516

} catch (NumberFormatException e) {

517

return recordTimestamp;

518

}

519

}

520

return recordTimestamp;

521

};

522

}

523

}

524

```

525

526

### Watermark Strategy Builders

527

528

```java { .api }

529

public class WatermarkStrategyBuilder {

530

531

public static <T> WatermarkStrategy<T> createRobustStrategy(

532

SerializableTimestampAssigner<T> timestampAssigner,

533

Duration maxOutOfOrderness,

534

Duration idlenessTimeout) {

535

536

return WatermarkStrategy.<T>forBoundedOutOfOrderness(maxOutOfOrderness)

537

.withTimestampAssigner(timestampAssigner)

538

.withIdleness(idlenessTimeout);

539

}

540

541

public static <T> WatermarkStrategy<T> createAdaptiveStrategy(

542

SerializableTimestampAssigner<T> timestampAssigner) {

543

544

return WatermarkStrategy.<T>forGenerator(context ->

545

new AdaptiveWatermarkGenerator<>())

546

.withTimestampAssigner(timestampAssigner);

547

}

548

549

public static <T> WatermarkStrategy<T> createDebugStrategy(

550

SerializableTimestampAssigner<T> timestampAssigner,

551

Duration maxOutOfOrderness,

552

String sourceName) {

553

554

return WatermarkStrategy.<T>forGenerator(context ->

555

new WatermarkDebugger<>(

556

new BoundedOutOfOrdernessWatermarks<>(maxOutOfOrderness),

557

sourceName

558

))

559

.withTimestampAssigner(timestampAssigner);

560

}

561

}

562

```

563

564

## Best Practices

565

566

### Choosing Watermark Strategies

567

568

```java { .api }

569

public class WatermarkBestPractices {

570

571

// For mostly ordered streams with occasional late events

572

public static <T> WatermarkStrategy<T> forMostlyOrdered(

573

SerializableTimestampAssigner<T> timestampAssigner) {

574

575

return WatermarkStrategy.<T>forBoundedOutOfOrderness(Duration.ofSeconds(5))

576

.withTimestampAssigner(timestampAssigner)

577

.withIdleness(Duration.ofMinutes(1));

578

}

579

580

// For highly out-of-order streams

581

public static <T> WatermarkStrategy<T> forHighlyDisordered(

582

SerializableTimestampAssigner<T> timestampAssigner) {

583

584

return WatermarkStrategy.<T>forBoundedOutOfOrderness(Duration.ofMinutes(5))

585

.withTimestampAssigner(timestampAssigner)

586

.withIdleness(Duration.ofMinutes(2));

587

}

588

589

// For streams with unpredictable patterns

590

public static <T> WatermarkStrategy<T> forUnpredictableStreams(

591

SerializableTimestampAssigner<T> timestampAssigner) {

592

593

return WatermarkStrategy.<T>forGenerator(context ->

594

new AdaptiveWatermarkGenerator<>())

595

.withTimestampAssigner(timestampAssigner)

596

.withIdleness(Duration.ofMinutes(5));

597

}

598

}

599

600

// Performance considerations

601

public class PerformanceOptimizedWatermarkGenerator implements WatermarkGenerator<Event> {

602

private long maxTimestamp = Long.MIN_VALUE;

603

private final long maxOutOfOrderness;

604

private long lastWatermarkTime = 0;

605

private final long watermarkInterval = 1000; // Emit at most once per second

606

607

public PerformanceOptimizedWatermarkGenerator(Duration maxOutOfOrderness) {

608

this.maxOutOfOrderness = maxOutOfOrderness.toMillis();

609

}

610

611

@Override

612

public void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {

613

// Only update if timestamp is actually newer

614

if (eventTimestamp > maxTimestamp) {

615

maxTimestamp = eventTimestamp;

616

617

// Emit watermark immediately for significant advances

618

long currentTime = System.currentTimeMillis();

619

if (currentTime - lastWatermarkTime > watermarkInterval) {

620

output.emitWatermark(new Watermark(maxTimestamp - maxOutOfOrderness));

621

lastWatermarkTime = currentTime;

622

}

623

}

624

}

625

626

@Override

627

public void onPeriodicEmit(WatermarkOutput output) {

628

// Periodic emission as backup

629

if (maxTimestamp != Long.MIN_VALUE) {

630

output.emitWatermark(new Watermark(maxTimestamp - maxOutOfOrderness));

631

lastWatermarkTime = System.currentTimeMillis();

632

}

633

}

634

}

635

```

636

637

Apache Flink's event-time processing and watermark system provides robust support for handling temporal aspects of streaming data. By understanding these concepts and applying appropriate strategies, you can build applications that accurately process time-based computations even with out-of-order and late-arriving events.