# Core Functions & Types

Fundamental function interfaces and the type system that form the building blocks of all Flink applications. This covers user-defined functions, the tuple system, type descriptors, and the core abstractions shared across all Flink APIs.

## Capabilities

### User-Defined Functions

Base interfaces for all user-defined functions in Flink applications.

```java { .api }
/**
 * Base interface for all user-defined functions.
 *
 * <p>Function instances are serialized and shipped to the parallel task instances that run them,
 * so implementations (and everything they capture) must be serializable.
 */
interface Function extends java.io.Serializable {}

/**
17
* Function for 1:1 transformations
18
* @param <T> Input type
19
* @param <O> Output type
20
*/
21
interface MapFunction<T, O> extends Function {
22
/**
23
* Transform a single input element
24
* @param value Input element
25
* @return Transformed element
26
* @throws Exception
27
*/
28
O map(T value) throws Exception;
29
}
30
31
/**
32
* Function for 1:N transformations
33
* @param <T> Input type
34
* @param <O> Output type
35
*/
36
interface FlatMapFunction<T, O> extends Function {
37
/**
38
* Transform one element into zero, one, or more elements
39
* @param value Input element
40
* @param out Collector for output elements
41
* @throws Exception
42
*/
43
void flatMap(T value, Collector<O> out) throws Exception;
44
}
45
46
/**
47
* Function for filtering elements
48
* @param <T> Element type
49
*/
50
interface FilterFunction<T> extends Function {
51
/**
52
* Test whether element should be kept
53
* @param value Element to test
54
* @return true if element should be kept
55
* @throws Exception
56
*/
57
boolean filter(T value) throws Exception;
58
}
59
60
/**
61
* Function for reduce operations on streams/datasets
62
* @param <T> Element type
63
*/
64
interface ReduceFunction<T> extends Function {
65
/**
66
* Combine two elements into one
67
* @param value1 First element
68
* @param value2 Second element
69
* @return Combined element
70
* @throws Exception
71
*/
72
T reduce(T value1, T value2) throws Exception;
73
}
74
75
/**
76
* Function for incremental aggregation operations
77
* @param <IN> Input type
78
* @param <ACC> Accumulator type
79
* @param <OUT> Output type
80
*/
81
interface AggregateFunction<IN, ACC, OUT> extends Function {
82
/**
83
* Create new accumulator
84
* @return New accumulator
85
*/
86
ACC createAccumulator();
87
88
/**
89
* Add input to accumulator
90
* @param accumulator Current accumulator
91
* @param value Input value
92
* @return Updated accumulator
93
*/
94
ACC add(IN value, ACC accumulator);
95
96
/**
97
* Get result from accumulator
98
* @param accumulator Final accumulator
99
* @return Result
100
*/
101
OUT getResult(ACC accumulator);
102
103
/**
104
* Merge two accumulators
105
* @param a First accumulator
106
* @param b Second accumulator
107
* @return Merged accumulator
108
*/
109
ACC merge(ACC a, ACC b);
110
}
111
112
/**
113
* Function for extracting keys from elements
114
* @param <IN> Input type
115
* @param <KEY> Key type
116
*/
117
interface KeySelector<IN, KEY> extends Function {
118
/**
119
* Extract key from element
120
* @param value Input element
121
* @return Key
122
* @throws Exception
123
*/
124
KEY getKey(IN value) throws Exception;
125
}
126
```

### Rich Function Variants

Rich variants of user-defined functions that add lifecycle methods (`open`/`close`) and access to the runtime context, including state, metrics, and configuration.

