CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/maven-org-apache-flink--flink-runtime

The runtime module of Apache Flink that provides the core execution engine, scheduling, fault tolerance, checkpointing, and resource management capabilities for distributed stream processing applications.

Pending
Overview
Eval results
Files

docs/task-execution.md

Task Execution

Distributed task execution engine with slot management and lifecycle coordination. TaskExecutors are the worker processes responsible for executing individual tasks and managing computational slots.

Capabilities

TaskExecutorGateway

RPC gateway interface for communication with TaskExecutor instances, enabling task deployment, slot management, and coordination.

/**
 * TaskExecutor RPC gateway interface for remote communication with TaskExecutor instances.
 * Provides methods for slot management, task lifecycle, and coordination.
 *
 * <p>Operations that report a result return a {@link CompletableFuture} and accept a
 * {@code Duration timeout}; partition release, heartbeats, and disconnects return
 * {@code void}, so callers receive no completion signal for them.
 */
public interface TaskExecutorGateway 
    extends RpcGateway, TaskExecutorOperatorEventGateway, TaskExecutorThreadInfoGateway {
    
    /**
     * Requests the slot {@code slotId} for allocation {@code allocationId} of job {@code jobId}.
     *
     * <p>The usage example on this page passes the JobMaster address as {@code targetAddress}.
     *
     * @return future completed with an {@code Acknowledge}
     */
    CompletableFuture<Acknowledge> requestSlot(
        SlotID slotId,
        JobID jobId,
        AllocationID allocationId,
        ResourceProfile resourceProfile,
        String targetAddress,
        ResourceManagerId resourceManagerId,
        Duration timeout
    );
    
    /**
     * Submits the task described by {@code tdd} for execution.
     *
     * @return future completed with an {@code Acknowledge}
     */
    CompletableFuture<Acknowledge> submitTask(
        TaskDeploymentDescriptor tdd,
        JobMasterId jobMasterId,
        Duration timeout
    );
    
    /**
     * Updates partition information for the task attempt {@code executionAttemptID}.
     *
     * @return future completed with an {@code Acknowledge}
     */
    CompletableFuture<Acknowledge> updatePartitions(
        ExecutionAttemptID executionAttemptID,
        Iterable<PartitionInfo> partitionInfos,
        Duration timeout
    );
    
    /** Releases the given partitions of job {@code jobId}; returns no result and takes no timeout. */
    void releasePartitions(
        JobID jobId,
        Set<ResultPartitionID> partitionIds
    );
    
    /**
     * Promotes the given partitions of job {@code jobId}.
     *
     * @return future completed with an {@code Acknowledge}
     */
    CompletableFuture<Acknowledge> promotePartitions(
        JobID jobId,
        Set<ResultPartitionID> partitionIds,
        Duration timeout
    );
    
    /**
     * Cancels the task attempt {@code executionAttemptID}.
     *
     * @return future completed with an {@code Acknowledge}
     */
    CompletableFuture<Acknowledge> cancelTask(
        ExecutionAttemptID executionAttemptID,
        Duration timeout
    );
    
    /**
     * Triggers checkpoint {@code checkpointId} for a specific task attempt.
     *
     * <p>The usage example on this page passes {@code System.currentTimeMillis()} as
     * {@code checkpointTimestamp}.
     *
     * @return future completed with an {@code Acknowledge}
     */
    CompletableFuture<Acknowledge> triggerCheckpoint(
        ExecutionAttemptID executionAttemptID,
        long checkpointId,
        long checkpointTimestamp,
        CheckpointOptions checkpointOptions,
        Duration timeout
    );
    
    /**
     * Confirms that checkpoint {@code checkpointId} is complete for a specific task attempt.
     *
     * @return future completed with an {@code Acknowledge}
     */
    CompletableFuture<Acknowledge> confirmCheckpoint(
        ExecutionAttemptID executionAttemptID,
        long checkpointId,
        long checkpointTimestamp,
        Duration timeout
    );
    
    /**
     * Aborts checkpoint {@code checkpointId} for a specific task attempt.
     *
     * @return future completed with an {@code Acknowledge}
     */
    CompletableFuture<Acknowledge> abortCheckpoint(
        ExecutionAttemptID executionAttemptID,
        long checkpointId,
        long checkpointTimestamp,
        Duration timeout
    );
    
    /**
     * Frees the slot held under {@code allocationId}; {@code cause} carries the reason.
     *
     * @return future completed with an {@code Acknowledge}
     */
    CompletableFuture<Acknowledge> freeSlot(
        AllocationID allocationId,
        Throwable cause,
        Duration timeout
    );
    
    /** Requests a slot report; the future completes with a {@code SlotReport}. */
    CompletableFuture<SlotReport> requestSlotReport(Duration timeout);
    
    /** Heartbeat from JobManager, carrying an {@code AllocatedSlotReport}; returns no result. */
    void heartbeatFromJobManager(
        ResourceID resourceID,
        AllocatedSlotReport allocatedSlotReport
    );
    
    /** Heartbeat from ResourceManager; returns no result. */
    void heartbeatFromResourceManager(ResourceID resourceID);
    
    /** Disconnects the JobManager of job {@code jobId}; {@code cause} carries the reason. */
    void disconnectJobManager(JobID jobId, Exception cause);
    
    /** Disconnects the ResourceManager; {@code cause} carries the reason. */
    void disconnectResourceManager(Exception cause);
    
    /** Requests thread dump info; the future completes with a {@code ThreadDumpInfo}. */
    CompletableFuture<ThreadDumpInfo> requestThreadDump(Duration timeout);
    
    /**
     * Requests profiling info.
     *
     * <p>Note the parameter order: the RPC {@code timeout} comes first here, unlike the other
     * methods of this interface, and is distinct from {@code profilingDuration}.
     *
     * @return future completed with a collection of {@code ProfilingInfo}
     */
    CompletableFuture<Collection<ProfilingInfo>> requestProfiling(
        Duration timeout,
        ProfilingMode mode,
        Duration profilingDuration
    );
    
    /** Requests the log list; the future completes with a collection of {@code LogInfo}. */
    CompletableFuture<Collection<LogInfo>> requestLogList(Duration timeout);
    
    /**
     * Requests upload of the file selected by {@code fileType}.
     *
     * @return future completed with a {@code TransientBlobKey}
     *     (NOTE(review): presumably the key under which the uploaded file can be fetched — confirm)
     */
    CompletableFuture<TransientBlobKey> requestFileUpload(
        FileType fileType,
        Duration timeout
    );
}

