or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

cli.md
cluster-management.md
execution-context.md
execution-environments.md
index.md
program-management.md
tile.json

docs/program-management.md

Program Management

Utilities for packaging Flink programs as JAR files and managing their execution with dependencies and classpath handling.

Capabilities

PackagedProgram

Represents a Flink program packaged in a JAR file with its dependencies, providing comprehensive program metadata and execution support.

/**
 * Represents a program packaged in a jar file. It supplies functionality to extract
 * nested libraries, search for the program entry point, and extract a program plan.
 *
 * <p>Each member is listed exactly once. Methods that can fail declare
 * {@link ProgramInvocationException} rather than a bare {@code Exception}.
 */
public class PackagedProgram {

    // Public Constants

    /**
     * Property name of the entry in JAR manifest file that describes the Flink specific entry point.
     */
    public static final String MANIFEST_ATTRIBUTE_ASSEMBLER_CLASS = "program-class";

    /**
     * Property name of the entry in JAR manifest file that describes the class with the main method.
     */
    public static final String MANIFEST_ATTRIBUTE_MAIN_CLASS = "Main-Class";

    // Constructors

    /**
     * Creates an instance that wraps the plan defined in the jar file using the given
     * arguments.
     * @param jarFile The jar file which contains the plan and a Manifest which defines the program-class
     * @param args Optional. The arguments used to create the plan; their meaning depends on the
     *        program implementation. See {@link #getDescription()}.
     * @throws ProgramInvocationException This exception is thrown if the Program can't be properly loaded. Causes
     *         may be a missing / wrong class or manifest files.
     */
    public PackagedProgram(File jarFile, String... args) throws ProgramInvocationException;

    /**
     * Creates an instance that wraps the plan defined in the jar file using the given
     * arguments.
     * @param jarFile The jar file which contains the plan and a Manifest which defines the program-class
     * @param classpaths Additional classpath URLs needed by the Program.
     * @param args Optional. The arguments used to create the plan; their meaning depends on the
     *        program implementation. See {@link #getDescription()}.
     * @throws ProgramInvocationException This exception is thrown if the Program can't be properly loaded. Causes
     *         may be a missing / wrong class or manifest files.
     */
    public PackagedProgram(File jarFile, List<URL> classpaths, String... args) throws ProgramInvocationException;

    /**
     * Creates an instance that wraps the plan defined in the jar file using the given
     * arguments. For generating the plan the class named by the {@code entryPointClassName}
     * parameter is used.
     * @param jarFile The jar file which contains the plan.
     * @param entryPointClassName Name of the class which generates the plan. Overrides the class defined in the jar file manifest
     * @param args Optional. The arguments used to create the plan; their meaning depends on the
     *        program implementation. See {@link #getDescription()}.
     * @throws ProgramInvocationException This exception is thrown if the Program can't be properly loaded. Causes
     *         may be a missing / wrong class or manifest files.
     */
    public PackagedProgram(File jarFile, String entryPointClassName, String... args) throws ProgramInvocationException;

    /**
     * Creates an instance that wraps the plan defined in the jar file using the given
     * arguments. For generating the plan the class named by the {@code entryPointClassName}
     * parameter is used.
     * @param jarFile The jar file which contains the plan.
     * @param classpaths Additional classpath URLs needed by the Program.
     * @param entryPointClassName Name of the class which generates the plan. Overrides the class defined in the jar file manifest
     * @param args Optional. The arguments used to create the plan; their meaning depends on the
     *        program implementation. See {@link #getDescription()}.
     * @throws ProgramInvocationException This exception is thrown if the Program can't be properly loaded. Causes
     *         may be a missing / wrong class or manifest files.
     */
    public PackagedProgram(File jarFile, List<URL> classpaths, String entryPointClassName, String... args) throws ProgramInvocationException;

    // Configuration Methods

    /**
     * Sets the savepoint restore settings for this program.
     * @param savepointSettings The savepoint restore settings
     */
    public void setSavepointRestoreSettings(SavepointRestoreSettings savepointSettings);

    /**
     * Gets the savepoint restore settings for this program.
     * @return The savepoint restore settings
     */
    public SavepointRestoreSettings getSavepointSettings();

    // Program Information Methods

    /**
     * Returns the command line arguments of this program.
     * @return The arguments of this program
     */
    public String[] getArguments();

    /**
     * Returns the fully qualified main class name of this program.
     * @return The main class name
     */
    public String getMainClassName();

    /**
     * Returns true if this program uses interactive mode (main method).
     * @return true if using interactive mode
     */
    public boolean isUsingInteractiveMode();

