Flink Client APIs and utilities for submitting and managing Apache Flink jobs
REST client implementations for communicating with Flink clusters through HTTP APIs, including retry strategies, configuration management, and comprehensive cluster operation support.
REST-based implementation of the ClusterClient interface for HTTP communication with Flink clusters.
/**
 * REST-based cluster client implementation: a {@link ClusterClient} that talks to a Flink
 * cluster over HTTP.
 *
 * <p>The client implements {@link #close()}, so it can be used with try-with-resources.
 *
 * @param <T> Type of cluster identifier
 */
public class RestClusterClient<T> implements ClusterClient<T> {
    /**
     * Creates REST cluster client with basic configuration.
     *
     * @param config Flink configuration (REST address/port and {@code rest.*} retry options)
     * @param clusterId Cluster identifier
     * @throws Exception If the client cannot be created
     */
    public RestClusterClient(Configuration config, T clusterId) throws Exception;

    /**
     * Creates REST cluster client with a high availability services factory.
     *
     * @param config Flink configuration
     * @param clusterId Cluster identifier
     * @param factory High availability services factory
     * @throws Exception If the client cannot be created
     */
    public RestClusterClient(
            Configuration config,
            T clusterId,
            ClientHighAvailabilityServicesFactory factory) throws Exception;

    // Implements all ClusterClient methods with REST-specific implementations.

    @Override
    public T getClusterId();

    @Override
    public Configuration getFlinkConfiguration();

    @Override
    public void shutDownCluster();

    @Override
    public String getWebInterfaceURL();

    @Override
    public CompletableFuture<Collection<JobStatusMessage>> listJobs() throws Exception;

    /** Submits a job graph; the future completes with the submitted job's {@link JobID}. */
    @Override
    public CompletableFuture<JobID> submitJob(JobGraph jobGraph);

    @Override
    public CompletableFuture<JobStatus> getJobStatus(JobID jobId);

    /** Requests the {@link JobResult} of the given job. */
    @Override
    public CompletableFuture<JobResult> requestJobResult(JobID jobId);

    @Override
    public CompletableFuture<Map<String, Object>> getAccumulators(JobID jobID);

    @Override
    public CompletableFuture<Map<String, Object>> getAccumulators(JobID jobID, ClassLoader loader);

    @Override
    public CompletableFuture<Acknowledge> cancel(JobID jobId);

    // Savepoint operations: savepointDirectory is the target directory
    // (e.g. "/path/to/savepoints"); the String futures complete with the savepoint path.

    @Override
    public CompletableFuture<String> cancelWithSavepoint(JobID jobId, String savepointDirectory);

    @Override
    public CompletableFuture<String> stopWithSavepoint(
            JobID jobId,
            boolean advanceToEndOfEventTime,
            String savepointDirectory);

    @Override
    public CompletableFuture<String> triggerSavepoint(JobID jobId, String savepointDirectory);

    @Override
    public CompletableFuture<Acknowledge> disposeSavepoint(String savepointPath) throws FlinkException;

    @Override
    public CompletableFuture<CoordinationResponse> sendCoordinationRequest(
            JobID jobId,
            OperatorID operatorId,
            CoordinationRequest request);

