Apache Flink runtime engine providing core distributed streaming dataflow execution, task scheduling, state management, and fault tolerance capabilities.
—
The task execution framework provides the foundation for implementing and executing user-defined tasks in the Flink runtime. This framework handles task lifecycle management, resource access, and environment setup for all executable components in a Flink job.
The base class for all executable tasks in Flink runtime. All user tasks must extend this class to integrate with the execution framework.
public abstract class AbstractInvokable {
public AbstractInvokable(Environment environment);
public abstract void invoke() throws Exception;
public void cancel() throws Exception;
public final Environment getEnvironment();
public final String getTaskNameWithSubtasks();
public final Configuration getTaskConfiguration();
public final Configuration getJobConfiguration();
protected final ClassLoader getUserCodeClassLoader();
protected final MemoryManager getMemoryManager();
protected final IOManager getIOManager();
protected final BroadcastVariableManager getBroadcastVariableManager();
protected final TaskKvStateRegistry getKvStateRegistry();
}
Provides tasks access to runtime resources including memory managers, IO channels, configuration, and task metadata.
public interface Environment {
JobID getJobID();
JobVertexID getJobVertexId();
ExecutionAttemptID getExecutionId();
TaskInfo getTaskInfo();
Configuration getTaskConfiguration();
Configuration getJobConfiguration();
ClassLoader getUserClassLoader();
MemoryManager getMemoryManager();
IOManager getIOManager();
InputSplitProvider getInputSplitProvider();
ResultPartitionWriter getWriter(int index);
InputGate getInputGate(int index);
InputGate[] getAllInputGates();
ResultPartitionWriter[] getAllWriters();
TaskEventDispatcher getTaskEventDispatcher();
BroadcastVariableManager getBroadcastVariableManager();
TaskStateManager getTaskStateManager();
AccumulatorRegistry getAccumulatorRegistry();
TaskKvStateRegistry getKvStateRegistry();
void acknowledgeCheckpoint(long checkpointId, CheckpointMetrics checkpointMetrics);
void acknowledgeCheckpoint(long checkpointId, CheckpointMetrics checkpointMetrics, TaskStateSnapshot subtaskState);
void declineCheckpoint(long checkpointId, Throwable cause);
void failExternally(Throwable cause);
}
Contains metadata about the task instance, including parallelism and indexing information.
public class TaskInfo implements Serializable {
public TaskInfo(String taskName, int maxNumberOfParallelSubtasks, int indexOfThisSubtask,
int numberOfParallelSubtasks, int attemptNumber);
public String getTaskName();
public String getTaskNameWithSubtasks();
public String getAllocationIDAsString();
public int getMaxNumberOfParallelSubtasks();
public int getIndexOfThisSubtask();
public int getNumberOfParallelSubtasks();
public int getAttemptNumber();
}
Interface for providing input splits to tasks for parallel data processing.
public interface InputSplitProvider {
InputSplit getNextInputSplit(ClassLoader userCodeClassLoader) throws InputSplitProviderException;
}
Base interface for input splits that define portions of input data.
public interface InputSplit extends Serializable {
int getSplitNumber();
}
Enumeration of task execution states throughout the task lifecycle.
public enum ExecutionState {
CREATED, // Task has been created but not scheduled
SCHEDULED, // Task has been scheduled for execution
DEPLOYING, // Task is being deployed to TaskManager
RUNNING, // Task is actively executing
FINISHED, // Task completed successfully
CANCELING, // Task is being cancelled
CANCELED, // Task has been cancelled
FAILED; // Task failed during execution
public boolean isTerminal();
public boolean isSuccess();
}
Manages memory allocation and deallocation for tasks, providing memory segments for data processing.
public abstract class MemoryManager {
public abstract MemorySegment allocatePages(Object owner, int numPages) throws MemoryAllocationException;
public abstract void releasePages(Object owner, MemorySegment... pages);
public abstract void releaseAllPages(Object owner);
public abstract int getPageSize();
public abstract long getMemorySize();
public abstract int computeNumberOfPages(long numBytes);
}
Manages disk I/O operations for spilling data to disk during processing.
public abstract class IOManager implements AutoCloseable {
public abstract FileIOChannel.ID createChannel() throws IOException;
public abstract FileIOChannel.Enumerator createChannelEnumerator() throws IOException;
public abstract BufferFileReader createBufferFileReader(FileIOChannel.ID channelID, RequestDoneCallback<Buffer> callback) throws IOException;
public abstract BufferFileWriter createBufferFileWriter(FileIOChannel.ID channelID) throws IOException;
public abstract BlockChannelReader<MemorySegment> createBlockChannelReader(FileIOChannel.ID channel) throws IOException;
public abstract BlockChannelWriter<MemorySegment> createBlockChannelWriter(FileIOChannel.ID channel) throws IOException;
public abstract boolean isProperlyShutDown();
public abstract void shutdown();
}
Interface for writing output data to result partitions for downstream consumption.
public interface ResultPartitionWriter extends AutoCloseable {
ResultPartition getPartition();
BufferBuilder getBufferBuilder() throws IOException, InterruptedException;
void flushAll();
void flushAllSubpartitions(boolean finishProducers);
void fail(Throwable throwable);
void finish() throws IOException;
}
Interface for reading input data from upstream tasks.
public abstract class InputGate implements AutoCloseable {
public abstract int getNumberOfInputChannels();
public abstract boolean isFinished();
public abstract Optional<BufferOrEvent> getNext() throws IOException, InterruptedException;
public abstract Optional<BufferOrEvent> pollNext() throws IOException, InterruptedException;
public abstract void sendTaskEvent(TaskEvent event) throws IOException;
public abstract void registerListener(InputGateListener listener);
public abstract int getPageSize();
}
Base exception for task execution failures.
public class RuntimeTaskException extends RuntimeException {
public RuntimeTaskException(String message);
public RuntimeTaskException(String message, Throwable cause);
}
Exception thrown when input split provision fails.
public class InputSplitProviderException extends Exception {
public InputSplitProviderException(String message);
public InputSplitProviderException(String message, Throwable cause);
}
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.execution.Environment;
/**
 * Example task that reads input splits, processes them with task-managed
 * memory, and writes results to every configured result partition.
 */
