Apache Flink client library providing APIs and utilities for submitting, monitoring, and managing Flink jobs programmatically.
Comprehensive command line interface for Flink cluster operations including job submission, monitoring, cancellation, and savepoint management.
Main command-line interface entry point for Flink client operations, providing a complete CLI for job and cluster management.
/**
* Main command-line interface for Flink client operations.
* <p>
* Offers {@link #parseParameters(String[])} for a complete argument array, one
* method per CLI command (run, info, list, cancel, stop, savepoint), and helpers
* for locating the configuration directory and the available
* {@code CustomCommandLine} implementations. Each command method reports its
* outcome as an integer exit code.
*/
public class CliFrontend {
/**
* Creates a CLI frontend that uses the default configuration directory.
*/
public CliFrontend();
/**
* Creates a CLI frontend that uses a specific configuration directory.
* @param configDir Path to the configuration directory
*/
public CliFrontend(String configDir);
/**
* Parses the command line parameters and returns an exit code.
* @param args Command line arguments
* @return Exit code (0 for success, non-zero for error)
*/
public int parseParameters(String[] args);
/**
* Gets the current configuration.
* @return Configuration object with the current settings
*/
public Configuration getConfiguration();
/**
* Executes the RUN command to submit and execute a Flink job.
* @param args Command line arguments for job execution
* @return Exit code (0 for success, non-zero for error)
*/
public int run(String[] args);
/**
* Executes the INFO command to display job information and the execution plan.
* @param args Command line arguments for the info command
* @return Exit code (0 for success, non-zero for error)
*/
public int info(String[] args);
/**
* Executes the LIST command to display running and completed jobs.
* @param args Command line arguments for the list command
* @return Exit code (0 for success, non-zero for error)
*/
public int list(String[] args);
/**
* Executes the CANCEL command to cancel a running job.
* @param args Command line arguments for the cancel command
* @return Exit code (0 for success, non-zero for error)
*/
public int cancel(String[] args);
/**
* Executes the STOP command to gracefully stop a running job.
* @param args Command line arguments for the stop command
* @return Exit code (0 for success, non-zero for error)
*/
public int stop(String[] args);
/**
* Executes the SAVEPOINT command to create or manage savepoints.
* @param args Command line arguments for the savepoint command
* @return Exit code (0 for success, non-zero for error)
*/
public int savepoint(String[] args);
/**
* Gets the configuration directory from environment variables.
* @return Path to the configuration directory, or {@code null} if not set
*/
public static String getConfigurationDirectoryFromEnv();
/**
* Gets the list of available custom command line interfaces.
* <p>
* NOTE(review): {@code CustomCommandLine} is declared with a type parameter
* ({@code CustomCommandLine<ClusterType>}) in this reference, yet it appears
* here as a raw type - confirm the exact signature against the Flink sources.
* @return List of CustomCommandLine implementations
*/
public static List<CustomCommandLine> getCustomCommandLineList();
/**
* Gets the active custom command line interface for the given parameters.
* <p>
* NOTE(review): the return type is listed as the raw {@code CustomCommandLine};
* confirm the exact signature against the Flink sources.
* @param commandLine Parsed command line arguments
* @return Active CustomCommandLine implementation
*/
public CustomCommandLine getActiveCustomCommandLine(CommandLine commandLine);
/**
* Main entry point for CLI execution.
* @param args Command line arguments
*/
public static void main(String[] args);
}

Parsers for different CLI command options and arguments.
/**
* Command-line argument parser for Flink CLI commands.
* <p>
* Every member listed here is static. There is one {@code parse...Command}
* method per CLI command, each returning the options object for that command
* and declaring {@code CliArgsException} for parse failures, and one
* {@code printHelpFor...} method per command next to the general
* {@link #printHelp()}.
*/
public class CliFrontendParser {
/**
* Parses the arguments of the RUN command.
* @param args Command line arguments
* @return RunOptions containing the parsed options
* @throws CliArgsException if parsing fails
*/
public static RunOptions parseRunCommand(String[] args) throws CliArgsException;
/**
* Parses the arguments of the LIST command.
* @param args Command line arguments
* @return ListOptions containing the parsed options
* @throws CliArgsException if parsing fails
*/
public static ListOptions parseListCommand(String[] args) throws CliArgsException;
/**
* Parses the arguments of the CANCEL command.
* @param args Command line arguments
* @return CancelOptions containing the parsed options
* @throws CliArgsException if parsing fails
*/
public static CancelOptions parseCancelCommand(String[] args) throws CliArgsException;
/**
* Parses the arguments of the STOP command.
* @param args Command line arguments
* @return StopOptions containing the parsed options
* @throws CliArgsException if parsing fails
*/
public static StopOptions parseStopCommand(String[] args) throws CliArgsException;
/**
* Parses the arguments of the SAVEPOINT command.
* @param args Command line arguments
* @return SavepointOptions containing the parsed options
* @throws CliArgsException if parsing fails
*/
public static SavepointOptions parseSavepointCommand(String[] args) throws CliArgsException;
/**
* Parses the arguments of the INFO command.
* @param args Command line arguments
* @return InfoOptions containing the parsed options
* @throws CliArgsException if parsing fails
*/
public static InfoOptions parseInfoCommand(String[] args) throws CliArgsException;
/**
* Prints the general help information.
*/
public static void printHelp();
/**
* Prints the help information for the RUN command.
*/
public static void printHelpForRun();
/**
* Prints the help information for the INFO command.
*/
public static void printHelpForInfo();
/**
* Prints the help information for the LIST command.
*/
public static void printHelpForList();
/**
* Prints the help information for the CANCEL command.
*/
public static void printHelpForCancel();
/**
* Prints the help information for the STOP command.
*/
public static void printHelpForStop();
/**
* Prints the help information for the SAVEPOINT command.
*/
public static void printHelpForSavepoint();
}

