Apache Flink State Processor API for reading and writing savepoint state data offline
—
State writing functionality allows you to bootstrap new state data into savepoints using DataSet transformations. This enables you to initialize operator state from batch data sources.
Bootstrap transformations define how to convert DataSet elements into operator state. They are created through the OperatorTransformation API and can be used with both keyed and non-keyed operators.
/**
 * Entry point for creating bootstrap transformations, which define how
 * DataSet elements are converted into operator state.
 */
public final class OperatorTransformation {
    /**
     * Starts a bootstrap transformation over the given data set.
     *
     * @param dataSet the elements to bootstrap state from
     * @param <T> the element type of the data set
     * @return a transformation for operators without keyed state partitioning;
     *         call {@code keyBy} on it to obtain a keyed transformation
     */
    public static <T> OneInputOperatorTransformation<T> bootstrapWith(
        DataSet<T> dataSet
    );
}
For operators without keyed state partitioning.
/**
 * Bootstrap transformation for operators without keyed state partitioning.
 * It can be converted into a keyed transformation through one of the
 * {@code keyBy} variants.
 *
 * @param <T> the type of the elements being bootstrapped
 */
public class OneInputOperatorTransformation<T> {
    /** Sets the max parallelism; returns the same transformation type so calls can be chained. */
    public OneInputOperatorTransformation<T> setMaxParallelism(int maxParallelism);

    /** Assigns timestamps to the elements using the given {@code Timestamper}. */
    public OneInputOperatorTransformation<T> assignTimestamps(Timestamper<T> timestamper);

    /** Assigns timestamps to the elements using the given {@code TimestampAssigner}. */
    public OneInputOperatorTransformation<T> assignTimestamps(TimestampAssigner<T> assigner);

    /** Applies a non-keyed {@code StateBootstrapFunction} to the elements. */
    public BootstrapTransformation<T> transform(StateBootstrapFunction<T> processFunction);

    /** Applies a {@code BroadcastStateBootstrapFunction} to the elements. */
    public BootstrapTransformation<T> transform(
        BroadcastStateBootstrapFunction<T> processFunction
    );

    /** Creates the transformation from the given {@code SavepointWriterOperatorFactory}. */
    public BootstrapTransformation<T> transform(SavepointWriterOperatorFactory factory);

    /**
     * Partitions the elements by the key extracted with {@code keySelector}.
     *
     * @param <K> the key type
     */
    public <K> KeyedOperatorTransformation<K, T> keyBy(
        KeySelector<T, K> keySelector
    );

    /**
     * Partitions the elements by the key extracted with {@code keySelector},
     * with the key's type information supplied explicitly.
     *
     * @param <K> the key type
     */
    public <K> KeyedOperatorTransformation<K, T> keyBy(
        KeySelector<T, K> keySelector,
        TypeInformation<K> keyType
    );

    /** Partitions the elements by the given field positions; the key type is {@code Tuple}. */
    public KeyedOperatorTransformation<Tuple, T> keyBy(int... fields);

    /** Partitions the elements by the given field names; the key type is {@code Tuple}. */
    public KeyedOperatorTransformation<Tuple, T> keyBy(String... fields);
}
Usage Example:
import org.apache.flink.state.api.functions.StateBootstrapFunction;
// Input data set whose elements will be turned into operator state
DataSet<String> messages = env.fromCollection(
    Arrays.asList("message1", "message2", "message3"));

// The function that writes each element into state
MessageBootstrapFunction bootstrapFunction = new MessageBootstrapFunction();

// Non-keyed bootstrap transformation: data set + function
BootstrapTransformation<String> transformation =
    OperatorTransformation.bootstrapWith(messages).transform(bootstrapFunction);

// Register the transformation with the savepoint as "message-buffer"
savepoint.withOperator("message-buffer", transformation);
Base function for non-keyed state bootstrap.
/**
 * Base function for non-keyed state bootstrap.
 *
 * @param <T> the type of the input elements
 */
