or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

cancellation-testing.mdcheckpointing-migration.mdfault-tolerance-recovery.mdindex.mdoperator-lifecycle.mdplugin-testing.mdruntime-utilities.mdsession-window-testing.mdstate-backend-restore.mdtest-data-utilities.md

operator-lifecycle.mddocs/

# Operator Lifecycle Testing

A framework for testing how streaming operators behave across their lifecycle phases: startup, checkpointing, finishing, and shutdown. It records lifecycle events while operators run and validates the resulting event sequences and state transitions.

## Capabilities

### Test Job Builders

Factory classes that create test jobs with different topologies and levels of complexity for operator lifecycle testing.

```java { .api }

11

/**

12

* Factory for creating test jobs with different topologies

13

*/

14

public class TestJobBuilders {

15

16

/**

17

* Simple test graph factory for basic operator lifecycle testing

18

*/

19

public static final TestJobBuilder SIMPLE_GRAPH_BUILDER;

20

21

/**

22

* Complex multi-operator test graph factory for advanced lifecycle scenarios

23

*/

24

public static final TestJobBuilder COMPLEX_GRAPH_BUILDER;

25

26

/**

27

* Interface for test job builders

28

*/

29

public interface TestJobBuilder {

30

/**

31

* Build job graph with specified configuration

32

* @param config test configuration parameters

33

* @return JobGraph configured for lifecycle testing

34

*/

35

JobGraph build(TestConfiguration config);

36

}

37

}

38

```

### Test Stream Operators

Streaming operators specialized for lifecycle testing. They record lifecycle events and can react to test commands.

```java { .api }

45

/**

46

* Single input test operator for lifecycle testing

47

*/

48

public class OneInputTestStreamOperator

49

extends AbstractStreamOperator<TestDataElement>

50

implements OneInputStreamOperator<TestDataElement, TestDataElement> {

51

52

/**

53

* Constructor for single input test operator

54

* @param eventQueue queue for tracking lifecycle events

55

*/

56

public OneInputTestStreamOperator(TestEventQueue eventQueue);

57

58

@Override

59

public void processElement(StreamRecord<TestDataElement> element) throws Exception;

60

61

@Override

62

public void open() throws Exception;

63

64

@Override

65

public void close() throws Exception;

66

67

@Override

68

public void finish() throws Exception;

69

}

70

71

/**

72

* Factory for creating single input test operators

73

*/

74

public class OneInputTestStreamOperatorFactory

75

implements StreamOperatorFactory<TestDataElement> {

76

77

/**

78

* Constructor for operator factory

79

* @param eventQueue shared event queue for lifecycle tracking

80

*/

81

public OneInputTestStreamOperatorFactory(TestEventQueue eventQueue);

82

83

@Override

84

public <T extends StreamOperator<TestDataElement>> T createStreamOperator(

85

StreamOperatorParameters<TestDataElement> parameters);

86

}

87

88

/**

89

* Dual input test operator for testing multi-input scenarios

90

*/

91

public class TwoInputTestStreamOperator

92

extends AbstractStreamOperator<TestDataElement>

93

implements TwoInputStreamOperator<TestDataElement, TestDataElement, TestDataElement> {

94

95

/**

96

* Constructor for dual input test operator

97

* @param eventQueue queue for tracking lifecycle events

98

*/

99

public TwoInputTestStreamOperator(TestEventQueue eventQueue);

100

101

@Override

102

public void processElement1(StreamRecord<TestDataElement> element) throws Exception;

103

104

@Override

105

public void processElement2(StreamRecord<TestDataElement> element) throws Exception;

106

}

107

108

/**

109

* Multi-input test operator supporting arbitrary number of inputs

110

*/

111

public class MultiInputTestOperator

112

extends AbstractStreamOperator<TestDataElement>

113

implements MultipleInputStreamOperator<TestDataElement> {

114

115

/**

116

* Constructor for multi-input test operator

117

* @param eventQueue queue for tracking lifecycle events

118

* @param inputCount number of input streams

119

*/

120

public MultiInputTestOperator(TestEventQueue eventQueue, int inputCount);

121

122

@Override

123

public void processElement(StreamRecord<TestDataElement> element, int inputId) throws Exception;

124

}

125

126

/**

127

* Factory for creating multi-input test operators

128

*/

129

public class MultiInputTestOperatorFactory

130

implements StreamOperatorFactory<TestDataElement> {

131

132

/**

133

* Constructor for multi-input operator factory

134

* @param eventQueue shared event queue

135

* @param inputCount number of input streams

136

*/

137

public MultiInputTestOperatorFactory(TestEventQueue eventQueue, int inputCount);

138

}

139

```

