0
# Protocol Messages
1
2
A structured communication protocol between shuffle clients and servers. Messages are serialized to a compact binary form using Netty `ByteBuf` for network transport.
3
4
## Capabilities
5
6
### Block Transfer Message Base
7
8
Abstract base class for all protocol messages handled by the external shuffle block handler.
9
10
```java { .api }
11
/**
12
* Base class for messages handled by ExternalShuffleBlockHandler
13
*/
14
public abstract class BlockTransferMessage implements Encodable {
15
/**
16
* Get the message type identifier
17
* @return Message type enum value
18
*/
19
protected abstract Type type();
20
21
/**
22
* Serialize message to byte buffer for network transport
23
* @return ByteBuffer containing serialized message
24
*/
25
public ByteBuffer toByteBuffer();
26
27
/**
28
* Message type enumeration
29
*/
30
public static enum Type {
31
/** Request to open blocks for reading */
32
OPEN_BLOCKS(0),
33
/** Request to upload a block */
34
UPLOAD_BLOCK(1),
35
/** Request to register an executor */
36
REGISTER_EXECUTOR(2),
37
/** Response containing stream handle */
38
STREAM_HANDLE(3),
39
/** Request to register a driver (Mesos) */
40
REGISTER_DRIVER(4);
41
42
/**
43
* Get the numeric identifier for this type
44
* @return Numeric type ID
45
*/
46
public byte id();
47
}
48
49
/**
50
* Message decoder for deserializing from byte buffers
51
*/
52
public static class Decoder {
53
/**
54
* Decode message from byte buffer
55
* @param msg - ByteBuffer containing serialized message
56
* @return Decoded BlockTransferMessage instance
57
*/
58
public static BlockTransferMessage fromByteBuffer(ByteBuffer msg);
59
}
60
}
61
```
62
63
### Executor Shuffle Information
64
65
Configuration information for locating shuffle files of a registered executor.
66
67
```java { .api }
68
/**
69
* Contains configuration for locating shuffle files of an executor
70
*/
71
public class ExecutorShuffleInfo implements Encodable {
72
/** Array of local directories where shuffle files are stored */
73
public final String[] localDirs;
74
/** Number of subdirectories per local directory */
75
public final int subDirsPerLocalDir;
76
/** Shuffle manager class name */
77
public final String shuffleManager;
78
79
/**
80
* Create executor shuffle information
81
* @param localDirs - Array of local directories for shuffle files
82
* @param subDirsPerLocalDir - Number of subdirectories per local directory
83
* @param shuffleManager - Shuffle manager class name
84
*/
85
public ExecutorShuffleInfo(String[] localDirs, int subDirsPerLocalDir, String shuffleManager);
86
87
/**
88
* Generate hash code for this configuration
89
* @return hash code
90
*/
91
public int hashCode();
92
93
/**
94
* String representation of this configuration
95
* @return string representation
96
*/
97
public String toString();
98
99
/**
100
* Check equality with another ExecutorShuffleInfo
101
* @param other - Object to compare
102
* @return true if equal
103
*/
104
public boolean equals(Object other);
105
106
/**
107
* Get encoded length of this message
108
* @return Length in bytes when encoded
109
*/
110
public int encodedLength();
111
112
/**
113
* Encode this message to byte buffer
114
* @param buf - ByteBuf to write encoded data
115
*/
116
public void encode(ByteBuf buf);
117
118
/**
119
* Decode ExecutorShuffleInfo from byte buffer
120
* @param buf - ByteBuf containing encoded data
121
* @return Decoded ExecutorShuffleInfo instance
122
*/
123
public static ExecutorShuffleInfo decode(ByteBuf buf);
124
}
125
```
126
127
**Usage Example:**
128
129
```java
130
// Create executor shuffle info
131
ExecutorShuffleInfo shuffleInfo = new ExecutorShuffleInfo(
132
new String[]{"/tmp/spark-local-1", "/tmp/spark-local-2"},
133
64,
134
"org.apache.spark.shuffle.sort.SortShuffleManager"
135
);
136
137
// Serialize for network transport
138
ByteBuf buffer = Unpooled.buffer(shuffleInfo.encodedLength());
139
shuffleInfo.encode(buffer);
140
141
// Deserialize from network
142
ExecutorShuffleInfo decoded = ExecutorShuffleInfo.decode(buffer);
143
```
144
145
### Open Blocks Request
146
147
Request message to read a set of blocks. The server responds with a `StreamHandle` that the client uses to stream the block data.
148
149
```java { .api }
150
/**
151
* Request to read a set of blocks, returns StreamHandle
152
*/
153
public class OpenBlocks extends BlockTransferMessage {
154
/** Application identifier */
155
public final String appId;
156
/** Executor identifier */
157
public final String execId;
158
/** Array of block identifiers to open */
159
public final String[] blockIds;
160
161
/**
162
* Create open blocks request
163
* @param appId - Application identifier
164
* @param execId - Executor identifier
165
* @param blockIds - Array of block identifiers to open
166
*/
167
public OpenBlocks(String appId, String execId, String[] blockIds);
168
169
/**
170
* Generate hash code for this request
171
* @return hash code
172
*/
173
public int hashCode();
174
175
/**
176
* String representation of this request
177
* @return string representation
178
*/
179
public String toString();
180
181
/**
182
* Check equality with another OpenBlocks
183
* @param other - Object to compare
184
* @return true if equal
185
*/
186
public boolean equals(Object other);
187
188
/**
189
* Get encoded length of this message
190
* @return Length in bytes when encoded
191
*/
192
public int encodedLength();
193
194
/**
195
* Encode this message to byte buffer
196
* @param buf - ByteBuf to write encoded data
197
*/
198
public void encode(ByteBuf buf);
199
200
/**
201
* Decode OpenBlocks from byte buffer
202
* @param buf - ByteBuf containing encoded data
203
* @return Decoded OpenBlocks instance
204
*/
205
public static OpenBlocks decode(ByteBuf buf);
206
}
207
```
208
209
### Register Executor Request
210
211
Initial registration message sent by an executor to the shuffle server to establish the executor's shuffle file configuration.
212
213
```java { .api }
214
/**
215
* Initial registration message between executor and shuffle server
216
*/
217
public class RegisterExecutor extends BlockTransferMessage {
218
/** Application identifier */
219
public final String appId;
220
/** Executor identifier */
221
public final String execId;
222
/** Executor configuration information */
223
public final ExecutorShuffleInfo executorInfo;
224
225
/**
226
* Create register executor request
227
* @param appId - Application identifier
228
* @param execId - Executor identifier
229
* @param executorInfo - Executor configuration information
230
*/
231
public RegisterExecutor(String appId, String execId, ExecutorShuffleInfo executorInfo);
232
233
/**
234
* Generate hash code for this request
235
* @return hash code
236
*/
237
public int hashCode();
238
239
/**
240
* String representation of this request
241
* @return string representation
242
*/
243
public String toString();
244
245
/**
246
* Check equality with another RegisterExecutor
247
* @param other - Object to compare
248
* @return true if equal
249
*/
250
public boolean equals(Object other);
251
252
/**
253
* Get encoded length of this message
254
* @return Length in bytes when encoded
255
*/
256
public int encodedLength();
257
258
/**
259
* Encode this message to byte buffer
260
* @param buf - ByteBuf to write encoded data
261
*/
262
public void encode(ByteBuf buf);
263
264
/**
265
* Decode RegisterExecutor from byte buffer
266
* @param buf - ByteBuf containing encoded data
267
* @return Decoded RegisterExecutor instance
268
*/
269
public static RegisterExecutor decode(ByteBuf buf);
270
}
271
```
272
273
### Stream Handle Response
274
275
Response message that identifies a stream and the fixed number of chunks that can be read from it.
276
277
```java { .api }
278
/**
279
* Identifier for fixed number of chunks to read from stream
280
*/
281
public class StreamHandle extends BlockTransferMessage {
282
/** Stream identifier for reading chunks */
283
public final long streamId;
284
/** Number of chunks available in the stream */
285
public final int numChunks;
286
287
/**
288
* Create stream handle response
289
* @param streamId - Stream identifier for reading chunks
290
* @param numChunks - Number of chunks available in the stream
291
*/
292
public StreamHandle(long streamId, int numChunks);
293
294
/**
295
* Generate hash code for this handle
296
* @return hash code
297
*/
298
public int hashCode();
299
300
/**
301
* String representation of this handle
302
* @return string representation
303
*/
304
public String toString();
305
306
/**
307
* Check equality with another StreamHandle
308
* @param other - Object to compare
309
* @return true if equal
310
*/
311
public boolean equals(Object other);
312
313
/**
314
* Get encoded length of this message
315
* @return Length in bytes when encoded
316
*/
317
public int encodedLength();
318
319
/**
320
* Encode this message to byte buffer
321
* @param buf - ByteBuf to write encoded data
322
*/
323
public void encode(ByteBuf buf);
324
325
/**
326
* Decode StreamHandle from byte buffer
327
* @param buf - ByteBuf containing encoded data
328
* @return Decoded StreamHandle instance
329
*/
330
public static StreamHandle decode(ByteBuf buf);
331
}
332
```
333
334
### Upload Block Request
335
336
Request message to upload a block with associated metadata and storage level information.
337
338
```java { .api }
339
/**
340
* Request to upload a block with StorageLevel
341
*/
342
public class UploadBlock extends BlockTransferMessage {
343
/** Application identifier */
344
public final String appId;
345
/** Executor identifier */
346
public final String execId;
347
/** Block identifier */
348
public final String blockId;
349
/** Block metadata as byte array */
350
public final byte[] metadata;
351
/** Block data as byte array */
352
public final byte[] blockData;
353
354
/**
355
* Create upload block request
356
* @param appId - Application identifier
357
* @param execId - Executor identifier
358
* @param blockId - Block identifier
359
* @param metadata - Block metadata as byte array
360
* @param blockData - Block data as byte array
361
*/
362
public UploadBlock(String appId, String execId, String blockId, byte[] metadata, byte[] blockData);
363
364
/**
365
* Generate hash code for this request
366
* @return hash code
367
*/
368
public int hashCode();
369
370
/**
371
* String representation of this request
372
* @return string representation
373
*/
374
public String toString();
375
376
/**
377
* Check equality with another UploadBlock
378
* @param other - Object to compare
379
* @return true if equal
380
*/
381
public boolean equals(Object other);
382
383
/**
384
* Get encoded length of this message
385
* @return Length in bytes when encoded
386
*/
387
public int encodedLength();
388
389
/**
390
* Encode this message to byte buffer
391
* @param buf - ByteBuf to write encoded data
392
*/
393
public void encode(ByteBuf buf);
394
395
/**
396
* Decode UploadBlock from byte buffer
397
* @param buf - ByteBuf containing encoded data
398
* @return Decoded UploadBlock instance
399
*/
400
public static UploadBlock decode(ByteBuf buf);
401
}
402
```
403
404
## Mesos Protocol Extension
405
406
### Register Driver Request
407
408
Special message for registering a driver with `MesosExternalShuffleService` in coarse-grained mode.
409
410
```java { .api }
411
/**
412
* Message for driver registration with MesosExternalShuffleService
413
*/
414
public class RegisterDriver extends BlockTransferMessage {
415
/**
416
* Create register driver request
417
* @param appId - Application identifier
418
*/
419
public RegisterDriver(String appId);
420
421
/**
422
* Get application identifier
423
* @return Application identifier
424
*/
425
public String getAppId();
426
427
/**
428
* Get encoded length of this message
429
* @return Length in bytes when encoded
430
*/
431
public int encodedLength();
432
433
/**
434
* Encode this message to byte buffer
435
* @param buf - ByteBuf to write encoded data
436
*/
437
public void encode(ByteBuf buf);
438
439
/**
440
* Generate hash code for this request
441
* @return hash code
442
*/
443
public int hashCode();
444
445
/**
446
* Decode RegisterDriver from byte buffer
447
* @param buf - ByteBuf containing encoded data
448
* @return Decoded RegisterDriver instance
449
*/
450
public static RegisterDriver decode(ByteBuf buf);
451
}
452
```
453
454
## Protocol Usage Examples
455
456
### Client-Server Communication Flow
457
458
```java
459
// 1. Client registers executor with server
460
ExecutorShuffleInfo shuffleInfo = new ExecutorShuffleInfo(
461
new String[]{"/tmp/spark-local"},
462
64,
463
"org.apache.spark.shuffle.sort.SortShuffleManager"
464
);
465
RegisterExecutor registerMsg = new RegisterExecutor("app-123", "executor-1", shuffleInfo);
466
467
// 2. Client requests to open blocks
468
OpenBlocks openMsg = new OpenBlocks("app-123", "executor-1",
469
new String[]{"shuffle_0_1_0", "shuffle_0_1_1"});
470
471
// 3. Server responds with stream handle
472
StreamHandle handleMsg = new StreamHandle(12345L, 2);
473
474
// 4. Client can now read chunks using the stream ID
475
```
476
477
### Message Serialization
478
479
```java
480
// Serialize any protocol message
481
BlockTransferMessage message = new OpenBlocks("app-123", "executor-1", new String[]{"shuffle_0_1_0"});
482
ByteBuffer serialized = message.toByteBuffer();
483
484
// Deserialize from byte buffer
485
BlockTransferMessage deserialized = BlockTransferMessage.Decoder.fromByteBuffer(serialized);
486
487
// Type-safe casting
488
if (deserialized instanceof OpenBlocks) {
489
OpenBlocks openBlocks = (OpenBlocks) deserialized;
490
// Process open blocks request
491
}
492
```