Option classes for different CLI commands and their parameters.
/**
* Base class for the parsed options of the CLI commands.
* <p>
* In this reference it is extended directly by ListOptions, CancelOptions,
* StopOptions and SavepointOptions, and through ProgramOptions by RunOptions
* and InfoOptions.
*/
public abstract class CommandLineOptions {
/**
* Gets the parsed command line.
* <p>
* NOTE(review): {@code CommandLine} is presumably
* {@code org.apache.commons.cli.CommandLine} - confirm against the imports of
* the actual class.
* @return CommandLine object with the parsed options
*/
public CommandLine getCommandLine();
/**
* Checks whether help should be printed.
* @return {@code true} if help was requested
*/
public boolean isPrintHelp();
/**
* Gets the JobManager address from the options.
* @return JobManager address string
*/
public String getJobManagerAddress();
}
/**
* Base class for options that reference a program packaged as a JAR file.
* <p>
* Adds program-related accessors (JAR path, entry class, classpath, program
* arguments, parallelism, logging/detached flags, savepoint restore settings)
* on top of those inherited from CommandLineOptions. RunOptions and InfoOptions
* extend it in this reference.
*/
public abstract class ProgramOptions extends CommandLineOptions {
/**
* Gets the path to the JAR file.
* @return JAR file path string
*/
public String getJarFilePath();
/**
* Gets the name of the entry point class.
* @return Fully qualified class name
*/
public String getEntryPointClassName();
/**
* Gets the additional classpath URLs.
* @return List of classpath URLs
*/
public List<URL> getClasspaths();
/**
* Gets the program arguments.
* @return Array of program arguments
*/
public String[] getProgramArgs();
/**
* Gets the parallelism level.
* @return Parallelism value, or -1 if not specified
*/
public int getParallelism();
/**
* Checks whether stdout logging is enabled.
* @return {@code true} if stdout logging is enabled
*/
public boolean getStdoutLogging();
/**
* Checks whether detached mode is enabled.
* @return {@code true} if detached mode is requested
*/
public boolean getDetachedMode();
/**
* Gets the savepoint restore settings.
* @return SavepointRestoreSettings for job restoration
*/
public SavepointRestoreSettings getSavepointRestoreSettings();
}
/**
* Command-line options for the RUN command, as returned by
* CliFrontendParser.parseRunCommand.
*/
public class RunOptions extends ProgramOptions {
// No members of its own are listed: all accessors come from ProgramOptions
// (getJarFilePath, getEntryPointClassName, getClasspaths, getProgramArgs,
// getParallelism, getStdoutLogging, getDetachedMode,
// getSavepointRestoreSettings) and, through it, from CommandLineOptions.
}
/**
* Command-line options for the LIST command, as returned by
* CliFrontendParser.parseListCommand.
* <p>
* Adds two filter flags to the accessors inherited from CommandLineOptions.
*/
public class ListOptions extends CommandLineOptions {
/**
* Checks whether only running jobs should be listed.
* @return {@code true} if filtering for running jobs
*/
public boolean getRunning();
/**
* Checks whether only scheduled jobs should be listed.
* @return {@code true} if filtering for scheduled jobs
*/
public boolean getScheduled();
}
/**
* Command-line options for the CANCEL command, as returned by
* CliFrontendParser.parseCancelCommand.
*/
public class CancelOptions extends CommandLineOptions {
// Only the inherited CommandLineOptions accessors (getCommandLine,
// isPrintHelp, getJobManagerAddress) are listed in this reference.
// NOTE(review): the real class may declare further accessors - confirm
// against the Flink sources.
}
/**
* Command-line options for the STOP command, as returned by
* CliFrontendParser.parseStopCommand.
*/
public class StopOptions extends CommandLineOptions {
// Only the inherited CommandLineOptions accessors (getCommandLine,
// isPrintHelp, getJobManagerAddress) are listed in this reference.
// NOTE(review): the real class may declare further accessors - confirm
// against the Flink sources.
}
/**
* Command-line options for the SAVEPOINT command, as returned by
* CliFrontendParser.parseSavepointCommand.
*/
public class SavepointOptions extends CommandLineOptions {
// Only the inherited CommandLineOptions accessors (getCommandLine,
// isPrintHelp, getJobManagerAddress) are listed in this reference.
// NOTE(review): the real class may declare further accessors - confirm
// against the Flink sources.
}
/**
* Command-line options for the INFO command, as returned by
* CliFrontendParser.parseInfoCommand.
*/
public class InfoOptions extends ProgramOptions {
// No members of its own are listed: all accessors come from ProgramOptions
// (getJarFilePath, getEntryPointClassName, getClasspaths, getProgramArgs,
// getParallelism, getStdoutLogging, getDetachedMode,
// getSavepointRestoreSettings) and, through it, from CommandLineOptions.
}

