or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

core-state-backend.mdindex.mdmemory-configuration.mdmetrics-monitoring.mdoptions-and-factories.md

memory-configuration.mddocs/

0

# Memory Configuration and Management

1

2

The RocksDB State Backend provides sophisticated memory management capabilities that integrate with Flink's memory model. This allows efficient memory utilization across different deployment scenarios and workload patterns.

3

4

## Core Imports

5

6

```java { .api }

7

import org.apache.flink.state.rocksdb.RocksDBMemoryConfiguration;

8

import org.apache.flink.state.rocksdb.RocksDBMemoryFactory;

9

import org.apache.flink.configuration.Configuration;

10

import org.apache.flink.configuration.ReadableConfig;

11

import org.apache.flink.configuration.MemorySize;

12

```

13

14

## RocksDBMemoryConfiguration Class

15

16

### Class Definition

17

18

```java { .api }

19

public final class RocksDBMemoryConfiguration {

20

// Configuration settings for RocksDB memory usage management

21

}

22

```

23

24

## Memory Management Strategies

25

26

### Managed Memory Integration

27

28

```java { .api }

29

public void setUseManagedMemory(boolean useManagedMemory)

30

```

31

Configures whether to use Flink's managed memory for RocksDB.

32

33

**Parameters:**

34

- `useManagedMemory` - Whether to use managed memory budget from Flink

35

36

```java { .api }

37

public boolean isUsingManagedMemory()

38

```

39

Checks if this configuration uses Flink's managed memory.

40

41

**Returns:** `true` if using managed memory, `false` otherwise

42

43

**Example:**

44

```java

45

RocksDBMemoryConfiguration memConfig = backend.getMemoryConfiguration();

46

memConfig.setUseManagedMemory(true);

47

48

// Check current setting

49

if (memConfig.isUsingManagedMemory()) {

50

// Memory will be allocated from Flink's managed memory pool

51

}

52

```

53

54

### Fixed Memory Per Slot

55

56

```java { .api }

57

public void setFixedMemoryPerSlot(MemorySize fixedMemoryPerSlot)

58

```

59

Sets a fixed amount of memory for RocksDB per task slot.

60

61

**Parameters:**

62

- `fixedMemoryPerSlot` - Fixed memory size per slot

63

64

```java { .api }

65

public void setFixedMemoryPerSlot(String totalMemoryPerSlotStr)

66

```

67

Sets a fixed amount of memory for RocksDB per task slot using string format.

68

69

**Parameters:**

70

- `totalMemoryPerSlotStr` - Memory size string (e.g., "256mb", "1gb")

71

72

```java { .api }

73

public boolean isUsingFixedMemoryPerSlot()

74

```

75

Checks if this configuration uses fixed memory per slot.

76

77

**Returns:** `true` if using fixed memory per slot, `false` otherwise

78

79

```java { .api }

80

public MemorySize getFixedMemoryPerSlot()

81

```

82

Gets the configured fixed memory per slot.

83

84

**Returns:** Fixed memory size per slot, or null if not configured

85

86

**Example:**

87

```java

88

RocksDBMemoryConfiguration memConfig = backend.getMemoryConfiguration();

89

90

// Set fixed memory using MemorySize

91

memConfig.setFixedMemoryPerSlot(MemorySize.ofMebiBytes(512));

92

93

// Or set using string format

94

memConfig.setFixedMemoryPerSlot("512mb");

95

96

// Check configuration

97

if (memConfig.isUsingFixedMemoryPerSlot()) {

98

MemorySize fixedSize = memConfig.getFixedMemoryPerSlot();

99

System.out.println("Fixed memory per slot: " + fixedSize);

100

}

101

```

102

103

## Memory Allocation Ratios

104

105

### Write Buffer Ratio

106

107

```java { .api }

108

public void setWriteBufferRatio(double writeBufferRatio)

109

```

110

Sets the fraction of available memory to use for RocksDB write buffers.

111

112

**Parameters:**

113

- `writeBufferRatio` - Fraction of memory for write buffers (0.0 to 1.0)

114

115

```java { .api }

116

public double getWriteBufferRatio()

117

```

118

Gets the configured write buffer ratio.

119

120

**Returns:** Write buffer ratio as a fraction

121

122

**Example:**

123

