CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-apache-flink--flink-tests

Comprehensive test library for Apache Flink stream processing framework with integration tests, test utilities, and end-to-end validation tests.

Pending
Overview
Eval results
Files

operator-lifecycle.mddocs/

Operator Lifecycle Testing

Complete framework for testing streaming operator behavior including startup, checkpointing, finishing, and shutdown phases. This framework provides comprehensive validation of operator lifecycle events and state transitions.

Capabilities

Test Job Builders

Factory classes for creating test jobs with different topologies and complexity levels for operator lifecycle testing.

/**
 * Factory for creating test jobs with different topologies
 */
public class TestJobBuilders {
    
    /**
     * Simple test graph factory for basic operator lifecycle testing
     */
    public static final TestJobBuilder SIMPLE_GRAPH_BUILDER;
    
    /**
     * Complex multi-operator test graph factory for advanced lifecycle scenarios
     */
    public static final TestJobBuilder COMPLEX_GRAPH_BUILDER;
    
    /**
     * Interface for test job builders
     */
    public interface TestJobBuilder {
        /**
         * Build job graph with specified configuration
         * @param config test configuration parameters
         * @return JobGraph configured for lifecycle testing
         */
        JobGraph build(TestConfiguration config);
    }
}

Test Stream Operators

Specialized streaming operators designed for lifecycle testing with event tracking and command processing capabilities.

/**
 * Single input test operator for lifecycle testing
 */
public class OneInputTestStreamOperator 
    extends AbstractStreamOperator<TestDataElement> 
    implements OneInputStreamOperator<TestDataElement, TestDataElement> {
    
    /**
     * Constructor for single input test operator
     * @param eventQueue queue for tracking lifecycle events
     */
    public OneInputTestStreamOperator(TestEventQueue eventQueue);
    
    @Override
    public void processElement(StreamRecord<TestDataElement> element) throws Exception;
    
    @Override
    public void open() throws Exception;
    
    @Override
    public void close() throws Exception;
    
    @Override
    public void finish() throws Exception;
}

/**
 * Factory for creating single input test operators
 */
public class OneInputTestStreamOperatorFactory 
    implements StreamOperatorFactory<TestDataElement> {
    
    /**
     * Constructor for operator factory
     * @param eventQueue shared event queue for lifecycle tracking
     */
    public OneInputTestStreamOperatorFactory(TestEventQueue eventQueue);
    
    @Override
    public <T extends StreamOperator<TestDataElement>> T createStreamOperator(
        StreamOperatorParameters<TestDataElement> parameters);
}

/**
 * Dual input test operator for testing multi-input scenarios
 */
public class TwoInputTestStreamOperator 
    extends AbstractStreamOperator<TestDataElement>
    implements TwoInputStreamOperator<TestDataElement, TestDataElement, TestDataElement> {
    
    /**
     * Constructor for dual input test operator
     * @param eventQueue queue for tracking lifecycle events
     */
    public TwoInputTestStreamOperator(TestEventQueue eventQueue);
    
    @Override
    public void processElement1(StreamRecord<TestDataElement> element) throws Exception;
    
    @Override  
    public void processElement2(StreamRecord<TestDataElement> element) throws Exception;
}

/**
 * Multi-input test operator supporting arbitrary number of inputs
 */
public class MultiInputTestOperator 
    extends AbstractStreamOperator<TestDataElement>
    implements MultipleInputStreamOperator<TestDataElement> {
    
    /**
     * Constructor for multi-input test operator
     * @param eventQueue queue for tracking lifecycle events
     * @param inputCount number of input streams
     */
    public MultiInputTestOperator(TestEventQueue eventQueue, int inputCount);
    
    @Override
    public void processElement(StreamRecord<TestDataElement> element, int inputId) throws Exception;
}

/**
 * Factory for creating multi-input test operators
 */
public class MultiInputTestOperatorFactory 
    implements StreamOperatorFactory<TestDataElement> {
    
    /**
     * Constructor for multi-input operator factory
     * @param eventQueue shared event queue
     * @param inputCount number of input streams
     */
    public MultiInputTestOperatorFactory(TestEventQueue eventQueue, int inputCount);
}

Test Event System

