Apache Flink Client APIs and utilities for submitting and interacting with Flink jobs
—
The Apache Flink REST Client Communication module (org.apache.flink.client.program.rest.*) provides REST-based cluster communication capabilities with comprehensive retry logic, SSL support, and configuration management for remote cluster interaction. This module enables reliable communication with Flink clusters over HTTP/HTTPS protocols.
REST-based implementation of the ClusterClient interface for communicating with remote Flink clusters.
/**
 * REST-based {@link ClusterClient}: job submission, job control, savepoint handling and
 * coordination requests are sent to a remote Flink cluster over REST. Every job/savepoint
 * operation returns a {@link CompletableFuture}; block on it (e.g. {@code get()}) or compose it.
 *
 * <p>NOTE(review): this listing shows signatures only (bodies are elided). Upstream Flink's
 * public constructor is {@code RestClusterClient(Configuration, T)} and declares
 * {@code throws Exception}; verify the overloads listed here against the targeted Flink release.
 *
 * @param <T> type of the cluster identifier (the examples use {@code StandaloneClusterId})
 */
public class RestClusterClient<T> implements ClusterClient<T> {
    // Constructors

    /**
     * Creates a client for the cluster identified by {@code clusterId}.
     *
     * @param configuration Flink configuration (REST address/port, SSL settings, ...)
     * @param restClusterClientConfiguration REST client settings, as produced by
     *     {@code RestClusterClientConfiguration.fromConfiguration(configuration)} in the examples
     * @param clusterId identifier of the target cluster
     */
    public RestClusterClient(Configuration configuration,
            RestClusterClientConfiguration restClusterClientConfiguration,
            T clusterId) { }

    /**
     * Same as the three-argument constructor, but with an explicit {@link WaitStrategy}
     * (e.g. an {@code ExponentialWaitStrategy}) controlling the pause between repeated attempts.
     *
     * @param waitStrategy strategy that yields the sleep time for a given attempt
     */
    public RestClusterClient(Configuration configuration,
            RestClusterClientConfiguration restClusterClientConfiguration,
            T clusterId,
            WaitStrategy waitStrategy) { }

    // ClusterClient interface implementation

    /** Identifier of the cluster this client talks to. */
    public T getClusterId() { }

    /** Flink configuration associated with this client (presumably the one passed in). */
    public Configuration getFlinkConfiguration() { }

    /** URL of the cluster's web interface. */
    public String getWebInterfaceURL() { }

    // Job management via REST

    /** Lists the cluster's jobs; each entry exposes at least the job name and state. */
    public CompletableFuture<Collection<JobStatusMessage>> listJobs() { }

    /** Submits {@code jobGraph}; the future yields the submitted job's ID. */
    public CompletableFuture<JobID> submitJob(JobGraph jobGraph) { }

    /** Requests the current status of job {@code jobId}. */
    public CompletableFuture<JobStatus> getJobStatus(JobID jobId) { }

    /** Requests the result of job {@code jobId} (presumably completes once the job has ended — confirm). */
    public CompletableFuture<JobResult> requestJobResult(JobID jobId) { }

    // Job control operations

    /** Cancels job {@code jobId}. */
    public CompletableFuture<Acknowledge> cancel(JobID jobId) { }

    /**
     * Cancels job {@code jobId} after taking a savepoint into {@code savepointDirectory}.
     * The returned string is presumably the savepoint path — confirm.
     */
    public CompletableFuture<String> cancelWithSavepoint(JobID jobId, String savepointDirectory) { }

    /**
     * Stops job {@code jobId} with a final savepoint; the future yields the savepoint path.
     *
     * @param advanceToEndOfEventTime whether event time is advanced to its end before stopping
     *     (TODO confirm exact semantics against the Flink docs)
     * @param savepointDirectory target directory for the savepoint
     */
    public CompletableFuture<String> stopWithSavepoint(JobID jobId,
            boolean advanceToEndOfEventTime,
            String savepointDirectory) { }

    // Savepoint operations

    /** Triggers a savepoint of job {@code jobId}; the future yields the created savepoint's path. */
    public CompletableFuture<String> triggerSavepoint(JobID jobId, String savepointDirectory) { }

    /** Disposes the savepoint stored at {@code savepointPath}. */
    public CompletableFuture<Acknowledge> disposeSavepoint(String savepointPath) { }

    // Metrics and coordination

    /**
     * Fetches the accumulators of job {@code jobID} as a name-to-value map.
     *
     * @param loader class loader presumably used to deserialize accumulator values — confirm
     */
    public CompletableFuture<Map<String, Object>> getAccumulators(JobID jobID, ClassLoader loader) { }

