# Network Protocol Messages

The protocol message classes define the wire format used between shuffle clients and servers. All messages extend `BlockTransferMessage` and can be serialized to and deserialized from byte buffers for network transmission.

## Base Protocol Class

### BlockTransferMessage

```java { .api }
public abstract class BlockTransferMessage implements Encodable {
  protected abstract Type type();

  public ByteBuffer toByteBuffer();

  public static class Decoder {
    public static BlockTransferMessage fromByteBuffer(ByteBuffer msg);
  }

  public static enum Type {
    OPEN_BLOCKS(0),
    UPLOAD_BLOCK(1),
    REGISTER_EXECUTOR(2),
    STREAM_HANDLE(3),
    REGISTER_DRIVER(4);

    public byte id();
  }
}
```

Abstract base class for all network protocol messages. It provides serialization and deserialization.

**Key Methods:**

#### toByteBuffer

Serializes the message to a byte buffer for network transmission.

**Returns:**
- `ByteBuffer`: Serialized message, prefixed with the type byte

#### fromByteBuffer (static)

Deserializes a message from a byte buffer received from the network. This static method is defined on the nested `Decoder` class.

**Parameters:**
- `msg` (ByteBuffer): Serialized message data

**Returns:**
- `BlockTransferMessage`: Deserialized message instance

**Throws:**
- `IllegalArgumentException`: If the message type is unknown
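
The type-byte prefix described above can be illustrated with a small, self-contained sketch. `FramingSketch`, `Kind`, and the string payload are hypothetical stand-ins, not Spark classes, and the length-prefixed body is an assumed layout. The sketch relies only on what this section states: the first byte identifies the message type, and an unknown type raises `IllegalArgumentException`.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical sketch of type-byte framing; none of these names are Spark APIs. */
final class FramingSketch {
  /** Illustrative message kinds, each with a one-byte wire id. */
  enum Kind {
    PING(0), PONG(1);

    private final byte id;

    Kind(int id) { this.id = (byte) id; }

    byte id() { return id; }

    /** Maps a wire id back to a kind, rejecting ids this sketch does not know. */
    static Kind fromId(byte id) {
      for (Kind k : values()) {
        if (k.id == id) return k;
      }
      throw new IllegalArgumentException("Unknown message type: " + id);
    }
  }

  /** Writes the type byte, then a length-prefixed UTF-8 payload (assumed layout). */
  static ByteBuffer encode(Kind kind, String payload) {
    byte[] body = payload.getBytes(StandardCharsets.UTF_8);
    ByteBuffer buf = ByteBuffer.allocate(1 + 4 + body.length);
    buf.put(kind.id()).putInt(body.length).put(body);
    buf.flip(); // switch the buffer from writing to reading
    return buf;
  }

  /** Reads the type byte first, then decodes the rest of the frame. */
  static String decode(ByteBuffer buf) {
    Kind kind = Kind.fromId(buf.get());
    byte[] body = new byte[buf.getInt()];
    buf.get(body);
    return kind + ":" + new String(body, StandardCharsets.UTF_8);
  }
}
```

Reading the type byte before anything else lets the decoder choose the right message class, or reject the frame, without parsing the rest of the payload.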

## Configuration Messages

### ExecutorShuffleInfo

```java { .api }
public class ExecutorShuffleInfo implements Encodable {
  public final String[] localDirs;
  public final int subDirsPerLocalDir;
  public final String shuffleManager;

  public ExecutorShuffleInfo(
    String[] localDirs,
    int subDirsPerLocalDir,
    String shuffleManager
  );

  public boolean equals(Object other);
  public int hashCode();
  public String toString();

  public int encodedLength();
  public void encode(ByteBuf buf);
  public static ExecutorShuffleInfo decode(ByteBuf buf);
}
```

Configuration data describing where an executor stores its shuffle files and how they are organized. It implements `Encodable` directly rather than extending `BlockTransferMessage`, and is carried as the `executorInfo` field of `RegisterExecutor`.

**Fields:**
- `localDirs` (String[]): Base local directories where shuffle files are stored
- `subDirsPerLocalDir` (int): Number of subdirectories created within each local directory
- `shuffleManager` (String): Fully qualified class name of the shuffle manager (e.g., "org.apache.spark.shuffle.sort.SortShuffleManager")
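
To make the `encodedLength` / `encode` / `decode` contract concrete, the sketch below encodes the three fields with `java.nio.ByteBuffer`. The layout (an int count followed by length-prefixed UTF-8 strings) is an assumption made for illustration, and `ShuffleInfoSketch` is a hypothetical stand-in. The documented class works on Netty's `ByteBuf`, and its actual wire format may differ.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical stand-in for ExecutorShuffleInfo; the wire layout is assumed. */
final class ShuffleInfoSketch {
  final String[] localDirs;
  final int subDirsPerLocalDir;
  final String shuffleManager;

  ShuffleInfoSketch(String[] localDirs, int subDirsPerLocalDir, String shuffleManager) {
    this.localDirs = localDirs;
    this.subDirsPerLocalDir = subDirsPerLocalDir;
    this.shuffleManager = shuffleManager;
  }

  /** Bytes needed for one string: a 4-byte length plus its UTF-8 bytes. */
  private static int stringLength(String s) {
    return 4 + s.getBytes(StandardCharsets.UTF_8).length;
  }

  private static void putString(ByteBuffer buf, String s) {
    byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
    buf.putInt(bytes.length).put(bytes);
  }

  private static String getString(ByteBuffer buf) {
    byte[] bytes = new byte[buf.getInt()];
    buf.get(bytes);
    return new String(bytes, StandardCharsets.UTF_8);
  }

  /** Exact number of bytes that encode() will write. */
  int encodedLength() {
    int length = 4; // directory count
    for (String dir : localDirs) length += stringLength(dir);
    return length + 4 + stringLength(shuffleManager); // subDirsPerLocalDir + manager
  }

  /** Writes the fields in declaration order. */
  void encode(ByteBuffer buf) {
    buf.putInt(localDirs.length);
    for (String dir : localDirs) putString(buf, dir);
    buf.putInt(subDirsPerLocalDir);
    putString(buf, shuffleManager);
  }

  /** Reads the fields back in the same order encode() wrote them. */
  static ShuffleInfoSketch decode(ByteBuffer buf) {
    String[] dirs = new String[buf.getInt()];
    for (int i = 0; i < dirs.length; i++) dirs[i] = getString(buf);
    int subDirs = buf.getInt();
    return new ShuffleInfoSketch(dirs, subDirs, getString(buf));
  }
}
```

Keeping `encodedLength` in step with `encode` lets a caller allocate a buffer of exactly the right size before writing.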

## Registration Messages

### RegisterExecutor

```java { .api }
public class RegisterExecutor extends BlockTransferMessage {
  public final String appId;
  public final String execId;
  public final ExecutorShuffleInfo executorInfo;

  public RegisterExecutor(
    String appId,
    String execId,
    ExecutorShuffleInfo executorInfo
  );

  protected Type type(); // Returns REGISTER_EXECUTOR

  public boolean equals(Object other);
  public int hashCode();
  public String toString();

  public int encodedLength();
  public void encode(ByteBuf buf);
  public static RegisterExecutor decode(ByteBuf buf);
}
```

Initial registration message sent from an executor to the shuffle server. It carries the information the server needs to locate the executor's shuffle files.

**Fields:**
- `appId` (String): Spark application identifier
- `execId` (String): Executor identifier within the application
- `executorInfo` (ExecutorShuffleInfo): Configuration describing shuffle file locations

### RegisterDriver (Mesos)

```java { .api }
public class RegisterDriver extends BlockTransferMessage {
  public final String appId;

  public RegisterDriver(String appId);

  protected Type type(); // Returns REGISTER_DRIVER

  public boolean equals(Object other);
  public int hashCode();
  public String toString();

  public int encodedLength();
  public void encode(ByteBuf buf);
  public static RegisterDriver decode(ByteBuf buf);
}
```

Mesos-specific message that registers the Spark driver with the shuffle service so that the service can clean up the application's shuffle data.

**Fields:**
- `appId` (String): Spark application identifier

## Block Access Messages

### OpenBlocks

```java { .api }
public class OpenBlocks extends BlockTransferMessage {
  public final String appId;
  public final String execId;
  public final String[] blockIds;

  public OpenBlocks(String appId, String execId, String[] blockIds);

  protected Type type(); // Returns OPEN_BLOCKS

  public boolean equals(Object other);
  public int hashCode();
  public String toString();

  public int encodedLength();
  public void encode(ByteBuf buf);
  public static OpenBlocks decode(ByteBuf buf);
}
```

Request to open and read a set of shuffle blocks. The server responds with a `StreamHandle` that identifies the stream to read from.

**Fields:**
- `appId` (String): Application that owns the blocks
- `execId` (String): Executor that wrote the blocks
- `blockIds` (String[]): Array of block identifiers to read

### StreamHandle

```java { .api }
public class StreamHandle extends BlockTransferMessage {
  public final long streamId;
  public final int numChunks;

  public StreamHandle(long streamId, int numChunks);

  protected Type type(); // Returns STREAM_HANDLE

  public boolean equals(Object other);
  public int hashCode();
  public String toString();

  public int encodedLength();
  public void encode(ByteBuf buf);
  public static StreamHandle decode(ByteBuf buf);
}
```

Response to an `OpenBlocks` request. It describes the stream that was opened for reading block data.

**Fields:**
- `streamId` (long): Unique identifier for the data stream
- `numChunks` (int): Number of chunks (blocks) available in the stream
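
One plausible way to consume these two fields is to request each chunk index from `0` to `numChunks - 1` on the stream identified by `streamId`. The sketch below shows that loop. The `ChunkSource` interface is a hypothetical stand-in for a transport client and is not part of the API documented here.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of consuming a stream handle; not a Spark API. */
final class StreamHandleSketch {
  /** Stand-in for a transport client that returns one chunk per call. */
  interface ChunkSource {
    byte[] fetchChunk(long streamId, int chunkIndex);
  }

  /** Fetches chunks 0..numChunks-1 in order and returns them as a list. */
  static List<byte[]> fetchAll(ChunkSource source, long streamId, int numChunks) {
    List<byte[]> chunks = new ArrayList<>(numChunks);
    for (int i = 0; i < numChunks; i++) {
      chunks.add(source.fetchChunk(streamId, i));
    }
    return chunks;
  }
}
```

A real client would more likely fetch chunks asynchronously and handle per-chunk failures; the synchronous loop only shows how the handle's fields bound the requests.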

### UploadBlock

```java { .api }
public class UploadBlock extends BlockTransferMessage {
  public final String appId;
  public final String execId;
  public final String blockId;
  public final byte[] metadata;
  public final byte[] blockData;

  public UploadBlock(
    String appId,
    String execId,
    String blockId,
    byte[] metadata,
    byte[] blockData
  );

  protected Type type(); // Returns UPLOAD_BLOCK

  public boolean equals(Object other);
  public int hashCode();
  public String toString();

  public int encodedLength();
  public void encode(ByteBuf buf);
  public static UploadBlock decode(ByteBuf buf);
}
```

Request to upload a block together with its metadata. It is used by `NettyBlockTransferService`; the external shuffle service does not typically use it.

**Fields:**
- `appId` (String): Application identifier
- `execId` (String): Executor identifier
- `blockId` (String): Block identifier
- `metadata` (byte[]): Serialized block metadata (typically StorageLevel)
- `blockData` (byte[]): Raw block data bytes

## Usage Examples

### Creating Registration Message

```java
// Create executor shuffle info
ExecutorShuffleInfo info = new ExecutorShuffleInfo(
  new String[]{"/tmp/spark-shuffle"},
  64,
  "org.apache.spark.shuffle.sort.SortShuffleManager"
);

// Create registration message
RegisterExecutor registerMsg = new RegisterExecutor("app-123", "executor-1", info);

// Serialize for network transmission
ByteBuffer serialized = registerMsg.toByteBuffer();
```

### Creating Block Request

```java
// Request multiple blocks
String[] blockIds = {"shuffle_1_0_0", "shuffle_1_0_1", "shuffle_1_0_2"};
OpenBlocks openMsg = new OpenBlocks("app-123", "executor-1", blockIds);

// Serialize message
ByteBuffer requestData = openMsg.toByteBuffer();
```

### Deserializing Messages

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Received message from network
ByteBuffer receivedData = ...; // from network

// Deserialize; the leading type byte selects the concrete message class
BlockTransferMessage message = BlockTransferMessage.Decoder.fromByteBuffer(receivedData);

// Handle based on type
if (message instanceof OpenBlocks) {
  OpenBlocks openBlocks = (OpenBlocks) message;
  System.out.println("Request for blocks: " + Arrays.toString(openBlocks.blockIds));
} else if (message instanceof RegisterExecutor) {
  RegisterExecutor register = (RegisterExecutor) message;
  System.out.println("Executor registration: " + register.execId);
} else if (message instanceof StreamHandle) {
  StreamHandle handle = (StreamHandle) message;
  System.out.println("Stream " + handle.streamId + " with " + handle.numChunks + " chunks");
}
```

### Mesos Driver Registration

```java
// Mesos-specific driver registration
RegisterDriver driverMsg = new RegisterDriver("spark-app-123");
ByteBuffer driverRegistration = driverMsg.toByteBuffer();

// Send to shuffle service for cleanup coordination
```

## Message Flow

**Normal Block Fetch Flow:**
1. Client sends `RegisterExecutor` (one-time setup)
2. Client sends `OpenBlocks` with block IDs to fetch
3. Server responds with `StreamHandle` containing stream info
4. Client reads block data from the stream using the handle
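
The four steps above can be sketched as an in-memory simulation. `FetchFlowSketch` and its methods are hypothetical and stand in for the real client and server. Two rules in it are assumptions made for illustration: an unregistered executor is rejected, and each requested block maps to exactly one chunk.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical in-memory model of the fetch flow; not Spark's implementation. */
final class FetchFlowSketch {
  private final Set<String> registered = new HashSet<>();
  private final Map<Long, String[]> streams = new HashMap<>();
  private long nextStreamId = 0;

  /** Step 1: one-time registration, keyed by application and executor. */
  void registerExecutor(String appId, String execId) {
    registered.add(appId + "/" + execId);
  }

  /** Steps 2 and 3: open the blocks and return {streamId, numChunks}. */
  long[] openBlocks(String appId, String execId, String[] blockIds) {
    if (!registered.contains(appId + "/" + execId)) {
      throw new IllegalStateException("Executor not registered: " + execId);
    }
    long streamId = nextStreamId++;
    streams.put(streamId, blockIds.clone());
    return new long[] {streamId, blockIds.length};
  }

  /** Step 4: read one chunk; this sketch just echoes the block id. */
  String readChunk(long streamId, int chunkIndex) {
    return "data-for-" + streams.get(streamId)[chunkIndex];
  }
}
```

The stream id decouples the request from the data transfer: once the handle is returned, the reader needs only the id and a chunk index.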

**Mesos Cleanup Flow:**
1. Driver sends `RegisterDriver` for cleanup coordination
2. Shuffle service tracks the driver registration
3. On application completion, the service can clean up the application's shuffle data

**Block Upload Flow (for NettyBlockTransferService):**
1. Client sends `UploadBlock` with block data and metadata
2. Server stores the block and responds with an acknowledgment

## Error Handling

Protocol-level errors are typically handled through:
- **Message validation**: `Decoder.fromByteBuffer` throws `IllegalArgumentException` when the message type is unknown
- **Serialization errors**: Encoding or decoding failures may indicate a protocol version mismatch or corrupted data
- **Network errors**: The transport layer handles connection failures and retries

```java
try {
  BlockTransferMessage msg = BlockTransferMessage.Decoder.fromByteBuffer(data);
  // Process message...
} catch (IllegalArgumentException e) {
  System.err.println("Unknown message type or corrupted data: " + e.getMessage());
  // Handle protocol error
}
```