# Core State Backend Configuration

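The `EmbeddedRocksDBStateBackend` is the main entry point for using RocksDB as a state backend in Apache Flink. It keeps working state in an embedded RocksDB instance on the TaskManager's local disk, so state can grow well beyond the available JVM heap. It also supports incremental checkpoints, which upload only the files that changed since the previous checkpoint.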

## Core Imports

```java { .api }
import org.apache.flink.state.rocksdb.EmbeddedRocksDBStateBackend;
import org.apache.flink.state.rocksdb.EmbeddedRocksDBStateBackendFactory;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.util.TernaryBoolean;
```

## EmbeddedRocksDBStateBackend Class

### Class Definition

```java { .api }
@PublicEvolving
public class EmbeddedRocksDBStateBackend implements StateBackend, Serializable {
    // Main implementation class for RocksDB state backend
}
```

### Constructors

```java { .api }
public EmbeddedRocksDBStateBackend()
```

Creates a new `EmbeddedRocksDBStateBackend` with default settings. Incremental checkpointing is left undefined and is resolved from the configuration.

```java { .api }
public EmbeddedRocksDBStateBackend(boolean enableIncrementalCheckpointing)
```

Creates a new `EmbeddedRocksDBStateBackend` with incremental checkpointing explicitly enabled or disabled. An explicit value takes precedence over the configuration.

**Parameters:**

- `enableIncrementalCheckpointing` - Whether to enable incremental checkpointing

```java { .api }
public EmbeddedRocksDBStateBackend(TernaryBoolean enableIncrementalCheckpointing)
```

Creates a new `EmbeddedRocksDBStateBackend` whose incremental checkpointing setting is given as a `TernaryBoolean`.

**Parameters:**

- `enableIncrementalCheckpointing` - `TRUE` or `FALSE` to enable or disable incremental checkpointing explicitly; `UNDEFINED` to use the value from the configuration
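
The precedence between an explicit constructor argument and the configuration can be pictured with the plain-Java sketch below. `Ternary` is a hypothetical stand-in for Flink's `TernaryBoolean`, and the `"incremental"` key is made up for the illustration. The sketch assumes only the behavior described for the constructors: an explicit `TRUE`/`FALSE` wins, and `UNDEFINED` falls back to the configured value. It is not Flink code.

```java
import java.util.Map;

// Hypothetical stand-in for org.apache.flink.util.TernaryBoolean (not Flink code).
enum Ternary {
    TRUE, FALSE, UNDEFINED;

    // An explicit TRUE/FALSE wins; UNDEFINED defers to the fallback value.
    boolean resolve(boolean fallback) {
        switch (this) {
            case TRUE:
                return true;
            case FALSE:
                return false;
            default:
                return fallback;
        }
    }
}

class IncrementalSettingDemo {
    // Hypothetical configuration key used only in this illustration.
    static final String KEY = "incremental";

    static boolean effectiveIncremental(Ternary constructorArg, Map<String, String> config) {
        // Read the configured value, assuming "false" when the key is absent.
        boolean configured = Boolean.parseBoolean(config.getOrDefault(KEY, "false"));
        return constructorArg.resolve(configured);
    }

    public static void main(String[] args) {
        Map<String, String> config = Map.of(KEY, "true");
        System.out.println(effectiveIncremental(Ternary.UNDEFINED, config)); // true
        System.out.println(effectiveIncremental(Ternary.FALSE, config));     // false
    }
}
```

A three-valued flag is used here, rather than a plain `boolean`, because a `boolean` cannot tell "explicitly disabled" apart from "not set".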

## Basic Configuration

### Storage Path Configuration

```java { .api }
public void setDbStoragePath(String dbStoragePath)
```

Sets the local directory on the TaskManager where RocksDB stores its data files.

**Parameters:**

- `dbStoragePath` - The path to the local RocksDB data directory

**Example:**

```java
EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend();
backend.setDbStoragePath("/tmp/flink-rocksdb");
```

```java { .api }
public void setDbStoragePaths(String... dbStoragePaths)
```

Sets several local directories for RocksDB data files, so that RocksDB instances can be spread across multiple disks.

**Parameters:**

- `dbStoragePaths` - Paths to local RocksDB data directories

**Example:**

```java
backend.setDbStoragePaths("/ssd1/rocksdb", "/ssd2/rocksdb", "/ssd3/rocksdb");
```

```java { .api }
public String[] getDbStoragePaths()
```

Gets the configured storage paths for RocksDB data files.

**Returns:** Array of configured storage paths
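
When several directories are configured, each new RocksDB instance has to pick one of them. The sketch below assumes a simple round-robin policy that starts at a random offset. That is our understanding of how the backend spreads instances, not a documented guarantee. `DirectoryRotation` is a hypothetical helper and is not part of Flink's API.

```java
import java.util.Random;

// Hypothetical helper illustrating round-robin placement across storage paths.
class DirectoryRotation {
    private final String[] paths;
    private int next;

    DirectoryRotation(String[] paths, int startIndex) {
        if (paths.length == 0) {
            throw new IllegalArgumentException("at least one path is required");
        }
        this.paths = paths.clone();
        this.next = Math.floorMod(startIndex, paths.length);
    }

    // A random starting offset keeps different processes from all starting on the same disk.
    static DirectoryRotation withRandomStart(String[] paths, Random random) {
        return new DirectoryRotation(paths, random.nextInt(paths.length));
    }

    // Returns the directory for the next RocksDB instance and advances the cursor.
    String nextPath() {
        String path = paths[next];
        next = (next + 1) % paths.length;
        return path;
    }
}

class DirectoryRotationDemo {
    public static void main(String[] args) {
        DirectoryRotation rotation = new DirectoryRotation(
                new String[] {"/ssd1/rocksdb", "/ssd2/rocksdb", "/ssd3/rocksdb"}, 0);
        for (int i = 0; i < 4; i++) {
            // /ssd1/rocksdb, /ssd2/rocksdb, /ssd3/rocksdb, then /ssd1/rocksdb again
            System.out.println(rotation.nextPath());
        }
    }
}
```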

## Checkpointing Configuration

### Incremental Checkpointing

```java { .api }
public TernaryBoolean isIncrementalCheckpointsEnabled()
```

Returns whether incremental checkpointing is enabled for this state backend.

**Returns:** TernaryBoolean indicating incremental checkpointing status

```java { .api }
public boolean supportsNoClaimRestoreMode()
```

Returns whether this state backend supports the NO_CLAIM restore mode.

**Returns:** `true`; the RocksDB backend supports all restore modes

```java { .api }
public boolean supportsSavepointFormat(SavepointFormatType formatType)
```

Returns whether this state backend can write savepoints in the given format.

**Parameters:**

- `formatType` - The savepoint format type to check (`CANONICAL` or `NATIVE`)

**Returns:** `true`; the RocksDB backend supports both the canonical and the native savepoint format

### Transfer Configuration

```java { .api }
public int getNumberOfTransferThreads()
```

Gets the number of threads each stateful operator uses to transfer RocksDB files. These threads upload files to checkpoint storage during checkpoints and download them during restore.

**Returns:** Number of transfer threads

```java { .api }
public void setNumberOfTransferThreads(int numberOfTransferThreads)
```

Sets the number of threads used to upload and download RocksDB files for checkpoints and restores.

**Parameters:**

- `numberOfTransferThreads` - Number of threads to use for file transfers
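
Conceptually, the transfer-thread setting caps how many files move in parallel. The sketch below models this with a fixed-size `ExecutorService` that has `numberOfTransferThreads` workers. `uploadFile` is a hypothetical placeholder that only returns the file size; it stands in for a real copy to checkpoint storage. Flink's own transfer code is not shown here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ParallelTransferSketch {
    // Hypothetical stand-in for copying one SST file to checkpoint storage.
    static long uploadFile(String name, long sizeBytes) {
        return sizeBytes;
    }

    // Transfers all files with at most numberOfTransferThreads concurrent workers
    // and returns the total number of bytes "transferred".
    static long transferAll(List<String> names, List<Long> sizes, int numberOfTransferThreads) {
        ExecutorService pool = Executors.newFixedThreadPool(numberOfTransferThreads);
        try {
            List<Future<Long>> futures = new ArrayList<>();
            for (int i = 0; i < names.size(); i++) {
                final String name = names.get(i);
                final long size = sizes.get(i);
                Callable<Long> task = () -> uploadFile(name, size);
                futures.add(pool.submit(task));
            }
            long total = 0;
            for (Future<Long> f : futures) {
                total += f.get(); // waits for each transfer and surfaces its failure
            }
            return total;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("transfer interrupted", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("transfer failed", e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<String> files = List.of("000012.sst", "000013.sst", "000014.sst");
        List<Long> sizes = List.of(64L, 128L, 32L);
        System.out.println(transferAll(files, sizes, 2)); // 224
    }
}
```

More threads help when many small files must be moved and the remote storage tolerates parallel requests. They add little when a single large file dominates.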

```java { .api }
public long getWriteBatchSize()
```

Gets the maximum amount of memory, in bytes, that a RocksDB write batch may consume before it is flushed.

**Returns:** Write batch size in bytes

```java { .api }
public void setWriteBatchSize(long writeBatchSize)
```

Sets the maximum memory size of a RocksDB write batch. A value of `0` disables the size limit, so batches are flushed based on the number of entries only.

**Parameters:**

- `writeBatchSize` - Write batch size in bytes
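
The interplay between a byte limit and an entry-count limit can be sketched as follows. `BatchingWriter` is hypothetical. It assumes a batch is flushed when it reaches a fixed entry count or when its accumulated size reaches `writeBatchSize`, and that a size limit of `0` leaves only the entry count as a trigger. The count cap of 500 in the usage is an illustrative value, not a Flink constant.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical writer that buffers key/value pairs and flushes them as one batch.
class BatchingWriter {
    private final int maxEntries;
    private final long writeBatchSize; // bytes; 0 disables the size-based trigger
    private final List<byte[][]> pending = new ArrayList<>();
    private long pendingBytes;
    private int flushCount;

    BatchingWriter(int maxEntries, long writeBatchSize) {
        this.maxEntries = maxEntries;
        this.writeBatchSize = writeBatchSize;
    }

    void put(byte[] key, byte[] value) {
        pending.add(new byte[][] {key, value});
        pendingBytes += key.length + value.length;
        boolean countReached = pending.size() >= maxEntries;
        boolean sizeReached = writeBatchSize > 0 && pendingBytes >= writeBatchSize;
        if (countReached || sizeReached) {
            flush();
        }
    }

    // A real backend would hand the batch to RocksDB here; this sketch only clears the buffer.
    void flush() {
        if (!pending.isEmpty()) {
            pending.clear();
            pendingBytes = 0;
            flushCount++;
        }
    }

    int flushCount() {
        return flushCount;
    }
}

class BatchingWriterDemo {
    public static void main(String[] args) {
        // 100 entries of 20 bytes each (10-byte key + 10-byte value).
        BatchingWriter bySize = new BatchingWriter(500, 200);
        BatchingWriter byCountOnly = new BatchingWriter(500, 0);
        for (int i = 0; i < 100; i++) {
            bySize.put(new byte[10], new byte[10]);
            byCountOnly.put(new byte[10], new byte[10]);
        }
        System.out.println(bySize.flushCount());      // 10
        System.out.println(byCountOnly.flushCount()); // 0
    }
}
```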

## Priority Queue Configuration

### Priority Queue Types

```java { .api }
// Nested in EmbeddedRocksDBStateBackend
public enum PriorityQueueStateType {
    HEAP,    // Use heap-based priority queue
    ROCKSDB  // Use RocksDB-based priority queue
}
```

```java { .api }
public PriorityQueueStateType getPriorityQueueStateType()
```

Gets the implementation used for priority queue state, such as timers.

**Returns:** Current priority queue state type

```java { .api }
public void setPriorityQueueStateType(PriorityQueueStateType priorityQueueStateType)
```

Sets the implementation used for priority queue state, such as timers.

**Parameters:**

- `priorityQueueStateType` - The priority queue implementation to use

**Example:**

```java
import org.apache.flink.state.rocksdb.EmbeddedRocksDBStateBackend.PriorityQueueStateType;

// Store priority queue state (timers, e.g. for windows) in RocksDB; this is the default
backend.setPriorityQueueStateType(PriorityQueueStateType.ROCKSDB);

// Store priority queue state on the JVM heap; can be faster when there are few timers
backend.setPriorityQueueStateType(PriorityQueueStateType.HEAP);
```
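
Timer state is called priority queue state because timers must fire in timestamp order. The sketch below shows that idea with `java.util.PriorityQueue`, which is conceptually the kind of in-memory structure the `HEAP` option implies. With `ROCKSDB`, equivalent timestamp-ordered entries would live in RocksDB instead. `PendingTimer`, `register` and `advanceWatermark` are hypothetical names; Flink's real timer service is not shown here.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical timer: a key plus the event time at which it should fire.
final class PendingTimer {
    final String key;
    final long timestamp;

    PendingTimer(String key, long timestamp) {
        this.key = key;
        this.timestamp = timestamp;
    }
}

class TimerQueueSketch {
    // Orders timers by timestamp so the earliest one is always at the head.
    private final PriorityQueue<PendingTimer> queue =
            new PriorityQueue<>(Comparator.comparingLong((PendingTimer t) -> t.timestamp));

    void register(String key, long timestamp) {
        queue.add(new PendingTimer(key, timestamp));
    }

    // Removes and returns, in timestamp order, every timer due at or before the watermark.
    List<String> advanceWatermark(long watermark) {
        List<String> fired = new ArrayList<>();
        while (!queue.isEmpty() && queue.peek().timestamp <= watermark) {
            fired.add(queue.poll().key);
        }
        return fired;
    }

    public static void main(String[] args) {
        TimerQueueSketch timers = new TimerQueueSketch();
        timers.register("user-b", 300);
        timers.register("user-a", 100);
        timers.register("user-c", 200);
        System.out.println(timers.advanceWatermark(250)); // [user-a, user-c]
    }
}
```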

## Runtime Configuration

### Configuration Method

```java { .api }
public EmbeddedRocksDBStateBackend configure(ReadableConfig config, ClassLoader classLoader)
```

Creates a copy of this state backend that applies the given configuration. Settings set explicitly on this instance take precedence, and settings left unset are taken from `config`. The original instance is not modified.

**Parameters:**

- `config` - The configuration to apply
- `classLoader` - The class loader used to load classes named in the configuration

**Returns:** A new configured instance of this state backend
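
The copy-on-configure behavior can be sketched in plain Java. `BackendSettings` is hypothetical and models only two options, using keys that appear in the YAML example on this page. A value set explicitly on the original object is kept. A value left unset (`null` here) is filled from the configuration map, or from an illustrative fallback of `4` threads when the map has no entry. The original object is never modified.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical settings object illustrating configure() semantics (not Flink's class).
final class BackendSettings {
    static final String THREADS_KEY = "state.backend.rocksdb.checkpoint.transfer.thread.num";
    static final String LOCALDIR_KEY = "state.backend.rocksdb.localdir";

    final Integer transferThreads; // null = not set explicitly
    final String localDir;         // null = not set explicitly

    BackendSettings(Integer transferThreads, String localDir) {
        this.transferThreads = transferThreads;
        this.localDir = localDir;
    }

    // Returns a NEW object: explicit values win, unset ones come from config (or a fallback).
    BackendSettings configure(Map<String, String> config) {
        Integer threads = transferThreads != null
                ? transferThreads
                : Integer.valueOf(config.getOrDefault(THREADS_KEY, "4")); // illustrative fallback
        String dir = localDir != null ? localDir : config.get(LOCALDIR_KEY);
        return new BackendSettings(threads, dir);
    }
}

class ConfigureDemo {
    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put(BackendSettings.THREADS_KEY, "8");
        config.put(BackendSettings.LOCALDIR_KEY, "/mnt/rocksdb");

        BackendSettings original = new BackendSettings(2, null); // threads set explicitly
        BackendSettings configured = original.configure(config);

        System.out.println(configured.transferThreads); // 2 (explicit value wins)
        System.out.println(configured.localDir);        // /mnt/rocksdb (filled from config)
        System.out.println(original.localDir);          // null (original untouched)
    }
}
```

Returning a new object instead of mutating in place means the instance the user built in code stays a faithful record of what was set explicitly.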

## Memory Configuration Access

```java { .api }
public RocksDBMemoryConfiguration getMemoryConfiguration()
```

Gets the memory configuration for this RocksDB state backend. The returned object is mutable, and changes made to it apply to this backend.

**Returns:** The RocksDB memory configuration object

**Example:**

```java
import org.apache.flink.state.rocksdb.EmbeddedRocksDBStateBackend;
import org.apache.flink.state.rocksdb.RocksDBMemoryConfiguration;

EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend();
RocksDBMemoryConfiguration memConfig = backend.getMemoryConfiguration();
memConfig.setUseManagedMemory(true); // size RocksDB memory from the slot's managed memory
memConfig.setWriteBufferRatio(0.4);  // fraction of that memory available to write buffers
```
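
To get a feel for what `writeBufferRatio` means, the arithmetic below splits a memory budget between RocksDB's shared block cache and its write buffer manager. The formulas follow our reading of Flink's internal `RocksDBMemoryControllerUtils`:

- cache capacity `(3 - r) * total / 3`
- write buffer manager capacity `2 * total * r / 3`

With these formulas, the cache plus a write-buffer overshoot of up to half its limit still fits within `total`. Treat them as an assumption and check them against the Flink version you run. The 512 MiB budget is an arbitrary example.

```java
// Sketch of dividing a RocksDB memory budget, following our reading of
// Flink's RocksDBMemoryControllerUtils (assumption; verify for your Flink version).
class MemoryBudgetSketch {
    // Capacity of the shared block cache (write buffer memory is also charged against it).
    static long cacheCapacity(long totalBytes, double writeBufferRatio) {
        return (long) ((3 - writeBufferRatio) * totalBytes / 3);
    }

    // Capacity of the write buffer manager that limits memtable memory.
    static long writeBufferManagerCapacity(long totalBytes, double writeBufferRatio) {
        return (long) (2 * totalBytes * writeBufferRatio / 3);
    }

    public static void main(String[] args) {
        long total = 512L * 1024 * 1024; // 512 MiB, arbitrary example budget
        double ratio = 0.4;
        long cache = cacheCapacity(total, ratio);
        long wbm = writeBufferManagerCapacity(total, ratio);
        // Cache plus a worst-case overshoot of half the write-buffer limit stays within the budget.
        System.out.println(cache + wbm / 2 <= total); // true
        System.out.printf("cache=%d MiB, write buffers=%d MiB%n", cache >> 20, wbm >> 20);
    }
}
```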

## Memory Factory Configuration

```java { .api }
public void setRocksDBMemoryFactory(RocksDBMemoryFactory rocksDBMemoryFactory)
```

Sets the RocksDB memory factory for custom memory management strategies.

**Parameters:**

- `rocksDBMemoryFactory` - Custom memory factory implementation

## Complete Usage Example

```java
import org.apache.flink.state.rocksdb.EmbeddedRocksDBStateBackend;
import org.apache.flink.state.rocksdb.PredefinedOptions;
import org.apache.flink.state.rocksdb.RocksDBMemoryConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDBStateBackendExample {
    public static void main(String[] args) {
        // Create the state backend with incremental checkpointing enabled
        EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend(true);

        // Configure local storage
        backend.setDbStoragePath("/tmp/flink-rocksdb");

        // Configure performance settings
        backend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED);
        backend.setNumberOfTransferThreads(4);
        backend.setWriteBatchSize(2 * 1024 * 1024); // 2 MB

        // Keep priority queue (timer) state in RocksDB
        backend.setPriorityQueueStateType(
                EmbeddedRocksDBStateBackend.PriorityQueueStateType.ROCKSDB);

        // Configure memory
        RocksDBMemoryConfiguration memConfig = backend.getMemoryConfiguration();
        memConfig.setUseManagedMemory(true);
        memConfig.setWriteBufferRatio(0.4);
        memConfig.setHighPriorityPoolRatio(0.2);

        // Apply to the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStateBackend(backend);

        // Now use the environment for your Flink job
        // env.fromSource(...).keyBy(...).process(...);
    }
}
```

## EmbeddedRocksDBStateBackendFactory

### Factory Class

```java { .api }
@PublicEvolving
public class EmbeddedRocksDBStateBackendFactory implements StateBackendFactory<EmbeddedRocksDBStateBackend> {
    // Factory for creating state backend from configuration
}
```

### Factory Method

```java { .api }
public EmbeddedRocksDBStateBackend createFromConfig(ReadableConfig config, ClassLoader classLoader)
```

Creates an `EmbeddedRocksDBStateBackend` from the given configuration.

**Parameters:**

- `config` - The configuration containing state backend settings
- `classLoader` - The class loader for loading classes

**Returns:** A configured `EmbeddedRocksDBStateBackend` instance

**Example:**

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.state.rocksdb.EmbeddedRocksDBStateBackend;
import org.apache.flink.state.rocksdb.EmbeddedRocksDBStateBackendFactory;
import org.apache.flink.state.rocksdb.PredefinedOptions;
import org.apache.flink.state.rocksdb.RocksDBOptions;

Configuration config = new Configuration();
config.set(RocksDBOptions.LOCAL_DIRECTORIES, "/tmp/rocksdb");
config.set(RocksDBOptions.PREDEFINED_OPTIONS, PredefinedOptions.FLASH_SSD_OPTIMIZED.name()); // string-typed option

EmbeddedRocksDBStateBackendFactory factory = new EmbeddedRocksDBStateBackendFactory();
EmbeddedRocksDBStateBackend backend = factory.createFromConfig(config, Thread.currentThread().getContextClassLoader());
```

## Configuration via Flink Configuration

You can also configure the RocksDB state backend through Flink's configuration file:

```yaml
# flink-conf.yaml (config.yaml in newer Flink versions)
state.backend.type: rocksdb
state.backend.rocksdb.localdir: /tmp/flink-rocksdb
state.backend.rocksdb.predefined-options: SPINNING_DISK_OPTIMIZED
state.backend.rocksdb.checkpoint.transfer.thread.num: 4
state.backend.rocksdb.memory.managed: true
state.backend.rocksdb.memory.write-buffer-ratio: 0.4
```
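
The `rocksdb` value in this configuration names a backend by a short alias. Whatever loads the configuration therefore has to map that alias to a factory. As we understand it, Flink does this internally when it loads the configured state backend, and it also accepts a factory class name. The sketch below only illustrates the alias lookup with a hypothetical registry; `BackendRegistry`, `Backend` and the aliases' descriptions are illustrative names, not Flink's loader.

```java
import java.util.Locale;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical registry mapping backend aliases to factories (not Flink's loader).
class BackendRegistry {
    interface Backend {
        String describe();
    }

    private final Map<String, Supplier<Backend>> byAlias;

    BackendRegistry(Map<String, Supplier<Backend>> byAlias) {
        this.byAlias = byAlias;
    }

    // In this sketch, aliases are matched case-insensitively and unknown names are rejected.
    Backend load(String configuredValue) {
        Supplier<Backend> factory = byAlias.get(configuredValue.trim().toLowerCase(Locale.ROOT));
        if (factory == null) {
            throw new IllegalArgumentException("Unknown state backend: " + configuredValue);
        }
        return factory.get();
    }

    public static void main(String[] args) {
        BackendRegistry registry = new BackendRegistry(Map.of(
                "rocksdb", () -> () -> "embedded RocksDB",
                "hashmap", () -> () -> "heap-based HashMap"));
        System.out.println(registry.load("RocksDB").describe()); // embedded RocksDB
    }
}
```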

## Thread Safety and Lifecycle

- **Thread Safety:** The configuration methods of `EmbeddedRocksDBStateBackend` are not thread-safe; call them during job setup, before the job is submitted.
- **Serialization:** The state backend is serializable and is shipped to the TaskManagers as part of the job.
- **Lifecycle:** Configuration is fixed once the job starts; use the `configure()` method to create configured copies.
- **Resource Management:** Flink creates and releases RocksDB's native resources as part of its task lifecycle management.