or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

index.mdsecurity-testing.mdtest-base-classes.mdtest-data.mdtest-environments.mdtest-utilities.md
tile.json

test-utilities.mddocs/

Test Utilities

TestBaseUtils provides comprehensive utilities for Flink testing including cluster management, result comparison, file I/O operations, and test data handling.

Cluster Management

Starting and Stopping Clusters

public class TestBaseUtils extends TestLogger {
    public static FiniteDuration DEFAULT_TIMEOUT;
    
    // Start a mini cluster with full configuration
    public static LocalFlinkMiniCluster startCluster(int numTaskManagers, int taskManagerNumSlots, 
                                                     boolean startWebserver, boolean startZooKeeper, 
                                                     boolean singleActorSystem) throws Exception;
    
    // Start cluster with configuration object
    public static LocalFlinkMiniCluster startCluster(Configuration config, boolean singleActorSystem) throws Exception;
    
    // Stop cluster with timeout
    public static void stopCluster(LocalFlinkMiniCluster executor, FiniteDuration timeout) throws Exception;
}

Usage Example:

// Start a cluster with 2 task managers, 4 slots each
LocalFlinkMiniCluster cluster = TestBaseUtils.startCluster(2, 4, false, false, true);

try {
    // Use cluster for testing
    TestEnvironment testEnv = new TestEnvironment(cluster, 4, false);
    // ... run tests
} finally {
    // Always clean up
    TestBaseUtils.stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);
}

Result Reading and File I/O

Reading Test Results

// Get readers for result files
public static BufferedReader[] getResultReader(String resultPath) throws IOException;
public static BufferedReader[] getResultReader(String resultPath, String[] excludePrefixes, 
                                             boolean inOrderOfFiles) throws IOException;

// Get input streams for result files  
public static BufferedInputStream[] getResultInputStream(String resultPath) throws IOException;
public static BufferedInputStream[] getResultInputStream(String resultPath, String[] excludePrefixes) throws IOException;

// Read all result lines into a list
public static void readAllResultLines(List<String> target, String resultPath) throws IOException;
public static void readAllResultLines(List<String> target, String resultPath, String[] excludePrefixes) throws IOException;
public static void readAllResultLines(List<String> target, String resultPath, String[] excludePrefixes, 
                                    boolean inOrderOfFiles) throws IOException;

Usage Example:

// Read results from output directory
List<String> results = new ArrayList<>();
TestBaseUtils.readAllResultLines(results, "/path/to/output");

// Read results excluding log files
String[] excludePatterns = {"_logs", ".log"};
TestBaseUtils.readAllResultLines(results, "/path/to/output", excludePatterns);

// Process results
for (String line : results) {
    System.out.println("Result: " + line);
}

Result Comparison and Validation

Text-based Result Comparison

// Compare results line by line (order doesn't matter)
public static void compareResultsByLinesInMemory(String expectedResultStr, String resultPath) throws Exception;
public static void compareResultsByLinesInMemory(String expectedResultStr, String resultPath, 
                                               String[] excludePrefixes) throws Exception;

// Compare results with strict ordering
public static void compareResultsByLinesInMemoryWithStrictOrder(String expectedResultStr, String resultPath) throws Exception;
public static void compareResultsByLinesInMemoryWithStrictOrder(String expectedResultStr, String resultPath, 
                                                               String[] excludePrefixes) throws Exception;

// Check results against regular expression
public static void checkLinesAgainstRegexp(String resultPath, String regexp);

Usage Example:

// Compare unordered results
String expected = "apple\nbanana\ncherry";
TestBaseUtils.compareResultsByLinesInMemory(expected, "/path/to/results");

// Compare with strict ordering
TestBaseUtils.compareResultsByLinesInMemoryWithStrictOrder(expected, "/path/to/results");

// Validate format with regex
TestBaseUtils.checkLinesAgainstRegexp("/path/to/results", "\\d+,\\w+");

Collection-based Result Comparison

// Compare collected results as tuples
public static <T> void compareResultAsTuples(List<T> result, String expected);

// Compare collected results as text
public static <T> void compareResultAsText(List<T> result, String expected);

// Compare with strict ordering
public static <T> void compareOrderedResultAsText(List<T> result, String expected);
public static <T> void compareOrderedResultAsText(List<T> result, String expected, boolean asTuples);

// Check if results contain expected values
public static <T> void containsResultAsText(List<T> result, String expected);