Usage Examples:

// NOTE: this snippet is a fragment. taskExecutorGateway, resourceId, jobId, allocationId,
// jobMasterAddress, resourceManagerId, jobMasterId, executionAttemptId, checkpointId and the
// TaskDeploymentDescriptor arguments are not declared here and must come from surrounding code.

// Request slot allocation (slot number 0 on the TaskExecutor identified by resourceId)
CompletableFuture<Acknowledge> slotFuture = taskExecutorGateway.requestSlot(
    new SlotID(resourceId, 0),           // slot ID
    jobId,                               // job ID
    allocationId,                        // allocation ID
    ResourceProfile.fromResources(2.0, 1024),  // resource requirements (presumably CPU cores and task-heap MB — confirm)
    jobMasterAddress,                    // target address
    resourceManagerId,                   // resource manager ID
    Duration.ofMinutes(1)                // timeout
);

// Submit task for execution
// Arguments follow the TaskDeploymentDescriptor constructor order documented on this page.
TaskDeploymentDescriptor descriptor = new TaskDeploymentDescriptor(
    jobInformation,
    taskInformation,
    executionAttemptId,
    allocationId,
    subpartitionIndexRange,
    targetSlotNumber,
    taskStateSnapshot,
    inputGateDeploymentDescriptors,
    resultPartitionDeploymentDescriptors
);

CompletableFuture<Acknowledge> submitFuture = taskExecutorGateway.submitTask(
    descriptor,
    jobMasterId,
    Duration.ofMinutes(5)
);

// Trigger checkpoint, using the current wall-clock time as the checkpoint timestamp
// NOTE(review): CheckpointOptions.forCheckpoint(...) is not declared on this page — verify the
// factory name and argument against the Flink version in use.
CompletableFuture<Acknowledge> checkpointFuture = taskExecutorGateway.triggerCheckpoint(
    executionAttemptId,
    checkpointId,
    System.currentTimeMillis(),
    CheckpointOptions.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
    Duration.ofSeconds(30)
);

TaskExecutor

