or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

client-testing.md connector-testing.md core-testing.md index.md migration-testing.md table-testing.md test-environments.md

index.md docs/

0

# Apache Flink Test Utils Parent

1

2

Apache Flink Test Utils Parent is a comprehensive collection of testing utilities for applications built on the Apache Flink stream processing framework. This multi-module Maven project provides everything needed to test Flink applications effectively, from basic unit testing with synchronization utilities to complex connector testing, migration testing, and specialized testing for individual Flink components.

3

4

## Package Information

5

6

- **Package Name**: flink-test-utils-parent

7

- **Package Type**: Maven (multi-module parent)

8

- **Language**: Java

9

- **Installation**:

10

```xml

11

<!-- Core testing utilities -->

12

<dependency>

13

<groupId>org.apache.flink</groupId>

14

<artifactId>flink-test-utils</artifactId>

15

<version>2.1.0</version>

16

<scope>test</scope>

17

</dependency>

18

19

<!-- JUnit integration -->

20

<dependency>

21

<groupId>org.apache.flink</groupId>

22

<artifactId>flink-test-utils-junit</artifactId>

23

<version>2.1.0</version>

24

<scope>test</scope>

25

</dependency>

26

27

<!-- Connector testing framework -->

28

<dependency>

29

<groupId>org.apache.flink</groupId>

30

<artifactId>flink-connector-test-utils</artifactId>

31

<version>2.1.0</version>

32

<scope>test</scope>

33

</dependency>

34

35

<!-- Migration testing -->

36

<dependency>

37

<groupId>org.apache.flink</groupId>

38

<artifactId>flink-migration-test-utils</artifactId>

39

<version>2.1.0</version>

40

<scope>test</scope>

41

</dependency>

42

43

<!-- Client testing -->

44

<dependency>

45

<groupId>org.apache.flink</groupId>

46

<artifactId>flink-clients-test-utils</artifactId>

47

<version>2.1.0</version>

48

<scope>test</scope>

49

</dependency>

50

51

<!-- Table filesystem testing -->

52

<dependency>

53

<groupId>org.apache.flink</groupId>

54

<artifactId>flink-table-filesystem-test-utils</artifactId>

55

<version>2.1.0</version>

56

<scope>test</scope>

57

</dependency>

58

```

59

60

## Core Imports

61

62

```java

63

// Core test synchronization (flink-test-utils-junit)

64

import org.apache.flink.core.testutils.OneShotLatch;

65

import org.apache.flink.core.testutils.CheckedThread;

66

import org.apache.flink.test.junit5.MiniClusterExtension;

67

68

// Test utilities (flink-test-utils)

69

import org.apache.flink.streaming.util.TestStreamEnvironment;

70

import org.apache.flink.streaming.util.FiniteTestSource;

71

import org.apache.flink.streaming.util.TestListResultSink;

72

import org.apache.flink.util.MetricListener;

73

import org.apache.flink.util.MetricAssertions;

74

75

// JUnit 5 integration (flink-test-utils-junit)

76

import org.apache.flink.testutils.junit.extensions.TestLoggerExtension;

77

import org.apache.flink.testutils.junit.extensions.RetryExtension;

78

import org.apache.flink.core.testutils.FlinkAssertions;

79

import org.apache.flink.core.testutils.ManuallyTriggeredScheduledExecutorService;

80

81

// Connector testing framework (flink-connector-test-utils)

82

import org.apache.flink.connector.testframe.junit.extensions.ConnectorTestingExtension;

83

import org.apache.flink.connector.testframe.testsuites.SourceTestSuiteBase;

84

import org.apache.flink.connector.testframe.testsuites.SinkTestSuiteBase;

85

import org.apache.flink.connector.testframe.environment.TestEnvironment;

86

import org.apache.flink.connector.testframe.environment.MiniClusterTestEnvironment;

87

88

// Migration testing (flink-migration-test-utils)

89

import org.apache.flink.test.migration.MigrationTest;

90

import org.apache.flink.test.migration.PublishedVersionUtils;

91

92

// Client testing (flink-clients-test-utils)

93

import org.apache.flink.client.testjar.TestUserClassLoaderJob;

94

95

// Table filesystem testing (flink-table-filesystem-test-utils)

96

import org.apache.flink.table.file.testutils.TestFileSystemTableFactory;

97

import org.apache.flink.table.file.testutils.catalog.TestFileSystemCatalog;

98

```

99

100

## Basic Usage

101

102

