Apache Flink runtime engine providing core distributed streaming dataflow execution, task scheduling, state management, and fault tolerance capabilities.
—
The Execution Graph Access APIs provide read-only interfaces for inspecting and monitoring the runtime execution state of Flink jobs. These APIs enable external systems, web interfaces, and monitoring tools to access detailed information about job execution topology, task states, and performance metrics.
Primary interface for read-only access to execution graph information, providing comprehensive job-level details.
/**
 * Read-only access to a job's execution graph: identity, lifecycle state,
 * vertex topology, accumulator results, and checkpointing configuration.
 * Implementations may be live graphs or archived snapshots (see isArchived()).
 */
public interface AccessExecutionGraph {
/** Unique ID of the job this graph belongs to. */
JobID getJobID();
/** Human-readable job name. */
String getJobName();
/** Current aggregate job status. */
JobStatus getState();
/** Root cause of the job failure; null while the job has not failed (see usage in JobMonitor). */
Throwable getFailureCause();
/** Timestamp at which the given status was reached; callers in this file treat it as epoch millis. */
long getStatusTimestamp(JobStatus status);
boolean isStoppable();
/** Accumulator results rendered as strings, suitable for display. */
StringifiedAccumulatorResult[] getAccumulatorResultsStringified();
/** Raw accumulator results keyed by accumulator name. */
Map<String, Object> getAccumulatorResults();
int getParallelism();
/** Creation time; consumed as epoch millis (see new Date(getCreateTime()) in JobMonitor). */
long getCreateTime();
/** Job vertices in topological order. */
Iterable<AccessExecutionJobVertex> getVerticesTopologically();
/** All job vertices, order unspecified. */
Iterable<AccessExecutionJobVertex> getAllVertices();
/** Looks up a single job vertex by ID. */
AccessExecutionJobVertex getJobVertex(JobVertexID id);
ArchivedExecutionConfig getArchivedExecutionConfig();
CheckpointCoordinatorConfiguration getCheckpointCoordinatorConfiguration();
JobCheckpointingConfiguration getCheckpointingConfiguration();
/** JSON representation of the job plan. */
String getJsonPlan();
/** True if this is an immutable archived snapshot rather than a live graph. */
boolean isArchived();
}Interface providing access to job vertex information and its parallel execution vertices.
/**
 * Read-only access to one job vertex (logical operator) and its parallel
 * execution vertices (subtasks).
 */
public interface AccessExecutionJobVertex {
JobVertexID getJobVertexId();
String getName();
int getParallelism();
int getMaxParallelism();
ResourceProfile getResourceProfile();
/** Execution state aggregated over all parallel subtasks of this vertex. */
ExecutionState getAggregateState();
long getStateTimestamp(ExecutionState state);
/** All parallel subtasks of this vertex. */
AccessExecutionVertex[] getTaskVertices();
/** Single subtask by its parallel index. */
AccessExecutionVertex getTaskVertex(int subtask);
/** I/O metrics aggregated over this vertex's subtasks. */
IOMetrics getIOMetrics();
// Renamed from getAggregatedUserAccumulatorsStringified(): Java forbids two
// methods with the same name and parameter list that differ only in return
// type, and the Map-returning variant is the non-stringified accessor.
Map<String, Accumulator<?, ?>> getAggregatedUserAccumulators();
StringifiedAccumulatorResult[] getAggregatedUserAccumulatorsStringified();
}
Interface for accessing individual execution vertex (subtask) information.
/**
 * Read-only access to a single parallel subtask (execution vertex) of a job
 * vertex, including its current and prior execution attempts.
 */
