Comprehensive test library for Apache Flink stream processing framework with integration tests, test utilities, and end-to-end validation tests.
—
Reusable datasets, POJOs, and runtime utilities for consistent testing across Flink modules. This framework provides standardized test data structures and execution utilities that ensure reproducible testing environments.
Utility class providing standardized test datasets and data types for consistent testing across Flink modules.
/**
* Standard test datasets and data types for Flink testing
*/
public class CollectionDataStreams {
/**
* Get standard 3-tuple dataset for testing
* @param env StreamExecutionEnvironment for dataset creation
* @return DataStreamSource of Tuple3<Integer, Long, String> with standard test data
*/
public static DataStreamSource<Tuple3<Integer, Long, String>> get3TupleDataSet(StreamExecutionEnvironment env);
/**
* Get small 3-tuple dataset for quick testing
* @param env StreamExecutionEnvironment for dataset creation
* @return DataStreamSource of Tuple3<Integer, Long, String> with reduced test data
*/
public static DataStreamSource<Tuple3<Integer, Long, String>> getSmall3TupleDataSet(StreamExecutionEnvironment env);
/**
* Get batch 3-tuple dataset for batch testing
* @param env ExecutionEnvironment for batch dataset creation
* @return DataSet of Tuple3<Integer, Long, String> with standard test data
*/
public static DataSet<Tuple3<Integer, Long, String>> get3TupleDataSet(ExecutionEnvironment env);
/**
* Get small batch 3-tuple dataset for quick batch testing
* @param env ExecutionEnvironment for batch dataset creation
* @return DataSet of Tuple3<Integer, Long, String> with reduced test data
*/
public static DataSet<Tuple3<Integer, Long, String>> getSmall3TupleDataSet(ExecutionEnvironment env);
/**
* Get integer dataset for numeric testing
* @param env ExecutionEnvironment for dataset creation
* @return DataSet of Integer values
*/
public static DataSet<Integer> getIntegerDataSet(ExecutionEnvironment env);
/**
* Get string dataset for text processing testing
* @param env ExecutionEnvironment for dataset creation
* @return DataSet of String values
*/
public static DataSet<String> getStringDataSet(ExecutionEnvironment env);
/**
* Get 5-tuple dataset for complex tuple testing
* @param env ExecutionEnvironment for dataset creation
* @return DataSet of Tuple5<Integer, Long, Integer, String, Long>
*/
public static DataSet<Tuple5<Integer, Long, Integer, String, Long>> get5TupleDataSet(ExecutionEnvironment env);
/**
* Get CustomType dataset for POJO testing
* @param env ExecutionEnvironment for dataset creation
* @return DataSet of CustomType instances
*/
public static DataSet<CustomType> getCustomTypeDataSet(ExecutionEnvironment env);
/**
* Get small CustomType dataset for quick POJO testing
* @param env ExecutionEnvironment for dataset creation
* @return DataSet of CustomType instances (reduced size)
*/
public static DataSet<CustomType> getSmallCustomTypeDataSet(ExecutionEnvironment env);
/**
* Get POJO dataset for complex POJO testing
* @param env ExecutionEnvironment for dataset creation
* @return DataSet of POJO instances
*/
public static DataSet<POJO> getPojoDataSet(ExecutionEnvironment env);
/**
* Get small POJO dataset for quick complex POJO testing
* @param env ExecutionEnvironment for dataset creation
* @return DataSet of POJO instances (reduced size)
*/
public static DataSet<POJO> getSmallPojoDataSet(ExecutionEnvironment env);
/**
* Get POJO dataset with collections for collection testing
* @param env ExecutionEnvironment for dataset creation
* @return DataSet of PojoWithCollectionGeneric instances
*/
public static DataSet<PojoWithCollectionGeneric> getPojoWithCollectionDataSet(ExecutionEnvironment env);
}

