0
# Deployment Management
1
2
Cluster deployment abstraction supporting multiple deployment targets including standalone, YARN, Kubernetes, and other containerized environments with pluggable factory pattern and resource specification.
3
4
## Capabilities
5
6
### Cluster Descriptor Interface
7
8
Core interface for cluster descriptors that manage cluster lifecycle including deployment, retrieval, and termination.
9
10
```java { .api }
11
/**
12
* Interface for cluster descriptors that manage cluster lifecycle
13
* @param <T> Type of cluster identifier
14
*/
15
public interface ClusterDescriptor<T> extends AutoCloseable {
16
/**
17
* Retrieve an existing cluster by ID
18
* @param clusterId Cluster identifier
19
* @return Cluster client provider for the existing cluster
20
* @throws ClusterRetrieveException if cluster cannot be retrieved
21
*/
22
ClusterClientProvider<T> retrieve(T clusterId) throws ClusterRetrieveException;
23
24
/**
25
* Deploy a new session cluster
26
* @param clusterSpecification Resource specification for the cluster
27
* @return Cluster client provider for the new cluster
28
* @throws ClusterDeploymentException if deployment fails
29
*/
30
ClusterClientProvider<T> deploySessionCluster(ClusterSpecification clusterSpecification)
31
throws ClusterDeploymentException;
32
33
/**
34
* Deploy an application cluster (application mode)
35
* @param clusterSpecification Resource specification for the cluster
36
* @param applicationConfiguration Application configuration
37
* @return Cluster client provider for the application cluster
38
* @throws ClusterDeploymentException if deployment fails
39
*/
40
default ClusterClientProvider<T> deployApplicationCluster(
41
ClusterSpecification clusterSpecification,
42
ApplicationConfiguration applicationConfiguration
43
) throws ClusterDeploymentException {
44
throw new UnsupportedOperationException(
45
"Application mode is not supported by this cluster descriptor."
46
);
47
}
48
49
/**
50
* Terminate the given cluster
51
* @param clusterId Cluster identifier
52
* @throws FlinkException if termination fails
53
*/
54
void terminateCluster(T clusterId) throws FlinkException;
55
56
/**
57
* Close the cluster descriptor and release resources
58
*/
59
@Override
60
void close() throws Exception;
61
}
62
```
63
64
**Usage Examples:**
65
66
```java
67
import org.apache.flink.client.deployment.ClusterDescriptor;
68
import org.apache.flink.client.deployment.ClusterSpecification;
69
import org.apache.flink.client.deployment.StandaloneClusterDescriptor;
70
import org.apache.flink.client.deployment.StandaloneClusterId;
71
72
// Create cluster specification
73
ClusterSpecification spec = new ClusterSpecification.Builder()
74
.setMasterMemoryMB(1024)
75
.setTaskManagerMemoryMB(2048)
76
.setNumberTaskManagers(2)
77
.createClusterSpecification();
78
79
// Deploy session cluster
80
try (ClusterDescriptor<StandaloneClusterId> descriptor =
81
new StandaloneClusterDescriptor(config, highAvailabilityServices, rpcService)) {
82
83
ClusterClientProvider<StandaloneClusterId> provider =
84
descriptor.deploySessionCluster(spec);
85
86
try (ClusterClient<StandaloneClusterId> client = provider.getClusterClient()) {
87
System.out.println("Cluster deployed: " + client.getClusterId());
88
System.out.println("Web UI: " + client.getWebInterfaceURL());
89
90
// Use cluster...
91
92
} finally {
93
provider.close();
94
}
95
}
96
```
97
98
### Cluster Client Factory
99
100
Factory interface for creating cluster clients with automatic deployment target detection based on configuration.
101
102
```java { .api }
103
/**
104
* Factory for creating cluster clients
105
* @param <T> Type of cluster identifier
106
*/
107
public interface ClusterClientFactory<T> {
108
/**
109
* Check if this factory is compatible with the given configuration
110
* @param configuration Flink configuration
111
* @return true if compatible
112
*/
113
boolean isCompatibleWith(Configuration configuration);
114
115
/**
116
* Create cluster descriptor from configuration
117
* @param configuration Flink configuration
118
* @return Cluster descriptor instance
119
*/
120
ClusterDescriptor<T> createClusterDescriptor(Configuration configuration);
121
122
/**
123
* Get cluster ID from configuration
124
* @param configuration Flink configuration
125
* @return Cluster identifier or null if not specified
126
*/
127
@Nullable
128
T getClusterId(Configuration configuration);
129
130
/**
131
* Get cluster specification from configuration
132
* @param configuration Flink configuration
133
* @return Cluster specification
134
*/
135
ClusterSpecification getClusterSpecification(Configuration configuration);
136
}
137
```
138
139
### Cluster Client Service Loader
140
141
Service loader for discovering and loading cluster client factories dynamically.
142
143
```java { .api }
144
/**
145
* Service loader for cluster client factories
146
*/
147
public interface ClusterClientServiceLoader {
148
/**
149
* Get cluster client factory compatible with configuration
150
* @param configuration Flink configuration
151
* @return Compatible cluster client factory
152
* @throws UnsupportedOperationException if no compatible factory found
153
*/
154
<T> ClusterClientFactory<T> getClusterClientFactory(Configuration configuration);
155
}
156
157
/**
158
* Default implementation of cluster client service loader
159
*/
160
public class DefaultClusterClientServiceLoader implements ClusterClientServiceLoader {
161
/**
162
* Create service loader instance
163
*/
164
public DefaultClusterClientServiceLoader();
165
166
@Override
167
public <T> ClusterClientFactory<T> getClusterClientFactory(Configuration configuration);
168
}
169
```
170
171
### Standalone Deployment
172
173
Implementations for standalone cluster deployment and management.
174
175
```java { .api }
176
/**
177
* Cluster descriptor for standalone clusters
178
*/
179
public class StandaloneClusterDescriptor implements ClusterDescriptor<StandaloneClusterId> {
180
/**
181
* Create standalone cluster descriptor
182
* @param flinkConfig Flink configuration
183
* @param haServices High availability services
184
* @param rpcService RPC service
185
*/
186
public StandaloneClusterDescriptor(
187
Configuration flinkConfig,
188
HighAvailabilityServices haServices,
189
RpcService rpcService
190
);
191
192
@Override
193
public ClusterClientProvider<StandaloneClusterId> retrieve(StandaloneClusterId clusterId)
194
throws ClusterRetrieveException;
195
196
@Override
197
public ClusterClientProvider<StandaloneClusterId> deploySessionCluster(
198
ClusterSpecification clusterSpecification
199
) throws ClusterDeploymentException;
200
}
201
202
/**
203
* Factory for standalone cluster clients
204
*/
205
public class StandaloneClientFactory implements ClusterClientFactory<StandaloneClusterId> {
206
@Override
207
public boolean isCompatibleWith(Configuration configuration);
208
209
@Override
210
public ClusterDescriptor<StandaloneClusterId> createClusterDescriptor(
211
Configuration configuration
212
);
213
214
@Override
215
public StandaloneClusterId getClusterId(Configuration configuration);
216
}
217
218
/**
219
* Cluster ID for standalone clusters
220
*/
221
public class StandaloneClusterId {
222
/**
223
* Create standalone cluster ID
224
*/
225
public StandaloneClusterId();
226
227
@Override
228
public String toString();
229
230
@Override
231
public boolean equals(Object obj);
232
233
@Override
234
public int hashCode();
235
}
236
```
237
238
### Abstract Containerized Factory
239
240
Base class for containerized cluster client factories providing common functionality for Docker, Kubernetes, and other container-based deployments.
241
242
```java { .api }
243
/**
244
* Base factory for containerized cluster clients
245
* @param <ClusterID> Type of cluster identifier
246
* @param <ApplicationClusterID> Type of application cluster identifier
247
*/
248
public abstract class AbstractContainerizedClusterClientFactory<
249
ClusterID, ApplicationClusterID> implements ClusterClientFactory<ClusterID> {
250
251
/**
252
* Get deployment target for this factory
253
* @return Deployment target string
254
*/
255
protected abstract String getDeploymentTargetName();
256
257
/**
258
* Check if cluster ID is compatible
259
* @param clusterId Cluster ID to check
260
* @return true if compatible
261
*/
262
protected abstract boolean isCompatibleWith(ClusterID clusterId);
263
264
@Override
265
public boolean isCompatibleWith(Configuration configuration);
266
267
@Override
268
public ClusterSpecification getClusterSpecification(Configuration configuration);
269
}
270
```
271
272
### Cluster Specification
273
274
Resource specification for cluster deployment including memory, CPU, and scaling parameters.
275
276
```java { .api }
277
/**
278
* Specification for cluster resource requirements
279
*/
280
public class ClusterSpecification {
281
/**
282
* Get master memory in MB
283
* @return Master memory size
284
*/
285
public int getMasterMemoryMB();
286
287
/**
288
* Get task manager memory in MB
289
* @return Task manager memory size
290
*/
291
public int getTaskManagerMemoryMB();
292
293
/**
294
* Get number of task managers
295
* @return Number of task managers
296
*/
297
public int getNumberTaskManagers();
298
299
/**
300
* Get slots per task manager
301
* @return Number of slots per task manager
302
*/
303
public int getSlotsPerTaskManager();
304
305
/**
306
* Builder for creating cluster specifications
307
*/
308
public static class Builder {
309
/**
310
* Set master memory size
311
* @param masterMemoryMB Memory in MB
312
* @return Builder instance
313
*/
314
public Builder setMasterMemoryMB(int masterMemoryMB);
315
316
/**
317
* Set task manager memory size
318
* @param taskManagerMemoryMB Memory in MB
319
* @return Builder instance
320
*/
321
public Builder setTaskManagerMemoryMB(int taskManagerMemoryMB);
322
323
/**
324
* Set number of task managers
325
* @param numberTaskManagers Number of task managers
326
* @return Builder instance
327
*/
328
public Builder setNumberTaskManagers(int numberTaskManagers);
329
330
/**
331
* Set slots per task manager
332
* @param slotsPerTaskManager Number of slots
333
* @return Builder instance
334
*/
335
public Builder setSlotsPerTaskManager(int slotsPerTaskManager);
336
337
/**
338
* Create cluster specification
339
* @return Configured cluster specification
340
*/
341
public ClusterSpecification createClusterSpecification();
342
}
343
}
344
```
345
346
## Types
347
348
```java { .api }
349
/**
350
* Provider for cluster clients with resource management
351
* @param <T> Type of cluster identifier
352
*/
353
public interface ClusterClientProvider<T> extends AutoCloseable {
354
/**
355
* Get cluster client instance
356
* @return Cluster client
357
*/
358
ClusterClient<T> getClusterClient();
359
360
/**
361
* Get cluster identifier
362
* @return Cluster ID
363
*/
364
T getClusterId();
365
366
/**
367
* Check if cluster is in per-job mode
368
* @return true if per-job mode
369
*/
370
default boolean isPerJobMode() {
371
return false;
372
}
373
374
@Override
375
void close() throws Exception;
376
}
377
```
378
379
## Exception Handling
380
381
Deployment operations handle various error conditions:
382
383
- **Deployment Errors**: `ClusterDeploymentException` for failed cluster deployments
384
- **Retrieval Errors**: `ClusterRetrieveException` for failed cluster retrieval
385
- **Configuration Errors**: Invalid cluster specifications or missing configuration
386
- **Resource Errors**: Insufficient resources for cluster deployment
387
- **Network Errors**: Communication failures with cluster management systems
388
389
**Error Handling Examples:**
390
391
```java
392
try {
393
ClusterClientProvider<StandaloneClusterId> provider =
394
descriptor.deploySessionCluster(spec);
395
396
// Use cluster...
397
398
} catch (ClusterDeploymentException e) {
399
System.err.println("Failed to deploy cluster: " + e.getMessage());
400
// Handle deployment failure
401
} catch (ClusterRetrieveException e) {
402
System.err.println("Failed to retrieve cluster: " + e.getMessage());
403
// Handle retrieval failure
404
} finally {
405
descriptor.close();
406
}
407
```
408
409
The deployment management system provides a pluggable architecture that allows Flink to support multiple deployment targets through a consistent interface, enabling seamless switching between different cluster types based on configuration.