Base classes and utilities for testing job cancellation, task failure recovery, and fault tolerance mechanisms. Provides controlled failure injection and recovery validation for comprehensive resilience testing.
Abstract base class providing infrastructure for testing job cancellation scenarios with cluster management and execution control.
public abstract class CancelingTestBase {
protected LocalFlinkMiniCluster cluster;
protected Configuration config;
@Before
public void setup() throws Exception;
@After
public void cleanup() throws Exception;
// Cluster lifecycle management
protected void startCluster() throws Exception;
protected void stopCluster() throws Exception;
// Job execution and cancellation
protected void runAndCancelJob(JobGraph jobGraph, int msecsTillCanceling) throws Exception;
protected void runAndCancelJob(JobGraph jobGraph, int msecsTillCanceling, boolean waitForCancel) throws Exception;
// Abstract methods for test implementation
protected abstract void testProgram(ExecutionEnvironment env);
protected abstract JobGraph getJobGraph() throws Exception;
}Abstract base class for testing task failure recovery scenarios with multi-attempt execution and failure injection.
public abstract class SimpleRecoveryITCaseBase {
protected LocalFlinkMiniCluster cluster;
protected Configuration config;
protected int parallelism;
@Before
public void setup() throws Exception;
@After
public void cleanup() throws Exception;
// Recovery testing workflow
protected void execute() throws Exception;
protected void preSubmit() throws Exception;
protected void postSubmit() throws Exception;
// Abstract test program definition
protected abstract void testProgram(ExecutionEnvironment env);
}Extended base class providing additional recovery testing capabilities with configurable parallelism and failure scenarios.
public abstract class RecoveryITCaseBase extends SimpleRecoveryITCaseBase {
protected int numTaskManagers;
protected int slotsPerTaskManager;
public RecoveryITCaseBase();
public RecoveryITCaseBase(Configuration config, int parallelism);
// Extended setup with custom configuration
protected void setupCluster(Configuration config, int numTaskManagers, int slotsPerTaskManager) throws Exception;
// Failure injection utilities
protected void injectTaskFailure(JobID jobId, int taskIndex) throws Exception;
protected void waitForRecovery(JobID jobId) throws Exception;
}Input format that generates infinite data streams for cancellation testing.
public class CancelableInfiniteInputFormat extends GenericInputFormat<Integer> {
private volatile boolean canceled;
public CancelableInfiniteInputFormat();
@Override
public boolean reachedEnd();
@Override
public Integer nextRecord(Integer reuse);
@Override
public void cancel();
}Input format with controllable deserialization delays for timeout and cancellation testing.
public class SlowlyDeserializingInputFormat extends GenericInputFormat<Integer> {
private long deserializationDelay;
private int elementsToReturn;
public SlowlyDeserializingInputFormat(long deserializationDelay, int elementsToReturn);
@Override
public boolean reachedEnd();
@Override
public Integer nextRecord(Integer reuse);
}MapFunction that intentionally fails after processing a specified number of elements.
public class FailingMapper<T> implements MapFunction<T, T> {
private int failAfterElements;
private static volatile int processedElements;
public FailingMapper(int failAfterElements);
@Override
public T map(T value) throws Exception;
public static void reset();
}Base class for functions that track failure and recovery across restart attempts.
public abstract class RecoveringFunction {

    /**
     * Attempt counter. It is {@code static}, so it is shared by all subclasses and instances.
     *
     * <p>NOTE(review): {@code volatile} guarantees visibility only; if {@link #trackAttempt()}
     * increments this with {@code ++} from several parallel subtasks, updates can be lost.
     */
    protected static volatile int attemptNumber;

    /** Presumably whether a failure was already simulated. {@code static}, so shared by all subclasses. */
    protected static volatile boolean hasFailed;

    /** Records an attempt. NOTE(review): confirm whether it counts per call or per restart. */
    protected void trackAttempt();

    /** Decides whether the current invocation should fail. */
    protected boolean shouldFail();

    /** Simulates a task failure, presumably by throwing. */
    protected void simulateFailure() throws Exception;

    /** Resets the shared static state; call before each test. */
    public static void reset();

    /** Returns the current value of the shared attempt counter. */
    public static int getAttemptNumber();
}

/** Example: cancelling a long-running job a fixed time after it starts. */
public class MyCancellationTest extends CancelingTestBase {

    @Test
    public void testJobCancellation() throws Exception {
        JobGraph jobGraph = getJobGraph();

        // Run the job and cancel it after 5 seconds.
        runAndCancelJob(jobGraph, 5000);

        // Verify clean cancellation.
        // NOTE(review): jobWasCancelled is declared neither here nor in CancelingTestBase;
        // replace it with whatever cancellation state the base class actually exposes.
        assertTrue("Job should be cancelled", jobWasCancelled);
    }

    @Override
    protected void testProgram(ExecutionEnvironment env) {
        // A long-running job over an infinite input, so it only ends when cancelled.
        env.createInput(new CancelableInfiniteInputFormat())
                .map(x -> x * 2)
                .map(new SlowProcessingMapper()) // adds processing delay
                .writeAsText("/tmp/cancellation-test-output");
    }

    @Override
    protected JobGraph getJobGraph() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);
        testProgram(env);
        // NOTE(review): confirm that Plan exposes getJobGraph() in the Flink version in use.
        return env.createProgramPlan().getJobGraph();
    }
}

