CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-apache-flink--flink-tests

Comprehensive test library for Apache Flink stream processing framework with integration tests, test utilities, and end-to-end validation tests.

Pending
Overview
Eval results
Files

plugin-testing.mddocs/

Plugin Testing Framework

Framework for testing Flink's plugin system and service provider interface (SPI) implementations. This framework enables validation of plugin loading, dependency isolation, and service discovery mechanisms.

Capabilities

Plugin Test Base

Abstract base class providing framework for testing Flink plugin system functionality and SPI implementations.

/**
 * Base class for testing Flink plugin system and SPI implementations
 */
public abstract class PluginTestBase {
    
    /**
     * Test plugin loading and initialization
     * @throws Exception if plugin loading test fails
     */
    protected abstract void testPluginLoading() throws Exception;
    
    /**
     * Test plugin dependency isolation and classloader separation
     * @param pluginPath path to plugin JAR file
     * @throws Exception if dependency isolation test fails
     */
    protected void testPluginDependencyIsolation(String pluginPath) throws Exception;
    
    /**
     * Test service provider interface discovery and loading
     * @param serviceInterface service interface class
     * @param expectedImplementations expected number of implementations
     * @throws Exception if SPI discovery test fails
     */
    protected <T> void testServiceProviderDiscovery(
        Class<T> serviceInterface, 
        int expectedImplementations) throws Exception;
    
    /**
     * Test plugin configuration and parameter passing
     * @param pluginConfig configuration properties for plugin
     * @throws Exception if plugin configuration test fails
     */
    protected void testPluginConfiguration(Properties pluginConfig) throws Exception;
    
    /**
     * Test plugin lifecycle management (load, initialize, shutdown)
     * @param pluginIdentifier unique identifier for plugin
     * @throws Exception if plugin lifecycle test fails
     */
    protected void testPluginLifecycle(String pluginIdentifier) throws Exception;
    
    /**
     * Validate plugin classloader isolation
     * @param pluginClassLoader plugin's classloader
     * @param parentClassLoader parent application classloader
     * @return boolean indicating proper isolation
     */
    protected boolean validateClassLoaderIsolation(
        ClassLoader pluginClassLoader, 
        ClassLoader parentClassLoader);
    
    /**
     * Create test plugin configuration with specified properties
     * @param pluginName name of the plugin
     * @param pluginVersion version of the plugin
     * @param configProperties additional configuration properties
     * @return PluginConfiguration for testing
     */
    protected PluginConfiguration createTestPluginConfiguration(
        String pluginName,
        String pluginVersion, 
        Map<String, String> configProperties);
}

Plugin Service Interfaces

Test implementations of service provider interfaces for validating plugin discovery and loading.

/**
 * Test service provider interface for plugin testing
 */
public interface TestPluginService {
    
    /**
     * Get service name identifier
     * @return String service name
     */
    String getServiceName();
    
    /**
     * Get service version
     * @return String service version
     */
    String getServiceVersion();
    
    /**
     * Initialize service with configuration
     * @param config service configuration properties
     * @throws Exception if initialization fails
     */
    void initialize(Map<String, String> config) throws Exception;
    
    /**
     * Execute service operation for testing
     * @param input input data for service operation
     * @return String result of service operation
     * @throws Exception if service operation fails
     */
    String executeOperation(String input) throws Exception;
    
    /**
     * Shutdown service and cleanup resources
     * @throws Exception if shutdown fails  
     */
    void shutdown() throws Exception;
}

/**
 * Test implementation of plugin service interface
 */
public class TestPluginServiceImpl implements TestPluginService {
    
    /**
     * Constructor for test plugin service implementation
     */
    public TestPluginServiceImpl();
    
    @Override
    public String getServiceName();
    
    @Override
    public String getServiceVersion();
    
    @Override
    public void initialize(Map<String, String> config) throws Exception;
    
    @Override
    public String executeOperation(String input) throws Exception;
    
