or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

buffer-management.mdclient-operations.mdconfiguration-utilities.mdindex.mdmessage-protocol.mdsasl-authentication.mdserver-operations.mdtransport-setup.md

buffer-management.mddocs/

0

# Buffer Management

1

2

Unified buffer management system providing abstractions over different buffer types including memory, file segments, and Netty ByteBufs. The buffer system enables efficient zero-copy operations and resource management for network data transfer.

3

4

## Capabilities

5

6

### ManagedBuffer

7

8

Abstract base class for all buffer implementations, providing a unified interface for different data sources.

9

10

```java { .api }

11

/**

12

* ManagedBuffer represents an immutable buffer of data with reference counting

13

* and multiple access methods. It abstracts over different buffer types to provide

14

* a unified interface for network data transfer.

15

*/

16

public abstract class ManagedBuffer {

17

/**

18

* Gets the size of the buffer in bytes.

19

*

20

* @return The buffer size in bytes

21

*/

22

public abstract long size();

23

24

/**

25

* Creates a read-only ByteBuffer view of this buffer.

26

* For large buffers, this may trigger I/O operations.

27

*

28

* @return ByteBuffer containing the buffer data

29

* @throws IOException if buffer cannot be read

30

*/

31

public abstract ByteBuffer nioByteBuffer() throws IOException;

32

33

/**

34

* Creates an InputStream for reading the buffer data.

35

* Allows streaming access to large buffers without loading everything into memory.

36

*

37

* @return InputStream for reading buffer data

38

* @throws IOException if stream cannot be created

39

*/

40

public abstract InputStream createInputStream() throws IOException;

41

42

/**

43

* Increments the reference count for this buffer.

44

* Must be called when sharing buffer references to prevent premature cleanup.

45

*

46

* @return This buffer instance for method chaining

47

*/

48

public abstract ManagedBuffer retain();

49

50

/**

51

* Decrements the reference count and releases resources if count reaches zero.

52

* Must be called when done with a buffer to prevent memory leaks.

53

*

54

* @return This buffer instance for method chaining

55

*/

56

public abstract ManagedBuffer release();

57

58

/**

59

* Converts this buffer to a Netty-compatible object for efficient network transfer.

60

* The exact return type depends on the buffer implementation.

61

*

62

* @return Netty-compatible object (typically ByteBuf or FileRegion)

63

* @throws IOException if conversion fails

64

*/

65

public abstract Object convertToNetty() throws IOException;

66

}

67

```

68

69

### Buffer Implementations

70

71

#### NioManagedBuffer

72

73

ByteBuffer-backed managed buffer for in-memory data.

74

75

```java { .api }

76

/**

77

* ManagedBuffer implementation backed by a Java NIO ByteBuffer.

78

* Best for small to medium-sized data that fits in memory.

79

*/

80

public class NioManagedBuffer extends ManagedBuffer {

81

/**

82

* Creates a managed buffer from a ByteBuffer.

83

* The ByteBuffer should be ready for reading (position at start, limit at end).

84

*

85

* @param buf The ByteBuffer to wrap

86

*/

87

public NioManagedBuffer(ByteBuffer buf);

88

89

@Override

90

public long size() {

91

return buf.remaining();

92

}

93

94

@Override

95

public ByteBuffer nioByteBuffer() throws IOException {

96

return buf.duplicate(); // Returns a duplicate to avoid position changes

97

}

98

99

@Override

100

public InputStream createInputStream() throws IOException {

101

return new ByteBufferInputStream(buf);

102

}

103

104

@Override

105

public ManagedBuffer retain() {

106

return this; // NIO buffers don't need reference counting

107

}

108

109

@Override

110

public ManagedBuffer release() {

111

return this; // NIO buffers don't need explicit release

112

}

113

114

@Override

115

public Object convertToNetty() throws IOException {

116

return Unpooled.wrappedBuffer(buf);

117

}

118

}

119

```

120

121

#### NettyManagedBuffer

122

123

Netty ByteBuf-backed managed buffer for integration with Netty pipelines.

124

125