```java

124

// Use 40% of available memory for write buffers

125

memConfig.setWriteBufferRatio(0.4);

126

127

// Get current ratio

128

double ratio = memConfig.getWriteBufferRatio();

129

```

130

131

### High Priority Pool Ratio

132

133

```java { .api }

134

public void setHighPriorityPoolRatio(double highPriorityPoolRatio)

135

```

136

Sets the fraction of block cache memory reserved for high priority blocks.

137

138

**Parameters:**

139

- `highPriorityPoolRatio` - Fraction for high priority blocks (0.0 to 1.0)

140

141

```java { .api }

142

public double getHighPriorityPoolRatio()

143

```

144

Gets the configured high priority pool ratio.

145

146

**Returns:** High priority pool ratio as a fraction

147

148

**Example:**

149

```java

150

// Reserve 20% of block cache for high priority blocks (index/filter blocks)

151

memConfig.setHighPriorityPoolRatio(0.2);

152

153

// Get current ratio

154

double highPriorityRatio = memConfig.getHighPriorityPoolRatio();

155

```

156

157

## Advanced Memory Features

158

159

### Partitioned Index Filters

160

161

```java { .api }

162

public boolean isUsingPartitionedIndexFilters()

163

```

164

Checks if partitioned index/filters are enabled for memory efficiency.

165

166

**Returns:** `true` if using partitioned index/filters, `false` otherwise

167

168

**Example:**

169

```java

170

if (memConfig.isUsingPartitionedIndexFilters()) {

171

// Index and filter blocks are partitioned for better memory utilization

172

}

173

```

174

175

## Configuration Validation and Utilities

176

177

### Configuration Validation

178

179

```java { .api }

180

public void validate()

181

```

182

Validates the consistency of the memory configuration.

183

184

**Throws:** IllegalArgumentException if configuration is invalid

185

186

**Example:**

187

```java

188

RocksDBMemoryConfiguration memConfig = backend.getMemoryConfiguration();

189

memConfig.setUseManagedMemory(true);

190

memConfig.setWriteBufferRatio(0.4);

191

memConfig.setHighPriorityPoolRatio(0.2);

192

memConfig.validate(); // Throws exception if configuration is inconsistent

193

```

194

195

### Factory Methods

196

197

```java { .api }

198

public static RocksDBMemoryConfiguration fromOtherAndConfiguration(

199

RocksDBMemoryConfiguration other,

200

ReadableConfig config)

201

```

202

Creates a new memory configuration from an existing one and additional configuration.

203

204

**Parameters:**

205

- `other` - Base memory configuration to copy from

206

- `config` - Additional configuration to apply

207

208

**Returns:** New configured RocksDBMemoryConfiguration instance

209

210

```java { .api }

211

public static RocksDBMemoryConfiguration fromConfiguration(Configuration configuration)

212

```

213

Creates a memory configuration from Flink configuration.

214

215

**Parameters:**

216

- `configuration` - Flink configuration containing memory settings

217

218

**Returns:** RocksDBMemoryConfiguration instance based on configuration

219

220

**Example:**

221

```java

222

// Create from Flink configuration

223

Configuration config = new Configuration();

224

config.set(RocksDBOptions.USE_MANAGED_MEMORY, true);

225

config.set(RocksDBOptions.WRITE_BUFFER_RATIO, 0.4);

226

227

RocksDBMemoryConfiguration memConfig = RocksDBMemoryConfiguration.fromConfiguration(config);

228

229

// Create from existing config with overrides

230

Configuration overrides = new Configuration();

231

overrides.set(RocksDBOptions.HIGH_PRIORITY_POOL_RATIO, 0.3);

232

233

RocksDBMemoryConfiguration newConfig = RocksDBMemoryConfiguration.fromOtherAndConfiguration(

234

memConfig, overrides);

235

```

236

237

## Memory Configuration Patterns

238

239

### Pattern 1: Managed Memory Integration

240

241

