or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

buffer-management.mdclient-operations.mdconfiguration-utilities.mdindex.mdmessage-protocol.mdsasl-authentication.mdserver-operations.mdtransport-setup.md

message-protocol.mddocs/

0

# Message Protocol

1

2

Type-safe message protocol for network communication in Apache Spark. The protocol defines specific message types for RPC requests/responses, chunk fetching, and streaming operations, ensuring reliable and structured communication between clients and servers.

3

4

## Capabilities

5

6

### Core Message Interface

7

8

Base interface for all network messages in the Spark transport protocol.

9

10

```java { .api }

11

/**

12

* Message represents a unit of communication in the Spark transport protocol.

13

* All network messages implement this interface to provide type safety and structure.

14

*/

15

public interface Message {

16

/**

17

* Gets the message type identifier.

18

*

19

* @return The Type enum value for this message

20

*/

21

Type type();

22

23

/**

24

* Gets the message body as a ManagedBuffer.

25

*

26

* @return ManagedBuffer containing the message payload

27

*/

28

ManagedBuffer body();

29

30

/**

31

* Indicates whether the message body is included in the message frame.

32

*

33

* @return true if body is in frame, false if sent separately

34

*/

35

boolean isBodyInFrame();

36

37

/**

38

* Enumeration of all message types in the protocol.

39

*/

40

enum Type {

41

ChunkFetchRequest(0),

42

ChunkFetchSuccess(1),

43

ChunkFetchFailure(2),

44

RpcRequest(3),

45

RpcResponse(4),

46

RpcFailure(5),

47

StreamRequest(6),

48

StreamResponse(7),

49

StreamFailure(8),

50

OneWayMessage(9),

51

User(-1);

52

53

private final byte id;

54

55

Type(int id) {

56

this.id = (byte) id;

57

}

58

59

public byte id() { return id; }

60

}

61

}

62

```

63

64

### Message Category Interfaces

65

66

Marker interfaces for categorizing messages as requests or responses.

67

68

```java { .api }

69

/**

70

* Marker interface for request messages.

71

* All messages that initiate communication implement this interface.

72

*/

73

public interface RequestMessage extends Message {}

74

75

/**

76

* Marker interface for response messages.

77

* All messages that respond to requests implement this interface.

78

*/

79

public interface ResponseMessage extends Message {}

80

```

81

82

### Encoding Interface

83

84

Interface for objects that can be encoded to ByteBuf for network transmission.

85

86

```java { .api }

87

/**

88

* Interface for objects that can be encoded to ByteBuf.

89

* Used by the message protocol for efficient serialization.

90

*/

91

public interface Encodable {

92

/**

93

* Gets the encoded length of this object in bytes.

94

*

95

* @return The number of bytes needed to encode this object

96

*/

97

int encodedLength();

98

99

/**

100

* Encodes this object to the provided ByteBuf.

101

*

102

* @param buf The ByteBuf to encode to

103

*/

104

void encode(ByteBuf buf);

105

}

106

```

107

108

### RPC Messages

109

110

Messages for Remote Procedure Call operations.

111

112

```java { .api }

113

/**

114

* RPC request message containing a request ID and payload.

115

*/

116

public class RpcRequest extends AbstractMessage implements RequestMessage {

117

/** Unique identifier for this RPC request */

118

public final long requestId;

119

120

/**

121

* Creates an RPC request message.

122

*

123

* @param requestId Unique identifier for the request

124

* @param message The request payload as ManagedBuffer

125

*/

126

public RpcRequest(long requestId, ManagedBuffer message);

127

128

@Override

129

public Type type() { return Type.RpcRequest; }

130

131

@Override

132

public int encodedLength() { return 8; } // requestId is 8 bytes

133

134

@Override

135

public void encode(ByteBuf buf) {

136

buf.writeLong(requestId);

137

}

138

139

public static RpcRequest decode(ByteBuf buf) {

140

long requestId = buf.readLong();

141

return new RpcRequest(requestId, null); // Body handled separately

142

}

143

}

144

145

/**

146

* RPC response message containing the response to a previous request.

147

*/

148

public class RpcResponse extends AbstractResponseMessage {

149

/** Request ID this response corresponds to */

150

public final long requestId;

151

152

/**

153

* Creates an RPC response message.

154

*

155

* @param requestId The request ID this response corresponds to

156

* @param message The response payload as ManagedBuffer

157

*/

158

public RpcResponse(long requestId, ManagedBuffer message);

159

160

@Override

161

public Type type() { return Type.RpcResponse; }

162

}

163

164

/**

165

* RPC failure message indicating an RPC request failed.

166

*/

167

public class RpcFailure extends AbstractResponseMessage {

168

/** Request ID this failure corresponds to */

169

public final long requestId;

170

171

/** Error message describing the failure */

172

public final String errorString;

173

174

/**

175

* Creates an RPC failure message.

176

*

177

* @param requestId The request ID that failed

178

* @param errorString Description of the error

179

*/

180

public RpcFailure(long requestId, String errorString);

181

182

@Override

183

public Type type() { return Type.RpcFailure; }

184

}

185

```

