or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

authentication.md buffers.md configuration.md index.md protocol.md streaming.md transport.md

protocol.md (docs/)

0

# Protocol Handling

1

2

Complete message protocol for RPC, streaming, and one-way communication, with type-safe encoding and decoding to an efficient binary format for network transmission.

3

4

## Capabilities

5

6

### Core Protocol Interfaces

7

8

Base interfaces defining the protocol structure with encoding capabilities and message type markers.

9

10

```java { .api }

11

/**

12

* Base interface for all protocol messages, extends Encodable for serialization

13

*/

14

public interface Message extends Encodable {

15

}

16

17

/**

18

* Marker interface for request messages

19

*/

20

public interface RequestMessage extends Message {

21

}

22

23

/**

24

* Marker interface for response messages

25

*/

26

public interface ResponseMessage extends Message {

27

}

28

29

/**

30

* Interface for objects that can be encoded to ByteBuf for network transmission

31

*/

32

public interface Encodable {

33

/**

34

* Number of bytes this object would take up in encoding

35

* @return Encoded length in bytes

36

*/

37

int encodedLength();

38

39

/**

40

* Serializes this object by writing to the provided ByteBuf

41

* @param buf ByteBuf to write encoded data to

42

*/

43

void encode(ByteBuf buf);

44

}

45

```

46

47

### Message Base Classes

48

49

Abstract base classes providing common functionality for message implementations.

50

51

```java { .api }

52

/**

53

* Base implementation for all message types providing common functionality

54

*/

55

public abstract class AbstractMessage implements Message {

56

/**

57

* Get the message type identifier

58

* @return Message.Type enum value

59

*/

60

public abstract Message.Type type();

61

62

/**

63

* Get the message body if present

64

* @return ManagedBuffer containing message body, or null

65

*/

66

public abstract ManagedBuffer body();

67

68

/**

69

* Whether the body of this message is included in the same frame as the message itself

70

* @return true if the body is in the same frame as the message, false otherwise

71

*/

72

public abstract boolean isBodyInFrame();

73

}

74

75

/**

76

* Base class for response messages providing response-specific functionality

77

*/

78

public abstract class AbstractResponseMessage extends AbstractMessage implements ResponseMessage {

79

}

80

```

81

82

### Request Messages

83

84

Protocol messages for initiating operations, including RPC calls, stream requests, and data uploads.

85

86

```java { .api }

87

/**

88

* Request to fetch a specific chunk from a stream

89

*/

90

public final class ChunkFetchRequest extends AbstractMessage implements RequestMessage {

91

/**

92

* Create a chunk fetch request

93

* @param streamChunkId Identifier for the specific chunk to fetch

94

*/

95

public ChunkFetchRequest(StreamChunkId streamChunkId);

96

97

/**

98

* Get the stream chunk identifier

99

* @return StreamChunkId for the requested chunk

100

*/

101

public StreamChunkId streamChunkId();

102

}

103

104

/**

105

* RPC request with message payload

106

*/

107

public final class RpcRequest extends AbstractMessage implements RequestMessage {

108

/**

109

* Create an RPC request

110

* @param requestId Unique identifier for this request

111

* @param message Message payload as ManagedBuffer

112

*/

113

public RpcRequest(long requestId, ManagedBuffer message);

114

115

/**

116

* Get the request identifier

117

* @return Request ID

118

*/

119

public long requestId();

120

121

/**

122

* Get the message payload

123

* @return ManagedBuffer containing the message data

124

*/

125

public ManagedBuffer message();

126

}

127

128

/**

129

* Request to stream data with given stream ID

130

*/

131

public final class StreamRequest extends AbstractMessage implements RequestMessage {

132

/**

133

* Create a stream request

134

* @param streamId Identifier for the stream to request

135

*/

136

public StreamRequest(String streamId);

137

138

/**

139

* Get the stream identifier

140

* @return Stream ID string

141

*/

142

public String streamId();

143

}

144

145

/**

146

* One-way message that expects no response

147

*/

148

public final class OneWayMessage extends AbstractMessage implements RequestMessage {

149

/**

150

* Create a one-way message

151

* @param body Message body as ManagedBuffer

152

*/

153

public OneWayMessage(ManagedBuffer body);

154

155

/**

156

* Get the message body

157

* @return ManagedBuffer containing message data

158

*/

159

public ManagedBuffer body();

160

}

161

162

/**

163

* Request to upload streaming data with metadata

164

*/

165

public final class UploadStream extends AbstractMessage implements RequestMessage {

166

/**

167

* Create an upload stream request

168

* @param requestId Unique identifier for this upload request

169

* @param meta Metadata for the upload as ManagedBuffer

170

* @param data Data to upload as ManagedBuffer

171

*/

172

public UploadStream(long requestId, ManagedBuffer meta, ManagedBuffer data);

173

174

/**

175

* Get the request identifier

176

* @return Request ID

177

*/

178

public long requestId();

179

180

/**

181

* Get the upload metadata

182

* @return ManagedBuffer containing metadata

183

*/

184

public ManagedBuffer meta();

185

186

/**

187

* Get the data to upload

188

* @return ManagedBuffer containing data

189

*/

190

public ManagedBuffer data();

191

}

192

```

