Apache Flink changelog state backend implementation that provides durable state management with changelog capabilities for stream processing applications
npx @tessl/cli install tessl/maven-org-apache-flink--flink-statebackend-changelog@1.13.0Apache Flink changelog state backend implementation that provides durable state management with changelog capabilities for stream processing applications. This state backend enables incremental checkpointing by logging state changes to a changelog, allowing for efficient recovery and state consistency in distributed streaming applications.
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-changelog_2.11</artifactId>
<version>1.13.6</version>
</dependency>import org.apache.flink.state.changelog.ChangelogStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
import org.apache.flink.runtime.state.delegate.DelegatingStateBackend;
import org.apache.flink.runtime.state.ConfigurableStateBackend;import org.apache.flink.state.changelog.ChangelogStateBackend;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
// Create a changelog state backend wrapping a HashMap state backend
StateBackend delegateBackend = new HashMapStateBackend();
ChangelogStateBackend changelogBackend = new ChangelogStateBackend(delegateBackend);
// Configure Flink to use the changelog state backend
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(changelogBackend);
// State operations will now be logged to changelog for incremental checkpointing
// Use standard Flink state operations - they will be transparently loggedThe changelog state backend is built around a delegation pattern with several key components:
This design allows existing Flink applications to benefit from changelog-based incremental checkpointing without code changes, simply by configuring the changelog state backend.
Core state backend configuration and initialization functionality for integrating changelog capabilities with existing Flink state backends.
public class ChangelogStateBackend implements DelegatingStateBackend, ConfigurableStateBackend {
public ChangelogStateBackend(StateBackend stateBackend);
public StateBackend getDelegatedStateBackend();
public StateBackend configure(ReadableConfig config, ClassLoader classLoader);
}Keyed state backend implementation that provides transparent changelog logging for all state operations while maintaining full compatibility with Flink's state management system.
public <K> ChangelogKeyedStateBackend<K> createKeyedStateBackend(
Environment env,
JobID jobID,
String operatorIdentifier,
TypeSerializer<K> keySerializer,
int numberOfKeyGroups,
KeyGroupRange keyGroupRange,
TaskKvStateRegistry kvStateRegistry,
TtlTimeProvider ttlTimeProvider,
MetricGroup metricGroup,
@Nonnull Collection<KeyedStateHandle> stateHandles,
CloseableRegistry cancelStreamRegistry
) throws Exception;Comprehensive support for all Flink state types including Value, List, Map, Reducing, and Aggregating states, with transparent changelog logging for all state mutations.
// State creation and access patterns
public <N, S extends State> S getPartitionedState(
N namespace,
TypeSerializer<N> namespaceSerializer,
StateDescriptor<S, ?> stateDescriptor
) throws Exception;
public <N, S extends State, T> S getOrCreateKeyedState(
TypeSerializer<N> namespaceSerializer,
StateDescriptor<S, T> stateDescriptor
) throws Exception;public class ChangelogStateBackend implements DelegatingStateBackend, ConfigurableStateBackend {
// Constructor and configuration
public ChangelogStateBackend(StateBackend stateBackend);
public StateBackend getDelegatedStateBackend();
public boolean useManagedMemory();
public StateBackend configure(ReadableConfig config, ClassLoader classLoader) throws IllegalConfigurationException;
// State backend creation methods
public <K> ChangelogKeyedStateBackend<K> createKeyedStateBackend(
Environment env, JobID jobID, String operatorIdentifier,
TypeSerializer<K> keySerializer, int numberOfKeyGroups,
KeyGroupRange keyGroupRange, TaskKvStateRegistry kvStateRegistry,
TtlTimeProvider ttlTimeProvider, MetricGroup metricGroup,
Collection<KeyedStateHandle> stateHandles,
CloseableRegistry cancelStreamRegistry
) throws Exception;
public <K> CheckpointableKeyedStateBackend<K> createKeyedStateBackend(
Environment env, JobID jobID, String operatorIdentifier,
TypeSerializer<K> keySerializer, int numberOfKeyGroups,
KeyGroupRange keyGroupRange, TaskKvStateRegistry kvStateRegistry,
TtlTimeProvider ttlTimeProvider, MetricGroup metricGroup,
Collection<KeyedStateHandle> stateHandles,
CloseableRegistry cancelStreamRegistry, double managedMemoryFraction
) throws Exception;
public OperatorStateBackend createOperatorStateBackend(
Environment env, String operatorIdentifier,
Collection<OperatorStateHandle> stateHandles,
CloseableRegistry cancelStreamRegistry
) throws Exception;
}
interface DelegatingStateBackend extends StateBackend {
StateBackend getDelegatedStateBackend();
}
interface ConfigurableStateBackend extends StateBackend {
StateBackend configure(ReadableConfig config, ClassLoader classLoader) throws IllegalConfigurationException;
}class ChangelogKeyedStateBackend<K> implements CheckpointableKeyedStateBackend<K>, CheckpointListener, TestableKeyedStateBackend {
// Core state backend operations
public KeyGroupRange getKeyGroupRange();
public void setCurrentKey(K newKey);
public K getCurrentKey();
public TypeSerializer<K> getKeySerializer();
// State access and management
public <N, S extends State> S getPartitionedState(
N namespace, TypeSerializer<N> namespaceSerializer,
StateDescriptor<S, ?> stateDescriptor
) throws Exception;
public <N, S extends State, T> S getOrCreateKeyedState(
TypeSerializer<N> namespaceSerializer,
StateDescriptor<S, T> stateDescriptor
) throws Exception;
// Lifecycle management
public void close() throws IOException;
public void dispose();
// Checkpointing
public RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(
long checkpointId, long timestamp,
CheckpointStreamFactory streamFactory,
CheckpointOptions checkpointOptions
) throws Exception;
// Priority queue support
public <T extends HeapPriorityQueueElement & PriorityComparable<? super T> & Keyed<?>>
KeyGroupedInternalPriorityQueue<T> create(
String stateName,
TypeSerializer<T> byteOrderedElementSerializer
);
}