CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-org-apache-flink--flink-tests

Comprehensive test library for Apache Flink stream processing framework with integration tests, test utilities, and end-to-end validation tests.

Pending
Overview
Eval results
Files

docs/runtime-utilities.md

Runtime Utilities

Comprehensive collection of runtime utilities for test execution, process management, and common testing operations. These utilities provide essential infrastructure for running tests in controlled environments and managing Flink job execution.

Capabilities

Job Graph Execution Utilities

Utilities for executing JobGraphs on MiniCluster instances with comprehensive control and monitoring capabilities.

/**
 * Utility for running JobGraphs on MiniCluster for testing.
 *
 * <p>All methods are static; the class holds no state. Method bodies are
 * omitted here — these declarations document the public API surface only.
 * The usage examples below start the MiniCluster before calling these
 * methods, so callers are expected to pass an already-started cluster.
 */
public class JobGraphRunningUtil {
    
    /**
     * Execute JobGraph on MiniCluster and wait for completion.
     *
     * <p>NOTE(review): presumably blocks the calling thread until the job
     * finishes — confirm against the implementation.
     *
     * @param jobGraph JobGraph to execute
     * @param miniCluster MiniCluster instance for execution
     * @throws Exception if job execution fails
     */
    public static void execute(JobGraph jobGraph, MiniCluster miniCluster) throws Exception;
    
    /**
     * Execute JobGraph, failing if it does not complete within the timeout.
     *
     * @param jobGraph JobGraph to execute
     * @param miniCluster MiniCluster instance
     * @param timeoutMs timeout in milliseconds
     * @return JobExecutionResult containing execution results
     * @throws Exception if execution fails or times out
     */
    public static JobExecutionResult executeWithTimeout(
        JobGraph jobGraph, 
        MiniCluster miniCluster, 
        long timeoutMs) throws Exception;
    
    /**
     * Execute JobGraph and return its execution result.
     *
     * @param jobGraph JobGraph to execute
     * @param miniCluster MiniCluster instance
     * @return JobExecutionResult with job execution details
     * @throws Exception if execution fails
     */
    public static JobExecutionResult executeAndGetResult(
        JobGraph jobGraph, 
        MiniCluster miniCluster) throws Exception;
    
    /**
     * Execute multiple JobGraphs sequentially, one after another.
     *
     * <p>NOTE(review): behavior when one job fails mid-sequence (whether
     * later jobs still run) is not specified here — confirm.
     *
     * @param jobGraphs list of JobGraphs to execute, in order
     * @param miniCluster MiniCluster instance
     * @return List of JobExecutionResults, presumably in submission order
     * @throws Exception if any job execution fails
     */
    public static List<JobExecutionResult> executeSequentially(
        List<JobGraph> jobGraphs,
        MiniCluster miniCluster) throws Exception;
    
    /**
     * Submit JobGraph asynchronously; does not wait for completion.
     *
     * @param jobGraph JobGraph to submit
     * @param miniCluster MiniCluster instance
     * @return CompletableFuture completing with the JobExecutionResult
     */
    public static CompletableFuture<JobExecutionResult> submitAsync(
        JobGraph jobGraph,
        MiniCluster miniCluster);
}

Process Management Utilities

Entry points and utilities for managing external processes during testing scenarios.

/**
 * Entry point for task executor process testing
 */
/**
 * Entry point for task executor process testing.
 *
 * <p>Used to launch a standalone task executor as a separate OS process in
 * process-management test scenarios. Method bodies are omitted here — these
 * declarations document the public API surface only.
 */
public class TaskExecutorProcessEntryPoint {
    
    /**
     * Main entry point for standalone task executor process.
     *
     * @param args command line arguments for task executor configuration;
     *             the expected argument format is not specified here — see
     *             the implementation
     */
    public static void main(String[] args);
    
    /**
     * Start task executor with specific configuration.
     *
     * <p>NOTE(review): whether this call blocks for the lifetime of the
     * task executor or returns after startup is not specified — confirm.
     *
     * @param config Configuration for task executor setup
     * @throws Exception if task executor startup fails
     */
    public static void startTaskExecutor(Configuration config) throws Exception;
    
    /**
     * Create default configuration for task executor testing.
     *
     * @return Configuration pre-populated with testing defaults
     */
    public static Configuration createDefaultTestConfiguration();
}

Test Function Utilities

Common test functions and utilities for data processing and validation scenarios.

/**
 * Tokenizer function for string processing tests
 */