/** Example: a job that fails part-way and must recover under a fixed-delay restart strategy. */
public class MyRecoveryTest extends SimpleRecoveryITCaseBase {

    @Test
    public void testTaskFailureRecovery() throws Exception {
        FailingMapper.reset(); // clear the static state left over from earlier tests
        execute(); // run the job, including its recovery
    }

    @Override
    protected void testProgram(ExecutionEnvironment env) {
        // At most 3 restart attempts, 1000 ms apart.
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 1000));
        env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
                .map(new FailingMapper<>(5)) // fails after 5 elements
                .map(new RecoveringMapper()) // handles recovery
                .writeAsText("/tmp/recovery-test-output");
    }

    @Override
    protected void postSubmit() throws Exception {
        // Verify that recovery occurred.
        // NOTE(review): FailingMapper as listed declares no hasFailed(); confirm the accessor.
        assertTrue("Should have failed at least once", FailingMapper.hasFailed());
        assertTrue("Should have recovered", RecoveringMapper.hasRecovered());

        // Verify output correctness.
        verifyOutputFile("/tmp/recovery-test-output");
    }
}

/**
 * Example: a mapper that keeps a counter in Flink {@code ValueState} and fails once on its first
 * attempt.
 *
 * <p>NOTE(review): neither {@code MapFunction} nor {@code RecoveringFunction} (as listed) declares
 * {@code open(Configuration)} or {@code getRuntimeContext()}, so this needs a rich function to
 * compile. {@code getState(...)} presumably also requires a keyed context — confirm.
 */
public class StatefulRecoveringMapper extends RecoveringFunction implements MapFunction<Integer, Integer> {

    private ValueState<Integer> counterState;

    @Override
    public void open(Configuration parameters) throws Exception {
        ValueStateDescriptor<Integer> descriptor =
                new ValueStateDescriptor<>("counter", Integer.class);
        counterState = getRuntimeContext().getState(descriptor);
    }

    @Override
    public Integer map(Integer value) throws Exception {
        trackAttempt();

        Integer count = counterState.value();
        if (count == null) {
            count = 0; // no counter stored yet
        }
        count++;
        counterState.update(count);

        // Fail on the first attempt after processing 3 elements.
        // NOTE(review): trackAttempt() runs for every record; if it bumps attemptNumber per call,
        // getAttemptNumber() is already past 1 by the third element and this never fires.
        if (getAttemptNumber() == 1 && count == 3) {
            simulateFailure();
        }
        return value * count;
    }
}

/** Example: cancelling a slow job and checking that the whole run-and-cancel stays time-bounded. */
public class TimeoutCancellationTest extends CancelingTestBase {

    @Test
    public void testCancellationWithTimeout() throws Exception {
        JobGraph jobGraph = createSlowJobGraph();

        // nanoTime() is monotonic; currentTimeMillis() follows the wall clock and can jump,
        // which would make the measured duration meaningless.
        long startNanos = System.nanoTime();

        // Cancel the job after 3 seconds and wait for the cancellation.
        runAndCancelJob(jobGraph, 3000, true);

        long durationMillis = (System.nanoTime() - startNanos) / 1_000_000L;

        // The measured span includes the 3 s before cancelling, not just the cancellation itself.
        assertTrue("Cancellation took too long", durationMillis < 10000);
    }

    private JobGraph createSlowJobGraph() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.createInput(new SlowlyDeserializingInputFormat(1000, 100)) // 1 sec per element
                .map(x -> {
                    Thread.sleep(500); // additional processing delay
                    return x;
                })
                .writeAsText("/tmp/slow-job-output");
        // NOTE(review): confirm that Plan exposes getJobGraph() in the Flink version in use.
        return env.createProgramPlan().getJobGraph();
    }
}

/** Example: a multi-stage pipeline recovering under a configured fixed-delay restart strategy. */
public class ComplexRecoveryTest extends RecoveryITCaseBase {

    public ComplexRecoveryTest() {
        super(new Configuration(), 4); // 4 parallel instances
    }

    @Test
    public void testMultiStageRecovery() throws Exception {
        // Configure multiple restart attempts: fixed delay, 5 attempts, 1 s apart.
        // NOTE(review): config is changed after the @Before setup() has run; confirm that
        // execute() still picks these settings up.
        config.setString(ConfigConstants.RESTART_STRATEGY, "fixed-delay");
        config.setInteger(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 5);
        config.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "1s");
        execute();
    }

    @Override
    protected void testProgram(ExecutionEnvironment env) {
        // NOTE(review): DataSet.collect() declares a checked exception that testProgram() does
        // not, and it runs the job right here. Confirm this matches how execute() drives the job.
        env.fromCollection(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
                .map(new FailingMapper<>(3)) // fail after 3 elements
                .filter(new RecoveringFilter()) // filter with recovery logic
                .map(new ValidatingMapper()) // validate state consistency
                .collect(); // force execution
    }

    @Override
    protected void postSubmit() throws Exception {
        // Verify multiple recovery attempts occurred. getAttemptNumber() is a static member of
        // RecoveringFunction, which this class does not extend, so the call must be qualified.
        assertTrue("Should have multiple attempts", RecoveringFunction.getAttemptNumber() > 1);

        // Verify final state consistency.
        validateFinalResults();
    }
}