or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

async-operations.mdbuffer-management.mdfilter-chain.mdindex.mdprotocol-codecs.mdservice-abstractions.mdsession-management.mdtransport-layer.md

protocol-codecs.mddocs/

0

# Protocol Codecs

1

2

MINA Core provides a flexible codec system for encoding and decoding messages between higher-level application objects and binary network data. The codec system enables transparent message transformation without coupling your application logic to wire protocols.

3

4

## Core Codec Interfaces

5

6

### ProtocolCodecFactory

7

8

Factory interface for creating encoders and decoders:

9

10

```java { .api }

11

public interface ProtocolCodecFactory {

12

/**

13

* Returns a new (or reusable) instance of ProtocolEncoder which

14

* encodes message objects into binary or protocol-specific data.

15

*/

16

ProtocolEncoder getEncoder(IoSession session) throws Exception;

17

18

/**

19

* Returns a new (or reusable) instance of ProtocolDecoder which

20

* decodes binary or protocol-specific data into message objects.

21

*/

22

ProtocolDecoder getDecoder(IoSession session) throws Exception;

23

}

24

```

25

26

### ProtocolEncoder

27

28

Interface for encoding high-level objects to binary data:

29

30

```java { .api }

31

public interface ProtocolEncoder {

32

/**

33

* Encodes higher-level message objects into binary or protocol-specific data.

34

*/

35

void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception;

36

37

/**

38

* Releases all resources related with this encoder.

39

*/

40

void dispose(IoSession session) throws Exception;

41

}

42

```

43

44

### ProtocolDecoder

45

46

Interface for decoding binary data to high-level objects:

47

48

```java { .api }

49

public interface ProtocolDecoder {

50

/**

51

* Decodes binary or protocol-specific content into higher-level message objects.

52

*/

53

void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception;

54

55

/**

56

* Invoked when the session is closed to process remaining data.

57

*/

58

void finishDecode(IoSession session, ProtocolDecoderOutput out) throws Exception;

59

60

/**

61

* Releases all resources related with this decoder.

62

*/

63

void dispose(IoSession session) throws Exception;

64

}

65

```

66

67

### ProtocolCodecFilter

68

69

Filter that uses codecs for message transformation:

70

71

```java { .api }

72

// Add codec filter to filter chain

73

TextLineCodecFactory codecFactory = new TextLineCodecFactory(

74

Charset.forName("UTF-8"),

75

LineDelimiter.CRLF,

76

LineDelimiter.LF

77

);

78

79

ProtocolCodecFilter codecFilter = new ProtocolCodecFilter(codecFactory);

80

acceptor.getFilterChain().addLast("codec", codecFilter);

81

```

82

83

## Built-in Codecs

84

85

### TextLineCodecFactory

86

87

Codec for text-based line protocols:

88

89

```java { .api }

90

// Basic text line codec (UTF-8, CRLF delimited)

91

TextLineCodecFactory basicCodec = new TextLineCodecFactory();

92

93

// Customized text line codec

94

TextLineCodecFactory customCodec = new TextLineCodecFactory(

95

Charset.forName("UTF-8"), // Character encoding

96

LineDelimiter.CRLF, // Encoder delimiter

97

LineDelimiter.AUTO // Decoder delimiter (auto-detect)

98

);

99

100

// Advanced configuration

101

TextLineCodecFactory advancedCodec = new TextLineCodecFactory(

102

Charset.forName("UTF-8"),

103

"\r\n", // Custom encoder delimiter

104

"\r\n|\n|\r" // Custom decoder delimiter regex

105

);

106

107

// Set maximum line length to prevent buffer overflow attacks

108

advancedCodec.setDecoderMaxLineLength(1024); // 1KB max line

109

advancedCodec.setEncoderMaxLineLength(1024); // 1KB max line

110

111

// Usage example

112

public class TextProtocolHandler extends IoHandlerAdapter {

113

@Override

114

public void messageReceived(IoSession session, Object message) throws Exception {

115

String line = (String) message; // Automatically decoded from bytes

116

System.out.println("Received line: " + line);

117

118

// Send response (automatically encoded to bytes)

119

session.write("Echo: " + line);

120

}

121

}

122

```

