Apache Flink client library providing programmatic APIs and command-line interfaces for submitting, managing, and monitoring Flink jobs.
—
Functionality for packaging Flink programs in JAR files, managing classpaths, handling user applications, and executing programs with proper isolation and resource management.
Core class representing a Flink program packaged in a JAR file with full support for classpath management, argument handling, and program execution.
/**
* Represents a Flink program packaged in a JAR file.
* <p>
* Instances are assembled through the {@link Builder} returned by {@link #newBuilder()}.
* The class implements {@link AutoCloseable}, so it can be used in a try-with-resources statement.
*/
public class PackagedProgram implements AutoCloseable {
/**
* Create a new packaged program builder.
* @return Builder instance for constructing PackagedProgram
*/
public static Builder newBuilder();
/**
* Get the main class of the packaged program.
* @return Main class object
* @throws ProgramInvocationException if main class cannot be determined
*/
public Class<?> getMainClass() throws ProgramInvocationException;
/**
* Get the program arguments.
* @return Array of program arguments
*/
public String[] getArguments();
/**
* Get the JAR file URL.
* @return URL of the JAR file
*/
public URL getJarFile();
/**
* Get the user code class loader.
* @return Class loader for user code
*/
public URLClassLoader getUserCodeClassLoader();
/**
* Get the savepoint restore settings.
* @return Savepoint restore settings
*/
public SavepointRestoreSettings getSavepointRestoreSettings();
/**
* Check if this is a Python program.
* @return true if Python program, false otherwise
*/
public boolean isPython();
/**
* Invoke the program's main method for interactive execution.
* @throws ProgramInvocationException if execution fails
*/
public void invokeInteractiveModeForExecution() throws ProgramInvocationException;
/**
* Get the list of classpath URLs.
* @return List of classpath URLs
*/
public List<URL> getClasspaths();
/**
* Check if the program has a main method.
* @return true if main method exists
*/
public boolean hasMainMethod();
/**
* Close this packaged program, as required by {@link AutoCloseable}.
* Unlike {@link AutoCloseable#close()}, this override declares no checked exception.
*/
@Override
public void close();
/**
* Builder for creating PackagedProgram instances.
* Every setter returns a Builder so that calls can be chained before {@link #build()}.
*/
public static class Builder {
/**
* Set the JAR file for the program.
* @param jarFile JAR file containing the program
* @return Builder instance
*/
public Builder setJarFile(File jarFile);
/**
* Set the program arguments.
* @param arguments Program arguments, passed as varargs or as an array
* @return Builder instance
*/
public Builder setArguments(String... arguments);
/**
* Set the entry point class name.
* @param entryPointClassName Fully qualified class name; may be null (annotated {@code @Nullable})
* @return Builder instance
*/
public Builder setEntryPointClassName(@Nullable String entryPointClassName);
/**
* Set the savepoint restore settings.
* @param savepointRestoreSettings Savepoint settings
* @return Builder instance
*/
public Builder setSavepointRestoreSettings(SavepointRestoreSettings savepointRestoreSettings);
/**
* Set additional classpath URLs.
* @param classpaths List of classpath URLs
* @return Builder instance
*/
public Builder setClasspaths(List<URL> classpaths);
/**
* Set the user code class loader.
* @param userCodeClassLoader Class loader for user code
* @return Builder instance
*/
public Builder setUserCodeClassLoader(URLClassLoader userCodeClassLoader);
/**
* Build the PackagedProgram instance.
* @return Configured PackagedProgram
* @throws ProgramInvocationException if construction fails
*/
public PackagedProgram build() throws ProgramInvocationException;
}
}

