Flink Client APIs and utilities for submitting and managing Apache Flink jobs
Utilities for packaging user programs, managing their classloaders, and executing them when submitting user applications to Flink clusters. This covers JAR-based programs and the execution environments they run in.
Represents a program packaged in a JAR file with all necessary dependencies and configuration.
/**
 * Represents a program packaged in a JAR file.
 *
 * <p>A {@code PackagedProgram} bundles the JAR with its entry point class, program arguments,
 * user classpaths and savepoint restore settings. Instances are created through
 * {@link #newBuilder()}. The class implements {@link AutoCloseable}, so callers should invoke
 * {@link #close()} (or use try-with-resources) when they are done with the program.
 */
public class PackagedProgram implements AutoCloseable {
    // Manifest attribute constants: names of attributes that can appear in a JAR manifest.
    // NOTE(review): presumably consulted to locate the entry point class when none is set
    // explicitly via Builder#setEntryPointClassName — confirm against the implementation.

    /** Name of the {@code program-class} JAR manifest attribute. */
    public static final String MANIFEST_ATTRIBUTE_ASSEMBLER_CLASS = "program-class";

    /** Name of the standard {@code Main-Class} JAR manifest attribute. */
    public static final String MANIFEST_ATTRIBUTE_MAIN_CLASS = "Main-Class";

    /**
     * Returns the savepoint restore settings configured for this program.
     *
     * @return the {@link SavepointRestoreSettings} instance
     */
    public SavepointRestoreSettings getSavepointSettings();

    /**
     * Returns the arguments that are passed to the program's entry point.
     *
     * @return array of program arguments
     */
    public String[] getArguments();

    /**
     * Returns the name of the program's entry point (main) class.
     *
     * @return fully qualified main class name
     */
    public String getMainClassName();

    /**
     * Returns a description of the program obtained by analyzing the JAR.
     *
     * @return program description string
     * @throws ProgramInvocationException if the description cannot be retrieved
     */
    public String getDescription() throws ProgramInvocationException;

    /**
     * Invokes the program in interactive mode for execution.
     *
     * @throws ProgramInvocationException if the invocation fails
     */
    public void invokeInteractiveModeForExecution() throws ProgramInvocationException;

    /**
     * Returns the classpaths required by this program.
     *
     * @return list of classpath URLs
     */
    public List<URL> getClasspaths();

    /**
     * Returns the classloader used to load and run this program's user code.
     *
     * @return classloader for user code execution
     */
    public ClassLoader getUserCodeClassLoader();

    /**
     * Returns the URLs of the job JAR and all of its dependencies.
     *
     * @return list of JAR and dependency URLs
     */
    public List<URL> getJobJarAndDependencies();

    /**
     * Static variant of {@link #getJobJarAndDependencies()} that works directly from a JAR file
     * and an entry point class name, without a {@code PackagedProgram} instance.
     *
     * @param jarFile JAR file to analyze
     * @param entryPointClassName entry point class name
     * @return list of JAR and dependency URLs
     * @throws ProgramInvocationException if the analysis fails
     */
    public static List<URL> getJobJarAndDependencies(File jarFile, String entryPointClassName)
            throws ProgramInvocationException;

    /**
     * Extracts the libraries nested inside a JAR file.
     *
     * @param jarFile URL of the JAR file to extract from
     * @return list of extracted library files
     * @throws ProgramInvocationException if the extraction fails
     */
    public static List<File> extractContainedLibraries(URL jarFile)
            throws ProgramInvocationException;

    /**
     * Creates a new builder for {@code PackagedProgram}.
     *
     * @return a fresh {@link Builder} instance
     */
    public static Builder newBuilder();

    /**
     * Closes the program and releases its resources. Declared without checked exceptions, so it
     * can be used in try-with-resources without an extra catch clause.
     */
    @Override
    public void close();

    /**
     * Fluent builder for {@link PackagedProgram}. Every setter returns this builder so calls can
     * be chained; {@link #build()} produces the program.
     */
    public static class Builder {
        /**
         * Sets the JAR file that contains the program.
         *
         * @param jarFile JAR file containing the program
         * @return this builder instance
         */
        public Builder setJarFile(File jarFile);

        /**
         * Sets the entry point class name.
         *
         * @param entryPointClassName fully qualified class name
         * @return this builder instance
         */
        public Builder setEntryPointClassName(String entryPointClassName);

        /**
         * Sets the arguments passed to the program's entry point.
         *
         * @param args program arguments (varargs)
         * @return this builder instance
         */
        public Builder setArguments(String... args);

