or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

index.md recovery-fault-tolerance.md scala-testing.md state-migration.md test-base-classes.md test-utilities.md

state-migration.md docs/

0

# State Migration Testing

1

2

Comprehensive infrastructure for testing operator state migration and compatibility across Flink versions. This includes utilities for savepoint creation, restoration, verification, and specialized source/sink implementations for migration testing scenarios.

3

4

## Capabilities

5

6

### MigrationTestUtils

7

8

Utility class containing specialized sources, sinks, and helper components for migration testing.

9

10

```java { .api }

11

/**

12

* Utility class containing common functions and classes for migration tests

13

* Provides standardized components for testing state migration scenarios

14

*/

15

public class MigrationTestUtils {

16

17

/**

18

* Non-parallel source with list state for migration testing

19

* Creates checkpointed state with predefined string values

20

*/

21

public static class CheckpointingNonParallelSourceWithListState

22

implements SourceFunction<Tuple2<Long, Long>>, CheckpointedFunction {

23

24

static final ListStateDescriptor<String> STATE_DESCRIPTOR =

25

new ListStateDescriptor<>("source-state", StringSerializer.INSTANCE);

26

27

static final String CHECKPOINTED_STRING = "Here be dragons!";

28

static final String CHECKPOINTED_STRING_1 = "Here be more dragons!";

29

static final String CHECKPOINTED_STRING_2 = "Here be yet more dragons!";

30

static final String CHECKPOINTED_STRING_3 = "Here be the mostest dragons!";

31

32

/**

33

* Create source with specified number of elements

34

* @param numElements Number of elements to emit

35

*/

36

public CheckpointingNonParallelSourceWithListState(int numElements);

37

38

@Override

39

public void snapshotState(FunctionSnapshotContext context) throws Exception;

40

41

@Override

42

public void initializeState(FunctionInitializationContext context) throws Exception;

43

44

@Override

45

public void run(SourceContext<Tuple2<Long, Long>> ctx) throws Exception;

46

47

@Override

48

public void cancel();

49

}

50

51

/**

52

* Source for verifying restored state from CheckpointingNonParallelSourceWithListState

53

* Validates that list state was properly restored with expected values

54

*/

55

public static class CheckingNonParallelSourceWithListState

56

extends RichSourceFunction<Tuple2<Long, Long>> implements CheckpointedFunction {

57

58

static final String SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR =

59

CheckingNonParallelSourceWithListState.class + "_RESTORE_CHECK";

60

61

/**

62

* Create checking source with specified number of elements

63

* @param numElements Number of elements to emit after verification

64

*/

65

public CheckingNonParallelSourceWithListState(int numElements);

66

67

@Override

68

public void initializeState(FunctionInitializationContext context) throws Exception;

69

70

@Override

71

public void run(SourceContext<Tuple2<Long, Long>> ctx) throws Exception;

72

}

73

74

/**

75

* Parallel source with union list state for migration testing

76

* Distributes state across parallel instances using hash-based distribution

77

*/

78

public static class CheckpointingParallelSourceWithUnionListState

79

extends RichSourceFunction<Tuple2<Long, Long>> implements CheckpointedFunction {

80

81

static final ListStateDescriptor<String> STATE_DESCRIPTOR =

82

new ListStateDescriptor<>("source-state", StringSerializer.INSTANCE);

83

84

static final String[] CHECKPOINTED_STRINGS = {

85

"Here be dragons!",

86

"Here be more dragons!",

87

"Here be yet more dragons!",

88

"Here be the mostest dragons!" };

89

90

/**

91

* Create parallel source with specified number of elements

92

* @param numElements Number of elements to emit per parallel instance

93

*/

94

public CheckpointingParallelSourceWithUnionListState(int numElements);

95

96

@Override

97

public void snapshotState(FunctionSnapshotContext context) throws Exception;

98

99

@Override

100

public void initializeState(FunctionInitializationContext context) throws Exception;

101

}

102

103

/**

104

* Source for verifying restored union list state

105

* Validates that union list state contains all expected values from all parallel instances

106

*/

107

public static class CheckingParallelSourceWithUnionListState

108

extends RichParallelSourceFunction<Tuple2<Long, Long>> implements CheckpointedFunction {

109

110

static final String SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR =

111

CheckingParallelSourceWithUnionListState.class + "_RESTORE_CHECK";

112

113

/**

114

* Create checking parallel source

115

* @param numElements Number of elements to emit after verification

116

*/

117

public CheckingParallelSourceWithUnionListState(int numElements);

118

119

@Override

120

public void initializeState(FunctionInitializationContext context) throws Exception;

121

}

122

123

/**

124

* Sink that counts elements using accumulators for coordination

125

* Used to signal completion of migration test phases

126

*/

127

public static class AccumulatorCountingSink<T> extends RichSinkFunction<T> {

128

129

static final String NUM_ELEMENTS_ACCUMULATOR =

130

AccumulatorCountingSink.class + "_NUM_ELEMENTS";

131

132

/**

133

* Open sink and initialize accumulator

134

*/

135

@Override

136

public void open(Configuration parameters) throws Exception;

137

138

/**

139

* Count element and update accumulator

140

* @param value Element to count

141

* @param context Sink context

142

*/

143

@Override

144

public void invoke(T value, Context context) throws Exception;

145

}

146

}

147

```