Event tracking system for monitoring operator lifecycle phases and state transitions.

/**
 * Queue for tracking operator lifecycle events during testing
 */
public class TestEventQueue {
    
    /**
     * Add lifecycle event to the queue
     * @param event TestEvent representing lifecycle phase
     */
    public void add(TestEvent event);
    
    /**
     * Get all recorded events in chronological order
     * @return List of TestEvent objects
     */
    public List<TestEvent> getEvents();
    
    /**
     * Get events for specific operator
     * @param operatorId identifier of the operator
     * @return List of events for the specified operator
     */
    public List<TestEvent> getEventsForOperator(String operatorId);
    
    /**
     * Clear all recorded events
     */
    public void clear();
}

/**
 * Shared event queue for cross-operator event tracking
 */
public class SharedTestEventQueue extends TestEventQueue {
    
    /**
     * Get singleton instance of shared event queue
     * @return SharedTestEventQueue instance
     */
    public static SharedTestEventQueue getInstance();
}

/**
 * Event representing operator startup completion
 */
public class OperatorStartedEvent implements TestEvent {
    
    /**
     * Constructor for operator started event
     * @param operatorId identifier of the started operator
     * @param timestamp event timestamp
     */
    public OperatorStartedEvent(String operatorId, long timestamp);
    
    @Override
    public String getOperatorId();
    
    @Override
    public long getTimestamp();
}

/**
 * Event representing operator shutdown completion
 */
public class OperatorFinishedEvent implements TestEvent {
    
    /**
     * Constructor for operator finished event
     * @param operatorId identifier of the finished operator
     * @param timestamp event timestamp
     */
    public OperatorFinishedEvent(String operatorId, long timestamp);
}

/**
 * Event representing checkpoint completion
 */
public class CheckpointCompletedEvent implements TestEvent {
    
    /**
     * Constructor for checkpoint completed event
     * @param operatorId identifier of the checkpointed operator
     * @param checkpointId checkpoint identifier
     * @param timestamp event timestamp
     */
    public CheckpointCompletedEvent(String operatorId, long checkpointId, long timestamp);
    
    /**
     * Get checkpoint identifier
     * @return long checkpoint ID
     */
    public long getCheckpointId();
}

Test Job Executor

Controller for managing operator lifecycle testing jobs with comprehensive event monitoring, command dispatching, and validation capabilities.

/**
 * Controller for operator lifecycle testing jobs with event monitoring and command capabilities
 */
public class TestJobExecutor {
    
    /**
     * Constructor for test job executor
     * @param jobWithDescription test job with event/command infrastructure
     */
    public TestJobExecutor(TestJobWithDescription jobWithDescription);
    
    /**
     * Execute test job on MiniCluster and initialize monitoring
     * @param miniClusterResource MiniCluster resource for job execution
     * @return CompletableFuture for asynchronous job execution
     * @throws Exception if job submission fails
     */
    public CompletableFuture<JobExecutionResult> execute(
        MiniClusterWithClientResource miniClusterResource) throws Exception;
    
    /**
     * Wait for all operators in the job to reach running state
     * @throws Exception if operators don't reach running state
     */
    public void waitForAllRunning() throws Exception;
    
    /**
     * Wait for all operators to reach running state within timeout
     * @param timeout timeout duration
     * @param timeUnit time unit for timeout
     * @throws Exception if timeout exceeded or operators don't reach running state
     */
    public void waitForAllRunning(long timeout, TimeUnit timeUnit) throws Exception;
    
    /**
     * Wait for specific type of event to occur
     * @param eventType class of the event to wait for
     * @throws Exception if event doesn't occur within timeout
     */
    public void waitForEvent(Class<? extends TestEvent> eventType) throws Exception;
    
    /**
     * Wait for specific event with timeout
     * @param eventType class of the event to wait for
     * @param timeout timeout duration
     * @param timeUnit time unit for timeout
     * @throws Exception if event doesn't occur within timeout
     */
    public void waitForEvent(Class<? extends TestEvent> eventType, long timeout, TimeUnit timeUnit) throws Exception;
    
