or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

async-io.md, checkpointing.md, datastream-transformations.md, execution-environment.md, index.md, keyed-streams-state.md, process-functions.md, sources-sinks.md, time-watermarks.md, windowing.md

docs/process-functions.md

0

# Process Functions

1

2

Process functions provide the most flexible way to process streams in Apache Flink, offering access to timers, state, side outputs, and watermarks. They enable complex event processing patterns and stateful computations.

3

4

## Capabilities

5

6

### ProcessFunction

7

8

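The base process function for transforming elements. It gives access to the element timestamp, the current time, and side outputs. Timers are available only when it is applied to a keyed stream. On a non-keyed `DataStream`, registering a timer throws `UnsupportedOperationException`, so use `KeyedProcessFunction` for timer-based logic.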

9

10

```java { .api }

11

/**

12

* Process each element with access to context and timer services

13

* @param value - input element

14

* @param ctx - processing context

15

* @param out - collector for output elements

16

*/

17

abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;

18

19

/**

20

* Handle timer events

21

* @param timestamp - timer timestamp

22

* @param ctx - timer context

23

* @param out - collector for output elements

24

*/

25

void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception;

26

```

27

28

**Usage Examples:**

29

30

```java

31

DataStream<String> result = input.process(new ProcessFunction<Event, String>() {

32

@Override

33

public void processElement(Event event, Context ctx, Collector<String> out) {

34

// Process element

35

out.collect("Processed: " + event.getValue());

36

37

// Register timer for 1 minute later

38

ctx.timerService().registerProcessingTimeTimer(

39

ctx.timerService().currentProcessingTime() + 60000

40

);

41

}

42

43

@Override

44

public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) {

45

out.collect("Timer fired at: " + timestamp);

46

}

47

});

48

```

49

50

### KeyedProcessFunction

51

52

Process function for keyed streams with access to keyed state and per-key timers.

53

54

```java { .api }

55

/**

56

* Process each element in a keyed stream

57

* @param value - input element

58

* @param ctx - keyed processing context with access to current key

59

* @param out - collector for output elements

60

*/

61

abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;

62

63

/**

64

* Handle timer events for keyed streams

65

* @param timestamp - timer timestamp

66

* @param ctx - keyed timer context

67

* @param out - collector for output elements

68

*/

69

void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception;

70

```

71

72

**Usage Examples:**

73

74

```java

75

DataStream<Alert> alerts = keyedStream.process(

76

new KeyedProcessFunction<String, Event, Alert>() {

77

private ValueState<Long> lastEventTime;

78

private ValueState<Integer> eventCount;

79

80

@Override

81

public void open(Configuration parameters) {

82

ValueStateDescriptor<Long> timeDescriptor =

83

new ValueStateDescriptor<>("lastEventTime", Long.class);

84

lastEventTime = getRuntimeContext().getState(timeDescriptor);

85

86

ValueStateDescriptor<Integer> countDescriptor =

87

new ValueStateDescriptor<>("eventCount", Integer.class);

88

eventCount = getRuntimeContext().getState(countDescriptor);

89

}

90

91

@Override

92

public void processElement(Event event, Context ctx, Collector<Alert> out) throws Exception {

93

Long lastTime = lastEventTime.value();

94

Integer count = eventCount.value();

95

96

if (count == null) {

97

count = 0;

98

}

99

100

// Update state

101

lastEventTime.update(event.getTimestamp());

102

eventCount.update(count + 1);

103

104

// Check for rapid events

105

if (lastTime != null && event.getTimestamp() - lastTime < 1000) {

106

out.collect(new Alert("Rapid events for key: " + ctx.getCurrentKey()));

107

}

108

109

// Set cleanup timer

110

ctx.timerService().registerEventTimeTimer(event.getTimestamp() + 300000);

111

}

112

113

@Override

114

public void onTimer(long timestamp, OnTimerContext ctx, Collector<Alert> out) {

115

// Clear state on timer

116

lastEventTime.clear();

117

eventCount.clear();

118

}

119

}

120

);

121

```

122

123

### CoProcessFunction

124

125

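Process function for two connected streams, enabling joint processing of elements of two different types. Keyed state and timers are available only when the connected streams are keyed (for example, `a.connect(b).keyBy(...)`). `KeyedCoProcessFunction` also exposes the current key.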

126

127

```java { .api }

128

/**

129

* Process element from first stream

130

* @param value - element from first stream

131

* @param ctx - processing context

132

* @param out - collector for output elements

133

*/

134

abstract void processElement1(IN1 value, Context ctx, Collector<OUT> out) throws Exception;

135

136

/**

137

* Process element from second stream

138

* @param value - element from second stream

139

* @param ctx - processing context

140

* @param out - collector for output elements

141

*/

142

abstract void processElement2(IN2 value, Context ctx, Collector<OUT> out) throws Exception;

143

144

/**

145

* Handle timer events for connected streams

146

*/

147

void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception;

148

```

149

150

**Usage Examples:**

151

152

```java

153

ConnectedStreams<Order, Payment> connected = orders.connect(payments);

154

155

DataStream<Transaction> transactions = connected.keyBy(

156

order -> order.getOrderId(),

157

payment -> payment.getOrderId()

158

).process(new CoProcessFunction<Order, Payment, Transaction>() {

159

private ValueState<Order> orderState;

160

private ValueState<Payment> paymentState;

161

162

@Override

163

public void open(Configuration parameters) {

164

orderState = getRuntimeContext().getState(

165

new ValueStateDescriptor<>("order", Order.class));

166

paymentState = getRuntimeContext().getState(

167

new ValueStateDescriptor<>("payment", Payment.class));

168

}

169

170

@Override

171

public void processElement1(Order order, Context ctx, Collector<Transaction> out) throws Exception {

172

Payment payment = paymentState.value();

173

if (payment != null) {

174

// Both order and payment available

175

out.collect(new Transaction(order, payment));

176

paymentState.clear();

177

} else {

178

// Store order and wait for payment

179

orderState.update(order);

180

// Set timeout timer

181

ctx.timerService().registerEventTimeTimer(order.getTimestamp() + 300000);

182

}

183

}

184

185

@Override

186

public void processElement2(Payment payment, Context ctx, Collector<Transaction> out) throws Exception {

187

Order order = orderState.value();

188

if (order != null) {

189

// Both order and payment available

190

out.collect(new Transaction(order, payment));

191

orderState.clear();

192

} else {

193

// Store payment and wait for order

194

paymentState.update(payment);

195

// Set timeout timer

196

ctx.timerService().registerEventTimeTimer(payment.getTimestamp() + 300000);

197

}

198

}

199

200

@Override

201

public void onTimer(long timestamp, OnTimerContext ctx, Collector<Transaction> out) {

202

// Timeout - clear any remaining state

203

orderState.clear();

204

paymentState.clear();

205

}

206

});

207

```

208

209

### KeyedCoProcessFunction

210

211

Process function for keyed connected streams, combining the capabilities of CoProcessFunction with keyed state management and timer functionality.

212

213

```java { .api }

214

/**

215

* Process element from first keyed stream

216

* @param value - element from first stream

217

* @param ctx - processing context with key access

218

* @param out - collector for output elements

219

*/

220

abstract void processElement1(IN1 value, Context ctx, Collector<OUT> out) throws Exception;

221

222

/**

223

* Process element from second keyed stream

224

* @param value - element from second stream

225

* @param ctx - processing context with key access

226

* @param out - collector for output elements

227

*/

228

abstract void processElement2(IN2 value, Context ctx, Collector<OUT> out) throws Exception;

229

230

/**

231

* Handle timer events for keyed connected streams

232

* @param timestamp - timer timestamp

233

* @param ctx - timer context with key access

234

* @param out - collector for output elements

235

*/

236

void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception;

237

238

/**

239

* Context for keyed connected stream processing

240

*/

241

abstract class Context {

242

/**

243

* Get timestamp of current element

244

*/

245

abstract Long timestamp();

246

247

/**

248

* Get timer service for registering timers

249

*/

250

abstract TimerService timerService();

251

252

/**

253

* Output to side output

254

*/

255

abstract <X> void output(OutputTag<X> outputTag, X value);

256

257

/**

258

* Get current key

259

*/

260

abstract K getCurrentKey();

261

}

262

```

263

264

**Usage Examples:**

265

266

```java

267

ConnectedStreams<Order, Shipment> connected = orders.connect(shipments);

268

269

DataStream<OrderStatus> statusUpdates = connected.keyBy(

270

order -> order.getOrderId(),

271

shipment -> shipment.getOrderId()

272

).process(new KeyedCoProcessFunction<String, Order, Shipment, OrderStatus>() {

273

274

private ValueState<Order> orderState;

275

private ValueState<Shipment> shipmentState;

276

277

@Override

278

public void open(Configuration parameters) {

279

ValueStateDescriptor<Order> orderDescriptor =

280

new ValueStateDescriptor<>("order", Order.class);

281

ValueStateDescriptor<Shipment> shipmentDescriptor =

282

new ValueStateDescriptor<>("shipment", Shipment.class);

283

284

orderState = getRuntimeContext().getState(orderDescriptor);

285

shipmentState = getRuntimeContext().getState(shipmentDescriptor);

286

}

287

288

@Override

289

public void processElement1(Order order, Context ctx, Collector<OrderStatus> out) throws Exception {

290

String orderId = ctx.getCurrentKey();

291

orderState.update(order);

292

293

Shipment shipment = shipmentState.value();

294

if (shipment != null) {

295

// Order and shipment both available

296

out.collect(new OrderStatus(orderId, "SHIPPED", order, shipment));

297

shipmentState.clear();

298

} else {

299

// Order received, waiting for shipment

300

out.collect(new OrderStatus(orderId, "PROCESSING", order, null));

301

// Set timeout for order processing

302

ctx.timerService().registerEventTimeTimer(order.getOrderTime() + Duration.ofHours(24).toMillis());

303

}

304

}

305

306

@Override

307

public void processElement2(Shipment shipment, Context ctx, Collector<OrderStatus> out) throws Exception {

308

String orderId = ctx.getCurrentKey();

309

Order order = orderState.value();

310

311

if (order != null) {

312

// Order and shipment both available

313

out.collect(new OrderStatus(orderId, "SHIPPED", order, shipment));

314

orderState.clear();

315

} else {

316

// Shipment before order (unusual case)

317

shipmentState.update(shipment);

318

out.collect(new OrderStatus(orderId, "SHIPPED_EARLY", null, shipment));

319

}

320

}

321

322

@Override

323

public void onTimer(long timestamp, OnTimerContext ctx, Collector<OrderStatus> out) throws Exception {

324

String orderId = ctx.getCurrentKey();

325

Order order = orderState.value();

326

327

if (order != null) {

328

// Order timeout - no shipment received

329

out.collect(new OrderStatus(orderId, "TIMEOUT", order, null));

330

orderState.clear();

331

}

332

333

// Clean up any remaining shipment state

334

shipmentState.clear();

335

}

336

});

337

```

338

339

### ProcessWindowFunction

340

341

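Process function for windowed streams. When a window fires, it receives all of the window's elements together with window metadata (such as start and end), the current processing time and watermark, and per-window and global keyed state.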

342

343

```java { .api }

344

/**

345

* Process all elements in a window

346

* @param key - window key

347

* @param context - window context with metadata

348

* @param elements - all elements in the window

349

* @param out - collector for output elements

350

*/

351

abstract void process(KEY key, Context context, Iterable<IN> elements, Collector<OUT> out) throws Exception;

352

353

/**

354

* Clear any window state when window is purged

355

* @param context - window context

356

*/

357

void clear(Context context) throws Exception;

358

```

359

360

**Usage Examples:**

361

362

```java

363

DataStream<WindowResult> windowResults = keyedStream

364

.timeWindow(Time.minutes(5))

365

.process(new ProcessWindowFunction<Event, WindowResult, String, TimeWindow>() {

366

@Override

367

public void process(

368

String key,

369

Context context,

370

Iterable<Event> elements,

371

Collector<WindowResult> out

372

) throws Exception {

373

int count = 0;

374

double sum = 0.0;

375

long minTimestamp = Long.MAX_VALUE;

376

long maxTimestamp = Long.MIN_VALUE;

377

378

for (Event event : elements) {

379

count++;

380

sum += event.getValue();

381

minTimestamp = Math.min(minTimestamp, event.getTimestamp());

382

maxTimestamp = Math.max(maxTimestamp, event.getTimestamp());

383

}

384

385

WindowResult result = new WindowResult(

386

key,

387

context.window().getStart(),

388

context.window().getEnd(),

389

count,

390

sum / count, // average

391

minTimestamp,

392

maxTimestamp,

393

context.currentWatermark()

394

);

395

396

out.collect(result);

397

}

398

});

399

```

400

401

### Side Outputs

402

403

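Use side outputs to emit multiple types of data from a single process function. Each side output is identified by an `OutputTag`. Create tags as anonymous subclasses (note the trailing `{}`) so Flink can capture the element type despite Java type erasure.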

404

405

```java { .api }

406

// Emit to side output within process function

407

ctx.output(OutputTag<X> outputTag, X value);

408

409

// Retrieve side output from operator result

410

DataStream<X> getSideOutput(OutputTag<X> sideOutputTag);

411

```

412

413

**Usage Examples:**

414

415

```java

416

// Define side output tags

417

final OutputTag<String> errorTag = new OutputTag<String>("errors"){};

418

final OutputTag<String> warningTag = new OutputTag<String>("warnings"){};

419

420

SingleOutputStreamOperator<String> mainStream = input.process(

421

new ProcessFunction<Event, String>() {

422

@Override

423

public void processElement(Event event, Context ctx, Collector<String> out) {

424

if (event.isError()) {

425

ctx.output(errorTag, "Error: " + event.getMessage());

426

} else if (event.isWarning()) {

427

ctx.output(warningTag, "Warning: " + event.getMessage());

428

} else {

429

out.collect("Info: " + event.getMessage());

430

}

431

}

432

}

433

);

434

435

// Get side output streams

436

DataStream<String> errors = mainStream.getSideOutput(errorTag);

437

DataStream<String> warnings = mainStream.getSideOutput(warningTag);

438

```

439

440

### Timer Service

441

442

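Use the timer service to query the current processing time and watermark, and to register or delete timers for time-based processing and state cleanup. Timers are scoped to the current key, so they can only be registered on keyed streams.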

443

444

```java { .api }

445

// Timer service methods available in process function context

446

TimerService timerService();

447

448

// Timer service interface

449

interface TimerService {

450

long currentProcessingTime();

451

long currentWatermark();

452

453

void registerProcessingTimeTimer(long time);

454

void registerEventTimeTimer(long time);

455

456

void deleteProcessingTimeTimer(long time);

457

void deleteEventTimeTimer(long time);

458

}

459

```

460

461

**Usage Examples:**

462

463

```java

464

public class TimerExampleFunction extends KeyedProcessFunction<String, Event, String> {

465

@Override

466

public void processElement(Event event, Context ctx, Collector<String> out) throws Exception {

467

TimerService timerService = ctx.timerService();

468

469

// Get current times

470

long processingTime = timerService.currentProcessingTime();

471

long watermark = timerService.currentWatermark();

472

473

// Register timers

474

long processingTimeTimer = processingTime + 60000; // 1 minute later

475

long eventTimeTimer = event.getTimestamp() + 300000; // 5 minutes after event

476

477

timerService.registerProcessingTimeTimer(processingTimeTimer);

478

timerService.registerEventTimeTimer(eventTimeTimer);

479

480

// Store timer timestamps for potential deletion

481

// (using state to remember timer timestamps)

482

}

483

484

@Override

485

public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {

486

if (ctx.timeDomain() == TimeDomain.PROCESSING_TIME) {

487

out.collect("Processing time timer fired at: " + timestamp);

488

} else {

489

out.collect("Event time timer fired at: " + timestamp);

490

}

491

}

492

}

493

```

494

495

## Types

496

497

### Process Function Base Classes

498

499

```java { .api }

500

// Base process function

501

abstract class ProcessFunction<I, O> extends AbstractRichFunction {

502

abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;

503

void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception;

504

505

abstract class Context {

506

abstract Long timestamp();

507

abstract TimerService timerService();

508

abstract <X> void output(OutputTag<X> outputTag, X value);

509

}

510

511

abstract class OnTimerContext extends Context {

512

abstract TimeDomain timeDomain();

513

}

514

}

515

516

// Keyed process function

517

abstract class KeyedProcessFunction<K, I, O> extends AbstractRichFunction {

518

abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;

519

void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception;

520

521

abstract class Context {

522

abstract Long timestamp();

523

abstract TimerService timerService();

524

abstract <X> void output(OutputTag<X> outputTag, X value);

525

abstract K getCurrentKey();

526

}

527

528

abstract class OnTimerContext extends Context {

529

abstract TimeDomain timeDomain();

530

}

531

}

532

533

// Co-process function

534

abstract class CoProcessFunction<IN1, IN2, OUT> extends AbstractRichFunction {

535

abstract void processElement1(IN1 value, Context ctx, Collector<OUT> out) throws Exception;

536

abstract void processElement2(IN2 value, Context ctx, Collector<OUT> out) throws Exception;

537

void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception;

538

539

abstract class Context {

540

abstract Long timestamp();

541

abstract TimerService timerService();

542

abstract <X> void output(OutputTag<X> outputTag, X value);

543

}

544

545

abstract class OnTimerContext extends Context {

546

abstract TimeDomain timeDomain();

547

}

548

}

549

550

// Process window function

551

abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> extends AbstractRichFunction {

552

abstract void process(KEY key, Context context, Iterable<IN> elements, Collector<OUT> out) throws Exception;

553

void clear(Context context) throws Exception;

554

555

abstract class Context implements Serializable {

556

abstract W window();

557

abstract long currentProcessingTime();

558

abstract long currentWatermark();

559

abstract KeyedStateStore windowState();

560

abstract KeyedStateStore globalState();

561

abstract <X> void output(OutputTag<X> outputTag, X value);

562

}

563

}

564

```

565

566

### Supporting Types

567

568

```java { .api }

569

// Timer service (timers are per key; registration requires a keyed stream)
interface TimerService {
    // Current processing time in milliseconds
    long currentProcessingTime();

    // Current event-time watermark
    long currentWatermark();

    // Fires onTimer() once processing time reaches `time`
    void registerProcessingTimeTimer(long time);

    // Fires onTimer() once the watermark reaches `time`
    void registerEventTimeTimer(long time);

    // No effect if no such timer is pending
    void deleteProcessingTimeTimer(long time);

    // No effect if no such timer is pending
    void deleteEventTimeTimer(long time);
}

578

579

// Time domain of a firing timer, as reported by OnTimerContext.timeDomain()
enum TimeDomain {
    // Timer registered via registerEventTimeTimer (driven by watermarks)
    EVENT_TIME,

    // Timer registered via registerProcessingTimeTimer (driven by the system clock)
    PROCESSING_TIME
}

584

585

// Output tag for side outputs

586

class OutputTag<T> {

587

public OutputTag(String id);

588

public OutputTag(String id, TypeInformation<T> typeInfo);

589

String getId();

590

TypeInformation<T> getTypeInfo();

591

}

592

593

// Collector interface used by process functions to emit records
interface Collector<T> {
    // Emits one record downstream
    void collect(T record);

    // Closes the collector, flushing any buffered data
    void close();
}

598

```