0
# CLI Operations
1
2
The Apache Flink CLI Operations module (`org.apache.flink.client.cli`) provides the command-line interface for submitting and managing Flink jobs (run, list, cancel, stop, and savepoint operations). This module includes the main CLI frontend, the command-line parser, per-command option classes, and custom command line implementations for different deployment scenarios.
3
4
## Core CLI Components
5
6
### CliFrontend { .api }
7
8
Main CLI frontend class that serves as the primary entry point for all Flink command-line operations.
9
10
```java
11
public class CliFrontend {
12
// Constructors
13
public CliFrontend(Configuration configuration, List<CustomCommandLine> customCommandLines) { }
14
public CliFrontend(Configuration configuration,
15
ClusterClientServiceLoader clusterClientServiceLoader,
16
List<CustomCommandLine> customCommandLines) { }
17
18
// Core methods
19
public Configuration getConfiguration() { }
20
public Options getCustomCommandLineOptions() { }
21
public int parseAndRun(String[] args) { }
22
public CustomCommandLine validateAndGetActiveCommandLine(CommandLine commandLine) { }
23
public CommandLine getCommandLine(Options commandOptions, String[] args, boolean stopAtNonOptions) { }
24
25
// Command execution methods
26
protected void run(String[] args) { }
27
protected void runApplication(String[] args) { }
28
protected void info(String[] args) { }
29
protected void list(String[] args) { }
30
protected void cancel(String[] args) { }
31
protected void stop(String[] args) { }
32
protected void savepoint(String[] args) { }
33
34
// Static utilities
35
public static void main(String[] args) { }
36
public static String getConfigurationDirectoryFromEnv() { }
37
public static List<CustomCommandLine> loadCustomCommandLines(Configuration configuration, String configurationDirectory) { }
38
}
39
```
40
41
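**Action Constants** (declared `private` in `CliFrontend`, so not callable API; each value corresponds to one of the command execution methods listed above):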
42
```java
43
private static final String ACTION_RUN = "run";
44
private static final String ACTION_RUN_APPLICATION = "run-application";
45
private static final String ACTION_INFO = "info";
46
private static final String ACTION_LIST = "list";
47
private static final String ACTION_CANCEL = "cancel";
48
private static final String ACTION_STOP = "stop";
49
private static final String ACTION_SAVEPOINT = "savepoint";
50
```
51
52
### CustomCommandLine { .api }
53
54
Interface for custom command line implementations that provide deployment-specific functionality.
55
56
```java
57
public interface CustomCommandLine {
58
// Core interface methods
59
boolean isActive(CommandLine commandLine);
60
String getId();
61
void addRunOptions(Options baseOptions);
62
void addGeneralOptions(Options baseOptions);
63
Configuration toConfiguration(CommandLine commandLine);
64
65
// Default methods
66
default CommandLine parseCommandLineOptions(String[] args, boolean stopAtNonOptions) { }
67
}
68
```
69
70
## Command Option Classes
71
72
### ProgramOptions { .api }
73
74
Handles program execution options and command line parsing for job runs.
75
76
```java
77
public class ProgramOptions extends CommandLineOptions {
78
// Constructor
79
public ProgramOptions(CommandLine line) { }
80
81
// Factory method
82
public static ProgramOptions create(CommandLine commandLine) { }
83
84
// Validation and access methods
85
public void validate() { }
86
public String getJarFilePath() { }
87
public String getEntryPointClassName() { }
88
public String[] getProgramArgs() { }
89
public List<URL> getClasspaths() { }
90
public int getParallelism() { }
91
public SavepointRestoreSettings getSavepointRestoreSettings() { }
92
public boolean getDetachedMode() { }
93
public boolean getShutdownOnAttachedExit() { }
94
}
95
```
96
97
### ListOptions { .api }
98
99
Command line options for the `list` command, selecting whether running jobs, scheduled jobs, or all jobs are shown.
100
101
```java
102
public class ListOptions extends CommandLineOptions {
103
// Constructor
104
public ListOptions(CommandLine line) { }
105
106
// Option access methods
107
public boolean showRunning() { }
108
public boolean showScheduled() { }
109
public boolean showAll() { }
110
public boolean isPrintHelp() { }
111
}
112
```
113
114
### SavepointOptions { .api }
115
116
Command line options for savepoint operations including creation and disposal.
117
118
```java
119
public class SavepointOptions extends CommandLineOptions {
120
// Option access methods
121
public boolean isDispose() { }
122
public String getSavepointPath() { }
123
public String[] getArgs() { }
124
public boolean isPrintHelp() { }
125
}
126
```
127
128
### CancelOptions { .api }
129
130
Command line options for job cancellation with optional savepoint creation.
131
132
```java
133
public class CancelOptions extends CommandLineOptions {
134
// Option access methods
135
public boolean isWithSavepoint() { }
136
public String getSavepointTargetDirectory() { }
137
public String[] getArgs() { }
138
public boolean isPrintHelp() { }
139
}
140
```
141
142
### StopOptions { .api }
143
144
Command line options for gracefully stopping jobs with savepoint support.
145
146
```java
147
public class StopOptions extends CommandLineOptions {
148
// Option access methods
149
public boolean hasSavepointFlag() { }
150
public boolean shouldAdvanceToEndOfEventTime() { }
151
public String getTargetDirectory() { }
152
public String[] getArgs() { }
153
public boolean isPrintHelp() { }
154
}
155
```
156
157
## CLI Implementation Classes
158
159
### DefaultCLI { .api }
160
161
Default CLI implementation; extends `AbstractCustomCommandLine`.
162
163
```java
164
public class DefaultCLI extends AbstractCustomCommandLine {
165
// Implements CustomCommandLine interface methods
166
}
167
```
168
169
### GenericCLI { .api }
170
171
Generic CLI implementation for general-purpose command line handling.
172
173
```java
174
public class GenericCLI implements CustomCommandLine {
175
// Constructor
176
public GenericCLI(Configuration configuration, String configurationDirectory) { }
177
178
// CustomCommandLine interface implementations
179
public boolean isActive(CommandLine commandLine) { }
180
public String getId() { }
181
public void addRunOptions(Options baseOptions) { }
182
public void addGeneralOptions(Options baseOptions) { }
183
public Configuration toConfiguration(CommandLine commandLine) { }
184
}
185
```
186
187
### AbstractCustomCommandLine { .api }
188
189
Abstract base class for custom command line implementations.
190
191
```java
192
public abstract class AbstractCustomCommandLine implements CustomCommandLine {
193
// Base implementation methods and common functionality
194
}
195
```
196
197
## CLI Utility Classes
198
199
### CliFrontendParser { .api }
200
201
Static utility class for parsing command line arguments and generating help text.
202
203
```java
204
public class CliFrontendParser {
205
// Parsing methods
206
public static CommandLine parse(Options options, String[] args, boolean stopAtNonOptions) { }
207
public static Options mergeOptions(Options options1, Options options2) { }
208
209
// Option generation methods
210
public static Options getRunCommandOptions() { }
211
public static Options getListCommandOptions() { }
212
public static Options getInfoCommandOptions() { }
213
public static Options getCancelCommandOptions() { }
214
public static Options getStopCommandOptions() { }
215
public static Options getSavepointCommandOptions() { }
216
217
// Help printing methods
218
public static void printHelp(List<CustomCommandLine> customCommandLines) { }
219
public static void printHelpForRun(List<CustomCommandLine> customCommandLines) { }
220
public static void printHelpForRunApplication(List<CustomCommandLine> customCommandLines) { }
221
public static void printHelpForList(List<CustomCommandLine> customCommandLines) { }
222
public static void printHelpForInfo() { }
223
public static void printHelpForCancel(List<CustomCommandLine> customCommandLines) { }
224
public static void printHelpForStop(List<CustomCommandLine> customCommandLines) { }
225
public static void printHelpForSavepoint(List<CustomCommandLine> customCommandLines) { }
226
}
227
```
228
229
### ExecutionConfigAccessor { .api }
230
231
Provides access to execution configuration from program options.
232
233
```java
234
public class ExecutionConfigAccessor {
235
// Factory method
236
public static ExecutionConfigAccessor fromProgramOptions(ProgramOptions programOptions, List<?> jobJars) { }
237
238
// Configuration application
239
public void applyToConfiguration(Configuration configuration) { }
240
}
241
```
242
243
### ProgramOptionsUtils { .api }
244
245
Utility enum for handling Python program options and validation.
246
247
```java
248
public enum ProgramOptionsUtils {
249
// Static utility methods
250
public static boolean isPythonEntryPoint(CommandLine commandLine) { }
251
public static ProgramOptions createPythonProgramOptions(CommandLine commandLine) { }
252
}
253
```
254
255
### DynamicPropertiesUtil { .api }
256
257
Utility class for handling dynamic properties in command line arguments.
258
259
```java
260
public class DynamicPropertiesUtil {
261
// Static utility methods for dynamic property parsing
262
public static Properties parseDynamicProperties(String[] args) { }
263
public static Configuration applyDynamicProperties(Configuration config, Properties dynamicProperties) { }
264
public static void validateDynamicProperty(String key, String value) { }
265
public static Map<String, String> convertToMap(Properties properties) { }
266
}
267
```
268
269
## Configuration and Options
270
271
### ClientOptions { .api }
272
273
Configuration options specific to client operations.
274
275
```java
276
public class ClientOptions {
277
// Configuration options
278
public static final ConfigOption<Duration> CLIENT_TIMEOUT;
279
public static final ConfigOption<Integer> CLIENT_RETRY_PERIOD;
280
}
281
```
282
283
### CommandLineOptions { .api }
284
285
Abstract base class for all command line option implementations.
286
287
```java
288
public abstract class CommandLineOptions {
289
// Base command line option functionality
290
}
291
```
292
293
## Application Deployment Interface
294
295
### ApplicationDeployer { .api }
296
297
Interface for deploying applications to clusters.
298
299
```java
300
public interface ApplicationDeployer {
301
void run(Configuration effectiveConfiguration, ApplicationConfiguration applicationConfiguration);
302
}
303
```
304
305
## Exception Handling
306
307
### CliArgsException { .api }
308
309
Exception thrown for command line argument parsing errors.
310
311
```java
312
public class CliArgsException extends Exception {
313
// Constructors
314
public CliArgsException(String message) { }
315
public CliArgsException(String message, Throwable cause) { }
316
}
317
```
318
319
## Usage Examples
320
321
### Basic CLI Usage
322
323
```java
324
// Initialize CLI with configuration
325
Configuration config = GlobalConfiguration.loadConfiguration();
326
String configDir = CliFrontend.getConfigurationDirectoryFromEnv();
327
List<CustomCommandLine> customCommandLines = CliFrontend.loadCustomCommandLines(config, configDir);
328
329
// Create CLI frontend
330
CliFrontend cli = new CliFrontend(config, customCommandLines);
331
332
// Parse and execute commands
333
int exitCode = cli.parseAndRun(args);
334
```
335
336
### Custom Command Line Implementation
337
338
```java
339
// Implement custom command line
340
public class MyCustomCLI implements CustomCommandLine {
341
@Override
342
public boolean isActive(CommandLine commandLine) {
343
return commandLine.hasOption("my-option");
344
}
345
346
@Override
347
public String getId() {
348
return "my-custom-cli";
349
}
350
351
@Override
352
public void addRunOptions(Options baseOptions) {
353
baseOptions.addOption("my-option", true, "My custom option");
354
}
355
356
@Override
357
public void addGeneralOptions(Options baseOptions) {
358
// Add general options
359
}
360
361
@Override
362
public Configuration toConfiguration(CommandLine commandLine) {
363
Configuration config = new Configuration();
364
// Convert command line to configuration
365
return config;
366
}
367
}
368
```
369
370
### Program Options Usage
371
372
```java
373
// Parse program options from command line
374
CommandLine cmd = CliFrontendParser.parse(options, args, true);
375
ProgramOptions programOptions = ProgramOptions.create(cmd);
376
377
// Access program details
378
String jarPath = programOptions.getJarFilePath();
379
String entryClass = programOptions.getEntryPointClassName();
380
String[] programArgs = programOptions.getProgramArgs();
381
int parallelism = programOptions.getParallelism();
382
```
383
384
## Core Client Utilities
385
386
### FlinkPipelineTranslator { .api }
387
388
Interface for translating Flink pipelines to different execution formats.
389
390
```java
391
public interface FlinkPipelineTranslator {
392
// Core translation methods
393
JobGraph translateToJobGraph(Pipeline pipeline, Configuration optimizerConfiguration, int defaultParallelism);
394
String translateToJSONExecutionPlan(Pipeline pipeline);
395
boolean canTranslate(Pipeline pipeline);
396
}
397
```
398
399
### ClientUtils { .api }
400
401
Utility enum providing core client functionality for program execution and job management.
402
403
```java
404
public enum ClientUtils {
405
// Static utility methods
406
public static URLClassLoader buildUserCodeClassLoader(List<URL> jars,
407
List<URL> classpaths,
408
ClassLoader parent,
409
Configuration configuration) { }
410
411
public static void executeProgram(PipelineExecutorServiceLoader executorServiceLoader,
412
Configuration configuration,
413
PackagedProgram program,
414
boolean enforceSingleJobExecution,
415
boolean suppressSysout) { }
416
417
public static void waitUntilJobInitializationFinished(SupplierWithException<JobStatus, Exception> jobStatusSupplier,
418
SupplierWithException<JobResult, Exception> jobResultSupplier,
419
ClassLoader userCodeClassloader) { }
420
}
421
```
422
423
### FlinkPipelineTranslationUtil { .api }
424
425
Utility class for pipeline translation operations.
426
427
```java
428
public final class FlinkPipelineTranslationUtil {
429
// Static translation utilities
430
public static JobGraph getJobGraph(Pipeline pipeline,
431
Configuration optimizerConfiguration,
432
int defaultParallelism) { }
433
434
public static JobGraph getJobGraphUnderUserClassLoader(ClassLoader userClassloader,
435
Pipeline pipeline,
436
Configuration configuration,
437
int defaultParallelism) { }
438
439
public static String translateToJSONExecutionPlan(Pipeline pipeline) { }
440
}
441
```
442
443
### StreamGraphTranslator { .api }
444
445
Implementation of FlinkPipelineTranslator for streaming pipelines.
446
447
```java
448
public class StreamGraphTranslator implements FlinkPipelineTranslator {
449
// FlinkPipelineTranslator interface implementations
450
public JobGraph translateToJobGraph(Pipeline pipeline, Configuration optimizerConfiguration, int defaultParallelism) { }
451
public String translateToJSONExecutionPlan(Pipeline pipeline) { }
452
public boolean canTranslate(Pipeline pipeline) { }
453
}
454
```
455
456
### PlanTranslator { .api }
457
458
Implementation of FlinkPipelineTranslator for batch execution plans.
459
460
```java
461
public class PlanTranslator implements FlinkPipelineTranslator {
462
// FlinkPipelineTranslator interface implementations
463
public JobGraph translateToJobGraph(Pipeline pipeline, Configuration optimizerConfiguration, int defaultParallelism) { }
464
public String translateToJSONExecutionPlan(Pipeline pipeline) { }
465
public boolean canTranslate(Pipeline pipeline) { }
466
}
467
```
468
469
## Required Imports
470
471
```java
472
import org.apache.flink.client.FlinkPipelineTranslator;
473
import org.apache.flink.client.ClientUtils;
474
import org.apache.flink.client.FlinkPipelineTranslationUtil;
475
import org.apache.flink.client.StreamGraphTranslator;
476
import org.apache.flink.client.PlanTranslator;
477
import org.apache.flink.client.cli.CliFrontend;
478
import org.apache.flink.client.cli.CustomCommandLine;
479
import org.apache.flink.client.cli.ProgramOptions;
480
import org.apache.flink.client.cli.ListOptions;
481
import org.apache.flink.client.cli.SavepointOptions;
482
import org.apache.flink.client.cli.CancelOptions;
483
import org.apache.flink.client.cli.StopOptions;
484
import org.apache.flink.client.cli.DefaultCLI;
485
import org.apache.flink.client.cli.GenericCLI;
486
import org.apache.flink.client.cli.AbstractCustomCommandLine;
487
import org.apache.flink.client.cli.CliFrontendParser;
488
import org.apache.flink.client.cli.ExecutionConfigAccessor;
489
import org.apache.flink.client.cli.ProgramOptionsUtils;
490
import org.apache.flink.client.cli.ClientOptions;
491
import org.apache.flink.client.cli.CommandLineOptions;
492
import org.apache.flink.client.cli.ApplicationDeployer;
493
import org.apache.flink.client.cli.CliArgsException;
494
import org.apache.flink.configuration.Configuration;
495
import org.apache.flink.configuration.GlobalConfiguration;
496
import org.apache.flink.runtime.state.StateBackend;
497
import org.apache.flink.runtime.jobgraph.JobGraph;
498
import org.apache.flink.runtime.jobmaster.JobResult;
499
import org.apache.flink.core.execution.PipelineExecutorServiceLoader;
500
import org.apache.flink.api.dag.Pipeline;
501
import org.apache.flink.util.function.SupplierWithException;
502
import org.apache.commons.cli.CommandLine;
503
import org.apache.commons.cli.Options;
504
import java.util.List;
505
import java.util.Properties;
506
import java.util.Map;
507
import java.net.URL;
508
import java.net.URLClassLoader;
509
```