
tessl/maven-org-apache-flink--flink-statebackend-changelog

Apache Flink changelog state backend implementation that provides durable state management with changelog capabilities for stream processing applications



State Types and Operations

The changelog state backend supports each of Flink's keyed state types (Value, List, Map, Reducing, and Aggregating). It wraps every state in a changelog-enabled counterpart, which forwards each call to the underlying state and transparently logs every mutation to the changelog. The key-grouped priority queues used for timers are wrapped in the same way.

Capabilities

Value State Operations

Value state stores a single value per key and namespace. Reads are served by the wrapped state. Updates and clears are applied to the wrapped state and also logged to the changelog.

/**
 * Changelog-enabled value state that logs all state changes.
 * Wraps Flink's InternalValueState with transparent logging.
 */
class ChangelogValueState<K, N, V> extends AbstractChangelogState<K, N, V, InternalValueState<K, N, V>>
    implements InternalValueState<K, N, V> {
    
    /**
     * Gets the current value for the current key and namespace.
     * 
     * @return The current value, or null if no value is set
     * @throws IOException if value retrieval fails
     */
    public V value() throws IOException;
    
    /**
     * Updates the value for the current key and namespace.
     * The change is automatically logged to the changelog.
     * 
     * @param value The new value to set
     * @throws IOException if value update fails
     */
    public void update(V value) throws IOException;
    
    /**
     * Clears the value for the current key and namespace.
     * The clear operation is logged to the changelog.
     */
    public void clear();
    
    /**
     * Static factory method for creating changelog value state instances.
     * Used internally by the state backend factory system.
     * 
     * @param valueState The underlying value state to wrap
     * @return Changelog-enabled value state instance
     */
    static <K, N, SV, S extends State, IS extends S> IS create(InternalKvState<K, N, SV> valueState);
}

Usage Example:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;

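// Inside a rich function applied to a KeyedStream (e.g. in open() or processElement()).
// The changelog wrappers are only used when the changelog state backend is enabled,
// e.g. via StreamExecutionEnvironment#enableChangelogStateBackend(true).
ValueStateDescriptor<String> descriptor = 
    new ValueStateDescriptor<>("my-value", String.class);
ValueState<String> valueState = getRuntimeContext().getState(descriptor);

// value() is a read and is not logged; update() and clear() are logged to the changelog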
String currentValue = valueState.value();
valueState.update("new value");
valueState.clear();
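The wrapper pattern can be sketched in plain Java. A changelog-enabled value state forwards each call to the state it wraps and appends one entry per mutation. This is a hypothetical, simplified sketch, not Flink's implementation. `SimpleValueState`, `LoggingValueState`, and the string log entries are illustrative names, and the sketch omits key/namespace tracking and serialization.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the wrapped (delegated) value state.
class SimpleValueState<V> {
    private V value;

    V value() { return value; }
    void update(V newValue) { value = newValue; }
    void clear() { value = null; }
}

// Hypothetical changelog-enabled wrapper: every call is forwarded to the
// delegate, and each mutation also appends one entry to the changelog.
class LoggingValueState<V> {
    private final SimpleValueState<V> delegate;
    final List<String> changelog = new ArrayList<>();

    LoggingValueState(SimpleValueState<V> delegate) {
        this.delegate = delegate;
    }

    V value() {
        return delegate.value(); // read: nothing is logged
    }

    void update(V newValue) {
        delegate.update(newValue);        // apply to the wrapped state
        changelog.add("SET " + newValue); // then record the change
    }

    void clear() {
        delegate.clear();
        changelog.add("CLEAR");
    }
}

class ValueStateSketch {
    public static void main(String[] args) {
        LoggingValueState<String> state = new LoggingValueState<>(new SimpleValueState<>());
        String current = state.value();
        state.update("new value");
        state.clear();
        System.out.println(current);         // null
        System.out.println(state.changelog); // [SET new value, CLEAR]
    }
}
```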

List State Operations

List state maintains a list of values per key and namespace. All mutations are logged to the changelog: add, addAll, update, clear, and namespace merges. Iteration is a plain read.

/**
 * Changelog-enabled list state that logs all list modifications.
 * Wraps Flink's InternalListState with transparent logging.
 */
class ChangelogListState<K, N, V> extends AbstractChangelogState<K, N, List<V>, InternalListState<K, N, V>>
    implements InternalListState<K, N, V> {
    
    /**
     * Gets the current list of values for the current key and namespace.
     * 
     * @return Iterable over the current list values
     * @throws Exception if list retrieval fails
     */
    public Iterable<V> get() throws Exception;
    
    /**
     * Adds a value to the list for the current key and namespace.
     * The addition is logged to the changelog.
     * 
     * @param value The value to add to the list
     * @throws Exception if value addition fails
     */
    public void add(V value) throws Exception;
    
    /**
     * Replaces the entire list with the provided values.
     * The update is logged to the changelog.
     * 
     * @param values The new list of values
     * @throws Exception if list update fails
     */
    public void update(List<V> values) throws Exception;
    
    /**
     * Adds all values from the provided list to the current list.
     * The added values are logged to the changelog.
     * 
     * @param values The values to add
     * @throws Exception if addition fails
     */
    public void addAll(List<V> values) throws Exception;
    
    /**
     * Clears all values from the list.
     * The clear operation is logged to the changelog.
     */
    public void clear();
    
    /**
     * Updates the list internally with the provided values.
     * This is an internal method used by the state backend.
     * 
     * @param valueToStore The list of values to store internally
     * @throws Exception if internal update fails
     */
    public void updateInternal(List<V> valueToStore) throws Exception;
    
    /**
     * Gets the internal list representation.
     * This is an internal method used by the state backend.
     * 
     * @return The internal list of values
     * @throws Exception if internal retrieval fails
     */
    public List<V> getInternal() throws Exception;
    
    /**
     * Merges the state of the source namespaces into the target namespace.
     * This is used when windows merge (for example, session windows).
     * 
     * @param target The target namespace to merge into
     * @param sources The source namespaces to merge from
     * @throws Exception if namespace merging fails
     */
    public void mergeNamespaces(N target, Collection<N> sources) throws Exception;
    
    /**
     * Static factory method for creating changelog list state instances.
     * Used internally by the state backend factory system.
     * 
     * @param listState The underlying list state to wrap
     * @return Changelog-enabled list state instance
     */
    static <K, N, SV, S extends State, IS extends S> IS create(InternalKvState<K, N, SV> listState);
}

Usage Example:

import java.util.Arrays;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;

// Inside a rich function applied to a KeyedStream
ListStateDescriptor<String> descriptor = 
    new ListStateDescriptor<>("my-list", String.class);
ListState<String> listState = getRuntimeContext().getListState(descriptor);

// add() and update() are logged to the changelog
listState.add("item1");
listState.add("item2");
listState.update(Arrays.asList("new1", "new2", "new3")); // replaces the whole list

for (String item : listState.get()) {
    // Iteration is a read and is not logged; yields new1, new2, new3
}
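`mergeNamespaces` is the least obvious list operation, so here is a hypothetical sketch of a namespaced list state (one list per namespace, e.g. per window) that logs each mutation, including merges. `LoggingListState` and its log format are illustrative names, not Flink API. In this sketch the source lists are simply appended to the target in source order.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical namespaced list state that logs every mutation.
class LoggingListState<N, V> {
    private final Map<N, List<V>> lists = new HashMap<>(); // stand-in for the delegate
    final List<String> changelog = new ArrayList<>();
    private N current;

    void setCurrentNamespace(N namespace) {
        current = namespace;
    }

    List<V> get() {
        // Read: returns a copy and logs nothing.
        return new ArrayList<>(lists.getOrDefault(current, new ArrayList<>()));
    }

    void add(V value) {
        lists.computeIfAbsent(current, n -> new ArrayList<>()).add(value);
        changelog.add("ADD " + current + " " + value);
    }

    void update(List<V> values) {
        lists.put(current, new ArrayList<>(values));
        changelog.add("REPLACE " + current + " " + values);
    }

    // Moves the elements of all source namespaces into the target namespace,
    // e.g. when two session windows merge into one.
    void mergeNamespaces(N target, Collection<N> sources) {
        List<V> merged = new ArrayList<>();
        for (N source : sources) {
            List<V> part = lists.remove(source);
            if (part != null) {
                merged.addAll(part);
            }
        }
        if (!merged.isEmpty()) {
            lists.computeIfAbsent(target, n -> new ArrayList<>()).addAll(merged);
        }
        changelog.add("MERGE " + sources + " -> " + target);
    }
}

class ListStateSketch {
    public static void main(String[] args) {
        LoggingListState<String, String> state = new LoggingListState<>();
        state.setCurrentNamespace("w1");
        state.add("a");
        state.setCurrentNamespace("w2");
        state.add("b");
        state.mergeNamespaces("w3", List.of("w1", "w2"));
        state.setCurrentNamespace("w3");
        System.out.println(state.get());     // [a, b]
        System.out.println(state.changelog); // [ADD w1 a, ADD w2 b, MERGE [w1, w2] -> w3]
    }
}
```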

Map State Operations

Map state maintains key-value mappings per key, supporting put, get, remove, and iteration operations with changelog logging.

/**
 * Changelog-enabled map state that logs all map modifications.
 * Wraps Flink's InternalMapState with transparent logging.
 */
class ChangelogMapState<K, N, UK, UV> extends AbstractChangelogState<K, N, Map<UK, UV>, InternalMapState<K, N, UK, UV>>
    implements InternalMapState<K, N, UK, UV> {
    
    /**
     * Gets the value for the specified map key.
     * 
     * @param key The map key to look up
     * @return The value associated with the key, or null if not present
     * @throws Exception if value retrieval fails
     */
    public UV get(UK key) throws Exception;
    
    /**
     * Puts a key-value pair into the map.
     * The put operation is logged to the changelog.
     * 
     * @param key The map key
     * @param value The value to associate with the key
     * @throws Exception if put operation fails
     */
    public void put(UK key, UV value) throws Exception;
    
    /**
     * Puts all entries from the provided map into the state map.
     * The added entries are logged to the changelog.
     * 
     * @param map The map containing entries to add
     * @throws Exception if put operations fail
     */
    public void putAll(Map<UK, UV> map) throws Exception;
    
    /**
     * Removes the entry for the specified key.
     * The remove operation is logged to the changelog.
     * 
     * @param key The key to remove
     * @throws Exception if remove operation fails
     */
    public void remove(UK key) throws Exception;
    
    /**
     * Checks if the map contains the specified key.
     * 
     * @param key The key to check
     * @return true if the key exists, false otherwise
     * @throws Exception if check operation fails
     */
    public boolean contains(UK key) throws Exception;
    
    /**
     * Gets all entries in the map.
     * 
     * @return Iterable over all key-value entries
     * @throws Exception if entry retrieval fails
     */
    public Iterable<Map.Entry<UK, UV>> entries() throws Exception;
    
    /**
     * Gets all keys in the map.
     * 
     * @return Iterable over all keys
     * @throws Exception if key retrieval fails
     */
    public Iterable<UK> keys() throws Exception;
    
    /**
     * Gets all values in the map.
     * 
     * @return Iterable over all values
     * @throws Exception if value retrieval fails
     */
    public Iterable<UV> values() throws Exception;
    
    /**
     * Gets an iterator over all entries in the map.
     * 
     * @return Iterator over key-value entries
     * @throws Exception if iterator creation fails
     */
    public Iterator<Map.Entry<UK, UV>> iterator() throws Exception;
    
    /**
     * Checks if the map is empty.
     * 
     * @return true if the map contains no entries, false otherwise
     * @throws Exception if empty check fails
     */
    public boolean isEmpty() throws Exception;
    
    /**
     * Clears all entries from the map.
     * The clear operation is logged to the changelog.
     */
    public void clear();
    
    /**
     * Static factory method for creating changelog map state instances.
     * Used internally by the state backend factory system.
     * 
     * @param mapState The underlying map state to wrap
     * @return Changelog-enabled map state instance
     */
    static <UK, UV, K, N, SV, S extends State, IS extends S> IS create(InternalKvState<K, N, SV> mapState);
}

Usage Example:

import java.util.Map;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;

// Inside a rich function applied to a KeyedStream
MapStateDescriptor<String, Long> descriptor = 
    new MapStateDescriptor<>("my-map", String.class, Long.class);
MapState<String, Long> mapState = getRuntimeContext().getMapState(descriptor);

// put() and remove() are logged to the changelog
mapState.put("key1", 100L);
mapState.put("key2", 200L);

Long value = mapState.get("key1"); // read: not logged
mapState.remove("key2");

for (Map.Entry<String, Long> entry : mapState.entries()) {
    // Iteration is a read and is not logged
}
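A wrapper that hands out iterators must also catch mutations made through them. Otherwise `Iterator.remove()` would change the state without leaving a log entry. The hypothetical sketch below uses illustrative names, not Flink's classes. It logs `put` and `remove` and wraps the entry iterator so that removals made through it are logged too. Entries are returned as immutable copies, so `setValue` cannot bypass the log.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical map-state wrapper that logs puts and removes, including
// removals made through the iterator it returns.
class LoggingMapState<UK, UV> {
    private final Map<UK, UV> delegate = new HashMap<>(); // stand-in for the wrapped state
    final List<String> changelog = new ArrayList<>();

    UV get(UK key) { return delegate.get(key); }          // read: not logged
    boolean contains(UK key) { return delegate.containsKey(key); }

    void put(UK key, UV value) {
        delegate.put(key, value);
        changelog.add("PUT " + key + "=" + value);
    }

    void remove(UK key) {
        delegate.remove(key);
        changelog.add("REMOVE " + key);
    }

    Iterator<Map.Entry<UK, UV>> iterator() {
        Iterator<Map.Entry<UK, UV>> inner = delegate.entrySet().iterator();
        return new Iterator<Map.Entry<UK, UV>>() {
            private UK lastKey;

            @Override
            public boolean hasNext() { return inner.hasNext(); }

            @Override
            public Map.Entry<UK, UV> next() {
                Map.Entry<UK, UV> entry = inner.next();
                lastKey = entry.getKey();
                // Immutable copy: setValue() would otherwise be an unlogged write.
                return new AbstractMap.SimpleImmutableEntry<>(entry);
            }

            @Override
            public void remove() {
                inner.remove();                     // IllegalStateException if next() was not called
                changelog.add("REMOVE " + lastKey); // log the removal made via the iterator
            }
        };
    }
}

class MapStateSketch {
    public static void main(String[] args) {
        LoggingMapState<String, Long> state = new LoggingMapState<>();
        state.put("key1", 100L);
        state.put("key2", 200L);
        Iterator<Map.Entry<String, Long>> it = state.iterator();
        while (it.hasNext()) {
            if (it.next().getKey().equals("key2")) {
                it.remove();
            }
        }
        System.out.println(state.changelog); // [PUT key1=100, PUT key2=200, REMOVE key2]
    }
}
```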

Reducing State Operations

Reducing state maintains a single value that is updated using a reduce function, with changelog logging for all reductions.

/**
 * Changelog-enabled reducing state that logs all reduce operations.
 * Wraps Flink's InternalReducingState with transparent logging.
 */
class ChangelogReducingState<K, N, V> extends AbstractChangelogState<K, N, V, InternalReducingState<K, N, V>>
    implements InternalReducingState<K, N, V> {
    
    /**
     * Gets the current reduced value.
     * 
     * @return The current reduced value, or null if no value has been added
     * @throws Exception if value retrieval fails
     */
    public V get() throws Exception;
    
    /**
     * Adds a value to the reducing state, applying the reduce function.
     * The resulting state change is logged to the changelog.
     * 
     * @param value The value to add/reduce
     * @throws Exception if add operation fails
     */
    public void add(V value) throws Exception;
    
    /**
     * Clears the reducing state.
     * The clear operation is logged to the changelog.
     */
    public void clear();
    
    /**
     * Static factory method for creating changelog reducing state instances.
     * Used internally by the state backend factory system.
     * 
     * @param reducingState The underlying reducing state to wrap
     * @return Changelog-enabled reducing state instance
     */
    static <K, N, SV, S extends State, IS extends S> IS create(InternalKvState<K, N, SV> reducingState);
}

Usage Example:

import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.functions.ReduceFunction;

// Inside a rich function applied to a KeyedStream
ReducingStateDescriptor<Long> descriptor = new ReducingStateDescriptor<>(
    "sum-state",
    new ReduceFunction<Long>() {
        @Override
        public Long reduce(Long value1, Long value2) throws Exception {
            return value1 + value2;
        }
    },
    Long.class
);
ReducingState<Long> reducingState = getRuntimeContext().getReducingState(descriptor);

// Each add() is logged to the changelog; get() is a read and is not logged
reducingState.add(10L);
reducingState.add(20L);
Long sum = reducingState.get(); // Returns 30L
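The Javadoc above says the resulting state change is logged. One way to read that is that each `add` records the new reduced value rather than the raw input, so replaying the log never needs the reduce function. The hypothetical sketch below follows that reading. The names are illustrative, and `BinaryOperator` stands in for Flink's `ReduceFunction`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

// Hypothetical reducing-state wrapper that logs the reduced result after each add.
class LoggingReducingState<V> {
    private final BinaryOperator<V> reduceFunction;
    private V current;                                // stand-in for the wrapped state
    final List<String> changelog = new ArrayList<>();

    LoggingReducingState(BinaryOperator<V> reduceFunction) {
        this.reduceFunction = reduceFunction;
    }

    V get() {
        return current; // null if nothing has been added
    }

    void add(V value) {
        current = (current == null) ? value : reduceFunction.apply(current, value);
        changelog.add("SET " + current); // log the result, not the input
    }

    void clear() {
        current = null;
        changelog.add("CLEAR");
    }
}

class ReducingStateSketch {
    public static void main(String[] args) {
        LoggingReducingState<Long> sum = new LoggingReducingState<>(Long::sum);
        sum.add(10L);
        sum.add(20L);
        System.out.println(sum.get());     // 30
        System.out.println(sum.changelog); // [SET 10, SET 30]
    }
}
```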

Aggregating State Operations

Aggregating state maintains an accumulator that is updated using aggregation functions, with changelog logging for all aggregations.

/**
 * Changelog-enabled aggregating state that logs all aggregation operations.
 * Wraps Flink's InternalAggregatingState with transparent logging.
 */
class ChangelogAggregatingState<K, N, IN, ACC, OUT> extends AbstractChangelogState<K, N, ACC, InternalAggregatingState<K, N, IN, ACC, OUT>>
    implements InternalAggregatingState<K, N, IN, ACC, OUT> {
    
    /**
     * Gets the current aggregated result.
     * 
     * @return The current aggregated output value
     * @throws Exception if value retrieval fails
     */
    public OUT get() throws Exception;
    
    /**
     * Adds an input value to the aggregating state.
     * The aggregation is performed and the state change is logged to the changelog.
     * 
     * @param value The input value to aggregate
     * @throws Exception if add operation fails
     */
    public void add(IN value) throws Exception;
    
    /**
     * Clears the aggregating state.
     * The clear operation is logged to the changelog.
     */
    public void clear();
    
    /**
     * Static factory method for creating changelog aggregating state instances.
     * Used internally by the state backend factory system.
     * 
     * @param aggregatingState The underlying aggregating state to wrap
     * @return Changelog-enabled aggregating state instance
     */
    static <T, K, N, SV, S extends State, IS extends S> IS create(InternalKvState<K, N, SV> aggregatingState);
}

Usage Example:

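import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.state.AggregatingState;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;

// Inside a rich function applied to a KeyedStream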
AggregatingStateDescriptor<Double, Tuple2<Double, Long>, Double> descriptor = 
    new AggregatingStateDescriptor<>(
        "average-state",
        new AggregateFunction<Double, Tuple2<Double, Long>, Double>() {
            @Override
            public Tuple2<Double, Long> createAccumulator() {
                return Tuple2.of(0.0, 0L);
            }
            
            @Override
            public Tuple2<Double, Long> add(Double value, Tuple2<Double, Long> accumulator) {
                return Tuple2.of(accumulator.f0 + value, accumulator.f1 + 1);
            }
            
            @Override
            public Double getResult(Tuple2<Double, Long> accumulator) {
                return accumulator.f1 == 0 ? 0.0 : accumulator.f0 / accumulator.f1;
            }
            
            @Override
            public Tuple2<Double, Long> merge(Tuple2<Double, Long> a, Tuple2<Double, Long> b) {
                return Tuple2.of(a.f0 + b.f0, a.f1 + b.f1);
            }
        },
        Types.TUPLE(Types.DOUBLE, Types.LONG)
    );

AggregatingState<Double, Double> aggregatingState = getRuntimeContext().getAggregatingState(descriptor);

// Each add() is logged to the changelog; get() is a read and is not logged
aggregatingState.add(10.0);
aggregatingState.add(20.0);
aggregatingState.add(30.0);
Double average = aggregatingState.get(); // Returns 20.0
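The class declaration above stores `ACC`, not `OUT`. The accumulator is what the state holds, so it is what a logged change must capture, while the output is derived on each `get()`. Below is a hypothetical sketch of that split. Plain `java.util.function` types stand in for Flink's `AggregateFunction`, and `SumCount` is an illustrative immutable accumulator for the average example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;

// Illustrative immutable accumulator for an average: (sum, count).
final class SumCount {
    final double sum;
    final long count;

    SumCount(double sum, long count) {
        this.sum = sum;
        this.count = count;
    }
}

// Hypothetical aggregating-state wrapper: stores and logs ACC, derives OUT on read.
class LoggingAggregatingState<IN, ACC, OUT> {
    private final Supplier<ACC> createAccumulator;
    private final BiFunction<IN, ACC, ACC> addFunction;
    private final Function<ACC, OUT> getResult;
    private ACC accumulator;                       // stand-in for the wrapped state
    final List<ACC> changelog = new ArrayList<>(); // each entry: accumulator after an add

    LoggingAggregatingState(Supplier<ACC> createAccumulator,
                            BiFunction<IN, ACC, ACC> addFunction,
                            Function<ACC, OUT> getResult) {
        this.createAccumulator = createAccumulator;
        this.addFunction = addFunction;
        this.getResult = getResult;
    }

    OUT get() {
        return accumulator == null ? null : getResult.apply(accumulator);
    }

    void add(IN value) {
        ACC base = (accumulator == null) ? createAccumulator.get() : accumulator;
        accumulator = addFunction.apply(value, base);
        changelog.add(accumulator); // ACC is immutable here, so logged entries never change
    }
}

class AggregatingStateSketch {
    public static void main(String[] args) {
        LoggingAggregatingState<Double, SumCount, Double> avg =
            new LoggingAggregatingState<Double, SumCount, Double>(
                () -> new SumCount(0.0, 0L),
                (value, acc) -> new SumCount(acc.sum + value, acc.count + 1),
                acc -> acc.count == 0 ? 0.0 : acc.sum / acc.count);
        avg.add(10.0);
        avg.add(20.0);
        avg.add(30.0);
        System.out.println(avg.get());            // 20.0
        System.out.println(avg.changelog.size()); // 3
    }
}
```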

Priority Queue Operations

Key-grouped priority queues, which back timers and other ordered processing, are wrapped as well. Additions and removals are logged to the changelog.

/**
 * Changelog-enabled priority queue that logs all queue operations.
 * Wraps Flink's KeyGroupedInternalPriorityQueue with transparent logging.
 */
class ChangelogKeyGroupedPriorityQueue<T extends HeapPriorityQueueElement & PriorityComparable<? super T> & Keyed<?>>
    implements KeyGroupedInternalPriorityQueue<T> {
    
    /**
     * Adds an element to the priority queue if it is not already contained.
     * The addition is logged to the changelog.
     * 
     * @param element The element to add
     * @return true if the operation changed the head element (or if it is unclear
     *         whether the head changed); false only if the head is unchanged
     */
    public boolean add(T element);
    
    /**
     * Removes and returns the first element according to the queue's order
     * (for timers, the one with the earliest timestamp).
     * The removal is logged to the changelog.
     * 
     * @return The first element, or null if the queue is empty
     */
    public T poll();
    
    /**
     * Returns the first element according to the queue's order without removing it.
     * 
     * @return The first element, or null if the queue is empty
     */
    public T peek();
    
    /**
     * Removes the specified element from the queue, if it is contained.
     * The removal is logged to the changelog.
     * 
     * @param element The element to remove
     * @return true if the operation changed the head element (or if it is unclear
     *         whether the head changed); false only if the head is unchanged
     */
    public boolean remove(T element);
    
    /**
     * Checks if the queue is empty.
     * 
     * @return true if the queue contains no elements
     */
    public boolean isEmpty();
    
    /**
     * Gets the number of elements in the queue.
     * 
     * @return The number of elements
     */
    public int size();
    
    /**
     * Adds all elements from the provided collection to the queue.
     * The additions are logged to the changelog.
     * 
     * @param toAdd Collection of elements to add (can be null)
     */
    public void addAll(@Nullable Collection<? extends T> toAdd);
    
    /**
     * Gets a subset of elements for a specific key group.
     * Used for key group-based partitioning.
     * 
     * @param keyGroupId The key group identifier
     * @return Set of elements belonging to the key group
     */
    public Set<T> getSubsetForKeyGroup(int keyGroupId);
    
    /**
     * Gets an iterator over all elements in the queue.
     * 
     * @return CloseableIterator for traversing queue elements
     */
    public CloseableIterator<T> iterator();
}
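Here is a hypothetical sketch of the queue wrapper, using `java.util.PriorityQueue` as the wrapped queue. Additions, polls, and removals are logged. Duplicates are ignored, and `add`/`remove` report whether the head changed. A caller such as a timer service can use that flag to decide whether its next wake-up needs rescheduling. `LoggingPriorityQueue` and its log format are illustrative, not Flink classes, and the sketch omits key groups.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.PriorityQueue;

// Hypothetical priority-queue wrapper (set semantics) that logs mutations.
class LoggingPriorityQueue<T extends Comparable<? super T>> {
    private final PriorityQueue<T> queue = new PriorityQueue<>(); // stand-in for the delegate
    final List<String> changelog = new ArrayList<>();

    // Returns true if the head element changed.
    boolean add(T element) {
        if (queue.contains(element)) {
            return false;              // already present: no change, nothing logged
        }
        T oldHead = queue.peek();
        queue.add(element);
        changelog.add("ADD " + element);
        return !Objects.equals(oldHead, queue.peek());
    }

    T poll() {
        T head = queue.poll();
        if (head != null) {
            changelog.add("POLL " + head);
        }
        return head;
    }

    T peek() {
        return queue.peek();           // read: not logged
    }

    // Returns true if the head element changed.
    boolean remove(T element) {
        T oldHead = queue.peek();
        if (queue.remove(element)) {
            changelog.add("REMOVE " + element);
        }
        return !Objects.equals(oldHead, queue.peek());
    }

    int size() {
        return queue.size();
    }
}

class PriorityQueueSketch {
    public static void main(String[] args) {
        LoggingPriorityQueue<Long> timers = new LoggingPriorityQueue<>();
        System.out.println(timers.add(30L));  // true: first element becomes the head
        System.out.println(timers.add(10L));  // true: 10 is earlier than 30
        System.out.println(timers.add(20L));  // false: head is still 10
        System.out.println(timers.poll());    // 10
        System.out.println(timers.changelog); // [ADD 30, ADD 10, ADD 20, POLL 10]
    }
}
```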

Abstract State Base Class

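The keyed changelog state types above (Value, List, Map, Reducing, and Aggregating) extend a common abstract base class that holds the delegated state and exposes its serializers. The priority queue wrapper does not extend it; it implements KeyGroupedInternalPriorityQueue directly.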

/**
 * Base class for all changelog state wrappers.
 * Provides common functionality for delegation and serialization.
 */
abstract class AbstractChangelogState<K, N, V, S extends InternalKvState<K, N, V>>
    implements InternalKvState<K, N, V> {
    
    /**
     * Gets the underlying delegated state instance.
     * 
     * @return The wrapped state object
     */
    public S getDelegatedState();
    
    /**
     * Gets the key serializer.
     * 
     * @return TypeSerializer for key type K
     */
    public TypeSerializer<K> getKeySerializer();
    
    /**
     * Gets the namespace serializer.
     * 
     * @return TypeSerializer for namespace type N
     */
    public TypeSerializer<N> getNamespaceSerializer();
    
    /**
     * Gets the value serializer.
     * 
     * @return TypeSerializer for value type V
     */
    public TypeSerializer<V> getValueSerializer();
    
    /**
     * Sets the current namespace for state operations.
     * 
     * @param namespace The namespace to set as current
     */
    public void setCurrentNamespace(N namespace);
    
    /**
     * Gets the serialized value for the given serialized key and namespace.
     * 
     * @param serializedKeyAndNamespace Serialized key and namespace
     * @param safeKeySerializer Safe key serializer
     * @param safeNamespaceSerializer Safe namespace serializer
     * @param safeValueSerializer Safe value serializer
     * @return Serialized value bytes
     * @throws Exception if serialization fails
     */
    public byte[] getSerializedValue(
        byte[] serializedKeyAndNamespace,
        TypeSerializer<K> safeKeySerializer,
        TypeSerializer<N> safeNamespaceSerializer,
        TypeSerializer<V> safeValueSerializer
    ) throws Exception;
}
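Much of the base class's job is bookkeeping. It holds the delegate and keeps it on the same namespace as the wrapper, so every logged change is tagged with the namespace it applies to. Below is a hypothetical sketch of that structure. `KvState`, `HeapKvState`, `AbstractLoggingState`, and `LoggingCounter` are illustrative names, not Flink types, and the sketch omits keys and serializers.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical minimal contract for a namespace-scoped state.
interface KvState<N, V> {
    void setCurrentNamespace(N namespace);
    V read();
    void write(V value);
}

// Hypothetical in-memory delegate: one value per namespace.
class HeapKvState<N, V> implements KvState<N, V> {
    private final Map<N, V> values = new HashMap<>();
    private N namespace;

    public void setCurrentNamespace(N namespace) { this.namespace = namespace; }
    public V read() { return values.get(namespace); }
    public void write(V value) { values.put(namespace, value); }
}

// Hypothetical base class: holds the delegate, keeps both on the same
// namespace, and tags every log entry with that namespace.
abstract class AbstractLoggingState<N, V, S extends KvState<N, V>> {
    protected final S delegate;
    final List<String> changelog = new ArrayList<>();
    private N currentNamespace;

    AbstractLoggingState(S delegate) {
        this.delegate = delegate;
    }

    S getDelegatedState() {
        return delegate;
    }

    void setCurrentNamespace(N namespace) {
        currentNamespace = namespace;
        delegate.setCurrentNamespace(namespace); // keep wrapper and delegate in sync
    }

    protected void log(String operation, Object payload) {
        changelog.add(operation + " ns=" + currentNamespace + " " + payload);
    }
}

// Hypothetical concrete wrapper: a per-namespace counter.
class LoggingCounter<N> extends AbstractLoggingState<N, Long, HeapKvState<N, Long>> {
    LoggingCounter() {
        super(new HeapKvState<>());
    }

    long get() {
        Long value = delegate.read();
        return value == null ? 0L : value;
    }

    void increment() {
        long next = get() + 1;
        delegate.write(next);
        log("SET", next);
    }
}

class BaseStateSketch {
    public static void main(String[] args) {
        LoggingCounter<String> counter = new LoggingCounter<>();
        counter.setCurrentNamespace("w1");
        counter.increment();
        counter.increment();
        counter.setCurrentNamespace("w2");
        counter.increment();
        System.out.println(counter.changelog); // [SET ns=w1 1, SET ns=w1 2, SET ns=w2 1]
    }
}
```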

State Factory System

The changelog state backend uses a factory system to create appropriate state wrappers for each state type.

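// Internal factory interface for creating changelog states.
// Simplified: the signature mirrors the create(...) factory methods documented above,
// so that ChangelogValueState::create and the others can serve as implementations.
interface StateFactory {
    <K, N, SV, S extends State, IS extends S> IS create(InternalKvState<K, N, SV> originalState);
}

// Factory mapping for different state types (declared inside the changelog keyed state backend)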
private static final Map<StateDescriptor.Type, StateFactory> STATE_FACTORIES = Map.of(
    StateDescriptor.Type.VALUE, ChangelogValueState::create,
    StateDescriptor.Type.LIST, ChangelogListState::create,
    StateDescriptor.Type.REDUCING, ChangelogReducingState::create,
    StateDescriptor.Type.AGGREGATING, ChangelogAggregatingState::create,
    StateDescriptor.Type.MAP, ChangelogMapState::create
);
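The factory map works as a dispatch table from descriptor type to wrapper constructor. Below is a hypothetical plain-Java sketch of that dispatch, including a loud failure for a type with no registered factory. The enum, `Wrapped`, and `StateFactories` are illustrative, not Flink types.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical state types; UNKNOWN has no registered factory.
enum StateType { UNKNOWN, VALUE, LIST, REDUCING, AGGREGATING, MAP }

// Hypothetical wrapper that records which changelog wrapper was chosen.
final class Wrapped {
    final String wrapperName;
    final Object original;

    Wrapped(String wrapperName, Object original) {
        this.wrapperName = wrapperName;
        this.original = original;
    }
}

final class StateFactories {
    private static final Map<StateType, Function<Object, Wrapped>> FACTORIES =
        new EnumMap<>(StateType.class);

    static {
        FACTORIES.put(StateType.VALUE, s -> new Wrapped("ChangelogValueState", s));
        FACTORIES.put(StateType.LIST, s -> new Wrapped("ChangelogListState", s));
        FACTORIES.put(StateType.REDUCING, s -> new Wrapped("ChangelogReducingState", s));
        FACTORIES.put(StateType.AGGREGATING, s -> new Wrapped("ChangelogAggregatingState", s));
        FACTORIES.put(StateType.MAP, s -> new Wrapped("ChangelogMapState", s));
    }

    private StateFactories() {}

    // Looks up the factory for the type and wraps the original state with it.
    static Wrapped wrap(StateType type, Object originalState) {
        Function<Object, Wrapped> factory = FACTORIES.get(type);
        if (factory == null) {
            throw new IllegalStateException("No changelog wrapper for state type " + type);
        }
        return factory.apply(originalState);
    }

    public static void main(String[] args) {
        System.out.println(wrap(StateType.MAP, "heap map state").wrapperName); // ChangelogMapState
    }
}
```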

Changelog Integration

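All state mutations are transparently logged to the changelog; reads are served by the wrapped state and are not logged:

  • Automatic Logging: State mutations are logged without requiring changes to user code
  • Operation Granularity: Individual mutating operations (put, add, remove, etc.) are logged separately
  • Type Safety: Changelog entries are written with the state's key, namespace, and value serializers, so they deserialize to the correct types on recovery
  • Performance: Each mutation costs an extra append to the changelog; in return, checkpoints can persist recent changes instead of a full snapshot of the underlying state backend
  • Consistency: On recovery, state is rebuilt from the last materialized snapshot of the underlying state backend plus the changelog entries recorded after it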
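The consistency property can be shown with a simplified, hypothetical model of a single map state. A full copy of the state ("materialization"), plus the log of changes made after it, is enough to rebuild the live state. The names are illustrative. As a simplification, the sketch truncates the log right after each materialization.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical change record: a put (value != null) or a remove (value == null).
final class Change {
    final String key;
    final Long value;

    Change(String key, Long value) {
        this.key = key;
        this.value = value;
    }
}

class ChangelogReplaySketch {
    final Map<String, Long> live = new HashMap<>(); // current state
    final List<Change> log = new ArrayList<>();     // changes since the last materialization
    private Map<String, Long> materialized = new HashMap<>();

    void put(String key, long value) {
        live.put(key, value);
        log.add(new Change(key, value));
    }

    void remove(String key) {
        live.remove(key);
        log.add(new Change(key, null));
    }

    // Full copy of the state; the log entries it covers are no longer needed.
    void materialize() {
        materialized = new HashMap<>(live);
        log.clear();
    }

    // Recovery = last materialized copy + replay of the changes logged after it.
    Map<String, Long> restore() {
        Map<String, Long> restored = new HashMap<>(materialized);
        for (Change change : log) {
            if (change.value == null) {
                restored.remove(change.key);
            } else {
                restored.put(change.key, change.value);
            }
        }
        return restored;
    }

    public static void main(String[] args) {
        ChangelogReplaySketch state = new ChangelogReplaySketch();
        state.put("a", 1L);
        state.put("b", 2L);
        state.materialize();
        state.put("a", 10L);
        state.remove("b");
        System.out.println(state.restore().equals(state.live)); // true
    }
}
```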

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-statebackend-changelog