    @Override
    public void shutdown() throws Exception;
    
    /**
     * Check if service is properly initialized
     * @return boolean indicating initialization status
     */
    public boolean isInitialized();
    
    /**
     * Get service configuration
     * @return Map of configuration properties
     */
    public Map<String, String> getConfiguration();
}

/**
 * Alternative test service implementation for multiple provider testing
 */
public class AlternativeTestPluginServiceImpl implements TestPluginService {
    
    @Override
    public String getServiceName();
    
    @Override  
    public String getServiceVersion();
    
    @Override
    public void initialize(Map<String, String> config) throws Exception;
    
    @Override
    public String executeOperation(String input) throws Exception;
    
    @Override
    public void shutdown() throws Exception;
}

Plugin Management Utilities

Utility classes for plugin loading, management, and validation operations.

/**
 * Utilities for plugin loading and management testing
 */
public class PluginTestUtils {
    
    /**
     * Load plugin from JAR file with isolated classloader
     * @param pluginJarPath path to plugin JAR file
     * @param parentClassLoader parent classloader for isolation
     * @return PluginClassLoader for the loaded plugin
     * @throws Exception if plugin loading fails
     */
    public static PluginClassLoader loadPluginFromJar(
        String pluginJarPath, 
        ClassLoader parentClassLoader) throws Exception;
    
    /**
     * Discover service implementations using SPI mechanism
     * @param serviceInterface service interface class
     * @param pluginClassLoader classloader containing service implementations
     * @return List of service implementation instances
     * @throws Exception if service discovery fails
     */
    public static <T> List<T> discoverServices(
        Class<T> serviceInterface, 
        ClassLoader pluginClassLoader) throws Exception;
    
    /**
     * Validate plugin JAR structure and required files
     * @param pluginJarPath path to plugin JAR file
     * @return PluginValidationResult containing validation details
     * @throws Exception if validation process fails  
     */
    public static PluginValidationResult validatePluginJar(String pluginJarPath) throws Exception;
    
    /**
     * Create test plugin JAR with specified services
     * @param outputPath path for created plugin JAR
     * @param serviceImplementations service implementations to include
     * @param pluginMetadata metadata for plugin descriptor
     * @throws Exception if plugin JAR creation fails
     */
    public static void createTestPluginJar(
        String outputPath,
        List<Class<?>> serviceImplementations, 
        PluginMetadata pluginMetadata) throws Exception;
    
    /**
     * Test plugin dependency isolation by attempting to load conflicting dependencies
     * @param pluginClassLoader plugin's isolated classloader
     * @param dependencyClass class that should be isolated
     * @return boolean indicating successful dependency isolation
     */
    public static boolean testDependencyIsolation(
        ClassLoader pluginClassLoader, 
        Class<?> dependencyClass);
}

/**
 * Plugin classloader with isolation capabilities for testing
 */
public class PluginClassLoader extends URLClassLoader {
    
    /**
     * Constructor for plugin classloader
     * @param urls URLs containing plugin classes and resources
     * @param parent parent classloader
     * @param isolatedPackages packages to isolate from parent
     */
    public PluginClassLoader(URL[] urls, ClassLoader parent, Set<String> isolatedPackages);
    
    @Override
    public Class<?> loadClass(String name) throws ClassNotFoundException;
    
    /**
     * Check if package is isolated in this classloader
     * @param packageName package name to check
     * @return boolean indicating package isolation
     */
    public boolean isPackageIsolated(String packageName);
    
    /**
     * Get list of isolated packages
     * @return Set of isolated package names
     */
    public Set<String> getIsolatedPackages();
}

Plugin Configuration and Metadata

Configuration and metadata classes for plugin testing scenarios.

/**
 * Configuration for plugin testing scenarios
 */
public class PluginConfiguration {
    
