or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

cli.md cluster-management.md execution-context.md execution-environments.md index.md program-management.md
tile.json

docs/execution-context.md

Execution Context

Execution environments that provide programmatic APIs for submitting and managing Flink jobs within applications.

Capabilities

ContextEnvironment

Execution environment for remote cluster execution that provides programmatic job submission with full context awareness.

/**
 * Execution environment for remote cluster execution with context awareness.
 *
 * <p>The constructor receives the cluster client, the JAR files and classpaths to
 * include, the user-code class loader and the savepoint restore settings; a matching
 * accessor is declared for each of them. The static {@link #setAsContext} /
 * {@link #unsetContext} pair installs and removes the factory consulted by
 * {@code ExecutionEnvironment.getExecutionEnvironment()} (see the usage example).
 */
public class ContextEnvironment extends ExecutionEnvironment {
    /**
     * Creates a context environment for remote execution.
     *
     * @param remoteConnection client connection to the remote cluster
     * @param jarFiles list of JAR file URLs to include
     * @param classpaths list of additional classpath URLs
     * @param userCodeClassLoader class loader for user code
     * @param savepointSettings settings for savepoint restoration
     */
    public ContextEnvironment(
        ClusterClient remoteConnection,
        List<URL> jarFiles,
        List<URL> classpaths,
        ClassLoader userCodeClassLoader,
        SavepointRestoreSettings savepointSettings
    );
    
    /**
     * Executes the job with the specified name.
     *
     * <p>{@code DetachedEnvironment} redeclares this method with non-blocking
     * (submit-and-return) semantics.
     *
     * @param jobName name for the job execution
     * @return {@code JobExecutionResult} with execution details and results
     * @throws Exception if execution fails
     */
    public JobExecutionResult execute(String jobName) throws Exception;
    
    /**
     * Gets the execution plan without executing the job.
     *
     * @return string representation of the execution plan
     * @throws Exception if plan generation fails
     */
    public String getExecutionPlan() throws Exception;
    
    /**
     * Starts a new session for job execution.
     *
     * @throws Exception if session creation fails
     */
    public void startNewSession() throws Exception;
    
    /**
     * Gets the cluster client used by this environment.
     *
     * @return {@code ClusterClient} instance for cluster communication
     */
    public ClusterClient getClient();
    
    /**
     * Gets the list of JAR files associated with this environment.
     *
     * @return list of JAR file URLs
     */
    public List<URL> getJars();
    
    /**
     * Gets the list of additional classpaths.
     *
     * @return list of classpath URLs
     */
    public List<URL> getClasspaths();
    
    /**
     * Gets the user code class loader.
     *
     * @return {@code ClassLoader} for user code execution
     */
    public ClassLoader getUserCodeClassLoader();
    
    /**
     * Gets the savepoint restore settings.
     *
     * @return {@code SavepointRestoreSettings} for job restoration
     */
    public SavepointRestoreSettings getSavepointRestoreSettings();
    
    /**
     * Sets the context environment factory as the active factory.
     *
     * <p>Pair every call with {@link #unsetContext()}, ideally in a {@code finally}
     * block, as the usage example does.
     *
     * @param factory {@code ContextEnvironmentFactory} to use for creating environments
     */
    public static void setAsContext(ContextEnvironmentFactory factory);
    
    /**
     * Removes the current context environment factory; the counterpart of
     * {@link #setAsContext(ContextEnvironmentFactory)}.
     */
    public static void unsetContext();
}

DetachedEnvironment

Execution environment for detached (non-blocking) job execution that returns immediately after job submission.

/**
 * Execution environment for detached (non-blocking) execution.
 *
 * <p>Extends {@code ContextEnvironment} and takes the same constructor arguments;
 * {@link #execute(String)} is redeclared here to return job submission details
 * rather than execution results.
 */
public class DetachedEnvironment extends ContextEnvironment {
    /**
     * Creates a detached environment for non-blocking remote execution.
     *
     * @param remoteConnection client connection to the remote cluster
     * @param jarFiles list of JAR file URLs to include
     * @param classpaths list of additional classpath URLs
     * @param userCodeClassLoader class loader for user code
     * @param savepointSettings settings for savepoint restoration
     */
    public DetachedEnvironment(
        ClusterClient remoteConnection,
        List<URL> jarFiles,
        List<URL> classpaths,
        ClassLoader userCodeClassLoader,
        SavepointRestoreSettings savepointSettings
    );
    
