RocksDB state backend for Apache Flink streaming applications providing persistent, scalable state storage with fault tolerance, comprehensive configuration options, and native metrics monitoring.
—
Pre-configured RocksDB options optimized for different hardware profiles and use cases, providing easy setup for common deployment scenarios.
Enumeration of pre-configured RocksDB option sets optimized for different hardware and workload characteristics.
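Both factory methods in the declaration below take a handlesToClose collection: native objects created while building the options (such as a Bloom filter or a cache) are registered there so that whoever owns the options can release them later. The following is a minimal plain-Java sketch of that ownership pattern. NativeHandle, buildOptions and closeAll are hypothetical stand-ins for RocksDB's native objects, for createColumnOptions, and for the caller's cleanup step; they are not Flink APIs. Closing in reverse creation order is our own choice and is not a claim about Flink's internals.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

public class HandleLifecycleSketch {

    /** Hypothetical stand-in for a native RocksDB object that must be closed explicitly. */
    static final class NativeHandle implements AutoCloseable {
        private final String name;
        private final List<String> closeLog;

        NativeHandle(String name, List<String> closeLog) {
            this.name = name;
            this.closeLog = closeLog;
        }

        @Override
        public void close() {
            closeLog.add(name); // record that this handle was released
        }
    }

    /** Hypothetical options builder: allocates helper objects and registers each one for cleanup. */
    public static String buildOptions(Collection<AutoCloseable> handlesToClose, List<String> closeLog) {
        NativeHandle cache = new NativeHandle("lru-cache", closeLog);
        NativeHandle filter = new NativeHandle("bloom-filter", closeLog);
        handlesToClose.add(cache);
        handlesToClose.add(filter);
        return "options(cache=" + cache.name + ", filter=" + filter.name + ")";
    }

    /** Closes every registered handle, most recently created first, and keeps going on failure. */
    public static void closeAll(List<AutoCloseable> handles) {
        RuntimeException failure = null;
        for (int i = handles.size() - 1; i >= 0; i--) {
            try {
                handles.get(i).close();
            } catch (Exception e) {
                if (failure == null) {
                    failure = new RuntimeException("failed to close native handles");
                }
                failure.addSuppressed(e);
            }
        }
        handles.clear();
        if (failure != null) {
            throw failure;
        }
    }

    public static void main(String[] args) {
        List<AutoCloseable> handlesToClose = new ArrayList<>();
        List<String> closeLog = new ArrayList<>();
        try {
            System.out.println(buildOptions(handlesToClose, closeLog));
        } finally {
            closeAll(handlesToClose); // release native resources even if building failed
        }
        System.out.println("closed: " + closeLog);
    }
}
```

The point of the collection is that the options objects themselves do not own their helpers; the caller keeps one list and releases everything in one place once RocksDB is shut down.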
```java
package org.apache.flink.contrib.streaming.state;

import java.util.Collection;

import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;

/**
 * Predefined RocksDB options optimized for different hardware profiles.
 * Each option set provides tuned database and column family configurations.
 */
public enum PredefinedOptions {
    /** Default configuration with basic optimizations */
    DEFAULT,
    /** Optimized for spinning disk storage (HDDs) */
    SPINNING_DISK_OPTIMIZED,
    /** Optimized for spinning disks with higher memory usage */
    SPINNING_DISK_OPTIMIZED_HIGH_MEM,
    /** Optimized for flash SSD storage */
    FLASH_SSD_OPTIMIZED;
    // Signature listing: the per-constant bodies that implement both methods are omitted.

    /**
     * Creates database options for this predefined configuration.
     * @param handlesToClose collection to register objects that need cleanup
     * @return configured DBOptions instance
     */
    public abstract DBOptions createDBOptions(Collection<AutoCloseable> handlesToClose);

    /**
     * Creates column family options for this predefined configuration.
     * @param handlesToClose collection to register objects that need cleanup
     * @return configured ColumnFamilyOptions instance
     */
    public abstract ColumnFamilyOptions createColumnOptions(Collection<AutoCloseable> handlesToClose);
}
```

Basic configuration suitable for general-purpose workloads with minimal tuning.
Characteristics:
Usage:
```java
EmbeddedRocksDBStateBackend stateBackend = new EmbeddedRocksDBStateBackend();
stateBackend.setPredefinedOptions(PredefinedOptions.DEFAULT);
```

Configuration Details:
- setUseFsync(false): syncs writes with fdatasync instead of fsync, for lower sync overhead
- setInfoLogLevel(InfoLogLevel.HEADER_LEVEL): minimal logging
- setStatsDumpPeriodSec(0): disables periodic statistics dumps

Optimized for traditional hard disk drives (HDDs), where random I/O is much slower than sequential I/O.
Characteristics:
Usage:
```java
EmbeddedRocksDBStateBackend stateBackend = new EmbeddedRocksDBStateBackend();
stateBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED);
```

Configuration Details:

Database Options:
- setIncreaseParallelism(4): uses a total of 4 background threads for flushes and compactions
- setUseFsync(false): syncs writes with fdatasync instead of fsync, for lower sync overhead
- setMaxOpenFiles(-1): keeps all opened files open (no open-file limit)
- setInfoLogLevel(InfoLogLevel.HEADER_LEVEL): minimal logging
- setStatsDumpPeriodSec(0): disables periodic statistics dumps

Column Family Options:
- setCompactionStyle(CompactionStyle.LEVEL): uses level-based compaction
- setLevelCompactionDynamicLevelBytes(true): sizes each level's target relative to the actual size of the last level

Optimized for spinning disks, using more memory to reduce I/O operations.
Characteristics:
Usage:
```java
EmbeddedRocksDBStateBackend stateBackend = new EmbeddedRocksDBStateBackend();
stateBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM);
```

Configuration Details:
Includes all SPINNING_DISK_OPTIMIZED settings plus:
Enhanced Memory Usage:
Optimized for flash-based SSD storage with fast random I/O characteristics.
Characteristics:
Usage:
```java
EmbeddedRocksDBStateBackend stateBackend = new EmbeddedRocksDBStateBackend();
stateBackend.setPredefinedOptions(PredefinedOptions.FLASH_SSD_OPTIMIZED);
```

Configuration Details:

Database Options:
- setIncreaseParallelism(4): uses a total of 4 background threads for flushes and compactions
- setUseFsync(false): syncs writes with fdatasync instead of fsync, for lower sync overhead
- setMaxOpenFiles(-1): keeps all opened files open (no open-file limit)
- setInfoLogLevel(InfoLogLevel.HEADER_LEVEL): minimal logging
- setStatsDumpPeriodSec(0): disables periodic statistics dumps

Column Family Options:
| Configuration | Use Case | Memory Usage | I/O Pattern | Background Threads |
|---|---|---|---|---|
| DEFAULT | General purpose | Low | Balanced | RocksDB default |
| SPINNING_DISK_OPTIMIZED | HDD storage | Moderate | Sequential-optimized | 4 |
| SPINNING_DISK_OPTIMIZED_HIGH_MEM | HDD with more RAM | High | Sequential-optimized | 4 |
| FLASH_SSD_OPTIMIZED | SSD storage | Moderate | Random-optimized | 4 |
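The table and the recommendations that follow it amount to a small decision rule. The sketch below encodes that rule under stated assumptions: the Storage enum and the productionJob flag are hypothetical inputs of our own, and the method returns the constant's name as a String so the example compiles without Flink on the classpath. In a job, PredefinedOptions.valueOf(name) turns that name into the actual constant for setPredefinedOptions.

```java
public class PredefinedOptionsChooser {

    /** Hypothetical description of the storage backing the RocksDB working directory. */
    public enum Storage { HDD, SSD, CLOUD_VOLUME }

    /**
     * Returns the name of the PredefinedOptions constant suggested in this section.
     * abundantMemory stands for the "HDD with more RAM" row (the section suggests more than 8GB for Flink).
     */
    public static String recommend(Storage storage, boolean abundantMemory, boolean productionJob) {
        if (!productionJob) {
            return "DEFAULT"; // development/testing setup
        }
        switch (storage) {
            case HDD:
                return abundantMemory ? "SPINNING_DISK_OPTIMIZED_HIGH_MEM" : "SPINNING_DISK_OPTIMIZED";
            case SSD:
            case CLOUD_VOLUME: // only a starting point for cloud volumes; tune from observed performance
                return "FLASH_SSD_OPTIMIZED";
            default:
                throw new IllegalArgumentException("unknown storage: " + storage);
        }
    }

    public static void main(String[] args) {
        System.out.println(recommend(Storage.HDD, true, true));
        System.out.println(recommend(Storage.CLOUD_VOLUME, false, true));
    }
}
```

The rule deliberately stays coarse: for cloud volumes it only picks the starting point, since the recommendations below leave the final choice to measurement.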
For HDD storage, recommended: SPINNING_DISK_OPTIMIZED or SPINNING_DISK_OPTIMIZED_HIGH_MEM

```java
// Choose one of the following:

// For HDDs with limited memory
stateBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED);

// For HDDs with abundant memory (>8GB available for Flink)
stateBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM);
```

Benefits:
For SSD storage, recommended: FLASH_SSD_OPTIMIZED

```java
stateBackend.setPredefinedOptions(PredefinedOptions.FLASH_SSD_OPTIMIZED);
```

Benefits:
For cloud storage, recommended: start with FLASH_SSD_OPTIMIZED and tune based on observed performance.

```java
// Most cloud storage volumes behave like SSDs
stateBackend.setPredefinedOptions(PredefinedOptions.FLASH_SSD_OPTIMIZED);

// Alternatively, for high-IOPS volumes with abundant memory
stateBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM);
```

```java
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.contrib.streaming.state.PredefinedOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Create the state backend with incremental checkpointing enabled
EmbeddedRocksDBStateBackend stateBackend = new EmbeddedRocksDBStateBackend(true);

// Configure for SSD storage
stateBackend.setPredefinedOptions(PredefinedOptions.FLASH_SSD_OPTIMIZED);
stateBackend.setDbStoragePath("/ssd/flink/rocksdb");

env.setStateBackend(stateBackend);
```

```java
import org.apache.flink.contrib.streaming.state.DefaultConfigurableOptionsFactory;

// Start with predefined options
EmbeddedRocksDBStateBackend stateBackend = new EmbeddedRocksDBStateBackend(true);
stateBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED);

// Layer custom settings on top: the options factory is applied after the predefined options
DefaultConfigurableOptionsFactory customFactory = new DefaultConfigurableOptionsFactory()
        .setWriteBufferSize("128mb")       // custom write buffer (memtable) size
        .setBlockCacheSize("512mb")        // custom block cache size
        .setUseBloomFilter(true)           // enable Bloom filters
        .setBloomFilterBitsPerKey(10.0);   // Bloom filter bits per key
stateBackend.setRocksDBOptions(customFactory);
```

```java
// Development/testing environment: full (non-incremental) checkpoints
EmbeddedRocksDBStateBackend devBackend = new EmbeddedRocksDBStateBackend(false);
devBackend.setPredefinedOptions(PredefinedOptions.DEFAULT);

// Production environment with HDDs: spread RocksDB working directories across disks
EmbeddedRocksDBStateBackend prodBackend = new EmbeddedRocksDBStateBackend(true);
prodBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM);
prodBackend.setDbStoragePaths("/data1/rocksdb", "/data2/rocksdb", "/data3/rocksdb");

// Production environment with SSDs
EmbeddedRocksDBStateBackend ssdBackend = new EmbeddedRocksDBStateBackend(true);
ssdBackend.setPredefinedOptions(PredefinedOptions.FLASH_SSD_OPTIMIZED);
ssdBackend.setDbStoragePath("/nvme/flink/rocksdb");
```

```java
import org.apache.flink.contrib.streaming.state.RocksDBMemoryConfiguration;

// Optimized setup for a high-memory SSD environment
EmbeddedRocksDBStateBackend stateBackend = new EmbeddedRocksDBStateBackend(true);
stateBackend.setPredefinedOptions(PredefinedOptions.FLASH_SSD_OPTIMIZED);

// Configure memory allocation
RocksDBMemoryConfiguration memConfig = stateBackend.getMemoryConfiguration();
memConfig.setUseManagedMemory(true);       // size RocksDB from Flink managed memory (the default)
memConfig.setWriteBufferRatio(0.3);        // 30% for write buffers (default 0.5), leaving more for the read cache
memConfig.setHighPriorityPoolRatio(0.1);   // share of the cache reserved for index and filter blocks

// More threads for uploading/downloading state files during checkpoints and restores
stateBackend.setNumberOfTransferThreads(8);
```
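To see what the two memory ratios above mean in bytes, the sketch below splits a per-slot managed-memory budget T. It assumes the sizing rule we understand Flink's RocksDB memory controller to use, with r as the write buffer ratio: block cache capacity = (3 - r) * T / 3 and write-buffer-manager capacity = 2 * r * T / 3, with the write buffers charged against the cache. Under that rule, the cache plus half of the write-buffer budget adds up to T, which leaves headroom for memtables that temporarily overshoot. Check this against the Flink version you run before relying on exact numbers. The 300 MiB budget is an arbitrary example value.

```java
public class RocksDBMemorySplitSketch {

    /** Block cache capacity for a total budget, under the assumed (3 - r) / 3 rule. */
    public static long cacheCapacity(long totalBytes, double writeBufferRatio) {
        return (long) ((3 - writeBufferRatio) * totalBytes / 3);
    }

    /** Write buffer manager capacity (charged against the cache), under the assumed 2r / 3 rule. */
    public static long writeBufferCapacity(long totalBytes, double writeBufferRatio) {
        return (long) (2 * totalBytes * writeBufferRatio / 3);
    }

    /** Part of the cache reserved for high-priority blocks such as index and filter blocks. */
    public static long highPriorityPool(long cacheBytes, double highPriorityPoolRatio) {
        return (long) (cacheBytes * highPriorityPoolRatio);
    }

    public static void main(String[] args) {
        long mib = 1024L * 1024L;
        long total = 300 * mib;          // hypothetical managed memory available to one slot
        double writeBufferRatio = 0.3;   // as in setWriteBufferRatio(0.3)
        double highPriorityRatio = 0.1;  // as in setHighPriorityPoolRatio(0.1)

        long cache = cacheCapacity(total, writeBufferRatio);
        long writeBuffers = writeBufferCapacity(total, writeBufferRatio);
        long highPriority = highPriorityPool(cache, highPriorityRatio);

        System.out.printf("cache=%d MiB, write buffers=%d MiB (inside cache), high-priority=%d MiB%n",
                cache / mib, writeBuffers / mib, highPriority / mib);
    }
}
```

With these inputs the sketch reports a 270 MiB cache, 60 MiB of write buffers charged against it, and a 27 MiB high-priority pool; 270 + 60 / 2 = 300 MiB, the full budget. Lowering the write buffer ratio from its 0.5 default shifts memory from memtables toward cached reads.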