# Cluster Client Management

Core interface for programmatic interaction with Flink clusters: job submission, status monitoring, savepoint and checkpoint management, and cluster lifecycle control. It spans different deployment targets, including standalone clusters reached through the REST API and in-process mini clusters.

## Capabilities

### Cluster Client Interface

Main interface for communicating with Flink clusters, providing job-level and cluster-level management operations.

```java { .api }
11
/**
12
* Interface for cluster clients that communicate with Flink clusters
13
* @param <T> Type of cluster identifier
14
*/
15
public interface ClusterClient<T> extends AutoCloseable {
16
/**
17
* Close the cluster client and release resources
18
*/
19
@Override
20
void close();
21
22
/**
23
* Get the cluster identifier
24
* @return Cluster ID identifying the connected cluster
25
*/
26
T getClusterId();
27
28
/**
29
* Get the Flink configuration used by this client
30
* @return Flink configuration object
31
*/
32
Configuration getFlinkConfiguration();
33
34
/**
35
* Shut down the cluster that this client communicates with
36
*/
37
void shutDownCluster();
38
39
/**
40
* Get URL to the cluster web interface
41
* @return Web interface URL as string
42
*/
43
String getWebInterfaceURL();
44
45
/**
46
* List all jobs on the cluster (running and finished)
47
* @return Future collection of job status messages
48
* @throws Exception if connection fails
49
*/
50
CompletableFuture<Collection<JobStatusMessage>> listJobs() throws Exception;
51
52
/**
53
* Submit an execution plan to the cluster
54
* @param executionPlan Execution plan to submit
55
* @return Future with assigned job ID
56
*/
57
CompletableFuture<JobID> submitJob(ExecutionPlan executionPlan);
58
59
/**
60
* Get status of a specific job
61
* @param jobId ID of the job to query
62
* @return Future with job status
63
*/
64
CompletableFuture<JobStatus> getJobStatus(JobID jobId);
65
66
/**
67
* Cancel a running job
68
* @param jobId ID of the job to cancel
69
* @return Future with acknowledgment
70
*/
71
CompletableFuture<Acknowledge> cancel(JobID jobId);
72
73
/**
74
* Stop a job with optional savepoint
75
* @param jobId ID of the job to stop
76
* @param advanceToEndOfEventTime Whether to advance to end of event time
77
* @param savepointDir Optional savepoint directory
78
* @param formatType Optional savepoint format type
79
* @return Future with savepoint path
80
*/
81
CompletableFuture<String> stopWithSavepoint(
82
JobID jobId,
83
boolean advanceToEndOfEventTime,
84
@Nullable String savepointDir,
85
SavepointFormatType formatType
86
);
87
88
/**
89
* Trigger a savepoint for a running job
90
* @param jobId ID of the job
91
* @param savepointDir Target directory for savepoint
92
* @param formatType Savepoint format type
93
* @return Future with savepoint path
94
*/
95
CompletableFuture<String> triggerSavepoint(
96
JobID jobId,
97
@Nullable String savepointDir,
98
SavepointFormatType formatType
99
);
100
101
/**
102
* Dispose a savepoint
103
* @param savepointPath Path to the savepoint to dispose
104
* @return Future with acknowledgment
105
* @throws FlinkException if disposal fails
106
*/
107
CompletableFuture<Acknowledge> disposeSavepoint(String savepointPath) throws FlinkException;
108
109
/**
110
* Cancel a job and trigger a savepoint
111
* @param jobId ID of the job to cancel
112
* @param savepointDirectory Directory for the savepoint
113
* @param formatType Savepoint format type
114
* @return Future with savepoint path
115
*/
116
CompletableFuture<String> cancelWithSavepoint(
117
JobID jobId,
118
@Nullable String savepointDirectory,
119
SavepointFormatType formatType
120
);
121
122
/**
123
* Trigger a detached savepoint (returns quickly with trigger ID)
124
* @param jobId ID of the job
125
* @param savepointDirectory Target directory for savepoint
126
* @param formatType Savepoint format type
127
* @return Future with savepoint trigger ID
128
*/
129
CompletableFuture<String> triggerDetachedSavepoint(
130
JobID jobId,
131
@Nullable String savepointDirectory,
132
SavepointFormatType formatType
133
);
134
135
/**
136
* Trigger a checkpoint for a job
137
* @param jobId ID of the job
138
* @param checkpointType Type of checkpoint (configured/full/incremental)
139
* @return Future with checkpoint ID
140
*/
141
CompletableFuture<Long> triggerCheckpoint(JobID jobId, CheckpointType checkpointType);
142
143
/**
144
* Get accumulators for a job
145
* @param jobID Job identifier
146
* @param loader Class loader for deserializing results
147
* @return Future with accumulator map
148
*/
149
CompletableFuture<Map<String, Object>> getAccumulators(JobID jobID, ClassLoader loader);
150
}
151
```
**Usage Examples:**

