# Operator Lifecycle Testing

A framework for testing how streaming operators behave across their lifecycle: startup, checkpointing, finishing, and shutdown. Operators record lifecycle events into a shared event queue, and validators check the resulting event sequences and state transitions.

## Capabilities

### Test Job Builders

Factories that build test jobs of varying topology and complexity for operator lifecycle testing.

```java { .api }
/**
12
* Factory for creating test jobs with different topologies
13
*/
14
public class TestJobBuilders {
15
16
/**
17
* Simple test graph factory for basic operator lifecycle testing
18
*/
19
public static final TestJobBuilder SIMPLE_GRAPH_BUILDER;
20
21
/**
22
* Complex multi-operator test graph factory for advanced lifecycle scenarios
23
*/
24
public static final TestJobBuilder COMPLEX_GRAPH_BUILDER;
25
26
/**
27
* Interface for test job builders
28
*/
29
public interface TestJobBuilder {
30
/**
31
* Build job graph with specified configuration
32
* @param config test configuration parameters
33
* @return JobGraph configured for lifecycle testing
34
*/
35
JobGraph build(TestConfiguration config);
36
}
37
}
```

### Test Stream Operators

Streaming operators instrumented for lifecycle testing: they record lifecycle events and can process test commands.

```java { .api }
/**
46
* Single input test operator for lifecycle testing
47
*/
48
public class OneInputTestStreamOperator
49
extends AbstractStreamOperator<TestDataElement>
50
implements OneInputStreamOperator<TestDataElement, TestDataElement> {
51
52
/**
53
* Constructor for single input test operator
54
* @param eventQueue queue for tracking lifecycle events
55
*/
56
public OneInputTestStreamOperator(TestEventQueue eventQueue);
57
58
@Override
59
public void processElement(StreamRecord<TestDataElement> element) throws Exception;
60
61
@Override
62
public void open() throws Exception;
63
64
@Override
65
public void close() throws Exception;
66
67
@Override
68
public void finish() throws Exception;
69
}
70
71
/**
72
* Factory for creating single input test operators
73
*/
74
public class OneInputTestStreamOperatorFactory
75
implements StreamOperatorFactory<TestDataElement> {
76
77
/**
78
* Constructor for operator factory
79
* @param eventQueue shared event queue for lifecycle tracking
80
*/
81
public OneInputTestStreamOperatorFactory(TestEventQueue eventQueue);
82
83
@Override
84
public <T extends StreamOperator<TestDataElement>> T createStreamOperator(
85
StreamOperatorParameters<TestDataElement> parameters);
86
}
87
88
/**
89
* Dual input test operator for testing multi-input scenarios
90
*/
91
public class TwoInputTestStreamOperator
92
extends AbstractStreamOperator<TestDataElement>
93
implements TwoInputStreamOperator<TestDataElement, TestDataElement, TestDataElement> {
94
95
/**
96
* Constructor for dual input test operator
97
* @param eventQueue queue for tracking lifecycle events
98
*/
99
public TwoInputTestStreamOperator(TestEventQueue eventQueue);
100
101
@Override
102
public void processElement1(StreamRecord<TestDataElement> element) throws Exception;
103
104
@Override
105
public void processElement2(StreamRecord<TestDataElement> element) throws Exception;
106
}
107
108
/**
109
* Multi-input test operator supporting arbitrary number of inputs
110
*/
111
public class MultiInputTestOperator
112
extends AbstractStreamOperator<TestDataElement>
113
implements MultipleInputStreamOperator<TestDataElement> {
114
115
/**
116
* Constructor for multi-input test operator
117
* @param eventQueue queue for tracking lifecycle events
118
* @param inputCount number of input streams
119
*/
120
public MultiInputTestOperator(TestEventQueue eventQueue, int inputCount);
121
122
@Override
123
public void processElement(StreamRecord<TestDataElement> element, int inputId) throws Exception;
124
}
125
126
/**
127
* Factory for creating multi-input test operators
128
*/
129
public class MultiInputTestOperatorFactory
130
implements StreamOperatorFactory<TestDataElement> {
131
132
/**
133
* Constructor for multi-input operator factory
134
* @param eventQueue shared event queue
135
* @param inputCount number of input streams
136
*/
137
public MultiInputTestOperatorFactory(TestEventQueue eventQueue, int inputCount);
138
}
```

### Test Event System

An event-tracking system that records operator lifecycle phases and state transitions.

