CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-apache-flink--flink-core

Apache Flink Core runtime components, type system, and foundational APIs for stream processing applications

Pending
Overview
Eval results
Files

state-management.mddocs/

State Management

Apache Flink provides comprehensive state management capabilities for stateful stream processing applications. The state APIs enable functions to maintain state across events while providing fault tolerance through checkpointing and recovery mechanisms.

State Types

Value State

Store and update a single value per key.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.functions.OpenContext;

public class CountingMapFunction extends RichMapFunction<String, Tuple2<String, Integer>> {
    // Per-key running count of observed elements.
    private ValueState<Integer> countState;
    
    @Override
    public void open(OpenContext openContext) throws Exception {
        // Two-argument descriptor: the three-argument variant that bakes in a
        // default value is deprecated in Flink. Absence of state is handled
        // explicitly in map() instead.
        ValueStateDescriptor<Integer> descriptor = 
            new ValueStateDescriptor<>("count", Integer.class);
        
        // Get state handle
        countState = getRuntimeContext().getState(descriptor);
    }
    
    @Override
    public Tuple2<String, Integer> map(String value) throws Exception {
        // value() returns null on the first access for a key; treat as 0.
        Integer currentCount = countState.value();
        int nextCount = (currentCount == null ? 0 : currentCount) + 1;
        
        countState.update(nextCount);
        return new Tuple2<>(value, nextCount);
    }
}

List State

Maintain a list of values per key.

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;

public class BufferingMapFunction extends RichMapFunction<String, List<String>> {
    // Keyed list of all elements seen so far for the current key.
    private ListState<String> bufferState;
    
    @Override
    public void open(OpenContext openContext) throws Exception {
        bufferState = getRuntimeContext().getListState(
            new ListStateDescriptor<>("buffer", String.class));
    }
    
    @Override
    public List<String> map(String value) throws Exception {
        // Append the incoming element, then snapshot the whole buffer.
        bufferState.add(value);
        
        List<String> snapshot = new ArrayList<>();
        bufferState.get().forEach(snapshot::add);
        
        // Drop the state once it has grown past 100 entries; the snapshot
        // returned below is a separate list and is unaffected by the clear.
        if (snapshot.size() > 100) {
            bufferState.clear();
        }
        
        return snapshot;
    }
}

Map State

Store key-value pairs as state.

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;

public class UserSessionFunction extends RichMapFunction<Event, SessionInfo> {
    // session-id -> first-seen timestamp, per key.
    private MapState<String, Long> sessionStartTimes;
    
    @Override
    public void open(OpenContext openContext) throws Exception {
        sessionStartTimes = getRuntimeContext().getMapState(
            new MapStateDescriptor<>("sessions", String.class, Long.class));
    }
    
    @Override
    public SessionInfo map(Event event) throws Exception {
        String sessionId = event.getSessionId();
        
        // Record the start time the first time this session id is seen.
        if (!sessionStartTimes.contains(sessionId)) {
            sessionStartTimes.put(sessionId, event.getTimestamp());
        }
        
        long startTime = sessionStartTimes.get(sessionId);
        long duration = event.getTimestamp() - startTime;
        
        // Evict sessions that have been idle for more than one hour
        // (3600000 ms), using Iterator.remove() to mutate during iteration.
        Iterator<Map.Entry<String, Long>> it = sessionStartTimes.iterator();
        while (it.hasNext()) {
            if (event.getTimestamp() - it.next().getValue() > 3600000) {
                it.remove();
            }
        }
        
        return new SessionInfo(sessionId, startTime, duration);
    }
}

Reducing State

Aggregate values using a reduce function.

import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.functions.ReduceFunction;

public class SumAccumulatorFunction extends RichMapFunction<Integer, Integer> {
    // Running per-key sum maintained by Flink's reducing state.
    private ReducingState<Integer> sumState;
    
