# Buffer Management

Unified buffer management system providing abstractions over different buffer types, including in-memory NIO buffers, file segments, and Netty `ByteBuf`s. It enables efficient zero-copy transfers and explicit resource management for network data transfer.

## Capabilities

### ManagedBuffer

Abstract base class for all buffer implementations, providing a single interface over different data sources.

```java { .api }
/**
 * ManagedBuffer provides an immutable view of data in the form of bytes, with reference
 * counting and several ways to access that data. It abstracts over different backing stores
 * (NIO buffers, Netty ByteBufs, file segments) to provide a unified interface for network
 * data transfer.
 *
 * <p>Implementations decide how the data is produced: in-memory buffers can expose it
 * directly, while file-backed buffers may have to perform I/O in {@link #nioByteBuffer()}
 * and {@link #createInputStream()}. Implementations that are not reference counted may treat
 * {@link #retain()} and {@link #release()} as no-ops.
 */
public abstract class ManagedBuffer {

  /**
   * Returns the number of bytes of data in this buffer.
   *
   * @return the buffer size in bytes
   */
  public abstract long size();

  /**
   * Exposes this buffer's data as a NIO ByteBuffer.
   *
   * <p>The result may share memory with this buffer, or may be produced by I/O (for example,
   * by copying or memory-mapping part of a file), so it can be expensive for large buffers.
   * Moving the position or limit of the returned ByteBuffer must not affect this buffer;
   * callers should not modify its contents, which may be shared.
   *
   * @return a ByteBuffer whose remaining bytes are this buffer's data
   * @throws IOException if the data cannot be read
   */
  public abstract ByteBuffer nioByteBuffer() throws IOException;

  /**
   * Creates a new InputStream over this buffer's data, allowing large buffers to be read
   * without loading everything into memory. The caller is responsible for closing it.
   *
   * @return a stream that yields this buffer's data
   * @throws IOException if the stream cannot be created
   */
  public abstract InputStream createInputStream() throws IOException;

  /**
   * Increments the reference count, for implementations that are reference counted.
   * Call this before handing a reference to another owner that will later release it.
   *
   * @return this buffer, for method chaining
   */
  public abstract ManagedBuffer retain();

  /**
   * Decrements the reference count and frees the underlying resources when it reaches zero.
   * The initial reference and every {@link #retain()} must each be matched by exactly one
   * release; releasing too often can free data that another owner is still using.
   *
   * @return this buffer, for method chaining
   */
  public abstract ManagedBuffer release();

  /**
   * Converts this buffer into an object Netty can write, typically a ByteBuf or FileRegion.
   *
   * <p>Netty releases reference-counted messages once they have been written, so the
   * returned object must carry its own reference: writing it must not consume the reference
   * owned by this ManagedBuffer.
   *
   * @return a Netty-writable object holding this buffer's data
   * @throws IOException if the conversion fails
   */
  public abstract Object convertToNetty() throws IOException;
}
```

### Buffer Implementations

#### NioManagedBuffer

`ByteBuffer`-backed managed buffer for data that is already in memory.

```java { .api }
76
/**
77
* ManagedBuffer implementation backed by a Java NIO ByteBuffer.
78
* Best for small to medium-sized data that fits in memory.
79
*/
80
public class NioManagedBuffer extends ManagedBuffer {
81
/**
82
* Creates a managed buffer from a ByteBuffer.
83
* The ByteBuffer should be ready for reading (position at start, limit at end).
84
*
85
* @param buf The ByteBuffer to wrap
86
*/
87
public NioManagedBuffer(ByteBuffer buf);
88
89
@Override
90
public long size() {
91
return buf.remaining();
92
}
93
94
@Override
95
public ByteBuffer nioByteBuffer() throws IOException {
96
return buf.duplicate(); // Returns a duplicate to avoid position changes
97
}
98
99
@Override
100
public InputStream createInputStream() throws IOException {
101
return new ByteBufferInputStream(buf);
102
}
103
104
@Override
105
public ManagedBuffer retain() {
106
return this; // NIO buffers don't need reference counting
107
}
108
109
@Override
110
public ManagedBuffer release() {
111
return this; // NIO buffers don't need explicit release
112
}
113
114
@Override
115
public Object convertToNetty() throws IOException {
116
return Unpooled.wrappedBuffer(buf);
117
}
118
}
119
```

#### NettyManagedBuffer

Managed buffer backed by a Netty `ByteBuf`, for integration with Netty pipelines.