```java { .api }
/**
134
* Base class for rich user-defined functions
135
*/
136
abstract class AbstractRichFunction implements Function {
137
/**
138
* Get runtime context
139
* @return Runtime context
140
*/
141
public RuntimeContext getRuntimeContext();
142
143
/**
144
* Initialization method called once per parallel instance
145
* @param parameters Configuration parameters
146
* @throws Exception
147
*/
148
public void open(Configuration parameters) throws Exception {}
149
150
/**
151
* Cleanup method called once per parallel instance
152
* @throws Exception
153
*/
154
public void close() throws Exception {}
155
}
156
157
/**
158
* Rich map function with runtime context access
159
* @param <T> Input type
160
* @param <O> Output type
161
*/
162
abstract class RichMapFunction<T, O> extends AbstractRichFunction implements MapFunction<T, O> {}
163
164
/**
165
* Rich flat map function with runtime context access
166
* @param <T> Input type
167
* @param <O> Output type
168
*/
169
abstract class RichFlatMapFunction<T, O> extends AbstractRichFunction implements FlatMapFunction<T, O> {}
170
171
/**
172
* Rich filter function with runtime context access
173
* @param <T> Element type
174
*/
175
abstract class RichFilterFunction<T> extends AbstractRichFunction implements FilterFunction<T> {}
176
177
/**
178
* Rich reduce function with runtime context access
179
* @param <T> Element type
180
*/
181
abstract class RichReduceFunction<T> extends AbstractRichFunction implements ReduceFunction<T> {}
182
183
/**
184
* Rich aggregate function with runtime context access
185
* @param <IN> Input type
186
* @param <ACC> Accumulator type
187
* @param <OUT> Output type
188
*/
189
abstract class RichAggregateFunction<IN, ACC, OUT> extends AbstractRichFunction implements AggregateFunction<IN, ACC, OUT> {}
190
191
/**
192
* Runtime context providing access to state, metrics, and configuration
193
*/
194
interface RuntimeContext {
195
/**
196
* Get task name
197
* @return Task name
198
*/
199
String getTaskName();
200
201
/**
202
* Get parallelism of current operator
203
* @return Parallelism
204
*/
205
int getParallelism();
206
207
/**
208
* Get index of current parallel subtask
209
* @return Subtask index (0-based)
210
*/
211
int getIndexOfThisSubtask();
212
213
/**
214
* Get state for the given descriptor
215
* @param stateDescriptor State descriptor
216
* @param <T> State type
217
* @return State instance
218
*/
219
<T> ValueState<T> getState(ValueStateDescriptor<T> stateDescriptor);
220
221
/**
222
* Get list state for the given descriptor
223
* @param stateDescriptor State descriptor
224
* @param <T> Element type
225
* @return List state instance
226
*/
227
<T> ListState<T> getListState(ListStateDescriptor<T> stateDescriptor);
228
229
/**
230
* Get map state for the given descriptor
231
* @param stateDescriptor State descriptor
232
* @param <UK> User key type
233
* @param <UV> User value type
234
* @return Map state instance
235
*/
236
<UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateDescriptor);
237
}
238
```

### Join and CoGroup Functions

Functions for joining, co-grouping, and crossing two datasets or streams.

