# Cancellation Testing Framework

Framework for testing job cancellation scenarios and cleanup behavior in Flink jobs. It validates that jobs handle cancellation correctly, release their resources, and shut down gracefully.

## Capabilities

### Canceling Test Base

Abstract base class for testing job cancellation scenarios with controlled timing and validation.

```java { .api }
/**
12
* Base class for testing job cancellation scenarios
13
*/
14
public abstract class CancelingTestBase {
15
16
/**
17
* Run job with controlled cancellation after specified time
18
* @param jobGraph JobGraph to execute and cancel
19
* @param cancelAfterMs milliseconds to wait before cancellation
20
* @throws Exception if job execution or cancellation fails
21
*/
22
protected void runAndCancelJob(JobGraph jobGraph, long cancelAfterMs) throws Exception;
23
24
/**
25
* Run job and cancel after processing specified number of elements
26
* @param jobGraph JobGraph to execute
27
* @param cancelAfterElements number of elements to process before cancellation
28
* @throws Exception if execution or cancellation fails
29
*/
30
protected void runAndCancelJobAfterElements(JobGraph jobGraph, int cancelAfterElements) throws Exception;
31
32
/**
33
* Run job with multiple cancellation attempts to test robustness
34
* @param jobGraph JobGraph to execute
35
* @param cancellationAttempts number of cancellation attempts
36
* @param intervalMs interval between cancellation attempts
37
* @throws Exception if execution fails
38
*/
39
protected void runJobWithMultipleCancellations(
40
JobGraph jobGraph,
41
int cancellationAttempts,
42
long intervalMs) throws Exception;
43
44
/**
45
* Validate that job was properly cancelled and resources cleaned up
46
* @param jobId identifier of cancelled job
47
* @return boolean indicating successful cancellation validation
48
*/
49
protected boolean validateJobCancellation(JobID jobId);
50
51
/**
52
* Create test job configured for cancellation testing
53
* @param sourceParallelism parallelism for source operators
54
* @param processingParallelism parallelism for processing operators
55
* @return JobGraph configured for cancellation testing
56
*/
57
protected JobGraph createCancellationTestJob(int sourceParallelism, int processingParallelism);
58
}
```

### Cancellation Test Sources

Source functions for cancellation testing, with controllable emission behavior and detection of cancellation.

```java { .api }
/**
67
* Source function that can be gracefully cancelled for testing cancellation behavior
68
*/
69
public class CancellableSource implements SourceFunction<Integer> {
70
71
/**
72
* Constructor for cancellable source
73
* @param maxElements maximum elements to emit (or -1 for infinite)
74
* @param emissionIntervalMs interval between element emissions
75
*/
76
public CancellableSource(int maxElements, long emissionIntervalMs);
77
78
@Override
79
public void run(SourceContext<Integer> ctx) throws Exception;
80
81
@Override
82
public void cancel();
83
84
/**
85
* Check if source was cancelled gracefully
86
* @return boolean indicating graceful cancellation
87
*/
88
public boolean wasCancelledGracefully();
89
90
/**
91
* Get number of elements emitted before cancellation
92
* @return int count of emitted elements
93
*/
94
public int getElementsEmittedBeforeCancellation();
95
}
96
97
/**
98
* Source that triggers its own cancellation after specified conditions
99
*/
100
public class SelfCancellingSource implements SourceFunction<String> {
101
102
/**
103
* Constructor for self-cancelling source
104
* @param cancelAfterElements elements to emit before self-cancellation
105
* @param cancellationMessage message to emit upon cancellation
106
*/
107
public SelfCancellingSource(int cancelAfterElements, String cancellationMessage);
108
109
@Override
110
public void run(SourceContext<String> ctx) throws Exception;
111
112
@Override
113
public void cancel();
114
115
/**
116
* Check if source cancelled itself as expected
117
* @return boolean indicating expected self-cancellation
118
*/
119
public boolean didSelfCancel();
120
}
```

### Cancellation Test Operators

Map functions designed to test cancellation behavior while elements are being processed.