    /**
     * Stop job execution and create savepoint
     * @param savepointDir directory for savepoint storage
     * @param advanceToEndOfEventTime advance to end of event time before stopping
     * @return String path to created savepoint
     * @throws Exception if savepoint creation fails
     */
    public String stopWithSavepoint(TemporaryFolder savepointDir, boolean advanceToEndOfEventTime) throws Exception;
    
    /**
     * Send command to specific operator instance
     * @param operatorId identifier of target operator
     * @param command command to send
     * @param scope scope of command execution
     * @throws Exception if command sending fails
     */
    public void sendOperatorCommand(String operatorId, TestCommand command, TestCommandScope scope) throws Exception;
    
    /**
     * Trigger failover for specific operator
     * @param operatorId identifier of operator to fail
     * @throws Exception if failover triggering fails
     */
    public void triggerFailover(String operatorId) throws Exception;
    
    /**
     * Send broadcast command to all operators
     * @param command command to broadcast
     * @param scope scope of command execution
     * @throws Exception if broadcast fails
     */
    public void sendBroadcastCommand(TestCommand command, TestCommandScope scope) throws Exception;
    
    /**
     * Wait for job termination (completion or failure)
     * @throws Exception if waiting for termination fails
     */
    public void waitForTermination() throws Exception;
    
    /**
     * Wait for job termination with timeout
     * @param timeout timeout duration
     * @param timeUnit time unit for timeout
     * @throws Exception if termination doesn't occur within timeout
     */
    public void waitForTermination(long timeout, TimeUnit timeUnit) throws Exception;
    
    /**
     * Assert that job finished successfully without failures
     * @throws Exception if job didn't finish successfully
     */
    public void assertFinishedSuccessfully() throws Exception;
    
    /**
     * Get all events collected during job execution
     * @return List of TestEvent instances
     */
    public List<TestEvent> getAllEvents();
    
    /**
     * Get events of specific type
     * @param eventType class of events to retrieve
     * @return List of events matching the specified type
     */
    public <T extends TestEvent> List<T> getEventsOfType(Class<T> eventType);
    
    /**
     * Cancel the running job
     * @throws Exception if job cancellation fails
     */
    public void cancel() throws Exception;
    
    /**
     * Get current job execution result if available
     * @return Optional containing JobExecutionResult if job completed
     */
    public Optional<JobExecutionResult> getJobResult();
}

/**
 * Container for test job with event and command infrastructure
 */
public class TestJobWithDescription {
    
    /**
     * Constructor for test job container
     * @param jobGraph JobGraph for the test
     * @param eventQueue shared event queue for lifecycle tracking
     * @param commandDispatcher dispatcher for sending commands to operators
     */
    public TestJobWithDescription(
        JobGraph jobGraph, 
        TestEventQueue eventQueue, 
        TestCommandDispatcher commandDispatcher);
    
    /**
     * Get the job graph for execution
     * @return JobGraph instance
     */
    public JobGraph getJobGraph();
    
    /**
     * Get event queue for monitoring
     * @return TestEventQueue instance
     */
    public TestEventQueue getEventQueue();
    
    /**
     * Get command dispatcher for operator control
     * @return TestCommandDispatcher instance
     */
    public TestCommandDispatcher getCommandDispatcher();
}

Test Data and Event Sources

Data structures and source functions for lifecycle testing scenarios.

/**
 * Data element specifically designed for lifecycle testing
 */
public class TestDataElement {
    
    /**
     * Constructor for test data element
     * @param value string value of the element
     * @param timestamp element timestamp
     */
    public TestDataElement(String value, long timestamp);
    
    /**
     * Get element value
     * @return String value
     */
    public String getValue();
    
    /**
     * Get element timestamp
     * @return long timestamp
     */
    public long getTimestamp();
}

/**
 * Source that emits test events and responds to test commands
 */
public class TestEventSource implements SourceFunction<TestDataElement> {
    
    /**
     * Constructor for test event source
     * @param eventQueue queue for lifecycle event tracking
     * @param elementsToEmit number of elements to emit
     */
    public TestEventSource(TestEventQueue eventQueue, int elementsToEmit);
    
    @Override
    public void run(SourceContext<TestDataElement> ctx) throws Exception;
    
    @Override
    public void cancel();
}

Command System

