Cluster deployment and management functionality for various deployment targets including standalone, containerized, and cloud environments. Provides abstractions for deploying, retrieving, and managing Flink clusters.
Main interface for interacting with Flink clusters, providing job management and cluster operations.
/**
* Main interface for interacting with Flink clusters.
* <p>
* Combines cluster-level operations (identifier, configuration, shutdown, web interface URL)
* with job-level operations (submission, status, results, accumulators, cancellation,
* savepoints, operator coordination). Every job-level method declared here returns a
* CompletableFuture. The interface extends AutoCloseable, so a client can be opened in a
* try-with-resources statement.
* @param <T> Type of cluster identifier
*/
public interface ClusterClient<T> extends AutoCloseable {
/**
* Returns the cluster identifier.
* @return Cluster ID of generic type T
*/
T getClusterId();
/**
* Returns the Flink configuration for this cluster.
* @return Configuration instance
*/
Configuration getFlinkConfiguration();
/**
* Shuts down the cluster. Declared as a plain void call: no future is returned and no
* checked exception is declared.
*/
void shutDownCluster();
/**
* Returns the web interface URL for the cluster.
* @return URL string for web interface, may be null (callers should null-check)
*/
String getWebInterfaceURL();
/**
* Lists all jobs running on the cluster.
* @return CompletableFuture containing collection of job status messages
* @throws Exception if listing fails
*/
CompletableFuture<Collection<JobStatusMessage>> listJobs() throws Exception;
/**
* Submits a job to the cluster.
* @param jobGraph JobGraph to submit
* @return CompletableFuture containing assigned JobID
*/
CompletableFuture<JobID> submitJob(JobGraph jobGraph);
/**
* Gets the status of a specific job.
* @param jobId ID of the job to check
* @return CompletableFuture containing job status
*/
CompletableFuture<JobStatus> getJobStatus(JobID jobId);
/**
* Requests the result of a job execution.
* @param jobId ID of the job
* @return CompletableFuture containing job result
*/
CompletableFuture<JobResult> requestJobResult(JobID jobId);
/**
* Gets job accumulators with default classloader.
* NOTE(review): the parameter is spelled jobID here and in the overload below, but jobId
* in every other method of this interface.
* @param jobID ID of the job
* @return CompletableFuture containing accumulator map
*/
CompletableFuture<Map<String, Object>> getAccumulators(JobID jobID);
/**
* Gets job accumulators with specific classloader.
* @param jobID ID of the job
* @param loader ClassLoader for deserialization
* @return CompletableFuture containing accumulator map
*/
CompletableFuture<Map<String, Object>> getAccumulators(JobID jobID, ClassLoader loader);
/**
* Cancels a running job.
* @param jobId ID of the job to cancel
* @return CompletableFuture containing acknowledgment
*/
CompletableFuture<Acknowledge> cancel(JobID jobId);
/**
* Cancels a job with savepoint.
* @param jobId ID of the job to cancel
* @param savepointDirectory Directory to store the savepoint (nullable - uses default if null)
* @return CompletableFuture containing savepoint path
*/
CompletableFuture<String> cancelWithSavepoint(JobID jobId, @Nullable String savepointDirectory);
/**
* Stops a job with savepoint.
* @param jobId ID of the job to stop
* @param advanceToEndOfEventTime Whether to advance to end of event time
* @param savepointDirectory Directory to store the savepoint (nullable - uses default if null)
* @return CompletableFuture containing savepoint path
*/
CompletableFuture<String> stopWithSavepoint(
JobID jobId,
boolean advanceToEndOfEventTime,
@Nullable String savepointDirectory);
/**
* Triggers a savepoint for a running job.
* @param jobId ID of the job
* @param savepointDirectory Directory to store the savepoint (nullable - uses default if null)
* @return CompletableFuture containing savepoint path
*/
CompletableFuture<String> triggerSavepoint(JobID jobId, @Nullable String savepointDirectory);
/**
* Disposes a savepoint.
* @param savepointPath Path to the savepoint to dispose
* @return CompletableFuture containing acknowledgment
* @throws FlinkException if disposal fails
*/
CompletableFuture<Acknowledge> disposeSavepoint(String savepointPath) throws FlinkException;
/**
* Sends a coordination request to an operator.
* @param jobId ID of the job containing the operator
* @param operatorId ID of the operator
* @param request Coordination request to send
* @return CompletableFuture containing coordination response
*/
CompletableFuture<CoordinationResponse> sendCoordinationRequest(
JobID jobId,
OperatorID operatorId,
CoordinationRequest request);
}Usage Example:
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.deployment.StandaloneClusterDescriptor;
import org.apache.flink.client.deployment.StandaloneClusterId;
// Connect to standalone cluster
StandaloneClusterDescriptor descriptor = new StandaloneClusterDescriptor(config);
try (ClusterClient<StandaloneClusterId> client =
descriptor.retrieve(new StandaloneClusterId()).getClusterClient()) {
// Submit job
JobID jobId = client.submitJob(jobGraph).get();
// Monitor job status
JobStatus status = client.getJobStatus(jobId).get();
System.out.println("Job status: " + status);
// Trigger savepoint
String savepointPath = client.triggerSavepoint(jobId, "/path/to/savepoints").get();
System.out.println("Savepoint created at: " + savepointPath);
}Interface for cluster deployment and management operations.
/**
* Descriptor for cluster deployment and management.
* <p>
* Offers one method to retrieve an existing cluster, three deployment variants (session,
* application, per-job) and one method to kill a cluster. Retrieval and deployment hand
* back a ClusterClientProvider. The interface extends AutoCloseable, so a descriptor can
* be opened in a try-with-resources statement.
* @param <T> Type of cluster identifier
*/
public interface ClusterDescriptor<T> extends AutoCloseable {
/**
* Returns a description of the cluster.
* @return String description of cluster capabilities
*/
String getClusterDescription();
/**
* Retrieves an existing cluster.
* @param clusterId Identifier of the cluster to retrieve
* @return ClusterClientProvider for the cluster
* @throws ClusterRetrieveException if cluster cannot be retrieved
*/
ClusterClientProvider<T> retrieve(T clusterId) throws ClusterRetrieveException;
/**
* Deploys a new session cluster.
* @param clusterSpecification Resource specification for the cluster
* @return ClusterClientProvider for the deployed cluster
* @throws ClusterDeploymentException if deployment fails
*/
ClusterClientProvider<T> deploySessionCluster(ClusterSpecification clusterSpecification)
throws ClusterDeploymentException;
/**
* Deploys an application cluster.
* @param clusterSpecification Resource specification for the cluster
* @param applicationConfiguration Application-specific configuration
* @return ClusterClientProvider for the deployed cluster
* @throws ClusterDeploymentException if deployment fails
*/
ClusterClientProvider<T> deployApplicationCluster(
ClusterSpecification clusterSpecification,
ApplicationConfiguration applicationConfiguration)
throws ClusterDeploymentException;
/**
* Deploys a job-specific cluster.
* @param clusterSpecification Resource specification for the cluster
* @param jobGraph Job graph to deploy
* @param detached Whether to run in detached mode
* @return ClusterClientProvider for the deployed cluster
* @throws ClusterDeploymentException if deployment fails
*/
ClusterClientProvider<T> deployJobCluster(
ClusterSpecification clusterSpecification,
JobGraph jobGraph,
boolean detached) throws ClusterDeploymentException;
/**
* Kills an existing cluster.
* @param clusterId Identifier of the cluster to kill
* @throws FlinkException if killing fails
*/
void killCluster(T clusterId) throws FlinkException;
}Factory interface for creating cluster clients and descriptors.
/**
* Factory for creating cluster clients and descriptors.
* <p>
* Every method takes a Flink Configuration: one checks compatibility, the others derive a
* ClusterDescriptor, a cluster ID and a ClusterSpecification from it.
* @param <ClusterID> Type of cluster identifier
*/
public interface ClusterClientFactory<ClusterID> {
/**
* Checks if this factory is compatible with the given configuration.
* @param configuration Flink configuration to check
* @return true if compatible, false otherwise
*/
boolean isCompatibleWith(Configuration configuration);
/**
* Creates a cluster descriptor for the given configuration.
* @param configuration Flink configuration
* @return ClusterDescriptor instance
*/
ClusterDescriptor<ClusterID> createClusterDescriptor(Configuration configuration);
/**
* Extracts cluster ID from configuration.
* NOTE(review): the result when the configuration carries no cluster ID is not shown
* here - confirm whether null can be returned.
* @param configuration Flink configuration
* @return Cluster ID instance
*/
ClusterID getClusterId(Configuration configuration);
/**
* Creates cluster specification from configuration.
* @param configuration Flink configuration
* @return ClusterSpecification instance
*/
ClusterSpecification getClusterSpecification(Configuration configuration);
}Provider interface for accessing cluster clients.
/**
* Provider for cluster clients.
* <p>
* This is the type returned by the retrieve and deploy methods of ClusterDescriptor.
* NOTE(review): this listing declares close() without extending AutoCloseable, unlike
* ClusterClient and ClusterDescriptor, so a provider cannot be used directly in a
* try-with-resources header - confirm against the library's Javadoc.
* @param <T> Type of cluster identifier
*/
public interface ClusterClientProvider<T> {
/**
* Gets the cluster client instance.
* @return ClusterClient instance for this cluster
*/
ClusterClient<T> getClusterClient();
/**
* Closes the provider and releases resources. Declares no checked exception.
*/
void close();
}Specification for cluster resources and configuration.
/**
* Specification for cluster resources.
* <p>
* Exposes three int-valued settings: master/JobManager memory, TaskManager memory and
* slots per TaskManager. Instances are produced by the nested ClusterSpecificationBuilder.
* Only the public signatures are listed here; method bodies are omitted.
*/
public class ClusterSpecification {
/**
* Gets the master/JobManager memory in MB.
* @return Memory in MB
*/
public int getMasterMemoryMB();
/**
* Gets the TaskManager memory in MB.
* @return Memory in MB
*/
public int getTaskManagerMemoryMB();
/**
* Gets the number of slots per TaskManager.
* @return Number of slots
*/
public int getSlotsPerTaskManager();
/**
* Builder for creating ClusterSpecification instances.
* Each setter returns the builder type, so calls can be chained before
* createClusterSpecification() is invoked.
* NOTE(review): default values for settings that are never set are not shown here.
*/
public static class ClusterSpecificationBuilder {
/**
* Sets master memory in MB.
* @param masterMemoryMB Memory in MB
* @return This builder instance
*/
public ClusterSpecificationBuilder setMasterMemoryMB(int masterMemoryMB);
/**
* Sets TaskManager memory in MB.
* @param taskManagerMemoryMB Memory in MB
* @return This builder instance
*/
public ClusterSpecificationBuilder setTaskManagerMemoryMB(int taskManagerMemoryMB);
/**
* Sets slots per TaskManager.
* @param slotsPerTaskManager Number of slots
* @return This builder instance
*/
public ClusterSpecificationBuilder setSlotsPerTaskManager(int slotsPerTaskManager);
/**
* Creates the ClusterSpecification.
* @return ClusterSpecification instance
*/
public ClusterSpecification createClusterSpecification();
}
}Usage Example:
import org.apache.flink.client.deployment.ClusterSpecification;
// Create cluster specification
ClusterSpecification spec = new ClusterSpecification.ClusterSpecificationBuilder()
        .setMasterMemoryMB(1024)
        .setTaskManagerMemoryMB(2048)
        .setSlotsPerTaskManager(4)
        .createClusterSpecification();
