or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

tessl/maven-org-apache-flink--flink-connector-test-utils

Testing utilities for Apache Flink connectors, providing test frameworks, assertion utilities, and abstractions for comprehensive connector testing.

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
mavenpkg:maven/org.apache.flink/flink-connector-test-utils@2.1.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-flink--flink-connector-test-utils@2.1.0

0

# Apache Flink Connector Test Utils

1

2

The Apache Flink Connector Test Utils library provides a comprehensive testing framework for Apache Flink connectors. It enables developers to create standardized, robust tests for both source and sink connectors, with support for various testing scenarios, including failover, scaling, metrics validation, and external system integration.

3

4

## Package Information

5

6

- **Package Name**: flink-connector-test-utils

7

- **Package Type**: maven

8

- **Group ID**: org.apache.flink

9

- **Artifact ID**: flink-connector-test-utils

10

- **Language**: Java

11

- **Installation**:

12

```xml

13

<dependency>

14

<groupId>org.apache.flink</groupId>

15

<artifactId>flink-connector-test-utils</artifactId>

16

<version>2.1.0</version>

17

<scope>test</scope>

18

</dependency>

19

```

20

21

## Core Imports

22

23

```java

24

// Test suite base classes

25

import org.apache.flink.connector.testframe.testsuites.SinkTestSuiteBase;

26

import org.apache.flink.connector.testframe.testsuites.SourceTestSuiteBase;

27

28

// JUnit annotations

29

import org.apache.flink.connector.testframe.junit.annotations.TestEnv;

30

import org.apache.flink.connector.testframe.junit.annotations.TestContext;

31

32

// Test utilities

33

import org.apache.flink.connector.testframe.utils.CollectIteratorAssertions;

34

import org.apache.flink.connector.testframe.utils.MetricQuerier;

35

36

// External context interfaces

37

import org.apache.flink.connector.testframe.external.sink.DataStreamSinkV2ExternalContext;

38

import org.apache.flink.connector.testframe.external.source.DataStreamSourceExternalContext;

39

```

40

41

## Basic Usage

42

43

### Creating a Sink Test Suite

44

45

```java

46

import org.apache.flink.connector.testframe.testsuites.SinkTestSuiteBase;

47

import org.apache.flink.connector.testframe.junit.annotations.TestEnv;

48

import org.apache.flink.connector.testframe.junit.annotations.TestContext;

49

import org.apache.flink.connector.testframe.environment.MiniClusterTestEnvironment;

50

51

@ExtendWith(ConnectorTestingExtension.class)

52

public class MySinkTestSuite extends SinkTestSuiteBase<String> {

53

54

@TestEnv

55

MiniClusterTestEnvironment testEnv = new MiniClusterTestEnvironment();

56

57

@TestContext

58

ExternalContextFactory<MySinkExternalContext> sinkContextFactory =

59

(testName) -> new MySinkExternalContext(testName);

60

61

// Test methods are automatically provided by SinkTestSuiteBase

62

// Including: testBasicSink, testStartFromSavepoint, testScaleUp, etc.

63

}

64

65

// External context implementation

66

public class MySinkExternalContext extends DataStreamSinkV2ExternalContext<String> {

67

68

@Override

69

public Sink<String> createSink(TestingSinkSettings sinkSettings) {

70

// Return your sink implementation

71

return new MySink(/* configuration */);

72

}

73

74

@Override

75

public List<String> generateTestData(TestingSinkSettings sinkSettings, long seed) {

76

// Generate test data for your sink

77

return Arrays.asList("test1", "test2", "test3");

78

}

79

80

@Override

81

public ExternalSystemDataReader<String> createSinkDataReader(TestingSinkSettings sinkSettings) {

82

// Return a reader to validate data written to external system

83

return new MySinkDataReader(/* configuration */);

84

}

85

}

86

```

87

88

### Creating a Source Test Suite

89

90

```java

91

import org.apache.flink.connector.testframe.testsuites.SourceTestSuiteBase;

92

93

public class MySourceTestSuite extends SourceTestSuiteBase<String> {

94

95

@TestEnv

96

MiniClusterTestEnvironment testEnv = new MiniClusterTestEnvironment();

97

98

@TestContext

99

ExternalContextFactory<MySourceExternalContext> sourceContextFactory =

100

(testName) -> new MySourceExternalContext(testName);

101

102

// Test methods are automatically provided by SourceTestSuiteBase

103

// Including: testSourceSingleSplit, testMultipleSplits, testSavepoint, etc.

104

}

105

```

106

107

## Architecture

108

109

The testing framework is built around several key components:

110

111

- **Test Suite Base Classes**: Provide pre-built test cases for common scenarios (basic functionality, failover, scaling, metrics)

112

- **External Context Framework**: Abstracts external system interactions for both sources and sinks

113

- **Test Environment**: Manages Flink cluster lifecycle (MiniCluster or containerized)

114

- **JUnit Integration**: Annotation-driven configuration with resource lifecycle management

115

- **Assertion Utilities**: Specialized assertions for validating connector behavior with different semantic guarantees

116

- **Container Support**: Testcontainers integration for isolated testing environments

117

