# Task Scheduling

Task scheduling for Flink on Mesos clusters, built on Netflix Fenzo. The scheduling system matches task requests against Mesos resource offers, optimizes task placement and resource utilization, and manages the task lifecycle.

## Capabilities

### Launchable Task Interface

Core interface defining task requirements and launch capabilities for Mesos scheduler integration.

```java { .api }
/**
 * Interface for tasks that can be launched on Mesos
 * Defines resource requirements and launch operations
 */
public interface LaunchableTask {
    /**
     * Get Fenzo task requirements for resource scheduling
     * Specifies CPU, memory, disk, and constraint requirements
     * @return TaskRequest with resource and placement requirements
     */
    TaskRequest taskRequest();

    /**
     * Launch the task on the specified Mesos slave with allocated resources
     * @param slaveId - Target Mesos slave for task execution
     * @param allocation - Allocated resources including CPU, memory, disk
     * @return TaskInfo containing complete task specification for Mesos
     */
    Protos.TaskInfo launch(Protos.SlaveID slaveId, MesosResourceAllocation allocation);
}
```

### Resource Offer Management

Adapter class that transforms Mesos resource offers into Fenzo `VirtualMachineLease` objects for scheduling.

```java { .api }
/**
 * Adapter transforming Mesos resource offers to Fenzo VirtualMachineLease
 * Provides resource availability information for task scheduling decisions
 */
public class Offer implements VirtualMachineLease {
    /**
     * Create offer from Mesos resource offer
     * @param offer - Mesos resource offer to wrap
     */
    public Offer(Protos.Offer offer);

    /**
     * Create offer with specific network resource name
     * @param offer - Mesos resource offer to wrap
     * @param networkResourceName - Name of network resource to use
     */
    public Offer(Protos.Offer offer, String networkResourceName);

    /**
     * Get available CPU cores from this offer
     * @return Number of CPU cores available
     */
    public double cpuCores();

    /**
     * Get available GPU units from this offer
     * @return Number of GPU units available
     */
    public double gpus();

    /**
     * Get available memory in megabytes
     * @return Memory available in MB
     */
    public double memoryMB();

    /**
     * Get available network bandwidth in Mbps
     * @return Network bandwidth in megabits per second
     */
    public double networkMbps();

    /**
     * Get available disk space in megabytes
     * @return Disk space available in MB
     */
    public double diskMB();

    /**
     * Get hostname of the Mesos slave offering resources
     * @return Hostname string
     */
    public String hostname();

    /**
     * Get virtual machine ID (slave ID)
     * @return Unique VM identifier
     */
    public String getVMID();

    /**
     * Get all available resources from this offer
     * @return List of Mesos Resource objects
     */
    public List<Protos.Resource> getResources();

    /**
     * Get the underlying Mesos offer
     * @return Original Mesos Offer object
     */
    public Protos.Offer getOffer();
}
```

**Offer Processing Example:**

```java
import java.util.List;

import org.apache.flink.mesos.scheduler.Offer;
import org.apache.mesos.Protos;

// Process incoming Mesos offers.
// scheduleTask and declineOffer are application-defined helpers that are not shown here.
public void processOffers(List<Protos.Offer> mesosOffers) {
    for (Protos.Offer mesosOffer : mesosOffers) {
        Offer offer = new Offer(mesosOffer);

        // Require at least 2 CPU cores and 2048 MB of memory
        if (offer.cpuCores() >= 2.0 && offer.memoryMB() >= 2048) {
            // Suitable for TaskManager placement
            scheduleTask(offer);
        } else {
            // Decline insufficient offer
            declineOffer(mesosOffer);
        }
    }
}
```

### Scheduler Proxy

Mesos scheduler implementation that bridges Mesos scheduler callbacks with Flink's Akka actor system for event processing.

```java { .api }
/**
 * Mesos scheduler proxy forwarding events to Akka actors
 * Handles all Mesos scheduler lifecycle events and state management
 */
public class SchedulerProxy implements Scheduler {
    /**
     * Handle framework registration with Mesos master
     * @param driver - Scheduler driver instance
     * @param frameworkId - Assigned framework ID
     * @param masterInfo - Mesos master information
     */
    public void registered(SchedulerDriver driver,
                           Protos.FrameworkID frameworkId,
                           Protos.MasterInfo masterInfo);

    /**
     * Handle framework re-registration after failover
     * @param driver - Scheduler driver instance
     * @param masterInfo - New master information
     */
    public void reregistered(SchedulerDriver driver, Protos.MasterInfo masterInfo);

    /**
     * Handle resource offers from Mesos
     * @param driver - Scheduler driver instance
     * @param offers - List of resource offers
     */
    public void resourceOffers(SchedulerDriver driver, List<Protos.Offer> offers);

    /**
     * Handle task status updates from Mesos
     * @param driver - Scheduler driver instance
     * @param status - Task status update
     */
    public void statusUpdate(SchedulerDriver driver, Protos.TaskStatus status);

    /**
     * Handle framework disconnection from master
     * @param driver - Scheduler driver instance
     */
    public void disconnected(SchedulerDriver driver);

    /**
     * Handle unrecoverable framework errors
     * @param driver - Scheduler driver instance
     * @param message - Error message
     */
    public void error(SchedulerDriver driver, String message);
}
```

### Task Scheduler Builder

Builder class for configuring the Fenzo task scheduler with custom constraints, fitness functions, and optimization strategies.

```java { .api }
/**
 * Builder for Fenzo task scheduler configuration
 * Provides fluent API for scheduler customization
 */
public class TaskSchedulerBuilder {
    /**
     * Create new task scheduler builder
     * @return Builder instance for configuration
     */
    public static TaskSchedulerBuilder newBuilder();

    /**
     * Set lease rejection action for unsuitable offers
     * @param action - Action to take when rejecting offers
     * @return Builder instance for chaining
     */
    public TaskSchedulerBuilder withLeaseRejectAction(Action1<VirtualMachineLease> action);

    /**
     * Set lease offer expiry handler
     * @param handler - Handler for expired offers
     * @return Builder instance for chaining
     */
    public TaskSchedulerBuilder withLeaseOfferExpiry(Action1<VirtualMachineLease> handler);

    /**
     * Add fitness calculator for task placement optimization
     * @param calculator - Fitness function for placement decisions
     * @return Builder instance for chaining
     */
    public TaskSchedulerBuilder withFitnessCalculator(VMTaskFitnessCalculator calculator);

    /**
     * Build configured task scheduler
     * @return Configured TaskScheduler instance
     */
    public TaskScheduler build();
}
```

**Scheduler Configuration Example:**

```java
import org.apache.flink.mesos.scheduler.TaskSchedulerBuilder;
import com.netflix.fenzo.TaskRequest;
import com.netflix.fenzo.TaskScheduler;
import com.netflix.fenzo.TaskTrackerState;
import com.netflix.fenzo.VMTaskFitnessCalculator;
import com.netflix.fenzo.VirtualMachineCurrentState;
import com.netflix.fenzo.VirtualMachineLease;

// Configure the task scheduler (assumes an SLF4J-style `logger` is in scope)
TaskScheduler scheduler = TaskSchedulerBuilder.newBuilder()
    .withLeaseRejectAction(offer -> {
        // Log rejected offers for monitoring
        logger.info("Rejecting offer from {}: insufficient resources", offer.hostname());
    })
    .withLeaseOfferExpiry(offer -> {
        // Handle expired offers
        logger.warn("Offer from {} expired", offer.hostname());
    })
    .withFitnessCalculator(new VMTaskFitnessCalculator() {
        @Override
        public String getName() {
            return "min-cpu-memory-fit";
        }

        @Override
        public double calculateFitness(TaskRequest taskRequest,
                                       VirtualMachineCurrentState targetVM,
                                       TaskTrackerState taskTrackerState) {
            // Fenzo expects a fitness in [0.0, 1.0]; a tighter fit scores higher
            VirtualMachineLease lease = targetVM.getCurrAvailableResources();
            double cpuFitness = taskRequest.getCPUs() / lease.cpuCores();
            double memoryFitness = taskRequest.getMemory() / lease.memoryMB();
            return Math.min(1.0, Math.min(cpuFitness, memoryFitness));
        }
    })
    .build();
```

## Scheduler Messages

Akka actor messages for scheduler event handling and coordination between scheduler components.

### Offer Management Messages

```java { .api }
/**
 * Message to accept resource offers and launch tasks
 */
public class AcceptOffers implements Serializable {
    public AcceptOffers(List<TaskRequest> taskRequests, List<Offer> offers);
    public List<TaskRequest> getTaskRequests();
    public List<Offer> getOffers();
}

/**
 * Message containing new resource offers from Mesos
 */
public class ResourceOffers implements Serializable {
    public ResourceOffers(List<Offer> offers);
    public List<Offer> getOffers();
}

/**
 * Message indicating an offer was rescinded by Mesos
 */
public class OfferRescinded implements Serializable {
    public OfferRescinded(Protos.OfferID offerId);
    public Protos.OfferID getOfferId();
}
```

### Connection Status Messages

```java { .api }
/**
 * Message indicating scheduler connected to Mesos master
 */
public class Connected implements Serializable {
    public Connected(Protos.MasterInfo masterInfo);
    public Protos.MasterInfo getMasterInfo();
}

/**
 * Message indicating scheduler disconnected from Mesos master
 */
public class Disconnected implements Serializable {
    public Disconnected();
}

/**
 * Message indicating framework registered with Mesos
 */
public class Registered implements Serializable {
    public Registered(Protos.FrameworkID frameworkId, Protos.MasterInfo masterInfo);
    public Protos.FrameworkID getFrameworkId();
    public Protos.MasterInfo getMasterInfo();
}

/**
 * Message indicating framework re-registered after failover
 */
public class ReRegistered implements Serializable {
    public ReRegistered(Protos.MasterInfo masterInfo);
    public Protos.MasterInfo getMasterInfo();
}
```

### Task Status Messages

```java { .api }
/**
 * Message containing task status update from Mesos
 */
public class StatusUpdate implements Serializable {
    public StatusUpdate(Protos.TaskStatus status);
    public Protos.TaskStatus getStatus();
    public Protos.TaskID getTaskId();
    public Protos.TaskState getState();
}

/**
 * Message indicating an executor was lost
 */
public class ExecutorLost implements Serializable {
    public ExecutorLost(Protos.ExecutorID executorId, Protos.SlaveID slaveId, int status);
    public Protos.ExecutorID getExecutorId();
    public Protos.SlaveID getSlaveId();
    public int getStatus();
}

/**
 * Message indicating a Mesos slave was lost
 */
public class SlaveLost implements Serializable {
    public SlaveLost(Protos.SlaveID slaveId);
    public Protos.SlaveID getSlaveId();
}
```

### Error Handling Messages

```java { .api }
/**
 * Message for unrecoverable scheduler/driver errors
 */
public class Error implements Serializable {
    /**
     * Create error message
     * @param message - Error description
     */
    public Error(String message);

    /**
     * Get error message
     * @return Error description string
     */
    public String message();
}

/**
 * Message containing framework messages from Mesos
 */
public class FrameworkMessage implements Serializable {
    public FrameworkMessage(Protos.ExecutorID executorId,
                            Protos.SlaveID slaveId,
                            byte[] data);
    public Protos.ExecutorID getExecutorId();
    public Protos.SlaveID getSlaveId();
    public byte[] getData();
}
```

## Scheduling Strategies

### Constraint-Based Scheduling

Configure placement constraints to control which hosts tasks may run on:

```java
Configuration config = new Configuration();

// Attribute-based constraints
config.setString("mesos.constraints.hard.attribute", "rack:LIKE:rack-[12]");
config.setString("mesos.constraints.soft.attribute", "datacenter:EQUALS:us-west");

// Resource constraints
config.setString("mesos.resourcemanager.tasks.cpus", "2.0");
config.setString("mesos.resourcemanager.tasks.mem", "2048");
config.setString("mesos.resourcemanager.tasks.disk", "1024");

// Host placement constraints
config.setString("mesos.constraints.hard.hostname", "UNIQUE");
```

### Resource Optimization

Advanced resource allocation strategies for cluster efficiency:

```java
// Configure resource optimization
Configuration config = new Configuration();

// Bin packing strategy
config.setString("mesos.scheduler.placement.strategy", "BIN_PACK");

// Resource utilization thresholds
config.setDouble("mesos.scheduler.cpu.utilization.threshold", 0.8);
config.setDouble("mesos.scheduler.memory.utilization.threshold", 0.85);

// Offer management
config.setLong("mesos.scheduler.offer.expiry.duration", 30000L);
config.setInteger("mesos.scheduler.offer.batch.size", 10);
```
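
To make the bin-packing idea concrete, the sketch below scores a host by how full its CPU would be after a task is placed, so the fullest host that still fits wins. `BinPackingSketch` and `cpuBinPackFitness` are hypothetical names, and the code uses plain Java values rather than Fenzo or Flink types; treat it as an illustration of the scoring idea, not as the scheduler's actual implementation.

```java
/** Illustrative only: a plain-Java stand-in for a bin-packing fitness function. */
final class BinPackingSketch {

    private BinPackingSketch() {}

    /**
     * Returns a fitness in [0.0, 1.0]. Hosts that would be fuller after the
     * placement score higher; hosts that cannot fit the task score 0.0.
     */
    static double cpuBinPackFitness(double usedCpus, double totalCpus, double requestedCpus) {
        if (totalCpus <= 0.0 || usedCpus + requestedCpus > totalCpus) {
            return 0.0; // no capacity, or the task does not fit
        }
        return (usedCpus + requestedCpus) / totalCpus;
    }

    public static void main(String[] args) {
        // A 2-core task on an 8-core host that already runs 6 cores of work
        System.out.println(cpuBinPackFitness(6.0, 8.0, 2.0));
        // The same task on an idle 8-core host
        System.out.println(cpuBinPackFitness(0.0, 8.0, 2.0));
    }
}
```

Under this scoring, a nearly full host is preferred over an idle one, which keeps idle hosts free for larger tasks. A spreading strategy would invert the preference.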
442
443
### Task Lifecycle Management
444
445
Comprehensive task state management and recovery:
446
447
```java
448
// Configure task lifecycle settings
449
Configuration config = new Configuration();
450
451
// Task restart policy
452
config.setString("restart-strategy", "exponential-delay");
453
config.setInteger("restart-strategy.exponential-delay.max-failures", 3);
454
config.setString("restart-strategy.exponential-delay.delay", "10s");
455
456
// Health checking
457
config.setString("mesos.task.health.check.enabled", "true");
458
config.setString("mesos.task.health.check.interval", "30s");
459
config.setString("mesos.task.health.check.timeout", "10s");
460
```

## Performance Optimization

### Batch Task Scheduling

Efficient handling of multiple task launches:

- **Offer batching**: Group offers for bulk processing
- **Task batching**: Launch multiple tasks simultaneously
- **Resource reservation**: Pre-allocate resources for predictable workloads
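
Offer batching can be sketched as a small buffer that hands offers to the scheduler in groups instead of one at a time. `OfferBatcher` below is a hypothetical helper written in plain Java; it is generic over the offer type and is not part of Flink, Mesos, or Fenzo.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Illustrative only: buffers items and forwards them to a sink in fixed-size batches. */
final class OfferBatcher<T> {
    private final int batchSize;
    private final Consumer<List<T>> sink;
    private final List<T> pending = new ArrayList<>();

    OfferBatcher(int batchSize, Consumer<List<T>> sink) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
        this.sink = sink;
    }

    /** Buffers one offer and flushes automatically once the batch is full. */
    void add(T offer) {
        pending.add(offer);
        if (pending.size() >= batchSize) {
            flush();
        }
    }

    /** Forwards any buffered offers as one batch; does nothing if the buffer is empty. */
    void flush() {
        if (pending.isEmpty()) {
            return;
        }
        List<T> batch = new ArrayList<>(pending);
        pending.clear();
        sink.accept(batch);
    }
}
```

A caller would typically also invoke `flush()` on a timer so that a partially filled batch does not hold offers until they expire.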

### Constraint Optimization

- **Hard constraints**: Mandatory placement requirements
- **Soft constraints**: Preferred placement with fallback options
- **Fitness functions**: Custom scoring for optimal placement decisions
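
The difference between hard and soft constraints can be shown with a minimal sketch: a hard constraint filters hosts out, while a soft constraint only influences ranking among the hosts that remain. `Host`, `Task`, and `ConstraintPlacementSketch` are hypothetical stand-ins written in plain Java, not Flink, Mesos, or Fenzo types.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

/** Illustrative only: hard constraints filter hosts, soft constraints rank them. */
final class ConstraintPlacementSketch {

    static final class Host {
        final String name;
        final String rack;
        final String datacenter;

        Host(String name, String rack, String datacenter) {
            this.name = name;
            this.rack = rack;
            this.datacenter = datacenter;
        }
    }

    static final class Task {
        final String requiredRack;        // hard constraint
        final String preferredDatacenter; // soft constraint

        Task(String requiredRack, String preferredDatacenter) {
            this.requiredRack = requiredRack;
            this.preferredDatacenter = preferredDatacenter;
        }
    }

    /** Hard constraint: a host that fails this check is never used. */
    static boolean satisfiesHard(Task task, Host host) {
        return task.requiredRack.equals(host.rack);
    }

    /** Soft constraint: raises the score but never excludes a host. */
    static double softScore(Task task, Host host) {
        return task.preferredDatacenter.equals(host.datacenter) ? 1.0 : 0.0;
    }

    /** Filters by the hard constraint, then picks the host with the best soft score. */
    static Optional<Host> place(Task task, List<Host> hosts) {
        return hosts.stream()
            .filter(host -> satisfiesHard(task, host))
            .max(Comparator.comparingDouble((Host host) -> softScore(task, host)));
    }
}
```

If no host matches the soft preference, `place` still returns a host that meets the hard constraint, which is the fallback behavior described above. It returns an empty `Optional` only when the hard constraint cannot be met.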

## Error Handling

The scheduling system provides robust error handling:

- **Task failure recovery**: Automatic restart with backoff strategies
- **Offer timeout handling**: Graceful cleanup of expired offers
- **Scheduler disconnection**: Automatic reconnection and state recovery
- **Resource constraint violations**: Intelligent fallback and rescheduling
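
As one example of a backoff strategy for task restarts, the sketch below computes a capped exponential delay: the wait doubles with each failed attempt until it reaches a maximum. `BackoffSketch` and its parameters are hypothetical, and the doubling policy is an assumption chosen for illustration rather than the behavior of any specific Flink restart strategy.

```java
/** Illustrative only: capped exponential backoff for restart delays. */
final class BackoffSketch {

    private BackoffSketch() {}

    /**
     * Returns the delay before restart number {@code attempt} (0-based).
     * The delay starts at {@code initialMillis}, doubles per attempt,
     * and never exceeds {@code maxMillis}.
     */
    static long delayMillis(int attempt, long initialMillis, long maxMillis) {
        if (attempt < 0 || initialMillis <= 0 || maxMillis < initialMillis) {
            throw new IllegalArgumentException("invalid backoff parameters");
        }
        long delay = initialMillis;
        for (int i = 0; i < attempt && delay < maxMillis; i++) {
            // Clamp before doubling so the multiplication cannot overflow
            delay = (delay > maxMillis / 2) ? maxMillis : delay * 2;
        }
        return delay;
    }

    public static void main(String[] args) {
        for (int attempt = 0; attempt < 5; attempt++) {
            System.out.println("attempt " + attempt + ": "
                + delayMillis(attempt, 10_000L, 60_000L) + " ms");
        }
    }
}
```

Production schedulers often add random jitter to each delay so that many failed tasks do not restart at the same instant.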

## Deprecation Notice

All task scheduling classes are deprecated as of Flink 1.13. Migration alternatives:

- **Kubernetes**: Use Kubernetes-native scheduling with `org.apache.flink.kubernetes.*`
- **YARN**: Use YARN resource management with `org.apache.flink.yarn.*`

## Types

```java { .api }
/**
 * Task placement request with resource requirements
 */
public class TaskPlacementRequest {
    public String taskId();
    public double cpuCores();
    public double memoryMB();
    public double diskMB();
    public Map<String, String> constraints();
    public List<String> preferredHosts();
}

/**
 * Scheduling result with placement decisions
 */
public class SchedulingResult {
    public List<TaskAssignment> assignments();
    public List<Offer> unusedOffers();
    public Map<String, String> failures();
}

/**
 * Task assignment to specific resource offer
 */
public class TaskAssignment {
    public TaskRequest task();
    public Offer offer();
    public Map<String, String> assignmentDetails();
}
```