CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-apache-flink--flink-test-utils-parent

Comprehensive testing utilities for Apache Flink stream processing framework

Pending
Overview
Eval results
Files

docs/test-environments.md

Test Environments and Data Sources

Test execution environments, data sources, and utilities for creating controlled testing scenarios in Flink applications. These utilities enable comprehensive testing of streaming and batch applications with predictable test data and isolated execution contexts.

Capabilities

Test Environment Management

TestStreamEnvironment

Test-specific stream execution environment that provides controlled execution for testing streaming applications.

/**
 * Test-specific stream execution environment.
 *
 * <p>Extends StreamExecutionEnvironment with test-friendly defaults. This is a
 * declaration-only API sketch; method bodies are omitted.
 *
 * <p>NOTE(review): confirm these factory methods against the Flink version in use;
 * the implementation is not shown in this listing.
 */
class TestStreamEnvironment extends StreamExecutionEnvironment {
    /**
     * Creates a test stream environment with the default parallelism of 1.
     *
     * @return a new test stream environment
     */
    static TestStreamEnvironment createTestEnvironment();
    
    /**
     * Creates a test stream environment with the given parallelism.
     *
     * @param parallelism parallelism to configure on the environment
     * @return a new test stream environment
     */
    static TestStreamEnvironment createTestEnvironment(int parallelism);
}

MultipleProgramsTestBase

Base classes for tests that need to run multiple Flink programs in sequence.

/**
 * Base class for tests that run multiple Flink programs.
 *
 * <p>Provides a clean execution environment for each program. For JUnit 4 tests use
 * MultipleProgramsTestBaseJUnit4, which exposes the same accessors.
 */
abstract class MultipleProgramsTestBase extends AbstractTestBase {
    /**
     * Returns the batch (DataSet API) execution environment for the current test.
     *
     * @return the execution environment to build the test program on
     */
    ExecutionEnvironment getExecutionEnvironment();
    
    /**
     * Returns the streaming execution environment for the current test.
     *
     * @return the stream execution environment to build the test program on
     */
    StreamExecutionEnvironment getStreamExecutionEnvironment();
}

/**
 * JUnit 4 variant of the multiple-programs test base.
 *
 * <p>Extends AbstractTestBaseJUnit4 and exposes the same environment accessors as
 * MultipleProgramsTestBase.
 */
abstract class MultipleProgramsTestBaseJUnit4 extends AbstractTestBaseJUnit4 {
    /** Returns the batch (DataSet API) execution environment for the current test. */
    ExecutionEnvironment getExecutionEnvironment();
    /** Returns the streaming execution environment for the current test. */
    StreamExecutionEnvironment getStreamExecutionEnvironment();
}

Usage Examples:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.apache.flink.test.util.MultipleProgramsTestBase;
import org.junit.jupiter.api.Test;

// Using TestStreamEnvironment
TestStreamEnvironment env = TestStreamEnvironment.createTestEnvironment();
DataStream<String> source = env.fromElements("hello", "world");
// Configure and execute test pipeline

// Using MultipleProgramsTestBase (JUnit 5 style; for JUnit 4 extend
// MultipleProgramsTestBaseJUnit4 and use org.junit.Test instead)
public class MyIntegrationTest extends MultipleProgramsTestBase {
    @Test
    public void testProgram1() throws Exception { // env.execute() throws Exception
        StreamExecutionEnvironment env = getStreamExecutionEnvironment();
        // Test first program
    }
    
    @Test
    public void testProgram2() throws Exception {
        StreamExecutionEnvironment env = getStreamExecutionEnvironment();
        // Test second program with fresh environment
    }
}

Test Data Sources

FiniteTestSource

Finite data source for streaming tests that emits a predetermined set of elements.

/**
 * Finite test data source for streaming tests that emits a predetermined set of elements.
 *
 * <p>Implements SourceFunction and CheckpointListener so that tests can exercise the
 * full source lifecycle, including checkpoint notifications. This is a declaration-only
 * API sketch.
 *
 * @param <T> type of the emitted elements
 */
class FiniteTestSource<T> implements SourceFunction<T>, CheckpointListener {
    /**
     * Creates a source that emits the given elements.
     *
     * @param elements elements to emit
     */
    FiniteTestSource(Collection<T> elements);
    
    /**
     * Creates a source that emits the given elements.
     *
     * <p>Generic varargs: call sites may see an unchecked "possible heap pollution" warning.
     *
     * @param elements elements to emit
     */
    FiniteTestSource(T... elements);
    
