Comprehensive test library for the Apache Flink stream processing framework, with integration tests, test utilities, and end-to-end validation tests.
—
Framework for testing job cancellation scenarios and cleanup behavior. This framework enables validation of proper cancellation handling, resource cleanup, and graceful shutdown behavior in Flink jobs.
Abstract base class providing a framework for testing job cancellation scenarios with controlled timing and validation.
/**
* Base class for testing job cancellation scenarios
*/
public abstract class CancelingTestBase {
/**
* Runs the given job and cancels it after a fixed delay.
*
* @param jobGraph the {@code JobGraph} to execute and then cancel
* @param cancelAfterMs time to wait before cancelling, in milliseconds
* @throws Exception if job execution or cancellation fails
*/
protected void runAndCancelJob(JobGraph jobGraph, long cancelAfterMs) throws Exception;
/**
* Runs the given job and cancels it once a given number of elements has been processed.
*
* @param jobGraph the {@code JobGraph} to execute
* @param cancelAfterElements number of elements to process before the job is cancelled
* @throws Exception if execution or cancellation fails
*/
protected void runAndCancelJobAfterElements(JobGraph jobGraph, int cancelAfterElements) throws Exception;
/**
* Runs the given job and issues several cancellation attempts against it, to test that
* cancellation is robust when requested repeatedly.
*
* @param jobGraph the {@code JobGraph} to execute
* @param cancellationAttempts number of cancellation attempts to make
* @param intervalMs interval between cancellation attempts, in milliseconds
* @throws Exception if execution fails
*/
protected void runJobWithMultipleCancellations(
JobGraph jobGraph,
int cancellationAttempts,
long intervalMs) throws Exception;
/**
* Validates that a job was properly cancelled and that its resources were cleaned up.
*
* @param jobId identifier of the cancelled job
* @return {@code true} if the cancellation validation succeeded
*/
protected boolean validateJobCancellation(JobID jobId);
/**
* Creates a test job configured for cancellation testing.
*
* @param sourceParallelism parallelism for the source operators
* @param processingParallelism parallelism for the processing operators
* @return a {@code JobGraph} configured for cancellation testing
*/
protected JobGraph createCancellationTestJob(int sourceParallelism, int processingParallelism);
}

Specialized source functions designed for cancellation testing with controllable behavior and cancellation detection.
/**
* Source function that can be gracefully cancelled, used for testing cancellation behavior.
* Only the API surface is listed here; method bodies are not shown.
*/
public class CancellableSource implements SourceFunction<Integer> {
/**
* Creates a cancellable source.
*
* @param maxElements maximum number of elements to emit, or -1 for an infinite stream
* @param emissionIntervalMs interval between element emissions (the name suggests milliseconds)
*/
public CancellableSource(int maxElements, long emissionIntervalMs);
/**
* Source entry point required by {@code SourceFunction}.
* NOTE(review): presumably emits integers through {@code ctx} until {@code maxElements}
* is reached or {@link #cancel()} is called; confirm against the implementation.
*
* @param ctx context used to emit elements
* @throws Exception declared by the signature; failure conditions are not specified here
*/
@Override
public void run(SourceContext<Integer> ctx) throws Exception;
/**
* Cancellation hook required by {@code SourceFunction}.
*/
@Override
public void cancel();
/**
* Checks whether the source was cancelled gracefully.
*
* @return {@code true} if the source was cancelled gracefully
*/
public boolean wasCancelledGracefully();
/**
* Gets the number of elements emitted before cancellation.
*
* @return count of emitted elements
*/
public int getElementsEmittedBeforeCancellation();
}
/**
* Source that triggers its own cancellation after specified conditions
*/
public class SelfCancellingSource implements SourceFunction<String> {
/**
* Creates a self-cancelling source.
*
* @param cancelAfterElements number of elements to emit before the source cancels itself
* @param cancellationMessage message to emit upon cancellation
*/
public SelfCancellingSource(int cancelAfterElements, String cancellationMessage);
/**
* Source entry point required by {@code SourceFunction}; the body is not shown in this listing.
* NOTE(review): presumably emits strings through {@code ctx} and stops on its own after
* {@code cancelAfterElements} elements; confirm against the implementation.
*
* @param ctx context used to emit elements
* @throws Exception declared by the signature; failure conditions are not specified here
*/
@Override
public void run(SourceContext<String> ctx) throws Exception;
/**
* Cancellation hook required by {@code SourceFunction}; the body is not shown in this listing.
*/
@Override
public void cancel();
/**
* Checks whether the source cancelled itself as expected.
*
* @return {@code true} if the expected self-cancellation took place
*/
public boolean didSelfCancel();
}

