Apache Flink client library providing APIs and utilities for submitting, monitoring and managing Flink jobs programmatically
npx @tessl/cli install tessl/maven-org-apache-flink--flink-clients-2-10@1.3.0The Apache Flink clients library (flink-clients_2.10) provides comprehensive APIs and utilities for programmatically interacting with Apache Flink clusters. It enables developers to submit batch and streaming jobs, monitor job execution, manage cluster resources, and handle job lifecycle operations from external applications.
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.10</artifactId>
<version>1.3.3</version>
</dependency>import org.apache.flink.client.CliFrontend;
import org.apache.flink.client.LocalExecutor;
import org.apache.flink.client.RemoteExecutor;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.StandaloneClusterClient;
import org.apache.flink.client.program.ContextEnvironment;import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.StandaloneClusterClient;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.client.program.ProgramMissingJobException;
import org.apache.flink.client.JobExecutionException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import java.io.File;
try {
// Connect to a Flink cluster
Configuration config = new Configuration();
StandaloneClusterClient client = new StandaloneClusterClient(config);
// Package and run a Flink program
File jarFile = new File("path/to/your/flink-job.jar");
PackagedProgram program = new PackagedProgram(jarFile, "arg1", "arg2");
// Submit the job
JobSubmissionResult result = client.run(program, 4); // 4 = parallelism
JobID jobId = result.getJobID();
// Monitor job execution
JobExecutionResult executionResult = client.retrieveJob(jobId);
System.out.println("Job finished with result: " + executionResult.getNetRuntime());
// Clean up
client.shutdown();
} catch (ProgramInvocationException | ProgramMissingJobException e) {
System.err.println("Program execution failed: " + e.getMessage());
} catch (JobExecutionException e) {
System.err.println("Job execution failed: " + e.getMessage());
} catch (Exception e) {
System.err.println("Error: " + e.getMessage());
}Apache Flink clients library is built around several key components:
ClusterClient, StandaloneClusterClient)PackagedProgram, JobWithJars)Core functionality for connecting to and managing Flink clusters, including job submission, monitoring, and lifecycle operations.
public abstract class ClusterClient {
public ClusterClient(Configuration flinkConfig) throws Exception;
public ClusterClient(Configuration flinkConfig, HighAvailabilityServices highAvailabilityServices);
public JobSubmissionResult run(PackagedProgram prog, int parallelism) throws ProgramInvocationException, ProgramMissingJobException;
public JobExecutionResult run(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException;
public JobSubmissionResult runDetached(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException;
public void cancel(JobID jobId) throws Exception;
public void stop(JobID jobId) throws Exception;
public Map<String, Object> getAccumulators(JobID jobID) throws Exception;
public JobExecutionResult retrieveJob(JobID jobID) throws JobExecutionException;
public void shutdown() throws Exception;
public void setDetached(boolean isDetached);
public boolean isDetached();
public abstract void waitForClusterToBeReady();
public abstract String getWebInterfaceURL();
public abstract GetClusterStatusResponse getClusterStatus();
}
public class StandaloneClusterClient extends ClusterClient {
public StandaloneClusterClient(Configuration config);
public StandaloneClusterClient(Configuration config, HighAvailabilityServices highAvailabilityServices);
}Utilities for packaging Flink programs as JAR files and managing their execution with dependencies and classpath handling.
public class PackagedProgram {
public static final String MANIFEST_ATTRIBUTE_ASSEMBLER_CLASS = "program-class";
public static final String MANIFEST_ATTRIBUTE_MAIN_CLASS = "Main-Class";
public PackagedProgram(File jarFile, String... args) throws ProgramInvocationException;
public PackagedProgram(File jarFile, List<URL> classpaths, String... args) throws ProgramInvocationException;
public PackagedProgram(File jarFile, String entryPointClassName, String... args) throws ProgramInvocationException;
public PackagedProgram(File jarFile, List<URL> classpaths, String entryPointClassName, String... args) throws ProgramInvocationException;
public String[] getArguments();
public String getMainClassName();
public JobWithJars getPlanWithJars() throws ProgramInvocationException;
public JobWithJars getPlanWithoutJars() throws ProgramInvocationException;
public ClassLoader getUserCodeClassLoader();
public List<URL> getClasspaths();
public String getDescription() throws ProgramInvocationException;
}
public class JobWithJars {
public JobWithJars(Plan plan, List<URL> jarFiles, List<URL> classpaths);
public Plan getPlan();
public List<URL> getJarFiles();
public List<URL> getClasspaths();
}Comprehensive command line interface for Flink cluster operations including job submission, monitoring, cancellation, and savepoint management.
public class CliFrontend {
public CliFrontend();
public CliFrontend(String configDir);
public int run(String[] args);
public int info(String[] args);
public int list(String[] args);
public int cancel(String[] args);
public int stop(String[] args);
public int savepoint(String[] args);
public static void main(String[] args);
}Executors for running Flink programs both locally for development and testing, and remotely on production clusters.
public class LocalExecutor extends PlanExecutor {
public LocalExecutor();
public LocalExecutor(Configuration conf);
public void start() throws Exception;
public void stop() throws Exception;
public JobExecutionResult executePlan(Plan plan) throws Exception;
public static JobExecutionResult execute(Plan plan) throws Exception;
}
public class RemoteExecutor extends PlanExecutor {
public RemoteExecutor(String hostname, int port);
public RemoteExecutor(String hostname, int port, List<URL> jarFiles);
public RemoteExecutor(InetSocketAddress inet, Configuration clientConfiguration, List<URL> jarFiles, List<URL> globalClasspaths);
public JobExecutionResult executePlan(Plan plan) throws Exception;
}Execution environments that provide programmatic APIs for submitting and managing Flink jobs within applications.
public class ContextEnvironment extends ExecutionEnvironment {
public ContextEnvironment(ClusterClient remoteConnection, List<URL> jarFiles, List<URL> classpaths, ClassLoader userCodeClassLoader, SavepointRestoreSettings savepointSettings);
public JobExecutionResult execute(String jobName) throws Exception;
public String getExecutionPlan() throws Exception;
public ClusterClient getClient();
public static void setAsContext(ContextEnvironmentFactory factory);
public static void unsetContext();
}public class JobSubmissionResult {
public JobID getJobID();
public boolean isJobExecutionResult();
}
public class JobExecutionResult extends JobSubmissionResult {
public long getNetRuntime();
public Map<String, Object> getAllAccumulatorResults();
}
public class ProgramInvocationException extends Exception {
public ProgramInvocationException(String message);
public ProgramInvocationException(Throwable cause);
public ProgramInvocationException(String message, Throwable cause);
}
public class ProgramMissingJobException extends ProgramInvocationException {
public ProgramMissingJobException(String message);
}
public class JobExecutionException extends Exception {
public JobExecutionException(JobID jobId, String message);
public JobExecutionException(JobID jobId, String message, Throwable cause);
}
public class CompilerException extends Exception {
public CompilerException();
public CompilerException(String message);
public CompilerException(Throwable cause);
public CompilerException(String message, Throwable cause);
}
public class SavepointRestoreSettings {
public static SavepointRestoreSettings none();
public static SavepointRestoreSettings forPath(String savepointPath);
public static SavepointRestoreSettings forPath(String savepointPath, boolean allowNonRestoredState);
public boolean restoreSavepoint();
public String getRestorePath();
public boolean allowNonRestoredState();
}
public class GetClusterStatusResponse {
public int numTaskManagersConnected();
public int numSlotsTotal();
public int numSlotsAvailable();
}
public class JobListeningContext {
public JobExecutionResult getJobExecutionResult() throws Exception;
public void cancel() throws Exception;
}