RocksDB state backend for Apache Flink - provides persistent state storage using RocksDB as the underlying storage engine for stateful stream processing applications
—
The RocksDB State Backend provides flexible configuration through options factories and predefined option sets. This allows fine-tuning of RocksDB behavior for different hardware configurations and performance requirements.
import org.apache.flink.state.rocksdb.RocksDBOptionsFactory;
import org.apache.flink.state.rocksdb.ConfigurableRocksDBOptionsFactory;
import org.apache.flink.state.rocksdb.PredefinedOptions;
import org.apache.flink.state.rocksdb.RocksDBOptions;
import org.apache.flink.state.rocksdb.RocksDBConfigurableOptions;
import org.apache.flink.configuration.ConfigOption;
import org.rocksdb.DBOptions;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.WriteOptions;
import org.rocksdb.ReadOptions;
import java.util.Collection;
import java.io.Serializable;
@PublicEvolving
public interface RocksDBOptionsFactory extends Serializable {
// Factory interface for creating RocksDB options
}
DBOptions createDBOptions(DBOptions currentOptions, Collection<AutoCloseable> handlesToClose)
Creates and configures RocksDB database options.
Parameters:
currentOptions - The current DBOptions to modify or replace
handlesToClose - Collection to register resources that need cleanup
Returns: Configured DBOptions instance
ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions, Collection<AutoCloseable> handlesToClose)
Creates and configures RocksDB column family options.
Parameters:
currentOptions - The current ColumnFamilyOptions to modify or replace
handlesToClose - Collection to register resources that need cleanup
Returns: Configured ColumnFamilyOptions instance
default RocksDBNativeMetricOptions createNativeMetricsOptions(RocksDBNativeMetricOptions nativeMetricOptions)
Creates and configures native metrics options.
Parameters:
nativeMetricOptions - The current native metrics options
Returns: Configured RocksDBNativeMetricOptions instance
default WriteOptions createWriteOptions(WriteOptions currentOptions, Collection<AutoCloseable> handlesToClose)
Creates and configures RocksDB write options.
Parameters:
currentOptions - The current WriteOptions to modify or replace
handlesToClose - Collection to register resources that need cleanup
Returns: Configured WriteOptions instance
default ReadOptions createReadOptions(ReadOptions currentOptions, Collection<AutoCloseable> handlesToClose)
Creates and configures RocksDB read options.
Parameters:
currentOptions - The current ReadOptions to modify or replace
handlesToClose - Collection to register resources that need cleanup
Returns: Configured ReadOptions instance
@PublicEvolving
public interface ConfigurableRocksDBOptionsFactory extends RocksDBOptionsFactory {
// Extension for configuration-aware factories
}
ConfigurableRocksDBOptionsFactory configure(ReadableConfig configuration)
Returns a configured instance of this factory based on the provided configuration.
Parameters:
configuration - The configuration containing RocksDB settings
Returns: A new configured factory instance
import org.apache.flink.state.rocksdb.RocksDBOptionsFactory;
import org.rocksdb.*;
import java.util.Collection;
public class CustomRocksDBOptionsFactory implements RocksDBOptionsFactory {
/**
 * Applies fixed database-level settings to the options supplied by the backend.
 *
 * @param currentOptions the DBOptions to adjust
 * @param handlesToClose collection for resources that need cleanup; this method registers nothing
 * @return the DBOptions produced by the final setter call
 */
@Override
public DBOptions createDBOptions(DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
    final int backgroundJobLimit = 4;  // Limit concurrent background jobs
    final int openFileLimit = 1000;    // Limit open file handles

    DBOptions tuned = currentOptions
            .setMaxBackgroundJobs(backgroundJobLimit)
            .setMaxOpenFiles(openFileLimit);

    // Create the database when it is missing, and do not fail when it already exists.
    return tuned
            .setCreateIfMissing(true)
            .setErrorIfExists(false);
}
/**
 * Configures column-family options: a block-based table format backed by an explicit
 * block cache, plus write-buffer, compression and compaction settings.
 *
 * @param currentOptions the ColumnFamilyOptions to adjust
 * @param handlesToClose collection that receives the block cache created here so that it
 *     can be closed during cleanup
 * @return the ColumnFamilyOptions produced by the final setter call
 */
@Override
public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
    // The block cache is the closeable resource, so that is what gets registered for
    // cleanup. BlockBasedTableConfig is not AutoCloseable and cannot be added to
    // handlesToClose.
    LRUCache blockCache = new LRUCache(256L * 1024 * 1024); // 256MB cache
    handlesToClose.add(blockCache);

