or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

cli.md cluster-management.md execution-context.md execution-environments.md index.md program-management.md

cluster-management.mddocs/

0

# Cluster Management

1

2

Core functionality for connecting to and managing Flink clusters, including job submission, monitoring, and lifecycle operations.

3

4

## Capabilities

5

6

### ClusterClient (Abstract Base Class)

7

8

Base class for all cluster clients that communicate with Flink clusters. Provides the core interface for job management operations.

9

10

```java { .api }

11

/**

12

* Abstract base class for all cluster clients that communicate with Flink clusters.

13

* Encapsulates the functionality necessary to submit a program to a remote cluster.

14

*/

15

public abstract class ClusterClient {

16

/**

17

* Creates an instance that submits the programs to the JobManager defined in the configuration.

18

* This method will try to resolve the JobManager hostname and throw an exception if that is not possible.

19

* @param flinkConfig The config used to obtain the job-manager's address, and used to configure the optimizer.

20

* @throws Exception if the high availability services cannot be created

21

*/

22

public ClusterClient(Configuration flinkConfig) throws Exception;

23

24

/**

25

* Creates an instance that submits the programs to the JobManager defined in the configuration.

26

* This method will try to resolve the JobManager hostname and throw an exception if that is not possible.

27

* @param flinkConfig The config used to obtain the job-manager's address, and used to configure the optimizer.

28

* @param highAvailabilityServices HighAvailabilityServices to use for leader retrieval

29

*/

30

public ClusterClient(Configuration flinkConfig, HighAvailabilityServices highAvailabilityServices);

31

32

// Configuration Methods

33

34

/**

35

* Shuts down the client. This stops the internal actor system and actors.

36

* @throws Exception In case the shutdown did not complete successfully

37

*/

38

public void shutdown() throws Exception;

39

40

/**

41

* Configures whether the client should print progress updates during the execution to System.out.

42

* All updates are logged via the SLF4J loggers regardless of this setting.

43

* @param print True to print updates to standard out during execution, false to not print them.

44

*/

45

public void setPrintStatusDuringExecution(boolean print);

46

47

/**

48

* Returns whether the client will print progress updates during the execution to System.out

49

* @return boolean indicating print status

50

*/

51

public boolean getPrintStatusDuringExecution();

52

53

/**

54

* Gets the current JobManager address (may change in case of a HA setup).

55

* @return The address (host and port) of the leading JobManager

56

*/

57

public InetSocketAddress getJobManagerAddress();

58

59

/**

60

* Return the Flink configuration object

61

* @return The Flink configuration object

62

*/

63

public Configuration getFlinkConfiguration();

64

65

// Program Execution Methods

66

67

/**

68

* General purpose method to run a user jar from the CliFrontend in either blocking or detached mode,

69

* depending on whether {@code setDetached(true)} or {@code setDetached(false)} was called.

70

* @param prog The packaged program to execute

71

* @param parallelism The parallelism level for job execution

72

* @return The result of the execution

73

* @throws ProgramInvocationException Thrown, if the program could not be instantiated from its jar file,

74

* or if the submission failed. That might be either due to an I/O problem, i.e. the job-manager

75

* is unreachable, or due to the fact that the parallel execution failed.

76

* @throws ProgramMissingJobException Thrown, if the submitted program cannot be executed, because it lacks

77

* a job definition.

78

*/

79

public JobSubmissionResult run(PackagedProgram prog, int parallelism) throws ProgramInvocationException, ProgramMissingJobException;

80

81

/**

82

* Runs a program on the Flink cluster to which this client is connected.

83

* @param program The job with associated JAR files

84

* @param parallelism The parallelism level for job execution

85

* @return JobSubmissionResult containing job ID and execution details

86

* @throws ProgramInvocationException if job submission fails

87

*/

88

public JobSubmissionResult run(JobWithJars program, int parallelism) throws ProgramInvocationException;

89

90

/**

91

* Runs a program on the Flink cluster to which this client is connected. The call blocks until the

92

* execution is complete, and returns afterwards.

93

* @param jobWithJars The job with associated JAR files

94

* @param parallelism The parallelism level for job execution

95

* @param savepointSettings Savepoint restore settings

96

* @return JobSubmissionResult containing job ID and execution details

97

* @throws CompilerException Thrown, if the compiler encounters an illegal situation.

98

* @throws ProgramInvocationException Thrown, if the program could not be instantiated from its jar file,

99

* or if the submission failed. That might be either due to an I/O problem, i.e. the job-manager

100

* is unreachable, or due to the fact that the parallel execution failed.

101

*/

102

public JobSubmissionResult run(JobWithJars jobWithJars, int parallelism, SavepointRestoreSettings savepointSettings) throws CompilerException, ProgramInvocationException;

103

104

/**

105

* Submits a program to the cluster.

106

* @param compiledPlan The optimized program plan to submit

107

* @param libraries The libraries that contain the program and all dependencies.

108

* @param classpaths The classpaths that contain the program and all dependencies.

109

* @param classLoader The class-loader to deserialize the job and result.

110

* @return JobSubmissionResult containing job ID and execution details

111

* @throws ProgramInvocationException if job submission fails

112

*/

113

public JobSubmissionResult run(FlinkPlan compiledPlan, List<URL> libraries, List<URL> classpaths, ClassLoader classLoader) throws ProgramInvocationException;

114

115

/**

116

* Submits a program to the cluster.

117

* @param compiledPlan The optimized program plan to submit

118

* @param libraries The libraries that contain the program and all dependencies.

119

* @param classpaths The classpaths that contain the program and all dependencies.

120

* @param classLoader The class-loader to deserialize the job and result.

121

* @param savepointSettings Savepoint restore settings

122

* @return JobSubmissionResult containing job ID and execution details

123

* @throws ProgramInvocationException if job submission fails

124

*/

125

public JobSubmissionResult run(FlinkPlan compiledPlan, List<URL> libraries, List<URL> classpaths, ClassLoader classLoader, SavepointRestoreSettings savepointSettings) throws ProgramInvocationException;

126

127

/**

128

* Submits a JobGraph blocking.

129

* @param jobGraph The job graph to execute

130

* @param classLoader User code class loader to deserialize the results and errors (may contain custom classes)

131

* @return JobExecutionResult with execution details and results

132

* @throws ProgramInvocationException if job execution fails

133

*/

134

public JobExecutionResult run(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException;

135

136

/**

137

* Submits a JobGraph detached.

138

* @param jobGraph The job graph to execute

139

* @param classLoader User code class loader to deserialize the results and errors (may contain custom classes)

140

* @return JobSubmissionResult containing job ID

141

* @throws ProgramInvocationException if job submission fails

142

*/

143

public JobSubmissionResult runDetached(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException;

144

145

// Job Management Methods

146

147

/**

148

* Reattaches to a running job from the supplied job id.

149

* @param jobID The job id of the job to attach to

150

* @return The JobExecutionResult for the jobID

151

* @throws JobExecutionException if an error occurs during monitoring the job execution

152

*/

153

public JobExecutionResult retrieveJob(JobID jobID) throws JobExecutionException;

154

155

/**

156

* Reattaches to a running job with the given job id.

157

* @param jobID The job id of the job to attach to

158

* @return The JobListeningContext for the jobID

159

* @throws JobExecutionException if an error occurs during monitoring the job execution

160

*/

161

public JobListeningContext connectToJob(JobID jobID) throws JobExecutionException;

162

163

/**

164

* Cancels a job identified by the job id.

165

* @param jobId the job id

166

* @throws Exception In case an error occurred.

167

*/

168

public void cancel(JobID jobId) throws Exception;

169

170

/**

171

* Stops a program on the Flink cluster whose job-manager is configured in this client's configuration.

172

* Stopping works only for streaming programs. Be aware that the program might continue to run for a while after sending the stop command,

173

* because after the sources have stopped emitting data, all operators need to finish processing.

174

* @param jobId the job ID of the streaming program to stop

175

* @throws Exception If the job ID is invalid (i.e., is unknown or refers to a batch job) or if sending the stop signal failed.

176

* That might be due to an I/O problem, i.e., the job-manager is unreachable.

177

*/

178

public void stop(final JobID jobId) throws Exception;

179

180

// Accumulator Methods

181

182

/**

183

* Requests and returns the accumulators for the given job identifier. Accumulators can be requested while a

184

* job is running or after it has finished. The default class loader is used to deserialize the incoming accumulator results.

185

* @param jobID The job identifier of a job.

186

* @return A Map containing the accumulator's name and its value.

187

*/

188

public Map<String, Object> getAccumulators(JobID jobID) throws Exception;

189

190

/**

191

* Requests and returns the accumulators for the given job identifier. Accumulators can be requested while a

192

* job is running or after it has finished.

193

* @param jobID The job identifier of a job.

194

* @param loader The class loader for deserializing the accumulator results.

195

* @return A Map containing the accumulator's name and its value.

196

*/

197

public Map<String, Object> getAccumulators(JobID jobID, ClassLoader loader) throws Exception;

198

199

// Session Management Methods

200

201

/**

202

* Tells the JobManager to finish the session (job) defined by the given ID.

203

* @param jobId The ID that identifies the session.

204

*/

205

public void endSession(JobID jobId) throws Exception;

206

207

/**

208

* Tells the JobManager to finish the sessions (jobs) defined by the given IDs.

209

* @param jobIds The IDs that identify the sessions.

210

*/

211

public void endSessions(List<JobID> jobIds) throws Exception;

212

213

// Static Plan Methods

214

215

/**

216

* Returns the optimized execution plan of the program as a JSON string.

217

* @param compiler The optimizer to use

218

* @param prog The program to get the plan for

219

* @param parallelism The parallelism to compile the plan for

220

* @return String representation of optimized plan as JSON

221

* @throws CompilerException if compilation fails

222

* @throws ProgramInvocationException if program cannot be invoked

223

*/

224

public static String getOptimizedPlanAsJson(Optimizer compiler, PackagedProgram prog, int parallelism) throws CompilerException, ProgramInvocationException;

225

226

/**

227

* Returns the optimized execution plan of the program.

228

* @param compiler The optimizer to use

229

* @param prog The program to get the plan for

230

* @param parallelism The parallelism to compile the plan for

231

* @return FlinkPlan optimized plan

232

* @throws CompilerException if compilation fails

233

* @throws ProgramInvocationException if program cannot be invoked

234

*/

235

public static FlinkPlan getOptimizedPlan(Optimizer compiler, PackagedProgram prog, int parallelism) throws CompilerException, ProgramInvocationException;

236

237

/**

238

* Returns the optimized execution plan of the program.

239

* @param compiler The optimizer to use

240

* @param p The program plan to optimize

241

* @param parallelism The parallelism to compile the plan for

242

* @return OptimizedPlan

243

* @throws CompilerException if compilation fails

244

*/

245

public static OptimizedPlan getOptimizedPlan(Optimizer compiler, Plan p, int parallelism) throws CompilerException;

246

247

// Utility Methods

248

249

/**

250

* Creates a JobGraph from a packaged program and optimized plan.

251

* @param prog The packaged program

252

* @param optPlan The optimized plan

253

* @param savepointSettings Savepoint restore settings

254

* @return JobGraph

255

* @throws ProgramInvocationException if JobGraph creation fails

256

*/

257

public JobGraph getJobGraph(PackagedProgram prog, FlinkPlan optPlan, SavepointRestoreSettings savepointSettings) throws ProgramInvocationException;

258

259

/**

260

* Returns the ActorGateway of the current job manager leader using the LeaderRetrievalService.

261

* @return ActorGateway of the current job manager leader

262

* @throws Exception

263

*/

264

public ActorGateway getJobManagerGateway() throws Exception;

265

266

// Client Configuration Methods

267

268

/**

269

* Set the mode of this client (detached or blocking job execution).

270

* @param isDetached If true, the client will submit programs detached via the run method

271

*/

272

public void setDetached(boolean isDetached);

273

274

/**

275

* A flag to indicate whether this client submits jobs detached.

276

* @return True if the Client submits detached, false otherwise

277

*/

278

public boolean isDetached();

279

280

// Abstract Methods (Must be implemented by subclasses)

281

282

/**

283

* Blocks until the client has determined that the cluster is ready for Job submission.

284

* This is delayed until right before job submission to report any other errors first (e.g. invalid job definitions/errors in the user jar)

285

*/

286

public abstract void waitForClusterToBeReady();

287

288

/**

289

* Returns a URL (as a string) to the JobManager web interface

290

*/

291

public abstract String getWebInterfaceURL();

292

293

/**

294

* Returns the latest cluster status, with number of TaskManagers and slots

295

*/

296

public abstract GetClusterStatusResponse getClusterStatus();

297

298

/**

299

* Returns a string representation of the cluster.

300

*/

301

public abstract String getClusterIdentifier();

302

303

/**

304

* The client may define an upper limit on the number of slots to use

305

* @return -1 if unknown

306

*/

307

public abstract int getMaxSlots();

308

309

/**

310

* Returns true if the client already has the user jar and providing it again would result in duplicate uploading of the jar.

311

*/

312

public abstract boolean hasUserJarsInClassPath(List<URL> userJarFiles);

313

}

314

```

