REST client implementations for communicating with Flink clusters through HTTP APIs, including retry strategies, configuration management, and comprehensive cluster operation support.
REST-based implementation of the ClusterClient interface for HTTP communication with Flink clusters.
/**
* REST-based implementation of {@link ClusterClient} that communicates with a Flink cluster over HTTP.
*
* <p>This listing shows public signatures only; method bodies are omitted. Every job-level
* operation below returns a {@link CompletableFuture}.
*
* @param <T> Type of cluster identifier
*/
public class RestClusterClient<T> implements ClusterClient<T> {
// NOTE: these are the only two constructors declared here; neither accepts a
// RestClusterClientConfiguration or a WaitStrategy.
/**
* Creates REST cluster client with basic configuration
* @param config Flink configuration
* @param clusterId Cluster identifier
*/
public RestClusterClient(Configuration config, T clusterId);
/**
* Creates REST cluster client with high availability services factory
* @param config Flink configuration
* @param clusterId Cluster identifier
* @param factory High availability services factory
*/
public RestClusterClient(
Configuration config,
T clusterId,
ClientHighAvailabilityServicesFactory factory);
// Implements all ClusterClient methods with REST-specific implementations
/** @return the cluster identifier (of type {@code T}) of this client */
@Override
public T getClusterId();
/** @return the Flink configuration of this client (presumably the one passed to the constructor — confirm) */
@Override
public Configuration getFlinkConfiguration();
/** Shuts down the cluster. Unlike the job operations, this returns {@code void} rather than a future. */
@Override
public void shutDownCluster();
/** @return the URL of the cluster's web interface */
@Override
public String getWebInterfaceURL();
/**
* Lists the jobs known to the cluster.
* @return future completing with one {@link JobStatusMessage} per job
* @throws Exception declared by this signature
*/
@Override
public CompletableFuture<Collection<JobStatusMessage>> listJobs() throws Exception;
/**
* Submits a job graph.
* @param jobGraph the job to submit
* @return future completing with the {@link JobID} of the submitted job
*/
@Override
public CompletableFuture<JobID> submitJob(JobGraph jobGraph);
/**
* @param jobId job to query
* @return future completing with the job's {@link JobStatus}
*/
@Override
public CompletableFuture<JobStatus> getJobStatus(JobID jobId);
/**
* Requests the {@link JobResult} of a job.
* @param jobId job to query
* @return future completing with the job's result
*/
@Override
public CompletableFuture<JobResult> requestJobResult(JobID jobId);
/**
* @param jobID job to query
* @return future completing with the job's accumulator values, keyed by {@code String}
*/
@Override
public CompletableFuture<Map<String, Object>> getAccumulators(JobID jobID);
/**
* Variant of {@link #getAccumulators(JobID)} that takes an explicit class loader
* (presumably used to deserialize user-defined accumulator values — confirm).
* @param jobID job to query
* @param loader class loader to use
* @return future completing with the job's accumulator values, keyed by {@code String}
*/
@Override
public CompletableFuture<Map<String, Object>> getAccumulators(JobID jobID, ClassLoader loader);
/**
* Cancels a job.
* @param jobId job to cancel
* @return future completing with an {@link Acknowledge}
*/
@Override
public CompletableFuture<Acknowledge> cancel(JobID jobId);
/**
* Cancels a job after taking a savepoint.
* @param jobId job to cancel
* @param savepointDirectory target directory for the savepoint
* @return future completing with a string (presumably the savepoint path — confirm)
*/
@Override
public CompletableFuture<String> cancelWithSavepoint(JobID jobId, String savepointDirectory);
/**
* Stops a job with a savepoint.
* @param jobId job to stop
* @param advanceToEndOfEventTime flag forwarded with the stop request
* @param savepointDirectory target directory for the savepoint
* @return future completing with a string (presumably the savepoint path — confirm)
*/
@Override
public CompletableFuture<String> stopWithSavepoint(
JobID jobId,
boolean advanceToEndOfEventTime,
String savepointDirectory);
/**
* Triggers a savepoint for a running job.
* @param jobId job to snapshot
* @param savepointDirectory target directory for the savepoint
* @return future completing with a string (presumably the savepoint path — confirm)
*/
@Override
public CompletableFuture<String> triggerSavepoint(JobID jobId, String savepointDirectory);
/**
* Disposes a savepoint. Note that the savepoint is identified by its path, not by a job id.
* @param savepointPath path of the savepoint to dispose
* @return future completing with an {@link Acknowledge}
* @throws FlinkException declared by this signature
*/
@Override
public CompletableFuture<Acknowledge> disposeSavepoint(String savepointPath) throws FlinkException;
/**
* Sends a coordination request to an operator of a job.
* @param jobId target job
* @param operatorId target operator within the job
* @param request the request payload
* @return future completing with the {@link CoordinationResponse}
*/
@Override
public CompletableFuture<CoordinationResponse> sendCoordinationRequest(
JobID jobId,
OperatorID operatorId,
CoordinationRequest request);
/** Closes this client. */
@Override
public void close();
}
Usage Example:
import org.apache.flink.client.deployment.StandaloneClusterId;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestOptions;

