CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-apache-flink--flink-clients

Apache Flink client library providing programmatic APIs and command-line interfaces for submitting, managing, and monitoring Flink jobs.

Pending
Overview
Eval results
Files

program-packaging.mddocs/

Program Packaging and Execution

Functionality for packaging Flink programs in JAR files, managing classpaths, handling user applications, and executing programs with proper isolation and resource management.

Capabilities

Packaged Program

Core class representing a Flink program packaged in a JAR file with full support for classpath management, argument handling, and program execution.

/**
 * Represents a Flink program packaged in a JAR file.
 *
 * <p>Instances are created through {@link Builder}, obtained from
 * {@link #newBuilder()}. The class implements {@link AutoCloseable}, so an
 * instance can be managed with try-with-resources or closed explicitly via
 * {@link #close()}.
 */
public class PackagedProgram implements AutoCloseable {
    /**
     * Create a new packaged program builder
     * @return Builder instance for constructing PackagedProgram
     */
    public static Builder newBuilder();
    
    /**
     * Get the main class of the packaged program
     * @return Main class object
     * @throws ProgramInvocationException if main class cannot be determined
     */
    public Class<?> getMainClass() throws ProgramInvocationException;
    
    /**
     * Get program arguments
     * @return Array of program arguments
     */
    public String[] getArguments();
    
    /**
     * Get JAR file URL
     * @return URL of the JAR file
     */
    public URL getJarFile();
    
    /**
     * Get user code class loader
     * @return Class loader for user code
     */
    public URLClassLoader getUserCodeClassLoader();
    
    /**
     * Get savepoint restore settings
     * @return Savepoint restore settings
     */
    public SavepointRestoreSettings getSavepointRestoreSettings();
    
    /**
     * Check if this is a Python program
     * @return true if Python program, false otherwise
     */
    public boolean isPython();
    
    /**
     * Invoke the program's main method for interactive execution
     * @throws ProgramInvocationException if execution fails
     */
    public void invokeInteractiveModeForExecution() throws ProgramInvocationException;
    
    /**
     * Get list of classpath URLs
     * @return List of classpath URLs
     */
    public List<URL> getClasspaths();
    
    /**
     * Check if program has main method
     * @return true if main method exists
     */
    public boolean hasMainMethod();
    
    /**
     * Close this program (the {@link AutoCloseable} contract). Declares no
     * checked exceptions.
     *
     * <p>NOTE(review): which resources are released is not shown in this
     * listing - confirm against the implementation.
     */
    @Override
    public void close();
    
    /**
     * Builder for creating PackagedProgram instances.
     *
     * <p>Every setter returns a {@code Builder}, so calls can be chained and
     * finished with {@link #build()}.
     */
    public static class Builder {
        /**
         * Set JAR file for the program
         * @param jarFile JAR file containing the program
         * @return Builder instance
         */
        public Builder setJarFile(File jarFile);
        
        /**
         * Set program arguments
         * @param arguments Program arguments, passed as varargs or as an array
         * @return Builder instance
         */
        public Builder setArguments(String... arguments);
        
        /**
         * Set entry point class name
         * @param entryPointClassName Fully qualified class name; may be {@code null}
         * @return Builder instance
         */
        public Builder setEntryPointClassName(@Nullable String entryPointClassName);
        
        /**
         * Set savepoint restore settings
         * @param savepointRestoreSettings Savepoint settings
         * @return Builder instance
         */
        public Builder setSavepointRestoreSettings(SavepointRestoreSettings savepointRestoreSettings);
        
        /**
         * Set additional classpath URLs
         * @param classpaths List of classpath URLs
         * @return Builder instance
         */
        public Builder setClasspaths(List<URL> classpaths);
        
        /**
         * Set user code class loader
         * @param userCodeClassLoader Class loader for user code
         * @return Builder instance
         */
        public Builder setUserCodeClassLoader(URLClassLoader userCodeClassLoader);
        
        /**
         * Build the PackagedProgram instance
         * @return Configured PackagedProgram
         * @throws ProgramInvocationException if construction fails
         */
        public PackagedProgram build() throws ProgramInvocationException;
    }
}

Usage Examples:

import java.io.File;
import java.util.Arrays;

import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;

// PackagedProgram implements AutoCloseable, so each example uses
// try-with-resources: the program is closed even if a call inside the block
// throws. build() and getMainClass() declare ProgramInvocationException, which
// the enclosing method must catch or declare.

// Create a simple packaged program
try (PackagedProgram program = PackagedProgram.newBuilder()
        .setJarFile(new File("/path/to/flink-job.jar"))
        .setArguments("--input", "input.txt", "--output", "output.txt")
        .build()) {
    // Use the program
    System.out.println("Main class: " + program.getMainClass().getName());
    System.out.println("Arguments: " + Arrays.toString(program.getArguments()));
}

