# Client Application Testing

Testing utilities specifically designed for Flink client applications, including classloader testing and job submission scenarios. These utilities help test the interaction between client applications and Flink clusters.

## Capabilities

### Classloader Testing

#### TestUserClassLoaderJob

Test job implementation designed for testing user classloader scenarios and class loading behavior.

```java { .api }
/**
14
* Test job for user classloader scenarios
15
* Used to verify proper class loading in client applications
16
*/
17
class TestUserClassLoaderJob {
18
/** Main entry point for classloader testing */
19
static void main(String[] args) throws Exception;
20
21
/** Execute job with custom classloader */
22
static void executeWithCustomClassLoader(ClassLoader classLoader) throws Exception;
23
24
/** Verify class loading behavior */
25
static boolean verifyClassLoading(String className, ClassLoader expectedLoader);
26
27
/** Get job execution result */
28
static JobExecutionResult getLastExecutionResult();
29
}
```

**Usage Examples:**

```java
import org.apache.flink.client.testjar.TestUserClassLoaderJob;
36
37
@Test
38
public void testClientClassLoading() throws Exception {
39
// Create custom classloader
40
URLClassLoader customClassLoader = new URLClassLoader(
41
new URL[]{new File("test-lib.jar").toURI().toURL()},
42
Thread.currentThread().getContextClassLoader()
43
);
44
45
// Execute job with custom classloader
46
TestUserClassLoaderJob.executeWithCustomClassLoader(customClassLoader);
47
48
// Verify class loading behavior
49
assertTrue(TestUserClassLoaderJob.verifyClassLoading(
50
"com.example.MyCustomClass", customClassLoader));
51
52
// Check execution result
53
JobExecutionResult result = TestUserClassLoaderJob.getLastExecutionResult();
54
assertNotNull(result);
55
}
```

#### TestUserClassLoaderAdditionalArtifact

Utility for testing additional artifacts and dependencies in client classloader scenarios.

```java { .api }
/**
64
* Additional artifacts for classloader testing
65
* Provides utilities for testing dependency resolution
66
*/
67
class TestUserClassLoaderAdditionalArtifact {
68
/** Load additional artifact into classloader */
69
static void loadArtifact(String artifactPath, ClassLoader classLoader) throws Exception;
70
71
/** Verify artifact availability */
72
static boolean isArtifactAvailable(String artifactName, ClassLoader classLoader);
73
74
/** Get artifact metadata */
75
static ArtifactMetadata getArtifactMetadata(String artifactPath);
76
77
/** Resolve artifact dependencies */
78
static List<String> resolveDependencies(String artifactPath);
79
}
80
81
/**
82
* Metadata information about test artifacts
83
*/
84
class ArtifactMetadata {
85
/** Get artifact name */
86
String getName();
87
88
/** Get artifact version */
89
String getVersion();
90
91
/** Get artifact dependencies */
92
List<String> getDependencies();
93
94
/** Get artifact manifest attributes */
95
Map<String, String> getManifestAttributes();
96
}
```

**Usage Examples:**

```java
import org.apache.flink.client.testjar.TestUserClassLoaderAdditionalArtifact;
103
104
@Test
105
public void testAdditionalArtifacts() throws Exception {
106
URLClassLoader testClassLoader = new URLClassLoader(new URL[]{});
107
108
// Load additional test artifact
109
String artifactPath = "test-connector.jar";
110
TestUserClassLoaderAdditionalArtifact.loadArtifact(artifactPath, testClassLoader);
111
112
// Verify artifact is available
113
assertTrue(TestUserClassLoaderAdditionalArtifact.isArtifactAvailable(
114
"test-connector", testClassLoader));
115
116
// Get artifact information
117
ArtifactMetadata metadata = TestUserClassLoaderAdditionalArtifact.getArtifactMetadata(artifactPath);
118
assertEquals("test-connector", metadata.getName());
119
assertEquals("1.0.0", metadata.getVersion());
120
121
// Check dependencies
122
List<String> dependencies = TestUserClassLoaderAdditionalArtifact.resolveDependencies(artifactPath);
123
assertFalse(dependencies.isEmpty());
124
}
```

