CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/maven-org-apache-flink--flink-runtime

The runtime module of Apache Flink that provides the core execution engine, scheduling, fault tolerance, checkpointing, and resource management capabilities for distributed stream processing applications.

Pending
Overview
Eval results
Files

docs/execution-scheduling.md

Execution and Scheduling

Advanced scheduling system for distributed job execution with support for batch and streaming workloads. The execution layer transforms JobGraphs into ExecutionGraphs and coordinates task execution across the cluster.

Capabilities

SchedulerNG

Main interface for job scheduling with support for different scheduling strategies including adaptive scheduling.

/**
 * Interface for scheduling Flink jobs. Implementations receive a JobGraph when instantiated
 * and coordinate the distributed execution of the job.
 *
 * <p>Several operations are asynchronous and report their outcome through a
 * {@code CompletableFuture}.
 */
public interface SchedulerNG extends GlobalFailureHandler, AutoCloseableAsync {
    /** Start scheduling the job. */
    void startScheduling();
    
    /** Cancel the job execution. */
    void cancel();
    
    /** Get a future that completes with a {@code JobStatus} when the job terminates. */
    CompletableFuture<JobStatus> getJobTerminationFuture();
    
    /**
     * Update the execution state of a task as reported by a TaskExecutor.
     *
     * @param taskExecutionState the reported state transition
     * @return a boolean outcome of the update (presumably whether the update was accepted;
     *     confirm against the implementation)
     */
    boolean updateTaskExecutionState(TaskExecutionStateTransition taskExecutionState);
    
    /**
     * Request the next input split for a task.
     *
     * @param vertexID ID of the job vertex the task belongs to
     * @param executionAttempt ID of the execution attempt requesting the split
     * @return the next input split in serialized form
     */
    SerializedInputSplit requestNextInputSplit(
        JobVertexID vertexID,
        ExecutionAttemptID executionAttempt
    );
    
    /**
     * Trigger a checkpoint of the given type.
     *
     * @param checkpointType the type of checkpoint to trigger
     * @return future yielding the {@code CompletedCheckpoint}
     */
    CompletableFuture<CompletedCheckpoint> triggerCheckpoint(CheckpointType checkpointType);
    
    /**
     * Stop the job with a savepoint.
     *
     * @param targetDirectory directory the savepoint is written to
     * @param terminate whether to terminate the job after the savepoint
     * @param formatType format of the savepoint
     * @return future yielding a String (presumably the savepoint location; confirm)
     */
    CompletableFuture<String> stopWithSavepoint(
        String targetDirectory,
        boolean terminate,
        SavepointFormatType formatType
    );
    
    /**
     * Trigger a savepoint without stopping the job.
     *
     * @param targetDirectory directory the savepoint is written to
     * @param formatType format of the savepoint
     * @return future yielding a String (presumably the savepoint location; confirm)
     */
    CompletableFuture<String> triggerSavepoint(
        String targetDirectory,
        SavepointFormatType formatType
    );
    
    /**
     * Deliver a coordination request to the coordinator of the given operator.
     *
     * @param operatorId ID of the operator whose coordinator receives the request
     * @param request the coordination request to deliver
     * @return future yielding the coordinator's {@code CoordinationResponse}
     */
    CompletableFuture<CoordinationResponse> deliverCoordinationRequestToCoordinator(
        OperatorID operatorId,
        CoordinationRequest request
    );
    
    /**
     * Handle an operator event coming from a task.
     *
     * @param task ID of the execution attempt associated with the event
     * @param operatorId ID of the operator the event is addressed to
     * @param evt the operator event
     */
    void handleOperatorEvent(
        ExecutionAttemptID task,
        OperatorID operatorId,
        OperatorEvent evt
    );
    
    /** Get the current job status. */
    JobStatus getJobStatus();
    
    /**
     * Suspend the job.
     *
     * @param cause the reason for suspending
     * @return future for the suspend operation
     */
    CompletableFuture<Void> suspend(Throwable cause);
    
    /** Close the scheduler asynchronously, returning a future for the close operation. */
    CompletableFuture<Void> closeAsync();
}

Usage Examples:

// Create the scheduler factory and obtain the main-thread executor
SchedulerNGFactory schedulerFactory = new DefaultSchedulerFactory();
ComponentMainThreadExecutor mainThreadExecutor = // ... get executor

// Create the scheduler.
// NOTE: every argument below (log, jobGraph, ioExecutor, ...) is a placeholder that is
// assumed to already be in scope; this snippet does not declare them.
SchedulerNG scheduler = schedulerFactory.createInstance(
    log,
    jobGraph,
    ioExecutor,
    jobMasterConfiguration,
    slotPoolServiceFactory,
    mainThreadExecutor,
    heartbeatServices,
    jobManagerJobMetricGroup,
    shuffleMaster,
    jobMasterPartitionTracker,
    executionGraph -> {},  // execution graph handler (no-op here)
    fatalErrorHandler
);

// Start scheduling
scheduler.startScheduling();

// Trigger a checkpoint; the result arrives asynchronously via the returned future
CompletableFuture<CompletedCheckpoint> checkpointFuture = 
    scheduler.triggerCheckpoint(CheckpointType.CHECKPOINT);

// Stop with savepoint; the returned future yields a String
CompletableFuture<String> savepointFuture = scheduler.stopWithSavepoint(
    "hdfs://cluster/savepoints/",
    true,  // terminate after savepoint
    SavepointFormatType.CANONICAL
);

ExecutionGraph

The execution graph represents the parallel execution of a job, containing detailed information about tasks, their current state, and execution history.

/**
 * The execution graph is the central data structure that coordinates the distributed
 * execution of a data flow. It keeps representations of each parallel task, each
 * intermediate stream, and the communication between them.
 */
public interface ExecutionGraph {
    /** Get the job name. */
    String getJobName();
    
    /** Get the job ID. */
    JobID getJobID();
    
    /** Get the current job status. */
    JobStatus getState();
    
    /** Get the execution job vertex for the given job vertex ID. */
    ExecutionJobVertex getJobVertex(JobVertexID id);
    
    /** Get all execution job vertices, keyed by job vertex ID. */
    Map<JobVertexID, ExecutionJobVertex> getAllVertices();
    
    /** Get the execution job vertices in topological order. */
    Iterable<ExecutionJobVertex> getVerticesTopologically();
    
    /** Get all execution vertices (the parallel subtasks). */
    Iterable<ExecutionVertex> getAllExecutionVertices();
    
    /** Get the total number of vertices. */
    int getTotalNumberOfVertices();
    
    /** Get the checkpoint coordinator. */
    CheckpointCoordinator getCheckpointCoordinator();
    
    /** Get the job configuration. */
    Configuration getJobConfiguration();
    
    /** Get the user class loader of the job. */
    ClassLoader getUserClassLoader();
    
    /** Get the execution config. */
    ExecutionConfig getExecutionConfig();
    
    /** Get the execution plan as a JSON string. */
    String getJsonPlan();
    
    /** Get the failure cause if the job failed. */
    Throwable getFailureCause();
    
    /**
     * Get the timestamp recorded for the given job status.
     *
     * @param status the job status to look up
     * @return the timestamp associated with that status
     */
    long getStatusTimestamp(JobStatus status);
    
    /** Get the job state history entries. */
    List<ExecutionGraphHistoryEntry> getHistoryEntries();
    
    /** Check whether the job is stoppable. */
    boolean isStoppable();
    
    /** Get the KV state location registry. */
    KvStateLocationRegistry getKvStateLocationRegistry();
    
    /**
     * Enable checkpointing for this execution graph.
     *
     * @param chkConfig checkpoint coordinator configuration
     * @param masterHooks master-side trigger/restore hooks
     * @param checkpointIDCounter counter supplying checkpoint IDs
     * @param checkpointStore store for completed checkpoints
     * @param checkpointStateBackend state backend used for checkpoints
     * @param checkpointStorage checkpoint storage
     * @param statsTracker tracker for checkpoint statistics
     * @param checkpointsCleaner cleaner for checkpoints
     */
    void enableCheckpointing(
        CheckpointCoordinatorConfiguration chkConfig,
        List<MasterTriggerRestoreHook<?>> masterHooks,
        CheckpointIDCounter checkpointIDCounter,
        CompletedCheckpointStore checkpointStore,
        StateBackend checkpointStateBackend,
        CheckpointStorage checkpointStorage,
        CheckpointStatsTracker statsTracker,
        CheckpointsCleaner checkpointsCleaner
    );
}

