or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

application-deployment.mdcli-interface.mdclient-core.mdcluster-management.mdindex.mdprogram-execution.mdrest-client.md
tile.json

program-execution.mddocs/

Program Execution

Program packaging, classloader management, and execution utilities for submitting user applications to Flink clusters. Provides comprehensive support for JAR-based programs and execution environment management.

Capabilities

Packaged Program

Represents a program packaged in a JAR file with all necessary dependencies and configuration.

/**
 * Represents a program packaged in a JAR file
 */
public class PackagedProgram implements AutoCloseable {
    
    // Manifest attribute constants
    public static final String MANIFEST_ATTRIBUTE_ASSEMBLER_CLASS = "program-class";
    public static final String MANIFEST_ATTRIBUTE_MAIN_CLASS = "Main-Class";

    /**
     * Gets savepoint restore settings for this program
     * @return SavepointRestoreSettings instance
     */
    public SavepointRestoreSettings getSavepointSettings();

    /**
     * Gets the program arguments
     * @return Array of program arguments
     */
    public String[] getArguments();

    /**
     * Gets the main class name for this program
     * @return Fully qualified main class name
     */
    public String getMainClassName();

    /**
     * Gets program description by analyzing the JAR
     * @return Program description string
     * @throws ProgramInvocationException if description cannot be retrieved
     */
    public String getDescription() throws ProgramInvocationException;

    /**
     * Invokes the program in interactive mode for execution
     * @throws ProgramInvocationException if invocation fails
     */
    public void invokeInteractiveModeForExecution() throws ProgramInvocationException;

    /**
     * Gets the required classpaths for this program
     * @return List of classpath URLs
     */
    public List<URL> getClasspaths();

    /**
     * Gets the user code classloader for this program
     * @return ClassLoader for user code execution
     */
    public ClassLoader getUserCodeClassLoader();

    /**
     * Gets the job JAR and all dependencies
     * @return List of JAR and dependency URLs
     */
    public List<URL> getJobJarAndDependencies();

    /**
     * Static method to get JAR and dependencies from file
     * @param jarFile JAR file to analyze
     * @param entryPointClassName Entry point class name
     * @return List of JAR and dependency URLs
     * @throws ProgramInvocationException if analysis fails
     */
    public static List<URL> getJobJarAndDependencies(File jarFile, String entryPointClassName) 
        throws ProgramInvocationException;

    /**
     * Extracts nested libraries from a JAR file
     * @param jarFile JAR file URL to extract from
     * @return List of extracted library files
     * @throws ProgramInvocationException if extraction fails
     */
    public static List<File> extractContainedLibraries(URL jarFile) 
        throws ProgramInvocationException;

    /**
     * Creates a new builder for PackagedProgram
     * @return Builder instance
     */
    public static Builder newBuilder();

    /**
     * Closes the program and releases resources
     */
    @Override
    public void close();

    /**
     * Builder pattern for PackagedProgram construction
     */
    public static class Builder {
        /**
         * Sets the JAR file for this program
         * @param jarFile JAR file containing the program
         * @return This builder instance
         */
        public Builder setJarFile(File jarFile);

        /**
         * Sets the entry point class name
         * @param entryPointClassName Fully qualified class name
         * @return This builder instance
         */
        public Builder setEntryPointClassName(String entryPointClassName);

        /**
         * Sets the program arguments
         * @param args Variable arguments for the program
         * @return This builder instance
         */
        public Builder setArguments(String... args);

        /**
         * Sets additional user classpaths
         * @param userClassPaths List of user classpath URLs
         * @return This builder instance
         */
        public Builder setUserClassPaths(List<URL> userClassPaths);

        /**
         * Sets the Flink configuration
         * @param configuration Flink configuration instance
         * @return This builder instance
         */
        public Builder setConfiguration(Configuration configuration);

        /**
         * Sets savepoint restore settings
         * @param savepointRestoreSettings Savepoint settings
         * @return This builder instance
         */
        public Builder setSavepointRestoreSettings(SavepointRestoreSettings savepointRestoreSettings);

