0
# Protocol Messages
1
2
Structured protocol messages for client-server communication, covering executor registration, block requests, and stream handles, all serialized with Netty-based encoding.
3
4
## Capabilities
5
6
### BlockTransferMessage Base Class
7
8
Abstract base class for all shuffle protocol messages with serialization support.
9
10
```java { .api }
11
/**
12
* Base class for messages handled by ExternalShuffleBlockHandler and NettyBlockTransferService.
13
* All messages implement Encodable for efficient Netty-based serialization.
14
*/
15
public abstract class BlockTransferMessage implements Encodable {
16
/**
17
* Returns the message type identifier.
18
*/
19
protected abstract Type type();
20
21
/**
22
* Serializes the message to a ByteBuffer.
23
* Includes type byte followed by message-specific data.
24
*/
25
public ByteBuffer toByteBuffer();
26
27
/**
28
* Message type enumeration for protocol identification.
29
*/
30
public enum Type {
31
OPEN_BLOCKS(0), // Request to open blocks for streaming
32
UPLOAD_BLOCK(1), // Upload block data (NettyBlockTransferService only)
33
REGISTER_EXECUTOR(2), // Register executor with shuffle service
34
STREAM_HANDLE(3), // Handle for streaming blocks
35
REGISTER_DRIVER(4), // Register Mesos driver
36
HEARTBEAT(5); // Mesos heartbeat message
37
38
public byte id();
39
}
40
41
/**
42
* Message decoder for deserializing incoming messages.
43
*/
44
public static class Decoder {
45
/**
46
* Deserializes ByteBuffer to appropriate message type.
47
* Reads type byte and delegates to specific decode method.
48
*/
49
public static BlockTransferMessage fromByteBuffer(ByteBuffer msg);
50
}
51
}
52
```
53
54
### ExecutorShuffleInfo
55
56
Configuration message containing all information needed to locate executor shuffle files.
57
58
```java { .api }
59
/**
60
* Contains all configuration necessary for locating the shuffle files of an executor.
61
* Sent during executor registration to inform shuffle service about file locations.
62
*/
63
public class ExecutorShuffleInfo implements Encodable {
64
/** The base set of local directories that the executor stores its shuffle files in. */
65
public final String[] localDirs;
66
67
/** Number of subdirectories created within each localDir. */
68
public final int subDirsPerLocalDir;
69
70
/** Shuffle manager (e.g., SortShuffleManager) that the executor is using. */
71
public final String shuffleManager;
72
73
/**
74
* Creates executor shuffle configuration.
75
*
76
* @param localDirs array of local directory paths for shuffle files
77
* @param subDirsPerLocalDir number of subdirectories per local directory
78
* @param shuffleManager fully qualified shuffle manager class name
79
*/
80
public ExecutorShuffleInfo(String[] localDirs, int subDirsPerLocalDir, String shuffleManager);
81
82
/**
83
* Encodes the shuffle info to a Netty ByteBuf.
84
*/
85
@Override
86
public void encode(ByteBuf buf);
87
88
/**
89
* Decodes shuffle info from a Netty ByteBuf.
90
*/
91
public static ExecutorShuffleInfo decode(ByteBuf buf);
92
93
/**
94
* Returns the encoded length in bytes.
95
*/
96
@Override
97
public int encodedLength();
98
99
@Override
100
public boolean equals(Object other);
101
102
@Override
103
public int hashCode();
104
105
@Override
106
public String toString();
107
}
108
```
109
110
**Usage Example:**
111
112
```java
113
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
114
115
// Create executor shuffle configuration
116
String[] localDirs = {
117
"/tmp/spark-local-dir1",
118
"/tmp/spark-local-dir2",
119
"/mnt/spark-ssd/local"
120
};
121
int subDirsPerLocalDir = 64; // Hash-based directory distribution
122
String shuffleManager = "org.apache.spark.shuffle.sort.SortShuffleManager";
123
124
ExecutorShuffleInfo info = new ExecutorShuffleInfo(localDirs, subDirsPerLocalDir, shuffleManager);
125
126
// Shuffle info is automatically encoded when sent over network
127
System.out.println("Shuffle info: " + info.toString());
128
System.out.println("Encoded size: " + info.encodedLength() + " bytes");
129
```
130
131
### OpenBlocks
132
133
Message requesting the server to open shuffle blocks for streaming.
134
135
```java { .api }
136
/**
137
* Message to request opening blocks for streaming.
138
* Server responds with StreamHandle containing stream ID and chunk count.
139
*/
140
public class OpenBlocks extends BlockTransferMessage {
141
public final String appId;
142
public final String execId;
143
public final String[] blockIds;
144
145
/**
146
* Creates an open blocks request.
147
*
148
* @param appId application identifier
149
* @param execId executor identifier
150
* @param blockIds array of block identifiers to open
151
*/
152
public OpenBlocks(String appId, String execId, String[] blockIds);
153
154
@Override
155
protected Type type();
156
157
@Override
158
public void encode(ByteBuf buf);
159
160
public static OpenBlocks decode(ByteBuf buf);
161
162
@Override
163
public int encodedLength();
164
165
@Override
166
public boolean equals(Object other);
167
168
@Override
169
public int hashCode();
170
171
@Override
172
public String toString();
173
}
174
```
175
176
### RegisterExecutor
177
178
Message for registering an executor with the shuffle service.
179
180
```java { .api }
181
/**
182
* Message for registering an executor with the external shuffle service.
183
* Contains application ID, executor ID, and shuffle configuration information.
184
*/
185
public class RegisterExecutor extends BlockTransferMessage {
186
public final String appId;
187
public final String execId;
188
public final ExecutorShuffleInfo executorInfo;
189
190
/**
191
* Creates an executor registration message.
192
*
193
* @param appId application identifier
194
* @param execId executor identifier
195
* @param executorInfo shuffle configuration for this executor
196
*/
197
public RegisterExecutor(String appId, String execId, ExecutorShuffleInfo executorInfo);
198
199
@Override
200
protected Type type();
201
202
@Override
203
public void encode(ByteBuf buf);
204
205
public static RegisterExecutor decode(ByteBuf buf);
206
207
@Override
208
public int encodedLength();
209
210
@Override
211
public boolean equals(Object other);
212
213
@Override
214
public int hashCode();
215
216
@Override
217
public String toString();
218
}
219
```
220
221
### StreamHandle
222
223
Response message containing stream information for block fetching.
224
225
```java { .api }
226
/**
227
* Handle for streaming blocks from server to client.
228
* Returned in response to OpenBlocks request.
229
*/
230
public class StreamHandle extends BlockTransferMessage {
231
public final long streamId;
232
public final int numChunks;
233
234
/**
235
* Creates a stream handle.
236
*
237
* @param streamId unique identifier for the stream
238
* @param numChunks number of chunks (blocks) in the stream
239
*/
240
public StreamHandle(long streamId, int numChunks);
241
242
@Override
243
protected Type type();
244
245
@Override
246
public void encode(ByteBuf buf);
247
248
public static StreamHandle decode(ByteBuf buf);
249
250
@Override
251
public int encodedLength();
252
253
@Override
254
public boolean equals(Object other);
255
256
@Override
257
public int hashCode();
258
259
@Override
260
public String toString();
261
}
262
```
263
264
### UploadBlock
265
266
Message for uploading block data (used by NettyBlockTransferService, not external shuffle service).
267
268
```java { .api }
269
/**
270
* Message for uploading block data to a remote executor.
271
* Used by NettyBlockTransferService, not by external shuffle service.
272
*/
273
public class UploadBlock extends BlockTransferMessage {
274
public final String appId;
275
public final String execId;
276
public final String blockId;
277
public final byte[] blockData;
278
public final String level;
279
280
/**
281
* Creates an upload block message.
282
*
283
* @param appId application identifier
284
* @param execId executor identifier
285
* @param blockId block identifier
286
* @param blockData block data bytes
287
* @param level storage level
288
*/
289
public UploadBlock(String appId, String execId, String blockId, byte[] blockData, String level);
290
291
@Override
292
protected Type type();
293
294
@Override
295
public void encode(ByteBuf buf);
296
297
public static UploadBlock decode(ByteBuf buf);
298
299
@Override
300
public int encodedLength();
301
302
@Override
303
public boolean equals(Object other);
304
305
@Override
306
public int hashCode();
307
308
@Override
309
public String toString();
310
}
311
```
312
313
## Protocol Usage Examples
314
315
### Client-Server Communication Flow
316
317
```java
318
// 1. Client registers executor with shuffle service
319
ExecutorShuffleInfo info = new ExecutorShuffleInfo(localDirs, 64, shuffleManager);
320
RegisterExecutor registerMsg = new RegisterExecutor(appId, execId, info);
321
ByteBuffer encoded = registerMsg.toByteBuffer();
322
client.sendRpcSync(encoded, timeout);
323
324
// 2. Client requests blocks for streaming
325
String[] blockIds = {"shuffle_0_1_0", "shuffle_0_1_1", "shuffle_0_2_0"};
326
OpenBlocks openMsg = new OpenBlocks(appId, execId, blockIds);
327
ByteBuffer request = openMsg.toByteBuffer();
328
ByteBuffer response = client.sendRpcSync(request, timeout);
329
330
// 3. Server responds with stream handle
331
BlockTransferMessage responseMsg = BlockTransferMessage.Decoder.fromByteBuffer(response);
332
if (responseMsg instanceof StreamHandle) {
333
StreamHandle handle = (StreamHandle) responseMsg;
334
System.out.println("Stream ID: " + handle.streamId);
335
System.out.println("Chunks: " + handle.numChunks);
336
337
// 4. Client fetches stream chunks using stream ID
338
// (handled by transport layer)
339
}
340
```
341
342
### Message Encoding and Decoding
343
344
```java
345
import org.apache.spark.network.shuffle.protocol.*;
346
import java.nio.ByteBuffer;
347
348
// Encoding messages for network transmission
349
RegisterExecutor message = new RegisterExecutor(appId, execId, executorInfo);
350
ByteBuffer encoded = message.toByteBuffer();
351
System.out.println("Encoded message size: " + encoded.remaining() + " bytes");
352
353
// Decoding messages from network
354
ByteBuffer received = ...; // placeholder: buffer received from network
355
BlockTransferMessage decoded = BlockTransferMessage.Decoder.fromByteBuffer(received);
356
357
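// NOTE: type() is declared protected, so this switch only compiles in the same package
// or in a subclass of BlockTransferMessage; elsewhere, dispatch with instanceof checks.
switch (decoded.type()) {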
358
case REGISTER_EXECUTOR:
359
RegisterExecutor regMsg = (RegisterExecutor) decoded;
360
System.out.println("Received registration for executor: " + regMsg.execId);
361
break;
362
363
case OPEN_BLOCKS:
364
OpenBlocks openMsg = (OpenBlocks) decoded;
365
System.out.println("Received request for " + openMsg.blockIds.length + " blocks");
366
break;
367
368
case STREAM_HANDLE:
369
StreamHandle handle = (StreamHandle) decoded;
370
System.out.println("Received stream handle: " + handle.streamId);
371
break;
372
373
default:
374
System.err.println("Unknown message type: " + decoded.type());
375
}
376
```
377
378
### Custom Message Processing
379
380
```java
381
/**
382
* Example message handler implementation
383
*/
384
public class ShuffleMessageProcessor {
385
386
public void processMessage(ByteBuffer messageBuffer) {
387
try {
388
BlockTransferMessage message = BlockTransferMessage.Decoder.fromByteBuffer(messageBuffer);
389
390
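// NOTE: type() is declared protected, so this switch only compiles in the same package
// or in a subclass of BlockTransferMessage; elsewhere, dispatch with instanceof checks.
switch (message.type()) {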
391
case REGISTER_EXECUTOR:
392
handleRegisterExecutor((RegisterExecutor) message);
393
break;
394
395
case OPEN_BLOCKS:
396
handleOpenBlocks((OpenBlocks) message);
397
break;
398
399
default:
400
throw new UnsupportedOperationException("Unsupported message: " + message.type());
401
}
402
} catch (IllegalArgumentException e) {
403
System.err.println("Failed to decode message: " + e.getMessage());
404
}
405
}
406
407
private void handleRegisterExecutor(RegisterExecutor message) {
408
System.out.println("Registering executor " + message.execId +
409
" with " + message.executorInfo.localDirs.length + " local directories");
410
411
// Validate executor info
412
if (message.executorInfo.localDirs.length == 0) {
413
throw new IllegalArgumentException("Executor must have at least one local directory");
414
}
415
416
// Process registration...
417
}
418
419
private void handleOpenBlocks(OpenBlocks message) {
420
System.out.println("Opening " + message.blockIds.length + " blocks for executor " + message.execId);
421
422
// Validate block IDs
423
for (String blockId : message.blockIds) {
424
if (!blockId.startsWith("shuffle_")) {
425
throw new IllegalArgumentException("Invalid block ID format: " + blockId);
426
}
427
}
428
429
// Process block opening...
430
}
431
}
432
```
433
434
## Error Handling
435
436
Encoding or decoding a protocol message can fail with the following exceptions:
437
438
- **IllegalArgumentException**: Invalid message format, unknown message type
439
- **UnsupportedOperationException**: Unsupported message types
440
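- **BufferUnderflowException** / **IndexOutOfBoundsException**: Incomplete message data (reads from a Netty `ByteBuf` report truncated input as `IndexOutOfBoundsException`)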
441
- **BufferOverflowException**: Message too large for buffer
442
443
**Error Handling Example:**
444
445
```java
446
try {
447
BlockTransferMessage message = BlockTransferMessage.Decoder.fromByteBuffer(buffer);
448
// Process message
449
} catch (IllegalArgumentException e) {
450
System.err.println("Invalid message format: " + e.getMessage());
451
} catch (Exception e) {
452
System.err.println("Message processing failed: " + e.getMessage());
453
}
454
```