CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/maven-org-apache-flink--flink-test-utils-parent

Comprehensive testing utilities for Apache Flink stream processing framework

Pending
Overview
Eval results
Files

docs/client-testing.md

Client Application Testing

Testing utilities specifically designed for Flink client applications, including classloader testing and job submission scenarios. These utilities help test the interaction between client applications and Flink clusters.

Capabilities

Classloader Testing

TestUserClassLoaderJob

Test job implementation designed for testing user classloader scenarios and class loading behavior.

/**
 * Test job for user classloader scenarios.
 * Used to verify proper class loading in client applications.
 *
 * <p>This is a signature listing only: every member is static and no method
 * bodies are shown here.
 */
class TestUserClassLoaderJob {
    /**
     * Main entry point for classloader testing.
     *
     * @param args command-line arguments passed to the job
     * @throws Exception declared by the signature; the failure conditions are not listed here
     */
    static void main(String[] args) throws Exception;
    
    /**
     * Execute job with custom classloader.
     *
     * @param classLoader the class loader the job is executed with
     * @throws Exception declared by the signature; the failure conditions are not listed here
     */
    static void executeWithCustomClassLoader(ClassLoader classLoader) throws Exception;
    
    /**
     * Verify class loading behavior.
     *
     * @param className name of the class to check
     * @param expectedLoader the class loader the class is expected to be associated with
     * @return the outcome of the check; presumably {@code true} when the class was loaded by
     *         {@code expectedLoader} (exact criteria are not shown in this listing)
     */
    static boolean verifyClassLoading(String className, ClassLoader expectedLoader);
    
    /**
     * Get job execution result.
     *
     * @return the result of the last execution; what is returned before any execution
     *         has happened is not shown in this listing
     */
    static JobExecutionResult getLastExecutionResult();
}

Usage Examples:

import org.apache.flink.client.testjar.TestUserClassLoaderJob;

/**
 * Runs {@code TestUserClassLoaderJob} with a custom class loader, then checks the
 * class-loading verification and the recorded execution result.
 *
 * @throws Exception if the class loader cannot be created or the job execution fails
 */
@Test
public void testClientClassLoading() throws Exception {
    URL testLibrary = new File("test-lib.jar").toURI().toURL();

    // URLClassLoader keeps the JAR open until it is closed; try-with-resources
    // releases the file handle even when an assertion below fails.
    try (URLClassLoader customClassLoader = new URLClassLoader(
            new URL[]{testLibrary},
            Thread.currentThread().getContextClassLoader())) {

        // Execute job with custom classloader
        TestUserClassLoaderJob.executeWithCustomClassLoader(customClassLoader);

        // Verify class loading behavior
        assertTrue(TestUserClassLoaderJob.verifyClassLoading(
            "com.example.MyCustomClass", customClassLoader));

        // Check execution result
        JobExecutionResult result = TestUserClassLoaderJob.getLastExecutionResult();
        assertNotNull(result);
    }
}

TestUserClassLoaderAdditionalArtifact

Utility for testing additional artifacts and dependencies in client classloader scenarios.

/**
 * Additional artifacts for classloader testing.
 * Provides utilities for testing dependency resolution.
 *
 * <p>This is a signature listing only: every member is static and no method
 * bodies are shown here.
 */
class TestUserClassLoaderAdditionalArtifact {
    /**
     * Load additional artifact into classloader.
     *
     * @param artifactPath path of the artifact to load
     * @param classLoader the class loader the artifact is loaded into
     * @throws Exception declared by the signature; the failure conditions are not listed here
     */
    static void loadArtifact(String artifactPath, ClassLoader classLoader) throws Exception;
    
    /**
     * Verify artifact availability.
     *
     * @param artifactName name of the artifact to look for (the usage examples pass the
     *        file name without the {@code .jar} extension)
     * @param classLoader the class loader to check
     * @return whether the artifact is available in {@code classLoader}
     */
    static boolean isArtifactAvailable(String artifactName, ClassLoader classLoader);
    
    /**
     * Get artifact metadata.
     *
     * @param artifactPath path of the artifact to describe
     * @return the metadata of the artifact
     */
    static ArtifactMetadata getArtifactMetadata(String artifactPath);
    
