# REST Client Communication

The Apache Flink REST client communication module (`org.apache.flink.client.program.rest.*`) lets client code talk to a remote Flink cluster over HTTP or HTTPS. It covers retry handling, SSL support, and the configuration needed to reach a cluster's REST endpoint.

## Core REST Client Classes

### RestClusterClient { .api }

REST-based implementation of the ClusterClient interface for communicating with remote Flink clusters.

```java
public class RestClusterClient<T> implements ClusterClient<T> {
    // Constructors
    public RestClusterClient(Configuration configuration,
                             RestClusterClientConfiguration restClusterClientConfiguration,
                             T clusterId) { }

    public RestClusterClient(Configuration configuration,
                             RestClusterClientConfiguration restClusterClientConfiguration,
                             T clusterId,
                             WaitStrategy waitStrategy) { }

    // ClusterClient interface implementation
    public T getClusterId() { }
    public Configuration getFlinkConfiguration() { }
    public String getWebInterfaceURL() { }

    // Job management via REST
    public CompletableFuture<Collection<JobStatusMessage>> listJobs() { }
    public CompletableFuture<JobID> submitJob(JobGraph jobGraph) { }
    public CompletableFuture<JobStatus> getJobStatus(JobID jobId) { }
    public CompletableFuture<JobResult> requestJobResult(JobID jobId) { }

    // Job control operations
    public CompletableFuture<Acknowledge> cancel(JobID jobId) { }
    public CompletableFuture<String> cancelWithSavepoint(JobID jobId, String savepointDirectory) { }
    public CompletableFuture<String> stopWithSavepoint(JobID jobId,
                                                       boolean advanceToEndOfEventTime,
                                                       String savepointDirectory) { }

    // Savepoint operations
    public CompletableFuture<String> triggerSavepoint(JobID jobId, String savepointDirectory) { }
    public CompletableFuture<Acknowledge> disposeSavepoint(String savepointPath) { }

    // Metrics and coordination
    public CompletableFuture<Map<String, Object>> getAccumulators(JobID jobID, ClassLoader loader) { }
    public CompletableFuture<CoordinationResponse> sendCoordinationRequest(JobID jobId,
                                                                           OperatorID operatorId,
                                                                           CoordinationRequest request) { }

    // Cluster management
    public void shutDownCluster() { }
    public void close() { }
}
```

### RestClusterClientConfiguration { .api }

Configuration class for REST cluster client settings including timeouts, retries, and SSL parameters.

```java
public class RestClusterClientConfiguration {
    // Factory method
    public static RestClusterClientConfiguration fromConfiguration(Configuration config) { }

    // Connection configuration
    public long getConnectionTimeout() { }
    public long getIdlenessTimeout() { }
    public AwaitingTime getAwaitLeaderTimeout() { }

    // Retry configuration
    public int getMaxRetryAttempts() { }
    public long getRetryDelay() { }

    // SSL configuration
    public SSLHandlerFactory getSslHandlerFactory() { }
    public String[] getTrustStore() { }
    public String getTrustStorePassword() { }
    public String[] getKeyStore() { }
    public String getKeyStorePassword() { }
    public String getSSLProtocol() { }
    public String[] getSSLAlgorithms() { }
}
```

## Retry Strategy Components

### WaitStrategy { .api }

Interface for defining wait strategies in retry mechanisms.

```java
public interface WaitStrategy {
    // Calculate sleep time for a given attempt
    long sleepTime(long attempt);
}
```

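Because `WaitStrategy` declares a single `sleepTime(long attempt)` method, a custom policy can be quite small. The following is a minimal sketch of a linear backoff, assuming delays are expressed in milliseconds and attempts are counted from zero (neither is stated above). `WaitPolicy` and `LinearWaitPolicy` are hypothetical local stand-ins so that the snippet compiles without Flink on the classpath; in real code the class would implement `WaitStrategy` instead.

```java
/** Hypothetical local stand-in that mirrors the shape of WaitStrategy. */
interface WaitPolicy {
    long sleepTime(long attempt);
}

/** Linear backoff: step * (attempt + 1), capped at maxWait. Illustrative only. */
final class LinearWaitPolicy implements WaitPolicy {
    private final long stepMillis;
    private final long maxWaitMillis;

    LinearWaitPolicy(long stepMillis, long maxWaitMillis) {
        if (stepMillis <= 0 || maxWaitMillis < stepMillis) {
            throw new IllegalArgumentException("require 0 < step <= maxWait");
        }
        this.stepMillis = stepMillis;
        this.maxWaitMillis = maxWaitMillis;
    }

    @Override
    public long sleepTime(long attempt) {
        if (attempt < 0) {
            throw new IllegalArgumentException("attempt must be >= 0");
        }
        // Compare against the number of whole steps that fit under the cap,
        // so the multiplication below can never overflow.
        long maxSteps = maxWaitMillis / stepMillis;
        return attempt >= maxSteps ? maxWaitMillis : stepMillis * (attempt + 1);
    }
}
```

A linear policy grows more slowly than an exponential one, which may suit clusters that usually recover within a few seconds.
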
### ExponentialWaitStrategy { .api }

Exponential backoff implementation of wait strategy for retry operations.

```java
public class ExponentialWaitStrategy implements WaitStrategy {
    // Constructor
    public ExponentialWaitStrategy(long initialWait, long maxWait) { }

    // WaitStrategy implementation
    public long sleepTime(long attempt) { }
}
```

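The arithmetic behind a capped exponential backoff can be sketched as follows. This is an illustrative re-implementation rather than Flink's source: it assumes the delay starts at `initialWait`, doubles with each attempt, and is clamped at `maxWait`, with attempts counted from zero and all values in milliseconds. The class name `ExponentialBackoffSketch` is hypothetical.

```java
/** Illustrative capped exponential backoff; not taken from Flink. */
final class ExponentialBackoffSketch {
    private final long initialWait;
    private final long maxWait;

    ExponentialBackoffSketch(long initialWait, long maxWait) {
        if (initialWait <= 0 || maxWait < initialWait) {
            throw new IllegalArgumentException("require 0 < initialWait <= maxWait");
        }
        this.initialWait = initialWait;
        this.maxWait = maxWait;
    }

    /** Returns min(initialWait * 2^attempt, maxWait) without overflowing. */
    long sleepTime(long attempt) {
        if (attempt < 0) {
            throw new IllegalArgumentException("attempt must be >= 0");
        }
        long wait = initialWait;
        // Double one step at a time and stop as soon as the cap is reached,
        // so the loop runs at most about 63 times even for huge attempt values.
        for (long i = 0; i < attempt && wait < maxWait; i++) {
            wait = wait > maxWait / 2 ? maxWait : wait * 2;
        }
        return wait;
    }

    public static void main(String[] args) {
        ExponentialBackoffSketch backoff = new ExponentialBackoffSketch(1000L, 30000L);
        for (long attempt = 0; attempt <= 6; attempt++) {
            System.out.println("attempt " + attempt + " -> " + backoff.sleepTime(attempt) + " ms");
        }
    }
}
```

Under these assumptions, an initial wait of 1000 and a maximum of 30000 give delays of 1000, 2000, 4000, 8000, 16000, 30000, and 30000 for attempts 0 through 6.
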
## Configuration Integration

The REST client integrates with Flink's configuration system through several key configuration options:

### Connection Configuration
- `rest.address`: REST endpoint hostname
- `rest.port`: REST endpoint port
- `rest.connection-timeout`: Connection timeout duration
- `rest.idleness-timeout`: Idle connection timeout

### SSL Configuration
- `security.ssl.rest.enabled`: Enable SSL for REST connections
- `security.ssl.rest.keystore`: Path to SSL keystore
- `security.ssl.rest.keystore-password`: Keystore password
- `security.ssl.rest.truststore`: Path to SSL truststore
- `security.ssl.rest.truststore-password`: Truststore password

### Retry Configuration
- `rest.retry.max-attempts`: Maximum retry attempts
- `rest.retry.delay`: Base retry delay

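To make the interplay of the two retry options concrete, here is a minimal sketch of a bounded retry loop in plain Java. It is not the client's internal logic. It assumes that `rest.retry.max-attempts` counts retries after the first call and that the delay is a fixed number of milliseconds, and it retries on every exception, which is a simplification. `RetrySketch` and `callWithRetry` are hypothetical names.

```java
import java.util.concurrent.Callable;

/** Illustrative bounded retry loop; names and semantics are assumptions. */
final class RetrySketch {

    /** Calls op once, then retries up to maxRetries times with a fixed delay. */
    static <T> T callWithRetry(Callable<T> op, int maxRetries, long delayMillis)
            throws Exception {
        if (maxRetries < 0 || delayMillis < 0) {
            throw new IllegalArgumentException("maxRetries and delayMillis must be >= 0");
        }
        Exception last = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                return op.call();
            } catch (InterruptedException e) {
                // Never retry an interrupt; restore the flag and give up.
                Thread.currentThread().interrupt();
                throw e;
            } catch (Exception e) {
                last = e;
                if (attempt < maxRetries) {
                    Thread.sleep(delayMillis);
                }
            }
        }
        // The loop body ran at least once, so last is non-null here.
        throw last;
    }
}
```

With `maxRetries = 3`, an operation that fails twice and then succeeds is called three times in total, and the caller only sees the successful result.
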
## Usage Examples

### Basic REST Client Usage

```java
// Configure REST connection
Configuration config = new Configuration();
config.setString("rest.address", "flink-cluster.example.com");
config.setInteger("rest.port", 8081);
config.setLong("rest.connection-timeout", 30_000L); // 30 seconds, in milliseconds
config.setInteger("rest.retry.max-attempts", 3);

// Create REST client configuration
RestClusterClientConfiguration restConfig =
    RestClusterClientConfiguration.fromConfiguration(config);

// Standalone clusters share a single cluster ID instance
StandaloneClusterId clusterId = StandaloneClusterId.getInstance();

// Create REST cluster client
RestClusterClient<StandaloneClusterId> client =
    new RestClusterClient<>(config, restConfig, clusterId);

try {
    // Use the client for operations
    String webUrl = client.getWebInterfaceURL();
    System.out.println("Cluster web interface: " + webUrl);

    // List the jobs known to the cluster
    CompletableFuture<Collection<JobStatusMessage>> jobsFuture = client.listJobs();
    Collection<JobStatusMessage> jobs = jobsFuture.get();

    for (JobStatusMessage job : jobs) {
        System.out.println("Job: " + job.getJobName() + " - " + job.getJobState());
    }
} finally {
    client.close();
}
```

### REST Client with Custom Retry Strategy

```java
// Create custom exponential backoff strategy
WaitStrategy waitStrategy = new ExponentialWaitStrategy(
    1000,  // initial wait: 1 second
    30000  // max wait: 30 seconds
);

// Configuration with retry settings
Configuration config = new Configuration();
config.setString("rest.address", "localhost");
config.setInteger("rest.port", 8081);

RestClusterClientConfiguration restConfig =
    RestClusterClientConfiguration.fromConfiguration(config);

// Create client with custom wait strategy
RestClusterClient<StandaloneClusterId> client =
    new RestClusterClient<>(config, restConfig, StandaloneClusterId.getInstance(), waitStrategy);

try {
    // Submit job with retry support
    JobGraph jobGraph = /* create job graph */;
    CompletableFuture<JobID> submitFuture = client.submitJob(jobGraph);
    JobID jobId = submitFuture.get();

    // Query the job status; the client applies the wait strategy between retries
    CompletableFuture<JobStatus> statusFuture = client.getJobStatus(jobId);
    JobStatus status = statusFuture.get();

} finally {
    client.close();
}
```

### SSL-Enabled REST Client

```java
// Configure SSL settings
Configuration config = new Configuration();
config.setString("rest.address", "secure-flink.example.com");
config.setInteger("rest.port", 8443);
config.setBoolean("security.ssl.rest.enabled", true);
config.setString("security.ssl.rest.keystore", "/path/to/keystore.jks");
config.setString("security.ssl.rest.keystore-password", "keystorePassword");
config.setString("security.ssl.rest.truststore", "/path/to/truststore.jks");
config.setString("security.ssl.rest.truststore-password", "truststorePassword");

// Create SSL-enabled REST client
RestClusterClientConfiguration restConfig =
    RestClusterClientConfiguration.fromConfiguration(config);

RestClusterClient<StandaloneClusterId> client =
    new RestClusterClient<>(config, restConfig, StandaloneClusterId.getInstance());

try {
    // All operations now use HTTPS
    CompletableFuture<Collection<JobStatusMessage>> jobsFuture = client.listJobs();
    // ... handle response
} finally {
    client.close();
}
```

### Job Management via REST

```java
// REST client setup
RestClusterClient<StandaloneClusterId> client = /* create client */;

try {
    // Submit job
    JobGraph jobGraph = /* your job graph */;
    CompletableFuture<JobID> submitFuture = client.submitJob(jobGraph);
    JobID jobId = submitFuture.get();

    // Wait for the job to start running or to reach a terminal state
    CompletableFuture<JobStatus> statusFuture = client.getJobStatus(jobId);
    JobStatus status = statusFuture.get();

    // isGloballyTerminalState() also covers CANCELED, so the loop cannot spin forever
    while (status != JobStatus.RUNNING && !status.isGloballyTerminalState()) {
        Thread.sleep(1000);
        statusFuture = client.getJobStatus(jobId);
        status = statusFuture.get();
    }

    if (status == JobStatus.RUNNING) {
        // Trigger savepoint
        CompletableFuture<String> savepointFuture =
            client.triggerSavepoint(jobId, "hdfs://namenode:9000/savepoints");
        String savepointPath = savepointFuture.get();
        System.out.println("Savepoint created: " + savepointPath);

        // Stop job with savepoint
        CompletableFuture<String> stopFuture =
            client.stopWithSavepoint(jobId, false, "hdfs://namenode:9000/final-savepoint");
        String finalSavepointPath = stopFuture.get();
        System.out.println("Job stopped with savepoint: " + finalSavepointPath);
    }

    // Get final job result
    CompletableFuture<JobResult> resultFuture = client.requestJobResult(jobId);
    JobResult result = resultFuture.get();
    System.out.println("Job finished with status: " + result.getApplicationStatus());

} finally {
    client.close();
}
```

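The status loop in the example polls without an upper bound on waiting time. A deadline can be added with a small generic helper, sketched below in plain Java so it runs without Flink. `PollingSketch` and `pollUntil` are hypothetical names, and the interval and timeout values are up to the caller.

```java
import java.time.Duration;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
import java.util.function.Supplier;

/** Illustrative polling helper with a deadline; not part of Flink. */
final class PollingSketch {

    /** Polls fetch until done accepts the value, or fails once timeout has elapsed. */
    static <S> S pollUntil(Supplier<S> fetch, Predicate<S> done,
                           Duration interval, Duration timeout)
            throws InterruptedException, TimeoutException {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            S state = fetch.get();
            if (done.test(state)) {
                return state;
            }
            // Overflow-safe comparison of nanoTime values.
            if (System.nanoTime() - deadline >= 0) {
                throw new TimeoutException("Condition not met; last state: " + state);
            }
            Thread.sleep(interval.toMillis());
        }
    }
}
```

With the client from the example, `fetch` could be `() -> client.getJobStatus(jobId).join()` and `done` could be `s -> s == JobStatus.RUNNING || s.isGloballyTerminalState()`.
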
### Coordination Request Example

```java
// Send coordination request to operator
RestClusterClient<StandaloneClusterId> client = /* create client */;

try {
    JobID jobId = /* your job ID */;
    OperatorID operatorId = /* target operator ID */;

    // Create custom coordination request
    CoordinationRequest request = new CoordinationRequest() {
        // Implement coordination request logic
    };

    // Send request to operator
    CompletableFuture<CoordinationResponse> responseFuture =
        client.sendCoordinationRequest(jobId, operatorId, request);

    CoordinationResponse response = responseFuture.get();
    // Handle coordination response

} finally {
    client.close();
}
```

### Error Handling and Timeouts

```java
// Configure timeout and retry settings (durations in milliseconds)
Configuration config = new Configuration();
config.setString("rest.address", "localhost");
config.setInteger("rest.port", 8081);
config.setLong("rest.connection-timeout", 10_000L); // 10 seconds
config.setLong("rest.idleness-timeout", 300_000L);  // 5 minutes
config.setInteger("rest.retry.max-attempts", 5);
config.setLong("rest.retry.delay", 2_000L);         // 2 seconds

RestClusterClientConfiguration restConfig =
    RestClusterClientConfiguration.fromConfiguration(config);

RestClusterClient<StandaloneClusterId> client =
    new RestClusterClient<>(config, restConfig, StandaloneClusterId.getInstance());

try {
    // Operation with timeout handling
    CompletableFuture<Collection<JobStatusMessage>> jobsFuture = client.listJobs();

    Collection<JobStatusMessage> jobs = jobsFuture
        .orTimeout(30, TimeUnit.SECONDS) // requires Java 9 or later
        .exceptionally(throwable -> {
            System.err.println("Failed to list jobs: " + throwable.getMessage());
            return Collections.emptyList();
        })
        .get();

} catch (Exception e) {
    System.err.println("REST operation failed: " + e.getMessage());
} finally {
    client.close();
}
```

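One detail of the example is easy to miss: because `exceptionally` supplies a fallback value, a timeout or REST failure is turned into an empty result before `get()` is called, so the `catch` block is reached mainly for interruptions. The stand-alone sketch below isolates that pattern using only `java.util.concurrent`. `TimeoutSketch`, `getOrFallback`, and `describe` are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Illustrative timeout-with-fallback helpers; not part of Flink. */
final class TimeoutSketch {

    /** Returns the future's value, or fallback if it fails or exceeds the timeout. */
    static <T> T getOrFallback(CompletableFuture<T> future, long timeoutMillis, T fallback) {
        return future
                .orTimeout(timeoutMillis, TimeUnit.MILLISECONDS) // Java 9+
                .exceptionally(t -> fallback)
                .join();
    }

    /** Classifies a failure, unwrapping a CompletionException if present. */
    static String describe(Throwable t) {
        Throwable cause =
                (t instanceof CompletionException && t.getCause() != null) ? t.getCause() : t;
        return cause instanceof TimeoutException
                ? "timeout"
                : "failure: " + cause.getMessage();
    }
}
```

If callers need to tell a timeout apart from other failures, a helper like `describe` can be called inside the `exceptionally` handler before the fallback is returned.
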
## Required Imports

```java
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.client.program.rest.RestClusterClientConfiguration;
import org.apache.flink.client.program.rest.retry.WaitStrategy;
import org.apache.flink.client.program.rest.retry.ExponentialWaitStrategy;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.deployment.StandaloneClusterId;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.security.contexts.SecurityContext;
import org.apache.flink.runtime.rest.handler.legacy.messages.StatusOverview;
import org.apache.flink.util.concurrent.ScheduledExecutor;
import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.time.Duration;
```

## Configuration Options Reference

### Essential REST Configuration
```java
// Basic connection settings (durations in milliseconds)
config.setString("rest.address", "localhost");
config.setInteger("rest.port", 8081);
config.setLong("rest.connection-timeout", 30_000L); // 30 seconds
config.setLong("rest.idleness-timeout", 300_000L);  // 5 minutes

// Retry configuration
config.setInteger("rest.retry.max-attempts", 3);
config.setLong("rest.retry.delay", 1_000L);         // 1 second

// SSL configuration (when needed)
config.setBoolean("security.ssl.rest.enabled", true);
config.setString("security.ssl.rest.keystore", "/path/to/keystore.jks");
config.setString("security.ssl.rest.keystore-password", "password");
config.setString("security.ssl.rest.truststore", "/path/to/truststore.jks");
config.setString("security.ssl.rest.truststore-password", "password");
```

Together, these options control how the REST client reaches a Flink cluster over HTTP or HTTPS: where it connects, how long it waits, how often it retries, and whether traffic is encrypted with SSL.