or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

client-testing.md connector-testing.md core-testing.md index.md migration-testing.md table-testing.md test-environments.md

docs/core-testing.md

0

# Core Testing and Synchronization

1

2

Essential testing utilities including thread synchronization, test assertions, and JUnit integration. These utilities provide the foundation for reliable Flink unit tests with deterministic behavior and enhanced assertion capabilities.

3

4

## Capabilities

5

6

### Thread Synchronization

7

8

Core synchronization primitives for coordinating test execution across multiple threads.

9

10

#### OneShotLatch

11

12

A synchronization latch that fires once and then stays triggered, releasing every thread that is waiting on it. It remains triggered until `reset()` is called.

13

14

```java { .api }

15

/**

16

* One-time synchronization latch for test coordination

17

* Fires once only, then remains triggered

18

*/

19

class OneShotLatch {

20

/** Fire the latch, releasing all waiting threads */

21

void trigger();

22

23

/** Wait until the latch is triggered */

24

void await() throws InterruptedException;

25

26

/** Wait until triggered with timeout */

27

void await(long timeout, TimeUnit timeUnit) throws InterruptedException;

28

29

/** Wait until triggered, converting InterruptedException to RuntimeException */

30

void awaitQuietly();

31

32

/** Check if the latch has been triggered */

33

boolean isTriggered();

34

35

/** Reset the latch to untriggered state */

36

void reset();

37

}

38

```

39

40

**Usage Examples:**

41

42

```java

43

import org.apache.flink.core.testutils.OneShotLatch;

44

45

// Basic synchronization

46

OneShotLatch latch = new OneShotLatch();

47

48

// In test thread: wait for background task

49

Thread worker = new Thread(() -> {

50

// Do some work

51

latch.trigger(); // Signal completion

52

});

53

worker.start();

54

latch.await(); // Wait for worker to complete

55

56

// With timeout

57

if (!latch.await(5, TimeUnit.SECONDS)) {

58

fail("Worker did not complete within timeout");

59

}

60

```

61

62

#### MultiShotLatch

63

64

A reusable synchronization latch that automatically resets after each `await()` call, which makes it suitable for repeated coordination between threads.

65

66

```java { .api }

67

/**

68

* Reusable synchronization latch that resets after each await

69

* Useful for repeated coordination patterns

70

*/

71

class MultiShotLatch {

72

/** Fire the latch for the next waiting thread */

73

void trigger();

74

75

/** Wait until triggered, then automatically reset */

76

void await() throws InterruptedException;

77

78

/** Check if the latch is currently triggered */

79

boolean isTriggered();

80

}

81

```

82

83

**Usage Examples:**

84

85

```java

86

import org.apache.flink.core.testutils.MultiShotLatch;

87

88

MultiShotLatch latch = new MultiShotLatch();

89

90

// Producer-consumer pattern

91

Thread producer = new Thread(() -> {

92

for (int i = 0; i < 10; i++) {

93

// Produce item

94

latch.trigger(); // Signal item ready

95

}

96

});

97

98

Thread consumer = new Thread(() -> {

99

for (int i = 0; i < 10; i++) {

100

latch.await(); // Wait for next item

101

// Consume item

102

}

103

});

104

```

105

106

#### CheckedThread

107

108

A thread wrapper that captures any exception thrown by `go()` while the thread runs and re-throws it when the thread is joined via `sync()`.

109

110

```java { .api }

111

/**

112

* Thread that catches exceptions and re-throws them on join

113

* Useful for testing error conditions in background threads

114

*/

115

abstract class CheckedThread extends Thread {

116

/** Override this method instead of run() - exceptions will be captured */

117

abstract void go() throws Exception;

118

119

/** Join the thread and re-throw any captured exceptions */

120

void sync() throws Exception;

121

122

/** Join with timeout and re-throw any captured exceptions */

123

void sync(long timeoutMillis) throws Exception;

124

}

125

```

126

127

**Usage Examples:**

128

129

```java

130

import org.apache.flink.core.testutils.CheckedThread;

131

132

CheckedThread testThread = new CheckedThread() {

133

@Override

134

void go() throws Exception {

135

// Test logic that might throw exceptions

136

if (someCondition) {

137

throw new RuntimeException("Test failure");

138

}

139

}

140

};

141

142

testThread.start();

143

testThread.sync(); // Will re-throw any exceptions from go()

144

```

145

146

### Test Utilities

147

148

General utility methods for common testing scenarios and operations.

149

150

#### CommonTestUtils

151

152

Collection of static utility methods for common test operations.

153

154

