or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

client-testing.mdconnector-testing.mdcore-testing.mdindex.mdmigration-testing.mdtable-testing.mdtest-environments.md

connector-testing.mddocs/

0

# Connector Testing Framework

1

2

Comprehensive testing framework for Flink connectors with support for external systems, multiple test environments, and automated test suites. This framework enables thorough testing of both source and sink connectors with real external systems and controlled test environments.

3

4

## Capabilities

5

6

### Test Framework Core

7

8

#### ConnectorTestingExtension

9

10

JUnit 5 extension that provides comprehensive testing infrastructure for Flink connectors.

11

12

```java { .api }

13

/**

14

* JUnit 5 extension for connector testing

15

* Manages test lifecycle, external systems, and test environments

16

*/

17

@ExtendWith(ConnectorTestingExtension.class)

18

class ConnectorTestingExtension implements BeforeAllCallback, AfterAllCallback {

19

void beforeAll(ExtensionContext context) throws Exception;

20

void afterAll(ExtensionContext context) throws Exception;

21

}

22

```

23

24

**Usage Examples:**

25

26

```java

27

import org.apache.flink.connector.testframe.junit.extensions.ConnectorTestingExtension;

28

import org.junit.jupiter.api.extension.ExtendWith;

29

30

@ExtendWith(ConnectorTestingExtension.class)

31

public class MyConnectorTest {

32

// Test methods will have access to managed test infrastructure

33

34

@Test

35

public void testSourceConnector() {

36

// Test implementation

37

}

38

}

39

```

40

41

### Test Environment Management

42

43

Core interfaces and implementations for managing test execution environments.

44

45

#### TestEnvironment

46

47

Base interface for test execution environments that can run Flink jobs.

48

49

```java { .api }

50

/**

51

* Test execution environment interface

52

* Provides abstraction over different Flink execution environments

53

*/

54

interface TestEnvironment extends TestResource {

55

/** Submit and execute a Flink job */

56

JobExecutionResult executeJob(JobGraph job) throws Exception;

57

58

/** Get cluster information */

59

ClusterClient<?> getClusterClient();

60

61

/** Get job manager REST address */

62

String getRestAddress();

63

64

/** Get web UI URL */

65

String getWebUIUrl();

66

}

67

68

/**

69

* Base interface for test resources with lifecycle management

70

*/

71

interface TestResource extends AutoCloseable {

72

/** Start/initialize the resource */

73

void startUp() throws Exception;

74

75

/** Stop/cleanup the resource */

76

void tearDown() throws Exception;

77

78

void close() throws Exception;

79

}

80

```

81

82

#### MiniClusterTestEnvironment

83

84

Test environment based on Flink's embedded MiniCluster for fast, lightweight testing.

85

86

```java { .api }

87

/**

88

* MiniCluster-based test environment

89

* Provides fast in-memory Flink execution for testing

90

*/

91

class MiniClusterTestEnvironment implements TestEnvironment, ClusterControllable {

92

/** Create with default configuration */

93

MiniClusterTestEnvironment();

94

95

/** Create with custom configuration */

96

MiniClusterTestEnvironment(MiniClusterConfiguration config);

97

98

/** Create builder for configuration */

99

static Builder builder();

100

101

JobExecutionResult executeJob(JobGraph job) throws Exception;

102

void triggerCheckpoint(long checkpointId) throws Exception;

103

void cancelJob(JobID jobId) throws Exception;

104

105

static class Builder {

106

Builder setParallelism(int parallelism);

107

Builder setCheckpointingEnabled(boolean enabled);

108

Builder setCheckpointInterval(Duration interval);

109

MiniClusterTestEnvironment build();

110

}

111

}

112

```

113

114

#### FlinkContainerTestEnvironment

115

116

Test environment using Docker containers for testing with a real Flink cluster.

117

118

```java { .api }

119

/**

120

* Docker container-based test environment

121

* Provides realistic Flink cluster environment for integration testing

122

*/

123

class FlinkContainerTestEnvironment implements TestEnvironment, ClusterControllable {

124

/** Create with default Flink image */

125

FlinkContainerTestEnvironment();

126

127

/** Create with custom Flink image */

128

FlinkContainerTestEnvironment(String flinkImageName);

129

130

/** Create builder for configuration */

131

static Builder builder();

132

133

JobExecutionResult executeJob(JobGraph job) throws Exception;

134

void restartTaskManager(int taskManagerIndex) throws Exception;

135

void stopTaskManager(int taskManagerIndex) throws Exception;

136

137

static class Builder {

138

Builder withFlinkImage(String imageName);

139

Builder withTaskManagers(int count);

140

Builder withTaskSlots(int slots);

141

Builder withJobManagerMemory(String memory);

142

Builder withTaskManagerMemory(String memory);

143

FlinkContainerTestEnvironment build();

144

}

145

}

146

```

