Test environments provide specialized execution environments for running Flink jobs in tests. TestEnvironment and TestStreamEnvironment execute jobs on a LocalFlinkMiniCluster instance, CollectionTestEnvironment executes them on Java collections, and each can be installed as the global execution context.
TestEnvironment is an ExecutionEnvironment implementation that executes batch jobs on a LocalFlinkMiniCluster.
public class TestEnvironment extends ExecutionEnvironment {
    public TestEnvironment(LocalFlinkMiniCluster miniCluster, int parallelism, boolean isObjectReuseEnabled,
            Collection<Path> jarFiles, Collection<URL> classPaths);
    public TestEnvironment(LocalFlinkMiniCluster executor, int parallelism, boolean isObjectReuseEnabled);
    public JobExecutionResult getLastJobExecutionResult();
    public void startNewSession() throws Exception;
    public JobExecutionResult execute(String jobName) throws Exception;
    public String getExecutionPlan() throws Exception;
    public void setAsContext();
    public static void setAsContext(LocalFlinkMiniCluster miniCluster, int parallelism,
            Collection<Path> jarFiles, Collection<URL> classPaths);
    public static void setAsContext(LocalFlinkMiniCluster miniCluster, int parallelism);
    public static void unsetAsContext();
}

Usage Example:
// Create and configure test environment
LocalFlinkMiniCluster cluster = TestBaseUtils.startCluster(1, 4, false, false, true);
TestEnvironment testEnv = new TestEnvironment(cluster, 4, false);
// Set as global context
testEnv.setAsContext();
// Now ExecutionEnvironment.getExecutionEnvironment() returns this test environment
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<String> result = env.fromElements("test").map(s -> s.toUpperCase());
// A batch job needs at least one sink before execute() can run it;
// DiscardingOutputFormat (org.apache.flink.api.java.io) simply drops the records
result.output(new DiscardingOutputFormat<>());
JobExecutionResult jobResult = env.execute("test job");
// Clean up: reset the global context, then stop the cluster
TestEnvironment.unsetAsContext();
TestBaseUtils.stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);

Collection-based test environment for lightweight local testing without a cluster.
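As a rough illustration of what "collection-based" execution means, the sketch below applies a map function eagerly to an in-memory list using only the JDK. It is an analogy under stated assumptions, not Flink's implementation: the class `CollectionExecutionSketch` and the helper `runMap` are hypothetical names introduced here.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical illustration only; none of these names belong to Flink.
final class CollectionExecutionSketch {

    // Applies a map function eagerly to an in-memory list, the way a
    // collection-backed runtime could evaluate a single map operator
    // without starting any cluster.
    public static <I, O> List<O> runMap(List<I> input, Function<I, O> mapper) {
        return input.stream().map(mapper).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
        List<Integer> doubled = runMap(numbers, x -> x * 2);
        System.out.println(doubled); // prints [2, 4, 6, 8, 10]
    }
}
```

Because everything stays in one JVM and in ordinary collections, this style of execution suits small, fast tests rather than tests of distributed behavior.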
public class CollectionTestEnvironment extends CollectionEnvironment {
    public JobExecutionResult getLastJobExecutionResult();
    public JobExecutionResult execute() throws Exception;
    public JobExecutionResult execute(String jobName) throws Exception;
    protected void setAsContext();
    protected static void unsetAsContext();
}

Usage Example:
// Use collection environment for simple tests
// Note: setAsContext() and unsetAsContext() are protected, so this snippet
// is assumed to live in the same package as CollectionTestEnvironment
CollectionTestEnvironment collectionEnv = new CollectionTestEnvironment();
collectionEnv.setAsContext();
// Jobs will execute locally using Java collections
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Integer> numbers = env.fromElements(1, 2, 3, 4, 5);
// collect() triggers execution and returns the results as a list
List<Integer> result = numbers.map(x -> x * 2).collect();
// Clean up
CollectionTestEnvironment.unsetAsContext();

StreamExecutionEnvironment implementation that executes streaming jobs on LocalFlinkMiniCluster.
public class TestStreamEnvironment extends StreamExecutionEnvironment {
    public TestStreamEnvironment(LocalFlinkMiniCluster miniCluster, int parallelism,
            Collection<Path> jarFiles, Collection<URL> classPaths);
    public TestStreamEnvironment(LocalFlinkMiniCluster miniCluster, int parallelism);
    public JobExecutionResult execute(String jobName) throws Exception;
    public static void setAsContext(LocalFlinkMiniCluster cluster, int parallelism,
            Collection<Path> jarFiles, Collection<URL> classpaths);
    public static void setAsContext(LocalFlinkMiniCluster cluster, int parallelism);
    public static void unsetAsContext();
}

Usage Example:
// Create streaming test environment
LocalFlinkMiniCluster cluster = TestBaseUtils.startCluster(1, 4, false, false, true);
TestStreamEnvironment.setAsContext(cluster, 4);
// Now StreamExecutionEnvironment.getExecutionEnvironment() returns test environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> stream = env.fromElements("hello", "world")
        .map(s -> s.toUpperCase());
stream.print();
JobExecutionResult result = env.execute("streaming test");
// Clean up
TestStreamEnvironment.unsetAsContext();
TestBaseUtils.stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);

All test environments support configuration of:
Test environments can be set as global contexts using the setAsContext() and unsetAsContext() methods:
setAsContext() - Makes the test environment the default for ExecutionEnvironment.getExecutionEnvironment() (or StreamExecutionEnvironment.getExecutionEnvironment() for TestStreamEnvironment)
unsetAsContext() - Restores the previous execution environment

This allows existing code that calls ExecutionEnvironment.getExecutionEnvironment() to automatically use the test environment without modification.
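The global-context mechanism can be pictured as a swappable factory behind a static lookup. The following is a minimal, JDK-only sketch of that pattern and not Flink's actual code: `Env`, `Environments`, `install`, `reset`, and `runJob` are hypothetical names, and the sketch assumes that unsetting falls back to a single default rather than tracking a stack of earlier environments.

```java
import java.util.function.Supplier;

// Simplified, hypothetical model of a swappable global context.
// None of these names are part of Flink's API.
final class ContextSwapSketch {

    /** Stand-in for an execution environment; it only carries a label. */
    static final class Env {
        final String name;

        Env(String name) {
            this.name = name;
        }
    }

    /** Holds the factory that the static lookup consults. */
    static final class Environments {
        private static final Supplier<Env> DEFAULT_FACTORY = () -> new Env("default");
        private static Supplier<Env> factory = DEFAULT_FACTORY;

        // Plays the role of ExecutionEnvironment.getExecutionEnvironment().
        static Env get() {
            return factory.get();
        }

        // Plays the role of setAsContext(): later lookups return testEnv.
        static void install(Env testEnv) {
            factory = () -> testEnv;
        }

        // Plays the role of unsetAsContext(): lookups use the default again.
        static void reset() {
            factory = DEFAULT_FACTORY;
        }
    }

    // "Existing code" that only knows about the static lookup.
    static String runJob() {
        return Environments.get().name;
    }

    public static void main(String[] args) {
        Environments.install(new Env("test"));
        try {
            System.out.println(runJob()); // prints "test"
        } finally {
            Environments.reset(); // always undo the global change
        }
        System.out.println(runJob()); // prints "default"
    }
}
```

Wrapping the job in try/finally, as in `main`, keeps a failing test from leaving the global context pointing at a test environment for later tests.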