Core networking infrastructure for Apache Spark cluster computing with transport layer, RPC, chunk fetching, and SASL authentication
—
Unified buffer management system providing abstractions over different buffer types including memory, file segments, and Netty ByteBufs. The buffer system enables efficient zero-copy operations and resource management for network data transfer.
Abstract base class for all buffer implementations, providing a unified interface for different data sources.
/**
 * ManagedBuffer represents an immutable buffer of data with reference counting
 * and multiple access methods. It abstracts over different buffer types to provide
 * a unified interface for network data transfer.
 *
 * <p>Implementations shown in this file: NioManagedBuffer (in-memory ByteBuffer),
 * NettyManagedBuffer (Netty ByteBuf), and FileSegmentManagedBuffer (on-disk file
 * segment). Note that whether retain()/release() actually count references is
 * implementation-defined: the Netty-backed buffer delegates to the ByteBuf's
 * reference count, while the NIO- and file-backed buffers treat both as no-ops.
 */
public abstract class ManagedBuffer {
/**
 * Gets the size of the buffer in bytes.
 *
 * @return The buffer size in bytes
 */
public abstract long size();
/**
 * Creates a read-only ByteBuffer view of this buffer.
 * For large buffers, this may trigger I/O operations.
 *
 * @return ByteBuffer containing the buffer data
 * @throws IOException if buffer cannot be read
 */
public abstract ByteBuffer nioByteBuffer() throws IOException;
/**
 * Creates an InputStream for reading the buffer data.
 * Allows streaming access to large buffers without loading everything into memory.
 *
 * @return InputStream for reading buffer data
 * @throws IOException if stream cannot be created
 */
public abstract InputStream createInputStream() throws IOException;
/**
 * Increments the reference count for this buffer.
 * Must be called when sharing buffer references to prevent premature cleanup.
 * Pair every retain() with exactly one matching release().
 *
 * @return This buffer instance for method chaining
 */
public abstract ManagedBuffer retain();
/**
 * Decrements the reference count and releases resources if count reaches zero.
 * Must be called when done with a buffer to prevent memory leaks.
 *
 * @return This buffer instance for method chaining
 */
public abstract ManagedBuffer release();
/**
 * Converts this buffer to a Netty-compatible object for efficient network transfer.
 * The exact return type depends on the buffer implementation.
 *
 * @return Netty-compatible object (typically ByteBuf or FileRegion)
 * @throws IOException if conversion fails
 */
public abstract Object convertToNetty() throws IOException;
}

ByteBuffer-backed managed buffer for in-memory data.
/**
 * ManagedBuffer implementation backed by a Java NIO ByteBuffer.
 * Best for small to medium-sized data that fits in memory.
 *
 * <p>Reference counting is a no-op for this implementation: the wrapped
 * ByteBuffer is reclaimed by the garbage collector, not by release().
 */
public class NioManagedBuffer extends ManagedBuffer {

    // The wrapped buffer. Declared here because every accessor below reads it;
    // the original excerpt used the field without ever declaring it.
    private final ByteBuffer buf;

    /**
     * Creates a managed buffer from a ByteBuffer.
     * The ByteBuffer should be ready for reading (position at start, limit at end).
     *
     * @param buf The ByteBuffer to wrap
     */
    public NioManagedBuffer(ByteBuffer buf) {
        this.buf = buf;
    }

    /** @return Number of readable bytes (position to limit). */
    @Override
    public long size() {
        return buf.remaining();
    }

    /** @return A duplicate view so callers cannot disturb this buffer's position. */
    @Override
    public ByteBuffer nioByteBuffer() throws IOException {
        return buf.duplicate(); // Returns a duplicate to avoid position changes
    }

    /** @return A stream over the buffer's readable bytes. */
    @Override
    public InputStream createInputStream() throws IOException {
        return new ByteBufferInputStream(buf);
    }

    /** No-op: NIO buffers are garbage collected, not reference counted. */
    @Override
    public ManagedBuffer retain() {
        return this;
    }

