CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-org-apache-flink--flink-runtime-2-10

Apache Flink runtime engine providing core distributed streaming dataflow execution, task scheduling, state management, and fault tolerance capabilities.

Pending
Overview
Eval results
Files

docs/task-execution.md

Task Execution Framework

The task execution framework provides the foundation for implementing and executing user-defined tasks in the Flink runtime. This framework handles task lifecycle management, resource access, and environment setup for all executable components in a Flink job.

Core Task Components

AbstractInvokable

The base class for all executable tasks in Flink runtime. All user tasks must extend this class to integrate with the execution framework.

public abstract class AbstractInvokable {
    public AbstractInvokable(Environment environment);
    
    public abstract void invoke() throws Exception;
    public void cancel() throws Exception;
    
    public final Environment getEnvironment();
    public final String getTaskNameWithSubtasks();
    public final Configuration getTaskConfiguration();
    public final Configuration getJobConfiguration();
    
    protected final ClassLoader getUserCodeClassLoader();
    protected final MemoryManager getMemoryManager();
    protected final IOManager getIOManager();
    protected final BroadcastVariableManager getBroadcastVariableManager();
    protected final TaskKvStateRegistry getKvStateRegistry();
}

Task Environment

Environment

Provides tasks with access to runtime resources including memory managers, IO channels, configuration, and task metadata.

public interface Environment {
    JobID getJobID();
    JobVertexID getJobVertexId();
    ExecutionAttemptID getExecutionId();
    
    TaskInfo getTaskInfo();
    Configuration getTaskConfiguration();
    Configuration getJobConfiguration();
    
    ClassLoader getUserClassLoader();
    MemoryManager getMemoryManager();
    IOManager getIOManager();
    
    InputSplitProvider getInputSplitProvider();
    ResultPartitionWriter getWriter(int index);
    InputGate getInputGate(int index);
    InputGate[] getAllInputGates();
    ResultPartitionWriter[] getAllWriters();
    
    TaskEventDispatcher getTaskEventDispatcher();
    BroadcastVariableManager getBroadcastVariableManager();
    TaskStateManager getTaskStateManager();
    
    AccumulatorRegistry getAccumulatorRegistry();
    TaskKvStateRegistry getKvStateRegistry();
    
    void acknowledgeCheckpoint(long checkpointId, CheckpointMetrics checkpointMetrics);
    void acknowledgeCheckpoint(long checkpointId, CheckpointMetrics checkpointMetrics, TaskStateSnapshot subtaskState);
    void declineCheckpoint(long checkpointId, Throwable cause);
    
    void failExternally(Throwable cause);
}

TaskInfo

Contains metadata about the task instance, including parallelism and indexing information.

public class TaskInfo implements Serializable {
    public TaskInfo(String taskName, int maxNumberOfParallelSubtasks, int indexOfThisSubtask, 
                    int numberOfParallelSubtasks, int attemptNumber);
    
    public String getTaskName();
    public String getTaskNameWithSubtasks();
    public String getAllocationIDAsString();
    
    public int getMaxNumberOfParallelSubtasks();
    public int getIndexOfThisSubtask();
    public int getNumberOfParallelSubtasks();
    public int getAttemptNumber();
}

Input Split Processing

InputSplitProvider

Interface for providing input splits to tasks for parallel data processing.

public interface InputSplitProvider {
    InputSplit getNextInputSplit(ClassLoader userCodeClassLoader) throws InputSplitProviderException;
}

InputSplit

Base interface for input splits that define portions of input data.

public interface InputSplit extends Serializable {
    int getSplitNumber();
}

Task States and Lifecycle

ExecutionState

Enumeration of task execution states throughout the task lifecycle.

public enum ExecutionState {
    CREATED,       // Task has been created but not scheduled
    SCHEDULED,     // Task has been scheduled for execution
    DEPLOYING,     // Task is being deployed to TaskManager
    RUNNING,       // Task is actively executing
    FINISHED,      // Task completed successfully
    CANCELING,     // Task is being canceled
    CANCELED,      // Task has been canceled
    FAILED;        // Task failed during execution
    
    public boolean isTerminal();
    public boolean isSuccess();
}

Resource Management

MemoryManager

Manages memory allocation and deallocation for tasks, providing memory segments for data processing.

public abstract class MemoryManager {
    public abstract MemorySegment allocatePages(Object owner, int numPages) throws MemoryAllocationException;
    public abstract void releasePages(Object owner, MemorySegment... pages);
    public abstract void releaseAllPages(Object owner);
    
