or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

api-completeness.md checkpointing.md data-generation.md execution.md fault-tolerance.md index.md streaming.md

docs/checkpointing.md

0

# Checkpointing and State Management

1

2

Infrastructure for testing savepoint migration, state compatibility, and checkpoint recovery across Apache Flink versions. These utilities are essential for validating upgrade paths and ensuring state serialization compatibility.

3

4

## Core Base Classes

5

6

### SavepointMigrationTestBase

7

8

Abstract base class providing infrastructure for testing savepoint migration between Flink versions.

9

10

```java { .api }

11

public abstract class SavepointMigrationTestBase extends TestBaseUtils {

12

@Rule

13

protected TemporaryFolder tempFolder;

14

protected LocalFlinkMiniCluster cluster;

15

16

@Before

17

public void setup() throws Exception;

18

19

@After

20

public void cleanup() throws Exception;

21

22

// Core migration testing methods

23

protected void executeAndSavepoint(StreamExecutionEnvironment env, String savepointPath, Tuple2<String, Integer>... expectedAccumulators) throws Exception;

24

protected void restoreAndExecute(StreamExecutionEnvironment env, String savepointPath, Tuple2<String, Integer>... expectedAccumulators) throws Exception;

25

26

// Resource management

27

protected static String getResourceFilename(String filename);

28

29

// Cluster management

30

protected void startCluster() throws Exception;

31

protected void stopCluster() throws Exception;

32

}

33

```

34

35

### AbstractOperatorRestoreTestBase

36

37

Base class for testing operator state restoration across versions using a two-step workflow: a migration job followed by a restored job.

38

39

```java { .api }

40

public abstract class AbstractOperatorRestoreTestBase {

41

protected Configuration config;

42

protected LocalFlinkMiniCluster cluster;

43

44

@Before

45

public void setup() throws Exception;

46

47

@After

48

public void cleanup() throws Exception;

49

50

// Migration workflow methods

51

protected abstract JobGraph createMigrationJob() throws Exception;

52

protected abstract JobGraph createRestoredJob() throws Exception;

53

protected abstract String getMigrationSavepointName();

54

55

// Test execution

56

@Test

57

public void testRestore() throws Exception;

58

}

59

```

60

61

### AbstractKeyedOperatorRestoreTestBase

62

63

Specialized base class for parameterized tests of keyed operator state restoration.

64

65

```java { .api }

66

public abstract class AbstractKeyedOperatorRestoreTestBase extends AbstractOperatorRestoreTestBase {

67

@Parameterized.Parameters(name = "Savepoint: {0}")

68

public static Collection<Tuple2<String, String>> getParameters();

69

70

protected String savepointPath;

71

protected String savepointVersion;

72

73

public AbstractKeyedOperatorRestoreTestBase(String savepointPath, String savepointVersion);

74

}

75

```

76

77

### AbstractNonKeyedOperatorRestoreTestBase

78

79

Specialized base for testing non-keyed operator state restoration.

80

81

```java { .api }

82

public abstract class AbstractNonKeyedOperatorRestoreTestBase extends AbstractOperatorRestoreTestBase {

83

@Parameterized.Parameters(name = "Savepoint: {0}")

84

public static Collection<Tuple2<String, String>> getParameters();

85

86

protected String savepointPath;

87

protected String savepointVersion;

88

89

public AbstractNonKeyedOperatorRestoreTestBase(String savepointPath, String savepointVersion);

90

}

91

```

92

93

## Savepoint Migration Test Cases

94

95

### StatefulJobSavepointFrom11MigrationITCase

96

97

Concrete test for validating savepoint migration from Flink 1.1.

98

99

```java { .api }

100

public class StatefulJobSavepointFrom11MigrationITCase extends SavepointMigrationTestBase {

101

private static final int NUM_SOURCE_ELEMENTS = 4;

102

103

@Test

104

public void testSavepoint() throws Exception;

105

106

// Job creation methods

107

private JobGraph createJobGraphV2() throws IOException;

108

private JobGraph createJobGraphV3() throws IOException;

109

110

@Override

111

protected String getResourceFilename(String filename);

112

}

113

```

114

115

### StatefulJobSavepointFrom12MigrationITCase

116

117

Concrete test for validating savepoint migration from Flink 1.2.

118

119

```java { .api }

120

public class StatefulJobSavepointFrom12MigrationITCase extends SavepointMigrationTestBase {

121

private static final int NUM_SOURCE_ELEMENTS = 4;

122

123

@Test

124

public void testSavepoint() throws Exception;

125

126

// Job graph creation

127

private JobGraph createJobGraph() throws IOException;

128

129

@Override

130

protected String getResourceFilename(String filename);

131

}

132

```

133

134

### StatefulJobSavepointFrom13MigrationITCase

135

136

Test for validating savepoint migration within Flink 1.3.

137

138