```java { .api }

126

/**

127

* ManagedBuffer implementation backed by a Netty ByteBuf.

128

* Provides direct integration with Netty's memory management and zero-copy operations.

129

*/

130

public class NettyManagedBuffer extends ManagedBuffer {

131

/**

132

* Creates a managed buffer from a Netty ByteBuf.

133

* The buffer takes ownership of the ByteBuf and manages its lifecycle.

134

*

135

* @param buf The Netty ByteBuf to wrap

136

*/

137

public NettyManagedBuffer(ByteBuf buf);

138

139

@Override

140

public long size() {

141

return buf.readableBytes();

142

}

143

144

@Override

145

public ByteBuffer nioByteBuffer() throws IOException {

146

return buf.nioBuffer(); // Zero-copy access to underlying ByteBuffer

147

}

148

149

@Override

150

public InputStream createInputStream() throws IOException {

151

return new ByteBufInputStream(buf);

152

}

153

154

@Override

155

public ManagedBuffer retain() {

156

buf.retain();

157

return this;

158

}

159

160

@Override

161

public ManagedBuffer release() {

162

buf.release();

163

return this;

164

}

165

166

@Override

167

public Object convertToNetty() throws IOException {

168

return buf.duplicate(); // Return duplicate to preserve original

169

}

170

}

171

```

172

173

#### FileSegmentManagedBuffer

174

175

File-backed managed buffer for efficient access to file segments without loading into memory.

176

177

```java { .api }

178

/**

179

* ManagedBuffer implementation backed by a file segment.

180

* Enables efficient zero-copy transfer of file data over the network using FileRegion.

181

* Best for large data that should remain on disk.

182

*/

183

public class FileSegmentManagedBuffer extends ManagedBuffer {

184

/**

185

* Creates a managed buffer for a segment of a file.

186

*

187

* @param conf Transport configuration for I/O settings

188

* @param file The file to read from

189

* @param offset Starting offset within the file (0-based)

190

* @param length Number of bytes to include in the segment

191

*/

192

public FileSegmentManagedBuffer(TransportConf conf, File file, long offset, long length);

193

194

@Override

195

public long size() {

196

return length;

197

}

198

199

@Override

200

public ByteBuffer nioByteBuffer() throws IOException {

201

// For small files, read into memory

202

if (length <= conf.memoryMapBytes()) {

203

return mapFileSegment();

204

} else {

205

throw new IOException("File segment too large for memory mapping: " + length);

206

}

207

}

208

209

@Override

210

public InputStream createInputStream() throws IOException {

211

// Create stream that reads only the specified segment

212

return new LimitedInputStream(

213

new FileInputStream(file).skip(offset),

214

length

215

);

216

}

217

218

@Override

219

public ManagedBuffer retain() {

220

return this; // File buffers don't need reference counting

221

}

222

223

@Override

224

public ManagedBuffer release() {

225

return this; // File buffers don't need explicit release

226

}

227

228

@Override

229

public Object convertToNetty() throws IOException {

230

// Use Netty's FileRegion for zero-copy file transfer

231

return new DefaultFileRegion(file, offset, length);

232

}

233

234

/**

235

* Gets the underlying file.

236

*

237

* @return The file this buffer reads from

238

*/

239

public File getFile() {

240

return file;

241

}

242

243

/**

244

* Gets the offset within the file.

245

*

246

* @return The starting offset in bytes

247

*/

248

public long getOffset() {

249

return offset;

250

}

251

252

/**

253

* Gets the length of the segment.

254

*

255

* @return The segment length in bytes

256

*/

257

public long getLength() {

258

return length;

259

}

260

261

private ByteBuffer mapFileSegment() throws IOException {

262

try (RandomAccessFile raf = new RandomAccessFile(file, "r");

263

FileChannel channel = raf.getChannel()) {

264

265

return channel.map(FileChannel.MapMode.READ_ONLY, offset, length);

266

}

267

}

268

}

269

```

270

271

### Utility Classes

272

273

#### LazyFileRegion

274

275

Lazy-loading file region for deferred file access.

276

277

```java { .api }

278

/**

279

* FileRegion implementation that defers file opening until transfer time.

280

* Useful for managing many file references without keeping file handles open.

281

*/

282

public class LazyFileRegion extends AbstractReferenceCounted implements FileRegion {

283

/**

284

* Creates a lazy file region.

285

*

286

* @param file The file to transfer

287

* @param position Starting position within the file

288

* @param count Number of bytes to transfer

289

*/

290

public LazyFileRegion(File file, long position, long count);

291

292

@Override

293

public long position() {

294

return position;

295

}

296

297

@Override

298

public long count() {

299

return count;

300

}

301

302

@Override

303

public long transferTo(WritableByteChannel target, long position) throws IOException {

304

// Opens file and transfers data on-demand

305

return transferToChannel(target, position);

306

}

307

308

/**

309

* Gets the file this region references.

310

*

311

* @return The underlying file

312

*/

313

public File file() {

314

return file;

315

}

316

}

317

```