Usage Examples:
import java.io.File;
import java.util.Arrays;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
// Create a simple packaged program
PackagedProgram program = PackagedProgram.newBuilder()
.setJarFile(new File("/path/to/flink-job.jar"))
.setArguments("--input", "input.txt", "--output", "output.txt")
.build();
// Create program with custom entry point
PackagedProgram programWithEntryPoint = PackagedProgram.newBuilder()
.setJarFile(new File("/path/to/job.jar"))
.setEntryPointClassName("com.mycompany.MyFlinkJob")
.setArguments("--parallelism", "4")
.build();
// Create program with savepoint restore
// (second argument is allowNonRestoredState)
SavepointRestoreSettings savepointSettings = SavepointRestoreSettings
.forPath("/path/to/savepoint", false);
PackagedProgram programWithSavepoint = PackagedProgram.newBuilder()
.setJarFile(new File("/path/to/job.jar"))
.setSavepointRestoreSettings(savepointSettings)
.build();
// Use the program
System.out.println("Main class: " + program.getMainClass().getName());
System.out.println("Arguments: " + Arrays.toString(program.getArguments()));
// Clean up resources: every PackagedProgram built above is AutoCloseable and must be closed
programWithEntryPoint.close();
programWithSavepoint.close();
program.close();

Utility functions for working with packaged programs, including Python program detection and program creation helpers.
/**
* Static utilities for working with packaged programs.
*/
public class PackagedProgramUtils {
/**
* Check if the given packaged program is a Python program.
* @param program Packaged program to check
* @return true if Python program
*/
public static boolean isPython(PackagedProgram program);
/**
* Check if the given file represents a Python program.
* Overload of {@link #isPython(PackagedProgram)} that takes a file instead of a program.
* @param file File to check
* @return true if Python file
*/
public static boolean isPython(File file);
/**
* Get the program description from a packaged program.
* @param program Packaged program
* @return Program description, or null (the method is annotated {@code @Nullable})
*/
@Nullable
public static String getProgramDescription(PackagedProgram program);
/**
* Extract nested libraries from a JAR file.
* @param jarFile JAR file to extract from
* @param extractionDir Target directory for extraction
* @return List of extracted library files
* @throws IOException if extraction fails
*/
public static List<File> extractNestedLibraries(File jarFile, File extractionDir)
throws IOException;
}

Interface and implementation for retrieving packaged programs, enabling flexible program loading strategies.
/**
* Interface for retrieving packaged programs.
* It declares a single method, {@link #getPackagedProgram()}.
*/
public interface PackagedProgramRetriever {
/**
* Retrieve a packaged program.
* @return PackagedProgram instance
* @throws ProgramInvocationException if retrieval fails
*/
PackagedProgram getPackagedProgram() throws ProgramInvocationException;
}
/**
* Default implementation of {@link PackagedProgramRetriever}.
*/
public class DefaultPackagedProgramRetriever implements PackagedProgramRetriever {
/**
* Create a retriever from a JAR file and the program settings.
* @param jarFile JAR file containing the program
* @param entryPointClassName Optional entry point class name; may be null (annotated {@code @Nullable})
* @param programArguments Program arguments
* @param savepointRestoreSettings Savepoint restore settings
* @param classpaths Additional classpath URLs
*/
public DefaultPackagedProgramRetriever(
File jarFile,
@Nullable String entryPointClassName,
String[] programArguments,
SavepointRestoreSettings savepointRestoreSettings,
List<URL> classpaths
);
/**
* Retrieve the packaged program (implements {@link PackagedProgramRetriever#getPackagedProgram()}).
* @return PackagedProgram instance
* @throws ProgramInvocationException if retrieval fails
*/
@Override
public PackagedProgram getPackagedProgram() throws ProgramInvocationException;
}

