or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

data-exchange.md, execution-graph.md, high-availability.md, index.md, job-management.md, message-passing.md, metrics.md, mini-cluster.md, rpc-framework.md, state-management.md, task-execution.md

docs/task-execution.md

# Task Execution Framework

The task execution framework provides the foundation for implementing and executing user-defined tasks in the Flink runtime. It handles task lifecycle management, resource access, and environment setup for every executable component of a Flink job.

## Core Task Components

### AbstractInvokable

The base class for all executable tasks in the Flink runtime. Every user task must extend this class to integrate with the execution framework.

```java { .api }

public abstract class AbstractInvokable {

12

public AbstractInvokable(Environment environment);

13

14

public abstract void invoke() throws Exception;

15

public void cancel() throws Exception;

16

17

public final Environment getEnvironment();

18

public final String getTaskNameWithSubtasks();

19

public final Configuration getTaskConfiguration();

20

public final Configuration getJobConfiguration();

21

22

protected final ClassLoader getUserCodeClassLoader();

23

protected final MemoryManager getMemoryManager();

24

protected final IOManager getIOManager();

25

protected final BroadcastVariableManager getBroadcastVariableManager();

26

protected final TaskKvStateRegistry getKvStateRegistry();

27

}

```

## Task Environment

### Environment

Gives tasks access to runtime resources, including memory managers, I/O channels, configuration, and task metadata.

```java { .api }

public interface Environment {

38

JobID getJobID();

39

JobVertexID getJobVertexId();

40

ExecutionAttemptID getExecutionId();

41

42

TaskInfo getTaskInfo();

43

Configuration getTaskConfiguration();

44

Configuration getJobConfiguration();

45

46

ClassLoader getUserClassLoader();

47

MemoryManager getMemoryManager();

48

IOManager getIOManager();

49

50

InputSplitProvider getInputSplitProvider();

51

ResultPartitionWriter getWriter(int index);

52

InputGate getInputGate(int index);

53

InputGate[] getAllInputGates();

54

ResultPartitionWriter[] getAllWriters();

55

56

TaskEventDispatcher getTaskEventDispatcher();

57

BroadcastVariableManager getBroadcastVariableManager();

58

TaskStateManager getTaskStateManager();

59

60

AccumulatorRegistry getAccumulatorRegistry();

61

TaskKvStateRegistry getKvStateRegistry();

62

63

void acknowledgeCheckpoint(long checkpointId, CheckpointMetrics checkpointMetrics);

64

void acknowledgeCheckpoint(long checkpointId, CheckpointMetrics checkpointMetrics, TaskStateSnapshot subtaskState);

65

void declineCheckpoint(long checkpointId, Throwable cause);

66

67

void failExternally(Throwable cause);

68

}

```

### TaskInfo

Contains metadata about the task instance, including its parallelism and subtask index.

```java { .api }

public class TaskInfo implements Serializable {

77

public TaskInfo(String taskName, int maxNumberOfParallelSubtasks, int indexOfThisSubtask,

78

int numberOfParallelSubtasks, int attemptNumber);

79

80

public String getTaskName();

81

public String getTaskNameWithSubtasks();

82

public String getAllocationIDAsString();

83

84

public int getMaxNumberOfParallelSubtasks();

85

public int getIndexOfThisSubtask();

86

public int getNumberOfParallelSubtasks();

87

public int getAttemptNumber();

88

}

```

## Input Split Processing

### InputSplitProvider

Interface that hands out input splits to tasks for parallel data processing.

```java { .api }

public interface InputSplitProvider {

99

InputSplit getNextInputSplit(ClassLoader userCodeClassLoader) throws InputSplitProviderException;

100

}

```

### InputSplit

Base interface for input splits, each of which defines a portion of the input data.

```java { .api }

/** Base interface for input splits; each split defines a portion of the input data. */
public interface InputSplit extends Serializable {

    /** Returns the number of this split. */
    int getSplitNumber();
}

```

## Task States and Lifecycle

### ExecutionState

Enumeration of the states a task passes through during its lifecycle.

```java { .api }

public enum ExecutionState {

121

CREATED, // Task has been created but not scheduled

122

SCHEDULED, // Task has been scheduled for execution

123

DEPLOYING, // Task is being deployed to TaskManager

124

RUNNING, // Task is actively executing

125

FINISHED, // Task completed successfully

126

CANCELING, // Task is being cancelled

127

CANCELED, // Task has been cancelled

128

FAILED; // Task failed during execution

129

130

public boolean isTerminal();

131

public boolean isSuccess();

132

}

```

## Resource Management

### MemoryManager

Manages memory allocation and deallocation for tasks, providing the memory segments used for data processing.

```java { .api }

public abstract class MemoryManager {

143

public abstract MemorySegment allocatePages(Object owner, int numPages) throws MemoryAllocationException;

144

public abstract void releasePages(Object owner, MemorySegment... pages);

145

public abstract void releaseAllPages(Object owner);

146

147

public abstract int getPageSize();

148

public abstract long getMemorySize();

149

public abstract int computeNumberOfPages(long numBytes);

150

}

```

