0
# Class Loading Test Programs
1
2
Complete standalone programs for testing dynamic class loading, user code isolation, and class loading policies in Flink applications. Each program serves as a test case for different class loading scenarios, ensuring that user code is properly isolated and loaded according to Flink's class loading mechanisms.
3
4
## Capabilities
5
6
### Basic Streaming Program
7
8
Simple streaming program for basic class loading tests.
9
10
```java { .api }
11
/**
12
* Basic streaming program for class loading tests
13
*/
14
public class StreamingProgram {
15
16
/**
17
* Main entry point for basic streaming class loading test
18
* @param args Command line arguments (not used)
19
* @throws Exception if program execution fails
20
*/
21
public static void main(String[] args) throws Exception;
22
}
23
```
24
25
**Usage:**
26
```bash
27
# Run as standalone program for class loading testing
28
java -cp flink-tests.jar org.apache.flink.test.classloading.jar.StreamingProgram
29
```
30
31
### Checkpointed Streaming Program
32
33
Streaming program that tests checkpointing with user-defined state classes.
34
35
```java { .api }
36
/**
37
* Checkpointed streaming program for testing state class loading
38
*/
39
public class CheckpointedStreamingProgram {
40
41
/**
42
* Main entry point for checkpointed streaming class loading test
43
* Creates a streaming job with checkpointed state to test state serialization
44
* and class loading across restarts
45
* @param args Command line arguments (not used)
46
* @throws Exception if program execution fails
47
*/
48
public static void main(String[] args) throws Exception;
49
}
50
```
51
52
### Custom Input Split Programs
53
54
Programs for testing custom input split functionality and class loading.
55
56
```java { .api }
57
/**
58
* Program for testing custom input split functionality in batch processing
59
*/
60
public class CustomInputSplitProgram {
61
62
/**
63
* Main entry point for custom input split test
64
* Tests loading and execution of custom InputFormat and InputSplit classes
65
* @param args Command line arguments (not used)
66
* @throws Exception if program execution fails
67
*/
68
public static void main(String[] args) throws Exception;
69
}
70
71
/**
72
* Program for testing custom input split functionality in streaming
73
*/
74
public class StreamingCustomInputSplitProgram {
75
76
/**
77
* Main entry point for streaming custom input split test
78
* Tests loading of custom SourceFunction classes in streaming context
79
* @param args Command line arguments (not used)
80
* @throws Exception if program execution fails
81
*/
82
public static void main(String[] args) throws Exception;
83
}
84
```
85
86
### Custom Key-Value State Programs
87
88
Programs for testing custom key-value state functionality and serialization.
89
90
```java { .api }
91
/**
92
* Program for testing basic custom key-value state
93
*/
94
public class CustomKvStateProgram {
95
96
/**
97
* Main entry point for custom key-value state test
98
* Tests loading and serialization of custom state types
99
* @param args Command line arguments (not used)
100
* @throws Exception if program execution fails
101
*/
102
public static void main(String[] args) throws Exception;
103
}
104
105
/**
106
* Program for testing checkpointing with custom key-value state
107
*/
108
public class CheckpointingCustomKvStateProgram {
109
110
/**
111
* Main entry point for checkpointing custom key-value state test
112
* Tests state persistence and recovery with custom state classes
113
* @param args Command line arguments (not used)
114
* @throws Exception if program execution fails
115
*/
116
public static void main(String[] args) throws Exception;
117
}
118
```
119
120
### Class Loading Policy Program
121
122
Program for testing different class loading policies and configurations.
123
124
```java { .api }
125
/**
126
* Program for testing class loading policy configurations
127
*/
128
public class ClassLoadingPolicyProgram {
129
130
/**
131
* Main entry point for class loading policy test
132
* Tests different class loading strategies (parent-first vs child-first)
133
* @param args Command line arguments (not used)
134
* @throws Exception if program execution fails
135
*/
136
public static void main(String[] args) throws Exception;
137
}
138
```
139
140
### User Code Type Program
141
142
Program for testing user code type loading and serialization.
143
144
```java { .api }
145
/**
146
* Program for testing user code type loading and serialization
147
*/
148
public class UserCodeType {
149
150
/**
151
* Main entry point for user code type testing
152
* Tests custom type usage and serialization in user code
153
* @param args Command line arguments (not used)
154
* @throws Exception if program execution fails
155
*/
156
public static void main(String[] args) throws Exception;
157
}
158
```
159
160
### K-Means Algorithm Test Program
161
162
Complete K-Means implementation for testing complex algorithm class loading.
163
164
```java { .api }
165
/**
166
* K-Means clustering algorithm implementation for class loading testing
167
*/
168
public class KMeansForTest {
169
170
/**
171
* Main entry point for K-Means class loading test
172
* @param args Command line arguments: [numPoints] [numClusters] [numIterations]
173
* @throws Exception if program execution fails
174
*/
175
public static void main(String[] args) throws Exception;
176
177
/**
178
* 2D point representation for K-Means clustering
179
*/
180
public static class Point {
181
public double x, y;
182
183
public Point();
184
public Point(double x, double y);
185
186
public Point add(Point other);
187
public Point div(double val);
188
public double euclideanDistance(Point other);
189
public void clear();
190
191
public String toString();
192
}
193
194
/**
195
* Cluster centroid extending Point
196
*/
197
public static class Centroid extends Point {
198
public int id;
199
200
public Centroid();
201
public Centroid(int id, double x, double y);
202
public Centroid(int id, Point p);
203
204
public String toString();
205
}
206
207
/**
208
* Converts string representation to Point
209
*/
210
public static class TuplePointConverter implements MapFunction<String, Point> {
211
public Point map(String value) throws Exception;
212
}
213
214
/**
215
* Converts string representation to Centroid
216
*/
217
public static class TupleCentroidConverter implements MapFunction<String, Centroid> {
218
public Centroid map(String value) throws Exception;
219
}
220
221
/**
222
* Finds nearest cluster center for each point
223
*/
224
public static class SelectNearestCenter
225
extends RichMapFunction<Point, Tuple2<Integer, Point>> {
226
227
private Collection<Centroid> centroids;
228
229
public void open(Configuration parameters) throws Exception;
230
public Tuple2<Integer, Point> map(Point p) throws Exception;
231
}
232
233
/**
234
* POJO tuple for testing serialization
235
*/
236
public static class DummyTuple3IntPointLong {
237
public int f0;
238
public Point f1;
239
public long f2;
240
241
public DummyTuple3IntPointLong();
242
public DummyTuple3IntPointLong(int f0, Point f1, long f2);
243
}
244
245
/**
246
* Appends count to cluster accumulation
247
*/
248
public static class CountAppender
249
implements GroupReduceFunction<Tuple2<Integer, Point>, Tuple2<Integer, Point>> {
250
251
public void reduce(Iterable<Tuple2<Integer, Point>> values,
252
Collector<Tuple2<Integer, Point>> out) throws Exception;
253
}
254
255
/**
256
* Accumulates points for centroid calculation
257
*/
258
public static class CentroidAccumulator
259
implements GroupReduceFunction<Tuple2<Integer, Point>, Tuple2<Integer, Point>> {
260
261
public void reduce(Iterable<Tuple2<Integer, Point>> values,
262
Collector<Tuple2<Integer, Point>> out) throws Exception;
263
}
264
265
/**
266
* Calculates average position for new centroid
267
*/
268
public static class CentroidAverager
269
implements MapFunction<Tuple2<Integer, Point>, Centroid> {
270
271
public Centroid map(Tuple2<Integer, Point> value) throws Exception;
272
}
273
274
/**
275
* Custom accumulator for K-Means testing
276
*/
277
public static class CustomAccumulator implements Accumulator<Point, Point> {
278
279
private Point localValue;
280
281
public void add(Point value);
282
public Point getLocalValue();
283
public void resetLocal();
284
public void merge(Accumulator<Point, Point> other);
285
public Point clone();
286
}
287
}
288
```
289
290
### Class Loading Test Patterns
291
292
Common patterns for using class loading test programs:
293
294
**Basic Class Loading Test:**
295
296
```java
297
// Test basic class loading functionality
298
@Test
299
public void testBasicClassLoading() throws Exception {
300
// Create isolated class loader environment
301
URLClassLoader userClassLoader = createUserClassLoader();
302
303
// Execute program in separate class loader context
304
ProcessBuilder pb = new ProcessBuilder(
305
"java", "-cp", getTestClassPath(),
306
"org.apache.flink.test.classloading.jar.StreamingProgram"
307
);
308
309
Process process = pb.start();
310
int exitCode = process.waitFor();
311
312
assertEquals("Program should complete successfully", 0, exitCode);
313
}
314
```
315
316
**Checkpointed Class Loading Test:**
317
318
```java
319
@Test
320
public void testCheckpointedClassLoading() throws Exception {
321
// Test checkpointing with user-defined classes
322
String[] args = {
323
"--checkpointPath", "/tmp/test-checkpoint",
324
"--iterations", "3"
325
};
326
327
// Run checkpointed program
328
ProcessBuilder pb = new ProcessBuilder();
329
pb.command().addAll(Arrays.asList(
330
"java", "-cp", getTestClassPath(),
331
"org.apache.flink.test.classloading.jar.CheckpointedStreamingProgram"
332
));
333
pb.command().addAll(Arrays.asList(args));
334
335
Process process = pb.start();
336
int exitCode = process.waitFor();
337
338
assertEquals("Checkpointed program should complete", 0, exitCode);
339
340
// Verify checkpoint was created
341
assertTrue("Checkpoint should exist",
342
new File("/tmp/test-checkpoint").exists());
343
}
344
```
345
346
**K-Means Class Loading Test:**
347
348
```java
349
@Test
350
public void testKMeansClassLoading() throws Exception {
351
// Test complex algorithm class loading
352
String[] args = {"100", "5", "10"}; // 100 points, 5 clusters, 10 iterations
353
354
ProcessBuilder pb = new ProcessBuilder(
355
"java", "-cp", getTestClassPath(),
356
"org.apache.flink.test.classloading.jar.KMeansForTest"
357
);
358
pb.command().addAll(Arrays.asList(args));
359
360
Process process = pb.start();
361
362
// Capture output for verification
363
BufferedReader reader = new BufferedReader(
364
new InputStreamReader(process.getInputStream()));
365
List<String> output = reader.lines().collect(Collectors.toList());
366
367
int exitCode = process.waitFor();
368
369
assertEquals("K-Means should complete successfully", 0, exitCode);
370
assertTrue("Should output cluster results",
371
output.stream().anyMatch(line -> line.contains("Cluster")));
372
}
373
```
374
375
**Class Loading Policy Test:**
376
377
```java
378
@Test
379
public void testClassLoadingPolicy() throws Exception {
380
// Test different class loading policies
381
Map<String, String> env = new HashMap<>();
382
env.put("FLINK_CLASSPATH_POLICY", "CHILD_FIRST");
383
384
ProcessBuilder pb = new ProcessBuilder(
385
"java", "-cp", getTestClassPath(),
386
"org.apache.flink.test.classloading.jar.ClassLoadingPolicyProgram"
387
);
388
pb.environment().putAll(env);
389
390
Process process = pb.start();
391
int exitCode = process.waitFor();
392
393
assertEquals("Policy test should complete", 0, exitCode);
394
}
395
```
396
397
**User Code Type Test:**
398
399
```java
400
@Test
401
public void testUserCodeType() throws Exception {
402
ProcessBuilder pb = new ProcessBuilder(
403
"java", "-cp", getTestClassPath(),
404
"org.apache.flink.test.classloading.jar.UserCodeType"
405
);
406
407
Process process = pb.start();
408
int exitCode = process.waitFor();
409
410
assertEquals("User code type test should work", 0, exitCode);
411
}
412
```
413
414
These class loading test programs ensure that Flink properly isolates user code, handles different class loading scenarios, and maintains compatibility across different deployment configurations and class loading policies.