or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

index.md, security-testing.md, test-base-classes.md, test-data.md, test-environments.md, test-utilities.md
tile.json

test-environments.md (docs/)

Test Environments

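Test environments provide specialized execution environments for running Flink jobs in test contexts. TestEnvironment and TestStreamEnvironment execute jobs on LocalFlinkMiniCluster instances, while CollectionTestEnvironment runs jobs locally on Java collections without a cluster. Each of them can be installed as the global execution context.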

Batch Testing Environment

TestEnvironment

ExecutionEnvironment implementation that executes jobs on LocalFlinkMiniCluster for batch processing tests.

public class TestEnvironment extends ExecutionEnvironment {
    public TestEnvironment(LocalFlinkMiniCluster miniCluster, int parallelism, boolean isObjectReuseEnabled, 
                          Collection<Path> jarFiles, Collection<URL> classPaths);
    public TestEnvironment(LocalFlinkMiniCluster executor, int parallelism, boolean isObjectReuseEnabled);
    
    public JobExecutionResult getLastJobExecutionResult();
    public void startNewSession() throws Exception;
    public JobExecutionResult execute(String jobName) throws Exception;
    public String getExecutionPlan() throws Exception;
    
    public void setAsContext();
    public static void setAsContext(LocalFlinkMiniCluster miniCluster, int parallelism, 
                                  Collection<Path> jarFiles, Collection<URL> classPaths);
    public static void setAsContext(LocalFlinkMiniCluster miniCluster, int parallelism);
    public static void unsetAsContext();
}

Usage Example:

// Create and configure test environment
LocalFlinkMiniCluster cluster = TestBaseUtils.startCluster(1, 4, false, false, true);
TestEnvironment testEnv = new TestEnvironment(cluster, 4, false);

// Set as global context
testEnv.setAsContext();

// Now ExecutionEnvironment.getExecutionEnvironment() returns this test environment
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<String> result = env.fromElements("test").map(s -> s.toUpperCase());
JobExecutionResult jobResult = env.execute("test job");

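// Clean up (when the cluster is no longer needed, also stop it, e.g.
// TestBaseUtils.stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT))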
TestEnvironment.unsetAsContext();

CollectionTestEnvironment

Collection-based test environment for lightweight local testing without a cluster.

public class CollectionTestEnvironment extends CollectionEnvironment {
    public JobExecutionResult getLastJobExecutionResult();
    public JobExecutionResult execute() throws Exception;
    public JobExecutionResult execute(String jobName) throws Exception;
    
    protected void setAsContext();
    protected static void unsetAsContext();
}

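Usage Example (setAsContext() and unsetAsContext() are protected, so this code must live in a subclass of CollectionTestEnvironment or in the same package):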

// Use collection environment for simple tests
CollectionTestEnvironment collectionEnv = new CollectionTestEnvironment();
collectionEnv.setAsContext();

// Jobs will execute locally using Java collections
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Integer> numbers = env.fromElements(1, 2, 3, 4, 5);
List<Integer> result = numbers.map(x -> x * 2).collect();

CollectionTestEnvironment.unsetAsContext();

Streaming Testing Environment

TestStreamEnvironment

StreamExecutionEnvironment implementation that executes streaming jobs on LocalFlinkMiniCluster.

public class TestStreamEnvironment extends StreamExecutionEnvironment {
    public TestStreamEnvironment(LocalFlinkMiniCluster miniCluster, int parallelism, 
                               Collection<Path> jarFiles, Collection<URL> classPaths);
    public TestStreamEnvironment(LocalFlinkMiniCluster miniCluster, int parallelism);
    
    public JobExecutionResult execute(String jobName) throws Exception;
    
    public static void setAsContext(LocalFlinkMiniCluster cluster, int parallelism, 
                                  Collection<Path> jarFiles, Collection<URL> classpaths);
    public static void setAsContext(LocalFlinkMiniCluster cluster, int parallelism);
    public static void unsetAsContext();
}

Usage Example:

// Create streaming test environment
LocalFlinkMiniCluster cluster = TestBaseUtils.startCluster(1, 4, false, false, true);
TestStreamEnvironment.setAsContext(cluster, 4);

// Now StreamExecutionEnvironment.getExecutionEnvironment() returns test environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> stream = env.fromElements("hello", "world")
    .map(s -> s.toUpperCase());

stream.print();
JobExecutionResult result = env.execute("streaming test");

// Clean up
TestStreamEnvironment.unsetAsContext();
TestBaseUtils.stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);

Environment Configuration

Depending on the environment class, the following can be configured:

  • Parallelism: Number of parallel slots for job execution
  • Object Reuse: Whether to reuse objects to reduce garbage collection (batch only)
  • JAR Files: Additional JAR files to include in the classpath
  • Class Paths: Additional URLs to include in the classpath
  • Cluster Configuration: LocalFlinkMiniCluster settings

Context Management

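Test environments can be installed as the global execution context with setAsContext (an instance method or a static overload, depending on the class) and removed again with the static unsetAsContext: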

  • setAsContext() - Makes the test environment the default returned by ExecutionEnvironment.getExecutionEnvironment() (or by StreamExecutionEnvironment.getExecutionEnvironment() for TestStreamEnvironment)
  • unsetAsContext() - Restores the previous execution environment

This allows existing code that calls ExecutionEnvironment.getExecutionEnvironment() or StreamExecutionEnvironment.getExecutionEnvironment() to automatically use the test environment without modification.