Apache Flink client library providing programmatic APIs and command-line interfaces for submitting, managing, and monitoring Flink jobs.
—
Cluster deployment abstraction supporting multiple deployment targets including standalone, YARN, Kubernetes, and other containerized environments with pluggable factory pattern and resource specification.
Core interface for cluster descriptors that manage cluster lifecycle including deployment, retrieval, and termination.
/**
* Interface for cluster descriptors that manage cluster lifecycle
* @param <T> Type of cluster identifier
*/
public interface ClusterDescriptor<T> extends AutoCloseable {
/**
* Retrieve an existing cluster by ID
* @param clusterId Cluster identifier
* @return Cluster client provider for the existing cluster
* @throws ClusterRetrieveException if cluster cannot be retrieved
*/
ClusterClientProvider<T> retrieve(T clusterId) throws ClusterRetrieveException;
/**
* Deploy a new session cluster
* @param clusterSpecification Resource specification for the cluster
* @return Cluster client provider for the new cluster
* @throws ClusterDeploymentException if deployment fails
*/
ClusterClientProvider<T> deploySessionCluster(ClusterSpecification clusterSpecification)
throws ClusterDeploymentException;
/**
* Deploy an application cluster (application mode)
* @param clusterSpecification Resource specification for the cluster
* @param applicationConfiguration Application configuration
* @return Cluster client provider for the application cluster
* @throws ClusterDeploymentException if deployment fails
*/
default ClusterClientProvider<T> deployApplicationCluster(
ClusterSpecification clusterSpecification,
ApplicationConfiguration applicationConfiguration
) throws ClusterDeploymentException {
throw new UnsupportedOperationException(
"Application mode is not supported by this cluster descriptor."
);
}
/**
* Terminate the given cluster
* @param clusterId Cluster identifier
* @throws FlinkException if termination fails
*/
void terminateCluster(T clusterId) throws FlinkException;
/**
* Close the cluster descriptor and release resources
*/
@Override
void close() throws Exception;
}Usage Examples:
import org.apache.flink.client.deployment.ClusterDescriptor;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.deployment.StandaloneClusterDescriptor;
import org.apache.flink.client.deployment.StandaloneClusterId;
// Create cluster specification
ClusterSpecification spec = new ClusterSpecification.Builder()
.setMasterMemoryMB(1024)
.setTaskManagerMemoryMB(2048)
.setNumberTaskManagers(2)
.createClusterSpecification();
// Deploy session cluster
try (ClusterDescriptor<StandaloneClusterId> descriptor =
new StandaloneClusterDescriptor(config, highAvailabilityServices, rpcService)) {
ClusterClientProvider<StandaloneClusterId> provider =
descriptor.deploySessionCluster(spec);
try (ClusterClient<StandaloneClusterId> client = provider.getClusterClient()) {
System.out.println("Cluster deployed: " + client.getClusterId());
System.out.println("Web UI: " + client.getWebInterfaceURL());
// Use cluster...
} finally {
provider.close();
}
}Factory interface for creating cluster clients with automatic deployment target detection based on configuration.
/**
* Factory for creating cluster clients
* @param <T> Type of cluster identifier
*/
public interface ClusterClientFactory<T> {
/**
* Check if this factory is compatible with the given configuration
* @param configuration Flink configuration
* @return true if compatible
*/
boolean isCompatibleWith(Configuration configuration);
/**
* Create cluster descriptor from configuration
* @param configuration Flink configuration
* @return Cluster descriptor instance
*/
ClusterDescriptor<T> createClusterDescriptor(Configuration configuration);
/**
* Get cluster ID from configuration
* @param configuration Flink configuration
* @return Cluster identifier or null if not specified
*/
@Nullable
T getClusterId(Configuration configuration);
/**
* Get cluster specification from configuration
* @param configuration Flink configuration
* @return Cluster specification
*/
ClusterSpecification getClusterSpecification(Configuration configuration);
}Service loader for discovering and loading cluster client factories dynamically.
/**
* Service loader for cluster client factories
*/
public interface ClusterClientServiceLoader {
/**
* Get cluster client factory compatible with configuration
* @param configuration Flink configuration
* @return Compatible cluster client factory
* @throws UnsupportedOperationException if no compatible factory found
*/
<T> ClusterClientFactory<T> getClusterClientFactory(Configuration configuration);
}
/**
* Default implementation of cluster client service loader
*/
public class DefaultClusterClientServiceLoader implements ClusterClientServiceLoader {
/**
* Create service loader instance
*/
public DefaultClusterClientServiceLoader();
@Override
public <T> ClusterClientFactory<T> getClusterClientFactory(Configuration configuration);
}Implementations for standalone cluster deployment and management.
/**
* Cluster descriptor for standalone clusters
*/
public class StandaloneClusterDescriptor implements ClusterDescriptor<StandaloneClusterId> {
/**
* Create standalone cluster descriptor
* @param flinkConfig Flink configuration
* @param haServices High availability services
* @param rpcService RPC service
*/
public StandaloneClusterDescriptor(
Configuration flinkConfig,
HighAvailabilityServices haServices,
RpcService rpcService
);
@Override
public ClusterClientProvider<StandaloneClusterId> retrieve(StandaloneClusterId clusterId)
throws ClusterRetrieveException;
@Override
public ClusterClientProvider<StandaloneClusterId> deploySessionCluster(
ClusterSpecification clusterSpecification
) throws ClusterDeploymentException;
}
/**
* Factory for standalone cluster clients
*/
public class StandaloneClientFactory implements ClusterClientFactory<StandaloneClusterId> {
@Override
public boolean isCompatibleWith(Configuration configuration);
@Override
public ClusterDescriptor<StandaloneClusterId> createClusterDescriptor(
Configuration configuration
);
@Override
public StandaloneClusterId getClusterId(Configuration configuration);
}
/**
* Cluster ID for standalone clusters
*/
public class StandaloneClusterId {
/**
* Create standalone cluster ID
*/
public StandaloneClusterId();
@Override
public String toString();
@Override
public boolean equals(Object obj);
@Override
public int hashCode();
}Base class for containerized cluster client factories providing common functionality for Docker, Kubernetes, and other container-based deployments.
/**
* Base factory for containerized cluster clients
* @param <ClusterID> Type of cluster identifier
* @param <ApplicationClusterID> Type of application cluster identifier
*/
public abstract class AbstractContainerizedClusterClientFactory<
ClusterID, ApplicationClusterID> implements ClusterClientFactory<ClusterID> {
/**
* Get deployment target for this factory
* @return Deployment target string
*/
protected abstract String getDeploymentTargetName();
/**
* Check if cluster ID is compatible
* @param clusterId Cluster ID to check
* @return true if compatible
*/
protected abstract boolean isCompatibleWith(ClusterID clusterId);
@Override
public boolean isCompatibleWith(Configuration configuration);
@Override
public ClusterSpecification getClusterSpecification(Configuration configuration);
}Resource specification for cluster deployment including memory, CPU, and scaling parameters.
/**
* Specification for cluster resource requirements
*/
public class ClusterSpecification {
/**
* Get master memory in MB
* @return Master memory size
*/
public int getMasterMemoryMB();
/**
* Get task manager memory in MB
* @return Task manager memory size
*/
public int getTaskManagerMemoryMB();
/**
* Get number of task managers
* @return Number of task managers
*/
public int getNumberTaskManagers();
/**
* Get slots per task manager
* @return Number of slots per task manager
*/
public int getSlotsPerTaskManager();
/**
* Builder for creating cluster specifications
*/
public static class Builder {
/**
* Set master memory size
* @param masterMemoryMB Memory in MB
* @return Builder instance
*/
public Builder setMasterMemoryMB(int masterMemoryMB);
/**
* Set task manager memory size
* @param taskManagerMemoryMB Memory in MB
* @return Builder instance
*/
public Builder setTaskManagerMemoryMB(int taskManagerMemoryMB);
/**
* Set number of task managers
* @param numberTaskManagers Number of task managers
* @return Builder instance
*/
public Builder setNumberTaskManagers(int numberTaskManagers);
/**
* Set slots per task manager
* @param slotsPerTaskManager Number of slots
* @return Builder instance
*/
public Builder setSlotsPerTaskManager(int slotsPerTaskManager);
/**
* Create cluster specification
* @return Configured cluster specification
*/
public ClusterSpecification createClusterSpecification();
}
}/**
* Provider for cluster clients with resource management
* @param <T> Type of cluster identifier
*/
public interface ClusterClientProvider<T> extends AutoCloseable {
/**
* Get cluster client instance
* @return Cluster client
*/
ClusterClient<T> getClusterClient();
/**
* Get cluster identifier
* @return Cluster ID
*/
T getClusterId();
/**
* Check if cluster is in per-job mode
* @return true if per-job mode
*/
default boolean isPerJobMode() {
return false;
}
@Override
void close() throws Exception;
}Deployment operations handle various error conditions:
ClusterDeploymentException for failed cluster deploymentsClusterRetrieveException for failed cluster retrievalError Handling Examples:
try {
ClusterClientProvider<StandaloneClusterId> provider =
descriptor.deploySessionCluster(spec);
// Use cluster...
} catch (ClusterDeploymentException e) {
System.err.println("Failed to deploy cluster: " + e.getMessage());
// Handle deployment failure
} catch (ClusterRetrieveException e) {
System.err.println("Failed to retrieve cluster: " + e.getMessage());
// Handle retrieval failure
} finally {
descriptor.close();
}The deployment management system provides a pluggable architecture that allows Flink to support multiple deployment targets through a consistent interface, enabling seamless switching between different cluster types based on configuration.
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-clients