Comprehensive test library for the Apache Flink stream processing framework, providing integration tests, test utilities, and end-to-end validation tests.
—
Failure-injection mechanisms and recovery-testing utilities for validating Flink's fault-tolerance capabilities. They support testing job recovery scenarios, restart strategies, and failure-handling behavior.
Abstract base class for testing job recovery scenarios with configurable failure injection and restart validation.
/**
* Base class for testing job recovery scenarios.
*
* <p>Declares a helper that runs and then cancels a {@link JobGraph}, plus three
* {@link MapFunction} implementations that inject failures after a configurable
* number of elements.
*
* <p>NOTE(review): this listing shows signatures only; method bodies are not part of it.
*/
public abstract class SimpleRecoveryITCaseBase {
/**
* Runs the given job graph and cancels it, for recovery testing.
* NOTE(review): when cancellation is triggered relative to job start is not shown here — confirm.
* @param jobGraph JobGraph to execute and cancel
* @throws Exception if job execution or cancellation fails
*/
protected void runAndCancelJob(JobGraph jobGraph) throws Exception;
/**
* Failing mapper function that fails after processing a specified number of elements.
*/
public static class FailingMapper1 implements MapFunction<Integer, Integer> {
/**
* Creates a mapper that fails once its element threshold is reached.
* @param failAfterElements number of elements to process before failing
*/
public FailingMapper1(int failAfterElements);
/**
* Maps a single element; expected to throw once the configured threshold is reached.
* NOTE(review): whether non-failing calls return the input unchanged is not shown — confirm.
* @param value input element
* @return mapped element
* @throws Exception to simulate a task failure (assumed from the class description)
*/
@Override
public Integer map(Integer value) throws Exception;
}
/**
* Alternative failing mapper with a different failure pattern.
* NOTE(review): how its failure pattern differs from FailingMapper1 is not visible in
* this listing — document it once confirmed against the implementation.
*/
public static class FailingMapper2 implements MapFunction<Integer, Integer> {
/**
* Creates the alternative failing mapper.
* @param failAfterElements number of elements before failure
*/
public FailingMapper2(int failAfterElements);
/**
* Maps a single element; expected to throw according to this variant's failure pattern.
* @param value input element
* @return mapped element
* @throws Exception to simulate a task failure (assumed from the class description)
*/
@Override
public Integer map(Integer value) throws Exception;
}
/**
* Third failing-mapper variant, intended for complex failure scenarios.
* NOTE(review): what makes this variant suitable for "complex" scenarios is not visible
* in this listing — confirm against the implementation.
*/
public static class FailingMapper3 implements MapFunction<Integer, Integer> {
/**
* Creates the third failing-mapper variant.
* @param failAfterElements number of elements before failure
*/
public FailingMapper3(int failAfterElements);
/**
* Maps a single element; expected to throw according to this variant's failure pattern.
* @param value input element
* @return mapped element
* @throws Exception to simulate a task failure (assumed from the class description)
*/
@Override
public Integer map(Integer value) throws Exception;
}
}
Comprehensive parameterized base class for testing stream processing fault tolerance with checkpointing, failure injection, and recovery validation.
/**
* Parameterized base class for fault-tolerance testing of streaming programs.
*
* <p>Each subclass defines a topology ({@link #testProgram}) and a post-submission check
* ({@link #postSubmit}). The JUnit {@code Parameterized} runner executes the test once per
* {@link FailoverStrategy} returned by {@link #parameters()}.
*
* <p>NOTE(review): this listing shows signatures only; method bodies are not part of it.
*/
@RunWith(Parameterized.class)
public abstract class StreamFaultToleranceTestBase {
/** Default parallelism for fault tolerance tests; equals NUM_TASK_MANAGERS * NUM_TASK_SLOTS (3 * 4). */
public static final int PARALLELISM = 12;
/** Number of task managers for test cluster */
public static final int NUM_TASK_MANAGERS = 3;
/** Number of task slots per task manager */
public static final int NUM_TASK_SLOTS = 4;
/**
* Failover strategies the test is parameterized over.
*/
public enum FailoverStrategy {
/** Presumably restarts all tasks of the job on failure — confirm against the test configuration. */
RestartAllFailoverStrategy,
/** Presumably restarts only the pipelined region(s) affected by a failure — confirm. */
RestartPipelinedRegionFailoverStrategy
}
/**
* Mutable POJO holding a prefix, a value, and an occurrence count, used as the
* element type of {@link #testProgram}'s result stream.
*/
public static class PrefixCount {
/** Prefix string */
public String prefix;
/** Integer value */
public Integer value;
/** Count of occurrences */
public Long count;
/**
* No-argument constructor for PrefixCount.
* NOTE(review): presumably required so Flink can treat the class as a POJO — confirm.
*/
public PrefixCount();
/**
* Creates a PrefixCount with all fields initialized.
* @param prefix prefix string
* @param value integer value
* @param count occurrence count
*/
public PrefixCount(String prefix, Integer value, Long count);
}
/**
* Failover strategy under test for the current parameterized run.
*/
protected final FailoverStrategy failoverStrategy;
/**
* Constructor invoked by the Parameterized runner with one element of {@link #parameters()}.
* @param failoverStrategy failover strategy to test
*/
public StreamFaultToleranceTestBase(FailoverStrategy failoverStrategy);
/**
* Defines the test program topology.
* @param env StreamExecutionEnvironment for building the job
* @return DataStream representing the final result
* @throws Exception if program construction fails
*/
public abstract DataStream<PrefixCount> testProgram(StreamExecutionEnvironment env) throws Exception;
/**
* Runs subclass-specific actions and validation after the job has been submitted.
* @throws Exception if post-submission actions fail
*/
public abstract void postSubmit() throws Exception;
/**
* Runs the complete checkpointed program with fault injection.
* NOTE(review): presumably builds the topology via testProgram, executes it, and then
* calls postSubmit — confirm the exact sequence against the implementation.
* @throws Exception if test execution fails
*/
public void runCheckpointedProgram() throws Exception;
/**
* Supplies the parameter sets for the Parameterized runner.
* Each array is passed to the single-argument constructor, so each is expected to hold
* exactly one FailoverStrategy; {0} in the test name refers to that element.
* @return Collection of failover strategy parameters
*/
@Parameterized.Parameters(name = "Failover strategy: {0}")
public static Collection<FailoverStrategy[]> parameters();
/**
* Creates the test environment configuration with fault-tolerance settings.
* @return Configuration for test environment
*/
protected Configuration createTestConfiguration();
/**
* Triggers a failure in the running job for fault-tolerance testing.
* @param jobId JobID of the running job
* @throws Exception if failure triggering fails
*/
protected void triggerFailure(JobID jobId) throws Exception;
/**
* Waits for the job to reach the running state.
* @param jobId JobID to monitor
* @param timeout timeout in milliseconds
* @throws Exception if job doesn't reach running state within timeout
*/
protected void waitForJobRunning(JobID jobId, long timeout) throws Exception;
/**
* Validates checkpointing behavior.
* NOTE(review): the criteria for "working correctly" are not shown here — confirm.
* @param jobId JobID to validate
* @return boolean indicating if checkpointing is working correctly
* @throws Exception if validation fails
*/
protected boolean validateCheckpointing(JobID jobId) throws Exception;
}
Abstract base classes for testing different restart strategies with Flink's fault tolerance mechanisms.
/**
* Base class for testing the fixed-delay restart strategy.
* NOTE(review): signature-only listing; the implementation is not shown.
*/
public abstract class SimpleRecoveryFixedDelayRestartStrategyITBase
extends SimpleRecoveryITCaseBase {
/**
* Tests recovery with a fixed delay between restart attempts.
* @param delayMs delay in milliseconds between restarts
* @param maxAttempts maximum number of restart attempts
* @throws Exception if test execution fails
*/
protected void testFixedDelayRestart(long delayMs, int maxAttempts) throws Exception;
}
/**
* Base class for testing the exponential-delay restart strategy.
* NOTE(review): signature-only listing; the implementation is not shown.
*/
public abstract class SimpleRecoveryExponentialDelayRestartStrategyITBase
extends SimpleRecoveryITCaseBase {
/**
* Tests recovery with exponential backoff between restart attempts.
* @param initialDelayMs initial delay in milliseconds
* @param maxDelayMs maximum delay in milliseconds (presumably caps the growing backoff — confirm)
* @param backoffMultiplier multiplier for exponential backoff (presumably applied to the delay
*        after each restart — confirm)
* @throws Exception if test execution fails
*/
protected void testExponentialDelayRestart(
long initialDelayMs,
long maxDelayMs,
double backoffMultiplier) throws Exception;
}
/**
* Base class for testing the failure-rate restart strategy.
* NOTE(review): signature-only listing; the implementation is not shown.
*/
public abstract class SimpleRecoveryFailureRateStrategyITBase
extends SimpleRecoveryITCaseBase {
/**
* Tests recovery governed by a failure-rate threshold.
* NOTE(review): presumably the job is restarted until more than maxFailuresPerInterval
* failures occur within failureRateIntervalMs — confirm against the implementation.
* @param maxFailuresPerInterval maximum failures allowed per time interval
* @param failureRateIntervalMs time interval for failure rate calculation
* @param delayMs delay between restart attempts
* @throws Exception if test execution fails
*/
protected void testFailureRateRestart(
int maxFailuresPerInterval,
long failureRateIntervalMs,
long delayMs) throws Exception;
}
Utility classes providing common recovery testing functionality and helper methods.
/**
* Static helpers for recovery testing scenarios.
* NOTE(review): signature-only listing; the implementation is not shown.
*/
public class RecoveryTestUtils {
/**
* Creates a job graph with configurable failure injection.
* @param sourceParallelism parallelism for source operator
* @param mapParallelism parallelism for map operator
* @param failAfterElements elements to process before failure
* @return JobGraph configured for recovery testing
*/
public static JobGraph createJobWithFailure(
int sourceParallelism,
int mapParallelism,
int failAfterElements);
/**
* Validates job recovery behavior against an expected restart count.
* NOTE(review): how restarts are derived from the JobExecutionResult is not shown — confirm.
* @param jobExecutionResult result from job execution
* @param expectedRestarts expected number of restarts
* @return boolean indicating if recovery behavior is correct
*/
public static boolean validateRecoveryBehavior(
JobExecutionResult jobExecutionResult,
int expectedRestarts);
}
Usage Examples:
import org.apache.flink.test.recovery.SimpleRecoveryITCaseBase;
import org.apache.flink.test.recovery.SimpleRecoveryFixedDelayRestartStrategyITBase;
import org.apache.flink.test.recovery.utils.RecoveryTestUtils;
// Basic recovery test
public class JobRecoveryTest extends SimpleRecoveryITCaseBase {

