# Test Data and Utilities

Reusable datasets, POJOs, and runtime utilities for consistent testing across Flink modules. They provide standardized test data structures and execution helpers so that tests run against reproducible inputs and environments.

## Capabilities

### Collection Data Streams

Utility class providing standardized test datasets. The 3-tuple datasets are available both as streaming sources (`StreamExecutionEnvironment` overloads) and as batch `DataSet`s (`ExecutionEnvironment` overloads); all other datasets are batch-only.

```java { .api }
/**
12
* Standard test datasets and data types for Flink testing
13
*/
14
public class CollectionDataStreams {
15
16
/**
17
* Get standard 3-tuple dataset for testing
18
* @param env StreamExecutionEnvironment for dataset creation
19
* @return DataStreamSource of Tuple3<Integer, Long, String> with standard test data
20
*/
21
public static DataStreamSource<Tuple3<Integer, Long, String>> get3TupleDataSet(StreamExecutionEnvironment env);
22
23
/**
24
* Get small 3-tuple dataset for quick testing
25
* @param env StreamExecutionEnvironment for dataset creation
26
* @return DataStreamSource of Tuple3<Integer, Long, String> with reduced test data
27
*/
28
public static DataStreamSource<Tuple3<Integer, Long, String>> getSmall3TupleDataSet(StreamExecutionEnvironment env);
29
30
/**
31
* Get batch 3-tuple dataset for batch testing
32
* @param env ExecutionEnvironment for batch dataset creation
33
* @return DataSet of Tuple3<Integer, Long, String> with standard test data
34
*/
35
public static DataSet<Tuple3<Integer, Long, String>> get3TupleDataSet(ExecutionEnvironment env);
36
37
/**
38
* Get small batch 3-tuple dataset for quick batch testing
39
* @param env ExecutionEnvironment for batch dataset creation
40
* @return DataSet of Tuple3<Integer, Long, String> with reduced test data
41
*/
42
public static DataSet<Tuple3<Integer, Long, String>> getSmall3TupleDataSet(ExecutionEnvironment env);
43
44
/**
45
* Get integer dataset for numeric testing
46
* @param env ExecutionEnvironment for dataset creation
47
* @return DataSet of Integer values
48
*/
49
public static DataSet<Integer> getIntegerDataSet(ExecutionEnvironment env);
50
51
/**
52
* Get string dataset for text processing testing
53
* @param env ExecutionEnvironment for dataset creation
54
* @return DataSet of String values
55
*/
56
public static DataSet<String> getStringDataSet(ExecutionEnvironment env);
57
58
/**
59
* Get 5-tuple dataset for complex tuple testing
60
* @param env ExecutionEnvironment for dataset creation
61
* @return DataSet of Tuple5<Integer, Long, Integer, String, Long>
62
*/
63
public static DataSet<Tuple5<Integer, Long, Integer, String, Long>> get5TupleDataSet(ExecutionEnvironment env);
64
65
/**
66
* Get CustomType dataset for POJO testing
67
* @param env ExecutionEnvironment for dataset creation
68
* @return DataSet of CustomType instances
69
*/
70
public static DataSet<CustomType> getCustomTypeDataSet(ExecutionEnvironment env);
71
72
/**
73
* Get small CustomType dataset for quick POJO testing
74
* @param env ExecutionEnvironment for dataset creation
75
* @return DataSet of CustomType instances (reduced size)
76
*/
77
public static DataSet<CustomType> getSmallCustomTypeDataSet(ExecutionEnvironment env);
78
79
/**
80
* Get POJO dataset for complex POJO testing
81
* @param env ExecutionEnvironment for dataset creation
82
* @return DataSet of POJO instances
83
*/
84
public static DataSet<POJO> getPojoDataSet(ExecutionEnvironment env);
85
86
/**
87
* Get small POJO dataset for quick complex POJO testing
88
* @param env ExecutionEnvironment for dataset creation
89
* @return DataSet of POJO instances (reduced size)
90
*/
91
public static DataSet<POJO> getSmallPojoDataSet(ExecutionEnvironment env);
92
93
/**
94
* Get POJO dataset with collections for collection testing
95
* @param env ExecutionEnvironment for dataset creation
96
* @return DataSet of PojoWithCollectionGeneric instances
97
*/
98
public static DataSet<PojoWithCollectionGeneric> getPojoWithCollectionDataSet(ExecutionEnvironment env);
99
}
```

### Test Data Types (POJOs)

Standard POJO classes for testing serialization, deserialization, and data processing. They range from flat types to nested hierarchies, and include types with date, enum, and generic-collection fields.

