or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

class-loading-programs.mdindex.mdmigration-testing.mdperformance-testing.mdstate-management-testing.mdstreaming-utilities.mdtest-base-classes.mdtest-data-generation.md

class-loading-programs.mddocs/

0

# Class Loading Test Programs

1

2

Complete standalone programs for testing dynamic class loading, user code isolation, and class loading policies in Flink applications. Each program serves as a test case for different class loading scenarios, ensuring that user code is properly isolated and loaded according to Flink's class loading mechanisms.

3

4

## Capabilities

5

6

### Basic Streaming Program

7

8

Simple streaming program for basic class loading tests.

9

10

```java { .api }

11

/**

12

* Basic streaming program for class loading tests

13

*/

14

public class StreamingProgram {

15

16

/**

17

* Main entry point for basic streaming class loading test

18

* @param args Command line arguments (not used)

19

* @throws Exception if program execution fails

20

*/

21

public static void main(String[] args) throws Exception;

22

}

23

```

24

25

**Usage:**

26

```bash

27

# Run as standalone program for class loading testing

28

java -cp flink-tests.jar org.apache.flink.test.classloading.jar.StreamingProgram

29

```

30

31

### Checkpointed Streaming Program

32

33

Streaming program that tests checkpointing with user-defined state classes.

34

35

```java { .api }

36

/**

37

* Checkpointed streaming program for testing state class loading

38

*/

39

public class CheckpointedStreamingProgram {

40

41

/**

42

* Main entry point for checkpointed streaming class loading test

43

* Creates a streaming job with checkpointed state to test state serialization

44

* and class loading across restarts

45

* @param args Command line arguments (not used)

46

* @throws Exception if program execution fails

47

*/

48

public static void main(String[] args) throws Exception;

49

}

50

```

51

52

### Custom Input Split Programs

53

54

Programs for testing custom input split functionality and class loading.

55

56

```java { .api }

57

/**

58

* Program for testing custom input split functionality in batch processing

59

*/

60

public class CustomInputSplitProgram {

61

62

/**

63

* Main entry point for custom input split test

64

* Tests loading and execution of custom InputFormat and InputSplit classes

65

* @param args Command line arguments (not used)

66

* @throws Exception if program execution fails

67

*/

68

public static void main(String[] args) throws Exception;

69

}

70

71

/**

72

* Program for testing custom input split functionality in streaming

73

*/

74

public class StreamingCustomInputSplitProgram {

75

76

/**

77

* Main entry point for streaming custom input split test

78

* Tests loading of custom SourceFunction classes in streaming context

79

* @param args Command line arguments (not used)

80

* @throws Exception if program execution fails

81

*/

82

public static void main(String[] args) throws Exception;

83

}

84

```

85

86

### Custom Key-Value State Programs

87

88

Programs for testing custom key-value state functionality and serialization.

89

90

```java { .api }

91

/**

92

* Program for testing basic custom key-value state

93

*/

94

public class CustomKvStateProgram {

95

96

/**

97

* Main entry point for custom key-value state test

98

* Tests loading and serialization of custom state types

99

* @param args Command line arguments (not used)

100

* @throws Exception if program execution fails

101

*/

102

public static void main(String[] args) throws Exception;

103

}

104

105

/**

106

* Program for testing checkpointing with custom key-value state

107

*/

108

public class CheckpointingCustomKvStateProgram {

109

110

/**

111

* Main entry point for checkpointing custom key-value state test

112

* Tests state persistence and recovery with custom state classes

113

* @param args Command line arguments (not used)

114

* @throws Exception if program execution fails

115

*/

116

public static void main(String[] args) throws Exception;

117

}

118

```

119

120

### Class Loading Policy Program

121

122

Program for testing different class loading policies and configurations.

123

124

```java { .api }

125

/**

126

* Program for testing class loading policy configurations

127

*/

128

public class ClassLoadingPolicyProgram {

129

130

/**

131

* Main entry point for class loading policy test

132

* Tests different class loading strategies (parent-first vs child-first)

133

* @param args Command line arguments (not used)

134

* @throws Exception if program execution fails

135

*/

136

public static void main(String[] args) throws Exception;

137

}

138

```