123

124

### ObjectSerializationCodecFactory

125

126

Codec for Java object serialization:

127

128

```java { .api }

129

// Basic object serialization codec

130

ObjectSerializationCodecFactory objectCodec = new ObjectSerializationCodecFactory();

131

132

// Configure class loading

133

ClassLoader customClassLoader = Thread.currentThread().getContextClassLoader();

134

ObjectSerializationCodecFactory customCodec = new ObjectSerializationCodecFactory(customClassLoader);

135

136

// Set maximum object size to prevent DoS attacks

137

customCodec.setMaxObjectSize(1024 * 1024); // 1MB max object

138

139

// Usage with custom objects

140

public class ObjectMessage implements Serializable {

141

private String type;

142

private Map<String, Object> data;

143

private long timestamp;

144

145

// Constructor, getters, setters...

146

}

147

148

public class ObjectProtocolHandler extends IoHandlerAdapter {

149

@Override

150

public void messageReceived(IoSession session, Object message) throws Exception {

151

ObjectMessage msg = (ObjectMessage) message; // Automatically deserialized

152

153

System.out.println("Received object: " + msg.getType());

154

155

// Send object response (automatically serialized)

156

ObjectMessage response = new ObjectMessage("RESPONSE",

157

Collections.singletonMap("status", "OK"),

158

System.currentTimeMillis());

159

session.write(response);

160

}

161

}

162

```

163

164

## Custom Codec Implementation

165

166

### Simple Fixed-Length Codec

167

168

```java { .api }

169

// Fixed-length message codec (4-byte length + data)

170

public class FixedLengthCodecFactory implements ProtocolCodecFactory {

171

172

@Override

173

public ProtocolEncoder getEncoder(IoSession session) throws Exception {

174

return new FixedLengthEncoder();

175

}

176

177

@Override

178

public ProtocolDecoder getDecoder(IoSession session) throws Exception {

179

return new FixedLengthDecoder();

180

}

181

}

182

183

public class FixedLengthEncoder implements ProtocolEncoder {

184

185

@Override

186

public void encode(IoSession session, Object message, ProtocolEncoderOutput out)

187

throws Exception {

188

189

if (message instanceof String) {

190

String text = (String) message;

191

byte[] data = text.getBytes(StandardCharsets.UTF_8);

192

193

// Create buffer with length prefix

194

IoBuffer buffer = IoBuffer.allocate(4 + data.length);

195

buffer.putInt(data.length); // 4-byte length prefix

196

buffer.put(data); // Message data

197

buffer.flip();

198

199

out.write(buffer);

200

} else {

201

throw new IllegalArgumentException("Message must be String");

202

}

203

}

204

205

@Override

206

public void dispose(IoSession session) throws Exception {

207

// No resources to dispose

208

}

209

}

210

211

public class FixedLengthDecoder implements ProtocolDecoder {

212

213

@Override

214

public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out)

215

throws Exception {

216

217

// Read messages until buffer is empty

218

while (in.remaining() >= 4) {

219

int position = in.position();

220

int length = in.getInt();

221

222

// Validate length

223

if (length < 0 || length > 1024 * 1024) { // Max 1MB

224

throw new ProtocolDecoderException("Invalid message length: " + length);

225

}

226

227

if (in.remaining() >= length) {

228

// We have complete message

229

byte[] data = new byte[length];

230

in.get(data);

231

232

String message = new String(data, StandardCharsets.UTF_8);

233

out.write(message);

234

} else {

235

// Incomplete message, reset position and wait for more data

236

in.position(position);

237

break;

238

}

239

}

240

}

241

242

@Override

243

public void finishDecode(IoSession session, ProtocolDecoderOutput out) throws Exception {

244

// No partial message handling needed for this codec

245

}

246

247

@Override

248

public void dispose(IoSession session) throws Exception {

249

// No resources to dispose

250

}

251

}

252

```

