# Cluster Management

Cluster deployment and management functionality for various deployment targets including standalone, containerized, and cloud environments. Provides abstractions for deploying, retrieving, and managing Flink clusters.

## Capabilities

### Cluster Client Interface

Main interface for interacting with Flink clusters, providing job management and cluster operations.

```java { .api }
/**
 * Main interface for interacting with Flink clusters
 * @param <T> Type of cluster identifier
 */
public interface ClusterClient<T> extends AutoCloseable {
    /**
     * Returns the cluster identifier
     * @return Cluster ID of generic type T
     */
    T getClusterId();

    /**
     * Returns the Flink configuration for this cluster
     * @return Configuration instance
     */
    Configuration getFlinkConfiguration();

    /**
     * Shuts down the cluster
     */
    void shutDownCluster();

    /**
     * Returns the web interface URL for the cluster
     * @return URL string for web interface, may be null
     */
    String getWebInterfaceURL();

    /**
     * Lists all jobs running on the cluster
     * @return CompletableFuture containing collection of job status messages
     * @throws Exception if listing fails
     */
    CompletableFuture<Collection<JobStatusMessage>> listJobs() throws Exception;

    /**
     * Submits a job to the cluster
     * @param jobGraph JobGraph to submit
     * @return CompletableFuture containing assigned JobID
     */
    CompletableFuture<JobID> submitJob(JobGraph jobGraph);

    /**
     * Gets the status of a specific job
     * @param jobId ID of the job to check
     * @return CompletableFuture containing job status
     */
    CompletableFuture<JobStatus> getJobStatus(JobID jobId);

    /**
     * Requests the result of a job execution
     * @param jobId ID of the job
     * @return CompletableFuture containing job result
     */
    CompletableFuture<JobResult> requestJobResult(JobID jobId);

    /**
     * Gets job accumulators with default classloader
     * @param jobID ID of the job
     * @return CompletableFuture containing accumulator map
     */
    CompletableFuture<Map<String, Object>> getAccumulators(JobID jobID);

    /**
     * Gets job accumulators with specific classloader
     * @param jobID ID of the job
     * @param loader ClassLoader for deserialization
     * @return CompletableFuture containing accumulator map
     */
    CompletableFuture<Map<String, Object>> getAccumulators(JobID jobID, ClassLoader loader);

    /**
     * Cancels a running job
     * @param jobId ID of the job to cancel
     * @return CompletableFuture containing acknowledgment
     */
    CompletableFuture<Acknowledge> cancel(JobID jobId);

    /**
     * Cancels a job with savepoint
     * @param jobId ID of the job to cancel
     * @param savepointDirectory Directory to store the savepoint (nullable - uses default if null)
     * @return CompletableFuture containing savepoint path
     */
    CompletableFuture<String> cancelWithSavepoint(JobID jobId, @Nullable String savepointDirectory);

    /**
     * Stops a job with savepoint
     * @param jobId ID of the job to stop
     * @param advanceToEndOfEventTime Whether to advance to end of event time
     * @param savepointDirectory Directory to store the savepoint (nullable - uses default if null)
     * @return CompletableFuture containing savepoint path
     */
    CompletableFuture<String> stopWithSavepoint(
        JobID jobId,
        boolean advanceToEndOfEventTime,
        @Nullable String savepointDirectory);

    /**
     * Triggers a savepoint for a running job
     * @param jobId ID of the job
     * @param savepointDirectory Directory to store the savepoint (nullable - uses default if null)
     * @return CompletableFuture containing savepoint path
     */
    CompletableFuture<String> triggerSavepoint(JobID jobId, @Nullable String savepointDirectory);

    /**
     * Disposes a savepoint
     * @param savepointPath Path to the savepoint to dispose
     * @return CompletableFuture containing acknowledgment
     * @throws FlinkException if disposal fails
     */
    CompletableFuture<Acknowledge> disposeSavepoint(String savepointPath) throws FlinkException;

    /**
     * Sends a coordination request to an operator
     * @param jobId ID of the job containing the operator
     * @param operatorId ID of the operator
     * @param request Coordination request to send
     * @return CompletableFuture containing coordination response
     */
    CompletableFuture<CoordinationResponse> sendCoordinationRequest(
        JobID jobId,
        OperatorID operatorId,
        CoordinationRequest request);
}
```

**Usage Example:**

