CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/maven-org-apache-spark--spark-network-common-2-10

Core networking infrastructure for Apache Spark cluster computing with transport layer, RPC, chunk fetching, and SASL authentication

Pending
Overview
Eval results
Files

docs/buffer-management.md

Buffer Management

Unified buffer management system providing abstractions over different buffer types including memory, file segments, and Netty ByteBufs. The buffer system enables efficient zero-copy operations and resource management for network data transfer.

Capabilities

ManagedBuffer

Abstract base class for all buffer implementations, providing a unified interface for different data sources.

/**
 * Common contract for a block of data that can be sized, read through several
 * views, handed to Netty, and retained/released by its users. Concrete
 * subclasses decide where the bytes actually live.
 */
public abstract class ManagedBuffer {
  /**
   * Reports how much data this buffer holds.
   *
   * @return the number of bytes in the buffer
   */
  public abstract long size();

  /**
   * Exposes the buffer's data as an NIO {@link ByteBuffer}.
   * Implementations may need to perform I/O to produce it, hence the checked exception.
   *
   * @return a ByteBuffer holding the buffer's data
   * @throws IOException if the data cannot be read
   */
  public abstract ByteBuffer nioByteBuffer() throws IOException;

  /**
   * Exposes the buffer's data as an {@link InputStream}, so callers can consume
   * it incrementally instead of materializing it all at once.
   *
   * @return a stream over the buffer's data
   * @throws IOException if the stream cannot be opened
   */
  public abstract InputStream createInputStream() throws IOException;

  /**
   * Registers one more user of this buffer. Whether anything is actually
   * counted is up to the implementation.
   *
   * @return this buffer, to allow call chaining
   */
  public abstract ManagedBuffer retain();

  /**
   * Signals that one user is finished with this buffer. Whether any resource
   * is freed is up to the implementation.
   *
   * @return this buffer, to allow call chaining
   */
  public abstract ManagedBuffer release();

  /**
   * Produces an object that can be written to a Netty channel.
   * The concrete type is implementation-specific.
   *
   * @return a Netty-writable object (typically a ByteBuf or a FileRegion)
   * @throws IOException if the conversion fails
   */
  public abstract Object convertToNetty() throws IOException;
}

Buffer Implementations

NioManagedBuffer

ByteBuffer-backed managed buffer for in-memory data.

/**
 * ManagedBuffer implementation backed by a Java NIO ByteBuffer.
 * Best for small to medium-sized data that fits in memory.
 */
public class NioManagedBuffer extends ManagedBuffer {
  /**
   * Creates a managed buffer from a ByteBuffer.
   * The ByteBuffer should be ready for reading (position at start, limit at end).
   *
   * @param buf The ByteBuffer to wrap
   */
  public NioManagedBuffer(ByteBuffer buf);

  /**
   * @return the number of bytes between the wrapped buffer's position and limit
   */
  @Override
  public long size() {
    return buf.remaining();
  }

  /**
   * @return a duplicate of the wrapped buffer; it shares the content but has
   *         its own position and limit, so callers cannot disturb this buffer
   */
  @Override
  public ByteBuffer nioByteBuffer() throws IOException {
    return buf.duplicate();
  }

  /**
   * Opens a stream over the buffer's data.
   *
   * @return an input stream reading the wrapped data
   */
  @Override
  public InputStream createInputStream() throws IOException {
    // Hand the stream a duplicate rather than the wrapped buffer itself, so that
    // consuming the stream cannot move buf's position and thereby change what
    // size() and nioByteBuffer() report afterwards.
    // NOTE(review): ByteBufferInputStream is not defined in this file; this
    // assumes it reads by advancing the position of the buffer it is given.
    return new ByteBufferInputStream(buf.duplicate());
  }

  /**
   * No-op: this implementation keeps no reference count.
   *
   * @return this buffer
   */
  @Override
  public ManagedBuffer retain() {
    return this;
  }

  /**
   * No-op: this implementation has nothing to release explicitly.
   *
   * @return this buffer
   */
  @Override
  public ManagedBuffer release() {
    return this;
  }

  /**
   * @return a Netty buffer wrapping the same NIO buffer (no copy is requested here)
   */
  @Override
  public Object convertToNetty() throws IOException {
    return Unpooled.wrappedBuffer(buf);
  }
}