// Generic comparison with custom comparator
public static <X> void compareResultCollections(List<X> expected, List<X> actual, Comparator<X> comparator);

Usage Example:

// Test word count results
DataSet<Tuple2<String, Integer>> wordCounts = // ... your computation
List<Tuple2<String, Integer>> results = wordCounts.collect();

String expected = "apple,3\nbanana,1\ncherry,2";
TestBaseUtils.compareResultAsTuples(results, expected);

// Test simple string results
DataSet<String> words = // ... your computation  
List<String> wordList = words.collect();

String expectedWords = "apple\nbanana\ncherry";
TestBaseUtils.compareResultAsText(wordList, expectedWords);

Numerical Result Comparison

// Compare key-value pairs with delta tolerance
public static void compareKeyValuePairsWithDelta(String expectedLines, String resultPath, 
                                               String delimiter, double maxDelta) throws Exception;
public static void compareKeyValuePairsWithDelta(String expectedLines, String resultPath, String[] excludePrefixes,
                                               String delimiter, double maxDelta) throws Exception;

Usage Example:

// Compare floating-point results with tolerance
String expected = "pi,3.14159\ne,2.71828";
TestBaseUtils.compareKeyValuePairsWithDelta(expected, "/path/to/results", ",", 0.001);

Utility Methods

Test Configuration

// Convert configurations to parameterized test parameters
protected static Collection<Object[]> toParameterList(Configuration... testConfigs);
protected static Collection<Object[]> toParameterList(List<Configuration> testConfigs);

// Set environment variables for testing
public static void setEnv(Map<String, String> newenv);

Usage Example:

// Create parameterized test configurations
Configuration config1 = new Configuration();
config1.setString("key1", "value1");

Configuration config2 = new Configuration();  
config2.setString("key2", "value2");

@Parameterized.Parameters
public static Collection<Object[]> getConfigurations() {
    return TestBaseUtils.toParameterList(config1, config2);
}

// Set test environment variables
Map<String, String> testEnv = new HashMap<>();
testEnv.put("FLINK_HOME", "/test/flink");
TestBaseUtils.setEnv(testEnv);

Path and URL Utilities

// Construct test resource paths
public static String constructTestPath(Class<?> forClass, String folder);
public static String constructTestURI(Class<?> forClass, String folder);

// Fetch content from HTTP endpoint
public static String getFromHTTP(String url) throws Exception;

Usage Example:

// Get test data path relative to test class
String testDataPath = TestBaseUtils.constructTestPath(MyTest.class, "testdata");
String inputFile = testDataPath + "/input.txt";

// Construct test resource URI
String testDataURI = TestBaseUtils.constructTestURI(MyTest.class, "resources");

// Fetch test data from web endpoint (for integration tests)
String jsonData = TestBaseUtils.getFromHTTP("http://localhost:8080/test-data");

Helper Classes

TupleComparator

public static class TupleComparator<T extends Tuple> implements Comparator<T> {
    // Compares tuples field by field for consistent ordering
}

Usage Example:

// Sort tuple results for consistent comparison
List<Tuple2<String, Integer>> results = wordCounts.collect();
results.sort(new TestBaseUtils.TupleComparator<>());

// Now compare with expected results
String expected = "apple,1\nbanana,2\ncherry,3";
TestBaseUtils.compareResultAsTuples(results, expected);

Testing Utilities

CheckedThread

Thread utility that propagates exceptions from background threads to the main test thread.

/**
 * Thread that captures exceptions during execution and makes them available to the calling thread
 */
public abstract class CheckedThread extends Thread {
    public CheckedThread();
    public CheckedThread(String name);
    
    // Main work method - implement this instead of run()
    public abstract void go() throws Exception;
    
    // Join thread and re-throw any exceptions that occurred
    public void sync() throws Exception;
}

Usage Example:

@Test
public void testConcurrentOperations() throws Exception {
    CheckedThread worker = new CheckedThread("test-worker") {
        @Override
        public void go() throws Exception {
            // This code runs in background thread
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            DataSet<String> result = env.fromElements("test").map(String::toUpperCase);
            List<String> output = result.collect();
            
            // Any exceptions thrown here will be propagated to main thread
            assertEquals("Unexpected result", "TEST", output.get(0));
        }
    };
    
    worker.start();
    
    // This will throw any exception that occurred in the background thread
    worker.sync();
}

Retry Mechanisms

JUnit rules and annotations for automatically retrying failed tests.

RetryRule

/**
 * JUnit rule that enables automatic test retries based on annotations
 */