// Create program with custom entry point
try (PackagedProgram programWithEntryPoint = PackagedProgram.newBuilder()
        .setJarFile(new File("/path/to/job.jar"))
        .setEntryPointClassName("com.mycompany.MyFlinkJob")
        .setArguments("--parallelism", "4")
        .build()) {
    // Use programWithEntryPoint...
}

// Create program with savepoint restore
SavepointRestoreSettings savepointSettings = SavepointRestoreSettings
    .forPath("/path/to/savepoint", false);

try (PackagedProgram programWithSavepoint = PackagedProgram.newBuilder()
        .setJarFile(new File("/path/to/job.jar"))
        .setSavepointRestoreSettings(savepointSettings)
        .build()) {
    // Use programWithSavepoint...
}

Packaged Program Utilities

Utility functions for working with packaged programs, including Python program detection and program creation helpers.

/**
 * Utilities for working with packaged programs.
 *
 * <p>All members listed here are static.
 */
public class PackagedProgramUtils {
    /**
     * Check if the program is a Python program
     * @param program Packaged program to check
     * @return true if Python program
     */
    public static boolean isPython(PackagedProgram program);
    
    /**
     * Check if file path represents a Python program
     * @param file File to check
     * @return true if Python file
     */
    public static boolean isPython(File file);
    
    /**
     * Get program description from packaged program
     * @param program Packaged program
     * @return Program description, or {@code null} (the method is annotated
     *         {@code @Nullable}). NOTE(review): the conditions under which
     *         {@code null} is returned are not shown in this listing.
     */
    @Nullable
    public static String getProgramDescription(PackagedProgram program);
    
    /**
     * Extract nested libraries from JAR file
     * @param jarFile JAR file to extract from
     * @param extractionDir Target directory for extraction
     * @return List of extracted library files
     * @throws IOException if extraction fails
     */
    public static List<File> extractNestedLibraries(File jarFile, File extractionDir) 
        throws IOException;
}

Packaged Program Retriever

Interface and implementation for retrieving packaged programs, enabling flexible program loading strategies.

/**
 * Interface for retrieving packaged programs.
 *
 * <p>Declares a single method, {@link #getPackagedProgram()}.
 */
public interface PackagedProgramRetriever {
    /**
     * Retrieve a packaged program
     * @return PackagedProgram instance
     * @throws ProgramInvocationException if retrieval fails
     */
    PackagedProgram getPackagedProgram() throws ProgramInvocationException;
}

/**
 * Default implementation of packaged program retriever.
 *
 * <p>The constructor takes a JAR file, an optional entry point class name,
 * program arguments, savepoint restore settings, and classpath URLs.
 */
public class DefaultPackagedProgramRetriever implements PackagedProgramRetriever {
    /**
     * Create retriever with JAR file and arguments
     * @param jarFile JAR file containing the program
     * @param entryPointClassName Optional entry point class name; may be {@code null}
     * @param programArguments Program arguments
     * @param savepointRestoreSettings Savepoint restore settings
     * @param classpaths Additional classpath URLs
     */
    public DefaultPackagedProgramRetriever(
        File jarFile,
        @Nullable String entryPointClassName,
        String[] programArguments,
        SavepointRestoreSettings savepointRestoreSettings,
        List<URL> classpaths
    );
    
    /**
     * Retrieve the packaged program, implementing
     * {@link PackagedProgramRetriever#getPackagedProgram()}.
     * @return PackagedProgram instance
     * @throws ProgramInvocationException if retrieval fails
     */
    @Override
    public PackagedProgram getPackagedProgram() throws ProgramInvocationException;
}

Stream Environment Classes

Specialized execution environments for stream processing contexts and plan generation.

/**
 * Execution environment for stream processing contexts.
 *
 * <p>Extends {@code StreamExecutionEnvironment} and overrides its
 * {@code execute} and {@code executeAsync} methods.
 */
public class StreamContextEnvironment extends StreamExecutionEnvironment {
    /**
     * Create stream context environment
     * @param client Cluster client for job submission
     * @param parallelism Default parallelism
     * @param userCodeClassLoader User code class loader
     * @param savepointRestoreSettings Savepoint restore settings
     */
    public StreamContextEnvironment(
        ClusterClient<?> client,
        int parallelism,
        ClassLoader userCodeClassLoader,
        SavepointRestoreSettings savepointRestoreSettings
    );
    
    /**
     * Execute the job under the given name
     * @param jobName Name of the job
     * @return Result of the job execution
     * @throws Exception if execution fails
     */
    @Override
    public JobExecutionResult execute(String jobName) throws Exception;
    