Main TaskExecutor implementation responsible for executing tasks and managing computational slots on worker nodes.

/**
 * TaskExecutor implementation. The TaskExecutor is responsible for the execution of multiple
 * tasks and manages slots. It offers the slots to the ResourceManager and executes
 * tasks when the JobManager requests it.
 *
 * <p>Only a subset of members is listed here; in particular, the constructor used in the
 * usage example on this page is not shown in this listing.
 */
public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> implements TaskExecutorGateway {
    /** Start the TaskExecutor service */
    public void start();
    
    /** Get the TaskExecutor's resource ID */
    public ResourceID getResourceID();
    
    /** Get address of this TaskExecutor, as a string */
    public String getAddress();
    
    /** Get data port of this TaskExecutor */
    public int getDataPort();
    
    /** Get number of slots */
    public int getNumberOfSlots();
    
    /** Get hardware description */
    public HardwareDescription getHardwareDescription();
    
    /** Get memory configuration */
    public TaskExecutorMemoryConfiguration getMemoryConfiguration();
    
    /** Get network environment */
    public NetworkEnvironment getNetworkEnvironment();
    
    /** Get blob cache service */
    public BlobCacheService getBlobCacheService();
    
    /** Get slot table */
    public TaskSlotTable<Task> getSlotTable();
    
    /** Get job table */
    public JobTable getJobTable();
    
    /** Get job leader service */
    public JobLeaderService getJobLeaderService();
    
    /** Connect to the ResourceManager at the given address, identified by the given resource ID */
    public void connectToResourceManager(
        ResourceManagerAddress resourceManagerAddress,
        ResourceID resourceManagerResourceId
    );
    
    /** Establish the job manager connection for job {@code jobId} using the given gateway */
    public void establishedJobManagerConnection(
        JobID jobId,
        JobMasterGateway jobMasterGateway,
        TaskManagerActions taskManagerActions
    );
    
    /** Close the job manager connection for job {@code jobId}; {@code cause} carries the reason */
    public void closeJobManagerConnection(JobID jobId, Exception cause);
    
    /** Get a {@code SlotReport} describing the current slots */
    public SlotReport getCurrentSlotReport();
    
    /** Register a timeout of the given duration for the slot held under {@code allocationId} */
    public void scheduleSlotTimeout(AllocationID allocationId, Duration timeout);
}

TaskManagerServices

Container for essential TaskManager services including network, memory management, and I/O.

/**
 * Container for the TaskManager services like network environment, memory manager,
 * IOManager, and related components.
 *
 * <p>Only accessors and {@link #shutDown()} are listed here; the static
 * {@code createTaskManagerServices(...)} factory used in the usage example on this page is
 * not shown in this listing.
 */
public class TaskManagerServices {
    /** Get the task manager services configuration */
    public TaskManagerServicesConfiguration getTaskManagerServicesConfiguration();
    
    /** Get the network environment */
    public NetworkEnvironment getNetworkEnvironment();
    
    /** Get the shuffle environment */
    public ShuffleEnvironment<?, ?> getShuffleEnvironment();
    
    /** Get the KvState service */
    public KvStateService getKvStateService();
    
    /** Get the broadcast variable manager */
    public BroadcastVariableManager getBroadcastVariableManager();
    
    /** Get the task slot table */
    public TaskSlotTable<Task> getTaskSlotTable();
    
    /** Get the job table */
    public JobTable getJobTable();
    
    /** Get the job leader service */
    public JobLeaderService getJobLeaderService();
    
    /** Get the local state stores manager (declared type: {@code TaskExecutorLocalStateStoresManager}) */
    public TaskExecutorLocalStateStoresManager getTaskManagerStateStore();
    
    /** Get the memory manager */
    public MemoryManager getMemoryManager();
    
    /** Get the IO manager */
    public IOManager getIOManager();
    
    /**
     * Get the library cache manager.
     * NOTE(review): the declared return type is {@code BlobCacheService}, which does not match
     * the method name — confirm against the Flink version in use.
     */
    public BlobCacheService getLibraryCacheManager();
    
    /** Get the task manager metric group */
    public TaskManagerMetricGroup getTaskManagerMetricGroup();
    
    /** Get the IO executor (a plain {@code Executor}) */
    public Executor getIOExecutor();
    
