Test Data Generation

Data generators providing standardized test datasets for DataSet and DataStream operations. These utilities create consistent, predictable datasets for a variety of data types and structures, enabling reliable, repeatable testing across different Flink operations.

Capabilities

Collection Data Sets

Core utility class providing standardized test datasets for DataSet API testing.

/**
 * Utility class providing standardized test datasets for DataSet API testing
 */
public class CollectionDataSets {
    
    // Tuple datasets
    public static DataSet<Tuple3<Integer, Long, String>> get3TupleDataSet(ExecutionEnvironment env);
    public static DataSet<Tuple3<Integer, Long, String>> getSmall3TupleDataSet(ExecutionEnvironment env);
    public static DataSet<Tuple5<Integer, Long, Integer, String, Long>> get5TupleDataSet(ExecutionEnvironment env);
    public static DataSet<Tuple5<Integer, Long, Integer, String, Long>> getSmall5TupleDataSet(ExecutionEnvironment env);
    
    // Nested tuple datasets
    public static DataSet<Tuple2<Tuple2<Integer, Integer>, String>> getSmallNestedTupleDataSet(ExecutionEnvironment env);
    public static DataSet<Tuple3<Tuple2<Integer, Integer>, String, Integer>> getGroupSortedNestedTupleDataSet(ExecutionEnvironment env);
    public static DataSet<Tuple3<Tuple2<Integer, Integer>, String, Integer>> getGroupSortedNestedTupleDataSet2(ExecutionEnvironment env);
    
    // Special datasets
    public static DataSet<Tuple2<String, byte[]>> getTuple2WithByteArrayDataSet(ExecutionEnvironment env);
    public static DataSet<String> getStringDataSet(ExecutionEnvironment env);
    public static DataSet<Integer> getIntegerDataSet(ExecutionEnvironment env);
    
    // POJO datasets
    public static DataSet<CustomType> getCustomTypeDataSet(ExecutionEnvironment env);
    public static DataSet<CustomType> getSmallCustomTypeDataSet(ExecutionEnvironment env);
    public static DataSet<POJO> getSmallPojoDataSet(ExecutionEnvironment env);
    public static DataSet<POJO> getDuplicatePojoDataSet(ExecutionEnvironment env);
    public static DataSet<POJO> getMixedPojoDataSet(ExecutionEnvironment env);
    
    // Complex nested datasets
    public static DataSet<CrazyNested> getCrazyNestedDataSet(ExecutionEnvironment env);
    public static DataSet<FromTupleWithCTor> getPojoExtendingFromTuple(ExecutionEnvironment env);
    public static DataSet<PojoContainingTupleAndWritable> getPojoContainingTupleAndWritable(ExecutionEnvironment env);
    public static DataSet<PojoContainingTupleAndWritable> getGroupSortedPojoContainingTupleAndWritable(ExecutionEnvironment env);
    
    // Multi-POJO datasets
    public static DataSet<Tuple3<Integer, POJO, POJO>> getTupleContainingPojos(ExecutionEnvironment env);
    public static DataSet<PojoWithMultiplePojos> getPojoWithMultiplePojos(ExecutionEnvironment env);
    public static DataSet<PojoWithDateAndEnum> getPojoWithDateAndEnum(ExecutionEnvironment env);
    public static DataSet<PojoWithCollection> getPojoWithCollection(ExecutionEnvironment env);
    
    // Tuple-based compatibility datasets
    public static DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> getSmallTuplebasedDataSet(ExecutionEnvironment env);
    public static DataSet<CustomType> getSmallTuplebasedDataSetMatchingPojo(ExecutionEnvironment env);
}

Usage Examples:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.test.operators.util.CollectionDataSets;

// Create execution environment
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Get standard 3-tuple dataset (21 records)
DataSet<Tuple3<Integer, Long, String>> data = CollectionDataSets.get3TupleDataSet(env);

// Get small dataset for quick tests (3 records)
DataSet<Tuple3<Integer, Long, String>> smallData = CollectionDataSets.getSmall3TupleDataSet(env);

// Get custom POJO dataset
DataSet<CollectionDataSets.CustomType> customData = CollectionDataSets.getCustomTypeDataSet(env);

// Use in transformations; a lambda returning a generic tuple needs an explicit type hint
data.map(tuple -> new Tuple2<>(tuple.f0, tuple.f2))
    .returns(Types.TUPLE(Types.INT, Types.STRING))
    .collect();

Value Collection Data Sets

Test data provider for Flink Value types (IntValue, LongValue, StringValue).

/**
 * Test data provider for Flink Value types
 */
public class ValueCollectionDataSets {
    
    // Value-based tuple datasets
    public static DataSet<Tuple3<IntValue, LongValue, StringValue>> get3TupleDataSet(ExecutionEnvironment env);
    public static DataSet<Tuple3<IntValue, LongValue, StringValue>> getSmall3TupleDataSet(ExecutionEnvironment env);
    public static DataSet<Tuple5<IntValue, LongValue, IntValue, StringValue, LongValue>> get5TupleDataSet(ExecutionEnvironment env);
    
    // Value-based primitive datasets
    public static DataSet<StringValue> getStringDataSet(ExecutionEnvironment env);
    public static DataSet<IntValue> getIntegerDataSet(ExecutionEnvironment env);
    
    // Value-based POJO datasets
    public static DataSet<CustomType> getCustomTypeDataSet(ExecutionEnvironment env);
    public static DataSet<CustomType> getSmallCustomTypeDataSet(ExecutionEnvironment env);
}
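
Usage Example:

The Value-based datasets are used like their boxed-primitive counterparts; the wrapped primitives are read through getValue(). A minimal sketch, assuming ValueCollectionDataSets lives in the same package as CollectionDataSets above:

import java.util.List;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.LongValue;
import org.apache.flink.types.StringValue;
import org.apache.flink.test.operators.util.ValueCollectionDataSets;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Value-typed counterpart of the standard 3-tuple dataset
DataSet<Tuple3<IntValue, LongValue, StringValue>> valueData =
    ValueCollectionDataSets.get3TupleDataSet(env);

// Collect and unwrap the mutable Value types with getValue() before asserting
List<Tuple3<IntValue, LongValue, StringValue>> records = valueData.collect();
int firstKey = records.get(0).f0.getValue();
String firstComment = records.get(0).f2.getValue();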

Test Data Types

Standard POJO and data types used across test datasets.

/**
 * Standard test POJO with primitive fields
 */
public static class CustomType {
    public int myInt;
    public long myLong;
    public String myString;
    
    public CustomType();
    public CustomType(int i, long l, String s);
    
    // Standard equals, hashCode, toString methods
    public boolean equals(Object obj);
    public int hashCode();
    public String toString();
}

/**
 * Complex test POJO with nested structures
 */
public static class POJO {
    public int number;
    public String str;
    public NestedPojo nestedPojo;
    
    public POJO();
    public POJO(int i, String s, NestedPojo np);
}

/**
 * Nested POJO for complex data structures
 */
public static class NestedPojo {
    public long longNumber;
    
    public NestedPojo();
    public NestedPojo(long l);
}

/**
 * Multi-level nested POJO for deep structure testing
 */
public static class CrazyNested {
    public int number;
    public String str;
    public CrazyNestedL1 nest_Lvl1;
    public long timestamp;
    
    public CrazyNested();
    public CrazyNested(int number, String str, CrazyNestedL1 nest_Lvl1, long timestamp);
}

/**
 * POJO extending Tuple with constructor
 */
public static class FromTupleWithCTor extends Tuple3<Long, Long, String> {
    public FromTupleWithCTor();
    public FromTupleWithCTor(Long l1, Long l2, String s);
}

/**
 * POJO with Date and enum fields
 */
public static class PojoWithDateAndEnum {
    public String str;
    public Date date;
    public Category cat;
    
    public PojoWithDateAndEnum();
    public PojoWithDateAndEnum(String str, Date date, Category cat);
}

/**
 * POJO with collection fields
 */
public static class PojoWithCollection {
    public List<Pojo1> pojosList;
    public int[] intArray;
    public List<String> stringList;
    
    public PojoWithCollection();
    public PojoWithCollection(List<Pojo1> pojosList, int[] intArray, List<String> stringList);
}

/**
 * Test enumeration
 */
public enum Category {
    CAT_A, CAT_B
}
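
These types are usually obtained through the dataset factory methods, but they can also be constructed directly when building expected results for assertions. A minimal sketch based on the constructors declared above, assuming the types are nested inside CollectionDataSets like CustomType in the earlier example (the field values are illustrative, not taken from the actual datasets):

import org.apache.flink.test.operators.util.CollectionDataSets.CustomType;
import org.apache.flink.test.operators.util.CollectionDataSets.NestedPojo;
import org.apache.flink.test.operators.util.CollectionDataSets.POJO;

// CustomType mirrors the (int, long, String) layout of the 3-tuple dataset
CustomType custom = new CustomType(1, 1L, "Hi");  // illustrative values

// POJO nests a NestedPojo, exercising multi-level field access in key selectors
POJO pojo = new POJO(1, "First", new NestedPojo(10000L));  // illustrative values

// CustomType implements equals()/hashCode(), so instances compare directly in assertions
assertEquals(custom, new CustomType(1, 1L, "Hi"));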

Input Format Utilities

Specialized input formats for generating continuous and configurable test data streams for streaming and batch applications.

/**
 * Input format that generates uniform integer tuple pairs for testing
 */
public class UniformIntTupleGeneratorInputFormat extends GenericInputFormat<Tuple2<Integer, Integer>> {
    
    /**
     * Constructor for uniform tuple generator
     * @param numKeys Number of distinct keys to generate
     * @param numValsPerKey Number of values to generate per key
     */
    public UniformIntTupleGeneratorInputFormat(int numKeys, int numValsPerKey);
    
    /**
     * Generate next tuple element
     * @param reuse Tuple to reuse for output
     * @return Next tuple or null if exhausted
     * @throws IOException if generation fails
     */
    public Tuple2<Integer, Integer> nextRecord(Tuple2<Integer, Integer> reuse) throws IOException;
    
    /**
     * Check whether the end of the input has been reached
     * @return true once all records have been generated, false while more remain
     * @throws IOException if the check fails
     */
    public boolean reachedEnd() throws IOException;
}

/**
 * Input format that generates infinite integer tuple sequence with optional delay
 */
public class InfiniteIntegerTupleInputFormat extends GenericInputFormat<Tuple2<Integer, Integer>> {
    
    /**
     * Constructor for infinite tuple generator
     * @param addDelay Whether to add delay between elements
     */
    public InfiniteIntegerTupleInputFormat(boolean addDelay);
    
    /**
     * Generate next tuple element (never returns null)
     * @param reuse Tuple to reuse for output
     * @return Next tuple in sequence
     * @throws IOException if generation fails
     */
    public Tuple2<Integer, Integer> nextRecord(Tuple2<Integer, Integer> reuse) throws IOException;
    
    /**
     * Always returns false - infinite generation
     * @return false (never reaches end)
     */
    public boolean reachedEnd() throws IOException;
}

/**
 * Input format that generates infinite integer sequence with optional delay
 */
public class InfiniteIntegerInputFormat extends GenericInputFormat<Integer> {
    
    /**
     * Constructor for infinite integer generator
     * @param addDelay Whether to add delay between elements
     */
    public InfiniteIntegerInputFormat(boolean addDelay);
    
    /**
     * Generate next integer element (never returns null)
     * @param reuse Integer to reuse for output
     * @return Next integer in sequence
     * @throws IOException if generation fails
     */
    public Integer nextRecord(Integer reuse) throws IOException;
    
    /**
     * Always returns false - infinite generation
     * @return false (never reaches end)
     */
    public boolean reachedEnd() throws IOException;
}

Input Format Usage Examples:

import org.apache.flink.test.util.UniformIntTupleGeneratorInputFormat;
import org.apache.flink.test.util.InfiniteIntegerTupleInputFormat;
import org.apache.flink.test.util.InfiniteIntegerInputFormat;

// Generate uniform test data for batch processing
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Create uniform tuple data: 10 keys, 100 values per key = 1000 total tuples
DataSet<Tuple2<Integer, Integer>> uniformData = env.createInput(
    new UniformIntTupleGeneratorInputFormat(10, 100)
);

// Process uniform data
List<Tuple2<Integer, Integer>> results = uniformData
    .filter(tuple -> tuple.f0 % 2 == 0)  // Even keys only
    .collect();

assertEquals(500, results.size());  // 5 even keys * 100 values each

// Generate infinite streaming data for stress testing
StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();

// Create infinite integer stream with delay for controlled rate
DataStream<Integer> infiniteStream = streamEnv.createInput(
    new InfiniteIntegerInputFormat(true)  // With delay
);

// Use in a streaming topology; the filter only limits values, not the stream itself,
// so the test must bound the run externally (e.g. cancel the job or use a terminating sink)
infiniteStream
    .map(i -> i * 2)
    .filter(i -> i < 10000)  // limit values for testing
    .addSink(new TestSink());

// Create infinite tuple stream without delay for performance testing
DataStream<Tuple2<Integer, Integer>> infiniteTuples = streamEnv.createInput(
    new InfiniteIntegerTupleInputFormat(false)  // No delay
);

// Use for performance benchmarking; WindowedStream has no count(), so aggregate with sum(1) per window
infiniteTuples
    .keyBy(0)
    .timeWindow(Time.seconds(1))
    .sum(1)
    .addSink(new CountingSink());

Test Functions

Common transformation functions used across multiple test scenarios, including tokenization and data processing utilities.

/**
 * FlatMap function for tokenizing strings into word-count tuples
 */
public class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
    
    /**
     * Tokenize input string and emit word-count pairs
     * @param value Input string to tokenize
     * @param out Collector for emitting word-count tuples
     * @throws Exception if tokenization fails
     */
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception;
}

Test Function Usage Example:

import org.apache.flink.test.testfunctions.Tokenizer;

// Use tokenizer in word count example
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

DataSet<String> text = env.fromElements(
    "Hello World",
    "Hello Flink World",
    "World of Streaming"
);

DataSet<Tuple2<String, Integer>> words = text
    .flatMap(new Tokenizer())  // Tokenize sentences into words
    .groupBy(0)                // Group by word
    .sum(1);                   // Sum counts

List<Tuple2<String, Integer>> result = words.collect();
// Result contains: ("Hello", 2), ("World", 3), ("Flink", 1), ("of", 1), ("Streaming", 1)

Data Characteristics

The test datasets provide predictable data patterns:

3-Tuple Dataset (21 records):

  • Integer field: Sequential values 1-21
  • Long field: Values 1-6, repeated so records group naturally (see the sketch after these lists)
  • String field: Mix of simple strings and "Comment#N" patterns

Small Datasets (3 records):

  • Subset of larger datasets for quick unit tests
  • Consistent with larger dataset patterns

Custom Type Dataset:

  • Maps to tuple data with same value patterns
  • Tests POJO serialization and field access

Complex Nested Datasets:

  • Multi-level nesting for serialization testing
  • Combination of primitive and object types
  • Date and enum types for type system testing
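
The grouped Long field of the 3-tuple dataset makes group-based operators easy to verify. A minimal sketch, reusing the imports from the earlier CollectionDataSets example (the explicit returns() hint is needed because the lambda produces a generic tuple):

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple3<Integer, Long, String>> data = CollectionDataSets.get3TupleDataSet(env);

// Count records per Long key: 6 distinct keys cover all 21 records
DataSet<Tuple2<Long, Integer>> groupSizes = data
    .map(t -> new Tuple2<>(t.f1, 1))
    .returns(Types.TUPLE(Types.LONG, Types.INT))
    .groupBy(0)
    .sum(1);

List<Tuple2<Long, Integer>> sizes = groupSizes.collect();
assertEquals(6, sizes.size());  // one entry per distinct Long key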

Usage Patterns:

// Standard pattern for DataSet tests
DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
DataSet<Result> output = input.map(new MyMapper()).filter(new MyFilter());
List<Result> results = output.collect();

// Verification pattern
assertEquals(expectedCount, results.size());
assertTrue(results.contains(expectedValue));

// Small dataset for unit tests
DataSet<Tuple3<Integer, Long, String>> quickTest = CollectionDataSets.getSmall3TupleDataSet(env);
List<Tuple3<Integer, Long, String>> quickResults = quickTest.collect();
assertEquals(3, quickResults.size());

These data generators ensure consistent, reproducible test data across all Flink testing scenarios, supporting both simple unit tests and complex integration tests with predictable data patterns and edge cases.