Test environment utilities for setting up and managing Flink execution environments in tests, including both streaming and batch processing environments with MiniCluster integration.
TestStreamEnvironment provides a streaming execution environment that runs jobs on a MiniCluster for testing purposes.
/**
* StreamExecutionEnvironment that executes jobs on MiniCluster for testing
*/
public class TestStreamEnvironment extends StreamExecutionEnvironment {
/**
* Create test streaming environment with full configuration
* @param cluster The MiniCluster to execute on
* @param configuration Flink configuration
* @param parallelism Default parallelism
* @param jarFiles JAR files to include in classpath
* @param classPaths Additional classpaths
*/
public TestStreamEnvironment(
MiniCluster cluster,
Configuration configuration,
int parallelism,
Collection<Path> jarFiles,
Collection<URL> classPaths
);
/**
* Create test streaming environment with simplified configuration
* @param cluster The MiniCluster to execute on
* @param parallelism Default parallelism
*/
public TestStreamEnvironment(MiniCluster cluster, int parallelism);
/**
* Set as the context environment with full configuration
* @param cluster The MiniCluster to execute on
* @param parallelism Default parallelism
* @param jarFiles JAR files to include in classpath
* @param classPaths Additional classpaths
*/
public static void setAsContext(
MiniCluster cluster,
int parallelism,
Collection<Path> jarFiles,
Collection<URL> classPaths
);
/**
* Set as the context environment with simplified configuration
* @param cluster The MiniCluster to execute on
* @param parallelism Default parallelism
*/
public static void setAsContext(MiniCluster cluster, int parallelism);
/**
* Reset the context environment
*/
public static void unsetAsContext();
}

Usage Example:
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.apache.flink.runtime.minicluster.MiniCluster;
@Test
public void testStreamingJob() throws Exception {
MiniCluster cluster = // ... create cluster
// Set up test streaming environment
TestStreamEnvironment.setAsContext(cluster, 4);
try {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Your streaming job logic here
DataStream<String> input = env.fromElements("test", "data");
DataStream<String> result = input.map(String::toUpperCase);
result.print();
env.execute("Test Streaming Job");
} finally {
TestStreamEnvironment.unsetAsContext();
}
}

TestEnvironment provides a batch execution environment that runs jobs on a MiniCluster for testing purposes.
/**
* ExecutionEnvironment implementation that executes jobs on MiniCluster
*/
public class TestEnvironment extends ExecutionEnvironment {
/**
* Create test environment with full configuration
* @param cluster The MiniCluster to execute on
* @param parallelism Default parallelism
* @param objectReuse Whether to enable object reuse
* @param jarFiles JAR files to include in classpath
* @param classPaths Additional classpaths
*/
public TestEnvironment(
MiniCluster cluster,
int parallelism,
boolean objectReuse,
Collection<Path> jarFiles,
Collection<URL> classPaths
);
/**
* Create test environment with simplified configuration
* @param cluster The MiniCluster to execute on
* @param parallelism Default parallelism
* @param objectReuse Whether to enable object reuse
*/
public TestEnvironment(MiniCluster cluster, int parallelism, boolean objectReuse);
/**
* Get the result of the last job execution
* @return JobExecutionResult of the last executed job
*/
public JobExecutionResult getLastJobExecutionResult();
/**
* Set this environment as the context environment
*/
public void setAsContext();
/**
* Set as the context environment with full configuration
* @param cluster The MiniCluster to execute on
* @param parallelism Default parallelism
* @param jarFiles JAR files to include in classpath
* @param classPaths Additional classpaths
*/
public static void setAsContext(
MiniCluster cluster,
int parallelism,
Collection<Path> jarFiles,
Collection<URL> classPaths
);
/**
* Set as the context environment with simplified configuration
* @param cluster The MiniCluster to execute on
* @param parallelism Default parallelism
*/
public static void setAsContext(MiniCluster cluster, int parallelism);
/**
* Reset the context environment
*/
public static void unsetAsContext();
}

