
tessl/maven-org-apache-flink--flink-statebackend-rocksdb

RocksDB state backend for Apache Flink - provides persistent state storage using RocksDB as the underlying storage engine for stateful stream processing applications

Memory Configuration and Management

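The RocksDB state backend integrates with Flink's memory model. It can bound the memory RocksDB uses by sharing one block cache and one write buffer manager among all RocksDB instances in a task slot. The budget comes either from Flink managed memory or from a fixed size, which keeps memory usage predictable across deployment scenarios and workload patterns.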

Core Imports

import org.apache.flink.state.rocksdb.EmbeddedRocksDBStateBackend;
import org.apache.flink.state.rocksdb.RocksDBMemoryConfiguration;
import org.apache.flink.state.rocksdb.RocksDBMemoryFactory;
import org.apache.flink.state.rocksdb.RocksDBOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.configuration.MemorySize;

RocksDBMemoryConfiguration Class

Class Definition

public final class RocksDBMemoryConfiguration {
    // Configuration settings for RocksDB memory usage management
}

Memory Management Strategies

Managed Memory Integration

public void setUseManagedMemory(boolean useManagedMemory)

Configures whether RocksDB takes its memory budget from Flink's managed memory. Managed memory is used by default.

Parameters:

  • useManagedMemory - Whether to use Flink's managed memory budget

public boolean isUsingManagedMemory()

Checks if this configuration uses Flink's managed memory.

Returns: true if using managed memory, false otherwise

Example:

RocksDBMemoryConfiguration memConfig = backend.getMemoryConfiguration();
memConfig.setUseManagedMemory(true);

// Check current setting
if (memConfig.isUsingManagedMemory()) {
    // Memory will be allocated from Flink's managed memory pool
}
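With managed memory, the budget RocksDB sees is the slot's share of the TaskManager's managed memory. The plain-Java sketch below estimates that share. ManagedMemoryShare and perSlotBudgetBytes are hypothetical names, not Flink APIs.

The sketch assumes equally sized slots, and it assumes the state backend is the only managed-memory consumer in the slot. Other consumers, such as Python UDFs, would take part of the slot's share.

```java
/**
 * Hypothetical helper (not a Flink API): estimates the RocksDB memory budget of one slot
 * when RocksDB uses Flink managed memory. Assumes equally sized slots and that the state
 * backend is the only managed-memory consumer in the slot.
 */
final class ManagedMemoryShare {

    private static final long MIB = 1024L * 1024L;

    private ManagedMemoryShare() {}

    /** Splits the TaskManager's managed memory evenly across its slots. */
    static long perSlotBudgetBytes(long managedMemoryPerTaskManagerBytes, int numberOfSlots) {
        if (numberOfSlots <= 0) {
            throw new IllegalArgumentException("numberOfSlots must be positive: " + numberOfSlots);
        }
        if (managedMemoryPerTaskManagerBytes < 0) {
            throw new IllegalArgumentException("managed memory must not be negative");
        }
        return managedMemoryPerTaskManagerBytes / numberOfSlots;
    }

    public static void main(String[] args) {
        long managedPerTaskManager = 2048 * MIB; // e.g. 2GB of managed memory per TaskManager
        int slots = 4;
        long perSlot = perSlotBudgetBytes(managedPerTaskManager, slots);
        System.out.println("RocksDB budget per slot: " + perSlot / MIB + " MiB");
    }
}
```

More slots on the same TaskManager means a smaller RocksDB budget per slot, even though the TaskManager's total managed memory stays the same.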

Fixed Memory Per Slot

public void setFixedMemoryPerSlot(MemorySize fixedMemoryPerSlot)

Sets a fixed amount of memory shared by all RocksDB instances in a task slot. When set, it takes precedence over managed memory.

Parameters:

  • fixedMemoryPerSlot - Fixed memory size per slot

public void setFixedMemoryPerSlot(String totalMemoryPerSlotStr)

Sets a fixed amount of memory for RocksDB per task slot using string format.

Parameters:

  • totalMemoryPerSlotStr - Memory size string (e.g., "256mb", "1gb")

public boolean isUsingFixedMemoryPerSlot()

Checks if this configuration uses fixed memory per slot.

Returns: true if using fixed memory per slot, false otherwise

public MemorySize getFixedMemoryPerSlot()

Gets the configured fixed memory per slot.

Returns: Fixed memory size per slot, or null if not configured

Example:

RocksDBMemoryConfiguration memConfig = backend.getMemoryConfiguration();

// Set fixed memory using MemorySize
memConfig.setFixedMemoryPerSlot(MemorySize.ofMebiBytes(512));

// Or set using string format
memConfig.setFixedMemoryPerSlot("512mb");

// Check configuration
if (memConfig.isUsingFixedMemoryPerSlot()) {
    MemorySize fixedSize = memConfig.getFixedMemoryPerSlot();
    System.out.println("Fixed memory per slot: " + fixedSize);
}
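Flink's memory size strings use binary units. So "512mb" means 512 × 1024 × 1024 bytes, and "1gb" means 1024 × 1024 × 1024 bytes.

The sketch below illustrates that convention with a simplified, hypothetical parser (MemoryStringSketch.parseBytes). It only accepts whole numbers with the suffixes b, kb, mb, gb and tb. Real code should keep using MemorySize.parse or setFixedMemoryPerSlot(String).

```java
import java.util.Locale;

/**
 * Hypothetical, simplified illustration of Flink's memory size strings (binary units).
 * Real code should use MemorySize.parse, which accepts more unit spellings.
 */
final class MemoryStringSketch {

    // Longer suffixes come first so that "mb" is matched before "b".
    private static final String[] SUFFIXES = {"tb", "gb", "mb", "kb", "b"};
    private static final long[] MULTIPLIERS = {1L << 40, 1L << 30, 1L << 20, 1L << 10, 1L};

    private MemoryStringSketch() {}

    /** Parses strings such as "512mb" or "1gb" into bytes; a bare number means bytes. */
    static long parseBytes(String text) {
        String s = text.trim().toLowerCase(Locale.ROOT);
        for (int i = 0; i < SUFFIXES.length; i++) {
            if (s.endsWith(SUFFIXES[i])) {
                String number = s.substring(0, s.length() - SUFFIXES[i].length()).trim();
                // multiplyExact fails loudly instead of silently overflowing
                return Math.multiplyExact(Long.parseLong(number), MULTIPLIERS[i]);
            }
        }
        return Long.parseLong(s);
    }

    public static void main(String[] args) {
        System.out.println("512mb = " + parseBytes("512mb") + " bytes");
        System.out.println("1gb   = " + parseBytes("1gb") + " bytes");
    }
}
```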

Memory Allocation Ratios

Write Buffer Ratio

public void setWriteBufferRatio(double writeBufferRatio)

Sets the maximum fraction of the shared RocksDB memory budget (managed or fixed) that write buffers (memtables) may use.

Parameters:

  • writeBufferRatio - Fraction of memory for write buffers; must be greater than 0 and less than 1 (default: 0.5)

public double getWriteBufferRatio()

Gets the configured write buffer ratio.

Returns: Write buffer ratio as a fraction

Example:

// Use 40% of available memory for write buffers
memConfig.setWriteBufferRatio(0.4);

// Get current ratio
double ratio = memConfig.getWriteBufferRatio();

High Priority Pool Ratio

public void setHighPriorityPoolRatio(double highPriorityPoolRatio)

Sets the fraction of the block cache reserved for high priority blocks, such as index and filter blocks.

Parameters:

  • highPriorityPoolRatio - Fraction for high priority blocks; must be greater than 0 and less than 1 (default: 0.1)

public double getHighPriorityPoolRatio()

Gets the configured high priority pool ratio.

Returns: High priority pool ratio as a fraction

Example:

// Reserve 20% of block cache for high priority blocks (index/filter blocks)
memConfig.setHighPriorityPoolRatio(0.2);

// Get current ratio
double highPriorityRatio = memConfig.getHighPriorityPoolRatio();

Advanced Memory Features

Partitioned Index Filters

public boolean isUsingPartitionedIndexFilters()

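Checks whether partitioned index/filter blocks are enabled (disabled by default). With partitioning, RocksDB splits index and filter blocks into smaller partitions. Only the partitions a lookup needs then have to be held in the block cache.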

Returns: true if using partitioned index/filters, false otherwise

Example:

if (memConfig.isUsingPartitionedIndexFilters()) {
    // Index and filter blocks are partitioned for better memory utilization
}

Configuration Validation and Utilities

Configuration Validation

public void validate()

Validates the consistency of the memory configuration.

Throws: IllegalArgumentException if the write buffer ratio and the high priority pool ratio together exceed 1.0

Example:

RocksDBMemoryConfiguration memConfig = backend.getMemoryConfiguration();
memConfig.setUseManagedMemory(true);
memConfig.setWriteBufferRatio(0.4);
memConfig.setHighPriorityPoolRatio(0.2);
memConfig.validate(); // Throws exception if configuration is inconsistent
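The sketch below mirrors the ratio checks in plain Java so the rules are easy to see. It is an assumption based on recent Flink releases, not Flink's own code. MemoryRatioCheck is a hypothetical name.

It assumes two rules:
- Each ratio must lie strictly between 0 and 1.
- validate() rejects a write buffer ratio and high priority pool ratio that together exceed 1.0.

```java
/** Hypothetical stand-in for the checks performed by RocksDBMemoryConfiguration. */
final class MemoryRatioCheck {

    private MemoryRatioCheck() {}

    /** Setter-style check: the ratio must be strictly between 0 and 1 (this also rejects NaN). */
    static void checkRatio(String name, double ratio) {
        if (!(ratio > 0.0 && ratio < 1.0)) {
            throw new IllegalArgumentException(name + " " + ratio + " must be in (0, 1)");
        }
    }

    /** validate()-style check: both ratios together must not exceed the whole budget. */
    static void validate(double writeBufferRatio, double highPriorityPoolRatio) {
        checkRatio("writeBufferRatio", writeBufferRatio);
        checkRatio("highPriorityPoolRatio", highPriorityPoolRatio);
        if (writeBufferRatio + highPriorityPoolRatio > 1.0) {
            throw new IllegalArgumentException(String.format(
                    "Invalid configuration: writeBufferRatio %s with highPriorityPoolRatio %s",
                    writeBufferRatio, highPriorityPoolRatio));
        }
    }

    public static void main(String[] args) {
        validate(0.4, 0.2); // accepted: 0.6 of the budget
        try {
            validate(0.7, 0.4); // rejected: 1.1 of the budget
        } catch (IllegalArgumentException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```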

Factory Methods

public static RocksDBMemoryConfiguration fromOtherAndConfiguration(
    RocksDBMemoryConfiguration other, 
    ReadableConfig config)

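Creates a new memory configuration that combines an existing one with a Flink configuration. Settings explicitly set on other take precedence. Unset settings are read from config, falling back to the option defaults.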

Parameters:

  • other - Base memory configuration to copy from
  • config - Additional configuration to apply

Returns: New configured RocksDBMemoryConfiguration instance

public static RocksDBMemoryConfiguration fromConfiguration(Configuration configuration)

Creates a memory configuration from a Flink configuration. Options that are not set use their default values.

Parameters:

  • configuration - Flink configuration containing memory settings

Returns: RocksDBMemoryConfiguration instance based on configuration

Example:

// Create from Flink configuration
Configuration config = new Configuration();
config.set(RocksDBOptions.USE_MANAGED_MEMORY, true);
config.set(RocksDBOptions.WRITE_BUFFER_RATIO, 0.4);

RocksDBMemoryConfiguration memConfig = RocksDBMemoryConfiguration.fromConfiguration(config);

// Combine explicit settings with a Flink configuration:
// values set on the base object take precedence, unset values are read from the configuration
RocksDBMemoryConfiguration base = backend.getMemoryConfiguration();
base.setHighPriorityPoolRatio(0.3);

RocksDBMemoryConfiguration newConfig = RocksDBMemoryConfiguration.fromOtherAndConfiguration(
    base, config);
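As we understand it, fromOtherAndConfiguration resolves each field in three steps:
1. Use the value from other, if it was set there.
2. Otherwise, read the option from config.
3. If the option is not in config either, use the option's default.

The plain-Java sketch below models that rule for a single setting. MergeSketch and resolve are hypothetical names. The default of 0.1 is the documented default for RocksDBOptions.HIGH_PRIORITY_POOL_RATIO in recent Flink releases.

One consequence is worth checking in your version. A configuration produced by fromConfiguration has every field populated, so merging it with further settings should leave it unchanged.

```java
import java.util.Map;

/** Hypothetical model of "explicit value first, then configuration, then default". */
final class MergeSketch {

    static final String HIGH_PRIO_POOL_RATIO = "state.backend.rocksdb.memory.high-prio-pool-ratio";
    static final double HIGH_PRIO_POOL_RATIO_DEFAULT = 0.1; // assumed option default

    private MergeSketch() {}

    /** Returns the explicitly set value if present, else the configured value, else the default. */
    static double resolve(Double explicitValue, Map<String, Double> config) {
        if (explicitValue != null) {
            return explicitValue;
        }
        return config.getOrDefault(HIGH_PRIO_POOL_RATIO, HIGH_PRIO_POOL_RATIO_DEFAULT);
    }

    public static void main(String[] args) {
        Map<String, Double> config = Map.of(HIGH_PRIO_POOL_RATIO, 0.3);

        // Nothing set explicitly: the configuration wins.
        System.out.println(resolve(null, config));

        // Set explicitly (for example via setHighPriorityPoolRatio): the explicit value wins.
        System.out.println(resolve(0.2, config));

        // A base resolved from an empty configuration already holds the default,
        // so a second merge does not pick up the 0.3 from config.
        double fromEmptyConfig = resolve(null, Map.of());
        System.out.println(resolve(fromEmptyConfig, config));
    }
}
```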

Memory Configuration Patterns

Pattern 1: Managed Memory Integration

/**
 * Use Flink's managed memory with automatic memory distribution
 * Best for: Multi-tenant clusters, resource management
 */
public void configureForManagedMemory(EmbeddedRocksDBStateBackend backend) {
    RocksDBMemoryConfiguration memConfig = backend.getMemoryConfiguration();
    
    memConfig.setUseManagedMemory(true);           // Use Flink's managed memory
    memConfig.setWriteBufferRatio(0.4);            // 40% for write buffers  
    memConfig.setHighPriorityPoolRatio(0.2);       // 20% high priority in cache
    memConfig.validate();
    
    // The budget is this slot's share of the TaskManager's managed memory
    // (sized via taskmanager.memory.managed.size or taskmanager.memory.managed.fraction)
}

Pattern 2: Fixed Memory Per Slot

/**
 * Fixed memory allocation per task slot
 * Best for: Predictable memory usage, dedicated clusters
 */
public void configureFixedMemory(EmbeddedRocksDBStateBackend backend) {
    RocksDBMemoryConfiguration memConfig = backend.getMemoryConfiguration();
    
    memConfig.setFixedMemoryPerSlot("1gb");        // 1GB shared by all RocksDB instances in the slot
    memConfig.setWriteBufferRatio(0.3);            // write buffers capped at ~30% of that budget
    memConfig.setHighPriorityPoolRatio(0.15);      // 15% of the block cache for high priority blocks
    memConfig.validate();
    
    // A fixed per-slot size takes precedence over managed memory
}

Pattern 3: High-Throughput Write Workload

/**
 * Optimized for high write throughput
 * Best for: Heavy ingestion, frequent updates
 */
public void configureForHighWrites(EmbeddedRocksDBStateBackend backend) {
    RocksDBMemoryConfiguration memConfig = backend.getMemoryConfiguration();
    
    memConfig.setUseManagedMemory(true);
    memConfig.setWriteBufferRatio(0.6);            // 60% for write buffers (higher)
    memConfig.setHighPriorityPoolRatio(0.1);       // 10% high priority (lower)
    memConfig.validate();
    
    // More memory for write buffers to handle high write load
}

Pattern 4: Read-Heavy Workload

/**
 * Optimized for read-heavy workloads with complex state access
 * Best for: Analytics, lookups, windowing operations
 */
public void configureForReads(EmbeddedRocksDBStateBackend backend) {
    RocksDBMemoryConfiguration memConfig = backend.getMemoryConfiguration();
    
    memConfig.setUseManagedMemory(true);
    memConfig.setWriteBufferRatio(0.2);            // 20% for write buffers (lower)
    memConfig.setHighPriorityPoolRatio(0.3);       // 30% high priority (higher)  
    memConfig.validate();
    
    // More memory for block cache to improve read performance
}

Memory Factory Integration

Custom Memory Factory

public EmbeddedRocksDBStateBackend setRocksDBMemoryFactory(RocksDBMemoryFactory rocksDBMemoryFactory)

Sets a custom RocksDB memory factory for advanced memory management.

Parameters:

  • rocksDBMemoryFactory - Custom memory factory implementation

Returns: The state backend instance for method chaining

Example:

// Custom memory factory for specialized allocation strategies
public class CustomRocksDBMemoryFactory implements RocksDBMemoryFactory {
    // Implement the factory methods declared by RocksDBMemoryFactory here
}

EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend();
backend.setRocksDBMemoryFactory(new CustomRocksDBMemoryFactory());

Configuration Integration

Flink Configuration Keys

// Set memory configuration through Flink configuration
Configuration config = new Configuration();

// Managed memory integration
config.set(RocksDBOptions.USE_MANAGED_MEMORY, true);

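// Fixed memory allocation; a per-slot size takes precedence over managed memory
config.set(RocksDBOptions.FIX_PER_SLOT_MEMORY_SIZE, MemorySize.parse("512mb"));
// Cluster-level alternative shared by all slots of a TaskManager; used only when
// neither managed memory nor a per-slot size is in effect
config.set(RocksDBOptions.FIX_PER_TM_MEMORY_SIZE, MemorySize.parse("2gb"));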

// Memory ratios
config.set(RocksDBOptions.WRITE_BUFFER_RATIO, 0.4);  
config.set(RocksDBOptions.HIGH_PRIORITY_POOL_RATIO, 0.2);

// Index/filter optimization
config.set(RocksDBOptions.USE_PARTITIONED_INDEX_FILTERS, true);

YAML Configuration

# Flink configuration file (flink-conf.yaml; config.yaml in newer releases)
state.backend.rocksdb.memory.managed: true
# When set, fixed-per-slot takes precedence over managed memory
state.backend.rocksdb.memory.fixed-per-slot: 512mb
state.backend.rocksdb.memory.write-buffer-ratio: 0.4
state.backend.rocksdb.memory.high-prio-pool-ratio: 0.2
state.backend.rocksdb.memory.partitioned-index-filters: true
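When several of these options are set, only one memory mode applies. The sketch below encodes the precedence we believe recent Flink versions use:
- A per-slot fixed size wins over managed memory.
- The cluster-level per-TaskManager size is consulted only when neither of those is active.

MemoryModeSketch and its Mode enum are hypothetical names. Confirm the order against the option descriptions in RocksDBOptions for your release.

```java
/** Hypothetical model of which RocksDB memory mode takes effect. */
final class MemoryModeSketch {

    enum Mode {
        FIXED_PER_SLOT,   // state.backend.rocksdb.memory.fixed-per-slot
        MANAGED_PER_SLOT, // state.backend.rocksdb.memory.managed
        FIXED_PER_TM,     // RocksDBOptions.FIX_PER_TM_MEMORY_SIZE (cluster-level)
        NO_SHARED_BUDGET  // each column family uses its own RocksDB options
    }

    private MemoryModeSketch() {}

    /** fixedPerSlot and fixedPerTm are null when the option is not set. */
    static Mode resolve(boolean useManagedMemory, String fixedPerSlot, String fixedPerTm) {
        if (fixedPerSlot != null) {
            return Mode.FIXED_PER_SLOT; // per-slot fixed size overrides managed memory
        }
        if (useManagedMemory) {
            return Mode.MANAGED_PER_SLOT;
        }
        if (fixedPerTm != null) {
            return Mode.FIXED_PER_TM;
        }
        return Mode.NO_SHARED_BUDGET;
    }

    public static void main(String[] args) {
        // The YAML above sets both managed: true and fixed-per-slot: 512mb.
        System.out.println(resolve(true, "512mb", null));
    }
}
```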

Memory Monitoring and Tuning

Memory Usage Patterns

  1. Write Buffer Memory: Used for incoming writes before flushing to disk

    • Higher ratio for write-heavy workloads
    • Lower ratio for read-heavy workloads
  2. Block Cache Memory: Used for caching frequently accessed data blocks

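    • Holds the memory not used by write buffers; with a shared budget, memtable memory is charged against the same cache
    • High priority pool for metadata (index/filter blocks)
  3. Index/Filter Memory: Cached in the block cache's high priority pool for fast lookups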

    • Configure high priority pool ratio appropriately
    • Use partitioned index/filters for large state

Tuning Guidelines

Memory Pressure Indicators:

  • Frequent memtable flushes and compactions (monitor via RocksDB native metrics)
  • High read amplification or a low block cache hit rate
  • Slow checkpoint performance

Tuning Recommendations:

  • Start with managed memory integration
  • Use 0.3-0.5 write buffer ratio for balanced workloads
  • Set 0.1-0.3 high priority pool ratio based on read patterns
  • Enable partitioned index/filters for large state (>1GB per operator)
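The recommendations above can be turned into a quick self-check. The helper below is hypothetical and only encodes this page's rules of thumb:
- Write buffer ratio between 0.3 and 0.5 for balanced workloads.
- High priority pool ratio between 0.1 and 0.3.
- Partitioned index/filters above about 1GB of state per operator.

It is not a Flink tool and does not replace measurements from RocksDB metrics.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical checker for the rules of thumb on this page (balanced workloads). */
final class TuningReview {

    private static final long GIB = 1024L * 1024L * 1024L;

    private TuningReview() {}

    /** Returns one hint per rule of thumb that the given settings do not follow. */
    static List<String> review(double writeBufferRatio, double highPriorityPoolRatio,
                               long stateBytesPerOperator, boolean partitionedIndexFilters) {
        List<String> hints = new ArrayList<>();
        if (writeBufferRatio < 0.3 || writeBufferRatio > 0.5) {
            hints.add("write buffer ratio " + writeBufferRatio + " is outside 0.3-0.5");
        }
        if (highPriorityPoolRatio < 0.1 || highPriorityPoolRatio > 0.3) {
            hints.add("high priority pool ratio " + highPriorityPoolRatio + " is outside 0.1-0.3");
        }
        if (stateBytesPerOperator > GIB && !partitionedIndexFilters) {
            hints.add("consider partitioned index/filters for more than 1GB of state per operator");
        }
        return hints;
    }

    public static void main(String[] args) {
        // The write-heavy pattern (0.6 / 0.1) intentionally leaves the balanced range.
        review(0.6, 0.1, 4 * GIB, false).forEach(System.out::println);
    }
}
```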

Memory Sizing:

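// Example: a TaskManager with 2GB of managed memory and a single slot
// (e.g. 4GB total Flink memory with taskmanager.memory.managed.fraction: 0.5)
// gives RocksDB a 2GB budget. Managed memory is off-heap, not part of the JVM heap.

RocksDBMemoryConfiguration memConfig = backend.getMemoryConfiguration();
memConfig.setUseManagedMemory(true);
memConfig.setWriteBufferRatio(0.4);         // write buffers may use up to ~40% (~800MB)
memConfig.setHighPriorityPoolRatio(0.2);    // 20% of the block cache for index/filter blocks
// Memtable memory is charged against the shared block cache, so the
// 2GB budget bounds write buffers and cached blocks together.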
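The exact split is decided inside Flink, not by the ratios alone. The sketch below assumes the sizing rule found in Flink's internal RocksDBMemoryControllerUtils in recent releases:
- The shared block cache gets (3 - writeBufferRatio) / 3 of the budget.
- The write buffer manager limit is 2/3 × writeBufferRatio of the budget. This reportedly leaves headroom because memtables can grow beyond the manager's limit before a flush completes.
- Memtable memory is charged to that cache.
- The high priority pool is a fraction of the cache capacity.

Treat these formulas as an assumption to verify against your version. RocksDBBudgetSketch is a hypothetical name.

```java
/**
 * Hypothetical budget calculator. The formulas are an assumption modeled on Flink's
 * internal RocksDBMemoryControllerUtils; verify them for your Flink version.
 */
final class RocksDBBudgetSketch {

    static final long MIB = 1024L * 1024L;

    private RocksDBBudgetSketch() {}

    /** Capacity of the shared block cache (memtable memory is charged against it). */
    static long cacheCapacity(long totalBytes, double writeBufferRatio) {
        return (long) ((3 - writeBufferRatio) * totalBytes / 3);
    }

    /** Limit handed to RocksDB's write buffer manager. */
    static long writeBufferManagerCapacity(long totalBytes, double writeBufferRatio) {
        return (long) (2 * totalBytes * writeBufferRatio / 3);
    }

    /** Part of the cache reserved for high priority blocks such as index and filter blocks. */
    static long highPriorityPool(long cacheCapacityBytes, double highPriorityPoolRatio) {
        return (long) (cacheCapacityBytes * highPriorityPoolRatio);
    }

    public static void main(String[] args) {
        long total = 2048 * MIB; // the 2GB budget from the example above
        double writeBufferRatio = 0.4;
        double highPriorityPoolRatio = 0.2;

        long cache = cacheCapacity(total, writeBufferRatio);
        long writeBuffers = writeBufferManagerCapacity(total, writeBufferRatio);
        long highPriority = highPriorityPool(cache, highPriorityPoolRatio);

        System.out.printf("block cache capacity:       %d MiB%n", cache / MIB);
        System.out.printf("write buffer manager limit: %d MiB%n", writeBuffers / MIB);
        System.out.printf("high priority pool:         %d MiB%n", highPriority / MIB);
    }
}
```

Under these assumptions, a 2GB budget with a 0.4 write buffer ratio gives the following split:
- A block cache of just under 1775MiB.
- A write buffer manager limit of about 546MiB.
- A high priority pool of roughly 355MiB.

Because memtables live inside the cache, these figures overlap rather than add up.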

Complete Configuration Example

import org.apache.flink.state.rocksdb.EmbeddedRocksDBStateBackend;
import org.apache.flink.state.rocksdb.RocksDBMemoryConfiguration;
import org.apache.flink.configuration.MemorySize;

public class RocksDBMemoryConfigurationExample {
    
    public static EmbeddedRocksDBStateBackend createOptimizedBackend() {
        EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend(true);
        
        // Configure storage
        backend.setDbStoragePaths("/ssd1/rocksdb", "/ssd2/rocksdb");
        
        // Configure memory management
        RocksDBMemoryConfiguration memConfig = backend.getMemoryConfiguration();
        memConfig.setUseManagedMemory(true);              // Use Flink managed memory
        memConfig.setWriteBufferRatio(0.4);               // 40% for write buffers
        memConfig.setHighPriorityPoolRatio(0.2);          // 20% high priority cache
        memConfig.validate();                            // Validate configuration
        
        return backend;
    }
    
    public static EmbeddedRocksDBStateBackend createFixedMemoryBackend() {
        EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend(true);
        
        // Configure with fixed memory per slot
        RocksDBMemoryConfiguration memConfig = backend.getMemoryConfiguration();
        memConfig.setFixedMemoryPerSlot(MemorySize.ofMebiBytes(1024));  // 1GB per slot
        memConfig.setWriteBufferRatio(0.3);                             // ~30% of the slot budget for write buffers
        memConfig.setHighPriorityPoolRatio(0.15);                       // 15% high priority
        memConfig.validate();
        
        return backend;
    }
}

Thread Safety and Lifecycle

  • Configuration Phase: Memory configuration should be set before the job starts
  • Runtime: Memory allocation is managed automatically by RocksDB and Flink
  • Validation: Use validate() to check configuration consistency during setup
  • Monitoring: Enable RocksDB native metrics (for example, state.backend.rocksdb.metrics.block-cache-usage) to monitor actual memory usage patterns

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-statebackend-rocksdb