    // Create block-based table config for better caching
    BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
    tableConfig.setBlockSize(64 * 1024); // 64KB blocks
    tableConfig.setBlockCache(blockCache);
    tableConfig.setCacheIndexAndFilterBlocks(true);

    return currentOptions
            .setTableFormatConfig(tableConfig)
            .setWriteBufferSize(128L * 1024 * 1024) // 128MB write buffer
            .setMaxWriteBufferNumber(3) // Max 3 write buffers
            .setMinWriteBufferNumberToMerge(1) // Merge threshold
            .setCompressionType(CompressionType.LZ4_COMPRESSION)
            .setCompactionStyle(CompactionStyle.LEVEL);
}
}
import org.apache.flink.state.rocksdb.ConfigurableRocksDBOptionsFactory;
import org.apache.flink.configuration.ReadableConfig;
import org.rocksdb.*;
public class ConfigurableCustomOptionsFactory implements ConfigurableRocksDBOptionsFactory {
private int maxBackgroundJobs = 2;
private long writeBufferSize = 64 * 1024 * 1024; // 64MB
private long blockCacheSize = 128 * 1024 * 1024; // 128MB
/**
 * Builds a new factory whose settings come from the given configuration.
 *
 * <p>Each setting is read with {@code getOptional}: a value present in the configuration
 * wins, otherwise the value currently held by this instance is carried over. This keeps
 * the factory's own defaults from being overwritten when an option is absent, and avoids
 * unboxing or dereferencing a missing value.
 *
 * @param configuration the configuration containing RocksDB settings
 * @return a new, independently configured factory instance; this instance is not modified
 */
@Override
public ConfigurableRocksDBOptionsFactory configure(ReadableConfig configuration) {
    ConfigurableCustomOptionsFactory configured = new ConfigurableCustomOptionsFactory();

    configured.maxBackgroundJobs = configuration
            .getOptional(RocksDBConfigurableOptions.MAX_BACKGROUND_THREADS)
            .orElse(maxBackgroundJobs);

    // MemorySize-typed options are converted to a byte count before the fallback is applied.
    configured.writeBufferSize = configuration
            .getOptional(RocksDBConfigurableOptions.WRITE_BUFFER_SIZE)
            .map(size -> size.getBytes())
            .orElse(writeBufferSize);
    configured.blockCacheSize = configuration
            .getOptional(RocksDBConfigurableOptions.BLOCK_CACHE_SIZE)
            .map(size -> size.getBytes())
            .orElse(blockCacheSize);

    return configured;
}
/**
 * Applies this factory's background-job limit to the supplied database options.
 *
 * @param currentOptions the DBOptions to adjust
 * @param handlesToClose collection for resources that need cleanup; this method registers nothing
 * @return the DBOptions returned by {@code setMaxBackgroundJobs}
 */
@Override
public DBOptions createDBOptions(DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
    final DBOptions tuned = currentOptions.setMaxBackgroundJobs(maxBackgroundJobs);
    return tuned;
}
/**
 * Configures column-family options using this factory's block cache size and write
 * buffer size.
 *
 * @param currentOptions the ColumnFamilyOptions to adjust
 * @param handlesToClose collection that receives the block cache created here so that it
 *     can be closed during cleanup
 * @return the ColumnFamilyOptions produced by the final setter call
 */
@Override
public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
    // Register the cache, not the table config: BlockBasedTableConfig is not
    // AutoCloseable, whereas the cache is a closeable resource.
    LRUCache blockCache = new LRUCache(blockCacheSize);
    handlesToClose.add(blockCache);

    BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
    tableConfig.setBlockCache(blockCache);

