Comprehensive test library for Apache Flink stream processing framework with integration tests, test utilities, and end-to-end validation tests.
npx @tessl/cli install tessl/maven-org-apache-flink--flink-tests@2.1.0

Apache Flink Tests is a comprehensive test library providing reusable testing infrastructure, utilities, and frameworks for testing Apache Flink stream processing functionality. This module is packaged as a test-jar, making its test utilities available to other Flink modules for comprehensive validation of streaming, batch processing, state management, and fault tolerance features.
org.apache.flink:flink-tests:2.1.0

// Test utilities and base classes
import org.apache.flink.test.checkpointing.utils.SnapshotMigrationTestBase;
import org.apache.flink.test.checkpointing.utils.MigrationTestUtils;
import org.apache.flink.test.recovery.SimpleRecoveryITCaseBase;
import org.apache.flink.test.util.JobGraphRunningUtil;
// Test data and operators
import org.apache.flink.test.operators.util.CollectionDataStreams;
import org.apache.flink.runtime.operators.lifecycle.graph.TestJobBuilders;

Maven dependency (test-jar):
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-tests</artifactId>
<version>2.1.0</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>

import org.apache.flink.test.checkpointing.utils.SnapshotMigrationTestBase;
import org.apache.flink.test.operators.util.CollectionDataStreams;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.api.java.tuple.Tuple3;
// Example: Using standard test data
public class MyFlinkTest {
@Test
public void testWithStandardData() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Tuple3<Integer, Long, String>> testData =
CollectionDataStreams.get3TupleDataSet(env);
// Use test data in your Flink job
DataStream<Integer> result = testData
.map(t -> t.f0)
.filter(x -> x > 5);
// Add sink and execute
result.print();
env.execute("Test Job");
}
}
// Example: Testing migration scenarios
public class MigrationTest extends SnapshotMigrationTestBase {
@Test
public void testStateMigration() throws Exception {
// Create and execute job with state
JobGraph job = createJobWithOperatorState();
SnapshotSpec snapshot = executeAndSnapshot(job);
// Restore and validate in new version
JobGraph restoredJob = createUpdatedJob();
restoreAndExecute(restoredJob, snapshot);
}
}

Apache Flink Tests is organized around several key testing frameworks:
Comprehensive framework for testing snapshot migration across Flink versions with utilities for state validation and checkpoint management.
public abstract class SnapshotMigrationTestBase {
protected SnapshotSpec executeAndSnapshot(JobGraph job) throws Exception;
protected void restoreAndExecute(JobGraph job, SnapshotSpec snapshot) throws Exception;
public static class SnapshotSpec {
public String getSnapshotPath();
public String getSnapshotVersion();
}
}

Multiple failure injection mechanisms and recovery testing utilities for validating Flink's fault tolerance capabilities.
public abstract class SimpleRecoveryITCaseBase {
protected void runAndCancelJob(JobGraph jobGraph) throws Exception;
public static class FailingMapper1 implements MapFunction<Integer, Integer>;
public static class FailingMapper2 implements MapFunction<Integer, Integer>;
}

Complete framework for testing streaming operator behavior including startup, checkpointing, finishing, and shutdown phases.
public class TestJobBuilders {
public static final TestJobBuilder SIMPLE_GRAPH_BUILDER;
public static final TestJobBuilder COMPLEX_GRAPH_BUILDER;
}
public class OneInputTestStreamOperator extends AbstractStreamOperator<TestDataElement>
implements OneInputStreamOperator<TestDataElement, TestDataElement>;
public class TestEventQueue {
public void add(TestEvent event);
public List<TestEvent> getEvents();
}

Framework for testing state backend switching, operator restore scenarios, and state migration validation.
public abstract class AbstractOperatorRestoreTestBase {
protected void testRestore() throws Exception;
}
public abstract class SavepointStateBackendSwitchTestBase {
protected void testSwitchingStateBackend() throws Exception;
}

State Backend and Restore Testing
Reusable datasets, POJOs, and runtime utilities for consistent testing across Flink modules.
public class CollectionDataStreams {
public static DataStreamSource<Tuple3<Integer, Long, String>> get3TupleDataSet(StreamExecutionEnvironment env);
public static DataStreamSource<Tuple3<Integer, Long, String>> getSmall3TupleDataSet(StreamExecutionEnvironment env);
}
public class JobGraphRunningUtil {
public static void execute(JobGraph jobGraph, MiniCluster miniCluster) throws Exception;
}