    /**
     * Creates a source that waits between emitting consecutive elements.
     *
     * @param elements elements to emit
     * @param delayBetweenElements pause between elements, in milliseconds (per the usage example)
     */
    FiniteTestSource(Collection<T> elements, long delayBetweenElements);
    
    /** Emits the configured elements into {@code ctx}. */
    void run(SourceContext<T> ctx) throws Exception;
    /** SourceFunction cancellation hook; requests that {@link #run} stop emitting. */
    void cancel();
    /**
     * CheckpointListener callback invoked when checkpoint {@code checkpointId} completes.
     * NOTE(review): how completed checkpoints affect emission is not shown here — confirm.
     */
    void notifyCheckpointComplete(long checkpointId);
}

Usage Examples:

import java.util.Arrays;
import java.util.List;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.util.FiniteTestSource;

// Simple test source (env is a StreamExecutionEnvironment, e.g. a TestStreamEnvironment)
FiniteTestSource<Integer> source = new FiniteTestSource<>(1, 2, 3, 4, 5);
DataStream<Integer> stream = env.addSource(source);

// Source with delay between elements
List<String> testData = Arrays.asList("a", "b", "c");
FiniteTestSource<String> delayedSource =
    new FiniteTestSource<>(testData, 100L); // 100 ms between elements (long parameter)
DataStream<String> delayedStream = env.addSource(delayedSource);

Test Data Sinks

TestListResultSink

Sink that collects streaming results in a list for easy verification in tests.

/**
 * Sink that collects streaming results in a list for test verification.
 *
 * <p>NOTE(review): Flink serializes sink functions when a job is submitted, so a list
 * passed to the constructor may not observe writes made by the running job; read
 * results through {@link #getResults()} — confirm against the implementation.
 *
 * @param <T> type of the collected elements
 */
class TestListResultSink<T> extends RichSinkFunction<T> {
    /**
     * Creates a sink backed by the given result collection.
     *
     * @param resultList list the sink is created with
     */
    TestListResultSink(List<T> resultList);
    
    /**
     * Creates a sink whose result collection is thread-safe.
     *
     * @return a new sink
     */
    static <T> TestListResultSink<T> createThreadSafe();
    
    /** Records {@code value}; called once per incoming element. */
    void invoke(T value, Context context);
    
    /**
     * Returns the collected results (documented as thread-safe).
     *
     * @return the elements received so far
     */
    List<T> getResults();
    
    /** Clears the collected results. */
    void clear();
}

Usage Examples:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.test.streaming.runtime.util.TestListResultSink;

// Collect results for verification
List<String> results = new ArrayList<>();
TestListResultSink<String> sink = new TestListResultSink<>(results);

DataStream<String> stream = env.fromElements("hello", "world");
stream.addSink(sink);
env.execute();

// Verify through the sink, not the local list: Flink serializes sink functions on job
// submission, so `results` itself may never see the job's writes. Sort a copy because
// arrival order is not guaranteed once the sink runs with parallelism > 1.
List<String> collected = new ArrayList<>(sink.getResults());
Collections.sort(collected);
assertEquals(Arrays.asList("hello", "world"), collected);

// Thread-safe version: attach it to a stream whose element type matches the sink
TestListResultSink<Integer> threadSafeSink = TestListResultSink.createThreadSafe();
DataStream<Integer> numbers = env.fromElements(1, 2, 3);
numbers.addSink(threadSafeSink);
env.execute();
List<Integer> safeResults = threadSafeSink.getResults();

Upsert Testing Framework

Specialized testing framework for upsert operations in streaming applications.

UpsertTestSink

Test sink specifically designed for testing upsert behavior in streaming applications.

/**
 * Test sink for upsert operations.
 *
 * <p>Tracks inserts, updates, and deletes separately. Obtain instances from
 * UpsertTestSinkBuilder#build(). This is a declaration-only API sketch.
 *
 * @param <IN> type of the records written to the sink
 */
class UpsertTestSink<IN> implements Sink<IN> {
    /**
     * Returns every received record together with its operation type.
     *
     * @return all received records as UpsertRecord entries
     */
    List<UpsertRecord<IN>> getUpsertResults();
    
    /** Returns only the records received as inserts. */
    List<IN> getInsertResults();
    
    /** Returns only the records received as updates. */
    List<IN> getUpdateResults();
    
    /** Returns only the records received as deletes. */
    List<IN> getDeleteResults();
    
