RocksDB state backend for Apache Flink streaming applications providing persistent, scalable state storage with fault tolerance, comprehensive configuration options, and native metrics monitoring.
—
Options factories let you customize the options RocksDB is created with — database options, column family options, write and read options, and native metrics settings — enabling fine-grained performance tuning for specific use cases and hardware configurations.
The core interface for creating custom RocksDB options.
/**
 * Hook for adjusting the options the RocksDB state backend hands to RocksDB.
 *
 * <p>Two hooks are mandatory: database options and column family options. Native metrics,
 * write options and read options have default implementations that hand back the options
 * they receive, untouched. Because the interface extends {@link Serializable}, every
 * implementation has to be serializable.
 */
interface RocksDBOptionsFactory extends Serializable {

    /**
     * Produces the database options to use, usually by adjusting {@code currentOptions}.
     *
     * @param currentOptions database options prepared so far
     * @param handlesToClose collection where objects needing cleanup must be registered
     * @return the database options to use
     */
    DBOptions createDBOptions(
            DBOptions currentOptions, Collection<AutoCloseable> handlesToClose);

    /**
     * Produces the column family options to use, usually by adjusting {@code currentOptions}.
     *
     * @param currentOptions column family options prepared so far
     * @param handlesToClose collection where objects needing cleanup must be registered
     * @return the column family options to use
     */
    ColumnFamilyOptions createColumnOptions(
            ColumnFamilyOptions currentOptions, Collection<AutoCloseable> handlesToClose);

    /**
     * Produces the native metrics settings used for monitoring RocksDB.
     *
     * <p>By default the given settings are handed back unchanged.
     *
     * @param nativeMetricOptions native metrics settings prepared so far
     * @return the native metrics settings to use
     */
    default RocksDBNativeMetricOptions createNativeMetricsOptions(
            RocksDBNativeMetricOptions nativeMetricOptions) {
        return nativeMetricOptions;
    }

    /**
     * Produces the options applied to RocksDB write operations.
     *
     * <p>By default {@code currentOptions} is handed back unchanged.
     *
     * @param currentOptions write options prepared so far
     * @param handlesToClose collection where objects needing cleanup must be registered
     * @return the write options to use
     */
    default WriteOptions createWriteOptions(
            WriteOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        return currentOptions;
    }

    /**
     * Produces the options applied to RocksDB read operations.
     *
     * <p>By default {@code currentOptions} is handed back unchanged.
     *
     * @param currentOptions read options prepared so far
     * @param handlesToClose collection where objects needing cleanup must be registered
     * @return the read options to use
     */
    default ReadOptions createReadOptions(
            ReadOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        return currentOptions;
    }
}

Usage Example:
// Custom options factory implementation.
// Declared as a named class (in its own file, or as a static nested class) instead of an
// anonymous class: RocksDBOptionsFactory is Serializable, and an anonymous class created in an
// instance method can hold a hidden reference to its enclosing object, which would then have
// to be serializable as well.
class CustomOptionsFactory implements RocksDBOptionsFactory {

    private static final long serialVersionUID = 1L;

    @Override
    public DBOptions createDBOptions(DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        return currentOptions
            .setMaxOpenFiles(1024)
            .setMaxBackgroundJobs(4)
            .setUseFsync(false);
    }

    @Override
    public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        // The filter is a native object: register it so it is cleaned up with the backend.
        BloomFilter bloomFilter = new BloomFilter(10);
        handlesToClose.add(bloomFilter);

        // Extend the table config already present on currentOptions (it may carry settings
        // configured elsewhere) rather than replacing it with a fresh, default one.
        BlockBasedTableConfig tableConfig =
            currentOptions.tableFormatConfig() instanceof BlockBasedTableConfig
                ? (BlockBasedTableConfig) currentOptions.tableFormatConfig()
                : new BlockBasedTableConfig();
        // setFilterPolicy supersedes the deprecated setFilter.
        tableConfig.setFilterPolicy(bloomFilter);

        return currentOptions
            .setWriteBufferSize(128L * 1024 * 1024) // 128MB; long arithmetic for a long parameter
            .setTableFormatConfig(tableConfig);
    }
}

RocksDBOptionsFactory customFactory = new CustomOptionsFactory();
stateBackend.setRocksDBOptions(customFactory);

Default implementation that provides configurable options through method chaining.
/**
 * Ready-made options factory whose settings are chosen through chained setter calls or read
 * from a Flink configuration.
 *
 * <p>Exposes the commonly tuned RocksDB options and comes with sensible defaults for them.
 */
class DefaultConfigurableOptionsFactory implements RocksDBOptionsFactory, ConfigurableRocksDBOptionsFactory {

