# Flink Changelog State Backend

An Apache Flink state backend that wraps another state backend and logs state changes to a changelog, providing durable state management for stream processing applications. Logging changes enables incremental checkpointing, which supports efficient recovery and consistent state in distributed streaming applications.

## Package Information

- **Package Name**: flink-statebackend-changelog
- **Package Type**: maven
- **Language**: Java
- **Group ID**: org.apache.flink
- **Artifact ID**: flink-statebackend-changelog_2.11
- **Installation**: Add to Maven dependencies:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-statebackend-changelog_2.11</artifactId>
    <version>1.13.6</version>
</dependency>
```

## Core Imports

```java
import org.apache.flink.state.changelog.ChangelogStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
import org.apache.flink.runtime.state.delegate.DelegatingStateBackend;
import org.apache.flink.runtime.state.ConfigurableStateBackend;
```

## Basic Usage

```java
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.state.changelog.ChangelogStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ChangelogBackendExample {
    public static void main(String[] args) throws Exception {
        // Create a changelog state backend wrapping a HashMap state backend
        StateBackend delegateBackend = new HashMapStateBackend();
        ChangelogStateBackend changelogBackend = new ChangelogStateBackend(delegateBackend);

        // Configure Flink to use the changelog state backend
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStateBackend(changelogBackend);

        // Define the pipeline with standard Flink state operations; state changes
        // are transparently logged to the changelog for incremental checkpointing.
        // Then submit the job with env.execute(...).
    }
}
```

## Architecture

The changelog state backend is built around a delegation pattern with several key components:

- **ChangelogStateBackend**: Main entry point; wraps any existing Flink state backend
- **ChangelogKeyedStateBackend**: Internal keyed state backend that logs state changes to the changelog
- **State Wrappers**: Transparent wrappers for all Flink keyed state types (Value, List, Map, Reducing, Aggregating)
- **Delegation Pattern**: Every operation is forwarded to the underlying state backend; state changes are additionally written to the changelog
- **Incremental Checkpointing**: Because state changes are logged, checkpoints can be taken incrementally

This design lets existing Flink applications benefit from changelog-based incremental checkpointing without code changes: only the configured state backend changes.
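
To make the delegation pattern concrete, here is a minimal, self-contained sketch in plain Java. It does not use Flink's classes: `SimpleValueState`, `HeapValueState`, `LoggingValueState`, and `DelegationDemo` are hypothetical names, and the string change records are an assumed format. The wrapper forwards every call to its delegate and appends a changelog entry only for writes (`update`, `clear`), which is the behavior the list above describes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

/** Hypothetical value-state interface (a stand-in for Flink's ValueState). */
interface SimpleValueState<T> {
    T value();
    void update(T value);
    void clear();
}

/** Stand-in for the delegate backend: one value per current key, kept in memory. */
final class HeapValueState<T> implements SimpleValueState<T> {
    private final Map<Object, T> values = new HashMap<>();
    private final Supplier<Object> currentKey;

    HeapValueState(Supplier<Object> currentKey) { this.currentKey = currentKey; }

    @Override public T value() { return values.get(currentKey.get()); }
    @Override public void update(T value) { values.put(currentKey.get(), value); }
    @Override public void clear() { values.remove(currentKey.get()); }
}

/** Wrapper: forwards every call to the delegate and logs writes to the changelog. */
final class LoggingValueState<T> implements SimpleValueState<T> {
    private final SimpleValueState<T> delegate;
    private final List<String> changelog;
    private final Supplier<Object> currentKey;

    LoggingValueState(SimpleValueState<T> delegate, List<String> changelog, Supplier<Object> currentKey) {
        this.delegate = delegate;
        this.changelog = changelog;
        this.currentKey = currentKey;
    }

    @Override public T value() { return delegate.value(); } // reads are not logged

    @Override public void update(T value) {
        delegate.update(value);
        changelog.add("SET " + currentKey.get() + "=" + value);
    }

    @Override public void clear() {
        delegate.clear();
        changelog.add("CLEAR " + currentKey.get());
    }
}

final class DelegationDemo {
    public static void main(String[] args) {
        String[] key = {"a"}; // the "current key", switched per record in a real job
        List<String> log = new ArrayList<>();
        SimpleValueState<Integer> state = new LoggingValueState<Integer>(
                new HeapValueState<Integer>(() -> key[0]), log, () -> key[0]);
        state.update(1);
        key[0] = "b";
        state.update(2);
        state.clear();
        System.out.println(log); // [SET a=1, SET b=2, CLEAR b]
    }
}
```

In this model reads go straight to the delegate, so only mutations produce changelog entries.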
62
63
## Capabilities
64
65
### State Backend Configuration
66
67
Core state backend configuration and initialization functionality for integrating changelog capabilities with existing Flink state backends.
68
69
```java { .api }
70
public class ChangelogStateBackend implements DelegatingStateBackend, ConfigurableStateBackend {
71
public ChangelogStateBackend(StateBackend stateBackend);
72
public StateBackend getDelegatedStateBackend();
73
public StateBackend configure(ReadableConfig config, ClassLoader classLoader);
74
}
75
```
76
77
[State Backend Configuration](./state-backend-configuration.md)
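
The sketch below illustrates one plausible way a wrapper implementing both interfaces can behave: `configure` forwards the configuration to a configurable delegate and wraps the result in a new instance, and the constructor rejects a delegate that is itself delegating. It is plain Java with hypothetical names (`Backend`, `ConfigurableBackend`, `DelegatingBackend`, `HeapBackend`, `ChangelogBackendSketch`); `Map<String, String>` stands in for Flink's `ReadableConfig`, and the `async` option is made up. Treat the forwarding and the recursion check as assumptions about `ChangelogStateBackend`, not a description of its source.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical mirrors of StateBackend, ConfigurableStateBackend and DelegatingStateBackend. */
interface Backend {}

interface ConfigurableBackend extends Backend {
    Backend configure(Map<String, String> config);
}

interface DelegatingBackend extends Backend {
    Backend getDelegatedStateBackend();
}

/** A configurable delegate; "async" is a made-up option used only for illustration. */
final class HeapBackend implements ConfigurableBackend {
    final boolean async;

    HeapBackend(boolean async) { this.async = async; }

    @Override public Backend configure(Map<String, String> config) {
        // Return a new, configured instance rather than mutating this one.
        String value = config.getOrDefault("async", Boolean.toString(async));
        return new HeapBackend(Boolean.parseBoolean(value));
    }
}

/** Wrapper that forwards configuration to its delegate and re-wraps the result. */
final class ChangelogBackendSketch implements DelegatingBackend, ConfigurableBackend {
    private final Backend delegate;

    ChangelogBackendSketch(Backend delegate) {
        if (delegate == null) {
            throw new NullPointerException("delegate");
        }
        if (delegate instanceof DelegatingBackend) {
            throw new IllegalArgumentException("Recursive delegation is not supported");
        }
        this.delegate = delegate;
    }

    @Override public Backend getDelegatedStateBackend() { return delegate; }

    @Override public Backend configure(Map<String, String> config) {
        if (delegate instanceof ConfigurableBackend) {
            Backend configured = ((ConfigurableBackend) delegate).configure(config);
            return new ChangelogBackendSketch(configured);
        }
        return this; // the delegate has nothing to configure
    }
}

final class ConfigureDemo {
    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put("async", "true");
        Backend configured = new ChangelogBackendSketch(new HeapBackend(false)).configure(config);
        Backend inner = ((DelegatingBackend) configured).getDelegatedStateBackend();
        System.out.println(((HeapBackend) inner).async); // true
    }
}
```

Returning a fresh wrapper keeps the original, unconfigured instance unchanged, so the same backend object can be configured differently for different jobs.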

### Keyed State Management

Keyed state backend that transparently logs state changes to the changelog while remaining compatible with Flink's keyed state backend interfaces.

```java { .api }
public <K> ChangelogKeyedStateBackend<K> createKeyedStateBackend(
        Environment env,
        JobID jobID,
        String operatorIdentifier,
        TypeSerializer<K> keySerializer,
        int numberOfKeyGroups,
        KeyGroupRange keyGroupRange,
        TaskKvStateRegistry kvStateRegistry,
        TtlTimeProvider ttlTimeProvider,
        MetricGroup metricGroup,
        @Nonnull Collection<KeyedStateHandle> stateHandles,
        CloseableRegistry cancelStreamRegistry
) throws Exception;
```

[Keyed State Management](./keyed-state-management.md)
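
A factory shaped like `createKeyedStateBackend` can be pictured as: build the delegate's keyed backend first, then wrap it. In the hypothetical sketch below (`KeyedBackendLike`, `HeapKeyedBackendLike`, `ChangelogKeyedBackendLike`, and `KeyedFactorySketch` are illustrative names, and untyped `put`/`get` replace Flink's typed state objects), switching the current key is forwarded to the delegate but not logged, because it changes context rather than state; writes are forwarded and logged together with the key they apply to.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, reduced keyed-backend interface (not Flink's CheckpointableKeyedStateBackend). */
interface KeyedBackendLike<K> {
    void setCurrentKey(K key);
    K getCurrentKey();
    void put(String stateName, Object value);
    Object get(String stateName);
}

/** Delegate: stores one value per (current key, state name) in memory. */
final class HeapKeyedBackendLike<K> implements KeyedBackendLike<K> {
    private final Map<K, Map<String, Object>> data = new HashMap<>();
    private K currentKey;

    @Override public void setCurrentKey(K key) { currentKey = key; }
    @Override public K getCurrentKey() { return currentKey; }

    @Override public void put(String stateName, Object value) {
        data.computeIfAbsent(currentKey, k -> new HashMap<>()).put(stateName, value);
    }

    @Override public Object get(String stateName) {
        Map<String, Object> perKey = data.get(currentKey);
        return perKey == null ? null : perKey.get(stateName);
    }
}

/** Wrapper: key switches are forwarded but not logged; writes are forwarded and logged. */
final class ChangelogKeyedBackendLike<K> implements KeyedBackendLike<K> {
    private final KeyedBackendLike<K> delegate;
    private final List<String> changelog = new ArrayList<>();

    ChangelogKeyedBackendLike(KeyedBackendLike<K> delegate) { this.delegate = delegate; }

    @Override public void setCurrentKey(K key) { delegate.setCurrentKey(key); }
    @Override public K getCurrentKey() { return delegate.getCurrentKey(); }

    @Override public void put(String stateName, Object value) {
        delegate.put(stateName, value);
        changelog.add(stateName + "[" + delegate.getCurrentKey() + "]=" + value);
    }

    @Override public Object get(String stateName) { return delegate.get(stateName); }

    List<String> changes() { return Collections.unmodifiableList(changelog); }
}

final class KeyedFactorySketch {
    /** Same shape as createKeyedStateBackend: create the delegate's backend, then wrap it. */
    static <K> ChangelogKeyedBackendLike<K> createKeyedStateBackend() {
        return new ChangelogKeyedBackendLike<K>(new HeapKeyedBackendLike<K>());
    }

    public static void main(String[] args) {
        ChangelogKeyedBackendLike<String> backend = createKeyedStateBackend();
        backend.setCurrentKey("user-1");
        backend.put("count", 3);
        backend.setCurrentKey("user-2");
        backend.put("count", 5);
        System.out.println(backend.changes()); // [count[user-1]=3, count[user-2]=5]
    }
}
```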
100
101
### State Types and Operations
102
103
Comprehensive support for all Flink state types including Value, List, Map, Reducing, and Aggregating states, with transparent changelog logging for all state mutations.
104
105
```java { .api }
106
// State creation and access patterns
107
public <N, S extends State> S getPartitionedState(
108
N namespace,
109
TypeSerializer<N> namespaceSerializer,
110
StateDescriptor<S, ?> stateDescriptor
111
) throws Exception;
112
113
public <N, S extends State, T> S getOrCreateKeyedState(
114
TypeSerializer<N> namespaceSerializer,
115
StateDescriptor<S, T> stateDescriptor
116
) throws Exception;
117
```
118
119
[State Types and Operations](./state-types-operations.md)
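
The name `getOrCreateKeyedState` suggests that state objects are created once per descriptor and then reused. The sketch below models that with hypothetical names (`SimpleListState`, `HeapListState`, `LoggingListState`, `StateRegistrySketch`, `ListStateDemo`), keys the cache by state name as a simplification of a `StateDescriptor`, and tracks a single key to stay short. Each list mutation is applied to the delegate and then recorded in a changelog shared by all states of the registry.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical list-state interface (a stand-in for Flink's ListState). */
interface SimpleListState<T> {
    void add(T value);
    void update(List<T> values);
    List<T> get();
    void clear();
}

/** Delegate: an in-memory list for a single key, to keep the sketch short. */
final class HeapListState<T> implements SimpleListState<T> {
    private final List<T> values = new ArrayList<>();

    @Override public void add(T value) { values.add(value); }
    @Override public void update(List<T> newValues) { values.clear(); values.addAll(newValues); }
    @Override public List<T> get() { return new ArrayList<>(values); }
    @Override public void clear() { values.clear(); }
}

/** Wrapper: applies each mutation to the delegate, then logs it with the state name. */
final class LoggingListState<T> implements SimpleListState<T> {
    private final SimpleListState<T> delegate;
    private final String name;
    private final List<String> changelog;

    LoggingListState(SimpleListState<T> delegate, String name, List<String> changelog) {
        this.delegate = delegate;
        this.name = name;
        this.changelog = changelog;
    }

    @Override public void add(T value) {
        delegate.add(value);
        changelog.add(name + ".add(" + value + ")");
    }

    @Override public void update(List<T> values) {
        delegate.update(values);
        changelog.add(name + ".update(" + values + ")");
    }

    @Override public List<T> get() { return delegate.get(); } // read: not logged

    @Override public void clear() {
        delegate.clear();
        changelog.add(name + ".clear()");
    }
}

/** Creates each wrapped state once and returns the cached instance afterwards. */
final class StateRegistrySketch {
    private final Map<String, SimpleListState<?>> states = new HashMap<>();
    final List<String> changelog = new ArrayList<>();

    @SuppressWarnings("unchecked")
    <T> SimpleListState<T> getOrCreateListState(String name) {
        return (SimpleListState<T>) states.computeIfAbsent(
                name, n -> new LoggingListState<T>(new HeapListState<T>(), n, changelog));
    }
}

final class ListStateDemo {
    public static void main(String[] args) {
        StateRegistrySketch registry = new StateRegistrySketch();
        SimpleListState<Integer> buffer = registry.getOrCreateListState("buffer");
        buffer.add(1);
        buffer.add(2);
        registry.<Integer>getOrCreateListState("buffer").update(Arrays.asList(7)); // same cached instance
        System.out.println(buffer.get() + " " + registry.changelog);
        // [7] [buffer.add(1), buffer.add(2), buffer.update([7])]
    }
}
```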

## Types

### Core State Backend Types

```java { .api }
public class ChangelogStateBackend implements DelegatingStateBackend, ConfigurableStateBackend {
    // Constructor and configuration
    public ChangelogStateBackend(StateBackend stateBackend);
    public StateBackend getDelegatedStateBackend();
    public boolean useManagedMemory();
    public StateBackend configure(ReadableConfig config, ClassLoader classLoader) throws IllegalConfigurationException;

    // State backend creation methods
    public <K> ChangelogKeyedStateBackend<K> createKeyedStateBackend(
            Environment env, JobID jobID, String operatorIdentifier,
            TypeSerializer<K> keySerializer, int numberOfKeyGroups,
            KeyGroupRange keyGroupRange, TaskKvStateRegistry kvStateRegistry,
            TtlTimeProvider ttlTimeProvider, MetricGroup metricGroup,
            Collection<KeyedStateHandle> stateHandles,
            CloseableRegistry cancelStreamRegistry
    ) throws Exception;

    public <K> CheckpointableKeyedStateBackend<K> createKeyedStateBackend(
            Environment env, JobID jobID, String operatorIdentifier,
            TypeSerializer<K> keySerializer, int numberOfKeyGroups,
            KeyGroupRange keyGroupRange, TaskKvStateRegistry kvStateRegistry,
            TtlTimeProvider ttlTimeProvider, MetricGroup metricGroup,
            Collection<KeyedStateHandle> stateHandles,
            CloseableRegistry cancelStreamRegistry, double managedMemoryFraction
    ) throws Exception;

    public OperatorStateBackend createOperatorStateBackend(
            Environment env, String operatorIdentifier,
            Collection<OperatorStateHandle> stateHandles,
            CloseableRegistry cancelStreamRegistry
    ) throws Exception;
}

interface DelegatingStateBackend extends StateBackend {
    StateBackend getDelegatedStateBackend();
}

interface ConfigurableStateBackend extends StateBackend {
    StateBackend configure(ReadableConfig config, ClassLoader classLoader) throws IllegalConfigurationException;
}
```
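
Because `ChangelogStateBackend` only wraps another backend, a question such as whether managed memory is needed is naturally answered by the delegate, and code that needs the backend that actually stores state can peel off `DelegatingStateBackend` layers. The sketch below shows both ideas with hypothetical types (`BackendLike`, `DelegatingBackendLike`, `ManagedMemoryBackend`, `HeapOnlyBackend`, `ChangelogLikeBackend`) and a hypothetical `Backends.unwrap` helper; that `useManagedMemory()` is forwarded to the delegate is an assumption here, not something the signatures above guarantee.

```java
/** Hypothetical mirror of StateBackend, reduced to the one method used here. */
interface BackendLike {
    boolean useManagedMemory();
}

/** Hypothetical mirror of DelegatingStateBackend. */
interface DelegatingBackendLike extends BackendLike {
    BackendLike getDelegatedStateBackend();
}

/** A backend that reserves managed memory. */
final class ManagedMemoryBackend implements BackendLike {
    @Override public boolean useManagedMemory() { return true; }
}

/** A backend that keeps everything on the JVM heap. */
final class HeapOnlyBackend implements BackendLike {
    @Override public boolean useManagedMemory() { return false; }
}

/** Wrapper: the delegate, which actually stores state, decides about managed memory. */
final class ChangelogLikeBackend implements DelegatingBackendLike {
    private final BackendLike delegate;

    ChangelogLikeBackend(BackendLike delegate) { this.delegate = delegate; }

    @Override public BackendLike getDelegatedStateBackend() { return delegate; }
    @Override public boolean useManagedMemory() { return delegate.useManagedMemory(); }
}

final class Backends {
    /** Hypothetical helper: strips delegating layers to reach the backend that stores state. */
    static BackendLike unwrap(BackendLike backend) {
        BackendLike current = backend;
        while (current instanceof DelegatingBackendLike) {
            current = ((DelegatingBackendLike) current).getDelegatedStateBackend();
        }
        return current;
    }

    public static void main(String[] args) {
        BackendLike wrapped = new ChangelogLikeBackend(new ManagedMemoryBackend());
        System.out.println(wrapped.useManagedMemory());                  // true
        System.out.println(unwrap(wrapped).getClass().getSimpleName()); // ManagedMemoryBackend
    }
}
```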

### Keyed State Backend Types

```java { .api }
class ChangelogKeyedStateBackend<K> implements CheckpointableKeyedStateBackend<K>, CheckpointListener, TestableKeyedStateBackend {
    // Core state backend operations
    public KeyGroupRange getKeyGroupRange();
    public void setCurrentKey(K newKey);
    public K getCurrentKey();
    public TypeSerializer<K> getKeySerializer();

    // State access and management
    public <N, S extends State> S getPartitionedState(
            N namespace, TypeSerializer<N> namespaceSerializer,
            StateDescriptor<S, ?> stateDescriptor
    ) throws Exception;

    public <N, S extends State, T> S getOrCreateKeyedState(
            TypeSerializer<N> namespaceSerializer,
            StateDescriptor<S, T> stateDescriptor
    ) throws Exception;

    // Lifecycle management
    public void close() throws IOException;
    public void dispose();

    // Checkpointing
    public RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(
            long checkpointId, long timestamp,
            CheckpointStreamFactory streamFactory,
            CheckpointOptions checkpointOptions
    ) throws Exception;

    // Priority queue support
    public <T extends HeapPriorityQueueElement & PriorityComparable<? super T> & Keyed<?>>
    KeyGroupedInternalPriorityQueue<T> create(
            String stateName,
            TypeSerializer<T> byteOrderedElementSerializer
    );
}
```
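
The `snapshot(...)` method is where logging state changes pays off. As a rough, hypothetical model of changelog-based incremental checkpointing (`IncrementalChangelogSketch`, `snapshot`, and `replay` are illustrative names unrelated to Flink's internal classes), each snapshot persists only the changes appended since the previous one as a new immutable segment, and the checkpoint refers to every segment persisted so far; recovery replays the segments in order. The model deliberately omits failure handling, asynchronous uploads, and any mechanism that keeps the changelog from growing without bound.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical model: a changelog whose snapshots persist only new changes. */
final class IncrementalChangelogSketch {
    private final List<String> pending = new ArrayList<>();          // changes not yet persisted
    private final List<List<String>> persisted = new ArrayList<>();  // immutable, already-persisted segments

    void append(String change) { pending.add(change); }

    /**
     * Persists only the changes since the last snapshot as one new segment and returns
     * the checkpoint: references to all segments persisted so far, in order.
     */
    List<List<String>> snapshot() {
        if (!pending.isEmpty()) {
            persisted.add(Collections.unmodifiableList(new ArrayList<>(pending)));
            pending.clear();
        }
        return Collections.unmodifiableList(new ArrayList<>(persisted));
    }

    /** Recovery: replay every referenced segment in order to rebuild the change history. */
    static List<String> replay(List<List<String>> checkpoint) {
        List<String> changes = new ArrayList<>();
        for (List<String> segment : checkpoint) {
            changes.addAll(segment);
        }
        return changes;
    }

    public static void main(String[] args) {
        IncrementalChangelogSketch log = new IncrementalChangelogSketch();
        log.append("SET a=1");
        log.append("SET b=2");
        List<List<String>> cp1 = log.snapshot(); // persists 2 changes in one segment
        log.append("CLEAR a");
        List<List<String>> cp2 = log.snapshot(); // persists only the 1 new change
        System.out.println(cp1.size() + " " + cp2.size() + " " + replay(cp2));
        // 1 2 [SET a=1, SET b=2, CLEAR a]
    }
}
```

The work done per checkpoint in this model is proportional to the changes since the previous checkpoint, not to the total state size, which is the property the term "incremental" refers to.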