RocksDB state backend for Apache Flink streaming applications providing persistent, scalable state storage with fault tolerance, comprehensive configuration options, and native metrics monitoring.
Install: npx @tessl/cli install tessl/maven-org-apache-flink--flink-statebackend-rocksdb-2-12@1.14.0

Apache Flink RocksDB State Backend provides a persistent state backend implementation that uses RocksDB as the underlying storage engine. This enables streaming applications to maintain large amounts of keyed state that can survive job failures and restarts through checkpoints, with support for incremental checkpointing and customizable performance tuning.
Maven dependency (pom.xml):

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_2.12</artifactId>
<version>1.14.6</version>
</dependency>

Imports:

import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.contrib.streaming.state.RocksDBOptions;
import org.apache.flink.contrib.streaming.state.PredefinedOptions;
import org.apache.flink.contrib.streaming.state.RocksDBOptionsFactory;
import org.apache.flink.contrib.streaming.state.DefaultConfigurableOptionsFactory;
import org.apache.flink.contrib.streaming.state.RocksDBNativeMetricOptions;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackendFactory;

Note: The legacy RocksDBStateBackend class is deprecated as of Flink 1.13. For new applications, use EmbeddedRocksDBStateBackend and configure checkpoint storage separately.

Basic usage:
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.contrib.streaming.state.PredefinedOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
// Create the Flink execution environment.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Configure the RocksDB state backend. The constructor flag `true` is the
// `enableIncrementalCheckpointing` argument, so checkpoints are incremental.
EmbeddedRocksDBStateBackend stateBackend = new EmbeddedRocksDBStateBackend(true);
stateBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED);
// Local directory for RocksDB's working files. This is not the durable checkpoint location.
stateBackend.setDbStoragePath("/path/to/rocksdb/storage");

// Register the state backend with the environment.
env.setStateBackend(stateBackend);

// The state backend only defines how state is kept at runtime. State survives job
// failures and restarts only if checkpointing is enabled and checkpoints are written
// to durable storage.
env.enableCheckpointing(60_000); // checkpoint interval in milliseconds
env.getCheckpointConfig().setCheckpointStorage("file:///path/to/checkpoints");
// Your Flink application logic...

The RocksDB State Backend is built around several key components:
Core functionality for creating and configuring the RocksDB state backend, including storage paths, incremental checkpointing, and basic setup options.
/** State backend that keeps keyed state in an embedded RocksDB instance (signature listing). */
class EmbeddedRocksDBStateBackend {
// No-argument constructor; NOTE(review): presumably uses default settings — confirm.
EmbeddedRocksDBStateBackend();
// Constructor whose flag controls whether incremental checkpointing is enabled.
EmbeddedRocksDBStateBackend(boolean enableIncrementalCheckpointing);
// Sets a single storage directory for RocksDB files.
void setDbStoragePath(String path);
// Sets one or more storage directories for RocksDB files (varargs).
void setDbStoragePaths(String... paths);
// Applies one of the PredefinedOptions tuning profiles.
void setPredefinedOptions(PredefinedOptions options);
// Installs a custom RocksDBOptionsFactory for DB and column-family options.
void setRocksDBOptions(RocksDBOptionsFactory optionsFactory);
}

Factory pattern for customizing RocksDB database and column family options, enabling fine-grained performance tuning for specific use cases and hardware configurations.
/** Factory for customizing RocksDB database options and column family options (signature listing). */
interface RocksDBOptionsFactory {
// Receives the current DBOptions and returns the DBOptions to use.
// NOTE(review): handlesToClose presumably collects native objects created here so the caller can close them — confirm.
DBOptions createDBOptions(DBOptions currentOptions, Collection<AutoCloseable> handlesToClose);
// Receives the current ColumnFamilyOptions and returns the ColumnFamilyOptions to use (handlesToClose as for createDBOptions).
ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions, Collection<AutoCloseable> handlesToClose);
// Optional hook (declared `default`, so implementations need not override it) for adjusting the native metric options.
default RocksDBNativeMetricOptions createNativeMetricsOptions(RocksDBNativeMetricOptions nativeMetricOptions);
}
/**
 * Ready-made options factory configured through setters (signature listing).
 * Each setter returns a DefaultConfigurableOptionsFactory, presumably the same instance to allow chaining — confirm.
 */