// Configure the REST client. Retry and leader-election settings are plain configuration
// options: RestClusterClientConfiguration exposes no builder, and the public
// RestClusterClient constructors take only (Configuration, clusterId[, HA factory]).
Configuration config = new Configuration();
config.setString(JobManagerOptions.ADDRESS, "localhost");
config.setInteger(JobManagerOptions.PORT, 6123);
config.setInteger(RestOptions.PORT, 8081);
config.setInteger(RestOptions.RETRY_MAX_ATTEMPTS, 10);
config.setLong(RestOptions.RETRY_DELAY, 1000L);           // milliseconds
config.setLong(RestOptions.AWAIT_LEADER_TIMEOUT, 30000L); // milliseconds

// Create the REST cluster client; try-with-resources calls close() when done.
try (RestClusterClient<StandaloneClusterId> client =
        new RestClusterClient<>(config, new StandaloneClusterId())) {
    // Submit job via REST
    JobGraph jobGraph = createJobGraph();
    JobID jobId = client.submitJob(jobGraph).get();
    System.out.println("Job submitted via REST: " + jobId);

    // Monitor job status
    JobStatus status = client.getJobStatus(jobId).get();
    System.out.println("Job status: " + status);

    // Create savepoint via REST
    String savepointPath = client.triggerSavepoint(jobId, "/path/to/savepoints").get();
    System.out.println("Savepoint created: " + savepointPath);
}
Configuration class for REST cluster clients with connection and retry settings.
/**
* Configuration for REST cluster clients
*
* <p>No public constructor or builder is declared in this listing; obtain instances via
* {@link #fromConfiguration(Configuration)}.
* NOTE(review): the getters presumably reflect RestOptions.AWAIT_LEADER_TIMEOUT,
* RETRY_MAX_ATTEMPTS and RETRY_DELAY from the source configuration — confirm.
*/
public final class RestClusterClientConfiguration {
/**
* Gets the REST client configuration
* @return RestClientConfiguration instance holding the low-level HTTP settings
*/
public RestClientConfiguration getRestClientConfiguration();
/**
* Gets the await leader timeout in milliseconds
* @return Timeout for awaiting cluster leader
*/
public long getAwaitLeaderTimeout();
/**
* Gets the maximum number of retry attempts
* @return Maximum retry attempts
*/
public int getRetryMaxAttempts();
/**
* Gets the retry delay in milliseconds
* @return Delay between retry attempts
*/
public long getRetryDelay();
/**
* Creates RestClusterClientConfiguration from Flink configuration
* @param config Flink configuration containing REST options
* @return RestClusterClientConfiguration instance
* @throws ConfigurationException If configuration is invalid (checked; callers must handle or propagate)
*/
public static RestClusterClientConfiguration fromConfiguration(Configuration config)
throws ConfigurationException;
}
Usage Example:
import org.apache.flink.client.program.rest.RestClusterClientConfiguration;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;

// Create Flink configuration with REST options
Configuration flinkConfig = new Configuration();
flinkConfig.setLong(RestOptions.AWAIT_LEADER_TIMEOUT, 60000L); // 60 seconds
flinkConfig.setInteger(RestOptions.RETRY_MAX_ATTEMPTS, 5);     // 5 retry attempts
flinkConfig.setLong(RestOptions.RETRY_DELAY, 2000L);           // 2 second delay

// Derive the REST client configuration. fromConfiguration declares the checked
// ConfigurationException, so the enclosing method must catch or propagate it.
RestClusterClientConfiguration clientConfig =
        RestClusterClientConfiguration.fromConfiguration(flinkConfig);
