or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

configuration.md, entry-points.md, high-availability.md, index.md, resource-management.md, task-scheduling.md, utilities.md

docs/resource-management.md

0

# Resource Management

1

2

Mesos-specific resource manager implementation that handles dynamic TaskManager allocation, lifecycle management, and integration with Mesos cluster resources. The resource management system provides automatic scaling, fault tolerance, and efficient resource utilization.

3

4

## Capabilities

5

6

### Mesos Services Factory

7

8

Central service factory interface for creating and managing all Mesos-related components including worker stores, artifact servers, and scheduler drivers.

9

10

```java { .api }

11

/**

12

* Service factory interface for Mesos components

13

* Provides lifecycle management for all Mesos integration services

14

*/

15

public interface MesosServices extends AutoCloseable {

16

/**

17

* Create a worker store for persistent TaskManager state

18

* @param configuration - Configuration for store implementation

19

* @return MesosWorkerStore instance (standalone or ZooKeeper-based)

20

* @throws Exception if the worker store could not be created

21

*/

22

MesosWorkerStore createMesosWorkerStore(Configuration configuration) throws Exception;

23

24

/**

25

* Create factory for Mesos resource manager actors

26

* @return Actor factory for resource management

27

*/

28

MesosResourceManagerActorFactory createMesosResourceManagerActorFactory();

29

30

/**

31

* Get artifact server for distributing job files to tasks

32

* @return Artifact server instance

33

*/

34

MesosArtifactServer getArtifactServer();

35

36

/**

37

* Create Mesos scheduler driver for framework communication

38

* @param mesosConfig - Mesos-specific configuration

39

* @param scheduler - Scheduler implementation

40

* @param implicitAcknowledgements - Whether to configure driver for implicit acknowledgements

41

* @return Configured SchedulerDriver instance

42

*/

43

SchedulerDriver createMesosSchedulerDriver(MesosConfiguration mesosConfig,

44

Scheduler scheduler,

45

boolean implicitAcknowledgements);

46

47

/**

48

* Close all services and cleanup resources

49

* @param cleanup - Whether to perform cleanup operations

50

* @throws Exception if the closing operation failed

51

*/

52

void close(boolean cleanup) throws Exception;

53

}

54

```

55

56

### Service Factory Utilities

57

58

Utility class that creates the appropriate `MesosServices` implementation (standalone or ZooKeeper-based) for the configured high-availability mode.

59

60

```java { .api }

61

/**

62

* Utilities for creating MesosServices instances

63

* Handles selection between standalone and ZooKeeper-based implementations

64

*/

65

public class MesosServicesUtils {

66

/**

67

* Create MesosServices instance based on HA configuration

68

* @param config - Flink configuration containing HA settings

69

* @param hostname - Hostname for service binding

70

* @return Appropriate MesosServices implementation

71

*/

72

public static MesosServices createMesosServices(Configuration config, String hostname);

73

}

74

```

75

76

**Usage Example:**

77

78

```java

79

import org.apache.flink.configuration.Configuration;

80

import org.apache.flink.mesos.runtime.clusterframework.services.MesosServices;

81

import org.apache.flink.mesos.runtime.clusterframework.services.MesosServicesUtils;

82

83

// Create services based on configuration

84

Configuration config = new Configuration();

85

config.setString("high-availability", "zookeeper");

86

config.setString("high-availability.zookeeper.quorum", "zk1:2181,zk2:2181");

87

88

MesosServices services = MesosServicesUtils.createMesosServices(config, "master-host");

89

90

// Use services

91

MesosWorkerStore workerStore = services.createMesosWorkerStore(config);

92

MesosArtifactServer artifactServer = services.getArtifactServer();

93

94

// Cleanup when done

95

services.close(true);

96

```

97

98

### Resource Manager Factory

99

100

Factory for creating Mesos-specific resource managers that integrate with Flink's active resource management system.

101

102

```java { .api }

103

/**

104

* Factory for creating Mesos resource managers

105

* Integrates with Flink's ActiveResourceManager framework

106

*/

107

public class MesosResourceManagerFactory extends ActiveResourceManagerFactory<RegisteredMesosWorkerNode> {

108

/**

109

* Create Mesos resource manager factory

110

* @param mesosServices - Mesos services instance

111

* @param mesosConfiguration - Mesos scheduler configuration

112

*/

113

public MesosResourceManagerFactory(MesosServices mesosServices,

114

MesosConfiguration mesosConfiguration);

115

}

116

```

117

118

### Launchable Mesos Worker

119

120

Implementation of task launching for Mesos workers, handling the conversion from Flink's container specifications to Mesos TaskInfo.

121

122

```java { .api }

123

/**

124

* Handles launching of TaskManager processes in Mesos containers

125

* Converts Flink ContainerSpecification to Mesos TaskInfo

126

*/

127

public class LaunchableMesosWorker implements LaunchableTask {

128

/**

129

* Get unique task identifier

130

* @return Mesos task ID for this worker

131

*/

132

public Protos.TaskID taskID();

133

134

/**

135

* Get Fenzo task requirements for resource scheduling

136

* @return TaskRequest with resource and constraint requirements

137

*/

138

public TaskRequest taskRequest();

139

140

/**

141

* Launch the TaskManager task on the specified Mesos slave

142

* @param slaveId - Target Mesos slave for task execution

143

* @param allocation - Allocated resources for the task

144

* @return TaskInfo for Mesos task launch

145

*/

146

public Protos.TaskInfo launch(Protos.SlaveID slaveId, MesosResourceAllocation allocation);

147

148

/**

149

* Extract port configuration keys from Flink configuration

150

* @param config - Flink configuration

151

* @return Set of port keys requiring dynamic assignment

152

*/

153

public static Set<String> extractPortKeys(Configuration config);

154

155

/**

156

* Configure artifact server for task artifact distribution

157

* @param artifactServer - Server instance for artifact hosting

158

* @param taskManagerParameters - TaskManager configuration parameters

159

* @param config - Flink configuration

160

* @param logger - Logger for operation reporting

161

*/

162

public static void configureArtifactServer(MesosArtifactServer artifactServer,

163

MesosTaskManagerParameters taskManagerParameters,

164

Configuration config,

165

Logger logger);

166

}

167

```

168

169

**Worker Launch Example:**

170

171

```java

172

import org.apache.flink.mesos.runtime.clusterframework.LaunchableMesosWorker;

173

import org.apache.flink.mesos.util.MesosResourceAllocation;

174

175

// Create worker with resource requirements

176

LaunchableMesosWorker worker = new LaunchableMesosWorker(/* constructor params */);

177

178

// Get resource requirements for scheduling

179

TaskRequest taskRequest = worker.taskRequest();

180

double cpuCores = taskRequest.getCPUs();

181

double memoryMB = taskRequest.getMemory();

182

183

// Launch on allocated resources

184

Protos.SlaveID slaveId = Protos.SlaveID.newBuilder().setValue("slave-001").build();

185

MesosResourceAllocation allocation = new MesosResourceAllocation(/* resources */);

186

Protos.TaskInfo taskInfo = worker.launch(slaveId, allocation);

187

```

188

189

### Worker Resource Specification

190

191

Resource specification factory for creating Mesos worker resource specs that integrate with Flink's resource management system.

192

193

```java { .api }

194

/**

195

* Factory for creating Mesos worker resource specifications

196

* Handles resource requirement calculation and specification

197

*/

198

public class MesosWorkerResourceSpecFactory implements WorkerResourceSpecFactory<RegisteredMesosWorkerNode> {

199

/**

200

* Create worker resource specification from TaskManager parameters

201

* @param taskManagerParameters - Resource requirements

202

* @return Worker resource specification

203

*/

204

public RegisteredMesosWorkerNode createWorkerResourceSpec(MesosTaskManagerParameters taskManagerParameters);

205

}

206

207

/**

208

* Registered Mesos worker node with resource information

209

* Represents a TaskManager instance registered with the resource manager

210

*/

211

public class RegisteredMesosWorkerNode extends WorkerResourceSpec {

212

/**

213

* Get worker resource specification

214

* @return Resource requirements and allocation details

215

*/

216

public WorkerResourceSpec getResourceSpec();

217

218

/**

219

* Get Mesos task ID for this worker

220

* @return Unique task identifier

221

*/

222

public Protos.TaskID getTaskId();

223

}

224

```

225

226

### Resource Manager Actions

227

228

Interface defining actions that can be performed by the Mesos resource manager for cluster lifecycle management.

229

230

```java { .api }

231

/**

232

* Actions interface for Mesos resource manager operations

233

* Defines cluster management operations available to the resource manager

234

*/

235

public interface MesosResourceManagerActions {

236

/**

237

* Request allocation of new TaskManager resources

238

* @param resourceProfile - Required resource profile

239

* @param timeout - Timeout for resource allocation

240

*/

241

void requestNewWorker(ResourceProfile resourceProfile, Duration timeout);

242

243

/**

244

* Release allocated TaskManager resources

245

* @param workerId - Identifier of worker to release

246

*/

247

void releaseWorker(ResourceID workerId);

248

249

/**

250

* Get current cluster resource status

251

* @return Current allocation and utilization information

252

*/

253

ClusterResourceStatus getClusterResourceStatus();

254

}

255

```

256

257

## Resource Allocation Patterns

258

259

### Dynamic Scaling

260

261

The resource manager supports automatic scaling based on job requirements:

262

263

```java

264

// Configure auto-scaling behavior

265

Configuration config = new Configuration();

266

config.setString("resourcemanager.rpc.port", "0");

267

config.setString("resourcemanager.rpc.bind-port", "0");

268

269

// Enable reactive scaling

270

config.setBoolean("scheduler-mode.reactive", true);

271

config.setString("execution.checkpointing.mode", "EXACTLY_ONCE");

272

273

// Resource constraints

274

config.setDouble("mesos.resourcemanager.tasks.cpus", 2.0);

275

config.setString("taskmanager.memory.process.size", "2g");

276

```

277

278

### Resource Reservation

279

280

Configure resource reservation for guaranteed allocation:

281

282

```java

283

Configuration config = new Configuration();

284

285

// Framework role for reservations

286

config.setString(MesosOptions.RESOURCEMANAGER_FRAMEWORK_ROLE, "production");

287

288

// Resource constraints with reservations

289

config.setString("mesos.constraints.hard.attribute", "rack:LIKE:rack-1");

290

config.setString("mesos.resourcemanager.tasks.cpus", "2.0");

291

config.setString("mesos.resourcemanager.tasks.mem", "2048");

292

```

293

294

### Container Configuration

295

296

The resource manager supports both Mesos native containers and Docker containers. The example below shows a Docker container configuration:

297

298

```java

299

Configuration config = new Configuration();

300

301

// Docker container configuration

302

config.setString("mesos.resourcemanager.tasks.container.type", "docker");

303

config.setString("mesos.resourcemanager.tasks.container.docker.image", "flink:1.13.6-scala_2.11");

304

config.setString("mesos.resourcemanager.tasks.container.docker.network", "HOST");

305

306

// Volume mounts

307

config.setString("mesos.resourcemanager.tasks.container.volumes",

308

"/host-data:/container-data:RO,/host-logs:/container-logs:RW");

309

310

// Environment variables

311

config.setString("containerized.master.env.FLINK_CONF_DIR", "/opt/flink/conf");

312

```

313

314

## Error Handling and Recovery

315

316

The resource management system provides comprehensive error handling:

317

318

- **Framework re-registration**: Automatic recovery from Mesos master failures

319

- **Task failure recovery**: Automatic TaskManager restart with exponential backoff

320

- **Resource constraint violations**: Graceful handling of insufficient resources

321

- **Network partition recovery**: Reconnection and state synchronization

322

323

## Performance Optimization

324

325

### Resource Utilization

326

327

- **Offer caching**: Efficient resource offer management and utilization

328

- **Constraint-based scheduling**: Optimal task placement using Fenzo scheduler

329

- **Resource fragmentation reduction**: Smart resource allocation strategies

330

331

### Scalability Features

332

333

- **Batch task launching**: Efficient resource allocation for large clusters

334

- **Persistent connections**: Connection pooling for Mesos communication

335

- **State compression**: Efficient storage of worker state information

336

337

## Deprecation Notice

338

339

All Mesos resource management classes described on this page are deprecated as of Flink 1.13. Migration paths:

340

341

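- **Kubernetes**: Use Flink's native Kubernetes integration (`org.apache.flink.kubernetes.*`) for resource management, or the separate Flink Kubernetes Operator (`org.apache.flink.kubernetes.operator.*`)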

342

- **YARN**: Use `org.apache.flink.yarn.*` resource management classes

343

344

## Types

345

346

```java { .api }

347

/**

348

* Resource allocation details for Mesos tasks

349

*/

350

public class MesosResourceAllocation {

351

public double cpus();

352

public double memoryMB();

353

public double diskMB();

354

public double networkMbps();

355

public List<Protos.Resource> mesosResources();

356

}

357

358

/**

359

* Cluster resource status information

360

*/

361

public class ClusterResourceStatus {

362

public int totalTaskManagers();

363

public int availableTaskSlots();

364

public int allocatedTaskSlots();

365

public ResourceProfile totalResources();

366

public ResourceProfile availableResources();

367

}

368

369

/**

370

* Task launch context and parameters

371

*/

372

public class TaskLaunchContext {

373

public ContainerSpecification containerSpec();

374

public MesosTaskManagerParameters taskManagerParameters();

375

public Map<String, String> environmentVariables();

376

public List<String> commandLineArguments();

377

}

378

```