```java { .api }

139

public class StatefulJobSavepointFrom13MigrationITCase extends SavepointMigrationTestBase {

140

@Test

141

public void testSavepoint() throws Exception;

142

143

@Override

144

protected String getResourceFilename(String filename);

145

}

146

```

147

148

## Stream Fault Tolerance Base

149

150

### StreamFaultToleranceTestBase

151

152

Infrastructure for testing fault tolerance in streaming applications with multi-TaskManager clusters.

153

154

```java { .api }

155

public abstract class StreamFaultToleranceTestBase {

156

protected static final int NUM_TASK_MANAGERS = 2;

157

protected LocalFlinkMiniCluster cluster;

158

protected Configuration config;

159

160

@Before

161

public void setup() throws Exception;

162

163

@After

164

public void cleanup() throws Exception;

165

166

// Test hooks supplied by subclasses

167

protected abstract void testProgram(StreamExecutionEnvironment env);

168

protected void postSubmit() throws Exception;

169

170

// Execution control

171

@Test

172

public void runCheckpointedProgram() throws Exception;

173

}

174

```

175

176

## Usage Examples

177

178

### Basic Savepoint Migration Test

179

180

```java

181

public class MySavepointMigrationTest extends SavepointMigrationTestBase {

182

183

@Test

184

public void testSavepointMigration() throws Exception {

185

// Create initial streaming environment and execute

186

StreamExecutionEnvironment env1 = createV1Environment();

187

188

// Execute and create savepoint

189

executeAndSavepoint(env1, "my-migration-savepoint");

190

191

// Create updated streaming environment

192

StreamExecutionEnvironment env2 = createV2Environment();

193

194

// Restore from savepoint and execute

195

restoreAndExecute(env2, "my-migration-savepoint");

196

}

197

198

private StreamExecutionEnvironment createV1Environment() {

199

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

200

env.enableCheckpointing(500);

201

202

env.fromElements(1, 2, 3, 4, 5)

203

.keyBy(x -> x % 2)

204

.map(new StatefulMapper())

205

.addSink(new DiscardingSink<>());

206

207

return env;

208

}

209

210

private StreamExecutionEnvironment createV2Environment() {

211

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

212

env.enableCheckpointing(500);

213

214

// Modified job with additional transformation

215

env.fromElements(1, 2, 3, 4, 5)

216

.keyBy(x -> x % 2)

217

.map(new StatefulMapper())

218

.map(x -> x * 2) // Additional transformation

219

.addSink(new DiscardingSink<>());

220

221

return env;

222

}

223

224

@Override

225

protected String getResourceFilename(String filename) {

226

return "savepoints/" + filename;

227

}

228

}

229

```

230

231

### Operator State Restoration Test

232

233

```java

234

public class MyOperatorRestoreTest extends AbstractKeyedOperatorRestoreTestBase {

235

236

public MyOperatorRestoreTest(String savepointPath, String savepointVersion) {

237

super(savepointPath, savepointVersion);

238

}

239

240

@Parameterized.Parameters(name = "Savepoint: {0}")

241

public static Collection<Tuple2<String, String>> getParameters() {

242

return Arrays.asList(

243

new Tuple2<>("keyed-flink1.2", "1.2"),

244

new Tuple2<>("keyed-flink1.3", "1.3")

245

);

246

}

247

248

@Override

249

protected JobGraph createMigrationJob() throws Exception {

250

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

251

env.enableCheckpointing(500);

252

253

// Create job that generates state

254

env.fromElements(1, 2, 3, 4, 5)

255

.keyBy(x -> x)

256

.map(new StatefulKeyedFunction())

257

.addSink(new DiscardingSink<>());

258

259

return env.getStreamGraph().getJobGraph();

260

}

261

262

@Override

263

protected JobGraph createRestoredJob() throws Exception {

264

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

265

266

// Create job that validates restored state

267

env.fromElements(6, 7, 8, 9, 10)

268

.keyBy(x -> x)

269

.map(new ValidatingKeyedFunction())

270

.addSink(new DiscardingSink<>());

271

272

return env.getStreamGraph().getJobGraph();

273

}

274

275

@Override

276

protected String getMigrationSavepointName() {

277

return savepointPath;

278

}

279

}

280

```

281

282

### Stream Fault Tolerance Test

283

284

```java

285

public class MyFaultToleranceTest extends StreamFaultToleranceTestBase {

286

287

@Override

288

protected void testProgram(StreamExecutionEnvironment env) {

289

env.enableCheckpointing(500);

290

env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 1000));

291

292

env.fromElements(1, 2, 3, 4, 5)

293

.map(new FailingMapper()) // Intentionally fails

294

.keyBy(x -> x % 2)

295

.map(new RecoveringMapper()) // Recovers from failure

296

.addSink(new ValidatingSink());

297

}

298

299

@Override

300

protected void postSubmit() throws Exception {

301

// Validation logic after job completion

302

Thread.sleep(2000); // Wait for completion

303

// Verify results through external validation

304

}

305

}

306

```