Apache MINA Core is a network application framework for building high-performance, scalable network applications with event-driven, asynchronous I/O over various transports including TCP/IP and UDP/IP via Java NIO.
—
MINA Core provides a flexible codec system for encoding and decoding messages between higher-level application objects and binary network data. The codec system enables transparent message transformation without coupling your application logic to wire protocols.
Factory interface for creating encoders and decoders:
/**
 * Supplies the encoder and decoder used for a given session.
 */
public interface ProtocolCodecFactory {
/**
* Returns a new (or reusable) instance of ProtocolEncoder which
* encodes message objects into binary or protocol-specific data.
*
* @param session the session the encoder is requested for
* @return the encoder to use for that session
* @throws Exception if obtaining the encoder fails
*/
ProtocolEncoder getEncoder(IoSession session) throws Exception;
/**
* Returns a new (or reusable) instance of ProtocolDecoder which
* decodes binary or protocol-specific data into message objects.
*
* @param session the session the decoder is requested for
* @return the decoder to use for that session
* @throws Exception if obtaining the decoder fails
*/
ProtocolDecoder getDecoder(IoSession session) throws Exception;
}Interface for encoding high-level objects to binary data:
/**
 * Turns application-level message objects into wire data for a session.
 */
public interface ProtocolEncoder {
/**
* Encodes higher-level message objects into binary or protocol-specific data.
*
* @param session the session the message is written to
* @param message the message object to encode
* @param out the output that receives the encoded data
* @throws Exception if the message cannot be encoded
*/
void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception;
/**
* Releases all resources related with this encoder.
*
* @param session the session the encoder was used with
* @throws Exception if releasing the resources fails
*/
void dispose(IoSession session) throws Exception;
}Interface for decoding binary data to high-level objects:
/**
 * Turns received wire data into application-level message objects for a session.
 */
public interface ProtocolDecoder {
/**
* Decodes binary or protocol-specific content into higher-level message objects.
*
* @param session the session the data was received on
* @param in the buffer holding the received data
* @param out the output that receives the decoded message objects
* @throws Exception if the content cannot be decoded
*/
void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception;
/**
* Invoked when the session is closed to process remaining data.
*
* @param session the session that was closed
* @param out the output that receives any message produced from the remaining data
* @throws Exception if the remaining data cannot be processed
*/
void finishDecode(IoSession session, ProtocolDecoderOutput out) throws Exception;
/**
* Releases all resources related with this decoder.
*
* @param session the session the decoder was used with
* @throws Exception if releasing the resources fails
*/
void dispose(IoSession session) throws Exception;
}Filter that uses codecs for message transformation:
// Add codec filter to filter chain.
// Outgoing lines are terminated with CRLF; incoming data is split on LF.
// LineDelimiter has no LF constant: the "\n" delimiter is LineDelimiter.UNIX.
TextLineCodecFactory codecFactory = new TextLineCodecFactory(
        StandardCharsets.UTF_8,
        LineDelimiter.CRLF,
        LineDelimiter.UNIX
);
ProtocolCodecFilter codecFilter = new ProtocolCodecFilter(codecFactory);
acceptor.getFilterChain().addLast("codec", codecFilter);
Codec for text-based line protocols:
// Basic text line codec: platform default charset, LF-terminated output,
// auto-detected line endings on input
TextLineCodecFactory basicCodec = new TextLineCodecFactory();
// Customized text line codec
TextLineCodecFactory customCodec = new TextLineCodecFactory(
        StandardCharsets.UTF_8, // Character encoding
        LineDelimiter.CRLF,     // Encoder delimiter
        LineDelimiter.AUTO      // Decoder delimiter (auto-detect)
);
// Advanced configuration with String delimiters.
// String delimiters are matched literally, not as regular expressions;
// use the LineDelimiter.AUTO overload above to accept several line endings.
TextLineCodecFactory advancedCodec = new TextLineCodecFactory(
        StandardCharsets.UTF_8,
        "\r\n", // Custom encoder delimiter
        "\r\n"  // Custom decoder delimiter (literal)
);
// Set maximum line length to prevent buffer overflow attacks
advancedCodec.setDecoderMaxLineLength(1024); // 1KB max line
advancedCodec.setEncoderMaxLineLength(1024); // 1KB max line
// Usage example
/**
 * Echo handler for line-based text: each received line is printed and sent
 * back to the peer prefixed with "Echo: ".
 */
