0
# Process Functions
1
2
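Process functions are the most flexible, lowest-level way to process streams in Apache Flink, offering access to timers, keyed state, side outputs, and watermarks. They enable complex event processing patterns and stateful computations. Note that keyed state and timers are only available when the function runs on a keyed stream.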
3
4
## Capabilities
5
6
### ProcessFunction
7
8
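The base process function for transforming elements with access to the element context, the timer service, and side outputs. On a non-keyed stream the timer service can only be queried (current processing time and watermark). Registering a timer there throws an `UnsupportedOperationException`; use a `KeyedProcessFunction` when you need timers.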
9
10
```java { .api }
11
/**
12
* Process each element with access to context and timer services
13
* @param value - input element
14
* @param ctx - processing context
15
* @param out - collector for output elements
16
*/
17
abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;
18
19
/**
20
* Handle timer events
21
* @param timestamp - timer timestamp
22
* @param ctx - timer context
23
* @param out - collector for output elements
24
*/
25
void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception;
26
```
27
28
**Usage Examples:**
29
30
```java
31
DataStream<String> result = input.process(new ProcessFunction<Event, String>() {
32
@Override
33
public void processElement(Event event, Context ctx, Collector<String> out) {
34
// Process element
35
out.collect("Processed: " + event.getValue());
36
37
// Register timer for 1 minute later
38
ctx.timerService().registerProcessingTimeTimer(
39
ctx.timerService().currentProcessingTime() + 60000
40
);
41
}
42
43
@Override
44
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) {
45
out.collect("Timer fired at: " + timestamp);
46
}
47
});
48
```
49
50
### KeyedProcessFunction
51
52
Process function for keyed streams with access to keyed state and per-key timers.
53
54
```java { .api }
55
/**
56
* Process each element in a keyed stream
57
* @param value - input element
58
* @param ctx - keyed processing context with access to current key
59
* @param out - collector for output elements
60
*/
61
abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;
62
63
/**
64
* Handle timer events for keyed streams
65
* @param timestamp - timer timestamp
66
* @param ctx - keyed timer context
67
* @param out - collector for output elements
68
*/
69
void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception;
70
```
71
72
**Usage Examples:**
73
74
```java
75
DataStream<Alert> alerts = keyedStream.process(
76
new KeyedProcessFunction<String, Event, Alert>() {
77
private ValueState<Long> lastEventTime;
78
private ValueState<Integer> eventCount;
79
80
@Override
81
public void open(Configuration parameters) {
82
ValueStateDescriptor<Long> timeDescriptor =
83
new ValueStateDescriptor<>("lastEventTime", Long.class);
84
lastEventTime = getRuntimeContext().getState(timeDescriptor);
85
86
ValueStateDescriptor<Integer> countDescriptor =
87
new ValueStateDescriptor<>("eventCount", Integer.class);
88
eventCount = getRuntimeContext().getState(countDescriptor);
89
}
90
91
@Override
92
public void processElement(Event event, Context ctx, Collector<Alert> out) throws Exception {
93
Long lastTime = lastEventTime.value();
94
Integer count = eventCount.value();
95
96
if (count == null) {
97
count = 0;
98
}
99
100
// Update state
101
lastEventTime.update(event.getTimestamp());
102
eventCount.update(count + 1);
103
104
// Check for rapid events
105
if (lastTime != null && event.getTimestamp() - lastTime < 1000) {
106
out.collect(new Alert("Rapid events for key: " + ctx.getCurrentKey()));
107
}
108
109
// Set cleanup timer
110
ctx.timerService().registerEventTimeTimer(event.getTimestamp() + 300000);
111
}
112
113
@Override
114
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Alert> out) {
115
// Clear state on timer
116
lastEventTime.clear();
117
eventCount.clear();
118
}
119
}
120
);
121
```
122
123
### CoProcessFunction
124
125
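Process function for connected streams, enabling joint processing of two streams of different types. Keyed state and timers are only available when the connected streams are keyed (e.g. via `ConnectedStreams.keyBy`). For that case, `KeyedCoProcessFunction` (below) additionally exposes the current key.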
126
127
```java { .api }
128
/**
129
* Process element from first stream
130
* @param value - element from first stream
131
* @param ctx - processing context
132
* @param out - collector for output elements
133
*/
134
abstract void processElement1(IN1 value, Context ctx, Collector<OUT> out) throws Exception;
135
136
/**
137
* Process element from second stream
138
* @param value - element from second stream
139
* @param ctx - processing context
140
* @param out - collector for output elements
141
*/
142
abstract void processElement2(IN2 value, Context ctx, Collector<OUT> out) throws Exception;
143
144
/**
145
* Handle timer events for connected streams
146
*/
147
void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception;
148
```
149
150
**Usage Examples:**
151
152
```java
153
ConnectedStreams<Order, Payment> connected = orders.connect(payments);
154
155
DataStream<Transaction> transactions = connected.keyBy(
156
order -> order.getOrderId(),
157
payment -> payment.getOrderId()
158
).process(new CoProcessFunction<Order, Payment, Transaction>() {
159
private ValueState<Order> orderState;
160
private ValueState<Payment> paymentState;
161
162
@Override
163
public void open(Configuration parameters) {
164
orderState = getRuntimeContext().getState(
165
new ValueStateDescriptor<>("order", Order.class));
166
paymentState = getRuntimeContext().getState(
167
new ValueStateDescriptor<>("payment", Payment.class));
168
}
169
170
@Override
171
public void processElement1(Order order, Context ctx, Collector<Transaction> out) throws Exception {
172
Payment payment = paymentState.value();
173
if (payment != null) {
174
// Both order and payment available
175
out.collect(new Transaction(order, payment));
176
paymentState.clear();
177
} else {
178
// Store order and wait for payment
179
orderState.update(order);
180
// Set timeout timer
181
ctx.timerService().registerEventTimeTimer(order.getTimestamp() + 300000);
182
}
183
}
184
185
@Override
186
public void processElement2(Payment payment, Context ctx, Collector<Transaction> out) throws Exception {
187
Order order = orderState.value();
188
if (order != null) {
189
// Both order and payment available
190
out.collect(new Transaction(order, payment));
191
orderState.clear();
192
} else {
193
// Store payment and wait for order
194
paymentState.update(payment);
195
// Set timeout timer
196
ctx.timerService().registerEventTimeTimer(payment.getTimestamp() + 300000);
197
}
198
}
199
200
@Override
201
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Transaction> out) {
202
// Timeout - clear any remaining state
203
orderState.clear();
204
paymentState.clear();
205
}
206
});
207
```
208
209
### KeyedCoProcessFunction
210
211
Process function for keyed connected streams, combining the capabilities of CoProcessFunction with keyed state management and timer functionality.
212
213
```java { .api }
214
/**
215
* Process element from first keyed stream
216
* @param value - element from first stream
217
* @param ctx - processing context with key access
218
* @param out - collector for output elements
219
*/
220
abstract void processElement1(IN1 value, Context ctx, Collector<OUT> out) throws Exception;
221
222
/**
223
* Process element from second keyed stream
224
* @param value - element from second stream
225
* @param ctx - processing context with key access
226
* @param out - collector for output elements
227
*/
228
abstract void processElement2(IN2 value, Context ctx, Collector<OUT> out) throws Exception;
229
230
/**
231
* Handle timer events for keyed connected streams
232
* @param timestamp - timer timestamp
233
* @param ctx - timer context with key access
234
* @param out - collector for output elements
235
*/
236
void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception;
237
238
/**
239
* Context for keyed connected stream processing
240
*/
241
abstract class Context {
242
/**
243
* Get timestamp of current element
244
*/
245
abstract Long timestamp();
246
247
/**
248
* Get timer service for registering timers
249
*/
250
abstract TimerService timerService();
251
252
/**
253
* Output to side output
254
*/
255
abstract <X> void output(OutputTag<X> outputTag, X value);
256
257
/**
258
* Get current key
259
*/
260
abstract K getCurrentKey();
261
}
262
```
263
264
**Usage Examples:**
265
266
```java
267
ConnectedStreams<Order, Shipment> connected = orders.connect(shipments);
268
269
DataStream<OrderStatus> statusUpdates = connected.keyBy(
270
order -> order.getOrderId(),
271
shipment -> shipment.getOrderId()
272
).process(new KeyedCoProcessFunction<String, Order, Shipment, OrderStatus>() {
273
274
private ValueState<Order> orderState;
275
private ValueState<Shipment> shipmentState;
276
277
@Override
278
public void open(Configuration parameters) {
279
ValueStateDescriptor<Order> orderDescriptor =
280
new ValueStateDescriptor<>("order", Order.class);
281
ValueStateDescriptor<Shipment> shipmentDescriptor =
282
new ValueStateDescriptor<>("shipment", Shipment.class);
283
284
orderState = getRuntimeContext().getState(orderDescriptor);
285
shipmentState = getRuntimeContext().getState(shipmentDescriptor);
286
}
287
288
@Override
289
public void processElement1(Order order, Context ctx, Collector<OrderStatus> out) throws Exception {
290
String orderId = ctx.getCurrentKey();
291
orderState.update(order);
292
293
Shipment shipment = shipmentState.value();
294
if (shipment != null) {
295
// Order and shipment both available
296
out.collect(new OrderStatus(orderId, "SHIPPED", order, shipment));
297
shipmentState.clear();
298
} else {
299
// Order received, waiting for shipment
300
out.collect(new OrderStatus(orderId, "PROCESSING", order, null));
301
// Set timeout for order processing
302
ctx.timerService().registerEventTimeTimer(order.getOrderTime() + Duration.ofHours(24).toMillis());
303
}
304
}
305
306
@Override
307
public void processElement2(Shipment shipment, Context ctx, Collector<OrderStatus> out) throws Exception {
308
String orderId = ctx.getCurrentKey();
309
Order order = orderState.value();
310
311
if (order != null) {
312
// Order and shipment both available
313
out.collect(new OrderStatus(orderId, "SHIPPED", order, shipment));
314
orderState.clear();
315
} else {
316
// Shipment before order (unusual case)
317
shipmentState.update(shipment);
318
out.collect(new OrderStatus(orderId, "SHIPPED_EARLY", null, shipment));
319
}
320
}
321
322
@Override
323
public void onTimer(long timestamp, OnTimerContext ctx, Collector<OrderStatus> out) throws Exception {
324
String orderId = ctx.getCurrentKey();
325
Order order = orderState.value();
326
327
if (order != null) {
328
// Order timeout - no shipment received
329
out.collect(new OrderStatus(orderId, "TIMEOUT", order, null));
330
orderState.clear();
331
}
332
333
// Clean up any remaining shipment state
334
shipmentState.clear();
335
}
336
});
337
```
338
339
### ProcessWindowFunction
340
341
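Process function for windowed streams that receives all elements of a window at once. It also has access to window metadata (the window, current processing time, and watermark), per-window state, and global per-key state.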
342
343
```java { .api }
344
/**
345
* Process all elements in a window
346
* @param key - window key
347
* @param context - window context with metadata
348
* @param elements - all elements in the window
349
* @param out - collector for output elements
350
*/
351
abstract void process(KEY key, Context context, Iterable<IN> elements, Collector<OUT> out) throws Exception;
352
353
/**
354
* Clear any window state when window is purged
355
* @param context - window context
356
*/
357
void clear(Context context) throws Exception;
358
```
359
360
**Usage Examples:**
361
362
```java
363
DataStream<WindowResult> windowResults = keyedStream
364
.timeWindow(Time.minutes(5))
365
.process(new ProcessWindowFunction<Event, WindowResult, String, TimeWindow>() {
366
@Override
367
public void process(
368
String key,
369
Context context,
370
Iterable<Event> elements,
371
Collector<WindowResult> out
372
) throws Exception {
373
int count = 0;
374
double sum = 0.0;
375
long minTimestamp = Long.MAX_VALUE;
376
long maxTimestamp = Long.MIN_VALUE;
377
378
for (Event event : elements) {
379
count++;
380
sum += event.getValue();
381
minTimestamp = Math.min(minTimestamp, event.getTimestamp());
382
maxTimestamp = Math.max(maxTimestamp, event.getTimestamp());
383
}
384
385
WindowResult result = new WindowResult(
386
key,
387
context.window().getStart(),
388
context.window().getEnd(),
389
count,
390
sum / count, // average
391
minTimestamp,
392
maxTimestamp,
393
context.currentWatermark()
394
);
395
396
out.collect(result);
397
}
398
});
399
```
400
401
### Side Outputs
402
403
Use side outputs to emit multiple types of data from a single process function.
404
405
```java { .api }
406
// Emit to side output within process function
407
ctx.output(OutputTag<X> outputTag, X value);
408
409
// Retrieve side output from operator result
410
DataStream<X> getSideOutput(OutputTag<X> sideOutputTag);
411
```
412
413
**Usage Examples:**
414
415
```java
416
// Define side output tags
417
final OutputTag<String> errorTag = new OutputTag<String>("errors"){};
418
final OutputTag<String> warningTag = new OutputTag<String>("warnings"){};
419
420
SingleOutputStreamOperator<String> mainStream = input.process(
421
new ProcessFunction<Event, String>() {
422
@Override
423
public void processElement(Event event, Context ctx, Collector<String> out) {
424
if (event.isError()) {
425
ctx.output(errorTag, "Error: " + event.getMessage());
426
} else if (event.isWarning()) {
427
ctx.output(warningTag, "Warning: " + event.getMessage());
428
} else {
429
out.collect("Info: " + event.getMessage());
430
}
431
}
432
}
433
);
434
435
// Get side output streams
436
DataStream<String> errors = mainStream.getSideOutput(errorTag);
437
DataStream<String> warnings = mainStream.getSideOutput(warningTag);
438
```
439
440
### Timer Service
441
442
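Use the timer service to query the current processing time and watermark and to register or delete timers for time-based processing and state cleanup. Timers are scoped to the current key and can only be registered on keyed streams.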
443
444
```java { .api }
445
// Timer service methods available in process function context
446
TimerService timerService();
447
448
// Timer service interface
449
interface TimerService {
450
long currentProcessingTime();
451
long currentWatermark();
452
453
void registerProcessingTimeTimer(long time);
454
void registerEventTimeTimer(long time);
455
456
void deleteProcessingTimeTimer(long time);
457
void deleteEventTimeTimer(long time);
458
}
459
```
460
461
**Usage Examples:**
462
463
```java
464
public class TimerExampleFunction extends KeyedProcessFunction<String, Event, String> {
465
@Override
466
public void processElement(Event event, Context ctx, Collector<String> out) throws Exception {
467
TimerService timerService = ctx.timerService();
468
469
// Get current times
470
long processingTime = timerService.currentProcessingTime();
471
long watermark = timerService.currentWatermark();
472
473
// Register timers
474
long processingTimeTimer = processingTime + 60000; // 1 minute later
475
long eventTimeTimer = event.getTimestamp() + 300000; // 5 minutes after event
476
477
timerService.registerProcessingTimeTimer(processingTimeTimer);
478
timerService.registerEventTimeTimer(eventTimeTimer);
479
480
// Store timer timestamps for potential deletion
481
// (using state to remember timer timestamps)
482
}
483
484
@Override
485
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
486
if (ctx.timeDomain() == TimeDomain.PROCESSING_TIME) {
487
out.collect("Processing time timer fired at: " + timestamp);
488
} else {
489
out.collect("Event time timer fired at: " + timestamp);
490
}
491
}
492
}
493
```
494
495
## Types
496
497
### Process Function Base Classes
498
499
```java { .api }
500
// Base process function
501
abstract class ProcessFunction<I, O> extends AbstractRichFunction {
502
abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;
503
void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception;
504
505
abstract class Context {
506
abstract Long timestamp();
507
abstract TimerService timerService();
508
abstract <X> void output(OutputTag<X> outputTag, X value);
509
}
510
511
abstract class OnTimerContext extends Context {
512
abstract TimeDomain timeDomain();
513
}
514
}
515
516
// Keyed process function
517
abstract class KeyedProcessFunction<K, I, O> extends AbstractRichFunction {
518
abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;
519
void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception;
520
521
abstract class Context {
522
abstract Long timestamp();
523
abstract TimerService timerService();
524
abstract <X> void output(OutputTag<X> outputTag, X value);
525
abstract K getCurrentKey();
526
}
527
528
abstract class OnTimerContext extends Context {
529
abstract TimeDomain timeDomain();
530
}
531
}
532
533
// Co-process function
534
abstract class CoProcessFunction<IN1, IN2, OUT> extends AbstractRichFunction {
535
abstract void processElement1(IN1 value, Context ctx, Collector<OUT> out) throws Exception;
536
abstract void processElement2(IN2 value, Context ctx, Collector<OUT> out) throws Exception;
537
void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception;
538
539
abstract class Context {
540
abstract Long timestamp();
541
abstract TimerService timerService();
542
abstract <X> void output(OutputTag<X> outputTag, X value);
543
}
544
545
abstract class OnTimerContext extends Context {
546
abstract TimeDomain timeDomain();
547
}
548
}
549
550
// Process window function
551
abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> extends AbstractRichFunction {
552
abstract void process(KEY key, Context context, Iterable<IN> elements, Collector<OUT> out) throws Exception;
553
void clear(Context context) throws Exception;
554
555
abstract class Context implements Serializable {
556
abstract W window();
557
abstract long currentProcessingTime();
558
abstract long currentWatermark();
559
abstract KeyedStateStore windowState();
560
abstract KeyedStateStore globalState();
561
abstract <X> void output(OutputTag<X> outputTag, X value);
562
}
563
}
564
```
565
566
### Supporting Types
567
568
```java { .api }
569
// Timer service
570
interface TimerService {
571
long currentProcessingTime();
572
long currentWatermark();
573
void registerProcessingTimeTimer(long time);
574
void registerEventTimeTimer(long time);
575
void deleteProcessingTimeTimer(long time);
576
void deleteEventTimeTimer(long time);
577
}
578
579
// Time domain
580
enum TimeDomain {
581
EVENT_TIME,
582
PROCESSING_TIME
583
}
584
585
// Output tag for side outputs
586
class OutputTag<T> {
587
public OutputTag(String id);
588
public OutputTag(String id, TypeInformation<T> typeInfo);
589
String getId();
590
TypeInformation<T> getTypeInfo();
591
}
592
593
// Collector interface
594
interface Collector<T> {
595
void collect(T record);
596
void close();
597
}
598
```