or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

client.md, handler.md, index.md, mesos.md, protocol.md, resolver.md, security.md

mesos.mddocs/

0

# Mesos Integration

1

2

Specialized components for Mesos cluster manager integration, including driver registration and heartbeat mechanisms for reliable shuffle service cleanup.

3

4

## Capabilities

5

6

### MesosExternalShuffleClient

7

8

Extended shuffle client for Mesos coarse-grained mode with driver registration and heartbeat functionality.

9

10

```java { .api }

11

/**

12

* A client for talking to the external shuffle service in Mesos coarse-grained mode.

13

* Used by the Spark driver to register with each external shuffle service on the cluster.

14

* Provides heartbeat mechanism for reliable cleanup when applications exit.

15

*/

16

public class MesosExternalShuffleClient extends ExternalShuffleClient {

17

/**

18

* Creates a Mesos external shuffle client.

19

* Inherits standard ExternalShuffleClient functionality with additional Mesos-specific features.

20

*

21

* @param conf transport configuration

22

* @param secretKeyHolder secret key holder for SASL authentication

23

* @param authEnabled whether SASL authentication is enabled

24

*/

25

public MesosExternalShuffleClient(TransportConf conf, SecretKeyHolder secretKeyHolder,

26

boolean authEnabled);

27

28

/**

29

* Registers the Spark driver with the external shuffle service.

30

* Required for the shuffle service to track driver liveness and clean up

31

* shuffle files when the driver terminates. Starts heartbeat mechanism.

32

*

33

* @param host shuffle service host

34

* @param port shuffle service port

35

* @param heartbeatTimeoutMs timeout for heartbeat responses in milliseconds

36

* @param heartbeatIntervalMs interval between heartbeats in milliseconds

37

* @throws IOException if registration fails

38

* @throws InterruptedException if registration is interrupted

39

*/

40

public void registerDriverWithShuffleService(String host, int port,

41

long heartbeatTimeoutMs, long heartbeatIntervalMs) throws IOException, InterruptedException;

42

43

/**

44

* Closes the client and stops heartbeat thread.

45

* Ensures proper cleanup of heartbeat resources.

46

*/

47

@Override

48

public void close();

49

}

50

```

51

52

**Usage Examples:**

53

54

```java

55

import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient;

56

import org.apache.spark.network.sasl.ShuffleSecretManager;

57

import org.apache.spark.network.util.TransportConf;

58

59

// Create Mesos-specific shuffle client

60

TransportConf conf = new TransportConf("shuffle");

61

ShuffleSecretManager secretManager = new ShuffleSecretManager();

62

secretManager.registerApp("spark-mesos-app", "secret-key");

63

64

MesosExternalShuffleClient client = new MesosExternalShuffleClient(

65

conf, secretManager, true);

66

67

// Initialize client

68

client.init("spark-mesos-app");

69

70

// Register driver with shuffle services on each Mesos agent

71

String[] shuffleHosts = {"agent1.mesos", "agent2.mesos", "agent3.mesos"};

72

int shufflePort = 7337;

73

long heartbeatTimeoutMs = 30000; // 30 seconds

74

long heartbeatIntervalMs = 10000; // 10 seconds

75

76

for (String host : shuffleHosts) {

77

try {

78

client.registerDriverWithShuffleService(host, shufflePort,

79

heartbeatTimeoutMs, heartbeatIntervalMs);

80

System.out.println("Registered driver with shuffle service on: " + host);

81

} catch (IOException | InterruptedException e) {

82

System.err.println("Failed to register with " + host + ": " + e.getMessage());

83

}

84

}

85

86

// Client automatically sends heartbeats to maintain registration

87

// Perform normal shuffle operations...

88

89

// Clean shutdown stops heartbeat thread

90

client.close();

91

```

92

93

### Mesos Protocol Messages

94

95

Specialized protocol messages for Mesos shuffle service communication.

96

97

#### RegisterDriver

98

99

