or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

cancellation-testing.md, checkpointing-migration.md, fault-tolerance-recovery.md, index.md, operator-lifecycle.md, plugin-testing.md, runtime-utilities.md, session-window-testing.md, state-backend-restore.md, test-data-utilities.md

docs/cancellation-testing.md

0

# Cancellation Testing Framework

1

2

Framework for testing job cancellation scenarios and cleanup behavior. This framework enables validation of proper cancellation handling, resource cleanup, and graceful shutdown behavior in Flink jobs.

3

4

## Capabilities

5

6

### Canceling Test Base

7

8

Abstract base class providing a framework for testing job cancellation scenarios with controlled timing and validation.

9

10

```java { .api }

11

/**

12

* Base class for testing job cancellation scenarios

13

*/

14

public abstract class CancelingTestBase {

15

16

/**

17

* Run job with controlled cancellation after specified time

18

* @param jobGraph JobGraph to execute and cancel

19

* @param cancelAfterMs milliseconds to wait before cancellation

20

* @throws Exception if job execution or cancellation fails

21

*/

22

protected void runAndCancelJob(JobGraph jobGraph, long cancelAfterMs) throws Exception;

23

24

/**

25

* Run job and cancel after processing specified number of elements

26

* @param jobGraph JobGraph to execute

27

* @param cancelAfterElements number of elements to process before cancellation

28

* @throws Exception if execution or cancellation fails

29

*/

30

protected void runAndCancelJobAfterElements(JobGraph jobGraph, int cancelAfterElements) throws Exception;

31

32

/**

33

* Run job with multiple cancellation attempts to test robustness

34

* @param jobGraph JobGraph to execute

35

* @param cancellationAttempts number of cancellation attempts

36

* @param intervalMs interval between cancellation attempts

37

* @throws Exception if execution fails

38

*/

39

protected void runJobWithMultipleCancellations(

40

JobGraph jobGraph,

41

int cancellationAttempts,

42

long intervalMs) throws Exception;

43

44

/**

45

* Validate that job was properly cancelled and resources cleaned up

46

* @param jobId identifier of cancelled job

47

* @return boolean indicating successful cancellation validation

48

*/

49

protected boolean validateJobCancellation(JobID jobId);

50

51

/**

52

* Create test job configured for cancellation testing

53

* @param sourceParallelism parallelism for source operators

54

* @param processingParallelism parallelism for processing operators

55

* @return JobGraph configured for cancellation testing

56

*/

57

protected JobGraph createCancellationTestJob(int sourceParallelism, int processingParallelism);

58

}

59

```

60

61

### Cancellation Test Sources

62

63

Specialized source functions designed for cancellation testing with controllable behavior and cancellation detection.

64

65

```java { .api }

66

/**

67

* Source function that can be gracefully cancelled for testing cancellation behavior

68

*/

69

public class CancellableSource implements SourceFunction<Integer> {

70

71

/**

72

* Constructor for cancellable source

73

* @param maxElements maximum elements to emit (or -1 for infinite)

74

* @param emissionIntervalMs interval between element emissions

75

*/

76

public CancellableSource(int maxElements, long emissionIntervalMs);

77

78

@Override

79

public void run(SourceContext<Integer> ctx) throws Exception;

80

81

@Override

82

public void cancel();

83

84

/**

85

* Check if source was cancelled gracefully

86

* @return boolean indicating graceful cancellation

87

*/

88

public boolean wasCancelledGracefully();

89

90

/**

91

* Get number of elements emitted before cancellation

92

* @return int count of emitted elements

93

*/

94

public int getElementsEmittedBeforeCancellation();

95

}

96

97

/**

98

* Source that triggers its own cancellation after specified conditions

99

*/

100

public class SelfCancellingSource implements SourceFunction<String> {

101

102

/**

103

* Constructor for self-cancelling source

104

* @param cancelAfterElements elements to emit before self-cancellation

105

* @param cancellationMessage message to emit upon cancellation

106

*/

107

public SelfCancellingSource(int cancelAfterElements, String cancellationMessage);

108

109

@Override

110

public void run(SourceContext<String> ctx) throws Exception;

111

112

@Override

113

public void cancel();

114

115

/**

116

* Check if source cancelled itself as expected

117

* @return boolean indicating expected self-cancellation

118

*/

119

public boolean didSelfCancel();

120

}

121

```

122

123

### Cancellation Test Operators

124

125

Map functions and operators designed to test cancellation behavior during processing.

