CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-apache-flink--flink-statebackend-rocksdb-2-12

RocksDB state backend for Apache Flink streaming applications providing persistent, scalable state storage with fault tolerance, comprehensive configuration options, and native metrics monitoring.

Pending
Overview
Eval results
Files

native-metrics-configuration.mddocs/

Native Metrics Configuration

Configuration for RocksDB native metrics monitoring, enabling detailed performance monitoring and observability of RocksDB internal operations.

Capabilities

RocksDBNativeMetricOptions

Configuration class for enabling and managing RocksDB native metrics collection.

/**
 * Configuration for RocksDB native metrics collection.
 * Allows enabling specific RocksDB metrics for monitoring and performance analysis.
 */
class RocksDBNativeMetricOptions {
    
    /**
     * Creates RocksDBNativeMetricOptions from Flink configuration.
     * @param config configuration to read metric settings from
     * @return configured native metric options
     */
    static RocksDBNativeMetricOptions fromConfig(ReadableConfig config);
}

Memory Table Metrics

Enable monitoring of RocksDB memory table (memtable) statistics.

/**
 * Enables monitoring of the number of immutable memory tables.
 * Tracks memory tables that are waiting to be flushed to disk.
 */
void enableNumImmutableMemTable();

/**
 * Enables monitoring of memory table flush pending status.
 * Indicates whether a flush operation is currently pending.
 */
void enableMemTableFlushPending();

/**
 * Enables monitoring of current size of active memory table.
 * Tracks memory usage of the currently active memtable.
 */
void enableCurSizeActiveMemTable();

/**
 * Enables monitoring of current size of all memory tables.
 * Tracks total memory usage of all active and immutable memtables.
 */
void enableCurSizeAllMemTables();

/**
 * Enables monitoring of size of all memory tables.
 * Similar to current size but includes additional metadata.
 */
void enableSizeAllMemTables();

/**
 * Enables monitoring of number of entries in active memory table.
 * Tracks the count of key-value pairs in the active memtable.
 */
void enableNumEntriesActiveMemTable();

/**
 * Enables monitoring of number of entries in immutable memory tables.
 * Tracks total entries across all immutable memtables.
 */
void enableNumEntriesImmMemTables();

/**
 * Enables monitoring of number of deletes in active memory table.
 * Tracks delete operations in the currently active memtable.
 */
void enableNumDeletesActiveMemTable();

/**
 * Enables monitoring of number of deletes in immutable memory tables.
 * Tracks delete operations across all immutable memtables.
 */
void enableNumDeletesImmMemTables();

Compaction and Database Metrics

Enable monitoring of RocksDB compaction operations and database-level statistics.

/**
 * Enables monitoring of compaction pending status.
 * Indicates whether compaction operations are currently pending.
 */
void enableCompactionPending();

/**
 * Enables monitoring of background errors.
 * Tracks errors that occur during background operations like compaction.
 */
void enableBackgroundErrors();

/**
 * Enables monitoring of estimated number of keys in the database.
 * Provides an estimate of total key-value pairs across all levels.
 */
void enableEstimateNumKeys();

/**
 * Enables monitoring of number of running compactions.
 * Tracks currently active compaction operations.
 */
void enableNumRunningCompactions();

/**
 * Enables monitoring of number of running flushes.
 * Tracks currently active flush operations from memtables to disk.
 */
void enableNumRunningFlushes();

/**
 * Enables monitoring of actual delayed write rate.
 * Tracks write throttling when RocksDB slows down writes due to compaction lag.
 */
void enableActualDelayedWriteRate();

/**
 * Enables monitoring of write stopped status.
 * Indicates whether writes are currently stopped due to resource constraints.
 */
void enableIsWriteStopped();

Storage and Memory Estimation Metrics

Enable monitoring of RocksDB storage usage and memory consumption estimates.

/**
 * Enables monitoring of estimated table readers memory usage.
 * Tracks memory used by SST file readers (index and filter blocks).
 */
void enableEstimateTableReadersMem();

