or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

data-exchange.md, execution-graph.md, high-availability.md, index.md, job-management.md, message-passing.md, metrics.md, mini-cluster.md, rpc-framework.md, state-management.md, task-execution.md

docs/execution-graph.md

# Execution Graph Access

The Execution Graph Access APIs provide read-only interfaces for inspecting and monitoring the runtime execution state of Flink jobs. These APIs enable external systems, web interfaces, and monitoring tools to access detailed information about job execution topology, task states, and performance metrics.

## Core Access Interfaces

### AccessExecutionGraph

Primary interface for read-only access to execution graph information, providing comprehensive job-level details.

```java { .api }

public interface AccessExecutionGraph {

12

JobID getJobID();

13

String getJobName();

14

JobStatus getState();

15

Throwable getFailureCause();

16

17

long getStatusTimestamp(JobStatus status);

18

boolean isStoppable();

19

20

StringifiedAccumulatorResult[] getAccumulatorResultsStringified();

21

Map<String, Object> getAccumulatorResults();

22

23

int getParallelism();

24

long getCreateTime();

25

26

Iterable<AccessExecutionJobVertex> getVerticesTopologically();

27

Iterable<AccessExecutionJobVertex> getAllVertices();

28

AccessExecutionJobVertex getJobVertex(JobVertexID id);

29

30

ArchivedExecutionConfig getArchivedExecutionConfig();

31

CheckpointCoordinatorConfiguration getCheckpointCoordinatorConfiguration();

32

JobCheckpointingConfiguration getCheckpointingConfiguration();

33

34

String getJsonPlan();

35

boolean isArchived();

36

}

```

### AccessExecutionJobVertex

Interface providing access to job vertex information and its parallel execution vertices.

```java { .api }

public interface AccessExecutionJobVertex {

45

JobVertexID getJobVertexId();

46

String getName();

47

int getParallelism();

48

int getMaxParallelism();

49

50

ResourceProfile getResourceProfile();

51

52

ExecutionState getAggregateState();

53

long getStateTimestamp(ExecutionState state);

54

55

AccessExecutionVertex[] getTaskVertices();

56

AccessExecutionVertex getTaskVertex(int subtask);

57

58

IOMetrics getIOMetrics();

59

60

Map<String, Accumulator<?, ?>> getAggregatedUserAccumulatorsStringified();

61

StringifiedAccumulatorResult[] getAggregatedUserAccumulatorsStringified();

62

}

```

### AccessExecutionVertex

Interface for accessing individual execution vertex (subtask) information.

```java { .api }

public interface AccessExecutionVertex {

71

AccessExecutionJobVertex getJobVertex();

72

int getParallelSubtaskIndex();

73

74

ExecutionState getExecutionState();

75

long getStateTimestamp(ExecutionState state);

76

Throwable getFailureCause();

77

78

AccessExecution getCurrentExecutionAttempt();

79

AccessExecution getPriorExecutionAttempt(int attemptNumber);

80

AccessExecution[] getCurrentExecutions();

81

82

int getCurrentAssignedResourceLocation();

83

TaskManagerLocation getAssignedResourceLocation();

84

}

```

### AccessExecution

Interface for accessing execution attempt information and metrics.

```java { .api }

public interface AccessExecution {

93

ExecutionAttemptID getAttemptId();

94

int getAttemptNumber();

95

96

ExecutionState getState();

97

TaskManagerLocation getAssignedResourceLocation();

98

Throwable getFailureCause();

99

100

long[] getStateTimestamps();

101

long getStateTimestamp(ExecutionState state);

102

103

StringifiedAccumulatorResult[] getUserAccumulatorsStringified();

104

105

int getParallelSubtaskIndex();

106

IOMetrics getIOMetrics();

107

108

TaskManagerLocation getCurrentAssignedResourceLocation();

109

}

```

## Archived Representations

### ArchivedExecutionGraph

Immutable, serializable snapshot of an execution graph that retains its information after the job completes.