193

194

**Usage Examples:**

195

196

```java

197

// RPC request

198

long requestId = System.currentTimeMillis();

199

ManagedBuffer payload = new NioManagedBuffer(ByteBuffer.wrap(requestData));

200

RpcRequest rpcReq = new RpcRequest(requestId, payload);

201

202

// Stream request

203

StreamRequest streamReq = new StreamRequest("my-stream-123");

204

205

// Chunk fetch request

206

StreamChunkId chunkId = new StreamChunkId(streamId, chunkIndex);

207

ChunkFetchRequest chunkReq = new ChunkFetchRequest(chunkId);

208

209

// One-way message

210

ManagedBuffer notificationData = new NioManagedBuffer(ByteBuffer.wrap(statusUpdate));

211

OneWayMessage oneWay = new OneWayMessage(notificationData);

212

213

// Upload stream

214

ManagedBuffer metadata = new NioManagedBuffer(ByteBuffer.wrap(metaBytes));

215

ManagedBuffer data = new FileSegmentManagedBuffer(conf, file, 0, file.length());

216

UploadStream upload = new UploadStream(requestId, metadata, data);

217

```

218

219

### Response Messages

220

221

Protocol messages for responding to requests, including successful responses and error conditions.

222

223

```java { .api }

224

/**

225

* Successful response to chunk fetch with data

226

*/

227

public final class ChunkFetchSuccess extends AbstractResponseMessage {

228

/**

229

* Create a successful chunk fetch response

230

* @param streamChunkId Identifier for the fetched chunk

231

* @param buffer Buffer containing the chunk data

232

*/

233

public ChunkFetchSuccess(StreamChunkId streamChunkId, ManagedBuffer buffer);

234

235

/**

236

* Get the stream chunk identifier

237

* @return StreamChunkId for the fetched chunk

238

*/

239

public StreamChunkId streamChunkId();

240

241

/**

242

* Get the chunk data

243

* @return ManagedBuffer containing chunk data

244

*/

245

public ManagedBuffer buffer();

246

}

247

248

/**

249

* Failed chunk fetch response with error message

250

*/

251

public final class ChunkFetchFailure extends AbstractResponseMessage {

252

/**

253

* Create a failed chunk fetch response

254

* @param streamChunkId Identifier for the chunk that failed

255

* @param errorString Error message describing the failure

256

*/

257

public ChunkFetchFailure(StreamChunkId streamChunkId, String errorString);

258

259

/**

260

* Get the stream chunk identifier

261

* @return StreamChunkId for the failed chunk

262

*/

263

public StreamChunkId streamChunkId();

264

265

/**

266

* Get the error message

267

* @return Error description string

268

*/

269

public String errorString();

270

}

271

272

/**

273

* RPC response with result data

274

*/

275

public final class RpcResponse extends AbstractResponseMessage {

276

/**

277

* Create an RPC response

278

* @param requestId Request ID this response corresponds to

279

* @param message Response payload as ManagedBuffer

280

*/

281

public RpcResponse(long requestId, ManagedBuffer message);

282

283

/**

284

* Get the request identifier this response is for

285

* @return Request ID

286

*/

287

public long requestId();

288

289

/**

290

* Get the response payload

291

* @return ManagedBuffer containing response data

292

*/

293

public ManagedBuffer message();

294

}

295

296

/**

297

* RPC failure response with error message

298

*/

299

public final class RpcFailure extends AbstractResponseMessage {

300

/**

301

* Create an RPC failure response

302

* @param requestId Request ID this failure corresponds to

303

* @param errorString Error message describing the failure

304

*/

305

public RpcFailure(long requestId, String errorString);

306

307

/**

308

* Get the request identifier this failure is for

309

* @return Request ID

310

*/

311

public long requestId();

312

313

/**

314

* Get the error message

315

* @return Error description string

316

*/

317

public String errorString();

318

}

319

320

/**

321

* Response to stream request with stream metadata

322

*/

323

public final class StreamResponse extends AbstractResponseMessage {

324

/**

325

* Create a stream response

326

* @param streamId Stream identifier

327

* @param byteCount Total number of bytes in the stream

328

* @param buffer Initial stream data buffer

329

*/

330

public StreamResponse(String streamId, long byteCount, ManagedBuffer buffer);

331

332

/**

333

* Get the stream identifier

334

* @return Stream ID string

335

*/

336

public String streamId();

337

338

/**

339

* Get the total byte count for the stream

340

* @return Total bytes in stream

341

*/

342

public long byteCount();

343

344

/**

345

* Get the initial stream data

346

* @return ManagedBuffer containing initial data

347

*/

348

public ManagedBuffer buffer();

349

}

350

351

/**

352

* Stream failure response with error message

353

*/

354

public final class StreamFailure extends AbstractResponseMessage {

355

/**

356

* Create a stream failure response

357

* @param streamId Stream identifier that failed

358

* @param errorString Error message describing the failure

359

*/

360

public StreamFailure(String streamId, String errorString);

361

362

/**

363

* Get the stream identifier

364

* @return Stream ID string

365

*/

366

public String streamId();

367

368

/**

369

* Get the error message

370

* @return Error description string

371

*/

372

public String errorString();

373

}

374

```

