or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

execution-scheduling.md, high-availability.md, index.md, job-graph.md, resource-management.md, state-checkpointing.md, task-execution.md

docs/state-checkpointing.md

0

# State Management and Checkpointing

1

2

Comprehensive state management with pluggable state backends and distributed checkpointing for fault tolerance. Through its checkpointing mechanism and flexible state backends, Flink provides exactly-once consistency guarantees for application state.

3

4

## Capabilities

5

6

### CheckpointCoordinator

7

8

Central coordinator for checkpointing in Flink jobs, managing checkpoint triggering, completion, and recovery across all tasks.

9

10

```java { .api }

11

/**

12

* The checkpoint coordinator coordinates the distributed snapshots of operators and state.

13

* It triggers the checkpoint by sending messages to the relevant tasks and collects the

14

* checkpoint acknowledgements. It also maintains and cleans up the checkpoint meta data.

15

*/

16

public class CheckpointCoordinator {

17

/** Trigger a periodic checkpoint for the job */

18

public CompletableFuture<CompletedCheckpoint> triggerCheckpoint(boolean isPeriodic);

19

20

/** Trigger a checkpoint of the specified type */

21

public CompletableFuture<CompletedCheckpoint> triggerCheckpoint(CheckpointType checkpointType);

22

23

/** Trigger a savepoint */

24

public CompletableFuture<CompletedCheckpoint> triggerSavepoint(

25

@Nullable String targetLocation,

26

SavepointFormatType formatType

27

);

28

29

30

/** Shutdown the checkpoint coordinator */

31

public void shutdown() throws Exception;

32

33

/** Get the checkpoint store */

34

public CompletedCheckpointStore getCheckpointStore();

35

36

/** Start the checkpoint scheduler */

37

public void startCheckpointScheduler();

38

39

/** Check if periodic checkpointing has been started */

40

public boolean isPeriodicCheckpointingStarted();

41

42

/** Restore from the latest checkpoint */

43

public boolean restoreLatestCheckpointedStateToAll(

44

Set<ExecutionJobVertex> tasks,

45

boolean errorIfNoCheckpoint

46

);

47

48

/** Acknowledge checkpoint from task */

49

public void receiveAcknowledgeMessage(

50

AcknowledgeCheckpoint message,

51

String taskManagerLocationInfo

52

) throws CheckpointException;

53

54

/** Handle checkpoint decline from task */

55

public void receiveDeclineMessage(

56

DeclineCheckpoint message,

57

String taskManagerLocationInfo

58

);

59

60

/** Get number of retained checkpoints */

61

public int getNumberOfRetainedSuccessfulCheckpoints();

62

63

/** Get number of pending checkpoints */

64

public int getNumberOfPendingCheckpoints();

65

66

/** Get checkpoint timeout */

67

public long getCheckpointTimeout();

68

69

/** Check if periodic checkpointing is enabled */

70

public boolean isPeriodicCheckpointingConfigured();

71

}

72

```

73

74

**Usage Examples:**

75

76

```java

77

// Create checkpoint coordinator configuration

78

CheckpointCoordinatorConfiguration checkpointConfig =

79

new CheckpointCoordinatorConfiguration.Builder()

80

.setCheckpointInterval(5000L) // 5 seconds

81

.setCheckpointTimeout(60000L) // 1 minute

82

.setMaxConcurrentCheckpoints(1)

83

.setMinPauseBetweenCheckpoints(1000L)

84

.setPreferCheckpointForRecovery(true)

85

.setTolerableCheckpointFailureNumber(3)

86

.build();

87

88

// Enable checkpointing on execution graph

89

executionGraph.enableCheckpointing(

90

checkpointConfig,

91

masterTriggerRestoreHooks,

92

checkpointIdCounter,

93

completedCheckpointStore,

94

stateBackend,

95

checkpointStorage,

96

statsTracker,

97

checkpointsCleaner

98

);

99

100

// Trigger manual checkpoint

101

CheckpointCoordinator coordinator = executionGraph.getCheckpointCoordinator();

102

CompletableFuture<CompletedCheckpoint> checkpointFuture =

103

coordinator.triggerCheckpoint(

104

CheckpointType.CHECKPOINT,

105

CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),

106

null, // external location

107

false, // not periodic

108

System.currentTimeMillis()

109

);

110

111

checkpointFuture.thenAccept(checkpoint -> {

112

System.out.println("Checkpoint " + checkpoint.getCheckpointId() + " completed");

113

});

114

```