    /** Sends {@code request} to the coordinator of operator {@code operatorId} in job {@code jobId}. */
    public CompletableFuture<CoordinationResponse> sendCoordinationRequest(JobID jobId,
            OperatorID operatorId,
            CoordinationRequest request) { }

    // Cluster management

    /** Shuts down the remote cluster. */
    public void shutDownCluster() { }

    /** Releases this client's resources; the examples always call it when done with the client. */
    public void close() { }
}
Configuration class for REST cluster client settings including timeouts, retries, and SSL parameters.
/**
 * REST client settings derived from a Flink {@link Configuration}: connection timeouts,
 * retry behaviour and SSL parameters.
 *
 * <p>NOTE(review): signatures only (bodies are elided). {@code AwaitingTime} and
 * {@code SSLHandlerFactory} are not among the imports listed in this document, and
 * {@code getTrustStore()}/{@code getKeyStore()} return {@code String[]} although the examples
 * configure {@code security.ssl.rest.truststore}/{@code keystore} as a single path string.
 * Verify these accessors against the targeted Flink release.
 */
public class RestClusterClientConfiguration {
    // Factory method

    /** Builds the REST client settings from {@code config} (see the option keys listed below). */
    public static RestClusterClientConfiguration fromConfiguration(Configuration config) { }

    // Connection configuration

    /** Connection timeout, presumably from {@code rest.connection-timeout} (units: confirm). */
    public long getConnectionTimeout() { }

    /** Idle-connection timeout, presumably from {@code rest.idleness-timeout} (units: confirm). */
    public long getIdlenessTimeout() { }

    /** Maximum time to wait for a cluster leader — TODO confirm type and units. */
    public AwaitingTime getAwaitLeaderTimeout() { }

    // Retry configuration

    /** Maximum number of retry attempts, presumably from {@code rest.retry.max-attempts}. */
    public int getMaxRetryAttempts() { }

    /** Delay between retries, presumably from {@code rest.retry.delay} (units: confirm). */
    public long getRetryDelay() { }

    // SSL configuration

    /** Factory for SSL handlers when REST SSL is enabled (presumably null/absent otherwise — confirm). */
    public SSLHandlerFactory getSslHandlerFactory() { }

    /** Truststore location(s). */
    public String[] getTrustStore() { }

    /** Truststore password. */
    public String getTrustStorePassword() { }

    /** Keystore location(s). */
    public String[] getKeyStore() { }

    /** Keystore password. */
    public String getKeyStorePassword() { }

    /** SSL protocol in use. */
    public String getSSLProtocol() { }

    /** Enabled SSL algorithms. */
    public String[] getSSLAlgorithms() { }
}
Interface for defining wait strategies in retry mechanisms.
/**
 * Strategy that decides how long to pause before the next attempt of a repeated operation.
 */
public interface WaitStrategy {
    // Calculate sleep time for a given attempt

    /**
     * Returns how long to sleep before {@code attempt}.
     *
     * @param attempt the attempt number (presumably 0-based — confirm against implementations)
     * @return the sleep time; the ExponentialWaitStrategy example in this document treats the
     *     values as milliseconds (1000 = "1 second"), so presumably milliseconds
     */
    long sleepTime(long attempt);
}
Exponential backoff implementation of wait strategy for retry operations.
/**
 * {@link WaitStrategy} with exponential backoff: the sleep time grows with the attempt number,
 * starting from {@code initialWait} and never exceeding {@code maxWait}.
 */
public class ExponentialWaitStrategy implements WaitStrategy {
    // Constructor

    /**
     * @param initialWait sleep time for the first attempt (presumably milliseconds)
     * @param maxWait upper bound on the sleep time (presumably milliseconds)
     */
    public ExponentialWaitStrategy(long initialWait, long maxWait) { }

    // WaitStrategy implementation

    /** Exponentially growing sleep time for {@code attempt}, capped at {@code maxWait}. */
    public long sleepTime(long attempt) { }
}
The REST client integrates with Flink's configuration system through several key configuration options:
- `rest.address`: REST endpoint hostname
- `rest.port`: REST endpoint port
- `rest.connection-timeout`: maximum time to establish a connection (milliseconds)
- `rest.idleness-timeout`: maximum time a connection may stay idle (milliseconds)
- `security.ssl.rest.enabled`: enable SSL for REST connections
- `security.ssl.rest.keystore`: path to the SSL keystore
- `security.ssl.rest.keystore-password`: keystore password
- `security.ssl.rest.truststore`: path to the SSL truststore
- `security.ssl.rest.truststore-password`: truststore password
- `rest.retry.max-attempts`: maximum number of retry attempts
- `rest.retry.delay`: delay between retry attempts (milliseconds)

