<!-- docs/migration-testing.md — part of the docs set: class-loading-programs.md,
     index.md, migration-testing.md, performance-testing.md,
     state-management-testing.md, streaming-utilities.md, test-base-classes.md,
     test-data-generation.md -->

# Migration Testing Framework

Complete framework for testing savepoint and checkpoint migration across different Flink versions. This framework provides infrastructure for validating that streaming applications can successfully restore from savepoints created with previous Flink versions, ensuring backward compatibility and state migration correctness.

## Capabilities

### Savepoint Migration Test Base

Abstract base class providing core functionality for savepoint migration testing.

```java { .api }

/**
 * Base class for savepoint migration tests providing utilities for creating,
 * restoring, and validating savepoints across different Flink versions
 */
public abstract class SavepointMigrationTestBase extends TestBaseUtils {

    /**
     * Get the full path to a test resource file
     * @param filename Resource filename relative to test resources
     * @return Full path to the resource file
     */
    protected String getResourceFilename(String filename);

    /**
     * Execute a streaming job and create a savepoint at the specified path
     * @param env StreamExecutionEnvironment configured for the test
     * @param savepointPath Path where savepoint should be created
     * @param expectedAccumulators Expected accumulator values for verification
     * @throws Exception if job execution or savepoint creation fails
     */
    protected void executeAndSavepoint(
            StreamExecutionEnvironment env,
            String savepointPath,
            Tuple2<String, Integer>... expectedAccumulators) throws Exception;

    /**
     * Restore a streaming job from savepoint and execute to completion
     * @param env StreamExecutionEnvironment configured for the test
     * @param savepointPath Path to existing savepoint
     * @param expectedAccumulators Expected accumulator values for verification
     * @throws Exception if restoration or execution fails
     */
    protected void restoreAndExecute(
            StreamExecutionEnvironment env,
            String savepointPath,
            Tuple2<String, Integer>... expectedAccumulators) throws Exception;
}

```

**Usage Example:**

```java

public class MyMigrationTest extends SavepointMigrationTestBase {

    @Test
    public void testMigrationFromFlink17() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);

        // Configure your streaming topology
        env.addSource(new TestSource())
            .keyBy(value -> value.getKey())
            .map(new StatefulMapper())
            .addSink(new TestSink());

        // Restore from Flink 1.7 savepoint and verify
        String savepointPath = getResourceFilename("migration-test-flink1.7-savepoint");
        restoreAndExecute(env, savepointPath,
            Tuple2.of("elements-count", 1000),
            Tuple2.of("checkpoints-count", 10));
    }
}
```

### Migration Test Utilities

Utility classes and sources/sinks specifically designed for migration testing.

```java { .api }

/**
 * Utility class containing specialized sources and sinks for migration testing
 */
public class MigrationTestUtils {

    /**
     * Source with list state for checkpointing tests (non-parallel)
     */
    public static class CheckpointingNonParallelSourceWithListState
            extends RichSourceFunction<Tuple2<Long, Long>>
            implements ListCheckpointed<Long> {

        public CheckpointingNonParallelSourceWithListState(int numElements);

        public void run(SourceContext<Tuple2<Long, Long>> ctx) throws Exception;
        public void cancel();
        public List<Long> snapshotState(long checkpointId, long timestamp) throws Exception;
        public void restoreState(List<Long> state) throws Exception;
    }

    /**
     * Source for verifying restored state (non-parallel)
     */
    public static class CheckingNonParallelSourceWithListState
            extends RichSourceFunction<Tuple2<Long, Long>>
            implements ListCheckpointed<Long> {

        public CheckingNonParallelSourceWithListState(int numElements);

        public void run(SourceContext<Tuple2<Long, Long>> ctx) throws Exception;
        public void cancel();
        public List<Long> snapshotState(long checkpointId, long timestamp) throws Exception;
        public void restoreState(List<Long> state) throws Exception;
    }

    /**
     * Parallel source with union list state for checkpointing tests
     */
    public static class CheckpointingParallelSourceWithUnionListState
            extends RichParallelSourceFunction<Tuple2<Long, Long>>
            implements ListCheckpointed<Long> {

        public CheckpointingParallelSourceWithUnionListState(int numElements);

        public void run(SourceContext<Tuple2<Long, Long>> ctx) throws Exception;
        public void cancel();
        public List<Long> snapshotState(long checkpointId, long timestamp) throws Exception;
        public void restoreState(List<Long> state) throws Exception;
    }

    /**
     * Parallel source for verifying union list state restoration
     */
    public static class CheckingParallelSourceWithUnionListState
            extends RichParallelSourceFunction<Tuple2<Long, Long>>
            implements ListCheckpointed<Long> {

        public CheckingParallelSourceWithUnionListState(int numElements);

        public void run(SourceContext<Tuple2<Long, Long>> ctx) throws Exception;
        public void cancel();
        public List<Long> snapshotState(long checkpointId, long timestamp) throws Exception;
        public void restoreState(List<Long> state) throws Exception;
    }

    /**
     * Sink that counts elements using accumulator
     */
    public static class AccumulatorCountingSink<T> extends RichSinkFunction<T> {

        public AccumulatorCountingSink(String accumulatorName);

        public void open(Configuration parameters) throws Exception;
        public void invoke(T value, Context context) throws Exception;
    }
}
```