Standard POJO classes for testing various serialization, deserialization, and data processing scenarios.
/**
* Custom test data type for general testing scenarios
*/
public class CustomType {
/** String field for testing */
public String myString;
/** Integer field for testing */
public int myInt;
/**
* Default constructor for CustomType
*/
public CustomType();
/**
* Constructor with field initialization
* @param myString string value
* @param myInt integer value
*/
public CustomType(String myString, int myInt);
/**
* Get string field value
* @return String value
*/
public String getMyString();
/**
* Set string field value
* @param myString string value to set
*/
public void setMyString(String myString);
/**
* Get integer field value
* @return int value
*/
public int getMyInt();
/**
* Set integer field value
* @param myInt integer value to set
*/
public void setMyInt(int myInt);
}
/**
* Basic POJO for general testing scenarios
*/
public class POJO {
/** Number field */
public int number;
/** String field */
public String str;
/**
* Default constructor for POJO
*/
public POJO();
/**
* Constructor with field initialization
* @param number integer value
* @param str string value
*/
public POJO(int number, String str);
}
/**
* Nested POJO structure for testing complex object hierarchies
*/
public class NestedPojo {
/** Nested POJO field */
public POJO nested;
/** Long field */
public long longField;
/**
* Default constructor for NestedPojo
*/
public NestedPojo();
/**
* Constructor with field initialization
* @param nested nested POJO instance
* @param longField long value
*/
public NestedPojo(POJO nested, long longField);
}
/**
* Complex nested POJO hierarchy for advanced testing scenarios
*/
public class CrazyNested {
/** Nested POJO */
public NestedPojo nestedPojo;
/** POJO field */
public POJO simplePojo;
/** String field */
public String stringField;
/**
* Default constructor for CrazyNested
*/
public CrazyNested();
/**
* Constructor with field initialization
* @param nestedPojo nested POJO instance
* @param simplePojo simple POJO instance
* @param stringField string value
*/
public CrazyNested(NestedPojo nestedPojo, POJO simplePojo, String stringField);
}
/**
* POJO with date and enum fields for testing special data types
*/
public class PojoWithDateAndEnum {
/** Date field */
public Date dateField;
/** Enum field */
public TestEnum enumField;
/** String field */
public String stringField;
/**
* Default constructor
*/
public PojoWithDateAndEnum();
/**
* Constructor with field initialization
* @param dateField date value
* @param enumField enum value
* @param stringField string value
*/
public PojoWithDateAndEnum(Date dateField, TestEnum enumField, String stringField);
/**
* Test enum for POJO testing
*/
public enum TestEnum {
VALUE1, VALUE2, VALUE3
}
}
/**
* POJO with generic collections for testing collection serialization
*/
public class PojoWithCollectionGeneric {
/** List of strings */
public List<String> stringList;
/** Map of string to integer */
public Map<String, Integer> stringIntMap;
/** Set of long values */
public Set<Long> longSet;
/**
* Default constructor
*/
public PojoWithCollectionGeneric();
/**
* Constructor with collection initialization
* @param stringList list of strings
* @param stringIntMap map of string to integer
* @param longSet set of long values
*/
public PojoWithCollectionGeneric(
List<String> stringList,
Map<String, Integer> stringIntMap,
Set<Long> longSet);
}

Specialized data sources for various testing scenarios including infinite streams and coordinated sources.
/**
* Source that emits integers indefinitely for long-running tests
*/
public class InfiniteIntegerSource implements SourceFunction<Integer> {
/**
* Constructor for infinite integer source
* @param startValue starting integer value
* @param incrementBy increment between values
*/
public InfiniteIntegerSource(int startValue, int incrementBy);
@Override
public void run(SourceContext<Integer> ctx) throws Exception;
@Override
public void cancel();
}
/**
* Number sequence source with checkpoint coordination capabilities
*/
public class NumberSequenceSourceWithWaitForCheckpoint implements SourceFunction<Long> {
/**
* Constructor for number sequence source with checkpoint coordination
* @param from starting value
* @param to ending value
* @param checkpointCoordination enable checkpoint coordination
*/
public NumberSequenceSourceWithWaitForCheckpoint(
long from,
long to,
boolean checkpointCoordination);
@Override
public void run(SourceContext<Long> ctx) throws Exception;
@Override
public void cancel();
}

Input formats for batch processing tests and data generation scenarios.
/**
* Infinite integer input format for batch testing scenarios
*/
public class InfiniteIntegerInputFormat implements InputFormat<Integer, InputSplit> {
/**
* Constructor for infinite integer input format
* @param startValue starting integer value
* @param maxElements maximum elements to generate (or -1 for infinite)
*/
public InfiniteIntegerInputFormat(int startValue, int maxElements);
@Override
public void configure(Configuration parameters);
@Override
public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException;
@Override
public InputSplit[] createInputSplits(int minNumSplits) throws IOException;
@Override
public InputSplitAssigner getInputSplitAssigner(InputSplit[] inputSplits);
@Override
public void open(InputSplit split) throws IOException;
@Override
public boolean reachedEnd() throws IOException;
@Override
public Integer nextRecord(Integer reuse) throws IOException;
@Override
public void close() throws IOException;
}
/**
* Uniform integer tuple generator input format for load testing
*/
public class UniformIntTupleGeneratorInputFormat
implements InputFormat<Tuple2<Integer, Integer>, InputSplit> {
/**
* Constructor for uniform tuple generator
* @param numTuples number of tuples to generate
* @param minValue minimum integer value
* @param maxValue maximum integer value
*/
public UniformIntTupleGeneratorInputFormat(int numTuples, int minValue, int maxValue);
@Override
public Tuple2<Integer, Integer> nextRecord(Tuple2<Integer, Integer> reuse) throws IOException;
}

