# Application Mode Deployment

Specialized deployment mode for long-running applications with dedicated cluster resources, lifecycle management, and optimized resource utilization for application-centric workloads.

## Capabilities

### Application Runner Interface

Core interface for running applications in application mode with full lifecycle management.

```java { .api }
/**
 * Interface for running applications in application mode
 */
public interface ApplicationRunner {
    /**
     * Run application with dispatcher gateway and configuration
     * @param dispatcherGateway Gateway to the dispatcher
     * @param scheduledExecutor Executor for scheduled operations
     * @param applicationConfiguration Application configuration
     * @return Future completing when application finishes
     */
    CompletableFuture<Void> run(
        DispatcherGateway dispatcherGateway,
        ScheduledExecutor scheduledExecutor,
        ApplicationConfiguration applicationConfiguration
    );
}
```

### Detached Application Runner

Application runner implementation for detached execution where the client doesn't wait for job completion.

```java { .api }
/**
 * Application runner for detached execution
 */
public class DetachedApplicationRunner implements ApplicationRunner {
    /**
     * Create detached application runner
     * @param highAvailabilityServices High availability services
     * @param configuration Flink configuration
     * @param mainThreadExecutor Main thread executor
     */
    public DetachedApplicationRunner(
        HighAvailabilityServices highAvailabilityServices,
        Configuration configuration,
        Executor mainThreadExecutor
    );

    @Override
    public CompletableFuture<Void> run(
        DispatcherGateway dispatcherGateway,
        ScheduledExecutor scheduledExecutor,
        ApplicationConfiguration applicationConfiguration
    );
}
```
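
The difference between detached and attached execution can be illustrated without any Flink classes. The following is a minimal sketch, assuming a job exposes one future for submission acknowledgement and one for its final state; `DetachedVsAttachedSketch` and `JobHandle` are hypothetical names introduced only for this illustration and are not part of the API above.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical illustration, not Flink code: detached vs. attached completion semantics. */
class DetachedVsAttachedSketch {

    /** Assumed job handle: one future for the submission ack, one for the final state. */
    static final class JobHandle {
        final CompletableFuture<String> submitted = new CompletableFuture<>(); // completes with a job id
        final CompletableFuture<String> finished = new CompletableFuture<>();  // completes with a final state
    }

    /** Detached: the returned future is done as soon as the job has been accepted. */
    static CompletableFuture<Void> runDetached(JobHandle job) {
        return job.submitted.thenAccept(jobId -> { });
    }

    /** Attached: the returned future is done only after the job reaches its final state. */
    static CompletableFuture<Void> runAttached(JobHandle job) {
        return job.submitted
                .thenCompose(jobId -> job.finished)
                .thenAccept(state -> { });
    }
}
```

In this sketch, completing `submitted` is enough to finish the detached future, while the attached future stays pending until `finished` completes as well.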

### Application Configuration

Configuration class for application deployments containing program arguments, entry point information, and execution settings.

```java { .api }
/**
 * Configuration for application deployments
 */
public class ApplicationConfiguration {
    /**
     * Create configuration builder from Flink configuration
     * @param configuration Flink configuration
     * @return Configuration builder
     */
    public static Builder fromConfiguration(Configuration configuration);

    /**
     * Get program arguments
     * @return Array of program arguments
     */
    public String[] getProgramArguments();

    /**
     * Get application arguments
     * @return Array of application arguments
     */
    public String[] getApplicationArgs();

    /**
     * Get parallelism setting
     * @return Parallelism or null if not set
     */
    @Nullable
    public Integer getParallelism();

    /**
     * Get savepoint restore settings
     * @return Savepoint restore settings
     */
    public SavepointRestoreSettings getSavepointRestoreSettings();

    /**
     * Builder for application configuration
     */
    public static class Builder {
        /**
         * Set program arguments
         * @param programArguments Array of arguments
         * @return Builder instance
         */
        public Builder setProgramArguments(String... programArguments);

        /**
         * Set application arguments
         * @param applicationArgs Array of application arguments
         * @return Builder instance
         */
        public Builder setApplicationArgs(String... applicationArgs);

        /**
         * Set parallelism
         * @param parallelism Parallelism setting
         * @return Builder instance
         */
        public Builder setParallelism(Integer parallelism);

        /**
         * Set savepoint restore settings
         * @param savepointRestoreSettings Savepoint settings
         * @return Builder instance
         */
        public Builder setSavepointRestoreSettings(
            SavepointRestoreSettings savepointRestoreSettings
        );

        /**
         * Build application configuration
         * @return Configured application configuration
         */
        public ApplicationConfiguration build();
    }
}
```

**Usage Examples:**

```java
import org.apache.flink.client.deployment.application.ApplicationConfiguration;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;

// Base Flink configuration; in practice this is usually loaded from the cluster's config
Configuration flinkConfig = new Configuration();

// Create basic application configuration
ApplicationConfiguration config = ApplicationConfiguration
    .fromConfiguration(flinkConfig)
    .setProgramArguments("--input", "input.txt", "--output", "output.txt")
    .setParallelism(4)
    .build();

// Create configuration with savepoint restore
// (second argument: whether state that cannot be mapped to the new job may be skipped)
SavepointRestoreSettings savepointSettings = SavepointRestoreSettings
    .forPath("/path/to/savepoint", false);

ApplicationConfiguration configWithSavepoint = ApplicationConfiguration
    .fromConfiguration(flinkConfig)
    .setProgramArguments("--mode", "batch")
    .setSavepointRestoreSettings(savepointSettings)
    .build();
```
167
168
### Application Cluster Entry Point
169
170
Entry point for application cluster mode providing cluster initialization and application execution coordination.
171
172
```java { .api }
173
/**
174
* Entry point for application cluster mode
175
*/
176
public class ApplicationClusterEntryPoint extends ClusterEntrypoint {
177
/**
178
* Main method for application cluster entry point
179
* @param args Command-line arguments
180
*/
181
public static void main(String[] args);
182
183
/**
184
* Create application cluster entry point
185
* @param configuration Cluster configuration
186
* @param pluginManager Plugin manager
187
*/
188
protected ApplicationClusterEntryPoint(
189
Configuration configuration,
190
PluginManager pluginManager
191
);
192
193
@Override
194
protected DispatcherResourceManagerComponentFactory createDispatcherResourceManagerComponentFactory(
195
Configuration configuration
196
);
197
}
198
```
199
200
### Application Job Clients
201
202
Specialized job client implementations for application mode execution.
203
204
```java { .api }
205
/**
206
* Job client for embedded application execution
207
*/
208
public class EmbeddedJobClient implements JobClient {
209
/**
210
* Create embedded job client
211
* @param jobID Job identifier
212
* @param dispatcher Dispatcher gateway
213
* @param executorService Executor service
214
* @param classLoader User code class loader
215
*/
216
public EmbeddedJobClient(
217
JobID jobID,
218
DispatcherGateway dispatcher,
219
ScheduledExecutorService executorService,
220
ClassLoader classLoader
221
);
222
223
@Override
224
public JobID getJobID();
225
226
@Override
227
public CompletableFuture<JobStatus> getJobStatus();
228
229
@Override
230
public CompletableFuture<Void> cancel();
231
232
@Override
233
public CompletableFuture<String> stopWithSavepoint(
234
boolean advanceToEndOfEventTime,
235
@Nullable String savepointDir,
236
SavepointFormatType formatType
237
);
238
239
@Override
240
public CompletableFuture<JobExecutionResult> getJobExecutionResult();
241
}
242
243
/**
244
* Job client for web submission application execution
245
*/
246
public class WebSubmissionJobClient implements JobClient {
247
/**
248
* Create web submission job client
249
* @param jobSubmissionResult Job submission result
250
* @param jobStatusSupplier Supplier for job status
251
* @param jobResultSupplier Supplier for job result
252
* @param classLoader User code class loader
253
*/
254
public WebSubmissionJobClient(
255
JobSubmissionResult jobSubmissionResult,
256
Supplier<CompletableFuture<JobStatus>> jobStatusSupplier,
257
Supplier<CompletableFuture<JobResult>> jobResultSupplier,
258
ClassLoader classLoader
259
);
260
261
@Override
262
public JobID getJobID();
263
264
@Override
265
public CompletableFuture<JobStatus> getJobStatus();
266
267
@Override
268
public CompletableFuture<JobExecutionResult> getJobExecutionResult();
269
}
270
```
271
272
### Entry Class Information Providers
273
274
System for discovering and providing entry class information from JARs and classpaths.
275
276
```java { .api }
277
/**
278
* Provider for entry class information
279
*/
280
public interface EntryClassInformationProvider {
281
/**
282
* Get program entry point class name
283
* @return Entry point class name or null
284
*/
285
@Nullable
286
String getJobClassName();
287
288
/**
289
* Get JAR file location
290
* @return JAR file or null
291
*/
292
@Nullable
293
File getJarFile();
294
}
295
296
/**
297
* Entry class provider from JAR manifest
298
*/
299
public class FromJarEntryClassInformationProvider implements EntryClassInformationProvider {
300
/**
301
* Create provider from JAR file
302
* @param jarFile JAR file to analyze
303
* @param programArguments Program arguments
304
*/
305
public FromJarEntryClassInformationProvider(File jarFile, String[] programArguments);
306
307
@Override
308
public String getJobClassName();
309
310
@Override
311
public File getJarFile();
312
}
313
314
/**
315
* Entry class provider from classpath scanning
316
*/
317
public class FromClasspathEntryClassInformationProvider implements EntryClassInformationProvider {
318
/**
319
* Create provider from classpath
320
* @param jobClassName Job class name
321
* @param programArguments Program arguments
322
*/
323
public FromClasspathEntryClassInformationProvider(
324
String jobClassName,
325
String[] programArguments
326
);
327
328
@Override
329
public String getJobClassName();
330
331
@Override
332
public File getJarFile();
333
}
334
```
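
To make the idea of discovering an entry class from a JAR manifest concrete, here is a minimal sketch that uses only the JDK's `java.util.jar` API. It assumes the entry class is declared in the standard `Main-Class` manifest attribute; the class `ManifestEntryClassSketch` and its methods are hypothetical helpers for illustration and may differ from how the providers above resolve the class.

```java
import java.io.File;
import java.io.IOException;
import java.util.Optional;
import java.util.jar.Attributes;
import java.util.jar.JarFile;
import java.util.jar.Manifest;

/** Hypothetical helper, not part of Flink: reads the entry class from a JAR manifest. */
class ManifestEntryClassSketch {

    /** Returns the Main-Class attribute of the manifest, if present and non-blank. */
    static Optional<String> fromManifest(Manifest manifest) {
        if (manifest == null) {
            return Optional.empty(); // the JAR has no META-INF/MANIFEST.MF
        }
        String mainClass = manifest.getMainAttributes().getValue(Attributes.Name.MAIN_CLASS);
        return Optional.ofNullable(mainClass)
                .map(String::trim)
                .filter(name -> !name.isEmpty());
    }

    /** Opens the JAR file and looks up its entry class. */
    static Optional<String> fromJar(File jarFile) throws IOException {
        try (JarFile jar = new JarFile(jarFile)) {
            return fromManifest(jar.getManifest());
        }
    }
}
```

Returning `Optional` instead of `null` is a choice made for this sketch only; the documented interface uses `@Nullable` return values.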
335
336
### Application Executors
337
338
Executor implementations specifically designed for application mode execution.
339
340
```java { .api }
341
/**
342
* Executor for embedded application execution
343
*/
344
public class EmbeddedExecutor implements PipelineExecutor {
345
/**
346
* Create embedded executor
347
* @param dispatcherGateway Dispatcher gateway
348
* @param executorService Executor service
349
* @param configuration Flink configuration
350
* @param userCodeClassLoader User code class loader
351
*/
352
public EmbeddedExecutor(
353
DispatcherGateway dispatcherGateway,
354
ScheduledExecutorService executorService,
355
Configuration configuration,
356
ClassLoader userCodeClassLoader
357
);
358
359
@Override
360
public CompletableFuture<JobClient> execute(
361
Pipeline pipeline,
362
Configuration configuration,
363
ClassLoader userCodeClassloader
364
) throws Exception;
365
}
366
367
/**
368
* Factory for embedded executors
369
*/
370
public class EmbeddedExecutorFactory implements PipelineExecutorFactory {
371
@Override
372
public String getName();
373
374
@Override
375
public boolean isCompatibleWith(Configuration configuration);
376
377
@Override
378
public PipelineExecutor getExecutor(Configuration configuration);
379
}
380
```
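
The `getName()` / `isCompatibleWith(...)` pair suggests that an executor factory is chosen by matching the configuration against each factory. The sketch below shows one way such a selection could work. It is an assumption-laden illustration: the `Factory` interface, the plain `Map` used as configuration, and the `execution.target` key are stand-ins chosen for this example, not the library's actual types.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Hypothetical illustration, not Flink code: picking an executor factory by target name. */
class ExecutorFactorySelectionSketch {

    /** Assumed configuration key that names the deployment target. */
    static final String TARGET_KEY = "execution.target";

    /** Stand-in for a pipeline executor factory. */
    interface Factory {
        String getName();

        boolean isCompatibleWith(Map<String, String> configuration);
    }

    /** Creates a factory that matches when the configured target equals its name. */
    static Factory named(String name) {
        return new Factory() {
            @Override
            public String getName() {
                return name;
            }

            @Override
            public boolean isCompatibleWith(Map<String, String> configuration) {
                // A missing target yields null, which equalsIgnoreCase treats as "no match".
                return name.equalsIgnoreCase(configuration.get(TARGET_KEY));
            }
        };
    }

    /** Returns the first factory that declares itself compatible, if any. */
    static Optional<Factory> select(List<Factory> factories, Map<String, String> configuration) {
        return factories.stream()
                .filter(factory -> factory.isCompatibleWith(configuration))
                .findFirst();
    }
}
```

Keeping the compatibility check inside each factory means new executors can be added without changing the selection logic.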
381
382
### Utility Classes
383
384
Utility classes for application mode operations and status monitoring.
385
386
```java { .api }
387
/**
388
* Utilities for polling job status in application mode
389
*/
390
public class JobStatusPollingUtils {
391
/**
392
* Poll job status until completion
393
* @param jobClient Job client
394
* @param scheduledExecutor Scheduled executor
395
* @param timeout Polling timeout
396
* @return Future with final job result
397
*/
398
public static CompletableFuture<JobExecutionResult> pollJobStatusUntilFinished(
399
JobClient jobClient,
400
ScheduledExecutorService scheduledExecutor,
401
Duration timeout
402
);
403
404
/**
405
* Poll job status with custom polling interval
406
* @param jobStatusSupplier Job status supplier
407
* @param scheduledExecutor Scheduled executor
408
* @param pollingInterval Polling interval
409
* @param timeout Total timeout
410
* @return Future with final job status
411
*/
412
public static CompletableFuture<JobStatus> pollJobStatus(
413
Supplier<CompletableFuture<JobStatus>> jobStatusSupplier,
414
ScheduledExecutorService scheduledExecutor,
415
Duration pollingInterval,
416
Duration timeout
417
);
418
}
419
420
/**
421
* Parser for JAR manifest files
422
*/
423
public class JarManifestParser {
424
/**
425
* Find entry class from JAR manifest
426
* @param jarFile JAR file to analyze
427
* @return Entry class name or null
428
*/
429
@Nullable
430
public static String findEntryClass(File jarFile);
431
432
/**
433
* Check if JAR contains main class
434
* @param jarFile JAR file to check
435
* @return true if main class found
436
*/
437
public static boolean hasMainClass(File jarFile);
438
}
439
```
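
Polling a status supplier at a fixed interval with an overall timeout can be done without blocking a thread. The following is a minimal sketch of that technique using only JDK classes; `StatusPollingSketch`, its `Status` enum, and `pollUntilTerminal` are hypothetical names that mirror the shape of the polling signature above, and the behaviour shown is an assumption rather than the library's implementation.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

/** Hypothetical sketch, not Flink code: polls an async status supplier until a terminal state. */
class StatusPollingSketch {

    /** Stand-in for a job status; only the terminal flag matters here. */
    enum Status {
        RUNNING(false), FINISHED(true), FAILED(true), CANCELED(true);

        final boolean terminal;

        Status(boolean terminal) {
            this.terminal = terminal;
        }
    }

    static CompletableFuture<Status> pollUntilTerminal(
            Supplier<CompletableFuture<Status>> statusSupplier,
            ScheduledExecutorService executor,
            Duration pollingInterval,
            Duration timeout) {
        CompletableFuture<Status> result = new CompletableFuture<>();
        // Fail the result if no terminal status shows up within the timeout.
        ScheduledFuture<?> timeoutTask = executor.schedule(
                () -> { result.completeExceptionally(new TimeoutException("polling timed out")); },
                timeout.toMillis(), TimeUnit.MILLISECONDS);
        // Once the result is settled, the pending timeout task is no longer needed.
        result.whenComplete((status, error) -> timeoutTask.cancel(false));
        pollOnce(statusSupplier, executor, pollingInterval, result);
        return result;
    }

    private static void pollOnce(
            Supplier<CompletableFuture<Status>> statusSupplier,
            ScheduledExecutorService executor,
            Duration pollingInterval,
            CompletableFuture<Status> result) {
        if (result.isDone()) {
            return; // timed out or completed elsewhere: stop polling
        }
        CompletableFuture<Status> pending;
        try {
            pending = statusSupplier.get();
        } catch (RuntimeException e) {
            result.completeExceptionally(e); // supplier failed synchronously
            return;
        }
        pending.whenComplete((status, error) -> {
            if (error != null) {
                result.completeExceptionally(error);
            } else if (status.terminal) {
                result.complete(status);
            } else {
                // Not finished yet: schedule the next poll instead of blocking a thread.
                executor.schedule(
                        () -> pollOnce(statusSupplier, executor, pollingInterval, result),
                        pollingInterval.toMillis(), TimeUnit.MILLISECONDS);
            }
        });
    }
}
```

A caller would pass something like `jobClient::getJobStatus` mapped to the sketch's `Status` type as the supplier, and shut down the executor once the returned future completes.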
440
441
## Types
442
443
```java { .api }
444
/**
445
* Job submission result for application mode
446
*/
447
public class JobSubmissionResult {
448
/**
449
* Get submitted job ID
450
* @return Job identifier
451
*/
452
public JobID getJobID();
453
454
/**
455
* Check if submission was successful
456
* @return true if successful
457
*/
458
public boolean isSuccess();
459
}
460
461
/**
462
* Embedded job client creator
463
*/
464
public class EmbeddedJobClientCreator {
465
/**
466
* Create embedded job client
467
* @param dispatcherGateway Dispatcher gateway
468
* @param executorService Executor service
469
* @param archivedExecutionGraph Archived execution graph
470
* @param userCodeClassLoader User code class loader
471
* @return Embedded job client
472
*/
473
public static EmbeddedJobClient create(
474
DispatcherGateway dispatcherGateway,
475
ScheduledExecutorService executorService,
476
ArchivedExecutionGraph archivedExecutionGraph,
477
ClassLoader userCodeClassLoader
478
);
479
}
480
```
481
482
## Exception Handling
483
484
Application mode operations handle specific error conditions:
485
486
- **Application Execution Errors**: `ApplicationExecutionException` for application runtime failures
487
- **Unsuccessful Execution**: `UnsuccessfulExecutionException` for failed job executions
488
- **Configuration Errors**: Invalid application configurations or missing parameters
489
- **Resource Errors**: Insufficient resources for application cluster deployment
490
- **Job Client Errors**: Communication failures with job management systems
491
492
**Error Handling Examples:**
493
494
```java
495
try {
496
ApplicationConfiguration config = ApplicationConfiguration
497
.fromConfiguration(flinkConfig)
498
.setProgramArguments("--input", "data.txt")
499
.build();
500
501
CompletableFuture<Void> applicationResult = runner.run(
502
dispatcherGateway,
503
scheduledExecutor,
504
config
505
);
506
507
applicationResult.get(); // Wait for completion
508
509
} catch (ApplicationExecutionException e) {
510
System.err.println("Application execution failed: " + e.getMessage());
511
} catch (UnsuccessfulExecutionException e) {
512
System.err.println("Job execution unsuccessful: " + e.getMessage());
513
}
514
```
515
516
Application mode provides dedicated cluster resources for long-running applications, enabling better resource isolation, simplified deployment, and optimized lifecycle management compared to session mode deployments.