Utilities for packaging Flink programs as JAR files and managing their execution with dependencies and classpath handling.
Represents a Flink program packaged in a JAR file with its dependencies, providing comprehensive program metadata and execution support.
/**
 * Represents a program packaged in a JAR file. It supplies functionality to extract
 * nested libraries, search for the program entry point, and extract a program plan.
 */
public class PackagedProgram {

    // Public Constants

    /**
     * Property name of the entry in the JAR manifest file that describes the Flink specific entry point.
     */
    public static final String MANIFEST_ATTRIBUTE_ASSEMBLER_CLASS = "program-class";

    /**
     * Property name of the entry in the JAR manifest file that describes the class with the main method.
     */
    public static final String MANIFEST_ATTRIBUTE_MAIN_CLASS = "Main-Class";

    // Constructors

    /**
     * Creates an instance that wraps the plan defined in the jar file using the given
     * arguments.
     *
     * @param jarFile The jar file which contains the plan and a manifest which defines the program-class.
     * @param args Optional. The arguments used to create the plan; their meaning depends on the
     *             implementation of the program. See {@link #getDescription()}.
     * @throws ProgramInvocationException Thrown if the program can't be properly loaded. Causes
     *                                    may be a missing / wrong class or manifest files.
     */
    public PackagedProgram(File jarFile, String... args) throws ProgramInvocationException;

    /**
     * Creates an instance that wraps the plan defined in the jar file using the given
     * arguments.
     *
     * @param jarFile The jar file which contains the plan and a manifest which defines the program-class.
     * @param classpaths Additional classpath URLs needed by the program.
     * @param args Optional. The arguments used to create the plan; their meaning depends on the
     *             implementation of the program. See {@link #getDescription()}.
     * @throws ProgramInvocationException Thrown if the program can't be properly loaded. Causes
     *                                    may be a missing / wrong class or manifest files.
     */
    public PackagedProgram(File jarFile, List<URL> classpaths, String... args) throws ProgramInvocationException;

    /**
     * Creates an instance that wraps the plan defined in the jar file using the given
     * arguments. The class named by {@code entryPointClassName} is used to generate the plan.
     *
     * @param jarFile The jar file which contains the plan.
     * @param entryPointClassName Name of the class which generates the plan. Overrides the class
     *                            defined in the jar file manifest.
     * @param args Optional. The arguments used to create the plan; their meaning depends on the
     *             implementation of the program. See {@link #getDescription()}.
     * @throws ProgramInvocationException Thrown if the program can't be properly loaded. Causes
     *                                    may be a missing / wrong class or manifest files.
     */
    public PackagedProgram(File jarFile, String entryPointClassName, String... args) throws ProgramInvocationException;

    /**
     * Creates an instance that wraps the plan defined in the jar file using the given
     * arguments. The class named by {@code entryPointClassName} is used to generate the plan.
     *
     * @param jarFile The jar file which contains the plan.
     * @param classpaths Additional classpath URLs needed by the program.
     * @param entryPointClassName Name of the class which generates the plan. Overrides the class
     *                            defined in the jar file manifest.
     * @param args Optional. The arguments used to create the plan; their meaning depends on the
     *             implementation of the program. See {@link #getDescription()}.
     * @throws ProgramInvocationException Thrown if the program can't be properly loaded. Causes
     *                                    may be a missing / wrong class or manifest files.
     */
    public PackagedProgram(File jarFile, List<URL> classpaths, String entryPointClassName, String... args) throws ProgramInvocationException;

    // Configuration Methods

    /**
     * Sets the savepoint restore settings for this program.
     *
     * @param savepointSettings The savepoint restore settings.
     */
    public void setSavepointRestoreSettings(SavepointRestoreSettings savepointSettings);

    /**
     * Gets the savepoint restore settings for this program.
     *
     * @return The savepoint restore settings.
     */
    public SavepointRestoreSettings getSavepointSettings();

    // Program Information Methods

    /**
     * Returns the command line arguments of this program.
     *
     * @return The arguments of this program.
     */
    public String[] getArguments();

    /**
     * Returns the fully qualified main class name of this program.
     *
     * @return The main class name.
     */
    public String getMainClassName();

    /**
     * Returns true if this program uses interactive mode (main method).
     *
     * @return true if using interactive mode.
     */
    public boolean isUsingInteractiveMode();

    /**
     * Returns true if this program uses the program entry point.
     *
     * @return true if using the program entry point.
     */
    public boolean isUsingProgramEntryPoint();