    /** No-op: NIO buffers are garbage collected, not reference counted. */
    @Override
    public ManagedBuffer release() {
        return this;
    }

    /** @return A Netty ByteBuf that shares this buffer's content (no copy). */
    @Override
    public Object convertToNetty() throws IOException {
        return Unpooled.wrappedBuffer(buf);
    }
}

Netty ByteBuf-backed managed buffer for integration with Netty pipelines.
/**
 * ManagedBuffer implementation backed by a Netty ByteBuf.
 * Provides direct integration with Netty's memory management and zero-copy operations.
 */
public class NettyManagedBuffer extends ManagedBuffer {

    // The wrapped ByteBuf; declared here because the methods below use it but
    // the original excerpt never declared the field.
    private final ByteBuf buf;

    /**
     * Creates a managed buffer from a Netty ByteBuf.
     * The buffer takes ownership of the ByteBuf and manages its lifecycle.
     *
     * @param buf The Netty ByteBuf to wrap
     */
    public NettyManagedBuffer(ByteBuf buf) {
        this.buf = buf;
    }

    /** @return Number of readable bytes in the wrapped ByteBuf. */
    @Override
    public long size() {
        return buf.readableBytes();
    }

    /**
     * @return A ByteBuffer view of the readable bytes. This usually shares
     *         memory with the ByteBuf, though Netty may copy for composite
     *         buffers, so "zero-copy" is not guaranteed in every case.
     */
    @Override
    public ByteBuffer nioByteBuffer() throws IOException {
        return buf.nioBuffer();
    }

    /** @return A stream over the ByteBuf's readable bytes. */
    @Override
    public InputStream createInputStream() throws IOException {
        return new ByteBufInputStream(buf);
    }

    /** Increments the underlying ByteBuf's reference count. */
    @Override
    public ManagedBuffer retain() {
        buf.retain();
        return this;
    }

    /** Decrements the underlying ByteBuf's reference count, freeing it at zero. */
    @Override
    public ManagedBuffer release() {
        buf.release();
        return this;
    }

    /**
     * @return A retained duplicate of the ByteBuf. The duplicate shares this
     *         ByteBuf's reference count, and Netty releases outbound messages
     *         after writing them; without the extra retain() the original code
     *         (plain duplicate()) would let that write also release the buffer
     *         still owned by this ManagedBuffer.
     */
    @Override
    public Object convertToNetty() throws IOException {
        return buf.duplicate().retain();
    }
}

File-backed managed buffer for efficient access to file segments without loading into memory.
/**
 * ManagedBuffer implementation backed by a file segment.
 * Enables efficient zero-copy transfer of file data over the network using FileRegion.
 * Best for large data that should remain on disk.
 */
public class FileSegmentManagedBuffer extends ManagedBuffer {

    // Declared here because the methods below use them; the original excerpt
    // referenced these fields without declaring them.
    private final TransportConf conf;
    private final File file;
    private final long offset;
    private final long length;

    /**
     * Creates a managed buffer for a segment of a file.
     *
     * @param conf Transport configuration for I/O settings
     * @param file The file to read from
     * @param offset Starting offset within the file (0-based)
     * @param length Number of bytes to include in the segment
     */
    public FileSegmentManagedBuffer(TransportConf conf, File file, long offset, long length) {
        this.conf = conf;
        this.file = file;
        this.offset = offset;
        this.length = length;
    }

    /** @return The segment length in bytes. */
    @Override
    public long size() {
        return length;
    }

    /**
     * Memory-maps the segment when it is within the configured limit.
     * (The original comment said "read into memory", but the code maps the file.)
     *
     * @return A read-only mapping of the segment
     * @throws IOException if the segment exceeds the memory-map limit or mapping fails
     */
    @Override
    public ByteBuffer nioByteBuffer() throws IOException {
        // Map segments up to the configured limit; larger ones must be streamed.
        if (length <= conf.memoryMapBytes()) {
            return mapFileSegment();
        } else {
            throw new IOException("File segment too large for memory mapping: " + length);
        }
    }