// Deploy session cluster.
// ClusterDescriptor is AutoCloseable, so it is opened in try-with-resources and is closed
// when the block exits, even if deployment throws ClusterDeploymentException.
// NOTE(review): confirm that StandaloneClusterDescriptor supports deploySessionCluster;
// standalone clusters are usually started outside the client.
try (ClusterDescriptor<StandaloneClusterId> descriptor =
        new StandaloneClusterDescriptor(config)) {
    ClusterClientProvider<StandaloneClusterId> provider =
            descriptor.deploySessionCluster(spec);
    // Obtain a client via provider.getClusterClient() and work with the cluster here.
}

Implementations for standalone cluster deployment and management.
/**
* Factory for standalone cluster clients.
* <p>
* Implements ClusterClientFactory with StandaloneClusterId as the cluster identifier type.
* Only the overridden signatures are listed; method bodies are omitted.
*/
public class StandaloneClientFactory implements ClusterClientFactory<StandaloneClusterId> {
/**
* Checks if this factory is compatible with the given configuration.
* NOTE(review): the configuration keys inspected are not shown in this listing.
* @param configuration Flink configuration to check
* @return true if compatible, false otherwise
*/
@Override
public boolean isCompatibleWith(Configuration configuration);
/**
* Creates a cluster descriptor for the given configuration.
* @param configuration Flink configuration
* @return ClusterDescriptor typed with StandaloneClusterId
*/
@Override
public ClusterDescriptor<StandaloneClusterId> createClusterDescriptor(Configuration configuration);
/**
* Extracts the cluster ID from the configuration.
* @param configuration Flink configuration
* @return StandaloneClusterId instance
*/
@Override
public StandaloneClusterId getClusterId(Configuration configuration);
/**
* Creates a cluster specification from the configuration.
* @param configuration Flink configuration
* @return ClusterSpecification instance
*/
@Override
public ClusterSpecification getClusterSpecification(Configuration configuration);
}
/**
* Descriptor for standalone clusters.
* <p>
* Implements ClusterDescriptor with StandaloneClusterId as the cluster identifier type;
* because ClusterDescriptor extends AutoCloseable, instances are closeable as well.
*/
public class StandaloneClusterDescriptor implements ClusterDescriptor<StandaloneClusterId> {
/**
* Creates a descriptor from a Flink configuration.
* @param flinkConfiguration Flink configuration handed to the descriptor
*/
public StandaloneClusterDescriptor(Configuration flinkConfiguration);
// Implements all ClusterDescriptor methods
}
/**
* Identifier for standalone clusters.
* <p>
* Used as the type argument of ClusterClientFactory and ClusterDescriptor in the
* standalone implementations.
*/
public class StandaloneClusterId {
/**
* Creates an identifier; the constructor is public and takes no arguments.
*/
public StandaloneClusterId();
}Client implementation for mini clusters used in testing and local development.
/**
* Client for mini clusters (testing/local development).
* <p>
* Implements ClusterClient with the nested MiniClusterId as the cluster identifier type.
*/
public class MiniClusterClient implements ClusterClient<MiniClusterClient.MiniClusterId> {
/**
* Identifier for mini clusters.
*/
public static class MiniClusterId {
// Mini cluster identifier implementation
}
// Implements all ClusterClient methods for mini cluster
}public interface ClusterClientServiceLoader {
// Declares a single method that exposes ClusterClientFactory instances as a Stream.
/**
* Returns the cluster client factories as a stream.
* NOTE(review): ClusterID is not declared as a type parameter on this interface or on
* this method, so the signature does not compile as listed - confirm the real
* declaration against the library's Javadoc.
* @return Stream of ClusterClientFactory instances
*/
Stream<ClusterClientFactory<ClusterID>> getClusterClientFactories();
}
/**
* Implementation of ClusterClientServiceLoader.
* Only the overridden signature is listed; the method body is omitted.
*/
public class DefaultClusterClientServiceLoader implements ClusterClientServiceLoader {
/**
* Returns the cluster client factories as a stream.
* NOTE(review): ClusterID is not declared as a type parameter on this class or on this
* method - confirm the real declaration against the library's Javadoc.
* @return Stream of ClusterClientFactory instances
*/
@Override
public Stream<ClusterClientFactory<ClusterID>> getClusterClientFactories();
}
/**
* Per-job summary: job ID, job name, job status and start time.
* This is the element type of the collection returned by ClusterClient.listJobs().
*/
public class JobStatusMessage {
/** @return ID of the job */
public JobID getJobId();
/** @return Name of the job */
public String getJobName();
/** @return Status of the job */
public JobStatus getJobStatus();
/** @return Start time as a long - presumably epoch milliseconds; TODO confirm the unit */
public long getStartTime();
}
/**
* Acknowledgment type; declares no methods in this listing.
* Carried by the futures returned from ClusterClient.cancel and ClusterClient.disposeSavepoint.
*/
public interface Acknowledge {
// Acknowledgment marker interface
}
/**
* Request type accepted by ClusterClient.sendCoordinationRequest; declares no methods in
* this listing.
*/
public interface CoordinationRequest {
// Base interface for coordination requests
}
/**
* Response type carried by the future returned from ClusterClient.sendCoordinationRequest;
* declares no methods in this listing.
*/
public interface CoordinationResponse {
// Base interface for coordination responses
}
/**
* Identifier of an operator, as passed to ClusterClient.sendCoordinationRequest.
* Instances are obtained through the two static factory methods listed below.
*/
public class OperatorID {
/**
* Creates an OperatorID without any input.
* NOTE(review): presumably a freshly generated random ID - confirm.
* @return OperatorID instance
*/
public static OperatorID generate();
/**
* Creates an OperatorID from a hexadecimal string.
* NOTE(review): expected string length and behaviour on malformed input are not shown here.
* @param hexString Hexadecimal representation of the ID
* @return OperatorID instance
*/
public static OperatorID fromHexString(String hexString);
}
// Exception Types
/**
* Checked exception declared by the deploy methods of ClusterDescriptor.
* Extends FlinkException.
*/
public class ClusterDeploymentException extends FlinkException {
/**
* @param message Description of the failure
*/
public ClusterDeploymentException(String message);
/**
* @param message Description of the failure
* @param cause Underlying cause
*/
public ClusterDeploymentException(String message, Throwable cause);
}
/**
* Checked exception declared by ClusterDescriptor.retrieve.
* Extends FlinkException.
*/
public class ClusterRetrieveException extends FlinkException {
/**
* @param message Description of the failure
*/
public ClusterRetrieveException(String message);
/**
* @param message Description of the failure
* @param cause Underlying cause
*/
public ClusterRetrieveException(String message, Throwable cause);
}
/**
* Abstract ClusterClientFactory base class for containerized deployments.
* NOTE(review): which ClusterClientFactory methods it implements itself is not shown in
* this listing.
* @param <ClusterID> Type of cluster identifier
*/
public abstract class AbstractContainerizedClusterClientFactory<ClusterID>
implements ClusterClientFactory<ClusterID> {
// Base class for containerized cluster client factories
}
/**
* JobClient implementation that is constructed from a ClusterClientProvider, a job ID and
* a user-code class loader.
* @param <ClusterID> Type of cluster identifier
*/
public class ClusterClientJobClientAdapter<ClusterID> implements JobClient {
/**
* Creates an adapter for a single job.
* @param clusterClientProvider Provider used to obtain the ClusterClient
* @param jobId ID of the job this adapter refers to
* @param userCodeClassloader Class loader for user code
*/
public ClusterClientJobClientAdapter(
ClusterClientProvider<ClusterID> clusterClientProvider,
JobID jobId,
ClassLoader userCodeClassloader);
// Implements JobClient interface by adapting ClusterClient calls
}