or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

assertions.mdcontainers.mdexternal-systems.mdindex.mdjunit-integration.mdmetrics.mdtest-environments.mdtest-suites.md

assertions.mddocs/

0

# Assertions and Validation

1

2

The assertion framework provides specialized utilities for validating connector behavior with support for different semantic guarantees (exactly-once, at-least-once). It integrates with Flink's collect sink mechanism to validate streaming results.

3

4

## Capabilities

5

6

### Collect Iterator Assertions

7

8

Entry point for creating assertion instances for iterator validation.

9

10

```java { .api }

11

/**

12

* Entry point for assertion methods for CollectIteratorAssert

13

* Each method is a static factory for creating assertion instances

14

*/

15

public final class CollectIteratorAssertions {

16

17

/**

18

* Create ordered assertion instance for iterator validation

19

* @param actual Iterator to validate

20

* @param <T> Type of elements in iterator

21

* @return CollectIteratorAssert instance for method chaining

22

*/

23

public static <T> CollectIteratorAssert<T> assertThat(Iterator<T> actual);

24

25

/**

26

* Create unordered assertion instance for iterator validation

27

* @param actual Iterator to validate

28

* @param <T> Type of elements in iterator

29

* @return UnorderedCollectIteratorAssert instance for method chaining

30

*/

31

public static <T> UnorderedCollectIteratorAssert<T> assertUnordered(Iterator<T> actual);

32

}

33

```

34

35

**Usage Examples:**

36

37

```java

38

import static org.apache.flink.connector.testframe.utils.CollectIteratorAssertions.assertThat;

39

import static org.apache.flink.connector.testframe.utils.CollectIteratorAssertions.assertUnordered;

40

41

// Basic ordered assertion

42

Iterator<String> results = collectResults();

43

assertThat(results)

44

.matchesRecordsFromSource(expectedData, CheckpointingMode.EXACTLY_ONCE);

45

46

// Unordered assertion for parallel processing

47

assertUnordered(results)

48

.containsExactlyInAnyOrder(expectedData);

49

```

50

51

### Ordered Iterator Assertions

52

53

Assertion utilities for validating iterator results with order considerations.

54

55

```java { .api }

56

/**

57

* Assertion utilities for ordered iterator validation

58

* @param <T> Type of elements being validated

59

*/

60

public class CollectIteratorAssert<T> {

61

62

/**

63

* Validate iterator matches expected records from source with semantic guarantees

64

* @param expected List of expected record collections (one per source split)

65

* @param semantic Checkpointing semantic to validate against

66

*/

67

public void matchesRecordsFromSource(

68

List<List<T>> expected,

69

CheckpointingMode semantic

70

);

71

72

/**

73

* Set limit on number of records to validate (for unbounded sources)

74

* @param limit Maximum number of records to validate

75

* @return Self for method chaining

76

*/

77

public CollectIteratorAssert<T> withNumRecordsLimit(int limit);

78

}

79

```

80

81

**Usage Examples:**

82

83

```java

84

// Source testing with multiple splits

85

List<List<String>> expectedBySplit = Arrays.asList(

86

Arrays.asList("split1-record1", "split1-record2"),

87

Arrays.asList("split2-record1", "split2-record2")

88

);

89

90

assertThat(resultIterator)

91

.matchesRecordsFromSource(expectedBySplit, CheckpointingMode.EXACTLY_ONCE);

92

93

// Unbounded source testing with record limit

94

assertThat(resultIterator)

95

.withNumRecordsLimit(100)

96

.matchesRecordsFromSource(expectedBySplit, CheckpointingMode.AT_LEAST_ONCE);

97

```

98

99

### Unordered Iterator Assertions

100

101

Assertion utilities for validating iterator results without order requirements.

102

103