253

254

### Cumulative Protocol Decoder

255

256

Base class for decoders that accumulate data:

257

258

```java { .api }

259

// Decoder that accumulates data until complete message is available

260

public class MyMessageDecoder extends CumulativeProtocolDecoder {

261

262

@Override

263

protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out)

264

throws Exception {

265

266

// Mark current position

267

in.mark();

268

269

try {

270

// Try to decode a complete message

271

MyMessage message = decodeMessage(in);

272

if (message != null) {

273

out.write(message);

274

return true; // Message decoded successfully

275

} else {

276

// Incomplete message, reset position

277

in.reset();

278

return false; // Need more data

279

}

280

} catch (Exception e) {

281

// Reset position on decode error

282

in.reset();

283

throw e;

284

}

285

}

286

287

private MyMessage decodeMessage(IoBuffer buffer) throws Exception {

288

// Check for minimum header size

289

if (buffer.remaining() < 8) {

290

return null; // Need more data

291

}

292

293

// Read header

294

byte version = buffer.get();

295

byte type = buffer.get();

296

short flags = buffer.getShort();

297

int bodyLength = buffer.getInt();

298

299

// Validate body length

300

if (bodyLength < 0 || bodyLength > 10 * 1024 * 1024) { // Max 10MB

301

throw new ProtocolDecoderException("Invalid body length: " + bodyLength);

302

}

303

304

// Check if complete body is available

305

if (buffer.remaining() < bodyLength) {

306

return null; // Need more data

307

}

308

309

// Read body

310

byte[] body = new byte[bodyLength];

311

buffer.get(body);

312

313

return new MyMessage(version, type, flags, body);

314

}

315

}

316

317

// Custom message class

318

/**
 * Immutable value object for one decoded message: a version byte, a type
 * byte, a flags field and the raw body.
 *
 * <p>The body array is copied on the way in and on the way out, so neither
 * the creator nor a caller of {@link #getBody()} can change the message
 * after construction. A {@code null} body is stored and returned as-is.
 */
public class MyMessage {
    private final byte version;
    private final byte type;
    private final short flags;
    private final byte[] body;

    /**
     * @param version protocol version byte
     * @param type    message type byte
     * @param flags   message flags
     * @param body    message body; copied, may be {@code null}
     */
    public MyMessage(byte version, byte type, short flags, byte[] body) {
        this.version = version;
        this.type = type;
        this.flags = flags;
        // Defensive copy: the caller keeps ownership of its own array
        this.body = (body == null) ? null : body.clone();
    }

    public byte getVersion() { return version; }

    public byte getType() { return type; }

    public short getFlags() { return flags; }

    /** @return a fresh copy of the body, or {@code null} if none was given */
    public byte[] getBody() {
        return (body == null) ? null : body.clone();
    }
}

337

```

338

339

### JSON Protocol Codec

340

341

Codec for JSON message format:

342

343