    /**
     * Returns true if this program uses the program entry point.
     * @return true if using program entry point
     */
    public boolean isUsingProgramEntryPoint();

    /**
     * Returns the description provided by the Program class. This
     * may contain a description of the plan itself and its arguments.
     * @return The description of the program's input parameters.
     * @throws ProgramInvocationException This exception is thrown if the Program can't be properly loaded. Causes
     *         may be a missing / wrong class or manifest files.
     */
    public String getDescription() throws ProgramInvocationException;

    // Plan Generation Methods

    /**
     * Returns the plan without the required jars when the files are already provided by the cluster.
     * @return The plan without attached jar files.
     * @throws ProgramInvocationException if plan generation fails
     */
    public JobWithJars getPlanWithoutJars() throws ProgramInvocationException;

    /**
     * Returns the plan with all required jars.
     * @return The plan with attached jar files.
     * @throws ProgramInvocationException if plan generation fails
     */
    public JobWithJars getPlanWithJars() throws ProgramInvocationException;

    /**
     * Returns the analyzed plan without any optimizations, as a string.
     * @return the analyzed plan without any optimizations.
     * @throws ProgramInvocationException Thrown if an error occurred in the user-provided assembler. This may indicate
     *         missing parameters for generation.
     */
    public String getPreviewPlan() throws ProgramInvocationException;

    // Execution Methods

    /**
     * Invokes the program in interactive mode. This method assumes that the context
     * environment is prepared, or the execution will be a local execution by default.
     * @throws ProgramInvocationException if execution fails
     */
    public void invokeInteractiveModeForExecution() throws ProgramInvocationException;

    // Classpath and Library Methods

    /**
     * Returns the classpaths that are required by the program.
     * @return List of URLs for classpaths
     */
    public List<URL> getClasspaths();

    /**
     * Gets the ClassLoader that must be used to load user code classes.
     * @return The user code ClassLoader.
     */
    public ClassLoader getUserCodeClassLoader();

    /**
     * Returns all provided libraries needed to run the program.
     * @return List of library URLs
     */
    public List<URL> getAllLibraries();

    /**
     * Deletes all temporary files created for contained packaged libraries.
     */
    public void deleteExtractedLibraries();

    // Static Utility Methods

    /**
     * Takes all JAR files that are contained in this program's JAR file and extracts them
     * to the system's temp directory.
     * @param jarFile The JAR file to extract libraries from
     * @return The extracted temporary files.
     * @throws ProgramInvocationException Thrown, if the extraction process failed.
     */
    public static List<File> extractContainedLibraries(URL jarFile) throws ProgramInvocationException;

    /**
     * Deletes the extracted temporary library files.
     * @param tempLibraries List of temporary library files to delete
     */
    public static void deleteExtractedLibraries(List<File> tempLibraries);
}

Program Manifest Constants:

/**
 * Property name of the JAR manifest entry that describes the Flink specific entry point.
 */
public static final String MANIFEST_ATTRIBUTE_ASSEMBLER_CLASS = "program-class";

/**
 * Property name of the JAR manifest entry that describes the class with the main method.
 */
public static final String MANIFEST_ATTRIBUTE_MAIN_CLASS = "Main-Class";

JobWithJars

Represents a Flink dataflow plan with associated JAR files and classpaths for execution.

/**
 * A Flink dataflow plan together with the JAR files and classpath URLs
 * associated with it for execution.
 */
public class JobWithJars {
    /**
     * Creates a job from an execution plan and its dependencies.
     * @param plan The Flink execution plan
     * @param jarFiles List of JAR file URLs
     * @param classpaths List of additional classpath URLs
     */
    public JobWithJars(Plan plan, List<URL> jarFiles, List<URL> classpaths);
    
    /**
     * Creates a job from an execution plan and a single JAR file.
     * @param plan The Flink execution plan
     * @param jarFile Single JAR file URL
     */
    public JobWithJars(Plan plan, URL jarFile);
    
    /**
     * Gets the Flink execution plan.
     * @return The execution plan for this job
     */
    public Plan getPlan();
    
    /**
     * Gets the list of JAR file URLs.
     * @return List of JAR file URLs required for execution
     */
    public List<URL> getJarFiles();
    
    /**
     * Gets the list of additional classpath URLs.
     * @return List of classpath URLs for dependencies
     */
    public List<URL> getClasspaths();
    
    /**
     * Gets the user code class loader.
     * @return ClassLoader for user code execution
     */
    public ClassLoader getUserCodeClassLoader();
    
