or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

cancellation-testing.md checkpointing-migration.md fault-tolerance-recovery.md index.md operator-lifecycle.md plugin-testing.md runtime-utilities.md session-window-testing.md state-backend-restore.md test-data-utilities.md

docs/test-data-utilities.md

0

# Test Data and Utilities

1

2

Reusable datasets, POJOs, and runtime utilities for consistent testing across Flink modules. This framework provides standardized test data structures and execution utilities that ensure reproducible testing environments.

3

4

## Capabilities

5

6

### Collection Data Streams

7

8

Utility class providing standardized test datasets and data types for consistent testing across Flink modules.

9

10

```java { .api }

11

/**

12

* Standard test datasets and data types for Flink testing

13

*/

14

public class CollectionDataStreams {

15

16

/**

17

* Get standard 3-tuple dataset for testing

18

* @param env StreamExecutionEnvironment for dataset creation

19

* @return DataStreamSource of Tuple3<Integer, Long, String> with standard test data

20

*/

21

public static DataStreamSource<Tuple3<Integer, Long, String>> get3TupleDataSet(StreamExecutionEnvironment env);

22

23

/**

24

* Get small 3-tuple dataset for quick testing

25

* @param env StreamExecutionEnvironment for dataset creation

26

* @return DataStreamSource of Tuple3<Integer, Long, String> with reduced test data

27

*/

28

public static DataStreamSource<Tuple3<Integer, Long, String>> getSmall3TupleDataSet(StreamExecutionEnvironment env);

29

30

/**

31

* Get batch 3-tuple dataset for batch testing

32

* @param env ExecutionEnvironment for batch dataset creation

33

* @return DataSet of Tuple3<Integer, Long, String> with standard test data

34

*/

35

public static DataSet<Tuple3<Integer, Long, String>> get3TupleDataSet(ExecutionEnvironment env);

36

37

/**

38

* Get small batch 3-tuple dataset for quick batch testing

39

* @param env ExecutionEnvironment for batch dataset creation

40

* @return DataSet of Tuple3<Integer, Long, String> with reduced test data

41

*/

42

public static DataSet<Tuple3<Integer, Long, String>> getSmall3TupleDataSet(ExecutionEnvironment env);

43

44

/**

45

* Get integer dataset for numeric testing

46

* @param env ExecutionEnvironment for dataset creation

47

* @return DataSet of Integer values

48

*/

49

public static DataSet<Integer> getIntegerDataSet(ExecutionEnvironment env);

50

51

/**

52

* Get string dataset for text processing testing

53

* @param env ExecutionEnvironment for dataset creation

54

* @return DataSet of String values

55

*/

56

public static DataSet<String> getStringDataSet(ExecutionEnvironment env);

57

58

/**

59

* Get 5-tuple dataset for complex tuple testing

60

* @param env ExecutionEnvironment for dataset creation

61

* @return DataSet of Tuple5<Integer, Long, Integer, String, Long>

62

*/

63

public static DataSet<Tuple5<Integer, Long, Integer, String, Long>> get5TupleDataSet(ExecutionEnvironment env);

64

65

/**

66

* Get CustomType dataset for POJO testing

67

* @param env ExecutionEnvironment for dataset creation

68

* @return DataSet of CustomType instances

69

*/

70

public static DataSet<CustomType> getCustomTypeDataSet(ExecutionEnvironment env);

71

72

/**

73

* Get small CustomType dataset for quick POJO testing

74

* @param env ExecutionEnvironment for dataset creation

75

* @return DataSet of CustomType instances (reduced size)

76

*/

77

public static DataSet<CustomType> getSmallCustomTypeDataSet(ExecutionEnvironment env);

78

79

/**

80

* Get POJO dataset for complex POJO testing

81

* @param env ExecutionEnvironment for dataset creation

82

* @return DataSet of POJO instances

83

*/

84

public static DataSet<POJO> getPojoDataSet(ExecutionEnvironment env);

85

86

/**

87

* Get small POJO dataset for quick complex POJO testing

88

* @param env ExecutionEnvironment for dataset creation

89

* @return DataSet of POJO instances (reduced size)

90

*/

91

public static DataSet<POJO> getSmallPojoDataSet(ExecutionEnvironment env);

92

93

/**

94

* Get POJO dataset with collections for collection testing

95

* @param env ExecutionEnvironment for dataset creation

96

* @return DataSet of PojoWithCollectionGeneric instances

97

*/

98

public static DataSet<PojoWithCollectionGeneric> getPojoWithCollectionDataSet(ExecutionEnvironment env);

99

}

100

```

101

102

### Test Data Types (POJOs)

103

104

Standard POJO classes for testing various serialization, deserialization, and data processing scenarios.

