
# Apache Spark Mesos Resource Manager


The Apache Spark Mesos resource manager integrates Apache Spark with Apache Mesos, enabling Spark applications to run on Mesos clusters with flexible resource allocation. It supports both coarse-grained mode (long-running executors with fixed resources) and fine-grained mode (dynamic resource allocation, with each Spark task launched as an individual Mesos task).


## Package Information

- **Package Name**: spark-mesos_2.11
- **Package Type**: Maven
- **Language**: Scala
- **Version**: 2.2.3
- **GroupId**: org.apache.spark
- **ArtifactId**: spark-mesos_2.11
- **Installation**: Maven or SBT dependency inclusion
- **License**: Apache License 2.0
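
Given the coordinates above, the dependency can be declared in sbt as follows (a minimal sketch; for Maven, use the same groupId, artifactId, and version in a `<dependency>` element):

```scala
// build.sbt -- spark-mesos for Scala 2.11, using the coordinates listed above
libraryDependencies += "org.apache.spark" % "spark-mesos_2.11" % "2.2.3"
```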

## Core Imports

The Spark Mesos integration is automatically discovered through the ServiceLoader mechanism when the spark-mesos jar is on the classpath. No direct imports are needed for basic usage.

```scala
// Standard Spark imports for Mesos integration
import org.apache.spark.{SparkConf, SparkContext}

// Configuration constants (if needed for advanced configuration)
import org.apache.spark.deploy.mesos.config._
```

**Note**: The core Mesos integration classes (MesosClusterManager, scheduler backends, etc.) are internal to Spark and marked as `private[spark]`. They are automatically used when you set the master URL to start with "mesos://".

## Basic Usage

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Configure Spark to use Mesos in coarse-grained mode
val conf = new SparkConf()
  .setAppName("MySparkApp")
  .setMaster("mesos://mesos-master:5050")            // Mesos master URL
  .set("spark.mesos.coarse", "true")                 // Enable coarse-grained mode (default)
  .set("spark.mesos.executor.home", "/opt/spark")    // Required: Spark installation path
  .set("spark.executor.memory", "2g")
  .set("spark.executor.cores", "2")
  .set("spark.mesos.executor.memoryOverhead", "512") // Additional memory overhead, in MB

// Optional: configure constraints and Docker
// conf.set("spark.mesos.constraints", "os:linux")
// conf.set("spark.mesos.executor.docker.image", "spark:latest")

// Create the SparkContext - it automatically uses MesosClusterManager
val sc = new SparkContext(conf)

// Your Spark application code
val data = sc.parallelize(1 to 1000)
val result = data.map(_ * 2).collect()

sc.stop()
```

**Fine-grained mode example:**

```scala
val conf = new SparkConf()
  .setAppName("MySparkApp")
  .setMaster("mesos://mesos-master:5050")
  .set("spark.mesos.coarse", "false")            // Enable fine-grained mode
  .set("spark.mesos.executor.home", "/opt/spark")
  .set("spark.mesos.mesosExecutor.cores", "1.0") // Cores for the executor backend

val sc = new SparkContext(conf)
// Application code...
sc.stop()
```

## Configuration

The Spark Mesos integration supports extensive configuration through `spark.mesos.*` properties:

### Core Configuration

- **spark.mesos.coarse** (default: `true`): Run in coarse-grained mode; set to `false` for fine-grained mode
- **spark.mesos.executor.home**: Spark installation directory on Mesos slaves (required)
- **spark.mesos.executor.cores**: Number of cores per executor (coarse-grained mode)
- **spark.mesos.executor.memoryOverhead**: Additional memory overhead per executor, in MB
- **spark.mesos.mesosExecutor.cores** (default: `1.0`): CPU cores for the fine-grained executor backend
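
As a sketch of how these settings combine, the total memory requested from Mesos per executor can be approximated as below. The default-overhead rule (the larger of 384 MB or 10% of executor memory when `spark.mesos.executor.memoryOverhead` is unset) mirrors Spark's documented behavior, but treat the constants as an assumption rather than a guarantee:

```scala
// Sketch: total per-executor memory requested from a Mesos offer.
// Assumes the default overhead is max(384 MB, 10% of executor memory)
// when spark.mesos.executor.memoryOverhead is not set.
def totalExecutorMemoryMb(executorMemoryMb: Int, overheadMb: Option[Int]): Int = {
  val defaultOverhead = math.max(384, (0.10 * executorMemoryMb).toInt)
  executorMemoryMb + overheadMb.getOrElse(defaultOverhead)
}
```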

### Resource Limits

- **spark.mesos.gpus.max** (default: `0`): Maximum GPUs to acquire across all executors
- **spark.mesos.extra.cores** (default: `0`): Extra cores per executor for overhead
- **spark.mesos.maxDrivers** (default: `200`): Maximum queued drivers in cluster mode
- **spark.mesos.retainedDrivers** (default: `200`): Number of completed drivers to retain

### Security and Authentication

- **spark.mesos.principal**: Mesos principal for framework authentication
- **spark.mesos.secret**: Secret for framework authentication (requires principal)
- **spark.mesos.role**: Mesos role for resource allocation

### Docker Support

- **spark.mesos.executor.docker.image**: Docker image for executors
- **spark.mesos.executor.docker.forcePullImage**: Always pull the Docker image
- **spark.mesos.executor.docker.volumes**: Volume mappings (format: `/host:/container:mode`)
- **spark.mesos.executor.docker.portmaps**: Port mappings (format: `host:container:protocol`)
- **spark.mesos.executor.docker.parameters**: Custom Docker parameters
- **spark.mesos.containerizer** (default: `docker`): Container type (`docker` or `mesos`)
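
The documented volume format (`/host:/container:mode`, with host path and mode optional) can be parsed roughly as below. This is an illustrative sketch, not Spark's internal parser; the `DockerVolume` type is hypothetical:

```scala
// Hypothetical model of a parsed volume spec; read-write is assumed by default.
case class DockerVolume(hostPath: Option[String], containerPath: String, readOnly: Boolean)

// Sketch: parse "[/host:]/container[:mode]" into a DockerVolume.
def parseVolume(spec: String): DockerVolume = spec.split(":") match {
  case Array(container)             => DockerVolume(None, container, readOnly = false)
  case Array(host, container)       => DockerVolume(Some(host), container, readOnly = false)
  case Array(host, container, mode) => DockerVolume(Some(host), container, mode == "ro")
  case _ => throw new IllegalArgumentException(s"Unparseable volume spec: $spec")
}
```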

### Networking and URIs

- **spark.mesos.network.name**: Custom network name for containers
- **spark.mesos.uris**: Comma-separated URIs to download to the sandbox
- **spark.mesos.fetcherCache.enable** (default: `false`): Enable the Mesos fetcher cache

### Constraints and Filtering

- **spark.mesos.constraints**: Attribute constraints for resource offers
- **spark.mesos.rejectOfferDuration** (default: `120s`): Duration to reject unsuitable offers
- **spark.mesos.rejectOfferDurationForUnmetConstraints**: Reject duration for constraint mismatches
- **spark.mesos.rejectOfferDurationForReachedMaxCores**: Reject duration when max cores are reached

### Cluster Mode Configuration

- **spark.mesos.dispatcher.webui.url**: Dispatcher web UI URL
- **spark.mesos.dispatcher.historyServer.url**: History server URL for driver links
- **spark.mesos.driver.constraints**: Constraints for driver placement
- **spark.mesos.driver.webui.url**: Driver web UI URL
- **spark.mesos.driver.frameworkId**: Framework ID for driver correlation
- **spark.mesos.driverEnv.***: Environment variables for the driver
- **spark.mesos.dispatcher.driverDefault.***: Default configuration for submitted drivers

### Advanced Configuration

- **spark.mesos.coarse.shutdownTimeout** (default: `10s`): Graceful shutdown timeout
- **spark.mesos.task.labels**: Labels to apply to Mesos tasks
- **spark.mesos.cluster.retry.wait.max** (default: `60`): Maximum retry wait time in seconds
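
The retry-wait cap can be sketched as follows. The doubling policy is an assumption inferred from the option's name and defaults; the exact behavior lives in the cluster scheduler:

```scala
// Hedged sketch: supervised drivers are assumed to be retried with a doubling
// wait, capped by spark.mesos.cluster.retry.wait.max (in seconds).
def nextRetryWaitSeconds(currentWaitSeconds: Int, maxWaitSeconds: Int = 60): Int =
  math.min(currentWaitSeconds * 2, maxWaitSeconds)
```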

## Architecture

The Spark Mesos integration consists of several key components:

- **Cluster Manager**: `MesosClusterManager` handles integration with Spark's scheduler system
- **Scheduler Backends**: Two modes of operation, coarse-grained and fine-grained scheduling
- **Cluster Dispatcher**: For cluster deployment mode, manages driver submission and lifecycle
- **Executor Backend**: Runs on Mesos slaves to execute Spark tasks
- **Utilities**: Shared components for Mesos integration, resource management, and configuration

## Capabilities

**Note**: The following sections document the internal implementation classes that provide the Mesos integration. These classes are used automatically when Spark is configured with a master URL starting with `mesos://`; they are documented here for completeness and to explain the underlying functionality.

### Cluster Management

Core cluster management functionality that integrates Spark with Mesos resource negotiation and task scheduling. Automatically discovered via ServiceLoader.

```scala { .api }
private[spark] class MesosClusterManager extends ExternalClusterManager {
  def canCreate(masterURL: String): Boolean
  def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler
  def createSchedulerBackend(
    sc: SparkContext,
    masterURL: String,
    scheduler: TaskScheduler
  ): SchedulerBackend
  def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit
}
```

[Cluster Management](./cluster-management.md)

### Coarse-Grained Scheduling

Long-running executor mode that holds onto Mesos resources for the duration of the Spark job, providing lower latency and more predictable performance.

```scala { .api }
private[spark] class MesosCoarseGrainedSchedulerBackend(
  scheduler: TaskSchedulerImpl,
  sc: SparkContext,
  master: String,
  securityManager: SecurityManager
) extends CoarseGrainedSchedulerBackend with org.apache.mesos.Scheduler {
  def start(): Unit
  def stop(): Unit
  def applicationId(): String
  def sufficientResourcesRegistered(): Boolean
  def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean]
  def doKillExecutors(executorIds: Seq[String]): Future[Boolean]
}
```

[Coarse-Grained Scheduling](./coarse-grained-scheduling.md)

### Fine-Grained Scheduling

Dynamic resource allocation mode where each Spark task maps to a separate Mesos task, allowing for efficient resource sharing across multiple applications.

```scala { .api }
private[spark] class MesosFineGrainedSchedulerBackend(
  scheduler: TaskSchedulerImpl,
  sc: SparkContext,
  master: String
) extends SchedulerBackend with org.apache.mesos.Scheduler {
  def start(): Unit
  def stop(): Unit
  def reviveOffers(): Unit
  def killTask(taskId: Long, executorId: String, interruptThread: Boolean, reason: String): Unit
  def defaultParallelism(): Int
  def applicationId(): String
}
```

[Fine-Grained Scheduling](./fine-grained-scheduling.md)

### Cluster Deployment

Cluster mode deployment functionality for submitting Spark drivers as Mesos tasks, including driver lifecycle management and recovery.

```scala { .api }
private[spark] class MesosClusterScheduler(
  engineFactory: MesosClusterPersistenceEngineFactory,
  conf: SparkConf
) extends Scheduler with MesosSchedulerUtils {
  def submitDriver(desc: MesosDriverDescription): CreateSubmissionResponse
  def killDriver(submissionId: String): KillSubmissionResponse
  def getDriverStatus(submissionId: String): SubmissionStatusResponse
  def start(): Unit
  def stop(): Unit
}

private[mesos] class MesosClusterDispatcher(
  args: MesosClusterDispatcherArguments,
  conf: SparkConf
) {
  def start(): Unit
  def stop(): Unit
  def awaitShutdown(): Unit
}
```

[Cluster Deployment](./cluster-deployment.md)

### Resource and Configuration Management

Utilities for Mesos resource negotiation, constraint matching, and configuration management.

```scala { .api }
trait MesosSchedulerUtils {
  def createSchedulerDriver(
    masterUrl: String,
    scheduler: Scheduler,
    sparkUser: String,
    appName: String,
    conf: SparkConf,
    webuiUrl: Option[String] = None,
    checkpoint: Option[Boolean] = None,
    failoverTimeout: Option[Double] = None,
    frameworkId: Option[String] = None
  ): SchedulerDriver

  def getResource(res: JList[Resource], name: String): Double

  def partitionResources(
    resources: JList[Resource],
    resourceName: String,
    amountToUse: Double
  ): (List[Resource], List[Resource])

  def matchesAttributeRequirements(
    slaveOfferConstraints: Map[String, Set[String]],
    offerAttributes: Map[String, GeneratedMessage]
  ): Boolean
}
```

[Resource and Configuration Management](./resource-configuration.md)
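
The essence of attribute matching is that an offer satisfies the constraints when every constrained attribute is present and, where specific values are listed, the offer's value is among them. A simplified sketch over plain strings (the real method works on Mesos protobuf attributes):

```scala
// Sketch: does a Mesos offer's attribute map satisfy the parsed constraints?
// An empty allowed-value set means the attribute only needs to be present.
def matchesConstraints(
    constraints: Map[String, Set[String]],
    offerAttributes: Map[String, String]): Boolean =
  constraints.forall { case (attr, allowed) =>
    offerAttributes.get(attr).exists(v => allowed.isEmpty || allowed.contains(v))
  }
```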

### External Shuffle Service

Mesos-specific external shuffle service that provides shuffle data persistence and cleanup when drivers terminate.

```scala { .api }
private[mesos] class MesosExternalShuffleService extends ExternalShuffleService {
  def start(): Unit
  def stop(): Unit
}

private[mesos] class MesosExternalShuffleBlockHandler(
  transportConf: TransportConf,
  cleanerIntervalS: Long
) extends ExternalShuffleBlockHandler with Logging {
  def registerApplication(appShuffleInfo: ApplicationShuffleInfo): Unit
  def applicationRemoved(appId: String, cleanupLocalDirs: Boolean): Unit
}
```

## Types

```scala { .api }
// Required imports for these types
import java.util.Date
import org.apache.spark.deploy.Command
import org.apache.mesos.Protos.{TaskID, SlaveID, TaskStatus}
import org.apache.spark.scheduler.cluster.mesos.MesosClusterRetryState
import org.apache.spark.internal.config.ConfigEntry

// Driver submission and state (internal classes)
private[spark] class MesosDriverDescription(
  name: String,
  jarUrl: String,
  mem: Int,
  cores: Double,
  supervise: Boolean,
  command: Command,
  schedulerProperties: Map[String, String],
  submissionId: String,
  submissionDate: Date,
  retryState: Option[MesosClusterRetryState] = None
)

private[spark] class MesosClusterSubmissionState(
  val driverDescription: MesosDriverDescription,
  val taskId: TaskID,
  val slaveId: SlaveID,
  var mesosTaskStatus: Option[TaskStatus],
  var startDate: Date,
  var finishDate: Option[Date],
  val frameworkId: String
) extends Serializable

// Configuration objects
package object config {
  val RECOVERY_MODE: ConfigEntry[String]
  val DISPATCHER_WEBUI_URL: ConfigEntry[Option[String]]
  val ZOOKEEPER_URL: ConfigEntry[Option[String]]
  val HISTORY_SERVER_URL: ConfigEntry[Option[String]]
  val DRIVER_CONSTRAINTS: ConfigEntry[String]
}
```