    /** Get the fatal error handler */
    public FatalErrorHandler getFatalErrorHandler();
    
    /** Get the partition tracker */
    public TaskExecutorPartitionTracker getPartitionTracker();
    
    /** Get the backpressure sample service */
    public BackPressureSampleService getBackPressureSampleService();
    
    /** Shutdown all services */
    public void shutDown();
}

SlotReport

Report containing information about the status and allocation of slots on a TaskExecutor.

/**
 * A slot report contains information about which slots are available and allocated
 * on a TaskManager.
 *
 * <p>The report is {@link Iterable} over its {@code SlotStatus} entries, so it can be used
 * directly in an enhanced {@code for} loop via {@link #iterator()}.
 */
public class SlotReport implements Serializable, Iterable<SlotStatus> {
    /** Create a new slot report */
    public SlotReport();
    
    /** Create slot report with initial slots */
    public SlotReport(Collection<SlotStatus> slotStatuses);
    
    /** Add slot status to the report */
    public void addSlotStatus(SlotStatus slotStatus);
    
    /** Get all slot statuses */
    public Collection<SlotStatus> getSlotsStatus();
    
    /** Get number of slot statuses contained in this report */
    public int getNumSlotStatus();
    
    /** Check if report is empty */
    public boolean isEmpty();
    
    /** Get iterator over slot statuses */
    @Override
    public Iterator<SlotStatus> iterator();
}

/**
 * Status of a single slot on a TaskManager: its ID and resource profile, plus the job and
 * allocation it is assigned to, if any.
 */
public class SlotStatus implements Serializable {
    /**
     * Create slot status.
     * NOTE(review): presumably {@code jobID} and {@code allocationID} are null for a free
     * slot — confirm.
     */
    public SlotStatus(
        SlotID slotID,
        ResourceProfile resourceProfile,
        JobID jobID,
        AllocationID allocationID
    );
    
    /** Get slot ID */
    public SlotID getSlotID();
    
    /** Get resource profile of this slot */
    public ResourceProfile getResourceProfile();
    
    /** Get job ID if slot is allocated */
    public JobID getJobID();
    
    /** Get allocation ID if slot is allocated */
    public AllocationID getAllocationID();
    
    /** Check if slot is allocated */
    public boolean isAllocated();
}

TaskDeploymentDescriptor

Complete descriptor containing all information necessary to deploy and execute a task on a TaskExecutor.

/**
 * A TaskDeploymentDescriptor contains all the information necessary to deploy a task
 * on a TaskManager.
 *
 * <p>It is the {@code tdd} argument of {@code TaskExecutorGateway.submitTask}.
 */
public class TaskDeploymentDescriptor implements Serializable {
    /** Create task deployment descriptor; each argument is exposed by the matching getter below */
    public TaskDeploymentDescriptor(
        JobInformation jobInformation,
        TaskInformation taskInformation,
        ExecutionAttemptID executionAttemptID,
        AllocationID allocationID,
        SubpartitionIndexRange subpartitionIndexRange,
        int targetSlotNumber,
        TaskStateSnapshot taskStateSnapshot,
        List<InputGateDeploymentDescriptor> inputGateDeploymentDescriptors,
        List<ResultPartitionDeploymentDescriptor> resultPartitionDeploymentDescriptors
    );
    
    /** Get job information */
    public JobInformation getJobInformation();
    
    /** Get task information */
    public TaskInformation getTaskInformation();
    
    /** Get execution attempt ID */
    public ExecutionAttemptID getExecutionAttemptID();
    
    /** Get allocation ID for the slot */
    public AllocationID getAllocationID();
    
    /** Get subpartition index range */
    public SubpartitionIndexRange getSubpartitionIndexRange();
    
    /** Get target slot number */
    public int getTargetSlotNumber();
    
    /** Get task state snapshot for recovery */
    public TaskStateSnapshot getTaskStateSnapshot();
    
    /** Get input gate deployment descriptors */
    public List<InputGateDeploymentDescriptor> getInputGateDeploymentDescriptors();
    
    /** Get result partition deployment descriptors */
    public List<ResultPartitionDeploymentDescriptor> getResultPartitionDeploymentDescriptors();
    
    /** Get produced partition IDs */
    public Collection<ResultPartitionID> getProducedPartitions();
    
