or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

client-testing.md connector-testing.md core-testing.md index.md migration-testing.md table-testing.md test-environments.md

client-testing.md docs/

0

# Client Application Testing

1

2

Testing utilities specifically designed for Flink client applications, including classloader testing and job submission scenarios. These utilities help test the interaction between client applications and Flink clusters.

3

4

## Capabilities

5

6

### Classloader Testing

7

8

#### TestUserClassLoaderJob

9

10

Test job implementation designed for testing user classloader scenarios and class loading behavior.

11

12

```java { .api }

13

/**

14

* Test job for user classloader scenarios

15

* Used to verify proper class loading in client applications

16

*/

17

class TestUserClassLoaderJob {

18

/** Main entry point for classloader testing */

19

static void main(String[] args) throws Exception;

20

21

/** Execute job with custom classloader */

22

static void executeWithCustomClassLoader(ClassLoader classLoader) throws Exception;

23

24

/** Verify class loading behavior */

25

static boolean verifyClassLoading(String className, ClassLoader expectedLoader);

26

27

/** Get job execution result */

28

static JobExecutionResult getLastExecutionResult();

29

}

30

```

31

32

**Usage Examples:**

33

34

```java

35

import org.apache.flink.client.testjar.TestUserClassLoaderJob;

36

37

@Test

38

public void testClientClassLoading() throws Exception {

39

// Create custom classloader

40

URLClassLoader customClassLoader = new URLClassLoader(

41

new URL[]{new File("test-lib.jar").toURI().toURL()},

42

Thread.currentThread().getContextClassLoader()

43

);

44

45

// Execute job with custom classloader

46

TestUserClassLoaderJob.executeWithCustomClassLoader(customClassLoader);

47

48

// Verify class loading behavior

49

assertTrue(TestUserClassLoaderJob.verifyClassLoading(

50

"com.example.MyCustomClass", customClassLoader));

51

52

// Check execution result

53

JobExecutionResult result = TestUserClassLoaderJob.getLastExecutionResult();

54

assertNotNull(result);

55

}

56

```

57

58

#### TestUserClassLoaderAdditionalArtifact

59

60

Utility for testing additional artifacts and dependencies in client classloader scenarios.

61

62

```java { .api }

63

/**

64

* Additional artifacts for classloader testing

65

* Provides utilities for testing dependency resolution

66

*/

67

class TestUserClassLoaderAdditionalArtifact {

68

/** Load additional artifact into classloader */

69

static void loadArtifact(String artifactPath, ClassLoader classLoader) throws Exception;

70

71

/** Verify artifact availability */

72

static boolean isArtifactAvailable(String artifactName, ClassLoader classLoader);

73

74

/** Get artifact metadata */

75

static ArtifactMetadata getArtifactMetadata(String artifactPath);

76

77

/** Resolve artifact dependencies */

78

static List<String> resolveDependencies(String artifactPath);

79

}

80

81

/**

82

* Metadata information about test artifacts

83

*/

84

class ArtifactMetadata {

85

/** Get artifact name */

86

String getName();

87

88

/** Get artifact version */

89

String getVersion();

90

91

/** Get artifact dependencies */

92

List<String> getDependencies();

93

94

/** Get artifact manifest attributes */

95

Map<String, String> getManifestAttributes();

96

}

97

```

98

99

**Usage Examples:**

100

101

```java

102

import org.apache.flink.client.testjar.TestUserClassLoaderAdditionalArtifact;

103

104

@Test

105

public void testAdditionalArtifacts() throws Exception {

106

URLClassLoader testClassLoader = new URLClassLoader(new URL[]{});

107

108

// Load additional test artifact

109

String artifactPath = "test-connector.jar";

110

TestUserClassLoaderAdditionalArtifact.loadArtifact(artifactPath, testClassLoader);

111

112

// Verify artifact is available

113

assertTrue(TestUserClassLoaderAdditionalArtifact.isArtifactAvailable(

114

"test-connector", testClassLoader));

115

116

// Get artifact information

117

ArtifactMetadata metadata = TestUserClassLoaderAdditionalArtifact.getArtifactMetadata(artifactPath);

118

assertEquals("test-connector", metadata.getName());

119

assertEquals("1.0.0", metadata.getVersion());

120

121

// Check dependencies

122

List<String> dependencies = TestUserClassLoaderAdditionalArtifact.resolveDependencies(artifactPath);

123

assertFalse(dependencies.isEmpty());

124

}

125

```

