Apache Flink changelog state backend implementation that provides durable state management with changelog capabilities for stream processing applications
—
Changelog-enabled wrappers for every type of state supported by Apache Flink (Value, List, Map, Reducing, and Aggregating), each of which transparently logs all state mutations to the changelog.
Value state stores a single value per key and supports read, write, and clear operations with changelog logging.
/**
* Changelog-enabled value state that logs all state changes.
* Wraps Flink's InternalValueState with transparent logging.
*/
class ChangelogValueState<K, N, V> extends AbstractChangelogState<K, N, V, InternalValueState<K, N, V>>
implements InternalValueState<K, N, V> {
/**
* Gets the current value for the current key and namespace.
*
* @return The current value, or null if no value is set
* @throws IOException if value retrieval fails
*/
public V value() throws IOException;
/**
* Updates the value for the current key and namespace.
* The change is automatically logged to the changelog.
*
* @param value The new value to set
* @throws IOException if value update fails
*/
public void update(V value) throws IOException;
/**
* Clears the value for the current key and namespace.
* The clear operation is logged to the changelog.
*/
public void clear();
/**
* Static factory method for creating changelog value state instances.
* Used internally by the state backend factory system.
*
* @param valueState The underlying value state to wrap
* @return Changelog-enabled value state instance
*/
static <K, N, SV, S extends State, IS extends S> IS create(InternalKvState<K, N, SV> valueState);
}

Usage Example:
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
// In a Flink operator
ValueStateDescriptor<String> descriptor =
new ValueStateDescriptor<>("my-value", String.class);
ValueState<String> valueState = getRuntimeContext().getState(descriptor);
// All operations are automatically logged to changelog
String currentValue = valueState.value();
valueState.update("new value");
valueState.clear();

List state maintains a list of values per key, supporting add, update, and iteration operations with changelog logging.
/**
* Changelog-enabled list state that logs all list modifications.
* Wraps Flink's InternalListState with transparent logging.
*/
class ChangelogListState<K, N, V> extends AbstractChangelogState<K, N, List<V>, InternalListState<K, N, V>>
implements InternalListState<K, N, V> {
/**
* Gets the current list of values for the current key and namespace.
*
* @return Iterable over the current list values
* @throws Exception if list retrieval fails
*/
public Iterable<V> get() throws Exception;
/**
* Adds a value to the list for the current key and namespace.
* The addition is logged to the changelog.
*
* @param value The value to add to the list
* @throws Exception if value addition fails
*/
public void add(V value) throws Exception;
/**
* Replaces the entire list with the provided values.
* The update is logged to the changelog.
*
* @param values The new list of values
* @throws Exception if list update fails
*/
public void update(List<V> values) throws Exception;
/**
* Adds all values from the provided list to the current list.
* Each addition is logged to the changelog.
*
* @param values The values to add
* @throws Exception if addition fails
*/
public void addAll(List<V> values) throws Exception;
/**
* Clears all values from the list.
* The clear operation is logged to the changelog.
*/
public void clear();
/**
* Updates the list internally with the provided values.
* This is an internal method used by the state backend.
*
* @param valueToStore The list of values to store internally
* @throws Exception if internal update fails
*/
public void updateInternal(List<V> valueToStore) throws Exception;
/**
* Gets the internal list representation.
* This is an internal method used by the state backend.
*
* @return The internal list of values
* @throws Exception if internal retrieval fails
*/
public List<V> getInternal() throws Exception;
/**
* Merges the state of the current key from the source namespaces into the target namespace.
* This is used, for example, when windows are merged (such as session windows).
*
* @param target The target namespace to merge into
* @param sources The source namespaces to merge from
* @throws Exception if namespace merging fails
*/
public void mergeNamespaces(N target, Collection<N> sources) throws Exception;
/**
* Static factory method for creating changelog list state instances.
* Used internally by the state backend factory system.
*
* @param listState The underlying list state to wrap
* @return Changelog-enabled list state instance
*/
static <K, N, SV, S extends State, IS extends S> IS create(InternalKvState<K, N, SV> listState);
}

