Apache Flink Core runtime components, type system, and foundational APIs for stream processing applications
—
Apache Flink Core provides a comprehensive set of user-defined function interfaces and operators for building data transformation pipelines. These APIs enable developers to implement custom business logic for stream and batch processing applications.
Transform elements one-to-one.
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.functions.RichMapFunction;
// Basic map function
public class StringLengthMapper implements MapFunction<String, Integer> {
@Override
public Integer map(String value) throws Exception {
return value.length();
}
}
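A minimal wiring sketch, assuming a StreamExecutionEnvironment named env is already configured:
import org.apache.flink.streaming.api.datastream.DataStream;
// Each input string becomes one integer (its length)
DataStream<Integer> lengths = env
.fromElements("flink", "stream", "processing")
.map(new StringLengthMapper());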
// Rich map function with lifecycle methods
public class RichStringMapper extends RichMapFunction<String, String> {
private String prefix;
@Override
public void open(OpenContext openContext) throws Exception {
// Initialize resources, read configuration
prefix = getRuntimeContext().getExecutionConfig()
.getGlobalJobParameters().toMap().get("prefix");
}
@Override
public String map(String value) throws Exception {
return prefix + value;
}
@Override
public void close() throws Exception {
// Clean up resources
}
}
Transform elements one-to-many.
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.util.Collector;
import java.util.regex.Pattern;
// Split strings into words
public class TokenizerFunction implements FlatMapFunction<String, String> {
@Override
public void flatMap(String value, Collector<String> out) throws Exception {
for (String word : value.split("\\s+")) {
if (!word.isEmpty()) {
out.collect(word);
}
}
}
}
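A usage sketch, assuming an existing DataStream<String> of sentences named lines:
// Each sentence fans out into zero or more words
DataStream<String> words = lines.flatMap(new TokenizerFunction());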
// Rich flat map function
public class RichTokenizerFunction extends RichFlatMapFunction<String, String> {
private Pattern pattern;
@Override
public void open(OpenContext openContext) throws Exception {
// Compile regex pattern once during initialization
pattern = Pattern.compile("\\s+");
}
@Override
public void flatMap(String value, Collector<String> out) throws Exception {
for (String word : pattern.split(value)) {
if (!word.isEmpty()) {
out.collect(word.toLowerCase());
}
}
}
}
Filter elements based on predicates.
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.metrics.Counter;
// Filter strings by length
public class LengthFilter implements FilterFunction<String> {
private final int minLength;
public LengthFilter(int minLength) {
this.minLength = minLength;
}
@Override
public boolean filter(String value) throws Exception {
return value.length() >= minLength;
}
}
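A usage sketch, assuming a DataStream<String> named words:
// Keep only strings with at least five characters
DataStream<String> longWords = words.filter(new LengthFilter(5));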
// Rich filter with metrics
public class RichLengthFilter extends RichFilterFunction<String> {
private Counter filteredCounter;
@Override
public void open(OpenContext openContext) throws Exception {
filteredCounter = getRuntimeContext()
.getMetricGroup()
.counter("filtered_elements");
}
@Override
public boolean filter(String value) throws Exception {
boolean pass = value.length() >= 5;
if (!pass) {
filteredCounter.inc();
}
return pass;
}
}
Combine elements of the same type.
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
// Sum integers
public class SumReduceFunction implements ReduceFunction<Integer> {
@Override
public Integer reduce(Integer value1, Integer value2) throws Exception {
return value1 + value2;
}
}
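Reduce operates per key on a keyed stream. A sketch, assuming a DataStream<Integer> named numbers:
// Key the stream first; reduce then combines elements that share a key
DataStream<Integer> sums = numbers
.keyBy(value -> value % 2)
.reduce(new SumReduceFunction());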
// Combine objects
public class WordCountReduceFunction implements ReduceFunction<Tuple2<String, Integer>> {
@Override
public Tuple2<String, Integer> reduce(
Tuple2<String, Integer> value1,
Tuple2<String, Integer> value2) throws Exception {
return new Tuple2<>(value1.f0, value1.f1 + value2.f1);
}
}
Join elements from two data streams.
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.util.Collector;
// Simple join function
public class UserOrderJoinFunction implements JoinFunction<User, Order, UserOrder> {
@Override
public UserOrder join(User user, Order order) throws Exception {
return new UserOrder(user.getId(), user.getName(), order.getAmount());
}
}
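In the DataStream API, joins are windowed. A hypothetical wiring, assuming DataStream inputs named users and orders and that Order exposes a getUserId() accessor (not shown above):
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
DataStream<UserOrder> joined = users
.join(orders)
.where(User::getId)
.equalTo(Order::getUserId)
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.apply(new UserOrderJoinFunction());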
// Flat join function producing multiple results
public class UserOrderFlatJoinFunction implements FlatJoinFunction<User, Order, String> {
@Override
public void join(User user, Order order, Collector<String> out) throws Exception {
// Output multiple formats for each join
out.collect("User: " + user.getName() + " - Order: " + order.getId());
out.collect("Amount: " + order.getAmount() + " for " + user.getName());
}
}
Group and process elements from two data streams.
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.util.Collector;
import java.util.Iterator;
public class UserOrderCoGroupFunction implements
CoGroupFunction<User, Order, UserOrderSummary> {
@Override
public void coGroup(Iterable<User> users, Iterable<Order> orders,
Collector<UserOrderSummary> out) throws Exception {
Iterator<User> userIterator = users.iterator();
User user = userIterator.hasNext() ? userIterator.next() : null;
if (user != null) {
int totalAmount = 0;
int orderCount = 0;
for (Order order : orders) {
totalAmount += order.getAmount();
orderCount++;
}
out.collect(new UserOrderSummary(user.getId(), orderCount, totalAmount));
}
}
}
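CoGroup is likewise windowed in the DataStream API. A sketch under the same getUserId() assumption as the join example:
DataStream<UserOrderSummary> summaries = users
.coGroup(orders)
.where(User::getId)
.equalTo(Order::getUserId)
.window(TumblingEventTimeWindows.of(Time.seconds(60)))
.apply(new UserOrderCoGroupFunction());
Reduce groups of elements.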
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
public class WordCountGroupReduceFunction implements
GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {
@Override
public void reduce(Iterable<Tuple2<String, Integer>> values,
Collector<Tuple2<String, Integer>> out) throws Exception {
String word = null;
int count = 0;
for (Tuple2<String, Integer> value : values) {
word = value.f0;
count += value.f1;
}
out.collect(new Tuple2<>(word, count));
}
}
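GroupReduceFunction belongs to the legacy DataSet API. A sketch, assuming a DataSet<Tuple2<String, Integer>> named wordPairs:
// Group by the word in field 0, then reduce each group to one count
DataSet<Tuple2<String, Integer>> counts = wordPairs
.groupBy(0)
.reduceGroup(new WordCountGroupReduceFunction());
Process entire partitions.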
import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.util.Collector;
public class StatisticsMapPartitionFunction implements
MapPartitionFunction<Integer, PartitionStatistics> {
@Override
public void mapPartition(Iterable<Integer> values,
Collector<PartitionStatistics> out) throws Exception {
int count = 0;
int sum = 0;
int min = Integer.MAX_VALUE;
int max = Integer.MIN_VALUE;
for (Integer value : values) {
count++;
sum += value;
min = Math.min(min, value);
max = Math.max(max, value);
}
if (count > 0) {
double avg = (double) sum / count;
out.collect(new PartitionStatistics(count, sum, avg, min, max));
}
}
}
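MapPartitionFunction is also a DataSet API operator; it sees every element of one parallel partition at once. A sketch, assuming a DataSet<Integer> named numbers:
DataSet<PartitionStatistics> stats = numbers
.mapPartition(new StatisticsMapPartitionFunction());
Rich functions provide additional lifecycle methods and access to runtime context.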
import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
// Simplified sketch of the AbstractRichFunction base class that rich functions extend
public abstract class AbstractRichFunction implements RichFunction {
private RuntimeContext runtimeContext;
@Override
public void setRuntimeContext(RuntimeContext runtimeContext) {
this.runtimeContext = runtimeContext;
}
@Override
public RuntimeContext getRuntimeContext() {
return runtimeContext;
}
@Override
public void open(OpenContext openContext) throws Exception {
// Override in subclasses for initialization
}
@Override
public void close() throws Exception {
// Override in subclasses for cleanup
}
}
// Example rich function implementation
public class DatabaseLookupFunction extends RichMapFunction<String, UserProfile> {
private DatabaseConnection connection;
@Override
public void open(OpenContext openContext) throws Exception {
// Initialize database connection; assumes a Configuration object was
// registered as the job's global job parameters
Configuration config = (Configuration) getRuntimeContext()
.getExecutionConfig().getGlobalJobParameters();
String dbUrl = config.getString("db.url", "localhost:5432");
connection = new DatabaseConnection(dbUrl);
}
@Override
public UserProfile map(String userId) throws Exception {
return connection.getUserProfile(userId);
}
@Override
public void close() throws Exception {
if (connection != null) {
connection.close();
}
}
}
Transform broadcast variables during initialization.
import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import java.util.HashMap;
import java.util.Map;
public class MapBroadcastInitializer implements
BroadcastVariableInitializer<Tuple2<String, Integer>, Map<String, Integer>> {
@Override
public Map<String, Integer> initializeBroadcastVariable(
Iterable<Tuple2<String, Integer>> data) {
Map<String, Integer> map = new HashMap<>();
for (Tuple2<String, Integer> tuple : data) {
map.put(tuple.f0, tuple.f1);
}
return map;
}
}
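Broadcast variables are a DataSet API feature: a data set registered under a name becomes available inside rich functions, as in the enrichment function shown next. A wiring sketch, assuming DataSet inputs named records and configTuples:
DataSet<EnrichedData> enriched = records
.map(new EnrichWithBroadcastFunction())
.withBroadcastSet(configTuples, "config-map");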
// Using broadcast variable in rich function
public class EnrichWithBroadcastFunction extends RichMapFunction<String, EnrichedData> {
private Map<String, Integer> broadcastMap;
@Override
public void open(OpenContext openContext) throws Exception {
// Broadcast variables arrive as a List by default; use the
// initializer to get the Map built by MapBroadcastInitializer
broadcastMap = getRuntimeContext()
.getBroadcastVariableWithInitializer("config-map", new MapBroadcastInitializer());
}
@Override
public EnrichedData map(String value) throws Exception {
Integer config = broadcastMap.get(value);
return new EnrichedData(value, config != null ? config : 0);
}
}
Define custom partitioning logic.
import org.apache.flink.api.common.functions.Partitioner;
public class CustomPartitioner implements Partitioner<String> {
@Override
public int partition(String key, int numPartitions) {
// floorMod avoids the negative result Math.abs produces for Integer.MIN_VALUE
return Math.floorMod(key.hashCode(), numPartitions);
}
}
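Custom partitioners plug into DataStream.partitionCustom together with a key selector. A sketch, assuming a DataStream<String> of user IDs named userIds:
DataStream<String> partitioned = userIds
.partitionCustom(new UserIdPartitioner(), userId -> userId);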
// Hash-based partitioner for specific business logic
public class UserIdPartitioner implements Partitioner<String> {
@Override
public int partition(String userId, int numPartitions) {
// The same user ID always maps to the same partition
return (userId.hashCode() & Integer.MAX_VALUE) % numPartitions;
}
}
Access runtime information and services within functions.
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;
public class MetricsAwareFunction extends RichMapFunction<String, String> {
private Counter processedCounter;
private Counter errorCounter;
@Override
public void open(OpenContext openContext) throws Exception {
RuntimeContext ctx = getRuntimeContext();
// Access task information
String taskName = ctx.getTaskName();
int subtaskIndex = ctx.getIndexOfThisSubtask();
int parallelism = ctx.getNumberOfParallelSubtasks();
// Create metrics
MetricGroup metricGroup = ctx.getMetricGroup();
processedCounter = metricGroup.counter("processed");
errorCounter = metricGroup.counter("errors");
// Access keyed state (valid only when this function runs on a keyed stream)
ValueStateDescriptor<Integer> descriptor =
new ValueStateDescriptor<>("count", Integer.class);
ValueState<Integer> countState = ctx.getState(descriptor);
}
@Override
public String map(String value) throws Exception {
try {
processedCounter.inc();
// Process value
return value.toUpperCase();
} catch (Exception e) {
errorCounter.inc();
throw e;
}
}
}
Aggregate elements using an intermediate accumulator.
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.functions.RichAggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.metrics.Counter;
// Average aggregate function
public class AverageAggregateFunction implements
AggregateFunction<Integer, Tuple2<Integer, Integer>, Double> {
@Override
public Tuple2<Integer, Integer> createAccumulator() {
return new Tuple2<>(0, 0); // (sum, count)
}
@Override
public Tuple2<Integer, Integer> add(Integer value, Tuple2<Integer, Integer> accumulator) {
return new Tuple2<>(accumulator.f0 + value, accumulator.f1 + 1);
}
@Override
public Double getResult(Tuple2<Integer, Integer> accumulator) {
return accumulator.f1 == 0 ? 0.0 : (double) accumulator.f0 / accumulator.f1;
}
@Override
public Tuple2<Integer, Integer> merge(Tuple2<Integer, Integer> a, Tuple2<Integer, Integer> b) {
return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
}
}
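AggregateFunction is typically applied to windows; note that window aggregate() requires the non-rich variant, so the rich version below suits other aggregation contexts. A sketch, assuming a DataStream<Integer> named readings:
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
// Ten-second average over all elements (non-keyed)
DataStream<Double> averages = readings
.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10)))
.aggregate(new AverageAggregateFunction());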
// Rich aggregate function with metrics
public class RichAverageAggregateFunction extends RichAggregateFunction<Integer, Tuple2<Integer, Integer>, Double> {
private Counter aggregationCounter;
@Override
public void open(OpenContext openContext) throws Exception {
aggregationCounter = getRuntimeContext()
.getMetricGroup()
.counter("aggregations");
}
@Override
public Tuple2<Integer, Integer> createAccumulator() {
return new Tuple2<>(0, 0);
}
@Override
public Tuple2<Integer, Integer> add(Integer value, Tuple2<Integer, Integer> accumulator) {
aggregationCounter.inc();
return new Tuple2<>(accumulator.f0 + value, accumulator.f1 + 1);
}
@Override
public Double getResult(Tuple2<Integer, Integer> accumulator) {
return accumulator.f1 == 0 ? 0.0 : (double) accumulator.f0 / accumulator.f1;
}
@Override
public Tuple2<Integer, Integer> merge(Tuple2<Integer, Integer> a, Tuple2<Integer, Integer> b) {
return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
}
}
Combine every pair of elements from two inputs (Cartesian product).
import org.apache.flink.api.common.functions.CrossFunction;
public class UserProductCrossFunction implements CrossFunction<User, Product, UserProductPair> {
@Override
public UserProductPair cross(User user, Product product) throws Exception {
return new UserProductPair(
user.getId(),
product.getId(),
calculateCompatibility(user, product)
);
}
private double calculateCompatibility(User user, Product product) {
// Custom compatibility calculation
return Math.random(); // Simplified example
}
}
// Prefer stateless functions when possible
public class StatelessTransformFunction implements MapFunction<InputType, OutputType> {
@Override
public OutputType map(InputType input) throws Exception {
// Pure transformation logic without side effects
return transform(input);
}
private OutputType transform(InputType input) {
// Deterministic transformation
return new OutputType(input.getValue() * 2);
}
}
// Use rich functions when you need lifecycle management
public class ResourceManagedFunction extends RichMapFunction<InputType, OutputType> {
private transient ExpensiveResource resource;
@Override
public void open(OpenContext openContext) throws Exception {
// Initialize expensive resources once per task
resource = new ExpensiveResource();
}
@Override
public OutputType map(InputType input) throws Exception {
return resource.process(input);
}
@Override
public void close() throws Exception {
if (resource != null) {
resource.cleanup();
}
}
}
Handle errors explicitly within functions.
public class RobustMapFunction implements MapFunction<String, Result> {
@Override
public Result map(String value) throws Exception {
try {
return processValue(value);
} catch (IllegalArgumentException e) {
// Handle known exceptions gracefully
return Result.createErrorResult("Invalid input: " + value);
} catch (Exception e) {
// Re-throw unexpected exceptions to trigger Flink's fault tolerance
throw new Exception("Processing failed for value: " + value, e);
}
}
private Result processValue(String value) {
// Business logic
return new Result(value);
}
}
Apache Flink's function interfaces provide a powerful foundation for implementing custom data processing logic. By choosing the appropriate function type and following best practices, you can build efficient, maintainable, and fault-tolerant data processing applications.
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-core