    /**
     * Resolve artifact dependencies.
     *
     * @param artifactPath path of the artifact whose dependencies are resolved
     * @return the resolved dependencies as strings; their exact format is not shown in this listing
     */
    static List<String> resolveDependencies(String artifactPath);
}

/**
 * Metadata information about test artifacts.
 *
 * <p>This is a signature listing only; no accessor bodies are shown here.
 */
class ArtifactMetadata {
    /**
     * Get artifact name.
     *
     * @return the artifact name
     */
    String getName();
    
    /**
     * Get artifact version.
     *
     * @return the artifact version
     */
    String getVersion();
    
    /**
     * Get artifact dependencies.
     *
     * @return the artifact dependencies as strings; their exact format is not shown in this listing
     */
    List<String> getDependencies();
    
    /**
     * Get artifact manifest attributes.
     *
     * @return the manifest attributes as a map from attribute name to attribute value
     */
    Map<String, String> getManifestAttributes();
}

Usage Examples:

import org.apache.flink.client.testjar.TestUserClassLoaderAdditionalArtifact;

/**
 * Loads an additional artifact into a fresh class loader and checks its availability,
 * metadata and resolved dependencies.
 *
 * @throws Exception if loading the artifact fails
 */
@Test
public void testAdditionalArtifacts() throws Exception {
    String artifactPath = "test-connector.jar";

    // Close the loader when the test ends so that any JAR opened through it is
    // released, even when an assertion below fails.
    try (URLClassLoader testClassLoader = new URLClassLoader(new URL[]{})) {
        // Load additional test artifact
        TestUserClassLoaderAdditionalArtifact.loadArtifact(artifactPath, testClassLoader);

        // Verify artifact is available
        assertTrue(TestUserClassLoaderAdditionalArtifact.isArtifactAvailable(
            "test-connector", testClassLoader));

        // Get artifact information
        ArtifactMetadata metadata =
            TestUserClassLoaderAdditionalArtifact.getArtifactMetadata(artifactPath);
        assertEquals("test-connector", metadata.getName());
        assertEquals("1.0.0", metadata.getVersion());

        // Check dependencies
        List<String> dependencies =
            TestUserClassLoaderAdditionalArtifact.resolveDependencies(artifactPath);
        assertFalse(dependencies.isEmpty());
    }
}

Client Testing Patterns

Job Submission Testing

import org.apache.flink.client.testjar.TestUserClassLoaderJob;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.runtime.jobgraph.JobGraph;

/**
 * Runs {@code TestUserClassLoaderJob.main} with a custom classpath installed as the
 * thread context class loader and checks the recorded execution result.
 *
 * @throws Exception if the classpath URLs cannot be built or the job fails
 */
@Test
public void testJobSubmissionWithCustomClasspath() throws Exception {
    // Set up custom classpath
    List<URL> classpath = Arrays.asList(
        new File("connector.jar").toURI().toURL(),
        new File("custom-functions.jar").toURI().toURL()
    );

    // Remember the loader that was active before the test. Restoring the system
    // class loader instead would change the context loader for whatever runs
    // next on this thread.
    Thread currentThread = Thread.currentThread();
    ClassLoader originalClassLoader = currentThread.getContextClassLoader();

    // try-with-resources closes the job loader (and its open JARs) after the
    // context class loader has been restored by the inner finally block.
    try (URLClassLoader jobClassLoader = new URLClassLoader(
            classpath.toArray(new URL[0]),
            originalClassLoader)) {

        // Execute job with custom classloader
        currentThread.setContextClassLoader(jobClassLoader);
        try {
            TestUserClassLoaderJob.main(new String[]{"--input", "test-input"});

            JobExecutionResult result = TestUserClassLoaderJob.getLastExecutionResult();
            assertNotNull(result);
            assertTrue(result.isJobExecutionResult());
        } finally {
            currentThread.setContextClassLoader(originalClassLoader);
        }
    }
}

Dependency Isolation Testing

/**
 * Loads two artifacts into two separate class loaders and checks that each artifact
 * is reported as available only in the loader it was loaded into.
 *
 * @throws Exception if a class loader cannot be created or an artifact cannot be loaded
 */
