
tessl/maven-org-apache-flink--flink-statebackend-rocksdb

RocksDB state backend for Apache Flink - provides persistent state storage using RocksDB as the underlying storage engine for stateful stream processing applications



RocksDB Options and Factory Patterns

The RocksDB State Backend provides flexible configuration through options factories and predefined option sets. This allows fine-tuning of RocksDB behavior for different hardware configurations and performance requirements.

Core Imports

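import org.apache.flink.state.rocksdb.RocksDBOptionsFactory;
import org.apache.flink.state.rocksdb.ConfigurableRocksDBOptionsFactory;
import org.apache.flink.state.rocksdb.PredefinedOptions;
import org.apache.flink.state.rocksdb.RocksDBOptions;
import org.apache.flink.state.rocksdb.RocksDBConfigurableOptions;
import org.apache.flink.state.rocksdb.RocksDBNativeMetricOptions;
import org.apache.flink.state.rocksdb.EmbeddedRocksDBStateBackend;
import org.apache.flink.state.rocksdb.EmbeddedRocksDBStateBackendFactory;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.ReadableConfig;
import org.rocksdb.DBOptions;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.WriteOptions;
import org.rocksdb.ReadOptions;
import java.util.Collection;
import java.io.Serializable;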

RocksDBOptionsFactory Interface

Interface Definition

@PublicEvolving
public interface RocksDBOptionsFactory extends Serializable {
    // Factory interface for creating RocksDB options
}

Required Methods

DBOptions createDBOptions(DBOptions currentOptions, Collection<AutoCloseable> handlesToClose)

Creates and configures RocksDB database options.

Parameters:

  • currentOptions - The current DBOptions to modify or replace
  • handlesToClose - Collection to register resources that need cleanup

Returns: Configured DBOptions instance

ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions, Collection<AutoCloseable> handlesToClose)

Creates and configures RocksDB column family options.

Parameters:

  • currentOptions - The current ColumnFamilyOptions to modify or replace
  • handlesToClose - Collection to register resources that need cleanup

Returns: Configured ColumnFamilyOptions instance
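The handlesToClose parameter hands ownership of any native object the factory creates back to the caller, which closes it when the backend is disposed. The stdlib-only sketch below models that contract; `NativeHandle` and `OptionsOwner` are hypothetical stand-ins for a native RocksDB object (such as an LRUCache) and for the Flink component that calls the factory, not real Flink classes.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Hypothetical stand-in for a native RocksDB object such as an LRUCache.
final class NativeHandle implements AutoCloseable {
    private boolean closed;

    boolean isClosed() {
        return closed;
    }

    @Override
    public void close() {
        closed = true;
    }
}

// Hypothetical stand-in for the component that calls the factory and owns the handles.
final class OptionsOwner implements AutoCloseable {
    private final List<AutoCloseable> handlesToClose = new ArrayList<>();

    // Same shape as a factory method: create a native object and register it.
    static NativeHandle createOptions(Collection<AutoCloseable> handlesToClose) {
        NativeHandle cache = new NativeHandle();
        handlesToClose.add(cache); // without this, nothing would ever close the handle
        return cache;
    }

    NativeHandle build() {
        return createOptions(handlesToClose);
    }

    @Override
    public void close() {
        // The owner, not the factory, releases everything that was registered.
        for (AutoCloseable handle : handlesToClose) {
            try {
                handle.close();
            } catch (Exception e) {
                // Keep closing the remaining handles even if one fails.
            }
        }
        handlesToClose.clear();
    }
}
```

Usage: after `OptionsOwner owner = new OptionsOwner(); NativeHandle cache = owner.build(); owner.close();`, `cache.isClosed()` is true. The factory never closes what it creates; it only registers it.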

Optional Methods

default RocksDBNativeMetricOptions createNativeMetricsOptions(RocksDBNativeMetricOptions nativeMetricOptions)

Creates and configures native metrics options.

Parameters:

  • nativeMetricOptions - The current native metrics options

Returns: Configured RocksDBNativeMetricOptions instance

default WriteOptions createWriteOptions(WriteOptions currentOptions, Collection<AutoCloseable> handlesToClose)

Creates and configures RocksDB write options.

Parameters:

  • currentOptions - The current WriteOptions to modify or replace
  • handlesToClose - Collection to register resources that need cleanup

Returns: Configured WriteOptions instance

default ReadOptions createReadOptions(ReadOptions currentOptions, Collection<AutoCloseable> handlesToClose)

Creates and configures RocksDB read options.

Parameters:

  • currentOptions - The current ReadOptions to modify or replace
  • handlesToClose - Collection to register resources that need cleanup

Returns: Configured ReadOptions instance

ConfigurableRocksDBOptionsFactory Interface

Interface Definition

@PublicEvolving
public interface ConfigurableRocksDBOptionsFactory extends RocksDBOptionsFactory {
    // Extension for configuration-aware factories
}

Configuration Method

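RocksDBOptionsFactory configure(ReadableConfig configuration)

Creates a variant of this factory that applies the settings found in the provided configuration.

Parameters:

  • configuration - The configuration containing RocksDB settings

Returns: The reconfigured factory. An implementation may return this same instance (for example when nothing needs to change or when it applies the values to itself), but typically returns a modified copy. Implementations may declare a more specific return type, such as ConfigurableRocksDBOptionsFactory.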
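A common way to implement configure is copy-on-configure: build a new factory from the configuration and never mutate the receiver, so the same base factory can safely be configured more than once. The sketch below is stdlib-only; `SketchOptionsFactory` is hypothetical, a plain `Map` stands in for Flink's ReadableConfig, and the key string is assumed to match the one behind RocksDBConfigurableOptions.MAX_BACKGROUND_THREADS.

```java
import java.util.Map;

// Hypothetical factory; a plain Map stands in for Flink's ReadableConfig.
final class SketchOptionsFactory {
    // Assumed to mirror the key of RocksDBConfigurableOptions.MAX_BACKGROUND_THREADS.
    static final String THREADS_KEY = "state.backend.rocksdb.thread.num";

    private final int maxBackgroundJobs;

    SketchOptionsFactory(int maxBackgroundJobs) {
        this.maxBackgroundJobs = maxBackgroundJobs;
    }

    int maxBackgroundJobs() {
        return maxBackgroundJobs;
    }

    // Builds a new, configured factory; this instance is never mutated.
    // A missing key keeps the current value.
    SketchOptionsFactory configure(Map<String, String> configuration) {
        String value = configuration.getOrDefault(THREADS_KEY, String.valueOf(maxBackgroundJobs));
        return new SketchOptionsFactory(Integer.parseInt(value.trim()));
    }
}
```

The immutable fields make the "this instance is left unchanged" guarantee structural rather than a convention.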

Custom Options Factory Example

import org.apache.flink.state.rocksdb.RocksDBOptionsFactory;
import org.rocksdb.*;
import java.util.Collection;

public class CustomRocksDBOptionsFactory implements RocksDBOptionsFactory {

    @Override
    public DBOptions createDBOptions(DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        return currentOptions
            .setMaxBackgroundJobs(4)           // Limit concurrent background jobs (flushes + compactions)
            .setMaxOpenFiles(1000)             // Limit open file handles
            .setCreateIfMissing(true)          // Create database if missing
            .setErrorIfExists(false);          // Don't error if database exists
    }

    @Override
    public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        // Reuse the table config set by earlier layers (e.g. predefined options) instead of discarding it
        TableFormatConfig existing = currentOptions.tableFormatConfig();
        BlockBasedTableConfig tableConfig = existing instanceof BlockBasedTableConfig
            ? (BlockBasedTableConfig) existing
            : new BlockBasedTableConfig();
        tableConfig.setBlockSize(64 * 1024);               // 64KB blocks
        tableConfig.setCacheIndexAndFilterBlocks(true);

        // BlockBasedTableConfig is not AutoCloseable, but the LRUCache is a native object:
        // register it so it is closed together with the backend.
        // With managed memory enabled, Flink may replace this cache with its shared cache.
        LRUCache blockCache = new LRUCache(256L * 1024 * 1024); // 256MB cache
        handlesToClose.add(blockCache);
        tableConfig.setBlockCache(blockCache);

        return currentOptions
            .setTableFormatConfig(tableConfig)
            .setWriteBufferSize(128L * 1024 * 1024)    // 128MB write buffer
            .setMaxWriteBufferNumber(3)                 // Max 3 write buffers
            .setMinWriteBufferNumberToMerge(1)          // Merge threshold
            .setCompressionType(CompressionType.LZ4_COMPRESSION)
            .setCompactionStyle(CompactionStyle.LEVEL);
    }
}

Configurable Options Factory Example

import org.apache.flink.state.rocksdb.ConfigurableRocksDBOptionsFactory;
import org.apache.flink.state.rocksdb.RocksDBConfigurableOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.rocksdb.*;
import java.util.Collection;

public class ConfigurableCustomOptionsFactory implements ConfigurableRocksDBOptionsFactory {

    private int maxBackgroundJobs = 2;
    private long writeBufferSize = 64L * 1024 * 1024; // 64MB
    private long blockCacheSize = 128L * 1024 * 1024; // 128MB

    @Override
    public ConfigurableRocksDBOptionsFactory configure(ReadableConfig configuration) {
        // Return a configured copy; this instance is left unchanged
        ConfigurableCustomOptionsFactory configured = new ConfigurableCustomOptionsFactory();

        // get() falls back to each option's default value when the key is not set
        configured.maxBackgroundJobs = configuration.get(RocksDBConfigurableOptions.MAX_BACKGROUND_THREADS);
        configured.writeBufferSize = configuration.get(RocksDBConfigurableOptions.WRITE_BUFFER_SIZE).getBytes();
        configured.blockCacheSize = configuration.get(RocksDBConfigurableOptions.BLOCK_CACHE_SIZE).getBytes();

        return configured;
    }

    @Override
    public DBOptions createDBOptions(DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        return currentOptions.setMaxBackgroundJobs(maxBackgroundJobs);
    }

    @Override
    public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        // Keep any existing block-based table config rather than replacing it
        TableFormatConfig existing = currentOptions.tableFormatConfig();
        BlockBasedTableConfig tableConfig = existing instanceof BlockBasedTableConfig
            ? (BlockBasedTableConfig) existing
            : new BlockBasedTableConfig();

        // The cache is a native object, so register it for cleanup
        LRUCache blockCache = new LRUCache(blockCacheSize);
        handlesToClose.add(blockCache);
        tableConfig.setBlockCache(blockCache);

        return currentOptions
            .setTableFormatConfig(tableConfig)
            .setWriteBufferSize(writeBufferSize);
    }
}

PredefinedOptions Enum

Enum Definition

public enum PredefinedOptions {
    DEFAULT,                           // Default RocksDB settings
    SPINNING_DISK_OPTIMIZED,          // Optimized for spinning disk storage  
    SPINNING_DISK_OPTIMIZED_HIGH_MEM, // Spinning disk with higher memory usage
    FLASH_SSD_OPTIMIZED               // Optimized for Flash SSD storage
}

Enum Values

DEFAULT

  • Standard RocksDB configuration with no specific optimizations
  • Suitable for general-purpose usage and development

SPINNING_DISK_OPTIMIZED

  • Tuned for traditional spinning hard disks
  • Uses leveled compaction with dynamic level sizing, which reduces space amplification and random I/O
  • Raises the number of background jobs and lifts the limit on open files

SPINNING_DISK_OPTIMIZED_HIGH_MEM

  • Builds on SPINNING_DISK_OPTIMIZED but uses considerably more memory
  • Larger block cache, block size, target file size, and write buffers, plus more write buffers per column family
  • Enables bloom filters

FLASH_SSD_OPTIMIZED

  • Tuned for Flash SSD storage, whose fast random access needs less I/O shaping
  • Raises the number of background jobs and lifts the limit on open files, leaving compaction and write-buffer settings at their defaults

Methods

public <T> T getValue(ConfigOption<T> option)

Gets the value of a specific configuration option for this predefined option set.

Parameters:

  • option - The configuration option to retrieve

Returns: The value configured for this option in this predefined set
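A predefined option set acts as a base layer: as we understand the RocksDBOptions documentation, options from the configuration and from a RocksDBOptionsFactory are applied on top of it. The stdlib-only sketch below models that layering and a typed getValue lookup. `OptionKey`, `Preset`, and `OptionResolver` are hypothetical, and the preset values are illustrative rather than Flink's actual preset contents.

```java
import java.util.Map;

// Hypothetical typed option key, loosely modelled on Flink's ConfigOption<T>.
final class OptionKey<T> {
    final String name;
    final Class<T> type;
    final T defaultValue;

    OptionKey(String name, Class<T> type, T defaultValue) {
        this.name = name;
        this.type = type;
        this.defaultValue = defaultValue;
    }
}

// Hypothetical preset enum; the values are illustrative, not Flink's actual presets.
enum Preset {
    DEFAULT(Map.of()),
    TUNED(Map.of("thread.num", 4));

    private final Map<String, Object> values;

    Preset(Map<String, Object> values) {
        this.values = values;
    }

    // Typed lookup in the spirit of getValue; null means the preset does not set the option.
    <T> T getValue(OptionKey<T> key) {
        Object raw = values.get(key.name);
        return raw == null ? null : key.type.cast(raw);
    }
}

final class OptionResolver {
    // Explicit configuration is layered on the preset, which is layered on the default.
    static <T> T resolve(OptionKey<T> key, Preset preset, Map<String, Object> userConfig) {
        Object explicit = userConfig.get(key.name);
        if (explicit != null) {
            return key.type.cast(explicit);
        }
        T fromPreset = preset.getValue(key);
        return fromPreset != null ? fromPreset : key.defaultValue;
    }
}
```

Resolving "thread.num" against TUNED yields 4, unless the user configuration sets it explicitly, in which case the explicit value wins.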

Usage with State Backend

EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend();

// Set predefined options based on your storage type
backend.setPredefinedOptions(PredefinedOptions.FLASH_SSD_OPTIMIZED);

// Get current predefined options
PredefinedOptions current = backend.getPredefinedOptions();

RocksDB Configuration Options

RocksDBOptions Class

@PublicEvolving  
public class RocksDBOptions {
    // Configuration options for RocksDB backend
}

Key Configuration Options

public static final ConfigOption<String> LOCAL_DIRECTORIES

Local directories where RocksDB stores its data files.

public static final ConfigOption<EmbeddedRocksDBStateBackend.PriorityQueueStateType> TIMER_SERVICE_FACTORY

Timer service implementation choice (HEAP or ROCKSDB).

public static final ConfigOption<Integer> ROCKSDB_TIMER_SERVICE_FACTORY_CACHE_SIZE

Cache size per key-group for RocksDB timer service.

public static final ConfigOption<Integer> CHECKPOINT_TRANSFER_THREAD_NUM

Number of threads (per stateful operator) used to transfer files: uploading them during checkpointing and downloading them during restore.

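public static final ConfigOption<String> PREDEFINED_OPTIONS

Name of the predefined RocksDB option set to use (one of the PredefinedOptions values; DEFAULT by default). Options from the configuration and from a RocksDBOptionsFactory are applied on top of it.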

public static final ConfigOption<String> OPTIONS_FACTORY

Fully qualified class name of custom RocksDBOptionsFactory.

Memory Configuration Options

public static final ConfigOption<Boolean> USE_MANAGED_MEMORY

Whether to use Flink's managed memory for RocksDB.

public static final ConfigOption<MemorySize> FIX_PER_SLOT_MEMORY_SIZE

Fixed amount of memory per slot for RocksDB.

public static final ConfigOption<MemorySize> FIX_PER_TM_MEMORY_SIZE

Fixed amount of memory per TaskManager for RocksDB.

public static final ConfigOption<Double> WRITE_BUFFER_RATIO

Fraction of the bounded RocksDB memory (managed memory or a fixed per-slot size) that write buffers may use.

public static final ConfigOption<Double> HIGH_PRIORITY_POOL_RATIO

Fraction of the block cache reserved for high-priority blocks such as index and filter blocks.

public static final ConfigOption<Boolean> USE_PARTITIONED_INDEX_FILTERS

Whether to use partitioned index/filters in RocksDB.
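To get a feel for what WRITE_BUFFER_RATIO and HIGH_PRIORITY_POOL_RATIO mean in bytes, the sketch below splits one memory budget into write buffers, the high-priority pool, and the remaining cache for regular data blocks. This is a deliberately simplified model (an assumption, not Flink's code): Flink's actual sizing adjusts these figures, so treat the results as rough estimates. `MemorySplit` is hypothetical, and its ratio checks are the sketch's own sanity rules.

```java
// Simplified model (assumption): each slice is taken directly from one budget.
// Flink's real sizing differs, so use the numbers only as rough estimates.
final class MemorySplit {
    final long writeBufferBytes;
    final long highPriorityBytes;
    final long otherCacheBytes;

    private MemorySplit(long writeBufferBytes, long highPriorityBytes, long otherCacheBytes) {
        this.writeBufferBytes = writeBufferBytes;
        this.highPriorityBytes = highPriorityBytes;
        this.otherCacheBytes = otherCacheBytes;
    }

    static MemorySplit of(long budgetBytes, double writeBufferRatio, double highPriorityPoolRatio) {
        if (writeBufferRatio <= 0 || writeBufferRatio >= 1
                || highPriorityPoolRatio <= 0 || highPriorityPoolRatio >= 1) {
            throw new IllegalArgumentException("ratios must be in (0, 1)");
        }
        if (writeBufferRatio + highPriorityPoolRatio >= 1) {
            throw new IllegalArgumentException("ratios must leave room for regular data blocks");
        }
        long writeBuffers = (long) (budgetBytes * writeBufferRatio);
        long highPriority = (long) (budgetBytes * highPriorityPoolRatio);
        return new MemorySplit(writeBuffers, highPriority, budgetBytes - writeBuffers - highPriority);
    }
}
```

For a 1 GiB budget with ratios 0.5 and 0.25, the model gives 512 MiB for write buffers, 256 MiB for the high-priority pool, and 256 MiB for everything else.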

RocksDBConfigurableOptions Class

Class Definition

@PublicEvolving
public class RocksDBConfigurableOptions {
    // Detailed configurable options for RocksDB behavior
}

DBOptions Configuration

public static final ConfigOption<Integer> MAX_BACKGROUND_THREADS

Maximum number of concurrent background jobs (compactions + flushes).

public static final ConfigOption<Integer> MAX_OPEN_FILES

Maximum number of open files that can be cached by RocksDB.

public static final ConfigOption<MemorySize> LOG_MAX_FILE_SIZE

Maximum size of RocksDB log files.

public static final ConfigOption<Integer> LOG_FILE_NUM

Maximum number of RocksDB log files to keep.

public static final ConfigOption<String> LOG_DIR

Directory for RocksDB log files.

public static final ConfigOption<InfoLogLevel> LOG_LEVEL

Level of RocksDB's own info log (org.rocksdb.InfoLogLevel).

ColumnFamilyOptions Configuration

public static final ConfigOption<CompactionStyle> COMPACTION_STYLE

Compaction style: LEVEL, FIFO, UNIVERSAL, or NONE.

public static final ConfigOption<Boolean> USE_DYNAMIC_LEVEL_SIZE

Whether to use dynamic level sizing for leveled compaction.

public static final ConfigOption<List<CompressionType>> COMPRESSION_PER_LEVEL

Compression algorithm to use for each level.

public static final ConfigOption<MemorySize> TARGET_FILE_SIZE_BASE

Target file size for compaction (level 1).

public static final ConfigOption<MemorySize> MAX_SIZE_LEVEL_BASE

Maximum total size for level 1.

public static final ConfigOption<MemorySize> WRITE_BUFFER_SIZE

Amount of data to build up in memory before writing to disk.

public static final ConfigOption<Integer> MAX_WRITE_BUFFER_NUMBER

Maximum number of write buffers maintained in memory.

public static final ConfigOption<Integer> MIN_WRITE_BUFFER_NUMBER_TO_MERGE

Minimum number of write buffers to merge before writing to storage.

public static final ConfigOption<MemorySize> BLOCK_SIZE

Block size for RocksDB data blocks.

public static final ConfigOption<MemorySize> METADATA_BLOCK_SIZE

Block size for RocksDB metadata blocks.

public static final ConfigOption<MemorySize> BLOCK_CACHE_SIZE

Size of the block cache for uncompressed blocks.

public static final ConfigOption<Boolean> USE_BLOOM_FILTER

Whether to use bloom filter to reduce disk reads.

public static final ConfigOption<Double> BLOOM_FILTER_BITS_PER_KEY

Number of bits per key for bloom filter.

public static final ConfigOption<Boolean> BLOOM_FILTER_BLOCK_BASED_MODE

Whether to use block-based mode for bloom filter.
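BLOOM_FILTER_BITS_PER_KEY trades memory for fewer wasted disk reads on lookups of absent keys. The sketch below uses the textbook Bloom filter estimate (an assumption: ideal hashing and k = round(bits × ln 2) hash functions, giving a false-positive rate of (1 - e^(-k/bits))^k). RocksDB's real filter implementations behave somewhat differently, so read the result as an approximation. `BloomEstimate` is a hypothetical helper.

```java
// Textbook Bloom filter estimate (assumption: ideal hashing, optimal hash count).
final class BloomEstimate {
    // Optimal number of hash functions for a given bits-per-key budget.
    static int optimalHashCount(double bitsPerKey) {
        return Math.max(1, (int) Math.round(bitsPerKey * Math.log(2)));
    }

    // False-positive probability (1 - e^(-k / bitsPerKey))^k for k hash functions.
    static double falsePositiveRate(double bitsPerKey) {
        int k = optimalHashCount(bitsPerKey);
        return Math.pow(1 - Math.exp(-k / bitsPerKey), k);
    }
}
```

With 10.0 bits per key this estimate gives 7 hash functions and a false-positive rate of roughly 0.8%; doubling the bits per key drives the rate far lower at twice the filter memory.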

Validation Method

public static void checkArgumentValid(ConfigOption<?> option, Object value)

Validates that a configuration value is valid for the given option.

Parameters:

  • option - The configuration option to validate against
  • value - The value to validate

Throws: IllegalArgumentException if value is invalid
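The contract above (reject a bad value with IllegalArgumentException) is often implemented as a table of per-option rules. The sketch below shows that general shape only; `OptionValidator`, its keys, and its rules are hypothetical and are not Flink's actual validation logic.

```java
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical table-driven validator; keys and rules are illustrative assumptions.
final class OptionValidator {
    private static final Map<String, Predicate<Object>> RULES = Map.of(
        "thread.num", v -> v instanceof Integer && (Integer) v > 0,
        "bloom-filter.bits-per-key", v -> v instanceof Double && (Double) v > 0.0);

    // Throws IllegalArgumentException when a known option gets an invalid value.
    static void checkArgumentValid(String key, Object value) {
        Predicate<Object> rule = RULES.get(key);
        if (rule != null && !rule.test(value)) {
            throw new IllegalArgumentException("Invalid value " + value + " for option " + key);
        }
    }
}
```

In this sketch unknown keys pass unchecked; a stricter design would reject them instead.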

Configuration Example

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.state.rocksdb.*;

public class RocksDBConfigurationExample {
    public static Configuration createRocksDBConfig() {
        Configuration config = new Configuration();

        // Basic configuration (multiple local directories are comma-separated)
        config.set(RocksDBOptions.LOCAL_DIRECTORIES, "/ssd1/rocksdb,/ssd2/rocksdb");
        // PREDEFINED_OPTIONS is a string option, so pass the enum constant's name
        config.set(RocksDBOptions.PREDEFINED_OPTIONS, PredefinedOptions.FLASH_SSD_OPTIMIZED.name());

        // Memory configuration
        config.set(RocksDBOptions.USE_MANAGED_MEMORY, true);
        config.set(RocksDBOptions.WRITE_BUFFER_RATIO, 0.4);
        config.set(RocksDBOptions.HIGH_PRIORITY_POOL_RATIO, 0.2);

        // Advanced RocksDB options
        config.set(RocksDBConfigurableOptions.MAX_BACKGROUND_THREADS, 4);
        config.set(RocksDBConfigurableOptions.WRITE_BUFFER_SIZE, MemorySize.ofMebiBytes(128));
        config.set(RocksDBConfigurableOptions.BLOCK_CACHE_SIZE, MemorySize.ofMebiBytes(256));
        config.set(RocksDBConfigurableOptions.USE_BLOOM_FILTER, true);
        config.set(RocksDBConfigurableOptions.BLOOM_FILTER_BITS_PER_KEY, 10.0);

        // Custom options factory (the class must be on the TaskManager classpath)
        config.set(RocksDBOptions.OPTIONS_FACTORY, "com.example.CustomRocksDBOptionsFactory");

        return config;
    }
}

Using Options in State Backend

// Method 1: Direct configuration
EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend();
backend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED);
backend.setRocksDBOptions(new CustomRocksDBOptionsFactory());

// Method 2: Configuration-based setup through the backend factory
Configuration config = RocksDBConfigurationExample.createRocksDBConfig();
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
EmbeddedRocksDBStateBackendFactory factory = new EmbeddedRocksDBStateBackendFactory();
EmbeddedRocksDBStateBackend backendFromConfig = factory.createFromConfig(config, classLoader);

// Method 3: Runtime configuration; configure() returns a new, reconfigured backend
EmbeddedRocksDBStateBackend configuredBackend = backend.configure(config, classLoader);

Best Practices

Performance Tuning

  • Use FLASH_SSD_OPTIMIZED for SSDs, SPINNING_DISK_OPTIMIZED for HDDs
  • Set write buffer size to 128-256MB for high-throughput workloads
  • Enable bloom filters to reduce read amplification
  • Use partitioned index/filters for large state

Memory Management

  • Enable managed memory integration with USE_MANAGED_MEMORY
  • Set appropriate write buffer ratio (0.3-0.5) based on workload
  • Configure high priority pool ratio (0.1-0.2) for metadata caching

Custom Factory Guidelines

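  • Register every native RocksDB object you create (for example an LRUCache or BloomFilter) in handlesToClose; option holders such as BlockBasedTableConfig are not AutoCloseable and cannot be registered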
  • Prefer modifying current options over creating new instances
  • Implement ConfigurableRocksDBOptionsFactory for configuration integration
  • Test custom configurations thoroughly under load

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-statebackend-rocksdb