class DefaultConfigurableOptionsFactory implements RocksDBOptionsFactory, ConfigurableRocksDBOptionsFactory {
// Sets the total number of background threads.
DefaultConfigurableOptionsFactory setMaxBackgroundThreads(int totalThreadCount);
// Sets the write buffer size; NOTE(review): presumably a memory-size string such as "64mb" — confirm.
DefaultConfigurableOptionsFactory setWriteBufferSize(String writeBufferSize);
// Sets the block cache size; same string format as setWriteBufferSize (assumed).
DefaultConfigurableOptionsFactory setBlockCacheSize(String blockCacheSize);
}

Memory management configuration for RocksDB integration with Flink's managed memory system, including write buffer ratios and memory allocation strategies.
/** Memory settings that tie RocksDB to Flink's managed memory (signature listing). */
class RocksDBMemoryConfiguration {
// Chooses whether RocksDB uses Flink's managed memory.
void setUseManagedMemory(boolean useManagedMemory);
// Sets a fixed memory amount per slot; NOTE(review): how this interacts with the managed-memory flag is not shown here — confirm.
void setFixedMemoryPerSlot(MemorySize fixedMemoryPerSlot);
// Sets the ratio of memory given to write buffers; NOTE(review): presumably a fraction in (0, 1) — confirm.
void setWriteBufferRatio(double writeBufferRatio);
// Sets the ratio of memory given to the high-priority pool; same range assumption as above.
void setHighPriorityPoolRatio(double highPriorityPoolRatio);
// Reports whether managed memory is in use.
boolean isUsingManagedMemory();
}

Pre-configured RocksDB options optimized for different hardware profiles and use cases, providing easy setup for common deployment scenarios.
/** Predefined RocksDB option profiles for common hardware setups (signature listing). */
enum PredefinedOptions {
// Baseline profile.
DEFAULT,
// Profile tuned for spinning disks.
SPINNING_DISK_OPTIMIZED,
// Spinning-disk profile variant for high-memory setups.
SPINNING_DISK_OPTIMIZED_HIGH_MEM,
// Profile tuned for flash / SSD storage.
FLASH_SSD_OPTIMIZED;
// Each constant supplies its own DBOptions; NOTE(review): handlesToClose presumably receives objects the caller must close — confirm.
abstract DBOptions createDBOptions(Collection<AutoCloseable> handlesToClose);
// Each constant supplies its own ColumnFamilyOptions (handlesToClose as above).
abstract ColumnFamilyOptions createColumnOptions(Collection<AutoCloseable> handlesToClose);
}

Factory class for creating state backend instances from configuration, enabling integration with Flink's configuration system and deployment frameworks.
/**
 * Creates an EmbeddedRocksDBStateBackend from a Flink configuration (signature listing).
 *
 * <p>createFromConfig is an instance method, not a static one. Instantiate the factory first,
 * e.g. {@code new EmbeddedRocksDBStateBackendFactory().createFromConfig(config, classLoader)}.
 */
class EmbeddedRocksDBStateBackendFactory {
// config: configuration the backend settings are read from.
// classLoader: NOTE(review): presumably used to load user classes such as a custom options factory — confirm.
EmbeddedRocksDBStateBackend createFromConfig(ReadableConfig config, ClassLoader classLoader);
}

Configuration for RocksDB native metrics monitoring, enabling detailed performance monitoring and observability of RocksDB internal operations.
/** Selects which RocksDB native metrics are reported (signature listing). */
class RocksDBNativeMetricOptions {
// Builds the metric options from a Flink configuration.
static RocksDBNativeMetricOptions fromConfig(ReadableConfig config);
// Enables the "number of immutable memtables" metric.
void enableNumImmutableMemTable();
// Enables the "compaction pending" metric.
void enableCompactionPending();
// Enables the "block cache usage" metric.
void enableBlockCacheUsage();
// NOTE(review): presumably exposes the column family as a metric variable instead of part of the metric name — confirm.
void setColumnFamilyAsVariable(boolean columnFamilyAsVariable);
}
/** Where priority-queue state is stored: on the heap (HEAP) or in RocksDB (ROCKSDB). */
enum EmbeddedRocksDBStateBackend.PriorityQueueStateType {
HEAP,
ROCKSDB
}
// RocksDB compaction styles, from the RocksDB Java API (package org.rocksdb).
enum org.rocksdb.CompactionStyle {
LEVEL,
UNIVERSAL,
FIFO,
NONE
}
// Log levels for RocksDB's native info log, from the RocksDB Java API (package org.rocksdb).
enum org.rocksdb.InfoLogLevel {
DEBUG_LEVEL,
INFO_LEVEL,
WARN_LEVEL,
ERROR_LEVEL,
FATAL_LEVEL,
HEADER_LEVEL
}