Apache Flink Client APIs and utilities for submitting and interacting with Flink jobs
—
The Apache Flink Program Execution module (org.apache.flink.client.program.*) provides comprehensive program packaging, execution environments, and job lifecycle management capabilities. This module handles the packaging of user programs, provides cluster client implementations, and manages execution contexts for both batch and streaming applications.
Main interface for interacting with Flink clusters, providing job submission and management capabilities.
public interface ClusterClient<T> extends AutoCloseable {
// Cluster information
T getClusterId();
Configuration getFlinkConfiguration();
String getWebInterfaceURL();
// Job listing and status
CompletableFuture<Collection<JobStatusMessage>> listJobs();
CompletableFuture<JobStatus> getJobStatus(JobID jobId);
CompletableFuture<JobResult> requestJobResult(JobID jobId);
// Job submission and management
CompletableFuture<JobID> submitJob(JobGraph jobGraph);
CompletableFuture<Acknowledge> cancel(JobID jobId);
CompletableFuture<String> cancelWithSavepoint(JobID jobId, String savepointDirectory);
CompletableFuture<String> stopWithSavepoint(JobID jobId, boolean advanceToEndOfEventTime, String savepointDirectory);
// Savepoint operations
CompletableFuture<String> triggerSavepoint(JobID jobId, String savepointDirectory);
CompletableFuture<Acknowledge> disposeSavepoint(String savepointPath);
// Accumulators and metrics
default CompletableFuture<Map<String, Object>> getAccumulators(JobID jobID) { }
CompletableFuture<Map<String, Object>> getAccumulators(JobID jobID, ClassLoader loader);
// Coordination requests
CompletableFuture<CoordinationResponse> sendCoordinationRequest(JobID jobId,
OperatorID operatorId,
CoordinationRequest request);
// Cluster management
void shutDownCluster();
void close();
}

Represents a packaged Flink program with all necessary metadata and dependencies.
public class PackagedProgram implements AutoCloseable {
// Constants
public static final String MANIFEST_ATTRIBUTE_ASSEMBLER_CLASS = "program-class";
public static final String MANIFEST_ATTRIBUTE_MAIN_CLASS = "Main-Class";
// Program information
public String getMainClassName() { }
public String[] getArguments() { }
public ClassLoader getUserCodeClassLoader() { }
public Configuration getConfiguration() { }
public SavepointRestoreSettings getSavepointSettings() { }
// Dependencies and resources
public List<URL> getJobJarAndDependencies() { }
public static List<URL> getJobJarAndDependencies(File jarFile, String entryPointClassName) { }
// Program execution
public void invokeInteractiveModeForExecution() { }
public String getDescription() { }
public boolean isPython() { }
// Resource management
public void close() { }
// Builder pattern
public static Builder newBuilder() { }
public static class Builder {
public Builder setJarFile(File jarFile) { }
public Builder setUserClassPaths(List<URL> userClassPaths) { }
public Builder setEntryPointClassName(String entryPointClassName) { }
public Builder setConfiguration(Configuration configuration) { }
public Builder setSavepointRestoreSettings(SavepointRestoreSettings savepointRestoreSettings) { }
public Builder setArguments(String... arguments) { }
public PackagedProgram build() { }
}
}

Provider interface for obtaining cluster clients.
public interface ClusterClientProvider<T> {
ClusterClient<T> getClusterClient();
}

Execution environment that delegates to a configured pipeline executor.
public class ContextEnvironment extends ExecutionEnvironment {
// Static context management
public static void setAsContext(PipelineExecutorServiceLoader executorServiceLoader,
Configuration configuration,
ClassLoader userCodeClassLoader,
boolean enforceSingleJobExecution,
boolean suppressSysout) { }
public static void unsetAsContext() { }
// ExecutionEnvironment implementation with delegation to executor
}