Map functions and operators designed to test cancellation behavior during processing.
/**
* Map function that can detect and respond to cancellation signals.
* Only the API surface is listed here; method bodies are not shown.
*/
public class CancellationAwareMapper implements MapFunction<Integer, Integer> {
/**
* Creates a cancellation-aware mapper.
*
* @param processingDelayMs delay applied per element to simulate processing time
*     (the name suggests milliseconds)
*/
public CancellationAwareMapper(long processingDelayMs);
/**
* Maps one input element, as required by {@code MapFunction}.
* NOTE(review): the returned value is not specified in this listing; confirm whether the
* input is passed through unchanged.
*
* @param value the input element
* @return the mapped element
* @throws Exception declared by the signature; failure conditions are not specified here
*/
@Override
public Integer map(Integer value) throws Exception;
/**
* Checks whether the mapper was interrupted during processing.
*
* @return {@code true} if an interruption occurred during processing
*/
public boolean wasInterruptedDuringProcessing();
/**
* Gets the number of elements processed before cancellation.
*
* @return count of processed elements
*/
public int getElementsProcessedBeforeCancellation();
}
/**
* Map function that simulates long-running processing for cancellation testing
*/
public class LongRunningMapper implements MapFunction<String, String> {
/**
* Creates a long-running mapper.
*
* @param processingTimeMs time to spend processing each element (the name suggests milliseconds)
* @param checkCancellationInterval interval at which cancellation is checked; the unit is not
*     stated here (presumably milliseconds, TODO confirm)
*/
public LongRunningMapper(long processingTimeMs, long checkCancellationInterval);
/**
* Maps one input element, as required by {@code MapFunction}; the body is not shown in this
* listing. NOTE(review): presumably spends about {@code processingTimeMs} per element;
* confirm against the implementation.
*
* @param value the input element
* @return the mapped element
* @throws Exception declared by the signature; failure conditions are not specified here
*/
@Override
public String map(String value) throws Exception;
/**
* Checks whether processing was cancelled cleanly.
*
* @return {@code true} if the cancellation was clean
*/
public boolean wasCancelledCleanly();
}

Sink functions designed to validate cancellation behavior and resource cleanup.
/**
* Sink that tracks cancellation behavior and resource cleanup.
* Only the API surface is listed here; method bodies are not shown.
*
* @param <T> type of the elements received by the sink
*/
public class CancellationTrackingSink<T> implements SinkFunction<T> {
/**
* Creates a cancellation-tracking sink.
*
* @param expectedElements number of elements expected before cancellation
*/
public CancellationTrackingSink(int expectedElements);
/**
* Receives one element, as required by {@code SinkFunction}.
*
* @param value the element delivered to the sink
* @param context sink context supplied by the runtime
* @throws Exception declared by the signature; failure conditions are not specified here
*/
@Override
public void invoke(T value, Context context) throws Exception;
/**
* Checks whether the sink received a cancellation signal.
*
* @return {@code true} if a cancellation signal was received
*/
public boolean receivedCancellationSignal();
/**
* Gets the number of elements received before cancellation.
*
* @return count of received elements
*/
public int getElementsReceivedBeforeCancellation();
/**
* Validates that resources were properly cleaned up after cancellation.
*
* @return {@code true} if resources were cleaned up properly
*/
public boolean validateResourceCleanup();
}
/**
* Sink that can block to test cancellation during blocking operations
*/
public class BlockingSink<T> implements SinkFunction<T> {
/**
* Creates a blocking sink.
*
* @param blockAfterElements number of elements to process before the sink starts blocking
* @param blockDurationMs how long to block, in milliseconds
*/
public BlockingSink(int blockAfterElements, long blockDurationMs);
/**
* Receives one element, as required by {@code SinkFunction}; the body is not shown in this
* listing. NOTE(review): presumably blocks for {@code blockDurationMs} once
* {@code blockAfterElements} elements have been seen; confirm against the implementation.
*
* @param value the element delivered to the sink
* @param context sink context supplied by the runtime
* @throws Exception declared by the signature; failure conditions are not specified here
*/
@Override
public void invoke(T value, Context context) throws Exception;
/**
* Checks whether the sink was cancelled while it was blocked.
*
* @return {@code true} if cancellation happened during blocking
*/
public boolean wasCancelledWhileBlocked();
}

