Apache Flink State Processor API for reading and writing savepoint state data offline
npx @tessl/cli install tessl/maven-org-apache-flink--flink-state-processor-api-2-12@1.14.00
# Apache Flink State Processor API

The Apache Flink State Processor API provides programmatic access for reading and writing Flink savepoint state data outside of a running Flink application. The API enables batch processing of streaming application state, allowing developers to bootstrap new savepoints with initial state data, query and analyze existing state, modify operator state, and perform state transformations using standard Flink DataSet operations.

## Package Information

- **Package Name**: org.apache.flink:flink-state-processor-api_2.12
- **Package Type**: maven
- **Language**: Java
- **Installation**: Add the dependency to `pom.xml`:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-state-processor-api_2.12</artifactId>
    <version>1.14.6</version>
</dependency>
```

## Core Imports

```java
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.NewSavepoint;
import org.apache.flink.state.api.OperatorTransformation;
import org.apache.flink.state.api.BootstrapTransformation;
import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.runtime.state.StateBackend;
```

## Basic Usage

### Loading and Reading an Existing Savepoint

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.Savepoint;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
StateBackend stateBackend = new HashMapStateBackend();

// Load an existing savepoint
ExistingSavepoint savepoint = Savepoint.load(env, "/path/to/savepoint", stateBackend);

// Read operator list state
DataSource<MyState> states = savepoint.readListState(
    "my-operator-uid",
    "my-state-name",
    TypeInformation.of(MyState.class)
);

// print() triggers execution in the DataSet API; no separate env.execute() is needed
states.print();
```


### Creating a New Savepoint with Bootstrap Data

```java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.state.api.BootstrapTransformation;
import org.apache.flink.state.api.NewSavepoint;
import org.apache.flink.state.api.OperatorTransformation;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction;

// Create a new savepoint with a max parallelism of 128
NewSavepoint savepoint = Savepoint.create(stateBackend, 128);

// Bootstrap data from a DataSet
DataSet<MyInput> inputData = env.fromCollection(myDataList);

BootstrapTransformation<MyInput> transformation = OperatorTransformation
    .bootstrapWith(inputData)
    .keyBy(input -> input.getKey())
    .transform(new MyKeyedStateBootstrapFunction());

// Add the operator and write the savepoint
savepoint.withOperator("my-operator-uid", transformation)
    .write("/path/to/new-savepoint");

// write() only registers a sink; execute() runs the batch job that performs the write
env.execute("write savepoint");
```


## Architecture

The State Processor API consists of several key components:

- **Savepoint Management**: Entry points for loading existing and creating new savepoints
- **State Reading**: APIs for reading different types of state (keyed, list, union, broadcast, window)
- **State Writing**: Bootstrap transformations for writing new state data
- **Function Interfaces**: User-defined functions for processing state data
- **Type System**: Integration with Flink's type system for serialization

## Capabilities

### Savepoint Management

Core functionality for loading, creating, and managing savepoints.

```java { .api }
// Entry point class
public final class Savepoint {
    public static ExistingSavepoint load(
        ExecutionEnvironment env,
        String path,
        StateBackend stateBackend
    ) throws IOException;

    public static NewSavepoint create(
        StateBackend stateBackend,
        int maxParallelism
    );
}
```

[Savepoint Management](./savepoint-management.md)

### State Reading

Read various types of state from existing savepoints, including keyed state, operator state, and window state.

```java { .api }
// Reading different state types
public <T> DataSource<T> readListState(
    String uid,
    String name,
    TypeInformation<T> typeInfo
) throws IOException;

public <K, OUT> DataSource<OUT> readKeyedState(
    String uid,
    KeyedStateReaderFunction<K, OUT> function
) throws IOException;

public <W extends Window> WindowReader<W> window(
    WindowAssigner<?, W> assigner
);
```

[State Reading](./state-reading.md)
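
As a sketch of how a keyed read fits together: the reader below assumes the original streaming job kept a `ValueState<Double>` named `"balance"` keyed by `String`; the state name, key type, and output shape are illustrative, not part of the library API.

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.util.Collector;

public class BalanceReader extends KeyedStateReaderFunction<String, Tuple2<String, Double>> {
    private transient ValueState<Double> balance;

    @Override
    public void open(Configuration parameters) {
        // The descriptor must match the state name and type used by the streaming job
        balance = getRuntimeContext().getState(
            new ValueStateDescriptor<>("balance", Types.DOUBLE));
    }

    @Override
    public void readKey(String key, Context ctx, Collector<Tuple2<String, Double>> out)
            throws Exception {
        // Emit one (key, value) row per key present in the savepoint
        out.collect(Tuple2.of(key, balance.value()));
    }
}
```

Given a loaded `ExistingSavepoint`, `savepoint.readKeyedState("my-operator-uid", new BalanceReader())` then yields a `DataSource<Tuple2<String, Double>>` that can be processed with normal DataSet operations.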

### State Writing

Bootstrap new state data into savepoints using DataSet transformations.

```java { .api }
// Bootstrap transformation creation
public static <T> OneInputOperatorTransformation<T> bootstrapWith(
    DataSet<T> dataSet
);

// Keyed state bootstrap
public BootstrapTransformation<T> transform(
    KeyedStateBootstrapFunction<K, T> processFunction
);
```

[State Writing](./state-writing.md)

### Function Interfaces

User-defined functions for reading and writing state data.

```java { .api }
// Base bootstrap function
public abstract class KeyedStateBootstrapFunction<K, IN> extends AbstractRichFunction {
    public abstract void processElement(IN value, Context ctx) throws Exception;
}

// Base reader function
public abstract class KeyedStateReaderFunction<K, OUT> extends AbstractRichFunction {
    public abstract void readKey(K key, Context ctx, Collector<OUT> out) throws Exception;
}
```

[Function Interfaces](./function-interfaces.md)
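
A minimal bootstrap-side sketch: the function below writes one `ValueState<Double>` entry per input record. The `Account` record type and the `"balance"` state name are illustrative assumptions; the streaming job that later restores from the savepoint must use the same state name and type.

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction;

// Illustrative input record for bootstrapping
class Account {
    public String id;
    public double balance;
}

public class AccountBootstrapper extends KeyedStateBootstrapFunction<String, Account> {
    private transient ValueState<Double> balance;

    @Override
    public void open(Configuration parameters) {
        // State name and type must match what the restoring job declares
        balance = getRuntimeContext().getState(
            new ValueStateDescriptor<>("balance", Types.DOUBLE));
    }

    @Override
    public void processElement(Account account, Context ctx) throws Exception {
        // Runs under the key selected by keyBy(...); writes keyed state for that key
        balance.update(account.balance);
    }
}
```

Wired up via `OperatorTransformation.bootstrapWith(accounts).keyBy(acc -> acc.id).transform(new AccountBootstrapper())`.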

### Window State Operations

Specialized operations for reading and writing window state data.

```java { .api }
// Window reader for different aggregation types
public <T, K> DataSource<T> reduce(
    String uid,
    ReduceFunction<T> function,
    TypeInformation<K> keyType,
    TypeInformation<T> reduceType
) throws IOException;

public <K, T, ACC, R> DataSource<R> aggregate(
    String uid,
    AggregateFunction<T, ACC, R> aggregateFunction,
    TypeInformation<K> keyType,
    TypeInformation<ACC> accType,
    TypeInformation<R> outputType
) throws IOException;
```

[Window Operations](./window-operations.md)
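
A sketch of reading reduced window state, assuming the streaming job used one-minute tumbling event-time windows and a reducing sum keyed by `String`; the `"click-window"` uid and `savepoint` variable (a loaded `ExistingSavepoint`) are illustrative.

```java
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

// The window assigner must match the one used by the streaming job
DataSource<Long> counts = savepoint
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    .reduce(
        "click-window",                    // operator uid (illustrative)
        (ReduceFunction<Long>) Long::sum,  // same reduce function as the job
        Types.STRING,                      // key type
        Types.LONG);                       // reduce/state type
```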

## Common Types

```java { .api }
// Context passed to user functions (a nested class of
// KeyedStateReaderFunction and KeyedStateBootstrapFunction; K is the key type)
public abstract class Context {
    public abstract TimerService timerService();
    public abstract K getCurrentKey();
}

// Bootstrap transformation
public class BootstrapTransformation<T> {
    public DataSet<OperatorState> writeOperatorState(
        OperatorID operatorID,
        StateBackend stateBackend,
        Configuration config,
        int globalMaxParallelism,
        Path savepointPath
    );
}

// Writable savepoint base
public abstract class WritableSavepoint<F extends WritableSavepoint<F>> {
    public F removeOperator(String uid);
    public <T> F withOperator(String uid, BootstrapTransformation<T> transformation);
    public <T> F withConfiguration(ConfigOption<T> option, T value);
    public void write(String path);
}
```
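
Because `ExistingSavepoint` also extends `WritableSavepoint`, a loaded savepoint can be patched and written back out. A minimal sketch, assuming `env`, `stateBackend`, and a prepared `transformation` are in scope; the operator uids and paths are illustrative.

```java
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.Savepoint;

ExistingSavepoint savepoint = Savepoint.load(env, "/path/to/savepoint", stateBackend);

savepoint
    .removeOperator("stale-operator-uid")              // drop state that is no longer needed
    .withOperator("new-operator-uid", transformation)  // bootstrap replacement state
    .write("/path/to/modified-savepoint");

env.execute("modify savepoint");
```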

## Error Handling

The API throws standard Java exceptions:

- `IOException` - for savepoint path and file system operations
- `InvalidProgramException` - for type inference failures
- `RuntimeException` - for validation and state access errors

Common error scenarios:

- Savepoint path not found or inaccessible
- Operator UID not found in the savepoint
- Type serialization/deserialization issues
- Invalid max parallelism values
- State descriptor registration errors
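
A sketch of handling these failures around load and read calls, assuming `env`, `stateBackend`, and a `MyState` type are in scope:

```java
import java.io.IOException;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.Savepoint;

try {
    ExistingSavepoint savepoint = Savepoint.load(env, "/path/to/savepoint", stateBackend);
    savepoint.readListState("my-operator-uid", "my-state-name",
        TypeInformation.of(MyState.class));
} catch (IOException e) {
    // Savepoint path missing or unreadable, or metadata could not be parsed
} catch (RuntimeException e) {
    // Validation or state access failure, e.g. an unknown operator uid
}
```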