or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

assertions.mdcontainers.mdexternal-systems.mdindex.mdjunit-integration.mdmetrics.mdtest-environments.mdtest-suites.md

external-systems.mddocs/

0

# External System Integration

1

2

The external system integration framework provides abstractions for connectors to interact with their respective external systems during testing. This includes data generation, reading, writing, and connector instantiation.

3

4

## Capabilities

5

6

### Base External Context

7

8

Foundation interface for all external system integrations.

9

10

```java { .api }

11

/**

12

* Base interface for external system integration in testing framework

13

*/

14

public interface ExternalContext extends AutoCloseable {

15

/**

16

* Get connector JAR URLs for job submission

17

* @return List of connector JAR URLs to attach to Flink jobs

18

*/

19

List<URL> getConnectorJarPaths();

20

}

21

22

/**

23

* Factory for creating external context instances

24

* @param <C> Type of external context to create

25

*/

26

public interface ExternalContextFactory<C extends ExternalContext> {

27

/**

28

* Create external context instance for test

29

* @param testName Name of the current test for resource isolation

30

* @return External context instance

31

*/

32

C createExternalContext(String testName);

33

}

34

```

35

36

**Usage Examples:**

37

38

```java

39

// External context factory registration

40

@TestContext

41

ExternalContextFactory<MyExternalContext> contextFactory =

42

testName -> new MyExternalContext(testName);

43

44

// Custom external context implementation

45

public class MyExternalContext implements ExternalContext {

46

private final String testName;

47

48

public MyExternalContext(String testName) {

49

this.testName = testName;

50

}

51

52

@Override

53

public List<URL> getConnectorJarPaths() {

54

return Arrays.asList(

55

new File("target/my-connector.jar").toURI().toURL()

56

);

57

}

58

59

@Override

60

public void close() throws Exception {

61

// Cleanup external resources

62

}

63

}

64

```

65

66

### Sink External Context

67

68

Abstract base class for sink connector external system integration.

69

70

```java { .api }

71

/**

72

* External context for DataStream sink testing

73

* @param <T> Type of data elements handled by the sink

74

*/

75

public abstract class DataStreamSinkExternalContext<T> extends ExternalContext {

76

77

/**

78

* Create sink instance for testing

79

* @param sinkSettings Configuration settings for the sink

80

* @return Configured sink instance

81

* @throws UnsupportedOperationException if settings combination not supported

82

*/

83

public abstract Sink<T> createSink(TestingSinkSettings sinkSettings);

84

85

/**

86

* Generate test data for sink validation

87

* @param sinkSettings Sink configuration settings

88

* @param seed Random seed for reproducible data generation

89

* @return List of test data elements

90

*/

91

public abstract List<T> generateTestData(TestingSinkSettings sinkSettings, long seed);

92

93

/**

94

* Create reader for validating data written to external system

95

* @param sinkSettings Sink configuration settings

96

* @return Data reader for external system validation

97

*/

98

public abstract ExternalSystemDataReader<T> createSinkDataReader(TestingSinkSettings sinkSettings);

99

100

/**

101

* Get type information for data elements

102

* @return TypeInformation for proper serialization

103

*/

104

public abstract TypeInformation<T> getProducedType();

105

}

106

107

/**

108

* External context specifically for Sink V2 API

109

* @param <T> Type of data elements

110

*/

111

public abstract class DataStreamSinkV2ExternalContext<T> extends DataStreamSinkExternalContext<T> {

112

// Inherits all methods from DataStreamSinkExternalContext

113

// Specifically for org.apache.flink.api.connector.sink2.Sink implementations

114

}

115

116

/**

117

* External context for Table API sink testing

118

* @param <T> Type of data elements

119

*/

120

public abstract class TableSinkExternalContext<T> extends ExternalContext {

121

// Table-specific sink testing methods

122

}

123

```

124

125

**Usage Examples:**

126

127

```java

128

public class KafkaSinkExternalContext extends DataStreamSinkV2ExternalContext<String> {

129

130

private final String topicName;

131

private final KafkaContainer kafkaContainer;

132

133

public KafkaSinkExternalContext(String testName) {

134

this.topicName = "test-topic-" + testName;

135

this.kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.0"));

136

this.kafkaContainer.start();

137

}

138

139

@Override

140

public Sink<String> createSink(TestingSinkSettings sinkSettings) {

141

Properties props = new Properties();

142

props.setProperty("bootstrap.servers", kafkaContainer.getBootstrapServers());

143

144

return KafkaSink.<String>builder()

145

.setBootstrapServers(kafkaContainer.getBootstrapServers())

146

.setRecordSerializer(KafkaRecordSerializationSchema.builder()

147

.setTopic(topicName)

148

.setValueSerializationSchema(new SimpleStringSchema())

149

.build())

150

.build();

151

}

152

153

@Override

154

public List<String> generateTestData(TestingSinkSettings sinkSettings, long seed) {

155

Random random = new Random(seed);

156

return IntStream.range(0, 100)

157

.mapToObj(i -> "test-record-" + i + "-" + random.nextInt(1000))

158

.collect(Collectors.toList());

159

}

160

161

@Override

162

public ExternalSystemDataReader<String> createSinkDataReader(TestingSinkSettings sinkSettings) {

163

return new KafkaDataReader(kafkaContainer.getBootstrapServers(), topicName);

164

}

165

166

@Override

167

public TypeInformation<String> getProducedType() {

168

return Types.STRING;

169

}

170

171

@Override

172

public void close() throws Exception {

173

kafkaContainer.stop();

174

}

175

}

176

```

