0
# Execution and Jobs
1
2
Apache Flink Core provides comprehensive APIs for job execution, runtime contexts, and job lifecycle management. These components enable applications to interact with the Flink runtime, access execution metadata, and manage distributed job execution.
3
4
## Job Execution
5
6
### JobExecutionResult
7
8
Access results and statistics after job completion.
9
10
```java { .api }
11
import org.apache.flink.api.common.JobExecutionResult;
12
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
13
14
public class JobExecutionExample {
15
16
public static void basicJobExecution() throws Exception {
17
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
18
19
// Build your job pipeline
20
env.fromElements(1, 2, 3, 4, 5)
21
.map(x -> x * 2)
22
.print();
23
24
// Execute and get result
25
JobExecutionResult result = env.execute("My Job");
26
27
// Access execution information
28
long netRuntime = result.getNetRuntime();
29
String jobName = result.getJobName();
30
JobID jobId = result.getJobID();
31
32
System.out.println("Job '" + jobName + "' completed in " + netRuntime + "ms");
33
System.out.println("Job ID: " + jobId);
34
35
// Access accumulators (if any were used)
36
Map<String, Object> accumulators = result.getAllAccumulatorResults();
37
for (Map.Entry<String, Object> entry : accumulators.entrySet()) {
38
System.out.println("Accumulator " + entry.getKey() + ": " + entry.getValue());
39
}
40
}
41
42
public static void detachedJobExecution() throws Exception {
43
Configuration config = new Configuration();
44
config.setBoolean(DeploymentOptions.ATTACHED, false); // Detached mode
45
46
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(config);
47
48
// Build pipeline
49
env.fromElements("hello", "world", "flink")
50
.map(String::toUpperCase)
51
.print();
52
53
// Execute in detached mode
54
JobExecutionResult result = env.execute("Detached Job");
55
56
// In detached mode, result is available immediately but job runs asynchronously
57
if (result instanceof DetachedJobExecutionResult) {
58
System.out.println("Job submitted in detached mode");
59
System.out.println("Job ID: " + result.getJobID());
60
}
61
}
62
}
63
```
64
65
### JobClient Interface
66
67
Interact with running jobs programmatically.
68
69
```java { .api }
70
import org.apache.flink.core.execution.JobClient;
71
import org.apache.flink.api.common.JobStatus;
72
73
public class JobClientExample {
74
75
public static void jobClientUsage() throws Exception {
76
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
77
78
// Build pipeline
79
env.fromElements(1, 2, 3, 4, 5)
80
.map(x -> {
81
// Simulate long-running computation
82
Thread.sleep(1000);
83
return x * 2;
84
})
85
.print();
86
87
// Execute and get JobClient
88
JobClient jobClient = env.executeAsync("Long Running Job");
89
90
// Monitor job status
91
CompletableFuture<JobStatus> statusFuture = jobClient.getJobStatus();
92
JobStatus status = statusFuture.get();
93
System.out.println("Initial job status: " + status);
94
95
// Get job execution result asynchronously
96
CompletableFuture<JobExecutionResult> resultFuture = jobClient.getJobExecutionResult();
97
98
// Cancel job if needed
99
if (shouldCancelJob()) {
100
CompletableFuture<Void> cancelFuture = jobClient.cancel();
101
cancelFuture.get(); // Wait for cancellation
102
System.out.println("Job cancelled");
103
} else {
104
// Wait for job completion
105
JobExecutionResult result = resultFuture.get();
106
System.out.println("Job completed: " + result.getJobName());
107
}
108
}
109
110
public static void jobClientWithTimeout() throws Exception {
111
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
112
113
// Configure job
114
env.fromCollection(generateLargeDataset())
115
.keyBy(Record::getKey)
116
.sum("value")
117
.print();
118
119
JobClient jobClient = env.executeAsync("Batch Processing Job");
120
121
try {
122
// Wait for completion with timeout
123
JobExecutionResult result = jobClient.getJobExecutionResult()
124
.get(10, TimeUnit.MINUTES);
125
126
System.out.println("Job completed successfully");
127
128
} catch (TimeoutException e) {
129
System.out.println("Job is taking too long, cancelling...");
130
jobClient.cancel();
131
132
} catch (ExecutionException e) {
133
System.out.println("Job failed: " + e.getCause().getMessage());
134
}
135
}
136
137
private static boolean shouldCancelJob() {
138
// Implementation-specific logic to determine if job should be cancelled
139
return false;
140
}
141
142
private static List<Record> generateLargeDataset() {
143
// Generate test data
144
return IntStream.range(0, 10000)
145
.mapToObj(i -> new Record("key" + (i % 100), i))
146
.collect(Collectors.toList());
147
}
148
}
149
```
150
151
## Runtime Context
152
153
### Accessing Runtime Information
154
155
```java { .api }
156
import org.apache.flink.api.common.functions.RuntimeContext;
157
import org.apache.flink.api.common.functions.RichMapFunction;
158
import org.apache.flink.api.common.functions.OpenContext;
159
160
public class RuntimeContextExample extends RichMapFunction<String, String> {
161
162
@Override
163
public void open(OpenContext openContext) throws Exception {
164
RuntimeContext ctx = getRuntimeContext();
165
166
// Basic runtime information
167
String taskName = ctx.getTaskName();
168
int subtaskIndex = ctx.getIndexOfThisSubtask();
169
int numberOfParallelSubtasks = ctx.getNumberOfParallelSubtasks();
170
int attemptNumber = ctx.getAttemptNumber();
171
172
System.out.println("Task: " + taskName);
173
System.out.println("Subtask: " + subtaskIndex + "/" + numberOfParallelSubtasks);
174
System.out.println("Attempt: " + attemptNumber);
175
176
// Execution configuration
177
ExecutionConfig execConfig = ctx.getExecutionConfig();
178
int parallelism = execConfig.getParallelism();
179
boolean closureCleanerEnabled = execConfig.isClosureCleanerEnabled();
180
181
// Distributed cache
182
File cachedFile = ctx.getDistributedCache().getFile("my-config-file");
183
if (cachedFile != null && cachedFile.exists()) {
184
// Use cached file
185
loadConfigurationFromFile(cachedFile);
186
}
187
188
// Job information
189
JobInfo jobInfo = ctx.getJobInfo();
190
String jobName = jobInfo.getJobName();
191
JobID jobId = jobInfo.getJobId();
192
193
System.out.println("Job: " + jobName + " (" + jobId + ")");
194
}
195
196
@Override
197
public String map(String value) throws Exception {
198
RuntimeContext ctx = getRuntimeContext();
199
200
// Access runtime context during processing
201
int subtaskIndex = ctx.getIndexOfThisSubtask();
202
203
return "[Subtask-" + subtaskIndex + "] " + value;
204
}
205
206
private void loadConfigurationFromFile(File configFile) {
207
// Load configuration from cached file
208
}
209
}
210
```
211
212
### Metrics and Accumulators
213
214
```java { .api }
215
import org.apache.flink.api.common.accumulators.IntCounter;
216
import org.apache.flink.api.common.accumulators.LongCounter;
217
import org.apache.flink.api.common.functions.OpenContext;
218
import org.apache.flink.metrics.Counter;
219
import org.apache.flink.metrics.Histogram;
220
import org.apache.flink.metrics.Meter;
221
222
public class MetricsAndAccumulatorsFunction extends RichMapFunction<String, String> {
223
224
// Accumulators for job-level statistics
225
private IntCounter processedRecords;
226
private LongCounter totalBytes;
227
228
// Metrics for runtime monitoring
229
private Counter metricsCounter;
230
private Histogram processingLatency;
231
private Meter throughput;
232
233
@Override
234
public void open(OpenContext openContext) throws Exception {
235
RuntimeContext ctx = getRuntimeContext();
236
237
// Initialize accumulators
238
processedRecords = new IntCounter();
239
totalBytes = new LongCounter();
240
241
ctx.addAccumulator("processed-records", processedRecords);
242
ctx.addAccumulator("total-bytes", totalBytes);
243
244
// Initialize metrics
245
MetricGroup metricGroup = ctx.getMetricGroup()
246
.addGroup("my-operator")
247
.addGroup("subtask", String.valueOf(ctx.getIndexOfThisSubtask()));
248
249
metricsCounter = metricGroup.counter("records-processed");
250
processingLatency = metricGroup.histogram("processing-latency");
251
throughput = metricGroup.meter("throughput");
252
253
// Custom gauge
254
metricGroup.gauge("queue-size", () -> getCurrentQueueSize());
255
}
256
257
@Override
258
public String map(String value) throws Exception {
259
long startTime = System.nanoTime();
260
261
try {
262
// Process the value
263
String result = processValue(value);
264
265
// Update accumulators
266
processedRecords.add(1);
267
totalBytes.add(value.getBytes().length);
268
269
// Update metrics
270
metricsCounter.inc();
271
throughput.markEvent();
272
273
return result;
274
275
} finally {
276
// Record processing latency
277
long latency = System.nanoTime() - startTime;
278
processingLatency.update(latency / 1_000_000); // Convert to milliseconds
279
}
280
}
281
282
private String processValue(String value) {
283
// Simulate processing
284
return value.toUpperCase();
285
}
286
287
private int getCurrentQueueSize() {
288
// Return current queue size for gauge metric
289
return 0; // Implementation specific
290
}
291
}
292
```
293
294
## Job Lifecycle Management
295
296
### Job Listeners and Hooks
297
298
```java { .api }
299
import org.apache.flink.core.execution.JobListener;
300
import org.apache.flink.core.execution.JobStatusChangedListener;
301
302
// Job listener for execution events
303
public class CustomJobListener implements JobListener {
304
305
@Override
306
public void onJobSubmitted(JobClient jobClient, Throwable throwable) {
307
if (throwable == null) {
308
System.out.println("Job submitted successfully: " + jobClient.getJobID());
309
310
// Start monitoring job
311
startJobMonitoring(jobClient);
312
313
} else {
314
System.err.println("Job submission failed: " + throwable.getMessage());
315
handleJobSubmissionFailure(throwable);
316
}
317
}
318
319
@Override
320
public void onJobExecuted(JobExecutionResult jobExecutionResult, Throwable throwable) {
321
if (throwable == null) {
322
System.out.println("Job executed successfully: " + jobExecutionResult.getJobName());
323
System.out.println("Runtime: " + jobExecutionResult.getNetRuntime() + "ms");
324
325
// Process job results
326
processJobResults(jobExecutionResult);
327
328
} else {
329
System.err.println("Job execution failed: " + throwable.getMessage());
330
handleJobExecutionFailure(throwable);
331
}
332
}
333
334
private void startJobMonitoring(JobClient jobClient) {
335
// Start background monitoring
336
CompletableFuture.runAsync(() -> {
337
try {
338
while (true) {
339
JobStatus status = jobClient.getJobStatus().get();
340
System.out.println("Job status: " + status);
341
342
if (status.isTerminalState()) {
343
break;
344
}
345
346
Thread.sleep(5000); // Check every 5 seconds
347
}
348
} catch (Exception e) {
349
System.err.println("Job monitoring failed: " + e.getMessage());
350
}
351
});
352
}
353
354
private void handleJobSubmissionFailure(Throwable throwable) {
355
// Handle submission failure (e.g., notify admin, retry, etc.)
356
}
357
358
private void processJobResults(JobExecutionResult result) {
359
// Process successful job results
360
Map<String, Object> accumulators = result.getAllAccumulatorResults();
361
for (Map.Entry<String, Object> entry : accumulators.entrySet()) {
362
System.out.println("Final " + entry.getKey() + ": " + entry.getValue());
363
}
364
}
365
366
private void handleJobExecutionFailure(Throwable throwable) {
367
// Handle execution failure
368
}
369
}
370
371
// Job status change listener
372
public class JobStatusChangeListener implements JobStatusChangedListener {
373
374
@Override
375
public void onEvent(JobStatusChangedEvent event) {
376
JobID jobId = event.getJobId();
377
JobStatus oldStatus = event.getOldJobStatus();
378
JobStatus newStatus = event.getNewJobStatus();
379
380
System.out.println("Job " + jobId + " status changed: " + oldStatus + " -> " + newStatus);
381
382
switch (newStatus) {
383
case RUNNING:
384
handleJobStarted(jobId);
385
break;
386
case FINISHED:
387
handleJobCompleted(jobId);
388
break;
389
case FAILED:
390
handleJobFailed(jobId, event.getThrowable());
391
break;
392
case CANCELED:
393
handleJobCancelled(jobId);
394
break;
395
}
396
}
397
398
private void handleJobStarted(JobID jobId) {
399
System.out.println("Job " + jobId + " has started execution");
400
// Start monitoring, notifications, etc.
401
}
402
403
private void handleJobCompleted(JobID jobId) {
404
System.out.println("Job " + jobId + " completed successfully");
405
// Clean up resources, send notifications, etc.
406
}
407
408
private void handleJobFailed(JobID jobId, Throwable throwable) {
409
System.err.println("Job " + jobId + " failed: " +
410
(throwable != null ? throwable.getMessage() : "Unknown error"));
411
// Send alerts, trigger retries, etc.
412
}
413
414
private void handleJobCancelled(JobID jobId) {
415
System.out.println("Job " + jobId + " was cancelled");
416
// Clean up resources, update status, etc.
417
}
418
}
419
```
420
421
### Using Job Listeners
422
423
```java { .api }
424
public class JobWithListeners {
425
426
public static void executeWithListeners() throws Exception {
427
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
428
429
// Register job listener
430
env.registerJobListener(new CustomJobListener());
431
432
// Build pipeline
433
env.fromElements("hello", "world", "flink")
434
.map(new MetricsAndAccumulatorsFunction())
435
.print();
436
437
// Execute job (listener will be notified)
438
env.execute("Job with Listeners");
439
}
440
441
public static void manualJobLifecycleManagement() throws Exception {
442
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
443
444
// Build pipeline
445
DataStream<String> stream = env.fromElements("data1", "data2", "data3")
446
.map(new ProcessingFunction());
447
448
stream.print();
449
450
// Execute asynchronously
451
JobClient jobClient = env.executeAsync("Manual Lifecycle Job");
452
453
// Manual lifecycle management
454
try {
455
// Monitor job progress
456
CompletableFuture<JobStatus> statusFuture = jobClient.getJobStatus();
457
JobStatus initialStatus = statusFuture.get();
458
459
if (initialStatus == JobStatus.RUNNING) {
460
System.out.println("Job is running, monitoring progress...");
461
462
// Set up periodic status checks
463
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
464
ScheduledFuture<?> monitoring = scheduler.scheduleWithFixedDelay(() -> {
465
try {
466
JobStatus currentStatus = jobClient.getJobStatus().get();
467
System.out.println("Current status: " + currentStatus);
468
} catch (Exception e) {
469
System.err.println("Failed to get job status: " + e.getMessage());
470
}
471
}, 0, 5, TimeUnit.SECONDS);
472
473
// Wait for completion
474
JobExecutionResult result = jobClient.getJobExecutionResult().get();
475
monitoring.cancel(true);
476
scheduler.shutdown();
477
478
System.out.println("Job completed: " + result.getJobName());
479
480
} else {
481
System.err.println("Job failed to start, status: " + initialStatus);
482
}
483
484
} catch (Exception e) {
485
System.err.println("Job management failed: " + e.getMessage());
486
487
// Attempt to cancel job on error
488
try {
489
jobClient.cancel().get(30, TimeUnit.SECONDS);
490
System.out.println("Job cancelled due to management failure");
491
} catch (Exception cancelException) {
492
System.err.println("Failed to cancel job: " + cancelException.getMessage());
493
}
494
}
495
}
496
497
private static class ProcessingFunction extends RichMapFunction<String, String> {
498
499
@Override
500
public String map(String value) throws Exception {
501
// Simulate processing time
502
Thread.sleep(1000);
503
return "Processed: " + value;
504
}
505
}
506
}
507
```
508
509
## Pipeline Execution
510
511
### Custom Pipeline Executors
512
513
```java { .api }
514
import org.apache.flink.core.execution.PipelineExecutor;
515
import org.apache.flink.core.execution.PipelineExecutorFactory;
516
517
// Custom pipeline executor factory
518
public class CustomPipelineExecutorFactory implements PipelineExecutorFactory {
519
520
public static final String CUSTOM_EXECUTOR_NAME = "custom";
521
522
@Override
523
public String getName() {
524
return CUSTOM_EXECUTOR_NAME;
525
}
526
527
@Override
528
public boolean isCompatibleWith(Configuration configuration) {
529
// Check if this executor is compatible with the configuration
530
String executorName = configuration.getString(DeploymentOptions.TARGET);
531
return CUSTOM_EXECUTOR_NAME.equals(executorName);
532
}
533
534
@Override
535
public PipelineExecutor getExecutor(Configuration configuration) {
536
return new CustomPipelineExecutor(configuration);
537
}
538
}
539
540
// Custom pipeline executor implementation
541
public class CustomPipelineExecutor implements PipelineExecutor {
542
543
private final Configuration configuration;
544
545
public CustomPipelineExecutor(Configuration configuration) {
546
this.configuration = configuration;
547
}
548
549
@Override
550
public CompletableFuture<JobClient> execute(Pipeline pipeline,
551
Configuration configuration,
552
ClassLoader userClassloader) throws Exception {
553
554
System.out.println("Executing pipeline with custom executor");
555
556
// Custom execution logic
557
JobGraph jobGraph = buildJobGraph(pipeline, configuration);
558
559
// Submit job to custom runtime
560
return submitJob(jobGraph);
561
}
562
563
private JobGraph buildJobGraph(Pipeline pipeline, Configuration config) {
564
// Convert pipeline to JobGraph
565
// This would typically involve more complex logic
566
return new JobGraph("Custom Job");
567
}
568
569
private CompletableFuture<JobClient> submitJob(JobGraph jobGraph) {
570
// Submit job to execution runtime
571
return CompletableFuture.supplyAsync(() -> {
572
// Simulate job submission
573
JobID jobId = JobID.generate();
574
return new CustomJobClient(jobId);
575
});
576
}
577
578
private static class CustomJobClient implements JobClient {
579
private final JobID jobId;
580
581
public CustomJobClient(JobID jobId) {
582
this.jobId = jobId;
583
}
584
585
@Override
586
public JobID getJobID() {
587
return jobId;
588
}
589
590
@Override
591
public CompletableFuture<JobStatus> getJobStatus() {
592
return CompletableFuture.completedFuture(JobStatus.RUNNING);
593
}
594
595
@Override
596
public CompletableFuture<Void> cancel() {
597
return CompletableFuture.completedFuture(null);
598
}
599
600
@Override
601
public CompletableFuture<String> stopWithSavepoint(boolean advanceToEndOfEventTime,
602
String savepointDirectory,
603
SavepointFormatType formatType) {
604
return CompletableFuture.completedFuture("savepoint-path");
605
}
606
607
@Override
608
public CompletableFuture<String> triggerSavepoint(String savepointDirectory,
609
SavepointFormatType formatType) {
610
return CompletableFuture.completedFuture("savepoint-path");
611
}
612
613
@Override
614
public CompletableFuture<Map<String, Object>> getAccumulators() {
615
return CompletableFuture.completedFuture(Collections.emptyMap());
616
}
617
618
@Override
619
public CompletableFuture<JobExecutionResult> getJobExecutionResult() {
620
// Simulate job completion
621
return CompletableFuture.supplyAsync(() -> {
622
try {
623
Thread.sleep(5000); // Simulate execution time
624
return new JobExecutionResult(jobId, 5000, Collections.emptyMap());
625
} catch (InterruptedException e) {
626
throw new RuntimeException(e);
627
}
628
});
629
}
630
}
631
}
632
```
633
634
## Distributed Cache
635
636
### Using Distributed Cache
637
638
```java { .api }
639
import org.apache.flink.api.common.cache.DistributedCache;
640
import org.apache.flink.api.common.functions.OpenContext;
641
642
public class DistributedCacheExample extends RichMapFunction<String, String> {
643
644
private Properties configProperties;
645
private List<String> referenceData;
646
647
@Override
648
public void open(OpenContext openContext) throws Exception {
649
RuntimeContext ctx = getRuntimeContext();
650
DistributedCache cache = ctx.getDistributedCache();
651
652
// Access cached files
653
File configFile = cache.getFile("config.properties");
654
if (configFile != null && configFile.exists()) {
655
configProperties = new Properties();
656
try (FileInputStream fis = new FileInputStream(configFile)) {
657
configProperties.load(fis);
658
}
659
}
660
661
// Access cached directory
662
File dataDir = cache.getFile("reference-data");
663
if (dataDir != null && dataDir.isDirectory()) {
664
referenceData = loadReferenceData(dataDir);
665
}
666
}
667
668
@Override
669
public String map(String value) throws Exception {
670
// Use cached configuration and data
671
String prefix = configProperties.getProperty("output.prefix", "");
672
673
// Lookup reference data
674
boolean isValid = referenceData.contains(value);
675
676
return prefix + value + (isValid ? " [VALID]" : " [INVALID]");
677
}
678
679
private List<String> loadReferenceData(File dataDir) {
680
List<String> data = new ArrayList<>();
681
682
File[] files = dataDir.listFiles((dir, name) -> name.endsWith(".txt"));
683
if (files != null) {
684
for (File file : files) {
685
try (BufferedReader reader = Files.newBufferedReader(file.toPath())) {
686
String line;
687
while ((line = reader.readLine()) != null) {
688
data.add(line.trim());
689
}
690
} catch (IOException e) {
691
System.err.println("Error reading reference data file: " + e.getMessage());
692
}
693
}
694
}
695
696
return data;
697
}
698
699
public static void registerCachedFiles() throws Exception {
700
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
701
702
// Register files with distributed cache
703
env.registerCachedFile("/path/to/config.properties", "config.properties");
704
env.registerCachedFile("/path/to/reference-data/", "reference-data", true); // true for directory
705
706
// Use the function that accesses cached files
707
env.fromElements("item1", "item2", "item3")
708
.map(new DistributedCacheExample())
709
.print();
710
711
env.execute("Distributed Cache Example");
712
}
713
}
714
```
715
716
Apache Flink's execution and job management APIs provide comprehensive control over job lifecycle, runtime information access, and execution monitoring. By leveraging these capabilities, you can build robust applications with proper monitoring, error handling, and resource management.