### Failing Source for Fault Tolerance Testing

Specialized source that can introduce controlled failures for fault tolerance and migration testing.

```java { .api }

/**
 * Source that can introduce artificial failures for fault tolerance testing
 */
public class FailingSource extends RichSourceFunction<Tuple2<Long, IntType>> {

    /**
     * Functional interface for event emission strategies
     */
    @FunctionalInterface
    public interface EventEmittingGenerator {
        void emitEvent(SourceContext<Tuple2<Long, IntType>> ctx, int eventSequenceNo);
    }

    /**
     * Constructor for basic failing source
     * @param eventGenerator Generator function for creating events
     * @param numEvents Total number of events to generate
     * @param numElementsUntilFailure Number of elements before inducing failure
     * @param numSuccessfulCheckpoints Number of successful checkpoints before failure
     */
    public FailingSource(
            EventEmittingGenerator eventGenerator,
            int numEvents,
            int numElementsUntilFailure,
            int numSuccessfulCheckpoints);

    /**
     * Constructor with failure position control
     * @param eventGenerator Generator function for creating events
     * @param numEvents Total number of events to generate
     * @param failurePos Position at which to induce failure
     * @param numSuccessfulCheckpoints Number of successful checkpoints before failure
     * @param continueAfterFailure Whether to continue after failure
     */
    public FailingSource(
            EventEmittingGenerator eventGenerator,
            int numEvents,
            int failurePos,
            int numSuccessfulCheckpoints,
            boolean continueAfterFailure);

    public void run(SourceContext<Tuple2<Long, IntType>> ctx) throws Exception;
    public void cancel();
}
```

### Supporting Types

Supporting data types and utilities for migration testing.

```java { .api }

/**
 * Simple integer wrapper for testing
 */
public class IntType {
    public int value;

    public IntType();
    public IntType(int value);

    public boolean equals(Object obj);
    public int hashCode();
    public String toString();
}

/**
 * Sink for result validation in migration tests
 */
public class ValidatingSink<T> extends RichSinkFunction<T> {

    public ValidatingSink(List<T> expectedValues);

    public void open(Configuration parameters) throws Exception;
    public void invoke(T value, Context context) throws Exception;
    public void close() throws Exception;
}
```

### Migration Testing Patterns

Common patterns for implementing migration tests:

**Basic Migration Test Pattern:**

```java

public class StatefulJobMigrationTest extends SavepointMigrationTestBase {

    @Test
    public void testMigrationFromVersion14() throws Exception {
        // 1. Setup environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStateBackend(new RocksDBStateBackend("file:///tmp/test-backend"));
        env.enableCheckpointing(100);

        // 2. Create topology with stateful operators
        DataStream<Tuple2<Long, IntType>> source = env.addSource(
            new MigrationTestUtils.CheckingNonParallelSourceWithListState(100));

        source.keyBy(value -> value.f0)
            .map(new StatefulMapFunction())
            .addSink(new MigrationTestUtils.AccumulatorCountingSink<>("count"));

        // 3. Restore from savepoint and execute
        String savepointPath = getResourceFilename("stateful-job-flink1.4-savepoint");
        restoreAndExecute(env, savepointPath, Tuple2.of("count", 100));
    }
}
```

**Fault Tolerance Migration Test:**

```java

@Test
public void testFaultToleranceMigration() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(4);
    env.enableCheckpointing(50);

    // Source that will fail and recover
    FailingSource source = new FailingSource(
        (ctx, eventSeq) -> ctx.collect(Tuple2.of((long) eventSeq, new IntType(eventSeq))),
        1000, // total events
        500,  // fail after 500 events
        5     // after 5 successful checkpoints
    );

    env.addSource(source)
        .keyBy(value -> value.f0 % 4)
        .map(new RecoveringMapFunction())
        .addSink(new ValidatingSink<>(expectedResults));

    String savepointPath = getResourceFilename("fault-tolerance-test-savepoint");
    restoreAndExecute(env, savepointPath, Tuple2.of("processed", 1000));
}
```

**State Evolution Migration Test:**

```java

@Test
public void testStateEvolutionMigration() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    // Test state schema evolution
    env.addSource(new MigrationTestUtils.CheckingNonParallelSourceWithListState(50))
        .keyBy(value -> value.f0)
        .process(new EvolvingProcessFunction()) // Function with evolved state schema
        .addSink(new MigrationTestUtils.AccumulatorCountingSink<>("evolved-count"));

    String savepointPath = getResourceFilename("state-evolution-flink1.6-savepoint");
    restoreAndExecute(env, savepointPath, Tuple2.of("evolved-count", 50));
}
```

This migration testing framework ensures that Flink applications maintain backward compatibility across version upgrades and that stateful streaming applications can successfully restore from savepoints created with previous versions.