# State Backend Configuration

Core functionality for creating and configuring the RocksDB state backend, including storage paths, incremental checkpointing, and performance tuning options.

## Capabilities

### EmbeddedRocksDBStateBackend

The primary state backend class that manages local RocksDB instances for keyed state storage.

```java { .api }
/**
 * RocksDB state backend that stores state in embedded RocksDB instances.
 * This is the recommended state backend for production workloads requiring durable state.
 */
class EmbeddedRocksDBStateBackend extends AbstractManagedMemoryStateBackend implements ConfigurableStateBackend {

    /**
     * Creates a new EmbeddedRocksDBStateBackend with undefined incremental checkpointing.
     * The incremental checkpointing setting will be determined by configuration.
     */
    EmbeddedRocksDBStateBackend();

    /**
     * Creates a new EmbeddedRocksDBStateBackend with specified incremental checkpointing.
     * @param enableIncrementalCheckpointing whether to enable incremental checkpointing
     */
    EmbeddedRocksDBStateBackend(boolean enableIncrementalCheckpointing);

    /**
     * Creates a new EmbeddedRocksDBStateBackend with ternary incremental checkpointing setting.
     * @param enableIncrementalCheckpointing incremental checkpointing setting (true/false/undefined)
     */
    EmbeddedRocksDBStateBackend(TernaryBoolean enableIncrementalCheckpointing);
}
```

**Usage Example:**

```java
// Basic setup with incremental checkpointing enabled
EmbeddedRocksDBStateBackend stateBackend = new EmbeddedRocksDBStateBackend(true);

// Alternative: leave incremental checkpointing undefined so that configuration decides
EmbeddedRocksDBStateBackend configuredBackend = new EmbeddedRocksDBStateBackend();
```
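
The no-argument constructor leaves the incremental checkpointing setting undefined and defers to configuration. The following is a minimal sketch of that fallback rule only. `IncrementalSetting` and its nested `Ternary` enum are hypothetical stand-ins written so the example compiles without Flink on the classpath; they are not part of the library.

```java
final class IncrementalSetting {
    private IncrementalSetting() {}

    /** Hypothetical stand-in for a three-valued flag (not Flink's TernaryBoolean). */
    enum Ternary {
        TRUE, FALSE, UNDEFINED;

        /** Returns the explicit value, or the fallback when this value is UNDEFINED. */
        boolean getOrDefault(boolean fallback) {
            return this == UNDEFINED ? fallback : this == TRUE;
        }
    }

    /**
     * Resolves the effective incremental checkpointing flag.
     * An explicit constructor argument wins; UNDEFINED defers to the configured value.
     */
    static boolean resolve(Ternary fromConstructor, boolean fromConfig) {
        return fromConstructor.getOrDefault(fromConfig);
    }

    public static void main(String[] args) {
        // Undefined in code, so the configured value decides.
        System.out.println(resolve(Ternary.UNDEFINED, true));
        // Explicitly disabled in code, so the configured value is ignored.
        System.out.println(resolve(Ternary.FALSE, true));
    }
}
```

Under this model, passing `true` or `false` to the constructor pins the behavior in code, while the no-argument form keeps the decision in the deployment configuration.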

### Storage Path Configuration

Configure where RocksDB stores its files on the local filesystem.

```java { .api }
/**
 * Sets a single storage path for RocksDB files.
 * @param path directory path for RocksDB storage
 */
void setDbStoragePath(String path);

/**
 * Sets multiple storage paths for RocksDB files to distribute I/O load.
 * @param paths array of directory paths for RocksDB storage
 */
void setDbStoragePaths(String... paths);

/**
 * Gets the configured storage paths.
 * @return array of configured storage paths
 */
String[] getDbStoragePaths();
```

**Usage Examples:**

```java
// Single storage path
stateBackend.setDbStoragePath("/data/flink/rocksdb");

// Alternative: multiple paths for I/O distribution
stateBackend.setDbStoragePaths(
    "/data1/flink/rocksdb",
    "/data2/flink/rocksdb",
    "/data3/flink/rocksdb"
);
```
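
Because the storage paths point at the local filesystem, it can help to check candidate directories before handing them to `setDbStoragePaths`. The sketch below is one possible pre-flight check using only the JDK. `StoragePathCheck` and `usablePaths` are hypothetical names, and the rule it applies (absolute, creatable, writable) is an assumption of this example rather than a statement of what the backend itself validates.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

final class StoragePathCheck {
    private StoragePathCheck() {}

    /**
     * Returns the candidates that are absolute and exist (or can be created)
     * as writable directories. Candidates failing a check are skipped.
     */
    static String[] usablePaths(String... candidates) {
        List<String> usable = new ArrayList<>();
        for (String candidate : candidates) {
            Path dir = Paths.get(candidate);
            if (!dir.isAbsolute()) {
                continue; // relative paths depend on the process working directory
            }
            try {
                Files.createDirectories(dir); // no-op if the directory already exists
            } catch (IOException e) {
                continue; // cannot be created, or exists but is not a directory
            }
            if (Files.isWritable(dir)) {
                usable.add(dir.toString());
            }
        }
        return usable.toArray(new String[0]);
    }
}
```

The result could then be passed on as `stateBackend.setDbStoragePaths(StoragePathCheck.usablePaths(...))`. The caller should handle an empty result first, since no candidate may have passed the check.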

### Options Configuration

Configure RocksDB behavior through predefined options or custom factories.

```java { .api }
/**
 * Sets predefined RocksDB options optimized for specific hardware profiles.
 * @param options predefined configuration set
 */
void setPredefinedOptions(PredefinedOptions options);

/**
 * Gets the current predefined options configuration.
 * @return current predefined options
 */
PredefinedOptions getPredefinedOptions();

/**
 * Sets a custom RocksDB options factory for fine-grained configuration.
 * @param optionsFactory factory for creating RocksDB options
 */
void setRocksDBOptions(RocksDBOptionsFactory optionsFactory);

/**
 * Gets the current RocksDB options factory.
 * @return current options factory
 */
RocksDBOptionsFactory getRocksDBOptions();
```

**Usage Examples:**

```java
// Use predefined options
stateBackend.setPredefinedOptions(PredefinedOptions.FLASH_SSD_OPTIMIZED);

// Use custom options factory
DefaultConfigurableOptionsFactory factory = new DefaultConfigurableOptionsFactory()
    .setMaxBackgroundThreads(4)
    .setWriteBufferSize("128mb");
stateBackend.setRocksDBOptions(factory);
```

### Performance Tuning

Configure performance-related settings for checkpointing and write operations.

```java { .api }
/**
 * Sets the number of threads used to transfer files during checkpointing and restore.
 * @param numberOfTransferThreads thread count for parallel file transfer
 */
void setNumberOfTransferThreads(int numberOfTransferThreads);

/**
 * Gets the configured number of transfer threads.
 * @return number of transfer threads
 */
int getNumberOfTransferThreads();

/**
 * Sets the maximum size of write batches for RocksDB operations.
 * @param writeBatchSize maximum write batch size in bytes
 */
void setWriteBatchSize(long writeBatchSize);

/**
 * Gets the configured write batch size.
 * @return write batch size in bytes; if none was set explicitly, the default from configuration
 */
long getWriteBatchSize();
```

**Usage Examples:**

```java
// Configure checkpoint transfer threads
stateBackend.setNumberOfTransferThreads(8);

// Configure write batch size (64 MiB); the L suffix keeps the arithmetic in long
stateBackend.setWriteBatchSize(64L * 1024 * 1024);
```
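
`setWriteBatchSize` takes a byte count as a `long`, so the size expression should be evaluated in `long` arithmetic. The sketch below shows why: with `int` operands a large enough product wraps around silently. `ByteSizes` and `mebibytes` are hypothetical helper names for this illustration.

```java
final class ByteSizes {
    private ByteSizes() {}

    private static final long BYTES_PER_MIB = 1024L * 1024L;

    /** Converts mebibytes to bytes, throwing ArithmeticException on long overflow. */
    static long mebibytes(long mib) {
        return Math.multiplyExact(mib, BYTES_PER_MIB);
    }

    public static void main(String[] args) {
        int unit = 1024;
        int wrapped = 4096 * unit * unit; // int arithmetic: 2^32 wraps around to 0
        long exact = mebibytes(4096);     // long arithmetic keeps the full value
        System.out.println(wrapped + " vs " + exact); // prints "0 vs 4294967296"
    }
}
```

With such a helper the call reads `stateBackend.setWriteBatchSize(ByteSizes.mebibytes(64))`, and a mistyped size fails loudly instead of wrapping.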

### Priority Queue Configuration

Configure the implementation type for priority queue state (used by the timer service).

```java { .api }
/**
 * Sets the priority queue state type for the timer service.
 * @param priorityQueueStateType implementation type for priority queues
 */
void setPriorityQueueStateType(PriorityQueueStateType priorityQueueStateType);

/**
 * Gets the configured priority queue state type.
 * @return current priority queue state type
 */
PriorityQueueStateType getPriorityQueueStateType();
```

**Usage Examples:**

```java
// PriorityQueueStateType is a nested enum of EmbeddedRocksDBStateBackend.

// Use heap-based priority queues (faster, limited by memory)
stateBackend.setPriorityQueueStateType(PriorityQueueStateType.HEAP);

// Alternative: use RocksDB-based priority queues (scalable beyond memory limits)
stateBackend.setPriorityQueueStateType(PriorityQueueStateType.ROCKSDB);
```
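
The trade-off between the two types is speed against memory footprint. One way to make the choice explicit is a rough capacity estimate, sketched below. This is an illustrative heuristic, not a rule from the library: `TimerQueueChoice`, its `QueueType` enum, and all three inputs are hypothetical, and the per-timer size has to come from the caller's own measurements.

```java
final class TimerQueueChoice {
    private TimerQueueChoice() {}

    /** Hypothetical mirror of the two options; not Flink's enum. */
    enum QueueType { HEAP, ROCKSDB }

    /**
     * Picks HEAP when the estimated timer footprint fits the heap budget,
     * otherwise ROCKSDB. All three inputs are estimates supplied by the caller.
     */
    static QueueType choose(long expectedTimers, long bytesPerTimer, long heapBudgetBytes) {
        long estimatedBytes;
        try {
            estimatedBytes = Math.multiplyExact(expectedTimers, bytesPerTimer);
        } catch (ArithmeticException overflow) {
            return QueueType.ROCKSDB; // footprint too large to represent, let alone fit
        }
        return estimatedBytes <= heapBudgetBytes ? QueueType.HEAP : QueueType.ROCKSDB;
    }
}
```

For example, `choose(1_000, 100, 1_000_000)` selects `HEAP` because the estimated 100,000 bytes fit within the budget.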

### Status and Configuration Methods

Query state backend configuration and create configured copies.

```java { .api }
/**
 * Checks if incremental checkpointing is enabled.
 * @return true if incremental checkpointing is enabled
 */
boolean isIncrementalCheckpointsEnabled();

/**
 * Creates a configured copy of the state backend from ReadableConfig.
 * @param config configuration to apply
 * @param classLoader class loader for loading factory classes
 * @return new configured state backend instance
 */
EmbeddedRocksDBStateBackend configure(ReadableConfig config, ClassLoader classLoader);

/**
 * Gets the memory configuration for this state backend.
 * @return memory configuration object
 */
RocksDBMemoryConfiguration getMemoryConfiguration();
```

**Usage Examples:**

```java
// Check incremental checkpointing status
if (stateBackend.isIncrementalCheckpointsEnabled()) {
    System.out.println("Incremental checkpointing is enabled");
}

// Access memory configuration
RocksDBMemoryConfiguration memConfig = stateBackend.getMemoryConfiguration();
memConfig.setWriteBufferRatio(0.4);
```
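
To get a feel for what a write buffer ratio such as `0.4` means in bytes, the nominal share of a memory budget can be computed directly. The sketch below does only that arithmetic. `MemorySplit` is a hypothetical helper, the validation bounds are an assumption of this example, and the backend's actual sizing may apply further adjustments on top of the nominal share.

```java
final class MemorySplit {
    private MemorySplit() {}

    /**
     * Returns the bytes of a memory budget nominally assigned to write buffers.
     * The accepted range (0 < ratio <= 1) is an assumption of this sketch.
     */
    static long writeBufferBytes(long totalBytes, double writeBufferRatio) {
        if (totalBytes < 0) {
            throw new IllegalArgumentException("totalBytes must not be negative");
        }
        if (!(writeBufferRatio > 0.0 && writeBufferRatio <= 1.0)) {
            throw new IllegalArgumentException("writeBufferRatio must be in (0, 1]");
        }
        // Truncates toward zero, so the share never exceeds the exact product.
        return (long) (totalBytes * writeBufferRatio);
    }

    public static void main(String[] args) {
        long oneGiB = 1024L * 1024L * 1024L;
        System.out.println(writeBufferBytes(oneGiB, 0.4)); // prints 429496729
    }
}
```

With a 1 GiB budget and a ratio of 0.4, roughly 410 MiB would nominally go to write buffers, leaving the remainder for other uses.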

## Types

```java { .api }
enum PriorityQueueStateType {
    /** Heap-based priority queue (faster access, limited by memory) */
    HEAP,

    /** RocksDB-based priority queue (scalable beyond memory limits) */
    ROCKSDB
}

enum TernaryBoolean {
    TRUE,
    FALSE,
    UNDEFINED
}
```