Comprehensive command line interface for Flink cluster operations including job submission, monitoring, cancellation, and savepoint management.
Main command-line interface entry point for Flink client operations, providing a complete CLI for job and cluster management.
/**
* Main command-line interface for Flink client operations.
*
* <p>Declares one public method per CLI action ({@code run}, {@code info}, {@code list},
* {@code cancel}, {@code stop}, {@code savepoint}); each takes that action's arguments and
* returns an exit code. This is a signature-only API summary: method bodies are not shown.
*/
public class CliFrontend {
/**
* Creates a CLI frontend with the default configuration directory.
*/
public CliFrontend();
/**
* Creates a CLI frontend with a specific configuration directory.
* @param configDir Path to the configuration directory
*/
public CliFrontend(String configDir);
/**
* Parses command line parameters and returns an exit code.
* NOTE(review): presumably args[0] names the action and selects one of the action methods
* below - confirm against the implementation.
* @param args Command line arguments
* @return Exit code (0 for success, non-zero for error)
*/
public int parseParameters(String[] args);
/**
* Gets the current configuration.
* @return Configuration object with current settings
*/
public Configuration getConfiguration();
/**
* Executes the RUN command to submit and execute a Flink job.
* NOTE(review): presumably expects the arguments that follow the action name (without the
* leading "run") - confirm against the implementation.
* @param args Command line arguments for job execution
* @return Exit code (0 for success, non-zero for error)
*/
public int run(String[] args);
/**
* Executes the INFO command to display job information and the execution plan.
* @param args Command line arguments for info command
* @return Exit code (0 for success, non-zero for error)
*/
public int info(String[] args);
/**
* Executes the LIST command to display running and completed jobs.
* @param args Command line arguments for list command
* @return Exit code (0 for success, non-zero for error)
*/
public int list(String[] args);
/**
* Executes the CANCEL command to cancel a running job.
* @param args Command line arguments for cancel command
* @return Exit code (0 for success, non-zero for error)
*/
public int cancel(String[] args);
/**
* Executes the STOP command to gracefully stop a running job.
* @param args Command line arguments for stop command
* @return Exit code (0 for success, non-zero for error)
*/
public int stop(String[] args);
/**
* Executes the SAVEPOINT command to create or manage savepoints.
* @param args Command line arguments for savepoint command
* @return Exit code (0 for success, non-zero for error)
*/
public int savepoint(String[] args);
/**
* Gets the configuration directory from environment variables.
* Callers must handle a {@code null} result (no directory configured in the environment).
* @return Path to configuration directory or null if not set
*/
public static String getConfigurationDirectoryFromEnv();
/**
* Gets the list of available custom command line interfaces.
* Note: {@code CustomCommandLine} is documented as generic ({@code CustomCommandLine<ClusterType>});
* it is used here as a raw type, so the cluster type of each element is not known statically.
* @return List of CustomCommandLine implementations
*/
public static List<CustomCommandLine> getCustomCommandLineList();
/**
* Gets the active custom command line interface for the given parameters.
* The return type is the raw {@code CustomCommandLine}; see the note on
* {@link #getCustomCommandLineList()}.
* @param commandLine Parsed command line arguments
* @return Active CustomCommandLine implementation
*/
public CustomCommandLine getActiveCustomCommandLine(CommandLine commandLine);
/**
* Main entry point for CLI execution.
* @param args Command line arguments
*/
public static void main(String[] args);
}
Parsers for different CLI command options and arguments.
/**
* Command-line argument parser for Flink CLI commands.
*
* <p>All members are static. There is one {@code parseXxxCommand} method per CLI action, each
* returning that action's options object and declaring the checked {@code CliArgsException},
* plus one help printer per action and a general {@link #printHelp()}.
*/
public class CliFrontendParser {
/**
* Parses arguments for the RUN command.
* @param args Command line arguments
* @return RunOptions containing parsed options
* @throws CliArgsException if parsing fails
*/
public static RunOptions parseRunCommand(String[] args) throws CliArgsException;
/**
* Parses arguments for the LIST command.
* @param args Command line arguments
* @return ListOptions containing parsed options
* @throws CliArgsException if parsing fails
*/
public static ListOptions parseListCommand(String[] args) throws CliArgsException;
/**
* Parses arguments for the CANCEL command.
* @param args Command line arguments
* @return CancelOptions containing parsed options
* @throws CliArgsException if parsing fails
*/
public static CancelOptions parseCancelCommand(String[] args) throws CliArgsException;
/**
* Parses arguments for the STOP command.
* @param args Command line arguments
* @return StopOptions containing parsed options
* @throws CliArgsException if parsing fails
*/
public static StopOptions parseStopCommand(String[] args) throws CliArgsException;
/**
* Parses arguments for the SAVEPOINT command.
* @param args Command line arguments
* @return SavepointOptions containing parsed options
* @throws CliArgsException if parsing fails
*/
public static SavepointOptions parseSavepointCommand(String[] args) throws CliArgsException;
/**
* Parses arguments for the INFO command.
* @param args Command line arguments
* @return InfoOptions containing parsed options
* @throws CliArgsException if parsing fails
*/
public static InfoOptions parseInfoCommand(String[] args) throws CliArgsException;
/**
* Prints general help information (not specific to one command).
*/
public static void printHelp();
/**
* Prints help information for the RUN command.
*/
public static void printHelpForRun();
/**
* Prints help information for the INFO command.
*/
public static void printHelpForInfo();
/**
* Prints help information for the LIST command.
*/
public static void printHelpForList();
/**
* Prints help information for the CANCEL command.
*/
public static void printHelpForCancel();
/**
* Prints help information for the STOP command.
*/
public static void printHelpForStop();
/**
* Prints help information for the SAVEPOINT command.
*/
public static void printHelpForSavepoint();
}
Option classes for different CLI commands and their parameters.
/**
* Base class for the parsed options of a CLI command.
*
* <p>Holds the accessors common to every command; the per-command classes
* (ProgramOptions, ListOptions, CancelOptions, StopOptions, SavepointOptions) extend it.
* Instances are what the CliFrontendParser parse methods return; this class does not
* itself expose any parsing method.
*/
public abstract class CommandLineOptions {
/**
* Gets the parsed command line.
* @return CommandLine object with parsed options
*/
public CommandLine getCommandLine();
/**
* Checks if help should be printed.
* @return true if help was requested
*/
public boolean isPrintHelp();
/**
* Gets the JobManager address from options.
* NOTE(review): the value returned when no address was given is not documented here - confirm
* (possibly null) before dereferencing.
* @return JobManager address string
*/
public String getJobManagerAddress();
}
/**
* Base class for options that reference JAR file programs.
*
* <p>Adds program-related accessors (JAR path, entry class, classpath, program arguments,
* parallelism, logging/detached flags, savepoint restore settings) on top of
* CommandLineOptions. Extended by RunOptions and InfoOptions.
*/
public abstract class ProgramOptions extends CommandLineOptions {
/**
* Gets the path to the JAR file.
* @return JAR file path string
*/
public String getJarFilePath();
/**
* Gets the entry point class name.
* @return Fully qualified class name
*/
public String getEntryPointClassName();
/**
* Gets additional classpath URLs.
* @return List of classpath URLs
*/
public List<URL> getClasspaths();
/**
* Gets program arguments, i.e. the arguments intended for the user program.
* @return Array of program arguments
*/
public String[] getProgramArgs();
/**
* Gets the parallelism level.
* Callers should treat -1 as "not specified" rather than as a usable parallelism.
* @return Parallelism value or -1 if not specified
*/
public int getParallelism();
/**
* Checks if stdout logging is enabled.
* @return true if stdout logging is enabled
*/
public boolean getStdoutLogging();
/**
* Checks if detached mode is enabled.
* @return true if detached mode is requested
*/
public boolean getDetachedMode();
/**
* Gets savepoint restore settings.
* @return SavepointRestoreSettings for job restoration
*/
public SavepointRestoreSettings getSavepointRestoreSettings();
}
/**
* Command-line options for the RUN command.
* This is the type returned by CliFrontendParser.parseRunCommand.
*/
public class RunOptions extends ProgramOptions {
// No additional members are listed: everything is inherited from ProgramOptions
// (and, through it, from CommandLineOptions).
}
/**
* Command-line options for the LIST command.
* This is the type returned by CliFrontendParser.parseListCommand. Adds two filter flags on
* top of the accessors inherited from CommandLineOptions.
*/
public class ListOptions extends CommandLineOptions {
/**
* Checks if only running jobs should be listed.
* @return true if filtering for running jobs
*/
public boolean getRunning();
/**
* Checks if only scheduled jobs should be listed.
* @return true if filtering for scheduled jobs
*/
public boolean getScheduled();
}
/**
* Command-line options for the CANCEL command.
* This is the type returned by CliFrontendParser.parseCancelCommand.
*/
public class CancelOptions extends CommandLineOptions {
// No additional members are listed: only the CommandLineOptions accessors are documented.
// NOTE(review): how the job id to cancel is exposed is not shown in this summary - confirm.
}
/**
* Command-line options for the STOP command.
* This is the type returned by CliFrontendParser.parseStopCommand.
*/
public class StopOptions extends CommandLineOptions {
// No additional members are listed: only the CommandLineOptions accessors are documented.
// NOTE(review): how the job id to stop is exposed is not shown in this summary - confirm.
}
/**
* Command-line options for the SAVEPOINT command.
* This is the type returned by CliFrontendParser.parseSavepointCommand.
*/
public class SavepointOptions extends CommandLineOptions {
// No additional members are listed: only the CommandLineOptions accessors are documented.
// NOTE(review): how the job id / target directory are exposed is not shown in this summary - confirm.
}
/**
* Command-line options for the INFO command.
* This is the type returned by CliFrontendParser.parseInfoCommand.
*/
public class InfoOptions extends ProgramOptions {
// No additional members are listed: everything is inherited from ProgramOptions
// (and, through it, from CommandLineOptions).
}
Extension point for custom command-line interfaces and cluster-specific implementations.
/**
* Extension point for custom command-line interfaces.
*
* <p>An implementation reports whether it applies to a parsed command line, contributes its own
* options, and retrieves or creates a cluster.
*
* @param <ClusterType> Type of the cluster client returned by retrieveCluster and createCluster
*/
public interface CustomCommandLine<ClusterType> {
/**
* Checks if this command line interface is active for the given parameters.
* @param commandLine Parsed command line arguments
* @param configuration Current configuration
* @return true if this interface should handle the command
*/
boolean isActive(CommandLine commandLine, Configuration configuration);
/**
* Gets the unique identifier for this command line interface.
* @return String identifier for this CLI
*/
String getId();
/**
* Adds run-specific options to the base options.
* Returns nothing; presumably the options are added to baseOptions in place - confirm.
* @param baseOptions Base options to extend
*/
void addRunOptions(Options baseOptions);
/**
* Adds general options to the base options.
* Returns nothing; presumably the options are added to baseOptions in place - confirm.
* @param baseOptions Base options to extend
*/
void addGeneralOptions(Options baseOptions);
/**
* Retrieves an existing cluster based on command line parameters.
* @param commandLine Parsed command line arguments
* @param config Configuration object
* @return Cluster client instance
* @throws Exception if cluster retrieval fails
*/
ClusterType retrieveCluster(CommandLine commandLine, Configuration config) throws Exception;
/**
* Creates a new cluster based on command line parameters.
* @param applicationName Name for the application/cluster
* @param commandLine Parsed command line arguments
* @param config Configuration object
* @param userJarFiles List of user JAR files
* @return Cluster client instance
* @throws Exception if cluster creation fails
*/
ClusterType createCluster(String applicationName, CommandLine commandLine, Configuration config, List<URL> userJarFiles) throws Exception;
}
/**
* Default command-line interface for standalone clusters.
* Binds the CustomCommandLine type parameter to StandaloneClusterClient, so retrieveCluster and
* createCluster return a StandaloneClusterClient.
*/
public class DefaultCLI implements CustomCommandLine<StandaloneClusterClient> {
// Implements all CustomCommandLine methods for standalone clusters
// (the individual method declarations are not repeated in this summary).
}
Usage Examples:
import java.util.Arrays;