```java { .api }
/**
108
* Custom test data type for general testing scenarios
109
*/
110
public class CustomType {
111
/** String field for testing */
112
public String myString;
113
/** Integer field for testing */
114
public int myInt;
115
116
/**
117
* Default constructor for CustomType
118
*/
119
public CustomType();
120
121
/**
122
* Constructor with field initialization
123
* @param myString string value
124
* @param myInt integer value
125
*/
126
public CustomType(String myString, int myInt);
127
128
/**
129
* Get string field value
130
* @return String value
131
*/
132
public String getMyString();
133
134
/**
135
* Set string field value
136
* @param myString string value to set
137
*/
138
public void setMyString(String myString);
139
140
/**
141
* Get integer field value
142
* @return int value
143
*/
144
public int getMyInt();
145
146
/**
147
* Set integer field value
148
* @param myInt integer value to set
149
*/
150
public void setMyInt(int myInt);
151
}
152
153
/**
154
* Basic POJO for general testing scenarios
155
*/
156
public class POJO {
157
/** Number field */
158
public int number;
159
/** String field */
160
public String str;
161
162
/**
163
* Default constructor for POJO
164
*/
165
public POJO();
166
167
/**
168
* Constructor with field initialization
169
* @param number integer value
170
* @param str string value
171
*/
172
public POJO(int number, String str);
173
}
174
175
/**
176
* Nested POJO structure for testing complex object hierarchies
177
*/
178
public class NestedPojo {
179
/** Nested POJO field */
180
public POJO nested;
181
/** Long field */
182
public long longField;
183
184
/**
185
* Default constructor for NestedPojo
186
*/
187
public NestedPojo();
188
189
/**
190
* Constructor with field initialization
191
* @param nested nested POJO instance
192
* @param longField long value
193
*/
194
public NestedPojo(POJO nested, long longField);
195
}
196
197
/**
198
* Complex nested POJO hierarchy for advanced testing scenarios
199
*/
200
public class CrazyNested {
201
/** Nested POJO */
202
public NestedPojo nestedPojo;
203
/** POJO field */
204
public POJO simplePojo;
205
/** String field */
206
public String stringField;
207
208
/**
209
* Default constructor for CrazyNested
210
*/
211
public CrazyNested();
212
213
/**
214
* Constructor with field initialization
215
* @param nestedPojo nested POJO instance
216
* @param simplePojo simple POJO instance
217
* @param stringField string value
218
*/
219
public CrazyNested(NestedPojo nestedPojo, POJO simplePojo, String stringField);
220
}
221
222
/**
223
* POJO with date and enum fields for testing special data types
224
*/
225
public class PojoWithDateAndEnum {
226
/** Date field */
227
public Date dateField;
228
/** Enum field */
229
public TestEnum enumField;
230
/** String field */
231
public String stringField;
232
233
/**
234
* Default constructor
235
*/
236
public PojoWithDateAndEnum();
237
238
/**
239
* Constructor with field initialization
240
* @param dateField date value
241
* @param enumField enum value
242
* @param stringField string value
243
*/
244
public PojoWithDateAndEnum(Date dateField, TestEnum enumField, String stringField);
245
246
/**
247
* Test enum for POJO testing
248
*/
249
public enum TestEnum {
250
VALUE1, VALUE2, VALUE3
251
}
252
}
253
254
/**
255
* POJO with generic collections for testing collection serialization
256
*/
257
public class PojoWithCollectionGeneric {
258
/** List of strings */
259
public List<String> stringList;
260
/** Map of string to integer */
261
public Map<String, Integer> stringIntMap;
262
/** Set of long values */
263
public Set<Long> longSet;
264
265
/**
266
* Default constructor
267
*/
268
public PojoWithCollectionGeneric();
269
270
/**
271
* Constructor with collection initialization
272
* @param stringList list of strings
273
* @param stringIntMap map of string to integer
274
* @param longSet set of long values
275
*/
276
public PojoWithCollectionGeneric(
277
List<String> stringList,
278
Map<String, Integer> stringIntMap,
279
Set<Long> longSet);
280
}
```

### Data Sources

Specialized data sources for testing scenarios, including an unbounded integer stream and a number-sequence source with checkpoint coordination.