### Test Event System

An event-tracking system for monitoring operator lifecycle phases and state transitions.

```java { .api }

146

/**

147

* Queue for tracking operator lifecycle events during testing

148

*/

149

public class TestEventQueue {

150

151

/**

152

* Add lifecycle event to the queue

153

* @param event TestEvent representing lifecycle phase

154

*/

155

public void add(TestEvent event);

156

157

/**

158

* Get all recorded events in chronological order

159

* @return List of TestEvent objects

160

*/

161

public List<TestEvent> getEvents();

162

163

/**

164

* Get events for specific operator

165

* @param operatorId identifier of the operator

166

* @return List of events for the specified operator

167

*/

168

public List<TestEvent> getEventsForOperator(String operatorId);

169

170

/**

171

* Clear all recorded events

172

*/

173

public void clear();

174

}

175

176

/**

177

* Shared event queue for cross-operator event tracking

178

*/

179

public class SharedTestEventQueue extends TestEventQueue {

180

181

/**

182

* Get singleton instance of shared event queue

183

* @return SharedTestEventQueue instance

184

*/

185

public static SharedTestEventQueue getInstance();

186

}

187

188

/**

189

* Event representing operator startup completion

190

*/

191

public class OperatorStartedEvent implements TestEvent {

192

193

/**

194

* Constructor for operator started event

195

* @param operatorId identifier of the started operator

196

* @param timestamp event timestamp

197

*/

198

public OperatorStartedEvent(String operatorId, long timestamp);

199

200

@Override

201

public String getOperatorId();

202

203

@Override

204

public long getTimestamp();

205

}

206

207

/**

208

* Event representing operator shutdown completion

209

*/

210

public class OperatorFinishedEvent implements TestEvent {

211

212

/**

213

* Constructor for operator finished event

214

* @param operatorId identifier of the finished operator

215

* @param timestamp event timestamp

216

*/

217

public OperatorFinishedEvent(String operatorId, long timestamp);

218

}

219

220

/**

221

* Event representing checkpoint completion

222

*/

223

public class CheckpointCompletedEvent implements TestEvent {

224

225

/**

226

* Constructor for checkpoint completed event

227

* @param operatorId identifier of the checkpointed operator

228

* @param checkpointId checkpoint identifier

229

* @param timestamp event timestamp

230

*/

231

public CheckpointCompletedEvent(String operatorId, long checkpointId, long timestamp);

232

233

/**

234

* Get checkpoint identifier

235

* @return long checkpoint ID

236

*/

237

public long getCheckpointId();

238

}

239

```

### Test Job Executor

A controller that runs operator lifecycle test jobs and provides event monitoring, command dispatching, and result validation.