```java { .api }
126
/**
127
* ManagedBuffer implementation backed by a Netty ByteBuf.
128
* Provides direct integration with Netty's memory management and zero-copy operations.
129
*/
130
public class NettyManagedBuffer extends ManagedBuffer {
131
/**
132
* Creates a managed buffer from a Netty ByteBuf.
133
* The buffer takes ownership of the ByteBuf and manages its lifecycle.
134
*
135
* @param buf The Netty ByteBuf to wrap
136
*/
137
public NettyManagedBuffer(ByteBuf buf);
138
139
@Override
140
public long size() {
141
return buf.readableBytes();
142
}
143
144
@Override
145
public ByteBuffer nioByteBuffer() throws IOException {
146
return buf.nioBuffer(); // Zero-copy access to underlying ByteBuffer
147
}
148
149
@Override
150
public InputStream createInputStream() throws IOException {
151
return new ByteBufInputStream(buf);
152
}
153
154
@Override
155
public ManagedBuffer retain() {
156
buf.retain();
157
return this;
158
}
159
160
@Override
161
public ManagedBuffer release() {
162
buf.release();
163
return this;
164
}
165
166
@Override
167
public Object convertToNetty() throws IOException {
168
return buf.duplicate(); // Return duplicate to preserve original
169
}
170
}
171
```

#### FileSegmentManagedBuffer

File-backed managed buffer that provides access to a segment of a file without loading it into memory.

```java { .api }
178
/**
179
* ManagedBuffer implementation backed by a file segment.
180
* Enables efficient zero-copy transfer of file data over the network using FileRegion.
181
* Best for large data that should remain on disk.
182
*/
183
public class FileSegmentManagedBuffer extends ManagedBuffer {
184
/**
185
* Creates a managed buffer for a segment of a file.
186
*
187
* @param conf Transport configuration for I/O settings
188
* @param file The file to read from
189
* @param offset Starting offset within the file (0-based)
190
* @param length Number of bytes to include in the segment
191
*/
192
public FileSegmentManagedBuffer(TransportConf conf, File file, long offset, long length);
193
194
@Override
195
public long size() {
196
return length;
197
}
198
199
@Override
200
public ByteBuffer nioByteBuffer() throws IOException {
201
// For small files, read into memory
202
if (length <= conf.memoryMapBytes()) {
203
return mapFileSegment();
204
} else {
205
throw new IOException("File segment too large for memory mapping: " + length);
206
}
207
}
208
209
@Override
210
public InputStream createInputStream() throws IOException {
211
// Create stream that reads only the specified segment
212
return new LimitedInputStream(
213
new FileInputStream(file).skip(offset),
214
length
215
);
216
}
217
218
@Override
219
public ManagedBuffer retain() {
220
return this; // File buffers don't need reference counting
221
}
222
223
@Override
224
public ManagedBuffer release() {
225
return this; // File buffers don't need explicit release
226
}
227
228
@Override
229
public Object convertToNetty() throws IOException {
230
// Use Netty's FileRegion for zero-copy file transfer
231
return new DefaultFileRegion(file, offset, length);
232
}
233
234
/**
235
* Gets the underlying file.
236
*
237
* @return The file this buffer reads from
238
*/
239
public File getFile() {
240
return file;
241
}
242
243
/**
244
* Gets the offset within the file.
245
*
246
* @return The starting offset in bytes
247
*/
248
public long getOffset() {
249
return offset;
250
}
251
252
/**
253
* Gets the length of the segment.
254
*
255
* @return The segment length in bytes
256
*/
257
public long getLength() {
258
return length;
259
}
260
261
private ByteBuffer mapFileSegment() throws IOException {
262
try (RandomAccessFile raf = new RandomAccessFile(file, "r");
263
FileChannel channel = raf.getChannel()) {
264
265
return channel.map(FileChannel.MapMode.READ_ONLY, offset, length);
266
}
267
}
268
}
269
```

### Utility Classes

#### LazyFileRegion

File region that defers opening its file until data is actually transferred.

```java { .api }
278
/**
279
* FileRegion implementation that defers file opening until transfer time.
280
* Useful for managing many file references without keeping file handles open.
281
*/
282
public class LazyFileRegion extends AbstractReferenceCounted implements FileRegion {
283
/**
284
* Creates a lazy file region.
285
*
286
* @param file The file to transfer
287
* @param position Starting position within the file
288
* @param count Number of bytes to transfer
289
*/
290
public LazyFileRegion(File file, long position, long count);
291
292
@Override
293
public long position() {
294
return position;
295
}
296
297
@Override
298
public long count() {
299
return count;
300
}
301
302
@Override
303
public long transferTo(WritableByteChannel target, long position) throws IOException {
304
// Opens file and transfers data on-demand
305
return transferToChannel(target, position);
306
}
307
308
/**
309
* Gets the file this region references.
310
*
311
* @return The underlying file
312
*/
313
public File file() {
314
return file;
315
}
316
}
317
```