Utility classes for common cancellation testing operations and validation.
/**
* Static utilities for cancellation testing scenarios.
* Only the API surface is listed here; method bodies are not shown.
*/
public class CancellationTestUtils {
/**
* Creates a job graph configured for cancellation testing.
*
* @param sourceCount number of source operators
* @param processingChainLength length of the processing chain
* @param sinkCount number of sink operators
* @return a {@code JobGraph} configured for cancellation testing
*/
public static JobGraph createCancellationTestJob(
int sourceCount,
int processingChainLength,
int sinkCount);
/**
* Executes a job and cancels it after a fixed delay.
*
* @param jobGraph the job to execute
* @param miniCluster the cluster on which the job is executed
* @param cancelAfterMs time to wait before cancelling, in milliseconds
* @return a {@code CancellationResult} containing the cancellation details
* @throws Exception if execution or cancellation fails
*/
public static CancellationResult executeJobWithCancellation(
JobGraph jobGraph,
MiniCluster miniCluster,
long cancelAfterMs) throws Exception;
/**
* Validates cancellation behavior across all operators.
*
* @param cancellationResult result obtained from a cancellation test
* @return {@code true} if the cancellation behavior was correct
*/
public static boolean validateCancellationBehavior(CancellationResult cancellationResult);
/**
* Monitors the progress of a job cancellation.
* NOTE(review): no cluster handle is passed in, so how the job is located is not visible
* here; confirm against the implementation.
*
* @param jobId identifier of the job being cancelled
* @param timeoutMs timeout for the cancellation to complete, in milliseconds
* @return a {@code CancellationProgress} containing the progress details
*/
public static CancellationProgress monitorCancellationProgress(
JobID jobId,
long timeoutMs);
}
/**
* Result of a job cancellation test.
* Only the API surface is listed here; method bodies are not shown.
*/
public class CancellationResult {
/**
* Checks whether the job was cancelled successfully.
*
* @return {@code true} if the cancellation succeeded
*/
public boolean wasCancelledSuccessfully();
/**
* Gets the time the cancellation took to complete.
*
* @return cancellation duration in milliseconds
*/
public long getCancellationDurationMs();
/**
* Gets the number of operators that completed cancellation.
*
* @return count of operators whose cancellation completed
*/
public int getOperatorsWithCompletedCancellation();
/**
* Gets the operators that failed to cancel properly.
*
* @return list of IDs of the operators whose cancellation failed
*/
public List<String> getOperatorsWithFailedCancellation();
/**
* Checks whether all resources were cleaned up after cancellation.
*
* @return {@code true} if resource cleanup was complete
*/
public boolean wereAllResourcesCleanedUp();
}
/**
* Progress tracking for job cancellation
*/
public class CancellationProgress {
/**
* Checks whether the cancellation is complete.
*
* @return {@code true} if the cancellation has completed
*/
public boolean isCancellationComplete();
/**
* Gets how far the cancellation has progressed.
*
* @return completion as a fraction from 0.0 to 1.0 (despite the method name, not 0 to 100)
*/
public double getCancellationCompletionPercentage();
/**
* Gets the operators that are still processing the cancellation.
*
* @return list of IDs of the operators that are still cancelling
*/
public List<String> getOperatorsStillCancelling();
}