115

116

### KeyedStateBackend

117

118

Backend for managing keyed state (state associated with keys) with support for different storage engines.

119

120

```java { .api }

121

/**

122

* A keyed state backend provides methods for managing keyed state.

123

*/

124

public interface KeyedStateBackend<K> extends KeyedState, Disposable {

125

/** Create or retrieve a keyed state */

126

<T> InternalKvState<K, ?, T> createState(

127

StateDescriptor<?, T> stateDescriptor,

128

TypeSerializer<T> namespaceSerializer

129

);

130

131

/** Get partitioned state for a specific namespace */

132

<N, S extends State, T> S getPartitionedState(

133

N namespace,

134

TypeSerializer<N> namespaceSerializer,

135

StateDescriptor<S, T> stateDescriptor

136

);

137

138

/** Take a snapshot of the keyed state */

139

RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(

140

long checkpointId,

141

long timestamp,

142

CheckpointStreamFactory streamFactory,

143

CheckpointOptions checkpointOptions

144

);

145

146

/** Restore from a state handle */

147

void restore(StateHandles<KeyedStateHandle> restoredState);

148

149

/** Get current key */

150

K getCurrentKey();

151

152

/** Set current key context */

153

void setCurrentKey(K newKey);

154

155

/** Get key serializer */

156

TypeSerializer<K> getKeySerializer();

157

158

/** Get key groups for this backend */

159

KeyGroupRange getKeyGroupRange();

160

161

/** Get number of key groups */

162

int getNumberOfKeyGroups();

163

164

/** Close the backend and release resources */

165

void close();

166

167

/** Dispose the backend */

168

void dispose();

169

170

/** Apply state to a key group */

171

void applyToAllKeys(

172

N namespace,

173

TypeSerializer<N> namespaceSerializer,

174

StateDescriptor<?, ?> stateDescriptor,

175

KeyedStateFunction<K, ?> function

176

);

177

178

/** Get approximate memory usage */

179

long getApproximateMemoryUsage();

180

}

181

```

182

183

### OperatorStateBackend

184

185

Backend for managing operator state (state not associated with keys), including list state, union list state, and broadcast state.

186

187

```java { .api }

188

/**

189

* Interface for operator state backends. Operator state is state that is associated with

190

* parallel instances of an operator (tasks), as opposed to keyed state.

191

*/

192

public interface OperatorStateBackend extends Disposable {

193

/** Get list state for operator state */

194

<S> ListState<S> getListState(ListStateDescriptor<S> stateDescriptor);

195

196

/** Get union list state (combines state from all parallel instances) */

197

<S> ListState<S> getUnionListState(ListStateDescriptor<S> stateDescriptor);

198

199

/** Get broadcast state for coordinating across parallel instances */

200

<K, V> BroadcastState<K, V> getBroadcastState(MapStateDescriptor<K, V> stateDescriptor);

201

202

/** Take a snapshot of the operator state */

203

RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshot(

204

long checkpointId,

205

long timestamp,

206

CheckpointStreamFactory factory,

207

CheckpointOptions checkpointOptions

208

);

209

210

/** Restore from state handles */

211

void restore(StateHandles<OperatorStateHandle> stateHandles);

212

213

/** Close the backend */

214

void close();

215

216

/** Dispose the backend */

217

void dispose();

218

}

219

```

220

221

**Usage Examples:**

222

223

```java

224

// Using KeyedStateBackend in a task

225

public class MyKeyedTask extends AbstractInvokable {

226

private KeyedStateBackend<String> keyedStateBackend;

227

private ValueState<Integer> countState;

228

229

@Override

230

public void invoke() throws Exception {

231

// Get keyed state backend from runtime context

232

keyedStateBackend = getRuntimeContext().getKeyedStateBackend();

233

234

// Create state descriptor

235

ValueStateDescriptor<Integer> descriptor =

236

new ValueStateDescriptor<>("count", Integer.class);

237

238

// Get state

239

countState = keyedStateBackend.getPartitionedState(

240

VoidNamespace.INSTANCE,

241

VoidNamespaceSerializer.INSTANCE,

242

descriptor

243

);

244

245

// Use state

246

keyedStateBackend.setCurrentKey("user123");

247

Integer currentCount = countState.value();

248

countState.update(currentCount == null ? 1 : currentCount + 1);

249

}

250

}

251

252

// Using OperatorStateBackend

253

public class MyOperatorTask extends AbstractInvokable {

254

private OperatorStateBackend operatorStateBackend;

255

private ListState<String> bufferState;

256

257

@Override

258

public void invoke() throws Exception {

259

operatorStateBackend = getRuntimeContext().getOperatorStateBackend();

260

261

// Create list state for buffering

262

ListStateDescriptor<String> descriptor =

263

new ListStateDescriptor<>("buffer", String.class);

264

bufferState = operatorStateBackend.getListState(descriptor);

265

266

// Use state

267

bufferState.add("new_item");

268

Iterable<String> bufferedItems = bufferState.get();

269

}

270

}

271

```

