# Test Suites

The test suite framework provides base classes that implement comprehensive test scenarios for connector validation. These base classes use JUnit 5's `@TestTemplate` mechanism to automatically run multiple test variations with different configurations.

## Capabilities

### Sink Test Suite Base

Base class for testing sink connectors with comprehensive scenarios including basic functionality, savepoint restart, scaling, and metrics validation.

```java { .api }
/**
 * Base class for sink test suite providing comprehensive test scenarios
 * @param <T> Type of data elements, must be Comparable for result validation
 */
public abstract class SinkTestSuiteBase<T extends Comparable<T>> {

    /**
     * Test basic sink functionality with data writing and validation
     * @param testEnv The test environment (injected by framework)
     * @param externalContext External system context (injected by framework)
     * @param semantic Checkpointing semantic (injected by framework)
     */
    @TestTemplate
    @DisplayName("Test data stream sink")
    public void testBasicSink(
        TestEnvironment testEnv,
        DataStreamSinkExternalContext<T> externalContext,
        CheckpointingMode semantic
    ) throws Exception;

    /**
     * Test sink restart from savepoint with same parallelism
     * @param testEnv The test environment
     * @param externalContext External system context
     * @param semantic Checkpointing semantic
     */
    @TestTemplate
    @DisplayName("Test sink restarting from a savepoint")
    public void testStartFromSavepoint(
        TestEnvironment testEnv,
        DataStreamSinkExternalContext<T> externalContext,
        CheckpointingMode semantic
    ) throws Exception;

    /**
     * Test sink restart with higher parallelism (scale up)
     * @param testEnv The test environment
     * @param externalContext External system context
     * @param semantic Checkpointing semantic
     */
    @TestTemplate
    @DisplayName("Test sink restarting with a higher parallelism")
    public void testScaleUp(
        TestEnvironment testEnv,
        DataStreamSinkExternalContext<T> externalContext,
        CheckpointingMode semantic
    ) throws Exception;

    /**
     * Test sink restart with lower parallelism (scale down)
     * @param testEnv The test environment
     * @param externalContext External system context
     * @param semantic Checkpointing semantic
     */
    @TestTemplate
    @DisplayName("Test sink restarting with a lower parallelism")
    public void testScaleDown(
        TestEnvironment testEnv,
        DataStreamSinkExternalContext<T> externalContext,
        CheckpointingMode semantic
    ) throws Exception;

    /**
     * Test sink metrics reporting (e.g., numRecordsOut)
     * @param testEnv The test environment
     * @param externalContext External system context
     * @param semantic Checkpointing semantic
     */
    @TestTemplate
    @DisplayName("Test sink metrics")
    public void testMetrics(
        TestEnvironment testEnv,
        DataStreamSinkExternalContext<T> externalContext,
        CheckpointingMode semantic
    ) throws Exception;
}
```

**Usage Examples:**

```java
import org.apache.flink.connector.testframe.environment.MiniClusterTestEnvironment;
import org.apache.flink.connector.testframe.external.ExternalContextFactory;
import org.apache.flink.connector.testframe.junit.annotations.*;
import org.apache.flink.connector.testframe.testsuites.SinkTestSuiteBase;

public class KafkaSinkTestSuite extends SinkTestSuiteBase<String> {

    // Flink environment the inherited tests run against
    @TestEnv
    MiniClusterTestEnvironment testEnv = new MiniClusterTestEnvironment();

    // Factory that creates one external context per test case;
    // KafkaSinkExternalContext is assumed to accept the test name in its constructor
    @TestContext
    ExternalContextFactory<KafkaSinkExternalContext> contextFactory =
        testName -> new KafkaSinkExternalContext(testName);

    // All test methods are inherited and run automatically:
    // testBasicSink, testStartFromSavepoint, testScaleUp, testScaleDown, testMetrics
}
```

### Source Test Suite Base

Base class for testing source connectors with scenarios including single/multiple splits, savepoint restart, scaling, idle readers, and failover testing.

```java { .api }
/**
 * Base class for source test suite providing comprehensive test scenarios
 * @param <T> Type of data elements produced by the source
 */
public abstract class SourceTestSuiteBase<T> {

    /**
     * Test source with single split (bounded source required)
     * @param testEnv The test environment
     * @param externalContext External system context
     * @param semantic Checkpointing semantic
     */
    @TestTemplate
    @DisplayName("Test source with single split")
    public void testSourceSingleSplit(
        TestEnvironment testEnv,
        DataStreamSourceExternalContext<T> externalContext,
        CheckpointingMode semantic
    ) throws Exception;

    /**
     * Test source with multiple splits (bounded source required)
     * @param testEnv The test environment
     * @param externalContext External system context
     * @param semantic Checkpointing semantic
     */
    @TestTemplate
    @DisplayName("Test source with multiple splits")
    public void testMultipleSplits(
        TestEnvironment testEnv,
        DataStreamSourceExternalContext<T> externalContext,
        CheckpointingMode semantic
    ) throws Exception;

    /**
     * Test source restart from savepoint
     * @param testEnv The test environment
     * @param externalContext External system context
     * @param semantic Checkpointing semantic
     */
    @TestTemplate
    @DisplayName("Test source restarting from a savepoint")
    public void testSavepoint(
        TestEnvironment testEnv,
        DataStreamSourceExternalContext<T> externalContext,
        CheckpointingMode semantic
    ) throws Exception;

    /**
     * Test source scale up (restart with higher parallelism)
     * @param testEnv The test environment
     * @param externalContext External system context
     * @param semantic Checkpointing semantic
     */
    @TestTemplate
    @DisplayName("Test source restarting with a higher parallelism")
    public void testScaleUp(
        TestEnvironment testEnv,
        DataStreamSourceExternalContext<T> externalContext,
        CheckpointingMode semantic
    ) throws Exception;

    /**
     * Test source scale down (restart with lower parallelism)
     * @param testEnv The test environment
     * @param externalContext External system context
     * @param semantic Checkpointing semantic
     */
    @TestTemplate
    @DisplayName("Test source restarting with a lower parallelism")
    public void testScaleDown(
        TestEnvironment testEnv,
        DataStreamSourceExternalContext<T> externalContext,
        CheckpointingMode semantic
    ) throws Exception;

    /**
     * Test source metrics reporting (e.g., numRecordsIn)
     * @param testEnv The test environment
     * @param externalContext External system context
     * @param semantic Checkpointing semantic
     */
    @TestTemplate
    @DisplayName("Test source metrics")
    public void testSourceMetrics(
        TestEnvironment testEnv,
        DataStreamSourceExternalContext<T> externalContext,
        CheckpointingMode semantic
    ) throws Exception;

    /**
     * Test source with idle readers (bounded source required)
     * Tests that sources properly handle idle readers with no assigned splits
     * @param testEnv The test environment
     * @param externalContext External system context
     * @param semantic Checkpointing semantic
     */
    @TestTemplate
    @DisplayName("Test source with at least one idle parallelism")
    public void testIdleReader(
        TestEnvironment testEnv,
        DataStreamSourceExternalContext<T> externalContext,
        CheckpointingMode semantic
    ) throws Exception;

    /**
     * Test source with TaskManager failover (unbounded source required)
     * @param testEnv The test environment
     * @param externalContext External system context
     * @param controller Cluster controller for triggering failures
     * @param semantic Checkpointing semantic
     */
    @TestTemplate
    @DisplayName("Test TaskManager failure")
    public void testTaskManagerFailure(
        TestEnvironment testEnv,
        DataStreamSourceExternalContext<T> externalContext,
        ClusterControllable controller,
        CheckpointingMode semantic
    ) throws Exception;
}
```

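**Usage Examples:**

```java
import org.apache.flink.connector.testframe.environment.MiniClusterTestEnvironment;
import org.apache.flink.connector.testframe.external.ExternalContextFactory;
import org.apache.flink.connector.testframe.junit.annotations.*;
import org.apache.flink.connector.testframe.testsuites.SourceTestSuiteBase;

public class KafkaSourceTestSuite extends SourceTestSuiteBase<String> {

    // Flink environment the inherited tests run against
    @TestEnv
    MiniClusterTestEnvironment testEnv = new MiniClusterTestEnvironment();

    // Factory that creates one external context per test case;
    // KafkaSourceExternalContext is assumed to accept the test name in its constructor
    @TestContext
    ExternalContextFactory<KafkaSourceExternalContext> contextFactory =
        testName -> new KafkaSourceExternalContext(testName);

    // All test methods are inherited:
    // testSourceSingleSplit, testMultipleSplits, testSavepoint,
    // testScaleUp, testScaleDown, testSourceMetrics, testIdleReader, testTaskManagerFailure
}
```
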
### Helper Methods

Both base classes provide protected helper methods for test data generation and result validation.

```java { .api }
// SinkTestSuiteBase helper methods
public abstract class SinkTestSuiteBase<T extends Comparable<T>> {

    /**
     * Generate test data for sink testing
     * @param testingSinkSettings Settings for the sink test
     * @param externalContext External context for data generation
     * @return List of generated test records
     */
    protected List<T> generateTestData(
        TestingSinkSettings testingSinkSettings,
        DataStreamSinkExternalContext<T> externalContext
    );

    /**
     * Validate sink results with semantic guarantees
     * @param reader Reader for external system data
     * @param testData Expected test data
     * @param semantic Semantic guarantee to validate against
     */
    protected void checkResultWithSemantic(
        ExternalSystemDataReader<T> reader,
        List<T> testData,
        CheckpointingMode semantic
    ) throws Exception;
}

// SourceTestSuiteBase helper methods
public abstract class SourceTestSuiteBase<T> {

    /**
     * Generate test data and write to external system
     * @param splitIndex Index of the split to write to
     * @param externalContext External context
     * @param testingSourceSettings Source settings
     * @return List of generated test records
     */
    protected List<T> generateAndWriteTestData(
        int splitIndex,
        DataStreamSourceExternalContext<T> externalContext,
        TestingSourceSettings testingSourceSettings
    );

    /**
     * Validate source results with semantic guarantees
     * @param resultIterator Iterator over result data
     * @param testData Expected test data (list of splits)
     * @param semantic Semantic guarantee to validate against
     * @param limit Optional limit for unbounded sources
     */
    protected void checkResultWithSemantic(
        CloseableIterator<T> resultIterator,
        List<List<T>> testData,
        CheckpointingMode semantic,
        Integer limit
    );

    /**
     * Create source instance for testing
     * @param externalContext External context
     * @param sourceOptions Source configuration
     * @return Source instance
     */
    protected Source<T, ?, ?> tryCreateSource(
        DataStreamSourceExternalContext<T> externalContext,
        TestingSourceSettings sourceOptions
    );

    /**
     * Submit Flink job for testing
     * @param env Stream execution environment
     * @param jobName Name for the job
     * @return JobClient for managing the job
     */
    protected JobClient submitJob(StreamExecutionEnvironment env, String jobName) throws Exception;
}
```

## Test Execution Flow

### Sink Test Flow

1. **Setup Phase**: Framework injects test environment, external context, and semantic mode
2. **Data Generation**: Generate test data using external context
3. **Job Creation**: Create Flink job with sink connected to external system
4. **Job Execution**: Execute job and wait for completion
5. **Validation**: Read data from external system and validate against expected results
6. **Cleanup**: Framework handles resource cleanup

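
The validation step of the sink flow depends on the checkpointing semantic, which may be easier to see in code. The following is a minimal sketch in plain Java, not the framework's implementation: `SinkResultCheckSketch`, its `Semantic` enum, and `matches` are hypothetical names, and the exact rule applied for at-least-once (duplicates of expected records tolerated, unexpected records rejected) is an assumption made for illustration. It also shows why a `Comparable` element type is convenient for order-insensitive comparison.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for semantic-aware sink validation; not Flink code. */
final class SinkResultCheckSketch {

    /** Local stand-in for Flink's CheckpointingMode so the sketch compiles without Flink. */
    enum Semantic { EXACTLY_ONCE, AT_LEAST_ONCE }

    /**
     * Returns true if the records read back from the external system satisfy the guarantee.
     * EXACTLY_ONCE: same records with the same multiplicities, order ignored.
     * AT_LEAST_ONCE (assumed rule): every expected record appears at least as often as
     * expected, duplicates are tolerated, records that were never written are rejected.
     */
    static <T extends Comparable<T>> boolean matches(
            List<T> expected, List<T> actual, Semantic semantic) {
        if (semantic == Semantic.EXACTLY_ONCE) {
            // Sort copies of both lists so the comparison ignores arrival order
            List<T> sortedExpected = new ArrayList<>(expected);
            List<T> sortedActual = new ArrayList<>(actual);
            Collections.sort(sortedExpected);
            Collections.sort(sortedActual);
            return sortedExpected.equals(sortedActual);
        }
        Map<T, Integer> expectedCounts = count(expected);
        Map<T, Integer> actualCounts = count(actual);
        // Reject records that were never part of the test data
        if (!expectedCounts.keySet().containsAll(actualCounts.keySet())) {
            return false;
        }
        // Every expected record must be present at least as often as it was written
        for (Map.Entry<T, Integer> entry : expectedCounts.entrySet()) {
            if (actualCounts.getOrDefault(entry.getKey(), 0) < entry.getValue()) {
                return false;
            }
        }
        return true;
    }

    /** Counts how often each record occurs. */
    private static <T> Map<T, Integer> count(List<T> records) {
        Map<T, Integer> counts = new HashMap<>();
        for (T record : records) {
            counts.merge(record, 1, Integer::sum);
        }
        return counts;
    }
}
```

Under these assumptions, a result containing a duplicated record fails the `EXACTLY_ONCE` check but passes the `AT_LEAST_ONCE` check, while a result with a missing record fails both.
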

### Source Test Flow

1. **Setup Phase**: Framework injects test environment, external context, and semantic mode
2. **Data Preparation**: Write test data to external system splits
3. **Job Creation**: Create Flink job with source reading from external system
4. **Job Execution**: Execute job and collect results
5. **Validation**: Validate collected results against written test data
6. **Cleanup**: Framework handles resource cleanup

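
Because source test data is written per split, the validation step has to compare one result stream against a list of splits. The sketch below illustrates one way this could work, assuming that records keep their order within a split while records from different splits may interleave. It is a simplified illustration in plain Java, not the framework's code: `SplitOrderCheckSketch` and `matchesSplits` are hypothetical names, and the greedy matching can misjudge inputs in which different splits contain equal records.

```java
import java.util.Iterator;
import java.util.List;
import java.util.Objects;

/** Hypothetical sketch of validating collected source results against per-split test data. */
final class SplitOrderCheckSketch {

    /**
     * Returns true if every result record is the next unread record of some split
     * and all splits have been fully consumed once the results are exhausted.
     */
    static <T> boolean matchesSplits(Iterator<T> results, List<List<T>> splits) {
        int[] nextIndex = new int[splits.size()]; // read position per split
        int remaining = 0;
        for (List<T> split : splits) {
            remaining += split.size();
        }
        while (results.hasNext()) {
            T record = results.next();
            boolean matched = false;
            // Greedily pick the first split whose next expected record equals this one
            for (int i = 0; i < splits.size() && !matched; i++) {
                List<T> split = splits.get(i);
                if (nextIndex[i] < split.size()
                        && Objects.equals(split.get(nextIndex[i]), record)) {
                    nextIndex[i]++;
                    remaining--;
                    matched = true;
                }
            }
            if (!matched) {
                return false; // unexpected record, or out of order within its split
            }
        }
        return remaining == 0; // all written records must have been read
    }
}
```

An unbounded source would additionally need a record limit, in the spirit of the `limit` parameter of `checkResultWithSemantic`; this sketch covers only the bounded case.
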

## Test Configuration

Tests are automatically parameterized across different configurations:

- **Checkpointing Modes**: `EXACTLY_ONCE`, `AT_LEAST_ONCE`
- **External Contexts**: All contexts provided via `@TestContext` annotations
- **Test Environments**: Environment provided via `@TestEnv` annotation

The framework generates the combinations of these parameters automatically and runs each inherited test method once per combination.
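
To make the parameterization concrete, the following sketch enumerates the invocations that would result from combining external contexts with checkpointing modes. It is only an illustration of the cross product in plain Java: `TestCombinationSketch`, `combinations`, and the context names used in the usage note are hypothetical, and the order in which the real framework iterates over combinations is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of how test invocations multiply across parameters. */
final class TestCombinationSketch {

    /**
     * Builds one label per (context, semantic) pair.
     * Each label corresponds to one run of a single inherited test method.
     */
    static List<String> combinations(List<String> contextNames, List<String> semantics) {
        List<String> invocations = new ArrayList<>();
        for (String context : contextNames) {
            for (String semantic : semantics) {
                invocations.add(context + " / " + semantic);
            }
        }
        return invocations;
    }
}
```

With two contexts (say `singleTopic` and `multiTopic`) and the two checkpointing modes, each test method would run four times, so the five sink test methods would produce twenty invocations in total.
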