Apache Spark Mesos resource manager that enables Spark applications to run on Apache Mesos clusters with both coarse-grained and fine-grained scheduling modes
npx @tessl/cli install tessl/maven-org-apache-spark--spark-mesos_2-11@2.2.00
# Apache Spark Mesos Resource Manager

Apache Spark Mesos resource manager provides integration between Apache Spark and Apache Mesos, enabling Spark applications to run on Mesos clusters with flexible resource allocation. It supports both coarse-grained mode (long-running executors with fixed resources) and fine-grained mode (dynamic resource allocation, with each Spark task launched as a separate Mesos task).

## Package Information

- **Package Name**: spark-mesos_2.11
- **Package Type**: Maven
- **Language**: Scala
- **Version**: 2.2.3
- **GroupId**: org.apache.spark
- **ArtifactId**: spark-mesos_2.11
- **Installation**: Maven or SBT dependency inclusion
- **License**: Apache License 2.0

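For an SBT build, the coordinates above translate to the following dependency; the `%%` operator appends the Scala binary version, resolving to `spark-mesos_2.11` under Scala 2.11:

```scala
// build.sbt: Mesos support for Spark 2.2.x (Scala 2.11)
libraryDependencies += "org.apache.spark" %% "spark-mesos" % "2.2.3"
```
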
## Core Imports

The Spark Mesos integration is automatically discovered through the `ServiceLoader` mechanism when the spark-mesos jar is on the classpath. No direct imports are needed for basic usage.

```scala
// Standard Spark imports for Mesos integration
import org.apache.spark.{SparkConf, SparkContext}

// Configuration constants (only needed for advanced configuration)
import org.apache.spark.deploy.mesos.config._
```

**Note**: The core Mesos integration classes (`MesosClusterManager`, the scheduler backends, etc.) are internal to Spark and marked `private[spark]`. They are used automatically when the master URL starts with `mesos://`.

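Cluster-manager selection is driven entirely by the master URL prefix. Conceptually, the `canCreate` check reduces to a prefix test; the sketch below only illustrates the convention and is not the internal implementation:

```scala
// Simplified sketch of how the Mesos cluster manager is selected from the
// master URL. The real MesosClusterManager is private[spark]; this helper
// name is illustrative.
def canUseMesos(masterURL: String): Boolean =
  masterURL.startsWith("mesos://")

// "mesos://host:5050" routes to the Mesos backend; "spark://..." and
// "local[*]" do not.
```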
## Basic Usage

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Configure Spark to use Mesos in coarse-grained mode
val conf = new SparkConf()
  .setAppName("MySparkApp")
  .setMaster("mesos://mesos-master:5050")            // Mesos master URL
  .set("spark.mesos.coarse", "true")                 // Coarse-grained mode (the default)
  .set("spark.mesos.executor.home", "/opt/spark")    // Required: Spark installation path
  .set("spark.executor.memory", "2g")
  .set("spark.executor.cores", "2")
  .set("spark.mesos.executor.memoryOverhead", "512") // Additional memory overhead, in MiB

// Optional: constraints and Docker
//   .set("spark.mesos.constraints", "os:linux")
//   .set("spark.mesos.executor.docker.image", "spark:latest")

// Create the SparkContext; it automatically uses MesosClusterManager
val sc = new SparkContext(conf)

// Your Spark application code
val data = sc.parallelize(1 to 1000)
val result = data.map(_ * 2).collect()

sc.stop()
```

**Fine-grained mode example:**

```scala
val conf = new SparkConf()
  .setAppName("MySparkApp")
  .setMaster("mesos://mesos-master:5050")
  .set("spark.mesos.coarse", "false")            // Fine-grained mode
  .set("spark.mesos.executor.home", "/opt/spark")
  .set("spark.mesos.mesosExecutor.cores", "1.0") // Cores reserved for the executor backend

val sc = new SparkContext(conf)
// Application code...
sc.stop()
```

## Configuration

Spark Mesos integration supports extensive configuration through `spark.mesos.*` properties:

### Core Configuration

- **spark.mesos.coarse** (default: `true`): Run in coarse-grained mode; set to `false` for fine-grained mode
- **spark.mesos.executor.home**: Spark installation directory on the Mesos slaves (required)
- **spark.mesos.executor.cores**: Number of cores per executor (coarse-grained mode)
- **spark.mesos.executor.memoryOverhead**: Additional memory overhead per executor, in MiB
- **spark.mesos.mesosExecutor.cores** (default: `1.0`): CPU cores reserved for the fine-grained executor backend

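When `spark.mesos.executor.memoryOverhead` is unset, the overhead defaults to 10% of `spark.executor.memory` with a 384 MiB floor. The sketch below illustrates that arithmetic; the constants mirror the documented defaults, but the helper name is ours, not Spark's:

```scala
// Illustrative sketch (assumed defaults): total memory requested from Mesos
// per executor is executor memory plus overhead, where the overhead defaults
// to max(384 MiB, 10% of executor memory).
val MemoryOverheadMinimumMib = 384
val MemoryOverheadFraction   = 0.10

def totalExecutorMemoryMib(executorMemoryMib: Int,
                           overheadMib: Option[Int] = None): Int =
  executorMemoryMib + overheadMib.getOrElse(
    math.max(MemoryOverheadMinimumMib,
             (MemoryOverheadFraction * executorMemoryMib).toInt))
```

For a 2 GiB executor the 384 MiB floor applies (10% would only be 204 MiB); for an 8 GiB executor the 10% fraction dominates.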
### Resource Limits

- **spark.mesos.gpus.max** (default: `0`): Maximum number of GPUs to acquire across all executors
- **spark.mesos.extra.cores** (default: `0`): Extra cores per executor to advertise beyond those actually used
- **spark.mesos.maxDrivers** (default: `200`): Maximum number of queued drivers in cluster mode
- **spark.mesos.retainedDrivers** (default: `200`): Number of completed drivers to retain

### Security and Authentication

- **spark.mesos.principal**: Mesos principal used to authenticate the framework
- **spark.mesos.secret**: Secret used for framework authentication (requires a principal)
- **spark.mesos.role**: Mesos role used for resource allocation

### Docker Support

- **spark.mesos.executor.docker.image**: Docker image to run executors in
- **spark.mesos.executor.docker.forcePullImage**: Always pull the Docker image before launching
- **spark.mesos.executor.docker.volumes**: Volume mappings (format: `/host:/container:mode`)
- **spark.mesos.executor.docker.portmaps**: Port mappings (format: `host:container:protocol`)
- **spark.mesos.executor.docker.parameters**: Custom parameters passed to the Docker daemon
- **spark.mesos.containerizer** (default: `docker`): Containerizer to use (`docker` or `mesos`)

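The volume mapping format above can be sketched as a small parser; the `Volume` type and helper name here are illustrative, not Spark's internals. The mode suffix is optional and defaults to read-write:

```scala
// Hypothetical helper that parses one entry of
// spark.mesos.executor.docker.volumes ("/host:/container:mode").
case class Volume(hostPath: String, containerPath: String, readOnly: Boolean)

def parseVolume(spec: String): Volume = spec.split(":") match {
  case Array(host, container, mode) => Volume(host, container, mode == "ro")
  case Array(host, container)       => Volume(host, container, readOnly = false)
  case _ => throw new IllegalArgumentException(s"Bad volume spec: $spec")
}
```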

### Networking and URIs

- **spark.mesos.network.name**: Named network to attach executor containers to
- **spark.mesos.uris**: Comma-separated list of URIs to download into the sandbox
- **spark.mesos.fetcherCache.enable** (default: `false`): Enable the Mesos fetcher cache

### Constraints and Filtering

- **spark.mesos.constraints**: Attribute constraints applied to resource offers
- **spark.mesos.rejectOfferDuration** (default: `120s`): How long to reject unsuitable offers
- **spark.mesos.rejectOfferDurationForUnmetConstraints**: Rejection duration for offers that fail the constraints
- **spark.mesos.rejectOfferDurationForReachedMaxCores**: Rejection duration once the core limit is reached

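The `spark.mesos.constraints` value is a semicolon-separated list of `attribute:value` pairs, with comma-separated alternative values. A minimal sketch of that format (the helper name is ours; an attribute with no values just requires the attribute to be present):

```scala
// Illustrative parser for the constraint expression format, e.g.
// "os:linux;zone:east,west" => os must be linux, zone must be east or west.
def parseConstraints(expr: String): Map[String, Set[String]] =
  expr.split(";").filter(_.nonEmpty).map { pair =>
    pair.split(":", 2) match {
      case Array(attr, values) => attr -> values.split(",").toSet
      case Array(attr)         => attr -> Set.empty[String]
    }
  }.toMap
```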

### Cluster Mode Configuration

- **spark.mesos.dispatcher.webui.url**: Dispatcher web UI URL
- **spark.mesos.dispatcher.historyServer.url**: History server URL used for driver links
- **spark.mesos.driver.constraints**: Attribute constraints for driver placement
- **spark.mesos.driver.webui.url**: Driver web UI URL
- **spark.mesos.driver.frameworkId**: Framework ID used to correlate the driver
- **spark.mesos.driverEnv.\***: Environment variables set on the driver
- **spark.mesos.dispatcher.driverDefault.\***: Default configuration applied to submitted drivers

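These properties take effect in cluster deploy mode, where drivers are submitted through the `MesosClusterDispatcher` rather than run locally. A typical flow looks like the following; host names, the application class, and the jar URL are placeholders, and the jar must be reachable from the Mesos slaves:

```shell
# Start the dispatcher (default port 7077) on a node that can reach the Mesos master
./sbin/start-mesos-dispatcher.sh --master mesos://mesos-master:5050

# Submit a driver to the dispatcher rather than to the Mesos master directly
./bin/spark-submit \
  --class org.example.MyApp \
  --master mesos://dispatcher-host:7077 \
  --deploy-mode cluster \
  --conf spark.mesos.executor.home=/opt/spark \
  http://repo.example.com/myapp.jar
```
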

### Advanced Configuration

- **spark.mesos.coarse.shutdownTimeout** (default: `10s`): Timeout for graceful shutdown of coarse-grained executors
- **spark.mesos.task.labels**: Labels applied to Mesos tasks
- **spark.mesos.cluster.retry.wait.max** (default: `60`): Maximum retry wait time, in seconds

## Architecture

The Spark Mesos integration consists of several key components:

- **Cluster Manager**: `MesosClusterManager` plugs into Spark's pluggable scheduler system
- **Scheduler Backends**: Two modes of operation, coarse-grained and fine-grained scheduling
- **Cluster Dispatcher**: Manages driver submission and lifecycle in cluster deploy mode
- **Executor Backend**: Runs on the Mesos slaves to execute Spark tasks
- **Utilities**: Shared components for Mesos integration, resource management, and configuration

## Capabilities

**Note**: The following API descriptions document the internal implementation classes that provide the Mesos integration. These classes are used automatically when Spark is configured with a master URL starting with `mesos://`; they are documented here for completeness and to explain the underlying functionality.

### Cluster Management

Core cluster management functionality that integrates Spark with Mesos resource negotiation and task scheduling. Discovered automatically via `ServiceLoader`.

```scala { .api }
private[spark] class MesosClusterManager extends ExternalClusterManager {
  def canCreate(masterURL: String): Boolean
  def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler
  def createSchedulerBackend(
      sc: SparkContext,
      masterURL: String,
      scheduler: TaskScheduler): SchedulerBackend
  def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit
}
```

[Cluster Management](./cluster-management.md)

### Coarse-Grained Scheduling

Long-running executor mode that holds Mesos resources for the duration of the Spark job, providing lower latency and more predictable performance.

```scala { .api }
private[spark] class MesosCoarseGrainedSchedulerBackend(
    scheduler: TaskSchedulerImpl,
    sc: SparkContext,
    master: String,
    securityManager: SecurityManager)
  extends CoarseGrainedSchedulerBackend with org.apache.mesos.Scheduler {
  def start(): Unit
  def stop(): Unit
  def applicationId(): String
  def sufficientResourcesRegistered(): Boolean
  def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean]
  def doKillExecutors(executorIds: Seq[String]): Future[Boolean]
}
```

[Coarse-Grained Scheduling](./coarse-grained-scheduling.md)

### Fine-Grained Scheduling

Dynamic resource allocation mode in which each Spark task maps to a separate Mesos task, allowing efficient resource sharing across multiple applications.

```scala { .api }
private[spark] class MesosFineGrainedSchedulerBackend(
    scheduler: TaskSchedulerImpl,
    sc: SparkContext,
    master: String)
  extends SchedulerBackend with org.apache.mesos.Scheduler {
  def start(): Unit
  def stop(): Unit
  def reviveOffers(): Unit
  def killTask(taskId: Long, executorId: String, interruptThread: Boolean, reason: String): Unit
  def defaultParallelism(): Int
  def applicationId(): String
}
```

[Fine-Grained Scheduling](./fine-grained-scheduling.md)

### Cluster Deployment

Cluster-mode deployment functionality for submitting Spark drivers as Mesos tasks, including driver lifecycle management and recovery.

```scala { .api }
private[spark] class MesosClusterScheduler(
    engineFactory: MesosClusterPersistenceEngineFactory,
    conf: SparkConf)
  extends Scheduler with MesosSchedulerUtils {
  def submitDriver(desc: MesosDriverDescription): CreateSubmissionResponse
  def killDriver(submissionId: String): KillSubmissionResponse
  def getDriverStatus(submissionId: String): SubmissionStatusResponse
  def start(): Unit
  def stop(): Unit
}

private[mesos] class MesosClusterDispatcher(
    args: MesosClusterDispatcherArguments,
    conf: SparkConf) {
  def start(): Unit
  def stop(): Unit
  def awaitShutdown(): Unit
}
```

[Cluster Deployment](./cluster-deployment.md)

### Resource and Configuration Management

Utilities for Mesos resource negotiation, constraint matching, and configuration management.

```scala { .api }
trait MesosSchedulerUtils {
  def createSchedulerDriver(
      masterUrl: String,
      scheduler: Scheduler,
      sparkUser: String,
      appName: String,
      conf: SparkConf,
      webuiUrl: Option[String] = None,
      checkpoint: Option[Boolean] = None,
      failoverTimeout: Option[Double] = None,
      frameworkId: Option[String] = None): SchedulerDriver

  def getResource(res: JList[Resource], name: String): Double

  def partitionResources(
      resources: JList[Resource],
      resourceName: String,
      amountToUse: Double): (List[Resource], List[Resource])

  def matchesAttributeRequirements(
      slaveOfferConstraints: Map[String, Set[String]],
      offerAttributes: Map[String, GeneratedMessage]): Boolean
}
```

[Resource and Configuration Management](./resource-configuration.md)
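
The idea behind `partitionResources` can be sketched with plain scalars: take up to the requested amount from a list of offered resources, returning what was consumed and what remains. The types and helper name below are illustrative stand-ins for the Mesos `Resource` protobufs:

```scala
// Simplified scalar sketch of resource partitioning: consume up to
// `amountToUse` of the named resource from the offers, in order.
def partitionScalar(offers: List[(String, Double)],
                    name: String,
                    amountToUse: Double): (List[(String, Double)], List[(String, Double)]) = {
  var remainingToUse = amountToUse
  val used = scala.collection.mutable.ListBuffer[(String, Double)]()
  val left = scala.collection.mutable.ListBuffer[(String, Double)]()
  for ((n, amount) <- offers) {
    if (n == name && remainingToUse > 0) {
      val take = math.min(amount, remainingToUse)
      remainingToUse -= take
      used += ((n, take))
      if (amount - take > 0) left += ((n, amount - take)) // partially consumed offer
    } else {
      left += ((n, amount)) // untouched offer
    }
  }
  (used.toList, left.toList)
}
```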
### External Shuffle Service

Mesos-specific external shuffle service that keeps shuffle data available after executors exit and cleans it up when the driver terminates.

```scala { .api }
private[mesos] class MesosExternalShuffleService extends ExternalShuffleService {
  def start(): Unit
  def stop(): Unit
}

private[mesos] class MesosExternalShuffleBlockHandler(
    transportConf: TransportConf,
    cleanerIntervalS: Long)
  extends ExternalShuffleBlockHandler with Logging {
  def registerApplication(appShuffleInfo: ApplicationShuffleInfo): Unit
  def applicationRemoved(appId: String, cleanupLocalDirs: Boolean): Unit
}
```

## Types

```scala { .api }
// Required imports for types
import java.util.Date
import org.apache.spark.deploy.Command
import org.apache.mesos.Protos.{TaskID, SlaveID, TaskStatus}
import org.apache.spark.scheduler.cluster.mesos.MesosClusterRetryState
import org.apache.spark.internal.config.ConfigEntry

// Driver submission and state (internal classes)
private[spark] class MesosDriverDescription(
    name: String,
    jarUrl: String,
    mem: Int,
    cores: Double,
    supervise: Boolean,
    command: Command,
    schedulerProperties: Map[String, String],
    submissionId: String,
    submissionDate: Date,
    retryState: Option[MesosClusterRetryState] = None)

private[spark] class MesosClusterSubmissionState(
    val driverDescription: MesosDriverDescription,
    val taskId: TaskID,
    val slaveId: SlaveID,
    var mesosTaskStatus: Option[TaskStatus],
    var startDate: Date,
    var finishDate: Option[Date],
    val frameworkId: String)
  extends Serializable

// Configuration objects
package object config {
  val RECOVERY_MODE: ConfigEntry[String]
  val DISPATCHER_WEBUI_URL: ConfigEntry[Option[String]]
  val ZOOKEEPER_URL: ConfigEntry[Option[String]]
  val HISTORY_SERVER_URL: ConfigEntry[Option[String]]
  val DRIVER_CONSTRAINTS: ConfigEntry[String]
}
```