## Client Testing Patterns

### Job Submission Testing

```java
import org.apache.flink.client.testjar.TestUserClassLoaderJob;
133
import org.apache.flink.client.program.ClusterClient;
134
import org.apache.flink.runtime.jobgraph.JobGraph;
135
136
@Test
137
public void testJobSubmissionWithCustomClasspath() throws Exception {
138
// Set up custom classpath
139
List<URL> classpath = Arrays.asList(
140
new File("connector.jar").toURI().toURL(),
141
new File("custom-functions.jar").toURI().toURL()
142
);
143
144
URLClassLoader jobClassLoader = new URLClassLoader(
145
classpath.toArray(new URL[0]),
146
Thread.currentThread().getContextClassLoader()
147
);
148
149
// Execute job with custom classloader
150
Thread.currentThread().setContextClassLoader(jobClassLoader);
151
try {
152
TestUserClassLoaderJob.main(new String[]{"--input", "test-input"});
153
154
JobExecutionResult result = TestUserClassLoaderJob.getLastExecutionResult();
155
assertNotNull(result);
156
assertTrue(result.isJobExecutionResult());
157
158
} finally {
159
Thread.currentThread().setContextClassLoader(
160
ClassLoader.getSystemClassLoader());
161
}
162
}
```

### Dependency Isolation Testing

```java
@Test
169
public void testDependencyIsolation() throws Exception {
170
// Create isolated classloaders for different components
171
URLClassLoader connectorClassLoader = new URLClassLoader(
172
new URL[]{new File("connector-v1.jar").toURI().toURL()});
173
174
URLClassLoader functionClassLoader = new URLClassLoader(
175
new URL[]{new File("functions-v2.jar").toURI().toURL()});
176
177
// Load artifacts into respective classloaders
178
TestUserClassLoaderAdditionalArtifact.loadArtifact(
179
"connector-v1.jar", connectorClassLoader);
180
TestUserClassLoaderAdditionalArtifact.loadArtifact(
181
"functions-v2.jar", functionClassLoader);
182
183
// Verify isolation
184
assertTrue(TestUserClassLoaderAdditionalArtifact.isArtifactAvailable(
185
"connector-v1", connectorClassLoader));
186
assertFalse(TestUserClassLoaderAdditionalArtifact.isArtifactAvailable(
187
"connector-v1", functionClassLoader));
188
189
assertTrue(TestUserClassLoaderAdditionalArtifact.isArtifactAvailable(
190
"functions-v2", functionClassLoader));
191
assertFalse(TestUserClassLoaderAdditionalArtifact.isArtifactAvailable(
192
"functions-v2", connectorClassLoader));
193
}
```

### Client Configuration Testing

```java
@Test
200
public void testClientConfiguration() throws Exception {
201
// Test different client configurations
202
Configuration clientConfig = new Configuration();
203
clientConfig.setString("jobmanager.rpc.address", "localhost");
204
clientConfig.setInteger("jobmanager.rpc.port", 6123);
205
clientConfig.setString("rest.address", "localhost");
206
clientConfig.setInteger("rest.port", 8081);
207
208
// Create test job with configuration
209
StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
210
"localhost", 6123, clientConfig);
211
212
// Add test pipeline
213
DataStream<String> source = env.fromElements("test1", "test2", "test3");
214
source.print();
215
216
// Execute through client
217
JobExecutionResult result = env.execute("Client Configuration Test");
218
assertNotNull(result);
219
}
```

### Resource Management Testing

