Apache Flink State Processor API for reading and writing savepoint state data offline
npx @tessl/cli install tessl/maven-org-apache-flink--flink-state-processor-api-2-12@1.14.0

The Apache Flink State Processor API provides programmatic access to reading and writing Flink savepoint state data outside of a running Flink application. This API enables batch processing of streaming application state, allowing developers to bootstrap new savepoints with initial state data, query and analyze existing state, modify operator state, and perform state transformations using standard Flink DataSet operations.
pom.xml:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-state-processor-api_2.12</artifactId>
<version>1.14.6</version>
</dependency>

import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.NewSavepoint;
import org.apache.flink.state.api.OperatorTransformation;
import org.apache.flink.state.api.BootstrapTransformation;
import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.runtime.state.StateBackend;

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.ExistingSavepoint;
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
StateBackend stateBackend = new HashMapStateBackend();
// Load existing savepoint
ExistingSavepoint savepoint = Savepoint.load(env, "/path/to/savepoint", stateBackend);
// Read operator state
DataSource<MyState> states = savepoint.readListState(
"my-operator-uid",
"my-state-name",
TypeInformation.of(MyState.class)
);
states.print();
env.execute();

import org.apache.flink.state.api.NewSavepoint;
import org.apache.flink.state.api.OperatorTransformation;
import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction;
// Create new savepoint
NewSavepoint savepoint = Savepoint.create(stateBackend, 128);
// Bootstrap data from DataSet
DataSet<MyInput> inputData = env.fromCollection(myDataList);
BootstrapTransformation<MyInput> transformation = OperatorTransformation
.bootstrapWith(inputData)
.keyBy(input -> input.getKey())
.transform(new MyKeyedStateBootstrapFunction());
// Add operator and write savepoint
savepoint.withOperator("my-operator-uid", transformation)
.write("/path/to/new-savepoint");

The State Processor API consists of several key components:
Core functionality for loading, creating, and managing savepoints.
// Entry point class
public final class Savepoint {
public static ExistingSavepoint load(
ExecutionEnvironment env,
String path,
StateBackend stateBackend
) throws IOException;
public static NewSavepoint create(
StateBackend stateBackend,
int maxParallelism
);
}

Read various types of state from existing savepoints including keyed state, operator state, and window state.
// Reading different state types
public <T> DataSource<T> readListState(
String uid,
String name,
TypeInformation<T> typeInfo
) throws IOException;
public <K, OUT> DataSource<OUT> readKeyedState(
String uid,
KeyedStateReaderFunction<K, OUT> function
) throws IOException;
public <W extends Window> WindowReader<W> window(
WindowAssigner<?, W> assigner
);

Bootstrap new state data into savepoints using DataSet transformations.
// Bootstrap transformation creation
public static <T> OneInputOperatorTransformation<T> bootstrapWith(
DataSet<T> dataSet
);
// Keyed state bootstrap
public BootstrapTransformation<T> transform(
KeyedStateBootstrapFunction<K, T> processFunction
);

User-defined functions for reading and writing state data.
// Base bootstrap function
public abstract class KeyedStateBootstrapFunction<K, IN> extends AbstractRichFunction {
public abstract void processElement(IN value, Context ctx) throws Exception;
}
// Base reader function
public abstract class KeyedStateReaderFunction<K, OUT> extends AbstractRichFunction {
public abstract void readKey(K key, Context ctx, Collector<OUT> out) throws Exception;
}

Specialized operations for reading and writing window state data.
// Window reader for different aggregation types
public <T, K> DataSource<T> reduce(
String uid,
ReduceFunction<T> function,
TypeInformation<K> keyType,
TypeInformation<T> reduceType
) throws IOException;
public <K, T, ACC, R> DataSource<R> aggregate(
String uid,
AggregateFunction<T, ACC, R> aggregateFunction,
TypeInformation<K> keyType,
TypeInformation<ACC> accType,
TypeInformation<R> outputType
) throws IOException;

// Core context interfaces
public abstract class Context {
public abstract TimerService timerService();
public abstract K getCurrentKey();
}
// Bootstrap transformation
public class BootstrapTransformation<T> {
public DataSet<OperatorState> writeOperatorState(
OperatorID operatorID,
StateBackend stateBackend,
Configuration config,
int globalMaxParallelism,
Path savepointPath
);
}
// Writable savepoint base
public abstract class WritableSavepoint<F extends WritableSavepoint> {
public F removeOperator(String uid);
public <T> F withOperator(String uid, BootstrapTransformation<T> transformation);
public <T> F withConfiguration(ConfigOption<T> option, T value);
public void write(String path);
}

The API throws standard Java exceptions:
- IOException - For savepoint path and file system operations
- InvalidProgramException - For type inference failures
- RuntimeException - For validation and state access errors

Common error scenarios: