# Apache Flink RocksDB State Backend

The Apache Flink RocksDB State Backend provides persistent, fault-tolerant state storage for stream processing applications, using RocksDB as the underlying embedded database. Because working state lives in RocksDB on local disk rather than on the JVM heap, it supports high-throughput stateful computations whose state exceeds available memory, which makes it a common choice for production-scale streaming applications.

## Package Information

**Maven Dependency:**
```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-statebackend-rocksdb</artifactId>
    <version>2.1.0</version>
</dependency>
```

**Primary Package:** `org.apache.flink.state.rocksdb`

**Deprecated Package:** `org.apache.flink.contrib.streaming.state` (use primary package for new development)

## Core Imports

```java { .api }
import org.apache.flink.state.rocksdb.EmbeddedRocksDBStateBackend;
import org.apache.flink.state.rocksdb.RocksDBOptionsFactory;
import org.apache.flink.state.rocksdb.ConfigurableRocksDBOptionsFactory;
import org.apache.flink.state.rocksdb.PredefinedOptions;
import org.apache.flink.state.rocksdb.RocksDBMemoryConfiguration;
import org.apache.flink.state.rocksdb.RocksDBNativeMetricOptions;
import org.apache.flink.state.rocksdb.RocksDBOptions;
import org.apache.flink.state.rocksdb.RocksDBConfigurableOptions;
```

## Basic Usage

### Simple Setup

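```java
import org.apache.flink.state.rocksdb.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Create RocksDB state backend with default settings
EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend();

// Configure the local directory where RocksDB keeps its working files
backend.setDbStoragePath("/path/to/rocksdb");

// Apply to StreamExecutionEnvironment.
// Note: setStateBackend(...) was deprecated in late Flink 1.x releases; if it is
// unavailable in your version, select the backend through configuration instead.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(backend);
```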

### Incremental Checkpointing

```java
// Enable incremental checkpointing so each checkpoint uploads only what changed
EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend(true);
backend.setDbStoragePath("/path/to/rocksdb");

// Pick the predefined option profile that matches your storage hardware
backend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED);
```

### Memory Management

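```java
EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend();

// Configure memory usage
RocksDBMemoryConfiguration memConfig = backend.getMemoryConfiguration();
// Let RocksDB draw its memory from Flink's managed memory
memConfig.setUseManagedMemory(true);
// Fraction of the memory budget that write buffers may use
memConfig.setWriteBufferRatio(0.4);
// Fraction of the cache reserved for high-priority blocks (index, filter)
memConfig.setHighPriorityPoolRatio(0.2);
```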
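
To make the two ratios concrete, the sketch below splits a hypothetical 1000 MB per-slot budget using a simplified model: the write buffer ratio caps the memory for write buffers, the high-priority pool ratio reserves part of the budget for index and filter blocks, and the remainder is left for data blocks. The class `MemoryBudgetSketch` and its method are illustrative and not part of Flink. Flink's internal sizing applies further adjustments, so the numbers are an approximation, and the rule that the two ratios must sum to less than 1.0 is an assumption about what `validate()` checks.

```java
/** Illustrative only: a simplified model of how the two ratios divide a memory budget. */
final class MemoryBudgetSketch {

    /** Returns {writeBufferMb, highPriorityPoolMb, remainingMb} for the given budget. */
    static long[] split(long totalMb, double writeBufferRatio, double highPriorityPoolRatio) {
        // Assumed constraint: both ratios are non-negative and together stay below 1.0.
        if (writeBufferRatio < 0 || highPriorityPoolRatio < 0
                || writeBufferRatio + highPriorityPoolRatio >= 1.0) {
            throw new IllegalArgumentException(
                    "ratios must be non-negative and sum to less than 1.0");
        }
        long writeBufferMb = Math.round(totalMb * writeBufferRatio);
        long highPriorityMb = Math.round(totalMb * highPriorityPoolRatio);
        long remainingMb = totalMb - writeBufferMb - highPriorityMb;
        return new long[] {writeBufferMb, highPriorityMb, remainingMb};
    }

    public static void main(String[] args) {
        // Same ratios as the configuration example: 0.4 and 0.2.
        long[] parts = split(1000, 0.4, 0.2);
        System.out.println("write buffers:      " + parts[0] + " MB");
        System.out.println("high-priority pool: " + parts[1] + " MB");
        System.out.println("remaining cache:    " + parts[2] + " MB");
    }
}
```

With the ratios from the example, the simplified model yields 400 MB, 200 MB, and 400 MB. Raising the write buffer ratio favors write-heavy jobs at the expense of read caching.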

## Architecture Overview

### Key Components

The RocksDB State Backend consists of several key components working together:

1. **EmbeddedRocksDBStateBackend** - Main entry point and configuration hub
2. **RocksDB Options System** - Configurable performance tuning via factories and predefined options
3. **Memory Management** - Managed-memory integration with tunable write buffer and block cache budgets
4. **Metrics Integration** - Native RocksDB metrics forwarded to Flink's monitoring system
5. **Checkpointing Integration** - Incremental and full checkpoint support with the Flink runtime

### State Storage Model

```
┌─────────────────────────────────────┐
│          Flink Application          │
├─────────────────────────────────────┤
│       State Backend Interface       │
├─────────────────────────────────────┤
│     EmbeddedRocksDBStateBackend     │
├─────────────────────────────────────┤
│           RocksDB Engine            │
├─────────────────────────────────────┤
│          Local File System          │
└─────────────────────────────────────┘
```

State is organized as:
- **Key Groups**: Partitions of the key space, distributed across parallel operator instances
- **Column Families**: A separate RocksDB column family per registered state
- **Memory Layers**: Write buffers and block cache in memory, backed by files on the local file system
- **Checkpoints**: Incremental or full snapshots written to external storage
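
As a rough illustration of the key-group bullet, the sketch below maps a key to a key group and a key group to a parallel operator instance. The class `KeyGroupSketch` is hypothetical, and the hash step is a stand-in (`Math.floorMod` of `hashCode()`): Flink applies its own hash function internally, so the group numbers here will not match a real job. The proportional mapping from key group to operator index is modeled on the range-based assignment Flink uses, which is what allows whole key groups to move between instances when a job is rescaled.

```java
/** Illustrative only: how keys map to key groups and key groups to parallel instances. */
final class KeyGroupSketch {

    /** Stand-in hash: Flink uses its own hash function here, not plain floorMod. */
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    /** Each operator instance owns a contiguous range of key groups. */
    static int operatorIndexFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // number of key groups
        int parallelism = 4;      // number of parallel operator instances
        for (String key : new String[] {"user-1", "user-2", "user-3"}) {
            int keyGroup = keyGroupFor(key, maxParallelism);
            int subtask = operatorIndexFor(keyGroup, maxParallelism, parallelism);
            System.out.println(key + " -> key group " + keyGroup + " -> subtask " + subtask);
        }
    }
}
```

With 128 key groups and parallelism 4, key groups 0 to 31 land on subtask 0, 32 to 63 on subtask 1, and so on.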

## Capability Summaries

### [Core State Backend Configuration](./core-state-backend.md)
Main `EmbeddedRocksDBStateBackend` class with configuration options for storage paths, checkpointing modes, and integration with Flink runtime.

**Key APIs:**
```java { .api }
public class EmbeddedRocksDBStateBackend implements StateBackend {
    public EmbeddedRocksDBStateBackend();
    public EmbeddedRocksDBStateBackend(boolean enableIncrementalCheckpointing);
    public EmbeddedRocksDBStateBackend(TernaryBoolean enableIncrementalCheckpointing);
    public void setDbStoragePath(String path);
    public void setDbStoragePaths(String... paths);
}
```

### [RocksDB Options and Factories](./options-and-factories.md)
Flexible options system using factory patterns for customizing RocksDB behavior, including predefined optimizations and custom configurations.

**Key APIs:**
```java { .api }
public interface RocksDBOptionsFactory extends Serializable {
    DBOptions createDBOptions(DBOptions currentOptions, Collection<AutoCloseable> handlesToClose);
    ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions, Collection<AutoCloseable> handlesToClose);
}

public enum PredefinedOptions {
    DEFAULT, SPINNING_DISK_OPTIMIZED, SPINNING_DISK_OPTIMIZED_HIGH_MEM, FLASH_SSD_OPTIMIZED
}
```

### [Memory Configuration and Management](./memory-configuration.md)
Memory management with support for Flink managed memory, a fixed per-slot memory budget, and configurable write buffer and high-priority cache ratios.

**Key APIs:**
```java { .api }
public final class RocksDBMemoryConfiguration {
    public void setUseManagedMemory(boolean useManagedMemory);
    public void setFixedMemoryPerSlot(MemorySize fixedMemoryPerSlot);
    public void setWriteBufferRatio(double writeBufferRatio);
    public void setHighPriorityPoolRatio(double highPriorityPoolRatio);
    public void validate();
}
```

### [Metrics and Monitoring](./metrics-monitoring.md)
Metrics integration that forwards RocksDB native metrics to Flink's metrics system for monitoring performance, memory usage, and operational health.

**Key APIs:**
```java { .api }
public class RocksDBNativeMetricOptions {
    public static final ConfigOption<Boolean> MONITOR_BLOCK_CACHE_HIT;
    public static final ConfigOption<Boolean> ESTIMATE_NUM_KEYS;
    public static final ConfigOption<Boolean> MONITOR_COMPACTION_READ_BYTES;
    // ... many more monitoring options
}
```
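
Each of these options is a boolean switch, and native metrics are off by default because collecting them adds overhead. The sketch below gathers the configuration keys that are assumed to correspond to the three options listed above. Both the key strings and the helper class `RocksDbMetricsConfigSketch` are assumptions to verify against the Flink configuration reference for your version.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative only: collects assumed configuration keys for three native metrics. */
final class RocksDbMetricsConfigSketch {

    /** Returns key/value pairs in the shape of Flink configuration entries. */
    static Map<String, String> metricSwitches() {
        Map<String, String> conf = new LinkedHashMap<>();
        // Assumed key for MONITOR_BLOCK_CACHE_HIT
        conf.put("state.backend.rocksdb.metrics.block-cache-hit", "true");
        // Assumed key for ESTIMATE_NUM_KEYS
        conf.put("state.backend.rocksdb.metrics.estimate-num-keys", "true");
        // Assumed key for MONITOR_COMPACTION_READ_BYTES
        conf.put("state.backend.rocksdb.metrics.compaction-read-bytes", "true");
        return conf;
    }

    public static void main(String[] args) {
        // Print the entries as "key: value" lines.
        metricSwitches().forEach((key, value) -> System.out.println(key + ": " + value));
    }
}
```

Enabling only the metrics you actually chart keeps the overhead low, which is why the switches are kept individual rather than grouped.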

## Advanced Features

### Incremental Checkpointing
Uploads only the changes since the last completed checkpoint instead of a full copy of the state, reducing checkpoint I/O and duration for large state.
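
The idea can be sketched without Flink: RocksDB stores data in immutable files, so a checkpoint only needs to upload files that were not part of the previous checkpoint and can reference the rest. The class `IncrementalCheckpointSketch` and the file names below are made up for illustration, and the real mechanism tracks more than file names.

```java
import java.util.Set;
import java.util.TreeSet;

/** Illustrative only: which files a new checkpoint would need to upload. */
final class IncrementalCheckpointSketch {

    /** Returns the files in the current snapshot that no earlier checkpoint uploaded. */
    static Set<String> filesToUpload(Set<String> previouslyUploaded, Set<String> currentFiles) {
        Set<String> delta = new TreeSet<>(currentFiles);
        delta.removeAll(previouslyUploaded);
        return delta;
    }

    public static void main(String[] args) {
        Set<String> previous = Set.of("000001.sst", "000002.sst");
        // Compaction removed 000001.sst; two new files were written since.
        Set<String> current = Set.of("000002.sst", "000003.sst", "000004.sst");
        System.out.println("upload: " + filesToUpload(previous, current));
    }
}
```

In this run only `000003.sst` and `000004.sst` are uploaded, while `000002.sst` is reused from the earlier checkpoint.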

### TTL Support
Automatic cleanup of expired state entries based on configurable time-to-live policies.
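
Conceptually, each entry carries a timestamp and is treated as expired once that timestamp plus the TTL is no longer in the future. The sketch below shows only this check. The class name `TtlSketch` and the use of milliseconds are illustrative assumptions, and in Flink itself TTL is configured per state descriptor rather than coded by hand.

```java
/** Illustrative only: the expiry check behind a time-to-live policy. */
final class TtlSketch {

    /** An entry expires once at least ttlMillis have passed since its last update. */
    static boolean isExpired(long lastUpdateMillis, long ttlMillis, long nowMillis) {
        return nowMillis - lastUpdateMillis >= ttlMillis;
    }

    public static void main(String[] args) {
        long ttlMillis = 60_000;       // one-minute TTL
        long lastUpdate = 1_000_000;   // hypothetical update timestamp
        System.out.println(isExpired(lastUpdate, ttlMillis, lastUpdate + 59_999));
        System.out.println(isExpired(lastUpdate, ttlMillis, lastUpdate + 60_000));
    }
}
```

The first check reports the entry as still live and the second as expired, since the boundary case counts as expired in this sketch.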

### Memory Optimization
Advanced memory management including:
- Managed memory integration with Flink's memory model
- Configurable write buffer and block cache sizing
- Memory-aware compaction strategies

### Production Monitoring
Native RocksDB metrics integration providing insights into:
- Memory usage patterns
- I/O performance
- Compaction behavior
- Cache effectiveness

## Migration from Deprecated Package

All classes in `org.apache.flink.contrib.streaming.state` are deprecated. Update imports to use `org.apache.flink.state.rocksdb`:

```java
// Old (deprecated)
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;

// New (recommended)
import org.apache.flink.state.rocksdb.EmbeddedRocksDBStateBackend;
```

## Related Documentation

- [Flink State Backend Documentation](https://nightlies.apache.org/flink/flink-docs-release-2.1/docs/ops/state/state_backends/)
- [RocksDB Configuration Guide](https://github.com/facebook/rocksdb/wiki/Setup-Options-and-Basic-Tuning)