# Test Base Classes

Foundation classes that provide standardized setup, execution patterns, and infrastructure for different kinds of Flink tests. These abstract base classes configure the cluster, coordinate test execution, and provide template methods for specific testing patterns.

## Capabilities

### StreamFaultToleranceTestBase

Base class for fault-tolerant streaming program tests with automatic checkpointing, failure recovery, and cluster management.

```java { .api }
/**
 * Base class for fault tolerant streaming program tests
 * Automatically sets up MiniCluster with 3 task managers, 4 slots each (PARALLELISM = 12)
 * Enables checkpointing every 500ms with infinite restart strategy
 */
public abstract class StreamFaultToleranceTestBase extends TestLogger {

    protected static final int NUM_TASK_MANAGERS = 3;
    protected static final int NUM_TASK_SLOTS = 4;
    protected static final int PARALLELISM = 12;

    /**
     * Implementations define the test topology using the provided environment
     * @param env StreamExecutionEnvironment with checkpointing and restarts configured
     */
    public abstract void testProgram(StreamExecutionEnvironment env);

    /**
     * Implementations provide test verification logic executed after job completion
     * Use this to verify results, check state, or validate behavior
     */
    public abstract void postSubmit() throws Exception;

    /**
     * Runs the complete fault tolerance test cycle
     * Sets up environment, executes testProgram(), handles failures, runs postSubmit()
     */
    @Test
    public void runCheckpointedProgram() throws Exception;

    /**
     * POJO for storing prefix, value, and count in fault tolerance tests
     * (nested in StreamFaultToleranceTestBase)
     */
    public static class PrefixCount implements Serializable {
        public String prefix;
        public String value;
        public long count;

        public PrefixCount() {}
        public PrefixCount(String prefix, String value, long count);
    }
}
```

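**Usage Example:**

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.checkpointing.StreamFaultToleranceTestBase;
import org.apache.flink.test.streaming.runtime.util.TestListResultSink;

import java.util.List;

import static org.junit.Assert.assertTrue;

public class MyFaultToleranceTest extends StreamFaultToleranceTestBase {

    // Keep a reference to the sink so postSubmit() can read what it collected.
    private final TestListResultSink<String> sink = new TestListResultSink<>();

    @Override
    public void testProgram(StreamExecutionEnvironment env) {
        env.fromElements("a", "b", "c", "a", "b")
            .keyBy(x -> x)
            // StatefulMapper is a placeholder for a user-defined stateful MapFunction<String, String>.
            .map(new StatefulMapper())
            .addSink(sink);
    }

    @Override
    public void postSubmit() throws Exception {
        List<String> results = sink.getResult();
        assertTrue("Results should contain processed elements", results.size() > 0);
    }
}
```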

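The checkpoint-and-restart cycle that this base class exercises can be illustrated without a cluster. The sketch below is a plain-Java simulation, not Flink code: `CheckpointReplaySketch`, `sumWithOneFailure`, and its parameters are hypothetical names chosen for illustration. It shows why a job that fails once can still reach the correct final state when it restores the last snapshot and replays from the stored input offset.

```java
import java.util.List;

/** Hypothetical, Flink-free simulation of checkpointing plus restart-from-checkpoint. */
final class CheckpointReplaySketch {

    /**
     * Sums the input, snapshotting (offset, sum) every checkpointInterval elements.
     * One failure is injected just before the element at index failAt is processed;
     * the loop then rolls back to the last snapshot and replays from there.
     */
    static long sumWithOneFailure(List<Integer> input, int checkpointInterval, int failAt) {
        int checkpointPos = 0;   // input offset stored in the last snapshot
        long checkpointSum = 0;  // operator state stored in the last snapshot
        int pos = 0;
        long sum = 0;
        boolean failed = false;

        while (pos < input.size()) {
            if (!failed && pos == failAt) {
                // Simulated crash: discard in-flight state and restore the snapshot.
                failed = true;
                pos = checkpointPos;
                sum = checkpointSum;
                continue;
            }
            sum += input.get(pos);
            pos++;
            if (pos % checkpointInterval == 0) {
                checkpointPos = pos;
                checkpointSum = sum;
            }
        }
        return sum;
    }

    public static void main(String[] args) {
        List<Integer> input = List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        // Fails before index 7, restores the snapshot taken at offset 6, and replays.
        System.out.println(sumWithOneFailure(input, 3, 7)); // prints 55
    }
}
```

The result equals the failure-free sum because the replayed elements are applied to the restored state rather than to the discarded in-flight state.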

### SavepointMigrationTestBase

Base class for savepoint migration testing with support for creating savepoints from one job and restoring them in another job to test state compatibility.

```java { .api }
/**
 * Base class for savepoint migration testing
 * Provides infrastructure for savepoint creation, restoration, and verification
 */
public abstract class SavepointMigrationTestBase extends TestBaseUtils {

    protected static final int DEFAULT_PARALLELISM = 4;

    /**
     * Get resource file path for test savepoints
     * @param filename Resource filename to locate
     * @return Absolute path to resource file
     */
    protected static String getResourceFilename(String filename);

    /**
     * Execute job and create savepoint when specified accumulators reach expected values
     * @param env StreamExecutionEnvironment configured for the job
     * @param savepointPath Path where savepoint should be saved
     * @param expectedAccumulators Array of accumulator name/value pairs to wait for
     */
    @SafeVarargs
    protected final void executeAndSavepoint(
        StreamExecutionEnvironment env,
        String savepointPath,
        Tuple2<String, Integer>... expectedAccumulators) throws Exception;

    /**
     * Restore job from savepoint and execute until specified accumulators reach expected values
     * @param env StreamExecutionEnvironment configured for the restored job
     * @param savepointPath Path to savepoint for restoration
     * @param expectedAccumulators Array of accumulator name/value pairs to wait for
     */
    @SafeVarargs
    protected final void restoreAndExecute(
        StreamExecutionEnvironment env,
        String savepointPath,
        Tuple2<String, Integer>... expectedAccumulators) throws Exception;
}
```

**Usage Example:**

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.junit.Test;

// TestSource, VerifyingTestSource, StatefulFunction, and AccumulatorCountingSink are
// placeholders for user-defined classes; they are not provided by the base class.
public class MySavepointMigrationTest extends SavepointMigrationTestBase {

    @Test
    public void testMigration() throws Exception {
        // Create and execute job that generates savepoint
        StreamExecutionEnvironment env1 = StreamExecutionEnvironment.getExecutionEnvironment();
        env1.setParallelism(DEFAULT_PARALLELISM);
        env1.addSource(new TestSource())
            .keyBy(x -> x.f0)
            .map(new StatefulFunction())
            .addSink(new AccumulatorCountingSink<>());

        // Blocks until the sink's accumulator reports 100 elements, then takes the savepoint
        executeAndSavepoint(env1, "test-savepoint",
            Tuple2.of(AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR, 100));

        // Restore from savepoint and verify state
        StreamExecutionEnvironment env2 = StreamExecutionEnvironment.getExecutionEnvironment();
        env2.setParallelism(DEFAULT_PARALLELISM);
        env2.addSource(new VerifyingTestSource())
            .keyBy(x -> x.f0)
            .map(new StatefulFunction())
            .addSink(new AccumulatorCountingSink<>());

        // Runs the restored job until the accumulator reports 50 elements
        restoreAndExecute(env2, "test-savepoint",
            Tuple2.of(AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR, 50));
    }
}
```

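Both `executeAndSavepoint` and `restoreAndExecute` wait for accumulators to reach expected values. A minimal sketch of such a wait loop follows, assuming the current values can be read through a `Supplier`. The names `AccumulatorWaitSketch` and `awaitAccumulators` are hypothetical, and the real base class reads accumulators from the running job rather than from a supplier.

```java
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Hypothetical polling helper; not part of SavepointMigrationTestBase. */
final class AccumulatorWaitSketch {

    /**
     * Polls the current accumulator values until every expected entry matches
     * or the timeout expires.
     *
     * @return true if all expected values were observed in time, false on timeout or interruption
     */
    static boolean awaitAccumulators(
            Supplier<Map<String, Integer>> currentValues,
            Map<String, Integer> expected,
            Duration timeout,
            Duration pollInterval) {
        long deadlineNanos = System.nanoTime() + timeout.toNanos();
        while (true) {
            Map<String, Integer> snapshot = currentValues.get();
            boolean allReached = expected.entrySet().stream()
                    .allMatch(e -> e.getValue().equals(snapshot.get(e.getKey())));
            if (allReached) {
                return true;
            }
            if (System.nanoTime() - deadlineNanos >= 0) {
                return false; // deadline passed without reaching the expected values
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    public static void main(String[] args) {
        // Simulated accumulator that grows by one on every poll.
        AtomicInteger processed = new AtomicInteger();
        boolean reached = awaitAccumulators(
                () -> Map.of("numElements", processed.incrementAndGet()),
                Map.of("numElements", 5),
                Duration.ofSeconds(5),
                Duration.ofMillis(10));
        System.out.println(reached); // prints true
    }
}
```

Returning `false` on timeout lets the caller fail the test with a message that names the accumulator that never reached its target.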

### CancelingTestBase

Base class for testing job cancellation scenarios with controlled timing and proper cluster setup.

```java { .api }
/**
 * Base class for testing job cancellation
 * Sets up MiniCluster with 2 task managers, 4 slots each (8 slots total); the PARALLELISM constant is 4
 */
public abstract class CancelingTestBase extends TestLogger {

    protected static final int PARALLELISM = 4;

    /**
     * Submit job, wait specified time, then cancel and verify proper cancellation
     * @param plan Flink execution plan to run and cancel
     * @param msecsTillCanceling Milliseconds to wait before canceling
     * @param maxTimeTillCanceled Maximum time to wait for cancellation completion
     */
    protected void runAndCancelJob(Plan plan, int msecsTillCanceling, int maxTimeTillCanceled) throws Exception;
}
```

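The submit, wait, cancel, and verify sequence behind `runAndCancelJob` can be sketched with a plain thread standing in for the job. This is an illustration under stated assumptions, not the library's implementation: `RunAndCancelSketch` and `runAndCancel` are hypothetical names, a thread interrupt plays the role of the cancel request, and the unit of `maxTimeTillCanceled` is assumed to be milliseconds.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for the submit, wait, cancel, verify sequence. */
final class RunAndCancelSketch {

    /**
     * Starts a long-running task, waits msecsTillCanceling, interrupts it, and then
     * waits at most maxTimeTillCanceled milliseconds for it to stop.
     *
     * @return true if the task acknowledged cancellation within the allowed time
     */
    static boolean runAndCancel(int msecsTillCanceling, int maxTimeTillCanceled) {
        CountDownLatch canceled = new CountDownLatch(1);
        Thread job = new Thread(() -> {
            try {
                // Simulates a job that would otherwise keep running for a long time.
                Thread.sleep(60_000);
            } catch (InterruptedException e) {
                // Cancellation arrived: signal the caller that the job stopped.
                canceled.countDown();
            }
        });
        job.setDaemon(true);
        job.start();
        try {
            Thread.sleep(msecsTillCanceling);
            job.interrupt(); // the "cancel" request
            return canceled.await(maxTimeTillCanceled, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(runAndCancel(100, 2_000)); // prints true
    }
}
```

Keeping the two durations separate matches the signature above: one controls how long the job runs before it is canceled, the other bounds how long cancellation itself may take.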

### AbstractOperatorRestoreTestBase

Base class for testing operator state migration and restoration across Flink versions with standardized savepoint handling.

```java { .api }
/**
 * Base class for testing operator state migration and restoration
 * Provides infrastructure for savepoint-based migration testing
 */
public abstract class AbstractOperatorRestoreTestBase {

    /**
     * Create job that generates migration savepoint
     * @param env StreamExecutionEnvironment for job creation
     * @return JobGraph for the migration job
     */
    public abstract JobGraph createMigrationJob(StreamExecutionEnvironment env) throws Exception;

    /**
     * Create job that restores from migration savepoint and verifies state
     * @param env StreamExecutionEnvironment for job creation
     * @return JobGraph for the restoration job
     */
    public abstract JobGraph createRestoredJob(StreamExecutionEnvironment env) throws Exception;

    /**
     * Get name of savepoint resource for this test
     * @return Resource name for the savepoint
     */
    public abstract String getMigrationSavepointName();
}
```

### AbstractKeyedOperatorRestoreTestBase

Specialized base class for testing keyed operator state restoration.

```java { .api }
/**
 * Specialized base for testing keyed operator state restoration
 * Extends AbstractOperatorRestoreTestBase with keyed state specific functionality
 */
public abstract class AbstractKeyedOperatorRestoreTestBase extends AbstractOperatorRestoreTestBase {
    // Keyed state specific restoration testing
}
```

### AbstractNonKeyedOperatorRestoreTestBase

Specialized base class for testing non-keyed operator state restoration.

```java { .api }
/**
 * Specialized base for testing non-keyed operator state restoration
 * Extends AbstractOperatorRestoreTestBase with non-keyed state specific functionality
 */
public abstract class AbstractNonKeyedOperatorRestoreTestBase extends AbstractOperatorRestoreTestBase {
    // Non-keyed state specific restoration testing
}
```

### SimpleRecoveryITCaseBase

Base class for testing job recovery scenarios with failure simulation and restart strategies.

```java { .api }
/**
 * Base class for testing job recovery scenarios
 * Provides infrastructure for testing failed runs followed by successful runs
 */
public abstract class SimpleRecoveryITCaseBase {
    // Abstract methods for defining failing and successful execution plans
    // Built-in recovery and restart strategy testing
}
```

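The "failed run followed by a successful run" scenario can be shown in miniature with a task that fails on its first attempt. The sketch below is plain Java with hypothetical names (`RecoverySketch`, `attemptsUntilSuccess`). It does not use Flink's restart strategies and only mirrors the idea of bounded restarts.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical, Flink-free illustration of a failed run followed by a successful run. */
final class RecoverySketch {

    /**
     * Calls the task until it succeeds or maxAttempts is exhausted.
     *
     * @return the 1-based attempt number that succeeded, or -1 if every attempt failed
     */
    static int attemptsUntilSuccess(Callable<?> task, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                task.call();
                return attempt;
            } catch (Exception e) {
                // A failed run: fall through and restart, as a restart strategy would.
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        AtomicInteger runs = new AtomicInteger();
        // The first run fails on purpose; the second run completes normally.
        Callable<String> flakyJob = () -> {
            if (runs.incrementAndGet() == 1) {
                throw new IllegalStateException("simulated task failure");
            }
            return "done";
        };
        System.out.println(attemptsUntilSuccess(flakyJob, 3)); // prints 2
    }
}
```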

## Common Patterns

### Test Setup Pattern

All base classes follow a consistent pattern:

1. **Cluster Setup**: Standardized MiniClusterResource configuration
2. **Environment Configuration**: Proper timeouts, parallelism, and checkpointing
3. **Execution Coordination**: Template methods for test orchestration
4. **Result Verification**: Abstract methods for test-specific validation

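The four steps above follow the template-method pattern: the base class fixes the order of operations and subclasses fill in the hooks. A minimal sketch in plain Java is shown below. `TemplateTestSketch` and `TemplateTestSketchDemo` are hypothetical names, and the string markers stand in for real cluster and environment work.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical skeleton of the template-method structure shared by the base classes. */
abstract class TemplateTestSketch {

    protected final List<String> steps = new ArrayList<>();

    /** Subclass hook: define the program under test. */
    protected abstract void testProgram();

    /** Subclass hook: verify results after the program has run. */
    protected abstract void postSubmit();

    /** Fixed orchestration: setup, configuration, user hooks, and guaranteed teardown. */
    final List<String> run() {
        steps.add("cluster-setup");
        try {
            steps.add("configure-environment");
            testProgram();
            postSubmit();
        } finally {
            // Teardown runs even if a hook throws.
            steps.add("cluster-teardown");
        }
        return steps;
    }

    public static void main(String[] args) {
        System.out.println(new TemplateTestSketchDemo().run());
        // prints [cluster-setup, configure-environment, test-program, post-submit, cluster-teardown]
    }
}

/** Example subclass that only supplies the two hooks. */
final class TemplateTestSketchDemo extends TemplateTestSketch {

    @Override
    protected void testProgram() {
        steps.add("test-program");
    }

    @Override
    protected void postSubmit() {
        steps.add("post-submit");
    }
}
```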

### Error Handling

Base classes handle common error scenarios:

- **Job Submission Failures**: Automatic retry and proper error reporting
- **Timeout Handling**: Configurable deadlines with clear failure messages
- **Resource Cleanup**: Automatic cleanup of temporary files and cluster resources
- **Exception Propagation**: Proper exception handling and test failure reporting