Command line interface classes for Flink client operations including job submission, monitoring, and cluster management through CLI commands. Provides comprehensive command line parsing, option handling, and execution framework.
Main entry point for Flink command line interface providing job management operations.
/**
* Main entry point for Flink command line interface
*/
public class CliFrontend {
/**
* Creates CLI frontend with configuration and custom command lines
* @param configuration Flink configuration
* @param customCommandLines List of custom command line implementations
*/
public CliFrontend(Configuration configuration, List<CustomCommandLine> customCommandLines);
/**
* Main method for CLI execution
* @param args Command line arguments
*/
public static void main(String[] args);
/**
* Runs CLI with given arguments
* @param args Command line arguments
* @return Exit code (0 for success, non-zero for error)
*/
public int run(String[] args);
/**
* Lists jobs on the cluster
* @param args Command line arguments for list operation
* @return Exit code
*/
public int list(String[] args);
/**
* Cancels a running job
* @param args Command line arguments for cancel operation
* @return Exit code
*/
public int cancel(String[] args);
/**
* Stops a running job
* @param args Command line arguments for stop operation
* @return Exit code
*/
public int stop(String[] args);
/**
* Manages savepoints (create, dispose, etc.)
* @param args Command line arguments for savepoint operation
* @return Exit code
*/
public int savepoint(String[] args);
}Usage Example:
import org.apache.flink.client.cli.CliFrontend;
import org.apache.flink.configuration.Configuration;
// Create CLI frontend
Configuration config = new Configuration();
List<CustomCommandLine> customCLIs = Arrays.asList(new DefaultCLI(), new GenericCLI());
CliFrontend cli = new CliFrontend(config, customCLIs);
// Execute CLI commands programmatically
int exitCode;
// List jobs
exitCode = cli.list(new String[]{"list"});
// Cancel a job
exitCode = cli.cancel(new String[]{"cancel", "a1b2c3d4e5f6"});
// Create savepoint
exitCode = cli.savepoint(new String[]{"savepoint", "a1b2c3d4e5f6", "/path/to/savepoint"});Interface and implementations for extending CLI functionality.
/**
* Interface for custom command line implementations
*/
public interface CustomCommandLine {
/**
* Checks if this command line is active for given arguments
* @param commandLine Parsed command line
* @return true if this CLI should handle the command
*/
boolean isActive(CommandLine commandLine);
/**
* Gets the unique identifier for this command line
* @return Unique string identifier
*/
String getId();
/**
* Adds run options to the options parser
* @param options Options instance to add to
*/
void addRunOptions(Options options);
/**
* Adds general options to the options parser
* @param options Options instance to add to
*/
void addGeneralOptions(Options options);
/**
* Applies command line options to configuration
* @param commandLine Parsed command line
* @return Updated configuration with CLI options applied
*/
Configuration applyCommandLineOptionsToConfiguration(CommandLine commandLine);
}
/**
* Base class for custom command line interfaces
*/
public abstract class AbstractCustomCommandLine implements CustomCommandLine {
@Override
public boolean isActive(CommandLine commandLine);
@Override
public String getId();
@Override
public void addRunOptions(Options options);
@Override
public void addGeneralOptions(Options options);
}
/**
* Default command line interface implementation
*/
public class DefaultCLI extends AbstractCustomCommandLine {
// Default implementation for standard Flink CLI operations
}
/**
* Generic command line interface
*/
public class GenericCLI implements CustomCommandLine {
@Override
public boolean isActive(CommandLine commandLine);
@Override
public String getId();
@Override
public void addRunOptions(Options options);
@Override
public void addGeneralOptions(Options options);
@Override
public Configuration applyCommandLineOptionsToConfiguration(CommandLine commandLine);
}Parser for command line arguments with support for different operation modes.
/**
* Parses command line arguments for Flink CLI
*/
public class CliFrontendParser {
/**
* Parses command line arguments
* @param options Available options
* @param args Arguments to parse
* @param stopAtNonOptions Whether to stop parsing at non-options
* @return Parsed CommandLine instance
*/
public static CommandLine parse(Options options, String[] args, boolean stopAtNonOptions);
/**
* Gets options for run command
* @return Options instance with run command options
*/
public static Options getRunCommandOptions();
/**
* Gets options for cancel command
* @return Options instance with cancel command options
*/
public static Options getCancelCommandOptions();
/**
* Gets options for stop command
* @return Options instance with stop command options
*/
public static Options getStopCommandOptions();
/**
* Gets options for savepoint command
* @return Options instance with savepoint command options
*/
public static Options getSavepointCommandOptions();
/**
* Gets options for list command
* @return Options instance with list command options
*/
public static Options getListCommandOptions();
}Specialized classes for parsing different types of CLI operations.
/**
* Base class for command line option parsing
*/
public abstract class CommandLineOptions {
/**
* Creates command line options from parsed command line
* @param cmdLine Parsed command line
*/
protected CommandLineOptions(CommandLine cmdLine);
}
/**
* Command line options for cancel operations
*/
public class CancelOptions extends CommandLineOptions {
/**
* Creates cancel options from command line
* @param cmdLine Parsed command line
*/
public CancelOptions(CommandLine cmdLine);
/**
* Gets the job ID to cancel
* @return JobID instance
*/
public JobID getJobId();
}
/**
* Options for list operations
*/
public class ListOptions extends CommandLineOptions {
/**
* Creates list options from command line
* @param cmdLine Parsed command line
*/
public ListOptions(CommandLine cmdLine);
/**
* Checks if running jobs should be shown
* @return true if showing running jobs
*/
public boolean showRunning();
/**
* Checks if scheduled jobs should be shown
* @return true if showing scheduled jobs
*/
public boolean showScheduled();
/**
* Checks if all jobs should be shown
* @return true if showing all jobs
*/
public boolean showAll();
}
/**
* Program-specific command line options
*/
public class ProgramOptions extends CommandLineOptions {
/**
* Creates program options from command line
* @param cmdLine Parsed command line
*/
public ProgramOptions(CommandLine cmdLine);
/**
* Gets the JAR file path
* @return Path to JAR file
*/
public String getJarFilePath();
/**
* Gets the entry point class name
* @return Fully qualified class name
*/
public String getEntryPointClassName();
/**
* Gets program arguments
* @return Array of program arguments
*/
public String[] getProgramArgs();
/**
* Gets additional classpaths
* @return List of classpath URLs
*/
public List<URL> getClasspaths();
/**
* Gets the parallelism setting
* @return Parallelism value, -1 if not set
*/
public int getParallelism();
/**
* Gets detached mode setting
* @return true if running in detached mode
*/
public boolean getDetachedMode();
/**
* Gets shutdown on attached exit setting
* @return true if should shutdown on exit
*/
public boolean isShutdownOnAttachedExit();
}
/**
* Options for savepoint operations
*/
public class SavepointOptions extends CommandLineOptions {
/**
* Creates savepoint options from command line
* @param cmdLine Parsed command line
*/
public SavepointOptions(CommandLine cmdLine);
/**
* Gets the job ID for savepoint operation
* @return JobID instance
*/
public JobID getJobId();
/**
* Gets the savepoint path
* @return Path to savepoint
*/
public String getSavepointPath();
/**
* Checks if this is a dispose operation
* @return true if disposing savepoint
*/
public boolean isDispose();
/**
* Gets the target directory for savepoint
* @return Target directory path
*/
public String getTargetDirectory();
}
/**
* Options for stop operations
*/
public class StopOptions extends CommandLineOptions {
/**
* Creates stop options from command line
* @param cmdLine Parsed command line
*/
public StopOptions(CommandLine cmdLine);
/**
* Gets the job ID to stop
* @return JobID instance
*/
public JobID getJobId();
/**
* Checks if savepoint flag is set
* @return true if creating savepoint on stop
*/
public boolean hasSavepointFlag();
/**
* Gets the target directory for savepoint on stop
* @return Target directory path
*/
public String getTargetDirectory();
/**
* Checks if should advance to end of event time
* @return true if advancing to end of event time
*/
public boolean shouldAdvanceToEndOfEventTime();
}Usage Example:
import org.apache.flink.client.cli.*;
import org.apache.commons.cli.CommandLine;
// Parse program options
String[] args = {"run", "-p", "4", "-c", "com.example.Main", "app.jar", "--input", "data"};
Options options = CliFrontendParser.getRunCommandOptions();
CommandLine cmdLine = CliFrontendParser.parse(options, args, true);
ProgramOptions programOpts = new ProgramOptions(cmdLine);
System.out.println("JAR file: " + programOpts.getJarFilePath());
System.out.println("Main class: " + programOpts.getEntryPointClassName());
System.out.println("Parallelism: " + programOpts.getParallelism());
System.out.println("Args: " + Arrays.toString(programOpts.getProgramArgs()));
// Parse cancel options
String[] cancelArgs = {"cancel", "a1b2c3d4e5f6"};
Options cancelOptions = CliFrontendParser.getCancelCommandOptions();
CommandLine cancelCmdLine = CliFrontendParser.parse(cancelOptions, cancelArgs, false);
CancelOptions cancelOpts = new CancelOptions(cancelCmdLine);
System.out.println("Job to cancel: " + cancelOpts.getJobId());Utility classes for CLI configuration and option processing.
/**
* Configuration options for Flink client
*/
public class ClientOptions {
/**
* Client retry period configuration option
*/
public static final ConfigOption<String> CLIENT_RETRY_PERIOD;
/**
* Client timeout configuration option
*/
public static final ConfigOption<Integer> CLIENT_TIMEOUT;
}
/**
* Utilities for program options
*/
public class ProgramOptionsUtils {
/**
* Creates configuration from program options
* @param programOptions Program options to convert
* @return Configuration with program options applied
*/
public static Configuration createConfigurationFromOptions(ProgramOptions programOptions);
}
/**
* Utilities for handling dynamic properties
*/
public class DynamicPropertiesUtil {
/**
* Creates configuration from dynamic properties
* @param dynamicProperties List of dynamic property strings (key=value format)
* @return Configuration with dynamic properties applied
*/
public static Configuration createConfigFromDynamicProperties(List<String> dynamicProperties);
}
/**
* Access and manipulate execution configuration
*/
public class ExecutionConfigAccessor {
/**
* Creates ExecutionConfig from Configuration
* @param configuration Flink configuration
* @return ExecutionConfig instance
*/
public static ExecutionConfig fromConfiguration(Configuration configuration);
}Usage Example:
import org.apache.flink.client.cli.ProgramOptionsUtils;
import org.apache.flink.client.cli.DynamicPropertiesUtil;
// Create configuration from program options
Configuration config = ProgramOptionsUtils.createConfigurationFromOptions(programOptions);
// Apply dynamic properties
List<String> dynamicProps = Arrays.asList(
"parallelism.default=8",
"taskmanager.memory.process.size=2g"
);
Configuration dynamicConfig = DynamicPropertiesUtil.createConfigFromDynamicProperties(dynamicProps);
// Merge configurations
config.addAll(dynamicConfig);
// Get execution config
ExecutionConfig execConfig = ExecutionConfigAccessor.fromConfiguration(config);/**
* Exception for command line argument errors
*/
public class CliArgsException extends Exception {
/**
* Creates exception with message
* @param message Error message
*/
public CliArgsException(String message);
/**
* Creates exception with message and cause
* @param message Error message
* @param cause Root cause throwable
*/
public CliArgsException(String message, Throwable cause);
}public class CommandLine {
// Apache Commons CLI CommandLine class
public String[] getArgs();
public String getOptionValue(String opt);
public boolean hasOption(String opt);
public String[] getOptionValues(String opt);
}
public class Options {
// Apache Commons CLI Options class
public Options addOption(String opt, boolean hasArg, String description);
public Options addOption(String opt, String longOpt, boolean hasArg, String description);
}
public interface ConfigOption<T> {
String key();
T defaultValue();
Class<T> getClazz();
}
public class ExecutionConfig implements Serializable {
public void setParallelism(int parallelism);
public int getParallelism();
public void setMaxParallelism(int maxParallelism);
public int getMaxParallelism();
}