# Resource Management

Mesos-specific resource manager implementation that handles dynamic TaskManager allocation, lifecycle management, and integration with Mesos cluster resources. The resource management system provides automatic scaling, fault tolerance, and efficient resource utilization.

## Capabilities

### Mesos Services Factory

Central service factory interface for creating and managing all Mesos-related components including worker stores, artifact servers, and scheduler drivers.

```java { .api }
/**
 * Service factory interface for Mesos components
 * Provides lifecycle management for all Mesos integration services
 */
public interface MesosServices extends AutoCloseable {
    /**
     * Create a worker store for persistent TaskManager state
     * @param configuration - Configuration for store implementation
     * @return MesosWorkerStore instance (standalone or ZooKeeper-based)
     * @throws Exception if the worker store could not be created
     */
    MesosWorkerStore createMesosWorkerStore(Configuration configuration) throws Exception;

    /**
     * Create factory for Mesos resource manager actors
     * @return Actor factory for resource management
     */
    MesosResourceManagerActorFactory createMesosResourceManagerActorFactory();

    /**
     * Get artifact server for distributing job files to tasks
     * @return Artifact server instance
     */
    MesosArtifactServer getArtifactServer();

    /**
     * Create Mesos scheduler driver for framework communication
     * @param mesosConfig - Mesos-specific configuration
     * @param scheduler - Scheduler implementation
     * @param implicitAcknowledgements - Whether to configure driver for implicit acknowledgements
     * @return Configured SchedulerDriver instance
     */
    SchedulerDriver createMesosSchedulerDriver(MesosConfiguration mesosConfig,
                                               Scheduler scheduler,
                                               boolean implicitAcknowledgements);

    /**
     * Close all services and cleanup resources
     * @param cleanup - Whether to perform cleanup operations
     * @throws Exception if the closing operation failed
     */
    void close(boolean cleanup) throws Exception;
}
```

### Service Factory Utilities

Utility class for creating appropriate MesosServices implementations based on high availability configuration.

```java { .api }
/**
 * Utilities for creating MesosServices instances
 * Handles selection between standalone and ZooKeeper-based implementations
 */
public class MesosServicesUtils {
    /**
     * Create MesosServices instance based on HA configuration
     * @param config - Flink configuration containing HA settings
     * @param hostname - Hostname for service binding
     * @return Appropriate MesosServices implementation
     */
    public static MesosServices createMesosServices(Configuration config, String hostname);
}
```

**Usage Example:**

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.mesos.runtime.clusterframework.services.MesosServices;
import org.apache.flink.mesos.runtime.clusterframework.services.MesosServicesUtils;
import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
import org.apache.flink.mesos.util.MesosArtifactServer;

// Create services based on configuration
Configuration config = new Configuration();
config.setString("high-availability", "zookeeper");
config.setString("high-availability.zookeeper.quorum", "zk1:2181,zk2:2181");

// The HA settings above decide between the standalone and ZooKeeper-based implementation
MesosServices services = MesosServicesUtils.createMesosServices(config, "master-host");

// Use services
MesosWorkerStore workerStore = services.createMesosWorkerStore(config);
MesosArtifactServer artifactServer = services.getArtifactServer();

// Cleanup when done (close(boolean) is declared to throw Exception)
services.close(true);
```
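
Because `close(boolean cleanup)` can throw and is separate from the work done with the services, it can help to make the shutdown path explicit. The sketch below shows one way to guarantee the call with `try`/`finally`. It uses a stand-in `Services` interface and a hypothetical `runWithServices` helper instead of the real Flink types, so it compiles without Flink on the classpath. Passing `cleanup = true` only after a normal finish is an assumption about how the flag is meant to be used, not documented behavior.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch only: Services stands in for MesosServices; nothing here is Flink API. */
final class ServiceLifecycleSketch {

    /** Stand-in for the close(boolean) part of MesosServices. */
    interface Services {
        void close(boolean cleanup) throws Exception;
    }

    /** A unit of work that may fail. */
    interface Work {
        void run() throws Exception;
    }

    /**
     * Runs the work and always closes the services afterwards.
     * Assumption: cleanup is requested only after a normal finish, so that
     * stored state is still available for recovery after a failure.
     */
    static void runWithServices(Services services, Work work) throws Exception {
        boolean finishedNormally = false;
        try {
            work.run();
            finishedNormally = true;
        } finally {
            services.close(finishedNormally);
        }
    }

    public static void main(String[] args) throws Exception {
        List<String> log = new ArrayList<>();
        Services services = cleanup -> log.add("close(cleanup=" + cleanup + ")");

        // Successful run: work first, then close with cleanup.
        runWithServices(services, () -> log.add("work done"));

        // Failing run: close is still called, but without cleanup.
        try {
            runWithServices(services, () -> { throw new IllegalStateException("boom"); });
        } catch (IllegalStateException expected) {
            log.add("work failed");
        }
        log.forEach(System.out::println);
    }
}
```

Note that an exception thrown by `close` inside `finally` would replace an exception thrown by the work itself; production code may want to record the first failure and attach the second as a suppressed exception.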

### Resource Manager Factory

Factory for creating Mesos-specific resource managers that integrate with Flink's active resource management system.

```java { .api }
/**
 * Factory for creating Mesos resource managers
 * Integrates with Flink's ActiveResourceManager framework
 */
public class MesosResourceManagerFactory extends ActiveResourceManagerFactory<RegisteredMesosWorkerNode> {
    /**
     * Create Mesos resource manager factory
     * @param mesosServices - Mesos services instance
     * @param mesosConfiguration - Mesos scheduler configuration
     */
    public MesosResourceManagerFactory(MesosServices mesosServices,
                                       MesosConfiguration mesosConfiguration);
}
```

### Launchable Mesos Worker

Implementation of task launching for Mesos workers, handling the conversion from Flink's container specifications to Mesos TaskInfo.

```java { .api }
/**
 * Handles launching of TaskManager processes in Mesos containers
 * Converts Flink ContainerSpecification to Mesos TaskInfo
 */
public class LaunchableMesosWorker implements LaunchableTask {
    /**
     * Get unique task identifier
     * @return Mesos task ID for this worker
     */
    public Protos.TaskID taskID();

    /**
     * Get Fenzo task requirements for resource scheduling
     * @return TaskRequest with resource and constraint requirements
     */
    public TaskRequest taskRequest();

    /**
     * Launch the TaskManager task on the specified Mesos slave
     * @param slaveId - Target Mesos slave for task execution
     * @param allocation - Allocated resources for the task
     * @return TaskInfo for Mesos task launch
     */
    public Protos.TaskInfo launch(Protos.SlaveID slaveId, MesosResourceAllocation allocation);

    /**
     * Extract port configuration keys from Flink configuration
     * @param config - Flink configuration
     * @return Set of port keys requiring dynamic assignment
     */
    public static Set<String> extractPortKeys(Configuration config);

    /**
     * Configure artifact server for task artifact distribution
     * @param artifactServer - Server instance for artifact hosting
     * @param taskManagerParameters - TaskManager configuration parameters
     * @param config - Flink configuration
     * @param logger - Logger for operation reporting
     */
    public static void configureArtifactServer(MesosArtifactServer artifactServer,
                                               MesosTaskManagerParameters taskManagerParameters,
                                               Configuration config,
                                               Logger logger);
}
```

**Worker Launch Example:**

```java
import com.netflix.fenzo.TaskRequest;
import org.apache.flink.mesos.runtime.clusterframework.LaunchableMesosWorker;
import org.apache.flink.mesos.util.MesosResourceAllocation;
import org.apache.mesos.Protos;

// Create worker with resource requirements
LaunchableMesosWorker worker = new LaunchableMesosWorker(/* constructor params */);

// Get resource requirements for scheduling
TaskRequest taskRequest = worker.taskRequest();
double cpuCores = taskRequest.getCPUs();
double memoryMB = taskRequest.getMemory();

// Launch on allocated resources
Protos.SlaveID slaveId = Protos.SlaveID.newBuilder().setValue("slave-001").build();
MesosResourceAllocation allocation = new MesosResourceAllocation(/* resources */);
Protos.TaskInfo taskInfo = worker.launch(slaveId, allocation);
```
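
Before `launch` is called, something has to decide which offer a worker fits into; in this integration that placement is handled by Fenzo through the `TaskRequest`. The following sketch only illustrates the underlying idea with a first-fit check on CPU and memory. `Offer`, `Requirement`, and `firstFit` are hypothetical stand-ins, not Fenzo or Mesos types, and real placement also considers constraints, ports, and other resources.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

/** Sketch only: a first-fit match of one worker requirement against resource offers. */
final class OfferMatchingSketch {

    /** Hypothetical, simplified view of a Mesos resource offer. */
    static final class Offer {
        final String agentId;
        final double cpus;
        final double memoryMB;

        Offer(String agentId, double cpus, double memoryMB) {
            this.agentId = agentId;
            this.cpus = cpus;
            this.memoryMB = memoryMB;
        }
    }

    /** Hypothetical, simplified view of what a TaskManager needs. */
    static final class Requirement {
        final double cpus;
        final double memoryMB;

        Requirement(double cpus, double memoryMB) {
            this.cpus = cpus;
            this.memoryMB = memoryMB;
        }

        /** True if the offer covers both the CPU and the memory requirement. */
        boolean fitsInto(Offer offer) {
            return offer.cpus >= cpus && offer.memoryMB >= memoryMB;
        }
    }

    /** Returns the first offer, in list order, that satisfies the requirement. */
    static Optional<Offer> firstFit(Requirement requirement, List<Offer> offers) {
        return offers.stream().filter(requirement::fitsInto).findFirst();
    }

    public static void main(String[] args) {
        List<Offer> offers = Arrays.asList(
                new Offer("agent-1", 1.0, 4096),   // too few CPUs
                new Offer("agent-2", 4.0, 1024),   // too little memory
                new Offer("agent-3", 2.0, 2048));  // exact fit
        Requirement worker = new Requirement(2.0, 2048);

        String chosen = firstFit(worker, offers).map(o -> o.agentId).orElse("none");
        System.out.println("Selected agent: " + chosen);
    }
}
```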

### Worker Resource Specification

Resource specification factory for creating Mesos worker resource specs that integrate with Flink's resource management system.

```java { .api }
/**
 * Factory for creating Mesos worker resource specifications
 * Handles resource requirement calculation and specification
 */
public class MesosWorkerResourceSpecFactory implements WorkerResourceSpecFactory<RegisteredMesosWorkerNode> {
    /**
     * Create worker resource specification from TaskManager parameters
     * @param taskManagerParameters - Resource requirements
     * @return Worker resource specification
     */
    public RegisteredMesosWorkerNode createWorkerResourceSpec(MesosTaskManagerParameters taskManagerParameters);
}

/**
 * Registered Mesos worker node with resource information
 * Represents a TaskManager instance registered with the resource manager
 */
public class RegisteredMesosWorkerNode extends WorkerResourceSpec {
    /**
     * Get worker resource specification
     * @return Resource requirements and allocation details
     */
    public WorkerResourceSpec getResourceSpec();

    /**
     * Get Mesos task ID for this worker
     * @return Unique task identifier
     */
    public Protos.TaskID getTaskId();
}
```

### Resource Manager Actions

Interface defining actions that can be performed by the Mesos resource manager for cluster lifecycle management.

```java { .api }
/**
 * Actions interface for Mesos resource manager operations
 * Defines cluster management operations available to the resource manager
 */
public interface MesosResourceManagerActions {
    /**
     * Request allocation of new TaskManager resources
     * @param resourceProfile - Required resource profile
     * @param timeout - Timeout for resource allocation
     */
    void requestNewWorker(ResourceProfile resourceProfile, Duration timeout);

    /**
     * Release allocated TaskManager resources
     * @param workerId - Identifier of worker to release
     */
    void releaseWorker(ResourceID workerId);

    /**
     * Get current cluster resource status
     * @return Current allocation and utilization information
     */
    ClusterResourceStatus getClusterResourceStatus();
}
```

## Resource Allocation Patterns

### Dynamic Scaling

The resource manager supports automatic scaling based on job requirements:

```java
import org.apache.flink.configuration.Configuration;

// Configure auto-scaling behavior
Configuration config = new Configuration();
config.setString("resourcemanager.rpc.port", "0");
config.setString("resourcemanager.rpc.bind-port", "0");

// Enable reactive scaling. Flink selects reactive mode with "scheduler-mode: reactive";
// its documentation describes it for standalone application deployments, so verify
// that it applies to your setup before relying on it.
config.setString("scheduler-mode", "reactive");
config.setString("execution.checkpointing.mode", "EXACTLY_ONCE");

// Resource constraints
config.setDouble("mesos.resourcemanager.tasks.cpus", 2.0);
config.setString("taskmanager.memory.process.size", "2g");
```
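
Scaling on job requirements comes down to comparing the slots a job needs with the slots that are already free. As a rough illustration, the helper below computes how many additional TaskManagers would have to be requested. The class and method names are hypothetical, and the calculation deliberately ignores details such as pending requests and differing resource profiles.

```java
/** Sketch only: how many more TaskManagers are needed to cover a slot demand. */
final class ScalingSketch {

    /**
     * @param requiredSlots       slots the job needs in total
     * @param freeSlots           slots currently free on registered TaskManagers
     * @param slotsPerTaskManager slots each newly started TaskManager provides
     * @return number of additional TaskManagers to request (never negative)
     */
    static int workersToRequest(int requiredSlots, int freeSlots, int slotsPerTaskManager) {
        if (slotsPerTaskManager <= 0) {
            throw new IllegalArgumentException("slotsPerTaskManager must be positive");
        }
        if (requiredSlots < 0 || freeSlots < 0) {
            throw new IllegalArgumentException("slot counts must not be negative");
        }
        long missingSlots = Math.max(0L, (long) requiredSlots - freeSlots);
        // Ceiling division: a partially used TaskManager still has to be started.
        return (int) ((missingSlots + slotsPerTaskManager - 1) / slotsPerTaskManager);
    }

    public static void main(String[] args) {
        // 10 slots needed, 3 free, 4 slots per TaskManager: 7 missing slots need 2 workers.
        System.out.println(workersToRequest(10, 3, 4));
        // Demand already covered: nothing to request.
        System.out.println(workersToRequest(3, 3, 4));
    }
}
```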

### Resource Reservation

Register the framework under a Mesos role so that it can use resources reserved for that role:

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.mesos.configuration.MesosOptions;

Configuration config = new Configuration();

// Framework role for reservations
config.setString(MesosOptions.RESOURCEMANAGER_FRAMEWORK_ROLE, "production");

// Resource constraints with reservations
// Hard host-attribute constraints are comma-separated key:value pairs
config.setString("mesos.constraints.hard.hostattribute", "rack:rack-1");
config.setString("mesos.resourcemanager.tasks.cpus", "2.0");
config.setString("mesos.resourcemanager.tasks.mem", "2048");
```

### Container Configuration

TaskManagers can run in either Mesos native containers or Docker containers:

```java
import org.apache.flink.configuration.Configuration;

Configuration config = new Configuration();

// Docker container configuration
config.setString("mesos.resourcemanager.tasks.container.type", "docker");
config.setString("mesos.resourcemanager.tasks.container.image.name", "flink:1.13.6-scala_2.11");
// Extra `docker run` parameters as comma-separated key=value pairs
config.setString("mesos.resourcemanager.tasks.container.docker.parameters", "network=host");

// Volume mounts
config.setString("mesos.resourcemanager.tasks.container.volumes",
    "/host-data:/container-data:RO,/host-logs:/container-logs:RW");

// Environment variables for the master process
// (use the "containerized.taskmanager.env." prefix for TaskManagers)
config.setString("containerized.master.env.FLINK_CONF_DIR", "/opt/flink/conf");
```
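
The volume list in the example above is a comma-separated sequence of `host_path:container_path:mode` entries. As a sketch of how such a string breaks down, the parser below splits it into structured entries. `VolumeSpecParser` is a hypothetical helper, not Flink's own parsing code; accepting a bare container path and defaulting a missing mode to read-write are assumptions made for the illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch only: parses "host:container:RO,host:container:RW" style volume lists. */
final class VolumeSpecParser {

    static final class Volume {
        final String hostPath;      // null when the entry names only a container path
        final String containerPath;
        final boolean readOnly;

        Volume(String hostPath, String containerPath, boolean readOnly) {
            this.hostPath = hostPath;
            this.containerPath = containerPath;
            this.readOnly = readOnly;
        }

        @Override
        public String toString() {
            String host = hostPath == null ? "<none>" : hostPath;
            return host + " -> " + containerPath + (readOnly ? " (RO)" : " (RW)");
        }
    }

    static List<Volume> parse(String spec) {
        List<Volume> volumes = new ArrayList<>();
        if (spec == null || spec.trim().isEmpty()) {
            return volumes;
        }
        for (String rawEntry : spec.split(",")) {
            String entry = rawEntry.trim();
            String[] parts = entry.split(":", -1);
            for (String part : parts) {
                if (part.isEmpty()) {
                    throw new IllegalArgumentException("Empty field in volume entry: '" + entry + "'");
                }
            }
            String last = parts[parts.length - 1];
            boolean lastIsMode = isMode(last);
            int pathCount = lastIsMode ? parts.length - 1 : parts.length;
            if (pathCount < 1 || pathCount > 2) {
                throw new IllegalArgumentException("Malformed volume entry: '" + entry + "'");
            }
            String hostPath = pathCount == 2 ? parts[0] : null;
            String containerPath = parts[pathCount - 1];
            // Assumption: entries without an explicit mode are treated as read-write.
            boolean readOnly = lastIsMode && last.equalsIgnoreCase("RO");
            volumes.add(new Volume(hostPath, containerPath, readOnly));
        }
        return volumes;
    }

    private static boolean isMode(String value) {
        return value.equalsIgnoreCase("RO") || value.equalsIgnoreCase("RW");
    }

    public static void main(String[] args) {
        parse("/host-data:/container-data:RO,/host-logs:/container-logs:RW")
                .forEach(System.out::println);
    }
}
```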

## Error Handling and Recovery

The resource management system handles the following failure scenarios:

- **Framework re-registration**: Automatic recovery from Mesos master failures
- **Task failure recovery**: Automatic TaskManager restart with exponential backoff
- **Resource constraint violations**: Graceful handling of insufficient resources
- **Network partition recovery**: Reconnection and state synchronization
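
To make the exponential backoff for TaskManager restarts concrete, the sketch below computes the delay before each restart attempt: the delay doubles per attempt until it reaches a cap. `RestartBackoff` is a hypothetical helper, and the 1-second initial delay and 60-second cap are illustrative values, not Flink or Mesos defaults.

```java
import java.time.Duration;

/** Sketch only: capped exponential backoff for restart attempts. */
final class RestartBackoff {
    private final long initialMillis;
    private final long maxMillis;

    RestartBackoff(Duration initialDelay, Duration maxDelay) {
        if (initialDelay.isNegative() || maxDelay.compareTo(initialDelay) < 0) {
            throw new IllegalArgumentException("require 0 <= initialDelay <= maxDelay");
        }
        this.initialMillis = initialDelay.toMillis();
        this.maxMillis = maxDelay.toMillis();
    }

    /** Delay before the given 0-based attempt: initial * 2^attempt, capped at the maximum. */
    Duration delayFor(int attempt) {
        if (attempt < 0) {
            throw new IllegalArgumentException("attempt must be >= 0");
        }
        long delay = initialMillis;
        for (int i = 0; i < attempt && delay < maxMillis; i++) {
            // Doubling is only done when it cannot exceed the cap, which also avoids overflow.
            delay = (delay > maxMillis / 2) ? maxMillis : delay * 2;
        }
        return Duration.ofMillis(delay);
    }

    public static void main(String[] args) {
        RestartBackoff backoff = new RestartBackoff(Duration.ofSeconds(1), Duration.ofSeconds(60));
        for (int attempt = 0; attempt <= 7; attempt++) {
            System.out.println("attempt " + attempt + " -> wait " + backoff.delayFor(attempt).getSeconds() + "s");
        }
    }
}
```

Capping the delay keeps a long-failing worker from waiting indefinitely between attempts; adding random jitter is a common refinement when many workers fail at the same time.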

## Performance Optimization

### Resource Utilization

- **Offer caching**: Efficient resource offer management and utilization
- **Constraint-based scheduling**: Optimal task placement using Fenzo scheduler
- **Resource fragmentation reduction**: Smart resource allocation strategies

### Scalability Features

- **Batch task launching**: Efficient resource allocation for large clusters
- **Persistent connections**: Connection pooling for Mesos communication
- **State compression**: Efficient storage of worker state information

## Deprecation Notice

All resource management classes are deprecated as of Flink 1.13, and Mesos support was removed in Flink 1.14. Migration paths:

- **Kubernetes**: Use the resource management classes in `org.apache.flink.kubernetes.*` (Flink's native Kubernetes integration)
- **YARN**: Use `org.apache.flink.yarn.*` resource management classes

## Types

```java { .api }
/**
 * Resource allocation details for Mesos tasks
 */
public class MesosResourceAllocation {
    public double cpus();
    public double memoryMB();
    public double diskMB();
    public double networkMbps();
    public List<Protos.Resource> mesosResources();
}

/**
 * Cluster resource status information
 */
public class ClusterResourceStatus {
    public int totalTaskManagers();
    public int availableTaskSlots();
    public int allocatedTaskSlots();
    public ResourceProfile totalResources();
    public ResourceProfile availableResources();
}

/**
 * Task launch context and parameters
 */
public class TaskLaunchContext {
    public ContainerSpecification containerSpec();
    public MesosTaskManagerParameters taskManagerParameters();
    public Map<String, String> environmentVariables();
    public List<String> commandLineArguments();
}
```