public class TextProtocolHandler extends IoHandlerAdapter {

    /**
     * Handles one decoded line.
     *
     * @param session the session the line arrived on
     * @param message the decoded line; cast to String
     * @throws Exception declared by the handler contract
     */
    @Override
    public void messageReceived(IoSession session, Object message) throws Exception {
        // The codec filter has already turned the bytes into a String.
        final String received = (String) message;
        System.out.println("Received line: " + received);

        // The reply is encoded back to bytes by the codec filter.
        final String reply = "Echo: " + received;
        session.write(reply);
    }
}
Codec for Java object serialization:
// Basic object serialization codec
// NOTE(security): this codec uses Java-native deserialization; do not expose it
// to untrusted peers.
ObjectSerializationCodecFactory objectCodec = new ObjectSerializationCodecFactory();
// Configure class loading
ClassLoader customClassLoader = Thread.currentThread().getContextClassLoader();
ObjectSerializationCodecFactory customCodec = new ObjectSerializationCodecFactory(customClassLoader);
// Set maximum object size to prevent DoS attacks.
// The limit is configured per direction; the decoder limit bounds incoming objects.
customCodec.setDecoderMaxObjectSize(1024 * 1024); // 1MB max incoming object
// Usage with custom objects
/**
 * Serializable message carrying a type tag, a key/value payload and a timestamp.
 * The values placed in {@code data} must themselves be serializable for the
 * message to be written with Java serialization.
 */
public class ObjectMessage implements Serializable {
    // Pin the serialized form so both peers agree on the class version.
    private static final long serialVersionUID = 1L;

    private String type;
    private Map<String, Object> data;
    private long timestamp;

    /**
     * Creates a message.
     *
     * @param type      the message type tag
     * @param data      the key/value payload
     * @param timestamp the creation time in milliseconds
     */
    public ObjectMessage(String type, Map<String, Object> data, long timestamp) {
        this.type = type;
        this.data = data;
        this.timestamp = timestamp;
    }

    /** @return the message type tag */
    public String getType() {
        return type;
    }

    /** @param type the message type tag */
    public void setType(String type) {
        this.type = type;
    }

    /** @return the key/value payload */
    public Map<String, Object> getData() {
        return data;
    }

    /** @param data the key/value payload */
    public void setData(Map<String, Object> data) {
        this.data = data;
    }

    /** @return the creation time in milliseconds */
    public long getTimestamp() {
        return timestamp;
    }

    /** @param timestamp the creation time in milliseconds */
    public void setTimestamp(long timestamp) {
        this.timestamp = timestamp;
    }
}
/**
 * Handler for sessions using the object serialization codec: prints the type
 * of each received ObjectMessage and answers with a "RESPONSE" message.
 */
public class ObjectProtocolHandler extends IoHandlerAdapter {

    /**
     * Handles one deserialized message.
     *
     * @param session the session the message arrived on
     * @param message the deserialized object; cast to ObjectMessage
     * @throws Exception declared by the handler contract
     */
    @Override
    public void messageReceived(IoSession session, Object message) throws Exception {
        // The codec filter has already deserialized the object.
        final ObjectMessage request = (ObjectMessage) message;
        System.out.println("Received object: " + request.getType());

        // The reply is serialized by the codec filter when written.
        final ObjectMessage reply = new ObjectMessage(
                "RESPONSE",
                Collections.singletonMap("status", "OK"),
                System.currentTimeMillis());
        session.write(reply);
    }
}
// Fixed-length message codec (4-byte length + data)
/**
 * Codec factory for the length-prefixed string protocol; hands out a fresh
 * encoder and a fresh decoder on every request.
 */
public class FixedLengthCodecFactory implements ProtocolCodecFactory {

