tessl/maven-org-apache-flink--flink-tests

Test library for the Apache Flink stream processing framework, with integration tests, test utilities, and end-to-end validation tests.

Workspace: tessl
Visibility: Public
Describes: mavenpkg:maven/org.apache.flink/flink-tests@2.1.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-flink--flink-tests@2.1.0


Apache Flink Tests

Apache Flink Tests is a test library that provides reusable infrastructure, utilities, and frameworks for testing Apache Flink stream processing functionality. The module is packaged as a test-jar, so other Flink modules can depend on its test utilities to validate streaming, batch processing, state management, and fault tolerance features.

Package Information

  • Package Name: flink-tests
  • Package Type: maven
  • Language: Java
  • Installation: Add as test-jar dependency in Maven projects
  • Coordinates: org.apache.flink:flink-tests:2.1.0

Core Imports

// Test utilities and base classes
import org.apache.flink.test.checkpointing.utils.SnapshotMigrationTestBase;
import org.apache.flink.test.checkpointing.utils.MigrationTestUtils;
import org.apache.flink.test.recovery.SimpleRecoveryITCaseBase;
import org.apache.flink.test.util.JobGraphRunningUtil;

// Test data and operators
import org.apache.flink.test.operators.util.CollectionDataStreams;
import org.apache.flink.runtime.operators.lifecycle.graph.TestJobBuilders;

Maven dependency (test-jar):

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-tests</artifactId>
    <version>2.1.0</version>
    <type>test-jar</type>
    <scope>test</scope>
</dependency>

Basic Usage

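import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.checkpointing.utils.SnapshotMigrationTestBase;
import org.apache.flink.test.operators.util.CollectionDataStreams;
import org.junit.jupiter.api.Test; // assumes JUnit 5; use org.junit.Test with JUnit 4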

// Example: Using standard test data
public class MyFlinkTest {
    
    @Test
    public void testWithStandardData() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        DataStreamSource<Tuple3<Integer, Long, String>> testData = 
            CollectionDataStreams.get3TupleDataSet(env);
        
        // Use test data in your Flink job
        DataStream<Integer> result = testData
            .map(t -> t.f0)
            .filter(x -> x > 5);
        
        // Add sink and execute
        result.print();
        env.execute("Test Job");
    }
}

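// Example: Testing migration scenarios
// createJobWithOperatorState() and createUpdatedJob() are placeholders for
// job-building helpers that the test class must define; they are not shown here.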
public class MigrationTest extends SnapshotMigrationTestBase {
    
    @Test
    public void testStateMigration() throws Exception {
        // Create and execute job with state
        JobGraph job = createJobWithOperatorState();
        SnapshotSpec snapshot = executeAndSnapshot(job);
        
        // Restore and validate in new version
        JobGraph restoredJob = createUpdatedJob();
        restoreAndExecute(restoredJob, snapshot);
    }
}

Architecture

Apache Flink Tests is organized around several key testing frameworks:

  • Test Infrastructure: Base classes and utilities for setting up test environments
  • Migration Framework: Comprehensive support for testing snapshot migration across Flink versions
  • Fault Tolerance Framework: Failure injection mechanisms and recovery testing utilities
  • Operator Lifecycle Framework: Complete framework for testing streaming operator behavior and lifecycle events
  • State Management Framework: Extensive support for testing state backends, checkpointing, and state migration
  • Test Data Framework: Standardized datasets, POJOs, and data generators for consistent testing across modules

Capabilities

Checkpointing and Migration Testing

Comprehensive framework for testing snapshot migration across Flink versions with utilities for state validation and checkpoint management.

public abstract class SnapshotMigrationTestBase {
    protected SnapshotSpec executeAndSnapshot(JobGraph job) throws Exception;
    protected void restoreAndExecute(JobGraph job, SnapshotSpec snapshot) throws Exception;
    
    public static class SnapshotSpec {
        public String getSnapshotPath();
        public String getSnapshotVersion();
    }
}

Details: Checkpointing and Migration (checkpointing-migration.md)
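The two-phase flow behind migration tests (write a snapshot in one run, verify it in a later run) can be illustrated without Flink. The sketch below is plain JDK Java under stated assumptions: `SnapshotModeSketch`, its `ExecutionMode` stand-in enum, and the use of `java.util.Properties` as "state" are all hypothetical and are not part of flink-tests.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Properties;

class SnapshotModeSketch {

    /** Stand-in for the two modes a migration test can run in. */
    enum ExecutionMode { CREATE_SNAPSHOT, VERIFY_SNAPSHOT }

    /**
     * CREATE_SNAPSHOT writes the state to snapshotPath and returns true.
     * VERIFY_SNAPSHOT reloads the file and returns whether it equals expectedState.
     */
    static boolean run(ExecutionMode mode, Path snapshotPath, Properties expectedState) {
        try {
            if (mode == ExecutionMode.CREATE_SNAPSHOT) {
                try (OutputStream out = Files.newOutputStream(snapshotPath)) {
                    expectedState.store(out, "stand-in snapshot");
                }
                return true;
            }
            Properties restored = new Properties();
            try (InputStream in = Files.newInputStream(snapshotPath)) {
                restored.load(in);
            }
            return restored.equals(expectedState);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Runs both phases against a temporary file and reports whether verification passed. */
    static boolean roundTrip() {
        try {
            Path path = Files.createTempFile("snapshot-sketch", ".properties");
            try {
                Properties state = new Properties();
                state.setProperty("counter", "42");
                run(ExecutionMode.CREATE_SNAPSHOT, path, state);
                return run(ExecutionMode.VERIFY_SNAPSHOT, path, state);
            } finally {
                Files.deleteIfExists(path);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

In a real migration test the two phases typically run in different builds, with the snapshot file checked in between them; the sketch collapses both into one process only to stay self-contained.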

Fault Tolerance and Recovery Testing

Multiple failure injection mechanisms and recovery testing utilities for validating Flink's fault tolerance capabilities.

public abstract class SimpleRecoveryITCaseBase {
    protected void runAndCancelJob(JobGraph jobGraph) throws Exception;
    
    public static class FailingMapper1 implements MapFunction<Integer, Integer>;
    public static class FailingMapper2 implements MapFunction<Integer, Integer>;
}

Details: Fault Tolerance and Recovery (fault-tolerance-recovery.md)
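The idea of failure injection can be illustrated without Flink: a function throws on its first invocations, and a driver restarts the work until it succeeds or the restart budget runs out. The sketch below is plain JDK Java. `FailOnceMapper` and `runWithRestarts` are hypothetical names, not part of flink-tests, and the retry loop is only a stand-in for a cluster restart strategy.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

class FailureInjectionSketch {

    /** Hypothetical mapper that throws on its first N invocations, then doubles its input. */
    static final class FailOnceMapper implements Function<Integer, Integer> {
        private int failuresLeft;

        FailOnceMapper(int failures) {
            this.failuresLeft = failures;
        }

        @Override
        public Integer apply(Integer value) {
            if (failuresLeft > 0) {
                failuresLeft--;
                throw new IllegalStateException("injected failure");
            }
            return value * 2;
        }
    }

    /** Re-runs the whole input from the start after each failure, up to maxRestarts times. */
    static List<Integer> runWithRestarts(
            List<Integer> input, Function<Integer, Integer> mapper, int maxRestarts) {
        for (int attempt = 0; ; attempt++) {
            try {
                List<Integer> output = new ArrayList<>();
                for (Integer value : input) {
                    output.add(mapper.apply(value));
                }
                return output;
            } catch (IllegalStateException e) {
                if (attempt >= maxRestarts) {
                    throw e; // restart budget exhausted: surface the failure
                }
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(runWithRestarts(List.of(1, 2, 3), new FailOnceMapper(1), 3));
    }
}
```

A recovery test built this way asserts two things: the final output is complete despite the injected failure, and the job fails for good once the number of failures exceeds the allowed restarts.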

Operator Lifecycle Testing

Complete framework for testing streaming operator behavior including startup, checkpointing, finishing, and shutdown phases.

public class TestJobBuilders {
    public static final TestJobBuilder SIMPLE_GRAPH_BUILDER;
    public static final TestJobBuilder COMPLEX_GRAPH_BUILDER;
}

public class OneInputTestStreamOperator extends AbstractStreamOperator<TestDataElement> 
    implements OneInputStreamOperator<TestDataElement, TestDataElement>;

public class TestEventQueue {
    public void add(TestEvent event);
    public List<TestEvent> getEvents();
}

Details: Operator Lifecycle Testing (operator-lifecycle.md)
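To make the event-queue pattern concrete, here is a minimal sketch in plain JDK Java: operators append lifecycle events to a shared queue, and the test filters by operator ID and asserts on ordering. The classes below are simplified stand-ins with hypothetical names (`LifecycleEventSketch`, `Kind`, `Event`, `EventQueue`); they do not use or reproduce the flink-tests classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

class LifecycleEventSketch {

    /** Hypothetical event kinds for the lifecycle phases a test might observe. */
    enum Kind { OPERATOR_STARTED, CHECKPOINT_COMPLETED, INPUT_ENDED, OPERATOR_FINISHED }

    /** Minimal stand-in for a lifecycle event: which operator reported what. */
    static final class Event {
        final String operatorId;
        final Kind kind;

        Event(String operatorId, Kind kind) {
            this.operatorId = operatorId;
            this.kind = kind;
        }
    }

    /** Thread-safe collector: operators append concurrently, the test reads a copy. */
    static final class EventQueue {
        private final ConcurrentLinkedQueue<Event> events = new ConcurrentLinkedQueue<>();

        void add(Event event) {
            events.add(event);
        }

        List<Event> getEvents() {
            return new ArrayList<>(events);
        }
    }

    /** Returns the kinds recorded for one operator, in arrival order. */
    static List<Kind> kindsFor(EventQueue queue, String operatorId) {
        List<Kind> kinds = new ArrayList<>();
        for (Event event : queue.getEvents()) {
            if (event.operatorId.equals(operatorId)) {
                kinds.add(event.kind);
            }
        }
        return kinds;
    }

    /** Records an interleaved run of two operators and returns the kinds seen for "map-1". */
    static List<Kind> demo() {
        EventQueue queue = new EventQueue();
        queue.add(new Event("map-1", Kind.OPERATOR_STARTED));
        queue.add(new Event("sink-1", Kind.OPERATOR_STARTED));
        queue.add(new Event("map-1", Kind.CHECKPOINT_COMPLETED));
        queue.add(new Event("map-1", Kind.INPUT_ENDED));
        queue.add(new Event("sink-1", Kind.INPUT_ENDED));
        queue.add(new Event("map-1", Kind.OPERATOR_FINISHED));
        return kindsFor(queue, "map-1");
    }
}
```

Filtering per operator before asserting keeps the check independent of how events from different operators interleave, which is not deterministic in a parallel job.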

State Backend and Operator Restore Testing

Framework for testing state backend switching, operator restore scenarios, and state migration validation.

public abstract class AbstractOperatorRestoreTestBase {
    protected void testRestore() throws Exception;
}

public abstract class SavepointStateBackendSwitchTestBase {
    protected void testSwitchingStateBackend() throws Exception;
}

Details: State Backend and Restore Testing (state-backend-restore.md)

Standardized Test Data and Utilities

Reusable datasets, POJOs, and runtime utilities for consistent testing across Flink modules.

public class CollectionDataStreams {
    public static DataStreamSource<Tuple3<Integer, Long, String>> get3TupleDataSet(StreamExecutionEnvironment env);
    public static DataStreamSource<Tuple3<Integer, Long, String>> getSmall3TupleDataSet(StreamExecutionEnvironment env);
}

public class JobGraphRunningUtil {
    public static void execute(JobGraph jobGraph, MiniCluster miniCluster) throws Exception;
}

Details: Test Data and Utilities (test-data-utilities.md)

Cancellation Testing Framework

Framework for testing job cancellation scenarios and cleanup behavior.

public abstract class CancelingTestBase {
    protected void runAndCancelJob(JobGraph jobGraph, long cancelAfterMs) throws Exception;
}

Details: Cancellation Testing (cancellation-testing.md)
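The run-then-cancel pattern can be sketched in plain JDK Java: start a task that never finishes on its own, request cancellation after a delay, then check that it stopped and cleaned up. Everything below is a hypothetical stand-in (`CancelAfterDelaySketch`, the spinning task, the `cleanedUp` flag) and does not call flink-tests.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

class CancelAfterDelaySketch {

    /**
     * Starts an unbounded task, requests cancellation after cancelAfterMs,
     * and returns true if the task stopped and ran its cleanup.
     */
    static boolean runAndCancel(long cancelAfterMs) {
        AtomicBoolean cancelRequested = new AtomicBoolean(false);
        AtomicBoolean cleanedUp = new AtomicBoolean(false);
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture<Void> job = CompletableFuture.runAsync(() -> {
                try {
                    // Stand-in for an unbounded job: loop until cancellation is requested.
                    while (!cancelRequested.get()) {
                        Thread.yield();
                    }
                } finally {
                    cleanedUp.set(true); // stand-in for releasing resources
                }
            }, executor);

            Thread.sleep(cancelAfterMs);
            cancelRequested.set(true);
            job.get(5, TimeUnit.SECONDS); // a job that ignores cancellation fails the check
            return cleanedUp.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
    }
}
```

The bounded wait after the cancel request is the important part of the design: without it, a job that never reacts to cancellation would hang the test instead of failing it.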

Session Window Testing Framework

Specialized testing framework for session window functionality with event generation and validation.

public class EventGeneratorFactory {
    public static SessionEventGenerator create(SessionConfiguration config);
}

public class SessionEvent {
    public String getSessionId();
    public long getTimestamp();
    public TestEventPayload getPayload();
}

Details: Session Window Testing (session-window-testing.md)
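When validating session windows, a test needs an independent way to compute the expected sessions from the generated timestamps. A minimal sketch in plain JDK Java follows. `SessionGapSketch` and `splitIntoSessions` are hypothetical names, and the boundary rule (a new session starts only when the distance between consecutive events exceeds the gap) is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.List;

class SessionGapSketch {

    /**
     * Splits ascending timestamps of a single key into sessions.
     * A new session starts when the distance to the previous event exceeds sessionGap.
     */
    static List<List<Long>> splitIntoSessions(List<Long> sortedTimestamps, long sessionGap) {
        List<List<Long>> sessions = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        for (long timestamp : sortedTimestamps) {
            if (!current.isEmpty()
                    && timestamp - current.get(current.size() - 1) > sessionGap) {
                sessions.add(current); // gap exceeded: close the running session
                current = new ArrayList<>();
            }
            current.add(timestamp);
        }
        if (!current.isEmpty()) {
            sessions.add(current); // close the trailing session
        }
        return sessions;
    }

    public static void main(String[] args) {
        System.out.println(splitIntoSessions(List.of(0L, 10L, 20L, 100L, 105L), 50L));
    }
}
```

A test can then compare the number and contents of these expected sessions with the windows the job under test actually emitted.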

Plugin Testing Framework

Framework for testing Flink's plugin system and service provider interface (SPI) implementations.

public abstract class PluginTestBase {
    protected void testPluginLoading() throws Exception;
}

Details: Plugin Testing (plugin-testing.md)

Runtime Utilities

Job execution utilities, process management, and common testing operations for controlled test environments.

public class JobGraphRunningUtil {
    public static void execute(JobGraph jobGraph, MiniCluster miniCluster) throws Exception;
    public static JobExecutionResult executeWithTimeout(JobGraph jobGraph, MiniCluster miniCluster, long timeoutMs) throws Exception;
}

public class TestEnvironmentUtil {
    public static Configuration createTestClusterConfig(int parallelism, int numTaskManagers);
    public static StreamExecutionEnvironment createTestStreamEnv(int parallelism, boolean checkpointingEnabled);
}

Details: Runtime Utilities (runtime-utilities.md)
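Running a job with a time limit follows a general pattern that can be sketched in plain JDK Java: submit the work, wait up to a deadline, and interrupt it if the deadline passes. The names below (`TimeoutSketch`, `runWithTimeout`) are hypothetical, and returning an empty `Optional` on timeout is a choice made for this sketch, not a description of how the library reports timeouts.

```java
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

class TimeoutSketch {

    /** Runs the job on a worker thread; returns its result, or empty if timeoutMs elapses first. */
    static <T> Optional<T> runWithTimeout(Supplier<T> job, long timeoutMs) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Callable<T> task = job::get;
        Future<T> future = executor.submit(task);
        try {
            return Optional.ofNullable(future.get(timeoutMs, TimeUnit.MILLISECONDS));
        } catch (TimeoutException e) {
            future.cancel(true); // interrupt the worker so it can stop
            return Optional.empty();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            executor.shutdownNow();
        }
    }
}
```

The job must respond to interruption for the cancel to take effect; a task that ignores the interrupt flag keeps its worker thread busy even after the caller has given up.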

Types

// Core test data types
public class CustomType {
    public String myString;
    public int myInt;
    public CustomType(String myString, int myInt);
}

public class POJO {
    public int number;
    public String str;
    public POJO();
    public POJO(int number, String str);
}

public class NestedPojo {
    public POJO nested;
    public long longField;
    public NestedPojo(POJO nested, long longField);
}

public class CrazyNested {
    public NestedPojo nestedPojo;
    public POJO simplePojo;
    public String stringField;
    public CrazyNested(NestedPojo nestedPojo, POJO simplePojo, String stringField);
}

public class PojoWithDateAndEnum {
    public Date dateField;
    public TestEnum enumField;
    public String stringField;
    public PojoWithDateAndEnum(Date dateField, TestEnum enumField, String stringField);
    
    public enum TestEnum { VALUE1, VALUE2, VALUE3 }
}

public class PojoWithCollectionGeneric {
    public List<String> stringList;
    public Map<String, Integer> stringIntMap;
    public Set<Long> longSet;
    public PojoWithCollectionGeneric(List<String> stringList, Map<String, Integer> stringIntMap, Set<Long> longSet);
}

// Fault tolerance types
public class PrefixCount {
    public String prefix;
    public Integer value;
    public Long count;
    public PrefixCount(String prefix, Integer value, Long count);
}

// Migration testing types
public class SnapshotSpec {
    public String getSnapshotPath();
    public String getSnapshotVersion();
}

// Event system types  
public interface TestEvent {
    String getOperatorId();
    long getTimestamp();
}

public class OperatorStartedEvent implements TestEvent;
public class OperatorFinishedEvent implements TestEvent;
public class CheckpointStartedEvent implements TestEvent;
public class CheckpointCompletedEvent implements TestEvent;
public class InputEndedEvent implements TestEvent;
public class WatermarkReceivedEvent implements TestEvent;
public class TestCommandAckEvent implements TestEvent;

// Command system types
public interface TestCommand {
    void execute(StreamOperator<?> operator);
    String getCommandType();
}

public enum TestCommandScope {
    ALL_SUBTASKS, SINGLE_SUBTASK
}

// Test data element
public class TestDataElement {
    public String value;
    public long timestamp;
    public TestDataElement(String value, long timestamp);
}

// Session window testing types
public class SessionEvent {
    public String getSessionId();
    public long getTimestamp();
    public TestEventPayload getPayload();
}

public class TestEventPayload {
    public String data;
    public Map<String, Object> properties;
}

// Enumeration types
public enum FailoverStrategy {
    RestartAllFailoverStrategy,
    RestartPipelinedRegionFailoverStrategy
}

public enum ExecutionMode {
    CREATE_SNAPSHOT,
    VERIFY_SNAPSHOT
}

public enum SnapshotType {
    SAVEPOINT_CANONICAL,
    SAVEPOINT_NATIVE,
    CHECKPOINT
}

// Configuration types
public class SessionConfiguration {
    public long sessionTimeout;
    public long sessionGap;
    public int maxConcurrentSessions;
}

public class TestConfiguration {
    public int parallelism;
    public long checkpointInterval;
    public boolean enableCheckpointing;
}