Apache Flink Core runtime components, type system, and foundational APIs for stream processing applications
—
Apache Flink provides comprehensive state management capabilities for stateful stream processing applications. The state APIs enable functions to maintain state across events while providing fault tolerance through checkpointing and recovery mechanisms.
ValueState — store and update a single value per key.
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.functions.OpenContext;
/**
 * Counts how many elements have been seen per key and emits the element
 * together with its running count.
 */
public class CountingMapFunction extends RichMapFunction<String, Tuple2<String, Integer>> {

    /** Per-key running count, backed by Flink's keyed ValueState. */
    private ValueState<Integer> countState;

    @Override
    public void open(OpenContext openContext) throws Exception {
        // The default-value variant of ValueStateDescriptor is deprecated;
        // use the two-argument form and handle "no value yet" in map().
        ValueStateDescriptor<Integer> descriptor =
                new ValueStateDescriptor<>("count", Integer.class);
        // Get state handle
        countState = getRuntimeContext().getState(descriptor);
    }

    @Override
    public Tuple2<String, Integer> map(String value) throws Exception {
        // value() returns null for a key that has no state yet; treat as 0,
        // which matches the old default-value behavior.
        Integer currentCount = countState.value();
        int updated = (currentCount == null ? 0 : currentCount) + 1;
        countState.update(updated);
        return new Tuple2<>(value, updated);
    }
}
Maintain a list of values per key.
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
/**
 * Buffers every incoming element in keyed list state and returns a snapshot of
 * the full buffer on each call; the state is reset once it exceeds 100 entries.
 */
public class BufferingMapFunction extends RichMapFunction<String, List<String>> {

    /** Per-key buffer of all observed values. */
    private ListState<String> bufferState;

    @Override
    public void open(OpenContext openContext) throws Exception {
        bufferState = getRuntimeContext().getListState(
                new ListStateDescriptor<>("buffer", String.class));
    }

    @Override
    public List<String> map(String value) throws Exception {
        // Append the new element, then materialize the whole buffer.
        bufferState.add(value);
        List<String> snapshot = new ArrayList<>();
        bufferState.get().forEach(snapshot::add);
        // Reset the state once the buffer exceeds 100 entries; the oversized
        // snapshot is still returned to the caller.
        if (snapshot.size() > 100) {
            bufferState.clear();
        }
        return snapshot;
    }
}
Store key-value pairs as state.
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
/**
 * Tracks per-key session start times in map state, reports the elapsed time of
 * the current session, and evicts sessions idle for more than one hour.
 */
public class UserSessionFunction extends RichMapFunction<Event, SessionInfo> {

    /** Maps session id -> first-seen timestamp for the current key. */
    private MapState<String, Long> sessionStartTimes;

    @Override
    public void open(OpenContext openContext) throws Exception {
        sessionStartTimes = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("sessions", String.class, Long.class));
    }

    @Override
    public SessionInfo map(Event event) throws Exception {
        final String sessionId = event.getSessionId();
        // Record the start time the first time this session is seen.
        if (!sessionStartTimes.contains(sessionId)) {
            sessionStartTimes.put(sessionId, event.getTimestamp());
        }
        final long startedAt = sessionStartTimes.get(sessionId);
        final long elapsed = event.getTimestamp() - startedAt;
        // Evict sessions idle for over one hour. Note this walks every entry
        // on every event, which is O(#sessions) per record.
        final Iterator<Map.Entry<String, Long>> entries = sessionStartTimes.iterator();
        while (entries.hasNext()) {
            if (event.getTimestamp() - entries.next().getValue() > 3600000) { // 1 hour
                entries.remove();
            }
        }
        return new SessionInfo(sessionId, startedAt, elapsed);
    }
}
Aggregate values using a reduce function.
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.functions.ReduceFunction;
/**
 * Maintains a per-key running sum using ReducingState: the reduce function is
 * applied eagerly on every add(), so only one aggregated value is stored.
 */
public class SumAccumulatorFunction extends RichMapFunction<Integer, Integer> {

    /** Running per-key sum. */
    private ReducingState<Integer> sumState;

    @Override
    public void open(OpenContext openContext) throws Exception {
        // ReduceFunction is a functional interface, so a method reference
        // replaces the verbose anonymous class.
        ReducingStateDescriptor<Integer> descriptor =
                new ReducingStateDescriptor<>("sum", Integer::sum, Integer.class);
        sumState = getRuntimeContext().getReducingState(descriptor);
    }