Extension point for custom command-line interfaces and cluster-specific implementations.
/**
* Extension point for custom command-line interfaces.
* <p>
* An implementation decides whether it applies to a parsed command line
* ({@link #isActive}), contributes its own options, and either retrieves an
* existing cluster or creates a new one.
* @param <ClusterType> Type of the cluster client returned by
* {@code retrieveCluster} and {@code createCluster}
*/
public interface CustomCommandLine<ClusterType> {
/**
* Checks whether this command line interface is active for the given parameters.
* @param commandLine Parsed command line arguments
* @param configuration Current configuration
* @return {@code true} if this interface should handle the command
*/
boolean isActive(CommandLine commandLine, Configuration configuration);
/**
* Gets the unique identifier of this command line interface.
* @return String identifier for this CLI
*/
String getId();
/**
* Adds the run-specific options of this interface to the base options.
* @param baseOptions Base options to extend
*/
void addRunOptions(Options baseOptions);
/**
* Adds the general options of this interface to the base options.
* @param baseOptions Base options to extend
*/
void addGeneralOptions(Options baseOptions);
/**
* Retrieves an existing cluster based on the command line parameters.
* @param commandLine Parsed command line arguments
* @param config Configuration object
* @return Cluster client instance
* @throws Exception if cluster retrieval fails
*/
ClusterType retrieveCluster(CommandLine commandLine, Configuration config) throws Exception;
/**
* Creates a new cluster based on the command line parameters.
* @param applicationName Name for the application/cluster
* @param commandLine Parsed command line arguments
* @param config Configuration object
* @param userJarFiles List of user JAR files
* @return Cluster client instance
* @throws Exception if cluster creation fails
*/
ClusterType createCluster(String applicationName, CommandLine commandLine, Configuration config, List<URL> userJarFiles) throws Exception;
}
/**
* Default command-line interface for standalone clusters.
* <p>
* Binds the {@code ClusterType} parameter of CustomCommandLine to
* StandaloneClusterClient, so {@code retrieveCluster} and {@code createCluster}
* return a StandaloneClusterClient.
*/
public class DefaultCLI implements CustomCommandLine<StandaloneClusterClient> {
// Implements every CustomCommandLine method (isActive, getId, addRunOptions,
// addGeneralOptions, retrieveCluster, createCluster) for standalone clusters;
// the individual signatures are not repeated in this reference.
}

Usage Examples:
import java.util.Arrays;

import org.apache.flink.client.CliFrontend;
import org.apache.flink.client.cli.CliFrontendParser;
import org.apache.flink.client.cli.RunOptions;

// Example 1: basic CLI usage - run a Flink job.
// The first element is the action name; the remaining elements are the
// arguments of that action.
String[] args = {
    "run",
    "-p", "4", // parallelism
    "-c", "com.example.MyFlinkJob", // entry class
    "/path/to/my-job.jar", // JAR file
    "arg1", "arg2", "arg3" // program arguments
};
CliFrontend cli = new CliFrontend();
int exitCode = cli.parseParameters(args);
System.exit(exitCode);

// Example 2: parse run command options programmatically.
// This is a separate snippet, not a continuation of Example 1: System.exit
// above terminates the JVM, so nothing after it would execute.
//
// parseRunCommand takes only the RUN command's own arguments. The leading
// "run" action token is therefore dropped first; if it were left in, it would
// be read as the first positional argument (the JAR path) and the -p / -c
// options after it would not be parsed.
//
// parseRunCommand declares the checked CliArgsException, so the enclosing
// method has to catch or declare it.
String[] runArgs = Arrays.copyOfRange(args, 1, args.length);
RunOptions runOptions = CliFrontendParser.parseRunCommand(runArgs);
System.out.println("JAR file: " + runOptions.getJarFilePath());
System.out.println("Entry class: " + runOptions.getEntryPointClassName());
System.out.println("Parallelism: " + runOptions.getParallelism());
System.out.println("Program args: " + Arrays.toString(runOptions.getProgramArgs()));

// CLI commands examples:
// flink run -p 4 -c com.example.Job job.jar arg1 arg2
// flink list
// flink cancel <job-id>
// flink stop <job-id>
// flink savepoint <job-id> [target-directory]
// flink info -c com.example.Job job.jar

Exception types for command-line parsing errors.
/**
* Exception indicating a command-line parsing failure.
* <p>
* A checked exception (it extends {@link Exception}); every
* {@code parse...Command} method of CliFrontendParser declares it.
*/
public class CliArgsException extends Exception {
/**
* Creates the exception with an error message.
* @param message Error description
*/
public CliArgsException(String message);
}

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-clients-2-10