public class DataProcessingTask extends AbstractInvokable {

    // Cooperative cancellation flag: set by cancel(), checked by the split
    // loop in invoke() so the task can stop promptly. Volatile because
    // cancel() is invoked from a different thread than invoke().
    private volatile boolean running = true;

    public DataProcessingTask(Environment environment) {
        super(environment);
    }

    @Override
    public void invoke() throws Exception {
        // Log task identity and parallelism for diagnostics.
        TaskInfo taskInfo = getEnvironment().getTaskInfo();
        System.out.println("Starting task: " + taskInfo.getTaskNameWithSubtasks());
        System.out.println("Parallel subtask " + taskInfo.getIndexOfThisSubtask() +
            " of " + taskInfo.getNumberOfParallelSubtasks());

        // Read task-specific configuration (empty string default if unset).
        Configuration taskConfig = getTaskConfiguration();
        String inputPath = taskConfig.getString("input.path", "");

        // Allocate framework-managed memory; released in the finally block
        // below so pages are returned even if processing fails.
        MemoryManager memoryManager = getMemoryManager();
        MemorySegment memory = memoryManager.allocatePages(this, 10);
        try {
            // Pull input splits until the provider is exhausted (null) or
            // the task has been cancelled.
            InputSplitProvider splitProvider = getEnvironment().getInputSplitProvider();
            InputSplit split;
            while (running
                    && (split = splitProvider.getNextInputSplit(getUserCodeClassLoader())) != null) {
                processInputSplit(split, memory);
            }

            // Emit results to every downstream partition and mark each one
            // finished so consumers see end-of-data.
            ResultPartitionWriter[] writers = getEnvironment().getAllWriters();
            for (ResultPartitionWriter writer : writers) {
                writeResults(writer);
                writer.finish();
            }
        } finally {
            // Return all memory pages owned by this task.
            memoryManager.releaseAllPages(this);
        }
    }

    @Override
    public void cancel() throws Exception {
        // Signal the processing loop in invoke() to exit.
        running = false;
        System.out.println("Task cancelled");
    }

    /** Processes a single input split using the given working memory. */
    private void processInputSplit(InputSplit split, MemorySegment memory) throws Exception {
        // Custom processing logic
        System.out.println("Processing split: " + split.getSplitNumber());
    }

    /** Writes output data through the given partition writer and flushes it. */
    private void writeResults(ResultPartitionWriter writer) throws Exception {
        // Write output logic
        BufferBuilder buffer = writer.getBufferBuilder();
        // ... write data to buffer
        writer.flushAll();
    }
}
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.configuration.Configuration;
// Create and configure a job vertex for the custom task
JobVertex processingVertex = new JobVertex("Data Processing Task");
// Bind the invokable implementation and run four parallel subtask instances.
processingVertex.setInvokableClass(DataProcessingTask.class);
processingVertex.setParallelism(4);
// Configure task-specific settings
// (read back inside the task via getTaskConfiguration()).
Configuration taskConfig = new Configuration();
taskConfig.setString("input.path", "/path/to/input/data");
taskConfig.setInteger("buffer.size", 32768);
taskConfig.setLong("timeout.ms", 300000);
processingVertex.setConfiguration(taskConfig);
// Set resource requirements
// Minimum resources the scheduler must provide for each subtask.
processingVertex.setMinResources(ResourceSpec.newBuilder()
.setCpuCores(1.0)
.setHeapMemoryInMB(512)
.build());
// Preferred (larger) resources, used when capacity allows.
processingVertex.setPreferredResources(ResourceSpec.newBuilder()
.setCpuCores(2.0)
.setHeapMemoryInMB(1024)
.build());
/**
 * Example of a stateful task combining AbstractInvokable execution with the
 * CheckpointedFunction state hooks.
 *
 * NOTE(review): isRunning() and the runtime wiring that calls
 * initializeState(FunctionInitializationContext) are not shown in this
 * snippet — presumably provided by the surrounding framework; confirm.
 */