    @Override
    public Integer map(Integer value) throws Exception {
        // Fold the new value into the state, then report the current total.
        sumState.add(value);
        return sumState.get();
    }
}
Use custom aggregate functions for complex aggregations.
import org.apache.flink.api.common.state.AggregatingState;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.functions.AggregateFunction;
/**
 * Maintains a per-key running average using AggregatingState with a
 * (sum, count) accumulator.
 */
public class AverageAccumulatorFunction extends RichMapFunction<Double, Double> {

    /** Running per-key average; accumulator is (sum, count). */
    private AggregatingState<Double, Double> avgState;

    /** AggregateFunction computing a mean from a (sum, count) accumulator. */
    public static class AverageAggregate implements AggregateFunction<Double, Tuple2<Double, Long>, Double> {

        @Override
        public Tuple2<Double, Long> createAccumulator() {
            return new Tuple2<>(0.0, 0L);
        }

        @Override
        public Tuple2<Double, Long> add(Double value, Tuple2<Double, Long> acc) {
            return new Tuple2<>(acc.f0 + value, acc.f1 + 1L);
        }

        @Override
        public Double getResult(Tuple2<Double, Long> acc) {
            // Guard against division by zero on an empty accumulator.
            if (acc.f1 == 0) {
                return 0.0;
            }
            return acc.f0 / acc.f1;
        }

        @Override
        public Tuple2<Double, Long> merge(Tuple2<Double, Long> left, Tuple2<Double, Long> right) {
            return new Tuple2<>(left.f0 + right.f0, left.f1 + right.f1);
        }
    }

    @Override
    public void open(OpenContext openContext) throws Exception {
        // The TypeHint captures the generic accumulator type for Flink's
        // serializer, which erasure would otherwise hide.
        avgState = getRuntimeContext().getAggregatingState(
                new AggregatingStateDescriptor<>(
                        "average",
                        new AverageAggregate(),
                        TypeInformation.of(new TypeHint<Tuple2<Double, Long>>(){})));
    }

    @Override
    public Double map(Double value) throws Exception {
        avgState.add(value);
        return avgState.get();
    }
}
Configure automatic state cleanup based on time.
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;
/**
 * Demonstrates state TTL: entries expire one hour after creation or last
 * write, and expired values are never returned.
 */
public class TTLEnabledFunction extends RichMapFunction<String, String> {

    /** Value state whose entries expire one hour after creation/last write. */
    private ValueState<String> ttlState;

    @Override
    public void open(OpenContext openContext) throws Exception {
        // TTL of one hour, refreshed on create and write; expired entries are
        // hidden from reads and cleaned up both on full snapshots and
        // incrementally (10 entries per state access).
        StateTtlConfig ttlConfig = StateTtlConfig
                .newBuilder(Time.hours(1))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .cleanupFullSnapshot()
                .cleanupIncrementally(10, true)
                .build();

        ValueStateDescriptor<String> descriptor =
                new ValueStateDescriptor<>("ttl-state", String.class);
        descriptor.enableTimeToLive(ttlConfig);
        ttlState = getRuntimeContext().getState(descriptor);
    }

    @Override
    public String map(String value) throws Exception {
        // Return the previous value (or a marker on first sight), then store
        // the new one.
        String previous = ttlState.value();
        ttlState.update(value);
        return previous == null ? "first-value" : previous;
    }
}
// Different cleanup strategies
// Incremental cleanup: lazily inspect a few entries per state access; the
// second argument (true) additionally triggers cleanup on every processed
// record. Applies to heap-based state backends.
StateTtlConfig incrementalCleanup = StateTtlConfig
.newBuilder(Time.minutes(30))
.cleanupIncrementally(5, true) // Clean 5 entries per access
.build();
// Full-snapshot cleanup: expired entries are filtered out when a full
// checkpoint/savepoint is taken (does not shrink local state by itself).
StateTtlConfig fullSnapshotCleanup = StateTtlConfig
.newBuilder(Time.days(1))
.cleanupFullSnapshot() // Clean during full snapshots
.build();
// RocksDB compaction-filter cleanup: the argument is the number of processed
// state entries after which the filter refreshes its current timestamp.
StateTtlConfig rocksDBCleanup = StateTtlConfig
.newBuilder(Time.hours(2))
.cleanupInRocksdbCompactFilter(1000) // RocksDB compaction filter
.build();
// Combined cleanup strategies
// Strategies are not mutually exclusive; the active state backend simply
// ignores the ones that do not apply to it.
StateTtlConfig combinedCleanup = StateTtlConfig
.newBuilder(Time.hours(6))
.cleanupIncrementally(10, true)
.cleanupFullSnapshot()
.cleanupInRocksdbCompactFilter(500)
.build();State that is not keyed and maintained per operator instance.
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
/**
 * Buffers elements in a plain Java list and emits a comma-joined batch of 10;
 * the buffer survives failures via operator list state
 * (CheckpointedFunction).
 */