### IOManager

Manages disk I/O operations, such as spilling data to disk during processing.

```java { .api }

public abstract class IOManager implements AutoCloseable {

159

public abstract FileIOChannel.ID createChannel() throws IOException;

160

public abstract FileIOChannel.Enumerator createChannelEnumerator() throws IOException;

161

162

public abstract BufferFileReader createBufferFileReader(FileIOChannel.ID channelID, RequestDoneCallback<Buffer> callback) throws IOException;

163

public abstract BufferFileWriter createBufferFileWriter(FileIOChannel.ID channelID) throws IOException;

164

165

public abstract BlockChannelReader<MemorySegment> createBlockChannelReader(FileIOChannel.ID channel) throws IOException;

166

public abstract BlockChannelWriter<MemorySegment> createBlockChannelWriter(FileIOChannel.ID channel) throws IOException;

167

168

public abstract boolean isProperlyShutDown();

169

public abstract void shutdown();

170

}

```

## Task Communication

### ResultPartitionWriter

Interface for writing output data to result partitions for downstream consumption.

```java { .api }

public interface ResultPartitionWriter extends AutoCloseable {

181

ResultPartition getPartition();

182

183

BufferBuilder getBufferBuilder() throws IOException, InterruptedException;

184

void flushAll();

185

void flushAllSubpartitions(boolean finishProducers);

186

187

void fail(Throwable throwable);

188

void finish() throws IOException;

189

}

```

### InputGate

Abstract class for reading input data produced by upstream tasks.

```java { .api }

public abstract class InputGate implements AutoCloseable {

198

public abstract int getNumberOfInputChannels();

199

public abstract boolean isFinished();

200

201

public abstract Optional<BufferOrEvent> getNext() throws IOException, InterruptedException;

202

public abstract Optional<BufferOrEvent> pollNext() throws IOException, InterruptedException;

203

204

public abstract void sendTaskEvent(TaskEvent event) throws IOException;

205

public abstract void registerListener(InputGateListener listener);

206

207

public abstract int getPageSize();

208

}

```

## Exception Handling

### RuntimeTaskException

Unchecked base exception for task execution failures.

```java { .api }

public class RuntimeTaskException extends RuntimeException {

219

public RuntimeTaskException(String message);

220

public RuntimeTaskException(String message, Throwable cause);

221

}

```

### InputSplitProviderException

Checked exception thrown when an input split cannot be provided.

```java { .api }

public class InputSplitProviderException extends Exception {

230

public InputSplitProviderException(String message);

231

public InputSplitProviderException(String message, Throwable cause);

232

}

```

## Usage Examples

### Implementing a Custom Task

```java

import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;

241

import org.apache.flink.runtime.execution.Environment;

242

243

public class DataProcessingTask extends AbstractInvokable {

244

245

public DataProcessingTask(Environment environment) {

246

super(environment);

247

}

248

249

@Override

250

public void invoke() throws Exception {

251

// Get task information

252

TaskInfo taskInfo = getEnvironment().getTaskInfo();

253

System.out.println("Starting task: " + taskInfo.getTaskNameWithSubtasks());

254

System.out.println("Parallel subtask " + taskInfo.getIndexOfThisSubtask() +

255

" of " + taskInfo.getNumberOfParallelSubtasks());

256

257

// Access configuration

258

Configuration taskConfig = getTaskConfiguration();

259

String inputPath = taskConfig.getString("input.path", "");

260

261

// Get resource managers

262

MemoryManager memoryManager = getMemoryManager();

263

IOManager ioManager = getIOManager();

264

265

// Allocate memory for processing

266

MemorySegment memory = memoryManager.allocatePages(this, 10);

267

268

try {

269

// Process input splits

270

InputSplitProvider splitProvider = getEnvironment().getInputSplitProvider();

271

InputSplit split;

272

273

while ((split = splitProvider.getNextInputSplit(getUserCodeClassLoader())) != null) {

274

processInputSplit(split, memory);

275

}

276

277

// Write results

278

ResultPartitionWriter[] writers = getEnvironment().getAllWriters();

279

for (ResultPartitionWriter writer : writers) {

280

// Write output data

281

writeResults(writer);

282

writer.finish();

283

}

284

285

} finally {

286

// Clean up resources

287

memoryManager.releaseAllPages(this);

288

}

289

}

290

291

@Override

292

public void cancel() throws Exception {

293

// Handle task cancellation

294

System.out.println("Task cancelled");

295

}

296

297

private void processInputSplit(InputSplit split, MemorySegment memory) throws Exception {

298

// Custom processing logic

299

System.out.println("Processing split: " + split.getSplitNumber());

300

}

301

302

private void writeResults(ResultPartitionWriter writer) throws Exception {

303

// Write output logic

304

BufferBuilder buffer = writer.getBufferBuilder();

305

// ... write data to buffer

306

writer.flushAll();

307

}

308

}

```