272

273

### CompletedCheckpoint

274

275

Represents a successfully completed checkpoint with metadata and state handles.

276

277

```java { .api }

278

/**

279

* A completed checkpoint represents a snapshot of the state of all operators

280

* that has been acknowledged by all tasks.

281

*/

282

public class CompletedCheckpoint implements Serializable {

283

/** Get the checkpoint ID */

284

public long getCheckpointId();

285

286

/** Get checkpoint timestamp */

287

public long getTimestamp();

288

289

/** Get checkpoint duration in milliseconds */

290

public long getDuration();

291

292

/** Get total checkpoint size in bytes */

293

public long getSize();

294

295

/** Get external pointer (path) for this checkpoint */

296

public String getExternalPointer();

297

298

/** Get operator states */

299

public Map<OperatorID, OperatorState> getOperatorStates();

300

301

/** Get master hook states */

302

public Collection<MasterState> getMasterHookStates();

303

304

/** Get checkpoint properties */

305

public CheckpointProperties getProperties();

306

307

/** Check if checkpoint is discarded */

308

public boolean isDiscarded();

309

310

/** Discard the checkpoint and clean up resources */

311

public CompletableFuture<Void> discardOnSubsume();

312

313

/** Discard the checkpoint on cancellation */

314

public CompletableFuture<Void> discardOnCancellation();

315

316

/** Discard the checkpoint on shutdown */

317

public CompletableFuture<Void> discardOnShutdown(JobStatus jobStatus);

318

319

/** Get state size statistics */

320

public CheckpointStatsSummarySnapshot getStatsSummary();

321

}

322

```

323

324

### StateBackend Configuration

325

326

Configuration and loading utilities for the different state backends.

327

328

```java { .api }

329

/**

330

* Base class for configurable state backends

331

*/

332

public abstract class ConfigurableStateBackend implements StateBackend, Configurable {

333

/** Configure the state backend from configuration */

334

public abstract StateBackend configure(ReadableConfig config, ClassLoader classLoader);

335

336

/** Get default savepoint directory */

337

public abstract String getDefaultSavepointDirectory();

338

339

/** Check if state backend supports async snapshots */

340

public abstract boolean supportsAsynchronousSnapshots();

341

}

342

343

/**

344

* State backend loader utility

345

*/

346

public class StateBackendLoader {

347

/** Load state backend from configuration */

348

public static StateBackend loadStateBackendFromConfig(

349

ReadableConfig config,

350

ClassLoader classLoader,

351

String defaultStateBackend

352

);

353

354

/** Create state backend from factory class name */

355

public static StateBackend fromApplicationOrConfigOrDefault(

356

StateBackend fromApplication,

357

Configuration config,

358

ClassLoader classLoader,

359

String defaultStateBackend

360

);

361

}

362

```

363

364

### Checkpoint Storage

365

366

Defines where checkpoint data and metadata are persisted (for example, a filesystem such as HDFS or an object store such as S3).

367

368

```java { .api }

369

/**

370

* Checkpoint storage defines how checkpoint data and metadata are persisted

371

*/

372

public interface CheckpointStorage {

373

/** Check if storage supports highly available storage */

374

boolean supportsHighlyAvailableStorage();

375

376

/** Check if storage has default savepoint location */

377

boolean hasDefaultSavepointLocation();

378

379

/** Get default savepoint directory */

380

String getDefaultSavepointDirectory();

381

382

/** Resolve checkpoint storage from configuration */

383

CheckpointStorageAccess resolveCheckpointStorageAccess(

384

JobID jobId,

385

CheckpointStorageAccessCoordinatorView coordinatorView

386

);

387

388

/** Create checkpoint storage access for workers */

389

CheckpointStorageWorkerView createCheckpointStorageWorkerView(

390

Configuration configuration,

391

ResourceID resourceId

392

);

393

}

394

395

/**

396

* Configurable checkpoint storage

397

*/

398

public abstract class ConfigurableCheckpointStorage

399

implements CheckpointStorage, Configurable {

400

401

/** Configure checkpoint storage from configuration */

402

public abstract CheckpointStorage configure(

403

ReadableConfig config,

404

ClassLoader classLoader

405

);

406

}

407

```

