Apache Flink State Processor API for reading and writing savepoint state data offline
—
Savepoint management provides the core functionality for loading existing savepoints and creating new ones. This includes the main entry points and base classes for savepoint operations.
The main entry point for all savepoint operations.
// Static entry point for offline savepoint processing; not meant to be instantiated.
public final class Savepoint {
// Loads an existing savepoint for reading and modification.
// env: batch execution environment that will run the read jobs.
// path: file-system path of the savepoint to load.
// stateBackend: backend matching the one used when the savepoint was taken.
public static ExistingSavepoint load(
ExecutionEnvironment env,
String path,
StateBackend stateBackend
) throws IOException;
// Creates a new, empty savepoint.
// maxParallelism: the max parallelism recorded in the new savepoint (must be
// specified up front — see the note later in this document).
public static NewSavepoint create(
StateBackend stateBackend,
int maxParallelism
);
}

Loading Existing Savepoints
// Acquire an execution environment and a state backend, then load the savepoint.
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
StateBackend stateBackend = new HashMapStateBackend();
// Load savepoint from file system
ExistingSavepoint savepoint = Savepoint.load(env, "/path/to/savepoint", stateBackend);

Creating New Savepoints
// A new savepoint only needs a state backend and a max parallelism.
StateBackend stateBackend = new HashMapStateBackend();
// Create new empty savepoint with max parallelism of 128
NewSavepoint savepoint = Savepoint.create(stateBackend, 128);

Represents a savepoint loaded from disk that can be read from and modified.
// A savepoint loaded from disk. Exposes operator state as DataSources for
// reading, and inherits modification/writing from WritableSavepoint.
// Each reader identifies state by the operator's uid and the state's name.
public class ExistingSavepoint extends WritableSavepoint<ExistingSavepoint> {
// State reading methods
// Reads operator list state registered under (uid, name).
public <T> DataSource<T> readListState(
String uid,
String name,
TypeInformation<T> typeInfo
) throws IOException;
// Overload with an explicit serializer — for state written with a custom
// TypeSerializer rather than the one derived from typeInfo.
public <T> DataSource<T> readListState(
String uid,
String name,
TypeInformation<T> typeInfo,
TypeSerializer<T> serializer
) throws IOException;
// Reads operator union state registered under (uid, name).
public <T> DataSource<T> readUnionState(
String uid,
String name,
TypeInformation<T> typeInfo
) throws IOException;
// Union-state overload with an explicit serializer.
public <T> DataSource<T> readUnionState(
String uid,
String name,
TypeInformation<T> typeInfo,
TypeSerializer<T> serializer
) throws IOException;
// Reads broadcast state as key/value pairs.
public <K, V> DataSource<Tuple2<K, V>> readBroadcastState(
String uid,
String name,
TypeInformation<K> keyTypeInfo,
TypeInformation<V> valueTypeInfo
) throws IOException;
// Broadcast-state overload with explicit key and value serializers.
public <K, V> DataSource<Tuple2<K, V>> readBroadcastState(
String uid,
String name,
TypeInformation<K> keyTypeInfo,
TypeInformation<V> valueTypeInfo,
TypeSerializer<K> keySerializer,
TypeSerializer<V> valueSerializer
) throws IOException;
// Reads keyed state via a user-supplied reader function that is invoked per key.
public <K, OUT> DataSource<OUT> readKeyedState(
String uid,
KeyedStateReaderFunction<K, OUT> function
) throws IOException;
// Keyed-state overload with explicit key and output type information, for when
// the types cannot be extracted from the function automatically.
public <K, OUT> DataSource<OUT> readKeyedState(
String uid,
KeyedStateReaderFunction<K, OUT> function,
TypeInformation<K> keyTypeInfo,
TypeInformation<OUT> outTypeInfo
) throws IOException;
// Obtains a reader for window state, identified by the window assigner...
public <W extends Window> WindowReader<W> window(
WindowAssigner<?, W> assigner
);
// ...or by the window serializer directly.
public <W extends Window> WindowReader<W> window(
TypeSerializer<W> windowSerializer
);
}

Represents a new empty savepoint that can be written to.
// An empty savepoint created via Savepoint.create(...). It adds no methods of
// its own; all modification/writing comes from WritableSavepoint.
public class NewSavepoint extends WritableSavepoint<NewSavepoint> {
// Inherits all methods from WritableSavepoint
}