126

127

```java { .api }

128

/**

129

* Map function that can detect and respond to cancellation signals

130

*/

131

public class CancellationAwareMapper implements MapFunction<Integer, Integer> {

132

133

/**

134

* Constructor for cancellation-aware mapper

135

* @param processingDelayMs delay per element to simulate processing time

136

*/

137

public CancellationAwareMapper(long processingDelayMs);

138

139

@Override

140

public Integer map(Integer value) throws Exception;

141

142

/**

143

* Check if mapper was interrupted during processing

144

* @return boolean indicating interruption during processing

145

*/

146

public boolean wasInterruptedDuringProcessing();

147

148

/**

149

* Get number of elements processed before cancellation

150

* @return int count of processed elements

151

*/

152

public int getElementsProcessedBeforeCancellation();

153

}

154

155

/**

156

* Map function that simulates long-running processing for cancellation testing

157

*/

158

public class LongRunningMapper implements MapFunction<String, String> {

159

160

/**

161

* Constructor for long-running mapper

162

* @param processingTimeMs time to spend processing each element

163

* @param checkCancellationInterval interval to check for cancellation

164

*/

165

public LongRunningMapper(long processingTimeMs, long checkCancellationInterval);

166

167

@Override

168

public String map(String value) throws Exception;

169

170

/**

171

* Check if processing was cancelled cleanly

172

* @return boolean indicating clean cancellation

173

*/

174

public boolean wasCancelledCleanly();

175

}

176

```

177

178

### Cancellation Test Sinks

179

180

Sink functions designed to validate cancellation behavior and resource cleanup.

181

182

```java { .api }

183

/**

184

* Sink that tracks cancellation behavior and resource cleanup

185

*/

186

public class CancellationTrackingSink<T> implements SinkFunction<T> {

187

188

/**

189

* Constructor for cancellation tracking sink

190

* @param expectedElements expected elements before cancellation

191

*/

192

public CancellationTrackingSink(int expectedElements);

193

194

@Override

195

public void invoke(T value, Context context) throws Exception;

196

197

/**

198

* Check if sink received cancellation signal

199

* @return boolean indicating cancellation signal received

200

*/

201

public boolean receivedCancellationSignal();

202

203

/**

204

* Get number of elements received before cancellation

205

* @return int count of received elements

206

*/

207

public int getElementsReceivedBeforeCancellation();

208

209

/**

210

* Validate that resources were properly cleaned up after cancellation

211

* @return boolean indicating proper resource cleanup

212

*/

213

public boolean validateResourceCleanup();

214

}

215

216

/**

217

* Sink that can block to test cancellation during blocking operations

218

*/

219

public class BlockingSink<T> implements SinkFunction<T> {

220

221

/**

222

* Constructor for blocking sink

223

* @param blockAfterElements elements to process before blocking

224

* @param blockDurationMs duration to block in milliseconds

225

*/

226

public BlockingSink(int blockAfterElements, long blockDurationMs);

227

228

@Override

229

public void invoke(T value, Context context) throws Exception;

230

231

/**

232

* Check if sink was cancelled while blocked

233

* @return boolean indicating cancellation during blocking

234

*/

235

public boolean wasCancelledWhileBlocked();

236

}

237

```

238

239

### Cancellation Utilities

240

241

Utility classes for common cancellation testing operations and validation.

242

243

