or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

assertions.mdcontainers.mdexternal-systems.mdindex.mdjunit-integration.mdmetrics.mdtest-environments.mdtest-suites.md

test-suites.mddocs/

0

# Test Suites

1

2

The test suite framework provides base classes that implement comprehensive test scenarios for connector validation. These base classes use JUnit 5's `@TestTemplate` mechanism to automatically run multiple test variations with different configurations.

3

4

## Capabilities

5

6

### Sink Test Suite Base

7

8

Base class for testing sink connectors with comprehensive scenarios including basic functionality, savepoint restart, scaling, and metrics validation.

9

10

```java { .api }

11

/**

12

* Base class for sink test suite providing comprehensive test scenarios

13

* @param <T> Type of data elements, must be Comparable for result validation

14

*/

15

public abstract class SinkTestSuiteBase<T extends Comparable<T>> {

16

17

/**

18

* Test basic sink functionality with data writing and validation

19

* @param testEnv The test environment (injected by framework)

20

* @param externalContext External system context (injected by framework)

21

* @param semantic Checkpointing semantic (injected by framework)

22

*/

23

@TestTemplate

24

@DisplayName("Test data stream sink")

25

public void testBasicSink(

26

TestEnvironment testEnv,

27

DataStreamSinkExternalContext<T> externalContext,

28

CheckpointingMode semantic

29

) throws Exception;

30

31

/**

32

* Test sink restart from savepoint with same parallelism

33

* @param testEnv The test environment

34

* @param externalContext External system context

35

* @param semantic Checkpointing semantic

36

*/

37

@TestTemplate

38

@DisplayName("Test sink restarting from a savepoint")

39

public void testStartFromSavepoint(

40

TestEnvironment testEnv,

41

DataStreamSinkExternalContext<T> externalContext,

42

CheckpointingMode semantic

43

) throws Exception;

44

45

/**

46

* Test sink restart with higher parallelism (scale up)

47

* @param testEnv The test environment

48

* @param externalContext External system context

49

* @param semantic Checkpointing semantic

50

*/

51

@TestTemplate

52

@DisplayName("Test sink restarting with a higher parallelism")

53

public void testScaleUp(

54

TestEnvironment testEnv,

55

DataStreamSinkExternalContext<T> externalContext,

56

CheckpointingMode semantic

57

) throws Exception;

58

59

/**

60

* Test sink restart with lower parallelism (scale down)

61

* @param testEnv The test environment

62

* @param externalContext External system context

63

* @param semantic Checkpointing semantic

64

*/

65

@TestTemplate

66

@DisplayName("Test sink restarting with a lower parallelism")

67

public void testScaleDown(

68

TestEnvironment testEnv,

69

DataStreamSinkExternalContext<T> externalContext,

70

CheckpointingMode semantic

71

) throws Exception;

72

73

/**

74

* Test sink metrics reporting (e.g., numRecordsOut)

75

* @param testEnv The test environment

76

* @param externalContext External system context

77

* @param semantic Checkpointing semantic

78

*/

79

@TestTemplate

80

@DisplayName("Test sink metrics")

81

public void testMetrics(

82

TestEnvironment testEnv,

83

DataStreamSinkExternalContext<T> externalContext,

84

CheckpointingMode semantic

85

) throws Exception;

86

}

87

```

88

89

**Usage Examples:**

90

91

```java

92

import org.apache.flink.connector.testframe.testsuites.SinkTestSuiteBase;

93

import org.apache.flink.connector.testframe.junit.annotations.*;

94

95

public class KafkaSinkTestSuite extends SinkTestSuiteBase<String> {

96

97

@TestEnv

98

MiniClusterTestEnvironment testEnv = new MiniClusterTestEnvironment();

99

100

@TestContext

101

ExternalContextFactory<KafkaSinkExternalContext> contextFactory =

102

testName -> new KafkaSinkExternalContext(testName);

103

104

// All test methods are inherited and run automatically

105

// testBasicSink, testStartFromSavepoint, testScaleUp, testScaleDown, testMetrics

106

}

107

```

108

109

### Source Test Suite Base

110

111

Base class for testing source connectors with scenarios including single/multiple splits, savepoint restart, scaling, idle readers, and failover testing.

112

113

