Apache Flink State Processor API for reading and writing savepoint state data offline
—
State reading functionality allows you to extract and process state data from existing savepoints. The API supports reading different types of Flink state including operator state (list, union, broadcast) and keyed state.
Read operator list state as individual elements.
// Reads the elements of an operator list state identified by the operator uid
// and state name; typeInfo must describe the element type the state was written with.
public <T> DataSource<T> readListState(
String uid,
String name,
TypeInformation<T> typeInfo
) throws IOException;
// Overload for state written with a custom serializer: pass the same
// TypeSerializer the original job used so the stored bytes can be decoded.
public <T> DataSource<T> readListState(
String uid,
String name,
TypeInformation<T> typeInfo,
TypeSerializer<T> serializer
) throws IOException;Usage Example:
// Read list state with type information
DataSource<String> messages = savepoint.readListState(
"kafka-source",
"buffered-messages",
Types.STRING
);
// Read with custom serializer
TypeSerializer<MyCustomType> customSerializer = new MyCustomTypeSerializer();
DataSource<MyCustomType> customData = savepoint.readListState(
"custom-operator",
"custom-state",
TypeInformation.of(MyCustomType.class),
customSerializer
);
// Process the data
messages.map(msg -> "Processed: " + msg).print();Read operator union state which is similar to list state but with different semantics for redistribution.
// Reads operator union state; like list state but with different
// redistribution semantics (see surrounding documentation).
public <T> DataSource<T> readUnionState(
String uid,
String name,
TypeInformation<T> typeInfo
) throws IOException;
// Overload taking the custom TypeSerializer used when the state was written.
public <T> DataSource<T> readUnionState(
String uid,
String name,
TypeInformation<T> typeInfo,
TypeSerializer<T> serializer
) throws IOException;Usage Example:
// Read union state
DataSource<Configuration> configs = savepoint.readUnionState(
"config-broadcaster",
"broadcast-config",
TypeInformation.of(Configuration.class)
);
configs.map(config -> processConfiguration(config)).collect();Read broadcast state as key-value pairs.
// Reads broadcast state as a data set of (key, value) pairs.
public <K, V> DataSource<Tuple2<K, V>> readBroadcastState(
String uid,
String name,
TypeInformation<K> keyTypeInfo,
TypeInformation<V> valueTypeInfo
) throws IOException;
// Overload taking the custom key/value serializers the state was written with.
public <K, V> DataSource<Tuple2<K, V>> readBroadcastState(
String uid,
String name,
TypeInformation<K> keyTypeInfo,
TypeInformation<V> valueTypeInfo,
TypeSerializer<K> keySerializer,
TypeSerializer<V> valueSerializer
) throws IOException;Usage Example:
// Read broadcast state as key-value pairs
DataSource<Tuple2<String, Rule>> rules = savepoint.readBroadcastState(
"rule-processor",
"rules-broadcast-state",
Types.STRING,
TypeInformation.of(Rule.class)
);
// Process the rules
rules.map(tuple -> "Rule " + tuple.f0 + ": " + tuple.f1.getDescription())
.print();Read keyed state using custom reader functions that process each key individually.
// Reads keyed state by invoking the reader function once per key in the savepoint.
public <K, OUT> DataSource<OUT> readKeyedState(
String uid,
KeyedStateReaderFunction<K, OUT> function
) throws IOException;
// Overload additionally taking explicit key/output type information.
public <K, OUT> DataSource<OUT> readKeyedState(
String uid,
KeyedStateReaderFunction<K, OUT> function,
TypeInformation<K> keyTypeInfo,
TypeInformation<OUT> outTypeInfo
) throws IOException;public abstract class KeyedStateReaderFunction<K, OUT> extends AbstractRichFunction {
// Called before reading starts; register all state descriptors here.
public abstract void open(Configuration parameters) throws Exception;
// Called for each key; emit any number of records via the collector.
public abstract void readKey(
K key,
Context ctx,
Collector<OUT> out
) throws Exception;
// Per-key context exposing the timers registered for the current key.
public interface Context {
Set<Long> registeredEventTimeTimers() throws Exception;
Set<Long> registeredProcessingTimeTimers() throws Exception;
}
}Usage Example:
public class MyKeyedStateReader extends KeyedStateReaderFunction<String, UserStats> {