import org.apache.flink.client.CliFrontend;
import org.apache.flink.client.cli.CliArgsException;
import org.apache.flink.client.cli.CliFrontendParser;
import org.apache.flink.client.cli.RunOptions;
// Basic CLI usage - run a Flink job
String[] args = {
"run",
"-p", "4", // parallelism
"-c", "com.example.MyFlinkJob", // entry class
"/path/to/my-job.jar", // JAR file
"arg1", "arg2", "arg3" // program arguments
};
CliFrontend cli = new CliFrontend();
int exitCode = cli.parseParameters(args);

// Parse run command options programmatically.
// parseRunCommand takes the arguments of the RUN command itself, so the leading "run"
// action token is dropped first; otherwise it would be read as the first positional argument.
String[] runArgs = Arrays.copyOfRange(args, 1, args.length);
try {
RunOptions runOptions = CliFrontendParser.parseRunCommand(runArgs);
System.out.println("JAR file: " + runOptions.getJarFilePath());
System.out.println("Entry class: " + runOptions.getEntryPointClassName());
System.out.println("Parallelism: " + runOptions.getParallelism());
System.out.println("Program args: " + Arrays.toString(runOptions.getProgramArgs()));
} catch (CliArgsException e) {
// parseRunCommand declares the checked CliArgsException, so it must be handled (or rethrown).
System.err.println("Could not parse run options: " + e.getMessage());
}

// Exit last: System.exit terminates the JVM, so no statement placed after it would run.
System.exit(exitCode);
// CLI commands examples:
// flink run -p 4 -c com.example.Job job.jar arg1 arg2
// flink list
// flink cancel <job-id>
// flink stop <job-id>
// flink savepoint <job-id> [target-directory]
// flink info -c com.example.Job job.jar
Exception types for command-line parsing errors.
/**
* Exception indicating command-line parsing failures.
* This is a checked exception (it extends Exception directly); it appears in the throws clause
* of every CliFrontendParser.parseXxxCommand method, so callers of those methods must catch or
* declare it.
*/
public class CliArgsException extends Exception {
/**
* Creates an exception with an error message.
* @param message Error description
*/
public CliArgsException(String message);
}