186

187

### Chunk Fetch Messages

188

189

Messages for fetching individual chunks of data from streams.

190

191

```java { .api }

192

/**

193

* Request to fetch a specific chunk from a stream.

194

*/

195

public class ChunkFetchRequest extends AbstractMessage implements RequestMessage {

196

/** Identifier for the stream and chunk */

197

public final StreamChunkId streamChunkId;

198

199

/**

200

* Creates a chunk fetch request.

201

*

202

* @param streamChunkId Identifies the stream and chunk to fetch

203

*/

204

public ChunkFetchRequest(StreamChunkId streamChunkId);

205

206

@Override

207

public Type type() { return Type.ChunkFetchRequest; }

208

209

@Override

210

public int encodedLength() { return 12; } // streamId (8) + chunkIndex (4)

211

212

@Override

213

public void encode(ByteBuf buf) {

214

buf.writeLong(streamChunkId.streamId);

215

buf.writeInt(streamChunkId.chunkIndex);

216

}

217

218

public static ChunkFetchRequest decode(ByteBuf buf) {

219

long streamId = buf.readLong();

220

int chunkIndex = buf.readInt();

221

return new ChunkFetchRequest(new StreamChunkId(streamId, chunkIndex));

222

}

223

}

224

225

/**

226

* Successful response to a chunk fetch request containing the chunk data.

227

*/

228

public class ChunkFetchSuccess extends AbstractResponseMessage {

229

/** Identifier for the stream and chunk */

230

public final StreamChunkId streamChunkId;

231

232

/**

233

* Creates a successful chunk fetch response.

234

*

235

* @param streamChunkId Identifies the stream and chunk

236

* @param buffer The chunk data

237

*/

238

public ChunkFetchSuccess(StreamChunkId streamChunkId, ManagedBuffer buffer);

239

240

@Override

241

public Type type() { return Type.ChunkFetchSuccess; }

242

}

243

244

/**

245

* Failure response to a chunk fetch request indicating the fetch failed.

246

*/

247

public class ChunkFetchFailure extends AbstractResponseMessage {

248

/** Identifier for the stream and chunk that failed */

249

public final StreamChunkId streamChunkId;

250

251

/** Error message describing the failure */

252

public final String errorString;

253

254

/**

255

* Creates a chunk fetch failure response.

256

*

257

* @param streamChunkId Identifies the stream and chunk that failed

258

* @param errorString Description of the error

259

*/

260

public ChunkFetchFailure(StreamChunkId streamChunkId, String errorString);

261

262

@Override

263

public Type type() { return Type.ChunkFetchFailure; }

264

}

265

```

266

267

### Stream Messages

268

269

Messages for stream-based data transfer operations.

270

271

```java { .api }

272

/**

273

* Request to open a stream for data transfer.

274

*/

275

public class StreamRequest extends AbstractMessage implements RequestMessage {

276

/** String identifier for the stream to open */

277

public final String streamId;

278

279

/**

280

* Creates a stream request.

281

*

282

* @param streamId String identifier for the stream

283

*/

284

public StreamRequest(String streamId);

285

286

@Override

287

public Type type() { return Type.StreamRequest; }

288

}

289

290

/**

291

* Successful response to a stream request indicating the stream is ready.

292

*/

293

public class StreamResponse extends AbstractResponseMessage {

294

/** String identifier for the opened stream */

295

public final String streamId;

296

297

/** Total number of bytes in the stream */

298

public final long byteCount;

299

300

/**

301

* Creates a stream response.

302

*

303

* @param streamId String identifier for the stream

304

* @param byteCount Total number of bytes in the stream

305

*/

306

public StreamResponse(String streamId, long byteCount);

307

308

@Override

309

public Type type() { return Type.StreamResponse; }

310

}

311

312

/**

313

* Failure response to a stream request indicating the stream could not be opened.

314

*/

315

public class StreamFailure extends AbstractResponseMessage {

316

/** String identifier for the stream that failed */

317

public final String streamId;

318

319

/** Error message describing the failure */

320

public final String errorString;

321

322

/**

323

* Creates a stream failure response.

324

*

325

* @param streamId String identifier for the stream that failed

326

* @param errorString Description of the error

327

*/

328

public StreamFailure(String streamId, String errorString);

329

330

@Override

331

public Type type() { return Type.StreamFailure; }

332

}

333

```

334

335

### One-Way Messages

336

337

Messages that don't expect a response.

338

339