```java { .api }

246

/**

247

* Controller for operator lifecycle testing jobs with event monitoring and command capabilities

248

*/

249

public class TestJobExecutor {

250

251

/**

252

* Constructor for test job executor

253

* @param jobWithDescription test job with event/command infrastructure

254

*/

255

public TestJobExecutor(TestJobWithDescription jobWithDescription);

256

257

/**

258

* Execute test job on MiniCluster and initialize monitoring

259

* @param miniClusterResource MiniCluster resource for job execution

260

* @return CompletableFuture for asynchronous job execution

261

* @throws Exception if job submission fails

262

*/

263

public CompletableFuture<JobExecutionResult> execute(

264

MiniClusterWithClientResource miniClusterResource) throws Exception;

265

266

/**

267

* Wait for all operators in the job to reach running state

268

* @throws Exception if operators don't reach running state

269

*/

270

public void waitForAllRunning() throws Exception;

271

272

/**

273

* Wait for all operators to reach running state within timeout

274

* @param timeout timeout duration

275

* @param timeUnit time unit for timeout

276

* @throws Exception if timeout exceeded or operators don't reach running state

277

*/

278

public void waitForAllRunning(long timeout, TimeUnit timeUnit) throws Exception;

279

280

/**

281

* Wait for specific type of event to occur

282

* @param eventType class of the event to wait for

283

* @throws Exception if event doesn't occur within timeout

284

*/

285

public void waitForEvent(Class<? extends TestEvent> eventType) throws Exception;

286

287

/**

288

* Wait for specific event with timeout

289

* @param eventType class of the event to wait for

290

* @param timeout timeout duration

291

* @param timeUnit time unit for timeout

292

* @throws Exception if event doesn't occur within timeout

293

*/

294

public void waitForEvent(Class<? extends TestEvent> eventType, long timeout, TimeUnit timeUnit) throws Exception;

295

296

/**

297

* Stop job execution and create savepoint

298

* @param savepointDir directory for savepoint storage

299

* @param advanceToEndOfEventTime advance to end of event time before stopping

300

* @return String path to created savepoint

301

* @throws Exception if savepoint creation fails

302

*/

303

public String stopWithSavepoint(TemporaryFolder savepointDir, boolean advanceToEndOfEventTime) throws Exception;

304

305

/**

306

* Send command to specific operator instance

307

* @param operatorId identifier of target operator

308

* @param command command to send

309

* @param scope scope of command execution

310

* @throws Exception if command sending fails

311

*/

312

public void sendOperatorCommand(String operatorId, TestCommand command, TestCommandScope scope) throws Exception;

313

314

/**

315

* Trigger failover for specific operator

316

* @param operatorId identifier of operator to fail

317

* @throws Exception if failover triggering fails

318

*/

319

public void triggerFailover(String operatorId) throws Exception;

320

321

/**

322

* Send broadcast command to all operators

323

* @param command command to broadcast

324

* @param scope scope of command execution

325

* @throws Exception if broadcast fails

326

*/

327

public void sendBroadcastCommand(TestCommand command, TestCommandScope scope) throws Exception;

328

329

/**

330

* Wait for job termination (completion or failure)

331

* @throws Exception if waiting for termination fails

332

*/

333

public void waitForTermination() throws Exception;

334

335

/**

336

* Wait for job termination with timeout

337

* @param timeout timeout duration

338

* @param timeUnit time unit for timeout

339

* @throws Exception if termination doesn't occur within timeout

340

*/

341

public void waitForTermination(long timeout, TimeUnit timeUnit) throws Exception;

342

343

/**

344

* Assert that job finished successfully without failures

345

* @throws Exception if job didn't finish successfully

346

*/

347

public void assertFinishedSuccessfully() throws Exception;

348

349

/**

350

* Get all events collected during job execution

351

* @return List of TestEvent instances

352

*/

353

public List<TestEvent> getAllEvents();

354

355

/**

356

* Get events of specific type

357

* @param eventType class of events to retrieve

358

* @return List of events matching the specified type

359

*/

360

public <T extends TestEvent> List<T> getEventsOfType(Class<T> eventType);

361

362

/**

363

* Cancel the running job

364

* @throws Exception if job cancellation fails

365

*/

366

public void cancel() throws Exception;

367

368

/**

369

* Get current job execution result if available

370

* @return Optional containing JobExecutionResult if job completed

371

*/

372

public Optional<JobExecutionResult> getJobResult();

373

}

374

375

/**

376

* Container for test job with event and command infrastructure

377

*/

378

public class TestJobWithDescription {

379

380

/**

381

* Constructor for test job container

382

* @param jobGraph JobGraph for the test

383

* @param eventQueue shared event queue for lifecycle tracking

384

* @param commandDispatcher dispatcher for sending commands to operators

385

*/

386

public TestJobWithDescription(

387

JobGraph jobGraph,

388

TestEventQueue eventQueue,

389

TestCommandDispatcher commandDispatcher);

390

391

/**

392

* Get the job graph for execution

393

* @return JobGraph instance

394

*/

395

public JobGraph getJobGraph();

396

397

/**

398

* Get event queue for monitoring

399

* @return TestEventQueue instance

400

*/

401

public TestEventQueue getEventQueue();

402

403

/**

404

* Get command dispatcher for operator control

405

* @return TestCommandDispatcher instance

406

*/

407

public TestCommandDispatcher getCommandDispatcher();

408

}

409

```

### Test Data and Event Sources

Data types and source functions used in lifecycle testing scenarios.

```java { .api }

416

/**

417

* Data element specifically designed for lifecycle testing

418

*/

419

public class TestDataElement {

420

421

/**

422

* Constructor for test data element

423

* @param value string value of the element

424

* @param timestamp element timestamp

425

*/

426

public TestDataElement(String value, long timestamp);

427

428

/**

429

* Get element value

430

* @return String value

431

*/

432

public String getValue();

433

434

/**

435

* Get element timestamp

436

* @return long timestamp

437

*/

438

public long getTimestamp();

439

}

440

441

/**

442

* Source that emits test events and responds to test commands

443

*/

444

public class TestEventSource implements SourceFunction<TestDataElement> {

445

446

/**

447

* Constructor for test event source

448

* @param eventQueue queue for lifecycle event tracking

449

* @param elementsToEmit number of elements to emit

450

*/

451

public TestEventSource(TestEventQueue eventQueue, int elementsToEmit);

452

453

@Override

454

public void run(SourceContext<TestDataElement> ctx) throws Exception;

455

456

@Override

457

public void cancel();

458

}

459

```

