0
# Client Core
1
2
Core client utilities and translators for Flink pipelines, providing essential functionality for program execution, pipeline translation, and classloader management.
3
4
## Capabilities
5
6
### Client Utils
7
8
Utility functions for Flink client operations including classloader creation, program execution, and job result handling.
9
10
```java { .api }
11
/**
12
* Utility functions for Flink client operations
13
*/
14
public enum ClientUtils {
15
/**
16
* Creates user code classloader with specified JARs and classpaths
17
* @param jars List of JAR URLs to include
18
* @param classpaths List of classpath URLs to include
19
* @param parent Parent classloader
20
* @param configuration Flink configuration
21
* @return URLClassLoader for user code execution
22
*/
23
public static URLClassLoader buildUserCodeClassLoader(
24
List<URL> jars,
25
List<URL> classpaths,
26
ClassLoader parent,
27
Configuration configuration);
28
29
/**
30
* Executes a packaged program using the provided executor service
31
* @param executorServiceLoader Service loader for pipeline executors
32
* @param configuration Flink configuration
33
* @param program Packaged program to execute
34
* @param enforceSingleJobExecution Whether to enforce single job execution
35
* @param suppressSysout Whether to suppress system output during execution
36
*/
37
public static void executeProgram(
38
PipelineExecutorServiceLoader executorServiceLoader,
39
Configuration configuration,
40
PackagedProgram program,
41
boolean enforceSingleJobExecution,
42
boolean suppressSysout);
43
44
/**
45
* Blocks until job initialization has finished, i.e. until the job status is no longer INITIALIZING
46
* @param jobStatusSupplier Supplier returning the job status
47
* @param jobResultSupplier Supplier returning the job result (called only if the job reaches the FAILED state)
48
* @param userCodeClassloader User code classloader for exception deserialization
49
* @throws JobInitializationException If the initialization failed
50
*/
51
public static void waitUntilJobInitializationFinished(
52
SupplierWithException<JobStatus, Exception> jobStatusSupplier,
53
SupplierWithException<JobResult, Exception> jobResultSupplier,
54
ClassLoader userCodeClassloader) throws JobInitializationException;
55
56
}
57
```
58
59
**Usage Example:**
60
61
```java
62
import org.apache.flink.client.ClientUtils;
63
import org.apache.flink.configuration.Configuration;
64
65
// Create user code classloader
66
List<URL> jars = Arrays.asList(new File("user-job.jar").toURI().toURL());
67
List<URL> classpaths = Arrays.asList(new File("lib/").toURI().toURL());
68
URLClassLoader userClassLoader = ClientUtils.buildUserCodeClassLoader(
69
jars, classpaths, Thread.currentThread().getContextClassLoader(), new Configuration());
70
71
// Execute a program
72
Configuration config = new Configuration();
73
PackagedProgram program = PackagedProgram.newBuilder()
74
.setJarFile(new File("user-job.jar"))
75
.build();
76
77
PipelineExecutorServiceLoader executorLoader =
78
DefaultExecutorServiceLoader.INSTANCE;
79
ClientUtils.executeProgram(executorLoader, config, program, false, false);
80
```
81
82
### Pipeline Translation
83
84
Interface and utilities for translating Flink pipelines into executable job graphs.
85
86
```java { .api }
87
/**
88
* Interface for translating Flink pipelines into executable job graphs
89
*/
90
public interface FlinkPipelineTranslator {
91
/**
92
* Translates a pipeline into a JobGraph
93
* @param pipeline The pipeline to translate (DataStream or DataSet)
94
* @param optimizerConfiguration Configuration for optimization
95
* @param defaultParallelism Default parallelism for operations
96
* @return JobGraph ready for execution
97
*/
98
JobGraph translateToJobGraph(
99
Pipeline pipeline,
100
Configuration optimizerConfiguration,
101
int defaultParallelism);
102
}
103
104
/**
105
* Utility methods for pipeline translation
106
*/
107
public class FlinkPipelineTranslationUtil {
108
/**
109
* Gets JobGraph from pipeline with configuration
110
* @param pipeline Pipeline to translate
111
* @param configuration Flink configuration
112
* @param defaultParallelism Default parallelism
113
* @return Translated JobGraph
114
*/
115
public static JobGraph getJobGraph(
116
Pipeline pipeline,
117
Configuration configuration,
118
int defaultParallelism);
119
}
120
```
121
122
**Usage Example:**
123
124
```java
125
import org.apache.flink.client.FlinkPipelineTranslationUtil;
126
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
127
import org.apache.flink.streaming.api.datastream.DataStream;
128
129
// Create a DataStream pipeline
130
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
131
DataStream<String> stream = env.socketTextStream("localhost", 9999);
132
stream.print();
133
134
// Translate to JobGraph
135
Pipeline pipeline = env.getStreamGraph();
136
Configuration config = new Configuration();
137
JobGraph jobGraph = FlinkPipelineTranslationUtil.getJobGraph(pipeline, config, 1);
138
```
139
140
### DataSet Plan Translation
141
142
Translator that converts DataSet API plans into job graphs for batch processing workloads.
143
144
```java { .api }
145
/**
146
* Translates DataSet API plans to job graphs
147
*/
148
public class PlanTranslator implements FlinkPipelineTranslator {
149
/**
150
* Translates DataSet plan to JobGraph
151
* @param pipeline DataSet pipeline to translate
152
* @param optimizerConfiguration Optimizer configuration
153
* @param defaultParallelism Default parallelism for operations
154
* @return JobGraph for DataSet execution
155
*/
156
@Override
157
public JobGraph translateToJobGraph(
158
Pipeline pipeline,
159
Configuration optimizerConfiguration,
160
int defaultParallelism);
161
}
162
```
163
164
### Stream Graph Translation
165
166
Translator that converts stream graphs into job graphs for streaming workloads.
167
168
```java { .api }
169
/**
170
* Translates stream graphs to job graphs
171
*/
172
public class StreamGraphTranslator implements FlinkPipelineTranslator {
173
/**
174
* Translates StreamGraph to JobGraph
175
* @param pipeline StreamGraph to translate
176
* @param optimizerConfiguration Optimizer configuration
177
* @param defaultParallelism Default parallelism for operations
178
* @return JobGraph for streaming execution
179
*/
180
@Override
181
public JobGraph translateToJobGraph(
182
Pipeline pipeline,
183
Configuration optimizerConfiguration,
184
int defaultParallelism);
185
}
186
```
187
188
## Types
189
190
```java { .api }
191
public interface Pipeline {
192
// Base interface for DataStream and DataSet pipelines
193
}
194
195
public interface SupplierWithException<T, E extends Exception> {
196
T get() throws E;
197
}
198
199
public interface WaitStrategy {
200
long sleepTime(long attemptCount);
201
}
202
203
public interface PipelineExecutorServiceLoader {
204
Stream<PipelineExecutorFactory> getExecutorFactories();
205
}
206
207
public class DefaultExecutorServiceLoader implements PipelineExecutorServiceLoader {
208
public static final DefaultExecutorServiceLoader INSTANCE;
209
210
@Override
211
public Stream<PipelineExecutorFactory> getExecutorFactories();
212
}
213
214
public interface JobClient extends AutoCloseable {
215
JobID getJobId();
216
CompletableFuture<JobStatus> getJobStatus();
217
CompletableFuture<JobResult> getJobExecutionResult();
218
CompletableFuture<Void> cancel();
219
}
220
221
public class JobInitializationException extends Exception {
222
public JobInitializationException(String message);
223
public JobInitializationException(String message, Throwable cause);
224
}
225
```