or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

cancellation-testing.md
checkpointing-migration.md
fault-tolerance-recovery.md
index.md
operator-lifecycle.md
plugin-testing.md
runtime-utilities.md
session-window-testing.md
state-backend-restore.md
test-data-utilities.md

docs/state-backend-restore.md

0

# State Backend and Operator Restore Testing

1

2

A framework for testing state backend switching, operator restore scenarios, and state migration. It is used to check that state stays compatible across different backends and that operators restore their state correctly.

3

4

## Capabilities

5

6

### Abstract Operator Restore Test Base

7

8

Base class providing common functionality for testing operator restore scenarios across different state management configurations.

9

10

```java { .api }

11

/**

12

* Abstract base class for operator restore testing scenarios

13

*/

14

public abstract class AbstractOperatorRestoreTestBase {

15

16

/**

17

* Test operator restore from savepoint or checkpoint

18

* @throws Exception if restore testing fails

19

*/

20

protected abstract void testRestore() throws Exception;

21

22

/**

23

* Create savepoint from running job for restore testing

24

* @param jobGraph job to create savepoint from

25

* @return String path to created savepoint

26

* @throws Exception if savepoint creation fails

27

*/

28

protected String createSavepoint(JobGraph jobGraph) throws Exception;

29

30

/**

31

* Restore job from savepoint and validate state

32

* @param jobGraph job to restore

33

* @param savepointPath path to savepoint

34

* @throws Exception if restore or validation fails

35

*/

36

protected void restoreFromSavepoint(JobGraph jobGraph, String savepointPath) throws Exception;

37

38

/**

39

* Configure state backend for testing

40

* @param backend state backend configuration

41

*/

42

protected void configureStateBackend(StateBackend backend);

43

}

44

```

45

46

### Keyed Operator Restore Testing

47

48

Specialized base class for testing restore scenarios with keyed state operators.

49

50

```java { .api }

51

/**

52

* Base class for testing keyed operator restore scenarios

53

*/

54

public abstract class AbstractKeyedOperatorRestoreTestBase

55

extends AbstractOperatorRestoreTestBase {

56

57

/**

58

* Test keyed state restore with different key serializers

59

* @param keySerializer serializer for key type

60

* @param valueSerializer serializer for value type

61

* @throws Exception if keyed restore test fails

62

*/

63

protected <K, V> void testKeyedStateRestore(

64

TypeSerializer<K> keySerializer,

65

TypeSerializer<V> valueSerializer) throws Exception;

66

67

/**

68

* Validate keyed state after restore

69

* @param expectedState expected state values after restore

70

* @param actualState actual restored state

71

* @return boolean indicating state validity

72

*/

73

protected <K, V> boolean validateKeyedState(

74

Map<K, V> expectedState,

75

Map<K, V> actualState);

76

77

/**

78

* Create keyed state test job with configurable state

79

* @param initialState initial state values

80

* @return JobGraph configured for keyed state testing

81

*/

82

protected <K, V> JobGraph createKeyedStateJob(Map<K, V> initialState);

83

}

84

```

85

86

### Non-Keyed Operator Restore Testing

87

88

Base class for testing restore scenarios with non-keyed (operator) state.

89

90

```java { .api }

91

/**

92

* Base class for testing non-keyed operator restore scenarios

93

*/

94

public abstract class AbstractNonKeyedOperatorRestoreTestBase

95

extends AbstractOperatorRestoreTestBase {

96

97

/**

98

* Test operator state restore with list state

99

* @param initialListState initial list state values

100

* @throws Exception if non-keyed restore test fails

101

*/

102

protected <T> void testListStateRestore(List<T> initialListState) throws Exception;

103

104

/**

105

* Test operator state restore with union list state

106

* @param initialUnionState initial union state values

107

* @throws Exception if union state restore test fails

108

*/

109

protected <T> void testUnionListStateRestore(List<T> initialUnionState) throws Exception;

110

111

/**

112

* Test operator state restore with broadcast state

113

* @param initialBroadcastState initial broadcast state

114

* @throws Exception if broadcast state restore test fails

115

*/

116

protected <K, V> void testBroadcastStateRestore(

117

Map<K, V> initialBroadcastState) throws Exception;

118

119

/**

120

* Validate operator state after restore

121

* @param expectedState expected operator state

122

* @param actualState actual restored operator state

123

* @return boolean indicating state validity

124

*/

125

protected boolean validateOperatorState(

126

OperatorState expectedState,

127

OperatorState actualState);

128

}

129

```

130

131

### State Backend Switch Testing

132

133

Framework for testing state backend switching scenarios and compatibility validation.

134

135

```java { .api }

136

/**

137

* Base class for testing state backend switching scenarios

138

*/

139

public abstract class SavepointStateBackendSwitchTestBase {

140

141

/**

142

* Test switching state backend while preserving state correctness

143

* @throws Exception if state backend switching test fails

144

*/

145

protected abstract void testSwitchingStateBackend() throws Exception;

146

147

/**

148

* Test switch from memory state backend to filesystem

149

* @param filesystemPath path for filesystem state backend

150

* @throws Exception if backend switch fails

151

*/

152

protected void testMemoryToFilesystemSwitch(String filesystemPath) throws Exception;

153

154

/**

155

* Test switch from filesystem state backend to RocksDB

156

* @param rocksDbPath path for RocksDB state backend

157

* @throws Exception if backend switch fails

158

*/

159

protected void testFilesystemToRocksDbSwitch(String rocksDbPath) throws Exception;

160

161

/**

162

* Test switch from RocksDB state backend back to memory

163

* @throws Exception if backend switch fails

164

*/

165

protected void testRocksDbToMemorySwitch() throws Exception;

166

167

/**

168

* Validate state consistency after backend switch

169

* @param originalState state before switch

170

* @param restoredState state after switch

171

* @return boolean indicating state consistency

172

*/

173

protected boolean validateStateConsistency(

174

StateSnapshot originalState,

175

StateSnapshot restoredState);

176

}

177

178

/**

179

* Specifications for state backend switching test scenarios

180

*/

181

public class BackendSwitchSpecs {

182

183

/**

184

* Specification for memory to filesystem switch

185

*/

186

public static class MemoryToFilesystemSpec {

187

/**

188

* Constructor for memory to filesystem switch spec

189

* @param targetPath filesystem path for state storage

190

* @param asyncSnapshot enable asynchronous snapshots

191

*/

192

public MemoryToFilesystemSpec(String targetPath, boolean asyncSnapshot);

193

194

public String getTargetPath();

195

public boolean isAsyncSnapshot();

196

}

197

198

/**

199

* Specification for filesystem to RocksDB switch

200

*/

201

public static class FilesystemToRocksDbSpec {

202

/**

203

* Constructor for filesystem to RocksDB switch spec

204

* @param rocksDbPath RocksDB storage path

205

* @param incrementalCheckpoints enable incremental checkpoints

206

*/

207

public FilesystemToRocksDbSpec(String rocksDbPath, boolean incrementalCheckpoints);

208

209

public String getRocksDbPath();

210

public boolean isIncrementalCheckpoints();

211

}

212

213

/**

214

* Create specification for complete backend switch test

215

* @param memoryToFs memory to filesystem spec

216

* @param fsToRocksDb filesystem to RocksDB spec

217

* @return CompleteSwitchSpec for full backend switching test

218

*/

219

public static CompleteSwitchSpec createCompleteSwitchSpec(

220

MemoryToFilesystemSpec memoryToFs,

221

FilesystemToRocksDbSpec fsToRocksDb);

222

}

223

```

224

225

### State Restore Utilities

226

227

Utility classes for common state restore testing operations and validation.

228

229

```java { .api }

230

/**

231

* Utility class for state restore testing operations

232

*/

233

public class StateRestoreTestUtils {

234

235

/**

236

* Create test job with configurable state for restore testing

237

* @param stateConfig state configuration parameters

238

* @return JobGraph configured for state restore testing

239

*/

240

public static JobGraph createStatefulTestJob(StateConfiguration stateConfig);

241

242

/**

243

* Execute job and create savepoint at specified interval

244

* @param jobGraph job to execute

245

* @param savepointIntervalMs interval between savepoints

246

* @return List of savepoint paths created

247

* @throws Exception if execution or savepoint creation fails

248

*/

249

public static List<String> executeAndCreateSavepoints(

250

JobGraph jobGraph,

251

long savepointIntervalMs) throws Exception;

252

253

/**

254

* Compare state snapshots for consistency validation

255

* @param snapshot1 first state snapshot

256

* @param snapshot2 second state snapshot

257

* @return StateComparisonResult containing comparison details

258

*/

259

public static StateComparisonResult compareStateSnapshots(

260

StateSnapshot snapshot1,

261

StateSnapshot snapshot2);

262

263

/**

264

* Extract state from running job for validation

265

* @param jobId identifier of running job

266

* @return StateSnapshot containing current job state

267

* @throws Exception if state extraction fails

268

*/

269

public static StateSnapshot extractJobState(JobID jobId) throws Exception;

270

}

271

272

/**

273

* Configuration for stateful test jobs

274

*/

275

public class StateConfiguration {

276

277

/**

278

* Constructor for state configuration

279

* @param keyedStateSize number of keyed state entries

280

* @param operatorStateSize size of operator state

281

* @param checkpointInterval checkpoint interval in milliseconds

282

*/

283

public StateConfiguration(

284

int keyedStateSize,

285

int operatorStateSize,

286

long checkpointInterval);

287

288

public int getKeyedStateSize();

289

public int getOperatorStateSize();

290

public long getCheckpointInterval();

291

}

292

293

/**

294

* Result of state snapshot comparison

295

*/

296

public class StateComparisonResult {

297

298

/**

299

* Check if state snapshots are identical

300

* @return boolean indicating state identity

301

*/

302

public boolean isIdentical();

303

304

/**

305

* Get differences between state snapshots

306

* @return List of StateDifference objects

307

*/

308

public List<StateDifference> getDifferences();

309

310

/**

311

* Get summary of comparison results

312

* @return String summary of state comparison

313

*/

314

public String getComparisonSummary();

315

}

316

```

317

318

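**Usage Examples:**

Note: the examples below are illustrative sketches rather than complete programs. They call helpers that are not part of the API listed above (`getTestState()`, `createModifiedKeyedStateJob()`, `executeBackendSwitchTest(...)`, `loadStateSnapshot(...)`) and use types the listing references but does not define (`CompleteSwitchSpec`, `StateDifference`); these must be supplied by the test author. In addition, `StateBackendSwitchTest` calls `configureStateBackend(...)` and `createSavepoint(...)`, which are declared on `AbstractOperatorRestoreTestBase`, not on `SavepointStateBackendSwitchTestBase`. Imports for `java.util` collections, the serializers, and the test/assertion framework are omitted for brevity.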

319

320

```java

321

import org.apache.flink.test.state.operator.restore.*;

322

323

// Basic keyed state restore test

324

public class KeyedStateRestoreTest extends AbstractKeyedOperatorRestoreTestBase {

325

326

@Test

327

public void testSimpleKeyedStateRestore() throws Exception {

328

// Create initial state

329

Map<String, Integer> initialState = new HashMap<>();

330

initialState.put("key1", 100);

331

initialState.put("key2", 200);

332

initialState.put("key3", 300);

333

334

// Create job with keyed state

335

JobGraph job = createKeyedStateJob(initialState);

336

337

// Test restore scenario

338

testKeyedStateRestore(

339

StringSerializer.INSTANCE,

340

IntSerializer.INSTANCE);

341

}

342

343

@Override

344

protected void testRestore() throws Exception {

345

// Create savepoint

346

String savepointPath = createSavepoint(createKeyedStateJob(getTestState()));

347

348

// Modify job configuration

349

JobGraph modifiedJob = createModifiedKeyedStateJob();

350

351

// Restore and validate

352

restoreFromSavepoint(modifiedJob, savepointPath);

353

}

354

}

355

356

// Non-keyed state restore test

357

public class OperatorStateRestoreTest extends AbstractNonKeyedOperatorRestoreTestBase {

358

359

@Test

360

public void testListStateRestore() throws Exception {

361

List<String> initialListState = Arrays.asList("item1", "item2", "item3");

362

testListStateRestore(initialListState);

363

}

364

365

@Test

366

public void testUnionListStateRestore() throws Exception {

367

List<Integer> initialUnionState = Arrays.asList(1, 2, 3, 4, 5);

368

testUnionListStateRestore(initialUnionState);

369

}

370

371

@Test

372

public void testBroadcastStateRestore() throws Exception {

373

Map<String, String> initialBroadcastState = new HashMap<>();

374

initialBroadcastState.put("config1", "value1");

375

initialBroadcastState.put("config2", "value2");

376

377

testBroadcastStateRestore(initialBroadcastState);

378

}

379

}

380

381

// State backend switching test

382

public class StateBackendSwitchTest extends SavepointStateBackendSwitchTestBase {

383

384

@Test

385

public void testCompleteBackendSwitching() throws Exception {

386

testSwitchingStateBackend();

387

}

388

389

@Override

390

protected void testSwitchingStateBackend() throws Exception {

391

// Start with memory backend

392

configureStateBackend(new MemoryStateBackend());

393

394

// Create job and run briefly

395

JobGraph job = StateRestoreTestUtils.createStatefulTestJob(

396

new StateConfiguration(1000, 500, 5000L));

397

String memoryState = createSavepoint(job);

398

399

// Switch to filesystem backend

400

testMemoryToFilesystemSwitch("/tmp/flink-state");

401

402

// Switch to RocksDB backend

403

testFilesystemToRocksDbSwitch("/tmp/rocksdb-state");

404

405

// Switch back to memory backend

406

testRocksDbToMemorySwitch();

407

}

408

409

@Test

410

public void testBackendSwitchWithComplexState() throws Exception {

411

// Create specifications for backend switching

412

MemoryToFilesystemSpec memToFs = new BackendSwitchSpecs.MemoryToFilesystemSpec(

413

"/tmp/fs-state", true);

414

FilesystemToRocksDbSpec fsToRocks = new BackendSwitchSpecs.FilesystemToRocksDbSpec(

415

"/tmp/rocks-state", true);

416

417

BackendSwitchSpecs.CompleteSwitchSpec switchSpec =

418

BackendSwitchSpecs.createCompleteSwitchSpec(memToFs, fsToRocks);

419

420

// Execute complete switching test

421

executeBackendSwitchTest(switchSpec);

422

}

423

}

424

425

// Comprehensive state restore validation

426

public class StateRestoreValidationTest {

427

428

@Test

429

public void testStateConsistencyAcrossRestores() throws Exception {

430

StateConfiguration config = new StateConfiguration(5000, 1000, 2000L);

431

JobGraph job = StateRestoreTestUtils.createStatefulTestJob(config);

432

433

// Create multiple savepoints

434

List<String> savepoints = StateRestoreTestUtils.executeAndCreateSavepoints(

435

job, 10000L);

436

437

// Validate state consistency across savepoints

438

for (int i = 0; i < savepoints.size() - 1; i++) {

439

StateSnapshot snapshot1 = loadStateSnapshot(savepoints.get(i));

440

StateSnapshot snapshot2 = loadStateSnapshot(savepoints.get(i + 1));

441

442

StateComparisonResult comparison =

443

StateRestoreTestUtils.compareStateSnapshots(snapshot1, snapshot2);

444

445

// Validate progressive state changes

446

assertTrue(comparison.getDifferences().size() > 0);

447

assertFalse(comparison.isIdentical());

448

}

449

}

450

}

451

```