```java { .api }

340

/**

341

* One-way message that doesn't expect a response.

342

* Used for notifications, heartbeats, or fire-and-forget operations.

343

*/

344

public class OneWayMessage extends AbstractMessage implements RequestMessage {

345

/**

346

* Creates a one-way message.

347

*

348

* @param body The message payload as ManagedBuffer

349

*/

350

public OneWayMessage(ManagedBuffer body);

351

352

@Override

353

public Type type() { return Type.OneWayMessage; }

354

355

@Override

356

public boolean isBodyInFrame() { return false; }

357

}

358

```

359

360

### Stream and Chunk Identifiers

361

362

Utility classes for identifying streams and chunks.

363

364

```java { .api }

365

/**

366

* Identifier for a specific chunk within a stream.

367

* Combines stream ID and chunk index for unique chunk identification.

368

*/

369

public class StreamChunkId {

370

/** Numeric identifier for the stream */

371

public final long streamId;

372

373

/** Index of the chunk within the stream (0-based) */

374

public final int chunkIndex;

375

376

/**

377

* Creates a stream chunk identifier.

378

*

379

* @param streamId Numeric identifier for the stream

380

* @param chunkIndex Index of the chunk within the stream

381

*/

382

public StreamChunkId(long streamId, int chunkIndex);

383

384

@Override

385

public String toString() {

386

return "StreamChunkId{streamId=" + streamId + ", chunkIndex=" + chunkIndex + "}";

387

}

388

389

@Override

390

public boolean equals(Object other) {

391

if (this == other) return true;

392

if (other == null || getClass() != other.getClass()) return false;

393

394

StreamChunkId that = (StreamChunkId) other;

395

return streamId == that.streamId && chunkIndex == that.chunkIndex;

396

}

397

398

@Override

399

public int hashCode() {

400

return Objects.hash(streamId, chunkIndex);

401

}

402

}

403

```

404

405

### Message Encoding and Decoding

406

407

Utilities for message serialization and deserialization.

408

409

```java { .api }

410

/**

411

* Utility class providing encoding and decoding functions for protocol messages.

412

*/

413

public class Encoders {

414

/**

415

* Decodes a message from a ByteBuf based on message type.

416

*

417

* @param msgType The message type to decode

418

* @param in ByteBuf containing the encoded message

419

* @return Decoded Message instance

420

*/

421

public static Message decode(Message.Type msgType, ByteBuf in);

422

423

/**

424

* Encodes a message to a ByteBuf.

425

*

426

* @param msg The message to encode

427

* @param out ByteBuf to write the encoded message to

428

*/

429

public static void encode(Message msg, ByteBuf out);

430

}

431

432

/**

433

* Netty decoder for converting ByteBuf to Message objects.

434

*/

435

public class MessageDecoder extends MessageToMessageDecoder<ByteBuf> {

436

@Override

437

protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out);

438

}

439

440

/**

441

* Netty encoder for converting Message objects to ByteBuf.

442

*/

443

public class MessageEncoder extends MessageToByteEncoder<Message> {

444

@Override

445

protected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out);

446

}

447

```

448

449

## Usage Examples

450

451

### Creating and Sending Messages

452

453

```java

454

import org.apache.spark.network.protocol.*;

455

import org.apache.spark.network.buffer.NioManagedBuffer;

456

import java.nio.ByteBuffer;

457

458

// Create an RPC request

459

ByteBuffer requestData = ByteBuffer.wrap("Hello Server".getBytes());

460

ManagedBuffer requestBuffer = new NioManagedBuffer(requestData);

461

RpcRequest rpcRequest = new RpcRequest(12345L, requestBuffer);

462

463

// Create a chunk fetch request

464

StreamChunkId chunkId = new StreamChunkId(1001L, 5);

465

ChunkFetchRequest chunkRequest = new ChunkFetchRequest(chunkId);

466

467

// Create a stream request

468

StreamRequest streamRequest = new StreamRequest("data-stream-001");

469

470

// Create a one-way message

471

ByteBuffer notificationData = ByteBuffer.wrap("Status update".getBytes());

472

ManagedBuffer notificationBuffer = new NioManagedBuffer(notificationData);

473

OneWayMessage notification = new OneWayMessage(notificationBuffer);

474

```

475

476

### Handling Responses

477

478