NettyManagedBuffer

Netty ByteBuf-backed managed buffer for integration with Netty pipelines.

/**
 * ManagedBuffer implementation backed by a Netty ByteBuf.
 * Reference counting is delegated to the wrapped ByteBuf.
 */
public class NettyManagedBuffer extends ManagedBuffer {
  /**
   * Creates a managed buffer from a Netty ByteBuf.
   * The buffer takes ownership of the ByteBuf and manages its lifecycle.
   *
   * @param buf The Netty ByteBuf to wrap
   */
  public NettyManagedBuffer(ByteBuf buf);

  /**
   * @return the number of readable bytes currently in the wrapped ByteBuf
   */
  @Override
  public long size() {
    return buf.readableBytes();
  }

  /**
   * @return the wrapped ByteBuf's readable bytes exposed as an NIO buffer
   */
  @Override
  public ByteBuffer nioByteBuffer() throws IOException {
    return buf.nioBuffer();
  }

  /**
   * Opens a stream over the readable bytes of the wrapped ByteBuf.
   *
   * @return an input stream reading the buffer's data
   */
  @Override
  public InputStream createInputStream() throws IOException {
    // Stream from a duplicate: it shares the content but has its own reader
    // index, so draining the stream does not shrink size() or affect other
    // views of this buffer (same precaution as convertToNetty()).
    return new ByteBufInputStream(buf.duplicate());
  }

  /**
   * Increments the wrapped ByteBuf's reference count.
   *
   * @return this buffer
   */
  @Override
  public ManagedBuffer retain() {
    buf.retain();
    return this;
  }

  /**
   * Decrements the wrapped ByteBuf's reference count.
   *
   * @return this buffer
   */
  @Override
  public ManagedBuffer release() {
    buf.release();
    return this;
  }

  /**
   * @return a duplicate of the wrapped ByteBuf, so the receiver's index changes
   *         do not affect the original
   */
  @Override
  public Object convertToNetty() throws IOException {
    return buf.duplicate();
  }
}

FileSegmentManagedBuffer

File-backed managed buffer for efficient access to file segments without loading into memory.

/**
 * ManagedBuffer implementation backed by a segment of a file, identified by
 * the file, a starting offset and a length. The data stays on disk until one
 * of the access methods is called.
 */
public class FileSegmentManagedBuffer extends ManagedBuffer {
  /**
   * Creates a managed buffer for a segment of a file.
   *
   * @param conf Transport configuration for I/O settings
   * @param file The file to read from
   * @param offset Starting offset within the file (0-based)
   * @param length Number of bytes to include in the segment
   */
  public FileSegmentManagedBuffer(TransportConf conf, File file, long offset, long length);

  /**
   * @return the segment length in bytes
   */
  @Override
  public long size() {
    return length;
  }

  /**
   * Loads the segment into a ByteBuffer. Segments shorter than
   * {@code conf.memoryMapBytes()} are copied onto the heap; anything at or
   * above that threshold is memory-mapped read-only.
   *
   * @return a ByteBuffer holding the segment's bytes
   * @throws IOException if the file cannot be opened or read, or ends before
   *         the segment is complete
   */
  @Override
  public ByteBuffer nioByteBuffer() throws IOException {
    // memoryMapBytes is the size from which mapping is used, not an upper
    // bound: small segments are cheaper to copy, large ones are mapped.
    if (length < conf.memoryMapBytes()) {
      return readFileSegment();
    }
    return mapFileSegment();
  }

  /**
   * Opens a stream limited to exactly this segment of the file.
   *
   * @return a stream positioned at {@code offset} and capped at {@code length} bytes
   * @throws IOException if the file cannot be opened or positioned
   */
  @Override
  public InputStream createInputStream() throws IOException {
    FileInputStream in = new FileInputStream(file);
    try {
      // Moving the stream's channel also moves the stream's own read position.
      in.getChannel().position(offset);
      return new LimitedInputStream(in, length);
    } catch (IOException | RuntimeException e) {
      // Do not leak the file handle when the stream cannot be handed out.
      try {
        in.close();
      } catch (IOException closeFailure) {
        e.addSuppressed(closeFailure);
      }
      throw e;
    }
  }