177

178

### Source External Context

179

180

Abstract base class for source connector external system integration.

181

182

```java { .api }

183

/**

184

* External context for DataStream source testing

185

* @param <T> Type of data elements produced by the source

186

*/

187

public abstract class DataStreamSourceExternalContext<T> extends ExternalContext {

188

189

/**

190

* Create source instance for testing

191

* @param sourceSettings Configuration settings for the source

192

* @return Configured source instance

193

* @throws UnsupportedOperationException if settings combination not supported

194

*/

195

public abstract Source<T, ?, ?> createSource(TestingSourceSettings sourceSettings);

196

197

/**

198

* Generate test data for specific split

199

* @param sourceSettings Source configuration settings

200

* @param splitIndex Index of the split (for multiple split scenarios)

201

* @param seed Random seed for reproducible data generation

202

* @return List of test data elements for the split

203

*/

204

public abstract List<T> generateTestData(TestingSourceSettings sourceSettings, int splitIndex, long seed);

205

206

/**

207

* Create writer for sending test data to external system split

208

* @param sourceSettings Source configuration settings

209

* @return Split data writer for external system

210

*/

211

public abstract ExternalSystemSplitDataWriter<T> createSourceSplitDataWriter(TestingSourceSettings sourceSettings);

212

213

/**

214

* Get type information for data elements

215

* @return TypeInformation for proper deserialization

216

*/

217

public abstract TypeInformation<T> getProducedType();

218

}

219

220

/**

221

* External context for Table API source testing

222

* @param <T> Type of data elements

223

*/

224

public abstract class TableSourceExternalContext<T> extends ExternalContext {

225

// Table-specific source testing methods

226

}

227

```

228

229

**Usage Examples:**

230

231

```java

232

public class KafkaSourceExternalContext extends DataStreamSourceExternalContext<String> {

233

234

private final KafkaContainer kafkaContainer;

235

private final Map<Integer, String> splitTopics = new HashMap<>();

236

237

public KafkaSourceExternalContext(String testName) {

238

this.kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.0"));

239

this.kafkaContainer.start();

240

}

241

242

@Override

243

public Source<String, ?, ?> createSource(TestingSourceSettings sourceSettings) {

244

return KafkaSource.<String>builder()

245

.setBootstrapServers(kafkaContainer.getBootstrapServers())

246

.setTopics(splitTopics.values())

247

.setValueOnlyDeserializer(new SimpleStringSchema())

248

.setStartingOffsets(OffsetsInitializer.earliest())

249

.setBounded(sourceSettings.getBoundedness() == Boundedness.BOUNDED ?

250

OffsetsInitializer.latest() : null)

251

.build();

252

}

253

254

@Override

255

public List<String> generateTestData(TestingSourceSettings sourceSettings, int splitIndex, long seed) {

256

Random random = new Random(seed);

257

return IntStream.range(0, 50)

258

.mapToObj(i -> "split-" + splitIndex + "-record-" + i + "-" + random.nextInt(1000))

259

.collect(Collectors.toList());

260

}

261

262

@Override

263

public ExternalSystemSplitDataWriter<T> createSourceSplitDataWriter(TestingSourceSettings sourceSettings) {

264

return new KafkaSplitDataWriter(kafkaContainer.getBootstrapServers());

265

}

266

267

@Override

268

public TypeInformation<String> getProducedType() {

269

return Types.STRING;

270

}

271

}

272

```

273

274

### Data Access Interfaces

275

276

Interfaces for reading and writing data to external systems.

277

278

```java { .api }

279

/**

280

* Interface for reading data from external systems (used by sinks for validation)

281

* @param <T> Type of data elements

282

*/

283

public interface ExternalSystemDataReader<T> {

284

/**

285

* Poll for available data from external system

286

* @param timeout Maximum time to wait for data

287

* @return List of available data elements (may be empty)

288

*/

289

List<T> poll(Duration timeout);

290

}

291

292

/**

293

* Interface for writing data to external system splits (used by sources for test data setup)

294

* @param <T> Type of data elements

295

*/

296

public interface ExternalSystemSplitDataWriter<T> {

297

/**

298

* Write records to external system split

299

* @param records List of records to write

300

*/

301

void writeRecords(List<T> records);

302

}

303

```

