RocksDB state backend for Apache Flink streaming applications providing persistent, scalable state storage with fault tolerance, comprehensive configuration options, and native metrics monitoring.
—
Memory management configuration for RocksDB integration with Flink's managed memory system, including write buffer ratios and memory allocation strategies.
Configuration class for managing RocksDB memory usage within Flink's memory management system.
/**
* Configuration for RocksDB memory management.
* Controls how RocksDB integrates with Flink's managed memory system.
*/
class RocksDBMemoryConfiguration {
/**
* Creates a new memory configuration with default settings.
*/
RocksDBMemoryConfiguration();
}Configure integration with Flink's managed memory system.
/**
* Enables or disables the use of Flink's managed memory for RocksDB.
* When enabled, RocksDB memory usage is bounded by Flink's memory management.
* @param useManagedMemory whether to use Flink's managed memory
*/
void setUseManagedMemory(boolean useManagedMemory);
/**
* Checks if RocksDB is using Flink's managed memory.
* @return true if managed memory is enabled
*/
boolean isUsingManagedMemory();Usage Examples:
EmbeddedRocksDBStateBackend stateBackend = new EmbeddedRocksDBStateBackend();
RocksDBMemoryConfiguration memConfig = stateBackend.getMemoryConfiguration();
// Enable managed memory (recommended for production)
memConfig.setUseManagedMemory(true);
// Check if managed memory is enabled
if (memConfig.isUsingManagedMemory()) {
System.out.println("Using Flink managed memory");
}Configure fixed memory allocation per task slot.
/**
* Sets a fixed amount of memory per task slot for RocksDB.
* This overrides managed memory settings and allocates a specific amount.
* @param fixedMemoryPerSlot fixed memory size per slot
*/
void setFixedMemoryPerSlot(MemorySize fixedMemoryPerSlot);
/**
* Sets a fixed amount of memory per task slot from string representation.
* @param totalMemoryPerSlotStr memory size string (e.g., "128mb", "1gb")
*/
void setFixedMemoryPerSlot(String totalMemoryPerSlotStr);
/**
* Checks if fixed memory per slot is configured.
* @return true if fixed memory per slot is set
*/
boolean isUsingFixedMemoryPerSlot();
/**
* Gets the configured fixed memory per slot.
* @return fixed memory size per slot, or null if not configured
*/
MemorySize getFixedMemoryPerSlot();Usage Examples:
// Set fixed memory per slot (512MB)
memConfig.setFixedMemoryPerSlot(MemorySize.ofMebiBytes(512));
// Set fixed memory per slot from string
memConfig.setFixedMemoryPerSlot("512mb");
// Check configuration
if (memConfig.isUsingFixedMemoryPerSlot()) {
MemorySize fixedSize = memConfig.getFixedMemoryPerSlot();
System.out.println("Fixed memory per slot: " + fixedSize);
}Configure memory allocation ratios for different RocksDB components.
/**
* Sets the ratio of total memory allocated to write buffers (memtables).
* Value must be between 0.0 and 1.0.
* @param writeBufferRatio fraction of memory for write buffers (0 < ratio < 1)
*/
void setWriteBufferRatio(double writeBufferRatio);
/**
* Gets the configured write buffer memory ratio.
* @return write buffer ratio
*/
double getWriteBufferRatio();
/**
* Sets the ratio of block cache memory allocated to high-priority pool.
* High-priority pool is used for index/filter blocks and other metadata.
* Value must be between 0.0 and 1.0.
* @param highPriorityPoolRatio fraction of cache for high-priority pool (0 < ratio < 1)
*/
void setHighPriorityPoolRatio(double highPriorityPoolRatio);
/**
* Gets the configured high-priority pool ratio.
* @return high-priority pool ratio
*/
double getHighPriorityPoolRatio();Usage Examples:
// Allocate 40% of memory to write buffers, 60% to block cache
memConfig.setWriteBufferRatio(0.4);
// Allocate 20% of block cache to high-priority pool (index/filter blocks)
memConfig.setHighPriorityPoolRatio(0.2);
// Check current ratios
double writeRatio = memConfig.getWriteBufferRatio();
double highPrioRatio = memConfig.getHighPriorityPoolRatio();
System.out.println("Write buffer ratio: " + writeRatio);
System.out.println("High priority pool ratio: " + highPrioRatio);Configure partitioned index and filter blocks for better memory management.
/**
* Checks if partitioned index filters are enabled.
* Partitioned filters can help with memory management for large datasets.
* @return true if partitioned index filters are enabled, null if not configured
*/
Boolean isUsingPartitionedIndexFilters();Usage Example:
// Check if partitioned index filters are enabled
Boolean partitioned = memConfig.isUsingPartitionedIndexFilters();
if (partitioned != null && partitioned) {
System.out.println("Using partitioned index filters");
}Validate configuration consistency and create configurations from existing ones.
/**
* Validates the memory configuration for consistency.
* Throws exception if configuration is invalid (e.g., ratios out of range).
* @throws IllegalArgumentException if configuration is invalid
*/
void validate();
/**
* Creates a memory configuration from an existing one and additional config.
* @param other existing memory configuration to copy from
* @param config additional configuration to apply
* @return new memory configuration instance
*/
static RocksDBMemoryConfiguration fromOtherAndConfiguration(
RocksDBMemoryConfiguration other,
ReadableConfig config
);Usage Examples:
// Validate configuration before use
try {
memConfig.validate();
System.out.println("Memory configuration is valid");
} catch (IllegalArgumentException e) {
System.err.println("Invalid memory configuration: " + e.getMessage());
}
// Create configuration from existing one
RocksDBMemoryConfiguration baseConfig = stateBackend.getMemoryConfiguration();
ReadableConfig flinkConfig = getFlinkConfiguration();
RocksDBMemoryConfiguration newConfig = RocksDBMemoryConfiguration
.fromOtherAndConfiguration(baseConfig, flinkConfig);import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.contrib.streaming.state.RocksDBMemoryConfiguration;
import org.apache.flink.configuration.MemorySize;
// Create state backend
EmbeddedRocksDBStateBackend stateBackend = new EmbeddedRocksDBStateBackend(true);
// Configure memory settings
RocksDBMemoryConfiguration memConfig = stateBackend.getMemoryConfiguration();
// Option 1: Use managed memory with custom ratios
memConfig.setUseManagedMemory(true);
memConfig.setWriteBufferRatio(0.4); // 40% for write buffers
memConfig.setHighPriorityPoolRatio(0.1); // 10% of cache for high-priority
// Option 2: Use fixed memory per slot
// memConfig.setFixedMemoryPerSlot("1gb");
// Validate configuration
memConfig.validate();
// Set state backend on environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(stateBackend);Use Managed Memory when:
Use Fixed Memory when:
Write Buffer Ratio:
High Priority Pool Ratio:
class MemorySize {
static MemorySize ofMebiBytes(long mebiBytes);
static MemorySize ofBytes(long bytes);
static MemorySize parse(String text);
long getBytes();
long getMebiBytes();
}Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-statebackend-rocksdb-2-12