or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

class-loading-programs.md · index.md · migration-testing.md · performance-testing.md · state-management-testing.md · streaming-utilities.md · test-base-classes.md · test-data-generation.md

docs/streaming-utilities.md

0

# Streaming Test Utilities

1

2

Thread-safe utilities for collecting and validating results in streaming Flink applications. They cover result collection, test coordination, fault injection, and validation, along with streaming-specific helper functions, and together enable comprehensive testing of streaming topologies, including failure scenarios.

3

4

## Capabilities

5

6

### Test List Result Sink

7

8

Thread-safe sink for collecting streaming results into lists for verification.

9

10

```java { .api }

11

/**

12

* Thread-safe sink that collects streaming results into a list for test verification

13

*/

14

public class TestListResultSink<T> extends RichSinkFunction<T> {

15

16

/**

17

* Default constructor creating empty result sink

18

*/

19

public TestListResultSink();

20

21

/**

22

* Get the collected results as a list; the element order is not guaranteed

23

* @return List containing all collected elements

24

*/

25

public List<T> getResult();

26

27

/**

28

* Get the collected results sorted using natural ordering

29

* @return Sorted list containing all collected elements

30

*/

31

public List<T> getSortedResult();

32

33

/**

34

* Sink function implementation - adds element to internal collection

35

* @param value Element to collect

36

* @throws Exception if collection fails

37

*/

38

public void invoke(T value) throws Exception;

39

}

40

```

41

42

**Usage Example:**

43

44

```java

45

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

46

import org.apache.flink.test.streaming.runtime.util.TestListResultSink;

47

48

// Setup streaming environment

49

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

50

env.setParallelism(1); // Use parallelism 1 for deterministic results

51

52

// Create result collector

53

TestListResultSink<String> resultSink = new TestListResultSink<>();

54

55

// Build streaming topology

56

env.fromElements("hello", "world", "flink", "streaming")

57

.map(String::toUpperCase)

58

.addSink(resultSink);

59

60

// Execute and collect results

61

env.execute("Test Job");

62

63

// Verify results

64

List<String> results = resultSink.getResult();

65

assertEquals(4, results.size());

66

assertTrue(results.contains("HELLO"));

67

assertTrue(results.contains("WORLD"));

68

69

// Get sorted results for ordered verification

70

List<String> sortedResults = resultSink.getSortedResult();

71

assertEquals("FLINK", sortedResults.get(0));

72

assertEquals("HELLO", sortedResults.get(1));

73

```

74

75

### Test List Wrapper

76

77

Singleton catalog for managing multiple test result lists across different test scenarios.

78

79

```java { .api }

80

/**

81

* Singleton catalog for managing multiple test result lists

82

*/

83

public class TestListWrapper {

84

85

/**

86

* Get the singleton instance of TestListWrapper

87

* @return The singleton TestListWrapper instance

88

*/

89

public static TestListWrapper getInstance();

90

91

/**

92

* Create a new result list and return its ID

93

* @return Integer ID for the newly created list

94

*/

95

public int createList();

96

97

/**

98

* Retrieve a result list by its ID

99

* @param listId ID of the list to retrieve

100

* @return List of collected objects, or null if ID doesn't exist

101

*/

102

public List<Object> getList(int listId);

103

}

104

```

105

106

**Usage Example:**

107

108

```java

109

import org.apache.flink.test.streaming.runtime.util.TestListWrapper;

110

111

// Get wrapper instance

112

TestListWrapper wrapper = TestListWrapper.getInstance();

113

114

// Create lists for different test scenarios

115

int sourceResultsId = wrapper.createList();

116

int processedResultsId = wrapper.createList();

117

118

// Use in streaming topology (pseudocode - would need custom sink implementation)

119

dataStream1.addSink(new ListCollectorSink(sourceResultsId));

120

dataStream2.addSink(new ListCollectorSink(processedResultsId));

121

122

// After execution, retrieve results

123

List<Object> sourceResults = wrapper.getList(sourceResultsId);

124

List<Object> processedResults = wrapper.getList(processedResultsId);

125

126

// Verify both result sets

127

assertEquals(expectedSourceCount, sourceResults.size());

128

assertEquals(expectedProcessedCount, processedResults.size());

129

```

130

131

### Output Selectors and Transformations

132

133

Utility functions and selectors for streaming data routing and transformation.

134

135