MiniClusterWithClientResource provides JUnit integration for managing MiniCluster lifecycle in tests.
/**
* Starts Flink mini cluster and registers execution environments as JUnit rule
*/
public class MiniClusterWithClientResource extends MiniClusterResource {
/**
* Create cluster resource with configuration
* @param configuration MiniCluster configuration
*/
public MiniClusterWithClientResource(MiniClusterResourceConfiguration configuration);
/**
* Get cluster client for job submission
* @return ClusterClient for interacting with cluster
*/
public ClusterClient<?> getClusterClient();
/**
* Get REST cluster client
* @return RestClusterClient for REST API access
*/
public RestClusterClient<?> getRestClusterClient();
/**
* Get test environment configured for this cluster
* @return TestEnvironment configured for this cluster
*/
public TestEnvironment getTestEnvironment();
}

Usage Example:
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
public class MyFlinkTest {
@ClassRule
public static MiniClusterWithClientResource flinkCluster =
new MiniClusterWithClientResource(
new MiniClusterResourceConfiguration.Builder()
.setNumberSlotsPerTaskManager(2)
.setNumberTaskManagers(1)
.build());
@Test
public void testBatchJob() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// Your batch job logic here
DataSet<String> input = env.fromElements("hello", "world");
DataSet<String> result = input.map(String::toUpperCase);
List<String> output = result.collect();
// Validate results
assertEquals(Arrays.asList("HELLO", "WORLD"), output);
}
}

MiniClusterPipelineExecutorServiceLoader provides pipeline execution service integration for MiniCluster.
/**
* Pipeline executor service loader for MiniCluster execution
*/
public class MiniClusterPipelineExecutorServiceLoader implements PipelineExecutorServiceLoader {
public static final String NAME = "minicluster";
/**
* Create executor service loader for given MiniCluster
* @param miniCluster The MiniCluster to execute on
*/
public MiniClusterPipelineExecutorServiceLoader(MiniCluster miniCluster);
/**
* Update configuration for MiniCluster execution
* @param configuration Configuration to update
* @param jarFiles JAR files to include
* @param classpaths Additional classpaths
* @return Updated configuration
*/
public static Configuration updateConfigurationForMiniCluster(
Configuration configuration,
Collection<Path> jarFiles,
Collection<URL> classpaths
);
/**
* Get executor factory for configuration
* @param configuration Flink configuration
* @return PipelineExecutorFactory for the configuration
*/
public PipelineExecutorFactory getExecutorFactory(Configuration configuration);
/**
* Get supported executor names
* @return Stream of executor names
*/
public Stream<String> getExecutorNames();
}