304

305

**Usage Examples:**

306

307

```java

308

public class KafkaDataReader implements ExternalSystemDataReader<String> {

309

private final KafkaConsumer<String, String> consumer;

310

311

public KafkaDataReader(String bootstrapServers, String topic) {

312

Properties props = new Properties();

313

props.setProperty("bootstrap.servers", bootstrapServers);

314

props.setProperty("group.id", "test-consumer-" + UUID.randomUUID());

315

props.setProperty("key.deserializer", StringDeserializer.class.getName());

316

props.setProperty("value.deserializer", StringDeserializer.class.getName());

317

props.setProperty("auto.offset.reset", "earliest");

318

319

this.consumer = new KafkaConsumer<>(props);

320

this.consumer.subscribe(Arrays.asList(topic));

321

}

322

323

@Override

324

public List<String> poll(Duration timeout) {

325

ConsumerRecords<String, String> records = consumer.poll(timeout);

326

return StreamSupport.stream(records.spliterator(), false)

327

.map(ConsumerRecord::value)

328

.collect(Collectors.toList());

329

}

330

}

331

332

public class KafkaSplitDataWriter implements ExternalSystemSplitDataWriter<String> {

333

private final KafkaProducer<String, String> producer;

334

private final String topicPrefix;

335

336

public KafkaSplitDataWriter(String bootstrapServers) {

337

Properties props = new Properties();

338

props.setProperty("bootstrap.servers", bootstrapServers);

339

props.setProperty("key.serializer", StringSerializer.class.getName());

340

props.setProperty("value.serializer", StringSerializer.class.getName());

341

342

this.producer = new KafkaProducer<>(props);

343

this.topicPrefix = "test-source-";

344

}

345

346

@Override

347

public void writeRecords(List<String> records) {

348

String topic = topicPrefix + Thread.currentThread().getId();

349

records.forEach(record -> {

350

producer.send(new ProducerRecord<>(topic, record));

351

});

352

producer.flush();

353

}

354

}

355

```

356

357

### Containerized External Systems

358

359

Default implementation for containerized external systems using TestContainers.

360

361

```java { .api }

362

/**

363

* Abstract base class for containerized external systems

364

*/

365

public abstract class DefaultContainerizedExternalSystem {

366

367

/**

368

* Start containers and prepare external system

369

*/

370

protected abstract void startContainers();

371

372

/**

373

* Stop containers and cleanup resources

374

*/

375

protected abstract void stopContainers();

376

377

/**

378

* Get connection configuration for connectors

379

* @return Configuration properties for connector

380

*/

381

protected abstract Properties getConnectionProperties();

382

}

383

```

384

385

## Configuration Types

386

387

```java { .api }

388

/**

389

* Configuration settings for sink testing

390

*/

391

public class TestingSinkSettings {

392

public static Builder builder();

393

394

public static class Builder {

395

public Builder setCheckpointingMode(CheckpointingMode checkpointingMode);

396

public TestingSinkSettings build();

397

}

398

399

public CheckpointingMode getCheckpointingMode();

400

}

401

402

/**

403

* Configuration settings for source testing

404

*/

405

public class TestingSourceSettings {

406

public static Builder builder();

407

408

public static class Builder {

409

public Builder setBoundedness(Boundedness boundedness);

410

public Builder setCheckpointingMode(CheckpointingMode checkpointingMode);

411

public TestingSourceSettings build();

412

}

413

414

public Boundedness getBoundedness();

415

public CheckpointingMode getCheckpointingMode();

416

}

417

```

418

419

## Integration Patterns

420

421

### Lifecycle Management

422

423

External contexts follow a predictable lifecycle:

424

425

1. **Creation**: Context created via factory for each test case

426

2. **Setup**: External system resources initialized (containers, topics, etc.)

427

3. **Test Execution**: Context used throughout test execution

428

4. **Cleanup**: Context closed automatically after test completion

429

430

### Resource Isolation

431

432

Each test case gets its own external context instance, ensuring:

433

434

- **Test Isolation**: No interference between test cases

435

- **Resource Cleanup**: Automatic cleanup of external resources

436

- **Parallel Execution**: Tests can run in parallel safely

437

438

### Error Handling

439

440

External contexts should handle common error scenarios:

441

442

- **Resource Unavailability**: Throw clear exceptions when external systems are not available

443

- **Configuration Errors**: Validate configuration and provide helpful error messages

444

- **Cleanup Failures**: Log warnings but don't fail tests on cleanup issues