0
# Message Protocol
1
2
The message protocol API defines how Apache Spark's networking layer communicates. It provides a type-safe message system, built on Netty, that covers RPC calls, streaming operations, and chunk fetching.
3
4
## Capabilities
5
6
### Message Interface
7
8
Base interface for all network messages in the Spark transport protocol.
9
10
```java { .api }
11
public interface Message extends Encodable {
12
/**
13
* Get the type of this message
14
* @return Type enum indicating the specific message type
15
*/
16
Type type();
17
18
/**
19
* Get the body data of this message
20
* @return ManagedBuffer containing the message payload
21
*/
22
ManagedBuffer body();
23
24
/**
25
* Check if the message body is included in the frame
26
* @return boolean indicating if body is in-frame (true) or separate (false)
27
*/
28
boolean isBodyInFrame();
29
30
/**
31
* Enumeration of all supported message types
32
*/
33
enum Type {
34
ChunkFetchRequest(0), ChunkFetchSuccess(1), ChunkFetchFailure(2),
35
RpcRequest(3), RpcResponse(4), RpcFailure(5),
36
StreamRequest(6), StreamResponse(7), StreamFailure(8),
37
OneWayMessage(9), UploadStream(10),
38
MergedBlockMetaRequest(11), MergedBlockMetaSuccess(12),
39
User(-1);
40
41
private final byte id;
42
Type(int id) { this.id = (byte) id; }
43
public byte id() { return id; }
44
}
45
}
46
```
47
48
### Encodable Interface
49
50
Interface for objects that can be encoded to ByteBuf for network transmission.
51
52
```java { .api }
53
public interface Encodable {
54
/**
55
* Get the encoded length of this object in bytes
56
* @return int representing the number of bytes needed for encoding
57
*/
58
int encodedLength();
59
60
/**
61
* Encode this object into the provided ByteBuf
62
* @param buf - ByteBuf to write the encoded data to
63
*/
64
void encode(ByteBuf buf);
65
}
66
```
67
68
### Message Categories
69
70
Messages are categorized into request and response types for structured communication patterns.
71
72
```java { .api }
73
/**
74
* Marker interface for request messages
75
*/
76
public interface RequestMessage extends Message {
77
}
78
79
/**
80
* Marker interface for response messages
81
*/
82
public interface ResponseMessage extends Message {
83
}
84
```
85
86
## RPC Messages
87
88
### RpcRequest
89
90
Message for sending remote procedure calls to the server.
91
92
```java { .api }
93
public final class RpcRequest extends AbstractMessage implements RequestMessage {
94
/**
95
* Create an RPC request message
96
* @param requestId - Unique identifier for this RPC request
97
* @param message - ManagedBuffer containing the RPC data
98
*/
99
public RpcRequest(long requestId, ManagedBuffer message);
100
101
/**
102
* Get the request identifier
103
* @return long representing the unique request ID
104
*/
105
public long requestId();
106
107
@Override
108
public Type type();
109
110
@Override
111
public ManagedBuffer body();
112
113
@Override
114
public boolean isBodyInFrame();
115
}
116
```
117
118
### RpcResponse
119
120
Message for returning successful RPC call results.
121
122
```java { .api }
123
public final class RpcResponse extends AbstractResponseMessage {
124
/**
125
* Create an RPC response message
126
* @param requestId - ID of the original RPC request
127
* @param message - ManagedBuffer containing the response data
128
*/
129
public RpcResponse(long requestId, ManagedBuffer message);
130
131
/**
132
* Get the request identifier this response corresponds to
133
* @return long representing the original request ID
134
*/
135
public long requestId();
136
137
@Override
138
public Type type();
139
140
@Override
141
public ManagedBuffer body();
142
}
143
```
144
145
### RpcFailure
146
147
Message for returning RPC call failures and error information.
148
149
```java { .api }
150
public final class RpcFailure extends AbstractResponseMessage {
151
/**
152
* Create an RPC failure message
153
* @param requestId - ID of the failed RPC request
154
* @param errorString - String describing the error that occurred
155
*/
156
public RpcFailure(long requestId, String errorString);
157
158
/**
159
* Get the request identifier this failure corresponds to
160
* @return long representing the original request ID
161
*/
162
public long requestId();
163
164
/**
165
* Get the error message describing the failure
166
* @return String containing error details
167
*/
168
public String errorString();
169
170
@Override
171
public Type type();
172
}
173
```
174
175
## Chunk Transfer Messages
176
177
### ChunkFetchRequest
178
179
Message for requesting specific chunks of data from a stream.
180
181
```java { .api }
182
public final class ChunkFetchRequest extends AbstractMessage implements RequestMessage {
183
/**
184
* Create a chunk fetch request
185
* @param streamChunkId - StreamChunkId identifying the chunk to fetch
186
*/
187
public ChunkFetchRequest(StreamChunkId streamChunkId);
188
189
/**
190
* Get the stream chunk identifier
191
* @return StreamChunkId specifying which chunk to fetch
192
*/
193
public StreamChunkId streamChunkId();
194
195
@Override
196
public Type type();
197
}
198
```
199
200
### ChunkFetchSuccess
201
202
Message for returning successfully fetched chunk data.
203
204
```java { .api }
205
public final class ChunkFetchSuccess extends AbstractResponseMessage {
206
/**
207
* Create a successful chunk fetch response
208
* @param streamChunkId - StreamChunkId of the fetched chunk
209
* @param buffer - ManagedBuffer containing the chunk data
210
*/
211
public ChunkFetchSuccess(StreamChunkId streamChunkId, ManagedBuffer buffer);
212
213
/**
214
* Get the stream chunk identifier
215
* @return StreamChunkId of the returned chunk
216
*/
217
public StreamChunkId streamChunkId();
218
219
@Override
220
public Type type();
221
222
@Override
223
public ManagedBuffer body();
224
}
225
```
226
227
### ChunkFetchFailure
228
229
Message for reporting chunk fetch failures.
230
231
```java { .api }
232
public final class ChunkFetchFailure extends AbstractResponseMessage {
233
/**
234
* Create a chunk fetch failure response
235
* @param streamChunkId - StreamChunkId of the failed chunk
236
* @param errorString - String describing the fetch error
237
*/
238
public ChunkFetchFailure(StreamChunkId streamChunkId, String errorString);
239
240
/**
241
* Get the stream chunk identifier
242
* @return StreamChunkId of the failed chunk
243
*/
244
public StreamChunkId streamChunkId();
245
246
/**
247
* Get the error message describing the failure
248
* @return String containing error details
249
*/
250
public String errorString();
251
252
@Override
253
public Type type();
254
}
255
```
256
257
### StreamChunkId
258
259
Identifier for specific chunks within streams.
260
261
```java { .api }
262
public final class StreamChunkId implements Encodable {
263
/**
264
* Create a stream chunk identifier
265
* @param streamId - Identifier of the stream
266
* @param chunkIndex - Index of the chunk within the stream
267
*/
268
public StreamChunkId(long streamId, int chunkIndex);
269
270
/**
271
* Get the stream identifier
272
* @return long representing the stream ID
273
*/
274
public long streamId();
275
276
/**
277
* Get the chunk index
278
* @return int representing the chunk index within the stream
279
*/
280
public int chunkIndex();
281
282
@Override
283
public int encodedLength();
284
285
@Override
286
public void encode(ByteBuf buf);
287
}
288
```
289
290
## Streaming Messages
291
292
### StreamRequest
293
294
Message for requesting to open a named stream.
295
296
```java { .api }
297
public final class StreamRequest extends AbstractMessage implements RequestMessage {
298
/**
299
* Create a stream request
300
* @param streamId - String identifier of the stream to open
301
*/
302
public StreamRequest(String streamId);
303
304
/**
305
* Get the stream identifier
306
* @return String representing the stream ID
307
*/
308
public String streamId();
309
310
@Override
311
public Type type();
312
}
313
```
314
315
### StreamResponse
316
317
Message for returning streaming data.
318
319
```java { .api }
320
public final class StreamResponse extends AbstractResponseMessage {
321
/**
322
* Create a stream response
323
* @param streamId - String identifier of the stream
324
* @param byteCount - Number of bytes in the stream
325
* @param body - ManagedBuffer containing the stream data
326
*/
327
public StreamResponse(String streamId, long byteCount, ManagedBuffer body);
328
329
/**
330
* Get the stream identifier
331
* @return String representing the stream ID
332
*/
333
public String streamId();
334
335
/**
336
* Get the number of bytes in the stream
337
* @return long representing the byte count
338
*/
339
public long byteCount();
340
341
@Override
342
public Type type();
343
344
@Override
345
public ManagedBuffer body();
346
}
347
```
348
349
### StreamFailure
350
351
Message for reporting stream operation failures.
352
353
```java { .api }
354
public final class StreamFailure extends AbstractResponseMessage {
355
/**
356
* Create a stream failure response
357
* @param streamId - String identifier of the failed stream
358
* @param errorString - String describing the stream error
359
*/
360
public StreamFailure(String streamId, String errorString);
361
362
/**
363
* Get the stream identifier
364
* @return String representing the failed stream ID
365
*/
366
public String streamId();
367
368
/**
369
* Get the error message
370
* @return String containing error details
371
*/
372
public String errorString();
373
374
@Override
375
public Type type();
376
}
377
```
378
379
## Upload and One-Way Messages
380
381
### UploadStream
382
383
Message for uploading stream data to the server.
384
385
```java { .api }
386
public final class UploadStream extends AbstractMessage implements RequestMessage {
387
/**
388
* Create an upload stream message
389
* @param requestId - Unique identifier for this upload request
390
* @param meta - ManagedBuffer containing metadata about the upload
391
* @param body - ManagedBuffer containing the data to upload
392
*/
393
public UploadStream(long requestId, ManagedBuffer meta, ManagedBuffer body);
394
395
/**
396
* Get the request identifier
397
* @return long representing the upload request ID
398
*/
399
public long requestId();
400
401
/**
402
* Get the metadata buffer
403
* @return ManagedBuffer containing upload metadata
404
*/
405
public ManagedBuffer meta();
406
407
@Override
408
public Type type();
409
410
@Override
411
public ManagedBuffer body();
412
}
413
```
414
415
### OneWayMessage
416
417
Message for one-way communication where no response is expected.
418
419
```java { .api }
420
public final class OneWayMessage extends AbstractMessage implements RequestMessage {
421
/**
422
* Create a one-way message
423
* @param body - ManagedBuffer containing the message data
424
*/
425
public OneWayMessage(ManagedBuffer body);
426
427
@Override
428
public Type type();
429
430
@Override
431
public ManagedBuffer body();
432
433
@Override
434
public boolean isBodyInFrame();
435
}
436
```
437
438
## Merged Block Messages
439
440
### MergedBlockMetaRequest
441
442
Message for requesting metadata about merged blocks in shuffle operations.
443
444
```java { .api }
445
public final class MergedBlockMetaRequest extends AbstractMessage implements RequestMessage {
446
/**
447
* Create a merged block metadata request
448
* @param requestId - Unique identifier for this request
449
* @param appId - Application identifier
450
* @param shuffleId - Shuffle operation identifier
451
* @param shuffleMergeId - Shuffle merge identifier
452
* @param reduceId - Reducer task identifier
453
*/
454
public MergedBlockMetaRequest(long requestId, String appId, int shuffleId, int shuffleMergeId, int reduceId);
455
456
/**
457
* Get the request identifier
458
* @return long representing the request ID
459
*/
460
public long requestId();
461
462
/**
463
* Get the application identifier
464
* @return String representing the app ID
465
*/
466
public String appId();
467
468
/**
469
* Get the shuffle identifier
470
* @return int representing the shuffle ID
471
*/
472
public int shuffleId();
473
474
/**
475
* Get the shuffle merge identifier
476
* @return int representing the shuffle merge ID
477
*/
478
public int shuffleMergeId();
479
480
/**
481
* Get the reduce task identifier
482
* @return int representing the reduce ID
483
*/
484
public int reduceId();
485
486
@Override
487
public Type type();
488
}
489
```
490
491
### MergedBlockMetaSuccess
492
493
Message for returning successful merged block metadata.
494
495
```java { .api }
496
public final class MergedBlockMetaSuccess extends AbstractResponseMessage {
497
/**
498
* Create a successful merged block metadata response
499
* @param requestId - ID of the original request
500
* @param numChunks - Number of chunks in the merged block
501
* @param buffer - ManagedBuffer containing the metadata
502
*/
503
public MergedBlockMetaSuccess(long requestId, int numChunks, ManagedBuffer buffer);
504
505
/**
506
* Get the request identifier
507
* @return long representing the original request ID
508
*/
509
public long requestId();
510
511
/**
512
* Get the number of chunks
513
* @return int representing the chunk count
514
*/
515
public int numChunks();
516
517
@Override
518
public Type type();
519
520
@Override
521
public ManagedBuffer body();
522
}
523
```
524
525
## Message Encoding and Decoding
526
527
### Encoders Utility Class
528
529
Utility class for encoding and decoding protocol messages.
530
531
```java { .api }
532
public class Encoders {
533
/**
534
* Decode a message from ByteBuf
535
* @param msgType - Message.Type identifying which kind of message to decode
536
* @param in - ByteBuf containing the encoded message
537
* @return Message instance decoded from the buffer
538
*/
539
public static Message decode(Message.Type msgType, ByteBuf in);
540
541
/**
542
* Encode a message to ByteBuf
543
* @param msg - Message to encode
544
* @param out - ByteBuf to write the encoded message to
545
*/
546
public static void encode(Message msg, ByteBuf out);
547
}
548
```
549
550
### MessageEncoder
551
552
Netty encoder for converting Message objects to ByteBuf for network transmission.
553
554
```java { .api }
555
public final class MessageEncoder extends MessageToByteEncoder<Message> {
556
public static final MessageEncoder INSTANCE = new MessageEncoder();
557
558
@Override
559
protected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception;
560
}
561
```
562
563
### MessageDecoder
564
565
Netty decoder for converting ByteBuf data into Message objects.
566
567
```java { .api }
568
public final class MessageDecoder extends LengthFieldBasedFrameDecoder {
569
public static final MessageDecoder INSTANCE = new MessageDecoder();
570
571
@Override
572
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception;
573
}
574
```
575
576
## Usage Examples
577
578
### Creating and Sending RPC Messages
579
580
```java
581
import org.apache.spark.network.protocol.*;
582
import org.apache.spark.network.buffer.NioManagedBuffer;
583
584
// Create RPC request
585
String rpcData = "{ \"method\": \"process\", \"params\": [1, 2, 3] }";
586
ManagedBuffer requestBuffer = new NioManagedBuffer(ByteBuffer.wrap(rpcData.getBytes()));
587
long requestId = System.currentTimeMillis();
588
RpcRequest request = new RpcRequest(requestId, requestBuffer);
589
590
System.out.println("Created RPC request with ID: " + request.requestId());
591
System.out.println("Message type: " + request.type());
592
System.out.println("Body in frame: " + request.isBodyInFrame());
593
594
// Create RPC response
595
String responseData = "{ \"result\": 6, \"status\": \"success\" }";
596
ManagedBuffer responseBuffer = new NioManagedBuffer(ByteBuffer.wrap(responseData.getBytes()));
597
RpcResponse response = new RpcResponse(requestId, responseBuffer);
598
599
// Create RPC failure
600
RpcFailure failure = new RpcFailure(requestId, "Method not found: process");
601
System.out.println("Failure message: " + failure.errorString());
602
```
603
604
### Working with Chunk Fetch Messages
605
606
```java
607
import org.apache.spark.network.protocol.*;
608
609
// Create stream chunk identifier
610
long streamId = 12345L;
611
int chunkIndex = 0;
612
StreamChunkId chunkId = new StreamChunkId(streamId, chunkIndex);
613
614
System.out.println("Stream ID: " + chunkId.streamId());
615
System.out.println("Chunk index: " + chunkId.chunkIndex());
616
617
// Create chunk fetch request
618
ChunkFetchRequest fetchRequest = new ChunkFetchRequest(chunkId);
619
System.out.println("Fetch request type: " + fetchRequest.type());
620
621
// Create successful chunk response
622
byte[] chunkData = "This is chunk data".getBytes();
623
ManagedBuffer chunkBuffer = new NioManagedBuffer(ByteBuffer.wrap(chunkData));
624
ChunkFetchSuccess fetchSuccess = new ChunkFetchSuccess(chunkId, chunkBuffer);
625
626
System.out.println("Success response type: " + fetchSuccess.type());
627
System.out.println("Chunk size: " + fetchSuccess.body().size());
628
629
// Create chunk fetch failure
630
ChunkFetchFailure fetchFailure = new ChunkFetchFailure(chunkId, "Chunk not found");
631
System.out.println("Failure error: " + fetchFailure.errorString());
632
```
633
634
### Streaming Message Examples
635
636
```java
637
// Create stream request
638
String streamId = "data-stream-001";
639
StreamRequest streamRequest = new StreamRequest(streamId);
640
System.out.println("Requesting stream: " + streamRequest.streamId());
641
642
// Create stream response
643
String streamData = "Line 1\nLine 2\nLine 3\n";
644
ManagedBuffer streamBuffer = new NioManagedBuffer(ByteBuffer.wrap(streamData.getBytes()));
645
StreamResponse streamResponse = new StreamResponse(streamId, streamData.length(), streamBuffer);
646
647
System.out.println("Stream response for: " + streamResponse.streamId());
648
System.out.println("Byte count: " + streamResponse.byteCount());
649
650
// Create stream failure
651
StreamFailure streamFailure = new StreamFailure(streamId, "Stream corrupted");
652
System.out.println("Stream failure: " + streamFailure.errorString());
653
```
654
655
### Upload and One-Way Messages
656
657
```java
658
// Create upload stream message
659
String metadata = "{ \"filename\": \"data.txt\", \"size\": 1024 }";
660
ManagedBuffer metaBuffer = new NioManagedBuffer(ByteBuffer.wrap(metadata.getBytes()));
661
662
String uploadData = "File content to upload";
663
ManagedBuffer dataBuffer = new NioManagedBuffer(ByteBuffer.wrap(uploadData.getBytes()));
664
665
long uploadRequestId = System.currentTimeMillis();
666
UploadStream uploadMessage = new UploadStream(uploadRequestId, metaBuffer, dataBuffer);
667
668
System.out.println("Upload request ID: " + uploadMessage.requestId());
669
System.out.println("Upload metadata size: " + uploadMessage.meta().size());
670
System.out.println("Upload data size: " + uploadMessage.body().size());
671
672
// Create one-way message
673
String oneWayData = "Fire and forget message";
674
ManagedBuffer oneWayBuffer = new NioManagedBuffer(ByteBuffer.wrap(oneWayData.getBytes()));
675
OneWayMessage oneWayMessage = new OneWayMessage(oneWayBuffer);
676
677
System.out.println("One-way message type: " + oneWayMessage.type());
678
System.out.println("One-way body in frame: " + oneWayMessage.isBodyInFrame());
679
```
680
681
### Message Encoding and Decoding
682
683
```java
684
import io.netty.buffer.ByteBuf;
685
import io.netty.buffer.Unpooled;
686
687
// Encode a message
688
RpcRequest request = new RpcRequest(123L, new NioManagedBuffer(ByteBuffer.wrap("test".getBytes())));
689
ByteBuf encodedBuffer = Unpooled.buffer();
690
691
try {
692
Encoders.encode(request, encodedBuffer);
693
System.out.println("Encoded message size: " + encodedBuffer.readableBytes());
694
695
// Decode the message back
696
encodedBuffer.resetReaderIndex();
697
byte msgTypeByte = encodedBuffer.readByte();
698
Message.Type msgType = Message.Type.values()[msgTypeByte];
699
700
Message decodedMessage = Encoders.decode(msgType, encodedBuffer);
701
System.out.println("Decoded message type: " + decodedMessage.type());
702
703
if (decodedMessage instanceof RpcRequest) {
704
RpcRequest decodedRequest = (RpcRequest) decodedMessage;
705
System.out.println("Decoded request ID: " + decodedRequest.requestId());
706
}
707
708
} finally {
709
encodedBuffer.release();
710
}
711
```
712
713
### Custom Message Handling
714
715
```java
716
// Process different message types
717
public void handleMessage(Message message) {
718
switch (message.type()) {
719
case RpcRequest:
720
RpcRequest rpcReq = (RpcRequest) message;
721
System.out.println("Handling RPC request: " + rpcReq.requestId());
722
break;
723
724
case ChunkFetchRequest:
725
ChunkFetchRequest chunkReq = (ChunkFetchRequest) message;
726
System.out.println("Handling chunk fetch for stream: " + chunkReq.streamChunkId().streamId());
727
break;
728
729
case StreamRequest:
730
StreamRequest streamReq = (StreamRequest) message;
731
System.out.println("Handling stream request: " + streamReq.streamId());
732
break;
733
734
case OneWayMessage:
735
OneWayMessage oneWay = (OneWayMessage) message;
736
System.out.println("Handling one-way message with body size: " + oneWay.body().size());
737
break;
738
739
default:
740
System.out.println("Unknown message type: " + message.type());
741
}
742
}
743
```
744
745
## Abstract Base Classes
746
747
### AbstractMessage
748
749
Base implementation for common message functionality.
750
751
```java { .api }
752
public abstract class AbstractMessage implements Message {
753
@Override
754
public boolean isBodyInFrame();
755
756
@Override
757
public int encodedLength();
758
759
@Override
760
public void encode(ByteBuf buf);
761
}
762
```
763
764
### AbstractResponseMessage
765
766
Base implementation for response messages.
767
768
```java { .api }
769
public abstract class AbstractResponseMessage extends AbstractMessage implements ResponseMessage {
770
@Override
771
public ManagedBuffer body();
772
773
@Override
774
public boolean isBodyInFrame();
775
}
776
```