public class BufferingMapFunction extends RichMapFunction<String, String>
        implements CheckpointedFunction {

    /** In-flight elements that have not been emitted yet. */
    private List<String> bufferedElements;
    /** Operator list state backing {@code bufferedElements} across restarts. */
    private ListState<String> checkpointedState;

    @Override
    public String map(String value) throws Exception {
        bufferedElements.add(value);
        // Emit a batch once 10 elements have accumulated.
        if (bufferedElements.size() >= 10) {
            String result = String.join(",", bufferedElements);
            bufferedElements.clear();
            return result;
        }
        return null; // Buffer not full yet
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // update() atomically replaces the state contents — simpler than the
        // clear()-then-add() loop and a single state-backend call.
        checkpointedState.update(bufferedElements);
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        ListStateDescriptor<String> descriptor =
                new ListStateDescriptor<>("buffered-elements", String.class);
        checkpointedState = context.getOperatorStateStore().getListState(descriptor);

        // Always start with a fresh buffer; refill it from the checkpointed
        // state only when recovering from a failure.
        bufferedElements = new ArrayList<>();
        if (context.isRestored()) {
            for (String element : checkpointedState.get()) {
                bufferedElements.add(element);
            }
        }
    }
}
For redistributing state during rescaling.
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.runtime.state.FunctionInitializationContext;
/**
 * Demonstrates union-redistributed operator state: on restore, every subtask
 * receives the full, concatenated state of all previous subtasks.
 */
public class RedistributingFunction extends RichMapFunction<Integer, Integer>
        implements CheckpointedFunction {

    /** Local working copy of the elements seen by this subtask. */
    private List<Integer> localBuffer;
    /** Union list state; broadcast in full to every subtask on restore. */
    private ListState<Integer> unionState;

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        ListStateDescriptor<Integer> descriptor =
                new ListStateDescriptor<>("union-state", Integer.class);
        // Union list state for redistribution during rescaling
        unionState = context.getOperatorStateStore().getUnionListState(descriptor);

        localBuffer = new ArrayList<>();
        if (context.isRestored()) {
            // All subtasks receive all state elements. If each element must be
            // owned by exactly one subtask, the application must deduplicate
            // or partition here.
            for (Integer element : unionState.get()) {
                localBuffer.add(element);
            }
        }
    }

    @Override
    public Integer map(Integer value) throws Exception {
        localBuffer.add(value);
        // Report how many elements this subtask currently holds.
        return localBuffer.size();
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Replace the snapshot contents in one call instead of clear()+add().
        unionState.update(localBuffer);
    }
}
Share read-only state across all parallel instances.
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
/**
 * Applies a broadcast set of rules to an event stream, emitting an alert for
 * every rule an event matches. Rules arrive on the broadcast side.
 */
public class RuleBasedProcessFunction extends BroadcastProcessFunction<Event, Rule, Alert> {

    /** Descriptor shared by both process methods to address the broadcast state. */
    private static final MapStateDescriptor<String, Rule> RULE_STATE_DESCRIPTOR =
            new MapStateDescriptor<>("rules", String.class, Rule.class);

    @Override
    public void processElement(Event event, ReadOnlyContext ctx, Collector<Alert> out)
            throws Exception {
        // Broadcast state is read-only on the non-broadcast side.
        ReadOnlyBroadcastState<String, Rule> rules =
                ctx.getBroadcastState(RULE_STATE_DESCRIPTOR);
        // Emit one alert per matching rule.
        for (Map.Entry<String, Rule> entry : rules.immutableEntries()) {
            Rule candidate = entry.getValue();
            if (candidate.matches(event)) {
                out.collect(new Alert(event, candidate));
            }
        }
    }