318

319

## Usage Examples

320

321

### Creating and Using Buffers

322

323

```java

324

import org.apache.spark.network.buffer.*;

325

import java.nio.ByteBuffer;

326

import java.io.File;

327

import java.io.InputStream;

328

329

// Create buffer from byte array

330

byte[] data = "Hello, World!".getBytes();

331

ByteBuffer byteBuffer = ByteBuffer.wrap(data);

332

ManagedBuffer nioBuffer = new NioManagedBuffer(byteBuffer);

333

334

System.out.println("Buffer size: " + nioBuffer.size());

335

336

// Create buffer from file segment

337

File dataFile = new File("/path/to/large-file.dat");

338

long offset = 1024; // Start at 1KB

339

long length = 64 * 1024; // Read 64KB

340

ManagedBuffer fileBuffer = new FileSegmentManagedBuffer(conf, dataFile, offset, length);

341

342

// Create buffer from Netty ByteBuf

343

ByteBuf nettyBuf = Unpooled.copiedBuffer("Netty data", StandardCharsets.UTF_8);

344

ManagedBuffer nettyBuffer = new NettyManagedBuffer(nettyBuf);

345

```

346

347

### Buffer Access Patterns

348

349

```java

350

public void processBuffer(ManagedBuffer buffer) {

351

try {

352

System.out.println("Processing buffer of size: " + buffer.size());

353

354

// Method 1: Direct ByteBuffer access (for small buffers)

355

if (buffer.size() < 1024 * 1024) { // Less than 1MB

356

try {

357

ByteBuffer bb = buffer.nioByteBuffer();

358

processDirectly(bb);

359

} catch (IOException e) {

360

System.err.println("Could not get ByteBuffer: " + e.getMessage());

361

}

362

}

363

364

// Method 2: Stream access (for large buffers or when size doesn't matter)

365

try (InputStream stream = buffer.createInputStream()) {

366

processStream(stream);

367

} catch (IOException e) {

368

System.err.println("Could not create stream: " + e.getMessage());

369

}

370

371

} finally {

372

// Important: Always release buffer when done

373

buffer.release();

374

}

375

}

376

377

private void processDirectly(ByteBuffer buffer) {

378

// Process data directly from ByteBuffer

379

while (buffer.hasRemaining()) {

380

byte b = buffer.get();

381

// Process byte...

382

}

383

}

384

385

private void processStream(InputStream stream) throws IOException {

386

// Process data from stream

387

byte[] chunk = new byte[8192];

388

int bytesRead;

389

390

while ((bytesRead = stream.read(chunk)) != -1) {

391

// Process chunk...

392

processChunk(chunk, bytesRead);

393

}

394

}

395

```

396

397

### Reference Counting and Resource Management

398

399

```java

400

public class BufferManager {

401

private final List<ManagedBuffer> activeBuffers = new ArrayList<>();

402

403

public ManagedBuffer shareBuffer(ManagedBuffer original) {

404

// Retain buffer for sharing

405

ManagedBuffer shared = original.retain();

406

activeBuffers.add(shared);

407

return shared;

408

}

409

410

public void processBufferAsync(ManagedBuffer buffer) {

411

// Retain buffer before passing to async operation

412

ManagedBuffer retained = buffer.retain();

413

414

CompletableFuture.runAsync(() -> {

415

try {

416

// Process buffer in background thread

417

processBuffer(retained);

418

} finally {

419

// Always release in finally block

420

retained.release();

421

}

422

});

423

}

424

425

public void cleanup() {

426

// Release all active buffers

427

for (ManagedBuffer buffer : activeBuffers) {

428

buffer.release();

429

}

430

activeBuffers.clear();

431

}

432

}

433

```

434

435

### Zero-Copy Network Transfer

436

437