```java
@Test
226
public void testResourceManagement() throws Exception {
227
// Test resource allocation for client jobs
228
Configuration config = new Configuration();
229
config.setString("taskmanager.memory.process.size", "1g");
230
config.setInteger("taskmanager.numberOfTaskSlots", 4);
231
config.setInteger("parallelism.default", 2);
232
233
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
234
env.getConfig().setGlobalJobParameters(ParameterTool.fromMap(config.toMap()));
235
236
// Create resource-intensive job
237
DataStream<Integer> numbers = env.fromSequence(1, 1000000);
238
numbers.map(x -> x * x).print();
239
240
// Monitor resource usage during execution
241
JobExecutionResult result = env.execute("Resource Management Test");
242
243
// Verify resource constraints were respected
244
assertTrue(result.getNetRuntime() > 0);
245
}
```

### Client-Server Communication Testing

```java
@Test
252
public void testClientServerCommunication() throws Exception {
253
// Test communication between client and Flink cluster
254
MiniClusterWithClientResource cluster = new MiniClusterWithClientResource(
255
new MiniClusterResourceConfiguration.Builder()
256
.setNumberTaskManagers(1)
257
.setNumberSlotsPerTaskManager(4)
258
.build());
259
260
cluster.before();
261
try {
262
ClusterClient<?> client = cluster.getClusterClient();
263
264
// Submit test job through client
265
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
266
DataStream<String> source = env.fromElements("hello", "world");
267
source.print();
268
269
JobGraph jobGraph = env.getStreamGraph().getJobGraph();
270
271
// Test job submission
272
JobID jobId = client.submitJob(jobGraph).get();
273
assertNotNull(jobId);
274
275
// Test job monitoring
276
JobStatus status = client.getJobStatus(jobId).get();
277
assertTrue(status == JobStatus.RUNNING || status == JobStatus.FINISHED);
278
279
// Test job cancellation (if still running)
280
if (status == JobStatus.RUNNING) {
281
client.cancel(jobId).get();
282
JobStatus cancelledStatus = client.getJobStatus(jobId).get();
283
assertEquals(JobStatus.CANCELED, cancelledStatus);
284
}
285
286
} finally {
287
cluster.after();
288
}
289
}
290
```
291
292
### Dynamic Class Loading Testing
293
294
```java
295
@Test
296
public void testDynamicClassLoading() throws Exception {
297
// Test dynamic loading of classes during job execution
298
String jarPath = createDynamicJar(); // Helper method to create JAR
299
300
// Create classloader with dynamically created JAR
301
URLClassLoader dynamicClassLoader = new URLClassLoader(
302
new URL[]{new File(jarPath).toURI().toURL()});
303
304
// Load class dynamically
305
Class<?> dynamicClass = dynamicClassLoader.loadClass("com.example.DynamicFunction");
306
Object instance = dynamicClass.getDeclaredConstructor().newInstance();
307
308
// Verify dynamic class functionality
309
Method processMethod = dynamicClass.getMethod("process", String.class);
310
String result = (String) processMethod.invoke(instance, "test");
311
assertEquals("processed: test", result);
312
313
// Use dynamic class in Flink job
314
TestUserClassLoaderJob.executeWithCustomClassLoader(dynamicClassLoader);
315
assertTrue(TestUserClassLoaderJob.verifyClassLoading(
316
"com.example.DynamicFunction", dynamicClassLoader));
317
}
318
319
private String createDynamicJar() throws Exception {
320
// Helper method to create a JAR file with test classes
321
String jarPath = "/tmp/dynamic-test.jar";
322
323
try (JarOutputStream jos = new JarOutputStream(new FileOutputStream(jarPath))) {
324
// Add class files to JAR
325
JarEntry entry = new JarEntry("com/example/DynamicFunction.class");
326
jos.putNextEntry(entry);
327
328
// Write compiled class bytes (simplified example)
329
byte[] classBytes = compileTestClass(); // Implementation omitted for brevity
330
jos.write(classBytes);
331
jos.closeEntry();
332
}
333
334
return jarPath;
335
}
```