or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

cancellation-testing.md checkpointing-migration.md fault-tolerance-recovery.md index.md operator-lifecycle.md plugin-testing.md runtime-utilities.md session-window-testing.md state-backend-restore.md test-data-utilities.md

docs/fault-tolerance-recovery.md

0

# Fault Tolerance and Recovery Testing

1

2

This framework provides multiple failure-injection mechanisms and recovery-testing utilities for validating Flink's fault tolerance capabilities. It enables testing of job recovery scenarios, restart strategies, and failure-handling behavior.

3

4

## Capabilities

5

6

### Simple Recovery Test Base

7

8

Abstract base class for testing job recovery scenarios with configurable failure injection and restart validation.

9

10

```java { .api }

11

/**

12

* Base class for testing job recovery scenarios

13

*/

14

public abstract class SimpleRecoveryITCaseBase {

15

16

/**

17

* Run job with controlled cancellation for recovery testing

18

* @param jobGraph JobGraph to execute and cancel

19

* @throws Exception if job execution or cancellation fails

20

*/

21

protected void runAndCancelJob(JobGraph jobGraph) throws Exception;

22

23

/**

24

* Failing mapper function that fails after processing a specified number of elements

25

*/

26

public static class FailingMapper1 implements MapFunction<Integer, Integer> {

27

/**

28

* Constructor for failing mapper

29

* @param failAfterElements number of elements to process before failing

30

*/

31

public FailingMapper1(int failAfterElements);

32

33

@Override

34

public Integer map(Integer value) throws Exception;

35

}

36

37

/**

38

* Alternative failing mapper with different failure patterns

39

*/

40

public static class FailingMapper2 implements MapFunction<Integer, Integer> {

41

/**

42

* Constructor for alternative failing mapper

43

* @param failAfterElements number of elements before failure

44

*/

45

public FailingMapper2(int failAfterElements);

46

47

@Override

48

public Integer map(Integer value) throws Exception;

49

}

50

51

/**

52

* Third variant of failing mapper for complex failure scenarios

53

*/

54

public static class FailingMapper3 implements MapFunction<Integer, Integer> {

55

/**

56

* Constructor for third failing mapper variant

57

* @param failAfterElements number of elements before failure

58

*/

59

public FailingMapper3(int failAfterElements);

60

61

@Override

62

public Integer map(Integer value) throws Exception;

63

}

64

}

65

```

66

67

### Stream Fault Tolerance Test Base

68

69

Comprehensive parameterized base class for testing stream processing fault tolerance with checkpointing, failure injection, and recovery validation.

70

71

```java { .api }

72

/**

73

* Parameterized base class for comprehensive fault tolerance testing in streaming scenarios

74

*/

75

@RunWith(Parameterized.class)

76

public abstract class StreamFaultToleranceTestBase {

77

78

/** Default parallelism for fault tolerance tests */

79

public static final int PARALLELISM = 12;

80

/** Number of task managers for test cluster */

81

public static final int NUM_TASK_MANAGERS = 3;

82

/** Number of task slots per task manager */

83

public static final int NUM_TASK_SLOTS = 4;

84

85

/**

86

* Enumeration of available failover strategies for testing

87

*/

88

public enum FailoverStrategy {

89

RestartAllFailoverStrategy,

90

RestartPipelinedRegionFailoverStrategy

91

}

92

93

/**

94

* POJO for counting prefixed values in fault tolerance tests

95

*/

96

public static class PrefixCount {

97

/** Prefix string */

98

public String prefix;

99

/** Integer value */

100

public Integer value;

101

/** Count of occurrences */

102

public Long count;

103

104

/**

105

* Default constructor for PrefixCount

106

*/

107

public PrefixCount();

108

109

/**

110

* Constructor with field initialization

111

* @param prefix prefix string

112

* @param value integer value

113

* @param count occurrence count

114

*/

115

public PrefixCount(String prefix, Integer value, Long count);

116

}

117

118

/**

119

* Current failover strategy being tested

120

*/

121

protected final FailoverStrategy failoverStrategy;

122

123

/**

124

* Constructor for parameterized test

125

* @param failoverStrategy failover strategy to test

126

*/

127

public StreamFaultToleranceTestBase(FailoverStrategy failoverStrategy);

128

129

/**

130

* Abstract method to define the test program topology

131

* @param env StreamExecutionEnvironment for building the job

132

* @return DataStream representing the final result

133

* @throws Exception if program construction fails

134

*/

135

public abstract DataStream<PrefixCount> testProgram(StreamExecutionEnvironment env) throws Exception;

136

137

/**

138

* Abstract method for post-submission actions and validation

139

* @throws Exception if post-submission actions fail

140

*/

141

public abstract void postSubmit() throws Exception;

142

143

/**

144

* Run the complete checkpointed program with fault injection

145

* @throws Exception if test execution fails

146

*/

147

public void runCheckpointedProgram() throws Exception;

148

149

/**

150

* Get parameters for parameterized testing of different failover strategies

151

* @return Collection of failover strategy parameters

152

*/

153

@Parameterized.Parameters(name = "Failover strategy: {0}")

154

public static Collection<FailoverStrategy[]> parameters();

155

156

/**

157

* Create test environment configuration with fault tolerance settings

158

* @return Configuration for test environment

159

*/

160

protected Configuration createTestConfiguration();

161

162

/**

163

* Trigger failure in the running job for fault tolerance testing

164

* @param jobId JobID of the running job

165

* @throws Exception if failure triggering fails

166

*/

167

protected void triggerFailure(JobID jobId) throws Exception;

168

169

/**

170

* Wait for job to reach running state

171

* @param jobId JobID to monitor

172

* @param timeout timeout in milliseconds

173

* @throws Exception if job doesn't reach running state within timeout

174

*/

175

protected void waitForJobRunning(JobID jobId, long timeout) throws Exception;

176

177

/**

178

* Validate checkpointing behavior

179

* @param jobId JobID to validate

180

* @return boolean indicating if checkpointing is working correctly

181

* @throws Exception if validation fails

182

*/

183

protected boolean validateCheckpointing(JobID jobId) throws Exception;

184

}

185

```

