or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

class-loading-programs.md index.md migration-testing.md performance-testing.md state-management-testing.md streaming-utilities.md test-base-classes.md test-data-generation.md

state-management-testing.md docs/

0

# State Management Testing

1

2

Framework for testing operator state restoration and migration, including utilities for both keyed and non-keyed state scenarios. This framework validates that streaming operators can correctly restore their state after failures, from checkpoints and savepoints, and across version migrations.

3

4

## Capabilities

5

6

### Execution Mode Control

7

8

Enumeration controlling function behavior for different test stages.

9

10

```java { .api }

11

/**

12

* Execution mode enumeration for controlling test behavior

13

*/

14

public enum ExecutionMode {

15

/** Generate initial state and data */

16

GENERATE,

17

18

/** Migrate state to new format */

19

MIGRATE,

20

21

/** Restore from migrated state */

22

RESTORE

23

}

24

```

25

26

**Usage Example:**

27

28

```java

29

// Control test behavior based on execution mode

30

ExecutionMode mode = ExecutionMode.valueOf(args[0]);

31

32

switch (mode) {

33

case GENERATE:

34

// Generate initial state and create savepoint

35

runStateGenerationJob();

36

break;

37

case MIGRATE:

38

// Migrate state format if needed

39

runStateMigrationJob();

40

break;

41

case RESTORE:

42

// Restore from savepoint and verify

43

runStateRestorationJob();

44

break;

45

}

46

```

47

48

### Operator Restore Test Base Classes

49

50

Abstract base classes for testing operator state restoration scenarios.

51

52

```java { .api }

53

/**

54

* Abstract base class for testing operator state restoration

55

*/

56

public abstract class AbstractOperatorRestoreTestBase {

57

58

/**

59

* Create the streaming topology for state testing

60

* @param env StreamExecutionEnvironment to configure

61

* @param mode ExecutionMode determining test behavior

62

*/

63

protected abstract void createRestorationTopology(

64

StreamExecutionEnvironment env,

65

ExecutionMode mode);

66

67

/**

68

* Verify state restoration was successful

69

* @param mode ExecutionMode that was executed

70

* @throws Exception if verification fails

71

*/

72

protected abstract void verifyRestorationResult(ExecutionMode mode) throws Exception;

73

}

74

```

75

76

### Keyed Operator Restore Testing

77

78

Framework for testing keyed operator state restoration.

79

80

```java { .api }

81

/**

82

* Abstract base class for testing keyed operator state restoration

83

*/

84

public abstract class AbstractKeyedOperatorRestoreTestBase

85

extends AbstractOperatorRestoreTestBase {

86

87

/**

88

* Create stateful keyed operators for testing state restoration

89

* @param env StreamExecutionEnvironment to configure

90

* @param mode ExecutionMode determining behavior

91

*/

92

protected abstract void createKeyedStateTopology(

93

StreamExecutionEnvironment env,

94

ExecutionMode mode);

95

96

/**

97

* Verify keyed state was correctly restored

98

* @param expectedStateValues Expected state values after restoration

99

* @throws Exception if verification fails

100

*/

101

protected void verifyKeyedState(Map<String, Object> expectedStateValues) throws Exception;

102

}

103

104

/**

105

* Standalone job for testing keyed state migration

106

*/

107

public class KeyedJob {

108

109

/**

110

* Main entry point for keyed state migration testing

111

* @param args Command line arguments: [mode] [savepointPath] [checkpointDir]

112

* @throws Exception if job execution fails

113

*/

114

public static void main(String[] args) throws Exception;

115

}

116

```

117

118

### Non-Keyed Operator Restore Testing

119

120

Framework for testing non-keyed operator state restoration.

121

122

