Comprehensive integration test suite for Apache Flink stream processing framework providing test utilities, base classes, and infrastructure for validating fault tolerance, checkpointing, and streaming operations.
npx @tessl/cli install tessl/maven-org-apache-flink--flink-tests_2-10@1.3.0Apache Flink Tests is a comprehensive integration test suite providing reusable testing infrastructure, utilities, and base classes for validating Apache Flink's stream processing functionality. It contains 50+ test utilities, base classes, and components covering fault tolerance, checkpointing, state migration, data generation, and streaming operations.
org.apache.flink:flink-tests_2.10:1.3.3pom.xml dependencies with <scope>test</scope><dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-tests_2.10</artifactId>
<version>1.3.3</version>
<scope>test</scope>
</dependency>// Test utilities
import org.apache.flink.test.util.TestUtils;
import org.apache.flink.test.util.SuccessException;
// Data generation utilities
import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
import org.apache.flink.test.javaApiOperators.util.ValueCollectionDataSets;
// Base test classes for checkpointing and state management
import org.apache.flink.test.checkpointing.utils.SavepointMigrationTestBase;
import org.apache.flink.test.checkpointing.StreamFaultToleranceTestBase;
// Streaming test utilities
import org.apache.flink.test.streaming.runtime.util.EvenOddOutputSelector;
import org.apache.flink.test.streaming.runtime.util.TestListResultSink;// Using test data generation
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple3<Integer, Long, String>> testData =
CollectionDataSets.get3TupleDataSet(env);
// Using test execution utilities
@Test
public void testStreamingJob() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Configure your streaming job
env.fromElements(1, 2, 3, 4, 5)
.map(x -> x * 2)
.addSink(new TestListResultSink<>());
// Execute with proper exception handling
TestUtils.tryExecute(env, "Test Job");
}
// Using savepoint migration testing
public class MySavepointMigrationTest extends SavepointMigrationTestBase {
@Test
public void testMigration() throws Exception {
// Test savepoint compatibility across versions
StreamExecutionEnvironment env1 = createTestJob();
executeAndSavepoint(env1, "test-savepoint");
StreamExecutionEnvironment env2 = createModifiedJob();
restoreAndExecute(env2, "test-savepoint");
}
}Apache Flink Tests is organized around several key infrastructure areas:
Comprehensive test data generation utilities providing consistent datasets for Java and Scala API testing. Includes custom types, collections, and input formats.
public class CollectionDataSets {
public static DataSet<Tuple3<Integer, Long, String>> get3TupleDataSet(ExecutionEnvironment env);
public static DataSet<Tuple3<Integer, Long, String>> getSmall3TupleDataSet(ExecutionEnvironment env);
public static DataSet<Tuple5<Integer, Long, Integer, String, Long>> get5TupleDataSet(ExecutionEnvironment env);
public static DataSet<String> getStringDataSet(ExecutionEnvironment env);
public static DataSet<Integer> getIntegerDataSet(ExecutionEnvironment env);
public static DataSet<CustomType> getCustomTypeDataSet(ExecutionEnvironment env);
}Infrastructure for testing savepoint migration, state compatibility, and checkpoint recovery across Flink versions. Essential for validating upgrade paths and state serialization.
public abstract class SavepointMigrationTestBase {
protected void executeAndSavepoint(JobGraph jobGraph, String savepointName) throws Exception;
protected void restoreAndExecute(JobGraph jobGraph, String savepointName) throws Exception;
protected abstract String getResourceFilename(String filename);
}Checkpointing and State Management
Base classes and utilities for testing job cancellation, task failure recovery, and fault tolerance mechanisms. Provides controlled failure injection and recovery validation.
public abstract class StreamFaultToleranceTestBase {
protected abstract void testProgram(StreamExecutionEnvironment env);
protected void postSubmit() throws Exception;
}
public abstract class SimpleRecoveryITCaseBase {
protected void execute() throws Exception;
}Specialized components for testing streaming operations including output selectors, result collection, and stream partitioning utilities.
public class EvenOddOutputSelector implements OutputSelector<Integer> {
public Iterable<String> select(Integer value);
}
public class TestListResultSink<T> implements SinkFunction<T> {
public void invoke(T value);
public List<T> getResult();
}Core utilities for executing test jobs with proper exception handling and result validation. Handles the complexities of test execution in Flink environments.
public class TestUtils {
public static JobExecutionResult tryExecute(StreamExecutionEnvironment env, String jobName) throws Exception;
}
public class SuccessException extends Exception {
public SuccessException();
}Framework for verifying API parity between Java and Scala implementations using reflection-based method comparison.
public abstract class ScalaAPICompletenessTestBase {
protected void compareApis(Class<?> javaClass, Class<?> scalaClass);
protected boolean isExcluded(Method method);
}