```java { .api }

344

public class JsonCodecFactory implements ProtocolCodecFactory {

345

346

@Override

347

public ProtocolEncoder getEncoder(IoSession session) throws Exception {

348

return new JsonEncoder();

349

}

350

351

@Override

352

public ProtocolDecoder getDecoder(IoSession session) throws Exception {

353

return new JsonDecoder();

354

}

355

}

356

357

public class JsonEncoder implements ProtocolEncoder {

358

private final ObjectMapper mapper = new ObjectMapper();

359

360

@Override

361

public void encode(IoSession session, Object message, ProtocolEncoderOutput out)

362

throws Exception {

363

364

try {

365

// Convert object to JSON

366

String json = mapper.writeValueAsString(message);

367

368

// Encode as UTF-8 with length prefix

369

byte[] jsonBytes = json.getBytes(StandardCharsets.UTF_8);

370

371

IoBuffer buffer = IoBuffer.allocate(4 + jsonBytes.length);

372

buffer.putInt(jsonBytes.length);

373

buffer.put(jsonBytes);

374

buffer.flip();

375

376

out.write(buffer);

377

378

} catch (JsonProcessingException e) {

379

throw new ProtocolEncoderException("JSON encoding failed", e);

380

}

381

}

382

383

@Override

384

public void dispose(IoSession session) throws Exception {

385

// ObjectMapper is thread-safe, no cleanup needed

386

}

387

}

388

389

public class JsonDecoder extends CumulativeProtocolDecoder {

390

private final ObjectMapper mapper = new ObjectMapper();

391

392

@Override

393

protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out)

394

throws Exception {

395

396

// Need at least 4 bytes for length

397

if (in.remaining() < 4) {

398

return false;

399

}

400

401

in.mark();

402

403

int length = in.getInt();

404

405

// Validate length

406

if (length < 0 || length > 1024 * 1024) { // Max 1MB JSON

407

throw new ProtocolDecoderException("Invalid JSON length: " + length);

408

}

409

410

if (in.remaining() < length) {

411

in.reset();

412

return false; // Need more data

413

}

414

415

// Read JSON bytes

416

byte[] jsonBytes = new byte[length];

417

in.get(jsonBytes);

418

419

try {

420

String json = new String(jsonBytes, StandardCharsets.UTF_8);

421

422

// Parse JSON to generic object

423

Object message = mapper.readValue(json, Object.class);

424

out.write(message);

425

426

return true;

427

428

} catch (JsonProcessingException e) {

429

throw new ProtocolDecoderException("JSON decoding failed", e);

430

}

431

}

432

}

433

434

// Usage with strongly-typed messages

435

public class TypedJsonDecoder extends CumulativeProtocolDecoder {

436

private final ObjectMapper mapper = new ObjectMapper();

437

private final Class<?> messageClass;

438

439

public TypedJsonDecoder(Class<?> messageClass) {

440

this.messageClass = messageClass;

441

}

442

443

@Override

444

protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out)

445

throws Exception {

446

447

// Same length-prefixed decoding logic...

448

if (in.remaining() < 4) return false;

449

450

in.mark();

451

int length = in.getInt();

452

453

if (length < 0 || length > 1024 * 1024) {

454

throw new ProtocolDecoderException("Invalid JSON length: " + length);

455

}

456

457

if (in.remaining() < length) {

458

in.reset();

459

return false;

460

}

461

462

byte[] jsonBytes = new byte[length];

463

in.get(jsonBytes);

464

465

try {

466

String json = new String(jsonBytes, StandardCharsets.UTF_8);

467

Object message = mapper.readValue(json, messageClass);

468

out.write(message);

469

return true;

470

} catch (JsonProcessingException e) {

471

throw new ProtocolDecoderException("JSON decoding failed", e);

472

}

473

}

474

}

475

```

476

477

## Binary Protocol Codec

478

479

### Protocol Buffer Codec

480

481