315

316

### StandaloneClusterClient

317

318

Client implementation for connecting to standalone Flink clusters.

319

320

```java { .api }

321

/**

322

* Client for connecting to standalone Flink clusters

323

*/

324

public class StandaloneClusterClient extends ClusterClient {

325

/**

326

* Creates a client for connecting to a standalone cluster

327

* @param config Configuration containing cluster connection details

328

*/

329

public StandaloneClusterClient(Configuration config);

330

331

/**

332

* Creates a client with high availability services

333

* @param config Configuration containing cluster connection details

334

* @param highAvailabilityServices High availability services for cluster coordination

335

*/

336

public StandaloneClusterClient(Configuration config, HighAvailabilityServices highAvailabilityServices);

337

}

338

```

339

340

**Usage Examples:**

341

342

```java

343

import org.apache.flink.client.program.StandaloneClusterClient;

344

import org.apache.flink.configuration.Configuration;

345

import org.apache.flink.configuration.JobManagerOptions;

346

import org.apache.flink.runtime.jobgraph.JobID;

347

348

// Create configuration for cluster connection

349

Configuration config = new Configuration();

350

config.setString(JobManagerOptions.ADDRESS, "localhost");

351

config.setInteger(JobManagerOptions.PORT, 6123);

352

353

// Create cluster client

354

StandaloneClusterClient client = new StandaloneClusterClient(config);

355

356

// Submit a job

357

PackagedProgram program = new PackagedProgram(new File("my-job.jar"));

358

JobSubmissionResult result = client.run(program, 4);

359

JobID jobId = result.getJobID();

360

361

// Monitor job execution

362

JobExecutionResult executionResult = client.retrieveJob(jobId);

363

System.out.println("Job execution time: " + executionResult.getNetRuntime() + "ms");

364

365

// Get accumulator results

366

Map<String, Object> accumulators = client.getAccumulators(jobId);

367

System.out.println("Accumulators: " + accumulators);

368

369

// Cancel job if needed

370

client.cancel(jobId);

371

372

// Clean up

373

client.shutdown();

374

```

375

376

### Job Result Types

377

378

Result types returned by cluster operations.

379

380

```java { .api }

381

/**

382

* Result of job submission operations

383

*/

384

public class JobSubmissionResult {

385

/**

386

* Gets the unique identifier of the submitted job

387

* @return JobID of the submitted job

388

*/

389

public JobID getJobID();

390

391

/**

392

* Checks if this result contains execution details

393

* @return true if this is a JobExecutionResult with execution details

394

*/

395

public boolean isJobExecutionResult();

396

}

397

398

/**

399

* Extended result containing job execution details and performance metrics

400

*/

401

public class JobExecutionResult extends JobSubmissionResult {

402

/**

403

* Gets the net runtime of the job execution in milliseconds

404

* @return Job execution time in milliseconds

405

*/

406

public long getNetRuntime();

407

408

/**

409

* Gets all accumulator results from the job execution

410

* @return Map of accumulator names to their final values

411

*/

412

public Map<String, Object> getAllAccumulatorResults();

413

}

414

```