or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

api-completeness.md, checkpointing.md, data-generation.md, execution.md, fault-tolerance.md, index.md, streaming.md
tile.json

docs/fault-tolerance.md

Fault Tolerance

Base classes and utilities for testing job cancellation, task-failure recovery, and other fault-tolerance mechanisms. They provide controlled failure injection and helpers for verifying that jobs recover correctly.

Core Base Classes

CancelingTestBase

Abstract base class providing infrastructure for testing job cancellation scenarios with cluster management and execution control.

/**
 * Base class for job-cancellation tests.
 *
 * <p>Signature summary only: method bodies are omitted from this listing. Subclasses describe
 * the job in {@code testProgram} / {@code getJobGraph} and drive it with
 * {@code runAndCancelJob}.
 */
public abstract class CancelingTestBase {
    // Mini cluster the test jobs run on; presumably managed by startCluster()/stopCluster().
    protected LocalFlinkMiniCluster cluster;
    // Cluster configuration; presumably handed to the cluster when it starts -- confirm.
    protected Configuration config;
    
    // JUnit runs this before each @Test method.
    @Before
    public void setup() throws Exception;
    
    // JUnit runs this after each @Test method.
    @After
    public void cleanup() throws Exception;
    
    // Cluster lifecycle management
    protected void startCluster() throws Exception;
    protected void stopCluster() throws Exception;
    
    // Job execution and cancellation.
    // msecsTillCanceling: delay in milliseconds before cancellation is requested (presumably
    // measured from job submission).
    // waitForCancel: presumably makes the call block until the job has actually been
    // cancelled -- confirm.
    protected void runAndCancelJob(JobGraph jobGraph, int msecsTillCanceling) throws Exception;
    protected void runAndCancelJob(JobGraph jobGraph, int msecsTillCanceling, boolean waitForCancel) throws Exception;
    
    // Abstract methods for test implementation
    // testProgram: adds the job's sources, transformations and sinks to env.
    protected abstract void testProgram(ExecutionEnvironment env);
    // getJobGraph: returns the JobGraph to submit; implementations typically build it by
    // calling testProgram() on a fresh ExecutionEnvironment.
    protected abstract JobGraph getJobGraph() throws Exception;
}

SimpleRecoveryITCaseBase

Abstract base class for testing task failure recovery scenarios with multi-attempt execution and failure injection.

/**
 * Base class for task-failure recovery tests.
 *
 * <p>Signature summary only: method bodies are omitted from this listing. Subclasses define
 * the job in {@code testProgram} and may override the {@code preSubmit}/{@code postSubmit}
 * hooks; test methods call {@code execute()} to run it.
 */
public abstract class SimpleRecoveryITCaseBase {
    protected LocalFlinkMiniCluster cluster;
    protected Configuration config;
    // Parallelism for the test job; presumably applied to the ExecutionEnvironment -- confirm.
    protected int parallelism;
    
    // JUnit runs this before each @Test method.
    @Before
    public void setup() throws Exception;
    
    // JUnit runs this after each @Test method.
    @After  
    public void cleanup() throws Exception;
    
    // Recovery testing workflow
    // execute(): runs the test job.
    // preSubmit()/postSubmit(): hooks presumably invoked before and after the job runs;
    // postSubmit() is the natural place for result verification.
    protected void execute() throws Exception;
    protected void preSubmit() throws Exception;
    protected void postSubmit() throws Exception;
    
    // Abstract test program definition
    protected abstract void testProgram(ExecutionEnvironment env);
}

Recovery Execution Environment

RecoveryITCaseBase

Extended base class providing additional recovery testing capabilities with configurable parallelism and failure scenarios.

/**
 * Recovery-test base class that adds cluster-shape setup and failure-injection helpers to
 * {@code SimpleRecoveryITCaseBase}.
 *
 * <p>Signature summary only: method bodies are omitted from this listing.
 */
public abstract class RecoveryITCaseBase extends SimpleRecoveryITCaseBase {
    // Cluster shape; presumably what setupCluster() configures.
    protected int numTaskManagers;
    protected int slotsPerTaskManager;
    
    public RecoveryITCaseBase();
    // Presumably populates the inherited config / parallelism fields -- confirm.
    public RecoveryITCaseBase(Configuration config, int parallelism);
    
    // Extended setup with custom configuration
    protected void setupCluster(Configuration config, int numTaskManagers, int slotsPerTaskManager) throws Exception;
    
    // Failure injection utilities
    // injectTaskFailure: presumably makes the task at taskIndex of job jobId fail.
    // waitForRecovery: presumably blocks until the job has recovered from an injected failure.
    protected void injectTaskFailure(JobID jobId, int taskIndex) throws Exception;
    protected void waitForRecovery(JobID jobId) throws Exception;
}

Cancellation Testing Utilities

CancelableInfiniteInputFormat

Input format that produces an unbounded sequence of records, so a job reading from it runs until it is cancelled. Intended for cancellation testing.

/**
 * Unbounded Integer input format for cancellation tests.
 *
 * <p>Signature summary only: method bodies are omitted from this listing.
 */
public class CancelableInfiniteInputFormat extends GenericInputFormat<Integer> {
    // Presumably set by cancel() and checked by reachedEnd(). It is volatile so that a write from
    // the cancelling thread is visible to the thread reading records.
    private volatile boolean canceled;
    
    public CancelableInfiniteInputFormat();
    
    @Override
    public boolean reachedEnd();
    
    @Override
    public Integer nextRecord(Integer reuse);
    
    // NOTE(review): Flink's InputFormat / GenericInputFormat do not appear to declare cancel().
    // If the Flink version in use does not, this @Override will not compile -- confirm.
    @Override
    public void cancel();
}

SlowlyDeserializingInputFormat

Input format with controllable deserialization delays for timeout and cancellation testing.

/**
 * Integer input format with a configurable per-record delay and record count, for timeout
 * and cancellation tests.
 *
 * <p>Signature summary only: method bodies are omitted from this listing.
 */
public class SlowlyDeserializingInputFormat extends GenericInputFormat<Integer> {
    // Delay per record. The unit is presumably milliseconds: the timeout example passes 1000 and
    // calls it "1 sec per element".
    private long deserializationDelay;
    // Number of records to produce, after which reachedEnd() presumably returns true.
    private int elementsToReturn;
    
    public SlowlyDeserializingInputFormat(long deserializationDelay, int elementsToReturn);
    
    @Override
    public boolean reachedEnd();
    
    @Override
    public Integer nextRecord(Integer reuse);
}

Recovery Testing Functions

FailingMapper

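MapFunction that intentionally fails after processing a specified number of elements. The processed-element counter is a static field, shared by every FailingMapper instance in the JVM, so call `FailingMapper.reset()` before each test.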

/**
 * Map function with identical input and output type that fails on purpose after a configured
 * number of elements.
 *
 * <p>Signature summary only: method bodies are omitted from this listing.
 */
public class FailingMapper<T> implements MapFunction<T, T> {
    // Element count after which map() presumably throws.
    private int failAfterElements;
    // Static: shared by every FailingMapper instance loaded by the same class loader (and it
    // survives from one test to the next), hence reset().
    // NOTE(review): volatile gives visibility only. If map() does processedElements++ from
    // several parallel subtasks, updates can be lost; consider an AtomicInteger.
    private static volatile int processedElements;
    
    public FailingMapper(int failAfterElements);
    
    @Override
    public T map(T value) throws Exception;
    
    // Presumably clears processedElements; call before each test.
    public static void reset();
}

RecoveringFunction

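Base class for functions that track failure and recovery across restart attempts. Its attempt counter and failure flag are static fields, so call `RecoveringFunction.reset()` between tests.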

/**
 * Base class for functions that track failure and recovery across restart attempts.
 *
 * <p>Signature summary only: method bodies are omitted from this listing. All state is held in
 * static fields, so it is shared by every subclass instance loaded by the same class loader.
 * It presumably has to be cleared with {@code reset()} between tests.
 */
public abstract class RecoveringFunction {
    // NOTE(review): volatile alone does not make attemptNumber++ atomic when several parallel
    // subtasks update it; consider AtomicInteger if that happens.
    protected static volatile int attemptNumber;
    protected static volatile boolean hasFailed;
    
    // trackAttempt(): presumably updates attemptNumber. Confirm whether it is meant to be called
    // once per attempt or once per record.
    // shouldFail(): presumably decides whether the current attempt should fail.
    // simulateFailure(): presumably throws to make the task fail (declared `throws Exception`).
    protected void trackAttempt();
    protected boolean shouldFail();
    protected void simulateFailure() throws Exception;
    
    public static void reset();
    public static int getAttemptNumber();
}

Usage Examples

Job Cancellation Testing

public class MyCancellationTest extends CancelingTestBase {
    
    @Test
    public void testJobCancellation() throws Exception {
        JobGraph jobGraph = getJobGraph();
        
        // Run job and cancel after 5 seconds
        runAndCancelJob(jobGraph, 5000);
        
        // Verify clean cancellation
        assertTrue("Job should be cancelled", jobWasCancelled);
    }
    
    @Override
    protected void testProgram(ExecutionEnvironment env) {
        // Create a long-running job that can be cancelled
        env.createInput(new CancelableInfiniteInputFormat())
           .map(x -> x * 2)
           .map(new SlowProcessingMapper()) // Adds processing delay
           .writeAsText("/tmp/cancellation-test-output");
    }
    
    @Override
    protected JobGraph getJobGraph() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);
        testProgram(env);
        return env.createProgramPlan().getJobGraph();
    }
}

Recovery Testing with Failure Injection

public class MyRecoveryTest extends SimpleRecoveryITCaseBase {
    
    @Test
    public void testTaskFailureRecovery() throws Exception {
        FailingMapper.reset(); // Reset failure counter
        execute(); // Run test with recovery
    }
    
    @Override
    protected void testProgram(ExecutionEnvironment env) {
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 1000));
        
        env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
           .map(new FailingMapper<>(5)) // Fails after 5 elements
           .map(new RecoveringMapper()) // Handles recovery
           .writeAsText("/tmp/recovery-test-output");
    }
    
    @Override
    protected void postSubmit() throws Exception {
        // Verify that recovery occurred
        assertTrue("Should have failed at least once", FailingMapper.hasFailed());
        assertTrue("Should have recovered", RecoveringMapper.hasRecovered());
        
        // Verify output correctness
        verifyOutputFile("/tmp/recovery-test-output");
    }
}

Custom Recovery Function Implementation

/**
 * Example map function that fails once, on its first attempt, after three records, and then
 * succeeds after the restart.
 *
 * <p>Extending {@code RecoveringFunction} rules out also extending {@code RichMapFunction}
 * (Java has single inheritance). This class therefore has no {@code open()} hook and no
 * {@code getRuntimeContext()}, since plain {@code MapFunction} declares only {@code map}.
 * Keyed {@code ValueState} is not available to DataSet (batch) functions in any case.
 *
 * <p>The record counter is instead a plain instance field. A restarted task works with a
 * freshly deserialized function instance, so the counter starts again from zero on each
 * attempt.
 */
public class StatefulRecoveringMapper extends RecoveringFunction implements MapFunction<Integer, Integer> {

    /** Records processed by this instance during the current attempt. */
    private int count;

    /** Whether this instance has already registered its attempt via trackAttempt(). */
    private boolean attemptTracked;

    @Override
    public Integer map(Integer value) throws Exception {
        // Register the attempt once per function instance instead of once per record, so the
        // attempt counter counts restarts rather than records.
        // NOTE(review): assumes trackAttempt() bumps the shared counter on each call. With
        // parallelism > 1, every parallel instance registers, so the counter rises per attempt by
        // the parallelism -- confirm the intended semantics.
        if (!attemptTracked) {
            trackAttempt();
            attemptTracked = true;
        }

        count++;

        // Fail on first attempt after processing 3 elements
        if (getAttemptNumber() == 1 && count == 3) {
            simulateFailure();
        }

        return value * count;
    }
}

Advanced Cancellation with Timeout

public class TimeoutCancellationTest extends CancelingTestBase {
    
    @Test
    public void testCancellationWithTimeout() throws Exception {
        JobGraph jobGraph = createSlowJobGraph();
        
        long startTime = System.currentTimeMillis();
        
        // Cancel job after 3 seconds, wait for cancellation
        runAndCancelJob(jobGraph, 3000, true);
        
        long duration = System.currentTimeMillis() - startTime;
        
        // Verify cancellation happened within reasonable time
        assertTrue("Cancellation took too long", duration < 10000);
    }
    
    private JobGraph createSlowJobGraph() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        
        env.createInput(new SlowlyDeserializingInputFormat(1000, 100)) // 1 sec per element
           .map(x -> {
               Thread.sleep(500); // Additional processing delay
               return x;
           })
           .writeAsText("/tmp/slow-job-output");
           
        return env.createProgramPlan().getJobGraph();
    }
}

Multi-Stage Recovery Testing

/**
 * Example multi-stage recovery test that runs with a fixed-delay restart strategy configured
 * on the cluster.
 *
 * <p>{@code RecoveringFilter}, {@code ValidatingMapper} and {@code validateFinalResults} are
 * placeholders for user-supplied code.
 */
public class ComplexRecoveryTest extends RecoveryITCaseBase {

    public ComplexRecoveryTest() {
        // The restart strategy must be in the Configuration given to the base class. The @Before
        // setup() runs before each @Test method, so edits to `config` inside the test body would
        // arrive after the cluster has presumably already been started.
        super(createRestartConfiguration(), 4); // 4 parallel instances
    }

    /** Builds a configuration allowing up to 5 restarts, 1 s apart. */
    private static Configuration createRestartConfiguration() {
        Configuration restartConfig = new Configuration();
        restartConfig.setString(ConfigConstants.RESTART_STRATEGY, "fixed-delay");
        restartConfig.setInteger(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 5);
        restartConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "1s");
        return restartConfig;
    }

    @Test
    public void testMultiStageRecovery() throws Exception {
        // Failure/attempt counters are static and survive across tests in the same JVM.
        FailingMapper.reset();
        RecoveringFunction.reset();
        execute();
    }

    @Override
    protected void testProgram(ExecutionEnvironment env) {
        env.fromCollection(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
           .map(new FailingMapper<>(3)) // Fail after 3 elements
           .filter(new RecoveringFilter()) // Filter with recovery logic
           .map(new ValidatingMapper()) // Validate state consistency
           // collect() would execute the program eagerly inside testProgram(). A discarding sink
           // only declares the sink and leaves running the job to execute().
           .output(new DiscardingOutputFormat<>());
    }

    @Override
    protected void postSubmit() throws Exception {
        // getAttemptNumber() is a static member of RecoveringFunction, which this class does not
        // extend, so it must be qualified.
        assertTrue("Should have multiple attempts", RecoveringFunction.getAttemptNumber() > 1);

        // Verify final state consistency
        validateFinalResults();
    }
}