TestBaseUtils provides a comprehensive collection of utility methods for testing Flink applications, including result comparison, file operations, and configuration management.
/**
* Utility class with various methods for testing purposes
*/
public class TestBaseUtils {
// Configuration constants
protected static final int MINIMUM_HEAP_SIZE_MB = 192;
protected static final String TASK_MANAGER_MEMORY_SIZE = "80m";
protected static final long DEFAULT_AKKA_ASK_TIMEOUT = 1000;
protected static final String DEFAULT_AKKA_STARTUP_TIMEOUT = "60 s";
public static final FiniteDuration DEFAULT_TIMEOUT;
public static final Time DEFAULT_HTTP_TIMEOUT = Time.seconds(10L);
// Result reading methods
/**
* Get result readers for result files
* @param resultPath Path to result directory
* @return Array of BufferedReaders for result files
*/
public static BufferedReader[] getResultReader(String resultPath) throws IOException;
/**
* Get result readers with exclusion prefixes and ordering
* @param resultPath Path to result directory
* @param excludePrefixes Prefixes to exclude from results
* @param inOrderOfFiles Whether to maintain file order
* @return Array of BufferedReaders for result files
*/
public static BufferedReader[] getResultReader(String resultPath, String[] excludePrefixes, boolean inOrderOfFiles) throws IOException;
/**
* Get result input streams
* @param resultPath Path to result directory
* @return Array of BufferedInputStreams for result files
*/
public static BufferedInputStream[] getResultInputStream(String resultPath) throws IOException;
/**
* Get result input streams with exclusion prefixes
* @param resultPath Path to result directory
* @param excludePrefixes Prefixes to exclude from results
* @return Array of BufferedInputStreams for result files
*/
public static BufferedInputStream[] getResultInputStream(String resultPath, String[] excludePrefixes) throws IOException;
/**
* Read all result lines into target list
* @param target List to store result lines
* @param resultPath Path to result directory
*/
public static void readAllResultLines(List<String> target, String resultPath) throws IOException;
/**
* Read all result lines with exclusions and ordering
* @param target List to store result lines
* @param resultPath Path to result directory
* @param excludePrefixes Prefixes to exclude from results
* @param inOrderOfFiles Whether to maintain file order
*/
public static void readAllResultLines(List<String> target, String resultPath, String[] excludePrefixes, boolean inOrderOfFiles) throws IOException;
// Result comparison methods
/**
* Compare results by lines in memory
* @param expectedResultStr Expected result as string
* @param resultPath Path to actual results
*/
public static void compareResultsByLinesInMemory(String expectedResultStr, String resultPath) throws Exception;
/**
* Compare results by lines in memory with exclusions
* @param expectedResultStr Expected result as string
* @param resultPath Path to actual results
* @param excludePrefixes Prefixes to exclude from comparison
*/
public static void compareResultsByLinesInMemory(String expectedResultStr, String resultPath, String[] excludePrefixes) throws Exception;
/**
* Compare results by lines with strict ordering
* @param expectedResultStr Expected result as string
* @param resultPath Path to actual results
*/
public static void compareResultsByLinesInMemoryWithStrictOrder(String expectedResultStr, String resultPath) throws Exception;
/**
* Compare results by lines with strict ordering and exclusions
* @param expectedResultStr Expected result as string
* @param resultPath Path to actual results
* @param excludePrefixes Prefixes to exclude from comparison
*/
public static void compareResultsByLinesInMemoryWithStrictOrder(String expectedResultStr, String resultPath, String[] excludePrefixes) throws Exception;
/**
* Check lines against regular expression pattern
* @param resultPath Path to result files
* @param regexp Regular expression pattern to match
*/
public static void checkLinesAgainstRegexp(String resultPath, String regexp);
/**
* Compare key-value pairs with delta tolerance
* @param expectedLines Expected key-value pairs
* @param resultPath Path to actual results
* @param delimiter Key-value delimiter
* @param maxDelta Maximum allowed delta for numeric comparisons
*/
public static void compareKeyValuePairsWithDelta(String expectedLines, String resultPath, String delimiter, double maxDelta) throws Exception;
/**
* Compare key-value pairs with delta tolerance and exclusions
* @param expectedLines Expected key-value pairs
* @param resultPath Path to actual results
* @param excludePrefixes Prefixes to exclude from comparison
* @param delimiter Key-value delimiter
* @param maxDelta Maximum allowed delta for numeric comparisons
*/
public static void compareKeyValuePairsWithDelta(String expectedLines, String resultPath, String[] excludePrefixes, String delimiter, double maxDelta) throws Exception;
/**
* Compare result collections with custom comparator
* @param expected Expected results list
* @param actual Actual results list
* @param comparator Comparator for element comparison
*/
public static <X> void compareResultCollections(List<X> expected, List<X> actual, Comparator<X> comparator);
// Collection comparison methods
/**
* Compare result as tuples
* @param result Actual result list
* @param expected Expected result as string
*/
public static <T> void compareResultAsTuples(List<T> result, String expected);
/**
* Compare result as text
* @param result Actual result list
* @param expected Expected result as string
*/
public static <T> void compareResultAsText(List<T> result, String expected);
/**
* Compare ordered result as text
* @param result Actual result list
* @param expected Expected result as string
*/
public static <T> void compareOrderedResultAsText(List<T> result, String expected);
/**
* Compare ordered result as text with tuple option
* @param result Actual result list
* @param expected Expected result as string
* @param asTuples Whether to treat as tuples
*/
public static <T> void compareOrderedResultAsText(List<T> result, String expected, boolean asTuples);
/**
* Check if result contains expected content
* @param result Actual result list
* @param expected Expected content as string
*/
public static <T> void containsResultAsText(List<T> result, String expected);
// Utility methods
/**
* Set environment variables
* @param newenv Map of environment variables to set
*/
public static void setEnv(Map<String, String> newenv);
/**
* Construct test path for class
* @param forClass Class to construct path for
* @param folder Folder name
* @return Constructed test path
*/
public static String constructTestPath(Class<?> forClass, String folder);
/**
* Construct test URI for class
* @param forClass Class to construct URI for
* @param folder Folder name
* @return Constructed test URI
*/
public static String constructTestURI(Class<?> forClass, String folder);
/**
* Get content from HTTP URL
* @param url URL to fetch from
* @return HTTP response content
*/
public static String getFromHTTP(String url) throws Exception;
/**
* Get content from HTTP URL with timeout
* @param url URL to fetch from
* @param timeout Request timeout
* @return HTTP response content
*/
public static String getFromHTTP(String url, Time timeout) throws Exception;
// Configuration methods
/**
* Convert configurations to parameter list
* @param testConfigs Configuration objects
* @return Parameter list for parameterized tests
*/
protected static Collection<Object[]> toParameterList(Configuration... testConfigs);
/**
* Convert configuration list to parameter list
* @param testConfigs List of configuration objects
* @return Parameter list for parameterized tests
*/
protected static Collection<Object[]> toParameterList(List<Configuration> testConfigs);
/**
* Tuple comparator for comparing Tuple objects
*/
public static class TupleComparator<T extends Tuple> implements Comparator<T> {
public int compare(T o1, T o2);
}
}

