
# Buffer Management

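The buffer management API gives Apache Spark's networking layer a uniform way to expose bytes held in different backing stores: NIO `ByteBuffer`s, file segments, and Netty `ByteBuf`s. The `ManagedBuffer` abstract class is the common foundation. Each implementation decides how its data is read, whether it is reference-counted, and how it is handed to Netty, so data can often be sent without extra copies (file segments, for example, are transferred as a zero-copy `FileRegion`).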

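## Capabilities

### ManagedBuffer (Abstract Class)

Abstract base class for an immutable view of byte data; concrete subclasses define where the bytes come from. Some implementations are managed outside the JVM garbage collector: `NettyManagedBuffer`, for example, is reference-counted, so call `retain()` before passing such a buffer to another thread and `release()` when that thread is done with it.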

```java { .api }
/**
 * Get the size of the buffer in bytes
 * @return long representing the number of bytes in the buffer
 */
public abstract long size();

/**
 * Expose this buffer's data as a NIO ByteBuffer. Changing the position and
 * limit of the returned ByteBuffer does not affect the content of this buffer.
 * Depending on the implementation this may memory-map a file or allocate and
 * fill a new buffer, so it can be expensive.
 * @return ByteBuffer containing the buffer data
 * @throws IOException if the buffer data cannot be accessed
 */
public abstract ByteBuffer nioByteBuffer() throws IOException;

/**
 * Create an InputStream for reading the buffer data
 * @return InputStream for sequential reading of buffer contents
 * @throws IOException if the stream cannot be created
 */
public abstract InputStream createInputStream() throws IOException;

/**
 * Increment the reference count by one, if this implementation is reference-counted
 * @return this ManagedBuffer, for method chaining
 */
public abstract ManagedBuffer retain();

/**
 * Decrement the reference count by one, if applicable, and release the
 * underlying resources when the count reaches zero
 * @return this ManagedBuffer, for method chaining
 */
public abstract ManagedBuffer release();

/**
 * Convert this buffer to a Netty object used to write the data to a channel
 * @return either an io.netty.buffer.ByteBuf or an io.netty.channel.FileRegion;
 *         if a ByteBuf is returned, its reference count has already been
 *         incremented and the caller is responsible for releasing it
 * @throws IOException if conversion fails
 */
public abstract Object convertToNetty() throws IOException;
```
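
To make the retain/release contract concrete, here is a minimal sketch of reference counting in plain Java. `RefCountedBytes` is hypothetical and not part of Spark; it models what `NettyManagedBuffer` gets from Netty's `ByteBuf`: the creator holds the first reference, `retain()` adds one, `release()` removes one, and the memory is reclaimed when the count reaches zero. Implementations backed by GC-managed memory, such as `NioManagedBuffer`, simply return `this` from both methods.

```java
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical, JDK-only illustration of the retain()/release() contract
 * (not a Spark class). The creator holds the first reference; the bytes are
 * dropped when the last reference is released.
 */
final class RefCountedBytes {
    private final AtomicInteger count = new AtomicInteger(1); // creator's reference
    private volatile byte[] data;

    RefCountedBytes(byte[] data) {
        this.data = data;
    }

    RefCountedBytes retain() {
        count.getAndUpdate(n -> {
            // A freed buffer must not be brought back to life
            if (n <= 0) throw new IllegalStateException("buffer already released");
            return n + 1;
        });
        return this;
    }

    RefCountedBytes release() {
        int remaining = count.updateAndGet(n -> {
            if (n <= 0) throw new IllegalStateException("released more times than retained");
            return n - 1;
        });
        if (remaining == 0) {
            data = null; // stands in for returning memory to a pool or freeing off-heap memory
        }
        return this;
    }

    int size() {
        byte[] d = data;
        if (d == null) throw new IllegalStateException("buffer already released");
        return d.length;
    }

    int refCnt() {
        return count.get();
    }

    public static void main(String[] args) {
        RefCountedBytes buf = new RefCountedBytes(new byte[] {1, 2, 3});
        buf.retain();  // e.g. before handing the buffer to another thread
        buf.release(); // that thread is done with it
        System.out.println(buf.refCnt() + " reference(s), " + buf.size() + " bytes"); // 1 reference(s), 3 bytes
        buf.release(); // creator is done; the bytes are dropped
        System.out.println(buf.refCnt()); // 0
    }
}
```

Failing fast on over-release mirrors Netty's own reference counting, which throws an `IllegalReferenceCountException` when a buffer whose count is already zero is released again.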

## Buffer Implementations

### NioManagedBuffer

Managed buffer implementation backed by a NIO `ByteBuffer`, suitable for data already in memory. Because the JVM garbage collector manages the backing buffer, `retain()` and `release()` are no-ops.

```java { .api }
/**
 * Create a managed buffer from a NIO ByteBuffer
 * @param buf - ByteBuffer containing the data; the bytes between its position
 *              and limit form the contents of this managed buffer
 */
public NioManagedBuffer(ByteBuffer buf);

@Override
public long size();

@Override
public ByteBuffer nioByteBuffer() throws IOException;

@Override
public InputStream createInputStream() throws IOException;

@Override
public ManagedBuffer retain();

@Override
public ManagedBuffer release();

@Override
public Object convertToNetty() throws IOException;
```
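
`NioManagedBuffer` hands out views of its `ByteBuffer` rather than copies. The sketch below is a hypothetical, JDK-only stand-in (`NioBufferSketch` is not Spark's class), written under the assumption that each view is made with `ByteBuffer.duplicate()`. A duplicate shares the underlying bytes but has its own position and limit, so consuming one view changes neither `size()` nor what the next caller reads.

```java
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical, JDK-only stand-in for NioManagedBuffer's read path (not Spark code). */
final class NioBufferSketch {
    private final ByteBuffer buf;

    NioBufferSketch(ByteBuffer buf) {
        this.buf = buf;
    }

    /** The contents are the bytes between the buffer's position and limit. */
    long size() {
        return buf.remaining();
    }

    /** Each call returns a duplicate: shared bytes, independent position and limit. */
    ByteBuffer nioByteBuffer() {
        return buf.duplicate();
    }

    /** Streams from a fresh duplicate, so reading never moves the original buffer. */
    InputStream createInputStream() {
        final ByteBuffer view = buf.duplicate();
        return new InputStream() {
            @Override
            public int read() {
                return view.hasRemaining() ? (view.get() & 0xFF) : -1;
            }

            @Override
            public int read(byte[] b, int off, int len) {
                if (len == 0) return 0;
                if (!view.hasRemaining()) return -1;
                int n = Math.min(len, view.remaining());
                view.get(b, off, n);
                return n;
            }
        };
    }

    public static void main(String[] args) {
        NioBufferSketch b = new NioBufferSketch(
            ByteBuffer.wrap("Hello, Spark Network!".getBytes(StandardCharsets.UTF_8)));
        ByteBuffer view = b.nioByteBuffer();
        view.position(view.limit()); // consume the whole view
        System.out.println(b.size()); // 21: the original buffer's position did not move
    }
}
```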

### FileSegmentManagedBuffer

Managed buffer implementation backed by a segment of a file. Its `convertToNetty()` returns a Netty `FileRegion`, enabling zero-copy transfer of the segment to the network; `retain()` and `release()` are no-ops.

```java { .api }
/**
 * Create a managed buffer from a file segment. To cover a whole file,
 * pass offset 0 and length file.length().
 * @param conf - TransportConf (org.apache.spark.network.util) that controls how the
 *               file is read, e.g. the size above which segments are memory-mapped
 * @param file - File to read data from
 * @param offset - Starting position in the file
 * @param length - Number of bytes to include from the file
 */
public FileSegmentManagedBuffer(TransportConf conf, File file, long offset, long length);

@Override
public long size();

@Override
public ByteBuffer nioByteBuffer() throws IOException;

@Override
public InputStream createInputStream() throws IOException;

@Override
public ManagedBuffer retain();

@Override
public ManagedBuffer release();

@Override
public Object convertToNetty() throws IOException;

/**
 * Get the underlying file
 * @return File object backing this buffer
 */
public File getFile();

/**
 * Get the offset within the file
 * @return long representing the starting position in bytes
 */
public long getOffset();

/**
 * Get the length of the file segment
 * @return long representing the number of bytes in the segment
 */
public long getLength();
```
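
Because `nioByteBuffer()` on a file segment has to produce an in-memory view of file data, the choice between copying and memory-mapping matters. The hypothetical helper below (`FileSegmentReadSketch` and its `mmapThreshold` parameter are illustrative, not Spark APIs) copies small segments into a heap buffer with positional reads and memory-maps large ones. In Spark the equivalent cutoff comes from `TransportConf`; the relevant setting is assumed here to be `spark.storage.memoryMapThreshold`.

```java
import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.StandardOpenOption;

/** Hypothetical helper (not a Spark API) showing two ways to expose a file segment. */
final class FileSegmentReadSketch {
    static ByteBuffer readSegment(File file, long offset, long length, long mmapThreshold)
            throws IOException {
        try (FileChannel channel = FileChannel.open(file.toPath(), StandardOpenOption.READ)) {
            if (length >= mmapThreshold) {
                // Large segment: map it; the mapping stays valid after the channel closes
                return channel.map(FileChannel.MapMode.READ_ONLY, offset, length);
            }
            // Small segment: copy into a heap buffer (throws if length exceeds Integer.MAX_VALUE)
            ByteBuffer buf = ByteBuffer.allocate(Math.toIntExact(length));
            long position = offset;
            while (buf.hasRemaining()) {
                int n = channel.read(buf, position); // positional read
                if (n < 0) {
                    throw new EOFException("Reached end of " + file + " before reading " + length + " bytes");
                }
                position += n;
            }
            buf.flip();
            return buf;
        }
    }

    public static void main(String[] args) throws IOException {
        File f = File.createTempFile("segment", ".txt");
        f.deleteOnExit();
        Files.write(f.toPath(), "0123456789abcdefghijklmnopqrstuvwxyz".getBytes(StandardCharsets.US_ASCII));
        ByteBuffer seg = readSegment(f, 10, 20, 2L * 1024 * 1024); // 2 MiB cutoff, as an example
        byte[] out = new byte[seg.remaining()];
        seg.get(out);
        System.out.println(new String(out, StandardCharsets.US_ASCII)); // abcdefghijklmnopqrst
    }
}
```

Mapping avoids a copy into the Java heap but has a fixed setup cost, which is why a size threshold is a reasonable way to choose between the two paths.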

### NettyManagedBuffer

Managed buffer implementation backed by a Netty `ByteBuf`. Unlike the other implementations, it is genuinely reference-counted: `retain()` and `release()` delegate to the `ByteBuf`'s own reference count.

```java { .api }
/**
 * Create a managed buffer from a Netty ByteBuf
 * @param buf - ByteBuf containing the data; the constructor does not retain it, and
 *              retain()/release() on this buffer adjust the ByteBuf's reference count
 */
public NettyManagedBuffer(ByteBuf buf);

@Override
public long size();

@Override
public ByteBuffer nioByteBuffer() throws IOException;

@Override
public InputStream createInputStream() throws IOException;

@Override
public ManagedBuffer retain();

@Override
public ManagedBuffer release();

@Override
public Object convertToNetty() throws IOException;

/**
 * Get the underlying Netty ByteBuf
 * @return ByteBuf backing this managed buffer
 */
public ByteBuf getBuf();
```

## Usage Examples

### Working with NioManagedBuffer

```java
import org.apache.spark.network.buffer.NioManagedBuffer;

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Create buffer from byte array
byte[] data = "Hello, Spark Network!".getBytes(StandardCharsets.UTF_8);
ByteBuffer byteBuffer = ByteBuffer.wrap(data);
NioManagedBuffer buffer = new NioManagedBuffer(byteBuffer);

// Get buffer information
System.out.println("Buffer size: " + buffer.size() + " bytes");

// Read data as ByteBuffer (a view with its own position and limit)
try {
    ByteBuffer nioBuffer = buffer.nioByteBuffer();
    byte[] readData = new byte[nioBuffer.remaining()];
    nioBuffer.get(readData);
    System.out.println("Buffer content: " + new String(readData, StandardCharsets.UTF_8));
} catch (IOException e) {
    System.err.println("Failed to read buffer: " + e.getMessage());
}

// Read data as InputStream (InputStream.readAllBytes requires Java 9+)
try (InputStream inputStream = buffer.createInputStream()) {
    byte[] streamData = inputStream.readAllBytes();
    System.out.println("Stream content: " + new String(streamData, StandardCharsets.UTF_8));
} catch (IOException e) {
    System.err.println("Failed to read stream: " + e.getMessage());
}

// Reference counting: NioManagedBuffer's retain()/release() are no-ops,
// but calling them keeps the code correct if the buffer type changes later
buffer.retain();  // Increment reference count (no-op here)
buffer.release(); // Decrement reference count (no-op here)
```

### Working with FileSegmentManagedBuffer

```java
import org.apache.spark.network.buffer.FileSegmentManagedBuffer;
import org.apache.spark.network.util.MapConfigProvider;
import org.apache.spark.network.util.TransportConf;
import io.netty.util.ReferenceCountUtil;

import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Assumes the enclosing method declares `throws IOException` (for the FileWriter block)

// A TransportConf with default settings; "shuffle" is the module name used for config keys
TransportConf conf = new TransportConf("shuffle", MapConfigProvider.EMPTY);

// Create a test file
File testFile = new File("test-data.txt");
try (FileWriter writer = new FileWriter(testFile)) {
    writer.write("This is test data for FileSegmentManagedBuffer demonstration.");
}

// Create buffer for the entire file (offset 0, length = file size)
FileSegmentManagedBuffer fileBuffer = new FileSegmentManagedBuffer(conf, testFile, 0, testFile.length());
System.out.println("File buffer size: " + fileBuffer.size() + " bytes");
System.out.println("File: " + fileBuffer.getFile().getName());
System.out.println("Offset: " + fileBuffer.getOffset());
System.out.println("Length: " + fileBuffer.getLength());

// Create buffer for a 20-byte segment starting at byte 10
FileSegmentManagedBuffer segmentBuffer = new FileSegmentManagedBuffer(conf, testFile, 10, 20);
System.out.println("Segment buffer size: " + segmentBuffer.size() + " bytes");

// Read file segment data
try {
    ByteBuffer segmentData = segmentBuffer.nioByteBuffer();
    byte[] segmentBytes = new byte[segmentData.remaining()];
    segmentData.get(segmentBytes);
    System.out.println("Segment content: " + new String(segmentBytes, StandardCharsets.UTF_8));
} catch (IOException e) {
    System.err.println("Failed to read segment: " + e.getMessage());
}

// Zero-copy conversion for Netty
try {
    Object nettyObject = fileBuffer.convertToNetty();
    // For file segments this is a FileRegion, which Netty can send without copying
    System.out.println("Netty object type: " + nettyObject.getClass().getSimpleName());
    // Not writing it to a channel here, so release it (FileRegion is reference-counted)
    ReferenceCountUtil.release(nettyObject);
} catch (IOException e) {
    System.err.println("Failed to convert to Netty: " + e.getMessage());
}

// Cleanup
testFile.delete();
```
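
The `FileRegion` returned by `convertToNetty()` is what makes the transfer zero-copy. With Netty's NIO transport, `DefaultFileRegion` writes itself using `FileChannel.transferTo`, which lets the operating system move file bytes to the target channel without staging them in a Java `byte[]` (native transports such as epoll call `sendfile` directly). The sketch below shows that JDK call on its own; `ZeroCopySketch` is hypothetical, and with an in-memory sink like the one in `main` the JDK typically falls back to an ordinary copy, so the zero-copy benefit applies only to supported targets such as sockets.

```java
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.StandardOpenOption;

/** Hypothetical sketch of the kind of transfer a file region performs (not Spark or Netty code). */
final class ZeroCopySketch {
    static long transferSegment(File file, long offset, long length, WritableByteChannel target)
            throws IOException {
        try (FileChannel channel = FileChannel.open(file.toPath(), StandardOpenOption.READ)) {
            long sent = 0;
            while (sent < length) {
                // transferTo may move fewer bytes than requested, so loop until done
                long n = channel.transferTo(offset + sent, length - sent, target);
                if (n <= 0) {
                    // With a blocking target, 0 means end of file; a non-blocking
                    // socket would need to wait for writability instead
                    break;
                }
                sent += n;
            }
            return sent;
        }
    }

    public static void main(String[] args) throws IOException {
        File f = File.createTempFile("region", ".txt");
        f.deleteOnExit();
        Files.write(f.toPath(), "This is test data for FileSegmentManagedBuffer demonstration."
            .getBytes(StandardCharsets.US_ASCII));
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        long sent = transferSegment(f, 10, 20, Channels.newChannel(sink));
        System.out.println(sent + " bytes: " + sink.toString("US-ASCII"));
    }
}
```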

### Working with NettyManagedBuffer

```java
import org.apache.spark.network.buffer.NettyManagedBuffer;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Create Netty ByteBuf (starts with a reference count of 1)
byte[] data = "Netty-backed buffer data".getBytes(StandardCharsets.UTF_8);
ByteBuf byteBuf = Unpooled.copiedBuffer(data);

// Create managed buffer (the constructor does not retain the ByteBuf)
NettyManagedBuffer nettyBuffer = new NettyManagedBuffer(byteBuf);
System.out.println("Netty buffer size: " + nettyBuffer.size() + " bytes");

// retain() increments the underlying ByteBuf's reference count
nettyBuffer.retain();
System.out.println("ByteBuf reference count: " + byteBuf.refCnt()); // 2

// Read data
try {
    ByteBuffer nioView = nettyBuffer.nioByteBuffer();
    byte[] readData = new byte[nioView.remaining()];
    nioView.get(readData);
    System.out.println("Netty buffer content: " + new String(readData, StandardCharsets.UTF_8));
} catch (IOException e) {
    System.err.println("Failed to read Netty buffer: " + e.getMessage());
}

// Release both references (the retain() above and the original) to prevent leaks
nettyBuffer.release(); // reference count 2 -> 1
nettyBuffer.release(); // reference count 1 -> 0, the ByteBuf's memory is freed
```

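### Buffer Reference Management

```java
import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.buffer.NioManagedBuffer;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Proper reference counting pattern.
// asyncOperation and Callback are hypothetical placeholders for any API that uses
// the buffer on another thread. NioManagedBuffer's retain()/release() are no-ops;
// the pattern matters for reference-counted buffers such as NettyManagedBuffer.
ManagedBuffer buffer = new NioManagedBuffer(ByteBuffer.wrap("data".getBytes(StandardCharsets.UTF_8)));

try {
    // Take an extra reference on behalf of the async operation
    buffer.retain();
    try {
        asyncOperation(buffer, new Callback() {
            @Override
            public void onComplete() {
                // Release the async operation's reference when it completes
                buffer.release();
            }

            @Override
            public void onError(Throwable e) {
                // Always release, even on error
                buffer.release();
            }
        });
    } catch (RuntimeException e) {
        // Assuming no callback runs when asyncOperation throws, release its reference here
        buffer.release();
        throw e;
    }
} finally {
    // Release the creator's original reference
    buffer.release();
}
```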
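
The same hand-off pattern can be expressed with `CompletableFuture`, where `whenComplete` plays the role of both callbacks because it runs on success and on failure. This is a minimal sketch using only JDK types; `CountingBuffer` and `AsyncHandoffSketch` are hypothetical stand-ins for a reference-counted `ManagedBuffer` and an asynchronous consumer, not Spark classes.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical reference-counted buffer; only the count matters for this sketch. */
final class CountingBuffer {
    final AtomicInteger refCnt = new AtomicInteger(1); // creator's reference

    CountingBuffer retain() {
        refCnt.incrementAndGet();
        return this;
    }

    CountingBuffer release() {
        if (refCnt.decrementAndGet() < 0) {
            throw new IllegalStateException("released more times than retained");
        }
        return this;
    }
}

/** Hypothetical async consumer showing the retain-before-handoff pattern. */
final class AsyncHandoffSketch {
    static CompletableFuture<Integer> processAsync(CountingBuffer buf, ExecutorService pool, boolean fail) {
        buf.retain(); // reference owned by the async task
        try {
            return CompletableFuture.supplyAsync(() -> {
                if (fail) {
                    throw new IllegalStateException("simulated failure");
                }
                return buf.refCnt.get(); // safe to use the buffer while holding a reference
            }, pool).whenComplete((result, error) -> buf.release()); // runs on success and failure
        } catch (RuntimeException e) {
            buf.release(); // submission was rejected, so the task will never release its reference
            throw e;
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountingBuffer buf = new CountingBuffer();
        try {
            processAsync(buf, pool, false).join();
            processAsync(buf, pool, true).exceptionally(e -> -1).join();
        } finally {
            buf.release(); // creator's reference
            pool.shutdown();
        }
        System.out.println(buf.refCnt.get()); // 0
    }
}
```

The `catch` block covers a case the callback style easily misses: if the executor rejects the task, no completion handler will ever run, so the reference taken for the task has to be returned immediately.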

### Buffer Conversion for Network Transfer

```java
import org.apache.spark.network.buffer.ManagedBuffer;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.FileRegion;

import java.io.IOException;
import java.nio.ByteBuffer;

// Convert different buffer types for Netty transfer.
// Netty releases a written ByteBuf/FileRegion once it has been sent, which
// consumes the reference that convertToNetty() handed to the caller.
void transferBuffer(ManagedBuffer buffer, Channel channel) {
    try {
        Object nettyObject = buffer.convertToNetty();

        if (nettyObject instanceof ByteBuf) {
            // In-memory data: write the ByteBuf directly
            channel.writeAndFlush((ByteBuf) nettyObject);
        } else if (nettyObject instanceof FileRegion) {
            // File data: zero-copy transfer
            channel.writeAndFlush((FileRegion) nettyObject);
        } else {
            // Defensive fallback; convertToNetty() is documented to return a ByteBuf or a FileRegion
            ByteBuffer nioBuffer = buffer.nioByteBuffer();
            channel.writeAndFlush(Unpooled.wrappedBuffer(nioBuffer));
        }
    } catch (IOException e) {
        System.err.println("Failed to transfer buffer: " + e.getMessage());
    }
}
```

### Custom Buffer Implementation

```java
import org.apache.spark.network.buffer.ManagedBuffer;
import io.netty.buffer.Unpooled;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Example of implementing a custom ManagedBuffer backed by an immutable String.
// Some newer Spark releases declare further abstract methods on ManagedBuffer
// (for example convertToNettyForSsl()); check the version you compile against.
public class StringManagedBuffer extends ManagedBuffer {
    private final String data;
    private final byte[] bytes;

    public StringManagedBuffer(String data) {
        this.data = data;
        this.bytes = data.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public long size() {
        return bytes.length;
    }

    @Override
    public ByteBuffer nioByteBuffer() throws IOException {
        // Read-only view so callers cannot modify the shared byte array
        return ByteBuffer.wrap(bytes).asReadOnlyBuffer();
    }

    @Override
    public InputStream createInputStream() throws IOException {
        return new ByteArrayInputStream(bytes);
    }

    @Override
    public ManagedBuffer retain() {
        // No-op: the byte array is managed by the JVM garbage collector
        return this;
    }

    @Override
    public ManagedBuffer release() {
        // No-op: the byte array is managed by the JVM garbage collector
        return this;
    }

    @Override
    public Object convertToNetty() throws IOException {
        // Wraps the array without copying; the new ByteBuf reference belongs to the caller
        return Unpooled.wrappedBuffer(bytes);
    }

    public String getString() {
        return data;
    }
}

// Usage
StringManagedBuffer stringBuffer = new StringManagedBuffer("Custom buffer content");
System.out.println("String buffer size: " + stringBuffer.size());
System.out.println("String content: " + stringBuffer.getString());
```

## Best Practices

### Memory Management

1. **Always release buffers**: Call `release()` when you are done with a buffer so that reference-counted implementations (such as `NettyManagedBuffer`) can free their memory
2. **Pair retain and release**: Every `retain()` call needs exactly one matching `release()` call
3. **Release on every path**: Release buffers in `finally` blocks or error callbacks; `ManagedBuffer` does not implement `AutoCloseable`, so try-with-resources cannot be used on it directly
4. **Prefer zero-copy for files**: Use `FileSegmentManagedBuffer` for large file transfers, since its `convertToNetty()` returns a `FileRegion`

### Performance Optimization

1. **Choose the appropriate buffer type**:
   - `NioManagedBuffer` for small in-memory data
   - `FileSegmentManagedBuffer` for large file data
   - `NettyManagedBuffer` for data that already lives in a Netty `ByteBuf`

2. **Avoid unnecessary copies**: Use `convertToNetty()` for network transfers instead of copying data into new buffers; `nioByteBuffer()` may memory-map or copy file data

3. **Reuse buffers**: Where possible, reuse buffer instances to reduce garbage collection pressure

### Error Handling

```java
// Safe buffer handling pattern.
// createBuffer() and processBuffer() are hypothetical; createBuffer() is assumed
// to return a buffer whose single reference now belongs to this code.
ManagedBuffer buffer = null;
try {
    buffer = createBuffer();

    // Use buffer... (no extra retain() is needed while it stays on this thread)
    processBuffer(buffer);

} catch (Exception e) {
    System.err.println("Error processing buffer: " + e.getMessage());
} finally {
    // Release exactly the reference we own; an unmatched retain() here would leak
    if (buffer != null) {
        buffer.release();
    }
}
```
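
When this try/finally shape repeats, it can be factored into a small helper so that release happens exactly once on every path. `BufferLoans.withBuffer` below is a hypothetical sketch, not a Spark API; it is generic over the buffer type so it compiles with the JDK alone. With Spark on the classpath you might call it as `withBuffer(() -> createBuffer(), ManagedBuffer::release, ManagedBuffer::size)`, where `createBuffer()` is the hypothetical factory from the example above.

```java
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

/** Hypothetical "loan" helper: acquire, use, and always release exactly once. */
final class BufferLoans {
    static <B, R> R withBuffer(Supplier<B> factory, Consumer<B> releaser, Function<B, R> body) {
        B buf = factory.get(); // if creation throws, there is nothing to release
        try {
            return body.apply(buf);
        } finally {
            releaser.accept(buf); // runs on both normal return and exception
        }
    }

    public static void main(String[] args) {
        int[] releases = {0};
        StringBuilder buffer = new StringBuilder("payload"); // stands in for a ManagedBuffer
        int size = withBuffer(() -> buffer, b -> releases[0]++, b -> b.length());
        System.out.println(size + " bytes, released " + releases[0] + " time(s)"); // 7 bytes, released 1 time(s)
    }
}
```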