CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-org-apache-flink--flink-clients-2-11

Apache Flink Client APIs and utilities for submitting and interacting with Flink jobs

Pending
Overview
Eval results
Files

docs/program-execution.md

Program Execution

The Apache Flink Program Execution module (org.apache.flink.client.program.*) provides comprehensive program packaging, execution environments, and job lifecycle management capabilities. This module handles the packaging of user programs, provides cluster client implementations, and manages execution contexts for both batch and streaming applications.

Core Program Interfaces

ClusterClient { .api }

Main interface for interacting with Flink clusters, providing job submission and management capabilities.

public interface ClusterClient<T> extends AutoCloseable {
    // Cluster information
    T getClusterId();
    Configuration getFlinkConfiguration();
    String getWebInterfaceURL();
    
    // Job listing and status
    CompletableFuture<Collection<JobStatusMessage>> listJobs();
    CompletableFuture<JobStatus> getJobStatus(JobID jobId);
    CompletableFuture<JobResult> requestJobResult(JobID jobId);
    
    // Job submission and management
    CompletableFuture<JobID> submitJob(JobGraph jobGraph);
    CompletableFuture<Acknowledge> cancel(JobID jobId);
    CompletableFuture<String> cancelWithSavepoint(JobID jobId, String savepointDirectory);
    CompletableFuture<String> stopWithSavepoint(JobID jobId, boolean advanceToEndOfEventTime, String savepointDirectory);
    
    // Savepoint operations
    CompletableFuture<String> triggerSavepoint(JobID jobId, String savepointDirectory);
    CompletableFuture<Acknowledge> disposeSavepoint(String savepointPath);
    
    // Accumulators and metrics
    default CompletableFuture<Map<String, Object>> getAccumulators(JobID jobID) { }
    CompletableFuture<Map<String, Object>> getAccumulators(JobID jobID, ClassLoader loader);
    
    // Coordination requests
    CompletableFuture<CoordinationResponse> sendCoordinationRequest(JobID jobId, 
                                                                   OperatorID operatorId, 
                                                                   CoordinationRequest request);
    
    // Cluster management
    void shutDownCluster();
    void close();
}

PackagedProgram { .api }

Represents a packaged Flink program with all necessary metadata and dependencies.

public class PackagedProgram implements AutoCloseable {
    // Constants
    public static final String MANIFEST_ATTRIBUTE_ASSEMBLER_CLASS = "program-class";
    public static final String MANIFEST_ATTRIBUTE_MAIN_CLASS = "Main-Class";
    
    // Program information
    public String getMainClassName() { }
    public String[] getArguments() { }
    public ClassLoader getUserCodeClassLoader() { }
    public Configuration getConfiguration() { }
    public SavepointRestoreSettings getSavepointSettings() { }
    
    // Dependencies and resources
    public List<URL> getJobJarAndDependencies() { }
    public static List<URL> getJobJarAndDependencies(File jarFile, String entryPointClassName) { }
    
    // Program execution
    public void invokeInteractiveModeForExecution() { }
    public String getDescription() { }
    public boolean isPython() { }
    
    // Resource management
    public void close() { }
    
    // Builder pattern
    public static Builder newBuilder() { }
    
    public static class Builder {
        public Builder setJarFile(File jarFile) { }
        public Builder setUserClassPaths(List<URL> userClassPaths) { }
        public Builder setEntryPointClassName(String entryPointClassName) { }
        public Builder setConfiguration(Configuration configuration) { }
        public Builder setSavepointRestoreSettings(SavepointRestoreSettings savepointRestoreSettings) { }
        public Builder setArguments(String... arguments) { }
        public PackagedProgram build() { }
    }
}

ClusterClientProvider { .api }

Provider interface for obtaining cluster clients.

public interface ClusterClientProvider<T> {
    ClusterClient<T> getClusterClient();
}

Execution Environment Classes

ContextEnvironment { .api }

Execution environment that delegates to a configured pipeline executor.

public class ContextEnvironment extends ExecutionEnvironment {
    // Static context management
    public static void setAsContext(PipelineExecutorServiceLoader executorServiceLoader, 
                                   Configuration configuration, 
                                   ClassLoader userCodeClassLoader, 
                                   boolean enforceSingleJobExecution, 
                                   boolean suppressSysout) { }
    
    public static void unsetAsContext() { }
    
    // ExecutionEnvironment implementation with delegation to executor
}

StreamContextEnvironment { .api }

Stream execution environment for context-based execution.