```java { .api }

244

/**

245

* Utilities for cancellation testing scenarios

246

*/

247

public class CancellationTestUtils {

248

249

/**

250

* Create job graph configured for cancellation testing

251

* @param sourceCount number of source operators

252

* @param processingChainLength length of processing chain

253

* @param sinkCount number of sink operators

254

* @return JobGraph configured for cancellation testing

255

*/

256

public static JobGraph createCancellationTestJob(

257

int sourceCount,

258

int processingChainLength,

259

int sinkCount);

260

261

/**

262

* Execute job with timed cancellation

263

* @param jobGraph job to execute

264

* @param miniCluster cluster for execution

265

* @param cancelAfterMs time before cancellation

266

* @return CancellationResult containing cancellation details

267

* @throws Exception if execution or cancellation fails

268

*/

269

public static CancellationResult executeJobWithCancellation(

270

JobGraph jobGraph,

271

MiniCluster miniCluster,

272

long cancelAfterMs) throws Exception;

273

274

/**

275

* Validate cancellation behavior across all operators

276

* @param cancellationResult result from cancellation test

277

* @return boolean indicating proper cancellation behavior

278

*/

279

public static boolean validateCancellationBehavior(CancellationResult cancellationResult);

280

281

/**

282

* Monitor job cancellation progress

283

* @param jobId identifier of job being cancelled

284

* @param timeoutMs timeout for cancellation completion

285

* @return CancellationProgress containing progress details

286

*/

287

public static CancellationProgress monitorCancellationProgress(

288

JobID jobId,

289

long timeoutMs);

290

}

291

292

/**

293

* Result of job cancellation test

294

*/

295

public class CancellationResult {

296

297

/**

298

* Check if job was cancelled successfully

299

* @return boolean indicating successful cancellation

300

*/

301

public boolean wasCancelledSuccessfully();

302

303

/**

304

* Get time taken for cancellation to complete

305

* @return long cancellation duration in milliseconds

306

*/

307

public long getCancellationDurationMs();

308

309

/**

310

* Get number of operators that completed cancellation

311

* @return int count of operators with completed cancellation

312

*/

313

public int getOperatorsWithCompletedCancellation();

314

315

/**

316

* Get list of operators that failed to cancel properly

317

* @return List of operator IDs that failed cancellation

318

*/

319

public List<String> getOperatorsWithFailedCancellation();

320

321

/**

322

* Check if all resources were cleaned up after cancellation

323

* @return boolean indicating complete resource cleanup

324

*/

325

public boolean wereAllResourcesCleanedUp();

326

}

327

328

/**

329

* Progress tracking for job cancellation

330

*/

331

public class CancellationProgress {

332

333

/**

334

* Check if cancellation is complete

335

* @return boolean indicating cancellation completion

336

*/

337

public boolean isCancellationComplete();

338

339

/**

340

* Get percentage of cancellation completion

341

* @return double completion expressed as a fraction from 0.0 to 1.0 (1.0 when cancellation is complete)

342

*/

343

public double getCancellationCompletionPercentage();

344

345

/**

346

* Get list of operators still processing cancellation

347

* @return List of operator IDs still cancelling

348

*/

349

public List<String> getOperatorsStillCancelling();

350

}

351

```

352

353

**Usage Examples:**

354

355