```java { .api }

123

/**

124

* Abstract base class for testing non-keyed operator state restoration

125

*/

126

public abstract class AbstractNonKeyedOperatorRestoreTestBase

127

extends AbstractOperatorRestoreTestBase {

128

129

/**

130

* Create stateful non-keyed operators for testing state restoration

131

* @param env StreamExecutionEnvironment to configure

132

* @param mode ExecutionMode determining behavior

133

*/

134

protected abstract void createNonKeyedStateTopology(

135

StreamExecutionEnvironment env,

136

ExecutionMode mode);

137

138

/**

139

* Verify non-keyed state was correctly restored

140

* @param expectedGlobalState Expected global state after restoration

141

* @throws Exception if verification fails

142

*/

143

protected void verifyNonKeyedState(Object expectedGlobalState) throws Exception;

144

}

145

146

/**

147

* Standalone job for testing non-keyed state migration

148

*/

149

public class NonKeyedJob {

150

151

/**

152

* Main entry point for non-keyed state migration testing

153

* @param args Command line arguments: [mode] [savepointPath] [checkpointDir]

154

* @throws Exception if job execution fails

155

*/

156

public static void main(String[] args) throws Exception;

157

}

158

```

159

160

### State Management Test Patterns

161

162

Common patterns for implementing state management tests:

163

164

**Basic Keyed State Test:**

165

166

```java

167

public class KeyedStateRestorationTest extends AbstractKeyedOperatorRestoreTestBase {

168

169

private TestListResultSink<Tuple2<String, Integer>> resultSink;

170

171

@Override

172

protected void createKeyedStateTopology(StreamExecutionEnvironment env, ExecutionMode mode) {

173

env.setParallelism(2);

174

env.enableCheckpointing(100);

175

176

resultSink = new TestListResultSink<>();

177

178

DataStream<String> input;

179

180

if (mode == ExecutionMode.GENERATE) {

181

// Generate test data and state

182

input = env.fromElements("key1", "key1", "key2", "key2", "key1");

183

} else {

184

// Use minimal input for restoration testing

185

input = env.fromElements("key1", "key2");

186

}

187

188

input.keyBy(value -> value)

189

.process(new StatefulKeyedProcessFunction(mode))

190

.addSink(resultSink);

191

}

192

193

@Override

194

protected void verifyRestorationResult(ExecutionMode mode) throws Exception {

195

List<Tuple2<String, Integer>> results = resultSink.getResult();

196

197

if (mode == ExecutionMode.RESTORE) {

198

// Verify state was correctly restored

199

Map<String, Integer> stateMap = results.stream()

200

.collect(Collectors.toMap(t -> t.f0, t -> t.f1));

201

202

assertEquals(3, stateMap.get("key1").intValue()); // Restored count: "key1" appears 3 times in the GENERATE input

202

assertEquals(2, stateMap.get("key2").intValue()); // Restored count: "key2" appears 2 times in the GENERATE input

204

}

205

}

206

207

@Test

208

public void testKeyedStateRestoration() throws Exception {

209

// Phase 1: Generate state and create savepoint

210

String savepointPath = runTestPhase(ExecutionMode.GENERATE);

211

212

// Phase 2: Restore from savepoint and verify

213

runTestPhase(ExecutionMode.RESTORE, savepointPath);

214

}

215

}

216

```

217

218

**Non-Keyed State Test:**

219

220

```java

221

public class NonKeyedStateRestorationTest extends AbstractNonKeyedOperatorRestoreTestBase {

222

223

private TestListResultSink<Long> resultSink;

224

225

@Override

226

protected void createNonKeyedStateTopology(StreamExecutionEnvironment env, ExecutionMode mode) {

227

env.setParallelism(1); // Non-keyed operators typically use parallelism 1

228

env.enableCheckpointing(50);

229

230

resultSink = new TestListResultSink<>();

231

232

DataStream<Integer> input;

233

234

if (mode == ExecutionMode.GENERATE) {

235

input = env.fromElements(1, 2, 3, 4, 5);

236

} else {

237

input = env.fromElements(6, 7); // Additional elements for restore test

238

}

239

240

input.process(new StatefulNonKeyedProcessFunction(mode))

241

.addSink(resultSink);

242

}

243

244

@Override

245

protected void verifyRestorationResult(ExecutionMode mode) throws Exception {

246

List<Long> results = resultSink.getResult();

247

248

if (mode == ExecutionMode.RESTORE) {

249

// Verify global state was restored (running sum should continue)

250

long finalSum = results.get(results.size() - 1);

251

assertEquals(28, finalSum); // 15 (previous) + 6 + 7 = 28

252

}

253

}

254

}

255

```

256

257

**State Migration Test:**

258

259

