# State Backend Configuration

Core configuration and initialization functionality for adding changelog capabilities to existing Flink state backends. `ChangelogStateBackend` is the main entry point for configuring changelog-based incremental checkpointing.

## Capabilities

### ChangelogStateBackend Constructor

Creates a new changelog state backend that wraps an existing Flink state backend to add changelog functionality.

```java { .api }
/**
 * Creates a changelog state backend wrapping the specified delegated state backend.
 * The delegated state backend handles the actual state storage while changelog
 * functionality is added transparently.
 *
 * @param stateBackend The underlying state backend to wrap (cannot be null)
 * @throws IllegalArgumentException if stateBackend is null or is already a DelegatingStateBackend
 */
public ChangelogStateBackend(StateBackend stateBackend);
```

**Usage Example:**

```java
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.state.changelog.ChangelogStateBackend;

// Wrap a HashMap state backend
StateBackend hashMapBackend = new HashMapStateBackend();
ChangelogStateBackend changelogHashMap = new ChangelogStateBackend(hashMapBackend);

// Wrap a RocksDB state backend
StateBackend rocksDBBackend = new EmbeddedRocksDBStateBackend();
ChangelogStateBackend changelogRocksDB = new ChangelogStateBackend(rocksDBBackend);
```

### Get Delegated State Backend

Returns the underlying state backend that is being wrapped by the changelog state backend.

```java { .api }
/**
 * Returns the underlying state backend that this changelog state backend delegates to.
 *
 * @return The wrapped state backend instance
 */
public StateBackend getDelegatedStateBackend();
```

### Memory Usage Configuration

Indicates whether the changelog state backend uses managed memory, based on the underlying delegated state backend.

```java { .api }
/**
 * Indicates whether this state backend uses Flink's managed memory.
 * The result depends on the wrapped state backend's memory usage.
 *
 * @return true if the delegated state backend uses managed memory, false otherwise
 */
public boolean useManagedMemory();
```
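
The two accessors above follow a plain delegation pattern, which can be sketched without any Flink dependency. Every type below (`Backend`, `PlainBackend`, `WrappingBackend`) is a hypothetical stand-in invented for illustration, not a Flink class; the sketch only shows a wrapper that exposes its delegate and reports the delegate's managed-memory answer as its own.

```java
// Hypothetical stand-ins for illustration only; these are NOT Flink classes.
final class DelegationSketch {

    /** Minimal stand-in for a state backend. */
    interface Backend {
        boolean useManagedMemory();
    }

    /** Stand-in for a plain backend whose managed-memory use is fixed at construction. */
    static final class PlainBackend implements Backend {
        private final boolean managed;

        PlainBackend(boolean managed) {
            this.managed = managed;
        }

        @Override
        public boolean useManagedMemory() {
            return managed;
        }
    }

    /** Stand-in for the changelog wrapper: it keeps the delegate and forwards to it. */
    static final class WrappingBackend implements Backend {
        private final Backend delegate;

        WrappingBackend(Backend delegate) {
            this.delegate = delegate;
        }

        Backend getDelegatedBackend() {
            return delegate;
        }

        @Override
        public boolean useManagedMemory() {
            // The wrapper has no memory policy of its own; it reports the delegate's answer.
            return delegate.useManagedMemory();
        }
    }

    public static void main(String[] args) {
        Backend unmanaged = new PlainBackend(false);
        Backend managed = new PlainBackend(true);

        WrappingBackend a = new WrappingBackend(unmanaged);
        WrappingBackend b = new WrappingBackend(managed);

        System.out.println(a.useManagedMemory());                // false
        System.out.println(b.useManagedMemory());                // true
        System.out.println(a.getDelegatedBackend() == unmanaged); // true
    }
}
```

In this sketch the wrapper never caches the answer, so its result always tracks whatever the wrapped backend reports.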

### Configuration

Configures the changelog state backend with the provided configuration and class loader. If the delegated state backend is configurable, it will be configured as well.

```java { .api }
/**
 * Configures the changelog state backend with the specified configuration.
 * If the delegated state backend implements ConfigurableStateBackend,
 * it will be configured and wrapped in a new ChangelogStateBackend instance.
 *
 * @param config Configuration to apply
 * @param classLoader Class loader for loading configuration-specific classes
 * @return Configured state backend instance
 * @throws IllegalConfigurationException if configuration is invalid
 */
public StateBackend configure(ReadableConfig config, ClassLoader classLoader) throws IllegalConfigurationException;
```
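
The configure-then-rewrap behaviour described above can be sketched in isolation. The types below are hypothetical stand-ins rather than Flink classes, the configuration is reduced to a `Map`, and the key `sketch.mode` is invented. Returning the wrapper unchanged when the delegate is not configurable is an assumption of this sketch, not something the documentation above states.

```java
import java.util.Map;

// Hypothetical stand-ins for illustration only; these are NOT Flink classes.
final class ConfigureSketch {

    interface Backend {}

    /** Stand-in for a backend that can produce a reconfigured copy of itself. */
    interface ConfigurableBackend extends Backend {
        Backend configure(Map<String, String> config);
    }

    /** A configurable delegate with a single made-up setting. */
    static final class TunableBackend implements ConfigurableBackend {
        final String mode;

        TunableBackend(String mode) {
            this.mode = mode;
        }

        @Override
        public Backend configure(Map<String, String> config) {
            // "sketch.mode" is an invented key used only by this example.
            return new TunableBackend(config.getOrDefault("sketch.mode", mode));
        }
    }

    /** A delegate that cannot be configured. */
    static final class FixedBackend implements Backend {}

    /** Stand-in for the changelog wrapper. */
    static final class Wrapper implements ConfigurableBackend {
        final Backend delegate;

        Wrapper(Backend delegate) {
            this.delegate = delegate;
        }

        @Override
        public Backend configure(Map<String, String> config) {
            if (delegate instanceof ConfigurableBackend) {
                // Configure the delegate first, then wrap the result in a new wrapper.
                return new Wrapper(((ConfigurableBackend) delegate).configure(config));
            }
            // Assumption of this sketch: nothing to configure, so return the wrapper as is.
            return this;
        }
    }

    public static void main(String[] args) {
        Wrapper original = new Wrapper(new TunableBackend("default"));
        Wrapper configured = (Wrapper) original.configure(Map.of("sketch.mode", "tuned"));

        System.out.println(((TunableBackend) configured.delegate).mode); // tuned
        System.out.println(((TunableBackend) original.delegate).mode);   // default
    }
}
```

Building a new wrapper instead of mutating the existing one keeps the original instance usable with its old settings, which is why the second line still prints `default`.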

### Keyed State Backend Creation

Creates a keyed state backend for managing partitioned state with changelog functionality.

```java { .api }
/**
 * Creates a keyed state backend that wraps the delegated state backend's
 * keyed state backend with changelog functionality.
 *
 * @param env Execution environment
 * @param jobID Job identifier
 * @param operatorIdentifier Operator identifier
 * @param keySerializer Serializer for state keys
 * @param numberOfKeyGroups Total number of key groups
 * @param keyGroupRange Range of key groups assigned to this backend
 * @param kvStateRegistry Registry for queryable state
 * @param ttlTimeProvider Time provider for TTL functionality
 * @param metricGroup Metric group for measurements
 * @param stateHandles State handles for recovery
 * @param cancelStreamRegistry Registry for cancellable streams
 * @return ChangelogKeyedStateBackend instance with changelog capabilities
 * @throws Exception if backend creation fails
 */
public <K> ChangelogKeyedStateBackend<K> createKeyedStateBackend(
    Environment env,
    JobID jobID,
    String operatorIdentifier,
    TypeSerializer<K> keySerializer,
    int numberOfKeyGroups,
    KeyGroupRange keyGroupRange,
    TaskKvStateRegistry kvStateRegistry,
    TtlTimeProvider ttlTimeProvider,
    MetricGroup metricGroup,
    @Nonnull Collection<KeyedStateHandle> stateHandles,
    CloseableRegistry cancelStreamRegistry
) throws Exception;
```

### Keyed State Backend Creation with Memory Fraction

Creates a keyed state backend with explicit managed memory fraction specification.

```java { .api }
/**
 * Creates a keyed state backend with explicit managed memory fraction.
 * This version allows fine-grained control over memory allocation.
 *
 * @param env Execution environment
 * @param jobID Job identifier
 * @param operatorIdentifier Operator identifier
 * @param keySerializer Serializer for state keys
 * @param numberOfKeyGroups Total number of key groups
 * @param keyGroupRange Range of key groups assigned to this backend
 * @param kvStateRegistry Registry for queryable state
 * @param ttlTimeProvider Time provider for TTL functionality
 * @param metricGroup Metric group for measurements
 * @param stateHandles State handles for recovery
 * @param cancelStreamRegistry Registry for cancellable streams
 * @param managedMemoryFraction Fraction of managed memory to use (0.0 to 1.0)
 * @return CheckpointableKeyedStateBackend instance with changelog capabilities
 * @throws Exception if backend creation fails
 */
public <K> CheckpointableKeyedStateBackend<K> createKeyedStateBackend(
    Environment env,
    JobID jobID,
    String operatorIdentifier,
    TypeSerializer<K> keySerializer,
    int numberOfKeyGroups,
    KeyGroupRange keyGroupRange,
    TaskKvStateRegistry kvStateRegistry,
    TtlTimeProvider ttlTimeProvider,
    MetricGroup metricGroup,
    @Nonnull Collection<KeyedStateHandle> stateHandles,
    CloseableRegistry cancelStreamRegistry,
    double managedMemoryFraction
) throws Exception;
```

### Operator State Backend Creation

Creates an operator state backend by delegating to the underlying state backend. Operator state is not affected by changelog functionality.

```java { .api }
/**
 * Creates an operator state backend by delegating to the wrapped state backend.
 * Operator state does not use changelog functionality.
 *
 * @param env Execution environment
 * @param operatorIdentifier Operator identifier
 * @param stateHandles State handles for recovery
 * @param cancelStreamRegistry Registry for cancellable streams
 * @return OperatorStateBackend instance from the delegated backend
 * @throws Exception if backend creation fails
 */
public OperatorStateBackend createOperatorStateBackend(
    Environment env,
    String operatorIdentifier,
    @Nonnull Collection<OperatorStateHandle> stateHandles,
    CloseableRegistry cancelStreamRegistry
) throws Exception;
```
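
The difference between the keyed and operator creation paths can be illustrated with a small decorator sketch. All types below are hypothetical stand-ins, not Flink classes, and the long parameter lists are dropped for brevity. It assumes only what the descriptions above say: keyed state from the delegate is wrapped with changelog behaviour, while operator state is handed back untouched.

```java
// Hypothetical stand-ins for illustration only; these are NOT Flink classes.
final class StateCreationSketch {

    interface KeyedState {
        String describe();
    }

    interface OperatorState {
        String describe();
    }

    interface Backend {
        KeyedState createKeyedState();

        OperatorState createOperatorState();
    }

    /** Stand-in for the wrapped backend that does the real storage work. */
    static final class InnerBackend implements Backend {
        @Override
        public KeyedState createKeyedState() {
            return () -> "inner keyed state";
        }

        @Override
        public OperatorState createOperatorState() {
            return () -> "inner operator state";
        }
    }

    /** Keyed state decorated with a (pretend) changelog layer. */
    static final class ChangelogKeyedState implements KeyedState {
        private final KeyedState inner;

        ChangelogKeyedState(KeyedState inner) {
            this.inner = inner;
        }

        @Override
        public String describe() {
            return "changelog(" + inner.describe() + ")";
        }
    }

    /** Stand-in for the changelog wrapper backend. */
    static final class Wrapper implements Backend {
        private final Backend delegate;

        Wrapper(Backend delegate) {
            this.delegate = delegate;
        }

        @Override
        public KeyedState createKeyedState() {
            // Keyed state: create it through the delegate, then decorate it.
            return new ChangelogKeyedState(delegate.createKeyedState());
        }

        @Override
        public OperatorState createOperatorState() {
            // Operator state: pure pass-through, no changelog layer.
            return delegate.createOperatorState();
        }
    }

    public static void main(String[] args) {
        Backend backend = new Wrapper(new InnerBackend());
        System.out.println(backend.createKeyedState().describe());    // changelog(inner keyed state)
        System.out.println(backend.createOperatorState().describe()); // inner operator state
    }
}
```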
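
## Configuration Integration

The changelog state backend integrates with Flink's configuration system through the standard state backend loading mechanisms:

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.StateBackendOptions;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.state.changelog.ChangelogStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Via configuration
Configuration config = new Configuration();
config.setString(StateBackendOptions.STATE_BACKEND, "changelog");
// Additional changelog-specific configuration can be added.
// Note: Flink also exposes a dedicated switch, state.backend.changelog.enabled;
// check which values state.backend accepts in your Flink version.

// Via programmatic configuration
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StateBackend delegateBackend = new HashMapStateBackend();
ChangelogStateBackend changelogBackend = new ChangelogStateBackend(delegateBackend);
env.setStateBackend(changelogBackend);
```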

## Error Handling

The changelog state backend enforces the following constraints on construction:

- The delegated state backend must not be null.
- Recursive delegation is not allowed: the delegated state backend cannot itself be a DelegatingStateBackend.

All delegated operations are forwarded to the wrapped state backend, and any exceptions they throw are propagated to the caller.
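
The construction-time checks can be sketched as follows. The types are hypothetical stand-ins rather than Flink classes, and the error messages are invented. The sketch throws `IllegalArgumentException` in both cases to mirror the constructor documentation in this section; the exact exception types of the real class should be confirmed against the Flink version in use.

```java
// Hypothetical stand-ins for illustration only; these are NOT Flink classes.
final class ValidationSketch {

    interface Backend {}

    /** Marker for backends that already wrap another backend. */
    interface DelegatingBackend extends Backend {}

    static final class PlainBackend implements Backend {}

    /** Stand-in for the changelog wrapper with its two constructor checks. */
    static final class Wrapper implements DelegatingBackend {
        final Backend delegate;

        Wrapper(Backend delegate) {
            if (delegate == null) {
                throw new IllegalArgumentException("delegate must not be null");
            }
            if (delegate instanceof DelegatingBackend) {
                throw new IllegalArgumentException("recursive delegation is not supported");
            }
            this.delegate = delegate;
        }
    }

    /** Attempts to wrap a backend and reports the outcome as text. */
    static String tryWrap(Backend candidate) {
        try {
            new Wrapper(candidate);
            return "accepted";
        } catch (IllegalArgumentException e) {
            return "rejected: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        Backend plain = new PlainBackend();

        System.out.println(tryWrap(plain));              // accepted
        System.out.println(tryWrap(null));               // rejected: delegate must not be null
        System.out.println(tryWrap(new Wrapper(plain))); // rejected: recursive delegation is not supported
    }
}
```

Failing in the constructor means an invalid wrapper never exists, so later calls such as backend creation do not need to repeat these checks.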