```java

356

import org.apache.flink.test.cancelling.CancelingTestBase;

357

358

// Basic cancellation test

359

public class JobCancellationTest extends CancelingTestBase {

360

361

@Test

362

public void testSimpleJobCancellation() throws Exception {

363

// Create test job

364

JobGraph job = createCancellationTestJob(1, 2);

365

366

// Test cancellation after 5 seconds

367

runAndCancelJob(job, 5000L);

368

369

// Validate cancellation

370

assertTrue(validateJobCancellation(job.getJobID()));

371

}

372

373

@Test

374

public void testCancellationAfterElementProcessing() throws Exception {

375

JobGraph job = new JobGraph();

376

377

// Add cancellable source

378

JobVertex source = new JobVertex("cancellable-source");

379

source.setInvokableClass(CancellableSource.class);

380

source.getConfiguration().setInteger("max-elements", -1); // infinite

381

source.getConfiguration().setLong("emission-interval", 100L);

382

source.setParallelism(1);

383

384

// Add processing chain

385

JobVertex mapper = new JobVertex("cancellation-aware-mapper");

386

mapper.setInvokableClass(CancellationAwareMapper.class);

387

mapper.getConfiguration().setLong("processing-delay", 50L);

388

mapper.setParallelism(2);

389

390

// Add tracking sink

391

JobVertex sink = new JobVertex("cancellation-tracking-sink");

392

sink.setInvokableClass(CancellationTrackingSink.class);

393

sink.getConfiguration().setInteger("expected-elements", 100);

394

sink.setParallelism(1);

395

396

// Connect vertices

397

mapper.connectNewDataSetAsInput(source, DistributionPattern.REBALANCE);

398

sink.connectNewDataSetAsInput(mapper, DistributionPattern.FORWARD);

399

400

job.addVertex(source);

401

job.addVertex(mapper);

402

job.addVertex(sink);

403

404

// Test cancellation after processing 100 elements

405

runAndCancelJobAfterElements(job, 100);

406

}

407

408

@Test

409

public void testMultipleCancellationAttempts() throws Exception {

410

JobGraph robustJob = CancellationTestUtils.createCancellationTestJob(2, 3, 1);

411

412

// Test multiple cancellation attempts

413

runJobWithMultipleCancellations(robustJob, 3, 1000L);

414

}

415

}

416

417

// Advanced cancellation scenarios

418

public class AdvancedCancellationTest extends CancelingTestBase {

419

420

@Test

421

public void testCancellationDuringLongProcessing() throws Exception {

422

JobGraph job = new JobGraph();

423

424

// Source with controlled emission

425

JobVertex source = new JobVertex("controlled-source");

426

source.setInvokableClass(CancellableSource.class);

427

source.getConfiguration().setInteger("max-elements", 1000);

428

source.getConfiguration().setLong("emission-interval", 10L);

429

source.setParallelism(1);

430

431

// Long-running mapper

432

JobVertex mapper = new JobVertex("long-running-mapper");

433

mapper.setInvokableClass(LongRunningMapper.class);

434

mapper.getConfiguration().setLong("processing-time", 1000L);

435

mapper.getConfiguration().setLong("check-interval", 100L);

436

mapper.setParallelism(1);

437

438

// Blocking sink

439

JobVertex sink = new JobVertex("blocking-sink");

440

sink.setInvokableClass(BlockingSink.class);

441

sink.getConfiguration().setInteger("block-after", 10);

442

sink.getConfiguration().setLong("block-duration", 5000L);

443

sink.setParallelism(1);

444

445

job.addVertex(source);

446

job.addVertex(mapper);

447

job.addVertex(sink);

448

449

// Connect and test cancellation during blocking

450

mapper.connectNewDataSetAsInput(source, DistributionPattern.FORWARD);

451

sink.connectNewDataSetAsInput(mapper, DistributionPattern.FORWARD);

452

453

MiniCluster miniCluster = new MiniCluster(createTestConfiguration());

454

miniCluster.start();

455

456

CancellationResult result = CancellationTestUtils.executeJobWithCancellation(

457

job, miniCluster, 2000L);

458

459

// Validate cancellation behavior

460

assertTrue(CancellationTestUtils.validateCancellationBehavior(result));

461

assertTrue(result.wasCancelledSuccessfully());

462

assertTrue(result.wereAllResourcesCleanedUp());

463

464

miniCluster.close();

465

}

466

467

@Test

468

public void testSelfCancellingJob() throws Exception {

469

JobGraph job = new JobGraph();

470

471

// Self-cancelling source

472

JobVertex source = new JobVertex("self-cancelling-source");

473

source.setInvokableClass(SelfCancellingSource.class);

474

source.getConfiguration().setInteger("cancel-after", 50);

475

source.getConfiguration().setString("cancellation-message", "Self-cancelled");

476

source.setParallelism(1);

477

478

JobVertex sink = new JobVertex("tracking-sink");

479

sink.setInvokableClass(CancellationTrackingSink.class);

480

sink.getConfiguration().setInteger("expected-elements", 50);

481

sink.setParallelism(1);

482

483

sink.connectNewDataSetAsInput(source, DistributionPattern.FORWARD);

484

485

job.addVertex(source);

486

job.addVertex(sink);

487

488

// Execute and let source cancel itself

489

MiniCluster miniCluster = new MiniCluster(createTestConfiguration());

490

miniCluster.start();

491

492

JobExecutionResult result = miniCluster.executeJobBlocking(job);

493

494

// Job should complete due to self-cancellation

495

assertNotNull(result);

496

497

miniCluster.close();

498

}

499

}

500

501

// Cancellation progress monitoring

502

public class CancellationMonitoringTest {

503

504

@Test

505

public void testCancellationProgressMonitoring() throws Exception {

506

JobGraph largeJob = CancellationTestUtils.createCancellationTestJob(5, 10, 3);

507

508

MiniCluster miniCluster = new MiniCluster(createTestConfiguration());

509

miniCluster.start();

510

511

// Start job execution

512

CompletableFuture<JobExecutionResult> executionFuture =

513

miniCluster.executeJobAsync(largeJob);

514

515

// Wait briefly then cancel

516

Thread.sleep(2000);

517

miniCluster.cancelJob(largeJob.getJobID());

518

519

// Monitor cancellation progress

520

CancellationProgress progress = CancellationTestUtils.monitorCancellationProgress(

521

largeJob.getJobID(), 30000L);

522

523

// Validate progress tracking

524

assertTrue(progress.isCancellationComplete());

525

assertEquals(1.0, progress.getCancellationCompletionPercentage(), 0.01);

526

assertTrue(progress.getOperatorsStillCancelling().isEmpty());

527

528

miniCluster.close();

529

}

530

}

531

```