or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

class-loading-programs.md index.md migration-testing.md performance-testing.md state-management-testing.md streaming-utilities.md test-base-classes.md test-data-generation.md

docs/test-base-classes.md

0

# Test Base Classes

1

2

Abstract base classes providing common patterns and infrastructure for different testing scenarios including cancellation testing, fault tolerance testing, and recovery testing. These base classes standardize test setup, execution patterns, and verification procedures across different types of Flink tests.

3

4

## Capabilities

5

6

### Stream Fault Tolerance Test Base

7

8

Abstract base class for testing streaming applications under fault tolerance conditions.

9

10

```java { .api }

11

/**

12

* Abstract base class for fault tolerance testing of streaming applications

13

*/

14

public abstract class StreamFaultToleranceTestBase extends TestLogger {

15

16

// Test cluster configuration constants

17

public static final int NUM_TASK_MANAGERS = 2;

18

public static final int NUM_TASK_SLOTS = 8;

19

public static final int PARALLELISM = 4;

20

21

/**

22

* Define the streaming topology to be tested under fault conditions

23

* @param env Pre-configured StreamExecutionEnvironment

24

*/

25

public abstract void testProgram(StreamExecutionEnvironment env);

26

27

/**

28

* Verify test results after job completion

29

* Called after successful job execution to validate results

30

* @throws Exception if verification fails

31

*/

32

public abstract void postSubmit() throws Exception;

33

}

34

```

35

36

**Usage Example:**

37

38

```java

39

public class MyFaultToleranceTest extends StreamFaultToleranceTestBase {

40

41

private List<String> collectedResults = new ArrayList<>();

42

43

@Override

44

public void testProgram(StreamExecutionEnvironment env) {

45

// Configure environment for fault tolerance

46

env.setParallelism(PARALLELISM);

47

env.enableCheckpointing(100);

48

env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 1000));

49

50

// Build fault-tolerant topology

51

env.addSource(new FaultTolerantSource())

52

.keyBy(value -> value.getKey())

53

.process(new StatefulProcessFunction())

54

.addSink(new CollectingSink(collectedResults));

55

}

56

57

@Override

58

public void postSubmit() throws Exception {

59

// Verify results after fault tolerance test

60

assertEquals(expectedResultCount, collectedResults.size());

61

assertTrue("Missing expected results", collectedResults.containsAll(expectedResults));

62

63

// Verify no duplicates after recovery

64

Set<String> uniqueResults = new HashSet<>(collectedResults);

65

assertEquals("Duplicate results detected", collectedResults.size(), uniqueResults.size());

66

}

67

68

@Test

69

public void testFaultTolerance() throws Exception {

70

// Test execution handled by base class infrastructure

71

runTest();

72

}

73

}

74

```

75

76

### Canceling Test Base

77

78

Abstract base class for testing job cancellation scenarios and cleanup behavior.

79

80

```java { .api }

81

/**

82

* Abstract base class for testing job cancellation and cleanup

83

*/

84

public abstract class CancelingTestBase extends TestLogger {

85

86

// Test execution constants

87

protected static final int PARALLELISM = 4;

88

protected static final Duration GET_FUTURE_TIMEOUT = Duration.ofSeconds(30);

89

90

/**

91

* Run a job and cancel it after specified time

92

* @param plan Job execution plan to run and cancel

93

* @param msecsTillCanceling Milliseconds to wait before canceling

94

* @param maxTimeTillCanceled Maximum time to wait for cancellation completion

95

* @throws Exception if job execution or cancellation fails

96

*/

97

protected void runAndCancelJob(

98

Plan plan,

99

int msecsTillCanceling,

100

int maxTimeTillCanceled) throws Exception;

101

}

102

```

103

104

**Usage Example:**

105

106

```java

107

public class MyCancellationTest extends CancelingTestBase {

108

109

@Test

110

public void testJobCancellation() throws Exception {

111

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

112

env.setParallelism(PARALLELISM);

113

114

// Create long-running job that can be cancelled

115

DataSet<String> input = env.fromElements("data");

116

input.map(new SlowMapper()) // Mapper that takes time to process

117

.output(new DiscardingOutputFormat<>());

118

119

Plan plan = env.createProgramPlan();

120

121

// Cancel job after 2 seconds, allow up to 10 seconds for cancellation

122

runAndCancelJob(plan, 2000, 10000);

123

124

// Test passes if job is successfully cancelled within time limit

125

}

126

}

127

```

128

129

### Recovery Test Base Classes

130

131

Base classes for testing different recovery strategies and failure scenarios.

132

133

```java { .api }

134

/**

135

* Base class for simple recovery testing scenarios

136

*/

137

public abstract class SimpleRecoveryITCaseBase extends TestLogger {

138

139

/**

140

* Execute recovery test with default configuration

141

* @throws Exception if recovery test fails

142

*/

143

protected abstract void executeRecoveryTest() throws Exception;

144

}

145

146

/**

147

* Base class for testing fixed delay restart strategy

148

*/

149

public abstract class SimpleRecoveryFixedDelayRestartStrategyITBase extends SimpleRecoveryITCaseBase {

150

151

/**

152

* Get restart strategy configuration for fixed delay

153

* @return RestartStrategy configured for fixed delay

154

*/

155

protected RestartStrategies.RestartStrategyConfiguration getRestartStrategy();

156

}

157

158

/**

159

* Base class for testing failure rate restart strategy

160

*/

161

public abstract class SimpleRecoveryFailureRateStrategyITBase extends SimpleRecoveryITCaseBase {

162

163

/**

164

* Get restart strategy configuration for failure rate limiting

165

* @return RestartStrategy configured for failure rate limiting

166

*/

167

protected RestartStrategies.RestartStrategyConfiguration getRestartStrategy();

168

}

169

```