147

148

**Usage Examples:**

149

150

```java

151

import org.apache.flink.connector.testframe.environment.MiniClusterTestEnvironment;

152

import org.apache.flink.connector.testframe.container.FlinkContainerTestEnvironment;

153

154

// MiniCluster environment for fast tests

155

MiniClusterTestEnvironment miniEnv = MiniClusterTestEnvironment.builder()

156

.setParallelism(4)

157

.setCheckpointingEnabled(true)

158

.setCheckpointInterval(Duration.ofSeconds(1))

159

.build();

160

161

// Container environment for integration tests

162

FlinkContainerTestEnvironment containerEnv = FlinkContainerTestEnvironment.builder()

163

.withFlinkImage("flink:1.17")

164

.withTaskManagers(2)

165

.withTaskSlots(4)

166

.withJobManagerMemory("1g")

167

.withTaskManagerMemory("2g")

168

.build();

169

```

170

171

### External System Testing

172

173

Framework for testing connectors with real external systems like databases, message queues, etc.

174

175

#### ExternalContext

176

177

Base interface for managing external system lifecycle and interaction during tests.

178

179

```java { .api }

180

/**

181

* Context for external system interaction

182

* Manages lifecycle and provides access to external systems

183

*/

184

interface ExternalContext extends AutoCloseable {

185

/** Initialize external system for testing */

186

void setUp() throws Exception;

187

188

/** Clean up external system after testing */

189

void tearDown() throws Exception;

190

191

/** Get connection information for Flink connectors */

192

Properties getConnectionProperties();

193

194

/** Generate unique identifier for this test run */

195

String generateTestId();

196

}

197

198

/**

199

* Factory for creating external contexts

200

*/

201

interface ExternalContextFactory<C extends ExternalContext> {

202

/** Create external context for testing */

203

C createExternalContext(String testName);

204

205

/** Get display name for this external system */

206

String getDisplayName();

207

}

208

```

209

210

#### Data Reading and Writing

211

212

Interfaces for reading and writing test data to external systems.

213

214

```java { .api }

215

/**

216

* Read data from external systems for verification

217

*/

218

interface ExternalSystemDataReader<T> extends AutoCloseable {

219

/** Read all data from external system */

220

List<T> readData() throws Exception;

221

222

/** Read data with timeout */

223

List<T> readData(Duration timeout) throws Exception;

224

225

/** Read data matching criteria */

226

List<T> readData(Predicate<T> filter) throws Exception;

227

228

void close() throws Exception;

229

}

230

231

/**

232

* Write data to external systems in splits for parallel testing

233

*/

234

interface ExternalSystemSplitDataWriter<T> extends AutoCloseable {

235

/** Write split of data to external system */

236

void writeSplit(List<T> data, int splitIndex) throws Exception;

237

238

/** Write all splits and finalize */

239

void writeAndFinalize(List<List<T>> splits) throws Exception;

240

241

/** Get number of supported splits */

242

int getMaxParallelism();

243

244

void close() throws Exception;

245

}

246

```

247

248

**Usage Examples:**

249

250

```java

251

import org.apache.flink.connector.testframe.external.ExternalContext;

252

import org.apache.flink.connector.testframe.external.ExternalSystemDataReader;

253

254

// External context implementation

255

public class DatabaseExternalContext implements ExternalContext {

256

private Connection connection;

257

258

@Override

259

public void setUp() throws Exception {

260

connection = DriverManager.getConnection(getConnectionUrl());

261

// Initialize test database

262

}

263

264

@Override

265

public Properties getConnectionProperties() {

266

Properties props = new Properties();

267

props.setProperty("url", getConnectionUrl());

268

return props;

269

}

270

}

271

272

// Reading test data

273

ExternalSystemDataReader<MyRecord> reader = createDataReader();

274

List<MyRecord> results = reader.readData(Duration.ofSeconds(30));

275

assertEquals(expectedRecords, results);

276

```

277

278

### Source Testing Framework

279

280

Specialized testing framework for Flink source connectors.

281

282

#### Source Test Contexts