```java { .api }
/**
129
* Map function that can detect and respond to cancellation signals
130
*/
131
public class CancellationAwareMapper implements MapFunction<Integer, Integer> {
132
133
/**
134
* Constructor for cancellation-aware mapper
135
* @param processingDelayMs delay per element to simulate processing time
136
*/
137
public CancellationAwareMapper(long processingDelayMs);
138
139
@Override
140
public Integer map(Integer value) throws Exception;
141
142
/**
143
* Check if mapper was interrupted during processing
144
* @return boolean indicating interruption during processing
145
*/
146
public boolean wasInterruptedDuringProcessing();
147
148
/**
149
* Get number of elements processed before cancellation
150
* @return int count of processed elements
151
*/
152
public int getElementsProcessedBeforeCancellation();
153
}
154
155
/**
156
* Map function that simulates long-running processing for cancellation testing
157
*/
158
public class LongRunningMapper implements MapFunction<String, String> {
159
160
/**
161
* Constructor for long-running mapper
162
* @param processingTimeMs time to spend processing each element
163
* @param checkCancellationInterval interval to check for cancellation
164
*/
165
public LongRunningMapper(long processingTimeMs, long checkCancellationInterval);
166
167
@Override
168
public String map(String value) throws Exception;
169
170
/**
171
* Check if processing was cancelled cleanly
172
* @return boolean indicating clean cancellation
173
*/
174
public boolean wasCancelledCleanly();
175
}
```

### Cancellation Test Sinks

Sink functions designed to validate cancellation behavior and resource cleanup.

```java { .api }
/**
184
* Sink that tracks cancellation behavior and resource cleanup
185
*/
186
public class CancellationTrackingSink<T> implements SinkFunction<T> {
187
188
/**
189
* Constructor for cancellation tracking sink
190
* @param expectedElements expected elements before cancellation
191
*/
192
public CancellationTrackingSink(int expectedElements);
193
194
@Override
195
public void invoke(T value, Context context) throws Exception;
196
197
/**
198
* Check if sink received cancellation signal
199
* @return boolean indicating cancellation signal received
200
*/
201
public boolean receivedCancellationSignal();
202
203
/**
204
* Get number of elements received before cancellation
205
* @return int count of received elements
206
*/
207
public int getElementsReceivedBeforeCancellation();
208
209
/**
210
* Validate that resources were properly cleaned up after cancellation
211
* @return boolean indicating proper resource cleanup
212
*/
213
public boolean validateResourceCleanup();
214
}
215
216
/**
217
* Sink that can block to test cancellation during blocking operations
218
*/
219
public class BlockingSink<T> implements SinkFunction<T> {
220
221
/**
222
* Constructor for blocking sink
223
* @param blockAfterElements elements to process before blocking
224
* @param blockDurationMs duration to block in milliseconds
225
*/
226
public BlockingSink(int blockAfterElements, long blockDurationMs);
227
228
@Override
229
public void invoke(T value, Context context) throws Exception;
230
231
/**
232
* Check if sink was cancelled while blocked
233
* @return boolean indicating cancellation during blocking
234
*/
235
public boolean wasCancelledWhileBlocked();
236
}
```

### Cancellation Utilities

Utility classes for common cancellation-testing operations and result validation.

