0
# Application Deployment
1
2
Application-specific deployment classes for running Flink applications in application mode with full lifecycle management. Provides specialized executors, job clients, and deployment utilities for application clusters.
3
4
## Capabilities
5
6
### Application Configuration
7
8
Configuration class for application deployment containing program arguments, class information, and classpaths.
9
10
```java { .api }
11
/**
12
* Configuration for application deployment
13
*/
14
public class ApplicationConfiguration {
15
/**
16
* Gets the program arguments for the application
17
* @return Array of program arguments
18
*/
19
public String[] getProgramArguments();
20
21
/**
22
* Gets the application main class name
23
* @return Fully qualified application class name
24
*/
25
public String getApplicationClassName();
26
27
/**
28
* Gets the application classpaths
29
* @return List of classpath strings
30
*/
31
public List<String> getApplicationClasspaths();
32
33
/**
34
* Builder for creating ApplicationConfiguration instances
35
*/
36
public static class ApplicationConfigurationBuilder {
37
/**
38
* Sets the application class name
39
* @param applicationClassName Fully qualified class name
40
* @return This builder instance
41
*/
42
public ApplicationConfigurationBuilder setApplicationClassName(String applicationClassName);
43
44
/**
45
* Sets the program arguments
46
* @param programArguments Array of program arguments
47
* @return This builder instance
48
*/
49
public ApplicationConfigurationBuilder setProgramArguments(String[] programArguments);
50
51
/**
52
* Sets the application classpaths
53
* @param applicationClasspaths List of classpath strings
54
* @return This builder instance
55
*/
56
public ApplicationConfigurationBuilder setApplicationClasspaths(List<String> applicationClasspaths);
57
58
/**
59
* Builds the ApplicationConfiguration
60
* @return ApplicationConfiguration instance
61
*/
62
public ApplicationConfiguration build();
63
}
64
}
65
```
66
67
**Usage Example:**
68
69
```java
70
import org.apache.flink.client.deployment.application.ApplicationConfiguration;
71
72
// Create application configuration
73
ApplicationConfiguration appConfig = new ApplicationConfiguration.ApplicationConfigurationBuilder()
74
.setApplicationClassName("com.example.MyStreamingApplication")
75
.setProgramArguments(new String[]{"--parallelism", "4", "--input", "/data/stream"})
76
.setApplicationClasspaths(Arrays.asList("/path/to/app.jar", "/path/to/lib"))
77
.build();
78
79
System.out.println("Application class: " + appConfig.getApplicationClassName());
80
System.out.println("Arguments: " + Arrays.toString(appConfig.getProgramArguments()));
81
```
82
83
### Application Runner Interface
84
85
Interface for running applications with dispatcher integration.
86
87
```java { .api }
88
/**
89
* Runner for application execution
90
*/
91
public interface ApplicationRunner {
92
/**
93
* Runs an application with the given dispatcher, program, and configuration
94
* @param dispatcherGateway Gateway to the dispatcher
95
* @param program Packaged program to run
96
* @param configuration Flink configuration
97
* @return CompletableFuture that completes when application finishes
98
*/
99
CompletableFuture<Void> run(
100
DispatcherGateway dispatcherGateway,
101
PackagedProgram program,
102
Configuration configuration);
103
}
104
105
/**
106
* Runs applications in detached mode
107
*/
108
public class DetachedApplicationRunner implements ApplicationRunner {
109
@Override
110
public CompletableFuture<Void> run(
111
DispatcherGateway dispatcherGateway,
112
PackagedProgram program,
113
Configuration configuration);
114
}
115
```
116
117
### Application Cluster Entry Point
118
119
Entry point class for application clusters providing cluster startup and lifecycle management.
120
121
```java { .api }
122
/**
123
* Entry point for application clusters
124
*/
125
public class ApplicationClusterEntryPoint extends ClusterEntrypoint {
126
/**
127
* Main method for starting application clusters
128
* @param args Command line arguments
129
*/
130
public static void main(String[] args);
131
132
// Extends ClusterEntrypoint with application-specific initialization
133
}
134
```
135
136
### Application Job Clients
137
138
Specialized job clients for application mode execution.
139
140
```java { .api }
141
/**
142
* Embedded job client for application mode
143
*/
144
public class EmbeddedJobClient implements JobClient, CoordinationRequestGateway {
145
/**
146
* Creates embedded job client
147
* @param jobId Job ID
148
* @param jobResultFuture Future containing job result
149
* @param userCodeClassloader Classloader for user code
150
* @param coordinationRequestGateway Gateway for coordination requests
151
*/
152
public EmbeddedJobClient(
153
JobID jobId,
154
CompletableFuture<JobResult> jobResultFuture,
155
ClassLoader userCodeClassloader,
156
CoordinationRequestGateway coordinationRequestGateway);
157
158
// Implements JobClient interface for embedded execution
159
}
160
161
/**
162
* Job client for web submission
163
*/
164
public class WebSubmissionJobClient implements JobClient {
165
// Implements JobClient interface for web-based submission
166
// Used when submitting applications through REST API
167
}
168
```
169
170
### Entry Class Information Providers
171
172
Providers for extracting entry class information from different sources.
173
174
```java { .api }
175
/**
176
* Provides entry class information
177
*/
178
public interface EntryClassInformationProvider {
179
/**
180
* Gets the job class name
181
* @return Fully qualified job class name
182
*/
183
String getJobClassName();
184
185
/**
186
* Gets the job parameters
187
* @return List of job parameter strings
188
*/
189
List<String> getJobParameters();
190
}
191
192
/**
193
* Entry class information from classpath
194
*/
195
public class FromClasspathEntryClassInformationProvider implements EntryClassInformationProvider {
196
/**
197
* Creates provider from classpath configuration
198
* @param programClassName Program class name from classpath
199
* @param programArguments Program arguments
200
*/
201
public FromClasspathEntryClassInformationProvider(String programClassName, String[] programArguments);
202
203
@Override
204
public String getJobClassName();
205
206
@Override
207
public List<String> getJobParameters();
208
}
209
210
/**
211
* Entry class information from JAR manifest
212
*/
213
public class FromJarEntryClassInformationProvider implements EntryClassInformationProvider {
214
/**
215
* Creates provider from JAR file
216
* @param jarFile JAR file to analyze
217
* @param programArguments Program arguments
218
*/
219
public FromJarEntryClassInformationProvider(File jarFile, String[] programArguments);
220
221
@Override
222
public String getJobClassName();
223
224
@Override
225
public List<String> getJobParameters();
226
}
227
```
228
229
### JAR Manifest Parser
230
231
Utility for parsing JAR manifest files to extract entry class information.
232
233
```java { .api }
234
/**
235
* Parser for JAR manifest information
236
*/
237
public class JarManifestParser {
238
/**
239
* Finds entry class in JAR manifest
240
* @param jarFile JAR file to analyze
241
* @return Optional containing entry class name, empty if not found
242
*/
243
public static Optional<String> findEntryClass(JarFile jarFile);
244
}
245
```
246
247
**Usage Example:**
248
249
```java
250
import org.apache.flink.client.deployment.application.JarManifestParser;
251
import java.util.jar.JarFile;
252
253
// Parse JAR manifest for entry class
254
JarFile jarFile = new JarFile(new File("my-application.jar"));
255
Optional<String> entryClass = JarManifestParser.findEntryClass(jarFile);
256
257
if (entryClass.isPresent()) {
258
System.out.println("Found entry class: " + entryClass.get());
259
260
// Create entry class provider
261
EntryClassInformationProvider provider = new FromJarEntryClassInformationProvider(
262
new File("my-application.jar"),
263
new String[]{"--config", "production"}
264
);
265
266
System.out.println("Job class: " + provider.getJobClassName());
267
System.out.println("Parameters: " + provider.getJobParameters());
268
}
269
```
270
271
### Application Executors
272
273
Specialized executors for application mode deployment and execution.
274
275
```java { .api }
276
/**
277
* Executor for embedded application execution
278
*/
279
public class EmbeddedExecutor implements PipelineExecutor {
280
@Override
281
public CompletableFuture<JobClient> execute(
282
Pipeline pipeline,
283
Configuration configuration,
284
ClassLoader userCodeClassloader);
285
}
286
287
/**
288
* Factory for embedded executors
289
*/
290
public class EmbeddedExecutorFactory implements PipelineExecutorFactory {
291
@Override
292
public String getName();
293
294
@Override
295
public boolean isCompatibleWith(Configuration configuration);
296
297
@Override
298
public PipelineExecutor getExecutor(Configuration configuration);
299
}
300
301
/**
302
* Service loader for embedded executors
303
*/
304
public class EmbeddedExecutorServiceLoader implements PipelineExecutorServiceLoader {
305
@Override
306
public Stream<PipelineExecutorFactory> getExecutorFactories();
307
}
308
309
/**
310
* Factory for web submission executors
311
*/
312
public class WebSubmissionExecutorFactory implements PipelineExecutorFactory {
313
@Override
314
public String getName();
315
316
@Override
317
public boolean isCompatibleWith(Configuration configuration);
318
319
@Override
320
public PipelineExecutor getExecutor(Configuration configuration);
321
}
322
```
323
324
### Job Status Polling Utilities
325
326
Utilities for polling job status and handling job completion.
327
328
```java { .api }
329
/**
330
* Utilities for polling job status
331
*/
332
public class JobStatusPollingUtils {
333
/**
334
* Polls job result asynchronously
335
* @param jobStatusSupplier Supplier for job status
336
* @param jobResultSupplier Supplier for job result
337
* @param userCodeClassloader Classloader for user code
338
* @return CompletableFuture containing final job result
339
*/
340
public static CompletableFuture<JobResult> pollJobResultAsync(
341
Supplier<CompletableFuture<JobStatus>> jobStatusSupplier,
342
Supplier<CompletableFuture<JobResult>> jobResultSupplier,
343
ClassLoader userCodeClassloader);
344
}
345
```
346
347
### Application Dispatcher Components
348
349
Dispatcher-related components for application cluster management.
350
351
```java { .api }
352
/**
353
* Bootstrap for application dispatcher
354
*/
355
public class ApplicationDispatcherBootstrap implements DispatcherBootstrap {
356
// Implements dispatcher bootstrap for application mode
357
}
358
359
/**
360
* Factory for application dispatcher gateway services
361
*/
362
public class ApplicationDispatcherGatewayServiceFactory
363
extends AbstractDispatcherLeaderService.DispatcherGatewayServiceFactory {
364
// Creates dispatcher gateway services for application mode
365
}
366
367
/**
368
* Factory for application dispatcher leader process factory
369
*/
370
public class ApplicationDispatcherLeaderProcessFactoryFactory
371
implements DispatcherResourceManagerComponentFactory.DispatcherLeaderProcessFactoryFactory {
372
// Creates dispatcher leader process factories for application mode
373
}
374
```
375
376
### CLI Application Deployer
377
378
CLI-specific deployer for application clusters.
379
380
```java { .api }
381
/**
382
* Deploys application clusters via CLI
383
*/
384
public class ApplicationClusterDeployer implements ApplicationDeployer {
385
@Override
386
public <ClusterID> void run(
387
Configuration configuration,
388
ApplicationConfiguration applicationConfiguration);
389
}
390
```
391
392
## Exception Types
393
394
```java { .api }
395
/**
396
* Exception for application execution failures
397
*/
398
public class ApplicationExecutionException extends FlinkException {
399
/**
400
* Creates exception with message
401
* @param message Error message
402
*/
403
public ApplicationExecutionException(String message);
404
405
/**
406
* Creates exception with message and cause
407
* @param message Error message
408
* @param cause Root cause throwable
409
*/
410
public ApplicationExecutionException(String message, Throwable cause);
411
}
412
413
/**
414
* Exception for unsuccessful execution
415
*/
416
public class UnsuccessfulExecutionException extends JobExecutionException {
417
/**
418
* Creates exception with message, job ID, and cause
419
* @param message Error message
420
* @param jobId Job ID that failed
421
* @param cause Root cause throwable
422
*/
423
public UnsuccessfulExecutionException(String message, JobID jobId, Throwable cause);
424
}
425
```
426
427
## Types
428
429
```java { .api }
430
public interface ApplicationDeployer {
431
<ClusterID> void run(Configuration configuration, ApplicationConfiguration applicationConfiguration);
432
}
433
434
public interface DispatcherGateway {
435
// Gateway interface for dispatcher communication
436
CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, Duration timeout);
437
CompletableFuture<Collection<JobStatusMessage>> listJobs(Duration timeout);
438
}
439
440
public interface CoordinationRequestGateway {
441
CompletableFuture<CoordinationResponse> sendCoordinationRequest(
442
JobID jobId,
443
OperatorID operatorId,
444
CoordinationRequest request);
445
}
446
447
public interface EmbeddedJobClientCreator {
448
CompletableFuture<JobClient> getJobClient(JobID jobId);
449
}
450
451
public abstract class ClusterEntrypoint {
452
protected abstract DispatcherResourceManagerComponentFactory createDispatcherResourceManagerComponentFactory(Configuration configuration);
453
public static void main(String[] args);
454
}
455
456
public interface DispatcherBootstrap {
457
CompletableFuture<Void> initializeServices() throws Exception;
458
}
459
460
public interface PipelineExecutor {
461
CompletableFuture<JobClient> execute(Pipeline pipeline, Configuration configuration, ClassLoader userCodeClassloader);
462
}
463
464
public interface PipelineExecutorFactory {
465
String getName();
466
boolean isCompatibleWith(Configuration configuration);
467
PipelineExecutor getExecutor(Configuration configuration);
468
}
469
470
public interface PipelineExecutorServiceLoader {
471
Stream<PipelineExecutorFactory> getExecutorFactories();
472
}
473
474
public class JobExecutionException extends FlinkException {
475
public JobExecutionException(String message);
476
public JobExecutionException(String message, Throwable cause);
477
}
478
```