    /**
     * Returns the description provided by the program class. This
     * may contain a description of the plan itself and its arguments.
     *
     * @return The description of the program's input parameters.
     * @throws ProgramInvocationException Thrown if the program can't be properly loaded. Causes
     *                                    may be a missing / wrong class or manifest files.
     */
    public String getDescription() throws ProgramInvocationException;

    // Plan Generation Methods

    /**
     * Returns the plan without the required jars, for use when the files are already provided by the cluster.
     *
     * @return The plan without attached jar files.
     * @throws ProgramInvocationException if plan generation fails.
     */
    public JobWithJars getPlanWithoutJars() throws ProgramInvocationException;

    /**
     * Returns the plan with all required jars.
     *
     * @return The plan with attached jar files.
     * @throws ProgramInvocationException if plan generation fails.
     */
    public JobWithJars getPlanWithJars() throws ProgramInvocationException;

    /**
     * Returns the analyzed plan without any optimizations, as a string.
     *
     * @return The analyzed plan without any optimizations.
     * @throws ProgramInvocationException Thrown if an error occurred in the user-provided assembler.
     *                                    This may indicate missing parameters for generation.
     */
    public String getPreviewPlan() throws ProgramInvocationException;

    // Execution Methods

    /**
     * Invokes the program in interactive mode. This method assumes that the context
     * environment is prepared, or the execution will be a local execution by default.
     *
     * @throws ProgramInvocationException if execution fails.
     */
    public void invokeInteractiveModeForExecution() throws ProgramInvocationException;

    // Classpath and Library Methods

    /**
     * Returns the classpaths that are required by the program.
     *
     * @return List of URLs for classpaths.
     */
    public List<URL> getClasspaths();

    /**
     * Gets the ClassLoader that must be used to load user code classes.
     *
     * @return The user code ClassLoader.
     */
    public ClassLoader getUserCodeClassLoader();

    /**
     * Returns all provided libraries needed to run the program.
     *
     * @return List of library URLs.
     */
    public List<URL> getAllLibraries();

    /**
     * Deletes all temporary files created for contained packaged libraries.
     */
    public void deleteExtractedLibraries();

    // Static Utility Methods

    /**
     * Takes all JAR files that are contained in the given JAR file and extracts them
     * to the system's temp directory.
     *
     * @param jarFile The JAR file to extract libraries from.
     * @return The extracted temporary files.
     * @throws ProgramInvocationException Thrown if the extraction process failed.
     */
    public static List<File> extractContainedLibraries(URL jarFile) throws ProgramInvocationException;

    /**
     * Deletes the extracted temporary library files.
     *
     * @param tempLibraries List of temporary library files to delete.
     */
    public static void deleteExtractedLibraries(List<File> tempLibraries);
}
Program Manifest Constants:
/**
* Name of the JAR manifest attribute ("program-class") that describes the
* Flink specific entry point.
*/
public static final String MANIFEST_ATTRIBUTE_ASSEMBLER_CLASS = "program-class";
/**
* Name of the JAR manifest attribute ("Main-Class") that describes the class
* with the main method.
*/
public static final String MANIFEST_ATTRIBUTE_MAIN_CLASS = "Main-Class";
Represents a Flink dataflow plan with associated JAR files and classpaths for execution.
/**
* Represents a Flink dataflow plan together with the JAR files and classpaths
* associated with it.
*/
public class JobWithJars {
/**
* Creates a job from an execution plan and its dependencies.
* @param plan The Flink execution plan
* @param jarFiles List of JAR file URLs
* @param classpaths List of additional classpath URLs
*/
public JobWithJars(Plan plan, List<URL> jarFiles, List<URL> classpaths);
/**
* Creates a job from an execution plan and a single JAR file.
* @param plan The Flink execution plan
* @param jarFile Single JAR file URL
*/
public JobWithJars(Plan plan, URL jarFile);
/**
* Gets the Flink execution plan.
* @return The execution plan for this job
*/
public Plan getPlan();
/**
* Gets the list of JAR file URLs.
* @return List of JAR file URLs required for execution
*/
public List<URL> getJarFiles();
/**
* Gets the list of additional classpath URLs.
* @return List of classpath URLs for dependencies
*/
public List<URL> getClasspaths();
/**
* Gets the user code class loader.
* @return ClassLoader for user code execution
*/
public ClassLoader getUserCodeClassLoader();
/**
* Validates that a JAR file URL is valid and accessible.
* NOTE(review): declared here with a raw {@code throws Exception}; confirm
* against the upstream class whether a narrower exception type is declared.
* @param jar The JAR file URL to validate
* @throws Exception if the JAR file is invalid or inaccessible
*/
public static void checkJarFile(URL jar) throws Exception;
/**
* Builds a user code class loader from JAR files and classpaths.
* @param jars List of JAR file URLs
* @param classpaths List of classpath URLs
* @param parent Parent class loader
* @return Configured ClassLoader for user code
*/
public static ClassLoader buildUserCodeClassLoader(List<URL> jars, List<URL> classpaths, ClassLoader parent);
}
Usage Examples:
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.JobWithJars;
import java.io.File;
import java.net.URL;
import java.util.Arrays;
import java.util.List;
// NOTE: this is an illustrative snippet, not a complete compilation unit.
// The PackagedProgram constructors and plan methods declare
// ProgramInvocationException, and File.toURI().toURL() can throw
// java.net.MalformedURLException, so the enclosing method must handle or declare both.
// The Plan type used below is not among the imports listed above and must be imported as well.