public class StatefulTask extends AbstractInvokable implements CheckpointedFunction {
    // Keyed count state; transient because state handles are obtained from
    // the runtime and are not serialized with the task object.
    private transient ValueState<Long> countState;

    public StatefulTask(Environment environment) {
        super(environment);
    }

    @Override
    public void invoke() throws Exception {
        // Initialize state if needed
        // NOTE(review): the private no-arg initializeState() below never
        // assigns countState, so countState.value() in the loop can still
        // throw NPE unless initializeState(FunctionInitializationContext)
        // was already called by the runtime — verify.
        if (countState == null) {
            initializeState();
        }
        // Process data with state
        while (isRunning()) {
            processRecord();
            // Update state: treat absent state (null) as a count of zero.
            Long currentCount = countState.value();
            countState.update((currentCount != null ? currentCount : 0) + 1);
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Checkpoint coordination happens automatically for managed state
        System.out.println("Checkpointing at: " + context.getCheckpointTimestamp());
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // Register (or restore) the keyed "count" state with the state store.
        ValueStateDescriptor<Long> descriptor =
            new ValueStateDescriptor<>("count", Long.class);
        countState = context.getKeyedStateStore().getState(descriptor);
    }

    private void initializeState() {
        // Initialize state from environment's state manager
        TaskStateManager stateManager = getEnvironment().getTaskStateManager();
        // ... initialize state from checkpoint if available
    }

    private void processRecord() throws Exception {
        // Poll each input gate once (non-blocking) and process any buffer
        // that is currently available.
        InputGate[] inputGates = getEnvironment().getAllInputGates();
        for (InputGate inputGate : inputGates) {
            Optional<BufferOrEvent> nextBuffer = inputGate.pollNext();
            if (nextBuffer.isPresent()) {
                // Process the buffer
                processBuffer(nextBuffer.get());
            }
        }
    }

    private void processBuffer(BufferOrEvent bufferOrEvent) {
        // Custom buffer processing logic: events are ignored here, only
        // data buffers are handled.
        if (bufferOrEvent.isBuffer()) {
            Buffer buffer = bufferOrEvent.getBuffer();
            // Process buffer data
        }
    }
}
@Override
// Demonstrates deterministic resource cleanup: every allocated memory page
// and every opened I/O channel is released in the finally block, so
// resources are returned even when processing fails.
public void invoke() throws Exception {
    MemoryManager memoryManager = getMemoryManager();
    IOManager ioManager = getIOManager();
    // Track everything acquired so the cleanup code can walk these lists.
    List<MemorySegment> allocatedMemory = new ArrayList<>();
    List<FileIOChannel.ID> openChannels = new ArrayList<>();
    try {
        // Allocate resources
        MemorySegment memory = memoryManager.allocatePages(this, 5);
        allocatedMemory.add(memory);
        FileIOChannel.ID channel = ioManager.createChannel();
        openChannels.add(channel);
        // Task processing logic
        processData();
    } finally {
        // Always clean up resources
        for (MemorySegment memory : allocatedMemory) {
            memoryManager.releasePages(this, memory);
        }
        for (FileIOChannel.ID channel : openChannels) {
            try {
                // File.delete() signals failure via its return value, not an
                // exception — check it instead of ignoring it.
                if (!channel.getPathFile().delete()) {
                    System.err.println("Could not delete channel file: " + channel);
                }
            } catch (Exception e) {
                // Cleanup failures must not mask the primary error or fail
                // the task, but they should not be silently swallowed either.
                System.err.println("Error cleaning up channel " + channel + ": " + e);
            }
        }
    }
}
@Override
// Error-handling pattern: report the failure to the environment, then
// rethrow as the framework's runtime exception type.
public void invoke() throws Exception {
    try {
        // Normal task execution
        executeTask();
    } catch (Exception e) {
        // Report failure to environment
        // NOTE(review): the failure reaches the framework twice — once via
        // failExternally(cause) and once via the rethrown exception;
        // confirm this double reporting is intended.
        getEnvironment().failExternally(e);
        // Preserve the original exception as the cause.
        throw new RuntimeTaskException("Task execution failed", e);
    }
}
@Override
public void cancel() throws Exception {
    // Set cancellation flag
    // NOTE(review): assumes `cancelled` is declared volatile in the
    // enclosing class so invoke() observes the update — confirm.
    this.cancelled = true;
    // Interrupt any blocking operations
    // NOTE(review): `currentThread` is presumably the thread running
    // invoke(), captured elsewhere in the enclosing class — confirm.
    if (currentThread != null) {
        currentThread.interrupt();
    }
    // Clean up resources immediately
    cleanup();
}
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-runtime-2-10