# Protocol Messages

Serializable message classes for communication between shuffle clients and servers, including registration, block requests, and data transfers.

## Capabilities

### BlockTransferMessage Base Class

Base class for all shuffle protocol messages. Every serialized message begins with a one-byte type ID, which the decoder uses to select the concrete message class.

```java { .api }
/**
 * Base class for all shuffle protocol messages
 */
public abstract class BlockTransferMessage implements Encodable {
  /**
   * Type of this message, written as the first byte of the serialized form
   */
  protected abstract Type type();

  /**
   * Convert the message to a ByteBuffer for network transmission:
   * the type byte followed by the encoded message fields
   * @return ByteBuffer containing the serialized message
   */
  public ByteBuffer toByteBuffer();

  /**
   * Enumeration of all supported message types; each has a fixed one-byte wire ID
   */
  public enum Type {
    OPEN_BLOCKS(0), UPLOAD_BLOCK(1), REGISTER_EXECUTOR(2), STREAM_HANDLE(3),
    REGISTER_DRIVER(4), HEARTBEAT(5), UPLOAD_BLOCK_STREAM(6);

    public byte id();
  }

  /**
   * Decoder for deserializing messages from ByteBuffer
   */
  public static class Decoder {
    /**
     * Deserialize a message from ByteBuffer by reading its leading type byte
     * @param msg - ByteBuffer containing a serialized message
     * @return Deserialized BlockTransferMessage
     * @throws IllegalArgumentException if the type byte is unknown
     */
    public static BlockTransferMessage fromByteBuffer(ByteBuffer msg);
  }
}
```
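A minimal sketch of the "type byte, then payload" framing, using only `java.nio.ByteBuffer` so it runs without Spark on the classpath. The `TypeFraming` class and its methods are hypothetical; the IDs follow the declaration order of the `Type` enum (`OPEN_BLOCKS` is 0, `UPLOAD_BLOCK_STREAM` is 6), and the payload is treated as opaque bytes rather than a real encoded message.

```java
import java.nio.ByteBuffer;

// Hypothetical helper illustrating "type byte + payload" framing; not Spark's API.
final class TypeFraming {
  // Wire IDs following the declaration order of BlockTransferMessage.Type
  enum MsgType {
    OPEN_BLOCKS(0), UPLOAD_BLOCK(1), REGISTER_EXECUTOR(2), STREAM_HANDLE(3),
    REGISTER_DRIVER(4), HEARTBEAT(5), UPLOAD_BLOCK_STREAM(6);

    final byte id;

    MsgType(int id) {
      this.id = (byte) id;
    }

    // Map a wire byte back to a type, rejecting IDs nobody registered
    static MsgType fromId(byte id) {
      for (MsgType t : values()) {
        if (t.id == id) {
          return t;
        }
      }
      throw new IllegalArgumentException("Unknown message type: " + id);
    }
  }

  private TypeFraming() {}

  // Serialize: one type byte followed by the already-encoded message body
  static ByteBuffer frame(MsgType type, byte[] body) {
    ByteBuffer buf = ByteBuffer.allocate(1 + body.length);
    buf.put(type.id).put(body);
    buf.flip();
    return buf;
  }

  // Deserialize: consume the type byte; the remaining bytes go to that type's decoder
  static MsgType readType(ByteBuffer msg) {
    return MsgType.fromId(msg.get());
  }
}
```

Keeping the type byte outside each message body lets every message class encode only its own fields, while a single decoder dispatches on the first byte.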

### ExecutorShuffleInfo

Contains the executor configuration that the shuffle server needs to locate that executor's shuffle files on local disk.

```java { .api }
/**
 * Contains executor configuration for locating shuffle files
 */
public class ExecutorShuffleInfo implements Encodable {
  /**
   * Local directories where shuffle files are stored
   */
  public final String[] localDirs;

  /**
   * Number of subdirectories per local directory
   */
  public final int subDirsPerLocalDir;

  /**
   * Fully qualified class name of the executor's shuffle manager
   */
  public final String shuffleManager;

  /**
   * Create executor shuffle information
   * @param localDirs - Array of local directory paths for shuffle files
   * @param subDirsPerLocalDir - Number of subdirectories per local directory
   * @param shuffleManager - Class name of the shuffle manager implementation
   */
  public ExecutorShuffleInfo(String[] localDirs, int subDirsPerLocalDir, String shuffleManager);

  public boolean equals(Object other);
  public int hashCode();
  public String toString();
}
```
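To show how `localDirs` and `subDirsPerLocalDir` work together, here is a sketch of the hashed directory layout that, to our understanding, Spark's disk block manager and the external shuffle resolver share: the file name's non-negative hash picks a local directory, and the quotient picks a subdirectory named with two hex digits. `ShuffleFileLocator` and its methods are hypothetical names, and the file name in `main` is only an example.

```java
import java.io.File;

// Hypothetical sketch of hashed shuffle-file placement; names are illustrative only.
final class ShuffleFileLocator {
  private ShuffleFileLocator() {}

  // Non-negative hash; Integer.MIN_VALUE has no positive counterpart, so map it to 0
  static int nonNegativeHash(String s) {
    int h = s.hashCode();
    return h != Integer.MIN_VALUE ? Math.abs(h) : 0;
  }

  // dir = hash % localDirs.length; subDir = (hash / localDirs.length) % subDirsPerLocalDir
  static File locate(String[] localDirs, int subDirsPerLocalDir, String filename) {
    int hash = nonNegativeHash(filename);
    String localDir = localDirs[hash % localDirs.length];
    int subDirId = (hash / localDirs.length) % subDirsPerLocalDir;
    // Subdirectories are named with two lowercase hex digits, e.g. "0a"
    return new File(new File(localDir, String.format("%02x", subDirId)), filename);
  }

  public static void main(String[] args) {
    String[] dirs = {"/tmp/spark-local-1", "/tmp/spark-local-2"};
    System.out.println(locate(dirs, 64, "shuffle_1_0_0.data"));
  }
}
```

Because the mapping depends only on the file name and the two configuration values, the server can find a file without asking the executor, as long as it received the same `ExecutorShuffleInfo` at registration.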

### OpenBlocks Message

Request to read a set of blocks written by a given executor. The server replies with a `StreamHandle` for the requested blocks.

```java { .api }
/**
 * Request to read a set of blocks from the shuffle service
 */
public class OpenBlocks extends BlockTransferMessage {
  /**
   * Application ID
   */
  public final String appId;

  /**
   * ID of the executor that wrote the blocks
   */
  public final String execId;

  /**
   * Array of block IDs to open
   */
  public final String[] blockIds;

  /**
   * Create an open blocks request
   * @param appId - Application ID
   * @param execId - Executor ID
   * @param blockIds - Array of block IDs to request
   */
  public OpenBlocks(String appId, String execId, String[] blockIds);

  public boolean equals(Object other);
  public int hashCode();
  public String toString();
}
```
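Shuffle block IDs such as `shuffle_1_0_1` follow the pattern `shuffle_<shuffleId>_<mapId>_<reduceId>`. A sketch of how a server might parse and validate them before touching disk; `ShuffleBlockId` and `parse` are hypothetical names, and using `int` for all three components is an assumption.

```java
// Hypothetical parser for block IDs of the form shuffle_<shuffleId>_<mapId>_<reduceId>
final class ShuffleBlockId {
  final int shuffleId;
  final int mapId;
  final int reduceId;

  private ShuffleBlockId(int shuffleId, int mapId, int reduceId) {
    this.shuffleId = shuffleId;
    this.mapId = mapId;
    this.reduceId = reduceId;
  }

  static ShuffleBlockId parse(String blockId) {
    String[] parts = blockId.split("_");
    if (parts.length != 4 || !parts[0].equals("shuffle")) {
      throw new IllegalArgumentException("Unexpected shuffle block id format: " + blockId);
    }
    // Integer.parseInt throws NumberFormatException, a subclass of IllegalArgumentException
    return new ShuffleBlockId(
        Integer.parseInt(parts[1]), Integer.parseInt(parts[2]), Integer.parseInt(parts[3]));
  }

  @Override
  public String toString() {
    return "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId;
  }
}
```

Checking the prefix and part count up front lets the server reject a malformed request with a clear error instead of failing later on a missing file.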

### RegisterExecutor Message

Initial registration message sent by an executor to its shuffle server, describing where the executor stores its shuffle files.

```java { .api }
/**
 * Initial registration message between executor and shuffle server
 */
public class RegisterExecutor extends BlockTransferMessage {
  /**
   * Application ID
   */
  public final String appId;

  /**
   * Executor ID
   */
  public final String execId;

  /**
   * Executor shuffle configuration information
   */
  public final ExecutorShuffleInfo executorInfo;

  /**
   * Create an executor registration message
   * @param appId - Application ID
   * @param execId - Executor ID
   * @param executorInfo - Executor shuffle configuration
   */
  public RegisterExecutor(String appId, String execId, ExecutorShuffleInfo executorInfo);

  public boolean equals(Object other);
  public int hashCode();
  public String toString();
}
```
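Registration matters because a server can only resolve block requests for executors it knows about. A minimal sketch of a server-side registry keyed by `(appId, execId)`; `ExecutorRegistry`, `AppExecId`, and the simplified `ExecutorInfo` holder are hypothetical stand-ins rather than Spark classes, and the choice of `IllegalStateException` is an assumption.

```java
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical registry illustrating RegisterExecutor handling; not Spark's implementation.
final class ExecutorRegistry {
  // Composite map key; equals/hashCode make pairs with equal fields match
  static final class AppExecId {
    final String appId;
    final String execId;

    AppExecId(String appId, String execId) {
      this.appId = appId;
      this.execId = execId;
    }

    @Override
    public boolean equals(Object o) {
      if (!(o instanceof AppExecId)) {
        return false;
      }
      AppExecId other = (AppExecId) o;
      return appId.equals(other.appId) && execId.equals(other.execId);
    }

    @Override
    public int hashCode() {
      return Objects.hash(appId, execId);
    }
  }

  // Simplified stand-in for ExecutorShuffleInfo
  static final class ExecutorInfo {
    final String[] localDirs;
    final int subDirsPerLocalDir;

    ExecutorInfo(String[] localDirs, int subDirsPerLocalDir) {
      this.localDirs = localDirs;
      this.subDirsPerLocalDir = subDirsPerLocalDir;
    }
  }

  private final Map<AppExecId, ExecutorInfo> executors = new ConcurrentHashMap<>();

  // Handle a RegisterExecutor message; re-registration replaces the previous entry
  void register(String appId, String execId, ExecutorInfo info) {
    executors.put(new AppExecId(appId, execId), info);
  }

  // Look up an executor before serving OpenBlocks; unknown executors are rejected
  ExecutorInfo lookup(String appId, String execId) {
    ExecutorInfo info = executors.get(new AppExecId(appId, execId));
    if (info == null) {
      throw new IllegalStateException(
          "Executor is not registered (appId=" + appId + ", execId=" + execId + ")");
    }
    return info;
  }
}
```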

### StreamHandle Message

Identifier for a fixed number of chunks in a stream. The server returns it in response to `OpenBlocks`; each chunk holds one requested block, in request order.

```java { .api }
/**
 * Identifier for a fixed number of chunks in a stream
 */
public class StreamHandle extends BlockTransferMessage {
  /**
   * Stream identifier
   */
  public final long streamId;

  /**
   * Number of chunks in the stream
   */
  public final int numChunks;

  /**
   * Create a stream handle
   * @param streamId - Unique stream identifier
   * @param numChunks - Number of chunks in the stream
   */
  public StreamHandle(long streamId, int numChunks);

  public boolean equals(Object other);
  public int hashCode();
  public String toString();
}
```
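On the server side, answering `OpenBlocks` amounts to registering the requested blocks as a stream and returning its ID together with the chunk count. A sketch of such a stream registry; `SimpleStreamManager`, its nested `Handle`, the in-memory `byte[]` chunks, and the starting stream ID are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical in-memory stream registry; illustrates where StreamHandle values come from.
final class SimpleStreamManager {
  // Mirrors the two StreamHandle fields
  static final class Handle {
    final long streamId;
    final int numChunks;

    Handle(long streamId, int numChunks) {
      this.streamId = streamId;
      this.numChunks = numChunks;
    }
  }

  private final AtomicLong nextStreamId = new AtomicLong(1000L);
  private final Map<Long, List<byte[]>> streams = new ConcurrentHashMap<>();

  // Register one chunk per requested block, in request order
  Handle registerStream(List<byte[]> blocks) {
    long id = nextStreamId.getAndIncrement();
    streams.put(id, new ArrayList<>(blocks));
    return new Handle(id, blocks.size());
  }

  // Serve chunk i of a stream; chunk i is the i-th block of the OpenBlocks request
  byte[] getChunk(long streamId, int chunkIndex) {
    List<byte[]> chunks = streams.get(streamId);
    if (chunks == null) {
      throw new IllegalStateException("Unknown stream " + streamId);
    }
    if (chunkIndex < 0 || chunkIndex >= chunks.size()) {
      throw new IllegalArgumentException("Chunk index " + chunkIndex + " out of range");
    }
    return chunks.get(chunkIndex);
  }
}
```

Returning only a stream ID and a count keeps the reply small regardless of block size; the data itself moves in later chunk fetches.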

### UploadBlock Message

Request to upload a block, together with the metadata (such as the block's storage level) that the receiver needs to store it. The block data travels inside the message itself.

```java { .api }
/**
 * Request to upload a block with storage level
 */
public class UploadBlock extends BlockTransferMessage {
  /**
   * Application ID
   */
  public final String appId;

  /**
   * Executor ID
   */
  public final String execId;

  /**
   * Block ID
   */
  public final String blockId;

  /**
   * Serialized block metadata, such as the storage level
   */
  public final byte[] metadata;

  /**
   * Block data
   */
  public final byte[] blockData;

  /**
   * Create an upload block request
   * @param appId - Application ID
   * @param execId - Executor ID
   * @param blockId - Block ID to upload
   * @param metadata - Serialized block metadata
   * @param blockData - Block data bytes
   */
  public UploadBlock(String appId, String execId, String blockId, byte[] metadata, byte[] blockData);

  public boolean equals(Object other);
  public int hashCode();
  public String toString();
}
```
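Because `UploadBlock` carries the block data inside the message, its serialized size grows with the block. A sketch of that size arithmetic, assuming each string is encoded as a 4-byte length plus its UTF-8 bytes, each byte array as a 4-byte length plus its contents, and one extra byte for the message type; `UploadBlockSize` is a hypothetical helper.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical size calculator for an UploadBlock-style message (length-prefixed fields)
final class UploadBlockSize {
  private UploadBlockSize() {}

  // Assumed layout: 4-byte length prefix + UTF-8 bytes
  static int stringLength(String s) {
    return 4 + s.getBytes(StandardCharsets.UTF_8).length;
  }

  // Assumed layout: 4-byte length prefix + raw bytes
  static int byteArrayLength(byte[] b) {
    return 4 + b.length;
  }

  // Type byte + appId + execId + blockId + metadata + blockData
  static int serializedLength(String appId, String execId, String blockId,
                              byte[] metadata, byte[] blockData) {
    return 1 + stringLength(appId) + stringLength(execId) + stringLength(blockId)
        + byteArrayLength(metadata) + byteArrayLength(blockData);
  }

  public static void main(String[] args) {
    byte[] metadata = "block-metadata".getBytes(StandardCharsets.UTF_8);
    byte[] blockData = "actual-block-data-here".getBytes(StandardCharsets.UTF_8);
    System.out.println(serializedLength("app-001", "executor-1", "block-123", metadata, blockData));
  }
}
```

For any realistically sized block the `blockData` term dominates, which is one reason to prefer `UploadBlockStream` for large blocks.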

### UploadBlockStream Message

Request to upload a block as a stream. Unlike `UploadBlock`, this message carries only the block ID and metadata; the block data follows as a separate stream, which suits large blocks.

```java { .api }
/**
 * Request to upload a block as a stream; the block data is sent separately
 */
public class UploadBlockStream extends BlockTransferMessage {
  /**
   * Block ID
   */
  public final String blockId;

  /**
   * Serialized block metadata
   */
  public final byte[] metadata;

  /**
   * Create an upload block stream request
   * @param blockId - Block ID to upload
   * @param metadata - Serialized block metadata
   */
  public UploadBlockStream(String blockId, byte[] metadata);

  public boolean equals(Object other);
  public int hashCode();
  public String toString();
}
```
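With `UploadBlockStream`, the message acts as a small header and the block data follows as a stream. A sketch of both sides over in-memory streams: the header is written as a length-prefixed blob, then the data is copied in fixed-size chunks. `StreamUploadSketch`, the 4-byte header length prefix, and the chunk size are hypothetical and do not describe Spark's actual wire framing.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

// Hypothetical illustration of "small header, then streamed body"; not Spark's wire format.
final class StreamUploadSketch {
  private StreamUploadSketch() {}

  // Sender: write the header (e.g. an encoded UploadBlockStream), then copy data in chunks
  static void send(byte[] header, InputStream data, OutputStream out, int chunkSize)
      throws IOException {
    DataOutputStream dout = new DataOutputStream(out);
    dout.writeInt(header.length);
    dout.write(header);
    byte[] buf = new byte[chunkSize];
    int n;
    while ((n = data.read(buf)) != -1) {
      dout.write(buf, 0, n);
    }
    dout.flush();
  }

  // Receiver: read the header first, then consume the remaining bytes as block data.
  // Buffered in memory here for simplicity; a real receiver would write to disk as it goes.
  static byte[][] receive(InputStream in) throws IOException {
    DataInputStream din = new DataInputStream(in);
    byte[] header = new byte[din.readInt()];
    din.readFully(header);
    ByteArrayOutputStream body = new ByteArrayOutputStream();
    byte[] buf = new byte[8192];
    int n;
    while ((n = din.read(buf)) != -1) {
      body.write(buf, 0, n);
    }
    return new byte[][] {header, body.toByteArray()};
  }
}
```

The receiver can inspect the header (block ID and metadata) and decide where to put the data before any of the block body arrives.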

### Mesos Protocol Messages

These messages belong to the `org.apache.spark.network.shuffle.protocol.mesos` package and are used with the Mesos external shuffle service.

#### RegisterDriver Message

Message sent by a driver to register with the Mesos external shuffle service, including the heartbeat timeout after which the service can treat the application as finished.

```java { .api }
/**
 * Message for driver registration with Mesos external shuffle service
 */
public class RegisterDriver extends BlockTransferMessage {
  /**
   * Create a driver registration message
   * @param appId - Application ID
   * @param heartbeatTimeoutMs - Heartbeat timeout in milliseconds
   */
  public RegisterDriver(String appId, long heartbeatTimeoutMs);

  /**
   * Get the application ID
   * @return Application ID
   */
  public String getAppId();

  /**
   * Get the heartbeat timeout
   * @return Heartbeat timeout in milliseconds
   */
  public long getHeartbeatTimeoutMs();

  public boolean equals(Object other);
  public int hashCode();
  public String toString();
}
```

#### ShuffleServiceHeartbeat Message

Heartbeat message sent periodically from the driver to the Mesos external shuffle service to signal that the application is still running.

```java { .api }
/**
 * Heartbeat message from driver to Mesos external shuffle service
 */
public class ShuffleServiceHeartbeat extends BlockTransferMessage {
  /**
   * Create a heartbeat message
   * @param appId - Application ID
   */
  public ShuffleServiceHeartbeat(String appId);

  /**
   * Get the application ID
   * @return Application ID
   */
  public String getAppId();

  public boolean equals(Object other);
  public int hashCode();
  public String toString();
}
```
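Together, `RegisterDriver` and `ShuffleServiceHeartbeat` behave like a lease: registration records the timeout, each heartbeat renews it, and an application whose lease expires can have its shuffle files cleaned up. A sketch of that bookkeeping; `HeartbeatTracker` and its methods are hypothetical, and the current time is passed in explicitly so the logic is easy to test.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical lease-style tracker for RegisterDriver / ShuffleServiceHeartbeat handling
final class HeartbeatTracker {
  private static final class AppState {
    final long timeoutMs;
    volatile long lastHeartbeatMs;

    AppState(long timeoutMs, long nowMs) {
      this.timeoutMs = timeoutMs;
      this.lastHeartbeatMs = nowMs;
    }
  }

  private final Map<String, AppState> apps = new ConcurrentHashMap<>();

  // RegisterDriver: remember the timeout and start the lease now
  void registerDriver(String appId, long heartbeatTimeoutMs, long nowMs) {
    apps.put(appId, new AppState(heartbeatTimeoutMs, nowMs));
  }

  // ShuffleServiceHeartbeat: renew the lease of a known application
  void heartbeat(String appId, long nowMs) {
    AppState state = apps.get(appId);
    if (state != null) {
      state.lastHeartbeatMs = nowMs;
    }
  }

  // Periodic check: remove and return applications whose lease has expired
  List<String> expireStale(long nowMs) {
    List<String> expired = new ArrayList<>();
    for (Map.Entry<String, AppState> e : apps.entrySet()) {
      AppState s = e.getValue();
      if (nowMs - s.lastHeartbeatMs > s.timeoutMs) {
        expired.add(e.getKey());
      }
    }
    for (String appId : expired) {
      apps.remove(appId);  // a real service would also delete the app's shuffle files here
    }
    return expired;
  }
}
```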

**Usage Examples:**

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

import org.apache.spark.network.shuffle.protocol.*;
import org.apache.spark.network.shuffle.protocol.mesos.*;

public class ProtocolMessagesExample {
  public static void main(String[] args) {
    // Example 1: Executor registration
    String[] localDirs = {"/tmp/spark-local-1", "/tmp/spark-local-2"};
    // The shuffle service expects the fully qualified class name of a supported shuffle manager
    ExecutorShuffleInfo executorInfo = new ExecutorShuffleInfo(
        localDirs, 64, "org.apache.spark.shuffle.sort.SortShuffleManager");
    RegisterExecutor registerMsg = new RegisterExecutor("app-001", "executor-1", executorInfo);

    // Serialize for network transmission
    ByteBuffer serialized = registerMsg.toByteBuffer();
    System.out.println("Serialized registration message: " + serialized.remaining() + " bytes");

    // Deserialize the received message; the leading type byte selects the message class
    BlockTransferMessage received = BlockTransferMessage.Decoder.fromByteBuffer(serialized);
    if (received instanceof RegisterExecutor) {
      RegisterExecutor regMsg = (RegisterExecutor) received;
      System.out.println("Received registration for app: " + regMsg.appId +
          ", executor: " + regMsg.execId);
    }

    // Example 2: Block request
    String[] blockIds = {"shuffle_1_0_0", "shuffle_1_0_1", "shuffle_1_1_0"};
    OpenBlocks openBlocks = new OpenBlocks("app-001", "executor-1", blockIds);

    ByteBuffer openBlocksBuffer = openBlocks.toByteBuffer();
    System.out.println("Open blocks request size: " + openBlocksBuffer.remaining() + " bytes");

    // Example 3: Stream handle, with one chunk per requested block
    StreamHandle streamHandle = new StreamHandle(12345L, blockIds.length);
    System.out.println("Stream handle: ID=" + streamHandle.streamId +
        ", chunks=" + streamHandle.numChunks);

    // Example 4: Block upload
    byte[] metadata = "block-metadata".getBytes(StandardCharsets.UTF_8);
    byte[] blockData = "actual-block-data-here".getBytes(StandardCharsets.UTF_8);
    UploadBlock uploadBlock = new UploadBlock("app-001", "executor-1", "block-123", metadata, blockData);

    System.out.println("Upload block: " + uploadBlock.blockId +
        ", metadata size: " + uploadBlock.metadata.length +
        ", data size: " + uploadBlock.blockData.length);

    // Example 5: Mesos driver registration with a 30-second heartbeat timeout
    RegisterDriver driverReg = new RegisterDriver("app-001", 30000L);
    System.out.println("Mesos driver registration: app=" + driverReg.getAppId() +
        ", timeout=" + driverReg.getHeartbeatTimeoutMs() + "ms");

    // Example 6: Heartbeat
    ShuffleServiceHeartbeat heartbeat = new ShuffleServiceHeartbeat("app-001");
    System.out.println("Heartbeat for app: " + heartbeat.getAppId());
  }
}
```

### Protocol Message Flow

The typical message flow between shuffle clients and servers:

1. **Registration Phase**:
   - The executor sends `RegisterExecutor` with its `ExecutorShuffleInfo`
   - The server stores the executor information and acknowledges the registration

2. **Block Request Phase**:
   - The client sends `OpenBlocks` naming the executor that wrote the blocks and the block IDs it needs
   - The server registers a stream over those blocks and responds with a `StreamHandle`

3. **Data Transfer Phase**:
   - The client fetches chunks `0` through `numChunks - 1` from the stream identified by `streamId`
   - Each chunk holds one block, in the order listed in `OpenBlocks`, so many blocks share a single stream

4. **Upload Operations** (if needed):
   - The client sends `UploadBlock` (data inline) or `UploadBlockStream` (data sent as a separate stream)
   - The server stores the block as described by its metadata, such as the storage level

5. **Mesos-Specific Flow**:
   - The driver sends `RegisterDriver` with a heartbeat timeout
   - Periodic `ShuffleServiceHeartbeat` messages tell the service that the application is still alive
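The first three phases above can be sketched from the client's side as a sequence of calls against a transport. The `ShuffleTransport` interface, `FlowStreamHandle`, and `FlowClient` are hypothetical, and the sketch is synchronous for clarity where real shuffle clients are asynchronous. Registration and fetching are also folded into one client for brevity; in practice an executor registers once with its own shuffle server, while fetches name the executor that wrote the blocks.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for the StreamHandle reply
final class FlowStreamHandle {
  final long streamId;
  final int numChunks;

  FlowStreamHandle(long streamId, int numChunks) {
    this.streamId = streamId;
    this.numChunks = numChunks;
  }
}

// Hypothetical synchronous transport; real shuffle clients are asynchronous
interface ShuffleTransport {
  void registerExecutor(String appId, String execId);

  FlowStreamHandle openBlocks(String appId, String execId, String[] blockIds);

  byte[] fetchChunk(long streamId, int chunkIndex);
}

final class FlowClient {
  private FlowClient() {}

  // Registration, block request, then data transfer; chunk i is paired with blockIds[i]
  static Map<String, byte[]> fetchBlocks(ShuffleTransport transport, String appId,
                                         String execId, String[] blockIds) {
    transport.registerExecutor(appId, execId);
    FlowStreamHandle handle = transport.openBlocks(appId, execId, blockIds);
    if (handle.numChunks != blockIds.length) {
      throw new IllegalStateException(
          "Expected " + blockIds.length + " chunks, got " + handle.numChunks);
    }
    Map<String, byte[]> blocks = new LinkedHashMap<>();
    for (int i = 0; i < handle.numChunks; i++) {
      blocks.put(blockIds[i], transport.fetchChunk(handle.streamId, i));
    }
    return blocks;
  }
}
```

Checking `numChunks` against the request catches a mismatched reply before any block is attributed to the wrong ID.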
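
### Message Serialization

All protocol messages implement the `Encodable` interface and provide:

- **Encoding**: `encodedLength()` and `encode(ByteBuf)` write the message fields; `toByteBuffer()` prepends the type byte for network transmission
- **Decoding**: `BlockTransferMessage.Decoder.fromByteBuffer` reads the type byte and reconstructs the matching message object
- **Type Safety**: The leading type byte identifies the message class, so the receiver knows how to deserialize each buffer
- **Efficiency**: Fields use a compact binary layout (strings and byte arrays as a length prefix followed by the bytes) rather than general-purpose Java serialization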
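A sketch of a compact length-prefixed encoding applied to the three `OpenBlocks` fields: each string is a 4-byte length followed by its UTF-8 bytes, and a string array is a 4-byte count followed by its strings. `LengthPrefixedCodec` is a hypothetical name, it uses `java.nio.ByteBuffer` instead of Netty's `ByteBuf`, and the byte layout should be read as an assumption rather than a wire-compatible implementation.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical codec illustrating a length-prefixed field layout; not guaranteed wire-compatible
final class LengthPrefixedCodec {
  private LengthPrefixedCodec() {}

  static int stringLength(String s) {
    return 4 + s.getBytes(StandardCharsets.UTF_8).length;
  }

  static int stringArrayLength(String[] arr) {
    int total = 4;
    for (String s : arr) {
      total += stringLength(s);
    }
    return total;
  }

  static void writeString(ByteBuffer buf, String s) {
    byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
    buf.putInt(bytes.length).put(bytes);
  }

  static String readString(ByteBuffer buf) {
    byte[] bytes = new byte[buf.getInt()];
    buf.get(bytes);
    return new String(bytes, StandardCharsets.UTF_8);
  }

  // Encode appId, execId, blockIds in that order, sizing the buffer exactly up front
  static ByteBuffer encodeOpenBlocks(String appId, String execId, String[] blockIds) {
    ByteBuffer buf = ByteBuffer.allocate(
        stringLength(appId) + stringLength(execId) + stringArrayLength(blockIds));
    writeString(buf, appId);
    writeString(buf, execId);
    buf.putInt(blockIds.length);
    for (String id : blockIds) {
      writeString(buf, id);
    }
    buf.flip();
    return buf;
  }

  // Decode in the same order; returns {appId, execId, blockIds...} for brevity
  static String[] decodeOpenBlocks(ByteBuffer buf) {
    String appId = readString(buf);
    String execId = readString(buf);
    int n = buf.getInt();
    String[] result = new String[n + 2];
    result[0] = appId;
    result[1] = execId;
    for (int i = 0; i < n; i++) {
      result[i + 2] = readString(buf);
    }
    return result;
  }
}
```

Computing the encoded length first lets the encoder allocate exactly once, which is the same reason `Encodable` exposes `encodedLength()` alongside `encode`.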
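
### Error Handling

Error handling in the protocol layer is concentrated mainly in decoding:

- **Validation**: Constructors store their arguments as given, so callers should check IDs and arrays before building a message
- **Serialization Errors**: `Decoder.fromByteBuffer` throws an `IllegalArgumentException` for an unknown type byte, and a truncated or malformed buffer fails while its fields are being read
- **Version Compatibility**: Each `Type` keeps a fixed wire ID, so new message types can be added without changing existing ones; a peer that does not know a newer type rejects it as unknown
- **Corruption Detection**: The message encoding itself does not include a checksum, so corruption is typically detected only when it makes decoding fail, unless the transport layer adds its own integrity checks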
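A sketch of defensive decoding along these lines: check the type byte against the known IDs, bound-check length prefixes, and report a buffer that runs out mid-field as malformed input instead of letting a low-level buffer exception escape. `SafeDecoder` and the heartbeat-style payload (type byte plus one length-prefixed `appId`) are hypothetical; only the ID values, which follow the `Type` enum declaration order (`HEARTBEAT` is 5, `UPLOAD_BLOCK_STREAM` is 6), come from this document.

```java
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical defensive decoder; illustrates error handling, not Spark's Decoder
final class SafeDecoder {
  private SafeDecoder() {}

  static final byte HEARTBEAT = 5;
  static final byte MAX_KNOWN_TYPE = 6;

  // Decode a heartbeat-style message (type byte + length-prefixed appId) or reject it
  static String decodeHeartbeatAppId(ByteBuffer msg) {
    if (!msg.hasRemaining()) {
      throw new IllegalArgumentException("Empty message");
    }
    byte type = msg.get();
    if (type < 0 || type > MAX_KNOWN_TYPE) {
      throw new IllegalArgumentException("Unknown message type: " + type);
    }
    if (type != HEARTBEAT) {
      throw new IllegalArgumentException("Expected HEARTBEAT, got type " + type);
    }
    try {
      int len = msg.getInt();
      // A corrupted length could be negative or huge; reject it before allocating
      if (len < 0 || len > msg.remaining()) {
        throw new IllegalArgumentException("Invalid string length: " + len);
      }
      byte[] bytes = new byte[len];
      msg.get(bytes);
      return new String(bytes, StandardCharsets.UTF_8);
    } catch (BufferUnderflowException e) {
      // Truncated buffer: surface it as malformed input rather than a buffer error
      throw new IllegalArgumentException("Truncated message", e);
    }
  }
}
```

Validating a length prefix against the bytes actually remaining is cheap and guards against the most damaging effect of a corrupted length: an oversized allocation.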