  /**
   * No-op: this implementation keeps no reference count.
   *
   * @return this buffer
   */
  @Override
  public ManagedBuffer retain() {
    return this;
  }

  /**
   * No-op: this implementation has nothing to release explicitly.
   *
   * @return this buffer
   */
  @Override
  public ManagedBuffer release() {
    return this;
  }

  /**
   * @return a Netty FileRegion covering this file segment
   */
  @Override
  public Object convertToNetty() throws IOException {
    // Use Netty's FileRegion for zero-copy file transfer
    return new DefaultFileRegion(file, offset, length);
  }

  /**
   * Gets the underlying file.
   *
   * @return The file this buffer reads from
   */
  public File getFile() {
    return file;
  }

  /**
   * Gets the offset within the file.
   *
   * @return The starting offset in bytes
   */
  public long getOffset() {
    return offset;
  }

  /**
   * Gets the length of the segment.
   *
   * @return The segment length in bytes
   */
  public long getLength() {
    return length;
  }

  /** Copies the segment into a freshly allocated heap buffer, ready for reading. */
  private ByteBuffer readFileSegment() throws IOException {
    try (RandomAccessFile raf = new RandomAccessFile(file, "r");
         FileChannel channel = raf.getChannel()) {
      // The cast is safe only because the caller has checked length against
      // the threshold. NOTE(review): assumes memoryMapBytes() fits in an int.
      ByteBuffer copy = ByteBuffer.allocate((int) length);
      channel.position(offset);
      while (copy.hasRemaining()) {
        if (channel.read(copy) == -1) {
          throw new IOException("Reached end of file before reading segment: file=" + file
              + ", offset=" + offset + ", length=" + length
              + ", missing=" + copy.remaining());
        }
      }
      copy.flip();
      return copy;
    }
  }

  /** Memory-maps the segment read-only; the file handle is closed once the mapping exists. */
  private ByteBuffer mapFileSegment() throws IOException {
    try (RandomAccessFile raf = new RandomAccessFile(file, "r");
         FileChannel channel = raf.getChannel()) {

      return channel.map(FileChannel.MapMode.READ_ONLY, offset, length);
    }
  }
}

Utility Classes

LazyFileRegion

Lazy-loading file region for deferred file access.

/**
 * FileRegion describing a range of a file by file, start position and byte count.
 * NOTE(review): per its name this is meant to defer opening the file until the
 * transfer happens; that logic lives in transferToChannel, which is not shown here.
 */
public class LazyFileRegion extends AbstractReferenceCounted implements FileRegion {
  /**
   * Creates a lazy file region.
   *
   * @param file the file whose bytes will be transferred
   * @param position offset in the file where the region starts
   * @param count number of bytes in the region
   */
  public LazyFileRegion(File file, long position, long count);

  /** @return the offset in the file where this region starts */
  @Override
  public long position() {
    return this.position;
  }

  /** @return the number of bytes covered by this region */
  @Override
  public long count() {
    return this.count;
  }

  /**
   * Transfers data from this region to the given channel.
   *
   * @param target channel that receives the bytes
   * @param position forwarded unchanged to the transfer helper (note that it
   *        shadows the region's own start position field)
   * @return whatever the transfer helper reports
   * @throws IOException if the transfer fails
   */
  @Override
  public long transferTo(WritableByteChannel target, long position) throws IOException {
    final long transferred = transferToChannel(target, position);
    return transferred;
  }

  /**
   * Gets the file this region references.
   *
   * @return the underlying file
   */
  public File file() {
    return this.file;
  }
}

Usage Examples

Creating and Using Buffers

import java.io.File;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import org.apache.spark.network.buffer.*;

// Create buffer from byte array.
// The charset is explicit so the encoded bytes do not depend on the platform default.
byte[] data = "Hello, World!".getBytes(StandardCharsets.UTF_8);
ByteBuffer byteBuffer = ByteBuffer.wrap(data);
ManagedBuffer nioBuffer = new NioManagedBuffer(byteBuffer);

System.out.println("Buffer size: " + nioBuffer.size());