```java { .api }

482

// Protocol Buffers codec implementation

483

public class ProtoBufCodecFactory implements ProtocolCodecFactory {

484

485

@Override

486

public ProtocolEncoder getEncoder(IoSession session) throws Exception {

487

return new ProtoBufEncoder();

488

}

489

490

@Override

491

public ProtocolDecoder getDecoder(IoSession session) throws Exception {

492

return new ProtoBufDecoder();

493

}

494

}

495

496

public class ProtoBufEncoder implements ProtocolEncoder {

497

498

@Override

499

public void encode(IoSession session, Object message, ProtocolEncoderOutput out)

500

throws Exception {

501

502

if (message instanceof com.google.protobuf.Message) {

503

com.google.protobuf.Message protoMessage = (com.google.protobuf.Message) message;

504

505

// Serialize to byte array

506

byte[] data = protoMessage.toByteArray();

507

508

// Create buffer with varint length encoding

509

IoBuffer buffer = IoBuffer.allocate(data.length + 10); // Extra space for varint

510

buffer.setAutoExpand(true);

511

512

// Write length as varint

513

writeVarint(buffer, data.length);

514

515

// Write message data

516

buffer.put(data);

517

buffer.flip();

518

519

out.write(buffer);

520

521

} else {

522

throw new IllegalArgumentException("Message must be a Protocol Buffer");

523

}

524

}

525

526

private void writeVarint(IoBuffer buffer, int value) {

527

while ((value & 0x80) != 0) {

528

buffer.put((byte) ((value & 0x7F) | 0x80));

529

value >>>= 7;

530

}

531

buffer.put((byte) value);

532

}

533

534

@Override

535

public void dispose(IoSession session) throws Exception {

536

// No resources to dispose

537

}

538

}

539

540

public class ProtoBufDecoder extends CumulativeProtocolDecoder {

541

542

@Override

543

protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out)

544

throws Exception {

545

546

in.mark();

547

548

// Try to read varint length

549

int length = readVarint(in);

550

if (length < 0) {

551

in.reset();

552

return false; // Need more data

553

}

554

555

// Check if message data is available

556

if (in.remaining() < length) {

557

in.reset();

558

return false; // Need more data

559

}

560

561

// Read message data

562

byte[] data = new byte[length];

563

in.get(data);

564

565

try {

566

// Parse protocol buffer (you'd specify the actual message type)

567

// MyProtoMessage message = MyProtoMessage.parseFrom(data);

568

// out.write(message);

569

570

// For demonstration, just output the raw data

571

out.write(data);

572

573

return true;

574

575

} catch (Exception e) {

576

throw new ProtocolDecoderException("ProtoBuf decoding failed", e);

577

}

578

}

579

580

private int readVarint(IoBuffer buffer) {

581

int result = 0;

582

int shift = 0;

583

584

while (buffer.hasRemaining()) {

585

byte b = buffer.get();

586

result |= (b & 0x7F) << shift;

587

588

if ((b & 0x80) == 0) {

589

return result; // Complete varint

590

}

591

592

shift += 7;

593

if (shift >= 32) {

594

throw new ProtocolDecoderException("Varint too long");

595

}

596

}

597

598

return -1; // Incomplete varint

599

}

600

}

601

```

602

603

## Codec State Management

604

605

### Session-Specific Decoder State

606

607

```java { .api }

608

public class StatefulDecoder extends CumulativeProtocolDecoder {

609

private static final AttributeKey DECODER_STATE = new AttributeKey(DecoderState.class, "decoderState");

610

611

private static class DecoderState {

612

int expectedMessageType = -1;

613

int expectedLength = -1;

614

IoBuffer partialBuffer;

615

}

616

617

@Override

618

protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out)

619

throws Exception {

620

621

DecoderState state = (DecoderState) session.getAttribute(DECODER_STATE);

622

if (state == null) {

623

state = new DecoderState();

624

session.setAttribute(DECODER_STATE, state);

625

}

626

627

while (in.hasRemaining()) {

628

if (state.expectedMessageType == -1) {

629

// Read message type

630

if (in.remaining() < 1) break;

631

state.expectedMessageType = in.get() & 0xFF;

632

}

633

634

if (state.expectedLength == -1) {

635

// Read message length

636

if (in.remaining() < 4) break;

637

state.expectedLength = in.getInt();

638

639

if (state.expectedLength < 0 || state.expectedLength > 1024 * 1024) {

640

throw new ProtocolDecoderException("Invalid length: " + state.expectedLength);

641

}

642

643

state.partialBuffer = IoBuffer.allocate(state.expectedLength);

644

}

645

646

// Read message body

647

int bytesToRead = Math.min(in.remaining(), state.partialBuffer.remaining());

648

if (bytesToRead > 0) {

649

byte[] chunk = new byte[bytesToRead];

650

in.get(chunk);

651

state.partialBuffer.put(chunk);

652

}

653

654

if (!state.partialBuffer.hasRemaining()) {

655

// Complete message received

656

state.partialBuffer.flip();

657

658

Object message = decodeMessage(state.expectedMessageType, state.partialBuffer);

659

out.write(message);

660

661

// Reset state for next message

662

state.expectedMessageType = -1;

663

state.expectedLength = -1;

664

state.partialBuffer = null;

665

666

return true;

667

}

668

}

669

670

return false; // Need more data

671

}

672

673

private Object decodeMessage(int messageType, IoBuffer buffer) {

674

// Decode based on message type

675

switch (messageType) {

676

case 1: return decodeTextMessage(buffer);

677

case 2: return decodeBinaryMessage(buffer);

678

case 3: return decodeStructuredMessage(buffer);

679

default: throw new ProtocolDecoderException("Unknown message type: " + messageType);

680

}

681

}

682

683

@Override

684

public void dispose(IoSession session) throws Exception {

685

session.removeAttribute(DECODER_STATE);

686

super.dispose(session);

687

}

688

}

689

```