```java { .api }

136

/**

137

* Output selector that routes integers based on even/odd criteria

138

*/

139

public class EvenOddOutputSelector implements OutputSelector<Integer> {

140

141

/**

142

* Select output names based on whether the integer is even or odd

143

* @param value Integer value to evaluate

144

* @return Iterable of output names ("even" or "odd")

145

*/

146

public Iterable<String> select(Integer value);

147

}

148

149

/**

150

* No-operation mapper that returns input unchanged

151

*/

152

public class NoOpIntMap implements MapFunction<Integer, Integer> {

153

154

/**

155

* Identity mapping function for integers

156

* @param value Input integer

157

* @return Same integer unchanged

158

* @throws Exception if mapping fails

159

*/

160

public Integer map(Integer value) throws Exception;

161

}

162

163

/**

164

* Sink that discards the elements it receives, but tracks how many arrived so the expected count can be checked

165

*/

166

public class ReceiveCheckNoOpSink<T> extends RichSinkFunction<T> {

167

168

/**

169

* Constructor with element count tracking

170

* @param expectedElementCount Expected number of elements to receive

171

*/

172

public ReceiveCheckNoOpSink(int expectedElementCount);

173

174

/**

175

* Receive element but perform no operation

176

* @param value Element to receive

177

* @param context Sink context

178

* @throws Exception if receive fails

179

*/

180

public void invoke(T value, Context context) throws Exception;

181

182

/**

183

* Check if expected number of elements were received

184

* @return true if expected count was reached

185

*/

186

public boolean isExpectedCountReached();

187

}

188

```

189

190

### Fault Injection Utilities

191

192

Sources and sinks for testing failure scenarios and fault tolerance in streaming applications.

193

194

```java { .api }

195

/**

196

* Source that introduces artificial failures for testing fault tolerance

197

*/

198

public class FailingSource<T> extends RichSourceFunction<T> {

199

200

/**

201

* Constructor for failing source with custom event generator

202

* @param generator Custom generator for emitting events before failure

203

* @param failAfterElements Number of elements to emit before inducing failure

204

*/

205

public FailingSource(EventEmittingGenerator<T> generator, int failAfterElements);

206

207

/**

208

* Interface for custom event generation in failing source

209

*/

210

public static interface EventEmittingGenerator<T> extends Serializable {

211

/**

212

* Emit a single event to the source context

213

* @param ctx Source context for emitting events

214

* @param eventSequenceNo Sequence number of current event

215

*/

216

public void emitEvent(SourceContext<T> ctx, int eventSequenceNo);

217

}

218

}

219

220

/**

221

* Sink for validating streaming results with custom validation logic

222

*/

223

public class ValidatingSink<T> extends RichSinkFunction<T> {

224

225

/**

226

* Constructor with result checker and count updater

227

* @param resultChecker Custom checker for validating individual results

228

* @param countUpdater Updater for tracking element counts

229

*/

230

public ValidatingSink(ResultChecker<T> resultChecker, CountUpdater countUpdater);

231

232

/**

233

* Interface for custom result validation logic

234

*/

235

public static interface ResultChecker<T> extends Serializable {

236

/**

237

* Check if a result meets validation criteria

238

* @param result Result element to validate

239

* @return true if result passes validation

240

*/

241

public boolean checkResult(T result);

242

}

243

244

/**

245

* Interface for updating element counts during validation

246

*/

247

public static interface CountUpdater extends Serializable {

248

/**

249

* Update the element count

250

* @param count Current element count

251

*/

252

public void updateCount(long count);

253

}

254

}

255

```

256

257

**Fault Injection Usage Examples:**

258

259

```java

260

import org.apache.flink.test.checkpointing.utils.FailingSource;

261

import org.apache.flink.test.checkpointing.utils.ValidatingSink;

262

263

// Create a failing source that emits integers and fails after 1000 elements

264

FailingSource<Integer> failingSource = new FailingSource<>(

265

new FailingSource.EventEmittingGenerator<Integer>() {

266

@Override

267

public void emitEvent(SourceContext<Integer> ctx, int eventSequenceNo) {

268

ctx.collect(eventSequenceNo);

269

}

270

},

271

1000 // Fail after 1000 elements

272

);

273

274

// Create a validating sink that checks for positive integers

275

ValidatingSink<Integer> validatingSink = new ValidatingSink<>(

276

new ValidatingSink.ResultChecker<Integer>() {

277

@Override

278

public boolean checkResult(Integer result) {

279

return result > 0; // Only accept positive integers

280

}

281

},

282

new ValidatingSink.CountUpdater() {

283

@Override

284

public void updateCount(long count) {

285

// Track count of validated elements

286

System.out.println("Validated elements: " + count);

287

}

288

}

289

);

290

291

// Build fault-tolerant streaming topology

292

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

293

env.enableCheckpointing(1000); // Enable checkpointing for fault tolerance

294

295

env.addSource(failingSource)

296

.map(x -> Math.abs(x)) // Ensure non-negative values (note: 0 stays 0 and would fail the `result > 0` check)

297

.addSink(validatingSink);

298

299

// Job will fail after 1000 elements, then restart and validate recovery

300

env.execute("Fault Tolerance Test");

301

```

302

303

### Test Execution Utilities

304

305

Utilities for executing streaming tests with specialized exception handling.

