
tessl/maven-org-apache-flink--flink-statebackend-changelog

Apache Flink changelog state backend implementation that provides durable state management with changelog capabilities for stream processing applications


Keyed State Management

Keyed state backend implementation that wraps an underlying keyed state backend and transparently logs every state change to a changelog. It remains fully compatible with Flink's state management APIs. This capability manages partitioned (keyed) state and enables changelog-based incremental checkpointing.

Capabilities

Key Management

Core key management operations for the current processing key in the keyed state backend.

/**
 * Sets the current key for state operations. All subsequent state operations
 * will be scoped to this key until a new key is set.
 * 
 * @param newKey The key to set as current
 */
public void setCurrentKey(K newKey);

/**
 * Gets the currently set key for state operations.
 * 
 * @return The current key, or null if no key is set
 */
public K getCurrentKey();

/**
 * Gets the serializer used for keys in this state backend.
 * 
 * @return TypeSerializer for key type K
 */
public TypeSerializer<K> getKeySerializer();
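The phrase "scoped to this key" can be illustrated with a minimal pure-JDK sketch. This is not the Flink implementation. `KeyScopedValueStore` is a hypothetical class that keeps one value per key. Reads and writes always target whichever key was last passed to `setCurrentKey`, mirroring the contract described above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of key-scoped state (not a Flink class).
class KeyScopedValueStore<K, V> {
    private final Map<K, V> valuesByKey = new HashMap<>();
    private K currentKey; // null until setCurrentKey is called

    void setCurrentKey(K newKey) {
        this.currentKey = newKey;
    }

    K getCurrentKey() {
        return currentKey;
    }

    // Reads the value stored for the current key (null if none).
    V value() {
        requireKey();
        return valuesByKey.get(currentKey);
    }

    // Writes the value for the current key only; other keys are untouched.
    void update(V newValue) {
        requireKey();
        valuesByKey.put(currentKey, newValue);
    }

    private void requireKey() {
        if (currentKey == null) {
            throw new IllegalStateException("No current key set");
        }
    }

    public static void main(String[] args) {
        KeyScopedValueStore<String, Long> counts = new KeyScopedValueStore<>();
        counts.setCurrentKey("alice");
        counts.update(1L);
        counts.setCurrentKey("bob");
        System.out.println(counts.value()); // null: "bob" has no value yet
        counts.setCurrentKey("alice");
        System.out.println(counts.value()); // 1
    }
}
```

In the real backend, the runtime sets the current key before invoking the user function for each record, so user code never calls `setCurrentKey` directly.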

Key Group Management

Operations for managing key groups, which are used for scaling and checkpoint distribution.

/**
 * Gets the range of key groups assigned to this state backend instance.
 * Key groups are used for partitioning state across parallel operators.
 * 
 * @return The KeyGroupRange assigned to this backend
 */
public KeyGroupRange getKeyGroupRange();
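The following pure-JDK sketch shows how keys map to key groups and how contiguous key-group ranges are split across parallel instances. `KeyGroupMath` and its methods are hypothetical names. As far as we know, the range arithmetic mirrors Flink's `KeyGroupRangeAssignment`. Flink also scrambles `hashCode()` with a murmur hash before taking the modulo; this sketch omits that step.

```java
// Simplified model of key-group assignment (not Flink's exact hashing).
class KeyGroupMath {
    // Maps a key to a key group in [0, maxParallelism).
    static int assignToKeyGroup(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // First key group (inclusive) owned by the given operator instance.
    static int rangeStart(int maxParallelism, int parallelism, int operatorIndex) {
        return (operatorIndex * maxParallelism + parallelism - 1) / parallelism;
    }

    // Last key group (inclusive) owned by the given operator instance.
    static int rangeEnd(int maxParallelism, int parallelism, int operatorIndex) {
        return ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
    }

    // Which operator instance owns a given key group.
    static int operatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroup) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        int parallelism = 3;
        for (int i = 0; i < parallelism; i++) {
            System.out.println("operator " + i + ": key groups "
                + rangeStart(maxParallelism, parallelism, i) + ".."
                + rangeEnd(maxParallelism, parallelism, i));
        }
        int kg = assignToKeyGroup("user-42", maxParallelism);
        System.out.println("user-42 -> key group " + kg + " -> operator "
            + operatorIndexForKeyGroup(maxParallelism, parallelism, kg));
    }
}
```

With a max parallelism of 128 and a parallelism of 3, the ranges are 0..42, 43..85 and 86..127. Because a key's group depends only on the max parallelism, rescaling moves whole key groups between instances rather than individual keys.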

State Access and Creation

Core methods for accessing and creating partitioned state with changelog functionality.

/**
 * Gets or creates a partitioned state for the specified namespace and descriptor.
 * This is the primary method for accessing state in Flink operators.
 * State operations will be transparently logged to the changelog.
 * 
 * @param namespace The namespace for the state (typically VoidNamespace or custom namespace)
 * @param namespaceSerializer Serializer for the namespace type
 * @param stateDescriptor Descriptor defining the state type and configuration
 * @return State instance of the requested type
 * @throws Exception if state creation or access fails
 */
public <N, S extends State> S getPartitionedState(
    N namespace,
    TypeSerializer<N> namespaceSerializer,
    StateDescriptor<S, ?> stateDescriptor
) throws Exception;

/**
 * Gets or creates a keyed state for the specified descriptor.
 * This method handles state creation with proper type safety and caching.
 * 
 * @param namespaceSerializer Serializer for the namespace type
 * @param stateDescriptor Descriptor defining the state type and configuration
 * @return State instance of the requested type
 * @throws Exception if state creation or access fails
 */
public <N, S extends State, T> S getOrCreateKeyedState(
    TypeSerializer<N> namespaceSerializer,
    StateDescriptor<S, T> stateDescriptor
) throws Exception;

/**
 * Creates internal state with snapshot transformation support.
 * This is used internally by the state backend for advanced state creation scenarios.
 * 
 * @param namespaceSerializer Serializer for the namespace type
 * @param stateDesc Descriptor defining the state type and configuration
 * @param snapshotTransformFactory Factory for state snapshot transformations
 * @return Internal state instance of the requested type
 * @throws Exception if state creation fails
 */
public <N, SV, SEV, S extends State, IS extends S> IS createInternalState(
    @Nonnull TypeSerializer<N> namespaceSerializer,
    @Nonnull StateDescriptor<S, SV> stateDesc,
    @Nonnull StateSnapshotTransformer.StateSnapshotTransformFactory<SEV> snapshotTransformFactory
) throws Exception;

Usage Example:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Runs unchanged on any keyed state backend. When the changelog is enabled for the
// job (e.g. state.backend.changelog.enabled: true), the keyed state is served by
// ChangelogKeyedStateBackend.
public class MyKeyedFunction extends KeyedProcessFunction<String, String, String> {

    private transient ValueState<Long> countState;

    @Override
    public void open(Configuration parameters) throws Exception {
        ValueStateDescriptor<Long> descriptor =
            new ValueStateDescriptor<>("count", Long.class);

        // With the changelog enabled, this returns a changelog-wrapped state
        countState = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        Long currentCount = countState.value();
        if (currentCount == null) {
            currentCount = 0L; // first element seen for this key
        }
        long newCount = currentCount + 1;

        // The update goes to the underlying backend and is also recorded in the changelog
        countState.update(newCount);

        out.collect("Processed: " + value + ", count: " + newCount);
    }
}
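"Transparently logged" can be modeled in plain Java as a wrapper that applies each write to a delegate and also appends it to a log. This is a minimal sketch under that assumption. `LoggingValueState` and `ValueChange` are hypothetical names, the `HashMap` stands in for the wrapped backend's state, and the `ArrayList` stands in for the changelog writer. The `replay` method shows why the log alone is enough to rebuild the state.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical wrapper: every write goes to the delegate AND is appended to the log.
class LoggingValueState<K, V> {
    private final Map<K, V> delegate = new HashMap<>();                  // stands in for the wrapped backend
    private final List<ValueChange<K, V>> changelog = new ArrayList<>(); // stands in for the changelog writer
    private K currentKey;

    void setCurrentKey(K key) {
        currentKey = key;
    }

    // Reads hit the delegate only; nothing is logged.
    V value() {
        return delegate.get(currentKey);
    }

    void update(V newValue) {
        delegate.put(currentKey, newValue);
        changelog.add(new ValueChange<>(currentKey, newValue));
    }

    void clear() {
        delegate.remove(currentKey);
        changelog.add(new ValueChange<>(currentKey, null)); // null records a removal
    }

    List<ValueChange<K, V>> changes() {
        return changelog;
    }

    // Rebuilds the state from the log alone.
    static <K, V> Map<K, V> replay(List<ValueChange<K, V>> changes) {
        Map<K, V> state = new HashMap<>();
        for (ValueChange<K, V> change : changes) {
            if (change.newValue == null) {
                state.remove(change.key);
            } else {
                state.put(change.key, change.newValue);
            }
        }
        return state;
    }

    public static void main(String[] args) {
        LoggingValueState<String, Long> state = new LoggingValueState<>();
        state.setCurrentKey("a");
        state.update(1L);
        state.update(2L);
        System.out.println(replay(state.changes())); // {a=2}
    }
}

// One logged change: the key that was written and its new value (null means cleared).
final class ValueChange<K, V> {
    final K key;
    final V newValue;

    ValueChange(K key, V newValue) {
        this.key = key;
        this.newValue = newValue;
    }
}
```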

State Querying

Methods for querying existing state keys and namespaces.

/**
 * Gets all keys for a specific state and namespace.
 * Useful for state inspection and debugging.
 * 
 * @param state Name of the state
 * @param namespace Namespace to query
 * @return Stream of keys that have state for the given state name and namespace
 */
public <N> Stream<K> getKeys(String state, N namespace);

/**
 * Gets all key-namespace pairs for a specific state.
 * Provides a complete view of which key-namespace pairs hold state.
 * 
 * @param state Name of the state to query
 * @return Stream of key-namespace tuples
 */
public <N> Stream<Tuple2<K, N>> getKeysAndNamespaces(String state);
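The two query methods return lazy `Stream`s over the (key, namespace) pairs stored for a state. The pure-JDK sketch below mirrors that shape with a hypothetical `StateIndex` class. `Map.Entry` stands in for Flink's `Tuple2`, and an in-memory map stands in for the backend's storage.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Stream;

// Hypothetical in-memory index: state name -> (key, namespace) -> value.
class StateIndex<K> {
    private final Map<String, Map<Map.Entry<K, Object>, Object>> states = new HashMap<>();

    void put(String stateName, K key, Object namespace, Object value) {
        states.computeIfAbsent(stateName, n -> new HashMap<>())
              .put(new SimpleEntry<>(key, namespace), value);
    }

    // Analogous to getKeys(state, namespace): keys holding state in that namespace.
    <N> Stream<K> getKeys(String stateName, N namespace) {
        return entriesOf(stateName)
            .filter(e -> Objects.equals(e.getValue(), namespace))
            .map(Map.Entry::getKey);
    }

    // Analogous to getKeysAndNamespaces(state): every (key, namespace) pair.
    @SuppressWarnings("unchecked")
    <N> Stream<Map.Entry<K, N>> getKeysAndNamespaces(String stateName) {
        return entriesOf(stateName).map(e -> {
            Map.Entry<K, N> pair = new SimpleEntry<>(e.getKey(), (N) e.getValue());
            return pair;
        });
    }

    private Stream<Map.Entry<K, Object>> entriesOf(String stateName) {
        return states.getOrDefault(stateName, Map.of()).keySet().stream();
    }

    public static void main(String[] args) {
        StateIndex<String> index = new StateIndex<>();
        index.put("count", "alice", "window-1", 3L);
        index.put("count", "bob", "window-1", 5L);
        index.put("count", "alice", "window-2", 1L);
        index.getKeys("count", "window-1").sorted().forEach(System.out::println); // alice, bob
        System.out.println(index.getKeysAndNamespaces("count").count());          // 3
    }
}
```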

State Operations

Bulk operations for applying functions across all keys in a state.

/**
 * Applies a function to all keys in a specific state and namespace.
 * This is useful for state migration, cleanup, or bulk operations.
 * 
 * @param namespace The namespace to operate on
 * @param namespaceSerializer Serializer for the namespace
 * @param stateDescriptor Descriptor for the state
 * @param function Function to apply to each key's state
 * @throws Exception if the operation fails
 */
public <N, S extends State, T> void applyToAllKeys(
    N namespace,
    TypeSerializer<N> namespaceSerializer,
    StateDescriptor<S, T> stateDescriptor,
    KeyedStateFunction<K, S> function
) throws Exception;
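Conceptually, `applyToAllKeys` visits every key that has state, makes that key current, and hands the key-scoped state to the function. The following is a pure-JDK sketch of that loop. `KeyedStateFn` only imitates the shape of Flink's `KeyedStateFunction` (`process(key, state)`), and `CounterState` and `BulkOps` are hypothetical. The sketch copies the key set before iterating so the function can clear entries safely. In the changelog backend, any writes the function makes would also be logged.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class BulkOps {
    // Visits every key that currently holds state and applies fn to it.
    static <K> void applyToAllKeys(CounterState<K> state, KeyedStateFn<K, CounterState<K>> fn) throws Exception {
        List<K> keys = new ArrayList<>(state.counts.keySet()); // copy: fn may remove entries
        for (K key : keys) {
            state.currentKey = key; // scope the state to this key, like setCurrentKey
            fn.process(key, state);
        }
    }

    public static void main(String[] args) throws Exception {
        CounterState<String> counters = new CounterState<>();
        counters.counts.put("a", 3L);
        counters.counts.put("b", 10L);
        // Drop small counters, double the rest.
        applyToAllKeys(counters, (key, state) -> {
            if (state.get() < 5) state.clear(); else state.set(state.get() * 2);
        });
        System.out.println(counters.counts); // {b=20}
    }
}

// Same shape as Flink's KeyedStateFunction; hypothetical here.
interface KeyedStateFn<K, S> {
    void process(K key, S state) throws Exception;
}

// Hypothetical counter state scoped to a current key.
class CounterState<K> {
    final Map<K, Long> counts = new HashMap<>();
    K currentKey;

    long get() { return counts.getOrDefault(currentKey, 0L); }
    void set(long v) { counts.put(currentKey, v); }
    void clear() { counts.remove(currentKey); }
}
```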

Priority Queue Creation

Creates priority queues for timer and event-time processing with changelog support.

/**
 * Creates a key-grouped priority queue for timers and ordered event processing.
 * The priority queue operations will be logged to the changelog.
 * 
 * @param stateName Name identifier for the priority queue
 * @param byteOrderedElementSerializer Serializer for queue elements
 * @return KeyGroupedInternalPriorityQueue with changelog capabilities
 */
public <T extends HeapPriorityQueueElement & PriorityComparable<? super T> & Keyed<?>>
KeyGroupedInternalPriorityQueue<T> create(
    String stateName,
    TypeSerializer<T> byteOrderedElementSerializer
);
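A key-grouped priority queue keeps elements partitioned by key group, so timers can be snapshotted and redistributed together with the rest of the key group's state. The sketch below models this with one heap per key group and logs every add and removal, as the changelog variant does. `KeyGroupedTimerQueue` and `TimerEntry` are hypothetical. For simplicity, `poll` scans the per-group heads linearly. As far as we know, Flink instead keeps a heap of those heads.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical key-grouped timer queue: one heap per key group, every change logged.
class KeyGroupedTimerQueue {
    private static final Comparator<TimerEntry> BY_TIME = Comparator.comparingLong(t -> t.timestamp);

    private final List<PriorityQueue<TimerEntry>> heaps = new ArrayList<>();
    private final List<String> changelog = new ArrayList<>();
    private final int numKeyGroups;

    KeyGroupedTimerQueue(int numKeyGroups) {
        this.numKeyGroups = numKeyGroups;
        for (int i = 0; i < numKeyGroups; i++) {
            heaps.add(new PriorityQueue<>(BY_TIME));
        }
    }

    private int keyGroupOf(String key) {
        return Math.floorMod(key.hashCode(), numKeyGroups);
    }

    void add(TimerEntry t) {
        heaps.get(keyGroupOf(t.key)).add(t);
        changelog.add("ADD " + t.key + "@" + t.timestamp);
    }

    // Removes and returns the earliest timer across all key groups, or null if empty.
    TimerEntry poll() {
        PriorityQueue<TimerEntry> best = null;
        for (PriorityQueue<TimerEntry> h : heaps) {
            if (!h.isEmpty() && (best == null || h.peek().timestamp < best.peek().timestamp)) {
                best = h;
            }
        }
        if (best == null) {
            return null;
        }
        TimerEntry t = best.poll();
        changelog.add("REMOVE " + t.key + "@" + t.timestamp);
        return t;
    }

    List<String> changes() {
        return changelog;
    }

    public static void main(String[] args) {
        KeyGroupedTimerQueue queue = new KeyGroupedTimerQueue(4);
        queue.add(new TimerEntry("a", 30));
        queue.add(new TimerEntry("b", 10));
        TimerEntry t;
        while ((t = queue.poll()) != null) {
            System.out.println(t.key + "@" + t.timestamp); // b@10, then a@30
        }
        System.out.println(queue.changes().size()); // 4: two ADDs, two REMOVEs
    }
}

// A timer: the key it belongs to and its firing timestamp.
final class TimerEntry {
    final String key;
    final long timestamp;

    TimerEntry(String key, long timestamp) {
        this.key = key;
        this.timestamp = timestamp;
    }
}
```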

Checkpointing Operations

Snapshot creation and checkpoint management with changelog integration.

/**
 * Creates a snapshot of the current state for checkpointing.
 * Instead of snapshotting the underlying state backend on every checkpoint,
 * the changelog backend combines the latest materialized snapshot of the
 * underlying backend with the state changes logged since that materialization.
 * 
 * @param checkpointId Unique identifier for this checkpoint
 * @param timestamp Timestamp when checkpoint was triggered
 * @param streamFactory Factory for creating checkpoint streams
 * @param checkpointOptions Configuration options for the checkpoint
 * @return RunnableFuture that will complete with the snapshot result
 * @throws Exception if snapshot creation fails
 */
public RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(
    long checkpointId,
    long timestamp,
    @Nonnull CheckpointStreamFactory streamFactory,
    @Nonnull CheckpointOptions checkpointOptions
) throws Exception;

/**
 * Notification that a checkpoint has been completed successfully.
 * Used for cleanup and state management optimization.
 * 
 * @param checkpointId The completed checkpoint identifier
 * @throws Exception if cleanup operations fail
 */
public void notifyCheckpointComplete(long checkpointId) throws Exception;

/**
 * Notification that a checkpoint has been aborted.
 * Used for cleanup and resource management.
 * 
 * @param checkpointId The aborted checkpoint identifier
 * @throws Exception if cleanup operations fail
 */
public void notifyCheckpointAborted(long checkpointId) throws Exception;
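The idea behind changelog checkpointing can be modeled in a few lines. The full state is copied ("materialized") only periodically. Each checkpoint then needs just the changes logged since the last materialization, and recovery replays that tail on top of the materialized copy. This pure-JDK sketch uses a hypothetical `ChangelogCheckpointModel` class. It is a conceptual model, not Flink's snapshot code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of changelog checkpointing (not Flink's classes).
class ChangelogCheckpointModel {
    // One logged write.
    static final class Change {
        final String key;
        final long value;

        Change(String key, long value) {
            this.key = key;
            this.value = value;
        }
    }

    private final Map<String, Long> liveState = new HashMap<>();
    private final List<Change> log = new ArrayList<>();
    private Map<String, Long> materialized = new HashMap<>(); // last full copy
    private int materializedUpTo = 0;                          // log position covered by that copy

    void put(String key, long value) {
        liveState.put(key, value);
        log.add(new Change(key, value));
    }

    // Periodic materialization: copy the full state and remember the log position.
    void materialize() {
        materialized = new HashMap<>(liveState);
        materializedUpTo = log.size();
    }

    // A checkpoint only has to persist the changes logged after the last materialization.
    List<Change> changesForCheckpoint() {
        return new ArrayList<>(log.subList(materializedUpTo, log.size()));
    }

    // Recovery: start from the materialized copy, then replay the logged tail.
    Map<String, Long> restore() {
        Map<String, Long> restored = new HashMap<>(materialized);
        for (Change change : changesForCheckpoint()) {
            restored.put(change.key, change.value);
        }
        return restored;
    }

    Map<String, Long> liveState() {
        return liveState;
    }

    public static void main(String[] args) {
        ChangelogCheckpointModel model = new ChangelogCheckpointModel();
        model.put("a", 1);
        model.put("b", 2);
        model.materialize(); // full copy of the state
        model.put("a", 5);
        System.out.println(model.changesForCheckpoint().size());       // 1
        System.out.println(model.restore().equals(model.liveState())); // true
    }
}
```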

Lifecycle Management

Resource management and cleanup operations for the state backend.

/**
 * Closes the state backend and releases resources.
 * This should be called when the operator is shutting down.
 * 
 * @throws IOException if resource cleanup fails
 */
public void close() throws IOException;

/**
 * Disposes of the state backend and cleans up all resources.
 * This is a more thorough cleanup than close().
 */
public void dispose();

Key Selection Listeners

Event listeners for key selection changes, useful for debugging and monitoring.

/**
 * Registers a listener to be notified when the current key changes.
 * 
 * @param listener The listener to register
 */
public void registerKeySelectionListener(KeySelectionListener<K> listener);

/**
 * Deregisters a previously registered key selection listener.
 * 
 * @param listener The listener to deregister
 * @return true if the listener was found and removed, false otherwise
 */
public boolean deregisterKeySelectionListener(KeySelectionListener<K> listener);
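Here is a pure-JDK sketch of the listener contract. `KeySelectionListener` below copies the single-method shape of Flink's `KeyedStateBackend.KeySelectionListener` (`keySelected(newKey)`), but it is declared locally. `ListeningKeyContext` is a hypothetical stand-in for the backend. Every `setCurrentKey` call notifies all registered listeners, and deregistering returns whether the listener was found.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical backend fragment: setCurrentKey notifies all registered listeners.
class ListeningKeyContext<K> {
    private final List<KeySelectionListener<K>> listeners = new ArrayList<>();
    private K currentKey;

    void registerKeySelectionListener(KeySelectionListener<K> listener) {
        listeners.add(listener);
    }

    // Returns true if the listener was registered and has been removed.
    boolean deregisterKeySelectionListener(KeySelectionListener<K> listener) {
        return listeners.remove(listener);
    }

    void setCurrentKey(K newKey) {
        currentKey = newKey;
        for (KeySelectionListener<K> listener : listeners) {
            listener.keySelected(newKey);
        }
    }

    K getCurrentKey() {
        return currentKey;
    }

    public static void main(String[] args) {
        ListeningKeyContext<String> context = new ListeningKeyContext<>();
        context.registerKeySelectionListener(k -> System.out.println("key selected: " + k));
        context.setCurrentKey("alice"); // prints "key selected: alice"
    }
}

// Same single-method shape as Flink's KeySelectionListener; declared locally here.
interface KeySelectionListener<K> {
    void keySelected(K newKey);
}
```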

State Immutability Check

Determines whether state is immutable for specific checkpoint types, which affects checkpoint optimization strategies.

/**
 * Checks if state is immutable for the given checkpoint type.
 * This affects how checkpoints are optimized and stored.
 * 
 * @param checkpointType The type of checkpoint being performed
 * @return true if state is immutable for this checkpoint type
 */
public boolean isStateImmutableInStateBackend(CheckpointType checkpointType);

Savepoint Operations

Creates savepoints for manual backup and migration scenarios.

/**
 * Prepares the resources needed to write a savepoint of the current state.
 * Savepoints are used for manual backups and job migration.
 * 
 * @return SavepointResources used to write the savepoint
 * @throws Exception if savepoint creation fails
 */
public SavepointResources<K> savepoint() throws Exception;

Testing Support

Methods specifically designed for testing and verification.

/**
 * Gets the number of key-value state entries currently stored.
 * This method is primarily intended for testing and debugging.
 * 
 * @return Number of key-value state entries
 */
public int numKeyValueStateEntries();

State Caching

The keyed state backend implements efficient state caching:

  • Last State Caching: Recently accessed state is cached to avoid lookup overhead
  • State Name Indexing: States are indexed by name for fast retrieval
  • Namespace Awareness: Each state access properly handles namespace scoping
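The caching behavior listed above can be sketched in plain Java, modeled on the last-state pattern used by `getPartitionedState` as we understand it. If the requested state name matches the last one, the cached state is returned after rebinding its namespace. Otherwise, the state is looked up by name in an index and becomes the new cached entry. `StateRegistry` and `NamespacedState` are hypothetical classes, and the `lookups` counter exists only to make the cache hits visible.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of "last state" caching plus a by-name index (hypothetical, not Flink's classes).
class StateRegistry {
    private final Map<String, NamespacedState> statesByName = new HashMap<>();
    private String lastName;
    private NamespacedState lastState;
    int lookups = 0; // counts index lookups, for illustration only

    NamespacedState getPartitionedState(String name, Object namespace) {
        if (name.equals(lastName)) {
            lastState.currentNamespace = namespace; // cache hit: only rebind the namespace
            return lastState;
        }
        lookups++;
        NamespacedState state = statesByName.computeIfAbsent(name, NamespacedState::new);
        state.currentNamespace = namespace;
        lastName = name;
        lastState = state;
        return state;
    }

    public static void main(String[] args) {
        StateRegistry registry = new StateRegistry();
        registry.getPartitionedState("count", "window-1");
        registry.getPartitionedState("count", "window-2"); // cache hit
        registry.getPartitionedState("sum", "window-1");
        System.out.println(registry.lookups); // 2
    }
}

// Hypothetical state handle whose namespace is switched on each access.
class NamespacedState {
    final String name;
    Object currentNamespace;

    NamespacedState(String name) {
        this.name = name;
    }
}
```

This cache pays off because operators typically access the same state repeatedly while processing consecutive records.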

Integration with Flink Runtime

The changelog keyed state backend integrates with Flink's runtime as follows:

  • Transparent Operation: Existing Flink operators work without modification
  • Full Compatibility: Supports all Flink state types and operations
  • Performance: Every state change is additionally written to the changelog, which adds some per-record overhead in exchange for shorter, more predictable checkpoints
  • Fault Tolerance: Maintains Flink's exactly-once processing guarantees

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-statebackend-changelog