## Usage Examples

### Creating and Using Buffers

```java
324
import org.apache.spark.network.buffer.*;
325
import java.nio.ByteBuffer;
326
import java.io.File;
327
import java.io.InputStream;
328
329
// Create buffer from byte array
330
byte[] data = "Hello, World!".getBytes();
331
ByteBuffer byteBuffer = ByteBuffer.wrap(data);
332
ManagedBuffer nioBuffer = new NioManagedBuffer(byteBuffer);
333
334
System.out.println("Buffer size: " + nioBuffer.size());
335
336
// Create buffer from file segment
337
File dataFile = new File("/path/to/large-file.dat");
338
long offset = 1024; // Start at 1KB
339
long length = 64 * 1024; // Read 64KB
340
ManagedBuffer fileBuffer = new FileSegmentManagedBuffer(conf, dataFile, offset, length);
341
342
// Create buffer from Netty ByteBuf
343
ByteBuf nettyBuf = Unpooled.copiedBuffer("Netty data", StandardCharsets.UTF_8);
344
ManagedBuffer nettyBuffer = new NettyManagedBuffer(nettyBuf);
345
```

### Buffer Access Patterns

```java
350
public void processBuffer(ManagedBuffer buffer) {
351
try {
352
System.out.println("Processing buffer of size: " + buffer.size());
353
354
// Method 1: Direct ByteBuffer access (for small buffers)
355
if (buffer.size() < 1024 * 1024) { // Less than 1MB
356
try {
357
ByteBuffer bb = buffer.nioByteBuffer();
358
processDirectly(bb);
359
} catch (IOException e) {
360
System.err.println("Could not get ByteBuffer: " + e.getMessage());
361
}
362
}
363
364
// Method 2: Stream access (for large buffers or when size doesn't matter)
365
try (InputStream stream = buffer.createInputStream()) {
366
processStream(stream);
367
} catch (IOException e) {
368
System.err.println("Could not create stream: " + e.getMessage());
369
}
370
371
} finally {
372
// Important: Always release buffer when done
373
buffer.release();
374
}
375
}
376
377
private void processDirectly(ByteBuffer buffer) {
378
// Process data directly from ByteBuffer
379
while (buffer.hasRemaining()) {
380
byte b = buffer.get();
381
// Process byte...
382
}
383
}
384
385
private void processStream(InputStream stream) throws IOException {
386
// Process data from stream
387
byte[] chunk = new byte[8192];
388
int bytesRead;
389
390
while ((bytesRead = stream.read(chunk)) != -1) {
391
// Process chunk...
392
processChunk(chunk, bytesRead);
393
}
394
}
395
```

### Reference Counting and Resource Management

```java
400
public class BufferManager {
401
private final List<ManagedBuffer> activeBuffers = new ArrayList<>();
402
403
public ManagedBuffer shareBuffer(ManagedBuffer original) {
404
// Retain buffer for sharing
405
ManagedBuffer shared = original.retain();
406
activeBuffers.add(shared);
407
return shared;
408
}
409
410
public void processBufferAsync(ManagedBuffer buffer) {
411
// Retain buffer before passing to async operation
412
ManagedBuffer retained = buffer.retain();
413
414
CompletableFuture.runAsync(() -> {
415
try {
416
// Process buffer in background thread
417
processBuffer(retained);
418
} finally {
419
// Always release in finally block
420
retained.release();
421
}
422
});
423
}
424
425
public void cleanup() {
426
// Release all active buffers
427
for (ManagedBuffer buffer : activeBuffers) {
428
buffer.release();
429
}
430
activeBuffers.clear();
431
}
432
}
433
```

### Zero-Copy Network Transfer