118

## Capabilities

119

120

### Test Suite Framework

121

122

Core test suite base classes providing standardized test scenarios for connector validation. Supports both sink and source connectors with comprehensive test coverage.

123

124

```java { .api }

125

public abstract class SinkTestSuiteBase<T extends Comparable<T>> {

126

// Test methods provided automatically via JUnit @TestTemplate

127

void testBasicSink(TestEnvironment testEnv, DataStreamSinkExternalContext<T> externalContext, CheckpointingMode semantic);

128

void testStartFromSavepoint(TestEnvironment testEnv, DataStreamSinkExternalContext<T> externalContext, CheckpointingMode semantic);

129

void testScaleUp(TestEnvironment testEnv, DataStreamSinkExternalContext<T> externalContext, CheckpointingMode semantic);

130

void testScaleDown(TestEnvironment testEnv, DataStreamSinkExternalContext<T> externalContext, CheckpointingMode semantic);

131

void testMetrics(TestEnvironment testEnv, DataStreamSinkExternalContext<T> externalContext, CheckpointingMode semantic);

132

}

133

134

public abstract class SourceTestSuiteBase<T> {

135

// Test methods provided automatically via JUnit @TestTemplate

136

void testSourceSingleSplit(TestEnvironment testEnv, DataStreamSourceExternalContext<T> externalContext, CheckpointingMode semantic);

137

void testMultipleSplits(TestEnvironment testEnv, DataStreamSourceExternalContext<T> externalContext, CheckpointingMode semantic);

138

void testSavepoint(TestEnvironment testEnv, DataStreamSourceExternalContext<T> externalContext, CheckpointingMode semantic);

139

void testScaleUp(TestEnvironment testEnv, DataStreamSourceExternalContext<T> externalContext, CheckpointingMode semantic);

140

void testScaleDown(TestEnvironment testEnv, DataStreamSourceExternalContext<T> externalContext, CheckpointingMode semantic);

141

void testSourceMetrics(TestEnvironment testEnv, DataStreamSourceExternalContext<T> externalContext, CheckpointingMode semantic);

142

void testIdleReader(TestEnvironment testEnv, DataStreamSourceExternalContext<T> externalContext, CheckpointingMode semantic);

143

void testTaskManagerFailure(TestEnvironment testEnv, DataStreamSourceExternalContext<T> externalContext, ClusterControllable controller, CheckpointingMode semantic);

144

}

145

```

146

147

[Test Suites](./test-suites.md)

148

149

### External System Integration

150

151

Framework for integrating with external systems, providing abstractions for source and sink connectors to interact with their respective external systems.

152

153

```java { .api }

154

public interface ExternalContext extends AutoCloseable {

155

List<URL> getConnectorJarPaths();

156

}

157

158

public interface ExternalContextFactory<C extends ExternalContext> {

159

C createExternalContext(String testName);

160

}

161

162

public abstract class DataStreamSinkV2ExternalContext<T> extends ExternalContext {

163

public abstract Sink<T> createSink(TestingSinkSettings sinkSettings);

164

public abstract List<T> generateTestData(TestingSinkSettings sinkSettings, long seed);

165

public abstract ExternalSystemDataReader<T> createSinkDataReader(TestingSinkSettings sinkSettings);

166

public abstract TypeInformation<T> getProducedType();

167

}

168

169

public abstract class DataStreamSourceExternalContext<T> extends ExternalContext {

170

public abstract Source<T, ?, ?> createSource(TestingSourceSettings sourceSettings);

171

public abstract List<T> generateTestData(TestingSourceSettings sourceSettings, int splitIndex, long seed);

172

public abstract ExternalSystemSplitDataWriter<T> createSourceSplitDataWriter(TestingSourceSettings sourceSettings);

173

public abstract TypeInformation<T> getProducedType();

174

}

175

```

176

177

[External System Integration](./external-systems.md)

178

179

### Test Environment Management

180

181

Test environment abstractions for managing Flink cluster lifecycle, supporting both MiniCluster and containerized deployments.

182

183

```java { .api }

184

public interface TestEnvironment extends TestResource {

185

StreamExecutionEnvironment createExecutionEnvironment(TestEnvironmentSettings envOptions);

186

Endpoint getRestEndpoint();

187

String getCheckpointUri();

188

189

class Endpoint {

190

public Endpoint(String address, int port);

191

public String getAddress();

192

public int getPort();

193

}

194

}

195

196

public interface TestResource {

197

void startUp() throws Exception;

198

void tearDown() throws Exception;

199

}

200

```

201

202

[Test Environments](./test-environments.md)

203

204

### Assertion and Validation Utilities

205

206

Specialized assertion utilities for validating connector behavior with support for different semantic guarantees (exactly-once, at-least-once).

207

208

```java { .api }

209

public final class CollectIteratorAssertions {

210

public static <T> CollectIteratorAssert<T> assertThat(Iterator<T> actual);

211

public static <T> UnorderedCollectIteratorAssert<T> assertUnordered(Iterator<T> actual);

212

}

213

214

public class CollectIteratorAssert<T> {

215

public CollectIteratorAssert<T> matchesRecordsFromSource(List<List<T>> expected, CheckpointingMode semantic);

216

public CollectIteratorAssert<T> withNumRecordsLimit(int limit);

217

}

218

```

