or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

data-exchange.md, execution-graph.md, high-availability.md, index.md, job-management.md, message-passing.md, metrics.md, mini-cluster.md, rpc-framework.md, state-management.md, task-execution.md

docs/state-management.md

# State Management and Checkpointing

The state management system provides pluggable state backends and checkpointing mechanisms for fault tolerance and exactly-once processing guarantees. It lets stateful stream processing applications keep their state consistent across failures and restarts.

## State Backend System

### StateBackend

The primary interface for state storage and checkpointing backends. A state backend determines where and how state is stored during execution and when checkpoints are taken.

```java { .api }

11

public interface StateBackend extends Serializable {

12

<K> AbstractKeyedStateBackend<K> createKeyedStateBackend(

13

Environment env,

14

JobID jobID,

15

String operatorIdentifier,

16

TypeSerializer<K> keySerializer,

17

int numberOfKeyGroups,

18

KeyGroupRange keyGroupRange,

19

TaskKvStateRegistry kvStateRegistry

20

) throws Exception;

21

22

OperatorStateBackend createOperatorStateBackend(

23

Environment env,

24

String operatorIdentifier

25

) throws Exception;

26

27

CompletableFuture<CheckpointStorageLocation> resolveCheckpoint(String checkpointPointer) throws IOException;

28

}

29

```

### StateBackendFactory

Factory interface for creating state backend instances from configuration.

```java { .api }

36

public interface StateBackendFactory<T extends StateBackend> {

37

T createFromConfig(Configuration config) throws IllegalConfigurationException;

38

String getIdentifier();

39

}

40

```

## Function State Contexts

### FunctionInitializationContext

Context provided to user functions during initialization, used to register operator state and keyed state.

```java { .api }

49

public interface FunctionInitializationContext {

50

boolean isRestored();

51

52

OperatorStateStore getOperatorStateStore();

53

KeyedStateStore getKeyedStateStore();

54

55

ManagedInitializationContext getManagedInitializationContext();

56

}

57

```

### FunctionSnapshotContext

Context provided to user functions while their state is being snapshotted.

```java { .api }

/** Context handed to a user function while it snapshots its state. */
public interface FunctionSnapshotContext {

    /** Returns the ID of the checkpoint being taken. */
    long getCheckpointId();

    /** Returns the timestamp of the checkpoint being taken. */
    long getCheckpointTimestamp();
}

```

### StateSnapshotContext

General-purpose context for state snapshot operations.

```java { .api }

75

public interface StateSnapshotContext {

76

long getCheckpointId();

77

long getCheckpointTimestamp();

78

CheckpointStreamFactory getCheckpointStreamFactory();

79

}

80

```

## State Stores

### OperatorStateStore

Store for operator (non-keyed) state, which is scoped to each parallel operator instance.

```java { .api }

89

public interface OperatorStateStore {

90

<T> ListState<T> getListState(ListStateDescriptor<T> stateDescriptor) throws Exception;

91

<T> ListState<T> getUnionListState(ListStateDescriptor<T> stateDescriptor) throws Exception;

92

93

<T> BroadcastState<String, T> getBroadcastState(MapStateDescriptor<String, T> stateDescriptor) throws Exception;

94

95

Set<String> getRegisteredStateNames();

96

Set<String> getRegisteredBroadcastStateNames();

97

}

98

```

### KeyedStateStore

Store for keyed state, which is partitioned and scoped by key.

```java { .api }

105

public interface KeyedStateStore {

106

<T> ValueState<T> getState(ValueStateDescriptor<T> stateDescriptor) throws Exception;

107

<T> ListState<T> getListState(ListStateDescriptor<T> stateDescriptor) throws Exception;

108

<T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> stateDescriptor) throws Exception;

109

<IN, ACC, OUT> AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT> stateDescriptor) throws Exception;

110

<T, ACC> FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC> stateDescriptor) throws Exception;

111

<UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateDescriptor) throws Exception;

112

}

113

```

## Stream Providers

### StatePartitionStreamProvider

Provides access to the input stream of a state partition during restore.

```java { .api }

122

public interface StatePartitionStreamProvider {

123

FSDataInputStream getStream() throws IOException;

124

}

125

```

## Checkpoint Coordination

### CheckpointCoordinator

Coordinates the distributed checkpointing process across all operators in a job.

```java { .api }

134

public class CheckpointCoordinator {

135

public CheckpointCoordinator(

136

JobID job,

137

CheckpointConfig chkConfig,

138

ExecutionVertex[] tasksToTrigger,

139

ExecutionVertex[] tasksToWaitFor,

140

ExecutionVertex[] tasksToCommitTo,

141

ClassLoader userClassLoader,

142

CheckpointIDCounter checkpointIdCounter,

143

CompletedCheckpointStore completedCheckpointStore,

144

StateBackend checkpointStateBackend,

145

Executor executor,

146

CheckpointFailureManager failureManager

147

);

148

149

public void startCheckpointScheduler();

150

public void stopCheckpointScheduler();

151

152

public CompletableFuture<CompletedCheckpoint> triggerCheckpoint(

153

CheckpointProperties props,

154

String externalSavepointLocation,

155

boolean isPeriodic

156

);

157

158

public void receiveAcknowledgeMessage(AcknowledgeCheckpoint message);

159

public void receiveDeclineMessage(DeclineCheckpoint message);

160

161

public void restoreLatestCheckpointedState(

162

Map<JobVertexID, ExecutionJobVertex> tasks,

163

boolean errorIfNoCheckpoint,

164

boolean allOrNothingState

165

) throws Exception;

166

}

167

```