    /** Get consumed partition IDs */
    public Collection<ResultPartitionID> getConsumedPartitions();
    
    /** Get job ID */
    public JobID getJobId();
    
    /** Get job vertex ID */
    public JobVertexID getJobVertexId();
    
    /** Get attempt number */
    public int getAttemptNumber();
    
    /** Get subtask index */
    public int getSubtaskIndex();
}

Usage Examples:

// NOTE: this snippet is a fragment. configuration, resourceId, rpcService, heartbeatServices,
// fatalErrorHandler and the other bare identifiers are not declared here and must come from
// surrounding code. The two static factories and the TaskExecutor constructor used below are
// not part of the class listings on this page.

// Create and configure TaskManager services
TaskManagerServicesConfiguration serviceConfig = 
    TaskManagerServicesConfiguration.fromConfiguration(
        configuration,
        resourceId,
        externalAddress,
        localCommunicationOnly,
        taskManagerMetricGroup,
        tmpDirPaths
    );

TaskManagerServices taskManagerServices = TaskManagerServices.createTaskManagerServices(
    serviceConfig,
    resourceId,
    rpcService,
    highAvailabilityServices,
    heartbeatServices,
    metricRegistry,
    blobCacheService,
    localRecoveryDirectoryProvider,
    fatalErrorHandler
);

// Create and start TaskExecutor
// NOTE(review): this call passes haServices while the call above passes
// highAvailabilityServices — presumably the same object; confirm and unify the name.
TaskExecutor taskExecutor = new TaskExecutor(
    rpcService,
    taskManagerConfiguration,
    haServices,
    taskManagerServices,
    externalResourceInfoProvider,
    heartbeatServices,
    tokenManager,
    aggregateManager,
    fatalErrorHandler
);

taskExecutor.start();

// Monitor slot utilization: print every slot, and the owning job/allocation for allocated ones
SlotReport slotReport = taskExecutor.getCurrentSlotReport();
for (SlotStatus slotStatus : slotReport.getSlotsStatus()) {
    System.out.println("Slot " + slotStatus.getSlotID() + 
                      " allocated: " + slotStatus.isAllocated());
    // Job and allocation IDs are only printed for allocated slots
    if (slotStatus.isAllocated()) {
        System.out.println("  Job: " + slotStatus.getJobID());
        System.out.println("  Allocation: " + slotStatus.getAllocationID());
    }
}

Types

// Slot and allocation identifiers

/** Identifies a slot by the {@code ResourceID} it belongs to plus a slot number. */
public class SlotID implements Serializable {
    /** Create a slot ID from a resource ID and a slot number */
    public SlotID(ResourceID resourceId, int slotNumber);
    
    /** Get the resource ID part of this slot ID */
    public ResourceID getResourceID();
    /** Get the slot number part of this slot ID */
    public int getSlotNumber();
}

/** Identifier of a slot allocation; can be created fresh, from raw bytes, or via {@link #generate()}. */
public class AllocationID implements Serializable {
    /** Create a new allocation ID */
    public AllocationID();
    /** Create an allocation ID from its byte representation */
    public AllocationID(byte[] bytes);
    /** Static factory returning an allocation ID */
    public static AllocationID generate();
    
    /** Get the byte representation of this ID */
    public byte[] getBytes();
}

// Resource specifications

/** String-based identifier of a resource; used on this page to identify TaskExecutors and slots' owners. */
public class ResourceID implements Serializable {
    /** Create a resource ID from the given string */
    public ResourceID(String resourceId);
    /** Static factory returning a resource ID */
    public static ResourceID generate();
    
    /** Get the resource ID as a string */
    public String getResourceIdString();
    /** Get a string form that includes metadata (NOTE(review): exact format not shown here) */
    public String getStringWithMetadata();
}

// Task execution states

/**
 * States of a task execution, as listed on this page.
 * NOTE(review): upstream Flink's ExecutionState appears to define an additional
 * RECONCILING constant that is not listed here — confirm against the Flink version in use.
 */
public enum ExecutionState {
    CREATED,
    SCHEDULED,
    DEPLOYING,
    INITIALIZING, 
    RUNNING,
    FINISHED,
    CANCELING,
    CANCELED,
    FAILED;
    
    /** Check whether this state is terminal */
    public boolean isTerminal();
    /** Check whether this state counts as running (NOTE(review): which constants qualify is not shown here) */
    public boolean isRunning();
}