```java { .api }
/**
246
* Function for joining two datasets/streams
247
* @param <IN1> First input type
248
* @param <IN2> Second input type
249
* @param <OUT> Output type
250
*/
251
interface JoinFunction<IN1, IN2, OUT> extends Function {
252
/**
253
* Join two elements
254
* @param first Element from first input
255
* @param second Element from second input
256
* @return Joined result
257
* @throws Exception
258
*/
259
OUT join(IN1 first, IN2 second) throws Exception;
260
}
261
262
/**
263
* Function for flat joining two datasets/streams
264
* @param <IN1> First input type
265
* @param <IN2> Second input type
266
* @param <OUT> Output type
267
*/
268
interface FlatJoinFunction<IN1, IN2, OUT> extends Function {
269
/**
270
* Join two elements producing zero, one, or more results
271
* @param first Element from first input
272
* @param second Element from second input
273
* @param out Collector for output elements
274
* @throws Exception
275
*/
276
void join(IN1 first, IN2 second, Collector<OUT> out) throws Exception;
277
}
278
279
/**
280
* Function for co-grouping two datasets/streams
281
* @param <IN1> First input type
282
* @param <IN2> Second input type
283
* @param <OUT> Output type
284
*/
285
interface CoGroupFunction<IN1, IN2, OUT> extends Function {
286
/**
287
* Co-group iterables from both inputs
288
* @param first Iterable from first input
289
* @param second Iterable from second input
290
* @param out Collector for output elements
291
* @throws Exception
292
*/
293
void coGroup(Iterable<IN1> first, Iterable<IN2> second, Collector<OUT> out) throws Exception;
294
}
295
296
/**
297
* Function for cross product operations
298
* @param <IN1> First input type
299
* @param <IN2> Second input type
300
* @param <OUT> Output type
301
*/
302
interface CrossFunction<IN1, IN2, OUT> extends Function {
303
/**
304
* Cross two elements
305
* @param first Element from first input
306
* @param second Element from second input
307
* @return Cross result
308
* @throws Exception
309
*/
310
OUT cross(IN1 first, IN2 second) throws Exception;
311
}
312
313
/**
314
* Rich join function with runtime context access
315
* @param <IN1> First input type
316
* @param <IN2> Second input type
317
* @param <OUT> Output type
318
*/
319
abstract class RichJoinFunction<IN1, IN2, OUT> extends AbstractRichFunction implements JoinFunction<IN1, IN2, OUT> {}
320
321
/**
322
* Rich co-group function with runtime context access
323
* @param <IN1> First input type
324
* @param <IN2> Second input type
325
* @param <OUT> Output type
326
*/
327
abstract class RichCoGroupFunction<IN1, IN2, OUT> extends AbstractRichFunction implements CoGroupFunction<IN1, IN2, OUT> {}
328
```

### Tuple System

Strongly typed tuple classes that hold a fixed number of fields, each with its own type.

```java { .api }
/**
 * Tuple with zero fields.
 *
 * <p>The class carries no data, so all instances are interchangeable. Prefer the shared
 * {@link #INSTANCE} over allocating new ones.
 */
class Tuple0 {
    /** Shared instance; {@code final} so user code cannot replace the singleton. */
    public static final Tuple0 INSTANCE = new Tuple0();

    /** Creates an empty tuple. */
    public Tuple0() {}
}

/**
344
* Tuple with 2 fields
345
* @param <T0> Type of field 0
346
* @param <T1> Type of field 1
347
*/
348
class Tuple2<T0, T1> {
349
public T0 f0;
350
public T1 f1;
351
352
public Tuple2();
353
public Tuple2(T0 f0, T1 f1);
354
355
public static <T0, T1> Tuple2<T0, T1> of(T0 f0, T1 f1);
356
public <T> T getField(int pos);
357
public void setField(Object value, int pos);
358
}
359
360
/**
361
* Tuple with 3 fields
362
* @param <T0> Type of field 0
363
* @param <T1> Type of field 1
364
* @param <T2> Type of field 2
365
*/
366
class Tuple3<T0, T1, T2> {
367
public T0 f0;
368
public T1 f1;
369
public T2 f2;
370
371
public Tuple3();
372
public Tuple3(T0 f0, T1 f1, T2 f2);
373
374
public static <T0, T1, T2> Tuple3<T0, T1, T2> of(T0 f0, T1 f1, T2 f2);
375
}
376
377
// Additional tuple classes follow the same pattern: Tuple1, and Tuple4 through Tuple25
```

### Type System

Runtime type information and descriptors for Flink's type system.

