# Functions and Operators

Apache Flink Core provides a comprehensive set of user-defined function interfaces and operators for building data transformation pipelines. These APIs enable developers to implement custom business logic for stream and batch processing applications.

## Core Function Interfaces

### MapFunction

Transform elements one-to-one.

```java { .api }
11
import org.apache.flink.api.common.functions.MapFunction;
12
13
// Basic map function
14
public class StringLengthMapper implements MapFunction<String, Integer> {
15
@Override
16
public Integer map(String value) throws Exception {
17
return value.length();
18
}
19
}
20
21
// Rich map function with lifecycle methods
22
public class RichStringMapper extends RichMapFunction<String, String> {
23
private String prefix;
24
25
@Override
26
public void open(OpenContext openContext) throws Exception {
27
// Initialize resources, read configuration
28
prefix = getRuntimeContext().getExecutionConfig()
29
.getGlobalJobParameters().toMap().get("prefix");
30
}
31
32
@Override
33
public String map(String value) throws Exception {
34
return prefix + value;
35
}
36
37
@Override
38
public void close() throws Exception {
39
// Clean up resources
40
}
41
}
42
```

### FlatMapFunction

Transform elements one-to-many.

```java { .api }
49
import org.apache.flink.api.common.functions.FlatMapFunction;
50
import org.apache.flink.util.Collector;
51
52
// Split strings into words
53
public class TokenizerFunction implements FlatMapFunction<String, String> {
54
@Override
55
public void flatMap(String value, Collector<String> out) throws Exception {
56
for (String word : value.split("\\s+")) {
57
if (!word.isEmpty()) {
58
out.collect(word);
59
}
60
}
61
}
62
}
63
64
// Rich flat map function
65
public class RichTokenizerFunction extends RichFlatMapFunction<String, String> {
66
private Pattern pattern;
67
68
@Override
69
public void open(OpenContext openContext) throws Exception {
70
// Compile regex pattern once during initialization
71
pattern = Pattern.compile("\\s+");
72
}
73
74
@Override
75
public void flatMap(String value, Collector<String> out) throws Exception {
76
for (String word : pattern.split(value)) {
77
if (!word.isEmpty()) {
78
out.collect(word.toLowerCase());
79
}
80
}
81
}
82
}
83
```
84
85
### FilterFunction
86
87
Filter elements based on predicates.
88
89
```java { .api }
90
import org.apache.flink.api.common.functions.FilterFunction;
91
92
// Filter strings by length
93
public class LengthFilter implements FilterFunction<String> {
94
private final int minLength;
95
96
public LengthFilter(int minLength) {
97
this.minLength = minLength;
98
}
99
100
@Override
101
public boolean filter(String value) throws Exception {
102
return value.length() >= minLength;
103
}
104
}
105
106
// Rich filter with metrics
107
public class RichLengthFilter extends RichFilterFunction<String> {
108
private Counter filteredCounter;
109
110
@Override
111
public void open(OpenContext openContext) throws Exception {
112
filteredCounter = getRuntimeContext()
113
.getMetricGroup()
114
.counter("filtered_elements");
115
}
116
117
@Override
118
public boolean filter(String value) throws Exception {
119
boolean pass = value.length() >= 5;
120
if (!pass) {
121
filteredCounter.inc();
122
}
123
return pass;
124
}
125
}
126
```

### ReduceFunction

Combine elements of the same type.

```java { .api }
133
import org.apache.flink.api.common.functions.ReduceFunction;
134
135
// Sum integers
136
public class SumReduceFunction implements ReduceFunction<Integer> {
137
@Override
138
public Integer reduce(Integer value1, Integer value2) throws Exception {
139
return value1 + value2;
140
}
141
}
142
143
// Combine objects
144
public class WordCountReduceFunction implements ReduceFunction<Tuple2<String, Integer>> {
145
@Override
146
public Tuple2<String, Integer> reduce(
147
Tuple2<String, Integer> value1,
148
Tuple2<String, Integer> value2) throws Exception {
149
return new Tuple2<>(value1.f0, value1.f1 + value2.f1);
150
}
151
}
152
```

## Advanced Function Interfaces

### JoinFunction and FlatJoinFunction

Join elements from two data streams.