        /**
         * Sets additional user classpaths.
         *
         * @param userClassPaths list of user classpath URLs
         * @return this builder instance
         */
        public Builder setUserClassPaths(List<URL> userClassPaths);

        /**
         * Sets the Flink configuration.
         *
         * @param configuration Flink configuration instance
         * @return this builder instance
         */
        public Builder setConfiguration(Configuration configuration);

        /**
         * Sets the savepoint restore settings.
         *
         * @param savepointRestoreSettings savepoint settings
         * @return this builder instance
         */
        public Builder setSavepointRestoreSettings(SavepointRestoreSettings savepointRestoreSettings);

        /**
         * Builds the {@link PackagedProgram} from the values set on this builder.
         *
         * @return the new {@code PackagedProgram} instance
         * @throws ProgramInvocationException if building fails
         */
        public PackagedProgram build() throws ProgramInvocationException;
    }
}
Usage Example:
import org.apache.flink.client.program.PackagedProgram;

import java.io.File;
import java.net.URL;
import java.util.List;

// Create a packaged program from a JAR.
// PackagedProgram implements AutoCloseable, so try-with-resources releases its resources even
// when inspection or execution fails. build(), getDescription() and
// invokeInteractiveModeForExecution() can throw ProgramInvocationException, which the
// enclosing method must declare or handle.
try (PackagedProgram program = PackagedProgram.newBuilder()
        .setJarFile(new File("/path/to/my-flink-job.jar"))
        .setEntryPointClassName("com.example.MyFlinkJob")
        .setArguments("--input", "/data/input", "--output", "/data/output")
        .build()) {
    System.out.println("Program: " + program.getDescription());
    System.out.println("Main class: " + program.getMainClassName());

    // Get classpath information: the job JAR plus its dependencies
    List<URL> dependencies = program.getJobJarAndDependencies();
    System.out.println("Dependencies: " + dependencies.size());

    // Execute the program
    program.invokeInteractiveModeForExecution();
}
Utility functions for working with packaged programs.
/**
 * Static utility methods for packaged programs.
 */
public class PackagedProgramUtils {
    /**
     * Checks whether the given entry point denotes a Python program.
     *
     * @param entryPointClassName entry point class name to check
     * @return {@code true} if this is a Python program, {@code false} otherwise
     */
    public static boolean isPython(String entryPointClassName);

    /**
     * Returns the location of the JAR used to run Python programs.
     *
     * @return URL of the Python JAR
     */
    public static URL getPythonJar();
}
Interface for retrieving packaged programs from various sources.
/**
 * Abstraction for obtaining a {@link PackagedProgram}. Implementations decide where the
 * program comes from.
 */
public interface PackagedProgramRetriever {
    /**
     * Retrieves the packaged program.
     *
     * @return the {@link PackagedProgram} instance
     * @throws ProgramRetrievalException if the program cannot be retrieved
     */
    PackagedProgram getPackagedProgram() throws ProgramRetrievalException;
}
/**
 * Default {@link PackagedProgramRetriever} implementation.
 */
public class DefaultPackagedProgramRetriever implements PackagedProgramRetriever {
    /**
     * {@inheritDoc}
     *
     * @throws ProgramRetrievalException if the program cannot be retrieved
     */
    @Override
    public PackagedProgram getPackagedProgram() throws ProgramRetrievalException;
}
Specialized execution environments that provide context for program execution.
/**
 * {@link ExecutionEnvironment} that carries client context for DataSet API programs.
 *
 * <p>Automatically configured when the program runs within the Flink client.
 */
public class ContextEnvironment extends ExecutionEnvironment {
    // Provides execution context for DataSet API programs
    // Automatically configured when running within Flink client
}
/**
 * {@link ExecutionEnvironment} used for plan optimization: it produces an execution plan
 * instead of running the job.
 */
public class OptimizerPlanEnvironment extends ExecutionEnvironment {
    // Used for generating execution plans without actual execution
    // Useful for plan analysis and optimization
}
/**
 * {@link StreamExecutionEnvironment} that carries client context for DataStream API programs.
 *
 * <p>Automatically configured when the program runs within the Flink client.
 */
public class StreamContextEnvironment extends StreamExecutionEnvironment {
    // Provides execution context for DataStream API programs
    // Automatically configured when running within Flink client
}
/**
 * {@link StreamExecutionEnvironment} that generates streaming execution plans so they can be
 * inspected without executing the job.
 */
public class StreamPlanEnvironment extends StreamExecutionEnvironment {
    // Used for generating stream execution plans
    // Enables plan inspection without execution
}
Usage Example:
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.client.program.ContextEnvironment;

