or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

index.mdsecurity-testing.mdtest-base-classes.mdtest-data.mdtest-environments.mdtest-utilities.md

test-base-classes.mddocs/

0

# Test Base Classes

1

2

Test base classes provide standardized testing patterns, cluster lifecycle management, and support for parameterized testing across multiple execution modes. They handle the common setup and teardown required for Flink testing.

3

4

## Core Base Classes

5

6

### AbstractTestBase

7

8

Base class for tests that run test programs in a Flink mini cluster with automatic cluster lifecycle management.

9

10

```java { .api }

11

public abstract class AbstractTestBase extends TestBaseUtils {

12

@ClassRule

13

public static final TemporaryFolder temporaryFolder;

14

15

public AbstractTestBase(Configuration config);

16

17

public void startCluster() throws Exception;

18

public void stopCluster() throws Exception;

19

20

public int getTaskManagerNumSlots();

21

public void setTaskManagerNumSlots(int taskManagerNumSlots);

22

public int getNumTaskManagers();

23

public void setNumTaskManagers(int numTaskManagers);

24

25

public String getTempDirPath(String dirName) throws IOException;

26

public String getTempFilePath(String fileName) throws IOException;

27

public String createTempFile(String fileName, String contents) throws IOException;

28

public File createAndRegisterTempFile(String fileName) throws IOException;

29

}

30

```

31

32

**Usage Example:**

33

34

```java

35

public class MyCustomTest extends AbstractTestBase {

36

public MyCustomTest() {

37

super(new Configuration());

38

setNumTaskManagers(2);

39

setTaskManagerNumSlots(4);

40

}

41

42

@Test

43

public void testMyApplication() throws Exception {

44

startCluster();

45

46

// Create temp input file

47

String inputPath = createTempFile("input.txt", "line1\nline2\nline3");

48

49

// Your test logic here

50

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

51

DataSet<String> input = env.readTextFile(inputPath);

52

// ... test logic

53

54

stopCluster();

55

}

56

}

57

```

58

59

## Batch Testing Base Classes

60

61

### JavaProgramTestBase

62

63

Base class for Java API program tests supporting multiple execution modes (cluster, collection, object reuse).

64

65

```java { .api }

66

public abstract class JavaProgramTestBase extends AbstractTestBase {

67

public JavaProgramTestBase();

68

public JavaProgramTestBase(Configuration config);

69

70

public void setParallelism(int parallelism);

71

public void setNumberOfTestRepetitions(int numberOfTestRepetitions);

72

public int getParallelism();

73

public JobExecutionResult getLatestExecutionResult();

74

public boolean isCollectionExecution();

75

76

protected abstract void testProgram() throws Exception;

77

protected void preSubmit() throws Exception;

78

protected void postSubmit() throws Exception;

79

protected boolean skipCollectionExecution();

80

81

@Test

82

public void testJobWithObjectReuse() throws Exception;

83

@Test

84

public void testJobWithoutObjectReuse() throws Exception;

85

@Test

86

public void testJobCollectionExecution() throws Exception;

87

}

88

```

89

90

**Usage Example:**

91

92

```java

93

public class WordCountTest extends JavaProgramTestBase {

94

95

@Override

96

protected void testProgram() throws Exception {

97

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

98

99

DataSet<String> text = env.fromElements(

100

"hello world",

101

"hello flink",

102

"world flink"

103

);

104

105

DataSet<Tuple2<String, Integer>> counts = text

106

.flatMap(new Tokenizer())

107

.groupBy(0)

108

.sum(1);

109

110

List<Tuple2<String, Integer>> result = counts.collect();

111

112

String expected = "flink,2\nhello,2\nworld,2";

113

compareResultAsTuples(result, expected);

114

}

115

116

@Override

117

protected void preSubmit() throws Exception {

118

// Setup before job execution

119

setParallelism(4);

120

}

121

122

@Override

123

protected boolean skipCollectionExecution() {

124

// Skip collection execution for this test

125

return false;

126

}

127

}

128

```

129

130

### MultipleProgramsTestBase

131

132

Base class for parameterized unit tests that run multiple tests and reuse the same Flink cluster across test methods.

133

134

```java { .api }

135

public class MultipleProgramsTestBase extends TestBaseUtils {

136

protected static final int DEFAULT_PARALLELISM = 4;

137

protected static boolean startWebServer = false;

138

protected static LocalFlinkMiniCluster cluster = null;

139

140

public MultipleProgramsTestBase(TestExecutionMode mode);

141

142

@Before

143

public void setupEnvironment();

144

@After

145

public void teardownEnvironment();

146

@BeforeClass

147

public static void setup() throws Exception;

148

@AfterClass

149

public static void teardown() throws Exception;

150

151

@Parameterized.Parameters(name = "Execution mode = {0}")

152

public static Collection<Object[]> executionModes();

153

}

154

155

public enum TestExecutionMode {

156

CLUSTER,

157

CLUSTER_OBJECT_REUSE,

158

COLLECTION

159

}

160

```