Usage Example:
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.api.java.tuple.Tuple2;
@Test
public void testResultComparison() throws Exception {
// Test result comparison
String expected = "hello\nworld\nflink";
String resultPath = "/path/to/results";
TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath);
// Test with exclusions
String[] excludePrefixes = {"debug:", "info:"};
TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath, excludePrefixes);
// Test ordered comparison
TestBaseUtils.compareResultsByLinesInMemoryWithStrictOrder(expected, resultPath);
// Test collection comparison
List<String> actualResults = Arrays.asList("HELLO", "WORLD", "FLINK");
String expectedResults = "HELLO\nWORLD\nFLINK";
TestBaseUtils.compareResultAsText(actualResults, expectedResults);
// Test tuple comparison
List<Tuple2<String, Integer>> tupleResults = Arrays.asList(
new Tuple2<>("hello", 1),
new Tuple2<>("world", 2)
);
String expectedTuples = "(hello,1)\n(world,2)";
TestBaseUtils.compareResultAsTuples(tupleResults, expectedTuples);
}
@Test
public void testKeyValueComparison() throws Exception {
// Test with numeric delta comparison
String expected = "a,1.0\nb,2.5\nc,3.7";
String resultPath = "/path/to/numeric/results";
double maxDelta = 0.1;
TestBaseUtils.compareKeyValuePairsWithDelta(expected, resultPath, ",", maxDelta);
}
@Test
public void testHTTPUtilities() throws Exception {
// Test HTTP fetching
String content = TestBaseUtils.getFromHTTP("http://example.com/test");
assertNotNull(content);
// Test with timeout
Time timeout = Time.seconds(30);
String contentWithTimeout = TestBaseUtils.getFromHTTP("http://example.com/test", timeout);
assertNotNull(contentWithTimeout);
}
@Test
public void testPathConstruction() {
String testPath = TestBaseUtils.constructTestPath(MyTest.class, "testdata");
String testURI = TestBaseUtils.constructTestURI(MyTest.class, "testdata");
assertNotNull(testPath);
assertNotNull(testURI);
}