    @Override
    public void close();
}
Usage Example:
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.client.deployment.StandaloneClusterId;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;
// Configure REST client. Retry and leader-timeout settings are supplied through the Flink
// Configuration (RestOptions); the public constructors take only a Configuration and a cluster id.
Configuration config = new Configuration();
config.setString(JobManagerOptions.ADDRESS, "localhost");
config.setInteger(JobManagerOptions.PORT, 6123);
config.setInteger(RestOptions.PORT, 8081);
config.setInteger(RestOptions.RETRY_MAX_ATTEMPTS, 10);
config.setLong(RestOptions.RETRY_DELAY, 1000L);           // milliseconds
config.setLong(RestOptions.AWAIT_LEADER_TIMEOUT, 30000L); // milliseconds
// Create REST cluster client (run inside a method that handles the constructor's checked exception)
try (RestClusterClient<StandaloneClusterId> client =
        new RestClusterClient<>(config, StandaloneClusterId.getInstance())) {
    // Submit job via REST
    JobGraph jobGraph = createJobGraph();
    JobID jobId = client.submitJob(jobGraph).get();
    System.out.println("Job submitted via REST: " + jobId);
    // Monitor job status
    JobStatus status = client.getJobStatus(jobId).get();
    System.out.println("Job status: " + status);
    // Create savepoint via REST; the future completes with the savepoint path
    String savepointPath = client.triggerSavepoint(jobId, "/path/to/savepoints").get();
    System.out.println("Savepoint created: " + savepointPath);
}
Configuration class for REST cluster clients with connection and retry settings.
/**
* Configuration for REST cluster clients: the low-level REST client settings plus the
* retry and leader-awaiting timings used by the cluster client.
*
* <p>No public constructor is declared here; instances are obtained through
* {@link #fromConfiguration(Configuration)}.
*/
public final class RestClusterClientConfiguration {
/**
* Gets the low-level REST client configuration (connection and idleness timeouts,
* maximum content length, SSL handler factory).
* @return RestClientConfiguration instance
*/
public RestClientConfiguration getRestClientConfiguration();
/**
* Gets the await leader timeout in milliseconds, as set via RestOptions.AWAIT_LEADER_TIMEOUT
* ({@code rest.await-leader-timeout}).
* @return Timeout for awaiting cluster leader
*/
public long getAwaitLeaderTimeout();
/**
* Gets the maximum number of retry attempts, as set via RestOptions.RETRY_MAX_ATTEMPTS
* ({@code rest.retry.max-attempts}).
* @return Maximum retry attempts
*/
public int getRetryMaxAttempts();
/**
* Gets the retry delay in milliseconds, as set via RestOptions.RETRY_DELAY
* ({@code rest.retry.delay}).
* @return Delay between retry attempts
*/
public long getRetryDelay();
/**
* Creates RestClusterClientConfiguration from the {@code rest.*} options of a Flink
* configuration.
* @param config Flink configuration containing REST options
* @return RestClusterClientConfiguration instance
* @throws ConfigurationException If configuration is invalid (checked; callers must handle it)
*/
public static RestClusterClientConfiguration fromConfiguration(Configuration config)
throws ConfigurationException;
}Usage Example:
import org.apache.flink.client.program.rest.RestClusterClientConfiguration;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.util.ConfigurationException;
// Create Flink configuration with REST options
Configuration flinkConfig = new Configuration();
flinkConfig.setLong(RestOptions.AWAIT_LEADER_TIMEOUT, 60000L); // 60 seconds
flinkConfig.setInteger(RestOptions.RETRY_MAX_ATTEMPTS, 5); // 5 retry attempts
flinkConfig.setLong(RestOptions.RETRY_DELAY, 2000L); // 2 second delay
// Create REST client configuration from Flink configuration.
// fromConfiguration declares the checked ConfigurationException, so handle it explicitly.
try {
    RestClusterClientConfiguration config = RestClusterClientConfiguration.fromConfiguration(flinkConfig);
    System.out.println("Leader timeout: " + config.getAwaitLeaderTimeout());
    System.out.println("Max retries: " + config.getRetryMaxAttempts());
    System.out.println("Retry delay: " + config.getRetryDelay());
} catch (ConfigurationException e) {
    // Keep the original exception as the cause so the root problem is not lost
    throw new IllegalStateException("Invalid REST client configuration", e);
}
Strategy implementations for handling retry logic in REST client operations.
/**
* Strategy for waiting between retry attempts.
*
* <p>An implementation maps an attempt number to a delay; ExponentialWaitStrategy is the
* exponential-backoff implementation described in this document.
*/
public interface WaitStrategy {
/**
* Calculates how long to sleep before the next attempt.
* @param attemptCount Number of attempts made so far (starting from 0)
* @return Sleep time in milliseconds
*/
long sleepTime(long attemptCount);
}
/**
 * Configuration options for REST client settings ({@code rest.*} keys).
 *
 * <p>Values can be set through the typed options below or by their string keys,
 * e.g. {@code config.setInteger("rest.port", 8081)}.
 */