```java { .api }

100

/**

101

* Message for registering a Spark driver with the Mesos external shuffle service.

102

* Sent during driver startup to establish tracking relationship.

103

*/

104

public class RegisterDriver extends BlockTransferMessage {

105

public final String appId;

106

public final long heartbeatTimeoutMs;

107

108

/**

109

* Creates a driver registration message.

110

*

111

* @param appId application identifier for the Spark driver

112

* @param heartbeatTimeoutMs timeout for heartbeat responses in milliseconds

113

*/

114

public RegisterDriver(String appId, long heartbeatTimeoutMs);

115

116

@Override

117

protected Type type();

118

119

@Override

120

public void encode(ByteBuf buf);

121

122

public static RegisterDriver decode(ByteBuf buf);

123

124

@Override

125

public int encodedLength();

126

127

@Override

128

public boolean equals(Object other);

129

130

@Override

131

public int hashCode();

132

133

@Override

134

public String toString();

135

}

136

```

137

138

#### ShuffleServiceHeartbeat

139

140

```java { .api }

141

/**

142

* Heartbeat message sent periodically by Mesos drivers to shuffle services.

143

* Indicates driver is still alive and shuffle data should be retained.

144

*/

145

public class ShuffleServiceHeartbeat extends BlockTransferMessage {

146

/**

147

* Creates a heartbeat message.

148

*/

149

public ShuffleServiceHeartbeat();

150

151

@Override

152

protected Type type();

153

154

@Override

155

public void encode(ByteBuf buf);

156

157

public static ShuffleServiceHeartbeat decode(ByteBuf buf);

158

159

@Override

160

public int encodedLength();

161

162

@Override

163

public boolean equals(Object other);

164

165

@Override

166

public int hashCode();

167

168

@Override

169

public String toString();

170

}

171

```

172

173

## Mesos Integration Patterns

174

175

### Driver Registration and Heartbeat Flow

176

177

```java

178

/**

179

* Example Mesos driver integration with heartbeat management

180

*/

181

public class MesosShuffleIntegration {

182

private final MesosExternalShuffleClient shuffleClient;

183

private final List<String> shuffleServiceHosts;

184

private final int shuffleServicePort;

185

186

public MesosShuffleIntegration(String appId, List<String> shuffleHosts, int port) {

187

this.shuffleServiceHosts = shuffleHosts;

188

this.shuffleServicePort = port;

189

190

// Configure client

191

TransportConf conf = new TransportConf("shuffle");

192

193

this.shuffleClient = new MesosExternalShuffleClient(

194

conf, null, false);

195

196

shuffleClient.init(appId);

197

}

198

199

/**

200

* Register driver with all shuffle services in the cluster

201

*/

202

public void registerWithShuffleServices() {

203

List<CompletableFuture<Void>> registrations = new ArrayList<>();

204

205

for (String host : shuffleServiceHosts) {

206

CompletableFuture<Void> registration = CompletableFuture.runAsync(() -> {

207

try {

208

long heartbeatTimeout = 30000; // 30 seconds

209

long heartbeatInterval = 10000; // 10 seconds

210

shuffleClient.registerDriverWithShuffleService(host, shuffleServicePort,

211

heartbeatTimeout, heartbeatInterval);

212

System.out.println("Successfully registered with shuffle service: " + host);

213

} catch (Exception e) {

214

System.err.println("Failed to register with " + host + ": " + e.getMessage());

215

throw new RuntimeException(e);

216

}

217

});

218

registrations.add(registration);

219

}

220

221

// Wait for all registrations to complete

222

CompletableFuture<Void> allRegistrations = CompletableFuture.allOf(

223

registrations.toArray(new CompletableFuture[0]));

224

225

try {

226

allRegistrations.get(60, TimeUnit.SECONDS); // 60 second timeout

227

System.out.println("Driver registered with all shuffle services");

228

} catch (Exception e) {

229

System.err.println("Failed to register with all shuffle services: " + e.getMessage());

230

throw new RuntimeException(e);

231

}

232

}

233

234

/**

235

* Clean shutdown with proper heartbeat cleanup

236

*/

237

public void shutdown() {

238

try {

239

shuffleClient.close();

240

System.out.println("Mesos shuffle integration shut down cleanly");

241

} catch (Exception e) {

242

System.err.println("Error during shuffle client shutdown: " + e.getMessage());

243

}

244

}

245

}

246

```