148

149

**Usage Example:**

150

151

```java

152

public class MyMigrationTest extends SavepointMigrationTestBase {

153

154

@Test

155

public void testListStateMigration() throws Exception {

156

final int NUM_ELEMENTS = 100;

157

158

// Phase 1: Create savepoint with list state

159

StreamExecutionEnvironment env1 = StreamExecutionEnvironment.getExecutionEnvironment();

160

env1.setParallelism(DEFAULT_PARALLELISM);

161

162

env1.addSource(new MigrationTestUtils.CheckpointingNonParallelSourceWithListState(NUM_ELEMENTS))

163

.addSink(new MigrationTestUtils.AccumulatorCountingSink<>());

164

165

executeAndSavepoint(env1, "migration-savepoint",

166

Tuple2.of(MigrationTestUtils.AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR, NUM_ELEMENTS));

167

168

// Phase 2: Restore and verify state

169

StreamExecutionEnvironment env2 = StreamExecutionEnvironment.getExecutionEnvironment();

170

env2.setParallelism(DEFAULT_PARALLELISM);

171

172

env2.addSource(new MigrationTestUtils.CheckingNonParallelSourceWithListState(NUM_ELEMENTS))

173

.addSink(new MigrationTestUtils.AccumulatorCountingSink<>());

174

175

restoreAndExecute(env2, "migration-savepoint",

176

Tuple2.of(MigrationTestUtils.CheckingNonParallelSourceWithListState.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, 1),

177

Tuple2.of(MigrationTestUtils.AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR, NUM_ELEMENTS));

178

}

179

}

180

```

181

182

### AbstractOperatorRestoreTestBase

183

184

Foundation class for testing operator state restoration with standardized savepoint handling and job creation patterns.

185

186

```java { .api }

187

/**

188

* Base class for testing operator state migration and restoration across Flink versions

189

* Provides standardized infrastructure for savepoint-based testing

190

*/

191

public abstract class AbstractOperatorRestoreTestBase {

192

193

/**

194

* Create job that generates migration savepoint

195

* Implementation should create a job that produces state to be migrated

196

* @param env StreamExecutionEnvironment for job creation

197

* @return JobGraph for the migration job

198

* @throws Exception if job creation fails

199

*/

200

public abstract JobGraph createMigrationJob(StreamExecutionEnvironment env) throws Exception;

201

202

/**

203

* Create job that restores from migration savepoint and verifies state

204

* Implementation should create a job that validates migrated state

205

* @param env StreamExecutionEnvironment for job creation

206

* @return JobGraph for the restoration job

207

* @throws Exception if job creation fails

208

*/

209

public abstract JobGraph createRestoredJob(StreamExecutionEnvironment env) throws Exception;

210

211

/**

212

* Get name of savepoint resource for this test

213

* Should return the filename of the savepoint resource in test resources

214

* @return Resource name for the savepoint

215

*/

216

public abstract String getMigrationSavepointName();

217

218

/**

219

* Execute the complete migration test cycle

220

* 1. Create migration job and generate savepoint

221

* 2. Create restored job and validate state

222

*/

223

@Test

224

public void testRestore() throws Exception;

225

}

226

```

227

228

**Usage Example:**

229

230

```java

231

public class MyOperatorRestoreTest extends AbstractOperatorRestoreTestBase {

232

233

@Override

234

public JobGraph createMigrationJob(StreamExecutionEnvironment env) throws Exception {

235

env.addSource(new SourceWithManagedState())

236

.keyBy(x -> x.key)

237

.map(new StatefulMapFunction())

238

.addSink(new DiscardingSink<>());

239

240

return env.getStreamGraph().getJobGraph();

241

}

242

243

@Override

244

public JobGraph createRestoredJob(StreamExecutionEnvironment env) throws Exception {

245

env.addSource(new VerifyingSource())

246

.keyBy(x -> x.key)

247

.map(new StatefulMapFunction()) // Same function, should restore state

248

.addSink(new ValidatingSink<>());

249

250

return env.getStreamGraph().getJobGraph();

251

}

252

253

@Override

254

public String getMigrationSavepointName() {

255

return "my-operator-flink1.4-savepoint";

256

}

257

}

258

```

259

260

### AbstractKeyedOperatorRestoreTestBase

261

262

Specialized base class for testing keyed operator state restoration with keyed state management.

263

264

