or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

api-completeness.md, checkpointing.md, data-generation.md, execution.md, fault-tolerance.md, index.md, streaming.md

docs/fault-tolerance.md

0

# Fault Tolerance

1

2

Base classes and utilities for testing job cancellation, task-failure recovery, and other fault-tolerance mechanisms. They provide controlled failure injection and hooks for verifying that a job recovered correctly.

3

4

## Core Base Classes

5

6

### CancelingTestBase

7

8

Abstract base class for job-cancellation tests. It manages a local mini cluster (start/stop) and provides `runAndCancelJob` helpers that submit a job and cancel it after a given number of milliseconds.

9

10

```java { .api }

11

public abstract class CancelingTestBase {

12

protected LocalFlinkMiniCluster cluster;

13

protected Configuration config;

14

15

@Before

16

public void setup() throws Exception;

17

18

@After

19

public void cleanup() throws Exception;

20

21

// Cluster lifecycle management

22

protected void startCluster() throws Exception;

23

protected void stopCluster() throws Exception;

24

25

// Job execution and cancellation

26

protected void runAndCancelJob(JobGraph jobGraph, int msecsTillCanceling) throws Exception;

27

protected void runAndCancelJob(JobGraph jobGraph, int msecsTillCanceling, boolean waitForCancel) throws Exception;

28

29

// Abstract methods for test implementation

30

protected abstract void testProgram(ExecutionEnvironment env);

31

protected abstract JobGraph getJobGraph() throws Exception;

32

}

33

```

34

35

### SimpleRecoveryITCaseBase

36

37

Abstract base class for task-failure recovery tests. Subclasses define the job in `testProgram(...)` and run it with `execute()`; `preSubmit()` and `postSubmit()` are hooks for setup and result verification.

38

39

```java { .api }

40

public abstract class SimpleRecoveryITCaseBase {

41

protected LocalFlinkMiniCluster cluster;

42

protected Configuration config;

43

protected int parallelism;

44

45

@Before

46

public void setup() throws Exception;

47

48

@After

49

public void cleanup() throws Exception;

50

51

// Recovery testing workflow

52

protected void execute() throws Exception;

53

protected void preSubmit() throws Exception;

54

protected void postSubmit() throws Exception;

55

56

// Abstract test program definition

57

protected abstract void testProgram(ExecutionEnvironment env);

58

}

59

```

60

61

## Recovery Execution Environment

62

63

### RecoveryITCaseBase

64

65

Extends `SimpleRecoveryITCaseBase` with a configurable cluster layout (number of task managers and slots per task manager). It also adds helpers to inject a task failure into a running job and wait for the job to recover.

66

67

```java { .api }

68

public abstract class RecoveryITCaseBase extends SimpleRecoveryITCaseBase {

69

protected int numTaskManagers;

70

protected int slotsPerTaskManager;

71

72

public RecoveryITCaseBase();

73

public RecoveryITCaseBase(Configuration config, int parallelism);

74

75

// Extended setup with custom configuration

76

protected void setupCluster(Configuration config, int numTaskManagers, int slotsPerTaskManager) throws Exception;

77

78

// Failure injection utilities

79

protected void injectTaskFailure(JobID jobId, int taskIndex) throws Exception;

80

protected void waitForRecovery(JobID jobId) throws Exception;

81

}

82

```

83

84

## Cancellation Testing Utilities

85

86

### CancelableInfiniteInputFormat

87

88

Input format that keeps producing `Integer` records until it is cancelled, so a job reading from it can only end through cancellation.

89

90

```java { .api }

91

public class CancelableInfiniteInputFormat extends GenericInputFormat<Integer> {

92

private volatile boolean canceled;

93

94

public CancelableInfiniteInputFormat();

95

96

@Override

97

public boolean reachedEnd();

98

99

@Override

100

public Integer nextRecord(Integer reuse);

101

102

@Override

103

public void cancel();

104

}

105

```

106

107

### SlowlyDeserializingInputFormat

108

109

Input format that returns a fixed number of `Integer` records with a configurable delay per record, for timeout and cancellation testing.

110

111

```java { .api }

112

public class SlowlyDeserializingInputFormat extends GenericInputFormat<Integer> {

113

private long deserializationDelay;

114

private int elementsToReturn;

115

116

public SlowlyDeserializingInputFormat(long deserializationDelay, int elementsToReturn);

117

118

@Override

119

public boolean reachedEnd();

120

121

@Override

122

public Integer nextRecord(Integer reuse);

123

}

124

```

125

126

## Recovery Testing Functions

127

128

### FailingMapper

129

130

`MapFunction` that intentionally fails after processing a specified number of elements. Its element counter is static, so call `reset()` before each test.

131

132

```java { .api }

133

public class FailingMapper<T> implements MapFunction<T, T> {

134

private int failAfterElements;

135

private static volatile int processedElements;

136

137

public FailingMapper(int failAfterElements);

138

139

@Override

140

public T map(T value) throws Exception;

141

142

public static void reset();

143

}

144

```

145

146

### RecoveringFunction

147

148

Base class for functions that track failures and recovery across restart attempts. The attempt and failure state is held in static fields, so call `reset()` before each test.

149

150

```java { .api }

151

public abstract class RecoveringFunction {

152

protected static volatile int attemptNumber;

153

protected static volatile boolean hasFailed;

154

155

protected void trackAttempt();

156

protected boolean shouldFail();

157

protected void simulateFailure() throws Exception;

158

159

public static void reset();

160

public static int getAttemptNumber();

161

}

162

```