306

307

```java { .api }

308

/**

309

* Utility class for streaming test execution

310

*/

311

public class TestUtils {

312

313

/**

314

* Execute streaming job with SuccessException handling

315

* @param see StreamExecutionEnvironment configured for execution

316

* @param name Job name for identification

317

* @throws Exception if execution fails for reasons other than SuccessException

318

*/

319

public static void tryExecute(StreamExecutionEnvironment see, String name) throws Exception;

320

}

321

322

/**

323

* Exception thrown to indicate successful test completion

324

*/

325

public class SuccessException extends RuntimeException {

326

327

/**

328

* Default constructor for success indication

329

*/

330

public SuccessException();

331

332

/**

333

* Constructor with success message

334

* @param message Success message

335

*/

336

public SuccessException(String message);

337

}

338

```

339

340

### Streaming Test Patterns

341

342

Common patterns for implementing streaming tests:

343

344

**Basic Result Collection Pattern:**

345

346

```java

347

@Test

348

public void testStreamingTransformation() throws Exception {

349

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

350

env.setParallelism(1);

351

352

// Create result collector

353

TestListResultSink<Integer> resultSink = new TestListResultSink<>();

354

355

// Build topology

356

env.fromElements(1, 2, 3, 4, 5)

357

.map(x -> x * 2)

358

.filter(x -> x > 5)

359

.addSink(resultSink);

360

361

// Execute and verify

362

env.execute("Transformation Test");

363

364

List<Integer> results = resultSink.getSortedResult();

365

assertEquals(Arrays.asList(6, 8, 10), results);

366

}

367

```

368

369

**Multi-Stream Result Collection:**

370

371

```java

372

@Test

373

public void testMultiStreamProcessing() throws Exception {

374

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

375

376

TestListWrapper wrapper = TestListWrapper.getInstance();

377

int evenResultsId = wrapper.createList();

378

int oddResultsId = wrapper.createList();

379

380

DataStream<Integer> numbers = env.fromElements(1, 2, 3, 4, 5, 6);

381

382

// Split stream using output selector

383

SplitStream<Integer> splitStream = numbers.split(new EvenOddOutputSelector());

384

385

splitStream.select("even").addSink(new CustomListSink(evenResultsId));

386

splitStream.select("odd").addSink(new CustomListSink(oddResultsId));

387

388

env.execute("Multi-Stream Test");

389

390

// Verify results

391

List<Object> evenResults = wrapper.getList(evenResultsId);

392

List<Object> oddResults = wrapper.getList(oddResultsId);

393

394

assertEquals(3, evenResults.size()); // 2, 4, 6

395

assertEquals(3, oddResults.size()); // 1, 3, 5

396

}

397

```

398

399

**Success Exception Pattern:**

400

401

```java

402

@Test

403

public void testSuccessfulCompletion() throws Exception {

404

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

405

406

env.fromElements("test")

407

.map(value -> {

408

// Simulate successful completion

409

throw new SuccessException("Test completed successfully");

410

})

411

.addSink(new DiscardingSink<>());

412

413

// Execute with success exception handling

414

TestUtils.tryExecute(env, "Success Test");

415

416

// Test passes if SuccessException was caught and handled

417

}

418

```

419

420

**Parallel Result Collection:**

421

422

```java

423

@Test

424

public void testParallelResultCollection() throws Exception {

425

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

426

env.setParallelism(4);

427

428

TestListResultSink<String> resultSink = new TestListResultSink<>();

429

430

env.fromCollection(generateLargeDataset())

431

.map(data -> processData(data))

432

.addSink(resultSink);

433

434

env.execute("Parallel Test");

435

436

// Results from parallel execution - order not guaranteed

437

List<String> results = resultSink.getResult();

438

assertEquals(expectedTotalCount, results.size());

439

440

// Use sorted results for deterministic verification

441

List<String> sortedResults = resultSink.getSortedResult();

442

assertEquals(expectedFirstElement, sortedResults.get(0));

443

assertEquals(expectedLastElement, sortedResults.get(sortedResults.size() - 1));

444

}

445

```

446

447

**Custom Sink with Count Verification:**

448

449

```java

450

@Test

451

public void testElementCountVerification() throws Exception {

452

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

453

454

int expectedCount = 1000;

455

ReceiveCheckNoOpSink<Integer> countingSink = new ReceiveCheckNoOpSink<>(expectedCount);

456

457

env.fromCollection(generateIntegerSequence(expectedCount))

458

.addSink(countingSink);

459

460

env.execute("Count Verification Test");

461

462

assertTrue("Expected count not reached", countingSink.isExpectedCountReached());

463

}

464

```

465

466

These streaming utilities provide the foundation for reliable, deterministic testing of streaming Flink applications, enabling comprehensive verification of streaming transformations, windowing operations, and stateful processing.