375

376

**Usage Examples:**

377

378

```java

379

// Successful RPC response

380

ManagedBuffer responseData = new NioManagedBuffer(ByteBuffer.wrap(resultBytes));

381

RpcResponse rpcResp = new RpcResponse(originalRequestId, responseData);

382

383

// RPC failure

384

RpcFailure rpcFail = new RpcFailure(originalRequestId, "Processing failed: invalid input");

385

386

// Successful chunk fetch

387

ManagedBuffer chunkData = new FileSegmentManagedBuffer(conf, file, offset, length);

388

ChunkFetchSuccess chunkSuccess = new ChunkFetchSuccess(streamChunkId, chunkData);

389

390

// Chunk fetch failure

391

ChunkFetchFailure chunkFail = new ChunkFetchFailure(streamChunkId, "Chunk not found");

392

393

// Stream response

394

ManagedBuffer streamData = // ... initial stream data

395

StreamResponse streamResp = new StreamResponse("stream-456", totalBytes, streamData);

396

397

// Stream failure

398

StreamFailure streamFail = new StreamFailure("stream-456", "Stream source unavailable");

399

```

400

401

### Encoding and Decoding

402

403

Netty-based encoder and decoder for efficient message serialization and deserialization with frame-based transport.

404

405

```java { .api }

406

/**

407

* Netty encoder for Message objects, converts messages to ByteBuf for transmission

408

*/

409

public final class MessageEncoder extends MessageToByteEncoder<Message> {

410

/**

411

* Singleton instance for reuse

412

*/

413

public static final MessageEncoder INSTANCE = new MessageEncoder();

414

415

/**

416

* Encode a message to ByteBuf for transmission

417

* @param ctx Channel handler context

418

* @param msg Message to encode

419

* @param out Output ByteBuf to write encoded data

420

* @throws Exception if encoding fails

421

*/

422

protected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception;

423

}

424

425

/**

426

* Netty decoder for Message objects, converts ByteBuf frames to Message instances

427

*/

428

public final class MessageDecoder extends LengthFieldBasedFrameDecoder {

429

/**

430

* Singleton instance for reuse

431

*/

432

public static final MessageDecoder INSTANCE = new MessageDecoder();

433

434

/**

435

* Decode a ByteBuf frame to Message object

436

* @param ctx Channel handler context

437

* @param in Input ByteBuf containing encoded message

438

* @return Decoded Message object

439

* @throws Exception if decoding fails

440

*/

441

protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception;

442

}

443

```

444

445

**Usage Examples:**

446

447

```java

448

// Adding to Netty pipeline

449

ChannelPipeline pipeline = channel.pipeline();

450

pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));

451

pipeline.addLast("messageDecoder", MessageDecoder.INSTANCE);

452

pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));

453

pipeline.addLast("messageEncoder", MessageEncoder.INSTANCE);

454

455

// Messages are automatically encoded/decoded by the pipeline

456

```

457

458

### Encoding Utilities

459

460

Utility classes for encoding common data types with consistent byte representation.

461

462