Usage Example:
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
// In a Flink operator
ListStateDescriptor<String> descriptor =
new ListStateDescriptor<>("my-list", String.class);
ListState<String> listState = getRuntimeContext().getListState(descriptor);
// All operations are automatically logged to changelog
listState.add("item1");
listState.add("item2");
listState.update(Arrays.asList("new1", "new2", "new3"));
for (String item : listState.get()) {
// Process each item
}

Map state maintains key-value mappings per key, supporting put, get, remove, and iteration operations with changelog logging.
/**
* Changelog-enabled map state that logs all map modifications.
* Wraps Flink's InternalMapState with transparent logging.
*/
class ChangelogMapState<K, N, UK, UV> extends AbstractChangelogState<K, N, Map<UK, UV>, InternalMapState<K, N, UK, UV>>
implements InternalMapState<K, N, UK, UV> {
/**
* Gets the value for the specified map key.
*
* @param key The map key to look up
* @return The value associated with the key, or null if not present
* @throws Exception if value retrieval fails
*/
public UV get(UK key) throws Exception;
/**
* Puts a key-value pair into the map.
* The put operation is logged to the changelog.
*
* @param key The map key
* @param value The value to associate with the key
* @throws Exception if put operation fails
*/
public void put(UK key, UV value) throws Exception;
/**
* Puts all entries from the provided map into the state map.
* Each put operation is logged to the changelog.
*
* @param map The map containing entries to add
* @throws Exception if put operations fail
*/
public void putAll(Map<UK, UV> map) throws Exception;
/**
* Removes the entry for the specified key.
* The remove operation is logged to the changelog.
*
* @param key The key to remove
* @throws Exception if remove operation fails
*/
public void remove(UK key) throws Exception;
/**
* Checks if the map contains the specified key.
*
* @param key The key to check
* @return true if the key exists, false otherwise
* @throws Exception if check operation fails
*/
public boolean contains(UK key) throws Exception;
/**
* Gets all entries in the map.
*
* @return Iterable over all key-value entries
* @throws Exception if entry retrieval fails
*/
public Iterable<Map.Entry<UK, UV>> entries() throws Exception;
/**
* Gets all keys in the map.
*
* @return Iterable over all keys
* @throws Exception if key retrieval fails
*/
public Iterable<UK> keys() throws Exception;
/**
* Gets all values in the map.
*
* @return Iterable over all values
* @throws Exception if value retrieval fails
*/
public Iterable<UV> values() throws Exception;
/**
* Gets an iterator over all entries in the map.
*
* @return Iterator over key-value entries
* @throws Exception if iterator creation fails
*/
public Iterator<Map.Entry<UK, UV>> iterator() throws Exception;
/**
* Checks if the map is empty.
*
* @return true if the map contains no entries, false otherwise
* @throws Exception if empty check fails
*/
public boolean isEmpty() throws Exception;
/**
* Clears all entries from the map.
* The clear operation is logged to the changelog.
*/
public void clear();
/**
* Static factory method for creating changelog map state instances.
* Used internally by the state backend factory system.
*
* @param mapState The underlying map state to wrap
* @return Changelog-enabled map state instance
*/
static <UK, UV, K, N, SV, S extends State, IS extends S> IS create(InternalKvState<K, N, SV> mapState);
}

Usage Example:
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
// In a Flink operator
MapStateDescriptor<String, Long> descriptor =
new MapStateDescriptor<>("my-map", String.class, Long.class);
MapState<String, Long> mapState = getRuntimeContext().getMapState(descriptor);
// All operations are automatically logged to changelog
mapState.put("key1", 100L);
mapState.put("key2", 200L);
Long value = mapState.get("key1");
mapState.remove("key2");
for (Map.Entry<String, Long> entry : mapState.entries()) {
// Process each entry
}