```java { .api }

public class ArchivedExecutionGraph implements AccessExecutionGraph, Serializable {

120

public static ArchivedExecutionGraph createFrom(ExecutionGraph executionGraph);

121

122

@Override

123

public JobID getJobID();

124

125

@Override

126

public String getJobName();

127

128

@Override

129

public JobStatus getState();

130

131

@Override

132

public Throwable getFailureCause();

133

134

@Override

135

public long getStatusTimestamp(JobStatus status);

136

137

@Override

138

public StringifiedAccumulatorResult[] getAccumulatorResultsStringified();

139

140

@Override

141

public Map<String, Object> getAccumulatorResults();

142

143

@Override

144

public Iterable<AccessExecutionJobVertex> getVerticesTopologically();

145

146

@Override

147

public ArchivedExecutionConfig getArchivedExecutionConfig();

148

149

@Override

150

public String getJsonPlan();

151

152

@Override

153

public boolean isArchived();

154

155

public CompletableFuture<ArchivedExecutionGraph> archive(ClassLoader userCodeClassLoader);

156

}

```

### ArchivedExecutionJobVertex

Archived representation of a job vertex with complete execution information.

```java { .api }

public class ArchivedExecutionJobVertex implements AccessExecutionJobVertex, Serializable {

165

public ArchivedExecutionJobVertex(ExecutionJobVertex jobVertex);

166

167

@Override

168

public JobVertexID getJobVertexId();

169

170

@Override

171

public String getName();

172

173

@Override

174

public int getParallelism();

175

176

@Override

177

public int getMaxParallelism();

178

179

@Override

180

public ExecutionState getAggregateState();

181

182

@Override

183

public AccessExecutionVertex[] getTaskVertices();

184

185

@Override

186

public IOMetrics getIOMetrics();

187

188

@Override

189

public StringifiedAccumulatorResult[] getAggregatedUserAccumulatorsStringified();

190

}

```

### ArchivedExecutionVertex

Archived representation of an execution vertex with all attempt information.

```java { .api }

public class ArchivedExecutionVertex implements AccessExecutionVertex, Serializable {

199

public ArchivedExecutionVertex(ExecutionVertex executionVertex);

200

201

@Override

202

public AccessExecutionJobVertex getJobVertex();

203

204

@Override

205

public int getParallelSubtaskIndex();

206

207

@Override

208

public ExecutionState getExecutionState();

209

210

@Override

211

public long getStateTimestamp(ExecutionState state);

212

213

@Override

214

public Throwable getFailureCause();

215

216

@Override

217

public AccessExecution getCurrentExecutionAttempt();

218

219

@Override

220

public TaskManagerLocation getAssignedResourceLocation();

221

}

```

## Configuration and Metrics Access

### ArchivedExecutionConfig

Archived execution configuration containing job-level settings and parameters.

```java { .api }

public class ArchivedExecutionConfig implements Serializable {

232

public ArchivedExecutionConfig(ExecutionConfig executionConfig);

233

234

public String getExecutionMode();

235

public int getParallelism();

236

public boolean getObjectReuseEnabled();

237

public long getAutoWatermarkInterval();

238

239

public RestartStrategy.RestartStrategyConfiguration getRestartStrategy();

240

public Map<String, String> getGlobalJobParameters();

241

242

public String getCodeAnalysisMode();

243

public long getDefaultBufferTimeout();

244

245

public boolean isTimestampsEnabled();

246

public boolean isLatencyTrackingEnabled();

247

public long getLatencyTrackingInterval();

248

249

public boolean isClosureCleanerEnabled();

250

public int getMaxParallelism();

251

}

```

### IOMetrics

Interface providing access to I/O metrics for tasks and operators.

```java { .api }

/**
 * Serializable I/O counters and average rates, exposed both per job vertex and per execution
 * attempt.
 */
public interface IOMetrics extends Serializable {

    // ---- byte counters (the examples add local and remote input for the total bytes in) ----

    long getNumBytesInLocal();

    long getNumBytesInRemote();

    long getNumBytesOut();

    // ---- record counters ----

    long getNumRecordsIn();

    long getNumRecordsOut();

    // ---- average record rates ----

    double getAvgRecordsInPerSec();

    double getAvgRecordsOutPerSec();

    // ---- average byte rates ----

    double getAvgBytesInPerSec();

    double getAvgBytesOutPerSec();
}

```