    public abstract int getPageSize();
    public abstract long getMemorySize();
    public abstract int computeNumberOfPages(long numBytes);
}

IOManager

Manages disk I/O operations for spilling data to disk during processing.

public abstract class IOManager implements AutoCloseable {
    public abstract FileIOChannel.ID createChannel() throws IOException;
    public abstract FileIOChannel.Enumerator createChannelEnumerator() throws IOException;
    
    public abstract BufferFileReader createBufferFileReader(FileIOChannel.ID channelID, RequestDoneCallback<Buffer> callback) throws IOException;
    public abstract BufferFileWriter createBufferFileWriter(FileIOChannel.ID channelID) throws IOException;
    
    public abstract BlockChannelReader<MemorySegment> createBlockChannelReader(FileIOChannel.ID channel) throws IOException;
    public abstract BlockChannelWriter<MemorySegment> createBlockChannelWriter(FileIOChannel.ID channel) throws IOException;
    
    public abstract boolean isProperlyShutDown();
    public abstract void shutdown();
}

Task Communication

ResultPartitionWriter

Interface for writing output data to result partitions for downstream consumption.

public interface ResultPartitionWriter extends AutoCloseable {
    ResultPartition getPartition();
    
    BufferBuilder getBufferBuilder() throws IOException, InterruptedException;
    void flushAll();
    void flushAllSubpartitions(boolean finishProducers);
    
    void fail(Throwable throwable);
    void finish() throws IOException;
}

InputGate

Interface for reading input data from upstream tasks.

public abstract class InputGate implements AutoCloseable {
    public abstract int getNumberOfInputChannels();
    public abstract boolean isFinished();
    
    public abstract Optional<BufferOrEvent> getNext() throws IOException, InterruptedException;
    public abstract Optional<BufferOrEvent> pollNext() throws IOException, InterruptedException;
    
    public abstract void sendTaskEvent(TaskEvent event) throws IOException;
    public abstract void registerListener(InputGateListener listener);
    
    public abstract int getPageSize();
}

Exception Handling

RuntimeTaskException

Base exception for task execution failures.

public class RuntimeTaskException extends RuntimeException {
    public RuntimeTaskException(String message);
    public RuntimeTaskException(String message, Throwable cause);
}

InputSplitProviderException

Exception thrown when input split provision fails.

public class InputSplitProviderException extends Exception {
    public InputSplitProviderException(String message);
    public InputSplitProviderException(String message, Throwable cause);
}

Usage Examples

Implementing a Custom Task

import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.execution.Environment;

public class DataProcessingTask extends AbstractInvokable {
    
    public DataProcessingTask(Environment environment) {
        super(environment);
    }
    
    @Override
    public void invoke() throws Exception {
        // Get task information
        TaskInfo taskInfo = getEnvironment().getTaskInfo();
        System.out.println("Starting task: " + taskInfo.getTaskNameWithSubtasks());
        System.out.println("Parallel subtask " + taskInfo.getIndexOfThisSubtask() + 
                          " of " + taskInfo.getNumberOfParallelSubtasks());
        
        // Access configuration
        Configuration taskConfig = getTaskConfiguration();
        String inputPath = taskConfig.getString("input.path", "");
        
        // Get resource managers
        MemoryManager memoryManager = getMemoryManager();
        IOManager ioManager = getIOManager();
        
        // Allocate memory for processing
        MemorySegment memory = memoryManager.allocatePages(this, 10);
        
        try {
            // Process input splits
            InputSplitProvider splitProvider = getEnvironment().getInputSplitProvider();
            InputSplit split;
            
            while ((split = splitProvider.getNextInputSplit(getUserCodeClassLoader())) != null) {
                processInputSplit(split, memory);
            }
            
            // Write results
            ResultPartitionWriter[] writers = getEnvironment().getAllWriters();
            for (ResultPartitionWriter writer : writers) {
                // Write output data
                writeResults(writer);
                writer.finish();
            }
            
        } finally {
            // Clean up resources
            memoryManager.releaseAllPages(this);
        }
    }
    
    @Override
    public void cancel() throws Exception {
        // Handle task cancellation
        System.out.println("Task cancelled");
    }
    
    private void processInputSplit(InputSplit split, MemorySegment memory) throws Exception {
        // Custom processing logic
        System.out.println("Processing split: " + split.getSplitNumber());
    }
    
    private void writeResults(ResultPartitionWriter writer) throws Exception {
        // Write output logic
        BufferBuilder buffer = writer.getBufferBuilder();
        // ... write data to buffer
        writer.flushAll();
    }
}