// Create a basic packaged program: the JAR file followed by the program arguments
File jarFile = new File("path/to/my-flink-job.jar");
PackagedProgram program = new PackagedProgram(jarFile, "arg1", "arg2", "arg3");
// Get program information
System.out.println("Main class: " + program.getMainClassName());
System.out.println("Arguments: " + Arrays.toString(program.getArguments()));
// Create a program with additional dependencies (passed as extra classpath URLs)
List<URL> dependencies = Arrays.asList(
new File("lib/dependency1.jar").toURI().toURL(),
new File("lib/dependency2.jar").toURI().toURL()
);
// This overload also names the entry point class explicitly, overriding the JAR manifest
PackagedProgram programWithDeps = new PackagedProgram(
jarFile,
dependencies,
"com.example.MyFlinkJob",
"arg1", "arg2"
);
// Get execution plan (with the required JAR files attached)
JobWithJars jobWithJars = program.getPlanWithJars();
Plan executionPlan = jobWithJars.getPlan();
// Preview execution plan
String planPreview = program.getPreviewPlan();
System.out.println("Execution plan preview: " + planPreview);
// Access class loader for custom operations
ClassLoader userClassLoader = program.getUserCodeClassLoader();
// Clean up temporary files
// NOTE(review): only `program` is cleaned up here; `programWithDeps` presumably
// needs the same call if it extracted libraries — confirm.
program.deleteExtractedLibraries();
Exception types for program-related errors.
/**
* Checked exception indicating errors during Flink program invocation.
*/
public class ProgramInvocationException extends Exception {
/**
* Creates the exception with an error message.
* @param message Error description
*/
public ProgramInvocationException(String message);
/**
* Creates the exception with an underlying cause.
* @param cause Root cause of the error
*/
public ProgramInvocationException(Throwable cause);
/**
* Creates the exception with a message and an underlying cause.
* @param message Error description
* @param cause Root cause of the error
*/
public ProgramInvocationException(String message, Throwable cause);
}
/**
* Unchecked (runtime) exception indicating errors in Flink program parametrization.
*/
public class ProgramParametrizationException extends RuntimeException {
/**
* Creates the exception with an error message.
* @param message Error description
*/
public ProgramParametrizationException(String message);
}
/**
* Checked exception indicating that no job was executed during program invocation.
*/
public class ProgramMissingJobException extends Exception {
/**
* Creates the exception; this constructor takes no arguments.
*/
public ProgramMissingJobException();
}
Configuration for restoring jobs from savepoints.
/**
* Settings for restoring jobs from savepoints. Only static factory methods are
* listed here.
*/
public class SavepointRestoreSettings {
/**
* Creates settings that request no savepoint restoration.
* @return SavepointRestoreSettings with no restoration
*/
public static SavepointRestoreSettings none();
/**
* Creates settings for restoring from a savepoint path.
* @param savepointPath Path to the savepoint directory
* @return SavepointRestoreSettings configured for the specified path
*/
public static SavepointRestoreSettings forPath(String savepointPath);
/**
* Creates settings for restoring from a savepoint path, with explicit control
* over how non-restored state is handled.
* @param savepointPath Path to the savepoint directory
* @param allowNonRestoredState Whether to allow state that cannot be restored
* @return SavepointRestoreSettings with the specified configuration
*/
public static SavepointRestoreSettings forPath(String savepointPath, boolean allowNonRestoredState);
}