    /**
     * Execute the job under the given name, returning a {@code JobClient}
     * rather than a {@code JobExecutionResult}.
     * NOTE(review): presumably returns once the job is submitted - confirm.
     * @param jobName Name of the job
     * @return Client for the executed job
     * @throws Exception if execution fails
     */
    @Override
    public JobClient executeAsync(String jobName) throws Exception;
}

/**
 * Environment for stream plan generation.
 *
 * <p>Extends {@code StreamExecutionEnvironment}, overrides {@code execute},
 * and exposes the execution plan as a JSON string.
 */
public class StreamPlanEnvironment extends StreamExecutionEnvironment {
    /**
     * Create stream plan environment
     * @param executorServiceLoader Executor service loader
     * @param configuration Flink configuration
     * @param userCodeClassLoader User code class loader
     * @param savepointRestoreSettings Savepoint restore settings
     */
    public StreamPlanEnvironment(
        PipelineExecutorServiceLoader executorServiceLoader,
        Configuration configuration,
        ClassLoader userCodeClassLoader,
        SavepointRestoreSettings savepointRestoreSettings
    );
    
    /**
     * Override of {@code StreamExecutionEnvironment#execute(String)}.
     * NOTE(review): in a plan-generation environment this presumably captures
     * the plan instead of running the job - confirm against the implementation.
     * @param jobName Name of the job
     * @return Job execution result
     * @throws Exception if the call fails
     */
    @Override
    public JobExecutionResult execute(String jobName) throws Exception;
    
    /**
     * Get the execution plan as JSON
     * @return Execution plan JSON string
     */
    public String getExecutionPlan();
}

Mini Cluster Factory

Factory for creating per-job mini clusters for testing and local development.

/**
 * Factory for per-job mini clusters.
 *
 * <p>Constructed from a {@code MiniClusterConfiguration}; {@link #create}
 * takes the {@code JobGraph} to execute and returns a {@code MiniCluster}.
 */
public class PerJobMiniClusterFactory {
    /**
     * Create per-job mini cluster factory
     * @param miniClusterConfiguration Mini cluster configuration
     */
    public PerJobMiniClusterFactory(MiniClusterConfiguration miniClusterConfiguration);
    
    /**
     * Create mini cluster for job execution
     * @param jobGraph Job graph to execute
     * @return Mini cluster instance
     * @throws Exception if creation fails
     */
    public MiniCluster create(JobGraph jobGraph) throws Exception;
}

Types

/**
 * Settings for savepoint restoration.
 *
 * <p>Instances are obtained from the static factories
 * {@link #forPath(String, boolean)} and {@link #none()}.
 */
public class SavepointRestoreSettings {
    /**
     * Create settings for restoring from savepoint path
     * @param savepointPath Path to savepoint
     * @param allowNonRestoredState Whether to allow non-restored state
     * @return Savepoint restore settings
     */
    public static SavepointRestoreSettings forPath(
        String savepointPath, 
        boolean allowNonRestoredState
    );
    
    /**
     * Create settings with no savepoint restoration
     * @return Settings for no restoration
     */
    public static SavepointRestoreSettings none();
    
    /**
     * Check if restoration is enabled
     * @return true if restoration enabled
     */
    public boolean restoreSavepoint();
    
    /**
     * Get savepoint path
     * @return Savepoint path, or {@code null} (the method is annotated
     *         {@code @Nullable}); presumably {@code null} when no restoration
     *         is configured - confirm.
     */
    @Nullable
    public String getRestorePath();
    
    /**
     * Check if non-restored state is allowed
     * @return true if allowed
     */
    public boolean allowNonRestoredState();
}

Exception Handling

Program packaging operations can fail with the following kinds of errors:

  • File Errors: FileNotFoundException for missing JAR files, IOException for file access issues
  • Class Loading Errors: ClassNotFoundException for missing main classes, NoClassDefFoundError for dependencies
  • Program Errors: ProgramInvocationException for program execution failures
  • Configuration Errors: ProgramParametrizationException for invalid program parameters
  • Missing Job Errors: ProgramMissingJobException when the program doesn't define a Flink job

Common Error Patterns:

try {
    PackagedProgram program = PackagedProgram.newBuilder()
        .setJarFile(new File("job.jar"))
        .build();
    
    // Use program...
    program.close();
    
} catch (ProgramInvocationException e) {
    // Handle program construction/execution errors
    System.err.println("Program error: " + e.getMessage());
} catch (FileNotFoundException e) {
    // Handle missing JAR file
    System.err.println("JAR file not found: " + e.getMessage());
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-clients

docs

application-mode.md

artifact-management.md

cli-frontend.md

cluster-client.md

deployment-management.md

index.md

program-packaging.md

tile.json