186

187

### Restart Strategy Test Bases

188

189

Abstract base classes for testing different restart strategies with Flink's fault tolerance mechanisms.

190

191

```java { .api }

192

/**

193

* Base class for testing fixed delay restart strategy

194

*/

195

public abstract class SimpleRecoveryFixedDelayRestartStrategyITBase

196

extends SimpleRecoveryITCaseBase {

197

198

/**

199

* Test recovery with fixed delay between restart attempts

200

* @param delayMs delay in milliseconds between restarts

201

* @param maxAttempts maximum number of restart attempts

202

* @throws Exception if test execution fails

203

*/

204

protected void testFixedDelayRestart(long delayMs, int maxAttempts) throws Exception;

205

}

206

207

/**

208

* Base class for testing exponential delay restart strategy

209

*/

210

public abstract class SimpleRecoveryExponentialDelayRestartStrategyITBase

211

extends SimpleRecoveryITCaseBase {

212

213

/**

214

* Test recovery with exponential backoff delay

215

* @param initialDelayMs initial delay in milliseconds

216

* @param maxDelayMs maximum delay in milliseconds

217

* @param backoffMultiplier multiplier for exponential backoff

218

* @throws Exception if test execution fails

219

*/

220

protected void testExponentialDelayRestart(

221

long initialDelayMs,

222

long maxDelayMs,

223

double backoffMultiplier) throws Exception;

224

}

225

226

/**

227

* Base class for testing failure rate restart strategy

228

*/

229

public abstract class SimpleRecoveryFailureRateStrategyITBase

230

extends SimpleRecoveryITCaseBase {

231

232

/**

233

* Test recovery based on failure rate thresholds

234

* @param maxFailuresPerInterval maximum failures allowed per time interval

235

* @param failureRateIntervalMs time interval for failure rate calculation

236

* @param delayMs delay between restart attempts

237

* @throws Exception if test execution fails

238

*/

239

protected void testFailureRateRestart(

240

int maxFailuresPerInterval,

241

long failureRateIntervalMs,

242

long delayMs) throws Exception;

243

}

244

```

245

246

### Recovery Test Utilities

247

248

Utility classes providing common recovery testing functionality and helper methods.

249

250

