Apache Flink Client APIs and utilities for submitting and interacting with Flink jobs
—
The Apache Flink CLI Operations module (org.apache.flink.client.cli) provides comprehensive command-line interface functionality for interactive job management. This module includes the main CLI frontend, command parsers, option handlers, and custom command line implementations for different deployment scenarios.
Main CLI frontend class that serves as the primary entry point for all Flink command-line operations.
public class CliFrontend {
// Constructors
public CliFrontend(Configuration configuration, List<CustomCommandLine> customCommandLines) { }
public CliFrontend(Configuration configuration,
ClusterClientServiceLoader clusterClientServiceLoader,
List<CustomCommandLine> customCommandLines) { }
// Core methods
public Configuration getConfiguration() { }
public Options getCustomCommandLineOptions() { }
public int parseAndRun(String[] args) { }
public CustomCommandLine validateAndGetActiveCommandLine(CommandLine commandLine) { }
public CommandLine getCommandLine(Options commandOptions, String[] args, boolean stopAtNonOptions) { }
// Command execution methods
protected void run(String[] args) { }
protected void runApplication(String[] args) { }
protected void info(String[] args) { }
protected void list(String[] args) { }
protected void cancel(String[] args) { }
protected void stop(String[] args) { }
protected void savepoint(String[] args) { }
// Static utilities
public static void main(String[] args) { }
public static String getConfigurationDirectoryFromEnv() { }
public static List<CustomCommandLine> loadCustomCommandLines(Configuration configuration, String configurationDirectory) { }
}Action Constants:
private static final String ACTION_RUN = "run";
private static final String ACTION_RUN_APPLICATION = "run-application";
private static final String ACTION_INFO = "info";
private static final String ACTION_LIST = "list";
private static final String ACTION_CANCEL = "cancel";
private static final String ACTION_STOP = "stop";
private static final String ACTION_SAVEPOINT = "savepoint";Interface for custom command line implementations that provide deployment-specific functionality.
public interface CustomCommandLine {
// Core interface methods
boolean isActive(CommandLine commandLine);
String getId();
void addRunOptions(Options baseOptions);
void addGeneralOptions(Options baseOptions);
Configuration toConfiguration(CommandLine commandLine);
// Default methods
default CommandLine parseCommandLineOptions(String[] args, boolean stopAtNonOptions) { }
}Handles program execution options and command line parsing for job runs.
public class ProgramOptions extends CommandLineOptions {
// Constructor
public ProgramOptions(CommandLine line) { }
// Factory method
public static ProgramOptions create(CommandLine commandLine) { }
// Validation and access methods
public void validate() { }
public String getJarFilePath() { }
public String getEntryPointClassName() { }
public String[] getProgramArgs() { }
public List<URL> getClasspaths() { }
public int getParallelism() { }
public SavepointRestoreSettings getSavepointRestoreSettings() { }
public boolean getDetachedMode() { }
public boolean getShutdownOnAttachedExit() { }
}Command line options for the list command to query running jobs.
public class ListOptions extends CommandLineOptions {
// Constructor
public ListOptions(CommandLine line) { }
// Option access methods
public boolean showRunning() { }
public boolean showScheduled() { }
public boolean showAll() { }
public boolean isPrintHelp() { }
}Command line options for savepoint operations including creation and disposal.
public class SavepointOptions extends CommandLineOptions {
// Option access methods
public boolean isDispose() { }
public String getSavepointPath() { }
public String[] getArgs() { }
public boolean isPrintHelp() { }
}Command line options for job cancellation with optional savepoint creation.
public class CancelOptions extends CommandLineOptions {
// Option access methods
public boolean isWithSavepoint() { }
public String getSavepointTargetDirectory() { }
public String[] getArgs() { }
public boolean isPrintHelp() { }
}Command line options for gracefully stopping jobs with savepoint support.
public class StopOptions extends CommandLineOptions {
// Option access methods
public boolean hasSavepointFlag() { }
public boolean shouldAdvanceToEndOfEventTime() { }
public String getTargetDirectory() { }
public String[] getArgs() { }
public boolean isPrintHelp() { }
}Default CLI implementation extending the abstract base class.
public class DefaultCLI extends AbstractCustomCommandLine {
// Implements CustomCommandLine interface methods
}Generic CLI implementation for general-purpose command line handling.
public class GenericCLI implements CustomCommandLine {
// Constructor
public GenericCLI(Configuration configuration, String configurationDirectory) { }
// CustomCommandLine interface implementations
public boolean isActive(CommandLine commandLine) { }
public String getId() { }
public void addRunOptions(Options baseOptions) { }
public void addGeneralOptions(Options baseOptions) { }
public Configuration toConfiguration(CommandLine commandLine) { }
}Abstract base class for custom command line implementations.
public abstract class AbstractCustomCommandLine implements CustomCommandLine {
// Base implementation methods and common functionality
}Static utility class for parsing command line arguments and generating help text.
public class CliFrontendParser {
// Parsing methods
public static CommandLine parse(Options options, String[] args, boolean stopAtNonOptions) { }
public static Options mergeOptions(Options options1, Options options2) { }
// Option generation methods
public static Options getRunCommandOptions() { }
public static Options getListCommandOptions() { }
public static Options getInfoCommandOptions() { }
public static Options getCancelCommandOptions() { }
public static Options getStopCommandOptions() { }
public static Options getSavepointCommandOptions() { }
// Help printing methods
public static void printHelp(List<CustomCommandLine> customCommandLines) { }
public static void printHelpForRun(List<CustomCommandLine> customCommandLines) { }
public static void printHelpForRunApplication(List<CustomCommandLine> customCommandLines) { }
public static void printHelpForList(List<CustomCommandLine> customCommandLines) { }
public static void printHelpForInfo() { }
public static void printHelpForCancel(List<CustomCommandLine> customCommandLines) { }
public static void printHelpForStop(List<CustomCommandLine> customCommandLines) { }
public static void printHelpForSavepoint(List<CustomCommandLine> customCommandLines) { }
}Provides access to execution configuration from program options.
public class ExecutionConfigAccessor {
// Factory method
public static ExecutionConfigAccessor fromProgramOptions(ProgramOptions programOptions, List<?> jobJars) { }
// Configuration application
public void applyToConfiguration(Configuration configuration) { }
}Utility enum for handling Python program options and validation.
public enum ProgramOptionsUtils {
// Static utility methods
public static boolean isPythonEntryPoint(CommandLine commandLine) { }
public static ProgramOptions createPythonProgramOptions(CommandLine commandLine) { }
}Utility class for handling dynamic properties in command line arguments.
public class DynamicPropertiesUtil {
// Static utility methods for dynamic property parsing
public static Properties parseDynamicProperties(String[] args) { }
public static Configuration applyDynamicProperties(Configuration config, Properties dynamicProperties) { }
public static void validateDynamicProperty(String key, String value) { }
public static Map<String, String> convertToMap(Properties properties) { }
}Configuration options specific to client operations.
public class ClientOptions {
// Configuration options
public static final ConfigOption<Duration> CLIENT_TIMEOUT;
public static final ConfigOption<Integer> CLIENT_RETRY_PERIOD;
}Abstract base class for all command line option implementations.
public abstract class CommandLineOptions {
// Base command line option functionality
}Interface for deploying applications to clusters.
public interface ApplicationDeployer {
void run(Configuration effectiveConfiguration, ApplicationConfiguration applicationConfiguration);
}Exception thrown for command line argument parsing errors.
public class CliArgsException extends Exception {
// Constructors
public CliArgsException(String message) { }
public CliArgsException(String message, Throwable cause) { }
}// Initialize CLI with configuration
Configuration config = GlobalConfiguration.loadConfiguration();
String configDir = CliFrontend.getConfigurationDirectoryFromEnv();
List<CustomCommandLine> customCommandLines = CliFrontend.loadCustomCommandLines(config, configDir);
// Create CLI frontend
CliFrontend cli = new CliFrontend(config, customCommandLines);
// Parse and execute commands
int exitCode = cli.parseAndRun(args);// Implement custom command line
public class MyCustomCLI implements CustomCommandLine {
@Override
public boolean isActive(CommandLine commandLine) {
return commandLine.hasOption("my-option");
}
@Override
public String getId() {
return "my-custom-cli";
}
@Override
public void addRunOptions(Options baseOptions) {
baseOptions.addOption("my-option", true, "My custom option");
}
@Override
public void addGeneralOptions(Options baseOptions) {
// Add general options
}
@Override
public Configuration toConfiguration(CommandLine commandLine) {
Configuration config = new Configuration();
// Convert command line to configuration
return config;
}
}// Parse program options from command line
CommandLine cmd = CliFrontendParser.parse(options, args, true);
ProgramOptions programOptions = ProgramOptions.create(cmd);
// Access program details
String jarPath = programOptions.getJarFilePath();
String entryClass = programOptions.getEntryPointClassName();
String[] programArgs = programOptions.getProgramArgs();
int parallelism = programOptions.getParallelism();Interface for translating Flink pipelines to different execution formats.
public interface FlinkPipelineTranslator {
// Core translation methods
JobGraph translateToJobGraph(Pipeline pipeline, Configuration optimizerConfiguration, int defaultParallelism);
String translateToJSONExecutionPlan(Pipeline pipeline);
boolean canTranslate(Pipeline pipeline);
}Utility enum providing core client functionality for program execution and job management.
public enum ClientUtils {
// Static utility methods
public static URLClassLoader buildUserCodeClassLoader(List<URL> jars,
List<URL> classpaths,
ClassLoader parent,
Configuration configuration) { }
public static void executeProgram(PipelineExecutorServiceLoader executorServiceLoader,
Configuration configuration,
PackagedProgram program,
boolean enforceSingleJobExecution,
boolean suppressSysout) { }
public static void waitUntilJobInitializationFinished(SupplierWithException<JobStatus, Exception> jobStatusSupplier,
SupplierWithException<JobResult, Exception> jobResultSupplier,
ClassLoader userCodeClassloader) { }
}Utility class for pipeline translation operations.
public final class FlinkPipelineTranslationUtil {
// Static translation utilities
public static JobGraph getJobGraph(Pipeline pipeline,
Configuration optimizerConfiguration,
int defaultParallelism) { }
public static JobGraph getJobGraphUnderUserClassLoader(ClassLoader userClassloader,
Pipeline pipeline,
Configuration configuration,
int defaultParallelism) { }
public static String translateToJSONExecutionPlan(Pipeline pipeline) { }
}Implementation of FlinkPipelineTranslator for streaming pipelines.
public class StreamGraphTranslator implements FlinkPipelineTranslator {
// FlinkPipelineTranslator interface implementations
public JobGraph translateToJobGraph(Pipeline pipeline, Configuration optimizerConfiguration, int defaultParallelism) { }
public String translateToJSONExecutionPlan(Pipeline pipeline) { }
public boolean canTranslate(Pipeline pipeline) { }
}Implementation of FlinkPipelineTranslator for batch execution plans.
public class PlanTranslator implements FlinkPipelineTranslator {
// FlinkPipelineTranslator interface implementations
public JobGraph translateToJobGraph(Pipeline pipeline, Configuration optimizerConfiguration, int defaultParallelism) { }
public String translateToJSONExecutionPlan(Pipeline pipeline) { }
public boolean canTranslate(Pipeline pipeline) { }
}import org.apache.flink.client.FlinkPipelineTranslator;
import org.apache.flink.client.ClientUtils;
import org.apache.flink.client.FlinkPipelineTranslationUtil;
import org.apache.flink.client.StreamGraphTranslator;
import org.apache.flink.client.PlanTranslator;
import org.apache.flink.client.cli.CliFrontend;
import org.apache.flink.client.cli.CustomCommandLine;
import org.apache.flink.client.cli.ProgramOptions;
import org.apache.flink.client.cli.ListOptions;
import org.apache.flink.client.cli.SavepointOptions;
import org.apache.flink.client.cli.CancelOptions;
import org.apache.flink.client.cli.StopOptions;
import org.apache.flink.client.cli.DefaultCLI;
import org.apache.flink.client.cli.GenericCLI;
import org.apache.flink.client.cli.AbstractCustomCommandLine;
import org.apache.flink.client.cli.CliFrontendParser;
import org.apache.flink.client.cli.ExecutionConfigAccessor;
import org.apache.flink.client.cli.ProgramOptionsUtils;
import org.apache.flink.client.cli.ClientOptions;
import org.apache.flink.client.cli.CommandLineOptions;
import org.apache.flink.client.cli.ApplicationDeployer;
import org.apache.flink.client.cli.CliArgsException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.core.execution.PipelineExecutorServiceLoader;
import org.apache.flink.api.dag.Pipeline;
import org.apache.flink.util.function.SupplierWithException;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
import java.util.List;
import java.util.Properties;
import java.util.Map;
import java.net.URL;
import java.net.URLClassLoader;Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-clients-2-11