0
# Apache Flink Clients
1
2
Apache Flink Clients provides essential client-side APIs and utilities for interacting with Apache Flink clusters, enabling developers to submit, monitor, and manage stream processing and batch processing jobs programmatically. This library includes comprehensive client utilities for deploying packaged programs, managing job lifecycles, translating execution plans, and handling cluster communication through various deployment targets including YARN, Kubernetes, and standalone clusters.
3
4
## Package Information
5
6
- **Package Name**: org.apache.flink/flink-clients_2.12
7
- **Package Type**: maven
8
- **Language**: Java (Scala 2.12 compatible)
9
- **Version**: 1.14.6
10
- **Installation**: Add to your Maven `pom.xml`:
11
12
```xml
13
<dependency>
14
<groupId>org.apache.flink</groupId>
15
<artifactId>flink-clients_2.12</artifactId>
16
<version>1.14.6</version>
17
</dependency>
18
```
19
20
## Core Imports
21
22
```java
23
import org.apache.flink.client.program.ClusterClient;
24
import org.apache.flink.client.program.PackagedProgram;
25
import org.apache.flink.client.deployment.ClusterDescriptor;
26
import org.apache.flink.client.deployment.ClusterClientFactory;
27
import org.apache.flink.client.cli.CliFrontend;
28
import org.apache.flink.configuration.Configuration;
29
import org.apache.flink.api.common.JobID;
30
import org.apache.flink.runtime.jobgraph.JobGraph;
31
```
32
33
## Basic Usage
34
35
```java
36
import org.apache.flink.client.program.PackagedProgram;
37
import org.apache.flink.client.program.ClusterClient;
38
import org.apache.flink.client.deployment.StandaloneClusterDescriptor;
39
import org.apache.flink.client.deployment.StandaloneClusterId;
40
import org.apache.flink.configuration.Configuration;
41
import org.apache.flink.api.common.JobID;
42
43
// Create a packaged program from JAR
44
PackagedProgram program = PackagedProgram.newBuilder()
45
.setJarFile(new File("my-flink-job.jar"))
46
.setEntryPointClassName("com.example.MyFlinkJob")
47
.setArguments(new String[]{"arg1", "arg2"})
48
.build();
49
50
// Connect to a standalone cluster
51
Configuration config = new Configuration();
52
StandaloneClusterDescriptor clusterDescriptor =
53
new StandaloneClusterDescriptor(config);
54
StandaloneClusterId clusterId = new StandaloneClusterId();
55
56
try (ClusterClient<StandaloneClusterId> client =
57
clusterDescriptor.retrieve(clusterId).getClusterClient()) {
58
59
// Submit job and monitor
60
JobID jobId = client.submitJob(program.getJobGraph()).get();
61
System.out.println("Job submitted with ID: " + jobId);
62
63
// Wait for completion
64
client.requestJobResult(jobId).get();
65
}
66
```
67
68
## Architecture
69
70
Apache Flink Clients is built around several key components:
71
72
- **Cluster Management**: `ClusterClient` and `ClusterDescriptor` interfaces provide abstractions for different deployment targets (standalone, YARN, Kubernetes)
73
- **Program Packaging**: `PackagedProgram` encapsulates user JAR files with their classpaths, main classes, and execution parameters
74
- **Pipeline Translation**: Translators convert Flink DataStream and DataSet APIs into executable `JobGraph` instances
75
- **Execution Environments**: Context environments bridge user code with cluster execution infrastructure
76
- **CLI Framework**: Command line interfaces for job submission, monitoring, and cluster management operations
77
- **REST Integration**: REST clients enable communication with Flink clusters through HTTP APIs
78
79
## Capabilities
80
81
### Core Client Utilities
82
83
Essential client utilities and translators for Flink pipelines, including program execution and pipeline translation functionality.
84
85
```java { .api }
86
public enum ClientUtils {
87
;
88
89
public static URLClassLoader buildUserCodeClassLoader(
90
List<URL> jars,
91
List<URL> classpaths,
92
ClassLoader parent,
93
Configuration configuration);
94
95
public static void executeProgram(
96
PipelineExecutorServiceLoader executorServiceLoader,
97
Configuration configuration,
98
PackagedProgram program,
99
boolean enforceSingleJobExecution,
100
boolean suppressSysout);
101
102
public static void waitUntilJobInitializationFinished(
103
SupplierWithException<JobStatus, Exception> jobStatusSupplier,
104
SupplierWithException<JobResult, Exception> jobResultSupplier,
105
ClassLoader userCodeClassloader) throws JobInitializationException;
106
}
107
108
public interface FlinkPipelineTranslator {
109
JobGraph translateToJobGraph(
110
Pipeline pipeline,
111
Configuration optimizerConfiguration,
112
int defaultParallelism);
113
}
114
```
115
116
[Client Core](./client-core.md)
117
118
### Cluster Deployment and Management
119
120
Cluster deployment and management functionality for various deployment targets including standalone, containerized, and cloud environments.
121
122
```java { .api }
123
public interface ClusterClient<T> extends AutoCloseable {
124
T getClusterId();
125
Configuration getFlinkConfiguration();
126
CompletableFuture<JobID> submitJob(JobGraph jobGraph);
127
CompletableFuture<JobStatus> getJobStatus(JobID jobId);
128
CompletableFuture<JobResult> requestJobResult(JobID jobId);
129
CompletableFuture<Acknowledge> cancel(JobID jobId);
130
CompletableFuture<String> triggerSavepoint(JobID jobId, @Nullable String savepointDirectory);
131
}
132
133
public interface ClusterDescriptor<T> extends AutoCloseable {
134
ClusterClientProvider<T> retrieve(T clusterId) throws ClusterRetrieveException;
135
ClusterClientProvider<T> deploySessionCluster(ClusterSpecification clusterSpecification);
136
ClusterClientProvider<T> deployJobCluster(ClusterSpecification clusterSpecification, JobGraph jobGraph, boolean detached);
137
}
138
```
139
140
[Cluster Management](./cluster-management.md)
141
142
### Program Packaging and Execution
143
144
Program packaging, classloader management, and execution utilities for submitting user applications to Flink clusters.
145
146
```java { .api }
147
public class PackagedProgram implements AutoCloseable {
148
public static Builder newBuilder();
149
public String[] getArguments();
150
public String getMainClassName();
151
public List<URL> getClasspaths();
152
public ClassLoader getUserCodeClassLoader();
153
public List<URL> getJobJarAndDependencies();
154
public void invokeInteractiveModeForExecution() throws ProgramInvocationException;
155
156
public static class Builder {
157
public Builder setJarFile(File jarFile);
158
public Builder setEntryPointClassName(String entryPointClassName);
159
public Builder setArguments(String... args);
160
public Builder setUserClassPaths(List<URL> userClassPaths);
161
public Builder setConfiguration(Configuration configuration);
162
public PackagedProgram build() throws ProgramInvocationException;
163
}
164
}
165
```
166
167
[Program Execution](./program-execution.md)
168
169
### Application Deployment
170
171
Application-specific deployment classes for running Flink applications in application mode with full lifecycle management.
172
173
```java { .api }
174
public class ApplicationConfiguration {
175
public String[] getProgramArguments();
176
public String getApplicationClassName();
177
public List<String> getApplicationClasspaths();
178
179
public static class ApplicationConfigurationBuilder {
180
public ApplicationConfigurationBuilder setApplicationClassName(String applicationClassName);
181
public ApplicationConfigurationBuilder setProgramArguments(String[] programArguments);
182
public ApplicationConfiguration build();
183
}
184
}
185
186
public interface ApplicationRunner {
187
CompletableFuture<Void> run(
188
DispatcherGateway dispatcherGateway,
189
PackagedProgram program,
190
Configuration configuration);
191
}
192
```
193
194
[Application Deployment](./application-deployment.md)
195
196
### Command Line Interface
197
198
Command line interface classes for Flink client operations including job submission, monitoring, and cluster management through CLI commands.
199
200
```java { .api }
201
public class CliFrontend {
202
public CliFrontend(Configuration configuration, List<CustomCommandLine> customCommandLines);
203
public static void main(String[] args);
204
public int run(String[] args);
205
public int list(String[] args);
206
public int cancel(String[] args);
207
public int stop(String[] args);
208
public int savepoint(String[] args);
209
}
210
211
public interface CustomCommandLine {
212
boolean isActive(CommandLine commandLine);
213
String getId();
214
void addRunOptions(Options options);
215
Configuration applyCommandLineOptionsToConfiguration(CommandLine commandLine);
216
}
217
```
218
219
[CLI Interface](./cli-interface.md)
220
221
### REST Client Implementation
222
223
REST client implementations for communicating with Flink clusters through HTTP APIs, including retry strategies and configuration management.
224
225
```java { .api }
226
public class RestClusterClient<T> implements ClusterClient<T> {
227
public RestClusterClient(Configuration config, T clusterId);
228
public RestClusterClient(
229
Configuration config,
230
RestClusterClientConfiguration clientConfiguration,
231
T clusterId,
232
RetryStrategy retryStrategy);
233
234
// Implements all ClusterClient methods with REST-specific implementations
235
}
236
237
public class RestClusterClientConfiguration {
238
public RestClientConfiguration getRestClientConfiguration();
239
public long getAwaitLeaderTimeout();
240
public int getRetryMaxAttempts();
241
public long getRetryDelay();
242
}
243
```
244
245
[REST Client](./rest-client.md)
246
247
## Types
248
249
```java { .api }
250
public class Configuration {
251
// Core Flink configuration class
252
public <T> T get(ConfigOption<T> option);
253
public <T> void set(ConfigOption<T> option, T value);
254
}
255
256
public class JobGraph {
257
// Represents executable job graph
258
public JobID getJobID();
259
public String getName();
260
}
261
262
public class JobID {
263
// Unique job identifier
264
public static JobID generate();
265
public static JobID fromHexString(String hexString);
266
}
267
268
public enum JobStatus {
269
INITIALIZING, CREATED, RUNNING, FAILING, FAILED,
270
CANCELLING, CANCELED, FINISHED, RESTARTING,
271
SUSPENDED, RECONCILING
272
}
273
274
public class JobResult {
275
public JobID getJobId();
276
public JobStatus getJobExecutionResult();
277
public Optional<Throwable> getSerializedThrowable();
278
}
279
280
public class ClusterSpecification {
281
public int getMasterMemoryMB();
282
public int getTaskManagerMemoryMB();
283
public int getSlotsPerTaskManager();
284
285
public static class ClusterSpecificationBuilder {
286
public ClusterSpecificationBuilder setMasterMemoryMB(int masterMemoryMB);
287
public ClusterSpecificationBuilder setTaskManagerMemoryMB(int taskManagerMemoryMB);
288
public ClusterSpecificationBuilder setSlotsPerTaskManager(int slotsPerTaskManager);
289
public ClusterSpecification createClusterSpecification();
290
}
291
}
292
293
// Exception Types
294
public class ProgramInvocationException extends Exception {
295
public ProgramInvocationException(String message);
296
public ProgramInvocationException(String message, JobID jobID);
297
public ProgramInvocationException(String message, Throwable cause);
298
}
299
300
public class ClusterDeploymentException extends FlinkException {
301
public ClusterDeploymentException(String message);
302
public ClusterDeploymentException(String message, Throwable cause);
303
}
304
305
public class ClusterRetrieveException extends FlinkException {
306
public ClusterRetrieveException(String message);
307
public ClusterRetrieveException(String message, Throwable cause);
308
}
309
310
public class JobInitializationException extends Exception {
311
public JobInitializationException(String message);
312
public JobInitializationException(String message, Throwable cause);
313
}
314
```