or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

class-loading-programs.mdindex.mdmigration-testing.mdperformance-testing.mdstate-management-testing.mdstreaming-utilities.mdtest-base-classes.mdtest-data-generation.md
tile.json

tessl/maven-org-apache-flink--flink-tests-2-11

Comprehensive testing infrastructure and utilities for Apache Flink stream processing framework

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
mavenpkg:maven/org.apache.flink/flink-tests_2.11@1.8.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-flink--flink-tests-2-11@1.8.0

index.mddocs/

Flink Tests

The Flink Tests module provides comprehensive testing infrastructure and utilities for Apache Flink applications. This test-jar package offers data generators, test base classes, migration testing frameworks, and specialized utilities for testing stream processing applications with fault tolerance, state management, and performance validation.

Package Information

  • Package Name: flink-tests_2.11
  • Package Type: maven
  • Language: Java/Scala
  • Installation:
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-tests_2.11</artifactId>
      <version>1.8.3</version>
      <type>test-jar</type>
      <scope>test</scope>
    </dependency>

Core Imports

// Test data generation
import org.apache.flink.test.operators.util.CollectionDataSets;
import org.apache.flink.test.operators.util.ValueCollectionDataSets;

// Input format utilities
import org.apache.flink.test.util.UniformIntTupleGeneratorInputFormat;
import org.apache.flink.test.util.InfiniteIntegerTupleInputFormat;
import org.apache.flink.test.util.InfiniteIntegerInputFormat;

// Migration testing
import org.apache.flink.test.checkpointing.utils.SavepointMigrationTestBase;
import org.apache.flink.test.checkpointing.utils.MigrationTestUtils;

// Streaming utilities and fault injection
import org.apache.flink.test.streaming.runtime.util.TestListResultSink;
import org.apache.flink.test.streaming.runtime.util.TestListWrapper;
import org.apache.flink.test.checkpointing.utils.FailingSource;
import org.apache.flink.test.checkpointing.utils.ValidatingSink;

// Test base classes
import org.apache.flink.test.util.TestUtils;
import org.apache.flink.test.checkpointing.StreamFaultToleranceTestBase;
import org.apache.flink.test.recovery.SimpleRecoveryITCaseBase;

// State management testing
import org.apache.flink.test.state.operator.restore.AbstractOperatorRestoreTestBase;
import org.apache.flink.test.state.operator.restore.AbstractKeyedOperatorRestoreTestBase;
import org.apache.flink.test.state.operator.restore.AbstractNonKeyedOperatorRestoreTestBase;

// Test functions
import org.apache.flink.test.testfunctions.Tokenizer;

Basic Usage

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.test.operators.util.CollectionDataSets;

// Create test data for batch processing tests
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple3<Integer, Long, String>> testData = CollectionDataSets.get3TupleDataSet(env);

// Use the data in your test
testData.map(/* your transformation */).collect();
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.streaming.runtime.util.TestListResultSink;

// Collect results in streaming tests
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
TestListResultSink<String> sink = new TestListResultSink<>();

dataStream.addSink(sink);
env.execute();

List<String> results = sink.getResult();

Architecture

The Flink Tests module is organized around several key components:

  • Data Generation: Standardized test datasets for various data types and structures
  • Migration Framework: Complete infrastructure for testing savepoint and checkpoint migration across Flink versions
  • Streaming Utilities: Thread-safe result collection and streaming-specific test helpers
  • Test Base Classes: Abstract base classes providing common test patterns for different scenarios
  • Fault Tolerance Testing: Specialized sources and sinks for testing failure scenarios and recovery
  • State Management Testing: Framework for validating operator state restoration and migration
  • Performance Testing: Manual programs for benchmarking and scalability testing

Capabilities

Test Data Generation

Comprehensive data generators providing standardized test datasets, input format utilities, and test functions for DataSet and DataStream operations. Includes tuple data, POJO data, nested structures, custom types, infinite data generators, and common transformation functions.

// 3-tuple dataset with 21 records
public static DataSet<Tuple3<Integer, Long, String>> get3TupleDataSet(ExecutionEnvironment env);

// Custom POJO dataset with 21 records  
public static DataSet<CustomType> getCustomTypeDataSet(ExecutionEnvironment env);

// Small datasets for quick tests (3 records each)
public static DataSet<Tuple3<Integer, Long, String>> getSmall3TupleDataSet(ExecutionEnvironment env);

// Input format utilities for continuous data generation
public class UniformIntTupleGeneratorInputFormat extends GenericInputFormat<Tuple2<Integer, Integer>> {
    public UniformIntTupleGeneratorInputFormat(int numKeys, int numValsPerKey);
}

public class InfiniteIntegerInputFormat extends GenericInputFormat<Integer> {
    public InfiniteIntegerInputFormat(boolean addDelay);
}

// Test functions for common transformations
public static class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out);
}

Test Data Generation

Migration Testing Framework

Complete framework for testing savepoint and checkpoint migration across different Flink versions. Provides base classes, utilities, and pre-configured sources/sinks for migration validation.