    /**
     * @param session the session requesting an encoder (not inspected)
     * @return a new FixedLengthEncoder
     */
    @Override
    public ProtocolEncoder getEncoder(IoSession session) throws Exception {
        final ProtocolEncoder encoder = new FixedLengthEncoder();
        return encoder;
    }

    /**
     * @param session the session requesting a decoder (not inspected)
     * @return a new FixedLengthDecoder
     */
    @Override
    public ProtocolDecoder getDecoder(IoSession session) throws Exception {
        final ProtocolDecoder decoder = new FixedLengthDecoder();
        return decoder;
    }
}
/**
 * Encodes a String as a 4-byte length prefix followed by its UTF-8 bytes.
 */
public class FixedLengthEncoder implements ProtocolEncoder {

    /**
     * Writes one framed message to {@code out}.
     *
     * @param session the session being written to (not inspected)
     * @param message the message to encode; must be a String
     * @param out     receives the framed buffer
     * @throws IllegalArgumentException if {@code message} is not a String
     */
    @Override
    public void encode(IoSession session, Object message, ProtocolEncoderOutput out)
            throws Exception {
        if (!(message instanceof String)) {
            throw new IllegalArgumentException("Message must be String");
        }

        final byte[] payload = ((String) message).getBytes(StandardCharsets.UTF_8);

        // Frame layout: [int length][payload bytes]
        final IoBuffer frame = IoBuffer.allocate(Integer.BYTES + payload.length);
        frame.putInt(payload.length);
        frame.put(payload);
        frame.flip();
        out.write(frame);
    }

    /** Nothing to release: the encoder holds no resources. */
    @Override
    public void dispose(IoSession session) throws Exception {
        // intentionally empty
    }
}
/**
 * Decodes frames made of a 4-byte length prefix followed by UTF-8 text.
 *
 * <p>Extends CumulativeProtocolDecoder because a frame may be split across
 * several network reads. A plain ProtocolDecoder is handed each read as-is and
 * bytes it leaves unread are not carried over to the next read, so rewinding
 * the buffer cannot be used to wait for the rest of a frame.
 */
public class FixedLengthDecoder extends CumulativeProtocolDecoder {
    private static final int LENGTH_PREFIX_SIZE = 4;
    private static final int MAX_MESSAGE_LENGTH = 1024 * 1024; // Max 1MB

    /**
     * Tries to decode one frame from the accumulated data.
     *
     * @param session the session the data belongs to
     * @param in      the accumulated, not yet decoded bytes
     * @param out     receives the decoded String
     * @return true if a frame was decoded and consumed, false if more data is needed
     * @throws ProtocolDecoderException if the length prefix is negative or above the limit
     */
    @Override
    protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out)
            throws Exception {
        if (in.remaining() < LENGTH_PREFIX_SIZE) {
            return false;
        }
        int start = in.position();
        int length = in.getInt();
        // Validate before allocating so a hostile prefix cannot force a huge array.
        if (length < 0 || length > MAX_MESSAGE_LENGTH) {
            throw new ProtocolDecoderException("Invalid message length: " + length);
        }
        if (in.remaining() < length) {
            // Incomplete frame: rewind so the prefix is read again once more data arrives.
            in.position(start);
            return false;
        }
        byte[] data = new byte[length];
        in.get(data);
        out.write(new String(data, StandardCharsets.UTF_8));
        return true;
    }

    /** No partial message handling is needed for this codec. */
    @Override
    public void finishDecode(IoSession session, ProtocolDecoderOutput out) throws Exception {
        // intentionally empty
    }

    /** Lets the superclass release the data it accumulated for the session. */
    @Override
    public void dispose(IoSession session) throws Exception {
        super.dispose(session);
    }
}
Base class for decoders that accumulate data:
// Decoder that accumulates data until complete message is available
/**
 * Decodes MyMessage frames: an 8-byte header (version, type, flags, body
 * length) followed by the body bytes.
 */
public class MyMessageDecoder extends CumulativeProtocolDecoder {
    private static final int HEADER_BYTES = 8;
    private static final int MAX_BODY_BYTES = 10 * 1024 * 1024; // Max 10MB