/**
 * Enables monitoring of number of snapshots.
 * Tracks currently active database snapshots.
 */
void enableNumSnapshots();

/**
 * Enables monitoring of number of live versions.
 * Tracks active database versions (used for MVCC).
 */
void enableNumLiveVersions();

/**
 * Enables monitoring of estimated live data size.
 * Estimates the size of data that's not deleted or overwritten.
 */
void enableEstimateLiveDataSize();

/**
 * Enables monitoring of total SST files size.
 * Tracks total disk space used by all SST files.
 */
void enableTotalSstFilesSize();

/**
 * Enables monitoring of live SST files size.
 * Tracks disk space used by currently active SST files.
 */
void enableLiveSstFilesSize();

/**
 * Enables monitoring of estimated pending compaction bytes.
 * Estimates the amount of data waiting to be compacted.
 */
void enableEstimatePendingCompactionBytes();

Block Cache Metrics

Enable monitoring of RocksDB block cache performance and usage.

/**
 * Enables monitoring of block cache capacity.
 * Tracks the maximum size configured for the block cache.
 */
void enableBlockCacheCapacity();

/**
 * Enables monitoring of block cache usage.
 * Tracks current memory usage of the block cache.
 */
void enableBlockCacheUsage();

/**
 * Enables monitoring of block cache pinned usage.
 * Tracks memory used by pinned blocks in the cache.
 */
void enableBlockCachePinnedUsage();

Configuration and Query Methods

Configure metric collection behavior and query enabled metrics.

/**
 * Configures whether column family name should be used as a variable in metrics.
 * When enabled, metrics are tagged with column family names for better granularity.
 * @param columnFamilyAsVariable whether to use column family as metric variable
 */
void setColumnFamilyAsVariable(boolean columnFamilyAsVariable);

/**
 * Checks whether column family is used as a variable in metrics.
 * @return true if column family names are included in metric tags
 */
boolean isColumnFamilyAsVariable();

/**
 * Gets the collection of enabled metric properties.
 * @return collection of RocksDB property names that are enabled for monitoring
 */
Collection<String> getProperties();

/**
 * Checks whether any metrics are enabled.
 * @return true if at least one metric is enabled for collection
 */
boolean isEnabled();

Usage Examples

Basic Metrics Configuration

import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.contrib.streaming.state.RocksDBNativeMetricOptions;
import org.apache.flink.configuration.Configuration;

// Create state backend
EmbeddedRocksDBStateBackend stateBackend = new EmbeddedRocksDBStateBackend(true);

// Create custom options factory with metrics
RocksDBOptionsFactory optionsFactory = new RocksDBOptionsFactory() {
    @Override
    public DBOptions createDBOptions(DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        return currentOptions;
    }
    
    @Override
    public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        return currentOptions;
    }
    
    @Override
    public RocksDBNativeMetricOptions createNativeMetricsOptions(RocksDBNativeMetricOptions nativeMetricOptions) {
        // Enable basic performance metrics
        nativeMetricOptions.enableNumImmutableMemTable();
        nativeMetricOptions.enableCompactionPending();
        nativeMetricOptions.enableBlockCacheUsage();
        nativeMetricOptions.enableEstimateNumKeys();
        
        return nativeMetricOptions;
    }
};

stateBackend.setRocksDBOptions(optionsFactory);

Comprehensive Metrics Configuration