```java { .api }
161
import org.apache.flink.api.common.functions.JoinFunction;
162
import org.apache.flink.api.common.functions.FlatJoinFunction;
163
164
// Simple join function
165
public class UserOrderJoinFunction implements JoinFunction<User, Order, UserOrder> {
166
@Override
167
public UserOrder join(User user, Order order) throws Exception {
168
return new UserOrder(user.getId(), user.getName(), order.getAmount());
169
}
170
}
171
172
// Flat join function producing multiple results
173
public class UserOrderFlatJoinFunction implements FlatJoinFunction<User, Order, String> {
174
@Override
175
public void join(User user, Order order, Collector<String> out) throws Exception {
176
// Output multiple formats for each join
177
out.collect("User: " + user.getName() + " - Order: " + order.getId());
178
out.collect("Amount: " + order.getAmount() + " for " + user.getName());
179
}
180
}
181
```

### CoGroupFunction

Group and process elements from two data streams.

```java { .api }
188
import org.apache.flink.api.common.functions.CoGroupFunction;
189
190
public class UserOrderCoGroupFunction implements
191
CoGroupFunction<User, Order, UserOrderSummary> {
192
193
@Override
194
public void coGroup(Iterable<User> users, Iterable<Order> orders,
195
Collector<UserOrderSummary> out) throws Exception {
196
197
User user = users.iterator().hasNext() ? users.iterator().next() : null;
198
199
if (user != null) {
200
int totalAmount = 0;
201
int orderCount = 0;
202
203
for (Order order : orders) {
204
totalAmount += order.getAmount();
205
orderCount++;
206
}
207
208
out.collect(new UserOrderSummary(user.getId(), orderCount, totalAmount));
209
}
210
}
211
}
212
```
213
214
### GroupReduceFunction
215
216
Reduce groups of elements.
217
218
```java { .api }
219
import org.apache.flink.api.common.functions.GroupReduceFunction;
220
221
public class WordCountGroupReduceFunction implements
222
GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {
223
224
@Override
225
public void reduce(Iterable<Tuple2<String, Integer>> values,
226
Collector<Tuple2<String, Integer>> out) throws Exception {
227
228
String word = null;
229
int count = 0;
230
231
for (Tuple2<String, Integer> value : values) {
232
word = value.f0;
233
count += value.f1;
234
}
235
236
out.collect(new Tuple2<>(word, count));
237
}
238
}
239
```
240
241
### MapPartitionFunction
242
243
Process entire partitions.
244
245
```java { .api }
246
import org.apache.flink.api.common.functions.MapPartitionFunction;
247
248
public class StatisticsMapPartitionFunction implements
249
MapPartitionFunction<Integer, PartitionStatistics> {
250
251
@Override
252
public void mapPartition(Iterable<Integer> values,
253
Collector<PartitionStatistics> out) throws Exception {
254
255
int count = 0;
256
int sum = 0;
257
int min = Integer.MAX_VALUE;
258
int max = Integer.MIN_VALUE;
259
260
for (Integer value : values) {
261
count++;
262
sum += value;
263
min = Math.min(min, value);
264
max = Math.max(max, value);
265
}
266
267
if (count > 0) {
268
double avg = (double) sum / count;
269
out.collect(new PartitionStatistics(count, sum, avg, min, max));
270
}
271
}
272
}
273
```

## Rich Functions

Rich functions provide additional lifecycle methods and access to runtime context.

```java { .api }
280
import org.apache.flink.api.common.functions.RichFunction;
281
import org.apache.flink.api.common.functions.OpenContext;
282
import org.apache.flink.configuration.Configuration;
283
284
public abstract class AbstractRichFunction implements RichFunction {
285
private RuntimeContext runtimeContext;
286
287
@Override
288
public void setRuntimeContext(RuntimeContext runtimeContext) {
289
this.runtimeContext = runtimeContext;
290
}
291
292
@Override
293
public RuntimeContext getRuntimeContext() {
294
return runtimeContext;
295
}
296
297
@Override
298
public void open(OpenContext openContext) throws Exception {
299
// Override in subclasses for initialization
300
}
301
302
@Override
303
public void close() throws Exception {
304
// Override in subclasses for cleanup
305
}
306
}
307
308
// Example rich function implementation
309
public class DatabaseLookupFunction extends RichMapFunction<String, UserProfile> {
310
private DatabaseConnection connection;
311
312
@Override
313
public void open(OpenContext openContext) throws Exception {
314
// Initialize database connection
315
Configuration config = (Configuration) getRuntimeContext()
316
.getExecutionConfig().getGlobalJobParameters();
317
318
String dbUrl = config.getString("db.url", "localhost:5432");
319
connection = new DatabaseConnection(dbUrl);
320
}
321
322
@Override
323
public UserProfile map(String userId) throws Exception {
324
return connection.getUserProfile(userId);
325
}
326
327
@Override
328
public void close() throws Exception {
329
if (connection != null) {
330
connection.close();
331
}
332
}
333
}
334
```