    /**
     * Opens a stream positioned at the segment start, limited to the segment length.
     *
     * @return A stream over exactly this segment's bytes
     * @throws IOException if the file cannot be opened or the offset cannot be reached
     */
    @Override
    public InputStream createInputStream() throws IOException {
        FileInputStream in = new FileInputStream(file);
        try {
            // skip() returns the number of bytes actually skipped, which may be
            // fewer than requested, so loop until the full offset is consumed.
            // (The original passed skip()'s long return value straight to
            // LimitedInputStream — a type error — and ignored short skips.)
            long remaining = offset;
            while (remaining > 0) {
                long skipped = in.skip(remaining);
                if (skipped <= 0) {
                    throw new IOException("Could not seek to offset " + offset + " in " + file);
                }
                remaining -= skipped;
            }
            return new LimitedInputStream(in, length);
        } catch (IOException e) {
            try {
                in.close(); // don't leak the file descriptor when seeking fails
            } catch (IOException ignored) {
                // best-effort close; the original failure is more informative
            }
            throw e;
        }
    }

    /** No-op: file-backed buffers hold no reference-counted resources between reads. */
    @Override
    public ManagedBuffer retain() {
        return this;
    }

    /** No-op: accessors open and close their own file handles. */
    @Override
    public ManagedBuffer release() {
        return this;
    }

    /** @return A Netty FileRegion enabling zero-copy (sendfile-style) transfer. */
    @Override
    public Object convertToNetty() throws IOException {
        return new DefaultFileRegion(file, offset, length);
    }

    /** @return The file this buffer reads from */
    public File getFile() {
        return file;
    }

    /** @return The starting offset in bytes */
    public long getOffset() {
        return offset;
    }

    /** @return The segment length in bytes */
    public long getLength() {
        return length;
    }

    // Maps the segment read-only; the channel may be closed immediately because
    // the mapping stays valid after the channel is closed.
    private ByteBuffer mapFileSegment() throws IOException {
        try (RandomAccessFile raf = new RandomAccessFile(file, "r");
             FileChannel channel = raf.getChannel()) {
            return channel.map(FileChannel.MapMode.READ_ONLY, offset, length);
        }
    }
}

Lazy-loading file region for deferred file access.
/**
 * FileRegion implementation that defers file opening until transfer time.
 * Useful for managing many file references without keeping file handles open.
 *
 * <p>NOTE(review): this excerpt is incomplete — the fields (file, position,
 * count) are never declared, the constructor has no body, and extending
 * AbstractReferenceCounted requires a deallocate() override (presumably the
 * one that closes the lazily-opened channel) that is not shown here.
 */
public class LazyFileRegion extends AbstractReferenceCounted implements FileRegion {
/**
 * Creates a lazy file region.
 *
 * @param file The file to transfer
 * @param position Starting position within the file
 * @param count Number of bytes to transfer
 */
public LazyFileRegion(File file, long position, long count);
/**
 * @return The starting position within the file
 */
@Override
public long position() {
return position;
}
/**
 * @return The number of bytes this region will transfer
 */
@Override
public long count() {
return count;
}
/**
 * Transfers region bytes to the target channel starting at the given
 * relative position.
 *
 * @param target Channel to write to
 * @param position Offset within this region (shadows the position field)
 * @throws IOException if the transfer fails
 */
@Override
public long transferTo(WritableByteChannel target, long position) throws IOException {
// Delegates to transferToChannel (not shown in this excerpt); presumably
// it opens the file on first use — confirm against the full source.
return transferToChannel(target, position);
}
/**
 * Gets the file this region references.
 *
 * @return The underlying file
 */
public File file() {
return file;
}
}

