RocksDB state backend for Apache Flink - provides persistent state storage using RocksDB as the underlying storage engine for stateful stream processing applications
The EmbeddedRocksDBStateBackend is the main entry point for using RocksDB as a state backend in Apache Flink. It provides persistent state storage with support for very large state sizes and efficient checkpointing.
import org.apache.flink.state.rocksdb.EmbeddedRocksDBStateBackend;
import org.apache.flink.state.rocksdb.EmbeddedRocksDBStateBackendFactory;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.util.TernaryBoolean;

@PublicEvolving
public class EmbeddedRocksDBStateBackend implements StateBackend, Serializable {
    // Main implementation class for RocksDB state backend
}

public EmbeddedRocksDBStateBackend()
Creates a new EmbeddedRocksDBStateBackend with default settings.

public EmbeddedRocksDBStateBackend(boolean enableIncrementalCheckpointing)
Creates a new EmbeddedRocksDBStateBackend with the specified incremental checkpointing setting.
Parameters:
enableIncrementalCheckpointing - Whether to enable incremental checkpointing

public EmbeddedRocksDBStateBackend(TernaryBoolean enableIncrementalCheckpointing)
Creates a new EmbeddedRocksDBStateBackend with a TernaryBoolean incremental checkpointing setting.
Parameters:
enableIncrementalCheckpointing - TernaryBoolean for incremental checkpointing (TRUE, FALSE, UNDEFINED)

public void setDbStoragePath(String dbStoragePath)
Sets the path where RocksDB stores its data files locally on the TaskManager.
Parameters:
dbStoragePath - The path to the local RocksDB data directory
Example:
EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend();
backend.setDbStoragePath("/tmp/flink-rocksdb");

public void setDbStoragePaths(String... dbStoragePaths)
Sets multiple paths where RocksDB can store its data files, allowing distribution across multiple devices.
Parameters:
dbStoragePaths - Multiple paths to local RocksDB data directories
Example:
backend.setDbStoragePaths("/ssd1/rocksdb", "/ssd2/rocksdb", "/ssd3/rocksdb");

public String[] getDbStoragePaths()
Gets the configured storage paths for RocksDB data files.
Returns: Array of configured storage paths
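
The reference above does not state how a directory is chosen when several storage paths are configured. The following is a minimal sketch, assuming a simple round-robin assignment of one directory per RocksDB instance. StoragePathRotation and nextPath() are hypothetical names used for illustration only and are not part of the Flink API.

```java
// Hypothetical stand-in, not a Flink class: rotates over the configured directories.
final class StoragePathRotation {
    private final String[] paths;
    private int next;

    StoragePathRotation(String... paths) {
        if (paths == null || paths.length == 0) {
            throw new IllegalArgumentException("at least one storage path is required");
        }
        this.paths = paths.clone();
    }

    /** Returns the directory for the next instance, wrapping around at the end. */
    String nextPath() {
        String chosen = paths[next];
        next = (next + 1) % paths.length;
        return chosen;
    }

    public static void main(String[] args) {
        StoragePathRotation rotation =
                new StoragePathRotation("/ssd1/rocksdb", "/ssd2/rocksdb", "/ssd3/rocksdb");
        // Four instances over three directories: the fourth wraps back to the first.
        for (int i = 0; i < 4; i++) {
            System.out.println("instance " + i + " -> " + rotation.nextPath());
        }
    }
}
```

Spreading instances over separate devices in this way is what lets several disks share the I/O load; the real selection policy may differ.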

public boolean isIncrementalCheckpointsEnabled()
Returns whether incremental checkpointing is enabled for this state backend.
Returns: true if incremental checkpointing is enabled, false otherwise
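
The TernaryBoolean constructor accepts UNDEFINED in addition to TRUE and FALSE. The sketch below shows how such a three-state flag could collapse to a plain boolean, assuming that UNDEFINED defers to a value taken from configuration. The Tri enum and its resolve() method are hypothetical stand-ins for illustration, not Flink's TernaryBoolean.

```java
final class TernaryResolutionDemo {
    // Hypothetical stand-in for a three-state flag; not Flink's TernaryBoolean.
    enum Tri {
        TRUE, FALSE, UNDEFINED;

        /** An explicit TRUE or FALSE wins; UNDEFINED defers to the supplied default. */
        boolean resolve(boolean configuredDefault) {
            return this == UNDEFINED ? configuredDefault : this == TRUE;
        }
    }

    public static void main(String[] args) {
        boolean fromConfig = true; // assumed value read from job or cluster configuration
        for (Tri flag : Tri.values()) {
            System.out.println(flag + " -> " + flag.resolve(fromConfig));
        }
    }
}
```

The third state is what allows a setting made in code to be distinguished from one that was never set, so that configuration only fills in the gaps.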

public boolean supportsNoClaimRestoreMode()
Returns whether this state backend supports the NO_CLAIM restore mode.
Returns: true - RocksDB backend supports all restore modes

public boolean supportsSavepointFormat(SavepointFormatType formatType)
Returns whether this state backend supports the specified savepoint format.
Parameters:
formatType - The savepoint format type to check
Returns: true for all supported savepoint formats

public int getNumberOfTransferThreads()
Gets the number of threads used for transferring files during checkpointing.
Returns: Number of transfer threads

public void setNumberOfTransferThreads(int numberOfTransferThreads)
Sets the number of threads used for transferring files during checkpointing.
Parameters:
numberOfTransferThreads - Number of threads to use for file transfers

public long getWriteBatchSize()
Gets the write batch size for RocksDB operations.
Returns: Write batch size in bytes
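
The write batch size is expressed in bytes. As a rough illustration of what a byte-based batch limit means, the sketch below buffers writes and flushes them once their accumulated size reaches the limit. This is an assumption about the general technique, not a description of the backend's internals; SizeBoundedBatch and all of its members are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of a size-bounded write batch; not a Flink or RocksDB class.
final class SizeBoundedBatch {
    private final long maxBatchBytes;
    private final List<byte[]> pending = new ArrayList<>();
    private long pendingBytes;
    private int flushCount;

    SizeBoundedBatch(long maxBatchBytes) {
        if (maxBatchBytes <= 0) {
            throw new IllegalArgumentException("batch size must be positive");
        }
        this.maxBatchBytes = maxBatchBytes;
    }

    /** Buffers one key/value pair and flushes when the byte limit is reached. */
    void put(byte[] key, byte[] value) {
        pending.add(key);
        pending.add(value);
        pendingBytes += key.length + value.length;
        if (pendingBytes >= maxBatchBytes) {
            flush();
        }
    }

    /** Empties the buffer; a real backend would hand the batch to the storage engine here. */
    void flush() {
        if (pending.isEmpty()) {
            return;
        }
        pending.clear();
        pendingBytes = 0;
        flushCount++;
    }

    int flushCount() {
        return flushCount;
    }

    long pendingBytes() {
        return pendingBytes;
    }

    public static void main(String[] args) {
        SizeBoundedBatch batch = new SizeBoundedBatch(16);
        for (int i = 0; i < 3; i++) {
            batch.put(new byte[4], new byte[4]); // 8 bytes per entry
            System.out.println("flushes=" + batch.flushCount()
                    + " pendingBytes=" + batch.pendingBytes());
        }
    }
}
```

A larger limit means fewer, bigger writes at the cost of more memory held in the buffer.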

public void setWriteBatchSize(long writeBatchSize)
Sets the write batch size for RocksDB operations.
Parameters:
writeBatchSize - Write batch size in bytes

public enum PriorityQueueStateType {
    HEAP,   // Use heap-based priority queue
    ROCKSDB // Use RocksDB-based priority queue
}

public PriorityQueueStateType getPriorityQueueStateType()
Gets the priority queue implementation type.
Returns: Current priority queue state type

public void setPriorityQueueStateType(PriorityQueueStateType priorityQueueStateType)
Sets the priority queue implementation type.
Parameters:
priorityQueueStateType - The priority queue implementation to use
Example:
// Keep priority queue state (timers, windows) in RocksDB; scales beyond available heap memory
backend.setPriorityQueueStateType(PriorityQueueStateType.ROCKSDB);
// Keep priority queue state on the JVM heap; typically faster when the state is small
backend.setPriorityQueueStateType(PriorityQueueStateType.HEAP);

public EmbeddedRocksDBStateBackend configure(ReadableConfig config, ClassLoader classLoader)
Creates a copy of this state backend configured with the provided configuration and class loader.
Parameters:
config - The configuration to apply
classLoader - The class loader to use
Returns: A new configured instance of this state backend
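
Because configure() returns a copy, the instance it is called on stays unchanged. The sketch below illustrates that copy-on-configure pattern under the assumption that values set explicitly in code take precedence and only unset values are read from configuration. ConfigureCopySketch, its UNDEFINED sentinel and the key sketch.transfer.threads are hypothetical and exist only for this example.

```java
import java.util.Map;

// Hypothetical stand-in for a configurable backend; not a Flink class.
final class ConfigureCopySketch {
    static final int UNDEFINED = -1;
    // Hypothetical configuration key used only in this sketch.
    static final String THREADS_KEY = "sketch.transfer.threads";

    private int transferThreads = UNDEFINED;

    void setTransferThreads(int transferThreads) {
        if (transferThreads <= 0) {
            throw new IllegalArgumentException("thread count must be positive");
        }
        this.transferThreads = transferThreads;
    }

    int getTransferThreads() {
        return transferThreads;
    }

    /** Returns a new instance; explicit settings win, unset ones are read from config. */
    ConfigureCopySketch configure(Map<String, String> config) {
        ConfigureCopySketch copy = new ConfigureCopySketch();
        if (transferThreads != UNDEFINED) {
            copy.transferThreads = transferThreads;
        } else if (config.containsKey(THREADS_KEY)) {
            copy.transferThreads = Integer.parseInt(config.get(THREADS_KEY));
        }
        return copy;
    }

    public static void main(String[] args) {
        Map<String, String> config = Map.of(THREADS_KEY, "8");
        ConfigureCopySketch original = new ConfigureCopySketch();
        ConfigureCopySketch configured = original.configure(config);
        System.out.println("original:   " + original.getTransferThreads());
        System.out.println("configured: " + configured.getTransferThreads());
    }
}
```

Returning a fresh object keeps the user-supplied instance reusable across jobs with different configurations.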

public RocksDBMemoryConfiguration getMemoryConfiguration()
Gets the memory configuration for this RocksDB state backend.
Returns: The RocksDB memory configuration object
Example:
EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend();
RocksDBMemoryConfiguration memConfig = backend.getMemoryConfiguration();
memConfig.setUseManagedMemory(true);
memConfig.setWriteBufferRatio(0.4);

public void setRocksDBMemoryFactory(RocksDBMemoryFactory rocksDBMemoryFactory)
Sets the RocksDB memory factory for custom memory management strategies.
Parameters:
rocksDBMemoryFactory - Custom memory factory implementation

import org.apache.flink.state.rocksdb.EmbeddedRocksDBStateBackend;
import org.apache.flink.state.rocksdb.PredefinedOptions;
import org.apache.flink.state.rocksdb.RocksDBMemoryConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDBStateBackendExample {
    public static void main(String[] args) {
        // Create the state backend with incremental checkpointing enabled
        EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend(true);

        // Configure storage
        backend.setDbStoragePath("/tmp/flink-rocksdb");

        // Configure performance settings
        backend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED);
        backend.setNumberOfTransferThreads(4);
        backend.setWriteBatchSize(2 * 1024 * 1024); // 2 MB

        // Configure priority queue type
        backend.setPriorityQueueStateType(
                EmbeddedRocksDBStateBackend.PriorityQueueStateType.ROCKSDB);

        // Configure memory
        RocksDBMemoryConfiguration memConfig = backend.getMemoryConfiguration();
        memConfig.setUseManagedMemory(true);
        memConfig.setWriteBufferRatio(0.4);
        memConfig.setHighPriorityPoolRatio(0.2);

        // Apply to execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStateBackend(backend);

        // Now use the environment for your Flink job
        // env.addSource(...).keyBy(...).process(...);
    }
}

@PublicEvolving
public class EmbeddedRocksDBStateBackendFactory implements StateBackendFactory<EmbeddedRocksDBStateBackend> {
    // Factory for creating state backend from configuration
}

public EmbeddedRocksDBStateBackend createFromConfig(ReadableConfig config, ClassLoader classLoader)
Creates an EmbeddedRocksDBStateBackend from the given configuration.
Parameters:
config - The configuration containing state backend settings
classLoader - The class loader for loading classes
Returns: A configured EmbeddedRocksDBStateBackend instance
Example:
Configuration config = new Configuration();
config.set(RocksDBOptions.LOCAL_DIRECTORIES, "/tmp/rocksdb");
// PREDEFINED_OPTIONS is assumed to be a String-typed option, so the enum constant's name is passed
config.set(RocksDBOptions.PREDEFINED_OPTIONS, PredefinedOptions.FLASH_SSD_OPTIMIZED.name());
EmbeddedRocksDBStateBackendFactory factory = new EmbeddedRocksDBStateBackendFactory();
EmbeddedRocksDBStateBackend backend = factory.createFromConfig(config, getClass().getClassLoader());

You can also configure the RocksDB state backend through Flink's configuration system:
# flink-conf.yaml
state.backend: rocksdb
state.backend.rocksdb.localdir: /tmp/flink-rocksdb
state.backend.rocksdb.predefined-options: SPINNING_DISK_OPTIMIZED
state.backend.rocksdb.checkpoint.transfer.thread.num: 4
state.backend.rocksdb.memory.managed: true
state.backend.rocksdb.memory.write-buffer-ratio: 0.4

EmbeddedRocksDBStateBackend configuration methods are not thread-safe and should be called during setup, before the job starts.
Use the configure() method to create configured copies.