Apache Flink Core runtime components, type system, and foundational APIs for stream processing applications
npx @tessl/cli install tessl/maven-org-apache-flink--flink-core@2.1.0

Apache Flink Core is the foundational module that provides essential APIs for building distributed data processing applications. It offers a comprehensive set of interfaces and classes for job execution, data transformation, state management, and system configuration.
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;

// Configure execution parameters
ExecutionConfig config = new ExecutionConfig();
config.enableClosureCleaner();
config.setParallelism(4);

// Access the runtime context in functions (requires a Rich* function variant)
public class MyMapFunction extends RichMapFunction<String, String> {
    @Override
    public String map(String value) throws Exception {
        RuntimeContext ctx = getRuntimeContext();
        int parallelism = ctx.getNumberOfParallelSubtasks();
        return value + "_" + parallelism;
    }
}

The function interfaces define user code for data processing pipelines:

import org.apache.flink.api.common.functions.*;
import org.apache.flink.util.Collector;
// Map function for 1-to-1 transformations
public class MyMapFunction implements MapFunction<String, Integer> {
    @Override
    public Integer map(String value) throws Exception {
        return value.length();
    }
}
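Because these interfaces each declare a single abstract method, they can also be supplied as lambda expressions; a minimal equivalent of the map function above (lambdas may require explicit type information for generic result types):

// The same 1-to-1 transformation expressed as a lambda
MapFunction<String, Integer> lengthMapper = value -> value.length();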
// FlatMap function for 1-to-many transformations
public class TokenizerFunction implements FlatMapFunction<String, String> {
    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        for (String word : value.split(" ")) {
            out.collect(word);
        }
    }
}
// Filter function for predicate-based filtering
public class LengthFilter implements FilterFunction<String> {
    @Override
    public boolean filter(String value) throws Exception {
        return value.length() > 3;
    }
}
// Reduce function for aggregations
public class SumReduceFunction implements ReduceFunction<Integer> {
    @Override
    public Integer reduce(Integer value1, Integer value2) throws Exception {
        return value1 + value2;
    }
}

The type information system describes data types for Flink's serializers:

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.common.serialization.SerializationSchema;
// Basic type information
TypeInformation<String> stringType = BasicTypeInfo.STRING_TYPE_INFO;
TypeInformation<Integer> intType = BasicTypeInfo.INT_TYPE_INFO;
// Tuple type information
TupleTypeInfo<Tuple2<String, Integer>> tupleType =
    new TupleTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO);
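For generic types whose parameters are erased at runtime, TypeInformation can also be captured with a TypeHint; a short sketch (assumes an additional import of org.apache.flink.api.common.typeinfo.TypeHint):

// Capture a generic type via an anonymous TypeHint subclass
TypeInformation<Tuple2<String, Integer>> hintedType =
    TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {});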
// Custom serialization schema
public class MySerializationSchema implements SerializationSchema<MyClass> {
    @Override
    public byte[] serialize(MyClass element) {
        // Custom serialization logic
        return element.toString().getBytes();
    }
}

Stateful computation is declared through state descriptors and accessed from the runtime context of a rich function:

import org.apache.flink.api.common.state.*;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.functions.OpenContext;
public class StatefulMapFunction extends RichMapFunction<String, String> {

    private transient ValueState<Integer> countState;

    @Override
    public void open(OpenContext openContext) throws Exception {
        ValueStateDescriptor<Integer> descriptor =
            new ValueStateDescriptor<>("count", Integer.class, 0);
        countState = getRuntimeContext().getState(descriptor);
    }

    @Override
    public String map(String value) throws Exception {
        Integer currentCount = countState.value();
        currentCount++;
        countState.update(currentCount);
        return value + "_" + currentCount;
    }
}

Event-time processing relies on timestamp assignment and watermark generation:

import org.apache.flink.api.common.eventtime.*;

import java.time.Duration;
// Custom watermark strategy
WatermarkStrategy<MyEvent> watermarkStrategy =
    WatermarkStrategy.<MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
        .withTimestampAssigner((event, timestamp) -> event.getTimestamp());
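The built-in strategies can also be combined with idleness handling so that temporarily quiet partitions do not stall the watermark; a small sketch:

// Strictly ascending timestamps, plus idleness detection for quiet partitions
WatermarkStrategy<MyEvent> ascendingWithIdleness =
    WatermarkStrategy.<MyEvent>forMonotonousTimestamps()
        .withTimestampAssigner((event, timestamp) -> event.getTimestamp())
        .withIdleness(Duration.ofMinutes(1));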
// Custom watermark generator
public class MyWatermarkGenerator implements WatermarkGenerator<MyEvent> {

    private long maxTimestamp = Long.MIN_VALUE;
    private final long maxOutOfOrderness = 5000; // 5 seconds

    @Override
    public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        output.emitWatermark(new Watermark(maxTimestamp - maxOutOfOrderness - 1));
    }
}

Data ingestion and output go through the source and sink connector interfaces:

import org.apache.flink.api.connector.source.*;
import org.apache.flink.api.connector.sink2.*;

import java.io.IOException;
// Custom source implementation
public class MySource implements Source<String, MySplit, MySourceEnumState> {

    @Override
    public Boundedness getBoundedness() {
        return Boundedness.CONTINUOUS_UNBOUNDED;
    }

    @Override
    public SourceReader<String, MySplit> createReader(SourceReaderContext readerContext) {
        return new MySourceReader();
    }

    @Override
    public SplitEnumerator<MySplit, MySourceEnumState> createEnumerator(
            SplitEnumeratorContext<MySplit> enumContext) {
        return new MySplitEnumerator();
    }

    // Note: a complete Source also implements restoreEnumerator() and the
    // split/enumerator-state serializer methods, omitted here for brevity.
}
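MySplit above stands in for a custom split type; at a minimum it must implement the SourceSplit interface, roughly as in this sketch:

// Minimal split type: the interface only requires a stable identifier
public class MySplit implements SourceSplit {

    private final String splitId;

    public MySplit(String splitId) {
        this.splitId = splitId;
    }

    @Override
    public String splitId() {
        return splitId;
    }
}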
// Custom sink implementation
public class MySink implements Sink<String> {
    @Override
    public SinkWriter<String> createWriter(InitContext context) throws IOException {
        // MySinkWriter (not shown) implements SinkWriter<String>: write(), flush(), close()
        return new MySinkWriter();
    }
}

Configuration is managed through typed options:

import org.apache.flink.configuration.*;
// Reading configuration values
Configuration config = new Configuration();
int parallelism = config.get(CoreOptions.DEFAULT_PARALLELISM);
String tmpDir = config.get(CoreOptions.TMP_DIRS);

// Setting configuration options
config.set(CoreOptions.DEFAULT_PARALLELISM, 8);
config.set(CoreOptions.TMP_DIRS, "/tmp/flink");
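Values can also be read defensively with the Optional-based accessor, assuming no default should be applied; a small sketch:

// getOptional() returns Optional.empty() when the option has not been set explicitly
int effectiveParallelism = config.getOptional(CoreOptions.DEFAULT_PARALLELISM).orElse(1);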
// Custom configuration options
public static final ConfigOption<String> MY_OPTION =
    ConfigOptions.key("my.custom.option")
        .stringType()
        .defaultValue("default-value")
        .withDescription("Description of my custom option");

Apache Flink Core is organized into several key packages:
org.apache.flink.api.common.* - Core APIs for execution, functions, types, and state
org.apache.flink.api.connector.* - Source and sink connector interfaces
org.apache.flink.configuration.* - Configuration system and options
org.apache.flink.core.* - Core execution, filesystem, I/O, and memory management
org.apache.flink.types.* - Basic data types and utilities
org.apache.flink.util.* - Common utility classes and functions

The module's capabilities span the following areas:

User-defined functions, transformation operators, and function interfaces for data processing pipelines.
Type information system, serializers, and type utilities for handling data types in Flink applications.
Stateful computation APIs, state descriptors, and state backends for managing application state.
Time-based processing, watermark generation, and timestamp assignment for event-time computations.
Source and sink APIs for data ingestion and output, including connector interfaces and utilities.
Configuration management, options, and system settings for Flink applications and clusters.
Job execution, task management, runtime contexts, and execution environments.
Common utility classes, I/O operations, memory management, and filesystem abstractions (see the sketch below).
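As a small illustration of the filesystem and utility layer listed above, the sketch below resolves a path against Flink's FileSystem abstraction; the path itself is only an example:

import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.util.Preconditions;

import java.io.IOException;

public class FileSystemExample {
    public static void main(String[] args) throws IOException {
        // Example path only; any supported URI scheme (file://, hdfs://, s3://, ...) works
        Path path = new Path("file:///tmp/flink-example/input.txt");
        FileSystem fs = path.getFileSystem();
        // Defensive check, illustrating org.apache.flink.util.Preconditions
        Preconditions.checkNotNull(fs, "No filesystem registered for scheme");
        System.out.println(path + " exists: " + fs.exists(path));
    }
}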
To use Apache Flink Core in your project:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>1.18.0</version>
</dependency>

A minimal application that uses the core function APIs (the StreamExecutionEnvironment and DataStream classes below come from the flink-streaming-java module):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
public class BasicFlinkApp {
    public static void main(String[] args) throws Exception {
        // Create execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Create data stream
        DataStream<String> text = env.fromElements("hello", "world", "flink");

        // Apply transformations using Flink Core APIs
        DataStream<Integer> lengths = text.map(new MapFunction<String, Integer>() {
            @Override
            public Integer map(String value) throws Exception {
                return value.length();
            }
        });

        // Output results
        lengths.print();

        // Execute the job
        env.execute("Basic Flink Application");
    }
}

Apache Flink Core provides the foundation for building robust, scalable, and fault-tolerant stream and batch processing applications. The modular design allows developers to use only the components they need while maintaining full compatibility with the broader Flink ecosystem.