Configuring Task Resources

import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.configuration.Configuration;

// Create and configure a job vertex for the custom task
JobVertex processingVertex = new JobVertex("Data Processing Task");
processingVertex.setInvokableClass(DataProcessingTask.class);
processingVertex.setParallelism(4);

// Configure task-specific settings
Configuration taskConfig = new Configuration();
taskConfig.setString("input.path", "/path/to/input/data");
taskConfig.setInteger("buffer.size", 32768);
taskConfig.setLong("timeout.ms", 300000);

processingVertex.setConfiguration(taskConfig);

// Set resource requirements
processingVertex.setMinResources(ResourceSpec.newBuilder()
    .setCpuCores(1.0)
    .setHeapMemoryInMB(512)
    .build());

processingVertex.setPreferredResources(ResourceSpec.newBuilder()
    .setCpuCores(2.0)
    .setHeapMemoryInMB(1024)
    .build());

Handling Task State and Checkpointing

public class StatefulTask extends AbstractInvokable implements CheckpointedFunction {
    private transient ValueState<Long> countState;
    
    public StatefulTask(Environment environment) {
        super(environment);
    }
    
    @Override
    public void invoke() throws Exception {
        // Initialize state if needed
        if (countState == null) {
            initializeState();
        }
        
        // Process data with state
        while (isRunning()) {
            processRecord();
            
            // Update state
            Long currentCount = countState.value();
            countState.update((currentCount != null ? currentCount : 0) + 1);
        }
    }
    
    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Checkpoint coordination happens automatically for managed state
        System.out.println("Checkpointing at: " + context.getCheckpointTimestamp());
    }
    
    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        ValueStateDescriptor<Long> descriptor = 
            new ValueStateDescriptor<>("count", Long.class);
        countState = context.getKeyedStateStore().getState(descriptor);
    }
    
    private void initializeState() {
        // Initialize state from environment's state manager
        TaskStateManager stateManager = getEnvironment().getTaskStateManager();
        // ... initialize state from checkpoint if available
    }
    
    private void processRecord() throws Exception {
        // Read from input gates
        InputGate[] inputGates = getEnvironment().getAllInputGates();
        for (InputGate inputGate : inputGates) {
            Optional<BufferOrEvent> nextBuffer = inputGate.pollNext();
            if (nextBuffer.isPresent()) {
                // Process the buffer
                processBuffer(nextBuffer.get());
            }
        }
    }
    
    private void processBuffer(BufferOrEvent bufferOrEvent) {
        // Custom buffer processing logic
        if (bufferOrEvent.isBuffer()) {
            Buffer buffer = bufferOrEvent.getBuffer();
            // Process buffer data
        }
    }
}

Common Patterns

Resource Cleanup

@Override
public void invoke() throws Exception {
    MemoryManager memoryManager = getMemoryManager();
    IOManager ioManager = getIOManager();
    
    List<MemorySegment> allocatedMemory = new ArrayList<>();
    List<FileIOChannel.ID> openChannels = new ArrayList<>();
    
    try {
        // Allocate resources
        MemorySegment memory = memoryManager.allocatePages(this, 5);
        allocatedMemory.add(memory);
        
        FileIOChannel.ID channel = ioManager.createChannel();
        openChannels.add(channel);
        
        // Task processing logic
        processData();
        
    } finally {
        // Always clean up resources
        for (MemorySegment memory : allocatedMemory) {
            memoryManager.releasePages(this, memory);
        }
        
        for (FileIOChannel.ID channel : openChannels) {
            try {
                channel.getPathFile().delete();
            } catch (Exception e) {
                // Log cleanup errors but don't fail the task
            }
        }
    }
}

Error Handling and Recovery

@Override
public void invoke() throws Exception {
    try {
        // Normal task execution
        executeTask();
        
    } catch (Exception e) {
        // Report failure to environment
        getEnvironment().failExternally(e);
        throw new RuntimeTaskException("Task execution failed", e);
    }
}

@Override
public void cancel() throws Exception {
    // Set cancellation flag
    this.cancelled = true;
    
    // Interrupt any blocking operations
    if (currentThread != null) {
        currentThread.interrupt();
    }
    
    // Clean up resources immediately
    cleanup();
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-runtime-2-10

docs

data-exchange.md

execution-graph.md

high-availability.md

index.md

job-management.md

message-passing.md

metrics.md

mini-cluster.md

rpc-framework.md

state-management.md

task-execution.md

tile.json