# Memory Configuration and Management

The RocksDB State Backend exposes memory settings that integrate with Flink's memory model. RocksDB can draw its budget from Flink's managed memory or from a fixed amount per task slot, and that budget can be split between write buffers and block cache to suit the deployment and workload.

## Core Imports

```java { .api }
import org.apache.flink.state.rocksdb.EmbeddedRocksDBStateBackend;
import org.apache.flink.state.rocksdb.RocksDBMemoryConfiguration;
import org.apache.flink.state.rocksdb.RocksDBMemoryFactory;
import org.apache.flink.state.rocksdb.RocksDBOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.configuration.MemorySize;
```

## RocksDBMemoryConfiguration Class

### Class Definition

```java { .api }
public final class RocksDBMemoryConfiguration {
    // Configuration settings for RocksDB memory usage management
}
```

## Memory Management Strategies

### Managed Memory Integration

```java { .api }
public void setUseManagedMemory(boolean useManagedMemory)
```
Configures whether to use Flink's managed memory for RocksDB.

**Parameters:**
- `useManagedMemory` - Whether to use managed memory budget from Flink

```java { .api }
public boolean isUsingManagedMemory()
```
Checks if this configuration uses Flink's managed memory.

**Returns:** `true` if using managed memory, `false` otherwise

**Example:**
```java
RocksDBMemoryConfiguration memConfig = backend.getMemoryConfiguration();
memConfig.setUseManagedMemory(true);

// Check current setting
if (memConfig.isUsingManagedMemory()) {
    // Memory will be allocated from Flink's managed memory pool
}
```

### Fixed Memory Per Slot

```java { .api }
public void setFixedMemoryPerSlot(MemorySize fixedMemoryPerSlot)
```
Sets a fixed amount of memory for RocksDB per task slot.

**Parameters:**
- `fixedMemoryPerSlot` - Fixed memory size per slot

```java { .api }
public void setFixedMemoryPerSlot(String totalMemoryPerSlotStr)
```
Sets a fixed amount of memory for RocksDB per task slot using string format.

**Parameters:**
- `totalMemoryPerSlotStr` - Memory size string (e.g., "256mb", "1gb")

```java { .api }
public boolean isUsingFixedMemoryPerSlot()
```
Checks if this configuration uses fixed memory per slot.

**Returns:** `true` if using fixed memory per slot, `false` otherwise

```java { .api }
public MemorySize getFixedMemoryPerSlot()
```
Gets the configured fixed memory per slot.

**Returns:** Fixed memory size per slot, or null if not configured

**Example:**
```java
RocksDBMemoryConfiguration memConfig = backend.getMemoryConfiguration();

// Set fixed memory using MemorySize
memConfig.setFixedMemoryPerSlot(MemorySize.ofMebiBytes(512));

// Or set using string format
memConfig.setFixedMemoryPerSlot("512mb");

// Check configuration
if (memConfig.isUsingFixedMemoryPerSlot()) {
    MemorySize fixedSize = memConfig.getFixedMemoryPerSlot();
    System.out.println("Fixed memory per slot: " + fixedSize);
}
```
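
The string overload accepts sizes such as "256mb" or "1gb". As a rough illustration of how such strings map to byte counts, the sketch below parses a number followed by a binary unit suffix. `MemorySizeSketch` and `parseBytes` are hypothetical names that are not part of Flink; in real code `MemorySize.parse` does this job and accepts more spellings.

```java
import java.util.Locale;
import java.util.Map;

// Hypothetical helper, not part of Flink: parses strings like "512mb" into bytes.
final class MemorySizeSketch {

    // Binary multipliers: 1 kb = 1024 bytes, 1 mb = 1024 kb, and so on.
    private static final Map<String, Long> UNITS = Map.of(
            "b", 1L,
            "kb", 1024L,
            "mb", 1024L * 1024,
            "gb", 1024L * 1024 * 1024);

    static long parseBytes(String text) {
        String s = text.trim().toLowerCase(Locale.ROOT);
        // Split the leading digits from the unit suffix.
        int i = 0;
        while (i < s.length() && Character.isDigit(s.charAt(i))) {
            i++;
        }
        if (i == 0) {
            throw new IllegalArgumentException("No number in: " + text);
        }
        long value = Long.parseLong(s.substring(0, i));
        String unit = s.substring(i).trim();
        // A bare number is treated as a byte count.
        Long multiplier = UNITS.get(unit.isEmpty() ? "b" : unit);
        if (multiplier == null) {
            throw new IllegalArgumentException("Unknown unit in: " + text);
        }
        return Math.multiplyExact(value, multiplier);
    }

    public static void main(String[] args) {
        System.out.println(parseBytes("512mb")); // 536870912
        System.out.println(parseBytes("1gb"));   // 1073741824
    }
}
```

Under this reading, "512mb" and `MemorySize.ofMebiBytes(512)` describe the same budget, which is why the two calls in the example above are interchangeable.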

## Memory Allocation Ratios

### Write Buffer Ratio

```java { .api }
public void setWriteBufferRatio(double writeBufferRatio)
```
Sets the fraction of available memory to use for RocksDB write buffers.

**Parameters:**
- `writeBufferRatio` - Fraction of memory for write buffers (0.0 to 1.0)

```java { .api }
public double getWriteBufferRatio()
```
Gets the configured write buffer ratio.

**Returns:** Write buffer ratio as a fraction

**Example:**
```java
// Use 40% of available memory for write buffers
memConfig.setWriteBufferRatio(0.4);

// Get current ratio
double ratio = memConfig.getWriteBufferRatio();
```

### High Priority Pool Ratio

```java { .api }
public void setHighPriorityPoolRatio(double highPriorityPoolRatio)
```
Sets the fraction of block cache memory reserved for high priority blocks.

**Parameters:**
- `highPriorityPoolRatio` - Fraction for high priority blocks (0.0 to 1.0)

```java { .api }
public double getHighPriorityPoolRatio()
```
Gets the configured high priority pool ratio.

**Returns:** High priority pool ratio as a fraction

**Example:**
```java
// Reserve 20% of block cache for high priority blocks (index/filter blocks)
memConfig.setHighPriorityPoolRatio(0.2);

// Get current ratio
double highPriorityRatio = memConfig.getHighPriorityPoolRatio();
```

## Advanced Memory Features

### Partitioned Index Filters

```java { .api }
public boolean isUsingPartitionedIndexFilters()
```
Checks if partitioned index/filters are enabled for memory efficiency.

**Returns:** `true` if using partitioned index/filters, `false` otherwise

**Example:**
```java
if (memConfig.isUsingPartitionedIndexFilters()) {
    // Index and filter blocks are partitioned for better memory utilization
}
```

## Configuration Validation and Utilities

### Configuration Validation

```java { .api }
public void validate()
```
Validates the consistency of the memory configuration.

**Throws:** IllegalArgumentException if configuration is invalid

**Example:**
```java
RocksDBMemoryConfiguration memConfig = backend.getMemoryConfiguration();
memConfig.setUseManagedMemory(true);
memConfig.setWriteBufferRatio(0.4);
memConfig.setHighPriorityPoolRatio(0.2);
memConfig.validate(); // Throws exception if configuration is inconsistent
```
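
To make the idea of a consistency check concrete, here is a minimal sketch of the kind of range checks a validation step might perform. The class `RatioCheckSketch` and its rule (each ratio strictly between 0 and 1) are assumptions for illustration only; the actual conditions enforced by `validate()` may differ and should be checked against the Flink version in use.

```java
// Hypothetical stand-in for a validation step; not Flink's implementation.
final class RatioCheckSketch {

    // Throws IllegalArgumentException when a ratio lies outside the open interval (0, 1).
    static void checkRatio(String name, double ratio) {
        if (!(ratio > 0.0 && ratio < 1.0)) {
            throw new IllegalArgumentException(
                    name + " must be between 0 and 1 (exclusive), but was " + ratio);
        }
    }

    // Convenience wrapper that reports validity instead of throwing.
    static boolean isValid(double writeBufferRatio, double highPriorityPoolRatio) {
        try {
            checkRatio("writeBufferRatio", writeBufferRatio);
            checkRatio("highPriorityPoolRatio", highPriorityPoolRatio);
            return true;
        } catch (IllegalArgumentException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(isValid(0.4, 0.2)); // true
        System.out.println(isValid(1.5, 0.2)); // false
    }
}
```

Failing fast during setup, as this sketch does, surfaces a bad ratio before the job is submitted rather than at state backend initialization.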

### Factory Methods

```java { .api }
public static RocksDBMemoryConfiguration fromOtherAndConfiguration(
    RocksDBMemoryConfiguration other,
    ReadableConfig config)
```
Creates a new memory configuration from an existing one and additional configuration.

**Parameters:**
- `other` - Base memory configuration to copy from
- `config` - Additional configuration to apply

**Returns:** New configured RocksDBMemoryConfiguration instance

```java { .api }
public static RocksDBMemoryConfiguration fromConfiguration(Configuration configuration)
```
Creates a memory configuration from Flink configuration.

**Parameters:**
- `configuration` - Flink configuration containing memory settings

**Returns:** RocksDBMemoryConfiguration instance based on configuration

**Example:**
```java
// Create from Flink configuration
Configuration config = new Configuration();
config.set(RocksDBOptions.USE_MANAGED_MEMORY, true);
config.set(RocksDBOptions.WRITE_BUFFER_RATIO, 0.4);

RocksDBMemoryConfiguration memConfig = RocksDBMemoryConfiguration.fromConfiguration(config);

// Create from existing config with overrides
Configuration overrides = new Configuration();
overrides.set(RocksDBOptions.HIGH_PRIORITY_POOL_RATIO, 0.3);

RocksDBMemoryConfiguration newConfig = RocksDBMemoryConfiguration.fromOtherAndConfiguration(
    memConfig, overrides);
```

## Memory Configuration Patterns

### Pattern 1: Managed Memory Integration

```java
/**
 * Use Flink's managed memory with automatic memory distribution
 * Best for: Multi-tenant clusters, resource management
 */
public void configureForManagedMemory(EmbeddedRocksDBStateBackend backend) {
    RocksDBMemoryConfiguration memConfig = backend.getMemoryConfiguration();

    memConfig.setUseManagedMemory(true);     // Use Flink's managed memory
    memConfig.setWriteBufferRatio(0.4);      // 40% for write buffers
    memConfig.setHighPriorityPoolRatio(0.2); // 20% high priority in cache
    memConfig.validate();

    // Memory will be allocated from Flink's managed memory pool
    // The budget scales with the managed memory available to each slot
}
```

### Pattern 2: Fixed Memory Per Slot

```java
/**
 * Fixed memory allocation per task slot
 * Best for: Predictable memory usage, dedicated clusters
 */
public void configureFixedMemory(EmbeddedRocksDBStateBackend backend) {
    RocksDBMemoryConfiguration memConfig = backend.getMemoryConfiguration();

    memConfig.setFixedMemoryPerSlot("1gb");   // 1GB per slot
    memConfig.setWriteBufferRatio(0.3);       // 30% (about 307MB) for write buffers
    memConfig.setHighPriorityPoolRatio(0.15); // 15% high priority
    memConfig.validate();

    // Each task slot gets a 1GB memory budget for RocksDB
}
```

### Pattern 3: High-Throughput Write Workload

```java
/**
 * Optimized for high write throughput
 * Best for: Heavy ingestion, frequent updates
 */
public void configureForHighWrites(EmbeddedRocksDBStateBackend backend) {
    RocksDBMemoryConfiguration memConfig = backend.getMemoryConfiguration();

    memConfig.setUseManagedMemory(true);
    memConfig.setWriteBufferRatio(0.6);      // 60% for write buffers (higher)
    memConfig.setHighPriorityPoolRatio(0.1); // 10% high priority (lower)
    memConfig.validate();

    // More memory for write buffers to handle high write load
}
```

### Pattern 4: Read-Heavy Workload

```java
/**
 * Optimized for read-heavy workloads with complex state access
 * Best for: Analytics, lookups, windowing operations
 */
public void configureForReads(EmbeddedRocksDBStateBackend backend) {
    RocksDBMemoryConfiguration memConfig = backend.getMemoryConfiguration();

    memConfig.setUseManagedMemory(true);
    memConfig.setWriteBufferRatio(0.2);      // 20% for write buffers (lower)
    memConfig.setHighPriorityPoolRatio(0.3); // 30% high priority (higher)
    memConfig.validate();

    // More memory for block cache to improve read performance
}
```
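
The four patterns differ mainly in two ratios, so one way to keep them in a single place is a small lookup type. The sketch below is hypothetical (`WorkloadProfile` is not a Flink class) and only records the values used in the patterns above.

```java
// Hypothetical lookup of the ratios used in the four patterns; not a Flink type.
enum WorkloadProfile {
    MANAGED_BALANCED(0.4, 0.2),  // Pattern 1
    FIXED_PER_SLOT(0.3, 0.15),   // Pattern 2
    WRITE_HEAVY(0.6, 0.1),       // Pattern 3
    READ_HEAVY(0.2, 0.3);        // Pattern 4

    final double writeBufferRatio;
    final double highPriorityPoolRatio;

    WorkloadProfile(double writeBufferRatio, double highPriorityPoolRatio) {
        this.writeBufferRatio = writeBufferRatio;
        this.highPriorityPoolRatio = highPriorityPoolRatio;
    }
}
```

With the Flink classes on the classpath, a profile could then be applied by passing `profile.writeBufferRatio` to `setWriteBufferRatio` and `profile.highPriorityPoolRatio` to `setHighPriorityPoolRatio`.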

## Memory Factory Integration

### Custom Memory Factory

```java { .api }
public EmbeddedRocksDBStateBackend setRocksDBMemoryFactory(RocksDBMemoryFactory rocksDBMemoryFactory)
```
Sets a custom RocksDB memory factory for advanced memory management.

**Parameters:**
- `rocksDBMemoryFactory` - Custom memory factory implementation

**Returns:** The state backend instance for method chaining

**Example:**
```java
// Custom memory factory for specialized allocation strategies
public class CustomRocksDBMemoryFactory implements RocksDBMemoryFactory {
    // Implementation for custom memory allocation
}

EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend();
backend.setRocksDBMemoryFactory(new CustomRocksDBMemoryFactory());
```

## Configuration Integration

### Flink Configuration Keys

```java
// Set memory configuration through Flink configuration
Configuration config = new Configuration();

// Managed memory integration
config.set(RocksDBOptions.USE_MANAGED_MEMORY, true);

// Fixed memory allocation (alternatives to managed memory; normally set at most one)
config.set(RocksDBOptions.FIX_PER_SLOT_MEMORY_SIZE, MemorySize.parse("512mb"));
config.set(RocksDBOptions.FIX_PER_TM_MEMORY_SIZE, MemorySize.parse("2gb"));

// Memory ratios
config.set(RocksDBOptions.WRITE_BUFFER_RATIO, 0.4);
config.set(RocksDBOptions.HIGH_PRIORITY_POOL_RATIO, 0.2);

// Index/filter optimization
config.set(RocksDBOptions.USE_PARTITIONED_INDEX_FILTERS, true);
```

### YAML Configuration

```yaml
# flink-conf.yaml
state.backend.rocksdb.memory.managed: true
# Alternative to managed memory; usually only one of the two is set
state.backend.rocksdb.memory.fixed-per-slot: 512mb
state.backend.rocksdb.memory.write-buffer-ratio: 0.4
state.backend.rocksdb.memory.high-prio-pool-ratio: 0.2
state.backend.rocksdb.memory.partitioned-index-filters: true
```
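
The managed and fixed-size options are alternatives, so it helps to know which one wins when several are set. The sketch below encodes one plausible precedence (fixed-per-slot over managed memory, and fixed-per-TaskManager only when neither applies). That ordering is an assumption to verify against the option documentation of the Flink version in use, and `MemoryModeSketch` and `resolve` are hypothetical names.

```java
// Hypothetical sketch of option precedence; verify against your Flink version.
final class MemoryModeSketch {

    enum Mode { FIXED_PER_SLOT, MANAGED, FIXED_PER_TM, UNBOUNDED }

    // A null size means the corresponding option is not set.
    static Mode resolve(boolean managed, Long fixedPerSlotBytes, Long fixedPerTmBytes) {
        if (fixedPerSlotBytes != null) {
            return Mode.FIXED_PER_SLOT; // assumed to override managed memory
        }
        if (managed) {
            return Mode.MANAGED;
        }
        if (fixedPerTmBytes != null) {
            return Mode.FIXED_PER_TM; // assumed to apply only as a fallback
        }
        return Mode.UNBOUNDED; // no shared budget is configured
    }

    public static void main(String[] args) {
        // Mirrors the YAML above: managed is true, but fixed-per-slot is also set.
        System.out.println(resolve(true, 512L << 20, null)); // FIXED_PER_SLOT
    }
}
```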

## Memory Monitoring and Tuning

### Memory Usage Patterns

1. **Write Buffer Memory**: Used for incoming writes before flushing to disk
   - Higher ratio for write-heavy workloads
   - Lower ratio for read-heavy workloads

2. **Block Cache Memory**: Used for caching frequently accessed data blocks
   - Remaining memory after write buffers
   - High priority pool for metadata (index/filter blocks)

3. **Index/Filter Memory**: Cached separately for fast lookups
   - Configure high priority pool ratio appropriately
   - Use partitioned index/filters for large state
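
The split described in the list above can be worked through numerically. The sketch below uses a simplified model: write buffers take `writeBufferRatio` of the total, the block cache takes the rest, and the high priority pool is a fraction of the block cache. Flink's internal capacity calculation may differ from this model, and `MemoryBudgetSketch` is a hypothetical name.

```java
// Simplified, hypothetical model of the memory split; not Flink's exact formula.
final class MemoryBudgetSketch {

    final long writeBufferBytes;
    final long blockCacheBytes;
    final long highPriorityBytes;

    MemoryBudgetSketch(long totalBytes, double writeBufferRatio, double highPriorityPoolRatio) {
        // Write buffers get their share of the total budget first.
        this.writeBufferBytes = Math.round(totalBytes * writeBufferRatio);
        // The block cache receives whatever remains.
        this.blockCacheBytes = totalBytes - writeBufferBytes;
        // The high priority pool is carved out of the block cache.
        this.highPriorityBytes = Math.round(blockCacheBytes * highPriorityPoolRatio);
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024;
        MemoryBudgetSketch b = new MemoryBudgetSketch(2000 * mb, 0.4, 0.2);
        System.out.println(b.writeBufferBytes / mb);  // 800
        System.out.println(b.blockCacheBytes / mb);   // 1200
        System.out.println(b.highPriorityBytes / mb); // 240
    }
}
```

Raising the write buffer ratio shrinks the block cache and, with it, the absolute size of the high priority pool, which is why the two ratios are best tuned together.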

### Tuning Guidelines

**Memory Pressure Indicators:**
- Frequent compactions (monitor via metrics)
- High read amplification
- Slow checkpoint performance

**Tuning Recommendations:**
- Start with managed memory integration
- Use 0.3-0.5 write buffer ratio for balanced workloads
- Set 0.1-0.3 high priority pool ratio based on read patterns
- Enable partitioned index/filters for large state (>1GB per operator)

**Memory Sizing:**
```java
// Example memory calculation for a TaskManager with 4GB of Flink memory
// Assuming a managed memory fraction of 0.5, about 2GB is available for RocksDB
// (simplified split; the setters return void, so they are called one by one)

RocksDBMemoryConfiguration memConfig = backend.getMemoryConfiguration();
memConfig.setUseManagedMemory(true);
memConfig.setWriteBufferRatio(0.4);      // ~819MB write buffers
memConfig.setHighPriorityPoolRatio(0.2); // ~246MB high priority cache
                                         // ~983MB regular block cache
                                         // Total: ~2GB RocksDB memory usage
```

## Complete Configuration Example

```java
import org.apache.flink.state.rocksdb.EmbeddedRocksDBStateBackend;
import org.apache.flink.state.rocksdb.RocksDBMemoryConfiguration;
import org.apache.flink.configuration.MemorySize;

public class RocksDBMemoryConfigurationExample {

    public static EmbeddedRocksDBStateBackend createOptimizedBackend() {
        EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend(true);

        // Configure storage
        backend.setDbStoragePaths("/ssd1/rocksdb", "/ssd2/rocksdb");

        // Configure memory management
        RocksDBMemoryConfiguration memConfig = backend.getMemoryConfiguration();
        memConfig.setUseManagedMemory(true);     // Use Flink managed memory
        memConfig.setWriteBufferRatio(0.4);      // 40% for write buffers
        memConfig.setHighPriorityPoolRatio(0.2); // 20% high priority cache
        memConfig.validate();                    // Validate configuration

        return backend;
    }

    public static EmbeddedRocksDBStateBackend createFixedMemoryBackend() {
        EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend(true);

        // Configure with fixed memory per slot
        RocksDBMemoryConfiguration memConfig = backend.getMemoryConfiguration();
        memConfig.setFixedMemoryPerSlot(MemorySize.ofMebiBytes(1024)); // 1GB per slot
        memConfig.setWriteBufferRatio(0.3);       // 30% (about 307MB) write buffers
        memConfig.setHighPriorityPoolRatio(0.15); // 15% high priority
        memConfig.validate();

        return backend;
    }
}
```

## Thread Safety and Lifecycle

- **Configuration Phase**: Memory configuration should be set before the job starts
- **Runtime**: Memory allocation is managed automatically by RocksDB and Flink
- **Validation**: Use `validate()` to check configuration consistency during setup
- **Monitoring**: Use RocksDB native metrics to monitor actual memory usage patterns