Executors for running Flink programs both locally for development and testing, and remotely on production clusters.
Executor for running Flink programs locally in the same JVM, ideal for development, testing, and debugging.
/**
* Executor for running Flink programs locally in the same JVM
*/
public class LocalExecutor extends PlanExecutor {
/**
* Creates a local executor with default configuration
*/
public LocalExecutor();
/**
* Creates a local executor with specific configuration
* @param conf Configuration for local execution environment
*/
public LocalExecutor(Configuration conf);
/**
* Starts the local execution environment
* @throws Exception if startup fails
*/
public void start() throws Exception;
/**
* Stops the local execution environment and releases resources
* @throws Exception if shutdown fails
*/
public void stop() throws Exception;
/**
* Checks if the local executor is currently running
* @return true if the executor is running
*/
public boolean isRunning();
/**
* Executes a Flink execution plan locally
* @param plan The execution plan to run
* @return JobExecutionResult with execution details and results
* @throws Exception if execution fails
*/
public JobExecutionResult executePlan(Plan plan) throws Exception;
/**
* Gets the optimizer plan as JSON for visualization
* @param plan The execution plan to optimize
* @return JSON string representation of the optimized plan
* @throws Exception if plan optimization fails
*/
public String getOptimizerPlanAsJSON(Plan plan) throws Exception;
/**
* Ends a specific job session
* @param jobID The job ID to end
* @throws Exception if session termination fails
*/
public void endSession(JobID jobID) throws Exception;
/**
* Checks if files should be overwritten by default
* @return true if default overwrite is enabled
*/
public boolean isDefaultOverwriteFiles();
/**
* Sets whether files should be overwritten by default
* @param defaultOverwriteFiles true to enable default overwrite
*/
public void setDefaultOverwriteFiles(boolean defaultOverwriteFiles);
/**
* Gets the number of task slots per TaskManager
* @return Number of task slots
*/
public int getTaskManagerNumSlots();
/**
* Sets the number of task slots per TaskManager
* @param taskManagerNumSlots Number of task slots to configure
*/
public void setTaskManagerNumSlots(int taskManagerNumSlots);
/**
* Executes a program locally (static convenience method)
* @param program The program to execute
* @param args Command line arguments
* @return JobExecutionResult with execution details and results
* @throws Exception if execution fails
*/
public static JobExecutionResult execute(Program program, String... args) throws Exception;
/**
* Executes an execution plan locally (static convenience method)
* @param plan The execution plan to run
* @return JobExecutionResult with execution details and results
* @throws Exception if execution fails
*/
public static JobExecutionResult execute(Plan plan) throws Exception;
/**
* Gets the optimizer plan as JSON (static convenience method)
* @param plan The execution plan to optimize
* @return JSON string representation of the optimized plan
* @throws Exception if plan optimization fails
*/
public static String optimizerPlanAsJSON(Plan plan) throws Exception;
/**
* Gets the execution plan as JSON (static convenience method)
* @param plan The execution plan to convert
* @return JSON string representation of the plan
* @throws Exception if plan conversion fails
*/
public static String getPlanAsJSON(Plan plan) throws Exception;
}Executor for running Flink programs on remote clusters, supporting various connection modes and configurations.
/**
* Executor for running Flink programs on remote clusters
*/
public class RemoteExecutor extends PlanExecutor {
/**
* Creates a remote executor for a specific hostname and port
* @param hostname Hostname of the Flink cluster JobManager
* @param port Port of the Flink cluster JobManager
*/
public RemoteExecutor(String hostname, int port);
/**
* Creates a remote executor with a JAR file dependency
* @param hostname Hostname of the Flink cluster JobManager
* @param port Port of the Flink cluster JobManager
* @param jarFile JAR file URL to include in execution
*/
public RemoteExecutor(String hostname, int port, URL jarFile);
/**
* Creates a remote executor with hostname:port format
* @param hostport Hostname and port in "hostname:port" format
* @param jarFile JAR file URL to include in execution
*/
public RemoteExecutor(String hostport, URL jarFile);
/**
* Creates a remote executor with multiple JAR files
* @param hostname Hostname of the Flink cluster JobManager
* @param port Port of the Flink cluster JobManager
* @param jarFiles List of JAR file URLs to include in execution
*/
public RemoteExecutor(String hostname, int port, List<URL> jarFiles);
/**
* Creates a remote executor with custom configuration
* @param hostname Hostname of the Flink cluster JobManager
* @param port Port of the Flink cluster JobManager
* @param clientConfiguration Configuration for the client connection
*/
public RemoteExecutor(String hostname, int port, Configuration clientConfiguration);
/**
* Creates a remote executor with full configuration options
* @param inet Socket address of the JobManager
* @param clientConfiguration Configuration for the client connection
* @param jarFiles List of JAR file URLs to include in execution
* @param globalClasspaths List of global classpath URLs
*/
public RemoteExecutor(InetSocketAddress inet, Configuration clientConfiguration, List<URL> jarFiles, List<URL> globalClasspaths);
/**
* Starts the remote executor connection
* @throws Exception if connection startup fails
*/
public void start() throws Exception;
/**
* Stops the remote executor and closes connections
* @throws Exception if shutdown fails
*/
public void stop() throws Exception;
/**
* Checks if the remote executor is currently running
* @return true if the executor is running and connected
*/
public boolean isRunning();
/**
* Executes a Flink execution plan on the remote cluster
* @param plan The execution plan to run
* @return JobExecutionResult with execution details and results
* @throws Exception if execution fails
*/
public JobExecutionResult executePlan(Plan plan) throws Exception;
/**
* Executes a job with JARs on the remote cluster
* @param program The job with associated JAR files
* @return JobExecutionResult with execution details and results
* @throws Exception if execution fails
*/
public JobExecutionResult executePlanWithJars(JobWithJars program) throws Exception;
/**
* Gets the optimizer plan as JSON for visualization
* @param plan The execution plan to optimize
* @return JSON string representation of the optimized plan
* @throws Exception if plan optimization fails
*/
public String getOptimizerPlanAsJSON(Plan plan) throws Exception;
/**
* Ends a specific job session on the remote cluster
* @param jobID The job ID to end
* @throws Exception if session termination fails
*/
public void endSession(JobID jobID) throws Exception;
/**
* Sets the default parallelism for job execution
* @param defaultParallelism Default parallelism level
*/
public void setDefaultParallelism(int defaultParallelism);
/**
* Gets the default parallelism for job execution
* @return Default parallelism level
*/
public int getDefaultParallelism();
}Utility methods for client operations and connection management.
/**
* Common utility methods for client operations
*/
public class ClientUtils {
/**
* Parses a hostname:port string into an InetSocketAddress
* @param hostport String in "hostname:port" format
* @return InetSocketAddress parsed from the input string
* @throws IllegalArgumentException if the format is invalid
*/
public static InetSocketAddress parseHostPortAddress(String hostport);
}Usage Examples:
import org.apache.flink.client.LocalExecutor;
import org.apache.flink.client.RemoteExecutor;
import org.apache.flink.client.ClientUtils;
import org.apache.flink.api.common.Plan;
import org.apache.flink.configuration.Configuration;
import java.net.InetSocketAddress;
import java.net.URL;
import java.util.Arrays;
import java.util.List;
// Local execution for development/testing
LocalExecutor localExecutor = new LocalExecutor();
localExecutor.setTaskManagerNumSlots(4);
localExecutor.setDefaultOverwriteFiles(true);
try {
localExecutor.start();
// Execute a Flink plan locally
Plan myPlan = createMyFlinkPlan(); // Your plan creation logic
JobExecutionResult result = localExecutor.executePlan(myPlan);
System.out.println("Local execution completed in: " + result.getNetRuntime() + "ms");
// Get execution plan as JSON for visualization
String planJson = localExecutor.getOptimizerPlanAsJSON(myPlan);
System.out.println("Execution plan: " + planJson);
} finally {
localExecutor.stop();
}
// Remote execution on a cluster
String jobManagerHost = "flink-cluster.example.com";
int jobManagerPort = 6123;
// Basic remote executor
RemoteExecutor remoteExecutor = new RemoteExecutor(jobManagerHost, jobManagerPort);
// Remote executor with JAR dependencies
List<URL> jarFiles = Arrays.asList(
new File("my-job.jar").toURI().toURL(),
new File("dependencies.jar").toURI().toURL()
);
RemoteExecutor remoteExecutorWithJars = new RemoteExecutor(jobManagerHost, jobManagerPort, jarFiles);
// Remote executor with full configuration
Configuration config = new Configuration();
config.setString("jobmanager.rpc.address", jobManagerHost);
config.setInteger("jobmanager.rpc.port", jobManagerPort);
InetSocketAddress jobManagerAddress = ClientUtils.parseHostPortAddress("flink-cluster.example.com:6123");
RemoteExecutor configuredRemoteExecutor = new RemoteExecutor(
jobManagerAddress,
config,
jarFiles,
Arrays.asList() // global classpaths
);
try {
remoteExecutor.start();
remoteExecutor.setDefaultParallelism(8);
// Execute a plan on the remote cluster
JobExecutionResult remoteResult = remoteExecutor.executePlan(myPlan);
System.out.println("Remote execution completed in: " + remoteResult.getNetRuntime() + "ms");
} finally {
remoteExecutor.stop();
}
// Static convenience methods for quick execution
JobExecutionResult quickLocalResult = LocalExecutor.execute(myPlan);
String quickPlanJson = LocalExecutor.getPlanAsJSON(myPlan);Base class for all plan executors providing common execution interface.
/**
* Base class for all plan executors
*/
public abstract class PlanExecutor {
/**
* Starts the executor
* @throws Exception if startup fails
*/
public abstract void start() throws Exception;
/**
* Stops the executor
* @throws Exception if shutdown fails
*/
public abstract void stop() throws Exception;
/**
* Checks if the executor is running
* @return true if the executor is active
*/
public abstract boolean isRunning();
/**
* Executes a Flink execution plan
* @param plan The execution plan to run
* @return JobExecutionResult with execution details
* @throws Exception if execution fails
*/
public abstract JobExecutionResult executePlan(Plan plan) throws Exception;
/**
* Gets the optimized execution plan as JSON
* @param plan The execution plan to optimize
* @return JSON representation of the optimized plan
* @throws Exception if optimization fails
*/
public abstract String getOptimizerPlanAsJSON(Plan plan) throws Exception;
/**
* Ends a job session
* @param jobID The job ID to terminate
* @throws Exception if session termination fails
*/
public abstract void endSession(JobID jobID) throws Exception;
}