
# Spark Mesos

Spark Mesos provides Apache Mesos cluster manager integration for Apache Spark, letting Spark applications run on Mesos clusters in either coarse-grained or fine-grained scheduling mode. It supports dynamic resource allocation and fault tolerance, and provides utilities for Mesos-specific configuration and monitoring.

## Package Information

- **Package Name**: spark-mesos_2.13
- **Package Type**: maven
- **Language**: Scala
- **Group ID**: org.apache.spark
- **Installation**: Add to your Maven dependencies or include in Spark's classpath

## Core Imports

```scala
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.scheduler.cluster.mesos.MesosProtoUtils
import org.apache.spark.scheduler.cluster.mesos.MesosSchedulerBackendUtil
import org.apache.spark.scheduler.cluster.mesos.MesosClusterScheduler
import org.apache.spark.network.shuffle.mesos.MesosExternalBlockStoreClient
import org.apache.spark.deploy.mesos.config._
```

For Java applications:

```java
import org.apache.spark.network.shuffle.mesos.MesosExternalBlockStoreClient;
import org.apache.spark.network.util.TransportConf;
import org.apache.spark.network.sasl.SecretKeyHolder;
import org.apache.spark.executor.MesosExecutorBackend;
```

## Basic Usage

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Configure Spark to use Mesos
val conf = new SparkConf()
  .setAppName("MySparkApp")
  .setMaster("mesos://mesos-master:5050") // Connect to Mesos master
  .set("spark.mesos.coarse", "true")      // Use coarse-grained mode
  .set("spark.cores.max", "4")            // Limit maximum cores
  .set("spark.mesos.executor.docker.image", "spark-executor:latest")

val sc = new SparkContext(conf)

// Your Spark application logic here
val data = sc.parallelize(1 to 100)
val result = data.map(_ * 2).collect()

sc.stop()
```

## Architecture

Spark Mesos integration consists of several key components:

- **Cluster Manager**: `MesosClusterManager`, selected automatically when a `mesos://` master URL is used
- **Scheduler Backends**: coarse-grained and fine-grained backends for different resource-sharing modes
- **Cluster Mode Components**: `MesosClusterScheduler` and the dispatcher for cluster-mode deployments
- **Scheduler Utilities**: `MesosSchedulerBackendUtil` for container and volume management
- **Configuration System**: extensive configuration options for Mesos-specific settings
- **Security Management**: `MesosSecretConfig` for handling secrets and authentication
- **External Shuffle Service**: optional shuffle-service integration for coarse-grained mode
- **Protocol Utilities**: helper functions for working with Mesos protocol buffers

## Capabilities

### Cluster Configuration

Configure Spark to connect to and run on Mesos clusters using the master URL and configuration options.

```scala { .api }
// Master URL format (use "mesos://zk://host1:2181,host2:2181/mesos"
// for ZooKeeper-managed Mesos masters)
val master: String = "mesos://host:port"

// Key configuration options
val conf = new SparkConf()
  .setMaster(master)
  .set("spark.mesos.coarse", "true") // Enable coarse-grained mode
```

### Execution Modes

Choose between coarse-grained and fine-grained execution modes depending on your resource-sharing requirements.

**Coarse-grained Mode** (default):
- Spark acquires long-lived Mesos tasks on each machine
- Lower latency and predictable resource allocation
- Better for batch processing and long-running applications

**Fine-grained Mode** (deprecated in newer Spark releases):
- Each Spark task maps to a separate Mesos task
- Dynamic resource sharing between applications
- Better resource utilization in multi-tenant environments

```scala { .api }
// Coarse-grained mode (default)
conf.set("spark.mesos.coarse", "true")

// Fine-grained mode
conf.set("spark.mesos.coarse", "false")
```

### Resource Management

Control resource allocation and constraints for Spark executors running on Mesos.

```scala { .api }
// Core and memory configuration
conf.set("spark.cores.max", "8")                       // Maximum cores across all executors
conf.set("spark.executor.cores", "2")                  // Cores per executor
conf.set("spark.executor.memory", "2g")                // Memory per executor
conf.set("spark.mesos.executor.memoryOverhead", "512") // Additional memory overhead (MiB)

// GPU support
conf.set("spark.mesos.gpus.max", "2") // Maximum GPU resources

// Resource constraints
conf.set("spark.mesos.constraints", "os:centos") // Attribute-based constraints
conf.set("spark.mesos.role", "spark-framework")  // Mesos framework role
```
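
To reason about how much memory an executor actually requests from Mesos, the sketch below applies the overhead on top of the executor memory. The default formula `max(384 MiB, 10% of executor memory)` mirrors what Spark's Mesos scheduler utilities are understood to use, but verify against your Spark version; the helper name is illustrative, not part of the package API.

```scala
// Illustrative helper (not part of the spark-mesos API): total memory a
// coarse-grained executor would request from Mesos, in MiB.
// Assumes the default overhead of max(384, 10% of executor memory) when
// spark.mesos.executor.memoryOverhead is unset.
def executorMemoryRequestMiB(executorMemoryMiB: Int, overheadMiB: Option[Int] = None): Int = {
  val overhead = overheadMiB.getOrElse(math.max(384, executorMemoryMiB / 10))
  executorMemoryMiB + overhead
}

println(executorMemoryRequestMiB(2048))            // default overhead: 2048 + 384 = 2432
println(executorMemoryRequestMiB(2048, Some(512))) // explicit overhead: 2048 + 512 = 2560
```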

### Docker Integration

Run Spark executors in Docker containers with full configuration support.

```scala { .api }
// Docker executor configuration
conf.set("spark.mesos.executor.docker.image", "spark:3.5.6")
conf.set("spark.mesos.executor.docker.forcePullImage", "true")
conf.set("spark.mesos.containerizer", "mesos") // Use the Mesos containerizer

// Volume mounts
conf.set("spark.mesos.executor.docker.volumes",
  "/host/data:/container/data:ro,/host/logs:/container/logs:rw")

// Docker parameters
conf.set("spark.mesos.executor.docker.parameters",
  "memory-swap=-1,ulimit=nofile=65536:65536")
```

### Authentication and Security

Configure authentication credentials for secure Mesos clusters.

```scala { .api }
// Principal and secret authentication
conf.set("spark.mesos.principal", "spark-framework")
conf.set("spark.mesos.secret", "secret-value")

// File-based authentication
conf.set("spark.mesos.principal.file", "/path/to/principal.txt")
conf.set("spark.mesos.secret.file", "/path/to/secret.txt")

// Driver secrets
conf.set("spark.mesos.driver.secret.names", "secret1,secret2")
conf.set("spark.mesos.driver.secret.values", "value1,value2")
conf.set("spark.mesos.driver.secret.envkeys", "SECRET1_ENV,SECRET2_ENV")
```

### Network Configuration

Configure networking options for containerized and multi-tenant environments.

```scala { .api }
// Named network attachment
conf.set("spark.mesos.network.name", "spark-network")
conf.set("spark.mesos.network.labels", "env:production,team:data")

// Service URLs
conf.set("spark.mesos.driver.webui.url", "http://driver-host:4040")
conf.set("spark.mesos.dispatcher.webui.url", "http://dispatcher:8080")
```

### Task Management and Labeling

Add metadata and labels to Mesos tasks for monitoring and organization.

```scala { .api }
// Task labels for monitoring
conf.set("spark.mesos.task.labels", "app:spark,env:prod,team:analytics")
conf.set("spark.mesos.driver.labels", "type:driver,priority:high")

// Task constraints and placement
conf.set("spark.mesos.driver.constraints", "zone:us-west-1")
conf.set("spark.mesos.rejectOfferDuration", "120s")
```

### External Shuffle Service

Configure the external shuffle service for improved performance in coarse-grained mode; it is also required when dynamic allocation is enabled.

```java { .api }
/**
 * Client for communicating with the external shuffle service in Mesos
 * coarse-grained mode
 */
public class MesosExternalBlockStoreClient extends ExternalBlockStoreClient {

  /**
   * Creates a Mesos external shuffle client
   * @param conf Transport configuration
   * @param secretKeyHolder Secret key holder for authentication
   * @param authEnabled Whether authentication is enabled
   * @param registrationTimeoutMs Timeout for registration in milliseconds
   */
  public MesosExternalBlockStoreClient(
      TransportConf conf,
      SecretKeyHolder secretKeyHolder,
      boolean authEnabled,
      long registrationTimeoutMs);

  /**
   * Register the driver with the shuffle service
   * @param host Shuffle service host
   * @param port Shuffle service port
   * @param heartbeatTimeoutMs Heartbeat timeout in milliseconds
   * @param heartbeatIntervalMs Heartbeat interval in milliseconds
   */
  public void registerDriverWithShuffleService(
      String host,
      int port,
      long heartbeatTimeoutMs,
      long heartbeatIntervalMs) throws IOException, InterruptedException;
}
```

### Protocol Utilities

Utility functions for working with Mesos protocol buffers and labels.

```scala { .api }
/**
 * Utilities for working with Mesos protocol buffers
 */
object MesosProtoUtils {

  /**
   * Parses a label string into a Mesos Labels protobuf
   * @param labelsStr Label string in the format "key1:value1,key2:value2"
   * @return Mesos Labels.Builder for constructing protobuf messages
   */
  def mesosLabels(labelsStr: String): org.apache.mesos.Protos.Labels.Builder
}
```
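
To show the label format concretely, here is a plain-Scala sketch of parsing `"key1:value1,key2:value2"`, using a `Map` in place of the Mesos `Labels` protobuf so it runs without Mesos on the classpath. `parseLabels` is a hypothetical name, and the real `mesosLabels` may additionally support escaped delimiters.

```scala
// Hypothetical sketch of the "key1:value1,key2:value2" parsing performed by
// MesosProtoUtils.mesosLabels, returning a plain Map instead of a protobuf.
def parseLabels(labelsStr: String): Map[String, String] =
  labelsStr.split(",").iterator
    .filter(_.nonEmpty)
    .map { pair =>
      pair.split(":", 2) match {
        case Array(k, v) => k -> v
        case _ => throw new IllegalArgumentException(s"Malformed label: $pair")
      }
    }
    .toMap

println(parseLabels("app:spark,env:prod")) // Map(app -> spark, env -> prod)
```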

### Scheduler Backend Utilities

Utilities for container and volume management in Mesos environments.

```scala { .api }
/**
 * Utility object providing helper methods for Mesos scheduler backends
 */
object MesosSchedulerBackendUtil {

  /**
   * Parse volume specifications for container mounts
   * @param volumes Sequence of volume specifications in the format "hostPath:containerPath:mode"
   * @return List of Mesos Volume objects
   */
  def parseVolumesSpec(volumes: Seq[String]): List[Volume]

  /**
   * Parse port-mapping specifications for Docker containers
   * @param portmaps Sequence of port-mapping specifications in the format "hostPort:containerPort:protocol"
   * @return List of DockerInfo.PortMapping objects
   */
  def parsePortMappingsSpec(portmaps: Seq[String]): List[DockerInfo.PortMapping]

  /**
   * Build container information from Spark configuration
   * @param conf Spark configuration containing container settings
   * @return ContainerInfo.Builder for Mesos container setup
   */
  def buildContainerInfo(conf: SparkConf): ContainerInfo.Builder

  /**
   * Convert a Spark task state to a Mesos task state
   * @param state Spark TaskState
   * @return Corresponding MesosTaskState
   */
  def taskStateToMesos(state: TaskState): MesosTaskState

  /**
   * Extract secret environment variables from configuration
   * @param conf Spark configuration
   * @param secretConfig Secret configuration for driver or executor
   * @return Sequence of environment variables for secrets
   */
  def getSecretEnvVar(conf: SparkConf, secretConfig: MesosSecretConfig): Seq[Variable]

  /**
   * Extract a secret volume from configuration
   * @param conf Spark configuration
   * @param secretConfig Secret configuration for driver or executor
   * @return Optional volume for secret mounting
   */
  def getSecretVolume(conf: SparkConf, secretConfig: MesosSecretConfig): Option[Volume]
}
```
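
As an illustration of the mapping `taskStateToMesos` performs, here is a string-based sketch. The mapping shown is a best-effort reading of Spark's Mesos backend, and it uses strings instead of the real `TaskState`/`MesosTaskState` enums so it runs standalone; consult `MesosSchedulerBackendUtil` in your Spark version for the authoritative version.

```scala
// Illustrative string-based version of the TaskState -> MesosTaskState mapping;
// the real method operates on the Spark and Mesos enum types.
def taskStateToMesosName(state: String): String = state match {
  case "LAUNCHING" => "TASK_STARTING"
  case "RUNNING"   => "TASK_RUNNING"
  case "FINISHED"  => "TASK_FINISHED"
  case "FAILED"    => "TASK_FAILED"
  case "KILLED"    => "TASK_KILLED"
  case "LOST"      => "TASK_LOST"
  case other       => throw new IllegalArgumentException(s"Unknown task state: $other")
}

println(taskStateToMesosName("RUNNING")) // TASK_RUNNING
```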

### Cluster Mode Management

Components for managing Spark applications in Mesos cluster mode with the dispatcher.

```scala { .api }
/**
 * Scheduler for managing driver lifecycles in Mesos cluster mode
 */
class MesosClusterScheduler extends MesosScheduler {

  /**
   * Submit a new driver to the Mesos cluster
   * @param description Driver description containing application details
   * @return Submission response with driver ID and status
   */
  def submitDriver(description: MesosDriverDescription): CreateSubmissionResponse

  /**
   * Kill a running driver
   * @param submissionId Driver submission ID
   * @return Kill response with success status
   */
  def killDriver(submissionId: String): KillSubmissionResponse

  /**
   * Get the status of a driver
   * @param submissionId Driver submission ID
   * @return Driver status response with current state
   */
  def getDriverStatus(submissionId: String): SubmissionStatusResponse
}

/**
 * Description of a driver to be submitted to a Mesos cluster
 */
class MesosDriverDescription(
    jarUrl: String,
    mainClass: String,
    args: Array[String],
    conf: SparkConf,
    supervise: Boolean = false) {

  def appName: String
  def sparkProperties: Map[String, String]
  def environmentVariables: Map[String, String]
}

/**
 * Configuration helper for managing secrets in Mesos environments
 */
class MesosSecretConfig(taskType: String) {

  /**
   * Get comma-separated secret names for the specified task type
   * @return Secret names configuration value
   */
  def secretNames: String

  /**
   * Get comma-separated secret values for the specified task type
   * @return Secret values configuration value
   */
  def secretValues: String

  /**
   * Get comma-separated environment-variable keys for secrets
   * @return Environment variable keys configuration value
   */
  def secretEnvKeys: String

  /**
   * Get comma-separated secret filenames for file-based secrets
   * @return Secret filenames configuration value
   */
  def secretFilenames: String
}
```

### Cluster Mode and Dispatcher

Configuration for running Spark applications in Mesos cluster mode with the dispatcher.

```scala { .api }
// Cluster mode configuration
conf.set("spark.mesos.maxDrivers", "100")           // Maximum concurrent drivers
conf.set("spark.mesos.retainedDrivers", "50")       // Number of retained drivers
conf.set("spark.mesos.dispatcher.queue", "default") // Dispatcher queue name

// Failover configuration
conf.set("spark.mesos.driver.failoverTimeout", "600.0") // Driver failover timeout in seconds
conf.set("spark.mesos.cluster.retry.wait.max", "60")    // Maximum retry wait time
```

## Configuration Reference

### Core Configuration Options

```scala { .api }
// Execution mode
"spark.mesos.coarse" -> "true" // Boolean: Use coarse-grained mode (default: true)

// Resource allocation
"spark.cores.max" -> "8"                       // String: Maximum cores across all executors
"spark.mesos.mesosExecutor.cores" -> "1.0"     // String: Cores per Mesos executor (fine-grained)
"spark.mesos.extra.cores" -> "0"               // String: Extra cores to advertise per executor
"spark.mesos.executor.memoryOverhead" -> "384" // String: Additional memory per executor (MiB)
"spark.mesos.gpus.max" -> "0"                  // String: Maximum GPU resources

// Constraints and placement
"spark.mesos.constraints" -> ""        // String: Attribute-based constraints
"spark.mesos.driver.constraints" -> "" // String: Driver placement constraints
"spark.mesos.role" -> ""               // String: Mesos framework role

// Docker configuration
"spark.mesos.executor.docker.image" -> ""          // String: Docker image for executors
"spark.mesos.executor.docker.forcePullImage" -> "" // String: Force pull the Docker image
"spark.mesos.executor.docker.volumes" -> ""        // String: Volume mounts (comma-separated)
"spark.mesos.executor.docker.portmaps" -> ""       // String: Port mappings (comma-separated)
"spark.mesos.executor.docker.parameters" -> ""     // String: Docker run parameters
"spark.mesos.containerizer" -> "docker"            // String: Containerizer type ("docker" or "mesos")
```

### Authentication Configuration

```scala { .api }
// Principal and secret
"spark.mesos.principal" -> ""      // String: Kerberos principal name
"spark.mesos.principal.file" -> "" // String: Path to the principal file
"spark.mesos.secret" -> ""         // String: Authentication secret
"spark.mesos.secret.file" -> ""    // String: Path to the secret file

// Driver secrets
"spark.mesos.driver.secret.names" -> ""     // String: Comma-separated secret names
"spark.mesos.driver.secret.values" -> ""    // String: Comma-separated secret values
"spark.mesos.driver.secret.envkeys" -> ""   // String: Environment variable names
"spark.mesos.driver.secret.filenames" -> "" // String: Secret file paths

// Executor secrets (same pattern as driver)
"spark.mesos.executor.secret.names" -> ""
"spark.mesos.executor.secret.values" -> ""
"spark.mesos.executor.secret.envkeys" -> ""
"spark.mesos.executor.secret.filenames" -> ""
```

### Network and Service Configuration

```scala { .api }
// Networking
"spark.mesos.network.name" -> ""   // String: Named network for containers
"spark.mesos.network.labels" -> "" // String: Network labels for CNI

// Web UI URLs
"spark.mesos.driver.webui.url" -> ""     // String: Driver web UI URL
"spark.mesos.dispatcher.webui.url" -> "" // String: Dispatcher web UI URL
"spark.mesos.proxy.baseURL" -> ""        // String: Proxy base URL

// History server
"spark.mesos.dispatcher.historyServer.url" -> "" // String: History server URL
```

### Advanced Configuration

```scala { .api }
// Offer management
"spark.mesos.rejectOfferDuration" -> "120s"                // String: Default reject duration
"spark.mesos.rejectOfferDurationForUnmetConstraints" -> "" // String: Reject duration for unmet constraints
"spark.mesos.rejectOfferDurationForReachedMaxCores" -> ""  // String: Reject duration when max cores reached

// Task and executor configuration
"spark.mesos.task.labels" -> ""   // String: Labels for tasks
"spark.mesos.driver.labels" -> "" // String: Labels for the driver
"spark.mesos.uris" -> ""          // String: Comma-separated URIs to download
"spark.executor.uri" -> ""        // String: Executor URI
"spark.mesos.executor.home" -> "" // String: Spark home directory on executors

// Fetcher and caching
"spark.mesos.fetcherCache.enable" -> "false"         // String: Enable the Mesos fetcher cache
"spark.mesos.appJar.local.resolution.mode" -> "host" // String: Local JAR resolution mode

// Cluster mode
"spark.mesos.maxDrivers" -> "200"             // String: Maximum concurrent drivers
"spark.mesos.retainedDrivers" -> "200"        // String: Number of retained completed drivers
"spark.mesos.driver.failoverTimeout" -> "0.0" // String: Driver failover timeout (seconds)
```

## Error Handling

Common exceptions and error scenarios:

- **SparkException**: Thrown for malformed configuration or connection issues
- **IOException**: Network-related errors when connecting to the Mesos master or shuffle service
- **IllegalArgumentException**: Invalid configuration values or parameters
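
A defensive startup pattern can make these failure modes explicit. The sketch below validates the master URL up front with a hypothetical helper (not part of the package), so a malformed value surfaces as the `IllegalArgumentException` above rather than as a late connection failure:

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical helper: reject non-Mesos master URLs before building a SparkConf.
def requireMesosMaster(master: String): String =
  if (master.startsWith("mesos://") && master.length > "mesos://".length) master
  else throw new IllegalArgumentException(s"Not a Mesos master URL: $master")

Try(requireMesosMaster("mesos://mesos-master:5050")) match {
  case Success(url) => println(s"Using master $url")
  case Failure(e)   => println(s"Rejected: ${e.getMessage}")
}
```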

## Usage Examples

### Basic Mesos Application

```scala
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("Mesos Example")
  .setMaster("mesos://mesos-master:5050")
  .set("spark.mesos.coarse", "true")
  .set("spark.cores.max", "4")

val sc = new SparkContext(conf)
val rdd = sc.parallelize(1 to 1000)
val sum = rdd.reduce(_ + _)
println(s"Sum: $sum")
sc.stop()
```

### Docker-based Execution

```scala
val conf = new SparkConf()
  .setAppName("Dockerized Spark")
  .setMaster("mesos://mesos-master:5050")
  .set("spark.mesos.executor.docker.image", "apache/spark:3.5.6")
  .set("spark.mesos.executor.docker.volumes", "/data:/spark-data:ro")
  .set("spark.mesos.containerizer", "mesos")
  .set("spark.executor.memory", "2g")
  .set("spark.executor.cores", "2")

val sc = new SparkContext(conf)
// Application logic here
sc.stop()
```

### External Shuffle Service Setup

```java
import org.apache.spark.network.netty.SparkTransportConf;
import org.apache.spark.network.sasl.SecretKeyHolder;
import org.apache.spark.network.shuffle.mesos.MesosExternalBlockStoreClient;
import org.apache.spark.network.util.TransportConf;

// Create and configure the client (conf is an existing SparkConf)
TransportConf transportConf = SparkTransportConf.fromSparkConf(conf, "shuffle");
SecretKeyHolder secretKeyHolder = new SecretKeyHolder() {
    @Override
    public String getSaslUser(String appId) { return "spark"; }

    @Override
    public String getSecretKey(String appId) { return "secret"; }
};

MesosExternalBlockStoreClient client = new MesosExternalBlockStoreClient(
    transportConf, secretKeyHolder, true, 5000);

// Register with the shuffle service
client.registerDriverWithShuffleService("shuffle-host", 7337, 120000, 30000);
```

### Cluster Mode Application Submission

```scala
import org.apache.spark.scheduler.cluster.mesos.MesosClusterScheduler
import org.apache.spark.deploy.mesos.MesosDriverDescription

// Create the cluster scheduler
val clusterScheduler = new MesosClusterScheduler()

// Configure the driver description
val driverConf = new SparkConf()
  .set("spark.executor.memory", "2g")
  .set("spark.executor.cores", "2")
  .set("spark.mesos.coarse", "true")

val driverDescription = new MesosDriverDescription(
  jarUrl = "hdfs://namenode:9000/spark-apps/my-app.jar",
  mainClass = "com.example.MySparkApp",
  args = Array("--input", "/data/input", "--output", "/data/output"),
  conf = driverConf,
  supervise = true
)

// Submit the driver to the cluster
val submissionResponse = clusterScheduler.submitDriver(driverDescription)
println(s"Driver submitted with ID: ${submissionResponse.submissionId}")

// Monitor driver status
val statusResponse = clusterScheduler.getDriverStatus(submissionResponse.submissionId)
println(s"Driver status: ${statusResponse.driverState}")
```

### Advanced Container Configuration

```scala
import org.apache.spark.SparkConf
import org.apache.spark.scheduler.cluster.mesos.MesosSchedulerBackendUtil

val conf = new SparkConf()
  .set("spark.mesos.executor.docker.image", "apache/spark:3.5.6-scala2.13")
  .set("spark.mesos.executor.docker.volumes",
    "/data:/spark-data:ro,/logs:/spark-logs:rw")
  .set("spark.mesos.executor.docker.portmaps",
    "8080:8080:tcp,8081:8081:tcp")
  .set("spark.mesos.containerizer", "mesos")

// Parse volume specifications
val volumes = MesosSchedulerBackendUtil.parseVolumesSpec(
  Seq("/data:/spark-data:ro", "/logs:/spark-logs:rw"))

// Parse port mappings
val portMappings = MesosSchedulerBackendUtil.parsePortMappingsSpec(
  Seq("8080:8080:tcp", "8081:8081:tcp"))

// Build container info
val containerInfo = MesosSchedulerBackendUtil.buildContainerInfo(conf)
```
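
The `"hostPort:containerPort:protocol"` strings above can be illustrated with a small standalone parser. `PortMap` and `parsePortMap` are hypothetical names, not the real `parsePortMappingsSpec`, which returns Mesos `DockerInfo.PortMapping` objects; the tcp default when the protocol is omitted is an assumption.

```scala
// Illustrative parser for "hostPort:containerPort:protocol" strings; check
// MesosSchedulerBackendUtil.parsePortMappingsSpec for the real behavior.
final case class PortMap(hostPort: Int, containerPort: Int, protocol: String)

def parsePortMap(spec: String): PortMap = spec.split(":") match {
  case Array(h, c, proto) => PortMap(h.toInt, c.toInt, proto)
  case Array(h, c)        => PortMap(h.toInt, c.toInt, "tcp") // assumed default
  case _ => throw new IllegalArgumentException(s"Bad port mapping: $spec")
}

println(parsePortMap("8080:8080:tcp"))
```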

## Types

```scala { .api }
// Configuration type aliases
type ConfigKey = String
type ConfigValue = String
type MasterURL = String // Format: "mesos://host:port"

// Label parsing
type LabelString = String // Format: "key1:value1,key2:value2"

// Cluster mode types
type SubmissionId = String
type DriverState = String // SUBMITTED, RUNNING, FINISHED, FAILED, KILLED
type FrameworkId = String

// Container and volume types
type VolumeSpec = String  // Format: "hostPath:containerPath:mode"
type PortMapSpec = String // Format: "hostPort:containerPort:protocol"

// Response types for cluster operations
trait CreateSubmissionResponse {
  def submissionId: String
  def success: Boolean
}

trait KillSubmissionResponse {
  def submissionId: String
  def success: Boolean
}

trait SubmissionStatusResponse {
  def submissionId: String
  def driverState: String
  def success: Boolean
}

// Mesos protocol buffer types (from Apache Mesos)
import org.apache.mesos.Protos.{Volume, ContainerInfo, TaskState}
import org.apache.mesos.v1.Protos.{Variable, Labels}
```
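
To make the `VolumeSpec` format concrete, here is a standalone sketch of splitting a spec into its parts. `ParsedVolume` and `parseVolume` are illustrative names, not part of the package; the real `parseVolumesSpec` returns Mesos `Volume` protobufs and may accept additional variants.

```scala
// Illustrative parser for "hostPath:containerPath:mode" volume specs;
// the rw default when the mode is omitted is an assumption.
final case class ParsedVolume(hostPath: String, containerPath: String, readOnly: Boolean)

def parseVolume(spec: String): ParsedVolume = spec.split(":") match {
  case Array(host, container, "ro") => ParsedVolume(host, container, readOnly = true)
  case Array(host, container, "rw") => ParsedVolume(host, container, readOnly = false)
  case Array(host, container)       => ParsedVolume(host, container, readOnly = false)
  case _ => throw new IllegalArgumentException(s"Bad volume spec: $spec")
}

println(parseVolume("/data:/spark-data:ro"))
```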

## Dependencies

- Apache Mesos Java libraries (org.apache.mesos:mesos)
- Apache Spark Core (org.apache.spark:spark-core_2.13)
- Google Protocol Buffers for Mesos protocol communication