    /** Builds a factory; tune it afterwards through the chainable setters. */
    DefaultConfigurableOptionsFactory();

    /**
     * Produces a factory whose settings are taken from the given configuration.
     *
     * @param configuration Flink configuration holding the RocksDB settings
     * @return the configured factory
     */
    DefaultConfigurableOptionsFactory configure(ReadableConfig configuration);
}

Configure RocksDB database-level options.
/**
 * Caps the number of background threads, counting compaction and flush threads together.
 *
 * @param totalThreadCount combined number of compaction and flush threads
 * @return this factory, so further setters can be chained
 */
DefaultConfigurableOptionsFactory setMaxBackgroundThreads(int totalThreadCount);

/**
 * Limits how many files RocksDB may keep open at once.
 *
 * @param maxOpenFiles upper bound on open files; {@code -1} removes the limit
 * @return this factory, so further setters can be chained
 */
DefaultConfigurableOptionsFactory setMaxOpenFiles(int maxOpenFiles);

/**
 * Chooses how verbose RocksDB's own logging is.
 *
 * @param logLevel RocksDB log level to apply
 * @return this factory, so further setters can be chained
 */
DefaultConfigurableOptionsFactory setLogLevel(InfoLogLevel logLevel);

/**
 * Chooses the directory RocksDB writes its log files into.
 *
 * @param logDir path of the log directory
 * @return this factory, so further setters can be chained
 */
DefaultConfigurableOptionsFactory setLogDir(String logDir);

/**
 * Limits the size of an individual RocksDB log file.
 *
 * @param maxLogFileSize size limit as a memory-size string, e.g. {@code "64mb"}
 * @return this factory, so further setters can be chained
 */
DefaultConfigurableOptionsFactory setMaxLogFileSize(String maxLogFileSize);

/**
 * Chooses how many RocksDB log files are retained.
 *
 * @param logFileNum number of log files to keep
 * @return this factory, so further setters can be chained
 */
DefaultConfigurableOptionsFactory setLogFileNum(int logFileNum);

Configure RocksDB column family options for performance tuning.
/**
 * Selects the compaction algorithm RocksDB runs.
 *
 * @param compactionStyle compaction algorithm to use
 * @return this factory, so further setters can be chained
 */
DefaultConfigurableOptionsFactory setCompactionStyle(CompactionStyle compactionStyle);

/**
 * Turns dynamic adjustment of level sizes on or off.
 *
 * @param value {@code true} to size levels dynamically
 * @return this factory, so further setters can be chained
 */
DefaultConfigurableOptionsFactory setUseDynamicLevelSize(boolean value);

/**
 * Chooses the target size of files in level L1.
 *
 * @param targetFileSizeBase target file size as a memory-size string, e.g. {@code "64mb"}
 * @return this factory, so further setters can be chained
 */
DefaultConfigurableOptionsFactory setTargetFileSizeBase(String targetFileSizeBase);

/**
 * Caps the total amount of data held in level L1.
 *
 * @param maxSizeLevelBase size cap as a memory-size string, e.g. {@code "256mb"}
 * @return this factory, so further setters can be chained
 */
DefaultConfigurableOptionsFactory setMaxSizeLevelBase(String maxSizeLevelBase);

/**
 * Chooses the size of a single write buffer (memtable).
 *
 * @param writeBufferSize buffer size as a memory-size string, e.g. {@code "64mb"}
 * @return this factory, so further setters can be chained
 */
DefaultConfigurableOptionsFactory setWriteBufferSize(String writeBufferSize);

/**
 * Caps how many write buffers may exist.
 *
 * @param writeBufferNumber largest allowed number of write buffers
 * @return this factory, so further setters can be chained
 */
DefaultConfigurableOptionsFactory setMaxWriteBufferNumber(int writeBufferNumber);

/**
 * Chooses the minimum number of write buffers that are merged together.
 *
 * @param writeBufferNumber smallest number of buffers to merge
 * @return this factory, so further setters can be chained
 */
DefaultConfigurableOptionsFactory setMinWriteBufferNumberToMerge(int writeBufferNumber);

Configure block size and caching options.
/**
 * Chooses the size of the blocks RocksDB stores data in.
 *
 * @param blockSize block size as a memory-size string, e.g. {@code "4kb"} or {@code "16kb"}
 * @return this factory, so further setters can be chained
 */
DefaultConfigurableOptionsFactory setBlockSize(String blockSize);

/**
 * Chooses the size of metadata blocks.
 *
 * @param metadataBlockSize metadata block size as a memory-size string, e.g. {@code "4kb"}
 * @return this factory, so further setters can be chained
 */
