or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

class-loading-programs.md, index.md, migration-testing.md, performance-testing.md, state-management-testing.md, streaming-utilities.md, test-base-classes.md, test-data-generation.md
tile.json

docs/class-loading-programs.md

Class Loading Test Programs

Complete standalone programs for testing dynamic class loading, user code isolation, and class loading policies in Flink applications. Each program serves as a test case for different class loading scenarios, ensuring that user code is properly isolated and loaded according to Flink's class loading mechanisms.

Capabilities

Basic Streaming Program

Simple streaming program for basic class loading tests.

/**
 * Basic streaming program for class loading tests.
 *
 * <p>Signature listing only: the method body is omitted here.
 */
public class StreamingProgram {
    
    /**
     * Main entry point for the basic streaming class loading test.
     *
     * @param args command line arguments (not used)
     * @throws Exception if program execution fails
     */
    public static void main(String[] args) throws Exception;
}

Usage:

# Run as standalone program for class loading testing
java -cp flink-tests.jar org.apache.flink.test.classloading.jar.StreamingProgram

Checkpointed Streaming Program

Streaming program that tests checkpointing with user-defined state classes.

/**
 * Checkpointed streaming program for testing state class loading.
 *
 * <p>Signature listing only: the method body is omitted here.
 */
public class CheckpointedStreamingProgram {
    
    /**
     * Main entry point for the checkpointed streaming class loading test.
     *
     * <p>Creates a streaming job with checkpointed state to test state
     * serialization and class loading across restarts.
     *
     * <p>NOTE(review): the "Checkpointed Class Loading Test" example in this
     * document passes {@code --checkpointPath} and {@code --iterations} to this
     * program, which conflicts with "not used" below; confirm which is correct.
     *
     * @param args command line arguments (not used)
     * @throws Exception if program execution fails
     */
    public static void main(String[] args) throws Exception;
}

Custom Input Split Programs

Programs for testing custom input split functionality and class loading.

/**
 * Program for testing custom input split functionality in batch processing.
 *
 * <p>Signature listing only: the method body is omitted here.
 */
public class CustomInputSplitProgram {
    
    /**
     * Main entry point for the custom input split test.
     *
     * <p>Tests loading and execution of custom {@code InputFormat} and
     * {@code InputSplit} classes.
     *
     * @param args command line arguments (not used)
     * @throws Exception if program execution fails
     */
    public static void main(String[] args) throws Exception;
}

/**
 * Program for testing custom input split functionality in streaming.
 *
 * <p>Signature listing only: the method body is omitted here.
 */
public class StreamingCustomInputSplitProgram {
    
    /**
     * Main entry point for the streaming custom input split test.
     *
     * <p>Tests loading of custom {@code SourceFunction} classes in a streaming
     * context.
     *
     * @param args command line arguments (not used)
     * @throws Exception if program execution fails
     */
    public static void main(String[] args) throws Exception;
}

Custom Key-Value State Programs

Programs for testing custom key-value state functionality and serialization.

/**
 * Program for testing basic custom key-value state.
 *
 * <p>Signature listing only: the method body is omitted here.
 */
public class CustomKvStateProgram {
    
    /**
     * Main entry point for the custom key-value state test.
     *
     * <p>Tests loading and serialization of custom state types.
     *
     * @param args command line arguments (not used)
     * @throws Exception if program execution fails
     */
    public static void main(String[] args) throws Exception;
}

/**
 * Program for testing checkpointing with custom key-value state.
 *
 * <p>Signature listing only: the method body is omitted here.
 */
public class CheckpointingCustomKvStateProgram {
    
    /**
     * Main entry point for the checkpointing custom key-value state test.
     *
     * <p>Tests state persistence and recovery with custom state classes.
     *
     * @param args command line arguments (not used)
     * @throws Exception if program execution fails
     */
    public static void main(String[] args) throws Exception;
}

Class Loading Policy Program

Program for testing different class loading policies and configurations.

/**
 * Program for testing class loading policy configurations.
 *
 * <p>Signature listing only: the method body is omitted here.
 */
public class ClassLoadingPolicyProgram {
    
    /**
     * Main entry point for the class loading policy test.
     *
     * <p>Tests different class loading strategies (parent-first vs
     * child-first).
     *
     * @param args command line arguments (not used)
     * @throws Exception if program execution fails
     */
    public static void main(String[] args) throws Exception;
}

User Code Type Program

Program for testing user code type loading and serialization.

/**
 * Program for testing user code type loading and serialization.
 *
 * <p>Signature listing only: the method body is omitted here.
 */
public class UserCodeType {
    
    /**
     * Main entry point for user code type testing.
     *
     * <p>Tests custom type usage and serialization in user code.
     *
     * @param args command line arguments (not used)
     * @throws Exception if program execution fails
     */
    public static void main(String[] args) throws Exception;
}

K-Means Algorithm Test Program

Complete K-Means implementation for testing complex algorithm class loading.

/**
 * K-Means clustering algorithm implementation for class loading testing.
 *
 * <p>Signature listing only: method and constructor bodies are omitted here.
 * The nested classes are the user-defined types and functions that the job
 * is built from.
 */
