RocksDB state backend for Apache Flink - provides persistent state storage using RocksDB as the underlying storage engine for stateful stream processing applications
—
The RocksDB State Backend provides sophisticated memory management capabilities that integrate with Flink's memory model. This allows efficient memory utilization across different deployment scenarios and workload patterns.
import org.apache.flink.state.rocksdb.RocksDBMemoryConfiguration;
import org.apache.flink.state.rocksdb.RocksDBMemoryFactory;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.configuration.MemorySize;

/**
 * Summary of the RocksDB memory configuration type. The members documented below cover the
 * managed-memory switch, the fixed per-slot budget, the write buffer ratio, the high priority
 * pool ratio, partitioned index/filter detection, validation, and factory methods.
 */
public final class RocksDBMemoryConfiguration {
// Configuration settings for RocksDB memory usage management
}
public void setUseManagedMemory(boolean useManagedMemory)
Configures whether to use Flink's managed memory for RocksDB.
Parameters:
useManagedMemory - Whether to use the managed memory budget from Flink
public boolean isUsingManagedMemory()
Checks whether this configuration uses Flink's managed memory.
Returns: true if using managed memory, false otherwise
Example:
RocksDBMemoryConfiguration memConfig = backend.getMemoryConfiguration();
memConfig.setUseManagedMemory(true);
// Check current setting
if (memConfig.isUsingManagedMemory()) {
// Memory will be allocated from Flink's managed memory pool
}
public void setFixedMemoryPerSlot(MemorySize fixedMemoryPerSlot)
Sets a fixed amount of memory for RocksDB per task slot.
Parameters:
fixedMemoryPerSlot - Fixed memory size per slot
public void setFixedMemoryPerSlot(String totalMemoryPerSlotStr)
Sets a fixed amount of memory for RocksDB per task slot, given as a memory-size string.
Parameters:
totalMemoryPerSlotStr - Memory size string (e.g., "256mb", "1gb")
public boolean isUsingFixedMemoryPerSlot()
Checks whether this configuration uses a fixed amount of memory per slot.
Returns: true if using fixed memory per slot, false otherwise
public MemorySize getFixedMemoryPerSlot()
Gets the configured fixed memory per slot.
Returns: Fixed memory size per slot, or null if not configured
Example:
RocksDBMemoryConfiguration memConfig = backend.getMemoryConfiguration();
// Set fixed memory using MemorySize
memConfig.setFixedMemoryPerSlot(MemorySize.ofMebiBytes(512));
// Or set using string format
memConfig.setFixedMemoryPerSlot("512mb");
// Check configuration
if (memConfig.isUsingFixedMemoryPerSlot()) {
MemorySize fixedSize = memConfig.getFixedMemoryPerSlot();
System.out.println("Fixed memory per slot: " + fixedSize);
}
public void setWriteBufferRatio(double writeBufferRatio)
Sets the fraction of available memory to use for RocksDB write buffers.
Parameters:
writeBufferRatio - Fraction of memory for write buffers (0.0 to 1.0)
public double getWriteBufferRatio()
Gets the configured write buffer ratio.
Returns: Write buffer ratio as a fraction
Example:
// Use 40% of available memory for write buffers
memConfig.setWriteBufferRatio(0.4);
// Get current ratio
double ratio = memConfig.getWriteBufferRatio();
public void setHighPriorityPoolRatio(double highPriorityPoolRatio)
Sets the fraction of block cache memory reserved for high priority blocks.
Parameters:
highPriorityPoolRatio - Fraction for high priority blocks (0.0 to 1.0)
public double getHighPriorityPoolRatio()
Gets the configured high priority pool ratio.
Returns: High priority pool ratio as a fraction
Example:
// Reserve 20% of block cache for high priority blocks (index/filter blocks)
memConfig.setHighPriorityPoolRatio(0.2);
// Get current ratio
double highPriorityRatio = memConfig.getHighPriorityPoolRatio();
public boolean isUsingPartitionedIndexFilters()
Checks whether partitioned index/filter blocks are enabled for memory efficiency.
Returns: true if using partitioned index/filters, false otherwise
Example:
if (memConfig.isUsingPartitionedIndexFilters()) {
// Index and filter blocks are partitioned for better memory utilization
}
public void validate()
Validates the consistency of the memory configuration.
Throws: IllegalArgumentException if configuration is invalid
Example:
RocksDBMemoryConfiguration memConfig = backend.getMemoryConfiguration();
memConfig.setUseManagedMemory(true);
memConfig.setWriteBufferRatio(0.4);
memConfig.setHighPriorityPoolRatio(0.2);
memConfig.validate(); // Throws an exception if the configuration is inconsistent
public static RocksDBMemoryConfiguration fromOtherAndConfiguration(
RocksDBMemoryConfiguration other,
ReadableConfig config)
Creates a new memory configuration from an existing one and additional configuration.
Parameters:
other - Base memory configuration to copy from
config - Additional configuration to apply
Returns: New configured RocksDBMemoryConfiguration instance
public static RocksDBMemoryConfiguration fromConfiguration(Configuration configuration)
Creates a memory configuration from a Flink configuration.
Parameters:
configuration - Flink configuration containing memory settings
Returns: RocksDBMemoryConfiguration instance based on the configuration
Example:
// Create from Flink configuration
Configuration config = new Configuration();
config.set(RocksDBOptions.USE_MANAGED_MEMORY, true);
config.set(RocksDBOptions.WRITE_BUFFER_RATIO, 0.4);
RocksDBMemoryConfiguration memConfig = RocksDBMemoryConfiguration.fromConfiguration(config);
// Create from existing config with overrides
Configuration overrides = new Configuration();
overrides.set(RocksDBOptions.HIGH_PRIORITY_POOL_RATIO, 0.3);
RocksDBMemoryConfiguration newConfig = RocksDBMemoryConfiguration.fromOtherAndConfiguration(
memConfig, overrides);
/**
* Use Flink's managed memory with automatic memory distribution
* Best for: Multi-tenant clusters, resource management
*/
public void configureForManagedMemory(EmbeddedRocksDBStateBackend backend) {
    // Preset for shared clusters: RocksDB takes its budget from Flink's managed memory pool,
    // so the amount it gets follows whatever managed memory is available.
    final double writeBufferShare = 0.4;       // 40% of the budget for write buffers
    final double highPriorityCacheShare = 0.2; // 20% of the block cache for high priority blocks

    final RocksDBMemoryConfiguration memory = backend.getMemoryConfiguration();
    memory.setUseManagedMemory(true);
    memory.setWriteBufferRatio(writeBufferShare);
    memory.setHighPriorityPoolRatio(highPriorityCacheShare);
    memory.validate(); // fail fast on an inconsistent combination
}
/**
* Fixed memory allocation per task slot
* Best for: Predictable memory usage, dedicated clusters
*/
public void configureFixedMemory(EmbeddedRocksDBStateBackend backend) {
    // Preset for dedicated clusters with predictable usage: every task slot is given
    // a fixed 1GB RocksDB memory budget.
    final String perSlotBudget = "1gb";
    final double writeBufferShare = 0.3;        // ~300MB of the 1GB budget for write buffers
    final double highPriorityCacheShare = 0.15; // 15% of the block cache for high priority blocks

    final RocksDBMemoryConfiguration memory = backend.getMemoryConfiguration();
    memory.setFixedMemoryPerSlot(perSlotBudget);
    memory.setWriteBufferRatio(writeBufferShare);
    memory.setHighPriorityPoolRatio(highPriorityCacheShare);
    memory.validate(); // fail fast on an inconsistent combination
}
/**
* Optimized for high write throughput
* Best for: Heavy ingestion, frequent updates
*/
public void configureForHighWrites(EmbeddedRocksDBStateBackend backend) {
    // Write-heavy preset (bulk ingestion, frequent updates): the managed-memory budget is
    // tilted toward write buffers, leaving a smaller high priority share of the block cache.
    final double writeBufferShare = 0.6;       // 60% for write buffers (above the other presets)
    final double highPriorityCacheShare = 0.1; // 10% high priority (below the other presets)

    final RocksDBMemoryConfiguration memory = backend.getMemoryConfiguration();
    memory.setUseManagedMemory(true);
    memory.setWriteBufferRatio(writeBufferShare);
    memory.setHighPriorityPoolRatio(highPriorityCacheShare);
    memory.validate(); // fail fast on an inconsistent combination
}
/**
* Optimized for read-heavy workloads with complex state access
* Best for: Analytics, lookups, windowing operations
*/
public void configureForReads(EmbeddedRocksDBStateBackend backend) {
    // Read-heavy preset (analytics, lookups, windowing): write buffers get a small slice so the
    // block cache keeps more memory, and a larger share of that cache is high priority.
    final double writeBufferShare = 0.2;       // 20% for write buffers (below the other presets)
    final double highPriorityCacheShare = 0.3; // 30% high priority (above the other presets)

    final RocksDBMemoryConfiguration memory = backend.getMemoryConfiguration();
    memory.setUseManagedMemory(true);
    memory.setWriteBufferRatio(writeBufferShare);
    memory.setHighPriorityPoolRatio(highPriorityCacheShare);
    memory.validate(); // fail fast on an inconsistent combination
}
public EmbeddedRocksDBStateBackend setRocksDBMemoryFactory(RocksDBMemoryFactory rocksDBMemoryFactory)
Sets a custom RocksDB memory factory for advanced memory management.
Parameters:
rocksDBMemoryFactory - Custom memory factory implementation
Returns: The state backend instance for method chaining
Example:
// Custom memory factory for specialized allocation strategies
public class CustomRocksDBMemoryFactory implements RocksDBMemoryFactory {
// Implementation for custom memory allocation
}
EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend();
backend.setRocksDBMemoryFactory(new CustomRocksDBMemoryFactory());// Set memory configuration through Flink configuration
Configuration config = new Configuration();
// Managed memory integration
config.set(RocksDBOptions.USE_MANAGED_MEMORY, true);
// Fixed memory allocation
config.set(RocksDBOptions.FIX_PER_SLOT_MEMORY_SIZE, MemorySize.parse("512mb"));
config.set(RocksDBOptions.FIX_PER_TM_MEMORY_SIZE, MemorySize.parse("2gb"));
// Memory ratios
config.set(RocksDBOptions.WRITE_BUFFER_RATIO, 0.4);
config.set(RocksDBOptions.HIGH_PRIORITY_POOL_RATIO, 0.2);
// Index/filter optimization
config.set(RocksDBOptions.USE_PARTITIONED_INDEX_FILTERS, true);
# flink-conf.yaml
state.backend.rocksdb.memory.managed: true
# fixed-per-slot, when configured, overrides the managed-memory option above
state.backend.rocksdb.memory.fixed-per-slot: 512mb
state.backend.rocksdb.memory.write-buffer-ratio: 0.4
state.backend.rocksdb.memory.high-prio-pool-ratio: 0.2
state.backend.rocksdb.memory.partitioned-index-filters: true
Write Buffer Memory: Used for incoming writes before flushing to disk
Block Cache Memory: Used for caching frequently accessed data blocks
Index/Filter Memory: Cached separately for fast lookups
Memory Pressure Indicators:
Tuning Recommendations:
Memory Sizing:
// Example memory calculation, assuming 2GB of Flink managed memory is available to RocksDB
// (for instance, a 50% managed-memory fraction of 4GB of total Flink memory). Managed memory
// is off-heap; it is not part of the JVM heap.
RocksDBMemoryConfiguration memConfig = backend.getMemoryConfiguration();
// The setters return void, so each call is its own statement; they cannot be chained.
memConfig.setUseManagedMemory(true);
memConfig.setWriteBufferRatio(0.4);      // ~800MB write buffers (40% of 2GB)
memConfig.setHighPriorityPoolRatio(0.2); // ~240MB high priority cache (20% of the remaining ~1.2GB)
memConfig.validate();                    // fail fast if the configuration is inconsistent
// ~960MB regular block cache
// Total: ~2GB RocksDB memory usage
import org.apache.flink.state.rocksdb.EmbeddedRocksDBStateBackend;
import org.apache.flink.state.rocksdb.RocksDBMemoryConfiguration;
import org.apache.flink.configuration.MemorySize;
public class RocksDBMemoryConfigurationExample {
public static EmbeddedRocksDBStateBackend createOptimizedBackend() {
EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend(true);
// Configure storage
backend.setDbStoragePaths("/ssd1/rocksdb", "/ssd2/rocksdb");
// Configure memory management
RocksDBMemoryConfiguration memConfig = backend.getMemoryConfiguration();
memConfig.setUseManagedMemory(true); // Use Flink managed memory
memConfig.setWriteBufferRatio(0.4); // 40% for write buffers
memConfig.setHighPriorityPoolRatio(0.2); // 20% high priority cache
memConfig.validate(); // Validate configuration
return backend;
}
public static EmbeddedRocksDBStateBackend createFixedMemoryBackend() {
EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend(true);
// Configure with fixed memory per slot
RocksDBMemoryConfiguration memConfig = backend.getMemoryConfiguration();
memConfig.setFixedMemoryPerSlot(MemorySize.ofMebiBytes(1024)); // 1GB per slot
memConfig.setWriteBufferRatio(0.3); // 300MB write buffers
memConfig.setHighPriorityPoolRatio(0.15); // 15% high priority
memConfig.validate();
return backend;
}
}validate() to check configuration consistency during setupInstall with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-statebackend-rocksdb