public class RetryRule implements TestRule {
    public RetryRule();
    public Statement apply(Statement base, Description description);
}

RetryOnFailure

/**
 * Annotation to retry tests that fail for any reason
 */
public @interface RetryOnFailure {
    int times(); // Number of retry attempts
}

RetryOnException

/**
 * Annotation to retry tests that fail with specific exception types
 */
public @interface RetryOnException {
    int times(); // Number of retry attempts
    Class<? extends Throwable> exception(); // Exception type to retry on
}

Usage Example:

public class FlakeyTest {
    @Rule
    public RetryRule retryRule = new RetryRule();
    
    @Test
    @RetryOnFailure(times = 3)
    public void testUnstableOperation() throws Exception {
        // Test that might fail due to timing issues
        // Will be retried up to 3 times if it fails
        performUnstableOperation();
    }
    
    @Test
    @RetryOnException(times = 2, exception = ConnectException.class)
    public void testNetworkOperation() throws Exception {
        // Test that might fail due to network connectivity
        // Will be retried up to 2 times if ConnectException occurs
        connectToExternalService();
    }
}

TestLogger

Base class that provides automatic logging for test execution.

/**
 * Base test class with automatic test lifecycle logging
 */
public class TestLogger {
    protected final Logger log; // Logger instance for test class
    
    @Rule
    public TestRule watchman; // Automatic test logging rule
}

Usage Example:

public class MyTest extends TestLogger {
    @Test
    public void testWithLogging() {
        // Logging is automatically available
        log.info("Starting test execution");
        
        // Test logic here
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // ...
        
        log.info("Test completed successfully");
        // Test start/end automatically logged by watchman rule
    }
}

Common Test Utilities

/**
 * Common testing utilities for environment setup and validation
 */
public class CommonTestUtils {
    // Environment utilities
    public static String getTempDir();
    public static void setEnv(Map<String, String> newenv);
    public static void setEnv(Map<String, String> newenv, boolean clearPrevious);
    
    // Serialization testing
    public static <T extends Serializable> T createCopySerializable(T original);
    
    // File utilities
    public static String createTempFile(String contents);
    
    // Threading utilities  
    public static void blockForeverNonInterruptibly();
    
    // Environment checks
    public static void assumeJava8();
    
    // Exception utilities
    public static boolean containsCause(Throwable throwable, Class<? extends Throwable> causeType);
}

Usage Example:

@Test
public void testSerializationRoundtrip() throws Exception {
    MySerializableClass original = new MySerializableClass("test");
    
    // Test that object survives serialization/deserialization
    MySerializableClass copy = CommonTestUtils.createCopySerializable(original);
    assertEquals("Data should survive serialization", original.getData(), copy.getData());
}

@Test  
public void testWithTempEnvironment() throws Exception {
    // Set up temporary environment variables
    Map<String, String> testEnv = new HashMap<>();
    testEnv.put("TEST_MODE", "true");
    testEnv.put("LOG_LEVEL", "DEBUG");
    
    CommonTestUtils.setEnv(testEnv);
    
    // Run test with modified environment
    // Environment will be restored after test
}

Common Testing Patterns

Complete Test Setup

public class IntegrationTest {
    private LocalFlinkMiniCluster cluster;
    
    @Before
    public void setup() throws Exception {
        cluster = TestBaseUtils.startCluster(1, 4, false, false, true);
    }
    
    @After
    public void teardown() throws Exception {
        if (cluster != null) {
            TestBaseUtils.stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);
        }
    }
    
    @Test
    public void testJobExecution() throws Exception {
        TestEnvironment testEnv = new TestEnvironment(cluster, 4, false);
        testEnv.setAsContext();
        
        try {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            DataSet<String> result = env.fromElements("test").map(String::toUpperCase);
            List<String> output = result.collect();
            
            TestBaseUtils.compareResultAsText(output, "TEST");
        } finally {
            TestEnvironment.unsetAsContext();
        }
    }
}

File-based Testing

@Test
public void testFileProcessing() throws Exception {
    // Write test input
    String inputPath = createTempFile("input.txt", "line1\nline2\nline3");
    String outputPath = getTempDirPath("output");
    
    // Run job
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.readTextFile(inputPath)
       .map(String::toUpperCase)
       .writeAsText(outputPath);
    env.execute();
    
    // Validate results
    String expected = "LINE1\nLINE2\nLINE3";
    TestBaseUtils.compareResultsByLinesInMemory(expected, outputPath);
}