AbstractTestBase is a base class for unit tests that reuse the same Flink cluster across multiple test methods.
/**
* Base class for unit tests that reuse the same Flink cluster
*/
public abstract class AbstractTestBase extends TestBaseUtils {
/**
* Static class rule for managing cluster lifecycle.
* Registered with JUnit via {@code @ClassRule}, so the rule is applied once per test class.
*/
@ClassRule
public static MiniClusterWithClientResource miniClusterResource;
/**
* Static temporary folder rule.
* Registered with JUnit via {@code @ClassRule}; backs the temp-file helper methods of this class
* (NOTE(review): the helpers' bodies are not shown in this listing - confirm against the Flink source).
*/
@ClassRule
public static TemporaryFolder TEMPORARY_FOLDER;
/**
* Get temporary directory path
* @param dirName Directory name
* @return Path to temporary directory
*/
public String getTempDirPath(String dirName);
/**
* Get temporary file path
* @param fileName File name
* @return Path to temporary file
*/
public String getTempFilePath(String fileName);
/**
* Create temporary file with contents
* @param fileName File name
* @param contents File contents
* @return Path to created file
*/
public String createTempFile(String fileName, String contents);
/**
* Create and register temporary file for cleanup
* @param fileName File name
* @return Created File object
*/
public File createAndRegisterTempFile(String fileName);
}

JavaProgramTestBase is a base class for tests that run a single test program with object reuse enabled/disabled.
/**
* Base for tests that run single test with object reuse enabled/disabled
*/
public abstract class JavaProgramTestBase extends AbstractTestBase {
/**
* Set number of test repetitions
* @param numberOfTestRepetitions Number of times to repeat test
*/
public void setNumberOfTestRepetitions(int numberOfTestRepetitions);
/**
* Get parallelism level
* @return Current parallelism setting
*/
public int getParallelism();
/**
* Get latest execution result
* @return JobExecutionResult of latest execution
*/
public JobExecutionResult getLatestExecutionResult();
/**
* Check if using collection execution
* @return true if collection execution mode
*/
public boolean isCollectionExecution();
/**
* Test program implementation - must be implemented by subclasses
*/
protected abstract void testProgram() throws Exception;
/**
* Pre-submission work - optional hook, override if needed.
* Unlike {@link #testProgram()}, subclasses are not required to implement this method.
* @throws Exception if the pre-submission work fails
*/
protected void preSubmit() throws Exception;
/**
* Post-submission work - optional hook, override if needed.
* Unlike {@link #testProgram()}, subclasses are not required to implement this method.
* @throws Exception if the post-submission work fails
*/
protected void postSubmit() throws Exception;
/**
* Whether to skip collection execution - optional hook, override if needed.
* Unlike {@link #testProgram()}, subclasses are not required to implement this method.
* @return true to skip collection execution
*/
protected boolean skipCollectionExecution();
}

MultipleProgramsTestBase is a base class for parameterized tests that run in different execution modes (cluster, collection, etc.).
/**
* Base for parameterized tests that run in different execution modes
*/
public class MultipleProgramsTestBase extends AbstractTestBase {
/**
* Test execution modes
*/
public enum TestExecutionMode {
CLUSTER,
CLUSTER_OBJECT_REUSE,
COLLECTION
}
/**
* Create test base with execution mode
* @param mode Test execution mode
*/
public MultipleProgramsTestBase(TestExecutionMode mode);
/**
* Provides parameterized execution modes for JUnit parameterized tests
* @return Collection of execution mode parameters
*/
public static Collection<Object[]> executionModes();
}

Usage Example:
import org.apache.flink.test.util.MultipleProgramsTestBase;
@RunWith(Parameterized.class)
public class MyParameterizedTest extends MultipleProgramsTestBase {
public MyParameterizedTest(TestExecutionMode mode) {
super(mode);
}
@Parameterized.Parameters
public static Collection<Object[]> executionModes() {
return MultipleProgramsTestBase.executionModes();
}
@Test
public void testInAllModes() throws Exception {
// Test will run in all execution modes
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// ... test logic
}
}