### Command System

A command-dispatch system for controlling operator behavior while a lifecycle test is running.

```java { .api }

466

/**

467

* Dispatcher for sending commands to operators during testing

468

*/

469

public class TestCommandDispatcher {

470

471

/**

472

* Constructor for command dispatcher

473

* @param eventQueue event queue for tracking command effects

474

*/

475

public TestCommandDispatcher(TestEventQueue eventQueue);

476

477

/**

478

* Send command to specific operator

479

* @param operatorId target operator identifier

480

* @param command command to execute

481

*/

482

public void sendCommand(String operatorId, TestCommand command);

483

484

/**

485

* Send command to all operators

486

* @param command command to broadcast

487

*/

488

public void broadcastCommand(TestCommand command);

489

}

490

491

/**

492

* Base interface for test commands

493

*/

494

public interface TestCommand {

495

496

/**

497

* Execute command on target operator

498

* @param operator target stream operator

499

*/

500

void execute(StreamOperator<?> operator);

501

502

/**

503

* Get command type identifier

504

* @return String identifying the command type

505

*/

506

String getCommandType();

507

}

/**
 * Scope over which a test command is executed.
 */
public enum TestCommandScope {
    /** The command applies to all subtasks of the operator. */
    ALL_SUBTASKS,
    /** The command applies to a single subtask only. */
    SINGLE_SUBTASK
}

518

519

/**

520

* Command implementations for common test scenarios

521

*/

522

public static class TestCommands {

523

524

/**

525

* Command to trigger operator failure for fault tolerance testing

526

*/

527

public static class FailCommand implements TestCommand {

528

529

/**

530

* Constructor for fail command

531

* @param cause exception to throw as failure cause

532

*/

533

public FailCommand(Exception cause);

534

535

@Override

536

public void execute(StreamOperator<?> operator);

537

538

@Override

539

public String getCommandType();

540

}

541

542

/**

543

* Command to trigger operator finishing for graceful termination testing

544

*/

545

public static class FinishCommand implements TestCommand {

546

547

/**

548

* Constructor for finish command

549

*/

550

public FinishCommand();

551

552

@Override

553

public void execute(StreamOperator<?> operator);

554

555

@Override

556

public String getCommandType();

557

}

558

559

/**

560

* Command to trigger checkpoint for state management testing

561

*/

562

public static class TriggerCheckpointCommand implements TestCommand {

563

564

/**

565

* Constructor for checkpoint trigger command

566

* @param checkpointId checkpoint identifier

567

*/

568

public TriggerCheckpointCommand(long checkpointId);

569

570

@Override

571

public void execute(StreamOperator<?> operator);

572

573

@Override

574

public String getCommandType();

575

576

/**

577

* Get checkpoint ID

578

* @return long checkpoint identifier

579

*/

580

public long getCheckpointId();

581

}

582

583

/**

584

* Command to pause operator processing for synchronization testing

585

*/

586

public static class PauseCommand implements TestCommand {

587

588

/**

589

* Constructor for pause command

590

* @param durationMs pause duration in milliseconds

591

*/

592

public PauseCommand(long durationMs);

593

594

@Override

595

public void execute(StreamOperator<?> operator);

596

597

@Override

598

public String getCommandType();

599

600

/**

601

* Get pause duration

602

* @return long duration in milliseconds

603

*/

604

public long getDurationMs();

605

}

606

}

607

```

### Lifecycle Validation Framework

Validators that check recorded events to verify correct operator lifecycle behavior and event ordering.