247

248

### Heartbeat Configuration and Monitoring

249

250

```java

251

/**

252

* Utility for configuring Mesos shuffle client heartbeat parameters

253

*/

254

public class MesosHeartbeatConfig {

255

256

/**

257

* Creates heartbeat configuration based on cluster characteristics.

258

*

259

* @param clusterSize number of nodes in Mesos cluster

260

* @param networkLatency expected network latency in milliseconds

261

* @return configured MesosExternalShuffleClient

262

*/

263

public static MesosExternalShuffleClient createOptimizedClient(

264

String appId, int clusterSize, long networkLatency) {

265

266

TransportConf conf = new TransportConf("shuffle");

267

268

// Scale heartbeat parameters based on cluster size and network

269

long baseInterval = 10000; // 10 seconds base

270

long baseTimeout = 30000; // 30 seconds base

271

272

// Increase intervals for larger clusters to reduce load

273

long heartbeatInterval = Math.min(baseInterval + (clusterSize * 100), 30000);

274

long heartbeatTimeout = Math.min(baseTimeout + (networkLatency * 3), 120000);

275

276

System.out.println("Heartbeat config - Interval: " + heartbeatInterval +

277

"ms, Timeout: " + heartbeatTimeout + "ms");

278

279

MesosExternalShuffleClient client = new MesosExternalShuffleClient(

280

conf, null, false);

281

282

client.init(appId);

283

return client;

284

}

285

286

/**

287

* Monitor heartbeat health (conceptual - actual implementation would

288

* require access to internal client metrics)

289

*/

290

public static void monitorHeartbeatHealth(MesosExternalShuffleClient client) {

291

// In practice, you would implement monitoring by:

292

// 1. Exposing heartbeat metrics from the client

293

// 2. Logging heartbeat success/failure rates

294

// 3. Alerting on consecutive heartbeat failures

295

// 4. Implementing circuit breaker pattern for failed services

296

297

System.out.println("Heartbeat monitoring would track:");

298

System.out.println("- Heartbeat success rate per shuffle service");

299

System.out.println("- Network latency to each service");

300

System.out.println("- Failed heartbeat recovery time");

301

System.out.println("- Service availability metrics");

302

}

303

}

304

```

305

306

### Error Handling and Recovery

307

308

```java

309

/**

310

* Robust error handling for Mesos shuffle integration

311

*/

312

public class MesosShuffleErrorHandler {

313

private final MesosExternalShuffleClient client;

314

private final Map<String, Integer> failureCount;

315

private final int maxRetries;

316

317

public MesosShuffleErrorHandler(MesosExternalShuffleClient client, int maxRetries) {

318

this.client = client;

319

this.failureCount = new ConcurrentHashMap<>();

320

this.maxRetries = maxRetries;

321

}

322

323

/**

324

* Register with shuffle service with retry logic

325

*/

326

public boolean registerWithRetry(String host, int port) {

327

String serviceKey = host + ":" + port;

328

int failures = failureCount.getOrDefault(serviceKey, 0);

329

330

if (failures >= maxRetries) {

331

System.err.println("Max retries exceeded for " + serviceKey);

332

return false;

333

}

334

335

try {

336

long heartbeatTimeout = 30000; // 30 seconds

337

long heartbeatInterval = 10000; // 10 seconds

338

client.registerDriverWithShuffleService(host, port, heartbeatTimeout, heartbeatInterval);

339

// Reset failure count on success

340

failureCount.remove(serviceKey);

341

System.out.println("Successfully registered with " + serviceKey);

342

return true;

343

344

} catch (IOException e) {

345

failures++;

346

failureCount.put(serviceKey, failures);

347

System.err.println("Registration failed for " + serviceKey +

348

" (attempt " + failures + "/" + maxRetries + "): " + e.getMessage());

349

350

if (failures < maxRetries) {

351

// Exponential backoff

352

try {

353

long backoffMs = Math.min(1000 * (1L << failures), 30000);

354

Thread.sleep(backoffMs);

355

return registerWithRetry(host, port); // Recursive retry

356

} catch (InterruptedException ie) {

357

Thread.currentThread().interrupt();

358

return false;

359

}

360

}

361

return false;

362

363

} catch (InterruptedException e) {

364

Thread.currentThread().interrupt();

365

System.err.println("Registration interrupted for " + serviceKey);

366

return false;

367

}

368

}

369

370

/**

371

* Get services that have exceeded retry limits

372

*/

373

public Set<String> getFailedServices() {

374

return failureCount.entrySet().stream()

375

.filter(entry -> entry.getValue() >= maxRetries)

376

.map(Map.Entry::getKey)

377

.collect(Collectors.toSet());

378

}

379

}

380

```