    @Override
    public void open(OpenContext openContext) throws Exception {
        // ReduceFunction is a functional interface, so Integer::sum replaces
        // the verbose anonymous inner class with identical behavior.
        ReducingStateDescriptor<Integer> descriptor = 
            new ReducingStateDescriptor<>("sum", Integer::sum, Integer.class);
        
        sumState = getRuntimeContext().getReducingState(descriptor);
    }
    
    @Override
    public Integer map(Integer value) throws Exception {
        // Fold the new element into the running sum, then report it.
        sumState.add(value);
        return sumState.get();
    }
}

Aggregating State

Use custom aggregate functions for complex aggregations.

import org.apache.flink.api.common.state.AggregatingState;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.functions.AggregateFunction;

public class AverageAccumulatorFunction extends RichMapFunction<Double, Double> {
    private AggregatingState<Double, Double> avgState;
    
    /** Running average: the accumulator is a (sum, count) Tuple2. */
    public static class AverageAggregate implements AggregateFunction<Double, Tuple2<Double, Long>, Double> {
        @Override
        public Tuple2<Double, Long> createAccumulator() {
            return new Tuple2<>(0.0, 0L);
        }
        
        @Override
        public Tuple2<Double, Long> add(Double value, Tuple2<Double, Long> acc) {
            return new Tuple2<>(acc.f0 + value, acc.f1 + 1L);
        }
        
        @Override
        public Double getResult(Tuple2<Double, Long> acc) {
            // Guard against division by zero on an empty accumulator.
            if (acc.f1 == 0) {
                return 0.0;
            }
            return acc.f0 / acc.f1;
        }
        
        @Override
        public Tuple2<Double, Long> merge(Tuple2<Double, Long> a, Tuple2<Double, Long> b) {
            return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
        }
    }
    
    @Override
    public void open(OpenContext openContext) throws Exception {
        // Generic tuple accumulators need explicit TypeInformation via a
        // TypeHint; a bare Class cannot describe the type parameters.
        avgState = getRuntimeContext().getAggregatingState(
            new AggregatingStateDescriptor<>(
                "average",
                new AverageAggregate(),
                TypeInformation.of(new TypeHint<Tuple2<Double, Long>>() {})));
    }
    
    @Override
    public Double map(Double value) throws Exception {
        avgState.add(value);
        return avgState.get();
    }
}

State TTL (Time To Live)

Configure automatic state cleanup based on time.

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;

public class TTLEnabledFunction extends RichMapFunction<String, String> {
    private ValueState<String> ttlState;
    
    @Override
    public void open(OpenContext openContext) throws Exception {
        // Entries expire one hour after creation or last write; expired
        // values are never returned, and cleanup happens both on full
        // snapshots and incrementally (10 entries per state access).
        StateTtlConfig ttlConfig = StateTtlConfig
            .newBuilder(Time.hours(1))
            .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
            .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
            .cleanupFullSnapshot()
            .cleanupIncrementally(10, true)
            .build();
        
        ValueStateDescriptor<String> descriptor = 
            new ValueStateDescriptor<>("ttl-state", String.class);
        descriptor.enableTimeToLive(ttlConfig);
        
        ttlState = getRuntimeContext().getState(descriptor);
    }
    
    @Override
    public String map(String value) throws Exception {
        // Swap in the new value and emit the previous one (or a marker the
        // very first time, or after the old value expired).
        String previous = ttlState.value();
        ttlState.update(value);
        if (previous == null) {
            return "first-value";
        }
        return previous;
    }
}

Advanced TTL Configuration

// Different cleanup strategies
StateTtlConfig incrementalCleanup = StateTtlConfig
    .newBuilder(Time.minutes(30))
    .cleanupIncrementally(5, true) // Clean 5 entries per access
    .build();

StateTtlConfig fullSnapshotCleanup = StateTtlConfig
    .newBuilder(Time.days(1))
    .cleanupFullSnapshot() // Clean during full snapshots
    .build();