```java { .api }
/**
386
* Type descriptor for runtime type information
387
* @param <T> The type being described
388
*/
389
interface TypeDescriptor<T> {
390
/**
391
* Get type information
392
* @return TypeInformation instance
393
*/
394
TypeInformation<T> getTypeInformation();
395
}
396
397
/**
398
* Factory methods for common type descriptors
399
*/
400
class TypeDescriptors {
401
public static TypeDescriptor<String> STRING;
402
public static TypeDescriptor<Integer> INT;
403
public static TypeDescriptor<Long> LONG;
404
public static TypeDescriptor<Double> DOUBLE;
405
public static TypeDescriptor<Boolean> BOOLEAN;
406
407
public static <T> TypeDescriptor<T[]> array(TypeDescriptor<T> elementType);
408
public static <T> TypeDescriptor<List<T>> list(TypeDescriptor<T> elementType);
409
public static <K, V> TypeDescriptor<Map<K, V>> map(TypeDescriptor<K> keyType, TypeDescriptor<V> valueType);
410
}
411
412
/**
413
* Runtime type information
414
* @param <T> The type
415
*/
416
abstract class TypeInformation<T> {
417
/**
418
* Get type class
419
* @return Class object
420
*/
421
public abstract Class<T> getTypeClass();
422
423
/**
424
* Check if type is basic type
425
* @return true if basic type
426
*/
427
public abstract boolean isBasicType();
428
429
/**
430
* Create serializer for this type
431
* @param config Configuration
432
* @return TypeSerializer instance
433
*/
434
public abstract TypeSerializer<T> createSerializer(ExecutionConfig config);
435
}
436
437
/**
438
* Serializer for data types
439
* @param <T> The type to serialize
440
*/
441
abstract class TypeSerializer<T> {
442
/**
443
* Create copy of element
444
* @param from Source element
445
* @return Copied element
446
*/
447
public abstract T copy(T from);
448
449
/**
450
* Serialize element to DataOutputView
451
* @param record Element to serialize
452
* @param target Output target
453
* @throws IOException
454
*/
455
public abstract void serialize(T record, DataOutputView target) throws IOException;
456
457
/**
458
* Deserialize element from DataInputView
459
* @param source Input source
460
* @return Deserialized element
461
* @throws IOException
462
*/
463
public abstract T deserialize(DataInputView source) throws IOException;
464
}
465
```

### Execution Configuration

Execution mode and slot-sharing (resource sharing) settings.

```java { .api }
/**
 * Execution mode of a Flink application.
 */
enum RuntimeExecutionMode {
    /** Streaming execution mode. */
    STREAMING,
    /** Batch execution mode; requires all sources to be bounded. */
    BATCH,
    /** Chooses {@link #BATCH} if all sources are bounded, otherwise {@link #STREAMING}. */
    AUTOMATIC
}

/**
485
* Configuration for resource sharing between operators
486
*/
487
class SlotSharingGroup {
488
/**
489
* Create slot sharing group with name
490
* @param name Group name
491
*/
492
public SlotSharingGroup(String name);
493
494
/**
495
* Get group name
496
* @return Group name
497
*/
498
public String getName();
499
}
500
```

### Utility Interfaces

Core utility types used throughout the Flink APIs.

```java { .api }
/**
 * Sink that user functions emit output elements into.
 *
 * @param <T> Element type
 */
interface Collector<T> {
    /**
     * Emits an element.
     *
     * @param record Element to emit
     */
    void collect(T record);

    /**
     * Closes the collector.
     */
    void close();
}

/**
 * Iterator that holds resources, which are released by {@link #close()}.
 *
 * <p>Being {@link AutoCloseable}, it works with try-with-resources. Its {@code close()} declares no
 * checked exception, so callers need not catch one.
 *
 * @param <E> Element type
 */
interface CloseableIterator<E> extends Iterator<E>, AutoCloseable {
    /** Releases any resources held by this iterator. */
    @Override
    void close();
}

/**
534
* Tag for side outputs in DataStream API
535
* @param <T> Type of side output
536
*/
537
class OutputTag<T> {
538
/**
539
* Create output tag with ID and type information
540
* @param id Unique identifier
541
* @param typeInfo Type information
542
*/
543
public OutputTag(String id, TypeInformation<T> typeInfo);
544
545
/**
546
* Create output tag with ID (type inferred)
547
* @param id Unique identifier
548
*/
549
public OutputTag(String id);
550
551
/**
552
* Get tag ID
553
* @return Tag identifier
554
*/
555
public String getId();
556
557
/**
558
* Get type information
559
* @return Type information
560
*/
561
public TypeInformation<T> getTypeInfo();
562
}
563
```