```java
438
public class NetworkTransferExample {
439
public void sendBufferOverNetwork(ManagedBuffer buffer, Channel channel) {
440
try {
441
// Convert buffer to Netty object for efficient transfer
442
Object nettyObject = buffer.convertToNetty();
443
444
if (nettyObject instanceof ByteBuf) {
445
// Direct ByteBuf transfer
446
ByteBuf byteBuf = (ByteBuf) nettyObject;
447
channel.writeAndFlush(byteBuf);
448
449
} else if (nettyObject instanceof FileRegion) {
450
// Zero-copy file transfer
451
FileRegion fileRegion = (FileRegion) nettyObject;
452
channel.writeAndFlush(fileRegion);
453
454
} else {
455
System.err.println("Unsupported Netty object type: " + nettyObject.getClass());
456
}
457
458
} catch (IOException e) {
459
System.err.println("Failed to convert buffer for network transfer: " + e.getMessage());
460
}
461
}
462
463
public void sendFileSegment(File file, long offset, long length, Channel channel) {
464
// Create file buffer for zero-copy transfer
465
FileSegmentManagedBuffer fileBuffer = new FileSegmentManagedBuffer(conf, file, offset, length);
466
467
try {
468
// Get FileRegion for efficient file transfer
469
FileRegion fileRegion = (FileRegion) fileBuffer.convertToNetty();
470
471
// Send with completion handling
472
ChannelFuture future = channel.writeAndFlush(fileRegion);
473
future.addListener(new ChannelFutureListener() {
474
@Override
475
public void operationComplete(ChannelFuture future) {
476
if (future.isSuccess()) {
477
System.out.println("File segment sent successfully");
478
} else {
479
System.err.println("Failed to send file segment: " + future.cause());
480
}
481
482
// Release buffer after transfer
483
fileBuffer.release();
484
}
485
});
486
487
} catch (IOException e) {
488
System.err.println("Failed to prepare file for transfer: " + e.getMessage());
489
fileBuffer.release();
490
}
491
}
492
}
493
```

### Buffer Factory Pattern

```java
498
public class BufferFactory {
499
private final TransportConf conf;
500
501
public BufferFactory(TransportConf conf) {
502
this.conf = conf;
503
}
504
505
public ManagedBuffer createFromBytes(byte[] data) {
506
ByteBuffer buffer = ByteBuffer.wrap(data);
507
return new NioManagedBuffer(buffer);
508
}
509
510
public ManagedBuffer createFromString(String text) {
511
return createFromBytes(text.getBytes(StandardCharsets.UTF_8));
512
}
513
514
public ManagedBuffer createFromFile(File file) {
515
return new FileSegmentManagedBuffer(conf, file, 0, file.length());
516
}
517
518
public ManagedBuffer createFileSegment(File file, long offset, long length) {
519
// Validate parameters
520
if (offset < 0 || length < 0) {
521
throw new IllegalArgumentException("Offset and length must be non-negative");
522
}
523
524
if (offset + length > file.length()) {
525
throw new IllegalArgumentException("Segment extends beyond file end");
526
}
527
528
return new FileSegmentManagedBuffer(conf, file, offset, length);
529
}
530
531
public ManagedBuffer createFromNettyBuf(ByteBuf buf) {
532
return new NettyManagedBuffer(buf);
533
}
534
535
public ManagedBuffer createEmpty() {
536
return new NioManagedBuffer(ByteBuffer.allocate(0));
537
}
538
}
539
```

### Buffer Composition and Chaining

```java
544
public class CompositeBufferExample {
545
public ManagedBuffer combineBuffers(List<ManagedBuffer> buffers) {
546
// Calculate total size
547
long totalSize = buffers.stream().mapToLong(ManagedBuffer::size).sum();
548
549
if (totalSize > Integer.MAX_VALUE) {
550
throw new IllegalArgumentException("Combined buffer too large: " + totalSize);
551
}
552
553
// Combine into single ByteBuffer
554
ByteBuffer combined = ByteBuffer.allocate((int) totalSize);
555
556
for (ManagedBuffer buffer : buffers) {
557
try {
558
ByteBuffer bb = buffer.nioByteBuffer();
559
combined.put(bb);
560
} catch (IOException e) {
561
throw new RuntimeException("Failed to read buffer", e);
562
} finally {
563
buffer.release(); // Release source buffers
564
}
565
}
566
567
combined.flip(); // Prepare for reading
568
return new NioManagedBuffer(combined);
569
}
570
571
public void streamBuffersSequentially(List<ManagedBuffer> buffers, OutputStream output)
572
throws IOException {
573
574
for (ManagedBuffer buffer : buffers) {
575
try (InputStream input = buffer.createInputStream()) {
576
byte[] chunk = new byte[8192];
577
int bytesRead;
578
579
while ((bytesRead = input.read(chunk)) != -1) {
580
output.write(chunk, 0, bytesRead);
581
}
582
583
} finally {
584
buffer.release();
585
}
586
}
587
}
588
}