# Task Execution Framework

The task execution framework provides the foundation for implementing and executing user-defined tasks in the Flink runtime. It handles task lifecycle management, resource access, and environment setup for all executable components in a Flink job.

## Core Task Components

### AbstractInvokable

The base class for all executable tasks in the Flink runtime. Every user task must extend this class to integrate with the execution framework.

```java { .api }
public abstract class AbstractInvokable {
12
public AbstractInvokable(Environment environment);
13
14
public abstract void invoke() throws Exception;
15
public void cancel() throws Exception;
16
17
public final Environment getEnvironment();
18
public final String getTaskNameWithSubtasks();
19
public final Configuration getTaskConfiguration();
20
public final Configuration getJobConfiguration();
21
22
protected final ClassLoader getUserCodeClassLoader();
23
protected final MemoryManager getMemoryManager();
24
protected final IOManager getIOManager();
25
protected final BroadcastVariableManager getBroadcastVariableManager();
26
protected final TaskKvStateRegistry getKvStateRegistry();
27
}
```

## Task Environment

### Environment

Gives tasks access to runtime resources, including memory managers, I/O channels, configuration, and task metadata.

```java { .api }
public interface Environment {
38
JobID getJobID();
39
JobVertexID getJobVertexId();
40
ExecutionAttemptID getExecutionId();
41
42
TaskInfo getTaskInfo();
43
Configuration getTaskConfiguration();
44
Configuration getJobConfiguration();
45
46
ClassLoader getUserClassLoader();
47
MemoryManager getMemoryManager();
48
IOManager getIOManager();
49
50
InputSplitProvider getInputSplitProvider();
51
ResultPartitionWriter getWriter(int index);
52
InputGate getInputGate(int index);
53
InputGate[] getAllInputGates();
54
ResultPartitionWriter[] getAllWriters();
55
56
TaskEventDispatcher getTaskEventDispatcher();
57
BroadcastVariableManager getBroadcastVariableManager();
58
TaskStateManager getTaskStateManager();
59
60
AccumulatorRegistry getAccumulatorRegistry();
61
TaskKvStateRegistry getKvStateRegistry();
62
63
void acknowledgeCheckpoint(long checkpointId, CheckpointMetrics checkpointMetrics);
64
void acknowledgeCheckpoint(long checkpointId, CheckpointMetrics checkpointMetrics, TaskStateSnapshot subtaskState);
65
void declineCheckpoint(long checkpointId, Throwable cause);
66
67
void failExternally(Throwable cause);
68
}
```

### TaskInfo

Contains metadata about the task instance: its name, parallelism, subtask index, and attempt number.

```java { .api }
public class TaskInfo implements Serializable {
77
public TaskInfo(String taskName, int maxNumberOfParallelSubtasks, int indexOfThisSubtask,
78
int numberOfParallelSubtasks, int attemptNumber);
79
80
public String getTaskName();
81
public String getTaskNameWithSubtasks();
82
public String getAllocationIDAsString();
83
84
public int getMaxNumberOfParallelSubtasks();
85
public int getIndexOfThisSubtask();
86
public int getNumberOfParallelSubtasks();
87
public int getAttemptNumber();
88
}
```

## Input Split Processing

### InputSplitProvider

Supplies input splits to tasks one at a time for parallel data processing.

```java { .api }
public interface InputSplitProvider {
99
InputSplit getNextInputSplit(ClassLoader userCodeClassLoader) throws InputSplitProviderException;
100
}
```

### InputSplit

Base interface for input splits, each of which describes a portion of the input data.

```java { .api }
public interface InputSplit extends Serializable {
109
int getSplitNumber();
110
}
```

## Task States and Lifecycle

### ExecutionState

Enumeration of the states a task execution passes through during its lifecycle.

```java { .api }
public enum ExecutionState {
121
CREATED, // Task has been created but not scheduled
122
SCHEDULED, // Task has been scheduled for execution
123
DEPLOYING, // Task is being deployed to TaskManager
124
RUNNING, // Task is actively executing
125
FINISHED, // Task completed successfully
126
CANCELING, // Task is being cancelled
127
CANCELED, // Task has been cancelled
128
FAILED; // Task failed during execution
129
130
public boolean isTerminal();
131
public boolean isSuccess();
132
}
```

## Resource Management

### MemoryManager

Manages memory allocation and release for tasks, handing out memory segments (pages) for data processing.

```java { .api }
public abstract class MemoryManager {
143
public abstract MemorySegment allocatePages(Object owner, int numPages) throws MemoryAllocationException;
144
public abstract void releasePages(Object owner, MemorySegment... pages);
145
public abstract void releaseAllPages(Object owner);
146
147
public abstract int getPageSize();
148
public abstract long getMemorySize();
149
public abstract int computeNumberOfPages(long numBytes);
150
}
```

### IOManager

Manages disk I/O operations, such as spilling data to disk during processing.