    /**
     * Attempts to decode one message, restoring the buffer position when the
     * message is incomplete or decoding fails.
     *
     * @return true if a message was written to {@code out}, false if more data is needed
     */
    @Override
    protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out)
            throws Exception {
        // Remember where this attempt started.
        in.mark();
        try {
            final MyMessage decoded = decodeMessage(in);
            if (decoded == null) {
                // Not enough bytes yet: undo whatever the attempt consumed.
                in.reset();
                return false;
            }
            out.write(decoded);
            return true;
        } catch (Exception failure) {
            // Undo the partial read before propagating the error.
            in.reset();
            throw failure;
        }
    }

    /**
     * Reads one message from {@code buffer}.
     *
     * @return the message, or null when the header or body is not fully available
     * @throws ProtocolDecoderException if the declared body length is out of range
     */
    private MyMessage decodeMessage(IoBuffer buffer) throws Exception {
        if (buffer.remaining() < HEADER_BYTES) {
            return null;
        }

        final byte version = buffer.get();
        final byte type = buffer.get();
        final short flags = buffer.getShort();
        final int bodyLength = buffer.getInt();

        final boolean lengthInRange = bodyLength >= 0 && bodyLength <= MAX_BODY_BYTES;
        if (!lengthInRange) {
            throw new ProtocolDecoderException("Invalid body length: " + bodyLength);
        }

        if (buffer.remaining() < bodyLength) {
            return null;
        }

        final byte[] body = new byte[bodyLength];
        buffer.get(body);
        return new MyMessage(version, type, flags, body);
    }
}
// Custom message class
/**
 * Value holder for one decoded message: header fields plus the raw body.
 * The body array is stored and returned as-is (no copy is made).
 */
public class MyMessage {
    private final byte version;
    private final byte type;
    private final short flags;
    private final byte[] body;

    /**
     * @param version the protocol version byte
     * @param type    the message type byte
     * @param flags   the header flags
     * @param body    the message body
     */
    public MyMessage(byte version, byte type, short flags, byte[] body) {
        this.body = body;
        this.flags = flags;
        this.type = type;
        this.version = version;
    }

    /** @return the protocol version byte */
    public byte getVersion() {
        return this.version;
    }

    /** @return the message type byte */
    public byte getType() {
        return this.type;
    }

    /** @return the header flags */
    public short getFlags() {
        return this.flags;
    }

    /** @return the message body array held by this message */
    public byte[] getBody() {
        return this.body;
    }
}
Codec for JSON message format:
/**
 * Codec factory for the length-prefixed JSON protocol; creates a new encoder
 * and a new decoder on every request.
 */
public class JsonCodecFactory implements ProtocolCodecFactory {

    /**
     * @param session the session requesting an encoder (not inspected)
     * @return a new JsonEncoder
     */
    @Override
    public ProtocolEncoder getEncoder(IoSession session) throws Exception {
        final ProtocolEncoder encoder = new JsonEncoder();
        return encoder;
    }

    /**
     * @param session the session requesting a decoder (not inspected)
     * @return a new JsonDecoder
     */
    @Override
    public ProtocolDecoder getDecoder(IoSession session) throws Exception {
        final ProtocolDecoder decoder = new JsonDecoder();
        return decoder;
    }
}
/**
 * Serializes a message to JSON and frames it as a 4-byte length prefix
 * followed by the UTF-8 encoded JSON text.
 */
public class JsonEncoder implements ProtocolEncoder {
    private final ObjectMapper mapper = new ObjectMapper();

    /**
     * Encodes {@code message} and writes the framed buffer to {@code out}.
     *
     * @throws ProtocolEncoderException if the message cannot be serialized to JSON
     */
    @Override
    public void encode(IoSession session, Object message, ProtocolEncoderOutput out)
            throws Exception {
        final String serialized;
        try {
            serialized = mapper.writeValueAsString(message);
        } catch (JsonProcessingException cause) {
            throw new ProtocolEncoderException("JSON encoding failed", cause);
        }

        // Frame layout: [int length][UTF-8 JSON bytes]
        final byte[] payload = serialized.getBytes(StandardCharsets.UTF_8);
        final IoBuffer frame = IoBuffer.allocate(Integer.BYTES + payload.length);
        frame.putInt(payload.length);
        frame.put(payload);
        frame.flip();
        out.write(frame);
    }