```java

260

public class StateMigrationTest extends AbstractKeyedOperatorRestoreTestBase {

261

262

@Override

263

protected void createKeyedStateTopology(StreamExecutionEnvironment env, ExecutionMode mode) {

264

env.setParallelism(4);

265

env.enableCheckpointing(100);

266

267

DataStream<String> source = env.addSource(new TestDataSource(mode));

268

269

if (mode == ExecutionMode.MIGRATE) {

270

// Use evolved state schema

271

source.keyBy(value -> value)

272

.process(new EvolvedStateProcessFunction())

273

.addSink(new DiscardingSink<>());

274

} else {

275

// Use original state schema

276

source.keyBy(value -> value)

277

.process(new OriginalStateProcessFunction())

278

.addSink(new DiscardingSink<>());

279

}

280

}

281

282

@Test

283

public void testStateSchemaMigration() throws Exception {

284

// Generate with original schema

285

String originalSavepoint = runTestPhase(ExecutionMode.GENERATE);

286

287

// Migrate to new schema

288

String migratedSavepoint = runTestPhase(ExecutionMode.MIGRATE, originalSavepoint);

289

290

// Restore with new schema

291

runTestPhase(ExecutionMode.RESTORE, migratedSavepoint);

292

}

293

}

294

```

295

296

**Standalone Job Pattern:**

297

298

```java

299

// Using KeyedJob for state migration testing

300

public class KeyedStateMigrationITCase {

301

302

@Test

303

public void testKeyedStateMigration() throws Exception {

304

String checkpointDir = tempFolder.newFolder("checkpoints").getAbsolutePath();

305

String savepointPath = null;

306

307

// Generate phase

308

String[] generateArgs = {

309

ExecutionMode.GENERATE.toString(),

310

"null", // no input savepoint

311

checkpointDir

312

};

313

314

KeyedJob.main(generateArgs);

315

316

// Find generated savepoint

317

savepointPath = findLatestSavepoint(checkpointDir);

318

assertNotNull("Savepoint should be generated", savepointPath);

319

320

// Restore phase

321

String[] restoreArgs = {

322

ExecutionMode.RESTORE.toString(),

323

savepointPath,

324

checkpointDir

325

};

326

327

KeyedJob.main(restoreArgs);

328

// Test passes if restoration completes without exception

329

}

330

}

331

```

332

333

**Complex State Evolution Test:**

334

335

```java

336

public class ComplexStateEvolutionTest extends AbstractKeyedOperatorRestoreTestBase {

337

338

@Override

339

protected void createKeyedStateTopology(StreamExecutionEnvironment env, ExecutionMode mode) {

340

env.setParallelism(2);

341

env.enableCheckpointing(100);

342

343

DataStream<Tuple2<String, Integer>> source = createTestSource(mode);

344

345

source.keyBy(value -> value.f0)

346

.process(new MultiStateProcessFunction(mode))

347

.addSink(new TestResultSink());

348

}

349

350

/**

351

* Process function with multiple state types for evolution testing

352

*/

353

private static class MultiStateProcessFunction

354

extends KeyedProcessFunction<String, Tuple2<String, Integer>, String> {

355

356

private ValueState<Integer> countState;

357

private ListState<String> historyState;

358

private MapState<String, Long> timestampState;

359

360

@Override

361

public void open(Configuration parameters) throws Exception {

362

// Initialize state descriptors

363

countState = getRuntimeContext().getState(

364

new ValueStateDescriptor<>("count", Integer.class));

365

historyState = getRuntimeContext().getListState(

366

new ListStateDescriptor<>("history", String.class));

367

timestampState = getRuntimeContext().getMapState(

368

new MapStateDescriptor<>("timestamps", String.class, Long.class));

369

}

370

371

@Override

372

public void processElement(

373

Tuple2<String, Integer> value,

374

Context ctx,

375

Collector<String> out) throws Exception {

376

377

// Update multiple state types

378

Integer currentCount = countState.value();

379

countState.update(currentCount == null ? 1 : currentCount + 1);

380

381

historyState.add(value.toString());

382

timestampState.put(value.f0, ctx.timestamp());

383

384

// Emit result

385

out.collect(String.format("Key: %s, Count: %d",

386

value.f0, countState.value()));

387

}

388

}

389

}

390

```

391

392

This state management testing framework ensures that Flink operators correctly maintain and restore their state across various failure and migration scenarios, providing confidence in the reliability of stateful streaming applications.