
tessl/maven-org-apache-spark--spark-network-common-2-12

Core networking functionality for Apache Spark including transport layers, buffer management, client-server communication, cryptographic protocols, SASL authentication, and shuffle database support using Netty for high-performance network I/O.

Buffer Management

The buffer management API provides the buffer abstraction used throughout Apache Spark's networking layer. The ManagedBuffer abstract class presents an immutable view of bytes, and its implementations differ in where those bytes live: a NIO ByteBuffer, a Netty ByteBuf, or a segment of a file. File-backed buffers can be sent with zero-copy transfers, and Netty-backed buffers are reference counted.

Capabilities

ManagedBuffer (Abstract Class)

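Abstract base class for immutable byte data views with different backing implementations. Some implementations hold memory outside the JVM garbage collector (NettyManagedBuffer is reference counted), so call retain() and release() when passing a buffer to another thread.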

/**
 * Get the size of the buffer in bytes
 * @return long representing the number of bytes in the buffer
 */
public abstract long size();

/**
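 * Expose this buffer's data as a NIO ByteBuffer. Changing the position and limit
 * of the returned ByteBuffer does not affect the content of this buffer.
 * @return ByteBuffer view of the buffer data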
 * @throws IOException if the buffer data cannot be accessed
 */
public abstract ByteBuffer nioByteBuffer() throws IOException;

/**
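 * Expose this buffer's data as an InputStream. Implementations do not necessarily
 * check how many bytes are read, so the caller must not read past size() bytes.
 * @return InputStream for sequential reading of buffer contents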
 * @throws IOException if the stream cannot be created
 */
public abstract InputStream createInputStream() throws IOException;

/**
 * Increment the reference count by one, if this implementation counts references
 * @return this buffer, for method chaining
 */
public abstract ManagedBuffer retain();

/**
 * Decrement the reference count by one, if this implementation counts references,
 * and deallocate the buffer when the count reaches zero
 * @return this buffer, for method chaining
 */
public abstract ManagedBuffer release();

/**
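 * Convert this buffer into a Netty object used to write the data out
 * @return either an io.netty.buffer.ByteBuf or an io.netty.channel.FileRegion; if a ByteBuf
 *         is returned, its reference count has been incremented and the caller is
 *         responsible for releasing that reference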
 * @throws IOException if conversion fails
 */
public abstract Object convertToNetty() throws IOException;
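The retain()/release() contract is easiest to see in isolation. The sketch below is a stdlib-only stand-in, not Spark's code: the hypothetical `RefCountedBuffer` class starts with one reference owned by its creator, `retain()` adds one, and `release()` drops the backing bytes when the count reaches zero. Using a buffer after its last release fails fast rather than silently reading freed memory, which is roughly what Netty does for a released ByteBuf.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in that mimics the retain()/release() contract (not Spark code). */
final class RefCountedBuffer {
    private final AtomicInteger refCnt = new AtomicInteger(1); // creator owns the first reference
    private volatile ByteBuffer data;

    RefCountedBuffer(byte[] bytes) {
        this.data = ByteBuffer.wrap(bytes);
    }

    int refCnt() {
        return refCnt.get();
    }

    /** Adds a reference; fails if the buffer was already freed. */
    RefCountedBuffer retain() {
        while (true) {
            int current = refCnt.get();
            if (current <= 0) {
                throw new IllegalStateException("retain() on a freed buffer");
            }
            if (refCnt.compareAndSet(current, current + 1)) {
                return this;
            }
        }
    }

    /** Drops a reference and frees the data when the last reference is released. */
    RefCountedBuffer release() {
        while (true) {
            int current = refCnt.get();
            if (current <= 0) {
                throw new IllegalStateException("release() on a freed buffer");
            }
            if (refCnt.compareAndSet(current, current - 1)) {
                if (current == 1) {
                    data = null; // stands in for returning memory to a pool
                }
                return this;
            }
        }
    }

    long size() {
        ByteBuffer d = data;
        if (d == null) {
            throw new IllegalStateException("buffer already freed");
        }
        return d.remaining();
    }

    public static void main(String[] args) {
        RefCountedBuffer buf = new RefCountedBuffer(new byte[] {1, 2, 3});
        buf.retain();                     // hand a reference to another thread
        System.out.println(buf.refCnt()); // 2
        buf.release();                    // the other thread is done
        buf.release();                    // the creator is done; data is freed
        System.out.println(buf.refCnt()); // 0
    }
}
```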

Buffer Implementations

NioManagedBuffer

Managed buffer implementation backed by a NIO ByteBuffer, suitable for in-memory data. retain() and release() are no-ops.

/**
 * Create a managed buffer from a NIO ByteBuffer
 * @param buf - ByteBuffer containing the data; it is not copied, and size() reports its remaining bytes (limit - position)
 */
public NioManagedBuffer(ByteBuffer buf);

@Override
public long size();

@Override
public ByteBuffer nioByteBuffer() throws IOException;

@Override
public InputStream createInputStream() throws IOException;

@Override
public ManagedBuffer retain();

@Override
public ManagedBuffer release();

@Override
public Object convertToNetty() throws IOException;
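Because size() reports the wrapped ByteBuffer's remaining bytes and, as far as we know from Spark's source, nioByteBuffer() hands out a duplicate (same memory, independent position and limit), reading the returned view does not consume the managed buffer. The stdlib sketch below reproduces those two behaviors with plain `java.nio.ByteBuffer`; `NioViewDemo` and its methods are illustrative names, not Spark API.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Illustrative stand-in for NioManagedBuffer's view semantics (not Spark code). */
final class NioViewDemo {
    private final ByteBuffer buf;

    NioViewDemo(ByteBuffer buf) {
        this.buf = buf; // the bytes are not copied
    }

    /** Only the bytes between position and limit count toward the size. */
    long size() {
        return buf.remaining();
    }

    /** Returns an independent view: same content, separate position and limit. */
    ByteBuffer view() {
        return buf.duplicate();
    }

    public static void main(String[] args) {
        ByteBuffer backing = ByteBuffer.wrap("Hello, Spark Network!".getBytes(StandardCharsets.UTF_8));
        backing.position(7); // skip "Hello, "
        NioViewDemo demo = new NioViewDemo(backing);

        ByteBuffer first = demo.view();
        byte[] out = new byte[first.remaining()];
        first.get(out); // drains the view, not the backing buffer
        System.out.println(new String(out, StandardCharsets.UTF_8)); // Spark Network!
        System.out.println(demo.size());                              // 14
    }
}
```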

FileSegmentManagedBuffer

Managed buffer implementation backed by a segment of a file. Its convertToNetty() returns a Netty DefaultFileRegion, which allows zero-copy transfer to the network; retain() and release() are no-ops.

/**
 * Create a managed buffer from a segment of a file
 * @param conf - TransportConf controlling, among other things, when segments are memory-mapped
 * @param file - File to read data from
 * @param offset - Starting position in the file
 * @param length - Number of bytes to include from the file
 */
public FileSegmentManagedBuffer(TransportConf conf, File file, long offset, long length);

@Override
public long size();

@Override
public ByteBuffer nioByteBuffer() throws IOException;

@Override
public InputStream createInputStream() throws IOException;

@Override
public ManagedBuffer retain();

@Override
public ManagedBuffer release();

@Override
public Object convertToNetty() throws IOException;

/**
 * Get the underlying file
 * @return File object backing this buffer
 */
public File getFile();

/**
 * Get the offset within the file
 * @return long representing the starting position in bytes
 */
public long getOffset();

/**
 * Get the length of the file segment
 * @return long representing the number of bytes in the segment
 */
public long getLength();
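In the Spark source we are aware of, nioByteBuffer() copies a small segment into a heap ByteBuffer and memory-maps a larger one, with the cutoff coming from the transport configuration, because mapping has a fixed overhead that only pays off for large reads. The stdlib sketch below reproduces that decision; `FileSegmentReader`, `readSegment`, and the `mmapThreshold` parameter are hypothetical names, and the threshold value is chosen by the caller rather than read from any Spark setting.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/** Stdlib sketch of reading the file segment [offset, offset + length) (not Spark code). */
final class FileSegmentReader {
    private FileSegmentReader() {}

    /** Copies segments shorter than mmapThreshold into a heap buffer; maps larger ones. */
    static ByteBuffer readSegment(Path file, long offset, long length, long mmapThreshold)
            throws IOException {
        try (FileChannel channel = FileChannel.open(file, StandardOpenOption.READ)) {
            if (length < mmapThreshold) {
                ByteBuffer buf = ByteBuffer.allocate((int) length);
                channel.position(offset);
                while (buf.hasRemaining()) {
                    if (channel.read(buf) == -1) {
                        throw new IOException("Reached EOF before reading " + length
                                + " bytes at offset " + offset);
                    }
                }
                buf.flip();
                return buf;
            }
            // A mapping stays valid after the channel that created it is closed.
            return channel.map(FileChannel.MapMode.READ_ONLY, offset, length);
        }
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("segment", ".txt");
        Files.write(tmp, "0123456789abcdefghij".getBytes(StandardCharsets.US_ASCII));
        ByteBuffer seg = readSegment(tmp, 10, 5, 2 * 1024 * 1024);
        byte[] out = new byte[seg.remaining()];
        seg.get(out);
        System.out.println(new String(out, StandardCharsets.US_ASCII)); // abcde
        Files.delete(tmp);
    }
}
```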

NettyManagedBuffer

Managed buffer implementation backed by a Netty ByteBuf; retain() and release() delegate to the ByteBuf's own reference count.

/**
 * Create a managed buffer from a Netty ByteBuf
 * @param buf - ByteBuf containing the data; the constructor does not retain it, so this buffer's release() releases the caller's reference
 */
public NettyManagedBuffer(ByteBuf buf);

@Override
public long size();

@Override
public ByteBuffer nioByteBuffer() throws IOException;

@Override
public InputStream createInputStream() throws IOException;

@Override
public ManagedBuffer retain();

@Override
public ManagedBuffer release();

@Override
public Object convertToNetty() throws IOException;

/**
 * Get the underlying Netty ByteBuf
 * @return ByteBuf backing this managed buffer
 */
public ByteBuf getBuf();

Usage Examples

Working with NioManagedBuffer

import org.apache.spark.network.buffer.NioManagedBuffer;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;

// Create buffer from byte array
byte[] data = "Hello, Spark Network!".getBytes();
ByteBuffer byteBuffer = ByteBuffer.wrap(data);
NioManagedBuffer buffer = new NioManagedBuffer(byteBuffer);

// Get buffer information
System.out.println("Buffer size: " + buffer.size() + " bytes");

// Read data as ByteBuffer
try {
    ByteBuffer nioBuffer = buffer.nioByteBuffer();
    byte[] readData = new byte[nioBuffer.remaining()];
    nioBuffer.get(readData);
    System.out.println("Buffer content: " + new String(readData));
} catch (IOException e) {
    System.err.println("Failed to read buffer: " + e.getMessage());
}

// Read data as InputStream
try (InputStream inputStream = buffer.createInputStream()) {
    byte[] streamData = inputStream.readAllBytes();
    System.out.println("Stream content: " + new String(streamData));
} catch (IOException e) {
    System.err.println("Failed to read stream: " + e.getMessage());
}

// Reference counting (NioManagedBuffer doesn't actually count, but follows the interface)
buffer.retain(); // Increment reference count
buffer.release(); // Decrement reference count

Working with FileSegmentManagedBuffer

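import org.apache.spark.network.buffer.FileSegmentManagedBuffer;
import org.apache.spark.network.util.MapConfigProvider;
import org.apache.spark.network.util.TransportConf;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.nio.ByteBuffer;

// FileSegmentManagedBuffer needs a TransportConf; an empty config provider uses the defaults
TransportConf conf = new TransportConf("shuffle", MapConfigProvider.EMPTY);

// Create a test file (the enclosing method is assumed to declare `throws IOException`)
File testFile = new File("test-data.txt");
try (FileWriter writer = new FileWriter(testFile)) {
    writer.write("This is test data for FileSegmentManagedBuffer demonstration.");
}

// Create a buffer covering the entire file
FileSegmentManagedBuffer fileBuffer = new FileSegmentManagedBuffer(conf, testFile, 0, testFile.length());
System.out.println("File buffer size: " + fileBuffer.size() + " bytes");
System.out.println("File: " + fileBuffer.getFile().getName());
System.out.println("Offset: " + fileBuffer.getOffset());
System.out.println("Length: " + fileBuffer.getLength());

// Create a buffer for the 20-byte segment starting at byte 10
FileSegmentManagedBuffer segmentBuffer = new FileSegmentManagedBuffer(conf, testFile, 10, 20);
System.out.println("Segment buffer size: " + segmentBuffer.size() + " bytes");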

// Read file segment data
try {
    ByteBuffer segmentData = segmentBuffer.nioByteBuffer();
    byte[] segmentBytes = new byte[segmentData.remaining()];
    segmentData.get(segmentBytes);
    System.out.println("Segment content: " + new String(segmentBytes));
} catch (IOException e) {
    System.err.println("Failed to read segment: " + e.getMessage());
}

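// Conversion for Netty: FileSegmentManagedBuffer returns a DefaultFileRegion,
// which Netty can send without copying the file through user space
try {
    Object nettyObject = fileBuffer.convertToNetty();
    System.out.println("Netty object type: " + nettyObject.getClass().getSimpleName());
    // The region is reference counted; release it because it is not written to a channel here
    io.netty.util.ReferenceCountUtil.release(nettyObject);
} catch (IOException e) {
    System.err.println("Failed to convert to Netty: " + e.getMessage());
}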

// Cleanup
testFile.delete();

Working with NettyManagedBuffer

import org.apache.spark.network.buffer.NettyManagedBuffer;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.io.IOException;
import java.nio.ByteBuffer;

// Create Netty ByteBuf
byte[] data = "Netty-backed buffer data".getBytes();
ByteBuf byteBuf = Unpooled.copiedBuffer(data);

// Create managed buffer
NettyManagedBuffer nettyBuffer = new NettyManagedBuffer(byteBuf);
System.out.println("Netty buffer size: " + nettyBuffer.size() + " bytes");

// Reference counting matters with Netty buffers: the new ByteBuf starts with a count of 1
nettyBuffer.retain(); // Increment reference count to 2
System.out.println("ByteBuf reference count: " + byteBuf.refCnt()); // 2

// Read data
try {
    ByteBuffer nioView = nettyBuffer.nioByteBuffer();
    byte[] readData = new byte[nioView.remaining()];
    nioView.get(readData);
    System.out.println("Netty buffer content: " + new String(readData));
} catch (IOException e) {
    System.err.println("Failed to read Netty buffer: " + e.getMessage());
}

// Release both references (important to prevent memory leaks)
nettyBuffer.release(); // Releases the reference taken by retain(); count drops to 1
nettyBuffer.release(); // Releases the original reference; count drops to 0 and the ByteBuf is freed

Buffer Reference Management

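import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.buffer.NettyManagedBuffer;
import io.netty.buffer.Unpooled;

// Proper reference counting pattern. asyncOperation() and Callback are hypothetical
// placeholders for your own asynchronous API. The pattern only has an effect for
// reference-counted buffers such as NettyManagedBuffer; for NioManagedBuffer,
// retain() and release() are no-ops.
ManagedBuffer buffer = new NettyManagedBuffer(Unpooled.copiedBuffer("data".getBytes()));

try {
    // Take an extra reference that the async operation owns
    buffer.retain();
    
    try {
        asyncOperation(buffer, new Callback() {
            @Override
            public void onComplete() {
                // Release the async operation's reference when it completes
                buffer.release();
            }
            
            @Override
            public void onError(Throwable e) {
                // Always release the async operation's reference, even on error
                buffer.release();
            }
        });
    } catch (RuntimeException e) {
        // The operation never started, so its callbacks will not run: drop its reference here
        buffer.release();
        throw e;
    }
    
} finally {
    // Release the creator's original reference
    buffer.release();
}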
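The same hand-off can be exercised without Spark or a custom callback type. In the stdlib sketch below, `HandoffDemo` and its nested `Counted` class are hypothetical stand-ins for a reference-counted buffer: `processAsync` retains once for the task, and `whenComplete` releases that reference whether the task succeeds or fails, while the creator releases its own reference independently.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

/** Illustrates the retain-before-handoff pattern with a stand-in counter (not Spark code). */
final class HandoffDemo {
    /** Hypothetical reference-counted resource; the creator owns the first reference. */
    static final class Counted {
        final AtomicInteger refCnt = new AtomicInteger(1);

        void retain() {
            refCnt.incrementAndGet();
        }

        void release() {
            if (refCnt.decrementAndGet() < 0) {
                throw new IllegalStateException("released too many times");
            }
        }
    }

    /** Retains for the async task; whenComplete releases on success and on failure. */
    static CompletableFuture<Void> processAsync(Counted resource, boolean fail) {
        resource.retain(); // reference owned by the async task
        return CompletableFuture.runAsync(() -> {
            if (fail) {
                throw new RuntimeException("simulated failure");
            }
        }).whenComplete((ignored, error) -> resource.release());
    }

    public static void main(String[] args) {
        Counted resource = new Counted();
        CompletableFuture<Void> ok = processAsync(resource, false);
        CompletableFuture<Void> failed = processAsync(resource, true);
        resource.release(); // creator's reference, released without waiting for the tasks
        ok.join();
        failed.exceptionally(e -> null).join();
        System.out.println(resource.refCnt.get()); // 0
    }
}
```

Releasing in `whenComplete` rather than in separate success and error paths keeps the release on a single code path, which is the property the callback version above achieves with two methods.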

Buffer Conversion for Network Transfer

import org.apache.spark.network.buffer.ManagedBuffer;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.FileRegion;
import java.io.IOException;

// Convert different buffer types for Netty transfer
void transferBuffer(ManagedBuffer buffer, Channel channel) {
    try {
        // convertToNetty() returns either a ByteBuf or a FileRegion
        Object nettyObject = buffer.convertToNetty();
        
        if (nettyObject instanceof ByteBuf) {
            // In-memory transfer; the channel releases the ByteBuf once it is written
            channel.writeAndFlush((ByteBuf) nettyObject);
        } else if (nettyObject instanceof FileRegion) {
            // Zero-copy file transfer; the region is also released after the write
            channel.writeAndFlush((FileRegion) nettyObject);
        } else {
            // Not expected under the ManagedBuffer contract
            throw new IllegalStateException("Unexpected Netty object: " + nettyObject.getClass());
        }
    } catch (IOException e) {
        System.err.println("Failed to transfer buffer: " + e.getMessage());
    }
}
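A FileRegion such as Netty's DefaultFileRegion ultimately relies on `FileChannel.transferTo`, which lets the operating system move file bytes to the destination (for a socket, typically via sendfile) without staging them in a user-space buffer. The stdlib sketch below transfers a file segment to any `WritableByteChannel`; `SegmentTransfer` is an illustrative name, and the loop is there because `transferTo` may move fewer bytes than requested. The in-memory sink in `main` only exercises the API; the zero-copy benefit applies when the target is a socket channel.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/** Stdlib sketch of a zero-copy segment transfer (not Spark or Netty code). */
final class SegmentTransfer {
    private SegmentTransfer() {}

    /** Sends file[offset, offset + length) to target and returns the number of bytes sent. */
    static long transfer(Path file, long offset, long length, WritableByteChannel target)
            throws IOException {
        try (FileChannel channel = FileChannel.open(file, StandardOpenOption.READ)) {
            long sent = 0;
            while (sent < length) {
                long n = channel.transferTo(offset + sent, length - sent, target);
                if (n <= 0) {
                    break; // end of file reached, or the target accepted nothing
                }
                sent += n;
            }
            return sent;
        }
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("region", ".txt");
        Files.write(tmp, "header|payload|trailer".getBytes(StandardCharsets.US_ASCII));
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        long sent = transfer(tmp, 7, 7, Channels.newChannel(sink));
        System.out.println(sent + " " + sink.toString("US-ASCII")); // 7 payload
        Files.delete(tmp);
    }
}
```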

Custom Buffer Implementation

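import org.apache.spark.network.buffer.ManagedBuffer;
import io.netty.buffer.Unpooled;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Example of implementing a custom ManagedBuffer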
public class StringManagedBuffer extends ManagedBuffer {
    private final String data;
    private final byte[] bytes;
    
    public StringManagedBuffer(String data) {
        this.data = data;
        this.bytes = data.getBytes(StandardCharsets.UTF_8);
    }
    
    @Override
    public long size() {
        return bytes.length;
    }
    
    @Override
    public ByteBuffer nioByteBuffer() throws IOException {
        return ByteBuffer.wrap(bytes).asReadOnlyBuffer();
    }
    
    @Override
    public InputStream createInputStream() throws IOException {
        return new ByteArrayInputStream(bytes);
    }
    
    @Override
    public ManagedBuffer retain() {
        // No-op for this simple implementation
        return this;
    }
    
    @Override
    public ManagedBuffer release() {
        // No-op for this simple implementation
        return this;
    }
    
    @Override
    public Object convertToNetty() throws IOException {
        return Unpooled.wrappedBuffer(bytes);
    }
    
    public String getString() {
        return data;
    }
}

// Usage
StringManagedBuffer stringBuffer = new StringManagedBuffer("Custom buffer content");
System.out.println("String buffer size: " + stringBuffer.size());
System.out.println("String content: " + stringBuffer.getString());

Best Practices

Memory Management

  1. Always release buffers: Call release() when done with a buffer to prevent memory leaks
  2. Use retain/release pairs: For every retain() call, ensure a corresponding release() call
  3. Handle exceptions: Release buffers in finally blocks; ManagedBuffer does not implement AutoCloseable, so try-with-resources cannot release it directly
  4. Zero-copy when possible: Use FileSegmentManagedBuffer for large file transfers

Performance Optimization

  1. Choose appropriate buffer type:

    • NioManagedBuffer for small in-memory data
    • FileSegmentManagedBuffer for large file data
    • NettyManagedBuffer when working with Netty components
  2. Avoid unnecessary copies: Use convertToNetty() for network transfers instead of copying to new buffers

  3. Reuse buffers: Where possible, reuse buffer instances to reduce garbage collection pressure

Error Handling

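// Safe buffer handling pattern; createBuffer() and processBuffer() are hypothetical helpers
ManagedBuffer buffer = null;
try {
    // The creator owns the buffer's initial reference, so no extra retain() is needed
    buffer = createBuffer();
    
    // Use buffer...
    processBuffer(buffer);
    
} catch (Exception e) {
    System.err.println("Error processing buffer: " + e.getMessage());
} finally {
    // Release the reference this code owns, whether or not processing succeeded
    if (buffer != null) {
        buffer.release();
    }
}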
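Because release() must run on every path, a small AutoCloseable adapter lets try-with-resources do the releasing. The sketch below assumes a hypothetical `Releasable` interface standing in for ManagedBuffer's release(); `Lease` is likewise an illustrative name, not part of spark-network-common. `close()` is idempotent so the wrapped reference is released exactly once.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for ManagedBuffer.release(). */
interface Releasable {
    void release();
}

/** Try-with-resources adapter for reference-counted objects (illustrative, not Spark API). */
final class Lease<T extends Releasable> implements AutoCloseable {
    private final T resource;
    private boolean closed;

    private Lease(T resource) {
        this.resource = resource;
    }

    /** Takes ownership of one reference that the caller already holds. */
    static <T extends Releasable> Lease<T> of(T resource) {
        return new Lease<>(resource);
    }

    T get() {
        if (closed) {
            throw new IllegalStateException("lease already closed");
        }
        return resource;
    }

    @Override
    public void close() {
        if (!closed) { // idempotent: the reference is released exactly once
            closed = true;
            resource.release();
        }
    }

    public static void main(String[] args) {
        AtomicInteger refCnt = new AtomicInteger(1);
        Releasable buffer = refCnt::decrementAndGet;
        try (Lease<Releasable> lease = Lease.of(buffer)) {
            lease.get();
            throw new IllegalArgumentException("processing failed");
        } catch (IllegalArgumentException e) {
            System.out.println("caught: " + e.getMessage());
        }
        System.out.println(refCnt.get()); // 0
    }
}
```

To adapt a real ManagedBuffer, `Releasable` could be implemented by a small wrapper whose `release()` calls the buffer's `release()`.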

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-spark--spark-network-common-2-12
