Core networking library for Apache Spark providing transport layer abstractions and utilities
—
Unified buffer abstraction with multiple backing implementations for efficient memory and file-based data handling, providing zero-copy I/O capabilities and automatic resource management.
Core abstraction for immutable data views with different backing implementations, supporting efficient data transfer and resource management.
/**
* Immutable view over a sequence of bytes, independent of how the bytes are stored.
* Provides one interface for file-backed, in-memory, and Netty-backed data.
* Callers are expected to call release() once they are done with a buffer.
*/
public abstract class ManagedBuffer {
/**
* Returns the number of bytes of data in this buffer.
* @return Size in bytes
*/
public abstract long size();
/**
* Exposes this buffer's data as a NIO ByteBuffer.
* Multiple calls may return the same ByteBuffer or different ByteBuffers, so callers
* should not rely on either behavior.
* @return ByteBuffer view of the data
* @throws IOException if buffer cannot be converted
*/
public abstract ByteBuffer nioByteBuffer() throws IOException;
/**
* Exposes this buffer's data as an InputStream for streaming reads.
* The caller is responsible for closing the returned stream.
* @return InputStream for reading the data
* @throws IOException if stream cannot be created
*/
public abstract InputStream createInputStream() throws IOException;
/**
* Increments the reference count of this buffer, if applicable.
* NOTE(review): "if applicable" suggests some backings are not reference counted;
* confirm the behavior per implementation.
* @return This buffer for method chaining
*/
public abstract ManagedBuffer retain();
/**
* Decrements the reference count and deallocates the buffer when the count reaches zero.
* @return This buffer for method chaining
*/
public abstract ManagedBuffer release();
/**
* Converts this buffer to a Netty object suitable for efficient transfer.
* The static return type is Object because the concrete type depends on the backing.
* @return Netty object (ByteBuf for memory, FileRegion for files)
* @throws IOException if conversion fails
*/
public abstract Object convertToNetty() throws IOException;
}

Usage Examples:
// Working with a managed buffer
ManagedBuffer buffer = // ... obtained from somewhere
try {
    long size = buffer.size();
    System.out.println("Buffer size: " + size + " bytes");

    // Read as ByteBuffer: copy exactly the bytes remaining in the returned view
    ByteBuffer bb = buffer.nioByteBuffer();
    byte[] data = new byte[bb.remaining()];
    bb.get(data);

    // Or read as stream
    try (InputStream is = buffer.createInputStream()) {
        // size() is a long; fail loudly rather than silently truncating in a cast
        byte[] streamData = new byte[Math.toIntExact(size)];
        // A single read() may return fewer bytes than requested, so keep reading
        // until the array is full or the stream reports end-of-data
        int filled = 0;
        while (filled < streamData.length) {
            int count = is.read(streamData, filled, streamData.length - filled);
            if (count < 0) {
                break;
            }
            filled += count;
        }
    }
} finally {
    // Always release when done
    buffer.release();
}

FileSegmentManagedBuffer is a ManagedBuffer implementation backed by a segment of a file; it provides efficient file-based data access without loading entire files into memory.
/**
* ManagedBuffer backed by a segment of a file.
* Enables efficient access to a portion of a large file without loading the entire file.
* Only the public signatures are listed here; method bodies are omitted.
*/
public final class FileSegmentManagedBuffer extends ManagedBuffer {
/**
* Creates a buffer backed by the byte range [offset, offset + length) of a file.
* NOTE(review): whether the range is validated against the file size is not shown here.
* @param conf Transport configuration for memory mapping settings
* @param file File to read from
* @param offset Starting offset in the file, in bytes
* @param length Number of bytes to include from the offset
*/
public FileSegmentManagedBuffer(TransportConf conf, File file, long offset, long length);
/**
* Returns the underlying file.
* @return File instance
*/
public File getFile();
/**
* Returns the starting offset of the segment within the file.
* @return Offset in bytes
*/
public long getOffset();
/**
* Returns the length of the segment.
* @return Length in bytes
*/
public long getLength();
}