System.out.println("Leader timeout: " + clientConfig.getAwaitLeaderTimeout() + " ms");
System.out.println("Max retries: " + clientConfig.getRetryMaxAttempts());
System.out.println("Retry delay: " + clientConfig.getRetryDelay() + " ms");
Strategy implementations for handling retry logic in REST client operations.
/**
* Strategy for waiting between retry attempts
*
* <p>Declares a single abstract method, so it can be implemented with a lambda.
* NOTE(review): behaviour for a negative {@code attemptCount} is not specified here.
*/
public interface WaitStrategy {
/**
* Calculates sleep time for the given attempt count
* @param attemptCount Number of attempts made (starting from 0)
* @return Sleep time in milliseconds
*/
long sleepTime(long attemptCount);
}
/**
* Configuration options for REST client settings
*
* <p>Only the retry/leader options are listed here; the usage examples also reference
* {@code RestOptions.PORT}, which is not declared in this listing.
*/
public class RestOptions {
/** Presumably key {@code rest.await-leader-timeout}: timeout for awaiting the cluster leader (milliseconds — confirm). */
public static final ConfigOption<Long> AWAIT_LEADER_TIMEOUT;
/** Presumably key {@code rest.retry.max-attempts}: maximum number of retry attempts. */
public static final ConfigOption<Integer> RETRY_MAX_ATTEMPTS;
/** Presumably key {@code rest.retry.delay}: delay between retries (milliseconds — confirm). */
public static final ConfigOption<Long> RETRY_DELAY;
}
/**
* Exponential backoff wait strategy
*
* <p>The delay starts at {@code initialDelay} and doubles with each attempt
* (e.g. 1000, 2000, 4000 ms for an initial delay of 1000 ms), bounded by {@code maxDelay}.
*/
public class ExponentialWaitStrategy implements WaitStrategy {
/**
* Creates exponential wait strategy
* @param initialDelay Initial delay in milliseconds (returned for attempt 0)
* @param maxDelay Maximum delay in milliseconds
*/
public ExponentialWaitStrategy(long initialDelay, long maxDelay);
/**
* Calculates exponential backoff sleep time
* @param attemptCount Number of attempts made
* @return Sleep time with exponential backoff, in milliseconds
*/
@Override
public long sleepTime(long attemptCount);
}
Usage Example:
import org.apache.flink.client.program.rest.retry.ExponentialWaitStrategy;
import org.apache.flink.client.program.rest.retry.WaitStrategy;

// Exponential backoff that starts at one second and never waits longer than thirty seconds
final long initialDelayMs = 1000;
final long maxDelayMs = 30000;
WaitStrategy waitStrategy = new ExponentialWaitStrategy(initialDelayMs, maxDelayMs);

// Print the delay the strategy prescribes for each of the first five attempts
final int attemptsToShow = 5;
for (int i = 0; i < attemptsToShow; i++) {
    long delayMs = waitStrategy.sleepTime(i);
    System.out.println("Attempt " + i + ": wait " + delayMs + "ms");
}
// Output:
// Attempt 0: wait 1000ms
// Attempt 1: wait 2000ms
// Attempt 2: wait 4000ms
// Attempt 3: wait 8000ms
// Attempt 4: wait 16000ms
The REST client integrates with the broader Flink client ecosystem through standard interfaces and provides HTTP-based communication with Flink clusters.
Key Integration Points:
REST Endpoints Used:
// Common REST endpoints accessed by RestClusterClient (for reference):
// GET /jobs/overview - List jobs
// POST /jobs - Submit job
// GET /jobs/{jobid} - Get job details
// GET /jobs/{jobid}/execution-result - Request job result
// PATCH /jobs/{jobid} - Cancel job
// POST /jobs/{jobid}/savepoints - Trigger savepoint
// POST /savepoint-disposal - Dispose savepoint (the savepoint path is sent in the request body)
// POST /jobs/{jobid}/stop - Stop job with savepoint
// GET /jobs/{jobid}/accumulators - Get job accumulators
// POST /jobs/{jobid}/coordinators/{operatorid} - Send coordination request
Configuration Options:
// Important configuration keys for REST client (from Flink configuration):
// rest.address - REST server address
// rest.port - REST server port
// rest.connection-timeout - Connection timeout
// rest.idleness-timeout - Idleness timeout
// rest.await-leader-timeout - Leader election timeout
// rest.retry.max-attempts - Maximum retry attempts
// rest.retry.delay - Delay between retries
Usage in Different Deployment Modes:
import org.apache.flink.client.deployment.StandaloneClusterId;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;