    /** Nothing to release for this encoder. */
    @Override
    public void dispose(IoSession session) throws Exception {
        // ObjectMapper is thread-safe, no cleanup needed
    }
}
/**
 * Decodes frames made of a 4-byte length prefix followed by UTF-8 JSON text,
 * producing a generic object for each frame.
 */
public class JsonDecoder extends CumulativeProtocolDecoder {
    private static final int LENGTH_FIELD_BYTES = 4;
    private static final int MAX_JSON_BYTES = 1024 * 1024; // Max 1MB JSON

    private final ObjectMapper mapper = new ObjectMapper();

    /**
     * Attempts to decode one JSON frame.
     *
     * @return true if a frame was decoded, false if more data is needed
     * @throws ProtocolDecoderException if the length is out of range or the JSON is invalid
     */
    @Override
    protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out)
            throws Exception {
        if (in.remaining() < LENGTH_FIELD_BYTES) {
            return false;
        }

        in.mark();
        final int declared = in.getInt();
        if (declared < 0 || declared > MAX_JSON_BYTES) {
            throw new ProtocolDecoderException("Invalid JSON length: " + declared);
        }

        if (in.remaining() < declared) {
            // Body not complete yet: put the length prefix back.
            in.reset();
            return false;
        }

        final byte[] raw = new byte[declared];
        in.get(raw);

        final Object parsed;
        try {
            final String text = new String(raw, StandardCharsets.UTF_8);
            parsed = mapper.readValue(text, Object.class);
        } catch (JsonProcessingException cause) {
            throw new ProtocolDecoderException("JSON decoding failed", cause);
        }
        out.write(parsed);
        return true;
    }
}
// Usage with strongly-typed messages
/**
 * Decodes length-prefixed UTF-8 JSON frames into instances of a fixed class.
 */
public class TypedJsonDecoder extends CumulativeProtocolDecoder {
    private static final int LENGTH_FIELD_BYTES = 4;
    private static final int MAX_JSON_BYTES = 1024 * 1024;

    private final ObjectMapper mapper = new ObjectMapper();
    private final Class<?> messageClass;

    /**
     * @param messageClass the class every decoded JSON document is bound to
     */
    public TypedJsonDecoder(Class<?> messageClass) {
        this.messageClass = messageClass;
    }

    /**
     * Attempts to decode one JSON frame into {@code messageClass}.
     *
     * @return true if a frame was decoded, false if more data is needed
     * @throws ProtocolDecoderException if the length is out of range or the JSON cannot be bound
     */
    @Override
    protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out)
            throws Exception {
        if (in.remaining() < LENGTH_FIELD_BYTES) {
            return false;
        }

        in.mark();
        final int declared = in.getInt();
        final boolean lengthInRange = declared >= 0 && declared <= MAX_JSON_BYTES;
        if (!lengthInRange) {
            throw new ProtocolDecoderException("Invalid JSON length: " + declared);
        }

        if (in.remaining() < declared) {
            // Body not complete yet: put the length prefix back.
            in.reset();
            return false;
        }

        final byte[] raw = new byte[declared];
        in.get(raw);

        final Object bound;
        try {
            final String text = new String(raw, StandardCharsets.UTF_8);
            bound = mapper.readValue(text, messageClass);
        } catch (JsonProcessingException cause) {
            throw new ProtocolDecoderException("JSON decoding failed", cause);
        }
        out.write(bound);
        return true;
    }
}
// Protocol Buffers codec implementation
/**
 * Codec factory for the varint-length-prefixed Protocol Buffers protocol;
 * creates a new encoder and a new decoder on every request.
 */
public class ProtoBufCodecFactory implements ProtocolCodecFactory {

    /**
     * @param session the session requesting an encoder (not inspected)
     * @return a new ProtoBufEncoder
     */
    @Override
    public ProtocolEncoder getEncoder(IoSession session) throws Exception {
        final ProtocolEncoder encoder = new ProtoBufEncoder();
        return encoder;
    }