// Create buffer from file segment.
// `conf` is a TransportConf assumed to be available in the surrounding scope.
File dataFile = new File("/path/to/large-file.dat");
long offset = 1024;      // Start at 1KB
long length = 64 * 1024; // Read 64KB
ManagedBuffer fileBuffer = new FileSegmentManagedBuffer(conf, dataFile, offset, length);

// Create buffer from Netty ByteBuf
ByteBuf nettyBuf = Unpooled.copiedBuffer("Netty data", StandardCharsets.UTF_8);
ManagedBuffer nettyBuffer = new NettyManagedBuffer(nettyBuf);

Buffer Access Patterns

/**
 * Processes a buffer's data exactly once, choosing the access path by size,
 * and releases the buffer afterwards.
 *
 * Buffers under 1 MB are read through a ByteBuffer; anything larger is
 * streamed. Read failures are reported on stderr and not rethrown.
 *
 * @param buffer the buffer to process; it is released before this method returns
 */
public void processBuffer(ManagedBuffer buffer) {
  try {
    System.out.println("Processing buffer of size: " + buffer.size());

    if (buffer.size() < 1024 * 1024) { // Less than 1MB
      // Method 1: Direct ByteBuffer access (for small buffers)
      try {
        ByteBuffer bb = buffer.nioByteBuffer();
        processDirectly(bb);
      } catch (IOException e) {
        System.err.println("Could not get ByteBuffer: " + e.getMessage());
      }
    } else {
      // Method 2: Stream access (for large buffers).
      // This is an alternative to method 1, not a second pass: running both
      // would process small buffers twice.
      try (InputStream stream = buffer.createInputStream()) {
        processStream(stream);
      } catch (IOException e) {
        System.err.println("Could not create stream: " + e.getMessage());
      }
    }

  } finally {
    // Important: Always release buffer when done
    buffer.release();
  }
}

/**
 * Consumes every remaining byte of the given buffer, one at a time.
 * On return the buffer's position has reached its limit.
 *
 * @param buffer the buffer to drain
 */
private void processDirectly(ByteBuffer buffer) {
  for (int pending = buffer.remaining(); pending > 0; pending--) {
    byte current = buffer.get();
    // Process byte...
  }
}

/**
 * Reads the stream to its end in blocks of up to 8192 bytes and passes each
 * block, with the number of valid bytes in it, to processChunk.
 *
 * @param stream the stream to consume; it is not closed here
 * @throws IOException if reading from the stream fails
 */
private void processStream(InputStream stream) throws IOException {
  final byte[] block = new byte[8192];

  for (int filled = stream.read(block); filled != -1; filled = stream.read(block)) {
    // Process chunk...
    processChunk(block, filled);
  }
}

Reference Counting and Resource Management

/**
 * Keeps track of buffers it has retained on behalf of others so that they can
 * all be released in one go.
 */
public class BufferManager {
  private final List<ManagedBuffer> activeBuffers = new ArrayList<>();

  /**
   * Retains the buffer, remembers it for a later {@link #cleanup()}, and hands it back.
   *
   * @param original the buffer to share
   * @return the value returned by {@code original.retain()}
   */
  public ManagedBuffer shareBuffer(ManagedBuffer original) {
    final ManagedBuffer handle = original.retain();
    activeBuffers.add(handle);
    return handle;
  }

  /**
   * Retains the buffer and schedules it for processing via
   * {@code CompletableFuture.runAsync}; the retained buffer is released once
   * the task finishes, whether it succeeds or throws.
   *
   * @param buffer the buffer to process asynchronously
   */
  public void processBufferAsync(ManagedBuffer buffer) {
    // Retain before handing off, so the task holds the buffer while it runs.
    final ManagedBuffer pinned = buffer.retain();

    // NOTE(review): processBuffer is not defined in this class. If it releases
    // its argument itself, the release below is a second release for a single
    // retain -- confirm against the actual processBuffer.
    Runnable task = () -> {
      try {
        processBuffer(pinned);
      } finally {
        pinned.release();
      }
    };
    CompletableFuture.runAsync(task);
  }