105

106

```java { .api }

107

/**

108

* Custom test data type for general testing scenarios

109

*/

110

public class CustomType {

111

/** String field for testing */

112

public String myString;

113

/** Integer field for testing */

114

public int myInt;

115

116

/**

117

* Default constructor for CustomType

118

*/

119

public CustomType();

120

121

/**

122

* Constructor with field initialization

123

* @param myString string value

124

* @param myInt integer value

125

*/

126

public CustomType(String myString, int myInt);

127

128

/**

129

* Get string field value

130

* @return String value

131

*/

132

public String getMyString();

133

134

/**

135

* Set string field value

136

* @param myString string value to set

137

*/

138

public void setMyString(String myString);

139

140

/**

141

* Get integer field value

142

* @return int value

143

*/

144

public int getMyInt();

145

146

/**

147

* Set integer field value

148

* @param myInt integer value to set

149

*/

150

public void setMyInt(int myInt);

151

}

152

153

/**

154

* Basic POJO for general testing scenarios

155

*/

156

public class POJO {

157

/** Number field */

158

public int number;

159

/** String field */

160

public String str;

161

162

/**

163

* Default constructor for POJO

164

*/

165

public POJO();

166

167

/**

168

* Constructor with field initialization

169

* @param number integer value

170

* @param str string value

171

*/

172

public POJO(int number, String str);

173

}

174

175

/**

176

* Nested POJO structure for testing complex object hierarchies

177

*/

178

public class NestedPojo {

179

/** Nested POJO field */

180

public POJO nested;

181

/** Long field */

182

public long longField;

183

184

/**

185

* Default constructor for NestedPojo

186

*/

187

public NestedPojo();

188

189

/**

190

* Constructor with field initialization

191

* @param nested nested POJO instance

192

* @param longField long value

193

*/

194

public NestedPojo(POJO nested, long longField);

195

}

196

197

/**

198

* Complex nested POJO hierarchy for advanced testing scenarios

199

*/

200

public class CrazyNested {

201

/** Nested POJO */

202

public NestedPojo nestedPojo;

203

/** POJO field */

204

public POJO simplePojo;

205

/** String field */

206

public String stringField;

207

208

/**

209

* Default constructor for CrazyNested

210

*/

211

public CrazyNested();

212

213

/**

214

* Constructor with field initialization

215

* @param nestedPojo nested POJO instance

216

* @param simplePojo simple POJO instance

217

* @param stringField string value

218

*/

219

public CrazyNested(NestedPojo nestedPojo, POJO simplePojo, String stringField);

220

}

221

222

/**

223

* POJO with date and enum fields for testing special data types

224

*/

225

public class PojoWithDateAndEnum {

226

/** Date field */

227

public Date dateField;

228

/** Enum field */

229

public TestEnum enumField;

230

/** String field */

231

public String stringField;

232

233

/**

234

* Default constructor

235

*/

236

public PojoWithDateAndEnum();

237

238

/**

239

* Constructor with field initialization

240

* @param dateField date value

241

* @param enumField enum value

242

* @param stringField string value

243

*/

244

public PojoWithDateAndEnum(Date dateField, TestEnum enumField, String stringField);

245

246

/**

247

* Test enum for POJO testing

248

*/

249

public enum TestEnum {

250

VALUE1, VALUE2, VALUE3

251

}

252

}

253

254

/**

255

* POJO with generic collections for testing collection serialization

256

*/

257

public class PojoWithCollectionGeneric {

258

/** List of strings */

259

public List<String> stringList;

260

/** Map of string to integer */

261

public Map<String, Integer> stringIntMap;

262

/** Set of long values */

263

public Set<Long> longSet;

264

265

/**

266

* Default constructor

267

*/

268

public PojoWithCollectionGeneric();

269

270

/**

271

* Constructor with collection initialization

272

* @param stringList list of strings

273

* @param stringIntMap map of string to integer

274

* @param longSet set of long values

275

*/

276

public PojoWithCollectionGeneric(

277

List<String> stringList,

278

Map<String, Integer> stringIntMap,

279

Set<Long> longSet);

280

}

281

```

282

283

### Data Sources

284

285

Specialized data sources for various testing scenarios including infinite streams and coordinated sources.

286

287

