# Core State Backend Configuration

The `EmbeddedRocksDBStateBackend` is the main entry point for using RocksDB as a state backend in Apache Flink. It keeps working state in an embedded RocksDB instance on the TaskManager's local disk, so state can grow beyond available memory, and it supports incremental checkpointing.

## Core Imports

```java { .api }
import org.apache.flink.state.rocksdb.EmbeddedRocksDBStateBackend;
import org.apache.flink.state.rocksdb.EmbeddedRocksDBStateBackendFactory;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.util.TernaryBoolean;
```

## EmbeddedRocksDBStateBackend Class

### Class Definition

```java { .api }
@PublicEvolving
public class EmbeddedRocksDBStateBackend implements StateBackend, Serializable {
    // Main implementation class for RocksDB state backend
}
```

### Constructors

```java { .api }
public EmbeddedRocksDBStateBackend()
```
Creates a new `EmbeddedRocksDBStateBackend` with default settings. Whether incremental checkpointing is used is left to the Flink configuration.

```java { .api }
public EmbeddedRocksDBStateBackend(boolean enableIncrementalCheckpointing)
```
Creates a new `EmbeddedRocksDBStateBackend` with incremental checkpointing explicitly enabled or disabled.

**Parameters:**
- `enableIncrementalCheckpointing` - Whether to enable incremental checkpointing

```java { .api }
public EmbeddedRocksDBStateBackend(TernaryBoolean enableIncrementalCheckpointing)
```
Creates a new `EmbeddedRocksDBStateBackend` whose incremental checkpointing setting may be left undefined.

**Parameters:**
- `enableIncrementalCheckpointing` - `TRUE` or `FALSE` fixes the setting; `UNDEFINED` defers the decision to the Flink configuration

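
The three-state constructor argument above can be pictured with a small JDK-only sketch. `TriState` and `IncrementalFlagDemo` are hypothetical names used for illustration, not Flink classes; the sketch only shows the general idea that an explicit value wins and an undefined value falls back to a configured default.

```java
/** Demonstrates how an undefined flag falls back to a configured default. */
final class IncrementalFlagDemo {
    public static void main(String[] args) {
        boolean fromConfig = true; // stand-in for the value found in the configuration
        System.out.println(TriState.UNDEFINED.resolve(fromConfig)); // falls back to config
        System.out.println(TriState.FALSE.resolve(fromConfig));     // explicit value wins
    }
}

/** Hypothetical three-state flag, modeled on the idea behind TernaryBoolean. */
enum TriState {
    TRUE, FALSE, UNDEFINED;

    /** Returns the explicit value, or the fallback when nothing was set. */
    boolean resolve(boolean fallback) {
        return this == UNDEFINED ? fallback : this == TRUE;
    }
}
```

Passing `UNDEFINED` is useful when the same job code runs on clusters that choose different checkpointing modes in their configuration.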
## Basic Configuration

### Storage Path Configuration

```java { .api }
public void setDbStoragePath(String dbStoragePath)
```
Sets the path where RocksDB stores its data files locally on the TaskManager.

**Parameters:**
- `dbStoragePath` - The path to the local RocksDB data directory

**Example:**
```java
EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend();
backend.setDbStoragePath("/tmp/flink-rocksdb");
```

```java { .api }
public void setDbStoragePaths(String... dbStoragePaths)
```
Sets multiple paths where RocksDB can store its data files, allowing distribution across multiple devices.

**Parameters:**
- `dbStoragePaths` - Multiple paths to local RocksDB data directories

**Example:**
```java
backend.setDbStoragePaths("/ssd1/rocksdb", "/ssd2/rocksdb", "/ssd3/rocksdb");
```

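
Before handing a list of directories to `setDbStoragePaths`, it can help to validate them up front. The sketch below is JDK-only and assumes that local RocksDB directories should be absolute paths (worth confirming against your Flink version); `StoragePathCheck` and `requireAbsolute` are hypothetical helper names.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical pre-flight check for local storage directories. */
final class StoragePathCheck {
    /** Returns the normalized candidates as an array, rejecting any relative path. */
    static String[] requireAbsolute(List<String> candidates) {
        List<String> accepted = new ArrayList<>();
        for (String candidate : candidates) {
            Path path = Paths.get(candidate);
            if (!path.isAbsolute()) {
                throw new IllegalArgumentException("Relative path not allowed: " + candidate);
            }
            // normalize() collapses "." and ".." segments without touching the file system
            accepted.add(path.normalize().toString());
        }
        return accepted.toArray(new String[0]);
    }
}
```

The returned array could then be passed along, for example `backend.setDbStoragePaths(StoragePathCheck.requireAbsolute(dirs))`.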
```java { .api }
public String[] getDbStoragePaths()
```
Gets the configured storage paths for RocksDB data files.

**Returns:** Array of configured storage paths

## Checkpointing Configuration

### Incremental Checkpointing

```java { .api }
public boolean isIncrementalCheckpointsEnabled()
```
Returns whether incremental checkpointing is enabled for this state backend.

**Returns:** `true` if incremental checkpointing is enabled, `false` otherwise

```java { .api }
public boolean supportsNoClaimRestoreMode()
```
Returns whether this state backend supports the NO_CLAIM restore mode.

**Returns:** `true` - RocksDB backend supports all restore modes

```java { .api }
public boolean supportsSavepointFormat(SavepointFormatType formatType)
```
Returns whether this state backend supports the specified savepoint format.

**Parameters:**
- `formatType` - The savepoint format type to check

**Returns:** `true`; the RocksDB backend supports both the canonical and the native savepoint format

### Transfer Configuration

```java { .api }
public int getNumberOfTransferThreads()
```
Gets the number of threads used to upload and download files during checkpointing and restore.

**Returns:** Number of transfer threads

```java { .api }
public void setNumberOfTransferThreads(int numberOfTransferThreads)
```
Sets the number of threads used to upload and download files during checkpointing and restore.

**Parameters:**
- `numberOfTransferThreads` - Number of threads to use for file transfers; must be greater than zero

```java { .api }
public long getWriteBatchSize()
```
Gets the maximum amount of memory a RocksDB write batch may consume before it is flushed.

**Returns:** Write batch size in bytes

```java { .api }
public void setWriteBatchSize(long writeBatchSize)
```
Sets the maximum amount of memory a RocksDB write batch may consume before it is flushed.

**Parameters:**
- `writeBatchSize` - Write batch size in bytes

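
To make the effect of a byte-based batch limit concrete, here is a simplified, JDK-only model of size-bounded write batching. It is a sketch of the general technique, not Flink's or RocksDB's implementation; `SizeBoundedBatch` and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical buffer that flushes once the pending bytes reach a limit. */
final class SizeBoundedBatch {
    private final long maxBatchBytes;
    private final List<byte[]> pending = new ArrayList<>();
    private long pendingBytes = 0;
    private int flushCount = 0;

    SizeBoundedBatch(long maxBatchBytes) {
        this.maxBatchBytes = maxBatchBytes;
    }

    /** Buffers one key/value pair and flushes when the size limit is reached. */
    void put(byte[] key, byte[] value) {
        pending.add(key);
        pending.add(value);
        pendingBytes += key.length + value.length;
        if (pendingBytes >= maxBatchBytes) {
            flush();
        }
    }

    /** Empties the buffer; a real store would write the batch out here. */
    void flush() {
        if (pending.isEmpty()) {
            return;
        }
        pending.clear();
        pendingBytes = 0;
        flushCount++;
    }

    int flushCount() {
        return flushCount;
    }
}
```

A larger limit means fewer, bigger flushes at the cost of more buffered memory per batch, which is the trade-off the write batch size setting controls.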
## Priority Queue Configuration

### Priority Queue Types

```java { .api }
public enum PriorityQueueStateType {
    HEAP,    // Use heap-based priority queue
    ROCKSDB  // Use RocksDB-based priority queue
}
```

```java { .api }
public PriorityQueueStateType getPriorityQueueStateType()
```
Gets the priority queue implementation type.

**Returns:** Current priority queue state type

```java { .api }
public void setPriorityQueueStateType(PriorityQueueStateType priorityQueueStateType)
```
Sets the priority queue implementation type.

**Parameters:**
- `priorityQueueStateType` - The priority queue implementation to use

**Example:**
```java
// Use RocksDB for priority queue state (timers, windows)
backend.setPriorityQueueStateType(EmbeddedRocksDBStateBackend.PriorityQueueStateType.ROCKSDB);

// Use the JVM heap for priority queue state (faster for small state)
backend.setPriorityQueueStateType(EmbeddedRocksDBStateBackend.PriorityQueueStateType.HEAP);
```

## Runtime Configuration

### Configuration Method

```java { .api }
public EmbeddedRocksDBStateBackend configure(ReadableConfig config, ClassLoader classLoader)
```
Creates a copy of this state backend in which every setting that was not specified programmatically is filled in from the provided configuration. Settings already specified on this instance take precedence, and the original instance is left unchanged.

**Parameters:**
- `config` - The configuration to apply
- `classLoader` - The class loader to use

**Returns:** A new configured instance of this state backend

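
The copy-with-fallback behaviour of `configure` can be illustrated with a JDK-only sketch. `BackendSettings` and the key `transfer.threads` are hypothetical and exist only for this example; the point is the pattern, in which values set in code win, unset values are read from the configuration, and the original object is not modified.

```java
import java.util.Map;

/** Hypothetical settings holder showing the copy-with-fallback pattern. */
final class BackendSettings {
    private final Integer transferThreads; // null means "not set in code"

    BackendSettings(Integer transferThreads) {
        this.transferThreads = transferThreads;
    }

    /** Returns a new instance; only unset values are taken from the config map. */
    BackendSettings configure(Map<String, String> config) {
        Integer resolved = transferThreads;
        if (resolved == null && config.containsKey("transfer.threads")) {
            resolved = Integer.valueOf(config.get("transfer.threads"));
        }
        return new BackendSettings(resolved);
    }

    Integer transferThreads() {
        return transferThreads;
    }
}
```

For instance, `new BackendSettings(2).configure(Map.of("transfer.threads", "8"))` keeps the value `2`, while `new BackendSettings(null)` with the same map picks up `8`.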
## Memory Configuration Access

```java { .api }
public RocksDBMemoryConfiguration getMemoryConfiguration()
```
Gets the memory configuration for this RocksDB state backend.

**Returns:** The RocksDB memory configuration object

**Example:**
```java
EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend();
RocksDBMemoryConfiguration memConfig = backend.getMemoryConfiguration();
memConfig.setUseManagedMemory(true);
memConfig.setWriteBufferRatio(0.4);
```

## Memory Factory Configuration

```java { .api }
public void setRocksDBMemoryFactory(RocksDBMemoryFactory rocksDBMemoryFactory)
```
Sets the RocksDB memory factory for custom memory management strategies.

**Parameters:**
- `rocksDBMemoryFactory` - Custom memory factory implementation

## Complete Usage Example

```java
import org.apache.flink.state.rocksdb.EmbeddedRocksDBStateBackend;
import org.apache.flink.state.rocksdb.PredefinedOptions;
import org.apache.flink.state.rocksdb.RocksDBMemoryConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDBStateBackendExample {
    public static void main(String[] args) {
        // Create the state backend with incremental checkpointing enabled
        EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend(true);

        // Configure storage
        backend.setDbStoragePath("/tmp/flink-rocksdb");

        // Configure performance settings
        backend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED);
        backend.setNumberOfTransferThreads(4);
        backend.setWriteBatchSize(2 * 1024 * 1024); // 2 MB

        // Configure priority queue type
        backend.setPriorityQueueStateType(
            EmbeddedRocksDBStateBackend.PriorityQueueStateType.ROCKSDB
        );

        // Configure memory
        RocksDBMemoryConfiguration memConfig = backend.getMemoryConfiguration();
        memConfig.setUseManagedMemory(true);
        memConfig.setWriteBufferRatio(0.4);
        memConfig.setHighPriorityPoolRatio(0.2);

        // Apply to execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStateBackend(backend);

        // Now use the environment for your Flink job
        // env.addSource(...).keyBy(...).process(...);
    }
}
```

## EmbeddedRocksDBStateBackendFactory

### Factory Class

```java { .api }
@PublicEvolving
public class EmbeddedRocksDBStateBackendFactory implements StateBackendFactory<EmbeddedRocksDBStateBackend> {
    // Factory for creating state backend from configuration
}
```

### Factory Method

```java { .api }
public EmbeddedRocksDBStateBackend createFromConfig(ReadableConfig config, ClassLoader classLoader)
```
Creates an `EmbeddedRocksDBStateBackend` from the given configuration.

**Parameters:**
- `config` - The configuration containing state backend settings
- `classLoader` - The class loader for loading classes

**Returns:** A configured `EmbeddedRocksDBStateBackend` instance

**Example:**
```java
Configuration config = new Configuration();
config.set(RocksDBOptions.LOCAL_DIRECTORIES, "/tmp/rocksdb");
// PREDEFINED_OPTIONS is a string-typed option, so pass the enum constant's name
config.set(RocksDBOptions.PREDEFINED_OPTIONS, PredefinedOptions.FLASH_SSD_OPTIMIZED.name());

EmbeddedRocksDBStateBackendFactory factory = new EmbeddedRocksDBStateBackendFactory();
EmbeddedRocksDBStateBackend backend = factory.createFromConfig(config, getClass().getClassLoader());
```

## Configuration via Flink Configuration

You can also configure the RocksDB state backend through Flink's configuration system:

```yaml
# flink-conf.yaml
state.backend: rocksdb
state.backend.rocksdb.localdir: /tmp/flink-rocksdb
state.backend.rocksdb.predefined-options: SPINNING_DISK_OPTIMIZED
state.backend.rocksdb.checkpoint.transfer.thread.num: 4
state.backend.rocksdb.memory.managed: true
state.backend.rocksdb.memory.write-buffer-ratio: 0.4
```

## Thread Safety and Lifecycle

- **Thread Safety:** The `EmbeddedRocksDBStateBackend` configuration methods are not thread-safe; call them during setup, before the job starts.
- **Serialization:** The state backend is serializable and is distributed to the TaskManagers.
- **Lifecycle:** Configuration is immutable once the job starts; use the `configure()` method to create configured copies.
- **Resource Management:** RocksDB resources are managed automatically by Flink's lifecycle management.