283

284

Contexts specifically designed for testing source connectors with different APIs.

285

286

```java { .api }

287

/**

288

* Context for DataStream source testing

289

* Provides source creation and data verification capabilities

290

*/

291

interface DataStreamSourceExternalContext<T> extends ExternalContext {

292

/** Create source for DataStream API */

293

SourceFunction<T> createSource(SourceSplitSerializer<?> splitSerializer);

294

295

/** Get data reader for verification */

296

ExternalSystemDataReader<T> createDataReader();

297

298

/** Generate test data splits */

299

List<List<T>> generateTestDataSplits(int numSplits);

300

}

301

302

/**

303

* Context for Table API source testing

304

*/

305

interface TableSourceExternalContext extends ExternalContext {

306

/** Create table source for Table API */

307

DynamicTableSource createTableSource(TableDescriptor descriptor);

308

309

/** Get table descriptor for source */

310

TableDescriptor getTableDescriptor();

311

312

/** Create catalog for table registration */

313

Catalog createCatalog();

314

}

315

```

316

317

#### SourceTestSuiteBase

318

319

Base class providing comprehensive test suite for source connectors.

320

321

```java { .api }

322

/**

323

* Base class for source test suites

324

* Provides standard test methods for source connector validation

325

*/

326

abstract class SourceTestSuiteBase<T> {

327

/** Test basic source functionality */

328

@TestTemplate

329

void testSourceReading() throws Exception;

330

331

/** Test source with checkpointing */

332

@TestTemplate

333

void testSourceWithCheckpointing() throws Exception;

334

335

/** Test source restart from checkpoint */

336

@TestTemplate

337

void testSourceRestartFromCheckpoint() throws Exception;

338

339

/** Test source parallelism handling */

340

@TestTemplate

341

void testSourceParallelism() throws Exception;

342

343

/** Test source idempotency */

344

@TestTemplate

345

void testSourceIdempotency() throws Exception;

346

347

/** Get external context for testing */

348

protected abstract DataStreamSourceExternalContext<T> getExternalContext();

349

350

/** Get test environment */

351

protected abstract TestEnvironment getTestEnvironment();

352

}

353

```

354

355

**Usage Examples:**

356

357

```java

358

import org.apache.flink.connector.testframe.testsuites.SourceTestSuiteBase;

359

import org.apache.flink.connector.testframe.external.source.DataStreamSourceExternalContext;

360

361

@ExtendWith(ConnectorTestingExtension.class)

362

public class MySourceConnectorTest extends SourceTestSuiteBase<MyRecord> {

363

364

@Override

365

protected DataStreamSourceExternalContext<MyRecord> getExternalContext() {

366

return new MySourceExternalContext();

367

}

368

369

@Override

370

protected TestEnvironment getTestEnvironment() {

371

return MiniClusterTestEnvironment.builder()

372

.setParallelism(2)

373

.build();

374

}

375

376

// All standard source tests are inherited and will run automatically

377

// testSourceReading(), testSourceWithCheckpointing(), etc.

378

}

379

```

380

381

### Sink Testing Framework

382

383

Specialized testing framework for Flink sink connectors.

384

385

#### Sink Test Contexts

386

387

Contexts for testing sink connectors with different APIs and semantics.

388

389

```java { .api }

390

/**

391

* Context for DataStream sink testing

392

* Supports both legacy SinkFunction and new Sink API

393

*/

394

interface DataStreamSinkExternalContext<T> extends ExternalContext, ResultTypeQueryable<T> {

395

/** Create sink function for legacy API */

396

SinkFunction<T> createSinkFunction();

397

398

/** Create data reader for result verification */

399

ExternalSystemDataReader<T> createDataReader();

400

401

/** Generate test data for sink testing */

402

List<T> generateTestData(int numRecords);

403

404

/** Get type information for records */

405

TypeInformation<T> getProducedType();

406

}

407

408

/**

409

* Context for Sink V2 API testing

410

*/

411

interface DataStreamSinkV2ExternalContext<T> extends DataStreamSinkExternalContext<T> {

412

/** Create sink for new Sink API */

413

Sink<T> createSink();

414

415

/** Test exactly-once semantics support */

416

boolean supportsExactlyOnce();

417

418

/** Test at-least-once semantics support */

419

boolean supportsAtLeastOnce();

420

}

421

422

/**

423

* Context for Table API sink testing

424

*/

425

interface TableSinkExternalContext extends ExternalContext {

426

/** Create table sink for Table API */

427

DynamicTableSink createTableSink(TableDescriptor descriptor);

428

429

/** Get table descriptor for sink */

430

TableDescriptor getTableDescriptor();

431

432

/** Create catalog for table registration */

433

Catalog createCatalog();

434

}

435

```