Utility classes for job execution, testing infrastructure, and common testing operations.
/**
* Utility for running JobGraphs on MiniCluster for testing
*/
public class JobGraphRunningUtil {
/**
* Execute JobGraph on MiniCluster and wait for completion
* @param jobGraph JobGraph to execute
* @param miniCluster MiniCluster instance for execution
* @throws Exception if job execution fails
*/
public static void execute(JobGraph jobGraph, MiniCluster miniCluster) throws Exception;
/**
* Execute JobGraph with timeout
* @param jobGraph JobGraph to execute
* @param miniCluster MiniCluster instance
* @param timeoutMs timeout in milliseconds
* @return JobExecutionResult containing execution results
* @throws Exception if execution fails or times out
*/
public static JobExecutionResult executeWithTimeout(
JobGraph jobGraph,
MiniCluster miniCluster,
long timeoutMs) throws Exception;
/**
* Execute JobGraph and return execution result
* @param jobGraph JobGraph to execute
* @param miniCluster MiniCluster instance
* @return JobExecutionResult with job execution details
* @throws Exception if execution fails
*/
public static JobExecutionResult executeAndGetResult(
JobGraph jobGraph,
MiniCluster miniCluster) throws Exception;
}
/**
* Identity mapper for testing data flow without transformation
*/
public class NoOpIntMap implements MapFunction<Integer, Integer> {
@Override
public Integer map(Integer value) throws Exception;
}
/**
* No-operation sink for testing data flow completion
*/
public class ReceiveCheckNoOpSink<T> implements SinkFunction<T> {
/**
* Constructor for no-op sink with receive tracking
* @param expectedCount expected number of elements to receive
*/
public ReceiveCheckNoOpSink(int expectedCount);
@Override
public void invoke(T value, Context context) throws Exception;
/**
* Check if expected number of elements were received
* @return boolean indicating if expected count was reached
*/
public boolean receivedExpectedCount();
}

Usage Examples:
import org.apache.flink.test.operators.util.CollectionDataStreams;
import org.apache.flink.test.util.*;
// Using standard test datasets
public class DataProcessingTest {
@Test
public void testWithStandardTupleData() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Get standard 3-tuple dataset
DataStreamSource<Tuple3<Integer, Long, String>> testData =
CollectionDataStreams.get3TupleDataSet(env);
// Process data
DataStream<Integer> result = testData
.map(new MapFunction<Tuple3<Integer, Long, String>, Integer>() {
@Override
public Integer map(Tuple3<Integer, Long, String> value) {
return value.f0 * 2;
}
})
.filter(x -> x > 10);
// Execute and validate results
env.execute();
}
@Test
public void testWithCustomTypes() throws Exception {
// Create custom type instances for testing
CustomType custom1 = new CustomType("test", 42);
CustomType custom2 = new CustomType("example", 123);
List<CustomType> customData = Arrays.asList(custom1, custom2);
// Validate custom type properties
assertEquals("test", customData.get(0).getMyString());
assertEquals(42, customData.get(0).getMyInt());
}
}
// Using runtime utilities
public class JobExecutionTest {
@Test
public void testJobExecution() throws Exception {
// Create test job graph
JobGraph jobGraph = new JobGraph();
// Add vertices with standard test operators
JobVertex source = new JobVertex("source");
source.setInvokableClass(InfiniteIntegerSource.class);
source.getConfiguration().setInteger("start-value", 1);
source.getConfiguration().setInteger("max-elements", 1000);
source.setParallelism(1);
JobVertex mapper = new JobVertex("mapper");
mapper.setInvokableClass(NoOpIntMap.class);
mapper.setParallelism(2);
JobVertex sink = new JobVertex("sink");
sink.setInvokableClass(ReceiveCheckNoOpSink.class);
sink.getConfiguration().setInteger("expected-count", 1000);
sink.setParallelism(1);
// Connect vertices
mapper.connectNewDataSetAsInput(source, DistributionPattern.REBALANCE);
sink.connectNewDataSetAsInput(mapper, DistributionPattern.FORWARD);
jobGraph.addVertex(source);
jobGraph.addVertex(mapper);
jobGraph.addVertex(sink);
// Execute using utility
MiniCluster miniCluster = new MiniCluster(createTestConfiguration());
miniCluster.start();
JobExecutionResult result = JobGraphRunningUtil.executeAndGetResult(
jobGraph, miniCluster);
// Validate execution
assertTrue(result.isSuccess());
miniCluster.close();
}
@Test
public void testLongRunningJobWithTimeout() throws Exception {
JobGraph longRunningJob = createLongRunningJob();
MiniCluster miniCluster = new MiniCluster(createTestConfiguration());
miniCluster.start();
// Execute with timeout
JobExecutionResult result = JobGraphRunningUtil.executeWithTimeout(
longRunningJob, miniCluster, 30000L);
assertNotNull(result);
miniCluster.close();
}
}
// Creating custom test data
public class CustomTestDataCreation {
@Test
public void createCustomPojoData() {
// Create custom POJO instances
POJO pojo1 = new POJO(1, "first");
POJO pojo2 = new POJO(2, "second");
NestedPojo nested1 = new NestedPojo(pojo1, 100L);
NestedPojo nested2 = new NestedPojo(pojo2, 200L);
CrazyNested complex1 = new CrazyNested(nested1, pojo1, "complex1");
CrazyNested complex2 = new CrazyNested(nested2, pojo2, "complex2");
// Use in test scenarios
List<CrazyNested> testData = Arrays.asList(complex1, complex2);
// Validate POJO structure
assertNotNull(testData.get(0).nestedPojo.nested);
assertEquals("first", testData.get(0).nestedPojo.nested.str);
}
}

Install with Tessl CLI:
npx tessl i tessl/maven-org-apache-flink--flink-tests