Apache Flink Core runtime components, type system, and foundational APIs for stream processing applications
npx @tessl/cli install tessl/maven-org-apache-flink--flink-core@2.1.0
# Apache Flink Core
1
2
Apache Flink Core is the foundational module that provides essential APIs for building distributed data processing applications. It offers a comprehensive set of interfaces and classes for job execution, data transformation, state management, and system configuration.
3
4
## Core Capabilities
5
6
### Job Execution and Runtime Context
7
8
```java { .api }
9
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
12
13
// Configure execution parameters
14
ExecutionConfig config = new ExecutionConfig();
15
config.enableClosureCleaner();
16
config.setParallelism(4);
17
18
// Access runtime context in functions
19
public class MyMapFunction implements MapFunction<String, String> {
20
@Override
21
public String map(String value) throws Exception {
22
RuntimeContext ctx = getRuntimeContext();
23
int parallelism = ctx.getNumberOfParallelSubtasks();
24
return value + "_" + parallelism;
25
}
26
}
27
```
28
29
### User-Defined Functions
30
31
```java { .api }
32
import org.apache.flink.api.common.functions.*;
33
34
// Map function for 1-to-1 transformations
35
public class MyMapFunction implements MapFunction<String, Integer> {
36
@Override
37
public Integer map(String value) throws Exception {
38
return value.length();
39
}
40
}
41
42
// FlatMap function for 1-to-many transformations
43
public class TokenizerFunction implements FlatMapFunction<String, String> {
44
@Override
45
public void flatMap(String value, Collector<String> out) throws Exception {
46
for (String word : value.split(" ")) {
47
out.collect(word);
48
}
49
}
50
}
51
52
// Filter function for predicate-based filtering
53
public class LengthFilter implements FilterFunction<String> {
54
@Override
55
public boolean filter(String value) throws Exception {
56
return value.length() > 3;
57
}
58
}
59
60
// Reduce function for aggregations
61
public class SumReduceFunction implements ReduceFunction<Integer> {
62
@Override
63
public Integer reduce(Integer value1, Integer value2) throws Exception {
64
return value1 + value2;
65
}
66
}
67
```
68
69
### Type System and Serialization
70
71
```java { .api }
72
import org.apache.flink.api.common.typeinfo.TypeInformation;
73
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
74
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
75
import org.apache.flink.api.common.serialization.SimpleStringSchema;
76
77
// Basic type information
78
TypeInformation<String> stringType = BasicTypeInfo.STRING_TYPE_INFO;
79
TypeInformation<Integer> intType = BasicTypeInfo.INT_TYPE_INFO;
80
81
// Tuple type information
82
TupleTypeInfo<Tuple2<String, Integer>> tupleType =
83
new TupleTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO);
84
85
// Custom serialization schema
86
public class MySerializationSchema implements SerializationSchema<MyClass> {
87
@Override
88
public byte[] serialize(MyClass element) {
89
// Custom serialization logic
90
return element.toString().getBytes();
91
}
92
}
93
```
94
95
### State Management
96
97
```java { .api }
98
import org.apache.flink.api.common.state.*;
99
import org.apache.flink.api.common.functions.RichMapFunction;
100
import org.apache.flink.api.common.functions.OpenContext;
101
102
public class StatefulMapFunction extends RichMapFunction<String, String> {
103
private ValueState<Integer> countState;
104
105
@Override
106
public void open(OpenContext openContext) throws Exception {
107
ValueStateDescriptor<Integer> descriptor =
108
new ValueStateDescriptor<>("count", Integer.class, 0);
109
countState = getRuntimeContext().getState(descriptor);
110
}
111
112
@Override
113
public String map(String value) throws Exception {
114
Integer currentCount = countState.value();
115
currentCount++;
116
countState.update(currentCount);
117
return value + "_" + currentCount;
118
}
119
}
120
```
121
122
### Event Time and Watermarks
123
124
```java { .api }
125
import org.apache.flink.api.common.eventtime.*;
126
127
// Custom watermark strategy
128
WatermarkStrategy<MyEvent> watermarkStrategy =
129
WatermarkStrategy.<MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
130
.withTimestampAssigner((event, timestamp) -> event.getTimestamp());
131
132
// Custom watermark generator
133
public class MyWatermarkGenerator implements WatermarkGenerator<MyEvent> {
134
private long maxTimestamp = Long.MIN_VALUE;
135
private final long maxOutOfOrderness = 5000; // 5 seconds
136
137
@Override
138
public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
139
maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
140
}
141
142
@Override
143
public void onPeriodicEmit(WatermarkOutput output) {
144
output.emitWatermark(new Watermark(maxTimestamp - maxOutOfOrderness - 1));
145
}
146
}
147
```
148
149
### Data Sources and Sinks
150
151
```java { .api }
152
import org.apache.flink.api.connector.source.*;
153
import org.apache.flink.api.connector.sink2.*;
154
155
// Custom source implementation
156
public class MySource implements Source<String, MySplit, MySourceEnumState> {
157
@Override
158
public Boundedness getBoundedness() {
159
return Boundedness.CONTINUOUS_UNBOUNDED;
160
}
161
162
@Override
163
public SourceReader<String, MySplit> createReader(SourceReaderContext readerContext) {
164
return new MySourceReader();
165
}
166
167
@Override
168
public SplitEnumerator<MySplit, MySourceEnumState> createEnumerator(
169
SplitEnumeratorContext<MySplit> enumContext) {
170
return new MySplitEnumerator();
171
}
172
}
173
174
// Custom sink implementation
175
public class MySink implements Sink<String> {
176
@Override
177
public SinkWriter<String> createWriter(InitContext context) throws IOException {
178
return new MySinkWriter();
179
}
180
}
181
```
182
183
### Configuration Management
184
185
```java { .api }
186
import org.apache.flink.configuration.*;
187
188
// Reading configuration values.
// The generic get(ConfigOption<T>) accessor is type-safe and replaces the typed
// getInteger/getString variants, which are not available in Flink 2.x.
Configuration config = new Configuration();
int parallelism = config.get(CoreOptions.DEFAULT_PARALLELISM);
String tmpDir = config.get(CoreOptions.TMP_DIRS);

// Setting configuration options with the matching generic set(ConfigOption<T>, T)
config.set(CoreOptions.DEFAULT_PARALLELISM, 8);
config.set(CoreOptions.TMP_DIRS, "/tmp/flink");

// Custom configuration options
public static final ConfigOption<String> MY_OPTION =
        ConfigOptions.key("my.custom.option")
                .stringType()
                .defaultValue("default-value")
                .withDescription("Description of my custom option");
203
```
204
205
## Package Organization
206
207
Apache Flink Core is organized into several key packages:
208
209
- **`org.apache.flink.api.common.*`** - Core APIs for execution, functions, types, and state
210
- **`org.apache.flink.api.connector.*`** - Source and sink connector interfaces
211
- **`org.apache.flink.configuration.*`** - Configuration system and options
212
- **`org.apache.flink.core.*`** - Core execution, filesystem, I/O, and memory management
213
- **`org.apache.flink.types.*`** - Basic data types and utilities
214
- **`org.apache.flink.util.*`** - Common utility classes and functions
215
216
## Detailed Documentation
217
218
### [Functions and Operators](./functions-and-operators.md)
219
User-defined functions, transformation operators, and function interfaces for data processing pipelines.
220
221
### [Type System and Serialization](./type-system-serialization.md)
222
Type information system, serializers, and type utilities for handling data types in Flink applications.
223
224
### [State Management](./state-management.md)
225
Stateful computation APIs, state descriptors, and state backends for managing application state.
226
227
### [Event Time and Watermarks](./event-time-watermarks.md)
228
Time-based processing, watermark generation, and timestamp assignment for event-time computations.
229
230
### [Connectors](./connectors.md)
231
Source and sink APIs for data ingestion and output, including connector interfaces and utilities.
232
233
### [Configuration System](./configuration.md)
234
Configuration management, options, and system settings for Flink applications and clusters.
235
236
### [Execution and Jobs](./execution-jobs.md)
237
Job execution, task management, runtime contexts, and execution environments.
238
239
### [Core Utilities](./utilities.md)
240
Common utility classes, I/O operations, memory management, and filesystem abstractions.
241
242
## Getting Started
243
244
To use Apache Flink Core in your project:
245
246
```xml { .api }
247
<dependency>
248
<groupId>org.apache.flink</groupId>
249
<artifactId>flink-core</artifactId>
250
<version>2.1.0</version>
251
</dependency>
252
```
253
254
### Basic Usage Example
255
256
```java { .api }
257
import org.apache.flink.api.common.functions.MapFunction;
258
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
259
import org.apache.flink.streaming.api.datastream.DataStream;
260
261
public class BasicFlinkApp {
262
public static void main(String[] args) throws Exception {
263
// Create execution environment
264
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
265
266
// Create data stream
267
DataStream<String> text = env.fromElements("hello", "world", "flink");
268
269
// Apply transformations using Flink Core APIs
270
DataStream<Integer> lengths = text.map(new MapFunction<String, Integer>() {
271
@Override
272
public Integer map(String value) throws Exception {
273
return value.length();
274
}
275
});
276
277
// Output results
278
lengths.print();
279
280
// Execute the job
281
env.execute("Basic Flink Application");
282
}
283
}
284
```
285
286
## Key Concepts
287
288
- **Functions**: User-defined transformation logic implemented via function interfaces
289
- **Type Information**: System for managing data types and serialization
290
- **State**: Managed state for stateful computations and fault tolerance
291
- **Event Time**: Processing based on event timestamps rather than processing time
292
- **Watermarks**: Mechanism for handling out-of-order events in event-time processing
293
- **Sources/Sinks**: Interfaces for data ingestion and output
294
- **Configuration**: System for managing application and cluster settings
295
- **Execution**: Runtime system for distributed job execution
296
297
Apache Flink Core provides the foundation for building robust, scalable, and fault-tolerant stream and batch processing applications. The modular design allows developers to use only the components they need while maintaining full compatibility with the broader Flink ecosystem.