// The environment is set up automatically when the program is run through the client
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// If running in client context, this will be a ContextEnvironment
if (env instanceof ContextEnvironment) {
    System.out.println("Running in client context");
}

// Create and execute a DataSet program.
// Tokenizer is not defined in this snippet: it is assumed to be a user-defined
// FlatMapFunction<String, Tuple2<String, Integer>>. The positional groupBy(0)/sum(1) calls
// below require the flatMap output to be a tuple type.
DataSet<String> input = env.readTextFile("/path/to/input");
input.flatMap(new Tokenizer())
        .groupBy(0)
        .sum(1)
        .writeAsCsv("/path/to/output");

env.execute("Word Count Example");
Factory for creating per-job mini clusters for testing and development.
/**
 * Factory for mini clusters that each run a single job.
 *
 * <p>NOTE(review): the factory method signature below may differ between Flink versions;
 * verify it against the Flink release in use.
 */
public class PerJobMiniClusterFactory {
    /**
     * Creates a mini cluster for running a single job.
     *
     * @param configuration cluster configuration
     * @return {@link MiniCluster} instance configured for single-job execution
     */
    public static MiniCluster createMiniCluster(Configuration configuration);
}
/**
 * Checked exception signalling that invoking a program failed.
 *
 * <p>Constructors optionally attach the {@link JobID} of the affected job and/or a root cause.
 */
public class ProgramInvocationException extends Exception {
    /**
     * Creates an exception with a message.
     *
     * @param message error message
     */
    public ProgramInvocationException(String message);

    /**
     * Creates an exception with a message and the ID of the associated job.
     *
     * @param message error message
     * @param jobID ID of the job the error relates to
     */
    public ProgramInvocationException(String message, JobID jobID);

    /**
     * Creates an exception that wraps a root cause.
     *
     * @param cause root cause throwable
     */
    public ProgramInvocationException(Throwable cause);

    /**
     * Creates an exception with a message and a root cause.
     *
     * @param message error message
     * @param cause root cause throwable
     */
    public ProgramInvocationException(String message, Throwable cause);

    /**
     * Creates an exception with a message, the associated job ID and a root cause.
     *
     * @param message error message
     * @param jobID ID of the job the error relates to
     * @param cause root cause throwable
     */
    public ProgramInvocationException(String message, JobID jobID, Throwable cause);
}
/**
 * Exception thrown when a program does not contain a job.
 */
public class ProgramMissingJobException extends FlinkException {
    /**
     * Creates an exception with a message.
     *
     * @param message error message describing the missing job
     */
    public ProgramMissingJobException(String message);
}
/**
 * Exception signalling that a program was parameterized incorrectly.
 *
 * <p>It extends {@link RuntimeException}, so it is unchecked and does not have to be declared
 * in {@code throws} clauses.
 */
public class ProgramParametrizationException extends RuntimeException {
    /**
     * Creates an exception with a message.
     *
     * @param message error message
     */
    public ProgramParametrizationException(String message);

    /**
     * Creates an exception with a message and a root cause.
     *
     * @param message error message
     * @param cause root cause throwable
     */
    public ProgramParametrizationException(String message, Throwable cause);
}
/**
 * Signals that program execution should be aborted.
 *
 * <p>It extends {@link Error} rather than {@link Exception}, so it is unchecked and is not
 * intercepted by ordinary {@code catch (Exception e)} handlers in user code. It only propagates
 * past handlers that catch {@code Error} or {@code Throwable} if they rethrow it.
 */
public class ProgramAbortException extends Error {
    /**
     * Creates an abort signal without a message.
     */
    public ProgramAbortException();

    /**
     * Creates an abort signal with a message.
     *
     * @param message abort message
     */
    public ProgramAbortException(String message);

    /**
     * Creates an abort signal with a message and a root cause.
     *
     * @param message abort message
     * @param cause root cause throwable
     */
    public ProgramAbortException(String message, Throwable cause);
}
/**
 * Checked exception thrown when a packaged program cannot be retrieved.
 */
public class ProgramRetrievalException extends Exception {
    /**
     * Creates an exception with a message.
     *
     * @param message error message
     */
    public ProgramRetrievalException(String message);

    /**
     * Creates an exception with a message and a root cause.
     *
     * @param message error message
     * @param cause root cause throwable
     */
    public ProgramRetrievalException(String message, Throwable cause);
}
/**
 * Settings that describe whether a job should be restored from a savepoint, and how.
 */