### StringifiedAccumulatorResult

Container for accumulator results that have been converted to string representations.

```java { .api }

public class StringifiedAccumulatorResult implements Serializable {

281

public StringifiedAccumulatorResult(String name, String type, String value);

282

283

public String getName();

284

public String getType();

285

public String getValue();

286

287

@Override

288

public String toString();

289

}

```

## Checkpoint Information Access

### JobCheckpointingConfiguration

Read-only access to job checkpointing configuration.

```java { .api }

public interface JobCheckpointingConfiguration extends Serializable {

300

long getCheckpointInterval();

301

long getCheckpointTimeout();

302

long getMinPauseBetweenCheckpoints();

303

int getMaxConcurrentCheckpoints();

304

305

CheckpointRetentionPolicy getCheckpointRetentionPolicy();

306

boolean isExactlyOnce();

307

boolean isCheckpointingEnabled();

308

boolean isUnalignedCheckpointsEnabled();

309

310

long getAlignmentTimeout();

311

int getTolerableCheckpointFailureNumber();

312

}

```

### CheckpointCoordinatorConfiguration

Configuration details for checkpoint coordination.

```java { .api }

public class CheckpointCoordinatorConfiguration implements Serializable {

321

public CheckpointCoordinatorConfiguration(

322

long checkpointInterval,

323

long checkpointTimeout,

324

long minPauseBetweenCheckpoints,

325

int maxConcurrentCheckpoints,

326

CheckpointRetentionPolicy retentionPolicy,

327

boolean isExactlyOnce,

328

boolean isUnalignedCheckpoint,

329

long alignmentTimeout,

330

int tolerableCheckpointFailureNumber

331

);

332

333

public long getCheckpointInterval();

334

public long getCheckpointTimeout();

335

public long getMinPauseBetweenCheckpoints();

336

public int getMaxConcurrentCheckpoints();

337

338

public CheckpointRetentionPolicy getCheckpointRetentionPolicy();

339

public boolean isExactlyOnce();

340

public boolean isUnalignedCheckpoint();

341

public long getAlignmentTimeout();

342

public int getTolerableCheckpointFailureNumber();

343

}

```

## Usage Examples

### Monitoring Job Execution State