```java

479

// Handle different response types

480

public void handleResponse(Message response) {

481

switch (response.type()) {

482

case RpcResponse:

483

RpcResponse rpcResp = (RpcResponse) response;

484

System.out.println("RPC response for request: " + rpcResp.requestId);

485

processRpcResponse(rpcResp.body());

486

break;

487

488

case RpcFailure:

489

RpcFailure rpcFail = (RpcFailure) response;

490

System.err.println("RPC " + rpcFail.requestId + " failed: " + rpcFail.errorString);

491

break;

492

493

case ChunkFetchSuccess:

494

ChunkFetchSuccess chunkSuccess = (ChunkFetchSuccess) response;

495

System.out.println("Received chunk: " + chunkSuccess.streamChunkId);

496

processChunkData(chunkSuccess.body());

497

break;

498

499

case ChunkFetchFailure:

500

ChunkFetchFailure chunkFail = (ChunkFetchFailure) response;

501

System.err.println("Chunk fetch failed: " + chunkFail.streamChunkId +

502

" - " + chunkFail.errorString);

503

break;

504

505

case StreamResponse:

506

StreamResponse streamResp = (StreamResponse) response;

507

System.out.println("Stream " + streamResp.streamId +

508

" opened with " + streamResp.byteCount + " bytes");

509

break;

510

511

case StreamFailure:

512

StreamFailure streamFail = (StreamFailure) response;

513

System.err.println("Stream " + streamFail.streamId +

514

" failed: " + streamFail.errorString);

515

break;

516

517

default:

518

System.err.println("Unknown response type: " + response.type());

519

}

520

}

521

```

522

523

### Custom Message Processing

524

525

```java

526

public class MessageProcessor {

527

public void processMessage(Message message) {

528

// Check if message has body

529

if (message.body() != null && message.body().size() > 0) {

530

try {

531

ByteBuffer bodyData = message.body().nioByteBuffer();

532

533

// Process based on message type

534

if (message instanceof RpcRequest) {

535

processRpcRequest((RpcRequest) message, bodyData);

536

} else if (message instanceof ChunkFetchRequest) {

537

processChunkRequest((ChunkFetchRequest) message);

538

} else if (message instanceof StreamRequest) {

539

processStreamRequest((StreamRequest) message);

540

} else if (message instanceof OneWayMessage) {

541

processOneWayMessage(bodyData);

542

}

543

544

} catch (Exception e) {

545

System.err.println("Error processing message: " + e.getMessage());

546

} finally {

547

// Important: Release buffer when done

548

if (message.body() != null) {

549

message.body().release();

550

}

551

}

552

}

553

}

554

555

private void processRpcRequest(RpcRequest request, ByteBuffer body) {

556

System.out.println("Processing RPC request " + request.requestId);

557

558

// Extract request data

559

byte[] requestBytes = new byte[body.remaining()];

560

body.get(requestBytes);

561

String requestString = new String(requestBytes);

562

563

// Process request and generate response

564

// (Response would be sent via callback in actual implementation)

565

}

566

567

private void processChunkRequest(ChunkFetchRequest request) {

568

System.out.println("Processing chunk request: " + request.streamChunkId);

569

570

// Look up chunk data and prepare response

571

// (Would use StreamManager in actual implementation)

572

}

573

574

private void processStreamRequest(StreamRequest request) {

575

System.out.println("Processing stream request: " + request.streamId);

576

577

// Open stream and prepare response

578

// (Would use StreamManager in actual implementation)

579

}

580

581

private void processOneWayMessage(ByteBuffer body) {

582

byte[] messageBytes = new byte[body.remaining()];

583

body.get(messageBytes);

584

String message = new String(messageBytes);

585

586

System.out.println("Received one-way message: " + message);

587

// Process notification without sending response

588

}

589

}

590

```

591

592

### Message Validation

593

594

```java

595

public class MessageValidator {

596

public static boolean validateMessage(Message message) {

597

if (message == null) {

598

return false;

599

}

600

601

// Validate message type

602

if (message.type() == null) {

603

return false;

604

}

605

606

// Type-specific validation

607

switch (message.type()) {

608

case RpcRequest:

609

return validateRpcRequest((RpcRequest) message);

610

case ChunkFetchRequest:

611

return validateChunkFetchRequest((ChunkFetchRequest) message);

612

case StreamRequest:

613

return validateStreamRequest((StreamRequest) message);

614

default:

615

return true; // Other types are valid by default

616

}

617

}

618

619

private static boolean validateRpcRequest(RpcRequest request) {

620

// Validate request ID is positive

621

if (request.requestId <= 0) {

622

return false;

623

}

624

625

// Validate body exists and has reasonable size

626

if (request.body() == null || request.body().size() > MAX_RPC_SIZE) {

627

return false;

628

}

629

630

return true;

631

}

632

633

private static boolean validateChunkFetchRequest(ChunkFetchRequest request) {

634

if (request.streamChunkId == null) {

635

return false;

636

}

637

638

// Validate stream ID and chunk index

639

return request.streamChunkId.streamId > 0 &&

640

request.streamChunkId.chunkIndex >= 0;

641

}

642

643

private static boolean validateStreamRequest(StreamRequest request) {

644

// Validate stream ID is not null or empty

645

return request.streamId != null && !request.streamId.trim().isEmpty();

646

}

647

648

private static final long MAX_RPC_SIZE = 16 * 1024 * 1024; // 16MB

649

}

650

```