Program packaging, classloader management, and execution utilities for submitting user applications to Flink clusters. Provides comprehensive support for JAR-based programs and execution environment management.
Represents a program packaged in a JAR file with all necessary dependencies and configuration.
/**
 * Represents a user program packaged in a JAR file, together with its entry point class,
 * arguments, classpaths and savepoint restore settings.
 *
 * <p>Instances are created through {@link #newBuilder()}. The class implements
 * {@link AutoCloseable}; call {@link #close()} (or use try-with-resources) to release the
 * resources it holds.
 */
public class PackagedProgram implements AutoCloseable {
    // Manifest attribute constants: names of JAR-manifest attributes. Judging by their names
    // they identify the program's entry class (NOTE(review): confirm lookup precedence).
    public static final String MANIFEST_ATTRIBUTE_ASSEMBLER_CLASS = "program-class";
    public static final String MANIFEST_ATTRIBUTE_MAIN_CLASS = "Main-Class";

    /**
     * Returns the savepoint restore settings configured for this program.
     *
     * @return the {@link SavepointRestoreSettings} instance
     */
    public SavepointRestoreSettings getSavepointSettings();

    /**
     * Returns the arguments that are passed to the program's entry point.
     *
     * @return array of program arguments
     */
    public String[] getArguments();

    /**
     * Returns the name of the program's main (entry point) class.
     *
     * @return fully qualified main class name
     */
    public String getMainClassName();

    /**
     * Returns a description of the program, obtained by analyzing the JAR.
     *
     * @return program description string
     * @throws ProgramInvocationException if the description cannot be retrieved
     */
    public String getDescription() throws ProgramInvocationException;

    /**
     * Invokes the program in interactive mode, i.e. runs its entry point for execution.
     *
     * @throws ProgramInvocationException if the invocation fails
     */
    public void invokeInteractiveModeForExecution() throws ProgramInvocationException;

    /**
     * Returns the additional classpaths required by this program.
     *
     * @return list of classpath URLs
     */
    public List<URL> getClasspaths();

    /**
     * Returns the classloader used to load and execute the user code.
     *
     * @return classloader for user code execution
     */
    public ClassLoader getUserCodeClassLoader();

    /**
     * Returns the job JAR together with all of its dependencies.
     *
     * @return list of JAR and dependency URLs
     */
    public List<URL> getJobJarAndDependencies();

    /**
     * Computes the job JAR and its dependencies for the given JAR file without building a
     * {@code PackagedProgram} instance.
     *
     * @param jarFile JAR file to analyze
     * @param entryPointClassName entry point class name
     * @return list of JAR and dependency URLs
     * @throws ProgramInvocationException if the analysis fails
     */
    public static List<URL> getJobJarAndDependencies(File jarFile, String entryPointClassName)
            throws ProgramInvocationException;

    /**
     * Extracts the libraries nested inside the given JAR file.
     *
     * @param jarFile URL of the JAR file to extract from
     * @return list of extracted library files
     * @throws ProgramInvocationException if the extraction fails
     */
    public static List<File> extractContainedLibraries(URL jarFile)
            throws ProgramInvocationException;

    /**
     * Creates a new {@link Builder} for assembling a {@code PackagedProgram}.
     *
     * @return a fresh builder instance
     */
    public static Builder newBuilder();

    /**
     * Closes the program and releases the resources it holds.
     */
    @Override
    public void close();

    /**
     * Builder for {@link PackagedProgram}. Every setter returns this builder so that calls
     * can be chained; {@link #build()} creates the program.
     */
    public static class Builder {
        /**
         * Sets the JAR file that contains the program.
         *
         * @param jarFile JAR file containing the program
         * @return this builder instance
         */
        public Builder setJarFile(File jarFile);

        /**
         * Sets the entry point class name.
         *
         * @param entryPointClassName fully qualified class name
         * @return this builder instance
         */
        public Builder setEntryPointClassName(String entryPointClassName);

        /**
         * Sets the arguments passed to the program.
         *
         * @param args program arguments
         * @return this builder instance
         */
        public Builder setArguments(String... args);

        /**
         * Sets additional user classpaths.
         *
         * @param userClassPaths list of user classpath URLs
         * @return this builder instance
         */
        public Builder setUserClassPaths(List<URL> userClassPaths);

        /**
         * Sets the Flink configuration.
         *
         * @param configuration Flink configuration instance
         * @return this builder instance
         */
        public Builder setConfiguration(Configuration configuration);