public interface AccessExecutionVertex {
/** The job vertex this subtask belongs to. */
AccessExecutionJobVertex getJobVertex();
/** 0-based index of this subtask within the vertex's parallelism. */
int getParallelSubtaskIndex();
ExecutionState getExecutionState();
long getStateTimestamp(ExecutionState state);
Throwable getFailureCause();
/** Latest execution attempt for this subtask. */
AccessExecution getCurrentExecutionAttempt();
/** A prior execution attempt by attempt number (e.g. before a failover). */
AccessExecution getPriorExecutionAttempt(int attemptNumber);
AccessExecution[] getCurrentExecutions();
// Fixed return type: a resource location cannot be an int; this now mirrors
// getAssignedResourceLocation() below, which returns TaskManagerLocation.
TaskManagerLocation getCurrentAssignedResourceLocation();
TaskManagerLocation getAssignedResourceLocation();
}
Interface for accessing execution attempt information and metrics.
/**
 * Read-only access to a single execution attempt of a subtask, including its
 * state transitions, failure cause, and I/O metrics.
 */
public interface AccessExecution {
ExecutionAttemptID getAttemptId();
/** Attempt number; increases with each restart of the subtask. */
int getAttemptNumber();
ExecutionState getState();
TaskManagerLocation getAssignedResourceLocation();
Throwable getFailureCause();
/** Per-state timestamps; consumers in this file index this array by ExecutionState ordinal. */
long[] getStateTimestamps();
long getStateTimestamp(ExecutionState state);
StringifiedAccumulatorResult[] getUserAccumulatorsStringified();
int getParallelSubtaskIndex();
IOMetrics getIOMetrics();
// NOTE(review): appears to duplicate getAssignedResourceLocation() above —
// confirm whether both accessors are intended on this interface.
TaskManagerLocation getCurrentAssignedResourceLocation();
}Immutable snapshot of an execution graph that retains all information after job completion.
/**
 * Serializable, immutable snapshot of an execution graph, retained after the
 * job completes so its final state can still be inspected.
 */
public class ArchivedExecutionGraph implements AccessExecutionGraph, Serializable {
/** Creates an archived snapshot from a live execution graph. */
public static ArchivedExecutionGraph createFrom(ExecutionGraph executionGraph);
@Override
public JobID getJobID();
@Override
public String getJobName();
@Override
public JobStatus getState();
@Override
public Throwable getFailureCause();
@Override
public long getStatusTimestamp(JobStatus status);
@Override
public StringifiedAccumulatorResult[] getAccumulatorResultsStringified();
@Override
public Map<String, Object> getAccumulatorResults();
@Override
public Iterable<AccessExecutionJobVertex> getVerticesTopologically();
@Override
public ArchivedExecutionConfig getArchivedExecutionConfig();
@Override
public String getJsonPlan();
@Override
public boolean isArchived();
// NOTE(review): role of userCodeClassLoader not shown here — presumably needed
// to materialize user-defined accumulator types; confirm against implementation.
public CompletableFuture<ArchivedExecutionGraph> archive(ClassLoader userCodeClassLoader);
}Archived representation of a job vertex with complete execution information.
/**
 * Serializable, archived counterpart of a job vertex, built from a live
 * ExecutionJobVertex at archiving time.
 */
public class ArchivedExecutionJobVertex implements AccessExecutionJobVertex, Serializable {
/** Snapshots the given live job vertex. */
public ArchivedExecutionJobVertex(ExecutionJobVertex jobVertex);
@Override
public JobVertexID getJobVertexId();
@Override
public String getName();
@Override
public int getParallelism();
@Override
public int getMaxParallelism();
@Override
public ExecutionState getAggregateState();
@Override
public AccessExecutionVertex[] getTaskVertices();
@Override
public IOMetrics getIOMetrics();
@Override
public StringifiedAccumulatorResult[] getAggregatedUserAccumulatorsStringified();
}Archived representation of an execution vertex with all attempt information.
/**
 * Serializable, archived counterpart of an execution vertex (subtask), built
 * from a live ExecutionVertex at archiving time.
 */