```java

import org.apache.flink.api.common.ArchivedExecutionConfig;
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
import org.apache.flink.runtime.executiongraph.IOMetrics;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;

import java.time.Instant;

public class JobMonitor {

356

357

public void monitorJob(AccessExecutionGraph executionGraph) {

358

// Print job overview

359

System.out.println("=== Job Overview ===");

360

System.out.println("Job ID: " + executionGraph.getJobID());

361

System.out.println("Job Name: " + executionGraph.getJobName());

362

System.out.println("Job State: " + executionGraph.getState());

363

System.out.println("Parallelism: " + executionGraph.getParallelism());

364

System.out.println("Create Time: " + new Date(executionGraph.getCreateTime()));

365

366

// Check for failures

367

Throwable failureCause = executionGraph.getFailureCause();

368

if (failureCause != null) {

369

System.out.println("Failure Cause: " + failureCause.getMessage());

370

}

371

372

// Print execution configuration

373

ArchivedExecutionConfig execConfig = executionGraph.getArchivedExecutionConfig();

374

if (execConfig != null) {

375

System.out.println("Execution Mode: " + execConfig.getExecutionMode());

376

System.out.println("Object Reuse: " + execConfig.getObjectReuseEnabled());

377

System.out.println("Watermark Interval: " + execConfig.getAutoWatermarkInterval());

378

}

379

380

// Monitor vertices

381

System.out.println("\n=== Vertex Details ===");

382

for (AccessExecutionJobVertex vertex : executionGraph.getVerticesTopologically()) {

383

monitorJobVertex(vertex);

384

}

385

386

// Show accumulator results

387

showAccumulators(executionGraph);

388

}

389

390

private void monitorJobVertex(AccessExecutionJobVertex vertex) {

391

System.out.println("\nVertex: " + vertex.getName() + " (" + vertex.getJobVertexId() + ")");

392

System.out.println(" Parallelism: " + vertex.getParallelism());

393

System.out.println(" Max Parallelism: " + vertex.getMaxParallelism());

394

System.out.println(" Aggregate State: " + vertex.getAggregateState());

395

396

// Show I/O metrics

397

IOMetrics ioMetrics = vertex.getIOMetrics();

398

if (ioMetrics != null) {

399

System.out.println(" I/O Metrics:");

400

System.out.println(" Records In: " + ioMetrics.getNumRecordsIn());

401

System.out.println(" Records Out: " + ioMetrics.getNumRecordsOut());

402

System.out.println(" Bytes In: " + ioMetrics.getNumBytesInLocal() + ioMetrics.getNumBytesInRemote());

403

System.out.println(" Bytes Out: " + ioMetrics.getNumBytesOut());

404

System.out.println(" Avg Records In/sec: " + String.format("%.2f", ioMetrics.getAvgRecordsInPerSec()));

405

System.out.println(" Avg Records Out/sec: " + String.format("%.2f", ioMetrics.getAvgRecordsOutPerSec()));

406

}

407

408

// Monitor individual subtasks

409

System.out.println(" Subtasks:");

410

AccessExecutionVertex[] taskVertices = vertex.getTaskVertices();

411

for (int i = 0; i < taskVertices.length; i++) {

412

AccessExecutionVertex taskVertex = taskVertices[i];

413

System.out.println(" Subtask " + i + ": " + taskVertex.getExecutionState() +

414

" (Attempt " + taskVertex.getCurrentExecutionAttempt().getAttemptNumber() + ")");

415

416

TaskManagerLocation location = taskVertex.getAssignedResourceLocation();

417

if (location != null) {

418

System.out.println(" Location: " + location.getHostname() + ":" + location.getDataPort());

419

}

420

}

421

}

422

423

private void showAccumulators(AccessExecutionGraph executionGraph) {

424

System.out.println("\n=== Accumulators ===");

425

StringifiedAccumulatorResult[] accumulators = executionGraph.getAccumulatorResultsStringified();

426

427

if (accumulators != null && accumulators.length > 0) {

428

for (StringifiedAccumulatorResult accumulator : accumulators) {

429

System.out.println(accumulator.getName() + " (" + accumulator.getType() + "): " +

430

accumulator.getValue());

431

}

432

} else {

433

System.out.println("No accumulators available");

434

}

435

}

436

}

```

### Checkpoint Information Inspector

```java

public class CheckpointInspector {

443

444

public void inspectCheckpointConfiguration(AccessExecutionGraph executionGraph) {

445

System.out.println("=== Checkpoint Configuration ===");

446

447

JobCheckpointingConfiguration checkpointConfig = executionGraph.getCheckpointingConfiguration();

448

if (checkpointConfig != null && checkpointConfig.isCheckpointingEnabled()) {

449

System.out.println("Checkpointing Enabled: " + checkpointConfig.isCheckpointingEnabled());

450

System.out.println("Checkpoint Interval: " + checkpointConfig.getCheckpointInterval() + " ms");

451

System.out.println("Checkpoint Timeout: " + checkpointConfig.getCheckpointTimeout() + " ms");

452

System.out.println("Min Pause Between Checkpoints: " + checkpointConfig.getMinPauseBetweenCheckpoints() + " ms");

453

System.out.println("Max Concurrent Checkpoints: " + checkpointConfig.getMaxConcurrentCheckpoints());

454

System.out.println("Exactly Once: " + checkpointConfig.isExactlyOnce());

455

System.out.println("Unaligned Checkpoints: " + checkpointConfig.isUnalignedCheckpointsEnabled());

456

System.out.println("Retention Policy: " + checkpointConfig.getCheckpointRetentionPolicy());

457

458

if (checkpointConfig.isUnalignedCheckpointsEnabled()) {

459

System.out.println("Alignment Timeout: " + checkpointConfig.getAlignmentTimeout() + " ms");

460

}

461

462

System.out.println("Tolerable Failures: " + checkpointConfig.getTolerableCheckpointFailureNumber());

463

464

} else {

465

System.out.println("Checkpointing is not enabled for this job");

466

}

467

468

CheckpointCoordinatorConfiguration coordConfig = executionGraph.getCheckpointCoordinatorConfiguration();

469

if (coordConfig != null) {

470

System.out.println("\n=== Coordinator Configuration ===");

471

System.out.println("Coordinator Interval: " + coordConfig.getCheckpointInterval() + " ms");

472

System.out.println("Coordinator Timeout: " + coordConfig.getCheckpointTimeout() + " ms");

473

}

474

}

475

}

```