public abstract class SavepointMigrationTestBase extends TestBaseUtils {
    protected String getResourceFilename(String filename);
    protected void executeAndSavepoint(StreamExecutionEnvironment env, String savepointPath, 
                                       Tuple2<String, Integer>... expectedAccumulators) throws Exception;
    protected void restoreAndExecute(StreamExecutionEnvironment env, String savepointPath,
                                     Tuple2<String, Integer>... expectedAccumulators) throws Exception;
}

Migration Testing Framework

Streaming Test Utilities

Thread-safe utilities for collecting and validating results in streaming applications. Includes result sinks, test wrappers, fault injection sources, and streaming-specific helper functions.

public class TestListResultSink<T> extends RichSinkFunction<T> {
    public List<T> getResult();
    public List<T> getSortedResult();
}

public class TestListWrapper {
    public static TestListWrapper getInstance();
    public int createList();
    public List<Object> getList(int listId);
}

public class FailingSource<T> extends RichSourceFunction<T> {
    public FailingSource(EventEmittingGenerator<T> generator, int failAfterElements);
    public static interface EventEmittingGenerator<T> extends Serializable {
        public void emitEvent(SourceContext<T> ctx, int eventSequenceNo);
    }
}

public class ValidatingSink<T> extends RichSinkFunction<T> {
    public ValidatingSink(ResultChecker<T> resultChecker, CountUpdater countUpdater);
    public static interface ResultChecker<T> extends Serializable {
        public boolean checkResult(T result);
    }
    public static interface CountUpdater extends Serializable {
        public void updateCount(long count);
    }
}

Streaming Test Utilities

Test Base Classes

Abstract base classes providing common patterns for different testing scenarios including cancellation testing, fault tolerance testing, and recovery testing.

public abstract class StreamFaultToleranceTestBase extends TestLogger {
    public static final int NUM_TASK_MANAGERS = 2;
    public static final int NUM_TASK_SLOTS = 8;  
    public static final int PARALLELISM = 4;
    
    public abstract void testProgram(StreamExecutionEnvironment env);
    public abstract void postSubmit() throws Exception;
}

public abstract class SimpleRecoveryITCaseBase extends TestLogger {
    protected abstract void executeRecoveryTest() throws Exception;
}

Test Base Classes

Class Loading Test Programs

Complete programs for testing dynamic class loading, user code isolation, and class loading policies. Each program serves as a standalone test case for different class loading scenarios.

public class StreamingProgram {
    public static void main(String[] args) throws Exception;
}

public class CheckpointedStreamingProgram {
    public static void main(String[] args) throws Exception;
}

public class KMeansForTest {
    public static void main(String[] args) throws Exception;
    public static class Point { /* 2D point representation */ }
    public static class Centroid extends Point { /* cluster center */ }
}

Class Loading Test Programs

State Management Testing

Framework for testing operator state restoration and migration, including utilities for both keyed and non-keyed state scenarios.

public enum ExecutionMode {
    GENERATE, MIGRATE, RESTORE
}

public abstract class AbstractOperatorRestoreTestBase {
    protected abstract StreamExecutionEnvironment createMigrationJob(String savepointPath) throws Exception;
    protected abstract StreamExecutionEnvironment createRestoredJob(String savepointPath) throws Exception;
    protected abstract String getMigrationSavepointName();
}

public abstract class AbstractKeyedOperatorRestoreTestBase extends AbstractOperatorRestoreTestBase {
    // Specialized testing for keyed state operators
}

public abstract class AbstractNonKeyedOperatorRestoreTestBase extends AbstractOperatorRestoreTestBase {
    // Specialized testing for non-keyed state operators
}

public class KeyedJob {
    public static void main(String[] args) throws Exception;
}

public class NonKeyedJob {
    public static void main(String[] args) throws Exception;
}

State Management Testing

Performance Testing Programs

Manual programs for performance benchmarking, scalability testing, and resource usage validation. These programs are designed for manual execution and analysis.

public class MassiveStringSorting {
    public static void main(String[] args) throws Exception;
}

public class StreamingScalabilityAndLatency {
    public static void main(String[] args) throws Exception;
}

public class ReducePerformance {
    public static void main(String[] args) throws Exception;
}

Performance Testing Programs

Common Types

// Test data POJO
public static class CustomType {
    public int myInt;
    public long myLong;
    public String myString;
    
    public CustomType();
    public CustomType(int i, long l, String s);
}

// Success indication exception
public class SuccessException extends RuntimeException {
    public SuccessException();
}

// Simple integer wrapper for testing
public class IntType {
    public int value;
    public IntType(int value);
}

// Test enum
public enum Category {
    CAT_A, CAT_B
}

Error Handling

The testing framework includes specialized exceptions and error handling patterns:

  • SuccessException: RuntimeException thrown to indicate successful test completion in scenarios where normal completion is not expected
  • Fault injection: Controlled failure injection through FailingSource and related utilities
  • Validation failures: Clear error reporting through ValidatingSink and migration test utilities
  • Timeout handling: Configurable timeouts for long-running test operations

Tests should handle these exceptions appropriately and use the provided error injection mechanisms for fault tolerance testing.