```java

242

/**

243

* Use Flink's managed memory with automatic memory distribution

244

* Best for: Multi-tenant clusters, resource management

245

*/

246

public void configureForManagedMemory(EmbeddedRocksDBStateBackend backend) {

247

RocksDBMemoryConfiguration memConfig = backend.getMemoryConfiguration();

248

249

memConfig.setUseManagedMemory(true); // Use Flink's managed memory

250

memConfig.setWriteBufferRatio(0.4); // 40% for write buffers

251

memConfig.setHighPriorityPoolRatio(0.2); // 20% high priority in cache

252

memConfig.validate();

253

254

// Memory will be allocated from Flink's managed memory pool

255

// Automatic scaling based on available memory

256

}

257

```

258

259

### Pattern 2: Fixed Memory Per Slot

260

261

```java

262

/**

263

* Fixed memory allocation per task slot

264

* Best for: Predictable memory usage, dedicated clusters

265

*/

266

public void configureFixedMemory(EmbeddedRocksDBStateBackend backend) {

267

RocksDBMemoryConfiguration memConfig = backend.getMemoryConfiguration();

268

269

memConfig.setFixedMemoryPerSlot("1gb"); // 1GB per slot

270

memConfig.setWriteBufferRatio(0.3); // 300MB for write buffers

271

memConfig.setHighPriorityPoolRatio(0.15); // 15% high priority

272

memConfig.validate();

273

274

// Each task slot will use exactly 1GB for RocksDB

275

}

276

```

277

278

### Pattern 3: High-Throughput Write Workload

279

280

```java

281

/**

282

* Optimized for high write throughput

283

* Best for: Heavy ingestion, frequent updates

284

*/

285

public void configureForHighWrites(EmbeddedRocksDBStateBackend backend) {

286

RocksDBMemoryConfiguration memConfig = backend.getMemoryConfiguration();

287

288

memConfig.setUseManagedMemory(true);

289

memConfig.setWriteBufferRatio(0.6); // 60% for write buffers (higher)

290

memConfig.setHighPriorityPoolRatio(0.1); // 10% high priority (lower)

291

memConfig.validate();

292

293

// More memory for write buffers to handle high write load

294

}

295

```

296

297

### Pattern 4: Read-Heavy Workload

298

299

```java

300

/**

301

* Optimized for read-heavy workloads with complex state access

302

* Best for: Analytics, lookups, windowing operations

303

*/

304

public void configureForReads(EmbeddedRocksDBStateBackend backend) {

305

RocksDBMemoryConfiguration memConfig = backend.getMemoryConfiguration();

306

307

memConfig.setUseManagedMemory(true);

308

memConfig.setWriteBufferRatio(0.2); // 20% for write buffers (lower)

309

memConfig.setHighPriorityPoolRatio(0.3); // 30% high priority (higher)

310

memConfig.validate();

311

312

// More memory for block cache to improve read performance

313

}

314

```

315

316

## Memory Factory Integration

317

318

### Custom Memory Factory

319

320

```java { .api }

321

public EmbeddedRocksDBStateBackend setRocksDBMemoryFactory(RocksDBMemoryFactory rocksDBMemoryFactory)

322

```

323

Sets a custom RocksDB memory factory for advanced memory management.

324

325

**Parameters:**

326

- `rocksDBMemoryFactory` - Custom memory factory implementation

327

328

**Returns:** The state backend instance for method chaining

329

330

**Example:**

331

```java

332

// Custom memory factory for specialized allocation strategies

333

public class CustomRocksDBMemoryFactory implements RocksDBMemoryFactory {

334

// Implementation for custom memory allocation

335

}

336

337

EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend();

338

backend.setRocksDBMemoryFactory(new CustomRocksDBMemoryFactory());

339

```

340

341

## Configuration Integration

342

343

### Flink Configuration Keys

344

345

```java

346

// Set memory configuration through Flink configuration

347

Configuration config = new Configuration();

348

349

// Managed memory integration

350

config.set(RocksDBOptions.USE_MANAGED_MEMORY, true);

351

352

// Fixed memory allocation

353

config.set(RocksDBOptions.FIX_PER_SLOT_MEMORY_SIZE, MemorySize.parse("512mb"));

354

config.set(RocksDBOptions.FIX_PER_TM_MEMORY_SIZE, MemorySize.parse("2gb"));

355

356

// Memory ratios

357

config.set(RocksDBOptions.WRITE_BUFFER_RATIO, 0.4);

358

config.set(RocksDBOptions.HIGH_PRIORITY_POOL_RATIO, 0.2);

359

360

// Index/filter optimization

361

config.set(RocksDBOptions.USE_PARTITIONED_INDEX_FILTERS, true);

362

```