Usage Examples:
import org.apache.flink.test.cancelling.CancelingTestBase;
// Basic cancellation test
public class JobCancellationTest extends CancelingTestBase {

    /** Cancels a generated job after a fixed five-second delay and validates the outcome. */
    @Test
    public void testSimpleJobCancellation() throws Exception {
        final JobGraph jobGraph = createCancellationTestJob(1, 2);

        // Let the job run for 5 seconds before it is cancelled.
        runAndCancelJob(jobGraph, 5000L);

        final boolean cancelledProperly = validateJobCancellation(jobGraph.getJobID());
        assertTrue(cancelledProperly);
    }

    /** Builds a source -> mapper -> sink job by hand and cancels it after 100 elements. */
    @Test
    public void testCancellationAfterElementProcessing() throws Exception {
        final JobGraph jobGraph = new JobGraph();

        // Source: unbounded emission (-1), with an emission interval of 100.
        final JobVertex sourceVertex = new JobVertex("cancellable-source");
        sourceVertex.setInvokableClass(CancellableSource.class);
        sourceVertex.getConfiguration().setInteger("max-elements", -1);
        sourceVertex.getConfiguration().setLong("emission-interval", 100L);
        sourceVertex.setParallelism(1);

        // Processing stage: two parallel mappers with a per-element delay of 50.
        final JobVertex mapperVertex = new JobVertex("cancellation-aware-mapper");
        mapperVertex.setInvokableClass(CancellationAwareMapper.class);
        mapperVertex.getConfiguration().setLong("processing-delay", 50L);
        mapperVertex.setParallelism(2);

        // Sink: records what arrives before the job is cancelled.
        final JobVertex sinkVertex = new JobVertex("cancellation-tracking-sink");
        sinkVertex.setInvokableClass(CancellationTrackingSink.class);
        sinkVertex.getConfiguration().setInteger("expected-elements", 100);
        sinkVertex.setParallelism(1);

        // Wire the stages together before registering them with the graph.
        // NOTE(review): confirm these DistributionPattern constants and this overload of
        // connectNewDataSetAsInput against the Flink version in use.
        mapperVertex.connectNewDataSetAsInput(sourceVertex, DistributionPattern.REBALANCE);
        sinkVertex.connectNewDataSetAsInput(mapperVertex, DistributionPattern.FORWARD);

        for (JobVertex vertex : new JobVertex[] {sourceVertex, mapperVertex, sinkVertex}) {
            jobGraph.addVertex(vertex);
        }

        // Cancel once 100 elements have been processed.
        runAndCancelJobAfterElements(jobGraph, 100);
    }

    /** Issues three cancellation attempts, one second apart, against a utility-built job. */
    @Test
    public void testMultipleCancellationAttempts() throws Exception {
        final JobGraph jobGraph = CancellationTestUtils.createCancellationTestJob(2, 3, 1);

        runJobWithMultipleCancellations(jobGraph, 3, 1000L);
    }
}
// Advanced cancellation scenarios
public class AdvancedCancellationTest extends CancelingTestBase {