```java { .api }

114

/**

115

* Base class for source test suite providing comprehensive test scenarios

116

* @param <T> Type of data elements produced by the source

117

*/

118

public abstract class SourceTestSuiteBase<T> {

119

120

/**

121

* Test source with single split (bounded source required)

122

* @param testEnv The test environment

123

* @param externalContext External system context

124

* @param semantic Checkpointing semantic

125

*/

126

@TestTemplate

127

@DisplayName("Test source with single split")

128

public void testSourceSingleSplit(

129

TestEnvironment testEnv,

130

DataStreamSourceExternalContext<T> externalContext,

131

CheckpointingMode semantic

132

) throws Exception;

133

134

/**

135

* Test source with multiple splits (bounded source required)

136

* @param testEnv The test environment

137

* @param externalContext External system context

138

* @param semantic Checkpointing semantic

139

*/

140

@TestTemplate

141

@DisplayName("Test source with multiple splits")

142

public void testMultipleSplits(

143

TestEnvironment testEnv,

144

DataStreamSourceExternalContext<T> externalContext,

145

CheckpointingMode semantic

146

) throws Exception;

147

148

/**

149

* Test source restart from savepoint

150

* @param testEnv The test environment

151

* @param externalContext External system context

152

* @param semantic Checkpointing semantic

153

*/

154

@TestTemplate

155

@DisplayName("Test source restarting from a savepoint")

156

public void testSavepoint(

157

TestEnvironment testEnv,

158

DataStreamSourceExternalContext<T> externalContext,

159

CheckpointingMode semantic

160

) throws Exception;

161

162

/**

163

* Test source scale up (restart with higher parallelism)

164

* @param testEnv The test environment

165

* @param externalContext External system context

166

* @param semantic Checkpointing semantic

167

*/

168

@TestTemplate

169

@DisplayName("Test source restarting with a higher parallelism")

170

public void testScaleUp(

171

TestEnvironment testEnv,

172

DataStreamSourceExternalContext<T> externalContext,

173

CheckpointingMode semantic

174

) throws Exception;

175

176

/**

177

* Test source scale down (restart with lower parallelism)

178

* @param testEnv The test environment

179

* @param externalContext External system context

180

* @param semantic Checkpointing semantic

181

*/

182

@TestTemplate

183

@DisplayName("Test source restarting with a lower parallelism")

184

public void testScaleDown(

185

TestEnvironment testEnv,

186

DataStreamSourceExternalContext<T> externalContext,

187

CheckpointingMode semantic

188

) throws Exception;

189

190

/**

191

* Test source metrics reporting (e.g., numRecordsIn)

192

* @param testEnv The test environment

193

* @param externalContext External system context

194

* @param semantic Checkpointing semantic

195

*/

196

@TestTemplate

197

@DisplayName("Test source metrics")

198

public void testSourceMetrics(

199

TestEnvironment testEnv,

200

DataStreamSourceExternalContext<T> externalContext,

201

CheckpointingMode semantic

202

) throws Exception;

203

204

/**

205

* Test source with idle readers (bounded source required)

206

* Tests that sources properly handle idle readers with no assigned splits

207

* @param testEnv The test environment

208

* @param externalContext External system context

209

* @param semantic Checkpointing semantic

210

*/

211

@TestTemplate

212

@DisplayName("Test source with at least one idle parallelism")

213

public void testIdleReader(

214

TestEnvironment testEnv,

215

DataStreamSourceExternalContext<T> externalContext,

216

CheckpointingMode semantic

217

) throws Exception;

218

219

/**

220

* Test source with TaskManager failover (unbounded source required)

221

* @param testEnv The test environment

222

* @param externalContext External system context

223

* @param controller Cluster controller for triggering failures

224

* @param semantic Checkpointing semantic

225

*/

226

@TestTemplate

227

@DisplayName("Test TaskManager failure")

228

public void testTaskManagerFailure(

229

TestEnvironment testEnv,

230

DataStreamSourceExternalContext<T> externalContext,

231

ClusterControllable controller,

232

CheckpointingMode semantic

233

) throws Exception;

234

}

235

```

236

237

**Usage Examples:**

238

239