```java { .api }
public abstract class IOManager implements AutoCloseable {
159
public abstract FileIOChannel.ID createChannel() throws IOException;
160
public abstract FileIOChannel.Enumerator createChannelEnumerator() throws IOException;
161
162
public abstract BufferFileReader createBufferFileReader(FileIOChannel.ID channelID, RequestDoneCallback<Buffer> callback) throws IOException;
163
public abstract BufferFileWriter createBufferFileWriter(FileIOChannel.ID channelID) throws IOException;
164
165
public abstract BlockChannelReader<MemorySegment> createBlockChannelReader(FileIOChannel.ID channel) throws IOException;
166
public abstract BlockChannelWriter<MemorySegment> createBlockChannelWriter(FileIOChannel.ID channel) throws IOException;
167
168
public abstract boolean isProperlyShutDown();
169
public abstract void shutdown();
170
}
```

## Task Communication

### ResultPartitionWriter

Interface for writing a task's output data to result partitions, from which downstream tasks consume it.

```java { .api }
public interface ResultPartitionWriter extends AutoCloseable {
181
ResultPartition getPartition();
182
183
BufferBuilder getBufferBuilder() throws IOException, InterruptedException;
184
void flushAll();
185
void flushAllSubpartitions(boolean finishProducers);
186
187
void fail(Throwable throwable);
188
void finish() throws IOException;
189
}
```

### InputGate

Abstraction for reading input data (buffers and events) from upstream tasks.

```java { .api }
public abstract class InputGate implements AutoCloseable {
198
public abstract int getNumberOfInputChannels();
199
public abstract boolean isFinished();
200
201
public abstract Optional<BufferOrEvent> getNext() throws IOException, InterruptedException;
202
public abstract Optional<BufferOrEvent> pollNext() throws IOException, InterruptedException;
203
204
public abstract void sendTaskEvent(TaskEvent event) throws IOException;
205
public abstract void registerListener(InputGateListener listener);
206
207
public abstract int getPageSize();
208
}
```

## Exception Handling

### RuntimeTaskException

Unchecked base exception for task execution failures.

```java { .api }
public class RuntimeTaskException extends RuntimeException {
219
public RuntimeTaskException(String message);
220
public RuntimeTaskException(String message, Throwable cause);
221
}
```

### InputSplitProviderException

Checked exception thrown when an input split cannot be provided.

```java { .api }
public class InputSplitProviderException extends Exception {
230
public InputSplitProviderException(String message);
231
public InputSplitProviderException(String message, Throwable cause);
232
}
```

## Usage Examples

### Implementing a Custom Task

```java
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
241
import org.apache.flink.runtime.execution.Environment;
242
243
public class DataProcessingTask extends AbstractInvokable {
244
245
public DataProcessingTask(Environment environment) {
246
super(environment);
247
}
248
249
@Override
250
public void invoke() throws Exception {
251
// Get task information
252
TaskInfo taskInfo = getEnvironment().getTaskInfo();
253
System.out.println("Starting task: " + taskInfo.getTaskNameWithSubtasks());
254
System.out.println("Parallel subtask " + taskInfo.getIndexOfThisSubtask() +
255
" of " + taskInfo.getNumberOfParallelSubtasks());
256
257
// Access configuration
258
Configuration taskConfig = getTaskConfiguration();
259
String inputPath = taskConfig.getString("input.path", "");
260
261
// Get resource managers
262
MemoryManager memoryManager = getMemoryManager();
263
IOManager ioManager = getIOManager();
264
265
// Allocate memory for processing
266
MemorySegment memory = memoryManager.allocatePages(this, 10);
267
268
try {
269
// Process input splits
270
InputSplitProvider splitProvider = getEnvironment().getInputSplitProvider();
271
InputSplit split;
272
273
while ((split = splitProvider.getNextInputSplit(getUserCodeClassLoader())) != null) {
274
processInputSplit(split, memory);
275
}
276
277
// Write results
278
ResultPartitionWriter[] writers = getEnvironment().getAllWriters();
279
for (ResultPartitionWriter writer : writers) {
280
// Write output data
281
writeResults(writer);
282
writer.finish();
283
}
284
285
} finally {
286
// Clean up resources
287
memoryManager.releaseAllPages(this);
288
}
289
}
290
291
@Override
292
public void cancel() throws Exception {
293
// Handle task cancellation
294
System.out.println("Task cancelled");
295
}
296
297
private void processInputSplit(InputSplit split, MemorySegment memory) throws Exception {
298
// Custom processing logic
299
System.out.println("Processing split: " + split.getSplitNumber());
300
}
301
302
private void writeResults(ResultPartitionWriter writer) throws Exception {
303
// Write output logic
304
BufferBuilder buffer = writer.getBufferBuilder();
305
// ... write data to buffer
306
writer.flushAll();
307
}
308
}
```

### Configuring Task Resources

