0
# Message Protocol
1
2
Type-safe message protocol for network communication in Apache Spark. The protocol defines specific message types for RPC requests/responses, chunk fetching, and streaming operations, ensuring reliable and structured communication between clients and servers.
3
4
## Capabilities
5
6
### Core Message Interface
7
8
Base interface for all network messages in the Spark transport protocol.
9
10
```java { .api }
11
/**
12
* Message represents a unit of communication in the Spark transport protocol.
13
* All network messages implement this interface to provide type safety and structure.
14
*/
15
public interface Message {
16
/**
17
* Gets the message type identifier.
18
*
19
* @return The Type enum value for this message
20
*/
21
Type type();
22
23
/**
24
* Gets the message body as a ManagedBuffer.
25
*
26
* @return ManagedBuffer containing the message payload
27
*/
28
ManagedBuffer body();
29
30
/**
31
* Indicates whether the message body is included in the message frame.
32
*
33
* @return true if body is in frame, false if sent separately
34
*/
35
boolean isBodyInFrame();
36
37
/**
38
* Enumeration of all message types in the protocol.
39
*/
40
enum Type {
41
ChunkFetchRequest(0),
42
ChunkFetchSuccess(1),
43
ChunkFetchFailure(2),
44
RpcRequest(3),
45
RpcResponse(4),
46
RpcFailure(5),
47
StreamRequest(6),
48
StreamResponse(7),
49
StreamFailure(8),
50
OneWayMessage(9),
51
User(-1);
52
53
private final byte id;
54
55
Type(int id) {
56
this.id = (byte) id;
57
}
58
59
public byte id() { return id; }
60
}
61
}
62
```
63
64
### Message Category Interfaces
65
66
Marker interfaces for categorizing messages as requests or responses.
67
68
```java { .api }
69
/**
70
* Marker interface for request messages.
71
* All messages that initiate communication implement this interface.
72
*/
73
public interface RequestMessage extends Message {}
74
75
/**
76
* Marker interface for response messages.
77
* All messages that respond to requests implement this interface.
78
*/
79
public interface ResponseMessage extends Message {}
80
```
81
82
### Encoding Interface
83
84
Interface for objects that can be encoded to ByteBuf for network transmission.
85
86
```java { .api }
87
/**
88
* Interface for objects that can be encoded to ByteBuf.
89
* Used by the message protocol for efficient serialization.
90
*/
91
public interface Encodable {
92
/**
93
* Gets the encoded length of this object in bytes.
94
*
95
* @return The number of bytes needed to encode this object
96
*/
97
int encodedLength();
98
99
/**
100
* Encodes this object to the provided ByteBuf.
101
*
102
* @param buf The ByteBuf to encode to
103
*/
104
void encode(ByteBuf buf);
105
}
106
```
107
108
### RPC Messages
109
110
Messages for Remote Procedure Call operations.
111
112
```java { .api }
113
/**
114
* RPC request message containing a request ID and payload.
115
*/
116
public class RpcRequest extends AbstractMessage implements RequestMessage {
117
/** Unique identifier for this RPC request */
118
public final long requestId;
119
120
/**
121
* Creates an RPC request message.
122
*
123
* @param requestId Unique identifier for the request
124
* @param message The request payload as ManagedBuffer
125
*/
126
public RpcRequest(long requestId, ManagedBuffer message);
127
128
@Override
129
public Type type() { return Type.RpcRequest; }
130
131
@Override
132
public int encodedLength() { return 8; } // requestId is 8 bytes
133
134
@Override
135
public void encode(ByteBuf buf) {
136
buf.writeLong(requestId);
137
}
138
139
public static RpcRequest decode(ByteBuf buf) {
140
long requestId = buf.readLong();
141
return new RpcRequest(requestId, null); // Body handled separately
142
}
143
}
144
145
/**
146
* RPC response message containing the response to a previous request.
147
*/
148
public class RpcResponse extends AbstractResponseMessage {
149
/** Request ID this response corresponds to */
150
public final long requestId;
151
152
/**
153
* Creates an RPC response message.
154
*
155
* @param requestId The request ID this response corresponds to
156
* @param message The response payload as ManagedBuffer
157
*/
158
public RpcResponse(long requestId, ManagedBuffer message);
159
160
@Override
161
public Type type() { return Type.RpcResponse; }
162
}
163
164
/**
165
* RPC failure message indicating an RPC request failed.
166
*/
167
public class RpcFailure extends AbstractResponseMessage {
168
/** Request ID this failure corresponds to */
169
public final long requestId;
170
171
/** Error message describing the failure */
172
public final String errorString;
173
174
/**
175
* Creates an RPC failure message.
176
*
177
* @param requestId The request ID that failed
178
* @param errorString Description of the error
179
*/
180
public RpcFailure(long requestId, String errorString);
181
182
@Override
183
public Type type() { return Type.RpcFailure; }
184
}
185
```
186
187
### Chunk Fetch Messages
188
189
Messages for fetching individual chunks of data from streams.
190
191
```java { .api }
192
/**
193
* Request to fetch a specific chunk from a stream.
194
*/
195
public class ChunkFetchRequest extends AbstractMessage implements RequestMessage {
196
/** Identifier for the stream and chunk */
197
public final StreamChunkId streamChunkId;
198
199
/**
200
* Creates a chunk fetch request.
201
*
202
* @param streamChunkId Identifies the stream and chunk to fetch
203
*/
204
public ChunkFetchRequest(StreamChunkId streamChunkId);
205
206
@Override
207
public Type type() { return Type.ChunkFetchRequest; }
208
209
@Override
210
public int encodedLength() { return 12; } // streamId (8) + chunkIndex (4)
211
212
@Override
213
public void encode(ByteBuf buf) {
214
buf.writeLong(streamChunkId.streamId);
215
buf.writeInt(streamChunkId.chunkIndex);
216
}
217
218
public static ChunkFetchRequest decode(ByteBuf buf) {
219
long streamId = buf.readLong();
220
int chunkIndex = buf.readInt();
221
return new ChunkFetchRequest(new StreamChunkId(streamId, chunkIndex));
222
}
223
}
224
225
/**
226
* Successful response to a chunk fetch request containing the chunk data.
227
*/
228
public class ChunkFetchSuccess extends AbstractResponseMessage {
229
/** Identifier for the stream and chunk */
230
public final StreamChunkId streamChunkId;
231
232
/**
233
* Creates a successful chunk fetch response.
234
*
235
* @param streamChunkId Identifies the stream and chunk
236
* @param buffer The chunk data
237
*/
238
public ChunkFetchSuccess(StreamChunkId streamChunkId, ManagedBuffer buffer);
239
240
@Override
241
public Type type() { return Type.ChunkFetchSuccess; }
242
}
243
244
/**
245
* Failure response to a chunk fetch request indicating the fetch failed.
246
*/
247
public class ChunkFetchFailure extends AbstractResponseMessage {
248
/** Identifier for the stream and chunk that failed */
249
public final StreamChunkId streamChunkId;
250
251
/** Error message describing the failure */
252
public final String errorString;
253
254
/**
255
* Creates a chunk fetch failure response.
256
*
257
* @param streamChunkId Identifies the stream and chunk that failed
258
* @param errorString Description of the error
259
*/
260
public ChunkFetchFailure(StreamChunkId streamChunkId, String errorString);
261
262
@Override
263
public Type type() { return Type.ChunkFetchFailure; }
264
}
265
```
266
267
### Stream Messages
268
269
Messages for stream-based data transfer operations.
270
271
```java { .api }
272
/**
273
* Request to open a stream for data transfer.
274
*/
275
public class StreamRequest extends AbstractMessage implements RequestMessage {
276
/** String identifier for the stream to open */
277
public final String streamId;
278
279
/**
280
* Creates a stream request.
281
*
282
* @param streamId String identifier for the stream
283
*/
284
public StreamRequest(String streamId);
285
286
@Override
287
public Type type() { return Type.StreamRequest; }
288
}
289
290
/**
291
* Successful response to a stream request indicating the stream is ready.
292
*/
293
public class StreamResponse extends AbstractResponseMessage {
294
/** String identifier for the opened stream */
295
public final String streamId;
296
297
/** Total number of bytes in the stream */
298
public final long byteCount;
299
300
/**
301
* Creates a stream response.
302
*
303
* @param streamId String identifier for the stream
304
* @param byteCount Total number of bytes in the stream
305
*/
306
public StreamResponse(String streamId, long byteCount);
307
308
@Override
309
public Type type() { return Type.StreamResponse; }
310
}
311
312
/**
313
* Failure response to a stream request indicating the stream could not be opened.
314
*/
315
public class StreamFailure extends AbstractResponseMessage {
316
/** String identifier for the stream that failed */
317
public final String streamId;
318
319
/** Error message describing the failure */
320
public final String errorString;
321
322
/**
323
* Creates a stream failure response.
324
*
325
* @param streamId String identifier for the stream that failed
326
* @param errorString Description of the error
327
*/
328
public StreamFailure(String streamId, String errorString);
329
330
@Override
331
public Type type() { return Type.StreamFailure; }
332
}
333
```
334
335
### One-Way Messages
336
337
Messages that don't expect a response.
338
339
```java { .api }
340
/**
341
* One-way message that doesn't expect a response.
342
* Used for notifications, heartbeats, or fire-and-forget operations.
343
*/
344
public class OneWayMessage extends AbstractMessage implements RequestMessage {
345
/**
346
* Creates a one-way message.
347
*
348
* @param body The message payload as ManagedBuffer
349
*/
350
public OneWayMessage(ManagedBuffer body);
351
352
@Override
353
public Type type() { return Type.OneWayMessage; }
354
355
@Override
356
public boolean isBodyInFrame() { return false; }
357
}
358
```
359
360
### Stream and Chunk Identifiers
361
362
Utility classes for identifying streams and chunks.
363
364
```java { .api }
365
/**
366
* Identifier for a specific chunk within a stream.
367
* Combines stream ID and chunk index for unique chunk identification.
368
*/
369
public class StreamChunkId {
370
/** Numeric identifier for the stream */
371
public final long streamId;
372
373
/** Index of the chunk within the stream (0-based) */
374
public final int chunkIndex;
375
376
/**
377
* Creates a stream chunk identifier.
378
*
379
* @param streamId Numeric identifier for the stream
380
* @param chunkIndex Index of the chunk within the stream
381
*/
382
public StreamChunkId(long streamId, int chunkIndex);
383
384
@Override
385
public String toString() {
386
return "StreamChunkId{streamId=" + streamId + ", chunkIndex=" + chunkIndex + "}";
387
}
388
389
@Override
390
public boolean equals(Object other) {
391
if (this == other) return true;
392
if (other == null || getClass() != other.getClass()) return false;
393
394
StreamChunkId that = (StreamChunkId) other;
395
return streamId == that.streamId && chunkIndex == that.chunkIndex;
396
}
397
398
@Override
399
public int hashCode() {
400
return Objects.hash(streamId, chunkIndex);
401
}
402
}
403
```
404
405
### Message Encoding and Decoding
406
407
Utilities for message serialization and deserialization.
408
409
```java { .api }
410
/**
411
* Utility class providing encoding and decoding functions for protocol messages.
412
*/
413
public class Encoders {
414
/**
415
* Decodes a message from a ByteBuf based on message type.
416
*
417
* @param msgType The message type to decode
418
* @param in ByteBuf containing the encoded message
419
* @return Decoded Message instance
420
*/
421
public static Message decode(Message.Type msgType, ByteBuf in);
422
423
/**
424
* Encodes a message to a ByteBuf.
425
*
426
* @param msg The message to encode
427
* @param out ByteBuf to write the encoded message to
428
*/
429
public static void encode(Message msg, ByteBuf out);
430
}
431
432
/**
433
* Netty decoder for converting ByteBuf to Message objects.
434
*/
435
public class MessageDecoder extends MessageToMessageDecoder<ByteBuf> {
436
@Override
437
protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out);
438
}
439
440
/**
441
* Netty encoder for converting Message objects to ByteBuf.
442
*/
443
public class MessageEncoder extends MessageToByteEncoder<Message> {
444
@Override
445
protected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out);
446
}
447
```
448
449
## Usage Examples
450
451
### Creating and Sending Messages
452
453
```java
454
import org.apache.spark.network.protocol.*;
455
import org.apache.spark.network.buffer.NioManagedBuffer;
456
import java.nio.ByteBuffer;
457
458
// Create an RPC request
459
ByteBuffer requestData = ByteBuffer.wrap("Hello Server".getBytes());
460
ManagedBuffer requestBuffer = new NioManagedBuffer(requestData);
461
RpcRequest rpcRequest = new RpcRequest(12345L, requestBuffer);
462
463
// Create a chunk fetch request
464
StreamChunkId chunkId = new StreamChunkId(1001L, 5);
465
ChunkFetchRequest chunkRequest = new ChunkFetchRequest(chunkId);
466
467
// Create a stream request
468
StreamRequest streamRequest = new StreamRequest("data-stream-001");
469
470
// Create a one-way message
471
ByteBuffer notificationData = ByteBuffer.wrap("Status update".getBytes());
472
ManagedBuffer notificationBuffer = new NioManagedBuffer(notificationData);
473
OneWayMessage notification = new OneWayMessage(notificationBuffer);
474
```
475
476
### Handling Responses
477
478
```java
479
// Handle different response types
480
public void handleResponse(Message response) {
481
switch (response.type()) {
482
case RpcResponse:
483
RpcResponse rpcResp = (RpcResponse) response;
484
System.out.println("RPC response for request: " + rpcResp.requestId);
485
processRpcResponse(rpcResp.body());
486
break;
487
488
case RpcFailure:
489
RpcFailure rpcFail = (RpcFailure) response;
490
System.err.println("RPC " + rpcFail.requestId + " failed: " + rpcFail.errorString);
491
break;
492
493
case ChunkFetchSuccess:
494
ChunkFetchSuccess chunkSuccess = (ChunkFetchSuccess) response;
495
System.out.println("Received chunk: " + chunkSuccess.streamChunkId);
496
processChunkData(chunkSuccess.body());
497
break;
498
499
case ChunkFetchFailure:
500
ChunkFetchFailure chunkFail = (ChunkFetchFailure) response;
501
System.err.println("Chunk fetch failed: " + chunkFail.streamChunkId +
502
" - " + chunkFail.errorString);
503
break;
504
505
case StreamResponse:
506
StreamResponse streamResp = (StreamResponse) response;
507
System.out.println("Stream " + streamResp.streamId +
508
" opened with " + streamResp.byteCount + " bytes");
509
break;
510
511
case StreamFailure:
512
StreamFailure streamFail = (StreamFailure) response;
513
System.err.println("Stream " + streamFail.streamId +
514
" failed: " + streamFail.errorString);
515
break;
516
517
default:
518
System.err.println("Unknown response type: " + response.type());
519
}
520
}
521
```
522
523
### Custom Message Processing
524
525
```java
526
public class MessageProcessor {
527
public void processMessage(Message message) {
528
// Check if message has body
529
if (message.body() != null && message.body().size() > 0) {
530
try {
531
ByteBuffer bodyData = message.body().nioByteBuffer();
532
533
// Process based on message type
534
if (message instanceof RpcRequest) {
535
processRpcRequest((RpcRequest) message, bodyData);
536
} else if (message instanceof ChunkFetchRequest) {
537
processChunkRequest((ChunkFetchRequest) message);
538
} else if (message instanceof StreamRequest) {
539
processStreamRequest((StreamRequest) message);
540
} else if (message instanceof OneWayMessage) {
541
processOneWayMessage(bodyData);
542
}
543
544
} catch (Exception e) {
545
System.err.println("Error processing message: " + e.getMessage());
546
} finally {
547
// Important: Release buffer when done
548
if (message.body() != null) {
549
message.body().release();
550
}
551
}
552
}
553
}
554
555
private void processRpcRequest(RpcRequest request, ByteBuffer body) {
556
System.out.println("Processing RPC request " + request.requestId);
557
558
// Extract request data
559
byte[] requestBytes = new byte[body.remaining()];
560
body.get(requestBytes);
561
String requestString = new String(requestBytes);
562
563
// Process request and generate response
564
// (Response would be sent via callback in actual implementation)
565
}
566
567
private void processChunkRequest(ChunkFetchRequest request) {
568
System.out.println("Processing chunk request: " + request.streamChunkId);
569
570
// Look up chunk data and prepare response
571
// (Would use StreamManager in actual implementation)
572
}
573
574
private void processStreamRequest(StreamRequest request) {
575
System.out.println("Processing stream request: " + request.streamId);
576
577
// Open stream and prepare response
578
// (Would use StreamManager in actual implementation)
579
}
580
581
private void processOneWayMessage(ByteBuffer body) {
582
byte[] messageBytes = new byte[body.remaining()];
583
body.get(messageBytes);
584
String message = new String(messageBytes);
585
586
System.out.println("Received one-way message: " + message);
587
// Process notification without sending response
588
}
589
}
590
```
591
592
### Message Validation
593
594
```java
595
public class MessageValidator {
596
public static boolean validateMessage(Message message) {
597
if (message == null) {
598
return false;
599
}
600
601
// Validate message type
602
if (message.type() == null) {
603
return false;
604
}
605
606
// Type-specific validation
607
switch (message.type()) {
608
case RpcRequest:
609
return validateRpcRequest((RpcRequest) message);
610
case ChunkFetchRequest:
611
return validateChunkFetchRequest((ChunkFetchRequest) message);
612
case StreamRequest:
613
return validateStreamRequest((StreamRequest) message);
614
default:
615
return true; // Other types are valid by default
616
}
617
}
618
619
private static boolean validateRpcRequest(RpcRequest request) {
620
// Validate request ID is positive
621
if (request.requestId <= 0) {
622
return false;
623
}
624
625
// Validate body exists and has reasonable size
626
if (request.body() == null || request.body().size() > MAX_RPC_SIZE) {
627
return false;
628
}
629
630
return true;
631
}
632
633
private static boolean validateChunkFetchRequest(ChunkFetchRequest request) {
634
if (request.streamChunkId == null) {
635
return false;
636
}
637
638
// Validate stream ID and chunk index
639
return request.streamChunkId.streamId > 0 &&
640
request.streamChunkId.chunkIndex >= 0;
641
}
642
643
private static boolean validateStreamRequest(StreamRequest request) {
644
// Validate stream ID is not null or empty
645
return request.streamId != null && !request.streamId.trim().isEmpty();
646
}
647
648
private static final long MAX_RPC_SIZE = 16 * 1024 * 1024; // 16MB
649
}
650
```