### CheckpointMetaData

Metadata identifying a checkpoint: its ID and timestamp.

```java { .api }

174

public class CheckpointMetaData implements Serializable {

175

public CheckpointMetaData(long checkpointId, long timestamp);

176

177

public long getCheckpointId();

178

public long getTimestamp();

179

}

180

```

### CheckpointOptions

Options for a single checkpoint: its type (checkpoint or savepoint) and its target storage location.

```java { .api }

187

public class CheckpointOptions implements Serializable {

188

public CheckpointOptions(CheckpointType checkpointType, CheckpointStorageLocationReference targetLocation);

189

190

public CheckpointType getCheckpointType();

191

public CheckpointStorageLocationReference getTargetLocation();

192

193

public static CheckpointOptions forCheckpointWithDefaultLocation();

194

public static CheckpointOptions forSavepoint(CheckpointStorageLocationReference location);

195

}

196

```

### CheckpointType

Enumeration of checkpoint types, distinguishing regular checkpoints from savepoints.

```java { .api }

/** Distinguishes regular checkpoints from savepoints. */
public enum CheckpointType {

    /** A regular checkpoint. */
    CHECKPOINT(false),

    /** A savepoint. */
    SAVEPOINT(true);

    /** Whether this type denotes a savepoint; fixed per constant. */
    private final boolean isSavepoint;

    // Required by the constant arguments above; the original declaration omitted it.
    CheckpointType(boolean isSavepoint) {
        this.isSavepoint = isSavepoint;
    }

    /** Returns {@code true} for {@link #SAVEPOINT} and {@code false} for {@link #CHECKPOINT}. */
    public boolean isSavepoint() {
        return isSavepoint;
    }
}

```

## Exception Handling

### CheckpointException

Exception signaling a checkpoint-related failure, optionally carrying a `CheckpointFailureReason`.

```java { .api }

220

public class CheckpointException extends FlinkException {

221

public CheckpointException(String message);

222

public CheckpointException(String message, Throwable cause);

223

public CheckpointException(CheckpointFailureReason reason);

224

public CheckpointException(CheckpointFailureReason reason, Throwable cause);

225

226

public CheckpointFailureReason getCheckpointFailureReason();

227

}

228

```

### CheckpointFailureReason

Enumeration of the reasons a checkpoint can fail.

```java { .api }

/**
 * Reasons a checkpoint can fail.
 *
 * <p>Constants are listed in declaration order; do not reorder them, since that would change
 * their ordinals.
 */
public enum CheckpointFailureReason {
    // Checkpoint coordinator lifecycle.
    CHECKPOINT_COORDINATOR_SHUTDOWN,
    CHECKPOINT_COORDINATOR_SUSPEND,

    // Checkpoint declined.
    CHECKPOINT_DECLINED_TASK_NOT_READY,
    CHECKPOINT_DECLINED_SUBSUMED,
    CHECKPOINT_DECLINED_TIME_OUT,
    CHECKPOINT_DECLINED_ALIGNMENT_LIMIT_EXCEEDED,
    CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER,
    CHECKPOINT_DECLINED_INPUT_END_OF_STREAM,

    // Other failures.
    CHECKPOINT_ASYNC_EXCEPTION,
    CHECKPOINT_EXPIRED,
    TASK_CHECKPOINT_FAILURE,
    TASK_FAILURE_DURING_CHECKPOINT,
    UNKNOWN_TASK_CHECKPOINT_NOTIFICATION_FAILURE,
    TRIGGER_CHECKPOINT_FAILURE,
    FINALIZE_CHECKPOINT_FAILURE,
    IO_EXCEPTION
}

```

## Usage Examples

### Implementing Stateful Functions

```java

260

import org.apache.flink.runtime.state.*;

261

import org.apache.flink.api.common.state.*;

262

import org.apache.flink.api.common.typeinfo.TypeHint;

263

264

public class StatefulMapFunction extends RichMapFunction<String, String>

265

implements CheckpointedFunction {

266

267

private ValueState<Integer> countState;

268

private ListState<String> bufferState;

269

270

@Override

271

public void initializeState(FunctionInitializationContext context) throws Exception {

272

// Initialize keyed state

273

ValueStateDescriptor<Integer> countDescriptor =

274

new ValueStateDescriptor<>("count", Integer.class);

275

countState = context.getKeyedStateStore().getState(countDescriptor);

276

277

// Initialize operator state

278

ListStateDescriptor<String> bufferDescriptor =

279

new ListStateDescriptor<>("buffer", String.class);

280

bufferState = context.getOperatorStateStore().getListState(bufferDescriptor);

281

282

// Restore state if recovering from checkpoint

283

if (context.isRestored()) {

284

System.out.println("Restoring state from checkpoint");

285

}

286

}

287

288

@Override

289

public void snapshotState(FunctionSnapshotContext context) throws Exception {

290

// State is automatically managed for managed state

291

System.out.println("Taking snapshot for checkpoint: " + context.getCheckpointId());

292

}

293

294

@Override

295

public String map(String value) throws Exception {

296

// Use keyed state

297

Integer currentCount = countState.value();

298

if (currentCount == null) {

299

currentCount = 0;

300

}

301

countState.update(currentCount + 1);

302

303

// Use operator state

304

bufferState.add(value);

305

306

return value + " (processed " + (currentCount + 1) + " times)";

307

}

308

}

309

```