139

140

### User Code Type Program

141

142

Program for testing user code type loading and serialization.

143

144

```java { .api }

145

/**

146

* Program for testing user code type loading and serialization

147

*/

148

public class UserCodeType {

149

150

/**

151

* Main entry point for user code type testing

152

* Tests custom type usage and serialization in user code

153

* @param args Command line arguments (not used)

154

* @throws Exception if program execution fails

155

*/

156

public static void main(String[] args) throws Exception;

157

}

158

```

159

160

### K-Means Algorithm Test Program

161

162

Complete K-Means implementation for testing complex algorithm class loading.

163

164

```java { .api }

165

/**

166

* K-Means clustering algorithm implementation for class loading testing

167

*/

168

public class KMeansForTest {

169

170

/**

171

* Main entry point for K-Means class loading test

172

* @param args Command line arguments: [numPoints] [numClusters] [numIterations]

173

* @throws Exception if program execution fails

174

*/

175

public static void main(String[] args) throws Exception;

176

177

/**

178

* 2D point representation for K-Means clustering

179

*/

180

public static class Point {

181

public double x, y;

182

183

public Point();

184

public Point(double x, double y);

185

186

public Point add(Point other);

187

public Point div(double val);

188

public double euclideanDistance(Point other);

189

public void clear();

190

191

public String toString();

192

}

193

194

/**

195

* Cluster centroid extending Point

196

*/

197

public static class Centroid extends Point {

198

public int id;

199

200

public Centroid();

201

public Centroid(int id, double x, double y);

202

public Centroid(int id, Point p);

203

204

public String toString();

205

}

206

207

/**

208

* Converts string representation to Point

209

*/

210

public static class TuplePointConverter implements MapFunction<String, Point> {

211

public Point map(String value) throws Exception;

212

}

213

214

/**

215

* Converts string representation to Centroid

216

*/

217

public static class TupleCentroidConverter implements MapFunction<String, Centroid> {

218

public Centroid map(String value) throws Exception;

219

}

220

221

/**

222

* Finds nearest cluster center for each point

223

*/

224

public static class SelectNearestCenter

225

extends RichMapFunction<Point, Tuple2<Integer, Point>> {

226

227

private Collection<Centroid> centroids;

228

229

public void open(Configuration parameters) throws Exception;

230

public Tuple2<Integer, Point> map(Point p) throws Exception;

231

}

232

233

/**

234

* POJO tuple for testing serialization

235

*/

236

public static class DummyTuple3IntPointLong {

237

public int f0;

238

public Point f1;

239

public long f2;

240

241

public DummyTuple3IntPointLong();

242

public DummyTuple3IntPointLong(int f0, Point f1, long f2);

243

}

244

245

/**

246

* Appends count to cluster accumulation

247

*/

248

public static class CountAppender

249

implements GroupReduceFunction<Tuple2<Integer, Point>, Tuple2<Integer, Point>> {

250

251

public void reduce(Iterable<Tuple2<Integer, Point>> values,

252

Collector<Tuple2<Integer, Point>> out) throws Exception;

253

}

254

255

/**

256

* Accumulates points for centroid calculation

257

*/

258

public static class CentroidAccumulator

259

implements GroupReduceFunction<Tuple2<Integer, Point>, Tuple2<Integer, Point>> {

260

261

public void reduce(Iterable<Tuple2<Integer, Point>> values,

262

Collector<Tuple2<Integer, Point>> out) throws Exception;

263

}

264

265

/**

266

* Calculates average position for new centroid

267

*/

268

public static class CentroidAverager

269

implements MapFunction<Tuple2<Integer, Point>, Centroid> {

270

271

public Centroid map(Tuple2<Integer, Point> value) throws Exception;

272

}

273

274

/**

275

* Custom accumulator for K-Means testing

276

*/

277

public static class CustomAccumulator implements Accumulator<Point, Point> {

278

279

private Point localValue;

280

281

public void add(Point value);

282

public Point getLocalValue();

283

public void resetLocal();

284

public void merge(Accumulator<Point, Point> other);

285

public Point clone();

286

}

287

}

288

```

