
tessl/maven-org-apache-mina--mina-core

Apache MINA Core is a network application framework for building high-performance, scalable network applications with event-driven, asynchronous I/O over various transports including TCP/IP and UDP/IP via Java NIO.


Protocol Codecs

MINA Core provides a flexible codec system for encoding and decoding messages between higher-level application objects and binary network data. The codec system enables transparent message transformation without coupling your application logic to wire protocols.

Core Codec Interfaces

ProtocolCodecFactory

Factory interface for creating encoders and decoders:

public interface ProtocolCodecFactory {
    /**
     * Returns a new (or reusable) instance of ProtocolEncoder which
     * encodes message objects into binary or protocol-specific data.
     */
    ProtocolEncoder getEncoder(IoSession session) throws Exception;
    
    /**
     * Returns a new (or reusable) instance of ProtocolDecoder which
     * decodes binary or protocol-specific data into message objects.
     */
    ProtocolDecoder getDecoder(IoSession session) throws Exception;
}

ProtocolEncoder

Interface for encoding high-level objects to binary data:

public interface ProtocolEncoder {
    /**
     * Encodes higher-level message objects into binary or protocol-specific data.
     */
    void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception;
    
    /**
     * Releases all resources related with this encoder.
     */
    void dispose(IoSession session) throws Exception;
}

ProtocolDecoder

Interface for decoding binary data to high-level objects:

public interface ProtocolDecoder {
    /**
     * Decodes binary or protocol-specific content into higher-level message objects.
     */
    void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception;
    
    /**
     * Invoked when the session is closed to process remaining data.
     */
    void finishDecode(IoSession session, ProtocolDecoderOutput out) throws Exception;
    
    /**
     * Releases all resources related with this decoder.
     */
    void dispose(IoSession session) throws Exception;
}

ProtocolCodecFilter

Filter that uses codecs for message transformation:

// Add codec filter to filter chain
TextLineCodecFactory codecFactory = new TextLineCodecFactory(
    Charset.forName("UTF-8"), 
    LineDelimiter.CRLF, 
    LineDelimiter.UNIX
);

ProtocolCodecFilter codecFilter = new ProtocolCodecFilter(codecFactory);
acceptor.getFilterChain().addLast("codec", codecFilter);

Built-in Codecs

TextLineCodecFactory

Codec for text-based line protocols:

// Basic text line codec (platform default charset; encodes with "\n", auto-detects the delimiter when decoding)
TextLineCodecFactory basicCodec = new TextLineCodecFactory();

// Customized text line codec
TextLineCodecFactory customCodec = new TextLineCodecFactory(
    Charset.forName("UTF-8"),           // Character encoding
    LineDelimiter.CRLF,                 // Encoder delimiter
    LineDelimiter.AUTO                  // Decoder delimiter (auto-detect)
);

// Advanced configuration with custom delimiter strings
TextLineCodecFactory advancedCodec = new TextLineCodecFactory(
    Charset.forName("UTF-8"),
    "\r\n",                             // Custom encoder delimiter
    "\r\n"                              // Custom decoder delimiter (matched literally, not as a regex)
);

// Set maximum line length so an oversized or unterminated line cannot exhaust memory
advancedCodec.setDecoderMaxLineLength(1024);  // 1KB max line
advancedCodec.setEncoderMaxLineLength(1024);  // 1KB max line

// Usage example
public class TextProtocolHandler extends IoHandlerAdapter {
    @Override
    public void messageReceived(IoSession session, Object message) throws Exception {
        String line = (String) message; // Automatically decoded from bytes
        System.out.println("Received line: " + line);
        
        // Send response (automatically encoded to bytes)
        session.write("Echo: " + line);
    }
}
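
To make the role of the line-length limit concrete, here is a minimal JDK-only sketch of what a line decoder has to do: buffer bytes across reads, emit a line at each delimiter, and reject lines that grow past a limit. `LineSplitterSketch` is a hypothetical class written for illustration; it is not MINA's `TextLineDecoder` and makes no claim about how that class is implemented.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a line decoder; plain JDK, not MINA's TextLineDecoder.
final class LineSplitterSketch {
    private final ByteArrayOutputStream pending = new ByteArrayOutputStream();
    private final int maxLineLength;

    LineSplitterSketch(int maxLineLength) {
        this.maxLineLength = maxLineLength;
    }

    // Feeds one network chunk and returns every line that this chunk completes.
    List<String> feed(byte[] chunk) {
        List<String> lines = new ArrayList<>();
        for (byte b : chunk) {
            if (b == '\n') {
                byte[] raw = pending.toByteArray();
                int length = raw.length;
                if (length > 0 && raw[length - 1] == '\r') {
                    length--; // treat CRLF the same as LF
                }
                lines.add(new String(raw, 0, length, StandardCharsets.UTF_8));
                pending.reset();
            } else if (pending.size() >= maxLineLength) {
                throw new IllegalStateException("Line longer than " + maxLineLength + " bytes");
            } else {
                pending.write(b);
            }
        }
        return lines;
    }

    public static void main(String[] args) {
        LineSplitterSketch splitter = new LineSplitterSketch(1024);
        System.out.println(splitter.feed("hel".getBytes(StandardCharsets.UTF_8)));       // []
        System.out.println(splitter.feed("lo\r\nwor".getBytes(StandardCharsets.UTF_8))); // [hello]
        System.out.println(splitter.feed("ld\n".getBytes(StandardCharsets.UTF_8)));      // [world]
    }
}
```

Splitting on the raw byte `0x0A` before decoding keeps multi-byte UTF-8 characters intact even when a chunk boundary falls inside one.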

ObjectSerializationCodecFactory

Codec for Java object serialization:

// Basic object serialization codec
ObjectSerializationCodecFactory objectCodec = new ObjectSerializationCodecFactory();

// Configure class loading
ClassLoader customClassLoader = Thread.currentThread().getContextClassLoader();
ObjectSerializationCodecFactory customCodec = new ObjectSerializationCodecFactory(customClassLoader);

// Set maximum object sizes to prevent DoS attacks
customCodec.setDecoderMaxObjectSize(1024 * 1024); // 1MB max incoming object
customCodec.setEncoderMaxObjectSize(1024 * 1024); // 1MB max outgoing object

// Usage with custom objects
public class ObjectMessage implements Serializable {
    private String type;
    private Map<String, Object> data;
    private long timestamp;
    
    // Constructor, getters, setters...
}

public class ObjectProtocolHandler extends IoHandlerAdapter {
    @Override
    public void messageReceived(IoSession session, Object message) throws Exception {
        ObjectMessage msg = (ObjectMessage) message; // Automatically deserialized
        
        System.out.println("Received object: " + msg.getType());
        
        // Send object response (automatically serialized)
        ObjectMessage response = new ObjectMessage("RESPONSE", 
            Collections.singletonMap("status", "OK"), 
            System.currentTimeMillis());
        session.write(response);
    }
}
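
The reason for a maximum object size is easier to see in a small round trip. The sketch below uses only the JDK and assumes, for illustration, a 4-byte length prefix in front of the serialized bytes; `SerializationFrameSketch` and its methods are hypothetical names, not part of MINA. The point it shows is that the declared size is checked before any deserialization work starts.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;

// Hypothetical illustration of size-capped object framing; not MINA code.
final class SerializationFrameSketch {

    // Serializes the message and prepends its byte length (assumed 4-byte prefix).
    static byte[] encode(Serializable message, int maxObjectSize) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(message);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        byte[] body = bytes.toByteArray();
        if (body.length > maxObjectSize) {
            throw new IllegalArgumentException("Encoded object too large: " + body.length);
        }
        return ByteBuffer.allocate(4 + body.length).putInt(body.length).put(body).array();
    }

    // Rejects out-of-range sizes before touching ObjectInputStream.
    static Object decode(byte[] frame, int maxObjectSize) {
        ByteBuffer in = ByteBuffer.wrap(frame);
        int length = in.getInt();
        if (length < 0 || length > maxObjectSize) {
            throw new IllegalArgumentException("Object size out of range: " + length);
        }
        if (in.remaining() < length) {
            throw new IllegalArgumentException("Truncated frame");
        }
        try (ObjectInputStream objects =
                 new ObjectInputStream(new ByteArrayInputStream(frame, in.position(), length))) {
            return objects.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        byte[] frame = encode("ping", 1024);
        System.out.println(decode(frame, 1024)); // ping
    }
}
```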

Custom Codec Implementation

Simple Fixed-Length Codec

// Length-prefixed message codec (4-byte length + data); each frame carries its own length
public class FixedLengthCodecFactory implements ProtocolCodecFactory {
    
    @Override
    public ProtocolEncoder getEncoder(IoSession session) throws Exception {
        return new FixedLengthEncoder();
    }
    
    @Override
    public ProtocolDecoder getDecoder(IoSession session) throws Exception {
        return new FixedLengthDecoder();
    }
}

public class FixedLengthEncoder implements ProtocolEncoder {
    
    @Override
    public void encode(IoSession session, Object message, ProtocolEncoderOutput out) 
            throws Exception {
        
        if (message instanceof String) {
            String text = (String) message;
            byte[] data = text.getBytes(StandardCharsets.UTF_8);
            
            // Create buffer with length prefix
            IoBuffer buffer = IoBuffer.allocate(4 + data.length);
            buffer.putInt(data.length);    // 4-byte length prefix
            buffer.put(data);              // Message data
            buffer.flip();
            
            out.write(buffer);
        } else {
            throw new IllegalArgumentException("Message must be String");
        }
    }
    
    @Override
    public void dispose(IoSession session) throws Exception {
        // No resources to dispose
    }
}

// Extends CumulativeProtocolDecoder so that bytes of an incomplete message are
// kept until the next read; a plain ProtocolDecoder does not retain them.
public class FixedLengthDecoder extends CumulativeProtocolDecoder {
    
    @Override
    protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) 
            throws Exception {
        
        // Need the 4-byte length prefix first
        if (in.remaining() < 4) {
            return false;
        }
        
        int position = in.position();
        int length = in.getInt();
        
        // Validate length
        if (length < 0 || length > 1024 * 1024) { // Max 1MB
            throw new ProtocolDecoderException("Invalid message length: " + length);
        }
        
        if (in.remaining() < length) {
            // Incomplete message, reset position and wait for more data
            in.position(position);
            return false;
        }
        
        // We have a complete message
        byte[] data = new byte[length];
        in.get(data);
        
        String message = new String(data, StandardCharsets.UTF_8);
        out.write(message);
        
        return true; // doDecode is invoked again while data remains
    }
}
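
As a worked example of the wire format used by this codec, the sketch below builds the same 4-byte length prefix with `java.nio.ByteBuffer` (standing in for `IoBuffer`, an assumption made so the example runs on the JDK alone) and decodes a stream that ends in the middle of a frame. `LengthPrefixFrameSketch` is a hypothetical name.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of length-prefixed framing; ByteBuffer stands in for IoBuffer.
final class LengthPrefixFrameSketch {

    static byte[] encode(String text) {
        byte[] data = text.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + data.length).putInt(data.length).put(data).array();
    }

    // Decodes every complete frame and leaves the position at the first incomplete one.
    static List<String> decodeAll(ByteBuffer in) {
        List<String> messages = new ArrayList<>();
        while (in.remaining() >= 4) {
            int start = in.position();
            int length = in.getInt();
            if (length < 0 || length > 1024 * 1024) {
                throw new IllegalArgumentException("Invalid message length: " + length);
            }
            if (in.remaining() < length) {
                in.position(start); // rewind so the prefix is read again later
                break;
            }
            byte[] data = new byte[length];
            in.get(data);
            messages.add(new String(data, StandardCharsets.UTF_8));
        }
        return messages;
    }

    static String toHex(byte[] bytes) {
        StringBuilder hex = new StringBuilder();
        for (byte b : bytes) {
            if (hex.length() > 0) {
                hex.append(' ');
            }
            hex.append(String.format("%02x", b & 0xFF));
        }
        return hex.toString();
    }

    public static void main(String[] args) {
        System.out.println(toHex(encode("hello"))); // 00 00 00 05 68 65 6c 6c 6f
    }
}
```

The prefix is big-endian because that is the default byte order of `ByteBuffer`; both peers have to agree on it.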

Cumulative Protocol Decoder

Base class for decoders that accumulate data:

// Decoder that accumulates data until complete message is available
public class MyMessageDecoder extends CumulativeProtocolDecoder {
    
    @Override
    protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) 
            throws Exception {
        
        // Mark current position
        in.mark();
        
        try {
            // Try to decode a complete message
            MyMessage message = decodeMessage(in);
            if (message != null) {
                out.write(message);
                return true; // Message decoded successfully
            } else {
                // Incomplete message, reset position
                in.reset();
                return false; // Need more data
            }
        } catch (Exception e) {
            // Reset position on decode error
            in.reset();
            throw e;
        }
    }
    
    private MyMessage decodeMessage(IoBuffer buffer) throws Exception {
        // Check for minimum header size
        if (buffer.remaining() < 8) {
            return null; // Need more data
        }
        
        // Read header
        byte version = buffer.get();
        byte type = buffer.get();
        short flags = buffer.getShort();
        int bodyLength = buffer.getInt();
        
        // Validate body length
        if (bodyLength < 0 || bodyLength > 10 * 1024 * 1024) { // Max 10MB
            throw new ProtocolDecoderException("Invalid body length: " + bodyLength);
        }
        
        // Check if complete body is available
        if (buffer.remaining() < bodyLength) {
            return null; // Need more data
        }
        
        // Read body
        byte[] body = new byte[bodyLength];
        buffer.get(body);
        
        return new MyMessage(version, type, flags, body);
    }
}

// Custom message class
public class MyMessage {
    private final byte version;
    private final byte type;
    private final short flags;
    private final byte[] body;
    
    public MyMessage(byte version, byte type, short flags, byte[] body) {
        this.version = version;
        this.type = type;
        this.flags = flags;
        this.body = body;
    }
    
    // Getters...
    public byte getVersion() { return version; }
    public byte getType() { return type; }
    public short getFlags() { return flags; }
    public byte[] getBody() { return body; }
}
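
The accumulate-then-retry pattern behind a cumulative decoder can be pictured roughly as follows. This is a JDK-only sketch under stated assumptions: `CumulativeSketch` is a hypothetical class, `java.nio.ByteBuffer` stands in for `IoBuffer`, and the code is not MINA's actual implementation. It uses the same 8-byte header as the example above (version, type, flags, body length) and returns only the message bodies.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of accumulating unread bytes between reads; not MINA code.
final class CumulativeSketch {
    private static final int HEADER_SIZE = 8;
    private static final int MAX_BODY = 10 * 1024 * 1024;

    // Unread bytes left over from earlier chunks (between position and limit).
    private ByteBuffer pending = ByteBuffer.allocate(0);

    // Appends a chunk and returns the body of every message it completes.
    List<byte[]> feed(byte[] chunk) {
        ByteBuffer merged = ByteBuffer.allocate(pending.remaining() + chunk.length);
        merged.put(pending).put(chunk);
        merged.flip();

        List<byte[]> bodies = new ArrayList<>();
        while (true) {
            merged.mark();
            byte[] body = tryDecode(merged);
            if (body == null) {
                merged.reset(); // incomplete message: undo the partial read
                break;
            }
            bodies.add(body);
        }
        pending = merged; // whatever is still unread waits for the next chunk
        return bodies;
    }

    // Returns the body of one complete message, or null if more data is needed.
    private static byte[] tryDecode(ByteBuffer in) {
        if (in.remaining() < HEADER_SIZE) {
            return null;
        }
        in.get();      // version
        in.get();      // type
        in.getShort(); // flags
        int bodyLength = in.getInt();
        if (bodyLength < 0 || bodyLength > MAX_BODY) {
            throw new IllegalStateException("Invalid body length: " + bodyLength);
        }
        if (in.remaining() < bodyLength) {
            return null;
        }
        byte[] body = new byte[bodyLength];
        in.get(body);
        return body;
    }

    // Test helper: builds one frame in the assumed header layout.
    static byte[] frame(byte version, byte type, short flags, byte[] body) {
        return ByteBuffer.allocate(HEADER_SIZE + body.length)
                .put(version).put(type).putShort(flags).putInt(body.length).put(body).array();
    }

    public static void main(String[] args) {
        CumulativeSketch decoder = new CumulativeSketch();
        byte[] frame = frame((byte) 1, (byte) 2, (short) 0, new byte[] {10, 20, 30});
        System.out.println(decoder.feed(java.util.Arrays.copyOfRange(frame, 0, 5)).size());            // 0
        System.out.println(decoder.feed(java.util.Arrays.copyOfRange(frame, 5, frame.length)).size()); // 1
    }
}
```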

JSON Protocol Codec

Codec for JSON message format:

public class JsonCodecFactory implements ProtocolCodecFactory {
    
    @Override
    public ProtocolEncoder getEncoder(IoSession session) throws Exception {
        return new JsonEncoder();
    }
    
    @Override
    public ProtocolDecoder getDecoder(IoSession session) throws Exception {
        return new JsonDecoder();
    }
}

public class JsonEncoder implements ProtocolEncoder {
    private final ObjectMapper mapper = new ObjectMapper();
    
    @Override
    public void encode(IoSession session, Object message, ProtocolEncoderOutput out) 
            throws Exception {
        
        try {
            // Convert object to JSON
            String json = mapper.writeValueAsString(message);
            
            // Encode as UTF-8 with length prefix
            byte[] jsonBytes = json.getBytes(StandardCharsets.UTF_8);
            
            IoBuffer buffer = IoBuffer.allocate(4 + jsonBytes.length);
            buffer.putInt(jsonBytes.length);
            buffer.put(jsonBytes);
            buffer.flip();
            
            out.write(buffer);
            
        } catch (JsonProcessingException e) {
            throw new ProtocolEncoderException("JSON encoding failed", e);
        }
    }
    
    @Override
    public void dispose(IoSession session) throws Exception {
        // ObjectMapper is thread-safe, no cleanup needed
    }
}

public class JsonDecoder extends CumulativeProtocolDecoder {
    private final ObjectMapper mapper = new ObjectMapper();
    
    @Override
    protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) 
            throws Exception {
        
        // Need at least 4 bytes for length
        if (in.remaining() < 4) {
            return false;
        }
        
        in.mark();
        
        int length = in.getInt();
        
        // Validate length
        if (length < 0 || length > 1024 * 1024) { // Max 1MB JSON
            throw new ProtocolDecoderException("Invalid JSON length: " + length);
        }
        
        if (in.remaining() < length) {
            in.reset();
            return false; // Need more data
        }
        
        // Read JSON bytes
        byte[] jsonBytes = new byte[length];
        in.get(jsonBytes);
        
        try {
            String json = new String(jsonBytes, StandardCharsets.UTF_8);
            
            // Parse JSON to generic object
            Object message = mapper.readValue(json, Object.class);
            out.write(message);
            
            return true;
            
        } catch (JsonProcessingException e) {
            throw new ProtocolDecoderException("JSON decoding failed", e);
        }
    }
}

// Usage with strongly-typed messages
public class TypedJsonDecoder extends CumulativeProtocolDecoder {
    private final ObjectMapper mapper = new ObjectMapper();
    private final Class<?> messageClass;
    
    public TypedJsonDecoder(Class<?> messageClass) {
        this.messageClass = messageClass;
    }
    
    @Override
    protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) 
            throws Exception {
        
        // Same length-prefixed decoding logic...
        if (in.remaining() < 4) return false;
        
        in.mark();
        int length = in.getInt();
        
        if (length < 0 || length > 1024 * 1024) {
            throw new ProtocolDecoderException("Invalid JSON length: " + length);
        }
        
        if (in.remaining() < length) {
            in.reset();
            return false;
        }
        
        byte[] jsonBytes = new byte[length];
        in.get(jsonBytes);
        
        try {
            String json = new String(jsonBytes, StandardCharsets.UTF_8);
            Object message = mapper.readValue(json, messageClass);
            out.write(message);
            return true;
        } catch (JsonProcessingException e) {
            throw new ProtocolDecoderException("JSON decoding failed", e);
        }
    }
}
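
One detail of the length prefix used by these JSON codecs is worth spelling out: it counts UTF-8 bytes, not Java characters, and the two differ as soon as the JSON contains non-ASCII text. The short JDK-only sketch below illustrates this; `Utf8LengthSketch` is a hypothetical name and the JSON string is a made-up sample.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration: the length prefix must be the UTF-8 byte count.
final class Utf8LengthSketch {

    static byte[] frame(String json) {
        byte[] bytes = json.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + bytes.length).putInt(bytes.length).put(bytes).array();
    }

    public static void main(String[] args) {
        String json = "{\"city\":\"Z\u00fcrich\"}"; // contains one two-byte character
        System.out.println(json.length());                          // 17
        System.out.println(ByteBuffer.wrap(frame(json)).getInt());  // 18
    }
}
```

Writing `json.length()` as the prefix would make the receiver stop one byte short of the closing brace in this case.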

Binary Protocol Codec

Protocol Buffer Codec

// Protocol Buffers codec implementation
public class ProtoBufCodecFactory implements ProtocolCodecFactory {
    
    @Override
    public ProtocolEncoder getEncoder(IoSession session) throws Exception {
        return new ProtoBufEncoder();
    }
    
    @Override
    public ProtocolDecoder getDecoder(IoSession session) throws Exception {
        return new ProtoBufDecoder();
    }
}

public class ProtoBufEncoder implements ProtocolEncoder {
    
    @Override
    public void encode(IoSession session, Object message, ProtocolEncoderOutput out) 
            throws Exception {
        
        if (message instanceof com.google.protobuf.Message) {
            com.google.protobuf.Message protoMessage = (com.google.protobuf.Message) message;
            
            // Serialize to byte array
            byte[] data = protoMessage.toByteArray();
            
            // Create buffer with varint length encoding
            IoBuffer buffer = IoBuffer.allocate(data.length + 10); // Extra space for varint
            buffer.setAutoExpand(true);
            
            // Write length as varint
            writeVarint(buffer, data.length);
            
            // Write message data
            buffer.put(data);
            buffer.flip();
            
            out.write(buffer);
            
        } else {
            throw new IllegalArgumentException("Message must be a Protocol Buffer");
        }
    }
    
    private void writeVarint(IoBuffer buffer, int value) {
        // Continue while any bits remain above the low seven
        while ((value & ~0x7F) != 0) {
            buffer.put((byte) ((value & 0x7F) | 0x80));
            value >>>= 7;
        }
        buffer.put((byte) value);
    }
    
    @Override
    public void dispose(IoSession session) throws Exception {
        // No resources to dispose
    }
}

public class ProtoBufDecoder extends CumulativeProtocolDecoder {
    
    @Override
    protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) 
            throws Exception {
        
        in.mark();
        
        // Try to read varint length
        int length = readVarint(in);
        if (length < 0) {
            in.reset();
            return false; // Need more data
        }
        
        // Check if message data is available
        if (in.remaining() < length) {
            in.reset();
            return false; // Need more data
        }
        
        // Read message data
        byte[] data = new byte[length];
        in.get(data);
        
        try {
            // Parse protocol buffer (you'd specify the actual message type)
            // MyProtoMessage message = MyProtoMessage.parseFrom(data);
            // out.write(message);
            
            // For demonstration, just output the raw data
            out.write(data);
            
            return true;
            
        } catch (Exception e) {
            throw new ProtocolDecoderException("ProtoBuf decoding failed", e);
        }
    }
    
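    private int readVarint(IoBuffer buffer) throws ProtocolDecoderException {
        int result = 0;
        int shift = 0;
        
        while (buffer.hasRemaining()) {
            byte b = buffer.get();
            result |= (b & 0x7F) << shift;
            
            if ((b & 0x80) == 0) {
                if (result < 0) {
                    // A negative value must not be mistaken for "incomplete"
                    throw new ProtocolDecoderException("Negative varint length: " + result);
                }
                return result; // Complete varint
            }
            
            shift += 7;
            if (shift >= 32) {
                throw new ProtocolDecoderException("Varint too long");
            }
        }
        
        return -1; // Incomplete varint
    }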
}
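
Varint length prefixes are easy to get subtly wrong, so a standalone round trip is a useful check. The sketch below uses only the JDK, with `java.nio.ByteBuffer` standing in for `IoBuffer`; `VarintSketch` is a hypothetical class. The write loop continues while any bit above the low seven is set (`value & ~0x7F`), which is what makes values such as 256 and 300 encode correctly.

```java
import java.nio.ByteBuffer;

// Hypothetical illustration of base-128 varint encoding for 32-bit lengths.
final class VarintSketch {

    static void writeVarint(ByteBuffer out, int value) {
        while ((value & ~0x7F) != 0) {
            out.put((byte) ((value & 0x7F) | 0x80)); // low 7 bits plus continuation flag
            value >>>= 7;
        }
        out.put((byte) value);
    }

    // Returns the decoded value, or -1 if the buffer ends mid-varint
    // (in that case the position is restored so the read can be retried).
    static int readVarint(ByteBuffer in) {
        int start = in.position();
        int result = 0;
        for (int shift = 0; shift < 35; shift += 7) { // at most 5 bytes for 32 bits
            if (!in.hasRemaining()) {
                in.position(start);
                return -1;
            }
            byte b = in.get();
            result |= (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                if (result < 0) {
                    throw new IllegalStateException("Negative length: " + result);
                }
                return result;
            }
        }
        throw new IllegalStateException("Varint too long");
    }

    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(8);
        writeVarint(buffer, 300);
        buffer.flip();
        System.out.printf("%02x %02x%n", buffer.get(0) & 0xFF, buffer.get(1) & 0xFF); // ac 02
        System.out.println(readVarint(buffer)); // 300
    }
}
```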

Codec State Management

Session-Specific Decoder State

public class StatefulDecoder extends CumulativeProtocolDecoder {
    private static final AttributeKey DECODER_STATE = new AttributeKey(DecoderState.class, "decoderState");
    
    private static class DecoderState {
        int expectedMessageType = -1;
        int expectedLength = -1;
        IoBuffer partialBuffer;
    }
    
    @Override
    protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) 
            throws Exception {
        
        DecoderState state = (DecoderState) session.getAttribute(DECODER_STATE);
        if (state == null) {
            state = new DecoderState();
            session.setAttribute(DECODER_STATE, state);
        }
        
        while (in.hasRemaining()) {
            if (state.expectedMessageType == -1) {
                // Read message type
                if (in.remaining() < 1) break;
                state.expectedMessageType = in.get() & 0xFF;
            }
            
            if (state.expectedLength == -1) {
                // Read message length
                if (in.remaining() < 4) break;
                state.expectedLength = in.getInt();
                
                if (state.expectedLength < 0 || state.expectedLength > 1024 * 1024) {
                    throw new ProtocolDecoderException("Invalid length: " + state.expectedLength);
                }
                
                state.partialBuffer = IoBuffer.allocate(state.expectedLength);
            }
            
            // Read message body
            int bytesToRead = Math.min(in.remaining(), state.partialBuffer.remaining());
            if (bytesToRead > 0) {
                byte[] chunk = new byte[bytesToRead];
                in.get(chunk);
                state.partialBuffer.put(chunk);
            }
            
            if (!state.partialBuffer.hasRemaining()) {
                // Complete message received
                state.partialBuffer.flip();
                
                Object message = decodeMessage(state.expectedMessageType, state.partialBuffer);
                out.write(message);
                
                // Reset state for next message
                state.expectedMessageType = -1;
                state.expectedLength = -1;
                state.partialBuffer = null;
                
                return true;
            }
        }
        
        return false; // Need more data
    }
    
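    private Object decodeMessage(int messageType, IoBuffer buffer) throws ProtocolDecoderException {
        // Decode based on message type (the decode*Message helpers are application-specific and not shown)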
        switch (messageType) {
            case 1: return decodeTextMessage(buffer);
            case 2: return decodeBinaryMessage(buffer);
            case 3: return decodeStructuredMessage(buffer);
            default: throw new ProtocolDecoderException("Unknown message type: " + messageType);
        }
    }
    
    @Override
    public void dispose(IoSession session) throws Exception {
        session.removeAttribute(DECODER_STATE);
        super.dispose(session);
    }
}
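
The same type/length/body state machine can be exercised without a network by feeding it arbitrary chunks. The JDK-only sketch below is a hypothetical illustration (`StatefulFrameSketch` is not a MINA class): one instance holds the state for one connection, the header is assumed to be a 1-byte type followed by a 4-byte length, and each decoded message is rendered as `type:text` purely for demonstration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical per-connection decoder state; one instance per connection.
final class StatefulFrameSketch {
    private static final int MAX_BODY = 1024 * 1024;

    private final ByteBuffer header = ByteBuffer.allocate(5); // 1-byte type + 4-byte length
    private int type;
    private ByteBuffer body; // null until the header is complete

    // Consumes the whole chunk and returns every message it completes.
    List<String> feed(byte[] chunk) {
        ByteBuffer in = ByteBuffer.wrap(chunk);
        List<String> messages = new ArrayList<>();
        while (true) {
            if (body == null) {
                copy(in, header);
                if (header.hasRemaining()) {
                    return messages; // header still incomplete
                }
                header.flip();
                type = header.get() & 0xFF;
                int length = header.getInt();
                header.clear();
                if (length < 0 || length > MAX_BODY) {
                    throw new IllegalStateException("Invalid length: " + length);
                }
                body = ByteBuffer.allocate(length);
            }
            copy(in, body);
            if (body.hasRemaining()) {
                return messages; // body still incomplete
            }
            messages.add(type + ":" + new String(body.array(), StandardCharsets.UTF_8));
            body = null; // reset state for the next message
        }
    }

    // Moves as many bytes as both buffers allow.
    private static void copy(ByteBuffer src, ByteBuffer dst) {
        int count = Math.min(src.remaining(), dst.remaining());
        for (int i = 0; i < count; i++) {
            dst.put(src.get());
        }
    }

    // Test helper: builds one frame in the assumed layout.
    static byte[] frame(int type, String text) {
        byte[] data = text.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(5 + data.length)
                .put((byte) type).putInt(data.length).put(data).array();
    }

    public static void main(String[] args) {
        StatefulFrameSketch decoder = new StatefulFrameSketch();
        for (byte b : frame(1, "hey")) {
            System.out.println(decoder.feed(new byte[] {b})); // only the last line is non-empty
        }
    }
}
```

Because the header is buffered separately, this variant does not depend on an outer accumulator to keep a partially received length field.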

Error Handling and Validation

Robust Codec with Validation

public class ValidatedDecoder extends CumulativeProtocolDecoder {
    private static final int MAX_MESSAGE_SIZE = 10 * 1024 * 1024; // 10MB
    private static final int HEADER_SIZE = 8;
    
    @Override
    protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) 
            throws Exception {
        
        // Minimum header check
        if (in.remaining() < HEADER_SIZE) {
            return false;
        }
        
        in.mark();
        
        try {
            // Read and validate header
            int magic = in.getInt();
            if (magic != 0x12345678) {
                throw new ProtocolDecoderException("Invalid magic number: 0x" + 
                    Integer.toHexString(magic));
            }
            
            int messageLength = in.getInt();
            
            // Validate message length
            if (messageLength < 0) {
                throw new ProtocolDecoderException("Negative message length: " + messageLength);
            }
            
            if (messageLength > MAX_MESSAGE_SIZE) {
                throw new ProtocolDecoderException("Message too large: " + messageLength + 
                    " bytes (max: " + MAX_MESSAGE_SIZE + ")");
            }
            
            // Check if complete message is available
            if (in.remaining() < messageLength) {
                in.reset();
                return false; // Need more data
            }
            
            // Read and validate message body
            byte[] messageData = new byte[messageLength];
            in.get(messageData);
            
            // Validate the trailing 4-byte checksum if present
            // (ByteBuffer here is java.nio.ByteBuffer)
            int payloadLength = messageLength;
            if (messageLength >= 4) {
                payloadLength = messageLength - 4;
                int expectedChecksum = calculateChecksum(messageData, payloadLength);
                int actualChecksum = ByteBuffer.wrap(messageData, payloadLength, 4).getInt();
                
                if (expectedChecksum != actualChecksum) {
                    throw new ProtocolDecoderException("Checksum mismatch");
                }
            }
            
            // Create and output message (payload without the checksum trailer)
            ValidatedMessage message = new ValidatedMessage(magic, 
                Arrays.copyOf(messageData, payloadLength));
            out.write(message);
            
            return true;
            
        } catch (Exception e) {
            // Reset position and handle error
            in.reset();
            
            if (e instanceof ProtocolDecoderException) {
                throw e;
            } else {
                throw new ProtocolDecoderException("Decode error", e);
            }
        }
    }
    
    private int calculateChecksum(byte[] data, int length) {
        int checksum = 0;
        for (int i = 0; i < length; i++) {
            checksum ^= data[i] & 0xFF;
        }
        return checksum;
    }
}
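
For the decoder above to accept a frame, the sender has to produce the matching layout: magic number, message length, payload, then a 4-byte XOR checksum that is counted in the message length. The JDK-only sketch below shows one way the sending side and a standalone check might look; `ChecksumFrameSketch` and its methods are hypothetical, and the layout is inferred from the decoder rather than taken from any MINA API.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical sender-side counterpart for the validated frame layout.
final class ChecksumFrameSketch {
    static final int MAGIC = 0x12345678;

    // XOR of the first 'length' bytes, as in the decoder's calculateChecksum.
    static int xorChecksum(byte[] data, int length) {
        int checksum = 0;
        for (int i = 0; i < length; i++) {
            checksum ^= data[i] & 0xFF;
        }
        return checksum;
    }

    // Layout: magic (4) | messageLength (4) | payload | checksum (4)
    static byte[] encode(byte[] payload) {
        int messageLength = payload.length + 4; // payload plus checksum trailer
        return ByteBuffer.allocate(8 + messageLength)
                .putInt(MAGIC)
                .putInt(messageLength)
                .put(payload)
                .putInt(xorChecksum(payload, payload.length))
                .array();
    }

    // Returns true only if header, length, and checksum are all consistent.
    static boolean verify(byte[] frame) {
        ByteBuffer in = ByteBuffer.wrap(frame);
        if (in.remaining() < 8 || in.getInt() != MAGIC) {
            return false;
        }
        int messageLength = in.getInt();
        if (messageLength < 4 || in.remaining() < messageLength) {
            return false;
        }
        byte[] message = new byte[messageLength];
        in.get(message);
        int expected = xorChecksum(message, messageLength - 4);
        int actual = ByteBuffer.wrap(message, messageLength - 4, 4).getInt();
        return expected == actual;
    }

    public static void main(String[] args) {
        byte[] frame = encode("abc".getBytes(StandardCharsets.UTF_8));
        System.out.println(verify(frame)); // true
        frame[8] ^= 0x01;                  // corrupt the first payload byte
        System.out.println(verify(frame)); // false
    }
}
```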

Protocol codecs in MINA Core keep wire-format handling out of application logic: handlers exchange message objects while the codec filter takes care of framing, encoding, and validation, so a protocol or data format can change without touching handler code.