public abstract class StateBootstrapFunction<T> extends AbstractRichFunction {
    /**
     * Called with each element of the bootstrap data set.
     *
     * @param value the element to process
     * @param ctx context exposing time information for this call
     * @throws Exception if processing the element fails
     */
    public abstract void processElement(T value, Context ctx) throws Exception;

    /** Context handed to {@code processElement}. */
    public abstract class Context {
        /** Returns the current processing time. */
        public abstract long currentProcessingTime();

        /** Returns the current watermark. */
        public abstract long currentWatermark();
    }
}
Implementation Example:
/**
 * Bootstraps non-keyed (operator) state from a data set of messages: every message is
 * appended to the "messages" list state, and the number of messages processed by this
 * instance is written to the "count" list state.
 *
 * <p>This function is applied without {@code keyBy}, so keyed state requested through
 * {@code getRuntimeContext().getState(...)} or {@code getListState(...)} is not available.
 * State is therefore registered with the operator state store in {@link #initializeState}.
 */
public class MessageBootstrapFunction extends StateBootstrapFunction<String> {
    private transient ListState<String> messagesState;
    private transient ListState<Integer> countState;

    // Messages processed by this instance; copied into countState when state is snapshotted.
    private int processedCount;

    /**
     * Registers the operator list states. Fully-qualified type names are used so the
     * snippet needs no additional imports.
     */
    @Override
    public void initializeState(
            org.apache.flink.runtime.state.FunctionInitializationContext context) throws Exception {
        messagesState = context.getOperatorStateStore().getListState(
            new ListStateDescriptor<>("messages", String.class));
        countState = context.getOperatorStateStore().getListState(
            new ListStateDescriptor<>("count", Integer.class));
    }

    /** Writes the running count into operator state right before it is persisted. */
    @Override
    public void snapshotState(
            org.apache.flink.runtime.state.FunctionSnapshotContext context) throws Exception {
        countState.clear();
        countState.add(processedCount);
    }

    /**
     * Stores the message and updates the running count.
     *
     * @param message the element to add to state
     * @param ctx context exposing time information
     */
    @Override
    public void processElement(String message, Context ctx) throws Exception {
        // Add message to list state
        messagesState.add(message);

        // Update count
        processedCount++;

        // Access context information
        // NOTE(review): confirm currentWatermark() is available on this Context in the
        // Flink version you target.
        long processingTime = ctx.currentProcessingTime();
        long watermark = ctx.currentWatermark();
        System.out.println("Processed message at " + processingTime +
            ", watermark: " + watermark);
    }
}
For operators with keyed state partitioning.
/**
 * Bootstrap transformation for operators with keyed state partitioning.
 *
 * @param <K> the key type
 * @param <T> the type of the elements being bootstrapped
 */
public class KeyedOperatorTransformation<K, T> {
    /** Applies a {@code KeyedStateBootstrapFunction} to the keyed elements. */
    public BootstrapTransformation<T> transform(
        KeyedStateBootstrapFunction<K, T> processFunction
    );

    /** Creates the transformation from the given {@code SavepointWriterOperatorFactory}. */
    public BootstrapTransformation<T> transform(SavepointWriterOperatorFactory factory);

    /**
     * Assigns the keyed elements to windows using the given assigner.
     *
     * @param <W> the window type produced by the assigner
     */
    public <W extends Window> WindowedOperatorTransformation<T, K, W> window(
        WindowAssigner<? super T, W> assigner
    );
}
Function for bootstrapping keyed state.
/**
 * Function for bootstrapping keyed state.
 *
 * @param <K> the key type
 * @param <IN> the type of the input elements
 */
public abstract class KeyedStateBootstrapFunction<K, IN> extends AbstractRichFunction {
    /**
     * Called with each element of the bootstrap data set.
     *
     * @param value the element to process
     * @param ctx context exposing the current key and a timer service
     * @throws Exception if processing the element fails
     */
    public abstract void processElement(IN value, Context ctx) throws Exception;