    /**
     * Cancels a job two seconds after start while its mapper is slow and its sink blocks,
     * then checks that cancellation succeeded and all resources were cleaned up.
     *
     * @throws Exception if cluster start-up, job execution, or cancellation fails
     */
    @Test
    public void testCancellationDuringLongProcessing() throws Exception {
        JobGraph job = new JobGraph();

        // Source with controlled emission
        JobVertex source = new JobVertex("controlled-source");
        source.setInvokableClass(CancellableSource.class);
        source.getConfiguration().setInteger("max-elements", 1000);
        source.getConfiguration().setLong("emission-interval", 10L);
        source.setParallelism(1);

        // Long-running mapper
        JobVertex mapper = new JobVertex("long-running-mapper");
        mapper.setInvokableClass(LongRunningMapper.class);
        mapper.getConfiguration().setLong("processing-time", 1000L);
        mapper.getConfiguration().setLong("check-interval", 100L);
        mapper.setParallelism(1);

        // Blocking sink
        JobVertex sink = new JobVertex("blocking-sink");
        sink.setInvokableClass(BlockingSink.class);
        sink.getConfiguration().setInteger("block-after", 10);
        sink.getConfiguration().setLong("block-duration", 5000L);
        sink.setParallelism(1);

        job.addVertex(source);
        job.addVertex(mapper);
        job.addVertex(sink);

        // Connect the stages.
        // NOTE(review): confirm DistributionPattern.FORWARD and this overload of
        // connectNewDataSetAsInput against the Flink version in use.
        mapper.connectNewDataSetAsInput(source, DistributionPattern.FORWARD);
        sink.connectNewDataSetAsInput(mapper, DistributionPattern.FORWARD);

        // try-with-resources: the cluster is shut down even when an assertion below fails,
        // so a failing test does not leave a running MiniCluster behind.
        try (MiniCluster miniCluster = new MiniCluster(createTestConfiguration())) {
            miniCluster.start();

            CancellationResult result = CancellationTestUtils.executeJobWithCancellation(
                    job, miniCluster, 2000L);

            // Validate cancellation behavior
            assertTrue(CancellationTestUtils.validateCancellationBehavior(result));
            assertTrue(result.wasCancelledSuccessfully());
            assertTrue(result.wereAllResourcesCleanedUp());
        }
    }

    /**
     * Runs a job whose source is configured to cancel itself after 50 elements and checks
     * that blocking execution returns a result.
     *
     * @throws Exception if cluster start-up or job execution fails
     */
    @Test
    public void testSelfCancellingJob() throws Exception {
        JobGraph job = new JobGraph();

        // Self-cancelling source
        JobVertex source = new JobVertex("self-cancelling-source");
        source.setInvokableClass(SelfCancellingSource.class);
        source.getConfiguration().setInteger("cancel-after", 50);
        source.getConfiguration().setString("cancellation-message", "Self-cancelled");
        source.setParallelism(1);

        JobVertex sink = new JobVertex("tracking-sink");
        sink.setInvokableClass(CancellationTrackingSink.class);
        sink.getConfiguration().setInteger("expected-elements", 50);
        sink.setParallelism(1);

        sink.connectNewDataSetAsInput(source, DistributionPattern.FORWARD);
        job.addVertex(source);
        job.addVertex(sink);

        // Execute and let the source cancel itself; the cluster is closed on every exit path.
        try (MiniCluster miniCluster = new MiniCluster(createTestConfiguration())) {
            miniCluster.start();

            JobExecutionResult result = miniCluster.executeJobBlocking(job);

            // Job should complete due to self-cancellation
            assertNotNull(result);
        }
    }
}
// Cancellation progress monitoring
public class CancellationMonitoringTest {
/**
 * Starts a large job asynchronously, cancels it after two seconds, and checks that the
 * progress monitor reports the cancellation as fully complete within 30 seconds.
 *
 * @throws Exception if cluster start-up, job submission, or cancellation fails
 */
@Test
public void testCancellationProgressMonitoring() throws Exception {
    JobGraph largeJob = CancellationTestUtils.createCancellationTestJob(5, 10, 3);

    // try-with-resources: the cluster is shut down even when the sleep is interrupted,
    // monitoring throws, or an assertion fails.
    try (MiniCluster miniCluster = new MiniCluster(createTestConfiguration())) {
        miniCluster.start();

        // Start job execution
        CompletableFuture<JobExecutionResult> executionFuture =
                miniCluster.executeJobAsync(largeJob);

        // Wait briefly then cancel
        Thread.sleep(2000);
        miniCluster.cancelJob(largeJob.getJobID());

        // Monitor cancellation progress
        CancellationProgress progress = CancellationTestUtils.monitorCancellationProgress(
                largeJob.getJobID(), 30000L);

        // Validate progress tracking
        assertTrue(progress.isCancellationComplete());
        assertEquals(1.0, progress.getCancellationCompletionPercentage(), 0.01);
        assertTrue(progress.getOperatorsStillCancelling().isEmpty());
    }
}
}

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-tests