The Flink Tests module provides comprehensive testing infrastructure and utilities for Apache Flink applications. This test-jar package offers data generators, test base classes, migration testing frameworks, and specialized utilities for testing stream processing applications with fault tolerance, state management, and performance validation.
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-tests_2.11</artifactId>
<version>1.8.3</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>

// Test data generation
import org.apache.flink.test.operators.util.CollectionDataSets;
import org.apache.flink.test.operators.util.ValueCollectionDataSets;
// Input format utilities
import org.apache.flink.test.util.UniformIntTupleGeneratorInputFormat;
import org.apache.flink.test.util.InfiniteIntegerTupleInputFormat;
import org.apache.flink.test.util.InfiniteIntegerInputFormat;
// Migration testing
import org.apache.flink.test.checkpointing.utils.SavepointMigrationTestBase;
import org.apache.flink.test.checkpointing.utils.MigrationTestUtils;
// Streaming utilities and fault injection
import org.apache.flink.test.streaming.runtime.util.TestListResultSink;
import org.apache.flink.test.streaming.runtime.util.TestListWrapper;
import org.apache.flink.test.checkpointing.utils.FailingSource;
import org.apache.flink.test.checkpointing.utils.ValidatingSink;
// Test base classes
import org.apache.flink.test.util.TestUtils;
import org.apache.flink.test.checkpointing.StreamFaultToleranceTestBase;
import org.apache.flink.test.recovery.SimpleRecoveryITCaseBase;
// State management testing
import org.apache.flink.test.state.operator.restore.AbstractOperatorRestoreTestBase;
import org.apache.flink.test.state.operator.restore.AbstractKeyedOperatorRestoreTestBase;
import org.apache.flink.test.state.operator.restore.AbstractNonKeyedOperatorRestoreTestBase;
// Test functions
import org.apache.flink.test.testfunctions.Tokenizer;

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.test.operators.util.CollectionDataSets;
// Create test data for batch processing tests
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple3<Integer, Long, String>> testData = CollectionDataSets.get3TupleDataSet(env);
// Use the data in your test
testData.map(/* your transformation */).collect();

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.streaming.runtime.util.TestListResultSink;
// Collect results in streaming tests
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
TestListResultSink<String> sink = new TestListResultSink<>();
dataStream.addSink(sink);
env.execute();
List<String> results = sink.getResult();

The Flink Tests module is organized around several key components:
Comprehensive data generators providing standardized test datasets, input format utilities, and test functions for DataSet and DataStream operations. Includes tuple data, POJO data, nested structures, custom types, infinite data generators, and common transformation functions.
// 3-tuple dataset with 21 records
public static DataSet<Tuple3<Integer, Long, String>> get3TupleDataSet(ExecutionEnvironment env);
// Custom POJO dataset with 21 records
public static DataSet<CustomType> getCustomTypeDataSet(ExecutionEnvironment env);
// Small datasets for quick tests (3 records each)
public static DataSet<Tuple3<Integer, Long, String>> getSmall3TupleDataSet(ExecutionEnvironment env);
// Input format utilities for continuous data generation
public class UniformIntTupleGeneratorInputFormat extends GenericInputFormat<Tuple2<Integer, Integer>> {
public UniformIntTupleGeneratorInputFormat(int numKeys, int numValsPerKey);
}
public class InfiniteIntegerInputFormat extends GenericInputFormat<Integer> {
public InfiniteIntegerInputFormat(boolean addDelay);
}
// Test functions for common transformations
public static class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
public void flatMap(String value, Collector<Tuple2<String, Integer>> out);
}

Complete framework for testing savepoint and checkpoint migration across different Flink versions. Provides base classes, utilities, and pre-configured sources/sinks for migration validation.
public abstract class SavepointMigrationTestBase extends TestBaseUtils {
protected String getResourceFilename(String filename);
protected void executeAndSavepoint(StreamExecutionEnvironment env, String savepointPath,
Tuple2<String, Integer>... expectedAccumulators) throws Exception;
protected void restoreAndExecute(StreamExecutionEnvironment env, String savepointPath,
Tuple2<String, Integer>... expectedAccumulators) throws Exception;
}

Thread-safe utilities for collecting and validating results in streaming applications. Includes result sinks, test wrappers, fault injection sources, and streaming-specific helper functions.
public class TestListResultSink<T> extends RichSinkFunction<T> {
public List<T> getResult();
public List<T> getSortedResult();
}
public class TestListWrapper {
public static TestListWrapper getInstance();
public int createList();
public List<Object> getList(int listId);
}
public class FailingSource<T> extends RichSourceFunction<T> {
public FailingSource(EventEmittingGenerator<T> generator, int failAfterElements);
public static interface EventEmittingGenerator<T> extends Serializable {
public void emitEvent(SourceContext<T> ctx, int eventSequenceNo);
}
}
public class ValidatingSink<T> extends RichSinkFunction<T> {
public ValidatingSink(ResultChecker<T> resultChecker, CountUpdater countUpdater);
public static interface ResultChecker<T> extends Serializable {
public boolean checkResult(T result);
}
public static interface CountUpdater extends Serializable {
public void updateCount(long count);
}
}

Abstract base classes providing common patterns for different testing scenarios including cancellation testing, fault tolerance testing, and recovery testing.
public abstract class StreamFaultToleranceTestBase extends TestLogger {
public static final int NUM_TASK_MANAGERS = 2;
public static final int NUM_TASK_SLOTS = 8;
public static final int PARALLELISM = 4;
public abstract void testProgram(StreamExecutionEnvironment env);
public abstract void postSubmit() throws Exception;
}
public abstract class SimpleRecoveryITCaseBase extends TestLogger {
protected abstract void executeRecoveryTest() throws Exception;
}

Complete programs for testing dynamic class loading, user code isolation, and class loading policies. Each program serves as a standalone test case for different class loading scenarios.
public class StreamingProgram {
public static void main(String[] args) throws Exception;
}
public class CheckpointedStreamingProgram {
public static void main(String[] args) throws Exception;
}
public class KMeansForTest {
public static void main(String[] args) throws Exception;
public static class Point { /* 2D point representation */ }
public static class Centroid extends Point { /* cluster center */ }
}

Framework for testing operator state restoration and migration, including utilities for both keyed and non-keyed state scenarios.
public enum ExecutionMode {
GENERATE, MIGRATE, RESTORE
}
public abstract class AbstractOperatorRestoreTestBase {
protected abstract StreamExecutionEnvironment createMigrationJob(String savepointPath) throws Exception;
protected abstract StreamExecutionEnvironment createRestoredJob(String savepointPath) throws Exception;
protected abstract String getMigrationSavepointName();
}
public abstract class AbstractKeyedOperatorRestoreTestBase extends AbstractOperatorRestoreTestBase {
// Specialized testing for keyed state operators
}
public abstract class AbstractNonKeyedOperatorRestoreTestBase extends AbstractOperatorRestoreTestBase {
// Specialized testing for non-keyed state operators
}
public class KeyedJob {
public static void main(String[] args) throws Exception;
}
public class NonKeyedJob {
public static void main(String[] args) throws Exception;
}

Manual programs for performance benchmarking, scalability testing, and resource usage validation. These programs are designed for manual execution and analysis.
public class MassiveStringSorting {
public static void main(String[] args) throws Exception;
}
public class StreamingScalabilityAndLatency {
public static void main(String[] args) throws Exception;
}
public class ReducePerformance {
public static void main(String[] args) throws Exception;
}

// Test data POJO
public static class CustomType {
public int myInt;
public long myLong;
public String myString;
public CustomType();
public CustomType(int i, long l, String s);
}
// Success indication exception
public class SuccessException extends RuntimeException {
public SuccessException();
}
// Simple integer wrapper for testing
public class IntType {
public int value;
public IntType(int value);
}
// Test enum
public enum Category {
CAT_A, CAT_B
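}

The testing framework includes specialized exceptions and error-handling patterns, such as the SuccessException listed above, which indicates success rather than a failure.
Tests should handle these exceptions appropriately and use the provided error injection mechanisms (for example, FailingSource) for fault tolerance testing.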