```java

103

import org.apache.flink.core.testutils.OneShotLatch;

104

import org.apache.flink.streaming.util.TestStreamEnvironment;

105

import org.apache.flink.streaming.util.FiniteTestSource;

106

import org.apache.flink.streaming.util.TestListResultSink;

107

import org.apache.flink.test.junit5.MiniClusterExtension;

108

import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;

109

import org.junit.jupiter.api.Test;

110

import org.junit.jupiter.api.extension.RegisterExtension;

111

112

public class FlinkTestExample {

113

@RegisterExtension

114

static final MiniClusterExtension MINI_CLUSTER = new MiniClusterExtension(

115

new MiniClusterResourceConfiguration.Builder()

116

.setNumberTaskManagers(1)

117

.setNumberSlotsPerTaskManager(4)

118

.build());

119

120

@Test

121

public void testWithSynchronization() throws InterruptedException {

122

OneShotLatch latch = new OneShotLatch();

123

124

// Start background task

125

Thread worker = new Thread(() -> {

126

// Do some work

127

latch.trigger(); // Signal completion

128

});

129

worker.start();

130

131

// Wait for completion

132

latch.await();

133

worker.join();

134

}

135

136

@Test

137

public void testStreamingPipeline() throws Exception {

138

// Set up test environment

139

TestStreamEnvironment.setAsContext(MINI_CLUSTER.getMiniCluster(), 1);

140

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

141

142

// Create test source with finite data

143

FiniteTestSource<String> source = new FiniteTestSource<>("hello", "world", "test");

144

DataStream<String> stream = env.addSource(source);

145

146

// Create test sink to collect results

147

TestListResultSink<String> sink = new TestListResultSink<>();

148

stream.addSink(sink);

149

150

// Execute and verify results

151

env.execute("Test Job");

152

List<String> results = sink.getResult();

153

assertEquals(3, results.size());

154

assertTrue(results.contains("hello"));

155

}

156

}

157

```

158

159

## Architecture

160

161

The Flink Test Utils Parent is organized into six specialized modules, each targeting specific testing needs:

162

163

- **Core Testing Foundation**: Basic synchronization primitives, assertion utilities, and JUnit extensions

164

- **Test Environment Management**: Stream environments, test data sources, and execution contexts

165

- **Connector Testing Framework**: Comprehensive testing infrastructure for Flink connectors

166

- **Migration & Compatibility**: Tools for testing state migration and version compatibility

167

- **Specialized Testing**: Table API filesystem testing and client application testing

168

- **Test Data & Utilities**: Pre-built test datasets and utility functions

169

170

## Capabilities

171

172

### Core Testing and Synchronization

173

174

Essential testing utilities, including thread synchronization, test assertions, and JUnit integration. These utilities provide the foundation for reliable Flink unit tests.

175

176

```java { .api }

177

// Thread synchronization (flink-test-utils-junit)

178

class OneShotLatch {

179

void trigger();

180

void await() throws InterruptedException;

181

boolean await(long timeout, TimeUnit timeUnit) throws InterruptedException, TimeoutException;

182

void awaitQuietly();

183

boolean isTriggered();

184

void reset();

185

}

186

187

abstract class CheckedThread extends Thread {

188

abstract void go() throws Exception;

189

void sync() throws Exception;

190

void sync(long timeoutMillis) throws Exception;

191

}

192

193

// Enhanced assertions (flink-test-utils-junit)

194

class FlinkAssertions {

195

static <T> FlinkCompletableFutureAssert<T> assertThatFuture(CompletableFuture<T> actual);

196

static Stream<Throwable> chainOfCauses(Throwable throwable);

197

}

198

199

class FlinkCompletableFutureAssert<T> extends AbstractCompletableFutureAssert<FlinkCompletableFutureAssert<T>, CompletableFuture<T>, T> {

200

FlinkCompletableFutureAssert<T> eventuallySucceeds();

201

FlinkCompletableFutureAssert<T> eventuallyFails();

202

FlinkCompletableFutureAssert<T> eventuallySucceeds(T expectedValue);

203

}

204

205

// Manual executor (flink-test-utils-junit)

206

class ManuallyTriggeredScheduledExecutorService implements ScheduledExecutorService {

207

void triggerAll();

208

void triggerScheduledTasks();

209

void triggerPeriodicScheduledTasks();

210

Collection<ScheduledTask<?>> getScheduledTasks();

211

int getNumQueuedRunnables();

212

}

213

```

214

