CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-org-apache-flink--flink-core

Apache Flink Core runtime components, type system, and foundational APIs for stream processing applications

Pending
Overview
Eval results
Files

docs/functions-and-operators.md

Functions and Operators

Apache Flink Core provides a comprehensive set of user-defined function interfaces and operators for building data transformation pipelines. These APIs enable developers to implement custom business logic for stream and batch processing applications.

Core Function Interfaces

MapFunction

Transform elements one-to-one.

import org.apache.flink.api.common.functions.MapFunction;

// Basic map function: maps each input string to its character count.
public class StringLengthMapper implements MapFunction<String, Integer> {
    /** Returns the number of characters in {@code value}; one output per input. */
    @Override
    public Integer map(String value) throws Exception {
        int characterCount = value.length();
        return characterCount;
    }
}

// Rich map function with lifecycle methods: prepends a configured prefix to each element.
public class RichStringMapper extends RichMapFunction<String, String> {
    private String prefix;
    
    @Override
    public void open(OpenContext openContext) throws Exception {
        // Read the prefix from the global job parameters once per task.
        // Fall back to the empty string so a missing "prefix" key does not
        // produce the literal text "null" in every output record.
        prefix = getRuntimeContext().getExecutionConfig()
            .getGlobalJobParameters().toMap().getOrDefault("prefix", "");
    }
    
    /** Returns the configured prefix concatenated with the input value. */
    @Override
    public String map(String value) throws Exception {
        return prefix + value;
    }
    
    @Override
    public void close() throws Exception {
        // No resources to release; hook kept to illustrate the lifecycle.
    }
}

FlatMapFunction

Transform elements one-to-many.

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;

// Split strings into words
public class TokenizerFunction implements FlatMapFunction<String, String> {
    /** Emits every non-empty whitespace-separated token of the input line. */
    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        String[] tokens = value.split("\\s+");
        for (String token : tokens) {
            if (token.isEmpty()) {
                continue; // split can yield a leading empty token
            }
            out.collect(token);
        }
    }
}

// Rich flat map function: splits on whitespace and emits lower-cased words.
public class RichTokenizerFunction extends RichFlatMapFunction<String, String> {
    // transient: Pattern is rebuilt in open() on every task, so it must not
    // travel with the serialized function (Pattern is not Serializable).
    private transient Pattern pattern;
    
    @Override
    public void open(OpenContext openContext) throws Exception {
        // Compile regex pattern once during initialization
        pattern = Pattern.compile("\\s+");
    }
    
    /** Emits each non-empty token of the input, lower-cased. */
    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        for (String word : pattern.split(value)) {
            if (!word.isEmpty()) {
                out.collect(word.toLowerCase());
            }
        }
    }
}

FilterFunction

Filter elements based on predicates.

import org.apache.flink.api.common.functions.FilterFunction;

// Filter strings by length
public class LengthFilter implements FilterFunction<String> {
    private final int minLength;
    
    public LengthFilter(int minLength) {
        this.minLength = minLength;
    }
    
    /** Keeps only strings whose length is at least {@code minLength}. */
    @Override
    public boolean filter(String value) throws Exception {
        boolean longEnough = value.length() >= minLength;
        return longEnough;
    }
}

// Rich filter with metrics: counts rejected elements in a Flink counter.
public class RichLengthFilter extends RichFilterFunction<String> {
    // Matches the threshold that was previously hard-coded in filter().
    private static final int DEFAULT_MIN_LENGTH = 5;

    private final int minLength;
    // transient: runtime metric handle, re-created in open() per task.
    private transient Counter filteredCounter;

    /** No-arg constructor keeps the original behavior (minimum length 5). */
    public RichLengthFilter() {
        this(DEFAULT_MIN_LENGTH);
    }

    /** Generalized form: the length threshold is configurable. */
    public RichLengthFilter(int minLength) {
        this.minLength = minLength;
    }

    @Override
    public void open(OpenContext openContext) throws Exception {
        filteredCounter = getRuntimeContext()
            .getMetricGroup()
            .counter("filtered_elements");
    }

    /** Passes strings of at least {@code minLength} chars; counts the rest. */
    @Override
    public boolean filter(String value) throws Exception {
        boolean pass = value.length() >= minLength;
        if (!pass) {
            filteredCounter.inc();
        }
        return pass;
    }
}

ReduceFunction

