or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

class-loading-programs.md, index.md, migration-testing.md, performance-testing.md, state-management-testing.md, streaming-utilities.md, test-base-classes.md, test-data-generation.md
tile.json

docs/test-base-classes.md

Test Base Classes

These abstract base classes provide common patterns and infrastructure for different testing scenarios, including cancellation testing, fault tolerance testing, and recovery testing. They standardize test setup, execution patterns, and verification procedures across different types of Flink tests.

Capabilities

Stream Fault Tolerance Test Base

Abstract base class for testing streaming applications under fault tolerance conditions.

/**
 * Abstract base class for fault tolerance testing of streaming applications.
 *
 * <p>Subclasses build the job topology in {@link #testProgram} and validate its output in
 * {@link #postSubmit}. The component that actually runs the program against a test cluster
 * is not part of this listing.
 */
public abstract class StreamFaultToleranceTestBase extends TestLogger {
    
    // Test cluster configuration constants.
    // NOTE(review): presumably NUM_TASK_SLOTS is per task manager, giving 2 * 8 = 16 slots,
    // comfortably above PARALLELISM — confirm against the cluster setup.
    public static final int NUM_TASK_MANAGERS = 2;
    public static final int NUM_TASK_SLOTS = 8;
    public static final int PARALLELISM = 4;
    
    /**
     * Defines the streaming topology to be tested under fault conditions.
     *
     * <p>Note that this method declares no checked exceptions, so implementations must
     * handle (or wrap) any checked exception thrown while configuring the environment.
     *
     * @param env pre-configured StreamExecutionEnvironment to add sources, operators and sinks to
     */
    public abstract void testProgram(StreamExecutionEnvironment env);
    
    /**
     * Verifies test results after job completion.
     * Called after successful job execution to validate results.
     *
     * @throws Exception if verification fails
     */
    public abstract void postSubmit() throws Exception;
}

Usage Example:

public class MyFaultToleranceTest extends StreamFaultToleranceTestBase {

    /**
     * Sink collecting the job output.
     *
     * <p>Flink serializes sink functions and runs them in task threads, so a plain
     * {@code ArrayList} field passed into a custom sink is not the list that
     * {@link #postSubmit()} later reads, and parallel sink subtasks would append to it
     * concurrently. {@code TestListResultSink} exposes the collected records through
     * {@code getResult()} instead.
     */
    private TestListResultSink<String> resultSink;

    @Override
    public void testProgram(StreamExecutionEnvironment env) {
        // Configure environment for fault tolerance
        env.setParallelism(PARALLELISM);
        env.enableCheckpointing(100);
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 1000));

        resultSink = new TestListResultSink<>();

        // Build fault-tolerant topology
        env.addSource(new FaultTolerantSource())
           .keyBy(value -> value.getKey())
           .process(new StatefulProcessFunction())
           .addSink(resultSink);
    }

    @Override
    public void postSubmit() throws Exception {
        List<String> results = resultSink.getResult();

        // Verify results after fault tolerance test.
        // expectedResultCount and expectedResults are test-specific and must be defined
        // by the concrete test (not shown here).
        assertEquals(expectedResultCount, results.size());
        assertTrue("Missing expected results", results.containsAll(expectedResults));

        // Verify no duplicates after recovery.
        // NOTE(review): a list-collecting sink is presumably not transactional, so records
        // emitted before a failure may be appended again when the job replays from the last
        // checkpoint — confirm the source/operator rule this out, or this check can fail.
        Set<String> uniqueResults = new HashSet<>(results);
        assertEquals("Duplicate results detected", results.size(), uniqueResults.size());
    }

    @Test
    public void testFaultTolerance() throws Exception {
        // Test execution handled by base class infrastructure.
        // NOTE(review): runTest() is not declared in the StreamFaultToleranceTestBase listing;
        // confirm the base class provides it (or already drives testProgram/postSubmit itself).
        runTest();
    }
}

Canceling Test Base

Abstract base class for testing job cancellation scenarios and cleanup behavior.