```java { .api }
/**
147
* Queue for tracking operator lifecycle events during testing
148
*/
149
public class TestEventQueue {
150
151
/**
152
* Add lifecycle event to the queue
153
* @param event TestEvent representing lifecycle phase
154
*/
155
public void add(TestEvent event);
156
157
/**
158
* Get all recorded events in chronological order
159
* @return List of TestEvent objects
160
*/
161
public List<TestEvent> getEvents();
162
163
/**
164
* Get events for specific operator
165
* @param operatorId identifier of the operator
166
* @return List of events for the specified operator
167
*/
168
public List<TestEvent> getEventsForOperator(String operatorId);
169
170
/**
171
* Clear all recorded events
172
*/
173
public void clear();
174
}
175
176
/**
177
* Shared event queue for cross-operator event tracking
178
*/
179
public class SharedTestEventQueue extends TestEventQueue {
180
181
/**
182
* Get singleton instance of shared event queue
183
* @return SharedTestEventQueue instance
184
*/
185
public static SharedTestEventQueue getInstance();
186
}
187
188
/**
189
* Event representing operator startup completion
190
*/
191
public class OperatorStartedEvent implements TestEvent {
192
193
/**
194
* Constructor for operator started event
195
* @param operatorId identifier of the started operator
196
* @param timestamp event timestamp
197
*/
198
public OperatorStartedEvent(String operatorId, long timestamp);
199
200
@Override
201
public String getOperatorId();
202
203
@Override
204
public long getTimestamp();
205
}
206
207
/**
208
* Event representing operator shutdown completion
209
*/
210
public class OperatorFinishedEvent implements TestEvent {
211
212
/**
213
* Constructor for operator finished event
214
* @param operatorId identifier of the finished operator
215
* @param timestamp event timestamp
216
*/
217
public OperatorFinishedEvent(String operatorId, long timestamp);
218
}
219
220
/**
221
* Event representing checkpoint completion
222
*/
223
public class CheckpointCompletedEvent implements TestEvent {
224
225
/**
226
* Constructor for checkpoint completed event
227
* @param operatorId identifier of the checkpointed operator
228
* @param checkpointId checkpoint identifier
229
* @param timestamp event timestamp
230
*/
231
public CheckpointCompletedEvent(String operatorId, long checkpointId, long timestamp);
232
233
/**
234
* Get checkpoint identifier
235
* @return long checkpoint ID
236
*/
237
public long getCheckpointId();
238
}
```

### Test Job Executor

A controller for lifecycle test jobs. It runs a job, waits for and collects lifecycle events, dispatches commands to operators, triggers failovers and savepoints, and asserts on how the job ended.