// RestClusterClient declares close(); use try-with-resources so each client is released.

// For standalone cluster
Configuration standaloneConfig = new Configuration();
standaloneConfig.setString("rest.address", "flink-master");
standaloneConfig.setInteger("rest.port", 8081);
try (RestClusterClient<StandaloneClusterId> standaloneClient =
        new RestClusterClient<>(standaloneConfig, new StandaloneClusterId())) {
    // ... interact with the standalone cluster
}

// For session cluster (e.g., YARN session)
Configuration sessionConfig = new Configuration();
sessionConfig.setString("yarn.application.id", "application_123456_0001");
sessionConfig.setString("rest.address", "yarn-session-master");
// yarnClusterId must identify the running YARN session and is obtained from the YARN deployment.
// NOTE(review): Flink's YARN integration identifies clusters by a Hadoop ApplicationId;
// the YarnClusterId type is kept from the original listing — confirm for your Flink version.
try (RestClusterClient<YarnClusterId> yarnClient =
        new RestClusterClient<>(sessionConfig, yarnClusterId)) {
    // ... interact with the YARN session cluster
}
/**
* REST client configuration for low-level HTTP settings
*
* <p>Returned by {@code RestClusterClientConfiguration#getRestClientConfiguration()}; only
* read accessors are declared in this listing.
* NOTE(review): units are not stated here — timeouts are presumably milliseconds and
* maxContentLength presumably bytes; confirm.
*/
public class RestClientConfiguration {
/** Connection timeout (presumably configured via {@code rest.connection-timeout} — confirm). */
public long getConnectionTimeout();
/** Idleness timeout (presumably configured via {@code rest.idleness-timeout} — confirm). */
public long getIdlenessTimeout();
/** Maximum content length accepted by the client. */
public int getMaxContentLength();
/** SSL handler factory used for HTTPS connections (presumably null when SSL is disabled — confirm). */
public SSLHandlerFactory getSslHandlerFactory();
}
/**
* Retry strategy interface
*
* <p>NOTE(review): nothing else in this document takes or returns a RetryStrategy; retry
* delays for the REST client are otherwise expressed through WaitStrategy. Confirm this
* interface is part of the client API before relying on it.
*/
public interface RetryStrategy {
/**
* @param attemptCount number of attempts made so far
* @return whether another attempt may be made
*/
boolean canRetry(int attemptCount);
/**
* @param attemptCount number of attempts made so far
* @return delay before the next attempt (presumably milliseconds — confirm)
*/
long getRetryDelay(int attemptCount);
}
/**
* HTTP-specific exceptions (typically from underlying REST framework)
*
* <p>A checked exception (extends {@link Exception}); in this listing it is the base type of
* {@link ConnectionTimeoutException} and {@link LeaderRetrievalException}.
*/
public class RestClientException extends Exception {
/**
* @param message detail message
*/
public RestClientException(String message);
/**
* @param message detail message
* @param cause underlying cause, kept for exception chaining
*/
public RestClientException(String message, Throwable cause);
}
/**
* Connection timeout exception
*
* <p>Only a message constructor is declared, so no underlying cause can be attached.
*/
public class ConnectionTimeoutException extends RestClientException {
/**
* @param message detail message
*/
public ConnectionTimeoutException(String message);
}
/**
* Leader retrieval exception for REST clients
*
* <p>NOTE(review): the superclass shown here is RestClientException; confirm it against the
* Flink version in use before catching it via that type.
*/
public class LeaderRetrievalException extends RestClientException {
/**
* @param message detail message
*/
public LeaderRetrievalException(String message);
/**
* @param message detail message
* @param cause underlying cause, kept for exception chaining
*/
public LeaderRetrievalException(String message, Throwable cause);
}
import org.apache.flink.client.program.rest.retry.WaitStrategy;
// Implement custom retry strategy
/**
 * Linear backoff: waits {@code baseDelay + attemptCount * increment} milliseconds,
 * never exceeding {@code maxDelay}.
 */