public class RestOptions {
    /** {@code rest.address}: address of the REST server. */
    public static final ConfigOption<String> ADDRESS;
    /** {@code rest.port}: port of the REST server. */
    public static final ConfigOption<Integer> PORT;
    /** {@code rest.connection-timeout}: connection timeout in milliseconds. */
    public static final ConfigOption<Long> CONNECTION_TIMEOUT;
    /** {@code rest.idleness-timeout}: idleness timeout in milliseconds. */
    public static final ConfigOption<Long> IDLENESS_TIMEOUT;
    /** {@code rest.await-leader-timeout}: time in milliseconds to wait for the cluster leader. */
    public static final ConfigOption<Long> AWAIT_LEADER_TIMEOUT;
    /** {@code rest.retry.max-attempts}: maximum number of retry attempts. */
    public static final ConfigOption<Integer> RETRY_MAX_ATTEMPTS;
    /** {@code rest.retry.delay}: delay between retry attempts in milliseconds. */
    public static final ConfigOption<Long> RETRY_DELAY;
}
/**
* Exponential backoff wait strategy: the delay starts at {@code initialDelay}, doubles with
* each attempt (e.g. 1000, 2000, 4000 ms for initialDelay = 1000) and is bounded by
* {@code maxDelay}.
*/
public class ExponentialWaitStrategy implements WaitStrategy {
/**
* Creates exponential wait strategy
* @param initialDelay Delay in milliseconds for attempt 0
* @param maxDelay Upper bound in milliseconds for any computed delay
*/
public ExponentialWaitStrategy(long initialDelay, long maxDelay);
/**
* Calculates exponential backoff sleep time
* @param attemptCount Number of attempts made (starting from 0)
* @return Sleep time with exponential backoff, in milliseconds, never above maxDelay
*/
@Override
public long sleepTime(long attemptCount);
}Usage Example:
import org.apache.flink.client.program.rest.retry.ExponentialWaitStrategy;
import org.apache.flink.client.program.rest.retry.WaitStrategy;
// Create exponential backoff strategy: 1s initial delay, never more than 30s
WaitStrategy waitStrategy = new ExponentialWaitStrategy(1000, 30000);
// Calculate wait times for successive attempts (attempt numbers start at 0).
// Six attempts are printed so the output also shows the maxDelay cap taking effect.
for (int attempt = 0; attempt < 6; attempt++) {
    long waitTime = waitStrategy.sleepTime(attempt);
    System.out.println("Attempt " + attempt + ": wait " + waitTime + "ms");
}
// Output:
// Attempt 0: wait 1000ms
// Attempt 1: wait 2000ms
// Attempt 2: wait 4000ms
// Attempt 3: wait 8000ms
// Attempt 4: wait 16000ms
// Attempt 5: wait 30000ms  (32000ms would exceed maxDelay, so the cap applies)
The REST client integrates with the broader Flink client ecosystem through standard interfaces and provides HTTP-based communication with Flink clusters.
Key Integration Points:
REST Endpoints Used:
// Common REST endpoints accessed by RestClusterClient (for reference):
// GET /jobs - List jobs
// POST /jobs - Submit job
// GET /jobs/{jobid} - Get job details
// PATCH /jobs/{jobid} - Cancel job
// POST /jobs/{jobid}/savepoints - Trigger savepoint
// POST /savepoint-disposal - Dispose savepoint (asynchronous; poll GET /savepoint-disposal/{triggerid} for the outcome)
// POST /jobs/{jobid}/stop - Stop job with savepoint
// GET /jobs/{jobid}/accumulators - Get job accumulators
// POST /jobs/{jobid}/coordinators/{operatorid} - Send coordination request
Configuration Options:
// Important configuration keys for REST client (from Flink configuration):
// rest.address - REST server address
// rest.port - REST server port
// rest.connection-timeout - Connection timeout
// rest.idleness-timeout - Idleness timeout
// rest.await-leader-timeout - Time (ms) the client waits to retrieve the cluster leader's address
// rest.retry.max-attempts - Maximum retry attempts
// rest.retry.delay - Delay between retries (ms)
Usage in Different Deployment Modes:
import org.apache.flink.client.deployment.StandaloneClusterId;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
// For standalone cluster (StandaloneClusterId is obtained via its factory method)
Configuration standaloneConfig = new Configuration();
standaloneConfig.setString("rest.address", "flink-master");
standaloneConfig.setInteger("rest.port", 8081);
RestClusterClient<StandaloneClusterId> standaloneClient =
    new RestClusterClient<>(standaloneConfig, StandaloneClusterId.getInstance());