289

290

### Class Loading Test Patterns

291

292

Common patterns for using class loading test programs:

293

294

**Basic Class Loading Test:**

295

296

```java

297

// Test basic class loading functionality

298

@Test

299

public void testBasicClassLoading() throws Exception {

300

// Create isolated class loader environment

301

URLClassLoader userClassLoader = createUserClassLoader();

302

303

// Execute program in separate class loader context

304

ProcessBuilder pb = new ProcessBuilder(

305

"java", "-cp", getTestClassPath(),

306

"org.apache.flink.test.classloading.jar.StreamingProgram"

307

);

308

309

Process process = pb.start();

310

int exitCode = process.waitFor();

311

312

assertEquals("Program should complete successfully", 0, exitCode);

313

}

314

```

315

316

**Checkpointed Class Loading Test:**

317

318

```java

319

@Test

320

public void testCheckpointedClassLoading() throws Exception {

321

// Test checkpointing with user-defined classes

322

String[] args = {

323

"--checkpointPath", "/tmp/test-checkpoint",

324

"--iterations", "3"

325

};

326

327

// Run checkpointed program

328

ProcessBuilder pb = new ProcessBuilder();

329

pb.command().addAll(Arrays.asList(

330

"java", "-cp", getTestClassPath(),

331

"org.apache.flink.test.classloading.jar.CheckpointedStreamingProgram"

332

));

333

pb.command().addAll(Arrays.asList(args));

334

335

Process process = pb.start();

336

int exitCode = process.waitFor();

337

338

assertEquals("Checkpointed program should complete", 0, exitCode);

339

340

// Verify checkpoint was created

341

assertTrue("Checkpoint should exist",

342

new File("/tmp/test-checkpoint").exists());

343

}

344

```

345

346

**K-Means Class Loading Test:**

347

348

```java

349

@Test

350

public void testKMeansClassLoading() throws Exception {

351

// Test complex algorithm class loading

352

String[] args = {"100", "5", "10"}; // 100 points, 5 clusters, 10 iterations

353

354

ProcessBuilder pb = new ProcessBuilder(

355

"java", "-cp", getTestClassPath(),

356

"org.apache.flink.test.classloading.jar.KMeansForTest"

357

);

358

pb.command().addAll(Arrays.asList(args));

359

360

Process process = pb.start();

361

362

// Capture output for verification

363

BufferedReader reader = new BufferedReader(

364

new InputStreamReader(process.getInputStream()));

365

List<String> output = reader.lines().collect(Collectors.toList());

366

367

int exitCode = process.waitFor();

368

369

assertEquals("K-Means should complete successfully", 0, exitCode);

370

assertTrue("Should output cluster results",

371

output.stream().anyMatch(line -> line.contains("Cluster")));

372

}

373

```

374

375

**Class Loading Policy Test:**

376

377

```java

378

@Test

379

public void testClassLoadingPolicy() throws Exception {

380

// Test different class loading policies

381

Map<String, String> env = new HashMap<>();

382

env.put("FLINK_CLASSPATH_POLICY", "CHILD_FIRST");

383

384

ProcessBuilder pb = new ProcessBuilder(

385

"java", "-cp", getTestClassPath(),

386

"org.apache.flink.test.classloading.jar.ClassLoadingPolicyProgram"

387

);

388

pb.environment().putAll(env);

389

390

Process process = pb.start();

391

int exitCode = process.waitFor();

392

393

assertEquals("Policy test should complete", 0, exitCode);

394

}

395

```

396

397

**User Code Type Test:**

398

399

```java

400

@Test

401

public void testUserCodeType() throws Exception {

402

ProcessBuilder pb = new ProcessBuilder(

403

"java", "-cp", getTestClassPath(),

404

"org.apache.flink.test.classloading.jar.UserCodeType"

405

);

406

407

Process process = pb.start();

408

int exitCode = process.waitFor();

409

410

assertEquals("User code type test should work", 0, exitCode);

411

}

412

```

413

414

These class loading test programs ensure that Flink properly isolates user code, handles different class loading scenarios, and maintains compatibility across different deployment configurations and class loading policies.