# User-Defined Functions

Function interfaces for implementing custom transformation logic. These abstract base classes define the contracts for various processing patterns, enabling users to implement custom business logic while leveraging Flink's distributed execution capabilities.

## Capabilities

### Base Function Interface

#### Base Function Class

Abstract base class for all user-defined functions providing common infrastructure.

```python { .api }
class Function:
    """
    Abstract base class for all user-defined functions.

    Provides common functionality for function execution including
    configuration, lifecycle management, and error handling.
    """

    def _run(self):
        """Abstract method implemented by subclasses for function execution."""

    def _configure(self, input_file, output_file, mmap_size, port, env, info, subtask_index):
        """Sets up function execution context with runtime parameters."""

    def _close(self):
        """Cleanup method called after function execution."""
```

### Transformation Functions

#### Map Function

Transforms each input element to exactly one output element.

```python { .api }
class MapFunction(Function):
    def map(self, value):
        """
        Transforms a single input element to a single output element.

        Parameters:
            value: Input element of any type

        Returns:
            Transformed element (can be a different type)
        """

    def collect(self, value):
        """
        Internal method for collecting transformed values.

        Parameters:
            value: Input value to transform and collect
        """
```

#### Flat Map Function

Transforms each input element to zero or more output elements.

```python { .api }
class FlatMapFunction(Function):
    def flat_map(self, value, collector):
        """
        Transforms a single input element to zero or more output elements.

        Parameters:
            value: Input element
            collector: Output collector - call collector.collect(output) for each result
        """

    def collect(self, value):
        """
        Internal method for collecting values using the flat_map transformation.

        Parameters:
            value: Input value to transform via flat_map
        """
```

#### Filter Function

Determines whether elements should be included in the result.

```python { .api }
class FilterFunction(Function):
    def filter(self, value):
        """
        Predicate function to include/exclude elements.

        Parameters:
            value: Input element to test

        Returns:
            bool: True to include element, False to exclude
        """
```

#### Map Partition Function

Processes entire partitions of data rather than individual elements.

```python { .api }
class MapPartitionFunction(Function):
    def map_partition(self, iterator, collector):
        """
        Processes an entire partition of elements.

        Allows for more efficient processing when setup/cleanup costs are high
        or when processing requires access to multiple elements.

        Parameters:
            iterator: Iterator over all elements in the partition
            collector: Output collector - call collector.collect(output) for each result
        """
```

### Reduction Functions

#### Reduce Function

Combines two elements into one of the same type.

```python { .api }
class ReduceFunction(Function):
    def reduce(self, value1, value2):
        """
        Combines two elements into one of the same type.

        This function is applied associatively to reduce a set of elements
        down to a single element.

        Parameters:
            value1: First element to combine
            value2: Second element to combine

        Returns:
            Combined element of the same type as inputs
        """

    def combine(self, value1, value2):
        """
        Optional combiner function for partial aggregation.

        Used for optimization - should have same semantics as reduce().

        Parameters:
            value1: First element to combine
            value2: Second element to combine

        Returns:
            Combined element of the same type as inputs
        """
```

#### Group Reduce Function

Processes groups of elements with the same key.

```python { .api }
class GroupReduceFunction(Function):
    def reduce(self, iterator, collector):
        """
        Processes a group of elements, emitting zero or more results.

        Called once per group (or once for the entire DataSet if not grouped).
        Can iterate over all elements in the group and emit any number of results.

        Parameters:
            iterator: Iterator over all elements in the group
            collector: Output collector - call collector.collect(output) for each result
        """

    def combine(self, iterator, collector):
        """
        Optional combiner for partial aggregation within partitions.

        Used for optimization - should produce partial results that can be
        further reduced by the reduce() method.

        Parameters:
            iterator: Iterator over elements in the partition
            collector: Output collector for partial results
        """
```
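To make the reduce()/combine() contract concrete, here is a standalone sketch. The base class and collector below are minimal stand-ins so the logic runs outside Flink (in a real job you would subclass `flink.functions.GroupReduceFunction`), and the `(word, count)` pairs are purely illustrative:

```python
class GroupReduceFunction:
    """Minimal stand-in for the Flink base class (sketch only)."""

class ListCollector:
    """Minimal stand-in for the runtime's output collector (sketch only)."""
    def __init__(self):
        self.out = []

    def collect(self, value):
        self.out.append(value)

class CountSummer(GroupReduceFunction):
    def reduce(self, iterator, collector):
        # Sum the counts of (word, count) pairs belonging to one group.
        word, total = None, 0
        for w, c in iterator:
            word, total = w, total + c
        collector.collect((word, total))

    def combine(self, iterator, collector):
        # Partial pre-aggregation: input and output have the same shape,
        # so combine()'s output can be fed back into reduce() later.
        self.reduce(iterator, collector)

fn = CountSummer()

# The runtime may first combine() partition-local subsets...
partial = ListCollector()
fn.combine(iter([("flink", 1), ("flink", 1)]), partial)
# partial.out == [("flink", 2)]

# ...and then reduce() the partial results together with remaining elements.
final = ListCollector()
fn.reduce(iter(partial.out + [("flink", 1)]), final)
# final.out == [("flink", 3)]
```

The key property this sketch demonstrates: because combine() emits the same element shape it consumes, the runtime is free to apply it zero or more times before the final reduce().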

### Multi-Input Functions

#### Join Function

Combines matching elements from two DataSets.

```python { .api }
class JoinFunction(Function):
    def join(self, value1, value2):
        """
        Combines matching elements from two DataSets.

        Called for each pair of elements with matching keys.

        Parameters:
            value1: Element from first DataSet
            value2: Element from second DataSet

        Returns:
            Combined element (can be any type)
        """
```

#### CoGroup Function

Processes groups from two DataSets with matching keys.

```python { .api }
class CoGroupFunction(Function):
    def co_group(self, iterator1, iterator2, collector):
        """
        Processes groups from two DataSets with the same key.

        Called once per key, even if one or both groups are empty.
        Useful for implementing outer joins and complex multi-input operations.

        Parameters:
            iterator1: Iterator over elements from first DataSet with this key
            iterator2: Iterator over elements from second DataSet with this key
            collector: Output collector for results
        """
```

#### Cross Function

Combines elements in a cross product (Cartesian product).

```python { .api }
class CrossFunction(Function):
    def cross(self, value1, value2):
        """
        Combines elements in cross product.

        Called for every combination of elements from two DataSets.

        Parameters:
            value1: Element from first DataSet
            value2: Element from second DataSet

        Returns:
            Combined element (can be any type)
        """
```
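A minimal sketch of the contract, with a stub in place of the real base-class import so it runs standalone; in a job, the runtime invokes cross() once per combination, which the list comprehension below simulates:

```python
# In a real job: from flink.functions.CrossFunction import CrossFunction
class CrossFunction:
    """Minimal stand-in for the Flink base class (sketch only)."""

class VariantPair(CrossFunction):
    def cross(self, value1, value2):
        # Pair every element of one DataSet with every element of the other,
        # e.g. to enumerate product variants.
        return (value1, value2)

colors = ["red", "blue"]
sizes = ["S", "M"]
fn = VariantPair()
pairs = [fn.cross(c, s) for c in colors for s in sizes]
# pairs == [("red", "S"), ("red", "M"), ("blue", "S"), ("blue", "M")]
```

Wiring this into a job presumably follows the same pattern as the join example further below, e.g. `dataset1.cross(dataset2).using(VariantPair())`.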

### Key Selection Functions

#### Key Selector Function

Extracts keys from elements for grouping and joining operations.

```python { .api }
class KeySelectorFunction(Function):
    def get_key(self, value):
        """
        Extracts key from element for grouping/joining.

        Parameters:
            value: Input element

        Returns:
            Key value used for grouping/joining
        """
```
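A minimal sketch, again with a stub base class so it runs standalone; the `(name, country)` tuple shape is an assumption for illustration:

```python
# In a real job: from flink.functions.KeySelectorFunction import KeySelectorFunction
class KeySelectorFunction:
    """Minimal stand-in for the Flink base class (sketch only)."""

class CountryKey(KeySelectorFunction):
    def get_key(self, value):
        # Assumes (name, country) tuples; the country is the grouping key.
        return value[1]

fn = CountryKey()
key = fn.get_key(("Alice", "DE"))
# key == "DE"
```

In a job the selector would be passed wherever a key is expected (e.g. `people.group_by(CountryKey())`), though whether selector instances are accepted there depends on the API version.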

### Runtime Context

Provides runtime information and services to functions.

```python { .api }
class RuntimeContext:
    def get_broadcast_variable(self, name):
        """
        Accesses broadcast variable by name.

        Parameters:
            name (str): Name of the broadcast variable

        Returns:
            Broadcast variable value
        """

    def get_index_of_this_subtask(self):
        """
        Gets index of current parallel subtask.

        Returns:
            int: Zero-based subtask index
        """
```
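How a function obtains its RuntimeContext is version-specific; the sketch below stubs the context and injects it through the constructor purely to illustrate the two accessor methods. The `ThresholdFilter` class and the injection mechanism are assumptions for illustration, not part of the documented API:

```python
class RuntimeContext:
    """Minimal stand-in mirroring the interface above (sketch only)."""
    def __init__(self, broadcast_vars, subtask_index):
        self._vars = broadcast_vars
        self._index = subtask_index

    def get_broadcast_variable(self, name):
        return self._vars[name]

    def get_index_of_this_subtask(self):
        return self._index

class ThresholdFilter:
    """Hypothetical filter that reads its threshold from a broadcast variable."""
    def __init__(self, context):
        # Constructor injection is for illustration only; the real runtime
        # provides the context to the function itself.
        self.context = context

    def filter(self, value):
        # Broadcast variables let every parallel subtask share a small dataset.
        return value >= self.context.get_broadcast_variable("threshold")

ctx = RuntimeContext({"threshold": 10}, subtask_index=0)
f = ThresholdFilter(ctx)
kept = [v for v in [3, 10, 42] if f.filter(v)]
# kept == [10, 42]
```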

## Aggregation Functions

### Built-in Aggregation Types

```python { .api }
class Sum:
    """Aggregation for summing numeric values."""

class Min:
    """Aggregation for finding minimum values."""

class Max:
    """Aggregation for finding maximum values."""
```

### Aggregation Function Builder

```python { .api }
class AggregationFunction:
    """Combines multiple aggregations on different fields."""

    def add_aggregation(self, aggregation, field):
        """
        Adds an additional aggregation on a different field.

        Parameters:
            aggregation (Aggregation): Aggregation type (Sum, Min, Max)
            field (int): Field index to aggregate

        Returns:
            AggregationFunction: Self for method chaining
        """
```
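As a plain-Python sketch of what a Sum aggregation on field 0 chained with `add_aggregation(Min, 1)` computes; the data and field indices are made up for illustration, and the real AggregationFunction is applied through the DataSet API rather than invoked directly:

```python
data = [(1, 5.0), (2, 3.0), (3, 8.0)]

# One output tuple with one slot per chained aggregation:
total = sum(row[0] for row in data)    # Sum over field 0
minimum = min(row[1] for row in data)  # Min over field 1
result = (total, minimum)
# result == (6, 3.0)
```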

## Usage Examples

### Simple Map Function

```python
from flink.functions.MapFunction import MapFunction

class DoubleValue(MapFunction):
    def map(self, value):
        return value * 2

# Usage
data = env.from_elements(1, 2, 3, 4, 5)
doubled = data.map(DoubleValue())

# Or using lambda
doubled = data.map(lambda x: x * 2)
```

### Flat Map for Tokenization

```python
from flink.functions.FlatMapFunction import FlatMapFunction

class Tokenizer(FlatMapFunction):
    def flat_map(self, line, collector):
        words = line.lower().split()
        for word in words:
            collector.collect(word)

# Usage
text = env.from_elements("hello world", "flink python", "data processing")
words = text.flat_map(Tokenizer())
```

### Custom Filter Function

```python
from flink.functions.FilterFunction import FilterFunction

class EvenNumberFilter(FilterFunction):
    def filter(self, value):
        return value % 2 == 0

# Usage
numbers = env.from_elements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
evens = numbers.filter(EvenNumberFilter())
```

### Group Reduce for Word Counting

```python
from flink.functions.GroupReduceFunction import GroupReduceFunction

class WordCounter(GroupReduceFunction):
    def reduce(self, iterator, collector):
        word = None
        count = 0

        for element in iterator:
            if word is None:
                word = element
            count += 1

        collector.collect((word, count))

# Usage
words = text.flat_map(Tokenizer())
word_counts = words.group_by(0).reduce_group(WordCounter())
```

### Custom Reduce Function

```python
from flink.functions.ReduceFunction import ReduceFunction

class SumReduce(ReduceFunction):
    def reduce(self, value1, value2):
        return value1 + value2

    def combine(self, value1, value2):
        # Same implementation for this simple case
        return value1 + value2

# Usage
numbers = env.from_elements(1, 2, 3, 4, 5)
total = numbers.reduce(SumReduce())
```

### Map Partition for Batch Processing

```python
from flink.functions.MapPartitionFunction import MapPartitionFunction

class BatchProcessor(MapPartitionFunction):
    def map_partition(self, iterator, collector):
        # Setup expensive resources once per partition
        processor = ExpensiveProcessor()

        batch = []
        for element in iterator:
            batch.append(element)

            # Process in batches of 100
            if len(batch) >= 100:
                results = processor.process_batch(batch)
                for result in results:
                    collector.collect(result)
                batch = []

        # Process remaining elements
        if batch:
            results = processor.process_batch(batch)
            for result in results:
                collector.collect(result)

        # Cleanup
        processor.close()

# Usage
large_dataset = env.read_csv("large_file.csv", [str, int, float])
processed = large_dataset.map_partition(BatchProcessor())
```

### Custom Join Function

```python
from flink.functions.JoinFunction import JoinFunction

class CustomerOrderJoin(JoinFunction):
    def join(self, customer, order):
        return {
            'customer_id': customer[0],
            'customer_name': customer[1],
            'order_id': order[0],
            'order_amount': order[2]
        }

# Usage
customers = env.read_csv("customers.csv", [str, str])
orders = env.read_csv("orders.csv", [int, str, float])

result = customers.join(orders) \
    .where(0) \
    .equal_to(1) \
    .using(CustomerOrderJoin())
```

### CoGroup for Outer Join

```python
from flink.functions.CoGroupFunction import CoGroupFunction

class LeftOuterJoin(CoGroupFunction):
    def co_group(self, iterator1, iterator2, collector):
        left_items = list(iterator1)
        right_items = list(iterator2)

        if not left_items:
            return  # No items in left dataset for this key

        if not right_items:
            # Left outer join - emit left items with null right side
            for left_item in left_items:
                collector.collect((left_item, None))
        else:
            # Inner join - emit all combinations
            for left_item in left_items:
                for right_item in right_items:
                    collector.collect((left_item, right_item))

# Usage
result = dataset1.co_group(dataset2) \
    .where(0) \
    .equal_to(0) \
    .using(LeftOuterJoin())
```