  /**
   * Releases every buffer recorded by {@link #shareBuffer(ManagedBuffer)} and
   * forgets them.
   */
  public void cleanup() {
    for (int i = 0, total = activeBuffers.size(); i < total; i++) {
      activeBuffers.get(i).release();
    }
    activeBuffers.clear();
  }
}

Zero-Copy Network Transfer

/**
 * Shows how a ManagedBuffer's Netty representation is written to a channel.
 */
public class NetworkTransferExample {
  /**
   * Converts the buffer to its Netty form and writes it to the channel when
   * that form is a ByteBuf or a FileRegion. Conversion failures and
   * unsupported types are reported on stderr.
   *
   * @param buffer the buffer to send
   * @param channel the channel to write to
   */
  public void sendBufferOverNetwork(ManagedBuffer buffer, Channel channel) {
    final Object payload;
    try {
      payload = buffer.convertToNetty();
    } catch (IOException e) {
      System.err.println("Failed to convert buffer for network transfer: " + e.getMessage());
      return;
    }

    // Both supported forms are written the same way; only the type gate differs.
    final boolean sendable = payload instanceof ByteBuf || payload instanceof FileRegion;
    if (!sendable) {
      System.err.println("Unsupported Netty object type: " + payload.getClass());
      return;
    }

    channel.writeAndFlush(payload);
  }

  /**
   * Sends one segment of a file as a FileRegion and releases the backing
   * buffer once the write completes, or immediately if it cannot be prepared.
   *
   * @param file the file to send from
   * @param offset start of the segment within the file
   * @param length number of bytes to send
   * @param channel the channel to write to
   */
  public void sendFileSegment(File file, long offset, long length, Channel channel) {
    // `conf` is not declared in this class; it is assumed to be a TransportConf in scope.
    FileSegmentManagedBuffer segment = new FileSegmentManagedBuffer(conf, file, offset, length);

    final FileRegion region;
    try {
      region = (FileRegion) segment.convertToNetty();
    } catch (IOException e) {
      System.err.println("Failed to prepare file for transfer: " + e.getMessage());
      segment.release();
      return;
    }

    // Report the outcome, then release the buffer now that the transfer is over.
    ChannelFutureListener onComplete = outcome -> {
      if (outcome.isSuccess()) {
        System.out.println("File segment sent successfully");
      } else {
        System.err.println("Failed to send file segment: " + outcome.cause());
      }
      segment.release();
    };
    ChannelFuture pending = channel.writeAndFlush(region);
    pending.addListener(onComplete);
  }
}

Buffer Factory Pattern

/**
 * Convenience factory that builds the different ManagedBuffer implementations
 * from common inputs.
 */
public class BufferFactory {
  private final TransportConf conf;

  /**
   * @param conf transport configuration passed on to file-backed buffers
   */
  public BufferFactory(TransportConf conf) {
    this.conf = conf;
  }

  /**
   * Wraps a byte array without copying it.
   *
   * @param data the bytes to expose
   * @return an NIO-backed buffer over {@code data}
   */
  public ManagedBuffer createFromBytes(byte[] data) {
    ByteBuffer buffer = ByteBuffer.wrap(data);
    return new NioManagedBuffer(buffer);
  }

  /**
   * Encodes the text as UTF-8 and wraps the resulting bytes.
   *
   * @param text the text to encode
   * @return an NIO-backed buffer over the UTF-8 bytes of {@code text}
   */
  public ManagedBuffer createFromString(String text) {
    return createFromBytes(text.getBytes(StandardCharsets.UTF_8));
  }

  /**
   * Creates a buffer covering the whole file, from offset 0 to its current length.
   *
   * @param file the file to expose
   * @return a file-backed buffer over the entire file
   */
  public ManagedBuffer createFromFile(File file) {
    return new FileSegmentManagedBuffer(conf, file, 0, file.length());
  }