436

437

#### SinkTestSuiteBase

438

439

Base class providing comprehensive test suite for sink connectors.

440

441

```java { .api }

442

/**

443

* Base class for sink test suites

444

* Provides standard test methods for sink connector validation

445

*/

446

abstract class SinkTestSuiteBase<T> {

447

/** Test basic sink writing */

448

@TestTemplate

449

void testSinkWriting() throws Exception;

450

451

/** Test sink with checkpointing */

452

@TestTemplate

453

void testSinkWithCheckpointing() throws Exception;

454

455

/** Test sink exactly-once semantics */

456

@TestTemplate

457

void testSinkExactlyOnce() throws Exception;

458

459

/** Test sink at-least-once semantics */

460

@TestTemplate

461

void testSinkAtLeastOnce() throws Exception;

462

463

/** Test sink with multiple writers */

464

@TestTemplate

465

void testSinkParallelism() throws Exception;

466

467

/** Test sink failure recovery */

468

@TestTemplate

469

void testSinkFailureRecovery() throws Exception;

470

471

/** Get external context for testing */

472

protected abstract DataStreamSinkExternalContext<T> getExternalContext();

473

474

/** Get test environment */

475

protected abstract TestEnvironment getTestEnvironment();

476

}

477

```

478

479

**Usage Examples:**

480

481

```java

482

import org.apache.flink.connector.testframe.testsuites.SinkTestSuiteBase;

483

import org.apache.flink.connector.testframe.external.sink.DataStreamSinkV2ExternalContext;

484

485

@ExtendWith(ConnectorTestingExtension.class)

486

public class MySinkConnectorTest extends SinkTestSuiteBase<MyRecord> {

487

488

@Override

489

protected DataStreamSinkV2ExternalContext<MyRecord> getExternalContext() {

490

return new MySinkExternalContext();

491

}

492

493

@Override

494

protected TestEnvironment getTestEnvironment() {

495

return FlinkContainerTestEnvironment.builder()

496

.withTaskManagers(2)

497

.build();

498

}

499

500

// All standard sink tests are inherited

501

// testSinkWriting(), testSinkExactlyOnce(), etc.

502

}

503

```

504

505

### Test Configuration Annotations

506

507

Annotations for configuring connector test behavior and requirements.

508

509

```java { .api }

510

/**

511

* Configure test context

512

*/

513

@interface TestContext {

514

/** External context factory class */

515

Class<? extends ExternalContextFactory<?>> value();

516

}

517

518

/**

519

* Configure test environment

520

*/

521

@interface TestEnv {

522

/** Test environment class */

523

Class<? extends TestEnvironment> value();

524

}

525

526

/**

527

* Configure external system for testing

528

*/

529

@interface TestExternalSystem {

530

/** External system identifier */

531

String value();

532

}

533

534

/**

535

* Configure test semantics requirements

536

*/

537

@interface TestSemantics {

538

/** Required delivery guarantees */

539

DeliveryGuarantee[] value();

540

}

541

```

542

543

**Usage Examples:**

544

545

```java

546

import org.apache.flink.connector.testframe.junit.annotations.*;

547

548

@ExtendWith(ConnectorTestingExtension.class)

549

@TestContext(MyConnectorExternalContextFactory.class)

550

@TestEnv(MiniClusterTestEnvironment.class)

551

@TestExternalSystem("kafka")

552

@TestSemantics({DeliveryGuarantee.EXACTLY_ONCE, DeliveryGuarantee.AT_LEAST_ONCE})

553

public class ConfiguredConnectorTest extends SourceTestSuiteBase<String> {

554

// Test configuration provided by annotations

555

}

556

```

557

558

### Container Testing Support

559

560

Docker container management for realistic Flink cluster testing.

561

562

#### FlinkContainers

563

564

Utility for managing Flink Docker containers in tests.

565

566