```java { .api }

251

/**

252

* Utility class for recovery testing scenarios

253

*/

254

public class RecoveryTestUtils {

255

256

/**

257

* Create job graph with configurable failure injection

258

* @param sourceParallelism parallelism for source operator

259

* @param mapParallelism parallelism for map operator

260

* @param failAfterElements elements to process before failure

261

* @return JobGraph configured for recovery testing

262

*/

263

public static JobGraph createJobWithFailure(

264

int sourceParallelism,

265

int mapParallelism,

266

int failAfterElements);

267

268

/**

269

* Validate job recovery metrics and behavior

270

* @param jobExecutionResult result from job execution

271

* @param expectedRestarts expected number of restarts

272

* @return boolean indicating if recovery behavior is correct

273

*/

274

public static boolean validateRecoveryBehavior(

275

JobExecutionResult jobExecutionResult,

276

int expectedRestarts);

277

}

278

```

279

280

**Usage Examples:**

281

282

```java

283

import org.apache.flink.test.recovery.SimpleRecoveryITCaseBase;

284

import org.apache.flink.test.recovery.SimpleRecoveryFixedDelayRestartStrategyITBase;

285

import org.apache.flink.test.recovery.utils.RecoveryTestUtils;

286

287

// Basic recovery test

288

public class JobRecoveryTest extends SimpleRecoveryITCaseBase {

289

290

@Test

291

public void testSimpleJobRecovery() throws Exception {

292

// Create job with failing mapper

293

JobGraph job = new JobGraph();

294

295

// Add source

296

JobVertex source = new JobVertex("source");

297

source.setInvokableClass(NumberSequenceSource.class);

298

source.setParallelism(1);

299

job.addVertex(source);

300

301

// Add failing mapper

302

JobVertex mapper = new JobVertex("mapper");

303

mapper.setInvokableClass(FailingMapper1.class);

304

mapper.getConfiguration().setInteger("fail-after", 50);

305

mapper.setParallelism(2);

306

job.addVertex(mapper);

307

308

// Connect vertices

309

mapper.connectNewDataSetAsInput(source, DistributionPattern.REBALANCE);

310

311

// Test recovery

312

runAndCancelJob(job);

313

}

314

315

@Test

316

public void testMultipleFailureRecovery() throws Exception {

317

JobGraph job = RecoveryTestUtils.createJobWithFailure(1, 2, 30);

318

319

// Configure restart strategy

320

job.getJobConfiguration().setString(

321

"restart-strategy", "fixed-delay");

322

job.getJobConfiguration().setString(

323

"restart-strategy.fixed-delay.attempts", "3");

324

job.getJobConfiguration().setString(

325

"restart-strategy.fixed-delay.delay", "1s");

326

327

JobExecutionResult result = runJobWithExpectedFailures(job);

328

329

// Validate recovery behavior

330

assertTrue(RecoveryTestUtils.validateRecoveryBehavior(result, 2));

331

}

332

}

333

334

// Fixed delay restart strategy test

335

public class FixedDelayRecoveryTest extends SimpleRecoveryFixedDelayRestartStrategyITBase {

336

337

@Test

338

public void testFixedDelayStrategy() throws Exception {

339

// Test with a 2-second delay and a maximum of 3 restart attempts

340

testFixedDelayRestart(2000L, 3);

341

}

342

343

@Test

344

public void testFixedDelayWithQuickRecovery() throws Exception {

345

// Test with 500ms delay for quick recovery scenarios

346

testFixedDelayRestart(500L, 5);

347

}

348

}

349

350

// Comprehensive recovery testing

351

public class ComprehensiveRecoveryTest extends SimpleRecoveryITCaseBase {

352

353

@Test

354

public void testCascadingFailures() throws Exception {

355

JobGraph job = new JobGraph();

356

357

// Chain multiple failing mappers

358

JobVertex source = createSourceVertex();

359

JobVertex mapper1 = createMapperVertex(new FailingMapper1(20));

360

JobVertex mapper2 = createMapperVertex(new FailingMapper2(40));

361

JobVertex mapper3 = createMapperVertex(new FailingMapper3(60));

362

JobVertex sink = createSinkVertex();

363

364

// Connect in chain

365

mapper1.connectNewDataSetAsInput(source, DistributionPattern.FORWARD);

366

mapper2.connectNewDataSetAsInput(mapper1, DistributionPattern.FORWARD);

367

mapper3.connectNewDataSetAsInput(mapper2, DistributionPattern.FORWARD);

368

sink.connectNewDataSetAsInput(mapper3, DistributionPattern.FORWARD);

369

370

job.addVertex(source);

371

job.addVertex(mapper1);

372

job.addVertex(mapper2);

373

job.addVertex(mapper3);

374

job.addVertex(sink);

375

376

// Test complex recovery scenario

377

runAndCancelJob(job);

378

}

379

}

380

```