690

691

## Error Handling and Validation

692

693

### Robust Codec with Validation

694

695

```java { .api }

696

public class ValidatedDecoder extends CumulativeProtocolDecoder {

697

private static final int MAX_MESSAGE_SIZE = 10 * 1024 * 1024; // 10MB

698

private static final int HEADER_SIZE = 8;

699

700

@Override

701

protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out)

702

throws Exception {

703

704

// Minimum header check

705

if (in.remaining() < HEADER_SIZE) {

706

return false;

707

}

708

709

in.mark();

710

711

try {

712

// Read and validate header

713

int magic = in.getInt();

714

if (magic != 0x12345678) {

715

throw new ProtocolDecoderException("Invalid magic number: 0x" +

716

Integer.toHexString(magic));

717

}

718

719

int messageLength = in.getInt();

720

721

// Validate message length

722

if (messageLength < 0) {

723

throw new ProtocolDecoderException("Negative message length: " + messageLength);

724

}

725

726

if (messageLength > MAX_MESSAGE_SIZE) {

727

throw new ProtocolDecoderException("Message too large: " + messageLength +

728

" bytes (max: " + MAX_MESSAGE_SIZE + ")");

729

}

730

731

// Check if complete message is available

732

if (in.remaining() < messageLength) {

733

in.reset();

734

return false; // Need more data

735

}

736

737

// Read and validate message body

738

byte[] messageData = new byte[messageLength];

739

in.get(messageData);

740

741

// Validate checksum if present

742

if (messageLength >= 4) {

743

int expectedChecksum = calculateChecksum(messageData, messageLength - 4);

744

int actualChecksum = ByteBuffer.wrap(messageData, messageLength - 4, 4).getInt();

745

746

if (expectedChecksum != actualChecksum) {

747

throw new ProtocolDecoderException("Checksum mismatch");

748

}

749

}

750

751

// Create and output message

752

ValidatedMessage message = new ValidatedMessage(magic,

753

Arrays.copyOf(messageData, messageLength - 4));

754

out.write(message);

755

756

return true;

757

758

} catch (Exception e) {

759

// Reset position and handle error

760

in.reset();

761

762

if (e instanceof ProtocolDecoderException) {

763

throw e;

764

} else {

765

throw new ProtocolDecoderException("Decode error", e);

766

}

767

}

768

}

769

770

private int calculateChecksum(byte[] data, int length) {

771

int checksum = 0;

772

for (int i = 0; i < length; i++) {

773

checksum ^= data[i] & 0xFF;

774

}

775

return checksum;

776

}

777

}

778

```

779

780

Protocol codecs in MINA Core provide a clean separation between application logic and wire protocol concerns, enabling flexible and maintainable network applications that can easily adapt to different data formats and communication protocols.