public class KMeansForTest {
    
    /**
     * Main entry point for the K-Means class loading test.
     *
     * @param args command line arguments: [numPoints] [numClusters] [numIterations]
     * @throws Exception if program execution fails
     */
    public static void main(String[] args) throws Exception;
    
    /**
     * 2D point representation for K-Means clustering, exposing its
     * coordinates as public mutable fields.
     */
    public static class Point {
        /** Coordinates of the point. */
        public double x, y;
        
        /** No-argument constructor. */
        public Point();
        /** Creates a point from the given coordinates. */
        public Point(double x, double y);
        
        // NOTE(review): whether add/div mutate this point or return a new one
        // cannot be determined from the signatures alone; confirm.
        public Point add(Point other);
        public Point div(double val);
        /** Returns the Euclidean distance to {@code other}. */
        public double euclideanDistance(Point other);
        public void clear();
        
        public String toString();
    }
    
    /**
     * Cluster centroid: a {@link Point} that additionally carries an integer id.
     */
    public static class Centroid extends Point {
        /** Identifier of the cluster this centroid belongs to. */
        public int id;
        
        /** No-argument constructor. */
        public Centroid();
        /** Creates a centroid from an id and explicit coordinates. */
        public Centroid(int id, double x, double y);
        /** Creates a centroid from an id and an existing point. */
        public Centroid(int id, Point p);
        
        public String toString();
    }
    
    /**
     * Map function that converts a string representation to a {@link Point}.
     */
    public static class TuplePointConverter implements MapFunction<String, Point> {
        public Point map(String value) throws Exception;
    }
    
    /**
     * Map function that converts a string representation to a {@link Centroid}.
     */
    public static class TupleCentroidConverter implements MapFunction<String, Centroid> {
        public Centroid map(String value) throws Exception;
    }
    
    /**
     * Finds the nearest cluster center for each point, emitting an
     * (Integer, Point) pair per input point.
     */
    public static class SelectNearestCenter 
            extends RichMapFunction<Point, Tuple2<Integer, Point>> {
        
        // Presumably populated in open() before map() is called; confirm.
        private Collection<Centroid> centroids;
        
        public void open(Configuration parameters) throws Exception;
        public Tuple2<Integer, Point> map(Point p) throws Exception;
    }
    
    /**
     * POJO with tuple-style field names (f0, f1, f2) for testing serialization.
     */
    public static class DummyTuple3IntPointLong {
        public int f0;
        public Point f1;
        public long f2;
        
        /** No-argument constructor. */
        public DummyTuple3IntPointLong();
        /** Creates an instance with all three fields set. */
        public DummyTuple3IntPointLong(int f0, Point f1, long f2);
    }
    
    /**
     * Appends count to cluster accumulation.
     *
     * <p>NOTE(review): the input and output types are both
     * {@code Tuple2<Integer, Point>}, so no count field is visible in the
     * signature; confirm what is actually emitted.
     */
    public static class CountAppender 
            implements GroupReduceFunction<Tuple2<Integer, Point>, Tuple2<Integer, Point>> {
        
        public void reduce(Iterable<Tuple2<Integer, Point>> values, 
                          Collector<Tuple2<Integer, Point>> out) throws Exception;
    }
    
    /**
     * Group-reduce function that accumulates points for centroid calculation.
     */
    public static class CentroidAccumulator 
            implements GroupReduceFunction<Tuple2<Integer, Point>, Tuple2<Integer, Point>> {
        
        public void reduce(Iterable<Tuple2<Integer, Point>> values,
                          Collector<Tuple2<Integer, Point>> out) throws Exception;
    }
    
    /**
     * Map function that calculates the average position for a new centroid
     * from an (Integer, Point) pair.
     */
    public static class CentroidAverager 
            implements MapFunction<Tuple2<Integer, Point>, Centroid> {
        
        public Centroid map(Tuple2<Integer, Point> value) throws Exception;
    }
    
    /**
     * Custom accumulator for K-Means testing, parameterized with {@link Point}
     * for both the added value and the result type.
     */
    public static class CustomAccumulator implements Accumulator<Point, Point> {
        
        private Point localValue;
        
        public void add(Point value);
        public Point getLocalValue();
        public void resetLocal();
        public void merge(Accumulator<Point, Point> other);
        // NOTE(review): Flink's Accumulator interface declares clone() as
        // returning Accumulator<V, R>; a Point return type would not be a valid
        // override. Verify this signature against the actual source.
        public Point clone();
    }
}

Class Loading Test Patterns

Common patterns for using class loading test programs:

Basic Class Loading Test:

/**
 * Verifies that {@code StreamingProgram} runs to completion when launched
 * in a separate JVM.
 *
 * @throws Exception if the child process cannot be started or the wait is interrupted
 */
@Test
public void testBasicClassLoading() throws Exception {
    // The program runs in its own JVM, so no in-process class loader is
    // created here (an unused URLClassLoader would only leak).
    ProcessBuilder pb = new ProcessBuilder(
        "java", "-cp", getTestClassPath(),
        "org.apache.flink.test.classloading.jar.StreamingProgram"
    );
    // Forward the child's stdout/stderr to this process so the child cannot
    // block on a full, unread pipe buffer.
    pb.inheritIO();

    Process process = pb.start();
    int exitCode = process.waitFor();

    assertEquals("Program should complete successfully", 0, exitCode);
}