    @Override
    public void processBroadcastElement(Rule rule, Context ctx, Collector<Alert> out)
            throws Exception {
        // Only the broadcast side may mutate the state; every parallel
        // instance applies the same update, keeping all copies identical.
        ctx.getBroadcastState(RULE_STATE_DESCRIPTOR).put(rule.getId(), rule);
    }
}
Make state queryable from external applications.
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.queryablestate.client.QueryableStateClient;
/**
 * Maintains a per-key counter and exposes it to external clients under the
 * queryable-state name "count-query".
 */
public class QueryableStateFunction extends RichMapFunction<Tuple2<String, Integer>, String> {

    /** Per-key running count, externally queryable. */
    private ValueState<Integer> queryableState;

    @Override
    public void open(OpenContext openContext) throws Exception {
        // The default-value descriptor constructor is deprecated; use the
        // two-argument form and treat a null value() as zero in map().
        ValueStateDescriptor<Integer> descriptor =
                new ValueStateDescriptor<>("queryable-count", Integer.class);
        // Make state queryable
        descriptor.setQueryable("count-query");
        queryableState = getRuntimeContext().getState(descriptor);
    }

    @Override
    public String map(Tuple2<String, Integer> value) throws Exception {
        // null means no state for this key yet — equivalent to the old
        // default value of 0.
        Integer current = queryableState.value();
        int updated = (current == null ? 0 : current) + value.f1;
        queryableState.update(updated);
        return "Updated count for " + value.f0 + ": " + updated;
    }
}
// Client code to query state
/**
 * External client that queries the "count-query" state of a running job via
 * the queryable-state proxy (default port 9069).
 */
public class StateQueryClient {
public static void queryState() throws Exception {
QueryableStateClient client = new QueryableStateClient("localhost", 9069);
// NOTE(review): JobID.generate() creates a fresh RANDOM id, which can never
// match a running job — this lookup will always fail. The real job's id
// (e.g. from the REST API / CLI, via JobID.fromHexString) must be used here.
CompletableFuture<ValueState<Integer>> future =
client.getKvState(
JobID.generate(),
"count-query",
"key",
BasicTypeInfo.STRING_TYPE_INFO,
new ValueStateDescriptor<>("queryable-count", Integer.class)
);
// Blocks until the proxy answers (or the future completes exceptionally).
ValueState<Integer> state = future.get();
Integer count = state.value();
System.out.println("Current count: " + count);
client.close();
}
}React to checkpoint events.
import org.apache.flink.api.common.state.CheckpointListener;
/**
 * Ties an external transaction to Flink's checkpoint lifecycle: commit when a
 * checkpoint completes, roll back when it is aborted.
 */
public class CheckpointAwareFunction extends RichMapFunction<String, String>
implements CheckpointListener {
// transient: the connection is not serializable function state; it is
// (re)created in open() on every (re)start.
private transient DatabaseConnection connection;
@Override
public void open(OpenContext openContext) throws Exception {
// NOTE(review): there is no close() override releasing this connection on
// task shutdown — confirm DatabaseConnection's cleanup requirements.
connection = new DatabaseConnection();
}
@Override
public String map(String value) throws Exception {
// Regular processing
return process(value);
}
@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
// Called once the checkpoint is globally complete: the safe point to make
// external side effects visible.
connection.commitTransaction();
System.out.println("Checkpoint " + checkpointId + " completed");
}
@Override
public void notifyCheckpointAborted(long checkpointId) throws Exception {
// The checkpoint failed somewhere in the job: undo pending side effects.
connection.rollbackTransaction();
System.out.println("Checkpoint " + checkpointId + " aborted");
}
}Handle state evolution during application updates.
// Version 1 of the state
// Plain mutable data holder with public fields; the implicit no-arg
// constructor keeps it compatible with Flink's POJO serializer.
public class StateV1 {
public String name;
public int count;
// Constructor, getters, setters
}
// Version 2 of the state (evolved)
/**
 * Evolved state schema: adds a timestamp to the V1 fields.
 */
public class StateV2 {
    public String name;
    public int count;
    public long timestamp; // New field

    /**
     * No-arg constructor. Declaring the migrating constructor below suppresses
     * the implicit default constructor, which broke {@code new StateV2()}
     * callers (see MigratableStateFunction) and Flink's POJO serializer —
     * restore it explicitly.
     */
    public StateV2() {
    }

