or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

authentication.mdblock-fetching.mdfile-management.mdindex.mdmesos.mdprotocol.mdshuffle-client.mdshuffle-server.md

mesos.mddocs/

0

# Mesos Integration

1

2

Specialized components for Mesos deployment scenarios, including driver registration and heartbeat mechanisms.

3

4

## Capabilities

5

6

### MesosExternalShuffleClient

7

8

External shuffle client for Mesos coarse-grained mode.

9

10

```java { .api }

11

/**

12

* External shuffle client for Mesos coarse-grained mode

13

* Extends ExternalShuffleClient with Mesos-specific functionality

14

*/

15

public class MesosExternalShuffleClient extends ExternalShuffleClient {

16

/**

17

* Create a Mesos external shuffle client

18

* @param conf - Transport configuration

19

* @param secretKeyHolder - Secret key holder for authentication

20

* @param authEnabled - Whether authentication is enabled

21

* @param registrationTimeoutMs - Timeout for registration operations in milliseconds

22

*/

23

public MesosExternalShuffleClient(

24

TransportConf conf, SecretKeyHolder secretKeyHolder,

25

boolean authEnabled, long registrationTimeoutMs

26

);

27

28

/**

29

* Register driver with the Mesos external shuffle service

30

* @param host - Host name of the Mesos shuffle service

31

* @param port - Port number of the Mesos shuffle service

32

* @param heartbeatTimeoutMs - Heartbeat timeout in milliseconds

33

* @param heartbeatIntervalMs - Heartbeat interval in milliseconds

34

* @throws IOException if connection fails

35

* @throws InterruptedException if registration is interrupted

36

*/

37

public void registerDriverWithShuffleService(

38

String host, int port, long heartbeatTimeoutMs, long heartbeatIntervalMs

39

) throws IOException, InterruptedException;

40

41

/**

42

* Close the client and clean up resources

43

* Stops heartbeat mechanism and closes connections

44

*/

45

@Override

46

public void close();

47

}

48

```

49

50

### Mesos Protocol Messages

51

52

#### RegisterDriver Message

53

54

Message for driver registration with Mesos external shuffle service.

55

56

```java { .api }

57

/**

58

* Message for driver registration with Mesos external shuffle service

59

*/

60

public class RegisterDriver extends BlockTransferMessage {

61

/**

62

* Create a driver registration message

63

* @param appId - Application ID

64

* @param heartbeatTimeoutMs - Heartbeat timeout in milliseconds

65

*/

66

public RegisterDriver(String appId, long heartbeatTimeoutMs);

67

68

/**

69

* Get the application ID

70

* @return Application ID

71

*/

72

public String getAppId();

73

74

/**

75

* Get the heartbeat timeout

76

* @return Heartbeat timeout in milliseconds

77

*/

78

public long getHeartbeatTimeoutMs();

79

80

public boolean equals(Object other);

81

public int hashCode();

82

public String toString();

83

}

84

```

85

86

#### ShuffleServiceHeartbeat Message

87

88

Heartbeat message from driver to Mesos external shuffle service.

89

90

```java { .api }

91

/**

92

* Heartbeat message from driver to Mesos external shuffle service

93

*/

94

public class ShuffleServiceHeartbeat extends BlockTransferMessage {

95

/**

96

* Create a heartbeat message

97

* @param appId - Application ID

98

*/

99

public ShuffleServiceHeartbeat(String appId);

100

101

/**

102

* Get the application ID

103

* @return Application ID

104

*/

105

public String getAppId();

106

107

public boolean equals(Object other);

108

public int hashCode();

109

public String toString();

110

}

111

```

112

113

**Usage Examples:**

114

115

```java

116

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.MetricSet;

import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.sasl.ShuffleSecretManager;
import org.apache.spark.network.shuffle.BlockFetchingListener;
import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient;
import org.apache.spark.network.shuffle.protocol.BlockTransferMessage;
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
import org.apache.spark.network.shuffle.protocol.mesos.*;
import org.apache.spark.network.util.ConfigProvider;
import org.apache.spark.network.util.MapConfigProvider;
import org.apache.spark.network.util.TransportConf;

120

121

// Example 1: Basic Mesos shuffle client setup

122

public class MesosShuffleClientExample {

123

public void setupMesosShuffleClient() {

124

// Create transport configuration for Mesos environment

125

TransportConf conf = new TransportConf("shuffle");

126

127

// Set up authentication for Mesos deployment

128

ShuffleSecretManager secretManager = new ShuffleSecretManager();

129

String appId = "mesos-app-20231201-001";

130

String appSecret = "mesos-shuffle-secret-123";

131

secretManager.registerApp(appId, appSecret);

132

133

// Create Mesos external shuffle client

134

MesosExternalShuffleClient mesosClient = new MesosExternalShuffleClient(

135

conf, secretManager, true, 10000 // 10 second registration timeout

136

);

137

138

// Initialize the client for the application

139

mesosClient.init(appId);

140

141

// Register driver with Mesos shuffle service

142

String mesosShuffleHost = "mesos-shuffle-service.cluster.local";

143

int mesosShufflePort = 7337;

144

long heartbeatTimeoutMs = 60000; // 60 seconds

145

long heartbeatIntervalMs = 30000; // 30 seconds

146

147

mesosClient.registerDriverWithShuffleService(

148

mesosShuffleHost, mesosShufflePort,

149

heartbeatTimeoutMs, heartbeatIntervalMs

150

);

151

152

System.out.println("Mesos shuffle client registered with service at " +

153

mesosShuffleHost + ":" + mesosShufflePort);

154

155

// Use the client for normal shuffle operations

156

performShuffleOperations(mesosClient, appId);

157

158

// Clean up when done

159

mesosClient.close();

160

secretManager.unregisterApp(appId);

161

}

162

163

private void performShuffleOperations(MesosExternalShuffleClient client, String appId) {

164

// Register executors

165

String[] localDirs = {"/mesos/work/spark-1", "/mesos/work/spark-2"};

166

ExecutorShuffleInfo executorInfo = new ExecutorShuffleInfo(localDirs, 64, "sort");

167

168

client.registerWithShuffleServer("executor-host-1", 7337, "executor-1", executorInfo);

169

client.registerWithShuffleServer("executor-host-2", 7337, "executor-2", executorInfo);

170

171

// Fetch shuffle blocks

172

BlockFetchingListener listener = new MesosBlockFetchingListener();

173

String[] blockIds = {"shuffle_1_0_0", "shuffle_1_0_1"};

174

175

client.fetchBlocks("executor-host-1", 7337, "executor-1", blockIds, listener, null);

176

}

177

}

178

179

// Example 2: Custom Mesos block fetching listener

180

public class MesosBlockFetchingListener implements BlockFetchingListener {

181

private final String mesosTaskId;

182

private final MetricRegistry metrics;

183

184

public MesosBlockFetchingListener(String mesosTaskId, MetricRegistry metrics) {

185

this.mesosTaskId = mesosTaskId;

186

this.metrics = metrics;

187

}

188

189

@Override

190

public void onBlockFetchSuccess(String blockId, ManagedBuffer data) {

191

System.out.println("Mesos task " + mesosTaskId + " successfully fetched block: " + blockId);

192

193

// Update Mesos-specific metrics

194

metrics.counter("mesos.shuffle.blocks.success").inc();

195

metrics.histogram("mesos.shuffle.block.size").update(data.size());

196

197

try {

198

// Process the shuffle data

199

processShuffleBlock(blockId, data);

200

} finally {

201

data.release();

202

}

203

}

204

205

@Override

206

public void onBlockFetchFailure(String blockId, Throwable exception) {

207

System.err.println("Mesos task " + mesosTaskId + " failed to fetch block: " + blockId);

208

System.err.println("Error: " + exception.getMessage());

209

210

// Update failure metrics

211

metrics.counter("mesos.shuffle.blocks.failure").inc();

212

213

// Handle Mesos-specific error scenarios

214

if (exception instanceof IOException) {

215

// Network issues in Mesos cluster

216

handleMesosNetworkError(blockId, exception);

217

} else if (exception.getMessage().contains("authentication")) {

218

// Authentication issues with Mesos shuffle service

219

handleMesosAuthError(blockId, exception);

220

}

221

}

222

223

private void processShuffleBlock(String blockId, ManagedBuffer data) {

224

// Mesos-specific block processing logic

225

System.out.println("Processing block " + blockId + " in Mesos environment");

226

}

227

228

private void handleMesosNetworkError(String blockId, Throwable exception) {

229

System.err.println("Mesos network error for block " + blockId + ": " + exception.getMessage());

230

// Implement Mesos-specific retry or failover logic

231

}

232

233

private void handleMesosAuthError(String blockId, Throwable exception) {

234

System.err.println("Mesos authentication error for block " + blockId + ": " + exception.getMessage());

235

// Implement Mesos-specific authentication recovery

236

}

237

}

238

239

// Example 3: Mesos protocol message handling

240

public class MesosProtocolExample {

241

public void demonstrateProtocolMessages() {

242

String appId = "mesos-app-001";

243

244

// Create driver registration message

245

long heartbeatTimeout = 60000; // 60 seconds

246

RegisterDriver registerDriver = new RegisterDriver(appId, heartbeatTimeout);

247

248

System.out.println("Driver registration message:");

249

System.out.println(" App ID: " + registerDriver.getAppId());

250

System.out.println(" Heartbeat Timeout: " + registerDriver.getHeartbeatTimeoutMs() + "ms");

251

252

// Serialize for network transmission

253

ByteBuffer serializedRegister = registerDriver.toByteBuffer();

254

System.out.println("Serialized size: " + serializedRegister.remaining() + " bytes");

255

256

// Create heartbeat message

257

ShuffleServiceHeartbeat heartbeat = new ShuffleServiceHeartbeat(appId);

258

System.out.println("Heartbeat message for app: " + heartbeat.getAppId());

259

260

// Serialize heartbeat

261

ByteBuffer serializedHeartbeat = heartbeat.toByteBuffer();

262

System.out.println("Heartbeat serialized size: " + serializedHeartbeat.remaining() + " bytes");

263

264

// Demonstrate message deserialization

265

BlockTransferMessage deserializedRegister =

266

BlockTransferMessage.Decoder.fromByteBuffer(serializedRegister);

267

268

if (deserializedRegister instanceof RegisterDriver) {

269

RegisterDriver reg = (RegisterDriver) deserializedRegister;

270

System.out.println("Deserialized driver registration: " + reg.getAppId());

271

}

272

}

273

}

274

275

// Example 4: Mesos deployment configuration

276

public class MesosDeploymentConfig {

277

public MesosExternalShuffleClient createConfiguredClient() {

278

// Transport configuration with Mesos-specific settings

279

Properties props = new Properties();

280

props.setProperty("spark.shuffle.io.connectionTimeout", "30s");

281

props.setProperty("spark.shuffle.io.numConnectionsPerPeer", "2");

282

props.setProperty("spark.mesos.executor.home", "/opt/spark");

283

props.setProperty("spark.mesos.principal", "spark-principal");

284

285

TransportConf conf = new TransportConf("shuffle", ConfigProvider.fromProperties(props));

286

287

// Security configuration for Mesos

288

ShuffleSecretManager secretManager = new ShuffleSecretManager();

289

String appSecret = loadSecretFromMesosSecret();

290

secretManager.registerApp("mesos-spark-app", appSecret);

291

292

// Create client with Mesos-optimized settings

293

return new MesosExternalShuffleClient(

294

conf, secretManager, true, 15000 // Longer timeout for Mesos

295

);

296

}

297

298

private String loadSecretFromMesosSecret() {

299

// Load secret from Mesos secret store or environment

300

return System.getenv("MESOS_SHUFFLE_SECRET");

301

}

302

}

303

304

// Example 5: Mesos failure handling and recovery

305

public class MesosFailureHandling {

306

private MesosExternalShuffleClient client;

307

private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

308

309

public void setupClientWithFailureHandling(String appId) {

310

// Create client with failure detection

311

client = createMesosClient(appId);

312

313

// Schedule periodic health checks

314

scheduler.scheduleAtFixedRate(

315

() -> checkClientHealth(),

316

30, 30, TimeUnit.SECONDS

317

);

318

319

// Register shutdown hook for cleanup

320

Runtime.getRuntime().addShutdownHook(new Thread(() -> {

321

System.out.println("Shutting down Mesos shuffle client...");

322

scheduler.shutdown();

323

if (client != null) {

324

client.close();

325

}

326

}));

327

}

328

329

private void checkClientHealth() {

330

try {

331

// Check if client is still responsive

332

MetricSet metrics = client.shuffleMetrics();

333

System.out.println("Client health check passed, metrics: " + metrics);

334

} catch (Exception e) {

335

System.err.println("Client health check failed: " + e.getMessage());

336

// Implement recovery logic

337

recoverClient();

338

}

339

}

340

341

private void recoverClient() {

342

System.out.println("Attempting to recover Mesos shuffle client...");

343

try {

344

// Close existing client

345

if (client != null) {

346

client.close();

347

}

348

349

// Recreate client

350

client = createMesosClient("recovered-app-id");

351

352

// Re-register with shuffle service

353

client.registerDriverWithShuffleService("mesos-shuffle", 7337, 60000, 30000);

354

355

System.out.println("Mesos shuffle client recovery successful");

356

} catch (Exception e) {

357

System.err.println("Failed to recover Mesos shuffle client: " + e.getMessage());

358

}

359

}

360

361

private MesosExternalShuffleClient createMesosClient(String appId) {

362

// Implementation for creating configured Mesos client

363

TransportConf conf = new TransportConf("shuffle");

364

ShuffleSecretManager secretManager = new ShuffleSecretManager();

365

secretManager.registerApp(appId, "recovery-secret");

366

367

return new MesosExternalShuffleClient(conf, secretManager, true, 10000);

368

}

369

}

370

```

371

372

### Mesos-Specific Configuration

373

374

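Key configuration parameters for Mesos deployments:

- `spark.mesos.executor.home` - Spark home directory in Mesos executors
- `spark.mesos.principal` - Principal used to authenticate the Spark framework with the Mesos master (not used by the shuffle service)
- `spark.mesos.secret` - Secret used to authenticate the Spark framework with the Mesos master (not used by the shuffle service)
- `spark.shuffle.service.enabled` - Enable the external shuffle service
- `spark.shuffle.service.port` - Port of the external shuffle service (default `7337`)
- `spark.authenticate` - Enable Spark's shared-secret (SASL) authentication, which also secures driver/executor connections to the shuffle service
- `spark.dynamicAllocation.enabled` - Enable dynamic allocation in Mesos (requires the external shuffle service)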

381

382

### Deployment Considerations

383

384

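1. **Driver Registration**:
   - The driver registers with the Mesos shuffle service on each agent that runs its executors; the client must be initialized with the application ID (`init(appId)`) first
   - Registration includes the heartbeat timeout the shuffle service uses to decide that the driver has gone away
   - Registration is asynchronous: if it fails, the client only logs a warning, shuffle operations can still proceed, but the application's shuffle data is not cleaned up automatically and must be removed manually after the driver exits

2. **Heartbeat Mechanism**:
   - After a successful registration, the client sends periodic heartbeats to the Mesos shuffle service
   - If no heartbeat arrives within the heartbeat timeout, the shuffle service treats the application as finished and removes its shuffle files; heartbeats do not reconnect or recover anything
   - Configurable timeout and interval settings; keep the interval well below the timeout so a single delayed heartbeat does not trigger cleanup

3. **Resource Management**:
   - Integration with Mesos resource allocation
   - Proper cleanup when Mesos tasks are terminated
   - Handling of Mesos executor failures

4. **Security Integration**:
   - Connections to the shuffle service are authenticated with Spark's shared-secret (SASL) mechanism, independent of Mesos framework authentication
   - The shuffle secret can be supplied through Mesos secret management, for example as an environment variable
   - Transport encryption is configured through Spark's own network settings rather than Mesos SSL/TLS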

403

404

### Troubleshooting Mesos Integration

405

406

Common issues and solutions:

407

408

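1. **Registration Failures**:
   - Verify Mesos shuffle service is running and accessible
   - Check network connectivity between driver and shuffle service
   - Validate authentication credentials (the shuffle secret registered for the application ID)
   - Check the driver log for the registration warning; registration failures are not raised as errors

2. **Heartbeat Issues**:
   - Keep the heartbeat interval well below the heartbeat timeout
   - Check for network instability affecting heartbeats
   - Verify the shuffle service is receiving heartbeats: if they stop for longer than the timeout, the service deletes the application's shuffle files even if the application is still running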

417

418

3. **Task Failures**:

419

- Handle Mesos task preemption gracefully

420

- Implement proper cleanup for failed executors

421

- Monitor Mesos cluster resource availability

422

423

4. **Performance Issues**:

424

- Tune network settings for Mesos environment

425

- Optimize shuffle block sizes for Mesos network

426

- Monitor Mesos cluster network performance