public class StreamContextEnvironment extends StreamExecutionEnvironment {
    // Static context management
    public static void setAsContext(PipelineExecutorServiceLoader executorServiceLoader, 
                                   Configuration configuration, 
                                   ClassLoader userCodeClassLoader, 
                                   boolean enforceSingleJobExecution, 
                                   boolean suppressSysout) { }
    
    public static void unsetAsContext() { }
    
    // StreamExecutionEnvironment implementation with delegation
}

OptimizerPlanEnvironment { .api }

Execution environment for creating execution plans without execution.

public class OptimizerPlanEnvironment extends ExecutionEnvironment {
    // Constructor
    public OptimizerPlanEnvironment(Optimizer optimizer, int defaultParallelism) { }
    
    // Plan generation methods
    public OptimizedPlan getOptimizedPlan(Program program) { }
    public OptimizedPlan getOptimizedPlan(List<DataSinkNode> sinks) { }
    public String getExecutionPlan() { }
    
    // ExecutionEnvironment overrides for optimization
    public JobExecutionResult execute() { }
    public JobExecutionResult execute(String jobName) { }
    
    // Plan access
    public Plan createProgramPlan() { }
    public Plan createProgramPlan(String jobName) { }
}

StreamPlanEnvironment { .api }

Stream execution environment for plan generation.

public class StreamPlanEnvironment extends StreamExecutionEnvironment {
    // Constructor
    public StreamPlanEnvironment(PipelineExecutorServiceLoader executorServiceLoader, 
                               Configuration configuration, 
                               ClassLoader userClassLoader) { }
    
    // Stream plan generation methods
    public StreamGraph getStreamGraph() { }
    public StreamGraph getStreamGraph(boolean clearTransformations) { }
    public String getExecutionPlan() { }
    
    // StreamExecutionEnvironment overrides for planning
    public JobExecutionResult execute() { }
    public JobExecutionResult execute(String jobName) { }
    public JobExecutionResult execute(StreamGraph streamGraph) { }
    
    // Plan conversion utilities
    public JobGraph getJobGraph() { }
    public JobGraph getJobGraph(String jobName) { }
}

Cluster Client Implementations

MiniClusterClient { .api }

Cluster client implementation for local mini clusters.

public class MiniClusterClient implements ClusterClient<MiniClusterClient.MiniClusterId> {
    // ClusterClient interface implementation for mini clusters
    
    // Nested cluster ID class
    public static class MiniClusterId {
        // Mini cluster identification
    }
}

Program Utilities

PackagedProgramRetriever { .api }

Interface for retrieving packaged programs.

public interface PackagedProgramRetriever {
    PackagedProgram getPackagedProgram();
}

DefaultPackagedProgramRetriever { .api }

Default implementation of packaged program retriever.

public class DefaultPackagedProgramRetriever implements PackagedProgramRetriever {
    public PackagedProgram getPackagedProgram() { }
}

PackagedProgramUtils { .api }

Utility enum with static methods for working with packaged programs.

public enum PackagedProgramUtils {
    // Static utility methods
    public static boolean isPython(String entryPointClassName) { }
    public static Pipeline getPipelineFromProgram(PackagedProgram program, 
                                                Configuration configuration, 
                                                int parallelism, 
                                                boolean suppressOutput) { }
    public static URI resolveURI(String path) { }
}

PerJobMiniClusterFactory { .api }

Factory for creating per-job mini clusters.

public class PerJobMiniClusterFactory {
    // Constructor
    public PerJobMiniClusterFactory(Configuration configuration) { }
    
    // Mini cluster creation methods for per-job execution
    public MiniCluster createMiniCluster(JobGraph jobGraph, 
                                       Configuration configuration) { }
    
    public MiniClusterConfiguration createMiniClusterConfiguration(JobGraph jobGraph, 
                                                                  Configuration configuration) { }
    
    // Resource calculation methods
    public int calculateNumberOfTaskManagers(JobGraph jobGraph) { }
    public int calculateTaskSlotsPerTaskManager(Configuration configuration) { }
    
    // Configuration setup
    public Configuration setupConfiguration(Configuration originalConfig, 
                                          JobGraph jobGraph) { }
}

Exception Classes

ProgramInvocationException { .api }

Exception thrown during program invocation.

public class ProgramInvocationException extends Exception {
    // Constructors
    public ProgramInvocationException(String message) { }
    public ProgramInvocationException(String message, Throwable cause) { }
    public ProgramInvocationException(Throwable cause) { }
}

ProgramParametrizationException { .api }

