# Assertions and Validation

The assertion framework provides specialized utilities for validating connector behavior with support for different semantic guarantees (exactly-once, at-least-once). It integrates with Flink's collect sink mechanism to validate streaming results.

## Capabilities

### Collect Iterator Assertions

Entry point for creating assertion instances for iterator validation.

```java { .api }
/**
 * Entry point for assertion methods for CollectIteratorAssert
 * Each method is a static factory for creating assertion instances
 */
public final class CollectIteratorAssertions {

    /**
     * Create ordered assertion instance for iterator validation
     * @param actual Iterator to validate
     * @param <T> Type of elements in iterator
     * @return CollectIteratorAssert instance for method chaining
     */
    public static <T> CollectIteratorAssert<T> assertThat(Iterator<T> actual);

    /**
     * Create unordered assertion instance for iterator validation
     * @param actual Iterator to validate
     * @param <T> Type of elements in iterator
     * @return UnorderedCollectIteratorAssert instance for method chaining
     */
    public static <T> UnorderedCollectIteratorAssert<T> assertUnordered(Iterator<T> actual);
}
```

**Usage Examples:**

```java
import static org.apache.flink.connector.testframe.utils.CollectIteratorAssertions.assertThat;
import static org.apache.flink.connector.testframe.utils.CollectIteratorAssertions.assertUnordered;

// Basic ordered assertion
Iterator<String> results = collectResults();
assertThat(results)
    .matchesRecordsFromSource(expectedData, CheckpointingMode.EXACTLY_ONCE);

// Unordered assertion for parallel processing.
// Collect a fresh iterator: the assertion above has already consumed `results`.
Iterator<String> unorderedResults = collectResults();
assertUnordered(unorderedResults)
    .matchesRecordsFromSource(expectedData, CheckpointingMode.EXACTLY_ONCE);
```

### Ordered Iterator Assertions

Assertion utilities for validating iterator results with order considerations.

```java { .api }
/**
 * Assertion utilities for ordered iterator validation
 * @param <T> Type of elements being validated
 */
public class CollectIteratorAssert<T> {

    /**
     * Validate iterator matches expected records from source with semantic guarantees
     * @param expected List of expected record collections (one per source split)
     * @param semantic Checkpointing semantic to validate against
     */
    public void matchesRecordsFromSource(
        List<List<T>> expected,
        CheckpointingMode semantic
    );

    /**
     * Set limit on number of records to validate (for unbounded sources)
     * @param limit Maximum number of records to validate
     * @return Self for method chaining
     */
    public CollectIteratorAssert<T> withNumRecordsLimit(int limit);
}
```

**Usage Examples:**

```java
// Source testing with multiple splits
List<List<String>> expectedBySplit = Arrays.asList(
    Arrays.asList("split1-record1", "split1-record2"),
    Arrays.asList("split2-record1", "split2-record2")
);

assertThat(resultIterator)
    .matchesRecordsFromSource(expectedBySplit, CheckpointingMode.EXACTLY_ONCE);

// Unbounded source testing with record limit
assertThat(resultIterator)
    .withNumRecordsLimit(100)
    .matchesRecordsFromSource(expectedBySplit, CheckpointingMode.AT_LEAST_ONCE);
```

### Unordered Iterator Assertions

Assertion utilities for validating iterator results without order requirements.

```java { .api }
/**
 * Assertion utilities for unordered iterator validation
 * @param <T> Type of elements being validated
 */
public class UnorderedCollectIteratorAssert<T> {

    /**
     * Set limit on number of records to validate (for unbounded sources)
     * @param limit Maximum number of records to validate
     * @return Self for method chaining
     */
    public UnorderedCollectIteratorAssert<T> withNumRecordsLimit(int limit);

    /**
     * Validate iterator matches expected records from source with semantic guarantees
     * @param expected List of expected record collections (one per source split)
     * @param semantic Checkpointing semantic to validate against
     */
    public void matchesRecordsFromSource(
        List<List<T>> expected,
        CheckpointingMode semantic
    );
}
```

**Usage Examples:**

```java
// Unordered validation for parallel processing
List<List<String>> expectedBySplit = Arrays.asList(
    Arrays.asList("split1-record1", "split1-record2"),
    Arrays.asList("split2-record1", "split2-record2")
);

assertUnordered(resultIterator)
    .matchesRecordsFromSource(expectedBySplit, CheckpointingMode.EXACTLY_ONCE);

// Unbounded source testing with record limit
assertUnordered(resultIterator)
    .withNumRecordsLimit(100)
    .matchesRecordsFromSource(expectedBySplit, CheckpointingMode.AT_LEAST_ONCE);
```

## Semantic Guarantee Validation

### Exactly-Once Semantics

Validates that each record appears exactly once in the results.

```java
// Exactly-once validation
assertThat(resultIterator)
    .matchesRecordsFromSource(expectedData, CheckpointingMode.EXACTLY_ONCE);

// Behavior:
// - Each expected record must appear exactly once
// - No duplicates allowed
// - Missing records cause assertion failure
// - Extra records cause assertion failure
```
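
The four rules listed above amount to a multiset comparison, which can be sketched in plain Java. This is an illustration only, not the framework's implementation: `ExactlyOnceSketch` and `matchesExactlyOnce` are hypothetical names, the expected data is flattened into a single list, and any ordering checks the real assertion may perform are ignored here.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration; not part of the Flink test framework.
final class ExactlyOnceSketch {

    /** Returns true when actual holds every expected record exactly once and nothing else. */
    static <T> boolean matchesExactlyOnce(Iterator<T> actual, List<T> expected) {
        // Count how many occurrences of each record are still owed.
        Map<T, Integer> remaining = new HashMap<>();
        for (T record : expected) {
            remaining.merge(record, 1, Integer::sum);
        }
        while (actual.hasNext()) {
            T record = actual.next();
            Integer count = remaining.get(record);
            if (count == null) {
                return false; // duplicate or otherwise unexpected record
            }
            if (count == 1) {
                remaining.remove(record);
            } else {
                remaining.put(record, count - 1);
            }
        }
        // Anything still owed is a missing record.
        return remaining.isEmpty();
    }
}
```

A reordered result such as `c, a, b` passes against `a, b, c`, while `a, b, b, c` (duplicate) and `a, b` (missing record) both fail.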

### At-Least-Once Semantics

Validates that each record appears at least once, allowing for duplicates.

```java
// At-least-once validation
assertThat(resultIterator)
    .matchesRecordsFromSource(expectedData, CheckpointingMode.AT_LEAST_ONCE);

// Behavior:
// - Each expected record must appear at least once
// - Duplicates are allowed and ignored
// - Missing records cause assertion failure
// - Extra records that match expected records are allowed
```
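
The at-least-once rules can be sketched the same way. This is a simplified illustration under two assumptions: the expected data is flattened into one list, and a record that does not occur anywhere in the expected data is treated as a failure (one reading of the last rule above). `AtLeastOnceSketch` and `matchesAtLeastOnce` are hypothetical names, not framework API.

```java
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

// Hypothetical helper for illustration; not part of the Flink test framework.
final class AtLeastOnceSketch {

    /** Returns true when every expected record was seen one or more times and nothing foreign appeared. */
    static <T> boolean matchesAtLeastOnce(Iterator<T> actual, List<T> expected) {
        Set<T> allowed = new HashSet<>(expected);
        Set<T> seen = new HashSet<>();
        while (actual.hasNext()) {
            T record = actual.next();
            if (!allowed.contains(record)) {
                return false; // assumption: records outside the expected data are rejected
            }
            seen.add(record); // repeated records collapse into the set, so duplicates are ignored
        }
        // Every expected record must have shown up at least once.
        return seen.containsAll(allowed);
    }
}
```

With expected data `a, b, c`, the result `a, b, a, c, b` passes because replayed records are tolerated, whereas `a, b` still fails because `c` never arrived.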

## Integration with Test Framework

### Automatic Result Collection

Test suites automatically set up result collection using Flink's collect sink.

```java { .api }
/**
 * Helper class for building CollectResultIterator instances
 * @param <T> Type of collected elements
 */
protected static class CollectIteratorBuilder<T> {

    /**
     * Build CollectResultIterator bound to job client
     * @param jobClient Job client for result collection
     * @return CollectResultIterator for accessing results
     */
    protected CollectResultIterator<T> build(JobClient jobClient);
}

// Used internally by test suite base classes
protected CollectIteratorBuilder<T> addCollectSink(DataStream<T> stream);
```

**Usage in Test Suites:**

```java
// Automatic setup in test suite base classes
DataStreamSource<T> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Test Source");
CollectIteratorBuilder<T> iteratorBuilder = addCollectSink(stream);
JobClient jobClient = env.executeAsync("Test Job");

// Get results and validate
try (CollectResultIterator<T> resultIterator = iteratorBuilder.build(jobClient)) {
    assertThat(resultIterator)
        .matchesRecordsFromSource(expectedData, semantic);
}
```

### Result Validation Patterns

#### Sink Testing Pattern

```java
// In SinkTestSuiteBase
protected void checkResultWithSemantic(
    ExternalSystemDataReader<T> reader,
    List<T> testData,
    CheckpointingMode semantic
) throws Exception {

    final ArrayList<T> result = new ArrayList<>();
    waitUntilCondition(() -> {
        // Poll data from external system
        pollAndAppendResultData(result, reader, testData, 30, semantic);
        try {
            // Validate using assertions
            CollectIteratorAssertions.assertThat(sort(result).iterator())
                .matchesRecordsFromSource(Arrays.asList(sort(testData)), semantic);
            return true;
        } catch (Throwable t) {
            return false;
        }
    });
}
```
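
The sink pattern above retries the assertion until it passes, because data may reach the external system with some delay. The retry loop itself can be sketched as follows. This is a stand-in written for illustration, not the framework's `waitUntilCondition`: the class name, the method name, the timeout and interval parameters, and the boolean return value are all assumptions.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

// Hypothetical polling helper for illustration; not part of the Flink test framework.
final class PollingSketch {

    /** Re-evaluates the condition until it holds or the timeout elapses. */
    static boolean waitUntil(BooleanSupplier condition, Duration timeout, Duration interval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (condition.getAsBoolean()) {
                return true; // condition met, stop polling
            }
            if (System.nanoTime() >= deadline) {
                return false; // gave up after the timeout
            }
            try {
                Thread.sleep(interval.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for callers
                return false;
            }
        }
    }
}
```

In the sink pattern, the condition is the lambda that polls the reader and returns `false` whenever the assertion throws, so a failed attempt simply leads to another poll.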

#### Source Testing Pattern

```java
// In SourceTestSuiteBase
protected void checkResultWithSemantic(
    CloseableIterator<T> resultIterator,
    List<List<T>> testData,
    CheckpointingMode semantic,
    Integer limit
) {
    if (limit != null) {
        // Unbounded source with record limit
        Runnable runnable = () ->
            CollectIteratorAssertions.assertThat(resultIterator)
                .withNumRecordsLimit(limit)
                .matchesRecordsFromSource(testData, semantic);

        assertThatFuture(runAsync(runnable)).eventuallySucceeds();
    } else {
        // Bounded source
        CollectIteratorAssertions.assertThat(resultIterator)
            .matchesRecordsFromSource(testData, semantic);
    }
}
```

## Advanced Validation Scenarios

### Multi-Split Source Validation

Validates results from sources with multiple splits, ensuring each split's data is properly consumed.

```java
// Test data organized by split
List<List<String>> testDataBySplit = Arrays.asList(
    Arrays.asList("split0-rec1", "split0-rec2", "split0-rec3"),  // Split 0
    Arrays.asList("split1-rec1", "split1-rec2"),                 // Split 1
    Arrays.asList("split2-rec1", "split2-rec2", "split2-rec3")   // Split 2
);

// Validation allows records from different splits to be interleaved
assertThat(resultIterator)
    .matchesRecordsFromSource(testDataBySplit, CheckpointingMode.EXACTLY_ONCE);
```
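
One way to accept interleaved splits while still checking that each split is read in order is to keep a cursor per split and advance whichever cursor matches the incoming record. The sketch below illustrates that idea under stated assumptions: `InterleavedSplitSketch` and `matchesInterleaved` are hypothetical names, matching is greedy (so it can misjudge data where two splits expose equal records at the same time), and it is not presented as the framework's actual algorithm.

```java
import java.util.Iterator;
import java.util.List;

// Hypothetical helper for illustration; not part of the Flink test framework.
final class InterleavedSplitSketch {

    /** Returns true when actual is an interleaving of the splits that keeps each split's order. */
    static <T> boolean matchesInterleaved(Iterator<T> actual, List<List<T>> expectedBySplit) {
        int[] cursors = new int[expectedBySplit.size()]; // next unread index for each split
        while (actual.hasNext()) {
            T record = actual.next();
            boolean matched = false;
            for (int split = 0; split < cursors.length && !matched; split++) {
                List<T> records = expectedBySplit.get(split);
                if (cursors[split] < records.size() && records.get(cursors[split]).equals(record)) {
                    cursors[split]++; // consume the head of this split
                    matched = true;
                }
            }
            if (!matched) {
                return false; // record is not the next one of any split
            }
        }
        // Every split must have been read to its end.
        for (int split = 0; split < cursors.length; split++) {
            if (cursors[split] != expectedBySplit.get(split).size()) {
                return false;
            }
        }
        return true;
    }
}
```

For splits `[a1, a2]` and `[b1, b2]`, the sequence `a1, b1, b2, a2` is accepted, while `a2, a1, b1, b2` is rejected because split 0 is read out of order.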

### Bounded vs Unbounded Source Validation

```java
// Bounded source - validate all expected records
public void testBoundedSource() {
    assertThat(resultIterator)
        .matchesRecordsFromSource(expectedData, semantic);
    // Will wait for job to finish naturally
}

// Unbounded source - validate limited number of records
public void testUnboundedSource() {
    assertThat(resultIterator)
        .withNumRecordsLimit(expectedSize)
        .matchesRecordsFromSource(expectedData, semantic);
    // Will validate first expectedSize records then succeed
}
```
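
The reason a record limit matters for unbounded sources is that their result iterator never reports the end of input. A minimal sketch of reading at most a fixed number of records from such an iterator is shown below; `RecordLimitSketch` and `takeAtMost` are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical helper for illustration; not part of the Flink test framework.
final class RecordLimitSketch {

    /** Reads up to limit records and then stops, even if the source could produce more. */
    static <T> List<T> takeAtMost(Iterator<T> source, int limit) {
        List<T> taken = new ArrayList<>();
        // Check the limit first so hasNext() is never called once enough records were read;
        // on a live stream that call could block while waiting for the next record.
        while (taken.size() < limit && source.hasNext()) {
            taken.add(source.next());
        }
        return taken;
    }
}
```

Applied to an endless iterator such as `Stream.iterate(0, i -> i + 1).iterator()`, `takeAtMost(iterator, 3)` returns after three records instead of looping forever.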

### Failure Scenario Validation

```java
// Validate recovery after failure
public void testTaskManagerFailure() throws Exception {
    // beforeFailureData and afterFailureData are each a List<T> holding the records
    // of a single split, so they are wrapped to match the List<List<T>> parameter.

    // Phase 1: Validate records before failure
    checkResultWithSemantic(
        iterator, Arrays.asList(beforeFailureData), semantic, beforeFailureData.size());

    // Trigger failure
    controller.triggerTaskManagerFailover(jobClient, () -> {
        // Write additional data after failure
        writeTestData(afterFailureData);
    });

    // Phase 2: Validate records after recovery
    checkResultWithSemantic(
        iterator, Arrays.asList(afterFailureData), semantic, afterFailureData.size());
}
```

## Error Handling

### Common Assertion Failures

```java
// Record count mismatch
AssertionError: Expected 100 records but found 95
// Missing records in exactly-once
AssertionError: Expected record 'test-42' not found in results
// Unexpected records in exactly-once
AssertionError: Unexpected record 'duplicate-7' found in results
```

### Timeout Configuration

```java
// Configure timeouts for result collection
CollectResultIterator<T> iterator = new CollectResultIterator<>(
    operatorUid,
    serializer,
    accumulatorName,
    checkpointConfig,
    Duration.ofMinutes(5).toMillis() // 5 minute timeout
);
```

### Debugging Failed Assertions

```java
// Enable detailed logging for debugging
Logger logger = LoggerFactory.getLogger(CollectIteratorAssert.class);
// Add logging to capture actual vs expected results
// Use breakpoints to inspect iterator contents
// Check external system state for sink validations
```