## Function Utilities

### BroadcastVariableInitializer

Transform broadcast variables during initialization.

```java { .api }
343
import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
344
345
public class MapBroadcastInitializer implements
346
BroadcastVariableInitializer<Tuple2<String, Integer>, Map<String, Integer>> {
347
348
@Override
349
public Map<String, Integer> initializeBroadcastVariable(
350
Iterable<Tuple2<String, Integer>> data) {
351
352
Map<String, Integer> map = new HashMap<>();
353
for (Tuple2<String, Integer> tuple : data) {
354
map.put(tuple.f0, tuple.f1);
355
}
356
return map;
357
}
358
}
359
360
// Using broadcast variable in rich function
361
public class EnrichWithBroadcastFunction extends RichMapFunction<String, EnrichedData> {
362
private Map<String, Integer> broadcastMap;
363
364
@Override
365
public void open(OpenContext openContext) throws Exception {
366
// Access broadcast variable
367
broadcastMap = getRuntimeContext()
368
.getBroadcastVariable("config-map");
369
}
370
371
@Override
372
public EnrichedData map(String value) throws Exception {
373
Integer config = broadcastMap.get(value);
374
return new EnrichedData(value, config != null ? config : 0);
375
}
376
}
377
```

### Partitioner

Custom partitioning logic.

```java { .api }
384
import org.apache.flink.api.common.functions.Partitioner;
385
386
public class CustomPartitioner implements Partitioner<String> {
387
@Override
388
public int partition(String key, int numPartitions) {
389
// Custom partitioning logic
390
return Math.abs(key.hashCode()) % numPartitions;
391
}
392
}
393
394
// Hash-based partitioner for specific business logic
395
public class UserIdPartitioner implements Partitioner<String> {
396
@Override
397
public int partition(String userId, int numPartitions) {
398
// Ensure users with similar IDs go to same partition
399
return (userId.hashCode() & Integer.MAX_VALUE) % numPartitions;
400
}
401
}
402
```
403
404
## Runtime Context
405
406
Access runtime information and services within functions.
407
408
```java { .api }
409
import org.apache.flink.api.common.functions.RuntimeContext;
410
import org.apache.flink.metrics.Counter;
411
import org.apache.flink.metrics.MetricGroup;
412
413
public class MetricsAwareFunction extends RichMapFunction<String, String> {
414
private Counter processedCounter;
415
private Counter errorCounter;
416
417
@Override
418
public void open(OpenContext openContext) throws Exception {
419
RuntimeContext ctx = getRuntimeContext();
420
421
// Access task information
422
String taskName = ctx.getTaskName();
423
int subtaskIndex = ctx.getIndexOfThisSubtask();
424
int parallelism = ctx.getNumberOfParallelSubtasks();
425
426
// Create metrics
427
MetricGroup metricGroup = ctx.getMetricGroup();
428
processedCounter = metricGroup.counter("processed");
429
errorCounter = metricGroup.counter("errors");
430
431
// Access state (in keyed operations)
432
ValueStateDescriptor<Integer> descriptor =
433
new ValueStateDescriptor<>("count", Integer.class);
434
ValueState<Integer> countState = ctx.getState(descriptor);
435
}
436
437
@Override
438
public String map(String value) throws Exception {
439
try {
440
processedCounter.inc();
441
// Process value
442
return value.toUpperCase();
443
} catch (Exception e) {
444
errorCounter.inc();
445
throw e;
446
}
447
}
448
}
449
```
450
451
## Aggregate Functions
452
453
For advanced aggregation operations.
454
455
```java { .api }
456
import org.apache.flink.api.common.functions.AggregateFunction;
457
458
// Average aggregate function
459
public class AverageAggregateFunction implements
460
AggregateFunction<Integer, Tuple2<Integer, Integer>, Double> {
461
462
@Override
463
public Tuple2<Integer, Integer> createAccumulator() {
464
return new Tuple2<>(0, 0); // (sum, count)
465
}
466
467
@Override
468
public Tuple2<Integer, Integer> add(Integer value, Tuple2<Integer, Integer> accumulator) {
469
return new Tuple2<>(accumulator.f0 + value, accumulator.f1 + 1);
470
}
471
472
@Override
473
public Double getResult(Tuple2<Integer, Integer> accumulator) {
474
return accumulator.f1 == 0 ? 0.0 : (double) accumulator.f0 / accumulator.f1;
475
}
476
477
@Override
478
public Tuple2<Integer, Integer> merge(Tuple2<Integer, Integer> a, Tuple2<Integer, Integer> b) {
479
return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
480
}
481
}
482
483
// Rich aggregate function with metrics
484
public class RichAverageAggregateFunction extends RichAggregateFunction<Integer, Tuple2<Integer, Integer>, Double> {
485
private Counter aggregationCounter;
486
487
@Override
488
public void open(OpenContext openContext) throws Exception {
489
aggregationCounter = getRuntimeContext()
490
.getMetricGroup()
491
.counter("aggregations");
492
}
493
494
@Override
495
public Tuple2<Integer, Integer> createAccumulator() {
496
return new Tuple2<>(0, 0);
497
}
498
499
@Override
500
public Tuple2<Integer, Integer> add(Integer value, Tuple2<Integer, Integer> accumulator) {
501
aggregationCounter.inc();
502
return new Tuple2<>(accumulator.f0 + value, accumulator.f1 + 1);
503
}
504
505
@Override
506
public Double getResult(Tuple2<Integer, Integer> accumulator) {
507
return accumulator.f1 == 0 ? 0.0 : (double) accumulator.f0 / accumulator.f1;
508
}
509
510
@Override
511
public Tuple2<Integer, Integer> merge(Tuple2<Integer, Integer> a, Tuple2<Integer, Integer> b) {
512
return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
513
}
514
}
515
```
516
517
## Cross Functions
518
519
For Cartesian product operations.
520
521
```java { .api }
522
import org.apache.flink.api.common.functions.CrossFunction;
523
524
public class UserProductCrossFunction implements CrossFunction<User, Product, UserProductPair> {
525
@Override
526
public UserProductPair cross(User user, Product product) throws Exception {
527
return new UserProductPair(
528
user.getId(),
529
product.getId(),
530
calculateCompatibility(user, product)
531
);
532
}
533
534
private double calculateCompatibility(User user, Product product) {
535
// Custom compatibility calculation
536
return Math.random(); // Simplified example
537
}
538
}
539
```