StateTtlConfig rocksDBCleanup = StateTtlConfig
    .newBuilder(Time.hours(2))
    .cleanupInRocksdbCompactFilter(1000) // RocksDB compaction filter
    .build();

// Combined cleanup strategies
StateTtlConfig combinedCleanup = StateTtlConfig
    .newBuilder(Time.hours(6))
    .cleanupIncrementally(10, true)
    .cleanupFullSnapshot()
    .cleanupInRocksdbCompactFilter(500)
    .build();

Operator State

Non-keyed state that is maintained per parallel operator instance rather than per key.

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

public class BufferingMapFunction extends RichMapFunction<String, String> 
        implements CheckpointedFunction {
    
    // In-memory buffer; persisted to operator list state on each checkpoint.
    private List<String> bufferedElements;
    private ListState<String> checkpointedState;
    
    @Override
    public String map(String value) throws Exception {
        bufferedElements.add(value);
        
        // Emit nothing until ten elements are buffered, then flush them as
        // one comma-joined record.
        if (bufferedElements.size() < 10) {
            return null;
        }
        
        String joined = String.join(",", bufferedElements);
        bufferedElements.clear();
        return joined;
    }
    
    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Replace the previous snapshot with the current buffer contents.
        checkpointedState.clear();
        for (String element : bufferedElements) {
            checkpointedState.add(element);
        }
    }
    
    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        checkpointedState = context.getOperatorStateStore().getListState(
            new ListStateDescriptor<>("buffered-elements", String.class));
        
        bufferedElements = new ArrayList<>();
        if (context.isRestored()) {
            // Refill the in-memory buffer from the restored operator state.
            for (String element : checkpointedState.get()) {
                bufferedElements.add(element);
            }
        }
    }
}

Union List State

For redistributing state during rescaling.

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.runtime.state.FunctionInitializationContext;

public class RedistributingFunction extends RichMapFunction<Integer, Integer> 
        implements CheckpointedFunction {
    
    private List<Integer> localBuffer;
    private ListState<Integer> unionState;
    
    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // Union redistribution: on restore every subtask receives the FULL
        // concatenated list, not just a partition of it.
        unionState = context.getOperatorStateStore().getUnionListState(
            new ListStateDescriptor<>("union-state", Integer.class));
        
        localBuffer = new ArrayList<>();
        
        if (context.isRestored()) {
            unionState.get().forEach(localBuffer::add);
        }
    }
    
    @Override
    public Integer map(Integer value) throws Exception {
        // Track the element and report the current buffer size.
        localBuffer.add(value);
        return localBuffer.size();
    }
    
    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Re-snapshot the buffer from scratch on every checkpoint.
        unionState.clear();
        for (Integer element : localBuffer) {
            unionState.add(element);
        }
    }
}

Broadcast State

Share read-only state across all parallel instances.

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;

public class RuleBasedProcessFunction extends BroadcastProcessFunction<Event, Rule, Alert> {
    
    // Shared descriptor identifying the broadcast rule map on both sides.
    private static final MapStateDescriptor<String, Rule> RULE_STATE_DESCRIPTOR =
        new MapStateDescriptor<>("rules", String.class, Rule.class);
    
    @Override
    public void processElement(Event event, ReadOnlyContext ctx, Collector<Alert> out) 
            throws Exception {
        
        // Broadcast state is read-only on the non-broadcast side.
        ReadOnlyBroadcastState<String, Rule> rules = 
            ctx.getBroadcastState(RULE_STATE_DESCRIPTOR);
        
        // Emit one alert per rule the event matches.
        for (Map.Entry<String, Rule> entry : rules.immutableEntries()) {
            Rule rule = entry.getValue();
            if (rule.matches(event)) {
                out.collect(new Alert(event, rule));
            }
        }
    }
    
