TestBaseUtils provides comprehensive utilities for Flink testing including cluster management, result comparison, file I/O operations, and test data handling.
public class TestBaseUtils extends TestLogger {
public static FiniteDuration DEFAULT_TIMEOUT;
// Start a mini cluster with full configuration
public static LocalFlinkMiniCluster startCluster(int numTaskManagers, int taskManagerNumSlots,
boolean startWebserver, boolean startZooKeeper,
boolean singleActorSystem) throws Exception;
// Start cluster with configuration object
public static LocalFlinkMiniCluster startCluster(Configuration config, boolean singleActorSystem) throws Exception;
// Stop cluster with timeout
public static void stopCluster(LocalFlinkMiniCluster executor, FiniteDuration timeout) throws Exception;
}Usage Example:
// Start a cluster with 2 task managers, 4 slots each
LocalFlinkMiniCluster cluster = TestBaseUtils.startCluster(2, 4, false, false, true);
try {
// Use cluster for testing
TestEnvironment testEnv = new TestEnvironment(cluster, 4, false);
// ... run tests
} finally {
// Always clean up
TestBaseUtils.stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);
}// Get readers for result files
public static BufferedReader[] getResultReader(String resultPath) throws IOException;
public static BufferedReader[] getResultReader(String resultPath, String[] excludePrefixes,
boolean inOrderOfFiles) throws IOException;
// Get input streams for result files
public static BufferedInputStream[] getResultInputStream(String resultPath) throws IOException;
public static BufferedInputStream[] getResultInputStream(String resultPath, String[] excludePrefixes) throws IOException;
// Read all result lines into a list
public static void readAllResultLines(List<String> target, String resultPath) throws IOException;
public static void readAllResultLines(List<String> target, String resultPath, String[] excludePrefixes) throws IOException;
public static void readAllResultLines(List<String> target, String resultPath, String[] excludePrefixes,
boolean inOrderOfFiles) throws IOException;Usage Example:
// Read results from output directory
List<String> results = new ArrayList<>();
TestBaseUtils.readAllResultLines(results, "/path/to/output");
// Read results excluding log files
String[] excludePatterns = {"_logs", ".log"};
TestBaseUtils.readAllResultLines(results, "/path/to/output", excludePatterns);
// Process results
for (String line : results) {
System.out.println("Result: " + line);
}// Compare results line by line (order doesn't matter)
public static void compareResultsByLinesInMemory(String expectedResultStr, String resultPath) throws Exception;
public static void compareResultsByLinesInMemory(String expectedResultStr, String resultPath,
String[] excludePrefixes) throws Exception;
// Compare results with strict ordering
public static void compareResultsByLinesInMemoryWithStrictOrder(String expectedResultStr, String resultPath) throws Exception;
public static void compareResultsByLinesInMemoryWithStrictOrder(String expectedResultStr, String resultPath,
String[] excludePrefixes) throws Exception;
// Check results against regular expression
public static void checkLinesAgainstRegexp(String resultPath, String regexp);Usage Example:
// Compare unordered results
String expected = "apple\nbanana\ncherry";
TestBaseUtils.compareResultsByLinesInMemory(expected, "/path/to/results");
// Compare with strict ordering
TestBaseUtils.compareResultsByLinesInMemoryWithStrictOrder(expected, "/path/to/results");
// Validate format with regex
TestBaseUtils.checkLinesAgainstRegexp("/path/to/results", "\\d+,\\w+");// Compare collected results as tuples
public static <T> void compareResultAsTuples(List<T> result, String expected);
// Compare collected results as text
public static <T> void compareResultAsText(List<T> result, String expected);
// Compare with strict ordering
public static <T> void compareOrderedResultAsText(List<T> result, String expected);
public static <T> void compareOrderedResultAsText(List<T> result, String expected, boolean asTuples);
// Check if results contain expected values
public static <T> void containsResultAsText(List<T> result, String expected);
// Generic comparison with custom comparator
public static <X> void compareResultCollections(List<X> expected, List<X> actual, Comparator<X> comparator);Usage Example:
// Test word count results
DataSet<Tuple2<String, Integer>> wordCounts = // ... your computation
List<Tuple2<String, Integer>> results = wordCounts.collect();
String expected = "apple,3\nbanana,1\ncherry,2";
TestBaseUtils.compareResultAsTuples(results, expected);
// Test simple string results
DataSet<String> words = // ... your computation
List<String> wordList = words.collect();
String expectedWords = "apple\nbanana\ncherry";
TestBaseUtils.compareResultAsText(wordList, expectedWords);// Compare key-value pairs with delta tolerance
public static void compareKeyValuePairsWithDelta(String expectedLines, String resultPath,
String delimiter, double maxDelta) throws Exception;
public static void compareKeyValuePairsWithDelta(String expectedLines, String resultPath, String[] excludePrefixes,
String delimiter, double maxDelta) throws Exception;Usage Example:
// Compare floating-point results with tolerance
String expected = "pi,3.14159\ne,2.71828";
TestBaseUtils.compareKeyValuePairsWithDelta(expected, "/path/to/results", ",", 0.001);// Convert configurations to parameterized test parameters
protected static Collection<Object[]> toParameterList(Configuration... testConfigs);
protected static Collection<Object[]> toParameterList(List<Configuration> testConfigs);
// Set environment variables for testing
public static void setEnv(Map<String, String> newenv);Usage Example:
// Create parameterized test configurations
Configuration config1 = new Configuration();
config1.setString("key1", "value1");
Configuration config2 = new Configuration();
config2.setString("key2", "value2");
@Parameterized.Parameters
public static Collection<Object[]> getConfigurations() {
return TestBaseUtils.toParameterList(config1, config2);
}
// Set test environment variables
Map<String, String> testEnv = new HashMap<>();
testEnv.put("FLINK_HOME", "/test/flink");
TestBaseUtils.setEnv(testEnv);// Construct test resource paths
public static String constructTestPath(Class<?> forClass, String folder);
public static String constructTestURI(Class<?> forClass, String folder);
// Fetch content from HTTP endpoint
public static String getFromHTTP(String url) throws Exception;Usage Example:
// Get test data path relative to test class
String testDataPath = TestBaseUtils.constructTestPath(MyTest.class, "testdata");
String inputFile = testDataPath + "/input.txt";
// Construct test resource URI
String testDataURI = TestBaseUtils.constructTestURI(MyTest.class, "resources");
// Fetch test data from web endpoint (for integration tests)
String jsonData = TestBaseUtils.getFromHTTP("http://localhost:8080/test-data");public static class TupleComparator<T extends Tuple> implements Comparator<T> {
// Compares tuples field by field for consistent ordering
}Usage Example:
// Sort tuple results for consistent comparison
List<Tuple2<String, Integer>> results = wordCounts.collect();
results.sort(new TestBaseUtils.TupleComparator<>());
// Now compare with expected results
String expected = "apple,1\nbanana,2\ncherry,3";
TestBaseUtils.compareResultAsTuples(results, expected);Thread utility that propagates exceptions from background threads to the main test thread.
/**
* Thread that captures exceptions during execution and makes them available to the calling thread
*/
public abstract class CheckedThread extends Thread {
public CheckedThread();
public CheckedThread(String name);
// Main work method - implement this instead of run()
public abstract void go() throws Exception;
// Join thread and re-throw any exceptions that occurred
public void sync() throws Exception;
}Usage Example:
@Test
public void testConcurrentOperations() throws Exception {
CheckedThread worker = new CheckedThread("test-worker") {
@Override
public void go() throws Exception {
// This code runs in background thread
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<String> result = env.fromElements("test").map(String::toUpperCase);
List<String> output = result.collect();
// Any exceptions thrown here will be propagated to main thread
assertEquals("Unexpected result", "TEST", output.get(0));
}
};
worker.start();
// This will throw any exception that occurred in the background thread
worker.sync();
}JUnit rules and annotations for automatically retrying failed tests.
/**
* JUnit rule that enables automatic test retries based on annotations
*/
public class RetryRule implements TestRule {
public RetryRule();
public Statement apply(Statement base, Description description);
}/**
* Annotation to retry tests that fail for any reason
*/
public @interface RetryOnFailure {
int times(); // Number of retry attempts
}/**
* Annotation to retry tests that fail with specific exception types
*/
public @interface RetryOnException {
int times(); // Number of retry attempts
Class<? extends Throwable> exception(); // Exception type to retry on
}Usage Example:
public class FlakeyTest {
@Rule
public RetryRule retryRule = new RetryRule();
@Test
@RetryOnFailure(times = 3)
public void testUnstableOperation() throws Exception {
// Test that might fail due to timing issues
// Will be retried up to 3 times if it fails
performUnstableOperation();
}
@Test
@RetryOnException(times = 2, exception = ConnectException.class)
public void testNetworkOperation() throws Exception {
// Test that might fail due to network connectivity
// Will be retried up to 2 times if ConnectException occurs
connectToExternalService();
}
}Base class that provides automatic logging for test execution.
/**
* Base test class with automatic test lifecycle logging
*/
public class TestLogger {
protected final Logger log; // Logger instance for test class
@Rule
public TestRule watchman; // Automatic test logging rule
}Usage Example:
public class MyTest extends TestLogger {
@Test
public void testWithLogging() {
// Logging is automatically available
log.info("Starting test execution");
// Test logic here
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// ...
log.info("Test completed successfully");
// Test start/end automatically logged by watchman rule
}
}/**
* Common testing utilities for environment setup and validation
*/
public class CommonTestUtils {
// Environment utilities
public static String getTempDir();
public static void setEnv(Map<String, String> newenv);
public static void setEnv(Map<String, String> newenv, boolean clearPrevious);
// Serialization testing
public static <T extends Serializable> T createCopySerializable(T original);
// File utilities
public static String createTempFile(String contents);
// Threading utilities
public static void blockForeverNonInterruptibly();
// Environment checks
public static void assumeJava8();
// Exception utilities
public static boolean containsCause(Throwable throwable, Class<? extends Throwable> causeType);
}Usage Example:
@Test
public void testSerializationRoundtrip() throws Exception {
MySerializableClass original = new MySerializableClass("test");
// Test that object survives serialization/deserialization
MySerializableClass copy = CommonTestUtils.createCopySerializable(original);
assertEquals("Data should survive serialization", original.getData(), copy.getData());
}
@Test
public void testWithTempEnvironment() throws Exception {
// Set up temporary environment variables
Map<String, String> testEnv = new HashMap<>();
testEnv.put("TEST_MODE", "true");
testEnv.put("LOG_LEVEL", "DEBUG");
CommonTestUtils.setEnv(testEnv);
// Run test with modified environment
// Environment will be restored after test
}public class IntegrationTest {
private LocalFlinkMiniCluster cluster;
@Before
public void setup() throws Exception {
cluster = TestBaseUtils.startCluster(1, 4, false, false, true);
}
@After
public void teardown() throws Exception {
if (cluster != null) {
TestBaseUtils.stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);
}
}
@Test
public void testJobExecution() throws Exception {
TestEnvironment testEnv = new TestEnvironment(cluster, 4, false);
testEnv.setAsContext();
try {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<String> result = env.fromElements("test").map(String::toUpperCase);
List<String> output = result.collect();
TestBaseUtils.compareResultAsText(output, "TEST");
} finally {
TestEnvironment.unsetAsContext();
}
}
}@Test
public void testFileProcessing() throws Exception {
// Write test input
String inputPath = createTempFile("input.txt", "line1\nline2\nline3");
String outputPath = getTempDirPath("output");
// Run job
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.readTextFile(inputPath)
.map(String::toUpperCase)
.writeAsText(outputPath);
env.execute();
// Validate results
String expected = "LINE1\nLINE2\nLINE3";
TestBaseUtils.compareResultsByLinesInMemory(expected, outputPath);
}