tessl/maven-org-apache-flink--flink-statebackend-changelog

Apache Flink changelog state backend implementation that provides durable state management with changelog capabilities for stream processing applications

Workspace: tessl
Visibility: Public
Describes: mavenpkg:maven/org.apache.flink/flink-statebackend-changelog_2.11@1.13.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-flink--flink-statebackend-changelog@1.13.0


Flink Changelog State Backend

The Apache Flink changelog state backend provides durable state management for stream processing applications. It wraps an existing state backend and records every state change in a changelog. Checkpoints can then be built incrementally from the logged changes, which supports efficient recovery and consistent state in distributed streaming applications.

Package Information

  • Package Name: flink-statebackend-changelog
  • Package Type: maven
  • Language: Java
  • Group ID: org.apache.flink
  • Artifact ID: flink-statebackend-changelog_2.11
  • Installation: Add to Maven dependencies:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-statebackend-changelog_2.11</artifactId>
    <version>1.13.6</version>
</dependency>

Core Imports

import org.apache.flink.state.changelog.ChangelogStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
import org.apache.flink.runtime.state.delegate.DelegatingStateBackend;
import org.apache.flink.runtime.state.ConfigurableStateBackend;

Basic Usage

import org.apache.flink.state.changelog.ChangelogStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Create a changelog state backend wrapping a HashMap state backend
StateBackend delegateBackend = new HashMapStateBackend();
ChangelogStateBackend changelogBackend = new ChangelogStateBackend(delegateBackend);

// Configure Flink to use the changelog state backend
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(changelogBackend);

// State operations will now be logged to changelog for incremental checkpointing
// Use standard Flink state operations - they will be transparently logged

Architecture

The changelog state backend is built around a delegation pattern. Its key components and properties are:

  • ChangelogStateBackend: Main entry point; wraps an existing Flink state backend
  • ChangelogKeyedStateBackend: Internal keyed state backend that logs state changes to the changelog
  • State Wrappers: Transparent wrappers for the Flink state types (Value, List, Map, Reducing, Aggregating)
  • Delegation Pattern: Every operation is forwarded to the underlying state backend, and each change is also logged
  • Incremental Checkpointing: The logged state changes make efficient incremental checkpoints possible

This design allows existing Flink applications to benefit from changelog-based incremental checkpointing without code changes, simply by configuring the changelog state backend.
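The delegation idea can be illustrated without Flink on the classpath. The sketch below is a simplified model, not the library's code: `SimpleValueState`, `InMemoryValueState`, `LoggingValueState`, and the string-based change log are hypothetical names made up for illustration. It shows a wrapper that records each mutation and then forwards it to the wrapped state, while reads pass straight through.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a value state; not Flink's ValueState interface.
interface SimpleValueState<T> {
    T value();
    void update(T newValue);
    void clear();
}

// Plays the role of the delegate: it holds the actual value.
class InMemoryValueState<T> implements SimpleValueState<T> {
    private T current;

    public T value() { return current; }
    public void update(T newValue) { current = newValue; }
    public void clear() { current = null; }
}

// Wrapper that appends every mutation to a log before delegating.
class LoggingValueState<T> implements SimpleValueState<T> {
    private final SimpleValueState<T> delegate;
    private final List<String> changelog;

    LoggingValueState(SimpleValueState<T> delegate, List<String> changelog) {
        this.delegate = delegate;
        this.changelog = changelog;
    }

    // Reads do not change state, so they are forwarded without logging.
    public T value() { return delegate.value(); }

    public void update(T newValue) {
        changelog.add("UPDATE " + newValue);
        delegate.update(newValue);
    }

    public void clear() {
        changelog.add("CLEAR");
        delegate.clear();
    }
}

class DelegationDemo {
    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        SimpleValueState<Integer> state =
                new LoggingValueState<>(new InMemoryValueState<>(), log);
        state.update(1);
        state.update(2);
        state.clear();
        System.out.println(log);           // recorded mutations, in order
        System.out.println(state.value()); // value held by the delegate after clear()
    }
}
```

Calling code only sees `SimpleValueState`, so swapping the plain state for the logging wrapper requires no change at the call site. That is the property the architecture above relies on.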

Capabilities

State Backend Configuration

Core state backend configuration and initialization functionality for integrating changelog capabilities with existing Flink state backends.

public class ChangelogStateBackend implements DelegatingStateBackend, ConfigurableStateBackend {
    public ChangelogStateBackend(StateBackend stateBackend);
    public StateBackend getDelegatedStateBackend();
    public StateBackend configure(ReadableConfig config, ClassLoader classLoader);
}

Details: docs/state-backend-configuration.md

Keyed State Management

Keyed state backend implementation that provides transparent changelog logging for all state operations while maintaining full compatibility with Flink's state management system.

public <K> ChangelogKeyedStateBackend<K> createKeyedStateBackend(
    Environment env,
    JobID jobID,
    String operatorIdentifier,
    TypeSerializer<K> keySerializer,
    int numberOfKeyGroups,
    KeyGroupRange keyGroupRange,
    TaskKvStateRegistry kvStateRegistry,
    TtlTimeProvider ttlTimeProvider,
    MetricGroup metricGroup,
    @Nonnull Collection<KeyedStateHandle> stateHandles,
    CloseableRegistry cancelStreamRegistry
) throws Exception;
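The `numberOfKeyGroups` and `keyGroupRange` parameters describe how the key space is split across parallel subtasks: every key maps to one key group, and each subtask owns a contiguous range of groups. The standalone sketch below illustrates that arithmetic. `KeyGroupSketch`, `rangeFor`, `groupFor`, and `owns` are hypothetical names, the hash step is a deliberate simplification that uses `hashCode()` directly, and the contiguous-split formula is assumed to resemble Flink's own assignment rather than copied from it.

```java
class KeyGroupSketch {
    // Inclusive [start, end] range of key groups owned by one parallel subtask.
    static int[] rangeFor(int numberOfKeyGroups, int parallelism, int subtaskIndex) {
        int start = (subtaskIndex * numberOfKeyGroups + parallelism - 1) / parallelism;
        int end = ((subtaskIndex + 1) * numberOfKeyGroups - 1) / parallelism;
        return new int[] {start, end};
    }

    // Simplified key-to-group mapping (assumption: a real backend hashes further).
    static int groupFor(Object key, int numberOfKeyGroups) {
        return Math.floorMod(key.hashCode(), numberOfKeyGroups);
    }

    // True if the given key group falls inside the inclusive range.
    static boolean owns(int[] range, int keyGroup) {
        return keyGroup >= range[0] && keyGroup <= range[1];
    }

    public static void main(String[] args) {
        int numberOfKeyGroups = 128;
        int parallelism = 3;
        for (int i = 0; i < parallelism; i++) {
            int[] r = rangeFor(numberOfKeyGroups, parallelism, i);
            System.out.println("subtask " + i + " -> [" + r[0] + ", " + r[1] + "]");
        }
    }
}
```

With 128 key groups and a parallelism of 3, the ranges come out as [0, 42], [43, 85], and [86, 127]: contiguous, non-overlapping, and covering every group.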

Details: docs/keyed-state-management.md

State Types and Operations

Comprehensive support for all Flink state types including Value, List, Map, Reducing, and Aggregating states, with transparent changelog logging for all state mutations.

// State creation and access patterns
public <N, S extends State> S getPartitionedState(
    N namespace,
    TypeSerializer<N> namespaceSerializer,
    StateDescriptor<S, ?> stateDescriptor
) throws Exception;

public <N, S extends State, T> S getOrCreateKeyedState(
    TypeSerializer<N> namespaceSerializer,
    StateDescriptor<S, T> stateDescriptor
) throws Exception;

Details: docs/state-types-operations.md

Types

Core State Backend Types

public class ChangelogStateBackend implements DelegatingStateBackend, ConfigurableStateBackend {
    // Constructor and configuration
    public ChangelogStateBackend(StateBackend stateBackend);
    public StateBackend getDelegatedStateBackend();
    public boolean useManagedMemory();
    public StateBackend configure(ReadableConfig config, ClassLoader classLoader) throws IllegalConfigurationException;
    
    // State backend creation methods
    public <K> ChangelogKeyedStateBackend<K> createKeyedStateBackend(
        Environment env, JobID jobID, String operatorIdentifier,
        TypeSerializer<K> keySerializer, int numberOfKeyGroups,
        KeyGroupRange keyGroupRange, TaskKvStateRegistry kvStateRegistry,
        TtlTimeProvider ttlTimeProvider, MetricGroup metricGroup,
        Collection<KeyedStateHandle> stateHandles,
        CloseableRegistry cancelStreamRegistry
    ) throws Exception;
    
    public <K> CheckpointableKeyedStateBackend<K> createKeyedStateBackend(
        Environment env, JobID jobID, String operatorIdentifier,
        TypeSerializer<K> keySerializer, int numberOfKeyGroups,
        KeyGroupRange keyGroupRange, TaskKvStateRegistry kvStateRegistry,
        TtlTimeProvider ttlTimeProvider, MetricGroup metricGroup,
        Collection<KeyedStateHandle> stateHandles,
        CloseableRegistry cancelStreamRegistry, double managedMemoryFraction
    ) throws Exception;
    
    public OperatorStateBackend createOperatorStateBackend(
        Environment env, String operatorIdentifier,
        Collection<OperatorStateHandle> stateHandles,
        CloseableRegistry cancelStreamRegistry
    ) throws Exception;
}

interface DelegatingStateBackend extends StateBackend {
    StateBackend getDelegatedStateBackend();
}

interface ConfigurableStateBackend extends StateBackend {
    StateBackend configure(ReadableConfig config, ClassLoader classLoader) throws IllegalConfigurationException;
}
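Because a delegating backend exposes its wrapped backend through `getDelegatedStateBackend()`, code that needs the innermost backend can walk the chain until it reaches one that does not delegate. The sketch below models this with hypothetical mirror types (`Backend`, `DelegatingBackend`, `LeafBackend`, `WrappingBackend`, `Unwrap`) so that it runs without Flink. It is an illustration of the pattern, not a utility the library is known to provide.

```java
import java.util.Objects;

// Hypothetical mirrors of StateBackend and DelegatingStateBackend;
// only the single accessor relevant to delegation is modeled.
interface Backend {}

interface DelegatingBackend extends Backend {
    Backend getDelegatedStateBackend();
}

// A backend that stores state itself and wraps nothing.
class LeafBackend implements Backend {}

// A backend that wraps another one, as the changelog backend does.
class WrappingBackend implements DelegatingBackend {
    private final Backend delegate;

    WrappingBackend(Backend delegate) {
        this.delegate = Objects.requireNonNull(delegate, "delegate");
    }

    public Backend getDelegatedStateBackend() { return delegate; }
}

class Unwrap {
    // Follows the delegation chain until a non-delegating backend is found.
    static Backend innermost(Backend backend) {
        Backend current = backend;
        while (current instanceof DelegatingBackend) {
            current = ((DelegatingBackend) current).getDelegatedStateBackend();
        }
        return current;
    }

    public static void main(String[] args) {
        Backend leaf = new LeafBackend();
        Backend wrapped = new WrappingBackend(leaf);
        System.out.println(innermost(wrapped) == leaf); // same instance as the leaf
    }
}
```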

Keyed State Backend Types

class ChangelogKeyedStateBackend<K> implements CheckpointableKeyedStateBackend<K>, CheckpointListener, TestableKeyedStateBackend {
    // Core state backend operations
    public KeyGroupRange getKeyGroupRange();
    public void setCurrentKey(K newKey);
    public K getCurrentKey();
    public TypeSerializer<K> getKeySerializer();
    
    // State access and management
    public <N, S extends State> S getPartitionedState(
        N namespace, TypeSerializer<N> namespaceSerializer,
        StateDescriptor<S, ?> stateDescriptor
    ) throws Exception;
    
    public <N, S extends State, T> S getOrCreateKeyedState(
        TypeSerializer<N> namespaceSerializer,
        StateDescriptor<S, T> stateDescriptor
    ) throws Exception;
    
    // Lifecycle management
    public void close() throws IOException;
    public void dispose();
    
    // Checkpointing
    public RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(
        long checkpointId, long timestamp,
        CheckpointStreamFactory streamFactory,
        CheckpointOptions checkpointOptions
    ) throws Exception;
    
    // Priority queue support
    public <T extends HeapPriorityQueueElement & PriorityComparable<? super T> & Keyed<?>> 
    KeyGroupedInternalPriorityQueue<T> create(
        String stateName,
        TypeSerializer<T> byteOrderedElementSerializer
    );
}
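The `setCurrentKey` and `getCurrentKey` methods imply that state reads and writes are scoped to whichever key is currently set. A toy model makes that behavior concrete. `MiniKeyedBackend` and its `put`/`get` methods are hypothetical and leave out namespaces, serializers, key groups, and checkpointing, so treat this as a sketch of the scoping rule only.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, heavily simplified keyed backend: state name -> (key -> value).
class MiniKeyedBackend<K> {
    private final Map<String, Map<K, Object>> statesByName = new HashMap<>();
    private K currentKey;

    void setCurrentKey(K newKey) { currentKey = newKey; }

    K getCurrentKey() { return currentKey; }

    // Writes the value for the named state under the current key.
    void put(String stateName, Object value) {
        statesByName.computeIfAbsent(stateName, n -> new HashMap<>())
                .put(currentKey, value);
    }

    // Reads the value for the named state under the current key, or null if absent.
    Object get(String stateName) {
        Map<K, Object> perKey = statesByName.get(stateName);
        return perKey == null ? null : perKey.get(currentKey);
    }
}

class KeyScopeDemo {
    public static void main(String[] args) {
        MiniKeyedBackend<String> backend = new MiniKeyedBackend<>();
        backend.setCurrentKey("alice");
        backend.put("count", 1);
        backend.setCurrentKey("bob");
        backend.put("count", 5);
        backend.setCurrentKey("alice");
        System.out.println(backend.get("count")); // the value written under "alice"
    }
}
```

The same state name resolves to different values depending on the current key, which is why the key must be set before any keyed state is touched.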