# Spark Mesos

Spark Mesos provides Apache Mesos cluster manager integration for Apache Spark, enabling Spark applications to run on Mesos clusters in either coarse-grained or fine-grained scheduling mode. It supports dynamic resource allocation and fault tolerance, and includes utilities for Mesos-specific configuration and monitoring.

## Package Information

- **Package Name**: spark-mesos_2.13
- **Package Type**: maven
- **Language**: Scala
- **Group ID**: org.apache.spark
- **Installation**: Add to your Maven dependencies or include in Spark's classpath
## Core Imports

```scala
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.scheduler.cluster.mesos.MesosProtoUtils
import org.apache.spark.scheduler.cluster.mesos.MesosSchedulerBackendUtil
import org.apache.spark.scheduler.cluster.mesos.MesosClusterScheduler
import org.apache.spark.network.shuffle.mesos.MesosExternalBlockStoreClient
import org.apache.spark.deploy.mesos.config._
```

For Java applications:

```java
import org.apache.spark.network.shuffle.mesos.MesosExternalBlockStoreClient;
import org.apache.spark.network.util.TransportConf;
import org.apache.spark.network.sasl.SecretKeyHolder;
import org.apache.spark.executor.MesosExecutorBackend;
```
## Basic Usage

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Configure Spark to use Mesos
val conf = new SparkConf()
  .setAppName("MySparkApp")
  .setMaster("mesos://mesos-master:5050") // Connect to Mesos master
  .set("spark.mesos.coarse", "true") // Use coarse-grained mode
  .set("spark.cores.max", "4") // Limit maximum cores
  .set("spark.mesos.executor.docker.image", "spark-executor:latest")

val sc = new SparkContext(conf)

// Your Spark application logic here
val data = sc.parallelize(1 to 100)
val result = data.map(_ * 2).collect()

sc.stop()
```
## Architecture

Spark Mesos integration consists of several key components:

- **Cluster Manager**: `MesosClusterManager`, selected automatically when using `mesos://` URLs
- **Scheduler Backends**: Coarse-grained and fine-grained backends for different resource sharing modes
- **Cluster Mode Components**: `MesosClusterScheduler` and dispatcher for cluster mode deployments
- **Scheduler Utilities**: `MesosSchedulerBackendUtil` for container and volume management
- **Configuration System**: Extensive configuration options for Mesos-specific settings
- **Security Management**: `MesosSecretConfig` for handling secrets and authentication
- **External Shuffle Service**: Optional shuffle service integration for coarse-grained mode
- **Protocol Utilities**: Helper functions for working with Mesos protocol buffers
## Capabilities

### Cluster Configuration

Configure Spark to connect to and run on Mesos clusters using the master URL and configuration options.

```scala { .api }
// Master URL format
val master: String = "mesos://host:port"

// Key configuration options
val conf = new SparkConf()
  .setMaster(master)
  .set("spark.mesos.coarse", "true") // Enable coarse-grained mode
```
### Execution Modes

Choose between coarse-grained and fine-grained execution modes depending on your resource sharing requirements.

**Coarse-grained Mode** (default):
- Spark acquires long-lived Mesos tasks on each machine
- Lower latency, predictable resource allocation
- Better for batch processing and long-running applications

**Fine-grained Mode**:
- Each Spark task maps to a separate Mesos task
- Dynamic resource sharing between applications
- Better for resource utilization in multi-tenant environments

```scala { .api }
// Coarse-grained mode (default)
conf.set("spark.mesos.coarse", "true")

// Fine-grained mode
conf.set("spark.mesos.coarse", "false")
```
### Resource Management

Control resource allocation and constraints for Spark executors running on Mesos.

```scala { .api }
// Core and memory configuration
conf.set("spark.cores.max", "8") // Maximum cores across all executors
conf.set("spark.executor.cores", "2") // Cores per executor
conf.set("spark.executor.memory", "2g") // Memory per executor
conf.set("spark.mesos.executor.memoryOverhead", "512") // Additional memory overhead

// GPU support
conf.set("spark.mesos.gpus.max", "2") // Maximum GPU resources

// Resource constraints
conf.set("spark.mesos.constraints", "os:centos") // Attribute-based constraints
conf.set("spark.mesos.role", "spark-framework") // Mesos framework role
```
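As a format illustration only (the `ConstraintSpec` class below is hypothetical, not part of Spark, and the exact constraint grammar should be checked against the Spark on Mesos documentation), a constraint string of "attribute:value" pairs can be split like this:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ConstraintSpec {
    /**
     * Parses a constraint string such as "os:centos" into attribute/value
     * pairs. Multiple constraints are assumed here to be separated by ';'
     * (e.g. "os:centos;zone:us-west-1"); an attribute with no ':' is stored
     * with an empty value, meaning "attribute must be present".
     */
    public static Map<String, String> parse(String constraints) {
        Map<String, String> result = new LinkedHashMap<>();
        if (constraints == null || constraints.isEmpty()) {
            return result;
        }
        for (String part : constraints.split(";")) {
            String[] kv = part.split(":", 2);
            result.put(kv[0], kv.length == 2 ? kv[1] : "");
        }
        return result;
    }
}
```

Offers whose agent attributes do not satisfy every parsed pair would then be declined.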
### Docker Integration

Run Spark executors in Docker containers with full configuration support.

```scala { .api }
// Docker executor configuration
conf.set("spark.mesos.executor.docker.image", "spark:3.5.6")
conf.set("spark.mesos.executor.docker.forcePullImage", "true")
conf.set("spark.mesos.containerizer", "mesos") // Use Mesos containerizer

// Volume mounts
conf.set("spark.mesos.executor.docker.volumes",
  "/host/data:/container/data:ro,/host/logs:/container/logs:rw")

// Docker parameters
conf.set("spark.mesos.executor.docker.parameters",
  "memory-swap=-1,ulimit=nofile=65536:65536")
```
### Authentication and Security

Configure authentication credentials for secure Mesos clusters.

```scala { .api }
// Principal and secret authentication
conf.set("spark.mesos.principal", "spark-framework")
conf.set("spark.mesos.secret", "secret-value")

// File-based authentication
conf.set("spark.mesos.principal.file", "/path/to/principal.txt")
conf.set("spark.mesos.secret.file", "/path/to/secret.txt")

// Driver secrets
conf.set("spark.mesos.driver.secret.names", "secret1,secret2")
conf.set("spark.mesos.driver.secret.values", "value1,value2")
conf.set("spark.mesos.driver.secret.envkeys", "SECRET1_ENV,SECRET2_ENV")
```
### Network Configuration

Configure networking options for containerized and multi-tenant environments.

```scala { .api }
// Named network attachment
conf.set("spark.mesos.network.name", "spark-network")
conf.set("spark.mesos.network.labels", "env:production,team:data")

// Service URLs
conf.set("spark.mesos.driver.webui.url", "http://driver-host:4040")
conf.set("spark.mesos.dispatcher.webui.url", "http://dispatcher:8080")
```
### Task Management and Labeling

Add metadata and labels to Mesos tasks for monitoring and organization.

```scala { .api }
// Task labels for monitoring
conf.set("spark.mesos.task.labels", "app:spark,env:prod,team:analytics")
conf.set("spark.mesos.driver.labels", "type:driver,priority:high")

// Task constraints and placement
conf.set("spark.mesos.driver.constraints", "zone:us-west-1")
conf.set("spark.mesos.rejectOfferDuration", "120s")
```
### External Shuffle Service

Configure the external shuffle service for improved performance in coarse-grained mode.

```java { .api }
/**
 * Client for communicating with the external shuffle service in Mesos coarse-grained mode
 */
public class MesosExternalBlockStoreClient extends ExternalBlockStoreClient {

  /**
   * Creates a Mesos external shuffle client
   * @param conf Transport configuration
   * @param secretKeyHolder Secret key holder for authentication
   * @param authEnabled Whether authentication is enabled
   * @param registrationTimeoutMs Timeout for registration in milliseconds
   */
  public MesosExternalBlockStoreClient(
      TransportConf conf,
      SecretKeyHolder secretKeyHolder,
      boolean authEnabled,
      long registrationTimeoutMs);

  /**
   * Registers the driver with the shuffle service
   * @param host Shuffle service host
   * @param port Shuffle service port
   * @param heartbeatTimeoutMs Heartbeat timeout in milliseconds
   * @param heartbeatIntervalMs Heartbeat interval in milliseconds
   */
  public void registerDriverWithShuffleService(
      String host,
      int port,
      long heartbeatTimeoutMs,
      long heartbeatIntervalMs) throws IOException, InterruptedException;
}
```
### Protocol Utilities

Utility functions for working with Mesos protocol buffers and labels.

```scala { .api }
/**
 * Utilities for working with Mesos protocol buffers
 */
object MesosProtoUtils {
  /**
   * Parses a label string into Mesos Labels protobuf
   * @param labelsStr Label string in format "key1:value1,key2:value2"
   * @return Mesos Labels.Builder for constructing protobuf messages
   */
  def mesosLabels(labelsStr: String): org.apache.mesos.Protos.Labels.Builder
}
```
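The "key1:value1,key2:value2" format can be sketched in plain Java. Note that `LabelParser` is purely illustrative: it returns an ordinary `Map` rather than the `Labels.Builder` protobuf the real utility produces, and it ignores any escaping rules the real parser may support.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class LabelParser {
    /**
     * Splits a label string of the form "key1:value1,key2:value2" into
     * key/value pairs, preserving insertion order. Entries that are not
     * exactly "key:value" are rejected with IllegalArgumentException.
     */
    public static Map<String, String> parseLabels(String labelsStr) {
        Map<String, String> labels = new LinkedHashMap<>();
        for (String entry : labelsStr.split(",")) {
            String[] kv = entry.split(":");
            if (kv.length != 2) {
                throw new IllegalArgumentException("Malformed label: " + entry);
            }
            labels.put(kv[0], kv[1]);
        }
        return labels;
    }
}
```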
### Scheduler Backend Utilities

Utilities for container and volume management in Mesos environments.

```scala { .api }
/**
 * Utility object providing helper methods for Mesos scheduler backends
 */
object MesosSchedulerBackendUtil {
  /**
   * Parse volume specifications for container mounts
   * @param volumes Sequence of volume specifications in format "hostPath:containerPath:mode"
   * @return List of Mesos Volume objects
   */
  def parseVolumesSpec(volumes: Seq[String]): List[Volume]

  /**
   * Parse port mapping specifications for Docker containers
   * @param portmaps Sequence of port mapping specifications in format "hostPort:containerPort:protocol"
   * @return List of DockerInfo.PortMapping objects
   */
  def parsePortMappingsSpec(portmaps: Seq[String]): List[DockerInfo.PortMapping]

  /**
   * Build container information from Spark configuration
   * @param conf Spark configuration containing container settings
   * @return ContainerInfo.Builder for Mesos container setup
   */
  def buildContainerInfo(conf: SparkConf): ContainerInfo.Builder

  /**
   * Convert Spark task state to Mesos task state
   * @param state Spark TaskState
   * @return Corresponding MesosTaskState
   */
  def taskStateToMesos(state: TaskState): MesosTaskState

  /**
   * Extract secret environment variables from configuration
   * @param conf Spark configuration
   * @param secretConfig Secret configuration for driver or executor
   * @return Sequence of environment variables for secrets
   */
  def getSecretEnvVar(conf: SparkConf, secretConfig: MesosSecretConfig): Seq[Variable]

  /**
   * Extract secret volume from configuration
   * @param conf Spark configuration
   * @param secretConfig Secret configuration for driver or executor
   * @return Optional volume for secret mounting
   */
  def getSecretVolume(conf: SparkConf, secretConfig: MesosSecretConfig): Option[Volume]
}
```
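The "hostPath:containerPath:mode" grammar accepted by `parseVolumesSpec` can be illustrated with a self-contained sketch. The `VolumeSpecParser` and `VolumeMount` classes here are illustrative stand-ins, not the Mesos `Volume` protobuf; the two-part fallback (no mode, defaulting to read-write) is an assumption for the sketch.

```java
public class VolumeSpecParser {
    /** Minimal illustration of a parsed volume mount. */
    public static final class VolumeMount {
        public final String hostPath;
        public final String containerPath;
        public final boolean readOnly;

        VolumeMount(String hostPath, String containerPath, boolean readOnly) {
            this.hostPath = hostPath;
            this.containerPath = containerPath;
            this.readOnly = readOnly;
        }
    }

    /**
     * Parses "hostPath:containerPath:mode" where mode is "ro" or "rw".
     * A two-part spec "hostPath:containerPath" defaults to read-write.
     */
    public static VolumeMount parse(String spec) {
        String[] parts = spec.split(":");
        if (parts.length == 2) {
            return new VolumeMount(parts[0], parts[1], false);
        }
        if (parts.length == 3) {
            return new VolumeMount(parts[0], parts[1], "ro".equals(parts[2]));
        }
        throw new IllegalArgumentException("Unsupported volume spec: " + spec);
    }
}
```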
### Cluster Mode Management

Components for managing Spark applications in Mesos cluster mode with the dispatcher.

```scala { .api }
/**
 * Scheduler for managing driver lifecycles in Mesos cluster mode
 */
class MesosClusterScheduler extends MesosScheduler {
  /**
   * Submit a new driver to the Mesos cluster
   * @param description Driver description containing application details
   * @return Submission response with driver ID and status
   */
  def submitDriver(description: MesosDriverDescription): CreateSubmissionResponse

  /**
   * Kill a running driver
   * @param submissionId Driver submission ID
   * @return Kill response with success status
   */
  def killDriver(submissionId: String): KillSubmissionResponse

  /**
   * Get the status of a driver
   * @param submissionId Driver submission ID
   * @return Driver status response with current state
   */
  def getDriverStatus(submissionId: String): SubmissionStatusResponse
}

/**
 * Description of a driver to be submitted to the Mesos cluster
 */
class MesosDriverDescription(
    jarUrl: String,
    mainClass: String,
    args: Array[String],
    conf: SparkConf,
    supervise: Boolean = false) {

  def appName: String
  def sparkProperties: Map[String, String]
  def environmentVariables: Map[String, String]
}

/**
 * Configuration helper for managing secrets in Mesos environments
 */
class MesosSecretConfig(taskType: String) {
  /**
   * Get comma-separated secret names for the specified task type
   * @return Secret names configuration value
   */
  def secretNames: String

  /**
   * Get comma-separated secret values for the specified task type
   * @return Secret values configuration value
   */
  def secretValues: String

  /**
   * Get comma-separated environment variable keys for secrets
   * @return Environment variable keys configuration value
   */
  def secretEnvKeys: String

  /**
   * Get comma-separated secret filenames for file-based secrets
   * @return Secret filenames configuration value
   */
  def secretFilenames: String
}
```
### Cluster Mode and Dispatcher

Configuration for running Spark applications in Mesos cluster mode with the dispatcher.

```scala { .api }
// Cluster mode configuration
conf.set("spark.mesos.maxDrivers", "100") // Maximum concurrent drivers
conf.set("spark.mesos.retainedDrivers", "50") // Number of retained drivers
conf.set("spark.mesos.dispatcher.queue", "default") // Dispatcher queue name

// Failover configuration
conf.set("spark.mesos.driver.failoverTimeout", "600.0") // Driver failover timeout in seconds
conf.set("spark.mesos.cluster.retry.wait.max", "60") // Maximum retry wait time
```
## Configuration Reference

### Core Configuration Options

```scala { .api }
// Execution mode
"spark.mesos.coarse" -> "true" // Boolean: Use coarse-grained mode (default: true)

// Resource allocation
"spark.cores.max" -> "8" // String: Maximum cores across all executors
"spark.mesos.mesosExecutor.cores" -> "1.0" // String: Cores per Mesos executor (fine-grained)
"spark.mesos.extra.cores" -> "0" // String: Extra cores to advertise per executor
"spark.mesos.executor.memoryOverhead" -> "384" // String: Additional memory per executor (MiB)
"spark.mesos.gpus.max" -> "0" // String: Maximum GPU resources

// Constraints and placement
"spark.mesos.constraints" -> "" // String: Attribute-based constraints
"spark.mesos.driver.constraints" -> "" // String: Driver placement constraints
"spark.mesos.role" -> "" // String: Mesos framework role

// Docker configuration
"spark.mesos.executor.docker.image" -> "" // String: Docker image for executors
"spark.mesos.executor.docker.forcePullImage" -> "" // String: Force pull Docker image
"spark.mesos.executor.docker.volumes" -> "" // String: Volume mounts (comma-separated)
"spark.mesos.executor.docker.portmaps" -> "" // String: Port mappings (comma-separated)
"spark.mesos.executor.docker.parameters" -> "" // String: Docker run parameters
"spark.mesos.containerizer" -> "docker" // String: Containerizer type ("docker" or "mesos")
```
### Authentication Configuration

```scala { .api }
// Principal and secret
"spark.mesos.principal" -> "" // String: Kerberos principal name
"spark.mesos.principal.file" -> "" // String: Path to principal file
"spark.mesos.secret" -> "" // String: Authentication secret
"spark.mesos.secret.file" -> "" // String: Path to secret file

// Driver secrets
"spark.mesos.driver.secret.names" -> "" // String: Comma-separated secret names
"spark.mesos.driver.secret.values" -> "" // String: Comma-separated secret values
"spark.mesos.driver.secret.envkeys" -> "" // String: Environment variable names
"spark.mesos.driver.secret.filenames" -> "" // String: Secret file paths

// Executor secrets (same pattern as driver)
"spark.mesos.executor.secret.names" -> ""
"spark.mesos.executor.secret.values" -> ""
"spark.mesos.executor.secret.envkeys" -> ""
"spark.mesos.executor.secret.filenames" -> ""
```
### Network and Service Configuration

```scala { .api }
// Networking
"spark.mesos.network.name" -> "" // String: Named network for containers
"spark.mesos.network.labels" -> "" // String: Network labels for CNI

// Web UI URLs
"spark.mesos.driver.webui.url" -> "" // String: Driver web UI URL
"spark.mesos.dispatcher.webui.url" -> "" // String: Dispatcher web UI URL
"spark.mesos.proxy.baseURL" -> "" // String: Proxy base URL

// History server
"spark.mesos.dispatcher.historyServer.url" -> "" // String: History server URL
```
### Advanced Configuration

```scala { .api }
// Offer management
"spark.mesos.rejectOfferDuration" -> "120s" // String: Default reject duration
"spark.mesos.rejectOfferDurationForUnmetConstraints" -> "" // String: Reject duration for unmet constraints
"spark.mesos.rejectOfferDurationForReachedMaxCores" -> "" // String: Reject duration when max cores reached

// Task and executor configuration
"spark.mesos.task.labels" -> "" // String: Labels for tasks
"spark.mesos.driver.labels" -> "" // String: Labels for driver
"spark.mesos.uris" -> "" // String: Comma-separated URIs to download
"spark.executor.uri" -> "" // String: Executor URI
"spark.mesos.executor.home" -> "" // String: Spark home directory on executors

// Fetcher and caching
"spark.mesos.fetcherCache.enable" -> "false" // String: Enable Mesos fetcher cache
"spark.mesos.appJar.local.resolution.mode" -> "host" // String: Local JAR resolution mode

// Cluster mode
"spark.mesos.maxDrivers" -> "200" // String: Maximum concurrent drivers
"spark.mesos.retainedDrivers" -> "200" // String: Number of retained completed drivers
"spark.mesos.driver.failoverTimeout" -> "0.0" // String: Driver failover timeout (seconds)
```
## Error Handling

Common exceptions and error scenarios:

- **SparkException**: Thrown for malformed configuration or connection issues
- **IOException**: Network-related errors when connecting to the Mesos master or shuffle service
- **IllegalArgumentException**: Invalid configuration values or parameters
## Usage Examples

### Basic Mesos Application

```scala
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("Mesos Example")
  .setMaster("mesos://mesos-master:5050")
  .set("spark.mesos.coarse", "true")
  .set("spark.cores.max", "4")

val sc = new SparkContext(conf)
val rdd = sc.parallelize(1 to 1000)
val sum = rdd.reduce(_ + _)
println(s"Sum: $sum")
sc.stop()
```
### Docker-based Execution

```scala
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("Dockerized Spark")
  .setMaster("mesos://mesos-master:5050")
  .set("spark.mesos.executor.docker.image", "apache/spark:3.5.6")
  .set("spark.mesos.executor.docker.volumes", "/data:/spark-data:ro")
  .set("spark.mesos.containerizer", "mesos")
  .set("spark.executor.memory", "2g")
  .set("spark.executor.cores", "2")

val sc = new SparkContext(conf)
// Application logic here
sc.stop()
```
### External Shuffle Service Setup

```java
import org.apache.spark.network.netty.SparkTransportConf;
import org.apache.spark.network.shuffle.mesos.MesosExternalBlockStoreClient;
import org.apache.spark.network.util.TransportConf;
import org.apache.spark.network.sasl.SecretKeyHolder;

// Create and configure the client (conf is an existing SparkConf)
TransportConf transportConf = SparkTransportConf.fromSparkConf(conf, "shuffle");
SecretKeyHolder secretKeyHolder = new SecretKeyHolder() {
  @Override
  public String getSaslUser(String appId) { return "spark"; }
  @Override
  public String getSecretKey(String appId) { return "secret"; }
};

MesosExternalBlockStoreClient client = new MesosExternalBlockStoreClient(
    transportConf, secretKeyHolder, true, 5000);

// Register with the shuffle service
client.registerDriverWithShuffleService("shuffle-host", 7337, 120000, 30000);
```
### Cluster Mode Application Submission

```scala
import org.apache.spark.SparkConf
import org.apache.spark.scheduler.cluster.mesos.MesosClusterScheduler
import org.apache.spark.deploy.mesos.MesosDriverDescription

// Create the cluster scheduler
val clusterScheduler = new MesosClusterScheduler()

// Configure the driver description
val driverConf = new SparkConf()
  .set("spark.executor.memory", "2g")
  .set("spark.executor.cores", "2")
  .set("spark.mesos.coarse", "true")

val driverDescription = new MesosDriverDescription(
  jarUrl = "hdfs://namenode:9000/spark-apps/my-app.jar",
  mainClass = "com.example.MySparkApp",
  args = Array("--input", "/data/input", "--output", "/data/output"),
  conf = driverConf,
  supervise = true
)

// Submit the driver to the cluster
val submissionResponse = clusterScheduler.submitDriver(driverDescription)
println(s"Driver submitted with ID: ${submissionResponse.submissionId}")

// Monitor driver status
val statusResponse = clusterScheduler.getDriverStatus(submissionResponse.submissionId)
println(s"Driver status: ${statusResponse.driverState}")
```
### Advanced Container Configuration

```scala
import org.apache.spark.SparkConf
import org.apache.spark.scheduler.cluster.mesos.MesosSchedulerBackendUtil

val conf = new SparkConf()
  .set("spark.mesos.executor.docker.image", "apache/spark:3.5.6-scala2.13")
  .set("spark.mesos.executor.docker.volumes",
    "/data:/spark-data:ro,/logs:/spark-logs:rw")
  .set("spark.mesos.executor.docker.portmaps",
    "8080:8080:tcp,8081:8081:tcp")
  .set("spark.mesos.containerizer", "mesos")

// Parse volume specifications
val volumes = MesosSchedulerBackendUtil.parseVolumesSpec(
  Seq("/data:/spark-data:ro", "/logs:/spark-logs:rw"))

// Parse port mappings
val portMappings = MesosSchedulerBackendUtil.parsePortMappingsSpec(
  Seq("8080:8080:tcp", "8081:8081:tcp"))

// Build container info
val containerInfo = MesosSchedulerBackendUtil.buildContainerInfo(conf)
```
## Types

```scala { .api }
// Configuration type aliases
type ConfigKey = String
type ConfigValue = String
type MasterURL = String // Format: "mesos://host:port"

// Label parsing
type LabelString = String // Format: "key1:value1,key2:value2"

// Cluster mode types
type SubmissionId = String
type DriverState = String // SUBMITTED, RUNNING, FINISHED, FAILED, KILLED
type FrameworkId = String

// Container and volume types
type VolumeSpec = String // Format: "hostPath:containerPath:mode"
type PortMapSpec = String // Format: "hostPort:containerPort:protocol"

// Response types for cluster operations
trait CreateSubmissionResponse {
  def submissionId: String
  def success: Boolean
}

trait KillSubmissionResponse {
  def submissionId: String
  def success: Boolean
}

trait SubmissionStatusResponse {
  def submissionId: String
  def driverState: String
  def success: Boolean
}

// Mesos protocol buffer types (from Apache Mesos)
import org.apache.mesos.Protos.{Volume, ContainerInfo, TaskState}
import org.apache.mesos.v1.Protos.{Variable, Labels}
```
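A basic sanity check for the `MasterURL` format above can be sketched as follows. The `MesosMasterUrl` class and its regex are illustrative assumptions: Spark's own URL handling is more permissive (for example, ZooKeeper-backed `mesos://zk://...` quorum URLs are also accepted and are deliberately not covered here).

```java
import java.util.regex.Pattern;

public class MesosMasterUrl {
    // Accepts the simple "mesos://host:port" form; host may be a
    // hostname or an IPv4 address, port is 1-5 digits.
    private static final Pattern SIMPLE_MESOS_URL =
        Pattern.compile("^mesos://[A-Za-z0-9.\\-]+:\\d{1,5}$");

    /** Returns true if url matches the simple "mesos://host:port" form. */
    public static boolean isValid(String url) {
        return url != null && SIMPLE_MESOS_URL.matcher(url).matches();
    }
}
```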
## Dependencies

- Apache Mesos Java libraries (org.apache.mesos:mesos)
- Apache Spark Core (org.apache.spark:spark-core_2.13)
- Google Protobuf for Mesos protocol communication