Comprehensive testing utilities for Apache Flink stream processing framework
—
Testing utilities specifically designed for Flink client applications, including classloader testing and job submission scenarios. These utilities help test the interaction between client applications and Flink clusters.
Test job implementation designed for testing user classloader scenarios and class loading behavior.
/**
* Test job for user classloader scenarios
* Used to verify proper class loading in client applications
*/
class TestUserClassLoaderJob {
/** Main entry point for classloader testing */
static void main(String[] args) throws Exception;
/** Execute job with custom classloader */
static void executeWithCustomClassLoader(ClassLoader classLoader) throws Exception;
/** Verify class loading behavior */
static boolean verifyClassLoading(String className, ClassLoader expectedLoader);
/** Get job execution result */
static JobExecutionResult getLastExecutionResult();
}

Usage Examples:
import org.apache.flink.client.testjar.TestUserClassLoaderJob;
@Test
public void testClientClassLoading() throws Exception {
// Create custom classloader
URLClassLoader customClassLoader = new URLClassLoader(
new URL[]{new File("test-lib.jar").toURI().toURL()},
Thread.currentThread().getContextClassLoader()
);
// Execute job with custom classloader
TestUserClassLoaderJob.executeWithCustomClassLoader(customClassLoader);
// Verify class loading behavior
assertTrue(TestUserClassLoaderJob.verifyClassLoading(
"com.example.MyCustomClass", customClassLoader));
// Check execution result
JobExecutionResult result = TestUserClassLoaderJob.getLastExecutionResult();
assertNotNull(result);
}

Utility for testing additional artifacts and dependencies in client classloader scenarios.
/**
* Additional artifacts for classloader testing
* Provides utilities for testing dependency resolution
*/
class TestUserClassLoaderAdditionalArtifact {
/** Load additional artifact into classloader */
static void loadArtifact(String artifactPath, ClassLoader classLoader) throws Exception;
/** Verify artifact availability */
static boolean isArtifactAvailable(String artifactName, ClassLoader classLoader);
/** Get artifact metadata */
static ArtifactMetadata getArtifactMetadata(String artifactPath);
/** Resolve artifact dependencies */
static List<String> resolveDependencies(String artifactPath);
}
/**
* Metadata information about test artifacts
*/
class ArtifactMetadata {
/** Get artifact name */
String getName();
/** Get artifact version */
String getVersion();
/** Get artifact dependencies */
List<String> getDependencies();
/** Get artifact manifest attributes */
Map<String, String> getManifestAttributes();
}

Usage Examples:
import org.apache.flink.client.testjar.TestUserClassLoaderAdditionalArtifact;
@Test
public void testAdditionalArtifacts() throws Exception {
URLClassLoader testClassLoader = new URLClassLoader(new URL[]{});
// Load additional test artifact
String artifactPath = "test-connector.jar";
TestUserClassLoaderAdditionalArtifact.loadArtifact(artifactPath, testClassLoader);
// Verify artifact is available
assertTrue(TestUserClassLoaderAdditionalArtifact.isArtifactAvailable(
"test-connector", testClassLoader));
// Get artifact information
ArtifactMetadata metadata = TestUserClassLoaderAdditionalArtifact.getArtifactMetadata(artifactPath);
assertEquals("test-connector", metadata.getName());
assertEquals("1.0.0", metadata.getVersion());
// Check dependencies
List<String> dependencies = TestUserClassLoaderAdditionalArtifact.resolveDependencies(artifactPath);
assertFalse(dependencies.isEmpty());
}

import org.apache.flink.client.testjar.TestUserClassLoaderJob;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.runtime.jobgraph.JobGraph;
@Test
public void testJobSubmissionWithCustomClasspath() throws Exception {
// Set up custom classpath
List<URL> classpath = Arrays.asList(
new File("connector.jar").toURI().toURL(),
new File("custom-functions.jar").toURI().toURL()
);
URLClassLoader jobClassLoader = new URLClassLoader(
classpath.toArray(new URL[0]),
Thread.currentThread().getContextClassLoader()
);
// Execute job with custom classloader
Thread.currentThread().setContextClassLoader(jobClassLoader);
try {
TestUserClassLoaderJob.main(new String[]{"--input", "test-input"});
JobExecutionResult result = TestUserClassLoaderJob.getLastExecutionResult();
assertNotNull(result);
assertTrue(result.isJobExecutionResult());
} finally {
Thread.currentThread().setContextClassLoader(
ClassLoader.getSystemClassLoader());
}
}