```java { .api }
/**
245
* Utilities for cancellation testing scenarios
246
*/
247
public class CancellationTestUtils {
248
249
/**
250
* Create job graph configured for cancellation testing
251
* @param sourceCount number of source operators
252
* @param processingChainLength length of processing chain
253
* @param sinkCount number of sink operators
254
* @return JobGraph configured for cancellation testing
255
*/
256
public static JobGraph createCancellationTestJob(
257
int sourceCount,
258
int processingChainLength,
259
int sinkCount);
260
261
/**
262
* Execute job with timed cancellation
263
* @param jobGraph job to execute
264
* @param miniCluster cluster for execution
265
* @param cancelAfterMs time before cancellation
266
* @return CancellationResult containing cancellation details
267
* @throws Exception if execution or cancellation fails
268
*/
269
public static CancellationResult executeJobWithCancellation(
270
JobGraph jobGraph,
271
MiniCluster miniCluster,
272
long cancelAfterMs) throws Exception;
273
274
/**
275
* Validate cancellation behavior across all operators
276
* @param cancellationResult result from cancellation test
277
* @return boolean indicating proper cancellation behavior
278
*/
279
public static boolean validateCancellationBehavior(CancellationResult cancellationResult);
280
281
/**
282
* Monitor job cancellation progress
283
* @param jobId identifier of job being cancelled
284
* @param timeoutMs timeout for cancellation completion
285
* @return CancellationProgress containing progress details
286
*/
287
public static CancellationProgress monitorCancellationProgress(
288
JobID jobId,
289
long timeoutMs);
290
}
291
292
/**
293
* Result of job cancellation test
294
*/
295
public class CancellationResult {
296
297
/**
298
* Check if job was cancelled successfully
299
* @return boolean indicating successful cancellation
300
*/
301
public boolean wasCancelledSuccessfully();
302
303
/**
304
* Get time taken for cancellation to complete
305
* @return long cancellation duration in milliseconds
306
*/
307
public long getCancellationDurationMs();
308
309
/**
310
* Get number of operators that completed cancellation
311
* @return int count of operators with completed cancellation
312
*/
313
public int getOperatorsWithCompletedCancellation();
314
315
/**
316
* Get list of operators that failed to cancel properly
317
* @return List of operator IDs that failed cancellation
318
*/
319
public List<String> getOperatorsWithFailedCancellation();
320
321
/**
322
* Check if all resources were cleaned up after cancellation
323
* @return boolean indicating complete resource cleanup
324
*/
325
public boolean wereAllResourcesCleanedUp();
326
}
327
328
/**
329
* Progress tracking for job cancellation
330
*/
331
public class CancellationProgress {
332
333
/**
334
* Check if cancellation is complete
335
* @return boolean indicating cancellation completion
336
*/
337
public boolean isCancellationComplete();
338
339
/**
340
* Get percentage of cancellation completion
341
* @return double percentage (0.0 to 1.0) of completion
342
*/
343
public double getCancellationCompletionPercentage();
344
345
/**
346
* Get list of operators still processing cancellation
347
* @return List of operator IDs still cancelling
348
*/
349
public List<String> getOperatorsStillCancelling();
350
}
```

**Usage Examples:**

