# Recovery and Fault Tolerance

Testing infrastructure for job recovery scenarios, restart strategies, failure simulation, and fault tolerance validation. This includes support for TaskManager process failures, coordinated failure injection, and comprehensive recovery testing patterns.

## Capabilities

### SimpleRecoveryITCaseBase

Base class for testing job recovery scenarios with failure simulation and restart strategy validation.

```java { .api }
/**
 * Base class for testing job recovery scenarios
 * Provides infrastructure for testing failed runs followed by successful runs
 * Tests restart strategies and multiple failure scenarios
 */
public abstract class SimpleRecoveryITCaseBase {

    /**
     * Create execution plan that will fail during execution
     * Implementation should define a plan that fails at a predictable point
     * @return Plan that will fail during execution
     */
    protected abstract Plan getFailingPlan();

    /**
     * Create execution plan that should succeed after recovery
     * Implementation should define a plan that completes successfully
     * @return Plan that will succeed
     */
    protected abstract Plan getSuccessfulPlan();

    /**
     * Execute the complete recovery test cycle
     * 1. Run failing plan and verify failure
     * 2. Run successful plan and verify completion
     */
    @Test
    public void testRecovery() throws Exception;

    /**
     * Test restart strategy with multiple failures
     * Verifies that jobs can recover from multiple consecutive failures
     */
    @Test
    public void testMultipleFailures() throws Exception;
}
```

**Usage Example:**

```java
public class MyRecoveryTest extends SimpleRecoveryITCaseBase {

    @Override
    protected Plan getFailingPlan() {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.fromElements(1, 2, 3, 4, 5)
                .map(new FailingMapper()) // Fails on specific element
                .output(new DiscardingOutputFormat<>());
        return env.createProgramPlan("Failing Plan");
    }

    @Override
    protected Plan getSuccessfulPlan() {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.fromElements(1, 2, 3, 4, 5)
                .map(new SuccessfulMapper()) // Completes successfully
                .output(new DiscardingOutputFormat<>());
        return env.createProgramPlan("Successful Plan");
    }

    private static class FailingMapper implements MapFunction<Integer, Integer> {
        @Override
        public Integer map(Integer value) throws Exception {
            if (value == 3) {
                throw new RuntimeException("Simulated failure");
            }
            return value * 2;
        }
    }

    private static class SuccessfulMapper implements MapFunction<Integer, Integer> {
        @Override
        public Integer map(Integer value) throws Exception {
            return value * 2;
        }
    }
}
```

### SimpleRecoveryFixedDelayRestartStrategyITBase

Specialized base class for testing fixed delay restart strategy scenarios.

```java { .api }
/**
 * Base class for testing fixed delay restart strategy
 * Configures cluster with fixed delay restart strategy and tests recovery behavior
 */
public abstract class SimpleRecoveryFixedDelayRestartStrategyITBase extends SimpleRecoveryITCaseBase {

    /**
     * Get restart strategy configuration for fixed delay testing
     * @return RestartStrategy configuration with fixed delay
     */
    protected abstract RestartStrategies.RestartStrategyConfiguration getRestartStrategy();

    /**
     * Get expected number of restart attempts
     * @return Number of expected restart attempts
     */
    protected abstract int getExpectedRestartAttempts();
}
```

### SimpleRecoveryFailureRateStrategyITBase

Specialized base class for testing failure rate restart strategy scenarios.

```java { .api }
/**
 * Base class for testing failure rate restart strategy
 * Configures cluster with failure rate restart strategy and tests recovery behavior
 */
public abstract class SimpleRecoveryFailureRateStrategyITBase extends SimpleRecoveryITCaseBase {

    /**
     * Get restart strategy configuration for failure rate testing
     * @return RestartStrategy configuration with failure rate limits
     */
    protected abstract RestartStrategies.RestartStrategyConfiguration getRestartStrategy();

    /**
     * Get failure rate configuration
     * @return Failure rate configuration (failures per time interval)
     */
    protected abstract FailureRateRestartStrategyConfiguration getFailureRateConfig();
}
```

### AbstractTaskManagerProcessFailureRecoveryTest

Base class for testing TaskManager process failures and recovery with actual JVM process spawning.

```java { .api }
/**
 * Base class for testing TaskManager process failures and recovery
 * Spawns actual JVM processes for TaskManagers and coordinates failure scenarios
 */
public abstract class AbstractTaskManagerProcessFailureRecoveryTest {

    // File-based coordination constants
    protected static final String READY_MARKER_FILE_PREFIX = "ready-";
    protected static final String PROCEED_MARKER_FILE = "proceed";
    protected static final String FINISH_MARKER_FILE_PREFIX = "finish-";

    /**
     * Create and submit job that will experience TaskManager failure
     * @return JobGraph for the job that will experience failure
     */
    protected abstract JobGraph createJobGraph();

    /**
     * Trigger TaskManager process failure at appropriate time
     * Implementation should coordinate with job execution to trigger failure
     */
    protected abstract void triggerTaskManagerFailure() throws Exception;

    /**
     * Verify job recovery after TaskManager failure
     * Implementation should validate that job recovered successfully
     */
    protected abstract void verifyRecovery() throws Exception;

    /**
     * Execute complete TaskManager failure and recovery test
     * 1. Start job execution
     * 2. Trigger TaskManager failure
     * 3. Verify job recovery
     */
    @Test
    public void testTaskManagerFailureRecovery() throws Exception;

    /**
     * Create ready marker file for coordination
     * @param taskManagerId ID of TaskManager that is ready
     */
    protected void createReadyMarker(String taskManagerId) throws IOException;

    /**
     * Wait for proceed marker file
     * TaskManager processes wait for this signal to continue
     */
    protected void waitForProceedMarker() throws InterruptedException;

    /**
     * Create finish marker file
     * @param taskManagerId ID of TaskManager that finished
     */
    protected void createFinishMarker(String taskManagerId) throws IOException;
}
```

**Usage Example:**

```java
public class TaskManagerFailureTest extends AbstractTaskManagerProcessFailureRecoveryTest {

    @Override
    protected JobGraph createJobGraph() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);
        env.enableCheckpointing(1000);

        env.addSource(new ContinuousSource())
                .keyBy(x -> x % 4)
                .map(new StatefulProcessingFunction())
                .addSink(new CheckpointedSink<>());

        return env.getStreamGraph().getJobGraph();
    }

    @Override
    protected void triggerTaskManagerFailure() throws Exception {
        // Wait for job to be running
        Thread.sleep(5000);

        // Kill one TaskManager process
        ProcessHandle.allProcesses()
                .filter(p -> p.info().command().orElse("").contains("TaskManager"))
                .findFirst()
                .ifPresent(ProcessHandle::destroyForcibly);
    }

    @Override
    protected void verifyRecovery() throws Exception {
        // Wait for job to recover
        Thread.sleep(10000);

        // Verify job is running with all TaskManagers
        ClusterClient<?> client = miniCluster.getClusterClient();
        CompletableFuture<Collection<TaskManagerInfo>> taskManagersFuture =
                client.listTaskManagers();

        Collection<TaskManagerInfo> taskManagers = taskManagersFuture.get();
        assertEquals("All TaskManagers should be running", 2, taskManagers.size());
    }
}
```

## Failure Simulation Patterns

### Coordinated Failure Pattern

Using file-based coordination for precise failure timing:

```java
public class CoordinatedFailureTest extends AbstractTaskManagerProcessFailureRecoveryTest {

    @Override
    protected JobGraph createJobGraph() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.addSource(new SourceFunction<Integer>() {
            @Override
            public void run(SourceContext<Integer> ctx) throws Exception {
                // Signal readiness
                createReadyMarker("source-tm");

                // Wait for proceed signal
                waitForProceedMarker();

                // Generate data
                for (int i = 0; i < 1000; i++) {
                    ctx.collect(i);
                    Thread.sleep(10);
                }

                // Signal completion
                createFinishMarker("source-tm");
            }

            @Override
            public void cancel() {}
        }).addSink(new DiscardingSink<>());

        return env.getStreamGraph().getJobGraph();
    }
}
```
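
The driver side of this coordination is not shown above. The sketch below illustrates how a test driver might wait for every ready marker before creating the proceed marker; `coordinationDir`, `numberOfTaskManagers`, and the helper name are assumptions, not part of the documented base class API.

```java
// Hypothetical driver-side helper (sketch): wait until every TaskManager has written
// its ready marker, then create the proceed marker so the sources stop blocking
// in waitForProceedMarker().
private void signalProceedWhenReady(File coordinationDir, int numberOfTaskManagers) throws Exception {
    while (true) {
        File[] readyMarkers = coordinationDir.listFiles(
                (dir, name) -> name.startsWith(READY_MARKER_FILE_PREFIX));
        if (readyMarkers != null && readyMarkers.length >= numberOfTaskManagers) {
            break;
        }
        Thread.sleep(100); // poll until all ready markers have appeared
    }
    // Creating this file releases all sources waiting on the proceed signal
    new File(coordinationDir, PROCEED_MARKER_FILE).createNewFile();
}
```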

### Exception-based Failure Pattern

Using exceptions for controlled failure simulation:

```java
public class ExceptionFailureTest extends SimpleRecoveryITCaseBase {

    private static class ControlledFailureMapper implements MapFunction<Integer, Integer> {
        // Both fields are static so the failure state survives task restarts within the
        // test JVM; an instance field would be reset each time the failed task is redeployed.
        private static volatile boolean shouldFail = true;
        private static int failureCount = 0;

        @Override
        public Integer map(Integer value) throws Exception {
            if (shouldFail && failureCount < 3) {
                failureCount++;
                throw new RuntimeException("Controlled failure #" + failureCount);
            }
            shouldFail = false; // Stop failing after 3 attempts
            return value * 2;
        }
    }

    @Override
    protected Plan getFailingPlan() {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, 1000));

        env.fromElements(1, 2, 3, 4, 5)
                .map(new ControlledFailureMapper())
                .output(new DiscardingOutputFormat<>());

        return env.createProgramPlan();
    }
}
```

327

328

## Recovery Validation Patterns

329

330

### State Consistency Validation

331

332

Ensuring state remains consistent across failures:

333

334

```java

335

public class StateConsistencyTest extends StreamFaultToleranceTestBase {

336

337

@Override

338

public void testProgram(StreamExecutionEnvironment env) {

339

env.addSource(new CheckpointedCountingSource())

340

.keyBy(x -> x % 4)

341

.map(new StatefulCounter()) // Maintains count state

342

.addSink(new ValidatingCountSink());

343

}

344

345

@Override

346

public void postSubmit() throws Exception {

347

// Verify final counts are consistent

348

Map<Integer, Long> finalCounts = ValidatingCountSink.getFinalCounts();

349

350

for (Map.Entry<Integer, Long> entry : finalCounts.entrySet()) {

351

assertTrue("Count should be positive", entry.getValue() > 0);

352

assertTrue("Count should be reasonable", entry.getValue() < 10000);

353

}

354

}

355

}

356

```
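
The `StatefulCounter` referenced above is not defined in this document. A minimal sketch of one possible implementation uses keyed `ValueState`, so the count is restored from the latest checkpoint after a failure; the state descriptor name and the element types are assumptions.

```java
// Sketch of a stateful counter backed by keyed state; checkpointed state like this
// is what the recovery test validates for consistency after failures.
public static class StatefulCounter extends RichMapFunction<Integer, Integer> {

    private transient ValueState<Long> count;

    @Override
    public void open(Configuration parameters) {
        count = getRuntimeContext().getState(
                new ValueStateDescriptor<>("count", Long.class));
    }

    @Override
    public Integer map(Integer value) throws Exception {
        Long current = count.value();
        count.update(current == null ? 1L : current + 1L); // restored via checkpoints on recovery
        return value;
    }
}
```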

357

358

### Checkpoint Validation

359

360

Verifying checkpoint behavior during failures:

361

362

```java

363

public class CheckpointValidationTest extends StreamFaultToleranceTestBase {

364

365

@Override

366

public void testProgram(StreamExecutionEnvironment env) {

367

env.enableCheckpointing(500); // Checkpoint every 500ms

368

env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

369

370

env.addSource(new FailureInjectingSource())

371

.map(new CheckpointValidatingFunction())

372

.addSink(new CheckpointTrackingSink<>());

373

}

374

375

@Override

376

public void postSubmit() throws Exception {

377

List<Long> checkpointIds = CheckpointTrackingSink.getObservedCheckpoints();

378

379

// Verify checkpoints were created

380

assertFalse("Should have observed checkpoints", checkpointIds.isEmpty());

381

382

// Verify checkpoint IDs are increasing

383

for (int i = 1; i < checkpointIds.size(); i++) {

384

assertTrue("Checkpoint IDs should increase",

385

checkpointIds.get(i) > checkpointIds.get(i-1));

386

}

387

}

388

}

389

```
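
`CheckpointTrackingSink` is not shown in this document. The sketch below is one plausible shape for it: it records the IDs of completed checkpoints in a static list for `postSubmit()` to inspect, which only works while the whole test runs in a single JVM (for example on a MiniCluster).

```java
// Sketch: a sink that tracks completed checkpoints via the CheckpointListener callback.
public static class CheckpointTrackingSink<T> implements SinkFunction<T>, CheckpointListener {

    private static final List<Long> observedCheckpoints =
            Collections.synchronizedList(new ArrayList<>());

    @Override
    public void invoke(T value, Context context) {
        // Records themselves are discarded; only checkpoint completion is tracked
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        observedCheckpoints.add(checkpointId);
    }

    public static List<Long> getObservedCheckpoints() {
        return new ArrayList<>(observedCheckpoints);
    }
}
```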

## Restart Strategy Testing

### Fixed Delay Restart Testing

```java
public class FixedDelayRestartTest extends SimpleRecoveryFixedDelayRestartStrategyITBase {

    @Override
    protected RestartStrategies.RestartStrategyConfiguration getRestartStrategy() {
        return RestartStrategies.fixedDelayRestart(3, 1000); // 3 attempts, 1 second delay
    }

    @Override
    protected int getExpectedRestartAttempts() {
        return 3;
    }

    @Override
    protected Plan getFailingPlan() {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setRestartStrategy(getRestartStrategy());

        env.fromElements(1, 2, 3)
                .map(new AlwaysFailingMapper()) // Always fails
                .output(new DiscardingOutputFormat<>());

        return env.createProgramPlan();
    }
}
```
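
`AlwaysFailingMapper` is referenced above but not defined here; a minimal sketch is simply a mapper that throws on every record, so each restart attempt fails until the fixed delay strategy gives up.

```java
// Sketch: fails on every record, exhausting the configured restart attempts.
public static class AlwaysFailingMapper implements MapFunction<Integer, Integer> {
    @Override
    public Integer map(Integer value) throws Exception {
        throw new RuntimeException("Intentional failure for restart strategy testing");
    }
}
```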

### Failure Rate Restart Testing

```java
public class FailureRateRestartTest extends SimpleRecoveryFailureRateStrategyITBase {

    @Override
    protected RestartStrategies.RestartStrategyConfiguration getRestartStrategy() {
        return RestartStrategies.failureRateRestart(
                3,               // max failures
                Time.minutes(1), // within 1 minute
                Time.seconds(5)  // delay between restarts
        );
    }

    @Override
    protected Plan getFailingPlan() {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setRestartStrategy(getRestartStrategy());

        env.fromElements(1, 2, 3, 4, 5)
                .map(new RateLimitedFailingMapper()) // Fails at controlled rate
                .output(new DiscardingOutputFormat<>());

        return env.createProgramPlan();
    }
}
```
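
`RateLimitedFailingMapper` is not defined in this document. One possible sketch fails only a bounded number of times, so the job stays within the configured failure rate (at most 3 failures per minute) and eventually completes; the threshold of two failures is an assumption.

```java
// Sketch: fails a bounded number of times, then processes records normally, so the
// failure rate restart strategy never exceeds its limit and the job can finish.
public static class RateLimitedFailingMapper implements MapFunction<Integer, Integer> {

    // Static so the count survives task restarts within the test JVM
    private static final AtomicInteger failures = new AtomicInteger();

    @Override
    public Integer map(Integer value) throws Exception {
        if (failures.get() < 2) {
            throw new RuntimeException("Rate-limited failure #" + failures.incrementAndGet());
        }
        return value * 2;
    }
}
```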

## Error Handling Patterns

### Graceful Degradation

Testing graceful handling of partial failures:

```java
env.addSource(new RobustSource())
        .map(new FaultTolerantMapper()) // Handles individual record failures
        .filter(Objects::nonNull)       // Filter out failed records
        .addSink(new ResilientSink<>());
```
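
A `FaultTolerantMapper` that fits this pipeline catches per-record errors and emits `null` instead of failing the task, so the `Objects::nonNull` filter can drop the bad records. The element types and the parsing logic in this sketch are assumptions.

```java
// Sketch: per-record fault tolerance by mapping bad input to null rather than
// throwing, which would otherwise fail the task and trigger a full restart.
public static class FaultTolerantMapper implements MapFunction<String, Integer> {
    @Override
    public Integer map(String value) {
        try {
            return Integer.parseInt(value.trim());
        } catch (NumberFormatException e) {
            return null; // dropped by the downstream Objects::nonNull filter
        }
    }
}
```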

### Failure Isolation

Ensuring failures in one operator don't affect others unnecessarily:

```java
DataStream<Integer> mainStream = env.addSource(new ReliableSource());

// Separate processing branches with different failure characteristics
DataStream<Integer> criticalPath = mainStream
        .filter(x -> x % 2 == 0)
        .map(new CriticalProcessor()); // Must not fail

DataStream<Integer> bestEffortPath = mainStream
        .filter(x -> x % 2 == 1)
        .map(new BestEffortProcessor()); // Can tolerate failures

criticalPath.addSink(new GuaranteedSink<>());
bestEffortPath.addSink(new BestEffortSink<>());
```