
# Test Data Generation

Data generators that provide standardized test datasets for DataSet and DataStream operations. The utilities create consistent, predictable datasets covering a range of data types and structures, so that tests of Flink operations are reliable and repeatable.

## Capabilities

### Collection Data Sets

Core utility class providing standardized test datasets for DataSet API testing.

```java { .api }
/**
 * Utility class providing standardized test datasets for DataSet API testing
 */
public class CollectionDataSets {

    // Tuple datasets
    public static DataSet<Tuple3<Integer, Long, String>> get3TupleDataSet(ExecutionEnvironment env);
    public static DataSet<Tuple3<Integer, Long, String>> getSmall3TupleDataSet(ExecutionEnvironment env);
    public static DataSet<Tuple5<Integer, Long, Integer, String, Long>> get5TupleDataSet(ExecutionEnvironment env);
    public static DataSet<Tuple5<Integer, Long, Integer, String, Long>> getSmall5TupleDataSet(ExecutionEnvironment env);

    // Nested tuple datasets
    public static DataSet<Tuple2<Tuple2<Integer, Integer>, String>> getSmallNestedTupleDataSet(ExecutionEnvironment env);
    public static DataSet<Tuple3<Tuple2<Integer, Integer>, String, Integer>> getGroupSortedNestedTupleDataSet(ExecutionEnvironment env);
    public static DataSet<Tuple3<Tuple2<Integer, Integer>, String, Integer>> getGroupSortedNestedTupleDataSet2(ExecutionEnvironment env);

    // Special datasets
    public static DataSet<Tuple2<String, byte[]>> getTuple2WithByteArrayDataSet(ExecutionEnvironment env);
    public static DataSet<String> getStringDataSet(ExecutionEnvironment env);
    public static DataSet<Integer> getIntegerDataSet(ExecutionEnvironment env);

    // POJO datasets
    public static DataSet<CustomType> getCustomTypeDataSet(ExecutionEnvironment env);
    public static DataSet<CustomType> getSmallCustomTypeDataSet(ExecutionEnvironment env);
    public static DataSet<POJO> getSmallPojoDataSet(ExecutionEnvironment env);
    public static DataSet<POJO> getDuplicatePojoDataSet(ExecutionEnvironment env);
    public static DataSet<POJO> getMixedPojoDataSet(ExecutionEnvironment env);

    // Complex nested datasets
    public static DataSet<CrazyNested> getCrazyNestedDataSet(ExecutionEnvironment env);
    public static DataSet<FromTupleWithCTor> getPojoExtendingFromTuple(ExecutionEnvironment env);
    public static DataSet<PojoContainingTupleAndWritable> getPojoContainingTupleAndWritable(ExecutionEnvironment env);
    public static DataSet<PojoContainingTupleAndWritable> getGroupSortedPojoContainingTupleAndWritable(ExecutionEnvironment env);

    // Multi-POJO datasets
    public static DataSet<Tuple3<Integer, POJO, POJO>> getTupleContainingPojos(ExecutionEnvironment env);
    public static DataSet<PojoWithMultiplePojos> getPojoWithMultiplePojos(ExecutionEnvironment env);
    public static DataSet<PojoWithDateAndEnum> getPojoWithDateAndEnum(ExecutionEnvironment env);
    public static DataSet<PojoWithCollection> getPojoWithCollection(ExecutionEnvironment env);

    // Tuple-based compatibility datasets
    public static DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> getSmallTuplebasedDataSet(ExecutionEnvironment env);
    public static DataSet<CustomType> getSmallTuplebasedDataSetMatchingPojo(ExecutionEnvironment env);
}
```

**Usage Examples:**

```java
import java.util.List;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.test.operators.util.CollectionDataSets;

// Create execution environment
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Get standard 3-tuple dataset (21 records)
DataSet<Tuple3<Integer, Long, String>> data = CollectionDataSets.get3TupleDataSet(env);

// Get small dataset for quick tests (3 records)
DataSet<Tuple3<Integer, Long, String>> smallData = CollectionDataSets.getSmall3TupleDataSet(env);

// Get custom POJO dataset
DataSet<CollectionDataSets.CustomType> customData = CollectionDataSets.getCustomTypeDataSet(env);

// Use in transformations. A lambda that returns a generic Tuple2 needs an
// explicit type hint, because Java erases the lambda's type parameters.
List<Tuple2<Integer, String>> projected = data
    .map(tuple -> new Tuple2<>(tuple.f0, tuple.f2))
    .returns(Types.TUPLE(Types.INT, Types.STRING))
    .collect();
```

### Value Collection Data Sets

Test data provider for Flink Value types (IntValue, LongValue, StringValue).

```java { .api }
/**
 * Test data provider for Flink Value types
 */
public class ValueCollectionDataSets {

    // Value-based tuple datasets
    public static DataSet<Tuple3<IntValue, LongValue, StringValue>> get3TupleDataSet(ExecutionEnvironment env);
    public static DataSet<Tuple3<IntValue, LongValue, StringValue>> getSmall3TupleDataSet(ExecutionEnvironment env);
    public static DataSet<Tuple5<IntValue, LongValue, IntValue, StringValue, LongValue>> get5TupleDataSet(ExecutionEnvironment env);

    // Value-based primitive datasets
    public static DataSet<StringValue> getStringDataSet(ExecutionEnvironment env);
    public static DataSet<IntValue> getIntegerDataSet(ExecutionEnvironment env);

    // Value-based POJO datasets
    public static DataSet<CustomType> getCustomTypeDataSet(ExecutionEnvironment env);
    public static DataSet<CustomType> getSmallCustomTypeDataSet(ExecutionEnvironment env);
}
```

### Test Data Types

Standard POJO and data types used across test datasets.

```java { .api }
/**
 * Standard test POJO with primitive fields
 */
public static class CustomType {
    public int myInt;
    public long myLong;
    public String myString;

    public CustomType();
    public CustomType(int i, long l, String s);

    // Standard equals, hashCode, toString methods
    public boolean equals(Object obj);
    public int hashCode();
    public String toString();
}

/**
 * Complex test POJO with nested structures
 */
public static class POJO {
    public int number;
    public String str;
    public NestedPojo nestedPojo;

    public POJO();
    public POJO(int i, String s, NestedPojo np);
}

/**
 * Nested POJO for complex data structures
 */
public static class NestedPojo {
    public long longNumber;

    public NestedPojo();
    public NestedPojo(long l);
}

/**
 * Multi-level nested POJO for deep structure testing
 */
public static class CrazyNested {
    public int number;
    public String str;
    public CrazyNestedL1 nest_Lvl1;
    public long timestamp;

    public CrazyNested();
    public CrazyNested(int number, String str, CrazyNestedL1 nest_Lvl1, long timestamp);
}

/**
 * POJO extending Tuple with constructor
 */
public static class FromTupleWithCTor extends Tuple3<Long, Long, String> {
    public FromTupleWithCTor();
    public FromTupleWithCTor(Long l1, Long l2, String s);
}

/**
 * POJO with Date and enum fields
 */
public static class PojoWithDateAndEnum {
    public String str;
    public Date date;
    public Category cat;

    public PojoWithDateAndEnum();
    public PojoWithDateAndEnum(String str, Date date, Category cat);
}

/**
 * POJO with collection fields
 */
public static class PojoWithCollection {
    public List<Pojo1> pojosList;
    public int[] intArray;
    public List<String> stringList;

    public PojoWithCollection();
    public PojoWithCollection(List<Pojo1> pojosList, int[] intArray, List<String> stringList);
}

/**
 * Test enumeration
 */
public enum Category {
    CAT_A, CAT_B
}
```

### Input Format Utilities

Specialized input formats that generate finite, configurable test data or unbounded test data sequences for batch and streaming applications.

```java { .api }
/**
 * Input format that generates uniform integer tuple pairs for testing
 */
public class UniformIntTupleGeneratorInputFormat extends GenericInputFormat<Tuple2<Integer, Integer>> {

    /**
     * Constructor for uniform tuple generator
     * @param numKeys Number of distinct keys to generate
     * @param numValsPerKey Number of values to generate per key
     */
    public UniformIntTupleGeneratorInputFormat(int numKeys, int numValsPerKey);

    /**
     * Generate next tuple element
     * @param reuse Tuple to reuse for output
     * @return Next tuple or null if exhausted
     * @throws IOException if generation fails
     */
    public Tuple2<Integer, Integer> nextRecord(Tuple2<Integer, Integer> reuse) throws IOException;

    /**
     * Check whether all records have been generated
     * @return true once the generator is exhausted, false while more records remain
     * @throws IOException if check fails
     */
    public boolean reachedEnd() throws IOException;
}

/**
 * Input format that generates infinite integer tuple sequence with optional delay
 */
public class InfiniteIntegerTupleInputFormat extends GenericInputFormat<Tuple2<Integer, Integer>> {

    /**
     * Constructor for infinite tuple generator
     * @param addDelay Whether to add delay between elements
     */
    public InfiniteIntegerTupleInputFormat(boolean addDelay);

    /**
     * Generate next tuple element (never returns null)
     * @param reuse Tuple to reuse for output
     * @return Next tuple in sequence
     * @throws IOException if generation fails
     */
    public Tuple2<Integer, Integer> nextRecord(Tuple2<Integer, Integer> reuse) throws IOException;

    /**
     * Always returns false - infinite generation
     * @return false (never reaches end)
     */
    public boolean reachedEnd() throws IOException;
}

/**
 * Input format that generates infinite integer sequence with optional delay
 */
public class InfiniteIntegerInputFormat extends GenericInputFormat<Integer> {

    /**
     * Constructor for infinite integer generator
     * @param addDelay Whether to add delay between elements
     */
    public InfiniteIntegerInputFormat(boolean addDelay);

    /**
     * Generate next integer element (never returns null)
     * @param reuse Integer to reuse for output
     * @return Next integer in sequence
     * @throws IOException if generation fails
     */
    public Integer nextRecord(Integer reuse) throws IOException;

    /**
     * Always returns false - infinite generation
     * @return false (never reaches end)
     */
    public boolean reachedEnd() throws IOException;
}
```

**Input Format Usage Examples:**

```java
import static org.junit.Assert.assertEquals;

import java.util.List;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.test.util.UniformIntTupleGeneratorInputFormat;
import org.apache.flink.test.util.InfiniteIntegerTupleInputFormat;
import org.apache.flink.test.util.InfiniteIntegerInputFormat;

// Generate uniform test data for batch processing
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Create uniform tuple data: 10 keys, 100 values per key = 1000 total tuples
DataSet<Tuple2<Integer, Integer>> uniformData = env.createInput(
    new UniformIntTupleGeneratorInputFormat(10, 100)
);

// Process uniform data
List<Tuple2<Integer, Integer>> results = uniformData
    .filter(tuple -> tuple.f0 % 2 == 0) // Even keys only
    .collect();

assertEquals(500, results.size()); // 5 even keys * 100 values each

// Generate infinite streaming data for stress testing
StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();

// Create infinite integer stream with delay for controlled rate
DataStream<Integer> infiniteStream = streamEnv.createInput(
    new InfiniteIntegerInputFormat(true) // With delay
);

// Use in streaming topology. The filter only bounds the values that reach the
// sink; the source never ends, so the test must cancel the job itself.
// TestSink is a placeholder for a user-defined SinkFunction<Integer>.
infiniteStream
    .map(i -> i * 2)
    .filter(i -> i < 10000) // Limit for testing
    .addSink(new TestSink());

// Create infinite tuple stream without delay for performance testing
DataStream<Tuple2<Integer, Integer>> infiniteTuples = streamEnv.createInput(
    new InfiniteIntegerTupleInputFormat(false) // No delay
);

// Use for performance benchmarking. WindowedStream has no count() operator,
// so this sums the second field per key and window instead. keyBy(int) and
// timeWindow(...) are deprecated in newer Flink releases.
// CountingSink is a placeholder for a user-defined sink.
infiniteTuples
    .keyBy(0)
    .timeWindow(Time.seconds(1))
    .sum(1)
    .addSink(new CountingSink());
```
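
The `reachedEnd()` / `nextRecord()` contract of a finite generator can be modelled without any Flink dependency. The sketch below is illustrative only: `UniformPairGenerator` is a hypothetical class, and the key range (`0..numKeys-1`) and emission order are assumptions rather than the documented behaviour of `UniformIntTupleGeneratorInputFormat`.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, dependency-free model of a finite (key, value) generator. */
final class UniformPairGenerator {
    private final int numKeys;
    private final int total;   // numKeys * numValsPerKey pairs overall
    private int emitted = 0;   // how many pairs have been handed out so far

    UniformPairGenerator(int numKeys, int numValsPerKey) {
        this.numKeys = numKeys;
        this.total = numKeys * numValsPerKey;
    }

    /** True once every pair has been emitted (mirrors InputFormat.reachedEnd()). */
    boolean reachedEnd() {
        return emitted >= total;
    }

    /** Returns the next {key, value} pair, or null when exhausted. */
    int[] nextRecord() {
        if (reachedEnd()) {
            return null;
        }
        // Assumed ordering: cycle through the keys, then advance the value.
        int[] pair = {emitted % numKeys, emitted / numKeys};
        emitted++;
        return pair;
    }

    /** Reads the generator the way a runtime would: poll until reachedEnd(). */
    static List<int[]> drain(UniformPairGenerator gen) {
        List<int[]> out = new ArrayList<>();
        while (!gen.reachedEnd()) {
            out.add(gen.nextRecord());
        }
        return out;
    }
}
```

Draining `new UniformPairGenerator(10, 100)` yields 1000 pairs, 500 of which have an even key, which matches the arithmetic in the batch example above. An infinite format differs only in that `reachedEnd()` always returns false, so the caller has to impose its own bound.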

### Test Functions

Common transformation functions used across multiple test scenarios, including tokenization and data processing utilities.

```java { .api }
/**
 * FlatMap function for tokenizing strings into (word, 1) tuples
 */
public class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {

    /**
     * Tokenize input string and emit one (word, 1) pair per token
     * @param value Input string to tokenize
     * @param out Collector for emitting (word, 1) tuples
     * @throws Exception if tokenization fails
     */
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception;
}
```

**Test Function Usage Example:**

```java
import java.util.List;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.test.testfunctions.Tokenizer;

// Use tokenizer in word count example
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

DataSet<String> text = env.fromElements(
    "Hello World",
    "Hello Flink World",
    "World of Streaming"
);

DataSet<Tuple2<String, Integer>> words = text
    .flatMap(new Tokenizer()) // Tokenize sentences into words
    .groupBy(0)               // Group by word
    .sum(1);                  // Sum counts

List<Tuple2<String, Integer>> result = words.collect();
// Expected counts: hello=2, world=3, flink=1, of=1, streaming=1
// (the tokenizer is expected to lowercase its tokens; collect() order is not guaranteed)
```
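
For checking expected word counts by hand, the same tokenize-group-sum pipeline can be sketched in plain Java. `WordCountSketch` is a hypothetical helper, and it assumes the tokenizer lowercases each line and splits on non-word characters, as Flink's WordCount examples do; that behaviour is not stated in the API listing above.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical plain-Java equivalent of flatMap(Tokenizer) + groupBy(0) + sum(1). */
final class WordCountSketch {

    /** Lowercases each line, splits on non-word characters, and counts non-empty tokens. */
    static Map<String, Integer> countWords(String... lines) {
        // TreeMap keeps the words sorted, which makes the result easy to compare.
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : lines) {
            for (String token : line.toLowerCase().split("\\W+")) {
                if (!token.isEmpty()) {
                    counts.merge(token, 1, Integer::sum);
                }
            }
        }
        return counts;
    }
}
```

Calling `WordCountSketch.countWords("Hello World", "Hello Flink World", "World of Streaming")` gives the five distinct words of the example with counts 1, 2, 1, 1 and 3 for `flink`, `hello`, `of`, `streaming` and `world`.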

### Data Characteristics

The test datasets provide predictable data patterns:

**3-Tuple Dataset (21 records)**:
- Integer field: Sequential values 1-21
- Long field: Values 1-6 (grouped for testing)
- String field: Mix of simple strings and "Comment#N" patterns

**Small Datasets (3 records)**:
- Subset of larger datasets for quick unit tests
- Consistent with larger dataset patterns

**Custom Type Dataset**:
- Maps to tuple data with same value patterns
- Tests POJO serialization and field access

**Complex Nested Datasets**:
- Multi-level nesting for serialization testing
- Combination of primitive and object types
- Date and enum types for type system testing
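
To make the 3-tuple pattern concrete, the following plain-Java sketch rebuilds the first two fields of a dataset with that shape. It assumes a triangular grouping, where Long value `k` appears on `k` consecutive records; this is consistent with 21 = 1 + 2 + ... + 6 but is not spelled out above, and `ThreeTuplePattern` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of the (Integer, Long) fields of the 21-record 3-tuple dataset. */
final class ThreeTuplePattern {

    /** Builds {id, group} rows: ids run 1..21, group k is repeated k times for k = 1..6. */
    static List<long[]> rows() {
        List<long[]> rows = new ArrayList<>();
        int id = 1;
        for (long group = 1; group <= 6; group++) {
            for (int i = 0; i < group; i++) {
                rows.add(new long[] {id++, group});
            }
        }
        return rows;
    }

    /** Counts how many rows carry the given group value. */
    static long groupSize(long group) {
        return rows().stream().filter(r -> r[1] == group).count();
    }
}
```

Under this assumption a `groupBy` on the Long field produces six groups of sizes 1 through 6, which is useful when working out expected results for grouped aggregations.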

**Usage Patterns:**

```java
// Standard pattern for DataSet tests.
// Result, MyMapper, MyFilter, expectedCount and expectedValue are placeholders
// for the types and values of the test being written.
DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
DataSet<Result> output = input.map(new MyMapper()).filter(new MyFilter());
List<Result> results = output.collect();

// Verification pattern
assertEquals(expectedCount, results.size());
assertTrue(results.contains(expectedValue));

// Small dataset for unit tests
DataSet<Tuple3<Integer, Long, String>> quickTest = CollectionDataSets.getSmall3TupleDataSet(env);
List<Tuple3<Integer, Long, String>> quickResults = quickTest.collect();
assertEquals(3, quickResults.size());
```
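
When a job runs with parallelism greater than one, `collect()` does not return records in a guaranteed order, so the verification step is usually more robust when it ignores ordering. The helper below is a minimal sketch of that idea in plain Java; `ResultAssertions` and its methods are hypothetical names, not part of the Flink test utilities.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical helper for order-insensitive comparison of collected results. */
final class ResultAssertions {

    /** Returns the string form of every element, sorted, so that order no longer matters. */
    static <T> List<String> sortedStrings(List<T> items) {
        List<String> out = new ArrayList<>();
        for (T item : items) {
            out.add(String.valueOf(item));
        }
        Collections.sort(out);
        return out;
    }

    /** Throws AssertionError unless both lists hold the same elements, ignoring order. */
    static <T> void assertSameElements(List<T> expected, List<T> actual) {
        List<String> e = sortedStrings(expected);
        List<String> a = sortedStrings(actual);
        if (!e.equals(a)) {
            throw new AssertionError("expected " + e + " but got " + a);
        }
    }
}
```

Comparing string forms relies on the element type having a stable `toString()`, which Flink tuples provide; duplicates are preserved, so a missing or extra record still fails the check.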

These data generators give Flink tests consistent, reproducible input, from simple unit tests to complex integration tests, with predictable data patterns and edge cases.