Apache Flink changelog state backend implementation that provides durable state management with changelog capabilities for stream processing applications
—
Core configuration and initialization functionality for adding changelog capabilities to existing Flink state backends. ChangelogStateBackend is the main entry point for configuring changelog-based incremental checkpointing.
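To make "changelog-based incremental checkpointing" concrete, here is a minimal, stdlib-only Java sketch under a simplified model. Live state is a string map. Every update is also appended to a log. A checkpoint only has to persist the log entries written since the last materialization, and recovery replays that log on top of the materialized snapshot. The class and method names (ChangelogModel, materialize, checkpointSize, restore) are hypothetical and are not part of Flink's API. Flink's real implementation writes changes to a durable changelog storage, which this sketch does not model.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of changelog-based checkpointing; not Flink's API.
class ChangelogModel {
    // Live state, as the delegated backend would hold it.
    final Map<String, String> state = new HashMap<>();
    // Last full snapshot produced by materialization.
    Map<String, String> materialized = new HashMap<>();
    // Changes recorded since the last materialization, as {key, value} pairs.
    final List<String[]> changelog = new ArrayList<>();

    void put(String key, String value) {
        state.put(key, value);                    // update the live state
        changelog.add(new String[] {key, value}); // and record the change
    }

    // A checkpoint only needs the changes made since the last materialization.
    int checkpointSize() {
        return changelog.size();
    }

    // Periodically fold the log into a new full snapshot and truncate the log.
    void materialize() {
        materialized = new HashMap<>(state);
        changelog.clear();
    }

    // Recovery: start from the materialized snapshot, then replay the log in order.
    Map<String, String> restore() {
        Map<String, String> restored = new HashMap<>(materialized);
        for (String[] change : changelog) {
            restored.put(change[0], change[1]);
        }
        return restored;
    }
}
```

The design point the sketch tries to capture is that checkpoint cost tracks the volume of recent changes rather than the total state size, while materialization keeps the log from growing without bound.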
Creates a new changelog state backend that wraps an existing Flink state backend to add changelog functionality.
/**
* Creates a changelog state backend wrapping the specified delegated state backend.
* The delegated state backend handles the actual state storage while changelog
* functionality is added transparently.
*
* @param stateBackend The underlying state backend to wrap (cannot be null)
* @throws IllegalArgumentException if stateBackend is null or is already a DelegatingStateBackend
*/
public ChangelogStateBackend(StateBackend stateBackend);

Usage Example:
import org.apache.flink.state.changelog.ChangelogStateBackend;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.runtime.state.StateBackend;
// Wrap a HashMap state backend
StateBackend hashMapBackend = new HashMapStateBackend();
ChangelogStateBackend changelogHashMap = new ChangelogStateBackend(hashMapBackend);
// Wrap a RocksDB state backend
StateBackend rocksDBBackend = new EmbeddedRocksDBStateBackend();
ChangelogStateBackend changelogRocksDB = new ChangelogStateBackend(rocksDBBackend);

Returns the underlying state backend that is being wrapped by the changelog state backend.
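The wrapping relationship, constructor validation plus access to the delegate, can be sketched without Flink on the classpath. This is a minimal illustration assuming simplified stand-in types: SketchBackend, SketchDelegatingBackend, SketchHashMapBackend and SketchChangelogBackend are hypothetical names that only mirror the documented contract (reject null, reject an argument that is already a delegating backend, expose the delegate). They are not Flink classes.

```java
// Hypothetical stand-ins for Flink's StateBackend and DelegatingStateBackend.
interface SketchBackend {
    String name();
}

interface SketchDelegatingBackend extends SketchBackend {
    SketchBackend getDelegatedStateBackend();
}

// Plays the role of a concrete backend such as HashMapStateBackend.
class SketchHashMapBackend implements SketchBackend {
    public String name() {
        return "hashmap";
    }
}

// Wrapper that validates its argument and exposes the delegate.
class SketchChangelogBackend implements SketchDelegatingBackend {
    private final SketchBackend delegate;

    SketchChangelogBackend(SketchBackend delegate) {
        if (delegate == null) {
            throw new IllegalArgumentException("delegated state backend must not be null");
        }
        if (delegate instanceof SketchDelegatingBackend) {
            // Wrapping another wrapper (recursive delegation) is rejected.
            throw new IllegalArgumentException("recursive delegation is not supported");
        }
        this.delegate = delegate;
    }

    public SketchBackend getDelegatedStateBackend() {
        return delegate;
    }

    public String name() {
        return "changelog(" + delegate.name() + ")";
    }
}
```

Rejecting nested wrappers keeps exactly one changelog layer between the operator and the real storage.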
/**
* Returns the underlying state backend that this changelog state backend delegates to.
*
* @return The wrapped state backend instance
*/
public StateBackend getDelegatedStateBackend();

Indicates whether the changelog state backend uses managed memory, based on the underlying delegated state backend.
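Because the answer comes from the wrapped backend, the wrapper only has to forward the query. The stdlib-only sketch below assumes hypothetical stand-ins (MemoryAwareBackend, HeapLikeBackend, RocksLikeBackend, ChangelogWrapper). The "heap-like" and "RocksDB-like" classes simply hard-code the answer a heap-based and a RocksDB-style backend would typically give. They do not reproduce those backends' logic.

```java
// Hypothetical minimal backend type; only the managed-memory query is modeled.
interface MemoryAwareBackend {
    boolean useManagedMemory();
}

// Stand-in for a heap-based backend that keeps state on the JVM heap.
class HeapLikeBackend implements MemoryAwareBackend {
    public boolean useManagedMemory() {
        return false;
    }
}

// Stand-in for a RocksDB-style backend that draws on managed memory.
class RocksLikeBackend implements MemoryAwareBackend {
    public boolean useManagedMemory() {
        return true;
    }
}

// The wrapper reports whatever its delegate reports.
class ChangelogWrapper implements MemoryAwareBackend {
    private final MemoryAwareBackend delegate;

    ChangelogWrapper(MemoryAwareBackend delegate) {
        this.delegate = delegate;
    }

    public boolean useManagedMemory() {
        return delegate.useManagedMemory();
    }
}
```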
/**
* Indicates whether this state backend uses Flink's managed memory.
* The result depends on the wrapped state backend's memory usage.
*
* @return true if the delegated state backend uses managed memory, false otherwise
*/
public boolean useManagedMemory();

Configures the changelog state backend with the provided configuration and class loader. If the delegated state backend is configurable, it is configured as well and the result is wrapped in a new ChangelogStateBackend instance.
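The configure-then-rewrap behavior can be sketched with plain Java. The sketch assumes hypothetical types (Backend, ConfigurableBackend, TunableBackend, ChangelogBackendSketch) and a Map in place of Flink's ReadableConfig. The "write-buffer-size" key is invented for illustration. It also assumes that the wrapper returns itself unchanged when the delegate is not configurable. That branch is an assumption, not something stated above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins for StateBackend, ConfigurableStateBackend and ReadableConfig.
interface Backend {}

interface ConfigurableBackend extends Backend {
    // Returns a configured copy; the receiver is left unchanged.
    Backend configure(Map<String, String> config);
}

class TunableBackend implements ConfigurableBackend {
    final int writeBufferSize;

    TunableBackend(int writeBufferSize) {
        this.writeBufferSize = writeBufferSize;
    }

    public Backend configure(Map<String, String> config) {
        String value = config.get("write-buffer-size"); // hypothetical option key
        return value == null ? this : new TunableBackend(Integer.parseInt(value));
    }
}

class ChangelogBackendSketch implements ConfigurableBackend {
    final Backend delegate;

    ChangelogBackendSketch(Backend delegate) {
        this.delegate = delegate;
    }

    public Backend configure(Map<String, String> config) {
        if (delegate instanceof ConfigurableBackend) {
            // Configure the delegate, then wrap the configured copy in a new wrapper.
            Backend configured = ((ConfigurableBackend) delegate).configure(config);
            return new ChangelogBackendSketch(configured);
        }
        // Assumed: nothing to configure, so keep this instance.
        return this;
    }
}
```

Returning a new wrapper instead of mutating the existing one keeps backend objects effectively immutable, which is convenient when the same instance may be shared or reconfigured.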
/**
* Configures the changelog state backend with the specified configuration.
* If the delegated state backend implements ConfigurableStateBackend,
* it will be configured and wrapped in a new ChangelogStateBackend instance.
*
* @param config Configuration to apply
* @param classLoader Class loader for loading configuration-specific classes
* @return Configured state backend instance
* @throws IllegalConfigurationException if configuration is invalid
*/
public StateBackend configure(ReadableConfig config, ClassLoader classLoader) throws IllegalConfigurationException;

Creates a keyed state backend for managing partitioned state with changelog functionality.
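As a rough illustration of what "wrapping the delegated keyed state backend" means, the stdlib-only sketch below forwards reads and writes to a delegate store and also records each write in a log. KeyedStore, HeapKeyedStore and ChangelogKeyedStore are hypothetical names. The in-memory List is a stand-in for durable changelog storage, and the sketch omits key groups, namespaces, serializers and TTL, all of which the real signature deals with.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical keyed-state interface; much simpler than Flink's keyed state backends.
interface KeyedStore<K, V> {
    void put(K key, V value);
    V get(K key);
}

// Stand-in for the delegated backend's keyed state.
class HeapKeyedStore<K, V> implements KeyedStore<K, V> {
    private final Map<K, V> data = new HashMap<>();

    public void put(K key, V value) {
        data.put(key, value);
    }

    public V get(K key) {
        return data.get(key);
    }
}

// Reads go straight to the delegate; writes go to the delegate and are also logged.
class ChangelogKeyedStore<K, V> implements KeyedStore<K, V> {
    private final KeyedStore<K, V> delegate;
    private final List<String> log = new ArrayList<>(); // stand-in for durable changelog storage

    ChangelogKeyedStore(KeyedStore<K, V> delegate) {
        this.delegate = delegate;
    }

    public void put(K key, V value) {
        delegate.put(key, value);
        log.add("PUT " + key + "=" + value);
    }

    public V get(K key) {
        return delegate.get(key);
    }

    // Number of logged changes that a checkpoint would have to persist.
    int pendingChanges() {
        return log.size();
    }
}
```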
/**
* Creates a keyed state backend that wraps the delegated state backend's
* keyed state backend with changelog functionality.
*
* @param env Execution environment
* @param jobID Job identifier
* @param operatorIdentifier Operator identifier
* @param keySerializer Serializer for state keys
* @param numberOfKeyGroups Total number of key groups
* @param keyGroupRange Range of key groups assigned to this backend
* @param kvStateRegistry Registry for queryable state
* @param ttlTimeProvider Time provider for TTL functionality
* @param metricGroup Metric group for measurements
* @param stateHandles State handles for recovery
* @param cancelStreamRegistry Registry for cancellable streams
* @return ChangelogKeyedStateBackend instance with changelog capabilities
* @throws Exception if backend creation fails
*/
public <K> ChangelogKeyedStateBackend<K> createKeyedStateBackend(
Environment env,
JobID jobID,
String operatorIdentifier,
TypeSerializer<K> keySerializer,
int numberOfKeyGroups,
KeyGroupRange keyGroupRange,
TaskKvStateRegistry kvStateRegistry,
TtlTimeProvider ttlTimeProvider,
MetricGroup metricGroup,
@Nonnull Collection<KeyedStateHandle> stateHandles,
CloseableRegistry cancelStreamRegistry
) throws Exception;

Creates a keyed state backend with an explicitly specified managed memory fraction.
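The fraction is documented as a value between 0.0 and 1.0. The hypothetical helper below, ManagedMemoryShare.bytesFor, shows one plausible reading of that parameter. It validates the range and converts the fraction into a byte budget, assuming a known total of managed memory to take the share from. How Flink actually computes the total and applies the fraction inside the delegated backend is not shown here. The changelog wrapper is assumed to simply pass the fraction through to the delegate.

```java
// Hypothetical helper: maps a managed memory fraction onto a byte budget.
final class ManagedMemoryShare {
    private ManagedMemoryShare() {}

    static long bytesFor(long totalManagedBytes, double fraction) {
        if (Double.isNaN(fraction) || fraction < 0.0 || fraction > 1.0) {
            throw new IllegalArgumentException("fraction must be in [0.0, 1.0]: " + fraction);
        }
        if (totalManagedBytes < 0) {
            throw new IllegalArgumentException("total managed memory must be non-negative");
        }
        // Round down to whole bytes.
        return (long) Math.floor(totalManagedBytes * fraction);
    }
}
```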
/**
* Creates a keyed state backend with explicit managed memory fraction.
* This version allows fine-grained control over memory allocation.
*
* @param env Execution environment
* @param jobID Job identifier
* @param operatorIdentifier Operator identifier
* @param keySerializer Serializer for state keys
* @param numberOfKeyGroups Total number of key groups
* @param keyGroupRange Range of key groups assigned to this backend
* @param kvStateRegistry Registry for queryable state
* @param ttlTimeProvider Time provider for TTL functionality
* @param metricGroup Metric group for measurements
* @param stateHandles State handles for recovery
* @param cancelStreamRegistry Registry for cancellable streams
* @param managedMemoryFraction Fraction of managed memory to use (0.0 to 1.0)
* @return CheckpointableKeyedStateBackend instance with changelog capabilities
* @throws Exception if backend creation fails
*/
public <K> CheckpointableKeyedStateBackend<K> createKeyedStateBackend(
Environment env,
JobID jobID,
String operatorIdentifier,
TypeSerializer<K> keySerializer,
int numberOfKeyGroups,
KeyGroupRange keyGroupRange,
TaskKvStateRegistry kvStateRegistry,
TtlTimeProvider ttlTimeProvider,
MetricGroup metricGroup,
@Nonnull Collection<KeyedStateHandle> stateHandles,
CloseableRegistry cancelStreamRegistry,
double managedMemoryFraction
) throws Exception;

Creates an operator state backend by delegating to the underlying state backend. Operator state is not affected by changelog functionality.
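The asymmetry between keyed and operator state can be sketched as a factory that wraps one product and passes the other through untouched. KeyedPart, OperatorPart, BackendFactory, PlainFactory and ChangelogFactory are hypothetical stand-ins, not Flink types. The test checks that the operator state object returned by the wrapper is the delegate's own instance.

```java
// Hypothetical stand-ins for keyed and operator state backends.
interface KeyedPart {}

interface OperatorPart {}

interface BackendFactory {
    KeyedPart createKeyed();
    OperatorPart createOperator();
}

// Stand-in for the delegated backend; hands out one fixed operator state object.
class PlainFactory implements BackendFactory {
    final OperatorPart operatorState = new OperatorPart() {};

    public KeyedPart createKeyed() {
        return new KeyedPart() {};
    }

    public OperatorPart createOperator() {
        return operatorState;
    }
}

// Keyed state gets a changelog wrapper; operator state is returned as-is.
class ChangelogFactory implements BackendFactory {
    static final class LoggingKeyed implements KeyedPart {
        final KeyedPart inner;

        LoggingKeyed(KeyedPart inner) {
            this.inner = inner;
        }
    }

    private final BackendFactory delegate;

    ChangelogFactory(BackendFactory delegate) {
        this.delegate = delegate;
    }

    public KeyedPart createKeyed() {
        return new LoggingKeyed(delegate.createKeyed());
    }

    public OperatorPart createOperator() {
        return delegate.createOperator();
    }
}
```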
/**
* Creates an operator state backend by delegating to the wrapped state backend.
* Operator state does not use changelog functionality.
*
* @param env Execution environment
* @param operatorIdentifier Operator identifier
* @param stateHandles State handles for recovery
* @param cancelStreamRegistry Registry for cancellable streams
* @return OperatorStateBackend instance from the delegated backend
* @throws Exception if backend creation fails
*/
public OperatorStateBackend createOperatorStateBackend(
Environment env,
String operatorIdentifier,
@Nonnull Collection<OperatorStateHandle> stateHandles,
CloseableRegistry cancelStreamRegistry
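) throws Exception;

The changelog state backend integrates with Flink's configuration system through the standard state backend loading mechanisms:
// Via configuration: choose the regular state backend and enable changelog on top of it
Configuration config = new Configuration();
config.setString(StateBackendOptions.STATE_BACKEND, "hashmap");
config.setString("state.backend.changelog.enabled", "true");
// Additional changelog-specific options (state.backend.changelog.*) can be added
// Via programmatic configuration
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StateBackend delegateBackend = new HashMapStateBackend();
ChangelogStateBackend changelogBackend = new ChangelogStateBackend(delegateBackend);
env.setStateBackend(changelogBackend);
// Alternatively, set only the delegate and let Flink add the changelog wrapper:
// env.setStateBackend(new HashMapStateBackend());
// env.enableChangelogStateBackend(true);

On construction, the changelog state backend validates that the delegated state backend is not null and is not itself a DelegatingStateBackend (recursive delegation is not supported).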