Flink Client APIs and utilities for submitting and managing Apache Flink jobs
npx @tessl/cli install tessl/maven-org-apache-flink--flink-clients_2-12@1.14.0Apache Flink Clients provides essential client-side APIs and utilities for interacting with Apache Flink clusters, enabling developers to submit, monitor, and manage stream processing and batch processing jobs programmatically. This library includes comprehensive client utilities for deploying packaged programs, managing job lifecycles, translating execution plans, and handling cluster communication through various deployment targets including YARN, Kubernetes, and standalone clusters.
pom.xml:<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>1.14.6</version>
</dependency>import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.deployment.ClusterDescriptor;
import org.apache.flink.client.deployment.ClusterClientFactory;
import org.apache.flink.client.cli.CliFrontend;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobgraph.JobGraph;import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.deployment.StandaloneClusterDescriptor;
import org.apache.flink.client.deployment.StandaloneClusterId;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.api.common.JobID;
// Create a packaged program from JAR
PackagedProgram program = PackagedProgram.newBuilder()
.setJarFile(new File("my-flink-job.jar"))
.setEntryPointClassName("com.example.MyFlinkJob")
.setArguments(new String[]{"arg1", "arg2"})
.build();
// Connect to a standalone cluster
Configuration config = new Configuration();
StandaloneClusterDescriptor clusterDescriptor =
new StandaloneClusterDescriptor(config);
StandaloneClusterId clusterId = new StandaloneClusterId();
try (ClusterClient<StandaloneClusterId> client =
clusterDescriptor.retrieve(clusterId).getClusterClient()) {
// Submit job and monitor
JobID jobId = client.submitJob(program.getJobGraph()).get();
System.out.println("Job submitted with ID: " + jobId);
// Wait for completion
client.requestJobResult(jobId).get();
}Apache Flink Clients is built around several key components:
ClusterClient and ClusterDescriptor interfaces provide abstractions for different deployment targets (standalone, YARN, Kubernetes)PackagedProgram encapsulates user JAR files with their classpaths, main classes, and execution parametersJobGraph instancesEssential client utilities and translators for Flink pipelines, including program execution and pipeline translation functionality.
public enum ClientUtils {
;
public static URLClassLoader buildUserCodeClassLoader(
List<URL> jars,
List<URL> classpaths,
ClassLoader parent,
Configuration configuration);
public static void executeProgram(
PipelineExecutorServiceLoader executorServiceLoader,
Configuration configuration,
PackagedProgram program,
boolean enforceSingleJobExecution,
boolean suppressSysout);
public static void waitUntilJobInitializationFinished(
SupplierWithException<JobStatus, Exception> jobStatusSupplier,
SupplierWithException<JobResult, Exception> jobResultSupplier,
ClassLoader userCodeClassloader) throws JobInitializationException;
}
public interface FlinkPipelineTranslator {
JobGraph translateToJobGraph(
Pipeline pipeline,
Configuration optimizerConfiguration,
int defaultParallelism);
}Cluster deployment and management functionality for various deployment targets including standalone, containerized, and cloud environments.
public interface ClusterClient<T> extends AutoCloseable {
T getClusterId();
Configuration getFlinkConfiguration();
CompletableFuture<JobID> submitJob(JobGraph jobGraph);
CompletableFuture<JobStatus> getJobStatus(JobID jobId);
CompletableFuture<JobResult> requestJobResult(JobID jobId);
CompletableFuture<Acknowledge> cancel(JobID jobId);
CompletableFuture<String> triggerSavepoint(JobID jobId, @Nullable String savepointDirectory);
}
public interface ClusterDescriptor<T> extends AutoCloseable {
ClusterClientProvider<T> retrieve(T clusterId) throws ClusterRetrieveException;
ClusterClientProvider<T> deploySessionCluster(ClusterSpecification clusterSpecification);
ClusterClientProvider<T> deployJobCluster(ClusterSpecification clusterSpecification, JobGraph jobGraph, boolean detached);
}Program packaging, classloader management, and execution utilities for submitting user applications to Flink clusters.
public class PackagedProgram implements AutoCloseable {
public static Builder newBuilder();
public String[] getArguments();
public String getMainClassName();
public List<URL> getClasspaths();
public ClassLoader getUserCodeClassLoader();
public List<URL> getJobJarAndDependencies();
public void invokeInteractiveModeForExecution() throws ProgramInvocationException;
public static class Builder {
public Builder setJarFile(File jarFile);
public Builder setEntryPointClassName(String entryPointClassName);
public Builder setArguments(String... args);
public Builder setUserClassPaths(List<URL> userClassPaths);
public Builder setConfiguration(Configuration configuration);
public PackagedProgram build() throws ProgramInvocationException;
}
}Application-specific deployment classes for running Flink applications in application mode with full lifecycle management.
public class ApplicationConfiguration {
public String[] getProgramArguments();
public String getApplicationClassName();
public List<String> getApplicationClasspaths();
public static class ApplicationConfigurationBuilder {
public ApplicationConfigurationBuilder setApplicationClassName(String applicationClassName);
public ApplicationConfigurationBuilder setProgramArguments(String[] programArguments);
public ApplicationConfiguration build();
}
}
public interface ApplicationRunner {
CompletableFuture<Void> run(
DispatcherGateway dispatcherGateway,
PackagedProgram program,
Configuration configuration);
}Command line interface classes for Flink client operations including job submission, monitoring, and cluster management through CLI commands.
public class CliFrontend {
public CliFrontend(Configuration configuration, List<CustomCommandLine> customCommandLines);
public static void main(String[] args);
public int run(String[] args);
public int list(String[] args);
public int cancel(String[] args);
public int stop(String[] args);
public int savepoint(String[] args);
}
public interface CustomCommandLine {
boolean isActive(CommandLine commandLine);
String getId();
void addRunOptions(Options options);
Configuration applyCommandLineOptionsToConfiguration(CommandLine commandLine);
}REST client implementations for communicating with Flink clusters through HTTP APIs, including retry strategies and configuration management.
public class RestClusterClient<T> implements ClusterClient<T> {
public RestClusterClient(Configuration config, T clusterId);
public RestClusterClient(
Configuration config,
RestClusterClientConfiguration clientConfiguration,
T clusterId,
RetryStrategy retryStrategy);
// Implements all ClusterClient methods with REST-specific implementations
}
public class RestClusterClientConfiguration {
public RestClientConfiguration getRestClientConfiguration();
public long getAwaitLeaderTimeout();
public int getRetryMaxAttempts();
public long getRetryDelay();
}public class Configuration {
// Core Flink configuration class
public <T> T get(ConfigOption<T> option);
public <T> void set(ConfigOption<T> option, T value);
}
public class JobGraph {
// Represents executable job graph
public JobID getJobID();
public String getName();
}
public class JobID {
// Unique job identifier
public static JobID generate();
public static JobID fromHexString(String hexString);
}
public enum JobStatus {
INITIALIZING, CREATED, RUNNING, FAILING, FAILED,
CANCELLING, CANCELED, FINISHED, RESTARTING,
SUSPENDED, RECONCILING
}
public class JobResult {
public JobID getJobId();
public JobStatus getJobExecutionResult();
public Optional<Throwable> getSerializedThrowable();
}
public class ClusterSpecification {
public int getMasterMemoryMB();
public int getTaskManagerMemoryMB();
public int getSlotsPerTaskManager();
public static class ClusterSpecificationBuilder {
public ClusterSpecificationBuilder setMasterMemoryMB(int masterMemoryMB);
public ClusterSpecificationBuilder setTaskManagerMemoryMB(int taskManagerMemoryMB);
public ClusterSpecificationBuilder setSlotsPerTaskManager(int slotsPerTaskManager);
public ClusterSpecification createClusterSpecification();
}
}
// Exception Types
public class ProgramInvocationException extends Exception {
public ProgramInvocationException(String message);
public ProgramInvocationException(String message, JobID jobID);
public ProgramInvocationException(String message, Throwable cause);
}
public class ClusterDeploymentException extends FlinkException {
public ClusterDeploymentException(String message);
public ClusterDeploymentException(String message, Throwable cause);
}
public class ClusterRetrieveException extends FlinkException {
public ClusterRetrieveException(String message);
public ClusterRetrieveException(String message, Throwable cause);
}
public class JobInitializationException extends Exception {
public JobInitializationException(String message);
public JobInitializationException(String message, Throwable cause);
}