@Test
public void testDependencyIsolation() throws Exception {
// Create isolated classloaders for different components
URLClassLoader connectorClassLoader = new URLClassLoader(
new URL[]{new File("connector-v1.jar").toURI().toURL()});
URLClassLoader functionClassLoader = new URLClassLoader(
new URL[]{new File("functions-v2.jar").toURI().toURL()});
// Load artifacts into respective classloaders
TestUserClassLoaderAdditionalArtifact.loadArtifact(
"connector-v1.jar", connectorClassLoader);
TestUserClassLoaderAdditionalArtifact.loadArtifact(
"functions-v2.jar", functionClassLoader);
// Verify isolation
assertTrue(TestUserClassLoaderAdditionalArtifact.isArtifactAvailable(
"connector-v1", connectorClassLoader));
assertFalse(TestUserClassLoaderAdditionalArtifact.isArtifactAvailable(
"connector-v1", functionClassLoader));
assertTrue(TestUserClassLoaderAdditionalArtifact.isArtifactAvailable(
"functions-v2", functionClassLoader));
assertFalse(TestUserClassLoaderAdditionalArtifact.isArtifactAvailable(
"functions-v2", connectorClassLoader));
}

@Test
public void testClientConfiguration() throws Exception {
// Test different client configurations
Configuration clientConfig = new Configuration();
clientConfig.setString("jobmanager.rpc.address", "localhost");
clientConfig.setInteger("jobmanager.rpc.port", 6123);
clientConfig.setString("rest.address", "localhost");
clientConfig.setInteger("rest.port", 8081);
// Create test job with configuration
StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
"localhost", 6123, clientConfig);
// Add test pipeline
DataStream<String> source = env.fromElements("test1", "test2", "test3");
source.print();
// Execute through client
JobExecutionResult result = env.execute("Client Configuration Test");
assertNotNull(result);
}

@Test
public void testResourceManagement() throws Exception {
// Test resource allocation for client jobs
Configuration config = new Configuration();
config.setString("taskmanager.memory.process.size", "1g");
config.setInteger("taskmanager.numberOfTaskSlots", 4);
config.setInteger("parallelism.default", 2);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(ParameterTool.fromMap(config.toMap()));
// Create resource-intensive job
DataStream<Long> numbers = env.fromSequence(1, 1000000);
numbers.map(x -> x * x).print();
// Monitor resource usage during execution
JobExecutionResult result = env.execute("Resource Management Test");
// Verify resource constraints were respected
assertTrue(result.getNetRuntime() > 0);
}

@Test
public void testClientServerCommunication() throws Exception {
// Test communication between client and Flink cluster
MiniClusterWithClientResource cluster = new MiniClusterWithClientResource(
new MiniClusterResourceConfiguration.Builder()
.setNumberTaskManagers(1)
.setNumberSlotsPerTaskManager(4)
.build());
cluster.before();
try {
ClusterClient<?> client = cluster.getClusterClient();
// Submit test job through client
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> source = env.fromElements("hello", "world");
source.print();
JobGraph jobGraph = env.getStreamGraph().getJobGraph();
// Test job submission
JobID jobId = client.submitJob(jobGraph).get();
assertNotNull(jobId);
// Test job monitoring
JobStatus status = client.getJobStatus(jobId).get();
assertTrue(status == JobStatus.RUNNING || status == JobStatus.FINISHED);
// Test job cancellation (if still running)
if (status == JobStatus.RUNNING) {
client.cancel(jobId).get();
JobStatus cancelledStatus = client.getJobStatus(jobId).get();
assertEquals(JobStatus.CANCELED, cancelledStatus);
}
} finally {
cluster.after();
}
}

@Test
public void testDynamicClassLoading() throws Exception {
// Test dynamic loading of classes during job execution
String jarPath = createDynamicJar(); // Helper method to create JAR
// Create classloader with dynamically created JAR
URLClassLoader dynamicClassLoader = new URLClassLoader(
new URL[]{new File(jarPath).toURI().toURL()});
// Load class dynamically
Class<?> dynamicClass = dynamicClassLoader.loadClass("com.example.DynamicFunction");
Object instance = dynamicClass.getDeclaredConstructor().newInstance();
// Verify dynamic class functionality
Method processMethod = dynamicClass.getMethod("process", String.class);
String result = (String) processMethod.invoke(instance, "test");
assertEquals("processed: test", result);
// Use dynamic class in Flink job
TestUserClassLoaderJob.executeWithCustomClassLoader(dynamicClassLoader);
assertTrue(TestUserClassLoaderJob.verifyClassLoading(
"com.example.DynamicFunction", dynamicClassLoader));
}
private String createDynamicJar() throws Exception {
// Helper method to create a JAR file with test classes
String jarPath = "/tmp/dynamic-test.jar";
try (JarOutputStream jos = new JarOutputStream(new FileOutputStream(jarPath))) {
// Add class files to JAR
JarEntry entry = new JarEntry("com/example/DynamicFunction.class");
jos.putNextEntry(entry);
// Write compiled class bytes (simplified example)
byte[] classBytes = compileTestClass(); // Implementation omitted for brevity
jos.write(classBytes);
jos.closeEntry();
}
return jarPath;
}

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-test-utils-parent