381

382

## Mesos-Specific Considerations

383

384

### Application Cleanup

385

386

The Mesos integration handles automatic cleanup when drivers terminate:

387

388

```java

389

// 1. Driver registers with shuffle services using RegisterDriver message

390

// 2. Shuffle services track registered drivers

391

// 3. Driver sends periodic ShuffleServiceHeartbeat messages

392

// 4. If heartbeats stop arriving, shuffle service assumes driver died

393

// 5. Shuffle service automatically cleans up application shuffle data

394

// 6. No manual cleanup required from Mesos framework

395

```

396

397

### Fault Tolerance

398

399

```java

400

/**

401

* Fault tolerance strategies for Mesos shuffle integration

402

*/

403

public class MesosFaultTolerance {

404

405

/**

406

* Handle shuffle service failures gracefully

407

*/

408

public void handleShuffleServiceFailure(String failedHost,

409

List<String> remainingHosts,

410

MesosExternalShuffleClient client) {

411

System.err.println("Shuffle service failed: " + failedHost);

412

413

// 1. Remove failed service from active list

414

remainingHosts.remove(failedHost);

415

416

// 2. Redistribute shuffle operations to remaining services

417

if (remainingHosts.isEmpty()) {

418

System.err.println("All shuffle services failed - falling back to local shuffle");

419

// Fallback to non-external shuffle mode

420

} else {

421

System.out.println("Continuing with " + remainingHosts.size() + " remaining services");

422

}

423

424

// 3. Attempt to re-register when service recovers

425

scheduleServiceRecoveryCheck(failedHost, client);

426

}

427

428

private void scheduleServiceRecoveryCheck(String failedHost, MesosExternalShuffleClient client) {

429

// Schedule periodic checks to detect service recovery

430

ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

431

scheduler.scheduleWithFixedDelay(() -> {

432

try {

433

long heartbeatTimeout = 30000; // 30 seconds

434

long heartbeatInterval = 10000; // 10 seconds

435

client.registerDriverWithShuffleService(failedHost, 7337,

436

heartbeatTimeout, heartbeatInterval);

437

System.out.println("Shuffle service recovered: " + failedHost);

438

scheduler.shutdown(); // Stop checking once recovered

439

} catch (Exception e) {

440

System.out.println("Service still unavailable: " + failedHost);

441

}

442

}, 30, 30, TimeUnit.SECONDS);

443

}

444

}

445

```

446

447

## Error Handling

448

449

Mesos-specific errors that may occur:

450

451

- **IOException**: Network failures connecting to shuffle services

452

- **InterruptedException**: Driver registration or heartbeat interruption

453

- **TimeoutException**: Heartbeat timeouts indicating service unavailability

454

- **SecurityException**: Authentication failures in secure Mesos clusters

455

456

**Mesos Error Handling Example:**

457

458

```java

459

try {

460

long heartbeatTimeout = 30000; // 30 seconds

461

long heartbeatInterval = 10000; // 10 seconds

462

mesosClient.registerDriverWithShuffleService(host, port, heartbeatTimeout, heartbeatInterval);

463

} catch (IOException e) {

464

logger.warn("Failed to register with shuffle service {}:{} - {}",

465

host, port, e.getMessage());

466

// Implement retry logic or fallback strategy

467

} catch (InterruptedException e) {

468

Thread.currentThread().interrupt();

469

logger.error("Driver registration interrupted");

470

throw new RuntimeException("Registration interrupted", e);

471

}

472

```