        /**
         * Sets the savepoint restore settings.
         *
         * @param savepointRestoreSettings savepoint settings
         * @return this builder instance
         */
        public Builder setSavepointRestoreSettings(SavepointRestoreSettings savepointRestoreSettings);

        /**
         * Builds the {@link PackagedProgram} from the values configured on this builder.
         *
         * @return the new {@code PackagedProgram} instance
         * @throws ProgramInvocationException if building fails
         */
        public PackagedProgram build() throws ProgramInvocationException;
    }
}
Usage Example:
import org.apache.flink.client.program.PackagedProgram;

import java.io.File;
import java.net.URL;
import java.util.List;

// Create a packaged program from a JAR. PackagedProgram implements AutoCloseable and its
// close() declares no checked exception, so try-with-resources releases the program's
// resources even when execution fails.
// Note: build(), getDescription() and invokeInteractiveModeForExecution() throw the checked
// ProgramInvocationException, which the enclosing method must handle or declare.
try (PackagedProgram program = PackagedProgram.newBuilder()
        .setJarFile(new File("/path/to/my-flink-job.jar"))
        .setEntryPointClassName("com.example.MyFlinkJob")
        .setArguments("--input", "/data/input", "--output", "/data/output")
        .build()) {
    System.out.println("Program: " + program.getDescription());
    System.out.println("Main class: " + program.getMainClassName());

    // Get classpath information: the job JAR plus all of its dependencies
    List<URL> dependencies = program.getJobJarAndDependencies();
    System.out.println("Dependencies: " + dependencies.size());

    // Execute the program
    program.invokeInteractiveModeForExecution();
}
Utility functions for working with packaged programs.
/**
 * Static utilities for packaged programs, in particular for detecting and supporting
 * Python programs.
 */
public class PackagedProgramUtils {
    /**
     * Checks whether the given entry point class name denotes a Python program.
     *
     * @param entryPointClassName entry point class name to check
     * @return {@code true} if this is a Python program, {@code false} otherwise
     */
    public static boolean isPython(String entryPointClassName);

    /**
     * Returns the URL of the Python JAR needed to run Python programs.
     *
     * @return URL of the Python JAR
     */
    public static URL getPythonJar();
}
Interface for retrieving packaged programs from various sources.
/**
 * Source of a {@link PackagedProgram}; implementations decide where the program comes from.
 */
public interface PackagedProgramRetriever {
    /**
     * Retrieves the packaged program.
     *
     * @return the {@link PackagedProgram} instance
     * @throws ProgramRetrievalException if the program cannot be retrieved
     */
    PackagedProgram getPackagedProgram() throws ProgramRetrievalException;
}

/**
 * Default implementation of {@link PackagedProgramRetriever}.
 */
public class DefaultPackagedProgramRetriever implements PackagedProgramRetriever {
    /**
     * Retrieves the packaged program.
     *
     * @return the {@link PackagedProgram} instance
     * @throws ProgramRetrievalException if the program cannot be retrieved
     */
    @Override
    public PackagedProgram getPackagedProgram() throws ProgramRetrievalException;
}
Specialized execution environments that provide context for program execution.
/**
 * Execution environment that provides client context to DataSet API programs.
 */
public class ContextEnvironment extends ExecutionEnvironment {
    // Provides the execution context for DataSet API programs.
    // Configured automatically when the program runs within the Flink client.
}

/**
 * Execution environment used for plan generation and optimization.
 */
public class OptimizerPlanEnvironment extends ExecutionEnvironment {
    // Generates execution plans without actually executing them,
    // which is useful for plan analysis and optimization.
}

/**
 * Stream execution environment that provides client context to DataStream API programs.
 */
public class StreamContextEnvironment extends StreamExecutionEnvironment {
    // Provides the execution context for DataStream API programs.
    // Configured automatically when the program runs within the Flink client.
}

/**
 * Stream execution environment used to generate streaming execution plans.
 */
public class StreamPlanEnvironment extends StreamExecutionEnvironment {
    // Generates stream execution plans so they can be inspected
    // without running the program.
}
Usage Example:
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.client.program.ContextEnvironment;
import org.apache.flink.util.Collector;

import java.util.Locale;

// User function used below (declare it as a static nested class of the program's class):
// splits each line into lowercase words and emits a (word, 1) pair for every word.
public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
    @Override
    public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
        for (String word : line.toLowerCase(Locale.ROOT).split("\\W+")) {
            // split() can yield an empty leading token when a line starts with a separator
            if (!word.isEmpty()) {
                out.collect(new Tuple2<>(word, 1));
            }
        }
    }
}