ExecutionJobVertex

Represents a job vertex during execution, managing all parallel subtasks (ExecutionVertices) for a single operation.

/**
 * Represents one vertex from the JobGraph during execution. Holds the aggregated
 * state of all parallel subtasks.
 */
public class ExecutionJobVertex {
    /** Get the job vertex ID. */
    public JobVertexID getJobVertexId();
    
    /** Get the job vertex name. */
    public String getName();
    
    /** Get the current parallelism. */
    public int getParallelism();
    
    /** Get the maximum parallelism. */
    public int getMaxParallelism();
    
    /** Get the resource profile. */
    public ResourceProfile getResourceProfile();
    
    /** Get all task vertices (the parallel subtasks). */
    public ExecutionVertex[] getTaskVertices();
    
    /** Get a specific task vertex by its subtask index. */
    public ExecutionVertex getTaskVertex(int subtask);
    
    /** Get the operator coordinator holders of this vertex. */
    public Collection<OperatorCoordinatorHolder> getOperatorCoordinators();
    
    /** Get the intermediate results produced by this vertex. */
    public IntermediateResult[] getProducedDataSets();
    
    /** Get the intermediate results that are inputs to this vertex. */
    public List<IntermediateResult> getInputs();
    
    /** Get the aggregated task resource profile. */
    public ResourceProfile getAggregatedTaskResourceProfile();
    
    /** Get the slot sharing group. */
    public SlotSharingGroup getSlotSharingGroup();
    
    /** Get the co-location group. */
    public CoLocationGroup getCoLocationGroup();
    
    /** Get the aggregate execution state of this vertex. */
    public ExecutionState getAggregateState();
    
    /** Check whether this vertex is finished. */
    public boolean isFinished();
    
    /** Get the input split assigner. */
    public InputSplitAssigner getSplitAssigner();
}

ExecutionVertex

Represents one parallel subtask of an ExecutionJobVertex, containing execution attempts and current state.

/**
 * Represents one parallel subtask. For each ExecutionJobVertex, there are as many
 * ExecutionVertices as the parallelism.
 */
public class ExecutionVertex {
    /** Get the ExecutionJobVertex this subtask belongs to. */
    public ExecutionJobVertex getJobVertex();
    
    /** Get the index of this subtask. */
    public int getParallelSubtaskIndex();
    
    /** Get the current execution attempt. */
    public Execution getCurrentExecutionAttempt();
    
    /** Get the history of prior execution attempts. */
    public ExecutionHistory getPriorExecutionHistory();
    
    /** Get the execution state. */
    public ExecutionState getExecutionState();
    
    /** Get the failure cause if failed. */
    public Throwable getFailureCause();
    
    /** Get the currently assigned logical slot. */
    public LogicalSlot getCurrentAssignedResource();
    
    /** Get the TaskManager location of the currently assigned resource. */
    public TaskManagerLocation getCurrentAssignedResourceLocation();
    
    /** Get a future yielding the preferred TaskManager locations, based on inputs. */
    public CompletableFuture<Collection<TaskManagerLocation>> getPreferredLocationsBasedOnInputs();
    
    /** Check whether the execution is finished. */
    public boolean isFinished();
    
    /** Get the task name including the subtask index. */
    public String getTaskNameWithSubtaskIndex();
    
    /**
     * Create the deployment descriptor for an execution attempt.
     *
     * @param executionId ID of the execution attempt
     * @param slot the logical slot for the attempt
     * @param taskManagerGateway gateway to the TaskManager
     * @param attemptNumber number of the attempt
     * @return the task deployment descriptor
     */
    public TaskDeploymentDescriptor createDeploymentDescriptor(
        ExecutionAttemptID executionId,
        LogicalSlot slot,
        TaskManagerGateway taskManagerGateway,
        int attemptNumber
    );
    