```java { .api }
/**
289
* Source that emits integers indefinitely for long-running tests
290
*/
291
public class InfiniteIntegerSource implements SourceFunction<Integer> {
292
293
/**
294
* Constructor for infinite integer source
295
* @param startValue starting integer value
296
* @param incrementBy increment between values
297
*/
298
public InfiniteIntegerSource(int startValue, int incrementBy);
299
300
@Override
301
public void run(SourceContext<Integer> ctx) throws Exception;
302
303
@Override
304
public void cancel();
305
}
306
307
/**
308
* Number sequence source with checkpoint coordination capabilities
309
*/
310
public class NumberSequenceSourceWithWaitForCheckpoint implements SourceFunction<Long> {
311
312
/**
313
* Constructor for number sequence source with checkpoint coordination
314
* @param from starting value
315
* @param to ending value
316
* @param checkpointCoordination enable checkpoint coordination
317
*/
318
public NumberSequenceSourceWithWaitForCheckpoint(
319
long from,
320
long to,
321
boolean checkpointCoordination);
322
323
@Override
324
public void run(SourceContext<Long> ctx) throws Exception;
325
326
@Override
327
public void cancel();
328
}
```

### Input Formats

Input formats that generate data for batch-processing and load tests.

```java { .api }
/**
337
* Infinite integer input format for batch testing scenarios
338
*/
339
public class InfiniteIntegerInputFormat implements InputFormat<Integer, InputSplit> {
340
341
/**
342
* Constructor for infinite integer input format
343
* @param startValue starting integer value
344
* @param maxElements maximum elements to generate (or -1 for infinite)
345
*/
346
public InfiniteIntegerInputFormat(int startValue, int maxElements);
347
348
@Override
349
public void configure(Configuration parameters);
350
351
@Override
352
public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException;
353
354
@Override
355
public InputSplit[] createInputSplits(int minNumSplits) throws IOException;
356
357
@Override
358
public InputSplitAssigner getInputSplitAssigner(InputSplit[] inputSplits);
359
360
@Override
361
public void open(InputSplit split) throws IOException;
362
363
@Override
364
public boolean reachedEnd() throws IOException;
365
366
@Override
367
public Integer nextRecord(Integer reuse) throws IOException;
368
369
@Override
370
public void close() throws IOException;
371
}
372
373
/**
374
* Uniform integer tuple generator input format for load testing
375
*/
376
public class UniformIntTupleGeneratorInputFormat
377
implements InputFormat<Tuple2<Integer, Integer>, InputSplit> {
378
379
/**
380
* Constructor for uniform tuple generator
381
* @param numTuples number of tuples to generate
382
* @param minValue minimum integer value
383
* @param maxValue maximum integer value
384
*/
385
public UniformIntTupleGeneratorInputFormat(int numTuples, int minValue, int maxValue);
386
387
@Override
388
public Tuple2<Integer, Integer> nextRecord(Tuple2<Integer, Integer> reuse) throws IOException;
389
}
```

### Runtime Utilities

Utility classes for running jobs on a `MiniCluster`, plus no-op operators and sinks for exercising data flow in tests.

```java { .api }
/**
398
* Utility for running JobGraphs on MiniCluster for testing
399
*/
400
public class JobGraphRunningUtil {
401
402
/**
403
* Execute JobGraph on MiniCluster and wait for completion
404
* @param jobGraph JobGraph to execute
405
* @param miniCluster MiniCluster instance for execution
406
* @throws Exception if job execution fails
407
*/
408
public static void execute(JobGraph jobGraph, MiniCluster miniCluster) throws Exception;
409
410
/**
411
* Execute JobGraph with timeout
412
* @param jobGraph JobGraph to execute
413
* @param miniCluster MiniCluster instance
414
* @param timeoutMs timeout in milliseconds
415
* @return JobExecutionResult containing execution results
416
* @throws Exception if execution fails or times out
417
*/
418
public static JobExecutionResult executeWithTimeout(
419
JobGraph jobGraph,
420
MiniCluster miniCluster,
421
long timeoutMs) throws Exception;
422
423
/**
424
* Execute JobGraph and return execution result
425
* @param jobGraph JobGraph to execute
426
* @param miniCluster MiniCluster instance
427
* @return JobExecutionResult with job execution details
428
* @throws Exception if execution fails
429
*/
430
public static JobExecutionResult executeAndGetResult(
431
JobGraph jobGraph,
432
MiniCluster miniCluster) throws Exception;
433
}
434
435
/**
436
* Identity mapper for testing data flow without transformation
437
*/
438
public class NoOpIntMap implements MapFunction<Integer, Integer> {
439
440
@Override
441
public Integer map(Integer value) throws Exception;
442
}
443
444
/**
445
* No-operation sink for testing data flow completion
446
*/
447
public class ReceiveCheckNoOpSink<T> implements SinkFunction<T> {
448
449
/**
450
* Constructor for no-op sink with receive tracking
451
* @param expectedCount expected number of elements to receive
452
*/
453
public ReceiveCheckNoOpSink(int expectedCount);
454
455
@Override
456
public void invoke(T value, Context context) throws Exception;
457
458
/**
459
* Check if expected number of elements were received
460
* @return boolean indicating if expected count was reached
461
*/
462
public boolean receivedExpectedCount();
463
}
```