import org.apache.spark.network.buffer.*;
import java.nio.ByteBuffer;
import java.io.File;
import java.io.InputStream;
// Create buffer from byte array.
// NOTE(review): getBytes() without a charset uses the platform default;
// prefer getBytes(StandardCharsets.UTF_8) as the last example below does.
byte[] data = "Hello, World!".getBytes();
ByteBuffer byteBuffer = ByteBuffer.wrap(data);
ManagedBuffer nioBuffer = new NioManagedBuffer(byteBuffer);
System.out.println("Buffer size: " + nioBuffer.size());
// Create buffer from file segment (assumes a TransportConf named 'conf' is in scope)
File dataFile = new File("/path/to/large-file.dat");
long offset = 1024; // Start at 1KB
long length = 64 * 1024; // Read 64KB
ManagedBuffer fileBuffer = new FileSegmentManagedBuffer(conf, dataFile, offset, length);
// Create buffer from Netty ByteBuf (copiedBuffer copies the string's UTF-8 bytes)
ByteBuf nettyBuf = Unpooled.copiedBuffer("Netty data", StandardCharsets.UTF_8);
ManagedBuffer nettyBuffer = new NettyManagedBuffer(nettyBuf);

/**
 * Demonstrates the two read paths — direct ByteBuffer access for small
 * buffers and streaming for any size — and guarantees the buffer is
 * released exactly once when processing finishes.
 * (The original fused the statement above and this method header onto a
 * single line; they are separated here.)
 *
 * @param buffer The buffer to process; released in the finally block
 */
public void processBuffer(ManagedBuffer buffer) {
    try {
        System.out.println("Processing buffer of size: " + buffer.size());
        // Method 1: Direct ByteBuffer access (for small buffers)
        if (buffer.size() < 1024 * 1024) { // Less than 1MB
            try {
                ByteBuffer bb = buffer.nioByteBuffer();
                processDirectly(bb);
            } catch (IOException e) {
                System.err.println("Could not get ByteBuffer: " + e.getMessage());
            }
        }
        // Method 2: Stream access (for large buffers or when size doesn't matter)
        try (InputStream stream = buffer.createInputStream()) {
            processStream(stream);
        } catch (IOException e) {
            System.err.println("Could not create stream: " + e.getMessage());
        }
    } finally {
        // Important: Always release buffer when done
        buffer.release();
    }
}
/**
 * Consumes every remaining byte of the buffer, advancing its position to
 * the limit.
 *
 * @param buffer Buffer to drain; its position is advanced as a side effect
 */
private void processDirectly(ByteBuffer buffer) {
    for (int pending = buffer.remaining(); pending > 0; pending--) {
        byte value = buffer.get();
        // Process byte...
    }
}
/**
 * Drains the stream in 8 KiB chunks, handing each chunk to processChunk
 * (defined elsewhere). The caller is responsible for closing the stream.
 *
 * @param stream The stream to read to exhaustion
 * @throws IOException if a read fails
 */
private void processStream(InputStream stream) throws IOException {
// Process data from stream
byte[] chunk = new byte[8192];
int bytesRead;
// read() may fill fewer than 8192 bytes; -1 signals end of stream
while ((bytesRead = stream.read(chunk)) != -1) {
// Process chunk... (only the first bytesRead bytes of chunk are valid)
processChunk(chunk, bytesRead);
}
}

public class BufferManager {
private final List<ManagedBuffer> activeBuffers = new ArrayList<>();
/**
 * Hands out an additional reference to {@code original}, tracking it so
 * that cleanup() can release it later.
 *
 * @param original The buffer to share
 * @return The retained buffer reference
 */
public ManagedBuffer shareBuffer(ManagedBuffer original) {
    // Bump the reference count before exposing the buffer to another owner.
    ManagedBuffer reference = original.retain();
    activeBuffers.add(reference);
    return reference;
}
/**
 * Processes a buffer on a background thread, retaining it first so it
 * stays valid for the lifetime of the async task.
 *
 * <p>NOTE(review): the example processBuffer(ManagedBuffer) earlier in this
 * file already calls buffer.release() in its own finally block; combined
 * with the release() below, the retained reference would be released twice.
 * Confirm which side owns the release before reusing this pattern.
 *
 * @param buffer The buffer to process asynchronously
 */
public void processBufferAsync(ManagedBuffer buffer) {
// Retain buffer before passing to async operation
ManagedBuffer retained = buffer.retain();
CompletableFuture.runAsync(() -> {
try {
// Process buffer in background thread
processBuffer(retained);
} finally {
// Always release in finally block
retained.release();
}
});
}
/**
 * Releases every tracked buffer reference and empties the tracking list.
 */
public void cleanup() {
    activeBuffers.forEach(ManagedBuffer::release);
    activeBuffers.clear();
}
}