    /**
     * Constructor for plugin configuration
     * @param pluginName name of the plugin
     * @param pluginVersion version of the plugin
     * @param configProperties configuration properties
     */
    public PluginConfiguration(
        String pluginName, 
        String pluginVersion, 
        Map<String, String> configProperties);
    
    /**
     * Get plugin name
     * @return String plugin name
     */
    public String getPluginName();
    
    /**
     * Get plugin version
     * @return String plugin version
     */
    public String getPluginVersion();
    
    /**
     * Get configuration properties
     * @return Map of configuration key-value pairs
     */
    public Map<String, String> getConfigProperties();
    
    /**
     * Get specific configuration property value
     * @param key property key
     * @return String property value or null if not found
     */
    public String getProperty(String key);
    
    /**
     * Set configuration property
     * @param key property key
     * @param value property value
     */
    public void setProperty(String key, String value);
}

/**
 * Metadata for plugin descriptor and JAR creation
 */
public class PluginMetadata {
    
    /**
     * Constructor for plugin metadata
     * @param pluginId unique plugin identifier
     * @param pluginName human-readable plugin name
     * @param version plugin version
     * @param description plugin description
     */
    public PluginMetadata(
        String pluginId, 
        String pluginName, 
        String version, 
        String description);
    
    /**
     * Get plugin identifier
     * @return String plugin ID
     */
    public String getPluginId();
    
    /**
     * Get plugin name
     * @return String plugin name
     */
    public String getPluginName();
    
    /**
     * Get plugin version
     * @return String version
     */
    public String getVersion();
    
    /**
     * Get plugin description
     * @return String description
     */
    public String getDescription();
    
    /**
     * Get required dependencies
     * @return List of required dependency specifications
     */
    public List<DependencySpecification> getRequiredDependencies();
    
    /**
     * Add required dependency
     * @param dependency dependency specification to add
     */
    public void addRequiredDependency(DependencySpecification dependency);
}

/**
 * Result of plugin validation operations
 */
public class PluginValidationResult {
    
    /**
     * Check if plugin validation was successful
     * @return boolean indicating validation success
     */
    public boolean isValid();
    
    /**
     * Get list of validation errors
     * @return List of validation error messages
     */
    public List<String> getValidationErrors();
    
    /**
     * Get list of validation warnings
     * @return List of validation warning messages
     */
    public List<String> getValidationWarnings();
    
    /**
     * Get discovered service implementations
     * @return List of service implementation class names
     */
    public List<String> getDiscoveredServices();
    
    /**
     * Check if required plugin descriptor is present
     * @return boolean indicating descriptor presence
     */
    public boolean hasPluginDescriptor();
}

/**
 * Specification for plugin dependencies
 */
public class DependencySpecification {
    
    /**
     * Constructor for dependency specification
     * @param groupId dependency group identifier
     * @param artifactId dependency artifact identifier
     * @param version dependency version
     * @param scope dependency scope (compile, runtime, test)
     */
    public DependencySpecification(
        String groupId, 
        String artifactId, 
        String version, 
        String scope);
    
    /**
     * Get dependency coordinates as string
     * @return String representation of dependency coordinates
     */
    public String getCoordinates();
}

Usage Examples:

import org.apache.flink.test.plugin.PluginTestBase;
import org.apache.flink.test.plugin.jar.*;

// Basic plugin testing
public class BasicPluginTest extends PluginTestBase {
    
    @Test
    public void testBasicPluginLoading() throws Exception {
        testPluginLoading();
    }
    
    @Override
    protected void testPluginLoading() throws Exception {
        // Create test plugin configuration
        Map<String, String> configProps = new HashMap<>();
        configProps.put("test.property", "test.value");
        configProps.put("service.timeout", "30000");
        
        PluginConfiguration config = createTestPluginConfiguration(
            "test-plugin", "1.0.0", configProps);
        
        // Test plugin configuration
        testPluginConfiguration(config.getConfigProperties());
        
        // Test plugin lifecycle
        testPluginLifecycle("test-plugin-001");
    }
    