// Configure REST connection
Configuration config = new Configuration();
config.setString("rest.address", "flink-cluster.example.com");
config.setInteger("rest.port", 8081);
// REST timeouts are long-valued options expressed in milliseconds
config.setLong("rest.connection-timeout", Duration.ofSeconds(30).toMillis());
config.setInteger("rest.retry.max-attempts", 3);
// Create REST client configuration
RestClusterClientConfiguration restConfig =
        RestClusterClientConfiguration.fromConfiguration(config);
// StandaloneClusterId is a singleton (private constructor): obtain it via getInstance()
StandaloneClusterId clusterId = StandaloneClusterId.getInstance();
// Create REST cluster client; try-with-resources closes it even if an operation fails
try (RestClusterClient<StandaloneClusterId> client =
        new RestClusterClient<>(config, restConfig, clusterId)) {
    // Use the client for operations
    String webUrl = client.getWebInterfaceURL();
    System.out.println("Cluster web interface: " + webUrl);
    // List the cluster's jobs
    CompletableFuture<Collection<JobStatusMessage>> jobsFuture = client.listJobs();
    Collection<JobStatusMessage> jobs = jobsFuture.get();
    for (JobStatusMessage job : jobs) {
        System.out.println("Job: " + job.getJobName() + " - " + job.getJobState());
    }
}
// Create custom exponential backoff strategy
WaitStrategy waitStrategy = new ExponentialWaitStrategy(
        1000, // initial wait: 1 second
        30000 // max wait: 30 seconds
);
// Basic connection configuration
Configuration config = new Configuration();
config.setString("rest.address", "localhost");
config.setInteger("rest.port", 8081);
RestClusterClientConfiguration restConfig =
        RestClusterClientConfiguration.fromConfiguration(config);
// Create client with custom wait strategy; StandaloneClusterId is a singleton (getInstance())
try (RestClusterClient<StandaloneClusterId> client = new RestClusterClient<>(
        config, restConfig, StandaloneClusterId.getInstance(), waitStrategy)) {
    // Submit job
    JobGraph jobGraph = /* create job graph */;
    CompletableFuture<JobID> submitFuture = client.submitJob(jobGraph);
    JobID jobId = submitFuture.get();
    // Query the job's current status
    CompletableFuture<JobStatus> statusFuture = client.getJobStatus(jobId);
    JobStatus status = statusFuture.get();
    System.out.println("Job " + jobId + " status: " + status);
}
// Configure SSL settings
Configuration config = new Configuration();
config.setString("rest.address", "secure-flink.example.com");
config.setInteger("rest.port", 8443);
config.setBoolean("security.ssl.rest.enabled", true);
config.setString("security.ssl.rest.keystore", "/path/to/keystore.jks");
config.setString("security.ssl.rest.keystore-password", "keystorePassword");
config.setString("security.ssl.rest.truststore", "/path/to/truststore.jks");
config.setString("security.ssl.rest.truststore-password", "truststorePassword");
// Create SSL-enabled REST client (StandaloneClusterId is a singleton: use getInstance())
RestClusterClientConfiguration restConfig =
        RestClusterClientConfiguration.fromConfiguration(config);
