
tessl/maven-org-apache-flink--flink-statebackend-rocksdb

RocksDB state backend for Apache Flink - provides persistent state storage using RocksDB as the underlying storage engine for stateful stream processing applications


Core State Backend Configuration

The EmbeddedRocksDBStateBackend is the main entry point for using RocksDB as a state backend in Apache Flink. It keeps keyed state in embedded RocksDB instances on the TaskManagers' local disks, so state can grow beyond the available memory, and it supports incremental checkpointing.

Core Imports

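import org.apache.flink.state.rocksdb.EmbeddedRocksDBStateBackend;
import org.apache.flink.state.rocksdb.EmbeddedRocksDBStateBackend.PriorityQueueStateType;
import org.apache.flink.state.rocksdb.EmbeddedRocksDBStateBackendFactory;
import org.apache.flink.state.rocksdb.RocksDBMemoryConfiguration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.util.TernaryBoolean;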

EmbeddedRocksDBStateBackend Class

Class Definition

@PublicEvolving
public class EmbeddedRocksDBStateBackend implements StateBackend, Serializable {
    // Main implementation class for RocksDB state backend
}

Constructors

public EmbeddedRocksDBStateBackend()

Creates a new EmbeddedRocksDBStateBackend with default settings. Incremental checkpointing is left undefined and is taken from the configuration when the backend is configured.

public EmbeddedRocksDBStateBackend(boolean enableIncrementalCheckpointing)

Creates a new EmbeddedRocksDBStateBackend that explicitly enables or disables incremental checkpointing. The explicit value takes precedence over the configuration.

Parameters:

  • enableIncrementalCheckpointing - Whether to enable incremental checkpointing

public EmbeddedRocksDBStateBackend(TernaryBoolean enableIncrementalCheckpointing)

Creates a new EmbeddedRocksDBStateBackend with a three-valued incremental checkpointing setting.

Parameters:

  • enableIncrementalCheckpointing - TRUE or FALSE enables or disables incremental checkpointing explicitly; UNDEFINED defers to the configured value
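How an explicit constructor argument interacts with the configuration can be modeled in a few lines. This is a minimal sketch, not Flink code: `Ternary` is a hypothetical stand-in for `TernaryBoolean`, and the `false` fallback assumes the incremental checkpointing option keeps its usual default.

```java
import java.util.Optional;

// Hypothetical stand-in for org.apache.flink.util.TernaryBoolean (illustration only).
enum Ternary {
    TRUE, FALSE, UNDEFINED;

    // Keep an explicit TRUE/FALSE; replace UNDEFINED with the fallback value.
    boolean resolve(boolean fallback) {
        switch (this) {
            case TRUE:
                return true;
            case FALSE:
                return false;
            default:
                return fallback;
        }
    }
}

final class IncrementalCheckpointResolution {
    // Assumed default of the incremental checkpointing option.
    static final boolean ASSUMED_OPTION_DEFAULT = false;

    // configuredValue is empty when the configuration does not set the option.
    static boolean effective(Ternary fromConstructor, Optional<Boolean> configuredValue) {
        return fromConstructor.resolve(configuredValue.orElse(ASSUMED_OPTION_DEFAULT));
    }

    public static void main(String[] args) {
        // Like new EmbeddedRocksDBStateBackend(true): the explicit value wins.
        System.out.println(effective(Ternary.TRUE, Optional.of(false)));     // true
        // Like new EmbeddedRocksDBStateBackend(): UNDEFINED defers to the configuration.
        System.out.println(effective(Ternary.UNDEFINED, Optional.of(true))); // true
        // Nothing set anywhere: the option default applies.
        System.out.println(effective(Ternary.UNDEFINED, Optional.empty()));  // false
    }
}
```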

Basic Configuration

Storage Path Configuration

public void setDbStoragePath(String dbStoragePath)

Sets the path where RocksDB stores its data files locally on the TaskManager.

Parameters:

  • dbStoragePath - The path to the local RocksDB data directory

Example:

EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend();
backend.setDbStoragePath("/tmp/flink-rocksdb");

public void setDbStoragePaths(String... dbStoragePaths)

Sets multiple paths where RocksDB can store its data files, allowing distribution across multiple devices.

Parameters:

  • dbStoragePaths - Multiple paths to local RocksDB data directories

Example:

backend.setDbStoragePaths("/ssd1/rocksdb", "/ssd2/rocksdb", "/ssd3/rocksdb");

public String[] getDbStoragePaths()

Gets the configured storage paths for RocksDB data files.

Returns: Array of configured storage paths, or null if no paths have been set
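When several directories are configured, our understanding is that the backend rotates through them as it creates RocksDB instances, starting from a randomly chosen directory. The sketch below models that assumed policy with a hypothetical `StoragePathRotation` class; it does not reproduce the backend's internals.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Hypothetical model of spreading RocksDB instances across several local directories:
// start at a random index, then rotate round-robin.
final class StoragePathRotation {
    private final String[] paths;
    private int next;

    StoragePathRotation(String[] paths, Random random) {
        if (paths.length == 0) {
            throw new IllegalArgumentException("at least one storage path is required");
        }
        this.paths = paths.clone();
        this.next = random.nextInt(paths.length);
    }

    // Returns the directory for the next RocksDB instance.
    String nextPath() {
        String path = paths[next];
        next = (next + 1) % paths.length;
        return path;
    }

    public static void main(String[] args) {
        StoragePathRotation rotation = new StoragePathRotation(
                new String[] {"/ssd1/rocksdb", "/ssd2/rocksdb", "/ssd3/rocksdb"}, new Random());
        List<String> chosen = new ArrayList<>();
        for (int i = 0; i < 6; i++) {
            chosen.add(rotation.nextPath());
        }
        // Each directory appears twice; the starting directory is random.
        System.out.println(chosen);
    }
}
```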

Checkpointing Configuration

Incremental Checkpointing

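public boolean isIncrementalCheckpointsEnabled()

Returns whether incremental checkpointing is enabled for this state backend. If the setting was left undefined, the default value of the incremental checkpointing option is returned.

Returns: true if incremental checkpointing is enabled, false otherwise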

public boolean supportsNoClaimRestoreMode()

Returns whether this state backend supports the NO_CLAIM restore mode.

Returns: true - RocksDB backend supports all restore modes

public boolean supportsSavepointFormat(SavepointFormatType formatType)

Returns whether this state backend supports the specified savepoint format.

Parameters:

  • formatType - The savepoint format type to check

Returns: true; the RocksDB backend supports both the canonical and the native savepoint format

Transfer Configuration

public int getNumberOfTransferThreads()

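Gets the number of threads (per stateful operator) used to transfer files: uploading them when taking a checkpoint and downloading them when restoring.

Returns: Number of transfer threads

public void setNumberOfTransferThreads(int numberOfTransferThreads)

Sets the number of threads (per stateful operator) used to upload files when taking a checkpoint and to download them when restoring.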

Parameters:

  • numberOfTransferThreads - Number of threads to use for file transfers
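The thread count bounds how many files are transferred concurrently. This sketch uses a hypothetical `TransferPoolSketch` class, with a short sleep standing in for the actual copy, to show the effect of a fixed-size pool of that many workers; it is not Flink's uploader.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch: numberOfTransferThreads workers "transfer" fileCount files;
// returns the highest number of transfers observed running at the same time.
final class TransferPoolSketch {
    static int peakConcurrency(int numberOfTransferThreads, int fileCount) {
        ExecutorService pool = Executors.newFixedThreadPool(numberOfTransferThreads);
        AtomicInteger active = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        List<Future<?>> transfers = new ArrayList<>();
        try {
            for (int i = 0; i < fileCount; i++) {
                transfers.add(pool.submit(() -> {
                    peak.accumulateAndGet(active.incrementAndGet(), Math::max);
                    try {
                        Thread.sleep(20); // stands in for copying one file
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        active.decrementAndGet();
                    }
                }));
            }
            for (Future<?> transfer : transfers) {
                transfer.get(); // wait for every transfer to complete
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for transfers", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("a transfer failed", e);
        } finally {
            pool.shutdown();
        }
        return peak.get();
    }

    public static void main(String[] args) {
        // With 4 threads and 12 files, at most 4 transfers ever run concurrently.
        System.out.println("peak concurrent transfers: " + peakConcurrency(4, 12));
    }
}
```

More threads shorten checkpoint uploads and restores when many files are involved, at the cost of more concurrent I/O against the checkpoint storage.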
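
public long getWriteBatchSize()

Gets the maximum memory size of a RocksDB write batch.

Returns: Write batch size in bytes

public void setWriteBatchSize(long writeBatchSize)

Sets the maximum memory size of a RocksDB write batch. A batch is flushed once it reaches this size; if the value is 0, batches are flushed based on their item count only. The default is 2 MB.

Parameters:

  • writeBatchSize - Write batch size in bytes; must not be negative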
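The two flush triggers can be illustrated with a hypothetical `WriteBatchSketch` class. It is a simplified model, not Flink's internal write-batch wrapper, and the 500-entry cap used in `main` is an assumption for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of a write batch flushed on an entry-count limit or,
// when writeBatchSize > 0, on a byte-size limit.
final class WriteBatchSketch {
    private final int maxEntries;
    private final long writeBatchSize; // bytes; 0 disables the size-based trigger
    private long bufferedBytes;
    private int bufferedEntries;
    final List<Integer> flushedBatchSizes = new ArrayList<>(); // entries per flush

    WriteBatchSketch(int maxEntries, long writeBatchSize) {
        this.maxEntries = maxEntries;
        this.writeBatchSize = writeBatchSize;
    }

    void put(byte[] key, byte[] value) {
        bufferedEntries++;
        bufferedBytes += key.length + value.length;
        boolean countLimit = bufferedEntries >= maxEntries;
        boolean sizeLimit = writeBatchSize > 0 && bufferedBytes >= writeBatchSize;
        if (countLimit || sizeLimit) {
            flush();
        }
    }

    void flush() {
        if (bufferedEntries > 0) {
            flushedBatchSizes.add(bufferedEntries); // a real wrapper would write to RocksDB here
            bufferedEntries = 0;
            bufferedBytes = 0;
        }
    }

    public static void main(String[] args) {
        WriteBatchSketch bySize = new WriteBatchSketch(500, 1024); // 1 KiB size limit
        WriteBatchSketch byCount = new WriteBatchSketch(500, 0);   // count-only
        byte[] key = new byte[8];
        byte[] value = new byte[248]; // 256 bytes per entry
        for (int i = 0; i < 1000; i++) {
            bySize.put(key, value);
            byCount.put(key, value);
        }
        bySize.flush();
        byCount.flush();
        System.out.println(bySize.flushedBatchSizes.size());  // 250 (batches of 4 entries)
        System.out.println(byCount.flushedBatchSizes.size()); // 2 (batches of 500 entries)
    }
}
```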

Priority Queue Configuration

Priority Queue Types

public enum PriorityQueueStateType {
    HEAP,    // Use heap-based priority queue
    ROCKSDB  // Use RocksDB-based priority queue
}

public PriorityQueueStateType getPriorityQueueStateType()

Gets the priority queue implementation type.

Returns: Current priority queue state type

public void setPriorityQueueStateType(PriorityQueueStateType priorityQueueStateType)

Sets the priority queue implementation type.

Parameters:

  • priorityQueueStateType - The priority queue implementation to use

Example:

// Store priority queue state (timers) in RocksDB; this is the default
backend.setPriorityQueueStateType(PriorityQueueStateType.ROCKSDB);

// Keep priority queue state on the JVM heap; this can be faster when there are few timers
backend.setPriorityQueueStateType(PriorityQueueStateType.HEAP);

Runtime Configuration

Configuration Method

public EmbeddedRocksDBStateBackend configure(ReadableConfig config, ClassLoader classLoader)

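Creates a copy of this state backend configured with the provided configuration and class loader. Settings made explicitly on this instance take precedence over the configuration; settings left unset are read from the configuration. This instance itself is not modified.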

Parameters:

  • config - The configuration to apply
  • classLoader - The class loader to use

Returns: A new configured instance of this state backend
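The copy-and-fill behavior of `configure()` is sketched below for a single setting. `BackendSettingsSketch`, the `-1` marker for an unset value, and the default of 4 threads are assumptions made for illustration; only the configuration key comes from this document.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, heavily simplified model of configure(): it returns a NEW object,
// keeps values set explicitly on the original, and fills unset values from config.
final class BackendSettingsSketch {
    static final int UNDEFINED = -1; // marker for "not set explicitly"
    static final String TRANSFER_THREADS_KEY =
            "state.backend.rocksdb.checkpoint.transfer.thread.num";
    static final int ASSUMED_DEFAULT_THREADS = 4; // assumed option default

    final int numberOfTransferThreads;

    BackendSettingsSketch(int numberOfTransferThreads) {
        this.numberOfTransferThreads = numberOfTransferThreads;
    }

    BackendSettingsSketch configure(Map<String, String> config) {
        int threads = numberOfTransferThreads != UNDEFINED
                ? numberOfTransferThreads // an explicit setter call wins
                : Integer.parseInt(config.getOrDefault(
                        TRANSFER_THREADS_KEY, String.valueOf(ASSUMED_DEFAULT_THREADS)));
        return new BackendSettingsSketch(threads); // the original stays untouched
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put(TRANSFER_THREADS_KEY, "8");

        BackendSettingsSketch unset = new BackendSettingsSketch(UNDEFINED);
        BackendSettingsSketch explicit = new BackendSettingsSketch(2);

        System.out.println(unset.configure(config).numberOfTransferThreads);    // 8
        System.out.println(explicit.configure(config).numberOfTransferThreads); // 2
        System.out.println(unset.numberOfTransferThreads);                      // -1
    }
}
```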

Memory Configuration Access

public RocksDBMemoryConfiguration getMemoryConfiguration()

Gets the memory configuration for this RocksDB state backend.

Returns: The RocksDB memory configuration object

Example:

EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend();
RocksDBMemoryConfiguration memConfig = backend.getMemoryConfiguration();
memConfig.setUseManagedMemory(true);
memConfig.setWriteBufferRatio(0.4);
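To get a feel for the ratios, the arithmetic below reads them literally: the write buffer ratio caps memtable memory as a fraction of the shared budget, and the high-priority pool ratio reserves a fraction of the block cache for index and filter blocks. It is a back-of-the-envelope sketch with a hypothetical 256 MiB budget that treats the whole budget as the cache; Flink adjusts the actual cache and write-buffer capacities internally, so real numbers will differ.

```java
// Back-of-the-envelope sketch of how the two ratios split a shared memory budget.
final class MemoryBudgetSketch {
    static final long MIB = 1024L * 1024L;

    // Upper bound for memtables (write buffers), as a fraction of the shared budget.
    static long writeBufferBytes(long sharedBytes, double writeBufferRatio) {
        return (long) (sharedBytes * writeBufferRatio);
    }

    // Part of the block cache reserved for index, filter and similar high-priority blocks.
    static long highPriorityBytes(long cacheBytes, double highPriorityPoolRatio) {
        return (long) (cacheBytes * highPriorityPoolRatio);
    }

    public static void main(String[] args) {
        long shared = 256 * MIB; // hypothetical memory budget available to RocksDB
        System.out.println(writeBufferBytes(shared, 0.4) / MIB);  // 102
        System.out.println(highPriorityBytes(shared, 0.2) / MIB); // 51
    }
}
```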

Memory Factory Configuration

public void setRocksDBMemoryFactory(RocksDBMemoryFactory rocksDBMemoryFactory)

Sets the RocksDB memory factory for custom memory management strategies.

Parameters:

  • rocksDBMemoryFactory - Custom memory factory implementation

Complete Usage Example

import org.apache.flink.state.rocksdb.EmbeddedRocksDBStateBackend;
import org.apache.flink.state.rocksdb.PredefinedOptions;
import org.apache.flink.state.rocksdb.RocksDBMemoryConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDBStateBackendExample {
    public static void main(String[] args) {
        // Create the state backend
        EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend(true); // Enable incremental checkpointing
        
        // Configure storage
        backend.setDbStoragePath("/tmp/flink-rocksdb");
        
        // Configure performance settings
        backend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED);
        backend.setNumberOfTransferThreads(4);
        backend.setWriteBatchSize(2 * 1024 * 1024); // 2MB
        
        // Configure priority queue type
        backend.setPriorityQueueStateType(
            EmbeddedRocksDBStateBackend.PriorityQueueStateType.ROCKSDB
        );
        
        // Configure memory
        RocksDBMemoryConfiguration memConfig = backend.getMemoryConfiguration();
        memConfig.setUseManagedMemory(true);
        memConfig.setWriteBufferRatio(0.4);
        memConfig.setHighPriorityPoolRatio(0.2);
        
        // Apply to execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStateBackend(backend);
        
        // Now use the environment for your Flink job
        // env.addSource(...).keyBy(...).process(...);
    }
}

EmbeddedRocksDBStateBackendFactory

Factory Class

@PublicEvolving
public class EmbeddedRocksDBStateBackendFactory implements StateBackendFactory<EmbeddedRocksDBStateBackend> {
    // Factory for creating state backend from configuration
}

Factory Method

public EmbeddedRocksDBStateBackend createFromConfig(ReadableConfig config, ClassLoader classLoader)

Creates an EmbeddedRocksDBStateBackend from the given configuration.

Parameters:

  • config - The configuration containing state backend settings
  • classLoader - The class loader for loading classes

Returns: A configured EmbeddedRocksDBStateBackend instance

Example:

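import org.apache.flink.configuration.Configuration;
import org.apache.flink.state.rocksdb.PredefinedOptions;
import org.apache.flink.state.rocksdb.RocksDBOptions;

Configuration config = new Configuration();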
config.set(RocksDBOptions.LOCAL_DIRECTORIES, "/tmp/rocksdb");
// PREDEFINED_OPTIONS is a string-typed option, so pass the enum constant's name
config.set(RocksDBOptions.PREDEFINED_OPTIONS, PredefinedOptions.FLASH_SSD_OPTIMIZED.name());

EmbeddedRocksDBStateBackendFactory factory = new EmbeddedRocksDBStateBackendFactory();
EmbeddedRocksDBStateBackend backend =
        factory.createFromConfig(config, Thread.currentThread().getContextClassLoader());

Configuration via Flink Configuration

You can also configure the RocksDB state backend through Flink's configuration system:

# Flink configuration file (config.yaml; flink-conf.yaml in older releases)
state.backend.type: rocksdb
state.backend.rocksdb.localdir: /tmp/flink-rocksdb
state.backend.rocksdb.predefined-options: SPINNING_DISK_OPTIMIZED
state.backend.rocksdb.checkpoint.transfer.thread.num: 4
state.backend.rocksdb.memory.managed: true
state.backend.rocksdb.memory.write-buffer-ratio: 0.4

Thread Safety and Lifecycle

  • Thread Safety: The configuration methods of EmbeddedRocksDBStateBackend are not thread-safe; call them during job setup, before the job is submitted
  • Serialization: The state backend is serializable and is shipped to the TaskManagers as part of the job
  • Lifecycle: Changes made to the instance after the job is submitted do not affect the running job; configure() returns a configured copy instead of modifying the instance
  • Resource Management: Native RocksDB resources are created and released by Flink as part of the task lifecycle, so user code does not need to close them
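Because the backend reaches the TaskManagers in serialized form, each task works on its own copy. The sketch below shows that consequence with plain Java serialization; `ShippedSettings` is a hypothetical class standing in for the backend, and the in-memory round trip stands in for job submission.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical settings object standing in for the state backend (illustration only).
final class ShippedSettings implements Serializable {
    private static final long serialVersionUID = 1L;

    String localDir;
    int transferThreads;

    ShippedSettings(String localDir, int transferThreads) {
        this.localDir = localDir;
        this.transferThreads = transferThreads;
    }

    // Serialize and deserialize, as happens when a job is shipped to TaskManagers.
    static ShippedSettings roundTrip(ShippedSettings settings) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(settings);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (ShippedSettings) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        ShippedSettings client = new ShippedSettings("/tmp/flink-rocksdb", 4);
        ShippedSettings onTaskManager = roundTrip(client);
        client.transferThreads = 16; // too late: the shipped copy is unaffected
        System.out.println(onTaskManager.transferThreads); // 4
    }
}
```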

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-statebackend-rocksdb