Checkpointed Class Loading Test:

/**
 * Verifies that {@code CheckpointedStreamingProgram} runs to completion in a
 * separate JVM and that the checkpoint path exists afterwards.
 *
 * @throws Exception if the child process cannot be started or the wait is interrupted
 */
@Test
public void testCheckpointedClassLoading() throws Exception {
    // Single source of truth for the path handed to the program and checked below.
    String checkpointPath = "/tmp/test-checkpoint";
    String[] args = {
        "--checkpointPath", checkpointPath,
        "--iterations", "3"
    };

    // Run checkpointed program
    ProcessBuilder pb = new ProcessBuilder();
    pb.command().addAll(Arrays.asList(
        "java", "-cp", getTestClassPath(),
        "org.apache.flink.test.classloading.jar.CheckpointedStreamingProgram"
    ));
    pb.command().addAll(Arrays.asList(args));
    // Forward the child's stdout/stderr to this process so the child cannot
    // block on a full, unread pipe buffer.
    pb.inheritIO();

    Process process = pb.start();
    int exitCode = process.waitFor();

    assertEquals("Checkpointed program should complete", 0, exitCode);

    // Verify checkpoint was created.
    // NOTE(review): a directory left over from an earlier run also satisfies
    // this check; consider a fresh per-run location.
    assertTrue("Checkpoint should exist",
               new File(checkpointPath).exists());
}

K-Means Class Loading Test:

/**
 * Verifies that {@code KMeansForTest} runs to completion in a separate JVM
 * and prints at least one stdout line containing {@code "Cluster"}.
 *
 * @throws Exception if the child process cannot be started, its output
 *     cannot be read, or the wait is interrupted
 */
@Test
public void testKMeansClassLoading() throws Exception {
    // Test complex algorithm class loading
    String[] args = {"100", "5", "10"}; // 100 points, 5 clusters, 10 iterations

    ProcessBuilder pb = new ProcessBuilder(
        "java", "-cp", getTestClassPath(),
        "org.apache.flink.test.classloading.jar.KMeansForTest"
    );
    pb.command().addAll(Arrays.asList(args));
    // Only stdout is read below; hand stderr to the parent so the child
    // cannot stall on an unread stderr pipe while we wait for stdout EOF.
    pb.redirectError(ProcessBuilder.Redirect.INHERIT);

    Process process = pb.start();

    // Capture output for verification; the reader is closed once drained.
    List<String> output;
    try (BufferedReader reader = new BufferedReader(
            new InputStreamReader(process.getInputStream()))) {
        output = reader.lines().collect(Collectors.toList());
    }

    int exitCode = process.waitFor();

    assertEquals("K-Means should complete successfully", 0, exitCode);
    assertTrue("Should output cluster results",
               output.stream().anyMatch(line -> line.contains("Cluster")));
}

Class Loading Policy Test:

/**
 * Verifies that {@code ClassLoadingPolicyProgram} runs to completion in a
 * separate JVM with {@code FLINK_CLASSPATH_POLICY=CHILD_FIRST} set in its
 * environment.
 *
 * @throws Exception if the child process cannot be started or the wait is interrupted
 */
@Test
public void testClassLoadingPolicy() throws Exception {
    ProcessBuilder pb = new ProcessBuilder(
        "java", "-cp", getTestClassPath(),
        "org.apache.flink.test.classloading.jar.ClassLoadingPolicyProgram"
    );
    // Set the variable directly on the child's environment; no intermediate map needed.
    // NOTE(review): Flink documents the resolve order via the
    // `classloader.resolve-order` configuration option; confirm that the
    // program actually reads this environment variable.
    pb.environment().put("FLINK_CLASSPATH_POLICY", "CHILD_FIRST");
    // Forward the child's stdout/stderr to this process so the child cannot
    // block on a full, unread pipe buffer.
    pb.inheritIO();

    Process process = pb.start();
    int exitCode = process.waitFor();

    assertEquals("Policy test should complete", 0, exitCode);
}

User Code Type Test:

/**
 * Verifies that {@code UserCodeType} runs to completion when launched in a
 * separate JVM.
 *
 * @throws Exception if the child process cannot be started or the wait is interrupted
 */
@Test
public void testUserCodeType() throws Exception {
    ProcessBuilder pb = new ProcessBuilder(
        "java", "-cp", getTestClassPath(),
        "org.apache.flink.test.classloading.jar.UserCodeType"
    );
    // Forward the child's stdout/stderr to this process so the child cannot
    // block on a full, unread pipe buffer.
    pb.inheritIO();

    Process process = pb.start();
    int exitCode = process.waitFor();

    assertEquals("User code type test should work", 0, exitCode);
}

These class loading test programs ensure that Flink properly isolates user code, handles different class loading scenarios, and maintains compatibility across different deployment configurations and class loading policies.