```java { .api }

288

/**

289

* Source that emits integers indefinitely for long-running tests

290

*/

291

public class InfiniteIntegerSource implements SourceFunction<Integer> {

292

293

/**

294

* Constructor for infinite integer source

295

* @param startValue starting integer value

296

* @param incrementBy increment between values

297

*/

298

public InfiniteIntegerSource(int startValue, int incrementBy);

299

300

@Override

301

public void run(SourceContext<Integer> ctx) throws Exception;

302

303

@Override

304

public void cancel();

305

}

306

307

/**

308

* Number sequence source with checkpoint coordination capabilities

309

*/

310

public class NumberSequenceSourceWithWaitForCheckpoint implements SourceFunction<Long> {

311

312

/**

313

* Constructor for number sequence source with checkpoint coordination

314

* @param from starting value

315

* @param to ending value

316

* @param checkpointCoordination enable checkpoint coordination

317

*/

318

public NumberSequenceSourceWithWaitForCheckpoint(

319

long from,

320

long to,

321

boolean checkpointCoordination);

322

323

@Override

324

public void run(SourceContext<Long> ctx) throws Exception;

325

326

@Override

327

public void cancel();

328

}

329

```

330

331

### Input Formats

332

333

Input formats for batch processing tests and data generation scenarios.

334

335

```java { .api }

336

/**

337

* Infinite integer input format for batch testing scenarios

338

*/

339

public class InfiniteIntegerInputFormat implements InputFormat<Integer, InputSplit> {

340

341

/**

342

* Constructor for infinite integer input format

343

* @param startValue starting integer value

344

* @param maxElements maximum elements to generate (or -1 for infinite)

345

*/

346

public InfiniteIntegerInputFormat(int startValue, int maxElements);

347

348

@Override

349

public void configure(Configuration parameters);

350

351

@Override

352

public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException;

353

354

@Override

355

public InputSplit[] createInputSplits(int minNumSplits) throws IOException;

356

357

@Override

358

public InputSplitAssigner getInputSplitAssigner(InputSplit[] inputSplits);

359

360

@Override

361

public void open(InputSplit split) throws IOException;

362

363

@Override

364

public boolean reachedEnd() throws IOException;

365

366

@Override

367

public Integer nextRecord(Integer reuse) throws IOException;

368

369

@Override

370

public void close() throws IOException;

371

}

372

373

/**

374

* Uniform integer tuple generator input format for load testing

375

*/

376

public class UniformIntTupleGeneratorInputFormat

377

implements InputFormat<Tuple2<Integer, Integer>, InputSplit> {

378

379

/**

380

* Constructor for uniform tuple generator

381

* @param numTuples number of tuples to generate

382

* @param minValue minimum integer value

383

* @param maxValue maximum integer value

384

*/

385

public UniformIntTupleGeneratorInputFormat(int numTuples, int minValue, int maxValue);

386

387

@Override

388

public Tuple2<Integer, Integer> nextRecord(Tuple2<Integer, Integer> reuse) throws IOException;

389

}

390

```

391

392

### Runtime Utilities

393

394

Utility classes for job execution, testing infrastructure, and common testing operations.

395

396

```java { .api }

397

/**

398

* Utility for running JobGraphs on MiniCluster for testing

399

*/

400

public class JobGraphRunningUtil {

401

402

/**

403

* Execute JobGraph on MiniCluster and wait for completion

404

* @param jobGraph JobGraph to execute

405

* @param miniCluster MiniCluster instance for execution

406

* @throws Exception if job execution fails

407

*/

408

public static void execute(JobGraph jobGraph, MiniCluster miniCluster) throws Exception;

409

410

/**

411

* Execute JobGraph with timeout

412

* @param jobGraph JobGraph to execute

413

* @param miniCluster MiniCluster instance

414

* @param timeoutMs timeout in milliseconds

415

* @return JobExecutionResult containing execution results

416

* @throws Exception if execution fails or times out

417

*/

418

public static JobExecutionResult executeWithTimeout(

419

JobGraph jobGraph,

420

MiniCluster miniCluster,

421

long timeoutMs) throws Exception;

422

423

/**

424

* Execute JobGraph and return execution result

425

* @param jobGraph JobGraph to execute

426

* @param miniCluster MiniCluster instance

427

* @return JobExecutionResult with job execution details

428

* @throws Exception if execution fails

429

*/

430

public static JobExecutionResult executeAndGetResult(

431

JobGraph jobGraph,

432

MiniCluster miniCluster) throws Exception;

433

}

434

435

/**

436

* Identity mapper for testing data flow without transformation

437

*/

438

public class NoOpIntMap implements MapFunction<Integer, Integer> {

439

440

@Override

441

public Integer map(Integer value) throws Exception;

442

}

443

444

/**

445

* No-operation sink for testing data flow completion

446

*/

447

public class ReceiveCheckNoOpSink<T> implements SinkFunction<T> {

448

449

/**

450

* Constructor for no-op sink with receive tracking

451

* @param expectedCount expected number of elements to receive

452

*/

453

public ReceiveCheckNoOpSink(int expectedCount);

454

455

@Override

456

public void invoke(T value, Context context) throws Exception;

457

458

/**

459

* Check if expected number of elements were received

460

* @return boolean indicating if expected count was reached

461

*/

462

public boolean receivedExpectedCount();

463

}

464

```