163

164

## Usage Examples

165

166

### Job Cancellation Testing

167

168

```java

169

public class MyCancellationTest extends CancelingTestBase {

170

171

@Test

172

public void testJobCancellation() throws Exception {

173

JobGraph jobGraph = getJobGraph();

174

175

// Run job and cancel after 5 seconds

176

runAndCancelJob(jobGraph, 5000);

177

178

// Verify clean cancellation

179

assertTrue("Job should be cancelled", jobWasCancelled);

180

}

181

182

@Override

183

protected void testProgram(ExecutionEnvironment env) {

184

// Create a long-running job that can be cancelled

185

env.createInput(new CancelableInfiniteInputFormat())

186

.map(x -> x * 2)

187

.map(new SlowProcessingMapper()) // Adds processing delay

188

.writeAsText("/tmp/cancellation-test-output");

189

}

190

191

@Override

192

protected JobGraph getJobGraph() throws Exception {

193

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

194

env.setParallelism(4);

195

testProgram(env);

196

return env.createProgramPlan().getJobGraph();

197

}

198

}

199

```

200

201

### Recovery Testing with Failure Injection

202

203

```java

204

public class MyRecoveryTest extends SimpleRecoveryITCaseBase {

205

206

@Test

207

public void testTaskFailureRecovery() throws Exception {

208

FailingMapper.reset(); // Reset failure counter

209

execute(); // Run test with recovery

210

}

211

212

@Override

213

protected void testProgram(ExecutionEnvironment env) {

214

env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 1000));

215

216

env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

217

.map(new FailingMapper<>(5)) // Fails after 5 elements

218

.map(new RecoveringMapper()) // Handles recovery

219

.writeAsText("/tmp/recovery-test-output");

220

}

221

222

@Override

223

protected void postSubmit() throws Exception {

224

// Verify that recovery occurred

225

assertTrue("Should have failed at least once", FailingMapper.hasFailed());

226

assertTrue("Should have recovered", RecoveringMapper.hasRecovered());

227

228

// Verify output correctness

229

verifyOutputFile("/tmp/recovery-test-output");

230

}

231

}

232

```

233

234

### Custom Recovery Function Implementation

235

236

```java

237

public class StatefulRecoveringMapper extends RecoveringFunction implements MapFunction<Integer, Integer> {

238

private ValueState<Integer> counterState;

239

240

@Override

241

public void open(Configuration parameters) throws Exception {

242

ValueStateDescriptor<Integer> descriptor =

243

new ValueStateDescriptor<>("counter", Integer.class);

244

counterState = getRuntimeContext().getState(descriptor);

245

}

246

247

@Override

248

public Integer map(Integer value) throws Exception {

249

trackAttempt();

250

251

Integer count = counterState.value();

252

if (count == null) count = 0;

253

254

count++;

255

counterState.update(count);

256

257

// Fail on first attempt after processing 3 elements

258

if (getAttemptNumber() == 1 && count == 3) {

259

simulateFailure();

260

}

261

262

return value * count;

263

}

264

}

265

```

266

267

### Advanced Cancellation with Timeout

268

269

```java

270

public class TimeoutCancellationTest extends CancelingTestBase {

271

272

@Test

273

public void testCancellationWithTimeout() throws Exception {

274

JobGraph jobGraph = createSlowJobGraph();

275

276

long startTime = System.currentTimeMillis();

277

278

// Cancel job after 3 seconds, wait for cancellation

279

runAndCancelJob(jobGraph, 3000, true);

280

281

long duration = System.currentTimeMillis() - startTime;

282

283

// Verify cancellation happened within reasonable time

284

assertTrue("Cancellation took too long", duration < 10000);

285

}

286

287

private JobGraph createSlowJobGraph() throws Exception {

288

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

289

290

env.createInput(new SlowlyDeserializingInputFormat(1000, 100)) // 1 sec per element

291

.map(x -> {

292

Thread.sleep(500); // Additional processing delay

293

return x;

294

})

295

.writeAsText("/tmp/slow-job-output");

296

297

return env.createProgramPlan().getJobGraph();

298

}

299

}

300

```

301

302

### Multi-Stage Recovery Testing

303

304

```java

305

public class ComplexRecoveryTest extends RecoveryITCaseBase {

306

307

public ComplexRecoveryTest() {

308

super(new Configuration(), 4); // 4 parallel instances

309

}

310

311

@Test

312

public void testMultiStageRecovery() throws Exception {

313

// Configure multiple restart attempts

314

config.setString(ConfigConstants.RESTART_STRATEGY, "fixed-delay");

315

config.setInteger(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 5);

316

config.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "1s");

317

318

execute();

319

}

320

321

@Override

322

protected void testProgram(ExecutionEnvironment env) {

323

env.fromCollection(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))

324

.map(new FailingMapper<>(3)) // Fail after 3 elements

325

.filter(new RecoveringFilter()) // Filter with recovery logic

326

.map(new ValidatingMapper()) // Validate state consistency

327

.collect(); // Force execution

328

}

329

330

@Override

331

protected void postSubmit() throws Exception {

332

// Verify multiple recovery attempts occurred

333

assertTrue("Should have multiple attempts", getAttemptNumber() > 1);

334

335

// Verify final state consistency

336

validateFinalResults();

337

}

338

}