/**
 * Abstract base class for testing job cancellation and cleanup.
 *
 * <p>Subclasses build a {@code Plan} and pass it to {@link #runAndCancelJob}, which runs the
 * job, requests cancellation after a delay and bounds how long cancellation may take.
 * This is an API signature listing: the body of {@code runAndCancelJob} is not shown.
 */
public abstract class CancelingTestBase extends TestLogger {
    
    // Test execution constants
    // Parallelism intended for jobs built by subclasses.
    protected static final int PARALLELISM = 4;
    // Timeout of 30 seconds for waiting on futures — NOTE(review): exact use not shown here.
    protected static final Duration GET_FUTURE_TIMEOUT = Duration.ofSeconds(30);
    
    /**
     * Runs a job and cancels it after the specified time.
     *
     * @param plan job execution plan to run and cancel
     * @param msecsTillCanceling milliseconds to wait before requesting cancellation
     * @param maxTimeTillCanceled maximum time to wait for cancellation to complete;
     *        presumably in milliseconds (callers pass 10000 for "10 seconds") — confirm
     * @throws Exception if job execution or cancellation fails
     */
    protected void runAndCancelJob(
        Plan plan, 
        int msecsTillCanceling, 
        int maxTimeTillCanceled) throws Exception;
}

Usage Example:

public class MyCancellationTest extends CancelingTestBase {

    /**
     * Starts a deliberately slow batch job and checks that it can be cancelled in time.
     */
    @Test
    public void testJobCancellation() throws Exception {
        final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(PARALLELISM);

        // A single input element routed through a slow mapper keeps the job busy so that
        // there is still something running when cancellation is requested.
        // NOTE(review): assumes SlowMapper takes longer than the 2000 ms cancel delay;
        // otherwise the job may finish before it is cancelled — confirm.
        environment.fromElements("data")
                .map(new SlowMapper())
                .output(new DiscardingOutputFormat<>());

        final int cancelAfterMillis = 2000;
        final int cancelTimeoutMillis = 10000;
        runAndCancelJob(environment.createProgramPlan(), cancelAfterMillis, cancelTimeoutMillis);

        // Reaching this point means the job was cancelled within the allowed time.
    }
}

Recovery Test Base Classes

Base classes for testing different recovery strategies and failure scenarios.

/**
 * Base class for simple recovery testing scenarios.
 *
 * <p>Subclasses implement {@link #executeRecoveryTest()} with the job to run and the checks
 * on its results. What invokes this method is not shown in this listing.
 */
public abstract class SimpleRecoveryITCaseBase extends TestLogger {
    
    /**
     * Executes the recovery scenario and verifies its outcome.
     *
     * @throws Exception if the job fails or the recovery checks do not hold
     */
    protected abstract void executeRecoveryTest() throws Exception;
}

/**
 * Base class for testing the fixed delay restart strategy.
 *
 * <p>API signature listing: the body of {@code getRestartStrategy()} is omitted. Subclasses
 * may override it and apply the result with
 * {@code StreamExecutionEnvironment#setRestartStrategy}.
 */
public abstract class SimpleRecoveryFixedDelayRestartStrategyITBase extends SimpleRecoveryITCaseBase {
    
    /**
     * Returns the restart strategy configuration for fixed delay restarts.
     *
     * @return restart strategy configured for fixed delay
     */
    protected RestartStrategies.RestartStrategyConfiguration getRestartStrategy();
}

/**
 * Base class for testing the failure rate restart strategy.
 *
 * <p>API signature listing: the body of {@code getRestartStrategy()} is omitted. Subclasses
 * may override it and apply the result with
 * {@code StreamExecutionEnvironment#setRestartStrategy}.
 */
public abstract class SimpleRecoveryFailureRateStrategyITBase extends SimpleRecoveryITCaseBase {
    
    /**
     * Returns the restart strategy configuration for failure rate limiting.
     *
     * @return restart strategy configured for failure rate limiting
     */
    protected RestartStrategies.RestartStrategyConfiguration getRestartStrategy();
}

Prefix Count POJO

Common data type used in fault tolerance and recovery testing.