    /**
     * Clears all collected results.
     * NOTE(review): ordering of records in the getters above is not specified here.
     */
    void clearResults();
}

/**
 * Builder for UpsertTestSink.
 *
 * @param <IN> type of the records the built sink accepts
 */
class UpsertTestSinkBuilder<IN> {
    /**
     * Sets the fields that form the upsert key.
     *
     * @param keyFields key field names (the usage example passes "id")
     * @return the builder, for call chaining
     */
    UpsertTestSinkBuilder<IN> setKeyFields(String... keyFields);
    
    /**
     * Enables changelog-mode tracking.
     *
     * @return the builder, for call chaining
     */
    UpsertTestSinkBuilder<IN> enableChangelogMode();
    
    /**
     * Builds the upsert test sink from the configured settings.
     *
     * @return a new UpsertTestSink
     */
    UpsertTestSink<IN> build();
}

/**
 * A record received by the upsert sink, paired with its operation type.
 *
 * @param <T> type of the wrapped record
 */
class UpsertRecord<T> {
    /** Returns the wrapped record. */
    T getRecord();
    UpsertOperation getOperation(); // one of INSERT, UPDATE, DELETE
    /** NOTE(review): unit and origin of this timestamp are not shown here — confirm. */
    long getTimestamp();
}

Usage Examples:

import java.util.List;

import org.apache.flink.connector.upserttest.sink.UpsertTestSink;
import org.apache.flink.connector.upserttest.sink.UpsertTestSinkBuilder;
import org.apache.flink.streaming.api.datastream.DataStream;

// Create upsert test sink
UpsertTestSink<MyRecord> upsertSink = new UpsertTestSinkBuilder<MyRecord>()
    .setKeyFields("id")
    .enableChangelogMode()
    .build();

// Use in streaming pipeline. createChangelogStream is a placeholder for however your
// test builds its upsert stream.
DataStream<MyRecord> changelogStream = createChangelogStream(env);
changelogStream.sinkTo(upsertSink);
env.execute();

// Verify upsert behavior (the expected counts depend on your test input)
List<MyRecord> inserts = upsertSink.getInsertResults();
List<MyRecord> updates = upsertSink.getUpdateResults();
List<UpsertRecord<MyRecord>> allChanges = upsertSink.getUpsertResults();

assertEquals(3, inserts.size());
assertEquals(2, updates.size());

Test Data Generators

Pre-built test datasets for common algorithms and testing scenarios.

Algorithm Test Data

/**
 * Test data for the connected components algorithm.
 */
class ConnectedComponentsData {
    /**
     * Returns the default vertex data for testing.
     * NOTE(review): presumably (vertexId, initialComponentId) pairs — confirm.
     */
    static DataSet<Tuple2<Long, Long>> getDefaultVertexDataSet(ExecutionEnvironment env);
    
    /**
     * Returns the default edge data for testing.
     * NOTE(review): presumably (sourceVertexId, targetVertexId) pairs — confirm.
     */
    static DataSet<Tuple2<Long, Long>> getDefaultEdgeDataSet(ExecutionEnvironment env);
    
    /** Returns the expected result for the default dataset, as a String (format not shown here). */
    static String getExpectedResult();
}

/**
 * Test data for word count examples.
 */
class WordCountData {
    /**
     * Returns the default text lines for word count testing.
     *
     * @param env environment the DataSet is created in
     * @return one element per line of input text
     */
    static DataSet<String> getDefaultTextLineDataSet(ExecutionEnvironment env);
    
    /** Returns the expected word count results as a String (format not shown here). */
    static String getExpectedResult();
}

/**
 * Test data for K-means clustering.
 */
class KMeansData {
    /**
     * Returns the default points to cluster.
     *
     * @param env environment the DataSet is created in
     */
    static DataSet<Point> getDefaultPointDataSet(ExecutionEnvironment env);
    
    /**
     * Returns the default initial centroids.
     *
     * @param env environment the DataSet is created in
     */
    static DataSet<Centroid> getDefaultCentroidDataSet(ExecutionEnvironment env);
    
    /** Returns the expected clustering results as a String (format not shown here). */
    static String getExpectedResult();
}

/**
 * Test data for the PageRank algorithm.
 */
class PageRankData {
    /**
     * Returns the default vertices for PageRank testing.
     * NOTE(review): presumably (vertexId, initialRank) pairs — confirm.
     */
    static DataSet<Tuple2<Long, Double>> getDefaultVerticesDataSet(ExecutionEnvironment env);
    