DefaultConfigurableOptionsFactory setMetadataBlockSize(String metadataBlockSize);

/**
 * Chooses the capacity of the cache that keeps frequently accessed blocks in memory.
 *
 * @param blockCacheSize cache capacity as a memory-size string, e.g. {@code "256mb"}
 * @return this factory, so further setters can be chained
 */
DefaultConfigurableOptionsFactory setBlockCacheSize(String blockCacheSize);

Configure Bloom filter options for read performance optimization.
/**
 * Turns the Bloom filter on or off.
 *
 * @param useBloomFilter {@code true} to build and consult a Bloom filter
 * @return this factory, so further setters can be chained
 */
DefaultConfigurableOptionsFactory setUseBloomFilter(boolean useBloomFilter);

/**
 * Chooses how many bits each key occupies in the Bloom filter.
 *
 * @param bitsPerKey bits per key; {@code 10} is a typical choice
 * @return this factory, so further setters can be chained
 */
DefaultConfigurableOptionsFactory setBloomFilterBitsPerKey(double bitsPerKey);

/**
 * Chooses between the block-based and the full Bloom filter format.
 *
 * @param blockBasedMode {@code true} selects the block-based filter, {@code false} the full filter
 * @return this factory, so further setters can be chained
 */
DefaultConfigurableOptionsFactory setBloomFilterBlockBasedMode(boolean blockBasedMode);

Usage Examples:
// Basic configuration: a handful of common knobs
DefaultConfigurableOptionsFactory basicFactory =
    new DefaultConfigurableOptionsFactory()
        .setMaxBackgroundThreads(4)
        .setMaxOpenFiles(-1)            // -1: no limit on open files
        .setWriteBufferSize("128mb")
        .setBlockCacheSize("256mb");

// Advanced configuration, including a Bloom filter
DefaultConfigurableOptionsFactory tunedFactory =
    new DefaultConfigurableOptionsFactory()
        // compaction layout
        .setCompactionStyle(CompactionStyle.LEVEL)
        .setUseDynamicLevelSize(true)
        .setTargetFileSizeBase("64mb")
        .setMaxSizeLevelBase("256mb")
        // write buffers (memtables)
        .setWriteBufferSize("64mb")
        .setMaxWriteBufferNumber(3)
        .setMinWriteBufferNumberToMerge(2)
        // blocks and block cache
        .setBlockSize("16kb")
        .setBlockCacheSize("512mb")
        // Bloom filter in full-filter format
        .setUseBloomFilter(true)
        .setBloomFilterBitsPerKey(10.0)
        .setBloomFilterBlockBasedMode(false);

// The advanced factory is the one installed here; basicFactory would be passed the same way.
stateBackend.setRocksDBOptions(tunedFactory);

Interface for factories that can be configured from Flink configuration.
/**
 * A {@link RocksDBOptionsFactory} that can additionally be configured from a Flink
 * {@link ReadableConfig}.
 *
 * <p>It extends {@code RocksDBOptionsFactory}, so every configurable factory is also an options
 * factory. It can therefore be passed wherever an options factory is expected, for example to
 * {@code setRocksDBOptions}.
 */
interface ConfigurableRocksDBOptionsFactory extends RocksDBOptionsFactory {

    /**
     * Applies the settings found in the given configuration and returns the resulting factory.
     *
     * @param configuration configuration to read settings from
     * @return the configured options factory (implementations may return {@code this})
     */
    RocksDBOptionsFactory configure(ReadableConfig configuration);
}

// RocksDB native types (from org.rocksdb package)
/** Compaction algorithms that can be selected for RocksDB. */
enum CompactionStyle {
    /** Level-based compaction (the default). */
    LEVEL,
    /** Universal compaction. */
    UNIVERSAL,
    /** FIFO compaction. */
    FIFO,
    /** No compaction at all. */
    NONE
}
/**
 * Log levels that can be handed to RocksDB's logger.
 *
 * <p>Constants stay in the order listed; reordering them would change their ordinal values.
 */
enum InfoLogLevel {
    DEBUG_LEVEL, INFO_LEVEL, WARN_LEVEL,
    ERROR_LEVEL, FATAL_LEVEL, HEADER_LEVEL
}
/** Placeholder for RocksDB's database-wide options type; its members are not listed here. */
class DBOptions {
}
/** Placeholder for RocksDB's per-column-family options type; its members are not listed here. */
class ColumnFamilyOptions {
}
/** Placeholder for RocksDB's write-operation options type; its members are not listed here. */
class WriteOptions {
}
/** Placeholder for RocksDB's read-operation options type; its members are not listed here. */
class ReadOptions {
}

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-statebackend-rocksdb-2-12