or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

index.md security-testing.md test-base-classes.md test-data.md test-environments.md test-utilities.md

test-utilities.md docs/

0

# Test Utilities

1

2

TestBaseUtils provides comprehensive utilities for Flink testing including cluster management, result comparison, file I/O operations, and test data handling.

3

4

## Cluster Management

5

6

### Starting and Stopping Clusters

7

8

```java { .api }

9

public class TestBaseUtils extends TestLogger {

10

public static FiniteDuration DEFAULT_TIMEOUT;

11

12

// Start a mini cluster with full configuration

13

public static LocalFlinkMiniCluster startCluster(int numTaskManagers, int taskManagerNumSlots,

14

boolean startWebserver, boolean startZooKeeper,

15

boolean singleActorSystem) throws Exception;

16

17

// Start cluster with configuration object

18

public static LocalFlinkMiniCluster startCluster(Configuration config, boolean singleActorSystem) throws Exception;

19

20

// Stop cluster with timeout

21

public static void stopCluster(LocalFlinkMiniCluster executor, FiniteDuration timeout) throws Exception;

22

}

23

```

24

25

**Usage Example:**

26

27

```java

28

// Start a cluster with 2 task managers, 4 slots each

29

LocalFlinkMiniCluster cluster = TestBaseUtils.startCluster(2, 4, false, false, true);

30

31

try {

32

// Use cluster for testing

33

TestEnvironment testEnv = new TestEnvironment(cluster, 4, false);

34

// ... run tests

35

} finally {

36

// Always clean up

37

TestBaseUtils.stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);

38

}

39

```

40

41

## Result Reading and File I/O

42

43

### Reading Test Results

44

45

```java { .api }

46

// Get readers for result files

47

public static BufferedReader[] getResultReader(String resultPath) throws IOException;

48

public static BufferedReader[] getResultReader(String resultPath, String[] excludePrefixes,

49

boolean inOrderOfFiles) throws IOException;

50

51

// Get input streams for result files

52

public static BufferedInputStream[] getResultInputStream(String resultPath) throws IOException;

53

public static BufferedInputStream[] getResultInputStream(String resultPath, String[] excludePrefixes) throws IOException;

54

55

// Read all result lines into a list

56

public static void readAllResultLines(List<String> target, String resultPath) throws IOException;

57

public static void readAllResultLines(List<String> target, String resultPath, String[] excludePrefixes) throws IOException;

58

public static void readAllResultLines(List<String> target, String resultPath, String[] excludePrefixes,

59

boolean inOrderOfFiles) throws IOException;

60

```

61

62

**Usage Example:**

63

64

```java

65

// Read results from output directory

66

List<String> results = new ArrayList<>();

67

TestBaseUtils.readAllResultLines(results, "/path/to/output");

68

69

// Read results, skipping files whose names start with one of the given prefixes

70

String[] excludePatterns = {"_logs", ".log"};

71

TestBaseUtils.readAllResultLines(results, "/path/to/output", excludePatterns);

72

73

// Process results

74

for (String line : results) {

75

System.out.println("Result: " + line);

76

}

77

```

78

79

## Result Comparison and Validation

80

81

### Text-based Result Comparison

82

83

```java { .api }

84

// Compare results line by line (order doesn't matter)

85

public static void compareResultsByLinesInMemory(String expectedResultStr, String resultPath) throws Exception;

86

public static void compareResultsByLinesInMemory(String expectedResultStr, String resultPath,

87

String[] excludePrefixes) throws Exception;

88

89

// Compare results with strict ordering

90

public static void compareResultsByLinesInMemoryWithStrictOrder(String expectedResultStr, String resultPath) throws Exception;

91

public static void compareResultsByLinesInMemoryWithStrictOrder(String expectedResultStr, String resultPath,

92

String[] excludePrefixes) throws Exception;

93

94

// Check results against regular expression

95

public static void checkLinesAgainstRegexp(String resultPath, String regexp);

96

```

97

98

**Usage Example:**

99

100

```java

101

// Compare unordered results

102

String expected = "apple\nbanana\ncherry";

103

TestBaseUtils.compareResultsByLinesInMemory(expected, "/path/to/results");

104

105

// Compare with strict ordering

106

TestBaseUtils.compareResultsByLinesInMemoryWithStrictOrder(expected, "/path/to/results");

107

108

// Validate format with regex

109

TestBaseUtils.checkLinesAgainstRegexp("/path/to/results", "\\d+,\\w+");

110

```

111

112

### Collection-based Result Comparison

113

114

```java { .api }

115

// Compare collected results as tuples

116

public static <T> void compareResultAsTuples(List<T> result, String expected);

117

118

// Compare collected results as text

119

public static <T> void compareResultAsText(List<T> result, String expected);

120

121

// Compare with strict ordering

122

public static <T> void compareOrderedResultAsText(List<T> result, String expected);

123

public static <T> void compareOrderedResultAsText(List<T> result, String expected, boolean asTuples);

124

125

// Check if results contain expected values

126

public static <T> void containsResultAsText(List<T> result, String expected);

127

128

// Generic comparison with custom comparator

129

public static <X> void compareResultCollections(List<X> expected, List<X> actual, Comparator<X> comparator);

130

```

131

132

**Usage Example:**

133

134

```java

135

// Test word count results

136

DataSet<Tuple2<String, Integer>> wordCounts = // ... your computation

137

List<Tuple2<String, Integer>> results = wordCounts.collect();

138

139

String expected = "apple,3\nbanana,1\ncherry,2";

140

TestBaseUtils.compareResultAsTuples(results, expected);

141

142

// Test simple string results

143

DataSet<String> words = // ... your computation

144

List<String> wordList = words.collect();

145

146

String expectedWords = "apple\nbanana\ncherry";

147

TestBaseUtils.compareResultAsText(wordList, expectedWords);

148

```

149

150

### Numerical Result Comparison

151

152

```java { .api }

153

// Compare key-value pairs with delta tolerance

154

public static void compareKeyValuePairsWithDelta(String expectedLines, String resultPath,

155

String delimiter, double maxDelta) throws Exception;

156

public static void compareKeyValuePairsWithDelta(String expectedLines, String resultPath, String[] excludePrefixes,

157

String delimiter, double maxDelta) throws Exception;

158

```

159

160

**Usage Example:**

161

162

```java

163

// Compare floating-point results with tolerance

164

String expected = "pi,3.14159\ne,2.71828";

165

TestBaseUtils.compareKeyValuePairsWithDelta(expected, "/path/to/results", ",", 0.001);

166

```

167

168

## Utility Methods

169

170

### Test Configuration

171

172

```java { .api }

173

// Convert configurations to parameterized test parameters

174

protected static Collection<Object[]> toParameterList(Configuration... testConfigs);

175

protected static Collection<Object[]> toParameterList(List<Configuration> testConfigs);

176

177

// Set environment variables for testing

178

public static void setEnv(Map<String, String> newenv);

179

```

180

181

**Usage Example:**

182

183

```java

184

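// Create parameterized test configurations (toParameterList is protected, so call it from a subclass of TestBaseUtils or a class in its package)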

185

Configuration config1 = new Configuration();

186

config1.setString("key1", "value1");

187

188

Configuration config2 = new Configuration();

189

config2.setString("key2", "value2");

190

191

@Parameterized.Parameters

192

public static Collection<Object[]> getConfigurations() {

193

return TestBaseUtils.toParameterList(config1, config2);

194

}

195

196

// Set test environment variables

197

Map<String, String> testEnv = new HashMap<>();

198

testEnv.put("FLINK_HOME", "/test/flink");

199

TestBaseUtils.setEnv(testEnv);

200

```

201

202

### Path and URL Utilities

203

204

```java { .api }

205

// Construct test resource paths

206

public static String constructTestPath(Class<?> forClass, String folder);

207

public static String constructTestURI(Class<?> forClass, String folder);

208

209

// Fetch content from HTTP endpoint

210

public static String getFromHTTP(String url) throws Exception;

211

```

212

213

**Usage Example:**

214

215

```java

216

// Get test data path relative to test class

217

String testDataPath = TestBaseUtils.constructTestPath(MyTest.class, "testdata");

218

String inputFile = testDataPath + "/input.txt";

219

220

// Construct test resource URI

221

String testDataURI = TestBaseUtils.constructTestURI(MyTest.class, "resources");

222

223

// Fetch test data from web endpoint (for integration tests)

224

String jsonData = TestBaseUtils.getFromHTTP("http://localhost:8080/test-data");

225

```

226

227

## Helper Classes

228

229

### TupleComparator

230

231

```java { .api }

232

public static class TupleComparator<T extends Tuple> implements Comparator<T> {

233

// Compares tuples field by field for consistent ordering

234

}

235

```

236

237

**Usage Example:**

238

239

```java

240

// Sort tuple results for consistent comparison

241

List<Tuple2<String, Integer>> results = wordCounts.collect();

242

results.sort(new TestBaseUtils.TupleComparator<>());

243

244

// Now compare with expected results

245

String expected = "apple,1\nbanana,2\ncherry,3";

246

TestBaseUtils.compareResultAsTuples(results, expected);

247

```

248

249

## Testing Utilities

250

251

### CheckedThread

252

253

Thread utility that propagates exceptions from background threads to the main test thread.

254

255

```java { .api }

256

/**

257

* Thread that captures exceptions during execution and makes them available to the calling thread

258

*/

259

public abstract class CheckedThread extends Thread {

260

public CheckedThread();

261

public CheckedThread(String name);

262

263

// Main work method - implement this instead of run()

264

public abstract void go() throws Exception;

265

266

// Join thread and re-throw any exceptions that occurred

267

public void sync() throws Exception;

268

}

269

```

270

271

**Usage Example:**

272

273

```java

274

@Test

275

public void testConcurrentOperations() throws Exception {

276

CheckedThread worker = new CheckedThread("test-worker") {

277

@Override

278

public void go() throws Exception {

279

// This code runs in background thread

280

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

281

DataSet<String> result = env.fromElements("test").map(String::toUpperCase);

282

List<String> output = result.collect();

283

284

// Any exception thrown here is captured and rethrown in the calling thread when sync() is invoked

285

assertEquals("Unexpected result", "TEST", output.get(0));

286

}

287

};

288

289

worker.start();

290

291

// This will throw any exception that occurred in the background thread

292

worker.sync();

293

}

294

```

295

296

### Retry Mechanisms

297

298

JUnit rules and annotations for automatically retrying failed tests.

299

300

#### RetryRule

301

302

```java { .api }

303

/**

304

* JUnit rule that enables automatic test retries based on annotations

305

*/

306

public class RetryRule implements TestRule {

307

public RetryRule();

308

public Statement apply(Statement base, Description description);

309

}

310

```

311

312

#### RetryOnFailure

313

314

```java { .api }

315

/**

316

* Annotation to retry tests that fail for any reason

317

*/

318

public @interface RetryOnFailure {

319

int times(); // Number of retry attempts

320

}

321

```

322

323

#### RetryOnException

324

325

```java { .api }

326

/**

327

* Annotation to retry tests that fail with specific exception types

328

*/

329

public @interface RetryOnException {

330

int times(); // Number of retry attempts

331

Class<? extends Throwable> exception(); // Exception type to retry on

332

}

333

```

334

335

**Usage Example:**

336

337

```java

338

public class FlakeyTest {

339

@Rule

340

public RetryRule retryRule = new RetryRule();

341

342

@Test

343

@RetryOnFailure(times = 3)

344

public void testUnstableOperation() throws Exception {

345

// Test that might fail due to timing issues

346

// Will be retried up to 3 times if it fails

347

performUnstableOperation();

348

}

349

350

@Test

351

@RetryOnException(times = 2, exception = ConnectException.class)

352

public void testNetworkOperation() throws Exception {

353

// Test that might fail due to network connectivity

354

// Will be retried up to 2 times if a ConnectException occurs

355

connectToExternalService();

356

}

357

}

358

```

359

360

### TestLogger

361

362

Base class that provides automatic logging for test execution.

363

364

```java { .api }

365

/**

366

* Base test class with automatic test lifecycle logging

367

*/

368

public class TestLogger {

369

protected final Logger log; // Logger instance for test class

370

371

@Rule

372

public TestRule watchman; // Automatic test logging rule

373

}

374

```

375

376

**Usage Example:**

377

378

```java

379

public class MyTest extends TestLogger {

380

@Test

381

public void testWithLogging() {

382

// Logging is automatically available

383

log.info("Starting test execution");

384

385

// Test logic here

386

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

387

// ...

388

389

log.info("Test completed successfully");

390

// Test start/end automatically logged by watchman rule

391

}

392

}

393

```

394

395

### Common Test Utilities

396

397

```java { .api }

398

/**

399

* Common testing utilities for environment setup and validation

400

*/

401

public class CommonTestUtils {

402

// Environment utilities

403

public static String getTempDir();

404

public static void setEnv(Map<String, String> newenv);

405

public static void setEnv(Map<String, String> newenv, boolean clearPrevious);

406

407

// Serialization testing

408

public static <T extends Serializable> T createCopySerializable(T original);

409

410

// File utilities

411

public static String createTempFile(String contents);

412

413

// Threading utilities

414

public static void blockForeverNonInterruptibly();

415

416

// Environment checks

417

public static void assumeJava8();

418

419

// Exception utilities

420

public static boolean containsCause(Throwable throwable, Class<? extends Throwable> causeType);

421

}

422

```

423

424

**Usage Example:**

425

426

```java

427

@Test

428

public void testSerializationRoundtrip() throws Exception {

429

MySerializableClass original = new MySerializableClass("test");

430

431

// Test that object survives serialization/deserialization

432

MySerializableClass copy = CommonTestUtils.createCopySerializable(original);

433

assertEquals("Data should survive serialization", original.getData(), copy.getData());

434

}

435

436

@Test

437

public void testWithTempEnvironment() throws Exception {

438

// Set up temporary environment variables

439

Map<String, String> testEnv = new HashMap<>();

440

testEnv.put("TEST_MODE", "true");

441

testEnv.put("LOG_LEVEL", "DEBUG");

442

443

CommonTestUtils.setEnv(testEnv);

444

445

// Run test with modified environment

446

// Environment will be restored after test

447

}

448

```

449

450

## Common Testing Patterns

451

452

### Complete Test Setup

453

454

```java

455

public class IntegrationTest {

456

private LocalFlinkMiniCluster cluster;

457

458

@Before

459

public void setup() throws Exception {

460

cluster = TestBaseUtils.startCluster(1, 4, false, false, true);

461

}

462

463

@After

464

public void teardown() throws Exception {

465

if (cluster != null) {

466

TestBaseUtils.stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);

467

}

468

}

469

470

@Test

471

public void testJobExecution() throws Exception {

472

TestEnvironment testEnv = new TestEnvironment(cluster, 4, false);

473

testEnv.setAsContext();

474

475

try {

476

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

477

DataSet<String> result = env.fromElements("test").map(String::toUpperCase);

478

List<String> output = result.collect();

479

480

TestBaseUtils.compareResultAsText(output, "TEST");

481

} finally {

482

TestEnvironment.unsetAsContext();

483

}

484

}

485

}

486

```

487

488

### File-based Testing

489

490

```java

491

@Test

492

public void testFileProcessing() throws Exception {

493

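// Write test input (createTempFile(name, contents) and getTempDirPath are assumed to be inherited from a test base class such as AbstractTestBase - confirm)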

494

String inputPath = createTempFile("input.txt", "line1\nline2\nline3");

495

String outputPath = getTempDirPath("output");

496

497

// Run job

498

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

499

env.readTextFile(inputPath)

500

.map(String::toUpperCase)

501

.writeAsText(outputPath);

502

env.execute();

503

504

// Validate results

505

String expected = "LINE1\nLINE2\nLINE3";

506

TestBaseUtils.compareResultsByLinesInMemory(expected, outputPath);

507

}

508

```