    /** Context handed to {@code processElement}. */
    public abstract class Context {
        /** Returns the {@code TimerService} used to register timers. */
        public abstract TimerService timerService();

        /** Returns the key of the element currently being processed. */
        public abstract K getCurrentKey();
    }
}
Usage Example:
// Input data set of user actions
DataSet<UserAction> userActions = env.fromCollection(getUserActions());

// The keyed function that writes each action into per-user state
UserStateBootstrapFunction bootstrapFunction = new UserStateBootstrapFunction();

// Keyed bootstrap transformation: partition by user ID, then apply the function
BootstrapTransformation<UserAction> transformation =
    OperatorTransformation.bootstrapWith(userActions)
        .keyBy(UserAction::getUserId)
        .transform(bootstrapFunction);

// Register the transformation with the savepoint as "user-processor"
savepoint.withOperator("user-processor", transformation);
KeyedStateBootstrapFunction Implementation:
/**
 * Keyed bootstrap function that seeds per-user state from {@code UserAction} elements:
 * a profile ("profile"), an activity log ("activity") and a last-seen timestamp
 * ("lastSeen"). For every action it also registers an event-time timer 30 days after
 * the action's timestamp.
 */
public class UserStateBootstrapFunction extends KeyedStateBootstrapFunction<String, UserAction> {
    private ValueState<UserProfile> profileState;
    private ListState<String> activityState;
    private ValueState<Long> lastSeenState;

    /** Obtains the three keyed state handles from the runtime context. */
    @Override
    public void open(Configuration parameters) throws Exception {
        profileState = getRuntimeContext().getState(
            new ValueStateDescriptor<>("profile", UserProfile.class));
        activityState = getRuntimeContext().getListState(
            new ListStateDescriptor<>("activity", String.class));
        lastSeenState = getRuntimeContext().getState(
            new ValueStateDescriptor<>("lastSeen", Long.class));
    }

    /**
     * Folds one action into the state of the current key.
     *
     * @param action the user action to apply
     * @param ctx context providing the current key and the timer service
     */
    @Override
    public void processElement(UserAction action, Context ctx) throws Exception {
        final String currentUser = ctx.getCurrentKey();

        // Reuse the stored profile, or start a new one when none exists for this key yet
        final UserProfile stored = profileState.value();
        final UserProfile profile = (stored != null) ? stored : new UserProfile(currentUser);
        profile.updateFromAction(action);
        profileState.update(profile);

        // Activity log entry has the form "<actionType>:<timestamp>"
        final String activityEntry = action.getActionType() + ":" + action.getTimestamp();
        activityState.add(activityEntry);

        // Remember when this user was last seen
        lastSeenState.update(action.getTimestamp());

        // Cleanup timer (example): 30 days after the action's timestamp
        final long retentionMillis = Duration.ofDays(30).toMillis();
        ctx.timerService().registerEventTimeTimer(action.getTimestamp() + retentionMillis);
    }
}
Function for bootstrapping broadcast state.
/**
 * Function for bootstrapping broadcast state.
 *
 * @param <T> the type of the input elements
 */
public abstract class BroadcastStateBootstrapFunction<T> extends AbstractRichFunction {
    /**
     * Called with each element of the bootstrap data set.
     *
     * @param value the element to process
     * @param ctx context giving access to the broadcast state and time information
     * @throws Exception if processing the element fails
     */
    public abstract void processElement(T value, Context ctx) throws Exception;

    /** Context handed to {@code processElement}. */
    public abstract class Context {
        /** Returns the current processing time. */
        public abstract long currentProcessingTime();

        /** Returns the current watermark. */
        public abstract long currentWatermark();