161

162

**Usage Example:**

163

164

```java

165

@RunWith(Parameterized.class)

166

public class ParameterizedTest extends MultipleProgramsTestBase {

167

168

public ParameterizedTest(TestExecutionMode mode) {

169

super(mode);

170

}

171

172

@Test

173

public void testWordCount() throws Exception {

174

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

175

176

// Test runs in the mode specified by constructor parameter

177

DataSet<String> input = env.fromElements("hello", "world", "hello");

178

List<String> result = input.distinct().collect();

179

180

compareResultAsText(result, "hello\nworld");

181

}

182

183

@Test

184

public void testFilter() throws Exception {

185

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

186

187

DataSet<Integer> numbers = env.fromElements(1, 2, 3, 4, 5);

188

List<Integer> evenNumbers = numbers.filter(x -> x % 2 == 0).collect();

189

190

compareResultAsText(evenNumbers, "2\n4");

191

}

192

}

193

```

194

195

## Streaming Testing Base Classes

196

197

### StreamingProgramTestBase

198

199

Abstract base class for streaming program tests.

200

201

```java { .api }

202

public abstract class StreamingProgramTestBase extends AbstractTestBase {

203

protected static final int DEFAULT_PARALLELISM = 4;

204

205

public StreamingProgramTestBase();

206

207

public void setParallelism(int parallelism);

208

public int getParallelism();

209

210

protected abstract void testProgram() throws Exception;

211

protected void preSubmit() throws Exception;

212

protected void postSubmit() throws Exception;

213

214

@Test

215

public void testJob() throws Exception;

216

}

217

```

218

219

**Usage Example:**

220

221

```java

222

public class StreamingWordCountTest extends StreamingProgramTestBase {

223

224

@Override

225

protected void testProgram() throws Exception {

226

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

227

env.setParallelism(getParallelism());

228

229

DataStream<String> text = env.fromElements(

230

"hello world",

231

"hello flink"

232

);

233

234

DataStream<Tuple2<String, Integer>> counts = text

235

.flatMap(new Tokenizer())

236

.keyBy(0)

237

.sum(1);

238

239

counts.print();

240

env.execute("Streaming WordCount");

241

}

242

243

@Override

244

protected void preSubmit() throws Exception {

245

setParallelism(2);

246

}

247

}

248

```

249

250

### StreamingMultipleProgramsTestBase

251

252

Base class for streaming unit tests that run multiple tests and reuse the same Flink cluster.

253

254

```java { .api }

255

public class StreamingMultipleProgramsTestBase extends AbstractTestBase {

256

protected static final int DEFAULT_PARALLELISM = 4;

257

protected static LocalFlinkMiniCluster cluster;

258

protected static final Logger LOG;

259

260

public StreamingMultipleProgramsTestBase();

261

262

@BeforeClass

263

public static void setup() throws Exception;

264

@AfterClass

265

public static void teardown() throws Exception;

266

}

267

```

268

269

**Usage Example:**

270

271

```java

272

public class StreamingIntegrationTest extends StreamingMultipleProgramsTestBase {

273

274

@Test

275

public void testStreamingMap() throws Exception {

276

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

277

278

DataStream<String> input = env.fromElements("a", "b", "c");

279

input.map(String::toUpperCase).print();

280

281

env.execute("Map Test");

282

}

283

284

@Test

285

public void testStreamingFilter() throws Exception {

286

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

287

288

DataStream<Integer> numbers = env.fromElements(1, 2, 3, 4, 5);

289

numbers.filter(x -> x > 3).print();

290

291

env.execute("Filter Test");

292

}

293

}

294

```

295

296

## Common Patterns

297

298

### Cluster Configuration

299

300

All base classes support configuration of cluster parameters:

301

302

```java

303

public MyTest() {

304

super(new Configuration());

305

setNumTaskManagers(2);

306

setTaskManagerNumSlots(8);

307

}

308

```

309

310

### Temporary File Management

311

312

Use the provided temporary file utilities for test data:

313

314

```java

315

String inputFile = createTempFile("test-input.txt", "test data content");

316

String outputDir = getTempDirPath("test-output");

317

```

318

319

### Test Lifecycle Hooks

320

321

Override lifecycle methods for custom setup/teardown:

322

323

```java

324

@Override

325

protected void preSubmit() throws Exception {

326

// Setup before job execution

327

}

328

329

@Override

330

protected void postSubmit() throws Exception {

331

// Validation after job execution

332

}

333

```