        /**
         * Builds the PackagedProgram instance
         * @return PackagedProgram instance
         * @throws ProgramInvocationException if building fails
         */
        public PackagedProgram build() throws ProgramInvocationException;
    }
}

Usage Example:

import org.apache.flink.client.program.PackagedProgram;
import java.io.File;

// Create a packaged program from JAR
PackagedProgram program = PackagedProgram.newBuilder()
    .setJarFile(new File("/path/to/my-flink-job.jar"))
    .setEntryPointClassName("com.example.MyFlinkJob")
    .setArguments("--input", "/data/input", "--output", "/data/output")
    .build();

try {
    System.out.println("Program: " + program.getDescription());
    System.out.println("Main class: " + program.getMainClassName());
    
    // Get classpath information
    List<URL> dependencies = program.getJobJarAndDependencies();
    System.out.println("Dependencies: " + dependencies.size());
    
    // Execute the program
    program.invokeInteractiveModeForExecution();
    
} finally {
    program.close();
}

Packaged Program Utilities

Utility functions for working with packaged programs.

/**
 * Utilities for packaged programs
 */
public class PackagedProgramUtils {
    /**
     * Checks if the program is a Python program
     * @param entryPointClassName Entry point class name to check
     * @return true if Python program, false otherwise
     */
    public static boolean isPython(String entryPointClassName);

    /**
     * Gets the Python JAR URL for Python programs
     * @return URL of the Python JAR
     */
    public static URL getPythonJar();
}

Program Retriever Interface

Interface for retrieving packaged programs from various sources.

/**
 * Interface for retrieving packaged programs
 */
public interface PackagedProgramRetriever {
    /**
     * Retrieves a packaged program
     * @return PackagedProgram instance
     * @throws ProgramRetrievalException if retrieval fails
     */
    PackagedProgram getPackagedProgram() throws ProgramRetrievalException;
}

/**
 * Default implementation for retrieving packaged programs
 */
public class DefaultPackagedProgramRetriever implements PackagedProgramRetriever {
    @Override
    public PackagedProgram getPackagedProgram() throws ProgramRetrievalException;
}

Execution Environments

Specialized execution environments that provide context for program execution.

/**
 * Execution environment with context for DataSet programs
 */
public class ContextEnvironment extends ExecutionEnvironment {
    // Provides execution context for DataSet API programs
    // Automatically configured when running within Flink client
}

/**
 * Execution environment for plan optimization
 */
public class OptimizerPlanEnvironment extends ExecutionEnvironment {
    // Used for generating execution plans without actual execution
    // Useful for plan analysis and optimization
}

/**
 * Stream execution environment with context for DataStream programs
 */
public class StreamContextEnvironment extends StreamExecutionEnvironment {
    // Provides execution context for DataStream API programs
    // Automatically configured when running within Flink client
}

/**
 * Environment for stream execution plans
 */
public class StreamPlanEnvironment extends StreamExecutionEnvironment {
    // Used for generating stream execution plans
    // Enables plan inspection without execution
}

Usage Example:

import org.apache.flink.client.program.ContextEnvironment;
import org.apache.flink.api.java.ExecutionEnvironment;

// The environment is automatically set up when running through client
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// If running in client context, this will be a ContextEnvironment
if (env instanceof ContextEnvironment) {
    System.out.println("Running in client context");
}

// Create and execute DataSet program
DataSet<String> input = env.readTextFile("/path/to/input");
input.flatMap(new Tokenizer())
     .groupBy(0)
     .sum(1)
     .writeAsCsv("/path/to/output");

env.execute("Word Count Example");

Mini Cluster Factory

Factory for creating per-job mini clusters for testing and development.

/**
 * Factory for per-job mini clusters
 */
public class PerJobMiniClusterFactory {
    /**
     * Creates a mini cluster for running a single job
     * @param configuration Cluster configuration
     * @return MiniCluster instance configured for single job execution
     */
    public static MiniCluster createMiniCluster(Configuration configuration);
}

Exception Types