Stream execution environment for context-based execution.
public class StreamContextEnvironment extends StreamExecutionEnvironment {
// Static context management
public static void setAsContext(PipelineExecutorServiceLoader executorServiceLoader,
Configuration configuration,
ClassLoader userCodeClassLoader,
boolean enforceSingleJobExecution,
boolean suppressSysout) { }
public static void unsetAsContext() { }
// StreamExecutionEnvironment implementation with delegation
}

Execution environment for creating execution plans without execution.
public class OptimizerPlanEnvironment extends ExecutionEnvironment {
// Constructor
public OptimizerPlanEnvironment(Optimizer optimizer, int defaultParallelism) { }
// Plan generation methods
public OptimizedPlan getOptimizedPlan(Program program) { }
public OptimizedPlan getOptimizedPlan(List<DataSinkNode> sinks) { }
public String getExecutionPlan() { }
// ExecutionEnvironment overrides for optimization
public JobExecutionResult execute() { }
public JobExecutionResult execute(String jobName) { }
// Plan access
public Plan createProgramPlan() { }
public Plan createProgramPlan(String jobName) { }
}

Stream execution environment for plan generation.
public class StreamPlanEnvironment extends StreamExecutionEnvironment {
// Constructor
public StreamPlanEnvironment(PipelineExecutorServiceLoader executorServiceLoader,
Configuration configuration,
ClassLoader userClassLoader) { }
// Stream plan generation methods
public StreamGraph getStreamGraph() { }
public StreamGraph getStreamGraph(boolean clearTransformations) { }
public String getExecutionPlan() { }
// StreamExecutionEnvironment overrides for planning
public JobExecutionResult execute() { }
public JobExecutionResult execute(String jobName) { }
public JobExecutionResult execute(StreamGraph streamGraph) { }
// Plan conversion utilities
public JobGraph getJobGraph() { }
public JobGraph getJobGraph(String jobName) { }
}

Cluster client implementation for local mini clusters.
public class MiniClusterClient implements ClusterClient<MiniClusterClient.MiniClusterId> {
// ClusterClient interface implementation for mini clusters
// Nested cluster ID class
public static class MiniClusterId {
// Mini cluster identification
}
}

Interface for retrieving packaged programs.
public interface PackagedProgramRetriever {
PackagedProgram getPackagedProgram();
}

Default implementation of packaged program retriever.
public class DefaultPackagedProgramRetriever implements PackagedProgramRetriever {
public PackagedProgram getPackagedProgram() { }
}

Utility enum with static methods for working with packaged programs.
public enum PackagedProgramUtils {
// Static utility methods
public static boolean isPython(String entryPointClassName) { }
public static Pipeline getPipelineFromProgram(PackagedProgram program,
Configuration configuration,
int parallelism,
boolean suppressOutput) { }
public static URI resolveURI(String path) { }
}

Factory for creating per-job mini clusters.
public class PerJobMiniClusterFactory {
// Constructor
public PerJobMiniClusterFactory(Configuration configuration) { }
// Mini cluster creation methods for per-job execution
public MiniCluster createMiniCluster(JobGraph jobGraph,
Configuration configuration) { }
public MiniClusterConfiguration createMiniClusterConfiguration(JobGraph jobGraph,
Configuration configuration) { }
// Resource calculation methods
public int calculateNumberOfTaskManagers(JobGraph jobGraph) { }
public int calculateTaskSlotsPerTaskManager(Configuration configuration) { }
// Configuration setup
public Configuration setupConfiguration(Configuration originalConfig,
JobGraph jobGraph) { }
}

Exception thrown during program invocation.
public class ProgramInvocationException extends Exception {
// Constructors
public ProgramInvocationException(String message) { }
public ProgramInvocationException(String message, Throwable cause) { }
public ProgramInvocationException(Throwable cause) { }
}

Runtime exception for program parametrization errors.
public class ProgramParametrizationException extends RuntimeException {
// Constructors
public ProgramParametrizationException(String message) { }
public ProgramParametrizationException(String message, Throwable cause) { }
public ProgramParametrizationException(Throwable cause) { }
}