    /** Reset this vertex for a new execution attempt. */
    public void resetForNewExecution();
}

Execution

Represents one attempt to execute an ExecutionVertex, tracking deployment, state transitions, and task lifecycle.

/**
 * One attempt to execute an ExecutionVertex. There may be multiple Executions
 * for each ExecutionVertex in case of failures or restarts.
 */
public class Execution {
    /** Get the execution attempt ID. */
    public ExecutionAttemptID getAttemptId();
    
    /** Get the execution vertex this attempt belongs to. */
    public ExecutionVertex getVertex();
    
    /** Get the execution state. */
    public ExecutionState getState();
    
    /** Get the assigned logical slot. */
    public LogicalSlot getAssignedResource();
    
    /** Get the TaskManager location of the assigned resource. */
    public TaskManagerLocation getAssignedResourceLocation();
    
    /** Get the failure cause if failed. */
    public Throwable getFailureCause();
    
    /**
     * Get the state timestamps as an array.
     * NOTE(review): the indexing scheme of the array is not shown here (presumably one entry
     * per ExecutionState); confirm before relying on positions.
     */
    public long[] getStateTimestamps();
    
    /** Get the timestamp for a specific state. */
    public long getStateTimestamp(ExecutionState state);
    
    /** Get the end timestamp for a specific state. */
    public long getStateEndTimestamp(ExecutionState state);
    
    /** Deploy the execution to the assigned TaskManager; returns a future for the deployment. */
    public CompletableFuture<Void> deploy();
    
    /** Cancel the execution. */
    public void cancel();
    
    /** Fail the execution with the given cause. */
    public void fail(Throwable t);
    
    /** Mark the execution as finished. */
    public void markFinished();
    
    /**
     * Update the execution state.
     *
     * @param state the new task execution state
     * @return a boolean outcome of the update (presumably whether it was applied; confirm)
     */
    public boolean updateState(TaskExecutionState state);
    
    /**
     * Trigger a checkpoint for this execution.
     *
     * @param checkpointId ID of the checkpoint
     * @param timestamp timestamp of the checkpoint
     * @param checkpointOptions options for the checkpoint
     */
    public void triggerCheckpoint(
        long checkpointId,
        long timestamp,
        CheckpointOptions checkpointOptions
    );
    
    /**
     * Notify this execution that a checkpoint completed.
     *
     * @param checkpointId ID of the completed checkpoint
     * @param timestamp timestamp of the checkpoint
     */
    public void notifyCheckpointComplete(
        long checkpointId,
        long timestamp
    );
    
    /**
     * Notify this execution that a checkpoint was aborted.
     *
     * @param checkpointId ID of the aborted checkpoint
     * @param timestamp timestamp of the checkpoint
     */
    public void notifyCheckpointAborted(
        long checkpointId,
        long timestamp
    );
}

Usage Examples:

// Access execution graph information.
// NOTE(review): getExecutionGraph() is not among the SchedulerNG methods listed in this
// document; confirm which type actually exposes it before copying this snippet.
ExecutionGraph executionGraph = scheduler.getExecutionGraph();

// Get and print the job status
JobStatus status = executionGraph.getState();
System.out.println("Job status: " + status);

// Iterate through the job vertices in topological order
for (ExecutionJobVertex jobVertex : executionGraph.getVerticesTopologically()) {
    System.out.println("Vertex: " + jobVertex.getName() + 
                      ", Parallelism: " + jobVertex.getParallelism());
    
    // Print the state of each parallel subtask of this vertex
    for (ExecutionVertex execVertex : jobVertex.getTaskVertices()) {
        ExecutionState state = execVertex.getExecutionState();
        System.out.println("  Subtask " + execVertex.getParallelSubtaskIndex() + 
                          ": " + state);
    }
}

// Get the checkpoint coordinator; it is null-checked before use
// (presumably null when checkpointing is not enabled; confirm)
CheckpointCoordinator coordinator = executionGraph.getCheckpointCoordinator();
if (coordinator != null) {
    CompletedCheckpointStore store = coordinator.getCheckpointStore();
    System.out.println("Latest checkpoint: " + store.getLatestCheckpoint());
}

Types

// Execution identifiers