/**
 * Tokenizer function for string processing tests.
 *
 * <p>Splits each input string into tokens and emits one
 * {@code Tuple2<String, Integer>} per token. NOTE(review): presumably each
 * token is paired with a count of 1, word-count style — confirm against the
 * implementation.
 */
public class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
    
    /**
     * Constructor for tokenizer with default configuration
     * (default delimiter — the default pattern is not specified here).
     */
    public Tokenizer();
    
    /**
     * Constructor for tokenizer with custom delimiter.
     *
     * @param delimiter delimiter pattern for tokenization; whether this is
     *                  a literal string or a regex is not specified here
     */
    public Tokenizer(String delimiter);
    
    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception;
}

/**
 * Identity mapper for testing data flow without transformation
 */
/**
 * Identity mapper for testing data flow without transformation.
 *
 * <p>Maps each Integer to itself; useful for inserting a pass-through
 * operator into a pipeline under test.
 */
public class NoOpIntMap implements MapFunction<Integer, Integer> {
    
    /**
     * Constructor for no-operation integer mapper.
     */
    public NoOpIntMap();
    
    @Override
    public Integer map(Integer value) throws Exception;
}

/**
 * No-operation sink for testing data flow completion
 */
/**
 * No-operation sink for testing data flow completion.
 *
 * <p>Discards every element but counts how many were received, so a test
 * can assert that the expected number of records reached the sink.
 * NOTE(review): sink instances may run in parallel subtasks; whether the
 * counter is shared across subtasks (e.g. a static field) or per-instance
 * is not specified here — confirm before using with parallelism &gt; 1.
 *
 * @param <T> element type accepted by the sink
 */
public class ReceiveCheckNoOpSink<T> implements SinkFunction<T> {
    
    /**
     * Constructor for no-op sink with receive tracking.
     *
     * @param expectedCount expected number of elements to receive
     */
    public ReceiveCheckNoOpSink(int expectedCount);
    
    @Override
    public void invoke(T value, Context context) throws Exception;
    
    /**
     * Check if expected number of elements were received.
     *
     * @return true if the received count has reached the expected count
     */
    public boolean receivedExpectedCount();
    
    /**
     * Get actual count of received elements so far.
     *
     * @return actual received count
     */
    public int getReceivedCount();
    
    /**
     * Reset the counter to zero for reuse across multiple tests.
     */
    public void reset();
}

Recovery Testing Utilities

Utilities specifically designed for testing recovery scenarios and restart strategies.

/**
 * Utility class for recovery testing operations
 */
/**
 * Utility class for recovery testing operations.
 *
 * <p>Helpers for configuring failure injection, validating restart
 * behavior, and polling job status. Method bodies are omitted here — these
 * declarations document the public API surface only.
 */
public class RecoveryTestUtils {
    
    /**
     * Validate recovery behavior from job execution result.
     *
     * <p>NOTE(review): how the restart count is extracted from the result
     * (accumulator, metric, etc.) is not specified here — confirm.
     *
     * @param result JobExecutionResult to analyze
     * @param expectedRestartCount expected number of restarts
     * @return true if the observed recovery behavior matches expectations
     */
    public static boolean validateRecoveryBehavior(
        JobExecutionResult result, 
        int expectedRestartCount);
    
    /**
     * Create configuration for failure injection testing.
     *
     * @param restartStrategy restart strategy to use (e.g. "fixed-delay",
     *                        as shown in the usage examples below)
     * @param maxFailures maximum number of failures to inject
     * @return Configuration with failure injection settings applied
     */
    public static Configuration createFailureInjectionConfig(
        String restartStrategy,
        int maxFailures);
    
    /**
     * Wait (poll) for a job to reach a specific state, with timeout.
     *
     * @param jobId JobID to monitor
     * @param targetState target JobStatus to wait for
     * @param timeoutMs timeout in milliseconds
     * @param restGateway REST client used to query job status
     * @return true if the target state was reached within the timeout
     * @throws Exception if monitoring fails
     */
    public static boolean waitForJobState(
        JobID jobId,
        JobStatus targetState,
        long timeoutMs,
        RestClusterClient<?> restGateway) throws Exception;
}

Test Environment Utilities

Utilities for setting up and managing test environments and configurations.

/**
 * Utility for creating and managing test environments
 */
/**
 * Utility for creating and managing test environments.
 *
 * <p>Factory and lifecycle helpers for MiniCluster configurations and
 * stream/batch execution environments. Method bodies are omitted here —
 * these declarations document the public API surface only.
 */
public class TestEnvironmentUtil {
    
    /**
     * Create MiniCluster configuration for testing.
     *
     * @param parallelism desired default parallelism
     * @param numTaskManagers number of task managers to start
     * @return Configuration for MiniCluster setup
     */
    public static Configuration createTestClusterConfig(
        int parallelism,
        int numTaskManagers);
    