  /**
   * Creates a buffer covering {@code length} bytes of the file starting at {@code offset}.
   *
   * @param file the file to expose
   * @param offset start of the segment, must be non-negative
   * @param length size of the segment, must be non-negative
   * @return a file-backed buffer over the requested segment
   * @throws IllegalArgumentException if offset or length is negative, or the
   *         segment reaches past the end of the file
   */
  public ManagedBuffer createFileSegment(File file, long offset, long length) {
    // Validate parameters
    if (offset < 0 || length < 0) {
      throw new IllegalArgumentException("Offset and length must be non-negative");
    }

    // Compare by subtraction instead of computing offset + length: the sum of
    // two large longs can overflow to a negative value and slip past the check.
    long fileLength = file.length();
    if (offset > fileLength || length > fileLength - offset) {
      throw new IllegalArgumentException("Segment extends beyond file end");
    }

    return new FileSegmentManagedBuffer(conf, file, offset, length);
  }

  /**
   * Wraps an existing Netty ByteBuf.
   *
   * @param buf the Netty buffer to wrap
   * @return a Netty-backed buffer over {@code buf}
   */
  public ManagedBuffer createFromNettyBuf(ByteBuf buf) {
    return new NettyManagedBuffer(buf);
  }

  /**
   * @return an NIO-backed buffer holding zero bytes
   */
  public ManagedBuffer createEmpty() {
    return new NioManagedBuffer(ByteBuffer.allocate(0));
  }
}

Buffer Composition and Chaining

/**
 * Examples that consume a list of buffers as one logical sequence. Both
 * methods take over the source buffers: once copying starts, every buffer in
 * the list is released, whether the operation succeeds or fails part-way.
 */
public class CompositeBufferExample {
  /**
   * Copies the contents of all buffers, in list order, into a single heap
   * buffer and releases the sources.
   *
   * @param buffers the buffers to concatenate
   * @return a new NIO-backed buffer holding the concatenated data
   * @throws IllegalArgumentException if the combined size exceeds
   *         {@code Integer.MAX_VALUE}; no buffer is released in that case
   * @throws RuntimeException if a source buffer cannot be read
   */
  public ManagedBuffer combineBuffers(List<ManagedBuffer> buffers) {
    // Calculate total size
    long totalSize = buffers.stream().mapToLong(ManagedBuffer::size).sum();

    if (totalSize > Integer.MAX_VALUE) {
      throw new IllegalArgumentException("Combined buffer too large: " + totalSize);
    }

    // Combine into single ByteBuffer
    ByteBuffer combined = ByteBuffer.allocate((int) totalSize);

    int visited = 0;
    try {
      for (ManagedBuffer buffer : buffers) {
        visited++; // from here on the inner finally is responsible for this buffer
        try {
          combined.put(buffer.nioByteBuffer());
        } catch (IOException e) {
          throw new RuntimeException("Failed to read buffer", e);
        } finally {
          buffer.release(); // Release source buffers
        }
      }
    } finally {
      // After a failure, release the buffers the loop never reached so the
      // caller is not left with a half-released list. No-op on success.
      releaseFrom(buffers, visited);
    }

    combined.flip(); // Prepare for reading
    return new NioManagedBuffer(combined);
  }

  /**
   * Writes the contents of all buffers, in list order, to the output stream
   * and releases the sources.
   *
   * @param buffers the buffers to stream
   * @param output the destination; it is neither flushed nor closed here
   * @throws IOException if reading a buffer or writing to {@code output} fails
   */
  public void streamBuffersSequentially(List<ManagedBuffer> buffers, OutputStream output)
      throws IOException {

    // One scratch array is enough; it is fully overwritten on each read.
    byte[] chunk = new byte[8192];

    int visited = 0;
    try {
      for (ManagedBuffer buffer : buffers) {
        visited++;
        try (InputStream input = buffer.createInputStream()) {
          int bytesRead;

          while ((bytesRead = input.read(chunk)) != -1) {
            output.write(chunk, 0, bytesRead);
          }

        } finally {
          buffer.release();
        }
      }
    } finally {
      // Release whatever was not reached because of a failure. No-op on success.
      releaseFrom(buffers, visited);
    }
  }

  /** Releases every buffer in the list from index {@code start} (inclusive) to the end. */
  private static void releaseFrom(List<ManagedBuffer> buffers, int start) {
    for (int i = start; i < buffers.size(); i++) {
      buffers.get(i).release();
    }
  }
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-spark--spark-network-common-2-10

docs

buffer-management.md

client-operations.md

configuration-utilities.md

index.md

message-protocol.md

sasl-authentication.md

server-operations.md

transport-setup.md

tile.json