```java
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.deployment.StandaloneClusterDescriptor;
import org.apache.flink.client.deployment.StandaloneClusterId;

// Connect to standalone cluster
StandaloneClusterDescriptor descriptor = new StandaloneClusterDescriptor(config);
try (ClusterClient<StandaloneClusterId> client =
        descriptor.retrieve(new StandaloneClusterId()).getClusterClient()) {

    // Submit job
    JobID jobId = client.submitJob(jobGraph).get();

    // Monitor job status
    JobStatus status = client.getJobStatus(jobId).get();
    System.out.println("Job status: " + status);

    // Trigger savepoint
    String savepointPath = client.triggerSavepoint(jobId, "/path/to/savepoints").get();
    System.out.println("Savepoint created at: " + savepointPath);
}
```

### Cluster Descriptor Interface

Interface for cluster deployment and management operations.

```java { .api }
/**
 * Descriptor for cluster deployment and management
 * @param <T> Type of cluster identifier
 */
public interface ClusterDescriptor<T> extends AutoCloseable {
    /**
     * Returns a description of the cluster
     * @return String description of cluster capabilities
     */
    String getClusterDescription();

    /**
     * Retrieves an existing cluster
     * @param clusterId Identifier of the cluster to retrieve
     * @return ClusterClientProvider for the cluster
     * @throws ClusterRetrieveException if cluster cannot be retrieved
     */
    ClusterClientProvider<T> retrieve(T clusterId) throws ClusterRetrieveException;

    /**
     * Deploys a new session cluster
     * @param clusterSpecification Resource specification for the cluster
     * @return ClusterClientProvider for the deployed cluster
     * @throws ClusterDeploymentException if deployment fails
     */
    ClusterClientProvider<T> deploySessionCluster(ClusterSpecification clusterSpecification)
        throws ClusterDeploymentException;

    /**
     * Deploys an application cluster
     * @param clusterSpecification Resource specification for the cluster
     * @param applicationConfiguration Application-specific configuration
     * @return ClusterClientProvider for the deployed cluster
     * @throws ClusterDeploymentException if deployment fails
     */
    ClusterClientProvider<T> deployApplicationCluster(
        ClusterSpecification clusterSpecification,
        ApplicationConfiguration applicationConfiguration)
        throws ClusterDeploymentException;

    /**
     * Deploys a job-specific cluster
     * @param clusterSpecification Resource specification for the cluster
     * @param jobGraph Job graph to deploy
     * @param detached Whether to run in detached mode
     * @return ClusterClientProvider for the deployed cluster
     * @throws ClusterDeploymentException if deployment fails
     */
    ClusterClientProvider<T> deployJobCluster(
        ClusterSpecification clusterSpecification,
        JobGraph jobGraph,
        boolean detached) throws ClusterDeploymentException;

    /**
     * Kills an existing cluster
     * @param clusterId Identifier of the cluster to kill
     * @throws FlinkException if killing fails
     */
    void killCluster(T clusterId) throws FlinkException;
}
```

### Cluster Client Factory

Factory interface for creating cluster clients and descriptors.

```java { .api }
/**
 * Factory for creating cluster clients and descriptors
 * @param <ClusterID> Type of cluster identifier
 */
public interface ClusterClientFactory<ClusterID> {
    /**
     * Checks if this factory is compatible with the given configuration
     * @param configuration Flink configuration to check
     * @return true if compatible, false otherwise
     */
    boolean isCompatibleWith(Configuration configuration);

    /**
     * Creates a cluster descriptor for the given configuration
     * @param configuration Flink configuration
     * @return ClusterDescriptor instance
     */
    ClusterDescriptor<ClusterID> createClusterDescriptor(Configuration configuration);

    /**
     * Extracts cluster ID from configuration
     * @param configuration Flink configuration
     * @return Cluster ID instance
     */
    ClusterID getClusterId(Configuration configuration);

    /**
     * Creates cluster specification from configuration
     * @param configuration Flink configuration
     * @return ClusterSpecification instance
     */
    ClusterSpecification getClusterSpecification(Configuration configuration);
}
```

### Cluster Client Provider

Provider interface for accessing cluster clients.

```java { .api }
/**
 * Provider for cluster clients
 * @param <T> Type of cluster identifier
 */
public interface ClusterClientProvider<T> {
    /**
     * Gets the cluster client instance
     * @return ClusterClient instance for this cluster
     */
    ClusterClient<T> getClusterClient();

    /**
     * Closes the provider and releases resources
     */
    void close();
}
```

### Cluster Specification

Specification for cluster resources and configuration.