    /** Migration constructor: upgrade a V1 state, stamping the current time. */
    public StateV2(StateV1 oldState) {
        this.name = oldState.name;
        this.count = oldState.count;
        this.timestamp = System.currentTimeMillis();
    }
}
// State migration function
/**
 * Counts occurrences per key in the evolved (V2) state schema, refreshing the
 * timestamp on every update.
 */
public class MigratableStateFunction extends RichMapFunction<String, String> {

    /** Keyed state holding the latest (V2) schema. */
    private ValueState<StateV2> state;

    @Override
    public void open(OpenContext openContext) throws Exception {
        state = getRuntimeContext().getState(
                new ValueStateDescriptor<>("evolved-state", StateV2.class));
    }

    @Override
    public String map(String value) throws Exception {
        StateV2 current = state.value();
        if (current == null) {
            // First event for this key: initialize a fresh V2 record.
            current = new StateV2();
            current.name = value;
            current.count = 1;
        } else {
            current.count++;
        }
        // The timestamp is refreshed in both branches.
        current.timestamp = System.currentTimeMillis();
        state.update(current);
        return current.toString();
    }
}
// Use appropriate state types for your use case
/**
 * Shows the main keyed-state types side by side, all configured with the same
 * 24-hour TTL.
 */
public class StatePatternExamples extends RichMapFunction<Event, Result> {
    // Single values per key
    private ValueState<String> lastValue;
    // Collections that grow over time
    private ListState<Event> eventHistory;
    // Key-value mappings
    private MapState<String, Counter> categoryCounters;
    // Aggregations
    private ReducingState<Long> totalSum;

    @Override
    public void open(OpenContext openContext) throws Exception {
        // One TTL config shared by every state: 24h lifetime, incremental cleanup.
        StateTtlConfig ttlConfig = StateTtlConfig
                .newBuilder(Time.hours(24))
                .cleanupIncrementally(5, true)
                .build();

        ValueStateDescriptor<String> lastValueDesc =
                new ValueStateDescriptor<>("last-value", String.class);
        lastValueDesc.enableTimeToLive(ttlConfig);
        lastValue = getRuntimeContext().getState(lastValueDesc);

        // The remaining states were previously only declared, never
        // initialized — any access would have thrown a NullPointerException.
        ListStateDescriptor<Event> historyDesc =
                new ListStateDescriptor<>("event-history", Event.class);
        historyDesc.enableTimeToLive(ttlConfig);
        eventHistory = getRuntimeContext().getListState(historyDesc);

        MapStateDescriptor<String, Counter> countersDesc =
                new MapStateDescriptor<>("category-counters", String.class, Counter.class);
        countersDesc.enableTimeToLive(ttlConfig);
        categoryCounters = getRuntimeContext().getMapState(countersDesc);

        ReducingStateDescriptor<Long> sumDesc =
                new ReducingStateDescriptor<>("total-sum", Long::sum, Long.class);
        sumDesc.enableTimeToLive(ttlConfig);
        totalSum = getRuntimeContext().getReducingState(sumDesc);
    }

    @Override
    public Result map(Event event) throws Exception {
        // Swap the stored value for the incoming one and pair them up.
        String previous = lastValue.value();
        lastValue.update(event.getValue());
        return new Result(previous, event.getValue());
    }
}
// Minimize state size
/**
 * Keeps state small by folding each event into a constant-size (count, sum,
 * last-timestamp) rollup instead of storing the full event history.
 */
public class EfficientStateFunction extends RichMapFunction<Event, Summary> {

    /** Constant-size rollup of all events seen for the key. */
    private ValueState<CompactSummary> compactState;

    /** Compact representation kept instead of storing all events. */
    public static class CompactSummary {
        public long count;
        public double sum;
        public long lastTimestamp;
    }

    @Override
    public void open(OpenContext openContext) throws Exception {
        compactState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("compact-summary", CompactSummary.class));
    }

    @Override
    public Summary map(Event event) throws Exception {
        CompactSummary rollup = compactState.value();
        if (rollup == null) {
            rollup = new CompactSummary();
        }
        // Fold the event into the fixed-size aggregate.
        rollup.count++;
        rollup.sum += event.getValue();
        rollup.lastTimestamp = event.getTimestamp();
        compactState.update(rollup);
        // count is at least 1 here, so the division is safe.
        return new Summary(rollup.count, rollup.sum / rollup.count);
    }
}
Apache Flink's state management provides powerful capabilities for building stateful stream processing applications with fault tolerance guarantees. By understanding the different state types and following best practices, you can build efficient and reliable stateful applications.
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-core