or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

async-io.md checkpointing.md datastream-transformations.md execution-environment.md index.md keyed-streams-state.md process-functions.md sources-sinks.md time-watermarks.md windowing.md

checkpointing.mddocs/

0

# Checkpointing and Fault Tolerance

1

2

Apache Flink provides fault tolerance through checkpointing, which creates consistent snapshots of application state. This enables exactly-once processing guarantees and recovery from failures.

3

4

## Capabilities

5

6

### Checkpoint Configuration

7

8

Configure checkpointing behavior and fault tolerance settings.

9

10

```java { .api }

11

/**

12

* Enable checkpointing with specified interval

13

* @param interval - checkpoint interval in milliseconds

14

*/

15

StreamExecutionEnvironment enableCheckpointing(long interval);

16

17

/**

18

* Enable checkpointing with interval and mode

19

* @param interval - checkpoint interval in milliseconds

20

* @param mode - checkpointing mode (EXACTLY_ONCE or AT_LEAST_ONCE)

21

*/

22

StreamExecutionEnvironment enableCheckpointing(long interval, CheckpointingMode mode);

23

24

/**

25

* Get checkpoint configuration for advanced settings

26

*/

27

CheckpointConfig getCheckpointConfig();

28

```

29

30

**Usage Examples:**

31

32

```java

33

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

34

35

// Enable checkpointing every 5 seconds

36

env.enableCheckpointing(5000);

37

38

// Enable with specific mode

39

env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);

40

41

// Advanced configuration

42

CheckpointConfig config = env.getCheckpointConfig();

43

config.setMinPauseBetweenCheckpoints(500);

44

config.setCheckpointTimeout(60000);

45

config.setMaxConcurrentCheckpoints(1);

46

config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

47

```

48

49

### Checkpoint Configuration Options

50

51

Fine-tune checkpoint behavior with various configuration options.

52

53

```java { .api }

54

/**

55

* Set checkpointing mode

56

* @param checkpointingMode - EXACTLY_ONCE or AT_LEAST_ONCE

57

*/

58

void setCheckpointingMode(CheckpointingMode checkpointingMode);

59

60

/**

61

* Set minimum pause between checkpoints

62

* @param minPauseBetweenCheckpoints - minimum pause in milliseconds

63

*/

64

void setMinPauseBetweenCheckpoints(long minPauseBetweenCheckpoints);

65

66

/**

67

* Set checkpoint timeout

68

* @param checkpointTimeout - timeout in milliseconds

69

*/

70

void setCheckpointTimeout(long checkpointTimeout);

71

72

/**

73

* Set maximum concurrent checkpoints

74

* @param maxConcurrentCheckpoints - maximum number of concurrent checkpoints

75

*/

76

void setMaxConcurrentCheckpoints(int maxConcurrentCheckpoints);

77

78

/**

79

* Enable externalized checkpoints

80

* @param cleanupMode - cleanup mode for externalized checkpoints

81

*/

82

void enableExternalizedCheckpoints(ExternalizedCheckpointCleanup cleanupMode);

83

84

/**

85

* Set whether to fail on checkpointing errors

86

* @param failOnCheckpointingErrors - true to fail job on checkpoint errors

87

*/

88

void setFailOnCheckpointingErrors(boolean failOnCheckpointingErrors);

89

90

/**

91

* Set tolerable checkpoint failure number

92

* @param tolerableCheckpointFailureNumber - number of tolerable failures

93

*/

94

void setTolerableCheckpointFailureNumber(int tolerableCheckpointFailureNumber);

95

```

96

97

**Usage Examples:**

98

99

```java

100

CheckpointConfig config = env.getCheckpointConfig();

101

102

// Set checkpointing mode

103

config.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

104

105

// Minimum pause between checkpoints (prevents too frequent checkpoints)

106

config.setMinPauseBetweenCheckpoints(500);

107

108

// Checkpoint timeout (checkpoint fails if not completed in time)

109

config.setCheckpointTimeout(60000);

110

111

// Maximum concurrent checkpoints (usually 1 for exactly-once)

112

config.setMaxConcurrentCheckpoints(1);

113

114

// Externalized checkpoints (persist checkpoints for recovery)

115

config.enableExternalizedCheckpoints(

116

ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION

117

);

118

119

// Continue job execution even if checkpoint fails

120

config.setFailOnCheckpointingErrors(false);

121

122

// Allow up to 3 consecutive checkpoint failures

123

config.setTolerableCheckpointFailureNumber(3);

124

```

125

126

### Stateful Functions

127

128

Implement stateful functions that participate in checkpointing.

129

130

```java { .api }

131

/**

132

* Interface for functions that need to checkpoint state

133

*/

134

interface CheckpointedFunction {

135

/**

136

* Take a snapshot of the function's state

137

* @param context - snapshot context

138

*/

139

void snapshotState(FunctionSnapshotContext context) throws Exception;

140

141

/**

142

* Initialize or restore function state

143

* @param context - initialization context

144

*/

145

void initializeState(FunctionInitializationContext context) throws Exception;

146

}

147

148

/**

149

* Listener for checkpoint events

150

*/

151

interface CheckpointListener {

152

/**

153

* Notified when checkpoint is completed

154

* @param checkpointId - ID of completed checkpoint

155

*/

156

void notifyCheckpointComplete(long checkpointId) throws Exception;

157

158

/**

159

* Notified when checkpoint is aborted

160

* @param checkpointId - ID of aborted checkpoint

161

*/

162

default void notifyCheckpointAborted(long checkpointId) throws Exception {}

163

}

164

```

165

166

**Usage Examples:**

167

168

```java

169

public class StatefulMapFunction extends RichMapFunction<String, String>

170

implements CheckpointedFunction {

171

172

private ValueState<Integer> countState;

173

private ListState<String> bufferedElements;

174

175

// Local working buffer: not serialized with the function itself; its contents are copied into bufferedElements in snapshotState()

176

private transient List<String> localBuffer;

177

178

@Override

179

public void open(Configuration parameters) throws Exception {

180

super.open(parameters);

181

182

// Initialize state descriptors

183

ValueStateDescriptor<Integer> countDescriptor =

184

new ValueStateDescriptor<>("count", Integer.class);

185

countState = getRuntimeContext().getState(countDescriptor);

186

187

if (localBuffer == null) localBuffer = new ArrayList<>(); // initializeState() runs before open(); do not discard a restored buffer

188

}

189

190

@Override

191

public String map(String value) throws Exception {

192

// Use state in processing

193

Integer count = countState.value();

194

if (count == null) count = 0;

195

196

countState.update(count + 1);

197

localBuffer.add(value);

198

199

return value + "_" + count;

200

}

201

202

@Override

203

public void snapshotState(FunctionSnapshotContext context) throws Exception {

204

// Clear previous checkpoint data

205

bufferedElements.clear();

206

207

// Add current local buffer to checkpointed state

208

for (String element : localBuffer) {

209

bufferedElements.add(element);

210

}

211

}

212

213

@Override

214

public void initializeState(FunctionInitializationContext context) throws Exception {

215

// Initialize checkpointed state

216

ListStateDescriptor<String> bufferDescriptor =

217

new ListStateDescriptor<>("bufferedElements", String.class);

218

bufferedElements = context.getOperatorStateStore().getListState(bufferDescriptor);

219

220

// Restore local buffer from checkpointed state

221

localBuffer = new ArrayList<>();

222

if (context.isRestored()) {

223

for (String element : bufferedElements.get()) {

224

localBuffer.add(element);

225

}

226

}

227

}

228

}

229

230

// Usage with checkpoint listener

231

public class CheckpointAwareFunction extends RichMapFunction<String, String>

232

implements CheckpointedFunction, CheckpointListener {

233

234

private ValueState<Long> lastCheckpointId;

235

236

@Override

237

public void initializeState(FunctionInitializationContext context) throws Exception {

238

ValueStateDescriptor<Long> descriptor =

239

new ValueStateDescriptor<>("lastCheckpointId", Long.class);

240

lastCheckpointId = context.getKeyedStateStore().getState(descriptor);

241

}

242

243

@Override

244

public void snapshotState(FunctionSnapshotContext context) throws Exception {

245

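// NOTE: snapshotState() runs without a current key, so updating keyed state here fails at runtime; persist per-operator values via operator state (getOperatorStateStore()) instead
lastCheckpointId.update(context.getCheckpointId());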

246

}

247

248

@Override

249

public void notifyCheckpointComplete(long checkpointId) throws Exception {

250

// Perform cleanup or external system commits

251

System.out.println("Checkpoint " + checkpointId + " completed successfully");

252

}

253

254

@Override

255

public String map(String value) throws Exception {

256

return value.toUpperCase();

257

}

258

}

259

```

260

261

### State Backends

262

263

Configure different state backends for storing checkpointed state.

264

265

```java { .api }

266

// Configure state backend (typically done via configuration)

267

// MemoryStateBackend - for development/testing

268

// FsStateBackend - for production with file system storage

269

// RocksDBStateBackend - for large state

270

271

// Configuration in flink-conf.yaml:

272

// state.backend: filesystem

273

// state.checkpoints.dir: hdfs://namenode:40010/flink/checkpoints

274

```

275

276

**Usage Examples:**

277

278

```java

279

// Configure state backend programmatically (not recommended for production)

280

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

281

282

// Memory state backend (for testing only)

283

env.setStateBackend(new MemoryStateBackend(10 * 1024 * 1024)); // 10MB

284

285

// File system state backend

286

env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));

287

288

// RocksDB state backend (for large state)

289

env.setStateBackend(new RocksDBStateBackend("hdfs://namenode:40010/flink/checkpoints"));

290

```

291

292

### Restart Strategies

293

294

Configure how jobs should restart after failures.

295

296

```java { .api }

297

// Configure restart strategy

298

env.setRestartStrategy(RestartStrategies.fixedDelayRestart(

299

3, // number of restart attempts

300

Time.of(10, TimeUnit.SECONDS) // delay between attempts

301

));

302

303

env.setRestartStrategy(RestartStrategies.exponentialDelayRestart(

304

Time.milliseconds(1), // initial delay

305

Time.milliseconds(1000), // max delay

306

1.5, // backoff multiplier

307

Time.minutes(5), // reset time

308

0.1 // jitter

309

));

310

311

env.setRestartStrategy(RestartStrategies.failureRateRestart(

312

3, // max failures per interval

313

Time.of(5, TimeUnit.MINUTES), // failure rate interval

314

Time.of(10, TimeUnit.SECONDS) // delay between attempts

315

));

316

```

317

318

## Types

319

320

### Checkpoint Configuration Types

321

322

```java { .api }

323

// Checkpointing mode

324

enum CheckpointingMode {

325

EXACTLY_ONCE, // Exactly-once processing guarantees

326

AT_LEAST_ONCE // At-least-once processing guarantees

327

}

328

329

// Externalized checkpoint cleanup

330

enum ExternalizedCheckpointCleanup {

331

RETAIN_ON_CANCELLATION, // Keep checkpoints when job is cancelled

332

DELETE_ON_CANCELLATION // Delete checkpoints when job is cancelled

333

}

334

335

// Checkpoint configuration

336

class CheckpointConfig {

337

void setCheckpointingMode(CheckpointingMode mode);

338

void setMinPauseBetweenCheckpoints(long minPause);

339

void setCheckpointTimeout(long timeout);

340

void setMaxConcurrentCheckpoints(int maxConcurrent);

341

void enableExternalizedCheckpoints(ExternalizedCheckpointCleanup cleanup);

342

void setFailOnCheckpointingErrors(boolean failOnErrors);

343

void setTolerableCheckpointFailureNumber(int tolerableFailures);

344

}

345

```

346

347

### Stateful Function Interfaces

348

349

```java { .api }

350

// Checkpointed function interface

351

interface CheckpointedFunction {

352

void snapshotState(FunctionSnapshotContext context) throws Exception;

353

void initializeState(FunctionInitializationContext context) throws Exception;

354

}

355

356

// Checkpoint listener interface

357

interface CheckpointListener {

358

void notifyCheckpointComplete(long checkpointId) throws Exception;

359

default void notifyCheckpointAborted(long checkpointId) throws Exception {}

360

}

361

362

// Function snapshot context

363

interface FunctionSnapshotContext {

364

long getCheckpointId();

365

long getCheckpointTimestamp();

366

}

367

368

// Function initialization context

369

interface FunctionInitializationContext {

370

boolean isRestored();

371

OperatorStateStore getOperatorStateStore();

372

KeyedStateStore getKeyedStateStore();

373

}

374

```

375

376

### State Store Interfaces

377

378

```java { .api }

379

// Operator state store for non-keyed state

380

interface OperatorStateStore {

381

<S> ListState<S> getListState(ListStateDescriptor<S> stateDescriptor) throws Exception;

382

<S> ListState<S> getUnionListState(ListStateDescriptor<S> stateDescriptor) throws Exception;

383

<K, V> BroadcastState<K, V> getBroadcastState(MapStateDescriptor<K, V> stateDescriptor) throws Exception;

384

}

385

386

// Keyed state store for keyed state

387

interface KeyedStateStore {

388

<T> ValueState<T> getState(ValueStateDescriptor<T> stateDescriptor) throws Exception;

389

<T> ListState<T> getListState(ListStateDescriptor<T> stateDescriptor) throws Exception;

390

<UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateDescriptor) throws Exception;

391

<T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> stateDescriptor) throws Exception;

392

<IN, ACC, OUT> AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT> stateDescriptor) throws Exception;

393

}

394

```

395

396

### Restart Strategy Types

397

398

```java { .api }

399

// Restart strategies

400

class RestartStrategies {

401

static RestartStrategyConfiguration noRestart();

402

static RestartStrategyConfiguration fallBackRestart();

403

static RestartStrategyConfiguration fixedDelayRestart(int restartAttempts, Time delayBetweenAttempts);

404

static RestartStrategyConfiguration exponentialDelayRestart(Time initialBackoff, Time maxBackoff, double backoffMultiplier, Time resetBackoffThreshold, double jitter);

405

static RestartStrategyConfiguration failureRateRestart(int failureRate, Time failureInterval, Time delayInterval);

406

}

407

408

// Time utility for restart strategies

409

class Time {

410

static Time of(long size, TimeUnit unit);

411

static Time milliseconds(long milliseconds);

412

static Time seconds(long seconds);

413

static Time minutes(long minutes);

414

static Time hours(long hours);

415

}

416

```