Exception thrown when a program doesn't define any jobs.
public class ProgramMissingJobException extends FlinkException {
// Constructors
public ProgramMissingJobException(String message) { }
public ProgramMissingJobException(String message, Throwable cause) { }
}

Error thrown to abort program execution.
public class ProgramAbortException extends Error {
// Constructors
public ProgramAbortException(String message) { }
public ProgramAbortException(String message, Throwable cause) { }
}

Detailed implementation of core ClientUtils methods for program execution and job management.
public static URLClassLoader buildUserCodeClassLoader(List<URL> jars,
List<URL> classpaths,
ClassLoader parent,
Configuration configuration) {
// Combines JAR files and classpath URLs into a single URLClassLoader
// Ensures proper isolation of user code from system classes
// Handles parent-first or child-first delegation based on configuration
// Returns URLClassLoader configured for user code execution
}

Parameters:
jars: List of JAR file URLs containing user program code
classpaths: Additional classpath URLs for dependencies
parent: Parent ClassLoader for delegation
configuration: Flink configuration containing classloader settings
Returns: URLClassLoader configured for user code isolation
public static void executeProgram(PipelineExecutorServiceLoader executorServiceLoader,
Configuration configuration,
PackagedProgram program,
boolean enforceSingleJobExecution,
boolean suppressSysout) {
// Sets up execution environment contexts
// Loads and executes the packaged program
// Handles both batch and streaming execution modes
// Manages job lifecycle including submission and monitoring
// Cleans up resources after execution completion
}

Parameters:
executorServiceLoader: Service loader for pipeline executors
configuration: Execution configuration
program: Packaged program to execute
enforceSingleJobExecution: Whether to enforce single job execution
suppressSysout: Whether to suppress system output during execution

public static void waitUntilJobInitializationFinished(SupplierWithException<JobStatus, Exception> jobStatusSupplier,
SupplierWithException<JobResult, Exception> jobResultSupplier,
ClassLoader userCodeClassloader) {
// Polls job status until initialization is complete
// Handles various job states during startup phase
// Manages timeout and retry logic for status checks
// Switches context to user classloader for status operations
// Throws appropriate exceptions for initialization failures
}

Parameters:
jobStatusSupplier: Supplier for retrieving current job status
jobResultSupplier: Supplier for retrieving job result when available
userCodeClassloader: User code classloader for context switching

// Create packaged program
PackagedProgram program = PackagedProgram.newBuilder()
.setJarFile(new File("my-flink-job.jar"))
.setEntryPointClassName("com.example.MyFlinkJob")
.setArguments("--input", "/data/input", "--output", "/data/output")
.setConfiguration(new Configuration())
.build();
// Execute program using ClientUtils
Configuration config = new Configuration();
config.setString("execution.target", "local");
PipelineExecutorServiceLoader executorLoader =
PipelineExecutorServiceLoader.fromConfiguration(config);
try {
ClientUtils.executeProgram(executorLoader, config, program, false, false);
} finally {
program.close();
}

// Get cluster client through factory
Configuration config = new Configuration();
config.setString("rest.address", "localhost");
config.setInteger("rest.port", 8081);
ClusterClientServiceLoader serviceLoader = new DefaultClusterClientServiceLoader();
ClusterClientFactory<StandaloneClusterId> factory = serviceLoader.getClusterClientFactory(config);
try (ClusterDescriptor<StandaloneClusterId> descriptor = factory.createClusterDescriptor(config)) {
StandaloneClusterId clusterId = factory.getClusterId(config);
try (ClusterClient<StandaloneClusterId> client = descriptor.retrieve(clusterId).getClusterClient()) {
// Submit job
JobGraph jobGraph = /* create job graph */;
CompletableFuture<JobID> jobIdFuture = client.submitJob(jobGraph);
JobID jobId = jobIdFuture.get();
// Monitor job status
CompletableFuture<JobStatus> statusFuture = client.getJobStatus(jobId);
JobStatus status = statusFuture.get();
System.out.println("Job status: " + status);
// Get job result
CompletableFuture<JobResult> resultFuture = client.requestJobResult(jobId);
JobResult result = resultFuture.get();
}
}

