0
# Cluster Management
1
2
Core functionality for connecting to and managing Flink clusters, including job submission, monitoring, and lifecycle operations.
3
4
## Capabilities
5
6
### ClusterClient (Abstract Base Class)
7
8
Base class for all cluster clients that communicate with Flink clusters. Provides the core interface for job management operations.
9
10
```java { .api }
11
/**
12
* Abstract base class for all cluster clients that communicate with Flink clusters.
13
* Encapsulates the functionality necessary to submit a program to a remote cluster.
14
*/
15
public abstract class ClusterClient {
16
/**
17
* Creates a instance that submits the programs to the JobManager defined in the configuration.
18
* This method will try to resolve the JobManager hostname and throw an exception if that is not possible.
19
* @param flinkConfig The config used to obtain the job-manager's address, and used to configure the optimizer.
20
* @throws Exception when cannot create the high availability services
21
*/
22
public ClusterClient(Configuration flinkConfig) throws Exception;
23
24
/**
25
* Creates a instance that submits the programs to the JobManager defined in the configuration.
26
* This method will try to resolve the JobManager hostname and throw an exception if that is not possible.
27
* @param flinkConfig The config used to obtain the job-manager's address, and used to configure the optimizer.
28
* @param highAvailabilityServices HighAvailabilityServices to use for leader retrieval
29
*/
30
public ClusterClient(Configuration flinkConfig, HighAvailabilityServices highAvailabilityServices);
31
32
// Configuration Methods
33
34
/**
35
* Shuts down the client. This stops the internal actor system and actors.
36
* @throws Exception In case the shutdown did not complete successfully
37
*/
38
public void shutdown() throws Exception;
39
40
/**
41
* Configures whether the client should print progress updates during the execution to System.out.
42
* All updates are logged via the SLF4J loggers regardless of this setting.
43
* @param print True to print updates to standard out during execution, false to not print them.
44
*/
45
public void setPrintStatusDuringExecution(boolean print);
46
47
/**
48
* Returns whether the client will print progress updates during the execution to System.out
49
* @return boolean indicating print status
50
*/
51
public boolean getPrintStatusDuringExecution();
52
53
/**
54
* Gets the current JobManager address (may change in case of a HA setup).
55
* @return The address (host and port) of the leading JobManager
56
*/
57
public InetSocketAddress getJobManagerAddress();
58
59
/**
60
* Return the Flink configuration object
61
* @return The Flink configuration object
62
*/
63
public Configuration getFlinkConfiguration();
64
65
// Program Execution Methods
66
67
/**
68
* General purpose method to run a user jar from the CliFrontend in either blocking or detached mode,
69
* depending on whether {@code setDetached(true)} or {@code setDetached(false)}.
70
* @param prog The packaged program to execute
71
* @param parallelism The parallelism level for job execution
72
* @return The result of the execution
73
* @throws ProgramInvocationException Thrown, if the program could not be instantiated from its jar file,
74
* or if the submission failed. That might be either due to an I/O problem, i.e. the job-manager
75
* is unreachable, or due to the fact that the parallel execution failed.
76
* @throws ProgramMissingJobException Thrown, if the submitted program cannot be executed, because it lacks
77
* a job definition.
78
*/
79
public JobSubmissionResult run(PackagedProgram prog, int parallelism) throws ProgramInvocationException, ProgramMissingJobException;
80
81
/**
82
* Runs a program on the Flink cluster to which this client is connected.
83
* @param program The job with associated JAR files
84
* @param parallelism The parallelism level for job execution
85
* @return JobSubmissionResult containing job ID and execution details
86
* @throws ProgramInvocationException if job submission fails
87
*/
88
public JobSubmissionResult run(JobWithJars program, int parallelism) throws ProgramInvocationException;
89
90
/**
91
* Runs a program on the Flink cluster to which this client is connected. The call blocks until the
92
* execution is complete, and returns afterwards.
93
* @param jobWithJars The job with associated JAR files
94
* @param parallelism The parallelism level for job execution
95
* @param savepointSettings Savepoint restore settings
96
* @return JobSubmissionResult containing job ID and execution details
97
* @throws CompilerException Thrown, if the compiler encounters an illegal situation.
98
* @throws ProgramInvocationException Thrown, if the program could not be instantiated from its jar file,
99
* or if the submission failed. That might be either due to an I/O problem, i.e. the job-manager
100
* is unreachable, or due to the fact that the parallel execution failed.
101
*/
102
public JobSubmissionResult run(JobWithJars jobWithJars, int parallelism, SavepointRestoreSettings savepointSettings) throws CompilerException, ProgramInvocationException;
103
104
/**
105
* Submits a program to the cluster.
106
* @param compiledPlan The optimized program plan to submit
107
* @param libraries The libraries that contain the program and all dependencies.
108
* @param classpaths The classpaths that contain the program and all dependencies.
109
* @param classLoader The class-loader to deserialize the job and result.
110
* @return JobSubmissionResult containing job ID and execution details
111
* @throws ProgramInvocationException if job submission fails
112
*/
113
public JobSubmissionResult run(FlinkPlan compiledPlan, List<URL> libraries, List<URL> classpaths, ClassLoader classLoader) throws ProgramInvocationException;
114
115
/**
116
* Submits a program to the cluster.
117
* @param compiledPlan The optimized program plan to submit
118
* @param libraries The libraries that contain the program and all dependencies.
119
* @param classpaths The classpaths that contain the program and all dependencies.
120
* @param classLoader The class-loader to deserialize the job and result.
121
* @param savepointSettings Savepoint restore settings
122
* @return JobSubmissionResult containing job ID and execution details
123
* @throws ProgramInvocationException if job submission fails
124
*/
125
public JobSubmissionResult run(FlinkPlan compiledPlan, List<URL> libraries, List<URL> classpaths, ClassLoader classLoader, SavepointRestoreSettings savepointSettings) throws ProgramInvocationException;
126
127
/**
128
* Submits a JobGraph blocking.
129
* @param jobGraph The job graph to execute
130
* @param classLoader User code class loader to deserialize the results and errors (may contain custom classes)
131
* @return JobExecutionResult with execution details and results
132
* @throws ProgramInvocationException if job execution fails
133
*/
134
public JobExecutionResult run(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException;
135
136
/**
137
* Submits a JobGraph detached.
138
* @param jobGraph The job graph to execute
139
* @param classLoader User code class loader to deserialize the results and errors (may contain custom classes)
140
* @return JobSubmissionResult containing job ID
141
* @throws ProgramInvocationException if job submission fails
142
*/
143
public JobSubmissionResult runDetached(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException;
144
145
// Job Management Methods
146
147
/**
148
* Reattaches to a running from from the supplied job id
149
* @param jobID The job id of the job to attach to
150
* @return The JobExecutionResult for the jobID
151
* @throws JobExecutionException if an error occurs during monitoring the job execution
152
*/
153
public JobExecutionResult retrieveJob(JobID jobID) throws JobExecutionException;
154
155
/**
156
* Reattaches to a running job with the given job id.
157
* @param jobID The job id of the job to attach to
158
* @return The JobExecutionResult for the jobID
159
* @throws JobExecutionException if an error occurs during monitoring the job execution
160
*/
161
public JobListeningContext connectToJob(JobID jobID) throws JobExecutionException;
162
163
/**
164
* Cancels a job identified by the job id.
165
* @param jobId the job id
166
* @throws Exception In case an error occurred.
167
*/
168
public void cancel(JobID jobId) throws Exception;
169
170
/**
171
* Stops a program on Flink cluster whose job-manager is configured in this client's configuration.
172
* Stopping works only for streaming programs. Be aware, that the program might continue to run for a while after sending the stop command,
173
* because after sources stopped to emit data all operators need to finish processing.
174
* @param jobId the job ID of the streaming program to stop
175
* @throws Exception If the job ID is invalid (ie, is unknown or refers to a batch job) or if sending the stop signal failed.
176
* That might be due to an I/O problem, ie, the job-manager is unreachable.
177
*/
178
public void stop(final JobID jobId) throws Exception;
179
180
// Accumulator Methods
181
182
/**
183
* Requests and returns the accumulators for the given job identifier. Accumulators can be requested while a
184
* is running or after it has finished. The default class loader is used to deserialize the incoming accumulator results.
185
* @param jobID The job identifier of a job.
186
* @return A Map containing the accumulator's name and its value.
187
*/
188
public Map<String, Object> getAccumulators(JobID jobID) throws Exception;
189
190
/**
191
* Requests and returns the accumulators for the given job identifier. Accumulators can be requested while a
192
* is running or after it has finished.
193
* @param jobID The job identifier of a job.
194
* @param loader The class loader for deserializing the accumulator results.
195
* @return A Map containing the accumulator's name and its value.
196
*/
197
public Map<String, Object> getAccumulators(JobID jobID, ClassLoader loader) throws Exception;
198
199
// Session Management Methods
200
201
/**
202
* Tells the JobManager to finish the session (job) defined by the given ID.
203
* @param jobId The ID that identifies the session.
204
*/
205
public void endSession(JobID jobId) throws Exception;
206
207
/**
208
* Tells the JobManager to finish the sessions (jobs) defined by the given IDs.
209
* @param jobIds The IDs that identify the sessions.
210
*/
211
public void endSessions(List<JobID> jobIds) throws Exception;
212
213
// Static Plan Methods
214
215
/**
216
* Returns the optimized execution plan of the program as a JSON string.
217
* @param compiler The optimizer to use
218
* @param prog The program to get the plan for
219
* @param parallelism The parallelism to compile the plan for
220
* @return String representation of optimized plan as JSON
221
* @throws CompilerException if compilation fails
222
* @throws ProgramInvocationException if program cannot be invoked
223
*/
224
public static String getOptimizedPlanAsJson(Optimizer compiler, PackagedProgram prog, int parallelism) throws CompilerException, ProgramInvocationException;
225
226
/**
227
* Returns the optimized execution plan of the program.
228
* @param compiler The optimizer to use
229
* @param prog The program to get the plan for
230
* @param parallelism The parallelism to compile the plan for
231
* @return FlinkPlan optimized plan
232
* @throws CompilerException if compilation fails
233
* @throws ProgramInvocationException if program cannot be invoked
234
*/
235
public static FlinkPlan getOptimizedPlan(Optimizer compiler, PackagedProgram prog, int parallelism) throws CompilerException, ProgramInvocationException;
236
237
/**
238
* Returns the optimized execution plan of the program.
239
* @param compiler The optimizer to use
240
* @param p The program plan to optimize
241
* @param parallelism The parallelism to compile the plan for
242
* @return OptimizedPlan
243
* @throws CompilerException if compilation fails
244
*/
245
public static OptimizedPlan getOptimizedPlan(Optimizer compiler, Plan p, int parallelism) throws CompilerException;
246
247
// Utility Methods
248
249
/**
250
* Creates a JobGraph from a packaged program and optimized plan.
251
* @param prog The packaged program
252
* @param optPlan The optimized plan
253
* @param savepointSettings Savepoint restore settings
254
* @return JobGraph
255
* @throws ProgramInvocationException if JobGraph creation fails
256
*/
257
public JobGraph getJobGraph(PackagedProgram prog, FlinkPlan optPlan, SavepointRestoreSettings savepointSettings) throws ProgramInvocationException;
258
259
/**
260
* Returns the ActorGateway of the current job manager leader using the LeaderRetrievalService.
261
* @return ActorGateway of the current job manager leader
262
* @throws Exception
263
*/
264
public ActorGateway getJobManagerGateway() throws Exception;
265
266
// Client Configuration Methods
267
268
/**
269
* Set the mode of this client (detached or blocking job execution).
270
* @param isDetached If true, the client will submit programs detached via the run method
271
*/
272
public void setDetached(boolean isDetached);
273
274
/**
275
* A flag to indicate whether this clients submits jobs detached.
276
* @return True if the Client submits detached, false otherwise
277
*/
278
public boolean isDetached();
279
280
// Abstract Methods (Must be implemented by subclasses)
281
282
/**
283
* Blocks until the client has determined that the cluster is ready for Job submission.
284
* This is delayed until right before job submission to report any other errors first (e.g. invalid job definitions/errors in the user jar)
285
*/
286
public abstract void waitForClusterToBeReady();
287
288
/**
289
* Returns an URL (as a string) to the JobManager web interface
290
*/
291
public abstract String getWebInterfaceURL();
292
293
/**
294
* Returns the latest cluster status, with number of Taskmanagers and slots
295
*/
296
public abstract GetClusterStatusResponse getClusterStatus();
297
298
/**
299
* Returns a string representation of the cluster.
300
*/
301
public abstract String getClusterIdentifier();
302
303
/**
304
* The client may define an upper limit on the number of slots to use
305
* @return -1 if unknown
306
*/
307
public abstract int getMaxSlots();
308
309
/**
310
* Returns true if the client already has the user jar and providing it again would result in duplicate uploading of the jar.
311
*/
312
public abstract boolean hasUserJarsInClassPath(List<URL> userJarFiles);
313
}
314
```
315
316
### StandaloneClusterClient
317
318
Client implementation for connecting to standalone Flink clusters.
319
320
```java { .api }
321
/**
322
* Client for connecting to standalone Flink clusters
323
*/
324
public class StandaloneClusterClient extends ClusterClient {
325
/**
326
* Creates a client for connecting to a standalone cluster
327
* @param config Configuration containing cluster connection details
328
*/
329
public StandaloneClusterClient(Configuration config);
330
331
/**
332
* Creates a client with high availability services
333
* @param config Configuration containing cluster connection details
334
* @param highAvailabilityServices High availability services for cluster coordination
335
*/
336
public StandaloneClusterClient(Configuration config, HighAvailabilityServices highAvailabilityServices);
337
}
338
```
339
340
**Usage Example:**
341
342
```java
343
import org.apache.flink.client.program.StandaloneClusterClient;
344
import org.apache.flink.configuration.Configuration;
345
import org.apache.flink.configuration.JobManagerOptions;
346
import org.apache.flink.runtime.jobgraph.JobID;
347
348
// Create configuration for cluster connection
349
Configuration config = new Configuration();
350
config.setString(JobManagerOptions.ADDRESS, "localhost");
351
config.setInteger(JobManagerOptions.PORT, 6123);
352
353
// Create cluster client
354
StandaloneClusterClient client = new StandaloneClusterClient(config);
355
356
// Submit a job
357
PackagedProgram program = new PackagedProgram(new File("my-job.jar"));
358
JobSubmissionResult result = client.run(program, 4);
359
JobID jobId = result.getJobID();
360
361
// Monitor job execution
362
JobExecutionResult executionResult = client.retrieveJob(jobId);
363
System.out.println("Job execution time: " + executionResult.getNetRuntime() + "ms");
364
365
// Get accumulator results
366
Map<String, Object> accumulators = client.getAccumulators(jobId);
367
System.out.println("Accumulators: " + accumulators);
368
369
// Cancel job if needed
370
client.cancel(jobId);
371
372
// Clean up
373
client.shutdown();
374
```
375
376
### Job Result Types
377
378
Result types returned by cluster operations.
379
380
```java { .api }
381
/**
382
* Result of job submission operations
383
*/
384
public class JobSubmissionResult {
385
/**
386
* Gets the unique identifier of the submitted job
387
* @return JobID of the submitted job
388
*/
389
public JobID getJobID();
390
391
/**
392
* Checks if this result contains execution details
393
* @return true if this is a JobExecutionResult with execution details
394
*/
395
public boolean isJobExecutionResult();
396
}
397
398
/**
399
* Extended result containing job execution details and performance metrics
400
*/
401
public class JobExecutionResult extends JobSubmissionResult {
402
/**
403
* Gets the net runtime of the job execution in milliseconds
404
* @return Job execution time in milliseconds
405
*/
406
public long getNetRuntime();
407
408
/**
409
* Gets all accumulator results from the job execution
410
* @return Map of accumulator names to their final values
411
*/
412
public Map<String, Object> getAllAccumulatorResults();
413
}
414
```