Complete standalone programs for testing dynamic class loading, user code isolation, and class loading policies in Flink applications. Each program serves as a test case for different class loading scenarios, ensuring that user code is properly isolated and loaded according to Flink's class loading mechanisms.
Simple streaming program for basic class loading tests.
/**
* Basic streaming program for class loading tests
*/
public class StreamingProgram {
/**
* Main entry point for basic streaming class loading test
* @param args Command line arguments (not used)
* @throws Exception if program execution fails
*/
public static void main(String[] args) throws Exception;
}

Usage:
# Run as standalone program for class loading testing
java -cp flink-tests.jar org.apache.flink.test.classloading.jar.StreamingProgram

Streaming program that tests checkpointing with user-defined state classes.
/**
* Checkpointed streaming program for testing state class loading
*/
public class CheckpointedStreamingProgram {
/**
* Main entry point for checkpointed streaming class loading test
* Creates a streaming job with checkpointed state to test state serialization
* and class loading across restarts
* @param args Command line arguments (not used)
* @throws Exception if program execution fails
*/
public static void main(String[] args) throws Exception;
}

Programs for testing custom input split functionality and class loading.
/**
* Program for testing custom input split functionality in batch processing
*/
public class CustomInputSplitProgram {
/**
* Main entry point for custom input split test
* Tests loading and execution of custom InputFormat and InputSplit classes
* @param args Command line arguments (not used)
* @throws Exception if program execution fails
*/
public static void main(String[] args) throws Exception;
}
/**
* Program for testing custom input split functionality in streaming
*/
public class StreamingCustomInputSplitProgram {
/**
* Main entry point for streaming custom input split test
* Tests loading of custom SourceFunction classes in streaming context
* @param args Command line arguments (not used)
* @throws Exception if program execution fails
*/
public static void main(String[] args) throws Exception;
}

Programs for testing custom key-value state functionality and serialization.
/**
* Program for testing basic custom key-value state
*/
public class CustomKvStateProgram {
/**
* Main entry point for custom key-value state test
* Tests loading and serialization of custom state types
* @param args Command line arguments (not used)
* @throws Exception if program execution fails
*/
public static void main(String[] args) throws Exception;
}
/**
* Program for testing checkpointing with custom key-value state
*/
public class CheckpointingCustomKvStateProgram {
/**
* Main entry point for checkpointing custom key-value state test
* Tests state persistence and recovery with custom state classes
* @param args Command line arguments (not used)
* @throws Exception if program execution fails
*/
public static void main(String[] args) throws Exception;
}

Program for testing different class loading policies and configurations.
/**
* Program for testing class loading policy configurations
*/
public class ClassLoadingPolicyProgram {
/**
* Main entry point for class loading policy test
* Tests different class loading strategies (parent-first vs child-first)
* @param args Command line arguments (not used)
* @throws Exception if program execution fails
*/
public static void main(String[] args) throws Exception;
}

Program for testing user code type loading and serialization.
/**
* Program for testing user code type loading and serialization
*/
public class UserCodeType {
/**
* Main entry point for user code type testing
* Tests custom type usage and serialization in user code
* @param args Command line arguments (not used)
* @throws Exception if program execution fails
*/
public static void main(String[] args) throws Exception;
}

Complete K-Means implementation for testing complex algorithm class loading.
/**
* K-Means clustering algorithm implementation for class loading testing
*/
public class KMeansForTest {
/**
* Main entry point for K-Means class loading test
* @param args Command line arguments: [numPoints] [numClusters] [numIterations]
* @throws Exception if program execution fails
*/
public static void main(String[] args) throws Exception;
/**
* 2D point representation for K-Means clustering
*/
public static class Point {
public double x, y;
public Point();
public Point(double x, double y);
public Point add(Point other);
public Point div(double val);
public double euclideanDistance(Point other);
public void clear();
public String toString();
}
/**
* Cluster centroid extending Point
*/
public static class Centroid extends Point {
public int id;
public Centroid();
public Centroid(int id, double x, double y);
public Centroid(int id, Point p);
public String toString();
}
/**
* Converts string representation to Point
*/
public static class TuplePointConverter implements MapFunction<String, Point> {
public Point map(String value) throws Exception;
}
/**
* Converts string representation to Centroid
*/
public static class TupleCentroidConverter implements MapFunction<String, Centroid> {
public Centroid map(String value) throws Exception;
}
/**
* Finds nearest cluster center for each point
*/
public static class SelectNearestCenter
extends RichMapFunction<Point, Tuple2<Integer, Point>> {
private Collection<Centroid> centroids;
public void open(Configuration parameters) throws Exception;
public Tuple2<Integer, Point> map(Point p) throws Exception;
}
/**
* POJO tuple for testing serialization
*/
public static class DummyTuple3IntPointLong {
public int f0;
public Point f1;
public long f2;
public DummyTuple3IntPointLong();
public DummyTuple3IntPointLong(int f0, Point f1, long f2);
}
/**
* Appends count to cluster accumulation
*/
public static class CountAppender
implements GroupReduceFunction<Tuple2<Integer, Point>, Tuple2<Integer, Point>> {
public void reduce(Iterable<Tuple2<Integer, Point>> values,
Collector<Tuple2<Integer, Point>> out) throws Exception;
}
/**
* Accumulates points for centroid calculation
*/
public static class CentroidAccumulator
implements GroupReduceFunction<Tuple2<Integer, Point>, Tuple2<Integer, Point>> {
public void reduce(Iterable<Tuple2<Integer, Point>> values,
Collector<Tuple2<Integer, Point>> out) throws Exception;
}
/**
* Calculates average position for new centroid
*/
public static class CentroidAverager
implements MapFunction<Tuple2<Integer, Point>, Centroid> {
public Centroid map(Tuple2<Integer, Point> value) throws Exception;
}
/**
* Custom accumulator for K-Means testing
*/
public static class CustomAccumulator implements Accumulator<Point, Point> {
private Point localValue;
public void add(Point value);
public Point getLocalValue();
public void resetLocal();
public void merge(Accumulator<Point, Point> other);
public Point clone();
}
}