```java { .api }
/**
 * Specification for cluster resources
 */
public class ClusterSpecification {
    /**
     * Gets the master/JobManager memory in MB
     * @return Memory in MB
     */
    public int getMasterMemoryMB();

    /**
     * Gets the TaskManager memory in MB
     * @return Memory in MB
     */
    public int getTaskManagerMemoryMB();

    /**
     * Gets the number of slots per TaskManager
     * @return Number of slots
     */
    public int getSlotsPerTaskManager();

    /**
     * Builder for creating ClusterSpecification instances
     */
    public static class ClusterSpecificationBuilder {
        /**
         * Sets master memory in MB
         * @param masterMemoryMB Memory in MB
         * @return This builder instance
         */
        public ClusterSpecificationBuilder setMasterMemoryMB(int masterMemoryMB);

        /**
         * Sets TaskManager memory in MB
         * @param taskManagerMemoryMB Memory in MB
         * @return This builder instance
         */
        public ClusterSpecificationBuilder setTaskManagerMemoryMB(int taskManagerMemoryMB);

        /**
         * Sets slots per TaskManager
         * @param slotsPerTaskManager Number of slots
         * @return This builder instance
         */
        public ClusterSpecificationBuilder setSlotsPerTaskManager(int slotsPerTaskManager);

        /**
         * Creates the ClusterSpecification
         * @return ClusterSpecification instance
         */
        public ClusterSpecification createClusterSpecification();
    }
}
```

**Usage Example:**

```java
import org.apache.flink.client.deployment.ClusterSpecification;

// Create cluster specification
ClusterSpecification spec = new ClusterSpecification.ClusterSpecificationBuilder()
    .setMasterMemoryMB(1024)
    .setTaskManagerMemoryMB(2048)
    .setSlotsPerTaskManager(4)
    .createClusterSpecification();

// Deploy session cluster
ClusterDescriptor<StandaloneClusterId> descriptor =
    new StandaloneClusterDescriptor(config);
ClusterClientProvider<StandaloneClusterId> provider =
    descriptor.deploySessionCluster(spec);
```

### Standalone Cluster Support

Implementations for standalone cluster deployment and management.

```java { .api }
/**
 * Factory for standalone cluster clients
 */
public class StandaloneClientFactory implements ClusterClientFactory<StandaloneClusterId> {
    @Override
    public boolean isCompatibleWith(Configuration configuration);

    @Override
    public ClusterDescriptor<StandaloneClusterId> createClusterDescriptor(Configuration configuration);

    @Override
    public StandaloneClusterId getClusterId(Configuration configuration);

    @Override
    public ClusterSpecification getClusterSpecification(Configuration configuration);
}

/**
 * Descriptor for standalone clusters
 */
public class StandaloneClusterDescriptor implements ClusterDescriptor<StandaloneClusterId> {
    public StandaloneClusterDescriptor(Configuration flinkConfiguration);

    // Implements all ClusterDescriptor methods
}

/**
 * Identifier for standalone clusters
 */
public class StandaloneClusterId {
    public StandaloneClusterId();
}
```

### Mini Cluster Support

Client implementation for mini clusters used in testing and local development.

```java { .api }
/**
 * Client for mini clusters (testing/local development)
 */
public class MiniClusterClient implements ClusterClient<MiniClusterClient.MiniClusterId> {

    /**
     * Identifier for mini clusters
     */
    public static class MiniClusterId {
        // Mini cluster identifier implementation
    }

    // Implements all ClusterClient methods for mini cluster
}
```

## Types

```java { .api }
public interface ClusterClientServiceLoader {
    Stream<ClusterClientFactory<ClusterID>> getClusterClientFactories();
}

public class DefaultClusterClientServiceLoader implements ClusterClientServiceLoader {
    @Override
    public Stream<ClusterClientFactory<ClusterID>> getClusterClientFactories();
}

public class JobStatusMessage {
    public JobID getJobId();
    public String getJobName();
    public JobStatus getJobStatus();
    public long getStartTime();
}

public interface Acknowledge {
    // Acknowledgment marker interface
}

public interface CoordinationRequest {
    // Base interface for coordination requests
}

public interface CoordinationResponse {
    // Base interface for coordination responses
}

public class OperatorID {
    public static OperatorID generate();
    public static OperatorID fromHexString(String hexString);
}

// Exception Types
public class ClusterDeploymentException extends FlinkException {
    public ClusterDeploymentException(String message);
    public ClusterDeploymentException(String message, Throwable cause);
}

public class ClusterRetrieveException extends FlinkException {
    public ClusterRetrieveException(String message);
    public ClusterRetrieveException(String message, Throwable cause);
}

public abstract class AbstractContainerizedClusterClientFactory<ClusterID>
    implements ClusterClientFactory<ClusterID> {
    // Base class for containerized cluster client factories
}

public class ClusterClientJobClientAdapter<ClusterID> implements JobClient {
    public ClusterClientJobClientAdapter(
        ClusterClientProvider<ClusterID> clusterClientProvider,
        JobID jobId,
        ClassLoader userCodeClassloader);

    // Implements JobClient interface by adapting ClusterClient calls
}
```