RocksDB state backend for Apache Flink streaming applications providing persistent, scalable state storage with fault tolerance, comprehensive configuration options, and native metrics monitoring.
Configuration for RocksDB native metrics monitoring, enabling detailed performance monitoring and observability of RocksDB internal operations.
Configuration class for enabling and managing RocksDB native metrics collection.
/**
* Configuration for RocksDB native metrics collection.
* Allows enabling specific RocksDB metrics for monitoring and performance analysis.
*/
class RocksDBNativeMetricOptions {
/**
* Creates RocksDBNativeMetricOptions from Flink configuration.
* @param config configuration to read metric settings from
* @return configured native metric options
*/
static RocksDBNativeMetricOptions fromConfig(ReadableConfig config);
}

Enable monitoring of RocksDB memory table (memtable) statistics.
/**
* Enables monitoring of the number of immutable memory tables.
* Tracks memory tables that are waiting to be flushed to disk.
*/
void enableNumImmutableMemTable();
/**
* Enables monitoring of memory table flush pending status.
* Indicates whether a flush operation is currently pending.
*/
void enableMemTableFlushPending();
/**
* Enables monitoring of current size of active memory table.
* Tracks memory usage of the currently active memtable.
*/
void enableCurSizeActiveMemTable();
/**
* Enables monitoring of current size of all memory tables.
* Tracks the approximate size of the active and unflushed immutable memtables.
*/
void enableCurSizeAllMemTables();
/**
* Enables monitoring of size of all memory tables.
* Like the current size, but also counts pinned immutable memtables.
*/
void enableSizeAllMemTables();
/**
* Enables monitoring of number of entries in active memory table.
* Tracks the count of key-value pairs in the active memtable.
*/
void enableNumEntriesActiveMemTable();
/**
* Enables monitoring of number of entries in immutable memory tables.
* Tracks total entries across all immutable memtables.
*/
void enableNumEntriesImmMemTables();
/**
* Enables monitoring of number of deletes in active memory table.
* Tracks delete operations in the currently active memtable.
*/
void enableNumDeletesActiveMemTable();
/**
* Enables monitoring of number of deletes in immutable memory tables.
* Tracks delete operations across all immutable memtables.
*/
void enableNumDeletesImmMemTables();

Enable monitoring of RocksDB compaction operations and database-level statistics.
/**
* Enables monitoring of compaction pending status.
* Indicates whether compaction operations are currently pending.
*/
void enableCompactionPending();
/**
* Enables monitoring of background errors.
* Tracks errors that occur during background operations like compaction.
*/
void enableBackgroundErrors();
/**
* Enables monitoring of estimated number of keys in the database.
* Provides an estimate of total key-value pairs across all levels.
*/
void enableEstimateNumKeys();
/**
* Enables monitoring of number of running compactions.
* Tracks currently active compaction operations.
*/
void enableNumRunningCompactions();
/**
* Enables monitoring of number of running flushes.
* Tracks currently active flush operations from memtables to disk.
*/
void enableNumRunningFlushes();
/**
* Enables monitoring of actual delayed write rate.
* Tracks write throttling when RocksDB slows down writes due to compaction lag.
*/
void enableActualDelayedWriteRate();
/**
* Enables monitoring of write stopped status.
* Indicates whether writes are currently stopped due to resource constraints.
*/
void enableIsWriteStopped();

Enable monitoring of RocksDB storage usage and memory consumption estimates.
/**
* Enables monitoring of estimated table readers memory usage.
* Tracks memory used for reading SST files (such as index and filter blocks), excluding memory held in the block cache.
*/
void enableEstimateTableReadersMem();
/**
* Enables monitoring of number of snapshots.
* Tracks currently active database snapshots.
*/
void enableNumSnapshots();
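/**
* Enables monitoring of number of live versions.
* A version is an internal RocksDB data structure; many live versions often mean
* SST files are being kept from deletion by iterators or unfinished compactions.
*/
void enableNumLiveVersions();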
/**
* Enables monitoring of estimated live data size.
* Estimates the size of data that's not deleted or overwritten.
*/
void enableEstimateLiveDataSize();
/**
* Enables monitoring of total SST files size.
* Tracks total disk space used by all SST files.
*/
void enableTotalSstFilesSize();
/**
* Enables monitoring of live SST files size.
* Tracks disk space used by currently active SST files.
*/
void enableLiveSstFilesSize();
/**
* Enables monitoring of estimated pending compaction bytes.
* Estimates the amount of data waiting to be compacted.
*/
void enableEstimatePendingCompactionBytes();

Enable monitoring of RocksDB block cache performance and usage.
/**
* Enables monitoring of block cache capacity.
* Tracks the maximum size configured for the block cache.
*/
void enableBlockCacheCapacity();
/**
* Enables monitoring of block cache usage.
* Tracks current memory usage of the block cache.
*/
void enableBlockCacheUsage();
/**
* Enables monitoring of block cache pinned usage.
* Tracks memory used by pinned blocks in the cache.
*/
void enableBlockCachePinnedUsage();

Configure metric collection behavior and query enabled metrics.
/**
* Configures whether column family name should be used as a variable in metrics.
* When enabled, metrics are tagged with column family names for better granularity.
* @param columnFamilyAsVariable whether to use column family as metric variable
*/
void setColumnFamilyAsVariable(boolean columnFamilyAsVariable);
/**
* Checks whether column family is used as a variable in metrics.
* @return true if column family names are included in metric tags
*/
boolean isColumnFamilyAsVariable();
/**
* Gets the collection of enabled metric properties.
* @return collection of RocksDB property names that are enabled for monitoring
*/
Collection<String> getProperties();
/**
* Checks whether any metrics are enabled.
* @return true if at least one metric is enabled for collection
*/
boolean isEnabled();

import java.util.Collection;

import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.contrib.streaming.state.RocksDBNativeMetricOptions;
import org.apache.flink.contrib.streaming.state.RocksDBOptionsFactory;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;

// Create state backend with incremental checkpointing enabled
EmbeddedRocksDBStateBackend stateBackend = new EmbeddedRocksDBStateBackend(true);

// Create custom options factory with metrics
RocksDBOptionsFactory optionsFactory = new RocksDBOptionsFactory() {
    @Override
    public DBOptions createDBOptions(DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        // Leave the database options unchanged
        return currentOptions;
    }

    @Override
    public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        // Leave the column family options unchanged
        return currentOptions;
    }

    @Override
    public RocksDBNativeMetricOptions createNativeMetricsOptions(RocksDBNativeMetricOptions nativeMetricOptions) {
        // Enable basic performance metrics
        nativeMetricOptions.enableNumImmutableMemTable();
        nativeMetricOptions.enableCompactionPending();
        nativeMetricOptions.enableBlockCacheUsage();
        nativeMetricOptions.enableEstimateNumKeys();
        return nativeMetricOptions;
    }
};
stateBackend.setRocksDBOptions(optionsFactory);

// Enable extensive monitoring for production environments
RocksDBOptionsFactory comprehensiveFactory = new RocksDBOptionsFactory() {
    @Override
    public DBOptions createDBOptions(DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        return currentOptions;
    }

    @Override
    public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        return currentOptions;
    }

    @Override
    public RocksDBNativeMetricOptions createNativeMetricsOptions(RocksDBNativeMetricOptions nativeMetricOptions) {
        // Memory table metrics
        nativeMetricOptions.enableNumImmutableMemTable();
        nativeMetricOptions.enableMemTableFlushPending();
        nativeMetricOptions.enableCurSizeActiveMemTable();
        nativeMetricOptions.enableCurSizeAllMemTables();
        // Compaction metrics
        nativeMetricOptions.enableCompactionPending();
        nativeMetricOptions.enableNumRunningCompactions();
        nativeMetricOptions.enableNumRunningFlushes();
        nativeMetricOptions.enableEstimatePendingCompactionBytes();
        // Storage metrics
        nativeMetricOptions.enableEstimateNumKeys();
        nativeMetricOptions.enableEstimateLiveDataSize();
        nativeMetricOptions.enableTotalSstFilesSize();
        // Cache metrics
        nativeMetricOptions.enableBlockCacheCapacity();
        nativeMetricOptions.enableBlockCacheUsage();
        nativeMetricOptions.enableBlockCachePinnedUsage();
        // Write throttling metrics
        nativeMetricOptions.enableActualDelayedWriteRate();
        nativeMetricOptions.enableIsWriteStopped();
        // Use column family names in metrics for better granularity
        nativeMetricOptions.setColumnFamilyAsVariable(true);
        return nativeMetricOptions;
    }
};
stateBackend.setRocksDBOptions(comprehensiveFactory);

import java.util.Collection;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.RocksDBNativeMetricOptions;

// Configure metrics through Flink configuration
Configuration config = new Configuration();
config.setString("state.backend.rocksdb.metrics.num-immutable-mem-table", "true");
config.setString("state.backend.rocksdb.metrics.compaction-pending", "true");
config.setString("state.backend.rocksdb.metrics.block-cache-usage", "true");
config.setString("state.backend.rocksdb.metrics.estimate-num-keys", "true");
config.setString("state.backend.rocksdb.metrics.column-family-as-variable", "true");

// Create metrics options from configuration
RocksDBNativeMetricOptions metricsOptions = RocksDBNativeMetricOptions.fromConfig(config);

// Check what metrics are enabled
if (metricsOptions.isEnabled()) {
    Collection<String> enabledProperties = metricsOptions.getProperties();
    System.out.println("Enabled metrics: " + enabledProperties);
}

For production environments, consider enabling these key metrics:
// Essential performance indicators
nativeMetricOptions.enableNumImmutableMemTable(); // Memory pressure
nativeMetricOptions.enableCompactionPending(); // Compaction lag
nativeMetricOptions.enableBlockCacheUsage(); // Cache memory usage
nativeMetricOptions.enableEstimateNumKeys(); // Data growth
nativeMetricOptions.enableActualDelayedWriteRate(); // Write throttling
nativeMetricOptions.enableIsWriteStopped(); // Write blocking

For memory-constrained environments:
// Memory usage tracking
nativeMetricOptions.enableCurSizeAllMemTables(); // Memory table usage
nativeMetricOptions.enableEstimateTableReadersMem(); // Reader memory
nativeMetricOptions.enableBlockCacheUsage(); // Cache memory
nativeMetricOptions.enableBlockCachePinnedUsage(); // Pinned cache memory

For I/O-intensive workloads:
// I/O and compaction monitoring
nativeMetricOptions.enableNumRunningCompactions(); // Active compactions
nativeMetricOptions.enableNumRunningFlushes(); // Active flushes
nativeMetricOptions.enableEstimatePendingCompactionBytes(); // Compaction backlog
nativeMetricOptions.enableTotalSstFilesSize(); // Disk usage

enableEstimateNumKeys(), enableCompactionPending()

Enable metrics incrementally and monitor the performance impact on your specific workload.
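
One way to enable metrics incrementally is to group the configuration keys into tiers and apply one more tier at each rollout step. The following is a minimal sketch in plain Java with no Flink dependency. The `MetricRollout` class and the tier grouping are hypothetical. The last two key names are assumed to follow the same `state.backend.rocksdb.metrics.` pattern as the keys in the configuration example, so check them against your Flink version.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: builds metric settings in cumulative tiers. */
final class MetricRollout {
    // Key prefix taken from the configuration example in this document.
    static final String PREFIX = "state.backend.rocksdb.metrics.";

    // Tier contents are an illustrative assumption, not a Flink recommendation.
    static final List<List<String>> TIERS = Arrays.asList(
            Arrays.asList("num-immutable-mem-table", "compaction-pending"),
            Arrays.asList("block-cache-usage", "estimate-num-keys"),
            // Assumed key names, derived from the naming pattern above.
            Arrays.asList("actual-delayed-write-rate", "is-write-stopped"));

    /** Returns the settings for tiers 1 through {@code tier}, in rollout order. */
    static Map<String, String> settingsUpTo(int tier) {
        if (tier < 0 || tier > TIERS.size()) {
            throw new IllegalArgumentException("tier must be between 0 and " + TIERS.size());
        }
        Map<String, String> settings = new LinkedHashMap<>();
        for (List<String> names : TIERS.subList(0, tier)) {
            for (String name : names) {
                settings.put(PREFIX + name, "true");
            }
        }
        return settings;
    }

    public static void main(String[] args) {
        // Print the keys that would be active at each stage of the rollout.
        for (int tier = 1; tier <= TIERS.size(); tier++) {
            System.out.println("tier " + tier + ": " + settingsUpTo(tier).keySet());
        }
    }
}
```

Each entry of the returned map could then be passed to `config.setString(key, value)` as in the configuration example, with job performance compared before and after each tier is added.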
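
The block cache metrics described earlier report raw byte counts, which are often easier to read as ratios. The sketch below, again plain Java with no Flink dependency, derives two such ratios from a set of gauge readings. The `BlockCacheReading` class is hypothetical, and how the readings are obtained from your metrics system is left out. It assumes pinned usage is counted as part of total usage.

```java
/** Hypothetical holder for block cache gauge readings, all in bytes. */
final class BlockCacheReading {
    final long capacityBytes;     // value reported for block cache capacity
    final long usageBytes;        // value reported for block cache usage
    final long pinnedUsageBytes;  // value reported for block cache pinned usage

    BlockCacheReading(long capacityBytes, long usageBytes, long pinnedUsageBytes) {
        if (capacityBytes < 0 || usageBytes < 0 || pinnedUsageBytes < 0) {
            throw new IllegalArgumentException("readings must be non-negative");
        }
        this.capacityBytes = capacityBytes;
        this.usageBytes = usageBytes;
        this.pinnedUsageBytes = pinnedUsageBytes;
    }

    /** Fraction of the configured capacity currently in use; 0 when capacity is 0. */
    double utilization() {
        return capacityBytes == 0 ? 0.0 : (double) usageBytes / capacityBytes;
    }

    /** Fraction of used cache memory that is pinned; 0 when nothing is cached. */
    double pinnedShare() {
        return usageBytes == 0 ? 0.0 : (double) pinnedUsageBytes / usageBytes;
    }

    public static void main(String[] args) {
        // Example readings: 256 MiB capacity, 192 MiB used, 48 MiB pinned.
        BlockCacheReading reading = new BlockCacheReading(256L << 20, 192L << 20, 48L << 20);
        System.out.println("utilization = " + reading.utilization());
        System.out.println("pinned share = " + reading.pinnedShare());
    }
}
```

A utilization that stays close to 1.0 may suggest the cache is full, and a high pinned share means less of the cache can be evicted. Both are rough signals to read alongside the other metrics, not fixed thresholds.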