### Exception Types

Common exception types used in Flink applications.

```java { .api }
/**
 * Base class for checked, Flink-specific exceptions.
 */
class FlinkException extends Exception {
    /**
     * Creates an exception with a detail message.
     *
     * @param message Detail message
     */
    public FlinkException(String message) {
        super(message);
    }

    /**
     * Creates an exception that wraps a cause.
     *
     * @param cause Underlying cause
     */
    public FlinkException(Throwable cause) {
        super(cause);
    }

    /**
     * Creates an exception with a detail message and a cause.
     *
     * @param message Detail message
     * @param cause Underlying cause
     */
    public FlinkException(String message, Throwable cause) {
        super(message, cause);
    }
}

/**
579
* Exception thrown when accessing null fields
580
*/
581
class NullFieldException extends RuntimeException {
582
public NullFieldException(int fieldPos);
583
public NullFieldException(String fieldName);
584
public int getFieldPos();
585
}
586
```

### Supporting Types

Supporting types used by the core API.

```java { .api }
/**
594
* Execution configuration for Flink programs
595
*/
596
class ExecutionConfig {
597
/**
598
* Set parallelism
599
* @param parallelism Parallelism level
600
*/
601
public void setParallelism(int parallelism);
602
603
/**
604
* Get parallelism
605
* @return Parallelism level
606
*/
607
public int getParallelism();
608
609
/**
610
* Enable/disable object reuse
611
* @param objectReuse Whether to reuse objects
612
*/
613
public void enableObjectReuse(boolean objectReuse);
614
615
/**
616
* Check if object reuse is enabled
617
* @return true if object reuse enabled
618
*/
619
public boolean isObjectReuseEnabled();
620
}
621
622
/**
 * Output view for writing binary data (mirrors the corresponding {@link java.io.DataOutput}
 * methods).
 */
interface DataOutputView {
    /**
     * Writes the low-order 8 bits of {@code b} as a single byte.
     *
     * @param b Byte value, passed as an int
     * @throws IOException if writing fails
     */
    void writeByte(int b) throws IOException;

    /**
     * Writes an int value.
     *
     * @param v Int value
     * @throws IOException if writing fails
     */
    void writeInt(int v) throws IOException;

    /**
     * Writes a long value.
     *
     * @param v Long value
     * @throws IOException if writing fails
     */
    void writeLong(long v) throws IOException;

    /**
     * Writes all bytes of the given array.
     *
     * @param b Byte array
     * @throws IOException if writing fails
     */
    void write(byte[] b) throws IOException;
}

/**
 * Input view for reading binary data (mirrors the corresponding {@link java.io.DataInput}
 * methods).
 */
interface DataInputView {
    /**
     * Reads a single byte.
     *
     * <p>Returns {@code byte}, matching {@link java.io.DataInput#readByte()}.
     *
     * @return Byte value
     * @throws IOException if reading fails
     */
    byte readByte() throws IOException;

    /**
     * Reads an int value.
     *
     * @return Int value
     * @throws IOException if reading fails
     */
    int readInt() throws IOException;

    /**
     * Reads a long value.
     *
     * @return Long value
     * @throws IOException if reading fails
     */
    long readLong() throws IOException;

    /**
     * Reads up to {@code b.length} bytes into the array.
     *
     * @param b Destination byte array
     * @return Number of bytes actually read, or -1 at the end of the stream
     * @throws IOException if reading fails
     */
    int read(byte[] b) throws IOException;
}
```