Testing utilities for Apache Flink stream processing framework providing test environments, base classes and sample data
npx @tessl/cli install tessl/maven-org-apache-flink--flink-test-utils_2-10@1.3.0

Flink Test Utils is a comprehensive testing library for Apache Flink applications, providing test environments, base classes, utilities, and sample data for unit and integration testing of both streaming and batch processing applications.
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils_2.10</artifactId>
<version>1.3.3</version>
<scope>test</scope>
</dependency>

import org.apache.flink.test.util.*;
import org.apache.flink.streaming.util.*;
import org.apache.flink.test.testdata.*;

// Basic test setup
public class MyFlinkTest extends JavaProgramTestBase {
@Override
protected void testProgram() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// Your test logic here
DataSet<String> input = env.fromElements("hello", "world");
DataSet<String> result = input.map(s -> s.toUpperCase());
// Collect and verify results
List<String> resultList = result.collect();
TestBaseUtils.compareResultAsText(resultList, "HELLO\nWORLD");
}
}

The library is organized into three main areas:
Provides specialized execution environments for testing Flink applications in controlled environments.
// Batch job testing environment
public class TestEnvironment extends ExecutionEnvironment {
public TestEnvironment(LocalFlinkMiniCluster miniCluster, int parallelism, boolean isObjectReuseEnabled);
public JobExecutionResult execute(String jobName) throws Exception;
public static void setAsContext(LocalFlinkMiniCluster miniCluster, int parallelism);
}
// Streaming job testing environment
public class TestStreamEnvironment extends StreamExecutionEnvironment {
public TestStreamEnvironment(LocalFlinkMiniCluster miniCluster, int parallelism);
public JobExecutionResult execute(String jobName) throws Exception;
public static void setAsContext(LocalFlinkMiniCluster cluster, int parallelism);
}

Abstract base classes that provide standardized testing patterns, cluster lifecycle management, and parameterized testing across multiple execution modes.
// Base class for batch program tests
public abstract class JavaProgramTestBase extends AbstractTestBase {
public JavaProgramTestBase();
protected abstract void testProgram() throws Exception;
public void setParallelism(int parallelism);
}
// Base class for streaming program tests
public abstract class StreamingProgramTestBase extends AbstractTestBase {
protected abstract void testProgram() throws Exception;
public void setParallelism(int parallelism);
}
// Base class for multiple program tests
public class MultipleProgramsTestBase extends TestBaseUtils {
public MultipleProgramsTestBase(TestExecutionMode mode);
}

Comprehensive utilities for cluster management, result comparison, and test data handling, plus specialized testing helpers.
public class TestBaseUtils extends TestLogger {
// Cluster management
public static LocalFlinkMiniCluster startCluster(int numTaskManagers, int taskManagerNumSlots,
boolean startWebserver, boolean startZooKeeper,
boolean singleActorSystem) throws Exception;
// Result validation and comparison
public static void compareResultsByLinesInMemory(String expectedResultStr, String resultPath) throws Exception;
public static void compareResultsByLinesInMemoryWithStrictOrder(String expectedResultStr, String resultPath) throws Exception;
public static <T> void compareResultAsTuples(List<T> result, String expected);
public static <T> void compareResultAsText(List<T> result, String expected);
public static void compareKeyValuePairsWithDelta(String expectedLines, String resultPath, String delimiter, double maxDelta) throws Exception;
public static void checkLinesAgainstRegexp(String resultPath, String regexp);
// File I/O utilities
public static BufferedReader[] getResultReader(String resultPath) throws IOException;
public static void readAllResultLines(List<String> target, String resultPath) throws IOException;
// Utility methods
public static String constructTestPath(Class<?> forClass, String folder);
public static String getFromHTTP(String url) throws Exception;
}
// Specialized testing utilities
public abstract class CheckedThread extends Thread {
public abstract void go() throws Exception;
public void sync() throws Exception;
}
public class RetryRule implements TestRule {
// JUnit rule for automatic test retries
}
public class CommonTestUtils {
public static <T extends Serializable> T createCopySerializable(T original);
public static void setEnv(Map<String, String> newenv);
}

Support for testing Flink applications with security features enabled, including Kerberos authentication via MiniKDC.
public class SecureTestEnvironment {
public static void prepare(TemporaryFolder tempFolder);
public static void cleanup();
public static Configuration populateFlinkSecureConfigurations(Configuration flinkConf);
}
public class TestingSecurityContext {
public static void install(SecurityConfiguration config,
Map<String, ClientSecurityConfiguration> clientSecurityConfigurationMap) throws Exception;
}

Pre-built datasets and validation utilities for common Flink algorithms and testing scenarios.
// PageRank test data
public class PageRankData {
public static final String VERTICES;
public static final String EDGES;
public static final String RANKS_AFTER_3_ITERATIONS;
}
// K-Means clustering test data
public class KMeansData {
public static final String DATAPOINTS;
public static final String INITIAL_CENTERS;
public static void checkResultsWithDelta(String expectedResults, List<String> resultLines, double maxDelta);
}

// Test execution modes
public enum TestExecutionMode {
CLUSTER,
CLUSTER_OBJECT_REUSE,
COLLECTION
}
// Security configuration for testing
public static class ClientSecurityConfiguration {
public String getPrincipal();
public String getKeytab();
public ClientSecurityConfiguration(String principal, String keytab);
}
// Test annotations for retry mechanisms
public @interface RetryOnFailure {
int times();
}
public @interface RetryOnException {
int times();
Class<? extends Throwable> exception();
}
// Local Flink cluster for testing
public class LocalFlinkMiniCluster {
// Mini cluster used by all test environments
}
// Duration utilities
public class FiniteDuration {
// Timeout specifications for cluster operations
}