215

[Core Testing and Synchronization](./core-testing.md)

216

217

### Test Environments and Data Sources

218

219

Test execution environments, data sources, and utilities for creating controlled testing scenarios in Flink applications.

220

221

```java { .api }

222

// Test environments (flink-test-utils)

223

class TestStreamEnvironment extends StreamExecutionEnvironment {

224

TestStreamEnvironment(MiniCluster miniCluster, int parallelism);

225

TestStreamEnvironment(MiniCluster miniCluster, Configuration config, int parallelism,

226

Collection<Path> jarFiles, Collection<URL> classPaths);

227

static void setAsContext(MiniCluster miniCluster, int parallelism);

228

static void setAsContext(MiniCluster miniCluster, int parallelism,

229

Collection<Path> jarFiles, Collection<URL> classpaths);

230

static void unsetAsContext();

231

JobExecutionResult getLastJobExecutionResult();

232

}

233

234

// MiniCluster extension (flink-test-utils)

235

class MiniClusterExtension implements BeforeAllCallback, AfterAllCallback, BeforeEachCallback,

236

AfterEachCallback, ParameterResolver {

237

MiniClusterExtension(MiniClusterResourceConfiguration configuration);

238

ClusterClient<?> getClusterClient();

239

URI getRestAddress();

240

MiniCluster getMiniCluster();

241

int getNumberSlots();

242

}

243

244

// Test data sources (flink-test-utils)

245

class FiniteTestSource<T> implements SourceFunction<T>, CheckpointListener {

246

FiniteTestSource(T... elements);

247

FiniteTestSource(Iterable<T> elements);

248

FiniteTestSource(BooleanSupplier couldExit, Iterable<T> elements);

249

void run(SourceContext<T> ctx) throws Exception;

250

void cancel();

251

void notifyCheckpointComplete(long checkpointId) throws Exception;

252

}

253

254

// Test sinks (flink-test-utils)

255

class TestListResultSink<T> extends RichSinkFunction<T> {

256

TestListResultSink();

257

void invoke(T value) throws Exception;

258

List<T> getResult();

259

List<T> getSortedResult();

260

}

261

```

262

263

[Test Environments and Data Sources](./test-environments.md)

264

265

### Connector Testing Framework

266

267

Comprehensive testing framework for Flink connectors with support for external systems, multiple test environments, and automated test suites.

268

269

```java { .api }

270

// Test framework extension (flink-connector-test-utils)

271

@ExtendWith(ConnectorTestingExtension.class)

272

class ConnectorTestingExtension implements TestTemplateInvocationContextProvider {

273

boolean supportsTestTemplate(ExtensionContext context);

274

Stream<TestTemplateInvocationContext> provideTestTemplateInvocationContexts(ExtensionContext context);

275

}

276

277

// Test environment interfaces (flink-connector-test-utils)

278

interface TestEnvironment extends TestResource {

279

JobExecutionResult executeJob(JobGraph job) throws Exception;

280

ClusterClient<?> getClusterClient();

281

String getRestAddress();

282

String getWebUIUrl();

283

}

284

285

interface TestResource extends AutoCloseable {

286

void startUp() throws Exception;

287

void tearDown() throws Exception;

288

}

289

290

// Test environment implementations (flink-connector-test-utils)

291

class MiniClusterTestEnvironment implements TestEnvironment, ClusterControllable {

292

MiniClusterTestEnvironment();

293

MiniClusterTestEnvironment(MiniClusterConfiguration config);

294

static Builder builder();

295

296

static class Builder {

297

Builder setParallelism(int parallelism);

298

Builder setCheckpointingEnabled(boolean enabled);

299

Builder setCheckpointInterval(Duration interval);

300

MiniClusterTestEnvironment build();

301

}

302

}

303

304

// Test suite base classes (flink-connector-test-utils)

305

abstract class SourceTestSuiteBase<T, SplitT extends SourceSplit> {

306

@TestTemplate void testSourceReading(TestEnvironment testEnv, ExternalContext<DataStreamSourceExternalContext<T>> externalContext);

307

@TestTemplate void testTaskManagerFailover(TestEnvironment testEnv, ExternalContext<DataStreamSourceExternalContext<T>> externalContext);

308

@TestTemplate void testJobManagerFailover(TestEnvironment testEnv, ExternalContext<DataStreamSourceExternalContext<T>> externalContext);

309

protected abstract ExternalContext<DataStreamSourceExternalContext<T>> sourceExternalContext();

310

protected abstract Source<T, SplitT, ?> source();

311

}

312

313

abstract class SinkTestSuiteBase<T> {

314

@TestTemplate void testSinkWriteWithSingleSubtask(TestEnvironment testEnv, ExternalContext<DataStreamSinkV2<T>> externalContext);

315

@TestTemplate void testSinkWriteWithMultipleSubtasks(TestEnvironment testEnv, ExternalContext<DataStreamSinkV2<T>> externalContext);

316

@TestTemplate void testScaleUpSinkWriter(TestEnvironment testEnv, ExternalContext<DataStreamSinkV2<T>> externalContext);

317

@TestTemplate void testScaleDownSinkWriter(TestEnvironment testEnv, ExternalContext<DataStreamSinkV2<T>> externalContext);

318

protected abstract ExternalContext<DataStreamSinkV2<T>> sinkExternalContext();

319

protected abstract List<T> generateTestData(TestingSinkSettings sinkSettings, ExternalContext<DataStreamSinkV2<T>> externalContext);

320

}

321

```