    /** Builds a two-vertex job (source -> failing mapper) and runs/cancels it via the base class. */
    @Test
    public void testSimpleJobRecovery() throws Exception {
        // Create job with failing mapper
        JobGraph job = new JobGraph();

        // Add source
        // NOTE(review): JobVertex#setInvokableClass expects a task invokable class; confirm that
        // NumberSequenceSource (a source) and FailingMapper1 (a MapFunction) are accepted here.
        JobVertex source = new JobVertex("source");
        source.setInvokableClass(NumberSequenceSource.class);
        source.setParallelism(1);
        job.addVertex(source);

        // Add failing mapper
        JobVertex mapper = new JobVertex("mapper");
        mapper.setInvokableClass(FailingMapper1.class);
        mapper.getConfiguration().setInteger("fail-after", 50);
        mapper.setParallelism(2);
        job.addVertex(mapper);

        // Connect vertices. DistributionPattern only defines ALL_TO_ALL and POINTWISE (there is
        // no REBALANCE constant); ALL_TO_ALL connects every source subtask to every mapper
        // subtask, which is the closest match to a rebalancing exchange.
        mapper.connectNewDataSetAsInput(source, DistributionPattern.ALL_TO_ALL);

        // Test recovery
        runAndCancelJob(job);
    }

    /** Configures a fixed-delay restart strategy and checks the observed restart count. */
    @Test
    public void testMultipleFailureRecovery() throws Exception {
        JobGraph job = RecoveryTestUtils.createJobWithFailure(1, 2, 30);

        // Configure restart strategy
        // NOTE(review): confirm that the cluster picks up restart-strategy options from the
        // job configuration rather than only from the cluster/execution configuration.
        job.getJobConfiguration().setString(
                "restart-strategy", "fixed-delay");
        job.getJobConfiguration().setString(
                "restart-strategy.fixed-delay.attempts", "3");
        job.getJobConfiguration().setString(
                "restart-strategy.fixed-delay.delay", "1s");

        // NOTE(review): runJobWithExpectedFailures is not declared by SimpleRecoveryITCaseBase;
        // this test (or a shared helper) must provide it for the example to compile.
        JobExecutionResult result = runJobWithExpectedFailures(job);

        // Validate recovery behavior
        assertTrue(RecoveryTestUtils.validateRecoveryBehavior(result, 2));
    }
}
// Fixed delay restart strategy test
public class FixedDelayRecoveryTest extends SimpleRecoveryFixedDelayRestartStrategyITBase {