public class NetworkTransferExample {
/**
 * Converts the buffer into its Netty transfer representation and writes it
 * to the channel. ByteBuf and FileRegion results are written directly; any
 * other type is reported as unsupported.
 *
 * @param buffer The buffer to send
 * @param channel The channel to write to
 */
public void sendBufferOverNetwork(ManagedBuffer buffer, Channel channel) {
    Object nettyObject;
    try {
        // Convert buffer to Netty object for efficient transfer
        nettyObject = buffer.convertToNetty();
    } catch (IOException e) {
        System.err.println("Failed to convert buffer for network transfer: " + e.getMessage());
        return;
    }
    if (nettyObject instanceof ByteBuf) {
        // Direct ByteBuf transfer
        channel.writeAndFlush((ByteBuf) nettyObject);
    } else if (nettyObject instanceof FileRegion) {
        // Zero-copy file transfer
        channel.writeAndFlush((FileRegion) nettyObject);
    } else {
        System.err.println("Unsupported Netty object type: " + nettyObject.getClass());
    }
}
/**
 * Sends a slice of a file over the channel using a zero-copy FileRegion,
 * releasing the backing buffer once the write completes (or on failure).
 *
 * @param file The source file
 * @param offset Starting offset of the slice
 * @param length Number of bytes to send
 * @param channel The channel to write to
 */
public void sendFileSegment(File file, long offset, long length, Channel channel) {
    // Create file buffer for zero-copy transfer
    FileSegmentManagedBuffer segment = new FileSegmentManagedBuffer(conf, file, offset, length);
    try {
        // Get FileRegion for efficient file transfer
        FileRegion region = (FileRegion) segment.convertToNetty();
        // Send, then report the outcome and release once the write completes
        channel.writeAndFlush(region).addListener((ChannelFutureListener) completed -> {
            if (completed.isSuccess()) {
                System.out.println("File segment sent successfully");
            } else {
                System.err.println("Failed to send file segment: " + completed.cause());
            }
            segment.release();
        });
    } catch (IOException e) {
        System.err.println("Failed to prepare file for transfer: " + e.getMessage());
        segment.release();
    }
}
}