```java { .api }

567

/**

568

* Manage Flink containers for testing

569

* Handles JobManager and TaskManager containers

570

*/

571

class FlinkContainers implements BeforeAllCallback, AfterAllCallback {

572

/** Create with default Flink image */

573

FlinkContainers();

574

575

/** Create with custom image */

576

FlinkContainers(String flinkImage);

577

578

/** Get JobManager container */

579

GenericContainer<?> getJobManagerContainer();

580

581

/** Get TaskManager containers */

582

List<GenericContainer<?>> getTaskManagerContainers();

583

584

/** Get Flink REST client */

585

RestClusterClient<String> getRestClient();

586

587

void beforeAll(ExtensionContext context) throws Exception;

588

void afterAll(ExtensionContext context) throws Exception;

589

}

590

591

/**

592

* Build custom Flink Docker images for testing

593

*/

594

class FlinkImageBuilder {

595

/** Create builder with base Flink image */

596

static FlinkImageBuilder fromBaseImage(String baseImage);

597

598

/** Add JAR file to image */

599

FlinkImageBuilder addJar(Path jarPath);

600

601

/** Add connector dependencies */

602

FlinkImageBuilder addConnectorDependencies(String... coordinates);

603

604

/** Set custom configuration */

605

FlinkImageBuilder withConfiguration(String key, String value);

606

607

/** Build the custom image */

608

String build();

609

}

610

```

611

612

**Usage Examples:**

613

614

```java

615

import org.apache.flink.connector.testframe.container.FlinkContainers;

616

import org.apache.flink.connector.testframe.container.FlinkImageBuilder;

617

618

// Custom Flink image with connector

619

String customImage = FlinkImageBuilder

620

.fromBaseImage("flink:1.17")

621

.addConnectorDependencies("org.apache.flink:flink-connector-kafka:1.17.0")

622

.withConfiguration("state.backend", "filesystem")

623

.build();

624

625

// Use containers in test

626

@RegisterExtension

627

static FlinkContainers flinkContainers = new FlinkContainers(customImage);

628

629

@Test

630

void testWithContainers() {

631

RestClusterClient<?> client = flinkContainers.getRestClient();

632

// Submit job to container cluster

633

}

634

```

635

636

### Utility Classes

637

638

#### CollectIteratorAssert

639

640

Specialized assertions for collected test results.

641

642

```java { .api }

643

/**

644

* Assertions for collected test results

645

* Provides convenient verification of streaming results

646

*/

647

class CollectIteratorAssert<T> extends AbstractIterableAssert<CollectIteratorAssert<T>, Iterable<T>, T, ObjectAssert<T>> {

648

/** Assert results match expected values in order */

649

CollectIteratorAssert<T> containsExactly(T... expected);

650

651

/** Assert results contain expected values in any order */

652

CollectIteratorAssert<T> containsExactlyInAnyOrder(T... expected);

653

654

/** Assert result count matches expected */

655

CollectIteratorAssert<T> hasSize(int expectedSize);

656

657

/** Assert all results match predicate */

658

CollectIteratorAssert<T> allMatch(Predicate<T> predicate);

659

}

660

```

661

662

#### MetricQuerier

663

664

Utility for querying and asserting on Flink metrics during tests.

665

666

```java { .api }

667

/**

668

* Query and assert on Flink metrics

669

* Enables verification of connector behavior through metrics

670

*/

671

class MetricQuerier {

672

/** Create querier for test environment */

673

static MetricQuerier forEnvironment(TestEnvironment environment);

674

675

/** Query counter metric value */

676

long getCounterValue(String metricName);

677

678

/** Query gauge metric value */

679

double getGaugeValue(String metricName);

680

681

/** Query histogram metric */

682

HistogramStatistics getHistogramValue(String metricName);

683

684

/** Wait for metric to reach expected value */

685

void waitForMetric(String metricName, long expectedValue, Duration timeout);

686

687

/** Assert metric has expected value */

688

void assertMetricEquals(String metricName, long expectedValue);

689

}

690

```

691

692

**Usage Examples:**

693

694

```java

695

import org.apache.flink.connector.testframe.utils.CollectIteratorAssert;

696

import org.apache.flink.connector.testframe.utils.MetricQuerier;

697

698

// Collect and assert results

699

CloseableIterator<String> results = // ... collect from job

700

CollectIteratorAssert.assertThat(results)

701

.hasSize(1000)

702

.containsExactlyInAnyOrder(expectedResults.toArray(new String[0]));

703

704

// Query metrics

705

MetricQuerier metrics = MetricQuerier.forEnvironment(testEnvironment);

706

metrics.waitForMetric("numRecordsIn", 1000, Duration.ofSeconds(30));

707

metrics.assertMetricEquals("numRecordsOut", 1000);

708

```