322

323

[Connector Testing Framework](./connector-testing.md)

324

325

### Migration and Compatibility Testing

326

327

Utilities for testing state migration between Flink versions and ensuring compatibility across version upgrades.

328

329

```java { .api }

330

// Migration testing (flink-migration-test-utils)

331

interface MigrationTest {

332

static FlinkVersion getMostRecentlyPublishedVersion();

333

334

@SnapshotsGenerator

335

@interface SnapshotsGenerator {}

336

337

@ParameterizedSnapshotsGenerator

338

@interface ParameterizedSnapshotsGenerator {

339

String value();

340

}

341

}

342

343

// Version utilities (flink-migration-test-utils)

344

class PublishedVersionUtils {

345

static FlinkVersion getMostRecentlyPublishedVersion();

346

static List<FlinkVersion> getPublishedVersions();

347

}

348

349

class SnapshotGeneratorUtils {

350

static void generateSnapshots(Class<?> testClass, FlinkVersion flinkVersion, String targetDir) throws Exception;

351

}

352

```

353

354

[Migration and Compatibility Testing](./migration-testing.md)

355

356

### Table API and Filesystem Testing

357

358

Specialized testing utilities for Flink Table API applications and filesystem-based connectors.

359

360

```java { .api }

361

// Filesystem table testing (flink-table-filesystem-test-utils)

362

class TestFileSystemTableFactory extends FileSystemTableFactory {

363

static final String IDENTIFIER = "test-filesystem";

364

String factoryIdentifier();

365

DynamicTableSource createDynamicTableSource(Context context);

366

DynamicTableSink createDynamicTableSink(Context context);

367

}

368

369

class TestFileSystemCatalog extends GenericInMemoryCatalog {

370

TestFileSystemCatalog(String catalogName, String defaultDatabaseName, String basePath);

371

static boolean isFileSystemTable(Map<String, String> options);

372

void createDatabase(String databaseName, CatalogDatabase database, boolean ignoreIfExists) throws Exception;

373

void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists) throws Exception;

374

}

375

376

class TestFileSystemCatalogFactory implements CatalogFactory {

377

static final String IDENTIFIER = "test-filesystem-catalog";

378

String factoryIdentifier();

379

Catalog createCatalog(Context context);

380

}

381

```

382

383

[Table API and Filesystem Testing](./table-testing.md)

384

385

### Client Application Testing

386

387

Testing utilities specifically designed for Flink client applications, including classloader testing and job submission scenarios.

388

389

```java { .api }

390

// Client testing utilities (flink-clients-test-utils)

391

class TestUserClassLoaderJob {

392

static void main(String[] args) throws Exception;

393

static void executeWithCustomClassLoader(ClassLoader classLoader) throws Exception;

394

static boolean verifyClassLoading(String className, ClassLoader expectedLoader);

395

static JobExecutionResult getLastExecutionResult();

396

}

397

398

class TestUserClassLoaderAdditionalArtifact {

399

static void loadArtifact(String artifactPath, ClassLoader classLoader) throws Exception;

400

static boolean isArtifactAvailable(String artifactName, ClassLoader classLoader);

401

static ArtifactMetadata getArtifactMetadata(String artifactPath);

402

static List<String> resolveDependencies(String artifactPath);

403

}

404

405

class TestUserClassLoaderJobLib {

406

static void someLibMethod();

407

}

408

```

409

410

[Client Application Testing](./client-testing.md)