    /**
     * Create streaming environment for testing.
     *
     * <p>NOTE(review): when checkpointing is enabled, the checkpoint
     * interval used is not specified here — confirm.
     *
     * @param parallelism parallelism for the environment
     * @param checkpointingEnabled whether to enable checkpointing
     * @return StreamExecutionEnvironment configured for testing
     */
    public static StreamExecutionEnvironment createTestStreamEnv(
        int parallelism,
        boolean checkpointingEnabled);
    
    /**
     * Create batch environment for testing.
     *
     * @param parallelism parallelism for the environment
     * @return ExecutionEnvironment configured for testing
     */
    public static ExecutionEnvironment createTestBatchEnv(int parallelism);
    
    /**
     * Set up test-specific logging configuration.
     *
     * @param logLevel logging level for tests
     * @param logToConsole whether to also log to the console
     */
    public static void setupTestLogging(Level logLevel, boolean logToConsole);
    
    /**
     * Clean up test environment resources: tears down the environment and
     * shuts the mini cluster down.
     *
     * @param environment execution environment to clean up
     * @param miniCluster mini cluster to shut down
     * @throws Exception if cleanup fails
     */
    public static void cleanupTestEnvironment(
        StreamExecutionEnvironment environment,
        MiniCluster miniCluster) throws Exception;
}

Metrics and Monitoring Utilities

Utilities for collecting and validating metrics during test execution.

/**
 * Utility for metrics collection and validation in tests
 */
/**
 * Utility for metrics collection and validation in tests.
 *
 * <p>Method bodies are omitted here — these declarations document the
 * public API surface only.
 */
public class TestMetricsUtil {
    
    /**
     * Collect all metrics from a MiniCluster.
     *
     * <p>NOTE(review): the metric-name key format (scoped vs. simple
     * names) is not specified here — confirm before matching on names.
     *
     * @param miniCluster cluster to collect metrics from
     * @return Map of metric names to values
     * @throws Exception if metrics collection fails
     */
    public static Map<String, Object> collectAllMetrics(MiniCluster miniCluster) throws Exception;
    
    /**
     * Wait (poll) for a specific metric to reach an expected value.
     *
     * @param miniCluster cluster to monitor
     * @param metricName name of metric to monitor
     * @param expectedValue expected metric value
     * @param timeoutMs timeout in milliseconds
     * @return true if the metric reached the expected value in time
     * @throws Exception if monitoring fails
     */
    public static boolean waitForMetricValue(
        MiniCluster miniCluster,
        String metricName,
        Object expectedValue,
        long timeoutMs) throws Exception;
    
    /**
     * Validate job metrics against expected values.
     *
     * @param jobId JobID to validate metrics for
     * @param expectedMetrics map of expected metric name/value pairs
     * @param miniCluster cluster to collect metrics from
     * @return true only if every expected metric matches
     * @throws Exception if validation fails
     */
    public static boolean validateJobMetrics(
        JobID jobId,
        Map<String, Object> expectedMetrics,
        MiniCluster miniCluster) throws Exception;
}

Usage Examples:

import org.apache.flink.test.runtime.*;
import org.apache.flink.test.util.*;

// Example: Executing jobs with runtime utilities
// Example: Executing jobs with runtime utilities
public class JobExecutionTest {
    
    @Test
    public void testJobExecution() throws Exception {
        // Build the job graph under test.
        JobGraph jobGraph = createTestJobGraph();
        
        // Bring up a mini cluster: parallelism 4, two task managers.
        Configuration clusterConfig = TestEnvironmentUtil.createTestClusterConfig(4, 2);
        MiniCluster cluster = new MiniCluster(clusterConfig);
        cluster.start();
        
        try {
            // Run the job, failing the test if it exceeds 30 seconds.
            JobExecutionResult result =
                JobGraphRunningUtil.executeWithTimeout(jobGraph, cluster, 30000L);
            
            // The job must succeed and report at least one accumulator.
            assertTrue(result.isSuccess());
            assertFalse(result.getAllAccumulatorResults().isEmpty());
            
        } finally {
            cluster.close();
        }
    }
    
