# Mesos Integration

Specialized components for Mesos deployment scenarios, including driver registration and heartbeat mechanisms.

## Capabilities

### MesosExternalShuffleClient

External shuffle client for Mesos coarse-grained mode.

```java { .api }
/**
 * External shuffle client for Mesos coarse-grained mode
 * Extends ExternalShuffleClient with Mesos-specific functionality
 */
public class MesosExternalShuffleClient extends ExternalShuffleClient {
    /**
     * Create a Mesos external shuffle client
     * @param conf - Transport configuration
     * @param secretKeyHolder - Secret key holder for authentication
     * @param authEnabled - Whether authentication is enabled
     * @param registrationTimeoutMs - Timeout for registration operations in milliseconds
     */
    public MesosExternalShuffleClient(
        TransportConf conf, SecretKeyHolder secretKeyHolder,
        boolean authEnabled, long registrationTimeoutMs
    );

    /**
     * Register driver with the Mesos external shuffle service
     * @param host - Host name of the Mesos shuffle service
     * @param port - Port number of the Mesos shuffle service
     * @param heartbeatTimeoutMs - Heartbeat timeout in milliseconds
     * @param heartbeatIntervalMs - Heartbeat interval in milliseconds
     * @throws IOException if connection fails
     * @throws InterruptedException if registration is interrupted
     */
    public void registerDriverWithShuffleService(
        String host, int port, long heartbeatTimeoutMs, long heartbeatIntervalMs
    ) throws IOException, InterruptedException;

    /**
     * Close the client and clean up resources
     * Stops heartbeat mechanism and closes connections
     */
    @Override
    public void close();
}
```

### Mesos Protocol Messages

#### RegisterDriver Message

Message for driver registration with Mesos external shuffle service.

```java { .api }
/**
 * Message for driver registration with Mesos external shuffle service
 */
public class RegisterDriver extends BlockTransferMessage {
    /**
     * Create a driver registration message
     * @param appId - Application ID
     * @param heartbeatTimeoutMs - Heartbeat timeout in milliseconds
     */
    public RegisterDriver(String appId, long heartbeatTimeoutMs);

    /**
     * Get the application ID
     * @return Application ID
     */
    public String getAppId();

    /**
     * Get the heartbeat timeout
     * @return Heartbeat timeout in milliseconds
     */
    public long getHeartbeatTimeoutMs();

    public boolean equals(Object other);
    public int hashCode();
    public String toString();
}
```
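
To make the two fields of the registration message concrete, the following is a minimal, stdlib-only sketch of how an application ID and a heartbeat timeout could be packed into a `ByteBuffer` and read back. The `RegisterDriverSketch` class and its length-prefixed layout are assumptions made for illustration; they are not Spark's `RegisterDriver` class or its actual wire format, which `BlockTransferMessage` handles.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for a driver registration message (not Spark's class).
final class RegisterDriverSketch {
    final String appId;
    final long heartbeatTimeoutMs;

    RegisterDriverSketch(String appId, long heartbeatTimeoutMs) {
        this.appId = appId;
        this.heartbeatTimeoutMs = heartbeatTimeoutMs;
    }

    // Assumed layout: 4-byte length, UTF-8 appId bytes, 8-byte timeout.
    ByteBuffer encode() {
        byte[] id = appId.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(4 + id.length + 8);
        buf.putInt(id.length).put(id).putLong(heartbeatTimeoutMs);
        buf.flip(); // switch the buffer from writing to reading
        return buf;
    }

    // Reads the fields back in the same order they were written.
    static RegisterDriverSketch decode(ByteBuffer buf) {
        byte[] id = new byte[buf.getInt()];
        buf.get(id);
        long timeoutMs = buf.getLong();
        return new RegisterDriverSketch(new String(id, StandardCharsets.UTF_8), timeoutMs);
    }
}
```

Decoding the buffer returned by `encode()` yields a message with the same `appId` and `heartbeatTimeoutMs`, which is the round trip that the real message types perform through `toByteBuffer()` and `BlockTransferMessage.Decoder`.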

#### ShuffleServiceHeartbeat Message

Heartbeat message from driver to Mesos external shuffle service.

```java { .api }
/**
 * Heartbeat message from driver to Mesos external shuffle service
 */
public class ShuffleServiceHeartbeat extends BlockTransferMessage {
    /**
     * Create a heartbeat message
     * @param appId - Application ID
     */
    public ShuffleServiceHeartbeat(String appId);

    /**
     * Get the application ID
     * @return Application ID
     */
    public String getAppId();

    public boolean equals(Object other);
    public int hashCode();
    public String toString();
}
```
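
The pattern of sending a heartbeat at a fixed interval until the client is closed can be sketched with a `ScheduledExecutorService`. This is only an illustration: `HeartbeatSenderSketch` is a hypothetical class, and the `Runnable` passed to `start` stands in for "send one heartbeat message"; the internals of the real client may differ.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical periodic heartbeat sender (not part of Spark).
final class HeartbeatSenderSketch implements AutoCloseable {
    // A single daemon thread, so a forgotten sender never blocks JVM exit.
    private final ScheduledExecutorService scheduler =
        Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "heartbeat-sketch");
            t.setDaemon(true);
            return t;
        });

    // Runs sendHeartbeat immediately, then every intervalMs until close().
    void start(Runnable sendHeartbeat, long intervalMs) {
        scheduler.scheduleAtFixedRate(sendHeartbeat, 0, intervalMs, TimeUnit.MILLISECONDS);
    }

    // Stops the heartbeat; no further heartbeats are scheduled after this.
    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```

A caller would typically choose an interval well below the heartbeat timeout (for example 30 seconds against a 60 second timeout, as in the usage examples) so that a single delayed heartbeat does not look like a lost driver.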

**Usage Examples:**

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.MetricSet;

import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.sasl.ShuffleSecretManager;
import org.apache.spark.network.shuffle.BlockFetchingListener;
import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient;
import org.apache.spark.network.shuffle.protocol.BlockTransferMessage;
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
import org.apache.spark.network.shuffle.protocol.mesos.*;
import org.apache.spark.network.util.MapConfigProvider;
import org.apache.spark.network.util.TransportConf;

// Example 1: Basic Mesos shuffle client setup
public class MesosShuffleClientExample {
    public void setupMesosShuffleClient() throws IOException, InterruptedException {
        // Create transport configuration for Mesos environment.
        // Assumes MapConfigProvider is available in your Spark version.
        TransportConf conf = new TransportConf("shuffle", MapConfigProvider.EMPTY);

        // Set up authentication for Mesos deployment
        ShuffleSecretManager secretManager = new ShuffleSecretManager();
        String appId = "mesos-app-20231201-001";
        String appSecret = "mesos-shuffle-secret-123"; // placeholder, do not hard-code real secrets
        secretManager.registerApp(appId, appSecret);

        // Create Mesos external shuffle client
        MesosExternalShuffleClient mesosClient = new MesosExternalShuffleClient(
            conf, secretManager, true, 10000 // 10 second registration timeout
        );

        try {
            // Initialize the client for the application
            mesosClient.init(appId);

            // Register driver with Mesos shuffle service
            String mesosShuffleHost = "mesos-shuffle-service.cluster.local";
            int mesosShufflePort = 7337;
            long heartbeatTimeoutMs = 60000;  // 60 seconds
            long heartbeatIntervalMs = 30000; // 30 seconds

            mesosClient.registerDriverWithShuffleService(
                mesosShuffleHost, mesosShufflePort,
                heartbeatTimeoutMs, heartbeatIntervalMs
            );

            System.out.println("Mesos shuffle client registered with service at " +
                mesosShuffleHost + ":" + mesosShufflePort);

            // Use the client for normal shuffle operations
            performShuffleOperations(mesosClient, appId);
        } finally {
            // Clean up even if registration or fetching fails
            mesosClient.close();
            secretManager.unregisterApp(appId);
        }
    }

    private void performShuffleOperations(MesosExternalShuffleClient client, String appId)
            throws IOException, InterruptedException {
        // Register executors
        String[] localDirs = {"/mesos/work/spark-1", "/mesos/work/spark-2"};
        ExecutorShuffleInfo executorInfo = new ExecutorShuffleInfo(localDirs, 64, "sort");

        client.registerWithShuffleServer("executor-host-1", 7337, "executor-1", executorInfo);
        client.registerWithShuffleServer("executor-host-2", 7337, "executor-2", executorInfo);

        // Fetch shuffle blocks with the listener from Example 2
        BlockFetchingListener listener =
            new MesosBlockFetchingListener("mesos-task-1", new MetricRegistry());
        String[] blockIds = {"shuffle_1_0_0", "shuffle_1_0_1"};

        // The final argument of fetchBlocks varies between Spark versions;
        // check the signature in the version you build against.
        client.fetchBlocks("executor-host-1", 7337, "executor-1", blockIds, listener, null);
    }
}

// Example 2: Custom Mesos block fetching listener
public class MesosBlockFetchingListener implements BlockFetchingListener {
    private final String mesosTaskId;
    private final MetricRegistry metrics;

    public MesosBlockFetchingListener(String mesosTaskId, MetricRegistry metrics) {
        this.mesosTaskId = mesosTaskId;
        this.metrics = metrics;
    }

    @Override
    public void onBlockFetchSuccess(String blockId, ManagedBuffer data) {
        System.out.println("Mesos task " + mesosTaskId + " successfully fetched block: " + blockId);

        // Update Mesos-specific metrics
        metrics.counter("mesos.shuffle.blocks.success").inc();
        metrics.histogram("mesos.shuffle.block.size").update(data.size());

        // Process the shuffle data. The caller releases `data` after this method
        // returns, so call data.retain() first if the buffer must outlive the callback.
        processShuffleBlock(blockId, data);
    }

    @Override
    public void onBlockFetchFailure(String blockId, Throwable exception) {
        System.err.println("Mesos task " + mesosTaskId + " failed to fetch block: " + blockId);
        System.err.println("Error: " + exception.getMessage());

        // Update failure metrics
        metrics.counter("mesos.shuffle.blocks.failure").inc();

        // Handle Mesos-specific error scenarios; getMessage() may be null
        String message = exception.getMessage();
        if (exception instanceof java.io.IOException) {
            // Network issues in Mesos cluster
            handleMesosNetworkError(blockId, exception);
        } else if (message != null && message.contains("authentication")) {
            // Authentication issues with Mesos shuffle service
            handleMesosAuthError(blockId, exception);
        }
    }

    private void processShuffleBlock(String blockId, ManagedBuffer data) {
        // Mesos-specific block processing logic
        System.out.println("Processing block " + blockId + " in Mesos environment");
    }

    private void handleMesosNetworkError(String blockId, Throwable exception) {
        System.err.println("Mesos network error for block " + blockId + ": " + exception.getMessage());
        // Implement Mesos-specific retry or failover logic
    }

    private void handleMesosAuthError(String blockId, Throwable exception) {
        System.err.println("Mesos authentication error for block " + blockId + ": " + exception.getMessage());
        // Implement Mesos-specific authentication recovery
    }
}

// Example 3: Mesos protocol message handling
public class MesosProtocolExample {
    public void demonstrateProtocolMessages() {
        String appId = "mesos-app-001";

        // Create driver registration message
        long heartbeatTimeout = 60000; // 60 seconds
        RegisterDriver registerDriver = new RegisterDriver(appId, heartbeatTimeout);

        System.out.println("Driver registration message:");
        System.out.println("  App ID: " + registerDriver.getAppId());
        System.out.println("  Heartbeat Timeout: " + registerDriver.getHeartbeatTimeoutMs() + "ms");

        // Serialize for network transmission
        java.nio.ByteBuffer serializedRegister = registerDriver.toByteBuffer();
        System.out.println("Serialized size: " + serializedRegister.remaining() + " bytes");

        // Create heartbeat message
        ShuffleServiceHeartbeat heartbeat = new ShuffleServiceHeartbeat(appId);
        System.out.println("Heartbeat message for app: " + heartbeat.getAppId());

        // Serialize heartbeat
        java.nio.ByteBuffer serializedHeartbeat = heartbeat.toByteBuffer();
        System.out.println("Heartbeat serialized size: " + serializedHeartbeat.remaining() + " bytes");

        // Demonstrate message deserialization
        BlockTransferMessage deserializedRegister =
            BlockTransferMessage.Decoder.fromByteBuffer(serializedRegister);

        if (deserializedRegister instanceof RegisterDriver) {
            RegisterDriver reg = (RegisterDriver) deserializedRegister;
            System.out.println("Deserialized driver registration: " + reg.getAppId());
        }
    }
}

// Example 4: Mesos deployment configuration
public class MesosDeploymentConfig {
    public MesosExternalShuffleClient createConfiguredClient() {
        // Transport configuration with Mesos-specific settings
        java.util.Map<String, String> props = new java.util.HashMap<>();
        props.put("spark.shuffle.io.connectionTimeout", "30s");
        props.put("spark.shuffle.io.numConnectionsPerPeer", "2");
        props.put("spark.mesos.executor.home", "/opt/spark");
        props.put("spark.mesos.principal", "spark-principal");

        // Assumes MapConfigProvider (org.apache.spark.network.util) is the
        // ConfigProvider implementation available in your Spark version.
        TransportConf conf = new TransportConf(
            "shuffle", new org.apache.spark.network.util.MapConfigProvider(props));

        // Security configuration for Mesos
        ShuffleSecretManager secretManager = new ShuffleSecretManager();
        String appSecret = loadSecretFromMesosSecret();
        if (appSecret == null) {
            throw new IllegalStateException("MESOS_SHUFFLE_SECRET is not set");
        }
        secretManager.registerApp("mesos-spark-app", appSecret);

        // Create client with a longer registration timeout (15 seconds)
        return new MesosExternalShuffleClient(conf, secretManager, true, 15000);
    }

    private String loadSecretFromMesosSecret() {
        // Load secret from the environment; returns null when the variable is unset
        return System.getenv("MESOS_SHUFFLE_SECRET");
    }
}

// Example 5: Mesos failure handling and recovery
public class MesosFailureHandling {
    // Accessed from the scheduler thread and the shutdown hook, hence volatile
    private volatile MesosExternalShuffleClient client;
    private volatile String appId;
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

    public void setupClientWithFailureHandling(String appId) {
        this.appId = appId;

        // Create and initialize the client
        client = createMesosClient(appId);

        // Schedule periodic health checks
        scheduler.scheduleAtFixedRate(this::checkClientHealth, 30, 30, TimeUnit.SECONDS);

        // Register shutdown hook for cleanup
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            System.out.println("Shutting down Mesos shuffle client...");
            scheduler.shutdown();
            if (client != null) {
                client.close();
            }
        }));
    }

    private void checkClientHealth() {
        try {
            // Reading metrics only shows the client object is usable;
            // it does not prove the shuffle service is reachable.
            MetricSet metrics = client.shuffleMetrics();
            System.out.println("Client health check passed, metrics: " + metrics);
        } catch (Exception e) {
            System.err.println("Client health check failed: " + e.getMessage());
            // Implement recovery logic
            recoverClient();
        }
    }

    private void recoverClient() {
        System.out.println("Attempting to recover Mesos shuffle client...");
        try {
            // Close existing client
            if (client != null) {
                client.close();
            }

            // Recreate the client for the same application
            client = createMesosClient(appId);

            // Re-register with shuffle service
            client.registerDriverWithShuffleService("mesos-shuffle", 7337, 60000, 30000);

            System.out.println("Mesos shuffle client recovery successful");
        } catch (Exception e) {
            System.err.println("Failed to recover Mesos shuffle client: " + e.getMessage());
        }
    }

    private MesosExternalShuffleClient createMesosClient(String appId) {
        // Assumes MapConfigProvider is available in your Spark version
        TransportConf conf = new TransportConf(
            "shuffle", org.apache.spark.network.util.MapConfigProvider.EMPTY);
        ShuffleSecretManager secretManager = new ShuffleSecretManager();
        secretManager.registerApp(appId, "recovery-secret"); // placeholder secret

        MesosExternalShuffleClient newClient =
            new MesosExternalShuffleClient(conf, secretManager, true, 10000);
        // The client must be initialized before it can register the driver
        newClient.init(appId);
        return newClient;
    }
}
```

### Mesos-Specific Configuration

Key configuration parameters for Mesos deployments:

- `spark.mesos.executor.home` - Spark home directory in Mesos executors
- `spark.mesos.principal` - Mesos principal for authentication
- `spark.mesos.secret` - Mesos secret for authentication
- `spark.shuffle.service.enabled` - Enable external shuffle service
- `spark.dynamicAllocation.enabled` - Enable dynamic allocation in Mesos

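
As a sketch of how the parameters listed above might be collected in one place, the helper below builds a `java.util.Properties` object with all five keys. `MesosConfSketch` and its method are hypothetical, and the arguments are placeholders supplied by the caller; how the properties are then handed to Spark depends on your deployment.

```java
import java.util.Properties;

// Hypothetical helper that gathers the Mesos-related settings listed above.
final class MesosConfSketch {
    static Properties mesosShuffleDefaults(String sparkHome, String principal, String secret) {
        Properties props = new Properties();
        props.setProperty("spark.mesos.executor.home", sparkHome);
        props.setProperty("spark.mesos.principal", principal);
        props.setProperty("spark.mesos.secret", secret);
        // Dynamic allocation is typically paired with the external shuffle
        // service, so both flags are switched on together in this sketch.
        props.setProperty("spark.shuffle.service.enabled", "true");
        props.setProperty("spark.dynamicAllocation.enabled", "true");
        return props;
    }
}
```

For example, `MesosConfSketch.mesosShuffleDefaults("/opt/spark", "spark-principal", secret)` returns the five settings ready to be written to a properties file or passed on as `--conf` options.
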
### Deployment Considerations

1. **Driver Registration**:
   - The driver must register with the Mesos shuffle service before executor operations
   - Registration carries the heartbeat timeout used for connection monitoring
   - Failed registration prevents shuffle operations

2. **Heartbeat Mechanism**:
   - Periodic heartbeats maintain the connection with the Mesos shuffle service
   - If no heartbeat arrives within the timeout, the shuffle service can treat the driver as lost and clean up its application state
   - Both the timeout and the interval are configurable; keep the interval well below the timeout

3. **Resource Management**:
   - Integration with Mesos resource allocation
   - Proper cleanup when Mesos tasks are terminated
   - Handling of Mesos executor failures

4. **Security Integration**:
   - Works with Mesos authentication mechanisms
   - Supports Mesos secret management
   - Compatible with Mesos SSL/TLS configuration

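
The relationship between registration, heartbeats, and the heartbeat timeout can be illustrated with a small tracker that records when each driver was last heard from. This is a sketch under stated assumptions: `DriverLivenessSketch` is a hypothetical class, timestamps are passed in explicitly to keep it deterministic, and it is not thread-safe. It does not reproduce how the Mesos shuffle service is actually implemented.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical service-side view of driver liveness (not Spark's implementation).
final class DriverLivenessSketch {
    // appId -> {heartbeatTimeoutMs, lastHeartbeatMs}
    private final Map<String, long[]> drivers = new HashMap<>();

    // Registration records the timeout and counts as the first sign of life.
    void register(String appId, long heartbeatTimeoutMs, long nowMs) {
        drivers.put(appId, new long[] {heartbeatTimeoutMs, nowMs});
    }

    // A heartbeat refreshes the timestamp of a registered driver.
    void heartbeat(String appId, long nowMs) {
        long[] state = drivers.get(appId);
        if (state != null) {
            state[1] = nowMs;
        }
    }

    // True when the driver is unknown or has been silent longer than its timeout.
    boolean isExpired(String appId, long nowMs) {
        long[] state = drivers.get(appId);
        return state == null || nowMs - state[1] > state[0];
    }
}
```

With a 60 second timeout and a heartbeat every 30 seconds, one missed heartbeat still leaves the driver inside its timeout window, which is why the interval is usually chosen as a fraction of the timeout.
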
### Troubleshooting Mesos Integration

Common issues and solutions:

1. **Registration Failures**:
   - Verify Mesos shuffle service is running and accessible
   - Check network connectivity between driver and shuffle service
   - Validate authentication credentials

2. **Heartbeat Issues**:
   - Monitor heartbeat timeout and interval settings
   - Check for network instability affecting heartbeats
   - Verify shuffle service heartbeat handling

3. **Task Failures**:
   - Handle Mesos task preemption gracefully
   - Implement proper cleanup for failed executors
   - Monitor Mesos cluster resource availability

4. **Performance Issues**:
   - Tune network settings for Mesos environment
   - Optimize shuffle block sizes for Mesos network
   - Monitor Mesos cluster network performance
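
For the first troubleshooting step, checking that the shuffle service is reachable from the driver host, a plain TCP probe is often enough to separate network problems from authentication or protocol problems. The sketch below uses only `java.net`; `ShuffleServiceProbe` is a hypothetical helper, and a successful connection only shows that something is listening on the port, not that it is a healthy shuffle service.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical connectivity check for a shuffle service endpoint.
final class ShuffleServiceProbe {
    // Returns true if a TCP connection to host:port succeeds within timeoutMs.
    static boolean isReachable(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Refused, timed out, or unresolvable host
            return false;
        }
    }
}
```

A call such as `ShuffleServiceProbe.isReachable("mesos-shuffle-service.cluster.local", 7337, 2000)` can be run from the driver host before registration; the host name and port here are the placeholder values from the usage examples.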