# Memory Configuration

Memory management configuration for RocksDB integration with Flink's managed memory system, including write buffer ratios and memory allocation strategies.

## Capabilities

### RocksDBMemoryConfiguration

Configuration class for managing RocksDB memory usage within Flink's memory management system.

```java { .api }
/**
 * Configuration for RocksDB memory management.
 * Controls how RocksDB integrates with Flink's managed memory system.
 */
class RocksDBMemoryConfiguration {

    /**
     * Creates a new memory configuration with default settings.
     */
    RocksDBMemoryConfiguration();
}
```

### Managed Memory Configuration

Configure integration with Flink's managed memory system.

```java { .api }
/**
 * Enables or disables the use of Flink's managed memory for RocksDB.
 * When enabled, RocksDB memory usage is bounded by Flink's memory management.
 * @param useManagedMemory whether to use Flink's managed memory
 */
void setUseManagedMemory(boolean useManagedMemory);

/**
 * Checks if RocksDB is using Flink's managed memory.
 * @return true if managed memory is enabled
 */
boolean isUsingManagedMemory();
```

**Usage Examples:**

```java
EmbeddedRocksDBStateBackend stateBackend = new EmbeddedRocksDBStateBackend();
RocksDBMemoryConfiguration memConfig = stateBackend.getMemoryConfiguration();

// Enable managed memory (recommended for production)
memConfig.setUseManagedMemory(true);

// Check if managed memory is enabled
if (memConfig.isUsingManagedMemory()) {
    System.out.println("Using Flink managed memory");
}
```

### Fixed Memory Per Slot

Configure fixed memory allocation per task slot.

```java { .api }
/**
 * Sets a fixed amount of memory per task slot for RocksDB.
 * This overrides managed memory settings and allocates a specific amount.
 * @param fixedMemoryPerSlot fixed memory size per slot
 */
void setFixedMemoryPerSlot(MemorySize fixedMemoryPerSlot);

/**
 * Sets a fixed amount of memory per task slot from string representation.
 * @param totalMemoryPerSlotStr memory size string (e.g., "128mb", "1gb")
 */
void setFixedMemoryPerSlot(String totalMemoryPerSlotStr);

/**
 * Checks if fixed memory per slot is configured.
 * @return true if fixed memory per slot is set
 */
boolean isUsingFixedMemoryPerSlot();

/**
 * Gets the configured fixed memory per slot.
 * @return fixed memory size per slot, or null if not configured
 */
MemorySize getFixedMemoryPerSlot();
```

**Usage Examples:**

```java
// Set fixed memory per slot (512MB)
memConfig.setFixedMemoryPerSlot(MemorySize.ofMebiBytes(512));

// Set fixed memory per slot from string
memConfig.setFixedMemoryPerSlot("512mb");

// Check configuration
if (memConfig.isUsingFixedMemoryPerSlot()) {
    MemorySize fixedSize = memConfig.getFixedMemoryPerSlot();
    System.out.println("Fixed memory per slot: " + fixedSize);
}
```
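
To make the string form of the size concrete, the following is a minimal, standalone sketch of how strings such as "128mb" or "1gb" map to byte counts. `MemorySizeSketch` and `toBytes` are hypothetical names and not part of Flink. The sketch assumes binary units (1 mb = 1024 × 1024 bytes), handles only a few suffixes, and ignores overflow; the real parsing is done by `MemorySize`.

```java
import java.util.Locale;

final class MemorySizeSketch {

    /** Hypothetical helper (not Flink API): converts strings such as "512mb" to bytes. */
    public static long toBytes(String text) {
        String s = text.trim().toLowerCase(Locale.ROOT);

        // Split the leading digits from the unit suffix.
        int i = 0;
        while (i < s.length() && Character.isDigit(s.charAt(i))) {
            i++;
        }
        if (i == 0) {
            throw new IllegalArgumentException("No number in: " + text);
        }
        long value = Long.parseLong(s.substring(0, i));
        String unit = s.substring(i).trim();

        // Assumption: units are binary multiples; a bare number means bytes.
        switch (unit) {
            case "":
            case "b":
                return value;
            case "kb":
                return value << 10;
            case "mb":
                return value << 20;
            case "gb":
                return value << 30;
            default:
                throw new IllegalArgumentException("Unsupported unit: " + unit);
        }
    }

    public static void main(String[] args) {
        System.out.println(toBytes("128mb"));
        System.out.println(toBytes("1gb"));
    }
}
```

Under these assumptions, "512mb" and `MemorySize.ofMebiBytes(512)` describe the same budget, which is why the two setter overloads above are interchangeable.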

### Memory Ratio Configuration

Configure memory allocation ratios for different RocksDB components.

```java { .api }
/**
 * Sets the ratio of total memory allocated to write buffers (memtables).
 * Value must be strictly between 0.0 and 1.0.
 * @param writeBufferRatio fraction of memory for write buffers (0 < ratio < 1)
 */
void setWriteBufferRatio(double writeBufferRatio);

/**
 * Gets the configured write buffer memory ratio.
 * @return write buffer ratio
 */
double getWriteBufferRatio();

/**
 * Sets the ratio of block cache memory allocated to high-priority pool.
 * High-priority pool is used for index/filter blocks and other metadata.
 * Value must be strictly between 0.0 and 1.0.
 * @param highPriorityPoolRatio fraction of cache for high-priority pool (0 < ratio < 1)
 */
void setHighPriorityPoolRatio(double highPriorityPoolRatio);

/**
 * Gets the configured high-priority pool ratio.
 * @return high-priority pool ratio
 */
double getHighPriorityPoolRatio();
```

**Usage Examples:**

```java
// Reserve 40% of the memory budget for write buffers (memtables);
// the remainder is left to the block cache
memConfig.setWriteBufferRatio(0.4);

// Allocate 20% of block cache to high-priority pool (index/filter blocks)
memConfig.setHighPriorityPoolRatio(0.2);

// Check current ratios
double writeRatio = memConfig.getWriteBufferRatio();
double highPrioRatio = memConfig.getHighPriorityPoolRatio();
System.out.println("Write buffer ratio: " + writeRatio);
System.out.println("High priority pool ratio: " + highPrioRatio);
```
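
As a rough worked example of what the two ratios mean, the sketch below splits a per-slot budget using a first-order reading of the descriptions above: write buffers take `writeBufferRatio` of the total, the rest is treated as block cache, and the high-priority pool takes `highPriorityPoolRatio` of that cache. `RatioSplitSketch` is a hypothetical name, and this arithmetic is a simplification for intuition only; Flink's internal capacity calculation may adjust these figures.

```java
final class RatioSplitSketch {

    /**
     * Simplified split of a memory budget (an assumption, not Flink's internal formula).
     * Returns {writeBufferBytes, blockCacheBytes, highPriorityPoolBytes}.
     */
    public static long[] split(long totalBytes, double writeBufferRatio, double highPriorityPoolRatio) {
        // Share reserved for write buffers (memtables), truncated to whole bytes.
        long writeBuffer = (long) (totalBytes * writeBufferRatio);
        // Everything else is treated as block cache in this simplified view.
        long blockCache = totalBytes - writeBuffer;
        // The high-priority pool is carved out of the block cache.
        long highPriorityPool = (long) (blockCache * highPriorityPoolRatio);
        return new long[] {writeBuffer, blockCache, highPriorityPool};
    }

    public static void main(String[] args) {
        long total = 512L * 1024 * 1024; // 512 MiB per slot
        long[] parts = split(total, 0.4, 0.2);
        System.out.println("write buffers:      " + parts[0] + " bytes");
        System.out.println("block cache:        " + parts[1] + " bytes");
        System.out.println("high-priority pool: " + parts[2] + " bytes");
    }
}
```

The point of the sketch is that the high-priority pool ratio applies to the cache share rather than to the whole budget, so raising the write buffer ratio also shrinks the absolute size of the high-priority pool.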

### Partitioned Index Filters

Configure partitioned index and filter blocks for better memory management.

```java { .api }
/**
 * Checks if partitioned index filters are enabled.
 * Partitioned filters can help with memory management for large datasets.
 * @return true if partitioned index filters are enabled, null if not configured
 */
Boolean isUsingPartitionedIndexFilters();
```

**Usage Example:**

```java
// Check if partitioned index filters are enabled
Boolean partitioned = memConfig.isUsingPartitionedIndexFilters();
if (partitioned != null && partitioned) {
    System.out.println("Using partitioned index filters");
}
```

### Configuration Validation and Factory Methods

Validate configuration consistency and create configurations from existing ones.

```java { .api }
/**
 * Validates the memory configuration for consistency.
 * Throws exception if configuration is invalid (e.g., ratios out of range).
 * @throws IllegalArgumentException if configuration is invalid
 */
void validate();

/**
 * Creates a memory configuration from an existing one and additional config.
 * @param other existing memory configuration to copy from
 * @param config additional configuration to apply
 * @return new memory configuration instance
 */
static RocksDBMemoryConfiguration fromOtherAndConfiguration(
    RocksDBMemoryConfiguration other,
    ReadableConfig config
);
```

**Usage Examples:**

```java
// Validate configuration before use
try {
    memConfig.validate();
    System.out.println("Memory configuration is valid");
} catch (IllegalArgumentException e) {
    System.err.println("Invalid memory configuration: " + e.getMessage());
}

// Create configuration from existing one
RocksDBMemoryConfiguration baseConfig = stateBackend.getMemoryConfiguration();
// getFlinkConfiguration() is a placeholder for however your application obtains its ReadableConfig
ReadableConfig flinkConfig = getFlinkConfiguration();
RocksDBMemoryConfiguration newConfig = RocksDBMemoryConfiguration
    .fromOtherAndConfiguration(baseConfig, flinkConfig);
```
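
When ratios come from user input or external configuration, it can help to reject out-of-range values before they reach the setters. The following standalone sketch shows the kind of open-interval check (0 < ratio < 1) that the ratio setters document. `RatioCheckSketch` and `requireOpenUnitInterval` are hypothetical names, not Flink API, and the sketch does not reproduce whatever additional cross-checks `validate()` performs.

```java
final class RatioCheckSketch {

    /** Hypothetical helper: returns the ratio unchanged if 0 < ratio < 1, otherwise throws. */
    public static double requireOpenUnitInterval(double ratio, String name) {
        // The negated form also rejects NaN, which fails every comparison.
        if (!(ratio > 0.0 && ratio < 1.0)) {
            throw new IllegalArgumentException(name + " must be in (0, 1), but was " + ratio);
        }
        return ratio;
    }

    public static void main(String[] args) {
        // A valid ratio passes through unchanged.
        System.out.println(requireOpenUnitInterval(0.4, "writeBufferRatio"));

        // The boundary value 1.0 is outside the open interval and is rejected.
        try {
            requireOpenUnitInterval(1.0, "highPriorityPoolRatio");
        } catch (IllegalArgumentException e) {
            System.err.println("Rejected: " + e.getMessage());
        }
    }
}
```

A checked value could then be passed on, for example `memConfig.setWriteBufferRatio(requireOpenUnitInterval(userValue, "writeBufferRatio"))`, where `userValue` stands for a value supplied by the application.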

## Complete Configuration Example

```java
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.contrib.streaming.state.RocksDBMemoryConfiguration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Create state backend (true enables incremental checkpointing)
EmbeddedRocksDBStateBackend stateBackend = new EmbeddedRocksDBStateBackend(true);

// Configure memory settings
RocksDBMemoryConfiguration memConfig = stateBackend.getMemoryConfiguration();

// Option 1: Use managed memory with custom ratios
memConfig.setUseManagedMemory(true);
memConfig.setWriteBufferRatio(0.4);       // 40% for write buffers
memConfig.setHighPriorityPoolRatio(0.1);  // 10% of cache for high-priority

// Option 2: Use fixed memory per slot
// memConfig.setFixedMemoryPerSlot("1gb");

// Validate configuration
memConfig.validate();

// Set state backend on environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(stateBackend);
```

## Memory Management Best Practices

### Managed Memory vs Fixed Memory

**Use Managed Memory when:**
- Running in containerized environments (Kubernetes, Docker)
- Memory allocation needs to be coordinated with other Flink components
- You want automatic memory management and resource isolation

**Use Fixed Memory when:**
- You need predictable memory usage for capacity planning
- Running on dedicated hardware with known memory constraints
- You want direct control over RocksDB memory allocation
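
Because a fixed per-slot size overrides the managed-memory setting, the two options are not independent. The sketch below spells out that precedence as a small decision function. `MemoryModeSketch`, `Mode`, and `resolve` are hypothetical names, not Flink API, and the `UNBOUNDED` case reflects the assumption that RocksDB memory is not capped by Flink when neither option is active.

```java
final class MemoryModeSketch {

    /** Hypothetical labels for the effective memory mode. */
    enum Mode { FIXED_PER_SLOT, MANAGED, UNBOUNDED }

    /**
     * Resolves the effective mode from the two settings.
     * fixedBytesPerSlot is null when no fixed size is configured.
     */
    public static Mode resolve(boolean useManagedMemory, Long fixedBytesPerSlot) {
        // A fixed per-slot size takes precedence over the managed-memory flag.
        if (fixedBytesPerSlot != null) {
            return Mode.FIXED_PER_SLOT;
        }
        // Assumption: with neither option active, Flink does not bound RocksDB memory.
        return useManagedMemory ? Mode.MANAGED : Mode.UNBOUNDED;
    }

    public static void main(String[] args) {
        System.out.println(resolve(true, null));
        System.out.println(resolve(true, 1L << 30));
        System.out.println(resolve(false, null));
    }
}
```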

### Memory Ratio Guidelines

**Write Buffer Ratio:**
- Higher ratios (0.4-0.6): Better for write-heavy workloads
- Lower ratios (0.2-0.3): Better for read-heavy workloads with large state
- Default (0.5): Balanced for most workloads

**High Priority Pool Ratio:**
- Higher ratios (0.1-0.2): Better for workloads with many small keys
- Lower ratios (0.05-0.1): Better for workloads with fewer, larger values
- Default (0.1): Suitable for most scenarios

## Types

```java { .api }
class MemorySize {
    static MemorySize ofMebiBytes(long mebiBytes);
    static MemorySize ofBytes(long bytes);
    static MemorySize parse(String text);
    long getBytes();
    long getMebiBytes();
}
```