public class SavepointRestoreSettings {
    /** Returns settings that request no savepoint restore. */
    public static SavepointRestoreSettings none();

    /**
     * Returns settings that restore from the given savepoint path.
     *
     * <p>NOTE(review): presumably equivalent to {@code forPath(savepointPath, false)}; confirm.
     */
    public static SavepointRestoreSettings forPath(String savepointPath);

    /**
     * Returns settings that restore from the given savepoint path.
     *
     * @param savepointPath path of the savepoint to restore from
     * @param allowNonRestoredState whether savepoint state that cannot be mapped to the job may
     *     be skipped
     */
    public static SavepointRestoreSettings forPath(String savepointPath, boolean allowNonRestoredState);

    /** Returns whether a savepoint restore is requested. */
    public boolean restoreSavepoint();

    /** Returns the savepoint path to restore from. */
    public String getRestorePath();

    /** Returns whether state that cannot be restored may be skipped. */
    public boolean allowNonRestoredState();
}
/**
 * Entry point for DataSet API programs (abridged; only the members used in this reference
 * are shown).
 */
public abstract class ExecutionEnvironment {
    /** Returns the execution environment for the current context. */
    public static ExecutionEnvironment getExecutionEnvironment();

    /**
     * Executes the program under the given job name.
     *
     * @param jobName name of the job
     * @return the result of the job execution
     * @throws Exception if the execution fails
     */
    public abstract JobExecutionResult execute(String jobName) throws Exception;

    // DataSet API methods

    /** Creates a data set from the given collection. */
    public <X> DataSet<X> fromCollection(Collection<X> data);

    /** Creates a data set of lines read from the given text file. */
    public DataSet<String> readTextFile(String filePath);
}
/**
 * Entry point for DataStream API programs (abridged; only the members used in this reference
 * are shown).
 */
public abstract class StreamExecutionEnvironment {
    /** Returns the streaming execution environment for the current context. */
    public static StreamExecutionEnvironment getExecutionEnvironment();

    /**
     * Executes the streaming program under the given job name.
     *
     * @param jobName name of the job
     * @return the result of the job execution
     * @throws Exception if the execution fails
     */
    public JobExecutionResult execute(String jobName) throws Exception;

    // DataStream API methods

    /** Creates a stream source from the given collection. */
    public <T> DataStreamSource<T> fromCollection(Collection<T> data);

    /** Creates a stream source of text read from a socket at the given host and port. */
    public DataStreamSource<String> socketTextStream(String hostname, int port);
}
/**
 * Abridged view of the DataSet API transformations used in this reference.
 *
 * @param <T> element type of the data set
 */
public interface DataSet<T> {
    /** Applies a one-to-one transformation to every element. */
    <R> DataSet<R> map(MapFunction<T, R> mapper);

    /** Applies a transformation that may emit zero or more elements per input element. */
    <R> DataSet<R> flatMap(FlatMapFunction<T, R> flatMapper);

    /** Keeps only the elements accepted by the given filter. */
    DataSet<T> filter(FilterFunction<T> filter);

    /** Groups elements by the given field positions (tuple element types assumed). */
    UnsortedGrouping<T> groupBy(int... fields);

    /** Writes the data set as CSV to the given path. */
    DataSink<T> writeAsCsv(String filePath);
}
/**
 * Abridged view of the DataStream API transformations used in this reference.
 *
 * @param <T> element type of the stream
 */
public interface DataStream<T> {
    /** Applies a one-to-one transformation to every element. */
    <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper);

    /** Applies a transformation that may emit zero or more elements per input element. */
    <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T, R> flatMapper);

    /** Keeps only the elements accepted by the given filter. */
    SingleOutputStreamOperator<T> filter(FilterFunction<T> filter);

    /** Adds a sink that prints the stream's elements. */
    DataStreamSink<T> print();
}
/**
 * Local Flink cluster (abridged). It implements {@link AutoCloseable}; prefer
 * try-with-resources so the cluster is always closed.
 */
public class MiniCluster implements AutoCloseable {
    /** Starts the cluster. */
    public void start() throws Exception;

    /** Shuts the cluster down. */
    public void close() throws Exception;

    /**
     * Executes the given job graph.
     *
     * <p>NOTE(review): a method named {@code executeJobBlocking} returning a
     * {@code CompletableFuture} is contradictory. In Flink's MiniCluster this method blocks and
     * returns the execution result directly. Verify the return type against the Flink version
     * in use.
     */
    public CompletableFuture<JobResult> executeJobBlocking(JobGraph job);
}
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-clients-2-12