411

412

## Types

413

414

```java { .api }

415

// Core synchronization types (flink-test-utils-junit)

416

class OneShotLatch {

417

void trigger();

418

void await() throws InterruptedException;

419

boolean await(long timeout, TimeUnit timeUnit) throws InterruptedException, TimeoutException;

420

void awaitQuietly();

421

boolean isTriggered();

422

void reset();

423

}

424

425

abstract class CheckedThread extends Thread {

426

CheckedThread(String name);

427

abstract void go() throws Exception;

428

void sync() throws Exception;

429

Throwable getError();

430

}

431

432

// Test assertion types (flink-test-utils-junit)

433

class FlinkCompletableFutureAssert<T> extends AbstractCompletableFutureAssert<FlinkCompletableFutureAssert<T>, CompletableFuture<T>, T> {

434

FlinkCompletableFutureAssert<T> eventuallySucceeds();

435

FlinkCompletableFutureAssert<T> eventuallyFails();

436

FlinkCompletableFutureAssert<T> eventuallySucceeds(T expectedValue);

437

}

438

439

// Test environment interfaces (flink-connector-test-utils)

440

interface TestEnvironment extends TestResource {

441

JobExecutionResult executeJob(JobGraph job) throws Exception;

442

ClusterClient<?> getClusterClient();

443

String getRestAddress();

444

String getWebUIUrl();

445

}

446

447

interface TestResource extends AutoCloseable {

448

void startUp() throws Exception;

449

void tearDown() throws Exception;

450

void close() throws Exception;

451

}

452

453

interface ExternalContext<T> extends AutoCloseable {

454

void setUp() throws Exception;

455

void tearDown() throws Exception;

456

Properties getConnectionProperties();

457

String generateTestId();

458

}

459

460

// Test data interfaces (flink-connector-test-utils)

461

interface ExternalSystemDataReader<T> extends AutoCloseable {

462

List<T> readData() throws Exception;

463

List<T> readData(Duration timeout) throws Exception;

464

List<T> readData(Predicate<T> filter) throws Exception;

465

void close() throws Exception;

466

}

467

468

interface ExternalSystemSplitDataWriter<T> extends AutoCloseable {

469

void writeSplit(List<T> data, int splitIndex) throws Exception;

470

void writeAndFinalize(List<List<T>> splits) throws Exception;

471

int getMaxParallelism();

472

void close() throws Exception;

473

}

474

475

// Test source/sink types (flink-test-utils)

476

class FiniteTestSource<T> implements SourceFunction<T>, CheckpointListener {

477

FiniteTestSource(T... elements);

478

FiniteTestSource(Iterable<T> elements);

479

FiniteTestSource(BooleanSupplier couldExit, Iterable<T> elements);

480

FiniteTestSource(BooleanSupplier couldExit, long waitTimeOut, Iterable<T> elements);

481

void run(SourceContext<T> ctx) throws Exception;

482

void cancel();

483

void notifyCheckpointComplete(long checkpointId) throws Exception;

484

void notifyCheckpointAborted(long checkpointId);

485

}

486

487

class TestListResultSink<T> extends RichSinkFunction<T> {

488

TestListResultSink();

489

void invoke(T value) throws Exception;

490

List<T> getResult();

491

List<T> getSortedResult();

492

}

493

494

// Metric testing types (flink-test-utils)

495

class MetricListener {

496

MetricListener();

497

MetricGroup getMetricGroup();

498

<T extends Metric> Optional<T> getMetric(Class<T> metricType, String... identifier);

499

Optional<Meter> getMeter(String... identifier);

500

Optional<Counter> getCounter(String... identifier);

501

Optional<Histogram> getHistogram(String... identifier);

502

<T> Optional<Gauge<T>> getGauge(String... identifier);

503

}

504

505

// Migration testing types (flink-migration-test-utils)

506

interface MigrationTest {

507

static FlinkVersion getMostRecentlyPublishedVersion();

508

}

509

510

class FlinkVersion implements Comparable<FlinkVersion> {

511

int getMajor();

512

int getMinor();

513

int getPatch();

514

String toString();

515

boolean isNewerThan(FlinkVersion other);

516

int compareTo(FlinkVersion other);

517

}

518

519

// Client testing types (flink-clients-test-utils)

520

class ArtifactMetadata {

521

String getName();

522

String getVersion();

523

List<String> getDependencies();

524

Map<String, String> getManifestAttributes();

525

}

526

```