Reducing state maintains a single value that is updated using a reduce function, with changelog logging for all reductions.
/**
* Changelog-enabled reducing state that logs all reduce operations.
* Wraps Flink's InternalReducingState with transparent logging.
*/
class ChangelogReducingState<K, N, V> extends AbstractChangelogState<K, N, V, InternalReducingState<K, N, V>>
implements InternalReducingState<K, N, V> {
/**
* Gets the current reduced value.
*
* @return The current reduced value, or null if no value has been added
* @throws Exception if value retrieval fails
*/
public V get() throws Exception;
/**
* Adds a value to the reducing state, applying the reduce function.
* The resulting state change is logged to the changelog.
*
* @param value The value to add/reduce
* @throws Exception if add operation fails
*/
public void add(V value) throws Exception;
/**
* Clears the reducing state.
* The clear operation is logged to the changelog.
*/
public void clear();
/**
* Static factory method for creating changelog reducing state instances.
* Used internally by the state backend factory system.
*
* @param reducingState The underlying reducing state to wrap
* @return Changelog-enabled reducing state instance
*/
static <K, N, SV, S extends State, IS extends S> IS create(InternalKvState<K, N, SV> reducingState);
}

Usage Example:
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.functions.ReduceFunction;
// In a Flink operator
ReducingStateDescriptor<Long> descriptor = new ReducingStateDescriptor<>(
"sum-state",
new ReduceFunction<Long>() {
@Override
public Long reduce(Long value1, Long value2) throws Exception {
return value1 + value2;
}
},
Long.class
);
ReducingState<Long> reducingState = getRuntimeContext().getReducingState(descriptor);
// All operations are automatically logged to changelog
reducingState.add(10L);
reducingState.add(20L);
Long sum = reducingState.get(); // Returns 30L

Aggregating state maintains an accumulator that is updated using aggregation functions, with changelog logging for all aggregations.
/**
* Changelog-enabled aggregating state that logs all aggregation operations.
* Wraps Flink's InternalAggregatingState with transparent logging.
*/
class ChangelogAggregatingState<K, N, IN, ACC, OUT> extends AbstractChangelogState<K, N, ACC, InternalAggregatingState<K, N, IN, ACC, OUT>>
implements InternalAggregatingState<K, N, IN, ACC, OUT> {
/**
* Gets the current aggregated result.
*
* @return The current aggregated output value
* @throws Exception if value retrieval fails
*/
public OUT get() throws Exception;
/**
* Adds an input value to the aggregating state.
* The aggregation is performed and the state change is logged to the changelog.
*
* @param value The input value to aggregate
* @throws Exception if add operation fails
*/
public void add(IN value) throws Exception;
/**
* Clears the aggregating state.
* The clear operation is logged to the changelog.
*/
public void clear();
/**
* Static factory method for creating changelog aggregating state instances.
* Used internally by the state backend factory system.
*
* @param aggregatingState The underlying aggregating state to wrap
* @return Changelog-enabled aggregating state instance
*/
static <T, K, N, SV, S extends State, IS extends S> IS create(InternalKvState<K, N, SV> aggregatingState);
}

Usage Example:
import org.apache.flink.api.common.state.AggregatingState;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.functions.AggregateFunction;
// In a Flink operator
AggregatingStateDescriptor<Double, Tuple2<Double, Long>, Double> descriptor =
new AggregatingStateDescriptor<>(
"average-state",
new AggregateFunction<Double, Tuple2<Double, Long>, Double>() {
@Override
public Tuple2<Double, Long> createAccumulator() {
return Tuple2.of(0.0, 0L);
}
@Override
public Tuple2<Double, Long> add(Double value, Tuple2<Double, Long> accumulator) {
return Tuple2.of(accumulator.f0 + value, accumulator.f1 + 1);
}
@Override
public Double getResult(Tuple2<Double, Long> accumulator) {
return accumulator.f1 == 0 ? 0.0 : accumulator.f0 / accumulator.f1;
}
@Override
public Tuple2<Double, Long> merge(Tuple2<Double, Long> a, Tuple2<Double, Long> b) {
return Tuple2.of(a.f0 + b.f0, a.f1 + b.f1);
}
},
Types.TUPLE(Types.DOUBLE, Types.LONG)
);
AggregatingState<Double, Double> aggregatingState = getRuntimeContext().getAggregatingState(descriptor);
// All operations are automatically logged to changelog
aggregatingState.add(10.0);
aggregatingState.add(20.0);
aggregatingState.add(30.0);
Double average = aggregatingState.get(); // Returns 20.0