### Configuring State Backends

```java

314

import org.apache.flink.runtime.state.StateBackend;

315

import org.apache.flink.runtime.state.filesystem.FsStateBackend;

316

import org.apache.flink.runtime.state.memory.MemoryStateBackend;

317

318

// Configure memory state backend (for testing/development)

319

StateBackend memoryBackend = new MemoryStateBackend();

320

321

// Configure filesystem state backend (for production)

322

StateBackend fsBackend = new FsStateBackend("file:///checkpoints");

323

324

// Configure state backend in job configuration

325

Configuration jobConfig = new Configuration();

326

jobConfig.setString("state.backend", "filesystem");

327

jobConfig.setString("state.checkpoints.dir", "file:///checkpoints");

328

jobConfig.setString("state.savepoints.dir", "file:///savepoints");

329

330

// Set advanced checkpointing options

331

jobConfig.setLong("execution.checkpointing.interval", 30000L);

332

jobConfig.setString("execution.checkpointing.mode", "EXACTLY_ONCE");

333

jobConfig.setLong("execution.checkpointing.timeout", 600000L);

334

jobConfig.setInteger("execution.checkpointing.max-concurrent-checkpoints", 1);

335

jobConfig.setLong("execution.checkpointing.min-pause", 5000L);

336

```

### Checkpoint Coordination Setup

```java

341

import org.apache.flink.runtime.checkpoint.*;

342

343

// Set up checkpoint coordinator

344

CheckpointConfig checkpointConfig = new CheckpointConfig();

345

checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

346

checkpointConfig.setCheckpointInterval(30000); // 30 seconds

347

checkpointConfig.setCheckpointTimeout(600000); // 10 minutes

348

checkpointConfig.setMaxConcurrentCheckpoints(1);

349

checkpointConfig.setMinPauseBetweenCheckpoints(5000); // 5 seconds

350

351

// Configure checkpoint retention

352

checkpointConfig.enableExternalizedCheckpoints(

353

CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

354

355

// Set up checkpoint storage

356

checkpointConfig.setCheckpointStorage(new FileSystemCheckpointStorage("file:///checkpoints"));

357

358

CheckpointCoordinator coordinator = new CheckpointCoordinator(

359

jobGraph.getJobID(),

360

checkpointConfig,

361

tasksToTrigger,

362

tasksToWaitFor,

363

tasksToCommitTo,

364

userClassLoader,

365

checkpointIdCounter,

366

completedCheckpointStore,

367

stateBackend,

368

executor,

369

failureManager

370

);

371

372

// Start periodic checkpointing

373

coordinator.startCheckpointScheduler();

374

```

### Manual Checkpoint Triggering

```java

379

// Trigger a checkpoint manually

380

CheckpointProperties properties = CheckpointProperties.forCheckpoint(

381

CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION);

382

383

CompletableFuture<CompletedCheckpoint> checkpointFuture = coordinator.triggerCheckpoint(

384

properties,

385

null, // no external savepoint location

386

false // not periodic

387

);

388

389

checkpointFuture.whenComplete((checkpoint, throwable) -> {

390

if (throwable != null) {

391

System.err.println("Checkpoint failed: " + throwable.getMessage());

392

} else {

393

System.out.println("Checkpoint completed: " + checkpoint.getCheckpointID());

394

}

395

});

396

```

## Common Patterns

### State Migration

```java

403

// Handle state schema evolution

404

@Override

405

public void initializeState(FunctionInitializationContext context) throws Exception {

406

ValueStateDescriptor<MyState> descriptor = new ValueStateDescriptor<>(

407

"myState",

408

new MyStateTypeSerializer()

409

);

410

411

// Configure state migration

412

descriptor.initializeSerializerUnlessSet(new MyStateTypeSerializer());

413

414

state = context.getKeyedStateStore().getState(descriptor);

415

}

416

```

### Broadcast State

```java

421

// Set up broadcast state for configuration

422

MapStateDescriptor<String, Configuration> configDescriptor =

423

new MapStateDescriptor<>("config", String.class, Configuration.class);

424

425

BroadcastState<String, Configuration> broadcastState =

426

context.getOperatorStateStore().getBroadcastState(configDescriptor);

427

428

// Update broadcast state

429

broadcastState.put("global-config", newConfiguration);

430

431

// Read from broadcast state in processing function

432

Configuration config = broadcastState.get("global-config");

433

```