```java { .api }
/**
247
* Controller for operator lifecycle testing jobs with event monitoring and command capabilities
248
*/
249
public class TestJobExecutor {
250
251
/**
252
* Constructor for test job executor
253
* @param jobWithDescription test job with event/command infrastructure
254
*/
255
public TestJobExecutor(TestJobWithDescription jobWithDescription);
256
257
/**
258
* Execute test job on MiniCluster and initialize monitoring
259
* @param miniClusterResource MiniCluster resource for job execution
260
* @return CompletableFuture for asynchronous job execution
261
* @throws Exception if job submission fails
262
*/
263
public CompletableFuture<JobExecutionResult> execute(
264
MiniClusterWithClientResource miniClusterResource) throws Exception;
265
266
/**
267
* Wait for all operators in the job to reach running state
268
* @throws Exception if operators don't reach running state
269
*/
270
public void waitForAllRunning() throws Exception;
271
272
/**
273
* Wait for all operators to reach running state within timeout
274
* @param timeout timeout duration
275
* @param timeUnit time unit for timeout
276
* @throws Exception if timeout exceeded or operators don't reach running state
277
*/
278
public void waitForAllRunning(long timeout, TimeUnit timeUnit) throws Exception;
279
280
/**
281
* Wait for specific type of event to occur
282
* @param eventType class of the event to wait for
283
* @throws Exception if event doesn't occur within timeout
284
*/
285
public void waitForEvent(Class<? extends TestEvent> eventType) throws Exception;
286
287
/**
288
* Wait for specific event with timeout
289
* @param eventType class of the event to wait for
290
* @param timeout timeout duration
291
* @param timeUnit time unit for timeout
292
* @throws Exception if event doesn't occur within timeout
293
*/
294
public void waitForEvent(Class<? extends TestEvent> eventType, long timeout, TimeUnit timeUnit) throws Exception;
295
296
/**
297
* Stop job execution and create savepoint
298
* @param savepointDir directory for savepoint storage
299
* @param advanceToEndOfEventTime advance to end of event time before stopping
300
* @return String path to created savepoint
301
* @throws Exception if savepoint creation fails
302
*/
303
public String stopWithSavepoint(TemporaryFolder savepointDir, boolean advanceToEndOfEventTime) throws Exception;
304
305
/**
306
* Send command to specific operator instance
307
* @param operatorId identifier of target operator
308
* @param command command to send
309
* @param scope scope of command execution
310
* @throws Exception if command sending fails
311
*/
312
public void sendOperatorCommand(String operatorId, TestCommand command, TestCommandScope scope) throws Exception;
313
314
/**
315
* Trigger failover for specific operator
316
* @param operatorId identifier of operator to fail
317
* @throws Exception if failover triggering fails
318
*/
319
public void triggerFailover(String operatorId) throws Exception;
320
321
/**
322
* Send broadcast command to all operators
323
* @param command command to broadcast
324
* @param scope scope of command execution
325
* @throws Exception if broadcast fails
326
*/
327
public void sendBroadcastCommand(TestCommand command, TestCommandScope scope) throws Exception;
328
329
/**
330
* Wait for job termination (completion or failure)
331
* @throws Exception if waiting for termination fails
332
*/
333
public void waitForTermination() throws Exception;
334
335
/**
336
* Wait for job termination with timeout
337
* @param timeout timeout duration
338
* @param timeUnit time unit for timeout
339
* @throws Exception if termination doesn't occur within timeout
340
*/
341
public void waitForTermination(long timeout, TimeUnit timeUnit) throws Exception;
342
343
/**
344
* Assert that job finished successfully without failures
345
* @throws Exception if job didn't finish successfully
346
*/
347
public void assertFinishedSuccessfully() throws Exception;
348
349
/**
350
* Get all events collected during job execution
351
* @return List of TestEvent instances
352
*/
353
public List<TestEvent> getAllEvents();
354
355
/**
356
* Get events of specific type
357
* @param eventType class of events to retrieve
358
* @return List of events matching the specified type
359
*/
360
public <T extends TestEvent> List<T> getEventsOfType(Class<T> eventType);
361
362
/**
363
* Cancel the running job
364
* @throws Exception if job cancellation fails
365
*/
366
public void cancel() throws Exception;
367
368
/**
369
* Get current job execution result if available
370
* @return Optional containing JobExecutionResult if job completed
371
*/
372
public Optional<JobExecutionResult> getJobResult();
373
}
374
375
/**
376
* Container for test job with event and command infrastructure
377
*/
378
public class TestJobWithDescription {
379
380
/**
381
* Constructor for test job container
382
* @param jobGraph JobGraph for the test
383
* @param eventQueue shared event queue for lifecycle tracking
384
* @param commandDispatcher dispatcher for sending commands to operators
385
*/
386
public TestJobWithDescription(
387
JobGraph jobGraph,
388
TestEventQueue eventQueue,
389
TestCommandDispatcher commandDispatcher);
390
391
/**
392
* Get the job graph for execution
393
* @return JobGraph instance
394
*/
395
public JobGraph getJobGraph();
396
397
/**
398
* Get event queue for monitoring
399
* @return TestEventQueue instance
400
*/
401
public TestEventQueue getEventQueue();
402
403
/**
404
* Get command dispatcher for operator control
405
* @return TestCommandDispatcher instance
406
*/
407
public TestCommandDispatcher getCommandDispatcher();
408
}
```

### Test Data and Event Sources

Data structures and source functions used in lifecycle testing scenarios.

```java { .api }
/**
417
* Data element specifically designed for lifecycle testing
418
*/
419
public class TestDataElement {
420
421
/**
422
* Constructor for test data element
423
* @param value string value of the element
424
* @param timestamp element timestamp
425
*/
426
public TestDataElement(String value, long timestamp);
427
428
/**
429
* Get element value
430
* @return String value
431
*/
432
public String getValue();
433
434
/**
435
* Get element timestamp
436
* @return long timestamp
437
*/
438
public long getTimestamp();
439
}
440
441
/**
442
* Source that emits test events and responds to test commands
443
*/
444
public class TestEventSource implements SourceFunction<TestDataElement> {
445
446
/**
447
* Constructor for test event source
448
* @param eventQueue queue for lifecycle event tracking
449
* @param elementsToEmit number of elements to emit
450
*/
451
public TestEventSource(TestEventQueue eventQueue, int elementsToEmit);
452
453
@Override
454
public void run(SourceContext<TestDataElement> ctx) throws Exception;
455
456
@Override
457
public void cancel();
458
}
```

### Command System