Command dispatch system for controlling operator behavior during lifecycle testing.

/**
 * Dispatcher for sending commands to operators during testing
 */
public class TestCommandDispatcher {
    
    /**
     * Constructor for command dispatcher
     * @param eventQueue event queue for tracking command effects
     */
    public TestCommandDispatcher(TestEventQueue eventQueue);
    
    /**
     * Send command to specific operator
     * @param operatorId target operator identifier
     * @param command command to execute
     */
    public void sendCommand(String operatorId, TestCommand command);
    
    /**
     * Send command to all operators
     * @param command command to broadcast
     */
    public void broadcastCommand(TestCommand command);
}

/**
 * Base interface for test commands
 */
public interface TestCommand {
    
    /**
     * Execute command on target operator
     * @param operator target stream operator
     */
    void execute(StreamOperator<?> operator);
    
    /**
     * Get command type identifier
     * @return String identifying the command type
     */
    String getCommandType();
}

/**
 * Enumeration of command execution scopes
 */
public enum TestCommandScope {
    /** Command applies to all subtasks of the operator */
    ALL_SUBTASKS,
    /** Command applies to a single subtask only */
    SINGLE_SUBTASK
}

/**
 * Command implementations for common test scenarios
 */
public static class TestCommands {
    
    /**
     * Command to trigger operator failure for fault tolerance testing
     */
    public static class FailCommand implements TestCommand {
        
        /**
         * Constructor for fail command
         * @param cause exception to throw as failure cause
         */
        public FailCommand(Exception cause);
        
        @Override
        public void execute(StreamOperator<?> operator);
        
        @Override
        public String getCommandType();
    }
    
    /**
     * Command to trigger operator finishing for graceful termination testing
     */
    public static class FinishCommand implements TestCommand {
        
        /**
         * Constructor for finish command
         */
        public FinishCommand();
        
        @Override
        public void execute(StreamOperator<?> operator);
        
        @Override
        public String getCommandType();
    }
    
    /**
     * Command to trigger checkpoint for state management testing
     */
    public static class TriggerCheckpointCommand implements TestCommand {
        
        /**
         * Constructor for checkpoint trigger command
         * @param checkpointId checkpoint identifier
         */
        public TriggerCheckpointCommand(long checkpointId);
        
        @Override
        public void execute(StreamOperator<?> operator);
        
        @Override
        public String getCommandType();
        
        /**
         * Get checkpoint ID
         * @return long checkpoint identifier
         */
        public long getCheckpointId();
    }
    
    /**
     * Command to pause operator processing for synchronization testing
     */
    public static class PauseCommand implements TestCommand {
        
        /**
         * Constructor for pause command
         * @param durationMs pause duration in milliseconds
         */
        public PauseCommand(long durationMs);
        
        @Override
        public void execute(StreamOperator<?> operator);
        
        @Override
        public String getCommandType();
        
        /**
         * Get pause duration
         * @return long duration in milliseconds
         */
        public long getDurationMs();
    }
}

Lifecycle Validation Framework

Validation framework for verifying correct operator lifecycle behavior and event sequences.

/**
 * Validator for operator lifecycle behavior
 */
public class TestOperatorLifecycleValidator {
    
    /**
     * Constructor for lifecycle validator
     * @param eventQueue event queue containing lifecycle events
     */
    public TestOperatorLifecycleValidator(TestEventQueue eventQueue);
    
    /**
     * Validate complete operator lifecycle sequence
     * @param operatorId identifier of operator to validate
     * @return boolean indicating if lifecycle is valid
     */
    public boolean validateLifecycle(String operatorId);
    
    /**
     * Validate specific lifecycle phase
     * @param operatorId operator identifier
     * @param phase lifecycle phase to validate
     * @return boolean indicating phase validity
     */
    public boolean validatePhase(String operatorId, LifecyclePhase phase);
}

/**
 * Validator for operator draining behavior
 */
public class DrainingValidator {
    
    /**
     * Constructor for draining validator
     * @param eventQueue event queue for validation
     */
    public DrainingValidator(TestEventQueue eventQueue);
    
    /**
     * Validate operator draining sequence
     * @param operatorId operator to validate
     * @return boolean indicating valid draining behavior
     */
    public boolean validateDraining(String operatorId);
}