Specialized execution environments for stream processing contexts and plan generation.
/**
* Execution environment for stream processing contexts.
* Extends {@code StreamExecutionEnvironment} and overrides its execute methods.
*/
public class StreamContextEnvironment extends StreamExecutionEnvironment {
/**
* Create a stream context environment.
* @param client Cluster client for job submission
* @param parallelism Default parallelism
* @param userCodeClassLoader User code class loader
* @param savepointRestoreSettings Savepoint restore settings
*/
public StreamContextEnvironment(
ClusterClient<?> client,
int parallelism,
ClassLoader userCodeClassLoader,
SavepointRestoreSettings savepointRestoreSettings
);
/**
* Override of the inherited {@code execute(String)}.
* @param jobName Name of the job
* @return Job execution result
* @throws Exception declared by the signature; the concrete failure conditions are not shown in this listing
*/
@Override
public JobExecutionResult execute(String jobName) throws Exception;
/**
* Override of the inherited {@code executeAsync(String)}.
* @param jobName Name of the job
* @return Job client
* @throws Exception declared by the signature; the concrete failure conditions are not shown in this listing
*/
@Override
public JobClient executeAsync(String jobName) throws Exception;
}
/**
* Environment for stream plan generation.
* Extends {@code StreamExecutionEnvironment} and exposes the execution plan as a JSON string.
*/
public class StreamPlanEnvironment extends StreamExecutionEnvironment {
/**
* Create a stream plan environment.
* @param executorServiceLoader Executor service loader
* @param configuration Flink configuration
* @param userCodeClassLoader User code class loader
* @param savepointRestoreSettings Savepoint restore settings
*/
public StreamPlanEnvironment(
PipelineExecutorServiceLoader executorServiceLoader,
Configuration configuration,
ClassLoader userCodeClassLoader,
SavepointRestoreSettings savepointRestoreSettings
);
/**
* Override of the inherited {@code execute(String)}.
* @param jobName Name of the job
* @return Job execution result
* @throws Exception declared by the signature; the concrete failure conditions are not shown in this listing
*/
@Override
public JobExecutionResult execute(String jobName) throws Exception;
/**
* Get the execution plan as JSON.
* @return Execution plan JSON string
*/
public String getExecutionPlan();
}

Factory for creating per-job mini clusters for testing and local development.
/**
* Factory for per-job mini clusters.
*/
public class PerJobMiniClusterFactory {
/**
* Create a per-job mini cluster factory.
* @param miniClusterConfiguration Mini cluster configuration
*/
public PerJobMiniClusterFactory(MiniClusterConfiguration miniClusterConfiguration);
/**
* Create a mini cluster for job execution.
* @param jobGraph Job graph to execute
* @return Mini cluster instance
* @throws Exception if creation fails
*/
public MiniCluster create(JobGraph jobGraph) throws Exception;
}

/**
* Settings for savepoint restoration
*/
public class SavepointRestoreSettings {
/**
* Create settings for restoring from a savepoint path.
* @param savepointPath Path to savepoint
* @param allowNonRestoredState Whether to allow non-restored state
* @return Savepoint restore settings
*/
public static SavepointRestoreSettings forPath(
String savepointPath,
boolean allowNonRestoredState
);
/**
* Create settings with no savepoint restoration.
* @return Settings for no restoration
*/
public static SavepointRestoreSettings none();
/**
* Check if restoration is enabled.
* @return true if restoration enabled
*/
public boolean restoreSavepoint();
/**
* Get the savepoint path.
* @return Savepoint path, or null (the method is annotated {@code @Nullable})
*/
@Nullable
public String getRestorePath();
/**
* Check if non-restored state is allowed.
* @return true if allowed
*/
public boolean allowNonRestoredState();
}

Program packaging operations handle various error conditions:
- FileNotFoundException for missing JAR files, IOException for file access issues
- ClassNotFoundException for missing main classes, NoClassDefFoundError for missing dependencies
- ProgramInvocationException for program execution failures
- ProgramParametrizationException for invalid program parameters
- ProgramMissingJobException when the program doesn't define a Flink job

Common Error Patterns:
// PackagedProgram implements AutoCloseable, so try-with-resources closes it
// even when using the program throws; an explicit close() inside the try body
// would be skipped on any exception.
try (PackagedProgram program = PackagedProgram.newBuilder()
.setJarFile(new File("job.jar"))
.build()) {
// Use program...
} catch (ProgramInvocationException e) {
// Handle program construction/execution errors
System.err.println("Program error: " + e.getMessage());
} catch (FileNotFoundException e) {
// Handle missing JAR file
System.err.println("JAR file not found: " + e.getMessage());
}

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-clients