public class ArchivedExecutionVertex implements AccessExecutionVertex, Serializable {
/** Snapshots the given live execution vertex. */
public ArchivedExecutionVertex(ExecutionVertex executionVertex);
@Override
public AccessExecutionJobVertex getJobVertex();
@Override
public int getParallelSubtaskIndex();
@Override
public ExecutionState getExecutionState();
@Override
public long getStateTimestamp(ExecutionState state);
@Override
public Throwable getFailureCause();
@Override
public AccessExecution getCurrentExecutionAttempt();
@Override
public TaskManagerLocation getAssignedResourceLocation();
}Archived execution configuration containing job-level settings and parameters.
/**
 * Serializable snapshot of job-level ExecutionConfig settings, captured so the
 * configuration stays inspectable after the job is gone.
 */
public class ArchivedExecutionConfig implements Serializable {
/** Snapshots the given live execution config. */
public ArchivedExecutionConfig(ExecutionConfig executionConfig);
public String getExecutionMode();
public int getParallelism();
public boolean getObjectReuseEnabled();
/** Interval for automatic watermark emission; consumers here print it as a raw number. */
public long getAutoWatermarkInterval();
public RestartStrategy.RestartStrategyConfiguration getRestartStrategy();
/** User-provided global job parameters as plain strings. */
public Map<String, String> getGlobalJobParameters();
public String getCodeAnalysisMode();
public long getDefaultBufferTimeout();
public boolean isTimestampsEnabled();
public boolean isLatencyTrackingEnabled();
public long getLatencyTrackingInterval();
public boolean isClosureCleanerEnabled();
public int getMaxParallelism();
}Interface providing access to I/O metrics for tasks and operators.
/**
 * Read-only I/O metrics for a task or operator. Input bytes are split into a
 * local and a remote counter; consumers in this file sum the two for totals.
 */
public interface IOMetrics extends Serializable {
long getNumBytesInLocal();
long getNumBytesInRemote();
long getNumBytesOut();
long getNumRecordsIn();
long getNumRecordsOut();
/** Average rates; printed with two-decimal formatting by consumers here. */
double getAvgRecordsInPerSec();
double getAvgRecordsOutPerSec();
double getAvgBytesInPerSec();
double getAvgBytesOutPerSec();
}Container for accumulator results that have been converted to string representations.
/**
 * Immutable triple (name, type, value) holding one accumulator result rendered
 * to strings for display or transport.
 */
public class StringifiedAccumulatorResult implements Serializable {
public StringifiedAccumulatorResult(String name, String type, String value);
public String getName();
public String getType();
public String getValue();
@Override
public String toString();
}Read-only access to job checkpointing configuration.
/**
 * Read-only view of a job's checkpointing settings. Check
 * isCheckpointingEnabled() before interpreting the other values.
 */
public interface JobCheckpointingConfiguration extends Serializable {
/** Interval between checkpoint triggers; consumers here print it as milliseconds. */
long getCheckpointInterval();
long getCheckpointTimeout();
long getMinPauseBetweenCheckpoints();
int getMaxConcurrentCheckpoints();
CheckpointRetentionPolicy getCheckpointRetentionPolicy();
boolean isExactlyOnce();
boolean isCheckpointingEnabled();
boolean isUnalignedCheckpointsEnabled();
/** Consumers in this file only read this when unaligned checkpoints are enabled. */
long getAlignmentTimeout();
int getTolerableCheckpointFailureNumber();
}Configuration details for checkpoint coordination.
/**
 * Serializable configuration for the checkpoint coordinator. All settings are
 * fixed at construction time via the all-args constructor below.
 */