Combine elements of the same type.

import org.apache.flink.api.common.functions.ReduceFunction;

// Sum integers
public class SumReduceFunction implements ReduceFunction<Integer> {
    /** Combines two partial results by addition. */
    @Override
    public Integer reduce(Integer value1, Integer value2) throws Exception {
        return Integer.sum(value1, value2);
    }
}

// Combine objects
public class WordCountReduceFunction implements ReduceFunction<Tuple2<String, Integer>> {
    /** Merges two (word, count) pairs of the same key by summing the counts. */
    @Override
    public Tuple2<String, Integer> reduce(
            Tuple2<String, Integer> value1, 
            Tuple2<String, Integer> value2) throws Exception {
        int combinedCount = value1.f1 + value2.f1;
        return new Tuple2<>(value1.f0, combinedCount);
    }
}

Advanced Function Interfaces

JoinFunction and FlatJoinFunction

Join elements from two data streams.

import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.FlatJoinFunction;

// Simple join function
public class UserOrderJoinFunction implements JoinFunction<User, Order, UserOrder> {
    /** Produces exactly one UserOrder for each matched (user, order) pair. */
    @Override
    public UserOrder join(User user, Order order) throws Exception {
        UserOrder joined = new UserOrder(user.getId(), user.getName(), order.getAmount());
        return joined;
    }
}

// Flat join function producing multiple results
public class UserOrderFlatJoinFunction implements FlatJoinFunction<User, Order, String> {
    /** Emits two formatted records for every matched (user, order) pair. */
    @Override
    public void join(User user, Order order, Collector<String> out) throws Exception {
        String summaryLine = "User: " + user.getName() + " - Order: " + order.getId();
        out.collect(summaryLine);
        String amountLine = "Amount: " + order.getAmount() + " for " + user.getName();
        out.collect(amountLine);
    }
}

CoGroupFunction

Group and process elements from two data streams.

import org.apache.flink.api.common.functions.CoGroupFunction;

public class UserOrderCoGroupFunction implements 
        CoGroupFunction<User, Order, UserOrderSummary> {
    
    /**
     * Emits one summary per user in this co-group, aggregating the count and
     * total amount of the user's orders. Groups with no user on the first
     * side produce no output.
     */
    @Override
    public void coGroup(Iterable<User> users, Iterable<Order> orders, 
                       Collector<UserOrderSummary> out) throws Exception {
        
        // Take the first user without creating two iterators over the same
        // Iterable: the original called users.iterator() twice, and co-group
        // iterables are not guaranteed to be cheaply re-iterable.
        User user = null;
        for (User candidate : users) {
            user = candidate;
            break;
        }
        
        if (user != null) {
            int totalAmount = 0;
            int orderCount = 0;
            
            for (Order order : orders) {
                totalAmount += order.getAmount();
                orderCount++;
            }
            
            out.collect(new UserOrderSummary(user.getId(), orderCount, totalAmount));
        }
    }
}

GroupReduceFunction

Reduce groups of elements.

import org.apache.flink.api.common.functions.GroupReduceFunction;

public class WordCountGroupReduceFunction implements 
        GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {
    
    /**
     * Sums the counts of all (word, count) records in one group and emits a
     * single aggregated record. An empty group emits nothing — previously it
     * produced a meaningless (null, 0) tuple.
     */
    @Override
    public void reduce(Iterable<Tuple2<String, Integer>> values, 
                      Collector<Tuple2<String, Integer>> out) throws Exception {
        
        String word = null;
        int count = 0;
        
        for (Tuple2<String, Integer> value : values) {
            word = value.f0;
            count += value.f1;
        }
        
        // Guard: only emit when the group actually contained records.
        if (word != null) {
            out.collect(new Tuple2<>(word, count));
        }
    }
}

MapPartitionFunction

Process entire partitions.

import org.apache.flink.api.common.functions.MapPartitionFunction;