try (RestClusterClient<StandaloneClusterId> client =
        new RestClusterClient<>(config, restConfig, StandaloneClusterId.getInstance())) {
    // All operations now use HTTPS
    CompletableFuture<Collection<JobStatusMessage>> jobsFuture = client.listJobs();
    // ... handle response
}
// REST client setup
try (RestClusterClient<StandaloneClusterId> client = /* create client */) {
    // Submit job
    JobGraph jobGraph = /* your job graph */;
    CompletableFuture<JobID> submitFuture = client.submitJob(jobGraph);
    JobID jobId = submitFuture.get();
    // Wait until the job is RUNNING or has reached a globally terminal state
    // (FINISHED, FAILED or CANCELED). Checking only FINISHED/FAILED would poll
    // forever if the job were cancelled.
    JobStatus status = client.getJobStatus(jobId).get();
    while (status != JobStatus.RUNNING && !status.isGloballyTerminalState()) {
        Thread.sleep(1000);
        status = client.getJobStatus(jobId).get();
    }
    if (status == JobStatus.RUNNING) {
        // Trigger savepoint
        CompletableFuture<String> savepointFuture =
                client.triggerSavepoint(jobId, "hdfs://namenode:9000/savepoints");
        String savepointPath = savepointFuture.get();
        System.out.println("Savepoint created: " + savepointPath);
        // Stop job with savepoint
        CompletableFuture<String> stopFuture =
                client.stopWithSavepoint(jobId, false, "hdfs://namenode:9000/final-savepoint");
        String finalSavepointPath = stopFuture.get();
        System.out.println("Job stopped with savepoint: " + finalSavepointPath);
    }
    // Get final job result; JobResult reports the outcome via getApplicationStatus()
    CompletableFuture<JobResult> resultFuture = client.requestJobResult(jobId);
    JobResult result = resultFuture.get();
    System.out.println("Job finished with state: " + result.getApplicationStatus());
}
// Send coordination request to operator
try (RestClusterClient<StandaloneClusterId> client = /* create client */) {
    JobID jobId = /* your job ID */;
    OperatorID operatorId = /* target operator ID */;
    // The request is Java-serialized before it is sent, so implement CoordinationRequest
    // in a top-level or static nested class. An anonymous class created inside an instance
    // method captures the (typically non-serializable) enclosing instance and fails to serialize.
    CoordinationRequest request = /* instance of your CoordinationRequest implementation */;
    // Send request to operator
    CompletableFuture<CoordinationResponse> responseFuture =
            client.sendCoordinationRequest(jobId, operatorId, request);
    CoordinationResponse response = responseFuture.get();
    // Handle coordination response
}
// Configure timeout and retry settings
Configuration config = new Configuration();
config.setString("rest.address", "localhost");
config.setInteger("rest.port", 8081);
// Timeouts and the retry delay are long-valued options expressed in milliseconds
config.setLong("rest.connection-timeout", Duration.ofSeconds(10).toMillis());
config.setLong("rest.idleness-timeout", Duration.ofMinutes(5).toMillis());
config.setInteger("rest.retry.max-attempts", 5);
config.setLong("rest.retry.delay", Duration.ofSeconds(2).toMillis());
RestClusterClientConfiguration restConfig =
        RestClusterClientConfiguration.fromConfiguration(config);
// StandaloneClusterId is a singleton (private constructor): obtain it via getInstance()
RestClusterClient<StandaloneClusterId> client =
        new RestClusterClient<>(config, restConfig, StandaloneClusterId.getInstance());
try {
    // Operation with timeout handling: a timeout or REST failure falls back to an empty list.
    // CompletableFuture#orTimeout needs Java 9+; on Java 8 use jobsFuture.get(30, TimeUnit.SECONDS).
    CompletableFuture<Collection<JobStatusMessage>> jobsFuture = client.listJobs();
    Collection<JobStatusMessage> jobs = jobsFuture
            .orTimeout(30, TimeUnit.SECONDS)
            .exceptionally(throwable -> {
                System.err.println("Failed to list jobs: " + throwable.getMessage());
                return Collections.emptyList();
            })
            .get();
} catch (Exception e) {
    System.err.println("REST operation failed: " + e.getMessage());
} finally {
    client.close();
}
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.client.program.rest.RestClusterClientConfiguration;
import org.apache.flink.client.program.rest.retry.WaitStrategy;
import org.apache.flink.client.program.rest.retry.ExponentialWaitStrategy;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.deployment.StandaloneClusterId;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.security.contexts.SecurityContext;
import org.apache.flink.runtime.rest.handler.legacy.messages.StatusOverview;
import org.apache.flink.util.concurrent.ScheduledExecutor;
import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.time.Duration;// Basic connection settings
config.setString("rest.address", "localhost");
config.setInteger("rest.port", 8081);
// REST timeouts are long-valued options expressed in milliseconds
config.setLong("rest.connection-timeout", Duration.ofSeconds(30).toMillis());
config.setLong("rest.idleness-timeout", Duration.ofMinutes(5).toMillis());
// Retry configuration (the delay is also in milliseconds)
config.setInteger("rest.retry.max-attempts", 3);
config.setLong("rest.retry.delay", Duration.ofSeconds(1).toMillis());
// SSL configuration (when needed)
config.setBoolean("security.ssl.rest.enabled", true);
config.setString("security.ssl.rest.keystore", "/path/to/keystore.jks");
config.setString("security.ssl.rest.keystore-password", "password");
config.setString("security.ssl.rest.truststore", "/path/to/truststore.jks");
config.setString("security.ssl.rest.truststore-password", "password");
The REST Client Communication module provides robust, production-ready HTTP/HTTPS communication with Flink clusters, supporting advanced features like automatic retries, SSL encryption, and comprehensive timeout handling for reliable distributed system interactions.
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-clients-2-11