or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

checkpointing-state.md · datastream-operations.md · execution-environment.md · index.md · sources-and-sinks.md · stream-operators.md · windowing.md

docs/checkpointing-state.md

0

# Checkpointing and State

1

2

Flink's checkpointing mechanism provides fault tolerance by creating consistent snapshots of streaming application state. This enables exactly-once processing semantics and recovery from failures.

3

4

## Core Checkpointing Interfaces

5

6

### Checkpointed<T>

7

8

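Interface for user functions whose state should be included in checkpoints: `snapshotState` returns the state to persist for a given checkpoint, and `restoreState` hands that state back to the function on recovery.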

9

10

```java { .api }

11

public interface Checkpointed<T> extends Serializable {

12

T snapshotState(long checkpointId, long checkpointTimestamp) throws Exception;

13

void restoreState(T state) throws Exception;

14

}

15

```

16

17

**Usage Example**:

18

19

```java

20

import org.apache.flink.streaming.api.checkpoint.Checkpointed;

21

22

public class StatefulMapFunction implements MapFunction<String, String>, Checkpointed<Integer> {

23

private int counter = 0;

24

25

@Override

26

public String map(String value) throws Exception {

27

counter++;

28

return value + "_" + counter;

29

}

30

31

@Override

32

public Integer snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {

33

return counter;

34

}

35

36

@Override

37

public void restoreState(Integer state) throws Exception {

38

counter = state;

39

}

40

}

41

```

42

43

### CheckpointedAsynchronously<T>

44

45

Interface for functions that can perform asynchronous checkpointing to avoid blocking stream processing.

46

47

```java { .api }

48

public interface CheckpointedAsynchronously<T> extends Checkpointed<T> {

49

// Inherits snapshotState and restoreState methods

50

// Indicates that snapshotState can be called asynchronously

51

}

52

```

53

54

**Usage Example**:

55

56

```java

57

import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;

58

59

public class AsyncCheckpointedSink implements SinkFunction<String>, CheckpointedAsynchronously<Map<String, Integer>> {

60

private Map<String, Integer> state = new HashMap<>();

61

62

@Override

63

public void invoke(String value) throws Exception {

64

state.put(value, state.getOrDefault(value, 0) + 1);

65

}

66

67

@Override

68

public Map<String, Integer> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {

69

// Can be called asynchronously - return a copy of the state

70

return new HashMap<>(state);

71

}

72

73

@Override

74

public void restoreState(Map<String, Integer> restoredState) throws Exception {

75

state = restoredState;

76

}

77

}

78

```

79

80

## Checkpoint Committing

81

82

### CheckpointCommitter

83

84

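Interface for recording in an external system which checkpoint IDs have been committed (`commitCheckpoint`) and for querying that record later (`isCheckpointCommitted`), for example to avoid repeating work for a checkpoint that was already committed.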

85

86

```java { .api }

87

public interface CheckpointCommitter extends Serializable {

88

void commitCheckpoint(long checkpointId) throws Exception;

89

boolean isCheckpointCommitted(long checkpointId) throws Exception;

90

}

91

```

92

93

**Usage Example**:

94

95

```java

96

import org.apache.flink.streaming.api.checkpoint.CheckpointCommitter;

97

98

public class DatabaseCheckpointCommitter implements CheckpointCommitter {

99

private final String connectionUrl;

100

101

public DatabaseCheckpointCommitter(String connectionUrl) {

102

this.connectionUrl = connectionUrl;

103

}

104

105

@Override

106

public void commitCheckpoint(long checkpointId) throws Exception {

107

// Commit checkpoint information to external database

108

// This ensures the checkpoint is durably stored

109

executeUpdate("INSERT INTO checkpoints (id, timestamp) VALUES (?, ?)",

110

checkpointId, System.currentTimeMillis());

111

}

112

113

@Override

114

public boolean isCheckpointCommitted(long checkpointId) throws Exception {

115

// Check if checkpoint was successfully committed

116

return checkExists("SELECT 1 FROM checkpoints WHERE id = ?", checkpointId);

117

}

118

119

private void executeUpdate(String sql, Object... params) throws Exception {

120

// Database update implementation

121

}

122

123

private boolean checkExists(String sql, Object... params) throws Exception {

124

// Database query implementation

125

return false;

126

}

127

}

128

```

129

130

## State Handle Providers

131

132

### StateHandleProvider<T>

133

134

Interface for creating a state handle from a piece of state and for reading the state back from a handle.

135

136

```java { .api }

137

public interface StateHandleProvider<T extends StateHandle> extends Serializable {

138

T createStateHandle(String state) throws Exception;

139

String getStateFromHandle(T stateHandle) throws Exception;

140

}

141

```

142

143

## Checkpoint Configuration

144

145

Checkpointing is configured at the execution environment level:

146

147

```java

148

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

149

150

// Enable checkpointing with 5-second interval

151

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

152

env.enableCheckpointing(5000); // Checkpoint every 5 seconds

153

154

// Configure checkpointing behavior

155

env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

156

env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);

157

env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

158

env.getCheckpointConfig().setCheckpointTimeout(60000);

159

```

160

161

## Integration with User Functions

162

163

### Rich Functions with State

164

165

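Rich functions can access managed state through the RuntimeContext. The `ValueState` obtained via `getState(...)` in this example is keyed state, so the function must be applied to a keyed stream (see "Keyed State" below):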

166

167

```java

168

import org.apache.flink.api.common.functions.RichMapFunction;

169

import org.apache.flink.api.common.state.ValueState;

170

import org.apache.flink.api.common.state.ValueStateDescriptor;

171

import org.apache.flink.configuration.Configuration;

172

173

public class StatefulRichMapFunction extends RichMapFunction<String, String> {

174

private ValueState<Integer> counterState;

175

176

@Override

177

public void open(Configuration parameters) throws Exception {

178

ValueStateDescriptor<Integer> descriptor = new ValueStateDescriptor<>(

179

"counter", // state name

180

Integer.class, // type class

181

0 // default value

182

);

183

counterState = getRuntimeContext().getState(descriptor);

184

}

185

186

@Override

187

public String map(String value) throws Exception {

188

Integer currentCount = counterState.value();

189

currentCount++;

190

counterState.update(currentCount);

191

return value + "_" + currentCount;

192

}

193

}

194

```

195

196

### Keyed State

197

198

For keyed streams, state is automatically partitioned by key:

199

200

```java

201

import org.apache.flink.streaming.api.datastream.DataStream;

202

203

DataStream<String> input = env.socketTextStream("localhost", 9999);

204

205

DataStream<String> result = input

206

.groupBy(value -> value.split(" ")[0]) // Group by first word

207

.map(new StatefulRichMapFunction()); // State is keyed automatically

208

```

209

210

## Fault Tolerance Guarantees

211

212

### Exactly-Once Processing

213

214

Flink provides exactly-once processing guarantees through:

215

216

1. **Consistent Checkpointing**: All operators checkpoint their state consistently

217

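2. **Checkpoint Barriers**: Special marker records injected into the data streams that travel with the regular records and separate the records belonging to one checkpoint from those of the next; operators align on the barriers across their parallel input streams before snapshotting state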

218

3. **State Recovery**: On failure, all operators restore state from the last successful checkpoint

219

220

```java

221

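// Example of a checkpoint-aware sink that buffers records and flushes them on each checkpoint.
// NOTE: snapshotState flushes pendingRecords and also returns them as the checkpoint state, so restoreState re-queues records that were already written; the external write must be idempotent to avoid duplicates after recovery.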

222

public class ExactlyOnceSink implements SinkFunction<String>, CheckpointedAsynchronously<List<String>> {

223

private List<String> pendingRecords = new ArrayList<>();

224

225

@Override

226

public void invoke(String value) throws Exception {

227

// Buffer records until checkpoint

228

pendingRecords.add(value);

229

}

230

231

@Override

232

public List<String> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {

233

// Flush all pending records to external system

234

flushToExternalSystem(pendingRecords);

235

236

// Return state for recovery

237

List<String> snapshot = new ArrayList<>(pendingRecords);

238

pendingRecords.clear();

239

return snapshot;

240

}

241

242

@Override

243

public void restoreState(List<String> state) throws Exception {

244

pendingRecords = state;

245

}

246

247

private void flushToExternalSystem(List<String> records) throws Exception {

248

// Implementation to write records to external system

249

}

250

}

251

```

252

253

## Types

254

255

```java { .api }

256

// State handle interface

257

public interface StateHandle extends Serializable {

258

void discardState() throws Exception;

259

long getStateSize() throws Exception;

260

}

261

262

// Checkpoint configuration

263

public class CheckpointConfig {

264

public void setCheckpointingMode(CheckpointingMode mode);

265

public void setMinPauseBetweenCheckpoints(long minPause);

266

public void setMaxConcurrentCheckpoints(int maxConcurrentCheckpoints);

267

public void setCheckpointTimeout(long checkpointTimeout);

268

public void enableExternalizedCheckpoints(ExternalizedCheckpointCleanup cleanup);

269

}

270

271

// Checkpointing modes

272

public enum CheckpointingMode {

273

EXACTLY_ONCE, // Exactly-once processing semantics

274

AT_LEAST_ONCE // At-least-once processing semantics

275

}

276

277

// External checkpoint cleanup modes

278

public enum ExternalizedCheckpointCleanup {

279

RETAIN_ON_CANCELLATION, // Keep checkpoints when job is cancelled

280

DELETE_ON_CANCELLATION // Delete checkpoints when job is cancelled

281

}

282

```