# Mesos Integration

Specialized components for Mesos cluster manager integration, including driver registration and heartbeat mechanisms for reliable shuffle service cleanup.

## Capabilities

### MesosExternalShuffleClient

Extended shuffle client for Mesos coarse-grained mode with driver registration and heartbeat functionality.

```java { .api }
/**
 * A client for talking to the external shuffle service in Mesos coarse-grained mode.
 * Used by the Spark driver to register with each external shuffle service on the cluster.
 * After a successful registration, the client sends periodic heartbeats so the shuffle
 * service can clean up the application's shuffle files once the driver exits.
 */
public class MesosExternalShuffleClient extends ExternalShuffleClient {
  /**
   * Creates a Mesos external shuffle client.
   * Inherits standard ExternalShuffleClient functionality and adds driver registration
   * and heartbeats. Call {@code init(appId)} before registering.
   *
   * @param conf transport configuration
   * @param secretKeyHolder secret key holder for SASL authentication
   * @param authEnabled whether SASL authentication is enabled
   */
  public MesosExternalShuffleClient(TransportConf conf, SecretKeyHolder secretKeyHolder,
      boolean authEnabled);

  /**
   * Registers the Spark driver with the external shuffle service at host:port.
   * The shuffle service uses the registration to track driver liveness and to clean up
   * the application's shuffle files when the driver terminates. The registration RPC is
   * sent asynchronously: heartbeats start once the service acknowledges it, and a
   * rejected registration is logged rather than thrown.
   *
   * @param host shuffle service host
   * @param port shuffle service port
   * @param heartbeatTimeoutMs how long the shuffle service waits without a heartbeat
   *                           before treating the driver as dead, in milliseconds
   * @param heartbeatIntervalMs interval between heartbeats sent by this client, in milliseconds
   * @throws IOException if a connection to the shuffle service cannot be established
   * @throws InterruptedException if interrupted while connecting
   */
  public void registerDriverWithShuffleService(String host, int port,
      long heartbeatTimeoutMs, long heartbeatIntervalMs) throws IOException, InterruptedException;

  /**
   * Closes the client and stops the heartbeat thread.
   */
  @Override
  public void close();
}
```
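
As a rough model of the lifecycle described above (heartbeats start after a registration is acknowledged and stop when the client is closed), the sketch below runs heartbeat tasks on one scheduled thread. It is a simplified, Spark-free illustration, not Spark's implementation: `HeartbeatScheduler`, `onRegistered`, and the `Runnable` standing in for the network send are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical model of the client-side heartbeat lifecycle (not Spark's code).
public class HeartbeatScheduler implements AutoCloseable {
  // Single daemon thread that runs all heartbeat tasks
  private final ScheduledExecutorService heartbeater =
      Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "heartbeater");
        t.setDaemon(true);
        return t;
      });

  // Called when a shuffle service acknowledges the registration:
  // send the first heartbeat immediately, then one every intervalMs.
  public void onRegistered(Runnable sendHeartbeat, long intervalMs) {
    heartbeater.scheduleAtFixedRate(sendHeartbeat, 0, intervalMs, TimeUnit.MILLISECONDS);
  }

  public boolean isClosed() {
    return heartbeater.isShutdown();
  }

  // Stops every heartbeat task at once, as close() does for the real client
  @Override
  public void close() {
    heartbeater.shutdownNow();
  }

  public static void main(String[] args) throws InterruptedException {
    CountDownLatch threeBeats = new CountDownLatch(3);
    try (HeartbeatScheduler scheduler = new HeartbeatScheduler()) {
      scheduler.onRegistered(() -> {
        System.out.println("heartbeat");
        threeBeats.countDown();
      }, 10);
      // Wait (up to 2 s) until at least three heartbeats have been sent
      System.out.println("three heartbeats sent: " + threeBeats.await(2, TimeUnit.SECONDS));
    }
  }
}
```

Sharing one thread across all registered services keeps the cost of heartbeating constant in threads and lets a single `shutdownNow()` stop everything; this is a design choice assumed for the sketch.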

**Usage Examples:**

```java
import java.io.IOException;

import org.apache.spark.network.sasl.ShuffleSecretManager;
import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient;
import org.apache.spark.network.util.MapConfigProvider;
import org.apache.spark.network.util.TransportConf;

// Create the Mesos-specific shuffle client.
// TransportConf takes a module name and a ConfigProvider; EMPTY keeps the defaults.
TransportConf conf = new TransportConf("shuffle", MapConfigProvider.EMPTY);
ShuffleSecretManager secretManager = new ShuffleSecretManager();
secretManager.registerApp("spark-mesos-app", "secret-key");

MesosExternalShuffleClient client = new MesosExternalShuffleClient(
    conf, secretManager, true);

// Initialize the client with the application ID (required before registering)
client.init("spark-mesos-app");

// Register the driver with the shuffle service on each Mesos agent
String[] shuffleHosts = {"agent1.mesos", "agent2.mesos", "agent3.mesos"};
int shufflePort = 7337;
long heartbeatTimeoutMs = 30000;  // service drops the app after 30 s without a heartbeat
long heartbeatIntervalMs = 10000; // driver sends a heartbeat every 10 s

for (String host : shuffleHosts) {
  try {
    client.registerDriverWithShuffleService(host, shufflePort,
        heartbeatTimeoutMs, heartbeatIntervalMs);
    // The call returns once the registration RPC is sent; the acknowledgement is asynchronous
    System.out.println("Sent driver registration to shuffle service on: " + host);
  } catch (IOException e) {
    System.err.println("Failed to connect to " + host + ": " + e.getMessage());
  } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    break;
  }
}

// Once each registration is acknowledged, the client sends heartbeats in the background
// Perform normal shuffle operations...

// Clean shutdown stops the heartbeat thread
client.close();
```
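
In Spark's implementation, each acknowledged registration schedules its own heartbeat task, so registering twice with the same shuffle service doubles the heartbeat traffic to it. A driver that discovers agents incrementally can guard against this with a small tracker such as the hypothetical `RegistrationTracker` below, which records each `host:port` pair and reports whether it is new. The class and its methods are illustrative only, not part of the Spark API.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper: remembers which shuffle services the driver has registered with.
public class RegistrationTracker {
  private final Set<String> registered = ConcurrentHashMap.newKeySet();

  // Returns true only the first time a given host:port is seen (thread-safe)
  public boolean markIfNew(String host, int port) {
    return registered.add(host + ":" + port);
  }

  // Forget a service, e.g. after a failed connection, so it can be registered again
  public void forget(String host, int port) {
    registered.remove(host + ":" + port);
  }

  public int size() {
    return registered.size();
  }
}
```

In the registration loop, `registerDriverWithShuffleService` would then be called only when `markIfNew(host, shufflePort)` returns `true`, and `forget` would be called from the `IOException` handler so a later attempt is not skipped.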

### Mesos Protocol Messages

Specialized protocol messages for Mesos shuffle service communication.

#### RegisterDriver

```java { .api }
/**
 * Message for registering a Spark driver with the Mesos external shuffle service.
 * Sent when the driver registers with a shuffle service, so the service can start
 * tracking the driver's liveness.
 */
public class RegisterDriver extends BlockTransferMessage {
  /**
   * Creates a driver registration message.
   *
   * @param appId application identifier for the Spark driver
   * @param heartbeatTimeoutMs how long the shuffle service waits without a heartbeat
   *                           before cleaning up the application, in milliseconds
   */
  public RegisterDriver(String appId, long heartbeatTimeoutMs);

  public String getAppId();

  public long getHeartbeatTimeoutMs();

  @Override
  protected Type type();

  @Override
  public void encode(ByteBuf buf);

  public static RegisterDriver decode(ByteBuf buf);

  @Override
  public int encodedLength();

  @Override
  public boolean equals(Object other);

  @Override
  public int hashCode();

  @Override
  public String toString();
}
```
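
To make the `encode`/`decode`/`encodedLength` contract concrete, the sketch below lays out a `RegisterDriver` payload with `java.nio.ByteBuffer` instead of Netty's `ByteBuf`. It assumes the layout of Spark's `Encoders.Strings` helper (a 4-byte length followed by UTF-8 bytes) for `appId`, followed by an 8-byte `heartbeatTimeoutMs`; the one-byte message-type prefix that `BlockTransferMessage` adds when serializing is left out. `RegisterDriverCodec` is a hypothetical name used only for illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for RegisterDriver's wire format (assumed layout, see above).
public final class RegisterDriverCodec {
  public final String appId;
  public final long heartbeatTimeoutMs;

  public RegisterDriverCodec(String appId, long heartbeatTimeoutMs) {
    this.appId = appId;
    this.heartbeatTimeoutMs = heartbeatTimeoutMs;
  }

  // 4-byte length prefix + UTF-8 bytes of appId, then an 8-byte long
  public int encodedLength() {
    return 4 + appId.getBytes(StandardCharsets.UTF_8).length + Long.BYTES;
  }

  public void encode(ByteBuffer buf) {
    byte[] bytes = appId.getBytes(StandardCharsets.UTF_8);
    buf.putInt(bytes.length);
    buf.put(bytes);
    buf.putLong(heartbeatTimeoutMs);
  }

  public static RegisterDriverCodec decode(ByteBuffer buf) {
    byte[] bytes = new byte[buf.getInt()];
    buf.get(bytes);
    long timeoutMs = buf.getLong();
    return new RegisterDriverCodec(new String(bytes, StandardCharsets.UTF_8), timeoutMs);
  }

  public static void main(String[] args) {
    RegisterDriverCodec msg = new RegisterDriverCodec("spark-mesos-app", 30000);
    ByteBuffer buf = ByteBuffer.allocate(msg.encodedLength());
    msg.encode(buf);
    buf.flip(); // switch the buffer from writing to reading
    RegisterDriverCodec copy = decode(buf);
    System.out.println(copy.appId + " / " + copy.heartbeatTimeoutMs + " ms, "
        + msg.encodedLength() + " bytes");
  }
}
```

Because `encodedLength()` is computed from the same fields that `encode` writes, the buffer can be allocated at exactly the right size, which is the reason the real message exposes both methods.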

#### ShuffleServiceHeartbeat

```java { .api }
/**
 * Heartbeat message sent periodically by a Mesos driver to each shuffle service it has
 * registered with. Indicates that the driver is still alive and that the application's
 * shuffle data should be retained. Heartbeats are one-way; the service does not reply.
 */
public class ShuffleServiceHeartbeat extends BlockTransferMessage {
  /**
   * Creates a heartbeat message.
   *
   * @param appId application identifier of the driver sending the heartbeat
   */
  public ShuffleServiceHeartbeat(String appId);

  public String getAppId();

  @Override
  protected Type type();

  @Override
  public void encode(ByteBuf buf);

  public static ShuffleServiceHeartbeat decode(ByteBuf buf);

  @Override
  public int encodedLength();

  @Override
  public boolean equals(Object other);

  @Override
  public int hashCode();

  @Override
  public String toString();
}
```

## Mesos Integration Patterns

### Driver Registration and Heartbeat Flow

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient;
import org.apache.spark.network.util.MapConfigProvider;
import org.apache.spark.network.util.TransportConf;

/**
 * Example Mesos driver integration with heartbeat management
 */
public class MesosShuffleIntegration {
  private static final long HEARTBEAT_TIMEOUT_MS = 30000;  // 30 seconds
  private static final long HEARTBEAT_INTERVAL_MS = 10000; // 10 seconds

  private final MesosExternalShuffleClient shuffleClient;
  private final List<String> shuffleServiceHosts;
  private final int shuffleServicePort;

  public MesosShuffleIntegration(String appId, List<String> shuffleHosts, int port) {
    this.shuffleServiceHosts = shuffleHosts;
    this.shuffleServicePort = port;

    // Configure a client without SASL authentication
    TransportConf conf = new TransportConf("shuffle", MapConfigProvider.EMPTY);
    this.shuffleClient = new MesosExternalShuffleClient(conf, null, false);
    shuffleClient.init(appId);
  }

  /**
   * Registers the driver with all shuffle services in the cluster, in parallel.
   * Each future completes once the registration RPC to that host has been sent.
   */
  public void registerWithShuffleServices() {
    List<CompletableFuture<Void>> registrations = new ArrayList<>();

    for (String host : shuffleServiceHosts) {
      CompletableFuture<Void> registration = CompletableFuture.runAsync(() -> {
        try {
          shuffleClient.registerDriverWithShuffleService(host, shuffleServicePort,
              HEARTBEAT_TIMEOUT_MS, HEARTBEAT_INTERVAL_MS);
          System.out.println("Sent registration to shuffle service: " + host);
        } catch (Exception e) {
          System.err.println("Failed to register with " + host + ": " + e.getMessage());
          throw new RuntimeException(e);
        }
      });
      registrations.add(registration);
    }

    // Wait for all registrations to be sent
    CompletableFuture<Void> allRegistrations = CompletableFuture.allOf(
        registrations.toArray(new CompletableFuture<?>[0]));

    try {
      allRegistrations.get(60, TimeUnit.SECONDS); // 60 second overall timeout
      System.out.println("Driver registration sent to all shuffle services");
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new RuntimeException("Interrupted while registering", e);
    } catch (Exception e) {
      System.err.println("Failed to register with all shuffle services: " + e.getMessage());
      throw new RuntimeException(e);
    }
  }

  /**
   * Clean shutdown: closing the client also stops the heartbeat thread
   */
  public void shutdown() {
    try {
      shuffleClient.close();
      System.out.println("Mesos shuffle integration shut down cleanly");
    } catch (Exception e) {
      System.err.println("Error during shuffle client shutdown: " + e.getMessage());
    }
  }
}
```

### Heartbeat Configuration and Monitoring

```java
import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient;
import org.apache.spark.network.util.MapConfigProvider;
import org.apache.spark.network.util.TransportConf;

/**
 * Utility for choosing Mesos shuffle client heartbeat parameters.
 * Heartbeat settings are passed to registerDriverWithShuffleService,
 * not to the client constructor.
 */
public class MesosHeartbeatConfig {

  /** Heartbeat interval and timeout, in milliseconds. */
  public static final class HeartbeatSettings {
    public final long intervalMs;
    public final long timeoutMs;

    HeartbeatSettings(long intervalMs, long timeoutMs) {
      this.intervalMs = intervalMs;
      this.timeoutMs = timeoutMs;
    }
  }

  /**
   * Derives heartbeat settings from cluster characteristics.
   *
   * @param clusterSize number of nodes in the Mesos cluster
   * @param networkLatency expected network latency in milliseconds
   * @return settings to pass to registerDriverWithShuffleService
   */
  public static HeartbeatSettings computeSettings(int clusterSize, long networkLatency) {
    long baseInterval = 10000; // 10 seconds base
    long baseTimeout = 30000;  // 30 seconds base

    // Lengthen the interval for larger clusters to reduce heartbeat traffic from the driver
    long heartbeatInterval = Math.min(baseInterval + (clusterSize * 100L), 30000);
    long heartbeatTimeout = Math.min(baseTimeout + (networkLatency * 3), 120000);
    // Keep the timeout at least three intervals long so a single lost heartbeat is not fatal
    heartbeatTimeout = Math.max(heartbeatTimeout, 3 * heartbeatInterval);

    System.out.println("Heartbeat config - Interval: " + heartbeatInterval +
        "ms, Timeout: " + heartbeatTimeout + "ms");
    return new HeartbeatSettings(heartbeatInterval, heartbeatTimeout);
  }

  /**
   * Creates and initializes a client without SASL authentication.
   */
  public static MesosExternalShuffleClient createClient(String appId) {
    TransportConf conf = new TransportConf("shuffle", MapConfigProvider.EMPTY);
    MesosExternalShuffleClient client = new MesosExternalShuffleClient(conf, null, false);
    client.init(appId);
    return client;
  }

  /**
   * Monitor heartbeat health (conceptual: an actual implementation would
   * require access to internal client metrics)
   */
  public static void monitorHeartbeatHealth(MesosExternalShuffleClient client) {
    // In practice, you would implement monitoring by:
    // 1. Exposing heartbeat metrics from the client
    // 2. Logging heartbeat success/failure rates
    // 3. Alerting on consecutive heartbeat failures
    // 4. Implementing a circuit breaker pattern for failed services

    System.out.println("Heartbeat monitoring would track:");
    System.out.println("- Heartbeat success rate per shuffle service");
    System.out.println("- Network latency to each service");
    System.out.println("- Failed heartbeat recovery time");
    System.out.println("- Service availability metrics");
  }
}
```
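
Whatever values are chosen, the timeout must comfortably exceed the interval: heartbeats are one-way, and the service treats the driver as dead once `heartbeatTimeoutMs` passes without one. The hypothetical `HeartbeatCheck` helper below rejects settings where the timeout does not exceed the interval and reports how many consecutive heartbeats can be lost while the next one still arrives in time. The arithmetic assumes evenly spaced heartbeats and ignores network delay.

```java
// Hypothetical helper for sanity-checking heartbeat settings.
public final class HeartbeatCheck {
  private HeartbeatCheck() {}

  /**
   * Returns how many consecutive heartbeats may be lost while the next one still
   * arrives strictly before the timeout expires.
   *
   * @throws IllegalArgumentException if the interval is not positive or the
   *     timeout does not exceed the interval
   */
  public static long tolerableMissedHeartbeats(long timeoutMs, long intervalMs) {
    if (intervalMs <= 0) {
      throw new IllegalArgumentException("interval must be positive");
    }
    if (timeoutMs <= intervalMs) {
      throw new IllegalArgumentException(
          "timeout " + timeoutMs + " ms must exceed interval " + intervalMs + " ms");
    }
    // Heartbeats sent strictly before the timeout: ceil(timeout / interval) - 1
    long beatsInWindow = (timeoutMs - 1) / intervalMs;
    // One of them must actually arrive; the others may be lost
    return beatsInWindow - 1;
  }
}
```

With the 30 s timeout and 10 s interval used in the earlier examples this returns 1, so losing two heartbeats in a row can already cost the application its shuffle data; a 120 s timeout with a 10 s interval tolerates 10.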

### Error Handling and Recovery

```java
import java.io.IOException;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient;

/**
 * Error handling with retries for Mesos shuffle integration
 */
public class MesosShuffleErrorHandler {
  private static final long HEARTBEAT_TIMEOUT_MS = 30000;  // 30 seconds
  private static final long HEARTBEAT_INTERVAL_MS = 10000; // 10 seconds

  private final MesosExternalShuffleClient client;
  private final Map<String, Integer> failureCount;
  private final int maxRetries;

  public MesosShuffleErrorHandler(MesosExternalShuffleClient client, int maxRetries) {
    this.client = client;
    this.failureCount = new ConcurrentHashMap<>();
    this.maxRetries = maxRetries;
  }

  /**
   * Registers with a shuffle service, retrying connection failures with exponential
   * backoff. Returns true once the registration RPC has been sent.
   */
  public boolean registerWithRetry(String host, int port) {
    String serviceKey = host + ":" + port;
    int failures = failureCount.getOrDefault(serviceKey, 0);

    while (failures < maxRetries) {
      try {
        client.registerDriverWithShuffleService(host, port,
            HEARTBEAT_TIMEOUT_MS, HEARTBEAT_INTERVAL_MS);
        // Reset failure count on success
        failureCount.remove(serviceKey);
        System.out.println("Sent registration to " + serviceKey);
        return true;
      } catch (IOException e) {
        failures++;
        failureCount.put(serviceKey, failures);
        System.err.println("Registration failed for " + serviceKey +
            " (attempt " + failures + "/" + maxRetries + "): " + e.getMessage());
        if (failures < maxRetries) {
          // Exponential backoff: 2 s, 4 s, 8 s, ... capped at 30 s
          // (the exponent is capped too, which avoids overflow without changing results)
          long backoffMs = Math.min(1000L << Math.min(failures, 5), 30000);
          try {
            Thread.sleep(backoffMs);
          } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            return false;
          }
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        System.err.println("Registration interrupted for " + serviceKey);
        return false;
      }
    }

    System.err.println("Max retries exceeded for " + serviceKey);
    return false;
  }

  /**
   * Returns services that have exceeded the retry limit
   */
  public Set<String> getFailedServices() {
    return failureCount.entrySet().stream()
        .filter(entry -> entry.getValue() >= maxRetries)
        .map(Map.Entry::getKey)
        .collect(Collectors.toSet());
  }
}
```
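
The backoff in `registerWithRetry` sleeps `min(1000 * 2^failures, 30000)` ms after each failed attempt except the last. The small helper below (a hypothetical name that reproduces only that arithmetic) lists the resulting delays, which makes it easy to check the worst-case time spent on one unreachable service.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that reproduces the backoff arithmetic of registerWithRetry.
public final class BackoffSchedule {
  private BackoffSchedule() {}

  // Delays slept after failures 1 .. maxRetries-1 (no sleep after the final failure)
  public static List<Long> delaysMs(int maxRetries) {
    List<Long> delays = new ArrayList<>();
    for (int failures = 1; failures < maxRetries; failures++) {
      // 1000 * 2^failures, capped at 30 s; capping the exponent at 5 avoids overflow
      delays.add(Math.min(1000L << Math.min(failures, 5), 30000L));
    }
    return delays;
  }

  // Total time spent sleeping before giving up on one service
  public static long worstCaseSleepMs(int maxRetries) {
    return delaysMs(maxRetries).stream().mapToLong(Long::longValue).sum();
  }
}
```

With `maxRetries = 5` the handler sleeps 2, 4, 8 and 16 s, so an unreachable service costs about 30 s of sleeping (plus any connection timeouts) before it shows up in `getFailedServices()`.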

## Mesos-Specific Considerations

### Application Cleanup

The Mesos integration cleans up shuffle data automatically when drivers terminate:

1. The driver registers with each shuffle service by sending a `RegisterDriver` message.
2. Each shuffle service records the registered application and its heartbeat timeout.
3. Once registration is acknowledged, the driver sends periodic `ShuffleServiceHeartbeat` messages.
4. If no heartbeat arrives within `heartbeatTimeoutMs`, the shuffle service assumes the driver has died.
5. The shuffle service then deletes the application's shuffle data.
6. The Mesos framework does not need to perform any manual cleanup.
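
The service-side bookkeeping in steps 2 to 5 can be modeled as a map from application ID to last-heartbeat time. The sketch below is a simplified, hypothetical model (`HeartbeatTracker` and its methods are illustrative names, not the shuffle service's API). It assumes each heartbeat identifies its application, takes the current time in milliseconds as a parameter so the expiry rule is easy to test, and stands in for cleanup by returning the expired application IDs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of how a shuffle service could track registered drivers.
public class HeartbeatTracker {
  private static final class AppState {
    final long heartbeatTimeoutMs;
    volatile long lastHeartbeatMs;

    AppState(long heartbeatTimeoutMs, long nowMs) {
      this.heartbeatTimeoutMs = heartbeatTimeoutMs;
      this.lastHeartbeatMs = nowMs;
    }
  }

  private final Map<String, AppState> apps = new ConcurrentHashMap<>();

  // Registration: start tracking; registering counts as the first sign of life
  public void register(String appId, long heartbeatTimeoutMs, long nowMs) {
    apps.put(appId, new AppState(heartbeatTimeoutMs, nowMs));
  }

  // Heartbeat: refresh the timestamp; heartbeats from unknown apps are ignored
  public void heartbeat(String appId, long nowMs) {
    AppState state = apps.get(appId);
    if (state != null) {
      state.lastHeartbeatMs = nowMs;
    }
  }

  // Periodic sweep: remove and return apps whose timeout has fully elapsed
  public List<String> expire(long nowMs) {
    List<String> expired = new ArrayList<>();
    apps.entrySet().removeIf(entry -> {
      AppState s = entry.getValue();
      boolean dead = nowMs - s.lastHeartbeatMs > s.heartbeatTimeoutMs;
      if (dead) {
        expired.add(entry.getKey()); // a real service would delete this app's shuffle files
      }
      return dead;
    });
    return expired;
  }

  public boolean isTracked(String appId) {
    return apps.containsKey(appId);
  }
}
```

The strict `>` comparison keeps an application alive until its timeout has fully elapsed. Because a sweep like `expire` would run on a timer, cleanup in such a design happens some time after expiry rather than at the exact instant.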

### Fault Tolerance

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient;

/**
 * Fault tolerance strategies for Mesos shuffle integration
 */
public class MesosFaultTolerance {
  // One shared scheduler for all recovery checks instead of a new thread per failure
  private final ScheduledExecutorService scheduler =
      Executors.newSingleThreadScheduledExecutor();
  // At most one pending recovery check per failed host
  private final Map<String, ScheduledFuture<?>> recoveryChecks = new ConcurrentHashMap<>();

  /**
   * Handle shuffle service failures gracefully
   */
  public void handleShuffleServiceFailure(String failedHost, int port,
                                          List<String> remainingHosts,
                                          MesosExternalShuffleClient client) {
    System.err.println("Shuffle service failed: " + failedHost);

    // 1. Remove the failed service from the active list (the list must be mutable)
    remainingHosts.remove(failedHost);

    // 2. Report what is left; any fallback behavior is application-specific
    if (remainingHosts.isEmpty()) {
      System.err.println("All shuffle services failed");
    } else {
      System.out.println("Continuing with " + remainingHosts.size() + " remaining services");
    }

    // 3. Attempt to re-register when the service recovers
    scheduleServiceRecoveryCheck(failedHost, port, client);
  }

  private void scheduleServiceRecoveryCheck(String failedHost, int port,
                                            MesosExternalShuffleClient client) {
    recoveryChecks.computeIfAbsent(failedHost, host -> scheduler.scheduleWithFixedDelay(() -> {
      try {
        long heartbeatTimeout = 30000; // 30 seconds
        long heartbeatInterval = 10000; // 10 seconds
        client.registerDriverWithShuffleService(host, port, heartbeatTimeout, heartbeatInterval);
        System.out.println("Shuffle service recovered: " + host);
        // Stop checking this host once it accepts a connection again
        ScheduledFuture<?> check = recoveryChecks.remove(host);
        if (check != null) {
          check.cancel(false);
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      } catch (Exception e) {
        System.out.println("Service still unavailable: " + host);
      }
    }, 30, 30, TimeUnit.SECONDS));
  }

  /** Cancels all pending recovery checks */
  public void shutdown() {
    scheduler.shutdownNow();
  }
}
```
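
## Error Handling

Errors that may occur when registering a driver with Mesos shuffle services:

- **IOException**: Network failure while connecting to a shuffle service
- **InterruptedException**: The calling thread was interrupted while connecting
- **Rejected registration**: If the shuffle service rejects the `RegisterDriver` RPC, the client logs a warning rather than throwing, and no heartbeats are sent to that service
- **Heartbeat timeouts**: Heartbeats are one-way, so the driver never observes a timeout; the shuffle service treats the driver as dead once `heartbeatTimeoutMs` passes without a heartbeat and cleans up its shuffle data
- **Authentication failures**: In secure Mesos clusters, a failed SASL handshake is reported when the connection is set up, typically as an unchecked exception rather than one of the declared exceptions

**Mesos Error Handling Example:**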

```java
// Assumes an initialized MesosExternalShuffleClient `mesosClient`, a target `host`
// and `port`, and an SLF4J `logger` in scope
long heartbeatTimeout = 30000; // 30 seconds
long heartbeatInterval = 10000; // 10 seconds
try {
  mesosClient.registerDriverWithShuffleService(host, port, heartbeatTimeout, heartbeatInterval);
} catch (IOException e) {
  logger.warn("Failed to register with shuffle service {}:{} - {}",
      host, port, e.getMessage());
  // Implement retry logic or fallback strategy
} catch (InterruptedException e) {
  // Restore the interrupt flag before propagating
  Thread.currentThread().interrupt();
  logger.error("Driver registration interrupted");
  throw new RuntimeException("Registration interrupted", e);
}
```