Usage Examples:
import java.io.File;
// Create a buffer for part of a large file.
// `conf` is assumed to be a TransportConf already in scope (first constructor argument).
File dataFile = new File("/path/to/large/data.bin");
long offset = 1024 * 1024; // Start at 1 MiB (1,048,576 bytes)
long length = 512 * 1024; // Read 512 KiB (524,288 bytes)
FileSegmentManagedBuffer fileBuffer = new FileSegmentManagedBuffer(
conf, dataFile, offset, length
);
try {
// Report the segment being read using the buffer's own accessors
System.out.println("Reading " + fileBuffer.getLength() + " bytes from " +
fileBuffer.getFile().getName() + " at offset " + fileBuffer.getOffset());
// Efficient zero-copy transfer to Netty (documented to yield a FileRegion for files)
Object nettyObject = fileBuffer.convertToNetty();
// Or read as stream; try-with-resources closes the stream automatically
try (InputStream stream = fileBuffer.createInputStream()) {
// Process stream data
}
} finally {
// Release the buffer even if reading fails
fileBuffer.release();
}

NettyManagedBuffer is a ManagedBuffer implementation backed by a Netty ByteBuf; it integrates with Netty's buffer management and reference counting.
/**
* ManagedBuffer backed by a Netty ByteBuf.
* Integrates with Netty's reference counting and memory management.
* Only the public constructor signature is listed here; the body is omitted.
*/
public class NettyManagedBuffer extends ManagedBuffer {
/**
* Creates a buffer that wraps the given Netty ByteBuf.
* @param buf Netty ByteBuf to wrap
*/
public NettyManagedBuffer(ByteBuf buf);
}

Usage Examples:
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
// Create from Netty ByteBuf (explicit charset so the bytes do not depend on the platform default)
ByteBuf nettyBuf = Unpooled.copiedBuffer(
    "Hello, World!".getBytes(java.nio.charset.StandardCharsets.UTF_8));
NettyManagedBuffer buffer = new NettyManagedBuffer(nettyBuf);
try {
    // Use as managed buffer
    ByteBuffer nio = buffer.nioByteBuffer();
    // Copy out the remaining bytes instead of calling array(): array() throws for
    // buffers without an accessible backing array (e.g. direct buffers) and returns
    // the whole backing array regardless of the buffer's position and limit
    byte[] data = new byte[nio.remaining()];
    nio.get(data);
    System.out.println("Data: " + new String(data, java.nio.charset.StandardCharsets.UTF_8));
    // Convert back to Netty (efficient, no copy)
    // NOTE(review): if convertToNetty() hands back a retained reference, it must be
    // written to a channel or released by the caller; confirm against the implementation
    Object nettyObject = buffer.convertToNetty();
} finally {
    buffer.release(); // This will release the underlying ByteBuf
}

NioManagedBuffer is a ManagedBuffer implementation backed by a NIO ByteBuffer; it integrates with standard Java NIO buffers.
/**
* ManagedBuffer backed by a NIO ByteBuffer.
* Provides the ManagedBuffer interface for standard Java ByteBuffers.
* Only the public constructor signature is listed here; the body is omitted.
*/
public class NioManagedBuffer extends ManagedBuffer {
/**
* Creates a buffer that wraps the given NIO ByteBuffer.
* @param buf NIO ByteBuffer to wrap
*/
public NioManagedBuffer(ByteBuffer buf);
}

Usage Examples:
import java.nio.ByteBuffer;
// Create from NIO ByteBuffer (explicit charset so the bytes do not depend on the platform default)
ByteBuffer nioBuf = ByteBuffer.wrap(
    "Hello, NIO!".getBytes(java.nio.charset.StandardCharsets.UTF_8));
