0
# Execution Graph Access
1
2
The Execution Graph Access APIs provide read-only interfaces for inspecting and monitoring the runtime execution state of Flink jobs. These APIs enable external systems, web interfaces, and monitoring tools to access detailed information about job execution topology, task states, and performance metrics.
3
4
## Core Access Interfaces
5
6
### AccessExecutionGraph
7
8
Primary interface for read-only access to execution graph information, providing comprehensive job-level details.
9
10
```java { .api }
11
public interface AccessExecutionGraph {
12
JobID getJobID();
13
String getJobName();
14
JobStatus getState();
15
Throwable getFailureCause();
16
17
long getStatusTimestamp(JobStatus status);
18
boolean isStoppable();
19
20
StringifiedAccumulatorResult[] getAccumulatorResultsStringified();
21
Map<String, Object> getAccumulatorResults();
22
23
int getParallelism();
24
long getCreateTime();
25
26
Iterable<AccessExecutionJobVertex> getVerticesTopologically();
27
Iterable<AccessExecutionJobVertex> getAllVertices();
28
AccessExecutionJobVertex getJobVertex(JobVertexID id);
29
30
ArchivedExecutionConfig getArchivedExecutionConfig();
31
CheckpointCoordinatorConfiguration getCheckpointCoordinatorConfiguration();
32
JobCheckpointingConfiguration getCheckpointingConfiguration();
33
34
String getJsonPlan();
35
boolean isArchived();
36
}
37
```
38
39
### AccessExecutionJobVertex
40
41
Interface providing access to job vertex information and its parallel execution vertices.
42
43
```java { .api }
44
public interface AccessExecutionJobVertex {
45
JobVertexID getJobVertexId();
46
String getName();
47
int getParallelism();
48
int getMaxParallelism();
49
50
ResourceProfile getResourceProfile();
51
52
ExecutionState getAggregateState();
53
long getStateTimestamp(ExecutionState state);
54
55
AccessExecutionVertex[] getTaskVertices();
56
AccessExecutionVertex getTaskVertex(int subtask);
57
58
IOMetrics getIOMetrics();
59
60
Map<String, Accumulator<?, ?>> getAggregatedUserAccumulatorsStringified();
61
StringifiedAccumulatorResult[] getAggregatedUserAccumulatorsStringified();
62
}
63
```
64
65
### AccessExecutionVertex
66
67
Interface for accessing individual execution vertex (subtask) information.
68
69
```java { .api }
70
public interface AccessExecutionVertex {
71
AccessExecutionJobVertex getJobVertex();
72
int getParallelSubtaskIndex();
73
74
ExecutionState getExecutionState();
75
long getStateTimestamp(ExecutionState state);
76
Throwable getFailureCause();
77
78
AccessExecution getCurrentExecutionAttempt();
79
AccessExecution getPriorExecutionAttempt(int attemptNumber);
80
AccessExecution[] getCurrentExecutions();
81
82
int getCurrentAssignedResourceLocation();
83
TaskManagerLocation getAssignedResourceLocation();
84
}
85
```
86
87
### AccessExecution
88
89
Interface for accessing execution attempt information and metrics.
90
91
```java { .api }
92
public interface AccessExecution {
93
ExecutionAttemptID getAttemptId();
94
int getAttemptNumber();
95
96
ExecutionState getState();
97
TaskManagerLocation getAssignedResourceLocation();
98
Throwable getFailureCause();
99
100
long[] getStateTimestamps();
101
long getStateTimestamp(ExecutionState state);
102
103
StringifiedAccumulatorResult[] getUserAccumulatorsStringified();
104
105
int getParallelSubtaskIndex();
106
IOMetrics getIOMetrics();
107
108
TaskManagerLocation getCurrentAssignedResourceLocation();
109
}
110
```
111
112
## Archived Representations
113
114
### ArchivedExecutionGraph
115
116
Immutable snapshot of an execution graph that retains all information after job completion.
117
118
```java { .api }
119
public class ArchivedExecutionGraph implements AccessExecutionGraph, Serializable {
120
public static ArchivedExecutionGraph createFrom(ExecutionGraph executionGraph);
121
122
@Override
123
public JobID getJobID();
124
125
@Override
126
public String getJobName();
127
128
@Override
129
public JobStatus getState();
130
131
@Override
132
public Throwable getFailureCause();
133
134
@Override
135
public long getStatusTimestamp(JobStatus status);
136
137
@Override
138
public StringifiedAccumulatorResult[] getAccumulatorResultsStringified();
139
140
@Override
141
public Map<String, Object> getAccumulatorResults();
142
143
@Override
144
public Iterable<AccessExecutionJobVertex> getVerticesTopologically();
145
146
@Override
147
public ArchivedExecutionConfig getArchivedExecutionConfig();
148
149
@Override
150
public String getJsonPlan();
151
152
@Override
153
public boolean isArchived();
154
155
public CompletableFuture<ArchivedExecutionGraph> archive(ClassLoader userCodeClassLoader);
156
}
157
```
158
159
### ArchivedExecutionJobVertex
160
161
Archived representation of a job vertex with complete execution information.
162
163
```java { .api }
164
public class ArchivedExecutionJobVertex implements AccessExecutionJobVertex, Serializable {
165
public ArchivedExecutionJobVertex(ExecutionJobVertex jobVertex);
166
167
@Override
168
public JobVertexID getJobVertexId();
169
170
@Override
171
public String getName();
172
173
@Override
174
public int getParallelism();
175
176
@Override
177
public int getMaxParallelism();
178
179
@Override
180
public ExecutionState getAggregateState();
181
182
@Override
183
public AccessExecutionVertex[] getTaskVertices();
184
185
@Override
186
public IOMetrics getIOMetrics();
187
188
@Override
189
public StringifiedAccumulatorResult[] getAggregatedUserAccumulatorsStringified();
190
}
191
```
192
193
### ArchivedExecutionVertex
194
195
Archived representation of an execution vertex with all attempt information.
196
197
```java { .api }
198
public class ArchivedExecutionVertex implements AccessExecutionVertex, Serializable {
199
public ArchivedExecutionVertex(ExecutionVertex executionVertex);
200
201
@Override
202
public AccessExecutionJobVertex getJobVertex();
203
204
@Override
205
public int getParallelSubtaskIndex();
206
207
@Override
208
public ExecutionState getExecutionState();
209
210
@Override
211
public long getStateTimestamp(ExecutionState state);
212
213
@Override
214
public Throwable getFailureCause();
215
216
@Override
217
public AccessExecution getCurrentExecutionAttempt();
218
219
@Override
220
public TaskManagerLocation getAssignedResourceLocation();
221
}
222
```
223
224
## Configuration and Metrics Access
225
226
### ArchivedExecutionConfig
227
228
Archived execution configuration containing job-level settings and parameters.
229
230
```java { .api }
231
public class ArchivedExecutionConfig implements Serializable {
232
public ArchivedExecutionConfig(ExecutionConfig executionConfig);
233
234
public String getExecutionMode();
235
public int getParallelism();
236
public boolean getObjectReuseEnabled();
237
public long getAutoWatermarkInterval();
238
239
public RestartStrategy.RestartStrategyConfiguration getRestartStrategy();
240
public Map<String, String> getGlobalJobParameters();
241
242
public String getCodeAnalysisMode();
243
public long getDefaultBufferTimeout();
244
245
public boolean isTimestampsEnabled();
246
public boolean isLatencyTrackingEnabled();
247
public long getLatencyTrackingInterval();
248
249
public boolean isClosureCleanerEnabled();
250
public int getMaxParallelism();
251
}
252
```
253
254
### IOMetrics
255
256
Interface providing access to I/O metrics for tasks and operators.
257
258
```java { .api }
259
/**
 * Serializable view of the I/O counters and rates of a task or operator.
 */
public interface IOMetrics extends Serializable {

    // ----- byte counters -----

    /** Returns the number of bytes read locally. */
    long getNumBytesInLocal();

    /** Returns the number of bytes read remotely. */
    long getNumBytesInRemote();

    /** Returns the number of bytes written. */
    long getNumBytesOut();

    // ----- record counters -----

    /** Returns the number of records read. */
    long getNumRecordsIn();

    /** Returns the number of records written. */
    long getNumRecordsOut();

    // ----- record rates -----

    /** Returns the average number of records read per second. */
    double getAvgRecordsInPerSec();

    /** Returns the average number of records written per second. */
    double getAvgRecordsOutPerSec();

    // ----- byte rates -----

    /** Returns the average number of bytes read per second. */
    double getAvgBytesInPerSec();

    /** Returns the average number of bytes written per second. */
    double getAvgBytesOutPerSec();
}
273
```
274
275
### StringifiedAccumulatorResult
276
277
Container for accumulator results that have been converted to string representations.
278
279
```java { .api }
280
public class StringifiedAccumulatorResult implements Serializable {
281
public StringifiedAccumulatorResult(String name, String type, String value);
282
283
public String getName();
284
public String getType();
285
public String getValue();
286
287
@Override
288
public String toString();
289
}
290
```
291
292
## Checkpoint Information Access
293
294
### JobCheckpointingConfiguration
295
296
Read-only access to job checkpointing configuration.
297
298
```java { .api }
299
public interface JobCheckpointingConfiguration extends Serializable {
300
long getCheckpointInterval();
301
long getCheckpointTimeout();
302
long getMinPauseBetweenCheckpoints();
303
int getMaxConcurrentCheckpoints();
304
305
CheckpointRetentionPolicy getCheckpointRetentionPolicy();
306
boolean isExactlyOnce();
307
boolean isCheckpointingEnabled();
308
boolean isUnalignedCheckpointsEnabled();
309
310
long getAlignmentTimeout();
311
int getTolerableCheckpointFailureNumber();
312
}
313
```
314
315
### CheckpointCoordinatorConfiguration
316
317
Configuration details for checkpoint coordination.
318
319
```java { .api }
320
public class CheckpointCoordinatorConfiguration implements Serializable {
321
public CheckpointCoordinatorConfiguration(
322
long checkpointInterval,
323
long checkpointTimeout,
324
long minPauseBetweenCheckpoints,
325
int maxConcurrentCheckpoints,
326
CheckpointRetentionPolicy retentionPolicy,
327
boolean isExactlyOnce,
328
boolean isUnalignedCheckpoint,
329
long alignmentTimeout,
330
int tolerableCheckpointFailureNumber
331
);
332
333
public long getCheckpointInterval();
334
public long getCheckpointTimeout();
335
public long getMinPauseBetweenCheckpoints();
336
public int getMaxConcurrentCheckpoints();
337
338
public CheckpointRetentionPolicy getCheckpointRetentionPolicy();
339
public boolean isExactlyOnce();
340
public boolean isUnalignedCheckpoint();
341
public long getAlignmentTimeout();
342
public int getTolerableCheckpointFailureNumber();
343
}
344
```
345
346
## Usage Examples
347
348
### Monitoring Job Execution State
349
350
```java
351
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;

import java.time.Instant;
354
355
public class JobMonitor {
356
357
public void monitorJob(AccessExecutionGraph executionGraph) {
358
// Print job overview
359
System.out.println("=== Job Overview ===");
360
System.out.println("Job ID: " + executionGraph.getJobID());
361
System.out.println("Job Name: " + executionGraph.getJobName());
362
System.out.println("Job State: " + executionGraph.getState());
363
System.out.println("Parallelism: " + executionGraph.getParallelism());
364
System.out.println("Create Time: " + new Date(executionGraph.getCreateTime()));
365
366
// Check for failures
367
Throwable failureCause = executionGraph.getFailureCause();
368
if (failureCause != null) {
369
System.out.println("Failure Cause: " + failureCause.getMessage());
370
}
371
372
// Print execution configuration
373
ArchivedExecutionConfig execConfig = executionGraph.getArchivedExecutionConfig();
374
if (execConfig != null) {
375
System.out.println("Execution Mode: " + execConfig.getExecutionMode());
376
System.out.println("Object Reuse: " + execConfig.getObjectReuseEnabled());
377
System.out.println("Watermark Interval: " + execConfig.getAutoWatermarkInterval());
378
}
379
380
// Monitor vertices
381
System.out.println("\n=== Vertex Details ===");
382
for (AccessExecutionJobVertex vertex : executionGraph.getVerticesTopologically()) {
383
monitorJobVertex(vertex);
384
}
385
386
// Show accumulator results
387
showAccumulators(executionGraph);
388
}
389
390
private void monitorJobVertex(AccessExecutionJobVertex vertex) {
391
System.out.println("\nVertex: " + vertex.getName() + " (" + vertex.getJobVertexId() + ")");
392
System.out.println(" Parallelism: " + vertex.getParallelism());
393
System.out.println(" Max Parallelism: " + vertex.getMaxParallelism());
394
System.out.println(" Aggregate State: " + vertex.getAggregateState());
395
396
// Show I/O metrics
397
IOMetrics ioMetrics = vertex.getIOMetrics();
398
if (ioMetrics != null) {
399
System.out.println(" I/O Metrics:");
400
System.out.println(" Records In: " + ioMetrics.getNumRecordsIn());
401
System.out.println(" Records Out: " + ioMetrics.getNumRecordsOut());
402
System.out.println(" Bytes In: " + ioMetrics.getNumBytesInLocal() + ioMetrics.getNumBytesInRemote());
403
System.out.println(" Bytes Out: " + ioMetrics.getNumBytesOut());
404
System.out.println(" Avg Records In/sec: " + String.format("%.2f", ioMetrics.getAvgRecordsInPerSec()));
405
System.out.println(" Avg Records Out/sec: " + String.format("%.2f", ioMetrics.getAvgRecordsOutPerSec()));
406
}
407
408
// Monitor individual subtasks
409
System.out.println(" Subtasks:");
410
AccessExecutionVertex[] taskVertices = vertex.getTaskVertices();
411
for (int i = 0; i < taskVertices.length; i++) {
412
AccessExecutionVertex taskVertex = taskVertices[i];
413
System.out.println(" Subtask " + i + ": " + taskVertex.getExecutionState() +
414
" (Attempt " + taskVertex.getCurrentExecutionAttempt().getAttemptNumber() + ")");
415
416
TaskManagerLocation location = taskVertex.getAssignedResourceLocation();
417
if (location != null) {
418
System.out.println(" Location: " + location.getHostname() + ":" + location.getDataPort());
419
}
420
}
421
}
422
423
private void showAccumulators(AccessExecutionGraph executionGraph) {
424
System.out.println("\n=== Accumulators ===");
425
StringifiedAccumulatorResult[] accumulators = executionGraph.getAccumulatorResultsStringified();
426
427
if (accumulators != null && accumulators.length > 0) {
428
for (StringifiedAccumulatorResult accumulator : accumulators) {
429
System.out.println(accumulator.getName() + " (" + accumulator.getType() + "): " +
430
accumulator.getValue());
431
}
432
} else {
433
System.out.println("No accumulators available");
434
}
435
}
436
}
437
```
438
439
### Checkpoint Information Inspector
440
441
```java
442
public class CheckpointInspector {
443
444
public void inspectCheckpointConfiguration(AccessExecutionGraph executionGraph) {
445
System.out.println("=== Checkpoint Configuration ===");
446
447
JobCheckpointingConfiguration checkpointConfig = executionGraph.getCheckpointingConfiguration();
448
if (checkpointConfig != null && checkpointConfig.isCheckpointingEnabled()) {
449
System.out.println("Checkpointing Enabled: " + checkpointConfig.isCheckpointingEnabled());
450
System.out.println("Checkpoint Interval: " + checkpointConfig.getCheckpointInterval() + " ms");
451
System.out.println("Checkpoint Timeout: " + checkpointConfig.getCheckpointTimeout() + " ms");
452
System.out.println("Min Pause Between Checkpoints: " + checkpointConfig.getMinPauseBetweenCheckpoints() + " ms");
453
System.out.println("Max Concurrent Checkpoints: " + checkpointConfig.getMaxConcurrentCheckpoints());
454
System.out.println("Exactly Once: " + checkpointConfig.isExactlyOnce());
455
System.out.println("Unaligned Checkpoints: " + checkpointConfig.isUnalignedCheckpointsEnabled());
456
System.out.println("Retention Policy: " + checkpointConfig.getCheckpointRetentionPolicy());
457
458
if (checkpointConfig.isUnalignedCheckpointsEnabled()) {
459
System.out.println("Alignment Timeout: " + checkpointConfig.getAlignmentTimeout() + " ms");
460
}
461
462
System.out.println("Tolerable Failures: " + checkpointConfig.getTolerableCheckpointFailureNumber());
463
464
} else {
465
System.out.println("Checkpointing is not enabled for this job");
466
}
467
468
CheckpointCoordinatorConfiguration coordConfig = executionGraph.getCheckpointCoordinatorConfiguration();
469
if (coordConfig != null) {
470
System.out.println("\n=== Coordinator Configuration ===");
471
System.out.println("Coordinator Interval: " + coordConfig.getCheckpointInterval() + " ms");
472
System.out.println("Coordinator Timeout: " + coordConfig.getCheckpointTimeout() + " ms");
473
}
474
}
475
}
476
```
477
478
### Performance Analytics
479
480
```java
481
public class PerformanceAnalyzer {
482
483
public JobPerformanceReport analyzePerformance(AccessExecutionGraph executionGraph) {
484
JobPerformanceReport report = new JobPerformanceReport();
485
486
// Analyze overall job metrics
487
report.jobId = executionGraph.getJobID().toString();
488
report.jobName = executionGraph.getJobName();
489
report.totalParallelism = executionGraph.getParallelism();
490
491
// Calculate job duration
492
long createTime = executionGraph.getCreateTime();
493
long finishTime = executionGraph.getStatusTimestamp(JobStatus.FINISHED);
494
if (finishTime > 0) {
495
report.totalDuration = finishTime - createTime;
496
}
497
498
// Analyze vertex performance
499
for (AccessExecutionJobVertex vertex : executionGraph.getVerticesTopologically()) {
500
VertexPerformance vertexPerf = analyzeVertexPerformance(vertex);
501
report.vertexPerformance.add(vertexPerf);
502
}
503
504
// Calculate throughput metrics
505
calculateThroughputMetrics(report, executionGraph);
506
507
return report;
508
}
509
510
private VertexPerformance analyzeVertexPerformance(AccessExecutionJobVertex vertex) {
511
VertexPerformance vertexPerf = new VertexPerformance();
512
vertexPerf.vertexId = vertex.getJobVertexId().toString();
513
vertexPerf.vertexName = vertex.getName();
514
vertexPerf.parallelism = vertex.getParallelism();
515
vertexPerf.state = vertex.getAggregateState();
516
517
// Analyze I/O metrics
518
IOMetrics ioMetrics = vertex.getIOMetrics();
519
if (ioMetrics != null) {
520
vertexPerf.totalRecordsIn = ioMetrics.getNumRecordsIn();
521
vertexPerf.totalRecordsOut = ioMetrics.getNumRecordsOut();
522
vertexPerf.totalBytesIn = ioMetrics.getNumBytesInLocal() + ioMetrics.getNumBytesInRemote();
523
vertexPerf.totalBytesOut = ioMetrics.getNumBytesOut();
524
vertexPerf.avgRecordsInPerSec = ioMetrics.getAvgRecordsInPerSec();
525
vertexPerf.avgRecordsOutPerSec = ioMetrics.getAvgRecordsOutPerSec();
526
}
527
528
// Analyze subtask performance
529
AccessExecutionVertex[] taskVertices = vertex.getTaskVertices();
530
for (AccessExecutionVertex taskVertex : taskVertices) {
531
SubtaskPerformance subtaskPerf = analyzeSubtaskPerformance(taskVertex);
532
vertexPerf.subtasks.add(subtaskPerf);
533
}
534
535
return vertexPerf;
536
}
537
538
private SubtaskPerformance analyzeSubtaskPerformance(AccessExecutionVertex taskVertex) {
539
SubtaskPerformance subtaskPerf = new SubtaskPerformance();
540
subtaskPerf.subtaskIndex = taskVertex.getParallelSubtaskIndex();
541
subtaskPerf.state = taskVertex.getExecutionState();
542
543
// Get current execution attempt
544
AccessExecution execution = taskVertex.getCurrentExecutionAttempt();
545
subtaskPerf.attemptNumber = execution.getAttemptNumber();
546
subtaskPerf.attemptId = execution.getAttemptId().toString();
547
548
// Calculate execution duration
549
long[] stateTimestamps = execution.getStateTimestamps();
550
if (stateTimestamps != null) {
551
long startTime = stateTimestamps[ExecutionState.RUNNING.ordinal()];
552
long endTime = stateTimestamps[ExecutionState.FINISHED.ordinal()];
553
if (startTime > 0 && endTime > 0) {
554
subtaskPerf.executionDuration = endTime - startTime;
555
}
556
}
557
558
// Get I/O metrics for subtask
559
IOMetrics ioMetrics = execution.getIOMetrics();
560
if (ioMetrics != null) {
561
subtaskPerf.recordsProcessed = ioMetrics.getNumRecordsIn();
562
subtaskPerf.recordsProduced = ioMetrics.getNumRecordsOut();
563
}
564
565
// Get location information
566
TaskManagerLocation location = execution.getAssignedResourceLocation();
567
if (location != null) {
568
subtaskPerf.taskManagerHost = location.getHostname();
569
subtaskPerf.taskManagerPort = location.getDataPort();
570
}
571
572
return subtaskPerf;
573
}
574
575
private void calculateThroughputMetrics(JobPerformanceReport report, AccessExecutionGraph executionGraph) {
576
long totalRecordsProcessed = 0;
577
double totalAvgThroughput = 0;
578
int vertexCount = 0;
579
580
for (AccessExecutionJobVertex vertex : executionGraph.getAllVertices()) {
581
IOMetrics ioMetrics = vertex.getIOMetrics();
582
if (ioMetrics != null) {
583
totalRecordsProcessed += ioMetrics.getNumRecordsIn();
584
totalAvgThroughput += ioMetrics.getAvgRecordsInPerSec();
585
vertexCount++;
586
}
587
}
588
589
report.totalRecordsProcessed = totalRecordsProcessed;
590
if (vertexCount > 0) {
591
report.avgThroughputRecordsPerSec = totalAvgThroughput / vertexCount;
592
}
593
594
if (report.totalDuration > 0) {
595
report.overallThroughputRecordsPerSec = (double) totalRecordsProcessed / (report.totalDuration / 1000.0);
596
}
597
}
598
599
// Report data classes
600
public static class JobPerformanceReport {
601
public String jobId;
602
public String jobName;
603
public int totalParallelism;
604
public long totalDuration; // in milliseconds
605
public long totalRecordsProcessed;
606
public double avgThroughputRecordsPerSec;
607
public double overallThroughputRecordsPerSec;
608
public List<VertexPerformance> vertexPerformance = new ArrayList<>();
609
}
610
611
public static class VertexPerformance {
612
public String vertexId;
613
public String vertexName;
614
public int parallelism;
615
public ExecutionState state;
616
public long totalRecordsIn;
617
public long totalRecordsOut;
618
public long totalBytesIn;
619
public long totalBytesOut;
620
public double avgRecordsInPerSec;
621
public double avgRecordsOutPerSec;
622
public List<SubtaskPerformance> subtasks = new ArrayList<>();
623
}
624
625
public static class SubtaskPerformance {
626
public int subtaskIndex;
627
public ExecutionState state;
628
public int attemptNumber;
629
public String attemptId;
630
public long executionDuration; // in milliseconds
631
public long recordsProcessed;
632
public long recordsProduced;
633
public String taskManagerHost;
634
public int taskManagerPort;
635
}
636
}
637
```
638
639
## Common Patterns
640
641
### State Tracking and Alerting
642
643
```java
644
public class ExecutionStateTracker {
645
private final Map<JobID, JobStatus> lastKnownStates = new ConcurrentHashMap<>();
646
private final List<StateChangeListener> listeners = new ArrayList<>();
647
648
public interface StateChangeListener {
649
void onJobStateChanged(JobID jobId, JobStatus oldState, JobStatus newState, AccessExecutionGraph graph);
650
void onVertexStateChanged(JobID jobId, JobVertexID vertexId, ExecutionState oldState, ExecutionState newState);
651
}
652
653
public void trackExecution(AccessExecutionGraph executionGraph) {
654
JobID jobId = executionGraph.getJobID();
655
JobStatus currentState = executionGraph.getState();
656
JobStatus previousState = lastKnownStates.put(jobId, currentState);
657
658
// Notify job state changes
659
if (previousState != null && !previousState.equals(currentState)) {
660
for (StateChangeListener listener : listeners) {
661
listener.onJobStateChanged(jobId, previousState, currentState, executionGraph);
662
}
663
}
664
665
// Track vertex state changes
666
trackVertexStates(executionGraph);
667
}
668
669
private void trackVertexStates(AccessExecutionGraph executionGraph) {
670
for (AccessExecutionJobVertex vertex : executionGraph.getAllVertices()) {
671
ExecutionState vertexState = vertex.getAggregateState();
672
// Store and compare vertex states...
673
}
674
}
675
676
public void addListener(StateChangeListener listener) {
677
listeners.add(listener);
678
}
679
}
680
```
681
682
### Execution History Analysis
683
684
```java
685
public class ExecutionHistoryAnalyzer {
686
687
public ExecutionHistory analyzeExecutionHistory(AccessExecutionVertex vertex) {
688
ExecutionHistory history = new ExecutionHistory();
689
history.subtaskIndex = vertex.getParallelSubtaskIndex();
690
691
// Analyze current execution
692
AccessExecution currentExecution = vertex.getCurrentExecutionAttempt();
693
history.currentAttempt = analyzeExecution(currentExecution);
694
695
// Analyze previous attempts
696
int attemptNumber = 0;
697
AccessExecution priorExecution;
698
while ((priorExecution = vertex.getPriorExecutionAttempt(attemptNumber)) != null) {
699
ExecutionAttemptInfo attemptInfo = analyzeExecution(priorExecution);
700
history.previousAttempts.add(attemptInfo);
701
attemptNumber++;
702
}
703
704
// Calculate failure patterns
705
analyzeFailurePatterns(history);
706
707
return history;
708
}
709
710
private ExecutionAttemptInfo analyzeExecution(AccessExecution execution) {
711
ExecutionAttemptInfo info = new ExecutionAttemptInfo();
712
info.attemptId = execution.getAttemptId().toString();
713
info.attemptNumber = execution.getAttemptNumber();
714
info.state = execution.getState();
715
info.failureCause = execution.getFailureCause();
716
717
// Analyze state transitions
718
long[] timestamps = execution.getStateTimestamps();
719
if (timestamps != null) {
720
info.stateTimestamps = Arrays.copyOf(timestamps, timestamps.length);
721
722
// Calculate durations
723
calculateStateDurations(info, timestamps);
724
}
725
726
// Get resource information
727
TaskManagerLocation location = execution.getAssignedResourceLocation();
728
if (location != null) {
729
info.taskManagerLocation = location.getHostname() + ":" + location.getDataPort();
730
}
731
732
return info;
733
}
734
735
private void calculateStateDurations(ExecutionAttemptInfo info, long[] timestamps) {
736
ExecutionState[] states = ExecutionState.values();
737
Map<ExecutionState, Long> durations = new HashMap<>();
738
739
for (int i = 0; i < states.length - 1; i++) {
740
if (timestamps[i] > 0 && timestamps[i + 1] > 0) {
741
long duration = timestamps[i + 1] - timestamps[i];
742
durations.put(states[i], duration);
743
}
744
}
745
746
info.stateDurations = durations;
747
}
748
749
private void analyzeFailurePatterns(ExecutionHistory history) {
750
long totalFailures = history.previousAttempts.stream()
751
.filter(attempt -> attempt.state == ExecutionState.FAILED)
752
.count();
753
754
history.totalFailures = totalFailures;
755
756
// Analyze failure causes
757
Map<String, Long> failureReasons = history.previousAttempts.stream()
758
.filter(attempt -> attempt.failureCause != null)
759
.collect(Collectors.groupingBy(
760
attempt -> attempt.failureCause.getClass().getSimpleName(),
761
Collectors.counting()
762
));
763
764
history.failureReasonCounts = failureReasons;
765
}
766
767
public static class ExecutionHistory {
768
public int subtaskIndex;
769
public ExecutionAttemptInfo currentAttempt;
770
public List<ExecutionAttemptInfo> previousAttempts = new ArrayList<>();
771
public long totalFailures;
772
public Map<String, Long> failureReasonCounts = new HashMap<>();
773
}
774
775
public static class ExecutionAttemptInfo {
776
public String attemptId;
777
public int attemptNumber;
778
public ExecutionState state;
779
public Throwable failureCause;
780
public long[] stateTimestamps;
781
public Map<ExecutionState, Long> stateDurations = new HashMap<>();
782
public String taskManagerLocation;
783
}
784
}
785
```