        /**
         * Fetches the broadcast state described by {@code descriptor}. This is how a
         * bootstrap function obtains the state it writes to.
         *
         * @param descriptor name and key/value types of the broadcast state
         * @param <K> the broadcast state's key type
         * @param <V> the broadcast state's value type
         */
        public abstract <K, V> BroadcastState<K, V> getBroadcastState(
            MapStateDescriptor<K, V> descriptor
        );
    }
}
Usage Example:
// Bootstrap broadcast state with configuration data
DataSet<ConfigEntry> configs = env.fromCollection(getConfigEntries());
// Broadcast state is not keyed: transform(BroadcastStateBootstrapFunction) is offered by the
// non-keyed transformation returned from bootstrapWith, so no keyBy() is applied here.
BootstrapTransformation<ConfigEntry> transformation = OperatorTransformation
    .bootstrapWith(configs)
    .transform(new ConfigBroadcastBootstrapFunction());
savepoint.withOperator("config-broadcaster", transformation);
Implementation:
/**
 * Bootstraps the "config-broadcast" broadcast state: each {@code ConfigEntry} is stored
 * as a key/value pair of strings.
 */
public class ConfigBroadcastBootstrapFunction extends BroadcastStateBootstrapFunction<ConfigEntry> {
    private MapStateDescriptor<String, String> configDescriptor;

    /** Creates the descriptor that identifies the broadcast state. */
    @Override
    public void open(Configuration parameters) throws Exception {
        configDescriptor = new MapStateDescriptor<>(
            "config-broadcast", String.class, String.class
        );
    }

    /**
     * Writes one configuration entry into the broadcast state.
     *
     * @param entry the configuration entry to store
     * @param ctx context used to obtain the broadcast state
     */
    @Override
    public void processElement(ConfigEntry entry, Context ctx) throws Exception {
        // Broadcast state is obtained from the function's Context, not from the runtime context.
        BroadcastState<String, String> broadcastState = ctx.getBroadcastState(configDescriptor);
        broadcastState.put(entry.getKey(), entry.getValue());
        System.out.println("Bootstrapped config: " + entry.getKey() +
            " = " + entry.getValue());
    }
}

/**
 * Keyed bootstrap function that only writes state for records that are valid and have a
 * score above 0.5; records of type "TEMPORARY" additionally get an event-time timer
 * 24 hours after their timestamp.
 */
public class ConditionalBootstrapFunction extends KeyedStateBootstrapFunction<String, DataRecord> {
    private ValueState<String> statusState;
    private ValueState<Double> scoreState;

    /** Obtains the keyed state handles "status" and "score". */
    @Override
    public void open(Configuration parameters) throws Exception {
        statusState = getRuntimeContext().getState(
            new ValueStateDescriptor<>("status", String.class)
        );
        scoreState = getRuntimeContext().getState(
            new ValueStateDescriptor<>("score", Double.class)
        );
    }

    /**
     * Stores status and score for qualifying records and skips all others.
     *
     * @param record the record to evaluate
     * @param ctx context providing the timer service
     */
    @Override
    public void processElement(DataRecord record, Context ctx) throws Exception {
        // Only bootstrap state for certain conditions
        if (record.isValid() && record.getScore() > 0.5) {
            statusState.update(record.getStatus());
            scoreState.update(record.getScore());
            // Set cleanup timer based on record type.
            // Constant-first equals: a record without a type is treated as non-temporary
            // instead of failing with a NullPointerException.
            if ("TEMPORARY".equals(record.getType())) {
                ctx.timerService().registerEventTimeTimer(
                    record.getTimestamp() + Duration.ofHours(24).toMillis()
                );
            }
        }
    }
}

/**
 * Keyed bootstrap function that accumulates payloads per key in the "batch" list state and
 * tracks their number in "batchSize". When the number reaches the element's batch size the
 * batch is processed and both states are cleared.
 */
public class BatchBootstrapFunction extends KeyedStateBootstrapFunction<String, BatchData> {
    private ListState<String> batchState;
    private ValueState<Integer> batchSizeState;