/**
 * Exception for program invocation errors
 */
public class ProgramInvocationException extends Exception {
    /**
     * Creates exception with message
     * @param message Error message
     */
    public ProgramInvocationException(String message);

    /**
     * Creates exception with message and job ID
     * @param message Error message
     * @param jobID Associated job ID
     */
    public ProgramInvocationException(String message, JobID jobID);

    /**
     * Creates exception with cause
     * @param cause Root cause throwable
     */
    public ProgramInvocationException(Throwable cause);

    /**
     * Creates exception with message and cause
     * @param message Error message
     * @param cause Root cause throwable
     */
    public ProgramInvocationException(String message, Throwable cause);

    /**
     * Creates exception with message, job ID and cause
     * @param message Error message
     * @param jobID Associated job ID
     * @param cause Root cause throwable
     */
    public ProgramInvocationException(String message, JobID jobID, Throwable cause);
}

/**
 * Exception when program doesn't contain a job
 */
public class ProgramMissingJobException extends FlinkException {
    /**
     * Creates exception with message
     * @param message Error message describing missing job
     */
    public ProgramMissingJobException(String message);
}

/**
 * Exception for program parameterization errors
 */
public class ProgramParametrizationException extends RuntimeException {
    /**
     * Creates exception with message
     * @param message Error message
     */
    public ProgramParametrizationException(String message);

    /**
     * Creates exception with message and cause
     * @param message Error message
     * @param cause Root cause throwable
     */
    public ProgramParametrizationException(String message, Throwable cause);
}

/**
 * Exception for program abortion (extends Error for immediate termination)
 */
public class ProgramAbortException extends Error {
    /**
     * Creates default abort exception
     */
    public ProgramAbortException();

    /**
     * Creates abort exception with message
     * @param message Abort message
     */
    public ProgramAbortException(String message);

    /**
     * Creates abort exception with message and cause
     * @param message Abort message
     * @param cause Root cause throwable
     */
    public ProgramAbortException(String message, Throwable cause);
}

/**
 * Exception for program retrieval errors
 */
public class ProgramRetrievalException extends Exception {
    public ProgramRetrievalException(String message);
    public ProgramRetrievalException(String message, Throwable cause);
}

Types

public class SavepointRestoreSettings {
    public static SavepointRestoreSettings none();
    public static SavepointRestoreSettings forPath(String savepointPath);
    public static SavepointRestoreSettings forPath(String savepointPath, boolean allowNonRestoredState);
    
    public boolean restoreSavepoint();
    public String getRestorePath();
    public boolean allowNonRestoredState();
}

public abstract class ExecutionEnvironment {
    public static ExecutionEnvironment getExecutionEnvironment();
    public abstract JobExecutionResult execute(String jobName) throws Exception;
    
    // DataSet API methods
    public <X> DataSet<X> fromCollection(Collection<X> data);
    public DataSet<String> readTextFile(String filePath);
}

public abstract class StreamExecutionEnvironment {
    public static StreamExecutionEnvironment getExecutionEnvironment();
    public JobExecutionResult execute(String jobName) throws Exception;
    
    // DataStream API methods
    public <T> DataStreamSource<T> fromCollection(Collection<T> data);
    public DataStreamSource<String> socketTextStream(String hostname, int port);
}

public interface DataSet<T> {
    <R> DataSet<R> map(MapFunction<T, R> mapper);
    <R> DataSet<R> flatMap(FlatMapFunction<T, R> flatMapper);
    DataSet<T> filter(FilterFunction<T> filter);
    UnsortedGrouping<T> groupBy(int... fields);
    DataSink<T> writeAsCsv(String filePath);
}

public interface DataStream<T> {
    <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper);
    <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T, R> flatMapper);
    SingleOutputStreamOperator<T> filter(FilterFunction<T> filter);
    DataStreamSink<T> print();
}

public class MiniCluster implements AutoCloseable {
    public void start() throws Exception;
    public void close() throws Exception;
    public CompletableFuture<JobResult> executeJobBlocking(JobGraph job);
}