# Test Data Generation

Data generators that provide standardized test datasets for DataSet and DataStream operations. The utilities create consistent, predictable datasets covering a range of data types and structures, enabling reliable and repeatable testing across different Flink operations.

## Capabilities

### Collection Data Sets

Core utility class providing standardized test datasets for DataSet API testing.

```java { .api }
/**
 * Utility class providing standardized test datasets for DataSet API testing
 */
public class CollectionDataSets {

    // Tuple datasets
    public static DataSet<Tuple3<Integer, Long, String>> get3TupleDataSet(ExecutionEnvironment env);
    public static DataSet<Tuple3<Integer, Long, String>> getSmall3TupleDataSet(ExecutionEnvironment env);
    public static DataSet<Tuple5<Integer, Long, Integer, String, Long>> get5TupleDataSet(ExecutionEnvironment env);
    public static DataSet<Tuple5<Integer, Long, Integer, String, Long>> getSmall5TupleDataSet(ExecutionEnvironment env);

    // Nested tuple datasets
    public static DataSet<Tuple2<Tuple2<Integer, Integer>, String>> getSmallNestedTupleDataSet(ExecutionEnvironment env);
    public static DataSet<Tuple3<Tuple2<Integer, Integer>, String, Integer>> getGroupSortedNestedTupleDataSet(ExecutionEnvironment env);
    public static DataSet<Tuple3<Tuple2<Integer, Integer>, String, Integer>> getGroupSortedNestedTupleDataSet2(ExecutionEnvironment env);

    // Special datasets
    public static DataSet<Tuple2<String, byte[]>> getTuple2WithByteArrayDataSet(ExecutionEnvironment env);
    public static DataSet<String> getStringDataSet(ExecutionEnvironment env);
    public static DataSet<Integer> getIntegerDataSet(ExecutionEnvironment env);

    // POJO datasets
    public static DataSet<CustomType> getCustomTypeDataSet(ExecutionEnvironment env);
    public static DataSet<CustomType> getSmallCustomTypeDataSet(ExecutionEnvironment env);
    public static DataSet<POJO> getSmallPojoDataSet(ExecutionEnvironment env);
    public static DataSet<POJO> getDuplicatePojoDataSet(ExecutionEnvironment env);
    public static DataSet<POJO> getMixedPojoDataSet(ExecutionEnvironment env);

    // Complex nested datasets
    public static DataSet<CrazyNested> getCrazyNestedDataSet(ExecutionEnvironment env);
    public static DataSet<FromTupleWithCTor> getPojoExtendingFromTuple(ExecutionEnvironment env);
    public static DataSet<PojoContainingTupleAndWritable> getPojoContainingTupleAndWritable(ExecutionEnvironment env);
    public static DataSet<PojoContainingTupleAndWritable> getGroupSortedPojoContainingTupleAndWritable(ExecutionEnvironment env);

    // Multi-POJO datasets
    public static DataSet<Tuple3<Integer, POJO, POJO>> getTupleContainingPojos(ExecutionEnvironment env);
    public static DataSet<PojoWithMultiplePojos> getPojoWithMultiplePojos(ExecutionEnvironment env);
    public static DataSet<PojoWithDateAndEnum> getPojoWithDateAndEnum(ExecutionEnvironment env);
    public static DataSet<PojoWithCollection> getPojoWithCollection(ExecutionEnvironment env);

    // Tuple-based compatibility datasets
    public static DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> getSmallTuplebasedDataSet(ExecutionEnvironment env);
    public static DataSet<CustomType> getSmallTuplebasedDataSetMatchingPojo(ExecutionEnvironment env);
}
```

**Usage Examples:**

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.test.operators.util.CollectionDataSets;

import java.util.List;

// Create execution environment
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Get standard 3-tuple dataset (21 records)
DataSet<Tuple3<Integer, Long, String>> data = CollectionDataSets.get3TupleDataSet(env);

// Get small dataset for quick tests (3 records)
DataSet<Tuple3<Integer, Long, String>> smallData = CollectionDataSets.getSmall3TupleDataSet(env);

// Get custom POJO dataset
DataSet<CollectionDataSets.CustomType> customData = CollectionDataSets.getCustomTypeDataSet(env);

// Use in transformations. Flink cannot infer a lambda's generic Tuple2 result type,
// so it is declared with returns(); collect() runs the job and declares throws Exception.
List<Tuple2<Integer, String>> projected = data
        .map(tuple -> new Tuple2<>(tuple.f0, tuple.f2))
        .returns(Types.TUPLE(Types.INT, Types.STRING))
        .collect();
```
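
Flink generally cannot infer the result type of a lambda that returns a generic type such as `Tuple2`, and asks for an explicit hint through `returns(...)`, for example `.returns(Types.TUPLE(Types.INT, Types.STRING))`. The plain-Java sketch below (no Flink dependency; `ErasureDemo` is a hypothetical name) illustrates the underlying reason: an anonymous class keeps its generic interface arguments in the class file, while an equivalent lambda reports only the raw interface.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.function.Function;

class ErasureDemo {

    /** Returns the generic type arguments visible via reflection, or an empty array if none. */
    static Type[] visibleTypeArguments(Object function) {
        Type iface = function.getClass().getGenericInterfaces()[0];
        if (iface instanceof ParameterizedType) {
            return ((ParameterizedType) iface).getActualTypeArguments();
        }
        return new Type[0]; // raw interface: the type arguments were not recorded
    }

    /** Anonymous class: the class file records Function<Integer, String>. */
    static Function<Integer, String> anonymous() {
        return new Function<Integer, String>() {
            @Override
            public String apply(Integer i) {
                return "n" + i;
            }
        };
    }

    /** Lambda: the generated class implements the raw Function interface only. */
    static Function<Integer, String> lambda() {
        return i -> "n" + i;
    }

    public static void main(String[] args) {
        System.out.println(visibleTypeArguments(anonymous()).length); // 2
        System.out.println(visibleTypeArguments(lambda()).length);    // 0
    }
}
```

An anonymous `MapFunction` subclass avoids the hint for the same reason, at the cost of more verbose code.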

### Value Collection Data Sets

Test data provider for Flink Value types (IntValue, LongValue, StringValue).

```java { .api }
/**
 * Test data provider for Flink Value types
 */
public class ValueCollectionDataSets {

    // Value-based tuple datasets
    public static DataSet<Tuple3<IntValue, LongValue, StringValue>> get3TupleDataSet(ExecutionEnvironment env);
    public static DataSet<Tuple3<IntValue, LongValue, StringValue>> getSmall3TupleDataSet(ExecutionEnvironment env);
    public static DataSet<Tuple5<IntValue, LongValue, IntValue, StringValue, LongValue>> get5TupleDataSet(ExecutionEnvironment env);

    // Value-based primitive datasets
    public static DataSet<StringValue> getStringDataSet(ExecutionEnvironment env);
    public static DataSet<IntValue> getIntegerDataSet(ExecutionEnvironment env);

    // Value-based POJO datasets
    public static DataSet<CustomType> getCustomTypeDataSet(ExecutionEnvironment env);
    public static DataSet<CustomType> getSmallCustomTypeDataSet(ExecutionEnvironment env);
}
```
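
Flink's Value types such as `IntValue` are mutable objects that serialize themselves through the `write`/`read` methods of the `Value` interface, which lets code reuse one instance across many records instead of allocating a new boxed `Integer` for each. The plain-Java analogue below only illustrates that idea: `MutableIntValue` is a hypothetical stand-in, not Flink's `IntValue`, and it uses `java.io.DataOutput`/`DataInput` where Flink uses its own `DataOutputView`/`DataInputView`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical plain-Java analogue of a mutable, self-serializing value type (not Flink's IntValue). */
class MutableIntValue {
    private int value;

    int getValue() {
        return value;
    }

    void setValue(int value) {
        this.value = value;
    }

    /** Writes the current state, in the spirit of Value#write(DataOutputView). */
    void write(DataOutput out) throws IOException {
        out.writeInt(value);
    }

    /** Overwrites the current state, in the spirit of Value#read(DataInputView). */
    void read(DataInput in) throws IOException {
        value = in.readInt();
    }

    /** Serializes the ints with one writer instance, then reads them back into one reused instance. */
    static int sumWithReuse(int... ints) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            MutableIntValue writer = new MutableIntValue();
            for (int i : ints) {
                writer.setValue(i); // mutate instead of allocating a new object per record
                writer.write(out);
            }
            out.flush();

            DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
            MutableIntValue reused = new MutableIntValue();
            int sum = 0;
            for (int n = 0; n < ints.length; n++) {
                reused.read(in); // the same instance is overwritten for every record
                sum += reused.getValue();
            }
            return sum;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```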

### Test Data Types

Standard POJO and data types used across test datasets.

```java { .api }
/**
 * Standard test POJO with primitive fields
 */
public static class CustomType {
    public int myInt;
    public long myLong;
    public String myString;

    public CustomType();
    public CustomType(int i, long l, String s);

    // Standard equals, hashCode, toString methods
    public boolean equals(Object obj);
    public int hashCode();
    public String toString();
}

/**
 * Complex test POJO with nested structures
 */
public static class POJO {
    public int number;
    public String str;
    public NestedPojo nestedPojo;

    public POJO();
    public POJO(int i, String s, NestedPojo np);
}

/**
 * Nested POJO for complex data structures
 */
public static class NestedPojo {
    public long longNumber;

    public NestedPojo();
    public NestedPojo(long l);
}

/**
 * Multi-level nested POJO for deep structure testing
 */
public static class CrazyNested {
    public int number;
    public String str;
    public CrazyNestedL1 nest_Lvl1;
    public long timestamp;

    public CrazyNested();
    public CrazyNested(int number, String str, CrazyNestedL1 nest_Lvl1, long timestamp);
}

/**
 * POJO extending Tuple with constructor
 */
public static class FromTupleWithCTor extends Tuple3<Long, Long, String> {
    public FromTupleWithCTor();
    public FromTupleWithCTor(Long l1, Long l2, String s);
}

/**
 * POJO with Date and enum fields
 */
public static class PojoWithDateAndEnum {
    public String str;
    public Date date;
    public Category cat;

    public PojoWithDateAndEnum();
    public PojoWithDateAndEnum(String str, Date date, Category cat);
}

/**
 * POJO with collection fields
 */
public static class PojoWithCollection {
    public List<Pojo1> pojosList;
    public int[] intArray;
    public List<String> stringList;

    public PojoWithCollection();
    public PojoWithCollection(List<Pojo1> pojosList, int[] intArray, List<String> stringList);
}

/**
 * Test enumeration
 */
public enum Category {
    CAT_A, CAT_B
}
```
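
Types such as `CustomType` and `POJO` are shaped so that Flink can treat them as POJOs rather than falling back to generic serialization. Roughly, Flink requires the class to be public (and static if nested), to have a public no-argument constructor, and to make every non-static, non-transient field either public or reachable through getters and setters. The sketch below checks a simplified subset of those rules with plain reflection; `PojoRules.looksLikePojo` is a hypothetical helper, not Flink's type extractor, and it ignores the getter/setter alternative.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

class PojoRules {

    /**
     * Simplified check: public class (static if nested), public no-arg constructor,
     * and every non-static, non-transient field public. Getter/setter access is ignored.
     */
    static boolean looksLikePojo(Class<?> clazz) {
        int mods = clazz.getModifiers();
        if (!Modifier.isPublic(mods)) {
            return false;
        }
        if (clazz.isMemberClass() && !Modifier.isStatic(mods)) {
            return false; // a non-static inner class cannot be created without an outer instance
        }
        try {
            clazz.getConstructor(); // looks up a public constructor with no parameters
        } catch (NoSuchMethodException e) {
            return false;
        }
        for (Field f : clazz.getDeclaredFields()) {
            int fm = f.getModifiers();
            if (f.isSynthetic() || Modifier.isStatic(fm) || Modifier.isTransient(fm)) {
                continue; // not part of the serialized state
            }
            if (!Modifier.isPublic(fm)) {
                return false;
            }
        }
        return true;
    }

    /** Mirrors the documented shape of CustomType. */
    public static class CustomType {
        public int myInt;
        public long myLong;
        public String myString;

        public CustomType() {}

        public CustomType(int i, long l, String s) {
            myInt = i;
            myLong = l;
            myString = s;
        }
    }

    /** Has no no-argument constructor, so the check rejects it. */
    public static class NoDefaultCtor {
        public int x;

        public NoDefaultCtor(int x) {
            this.x = x;
        }
    }
}
```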

### Input Format Utilities

Specialized input formats that generate test data for batch and streaming applications: a bounded set of uniformly keyed integer tuples, and infinite sequences of integers or integer tuples with an optional delay between elements.

```java { .api }
/**
 * Input format that generates uniform integer tuple pairs for testing
 */
public class UniformIntTupleGeneratorInputFormat extends GenericInputFormat<Tuple2<Integer, Integer>> {

    /**
     * Constructor for uniform tuple generator
     * @param numKeys Number of distinct keys to generate
     * @param numValsPerKey Number of values to generate per key
     */
    public UniformIntTupleGeneratorInputFormat(int numKeys, int numValsPerKey);

    /**
     * Generate next tuple element
     * @param reuse Tuple to reuse for output
     * @return Next tuple or null if exhausted
     * @throws IOException if generation fails
     */
    public Tuple2<Integer, Integer> nextRecord(Tuple2<Integer, Integer> reuse) throws IOException;

    /**
     * Check whether all records have been generated
     * @return true once no more records can be generated
     * @throws IOException if check fails
     */
    public boolean reachedEnd() throws IOException;
}

/**
 * Input format that generates an infinite sequence of integer tuples, with an optional delay
 */
public class InfiniteIntegerTupleInputFormat extends GenericInputFormat<Tuple2<Integer, Integer>> {

    /**
     * Constructor for infinite tuple generator
     * @param addDelay Whether to add a delay between elements
     */
    public InfiniteIntegerTupleInputFormat(boolean addDelay);

    /**
     * Generate next tuple element (never returns null)
     * @param reuse Tuple to reuse for output
     * @return Next tuple in sequence
     * @throws IOException if generation fails
     */
    public Tuple2<Integer, Integer> nextRecord(Tuple2<Integer, Integer> reuse) throws IOException;

    /**
     * Always returns false: generation never ends
     * @return false (never reaches end)
     */
    public boolean reachedEnd() throws IOException;
}

/**
 * Input format that generates an infinite integer sequence, with an optional delay
 */
public class InfiniteIntegerInputFormat extends GenericInputFormat<Integer> {

    /**
     * Constructor for infinite integer generator
     * @param addDelay Whether to add a delay between elements
     */
    public InfiniteIntegerInputFormat(boolean addDelay);

    /**
     * Generate next integer element (never returns null)
     * @param reuse Integer to reuse for output
     * @return Next integer in sequence
     * @throws IOException if generation fails
     */
    public Integer nextRecord(Integer reuse) throws IOException;

    /**
     * Always returns false: generation never ends
     * @return false (never reaches end)
     */
    public boolean reachedEnd() throws IOException;
}
```
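
These formats follow the usual input-format pull contract: the runtime calls `reachedEnd()` before each `nextRecord(reuse)` and stops once it returns `true`. The plain-Java model below mimics that loop for one split, assuming keys `0..numKeys-1` are each emitted with values `0..numValsPerKey-1`; `UniformTupleModel` and those key/value ranges are assumptions for illustration, not the Flink implementation, and parallel input splits are not modeled.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical single-split model of a uniform (key, value) tuple generator. */
class UniformTupleModel {
    private final int numKeys;
    private final int numValsPerKey;
    private int key = 0;
    private int val = 0;

    UniformTupleModel(int numKeys, int numValsPerKey) {
        this.numKeys = numKeys;
        this.numValsPerKey = numValsPerKey;
    }

    /** True once every key has emitted all of its values. */
    boolean reachedEnd() {
        return key >= numKeys || numValsPerKey <= 0;
    }

    /** Fills and returns the reused pair, moving to the next key after numValsPerKey values. */
    int[] nextRecord(int[] reuse) {
        reuse[0] = key;
        reuse[1] = val;
        if (++val == numValsPerKey) {
            val = 0;
            key++;
        }
        return reuse;
    }

    /** Drives the format the way a source task would and copies each emitted pair. */
    static List<int[]> drain(UniformTupleModel format) {
        List<int[]> out = new ArrayList<>();
        int[] reuse = new int[2];
        while (!format.reachedEnd()) {
            out.add(format.nextRecord(reuse).clone()); // copy, since the reused array is overwritten
        }
        return out;
    }

    static long countEvenKeys(List<int[]> records) {
        return records.stream().filter(r -> r[0] % 2 == 0).count();
    }
}
```

The same drain loop never terminates for the infinite formats, whose `reachedEnd()` always returns `false`, which is why streaming tests that use them have to cancel the job.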
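
**Input Format Usage Examples:**

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.test.util.InfiniteIntegerInputFormat;
import org.apache.flink.test.util.InfiniteIntegerTupleInputFormat;
import org.apache.flink.test.util.UniformIntTupleGeneratorInputFormat;

import java.util.List;

import static org.junit.jupiter.api.Assertions.assertEquals;

// Generate uniform test data for batch processing
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Create uniform tuple data: 10 keys, 100 values per key = 1000 total tuples
DataSet<Tuple2<Integer, Integer>> uniformData = env.createInput(
        new UniformIntTupleGeneratorInputFormat(10, 100));

// Process uniform data (collect() runs the job and declares throws Exception)
List<Tuple2<Integer, Integer>> results = uniformData
        .filter(tuple -> tuple.f0 % 2 == 0) // Even keys only
        .collect();

assertEquals(500, results.size()); // 5 even keys * 100 values each

// Generate infinite streaming data for stress testing
StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();

// Create infinite integer stream with delay for controlled rate
DataStream<Integer> infiniteStream = streamEnv.createInput(
        new InfiniteIntegerInputFormat(true)); // With delay

// The filter only drops elements; it does not end the stream, so the job keeps
// running until it is cancelled. TestSink is a placeholder for your own sink.
infiniteStream
        .map(i -> i * 2)
        .filter(i -> i < 10000)
        .addSink(new TestSink());

// Create infinite tuple stream without delay for performance testing
DataStream<Tuple2<Integer, Integer>> infiniteTuples = streamEnv.createInput(
        new InfiniteIntegerTupleInputFormat(false)); // No delay

// Count records per key in 1-second processing-time windows. WindowedStream has
// no count(), so map each record to (key, 1L) and sum the second field.
// CountingSink is a placeholder for your own sink.
infiniteTuples
        .map(t -> Tuple2.of(t.f0, 1L))
        .returns(Types.TUPLE(Types.INT, Types.LONG))
        .keyBy(t -> t.f0)
        .window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
        .sum(1)
        .addSink(new CountingSink());

// Nothing runs until the job is submitted. Start it asynchronously, check the
// sink output, then cancel the job because the sources never finish.
JobClient client = streamEnv.executeAsync("infinite-input-test");
client.cancel().get();
```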

### Test Functions

Common transformation functions used across multiple test scenarios, including tokenization and data processing utilities.

```java { .api }
/**
 * FlatMap function for tokenizing strings into word-count tuples
 */
public static class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {

    /**
     * Tokenize input string and emit word-count pairs
     * @param value Input string to tokenize
     * @param out Collector for emitting word-count tuples
     * @throws Exception if tokenization fails
     */
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception;
}
```

**Test Function Usage Example:**

```java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.test.testfunctions.Tokenizer;

import java.util.List;

// Use tokenizer in word count example
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

DataSet<String> text = env.fromElements(
        "Hello World",
        "Hello Flink World",
        "World of Streaming");

DataSet<Tuple2<String, Integer>> words = text
        .flatMap(new Tokenizer()) // Tokenize sentences into (word, 1) pairs
        .groupBy(0)               // Group by word
        .sum(1);                  // Sum counts

List<Tuple2<String, Integer>> result = words.collect();
// Result (order not guaranteed), assuming the tokenizer lowercases words as Flink's
// WordCount tokenizer does: ("hello", 2), ("world", 3), ("flink", 1), ("of", 1), ("streaming", 1)
```
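
To predict the counts a word-count test should produce, the tokenizer's logic can be reproduced without Flink. The sketch below assumes the normalization used by Flink's WordCount example tokenizer (lowercase the line, split on runs of non-word characters, drop empty tokens); `WordCountModel` is a hypothetical name, and the real `Tokenizer` may differ in detail.

```java
import java.util.Map;
import java.util.TreeMap;

class WordCountModel {

    /** Lowercases, splits on non-word characters, and counts each non-empty token. */
    static Map<String, Integer> countWords(String... lines) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : lines) {
            for (String token : line.toLowerCase().split("\\W+")) {
                if (!token.isEmpty()) {
                    // Same effect as emitting (token, 1) and then summing per word
                    counts.merge(token, 1, Integer::sum);
                }
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(countWords("Hello World", "Hello Flink World", "World of Streaming"));
        // {flink=1, hello=2, of=1, streaming=1, world=3}
    }
}
```

The empty-token check matters because `split` yields a leading empty string when a line starts with a non-word character.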

### Data Characteristics

The test datasets provide predictable data patterns:

**3-Tuple Dataset (21 records)**:
- Integer field: Sequential values 1-21
- Long field: Values 1-6, where value k appears in k consecutive records (1 once, 2 twice, and so on up to 6, which appears six times), giving groups of different sizes for grouping tests
- String field: A few plain strings (such as "Hi" and "Hello") in the first six records, then "Comment#1" through "Comment#15" in records 7-21

**Small Datasets (3 records)**:
- Subsets of the larger datasets for quick unit tests
- Consistent with the larger datasets; for example, the small 3-tuple dataset holds the first three records of the 21-record dataset

**Custom Type Dataset**:
- Holds the same kinds of integer, long, and string values as the tuple data, in the `myInt`, `myLong`, and `myString` fields
- Tests POJO serialization and field access

**Complex Nested Datasets**:
- Multi-level nesting for serialization testing
- Combination of primitive and object types
- Date and enum types for type-system testing
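
The Integer and Long fields of the 21-record 3-tuple dataset follow a simple rule: the Long value `k` is shared by `k` consecutive records, so groups 1 through 6 hold 1 through 6 records. The sketch below regenerates just those two fields in plain Java and computes the per-group sums of the Integer field, the kind of expected result a grouping test checks. `ThreeTupleModel` is a hypothetical name, and the String field is deliberately left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class ThreeTupleModel {

    /** Returns {f0, f1} pairs: f0 runs 1..21, and f1 = k repeats k times for k = 1..6. */
    static List<long[]> integerAndLongFields() {
        List<long[]> rows = new ArrayList<>();
        long id = 1;
        for (long group = 1; group <= 6; group++) {
            for (int i = 0; i < group; i++) {
                rows.add(new long[] {id++, group});
            }
        }
        return rows;
    }

    /** Sums f0 per f1 group, as a test of groupBy(1).sum(0) would expect. */
    static Map<Long, Long> sumOfIdsPerGroup(List<long[]> rows) {
        Map<Long, Long> sums = new TreeMap<>();
        for (long[] row : rows) {
            sums.merge(row[1], row[0], Long::sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        List<long[]> rows = integerAndLongFields();
        System.out.println(rows.size());            // 21
        System.out.println(sumOfIdsPerGroup(rows)); // {1=1, 2=5, 3=15, 4=34, 5=65, 6=111}
    }
}
```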

**Usage Patterns:**

```java
// Standard pattern for DataSet tests
// (Result, MyMapper, MyFilter, expectedCount and expectedValue are placeholders)
DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
DataSet<Result> output = input.map(new MyMapper()).filter(new MyFilter());
List<Result> results = output.collect();

// Verification pattern
assertEquals(expectedCount, results.size());
assertTrue(results.contains(expectedValue));

// Small dataset for unit tests
DataSet<Tuple3<Integer, Long, String>> quickTest = CollectionDataSets.getSmall3TupleDataSet(env);
List<Tuple3<Integer, Long, String>> quickResults = quickTest.collect();
assertEquals(3, quickResults.size());
```
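
Because `collect()` on a parallel DataSet does not promise any record order, comparing whole result lists is more robust when the comparison ignores order. A minimal plain-Java helper for that, assuming the records implement `equals` and `hashCode` (Flink tuples do); `ResultAssert.sameElements` is a hypothetical name, not part of the Flink test utilities:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class ResultAssert {

    /** True if both lists hold the same elements with the same multiplicities, in any order. */
    static <T> boolean sameElements(List<T> actual, List<T> expected) {
        if (actual.size() != expected.size()) {
            return false;
        }
        Map<T, Integer> counts = new HashMap<>();
        for (T e : expected) {
            counts.merge(e, 1, Integer::sum);
        }
        for (T a : actual) {
            Integer remaining = counts.get(a);
            if (remaining == null || remaining == 0) {
                return false; // element missing, or present more often than expected
            }
            counts.put(a, remaining - 1);
        }
        return true; // equal sizes and no shortfall means every count reached zero
    }
}
```

In a test, `assertTrue(ResultAssert.sameElements(results, expected))` can replace separate size and `contains` checks when the full expected result is known.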
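
These data generators provide consistent, reproducible test data for both simple unit tests and complex integration tests: the collection-based datasets contain the same records on every run, with predictable value patterns and edge cases such as nested, date, and enum types. The infinite input formats never reach the end of their input, so tests that use them must cancel the job themselves.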