    /**
     * @param session the session requesting a decoder (not inspected)
     * @return a new ProtoBufDecoder
     */
    @Override
    public ProtocolDecoder getDecoder(IoSession session) throws Exception {
        final ProtocolDecoder decoder = new ProtoBufDecoder();
        return decoder;
    }
}
/**
 * Encodes a Protocol Buffers message as a varint length prefix followed by
 * the serialized message bytes.
 */
public class ProtoBufEncoder implements ProtocolEncoder {

    /**
     * Serializes {@code message} and writes the framed buffer to {@code out}.
     *
     * @param session the session being written to (not inspected)
     * @param message the message to encode; must be a com.google.protobuf.Message
     * @param out     receives the framed buffer
     * @throws IllegalArgumentException if {@code message} is not a Protocol Buffers message
     */
    @Override
    public void encode(IoSession session, Object message, ProtocolEncoderOutput out)
            throws Exception {
        if (!(message instanceof com.google.protobuf.Message)) {
            throw new IllegalArgumentException("Message must be a Protocol Buffer");
        }
        com.google.protobuf.Message protoMessage = (com.google.protobuf.Message) message;
        // Serialize to byte array
        byte[] data = protoMessage.toByteArray();
        // Create buffer with varint length encoding
        IoBuffer buffer = IoBuffer.allocate(data.length + 10); // Extra space for varint
        buffer.setAutoExpand(true);
        // Write length as varint
        writeVarint(buffer, data.length);
        // Write message data
        buffer.put(data);
        buffer.flip();
        out.write(buffer);
    }

    /**
     * Writes {@code value} as a base-128 varint: seven payload bits per byte,
     * least significant group first, high bit set on every byte except the last.
     *
     * @param buffer the buffer to append to
     * @param value  the value to write, treated as unsigned
     */
    private void writeVarint(IoBuffer buffer, int value) {
        // Continue while any bit above the low seven is set. Testing only bit 7
        // (value & 0x80) would stop early for values such as 256 and emit a wrong byte.
        while ((value & ~0x7F) != 0) {
            buffer.put((byte) ((value & 0x7F) | 0x80));
            value >>>= 7;
        }
        buffer.put((byte) value);
    }

    /** Nothing to release: the encoder holds no resources. */
    @Override
    public void dispose(IoSession session) throws Exception {
        // No resources to dispose
    }
}
public class ProtoBufDecoder extends CumulativeProtocolDecoder {
/**
 * Attempts to decode one frame: a varint length followed by that many bytes.
 *
 * @param session the session the data belongs to
 * @param in      the accumulated, not yet decoded bytes
 * @param out     receives the decoded message (here: the raw body bytes)
 * @return true if a frame was decoded, false if more data is needed
 * @throws ProtocolDecoderException if the declared length exceeds the limit or decoding fails
 */
@Override
protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out)
        throws Exception {
    // Upper bound on a single message so a peer cannot make the decoder
    // accumulate an arbitrarily large frame.
    final int maxMessageSize = 1024 * 1024; // Max 1MB

    in.mark();
    // Try to read varint length
    int length = readVarint(in);
    if (length < 0) {
        in.reset();
        return false; // Need more data
    }
    if (length > maxMessageSize) {
        throw new ProtocolDecoderException("Message too large: " + length
                + " bytes (max: " + maxMessageSize + ")");
    }
    // Check if message data is available
    if (in.remaining() < length) {
        in.reset();
        return false; // Need more data
    }
    // Read message data
    byte[] data = new byte[length];
    in.get(data);
    try {
        // Parse protocol buffer (you'd specify the actual message type)
        // MyProtoMessage message = MyProtoMessage.parseFrom(data);
        // out.write(message);
        // For demonstration, just output the raw data
        out.write(data);
        return true;
    } catch (Exception e) {
        throw new ProtocolDecoderException("ProtoBuf decoding failed", e);
    }
}
/**
 * Reads a base-128 varint length from {@code buffer}.
 *
 * @param buffer the buffer to read from; its position advances past the bytes read
 * @return the decoded non-negative value, or -1 if the buffer ended before the
 *         varint was complete
 * @throws ProtocolDecoderException if the varint is longer than five bytes or
 *         does not fit in a non-negative int
 */