Common patterns for using class loading test programs:
Basic Class Loading Test:
// Test basic class loading functionality
@Test
public void testBasicClassLoading() throws Exception {
// Create isolated class loader environment
URLClassLoader userClassLoader = createUserClassLoader();
// Execute program in separate class loader context
ProcessBuilder pb = new ProcessBuilder(
"java", "-cp", getTestClassPath(),
"org.apache.flink.test.classloading.jar.StreamingProgram"
);
Process process = pb.start();
int exitCode = process.waitFor();
assertEquals("Program should complete successfully", 0, exitCode);
}

Checkpointed Class Loading Test:
@Test
public void testCheckpointedClassLoading() throws Exception {
// Test checkpointing with user-defined classes
String[] args = {
"--checkpointPath", "/tmp/test-checkpoint",
"--iterations", "3"
};
// Run checkpointed program
ProcessBuilder pb = new ProcessBuilder();
pb.command().addAll(Arrays.asList(
"java", "-cp", getTestClassPath(),
"org.apache.flink.test.classloading.jar.CheckpointedStreamingProgram"
));
pb.command().addAll(Arrays.asList(args));
Process process = pb.start();
int exitCode = process.waitFor();
assertEquals("Checkpointed program should complete", 0, exitCode);
// Verify checkpoint was created
assertTrue("Checkpoint should exist",
new File("/tmp/test-checkpoint").exists());
}

K-Means Class Loading Test:
@Test
public void testKMeansClassLoading() throws Exception {
// Test complex algorithm class loading
String[] args = {"100", "5", "10"}; // 100 points, 5 clusters, 10 iterations
ProcessBuilder pb = new ProcessBuilder(
"java", "-cp", getTestClassPath(),
"org.apache.flink.test.classloading.jar.KMeansForTest"
);
pb.command().addAll(Arrays.asList(args));
Process process = pb.start();
// Capture output for verification
BufferedReader reader = new BufferedReader(
new InputStreamReader(process.getInputStream()));
List<String> output = reader.lines().collect(Collectors.toList());
int exitCode = process.waitFor();
assertEquals("K-Means should complete successfully", 0, exitCode);
assertTrue("Should output cluster results",
output.stream().anyMatch(line -> line.contains("Cluster")));
}

Class Loading Policy Test:
@Test
public void testClassLoadingPolicy() throws Exception {
// Test different class loading policies
Map<String, String> env = new HashMap<>();
env.put("FLINK_CLASSPATH_POLICY", "CHILD_FIRST");
ProcessBuilder pb = new ProcessBuilder(
"java", "-cp", getTestClassPath(),
"org.apache.flink.test.classloading.jar.ClassLoadingPolicyProgram"
);
pb.environment().putAll(env);
Process process = pb.start();
int exitCode = process.waitFor();
assertEquals("Policy test should complete", 0, exitCode);
}

User Code Type Test:
@Test
public void testUserCodeType() throws Exception {
ProcessBuilder pb = new ProcessBuilder(
"java", "-cp", getTestClassPath(),
"org.apache.flink.test.classloading.jar.UserCodeType"
);
Process process = pb.start();
int exitCode = process.waitFor();
assertEquals("User code type test should work", 0, exitCode);
}

These class loading test programs ensure that Flink properly isolates user code, handles different class loading scenarios, and maintains compatibility across different deployment configurations and class loading policies.