```java { .api }

104

/**

105

* Assertion utilities for unordered iterator validation

106

* @param <T> Type of elements being validated

107

*/

108

public class UnorderedCollectIteratorAssert<T> {

109

110

/**

111

* Set limit on number of records to validate (for unbounded sources)

112

* @param limit Maximum number of records to validate

113

* @return Self for method chaining

114

*/

115

public UnorderedCollectIteratorAssert<T> withNumRecordsLimit(int limit);

116

117

/**

118

* Validate iterator matches expected records from source with semantic guarantees

119

* @param expected List of expected record collections (one per source split)

120

* @param semantic Checkpointing semantic to validate against

121

*/

122

public void matchesRecordsFromSource(

123

List<List<T>> expected,

124

CheckpointingMode semantic

125

);

126

}

127

```

128

129

**Usage Examples:**

130

131

```java

132

// Unordered validation for parallel processing

133

List<List<String>> expectedBySplit = Arrays.asList(

134

Arrays.asList("split1-record1", "split1-record2"),

135

Arrays.asList("split2-record1", "split2-record2")

136

);

137

138

assertUnordered(resultIterator)

139

.matchesRecordsFromSource(expectedBySplit, CheckpointingMode.EXACTLY_ONCE);

140

141

// Unbounded source testing with record limit

142

assertUnordered(resultIterator)

143

.withNumRecordsLimit(100)

144

.matchesRecordsFromSource(expectedBySplit, CheckpointingMode.AT_LEAST_ONCE);

145

```

146

147

## Semantic Guarantee Validation

148

149

### Exactly-Once Semantics

150

151

Validates that each record appears exactly once in the results.

152

153

```java

154

// Exactly-once validation

155

assertThat(resultIterator)

156

.matchesRecordsFromSource(expectedData, CheckpointingMode.EXACTLY_ONCE);

157

158

// Behavior:

159

// - Each expected record must appear exactly once

160

// - No duplicates allowed

161

// - Missing records cause assertion failure

162

// - Extra records cause assertion failure

163

```

164

165

### At-Least-Once Semantics

166

167

Validates that each record appears at least once, allowing for duplicates.

168

169

```java

170

// At-least-once validation

171

assertThat(resultIterator)

172

.matchesRecordsFromSource(expectedData, CheckpointingMode.AT_LEAST_ONCE);

173

174

// Behavior:

175

// - Each expected record must appear at least once

176

// - Duplicates are allowed and ignored

177

// - Missing records cause assertion failure

178

// - Extra records that match expected records are allowed

179

```

180

181

## Integration with Test Framework

182

183

### Automatic Result Collection

184

185

Test suites automatically set up result collection using Flink's collect sink.

186

187

```java { .api }

188

/**

189

* Helper class for building CollectResultIterator instances

190

* @param <T> Type of collected elements

191

*/

192

protected static class CollectIteratorBuilder<T> {

193

194

/**

195

* Build CollectResultIterator bound to job client

196

* @param jobClient Job client for result collection

197

* @return CollectResultIterator for accessing results

198

*/

199

protected CollectResultIterator<T> build(JobClient jobClient);

200

}

201

202

// Used internally by test suite base classes

203

protected CollectIteratorBuilder<T> addCollectSink(DataStream<T> stream);

204

```

205

206

**Usage in Test Suites:**

207

208

```java

209

// Automatic setup in test suite base classes

210

DataStreamSource<T> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Test Source");

211

CollectIteratorBuilder<T> iteratorBuilder = addCollectSink(stream);

212

JobClient jobClient = env.executeAsync("Test Job");

213

214

// Get results and validate

215

try (CollectResultIterator<T> resultIterator = iteratorBuilder.build(jobClient)) {

216

assertThat(resultIterator)

217

.matchesRecordsFromSource(expectedData, semantic);

218

}

219

```

220

221

### Result Validation Patterns

222

223

#### Sink Testing Pattern

224

225

```java

226

// In SinkTestSuiteBase

227

protected void checkResultWithSemantic(

228

ExternalSystemDataReader<T> reader,

229

List<T> testData,

230

CheckpointingMode semantic

231

) throws Exception {

232

233

final ArrayList<T> result = new ArrayList<>();

234

waitUntilCondition(() -> {

235

// Poll data from external system

236

pollAndAppendResultData(result, reader, testData, 30, semantic);

237

try {

238

// Validate using assertions

239

CollectIteratorAssertions.assertThat(sort(result).iterator())

240

.matchesRecordsFromSource(Arrays.asList(sort(testData)), semantic);

241

return true;

242

} catch (Throwable t) {

243

return false;

244

}

245

});

246

}

247

```