// The environment is set up automatically when the program runs through the client
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// When running in client context, this will be a ContextEnvironment
if (env instanceof ContextEnvironment) {
    System.out.println("Running in client context");
}

// Create and execute a DataSet program: count words, then write (word, count) as CSV
DataSet<String> input = env.readTextFile("/path/to/input");
input.flatMap(new Tokenizer())
        .groupBy(0)
        .sum(1)
        .writeAsCsv("/path/to/output");

// execute() declares "throws Exception"; the enclosing method must handle or declare it
env.execute("Word Count Example");
Factory for creating per-job mini clusters for testing and development.
/**
 * Factory for per-job mini clusters, i.e. local clusters that each run a single job.
 */
public class PerJobMiniClusterFactory {
    /**
     * Creates a mini cluster for running a single job.
     *
     * <p>{@link MiniCluster} implements {@link AutoCloseable}; presumably the caller owns the
     * returned instance and must close it. NOTE(review): confirm whether the returned
     * cluster still needs {@code start()} before use.
     *
     * @param configuration cluster configuration
     * @return a {@link MiniCluster} configured for single-job execution
     */
    public static MiniCluster createMiniCluster(Configuration configuration);
}
/**
 * Checked exception signalling that invoking a program failed. It can optionally carry the
 * {@link JobID} of the affected job and a root cause.
 */
public class ProgramInvocationException extends Exception {
    /**
     * Creates the exception with a message.
     *
     * @param message error message
     */
    public ProgramInvocationException(String message);

    /**
     * Creates the exception with a message and the associated job ID.
     *
     * @param message error message
     * @param jobID associated job ID
     */
    public ProgramInvocationException(String message, JobID jobID);

    /**
     * Creates the exception wrapping a cause.
     *
     * @param cause root cause throwable
     */
    public ProgramInvocationException(Throwable cause);

    /**
     * Creates the exception with a message and a cause.
     *
     * @param message error message
     * @param cause root cause throwable
     */
    public ProgramInvocationException(String message, Throwable cause);

    /**
     * Creates the exception with a message, the associated job ID and a cause.
     *
     * @param message error message
     * @param jobID associated job ID
     * @param cause root cause throwable
     */
    public ProgramInvocationException(String message, JobID jobID, Throwable cause);
}
/**
 * Signals that a program did not contain a job to execute.
 */
public class ProgramMissingJobException extends FlinkException {
    /**
     * Creates the exception with a message.
     *
     * @param message error message describing the missing job
     */
    public ProgramMissingJobException(String message);
}
/**
 * Unchecked exception (a {@link RuntimeException}) signalling that a program was
 * parameterized incorrectly, e.g. given invalid arguments.
 */
public class ProgramParametrizationException extends RuntimeException {
    /**
     * Creates the exception with a message.
     *
     * @param message error message
     */
    public ProgramParametrizationException(String message);

    /**
     * Creates the exception with a message and a cause.
     *
     * @param message error message
     * @param cause root cause throwable
     */
    public ProgramParametrizationException(String message, Throwable cause);
}
/**
 * Signals that a program's execution should be aborted immediately.
 *
 * <p>It deliberately extends {@link Error} rather than {@link Exception}. As a result,
 * {@code catch (Exception e)} blocks between the throw site and its handler (for example in
 * user code) do not intercept it, and methods need not declare it.
 */
public class ProgramAbortException extends Error {
    /**
     * Creates an abort exception without a message.
     */
    public ProgramAbortException();

    /**
     * Creates an abort exception with a message.
     *
     * @param message abort message
     */
    public ProgramAbortException(String message);

    /**
     * Creates an abort exception with a message and a cause.
     *
     * @param message abort message
     * @param cause root cause throwable
     */
    public ProgramAbortException(String message, Throwable cause);
}
/**
 * Checked exception signalling that a packaged program could not be retrieved, as thrown by
 * {@code PackagedProgramRetriever#getPackagedProgram()}.
 */
public class ProgramRetrievalException extends Exception {
    /**
     * Creates the exception with a message.
     *
     * @param message error message
     */
    public ProgramRetrievalException(String message);

    /**
     * Creates the exception with a message and a cause.
     *
     * @param message error message
     * @param cause root cause throwable
     */
    public ProgramRetrievalException(String message, Throwable cause);
}
/**
 * Describes whether a program should be restored from a savepoint, and if so, from which path.
 */
public class SavepointRestoreSettings {
    /** Returns settings that do not restore from any savepoint. */
    public static SavepointRestoreSettings none();

    /**
     * Returns settings that restore from the savepoint at {@code savepointPath}.
     * NOTE(review): presumably equivalent to {@code forPath(savepointPath, false)}; confirm.
     */
    public static SavepointRestoreSettings forPath(String savepointPath);