    /**
     * Executes the job in detached mode (non-blocking).
     *
     * @param jobName name for the job execution
     * @return {@code JobExecutionResult} with job submission details (not execution results)
     * @throws Exception if job submission fails
     */
    public JobExecutionResult execute(String jobName) throws Exception;
    
    /**
     * Sets the detached execution plan.
     *
     * @param plan {@code FlinkPlan} to execute in detached mode
     */
    public void setDetachedPlan(FlinkPlan plan);
    
    /**
     * Finalizes the execution and returns the submission result.
     *
     * <p>NOTE(review): the required call order of {@code execute},
     * {@code setDetachedPlan} and {@code finalizeExecute} is not stated in this
     * listing - confirm against the implementation.
     *
     * @return {@code JobSubmissionResult} with job ID and submission status
     * @throws Exception if finalization fails
     */
    public JobSubmissionResult finalizeExecute() throws Exception;
}

ContextEnvironmentFactory

Factory for creating context-appropriate execution environments based on configuration and requirements.

/**
 * Factory for creating context-appropriate execution environments.
 *
 * <p>Implements {@code ExecutionEnvironmentFactory}; an instance is what
 * {@code ContextEnvironment.setAsContext(...)} accepts.
 */
public class ContextEnvironmentFactory implements ExecutionEnvironmentFactory {
    /**
     * Creates a context environment factory with full configuration.
     *
     * @param client cluster client for remote communication
     * @param jarFilesToAttach list of JAR files to attach to jobs
     * @param classpathsToAttach list of classpaths to attach to jobs
     * @param userCodeClassLoader class loader for user code
     * @param defaultParallelism default parallelism for job execution
     * @param isDetached whether to create detached environments
     * @param savepointSettings settings for savepoint restoration
     */
    public ContextEnvironmentFactory(
        ClusterClient client,
        List<URL> jarFilesToAttach,
        List<URL> classpathsToAttach,
        ClassLoader userCodeClassLoader,
        int defaultParallelism,
        boolean isDetached,
        SavepointRestoreSettings savepointSettings
    );
    
    /**
     * Creates an execution environment based on the factory configuration.
     *
     * <p>Presumably a {@code DetachedEnvironment} when the factory was built with
     * {@code isDetached == true} and a {@code ContextEnvironment} otherwise -
     * confirm against the implementation.
     *
     * @return {@code ExecutionEnvironment} instance ({@code ContextEnvironment} or
     *     {@code DetachedEnvironment})
     */
    public ExecutionEnvironment createExecutionEnvironment();
    
    /**
     * Gets the last environment created by this factory.
     *
     * <p>NOTE(review): the result before any environment has been created is not
     * specified in this listing - verify before relying on it.
     *
     * @return last created {@code ExecutionEnvironment} instance
     */
    public ExecutionEnvironment getLastEnvCreated();
}

Preview and Optimization Environments

Specialized environments for plan preview and optimization without actual execution.

/**
 * Execution environment for generating plan previews without execution.
 *
 * <p>No constructor is listed here; the usage example instantiates it with
 * {@code new PreviewPlanEnvironment()}.
 */
public class PreviewPlanEnvironment extends ExecutionEnvironment {
    /**
     * Generates a plan preview instead of executing the job.
     *
     * @param jobName name for the job (used in the preview)
     * @return {@code JobExecutionResult} (empty - no actual execution)
     * @throws Exception if preview generation fails
     */
    public JobExecutionResult execute(String jobName) throws Exception;
    
    /**
     * Gets the execution plan for preview.
     *
     * @return string representation of the execution plan
     * @throws Exception if plan generation fails
     */
    public String getExecutionPlan() throws Exception;
    
    /**
     * Gets the preview plan as a formatted string.
     *
     * <p>NOTE(review): how this output differs from {@link #getExecutionPlan()} is
     * not described in this listing - confirm against the implementation.
     *
     * @return formatted preview of the execution plan
     * @throws Exception if preview generation fails
     */
    public String getPreviewPlan() throws Exception;
}

/**
 * Execution environment for plan optimization preview: {@link #execute(String)}
 * produces an optimized plan rather than running the job.
 */
public class OptimizerPlanEnvironment extends ExecutionEnvironment {
    /**
     * Generates an optimized plan instead of executing the job.
     *
     * @param jobName name for the job (used in optimization)
     * @return {@code JobExecutionResult} (empty - no actual execution)
     * @throws Exception if optimization fails
     */
    public JobExecutionResult execute(String jobName) throws Exception;
    
    /**
     * Gets the optimized execution plan.
     *
     * @return string representation of the optimized plan
     * @throws Exception if optimization fails
     */
    public String getExecutionPlan() throws Exception;
}