// Enable extensive monitoring for production environments
RocksDBOptionsFactory comprehensiveFactory = new RocksDBOptionsFactory() {
    @Override
    public DBOptions createDBOptions(DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        return currentOptions;
    }
    
    @Override
    public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        return currentOptions;
    }
    
    @Override
    public RocksDBNativeMetricOptions createNativeMetricsOptions(RocksDBNativeMetricOptions nativeMetricOptions) {
        // Memory table metrics
        nativeMetricOptions.enableNumImmutableMemTable();
        nativeMetricOptions.enableMemTableFlushPending();
        nativeMetricOptions.enableCurSizeActiveMemTable();
        nativeMetricOptions.enableCurSizeAllMemTables();
        
        // Compaction metrics
        nativeMetricOptions.enableCompactionPending();
        nativeMetricOptions.enableNumRunningCompactions();
        nativeMetricOptions.enableNumRunningFlushes();
        nativeMetricOptions.enableEstimatePendingCompactionBytes();
        
        // Storage metrics
        nativeMetricOptions.enableEstimateNumKeys();
        nativeMetricOptions.enableEstimateLiveDataSize();
        nativeMetricOptions.enableTotalSstFilesSize();
        
        // Cache metrics
        nativeMetricOptions.enableBlockCacheCapacity();
        nativeMetricOptions.enableBlockCacheUsage();
        nativeMetricOptions.enableBlockCachePinnedUsage();
        
        // Write throttling metrics
        nativeMetricOptions.enableActualDelayedWriteRate();
        nativeMetricOptions.enableIsWriteStopped();
        
        // Use column family names in metrics for better granularity
        nativeMetricOptions.setColumnFamilyAsVariable(true);
        
        return nativeMetricOptions;
    }
};

stateBackend.setRocksDBOptions(comprehensiveFactory);

Configuration-Based Metrics Setup

import org.apache.flink.configuration.Configuration;

// Configure metrics through Flink configuration
Configuration config = new Configuration();
config.setString("state.backend.rocksdb.metrics.num-immutable-mem-table", "true");
config.setString("state.backend.rocksdb.metrics.compaction-pending", "true");
config.setString("state.backend.rocksdb.metrics.block-cache-usage", "true");
config.setString("state.backend.rocksdb.metrics.estimate-num-keys", "true");
config.setString("state.backend.rocksdb.metrics.column-family-as-variable", "true");

// Create metrics options from configuration
RocksDBNativeMetricOptions metricsOptions = RocksDBNativeMetricOptions.fromConfig(config);

// Check what metrics are enabled
if (metricsOptions.isEnabled()) {
    Collection<String> enabledProperties = metricsOptions.getProperties();
    System.out.println("Enabled metrics: " + enabledProperties);
}

Monitoring Best Practices

Essential Metrics for Production

For production environments, consider enabling these key metrics:

// Essential performance indicators
nativeMetricOptions.enableNumImmutableMemTable();    // Memory pressure
nativeMetricOptions.enableCompactionPending();       // Compaction lag  
nativeMetricOptions.enableBlockCacheUsage();         // Cache efficiency
nativeMetricOptions.enableEstimateNumKeys();         // Data growth
nativeMetricOptions.enableActualDelayedWriteRate();  // Write throttling
nativeMetricOptions.enableIsWriteStopped();          // Write blocking

Memory Monitoring

For memory-constrained environments:

// Memory usage tracking
nativeMetricOptions.enableCurSizeAllMemTables();     // Memory table usage
nativeMetricOptions.enableEstimateTableReadersMem(); // Reader memory
nativeMetricOptions.enableBlockCacheUsage();         // Cache memory
nativeMetricOptions.enableBlockCachePinnedUsage();   // Pinned cache memory

I/O Performance Monitoring

For I/O-intensive workloads:

// I/O and compaction monitoring
nativeMetricOptions.enableNumRunningCompactions();   // Active compactions
nativeMetricOptions.enableNumRunningFlushes();       // Active flushes
nativeMetricOptions.enableEstimatePendingCompactionBytes(); // Compaction backlog
nativeMetricOptions.enableTotalSstFilesSize();       // Disk usage

Performance Impact

  • Low Impact: Basic metrics like enableEstimateNumKeys(), enableCompactionPending()
  • Medium Impact: Cache metrics, memory table metrics
  • Higher Impact: Detailed compaction metrics, fine-grained storage metrics

Enable metrics incrementally and monitor the performance impact on your specific workload.

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-statebackend-rocksdb-2-12

docs

index.md

memory-configuration.md

native-metrics-configuration.md

options-factory.md

predefined-options.md

state-backend-configuration.md

tile.json