@Test
public void testDependencyIsolation() throws Exception {
    // Create isolated classloaders for different components. Both are closed by
    // try-with-resources so the JAR handles are released even on assertion failure.
    try (URLClassLoader connectorClassLoader = new URLClassLoader(
             new URL[]{new File("connector-v1.jar").toURI().toURL()});
         URLClassLoader functionClassLoader = new URLClassLoader(
             new URL[]{new File("functions-v2.jar").toURI().toURL()})) {

        // Load artifacts into respective classloaders
        TestUserClassLoaderAdditionalArtifact.loadArtifact(
            "connector-v1.jar", connectorClassLoader);
        TestUserClassLoaderAdditionalArtifact.loadArtifact(
            "functions-v2.jar", functionClassLoader);

        // Verify isolation: each artifact is visible only through its own loader
        assertTrue(TestUserClassLoaderAdditionalArtifact.isArtifactAvailable(
            "connector-v1", connectorClassLoader));
        assertFalse(TestUserClassLoaderAdditionalArtifact.isArtifactAvailable(
            "connector-v1", functionClassLoader));

        assertTrue(TestUserClassLoaderAdditionalArtifact.isArtifactAvailable(
            "functions-v2", functionClassLoader));
        assertFalse(TestUserClassLoaderAdditionalArtifact.isArtifactAvailable(
            "functions-v2", connectorClassLoader));
    }
}

Client Configuration Testing

/**
 * Builds a client configuration, creates a remote execution environment from it and
 * runs a small pipeline through the client.
 *
 * @throws Exception if the job cannot be submitted or executed
 */
@Test
public void testClientConfiguration() throws Exception {
    final String host = "localhost";
    final int rpcPort = 6123;
    final int restPort = 8081;

    // Test different client configurations
    Configuration clientConfig = new Configuration();
    clientConfig.setString("jobmanager.rpc.address", host);
    clientConfig.setInteger("jobmanager.rpc.port", rpcPort);
    clientConfig.setString("rest.address", host);
    clientConfig.setInteger("rest.port", restPort);

    // Create test job with configuration. The remote environment submits jobs
    // through the REST endpoint, so the port given here must be the REST port
    // configured above, not the JobManager RPC port.
    StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
        host, restPort, clientConfig);

    // Add test pipeline
    DataStream<String> source = env.fromElements("test1", "test2", "test3");
    source.print();

    // Execute through client
    JobExecutionResult result = env.execute("Client Configuration Test");
    assertNotNull(result);
}

Resource Management Testing

/**
 * Publishes resource settings as global job parameters, runs a job over a large
 * number sequence and checks that a positive net runtime was reported.
 *
 * @throws Exception if the job execution fails
 */
@Test
public void testResourceManagement() throws Exception {
    // Test resource allocation for client jobs
    Configuration config = new Configuration();
    config.setString("taskmanager.memory.process.size", "1g");
    config.setInteger("taskmanager.numberOfTaskSlots", 4);
    config.setInteger("parallelism.default", 2);

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.getConfig().setGlobalJobParameters(ParameterTool.fromMap(config.toMap()));

    // Create resource-intensive job. fromSequence produces Long elements, so the
    // stream must be typed DataStream<Long>; DataStream<Integer> does not compile.
    DataStream<Long> numbers = env.fromSequence(1, 1000000);
    numbers.map(x -> x * x).print();

    // Monitor resource usage during execution
    JobExecutionResult result = env.execute("Resource Management Test");

    // Verify the job actually ran
    assertTrue(result.getNetRuntime() > 0);
}

Client-Server Communication Testing

/**
 * Starts a mini cluster, submits a job graph through the cluster client and exercises
 * job status queries and cancellation.
 *
 * @throws Exception if the cluster cannot be started or a client call fails
 */