TestProcessBuilder provides utilities for creating and managing external test processes with JVM configuration.
/**
* Utility wrapping ProcessBuilder with common testing options
*/
public class TestProcessBuilder {
/**
* Create process builder for main class
* @param mainClass Main class name to execute
*/
public TestProcessBuilder(String mainClass) throws IOException;
/**
* Start the configured process
* @return TestProcess wrapper for managing the process
*/
public TestProcess start() throws IOException;
/**
* Set JVM memory configuration
* @param jvmMemory Memory size for JVM
* @return This builder for method chaining
*/
public TestProcessBuilder setJvmMemory(MemorySize jvmMemory);
/**
* Add JVM argument
* @param arg JVM argument to add
* @return This builder for method chaining
*/
public TestProcessBuilder addJvmArg(String arg);
/**
* Add main class argument
* @param arg Argument for main class
* @return This builder for method chaining
*/
public TestProcessBuilder addMainClassArg(String arg);
/**
* Add Flink configuration as main class arguments
* @param config Flink configuration to add
* @return This builder for method chaining
*/
public TestProcessBuilder addConfigAsMainClassArgs(Configuration config);
/**
* Use clean environment for process
* @return This builder for method chaining
*/
public TestProcessBuilder withCleanEnvironment();
/**
* Wrapper for managing test process execution
*/
public static class TestProcess {
/**
* Get underlying Process object
* @return Process instance
*/
public Process getProcess();
/**
* Get process output writer
* @return StringWriter with process output
*/
public StringWriter getProcessOutput();
/**
* Get process error output writer
* @return StringWriter with error output
*/
public StringWriter getErrorOutput();
/**
* Destroy the process
*/
public void destroy();
}
}

ShellScript provides utilities for creating cross-platform shell scripts for testing.
/**
* Utility for creating shell scripts on Linux and Windows
*/
public class ShellScript {
/**
* Create shell script builder
* @return Platform-appropriate shell script builder
*/
public static ShellScriptBuilder createShellScriptBuilder();
/**
* Get script file extension for current platform
* @return Script extension (.sh or .bat)
*/
public static String getScriptExtension();
/**
* Abstract builder for creating shell scripts
*/
public abstract static class ShellScriptBuilder {
/**
* Write script to file
* @param file Target file for script
*/
public void write(File file) throws IOException;
/**
* Add command to script
* @param command Command components to execute
*/
public abstract void command(List<String> command);
/**
* Set environment variable in script
* @param key Environment variable name
* @param value Environment variable value
*/
public abstract void env(String key, String value);
}
}

CollectionTestEnvironment provides a collection-based execution environment for testing without cluster setup.
/**
* Collection execution environment for testing
*/
public class CollectionTestEnvironment extends ExecutionEnvironment {
/**
* Get result of last job execution
* @return JobExecutionResult of last execution
*/
public JobExecutionResult getLastJobExecutionResult();
/**
* Execute job with given name
* @param jobName Name for the job execution
* @return JobExecutionResult after execution
*/
public JobExecutionResult execute(String jobName) throws Exception;
/**
* Set this environment as context environment
*/
protected void setAsContext();
/**
* Unset this environment from context
*/
protected static void unsetAsContext();
}

TestingSecurityContext provides security context management for Kerberos-enabled testing scenarios.
/**
* Security context for handling client and server principals in MiniKDC testing
*/
public class TestingSecurityContext {
/**
* Install security context with configurations
* @param config Security configuration
* @param clientSecurityConfigurationMap Client security configurations
*/
public static void install(
SecurityConfiguration config,
Map<String, ClientSecurityConfiguration> clientSecurityConfigurationMap
) throws Exception;
/**
* Client security configuration for testing
*/
static class ClientSecurityConfiguration {
/**
* Create client security configuration
* @param principal Kerberos principal
* @param keytab Path to keytab file
*/
public ClientSecurityConfiguration(String principal, String keytab);
/**
* Get Kerberos principal
* @return Principal string
*/
public String getPrincipal();
/**
* Get keytab file path
* @return Keytab file path
*/
public String getKeytab();
}
}