public class StatisticsMapPartitionFunction implements 
        MapPartitionFunction<Integer, PartitionStatistics> {
    
    /**
     * Computes count/sum/avg/min/max over one whole partition in a single
     * pass. Empty partitions emit nothing.
     * NOTE(review): the running sum is an int, so very large partitions could
     * overflow — confirm whether PartitionStatistics can accept a long.
     */
    @Override
    public void mapPartition(Iterable<Integer> values, 
                           Collector<PartitionStatistics> out) throws Exception {
        
        int count = 0;
        int sum = 0;
        int min = Integer.MAX_VALUE;
        int max = Integer.MIN_VALUE;
        
        for (Integer value : values) {
            count++;
            sum += value;
            if (value < min) {
                min = value;
            }
            if (value > max) {
                max = value;
            }
        }
        
        // Emit only when at least one element was seen; otherwise min/max
        // would still hold their sentinel values.
        if (count == 0) {
            return;
        }
        double avg = ((double) sum) / count;
        out.collect(new PartitionStatistics(count, sum, avg, min, max));
    }
}

Rich Functions

Rich functions provide additional lifecycle methods and access to runtime context.

import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.configuration.Configuration;

/**
 * Skeleton RichFunction that stores the runtime context injected by the
 * framework and provides no-op lifecycle hooks; subclasses override
 * open()/close() as needed.
 */
public abstract class AbstractRichFunction implements RichFunction {
    private RuntimeContext runtimeContext;
    
    /** Default initialization hook; intentionally empty. */
    @Override
    public void open(OpenContext openContext) throws Exception {
        // Override in subclasses for initialization
    }
    
    /** Default cleanup hook; intentionally empty. */
    @Override
    public void close() throws Exception {
        // Override in subclasses for cleanup
    }
    
    @Override
    public void setRuntimeContext(RuntimeContext runtimeContext) {
        this.runtimeContext = runtimeContext;
    }
    
    @Override
    public RuntimeContext getRuntimeContext() {
        return runtimeContext;
    }
}

// Example rich function implementation
public class DatabaseLookupFunction extends RichMapFunction<String, UserProfile> {
    // transient: the connection is opened per task in open() and must not be
    // serialized with the function (same pattern as ResourceManagedFunction
    // in the best-practices section).
    private transient DatabaseConnection connection;
    
    @Override
    public void open(OpenContext openContext) throws Exception {
        // NOTE(review): this cast assumes the job registered its global
        // parameters as a Configuration instance — confirm at the call site.
        Configuration config = (Configuration) getRuntimeContext()
            .getExecutionConfig().getGlobalJobParameters();
        
        String dbUrl = config.getString("db.url", "localhost:5432");
        connection = new DatabaseConnection(dbUrl);
    }
    
    /** Looks up the profile for {@code userId} via the per-task connection. */
    @Override
    public UserProfile map(String userId) throws Exception {
        return connection.getUserProfile(userId);
    }
    
    @Override
    public void close() throws Exception {
        // Null-check covers the case where open() failed before the
        // connection was established.
        if (connection != null) {
            connection.close();
        }
    }
}

Function Utilities

BroadcastVariableInitializer

Transform broadcast variables during initialization.

import org.apache.flink.api.common.functions.BroadcastVariableInitializer;

public class MapBroadcastInitializer implements 
        BroadcastVariableInitializer<Tuple2<String, Integer>, Map<String, Integer>> {
    
    /**
     * Materializes the broadcast (key, value) tuples into a HashMap once per
     * task so lookups inside user functions are O(1).
     */
    @Override
    public Map<String, Integer> initializeBroadcastVariable(
            Iterable<Tuple2<String, Integer>> data) {
        
        Map<String, Integer> lookup = new HashMap<>();
        data.forEach(entry -> lookup.put(entry.f0, entry.f1));
        return lookup;
    }
}

// Using broadcast variable in rich function
public class EnrichWithBroadcastFunction extends RichMapFunction<String, EnrichedData> {
    // transient: rebuilt from the broadcast variable in open(), never serialized.
    private transient Map<String, Integer> broadcastMap;
    
    @Override
    public void open(OpenContext openContext) throws Exception {
        // getBroadcastVariable(String) returns the broadcast elements as a
        // List; to obtain a Map, transform them with an initializer (the
        // original assigned the List directly to a Map, which does not
        // compile). MapBroadcastInitializer is defined above.
        broadcastMap = getRuntimeContext()
            .getBroadcastVariableWithInitializer("config-map", new MapBroadcastInitializer());
    }
    
    /** Enriches the value with its broadcast config entry (0 when absent). */
    @Override
    public EnrichedData map(String value) throws Exception {
        Integer config = broadcastMap.get(value);
        return new EnrichedData(value, config != null ? config : 0);
    }
}