    return currentOptions
            .setTableFormatConfig(tableConfig)
            .setWriteBufferSize(writeBufferSize);
}
}
public enum PredefinedOptions {
DEFAULT, // Default RocksDB settings
SPINNING_DISK_OPTIMIZED, // Optimized for spinning disk storage
SPINNING_DISK_OPTIMIZED_HIGH_MEM, // Spinning disk with higher memory usage
FLASH_SSD_OPTIMIZED // Optimized for Flash SSD storage
}
DEFAULT
SPINNING_DISK_OPTIMIZED
SPINNING_DISK_OPTIMIZED_HIGH_MEM
FLASH_SSD_OPTIMIZED
public <T> T getValue(ConfigOption<T> option)
Gets the value of a specific configuration option for this predefined option set.
Parameters:
option - The configuration option to retrieve
Returns: The value configured for this option in this predefined set
EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend();
// Set predefined options based on your storage type
backend.setPredefinedOptions(PredefinedOptions.FLASH_SSD_OPTIMIZED);
// Get current predefined options
PredefinedOptions current = backend.getPredefinedOptions();
@PublicEvolving
public class RocksDBOptions {
// Configuration options for RocksDB backend
}
public static final ConfigOption<String> LOCAL_DIRECTORIES
Local directories where RocksDB stores its data files.
public static final ConfigOption<RocksDBOptions.TimerServiceFactory> TIMER_SERVICE_FACTORY
Timer service implementation choice (HEAP or ROCKSDB).
public static final ConfigOption<Integer> ROCKSDB_TIMER_SERVICE_FACTORY_CACHE_SIZE
Cache size per key-group for RocksDB timer service.
public static final ConfigOption<Integer> CHECKPOINT_TRANSFER_THREAD_NUM
Number of threads for transferring files during checkpointing.
public static final ConfigOption<PredefinedOptions> PREDEFINED_OPTIONS
Predefined RocksDB configuration settings.
public static final ConfigOption<String> OPTIONS_FACTORY
Fully qualified class name of custom RocksDBOptionsFactory.
public static final ConfigOption<Boolean> USE_MANAGED_MEMORY
Whether to use Flink's managed memory for RocksDB.
public static final ConfigOption<MemorySize> FIX_PER_SLOT_MEMORY_SIZE
Fixed amount of memory per slot for RocksDB.
public static final ConfigOption<MemorySize> FIX_PER_TM_MEMORY_SIZE
Fixed amount of memory per TaskManager for RocksDB.
public static final ConfigOption<Double> WRITE_BUFFER_RATIO
Fraction of memory used for RocksDB write buffers.
public static final ConfigOption<Double> HIGH_PRIORITY_POOL_RATIO
Fraction of block cache reserved for high priority blocks.
public static final ConfigOption<Boolean> USE_PARTITIONED_INDEX_FILTERS
Whether to use partitioned index/filters in RocksDB.
@PublicEvolving
public class RocksDBConfigurableOptions {
// Detailed configurable options for RocksDB behavior
}
public static final ConfigOption<Integer> MAX_BACKGROUND_THREADS
Maximum number of concurrent background jobs (compactions + flushes).
public static final ConfigOption<Integer> MAX_OPEN_FILES
Maximum number of open files that can be cached by RocksDB.
public static final ConfigOption<MemorySize> LOG_MAX_FILE_SIZE
Maximum size of RocksDB log files.
public static final ConfigOption<Integer> LOG_FILE_NUM
Maximum number of RocksDB log files to keep.
public static final ConfigOption<String> LOG_DIR
Directory for RocksDB log files.
public static final ConfigOption<RocksDBLogLevel> LOG_LEVEL
RocksDB information logging level.
public static final ConfigOption<CompactionStyle> COMPACTION_STYLE
Compaction style: LEVEL, FIFO, UNIVERSAL, or NONE.
public static final ConfigOption<Boolean> USE_DYNAMIC_LEVEL_SIZE
Whether to use dynamic level sizing for leveled compaction.
public static final ConfigOption<List<CompressionType>> COMPRESSION_PER_LEVEL
Compression algorithm to use for each level.
public static final ConfigOption<MemorySize> TARGET_FILE_SIZE_BASE
Target file size for compaction (level 1).
public static final ConfigOption<MemorySize> MAX_SIZE_LEVEL_BASE
Maximum total size for level 1.
public static final ConfigOption<MemorySize> WRITE_BUFFER_SIZE
Amount of data to build up in memory before writing to disk.
public static final ConfigOption<Integer> MAX_WRITE_BUFFER_NUMBER
Maximum number of write buffers maintained in memory.
public static final ConfigOption<Integer> MIN_WRITE_BUFFER_NUMBER_TO_MERGE
Minimum number of write buffers to merge before writing to storage.
public static final ConfigOption<MemorySize> BLOCK_SIZE
Block size for RocksDB data blocks.
public static final ConfigOption<MemorySize> METADATA_BLOCK_SIZE
Block size for RocksDB metadata blocks.
public static final ConfigOption<MemorySize> BLOCK_CACHE_SIZE
Size of the block cache for uncompressed blocks.
public static final ConfigOption<Boolean> USE_BLOOM_FILTER
Whether to use bloom filter to reduce disk reads.
public static final ConfigOption<Double> BLOOM_FILTER_BITS_PER_KEY
Number of bits per key for bloom filter.
public static final ConfigOption<Boolean> BLOOM_FILTER_BLOCK_BASED_MODE
Whether to use block-based mode for bloom filter.
public static void checkArgumentValid(ConfigOption<?> option, Object value)
Validates that a configuration value is valid for the given option.
Parameters:
option - The configuration option to validate against
value - The value to validate
Throws: IllegalArgumentException if value is invalid
import org.apache.flink.configuration.Configuration;
import org.apache.flink.state.rocksdb.*;
public class RocksDBConfigurationExample {
/**
 * Assembles a Configuration holding the example's RocksDB settings: storage directories
 * and predefined profile, memory options, fine-grained RocksDB options, and the custom
 * options factory class name.
 *
 * @return a newly created Configuration populated with the values set below
 */
public static Configuration createRocksDBConfig() {
    final Configuration rocksDbSettings = new Configuration();

    // Basic configuration
    rocksDbSettings
            .set(RocksDBOptions.LOCAL_DIRECTORIES, "/ssd1/rocksdb,/ssd2/rocksdb")
            .set(RocksDBOptions.PREDEFINED_OPTIONS, PredefinedOptions.FLASH_SSD_OPTIMIZED);

    // Memory configuration
    rocksDbSettings
            .set(RocksDBOptions.USE_MANAGED_MEMORY, true)
            .set(RocksDBOptions.WRITE_BUFFER_RATIO, 0.4)
            .set(RocksDBOptions.HIGH_PRIORITY_POOL_RATIO, 0.2);

    // Advanced RocksDB options
    rocksDbSettings
            .set(RocksDBConfigurableOptions.MAX_BACKGROUND_THREADS, 4)
            .set(RocksDBConfigurableOptions.WRITE_BUFFER_SIZE, MemorySize.ofMebiBytes(128))
            .set(RocksDBConfigurableOptions.BLOCK_CACHE_SIZE, MemorySize.ofMebiBytes(256))
            .set(RocksDBConfigurableOptions.USE_BLOOM_FILTER, true)
            .set(RocksDBConfigurableOptions.BLOOM_FILTER_BITS_PER_KEY, 10.0);

    // Custom options factory
    rocksDbSettings.set(RocksDBOptions.OPTIONS_FACTORY, "com.example.CustomRocksDBOptionsFactory");

    return rocksDbSettings;
}
}
// Method 1: Direct configuration
// Builds the backend in code: pick a predefined profile, then attach a custom options factory.
EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend();
backend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED);
backend.setRocksDBOptions(new CustomRocksDBOptionsFactory());
// Method 2: Configuration-based setup
Configuration config = createRocksDBConfig();
EmbeddedRocksDBStateBackendFactory factory = new EmbeddedRocksDBStateBackendFactory();
// Given its own name so it does not redeclare Method 1's `backend` variable in the same scope.
EmbeddedRocksDBStateBackend backendFromConfig = factory.createFromConfig(config, getClass().getClassLoader());
// Method 3: Runtime configuration
EmbeddedRocksDBStateBackend configuredBackend = backend.configure(config, getClass().getClassLoader());
- Use FLASH_SSD_OPTIMIZED for SSDs, SPINNING_DISK_OPTIMIZED for HDDs
- Enable USE_MANAGED_MEMORY to use Flink's managed memory for RocksDB
- Register resources in handlesToClose for proper cleanup
- Implement ConfigurableRocksDBOptionsFactory for configuration integration
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-statebackend-rocksdb