    @Test
    public void testSequentialJobExecution() throws Exception {
        // Three independent jobs to run back to back.
        List<JobGraph> jobs = Arrays.asList(
            createTestJobGraph("job1"),
            createTestJobGraph("job2"),
            createTestJobGraph("job3")
        );
        
        Configuration clusterConfig = TestEnvironmentUtil.createTestClusterConfig(2, 1);
        MiniCluster cluster = new MiniCluster(clusterConfig);
        cluster.start();
        
        try {
            // Run the jobs one after another on the same cluster.
            List<JobExecutionResult> results =
                JobGraphRunningUtil.executeSequentially(jobs, cluster);
            
            // Every job must have completed successfully.
            assertEquals(3, results.size());
            for (JobExecutionResult result : results) {
                assertTrue(result.isSuccess());
            }
            
        } finally {
            cluster.close();
        }
    }
}

// Example: Recovery testing with utilities
// Example: Recovery testing with utilities
public class RecoveryUtilityTest {
    
    @Test
    public void testRecoveryBehavior() throws Exception {
        // Job graph that deliberately injects failures.
        JobGraph faultTolerantJob = createJobWithFailures();
        
        // Fixed-delay restart strategy, at most 3 injected failures,
        // with a one-second delay between restart attempts.
        Configuration config =
            RecoveryTestUtils.createFailureInjectionConfig("fixed-delay", 3);
        config.setString("restart-strategy.fixed-delay.delay", "1s");
        
        MiniCluster cluster = new MiniCluster(config);
        cluster.start();
        
        try {
            // Run the job to completion despite the injected failures.
            JobExecutionResult result =
                JobGraphRunningUtil.executeAndGetResult(faultTolerantJob, cluster);
            
            // Expect exactly two restarts to have occurred.
            boolean recoveryValid =
                RecoveryTestUtils.validateRecoveryBehavior(result, 2);
            assertTrue(recoveryValid);
            
        } finally {
            cluster.close();
        }
    }
}

// Example: Using test functions
// Example: Using test functions
public class TestFunctionUsage {
    
    @Test
    public void testTokenizerFunction() throws Exception {
        StreamExecutionEnvironment env = TestEnvironmentUtil.createTestStreamEnv(1, false);
        
        // Three two-word lines — six tokens in total.
        DataStreamSource<String> lines = env.fromElements(
            "hello world", "test data", "apache flink"
        );
        
        // Tokenize each line into (token, count) pairs.
        DataStream<Tuple2<String, Integer>> tokens = lines
            .flatMap(new Tokenizer())
            .returns(TypeInformation.of(new TypeHint<Tuple2<String, Integer>>(){}));
        
        // Counting sink expecting all six tokens to arrive.
        ReceiveCheckNoOpSink<Tuple2<String, Integer>> countingSink =
            new ReceiveCheckNoOpSink<>(6);
        
        tokens.addSink(countingSink);
        
        // Run the pipeline, then verify the sink saw every token.
        env.execute("Tokenizer Test");
        assertTrue(countingSink.receivedExpectedCount());
    }
}

// Example: Environment setup and cleanup
// Example: Environment setup and cleanup
public class EnvironmentManagementTest {
    
    private StreamExecutionEnvironment env;
    private MiniCluster miniCluster;
    
    @Before
    public void setup() throws Exception {
        // Route test logs to the console at INFO level.
        TestEnvironmentUtil.setupTestLogging(Level.INFO, true);
        
        // Streaming environment: parallelism 2, checkpointing on.
        env = TestEnvironmentUtil.createTestStreamEnv(2, true);
        
        // Mini cluster: parallelism 2, a single task manager.
        Configuration clusterConfig = TestEnvironmentUtil.createTestClusterConfig(2, 1);
        miniCluster = new MiniCluster(clusterConfig);
        miniCluster.start();
    }
    
    @After
    public void cleanup() throws Exception {
        // Tear down both the environment and the cluster.
        TestEnvironmentUtil.cleanupTestEnvironment(env, miniCluster);
    }
    
    @Test
    public void testWithManagedEnvironment() throws Exception {
        // Five integers; the filter keeps 3, 4, 5.
        DataStreamSource<Integer> numbers = env.fromElements(1, 2, 3, 4, 5);
        
        DataStream<Integer> filtered = numbers
            .map(new NoOpIntMap())
            .filter(value -> value > 2);
        
        // Expect exactly the three surviving elements at the sink.
        ReceiveCheckNoOpSink<Integer> sink = new ReceiveCheckNoOpSink<>(3);
        filtered.addSink(sink);
        
        env.execute("Managed Environment Test");
        assertTrue(sink.receivedExpectedCount());
    }
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-tests

docs

cancellation-testing.md

checkpointing-migration.md

fault-tolerance-recovery.md

index.md

operator-lifecycle.md

plugin-testing.md

runtime-utilities.md

session-window-testing.md

state-backend-restore.md

test-data-utilities.md

tile.json