    @Override
    public void processBroadcastElement(Rule rule, Context ctx, Collector<Alert> out) 
            throws Exception {
        
        // Only the broadcast side may mutate the broadcast state: insert or
        // overwrite the rule under its id.
        ctx.getBroadcastState(RULE_STATE_DESCRIPTOR).put(rule.getId(), rule);
    }
}

Queryable State

Make state queryable from external applications.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.queryablestate.client.QueryableStateClient;

public class QueryableStateFunction extends RichMapFunction<Tuple2<String, Integer>, String> {
    private ValueState<Integer> queryableState;
    
    @Override
    public void open(OpenContext openContext) throws Exception {
        ValueStateDescriptor<Integer> descriptor = 
            new ValueStateDescriptor<>("queryable-count", Integer.class, 0);
        
        // Expose this state to external QueryableStateClient lookups under
        // the name "count-query".
        descriptor.setQueryable("count-query");
        
        queryableState = getRuntimeContext().getState(descriptor);
    }
    
    @Override
    public String map(Tuple2<String, Integer> value) throws Exception {
        // Accumulate the increment carried in f1 into the per-key count.
        int updated = queryableState.value() + value.f1;
        queryableState.update(updated);
        
        return "Updated count for " + value.f0 + ": " + updated;
    }
}

// Client code to query state
public class StateQueryClient {
    // Queries the value registered as "count-query" for a single key from a
    // running Flink job via the queryable-state proxy.
    // NOTE(review): Queryable State is deprecated in recent Flink releases —
    // confirm it is still available in your target version.
    public static void queryState() throws Exception {
        // Connect to the queryable-state proxy (default port 9069).
        QueryableStateClient client = new QueryableStateClient("localhost", 9069);
        
        // NOTE(review): JobID.generate() creates a NEW random job id, so this
        // lookup can never match a running job — pass the actual id of the
        // job whose state is being queried (e.g. parsed from its hex string).
        CompletableFuture<ValueState<Integer>> future = 
            client.getKvState(
                JobID.generate(),
                "count-query",  // queryable name registered by the job
                "key",          // the key whose state is requested
                BasicTypeInfo.STRING_TYPE_INFO,  // key type
                new ValueStateDescriptor<>("queryable-count", Integer.class)
            );
        
        // Block until the proxy responds, then read the value.
        ValueState<Integer> state = future.get();
        Integer count = state.value();
        System.out.println("Current count: " + count);
        
        client.close();
    }
}

Checkpoint Listeners

React to checkpoint events.

import org.apache.flink.api.common.state.CheckpointListener;

public class CheckpointAwareFunction extends RichMapFunction<String, String> 
        implements CheckpointListener {
    
    // transient: connections are not serializable; re-created in open().
    private transient DatabaseConnection connection;
    
    @Override
    public void open(OpenContext openContext) throws Exception {
        connection = new DatabaseConnection();
    }
    
    @Override
    public void close() throws Exception {
        // Release the connection on task shutdown — the original example
        // leaked it. NOTE(review): assumes DatabaseConnection exposes
        // close(); adapt to its actual release method if different.
        if (connection != null) {
            connection.close();
        }
    }
    
    @Override
    public String map(String value) throws Exception {
        // Regular per-record processing.
        return process(value);
    }
    
    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        // The checkpoint is durable: it is now safe to commit side effects.
        connection.commitTransaction();
        System.out.println("Checkpoint " + checkpointId + " completed");
    }
    
    @Override
    public void notifyCheckpointAborted(long checkpointId) throws Exception {
        // The checkpoint failed or was subsumed: undo uncommitted effects.
        connection.rollbackTransaction();
        System.out.println("Checkpoint " + checkpointId + " aborted");
    }
}

State Migration

Handle state evolution during application updates.

// Version 1 of the state
public class StateV1 {
    public String name;  // entity identifier
    public int count;    // number of observations
    
    // Constructor, getters, setters
}

// Version 2 of the state (evolved)
public class StateV2 {
    public String name;
    public int count;
    public long timestamp; // New field
    