/**
 * POJO for prefix counting tests in fault tolerance scenarios.
 *
 * <p>API signature listing: method bodies are omitted. The {@code static} modifier means
 * this class is nested inside an enclosing class that is not shown here.
 * NOTE(review): the public no-argument constructor and public fields match what Flink's
 * POJO type rules expect — keep them if the type is used as stream data.
 */
public static class PrefixCount {
    // The prefix string being counted.
    public String str;
    // The count associated with str.
    public long count;
    
    /**
     * Default (no-argument) constructor.
     */
    public PrefixCount();
    
    /**
     * Creates a count for the given prefix.
     *
     * @param str string prefix
     * @param count count value
     */
    public PrefixCount(String str, long count);
    
    /**
     * Checks equality based on the str and count fields.
     *
     * @param obj object to compare
     * @return true if equal
     */
    public boolean equals(Object obj);
    
    /**
     * Generates a hash code based on str and count, consistent with equals.
     *
     * @return hash code
     */
    public int hashCode();
    
    /**
     * Returns a string representation.
     *
     * @return formatted string
     */
    public String toString();
}

Test Base Usage Patterns

Common patterns for implementing tests using base classes:

Fault Tolerance Test Pattern:

public class StreamingJobFaultToleranceTest extends StreamFaultToleranceTestBase {

    private TestListResultSink<PrefixCount> resultSink;

    /**
     * Builds a keyed counting topology over a failure-injecting source, with checkpointing,
     * RocksDB state and a fixed-delay restart strategy so the job can recover.
     */
    @Override
    public void testProgram(StreamExecutionEnvironment env) {
        // Configure for fault tolerance
        env.setParallelism(PARALLELISM);
        env.enableCheckpointing(50);
        env.setStateBackend(createRocksDBStateBackend());
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 1000));

        resultSink = new TestListResultSink<>();

        // Create fault-tolerant streaming topology
        env.addSource(new FailingSource(1000, 500, 3))  // Fail after 500 elements
           .keyBy(value -> value.f0 % 4)
           .process(new CountingProcessFunction())
           .addSink(resultSink);
    }

    /**
     * Creates a RocksDB state backend that checkpoints into a fresh temporary directory.
     *
     * <p>The {@code RocksDBStateBackend} constructors declare {@code IOException}, which
     * {@code testProgram} cannot propagate (it has no throws clause), so the exception is
     * rethrown unchecked. A per-run directory also keeps repeated or concurrent runs from
     * sharing checkpoint data the way a fixed path such as {@code file:///tmp/test} would.
     * The directory is not deleted afterwards.
     */
    private static RocksDBStateBackend createRocksDBStateBackend() {
        try {
            java.nio.file.Path checkpointDir =
                    java.nio.file.Files.createTempDirectory("rocksdb-checkpoints");
            return new RocksDBStateBackend(checkpointDir.toUri().toString());
        } catch (java.io.IOException e) {
            throw new java.io.UncheckedIOException("Failed to set up RocksDB state backend", e);
        }
    }

    /**
     * Checks the recovered output: one PrefixCount per key, with counts summing to the
     * number of elements the source produced.
     */
    @Override
    public void postSubmit() throws Exception {
        List<PrefixCount> results = resultSink.getResult();

        // Verify all expected results are present after recovery.
        // NOTE(review): "one per key" assumes value.f0 is non-negative (so % 4 yields keys
        // 0..3) and that CountingProcessFunction emits a single final count per key — confirm.
        assertEquals(4, results.size());  // One per key

        // NOTE(review): 1000 presumably matches the first FailingSource argument — confirm.
        long totalCount = results.stream().mapToLong(pc -> pc.count).sum();
        assertEquals(1000, totalCount);  // All elements processed exactly once
    }
}

Cancellation Test Pattern:

public class LongRunningJobCancellationTest extends CancelingTestBase {