```java

438

public class NetworkTransferExample {

439

public void sendBufferOverNetwork(ManagedBuffer buffer, Channel channel) {

440

try {

441

// Convert buffer to Netty object for efficient transfer

442

Object nettyObject = buffer.convertToNetty();

443

444

if (nettyObject instanceof ByteBuf) {

445

// Direct ByteBuf transfer

446

ByteBuf byteBuf = (ByteBuf) nettyObject;

447

channel.writeAndFlush(byteBuf);

448

449

} else if (nettyObject instanceof FileRegion) {

450

// Zero-copy file transfer

451

FileRegion fileRegion = (FileRegion) nettyObject;

452

channel.writeAndFlush(fileRegion);

453

454

} else {

455

System.err.println("Unsupported Netty object type: " + nettyObject.getClass());

456

}

457

458

} catch (IOException e) {

459

System.err.println("Failed to convert buffer for network transfer: " + e.getMessage());

460

}

461

}

462

463

public void sendFileSegment(File file, long offset, long length, Channel channel) {

464

// Create file buffer for zero-copy transfer

465

FileSegmentManagedBuffer fileBuffer = new FileSegmentManagedBuffer(conf, file, offset, length);

466

467

try {

468

// Get FileRegion for efficient file transfer

469

FileRegion fileRegion = (FileRegion) fileBuffer.convertToNetty();

470

471

// Send with completion handling

472

ChannelFuture future = channel.writeAndFlush(fileRegion);

473

future.addListener(new ChannelFutureListener() {

474

@Override

475

public void operationComplete(ChannelFuture future) {

476

if (future.isSuccess()) {

477

System.out.println("File segment sent successfully");

478

} else {

479

System.err.println("Failed to send file segment: " + future.cause());

480

}

481

482

// Release buffer after transfer

483

fileBuffer.release();

484

}

485

});

486

487

} catch (IOException e) {

488

System.err.println("Failed to prepare file for transfer: " + e.getMessage());

489

fileBuffer.release();

490

}

491

}

492

}

493

```

494

495

### Buffer Factory Pattern

496

497

```java

498

public class BufferFactory {

499

private final TransportConf conf;

500

501

public BufferFactory(TransportConf conf) {

502

this.conf = conf;

503

}

504

505

public ManagedBuffer createFromBytes(byte[] data) {

506

ByteBuffer buffer = ByteBuffer.wrap(data);

507

return new NioManagedBuffer(buffer);

508

}

509

510

public ManagedBuffer createFromString(String text) {

511

return createFromBytes(text.getBytes(StandardCharsets.UTF_8));

512

}

513

514

public ManagedBuffer createFromFile(File file) {

515

return new FileSegmentManagedBuffer(conf, file, 0, file.length());

516

}

517

518

public ManagedBuffer createFileSegment(File file, long offset, long length) {

519

// Validate parameters

520

if (offset < 0 || length < 0) {

521

throw new IllegalArgumentException("Offset and length must be non-negative");

522

}

523

524

if (offset + length > file.length()) {

525

throw new IllegalArgumentException("Segment extends beyond file end");

526

}

527

528

return new FileSegmentManagedBuffer(conf, file, offset, length);

529

}

530

531

public ManagedBuffer createFromNettyBuf(ByteBuf buf) {

532

return new NettyManagedBuffer(buf);

533

}

534

535

public ManagedBuffer createEmpty() {

536

return new NioManagedBuffer(ByteBuffer.allocate(0));

537

}

538

}

539

```

540

541

### Buffer Composition and Chaining

542

543

```java

544

public class CompositeBufferExample {

545

public ManagedBuffer combineBuffers(List<ManagedBuffer> buffers) {

546

// Calculate total size

547

long totalSize = buffers.stream().mapToLong(ManagedBuffer::size).sum();

548

549

if (totalSize > Integer.MAX_VALUE) {

550

throw new IllegalArgumentException("Combined buffer too large: " + totalSize);

551

}

552

553

// Combine into single ByteBuffer

554

ByteBuffer combined = ByteBuffer.allocate((int) totalSize);

555

556

for (ManagedBuffer buffer : buffers) {

557

try {

558

ByteBuffer bb = buffer.nioByteBuffer();

559

combined.put(bb);

560

} catch (IOException e) {

561

throw new RuntimeException("Failed to read buffer", e);

562

} finally {

563

buffer.release(); // Release source buffers

564

}

565

}

566

567

combined.flip(); // Prepare for reading

568

return new NioManagedBuffer(combined);

569

}

570

571

public void streamBuffersSequentially(List<ManagedBuffer> buffers, OutputStream output)

572

throws IOException {

573

574

for (ManagedBuffer buffer : buffers) {

575

try (InputStream input = buffer.createInputStream()) {

576

byte[] chunk = new byte[8192];

577

int bytesRead;

578

579

while ((bytesRead = input.read(chunk)) != -1) {

580

output.write(chunk, 0, bytesRead);

581

}

582

583

} finally {

584

buffer.release();

585

}

586

}

587

}

588

}