170

171

### Prefix Count POJO

172

173

Common data type used in fault tolerance and recovery testing.

174

175

```java { .api }

176

/**

177

* POJO for prefix counting tests in fault tolerance scenarios

178

*/

179

public static class PrefixCount {

180

public String str;

181

public long count;

182

183

/**

184

* Default constructor

185

*/

186

public PrefixCount();

187

188

/**

189

* Constructor with string and count

190

* @param str String prefix

191

* @param count Count value

192

*/

193

public PrefixCount(String str, long count);

194

195

/**

196

* Check equality based on str and count fields

197

* @param obj Object to compare

198

* @return true if equal

199

*/

200

public boolean equals(Object obj);

201

202

/**

203

* Generate hash code based on str and count

204

* @return hash code

205

*/

206

public int hashCode();

207

208

/**

209

* String representation

210

* @return formatted string

211

*/

212

public String toString();

213

}

214

```

215

216

### Test Base Usage Patterns

217

218

Common patterns for implementing tests using base classes:

219

220

**Fault Tolerance Test Pattern:**

221

222

```java

223

public class StreamingJobFaultToleranceTest extends StreamFaultToleranceTestBase {

224

225

private TestListResultSink<PrefixCount> resultSink;

226

227

@Override

228

public void testProgram(StreamExecutionEnvironment env) {

229

// Configure for fault tolerance

230

env.setParallelism(PARALLELISM);

231

env.enableCheckpointing(50);

232

env.setStateBackend(new RocksDBStateBackend("file:///tmp/test"));

233

env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 1000));

234

235

resultSink = new TestListResultSink<>();

236

237

// Create fault-tolerant streaming topology

238

env.addSource(new FailingSource(1000, 500, 3)) // Fail after 500 elements

239

.keyBy(value -> value.f0 % 4)

240

.process(new CountingProcessFunction())

241

.addSink(resultSink);

242

}

243

244

@Override

245

public void postSubmit() throws Exception {

246

List<PrefixCount> results = resultSink.getResult();

247

248

// Verify all expected results are present after recovery

249

assertEquals(4, results.size()); // One per key

250

251

long totalCount = results.stream().mapToLong(pc -> pc.count).sum();

252

assertEquals(1000, totalCount); // All elements processed exactly once

253

}

254

}

255

```

256

257

**Cancellation Test Pattern:**

258

259

```java

260

public class LongRunningJobCancellationTest extends CancelingTestBase {

261

262

@Test

263

public void testIterativeJobCancellation() throws Exception {

264

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

265

env.setParallelism(PARALLELISM);

266

267

// Create iterative job that runs indefinitely

268

DataSet<Long> initial = env.fromElements(1L);

269

IterativeDataSet<Long> iteration = initial.iterate(Integer.MAX_VALUE);

270

271

DataSet<Long> next = iteration.map(value -> value + 1);

272

DataSet<Long> result = iteration.closeWith(next);

273

274

result.output(new DiscardingOutputFormat<>());

275

276

Plan plan = env.createProgramPlan();

277

278

// Cancel after 3 seconds, allow up to 15 seconds for cancellation

279

runAndCancelJob(plan, 3000, 15000);

280

}

281

}

282

```

283

284

**Recovery Strategy Test Pattern:**

285

286

```java

287

public class FixedDelayRecoveryTest extends SimpleRecoveryFixedDelayRestartStrategyITBase {

288

289

@Override

290

protected void executeRecoveryTest() throws Exception {

291

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

292

env.setParallelism(2);

293

env.setRestartStrategy(getRestartStrategy());

294

env.enableCheckpointing(100);

295

296

TestListResultSink<String> sink = new TestListResultSink<>();

297

298

// Source that fails twice then succeeds

299

env.addSource(new RecoveringSource(3, 100))

300

.map(new StatelessMapper())

301

.addSink(sink);

302

303

env.execute("Recovery Test");

304

305

// Verify successful recovery

306

List<String> results = sink.getResult();

307

assertEquals(100, results.size());

308

assertTrue("Recovery failed", results.stream().allMatch(Objects::nonNull));

309

}

310

311

@Override

312

protected RestartStrategies.RestartStrategyConfiguration getRestartStrategy() {

313

// Fixed delay of 1 second, maximum 3 restart attempts

314

return RestartStrategies.fixedDelayRestart(3, 1000);

315

}

316

}

317

```

318

319

**Multi-Phase Test Pattern:**

320

321

```java

322

public class MultiPhaseRecoveryTest extends SimpleRecoveryITCaseBase {

323

324

@Override

325

protected void executeRecoveryTest() throws Exception {

326

// Phase 1: Generate savepoint

327

String savepointPath = runJobAndCreateSavepoint();

328

329

// Phase 2: Restore and verify

330

restoreJobAndVerifyResults(savepointPath);

331

332

// Phase 3: Test failure recovery

333

testFailureRecoveryFromSavepoint(savepointPath);

334

}

335

336

private String runJobAndCreateSavepoint() throws Exception {

337

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

338

// Implementation for savepoint generation

339

return savepointPath;

340

}

341

342

private void restoreJobAndVerifyResults(String savepointPath) throws Exception {

343

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

344

// Implementation for restoration and verification

345

}

346

347

private void testFailureRecoveryFromSavepoint(String savepointPath) throws Exception {

348

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

349

// Implementation for failure recovery testing

350

}

351

}

352

```

353

354

These test base classes provide standardized infrastructure for comprehensively testing Flink applications under various failure and recovery scenarios, helping ensure that stream processing applications are robust and reliable.