```java
import org.apache.flink.runtime.jobgraph.JobVertex;
315
import org.apache.flink.configuration.Configuration;
316
317
// Create and configure a job vertex for the custom task
318
JobVertex processingVertex = new JobVertex("Data Processing Task");
319
processingVertex.setInvokableClass(DataProcessingTask.class);
320
processingVertex.setParallelism(4);
321
322
// Configure task-specific settings
323
Configuration taskConfig = new Configuration();
324
taskConfig.setString("input.path", "/path/to/input/data");
325
taskConfig.setInteger("buffer.size", 32768);
326
taskConfig.setLong("timeout.ms", 300000);
327
328
processingVertex.setConfiguration(taskConfig);
329
330
// Set resource requirements
331
processingVertex.setMinResources(ResourceSpec.newBuilder()
332
.setCpuCores(1.0)
333
.setHeapMemoryInMB(512)
334
.build());
335
336
processingVertex.setPreferredResources(ResourceSpec.newBuilder()
337
.setCpuCores(2.0)
338
.setHeapMemoryInMB(1024)
339
.build());
```

### Handling Task State and Checkpointing

```java
public class StatefulTask extends AbstractInvokable implements CheckpointedFunction {
346
private transient ValueState<Long> countState;
347
348
public StatefulTask(Environment environment) {
349
super(environment);
350
}
351
352
@Override
353
public void invoke() throws Exception {
354
// Initialize state if needed
355
if (countState == null) {
356
initializeState();
357
}
358
359
// Process data with state
360
while (isRunning()) {
361
processRecord();
362
363
// Update state
364
Long currentCount = countState.value();
365
countState.update((currentCount != null ? currentCount : 0) + 1);
366
}
367
}
368
369
@Override
370
public void snapshotState(FunctionSnapshotContext context) throws Exception {
371
// Checkpoint coordination happens automatically for managed state
372
System.out.println("Checkpointing at: " + context.getCheckpointTimestamp());
373
}
374
375
@Override
376
public void initializeState(FunctionInitializationContext context) throws Exception {
377
ValueStateDescriptor<Long> descriptor =
378
new ValueStateDescriptor<>("count", Long.class);
379
countState = context.getKeyedStateStore().getState(descriptor);
380
}
381
382
private void initializeState() {
383
// Initialize state from environment's state manager
384
TaskStateManager stateManager = getEnvironment().getTaskStateManager();
385
// ... initialize state from checkpoint if available
386
}
387
388
private void processRecord() throws Exception {
389
// Read from input gates
390
InputGate[] inputGates = getEnvironment().getAllInputGates();
391
for (InputGate inputGate : inputGates) {
392
Optional<BufferOrEvent> nextBuffer = inputGate.pollNext();
393
if (nextBuffer.isPresent()) {
394
// Process the buffer
395
processBuffer(nextBuffer.get());
396
}
397
}
398
}
399
400
private void processBuffer(BufferOrEvent bufferOrEvent) {
401
// Custom buffer processing logic
402
if (bufferOrEvent.isBuffer()) {
403
Buffer buffer = bufferOrEvent.getBuffer();
404
// Process buffer data
405
}
406
}
407
}
```

## Common Patterns

### Resource Cleanup

```java
@Override
416
public void invoke() throws Exception {
417
MemoryManager memoryManager = getMemoryManager();
418
IOManager ioManager = getIOManager();
419
420
List<MemorySegment> allocatedMemory = new ArrayList<>();
421
List<FileIOChannel.ID> openChannels = new ArrayList<>();
422
423
try {
424
// Allocate resources
425
MemorySegment memory = memoryManager.allocatePages(this, 5);
426
allocatedMemory.add(memory);
427
428
FileIOChannel.ID channel = ioManager.createChannel();
429
openChannels.add(channel);
430
431
// Task processing logic
432
processData();
433
434
} finally {
435
// Always clean up resources
436
for (MemorySegment memory : allocatedMemory) {
437
memoryManager.releasePages(this, memory);
438
}
439
440
for (FileIOChannel.ID channel : openChannels) {
441
try {
442
channel.getPathFile().delete();
443
} catch (Exception e) {
444
// Log cleanup errors but don't fail the task
445
}
446
}
447
}
448
}
```

### Error Handling and Recovery

```java
@Override
455
public void invoke() throws Exception {
456
try {
457
// Normal task execution
458
executeTask();
459
460
} catch (Exception e) {
461
// Report failure to environment
462
getEnvironment().failExternally(e);
463
throw new RuntimeTaskException("Task execution failed", e);
464
}
465
}
466
467
@Override
468
public void cancel() throws Exception {
469
// Set cancellation flag
470
this.cancelled = true;
471
472
// Interrupt any blocking operations
473
if (currentThread != null) {
474
currentThread.interrupt();
475
}
476
477
// Clean up resources immediately
478
cleanup();
479
}
```