0
# Protocol Codecs
1
2
MINA Core provides a flexible codec system that converts between higher-level application objects and binary network data. It performs this transformation transparently, so your application logic is not coupled to the wire protocol.
3
4
## Core Codec Interfaces
5
6
### ProtocolCodecFactory
7
8
Factory interface for creating encoders and decoders:
9
10
```java { .api }
11
public interface ProtocolCodecFactory {
12
/**
13
* Returns a new (or reusable) instance of ProtocolEncoder which
14
* encodes message objects into binary or protocol-specific data.
15
*/
16
ProtocolEncoder getEncoder(IoSession session) throws Exception;
17
18
/**
19
* Returns a new (or reusable) instance of ProtocolDecoder which
20
* decodes binary or protocol-specific data into message objects.
21
*/
22
ProtocolDecoder getDecoder(IoSession session) throws Exception;
23
}
24
```
25
26
### ProtocolEncoder
27
28
Interface for encoding high-level objects to binary data:
29
30
```java { .api }
31
public interface ProtocolEncoder {
32
/**
33
* Encodes higher-level message objects into binary or protocol-specific data.
34
*/
35
void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception;
36
37
/**
38
* Releases all resources related with this encoder.
39
*/
40
void dispose(IoSession session) throws Exception;
41
}
42
```
43
44
### ProtocolDecoder
45
46
Interface for decoding binary data to high-level objects:
47
48
```java { .api }
49
public interface ProtocolDecoder {
50
/**
51
* Decodes binary or protocol-specific content into higher-level message objects.
52
*/
53
void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception;
54
55
/**
56
* Invoked when the session is closed to process remaining data.
57
*/
58
void finishDecode(IoSession session, ProtocolDecoderOutput out) throws Exception;
59
60
/**
61
* Releases all resources related with this decoder.
62
*/
63
void dispose(IoSession session) throws Exception;
64
}
65
```
66
67
### ProtocolCodecFilter
68
69
Filter that uses codecs for message transformation:
70
71
```java { .api }
72
// Add codec filter to filter chain
73
TextLineCodecFactory codecFactory = new TextLineCodecFactory(
74
Charset.forName("UTF-8"),
75
LineDelimiter.CRLF,
76
LineDelimiter.LF
77
);
78
79
ProtocolCodecFilter codecFilter = new ProtocolCodecFilter(codecFactory);
80
acceptor.getFilterChain().addLast("codec", codecFilter);
81
```
82
83
## Built-in Codecs
84
85
### TextLineCodecFactory
86
87
Codec for text-based line protocols:
88
89
```java { .api }
90
// Basic text line codec (UTF-8, CRLF delimited)
91
TextLineCodecFactory basicCodec = new TextLineCodecFactory();
92
93
// Customized text line codec
94
TextLineCodecFactory customCodec = new TextLineCodecFactory(
95
Charset.forName("UTF-8"), // Character encoding
96
LineDelimiter.CRLF, // Encoder delimiter
97
LineDelimiter.AUTO // Decoder delimiter (auto-detect)
98
);
99
100
// Advanced configuration
101
TextLineCodecFactory advancedCodec = new TextLineCodecFactory(
102
Charset.forName("UTF-8"),
103
"\r\n", // Custom encoder delimiter
104
"\r\n|\n|\r" // Custom decoder delimiter regex
105
);
106
107
// Set maximum line length to prevent buffer overflow attacks
108
advancedCodec.setDecoderMaxLineLength(1024); // 1KB max line
109
advancedCodec.setEncoderMaxLineLength(1024); // 1KB max line
110
111
// Usage example
112
public class TextProtocolHandler extends IoHandlerAdapter {
113
@Override
114
public void messageReceived(IoSession session, Object message) throws Exception {
115
String line = (String) message; // Automatically decoded from bytes
116
System.out.println("Received line: " + line);
117
118
// Send response (automatically encoded to bytes)
119
session.write("Echo: " + line);
120
}
121
}
122
```
123
124
### ObjectSerializationCodecFactory
125
126
Codec for Java object serialization:
127
128
```java { .api }
129
// Basic object serialization codec
130
ObjectSerializationCodecFactory objectCodec = new ObjectSerializationCodecFactory();
131
132
// Configure class loading
133
ClassLoader customClassLoader = Thread.currentThread().getContextClassLoader();
134
ObjectSerializationCodecFactory customCodec = new ObjectSerializationCodecFactory(customClassLoader);
135
136
// Set maximum object size to prevent DoS attacks
137
customCodec.setMaxObjectSize(1024 * 1024); // 1MB max object
138
139
// Usage with custom objects
140
/**
 * Serializable message exchanged through the object serialization codec.
 * The values stored in {@code data} must themselves be serializable.
 */
public class ObjectMessage implements Serializable {
    // Pins the serialized form so recompiling the class does not break peers
    private static final long serialVersionUID = 1L;

    private String type;
    private Map<String, Object> data;
    private long timestamp;

    /** Creates an empty message; populate it through the setters. */
    public ObjectMessage() {
    }

    /**
     * @param type      application-defined message type
     * @param data      message payload
     * @param timestamp creation time in milliseconds
     */
    public ObjectMessage(String type, Map<String, Object> data, long timestamp) {
        this.type = type;
        this.data = data;
        this.timestamp = timestamp;
    }

    public String getType() {
        return type;
    }

    public void setType(String type) {
        this.type = type;
    }

    public Map<String, Object> getData() {
        return data;
    }

    public void setData(Map<String, Object> data) {
        this.data = data;
    }

    public long getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(long timestamp) {
        this.timestamp = timestamp;
    }
}
147
148
public class ObjectProtocolHandler extends IoHandlerAdapter {
149
@Override
150
public void messageReceived(IoSession session, Object message) throws Exception {
151
ObjectMessage msg = (ObjectMessage) message; // Automatically deserialized
152
153
System.out.println("Received object: " + msg.getType());
154
155
// Send object response (automatically serialized)
156
ObjectMessage response = new ObjectMessage("RESPONSE",
157
Collections.singletonMap("status", "OK"),
158
System.currentTimeMillis());
159
session.write(response);
160
}
161
}
162
```
163
164
## Custom Codec Implementation
165
166
### Simple Length-Prefixed Codec
167
168
```java { .api }
169
// Fixed-length message codec (4-byte length + data)
170
public class FixedLengthCodecFactory implements ProtocolCodecFactory {
171
172
@Override
173
public ProtocolEncoder getEncoder(IoSession session) throws Exception {
174
return new FixedLengthEncoder();
175
}
176
177
@Override
178
public ProtocolDecoder getDecoder(IoSession session) throws Exception {
179
return new FixedLengthDecoder();
180
}
181
}
182
183
public class FixedLengthEncoder implements ProtocolEncoder {
184
185
@Override
186
public void encode(IoSession session, Object message, ProtocolEncoderOutput out)
187
throws Exception {
188
189
if (message instanceof String) {
190
String text = (String) message;
191
byte[] data = text.getBytes(StandardCharsets.UTF_8);
192
193
// Create buffer with length prefix
194
IoBuffer buffer = IoBuffer.allocate(4 + data.length);
195
buffer.putInt(data.length); // 4-byte length prefix
196
buffer.put(data); // Message data
197
buffer.flip();
198
199
out.write(buffer);
200
} else {
201
throw new IllegalArgumentException("Message must be String");
202
}
203
}
204
205
@Override
206
public void dispose(IoSession session) throws Exception {
207
// No resources to dispose
208
}
209
}
210
211
public class FixedLengthDecoder implements ProtocolDecoder {
212
213
@Override
214
public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out)
215
throws Exception {
216
217
// Read messages until buffer is empty
218
while (in.remaining() >= 4) {
219
int position = in.position();
220
int length = in.getInt();
221
222
// Validate length
223
if (length < 0 || length > 1024 * 1024) { // Max 1MB
224
throw new ProtocolDecoderException("Invalid message length: " + length);
225
}
226
227
if (in.remaining() >= length) {
228
// We have complete message
229
byte[] data = new byte[length];
230
in.get(data);
231
232
String message = new String(data, StandardCharsets.UTF_8);
233
out.write(message);
234
} else {
235
// Incomplete message, reset position and wait for more data
236
in.position(position);
237
break;
238
}
239
}
240
}
241
242
@Override
243
public void finishDecode(IoSession session, ProtocolDecoderOutput out) throws Exception {
244
// No partial message handling needed for this codec
245
}
246
247
@Override
248
public void dispose(IoSession session) throws Exception {
249
// No resources to dispose
250
}
251
}
252
```
253
254
### Cumulative Protocol Decoder
255
256
Base class for decoders that accumulate data:
257
258
```java { .api }
259
// Decoder that accumulates data until complete message is available
260
public class MyMessageDecoder extends CumulativeProtocolDecoder {
261
262
@Override
263
protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out)
264
throws Exception {
265
266
// Mark current position
267
in.mark();
268
269
try {
270
// Try to decode a complete message
271
MyMessage message = decodeMessage(in);
272
if (message != null) {
273
out.write(message);
274
return true; // Message decoded successfully
275
} else {
276
// Incomplete message, reset position
277
in.reset();
278
return false; // Need more data
279
}
280
} catch (Exception e) {
281
// Reset position on decode error
282
in.reset();
283
throw e;
284
}
285
}
286
287
private MyMessage decodeMessage(IoBuffer buffer) throws Exception {
288
// Check for minimum header size
289
if (buffer.remaining() < 8) {
290
return null; // Need more data
291
}
292
293
// Read header
294
byte version = buffer.get();
295
byte type = buffer.get();
296
short flags = buffer.getShort();
297
int bodyLength = buffer.getInt();
298
299
// Validate body length
300
if (bodyLength < 0 || bodyLength > 10 * 1024 * 1024) { // Max 10MB
301
throw new ProtocolDecoderException("Invalid body length: " + bodyLength);
302
}
303
304
// Check if complete body is available
305
if (buffer.remaining() < bodyLength) {
306
return null; // Need more data
307
}
308
309
// Read body
310
byte[] body = new byte[bodyLength];
311
buffer.get(body);
312
313
return new MyMessage(version, type, flags, body);
314
}
315
}
316
317
// Custom message class
public class MyMessage {

    private final byte version;
    private final byte type;
    private final short flags;
    private final byte[] body;

    /**
     * Stores the given header fields and body. The body array is kept by
     * reference, not copied.
     */
    public MyMessage(byte version, byte type, short flags, byte[] body) {
        this.body = body;
        this.flags = flags;
        this.type = type;
        this.version = version;
    }

    /** @return the version byte */
    public byte getVersion() {
        return this.version;
    }

    /** @return the type byte */
    public byte getType() {
        return this.type;
    }

    /** @return the flags value */
    public short getFlags() {
        return this.flags;
    }

    /** @return the body array passed to the constructor */
    public byte[] getBody() {
        return this.body;
    }
}
337
```
338
339
### JSON Protocol Codec
340
341
Codec for JSON message format:
342
343
```java { .api }
344
public class JsonCodecFactory implements ProtocolCodecFactory {
345
346
@Override
347
public ProtocolEncoder getEncoder(IoSession session) throws Exception {
348
return new JsonEncoder();
349
}
350
351
@Override
352
public ProtocolDecoder getDecoder(IoSession session) throws Exception {
353
return new JsonDecoder();
354
}
355
}
356
357
public class JsonEncoder implements ProtocolEncoder {
358
private final ObjectMapper mapper = new ObjectMapper();
359
360
@Override
361
public void encode(IoSession session, Object message, ProtocolEncoderOutput out)
362
throws Exception {
363
364
try {
365
// Convert object to JSON
366
String json = mapper.writeValueAsString(message);
367
368
// Encode as UTF-8 with length prefix
369
byte[] jsonBytes = json.getBytes(StandardCharsets.UTF_8);
370
371
IoBuffer buffer = IoBuffer.allocate(4 + jsonBytes.length);
372
buffer.putInt(jsonBytes.length);
373
buffer.put(jsonBytes);
374
buffer.flip();
375
376
out.write(buffer);
377
378
} catch (JsonProcessingException e) {
379
throw new ProtocolEncoderException("JSON encoding failed", e);
380
}
381
}
382
383
@Override
384
public void dispose(IoSession session) throws Exception {
385
// ObjectMapper is thread-safe, no cleanup needed
386
}
387
}
388
389
public class JsonDecoder extends CumulativeProtocolDecoder {
390
private final ObjectMapper mapper = new ObjectMapper();
391
392
@Override
393
protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out)
394
throws Exception {
395
396
// Need at least 4 bytes for length
397
if (in.remaining() < 4) {
398
return false;
399
}
400
401
in.mark();
402
403
int length = in.getInt();
404
405
// Validate length
406
if (length < 0 || length > 1024 * 1024) { // Max 1MB JSON
407
throw new ProtocolDecoderException("Invalid JSON length: " + length);
408
}
409
410
if (in.remaining() < length) {
411
in.reset();
412
return false; // Need more data
413
}
414
415
// Read JSON bytes
416
byte[] jsonBytes = new byte[length];
417
in.get(jsonBytes);
418
419
try {
420
String json = new String(jsonBytes, StandardCharsets.UTF_8);
421
422
// Parse JSON to generic object
423
Object message = mapper.readValue(json, Object.class);
424
out.write(message);
425
426
return true;
427
428
} catch (JsonProcessingException e) {
429
throw new ProtocolDecoderException("JSON decoding failed", e);
430
}
431
}
432
}
433
434
// Usage with strongly-typed messages
435
public class TypedJsonDecoder extends CumulativeProtocolDecoder {
436
private final ObjectMapper mapper = new ObjectMapper();
437
private final Class<?> messageClass;
438
439
public TypedJsonDecoder(Class<?> messageClass) {
440
this.messageClass = messageClass;
441
}
442
443
@Override
444
protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out)
445
throws Exception {
446
447
// Same length-prefixed decoding logic...
448
if (in.remaining() < 4) return false;
449
450
in.mark();
451
int length = in.getInt();
452
453
if (length < 0 || length > 1024 * 1024) {
454
throw new ProtocolDecoderException("Invalid JSON length: " + length);
455
}
456
457
if (in.remaining() < length) {
458
in.reset();
459
return false;
460
}
461
462
byte[] jsonBytes = new byte[length];
463
in.get(jsonBytes);
464
465
try {
466
String json = new String(jsonBytes, StandardCharsets.UTF_8);
467
Object message = mapper.readValue(json, messageClass);
468
out.write(message);
469
return true;
470
} catch (JsonProcessingException e) {
471
throw new ProtocolDecoderException("JSON decoding failed", e);
472
}
473
}
474
}
475
```
476
477
## Binary Protocol Codec
478
479
### Protocol Buffer Codec
480
481
```java { .api }
482
// Protocol Buffers codec implementation
483
public class ProtoBufCodecFactory implements ProtocolCodecFactory {
484
485
@Override
486
public ProtocolEncoder getEncoder(IoSession session) throws Exception {
487
return new ProtoBufEncoder();
488
}
489
490
@Override
491
public ProtocolDecoder getDecoder(IoSession session) throws Exception {
492
return new ProtoBufDecoder();
493
}
494
}
495
496
public class ProtoBufEncoder implements ProtocolEncoder {
497
498
@Override
499
public void encode(IoSession session, Object message, ProtocolEncoderOutput out)
500
throws Exception {
501
502
if (message instanceof com.google.protobuf.Message) {
503
com.google.protobuf.Message protoMessage = (com.google.protobuf.Message) message;
504
505
// Serialize to byte array
506
byte[] data = protoMessage.toByteArray();
507
508
// Create buffer with varint length encoding
509
IoBuffer buffer = IoBuffer.allocate(data.length + 10); // Extra space for varint
510
buffer.setAutoExpand(true);
511
512
// Write length as varint
513
writeVarint(buffer, data.length);
514
515
// Write message data
516
buffer.put(data);
517
buffer.flip();
518
519
out.write(buffer);
520
521
} else {
522
throw new IllegalArgumentException("Message must be a Protocol Buffer");
523
}
524
}
525
526
private void writeVarint(IoBuffer buffer, int value) {
527
while ((value & 0x80) != 0) {
528
buffer.put((byte) ((value & 0x7F) | 0x80));
529
value >>>= 7;
530
}
531
buffer.put((byte) value);
532
}
533
534
@Override
535
public void dispose(IoSession session) throws Exception {
536
// No resources to dispose
537
}
538
}
539
540
public class ProtoBufDecoder extends CumulativeProtocolDecoder {
541
542
@Override
543
protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out)
544
throws Exception {
545
546
in.mark();
547
548
// Try to read varint length
549
int length = readVarint(in);
550
if (length < 0) {
551
in.reset();
552
return false; // Need more data
553
}
554
555
// Check if message data is available
556
if (in.remaining() < length) {
557
in.reset();
558
return false; // Need more data
559
}
560
561
// Read message data
562
byte[] data = new byte[length];
563
in.get(data);
564
565
try {
566
// Parse protocol buffer (you'd specify the actual message type)
567
// MyProtoMessage message = MyProtoMessage.parseFrom(data);
568
// out.write(message);
569
570
// For demonstration, just output the raw data
571
out.write(data);
572
573
return true;
574
575
} catch (Exception e) {
576
throw new ProtocolDecoderException("ProtoBuf decoding failed", e);
577
}
578
}
579
580
private int readVarint(IoBuffer buffer) {
581
int result = 0;
582
int shift = 0;
583
584
while (buffer.hasRemaining()) {
585
byte b = buffer.get();
586
result |= (b & 0x7F) << shift;
587
588
if ((b & 0x80) == 0) {
589
return result; // Complete varint
590
}
591
592
shift += 7;
593
if (shift >= 32) {
594
throw new ProtocolDecoderException("Varint too long");
595
}
596
}
597
598
return -1; // Incomplete varint
599
}
600
}
601
```
602
603
## Codec State Management
604
605
### Session-Specific Decoder State
606
607
```java { .api }
608
public class StatefulDecoder extends CumulativeProtocolDecoder {
609
private static final AttributeKey DECODER_STATE = new AttributeKey(DecoderState.class, "decoderState");
610
611
private static class DecoderState {
612
int expectedMessageType = -1;
613
int expectedLength = -1;
614
IoBuffer partialBuffer;
615
}
616
617
@Override
618
protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out)
619
throws Exception {
620
621
DecoderState state = (DecoderState) session.getAttribute(DECODER_STATE);
622
if (state == null) {
623
state = new DecoderState();
624
session.setAttribute(DECODER_STATE, state);
625
}
626
627
while (in.hasRemaining()) {
628
if (state.expectedMessageType == -1) {
629
// Read message type
630
if (in.remaining() < 1) break;
631
state.expectedMessageType = in.get() & 0xFF;
632
}
633
634
if (state.expectedLength == -1) {
635
// Read message length
636
if (in.remaining() < 4) break;
637
state.expectedLength = in.getInt();
638
639
if (state.expectedLength < 0 || state.expectedLength > 1024 * 1024) {
640
throw new ProtocolDecoderException("Invalid length: " + state.expectedLength);
641
}
642
643
state.partialBuffer = IoBuffer.allocate(state.expectedLength);
644
}
645
646
// Read message body
647
int bytesToRead = Math.min(in.remaining(), state.partialBuffer.remaining());
648
if (bytesToRead > 0) {
649
byte[] chunk = new byte[bytesToRead];
650
in.get(chunk);
651
state.partialBuffer.put(chunk);
652
}
653
654
if (!state.partialBuffer.hasRemaining()) {
655
// Complete message received
656
state.partialBuffer.flip();
657
658
Object message = decodeMessage(state.expectedMessageType, state.partialBuffer);
659
out.write(message);
660
661
// Reset state for next message
662
state.expectedMessageType = -1;
663
state.expectedLength = -1;
664
state.partialBuffer = null;
665
666
return true;
667
}
668
}
669
670
return false; // Need more data
671
}
672
673
private Object decodeMessage(int messageType, IoBuffer buffer) {
674
// Decode based on message type
675
switch (messageType) {
676
case 1: return decodeTextMessage(buffer);
677
case 2: return decodeBinaryMessage(buffer);
678
case 3: return decodeStructuredMessage(buffer);
679
default: throw new ProtocolDecoderException("Unknown message type: " + messageType);
680
}
681
}
682
683
@Override
684
public void dispose(IoSession session) throws Exception {
685
session.removeAttribute(DECODER_STATE);
686
super.dispose(session);
687
}
688
}
689
```
690
691
## Error Handling and Validation
692
693
### Robust Codec with Validation
694
695
```java { .api }
696
public class ValidatedDecoder extends CumulativeProtocolDecoder {
697
private static final int MAX_MESSAGE_SIZE = 10 * 1024 * 1024; // 10MB
698
private static final int HEADER_SIZE = 8;
699
700
@Override
701
protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out)
702
throws Exception {
703
704
// Minimum header check
705
if (in.remaining() < HEADER_SIZE) {
706
return false;
707
}
708
709
in.mark();
710
711
try {
712
// Read and validate header
713
int magic = in.getInt();
714
if (magic != 0x12345678) {
715
throw new ProtocolDecoderException("Invalid magic number: 0x" +
716
Integer.toHexString(magic));
717
}
718
719
int messageLength = in.getInt();
720
721
// Validate message length
722
if (messageLength < 0) {
723
throw new ProtocolDecoderException("Negative message length: " + messageLength);
724
}
725
726
if (messageLength > MAX_MESSAGE_SIZE) {
727
throw new ProtocolDecoderException("Message too large: " + messageLength +
728
" bytes (max: " + MAX_MESSAGE_SIZE + ")");
729
}
730
731
// Check if complete message is available
732
if (in.remaining() < messageLength) {
733
in.reset();
734
return false; // Need more data
735
}
736
737
// Read and validate message body
738
byte[] messageData = new byte[messageLength];
739
in.get(messageData);
740
741
// Validate checksum if present
742
if (messageLength >= 4) {
743
int expectedChecksum = calculateChecksum(messageData, messageLength - 4);
744
int actualChecksum = ByteBuffer.wrap(messageData, messageLength - 4, 4).getInt();
745
746
if (expectedChecksum != actualChecksum) {
747
throw new ProtocolDecoderException("Checksum mismatch");
748
}
749
}
750
751
// Create and output message
752
ValidatedMessage message = new ValidatedMessage(magic,
753
Arrays.copyOf(messageData, messageLength - 4));
754
out.write(message);
755
756
return true;
757
758
} catch (Exception e) {
759
// Reset position and handle error
760
in.reset();
761
762
if (e instanceof ProtocolDecoderException) {
763
throw e;
764
} else {
765
throw new ProtocolDecoderException("Decode error", e);
766
}
767
}
768
}
769
770
private int calculateChecksum(byte[] data, int length) {
771
int checksum = 0;
772
for (int i = 0; i < length; i++) {
773
checksum ^= data[i] & 0xFF;
774
}
775
return checksum;
776
}
777
}
778
```
779
780
Protocol codecs in MINA Core provide a clean separation between application logic and wire protocol concerns, enabling flexible and maintainable network applications that can easily adapt to different data formats and communication protocols.