126

127

## Client Testing Patterns

128

129

### Job Submission Testing

130

131

```java

132

import org.apache.flink.client.testjar.TestUserClassLoaderJob;

133

import org.apache.flink.client.program.ClusterClient;

134

import org.apache.flink.runtime.jobgraph.JobGraph;

135

136

@Test

137

public void testJobSubmissionWithCustomClasspath() throws Exception {

138

// Set up custom classpath

139

List<URL> classpath = Arrays.asList(

140

new File("connector.jar").toURI().toURL(),

141

new File("custom-functions.jar").toURI().toURL()

142

);

143

144

URLClassLoader jobClassLoader = new URLClassLoader(

145

classpath.toArray(new URL[0]),

146

Thread.currentThread().getContextClassLoader()

147

);

148

149

// Execute job with custom classloader

150

Thread.currentThread().setContextClassLoader(jobClassLoader);

151

try {

152

TestUserClassLoaderJob.main(new String[]{"--input", "test-input"});

153

154

JobExecutionResult result = TestUserClassLoaderJob.getLastExecutionResult();

155

assertNotNull(result);

156

assertTrue(result.isJobExecutionResult());

157

158

} finally {

159

Thread.currentThread().setContextClassLoader(

160

ClassLoader.getSystemClassLoader());

161

}

162

}

163

```

164

165

### Dependency Isolation Testing

166

167

```java

168

@Test

169

public void testDependencyIsolation() throws Exception {

170

// Create isolated classloaders for different components

171

URLClassLoader connectorClassLoader = new URLClassLoader(

172

new URL[]{new File("connector-v1.jar").toURI().toURL()});

173

174

URLClassLoader functionClassLoader = new URLClassLoader(

175

new URL[]{new File("functions-v2.jar").toURI().toURL()});

176

177

// Load artifacts into respective classloaders

178

TestUserClassLoaderAdditionalArtifact.loadArtifact(

179

"connector-v1.jar", connectorClassLoader);

180

TestUserClassLoaderAdditionalArtifact.loadArtifact(

181

"functions-v2.jar", functionClassLoader);

182

183

// Verify isolation

184

assertTrue(TestUserClassLoaderAdditionalArtifact.isArtifactAvailable(

185

"connector-v1", connectorClassLoader));

186

assertFalse(TestUserClassLoaderAdditionalArtifact.isArtifactAvailable(

187

"connector-v1", functionClassLoader));

188

189

assertTrue(TestUserClassLoaderAdditionalArtifact.isArtifactAvailable(

190

"functions-v2", functionClassLoader));

191

assertFalse(TestUserClassLoaderAdditionalArtifact.isArtifactAvailable(

192

"functions-v2", connectorClassLoader));

193

}

194

```

195

196

### Client Configuration Testing

197

198

```java

199

@Test

200

public void testClientConfiguration() throws Exception {

201

// Test different client configurations

202

Configuration clientConfig = new Configuration();

203

clientConfig.setString("jobmanager.rpc.address", "localhost");

204

clientConfig.setInteger("jobmanager.rpc.port", 6123);

205

clientConfig.setString("rest.address", "localhost");

206

clientConfig.setInteger("rest.port", 8081);

207

208

// Create test job with configuration

209

StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(

210

"localhost", 6123, clientConfig);

211

212

// Add test pipeline

213

DataStream<String> source = env.fromElements("test1", "test2", "test3");

214

source.print();

215

216

// Execute through client

217

JobExecutionResult result = env.execute("Client Configuration Test");

218

assertNotNull(result);

219

}

220

```

221

222

### Resource Management Testing

223

224