Runtime exception for program parametrization errors.

public class ProgramParametrizationException extends RuntimeException {
    // Constructors
    public ProgramParametrizationException(String message) { }
    public ProgramParametrizationException(String message, Throwable cause) { }
    public ProgramParametrizationException(Throwable cause) { }
}

ProgramMissingJobException { .api }

Exception thrown when a program doesn't define any jobs.

public class ProgramMissingJobException extends FlinkException {
    // Constructors
    public ProgramMissingJobException(String message) { }
    public ProgramMissingJobException(String message, Throwable cause) { }
}

ProgramAbortException { .api }

Error thrown to abort program execution.

public class ProgramAbortException extends Error {
    // Constructors
    public ProgramAbortException(String message) { }
    public ProgramAbortException(String message, Throwable cause) { }
}

Client Utilities

ClientUtils Core Methods { .api }

Detailed implementation of core ClientUtils methods for program execution and job management.

buildUserCodeClassLoader Method

public static URLClassLoader buildUserCodeClassLoader(List<URL> jars, 
                                                    List<URL> classpaths, 
                                                    ClassLoader parent, 
                                                    Configuration configuration) {
    // Combines JAR files and classpath URLs into a single URLClassLoader
    // Ensures proper isolation of user code from system classes
    // Handles parent-first or child-first delegation based on configuration
    // Returns URLClassLoader configured for user code execution
}

Parameters:

  • jars: List of JAR file URLs containing user program code
  • classpaths: Additional classpath URLs for dependencies
  • parent: Parent ClassLoader for delegation
  • configuration: Flink configuration containing classloader settings

Returns: URLClassLoader configured for user code isolation

executeProgram Method

public static void executeProgram(PipelineExecutorServiceLoader executorServiceLoader, 
                                Configuration configuration, 
                                PackagedProgram program, 
                                boolean enforceSingleJobExecution, 
                                boolean suppressSysout) {
    // Sets up execution environment contexts
    // Loads and executes the packaged program
    // Handles both batch and streaming execution modes
    // Manages job lifecycle including submission and monitoring
    // Cleans up resources after execution completion
}

Parameters:

  • executorServiceLoader: Service loader for pipeline executors
  • configuration: Execution configuration
  • program: Packaged program to execute
  • enforceSingleJobExecution: Whether to enforce single job execution
  • suppressSysout: Whether to suppress system output during execution

waitUntilJobInitializationFinished Method

public static void waitUntilJobInitializationFinished(SupplierWithException<JobStatus, Exception> jobStatusSupplier, 
                                                     SupplierWithException<JobResult, Exception> jobResultSupplier, 
                                                     ClassLoader userCodeClassloader) {
    // Polls job status until initialization is complete
    // Handles various job states during startup phase
    // Manages timeout and retry logic for status checks
    // Switches context to user classloader for status operations
    // Throws appropriate exceptions for initialization failures
}

Parameters:

  • jobStatusSupplier: Supplier for retrieving current job status
  • jobResultSupplier: Supplier for retrieving job result when available
  • userCodeClassloader: User code classloader for context switching

Usage Examples

Basic Program Execution

// Create packaged program
PackagedProgram program = PackagedProgram.newBuilder()
    .setJarFile(new File("my-flink-job.jar"))
    .setEntryPointClassName("com.example.MyFlinkJob")
    .setArguments("--input", "/data/input", "--output", "/data/output")
    .setConfiguration(new Configuration())
    .build();

// Execute program using ClientUtils
Configuration config = new Configuration();
config.setString("execution.target", "local");

PipelineExecutorServiceLoader executorLoader = new DefaultExecutorServiceLoader();

try {
    ClientUtils.executeProgram(executorLoader, config, program, false, false);
} finally {
    program.close();
}

Cluster Client Usage

// Get cluster client through factory
Configuration config = new Configuration();
config.setString("rest.address", "localhost");
config.setInteger("rest.port", 8081);

ClusterClientServiceLoader serviceLoader = new DefaultClusterClientServiceLoader();
ClusterClientFactory<StandaloneClusterId> factory = serviceLoader.getClusterClientFactory(config);

