tessl/maven-org-apache-flink--flink-core

Apache Flink Core runtime components, type system, and foundational APIs for stream processing applications

Describes: pkg:maven/org.apache.flink/flink-core@2.1.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-flink--flink-core@2.1.0

Apache Flink Core

Apache Flink Core is the foundational module that provides essential APIs for building distributed data processing applications. It offers a comprehensive set of interfaces and classes for job execution, data transformation, state management, and system configuration.

Core Capabilities

Job Execution and Runtime Context

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;

// Configure execution parameters
ExecutionConfig config = new ExecutionConfig();
config.enableClosureCleaner();
config.setParallelism(4);

// Access the runtime context inside a rich function (a plain MapFunction
// has no getRuntimeContext(); extending RichMapFunction provides it)
public class MyMapFunction extends RichMapFunction<String, String> {
    @Override
    public String map(String value) throws Exception {
        RuntimeContext ctx = getRuntimeContext();
        int parallelism = ctx.getNumberOfParallelSubtasks();
        return value + "_" + parallelism;
    }
}
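
The runtime context also exposes accumulators for collecting simple job-wide statistics. A minimal sketch, assuming an accumulator registered under the illustrative name "records" and read from the JobExecutionResult after the job finishes:

import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.functions.RichMapFunction;

// Counts processed records; the total becomes available via
// JobExecutionResult.getAccumulatorResult("records") once the job completes
public class CountingMapFunction extends RichMapFunction<String, String> {
    private final IntCounter recordCount = new IntCounter();

    @Override
    public void open(OpenContext openContext) throws Exception {
        // Register the accumulator under a job-wide name ("records" is illustrative)
        getRuntimeContext().addAccumulator("records", recordCount);
    }

    @Override
    public String map(String value) throws Exception {
        recordCount.add(1);
        return value;
    }
}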

User-Defined Functions

import org.apache.flink.api.common.functions.*;
import org.apache.flink.util.Collector;

// Map function for 1-to-1 transformations
public class MyMapFunction implements MapFunction<String, Integer> {
    @Override
    public Integer map(String value) throws Exception {
        return value.length();
    }
}

// FlatMap function for 1-to-many transformations
public class TokenizerFunction implements FlatMapFunction<String, String> {
    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        for (String word : value.split(" ")) {
            out.collect(word);
        }
    }
}

// Filter function for predicate-based filtering
public class LengthFilter implements FilterFunction<String> {
    @Override
    public boolean filter(String value) throws Exception {
        return value.length() > 3;
    }
}

// Reduce function for aggregations
public class SumReduceFunction implements ReduceFunction<Integer> {
    @Override
    public Integer reduce(Integer value1, Integer value2) throws Exception {
        return value1 + value2;
    }
}
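
Beyond ReduceFunction, flink-core also defines AggregateFunction, which keeps an explicit accumulator separate from the input and output types. A sketch computing an average (the {sum, count} accumulator layout is just one possible choice):

import org.apache.flink.api.common.functions.AggregateFunction;

// Aggregate function computing a running average of integers
public class AverageAggregate implements AggregateFunction<Integer, long[], Double> {
    @Override
    public long[] createAccumulator() {
        return new long[] {0L, 0L}; // {sum, count}
    }

    @Override
    public long[] add(Integer value, long[] acc) {
        acc[0] += value;
        acc[1] += 1;
        return acc;
    }

    @Override
    public Double getResult(long[] acc) {
        return acc[1] == 0 ? 0.0 : (double) acc[0] / acc[1];
    }

    @Override
    public long[] merge(long[] a, long[] b) {
        a[0] += b[0];
        a[1] += b[1];
        return a;
    }
}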

Type System and Serialization

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.common.serialization.SerializationSchema;

// Basic type information
TypeInformation<String> stringType = BasicTypeInfo.STRING_TYPE_INFO;
TypeInformation<Integer> intType = BasicTypeInfo.INT_TYPE_INFO;

// Tuple type information
TupleTypeInfo<Tuple2<String, Integer>> tupleType = 
    new TupleTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO);

// Custom serialization schema
public class MySerializationSchema implements SerializationSchema<MyClass> {
    @Override
    public byte[] serialize(MyClass element) {
        // Custom serialization logic
        return element.toString().getBytes();
    }
}
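
The read-side counterpart is DeserializationSchema. A minimal sketch that decodes UTF-8 bytes back into strings:

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

// Deserialization schema turning raw bytes into strings
public class MyDeserializationSchema implements DeserializationSchema<String> {
    @Override
    public String deserialize(byte[] message) throws IOException {
        return new String(message, StandardCharsets.UTF_8);
    }

    @Override
    public boolean isEndOfStream(String nextElement) {
        return false; // treat the input as unbounded
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return TypeInformation.of(String.class);
    }
}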

State Management

import org.apache.flink.api.common.state.*;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.functions.OpenContext;

public class StatefulMapFunction extends RichMapFunction<String, String> {
    private ValueState<Integer> countState;
    
    @Override
    public void open(OpenContext openContext) throws Exception {
        ValueStateDescriptor<Integer> descriptor =
            new ValueStateDescriptor<>("count", Integer.class);
        countState = getRuntimeContext().getState(descriptor);
    }
    
    @Override
    public String map(String value) throws Exception {
        // value() returns null until the state has been written for the current key
        Integer currentCount = countState.value();
        currentCount = (currentCount == null) ? 1 : currentCount + 1;
        countState.update(currentCount);
        return value + "_" + currentCount;
    }
}
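
Keyed state such as the ValueState above requires the function to run on a keyed stream. Other state primitives follow the same descriptor pattern; a sketch using ListState and MapState (state names and logic are illustrative):

import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.util.Collector;

public class BufferingFunction extends RichFlatMapFunction<String, String> {
    private ListState<String> buffer;
    private MapState<String, Long> counts;

    @Override
    public void open(OpenContext openContext) throws Exception {
        buffer = getRuntimeContext().getListState(
            new ListStateDescriptor<>("buffer", String.class));
        counts = getRuntimeContext().getMapState(
            new MapStateDescriptor<>("counts", String.class, Long.class));
    }

    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        buffer.add(value);          // append to the per-key list
        Long seen = counts.get(value);
        counts.put(value, seen == null ? 1L : seen + 1);
        out.collect(value);
    }
}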

Event Time and Watermarks

import org.apache.flink.api.common.eventtime.*;
import java.time.Duration;

// Custom watermark strategy
WatermarkStrategy<MyEvent> watermarkStrategy = 
    WatermarkStrategy.<MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
        .withTimestampAssigner((event, timestamp) -> event.getTimestamp());

// Custom watermark generator
public class MyWatermarkGenerator implements WatermarkGenerator<MyEvent> {
    private long maxTimestamp = Long.MIN_VALUE;
    private final long maxOutOfOrderness = 5000; // 5 seconds
    
    @Override
    public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }
    
    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        output.emitWatermark(new Watermark(maxTimestamp - maxOutOfOrderness - 1));
    }
}
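
A custom generator like the one above is wired into a WatermarkStrategy via forGenerator; the sketch below also marks idle inputs (the one-minute idleness threshold is an arbitrary choice):

import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;

// Strategy built from the custom generator, with idleness handling
WatermarkStrategy<MyEvent> customStrategy =
    WatermarkStrategy.<MyEvent>forGenerator(context -> new MyWatermarkGenerator())
        .withTimestampAssigner((event, timestamp) -> event.getTimestamp())
        .withIdleness(Duration.ofMinutes(1));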

Data Sources and Sinks

import org.apache.flink.api.connector.source.*;
import org.apache.flink.api.connector.sink2.*;
import java.io.IOException;

// Custom source implementation (restoreEnumerator and the split/enumerator
// checkpoint serializer methods required by Source are omitted for brevity)
public class MySource implements Source<String, MySplit, MySourceEnumState> {
    @Override
    public Boundedness getBoundedness() {
        return Boundedness.CONTINUOUS_UNBOUNDED;
    }
    
    @Override
    public SourceReader<String, MySplit> createReader(SourceReaderContext readerContext) {
        return new MySourceReader();
    }
    
    @Override
    public SplitEnumerator<MySplit, MySourceEnumState> createEnumerator(
            SplitEnumeratorContext<MySplit> enumContext) {
        return new MySplitEnumerator();
    }
}

// Custom sink implementation  
public class MySink implements Sink<String> {
    @Override
    public SinkWriter<String> createWriter(WriterInitContext context) throws IOException {
        return new MySinkWriter();
    }
}
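
The SinkWriter referenced by MySink could look roughly like this sketch, which simply prints each record (a real writer would buffer and flush to an external system):

import java.io.IOException;
import org.apache.flink.api.connector.sink2.SinkWriter;

public class MySinkWriter implements SinkWriter<String> {
    @Override
    public void write(String element, Context context) throws IOException, InterruptedException {
        System.out.println(element); // replace with real output logic
    }

    @Override
    public void flush(boolean endOfInput) throws IOException, InterruptedException {
        // nothing buffered in this sketch
    }

    @Override
    public void close() throws Exception {
        // no resources to release
    }
}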

Configuration Management

import org.apache.flink.configuration.*;

// Reading configuration values
Configuration config = new Configuration();
int parallelism = config.get(CoreOptions.DEFAULT_PARALLELISM);
String tmpDir = config.get(CoreOptions.TMP_DIRS);

// Setting configuration options
config.set(CoreOptions.DEFAULT_PARALLELISM, 8);
config.set(CoreOptions.TMP_DIRS, "/tmp/flink");

// Custom configuration options
public static final ConfigOption<String> MY_OPTION = 
    ConfigOptions.key("my.custom.option")
        .stringType()
        .defaultValue("default-value")
        .withDescription("Description of my custom option");

Package Organization

Apache Flink Core is organized into several key packages:

  • org.apache.flink.api.common.* - Core APIs for execution, functions, types, and state
  • org.apache.flink.api.connector.* - Source and sink connector interfaces
  • org.apache.flink.configuration.* - Configuration system and options
  • org.apache.flink.core.* - Core execution, filesystem, I/O, and memory management
  • org.apache.flink.types.* - Basic data types and utilities
  • org.apache.flink.util.* - Common utility classes and functions

Detailed Documentation

Functions and Operators

User-defined functions, transformation operators, and function interfaces for data processing pipelines.

Type System and Serialization

Type information system, serializers, and type utilities for handling data types in Flink applications.

State Management

Stateful computation APIs, state descriptors, and state backends for managing application state.

Event Time and Watermarks

Time-based processing, watermark generation, and timestamp assignment for event-time computations.

Connectors

Source and sink APIs for data ingestion and output, including connector interfaces and utilities.

Configuration System

Configuration management, options, and system settings for Flink applications and clusters.

Execution and Jobs

Job execution, task management, runtime contexts, and execution environments.

Core Utilities

Common utility classes, I/O operations, memory management, and filesystem abstractions.

Getting Started

To use Apache Flink Core in your project:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>2.1.0</version>
</dependency>

Basic Usage Example

The example below also uses the DataStream API from flink-streaming-java; flink-core supplies the function, type, and configuration APIs it builds on.

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;

public class BasicFlinkApp {
    public static void main(String[] args) throws Exception {
        // Create execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        // Create data stream
        DataStream<String> text = env.fromElements("hello", "world", "flink");
        
        // Apply transformations using Flink Core APIs
        DataStream<Integer> lengths = text.map(new MapFunction<String, Integer>() {
            @Override
            public Integer map(String value) throws Exception {
                return value.length();
            }
        });
        
        // Output results
        lengths.print();
        
        // Execute the job
        env.execute("Basic Flink Application");
    }
}

Key Concepts

  • Functions: User-defined transformation logic implemented via function interfaces
  • Type Information: System for managing data types and serialization
  • State: Managed state for stateful computations and fault tolerance
  • Event Time: Processing based on event timestamps rather than processing time
  • Watermarks: Mechanism for handling out-of-order events in event-time processing
  • Sources/Sinks: Interfaces for data ingestion and output
  • Configuration: System for managing application and cluster settings
  • Execution: Runtime system for distributed job execution

Apache Flink Core provides the foundation for building robust, scalable, and fault-tolerant stream and batch processing applications. The modular design allows developers to use only the components they need while maintaining full compatibility with the broader Flink ecosystem.