    @Test
    public void testServiceProviderDiscovery() throws Exception {
        // Test SPI discovery for TestPluginService
        testServiceProviderDiscovery(TestPluginService.class, 2);
    }
    
    @Test
    public void testPluginDependencyIsolation() throws Exception {
        // Create test plugin JAR
        String pluginJarPath = "/tmp/test-plugin.jar";
        
        PluginMetadata metadata = new PluginMetadata(
            "test-plugin", "Test Plugin", "1.0.0", 
            "Plugin for testing dependency isolation");
        
        PluginTestUtils.createTestPluginJar(
            pluginJarPath,
            Arrays.asList(TestPluginServiceImpl.class),
            metadata);
        
        // Test dependency isolation
        testPluginDependencyIsolation(pluginJarPath);
    }
}

// Advanced plugin testing scenarios
public class AdvancedPluginTest extends PluginTestBase {
    
    @Test
    public void testMultipleServiceImplementations() throws Exception {
        // Create plugin with multiple service implementations
        String pluginJarPath = "/tmp/multi-service-plugin.jar";
        
        PluginMetadata metadata = new PluginMetadata(
            "multi-service-plugin", "Multi-Service Plugin", "2.0.0",
            "Plugin with multiple service implementations");
        
        // Add dependency specifications
        metadata.addRequiredDependency(new DependencySpecification(
            "org.apache.flink", "flink-core", "2.1.0", "provided"));
        
        PluginTestUtils.createTestPluginJar(
            pluginJarPath,
            Arrays.asList(
                TestPluginServiceImpl.class, 
                AlternativeTestPluginServiceImpl.class),
            metadata);
        
        // Validate plugin JAR
        PluginValidationResult validationResult = 
            PluginTestUtils.validatePluginJar(pluginJarPath);
        
        assertTrue(validationResult.isValid());
        assertEquals(2, validationResult.getDiscoveredServices().size());
        assertTrue(validationResult.hasPluginDescriptor());
        
        // Load plugin and discover services
        PluginClassLoader pluginClassLoader = PluginTestUtils.loadPluginFromJar(
            pluginJarPath, getClass().getClassLoader());
        
        List<TestPluginService> services = PluginTestUtils.discoverServices(
            TestPluginService.class, pluginClassLoader);
        
        assertEquals(2, services.size());
        
        // Test each service implementation
        for (TestPluginService service : services) {
            Map<String, String> config = new HashMap<>();
            config.put("test.mode", "advanced");
            
            service.initialize(config);
            assertTrue(service instanceof TestPluginServiceImpl || 
                      service instanceof AlternativeTestPluginServiceImpl);
            
            String result = service.executeOperation("test-input");
            assertNotNull(result);
            
            service.shutdown();
        }
    }
    
    @Test
    public void testClassLoaderIsolation() throws Exception {
        String pluginJarPath = "/tmp/isolation-test-plugin.jar";
        
        // Create plugin with isolated dependencies
        PluginMetadata metadata = new PluginMetadata(
            "isolation-plugin", "Isolation Test Plugin", "1.0.0",
            "Plugin for testing classloader isolation");
        
        PluginTestUtils.createTestPluginJar(
            pluginJarPath,
            Arrays.asList(TestPluginServiceImpl.class),
            metadata);
        
        // Load plugin with isolated classloader
        Set<String> isolatedPackages = Set.of(
            "org.apache.flink.test.plugin.custom",
            "com.example.plugin.isolated");
        
        PluginClassLoader pluginClassLoader = new PluginClassLoader(
            new URL[]{new File(pluginJarPath).toURI().toURL()},
            getClass().getClassLoader(),
            isolatedPackages);
        
        // Validate classloader isolation
        boolean isolationValid = validateClassLoaderIsolation(
            pluginClassLoader, getClass().getClassLoader());
        assertTrue(isolationValid);
        
        // Test dependency isolation
        boolean dependencyIsolated = PluginTestUtils.testDependencyIsolation(
            pluginClassLoader, TestPluginService.class);
        assertTrue(dependencyIsolated);
        
        // Verify isolated packages
        for (String packageName : isolatedPackages) {
            assertTrue(pluginClassLoader.isPackageIsolated(packageName));
        }
    }
    