    /**
     * Returns settings that restore from the savepoint at {@code savepointPath}, with the
     * given {@code allowNonRestoredState} flag.
     */
    public static SavepointRestoreSettings forPath(String savepointPath, boolean allowNonRestoredState);

    /** Returns whether a savepoint should be restored. */
    public boolean restoreSavepoint();

    /** Returns the path of the savepoint to restore from. */
    public String getRestorePath();

    /**
     * Returns the {@code allowNonRestoredState} flag. Presumably this controls whether savepoint
     * state that cannot be mapped onto the program may be skipped on restore; confirm.
     */
    public boolean allowNonRestoredState();
}
/**
 * Entry point of DataSet API programs: creates data sets and triggers execution.
 */
public abstract class ExecutionEnvironment {
    /**
     * Returns the execution environment for the current context. When the program runs
     * through the Flink client, this may be a {@code ContextEnvironment}.
     */
    public static ExecutionEnvironment getExecutionEnvironment();

    /**
     * Triggers execution of the program under the given job name.
     *
     * @param jobName name of the job
     * @return the result of the execution
     * @throws Exception if the execution fails
     */
    public abstract JobExecutionResult execute(String jobName) throws Exception;

    // DataSet API methods

    /** Creates a data set from the elements of the given collection. */
    public <X> DataSet<X> fromCollection(Collection<X> data);

    /** Creates a data set of Strings read from the text file at {@code filePath}. */
    public DataSet<String> readTextFile(String filePath);
}
/**
 * Entry point of DataStream API programs: creates stream sources and triggers execution.
 */
public abstract class StreamExecutionEnvironment {
    /**
     * Returns the stream execution environment for the current context. When the program runs
     * through the Flink client, this may be a {@code StreamContextEnvironment}.
     */
    public static StreamExecutionEnvironment getExecutionEnvironment();

    /**
     * Triggers execution of the streaming program under the given job name.
     *
     * @param jobName name of the job
     * @return the result of the execution
     * @throws Exception if the execution fails
     */
    public JobExecutionResult execute(String jobName) throws Exception;

    // DataStream API methods

    /** Creates a stream source from the elements of the given collection. */
    public <T> DataStreamSource<T> fromCollection(Collection<T> data);

    /** Creates a source of Strings read from a socket at {@code hostname}:{@code port}. */
    public DataStreamSource<String> socketTextStream(String hostname, int port);
}
/**
 * A data set of elements of type {@code T}, transformed with the DataSet API operators below.
 */
public interface DataSet<T> {
    /** Applies {@code mapper} to every element, producing one output element per input. */
    <R> DataSet<R> map(MapFunction<T, R> mapper);

    /** Applies {@code flatMapper} to every element, producing any number of outputs per input. */
    <R> DataSet<R> flatMap(FlatMapFunction<T, R> flatMapper);

    /** Keeps only the elements accepted by {@code filter}. */
    DataSet<T> filter(FilterFunction<T> filter);

    /** Groups the elements by the given field positions, e.g. {@code groupBy(0)}. */
    UnsortedGrouping<T> groupBy(int... fields);

    /** Adds a sink that writes the elements as CSV to {@code filePath}. */
    DataSink<T> writeAsCsv(String filePath);
}
/**
 * A stream of elements of type {@code T}, transformed with the DataStream API operators below.
 */
public interface DataStream<T> {
    /** Applies {@code mapper} to every element, producing one output element per input. */
    <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper);

    /** Applies {@code flatMapper} to every element, producing any number of outputs per input. */
    <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T, R> flatMapper);

    /** Keeps only the elements accepted by {@code filter}. */
    SingleOutputStreamOperator<T> filter(FilterFunction<T> filter);

    /** Adds a sink that prints the stream's elements. */
    DataStreamSink<T> print();
}
/**
 * Local Flink cluster, e.g. for testing and development. Implements {@link AutoCloseable};
 * close it once it is no longer needed.
 */
public class MiniCluster implements AutoCloseable {
    /** Starts the cluster. */
    public void start() throws Exception;

    /** Shuts the cluster down and releases its resources. */
    public void close() throws Exception;

    /**
     * Executes the given job graph on this cluster.
     *
     * <p>NOTE(review): a method named "Blocking" that returns a {@link CompletableFuture} is
     * unusual. Confirm this signature against the documented Flink version; the blocking
     * variant may return the execution result directly.
     */
    public CompletableFuture<JobResult> executeJobBlocking(JobGraph job);
}