RocksDB state backend for Apache Flink - provides persistent state storage using RocksDB as the underlying storage engine for stateful stream processing applications
—
The RocksDB State Backend provides comprehensive metrics integration that forwards RocksDB native metrics to Flink's metrics system. This enables detailed monitoring of performance, memory usage, and operational health.
import org.apache.flink.state.rocksdb.RocksDBNativeMetricOptions;
import org.apache.flink.state.rocksdb.RocksDBProperty;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.configuration.Configuration;
import java.util.Set;
import java.util.Collection;@PublicEvolving
public class RocksDBNativeMetricOptions {
// Configuration for enabling RocksDB native metrics forwarding to Flink's metrics reporter
}public static RocksDBNativeMetricOptions fromConfig(ReadableConfig config)Creates RocksDBNativeMetricOptions from configuration.
Parameters:
config - Configuration containing metric settingsReturns: Configured RocksDBNativeMetricOptions instance
Example:
Configuration config = new Configuration();
config.set(RocksDBNativeMetricOptions.MONITOR_BLOCK_CACHE_HIT, true);
config.set(RocksDBNativeMetricOptions.ESTIMATE_NUM_KEYS, true);
RocksDBNativeMetricOptions metricOptions = RocksDBNativeMetricOptions.fromConfig(config);public static final ConfigOption<Boolean> MONITOR_NUM_IMMUTABLE_MEM_TABLESMonitors the number of immutable memtables that have not yet been flushed.
public static final ConfigOption<Boolean> MONITOR_MEM_TABLE_FLUSH_PENDINGMonitors whether a memtable flush is pending.
public static final ConfigOption<Boolean> MONITOR_CUR_SIZE_ACTIVE_MEM_TABLEMonitors the approximate size of the active memtable in bytes.
public static final ConfigOption<Boolean> MONITOR_CUR_SIZE_ALL_MEM_TABLEMonitors the approximate size of all memtables in bytes.
public static final ConfigOption<Boolean> MONITOR_SIZE_ALL_MEM_TABLESMonitors the total size of all memtables in bytes.
public static final ConfigOption<Boolean> MONITOR_NUM_ENTRIES_ACTIVE_MEM_TABLEMonitors the total number of entries in the active memtable.
public static final ConfigOption<Boolean> MONITOR_NUM_ENTRIES_IMM_MEM_TABLESMonitors the total number of entries in the unflushed immutable memtables.
public static final ConfigOption<Boolean> MONITOR_NUM_DELETES_ACTIVE_MEM_TABLEMonitors the total number of delete entries in the active memtable.
public static final ConfigOption<Boolean> MONITOR_NUM_DELETES_IMM_MEM_TABLEMonitors the total number of delete entries in the unflushed immutable memtables.
public static final ConfigOption<Boolean> TRACK_COMPACTION_PENDINGMonitors whether at least one compaction is pending.
public static final ConfigOption<Boolean> MONITOR_BACKGROUND_ERRORSMonitors the number of background errors encountered.
public static final ConfigOption<Boolean> MONITOR_NUM_RUNNING_COMPACTIONSMonitors the number of currently running compactions.
public static final ConfigOption<Boolean> MONITOR_NUM_RUNNING_FLUSHESMonitors the number of currently running flushes.
public static final ConfigOption<Boolean> ESTIMATE_PENDING_COMPACTION_BYTESEstimates the total bytes of files pending compaction.
public static final ConfigOption<Boolean> ESTIMATE_NUM_KEYSEstimates the number of keys in the RocksDB instance.
public static final ConfigOption<Boolean> ESTIMATE_LIVE_DATA_SIZEEstimates the size of live data in bytes.
public static final ConfigOption<Boolean> MONITOR_TOTAL_SST_FILES_SIZEMonitors the total size of all SST files in bytes.
public static final ConfigOption<Boolean> MONITOR_LIVE_SST_FILES_SIZEMonitors the total size of live SST files in bytes.
public static final ConfigOption<Boolean> ESTIMATE_TABLE_READERS_MEMEstimates the memory used by table readers.
public static final ConfigOption<Boolean> BLOCK_CACHE_CAPACITYMonitors the block cache capacity.
public static final ConfigOption<Boolean> BLOCK_CACHE_USAGEMonitors the memory size used by the block cache.
public static final ConfigOption<Boolean> BLOCK_CACHE_PINNED_USAGEMonitors the memory size used by pinned blocks in the block cache.
public static final ConfigOption<Boolean> MONITOR_NUM_SNAPSHOTSMonitors the number of unreleased snapshots of the database.
public static final ConfigOption<Boolean> MONITOR_NUM_LIVE_VERSIONSMonitors the number of live versions (column family metadata).
public static final ConfigOption<Boolean> MONITOR_ACTUAL_DELAYED_WRITE_RATEMonitors the current actual delayed write rate.
public static final ConfigOption<Boolean> IS_WRITE_STOPPEDMonitors whether writes have been stopped.
public static final ConfigOption<Boolean> MONITOR_NUM_FILES_AT_LEVELMonitors the number of files at each level of the LSM tree.
public static final ConfigOption<Boolean> COLUMN_FAMILY_AS_VARIABLEExposes the column family name as a variable in metric names.
public static final ConfigOption<Boolean> MONITOR_BLOCK_CACHE_HITMonitors the number of block cache hits.
public static final ConfigOption<Boolean> MONITOR_BLOCK_CACHE_MISSMonitors the number of block cache misses.
public static final ConfigOption<Boolean> MONITOR_BLOOM_FILTER_USEFULMonitors the number of times bloom filter was useful in avoiding reads.
public static final ConfigOption<Boolean> MONITOR_BLOOM_FILTER_FULL_POSITIVEMonitors the number of times bloom filter gave a positive result.
public static final ConfigOption<Boolean> MONITOR_BLOOM_FILTER_FULL_TRUE_POSITIVEMonitors the number of times bloom filter gave a true positive result.
public static final ConfigOption<Boolean> MONITOR_BYTES_READMonitors the total number of bytes read.
public static final ConfigOption<Boolean> MONITOR_ITER_BYTES_READMonitors the total number of bytes read by iterators.
public static final ConfigOption<Boolean> MONITOR_BYTES_WRITTENMonitors the total number of bytes written.
public static final ConfigOption<Boolean> MONITOR_COMPACTION_READ_BYTESMonitors the total number of bytes read during compaction.
public static final ConfigOption<Boolean> MONITOR_COMPACTION_WRITE_BYTESMonitors the total number of bytes written during compaction.
public static final ConfigOption<Boolean> MONITOR_STALL_MICROSMonitors the total time spent in write stalls in microseconds.
public void enableNumImmutableMemTable()Enables monitoring of immutable memtable count.
public void enableMemTableFlushPending()Enables monitoring of pending memtable flushes.
public RocksDBNativeMetricOptions enableCurSizeActiveMemTable()Enables monitoring of active memtable size.
public RocksDBNativeMetricOptions enableCurSizeAllMemTables()Enables monitoring of all memtables size.
public RocksDBNativeMetricOptions enableSizeAllMemTables()Enables monitoring of total memtables size.
public RocksDBNativeMetricOptions enableNumEntriesActiveMemTable()Enables monitoring of active memtable entries.
public RocksDBNativeMetricOptions enableNumEntriesImmMemTables()Enables monitoring of immutable memtable entries.
public RocksDBNativeMetricOptions enableNumDeletesActiveMemTable()Enables monitoring of delete entries in active memtable.
public RocksDBNativeMetricOptions enableNumDeletesImmMemTables()Enables monitoring of delete entries in immutable memtables.
public RocksDBNativeMetricOptions enableCompactionPending()Enables monitoring of pending compactions.
public RocksDBNativeMetricOptions enableBackgroundErrors()Enables monitoring of background errors.
public RocksDBNativeMetricOptions enableNumRunningCompactions()Enables monitoring of running compactions.
public RocksDBNativeMetricOptions enableNumRunningFlushes()Enables monitoring of running flushes.
public RocksDBNativeMetricOptions enableEstimatePendingCompactionBytes()Enables estimation of pending compaction bytes.
public RocksDBNativeMetricOptions enableEstimateNumKeys()Enables estimation of key count.
public RocksDBNativeMetricOptions enableEstimateTableReadersMem()Enables estimation of table readers memory.
public RocksDBNativeMetricOptions enableEstimateLiveDataSize()Enables estimation of live data size.
public RocksDBNativeMetricOptions enableTotalSstFilesSize()Enables monitoring of total SST files size.
public RocksDBNativeMetricOptions enableLiveSstFilesSize()Enables monitoring of live SST files size.
public RocksDBNativeMetricOptions enableBlockCacheCapacity()Enables monitoring of block cache capacity.
public RocksDBNativeMetricOptions enableBlockCacheUsage()Enables monitoring of block cache usage.
public RocksDBNativeMetricOptions enableBlockCachePinnedUsage()Enables monitoring of pinned block cache usage.
public RocksDBNativeMetricOptions enableNumSnapshots()Enables monitoring of unreleased snapshots.
public RocksDBNativeMetricOptions enableNumLiveVersions()Enables monitoring of live versions.
public RocksDBNativeMetricOptions enableActualDelayedWriteRate()Enables monitoring of delayed write rate.
public RocksDBNativeMetricOptions enableIsWriteStopped()Enables monitoring of write stopped status.
public RocksDBNativeMetricOptions enableNumFilesAtLevel()Enables monitoring of files per LSM tree level.
public void setColumnFamilyAsVariable(boolean columnFamilyAsVariable)Sets whether to expose column family name as a variable in metric names.
Parameters:
columnFamilyAsVariable - Whether to include column family in metric namespublic void enableNativeStatistics(ConfigOption<Boolean> nativeStatisticsOption)Enables a native statistics metric.
Parameters:
nativeStatisticsOption - The statistics option to enablepublic boolean isEnabled()Checks if any metrics are enabled.
Returns: true if any metrics are configured, false otherwise
public boolean isStatisticsEnabled()Checks if statistics-based metrics are enabled.
Returns: true if statistics metrics are enabled, false otherwise
public boolean isColumnFamilyAsVariable()Checks if column family is exposed as a variable in metric names.
Returns: true if column family is used as variable, false otherwise
public Set<RocksDBProperty> getProperties()Gets the set of enabled property-based metrics.
Returns: Set of enabled RocksDBProperty instances
public Collection<ConfigOption<Boolean>> getMonitorTickerTypes()Gets the collection of enabled statistics-based metrics.
Returns: Collection of enabled statistics metric options
@Internal // Referenced in public APIs
public enum RocksDBProperty {
NumImmutableMemTable,
MemTableFlushPending,
CompactionPending,
BackgroundErrors,
CurSizeActiveMemTable,
CurSizeAllMemTables,
SizeAllMemTables,
NumEntriesActiveMemTable,
NumEntriesImmMemTables,
EstimateNumKeys,
EstimateTableReadersMem,
NumSnapshots,
EstimateLiveDataSize,
TotalSstFilesSize,
LiveSstFilesSize,
BlockCacheCapacity,
BlockCacheUsage,
BlockCachePinnedUsage,
NumFilesAtLevel0,
NumFilesAtLevel1,
NumFilesAtLevel2,
NumFilesAtLevel3,
NumFilesAtLevel4,
NumFilesAtLevel5,
NumFilesAtLevel6
// ... and more levels
}public String getRocksDBProperty()Gets the RocksDB property string for querying.
Returns: RocksDB property name string
public long getNumericalPropertyValue(RocksDB rocksDB, ColumnFamilyHandle handle)Gets the numerical value of this property from RocksDB.
Parameters:
rocksDB - RocksDB instance to queryhandle - Column family handleReturns: Numerical property value
public String getConfigKey()Gets the configuration key for this property.
Returns: Configuration key string
Configuration config = new Configuration();
// Enable essential performance metrics
config.set(RocksDBNativeMetricOptions.MONITOR_BLOCK_CACHE_HIT, true);
config.set(RocksDBNativeMetricOptions.MONITOR_BLOCK_CACHE_MISS, true);
config.set(RocksDBNativeMetricOptions.ESTIMATE_NUM_KEYS, true);
config.set(RocksDBNativeMetricOptions.MONITOR_COMPACTION_READ_BYTES, true);
config.set(RocksDBNativeMetricOptions.MONITOR_COMPACTION_WRITE_BYTES, true);
// Create options from config
RocksDBNativeMetricOptions metricOptions = RocksDBNativeMetricOptions.fromConfig(config);Configuration config = new Configuration();
// Memory-related metrics
config.set(RocksDBNativeMetricOptions.MONITOR_CUR_SIZE_ACTIVE_MEM_TABLE, true);
config.set(RocksDBNativeMetricOptions.MONITOR_CUR_SIZE_ALL_MEM_TABLE, true);
config.set(RocksDBNativeMetricOptions.BLOCK_CACHE_USAGE, true);
config.set(RocksDBNativeMetricOptions.BLOCK_CACHE_CAPACITY, true);
config.set(RocksDBNativeMetricOptions.ESTIMATE_TABLE_READERS_MEM, true);
// Compaction and flush monitoring
config.set(RocksDBNativeMetricOptions.MONITOR_MEM_TABLE_FLUSH_PENDING, true);
config.set(RocksDBNativeMetricOptions.TRACK_COMPACTION_PENDING, true);
config.set(RocksDBNativeMetricOptions.MONITOR_NUM_RUNNING_COMPACTIONS, true);
RocksDBNativeMetricOptions memoryMetrics = RocksDBNativeMetricOptions.fromConfig(config);public RocksDBNativeMetricOptions createComprehensiveMetrics() {
Configuration config = new Configuration();
// Performance metrics
config.set(RocksDBNativeMetricOptions.MONITOR_BLOCK_CACHE_HIT, true);
config.set(RocksDBNativeMetricOptions.MONITOR_BLOCK_CACHE_MISS, true);
config.set(RocksDBNativeMetricOptions.MONITOR_BLOOM_FILTER_USEFUL, true);
// Memory metrics
config.set(RocksDBNativeMetricOptions.BLOCK_CACHE_USAGE, true);
config.set(RocksDBNativeMetricOptions.MONITOR_CUR_SIZE_ALL_MEM_TABLE, true);
config.set(RocksDBNativeMetricOptions.ESTIMATE_TABLE_READERS_MEM, true);
// I/O metrics
config.set(RocksDBNativeMetricOptions.MONITOR_BYTES_READ, true);
config.set(RocksDBNativeMetricOptions.MONITOR_BYTES_WRITTEN, true);
config.set(RocksDBNativeMetricOptions.MONITOR_COMPACTION_READ_BYTES, true);
config.set(RocksDBNativeMetricOptions.MONITOR_COMPACTION_WRITE_BYTES, true);
// Database health metrics
config.set(RocksDBNativeMetricOptions.ESTIMATE_NUM_KEYS, true);
config.set(RocksDBNativeMetricOptions.ESTIMATE_LIVE_DATA_SIZE, true);
config.set(RocksDBNativeMetricOptions.MONITOR_BACKGROUND_ERRORS, true);
config.set(RocksDBNativeMetricOptions.IS_WRITE_STOPPED, true);
// Compaction metrics
config.set(RocksDBNativeMetricOptions.TRACK_COMPACTION_PENDING, true);
config.set(RocksDBNativeMetricOptions.MONITOR_NUM_RUNNING_COMPACTIONS, true);
config.set(RocksDBNativeMetricOptions.ESTIMATE_PENDING_COMPACTION_BYTES, true);
// Column family as variable for better metric organization
config.set(RocksDBNativeMetricOptions.COLUMN_FAMILY_AS_VARIABLE, true);
return RocksDBNativeMetricOptions.fromConfig(config);
}public RocksDBNativeMetricOptions createProgrammaticMetrics() {
RocksDBNativeMetricOptions metrics = new RocksDBNativeMetricOptions();
// Enable using method calls
metrics.enableBlockCacheUsage();
metrics.enableBlockCacheCapacity();
metrics.enableEstimateNumKeys();
metrics.enableEstimateLiveDataSize();
metrics.enableCompactionPending();
metrics.enableBackgroundErrors();
metrics.setColumnFamilyAsVariable(true);
return metrics;
}import org.apache.flink.state.rocksdb.RocksDBOptionsFactory;
public class MetricsEnabledOptionsFactory implements RocksDBOptionsFactory {
@Override
public RocksDBNativeMetricOptions createNativeMetricsOptions(
RocksDBNativeMetricOptions nativeMetricOptions) {
// Override default metrics with custom configuration
nativeMetricOptions.enableBlockCacheUsage();
nativeMetricOptions.enableEstimateNumKeys();
nativeMetricOptions.enableCompactionPending();
nativeMetricOptions.enableBackgroundErrors();
nativeMetricOptions.setColumnFamilyAsVariable(true);
return nativeMetricOptions;
}
// ... other factory methods
}# flink-conf.yaml
state.backend.rocksdb.metrics.block-cache-hit: true
state.backend.rocksdb.metrics.block-cache-miss: true
state.backend.rocksdb.metrics.estimate-num-keys: true
state.backend.rocksdb.metrics.compaction-read-bytes: true
state.backend.rocksdb.metrics.compaction-write-bytes: true
state.backend.rocksdb.metrics.column-family-as-variable: trueThe enabled metrics will be automatically forwarded to Flink's metrics system and can be accessed through:
Metrics follow this naming pattern:
<job_name>.<operator_name>.rocksdb.[column_family.]<metric_name>Example metric names:
my_job.my_operator.rocksdb.estimate-num-keysmy_job.my_operator.rocksdb.default.block-cache-usage (with column family)my_job.my_operator.rocksdb.compaction-read-bytesPerformance Monitoring:
Memory Monitoring:
Health Monitoring:
Capacity Planning:
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-statebackend-rocksdb