// Task manager configuration

/**
 * Configuration values for a TaskManager, obtained via {@link #fromConfiguration}.
 *
 * <p>Note that the time-valued getters are not uniform: the task cancellation getters return
 * {@code Time}, while the slot timeout and partition request backoff getters return
 * {@code Duration}.
 */
public class TaskManagerConfiguration {
    /** Static factory building a TaskManagerConfiguration from the given inputs */
    public static TaskManagerConfiguration fromConfiguration(
        Configuration configuration,
        TaskManagerOptions.TaskManagerLoadBalanceMode loadBalanceMode,
        WorkerResourceSpec workerResourceSpec,
        InetAddress remoteAddress,
        boolean localCommunicationOnly
    );
    
    /** Get the temporary directory path */
    public String getTmpDirectoryPath();
    /** Get the task cancellation timeout (as {@code Time}) */
    public Time getTaskCancellationTimeout();
    /** Get the task cancellation interval (as {@code Time}) */
    public Time getTaskCancellationInterval();
    /** Get the slot timeout (as {@code Duration}) */
    public Duration getSlotTimeout();
    /** Whether the JVM should exit on an OutOfMemoryError */
    public boolean isExitJvmOnOutOfMemoryError();
    /** Get the network buffers memory fraction */
    public float getNetworkBuffersMemoryFraction();
    /** Get the network buffers memory minimum (NOTE(review): unit not shown here) */
    public int getNetworkBuffersMemoryMin();
    /** Get the network buffers memory maximum (NOTE(review): unit not shown here) */
    public int getNetworkBuffersMemoryMax();
    /** Get the number of network buffers per channel */
    public int getNetworkBuffersPerChannel();
    /** Get the number of floating network buffers per gate */
    public int getFloatingNetworkBuffersPerGate();
    /** Get the initial backoff for partition requests */
    public Duration getPartitionRequestInitialBackoff();
    /** Get the maximum backoff for partition requests */
    public Duration getPartitionRequestMaxBackoff();
    /** Get the backoff multiplier for network requests */
    public int getNetworkRequestBackoffMultiplier();
}

// Hardware and memory descriptions

/**
 * Describes a machine's hardware: CPU core count and three memory sizes.
 * NOTE(review): the {@code long} sizes are presumably in bytes — confirm.
 */
public class HardwareDescription implements Serializable {
    /** Create a hardware description; each argument is exposed by the matching getter below */
    public HardwareDescription(
        int numberOfCPUCores,
        long sizeOfPhysicalMemory,
        long sizeOfJvmHeap,
        long sizeOfJvmDirectMemory
    );
    
    /** Get the number of CPU cores */
    public int getNumberOfCPUCores();
    /** Get the size of physical memory */
    public long getSizeOfPhysicalMemory();
    /** Get the size of the JVM heap */
    public long getSizeOfJvmHeap();
    /** Get the size of JVM direct memory */
    public long getSizeOfJvmDirectMemory();
}

/**
 * Memory configuration of a TaskExecutor, returned by {@code TaskExecutor.getMemoryConfiguration()}.
 * Only accessors are listed here; every one returns a {@code MemorySize}.
 */
public class TaskExecutorMemoryConfiguration {
    /** Get the framework heap size */
    public MemorySize getFrameworkHeapSize();
    /** Get the framework off-heap size */
    public MemorySize getFrameworkOffHeapSize();
    /** Get the task heap size */
    public MemorySize getTaskHeapSize();
    /** Get the task off-heap size */
    public MemorySize getTaskOffHeapSize();
    /** Get the network memory size */
    public MemorySize getNetworkMemorySize();
    /** Get the managed memory size */
    public MemorySize getManagedMemorySize();
    /** Get the JVM metaspace size */
    public MemorySize getJvmMetaspaceSize();
    /** Get the JVM overhead size */
    public MemorySize getJvmOverheadSize();
    /** Get the total Flink memory size */
    public MemorySize getTotalFlinkMemorySize();
    /** Get the total process memory size */
    public MemorySize getTotalProcessMemorySize();
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-runtime

docs

execution-scheduling.md

high-availability.md

index.md

job-graph.md

resource-management.md

state-checkpointing.md

task-execution.md

tile.json