    /**
     * Obtains the keyed state handles. Without this step both fields would still be
     * {@code null} when the first element arrives.
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        batchState = getRuntimeContext().getListState(
            new ListStateDescriptor<>("batch", String.class)
        );
        batchSizeState = getRuntimeContext().getState(
            new ValueStateDescriptor<>("batchSize", Integer.class)
        );
    }

    /**
     * Adds the element's payload to the current batch and flushes the batch once full.
     *
     * @param data the element whose payload is accumulated
     * @param ctx context of the current element (unused)
     */
    @Override
    public void processElement(BatchData data, Context ctx) throws Exception {
        // Accumulate batch data
        batchState.add(data.getPayload());
        Integer currentSize = batchSizeState.value();
        int newSize = (currentSize != null ? currentSize : 0) + 1;
        batchSizeState.update(newSize);
        // Process batch when it reaches target size
        if (newSize >= data.getBatchSize()) {
            processBatch();
            batchState.clear();
            batchSizeState.clear();
        }
    }

    /** Copies the accumulated payloads out of state and reports how many there are. */
    private void processBatch() throws Exception {
        List<String> items = new ArrayList<>();
        batchState.get().forEach(items::add);
        // Process accumulated batch
        System.out.println("Processing batch of " + items.size() + " items");
    }
}
// Create multiple bootstrap transformations
// Keyed transformation for the user data
DataSet<UserData> userData = env.fromCollection(users);
BootstrapTransformation<UserData> userTransform =
    OperatorTransformation.bootstrapWith(userData)
        .keyBy(UserData::getId)
        .transform(new UserBootstrapFunction());

// Non-keyed transformation for the config data
DataSet<ConfigData> configData = env.fromCollection(configs);
BootstrapTransformation<ConfigData> configTransform =
    OperatorTransformation.bootstrapWith(configData)
        .transform(new ConfigBootstrapFunction());

// Register both operators, then write the savepoint
savepoint
    .withOperator("user-state", userTransform)
    .withOperator("config-state", configTransform)
    .write("/path/to/savepoint");
// Custom key selector with type information
// Key type is supplied explicitly alongside the selector
TypeInformation<String> keyType = Types.STRING;
KeySelector<MyData, String> keySelector = MyData::getPartitionKey;

BootstrapTransformation<MyData> transformation =
    OperatorTransformation.bootstrapWith(dataSet)
        .keyBy(keySelector, keyType)
        .transform(new MyBootstrapFunction());
// Set operator-specific max parallelism
// setMaxParallelism is declared on the transformation returned by bootstrapWith, so it is
// called before keyBy()/transform() rather than on the finished BootstrapTransformation.
BootstrapTransformation<MyData> transformation = OperatorTransformation
    .bootstrapWith(dataSet)
    .setMaxParallelism(256) // Override global max parallelism
    .keyBy(MyData::getKey)
    .transform(new MyBootstrapFunction());
// Configure memory for bootstrap operations
Configuration config = new Configuration();
config.setString("taskmanager.memory.process.size", "2g");
// NOTE(review): confirm this option key exists in the Flink version you target.
config.setDouble("taskmanager.memory.flink.fraction", 0.8);
savepoint.withConfiguration(config)
    .withOperator("data-processor", transformation)
    .write("/path/to/savepoint");

/**
 * Keyed bootstrap function that validates each element before storing it in the "data"
 * value state. Invalid elements are skipped; failures while storing are reported on
 * standard error and processing continues with the next element.
 */
public class RobustBootstrapFunction extends KeyedStateBootstrapFunction<String, MyData> {
    private ValueState<MyData> dataState;

    /**
     * Obtains the keyed state handle. Without this step {@code dataState} would be
     * {@code null}, every update would fail, and the catch block in
     * {@code processElement} would hide the failure, so no state would be written.
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        dataState = getRuntimeContext().getState(
            new ValueStateDescriptor<>("data", MyData.class)
        );
    }

    /**
     * Stores a valid element in state for the current key.
     *
     * @param data the element to store; {@code null} elements or elements without an ID are skipped
     * @param ctx context providing the current key for error reporting
     */
    @Override
    public void processElement(MyData data, Context ctx) throws Exception {
        try {
            // Validate input data
            if (data == null || data.getId() == null) {
                System.err.println("Invalid data received, skipping");
                return;
            }
            // Process data
            dataState.update(data);
        } catch (Exception e) {
            // Deliberately best-effort: report the failure and keep going.
            System.err.println("Failed to process data for key " +
                ctx.getCurrentKey() + ": " + e.getMessage());
            // Could choose to re-throw or continue processing
            // throw new RuntimeException("Bootstrap failed", e);
        }
    }
}

