CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-apache-flink--flink-statebackend-rocksdb-2-12

RocksDB state backend for Apache Flink streaming applications providing persistent, scalable state storage with fault tolerance, comprehensive configuration options, and native metrics monitoring.

Pending
Overview
Eval results
Files

options-factory.mddocs/

Options Factory

Factory pattern for customizing RocksDB database and column family options, enabling fine-grained performance tuning for specific use cases and hardware configurations.

Capabilities

RocksDBOptionsFactory Interface

The core interface for creating custom RocksDB options.

/**
 * Factory interface for creating RocksDB options.
 * Allows customization of database options, column family options, and other RocksDB settings.
 */
interface RocksDBOptionsFactory extends Serializable {
    
    /**
     * Creates or modifies database options for RocksDB.
     * @param currentOptions current DB options to modify
     * @param handlesToClose collection to register objects that need cleanup
     * @return configured DB options
     */
    DBOptions createDBOptions(DBOptions currentOptions, Collection<AutoCloseable> handlesToClose);
    
    /**
     * Creates or modifies column family options for RocksDB.
     * @param currentOptions current column family options to modify  
     * @param handlesToClose collection to register objects that need cleanup
     * @return configured column family options
     */
    ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions, Collection<AutoCloseable> handlesToClose);
    
    /**
     * Creates native metrics options for RocksDB monitoring.
     * @param nativeMetricOptions current native metrics options
     * @return configured native metrics options
     */
    default RocksDBNativeMetricOptions createNativeMetricsOptions(RocksDBNativeMetricOptions nativeMetricOptions) {
        return nativeMetricOptions;
    }
    
    /**
     * Creates write options for RocksDB write operations.
     * @param currentOptions current write options to modify
     * @param handlesToClose collection to register objects that need cleanup
     * @return configured write options
     */
    default WriteOptions createWriteOptions(WriteOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        return currentOptions;
    }
    
    /**
     * Creates read options for RocksDB read operations.
     * @param currentOptions current read options to modify
     * @param handlesToClose collection to register objects that need cleanup
     * @return configured read options
     */
    default ReadOptions createReadOptions(ReadOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        return currentOptions;
    }
}

Usage Example:

// Custom options factory implementation
RocksDBOptionsFactory customFactory = new RocksDBOptionsFactory() {
    @Override
    public DBOptions createDBOptions(DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        return currentOptions
            .setMaxOpenFiles(1024)
            .setMaxBackgroundJobs(4)
            .setUseFsync(false);
    }
    
    @Override
    public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        BloomFilter bloomFilter = new BloomFilter(10);
        handlesToClose.add(bloomFilter);
        
        return currentOptions
            .setWriteBufferSize(128 * 1024 * 1024)  // 128MB
            .setTableFormatConfig(
                new BlockBasedTableConfig().setFilter(bloomFilter)
            );
    }
};

stateBackend.setRocksDBOptions(customFactory);

DefaultConfigurableOptionsFactory

Default implementation that provides configurable options through method chaining.

/**
 * Default configurable options factory that can be configured through method chaining.
 * Provides common RocksDB configuration options with sensible defaults.
 */
class DefaultConfigurableOptionsFactory implements RocksDBOptionsFactory, ConfigurableRocksDBOptionsFactory {
    
    /**
     * Creates a new factory instance.
     */
    DefaultConfigurableOptionsFactory();
    
    /**
     * Creates a configured factory from ReadableConfig.
     * @param configuration configuration to read from
     * @return configured factory instance
     */
    DefaultConfigurableOptionsFactory configure(ReadableConfig configuration);
}

Database Options Configuration

Configure RocksDB database-level options.

/**
 * Sets the total number of background threads (compaction + flush).
 * @param totalThreadCount total background thread count
 * @return this factory for method chaining
 */
DefaultConfigurableOptionsFactory setMaxBackgroundThreads(int totalThreadCount);

/**
 * Sets the maximum number of open files that RocksDB can keep open.
 * @param maxOpenFiles maximum open files (-1 for unlimited)
 * @return this factory for method chaining
 */
DefaultConfigurableOptionsFactory setMaxOpenFiles(int maxOpenFiles);

/**
 * Sets the log level for RocksDB logging.
 * @param logLevel log level for RocksDB
 * @return this factory for method chaining
 */
DefaultConfigurableOptionsFactory setLogLevel(InfoLogLevel logLevel);

/**
 * Sets the directory for RocksDB log files.
 * @param logDir directory path for log files
 * @return this factory for method chaining
 */
DefaultConfigurableOptionsFactory setLogDir(String logDir);

/**
 * Sets the maximum size of a single log file.
 * @param maxLogFileSize maximum log file size (e.g., "64mb")
 * @return this factory for method chaining
 */
DefaultConfigurableOptionsFactory setMaxLogFileSize(String maxLogFileSize);

/**
 * Sets the number of log files to keep.
 * @param logFileNum number of log files to retain
 * @return this factory for method chaining
 */
DefaultConfigurableOptionsFactory setLogFileNum(int logFileNum);

Column Family Options Configuration

Configure RocksDB column family options for performance tuning.

/**
 * Sets the compaction style for RocksDB.
 * @param compactionStyle compaction algorithm to use
 * @return this factory for method chaining
 */
DefaultConfigurableOptionsFactory setCompactionStyle(CompactionStyle compactionStyle);

/**
 * Enables or disables dynamic level size adjustment.
 * @param value whether to use dynamic level sizes
 * @return this factory for method chaining
 */