Usage Examples:

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.client.program.ContextEnvironment;
import org.apache.flink.client.program.ContextEnvironmentFactory;
import org.apache.flink.client.program.DetachedEnvironment;
import org.apache.flink.client.program.PreviewPlanEnvironment;
import org.apache.flink.client.program.StandaloneClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import java.io.File;
import java.net.URL;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Setup cluster connection
Configuration config = new Configuration();
StandaloneClusterClient client = new StandaloneClusterClient(config);

List<URL> jarFiles = Arrays.asList(
    new File("my-job.jar").toURI().toURL()
);
List<URL> classpaths = Collections.emptyList();
ClassLoader userCodeClassLoader = Thread.currentThread().getContextClassLoader();

// The client is shared by the blocking and the detached example, so it is shut
// down exactly once, after both have finished (or failed).
try {
    // Environments can be constructed directly; shown here only to illustrate the
    // constructor. The job below runs on the environment created by the factory.
    ContextEnvironment contextEnv = new ContextEnvironment(
        client,
        jarFiles,
        classpaths,
        userCodeClassLoader,
        SavepointRestoreSettings.none()
    );

    // Set as the active execution environment
    ContextEnvironmentFactory factory = new ContextEnvironmentFactory(
        client,
        jarFiles,
        classpaths,
        userCodeClassLoader,
        4, // default parallelism
        false, // not detached
        SavepointRestoreSettings.none()
    );

    ContextEnvironment.setAsContext(factory);

    try {
        // Now your Flink program code can use ExecutionEnvironment.getExecutionEnvironment()
        // and it will automatically use the context environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Build your Flink program
        DataSet<String> data = env.fromElements("Hello", "World", "Flink");
        // Attach a lazy sink. DataSet#print() would run the program right away and
        // leave no sink for the explicit env.execute(...) call below.
        data.writeAsText("/path/to/context-output");

        // Execute the job (this will use the context environment)
        JobExecutionResult result = env.execute("My Context Job");
        System.out.println("Job completed in: " + result.getNetRuntime() + "ms");

    } finally {
        ContextEnvironment.unsetContext();
    }

    // Detached execution for fire-and-forget jobs.
    // Direct construction, again shown only to illustrate the constructor.
    DetachedEnvironment detachedEnv = new DetachedEnvironment(
        client,
        jarFiles,
        classpaths,
        userCodeClassLoader,
        SavepointRestoreSettings.none()
    );

    ContextEnvironmentFactory detachedFactory = new ContextEnvironmentFactory(
        client,
        jarFiles,
        classpaths,
        userCodeClassLoader,
        4, // default parallelism
        true, // detached mode
        SavepointRestoreSettings.none()
    );

    ContextEnvironment.setAsContext(detachedFactory);

    try {
        ExecutionEnvironment detachedEnvFromFactory = ExecutionEnvironment.getExecutionEnvironment();

        // Build your Flink program
        DataSet<String> data = detachedEnvFromFactory.fromElements("Detached", "Job", "Execution");
        data.writeAsText("/path/to/output");

        // This will submit the job and return immediately
        JobExecutionResult detachedResult = detachedEnvFromFactory.execute("My Detached Job");
        System.out.println("Job submitted with ID: " + detachedResult.getJobID());

    } finally {
        ContextEnvironment.unsetContext();
    }
} finally {
    // Release the cluster connection only after every job that needs it is done
    client.shutdown();
}

// Plan preview without execution
PreviewPlanEnvironment previewEnv = new PreviewPlanEnvironment();
previewEnv.setParallelism(4);

DataSet<String> previewData = previewEnv.fromElements("Preview", "Plan", "Generation");
// Use a lazy sink: print() would trigger execute() on the preview environment
// instead of just contributing to the plan.
previewData.map(s -> s.toUpperCase()).writeAsText("/path/to/preview-output");

// Get the execution plan without running the job
String executionPlan = previewEnv.getExecutionPlan();
System.out.println("Execution Plan Preview:");
System.out.println(executionPlan);

String previewPlan = previewEnv.getPreviewPlan();
System.out.println("Formatted Preview:");
System.out.println(previewPlan);

Environment Management Interfaces

Base interfaces for execution environment management.

/**
 * Factory interface for creating execution environments.
 *
 * <p>{@code ContextEnvironmentFactory} is the implementation listed in this document.
 */
public interface ExecutionEnvironmentFactory {
    /**
     * Creates an execution environment instance.
     *
     * @return {@code ExecutionEnvironment} configured by this factory
     */
    ExecutionEnvironment createExecutionEnvironment();
}