Apache Flink client library providing APIs and utilities for submitting, monitoring, and managing Flink jobs programmatically.
Utilities for packaging Flink programs as JAR files and managing their execution with dependencies and classpath handling.
Represents a Flink program packaged in a JAR file with its dependencies, providing comprehensive program metadata and execution support.
/**
* This class represents a program, packaged in a jar file. It supplies
* functionality to extract nested libraries, search for the program entry point, and extract
* a program plan.
*/
public class PackagedProgram {
// Public Constants
/**
* Property name of the entry in JAR manifest file that describes the Flink specific entry point.
*/
public static final String MANIFEST_ATTRIBUTE_ASSEMBLER_CLASS = "program-class";
/**
* Property name of the entry in JAR manifest file that describes the class with the main method.
*/
public static final String MANIFEST_ATTRIBUTE_MAIN_CLASS = "Main-Class";
// Constructors
/**
* Creates an instance that wraps the plan defined in the jar file using the given
* arguments.
* @param jarFile The jar file which contains the plan and a Manifest which defines the program-class
* @param args Optional. The arguments used to create the pact plan, depend on implementation of the pact plan. See {@link #getDescription()}.
* @throws ProgramInvocationException Thrown if the program can't be properly loaded. Causes
* may be a missing / wrong class or manifest files.
*/
public PackagedProgram(File jarFile, String... args) throws ProgramInvocationException;
/**
* Creates an instance that wraps the plan defined in the jar file using the given
* arguments and additional classpaths.
* @param jarFile The jar file which contains the plan and a Manifest which defines the program-class
* @param classpaths Additional classpath URLs needed by the Program.
* @param args Optional. The arguments used to create the pact plan, depend on implementation of the pact plan. See {@link #getDescription()}.
* @throws ProgramInvocationException Thrown if the program can't be properly loaded. Causes
* may be a missing / wrong class or manifest files.
*/
public PackagedProgram(File jarFile, List<URL> classpaths, String... args) throws ProgramInvocationException;
/**
* Creates an instance that wraps the plan defined in the jar file using the given
* arguments. For generating the plan the class named by the {@code entryPointClassName}
* parameter is used.
* @param jarFile The jar file which contains the plan.
* @param entryPointClassName Name of the class which generates the plan. Overrides the class defined in the jar file manifest
* @param args Optional. The arguments used to create the pact plan, depend on implementation of the pact plan. See {@link #getDescription()}.
* @throws ProgramInvocationException Thrown if the program can't be properly loaded. Causes
* may be a missing / wrong class or manifest files.
*/
public PackagedProgram(File jarFile, String entryPointClassName, String... args) throws ProgramInvocationException;
/**
* Creates an instance that wraps the plan defined in the jar file using the given
* arguments and additional classpaths. For generating the plan the class named by the
* {@code entryPointClassName} parameter is used.
* @param jarFile The jar file which contains the plan.
* @param classpaths Additional classpath URLs needed by the Program.
* @param entryPointClassName Name of the class which generates the plan. Overrides the class defined in the jar file manifest
* @param args Optional. The arguments used to create the pact plan, depend on implementation of the pact plan. See {@link #getDescription()}.
* @throws ProgramInvocationException Thrown if the program can't be properly loaded. Causes
* may be a missing / wrong class or manifest files.
*/
public PackagedProgram(File jarFile, List<URL> classpaths, String entryPointClassName, String... args) throws ProgramInvocationException;
// Configuration Methods
/**
* Sets the savepoint restore settings for this program.
* @param savepointSettings The savepoint restore settings
*/
public void setSavepointRestoreSettings(SavepointRestoreSettings savepointSettings);
/**
* Gets the savepoint restore settings for this program.
* @return The savepoint restore settings
*/
public SavepointRestoreSettings getSavepointSettings();
// Program Information Methods
/**
* Returns the command line arguments of this program.
* @return The arguments of this program
*/
public String[] getArguments();
/**
* Returns the fully qualified main class name of this program.
* @return The main class name
*/
public String getMainClassName();
/**
* Returns true if this program uses interactive mode (main method).
* @return true if using interactive mode
*/
public boolean isUsingInteractiveMode();
/**
* Returns true if this program uses the program entry point.
* @return true if using program entry point
*/
public boolean isUsingProgramEntryPoint();
/**
* Returns the description provided by the Program class. This
* may contain a description of the plan itself and its arguments.
* @return The description of the program's input parameters.
* @throws ProgramInvocationException Thrown if the program can't be properly loaded. Causes
* may be a missing / wrong class or manifest files.
*/
public String getDescription() throws ProgramInvocationException;
// Plan Generation Methods
/**
* Returns the plan without the required jars when the files are already provided by the cluster.
* @return The plan without attached jar files.
* @throws ProgramInvocationException if plan generation fails
*/
public JobWithJars getPlanWithoutJars() throws ProgramInvocationException;
/**
* Returns the plan with all required jars.
* @return The plan with attached jar files.
* @throws ProgramInvocationException if plan generation fails
*/
public JobWithJars getPlanWithJars() throws ProgramInvocationException;
/**
* Returns the analyzed plan without any optimizations, as a string.
* @return the analyzed plan without any optimizations.
* @throws ProgramInvocationException Thrown if an error occurred in the user-provided pact assembler. This may indicate
* missing parameters for generation.
*/
public String getPreviewPlan() throws ProgramInvocationException;
// Execution Methods
/**
* Invokes the program in interactive mode for execution.
* This method assumes that the context environment is prepared, or the execution
* will be a local execution by default.
* @throws ProgramInvocationException if execution fails
*/
public void invokeInteractiveModeForExecution() throws ProgramInvocationException;
// Classpath and Library Methods
/**
* Returns the classpaths that are required by the program.
* @return List of URLs for classpaths
*/
public List<URL> getClasspaths();
/**
* Gets the ClassLoader that must be used to load user code classes.
* @return The user code ClassLoader.
*/
public ClassLoader getUserCodeClassLoader();
/**
* Returns all provided libraries needed to run the program.
* @return List of library URLs
*/
public List<URL> getAllLibraries();
/**
* Deletes all temporary files created for contained packaged libraries.
*/
public void deleteExtractedLibraries();
// Static Utility Methods
/**
* Takes all JAR files that are contained in the given JAR file and extracts them
* to the system's temp directory.
* @param jarFile The JAR file to extract libraries from
* @return The extracted temporary files.
* @throws ProgramInvocationException Thrown, if the extraction process failed.
*/
public static List<File> extractContainedLibraries(URL jarFile) throws ProgramInvocationException;
/**
* Deletes the extracted temporary library files.
* @param tempLibraries List of temporary library files to delete
*/
public static void deleteExtractedLibraries(List<File> tempLibraries);
}
Program Manifest Constants:
/**
* Manifest attribute for the assembler class
*/
public static final String MANIFEST_ATTRIBUTE_ASSEMBLER_CLASS = "program-class";
/**
* Manifest attribute for the main class
*/
public static final String MANIFEST_ATTRIBUTE_MAIN_CLASS = "Main-Class";
Represents a Flink dataflow plan with associated JAR files and classpaths for execution.
/**
* Represents a Flink dataflow plan together with the JAR files and classpaths
* associated with it.
*/
public class JobWithJars {
/**
* Creates a job with execution plan and dependencies.
* @param plan The Flink execution plan
* @param jarFiles List of JAR file URLs
* @param classpaths List of additional classpath URLs
*/
public JobWithJars(Plan plan, List<URL> jarFiles, List<URL> classpaths);
/**
* Creates a job with execution plan and a single JAR file.
* @param plan The Flink execution plan
* @param jarFile Single JAR file URL
*/
public JobWithJars(Plan plan, URL jarFile);
/**
* Gets the Flink execution plan.
* @return The execution plan for this job
*/
public Plan getPlan();
/**
* Gets the list of JAR file URLs.
* @return List of JAR file URLs required for execution
*/
public List<URL> getJarFiles();
/**
* Gets the list of additional classpath URLs.
* @return List of classpath URLs for dependencies
*/
public List<URL> getClasspaths();
/**
* Gets the user code class loader.
* @return ClassLoader for user code execution
*/
public ClassLoader getUserCodeClassLoader();
/**
* Validates that a JAR file URL is valid and accessible.
* @param jar The JAR file URL to validate
* @throws Exception if the JAR file is invalid or inaccessible
*/
public static void checkJarFile(URL jar) throws Exception;
/**
* Builds a user code class loader from JAR files and classpaths.
* @param jars List of JAR file URLs
* @param classpaths List of classpath URLs
* @param parent Parent class loader
* @return Configured ClassLoader for user code
*/
public static ClassLoader buildUserCodeClassLoader(List<URL> jars, List<URL> classpaths, ClassLoader parent);
}
Usage Examples:
import org.apache.flink.api.common.Plan;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.JobWithJars;
import java.io.File;
import java.net.URL;
import java.util.Arrays;
import java.util.List;
// Create a basic packaged program
File jarFile = new File("path/to/my-flink-job.jar");
PackagedProgram program = new PackagedProgram(jarFile, "arg1", "arg2", "arg3");
// Get program information
System.out.println("Main class: " + program.getMainClassName());
System.out.println("Arguments: " + Arrays.toString(program.getArguments()));
// Create a program with additional dependencies and an explicit entry point class
List<URL> dependencies = Arrays.asList(
new File("lib/dependency1.jar").toURI().toURL(),
new File("lib/dependency2.jar").toURI().toURL()
);
PackagedProgram programWithDeps = new PackagedProgram(
jarFile,
dependencies,
"com.example.MyFlinkJob",
"arg1", "arg2"
);
// Get execution plan
JobWithJars jobWithJars = program.getPlanWithJars();
Plan executionPlan = jobWithJars.getPlan();
// Preview execution plan
String planPreview = program.getPreviewPlan();
System.out.println("Execution plan preview: " + planPreview);
// Access class loader for custom operations
ClassLoader userClassLoader = program.getUserCodeClassLoader();
// Clean up temporary files for every program instance that was created
program.deleteExtractedLibraries();
programWithDeps.deleteExtractedLibraries();
Exception types for program-related errors.
/**
* Exception indicating errors during Flink program invocation.
* This is a checked exception (it extends {@link Exception}), so callers must
* catch or declare it.
*/
public class ProgramInvocationException extends Exception {
/**
* Creates the exception with an error message.
* @param message Error description
*/
public ProgramInvocationException(String message);
/**
* Creates the exception with an underlying cause.
* @param cause Root cause of the error
*/
public ProgramInvocationException(Throwable cause);
/**
* Creates the exception with a message and an underlying cause.
* @param message Error description
* @param cause Root cause of the error
*/
public ProgramInvocationException(String message, Throwable cause);
}
/**
* Runtime exception indicating errors in Flink program parametrization.
* This is an unchecked exception (it extends {@link RuntimeException}), so
* callers are not forced to catch or declare it.
*/
public class ProgramParametrizationException extends RuntimeException {
/**
* Creates the exception with an error message.
* @param message Error description
*/
public ProgramParametrizationException(String message);
}
/**
* Exception indicating no job was executed during program invocation.
* This is a checked exception (it extends {@link Exception}).
*/
public class ProgramMissingJobException extends Exception {
/**
* Creates the exception; this constructor takes no arguments.
*/
public ProgramMissingJobException();
}
Configuration for restoring jobs from savepoints.
/**
* Settings for restoring jobs from savepoints. Instances are obtained through
* the static factory methods listed here.
*/
public class SavepointRestoreSettings {
/**
* Creates settings for no savepoint restoration.
* @return SavepointRestoreSettings with no restoration
*/
public static SavepointRestoreSettings none();
/**
* Creates settings for restoring from a savepoint path.
* @param savepointPath Path to the savepoint directory
* @return SavepointRestoreSettings configured for the specified path
*/
public static SavepointRestoreSettings forPath(String savepointPath);
/**
* Creates settings for restoring from a savepoint path, with explicit handling of
* non-restored state.
* @param savepointPath Path to the savepoint directory
* @param allowNonRestoredState Whether to allow state that cannot be restored
* @return SavepointRestoreSettings with specified configuration
*/
public static SavepointRestoreSettings forPath(String savepointPath, boolean allowNonRestoredState);
}
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-clients-2-10