```java
156
import org.apache.flink.client.program.ClusterClient;
157
import org.apache.flink.client.program.rest.RestClusterClient;
158
import org.apache.flink.client.program.rest.RestClusterClientConfiguration;
159
160
// Create and use a REST cluster client
161
Configuration config = new Configuration();
162
config.setString(RestOptions.ADDRESS, "localhost");
163
config.setInteger(RestOptions.PORT, 8081);
164
165
RestClusterClientConfiguration clientConfig =
166
RestClusterClientConfiguration.fromConfiguration(config);
167
168
try (ClusterClient<?> client = new RestClusterClient<>(clientConfig, "default")) {
169
// List all jobs
170
Collection<JobStatusMessage> jobs = client.listJobs().get();
171
System.out.println("Found " + jobs.size() + " jobs");
172
173
// Get job status
174
JobID jobId = JobID.fromHexString("a1b2c3d4e5f6");
175
JobStatus status = client.getJobStatus(jobId).get();
176
System.out.println("Job status: " + status);
177
178
// Trigger savepoint
179
String savepointPath = client.triggerSavepoint(
180
jobId,
181
"/path/to/savepoints",
182
SavepointFormatType.CANONICAL
183
).get();
184
System.out.println("Savepoint created: " + savepointPath);
185
}
186
```
### REST Cluster Client

`ClusterClient` implementation that communicates with a Flink cluster through its REST API.

```java { .api }
193
/**
194
* REST-based cluster client implementation
195
*/
196
public class RestClusterClient<T> implements ClusterClient<T> {
197
/**
198
* Create REST cluster client with configuration and cluster ID
199
* @param configuration REST client configuration
200
* @param clusterId Cluster identifier
201
*/
202
public RestClusterClient(
203
RestClusterClientConfiguration configuration,
204
T clusterId
205
);
206
207
/**
208
* Create REST cluster client with configuration, cluster ID, and thread pool size
209
* @param configuration REST client configuration
210
* @param clusterId Cluster identifier
211
* @param maxRetryAttempts Maximum retry attempts for requests
212
*/
213
public RestClusterClient(
214
RestClusterClientConfiguration configuration,
215
T clusterId,
216
int maxRetryAttempts
217
);
218
}
219
220
/**
221
* Configuration for REST cluster clients
222
*/
223
public class RestClusterClientConfiguration {
224
/**
225
* Create configuration from Flink configuration
226
* @param config Flink configuration
227
* @return REST client configuration
228
*/
229
public static RestClusterClientConfiguration fromConfiguration(Configuration config);
230
231
/**
232
* Get REST server endpoint
233
* @return REST endpoint configuration
234
*/
235
public RestServerEndpointConfiguration getRestServerEndpointConfiguration();
236
237
/**
238
* Get executor service for async operations
239
* @return Scheduled executor service
240
*/
241
public ScheduledExecutorService getExecutorService();
242
}
243
```
### Mini Cluster Client

`ClusterClient` implementation for the in-process mini cluster used in testing and local development.

```java { .api }
250
/**
251
* Cluster client for mini clusters
252
*/
253
public class MiniClusterClient implements ClusterClient<MiniClusterClient.MiniClusterId> {
254
/**
255
* Create mini cluster client
256
* @param configuration Flink configuration
257
* @param miniCluster Mini cluster instance
258
*/
259
public MiniClusterClient(Configuration configuration, MiniCluster miniCluster);
260
261
/**
262
* Cluster ID type for mini clusters
263
*/
264
public static class MiniClusterId {
265
public MiniClusterId();
266
}
267
}
268
```
### Cluster Client Provider

Provider interface for cluster clients, allowing clients to be created lazily and their resources to be managed in one place.