    /**
     * No-arg constructor. Required because callers create fresh instances
     * with {@code new StateV2()}; declaring only the migration constructor
     * below suppresses the implicit default constructor and would break
     * compilation.
     */
    public StateV2() {
    }
    
    /** Migrates a V1 value, stamping it with the current wall-clock time. */
    public StateV2(StateV1 oldState) {
        this.name = oldState.name;
        this.count = oldState.count;
        this.timestamp = System.currentTimeMillis();
    }
}

// State migration function
public class MigratableStateFunction extends RichMapFunction<String, String> {
    private ValueState<StateV2> state;
    
    @Override
    public void open(OpenContext openContext) throws Exception {
        state = getRuntimeContext().getState(
            new ValueStateDescriptor<>("evolved-state", StateV2.class));
    }
    
    @Override
    public String map(String value) throws Exception {
        StateV2 current = state.value();
        
        if (current == null) {
            // First event for this key: initialize a fresh V2 record.
            current = new StateV2();
            current.name = value;
            current.count = 1;
        } else {
            current.count++;
        }
        // The timestamp is refreshed on every update, new or existing.
        current.timestamp = System.currentTimeMillis();
        
        state.update(current);
        return current.toString();
    }
}

Best Practices

State Design Patterns

// Use appropriate state types for your use case
public class StatePatternExamples extends RichMapFunction<Event, Result> {
    
    private ValueState<String> lastValue;               // one value per key
    private ListState<Event> eventHistory;              // growing collection per key
    private MapState<String, Counter> categoryCounters; // key/value mapping per key
    private ReducingState<Long> totalSum;               // pre-aggregated total per key
    
    @Override
    public void open(OpenContext openContext) throws Exception {
        // Shared 24-hour TTL, purging up to 5 expired entries per access.
        StateTtlConfig ttlConfig = StateTtlConfig
            .newBuilder(Time.hours(24))
            .cleanupIncrementally(5, true)
            .build();
        
        ValueStateDescriptor<String> lastValueDesc = 
            new ValueStateDescriptor<>("last-value", String.class);
        lastValueDesc.enableTimeToLive(ttlConfig);
        lastValue = getRuntimeContext().getState(lastValueDesc);
        
        // Other state descriptors with TTL...
    }
    
    @Override
    public Result map(Event event) throws Exception {
        // Swap the stored value for the incoming one and pair them up.
        String previous = lastValue.value();
        lastValue.update(event.getValue());
        
        return new Result(previous, event.getValue());
    }
}

// Minimize state size
public class EfficientStateFunction extends RichMapFunction<Event, Summary> {
    private ValueState<CompactSummary> compactState;
    
    /** Fixed-size aggregate kept instead of storing every event. */
    public static class CompactSummary {
        public long count;
        public double sum;
        public long lastTimestamp;
    }
    
    @Override
    public void open(OpenContext openContext) throws Exception {
        compactState = getRuntimeContext().getState(
            new ValueStateDescriptor<>("compact-summary", CompactSummary.class));
    }
    
    @Override
    public Summary map(Event event) throws Exception {
        // Lazily create the aggregate on the first event for this key.
        CompactSummary summary = compactState.value();
        if (summary == null) {
            summary = new CompactSummary();
        }
        
        // Fold the event into the running aggregate.
        summary.count += 1;
        summary.sum += event.getValue();
        summary.lastTimestamp = event.getTimestamp();
        
        compactState.update(summary);
        
        // Emit the count and the running mean.
        return new Summary(summary.count, summary.sum / summary.count);
    }
}

Apache Flink's state management provides powerful capabilities for building stateful stream processing applications with fault tolerance guarantees. By understanding the different state types and following best practices, you can build efficient and reliable stateful applications.

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-core

docs

configuration.md

connectors.md

event-time-watermarks.md

execution-jobs.md

functions-and-operators.md

index.md

state-management.md

type-system-serialization.md

utilities.md

tile.json