Core networking functionality for Apache Spark including transport layers, buffer management, client-server communication, cryptographic protocols, SASL authentication, and shuffle database support using Netty for high-performance network I/O.
—
The buffer management API provides efficient, zero-copy buffer operations for Apache Spark's networking layer. The ManagedBuffer abstract class serves as the foundation for different buffer implementations, enabling efficient memory usage and data transfer with various backing stores.
Abstract base class for immutable byte data views with different backing implementations.
/**
* Get the size of the buffer in bytes
* @return long representing the number of bytes in the buffer
*/
public abstract long size();
/**
* Get a NIO ByteBuffer view of this buffer's data
* @return ByteBuffer containing the buffer data
* @throws IOException if the buffer data cannot be accessed
*/
public abstract ByteBuffer nioByteBuffer() throws IOException;
/**
* Create an InputStream for reading the buffer data
* @return InputStream for sequential reading of buffer contents
* @throws IOException if the stream cannot be created
*/
public abstract InputStream createInputStream() throws IOException;
/**
* Increment the reference count of this buffer
* @return ManagedBuffer instance for method chaining
*/
public abstract ManagedBuffer retain();
/**
* Decrement the reference count and release resources if count reaches zero
* @return ManagedBuffer instance for method chaining
*/
public abstract ManagedBuffer release();
/**
* Convert this buffer to a Netty-compatible object for efficient network transfer
* @return Object suitable for Netty channel operations (typically ByteBuf or FileRegion)
* @throws IOException if conversion fails
*/
public abstract Object convertToNetty() throws IOException;

Managed buffer implementation backed by a NIO ByteBuffer, suitable for in-memory data.
/**
* Create a managed buffer from a NIO ByteBuffer
* @param buf - ByteBuffer containing the data (position and limit are preserved)
*/
public NioManagedBuffer(ByteBuffer buf);
@Override
public long size();
@Override
public ByteBuffer nioByteBuffer() throws IOException;
@Override
public InputStream createInputStream() throws IOException;
@Override
public ManagedBuffer retain();
@Override
public ManagedBuffer release();
@Override
public Object convertToNetty() throws IOException;

Managed buffer implementation backed by a file segment, enabling efficient zero-copy file transfers.
/**
* Create a managed buffer from a file segment
* @param file - File to read data from
* @param offset - Starting position in the file
* @param length - Number of bytes to include from the file
*/
public FileSegmentManagedBuffer(File file, long offset, long length);
/**
* Create a managed buffer from a complete file
* @param file - File to read data from (entire file)
*/
public FileSegmentManagedBuffer(File file);
@Override
public long size();
@Override
public ByteBuffer nioByteBuffer() throws IOException;
@Override
public InputStream createInputStream() throws IOException;
@Override
public ManagedBuffer retain();
@Override
public ManagedBuffer release();
@Override
public Object convertToNetty() throws IOException;
/**
* Get the underlying file
* @return File object backing this buffer
*/
public File getFile();
/**
* Get the offset within the file
* @return long representing the starting position in bytes
*/
public long getOffset();
/**
* Get the length of the file segment
* @return long representing the number of bytes in the segment
*/
public long getLength();

Managed buffer implementation backed by a Netty ByteBuf with reference counting support.
/**
* Create a managed buffer from a Netty ByteBuf
* @param buf - ByteBuf containing the data (reference count is managed)
*/
public NettyManagedBuffer(ByteBuf buf);
@Override
public long size();
@Override
public ByteBuffer nioByteBuffer() throws IOException;
@Override
public InputStream createInputStream() throws IOException;
@Override
public ManagedBuffer retain();
@Override
public ManagedBuffer release();
@Override
public Object convertToNetty() throws IOException;
/**
* Get the underlying Netty ByteBuf
* @return ByteBuf backing this managed buffer
*/
public ByteBuf getBuf();

import org.apache.spark.network.buffer.NioManagedBuffer;
import java.nio.ByteBuffer;
// Wrap an in-memory byte array in a NioManagedBuffer.
byte[] payload = "Hello, Spark Network!".getBytes();
ByteBuffer wrapped = ByteBuffer.wrap(payload);
NioManagedBuffer buffer = new NioManagedBuffer(wrapped);