### Performance Analytics

```java

public class PerformanceAnalyzer {

482

483

public JobPerformanceReport analyzePerformance(AccessExecutionGraph executionGraph) {

484

JobPerformanceReport report = new JobPerformanceReport();

485

486

// Analyze overall job metrics

487

report.jobId = executionGraph.getJobID().toString();

488

report.jobName = executionGraph.getJobName();

489

report.totalParallelism = executionGraph.getParallelism();

490

491

// Calculate job duration

492

long createTime = executionGraph.getCreateTime();

493

long finishTime = executionGraph.getStatusTimestamp(JobStatus.FINISHED);

494

if (finishTime > 0) {

495

report.totalDuration = finishTime - createTime;

496

}

497

498

// Analyze vertex performance

499

for (AccessExecutionJobVertex vertex : executionGraph.getVerticesTopologically()) {

500

VertexPerformance vertexPerf = analyzeVertexPerformance(vertex);

501

report.vertexPerformance.add(vertexPerf);

502

}

503

504

// Calculate throughput metrics

505

calculateThroughputMetrics(report, executionGraph);

506

507

return report;

508

}

509

510

private VertexPerformance analyzeVertexPerformance(AccessExecutionJobVertex vertex) {

511

VertexPerformance vertexPerf = new VertexPerformance();

512

vertexPerf.vertexId = vertex.getJobVertexId().toString();

513

vertexPerf.vertexName = vertex.getName();

514

vertexPerf.parallelism = vertex.getParallelism();

515

vertexPerf.state = vertex.getAggregateState();

516

517

// Analyze I/O metrics

518

IOMetrics ioMetrics = vertex.getIOMetrics();

519

if (ioMetrics != null) {

520

vertexPerf.totalRecordsIn = ioMetrics.getNumRecordsIn();

521

vertexPerf.totalRecordsOut = ioMetrics.getNumRecordsOut();

522

vertexPerf.totalBytesIn = ioMetrics.getNumBytesInLocal() + ioMetrics.getNumBytesInRemote();

523

vertexPerf.totalBytesOut = ioMetrics.getNumBytesOut();

524

vertexPerf.avgRecordsInPerSec = ioMetrics.getAvgRecordsInPerSec();

525

vertexPerf.avgRecordsOutPerSec = ioMetrics.getAvgRecordsOutPerSec();

526

}

527

528

// Analyze subtask performance

529

AccessExecutionVertex[] taskVertices = vertex.getTaskVertices();

530

for (AccessExecutionVertex taskVertex : taskVertices) {

531

SubtaskPerformance subtaskPerf = analyzeSubtaskPerformance(taskVertex);

532

vertexPerf.subtasks.add(subtaskPerf);

533

}

534

535

return vertexPerf;

536

}

537

538

private SubtaskPerformance analyzeSubtaskPerformance(AccessExecutionVertex taskVertex) {

539

SubtaskPerformance subtaskPerf = new SubtaskPerformance();

540

subtaskPerf.subtaskIndex = taskVertex.getParallelSubtaskIndex();

541

subtaskPerf.state = taskVertex.getExecutionState();

542

543

// Get current execution attempt

544

AccessExecution execution = taskVertex.getCurrentExecutionAttempt();

545

subtaskPerf.attemptNumber = execution.getAttemptNumber();

546

subtaskPerf.attemptId = execution.getAttemptId().toString();

547

548

// Calculate execution duration

549

long[] stateTimestamps = execution.getStateTimestamps();

550

if (stateTimestamps != null) {

551

long startTime = stateTimestamps[ExecutionState.RUNNING.ordinal()];

552

long endTime = stateTimestamps[ExecutionState.FINISHED.ordinal()];

553

if (startTime > 0 && endTime > 0) {

554

subtaskPerf.executionDuration = endTime - startTime;

555

}

556

}

557

558

// Get I/O metrics for subtask

559

IOMetrics ioMetrics = execution.getIOMetrics();

560

if (ioMetrics != null) {

561

subtaskPerf.recordsProcessed = ioMetrics.getNumRecordsIn();

562

subtaskPerf.recordsProduced = ioMetrics.getNumRecordsOut();

563

}

564

565

// Get location information

566

TaskManagerLocation location = execution.getAssignedResourceLocation();

567

if (location != null) {

568

subtaskPerf.taskManagerHost = location.getHostname();

569

subtaskPerf.taskManagerPort = location.getDataPort();

570

}

571

572

return subtaskPerf;

573

}

574

575

private void calculateThroughputMetrics(JobPerformanceReport report, AccessExecutionGraph executionGraph) {

576

long totalRecordsProcessed = 0;

577

double totalAvgThroughput = 0;

578

int vertexCount = 0;

579

580

for (AccessExecutionJobVertex vertex : executionGraph.getAllVertices()) {

581

IOMetrics ioMetrics = vertex.getIOMetrics();

582

if (ioMetrics != null) {

583

totalRecordsProcessed += ioMetrics.getNumRecordsIn();

584

totalAvgThroughput += ioMetrics.getAvgRecordsInPerSec();

585

vertexCount++;

586

}

587

}

588

589

report.totalRecordsProcessed = totalRecordsProcessed;

590

if (vertexCount > 0) {

591

report.avgThroughputRecordsPerSec = totalAvgThroughput / vertexCount;

592

}

593

594

if (report.totalDuration > 0) {

595

report.overallThroughputRecordsPerSec = (double) totalRecordsProcessed / (report.totalDuration / 1000.0);

596

}

597

}

598

599

// Report data classes

600

public static class JobPerformanceReport {

601

public String jobId;

602

public String jobName;

603

public int totalParallelism;

604

public long totalDuration; // in milliseconds

605

public long totalRecordsProcessed;

606

public double avgThroughputRecordsPerSec;

607

public double overallThroughputRecordsPerSec;

608

public List<VertexPerformance> vertexPerformance = new ArrayList<>();

609

}

610

611

public static class VertexPerformance {

612

public String vertexId;

613

public String vertexName;

614

public int parallelism;

615

public ExecutionState state;

616

public long totalRecordsIn;

617

public long totalRecordsOut;

618

public long totalBytesIn;

619

public long totalBytesOut;

620

public double avgRecordsInPerSec;

621

public double avgRecordsOutPerSec;

622

public List<SubtaskPerformance> subtasks = new ArrayList<>();

623

}

624

625

public static class SubtaskPerformance {

626

public int subtaskIndex;

627

public ExecutionState state;

628

public int attemptNumber;

629

public String attemptId;

630

public long executionDuration; // in milliseconds

631

public long recordsProcessed;

632

public long recordsProduced;

633

public String taskManagerHost;

634

public int taskManagerPort;

635

}

636

}

```

