Execution environments that provide programmatic APIs for submitting and managing Flink jobs within applications.
Execution environment for remote cluster execution that provides programmatic job submission with full context awareness.
/**
* Execution environment for remote cluster execution with context awareness
*/
public class ContextEnvironment extends ExecutionEnvironment {
/**
* Creates a context environment for remote execution
* @param remoteConnection Client connection to the remote cluster
* @param jarFiles List of JAR file URLs to include
* @param classpaths List of additional classpath URLs
* @param userCodeClassLoader Class loader for user code
* @param savepointSettings Settings for savepoint restoration
*/
public ContextEnvironment(
ClusterClient remoteConnection,
List<URL> jarFiles,
List<URL> classpaths,
ClassLoader userCodeClassLoader,
SavepointRestoreSettings savepointSettings
);
/**
* Executes the job with the specified name
* @param jobName Name for the job execution
* @return JobExecutionResult with execution details and results
* @throws Exception if execution fails
*/
public JobExecutionResult execute(String jobName) throws Exception;
/**
* Gets the execution plan without executing the job
* @return String representation of the execution plan
* @throws Exception if plan generation fails
*/
public String getExecutionPlan() throws Exception;
/**
* Starts a new session for job execution
* @throws Exception if session creation fails
*/
public void startNewSession() throws Exception;
/**
* Gets the cluster client used by this environment
* @return ClusterClient instance for cluster communication
*/
public ClusterClient getClient();
/**
* Gets the list of JAR files associated with this environment
* @return List of JAR file URLs
*/
public List<URL> getJars();
/**
* Gets the list of additional classpaths
* @return List of classpath URLs
*/
public List<URL> getClasspaths();
/**
* Gets the user code class loader
* @return ClassLoader for user code execution
*/
public ClassLoader getUserCodeClassLoader();
/**
* Gets the savepoint restore settings
* @return SavepointRestoreSettings for job restoration
*/
public SavepointRestoreSettings getSavepointRestoreSettings();
/**
* Sets the context environment factory as the active factory
* @param factory ContextEnvironmentFactory to use for creating environments
*/
public static void setAsContext(ContextEnvironmentFactory factory);
/**
* Removes the current context environment factory
*/
public static void unsetContext();
}Execution environment for detached (non-blocking) job execution that returns immediately after job submission.
/**
* Execution environment for detached (non-blocking) execution
*/
public class DetachedEnvironment extends ContextEnvironment {
/**
* Creates a detached environment for non-blocking remote execution
* @param remoteConnection Client connection to the remote cluster
* @param jarFiles List of JAR file URLs to include
* @param classpaths List of additional classpath URLs
* @param userCodeClassLoader Class loader for user code
* @param savepointSettings Settings for savepoint restoration
*/
public DetachedEnvironment(
ClusterClient remoteConnection,
List<URL> jarFiles,
List<URL> classpaths,
ClassLoader userCodeClassLoader,
SavepointRestoreSettings savepointSettings
);
/**
* Executes the job in detached mode (non-blocking)
* @param jobName Name for the job execution
* @return JobExecutionResult with job submission details (not execution results)
* @throws Exception if job submission fails
*/
public JobExecutionResult execute(String jobName) throws Exception;
/**
* Sets the detached execution plan
* @param plan FlinkPlan to execute in detached mode
*/
public void setDetachedPlan(FlinkPlan plan);
/**
* Finalizes the execution and returns submission result
* @return JobSubmissionResult with job ID and submission status
* @throws Exception if finalization fails
*/
public JobSubmissionResult finalizeExecute() throws Exception;
}Factory for creating context-appropriate execution environments based on configuration and requirements.
/**
* Factory for creating context-appropriate execution environments
*/
public class ContextEnvironmentFactory implements ExecutionEnvironmentFactory {
/**
* Creates a context environment factory with full configuration
* @param client Cluster client for remote communication
* @param jarFilesToAttach List of JAR files to attach to jobs
* @param classpathsToAttach List of classpaths to attach to jobs
* @param userCodeClassLoader Class loader for user code
* @param defaultParallelism Default parallelism for job execution
* @param isDetached Whether to create detached environments
* @param savepointSettings Settings for savepoint restoration
*/
public ContextEnvironmentFactory(
ClusterClient client,
List<URL> jarFilesToAttach,
List<URL> classpathsToAttach,
ClassLoader userCodeClassLoader,
int defaultParallelism,
boolean isDetached,
SavepointRestoreSettings savepointSettings
);
/**
* Creates an execution environment based on factory configuration
* @return ExecutionEnvironment instance (ContextEnvironment or DetachedEnvironment)
*/
public ExecutionEnvironment createExecutionEnvironment();
/**
* Gets the last environment created by this factory
* @return Last created ExecutionEnvironment instance
*/
public ExecutionEnvironment getLastEnvCreated();
}Specialized environments for plan preview and optimization without actual execution.
/**
* Execution environment for generating plan previews without execution
*/
public class PreviewPlanEnvironment extends ExecutionEnvironment {
/**
* Generates a plan preview instead of executing
* @param jobName Name for the job (used in preview)
* @return JobExecutionResult (empty - no actual execution)
* @throws Exception if preview generation fails
*/
public JobExecutionResult execute(String jobName) throws Exception;
/**
* Gets the execution plan for preview
* @return String representation of the execution plan
* @throws Exception if plan generation fails
*/
public String getExecutionPlan() throws Exception;
/**
* Gets the preview plan as a formatted string
* @return Formatted preview of the execution plan
* @throws Exception if preview generation fails
*/
public String getPreviewPlan() throws Exception;
}
/**
* Execution environment for plan optimization preview
*/
public class OptimizerPlanEnvironment extends ExecutionEnvironment {
/**
* Generates an optimized plan instead of executing
* @param jobName Name for the job (used in optimization)
* @return JobExecutionResult (empty - no actual execution)
* @throws Exception if optimization fails
*/
public JobExecutionResult execute(String jobName) throws Exception;
/**
* Gets the optimized execution plan
* @return String representation of the optimized plan
* @throws Exception if optimization fails
*/
public String getExecutionPlan() throws Exception;
}Usage Examples:
import org.apache.flink.client.program.ContextEnvironment;
import org.apache.flink.client.program.DetachedEnvironment;
import org.apache.flink.client.program.ContextEnvironmentFactory;
import org.apache.flink.client.program.StandaloneClusterClient;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import java.net.URL;
import java.util.Arrays;
import java.util.List;
// Setup cluster connection
Configuration config = new Configuration();
StandaloneClusterClient client = new StandaloneClusterClient(config);
List<URL> jarFiles = Arrays.asList(
new File("my-job.jar").toURI().toURL()
);
List<URL> classpaths = Arrays.asList();
// Create context environment for synchronous execution
ContextEnvironment contextEnv = new ContextEnvironment(
client,
jarFiles,
classpaths,
Thread.currentThread().getContextClassLoader(),
SavepointRestoreSettings.none()
);
// Set as the active execution environment
ContextEnvironmentFactory factory = new ContextEnvironmentFactory(
client,
jarFiles,
classpaths,
Thread.currentThread().getContextClassLoader(),
4, // default parallelism
false, // not detached
SavepointRestoreSettings.none()
);
ContextEnvironment.setAsContext(factory);
try {
// Now your Flink program code can use ExecutionEnvironment.getExecutionEnvironment()
// and it will automatically use the context environment
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// Build your Flink program
DataSet<String> data = env.fromElements("Hello", "World", "Flink");
data.print();
// Execute the job (this will use the context environment)
JobExecutionResult result = env.execute("My Context Job");
System.out.println("Job completed in: " + result.getNetRuntime() + "ms");
} finally {
ContextEnvironment.unsetContext();
client.shutdown();
}
// Detached execution for fire-and-forget jobs
DetachedEnvironment detachedEnv = new DetachedEnvironment(
client,
jarFiles,
classpaths,
Thread.currentThread().getContextClassLoader(),
SavepointRestoreSettings.none()
);
ContextEnvironmentFactory detachedFactory = new ContextEnvironmentFactory(
client,
jarFiles,
classpaths,
Thread.currentThread().getContextClassLoader(),
4, // default parallelism
true, // detached mode
SavepointRestoreSettings.none()
);
ContextEnvironment.setAsContext(detachedFactory);
try {
ExecutionEnvironment detachedEnvFromFactory = ExecutionEnvironment.getExecutionEnvironment();
// Build your Flink program
DataSet<String> data = detachedEnvFromFactory.fromElements("Detached", "Job", "Execution");
data.writeAsText("/path/to/output");
// This will submit the job and return immediately
JobExecutionResult detachedResult = detachedEnvFromFactory.execute("My Detached Job");
System.out.println("Job submitted with ID: " + detachedResult.getJobID());
} finally {
ContextEnvironment.unsetContext();
}
// Plan preview without execution
PreviewPlanEnvironment previewEnv = new PreviewPlanEnvironment();
previewEnv.setParallelism(4);
DataSet<String> previewData = previewEnv.fromElements("Preview", "Plan", "Generation");
previewData.map(s -> s.toUpperCase()).print();
// Get the execution plan without running the job
String executionPlan = previewEnv.getExecutionPlan();
System.out.println("Execution Plan Preview:");
System.out.println(executionPlan);
String previewPlan = previewEnv.getPreviewPlan();
System.out.println("Formatted Preview:");
System.out.println(previewPlan);Base interfaces for execution environment management.
/**
* Factory interface for creating execution environments
*/
public interface ExecutionEnvironmentFactory {
/**
* Creates an execution environment instance
* @return ExecutionEnvironment configured by this factory
*/
ExecutionEnvironment createExecutionEnvironment();
}