219

220

[Assertions and Validation](./assertions.md)

221

222

### Metrics and Monitoring

223

224

Utilities for querying and validating Flink job metrics via the REST API, enabling performance and behavior validation.

225

226

```java { .api }

227

public class MetricQuerier {

228

public MetricQuerier(Configuration configuration) throws ConfigurationException;

229

230

public static JobDetailsInfo getJobDetails(RestClient client, TestEnvironment.Endpoint endpoint, JobID jobId) throws Exception;

231

232

public Double getAggregatedMetricsByRestAPI(

233

TestEnvironment.Endpoint endpoint,

234

JobID jobId,

235

String sourceOrSinkName,

236

String metricName,

237

String filter

238

) throws Exception;

239

240

public AggregatedMetricsResponseBody getMetricList(TestEnvironment.Endpoint endpoint, JobID jobId, JobVertexID vertexId) throws Exception;

241

public AggregatedMetricsResponseBody getMetrics(TestEnvironment.Endpoint endpoint, JobID jobId, JobVertexID vertexId, String filters) throws Exception;

242

}

243

```

244

245

[Metrics and Monitoring](./metrics.md)

246

247

### JUnit Integration

248

249

Annotation-driven JUnit 5 integration with automatic resource lifecycle management and test parameterization.

250

251

```java { .api }

252

@Target(ElementType.FIELD)

253

@Retention(RetentionPolicy.RUNTIME)

254

public @interface TestEnv {}

255

256

@Target(ElementType.FIELD)

257

@Retention(RetentionPolicy.RUNTIME)

258

public @interface TestContext {}

259

260

@Target(ElementType.FIELD)

261

@Retention(RetentionPolicy.RUNTIME)

262

public @interface TestExternalSystem {}

263

264

@Target(ElementType.FIELD)

265

@Retention(RetentionPolicy.RUNTIME)

266

public @interface TestSemantics {}

267

268

public class ConnectorTestingExtension implements BeforeAllCallback, AfterAllCallback,

269

TestTemplateInvocationContextProvider, ParameterResolver {

270

// Automatic lifecycle management and parameter injection

271

}

272

```

273

274

[JUnit Integration](./junit-integration.md)

275

276

### Container Support

277

278

Testcontainers integration for running tests in isolated containerized environments with custom Flink clusters.

279

280

```java { .api }

281

public class FlinkContainers {

282

public static FlinkContainer jobManager();

283

public static FlinkContainer taskManager();

284

public static FlinkContainer cluster();

285

}

286

287

public class FlinkContainerTestEnvironment implements TestEnvironment {

288

public FlinkContainerTestEnvironment(FlinkContainersSettings settings);

289

// Implements TestEnvironment methods

290

}

291

292

public class FlinkImageBuilder {

293

public static DockerImageName buildImage(List<URL> jarPaths) throws ImageBuildException;

294

}

295

```

296

297

[Container Support](./containers.md)

298

299

## Types

300

301

### Core Configuration Types

302

303

```java { .api }

304

public class TestEnvironmentSettings {

305

public static Builder builder();

306

307

public static class Builder {

308

public Builder setConnectorJarPaths(List<URL> connectorJarPaths);

309

public Builder setSavepointRestorePath(String savepointRestorePath);

310

public TestEnvironmentSettings build();

311

}

312

}

313

314

public class TestingSinkSettings {

315

public static Builder builder();

316

317

public static class Builder {

318

public Builder setCheckpointingMode(CheckpointingMode checkpointingMode);

319

public TestingSinkSettings build();

320

}

321

}

322

323

public class TestingSourceSettings {

324

public static Builder builder();

325

326

public static class Builder {

327

public Builder setBoundedness(Boundedness boundedness);

328

public Builder setCheckpointingMode(CheckpointingMode checkpointingMode);

329

public TestingSourceSettings build();

330

}

331

}

332

```

333

334

### Data Interface Types

335

336

```java { .api }

337

public interface ExternalSystemDataReader<T> {

338

List<T> poll(Duration timeout);

339

}

340

341

public interface ExternalSystemSplitDataWriter<T> {

342

void writeRecords(List<T> records);

343

}

344

```

345

346

### Container Configuration Types

347

348

```java { .api }

349

public class FlinkContainersSettings {

350

public static Builder builder();

351

352

public static class Builder {

353

public Builder setNumTaskManagers(int numTaskManagers);

354

public Builder setNumSlotsPerTaskManager(int numSlotsPerTaskManager);

355

public Builder setJobManagerMemory(String jobManagerMemory);

356

public Builder setTaskManagerMemory(String taskManagerMemory);

357

public FlinkContainersSettings build();

358

}

359

}

360

361

public class TestcontainersSettings {

362

public static Builder builder();

363

364

public static class Builder {

365

public Builder setNetwork(Network network);

366

public Builder setLogConsumers(Map<String, Consumer<OutputFrame>> logConsumers);

367

public TestcontainersSettings build();

368

}

369

}

370

```