try {
    BootstrapTransformation<MyData> transformation = OperatorTransformation
        .bootstrapWith(dataSet)
        .keyBy(MyData::getKey)
        .transform(new MyBootstrapFunction());
    savepoint.withOperator("my-operator", transformation);
} catch (InvalidProgramException e) {
    // The more specific exception is handled first
    System.err.println("Type inference failed: " + e.getMessage());
} catch (RuntimeException e) {
    System.err.println("Transformation validation failed: " + e.getMessage());
}
For windowed operators that process data within time or count-based windows.
/**
 * Bootstrap transformation for windowed operators that process data within time or
 * count-based windows.
 *
 * @param <T> the type of the elements being bootstrapped
 * @param <K> the key type
 * @param <W> the window type
 */
public class WindowedOperatorTransformation<T, K, W extends Window> {
    /** Sets the {@code Trigger}; returns the same transformation type so calls can be chained. */
    public WindowedOperatorTransformation<T, K, W> trigger(
        Trigger<? super T, ? super W> trigger
    );

    /** Sets the {@code Evictor}; returns the same transformation type so calls can be chained. */
    public WindowedOperatorTransformation<T, K, W> evictor(
        Evictor<? super T, ? super W> evictor
    );

    /** Applies the given {@code ReduceFunction} to the window contents. */
    public BootstrapTransformation<T> reduce(ReduceFunction<T> function);

    /**
     * Applies a {@code ReduceFunction} together with a {@code WindowFunction}.
     *
     * @param <R> the window function's output type
     */
    public <R> BootstrapTransformation<T> reduce(
        ReduceFunction<T> reduceFunction,
        WindowFunction<T, R, K, W> windowFunction
    );

    /**
     * Applies the given {@code AggregateFunction} to the window contents.
     *
     * @param <ACC> the accumulator type
     * @param <R> the aggregate result type
     */
    public <ACC, R> BootstrapTransformation<T> aggregate(
        AggregateFunction<T, ACC, R> function
    );

    /**
     * Applies an {@code AggregateFunction} together with a {@code WindowFunction} that
     * takes the aggregate's result type {@code V} as its input.
     *
     * @param <ACC> the accumulator type
     * @param <V> the aggregate result type
     * @param <R> the window function's output type
     */
    public <ACC, V, R> BootstrapTransformation<T> aggregate(
        AggregateFunction<T, ACC, V> aggregateFunction,
        WindowFunction<V, R, K, W> windowFunction
    );

    /**
     * Applies the given {@code WindowFunction} to the window contents.
     *
     * @param <R> the window function's output type
     */
    public <R> BootstrapTransformation<T> apply(
        WindowFunction<T, R, K, W> function
    );

    /**
     * Applies the given {@code ProcessWindowFunction} to the window contents.
     *
     * @param <R> the window function's output type
     */
    public <R> BootstrapTransformation<T> process(
        ProcessWindowFunction<T, R, K, W> function
    );
}
Usage Example:
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
// Input data set of sensor readings
DataSet<SensorReading> sensorData = env.fromCollection(getSensorReadings());

// Keyed by sensor ID, assigned to 5-minute tumbling event-time windows, then aggregated
BootstrapTransformation<SensorReading> windowTransformation =
    OperatorTransformation.bootstrapWith(sensorData)
        .keyBy(SensorReading::getSensorId)
        .window(TumblingEventTimeWindows.of(Time.minutes(5)))
        .aggregate(new AverageAggregateFunction());

// Register the transformation with the savepoint as "sensor-window-operator"
savepoint.withOperator("sensor-window-operator", windowTransformation);
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-state-processor-api-2-12