0
# Apache Flink Client APIs (flink-clients)
1
2
## Package Overview
3
4
The Apache Flink flink-clients module provides comprehensive client-side APIs and utilities for interacting with Apache Flink clusters. This module serves as the primary programmatic interface for submitting and managing Flink jobs, offering functionality for packaging user programs, translating execution plans, managing cluster connections, and handling job lifecycle operations across different deployment scenarios.
5
6
Key features include:
7
- Command-line interface (CLI) for interactive job management
8
- Cluster deployment and management utilities
9
- Program packaging and execution abstractions
10
- Pipeline translation services for converting high-level Flink programs into executable job graphs
11
- REST client communication for remote cluster interaction
12
- Support for standalone, YARN, Kubernetes, and local execution environments
13
14
## Package Information
15
16
**Maven Coordinates:**
17
```xml
18
<dependency>
19
<groupId>org.apache.flink</groupId>
20
<artifactId>flink-clients_2.11</artifactId>
21
<version>1.14.6</version>
22
</dependency>
23
```
24
25
**Requirements:**
26
- Java 8 or higher
27
- Apache Flink runtime (same version)
28
29
**Documentation:** https://flink.apache.org/
30
31
**License:** Apache License 2.0
32
33
## Core Imports
34
35
### Essential Types
36
```java
37
// Core client utilities
38
import org.apache.flink.client.ClientUtils;
39
import org.apache.flink.client.FlinkPipelineTranslationUtil;
40
import org.apache.flink.client.FlinkPipelineTranslator;
41
42
// CLI and deployment
43
import org.apache.flink.client.cli.CliFrontend;
44
import org.apache.flink.client.cli.CustomCommandLine;
45
import org.apache.flink.client.deployment.ClusterClientServiceLoader;
46
import org.apache.flink.client.deployment.ClusterClientFactory;
47
import org.apache.flink.client.deployment.ClusterDescriptor;
48
49
// Program execution
50
import org.apache.flink.client.program.ClusterClient;
51
import org.apache.flink.client.program.PackagedProgram;
52
import org.apache.flink.client.program.ClusterClientProvider;
53
54
// Application deployment
55
import org.apache.flink.client.deployment.application.ApplicationConfiguration;
56
import org.apache.flink.client.deployment.application.ApplicationClusterEntryPoint;
57
58
// REST communication
59
import org.apache.flink.client.program.rest.RestClusterClient;
60
import org.apache.flink.client.program.rest.RestClusterClientConfiguration;
61
62
// Configuration and exceptions
63
import org.apache.flink.configuration.Configuration;
64
import org.apache.flink.api.common.JobID;
65
import org.apache.flink.runtime.jobgraph.JobGraph;
66
```
67
68
### Maven Dependency
69
```xml
70
<dependency>
71
<groupId>org.apache.flink</groupId>
72
<artifactId>flink-clients_2.11</artifactId>
73
<version>1.14.6</version>
74
</dependency>
75
```
76
77
## Basic Usage
78
79
### Simple Job Submission
80
```java
81
// Configure Flink cluster connection
82
Configuration config = new Configuration();
83
config.setString("rest.address", "localhost");
84
config.setInteger("rest.port", 8081);
85
86
// Create packaged program
87
PackagedProgram program = PackagedProgram.newBuilder()
88
.setJarFile(new File("my-flink-job.jar"))
89
.setEntryPointClassName("com.example.MyFlinkJob")
90
.setArguments("--input", "/path/to/input")
91
.build();
92
93
// Execute program
94
ClusterClientServiceLoader serviceLoader = new DefaultClusterClientServiceLoader();
95
ClientUtils.executeProgram(serviceLoader, config, program, false, false);
96
```
97
98
### CLI Usage
99
```java
100
// Initialize CLI frontend
101
Configuration config = GlobalConfiguration.loadConfiguration();
102
List<CustomCommandLine> customCommandLines = CliFrontend.loadCustomCommandLines(config, configDir);
103
CliFrontend cli = new CliFrontend(config, customCommandLines);
104
105
// Parse and execute command
106
int exitCode = cli.parseAndRun(new String[]{"run", "my-job.jar", "--parallelism", "4"});
107
```
108
109
## Architecture
110
111
The flink-clients module is organized into several key architectural layers:
112
113
### 1. Core Client Layer (`org.apache.flink.client`)
114
The foundation layer providing essential client utilities, pipeline translation interfaces, and common abstractions for interacting with Flink clusters.
115
116
### 2. Command Line Interface (`org.apache.flink.client.cli`)
117
Interactive command-line tools and parsers for job management operations including run, list, cancel, stop, and savepoint commands.
118
119
### 3. Cluster Management (`org.apache.flink.client.deployment.*`)
120
Deployment and cluster interaction services supporting multiple deployment targets through pluggable factory patterns.
121
122
### 4. Program Execution (`org.apache.flink.client.program.*`)
123
Program packaging, execution environments, and cluster client implementations for job lifecycle management.
124
125
### 5. REST Communication (`org.apache.flink.client.program.rest.*`)
126
REST-based cluster communication with retry logic and SSL support for remote cluster interaction.
127
128
## Capabilities
129
130
### CLI Operations { .api }
131
132
Command-line interface functionality for interactive job management.
133
134
```java
135
// Main CLI entry point
136
public class CliFrontend {
137
public CliFrontend(Configuration configuration, List<CustomCommandLine> customCommandLines) { }
138
public int parseAndRun(String[] args) { }
139
public static void main(String[] args) { }
140
}
141
142
// Custom command line interface
143
public interface CustomCommandLine {
144
boolean isActive(CommandLine commandLine);
145
String getId();
146
void addRunOptions(Options baseOptions);
147
Configuration toConfiguration(CommandLine commandLine);
148
}
149
150
// Program execution options
151
public class ProgramOptions extends CommandLineOptions {
152
public ProgramOptions(CommandLine line) { }
153
public String getJarFilePath() { }
154
public String getEntryPointClassName() { }
155
public String[] getProgramArgs() { }
156
public int getParallelism() { }
157
}
158
```
159
160
**Required imports:**
161
```java
162
import org.apache.flink.client.cli.CliFrontend;
163
import org.apache.flink.client.cli.CustomCommandLine;
164
import org.apache.flink.client.cli.ProgramOptions;
165
import org.apache.flink.configuration.Configuration;
166
import org.apache.commons.cli.CommandLine;
167
import org.apache.commons.cli.Options;
168
```
169
170
[CLI Operations Documentation](./cli-operations.md)
171
172
### Cluster Management { .api }
173
174
Deployment and cluster interaction services supporting multiple deployment targets.
175
176
```java
177
// Cluster client factory interface
178
public interface ClusterClientFactory<ClusterID> {
179
boolean isCompatibleWith(Configuration configuration);
180
ClusterDescriptor<ClusterID> createClusterDescriptor(Configuration configuration);
181
ClusterID getClusterId(Configuration configuration);
182
ClusterSpecification getClusterSpecification(Configuration configuration);
183
}
184
185
// Cluster descriptor for deployment operations
186
public interface ClusterDescriptor<T> extends AutoCloseable {
187
String getClusterDescription();
188
ClusterClientProvider<T> retrieve(T clusterId);
189
ClusterClientProvider<T> deploySessionCluster(ClusterSpecification clusterSpecification);
190
ClusterClientProvider<T> deployApplicationCluster(ClusterSpecification clusterSpecification, ApplicationConfiguration applicationConfiguration);
191
}
192
193
// Service loader for cluster clients
194
public interface ClusterClientServiceLoader {
195
<ClusterID> ClusterClientFactory<ClusterID> getClusterClientFactory(Configuration configuration);
196
}
197
198
// Cluster specification
199
public class ClusterSpecification {
200
public ClusterSpecification(int masterMemoryMB, int taskManagerMemoryMB, int slotsPerTaskManager) { }
201
public int getMasterMemoryMB() { }
202
public int getTaskManagerMemoryMB() { }
203
public int getSlotsPerTaskManager() { }
204
}
205
```
206
207
**Required imports:**
208
```java
209
import org.apache.flink.client.deployment.ClusterClientFactory;
210
import org.apache.flink.client.deployment.ClusterDescriptor;
211
import org.apache.flink.client.deployment.ClusterClientServiceLoader;
212
import org.apache.flink.client.deployment.ClusterSpecification;
213
import org.apache.flink.client.deployment.application.ApplicationConfiguration;
214
import org.apache.flink.client.program.ClusterClientProvider;
215
import org.apache.flink.configuration.Configuration;
216
```
217
218
[Cluster Management Documentation](./cluster-management.md)
219
220
### Program Execution { .api }
221
222
Program packaging, execution environments, and job lifecycle management.
223
224
```java
225
// Main cluster client interface
226
public interface ClusterClient<T> extends AutoCloseable {
227
T getClusterId();
228
Configuration getFlinkConfiguration();
229
String getWebInterfaceURL();
230
CompletableFuture<Collection<JobStatusMessage>> listJobs();
231
CompletableFuture<JobID> submitJob(JobGraph jobGraph);
232
CompletableFuture<JobStatus> getJobStatus(JobID jobId);
233
CompletableFuture<JobResult> requestJobResult(JobID jobId);
234
CompletableFuture<Acknowledge> cancel(JobID jobId);
235
CompletableFuture<String> triggerSavepoint(JobID jobId, String savepointDirectory);
236
}
237
238
// Packaged program representation
239
public class PackagedProgram implements AutoCloseable {
240
public String getMainClassName() { }
241
public String[] getArguments() { }
242
public ClassLoader getUserCodeClassLoader() { }
243
public Configuration getConfiguration() { }
244
245
public static Builder newBuilder() { }
246
247
public static class Builder {
248
public Builder setJarFile(File jarFile) { }
249
public Builder setEntryPointClassName(String entryPointClassName) { }
250
public Builder setArguments(String... arguments) { }
251
public Builder setConfiguration(Configuration configuration) { }
252
public PackagedProgram build() { }
253
}
254
}
255
256
// Client utilities
257
public enum ClientUtils {
258
public static void executeProgram(PipelineExecutorServiceLoader executorServiceLoader,
259
Configuration configuration,
260
PackagedProgram program,
261
boolean enforceSingleJobExecution,
262
boolean suppressSysout);
263
264
public static URLClassLoader buildUserCodeClassLoader(List<URL> jars,
265
List<URL> classpaths,
266
ClassLoader parent,
267
Configuration configuration);
268
}
269
```
270
271
**Required imports:**
272
```java
273
import org.apache.flink.client.program.ClusterClient;
274
import org.apache.flink.client.program.PackagedProgram;
275
import org.apache.flink.client.ClientUtils;
276
import org.apache.flink.api.common.JobID;
277
import org.apache.flink.runtime.jobgraph.JobGraph;
278
import org.apache.flink.runtime.jobmaster.JobResult;
279
import org.apache.flink.api.common.JobStatus;
280
import org.apache.flink.runtime.messages.Acknowledge;
281
import org.apache.flink.core.execution.PipelineExecutorServiceLoader;
282
import org.apache.flink.configuration.Configuration;
283
import java.util.concurrent.CompletableFuture;
284
import java.util.Collection;
285
import java.io.File;
286
import java.net.URL;
287
import java.util.List;
288
```
289
290
[Program Execution Documentation](./program-execution.md)
291
292
### REST Client Communication { .api }
293
294
REST-based cluster communication with retry logic and SSL support.
295
296
```java
297
// REST cluster client
298
public class RestClusterClient<T> implements ClusterClient<T> {
299
public RestClusterClient(Configuration configuration,
300
RestClusterClientConfiguration restClusterClientConfiguration,
301
T clusterId) { }
302
303
public RestClusterClient(Configuration configuration,
304
RestClusterClientConfiguration restClusterClientConfiguration,
305
T clusterId,
306
WaitStrategy waitStrategy) { }
307
}
308
309
// REST client configuration
310
public class RestClusterClientConfiguration {
311
public static RestClusterClientConfiguration fromConfiguration(Configuration config) { }
312
public long getConnectionTimeout() { }
313
public long getIdlenessTimeout() { }
314
public int getMaxRetryAttempts() { }
315
public long getRetryDelay() { }
316
public AwaitingTime getAwaitLeaderTimeout() { }
317
}
318
319
// Wait strategy for retry logic
320
public interface WaitStrategy {
321
long sleepTime(long attempt);
322
}
323
324
// Exponential backoff implementation
325
public class ExponentialWaitStrategy implements WaitStrategy {
326
public ExponentialWaitStrategy(long initialWait, long maxWait) { }
327
public long sleepTime(long attempt) { }
328
}
329
```
330
331
**Required imports:**
332
```java
333
import org.apache.flink.client.program.rest.RestClusterClient;
334
import org.apache.flink.client.program.rest.RestClusterClientConfiguration;
335
import org.apache.flink.client.program.rest.retry.WaitStrategy;
336
import org.apache.flink.client.program.rest.retry.ExponentialWaitStrategy;
337
import org.apache.flink.configuration.Configuration;
338
import org.apache.flink.util.concurrent.ScheduledExecutor;
339
```
340
341
[REST Client Communication Documentation](./rest-client-communication.md)
342
343
### Application Deployment { .api }
344
345
Specialized deployment utilities for application cluster mode.
346
347
```java
348
// Application configuration
349
public class ApplicationConfiguration {
350
public ApplicationConfiguration(String[] programArguments, String entryPointClassName) { }
351
public ApplicationConfiguration(String[] programArguments,
352
String entryPointClassName,
353
SavepointRestoreSettings savepointRestoreSettings) { }
354
355
public String[] getProgramArguments() { }
356
public String getEntryPointClassName() { }
357
public SavepointRestoreSettings getSavepointRestoreSettings() { }
358
}
359
360
// Application deployer interface
361
public interface ApplicationDeployer {
362
void run(Configuration effectiveConfiguration, ApplicationConfiguration applicationConfiguration);
363
}
364
365
// Application cluster deployer
366
public class ApplicationClusterDeployer implements ApplicationDeployer {
367
public ApplicationClusterDeployer(ClusterClientServiceLoader clusterClientServiceLoader) { }
368
public void run(Configuration configuration, ApplicationConfiguration applicationConfiguration) { }
369
}
370
371
// Application cluster entry point
372
public class ApplicationClusterEntryPoint extends ClusterEntrypoint {
373
public static void main(String[] args) { }
374
}
375
```
376
377
**Required imports:**
378
```java
379
import org.apache.flink.client.deployment.application.ApplicationConfiguration;
380
import org.apache.flink.client.deployment.application.ApplicationClusterDeployer;
381
import org.apache.flink.client.deployment.application.ApplicationClusterEntryPoint;
382
import org.apache.flink.client.cli.ApplicationDeployer;
383
import org.apache.flink.client.deployment.ClusterClientServiceLoader;
384
import org.apache.flink.runtime.state.StateBackend;
385
import org.apache.flink.configuration.Configuration;
386
```
387
388
### Pipeline Translation { .api }
389
390
Pipeline translation services for converting high-level Flink programs into executable job graphs.
391
392
```java
393
// Pipeline translator interface
394
public interface FlinkPipelineTranslator {
395
JobGraph translateToJobGraph(Pipeline pipeline, Configuration optimizerConfiguration, int defaultParallelism);
396
String translateToJSONExecutionPlan(Pipeline pipeline);
397
boolean canTranslate(Pipeline pipeline);
398
}
399
400
// Pipeline translation utilities
401
public final class FlinkPipelineTranslationUtil {
402
public static JobGraph getJobGraph(Pipeline pipeline, Configuration optimizerConfiguration, int defaultParallelism) { }
403
public static JobGraph getJobGraphUnderUserClassLoader(ClassLoader userClassloader,
404
Pipeline pipeline,
405
Configuration configuration,
406
int defaultParallelism) { }
407
public static String translateToJSONExecutionPlan(Pipeline pipeline) { }
408
}
409
410
// Stream graph translator
411
public class StreamGraphTranslator implements FlinkPipelineTranslator {
412
public JobGraph translateToJobGraph(Pipeline pipeline, Configuration optimizerConfiguration, int defaultParallelism) { }
413
public String translateToJSONExecutionPlan(Pipeline pipeline) { }
414
public boolean canTranslate(Pipeline pipeline) { }
415
}
416
```
417
418
**Required imports:**
419
```java
420
import org.apache.flink.client.FlinkPipelineTranslator;
421
import org.apache.flink.client.FlinkPipelineTranslationUtil;
422
import org.apache.flink.client.StreamGraphTranslator;
423
import org.apache.flink.api.dag.Pipeline;
424
import org.apache.flink.runtime.jobgraph.JobGraph;
425
import org.apache.flink.configuration.Configuration;
426
```