// For session cluster (e.g., YARN session): the cluster id is the YARN ApplicationId
String yarnApplicationIdString = "application_123456_0001";
Configuration sessionConfig = new Configuration();
sessionConfig.setString("yarn.application.id", yarnApplicationIdString);
sessionConfig.setString("rest.address", "yarn-session-master");
ApplicationId yarnApplicationId = ApplicationId.fromString(yarnApplicationIdString);
RestClusterClient<ApplicationId> yarnClient =
    new RestClusterClient<>(sessionConfig, yarnApplicationId);
// Both clients implement close(); close them when done (e.g. with try-with-resources).
/**
* REST client configuration for low-level HTTP settings, as returned by
* RestClusterClientConfiguration#getRestClientConfiguration().
*/
public class RestClientConfiguration {
/** Connection timeout; presumably milliseconds, backed by rest.connection-timeout — confirm. */
public long getConnectionTimeout();
/** Idleness timeout; presumably milliseconds, backed by rest.idleness-timeout — confirm. */
public long getIdlenessTimeout();
/** Maximum content length of HTTP messages (units not shown here; presumably bytes). */
public int getMaxContentLength();
/** Factory for SSL handlers. NOTE(review): likely null when SSL is disabled — confirm. */
public SSLHandlerFactory getSslHandlerFactory();
}
/**
* Retry strategy interface
*
* <p>NOTE(review): Flink's own RetryStrategy (used with FutureUtils.retryWithDelay) appears to
* expose getNumRemainingRetries(), getRetryDelay() and getNextRetryStrategy() rather than the
* methods below, and RestClusterClient's retries are configured through the rest.retry.*
* options — confirm these signatures against the documented Flink version.
*/
public interface RetryStrategy {
/** Whether another attempt is allowed after {@code attemptCount} attempts (per this stub). */
boolean canRetry(int attemptCount);
/** Delay before the next attempt; units not stated here (presumably milliseconds). */
long getRetryDelay(int attemptCount);
}
/**
* HTTP-specific exceptions (typically from underlying REST framework)
*/
public class RestClientException extends Exception {
public RestClientException(String message);
public RestClientException(String message, Throwable cause);
}
/**
* Connection timeout exception
*
* <p>NOTE(review): Flink's REST client appears to report connection problems through
* org.apache.flink.runtime.rest.ConnectionException and its subclasses (e.g. an idle/closed
* connection) rather than a ConnectionTimeoutException — confirm this type exists in the
* documented Flink version before relying on it.
*/
public class ConnectionTimeoutException extends RestClientException {
public ConnectionTimeoutException(String message);
}
/**
 * Signals that the current leader (for a REST client: the leader's REST endpoint address)
 * could not be retrieved ({@code org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException}).
 *
 * <p>It extends {@link FlinkException} directly and is not a {@code RestClientException},
 * so handlers that only look for RestClientException will not match it.
 */