## Best Practices

### Function Design

```java { .api }
546
// Prefer stateless functions when possible
547
public class StatelessTransformFunction implements MapFunction<InputType, OutputType> {
548
@Override
549
public OutputType map(InputType input) throws Exception {
550
// Pure transformation logic without side effects
551
return transform(input);
552
}
553
554
private OutputType transform(InputType input) {
555
// Deterministic transformation
556
return new OutputType(input.getValue() * 2);
557
}
558
}
559
560
// Use rich functions when you need lifecycle management
561
public class ResourceManagedFunction extends RichMapFunction<InputType, OutputType> {
562
private transient ExpensiveResource resource;
563
564
@Override
565
public void open(OpenContext openContext) throws Exception {
566
// Initialize expensive resources once per task
567
resource = new ExpensiveResource();
568
}
569
570
@Override
571
public OutputType map(InputType input) throws Exception {
572
return resource.process(input);
573
}
574
575
@Override
576
public void close() throws Exception {
577
if (resource != null) {
578
resource.cleanup();
579
}
580
}
581
}
582
```

### Exception Handling

```java { .api }
587
public class RobustMapFunction implements MapFunction<String, Result> {
588
@Override
589
public Result map(String value) throws Exception {
590
try {
591
return processValue(value);
592
} catch (IllegalArgumentException e) {
593
// Handle known exceptions gracefully
594
return Result.createErrorResult("Invalid input: " + value);
595
} catch (Exception e) {
596
// Re-throw unexpected exceptions to trigger Flink's fault tolerance
597
throw new Exception("Processing failed for value: " + value, e);
598
}
599
}
600
601
private Result processValue(String value) {
602
// Business logic
603
return new Result(value);
604
}
605
}
606
```

Apache Flink's function interfaces provide a powerful foundation for implementing custom data processing logic. By choosing the appropriate function type and following best practices, you can build efficient, maintainable, and fault-tolerant data processing applications.