    /** Fixed-delay restarts with a 2 second pause and at most 3 attempts. */
    @Test
    public void testFixedDelayStrategy() throws Exception {
        final long delayMs = 2000L;
        final int maxAttempts = 3;
        testFixedDelayRestart(delayMs, maxAttempts);
    }

    /** Quick-recovery variant: 500 ms pause between restarts and at most 5 attempts. */
    @Test
    public void testFixedDelayWithQuickRecovery() throws Exception {
        final long delayMs = 500L;
        final int maxAttempts = 5;
        testFixedDelayRestart(delayMs, maxAttempts);
    }
}
// Comprehensive recovery testing
public class ComprehensiveRecoveryTest extends SimpleRecoveryITCaseBase {

    /** Chains source -> three failing mappers -> sink and runs/cancels the job via the base class. */
    @Test
    public void testCascadingFailures() throws Exception {
        JobGraph job = new JobGraph();

        // Chain multiple failing mappers
        // NOTE(review): createSourceVertex, createMapperVertex and createSinkVertex are not
        // declared by SimpleRecoveryITCaseBase; they must be supplied by this test (or a shared
        // helper) for the example to compile.
        JobVertex source = createSourceVertex();
        JobVertex mapper1 = createMapperVertex(new FailingMapper1(20));
        JobVertex mapper2 = createMapperVertex(new FailingMapper2(40));
        JobVertex mapper3 = createMapperVertex(new FailingMapper3(60));
        JobVertex sink = createSinkVertex();

        // Connect in a chain. DistributionPattern has no FORWARD constant; POINTWISE is the
        // pattern that wires producer and consumer subtasks pointwise (one-to-one when both
        // sides have the same parallelism).
        mapper1.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE);
        mapper2.connectNewDataSetAsInput(mapper1, DistributionPattern.POINTWISE);
        mapper3.connectNewDataSetAsInput(mapper2, DistributionPattern.POINTWISE);
        sink.connectNewDataSetAsInput(mapper3, DistributionPattern.POINTWISE);

        job.addVertex(source);
        job.addVertex(mapper1);
        job.addVertex(mapper2);
        job.addVertex(mapper3);
        job.addVertex(sink);

        // Test complex recovery scenario
        runAndCancelJob(job);
    }
}
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-tests