A command-dispatch system for controlling operator behavior during lifecycle tests. Commands can target a single operator or be broadcast to all operators.

```java { .api }
/**
467
* Dispatcher for sending commands to operators during testing
468
*/
469
public class TestCommandDispatcher {
470
471
/**
472
* Constructor for command dispatcher
473
* @param eventQueue event queue for tracking command effects
474
*/
475
public TestCommandDispatcher(TestEventQueue eventQueue);
476
477
/**
478
* Send command to specific operator
479
* @param operatorId target operator identifier
480
* @param command command to execute
481
*/
482
public void sendCommand(String operatorId, TestCommand command);
483
484
/**
485
* Send command to all operators
486
* @param command command to broadcast
487
*/
488
public void broadcastCommand(TestCommand command);
489
}
490
491
/**
492
* Base interface for test commands
493
*/
494
public interface TestCommand {
495
496
/**
497
* Execute command on target operator
498
* @param operator target stream operator
499
*/
500
void execute(StreamOperator<?> operator);
501
502
/**
503
* Get command type identifier
504
* @return String identifying the command type
505
*/
506
String getCommandType();
507
}
508
509
/**
 * Determines which subtasks of the target operator a command reaches.
 */
public enum TestCommandScope {

    /** Every subtask of the operator receives the command. */
    ALL_SUBTASKS,

    /** Only a single subtask receives the command. */
    SINGLE_SUBTASK
}

/**
520
* Command implementations for common test scenarios
521
*/
522
public static class TestCommands {
523
524
/**
525
* Command to trigger operator failure for fault tolerance testing
526
*/
527
public static class FailCommand implements TestCommand {
528
529
/**
530
* Constructor for fail command
531
* @param cause exception to throw as failure cause
532
*/
533
public FailCommand(Exception cause);
534
535
@Override
536
public void execute(StreamOperator<?> operator);
537
538
@Override
539
public String getCommandType();
540
}
541
542
/**
543
* Command to trigger operator finishing for graceful termination testing
544
*/
545
public static class FinishCommand implements TestCommand {
546
547
/**
548
* Constructor for finish command
549
*/
550
public FinishCommand();
551
552
@Override
553
public void execute(StreamOperator<?> operator);
554
555
@Override
556
public String getCommandType();
557
}
558
559
/**
560
* Command to trigger checkpoint for state management testing
561
*/
562
public static class TriggerCheckpointCommand implements TestCommand {
563
564
/**
565
* Constructor for checkpoint trigger command
566
* @param checkpointId checkpoint identifier
567
*/
568
public TriggerCheckpointCommand(long checkpointId);
569
570
@Override
571
public void execute(StreamOperator<?> operator);
572
573
@Override
574
public String getCommandType();
575
576
/**
577
* Get checkpoint ID
578
* @return long checkpoint identifier
579
*/
580
public long getCheckpointId();
581
}
582
583
/**
584
* Command to pause operator processing for synchronization testing
585
*/
586
public static class PauseCommand implements TestCommand {
587
588
/**
589
* Constructor for pause command
590
* @param durationMs pause duration in milliseconds
591
*/
592
public PauseCommand(long durationMs);
593
594
@Override
595
public void execute(StreamOperator<?> operator);
596
597
@Override
598
public String getCommandType();
599
600
/**
601
* Get pause duration
602
* @return long duration in milliseconds
603
*/
604
public long getDurationMs();
605
}
606
}
```

### Lifecycle Validation Framework

Validators that check operator lifecycle behavior and the event sequences recorded during a test.