408

409

**Usage Examples:**

410

411

```java

412

// Configure different state backends

413

Configuration config = new Configuration();

414

415

// Memory state backend

416

config.setString(StateBackendOptions.STATE_BACKEND, "hashmap");

417

418

// Filesystem state backend

419

config.setString(StateBackendOptions.STATE_BACKEND, "filesystem");

420

config.setString(StateBackendOptions.CHECKPOINTS_DIRECTORY, "hdfs://cluster/checkpoints");

421

422

// RocksDB state backend

423

config.setString(StateBackendOptions.STATE_BACKEND, "rocksdb");

424

config.setString(StateBackendOptions.CHECKPOINTS_DIRECTORY, "s3://bucket/checkpoints");

425

config.setBoolean(RocksDBOptions.USE_MANAGED_MEMORY, true);

426

427

// Load state backend

428

StateBackend stateBackend = StateBackendLoader.loadStateBackendFromConfig(

429

config,

430

classLoader,

431

null

432

);

433

```

434

435

## Types

436

437

```java { .api }

438

// Checkpoint types and properties

439

public enum CheckpointType implements SnapshotType {

440

CHECKPOINT("Checkpoint"),

441

SAVEPOINT("Savepoint");

442

443

public String getName();

444

public boolean isSavepoint();

445

}

446

447

public class CheckpointProperties implements Serializable {

448

public static CheckpointProperties forCheckpoint(CheckpointRetentionPolicy retentionPolicy);

449

public static CheckpointProperties forSavepoint(boolean forced);

450

451

public CheckpointType getCheckpointType();

452

public CheckpointRetentionPolicy getRetentionPolicy();

453

public boolean isForced();

454

}

455

456

/** Decides whether a completed checkpoint is kept once the job terminates. */
public enum CheckpointRetentionPolicy {
    /** Discard checkpoints when the job terminates. */
    NEVER_RETAIN_AFTER_TERMINATION,

    /** Retain checkpoints when the job is cancelled. */
    RETAIN_ON_CANCELLATION,

    /** Retain checkpoints when the job fails. */
    RETAIN_ON_FAILURE
}

461

462

// State handles

/** A serializable handle to a piece of checkpointed state. */
public interface StateHandle extends Serializable {

    /** Discards the state this handle refers to. */
    void discardState();

    /** Returns the size of the referenced state. */
    long getStateSize();
}

467

468

public interface KeyedStateHandle extends StateHandle {

469

KeyGroupRange getKeyGroupRange();

470

KeyedStateHandle getIntersection(KeyGroupRange keyGroupRange);

471

}

472

473

public interface OperatorStateHandle extends StateHandle {

474

Map<String, OperatorStateHandle.StateMetaInfo> getStateNameToPartitionOffsets();

475

FSDataInputStream openInputStream();

476

}

477

478

// Checkpoint options and metadata

479

public class CheckpointOptions implements Serializable {

480

public CheckpointOptions(

481

CheckpointType checkpointType,

482

CheckpointStorageLocationReference targetLocation

483

);

484

485

public CheckpointType getCheckpointType();

486

public CheckpointStorageLocationReference getTargetLocation();

487

public boolean isExactlyOnceMode();

488

}

489

490

public class CheckpointMetaData implements Serializable {

491

public CheckpointMetaData(long checkpointId, long timestamp);

492

493

public long getCheckpointId();

494

public long getTimestamp();

495

}

496

497

// Snapshot results

498

public class SnapshotResult<T extends StateObject> implements Serializable {

499

public static <T extends StateObject> SnapshotResult<T> empty();

500

public static <T extends StateObject> SnapshotResult<T> of(T jobManagerState);

501

public static <T extends StateObject> SnapshotResult<T> withLocalState(

502

T jobManagerState,

503

T taskLocalState

504

);

505

506

public T getJobManagerOwnedSnapshot();

507

public T getTaskLocalSnapshot();

508

public long getStateSize();

509

}

510

```