```java { .api }

265

/**

266

* Specialized base class for testing keyed operator state restoration

267

* Provides additional infrastructure for keyed state scenarios

268

*/

269

public abstract class AbstractKeyedOperatorRestoreTestBase extends AbstractOperatorRestoreTestBase {

270

271

/**

272

* Key selector for keyed state tests; has no default and must be supplied by concrete subclasses

273

* @return Function to extract keys from test data

274

*/

275

protected abstract KeySelector<?, ?> getKeySelector();

276

277

/**

278

* Create keyed stream for migration testing

279

* @param env StreamExecutionEnvironment

280

* @return Configured DataStream with keying applied

281

*/

282

protected abstract DataStream<?> createKeyedStream(StreamExecutionEnvironment env);

283

}

284

```

285

286

### AbstractNonKeyedOperatorRestoreTestBase

287

288

Specialized base class for testing non-keyed (operator) state restoration.

289

290

```java { .api }

291

/**

292

* Specialized base class for testing non-keyed operator state restoration

293

* Handles operator state scenarios without keying requirements

294

*/

295

public abstract class AbstractNonKeyedOperatorRestoreTestBase extends AbstractOperatorRestoreTestBase {

296

297

/**

298

* Create non-keyed stream for migration testing

299

* @param env StreamExecutionEnvironment

300

* @return Configured DataStream without keying

301

*/

302

protected abstract DataStream<?> createNonKeyedStream(StreamExecutionEnvironment env);

303

}

304

```

305

306

## Migration Testing Patterns

307

308

### List State Migration Pattern

309

310

Testing migration of list state across Flink versions:

311

312

```java

313

// Step 1: Create job with list state (older Flink version)

314

public class ListStateMigrationJob {

315

public static void main(String[] args) throws Exception {

316

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

317

env.addSource(new MigrationTestUtils.CheckpointingNonParallelSourceWithListState(1000))

318

.addSink(new MigrationTestUtils.AccumulatorCountingSink<>());

319

env.execute();

320

}

321

}

322

323

// Step 2: Restore and verify (newer Flink version)

324

public class ListStateRestorationJob {

325

public static void main(String[] args) throws Exception {

326

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

327

env.addSource(new MigrationTestUtils.CheckingNonParallelSourceWithListState(1000))

328

.addSink(new MigrationTestUtils.AccumulatorCountingSink<>());

329

env.execute();

330

}

331

}

332

```

333

334

### Union State Migration Pattern

335

336

Testing migration of union list state with parallel instances:

337

338

```java

339

// Create job with union state

340

env.addSource(new MigrationTestUtils.CheckpointingParallelSourceWithUnionListState(1000))

341

.setParallelism(4)

342

.addSink(new MigrationTestUtils.AccumulatorCountingSink<>());

343

344

// Restore and verify union state

345

env.addSource(new MigrationTestUtils.CheckingParallelSourceWithUnionListState(1000))

346

.setParallelism(4)

347

.addSink(new MigrationTestUtils.AccumulatorCountingSink<>());

348

```

349

350

### Accumulator Coordination Pattern

351

352

Using accumulators to coordinate test phases:

353

354

```java

355

// Wait for specific accumulator values before proceeding

356

Tuple2<String, Integer> completionSignal =

357

Tuple2.of(AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR, expectedCount);

358

359

Tuple2<String, Integer> verificationSignal =

360

Tuple2.of(CheckingNonParallelSourceWithListState.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, 1);

361

362

executeAndSavepoint(env, savepointPath, completionSignal);

363

restoreAndExecute(env, savepointPath, verificationSignal, completionSignal);

364

```

365

366

## State Validation

367

368

### List State Validation

369

370

The migration test utilities automatically validate that list state contains expected values:

371

372

```java

373

// CheckingNonParallelSourceWithListState validates:

374

assertThat(unionListState.get(), containsInAnyOrder(

375

CheckpointingNonParallelSourceWithListState.CHECKPOINTED_STRING,

376

CheckpointingNonParallelSourceWithListState.CHECKPOINTED_STRING_1,

377

CheckpointingNonParallelSourceWithListState.CHECKPOINTED_STRING_2,

378

CheckpointingNonParallelSourceWithListState.CHECKPOINTED_STRING_3

379

));

380

```

381

382

### Union State Validation

383

384

Union state validation ensures all parallel instances contribute their state:

385

386

```java

387

// CheckingParallelSourceWithUnionListState validates:

388

assertThat(unionListState.get(), containsInAnyOrder(

389

CheckpointingParallelSourceWithUnionListState.CHECKPOINTED_STRINGS

390

));

391

```

392

393

### Accumulator-based Verification

394

395

Accumulators provide coordination and verification signals:

396

397

```java

398

// Success accumulator indicates proper state restoration

399

getRuntimeContext().addAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, new IntCounter());

400

getRuntimeContext().getAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR).add(1);

401

402

// Count accumulator tracks processing progress

403

getRuntimeContext().getAccumulator(NUM_ELEMENTS_ACCUMULATOR).add(1);

404

```