0
# CLI Interface
1
2
Command-line interface classes for Flink client operations, including job submission, monitoring, and cluster management. They provide command-line parsing, option handling, and an execution framework for CLI commands.
3
4
## Capabilities
5
6
### CLI Frontend
7
8
Main entry point for the Flink command-line interface, providing job management operations.
9
10
```java { .api }
11
/**
12
* Main entry point for Flink command line interface
13
*/
14
public class CliFrontend {
15
/**
16
* Creates CLI frontend with configuration and custom command lines
17
* @param configuration Flink configuration
18
* @param customCommandLines List of custom command line implementations
19
*/
20
public CliFrontend(Configuration configuration, List<CustomCommandLine> customCommandLines);
21
22
/**
23
* Main method for CLI execution
24
* @param args Command line arguments
25
*/
26
public static void main(String[] args);
27
28
/**
29
* Runs CLI with given arguments
30
* @param args Command line arguments
31
* @return Exit code (0 for success, non-zero for error)
32
*/
33
public int run(String[] args);
34
35
/**
36
* Lists jobs on the cluster
37
* @param args Command line arguments for list operation
38
* @return Exit code
39
*/
40
public int list(String[] args);
41
42
/**
43
* Cancels a running job
44
* @param args Command line arguments for cancel operation
45
* @return Exit code
46
*/
47
public int cancel(String[] args);
48
49
/**
50
* Stops a running job
51
* @param args Command line arguments for stop operation
52
* @return Exit code
53
*/
54
public int stop(String[] args);
55
56
/**
57
* Manages savepoints (create, dispose, etc.)
58
* @param args Command line arguments for savepoint operation
59
* @return Exit code
60
*/
61
public int savepoint(String[] args);
62
}
63
```
64
65
**Usage Example:**
66
67
```java
68
import org.apache.flink.client.cli.CliFrontend;
69
import org.apache.flink.configuration.Configuration;
70
71
// Create CLI frontend
72
Configuration config = new Configuration();
73
List<CustomCommandLine> customCLIs = Arrays.asList(new DefaultCLI(), new GenericCLI());
74
CliFrontend cli = new CliFrontend(config, customCLIs);
75
76
// Execute CLI commands programmatically
77
int exitCode;
78
79
// List jobs
80
exitCode = cli.list(new String[]{"list"});
81
82
// Cancel a job
83
exitCode = cli.cancel(new String[]{"cancel", "a1b2c3d4e5f6"});
84
85
// Create savepoint
86
exitCode = cli.savepoint(new String[]{"savepoint", "a1b2c3d4e5f6", "/path/to/savepoint"});
87
```
88
89
### Custom Command Line Interface
90
91
Interface and implementations for extending CLI functionality.
92
93
```java { .api }
94
/**
95
* Interface for custom command line implementations
96
*/
97
public interface CustomCommandLine {
98
/**
99
* Checks if this command line is active for given arguments
100
* @param commandLine Parsed command line
101
* @return true if this CLI should handle the command
102
*/
103
boolean isActive(CommandLine commandLine);
104
105
/**
106
* Gets the unique identifier for this command line
107
* @return Unique string identifier
108
*/
109
String getId();
110
111
/**
112
* Adds run options to the options parser
113
* @param options Options instance to add to
114
*/
115
void addRunOptions(Options options);
116
117
/**
118
* Adds general options to the options parser
119
* @param options Options instance to add to
120
*/
121
void addGeneralOptions(Options options);
122
123
/**
124
* Applies command line options to configuration
125
* @param commandLine Parsed command line
126
* @return Updated configuration with CLI options applied
127
*/
128
Configuration applyCommandLineOptionsToConfiguration(CommandLine commandLine);
129
}
130
131
/**
132
* Base class for custom command line interfaces
133
*/
134
public abstract class AbstractCustomCommandLine implements CustomCommandLine {
135
@Override
136
public boolean isActive(CommandLine commandLine);
137
138
@Override
139
public String getId();
140
141
@Override
142
public void addRunOptions(Options options);
143
144
@Override
145
public void addGeneralOptions(Options options);
146
}
147
148
/**
149
* Default command line interface implementation
150
*/
151
public class DefaultCLI extends AbstractCustomCommandLine {
152
// Default implementation for standard Flink CLI operations
153
}
154
155
/**
156
* Generic command line interface
157
*/
158
public class GenericCLI implements CustomCommandLine {
159
@Override
160
public boolean isActive(CommandLine commandLine);
161
162
@Override
163
public String getId();
164
165
@Override
166
public void addRunOptions(Options options);
167
168
@Override
169
public void addGeneralOptions(Options options);
170
171
@Override
172
public Configuration applyCommandLineOptionsToConfiguration(CommandLine commandLine);
173
}
174
```
175
176
### CLI Frontend Parser
177
178
Parser for command line arguments with support for different operation modes.
179
180
```java { .api }
181
/**
182
* Parses command line arguments for Flink CLI
183
*/
184
public class CliFrontendParser {
185
/**
186
* Parses command line arguments
187
* @param options Available options
188
* @param args Arguments to parse
189
* @param stopAtNonOptions Whether to stop parsing at non-options
190
* @return Parsed CommandLine instance
191
*/
192
public static CommandLine parse(Options options, String[] args, boolean stopAtNonOptions);
193
194
/**
195
* Gets options for run command
196
* @return Options instance with run command options
197
*/
198
public static Options getRunCommandOptions();
199
200
/**
201
* Gets options for cancel command
202
* @return Options instance with cancel command options
203
*/
204
public static Options getCancelCommandOptions();
205
206
/**
207
* Gets options for stop command
208
* @return Options instance with stop command options
209
*/
210
public static Options getStopCommandOptions();
211
212
/**
213
* Gets options for savepoint command
214
* @return Options instance with savepoint command options
215
*/
216
public static Options getSavepointCommandOptions();
217
218
/**
219
* Gets options for list command
220
* @return Options instance with list command options
221
*/
222
public static Options getListCommandOptions();
223
}
224
```
225
226
### Command Line Options Classes
227
228
Specialized classes for parsing different types of CLI operations.
229
230
```java { .api }
231
/**
232
* Base class for command line option parsing
233
*/
234
public abstract class CommandLineOptions {
235
/**
236
* Creates command line options from parsed command line
237
* @param cmdLine Parsed command line
238
*/
239
protected CommandLineOptions(CommandLine cmdLine);
240
}
241
242
/**
243
* Command line options for cancel operations
244
*/
245
public class CancelOptions extends CommandLineOptions {
246
/**
247
* Creates cancel options from command line
248
* @param cmdLine Parsed command line
249
*/
250
public CancelOptions(CommandLine cmdLine);
251
252
/**
253
* Gets the job ID to cancel
254
* @return JobID instance
255
*/
256
public JobID getJobId();
257
}
258
259
/**
260
* Options for list operations
261
*/
262
public class ListOptions extends CommandLineOptions {
263
/**
264
* Creates list options from command line
265
* @param cmdLine Parsed command line
266
*/
267
public ListOptions(CommandLine cmdLine);
268
269
/**
270
* Checks if running jobs should be shown
271
* @return true if showing running jobs
272
*/
273
public boolean showRunning();
274
275
/**
276
* Checks if scheduled jobs should be shown
277
* @return true if showing scheduled jobs
278
*/
279
public boolean showScheduled();
280
281
/**
282
* Checks if all jobs should be shown
283
* @return true if showing all jobs
284
*/
285
public boolean showAll();
286
}
287
288
/**
289
* Program-specific command line options
290
*/
291
public class ProgramOptions extends CommandLineOptions {
292
/**
293
* Creates program options from command line
294
* @param cmdLine Parsed command line
295
*/
296
public ProgramOptions(CommandLine cmdLine);
297
298
/**
299
* Gets the JAR file path
300
* @return Path to JAR file
301
*/
302
public String getJarFilePath();
303
304
/**
305
* Gets the entry point class name
306
* @return Fully qualified class name
307
*/
308
public String getEntryPointClassName();
309
310
/**
311
* Gets program arguments
312
* @return Array of program arguments
313
*/
314
public String[] getProgramArgs();
315
316
/**
317
* Gets additional classpaths
318
* @return List of classpath URLs
319
*/
320
public List<URL> getClasspaths();
321
322
/**
323
* Gets the parallelism setting
324
* @return Parallelism value, -1 if not set
325
*/
326
public int getParallelism();
327
328
/**
329
* Gets detached mode setting
330
* @return true if running in detached mode
331
*/
332
public boolean getDetachedMode();
333
334
/**
335
* Gets the shutdown-on-attached-exit setting
336
* @return true if shutdown on attached exit is enabled
337
*/
338
public boolean isShutdownOnAttachedExit();
339
}
340
341
/**
342
* Options for savepoint operations
343
*/
344
public class SavepointOptions extends CommandLineOptions {
345
/**
346
* Creates savepoint options from command line
347
* @param cmdLine Parsed command line
348
*/
349
public SavepointOptions(CommandLine cmdLine);
350
351
/**
352
* Gets the job ID for savepoint operation
353
* @return JobID instance
354
*/
355
public JobID getJobId();
356
357
/**
358
* Gets the savepoint path
359
* @return Path to savepoint
360
*/
361
public String getSavepointPath();
362
363
/**
364
* Checks if this is a dispose operation
365
* @return true if disposing savepoint
366
*/
367
public boolean isDispose();
368
369
/**
370
* Gets the target directory for savepoint
371
* @return Target directory path
372
*/
373
public String getTargetDirectory();
374
}
375
376
/**
377
* Options for stop operations
378
*/
379
public class StopOptions extends CommandLineOptions {
380
/**
381
* Creates stop options from command line
382
* @param cmdLine Parsed command line
383
*/
384
public StopOptions(CommandLine cmdLine);
385
386
/**
387
* Gets the job ID to stop
388
* @return JobID instance
389
*/
390
public JobID getJobId();
391
392
/**
393
* Checks if savepoint flag is set
394
* @return true if creating savepoint on stop
395
*/
396
public boolean hasSavepointFlag();
397
398
/**
399
* Gets the target directory for savepoint on stop
400
* @return Target directory path
401
*/
402
public String getTargetDirectory();
403
404
/**
405
* Checks whether to advance to the end of event time
406
* @return true if advancing to end of event time
407
*/
408
public boolean shouldAdvanceToEndOfEventTime();
409
}
410
```
411
412
**Usage Example:**
413
414
```java
415
import org.apache.flink.client.cli.*;
416
import org.apache.commons.cli.CommandLine;
417
418
// Parse program options
419
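// Pass only the arguments that follow the "run" action: with stopAtNonOptions=true,
// a leading non-option token would end option parsing before -p and -c are read.
String[] args = {"-p", "4", "-c", "com.example.Main", "app.jar", "--input", "data"};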
420
Options options = CliFrontendParser.getRunCommandOptions();
421
CommandLine cmdLine = CliFrontendParser.parse(options, args, true);
422
423
ProgramOptions programOpts = new ProgramOptions(cmdLine);
424
System.out.println("JAR file: " + programOpts.getJarFilePath());
425
System.out.println("Main class: " + programOpts.getEntryPointClassName());
426
System.out.println("Parallelism: " + programOpts.getParallelism());
427
System.out.println("Args: " + Arrays.toString(programOpts.getProgramArgs()));
428
429
// Parse cancel options
430
String[] cancelArgs = {"cancel", "a1b2c3d4e5f6"};
431
Options cancelOptions = CliFrontendParser.getCancelCommandOptions();
432
CommandLine cancelCmdLine = CliFrontendParser.parse(cancelOptions, cancelArgs, false);
433
434
CancelOptions cancelOpts = new CancelOptions(cancelCmdLine);
435
System.out.println("Job to cancel: " + cancelOpts.getJobId());
436
```
437
438
### Configuration and Utilities
439
440
Utility classes for CLI configuration and option processing.
441
442
```java { .api }
443
/**
444
* Configuration options for Flink client
445
*/
446
public class ClientOptions {
447
/**
448
* Client retry period configuration option
449
*/
450
public static final ConfigOption<String> CLIENT_RETRY_PERIOD;
451
452
/**
453
* Client timeout configuration option
454
*/
455
public static final ConfigOption<Integer> CLIENT_TIMEOUT;
456
}
457
458
/**
459
* Utilities for program options
460
*/
461
public class ProgramOptionsUtils {
462
/**
463
* Creates configuration from program options
464
* @param programOptions Program options to convert
465
* @return Configuration with program options applied
466
*/
467
public static Configuration createConfigurationFromOptions(ProgramOptions programOptions);
468
}
469
470
/**
471
* Utilities for handling dynamic properties
472
*/
473
public class DynamicPropertiesUtil {
474
/**
475
* Creates configuration from dynamic properties
476
* @param dynamicProperties List of dynamic property strings (key=value format)
477
* @return Configuration with dynamic properties applied
478
*/
479
public static Configuration createConfigFromDynamicProperties(List<String> dynamicProperties);
480
}
481
482
/**
483
* Access and manipulate execution configuration
484
*/
485
public class ExecutionConfigAccessor {
486
/**
487
* Creates ExecutionConfig from Configuration
488
* @param configuration Flink configuration
489
* @return ExecutionConfig instance
490
*/
491
public static ExecutionConfig fromConfiguration(Configuration configuration);
492
}
493
```
494
495
**Usage Example:**
496
497
```java
498
import org.apache.flink.client.cli.ProgramOptionsUtils;
499
import org.apache.flink.client.cli.DynamicPropertiesUtil;
500
501
// Create configuration from program options
502
Configuration config = ProgramOptionsUtils.createConfigurationFromOptions(programOptions);
503
504
// Apply dynamic properties
505
List<String> dynamicProps = Arrays.asList(
506
"parallelism.default=8",
507
"taskmanager.memory.process.size=2g"
508
);
509
Configuration dynamicConfig = DynamicPropertiesUtil.createConfigFromDynamicProperties(dynamicProps);
510
511
// Merge configurations
512
config.addAll(dynamicConfig);
513
514
// Get execution config
515
ExecutionConfig execConfig = ExecutionConfigAccessor.fromConfiguration(config);
516
```
517
518
## Exception Types
519
520
```java { .api }
521
/**
522
* Exception for command line argument errors
523
*/
524
public class CliArgsException extends Exception {
525
/**
526
* Creates exception with message
527
* @param message Error message
528
*/
529
public CliArgsException(String message);
530
531
/**
532
* Creates exception with message and cause
533
* @param message Error message
534
* @param cause Root cause throwable
535
*/
536
public CliArgsException(String message, Throwable cause);
537
}
538
```
539
540
## Types
541
542
```java { .api }
543
public class CommandLine {
544
// Apache Commons CLI CommandLine class
545
public String[] getArgs();
546
public String getOptionValue(String opt);
547
public boolean hasOption(String opt);
548
public String[] getOptionValues(String opt);
549
}
550
551
public class Options {
552
// Apache Commons CLI Options class
553
public Options addOption(String opt, boolean hasArg, String description);
554
public Options addOption(String opt, String longOpt, boolean hasArg, String description);
555
}
556
557
public interface ConfigOption<T> {
558
String key();
559
T defaultValue();
560
Class<T> getClazz();
561
}
562
563
public class ExecutionConfig implements Serializable {
564
public void setParallelism(int parallelism);
565
public int getParallelism();
566
public void setMaxParallelism(int maxParallelism);
567
public int getMaxParallelism();
568
}
569
```