```java { .api }

155

/**

156

* General utility methods for unit tests

157

*/

158

class CommonTestUtils {

159

/** Create a serialized copy of an object for testing serialization */

160

static <T> T createCopySerializable(T original) throws IOException, ClassNotFoundException;

161

162

/** Create a temporary file with the specified contents */

163

static String createTempFile(String contents) throws IOException;

164

165

/** Block the current thread permanently (for testing interruption) */

166

static void blockForeverNonInterruptibly();

167

168

/** Set environment variables for testing */

169

static void setEnv(Map<String, String> newenv);

170

171

/** Check if exception chain contains a specific cause type */

172

static boolean containsCause(Throwable throwable, Class<? extends Throwable> cause);

173

174

/** Wait until a condition becomes true or timeout expires */

175

static void waitUtil(Supplier<Boolean> condition, Duration timeout, String errorMsg)

176

throws Exception;

177

}

178

```

179

180

**Usage Examples:**

181

182

```java

183

import org.apache.flink.core.testutils.CommonTestUtils;

184

185

// Test serialization

186

MyObject original = new MyObject();

187

MyObject copy = CommonTestUtils.createCopySerializable(original);

188

assertEquals(original, copy);

189

190

// Wait for condition

191

CommonTestUtils.waitUtil(

192

() -> service.isReady(),

193

Duration.ofSeconds(10),

194

"Service did not become ready"

195

);

196

197

// Check exception chain

198

try {

199

riskyOperation();

200

} catch (Exception e) {

201

assertTrue(CommonTestUtils.containsCause(e, IOException.class));

202

}

203

```

204

205

### Enhanced Assertions

206

207

Flink-specific assertion utilities that extend AssertJ with specialized testing capabilities.

208

209

#### FlinkAssertions

210

211

Static factory methods for creating Flink-specific assertions.

212

213

```java { .api }

214

/**

215

* Enhanced AssertJ assertions for Flink testing

216

*/

217

class FlinkAssertions extends Assertions {

218

/** Create enhanced assertions for CompletableFuture */

219

static <T> FlinkCompletableFutureAssert<T> assertThatFuture(CompletableFuture<T> actual);

220

221

/** Create assertions for exception cause chains */

222

static ListAssert<Throwable> assertThatChainOfCauses(Throwable root);

223

224

/** Extract chain of causes from exception */

225

static Stream<Throwable> chainOfCauses(Throwable throwable);

226

}

227

```

228

229

#### FlinkCompletableFutureAssert

230

231

Enhanced CompletableFuture assertions that don't rely on timeouts.

232

233

```java { .api }

234

/**

235

* Enhanced CompletableFuture assertions without timeout dependencies

236

*/

237

class FlinkCompletableFutureAssert<T>

238

extends AbstractCompletableFutureAssert<FlinkCompletableFutureAssert<T>, CompletableFuture<T>, T> {

239

240

/** Assert that the future eventually succeeds */

241

FlinkCompletableFutureAssert<T> eventuallySucceeds();

242

243

/** Assert that the future eventually fails */

244

FlinkCompletableFutureAssert<T> eventuallyFails();

245

246

/** Assert that the future completes with a specific value */

247

FlinkCompletableFutureAssert<T> eventuallySucceeds(T expectedValue);

248

}

249

```

250

251

**Usage Examples:**

252

253

```java

254

import org.apache.flink.core.testutils.FlinkAssertions;

255

256

// Future assertions

257

CompletableFuture<String> result = asyncOperation();

258

FlinkAssertions.assertThatFuture(result)

259

.eventuallySucceeds()

260

.isEqualTo("expected result");

261

262

// Exception chain assertions

263

try {

264

complexOperation();

265

} catch (Exception e) {

266

FlinkAssertions.assertThatChainOfCauses(e)

267

.hasSize(3)

268

.extracting(Throwable::getClass)

269

.contains(IOException.class, RuntimeException.class);

270

}

271

```

272

273

### JUnit Integration

274

275

Specialized JUnit rules and extensions for Flink testing scenarios.

276

277

#### RetryRule and Annotations

278

279

JUnit rule for automatically retrying failed tests with configurable conditions.

280

281

```java { .api }

282

/**

283

* JUnit rule to retry failed tests

284

* Use with @RetryOnFailure or @RetryOnException annotations

285

*/

286

class RetryRule implements TestRule {

287

TestRule apply(Statement base, Description description);

288

}

289

290

/**

291

* Retry test on any failure

292

*/

293

@interface RetryOnFailure {

294

/** Number of retry attempts */

295

int times() default 3;

296

}

297

298

/**

299

* Retry test on specific exception types

300

*/

301

@interface RetryOnException {

302

/** Number of retry attempts */

303

int times() default 3;

304

305

/** Exception type to retry on */

306

Class<? extends Throwable> exception();

307

}

308

```