## Common Patterns

### State Tracking and Alerting

```java

public class ExecutionStateTracker {

645

private final Map<JobID, JobStatus> lastKnownStates = new ConcurrentHashMap<>();

646

private final List<StateChangeListener> listeners = new ArrayList<>();

647

648

public interface StateChangeListener {

649

void onJobStateChanged(JobID jobId, JobStatus oldState, JobStatus newState, AccessExecutionGraph graph);

650

void onVertexStateChanged(JobID jobId, JobVertexID vertexId, ExecutionState oldState, ExecutionState newState);

651

}

652

653

public void trackExecution(AccessExecutionGraph executionGraph) {

654

JobID jobId = executionGraph.getJobID();

655

JobStatus currentState = executionGraph.getState();

656

JobStatus previousState = lastKnownStates.put(jobId, currentState);

657

658

// Notify job state changes

659

if (previousState != null && !previousState.equals(currentState)) {

660

for (StateChangeListener listener : listeners) {

661

listener.onJobStateChanged(jobId, previousState, currentState, executionGraph);

662

}

663

}

664

665

// Track vertex state changes

666

trackVertexStates(executionGraph);

667

}

668

669

private void trackVertexStates(AccessExecutionGraph executionGraph) {

670

for (AccessExecutionJobVertex vertex : executionGraph.getAllVertices()) {

671

ExecutionState vertexState = vertex.getAggregateState();

672

// Store and compare vertex states...

673

}

674

}

675

676

public void addListener(StateChangeListener listener) {

677

listeners.add(listener);

678

}

679

}

```

