0
# Protocol Handling
1
2
A complete binary message protocol, with encoders and decoders, covering RPC, streaming, and one-way messages for type-safe and efficient network communication.
3
4
## Capabilities
5
6
### Core Protocol Interfaces
7
8
Base interfaces defining the protocol structure with encoding capabilities and message type markers.
9
10
```java { .api }
11
/**
12
* Base interface for all protocol messages, extends Encodable for serialization
13
*/
14
public interface Message extends Encodable {
15
}
16
17
/**
18
* Marker interface for request messages
19
*/
20
public interface RequestMessage extends Message {
21
}
22
23
/**
24
* Marker interface for response messages
25
*/
26
public interface ResponseMessage extends Message {
27
}
28
29
/**
30
* Interface for objects that can be encoded to ByteBuf for network transmission
31
*/
32
public interface Encodable {
33
/**
34
* Number of bytes occupied by the encoded form of this object
35
* @return Encoded length in bytes
36
*/
37
int encodedLength();
38
39
/**
40
* Serializes this object by writing to the provided ByteBuf
41
* @param buf ByteBuf to write encoded data to
42
*/
43
void encode(ByteBuf buf);
44
}
45
```
46
47
### Message Base Classes
48
49
Abstract base classes providing common functionality for message implementations.
50
51
```java { .api }
52
/**
53
* Base implementation for all message types providing common functionality
54
*/
55
public abstract class AbstractMessage implements Message {
56
/**
57
* Get the message type identifier
58
* @return Message.Type enum value
59
*/
60
public abstract Message.Type type();
61
62
/**
63
* Get the message body if present
64
* @return ManagedBuffer containing the message body, or null if the message has no body
65
*/
66
public abstract ManagedBuffer body();
67
68
/**
69
* Whether the body of this message is included in the same frame as the message
70
* @return true if the body is sent in the same frame as the message, false otherwise
71
*/
72
public abstract boolean isBodyInFrame();
73
}
74
75
/**
76
* Base class for response messages providing response-specific functionality
77
*/
78
public abstract class AbstractResponseMessage extends AbstractMessage implements ResponseMessage {
79
}
80
```
81
82
### Request Messages
83
84
Protocol messages for initiating operations, including RPC calls, stream requests, and data uploads.
85
86
```java { .api }
87
/**
88
* Request to fetch a specific chunk from a stream
89
*/
90
public final class ChunkFetchRequest extends AbstractMessage implements RequestMessage {
91
/**
92
* Create a chunk fetch request
93
* @param streamChunkId Identifier for the specific chunk to fetch
94
*/
95
public ChunkFetchRequest(StreamChunkId streamChunkId);
96
97
/**
98
* Get the stream chunk identifier
99
* @return StreamChunkId for the requested chunk
100
*/
101
public StreamChunkId streamChunkId();
102
}
103
104
/**
105
* RPC request with message payload
106
*/
107
public final class RpcRequest extends AbstractMessage implements RequestMessage {
108
/**
109
* Create an RPC request
110
* @param requestId Unique identifier for this request
111
* @param message Message payload as ManagedBuffer
112
*/
113
public RpcRequest(long requestId, ManagedBuffer message);
114
115
/**
116
* Get the request identifier
117
* @return Request ID
118
*/
119
public long requestId();
120
121
/**
122
* Get the message payload
123
* @return ManagedBuffer containing the message data
124
*/
125
public ManagedBuffer message();
126
}
127
128
/**
129
* Request to stream data with given stream ID
130
*/
131
public final class StreamRequest extends AbstractMessage implements RequestMessage {
132
/**
133
* Create a stream request
134
* @param streamId Identifier for the stream to request
135
*/
136
public StreamRequest(String streamId);
137
138
/**
139
* Get the stream identifier
140
* @return Stream ID string
141
*/
142
public String streamId();
143
}
144
145
/**
146
* One-way message that expects no response
147
*/
148
public final class OneWayMessage extends AbstractMessage implements RequestMessage {
149
/**
150
* Create a one-way message
151
* @param body Message body as ManagedBuffer
152
*/
153
public OneWayMessage(ManagedBuffer body);
154
155
/**
156
* Get the message body
157
* @return ManagedBuffer containing message data
158
*/
159
public ManagedBuffer body();
160
}
161
162
/**
163
* Request to upload streaming data with metadata
164
*/
165
public final class UploadStream extends AbstractMessage implements RequestMessage {
166
/**
167
* Create an upload stream request
168
* @param requestId Unique identifier for this upload request
169
* @param meta Metadata for the upload as ManagedBuffer
170
* @param data Data to upload as ManagedBuffer
171
*/
172
public UploadStream(long requestId, ManagedBuffer meta, ManagedBuffer data);
173
174
/**
175
* Get the request identifier
176
* @return Request ID
177
*/
178
public long requestId();
179
180
/**
181
* Get the upload metadata
182
* @return ManagedBuffer containing metadata
183
*/
184
public ManagedBuffer meta();
185
186
/**
187
* Get the data to upload
188
* @return ManagedBuffer containing data
189
*/
190
public ManagedBuffer data();
191
}
192
```
193
194
**Usage Examples:**
195
196
```java
197
// RPC request
198
long requestId = System.currentTimeMillis();
199
ManagedBuffer payload = new NioManagedBuffer(ByteBuffer.wrap(requestData));
200
RpcRequest rpcReq = new RpcRequest(requestId, payload);
201
202
// Stream request
203
StreamRequest streamReq = new StreamRequest("my-stream-123");
204
205
// Chunk fetch request
206
StreamChunkId chunkId = new StreamChunkId(streamId, chunkIndex);
207
ChunkFetchRequest chunkReq = new ChunkFetchRequest(chunkId);
208
209
// One-way message
210
ManagedBuffer notificationData = new NioManagedBuffer(ByteBuffer.wrap(statusUpdate));
211
OneWayMessage oneWay = new OneWayMessage(notificationData);
212
213
// Upload stream
214
ManagedBuffer metadata = new NioManagedBuffer(ByteBuffer.wrap(metaBytes));
215
ManagedBuffer data = new FileSegmentManagedBuffer(conf, file, 0, file.length());
216
UploadStream upload = new UploadStream(requestId, metadata, data);
217
```
218
219
### Response Messages
220
221
Protocol messages for responding to requests, including successful responses and error conditions.
222
223
```java { .api }
224
/**
225
* Successful response to chunk fetch with data
226
*/
227
public final class ChunkFetchSuccess extends AbstractResponseMessage {
228
/**
229
* Create a successful chunk fetch response
230
* @param streamChunkId Identifier for the fetched chunk
231
* @param buffer Buffer containing the chunk data
232
*/
233
public ChunkFetchSuccess(StreamChunkId streamChunkId, ManagedBuffer buffer);
234
235
/**
236
* Get the stream chunk identifier
237
* @return StreamChunkId for the fetched chunk
238
*/
239
public StreamChunkId streamChunkId();
240
241
/**
242
* Get the chunk data
243
* @return ManagedBuffer containing chunk data
244
*/
245
public ManagedBuffer buffer();
246
}
247
248
/**
249
* Failed chunk fetch response with error message
250
*/
251
public final class ChunkFetchFailure extends AbstractResponseMessage {
252
/**
253
* Create a failed chunk fetch response
254
* @param streamChunkId Identifier for the chunk that failed
255
* @param errorString Error message describing the failure
256
*/
257
public ChunkFetchFailure(StreamChunkId streamChunkId, String errorString);
258
259
/**
260
* Get the stream chunk identifier
261
* @return StreamChunkId for the failed chunk
262
*/
263
public StreamChunkId streamChunkId();
264
265
/**
266
* Get the error message
267
* @return Error description string
268
*/
269
public String errorString();
270
}
271
272
/**
273
* RPC response with result data
274
*/
275
public final class RpcResponse extends AbstractResponseMessage {
276
/**
277
* Create an RPC response
278
* @param requestId Request ID this response corresponds to
279
* @param message Response payload as ManagedBuffer
280
*/
281
public RpcResponse(long requestId, ManagedBuffer message);
282
283
/**
284
* Get the request identifier this response is for
285
* @return Request ID
286
*/
287
public long requestId();
288
289
/**
290
* Get the response payload
291
* @return ManagedBuffer containing response data
292
*/
293
public ManagedBuffer message();
294
}
295
296
/**
297
* RPC failure response with error message
298
*/
299
public final class RpcFailure extends AbstractResponseMessage {
300
/**
301
* Create an RPC failure response
302
* @param requestId Request ID this failure corresponds to
303
* @param errorString Error message describing the failure
304
*/
305
public RpcFailure(long requestId, String errorString);
306
307
/**
308
* Get the request identifier this failure is for
309
* @return Request ID
310
*/
311
public long requestId();
312
313
/**
314
* Get the error message
315
* @return Error description string
316
*/
317
public String errorString();
318
}
319
320
/**
321
* Response to stream request with stream metadata
322
*/
323
public final class StreamResponse extends AbstractResponseMessage {
324
/**
325
* Create a stream response
326
* @param streamId Stream identifier
327
* @param byteCount Total number of bytes in the stream
328
* @param buffer Initial stream data buffer
329
*/
330
public StreamResponse(String streamId, long byteCount, ManagedBuffer buffer);
331
332
/**
333
* Get the stream identifier
334
* @return Stream ID string
335
*/
336
public String streamId();
337
338
/**
339
* Get the total byte count for the stream
340
* @return Total bytes in stream
341
*/
342
public long byteCount();
343
344
/**
345
* Get the initial stream data
346
* @return ManagedBuffer containing initial data
347
*/
348
public ManagedBuffer buffer();
349
}
350
351
/**
352
* Stream failure response with error message
353
*/
354
public final class StreamFailure extends AbstractResponseMessage {
355
/**
356
* Create a stream failure response
357
* @param streamId Stream identifier that failed
358
* @param errorString Error message describing the failure
359
*/
360
public StreamFailure(String streamId, String errorString);
361
362
/**
363
* Get the stream identifier
364
* @return Stream ID string
365
*/
366
public String streamId();
367
368
/**
369
* Get the error message
370
* @return Error description string
371
*/
372
public String errorString();
373
}
374
```
375
376
**Usage Examples:**
377
378
```java
379
// Successful RPC response
380
ManagedBuffer responseData = new NioManagedBuffer(ByteBuffer.wrap(resultBytes));
381
RpcResponse rpcResp = new RpcResponse(originalRequestId, responseData);
382
383
// RPC failure
384
RpcFailure rpcFail = new RpcFailure(originalRequestId, "Processing failed: invalid input");
385
386
// Successful chunk fetch
387
ManagedBuffer chunkData = new FileSegmentManagedBuffer(conf, file, offset, length);
388
ChunkFetchSuccess chunkSuccess = new ChunkFetchSuccess(streamChunkId, chunkData);
389
390
// Chunk fetch failure
391
ChunkFetchFailure chunkFail = new ChunkFetchFailure(streamChunkId, "Chunk not found");
392
393
// Stream response
394
ManagedBuffer streamData = // ... initial stream data
395
StreamResponse streamResp = new StreamResponse("stream-456", totalBytes, streamData);
396
397
// Stream failure
398
StreamFailure streamFail = new StreamFailure("stream-456", "Stream source unavailable");
399
```
400
401
### Encoding and Decoding
402
403
Netty-based encoder and decoder for efficient message serialization and deserialization with frame-based transport.
404
405
```java { .api }
406
/**
407
* Netty encoder for Message objects, converts messages to ByteBuf for transmission
408
*/
409
public final class MessageEncoder extends MessageToByteEncoder<Message> {
410
/**
411
* Singleton instance for reuse
412
*/
413
public static final MessageEncoder INSTANCE = new MessageEncoder();
414
415
/**
416
* Encode a message to ByteBuf for transmission
417
* @param ctx Channel handler context
418
* @param msg Message to encode
419
* @param out Output ByteBuf to write encoded data
420
* @throws Exception if encoding fails
421
*/
422
protected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception;
423
}
424
425
/**
426
* Netty decoder for Message objects, converts ByteBuf frames to Message instances
427
*/
428
public final class MessageDecoder extends LengthFieldBasedFrameDecoder {
429
/**
430
* Singleton instance for reuse
431
*/
432
public static final MessageDecoder INSTANCE = new MessageDecoder();
433
434
/**
435
* Decode a ByteBuf frame to Message object
436
* @param ctx Channel handler context
437
* @param in Input ByteBuf containing encoded message
438
* @return Decoded Message object
439
* @throws Exception if decoding fails
440
*/
441
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception;
442
}
443
```
444
445
**Usage Examples:**
446
447
```java
448
// Adding to Netty pipeline
449
ChannelPipeline pipeline = channel.pipeline();
450
pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
451
pipeline.addLast("messageDecoder", MessageDecoder.INSTANCE);
452
pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));
453
pipeline.addLast("messageEncoder", MessageEncoder.INSTANCE);
454
455
// Messages are automatically encoded/decoded by the pipeline
456
```
457
458
### Encoding Utilities
459
460
Utility classes for encoding common data types with consistent byte representation.
461
462
```java { .api }
463
/**
464
* Utility classes for encoding common data types
465
*/
466
public class Encoders {
467
468
/**
469
* String encoding utilities
470
*/
471
public static class Strings {
472
/**
473
* Encode string length and content to ByteBuf
474
* @param buf ByteBuf to write to
475
* @param s String to encode
476
*/
477
public static void encode(ByteBuf buf, String s);
478
479
/**
480
* Decode string from ByteBuf
481
* @param buf ByteBuf to read from
482
* @return Decoded string
483
*/
484
public static String decode(ByteBuf buf);
485
486
/**
487
* Calculate encoded length of string
488
* @param s String to measure
489
* @return Encoded length in bytes
490
*/
491
public static int encodedLength(String s);
492
}
493
494
/**
495
* Byte array encoding utilities
496
*/
497
public static class ByteArrays {
498
/**
499
* Encode byte array length and content to ByteBuf
500
* @param buf ByteBuf to write to
501
* @param arr Byte array to encode
502
*/
503
public static void encode(ByteBuf buf, byte[] arr);
504
505
/**
506
* Decode byte array from ByteBuf
507
* @param buf ByteBuf to read from
508
* @return Decoded byte array
509
*/
510
public static byte[] decode(ByteBuf buf);
511
512
/**
513
* Calculate encoded length of byte array
514
* @param arr Byte array to measure
515
* @return Encoded length in bytes
516
*/
517
public static int encodedLength(byte[] arr);
518
}
519
520
/**
521
* String array encoding utilities
522
*/
523
public static class StringArrays {
524
/**
525
* Encode string array to ByteBuf
526
* @param buf ByteBuf to write to
527
* @param strings String array to encode
528
*/
529
public static void encode(ByteBuf buf, String[] strings);
530
531
/**
532
* Decode string array from ByteBuf
533
* @param buf ByteBuf to read from
534
* @return Decoded string array
535
*/
536
public static String[] decode(ByteBuf buf);
537
538
/**
539
* Calculate encoded length of string array
540
* @param strings String array to measure
541
* @return Encoded length in bytes
542
*/
543
public static int encodedLength(String[] strings);
544
}
545
}
546
```
547
548
### Stream Chunk Identifier
549
550
Utility for identifying specific chunks within streams for efficient chunk-based data transfer.
551
552
```java { .api }
553
/**
554
* Identifies a specific chunk within a stream for chunk-based data transfer
555
*/
556
public final class StreamChunkId implements Encodable {
557
/**
558
* Create a stream chunk identifier
559
* @param streamId Numeric stream identifier
560
* @param chunkIndex Index of the chunk within the stream
561
*/
562
public StreamChunkId(long streamId, int chunkIndex);
563
564
/**
565
* Get the stream identifier
566
* @return Stream ID
567
*/
568
public long streamId();
569
570
/**
571
* Get the chunk index
572
* @return Chunk index within the stream
573
*/
574
public int chunkIndex();
575
576
/**
577
* Calculate encoded length of this identifier
578
* @return Encoded length in bytes
579
*/
580
public int encodedLength();
581
582
/**
583
* Encode this identifier to ByteBuf
584
* @param buf ByteBuf to write encoded data to
585
*/
586
public void encode(ByteBuf buf);
587
588
/**
589
* Decode a StreamChunkId from ByteBuffer
590
* @param buffer ByteBuffer containing encoded data
591
* @return Decoded StreamChunkId
592
*/
593
public static StreamChunkId decode(ByteBuffer buffer);
594
}
595
```
596
597
**Usage Examples:**
598
599
```java
600
// Create chunk identifier
601
StreamChunkId chunkId = new StreamChunkId(streamId, 5); // chunk at index 5 of the stream
602
603
// Use in requests
604
ChunkFetchRequest request = new ChunkFetchRequest(chunkId);
605
606
// Encoding for transmission
607
ByteBuf buf = Unpooled.buffer(chunkId.encodedLength());
608
chunkId.encode(buf);
609
610
// Decoding from received data
611
StreamChunkId decoded = StreamChunkId.decode(receivedBuffer);
612
```
613
614
## Protocol Usage Patterns
615
616
### Message Creation and Handling
617
618
```java
619
// Creating an RPC request (sending it is up to the transport client)
620
long requestId = generateRequestId();
621
ManagedBuffer requestData = new NioManagedBuffer(ByteBuffer.wrap(payload));
622
RpcRequest request = new RpcRequest(requestId, requestData);
623
624
// Handle RPC response
625
if (response instanceof RpcResponse) {
626
RpcResponse rpcResp = (RpcResponse) response;
627
if (rpcResp.requestId() == requestId) {
628
ManagedBuffer responseData = rpcResp.message();
629
processResponse(responseData);
630
}
631
} else if (response instanceof RpcFailure) {
632
RpcFailure failure = (RpcFailure) response;
633
handleError(failure.errorString());
634
}
635
```
636
637
### Error Handling
638
639
```java
640
// Comprehensive error handling
641
public void handleMessage(Message message) {
642
if (message instanceof ChunkFetchFailure) {
643
ChunkFetchFailure failure = (ChunkFetchFailure) message;
644
logger.error("Chunk fetch failed for {}: {}",
645
failure.streamChunkId(), failure.errorString());
646
retryChunkFetch(failure.streamChunkId());
647
648
} else if (message instanceof RpcFailure) {
649
RpcFailure failure = (RpcFailure) message;
650
logger.error("RPC {} failed: {}", failure.requestId(), failure.errorString());
651
handleRpcFailure(failure.requestId(), failure.errorString());
652
653
} else if (message instanceof StreamFailure) {
654
StreamFailure failure = (StreamFailure) message;
655
logger.error("Stream {} failed: {}", failure.streamId(), failure.errorString());
656
cleanupStream(failure.streamId());
657
}
658
}
659
```