Priority queue support for timers and ordered event processing with changelog logging.
/**
* Changelog-enabled priority queue that logs all queue operations.
* Wraps Flink's KeyGroupedInternalPriorityQueue with transparent logging.
*/
class ChangelogKeyGroupedPriorityQueue<T extends HeapPriorityQueueElement & PriorityComparable<? super T> & Keyed<?>>
implements KeyGroupedInternalPriorityQueue<T> {
/**
* Adds an element to the priority queue.
* The addition is logged to the changelog.
*
* @param element The element to add
* @return true if the element was added successfully
*/
public boolean add(T element);
/**
* Removes and returns the element with the highest priority.
* The removal is logged to the changelog.
*
* @return The highest priority element, or null if empty
*/
public T poll();
/**
* Returns the element with the highest priority without removing it.
*
* @return The highest priority element, or null if empty
*/
public T peek();
/**
* Removes the specified element from the queue.
* The removal is logged to the changelog.
*
* @param element The element to remove
* @return true if the element was removed
*/
public boolean remove(T element);
/**
* Checks if the queue is empty.
*
* @return true if the queue contains no elements
*/
public boolean isEmpty();
/**
* Gets the number of elements in the queue.
*
* @return The number of elements
*/
public int size();
/**
* Adds all elements from the provided collection to the queue.
* The additions are logged to the changelog.
*
* @param toAdd Collection of elements to add (can be null)
*/
public void addAll(@Nullable Collection<? extends T> toAdd);
/**
* Gets a subset of elements for a specific key group.
* Used for key group-based partitioning.
*
* @param keyGroupId The key group identifier
* @return Set of elements belonging to the key group
*/
public Set<T> getSubsetForKeyGroup(int keyGroupId);
/**
* Gets an iterator over all elements in the queue.
*
* @return CloseableIterator for traversing queue elements
*/
public CloseableIterator<T> iterator();
}

All changelog state types extend a common abstract base class that provides shared functionality.
/**
* Base class for all changelog state wrappers.
* Provides common functionality for delegation and serialization.
*/
abstract class AbstractChangelogState<K, N, V, S extends InternalKvState<K, N, V>>
implements InternalKvState<K, N, V> {
/**
* Gets the underlying delegated state instance.
*
* @return The wrapped state object
*/
public S getDelegatedState();
/**
* Gets the key serializer.
*
* @return TypeSerializer for key type K
*/
public TypeSerializer<K> getKeySerializer();
/**
* Gets the namespace serializer.
*
* @return TypeSerializer for namespace type N
*/
public TypeSerializer<N> getNamespaceSerializer();
/**
* Gets the value serializer.
*
* @return TypeSerializer for value type V
*/
public TypeSerializer<V> getValueSerializer();
/**
* Sets the current namespace for state operations.
*
* @param namespace The namespace to set as current
*/
public void setCurrentNamespace(N namespace);
/**
* Gets the serialized value for the given serialized key and namespace.
*
* @param serializedKeyAndNamespace Serialized key and namespace
* @param safeKeySerializer Safe key serializer
* @param safeNamespaceSerializer Safe namespace serializer
* @param safeValueSerializer Safe value serializer
* @return Serialized value bytes
* @throws Exception if serialization fails
*/
public byte[] getSerializedValue(
byte[] serializedKeyAndNamespace,
TypeSerializer<K> safeKeySerializer,
TypeSerializer<N> safeNamespaceSerializer,
TypeSerializer<V> safeValueSerializer
) throws Exception;
}

The changelog state backend uses a factory system to create appropriate state wrappers for each state type.
// Internal factory interface for creating changelog states
interface StateFactory {
<K, N, SV, S extends State, IS extends InternalKvState<K, N, SV>> IS create(
IS originalState,
StateDescriptor<S, ?> stateDescriptor
);
}
// Factory mapping for different state types
private static final Map<StateDescriptor.Type, StateFactory> STATE_FACTORIES = Map.of(
StateDescriptor.Type.VALUE, ChangelogValueState::create,
StateDescriptor.Type.LIST, ChangelogListState::create,
StateDescriptor.Type.REDUCING, ChangelogReducingState::create,
StateDescriptor.Type.AGGREGATING, ChangelogAggregatingState::create,
StateDescriptor.Type.MAP, ChangelogMapState::create
);

All state operations are transparently logged to the changelog.

Install with Tessl CLI:

npx tessl i tessl/maven-org-apache-flink--flink-statebackend-changelog