
tessl/maven-org-apache-flink--flink-statebackend-changelog

Apache Flink changelog state backend implementation that provides durable state management with changelog capabilities for stream processing applications


State Backend Configuration

This section covers configuring and initializing the changelog state backend, which wraps an existing Flink state backend to add changelog capabilities. It is the main entry point for enabling changelog-based incremental checkpointing.

Capabilities

ChangelogStateBackend Constructor

Creates a new changelog state backend that wraps an existing Flink state backend to add changelog functionality.

/**
 * Creates a changelog state backend wrapping the specified delegated state backend.
 * The delegated state backend handles the actual state storage while changelog
 * functionality is added transparently.
 * 
 * @param stateBackend The underlying state backend to wrap (cannot be null)
 * @throws NullPointerException if stateBackend is null
 * @throws IllegalArgumentException if stateBackend is already a DelegatingStateBackend
 */
public ChangelogStateBackend(StateBackend stateBackend);

Usage Example:

import org.apache.flink.state.changelog.ChangelogStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;

// Wrap a HashMap state backend
StateBackend hashMapBackend = new HashMapStateBackend();
ChangelogStateBackend changelogHashMap = new ChangelogStateBackend(hashMapBackend);

// Wrap a RocksDB state backend
StateBackend rocksDBBackend = new EmbeddedRocksDBStateBackend();
ChangelogStateBackend changelogRocksDB = new ChangelogStateBackend(rocksDBBackend);

Get Delegated State Backend

Returns the state backend that the changelog state backend wraps.

/**
 * Returns the underlying state backend that this changelog state backend delegates to.
 * 
 * @return The wrapped state backend instance
 */
public StateBackend getDelegatedStateBackend();

Memory Usage Configuration

Indicates whether the changelog state backend uses managed memory, based on the underlying delegated state backend.

/**
 * Indicates whether this state backend uses Flink's managed memory.
 * The result depends on the wrapped state backend's memory usage.
 * 
 * @return true if the delegated state backend uses managed memory, false otherwise
 */
public boolean useManagedMemory();
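
The delegation described in the two sections above can be illustrated with a small, self-contained sketch that does not depend on Flink. All types here (`SketchBackend`, `HeapLikeBackend`, `OffHeapLikeBackend`, `ChangelogLikeBackend`) are hypothetical stand-ins, not Flink classes. The only behavior modeled is that the wrapper hands back its delegate and answers the managed-memory question by asking that delegate.

```java
public class ManagedMemoryDelegationSketch {

    /** Hypothetical stand-in for a state backend; not a Flink interface. */
    interface SketchBackend {
        boolean useManagedMemory();
    }

    /** Stand-in for a backend that keeps state on the JVM heap. */
    static final class HeapLikeBackend implements SketchBackend {
        @Override
        public boolean useManagedMemory() {
            return false;
        }
    }

    /** Stand-in for a backend that stores state in managed (off-heap) memory. */
    static final class OffHeapLikeBackend implements SketchBackend {
        @Override
        public boolean useManagedMemory() {
            return true;
        }
    }

    /** Stand-in for the wrapper: every answer comes from the delegate. */
    static final class ChangelogLikeBackend implements SketchBackend {
        private final SketchBackend delegate;

        ChangelogLikeBackend(SketchBackend delegate) {
            this.delegate = delegate;
        }

        SketchBackend getDelegatedStateBackend() {
            return delegate;
        }

        @Override
        public boolean useManagedMemory() {
            return delegate.useManagedMemory();
        }
    }

    public static void main(String[] args) {
        SketchBackend heap = new HeapLikeBackend();
        ChangelogLikeBackend wrapped = new ChangelogLikeBackend(heap);

        System.out.println(wrapped.getDelegatedStateBackend() == heap); // true
        System.out.println(wrapped.useManagedMemory());                 // false
        System.out.println(
                new ChangelogLikeBackend(new OffHeapLikeBackend()).useManagedMemory()); // true
    }
}
```

In this model the wrapper has no memory preference of its own, which matches the description above: the result depends entirely on the wrapped backend.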

Configuration

Configures the changelog state backend with the provided configuration and class loader. If the delegated state backend is configurable, it will be configured as well.

/**
 * Configures the changelog state backend with the specified configuration.
 * If the delegated state backend implements ConfigurableStateBackend,
 * it will be configured and wrapped in a new ChangelogStateBackend instance.
 * 
 * @param config Configuration to apply
 * @param classLoader Class loader for loading configuration-specific classes
 * @return Configured state backend instance
 * @throws IllegalConfigurationException if configuration is invalid
 */
public StateBackend configure(ReadableConfig config, ClassLoader classLoader) throws IllegalConfigurationException;
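
The "configure the delegate, then re-wrap" behavior can be sketched without Flink as follows. Every name in this example is hypothetical (`SketchBackend`, `ConfigurableSketchBackend`, `TunableBackend`, `FixedBackend`, the `sketch.buffer-size` key), and a plain `Map` stands in for Flink's configuration object. Returning `this` when the delegate is not configurable is an assumption of the sketch, not something stated above.

```java
import java.util.Map;

public class ConfigureRewrapSketch {

    /** Hypothetical stand-in for a state backend; not a Flink interface. */
    interface SketchBackend {}

    /** Hypothetical stand-in for a backend that accepts configuration. */
    interface ConfigurableSketchBackend extends SketchBackend {
        SketchBackend configure(Map<String, String> config);
    }

    /** A configurable delegate with one made-up setting. */
    static final class TunableBackend implements ConfigurableSketchBackend {
        final int bufferSize;

        TunableBackend(int bufferSize) {
            this.bufferSize = bufferSize;
        }

        @Override
        public SketchBackend configure(Map<String, String> config) {
            String value = config.get("sketch.buffer-size");
            return value == null ? this : new TunableBackend(Integer.parseInt(value));
        }
    }

    /** A delegate that cannot be configured. */
    static final class FixedBackend implements SketchBackend {}

    /** Stand-in for the wrapper. */
    static final class ChangelogLikeBackend implements ConfigurableSketchBackend {
        final SketchBackend delegate;

        ChangelogLikeBackend(SketchBackend delegate) {
            this.delegate = delegate;
        }

        @Override
        public SketchBackend configure(Map<String, String> config) {
            if (delegate instanceof ConfigurableSketchBackend) {
                // Configure the delegate and wrap the result in a NEW wrapper.
                SketchBackend configured =
                        ((ConfigurableSketchBackend) delegate).configure(config);
                return new ChangelogLikeBackend(configured);
            }
            // Assumption of this sketch: nothing to configure, keep the wrapper.
            return this;
        }
    }

    public static void main(String[] args) {
        Map<String, String> config = Map.of("sketch.buffer-size", "64");

        ChangelogLikeBackend original = new ChangelogLikeBackend(new TunableBackend(16));
        ChangelogLikeBackend configured = (ChangelogLikeBackend) original.configure(config);

        System.out.println(configured != original);                            // true
        System.out.println(((TunableBackend) configured.delegate).bufferSize); // 64
        System.out.println(((TunableBackend) original.delegate).bufferSize);   // 16
    }
}
```

The practical consequence is that callers should keep using the instance returned by `configure` rather than the one they called it on, because the original wrapper is left untouched.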

Keyed State Backend Creation

Creates a keyed state backend for managing partitioned state with changelog functionality.

/**
 * Creates a keyed state backend that wraps the delegated state backend's
 * keyed state backend with changelog functionality.
 * 
 * @param env Execution environment
 * @param jobID Job identifier  
 * @param operatorIdentifier Operator identifier
 * @param keySerializer Serializer for state keys
 * @param numberOfKeyGroups Total number of key groups
 * @param keyGroupRange Range of key groups assigned to this backend
 * @param kvStateRegistry Registry for queryable state
 * @param ttlTimeProvider Time provider for TTL functionality
 * @param metricGroup Metric group for measurements
 * @param stateHandles State handles for recovery
 * @param cancelStreamRegistry Registry for cancellable streams
 * @return ChangelogKeyedStateBackend instance with changelog capabilities
 * @throws Exception if backend creation fails
 */
public <K> ChangelogKeyedStateBackend<K> createKeyedStateBackend(
    Environment env,
    JobID jobID,
    String operatorIdentifier,
    TypeSerializer<K> keySerializer,
    int numberOfKeyGroups,
    KeyGroupRange keyGroupRange,
    TaskKvStateRegistry kvStateRegistry,
    TtlTimeProvider ttlTimeProvider,
    MetricGroup metricGroup,
    @Nonnull Collection<KeyedStateHandle> stateHandles,
    CloseableRegistry cancelStreamRegistry
) throws Exception;
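
The `numberOfKeyGroups` and `keyGroupRange` parameters are easier to read with a worked example. The sketch below is plain arithmetic and does not use Flink. It assumes that the total number of key groups is split into contiguous, inclusive ranges, one per parallel subtask, using the rounding shown in the code. `rangeForSubtask` is a hypothetical helper name, and the exact formula Flink uses should be checked against its own sources.

```java
public class KeyGroupRangeSketch {

    /**
     * Returns {start, end}, both inclusive, of the key groups owned by one subtask,
     * assuming contiguous ranges that together cover 0 .. numberOfKeyGroups - 1.
     */
    static int[] rangeForSubtask(int numberOfKeyGroups, int parallelism, int subtaskIndex) {
        if (parallelism <= 0 || parallelism > numberOfKeyGroups) {
            throw new IllegalArgumentException("parallelism must be in 1..numberOfKeyGroups");
        }
        if (subtaskIndex < 0 || subtaskIndex >= parallelism) {
            throw new IllegalArgumentException("subtaskIndex must be in 0..parallelism-1");
        }
        // Round the start up and the end down so neighboring ranges never overlap.
        int start = (subtaskIndex * numberOfKeyGroups + parallelism - 1) / parallelism;
        int end = ((subtaskIndex + 1) * numberOfKeyGroups - 1) / parallelism;
        return new int[] {start, end};
    }

    public static void main(String[] args) {
        int numberOfKeyGroups = 128;
        int parallelism = 4;
        for (int i = 0; i < parallelism; i++) {
            int[] range = rangeForSubtask(numberOfKeyGroups, parallelism, i);
            System.out.println("subtask " + i + " -> [" + range[0] + ", " + range[1] + "]");
        }
        // subtask 0 -> [0, 31]
        // subtask 1 -> [32, 63]
        // subtask 2 -> [64, 95]
        // subtask 3 -> [96, 127]
    }
}
```

Under these assumptions, each keyed state backend instance is created for exactly one such range, which is why both the total count and the assigned range are passed in.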

Keyed State Backend Creation with Memory Fraction

Creates a keyed state backend with explicit managed memory fraction specification.

/**
 * Creates a keyed state backend with explicit managed memory fraction.
 * This version allows fine-grained control over memory allocation.
 * 
 * @param env Execution environment
 * @param jobID Job identifier
 * @param operatorIdentifier Operator identifier
 * @param keySerializer Serializer for state keys
 * @param numberOfKeyGroups Total number of key groups
 * @param keyGroupRange Range of key groups assigned to this backend
 * @param kvStateRegistry Registry for queryable state
 * @param ttlTimeProvider Time provider for TTL functionality
 * @param metricGroup Metric group for measurements
 * @param stateHandles State handles for recovery
 * @param cancelStreamRegistry Registry for cancellable streams
 * @param managedMemoryFraction Fraction of managed memory to use (0.0 to 1.0)
 * @return CheckpointableKeyedStateBackend instance with changelog capabilities
 * @throws Exception if backend creation fails
 */
public <K> CheckpointableKeyedStateBackend<K> createKeyedStateBackend(
    Environment env,
    JobID jobID,
    String operatorIdentifier,
    TypeSerializer<K> keySerializer,
    int numberOfKeyGroups,
    KeyGroupRange keyGroupRange,
    TaskKvStateRegistry kvStateRegistry,
    TtlTimeProvider ttlTimeProvider,
    MetricGroup metricGroup,
    @Nonnull Collection<KeyedStateHandle> stateHandles,
    CloseableRegistry cancelStreamRegistry,
    double managedMemoryFraction
) throws Exception;

Operator State Backend Creation

Creates an operator state backend by delegating to the underlying state backend. Operator state is not affected by changelog functionality.

/**
 * Creates an operator state backend by delegating to the wrapped state backend.
 * Operator state does not use changelog functionality.
 * 
 * @param env Execution environment
 * @param operatorIdentifier Operator identifier
 * @param stateHandles State handles for recovery
 * @param cancelStreamRegistry Registry for cancellable streams
 * @return OperatorStateBackend instance from the delegated backend
 * @throws Exception if backend creation fails
 */
public OperatorStateBackend createOperatorStateBackend(
    Environment env,
    String operatorIdentifier,
    @Nonnull Collection<OperatorStateHandle> stateHandles,
    CloseableRegistry cancelStreamRegistry
) throws Exception;

Configuration Integration

The changelog state backend integrates with Flink's configuration system through the standard state backend loading mechanisms:

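import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.state.changelog.ChangelogStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Via configuration: select the delegate backend, then enable the changelog wrapper
Configuration config = new Configuration();
config.setString("state.backend", "hashmap");
config.setString("state.backend.changelog.enabled", "true");
// Additional changelog-specific configuration can be added

// Via programmatic configuration
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StateBackend delegateBackend = new HashMapStateBackend();
ChangelogStateBackend changelogBackend = new ChangelogStateBackend(delegateBackend);
env.setStateBackend(changelogBackend);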

Error Handling

The changelog state backend applies the following rules:

  • The delegated state backend must not be null
  • The delegated state backend must not itself be a DelegatingStateBackend, so recursive delegation is rejected
  • Delegated operations are forwarded to the wrapped backend, and any exceptions they throw propagate to the caller
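
One way for calling code to stay clear of the recursive-delegation error is to wrap a backend only if it is not already a delegating one. The sketch below models that guard with hypothetical stand-in types (`SketchBackend`, `DelegatingSketchBackend`, `PlainBackend`, `ChangelogLikeBackend`) and a hypothetical helper `wrapOnce`. None of these are Flink APIs, and the null check inside `wrapOnce` is the helper's own choice rather than a statement about Flink's behavior.

```java
import java.util.Objects;

public class WrapOnceSketch {

    /** Hypothetical stand-in for a state backend; not a Flink interface. */
    interface SketchBackend {}

    /** Hypothetical stand-in for a backend that wraps another backend. */
    interface DelegatingSketchBackend extends SketchBackend {
        SketchBackend getDelegatedStateBackend();
    }

    static final class PlainBackend implements SketchBackend {}

    /** Stand-in for the wrapper; it refuses to wrap another delegating backend. */
    static final class ChangelogLikeBackend implements DelegatingSketchBackend {
        private final SketchBackend delegate;

        ChangelogLikeBackend(SketchBackend delegate) {
            if (delegate instanceof DelegatingSketchBackend) {
                throw new IllegalArgumentException("recursive delegation is not supported");
            }
            this.delegate = delegate;
        }

        @Override
        public SketchBackend getDelegatedStateBackend() {
            return delegate;
        }
    }

    /** Wraps the backend unless it already delegates, so wrapping is idempotent. */
    static DelegatingSketchBackend wrapOnce(SketchBackend backend) {
        Objects.requireNonNull(backend, "backend");
        if (backend instanceof DelegatingSketchBackend) {
            return (DelegatingSketchBackend) backend;
        }
        return new ChangelogLikeBackend(backend);
    }

    public static void main(String[] args) {
        SketchBackend plain = new PlainBackend();
        DelegatingSketchBackend once = wrapOnce(plain);
        DelegatingSketchBackend twice = wrapOnce(once);

        System.out.println(once.getDelegatedStateBackend() == plain); // true
        System.out.println(twice == once);                            // true

        try {
            new ChangelogLikeBackend(once); // wrapping directly is rejected
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

A guard like this is mainly useful where the backend may arrive already wrapped, for example when it is passed through several configuration layers.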

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-statebackend-changelog