    // Handles to the original job's keyed state; bound in open().
    private ValueState<Long> countState;
    private ValueState<Double> avgState;

    @Override
    public void open(Configuration parameters) throws Exception {
        // State descriptors must be registered here, before any key is read.
        // Names and types must match what the original job registered.
        countState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("count", Long.class));
        avgState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("average", Double.class));
    }

    @Override
    public void readKey(String key, Context ctx, Collector<UserStats> out) throws Exception {
        Long count = countState.value();
        Double average = avgState.value();
        // Emit a record only when both pieces of state are present for this key.
        boolean complete = count != null && average != null;
        if (complete) {
            out.collect(new UserStats(key, count, average));
        }
        // Timer metadata registered for this key, if any.
        Set<Long> eventTimers = ctx.registeredEventTimeTimers();
        Set<Long> processingTimers = ctx.registeredProcessingTimeTimers();
    }
}
// Use the reader function
// The uid here must match the operator uid used by the original job.
DataSource<UserStats> userStats = savepoint.readKeyedState(
"user-processor",
new MyKeyedStateReader()
);
userStats.print();Value State:
public class ValueStateReader extends KeyedStateReaderFunction<String, Tuple2<String, String>> {

    // Bound in open(); reads the "value" state registered by the original job.
    private ValueState<String> valueState;

    @Override
    public void open(Configuration parameters) throws Exception {
        valueState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("value", String.class));
    }

    @Override
    public void readKey(String key, Context ctx, Collector<Tuple2<String, String>> out) throws Exception {
        // Keys with no stored value produce no output.
        String stored = valueState.value();
        if (stored == null) {
            return;
        }
        out.collect(Tuple2.of(key, stored));
    }
}List State:
public class ListStateReader extends KeyedStateReaderFunction<String, Tuple2<String, List<String>>> {

    // Bound in open(); descriptor name/type must match the original job's.
    private ListState<String> listState;

    @Override
    public void open(Configuration parameters) throws Exception {
        listState = getRuntimeContext().getListState(
                new ListStateDescriptor<>("list", String.class));
    }

    @Override
    public void readKey(String key, Context ctx, Collector<Tuple2<String, List<String>>> out) throws Exception {
        // Materialize the state iterable into a list; emit only non-empty results.
        List<String> collected = new ArrayList<>();
        listState.get().forEach(collected::add);
        if (!collected.isEmpty()) {
            out.collect(Tuple2.of(key, collected));
        }
    }
}Map State:
public class MapStateReader extends KeyedStateReaderFunction<String, Tuple3<String, String, Integer>> {

    // Bound in open(); same name and key/value types as the original descriptor.
    private MapState<String, Integer> mapState;