309

310

**Usage Examples:**

311

312

```java

313

import org.apache.flink.testutils.junit.RetryRule;

314

import org.apache.flink.testutils.junit.RetryOnFailure;

315

import org.apache.flink.testutils.junit.RetryOnException;

316

317

public class FlinkRetryTest {

318

@Rule

319

public RetryRule retryRule = new RetryRule();

320

321

@Test

322

@RetryOnFailure(times = 5)

323

public void testWithRetryOnAnyFailure() {

324

// Test that might fail intermittently

325

if (Math.random() < 0.5) {

326

fail("Random failure");

327

}

328

}

329

330

@Test

331

@RetryOnException(times = 3, exception = IOException.class)

332

public void testWithRetryOnSpecificException() throws IOException {

333

// Test that might throw IOException

334

if (networkUnavailable()) {

335

throw new IOException("Network error");

336

}

337

}

338

}

339

```

340

341

#### SharedObjects

342

343

JUnit rule for sharing objects across test methods within a test class.

344

345

```java { .api }

346

/**

347

* Share objects across test methods in the same test class

348

*/

349

class SharedObjects extends ExternalResource {

350

/** Add an object to be shared across test methods */

351

<T> SharedReference<T> add(T object);

352

}

353

354

/**

355

* Reference to a shared object

356

*/

357

interface SharedReference<T> {

358

T get();

359

}

360

```

361

362

**Usage Examples:**

363

364

```java

365

import org.apache.flink.testutils.junit.SharedObjects;

366

367

public class SharedObjectTest {

368

@Rule

369

public SharedObjects sharedObjects = new SharedObjects();

370

371

private SharedReference<ExpensiveResource> resource;

372

373

@Before

374

public void setup() {

375

if (resource == null) {

376

resource = sharedObjects.add(new ExpensiveResource());

377

}

378

}

379

380

@Test

381

public void test1() {

382

ExpensiveResource r = resource.get();

383

// Use shared resource

384

}

385

386

@Test

387

public void test2() {

388

ExpensiveResource r = resource.get(); // Same instance as test1

389

// Use shared resource

390

}

391

}

392

```

393

394

### Executor Testing

395

396

Utilities for testing with controlled thread execution and scheduling.

397

398

#### ManuallyTriggeredScheduledExecutorService

399

400

Executor service that requires manual triggering for deterministic testing.

401

402

```java { .api }

403

/**

404

* Manually controlled executor for deterministic testing

405

* Tasks are queued but not executed until manually triggered

406

*/

407

class ManuallyTriggeredScheduledExecutorService implements ScheduledExecutorService {

408

/** Trigger execution of all queued tasks */

409

void triggerAll();

410

411

/** Trigger execution of the next queued task */

412

void triggerNext();

413

414

/** Get number of queued tasks */

415

int getQueueSize();

416

417

/** Check if any tasks are queued */

418

boolean hasQueuedTasks();

419

}

420

```

421

422

**Usage Examples:**

423

424

```java

425

import org.apache.flink.core.testutils.ManuallyTriggeredScheduledExecutorService;

426

427

ManuallyTriggeredScheduledExecutorService executor =

428

new ManuallyTriggeredScheduledExecutorService();

429

430

// Schedule tasks

431

executor.schedule(() -> System.out.println("Task 1"), 1, TimeUnit.SECONDS);

432

executor.schedule(() -> System.out.println("Task 2"), 2, TimeUnit.SECONDS);

433

434

// Tasks are queued but not executed yet

435

assertEquals(2, executor.getQueueSize());

436

437

// Manually trigger execution

438

executor.triggerNext(); // Executes "Task 1"

439

executor.triggerAll(); // Executes remaining tasks

440

```

441

442

### Test Marker Annotations

443

444

Annotations for marking tests that have known compatibility issues with specific environments.

445

446

```java { .api }

447

/** Mark tests that fail on Java 11 */

448

@interface FailsOnJava11 {}

449

450

/** Mark tests that fail on Java 17 */

451

@interface FailsOnJava17 {}

452

453

/** Mark tests that fail with adaptive scheduler */

454

@interface FailsWithAdaptiveScheduler {}

455

```

456

457

**Usage Examples:**

458

459

```java

460

import org.apache.flink.testutils.junit.FailsOnJava11;

461

import org.apache.flink.testutils.junit.FailsWithAdaptiveScheduler;

462

463

public class CompatibilityTest {

464

@Test

465

@FailsOnJava11

466

public void testThatFailsOnJava11() {

467

// Test implementation

468

}

469

470

@Test

471

@FailsWithAdaptiveScheduler

472

public void testThatFailsWithAdaptiveScheduler() {

473

// Test implementation

474

}

475

}