248

249

#### Source Testing Pattern

250

251

```java

252

// In SourceTestSuiteBase

253

protected void checkResultWithSemantic(

254

CloseableIterator<T> resultIterator,

255

List<List<T>> testData,

256

CheckpointingMode semantic,

257

Integer limit

258

) {

259

if (limit != null) {

260

// Unbounded source with record limit

261

Runnable runnable = () ->

262

CollectIteratorAssertions.assertThat(resultIterator)

263

.withNumRecordsLimit(limit)

264

.matchesRecordsFromSource(testData, semantic);

265

266

assertThatFuture(runAsync(runnable)).eventuallySucceeds();

267

} else {

268

// Bounded source

269

CollectIteratorAssertions.assertThat(resultIterator)

270

.matchesRecordsFromSource(testData, semantic);

271

}

272

}

273

```

274

275

## Advanced Validation Scenarios

276

277

### Multi-Split Source Validation

278

279

Validates results from sources with multiple splits, ensuring each split's data is properly consumed.

280

281

```java

282

// Test data organized by split

283

List<List<String>> testDataBySplit = Arrays.asList(

284

Arrays.asList("split0-rec1", "split0-rec2", "split0-rec3"), // Split 0

285

Arrays.asList("split1-rec1", "split1-rec2"), // Split 1

286

Arrays.asList("split2-rec1", "split2-rec2", "split2-rec3") // Split 2

287

);

288

289

// Validation allows records from different splits to be interleaved

290

assertThat(resultIterator)

291

.matchesRecordsFromSource(testDataBySplit, CheckpointingMode.EXACTLY_ONCE);

292

```

293

294

### Bounded vs Unbounded Source Validation

295

296

```java

297

// Bounded source - validate all expected records

298

public void testBoundedSource() {

299

assertThat(resultIterator)

300

.matchesRecordsFromSource(expectedData, semantic);

301

// Will wait for job to finish naturally

302

}

303

304

// Unbounded source - validate limited number of records

305

public void testUnboundedSource() {

306

assertThat(resultIterator)

307

.withNumRecordsLimit(expectedSize)

308

.matchesRecordsFromSource(expectedData, semantic);

309

// Will validate first expectedSize records then succeed

310

}

311

```

312

313

### Failure Scenario Validation

314

315

```java

316

// Validate recovery after failure

317

public void testTaskManagerFailure() {

318

// Phase 1: Validate records before failure

319

checkResultWithSemantic(iterator, beforeFailureData, semantic, beforeFailureData.size());

320

321

// Trigger failure

322

controller.triggerTaskManagerFailover(jobClient, () -> {

323

// Write additional data after failure

324

writeTestData(afterFailureData);

325

});

326

327

// Phase 2: Validate records after recovery

328

checkResultWithSemantic(iterator, afterFailureData, semantic, afterFailureData.size());

329

}

330

```

331

332

## Error Handling

333

334

### Common Assertion Failures

335

336

```java

337

// Record count mismatch

338

AssertionError: Expected 100 records but found 95

339

// Missing records in exactly-once

340

AssertionError: Expected record 'test-42' not found in results

341

// Unexpected records in exactly-once

342

AssertionError: Unexpected record 'duplicate-7' found in results

343

```

344

345

### Timeout Configuration

346

347

```java

348

// Configure timeouts for result collection

349

CollectResultIterator<T> iterator = new CollectResultIterator<>(

350

operatorUid,

351

serializer,

352

accumulatorName,

353

checkpointConfig,

354

Duration.ofMinutes(5).toMillis() // 5 minute timeout

355

);

356

```

357

358

### Debugging Failed Assertions

359

360

```java

361

// Enable detailed logging for debugging

362

Logger logger = LoggerFactory.getLogger(CollectIteratorAssert.class);

363

// Add logging to capture actual vs expected results

364

// Use breakpoints to inspect iterator contents

365

// Check external system state for sink validations

366

```