@Test
public void testClientServerCommunication() throws Exception {
    // Test communication between client and Flink cluster
    MiniClusterWithClientResource cluster = new MiniClusterWithClientResource(
        new MiniClusterResourceConfiguration.Builder()
            .setNumberTaskManagers(1)
            .setNumberSlotsPerTaskManager(4)
            .build());

    cluster.before();
    try {
        ClusterClient<?> client = cluster.getClusterClient();

        // Submit test job through client
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> source = env.fromElements("hello", "world");
        source.print();

        JobGraph jobGraph = env.getStreamGraph().getJobGraph();

        // Test job submission
        JobID jobId = client.submitJob(jobGraph).get();
        assertNotNull(jobId);

        // Test job monitoring
        JobStatus status = client.getJobStatus(jobId).get();
        assertTrue(status == JobStatus.RUNNING || status == JobStatus.FINISHED);

        // Test job cancellation (if still running)
        if (status == JobStatus.RUNNING) {
            client.cancel(jobId).get();

            // The cancel future completing does not mean the job has already
            // reached CANCELED; it may still be CANCELLING. Poll until a globally
            // terminal state is reported (bounded by a timeout) before asserting.
            long timeoutNanos = 30L * 1_000_000_000L;
            long startNanos = System.nanoTime();
            JobStatus cancelledStatus = client.getJobStatus(jobId).get();
            while (!cancelledStatus.isGloballyTerminalState()
                    && System.nanoTime() - startNanos < timeoutNanos) {
                Thread.sleep(50);
                cancelledStatus = client.getJobStatus(jobId).get();
            }
            assertEquals(JobStatus.CANCELED, cancelledStatus);
        }

    } finally {
        // Always shut the mini cluster down, even when an assertion failed
        cluster.after();
    }
}

Dynamic Class Loading Testing

/**
 * Builds a JAR at test time, loads a class from it reflectively, checks the class's
 * behavior and then runs the classloader test job with the same loader.
 *
 * @throws Exception if the JAR cannot be created, the class cannot be loaded or
 *         invoked, or the job execution fails
 */
@Test
public void testDynamicClassLoading() throws Exception {
    // Test dynamic loading of classes during job execution
    String jarPath = createDynamicJar(); // Helper method to create JAR

    // Create classloader with dynamically created JAR. try-with-resources closes
    // the loader so the JAR file handle is released even on assertion failure.
    try (URLClassLoader dynamicClassLoader = new URLClassLoader(
            new URL[]{new File(jarPath).toURI().toURL()})) {

        // Load class dynamically
        Class<?> dynamicClass = dynamicClassLoader.loadClass("com.example.DynamicFunction");
        Object instance = dynamicClass.getDeclaredConstructor().newInstance();

        // Verify dynamic class functionality
        Method processMethod = dynamicClass.getMethod("process", String.class);
        String result = (String) processMethod.invoke(instance, "test");
        assertEquals("processed: test", result);

        // Use dynamic class in Flink job
        TestUserClassLoaderJob.executeWithCustomClassLoader(dynamicClassLoader);
        assertTrue(TestUserClassLoaderJob.verifyClassLoading(
            "com.example.DynamicFunction", dynamicClassLoader));
    }
}

/**
 * Creates a JAR file containing the compiled {@code com.example.DynamicFunction}
 * test class.
 *
 * <p>The JAR is written to a uniquely named file in the platform's temporary
 * directory rather than a fixed {@code /tmp} path, so the helper works on systems
 * without {@code /tmp} and concurrent test runs do not overwrite each other's file.
 *
 * @return the absolute path of the created JAR file
 * @throws Exception if the file cannot be created or written, or the test class
 *         cannot be compiled
 */
private String createDynamicJar() throws Exception {
    File jarFile = File.createTempFile("dynamic-test", ".jar");
    // Best-effort cleanup once the JVM running the tests exits
    jarFile.deleteOnExit();

    try (JarOutputStream jos = new JarOutputStream(new FileOutputStream(jarFile))) {
        // Add class files to JAR
        JarEntry entry = new JarEntry("com/example/DynamicFunction.class");
        jos.putNextEntry(entry);

        // Write compiled class bytes (simplified example)
        byte[] classBytes = compileTestClass(); // Implementation omitted for brevity
        jos.write(classBytes);
        jos.closeEntry();
    }

    return jarFile.getAbsolutePath();
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-test-utils-parent

docs

client-testing.md

connector-testing.md

core-testing.md

index.md

migration-testing.md

table-testing.md

test-environments.md

tile.json