Apache Flink Client APIs and utilities for submitting and interacting with Flink jobs
—
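The Apache Flink cluster management module (`org.apache.flink.client.deployment` and its `executors` and `application` subpackages) provides services for deploying clusters and interacting with them. Deployment targets are pluggable: each target contributes a `ClusterClientFactory`, and the factory that matches the configured target is discovered at runtime. The module covers the cluster lifecycle (deployment, retrieval of running clusters, and shutdown) across environments such as standalone, YARN, and Kubernetes. Note that standalone clusters are started outside the client, so these APIs can retrieve them but cannot deploy them.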
Factory interface that, for one deployment target, creates cluster descriptors and derives the cluster ID and cluster specification from a configuration.
/**
 * Entry point for one deployment target: decides whether it can serve a given
 * {@link Configuration} and creates the matching {@link ClusterDescriptor}.
 *
 * @param <ClusterID> the type used to identify clusters of this deployment target
 */
public interface ClusterClientFactory<ClusterID> {

    // ---- Compatibility and configuration ----

    /** Returns whether this factory can serve the deployment target selected in {@code configuration}. */
    boolean isCompatibleWith(Configuration configuration);

    /**
     * Extracts the ID of the cluster to connect to from {@code configuration}.
     * NOTE(review): upstream Flink marks the result {@code @Nullable} (no ID configured) - confirm.
     */
    ClusterID getClusterId(Configuration configuration);

    /** Derives the requested resources (memory sizes, slots per TaskManager) from {@code configuration}. */
    ClusterSpecification getClusterSpecification(Configuration configuration);

    // ---- Cluster descriptor creation ----

    /** Creates a descriptor that can retrieve, deploy, or kill clusters of this deployment target. */
    ClusterDescriptor<ClusterID> createClusterDescriptor(Configuration configuration);
}
Interface for describing and managing cluster operations including deployment and retrieval.
/**
 * Describes one kind of cluster (deployment target) and performs lifecycle operations on it:
 * connecting to a running cluster, deploying new session/application/per-job clusters, and
 * killing clusters.
 *
 * <p>NOTE(review): the checked exceptions below follow upstream Flink's {@code ClusterDescriptor};
 * confirm them against the targeted Flink version.
 *
 * @param <T> the type used to identify clusters of this deployment target
 */
public interface ClusterDescriptor<T> extends AutoCloseable {

    // ---- Cluster information ----

    /** Returns a human-readable description of the cluster this descriptor targets. */
    String getClusterDescription();

    // ---- Cluster operations ----

    /**
     * Connects to an already running cluster.
     *
     * @throws ClusterRetrieveException if the cluster cannot be retrieved
     */
    ClusterClientProvider<T> retrieve(T clusterId) throws ClusterRetrieveException;

    /**
     * Deploys a new session cluster, which is then available for job submission.
     *
     * @throws ClusterDeploymentException if the deployment fails
     */
    ClusterClientProvider<T> deploySessionCluster(ClusterSpecification clusterSpecification)
            throws ClusterDeploymentException;

    /**
     * Deploys a cluster that runs the application described by {@code applicationConfiguration}.
     *
     * @throws ClusterDeploymentException if the deployment fails
     */
    ClusterClientProvider<T> deployApplicationCluster(ClusterSpecification clusterSpecification,
            ApplicationConfiguration applicationConfiguration) throws ClusterDeploymentException;

    /**
     * Deploys a cluster dedicated to {@code jobGraph}.
     *
     * @param detached presumably whether the submission returns without waiting for the job - confirm
     * @throws ClusterDeploymentException if the deployment fails
     */
    ClusterClientProvider<T> deployJobCluster(ClusterSpecification clusterSpecification,
            JobGraph jobGraph,
            boolean detached) throws ClusterDeploymentException;

    // ---- Cluster management ----

    /**
     * Terminates the cluster identified by {@code clusterId}.
     *
     * @throws FlinkException if the cluster cannot be killed
     */
    void killCluster(T clusterId) throws FlinkException;

    /** Releases the descriptor; narrows {@link AutoCloseable#close()} to throw no checked exception. */
    @Override
    void close();
}
Service loader interface for discovering and loading cluster client factories.
/** Looks up the {@link ClusterClientFactory} that serves the deployment target of a configuration. */
public interface ClusterClientServiceLoader {

    /**
     * Returns the factory compatible with {@code configuration}.
     *
     * @param <ClusterID> the cluster ID type of the returned factory, inferred at the call site
     */
    <ClusterID> ClusterClientFactory<ClusterID> getClusterClientFactory(Configuration configuration);
}
Default implementation of the cluster client service loader using the Java ServiceLoader mechanism.
/**
 * {@link ClusterClientServiceLoader} that discovers the available {@link ClusterClientFactory}
 * implementations through Java's {@code ServiceLoader}.
 *
 * <p>NOTE(review): upstream Flink throws {@code IllegalStateException} when no factory, or more
 * than one factory, is compatible with the configuration - confirm for the targeted version.
 */
public class DefaultClusterClientServiceLoader implements ClusterClientServiceLoader {

    /** Returns the discovered factory whose {@code isCompatibleWith(configuration)} accepts the configuration. */
    @Override
    public <ClusterID> ClusterClientFactory<ClusterID> getClusterClientFactory(Configuration configuration) { }
}
Factory implementation for standalone Flink clusters.
/**
 * {@link ClusterClientFactory} for standalone clusters, i.e. clusters identified by
 * {@link StandaloneClusterId}. Used by the "remote" executor ({@code RemoteExecutor}).
 */
public class StandaloneClientFactory implements ClusterClientFactory<StandaloneClusterId> {

    /** Returns whether {@code configuration} selects the standalone ("remote") target. */
    @Override
    public boolean isCompatibleWith(Configuration configuration) { }

    /** Creates a descriptor for the standalone cluster described by {@code configuration}. */
    @Override
    public ClusterDescriptor<StandaloneClusterId> createClusterDescriptor(Configuration configuration) { }

    /** Returns the identifier of the standalone cluster. */
    @Override
    public StandaloneClusterId getClusterId(Configuration configuration) { }

    /** Derives the resource specification from {@code configuration}. */
    @Override
    public ClusterSpecification getClusterSpecification(Configuration configuration) { }
}
Cluster descriptor implementation for standalone Flink clusters.
/**
 * {@link ClusterDescriptor} for standalone clusters.
 *
 * <p>NOTE(review): standalone clusters are started outside the client. Upstream Flink therefore
 * only supports {@link #retrieve}: the {@code deploy*} methods and {@link #killCluster} throw
 * {@code UnsupportedOperationException}. Confirm for the targeted version before using them.
 */
public class StandaloneClusterDescriptor implements ClusterDescriptor<StandaloneClusterId> {

    // ---- ClusterDescriptor implementation ----

    @Override
    public String getClusterDescription() { }

    /** Connects to the running standalone cluster. */
    @Override
    public ClusterClientProvider<StandaloneClusterId> retrieve(StandaloneClusterId clusterId) { }

    @Override
    public ClusterClientProvider<StandaloneClusterId> deploySessionCluster(ClusterSpecification clusterSpecification) { }

    @Override
    public ClusterClientProvider<StandaloneClusterId> deployApplicationCluster(ClusterSpecification clusterSpecification,
            ApplicationConfiguration applicationConfiguration) { }

    @Override
    public ClusterClientProvider<StandaloneClusterId> deployJobCluster(ClusterSpecification clusterSpecification,
            JobGraph jobGraph,
            boolean detached) { }

    @Override
    public void killCluster(StandaloneClusterId clusterId) { }

    @Override
    public void close() { }
}
Abstract base class for containerized cluster client factories (YARN, Kubernetes, etc.).
/**
 * Common base for {@link ClusterClientFactory} implementations of containerized deployment targets
 * (e.g. YARN, Kubernetes). Subclasses supply the target-specific pieces through the protected hooks.
 *
 * <p>NOTE(review): upstream Flink's version of this class appears to implement only
 * {@code getClusterSpecification}; verify the remaining members against the targeted version.
 *
 * @param <ClusterID> the type used to identify clusters of the deployment target
 */
public abstract class AbstractContainerizedClusterClientFactory<ClusterID> implements ClusterClientFactory<ClusterID> {

    // ---- ClusterClientFactory implementation ----

    @Override
    public boolean isCompatibleWith(Configuration configuration) { }

    @Override
    public ClusterDescriptor<ClusterID> createClusterDescriptor(Configuration configuration) { }

    @Override
    public ClusterID getClusterId(Configuration configuration) { }

    @Override
    public ClusterSpecification getClusterSpecification(Configuration configuration) { }

    // ---- Hooks for concrete subclasses ----

    /** Returns the executor factory of this deployment target. */
    protected abstract PipelineExecutorFactory getExecutorFactory();

    /** Reads the target-specific cluster ID from {@code configuration}. */
    protected abstract ClusterID getClusterIdFromConfiguration(Configuration configuration);

    /** Creates the target-specific descriptor, given the Flink configuration directory. */
    protected abstract ClusterDescriptor<ClusterID> createClusterDescriptor(Configuration configuration, String configurationDirectory);
}
Configuration class defining cluster resource specifications.
/**
 * Resources requested for a new cluster: master (JobManager) memory and TaskManager memory,
 * both in MB, plus the number of task slots per TaskManager. Exposes read-only accessors.
 *
 * <p>NOTE(review): in upstream Flink the constructor is not public; instances are created through
 * {@link ClusterSpecificationBuilder}. Confirm for the targeted version.
 */
public class ClusterSpecification {

    // ---- Constructor ----

    public ClusterSpecification(int masterMemoryMB, int taskManagerMemoryMB, int slotsPerTaskManager) { }

    // ---- Resource accessors ----

    /** Returns the master (JobManager) memory in MB. */
    public int getMasterMemoryMB() { }

    /** Returns the memory per TaskManager in MB. */
    public int getTaskManagerMemoryMB() { }

    /** Returns the number of task slots per TaskManager. */
    public int getSlotsPerTaskManager() { }

    // ---- Utility methods ----

    @Override
    public String toString() { }

    /** Fluent builder for {@link ClusterSpecification}. */
    public static class ClusterSpecificationBuilder {
        public ClusterSpecificationBuilder setMasterMemoryMB(int masterMemoryMB) { }
        public ClusterSpecificationBuilder setTaskManagerMemoryMB(int taskManagerMemoryMB) { }
        public ClusterSpecificationBuilder setSlotsPerTaskManager(int slotsPerTaskManager) { }
        public ClusterSpecification createClusterSpecification() { }
    }
}
Identifier class for standalone clusters.
/**
 * Cluster ID type of standalone deployments (the {@code ClusterID} of {@link StandaloneClientFactory}).
 *
 * <p>NOTE(review): upstream Flink models this as a singleton obtained via {@code getInstance()};
 * confirm before constructing it directly.
 */
public class StandaloneClusterId {
    // Cluster identification for standalone deployments
}
Adapter that bridges cluster clients to job clients, providing job-specific operations.
/**
 * Adapts a cluster client, obtained from {@code clusterClientProvider}, to the {@link JobClient}
 * interface for the single job identified by {@code jobId}. It also forwards coordination requests
 * to operators of that job.
 *
 * @param <ClusterID> the type identifying the cluster the job runs on
 */
public class ClusterClientJobClientAdapter<ClusterID> implements JobClient, CoordinationRequestGateway {

    /**
     * @param clusterClientProvider supplies the client used to talk to the cluster
     * @param jobId the job all operations refer to
     * @param userCodeClassloader class loader for user types, e.g. when reading accumulators/results
     */
    public ClusterClientJobClientAdapter(ClusterClientProvider<ClusterID> clusterClientProvider,
            JobID jobId,
            ClassLoader userCodeClassloader) { }

    // ---- JobClient implementation ----

    @Override
    public JobID getJobId() { }

    @Override
    public CompletableFuture<JobStatus> getJobStatus() { }

    @Override
    public CompletableFuture<Void> cancel() { }

    /** Stops the job with a savepoint; the future yields the savepoint path. */
    @Override
    public CompletableFuture<String> stopWithSavepoint(boolean advanceToEndOfEventTime, String savepointDirectory) { }

    /** Triggers a savepoint without stopping the job; the future yields the savepoint path. */
    @Override
    public CompletableFuture<String> triggerSavepoint(String savepointDirectory) { }

    @Override
    public CompletableFuture<Map<String, Object>> getAccumulators() { }

    @Override
    public CompletableFuture<JobExecutionResult> getJobExecutionResult() { }

    // ---- CoordinationRequestGateway implementation ----

    @Override
    public CompletableFuture<CoordinationResponse> sendCoordinationRequest(OperatorID operatorId, CoordinationRequest request) { }
}
Abstract base class for job cluster pipeline executors.
/**
 * Base {@link PipelineExecutor} that runs each pipeline on a dedicated (per-job) cluster created
 * through a {@link ClusterClientFactory}.
 *
 * @param <ClusterID> the cluster ID type of the deployment target
 * @param <ClientFactory> the factory used to create the cluster descriptor
 */
public abstract class AbstractJobClusterExecutor<ClusterID, ClientFactory extends ClusterClientFactory<ClusterID>>
        implements PipelineExecutor {

    /** Submits {@code pipeline} and completes with a {@link JobClient} for the resulting job. */
    @Override
    public CompletableFuture<JobClient> execute(Pipeline pipeline, Configuration configuration, ClassLoader userClassloader) { }
}
Abstract base class for session cluster pipeline executors.
/**
 * Base {@link PipelineExecutor} that submits pipelines to an already running session cluster,
 * reached through a {@link ClusterClientFactory}.
 *
 * @param <ClusterID> the cluster ID type of the deployment target
 * @param <ClientFactory> the factory used to create the cluster descriptor
 */
public abstract class AbstractSessionClusterExecutor<ClusterID, ClientFactory extends ClusterClientFactory<ClusterID>>
        implements PipelineExecutor {

    /** Submits {@code pipeline} and completes with a {@link JobClient} for the resulting job. */
    @Override
    public CompletableFuture<JobClient> execute(Pipeline pipeline, Configuration configuration, ClassLoader userClassloader) { }
}
Pipeline executor for the local execution environment.
/** {@link PipelineExecutor} that runs pipelines locally; selected with {@code execution.target = "local"}. */
public class LocalExecutor implements PipelineExecutor {

    /** Name of this executor, i.e. the {@code execution.target} value that selects it. */
    public static final String NAME = "local";

    /** Runs {@code pipeline} locally and completes with a {@link JobClient} for the job. */
    @Override
    public CompletableFuture<JobClient> execute(Pipeline pipeline, Configuration configuration, ClassLoader userClassloader) { }
}
Pipeline executor for remote cluster execution.
/**
 * Session-cluster executor for standalone clusters: it submits pipelines to a running cluster
 * identified by {@link StandaloneClusterId}, using the {@link StandaloneClientFactory}.
 */
public class RemoteExecutor extends AbstractSessionClusterExecutor<StandaloneClusterId, StandaloneClientFactory> {

    /** Name of this executor, i.e. the {@code execution.target} value that selects it. */
    public static final String NAME = "remote";

    // Remote execution implementation
}
Factory for creating local pipeline executors.
/** {@link PipelineExecutorFactory} that creates {@link LocalExecutor} instances. */
public class LocalExecutorFactory implements PipelineExecutorFactory {

    /** Returns the executor name (presumably {@code LocalExecutor.NAME}, "local"). */
    @Override
    public String getName() { }

    /** Returns whether {@code configuration} selects the local executor. */
    @Override
    public boolean isCompatibleWith(Configuration configuration) { }

    @Override
    public PipelineExecutor getExecutor(Configuration configuration) { }
}
Factory for creating remote pipeline executors.
/** {@link PipelineExecutorFactory} that creates {@link RemoteExecutor} instances. */
public class RemoteExecutorFactory implements PipelineExecutorFactory {

    /** Returns the executor name (presumably {@code RemoteExecutor.NAME}, "remote"). */
    @Override
    public String getName() { }

    /** Returns whether {@code configuration} selects the remote executor. */
    @Override
    public boolean isCompatibleWith(Configuration configuration) { }

    @Override
    public PipelineExecutor getExecutor(Configuration configuration) { }
}
Utility class for pipeline executor operations.
/**
 * Static helpers shared by pipeline executors.
 *
 * <p>NOTE(review): upstream Flink's {@code PipelineExecutorUtils} exposes
 * {@code getJobGraph(Pipeline, Configuration)}; no {@code getJobClient} is known there. To obtain a
 * {@link JobClient}, call {@code PipelineExecutor#execute} directly. Verify before relying on the
 * signature below.
 */
public class PipelineExecutorUtils {

    /** Submits {@code pipeline} with {@code executor} and completes with a client for the job. */
    public static CompletableFuture<JobClient> getJobClient(Pipeline pipeline,
            Configuration configuration,
            PipelineExecutor executor,
            ClassLoader userCodeClassLoader) { }
}
Exception thrown during cluster deployment operations.
/** Signals that deploying a cluster (session, application, or per-job) failed. */
public class ClusterDeploymentException extends FlinkException {

    /** @param message description of the failure */
    public ClusterDeploymentException(String message) { }

    /** @param message description of the failure; @param cause the underlying error */
    public ClusterDeploymentException(String message, Throwable cause) { }

    /** @param cause the underlying error */
    public ClusterDeploymentException(Throwable cause) { }
}
Exception thrown when retrieving or connecting to existing clusters.
/** Signals that retrieving, i.e. connecting to, an existing cluster failed. */
public class ClusterRetrieveException extends FlinkException {

    /** @param message description of the failure */
    public ClusterRetrieveException(String message) { }

    /** @param message description of the failure; @param cause the underlying error */
    public ClusterRetrieveException(String message, Throwable cause) { }

    /** @param cause the underlying error */
    public ClusterRetrieveException(Throwable cause) { }
}
// Load cluster client factory
// "execution.target" selects the factory; the "remote" target is served by the standalone factory
// (RemoteExecutor is a session executor over StandaloneClusterId/StandaloneClientFactory).
Configuration config = new Configuration();
config.setString("execution.target", "remote");
ClusterClientServiceLoader serviceLoader = new DefaultClusterClientServiceLoader();
ClusterClientFactory<StandaloneClusterId> factory = serviceLoader.getClusterClientFactory(config);
// Create cluster descriptor and retrieve client
try (ClusterDescriptor<StandaloneClusterId> descriptor = factory.createClusterDescriptor(config)) {
    StandaloneClusterId clusterId = factory.getClusterId(config);
    try (ClusterClient<StandaloneClusterId> client = descriptor.retrieve(clusterId).getClusterClient()) {
        // Wait for the answer while the client is still open. The client is closed at the end of
        // this try block, which may abort requests that are still in flight.
        Collection<JobStatusMessage> jobs = client.listJobs().get();
        for (JobStatusMessage job : jobs) {
            System.out.println(job.getJobId() + " " + job.getJobName() + " " + job.getJobState());
        }
    }
}
// Define cluster specification
// Built via the builder: upstream ClusterSpecification has no public constructor (confirm for your version).
ClusterSpecification spec = new ClusterSpecification.ClusterSpecificationBuilder()
        .setMasterMemoryMB(1024)
        .setTaskManagerMemoryMB(2048)
        .setSlotsPerTaskManager(4)
        .createClusterSpecification();
// Deploy session cluster.
// The service loader picks the factory that accepts the configured "execution.target", so a target
// that can start clusters must be set. Standalone ("remote") cannot be used here: upstream
// StandaloneClusterDescriptor rejects deploy* calls. "kubernetes-session" is used as an example;
// upstream, its cluster IDs are Strings (confirm for your version).
Configuration config = new Configuration();
config.setString("execution.target", "kubernetes-session");
ClusterClientServiceLoader serviceLoader = new DefaultClusterClientServiceLoader();
ClusterClientFactory<String> factory = serviceLoader.getClusterClientFactory(config);
try (ClusterDescriptor<String> descriptor = factory.createClusterDescriptor(config)) {
    ClusterClientProvider<String> provider = descriptor.deploySessionCluster(spec);
    try (ClusterClient<String> client = provider.getClusterClient()) {
        // Session cluster is now available for job submission
        System.out.println("Cluster deployed at: " + client.getWebInterfaceURL());
    }
}
// Application configuration
ApplicationConfiguration appConfig = new ApplicationConfiguration(
        new String[]{"--input", "/data/input", "--output", "/data/output"},
        "com.example.MyFlinkApp"
);
// Cluster specification (built via the builder; upstream has no public constructor - confirm)
ClusterSpecification spec = new ClusterSpecification.ClusterSpecificationBuilder()
        .setMasterMemoryMB(1024)
        .setTaskManagerMemoryMB(2048)
        .setSlotsPerTaskManager(4)
        .createClusterSpecification();
// Deploy application cluster.
// This snippet sets up its own configuration. Application mode needs an application-capable target
// ("kubernetes-application" here); the standalone descriptor cannot deploy clusters.
// NOTE(review): the target usually also needs the user jar configured (e.g. "pipeline.jars");
// check the deployment docs of your target.
Configuration config = new Configuration();
config.setString("execution.target", "kubernetes-application");
ClusterClientFactory<String> factory =
        new DefaultClusterClientServiceLoader().getClusterClientFactory(config);
try (ClusterDescriptor<String> descriptor = factory.createClusterDescriptor(config)) {
    ClusterClientProvider<String> provider =
            descriptor.deployApplicationCluster(spec, appConfig);
    try (ClusterClient<String> client = provider.getClusterClient()) {
        // The application's jobs are presumably submitted by its entry point on the cluster, so
        // their IDs are not known here; look them up instead. The list may still be empty right
        // after deployment.
        for (JobStatusMessage job : client.listJobs().get()) {
            // Wait for the result before the client is closed at the end of this block.
            JobResult result = client.requestJobResult(job.getJobId()).get();
            System.out.println(job.getJobId() + " succeeded: " + result.isSuccess());
        }
    }
}
// Get pipeline executor
Configuration config = new Configuration();
config.setString("execution.target", "local");
// The loader returns the executor factory whose isCompatibleWith(config) accepts the configured
// target; that factory then creates the executor.
PipelineExecutorServiceLoader executorLoader = new DefaultExecutorServiceLoader();
PipelineExecutorFactory executorFactory = executorLoader.getExecutorFactory(config);
PipelineExecutor executor = executorFactory.getExecutor(config);
// Execute pipeline
// Placeholder: obtain the Pipeline from your program, e.g. StreamExecutionEnvironment#getStreamGraph().
Pipeline pipeline = env.getStreamGraph();
ClassLoader userClassLoader = Thread.currentThread().getContextClassLoader();
CompletableFuture<JobClient> jobClientFuture =
        executor.execute(pipeline, config, userClassLoader);
JobClient jobClient = jobClientFuture.get();
Configuration class for application cluster deployments containing program arguments and entry point information.
/**
 * Describes the application to run in application mode: the program arguments, the fully qualified
 * entry-point class name, and optionally the savepoint to restore from.
 *
 * <p>NOTE(review): upstream Flink's class exposes the arguments and an application class name
 * (plus helpers to read from and write to a {@code Configuration}). The savepoint, validation and
 * entry-point accessors listed here may not exist there; verify before use.
 */
public class ApplicationConfiguration {

    // ---- Constructors ----

    public ApplicationConfiguration(String[] programArguments, String entryPointClassName) { }

    public ApplicationConfiguration(String[] programArguments,
            String entryPointClassName,
            SavepointRestoreSettings savepointRestoreSettings) { }

    // ---- Accessors ----

    /** Returns the arguments passed to the application's entry point. */
    public String[] getProgramArguments() { }

    /** Returns the fully qualified name of the entry-point class. */
    public String getEntryPointClassName() { }

    /** Returns the savepoint restore settings given at construction. */
    public SavepointRestoreSettings getSavepointRestoreSettings() { }

    // ---- Validation ----

    public void validate() { }

    public boolean hasValidEntryPoint() { }
}
Entry point class for application cluster mode that manages the cluster lifecycle and application execution.
/**
 * Cluster entry point for application mode: starts the cluster components and runs the
 * application on the cluster.
 *
 * <p>NOTE(review): upstream Flink declares this class abstract, with concrete entry points per
 * deployment target. {@code ScheduledExecutor} is Flink's own type; there is no
 * {@code java.util.concurrent.ScheduledExecutor} in the JDK. Confirm the signatures below.
 */
public class ApplicationClusterEntryPoint extends ClusterEntrypoint {

    /** Process entry point; {@code args} are the command-line arguments. */
    public static void main(String[] args) { }

    // ---- Cluster initialization ----

    protected ArchivedExecutionGraphStore createSerializableExecutionGraphStore(Configuration configuration,
            ScheduledExecutor scheduledExecutor) { }

    protected DispatcherResourceManagerComponentFactory createDispatcherResourceManagerComponentFactory(Configuration configuration) { }

    // ---- Application-specific setup ----

    protected void initializeServices(Configuration configuration) { }

    protected ApplicationRunner createApplicationRunner() { }
}
Interface for running applications within application clusters.
/**
 * Runs an application against a dispatcher inside an application cluster.
 *
 * <p>NOTE(review): upstream Flink's {@code ApplicationRunner} has a single method,
 * {@code List<JobID> run(DispatcherGateway, PackagedProgram, Configuration)}, and no cancel
 * support. {@code ScheduledExecutor} is a Flink type, not a JDK one. Verify against the targeted version.
 */
public interface ApplicationRunner {

    // ---- Application execution ----

    /** Starts the application; the future completes when execution finishes. */
    CompletableFuture<Void> run(DispatcherGateway dispatcherGateway, ScheduledExecutor scheduledExecutor);

    // ---- Lifecycle management ----

    /** Requests cancellation of the application. */
    void cancel();

    /** Returns whether {@link #cancel()} has been requested. */
    boolean isCancelled();
}
Implementation of ApplicationRunner for detached application execution.
/**
 * {@link ApplicationRunner} that executes the application in detached mode.
 *
 * <p>NOTE(review): upstream Flink's constructor takes only {@code boolean enforceSingleJobExecution},
 * and the program/configuration are passed to {@code run}. Confirm for the targeted version.
 */
public class DetachedApplicationRunner implements ApplicationRunner {

    /**
     * @param enforceSingleJobExecution presumably whether the program may submit only one job - confirm
     * @param packagedProgram the user program to run
     * @param configuration configuration used for the execution
     */
    public DetachedApplicationRunner(boolean enforceSingleJobExecution,
            PackagedProgram packagedProgram,
            Configuration configuration) { }

    // ---- ApplicationRunner implementation ----

    @Override
    public CompletableFuture<Void> run(DispatcherGateway dispatcherGateway, ScheduledExecutor scheduledExecutor) { }

    @Override
    public void cancel() { }

    @Override
    public boolean isCancelled() { }

    // ---- Program execution ----

    /** Invokes the program's entry point against the given dispatcher. */
    private CompletableFuture<Void> runApplicationEntryPoint(DispatcherGateway dispatcherGateway,
            ScheduledExecutor scheduledExecutor) { }
}
Job client implementation for embedded execution within application clusters.
/**
 * {@link JobClient} used inside an application cluster. It talks to the dispatcher directly through
 * {@link DispatcherGateway} rather than over a remote client.
 *
 * <p>NOTE(review): upstream Flink's constructor also takes a request timeout and orders its
 * parameters differently; confirm for the targeted version.
 */
public class EmbeddedJobClient implements JobClient, CoordinationRequestGateway {

    /**
     * @param jobId the job all operations refer to
     * @param dispatcherGateway gateway to the local dispatcher
     * @param userClassloader class loader for user types
     * @param scheduledExecutorService executor for scheduled work such as polling (Flink's ScheduledExecutor)
     */
    public EmbeddedJobClient(JobID jobId,
            DispatcherGateway dispatcherGateway,
            ClassLoader userClassloader,
            ScheduledExecutor scheduledExecutorService) { }

    // ---- JobClient implementation ----

    @Override
    public JobID getJobId() { }

    @Override
    public CompletableFuture<JobStatus> getJobStatus() { }

    @Override
    public CompletableFuture<Void> cancel() { }

    @Override
    public CompletableFuture<String> stopWithSavepoint(boolean advanceToEndOfEventTime, String savepointDirectory) { }

    @Override
    public CompletableFuture<String> triggerSavepoint(String savepointDirectory) { }

    @Override
    public CompletableFuture<Map<String, Object>> getAccumulators() { }

    @Override
    public CompletableFuture<JobExecutionResult> getJobExecutionResult() { }

    // ---- CoordinationRequestGateway implementation ----

    @Override
    public CompletableFuture<CoordinationResponse> sendCoordinationRequest(OperatorID operatorId,
            CoordinationRequest request) { }
}
Job client implementation for web-based job submissions.
/**
 * {@link JobClient} for jobs submitted through the web interface.
 *
 * <p>NOTE(review): upstream Flink's {@code WebSubmissionJobClient} takes only the {@code JobID} and
 * is a placeholder whose job operations are not supported. The constructor and web-specific
 * methods listed here may not exist there; verify before use.
 */
public class WebSubmissionJobClient implements JobClient {

    /**
     * @param jobId the submitted job
     * @param restAddress host of the cluster's REST endpoint
     * @param restPort port of the cluster's REST endpoint
     * @param configuration client configuration
     */
    public WebSubmissionJobClient(JobID jobId,
            String restAddress,
            int restPort,
            Configuration configuration) { }

    // ---- JobClient implementation ----

    @Override
    public JobID getJobId() { }

    @Override
    public CompletableFuture<JobStatus> getJobStatus() { }

    @Override
    public CompletableFuture<Void> cancel() { }

    @Override
    public CompletableFuture<String> stopWithSavepoint(boolean advanceToEndOfEventTime, String savepointDirectory) { }

    @Override
    public CompletableFuture<String> triggerSavepoint(String savepointDirectory) { }

    @Override
    public CompletableFuture<Map<String, Object>> getAccumulators() { }

    @Override
    public CompletableFuture<JobExecutionResult> getJobExecutionResult() { }

    // ---- Web-specific operations ----

    public CompletableFuture<String> getWebInterfaceURL() { }

    public void close() { }
}
import org.apache.flink.client.deployment.ClusterClientFactory;
import org.apache.flink.client.deployment.ClusterDescriptor;
import org.apache.flink.client.deployment.ClusterClientServiceLoader;
import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
import org.apache.flink.client.deployment.StandaloneClientFactory;
import org.apache.flink.client.deployment.StandaloneClusterDescriptor;
import org.apache.flink.client.deployment.AbstractContainerizedClusterClientFactory;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.deployment.StandaloneClusterId;
import org.apache.flink.client.deployment.ClusterClientJobClientAdapter;
import org.apache.flink.client.deployment.executors.AbstractJobClusterExecutor;
import org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor;
import org.apache.flink.client.deployment.executors.LocalExecutor;
import org.apache.flink.client.deployment.executors.RemoteExecutor;
import org.apache.flink.client.deployment.executors.LocalExecutorFactory;
import org.apache.flink.client.deployment.executors.RemoteExecutorFactory;
import org.apache.flink.client.deployment.executors.PipelineExecutorUtils;
import org.apache.flink.client.deployment.ClusterDeploymentException;
import org.apache.flink.client.deployment.ClusterRetrieveException;
import org.apache.flink.client.deployment.application.ApplicationConfiguration;
import org.apache.flink.client.deployment.application.ApplicationClusterEntryPoint;
import org.apache.flink.client.deployment.application.ApplicationRunner;
import org.apache.flink.client.deployment.application.DetachedApplicationRunner;
import org.apache.flink.client.deployment.application.EmbeddedJobClient;
import org.apache.flink.client.deployment.application.WebSubmissionJobClient;
import org.apache.flink.runtime.clusterframework.ClusterEntrypoint;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.dispatcher.DispatcherResourceManagerComponentFactory;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraphStore;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.runtime.operators.coordination.CoordinationRequestGateway;
import org.apache.flink.runtime.state.SavepointRestoreSettings;
import org.apache.flink.core.execution.PipelineExecutorFactory;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.ClusterClientProvider;
import org.apache.flink.core.execution.PipelineExecutor;
import org.apache.flink.core.execution.PipelineExecutorFactory;
import org.apache.flink.core.execution.PipelineExecutorServiceLoader;
import org.apache.flink.core.execution.DefaultExecutorServiceLoader;
import org.apache.flink.api.dag.Pipeline;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.FlinkException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutor;
import java.util.Collection;
import java.util.Map;
Install with Tessl CLI:
npx tessl i tessl/maven-org-apache-flink--flink-clients-2-11