```java { .api }

614

/**

615

* Validator for operator lifecycle behavior

616

*/

617

public class TestOperatorLifecycleValidator {

618

619

/**

620

* Constructor for lifecycle validator

621

* @param eventQueue event queue containing lifecycle events

622

*/

623

public TestOperatorLifecycleValidator(TestEventQueue eventQueue);

624

625

/**

626

* Validate complete operator lifecycle sequence

627

* @param operatorId identifier of operator to validate

628

* @return boolean indicating if lifecycle is valid

629

*/

630

public boolean validateLifecycle(String operatorId);

631

632

/**

633

* Validate specific lifecycle phase

634

* @param operatorId operator identifier

635

* @param phase lifecycle phase to validate

636

* @return boolean indicating phase validity

637

*/

638

public boolean validatePhase(String operatorId, LifecyclePhase phase);

639

}

640

641

/**

642

* Validator for operator draining behavior

643

*/

644

public class DrainingValidator {

645

646

/**

647

* Constructor for draining validator

648

* @param eventQueue event queue for validation

649

*/

650

public DrainingValidator(TestEventQueue eventQueue);

651

652

/**

653

* Validate operator draining sequence

654

* @param operatorId operator to validate

655

* @return boolean indicating valid draining behavior

656

*/

657

public boolean validateDraining(String operatorId);

658

}

659

660

/**

661

* Validator for operator finishing behavior

662

*/

663

public class FinishingValidator {

664

665

/**

666

* Constructor for finishing validator

667

* @param eventQueue event queue for validation

668

*/

669

public FinishingValidator(TestEventQueue eventQueue);

670

671

/**

672

* Validate operator finishing sequence

673

* @param operatorId operator to validate

674

* @return boolean indicating valid finishing behavior

675

*/

676

public boolean validateFinishing(String operatorId);

677

}

678

679

/**

680

* Validator for job-level data flow behavior

681

*/

682

public class TestJobDataFlowValidator {

683

684

/**

685

* Constructor for data flow validator

686

* @param eventQueue event queue containing flow events

687

*/

688

public TestJobDataFlowValidator(TestEventQueue eventQueue);

689

690

/**

691

* Validate end-to-end data flow through job

692

* @param expectedElements expected number of processed elements

693

* @return boolean indicating valid data flow

694

*/

695

public boolean validateDataFlow(int expectedElements);

696

}

/**
 * Lifecycle phases that can be checked with
 * {@code TestOperatorLifecycleValidator#validatePhase(String, LifecyclePhase)}.
 *
 * <p>Refer to phases by name; do not rely on {@code ordinal()}.
 */
public enum LifecyclePhase {
    STARTING,
    RUNNING,
    CHECKPOINTING,
    DRAINING,
    FINISHING,
    STOPPED
}

```

**Usage Examples:**

```java

714

import org.apache.flink.runtime.operators.lifecycle.graph.*;

715

import org.apache.flink.runtime.operators.lifecycle.event.*;

716

import org.apache.flink.runtime.operators.lifecycle.validation.*;

717

718

// Basic operator lifecycle test

719

public class OperatorLifecycleTest {

720

721

@Test

722

public void testSimpleOperatorLifecycle() throws Exception {

723

TestEventQueue eventQueue = new TestEventQueue();

724

725

// Build simple test job

726

JobGraph job = TestJobBuilders.SIMPLE_GRAPH_BUILDER.build(

727

new TestConfiguration(eventQueue, 100));

728

729

// Execute job

730

MiniCluster miniCluster = new MiniCluster(configuration);

731

miniCluster.start();

732

miniCluster.executeJobBlocking(job);

733

734

// Validate lifecycle

735

TestOperatorLifecycleValidator validator =

736

new TestOperatorLifecycleValidator(eventQueue);

737

assertTrue(validator.validateLifecycle("test-operator"));

738

}

739

740

@Test

741

public void testMultiInputOperatorLifecycle() throws Exception {

742

TestEventQueue eventQueue = new TestEventQueue();

743

744

// Create multi-input operator

745

MultiInputTestOperator operator = new MultiInputTestOperator(eventQueue, 3);

746

747

// Build complex job graph

748

JobGraph job = TestJobBuilders.COMPLEX_GRAPH_BUILDER.build(

749

new TestConfiguration(eventQueue, 500));

750

751

// Execute and validate

752

executeJobAndValidate(job, eventQueue);

753

}

754

755

@Test

756

public void testOperatorDraining() throws Exception {

757

TestEventQueue eventQueue = new TestEventQueue();

758

759

// Create job with draining scenario

760

JobGraph job = createDrainingTestJob(eventQueue);

761

762

// Execute with controlled shutdown

763

executeJobWithDraining(job);

764

765

// Validate draining behavior

766

DrainingValidator validator = new DrainingValidator(eventQueue);

767

assertTrue(validator.validateDraining("draining-operator"));

768

}

769

770

@Test

771

public void testCheckpointingLifecycle() throws Exception {

772

SharedTestEventQueue eventQueue = SharedTestEventQueue.getInstance();

773

eventQueue.clear();

774

775

// Create job with checkpointing

776

JobGraph job = createCheckpointingTestJob(eventQueue);

777

778

// Execute with periodic checkpoints

779

JobExecutionResult result = executeJobWithCheckpoints(job, 3);

780

781

// Validate checkpoint events

782

List<TestEvent> checkpointEvents = eventQueue.getEvents().stream()

783

.filter(e -> e instanceof CheckpointCompletedEvent)

784

.collect(Collectors.toList());

785

786

assertEquals(3, checkpointEvents.size());

787

}

788

}

789

```