public class LinearWaitStrategy implements WaitStrategy {
    private final long baseDelay;
    private final long increment;
    private final long maxDelay;

    /**
     * Creates a linear strategy bounded only by {@link Long#MAX_VALUE}.
     *
     * @param baseDelay delay for attempt 0, in milliseconds; must be non-negative
     * @param increment delay added per attempt, in milliseconds; must be non-negative
     * @throws IllegalArgumentException if an argument is negative
     */
    public LinearWaitStrategy(long baseDelay, long increment) {
        this(baseDelay, increment, Long.MAX_VALUE);
    }

    /**
     * Creates a linear strategy whose delay never exceeds {@code maxDelay}.
     *
     * @param baseDelay delay for attempt 0, in milliseconds; must be non-negative
     * @param increment delay added per attempt, in milliseconds; must be non-negative
     * @param maxDelay upper bound for any returned delay, in milliseconds; must be >= baseDelay
     * @throws IllegalArgumentException if an argument is negative or maxDelay < baseDelay
     */
    public LinearWaitStrategy(long baseDelay, long increment, long maxDelay) {
        if (baseDelay < 0 || increment < 0) {
            throw new IllegalArgumentException(
                    "baseDelay and increment must be non-negative: " + baseDelay + ", " + increment);
        }
        if (maxDelay < baseDelay) {
            throw new IllegalArgumentException(
                    "maxDelay (" + maxDelay + ") must not be smaller than baseDelay (" + baseDelay + ")");
        }
        this.baseDelay = baseDelay;
        this.increment = increment;
        this.maxDelay = maxDelay;
    }

    /**
     * @param attemptCount number of attempts made, starting from 0
     * @return sleep time in milliseconds, capped at {@code maxDelay}
     * @throws IllegalArgumentException if {@code attemptCount} is negative
     */
    @Override
    public long sleepTime(long attemptCount) {
        if (attemptCount < 0) {
            throw new IllegalArgumentException("attemptCount must not be negative: " + attemptCount);
        }
        try {
            // Exact arithmetic: a plain long product would wrap around to a negative sleep time.
            return Math.min(Math.addExact(baseDelay, Math.multiplyExact(attemptCount, increment)), maxDelay);
        } catch (ArithmeticException overflow) {
            // The true delay exceeds Long.MAX_VALUE and therefore any cap.
            return maxDelay;
        }
    }
}

// Use custom strategy: 1 s for the first attempt, growing by 500 ms per attempt
WaitStrategy customStrategy = new LinearWaitStrategy(1000, 500);
for (int attempt = 0; attempt < 3; attempt++) {
    System.out.println("Attempt " + attempt + ": wait " + customStrategy.sleepTime(attempt) + "ms");
}
// NOTE(review): the RestClusterClient constructors declared in this document take
// (Configuration, T) or (Configuration, T, ClientHighAvailabilityServicesFactory); none accepts
// a WaitStrategy. Plugging a custom strategy into the client therefore needs a constructor
// outside the public API shown here — confirm availability for your Flink version.
import org.apache.flink.client.program.rest.RestClusterClient;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

try (RestClusterClient<StandaloneClusterId> client = createRestClient()) {
    // Submit job with error handling
    CompletableFuture<JobID> submitFuture = client.submitJob(jobGraph);
    JobID jobId = submitFuture.handle((result, throwable) -> {
        if (throwable == null) {
            return result;
        }
        // handle() receives the failure wrapped in a CompletionException when it came from a
        // dependent stage, but unwrapped when the future itself completed exceptionally.
        // Unwrap only when needed so that no failure is silently dropped.
        Throwable cause = (throwable instanceof CompletionException && throwable.getCause() != null)
                ? throwable.getCause()
                : throwable;
        if (cause instanceof RestClientException) {
            System.err.println("REST communication failed: " + cause.getMessage());
        } else {
            System.err.println("Job submission failed: " + cause.getMessage());
        }
        return null; // null signals the failure to the code below
    }).get();

    if (jobId != null) {
        System.out.println("Job submitted successfully: " + jobId);
        // Block until the job result is available (a single request, no polling loop)
        CompletableFuture<JobResult> resultFuture = client.requestJobResult(jobId);
        JobResult result = resultFuture.get();
        System.out.println("Job completed with status: " + result.getApplicationStatus());
    }
}