public class BufferFactory {
private final TransportConf conf;
/**
 * Creates a factory that builds buffers with the given transport
 * configuration (used by file-backed buffers, e.g. for memory-map limits).
 *
 * @param conf Transport configuration to pass to file-backed buffers
 */
public BufferFactory(TransportConf conf) {
this.conf = conf;
}
/**
 * Wraps an existing byte array in an in-memory managed buffer without copying.
 *
 * @param data The bytes to expose; should not be mutated afterwards
 * @return A managed buffer over {@code data}
 */
public ManagedBuffer createFromBytes(byte[] data) {
    return new NioManagedBuffer(ByteBuffer.wrap(data));
}
/**
 * Encodes the text as UTF-8 and wraps the resulting bytes in a buffer.
 *
 * @param text The text to encode
 * @return A managed buffer over the UTF-8 bytes of {@code text}
 */
public ManagedBuffer createFromString(String text) {
    byte[] encoded = text.getBytes(StandardCharsets.UTF_8);
    return createFromBytes(encoded);
}
/**
 * Creates a buffer spanning an entire file, from byte 0 to its current length.
 *
 * @param file The file to wrap
 * @return A file-backed managed buffer over the whole file
 */
public ManagedBuffer createFromFile(File file) {
    long wholeFile = file.length();
    return new FileSegmentManagedBuffer(conf, file, 0, wholeFile);
}
/**
 * Creates a buffer for a slice of a file after validating the slice bounds.
 *
 * @param file The file to read from
 * @param offset Starting offset within the file (0-based)
 * @param length Number of bytes in the segment
 * @return A file-backed managed buffer over the slice
 * @throws IllegalArgumentException if offset/length are negative or the
 *         slice extends past the end of the file
 */
public ManagedBuffer createFileSegment(File file, long offset, long length) {
    // Validate parameters
    if (offset < 0 || length < 0) {
        throw new IllegalArgumentException("Offset and length must be non-negative");
    }
    // Compare as "offset > fileLength - length" so the check cannot overflow
    // the way "offset + length" can for very large long values.
    if (offset > file.length() - length) {
        throw new IllegalArgumentException("Segment extends beyond file end");
    }
    return new FileSegmentManagedBuffer(conf, file, offset, length);
}
/**
 * Wraps an existing Netty ByteBuf in a managed buffer; its reference count
 * is then driven through the returned buffer's retain()/release().
 *
 * @param buf The ByteBuf to wrap
 * @return A Netty-backed managed buffer
 */
public ManagedBuffer createFromNettyBuf(ByteBuf buf) {
return new NettyManagedBuffer(buf);
}
/**
 * Creates a zero-length in-memory buffer.
 *
 * @return An empty managed buffer (size() == 0)
 */
public ManagedBuffer createEmpty() {
    ByteBuffer empty = ByteBuffer.allocate(0);
    return new NioManagedBuffer(empty);
}
}

public class CompositeBufferExample {
/**
 * Concatenates the given buffers into a single in-memory buffer.
 * Every source buffer is released exactly once, even if reading one fails.
 *
 * @param buffers Source buffers, consumed (released) by this call
 * @return A new in-memory buffer holding the concatenated contents
 * @throws IllegalArgumentException if the combined size exceeds Integer.MAX_VALUE
 * @throws RuntimeException if a source buffer cannot be read
 */
public ManagedBuffer combineBuffers(List<ManagedBuffer> buffers) {
    // Calculate total size
    long totalSize = buffers.stream().mapToLong(ManagedBuffer::size).sum();
    if (totalSize > Integer.MAX_VALUE) {
        throw new IllegalArgumentException("Combined buffer too large: " + totalSize);
    }
    // Combine into single ByteBuffer
    ByteBuffer combined = ByteBuffer.allocate((int) totalSize);
    try {
        for (ManagedBuffer buffer : buffers) {
            try {
                combined.put(buffer.nioByteBuffer());
            } catch (IOException e) {
                throw new RuntimeException("Failed to read buffer", e);
            }
        }
    } finally {
        // Release every source exactly once. The original released inside the
        // loop, which leaked every buffer after the first failing one.
        for (ManagedBuffer buffer : buffers) {
            buffer.release();
        }
    }
    combined.flip(); // Prepare for reading
    return new NioManagedBuffer(combined);
}
/**
 * Streams each buffer's contents to {@code output} in order, releasing
 * every buffer exactly once even when a read or write fails part-way.
 *
 * @param buffers Source buffers, consumed (released) by this call
 * @param output Destination stream; not closed by this method
 * @throws IOException if reading a buffer or writing to the output fails
 */
public void streamBuffersSequentially(List<ManagedBuffer> buffers, OutputStream output)
    throws IOException {
    int next = 0;
    try {
        while (next < buffers.size()) {
            ManagedBuffer buffer = buffers.get(next++);
            try (InputStream input = buffer.createInputStream()) {
                byte[] chunk = new byte[8192];
                int bytesRead;
                while ((bytesRead = input.read(chunk)) != -1) {
                    output.write(chunk, 0, bytesRead);
                }
            } finally {
                buffer.release();
            }
        }
    } finally {
        // On failure, release the buffers that were never reached; the
        // original's per-buffer finally covered only the current buffer, so
        // an IOException mid-list leaked all the remaining ones.
        while (next < buffers.size()) {
            buffers.get(next++).release();
        }
    }
}
}

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-spark--spark-network-common-2-10