0
# REST Client
1
2
REST client implementations for communicating with Flink clusters through HTTP APIs, including retry strategies, configuration management, and comprehensive cluster operation support.
3
4
## Capabilities
5
6
### REST Cluster Client
7
8
REST-based implementation of the ClusterClient interface for HTTP communication with Flink clusters.
9
10
```java { .api }
11
/**
12
* REST-based cluster client implementation
13
* @param <T> Type of cluster identifier
14
*/
15
public class RestClusterClient<T> implements ClusterClient<T> {
16
/**
17
* Creates REST cluster client with basic configuration
18
* @param config Flink configuration
19
* @param clusterId Cluster identifier
20
*/
21
public RestClusterClient(Configuration config, T clusterId);
22
23
/**
24
* Creates REST cluster client with high availability services factory
25
* @param config Flink configuration
26
* @param clusterId Cluster identifier
27
* @param factory High availability services factory
28
*/
29
public RestClusterClient(
30
Configuration config,
31
T clusterId,
32
ClientHighAvailabilityServicesFactory factory);
33
34
// Implements all ClusterClient methods with REST-specific implementations
35
@Override
36
public T getClusterId();
37
38
@Override
39
public Configuration getFlinkConfiguration();
40
41
@Override
42
public void shutDownCluster();
43
44
@Override
45
public String getWebInterfaceURL();
46
47
@Override
48
public CompletableFuture<Collection<JobStatusMessage>> listJobs() throws Exception;
49
50
@Override
51
public CompletableFuture<JobID> submitJob(JobGraph jobGraph);
52
53
@Override
54
public CompletableFuture<JobStatus> getJobStatus(JobID jobId);
55
56
@Override
57
public CompletableFuture<JobResult> requestJobResult(JobID jobId);
58
59
@Override
60
public CompletableFuture<Map<String, Object>> getAccumulators(JobID jobID);
61
62
@Override
63
public CompletableFuture<Map<String, Object>> getAccumulators(JobID jobID, ClassLoader loader);
64
65
@Override
66
public CompletableFuture<Acknowledge> cancel(JobID jobId);
67
68
@Override
69
public CompletableFuture<String> cancelWithSavepoint(JobID jobId, String savepointDirectory);
70
71
@Override
72
public CompletableFuture<String> stopWithSavepoint(
73
JobID jobId,
74
boolean advanceToEndOfEventTime,
75
String savepointDirectory);
76
77
@Override
78
public CompletableFuture<String> triggerSavepoint(JobID jobId, String savepointDirectory);
79
80
@Override
81
public CompletableFuture<Acknowledge> disposeSavepoint(String savepointPath) throws FlinkException;
82
83
@Override
84
public CompletableFuture<CoordinationResponse> sendCoordinationRequest(
85
JobID jobId,
86
OperatorID operatorId,
87
CoordinationRequest request);
88
89
@Override
90
public void close();
91
}
92
```
93
94
**Usage Example:**
95
96
```java
97
import org.apache.flink.client.program.rest.RestClusterClient;
98
import org.apache.flink.client.program.rest.RestClusterClientConfiguration;
99
import org.apache.flink.configuration.Configuration;
100
import org.apache.flink.configuration.JobManagerOptions;
101
import org.apache.flink.configuration.RestOptions;
102
103
// Configure REST client
104
Configuration config = new Configuration();
105
config.setString(JobManagerOptions.ADDRESS, "localhost");
106
config.setInteger(JobManagerOptions.PORT, 6123);
107
config.setInteger(RestOptions.PORT, 8081);
108
109
// Create REST client configuration
110
RestClusterClientConfiguration clientConfig = new RestClusterClientConfiguration.RestClusterClientConfigurationBuilder()
111
.setRetryMaxAttempts(10)
112
.setRetryDelay(1000)
113
.setAwaitLeaderTimeout(30000)
114
.build();
115
116
// Create REST cluster client
117
try (RestClusterClient<StandaloneClusterId> client = new RestClusterClient<>(
118
config, clientConfig, new StandaloneClusterId(), new ExponentialWaitStrategy(1000, 10000))) {
119
120
// Submit job via REST
121
JobGraph jobGraph = createJobGraph();
122
JobID jobId = client.submitJob(jobGraph).get();
123
System.out.println("Job submitted via REST: " + jobId);
124
125
// Monitor job status
126
JobStatus status = client.getJobStatus(jobId).get();
127
System.out.println("Job status: " + status);
128
129
// Create savepoint via REST
130
String savepointPath = client.triggerSavepoint(jobId, "/path/to/savepoints").get();
131
System.out.println("Savepoint created: " + savepointPath);
132
}
133
```
134
135
### REST Client Configuration
136
137
Configuration class for REST cluster clients with connection and retry settings.
138
139
```java { .api }
140
/**
141
* Configuration for REST cluster clients
142
*/
143
public final class RestClusterClientConfiguration {
144
/**
145
* Gets the REST client configuration
146
* @return RestClientConfiguration instance
147
*/
148
public RestClientConfiguration getRestClientConfiguration();
149
150
/**
151
* Gets the await leader timeout in milliseconds
152
* @return Timeout for awaiting cluster leader
153
*/
154
public long getAwaitLeaderTimeout();
155
156
/**
157
* Gets the maximum number of retry attempts
158
* @return Maximum retry attempts
159
*/
160
public int getRetryMaxAttempts();
161
162
/**
163
* Gets the retry delay in milliseconds
164
* @return Delay between retry attempts
165
*/
166
public long getRetryDelay();
167
168
/**
169
* Creates RestClusterClientConfiguration from Flink configuration
170
* @param config Flink configuration containing REST options
171
* @return RestClusterClientConfiguration instance
172
* @throws ConfigurationException If configuration is invalid
173
*/
174
public static RestClusterClientConfiguration fromConfiguration(Configuration config)
175
throws ConfigurationException;
176
}
177
```
178
179
**Usage Example:**
180
181
```java
182
import org.apache.flink.client.program.rest.RestClusterClientConfiguration;
183
184
// Create Flink configuration with REST options
185
Configuration flinkConfig = new Configuration();
186
flinkConfig.setLong(RestOptions.AWAIT_LEADER_TIMEOUT, 60000L); // 60 seconds
187
flinkConfig.setInteger(RestOptions.RETRY_MAX_ATTEMPTS, 5); // 5 retry attempts
188
flinkConfig.setLong(RestOptions.RETRY_DELAY, 2000L); // 2 second delay
189
190
// Create REST client configuration from Flink configuration
191
RestClusterClientConfiguration config = RestClusterClientConfiguration.fromConfiguration(flinkConfig);
192
193
System.out.println("Leader timeout: " + config.getAwaitLeaderTimeout());
194
System.out.println("Max retries: " + config.getRetryMaxAttempts());
195
System.out.println("Retry delay: " + config.getRetryDelay());
196
```
197
198
### Retry Strategies
199
200
Strategy implementations for handling retry logic in REST client operations.
201
202
```java { .api }
203
/**
204
* Strategy for waiting between retry attempts
205
*/
206
public interface WaitStrategy {
207
/**
208
* Calculates sleep time for the given attempt count
209
* @param attemptCount Number of attempts made (starting from 0)
210
* @return Sleep time in milliseconds
211
*/
212
long sleepTime(long attemptCount);
213
}
214
215
/**
216
* Configuration options for REST client settings
217
*/
218
public class RestOptions {
219
public static final ConfigOption<Long> AWAIT_LEADER_TIMEOUT;
220
public static final ConfigOption<Integer> RETRY_MAX_ATTEMPTS;
221
public static final ConfigOption<Long> RETRY_DELAY;
222
}
223
224
/**
225
* Exponential backoff wait strategy
226
*/
227
public class ExponentialWaitStrategy implements WaitStrategy {
228
/**
229
* Creates exponential wait strategy
230
* @param initialDelay Initial delay in milliseconds
231
* @param maxDelay Maximum delay in milliseconds
232
*/
233
public ExponentialWaitStrategy(long initialDelay, long maxDelay);
234
235
/**
236
* Calculates exponential backoff sleep time
237
* @param attemptCount Number of attempts made
238
* @return Sleep time with exponential backoff
239
*/
240
@Override
241
public long sleepTime(long attemptCount);
242
}
243
```
244
245
**Usage Example:**
246
247
```java
248
import org.apache.flink.client.program.rest.retry.ExponentialWaitStrategy;
249
import org.apache.flink.client.program.rest.retry.WaitStrategy;
250
251
// Create exponential backoff strategy
252
WaitStrategy waitStrategy = new ExponentialWaitStrategy(1000, 30000); // 1s initial, 30s max
253
254
// Calculate wait times for different attempts
255
for (int attempt = 0; attempt < 5; attempt++) {
256
long waitTime = waitStrategy.sleepTime(attempt);
257
System.out.println("Attempt " + attempt + ": wait " + waitTime + "ms");
258
}
259
// Output:
260
// Attempt 0: wait 1000ms
261
// Attempt 1: wait 2000ms
262
// Attempt 2: wait 4000ms
263
// Attempt 3: wait 8000ms
264
// Attempt 4: wait 16000ms
265
```
266
267
### REST Client Integration
268
269
The REST client integrates with the broader Flink client ecosystem through standard interfaces and provides HTTP-based communication with Flink clusters.
270
271
**Key Integration Points:**
272
273
1. **ClusterClient Interface**: RestClusterClient implements the standard ClusterClient interface, making it interchangeable with other client implementations
274
2. **Configuration Integration**: Uses standard Flink Configuration class for setup
275
3. **Retry Strategy**: Pluggable retry strategies for handling network failures and temporary unavailability
276
4. **Async Operations**: All operations return CompletableFuture for non-blocking execution
277
5. **Resource Management**: Implements AutoCloseable for proper resource cleanup
278
279
**REST Endpoints Used:**
280
281
```java { .api }
282
// Common REST endpoints accessed by RestClusterClient (for reference):
283
// GET /jobs - List jobs
284
// POST /jobs - Submit job
285
// GET /jobs/{jobid} - Get job details
286
// PATCH /jobs/{jobid} - Cancel job
287
// POST /jobs/{jobid}/savepoints - Trigger savepoint
288
// DELETE /jobs/{jobid}/savepoints/{savepointId} - Dispose savepoint
289
// POST /jobs/{jobid}/stop - Stop job with savepoint
290
// GET /jobs/{jobid}/accumulators - Get job accumulators
291
// POST /jobs/{jobid}/coordination/{operatorId} - Send coordination request
292
```
293
294
**Configuration Options:**
295
296
```java { .api }
297
// Important configuration keys for REST client (from Flink configuration):
298
// rest.address - REST server address
299
// rest.port - REST server port
300
// rest.connection-timeout - Connection timeout
301
// rest.idleness-timeout - Idleness timeout
302
// rest.await-leader-timeout - Leader election timeout
303
// rest.retry.max-attempts - Maximum retry attempts
304
// rest.retry.delay - Delay between retries
305
```
306
307
**Usage in Different Deployment Modes:**
308
309
```java
310
import org.apache.flink.client.program.rest.RestClusterClient;
311
312
// For standalone cluster
313
Configuration standaloneConfig = new Configuration();
314
standaloneConfig.setString("rest.address", "flink-master");
315
standaloneConfig.setInteger("rest.port", 8081);
316
317
RestClusterClient<StandaloneClusterId> standaloneClient =
318
new RestClusterClient<>(standaloneConfig, new StandaloneClusterId());
319
320
// For session cluster (e.g., YARN session)
321
Configuration sessionConfig = new Configuration();
322
sessionConfig.setString("yarn.application.id", "application_123456_0001");
323
sessionConfig.setString("rest.address", "yarn-session-master");
324
325
RestClusterClient<YarnClusterId> yarnClient =
326
new RestClusterClient<>(sessionConfig, yarnClusterId);
327
```
328
329
## Types
330
331
```java { .api }
332
/**
333
* REST client configuration for low-level HTTP settings
334
*/
335
public class RestClientConfiguration {
336
public long getConnectionTimeout();
337
public long getIdlenessTimeout();
338
public int getMaxContentLength();
339
public SSLHandlerFactory getSslHandlerFactory();
340
}
341
342
/**
343
* Retry strategy interface
344
*/
345
public interface RetryStrategy {
346
boolean canRetry(int attemptCount);
347
long getRetryDelay(int attemptCount);
348
}
349
350
/**
351
* HTTP-specific exceptions (typically from underlying REST framework)
352
*/
353
public class RestClientException extends Exception {
354
public RestClientException(String message);
355
public RestClientException(String message, Throwable cause);
356
}
357
358
/**
359
* Connection timeout exception
360
*/
361
public class ConnectionTimeoutException extends RestClientException {
362
public ConnectionTimeoutException(String message);
363
}
364
365
/**
366
* Leader retrieval exception for REST clients
367
*/
368
public class LeaderRetrievalException extends RestClientException {
369
public LeaderRetrievalException(String message);
370
public LeaderRetrievalException(String message, Throwable cause);
371
}
372
```
373
374
## Advanced Usage Patterns
375
376
### Custom Retry Strategy
377
378
```java
379
import org.apache.flink.client.program.rest.retry.WaitStrategy;
380
381
// Implement custom retry strategy
382
public class LinearWaitStrategy implements WaitStrategy {
383
private final long baseDelay;
384
private final long increment;
385
386
public LinearWaitStrategy(long baseDelay, long increment) {
387
this.baseDelay = baseDelay;
388
this.increment = increment;
389
}
390
391
@Override
392
public long sleepTime(long attemptCount) {
393
return baseDelay + (attemptCount * increment);
394
}
395
}
396
397
// Use custom strategy
398
WaitStrategy customStrategy = new LinearWaitStrategy(1000, 500);
399
RestClusterClient<T> client = new RestClusterClient<>(
400
config, clientConfig, clusterId, customStrategy);
401
```
402
403
### Monitoring and Error Handling
404
405
```java
406
import org.apache.flink.client.program.rest.RestClusterClient;
407
import java.util.concurrent.CompletionException;
408
409
try (RestClusterClient<StandaloneClusterId> client = createRestClient()) {
410
// Submit job with error handling
411
CompletableFuture<JobID> submitFuture = client.submitJob(jobGraph);
412
413
JobID jobId = submitFuture.handle((result, throwable) -> {
414
if (throwable != null) {
415
if (throwable instanceof CompletionException) {
416
Throwable cause = throwable.getCause();
417
if (cause instanceof RestClientException) {
418
System.err.println("REST communication failed: " + cause.getMessage());
419
} else {
420
System.err.println("Job submission failed: " + cause.getMessage());
421
}
422
}
423
return null;
424
}
425
return result;
426
}).get();
427
428
if (jobId != null) {
429
System.out.println("Job submitted successfully: " + jobId);
430
431
// Monitor job with periodic status checks
432
CompletableFuture<JobResult> resultFuture = client.requestJobResult(jobId);
433
JobResult result = resultFuture.get();
434
System.out.println("Job completed with status: " + result.getJobExecutionResult());
435
}
436
}
437
```