private int readVarint(IoBuffer buffer) throws ProtocolDecoderException {
    int result = 0;
    for (int shift = 0; shift < 32; shift += 7) {
        if (!buffer.hasRemaining()) {
            return -1; // Incomplete varint
        }
        byte b = buffer.get();
        // In the fifth byte only the low three payload bits fit below the sign
        // bit; anything higher would overflow or turn the length negative and
        // be confused with the -1 "incomplete" sentinel.
        if (shift == 28 && (b & 0x78) != 0) {
            throw new ProtocolDecoderException("Varint length out of range");
        }
        result |= (b & 0x7F) << shift;
        if ((b & 0x80) == 0) {
            return result; // Complete varint
        }
    }
    throw new ProtocolDecoderException("Varint too long");
}
}public class StatefulDecoder extends CumulativeProtocolDecoder {
// Session attribute key under which the per-session DecoderState is stored.
private static final AttributeKey DECODER_STATE = new AttributeKey(DecoderState.class, "decoderState");
// Parse progress for one session; -1 marks a header field that has not been read yet.
private static class DecoderState {
int expectedMessageType = -1; // message type byte (0-255), or -1 until read
int expectedLength = -1; // body length from the header, or -1 until read
IoBuffer partialBuffer; // collects body bytes; allocated once the length is known
}
/**
 * Incrementally decodes frames of the form [1-byte type][4-byte length][body],
 * keeping the parse progress in a per-session DecoderState so the header and
 * body may arrive in any number of pieces.
 *
 * @param session the session the data belongs to; also holds the decoder state
 * @param in      the received bytes; consumed as far as they can be used
 * @param out     receives the decoded message once its body is complete
 * @return true if a message was completed, false if more data is needed
 * @throws ProtocolDecoderException if the declared length is out of range or
 *         the message cannot be decoded
 */
@Override
protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out)
        throws Exception {
    DecoderState state = (DecoderState) session.getAttribute(DECODER_STATE);
    if (state == null) {
        state = new DecoderState();
        session.setAttribute(DECODER_STATE, state);
    }

    if (state.expectedMessageType == -1) {
        // Read message type
        if (!in.hasRemaining()) {
            return false;
        }
        state.expectedMessageType = in.get() & 0xFF;
    }

    if (state.expectedLength == -1) {
        // Read message length
        if (in.remaining() < 4) {
            return false;
        }
        int length = in.getInt();
        if (length < 0 || length > 1024 * 1024) {
            // Validate before storing: a rejected length must not stay in the
            // state, where a later call would find no body buffer to fill.
            resetState(state);
            throw new ProtocolDecoderException("Invalid length: " + length);
        }
        state.expectedLength = length;
        state.partialBuffer = IoBuffer.allocate(length);
    }

    // Read as much of the message body as is available
    int bytesToRead = Math.min(in.remaining(), state.partialBuffer.remaining());
    if (bytesToRead > 0) {
        byte[] chunk = new byte[bytesToRead];
        in.get(chunk);
        state.partialBuffer.put(chunk);
    }

    if (state.partialBuffer.hasRemaining()) {
        return false; // Need more data
    }

    // Complete message received. Clear the state before decoding so a decode
    // failure cannot leave a stale, already flipped buffer behind.
    int messageType = state.expectedMessageType;
    IoBuffer body = state.partialBuffer;
    resetState(state);
    body.flip();
    out.write(decodeMessage(messageType, body));
    return true;
}

/** Returns the state to "no header read yet" so the next frame starts clean. */
private static void resetState(DecoderState state) {
    state.expectedMessageType = -1;
    state.expectedLength = -1;
    state.partialBuffer = null;
}
/**
 * Dispatches a complete message body to the decoder for its type.
 *
 * @param messageType the type byte read from the frame header
 * @param buffer      the complete message body
 * @return the decoded message object
 * @throws ProtocolDecoderException if the message type is not one of 1, 2 or 3
 */