/**
 * Validator for operator finishing behavior  
 */
public class FinishingValidator {
    
    /**
     * Constructor for finishing validator
     * @param eventQueue event queue for validation
     */
    public FinishingValidator(TestEventQueue eventQueue);
    
    /**
     * Validate operator finishing sequence
     * @param operatorId operator to validate
     * @return boolean indicating valid finishing behavior
     */
    public boolean validateFinishing(String operatorId);
}

/**
 * Validator for job-level data flow behavior
 */
public class TestJobDataFlowValidator {
    
    /**
     * Constructor for data flow validator
     * @param eventQueue event queue containing flow events
     */
    public TestJobDataFlowValidator(TestEventQueue eventQueue);
    
    /**
     * Validate end-to-end data flow through job
     * @param expectedElements expected number of processed elements
     * @return boolean indicating valid data flow
     */
    public boolean validateDataFlow(int expectedElements);
}

/**
 * Lifecycle phases for validation
 */
public enum LifecyclePhase {
    STARTING,
    RUNNING, 
    CHECKPOINTING,
    DRAINING,
    FINISHING,
    STOPPED
}

Usage Examples:

import org.apache.flink.runtime.operators.lifecycle.graph.*;
import org.apache.flink.runtime.operators.lifecycle.event.*;
import org.apache.flink.runtime.operators.lifecycle.validation.*;

// Basic operator lifecycle test
public class OperatorLifecycleTest {
    
    @Test
    public void testSimpleOperatorLifecycle() throws Exception {
        TestEventQueue eventQueue = new TestEventQueue();
        
        // Build simple test job
        JobGraph job = TestJobBuilders.SIMPLE_GRAPH_BUILDER.build(
            new TestConfiguration(eventQueue, 100));
        
        // Execute job
        MiniCluster miniCluster = new MiniCluster(configuration);
        miniCluster.start();
        miniCluster.executeJobBlocking(job);
        
        // Validate lifecycle
        TestOperatorLifecycleValidator validator = 
            new TestOperatorLifecycleValidator(eventQueue);
        assertTrue(validator.validateLifecycle("test-operator"));
    }
    
    @Test
    public void testMultiInputOperatorLifecycle() throws Exception {
        TestEventQueue eventQueue = new TestEventQueue();
        
        // Create multi-input operator
        MultiInputTestOperator operator = new MultiInputTestOperator(eventQueue, 3);
        
        // Build complex job graph
        JobGraph job = TestJobBuilders.COMPLEX_GRAPH_BUILDER.build(
            new TestConfiguration(eventQueue, 500));
        
        // Execute and validate
        executeJobAndValidate(job, eventQueue);
    }
    
    @Test
    public void testOperatorDraining() throws Exception {
        TestEventQueue eventQueue = new TestEventQueue();
        
        // Create job with draining scenario
        JobGraph job = createDrainingTestJob(eventQueue);
        
        // Execute with controlled shutdown
        executeJobWithDraining(job);
        
        // Validate draining behavior
        DrainingValidator validator = new DrainingValidator(eventQueue);
        assertTrue(validator.validateDraining("draining-operator"));
    }
    
    @Test
    public void testCheckpointingLifecycle() throws Exception {
        SharedTestEventQueue eventQueue = SharedTestEventQueue.getInstance();
        eventQueue.clear();
        
        // Create job with checkpointing
        JobGraph job = createCheckpointingTestJob(eventQueue);
        
        // Execute with periodic checkpoints
        JobExecutionResult result = executeJobWithCheckpoints(job, 3);
        
        // Validate checkpoint events
        List<TestEvent> checkpointEvents = eventQueue.getEvents().stream()
            .filter(e -> e instanceof CheckpointCompletedEvent)
            .collect(Collectors.toList());
        
        assertEquals(3, checkpointEvents.size());
    }
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-tests

docs

cancellation-testing.md

checkpointing-migration.md

fault-tolerance-recovery.md

index.md

operator-lifecycle.md

plugin-testing.md

runtime-utilities.md

session-window-testing.md

state-backend-restore.md

test-data-utilities.md

tile.json