or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

index.md security-testing.md test-base-classes.md test-data.md test-environments.md test-utilities.md
tile.json

docs/test-base-classes.md

Test Base Classes

Test base classes provide standardized testing patterns, cluster lifecycle management, and support for parameterized testing across multiple execution modes. They handle the common setup and teardown required for Flink testing.

Core Base Classes

AbstractTestBase

Base class for tests that run test programs in a Flink mini cluster with automatic cluster lifecycle management.

/**
 * Base class for tests that run test programs in a Flink mini cluster.
 *
 * <p>API summary only: members are listed as signatures, without bodies.
 */
public abstract class AbstractTestBase extends TestBaseUtils {
    /** JUnit class rule holding the temporary folder; being static, it is shared by the tests of a class. */
    @ClassRule
    public static final TemporaryFolder temporaryFolder;
    
    /** Creates the test base from {@code config} (presumably the cluster configuration; confirm against the implementation). */
    public AbstractTestBase(Configuration config);
    
    // Cluster lifecycle. The usage example calls these explicitly around the test logic.
    public void startCluster() throws Exception;
    public void stopCluster() throws Exception;
    
    // Cluster sizing accessors: slots per the "TaskManagerNumSlots" name, and the number of task managers.
    public int getTaskManagerNumSlots();
    public void setTaskManagerNumSlots(int taskManagerNumSlots);
    public int getNumTaskManagers();
    public void setNumTaskManagers(int numTaskManagers);
    
    // Temporary file helpers. The String returned by createTempFile is used as an input path
    // in the usage example; whether the other helpers create anything on disk is not shown here.
    public String getTempDirPath(String dirName) throws IOException;
    public String getTempFilePath(String fileName) throws IOException;
    public String createTempFile(String fileName, String contents) throws IOException;
    public File createAndRegisterTempFile(String fileName) throws IOException;
}

Usage Example:

/**
 * Example test that drives the mini cluster lifecycle by hand.
 *
 * <p>The constructor applies the cluster sizing (2 task managers, 4 task-manager slots)
 * before any test method starts the cluster.
 */
public class MyCustomTest extends AbstractTestBase {
    public MyCustomTest() {
        super(new Configuration());
        setNumTaskManagers(2);
        setTaskManagerNumSlots(4);
    }
    
    /**
     * Starts the cluster, runs the test logic, and always stops the cluster again.
     *
     * @throws Exception if starting/stopping the cluster or the test logic fails
     */
    @Test
    public void testMyApplication() throws Exception {
        startCluster();
        try {
            // Create temp input file
            String inputPath = createTempFile("input.txt", "line1\nline2\nline3");
            
            // Your test logic here
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            DataSet<String> input = env.readTextFile(inputPath);
            // ... test logic
        } finally {
            // Stop the cluster even when the test logic throws, so a failing
            // test does not leave a running cluster behind for later tests.
            stopCluster();
        }
    }
}

Batch Testing Base Classes

JavaProgramTestBase

Base class for Java API program tests supporting multiple execution modes (cluster, collection, object reuse).

/**
 * Base class for Java API program tests supporting multiple execution modes
 * (cluster, collection, object reuse).
 *
 * <p>API summary only: members are listed as signatures, without bodies.
 */
public abstract class JavaProgramTestBase extends AbstractTestBase {
    public JavaProgramTestBase();
    public JavaProgramTestBase(Configuration config);
    
    // Test settings and results exposed to subclasses and callers.
    public void setParallelism(int parallelism);
    public void setNumberOfTestRepetitions(int numberOfTestRepetitions);
    public int getParallelism();
    public JobExecutionResult getLatestExecutionResult();
    public boolean isCollectionExecution();
    
    /** The program under test; every subclass must implement it. */
    protected abstract void testProgram() throws Exception;
    /** Optional hook for setup before job execution. */
    protected void preSubmit() throws Exception;
    /** Optional hook for validation after job execution. */
    protected void postSubmit() throws Exception;
    /** Presumably: return {@code true} to skip the collection-execution run (confirm against the implementation). */
    protected boolean skipCollectionExecution();
    
    // JUnit entry points, one per execution mode named in the method.
    @Test
    public void testJobWithObjectReuse() throws Exception;
    @Test
    public void testJobWithoutObjectReuse() throws Exception; 
    @Test
    public void testJobCollectionExecution() throws Exception;
}

Usage Example:

/**
 * Example batch test: counts the words of three fixed lines and compares the
 * collected tuples against the expected text.
 *
 * <p>{@code Tokenizer} is not defined in this snippet and must be supplied by the test.
 */
public class WordCountTest extends JavaProgramTestBase {
    
    /** The program under test: tokenize, group by the word field (0), sum the count field (1). */
    @Override
    protected void testProgram() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        
        DataSet<String> text = env.fromElements(
            "hello world",
            "hello flink", 
            "world flink"
        );
        
        DataSet<Tuple2<String, Integer>> counts = text
            .flatMap(new Tokenizer())
            .groupBy(0)
            .sum(1);
            
        List<Tuple2<String, Integer>> result = counts.collect();
        
        // One "word,count" line per expected tuple.
        String expected = "flink,2\nhello,2\nworld,2";
        compareResultAsTuples(result, expected);
    }
    
    @Override
    protected void preSubmit() throws Exception {
        // Setup before job execution
        setParallelism(4);
    }
    
    @Override
    protected boolean skipCollectionExecution() {
        // Returning false keeps the collection-execution run enabled for this test;
        // return true instead to skip it (per the hook's name; confirm against JavaProgramTestBase).
        return false;
    }
}

MultipleProgramsTestBase

Base class for parameterized unit tests that run multiple tests and reuse the same Flink cluster across test methods.

/**
 * Base class for parameterized unit tests that run multiple tests and reuse the
 * same Flink cluster across test methods.
 *
 * <p>API summary only: members are listed as signatures, without bodies.
 */
public class MultipleProgramsTestBase extends TestBaseUtils {
    protected static final int DEFAULT_PARALLELISM = 4;
    // Off by default; what enabling it starts is not shown in this listing.
    protected static boolean startWebServer = false;
    // Static, so one cluster reference is shared by all test instances of the class.
    protected static LocalFlinkMiniCluster cluster = null;
    
    /** Receives the execution mode for this test instance (one of the parameterized values). */
    public MultipleProgramsTestBase(TestExecutionMode mode);
    
    // Per-test hooks (JUnit @Before/@After).
    @Before
    public void setupEnvironment();
    @After  
    public void teardownEnvironment();
    // Per-class hooks (JUnit @BeforeClass/@AfterClass).
    @BeforeClass
    public static void setup() throws Exception;
    @AfterClass
    public static void teardown() throws Exception;
    
    /** Supplies the parameter sets for the JUnit Parameterized runner; {0} in the name is the first parameter. */
    @Parameterized.Parameters(name = "Execution mode = {0}")
    public static Collection<Object[]> executionModes();
}

/**
 * Execution modes a parameterized test can be run in.
 *
 * <p>The declaration order is kept as-is because it determines the constants' ordinals.
 */
public enum TestExecutionMode {
    /** Cluster execution. */
    CLUSTER,

    /** Cluster execution with object reuse (per the constant's name). */
    CLUSTER_OBJECT_REUSE,

    /** Collection execution. */
    COLLECTION;
}

Usage Example:

/**
 * Example parameterized test: each test method runs in the execution mode
 * handed to the constructor.
 */
@RunWith(Parameterized.class)
public class ParameterizedTest extends MultipleProgramsTestBase {

    public ParameterizedTest(TestExecutionMode mode) {
        super(mode);
    }

    /** Removes duplicates from a small word list and checks the remaining words. */
    @Test
    public void testWordCount() throws Exception {
        // The execution mode comes from the constructor parameter.
        final ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();

        final DataSet<String> words = batchEnv.fromElements("hello", "world", "hello");
        final DataSet<String> uniqueWords = words.distinct();
        final List<String> collected = uniqueWords.collect();

        compareResultAsText(collected, "hello\nworld");
    }

    /** Keeps only the even numbers of 1..5 and checks the result. */
    @Test
    public void testFilter() throws Exception {
        final ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();

        final DataSet<Integer> oneToFive = batchEnv.fromElements(1, 2, 3, 4, 5);
        final DataSet<Integer> evens = oneToFive.filter(x -> x % 2 == 0);
        final List<Integer> collected = evens.collect();

        compareResultAsText(collected, "2\n4");
    }
}

Streaming Testing Base Classes

StreamingProgramTestBase

Abstract base class for streaming program tests.

/**
 * Abstract base class for streaming program tests.
 *
 * <p>API summary only: members are listed as signatures, without bodies.
 */
public abstract class StreamingProgramTestBase extends AbstractTestBase {
    protected static final int DEFAULT_PARALLELISM = 4;
    
    public StreamingProgramTestBase();
    
    public void setParallelism(int parallelism);
    public int getParallelism();
    
    /** The streaming program under test; every subclass must implement it. */
    protected abstract void testProgram() throws Exception;
    /** Optional hook for setup before job execution. */
    protected void preSubmit() throws Exception;
    /** Optional hook for validation after job execution. */
    protected void postSubmit() throws Exception;
    
    /** JUnit entry point (presumably runs the hooks around testProgram; confirm against the implementation). */
    @Test
    public void testJob() throws Exception;
}

Usage Example:

/**
 * Example streaming test: a word count over two fixed lines whose result is printed.
 *
 * <p>{@code Tokenizer} is not defined in this snippet and must be supplied by the test.
 */
public class StreamingWordCountTest extends StreamingProgramTestBase {

    /** Sets the parallelism that {@link #testProgram()} later reads back via {@code getParallelism()}. */
    @Override
    protected void preSubmit() throws Exception {
        setParallelism(2);
    }

    /** Builds the pipeline (tokenize, key by field 0, sum field 1), prints it and executes the job. */
    @Override
    protected void testProgram() throws Exception {
        final StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        streamEnv.setParallelism(getParallelism());

        final DataStream<String> lines = streamEnv.fromElements("hello world", "hello flink");

        final DataStream<Tuple2<String, Integer>> wordCounts =
            lines.flatMap(new Tokenizer()).keyBy(0).sum(1);

        wordCounts.print();
        streamEnv.execute("Streaming WordCount");
    }
}

StreamingMultipleProgramsTestBase

Base class for streaming unit tests that run multiple tests and reuse the same Flink cluster.

/**
 * Base class for streaming unit tests that run multiple tests and reuse the
 * same Flink cluster.
 *
 * <p>API summary only: members are listed as signatures, without bodies.
 */
public class StreamingMultipleProgramsTestBase extends AbstractTestBase {
    protected static final int DEFAULT_PARALLELISM = 4;
    // Static, so one cluster reference is shared by all test instances of the class.
    protected static LocalFlinkMiniCluster cluster;
    protected static final Logger LOG;
    
    public StreamingMultipleProgramsTestBase();
    
    // Per-class hooks (JUnit @BeforeClass/@AfterClass).
    @BeforeClass
    public static void setup() throws Exception;
    @AfterClass
    public static void teardown() throws Exception;
}

Usage Example:

/**
 * Example streaming tests: each method builds a small pipeline, prints its output
 * and executes it as a separate job.
 */
public class StreamingIntegrationTest extends StreamingMultipleProgramsTestBase {

    /** Upper-cases three single-letter strings. */
    @Test
    public void testStreamingMap() throws Exception {
        final StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();

        final DataStream<String> letters = streamEnv.fromElements("a", "b", "c");
        final DataStream<String> upperCased = letters.map(String::toUpperCase);
        upperCased.print();

        streamEnv.execute("Map Test");
    }

    /** Keeps only the numbers greater than 3 out of 1..5. */
    @Test
    public void testStreamingFilter() throws Exception {
        final StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();

        final DataStream<Integer> oneToFive = streamEnv.fromElements(1, 2, 3, 4, 5);
        final DataStream<Integer> aboveThree = oneToFive.filter(x -> x > 3);
        aboveThree.print();

        streamEnv.execute("Filter Test");
    }
}

Common Patterns

Cluster Configuration

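Base classes derived from AbstractTestBase expose setters for the cluster parameters; a test that extends AbstractTestBase directly can call them from its constructor: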

/** Constructor fragment: passes a fresh Configuration to the base class, then sets the cluster sizing. */
public MyTest() {
    super(new Configuration());
    // Sizing is applied in the constructor, i.e. before any test method runs.
    setNumTaskManagers(2);
    setTaskManagerNumSlots(8);
}

Temporary File Management

Use the provided temporary file utilities for test data:

// Create a temporary file with the given contents; the returned string is kept as the input file reference.
String inputFile = createTempFile("test-input.txt", "test data content");
// Obtain a temporary directory path for output (whether the directory is created is not shown here; confirm).
String outputDir = getTempDirPath("test-output");

Test Lifecycle Hooks

Override lifecycle methods for custom setup/teardown:

/** Lifecycle hook; override it to prepare state the job needs. Empty in this template. */
@Override
protected void preSubmit() throws Exception {
    // Setup before job execution
}

/** Lifecycle hook; override it to check the job's results. Empty in this template. */
@Override  
protected void postSubmit() throws Exception {
    // Validation after job execution
}