```java { .api }
275
/**
276
* Provider interface for cluster clients
277
* @param <T> Type of cluster identifier
278
*/
279
public interface ClusterClientProvider<T> extends AutoCloseable {
280
/**
281
* Get the cluster client instance
282
* @return Cluster client
283
*/
284
ClusterClient<T> getClusterClient();
285
286
/**
287
* Get the cluster ID
288
* @return Cluster identifier
289
*/
290
T getClusterId();
291
292
/**
293
* Close the provider and release resources
294
*/
295
@Override
296
void close() throws Exception;
297
}
298
```
### Cluster Client Job Client Adapter

Adapter that exposes a single job through the `JobClient` interface, delegating to a cluster client obtained from a `ClusterClientProvider`.

```java { .api }
305
/**
306
* Adapter between cluster client and job client interfaces
307
* @param <T> Type of cluster identifier
308
*/
309
public class ClusterClientJobClientAdapter<T> implements JobClient {
310
/**
311
* Create adapter with cluster client and job ID
312
* @param clusterClient Cluster client instance
313
* @param jobID Job identifier
314
* @param userCodeClassLoader User code class loader
315
*/
316
public ClusterClientJobClientAdapter(
317
ClusterClientProvider<T> clusterClient,
318
JobID jobID,
319
ClassLoader userCodeClassLoader
320
);
321
322
@Override
323
public JobID getJobID();
324
325
@Override
326
public CompletableFuture<JobStatus> getJobStatus();
327
328
@Override
329
public CompletableFuture<Void> cancel();
330
331
@Override
332
public CompletableFuture<String> stopWithSavepoint(
333
boolean advanceToEndOfEventTime,
334
@Nullable String savepointDir,
335
SavepointFormatType formatType
336
);
337
338
@Override
339
public CompletableFuture<String> triggerSavepoint(
340
@Nullable String savepointDir,
341
SavepointFormatType formatType
342
);
343
344
@Override
345
public CompletableFuture<JobExecutionResult> getJobExecutionResult();
346
}
347
```
### Retry Strategies

Configurable wait strategies that control the delay between retries of REST operations, including exponential backoff.

```java { .api }
/**
 * Decides how long to wait before a retry attempt.
 *
 * <p>NOTE(review): upstream Flink's {@code WaitStrategy} declares
 * {@code long sleepTime(long attempt)} rather than {@code Duration calculateWaitTime(int)};
 * verify.
 */
public interface WaitStrategy {

    /**
     * Computes the wait time for the given attempt.
     *
     * @param attempt current attempt number, counted from 1
     * @return how long to wait
     */
    Duration calculateWaitTime(int attempt);
}

/**
367
* Exponential backoff wait strategy
368
*/
369
public class ExponentialWaitStrategy implements WaitStrategy {
370
/**
371
* Create exponential wait strategy
372
* @param initialWait Initial wait duration
373
* @param maxWait Maximum wait duration
374
* @param multiplier Backoff multiplier
375
*/
376
public ExponentialWaitStrategy(
377
Duration initialWait,
378
Duration maxWait,
379
double multiplier
380
);
381
382
@Override
383
public Duration calculateWaitTime(int attempt);
384
}
385
```
## Types

```java { .api }
390
/**
391
* Job status message containing job information
392
*/
393
public class JobStatusMessage {
394
/**
395
* Get job ID
396
* @return Job identifier
397
*/
398
public JobID getJobId();
399
400
/**
401
* Get job name
402
* @return Job name
403
*/
404
public String getJobName();
405
406
/**
407
* Get current job state
408
* @return Job status enum
409
*/
410
public JobStatus getJobState();
411
412
/**
413
* Get job start time
414
* @return Start time as timestamp
415
*/
416
public long getStartTime();
417
}
418
419
/**
420
* Job execution result containing final job information
421
*/
422
public class JobExecutionResult {
423
public JobID getJobID();
424
public long getNetRuntime();
425
public Map<String, Object> getAllAccumulatorResults();
426
}
427
```
## Error Handling

Cluster client operations can fail for several reasons:

- **Connection Errors**: network failures, cluster unavailability
- **Authentication Errors**: security configuration issues
- **Job Errors**: invalid job submissions, job not found
- **Resource Errors**: insufficient cluster resources
- **Timeout Errors**: long-running operations exceeding their timeouts

Asynchronous operations return `CompletableFuture` instances and report failures by completing them exceptionally. Callers can compose these futures and react to failures with `handle()`, `exceptionally()`, or `whenComplete()`. Blocking on `get()` rethrows the failure wrapped in an `ExecutionException`. Note that `listJobs()` and `disposeSavepoint()` also declare checked exceptions (`Exception` and `FlinkException`, respectively), so they can fail synchronously before any future is returned.