public class CheckpointCoordinatorConfiguration implements Serializable {
public CheckpointCoordinatorConfiguration(
long checkpointInterval,
long checkpointTimeout,
long minPauseBetweenCheckpoints,
int maxConcurrentCheckpoints,
CheckpointRetentionPolicy retentionPolicy,
boolean isExactlyOnce,
boolean isUnalignedCheckpoint,
long alignmentTimeout,
int tolerableCheckpointFailureNumber
);
/** Interval between checkpoint triggers; consumers here print it as milliseconds. */
public long getCheckpointInterval();
public long getCheckpointTimeout();
public long getMinPauseBetweenCheckpoints();
public int getMaxConcurrentCheckpoints();
public CheckpointRetentionPolicy getCheckpointRetentionPolicy();
public boolean isExactlyOnce();
public boolean isUnalignedCheckpoint();
public long getAlignmentTimeout();
public int getTolerableCheckpointFailureNumber();
}import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
public class JobMonitor {
    /**
     * Prints a human-readable summary of the given execution graph: job
     * overview, failure cause (if any), execution config, per-vertex details,
     * and accumulator results.
     */
    public void monitorJob(AccessExecutionGraph executionGraph) {
        // Print job overview
        System.out.println("=== Job Overview ===");
        System.out.println("Job ID: " + executionGraph.getJobID());
        System.out.println("Job Name: " + executionGraph.getJobName());
        System.out.println("Job State: " + executionGraph.getState());
        System.out.println("Parallelism: " + executionGraph.getParallelism());
        System.out.println("Create Time: " + new Date(executionGraph.getCreateTime()));
        // Check for failures
        Throwable failureCause = executionGraph.getFailureCause();
        if (failureCause != null) {
            System.out.println("Failure Cause: " + failureCause.getMessage());
        }
        // Print execution configuration (may be null if not archived yet)
        ArchivedExecutionConfig execConfig = executionGraph.getArchivedExecutionConfig();
        if (execConfig != null) {
            System.out.println("Execution Mode: " + execConfig.getExecutionMode());
            System.out.println("Object Reuse: " + execConfig.getObjectReuseEnabled());
            System.out.println("Watermark Interval: " + execConfig.getAutoWatermarkInterval());
        }
        // Monitor vertices in topological order
        System.out.println("\n=== Vertex Details ===");
        for (AccessExecutionJobVertex vertex : executionGraph.getVerticesTopologically()) {
            monitorJobVertex(vertex);
        }
        // Show accumulator results
        showAccumulators(executionGraph);
    }

    /** Prints one vertex's parallelism, state, I/O metrics, and subtask breakdown. */
    private void monitorJobVertex(AccessExecutionJobVertex vertex) {
        System.out.println("\nVertex: " + vertex.getName() + " (" + vertex.getJobVertexId() + ")");
        System.out.println(" Parallelism: " + vertex.getParallelism());
        System.out.println(" Max Parallelism: " + vertex.getMaxParallelism());
        System.out.println(" Aggregate State: " + vertex.getAggregateState());
        // Show I/O metrics
        IOMetrics ioMetrics = vertex.getIOMetrics();
        if (ioMetrics != null) {
            System.out.println(" I/O Metrics:");
            System.out.println(" Records In: " + ioMetrics.getNumRecordsIn());
            System.out.println(" Records Out: " + ioMetrics.getNumRecordsOut());
            // BUG FIX: the two byte counts were string-concatenated (10 and 20
            // printed as "1020"); parenthesize so they are summed numerically.
            System.out.println(" Bytes In: " + (ioMetrics.getNumBytesInLocal() + ioMetrics.getNumBytesInRemote()));
            System.out.println(" Bytes Out: " + ioMetrics.getNumBytesOut());
            System.out.println(" Avg Records In/sec: " + String.format("%.2f", ioMetrics.getAvgRecordsInPerSec()));
            System.out.println(" Avg Records Out/sec: " + String.format("%.2f", ioMetrics.getAvgRecordsOutPerSec()));
        }
        // Monitor individual subtasks
        System.out.println(" Subtasks:");
        AccessExecutionVertex[] taskVertices = vertex.getTaskVertices();
        for (int i = 0; i < taskVertices.length; i++) {
            AccessExecutionVertex taskVertex = taskVertices[i];
            System.out.println(" Subtask " + i + ": " + taskVertex.getExecutionState() +
                " (Attempt " + taskVertex.getCurrentExecutionAttempt().getAttemptNumber() + ")");
            TaskManagerLocation location = taskVertex.getAssignedResourceLocation();
            if (location != null) {
                System.out.println(" Location: " + location.getHostname() + ":" + location.getDataPort());
            }
        }
    }