private Object decodeMessage(int messageType, IoBuffer buffer) throws ProtocolDecoderException {
    // ProtocolDecoderException is a checked exception, so it must be declared here.
    switch (messageType) {
        case 1:
            return decodeTextMessage(buffer);
        case 2:
            return decodeBinaryMessage(buffer);
        case 3:
            return decodeStructuredMessage(buffer);
        default:
            throw new ProtocolDecoderException("Unknown message type: " + messageType);
    }
}
/**
 * Removes the per-session decoder state stored under DECODER_STATE, then
 * delegates to the superclass.
 *
 * @param session the session whose decoder state is discarded
 * @throws Exception if the superclass dispose fails
 */
@Override
public void dispose(IoSession session) throws Exception {
session.removeAttribute(DECODER_STATE);
super.dispose(session);
}
}public class ValidatedDecoder extends CumulativeProtocolDecoder {
private static final int MAX_MESSAGE_SIZE = 10 * 1024 * 1024; // 10MB
private static final int HEADER_SIZE = 8;

/**
 * Decodes one frame of the form [4-byte magic][4-byte length][message data].
 * When the message data is at least four bytes long, its last four bytes are
 * a checksum over the preceding bytes; shorter messages carry no checksum.
 *
 * @param session the session the data belongs to
 * @param in      the accumulated, not yet decoded bytes
 * @param out     receives the decoded ValidatedMessage
 * @return true if a frame was decoded, false if more data is needed
 * @throws ProtocolDecoderException if the magic number, length or checksum is invalid,
 *         or any other error occurs while decoding
 */
@Override
protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out)
        throws Exception {
    // Minimum header check
    if (in.remaining() < HEADER_SIZE) {
        return false;
    }
    in.mark();
    try {
        // Read and validate header
        int magic = in.getInt();
        if (magic != 0x12345678) {
            throw new ProtocolDecoderException("Invalid magic number: 0x" +
                    Integer.toHexString(magic));
        }
        int messageLength = in.getInt();
        // Validate message length
        if (messageLength < 0) {
            throw new ProtocolDecoderException("Negative message length: " + messageLength);
        }
        if (messageLength > MAX_MESSAGE_SIZE) {
            throw new ProtocolDecoderException("Message too large: " + messageLength +
                    " bytes (max: " + MAX_MESSAGE_SIZE + ")");
        }
        // Check if complete message is available
        if (in.remaining() < messageLength) {
            in.reset();
            return false; // Need more data
        }
        // Read and validate message body
        byte[] messageData = new byte[messageLength];
        in.get(messageData);
        // Validate checksum if present. The payload excludes the checksum only
        // when there is one; subtracting 4 unconditionally would ask for a
        // negative array size on messages shorter than four bytes.
        int payloadLength = messageLength;
        if (messageLength >= 4) {
            payloadLength = messageLength - 4;
            int expectedChecksum = calculateChecksum(messageData, payloadLength);
            int actualChecksum = ByteBuffer.wrap(messageData, payloadLength, 4).getInt();
            if (expectedChecksum != actualChecksum) {
                throw new ProtocolDecoderException("Checksum mismatch");
            }
        }
        // Create and output message
        ValidatedMessage message = new ValidatedMessage(magic,
                Arrays.copyOf(messageData, payloadLength));
        out.write(message);
        return true;
    } catch (Exception e) {
        // Reset position and handle error
        in.reset();
        if (e instanceof ProtocolDecoderException) {
            throw e;
        } else {
            throw new ProtocolDecoderException("Decode error", e);
        }
    }
}
/**
 * XORs the first {@code length} bytes of {@code data}, each taken as an
 * unsigned value.
 *
 * @param data   the bytes to checksum
 * @param length how many leading bytes to include
 * @return the XOR of the included bytes (0 when {@code length} is not positive)
 */
private int calculateChecksum(byte[] data, int length) {
    int xor = 0;
    int index = 0;
    while (index < length) {
        final int unsigned = data[index] & 0xFF;
        xor = xor ^ unsigned;
        index++;
    }
    return xor;
}
}
Protocol codecs in MINA Core provide a clean separation between application logic and wire protocol concerns, enabling flexible and maintainable network applications that can easily adapt to different data formats and communication protocols.
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-mina--mina-core