Partitioner

Custom partitioning logic.

import org.apache.flink.api.common.functions.Partitioner;

public class CustomPartitioner implements Partitioner<String> {
    /** Maps a key to a partition index in [0, numPartitions). */
    @Override
    public int partition(String key, int numPartitions) {
        // Math.abs(hashCode()) is unsafe: Math.abs(Integer.MIN_VALUE) is
        // still negative, which would yield an invalid (negative) partition
        // index. Clearing the sign bit guarantees a non-negative dividend,
        // matching the approach used by UserIdPartitioner below.
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}

// Hash-based partitioner for specific business logic
public class UserIdPartitioner implements Partitioner<String> {
    /** Routes each user id deterministically to one of the partitions. */
    @Override
    public int partition(String userId, int numPartitions) {
        // Clearing the sign bit keeps the dividend non-negative even for
        // Integer.MIN_VALUE hashes, so the result is a valid index.
        int nonNegativeHash = userId.hashCode() & Integer.MAX_VALUE;
        return nonNegativeHash % numPartitions;
    }
}

Runtime Context

Access runtime information and services within functions.

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;

// Illustrates the main RuntimeContext services: task info, metrics, and
// keyed state. The local variables in open() are intentionally unused —
// they exist to show the available accessors.
public class MetricsAwareFunction extends RichMapFunction<String, String> {
    private Counter processedCounter;
    private Counter errorCounter;
    
    @Override
    public void open(OpenContext openContext) throws Exception {
        RuntimeContext ctx = getRuntimeContext();
        
        // Access task information
        String taskName = ctx.getTaskName();
        int subtaskIndex = ctx.getIndexOfThisSubtask();
        int parallelism = ctx.getNumberOfParallelSubtasks();
        
        // Create metrics
        MetricGroup metricGroup = ctx.getMetricGroup();
        processedCounter = metricGroup.counter("processed");
        errorCounter = metricGroup.counter("errors");
        
        // Access state (in keyed operations)
        // NOTE(review): keyed-state access only works when this function runs
        // on a keyed stream — confirm the surrounding pipeline before copying.
        ValueStateDescriptor<Integer> descriptor = 
            new ValueStateDescriptor<>("count", Integer.class);
        ValueState<Integer> countState = ctx.getState(descriptor);
    }
    
    @Override
    public String map(String value) throws Exception {
        try {
            // "processed" is incremented before the transformation, so
            // elements that later fail are counted as processed too.
            processedCounter.inc();
            // Process value
            return value.toUpperCase();
        } catch (Exception e) {
            errorCounter.inc();
            throw e;
        }
    }
}

Aggregate Functions

For advanced aggregation operations.

import org.apache.flink.api.common.functions.AggregateFunction;

// Average aggregate function
public class AverageAggregateFunction implements 
        AggregateFunction<Integer, Tuple2<Integer, Integer>, Double> {
    
    /** Accumulator layout: f0 = running sum, f1 = element count. */
    @Override
    public Tuple2<Integer, Integer> createAccumulator() {
        return new Tuple2<>(0, 0);
    }
    
    @Override
    public Tuple2<Integer, Integer> add(Integer value, Tuple2<Integer, Integer> accumulator) {
        int newSum = accumulator.f0 + value;
        int newCount = accumulator.f1 + 1;
        return new Tuple2<>(newSum, newCount);
    }
    
    /** Returns sum / count, or 0.0 for an empty accumulator. */
    @Override
    public Double getResult(Tuple2<Integer, Integer> accumulator) {
        if (accumulator.f1 == 0) {
            return 0.0;
        }
        return ((double) accumulator.f0) / accumulator.f1;
    }
    
    /** Combines two partial accumulators field-wise. */
    @Override
    public Tuple2<Integer, Integer> merge(Tuple2<Integer, Integer> a, Tuple2<Integer, Integer> b) {
        return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
    }
}

// Rich aggregate function with metrics
// NOTE(review): some Flink APIs reject rich aggregate functions (e.g. window
// aggregations) — confirm the target operator supports this before use.
public class RichAverageAggregateFunction extends RichAggregateFunction<Integer, Tuple2<Integer, Integer>, Double> {
    private Counter aggregationCounter;
    
    @Override
    public void open(OpenContext openContext) throws Exception {
        aggregationCounter = getRuntimeContext()
            .getMetricGroup()
            .counter("aggregations");
    }
    
