0
# Cluster Management
1
2
The Apache Flink Cluster Management module (`org.apache.flink.client.deployment.*`) provides deployment and cluster-interaction services for multiple deployment targets through pluggable factories. It handles cluster lifecycle operations (deployment, retrieval, and termination) across environments such as standalone, YARN, and Kubernetes.
3
4
## Core Deployment Interfaces
5
6
### ClusterClientFactory { .api }
7
8
Factory interface for creating cluster-specific clients and descriptors.
9
10
```java
11
public interface ClusterClientFactory<ClusterID> {
12
// Compatibility and configuration
13
boolean isCompatibleWith(Configuration configuration);
14
ClusterID getClusterId(Configuration configuration);
15
ClusterSpecification getClusterSpecification(Configuration configuration);
16
17
// Cluster descriptor creation
18
ClusterDescriptor<ClusterID> createClusterDescriptor(Configuration configuration);
19
}
20
```
21
22
### ClusterDescriptor { .api }
23
24
Interface for describing and managing cluster operations including deployment and retrieval.
25
26
```java
27
public interface ClusterDescriptor<T> extends AutoCloseable {
28
// Cluster information
29
String getClusterDescription();
30
31
// Cluster operations
32
ClusterClientProvider<T> retrieve(T clusterId);
33
ClusterClientProvider<T> deploySessionCluster(ClusterSpecification clusterSpecification);
34
ClusterClientProvider<T> deployApplicationCluster(ClusterSpecification clusterSpecification,
35
ApplicationConfiguration applicationConfiguration);
36
ClusterClientProvider<T> deployJobCluster(ClusterSpecification clusterSpecification,
37
JobGraph jobGraph,
38
boolean detached);
39
40
// Cluster management
41
void killCluster(T clusterId);
42
void close();
43
}
44
```
45
46
### ClusterClientServiceLoader { .api }
47
48
Service loader interface for discovering and loading cluster client factories.
49
50
```java
51
public interface ClusterClientServiceLoader {
52
<ClusterID> ClusterClientFactory<ClusterID> getClusterClientFactory(Configuration configuration);
53
}
54
```
55
56
## Implementation Classes
57
58
### DefaultClusterClientServiceLoader { .api }
59
60
Default implementation of the cluster client service loader using Java ServiceLoader mechanism.
61
62
```java
63
public class DefaultClusterClientServiceLoader implements ClusterClientServiceLoader {
64
// Service loading implementation
65
public <ClusterID> ClusterClientFactory<ClusterID> getClusterClientFactory(Configuration configuration) { }
66
}
67
```
68
69
### StandaloneClientFactory { .api }
70
71
Factory implementation for standalone Flink clusters.
72
73
```java
74
public class StandaloneClientFactory implements ClusterClientFactory<StandaloneClusterId> {
75
// ClusterClientFactory interface implementations
76
public boolean isCompatibleWith(Configuration configuration) { }
77
public ClusterDescriptor<StandaloneClusterId> createClusterDescriptor(Configuration configuration) { }
78
public StandaloneClusterId getClusterId(Configuration configuration) { }
79
public ClusterSpecification getClusterSpecification(Configuration configuration) { }
80
}
81
```
82
83
### StandaloneClusterDescriptor { .api }
84
85
Cluster descriptor implementation for standalone Flink clusters.
86
87
```java
88
public class StandaloneClusterDescriptor implements ClusterDescriptor<StandaloneClusterId> {
89
// ClusterDescriptor interface implementations
90
public String getClusterDescription() { }
91
public ClusterClientProvider<StandaloneClusterId> retrieve(StandaloneClusterId clusterId) { }
92
public ClusterClientProvider<StandaloneClusterId> deploySessionCluster(ClusterSpecification clusterSpecification) { }
93
public ClusterClientProvider<StandaloneClusterId> deployApplicationCluster(ClusterSpecification clusterSpecification,
94
ApplicationConfiguration applicationConfiguration) { }
95
public ClusterClientProvider<StandaloneClusterId> deployJobCluster(ClusterSpecification clusterSpecification,
96
JobGraph jobGraph,
97
boolean detached) { }
98
public void killCluster(StandaloneClusterId clusterId) { }
99
public void close() { }
100
}
101
```
102
103
### AbstractContainerizedClusterClientFactory { .api }
104
105
Abstract base class for containerized cluster client factories (YARN, Kubernetes, etc.).
106
107
```java
108
public abstract class AbstractContainerizedClusterClientFactory<ClusterID> implements ClusterClientFactory<ClusterID> {
109
// Core factory methods
110
public boolean isCompatibleWith(Configuration configuration) { }
111
public ClusterDescriptor<ClusterID> createClusterDescriptor(Configuration configuration) { }
112
public ClusterID getClusterId(Configuration configuration) { }
113
public ClusterSpecification getClusterSpecification(Configuration configuration) { }
114
115
// Executor factory methods
116
protected abstract PipelineExecutorFactory getExecutorFactory();
117
protected abstract ClusterID getClusterIdFromConfiguration(Configuration configuration);
118
protected abstract ClusterDescriptor<ClusterID> createClusterDescriptor(Configuration configuration, String configurationDirectory);
119
}
120
```
121
122
## Configuration Classes
123
124
### ClusterSpecification { .api }
125
126
Configuration class defining cluster resource specifications.
127
128
```java
129
public class ClusterSpecification {
130
// Constructor
131
public ClusterSpecification(int masterMemoryMB, int taskManagerMemoryMB, int slotsPerTaskManager) { }
132
133
// Resource access methods
134
public int getMasterMemoryMB() { }
135
public int getTaskManagerMemoryMB() { }
136
public int getSlotsPerTaskManager() { }
137
138
// Utility methods
139
public String toString() { }
140
}
141
```
142
143
### StandaloneClusterId { .api }
144
145
Identifier class for standalone clusters.
146
147
```java
148
public class StandaloneClusterId {
149
// Cluster identification for standalone deployments
150
}
151
```
152
153
## Adapter Classes
154
155
### ClusterClientJobClientAdapter { .api }
156
157
Adapter that bridges cluster clients to job clients, providing job-specific operations.
158
159
```java
160
public class ClusterClientJobClientAdapter<ClusterID> implements JobClient, CoordinationRequestGateway {
161
// Constructor
162
public ClusterClientJobClientAdapter(ClusterClientProvider<ClusterID> clusterClientProvider,
163
JobID jobId,
164
ClassLoader userCodeClassloader) { }
165
166
// JobClient interface implementations
167
public JobID getJobId() { }
168
public CompletableFuture<JobStatus> getJobStatus() { }
169
public CompletableFuture<Void> cancel() { }
170
public CompletableFuture<String> stopWithSavepoint(boolean advanceToEndOfEventTime, String savepointDirectory) { }
171
public CompletableFuture<String> triggerSavepoint(String savepointDirectory) { }
172
public CompletableFuture<Map<String, Object>> getAccumulators() { }
173
public CompletableFuture<JobExecutionResult> getJobExecutionResult() { }
174
175
// CoordinationRequestGateway interface implementations
176
public CompletableFuture<CoordinationResponse> sendCoordinationRequest(OperatorID operatorId, CoordinationRequest request) { }
177
}
178
```
179
180
## Pipeline Executors
181
182
### AbstractJobClusterExecutor { .api }
183
184
Abstract base class for job cluster pipeline executors.
185
186
```java
187
public abstract class AbstractJobClusterExecutor<ClusterID, ClientFactory extends ClusterClientFactory<ClusterID>>
188
implements PipelineExecutor {
189
// Common job cluster execution functionality
190
public CompletableFuture<JobClient> execute(Pipeline pipeline, Configuration configuration, ClassLoader userClassloader) { }
191
}
192
```
193
194
### AbstractSessionClusterExecutor { .api }
195
196
Abstract base class for session cluster pipeline executors.
197
198
```java
199
public abstract class AbstractSessionClusterExecutor<ClusterID, ClientFactory extends ClusterClientFactory<ClusterID>>
200
implements PipelineExecutor {
201
// Common session cluster execution functionality
202
public CompletableFuture<JobClient> execute(Pipeline pipeline, Configuration configuration, ClassLoader userClassloader) { }
203
}
204
```
205
206
### LocalExecutor { .api }
207
208
Pipeline executor for local execution environment.
209
210
```java
211
public class LocalExecutor implements PipelineExecutor {
212
// Constants
213
public static final String NAME = "local";
214
215
// PipelineExecutor interface implementation
216
public CompletableFuture<JobClient> execute(Pipeline pipeline, Configuration configuration, ClassLoader userClassloader) { }
217
}
218
```
219
220
### RemoteExecutor { .api }
221
222
Pipeline executor for remote cluster execution.
223
224
```java
225
public class RemoteExecutor extends AbstractSessionClusterExecutor<StandaloneClusterId, StandaloneClientFactory> {
226
// Constants
227
public static final String NAME = "remote";
228
229
// Remote execution implementation
230
}
231
```
232
233
## Executor Factories
234
235
### LocalExecutorFactory { .api }
236
237
Factory for creating local pipeline executors.
238
239
```java
240
public class LocalExecutorFactory implements PipelineExecutorFactory {
241
// PipelineExecutorFactory interface implementation
242
public String getName() { }
243
public boolean isCompatibleWith(Configuration configuration) { }
244
public PipelineExecutor getExecutor(Configuration configuration) { }
245
}
246
```
247
248
### RemoteExecutorFactory { .api }
249
250
Factory for creating remote pipeline executors.
251
252
```java
253
public class RemoteExecutorFactory implements PipelineExecutorFactory {
254
// PipelineExecutorFactory interface implementation
255
public String getName() { }
256
public boolean isCompatibleWith(Configuration configuration) { }
257
public PipelineExecutor getExecutor(Configuration configuration) { }
258
}
259
```
260
261
## Executor Utilities
262
263
### PipelineExecutorUtils { .api }
264
265
Utility class for pipeline executor operations.
266
267
```java
268
public class PipelineExecutorUtils {
269
// Static utility methods
270
public static CompletableFuture<JobClient> getJobClient(Pipeline pipeline,
271
Configuration configuration,
272
PipelineExecutor executor,
273
ClassLoader userCodeClassLoader) { }
274
}
275
```
276
277
## Exception Classes
278
279
### ClusterDeploymentException { .api }
280
281
Exception thrown during cluster deployment operations.
282
283
```java
284
public class ClusterDeploymentException extends FlinkException {
285
// Constructors
286
public ClusterDeploymentException(String message) { }
287
public ClusterDeploymentException(String message, Throwable cause) { }
288
public ClusterDeploymentException(Throwable cause) { }
289
}
290
```
291
292
### ClusterRetrieveException { .api }
293
294
Exception thrown when retrieving or connecting to existing clusters.
295
296
```java
297
public class ClusterRetrieveException extends FlinkException {
298
// Constructors
299
public ClusterRetrieveException(String message) { }
300
public ClusterRetrieveException(String message, Throwable cause) { }
301
public ClusterRetrieveException(Throwable cause) { }
302
}
303
```
304
305
## Usage Examples
306
307
### Basic Cluster Client Usage
308
309
```java
310
// Load cluster client factory
311
Configuration config = new Configuration();
312
config.setString("execution.target", "remote");
313
314
ClusterClientServiceLoader serviceLoader = new DefaultClusterClientServiceLoader();
315
ClusterClientFactory<StandaloneClusterId> factory = serviceLoader.getClusterClientFactory(config);
316
317
// Create cluster descriptor and retrieve client
318
try (ClusterDescriptor<StandaloneClusterId> descriptor = factory.createClusterDescriptor(config)) {
319
StandaloneClusterId clusterId = factory.getClusterId(config);
320
try (ClusterClient<StandaloneClusterId> client = descriptor.retrieve(clusterId).getClusterClient()) {
321
// Use cluster client for operations
322
CompletableFuture<Collection<JobStatusMessage>> jobs = client.listJobs();
323
}
324
}
325
```
326
327
### Session Cluster Deployment
328
329
```java
330
// Define cluster specification
331
ClusterSpecification spec = new ClusterSpecification(1024, 2048, 4);
332
333
// Deploy session cluster
334
Configuration config = new Configuration();
335
ClusterClientServiceLoader serviceLoader = new DefaultClusterClientServiceLoader();
336
ClusterClientFactory<StandaloneClusterId> factory = serviceLoader.getClusterClientFactory(config);
337
338
try (ClusterDescriptor<StandaloneClusterId> descriptor = factory.createClusterDescriptor(config)) {
339
ClusterClientProvider<StandaloneClusterId> provider = descriptor.deploySessionCluster(spec);
340
try (ClusterClient<StandaloneClusterId> client = provider.getClusterClient()) {
341
// Session cluster is now available for job submission
342
System.out.println("Cluster deployed at: " + client.getWebInterfaceURL());
343
}
344
}
345
```
346
347
### Application Cluster Deployment
348
349
```java
350
// Application configuration
351
ApplicationConfiguration appConfig = new ApplicationConfiguration(
352
new String[]{"--input", "/data/input", "--output", "/data/output"},
353
"com.example.MyFlinkApp"
354
);
355
356
// Cluster specification
357
ClusterSpecification spec = new ClusterSpecification(1024, 2048, 4);
358
359
// Deploy application cluster (assumes `config` and `factory` were created as in the previous examples)
360
try (ClusterDescriptor<StandaloneClusterId> descriptor = factory.createClusterDescriptor(config)) {
361
ClusterClientProvider<StandaloneClusterId> provider =
362
descriptor.deployApplicationCluster(spec, appConfig);
363
364
try (ClusterClient<StandaloneClusterId> client = provider.getClusterClient()) {
365
// Application cluster is deployed and running; `jobId` is the JobID of the application's job (e.g. obtained via client.listJobs())
366
CompletableFuture<JobResult> result = client.requestJobResult(jobId);
367
}
368
}
369
```
370
371
### Pipeline Executor Usage
372
373
```java
374
// Get pipeline executor
375
Configuration config = new Configuration();
376
config.setString("execution.target", "local");
377
378
PipelineExecutorServiceLoader executorLoader =
379
PipelineExecutorServiceLoader.fromConfiguration(config);
380
PipelineExecutor executor = executorLoader.getExecutor(config);
381
382
// Execute pipeline
383
Pipeline pipeline = /* your Flink program pipeline */;
384
CompletableFuture<JobClient> jobClientFuture =
385
PipelineExecutorUtils.getJobClient(pipeline, config, executor, userClassLoader);
386
387
JobClient jobClient = jobClientFuture.get();
388
```
389
390
## Application Deployment Classes
391
392
### ApplicationConfiguration { .api }
393
394
Configuration class for application cluster deployments containing program arguments and entry point information.
395
396
```java
397
public class ApplicationConfiguration {
398
// Constructors
399
public ApplicationConfiguration(String[] programArguments, String entryPointClassName) { }
400
public ApplicationConfiguration(String[] programArguments,
401
String entryPointClassName,
402
SavepointRestoreSettings savepointRestoreSettings) { }
403
404
// Property access methods
405
public String[] getProgramArguments() { }
406
public String getEntryPointClassName() { }
407
public SavepointRestoreSettings getSavepointRestoreSettings() { }
408
409
// Configuration validation
410
public void validate() { }
411
public boolean hasValidEntryPoint() { }
412
}
413
```
414
415
### ApplicationClusterEntryPoint { .api }
416
417
Entry point class for application cluster mode that manages the cluster lifecycle and application execution.
418
419
```java
420
public class ApplicationClusterEntryPoint extends ClusterEntrypoint {
421
// Main entry point
422
public static void main(String[] args) { }
423
424
// Cluster initialization methods
425
protected ArchivedExecutionGraphStore createSerializableExecutionGraphStore(Configuration configuration,
426
ScheduledExecutor scheduledExecutor) { }
427
428
protected DispatcherResourceManagerComponentFactory createDispatcherResourceManagerComponentFactory(Configuration configuration) { }
429
430
// Application-specific setup
431
protected void initializeServices(Configuration configuration) { }
432
protected ApplicationRunner createApplicationRunner() { }
433
}
434
```
435
436
### ApplicationRunner { .api }
437
438
Interface for running applications within application clusters.
439
440
```java
441
public interface ApplicationRunner {
442
// Application execution
443
CompletableFuture<Void> run(DispatcherGateway dispatcherGateway, ScheduledExecutor scheduledExecutor);
444
445
// Lifecycle management
446
void cancel();
447
boolean isCancelled();
448
}
449
```
450
451
### DetachedApplicationRunner { .api }
452
453
Implementation of ApplicationRunner for detached application execution.
454
455
```java
456
public class DetachedApplicationRunner implements ApplicationRunner {
457
// Constructor
458
public DetachedApplicationRunner(boolean enforceSingleJobExecution,
459
PackagedProgram packagedProgram,
460
Configuration configuration) { }
461
462
// ApplicationRunner interface implementation
463
public CompletableFuture<Void> run(DispatcherGateway dispatcherGateway, ScheduledExecutor scheduledExecutor) { }
464
public void cancel() { }
465
public boolean isCancelled() { }
466
467
// Program execution
468
private CompletableFuture<Void> runApplicationEntryPoint(DispatcherGateway dispatcherGateway,
469
ScheduledExecutor scheduledExecutor) { }
470
}
471
```
472
473
### EmbeddedJobClient { .api }
474
475
Job client implementation for embedded execution within application clusters.
476
477
```java
478
public class EmbeddedJobClient implements JobClient, CoordinationRequestGateway {
479
// Constructor
480
public EmbeddedJobClient(JobID jobId,
481
DispatcherGateway dispatcherGateway,
482
ClassLoader userClassloader,
483
ScheduledExecutor scheduledExecutorService) { }
484
485
// JobClient interface implementation
486
public JobID getJobId() { }
487
public CompletableFuture<JobStatus> getJobStatus() { }
488
public CompletableFuture<Void> cancel() { }
489
public CompletableFuture<String> stopWithSavepoint(boolean advanceToEndOfEventTime, String savepointDirectory) { }
490
public CompletableFuture<String> triggerSavepoint(String savepointDirectory) { }
491
public CompletableFuture<Map<String, Object>> getAccumulators() { }
492
public CompletableFuture<JobExecutionResult> getJobExecutionResult() { }
493
494
// CoordinationRequestGateway interface implementation
495
public CompletableFuture<CoordinationResponse> sendCoordinationRequest(OperatorID operatorId,
496
CoordinationRequest request) { }
497
}
498
```
499
500
### WebSubmissionJobClient { .api }
501
502
Job client implementation for web-based job submissions.
503
504
```java
505
public class WebSubmissionJobClient implements JobClient {
506
// Constructor
507
public WebSubmissionJobClient(JobID jobId,
508
String restAddress,
509
int restPort,
510
Configuration configuration) { }
511
512
// JobClient interface implementation
513
public JobID getJobId() { }
514
public CompletableFuture<JobStatus> getJobStatus() { }
515
public CompletableFuture<Void> cancel() { }
516
public CompletableFuture<String> stopWithSavepoint(boolean advanceToEndOfEventTime, String savepointDirectory) { }
517
public CompletableFuture<String> triggerSavepoint(String savepointDirectory) { }
518
public CompletableFuture<Map<String, Object>> getAccumulators() { }
519
public CompletableFuture<JobExecutionResult> getJobExecutionResult() { }
520
521
// Web-specific operations
522
public CompletableFuture<String> getWebInterfaceURL() { }
523
public void close() { }
524
}
525
```
526
527
## Required Imports
528
529
```java
530
import org.apache.flink.client.deployment.ClusterClientFactory;
531
import org.apache.flink.client.deployment.ClusterDescriptor;
532
import org.apache.flink.client.deployment.ClusterClientServiceLoader;
533
import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
534
import org.apache.flink.client.deployment.StandaloneClientFactory;
535
import org.apache.flink.client.deployment.StandaloneClusterDescriptor;
536
import org.apache.flink.client.deployment.AbstractContainerizedClusterClientFactory;
537
import org.apache.flink.client.deployment.ClusterSpecification;
538
import org.apache.flink.client.deployment.StandaloneClusterId;
539
import org.apache.flink.client.deployment.ClusterClientJobClientAdapter;
540
import org.apache.flink.client.deployment.executors.AbstractJobClusterExecutor;
541
import org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor;
542
import org.apache.flink.client.deployment.executors.LocalExecutor;
543
import org.apache.flink.client.deployment.executors.RemoteExecutor;
544
import org.apache.flink.client.deployment.executors.LocalExecutorFactory;
545
import org.apache.flink.client.deployment.executors.RemoteExecutorFactory;
546
import org.apache.flink.client.deployment.executors.PipelineExecutorUtils;
547
import org.apache.flink.client.deployment.ClusterDeploymentException;
548
import org.apache.flink.client.deployment.ClusterRetrieveException;
549
import org.apache.flink.client.deployment.application.ApplicationConfiguration;
550
import org.apache.flink.client.deployment.application.ApplicationClusterEntryPoint;
551
import org.apache.flink.client.deployment.application.ApplicationRunner;
552
import org.apache.flink.client.deployment.application.DetachedApplicationRunner;
553
import org.apache.flink.client.deployment.application.EmbeddedJobClient;
554
import org.apache.flink.client.deployment.application.WebSubmissionJobClient;
555
import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
556
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
557
import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
558
import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore;
559
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
560
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
561
import org.apache.flink.runtime.operators.coordination.CoordinationRequestGateway;
562
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
563
import org.apache.flink.core.execution.PipelineExecutorFactory;
564
import org.apache.flink.client.program.ClusterClientProvider;
565
import org.apache.flink.core.execution.PipelineExecutor;
566
import org.apache.flink.client.program.ClusterClient;
567
import org.apache.flink.core.execution.PipelineExecutorServiceLoader;
568
import org.apache.flink.api.dag.Pipeline;
569
import org.apache.flink.api.common.JobID;
570
import org.apache.flink.runtime.jobgraph.JobGraph;
571
import org.apache.flink.runtime.jobmaster.JobResult;
572
import org.apache.flink.core.execution.JobClient;
573
import org.apache.flink.configuration.Configuration;
574
import org.apache.flink.util.FlinkException;
575
import java.util.concurrent.CompletableFuture;
576
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
577
import java.util.Collection;
578
import java.util.Map;
579
```