DefaultConfigurableOptionsFactory setUseDynamicLevelSize(boolean value);

/**
 * Sets the target file size for L1 level.
 * @param targetFileSizeBase target file size (e.g., "64mb")
 * @return this factory for method chaining
 */
DefaultConfigurableOptionsFactory setTargetFileSizeBase(String targetFileSizeBase);

/**
 * Sets the maximum total data size for level L1.
 * @param maxSizeLevelBase maximum size for L1 (e.g., "256mb")
 * @return this factory for method chaining
 */
DefaultConfigurableOptionsFactory setMaxSizeLevelBase(String maxSizeLevelBase);

/**
 * Sets the size of write buffer (memtable).
 * @param writeBufferSize write buffer size (e.g., "64mb")
 * @return this factory for method chaining
 */
DefaultConfigurableOptionsFactory setWriteBufferSize(String writeBufferSize);

/**
 * Sets the maximum number of write buffers.
 * @param writeBufferNumber maximum write buffer count
 * @return this factory for method chaining
 */
DefaultConfigurableOptionsFactory setMaxWriteBufferNumber(int writeBufferNumber);

/**
 * Sets the minimum number of write buffers to merge.
 * @param writeBufferNumber minimum buffers to merge
 * @return this factory for method chaining
 */
DefaultConfigurableOptionsFactory setMinWriteBufferNumberToMerge(int writeBufferNumber);

Block and Cache Configuration

Configure block size and caching options.

/**
 * Sets the block size for RocksDB.
 * @param blockSize block size (e.g., "4kb", "16kb")
 * @return this factory for method chaining
 */
DefaultConfigurableOptionsFactory setBlockSize(String blockSize);

/**
 * Sets the metadata block size.
 * @param metadataBlockSize metadata block size (e.g., "4kb")
 * @return this factory for method chaining
 */
DefaultConfigurableOptionsFactory setMetadataBlockSize(String metadataBlockSize);

/**
 * Sets the block cache size for frequently accessed blocks.
 * @param blockCacheSize block cache size (e.g., "256mb")
 * @return this factory for method chaining
 */
DefaultConfigurableOptionsFactory setBlockCacheSize(String blockCacheSize);

Bloom Filter Configuration

Configure Bloom filter options for read performance optimization.

/**
 * Enables or disables Bloom filter.
 * @param useBloomFilter whether to use Bloom filter
 * @return this factory for method chaining
 */
DefaultConfigurableOptionsFactory setUseBloomFilter(boolean useBloomFilter);

/**
 * Sets the number of bits per key for Bloom filter.
 * @param bitsPerKey bits per key for Bloom filter (typically 10)
 * @return this factory for method chaining
 */
DefaultConfigurableOptionsFactory setBloomFilterBitsPerKey(double bitsPerKey);

/**
 * Sets the Bloom filter mode (block-based vs full filter).
 * @param blockBasedMode true for block-based, false for full filter
 * @return this factory for method chaining
 */
DefaultConfigurableOptionsFactory setBloomFilterBlockBasedMode(boolean blockBasedMode);

Usage Examples:

// Basic configuration
DefaultConfigurableOptionsFactory factory = new DefaultConfigurableOptionsFactory()
    .setMaxBackgroundThreads(4)
    .setMaxOpenFiles(-1)
    .setWriteBufferSize("128mb")
    .setBlockCacheSize("256mb");

// Advanced configuration with Bloom filter
DefaultConfigurableOptionsFactory advancedFactory = new DefaultConfigurableOptionsFactory()
    .setCompactionStyle(CompactionStyle.LEVEL)
    .setUseDynamicLevelSize(true)
    .setTargetFileSizeBase("64mb")
    .setMaxSizeLevelBase("256mb")
    .setWriteBufferSize("64mb")
    .setMaxWriteBufferNumber(3)
    .setMinWriteBufferNumberToMerge(2)
    .setBlockSize("16kb")
    .setBlockCacheSize("512mb")
    .setUseBloomFilter(true)
    .setBloomFilterBitsPerKey(10.0)
    .setBloomFilterBlockBasedMode(false);

stateBackend.setRocksDBOptions(advancedFactory);

ConfigurableRocksDBOptionsFactory Interface

Interface for factories that can be configured from Flink configuration.

/**
 * Interface for RocksDB options factories that can be configured from ReadableConfig.
 */
interface ConfigurableRocksDBOptionsFactory {
    
    /**
     * Creates a configured factory from ReadableConfig.
     * @param configuration configuration to read settings from
     * @return configured RocksDBOptionsFactory instance
     */
    RocksDBOptionsFactory configure(ReadableConfig configuration);
}

Types

// RocksDB native types (from org.rocksdb package)
enum CompactionStyle {
    LEVEL,      // Level-based compaction (default)
    UNIVERSAL,  // Universal compaction
    FIFO,       // FIFO compaction
    NONE        // No compaction
}

enum InfoLogLevel {
    DEBUG_LEVEL,
    INFO_LEVEL,
    WARN_LEVEL,
    ERROR_LEVEL,
    FATAL_LEVEL,
    HEADER_LEVEL
}

class DBOptions {
    // RocksDB database options
}

class ColumnFamilyOptions {
    // RocksDB column family options
}

class WriteOptions {
    // RocksDB write options
}

class ReadOptions {
    // RocksDB read options
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-statebackend-rocksdb-2-12

docs

index.md

memory-configuration.md

native-metrics-configuration.md

options-factory.md

predefined-options.md

state-backend-configuration.md

tile.json