    /** Accumulator layout: f0 = running sum, f1 = element count. */
    @Override
    public Tuple2<Integer, Integer> createAccumulator() {
        return new Tuple2<>(0, 0);
    }
    
    /** Folds one element in and records the aggregation in the counter. */
    @Override
    public Tuple2<Integer, Integer> add(Integer value, Tuple2<Integer, Integer> accumulator) {
        aggregationCounter.inc();
        int newSum = accumulator.f0 + value;
        int newCount = accumulator.f1 + 1;
        return new Tuple2<>(newSum, newCount);
    }
    
    /** Returns sum / count, or 0.0 for an empty accumulator. */
    @Override
    public Double getResult(Tuple2<Integer, Integer> accumulator) {
        if (accumulator.f1 == 0) {
            return 0.0;
        }
        return ((double) accumulator.f0) / accumulator.f1;
    }
    
    /** Combines two partial accumulators field-wise. */
    @Override
    public Tuple2<Integer, Integer> merge(Tuple2<Integer, Integer> a, Tuple2<Integer, Integer> b) {
        return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
    }
}

Cross Functions

For Cartesian product operations.

import org.apache.flink.api.common.functions.CrossFunction;

// Builds one scored pair per element of the Cartesian product of users and
// products.
public class UserProductCrossFunction implements CrossFunction<User, Product, UserProductPair> {
    @Override
    public UserProductPair cross(User user, Product product) throws Exception {
        return new UserProductPair(
            user.getId(), 
            product.getId(), 
            calculateCompatibility(user, product)
        );
    }
    
    // NOTE(review): Math.random() is non-deterministic, so replays/retries
    // produce different scores. Replace with a deterministic function of
    // (user, product) before using this beyond a demo.
    private double calculateCompatibility(User user, Product product) {
        // Custom compatibility calculation
        return Math.random(); // Simplified example
    }
}

Best Practices

Function Design

// Prefer stateless functions when possible
public class StatelessTransformFunction implements MapFunction<InputType, OutputType> {
    /** Delegates to a pure, deterministic transformation — no side effects. */
    @Override
    public OutputType map(InputType input) throws Exception {
        OutputType result = transform(input);
        return result;
    }
    
    // Deterministic transformation: doubles the wrapped value.
    private OutputType transform(InputType input) {
        return new OutputType(input.getValue() * 2);
    }
}

// Use rich functions when you need lifecycle management
public class ResourceManagedFunction extends RichMapFunction<InputType, OutputType> {
    // transient: created per task in open(), never serialized with the job.
    private transient ExpensiveResource resource;
    
    /** Creates the expensive resource exactly once per task instance. */
    @Override
    public void open(OpenContext openContext) throws Exception {
        resource = new ExpensiveResource();
    }
    
    /** Processes each element through the shared per-task resource. */
    @Override
    public OutputType map(InputType input) throws Exception {
        return resource.process(input);
    }
    
    /** Releases the resource; tolerates open() having failed early. */
    @Override
    public void close() throws Exception {
        if (resource == null) {
            return;
        }
        resource.cleanup();
    }
}

Exception Handling

// Demonstrates handling expected failures gracefully while letting
// unexpected ones trigger Flink's fault-tolerance machinery.
public class RobustMapFunction implements MapFunction<String, Result> {
    @Override
    public Result map(String value) throws Exception {
        try {
            return processValue(value);
        } catch (IllegalArgumentException e) {
            // Handle known exceptions gracefully
            return Result.createErrorResult("Invalid input: " + value);
        } catch (Exception e) {
            // Re-throw unexpected exceptions to trigger Flink's fault
            // tolerance. Wrap in RuntimeException rather than a raw checked
            // Exception, and always keep the original as the cause.
            throw new RuntimeException("Processing failed for value: " + value, e);
        }
    }
    
    // Business logic placeholder.
    private Result processValue(String value) {
        return new Result(value);
    }
}

Apache Flink's function interfaces provide a powerful foundation for implementing custom data processing logic. By choosing the appropriate function type and following best practices, you can build efficient, maintainable, and fault-tolerant data processing applications.

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-core

docs

configuration.md

connectors.md

event-time-watermarks.md

execution-jobs.md

functions-and-operators.md

index.md

state-management.md

type-system-serialization.md

utilities.md

tile.json