public class LeaderRetrievalException extends FlinkException {
    public LeaderRetrievalException(String message);

    public LeaderRetrievalException(Throwable cause);

    public LeaderRetrievalException(String message, Throwable cause);
}
import org.apache.flink.client.program.rest.retry.WaitStrategy;
// Implement a custom wait strategy with linear backoff
public class LinearWaitStrategy implements WaitStrategy {
    private final long baseDelay;
    private final long increment;

    /**
     * Creates a linear wait strategy.
     *
     * @param baseDelay delay for attempt 0, in milliseconds; must not be negative
     * @param increment delay added for each further attempt, in milliseconds; must not be negative
     * @throws IllegalArgumentException if either argument is negative
     */
    public LinearWaitStrategy(long baseDelay, long increment) {
        if (baseDelay < 0 || increment < 0) {
            throw new IllegalArgumentException(
                    "baseDelay and increment must not be negative (baseDelay="
                            + baseDelay + ", increment=" + increment + ")");
        }
        this.baseDelay = baseDelay;
        this.increment = increment;
    }

    /**
     * Returns {@code baseDelay + attemptCount * increment} milliseconds.
     *
     * <p>The result saturates at {@link Long#MAX_VALUE}: without the guard, a large attempt
     * count would overflow into a negative (invalid) sleep time.
     *
     * @throws IllegalArgumentException if {@code attemptCount} is negative
     */
    @Override
    public long sleepTime(long attemptCount) {
        if (attemptCount < 0) {
            throw new IllegalArgumentException(
                    "attemptCount must not be negative (" + attemptCount + ")");
        }
        // attemptCount * increment + baseDelay <= MAX  <=>  attemptCount <= (MAX - baseDelay) / increment
        if (increment != 0 && attemptCount > (Long.MAX_VALUE - baseDelay) / increment) {
            return Long.MAX_VALUE;
        }
        return baseDelay + attemptCount * increment;
    }
}
// Use the custom strategy. The public RestClusterClient constructors accept only a
// Configuration, a cluster id and optionally a ClientHighAvailabilityServicesFactory, so a
// WaitStrategy cannot be passed to them; use it to pace your own retry or polling loops.
WaitStrategy customStrategy = new LinearWaitStrategy(1000, 500);
long delayForThirdAttempt = customStrategy.sleepTime(2); // 1000 + 2 * 500 = 2000 ms
import org.apache.flink.client.program.rest.RestClusterClient;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.api.common.JobID;
import org.apache.flink.client.deployment.StandaloneClusterId;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.rest.util.RestClientException;
import org.apache.flink.util.ExceptionUtils;
try (RestClusterClient<StandaloneClusterId> client = createRestClient()) {
    // Submit job with error handling
    CompletableFuture<JobID> submitFuture = client.submitJob(jobGraph);
    JobID jobId = submitFuture.handle((result, throwable) -> {
        if (throwable == null) {
            return result;
        }
        // handle() sees either the raw failure or a CompletionException wrapper, depending on
        // which stage failed; strip the wrapper so every failure gets reported.
        Throwable cause = ExceptionUtils.stripCompletionException(throwable);
        // The REST error is typically nested inside a submission exception, so search the
        // whole cause chain rather than checking only the direct cause.
        Optional<RestClientException> restFailure =
                ExceptionUtils.findThrowable(cause, RestClientException.class);
        if (restFailure.isPresent()) {
            System.err.println("REST communication failed: " + restFailure.get().getMessage());
        } else {
            System.err.println("Job submission failed: " + cause.getMessage());
        }
        return null;
    }).get();
    if (jobId != null) {
        System.out.println("Job submitted successfully: " + jobId);
        // Block until the job's final result is available
        CompletableFuture<JobResult> resultFuture = client.requestJobResult(jobId);
        JobResult result = resultFuture.get();
        System.out.println("Job completed with status: " + result.getApplicationStatus());
    }
}
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-clients-2-12