```java { .api }

463

/**

464

* Utility classes for encoding common data types

465

*/

466

public class Encoders {

467

468

/**

469

* String encoding utilities

470

*/

471

public static class Strings {

472

/**

473

* Encode string length and content to ByteBuf

474

* @param buf ByteBuf to write to

475

* @param s String to encode

476

*/

477

public static void encode(ByteBuf buf, String s);

478

479

/**

480

* Decode string from ByteBuf

481

* @param buf ByteBuf to read from

482

* @return Decoded string

483

*/

484

public static String decode(ByteBuf buf);

485

486

/**

487

* Calculate encoded length of string

488

* @param s String to measure

489

* @return Encoded length in bytes

490

*/

491

public static int encodedLength(String s);

492

}

493

494

/**

495

* Byte array encoding utilities

496

*/

497

public static class ByteArrays {

498

/**

499

* Encode byte array length and content to ByteBuf

500

* @param buf ByteBuf to write to

501

* @param arr Byte array to encode

502

*/

503

public static void encode(ByteBuf buf, byte[] arr);

504

505

/**

506

* Decode byte array from ByteBuf

507

* @param buf ByteBuf to read from

508

* @return Decoded byte array

509

*/

510

public static byte[] decode(ByteBuf buf);

511

512

/**

513

* Calculate encoded length of byte array

514

* @param arr Byte array to measure

515

* @return Encoded length in bytes

516

*/

517

public static int encodedLength(byte[] arr);

518

}

519

520

/**

521

* String array encoding utilities

522

*/

523

public static class StringArrays {

524

/**

525

* Encode string array to ByteBuf

526

* @param buf ByteBuf to write to

527

* @param strings String array to encode

528

*/

529

public static void encode(ByteBuf buf, String[] strings);

530

531

/**

532

* Decode string array from ByteBuf

533

* @param buf ByteBuf to read from

534

* @return Decoded string array

535

*/

536

public static String[] decode(ByteBuf buf);

537

538

/**

539

* Calculate encoded length of string array

540

* @param strings String array to measure

541

* @return Encoded length in bytes

542

*/

543

public static int encodedLength(String[] strings);

544

}

545

}

546

```

547

548

### Stream Chunk Identifier

549

550

Utility for identifying specific chunks within streams for efficient chunk-based data transfer.

551

552

```java { .api }

553

/**

554

* Identifies a specific chunk within a stream for chunk-based data transfer

555

*/

556

public final class StreamChunkId implements Encodable {

557

/**

558

* Create a stream chunk identifier

559

* @param streamId Numeric stream identifier

560

* @param chunkIndex Index of the chunk within the stream

561

*/

562

public StreamChunkId(long streamId, int chunkIndex);

563

564

/**

565

* Get the stream identifier

566

* @return Stream ID

567

*/

568

public long streamId();

569

570

/**

571

* Get the chunk index

572

* @return Chunk index within the stream

573

*/

574

public int chunkIndex();

575

576

/**

577

* Calculate encoded length of this identifier

578

* @return Encoded length in bytes

579

*/

580

public int encodedLength();

581

582

/**

583

* Encode this identifier to ByteBuf

584

* @param buf ByteBuf to write encoded data to

585

*/

586

public void encode(ByteBuf buf);

587

588

/**

589

* Decode a StreamChunkId from ByteBuffer

590

* @param buffer ByteBuffer containing encoded data

591

* @return Decoded StreamChunkId

592

*/

593

public static StreamChunkId decode(ByteBuffer buffer);

594

}

595

```

596

597

**Usage Examples:**

598

599

```java

600

// Create chunk identifier

601

StreamChunkId chunkId = new StreamChunkId(streamId, 5); // chunk at index 5 of the stream

602

603

// Use in requests

604

ChunkFetchRequest request = new ChunkFetchRequest(chunkId);

605

606

// Encoding for transmission

607

ByteBuf buf = Unpooled.buffer(chunkId.encodedLength());

608

chunkId.encode(buf);

609

610

// Decoding from received data

611

StreamChunkId decoded = StreamChunkId.decode(receivedBuffer);

612

```

613

614

## Protocol Usage Patterns

615

616

### Message Creation and Handling

617

618

```java

619

// Creating and sending an RPC request

620

long requestId = generateRequestId();

621

ManagedBuffer requestData = new NioManagedBuffer(ByteBuffer.wrap(payload));

622

RpcRequest request = new RpcRequest(requestId, requestData);

623

624

// Handle RPC response

625

if (response instanceof RpcResponse) {

626

RpcResponse rpcResp = (RpcResponse) response;

627

if (rpcResp.requestId() == requestId) {

628

ManagedBuffer responseData = rpcResp.message();

629

processResponse(responseData);

630

}

631

} else if (response instanceof RpcFailure) {

632

RpcFailure failure = (RpcFailure) response;

633

handleError(failure.errorString());

634

}

635

```

636

637

### Error Handling

638

639

```java

640

// Comprehensive error handling

641

public void handleMessage(Message message) {

642

if (message instanceof ChunkFetchFailure) {

643

ChunkFetchFailure failure = (ChunkFetchFailure) message;

644

logger.error("Chunk fetch failed for {}: {}",

645

failure.streamChunkId(), failure.errorString());

646

retryChunkFetch(failure.streamChunkId());

647

648

} else if (message instanceof RpcFailure) {

649

RpcFailure failure = (RpcFailure) message;

650

logger.error("RPC {} failed: {}", failure.requestId(), failure.errorString());

651

handleRpcFailure(failure.requestId(), failure.errorString());

652

653

} else if (message instanceof StreamFailure) {

654

StreamFailure failure = (StreamFailure) message;

655

logger.error("Stream {} failed: {}", failure.streamId(), failure.errorString());

656

cleanupStream(failure.streamId());

657

}

658

}

659

```