    /**
     * Cancels a bulk iteration that would otherwise keep running.
     */
    @Test
    public void testIterativeJobCancellation() throws Exception {
        final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(PARALLELISM);

        // Bulk iteration with Integer.MAX_VALUE supersteps and no termination criterion:
        // each superstep increments the single seed value, so in practice the job does not
        // finish on its own.
        final IterativeDataSet<Long> loop = environment.fromElements(1L).iterate(Integer.MAX_VALUE);
        loop.closeWith(loop.map(counter -> counter + 1))
            .output(new DiscardingOutputFormat<>());

        // Cancel after 3 seconds and allow up to 15 seconds for the cancellation to complete.
        runAndCancelJob(environment.createProgramPlan(), 3000, 15000);
    }
}

Recovery Strategy Test Pattern:

public class FixedDelayRecoveryTest extends SimpleRecoveryFixedDelayRestartStrategyITBase {

    /**
     * Runs a job whose source fails a few times and checks that all records still arrive.
     */
    @Override
    protected void executeRecoveryTest() throws Exception {
        final StreamExecutionEnvironment environment =
                StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(2);
        environment.setRestartStrategy(getRestartStrategy());
        environment.enableCheckpointing(100);

        final TestListResultSink<String> resultSink = new TestListResultSink<>();

        // The source is meant to fail twice before succeeding; getRestartStrategy() allows up
        // to three restarts, so the job is expected to complete.
        environment.addSource(new RecoveringSource(3, 100))
                .map(new StatelessMapper())
                .addSink(resultSink);

        environment.execute("Recovery Test");

        final List<String> collected = resultSink.getResult();
        // NOTE(review): a list-collecting sink is presumably not transactional, so records
        // emitted before a failure may be appended again on replay; the exact size of 100
        // assumes RecoveringSource does not re-emit after restarts — confirm.
        assertEquals(100, collected.size());
        assertTrue("Recovery failed", collected.stream().noneMatch(Objects::isNull));
    }

    /**
     * Fixed-delay restarts: at most three attempts, one second apart.
     */
    @Override
    protected RestartStrategies.RestartStrategyConfiguration getRestartStrategy() {
        final int maxRestartAttempts = 3;
        final long delayBetweenAttemptsMillis = 1000L;
        return RestartStrategies.fixedDelayRestart(maxRestartAttempts, delayBetweenAttemptsMillis);
    }
}

Multi-Phase Test Pattern:

public class MultiPhaseRecoveryTest extends SimpleRecoveryITCaseBase {

    /**
     * Runs three phases around one savepoint: create it, restore from it and verify the
     * results, then exercise failure recovery starting from it.
     */
    @Override
    protected void executeRecoveryTest() throws Exception {
        // Phase 1: Generate savepoint
        String savepointPath = runJobAndCreateSavepoint();

        // Phase 2: Restore and verify
        restoreJobAndVerifyResults(savepointPath);

        // Phase 3: Test failure recovery
        testFailureRecoveryFromSavepoint(savepointPath);
    }

    /**
     * Runs the job, triggers a savepoint and returns its path.
     *
     * <p>Until the savepoint logic assigns {@code savepointPath}, this fails fast with an
     * {@link IllegalStateException}, so the later phases never receive a null path.
     *
     * @return path of the created savepoint
     * @throws Exception if running the job or taking the savepoint fails
     */
    private String runJobAndCreateSavepoint() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        String savepointPath = null;
        // Implementation for savepoint generation: build the job on env, run it, trigger a
        // savepoint and assign the reported location to savepointPath.
        if (savepointPath == null) {
            throw new IllegalStateException("Savepoint generation is not implemented");
        }
        return savepointPath;
    }

    /**
     * Restores the job from the given savepoint and verifies its results.
     *
     * @param savepointPath path returned by {@link #runJobAndCreateSavepoint()}
     */
    private void restoreJobAndVerifyResults(String savepointPath) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Implementation for restoration and verification
    }

    /**
     * Restores from the given savepoint, injects a failure and verifies recovery.
     *
     * @param savepointPath path returned by {@link #runJobAndCreateSavepoint()}
     */
    private void testFailureRecoveryFromSavepoint(String savepointPath) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Implementation for failure recovery testing
    }
}

These test base classes provide standardized infrastructure for testing Flink applications, both streaming (DataStream) and batch (DataSet) jobs, under failure, cancellation, and recovery scenarios. They help verify that applications remain robust and reliable.