### Configuring Task Resources

```java

import org.apache.flink.runtime.jobgraph.JobVertex;

315

import org.apache.flink.configuration.Configuration;

316

317

// Create and configure a job vertex for the custom task

318

JobVertex processingVertex = new JobVertex("Data Processing Task");

319

processingVertex.setInvokableClass(DataProcessingTask.class);

320

processingVertex.setParallelism(4);

321

322

// Configure task-specific settings

323

Configuration taskConfig = new Configuration();

324

taskConfig.setString("input.path", "/path/to/input/data");

325

taskConfig.setInteger("buffer.size", 32768);

326

taskConfig.setLong("timeout.ms", 300000);

327

328

processingVertex.setConfiguration(taskConfig);

329

330

// Set resource requirements

331

processingVertex.setMinResources(ResourceSpec.newBuilder()

332

.setCpuCores(1.0)

333

.setHeapMemoryInMB(512)

334

.build());

335

336

processingVertex.setPreferredResources(ResourceSpec.newBuilder()

337

.setCpuCores(2.0)

338

.setHeapMemoryInMB(1024)

339

.build());

```

### Handling Task State and Checkpointing

```java

public class StatefulTask extends AbstractInvokable implements CheckpointedFunction {

346

private transient ValueState<Long> countState;

347

348

public StatefulTask(Environment environment) {

349

super(environment);

350

}

351

352

@Override

353

public void invoke() throws Exception {

354

// Initialize state if needed

355

if (countState == null) {

356

initializeState();

357

}

358

359

// Process data with state

360

while (isRunning()) {

361

processRecord();

362

363

// Update state

364

Long currentCount = countState.value();

365

countState.update((currentCount != null ? currentCount : 0) + 1);

366

}

367

}

368

369

@Override

370

public void snapshotState(FunctionSnapshotContext context) throws Exception {

371

// Checkpoint coordination happens automatically for managed state

372

System.out.println("Checkpointing at: " + context.getCheckpointTimestamp());

373

}

374

375

@Override

376

public void initializeState(FunctionInitializationContext context) throws Exception {

377

ValueStateDescriptor<Long> descriptor =

378

new ValueStateDescriptor<>("count", Long.class);

379

countState = context.getKeyedStateStore().getState(descriptor);

380

}

381

382

private void initializeState() {

383

// Initialize state from environment's state manager

384

TaskStateManager stateManager = getEnvironment().getTaskStateManager();

385

// ... initialize state from checkpoint if available

386

}

387

388

private void processRecord() throws Exception {

389

// Read from input gates

390

InputGate[] inputGates = getEnvironment().getAllInputGates();

391

for (InputGate inputGate : inputGates) {

392

Optional<BufferOrEvent> nextBuffer = inputGate.pollNext();

393

if (nextBuffer.isPresent()) {

394

// Process the buffer

395

processBuffer(nextBuffer.get());

396

}

397

}

398

}

399

400

private void processBuffer(BufferOrEvent bufferOrEvent) {

401

// Custom buffer processing logic

402

if (bufferOrEvent.isBuffer()) {

403

Buffer buffer = bufferOrEvent.getBuffer();

404

// Process buffer data

405

}

406

}

407

}

```

## Common Patterns

### Resource Cleanup

```java

@Override

416

public void invoke() throws Exception {

417

MemoryManager memoryManager = getMemoryManager();

418

IOManager ioManager = getIOManager();

419

420

List<MemorySegment> allocatedMemory = new ArrayList<>();

421

List<FileIOChannel.ID> openChannels = new ArrayList<>();

422

423

try {

424

// Allocate resources

425

MemorySegment memory = memoryManager.allocatePages(this, 5);

426

allocatedMemory.add(memory);

427

428

FileIOChannel.ID channel = ioManager.createChannel();

429

openChannels.add(channel);

430

431

// Task processing logic

432

processData();

433

434

} finally {

435

// Always clean up resources

436

for (MemorySegment memory : allocatedMemory) {

437

memoryManager.releasePages(this, memory);

438

}

439

440

for (FileIOChannel.ID channel : openChannels) {

441

try {

442

channel.getPathFile().delete();

443

} catch (Exception e) {

444

// Log cleanup errors but don't fail the task

445

}

446

}

447

}

448

}

```

### Error Handling and Recovery

```java

@Override

455

public void invoke() throws Exception {

456

try {

457

// Normal task execution

458

executeTask();

459

460

} catch (Exception e) {

461

// Report failure to environment

462

getEnvironment().failExternally(e);

463

throw new RuntimeTaskException("Task execution failed", e);

464

}

465

}

466

467

@Override

468

public void cancel() throws Exception {

469

// Set cancellation flag

470

this.cancelled = true;

471

472

// Interrupt any blocking operations

473

if (currentThread != null) {

474

currentThread.interrupt();

475

}

476

477

// Clean up resources immediately

478

cleanup();

479

}

```