465

466

**Usage Examples:**

467

468

```java

469

import org.apache.flink.test.operators.util.CollectionDataStreams;

470

import org.apache.flink.test.util.*;

471

472

// Using standard test datasets

473

public class DataProcessingTest {

474

475

@Test

476

public void testWithStandardTupleData() throws Exception {

477

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

478

479

// Get standard 3-tuple dataset

480

DataStreamSource<Tuple3<Integer, Long, String>> testData =

481

CollectionDataStreams.get3TupleDataSet(env);

482

483

// Process data

484

DataStream<Integer> result = testData

485

.map(new MapFunction<Tuple3<Integer, Long, String>, Integer>() {

486

@Override

487

public Integer map(Tuple3<Integer, Long, String> value) {

488

return value.f0 * 2;

489

}

490

})

491

.filter(x -> x > 10);

492

493

// Execute and validate results

494

env.execute();

495

}

496

497

@Test

498

public void testWithCustomTypes() throws Exception {

499

// Create custom type instances for testing

500

CustomType custom1 = new CustomType("test", 42);

501

CustomType custom2 = new CustomType("example", 123);

502

503

List<CustomType> customData = Arrays.asList(custom1, custom2);

504

505

// Validate custom type properties

506

assertEquals("test", customData.get(0).getMyString());

507

assertEquals(42, customData.get(0).getMyInt());

508

}

509

}

510

511

// Using runtime utilities

512

public class JobExecutionTest {

513

514

@Test

515

public void testJobExecution() throws Exception {

516

// Create test job graph

517

JobGraph jobGraph = new JobGraph();

518

519

// Add vertices with standard test operators

520

JobVertex source = new JobVertex("source");

521

source.setInvokableClass(InfiniteIntegerSource.class);

522

source.getConfiguration().setInteger("start-value", 1);

523

source.getConfiguration().setInteger("max-elements", 1000);

524

source.setParallelism(1);

525

526

JobVertex mapper = new JobVertex("mapper");

527

mapper.setInvokableClass(NoOpIntMap.class);

528

mapper.setParallelism(2);

529

530

JobVertex sink = new JobVertex("sink");

531

sink.setInvokableClass(ReceiveCheckNoOpSink.class);

532

sink.getConfiguration().setInteger("expected-count", 1000);

533

sink.setParallelism(1);

534

535

// Connect vertices

536

mapper.connectNewDataSetAsInput(source, DistributionPattern.REBALANCE);

537

sink.connectNewDataSetAsInput(mapper, DistributionPattern.FORWARD);

538

539

jobGraph.addVertex(source);

540

jobGraph.addVertex(mapper);

541

jobGraph.addVertex(sink);

542

543

// Execute using utility

544

MiniCluster miniCluster = new MiniCluster(createTestConfiguration());

545

miniCluster.start();

546

547

JobExecutionResult result = JobGraphRunningUtil.executeAndGetResult(

548

jobGraph, miniCluster);

549

550

// Validate execution

551

assertTrue(result.isSuccess());

552

miniCluster.close();

553

}

554

555

@Test

556

public void testLongRunningJobWithTimeout() throws Exception {

557

JobGraph longRunningJob = createLongRunningJob();

558

MiniCluster miniCluster = new MiniCluster(createTestConfiguration());

559

miniCluster.start();

560

561

// Execute with timeout

562

JobExecutionResult result = JobGraphRunningUtil.executeWithTimeout(

563

longRunningJob, miniCluster, 30000L);

564

565

assertNotNull(result);

566

miniCluster.close();

567

}

568

}

569

570

// Creating custom test data

571

public class CustomTestDataCreation {

572

573

@Test

574

public void createCustomPojoData() {

575

// Create custom POJO instances

576

POJO pojo1 = new POJO(1, "first");

577

POJO pojo2 = new POJO(2, "second");

578

579

NestedPojo nested1 = new NestedPojo(pojo1, 100L);

580

NestedPojo nested2 = new NestedPojo(pojo2, 200L);

581

582

CrazyNested complex1 = new CrazyNested(nested1, pojo1, "complex1");

583

CrazyNested complex2 = new CrazyNested(nested2, pojo2, "complex2");

584

585

// Use in test scenarios

586

List<CrazyNested> testData = Arrays.asList(complex1, complex2);

587

588

// Validate POJO structure

589

assertNotNull(testData.get(0).nestedPojo.nested);

590

assertEquals("first", testData.get(0).nestedPojo.nested.str);

591

}

592

}

593

```