NioManagedBuffer buffer = new NioManagedBuffer(nioBuf);
try {
    // Access data
    System.out.println("Buffer size: " + buffer.size());
    // Get as ByteBuffer (may return same instance)
    ByteBuffer bb = buffer.nioByteBuffer();
    // Read as stream
    try (InputStream stream = buffer.createInputStream()) {
        // size() is a long; fail loudly rather than silently truncating in a cast
        byte[] data = new byte[Math.toIntExact(buffer.size())];
        // A single read() may return fewer bytes than requested, so keep reading
        // until the array is full or the stream reports end-of-data
        int filled = 0;
        while (filled < data.length) {
            int count = stream.read(data, filled, data.length - filled);
            if (count < 0) {
                break;
            }
            filled += count;
        }
        // Decode only the bytes actually read, using the same charset they were encoded with
        System.out.println("Stream data: "
            + new String(data, 0, filled, java.nio.charset.StandardCharsets.UTF_8));
    }
} finally {
    buffer.release();
}

Different buffer types are optimized for different use cases:
// For file-based data (large files, zero-copy I/O).
// `conf` and `file` are assumed to be in scope; offset 0 with file.length()
// makes the segment cover the whole file.
FileSegmentManagedBuffer fileBuffer = new FileSegmentManagedBuffer(
conf, file, 0, file.length()
);
// For in-memory data with Netty integration.
// NOTE(review): `ctx` is presumably a Netty ChannelHandlerContext; confirm.
ByteBuf nettyBuf = ctx.alloc().buffer(1024);
NettyManagedBuffer nettyBuffer = new NettyManagedBuffer(nettyBuf);
// For existing NIO ByteBuffer integration (wraps a buffer the caller already has)
ByteBuffer existing = // ... from somewhere
NioManagedBuffer nioBuffer = new NioManagedBuffer(existing);

Always release buffers in a try-finally block. ManagedBuffer, as declared above, is not AutoCloseable, so try-with-resources applies only to the streams obtained from it:
// createBuffer() and processBuffer() stand in for application code that is not shown here.
ManagedBuffer buffer = createBuffer();
try {
// Use buffer
processBuffer(buffer);
} finally {
// Runs whether or not processBuffer() throws
buffer.release(); // Essential for preventing memory leaks
}

Leverage convertToNetty() for efficient network transfer:
// `conf`, `file`, and `channel` are assumed to be in scope.
ManagedBuffer buffer = new FileSegmentManagedBuffer(conf, file, 0, file.length());
try {
// Efficient zero-copy transfer
Object nettyObject = buffer.convertToNetty();
// Both branches issue the same write; the instanceof checks only illustrate
// which Netty type each kind of backing produces.
if (nettyObject instanceof FileRegion) {
// File-based zero-copy
channel.writeAndFlush(nettyObject);
} else if (nettyObject instanceof ByteBuf) {
// Memory-based transfer
channel.writeAndFlush(nettyObject);
}
} finally {
// Release the managed buffer once the write has been issued
buffer.release();
}

For complex data processing pipelines, read the input as a stream and wrap the transformed bytes in a new buffer:
/**
 * Streams the input buffer through transformData and returns the result as a new
 * NIO-backed buffer.
 * NOTE(review): the input buffer is not released here; ownership presumably stays
 * with the caller; confirm.
 *
 * @param input buffer whose contents are read as a stream
 * @return a new NioManagedBuffer wrapping the transformed bytes
 * @throws RuntimeException wrapping the IOException if the stream cannot be created or read
 */
public ManagedBuffer processData(ManagedBuffer input) {
    try (InputStream source = input.createInputStream()) {
        // Transform first, then wrap the resulting bytes without copying them
        ByteBuffer wrapped = ByteBuffer.wrap(transformData(source));
        return new NioManagedBuffer(wrapped);
    } catch (IOException ioe) {
        throw new RuntimeException("Processing failed", ioe);
    }
}

Install with Tessl CLI:
npx tessl i tessl/maven-org-apache-spark--spark-network-common