    /** Prints stringified accumulator results, or a placeholder when none exist. */
    private void showAccumulators(AccessExecutionGraph executionGraph) {
        System.out.println("\n=== Accumulators ===");
        StringifiedAccumulatorResult[] accumulators = executionGraph.getAccumulatorResultsStringified();
        if (accumulators != null && accumulators.length > 0) {
            for (StringifiedAccumulatorResult accumulator : accumulators) {
                System.out.println(accumulator.getName() + " (" + accumulator.getType() + "): " +
                    accumulator.getValue());
            }
        } else {
            System.out.println("No accumulators available");
        }
    }
}
public class CheckpointInspector {
public void inspectCheckpointConfiguration(AccessExecutionGraph executionGraph) {
System.out.println("=== Checkpoint Configuration ===");
JobCheckpointingConfiguration checkpointConfig = executionGraph.getCheckpointingConfiguration();
if (checkpointConfig != null && checkpointConfig.isCheckpointingEnabled()) {
System.out.println("Checkpointing Enabled: " + checkpointConfig.isCheckpointingEnabled());
System.out.println("Checkpoint Interval: " + checkpointConfig.getCheckpointInterval() + " ms");
System.out.println("Checkpoint Timeout: " + checkpointConfig.getCheckpointTimeout() + " ms");
System.out.println("Min Pause Between Checkpoints: " + checkpointConfig.getMinPauseBetweenCheckpoints() + " ms");
System.out.println("Max Concurrent Checkpoints: " + checkpointConfig.getMaxConcurrentCheckpoints());
System.out.println("Exactly Once: " + checkpointConfig.isExactlyOnce());
System.out.println("Unaligned Checkpoints: " + checkpointConfig.isUnalignedCheckpointsEnabled());
System.out.println("Retention Policy: " + checkpointConfig.getCheckpointRetentionPolicy());
if (checkpointConfig.isUnalignedCheckpointsEnabled()) {
System.out.println("Alignment Timeout: " + checkpointConfig.getAlignmentTimeout() + " ms");
}
System.out.println("Tolerable Failures: " + checkpointConfig.getTolerableCheckpointFailureNumber());
} else {
System.out.println("Checkpointing is not enabled for this job");
}
CheckpointCoordinatorConfiguration coordConfig = executionGraph.getCheckpointCoordinatorConfiguration();
if (coordConfig != null) {
System.out.println("\n=== Coordinator Configuration ===");
System.out.println("Coordinator Interval: " + coordConfig.getCheckpointInterval() + " ms");
System.out.println("Coordinator Timeout: " + coordConfig.getCheckpointTimeout() + " ms");
}
}
}public class PerformanceAnalyzer {
/**
 * Builds a JobPerformanceReport for the given graph: job-level identity and
 * duration, per-vertex and per-subtask metrics, and aggregate throughput.
 */
public JobPerformanceReport analyzePerformance(AccessExecutionGraph executionGraph) {
JobPerformanceReport report = new JobPerformanceReport();
// Analyze overall job metrics
report.jobId = executionGraph.getJobID().toString();
report.jobName = executionGraph.getJobName();
report.totalParallelism = executionGraph.getParallelism();
// Calculate job duration
long createTime = executionGraph.getCreateTime();
long finishTime = executionGraph.getStatusTimestamp(JobStatus.FINISHED);
// Duration stays 0 when the job has not reached FINISHED (timestamp <= 0).
if (finishTime > 0) {
report.totalDuration = finishTime - createTime;
}
// Analyze vertex performance
for (AccessExecutionJobVertex vertex : executionGraph.getVerticesTopologically()) {
VertexPerformance vertexPerf = analyzeVertexPerformance(vertex);
report.vertexPerformance.add(vertexPerf);
}
// Calculate throughput metrics
calculateThroughputMetrics(report, executionGraph);
return report;
}
/** Collects one vertex's aggregate I/O metrics and per-subtask breakdown. */
private VertexPerformance analyzeVertexPerformance(AccessExecutionJobVertex vertex) {
VertexPerformance vertexPerf = new VertexPerformance();
vertexPerf.vertexId = vertex.getJobVertexId().toString();
vertexPerf.vertexName = vertex.getName();
vertexPerf.parallelism = vertex.getParallelism();
vertexPerf.state = vertex.getAggregateState();
// Analyze I/O metrics
IOMetrics ioMetrics = vertex.getIOMetrics();
if (ioMetrics != null) {
vertexPerf.totalRecordsIn = ioMetrics.getNumRecordsIn();
vertexPerf.totalRecordsOut = ioMetrics.getNumRecordsOut();
// Total input bytes = local + remote channels.
vertexPerf.totalBytesIn = ioMetrics.getNumBytesInLocal() + ioMetrics.getNumBytesInRemote();
vertexPerf.totalBytesOut = ioMetrics.getNumBytesOut();
vertexPerf.avgRecordsInPerSec = ioMetrics.getAvgRecordsInPerSec();
vertexPerf.avgRecordsOutPerSec = ioMetrics.getAvgRecordsOutPerSec();
}
// Analyze subtask performance
AccessExecutionVertex[] taskVertices = vertex.getTaskVertices();
for (AccessExecutionVertex taskVertex : taskVertices) {
SubtaskPerformance subtaskPerf = analyzeSubtaskPerformance(taskVertex);
vertexPerf.subtasks.add(subtaskPerf);
}
return vertexPerf;
}
/** Collects one subtask's current-attempt state, duration, I/O, and location. */
private SubtaskPerformance analyzeSubtaskPerformance(AccessExecutionVertex taskVertex) {
SubtaskPerformance subtaskPerf = new SubtaskPerformance();
subtaskPerf.subtaskIndex = taskVertex.getParallelSubtaskIndex();
subtaskPerf.state = taskVertex.getExecutionState();
// Get current execution attempt
AccessExecution execution = taskVertex.getCurrentExecutionAttempt();
subtaskPerf.attemptNumber = execution.getAttemptNumber();
subtaskPerf.attemptId = execution.getAttemptId().toString();
// Calculate execution duration
// Assumes getStateTimestamps() is indexed by ExecutionState ordinal — TODO confirm.
long[] stateTimestamps = execution.getStateTimestamps();
if (stateTimestamps != null) {
long startTime = stateTimestamps[ExecutionState.RUNNING.ordinal()];
long endTime = stateTimestamps[ExecutionState.FINISHED.ordinal()];
// Only meaningful when the attempt both ran and finished.
if (startTime > 0 && endTime > 0) {
subtaskPerf.executionDuration = endTime - startTime;
}
}
// Get I/O metrics for subtask
IOMetrics ioMetrics = execution.getIOMetrics();
if (ioMetrics != null) {
subtaskPerf.recordsProcessed = ioMetrics.getNumRecordsIn();
subtaskPerf.recordsProduced = ioMetrics.getNumRecordsOut();
}
// Get location information
TaskManagerLocation location = execution.getAssignedResourceLocation();
if (location != null) {
subtaskPerf.taskManagerHost = location.getHostname();
subtaskPerf.taskManagerPort = location.getDataPort();
}
return subtaskPerf;
}
/** Fills the report's aggregate throughput figures from all vertices. */
private void calculateThroughputMetrics(JobPerformanceReport report, AccessExecutionGraph executionGraph) {
long totalRecordsProcessed = 0;
double totalAvgThroughput = 0;
int vertexCount = 0;
for (AccessExecutionJobVertex vertex : executionGraph.getAllVertices()) {
IOMetrics ioMetrics = vertex.getIOMetrics();
if (ioMetrics != null) {
totalRecordsProcessed += ioMetrics.getNumRecordsIn();
totalAvgThroughput += ioMetrics.getAvgRecordsInPerSec();
vertexCount++;
}
}
report.totalRecordsProcessed = totalRecordsProcessed;
// NOTE(review): this is an unweighted mean of per-vertex averages, not a
// record-weighted mean — vertices with little traffic count equally.
if (vertexCount > 0) {
report.avgThroughputRecordsPerSec = totalAvgThroughput / vertexCount;
}
// Overall rate uses wall-clock job duration in seconds; skipped if duration unknown.
if (report.totalDuration > 0) {
report.overallThroughputRecordsPerSec = (double) totalRecordsProcessed / (report.totalDuration / 1000.0);
}
}
// Report data classes
/** Top-level report: job identity, duration, totals, and per-vertex details. */
public static class JobPerformanceReport {
public String jobId;
public String jobName;
public int totalParallelism;
public long totalDuration; // in milliseconds
public long totalRecordsProcessed;
public double avgThroughputRecordsPerSec;
public double overallThroughputRecordsPerSec;
public List<VertexPerformance> vertexPerformance = new ArrayList<>();
}
/** Per-vertex aggregate metrics plus per-subtask breakdown. */
public static class VertexPerformance {
public String vertexId;
public String vertexName;
public int parallelism;
public ExecutionState state;
public long totalRecordsIn;
public long totalRecordsOut;
public long totalBytesIn;
public long totalBytesOut;
public double avgRecordsInPerSec;
public double avgRecordsOutPerSec;
public List<SubtaskPerformance> subtasks = new ArrayList<>();
}
/** Per-subtask metrics for the current execution attempt. */
public static class SubtaskPerformance {
public int subtaskIndex;
public ExecutionState state;
public int attemptNumber;
public String attemptId;
public long executionDuration; // in milliseconds
public long recordsProcessed;
public long recordsProduced;
public String taskManagerHost;
public int taskManagerPort;
}
}public class ExecutionStateTracker {
private final Map<JobID, JobStatus> lastKnownStates = new ConcurrentHashMap<>();
private final List<StateChangeListener> listeners = new ArrayList<>();
public interface StateChangeListener {
void onJobStateChanged(JobID jobId, JobStatus oldState, JobStatus newState, AccessExecutionGraph graph);
void onVertexStateChanged(JobID jobId, JobVertexID vertexId, ExecutionState oldState, ExecutionState newState);
}
public void trackExecution(AccessExecutionGraph executionGraph) {
JobID jobId = executionGraph.getJobID();
JobStatus currentState = executionGraph.getState();
JobStatus previousState = lastKnownStates.put(jobId, currentState);
// Notify job state changes
if (previousState != null && !previousState.equals(currentState)) {
for (StateChangeListener listener : listeners) {
listener.onJobStateChanged(jobId, previousState, currentState, executionGraph);
}
}
// Track vertex state changes
trackVertexStates(executionGraph);
}
private void trackVertexStates(AccessExecutionGraph executionGraph) {
for (AccessExecutionJobVertex vertex : executionGraph.getAllVertices()) {
ExecutionState vertexState = vertex.getAggregateState();
// Store and compare vertex states...
}
}
public void addListener(StateChangeListener listener) {
listeners.add(listener);
}
}public class ExecutionHistoryAnalyzer {
/**
 * Builds an ExecutionHistory for one subtask: the current attempt, all prior
 * attempts, and derived failure statistics.
 */
public ExecutionHistory analyzeExecutionHistory(AccessExecutionVertex vertex) {
ExecutionHistory history = new ExecutionHistory();
history.subtaskIndex = vertex.getParallelSubtaskIndex();
// Analyze current execution
AccessExecution currentExecution = vertex.getCurrentExecutionAttempt();
history.currentAttempt = analyzeExecution(currentExecution);
// Analyze previous attempts
// Walks attempt numbers from 0 upward; stops at the first attempt number for
// which no prior execution is recorded (null return).
int attemptNumber = 0;
AccessExecution priorExecution;
while ((priorExecution = vertex.getPriorExecutionAttempt(attemptNumber)) != null) {
ExecutionAttemptInfo attemptInfo = analyzeExecution(priorExecution);
history.previousAttempts.add(attemptInfo);
attemptNumber++;
}
// Calculate failure patterns
analyzeFailurePatterns(history);
return history;
}
/** Extracts one attempt's identity, state, timing, and location into an ExecutionAttemptInfo. */
private ExecutionAttemptInfo analyzeExecution(AccessExecution execution) {
ExecutionAttemptInfo info = new ExecutionAttemptInfo();
info.attemptId = execution.getAttemptId().toString();
info.attemptNumber = execution.getAttemptNumber();
info.state = execution.getState();
info.failureCause = execution.getFailureCause();
// Analyze state transitions
long[] timestamps = execution.getStateTimestamps();
if (timestamps != null) {
// Defensive copy so later mutation of the source array cannot alter the info.
info.stateTimestamps = Arrays.copyOf(timestamps, timestamps.length);
// Calculate durations
calculateStateDurations(info, timestamps);
}
// Get resource information
TaskManagerLocation location = execution.getAssignedResourceLocation();
if (location != null) {
info.taskManagerLocation = location.getHostname() + ":" + location.getDataPort();
}
return info;
}
// Derives per-state durations from consecutive timestamps.
// NOTE(review): assumes the attempt passed through states in ExecutionState
// ordinal order; for out-of-order transitions (e.g. cancellation) a computed
// duration could be wrong or negative — confirm against ExecutionState's
// declared order.
private void calculateStateDurations(ExecutionAttemptInfo info, long[] timestamps) {
ExecutionState[] states = ExecutionState.values();
Map<ExecutionState, Long> durations = new HashMap<>();
for (int i = 0; i < states.length - 1; i++) {
// Both boundary timestamps must be recorded (> 0) for a duration to exist.
if (timestamps[i] > 0 && timestamps[i + 1] > 0) {
long duration = timestamps[i + 1] - timestamps[i];
durations.put(states[i], duration);
}
}
info.stateDurations = durations;
}
// Aggregates failure counts and groups them by exception class name.
// Note: only previousAttempts are examined; a failed currentAttempt is not counted.
private void analyzeFailurePatterns(ExecutionHistory history) {
long totalFailures = history.previousAttempts.stream()
.filter(attempt -> attempt.state == ExecutionState.FAILED)
.count();
history.totalFailures = totalFailures;
// Analyze failure causes
Map<String, Long> failureReasons = history.previousAttempts.stream()
.filter(attempt -> attempt.failureCause != null)
.collect(Collectors.groupingBy(
attempt -> attempt.failureCause.getClass().getSimpleName(),
Collectors.counting()
));
history.failureReasonCounts = failureReasons;
}
/** Full attempt history for one subtask plus derived failure statistics. */
public static class ExecutionHistory {
public int subtaskIndex;
public ExecutionAttemptInfo currentAttempt;
public List<ExecutionAttemptInfo> previousAttempts = new ArrayList<>();
public long totalFailures;
public Map<String, Long> failureReasonCounts = new HashMap<>();
}
/** Snapshot of a single execution attempt's identity, timing, and failure. */
public static class ExecutionAttemptInfo {
public String attemptId;
public int attemptNumber;
public ExecutionState state;
public Throwable failureCause;
public long[] stateTimestamps;
public Map<ExecutionState, Long> stateDurations = new HashMap<>();
public String taskManagerLocation;
}
}Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-runtime-2-10