363

364

### YAML Configuration

365

366

```yaml

367

# flink-conf.yaml

368

state.backend.rocksdb.memory.managed: true

369

state.backend.rocksdb.memory.fixed-per-slot: 512mb

370

state.backend.rocksdb.memory.write-buffer-ratio: 0.4

371

state.backend.rocksdb.memory.high-priority-pool-ratio: 0.2

372

state.backend.rocksdb.memory.partitioned-index-filters: true

373

```

374

375

## Memory Monitoring and Tuning

376

377

### Memory Usage Patterns

378

379

1. **Write Buffer Memory**: Used for incoming writes before flushing to disk

380

- Higher ratio for write-heavy workloads

381

- Lower ratio for read-heavy workloads

382

383

2. **Block Cache Memory**: Used for caching frequently accessed data blocks

384

- Remaining memory after write buffers

385

- High priority pool for metadata (index/filter blocks)

386

387

3. **Index/Filter Memory**: Cached separately for fast lookups

388

- Configure high priority pool ratio appropriately

389

- Use partitioned index/filters for large state

390

391

### Tuning Guidelines

392

393

**Memory Pressure Indicators:**

394

- Frequent compactions (monitor via metrics)

395

- High read amplification

396

- Slow checkpoint performance

397

398

**Tuning Recommendations:**

399

- Start with managed memory integration

400

- Use 0.3-0.5 write buffer ratio for balanced workloads

401

- Set 0.1-0.3 high priority pool ratio based on read patterns

402

- Enable partitioned index/filters for large state (>1GB per operator)

403

404

**Memory Sizing:**

405

```java

406

// Example memory calculation for 4GB TaskManager heap

407

// With 50% managed memory = 2GB available for RocksDB

408

409

RocksDBMemoryConfiguration memConfig = backend.getMemoryConfiguration();

410

memConfig.setUseManagedMemory(true)

411

.setWriteBufferRatio(0.4) // 800MB write buffers

412

.setHighPriorityPoolRatio(0.2); // 240MB high priority cache

413

// 960MB regular block cache

414

// Total: 2GB RocksDB memory usage

415

```

416

417

## Complete Configuration Example

418

419

```java

420

import org.apache.flink.state.rocksdb.EmbeddedRocksDBStateBackend;

421

import org.apache.flink.state.rocksdb.RocksDBMemoryConfiguration;

422

import org.apache.flink.configuration.MemorySize;

423

424

public class RocksDBMemoryConfigurationExample {

425

426

public static EmbeddedRocksDBStateBackend createOptimizedBackend() {

427

EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend(true);

428

429

// Configure storage

430

backend.setDbStoragePaths("/ssd1/rocksdb", "/ssd2/rocksdb");

431

432

// Configure memory management

433

RocksDBMemoryConfiguration memConfig = backend.getMemoryConfiguration();

434

memConfig.setUseManagedMemory(true); // Use Flink managed memory

435

memConfig.setWriteBufferRatio(0.4); // 40% for write buffers

436

memConfig.setHighPriorityPoolRatio(0.2); // 20% high priority cache

437

memConfig.validate(); // Validate configuration

438

439

return backend;

440

}

441

442

public static EmbeddedRocksDBStateBackend createFixedMemoryBackend() {

443

EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend(true);

444

445

// Configure with fixed memory per slot

446

RocksDBMemoryConfiguration memConfig = backend.getMemoryConfiguration();

447

memConfig.setFixedMemoryPerSlot(MemorySize.ofMebiBytes(1024)); // 1GB per slot

448

memConfig.setWriteBufferRatio(0.3); // 300MB write buffers

449

memConfig.setHighPriorityPoolRatio(0.15); // 15% high priority

450

memConfig.validate();

451

452

return backend;

453

}

454

}

455

```

456

457

## Thread Safety and Lifecycle

458

459

- **Configuration Phase**: Memory configuration should be set before the job starts

460

- **Runtime**: Memory allocation is managed automatically by RocksDB and Flink

461

- **Validation**: Use `validate()` to check configuration consistency during setup

462

- **Monitoring**: Use RocksDB native metrics to monitor actual memory usage patterns