```java { .api }
/**
615
* Validator for operator lifecycle behavior
616
*/
617
public class TestOperatorLifecycleValidator {
618
619
/**
620
* Constructor for lifecycle validator
621
* @param eventQueue event queue containing lifecycle events
622
*/
623
public TestOperatorLifecycleValidator(TestEventQueue eventQueue);
624
625
/**
626
* Validate complete operator lifecycle sequence
627
* @param operatorId identifier of operator to validate
628
* @return boolean indicating if lifecycle is valid
629
*/
630
public boolean validateLifecycle(String operatorId);
631
632
/**
633
* Validate specific lifecycle phase
634
* @param operatorId operator identifier
635
* @param phase lifecycle phase to validate
636
* @return boolean indicating phase validity
637
*/
638
public boolean validatePhase(String operatorId, LifecyclePhase phase);
639
}
640
641
/**
642
* Validator for operator draining behavior
643
*/
644
public class DrainingValidator {
645
646
/**
647
* Constructor for draining validator
648
* @param eventQueue event queue for validation
649
*/
650
public DrainingValidator(TestEventQueue eventQueue);
651
652
/**
653
* Validate operator draining sequence
654
* @param operatorId operator to validate
655
* @return boolean indicating valid draining behavior
656
*/
657
public boolean validateDraining(String operatorId);
658
}
659
660
/**
661
* Validator for operator finishing behavior
662
*/
663
public class FinishingValidator {
664
665
/**
666
* Constructor for finishing validator
667
* @param eventQueue event queue for validation
668
*/
669
public FinishingValidator(TestEventQueue eventQueue);
670
671
/**
672
* Validate operator finishing sequence
673
* @param operatorId operator to validate
674
* @return boolean indicating valid finishing behavior
675
*/
676
public boolean validateFinishing(String operatorId);
677
}
678
679
/**
680
* Validator for job-level data flow behavior
681
*/
682
public class TestJobDataFlowValidator {
683
684
/**
685
* Constructor for data flow validator
686
* @param eventQueue event queue containing flow events
687
*/
688
public TestJobDataFlowValidator(TestEventQueue eventQueue);
689
690
/**
691
* Validate end-to-end data flow through job
692
* @param expectedElements expected number of processed elements
693
* @return boolean indicating valid data flow
694
*/
695
public boolean validateDataFlow(int expectedElements);
696
}
697
698
/**
 * Operator lifecycle phases that can be checked individually, for example via
 * {@code TestOperatorLifecycleValidator#validatePhase}.
 */
public enum LifecyclePhase {
    STARTING,
    RUNNING,
    CHECKPOINTING,
    DRAINING,
    FINISHING,
    STOPPED
}
```

**Usage Examples:**

```java
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
import org.apache.flink.runtime.operators.lifecycle.event.*;
import org.apache.flink.runtime.operators.lifecycle.graph.*;
import org.apache.flink.runtime.operators.lifecycle.validation.*;

import org.junit.Test;

import java.util.List;
import java.util.stream.Collectors;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

// Basic operator lifecycle test
719
public class OperatorLifecycleTest {
720
721
@Test
722
public void testSimpleOperatorLifecycle() throws Exception {
723
TestEventQueue eventQueue = new TestEventQueue();
724
725
// Build simple test job
726
JobGraph job = TestJobBuilders.SIMPLE_GRAPH_BUILDER.build(
727
new TestConfiguration(eventQueue, 100));
728
729
// Execute job
730
MiniCluster miniCluster = new MiniCluster(configuration);
731
miniCluster.start();
732
miniCluster.executeJobBlocking(job);
733
734
// Validate lifecycle
735
TestOperatorLifecycleValidator validator =
736
new TestOperatorLifecycleValidator(eventQueue);
737
assertTrue(validator.validateLifecycle("test-operator"));
738
}
739
740
@Test
741
public void testMultiInputOperatorLifecycle() throws Exception {
742
TestEventQueue eventQueue = new TestEventQueue();
743
744
// Create multi-input operator
745
MultiInputTestOperator operator = new MultiInputTestOperator(eventQueue, 3);
746
747
// Build complex job graph
748
JobGraph job = TestJobBuilders.COMPLEX_GRAPH_BUILDER.build(
749
new TestConfiguration(eventQueue, 500));
750
751
// Execute and validate
752
executeJobAndValidate(job, eventQueue);
753
}
754
755
@Test
756
public void testOperatorDraining() throws Exception {
757
TestEventQueue eventQueue = new TestEventQueue();
758
759
// Create job with draining scenario
760
JobGraph job = createDrainingTestJob(eventQueue);
761
762
// Execute with controlled shutdown
763
executeJobWithDraining(job);
764
765
// Validate draining behavior
766
DrainingValidator validator = new DrainingValidator(eventQueue);
767
assertTrue(validator.validateDraining("draining-operator"));
768
}
769
770
@Test
771
public void testCheckpointingLifecycle() throws Exception {
772
SharedTestEventQueue eventQueue = SharedTestEventQueue.getInstance();
773
eventQueue.clear();
774
775
// Create job with checkpointing
776
JobGraph job = createCheckpointingTestJob(eventQueue);
777
778
// Execute with periodic checkpoints
779
JobExecutionResult result = executeJobWithCheckpoints(job, 3);
780
781
// Validate checkpoint events
782
List<TestEvent> checkpointEvents = eventQueue.getEvents().stream()
783
.filter(e -> e instanceof CheckpointCompletedEvent)
784
.collect(Collectors.toList());
785
786
assertEquals(3, checkpointEvents.size());
787
}
788
}
```