### Execution History Analysis

```java

public class ExecutionHistoryAnalyzer {

686

687

public ExecutionHistory analyzeExecutionHistory(AccessExecutionVertex vertex) {

688

ExecutionHistory history = new ExecutionHistory();

689

history.subtaskIndex = vertex.getParallelSubtaskIndex();

690

691

// Analyze current execution

692

AccessExecution currentExecution = vertex.getCurrentExecutionAttempt();

693

history.currentAttempt = analyzeExecution(currentExecution);

694

695

// Analyze previous attempts

696

int attemptNumber = 0;

697

AccessExecution priorExecution;

698

while ((priorExecution = vertex.getPriorExecutionAttempt(attemptNumber)) != null) {

699

ExecutionAttemptInfo attemptInfo = analyzeExecution(priorExecution);

700

history.previousAttempts.add(attemptInfo);

701

attemptNumber++;

702

}

703

704

// Calculate failure patterns

705

analyzeFailurePatterns(history);

706

707

return history;

708

}

709

710

private ExecutionAttemptInfo analyzeExecution(AccessExecution execution) {

711

ExecutionAttemptInfo info = new ExecutionAttemptInfo();

712

info.attemptId = execution.getAttemptId().toString();

713

info.attemptNumber = execution.getAttemptNumber();

714

info.state = execution.getState();

715

info.failureCause = execution.getFailureCause();

716

717

// Analyze state transitions

718

long[] timestamps = execution.getStateTimestamps();

719

if (timestamps != null) {

720

info.stateTimestamps = Arrays.copyOf(timestamps, timestamps.length);

721

722

// Calculate durations

723

calculateStateDurations(info, timestamps);

724

}

725

726

// Get resource information

727

TaskManagerLocation location = execution.getAssignedResourceLocation();

728

if (location != null) {

729

info.taskManagerLocation = location.getHostname() + ":" + location.getDataPort();

730

}

731

732

return info;

733

}

734

735

private void calculateStateDurations(ExecutionAttemptInfo info, long[] timestamps) {

736

ExecutionState[] states = ExecutionState.values();

737

Map<ExecutionState, Long> durations = new HashMap<>();

738

739

for (int i = 0; i < states.length - 1; i++) {

740

if (timestamps[i] > 0 && timestamps[i + 1] > 0) {

741

long duration = timestamps[i + 1] - timestamps[i];

742

durations.put(states[i], duration);

743

}

744

}

745

746

info.stateDurations = durations;

747

}

748

749

private void analyzeFailurePatterns(ExecutionHistory history) {

750

long totalFailures = history.previousAttempts.stream()

751

.filter(attempt -> attempt.state == ExecutionState.FAILED)

752

.count();

753

754

history.totalFailures = totalFailures;

755

756

// Analyze failure causes

757

Map<String, Long> failureReasons = history.previousAttempts.stream()

758

.filter(attempt -> attempt.failureCause != null)

759

.collect(Collectors.groupingBy(

760

attempt -> attempt.failureCause.getClass().getSimpleName(),

761

Collectors.counting()

762

));

763

764

history.failureReasonCounts = failureReasons;

765

}

766

767

public static class ExecutionHistory {

768

public int subtaskIndex;

769

public ExecutionAttemptInfo currentAttempt;

770

public List<ExecutionAttemptInfo> previousAttempts = new ArrayList<>();

771

public long totalFailures;

772

public Map<String, Long> failureReasonCounts = new HashMap<>();

773

}

774

775

public static class ExecutionAttemptInfo {

776

public String attemptId;

777

public int attemptNumber;

778

public ExecutionState state;

779

public Throwable failureCause;

780

public long[] stateTimestamps;

781

public Map<ExecutionState, Long> stateDurations = new HashMap<>();

782

public String taskManagerLocation;

783

}

784

}

```