try (ClusterDescriptor<StandaloneClusterId> descriptor = factory.createClusterDescriptor(config)) {
    StandaloneClusterId clusterId = factory.getClusterId(config);
    
    try (ClusterClient<StandaloneClusterId> client = descriptor.retrieve(clusterId).getClusterClient()) {
        // Submit job
        JobGraph jobGraph = /* create job graph */;
        CompletableFuture<JobID> jobIdFuture = client.submitJob(jobGraph);
        JobID jobId = jobIdFuture.get();
        
        // Monitor job status
        CompletableFuture<JobStatus> statusFuture = client.getJobStatus(jobId);
        JobStatus status = statusFuture.get();
        System.out.println("Job status: " + status);
        
        // Get job result
        CompletableFuture<JobResult> resultFuture = client.requestJobResult(jobId);
        JobResult result = resultFuture.get();
    }
}

Context Environment Usage

// Set up context environment
Configuration config = new Configuration();
PipelineExecutorServiceLoader executorLoader = new DefaultExecutorServiceLoader();
ClassLoader userClassLoader = Thread.currentThread().getContextClassLoader();

ContextEnvironment.setAsContext(executorLoader, config, userClassLoader, false, false);
StreamContextEnvironment.setAsContext(executorLoader, config, userClassLoader, false, false);

try {
    // Your Flink program code here
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    // ... define your job ...
    env.execute("My Flink Job");
} finally {
    // Clean up context
    ContextEnvironment.unsetAsContext();
    StreamContextEnvironment.unsetAsContext();
}

Mini Cluster Usage

// Create mini cluster configuration
Configuration config = new Configuration();
config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 4);

// Create and start mini cluster
MiniCluster miniCluster = new MiniCluster(
    new MiniClusterConfiguration.Builder()
        .setConfiguration(config)
        .setNumTaskManagers(1)
        .build());

miniCluster.start();

try {
    // Create mini cluster client
    MiniClusterClient client = new MiniClusterClient(config, miniCluster);
    
    // Use client for job operations
    CompletableFuture<Collection<JobStatusMessage>> jobs = client.listJobs();
    
} finally {
    miniCluster.close();
}

Savepoint Operations

// Trigger savepoint
try (ClusterClient<StandaloneClusterId> client = /* get cluster client */) {
    JobID jobId = /* your job ID */;
    String savepointDirectory = "hdfs://namenode:port/savepoints";
    
    CompletableFuture<String> savepointFuture = 
        client.triggerSavepoint(jobId, savepointDirectory);
    String savepointPath = savepointFuture.get();
    
    System.out.println("Savepoint created at: " + savepointPath);
    
    // Later, dispose the savepoint if no longer needed
    CompletableFuture<Acknowledge> disposeFuture = client.disposeSavepoint(savepointPath);
    disposeFuture.get();
}

Pipeline from Program

// Extract pipeline from packaged program
PackagedProgram program = /* create packaged program */;
Configuration config = new Configuration();
int parallelism = 4;
boolean suppressOutput = true;

Pipeline pipeline = PackagedProgramUtils.getPipelineFromProgram(
    program, config, parallelism, suppressOutput);

// Use pipeline with executor
PipelineExecutor executor = /* get executor */;
CompletableFuture<JobClient> jobClientFuture = 
    executor.execute(pipeline, config, program.getUserCodeClassLoader());

Required Imports

import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.ClusterClientProvider;
import org.apache.flink.client.program.ContextEnvironment;
import org.apache.flink.client.program.StreamContextEnvironment;
import org.apache.flink.client.program.OptimizerPlanEnvironment;
import org.apache.flink.client.program.StreamPlanEnvironment;
import org.apache.flink.client.program.MiniClusterClient;
import org.apache.flink.client.program.PackagedProgramRetriever;
import org.apache.flink.client.program.DefaultPackagedProgramRetriever;
import org.apache.flink.client.program.PackagedProgramUtils;
import org.apache.flink.client.program.PerJobMiniClusterFactory;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.client.program.ProgramParametrizationException;
import org.apache.flink.client.program.ProgramMissingJobException;
import org.apache.flink.client.program.ProgramAbortException;
import org.apache.flink.client.ClientUtils;
import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.dag.DataSinkNode;
import org.apache.flink.api.common.Plan;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
import org.apache.flink.util.function.SupplierWithException;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.PipelineExecutor;
import org.apache.flink.core.execution.PipelineExecutorServiceLoader;
import org.apache.flink.api.dag.Pipeline;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.util.FlinkException;
import java.util.concurrent.CompletableFuture;
import java.util.Collection;
import java.util.Map;
import java.util.List;
import java.io.File;
import java.net.URL;
import java.net.URI;
import java.net.URLClassLoader;

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-clients-2-11

docs

cli-operations.md

cluster-management.md

index.md

program-execution.md

rest-client-communication.md

tile.json