```java
import org.apache.flink.test.cancelling.CancelingTestBase;
357
358
// Basic cancellation test
359
public class JobCancellationTest extends CancelingTestBase {
360
361
@Test
362
public void testSimpleJobCancellation() throws Exception {
363
// Create test job
364
JobGraph job = createCancellationTestJob(1, 2);
365
366
// Test cancellation after 5 seconds
367
runAndCancelJob(job, 5000L);
368
369
// Validate cancellation
370
assertTrue(validateJobCancellation(job.getJobID()));
371
}
372
373
@Test
374
public void testCancellationAfterElementProcessing() throws Exception {
375
JobGraph job = new JobGraph();
376
377
// Add cancellable source
378
JobVertex source = new JobVertex("cancellable-source");
379
source.setInvokableClass(CancellableSource.class);
380
source.getConfiguration().setInteger("max-elements", -1); // infinite
381
source.getConfiguration().setLong("emission-interval", 100L);
382
source.setParallelism(1);
383
384
// Add processing chain
385
JobVertex mapper = new JobVertex("cancellation-aware-mapper");
386
mapper.setInvokableClass(CancellationAwareMapper.class);
387
mapper.getConfiguration().setLong("processing-delay", 50L);
388
mapper.setParallelism(2);
389
390
// Add tracking sink
391
JobVertex sink = new JobVertex("cancellation-tracking-sink");
392
sink.setInvokableClass(CancellationTrackingSink.class);
393
sink.getConfiguration().setInteger("expected-elements", 100);
394
sink.setParallelism(1);
395
396
// Connect vertices
397
mapper.connectNewDataSetAsInput(source, DistributionPattern.REBALANCE);
398
sink.connectNewDataSetAsInput(mapper, DistributionPattern.FORWARD);
399
400
job.addVertex(source);
401
job.addVertex(mapper);
402
job.addVertex(sink);
403
404
// Test cancellation after processing 100 elements
405
runAndCancelJobAfterElements(job, 100);
406
}
407
408
@Test
409
public void testMultipleCancellationAttempts() throws Exception {
410
JobGraph robustJob = CancellationTestUtils.createCancellationTestJob(2, 3, 1);
411
412
// Test multiple cancellation attempts
413
runJobWithMultipleCancellations(robustJob, 3, 1000L);
414
}
415
}
416
417
// Advanced cancellation scenarios
418
public class AdvancedCancellationTest extends CancelingTestBase {
419
420
@Test
421
public void testCancellationDuringLongProcessing() throws Exception {
422
JobGraph job = new JobGraph();
423
424
// Source with controlled emission
425
JobVertex source = new JobVertex("controlled-source");
426
source.setInvokableClass(CancellableSource.class);
427
source.getConfiguration().setInteger("max-elements", 1000);
428
source.getConfiguration().setLong("emission-interval", 10L);
429
source.setParallelism(1);
430
431
// Long-running mapper
432
JobVertex mapper = new JobVertex("long-running-mapper");
433
mapper.setInvokableClass(LongRunningMapper.class);
434
mapper.getConfiguration().setLong("processing-time", 1000L);
435
mapper.getConfiguration().setLong("check-interval", 100L);
436
mapper.setParallelism(1);
437
438
// Blocking sink
439
JobVertex sink = new JobVertex("blocking-sink");
440
sink.setInvokableClass(BlockingSink.class);
441
sink.getConfiguration().setInteger("block-after", 10);
442
sink.getConfiguration().setLong("block-duration", 5000L);
443
sink.setParallelism(1);
444
445
job.addVertex(source);
446
job.addVertex(mapper);
447
job.addVertex(sink);
448
449
// Connect and test cancellation during blocking
450
mapper.connectNewDataSetAsInput(source, DistributionPattern.FORWARD);
451
sink.connectNewDataSetAsInput(mapper, DistributionPattern.FORWARD);
452
453
MiniCluster miniCluster = new MiniCluster(createTestConfiguration());
454
miniCluster.start();
455
456
CancellationResult result = CancellationTestUtils.executeJobWithCancellation(
457
job, miniCluster, 2000L);
458
459
// Validate cancellation behavior
460
assertTrue(CancellationTestUtils.validateCancellationBehavior(result));
461
assertTrue(result.wasCancelledSuccessfully());
462
assertTrue(result.wereAllResourcesCleanedUp());
463
464
miniCluster.close();
465
}
466
467
@Test
468
public void testSelfCancellingJob() throws Exception {
469
JobGraph job = new JobGraph();
470
471
// Self-cancelling source
472
JobVertex source = new JobVertex("self-cancelling-source");
473
source.setInvokableClass(SelfCancellingSource.class);
474
source.getConfiguration().setInteger("cancel-after", 50);
475
source.getConfiguration().setString("cancellation-message", "Self-cancelled");
476
source.setParallelism(1);
477
478
JobVertex sink = new JobVertex("tracking-sink");
479
sink.setInvokableClass(CancellationTrackingSink.class);
480
sink.getConfiguration().setInteger("expected-elements", 50);
481
sink.setParallelism(1);
482
483
sink.connectNewDataSetAsInput(source, DistributionPattern.FORWARD);
484
485
job.addVertex(source);
486
job.addVertex(sink);
487
488
// Execute and let source cancel itself
489
MiniCluster miniCluster = new MiniCluster(createTestConfiguration());
490
miniCluster.start();
491
492
JobExecutionResult result = miniCluster.executeJobBlocking(job);
493
494
// Job should complete due to self-cancellation
495
assertNotNull(result);
496
497
miniCluster.close();
498
}
499
}
500
501
// Cancellation progress monitoring
502
public class CancellationMonitoringTest {
503
504
@Test
505
public void testCancellationProgressMonitoring() throws Exception {
506
JobGraph largeJob = CancellationTestUtils.createCancellationTestJob(5, 10, 3);
507
508
MiniCluster miniCluster = new MiniCluster(createTestConfiguration());
509
miniCluster.start();
510
511
// Start job execution
512
CompletableFuture<JobExecutionResult> executionFuture =
513
miniCluster.executeJobAsync(largeJob);
514
515
// Wait briefly then cancel
516
Thread.sleep(2000);
517
miniCluster.cancelJob(largeJob.getJobID());
518
519
// Monitor cancellation progress
520
CancellationProgress progress = CancellationTestUtils.monitorCancellationProgress(
521
largeJob.getJobID(), 30000L);
522
523
// Validate progress tracking
524
assertTrue(progress.isCancellationComplete());
525
assertEquals(1.0, progress.getCancellationCompletionPercentage(), 0.01);
526
assertTrue(progress.getOperatorsStillCancelling().isEmpty());
527
528
miniCluster.close();
529
}
530
}
```