// Report how many bytes the buffer holds.
System.out.println("Buffer size: " + buffer.size() + " bytes");

// Random-access read through the NIO ByteBuffer view.
try {
    ByteBuffer view = buffer.nioByteBuffer();
    byte[] copied = new byte[view.remaining()];
    view.get(copied);
    System.out.println("Buffer content: " + new String(copied));
} catch (IOException e) {
    System.err.println("Failed to read buffer: " + e.getMessage());
}

// Sequential read of the same data through an InputStream.
try (InputStream in = buffer.createInputStream()) {
    byte[] drained = in.readAllBytes();
    System.out.println("Stream content: " + new String(drained));
} catch (IOException e) {
    System.err.println("Failed to read stream: " + e.getMessage());
}

// Reference counting (NioManagedBuffer doesn't actually count, but follows the interface)
buffer.retain(); // Increment reference count
buffer.release(); // Decrement reference count

import org.apache.spark.network.buffer.FileSegmentManagedBuffer;
import java.io.File;
import java.io.FileWriter;
// Write a small temporary file to demonstrate file-backed buffers.
File testFile = new File("test-data.txt");
try (FileWriter writer = new FileWriter(testFile)) {
    writer.write("This is test data for FileSegmentManagedBuffer demonstration.");
}

// A buffer spanning the entire file.
FileSegmentManagedBuffer fileBuffer = new FileSegmentManagedBuffer(testFile);
System.out.println("File buffer size: " + fileBuffer.size() + " bytes");
System.out.println("File: " + fileBuffer.getFile().getName());
System.out.println("Offset: " + fileBuffer.getOffset());
System.out.println("Length: " + fileBuffer.getLength());

// A buffer covering only 20 bytes starting at offset 10.
FileSegmentManagedBuffer segmentBuffer = new FileSegmentManagedBuffer(testFile, 10, 20);
System.out.println("Segment buffer size: " + segmentBuffer.size() + " bytes");

// Materialize the segment's bytes via the NIO view.
try {
    ByteBuffer slice = segmentBuffer.nioByteBuffer();
    byte[] raw = new byte[slice.remaining()];
    slice.get(raw);
    System.out.println("Segment content: " + new String(raw));
} catch (IOException e) {
    System.err.println("Failed to read segment: " + e.getMessage());
}

// Convert for Netty; file-backed buffers typically yield a FileRegion,
// which enables zero-copy transfer.
try {
    Object transferable = fileBuffer.convertToNetty();
    System.out.println("Netty object type: " + transferable.getClass().getSimpleName());
} catch (IOException e) {
    System.err.println("Failed to convert to Netty: " + e.getMessage());
}
// Cleanup
testFile.delete();

import org.apache.spark.network.buffer.NettyManagedBuffer;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
// Copy a byte array into a Netty ByteBuf, then wrap it as a ManagedBuffer.
byte[] source = "Netty-backed buffer data".getBytes();
ByteBuf backing = Unpooled.copiedBuffer(source);
NettyManagedBuffer nettyBuffer = new NettyManagedBuffer(backing);
System.out.println("Netty buffer size: " + nettyBuffer.size() + " bytes");

// Netty ByteBufs are reference-counted; retain() bumps the underlying count.
nettyBuffer.retain(); // Increment reference count
System.out.println("ByteBuf reference count: " + nettyBuffer.getBuf().refCnt());

// Read the contents via the NIO view.
try {
    ByteBuffer view = nettyBuffer.nioByteBuffer();
    byte[] contents = new byte[view.remaining()];
    view.get(contents);
    System.out.println("Netty buffer content: " + new String(contents));
} catch (IOException e) {
    System.err.println("Failed to read Netty buffer: " + e.getMessage());
}

// Every retain() needs a matching release() to avoid leaking the ByteBuf.
nettyBuffer.release(); // Decrement reference count
nettyBuffer.release(); // Final release

