# Task Scheduling

Flink's Mesos integration schedules tasks with Netflix Fenzo. Fenzo matches pending task requests against the resource offers that Mesos sends to the framework and decides where each task is placed. The scheduling layer covers offer management, task placement, and task lifecycle handling.

## Capabilities

### Launchable Task Interface

Core interface for tasks that the Mesos scheduler can place and launch. It exposes the task's resource requirements to Fenzo and builds the Mesos `TaskInfo` once resources have been allocated.

```java { .api }
/**
 * Interface for tasks that can be launched on Mesos.
 * Defines resource requirements and launch operations.
 */
public interface LaunchableTask {
    /**
     * Get Fenzo task requirements for resource scheduling.
     * Specifies CPU, memory, disk, and constraint requirements.
     * @return TaskRequest with resource and placement requirements
     */
    TaskRequest taskRequest();

    /**
     * Launch the task on the specified Mesos slave with allocated resources.
     * @param slaveId Target Mesos slave for task execution
     * @param allocation Allocated resources including CPU, memory, disk
     * @return TaskInfo containing complete task specification for Mesos
     */
    Protos.TaskInfo launch(Protos.SlaveID slaveId, MesosResourceAllocation allocation);
}
```
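
The interface implies a two-phase contract. First, the scheduler reads the task's requirements to find a matching offer. Then, once Fenzo has assigned an offer, it calls `launch` with the chosen slave and the granted resources. The plain-Java sketch below illustrates that contract without Mesos or Fenzo on the classpath. `TaskRequirements`, `GrantedResources`, and `SimpleLaunchableTask` are hypothetical stand-ins for `TaskRequest`, `MesosResourceAllocation`, and a `LaunchableTask` implementation. A `String` takes the place of `Protos.TaskInfo`.

```java
import java.util.Locale;

/** Hypothetical stand-in for Fenzo's TaskRequest: what a task needs. */
final class TaskRequirements {
    final double cpus;
    final double memoryMB;
    final double diskMB;

    TaskRequirements(double cpus, double memoryMB, double diskMB) {
        this.cpus = cpus;
        this.memoryMB = memoryMB;
        this.diskMB = diskMB;
    }
}

/** Hypothetical stand-in for MesosResourceAllocation: what was granted. */
final class GrantedResources {
    final double cpus;
    final double memoryMB;
    final double diskMB;

    GrantedResources(double cpus, double memoryMB, double diskMB) {
        this.cpus = cpus;
        this.memoryMB = memoryMB;
        this.diskMB = diskMB;
    }

    /** True if every granted amount is at least the required amount. */
    boolean covers(TaskRequirements req) {
        return cpus >= req.cpus && memoryMB >= req.memoryMB && diskMB >= req.diskMB;
    }
}

/** Hypothetical LaunchableTask-like class with the same two-phase contract. */
final class SimpleLaunchableTask {
    private final String taskId;
    private final TaskRequirements requirements;

    SimpleLaunchableTask(String taskId, TaskRequirements requirements) {
        this.taskId = taskId;
        this.requirements = requirements;
    }

    /** Phase 1: the scheduler reads the requirements to find a matching offer. */
    TaskRequirements taskRequest() {
        return requirements;
    }

    /** Phase 2: after placement, build a task description (a String stands in for TaskInfo). */
    String launch(String slaveId, GrantedResources allocation) {
        if (!allocation.covers(requirements)) {
            throw new IllegalArgumentException("allocation does not cover the task's requirements");
        }
        return String.format(Locale.ROOT, "task=%s slave=%s cpus=%.1f mem=%.0f",
                taskId, slaveId, allocation.cpus, allocation.memoryMB);
    }
}
```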

### Resource Offer Management

Adapter that wraps a Mesos resource offer as a Fenzo `VirtualMachineLease`, so that Fenzo can use the offered resources in its placement decisions.

```java { .api }
/**
 * Adapter transforming Mesos resource offers to Fenzo VirtualMachineLease.
 * Provides resource availability information for task scheduling decisions.
 */
public class Offer implements VirtualMachineLease {
    /**
     * Create offer from Mesos resource offer.
     * @param offer Mesos resource offer to wrap
     */
    public Offer(Protos.Offer offer);

    /**
     * Create offer with specific network resource name.
     * @param offer Mesos resource offer to wrap
     * @param networkResourceName Name of network resource to use
     */
    public Offer(Protos.Offer offer, String networkResourceName);

    /**
     * Get available CPU cores from this offer.
     * @return Number of CPU cores available
     */
    public double cpuCores();

    /**
     * Get available GPU units from this offer.
     * @return Number of GPU units available
     */
    public double gpus();

    /**
     * Get available memory in megabytes.
     * @return Memory available in MB
     */
    public double memoryMB();

    /**
     * Get available network bandwidth in Mbps.
     * @return Network bandwidth in megabits per second
     */
    public double networkMbps();

    /**
     * Get available disk space in megabytes.
     * @return Disk space available in MB
     */
    public double diskMB();

    /**
     * Get hostname of the Mesos slave offering resources.
     * @return Hostname string
     */
    public String hostname();

    /**
     * Get virtual machine ID (slave ID).
     * @return Unique VM identifier
     */
    public String getVMID();

    /**
     * Get all available resources from this offer.
     * @return List of Mesos Resource objects
     */
    public List<Protos.Resource> getResources();

    /**
     * Get the underlying Mesos offer.
     * @return Original Mesos Offer object
     */
    public Protos.Offer getOffer();
}
```
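
`Offer` reports totals such as `cpuCores()` and `memoryMB()`. A raw Mesos offer, however, carries a list of individual resources, and the same resource name can appear more than once, for example with different roles. The sketch below illustrates summing scalar resources by name. `ScalarResource` and `OfferTotals` are hypothetical simplified types. The names `cpus`, `mem`, `disk`, and `gpus` follow Mesos conventions. This shows the idea only and is not Flink's implementation.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical simplified Mesos scalar resource: name, role, amount. */
final class ScalarResource {
    final String name;
    final String role;
    final double value;

    ScalarResource(String name, String role, double value) {
        this.name = name;
        this.role = role;
        this.value = value;
    }
}

/** Hypothetical Offer-style view that sums scalar resources by name. */
final class OfferTotals {
    private final Map<String, Double> totals = new HashMap<>();

    OfferTotals(List<ScalarResource> resources) {
        for (ScalarResource r : resources) {
            // Entries with the same name (e.g. different roles) are added together.
            totals.merge(r.name, r.value, Double::sum);
        }
    }

    /** Total for a resource name, or 0.0 if the offer does not contain it. */
    double get(String name) {
        return totals.getOrDefault(name, 0.0);
    }

    double cpuCores() { return get("cpus"); }
    double memoryMB() { return get("mem"); }
    double diskMB()   { return get("disk"); }
    double gpus()     { return get("gpus"); }
}
```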

**Offer Processing Example:**

```java
import java.util.List;

import org.apache.flink.mesos.scheduler.Offer;
import org.apache.mesos.Protos;

// Process incoming Mesos offers.
// scheduleTask(...) and declineOffer(...) are application-specific helpers (not shown).
public void processOffers(List<Protos.Offer> mesosOffers) {
    for (Protos.Offer mesosOffer : mesosOffers) {
        Offer offer = new Offer(mesosOffer);

        // Check resource availability
        if (offer.cpuCores() >= 2.0 && offer.memoryMB() >= 2048) {
            // Suitable for TaskManager placement
            scheduleTask(offer);
        } else {
            // Decline insufficient offer
            declineOffer(mesosOffer);
        }
    }
}
```

### Scheduler Proxy

Mesos scheduler implementation that bridges Mesos scheduler callbacks with Flink's Akka actor system for event processing.

```java { .api }
/**
 * Mesos scheduler that forwards every scheduler callback
 * to an Akka actor as a message.
 */
public class SchedulerProxy implements Scheduler {
    /**
     * Handle framework registration with Mesos master.
     * @param driver Scheduler driver instance
     * @param frameworkId Assigned framework ID
     * @param masterInfo Mesos master information
     */
    public void registered(SchedulerDriver driver,
                           Protos.FrameworkID frameworkId,
                           Protos.MasterInfo masterInfo);

    /**
     * Handle framework re-registration after failover.
     * @param driver Scheduler driver instance
     * @param masterInfo New master information
     */
    public void reregistered(SchedulerDriver driver, Protos.MasterInfo masterInfo);

    /**
     * Handle resource offers from Mesos.
     * @param driver Scheduler driver instance
     * @param offers List of resource offers
     */
    public void resourceOffers(SchedulerDriver driver, List<Protos.Offer> offers);

    /**
     * Handle an offer that Mesos has rescinded.
     * @param driver Scheduler driver instance
     * @param offerId ID of the rescinded offer
     */
    public void offerRescinded(SchedulerDriver driver, Protos.OfferID offerId);

    /**
     * Handle task status updates from Mesos.
     * @param driver Scheduler driver instance
     * @param status Task status update
     */
    public void statusUpdate(SchedulerDriver driver, Protos.TaskStatus status);

    /**
     * Handle a message sent by an executor.
     * @param driver Scheduler driver instance
     * @param executorId Sending executor
     * @param slaveId Slave the executor runs on
     * @param data Message payload
     */
    public void frameworkMessage(SchedulerDriver driver, Protos.ExecutorID executorId,
                                 Protos.SlaveID slaveId, byte[] data);

    /**
     * Handle framework disconnection from master.
     * @param driver Scheduler driver instance
     */
    public void disconnected(SchedulerDriver driver);

    /**
     * Handle loss of a Mesos slave.
     * @param driver Scheduler driver instance
     * @param slaveId Lost slave
     */
    public void slaveLost(SchedulerDriver driver, Protos.SlaveID slaveId);

    /**
     * Handle loss of an executor.
     * @param driver Scheduler driver instance
     * @param executorId Lost executor
     * @param slaveId Slave the executor ran on
     * @param status Executor exit status
     */
    public void executorLost(SchedulerDriver driver, Protos.ExecutorID executorId,
                             Protos.SlaveID slaveId, int status);

    /**
     * Handle unrecoverable framework errors.
     * @param driver Scheduler driver instance
     * @param message Error message
     */
    public void error(SchedulerDriver driver, String message);
}
```
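
The proxy's role is narrow: each callback is wrapped in a message and handed to an actor. A common motivation for this design is that all state changes then happen on the actor's own thread rather than on the Mesos driver thread. The plain-Java sketch below shows the forwarding pattern, with a `BlockingQueue` standing in for the actor mailbox. `CallbackForwarder`, `RegisteredMsg`, `DisconnectedMsg`, and `ErrorMsg` are hypothetical names that mirror the shape of the messages described in this document. They are not Flink's classes.

```java
import java.util.concurrent.BlockingQueue;

/** Hypothetical message mirroring Registered. */
final class RegisteredMsg {
    final String frameworkId;

    RegisteredMsg(String frameworkId) {
        this.frameworkId = frameworkId;
    }
}

/** Hypothetical message mirroring Disconnected. */
final class DisconnectedMsg { }

/** Hypothetical message mirroring Error (named to avoid clashing with java.lang.Error). */
final class ErrorMsg {
    final String message;

    ErrorMsg(String message) {
        this.message = message;
    }
}

/**
 * Hypothetical proxy: converts callbacks into messages and enqueues them.
 * It mutates no scheduler state itself; the consumer of the queue does.
 */
final class CallbackForwarder {
    private final BlockingQueue<Object> mailbox;

    CallbackForwarder(BlockingQueue<Object> mailbox) {
        this.mailbox = mailbox;
    }

    void registered(String frameworkId) {
        mailbox.add(new RegisteredMsg(frameworkId));
    }

    void disconnected() {
        mailbox.add(new DisconnectedMsg());
    }

    void error(String message) {
        mailbox.add(new ErrorMsg(message));
    }
}
```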

### Task Scheduler Builder

Builder for configuring the Fenzo `TaskScheduler`. It controls how rejected and expired leases are handled and which fitness calculator ranks candidate hosts for a task.

```java { .api }
/**
 * Builder for Fenzo task scheduler configuration.
 * Provides fluent API for scheduler customization.
 */
public class TaskSchedulerBuilder {
    /**
     * Create new task scheduler builder.
     * @return Builder instance for configuration
     */
    public static TaskSchedulerBuilder newBuilder();

    /**
     * Set lease rejection action for unsuitable offers.
     * @param action Action to take when rejecting offers
     * @return Builder instance for chaining
     */
    public TaskSchedulerBuilder withLeaseRejectAction(Action1<VirtualMachineLease> action);

    /**
     * Set lease offer expiry handler.
     * @param handler Handler for expired offers
     * @return Builder instance for chaining
     */
    public TaskSchedulerBuilder withLeaseOfferExpiry(Action1<VirtualMachineLease> handler);

    /**
     * Add fitness calculator for task placement optimization.
     * @param calculator Fitness function for placement decisions
     * @return Builder instance for chaining
     */
    public TaskSchedulerBuilder withFitnessCalculator(VMTaskFitnessCalculator calculator);

    /**
     * Build configured task scheduler.
     * @return Configured TaskScheduler instance
     */
    public TaskScheduler build();
}
```

**Scheduler Configuration Example:**

```java
import org.apache.flink.mesos.scheduler.TaskSchedulerBuilder;
import com.netflix.fenzo.TaskRequest;
import com.netflix.fenzo.TaskScheduler;
import com.netflix.fenzo.TaskTrackerState;
import com.netflix.fenzo.VMTaskFitnessCalculator;
import com.netflix.fenzo.VirtualMachineCurrentState;
import com.netflix.fenzo.VirtualMachineLease;

// Configure advanced task scheduler (assumes an SLF4J-style `logger` in scope)
TaskScheduler scheduler = TaskSchedulerBuilder.newBuilder()
    .withLeaseRejectAction(offer -> {
        // Log rejected offers for monitoring
        logger.info("Rejecting offer from {}: insufficient resources", offer.hostname());
    })
    .withLeaseOfferExpiry(offer -> {
        // Handle expired offers
        logger.warn("Offer from {} expired", offer.hostname());
    })
    .withFitnessCalculator(new VMTaskFitnessCalculator() {
        @Override
        public String getName() {
            return "cpu-memory-fill";
        }

        @Override
        public double calculateFitness(TaskRequest taskRequest,
                                       VirtualMachineCurrentState targetVM,
                                       TaskTrackerState taskTrackerState) {
            // Fenzo expects a score in [0.0, 1.0]; prefer hosts the task fills most
            VirtualMachineLease lease = targetVM.getCurrAvailableResources();
            if (lease.cpuCores() <= 0 || lease.memoryMB() <= 0) {
                return 0.0;
            }
            double cpuFitness = Math.min(1.0, taskRequest.getCPUs() / lease.cpuCores());
            double memoryFitness = Math.min(1.0, taskRequest.getMemory() / lease.memoryMB());
            return Math.min(cpuFitness, memoryFitness);
        }
    })
    .build();
```
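
The fitness contract is easier to see without Fenzo on the classpath. Fenzo expects a fitness calculator to return a value between 0.0 and 1.0, where higher means a better fit. The sketch below computes a bin-packing style score: hosts that the task would fill most completely score highest, which leaves other hosts free for larger tasks. `TaskDemand`, `HostCapacity`, and `BinPackFitness` are hypothetical types. Averaging the CPU and memory fill ratios is one choice among several.

```java
/** Hypothetical resource demand of a task. */
final class TaskDemand {
    final double cpus;
    final double memoryMB;

    TaskDemand(double cpus, double memoryMB) {
        this.cpus = cpus;
        this.memoryMB = memoryMB;
    }
}

/** Hypothetical resources currently free on a host. */
final class HostCapacity {
    final double cpus;
    final double memoryMB;

    HostCapacity(double cpus, double memoryMB) {
        this.cpus = cpus;
        this.memoryMB = memoryMB;
    }
}

final class BinPackFitness {
    /**
     * Returns 0.0 if the task does not fit; otherwise the mean fraction of the
     * host's free CPU and memory that the task would consume (at most 1.0).
     */
    static double fitness(TaskDemand task, HostCapacity host) {
        if (host.cpus <= 0 || host.memoryMB <= 0
                || task.cpus > host.cpus || task.memoryMB > host.memoryMB) {
            return 0.0;
        }
        double cpuFill = task.cpus / host.cpus;
        double memFill = task.memoryMB / host.memoryMB;
        return (cpuFill + memFill) / 2.0;
    }
}
```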

## Scheduler Messages

Akka actor messages for scheduler event handling and coordination between scheduler components.

### Offer Management Messages

```java { .api }
/**
 * Message to accept resource offers and launch tasks
 */
public class AcceptOffers implements Serializable {
    public AcceptOffers(List<TaskRequest> taskRequests, List<Offer> offers);
    public List<TaskRequest> getTaskRequests();
    public List<Offer> getOffers();
}

/**
 * Message containing new resource offers from Mesos
 */
public class ResourceOffers implements Serializable {
    public ResourceOffers(List<Offer> offers);
    public List<Offer> getOffers();
}

/**
 * Message indicating an offer was rescinded by Mesos
 */
public class OfferRescinded implements Serializable {
    public OfferRescinded(Protos.OfferID offerId);
    public Protos.OfferID getOfferId();
}
```
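
A component handling these messages has to track which offers are still outstanding. New offers are added, accepted ones are consumed, and a rescinded offer must be dropped so that nothing is launched against it. Below is a minimal sketch that assumes string offer IDs. `PendingOffer` and `OfferRegistry` are hypothetical and are not Flink classes.

```java
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical outstanding offer: its ID and the offering host. */
final class PendingOffer {
    final String offerId;
    final String hostname;

    PendingOffer(String offerId, String hostname) {
        this.offerId = offerId;
        this.hostname = hostname;
    }
}

/** Hypothetical index of outstanding offers, updated from offer messages. */
final class OfferRegistry {
    // Insertion order is kept so offers can be considered oldest first.
    private final Map<String, PendingOffer> outstanding = new LinkedHashMap<>();

    /** Handles a ResourceOffers-style message. */
    void onResourceOffers(List<PendingOffer> offers) {
        for (PendingOffer o : offers) {
            outstanding.put(o.offerId, o);
        }
    }

    /** Handles an OfferRescinded-style message; returns true if the offer was known. */
    boolean onOfferRescinded(String offerId) {
        return outstanding.remove(offerId) != null;
    }

    /** Accepted offers are consumed and no longer outstanding. */
    void onAccepted(Collection<String> offerIds) {
        for (String id : offerIds) {
            outstanding.remove(id);
        }
    }

    int size() {
        return outstanding.size();
    }
}
```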

### Connection Status Messages

```java { .api }
/**
 * Message indicating scheduler connected to Mesos master
 */
public class Connected implements Serializable {
    public Connected(Protos.MasterInfo masterInfo);
    public Protos.MasterInfo getMasterInfo();
}

/**
 * Message indicating scheduler disconnected from Mesos master
 */
public class Disconnected implements Serializable {
    public Disconnected();
}

/**
 * Message indicating framework registered with Mesos
 */
public class Registered implements Serializable {
    public Registered(Protos.FrameworkID frameworkId, Protos.MasterInfo masterInfo);
    public Protos.FrameworkID getFrameworkId();
    public Protos.MasterInfo getMasterInfo();
}

/**
 * Message indicating framework re-registered after failover
 */
public class ReRegistered implements Serializable {
    public ReRegistered(Protos.MasterInfo masterInfo);
    public Protos.MasterInfo getMasterInfo();
}
```
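
Taken together, these messages describe a small connection state machine. The hypothetical `ConnectionTracker` below sketches it. `Registered` records the framework ID. `Disconnected` marks the scheduler offline but keeps the ID. `ReRegistered` reconnects under the same ID, because a Mesos framework keeps its framework ID across a master failover. Plain strings stand in for `Protos.FrameworkID` and `Protos.MasterInfo`.

```java
/** Hypothetical tracker driven by Registered / ReRegistered / Disconnected messages. */
final class ConnectionTracker {
    enum State { NEW, CONNECTED, DISCONNECTED }

    private State state = State.NEW;
    private String frameworkId;   // assigned on first registration, then kept
    private String masterHost;    // master currently connected to, or null

    void onRegistered(String frameworkId, String masterHost) {
        this.frameworkId = frameworkId;
        this.masterHost = masterHost;
        this.state = State.CONNECTED;
    }

    void onReRegistered(String masterHost) {
        if (frameworkId == null) {
            throw new IllegalStateException("re-registered before initial registration");
        }
        this.masterHost = masterHost;   // the framework ID is unchanged
        this.state = State.CONNECTED;
    }

    void onDisconnected() {
        this.masterHost = null;
        this.state = State.DISCONNECTED; // frameworkId is deliberately kept
    }

    State state() { return state; }
    String frameworkId() { return frameworkId; }
    String masterHost() { return masterHost; }
}
```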

### Task Status Messages

```java { .api }
/**
 * Message containing task status update from Mesos
 */
public class StatusUpdate implements Serializable {
    public StatusUpdate(Protos.TaskStatus status);
    public Protos.TaskStatus getStatus();
    public Protos.TaskID getTaskId();
    public Protos.TaskState getState();
}

/**
 * Message indicating an executor was lost
 */
public class ExecutorLost implements Serializable {
    public ExecutorLost(Protos.ExecutorID executorId, Protos.SlaveID slaveId, int status);
    public Protos.ExecutorID getExecutorId();
    public Protos.SlaveID getSlaveId();
    public int getStatus();
}

/**
 * Message indicating a Mesos slave was lost
 */
public class SlaveLost implements Serializable {
    public SlaveLost(Protos.SlaveID slaveId);
    public Protos.SlaveID getSlaveId();
}
```
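
The main decision a `StatusUpdate` handler makes is whether a task has stopped for good, so that its resources can be released and a replacement requested if needed. The sketch below mirrors a subset of the Mesos `TaskState` names in a hypothetical local enum, not the `Protos.TaskState` class itself. `TASK_UNREACHABLE` is treated as non-terminal. Mesos reports it for tasks on agents that lost contact with the master, and such tasks may still be running.

```java
import java.util.EnumSet;
import java.util.Set;

/** Hypothetical mirror of a subset of Mesos TaskState values. */
enum MesosTaskState {
    TASK_STAGING, TASK_STARTING, TASK_RUNNING, TASK_KILLING,
    TASK_FINISHED, TASK_FAILED, TASK_KILLED, TASK_ERROR,
    TASK_LOST, TASK_DROPPED, TASK_GONE, TASK_UNREACHABLE
}

final class StatusClassifier {
    // States after which the task will never run again.
    private static final Set<MesosTaskState> TERMINAL = EnumSet.of(
            MesosTaskState.TASK_FINISHED, MesosTaskState.TASK_FAILED,
            MesosTaskState.TASK_KILLED, MesosTaskState.TASK_ERROR,
            MesosTaskState.TASK_LOST, MesosTaskState.TASK_DROPPED,
            MesosTaskState.TASK_GONE);

    /** True if the task has stopped for good and its resources can be released. */
    static boolean isTerminal(MesosTaskState state) {
        return TERMINAL.contains(state);
    }
}
```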

### Error Handling Messages

```java { .api }
/**
 * Message for unrecoverable scheduler/driver errors
 */
public class Error implements Serializable {
    /**
     * Create error message.
     * @param message Error description
     */
    public Error(String message);

    /**
     * Get error message.
     * @return Error description string
     */
    public String message();
}

/**
 * Message containing framework messages from Mesos
 */
public class FrameworkMessage implements Serializable {
    public FrameworkMessage(Protos.ExecutorID executorId,
                            Protos.SlaveID slaveId,
                            byte[] data);
    public Protos.ExecutorID getExecutorId();
    public Protos.SlaveID getSlaveId();
    public byte[] getData();
}
```

## Scheduling Strategies

### Constraint-Based Scheduling

Configure placement constraints for optimal resource utilization:

```java
import org.apache.flink.configuration.Configuration;

Configuration config = new Configuration();

// Attribute-based constraints (hard: comma-separated attribute:value pairs)
config.setString("mesos.constraints.hard.hostattribute", "rack:rack-1");
config.setString("mesos.constraints.soft.attribute", "datacenter:EQUALS:us-west");

// Resource constraints
config.setString("mesos.resourcemanager.tasks.cpus", "2.0");
config.setString("mesos.resourcemanager.tasks.mem", "2048");
config.setString("mesos.resourcemanager.tasks.disk", "1024");

// Network constraints
config.setString("mesos.constraints.hard.hostname", "UNIQUE");
```
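
The difference between hard and soft constraints can be shown with host attributes. A hard constraint removes a host from consideration, while a soft constraint only lowers its score. In this sketch, `HostConstraints` and the attribute maps are hypothetical. It illustrates the evaluation order (filter first, then rank) rather than Fenzo's constraint API.

```java
import java.util.Map;

final class HostConstraints {
    /** Hard constraint: every required attribute must match exactly, or the host is rejected. */
    static boolean satisfiesHard(Map<String, String> hostAttrs, Map<String, String> required) {
        for (Map.Entry<String, String> e : required.entrySet()) {
            if (!e.getValue().equals(hostAttrs.get(e.getKey()))) {
                return false;
            }
        }
        return true;
    }

    /** Soft constraint: fraction of preferred attributes that match (1.0 when none are given). */
    static double softScore(Map<String, String> hostAttrs, Map<String, String> preferred) {
        if (preferred.isEmpty()) {
            return 1.0;
        }
        int matched = 0;
        for (Map.Entry<String, String> e : preferred.entrySet()) {
            if (e.getValue().equals(hostAttrs.get(e.getKey()))) {
                matched++;
            }
        }
        return (double) matched / preferred.size();
    }

    /** Hard constraints filter, soft constraints rank; returns -1.0 for rejected hosts. */
    static double evaluate(Map<String, String> hostAttrs,
                           Map<String, String> hard, Map<String, String> soft) {
        return satisfiesHard(hostAttrs, hard) ? softScore(hostAttrs, soft) : -1.0;
    }
}
```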

### Resource Optimization

Advanced resource allocation strategies for cluster efficiency:

```java
import org.apache.flink.configuration.Configuration;

// Configure resource optimization
Configuration config = new Configuration();

// Bin packing strategy
config.setString("mesos.scheduler.placement.strategy", "BIN_PACK");

// Resource utilization thresholds
config.setDouble("mesos.scheduler.cpu.utilization.threshold", 0.8);
config.setDouble("mesos.scheduler.memory.utilization.threshold", 0.85);

// Offer management
config.setLong("mesos.scheduler.offer.expiry.duration", 30000L);
config.setInteger("mesos.scheduler.offer.batch.size", 10);
```
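
An offer that is held but never used keeps its resources away from other frameworks, so unused offers should eventually be declined. The sketch below illustrates time-based expiry with an injectable clock, which keeps the logic testable. Offers older than the timeout are removed and returned so the caller can decline them. `ExpiringOffers` is a hypothetical helper, and any timeout passed to it is an example value.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.LongSupplier;

/** Hypothetical holder that expires offers after a fixed time in milliseconds. */
final class ExpiringOffers {
    private final long expiryMillis;
    private final LongSupplier clock;                        // current time in ms
    private final Map<String, Long> receivedAt = new LinkedHashMap<>();

    ExpiringOffers(long expiryMillis, LongSupplier clock) {
        this.expiryMillis = expiryMillis;
        this.clock = clock;
    }

    /** Records when an offer arrived. */
    void add(String offerId) {
        receivedAt.put(offerId, clock.getAsLong());
    }

    /** Removes and returns offers at least expiryMillis old; the caller declines them. */
    List<String> expire() {
        long now = clock.getAsLong();
        List<String> expired = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = receivedAt.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> e = it.next();
            if (now - e.getValue() >= expiryMillis) {
                expired.add(e.getKey());
                it.remove();
            }
        }
        return expired;
    }
}
```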

### Task Lifecycle Management

Comprehensive task state management and recovery:

```java
import org.apache.flink.configuration.Configuration;

// Configure task lifecycle settings
Configuration config = new Configuration();

// Job restart strategy: governs how the Flink job restarts after failures
config.setString("restart-strategy", "exponential-delay");
config.setString("restart-strategy.exponential-delay.initial-backoff", "10s");

// Health checking
config.setString("mesos.task.health.check.enabled", "true");
config.setString("mesos.task.health.check.interval", "30s");
config.setString("mesos.task.health.check.timeout", "10s");
```
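
The idea behind an exponential-delay restart strategy can be sketched in a few lines. Each consecutive failure multiplies the wait before the next restart, up to a ceiling. `BackoffPolicy` and the parameters used with it (10 s initial delay, multiplier 2, 5 min cap) are illustrative assumptions, not Flink's implementation or its defaults.

```java
import java.time.Duration;

/** Hypothetical exponential backoff: initial * multiplier^(n-1), capped at max. */
final class BackoffPolicy {
    private final Duration initial;
    private final double multiplier;
    private final Duration max;

    BackoffPolicy(Duration initial, double multiplier, Duration max) {
        this.initial = initial;
        this.multiplier = multiplier;
        this.max = max;
    }

    /** Delay before the restart that follows the given number of consecutive failures (1-based). */
    Duration delayAfter(int consecutiveFailures) {
        if (consecutiveFailures < 1) {
            throw new IllegalArgumentException("consecutiveFailures must be >= 1");
        }
        double millis = initial.toMillis() * Math.pow(multiplier, consecutiveFailures - 1);
        long capped = (long) Math.min(millis, (double) max.toMillis());
        return Duration.ofMillis(capped);
    }
}
```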

## Performance Optimization

### Batch Task Scheduling

Efficient handling of multiple task launches:

- **Offer batching**: Group offers for bulk processing
- **Task batching**: Launch multiple tasks simultaneously
- **Resource reservation**: Pre-allocate resources for predictable workloads
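
Batching means the scheduler considers all pending tasks against all currently held offers in one pass, instead of reacting to each offer separately. Below is a first-fit sketch of such a pass that checks only CPU and memory. Fenzo's real assignment also applies constraints and fitness scores. `TaskSpec`, `HostOffer`, and `BatchAssigner` are hypothetical types.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical pending task. */
final class TaskSpec {
    final double cpus;
    final double memoryMB;

    TaskSpec(double cpus, double memoryMB) {
        this.cpus = cpus;
        this.memoryMB = memoryMB;
    }
}

/** Hypothetical held offer; remaining capacity shrinks as tasks are placed. */
final class HostOffer {
    final String host;
    double cpus;
    double memoryMB;

    HostOffer(String host, double cpus, double memoryMB) {
        this.host = host;
        this.cpus = cpus;
        this.memoryMB = memoryMB;
    }
}

final class BatchAssigner {
    /**
     * Places each task on the first offer with enough remaining capacity.
     * Returns task index -> host; tasks that fit nowhere are absent.
     */
    static Map<Integer, String> assign(List<TaskSpec> tasks, List<HostOffer> offers) {
        Map<Integer, String> placement = new LinkedHashMap<>();
        for (int i = 0; i < tasks.size(); i++) {
            TaskSpec t = tasks.get(i);
            for (HostOffer o : offers) {
                if (o.cpus >= t.cpus && o.memoryMB >= t.memoryMB) {
                    o.cpus -= t.cpus;         // consume capacity from this offer
                    o.memoryMB -= t.memoryMB;
                    placement.put(i, o.host);
                    break;
                }
            }
        }
        return placement;
    }
}
```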

### Constraint Optimization

- **Hard constraints**: Mandatory placement requirements
- **Soft constraints**: Preferred placement with fallback options
- **Fitness functions**: Custom scoring for optimal placement decisions

## Error Handling

The scheduling layer handles the following failure cases:

- **Task failure recovery**: Automatic restart with backoff strategies
- **Offer timeout handling**: Expired offers are cleaned up and declined
- **Scheduler disconnection**: Automatic reconnection and state recovery
- **Resource constraint violations**: Fallback and rescheduling when a placement cannot be satisfied

## Deprecation Notice

All task scheduling classes are deprecated as of Flink 1.13, and Mesos support was removed entirely in Flink 1.14. Migration alternatives:

- **Kubernetes**: Use Kubernetes-native scheduling with `org.apache.flink.kubernetes.*`
- **YARN**: Use YARN resource management with `org.apache.flink.yarn.*`

## Types

```java { .api }
/**
 * Task placement request with resource requirements
 */
public class TaskPlacementRequest {
    public String taskId();
    public double cpuCores();
    public double memoryMB();
    public double diskMB();
    public Map<String, String> constraints();
    public List<String> preferredHosts();
}

/**
 * Scheduling result with placement decisions
 */
public class SchedulingResult {
    public List<TaskAssignment> assignments();
    public List<Offer> unusedOffers();
    public Map<String, String> failures();
}

/**
 * Task assignment to specific resource offer
 */
public class TaskAssignment {
    public TaskRequest task();
    public Offer offer();
    public Map<String, String> assignmentDetails();
}
```