```java

240

import org.apache.flink.connector.testframe.testsuites.SourceTestSuiteBase;

241

242

public class KafkaSourceTestSuite extends SourceTestSuiteBase<String> {

243

244

@TestEnv

245

MiniClusterTestEnvironment testEnv = new MiniClusterTestEnvironment();

246

247

@TestContext

248

ExternalContextFactory<KafkaSourceExternalContext> contextFactory =

249

testName -> new KafkaSourceExternalContext(testName);

250

251

// All test methods are inherited:

252

// testSourceSingleSplit, testMultipleSplits, testSavepoint,

253

// testScaleUp, testScaleDown, testSourceMetrics, testIdleReader, testTaskManagerFailure

254

}

255

```

256

257

### Helper Methods

258

259

Both base classes provide protected helper methods for test data generation and result validation.

260

261

```java { .api }

262

// SinkTestSuiteBase helper methods

263

public abstract class SinkTestSuiteBase<T extends Comparable<T>> {

264

265

/**

266

* Generate test data for sink testing

267

* @param testingSinkSettings Settings for the sink test

268

* @param externalContext External context for data generation

269

* @return List of generated test records

270

*/

271

protected List<T> generateTestData(

272

TestingSinkSettings testingSinkSettings,

273

DataStreamSinkExternalContext<T> externalContext

274

);

275

276

/**

277

* Validate sink results with semantic guarantees

278

* @param reader Reader for external system data

279

* @param testData Expected test data

280

* @param semantic Semantic guarantee to validate against

281

*/

282

protected void checkResultWithSemantic(

283

ExternalSystemDataReader<T> reader,

284

List<T> testData,

285

CheckpointingMode semantic

286

) throws Exception;

287

}

288

289

// SourceTestSuiteBase helper methods

290

public abstract class SourceTestSuiteBase<T> {

291

292

/**

293

* Generate test data and write to external system

294

* @param splitIndex Index of the split to write to

295

* @param externalContext External context

296

* @param testingSourceSettings Source settings

297

* @return List of generated test records

298

*/

299

protected List<T> generateAndWriteTestData(

300

int splitIndex,

301

DataStreamSourceExternalContext<T> externalContext,

302

TestingSourceSettings testingSourceSettings

303

);

304

305

/**

306

* Validate source results with semantic guarantees

307

* @param resultIterator Iterator over result data

308

* @param testData Expected test data (list of splits)

309

* @param semantic Semantic guarantee to validate against

310

* @param limit Optional limit for unbounded sources

311

*/

312

protected void checkResultWithSemantic(

313

CloseableIterator<T> resultIterator,

314

List<List<T>> testData,

315

CheckpointingMode semantic,

316

Integer limit

317

);

318

319

/**

320

* Create source instance for testing

321

* @param externalContext External context

322

* @param sourceOptions Source configuration

323

* @return Source instance

324

*/

325

protected Source<T, ?, ?> tryCreateSource(

326

DataStreamSourceExternalContext<T> externalContext,

327

TestingSourceSettings sourceOptions

328

);

329

330

/**

331

* Submit Flink job for testing

332

* @param env Stream execution environment

333

* @param jobName Name for the job

334

* @return JobClient for managing the job

335

*/

336

protected JobClient submitJob(StreamExecutionEnvironment env, String jobName) throws Exception;

337

}

338

```

339

340

## Test Execution Flow

341

342

### Sink Test Flow

343

344

1. **Setup Phase**: Framework injects test environment, external context, and semantic mode

345

2. **Data Generation**: Generate test data using external context

346

3. **Job Creation**: Create Flink job with sink connected to external system

347

4. **Job Execution**: Execute job and wait for completion

348

5. **Validation**: Read data from external system and validate against expected results

349

6. **Cleanup**: Framework handles resource cleanup

350

351

### Source Test Flow

352

353

1. **Setup Phase**: Framework injects test environment, external context, and semantic mode

354

2. **Data Preparation**: Write test data to external system splits

355

3. **Job Creation**: Create Flink job with source reading from external system

356

4. **Job Execution**: Execute job and collect results

357

5. **Validation**: Validate collected results against written test data

358

6. **Cleanup**: Framework handles resource cleanup

359

360

## Test Configuration

361

362

Tests are automatically parameterized across different configurations:

363

364

- **Checkpointing Modes**: `EXACTLY_ONCE`, `AT_LEAST_ONCE`

365

- **External Contexts**: All contexts provided via `@TestContext` annotations

366

- **Test Environments**: Environment provided via `@TestEnv` annotation

367

368

The framework automatically generates test combinations and runs each test method multiple times with different parameter combinations.