    /**
     * Returns the default edges for PageRank testing.
     * NOTE(review): presumably (sourceVertexId, targetVertexId) pairs — confirm.
     */
    static DataSet<Tuple2<Long, Long>> getDefaultEdgesDataSet(ExecutionEnvironment env);
    
    /** Returns the expected PageRank results as a String (format not shown here). */
    static String getExpectedResult();
}

Usage Examples:

import org.apache.flink.test.testdata.WordCountData;
import org.apache.flink.test.testdata.ConnectedComponentsData;

// Word count test
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<String> text = WordCountData.getDefaultTextLineDataSet(env);
// Run word count algorithm
String result = // ... algorithm result
assertEquals(WordCountData.getExpectedResult(), result);

// Connected components test  
DataSet<Tuple2<Long, Long>> vertices = 
    ConnectedComponentsData.getDefaultVertexDataSet(env);
DataSet<Tuple2<Long, Long>> edges = 
    ConnectedComponentsData.getDefaultEdgeDataSet(env);
// Run connected components algorithm

File and Process Utilities

TestBaseUtils, FileUtils, and TestProcessBuilder

Utilities for file operations and process management in tests.

/**
 * Base utilities for Flink tests.
 */
class TestBaseUtils {
    /**
     * Creates a temporary directory for test data.
     *
     * @return the directory location as a String (presumably a filesystem path — confirm)
     */
    static String createTempDirectory();
    
    /** Cleans up test resources. NOTE(review): which resources are released is not shown here. */
    static void cleanup();
    
    /**
     * Compares result output with the expected output, line by line.
     *
     * <p>NOTE(review): the original summary says this compares result *files*, yet the
     * second parameter is named actualResult; confirm whether it expects the actual
     * output text or a path to result files before relying on it.
     *
     * @param expectedResult expected output
     * @param actualResult actual output (or result location — see note above)
     */
    static void compareResultsByLinesInMemory(String expectedResult, String actualResult);
}

/**
 * File utilities for testing.
 *
 * <p>NOTE(review): this listing does not say whether Path is java.nio.file.Path or
 * Flink's org.apache.flink.core.fs.Path — confirm before use.
 */
class FileUtils {
    /**
     * Writes {@code content} to a new temporary file.
     *
     * @param content text to write
     * @param suffix file name suffix, e.g. ".txt"
     * @return the created file; NOTE(review): deletion responsibility not shown here
     */
    static Path writeToTempFile(String content, String suffix);
    
    /** Reads the entire file as a single String. */
    static String readFileAsString(Path file);
    
    /**
     * Creates a temporary directory.
     *
     * @param prefix directory name prefix
     */
    static Path createTempDirectory(String prefix);
    
    /** Deletes {@code directory} and everything beneath it. */
    static void deleteDirectoryRecursively(Path directory);
}

/**
 * Builds and runs test processes.
 */
class TestProcessBuilder {
    /**
     * Creates a process builder for a Java application.
     *
     * @param mainClass class whose main method the process runs
     */
    static TestProcessBuilder createJavaProcess(Class<?> mainClass);
    
    /** Adds JVM arguments (e.g. "-Xmx1g"); returns the builder for chaining. */
    TestProcessBuilder addJvmArgs(String... args);
    
    /** Adds program arguments; returns the builder for chaining. */
    TestProcessBuilder addArgs(String... args);
    
    /**
     * Starts the process and waits for it to complete.
     *
     * <p>Unlike java.lang.ProcessBuilder#start, this is documented to block until the
     * process finishes; the returned ProcessResult exposes getExitCode().
     */
    ProcessResult start();
}

Usage Examples:

import org.apache.flink.test.util.FileUtils;
import org.apache.flink.test.util.TestProcessBuilder;

// Round-trip a small payload through a temporary file
Path scratchFile = FileUtils.writeToTempFile("test data", ".txt");
assertEquals("test data", FileUtils.readFileAsString(scratchFile));

// Launch MyFlinkJob as a separate process and expect a clean exit
ProcessResult outcome =
    TestProcessBuilder.createJavaProcess(MyFlinkJob.class)
        .addJvmArgs("-Xmx1g")
        .addArgs("--input", "test-input.txt")
        .start();
assertEquals(0, outcome.getExitCode());

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-test-utils-parent

docs

client-testing.md

connector-testing.md

core-testing.md

index.md

migration-testing.md

table-testing.md

test-environments.md

tile.json