    /**
     * Validates that a JAR file URL is valid and accessible.
     * @param jar The JAR file URL to validate
     * @throws Exception if the JAR file is invalid or inaccessible
     */
    public static void checkJarFile(URL jar) throws Exception;
    
    /**
     * Builds a user code class loader from JAR files and classpaths.
     * @param jars List of JAR file URLs
     * @param classpaths List of classpath URLs
     * @param parent Parent class loader
     * @return Configured ClassLoader for user code
     */
    public static ClassLoader buildUserCodeClassLoader(List<URL> jars, List<URL> classpaths, ClassLoader parent);
}

Usage Examples:

import org.apache.flink.api.common.Plan;
import org.apache.flink.client.program.JobWithJars;
import org.apache.flink.client.program.PackagedProgram;
import java.io.File;
import java.net.URL;
import java.util.Arrays;
import java.util.List;

// NOTE: the PackagedProgram constructors and the plan/preview methods used below are
// declared to throw ProgramInvocationException, and File.toURI().toURL() can throw
// java.net.MalformedURLException. Run this snippet inside a method that handles or
// declares those checked exceptions.

// Create a basic packaged program; everything after the jar file is passed as program arguments
File jarFile = new File("path/to/my-flink-job.jar");
PackagedProgram program = new PackagedProgram(jarFile, "arg1", "arg2", "arg3");

// Get program information
System.out.println("Main class: " + program.getMainClassName());
System.out.println("Arguments: " + Arrays.toString(program.getArguments()));

// Create a program with additional dependencies on its classpath
List<URL> dependencies = Arrays.asList(
    new File("lib/dependency1.jar").toURI().toURL(),
    new File("lib/dependency2.jar").toURI().toURL()
);

// The explicit entry point class name overrides the class defined in the jar file manifest
PackagedProgram programWithDeps = new PackagedProgram(
    jarFile,
    dependencies,
    "com.example.MyFlinkJob",
    "arg1", "arg2"
);

// Get execution plan
JobWithJars jobWithJars = program.getPlanWithJars();
Plan executionPlan = jobWithJars.getPlan();

// Preview execution plan
String planPreview = program.getPreviewPlan();
System.out.println("Execution plan preview: " + planPreview);

// Access class loader for custom operations
ClassLoader userClassLoader = program.getUserCodeClassLoader();

// Clean up temporary files for every program instance that was created,
// not just the first one
program.deleteExtractedLibraries();
programWithDeps.deleteExtractedLibraries();

Program Exception Types

Exception types for program-related errors.

/**
 * Checked exception indicating an error during Flink program invocation.
 */
public class ProgramInvocationException extends Exception {
    /**
     * Creates the exception with an error message.
     * @param message Error description
     */
    public ProgramInvocationException(String message);
    
    /**
     * Creates the exception with an underlying cause.
     * @param cause Root cause of the error
     */
    public ProgramInvocationException(Throwable cause);
    
    /**
     * Creates the exception with both a message and an underlying cause.
     * @param message Error description
     * @param cause Root cause of the error
     */
    public ProgramInvocationException(String message, Throwable cause);
}

/**
 * Unchecked (runtime) exception indicating an error in Flink program parametrization.
 */
public class ProgramParametrizationException extends RuntimeException {
    /**
     * Creates the exception with an error message.
     * @param message Error description
     */
    public ProgramParametrizationException(String message);
}

/**
 * Checked exception indicating that no job was executed during program invocation.
 */
public class ProgramMissingJobException extends Exception {
    /**
     * Creates the exception; this constructor takes no message or cause.
     */
    public ProgramMissingJobException();
}

Savepoint Restore Settings

Configuration for restoring jobs from savepoints.

/**
 * Settings for restoring jobs from savepoints. Instances are obtained through the
 * static factory methods listed here.
 */
public class SavepointRestoreSettings {
    /**
     * Creates settings that request no savepoint restoration.
     * @return SavepointRestoreSettings with no restoration
     */
    public static SavepointRestoreSettings none();
    
    /**
     * Creates settings for restoring from a savepoint path.
     * @param savepointPath Path to the savepoint directory
     * @return SavepointRestoreSettings configured for the specified path
     */
    public static SavepointRestoreSettings forPath(String savepointPath);
    
    /**
     * Creates settings for restoring from a savepoint path, with explicit handling
     * of state that cannot be restored.
     * @param savepointPath Path to the savepoint directory
     * @param allowNonRestoredState Whether to allow state that cannot be restored
     * @return SavepointRestoreSettings with specified configuration
     */
    public static SavepointRestoreSettings forPath(String savepointPath, boolean allowNonRestoredState);
}