/**
 * Identifier of a single execution attempt. As listed here, it can be built from an
 * ExecutionVertexID plus an attempt number, and exposes both through getters.
 */
public class ExecutionAttemptID implements Serializable {
    /** No-argument constructor. */
    public ExecutionAttemptID();
    /** Create an ID for the given execution vertex and attempt number. */
    public ExecutionAttemptID(ExecutionVertexID vertexId, int attemptNumber);
    /** Static factory returning an ExecutionAttemptID (a random one, per its name). */
    public static ExecutionAttemptID randomId();
    
    /** Get the ID of the execution vertex. */
    public ExecutionVertexID getExecutionVertexId();
    /** Get the attempt number. */
    public int getAttemptNumber();
}

/**
 * Identifier of an execution vertex, composed of the ID of its job vertex and the
 * index of the subtask.
 */
public class ExecutionVertexID implements Serializable {
    /** Create an ID from a job vertex ID and a subtask index. */
    public ExecutionVertexID(JobVertexID jobVertexId, int subtaskIndex);
    
    /** Get the job vertex ID. */
    public JobVertexID getJobVertexId();
    /** Get the subtask index. */
    public int getSubtaskIndex();
}

// Execution states

/**
 * States of a task execution, as returned by the {@code getState()} /
 * {@code getExecutionState()} accessors in this document.
 */
public enum ExecutionState {
    CREATED,
    SCHEDULED,
    DEPLOYING,
    INITIALIZING,
    RUNNING,
    FINISHED,
    CANCELING,
    CANCELED,
    FAILED;
    
    /**
     * Whether this state is terminal.
     * NOTE(review): which constants count as terminal is not shown here; confirm.
     */
    public boolean isTerminal();
    /**
     * Whether this state counts as running.
     * NOTE(review): which constants count as running is not shown here; confirm.
     */
    public boolean isRunning();
}

// Task execution state transition

/**
 * Wraps a TaskExecutionState; this is the argument type of
 * {@code SchedulerNG.updateTaskExecutionState}.
 */
public class TaskExecutionStateTransition implements Serializable {
    /** Create a transition from the given task execution state. */
    public TaskExecutionStateTransition(TaskExecutionState taskExecutionState);
    
    /** Get the execution attempt ID. */
    public ExecutionAttemptID getID();
    /** Get the execution state. */
    public ExecutionState getExecutionState();
    /** Get the error, typed as Throwable. */
    public Throwable getError();
    /** Get the TaskExecutionState (presumably the one passed to the constructor; confirm). */
    public TaskExecutionState getTaskExecutionState();
}

// Scheduler configurations

/**
 * Bundle of scheduler components: a slot allocator factory, a restart strategy and an
 * execution vertex versioner, obtained through a static factory method.
 */
public class DefaultSchedulerComponents {
    /**
     * Create the scheduler components.
     *
     * @param jobType type of the job
     * @param isApproximateLocalRecoveryEnabled whether approximate local recovery is enabled
     * @param jobMasterConfiguration configuration of the job master
     * @param slotPool slot pool
     * @param slotRequestTimeout timeout for slot requests
     * @return the created components
     */
    public static DefaultSchedulerComponents createSchedulerComponents(
        JobType jobType,
        boolean isApproximateLocalRecoveryEnabled,
        Configuration jobMasterConfiguration,
        SlotPool slotPool,
        Duration slotRequestTimeout
    );
    
    /** Get the execution slot allocator factory. */
    public ExecutionSlotAllocatorFactory getAllocatorFactory();
    /** Get the restart strategy. */
    public RestartStrategy getRestartStrategy();
    /** Get the execution vertex versioner. */
    public ExecutionVertexVersioner getVersioner();
}

// Scheduling strategies
// NOTE(review): this document lists SchedulingStrategy as an enum with three constants;
// verify against the flink-runtime version in use before depending on these names.
public enum SchedulingStrategy {
    LEGACY_SCHEDULER,
    DEFAULT,
    ADAPTIVE_BATCH
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-runtime

docs

execution-scheduling.md

high-availability.md

index.md

job-graph.md

resource-management.md

state-checkpointing.md

task-execution.md

tile.json