Base class for savepoints that can be modified and written.
// Base class for savepoints that can be modified and persisted.
// F is the concrete subtype, so mutator methods return F for fluent chaining.
// NOTE(review): the bound uses the raw WritableSavepoint type, mirroring the
// upstream Flink signature.
public abstract class WritableSavepoint<F extends WritableSavepoint> {
// Drops all state registered under the given operator uid.
public F removeOperator(String uid);
// Registers bootstrapped state for an operator uid.
public <T> F withOperator(
String uid,
BootstrapTransformation<T> transformation
);
// Sets a configuration option to be included in the savepoint.
public <T> F withConfiguration(
ConfigOption<T> option,
T value
);
// Persists the (possibly modified) savepoint to the given path.
public void write(String path);
}

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.ExistingSavepoint;
// End-to-end example: load a savepoint, read state from one operator, replace
// that operator's state with bootstrapped data, and write a new savepoint.
public class SavepointExample {
public void processSavepoint() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
StateBackend stateBackend = new HashMapStateBackend();
// Load existing savepoint
ExistingSavepoint existing = Savepoint.load(
env,
"/path/to/input/savepoint",
stateBackend
);
// Read some state
DataSource<MyState> currentState = existing.readListState(
"operator-1",
"list-state",
TypeInformation.of(MyState.class)
);
// Create bootstrap transformation for new state
// (bootstrapWith -> keyBy -> transform builds keyed state from a DataSet)
DataSet<NewState> newData = env.fromCollection(getNewStateData());
BootstrapTransformation<NewState> transformation = OperatorTransformation
.bootstrapWith(newData)
.keyBy(NewState::getKey)
.transform(new MyBootstrapFunction());
// Remove old operator and add new one
// (removeOperator/withOperator return the savepoint, enabling chaining)
existing.removeOperator("operator-1")
.withOperator("operator-2", transformation)
.write("/path/to/output/savepoint");
// write(...) only declares the job; execute() actually runs it.
env.execute("Savepoint Processing");
}
}

public class NewSavepointExample {
// Builds a brand-new savepoint from external data: read input, bootstrap keyed
// state for one operator, and write the result to disk.
public void createSavepoint() throws Exception {
StateBackend stateBackend = new HashMapStateBackend();
// Create new savepoint
// (max parallelism of 256 is fixed at creation time)
NewSavepoint savepoint = Savepoint.create(stateBackend, 256);
// Prepare bootstrap data
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<InitialData> bootstrapData = env.readTextFile("/path/to/data")
.map(line -> parseInitialData(line));
// Create transformation
BootstrapTransformation<InitialData> transformation = OperatorTransformation
.bootstrapWith(bootstrapData)
.keyBy(InitialData::getPartitionKey)
.transform(new InitialStateBootstrapFunction());
// Add operator and write
savepoint.withOperator("main-operator", transformation)
.write("/path/to/new/savepoint");
// execute() triggers the actual job that materializes the savepoint.
env.execute("Create New Savepoint");
}
}

When creating new savepoints, the max parallelism must be specified:
The state backend determines how state is stored and accessed:
// Use HashMapStateBackend for in-memory state
StateBackend hashMapBackend = new HashMapStateBackend();
// Use EmbeddedRocksDBStateBackend for disk-based state
StateBackend rocksDbBackend = new EmbeddedRocksDBStateBackend();
// Use custom configuration
// (the key below sets the local working directory RocksDB uses for its files)
Configuration config = new Configuration();
config.setString("state.backend.rocksdb.localdir", "/tmp/rocksdb");
StateBackend configuredBackend = new EmbeddedRocksDBStateBackend().configure(config);

Common exceptions and their meanings:
try {
    ExistingSavepoint savepoint = Savepoint.load(env, savepointPath, stateBackend);
    // Process savepoint...
} catch (IOException e) {
    // Throwable.getMessage() may return null; guard before substring matching
    // to avoid a NullPointerException inside the error handler itself.
    String message = e.getMessage();
    if (message != null && message.contains("does not exist")) {
        System.err.println("Savepoint path not found: " + savepointPath);
    } else {
        System.err.println("Failed to load savepoint: " + message);
    }
} catch (RuntimeException e) {
    // Thrown for savepoint-format/validation problems rather than I/O failures.
    System.err.println("Savepoint validation failed: " + e.getMessage());
}
}

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-state-processor-api-2-12