// Proper reference counting pattern
ManagedBuffer buffer = new NioManagedBuffer(ByteBuffer.wrap("data".getBytes()));
try {
    // Take an extra reference owned by the async operation.
    buffer.retain();
    // The callback balances that retain on every outcome.
    Callback completion = new Callback() {
        @Override
        public void onComplete() {
            buffer.release(); // release the async reference on success
        }
        @Override
        public void onError(Throwable e) {
            buffer.release(); // release it on failure too
        }
    };
    asyncOperation(buffer, completion);
} finally {
    // Drop the reference taken at creation time.
    buffer.release();
}
// Convert different buffer types for Netty transfer
/**
 * Write a ManagedBuffer to a Netty channel, preferring the representation
 * returned by convertToNetty() (ByteBuf for in-memory data, FileRegion for
 * zero-copy file transfer) and falling back to wrapping the NIO view.
 */
void transferBuffer(ManagedBuffer buffer, Channel channel) {
    try {
        Object payload = buffer.convertToNetty();
        if (payload instanceof ByteBuf) {
            // In-memory data: write the ByteBuf directly.
            channel.writeAndFlush((ByteBuf) payload);
        } else if (payload instanceof FileRegion) {
            // File-backed data: zero-copy transfer via FileRegion.
            channel.writeAndFlush((FileRegion) payload);
        } else {
            // Unknown representation: wrap the NIO view in a ByteBuf.
            ByteBuffer nio = buffer.nioByteBuffer();
            channel.writeAndFlush(Unpooled.wrappedBuffer(nio));
        }
    } catch (IOException e) {
        System.err.println("Failed to transfer buffer: " + e.getMessage());
    }
}
// Example of implementing a custom ManagedBuffer
/**
 * A ManagedBuffer backed by an immutable String, encoded once as UTF-8.
 * Reference counting is a no-op since the backing array is heap-allocated
 * and garbage-collected.
 */
public class StringManagedBuffer extends ManagedBuffer {
    // Original string, retained so getString() can return it unchanged.
    private final String data;
    // UTF-8 encoding of data, computed eagerly in the constructor.
    private final byte[] bytes;

    public StringManagedBuffer(String data) {
        this.data = data;
        this.bytes = data.getBytes(StandardCharsets.UTF_8);
    }

    /** @return the number of bytes in the UTF-8 encoding */
    @Override
    public long size() {
        return bytes.length;
    }

    /** @return a read-only ByteBuffer over the encoded bytes */
    @Override
    public ByteBuffer nioByteBuffer() throws IOException {
        return ByteBuffer.wrap(bytes).asReadOnlyBuffer();
    }

    /** @return a stream over the encoded bytes */
    @Override
    public InputStream createInputStream() throws IOException {
        return new ByteArrayInputStream(bytes);
    }

    /** No-op: this implementation does not track references. */
    @Override
    public ManagedBuffer retain() {
        return this;
    }

    /** No-op: this implementation does not track references. */
    @Override
    public ManagedBuffer release() {
        return this;
    }

    /** @return a ByteBuf wrapping (not copying) the encoded bytes */
    @Override
    public Object convertToNetty() throws IOException {
        return Unpooled.wrappedBuffer(bytes);
    }

    /** @return the original, un-encoded string */
    public String getString() {
        return data;
    }
}
// Usage: construct the custom buffer and inspect its size.
StringManagedBuffer stringBuffer = new StringManagedBuffer("Custom buffer content");
System.out.println("String buffer size: " + stringBuffer.size());
System.out.println("String content: " + stringBuffer.getString());

Best practices:
- Always call release() when done with a buffer to prevent memory leaks
- For every retain() call, ensure a corresponding release() call
- Prefer FileSegmentManagedBuffer for large file transfers
- Choose appropriate buffer type:
  - NioManagedBuffer for small in-memory data
  - FileSegmentManagedBuffer for large file data
  - NettyManagedBuffer when working with Netty components
- Avoid unnecessary copies: Use convertToNetty() for network transfers instead of copying to new buffers
- Reuse buffers: Where possible, reuse buffer instances to reduce garbage collection pressure
// Safe buffer handling pattern: release exactly once, on every path.
ManagedBuffer buffer = null;
try {
    buffer = createBuffer(); // Some buffer creation method
    buffer.retain();
    processBuffer(buffer); // Use buffer...
} catch (Exception e) {
    System.err.println("Error processing buffer: " + e.getMessage());
} finally {
    // Guard against createBuffer() having thrown before assignment.
    if (buffer != null) {
        buffer.release();
    }
}

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-spark--spark-network-common-2-12