// Set up context environment
Configuration config = new Configuration();
PipelineExecutorServiceLoader executorLoader =
PipelineExecutorServiceLoader.fromConfiguration(config);
ClassLoader userClassLoader = Thread.currentThread().getContextClassLoader();
ContextEnvironment.setAsContext(executorLoader, config, userClassLoader, false, false);
StreamContextEnvironment.setAsContext(executorLoader, config, userClassLoader, false, false);
try {
// Your Flink program code here
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// ... define your job ...
env.execute("My Flink Job");
} finally {
// Clean up context
ContextEnvironment.unsetAsContext();
StreamContextEnvironment.unsetAsContext();
}

// Create mini cluster configuration
Configuration config = new Configuration();
config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 4);
// Create and start mini cluster
MiniCluster miniCluster = new MiniCluster(new MiniClusterConfiguration(
config,
1, // number of task managers
RpcServiceUtils.createRemoteRpcService(/* configuration */),
"localhost"
));
miniCluster.start();
try {
// Create mini cluster client
MiniClusterClient client = new MiniClusterClient(config, miniCluster);
// Use client for job operations
CompletableFuture<Collection<JobStatusMessage>> jobs = client.listJobs();
} finally {
miniCluster.close();
}

// Trigger savepoint
try (ClusterClient<StandaloneClusterId> client = /* get cluster client */) {
JobID jobId = /* your job ID */;
String savepointDirectory = "hdfs://namenode:port/savepoints";
CompletableFuture<String> savepointFuture =
client.triggerSavepoint(jobId, savepointDirectory);
String savepointPath = savepointFuture.get();
System.out.println("Savepoint created at: " + savepointPath);
// Later, dispose the savepoint if no longer needed
CompletableFuture<Acknowledge> disposeFuture = client.disposeSavepoint(savepointPath);
disposeFuture.get();
}

// Extract pipeline from packaged program
PackagedProgram program = /* create packaged program */;
Configuration config = new Configuration();
int parallelism = 4;
boolean suppressOutput = true;
Pipeline pipeline = PackagedProgramUtils.getPipelineFromProgram(
program, config, parallelism, suppressOutput);
// Use pipeline with executor
PipelineExecutor executor = /* get executor */;
CompletableFuture<JobClient> jobClientFuture =
executor.execute(pipeline, config, program.getUserCodeClassLoader());

import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.ClusterClientProvider;
import org.apache.flink.client.program.ContextEnvironment;
import org.apache.flink.client.program.StreamContextEnvironment;
import org.apache.flink.client.program.OptimizerPlanEnvironment;
import org.apache.flink.client.program.StreamPlanEnvironment;
import org.apache.flink.client.program.MiniClusterClient;
import org.apache.flink.client.program.PackagedProgramRetriever;
import org.apache.flink.client.program.DefaultPackagedProgramRetriever;
import org.apache.flink.client.program.PackagedProgramUtils;
import org.apache.flink.client.program.PerJobMiniClusterFactory;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.client.program.ProgramParametrizationException;
import org.apache.flink.client.program.ProgramMissingJobException;
import org.apache.flink.client.program.ProgramAbortException;
import org.apache.flink.client.ClientUtils;
import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.dag.DataSinkNode;
import org.apache.flink.api.common.Plan;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
import org.apache.flink.util.function.SupplierWithException;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.PipelineExecutor;
import org.apache.flink.core.execution.PipelineExecutorServiceLoader;
import org.apache.flink.api.dag.Pipeline;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
import org.apache.flink.util.FlinkException;
import java.util.concurrent.CompletableFuture;
import java.util.Collection;
import java.util.Map;
import java.util.List;
import java.io.File;
import java.net.URL;
import java.net.URI;
import java.net.URLClassLoader;

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-clients-2-11