```java

225

@Test

226

public void testResourceManagement() throws Exception {

227

// Test resource allocation for client jobs

228

Configuration config = new Configuration();

229

config.setString("taskmanager.memory.process.size", "1g");

230

config.setInteger("taskmanager.numberOfTaskSlots", 4);

231

config.setInteger("parallelism.default", 2);

232

233

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

234

env.getConfig().setGlobalJobParameters(ParameterTool.fromMap(config.toMap()));

235

236

// Create resource-intensive job

237

DataStream<Long> numbers = env.fromSequence(1, 1000000);

238

numbers.map(x -> x * x).print();

239

240

// Monitor resource usage during execution

241

JobExecutionResult result = env.execute("Resource Management Test");

242

243

// Verify the job completed and reported a positive net runtime

244

assertTrue(result.getNetRuntime() > 0);

245

}

246

```

247

248

### Client-Server Communication Testing

249

250

```java

251

@Test

252

public void testClientServerCommunication() throws Exception {

253

// Test communication between client and Flink cluster

254

MiniClusterWithClientResource cluster = new MiniClusterWithClientResource(

255

new MiniClusterResourceConfiguration.Builder()

256

.setNumberTaskManagers(1)

257

.setNumberSlotsPerTaskManager(4)

258

.build());

259

260

cluster.before();

261

try {

262

ClusterClient<?> client = cluster.getClusterClient();

263

264

// Submit test job through client

265

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

266

DataStream<String> source = env.fromElements("hello", "world");

267

source.print();

268

269

JobGraph jobGraph = env.getStreamGraph().getJobGraph();

270

271

// Test job submission

272

JobID jobId = client.submitJob(jobGraph).get();

273

assertNotNull(jobId);

274

275

// Test job monitoring

276

JobStatus status = client.getJobStatus(jobId).get();

277

assertTrue(status == JobStatus.RUNNING || status == JobStatus.FINISHED);

278

279

// Test job cancellation (if still running)

280

if (status == JobStatus.RUNNING) {

281

client.cancel(jobId).get();

282

JobStatus cancelledStatus = client.getJobStatus(jobId).get();

283

assertEquals(JobStatus.CANCELED, cancelledStatus);

284

}

285

286

} finally {

287

cluster.after();

288

}

289

}

290

```

291

292

### Dynamic Class Loading Testing

293

294

```java

295

@Test

296

public void testDynamicClassLoading() throws Exception {

297

// Test dynamic loading of classes during job execution

298

String jarPath = createDynamicJar(); // Helper method to create JAR

299

300

// Create classloader with dynamically created JAR

301

URLClassLoader dynamicClassLoader = new URLClassLoader(

302

new URL[]{new File(jarPath).toURI().toURL()});

303

304

// Load class dynamically

305

Class<?> dynamicClass = dynamicClassLoader.loadClass("com.example.DynamicFunction");

306

Object instance = dynamicClass.getDeclaredConstructor().newInstance();

307

308

// Verify dynamic class functionality

309

Method processMethod = dynamicClass.getMethod("process", String.class);

310

String result = (String) processMethod.invoke(instance, "test");

311

assertEquals("processed: test", result);

312

313

// Use dynamic class in Flink job

314

TestUserClassLoaderJob.executeWithCustomClassLoader(dynamicClassLoader);

315

assertTrue(TestUserClassLoaderJob.verifyClassLoading(

316

"com.example.DynamicFunction", dynamicClassLoader));

317

}

318

319

private String createDynamicJar() throws Exception {

320

// Helper method to create a JAR file with test classes

321

String jarPath = "/tmp/dynamic-test.jar";

322

323

try (JarOutputStream jos = new JarOutputStream(new FileOutputStream(jarPath))) {

324

// Add class files to JAR

325

JarEntry entry = new JarEntry("com/example/DynamicFunction.class");

326

jos.putNextEntry(entry);

327

328

// Write compiled class bytes (simplified example)

329

byte[] classBytes = compileTestClass(); // Implementation omitted for brevity

330

jos.write(classBytes);

331

jos.closeEntry();

332

}

333

334

return jarPath;

335

}

336

```