    @Test
    public void testPluginConfigurationParsing() throws Exception {
        // Create complex plugin configuration
        Map<String, String> complexConfig = new HashMap<>();
        complexConfig.put("service.name", "advanced-test-service");
        complexConfig.put("service.threads", "10");
        complexConfig.put("service.timeout.connect", "5000");
        complexConfig.put("service.timeout.read", "30000");
        complexConfig.put("service.retry.attempts", "3");
        complexConfig.put("service.retry.backoff", "exponential");
        
        PluginConfiguration config = createTestPluginConfiguration(
            "advanced-plugin", "3.0.0", complexConfig);
        
        // Test configuration access
        assertEquals("advanced-test-service", config.getProperty("service.name"));
        assertEquals("10", config.getProperty("service.threads"));
        assertNull(config.getProperty("nonexistent.property"));
        
        // Test configuration modification
        config.setProperty("service.debug", "true");
        assertEquals("true", config.getProperty("service.debug"));
        
        // Test plugin configuration
        testPluginConfiguration(config.getConfigProperties());
    }
}

// Plugin lifecycle testing
public class PluginLifecycleTest extends PluginTestBase {
    
    @Test
    public void testCompletePluginLifecycle() throws Exception {
        String pluginId = "lifecycle-test-plugin";
        
        // Test complete lifecycle: load -> initialize -> execute -> shutdown
        testPluginLifecycle(pluginId);
    }
    
    @Override
    protected void testPluginLoading() throws Exception {
        // Custom plugin loading test implementation
        String pluginJarPath = createTestPluginJar();
        
        // Load plugin
        PluginClassLoader classLoader = PluginTestUtils.loadPluginFromJar(
            pluginJarPath, getClass().getClassLoader());
        
        // Discover and instantiate services
        List<TestPluginService> services = PluginTestUtils.discoverServices(
            TestPluginService.class, classLoader);
        
        assertFalse(services.isEmpty());
        
        // Test each service lifecycle
        for (TestPluginService service : services) {
            // Initialize
            Map<String, String> config = createServiceConfig();
            service.initialize(config);
            
            if (service instanceof TestPluginServiceImpl) {
                assertTrue(((TestPluginServiceImpl) service).isInitialized());
            }
            
            // Execute operations
            String result1 = service.executeOperation("test-input-1");
            String result2 = service.executeOperation("test-input-2");
            
            assertNotNull(result1);
            assertNotNull(result2);
            assertNotEquals(result1, result2); // Should produce different results
            
            // Shutdown
            service.shutdown();
        }
    }
    
    private String createTestPluginJar() throws Exception {
        String jarPath = "/tmp/lifecycle-test-plugin.jar";
        
        PluginMetadata metadata = new PluginMetadata(
            "lifecycle-plugin", "Lifecycle Test Plugin", "1.0.0",
            "Plugin for testing complete lifecycle");
        
        PluginTestUtils.createTestPluginJar(
            jarPath,
            Arrays.asList(TestPluginServiceImpl.class),
            metadata);
        
        return jarPath;
    }
    
    private Map<String, String> createServiceConfig() {
        Map<String, String> config = new HashMap<>();
        config.put("service.id", "lifecycle-test-service");
        config.put("service.version", "1.0.0");
        config.put("test.mode", "lifecycle");
        return config;
    }
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-tests

docs

cancellation-testing.md

checkpointing-migration.md

fault-tolerance-recovery.md

index.md

operator-lifecycle.md

plugin-testing.md

runtime-utilities.md

session-window-testing.md

state-backend-restore.md

test-data-utilities.md

tile.json