**Usage Examples:**

```java
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.operators.util.CollectionDataStreams;
import org.apache.flink.test.util.*;
import org.apache.flink.util.CloseableIterator;
import org.junit.Test;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

// Using standard test datasets
473
public class DataProcessingTest {
474
475
@Test
476
public void testWithStandardTupleData() throws Exception {
477
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
478
479
// Get standard 3-tuple dataset
480
DataStreamSource<Tuple3<Integer, Long, String>> testData =
481
CollectionDataStreams.get3TupleDataSet(env);
482
483
// Process data
484
DataStream<Integer> result = testData
485
.map(new MapFunction<Tuple3<Integer, Long, String>, Integer>() {
486
@Override
487
public Integer map(Tuple3<Integer, Long, String> value) {
488
return value.f0 * 2;
489
}
490
})
491
.filter(x -> x > 10);
492
493
// Execute and validate results
494
env.execute();
495
}
496
497
@Test
498
public void testWithCustomTypes() throws Exception {
499
// Create custom type instances for testing
500
CustomType custom1 = new CustomType("test", 42);
501
CustomType custom2 = new CustomType("example", 123);
502
503
List<CustomType> customData = Arrays.asList(custom1, custom2);
504
505
// Validate custom type properties
506
assertEquals("test", customData.get(0).getMyString());
507
assertEquals(42, customData.get(0).getMyInt());
508
}
509
}
510
511
// Using runtime utilities
512
public class JobExecutionTest {
513
514
@Test
515
public void testJobExecution() throws Exception {
516
// Create test job graph
517
JobGraph jobGraph = new JobGraph();
518
519
// Add vertices with standard test operators
520
JobVertex source = new JobVertex("source");
521
source.setInvokableClass(InfiniteIntegerSource.class);
522
source.getConfiguration().setInteger("start-value", 1);
523
source.getConfiguration().setInteger("max-elements", 1000);
524
source.setParallelism(1);
525
526
JobVertex mapper = new JobVertex("mapper");
527
mapper.setInvokableClass(NoOpIntMap.class);
528
mapper.setParallelism(2);
529
530
JobVertex sink = new JobVertex("sink");
531
sink.setInvokableClass(ReceiveCheckNoOpSink.class);
532
sink.getConfiguration().setInteger("expected-count", 1000);
533
sink.setParallelism(1);
534
535
// Connect vertices
536
mapper.connectNewDataSetAsInput(source, DistributionPattern.REBALANCE);
537
sink.connectNewDataSetAsInput(mapper, DistributionPattern.FORWARD);
538
539
jobGraph.addVertex(source);
540
jobGraph.addVertex(mapper);
541
jobGraph.addVertex(sink);
542
543
// Execute using utility
544
MiniCluster miniCluster = new MiniCluster(createTestConfiguration());
545
miniCluster.start();
546
547
JobExecutionResult result = JobGraphRunningUtil.executeAndGetResult(
548
jobGraph, miniCluster);
549
550
// Validate execution
551
assertTrue(result.isSuccess());
552
miniCluster.close();
553
}
554
555
@Test
556
public void testLongRunningJobWithTimeout() throws Exception {
557
JobGraph longRunningJob = createLongRunningJob();
558
MiniCluster miniCluster = new MiniCluster(createTestConfiguration());
559
miniCluster.start();
560
561
// Execute with timeout
562
JobExecutionResult result = JobGraphRunningUtil.executeWithTimeout(
563
longRunningJob, miniCluster, 30000L);
564
565
assertNotNull(result);
566
miniCluster.close();
567
}
568
}
569
570
// Creating custom test data
571
public class CustomTestDataCreation {
572
573
@Test
574
public void createCustomPojoData() {
575
// Create custom POJO instances
576
POJO pojo1 = new POJO(1, "first");
577
POJO pojo2 = new POJO(2, "second");
578
579
NestedPojo nested1 = new NestedPojo(pojo1, 100L);
580
NestedPojo nested2 = new NestedPojo(pojo2, 200L);
581
582
CrazyNested complex1 = new CrazyNested(nested1, pojo1, "complex1");
583
CrazyNested complex2 = new CrazyNested(nested2, pojo2, "complex2");
584
585
// Use in test scenarios
586
List<CrazyNested> testData = Arrays.asList(complex1, complex2);
587
588
// Validate POJO structure
589
assertNotNull(testData.get(0).nestedPojo.nested);
590
assertEquals("first", testData.get(0).nestedPojo.nested.str);
591
}
592
}
```