    @Override
    public void open(Configuration parameters) throws Exception {
        mapState = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("map", String.class, Integer.class));
    }

    @Override
    public void readKey(String key, Context ctx, Collector<Tuple3<String, String, Integer>> out) throws Exception {
        // Emit one record per map entry, tagged with the owning key.
        for (Map.Entry<String, Integer> e : mapState.entries()) {
            out.collect(Tuple3.of(key, e.getKey(), e.getValue()));
        }
    }
}public class FilteringStateReader extends KeyedStateReaderFunction<String, FilteredData> {
private ValueState<MyData> dataState;
// Predicate supplied by the caller; only matching records are emitted.
private final Predicate<MyData> filter;
public FilteringStateReader(Predicate<MyData> filter) {
this.filter = filter;
}
@Override
public void open(Configuration parameters) throws Exception {
// Bind the value state before any key is read.
ValueStateDescriptor<MyData> desc = new ValueStateDescriptor<>("data", MyData.class);
dataState = getRuntimeContext().getState(desc);
}
@Override
public void readKey(String key, Context ctx, Collector<FilteredData> out) throws Exception {
// Skip keys with no stored value or values rejected by the filter.
MyData data = dataState.value();
if (data != null && filter.test(data)) {
out.collect(new FilteredData(key, data));
}
}
}
// Usage
DataSource<FilteredData> filtered = savepoint.readKeyedState(
"data-processor",
new FilteringStateReader(data -> data.getScore() > 0.8)
);public class AggregatingStateReader extends KeyedStateReaderFunction<String, KeyAggregate> {
// Reads two pieces of state per key (a value and a tag list) and merges
// them into a single KeyAggregate record.
private ValueState<Double> valueState;
private ListState<String> tagState;
@Override
public void open(Configuration parameters) throws Exception {
// Register both descriptors before any key is processed.
valueState = getRuntimeContext().getState(
new ValueStateDescriptor<>("value", Double.class)
);
tagState = getRuntimeContext().getListState(
new ListStateDescriptor<>("tags", String.class)
);
}
@Override
public void readKey(String key, Context ctx, Collector<KeyAggregate> out) throws Exception {
Double value = valueState.value();
// Tags are always collected, but a record is emitted only when the
// value state is present for this key.
List<String> tags = new ArrayList<>();
tagState.get().forEach(tags::add);
if (value != null) {
KeyAggregate aggregate = new KeyAggregate(key, value, tags);
out.collect(aggregate);
}
}
}try {
// Attempt to read list state; failures surface as IOException.
DataSource<MyData> data = savepoint.readListState(
"operator-uid",
"state-name",
TypeInformation.of(MyData.class)
);
data.collect();
} catch (IOException e) {
// NOTE(review): dispatching on exception message text is brittle —
// message wording is not a stable API; confirm whether a typed
// exception hierarchy is available instead.
if (e.getMessage().contains("does not exist")) {
System.err.println("Operator UID not found in savepoint");
} else if (e.getMessage().contains("state")) {
System.err.println("State descriptor not found");
} else {
System.err.println("Failed to read state: " + e.getMessage());
}
}
}public class SafeStateReader extends KeyedStateReaderFunction<String, MyOutput> {
private ValueState<String> state;
@Override
public void open(Configuration parameters) throws Exception {
try {
ValueStateDescriptor<String> desc = new ValueStateDescriptor<>("myState", String.class);
state = getRuntimeContext().getState(desc);
} catch (Exception e) {
// Registration failure is fatal: rethrow with context, preserving the cause.
throw new RuntimeException("Failed to register state descriptor", e);
}
}
@Override
public void readKey(String key, Context ctx, Collector<MyOutput> out) throws Exception {
try {
String value = state.value();
if (value != null) {
out.collect(new MyOutput(key, value));
}
} catch (Exception e) {
// NOTE(review): per-key errors are logged to stderr and swallowed, so the
// read continues but records for failing keys are silently dropped.
System.err.println("Failed to read state for key " + key + ": " + e.getMessage());
// Could collect error record or skip
}
}
}// Set appropriate parallelism for reading operations
// Environment-level parallelism applies to the savepoint read by default.
env.setParallelism(8);
DataSource<MyData> data = savepoint.readKeyedState("operator", readerFunction);
// Can override parallelism for specific operations
data.setParallelism(4).map(processData).print();// For large state, consider processing in batches
public class BatchingStateReader extends KeyedStateReaderFunction<String, List<MyData>> {

    // Bound in open(); the original example never initialized this field
    // (no open() at all), so readKey would fail with a NullPointerException —
    // and the abstract open(Configuration) would be left unimplemented.
    private ListState<MyData> listState;

    // Maximum number of elements emitted per batch.
    private static final int BATCH_SIZE = 1000;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Descriptor name/type must match the state registered by the
        // original job (replace "list" with the real state name).
        listState = getRuntimeContext().getListState(
                new ListStateDescriptor<>("list", MyData.class));
    }

    @Override
    public void readKey(String key, Context ctx, Collector<List<MyData>> out) throws Exception {
        List<MyData> batch = new ArrayList<>(BATCH_SIZE);
        for (MyData item : listState.get()) {
            batch.add(item);
            // Emit a defensive copy when the batch reaches the size limit,
            // then reuse the buffer for the next batch.
            if (batch.size() >= BATCH_SIZE) {
                out.collect(new ArrayList<>(batch));
                batch.clear();
            }
        }
        // Emit any remaining items.
        if (!batch.isEmpty()) {
            out.collect(batch);
        }
    }
}Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-state-processor-api-2-12