Framework for testing job cancellation scenarios and cleanup behavior.
public abstract class CancelingTestBase {
protected void runAndCancelJob(JobGraph jobGraph, long cancelAfterMs) throws Exception;
}

Specialized testing framework for session window functionality with event generation and validation.
public class EventGeneratorFactory {
public static SessionEventGenerator create(SessionConfiguration config);
}
public class SessionEvent {
public String getSessionId();
public long getTimestamp();
public TestEventPayload getPayload();
}

Framework for testing Flink's plugin system and service provider interface (SPI) implementations.
public abstract class PluginTestBase {
protected void testPluginLoading() throws Exception;
}

Job execution utilities, process management, and common testing operations for controlled test environments.
public class JobGraphRunningUtil {
public static void execute(JobGraph jobGraph, MiniCluster miniCluster) throws Exception;
public static JobExecutionResult executeWithTimeout(JobGraph jobGraph, MiniCluster miniCluster, long timeoutMs) throws Exception;
}
public class TestEnvironmentUtil {
public static Configuration createTestClusterConfig(int parallelism, int numTaskManagers);
public static StreamExecutionEnvironment createTestStreamEnv(int parallelism, boolean checkpointingEnabled);
}

// Core test data types
public class CustomType {
public String myString;
public int myInt;
public CustomType(String myString, int myInt);
}
public class POJO {
public int number;
public String str;
public POJO();
public POJO(int number, String str);
}
public class NestedPojo {
public POJO nested;
public long longField;
public NestedPojo(POJO nested, long longField);
}
public class CrazyNested {
public NestedPojo nestedPojo;
public POJO simplePojo;
public String stringField;
public CrazyNested(NestedPojo nestedPojo, POJO simplePojo, String stringField);
}
public class PojoWithDateAndEnum {
public Date dateField;
public TestEnum enumField;
public String stringField;
public PojoWithDateAndEnum(Date dateField, TestEnum enumField, String stringField);
public enum TestEnum { VALUE1, VALUE2, VALUE3 }
}
public class PojoWithCollectionGeneric {
public List<String> stringList;
public Map<String, Integer> stringIntMap;
public Set<Long> longSet;
public PojoWithCollectionGeneric(List<String> stringList, Map<String, Integer> stringIntMap, Set<Long> longSet);
}
// Fault tolerance types
public class PrefixCount {
public String prefix;
public Integer value;
public Long count;
public PrefixCount(String prefix, Integer value, Long count);
}
// Migration testing types
public class SnapshotSpec {
public String getSnapshotPath();
public String getSnapshotVersion();
}
// Event system types
public interface TestEvent {
String getOperatorId();
long getTimestamp();
}
public class OperatorStartedEvent implements TestEvent;
public class OperatorFinishedEvent implements TestEvent;
public class CheckpointStartedEvent implements TestEvent;
public class CheckpointCompletedEvent implements TestEvent;
public class InputEndedEvent implements TestEvent;
public class WatermarkReceivedEvent implements TestEvent;
public class TestCommandAckEvent implements TestEvent;
// Command system types
public interface TestCommand {
void execute(StreamOperator<?> operator);
String getCommandType();
}
public enum TestCommandScope {
ALL_SUBTASKS, SINGLE_SUBTASK
}
// Test data element
public class TestDataElement {
public String value;
public long timestamp;
public TestDataElement(String value, long timestamp);
}
// Session window testing types
public class SessionEvent {
public String getSessionId();
public long getTimestamp();
public TestEventPayload getPayload();
}
public class TestEventPayload {
public String data;
public Map<String, Object> properties;
}
// Enumeration types
public enum FailoverStrategy {
RestartAllFailoverStrategy,
RestartPipelinedRegionFailoverStrategy
}
public enum ExecutionMode {
CREATE_SNAPSHOT,
VERIFY_SNAPSHOT
}
public enum SnapshotType {
SAVEPOINT_CANONICAL,
SAVEPOINT_NATIVE,
CHECKPOINT
}
// Configuration types
public class SessionConfiguration {
public long sessionTimeout;
public long sessionGap;
public int maxConcurrentSessions;
}
public class TestConfiguration {
public int parallelism;
public long checkpointInterval;
public boolean enableCheckpointing;
}