CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-org-apache-spark--spark-network-common

Core networking library for Apache Spark providing transport layer abstractions and utilities

Pending
Overview
Eval results
Files

docs/streaming.md

Stream Management

Efficient streaming data transfer with chunk-based fetching, supporting large data transfers with minimal memory overhead and zero-copy I/O optimizations.

Capabilities

Stream Manager Abstract Base

Core abstraction for managing streams that can be fetched by clients, providing lifecycle management and authorization controls.

/**
 * Abstract base class for managing streams that can be fetched by clients.
 * Provides stream lifecycle management and authorization.
 *
 * <p>Subclasses must implement {@link #getChunk} and {@link #openStream}.
 * {@link #connectionTerminated} and {@link #checkAuthorization} are optional
 * hooks with no-op default implementations (the original skeleton declared
 * them without bodies, which is not valid Java for non-abstract methods).
 */
public abstract class StreamManager {
    /**
     * Get a specific chunk from a stream by ID and index
     * @param streamId Numeric identifier for the stream
     * @param chunkIndex Index of the chunk within the stream (0-based)
     * @return ManagedBuffer containing the chunk data
     * @throws IllegalArgumentException if stream or chunk doesn't exist
     */
    public abstract ManagedBuffer getChunk(long streamId, int chunkIndex);

    /**
     * Open a stream for reading by string identifier
     * @param streamId String identifier for the stream
     * @return ManagedBuffer containing the full stream data
     * @throws IllegalArgumentException if stream doesn't exist
     */
    public abstract ManagedBuffer openStream(String streamId);

    /**
     * Called when a connection terminates to clean up associated streams.
     * Default is a no-op; override to release per-connection stream state.
     * @param channel Netty channel that terminated
     */
    public void connectionTerminated(Channel channel) {
        // No-op by default; subclasses override to free associated streams.
    }

    /**
     * Check if client is authorized to access the specified stream.
     * Default is a no-op (all access permitted); override to enforce policy.
     * @param client Transport client requesting access
     * @param streamId Numeric stream identifier
     * @throws SecurityException if client is not authorized
     */
    public void checkAuthorization(TransportClient client, long streamId) {
        // No-op by default; authorization is opt-in for subclasses.
    }
}

Usage Examples:

// Implementing a custom StreamManager
public class MyStreamManager extends StreamManager {
    // streamId -> ordered chunk list; concurrent maps because transport
    // callbacks may arrive on multiple Netty threads.
    private final Map<Long, List<ManagedBuffer>> streams = new ConcurrentHashMap<>();
    private final Map<String, ManagedBuffer> namedStreams = new ConcurrentHashMap<>();

    /**
     * Look up one chunk of a registered stream.
     * Also rejects negative indices: the original only checked the upper
     * bound, so a negative chunkIndex surfaced as IndexOutOfBoundsException
     * from List.get instead of the documented IllegalArgumentException.
     */
    @Override
    public ManagedBuffer getChunk(long streamId, int chunkIndex) {
        List<ManagedBuffer> chunks = streams.get(streamId);
        if (chunks == null || chunkIndex < 0 || chunkIndex >= chunks.size()) {
            throw new IllegalArgumentException("Invalid stream or chunk: " + 
                                             streamId + "/" + chunkIndex);
        }
        // retain() so the buffer stays valid until the caller releases it
        return chunks.get(chunkIndex).retain();
    }

    /** Return the full buffer registered under a string identifier. */
    @Override
    public ManagedBuffer openStream(String streamId) {
        ManagedBuffer buffer = namedStreams.get(streamId);
        if (buffer == null) {
            throw new IllegalArgumentException("Stream not found: " + streamId);
        }
        return buffer.retain();
    }

    /** Reject unauthorized clients before any chunk is served. */
    @Override
    public void checkAuthorization(TransportClient client, long streamId) {
        String clientId = client.getClientId();
        if (!isAuthorized(clientId, streamId)) {
            throw new SecurityException("Client " + clientId + 
                                      " not authorized for stream " + streamId);
        }
    }

    private boolean isAuthorized(String clientId, long streamId) {
        // Custom authorization logic
        return true;
    }
}

One-For-One Stream Manager

Concrete StreamManager implementation where each chunk corresponds to one buffer, providing simple stream registration and management.

/**
 * StreamManager where each chunk corresponds to one buffer
 * Provides simple stream registration with automatic cleanup
 *
 * NOTE(review): this is an API skeleton (method bodies omitted). Stream IDs
 * are issued by registerStream; whether they are unique per-manager or
 * per-JVM is not shown here — confirm against the implementation.
 */
public class OneForOneStreamManager extends StreamManager {
    /**
     * Create a new OneForOneStreamManager with default settings
     */
    public OneForOneStreamManager();
    
    /**
     * Register a new stream with the manager
     * @param appId Application identifier for authorization
     * @param buffers Iterator of ManagedBuffer instances representing chunks
     *                (presumably consumed one buffer per getChunk call — TODO confirm)
     * @param channel Netty channel associated with this stream (for cleanup)
     * @return Numeric stream ID that can be used to fetch chunks
     */
    public long registerStream(String appId, Iterator<ManagedBuffer> buffers, Channel channel);
    
    /**
     * Get a specific chunk from a registered stream
     * @param streamId Stream identifier returned from registerStream
     * @param chunkIndex Index of the chunk to retrieve (0-based)
     * @return ManagedBuffer containing the chunk data
     * @throws IllegalArgumentException if stream or chunk doesn't exist
     */
    public ManagedBuffer getChunk(long streamId, int chunkIndex);
    
    /**
     * Open a stream by string identifier (not supported in OneForOneStreamManager)
     * @param streamId String stream identifier
     * @return Never returns normally
     * @throws UnsupportedOperationException always thrown
     */
    public ManagedBuffer openStream(String streamId);
    
    /**
     * Clean up streams associated with terminated connection
     * Releases any unconsumed buffers for streams tied to this channel.
     * @param channel Netty channel that terminated
     */
    public void connectionTerminated(Channel channel);
}

Usage Examples:

import java.util.Arrays;
import java.util.Iterator;

// Set up the manager that will serve the chunks
OneForOneStreamManager streamManager = new OneForOneStreamManager();

// Each ManagedBuffer below becomes one fetchable chunk
ManagedBuffer first = new NioManagedBuffer(ByteBuffer.wrap("chunk0".getBytes()));
ManagedBuffer second = new NioManagedBuffer(ByteBuffer.wrap("chunk1".getBytes()));
ManagedBuffer third = new NioManagedBuffer(ByteBuffer.wrap("chunk2".getBytes()));
List<ManagedBuffer> chunks = Arrays.asList(first, second, third);

// Registration hands ownership of the iterator to the manager and yields
// the numeric ID that clients use for chunk fetches
long streamId = streamManager.registerStream("myapp", chunks.iterator(), channel);
System.out.println("Registered stream with ID: " + streamId);

// Chunks are then fetched individually by index
ManagedBuffer chunk0 = streamManager.getChunk(streamId, 0);
ManagedBuffer chunk1 = streamManager.getChunk(streamId, 1);

// For large data, back the chunks by file segments instead of heap buffers
ManagedBuffer firstMb = new FileSegmentManagedBuffer(conf, file, 0, 1024*1024);         // First 1MB
ManagedBuffer secondMb = new FileSegmentManagedBuffer(conf, file, 1024*1024, 1024*1024); // Second 1MB
ManagedBuffer tail = new FileSegmentManagedBuffer(conf, file, 2*1024*1024, 512*1024);    // Last 512KB
List<ManagedBuffer> fileChunks = Arrays.asList(firstMb, secondMb, tail);

long fileStreamId = streamManager.registerStream("fileapp", fileChunks.iterator(), channel);

Stream Callback Interfaces

Callback interfaces for handling streaming data reception with different levels of functionality.

/**
 * Callback interface for handling streaming data reception
 *
 * NOTE(review): callbacks are presumably invoked on Netty event-loop
 * threads, so implementations should avoid blocking — confirm against
 * the transport implementation.
 */
public interface StreamCallback {
    /**
     * Called when data is received for a stream. May be invoked multiple
     * times per stream, once per arriving frame.
     * @param streamId String identifier of the stream
     * @param buf ByteBuffer containing the received data
     * @throws IOException if data processing fails
     */
    void onData(String streamId, ByteBuffer buf) throws IOException;
    
    /**
     * Called when stream transfer is complete; no further onData calls
     * follow for this stream.
     * @param streamId String identifier of the stream
     * @throws IOException if completion processing fails
     */
    void onComplete(String streamId) throws IOException;
    
    /**
     * Called when stream transfer fails; terminal, like onComplete.
     * @param streamId String identifier of the stream
     * @param cause Exception that caused the failure
     * @throws IOException if failure processing fails
     */
    void onFailure(String streamId, Throwable cause) throws IOException;
}

/**
 * Extended StreamCallback that provides access to stream ID
 * Useful when the same callback handles multiple streams
 */
public interface StreamCallbackWithID extends StreamCallback {
    /**
     * Get the stream identifier this callback is associated with
     * @return Stream ID string (the primary stream for this callback)
     */
    String getID();
}

Usage Examples:

// Simple stream callback that accumulates the whole stream in memory
StreamCallback callback = new StreamCallback() {
    private ByteArrayOutputStream buffer = new ByteArrayOutputStream();

    @Override
    public void onData(String streamId, ByteBuffer buf) throws IOException {
        // Drain the ByteBuffer into our accumulator
        byte[] data = new byte[buf.remaining()];
        buf.get(data);
        buffer.write(data, 0, data.length);
        System.out.println("Received " + data.length + " bytes for stream " + streamId);
    }

    @Override
    public void onComplete(String streamId) throws IOException {
        // Transfer finished: hand the assembled bytes downstream
        byte[] fullData = buffer.toByteArray();
        System.out.println("Stream " + streamId + " complete, total: " + fullData.length + " bytes");
        processStreamData(fullData);
    }

    @Override
    public void onFailure(String streamId, Throwable cause) throws IOException {
        // Terminal error: report and release whatever we were holding
        System.err.println("Stream " + streamId + " failed: " + cause.getMessage());
        cleanup();
    }
};

// Use callback with client
client.stream("my-data-stream", callback);

// Callback with ID for multi-stream handling
public class MultiStreamCallback implements StreamCallbackWithID {
    // Primary stream this callback was created for (reported by getID)
    private final String streamId;
    // Per-stream accumulation buffers; concurrent because callbacks may
    // fire on transport threads
    private final Map<String, ByteArrayOutputStream> buffers = new ConcurrentHashMap<>();

    public MultiStreamCallback(String streamId) {
        this.streamId = streamId;
        buffers.put(streamId, new ByteArrayOutputStream());
    }

    @Override
    public String getID() {
        return streamId;
    }

    /**
     * Buffer incoming data keyed by stream. Uses computeIfAbsent so data
     * for streams other than the primary one is buffered rather than
     * silently dropped — the original only handled the single stream
     * registered in the constructor, contradicting the class's stated
     * multi-stream purpose.
     */
    @Override
    public void onData(String streamId, ByteBuffer buf) throws IOException {
        ByteArrayOutputStream buffer =
            buffers.computeIfAbsent(streamId, id -> new ByteArrayOutputStream());
        byte[] data = new byte[buf.remaining()];
        buf.get(data);
        buffer.write(data);
    }

    /** Finalize one stream: remove its buffer and hand off the bytes. */
    @Override
    public void onComplete(String streamId) throws IOException {
        ByteArrayOutputStream buffer = buffers.remove(streamId);
        if (buffer != null) {
            processCompletedStream(streamId, buffer.toByteArray());
        }
    }

    /** Drop partial data for the failed stream and delegate error handling. */
    @Override
    public void onFailure(String streamId, Throwable cause) throws IOException {
        buffers.remove(streamId);
        handleStreamFailure(streamId, cause);
    }
}

Chunk Reception Callback

Callback interface specifically for handling chunk-based data reception with precise chunk indexing.

/**
 * Callback interface for handling received chunks from stream fetching
 */
public interface ChunkReceivedCallback {
    /**
     * Called when a chunk is successfully received
     * @param chunkIndex Index of the received chunk (0-based)
     * @param buffer ManagedBuffer containing the chunk data
     *               (NOTE(review): receivers that keep the buffer past this
     *               call presumably must retain()/release() it — confirm
     *               against the transport's buffer-ownership contract)
     */
    void onSuccess(int chunkIndex, ManagedBuffer buffer);
    
    /**
     * Called when chunk fetching fails
     * @param chunkIndex Index of the chunk that failed to fetch
     * @param e Exception that caused the failure
     */
    void onFailure(int chunkIndex, Throwable e);
}

Usage Examples:

// Chunk fetching with callback.
// FIX: the original used an anonymous class that declared a named
// constructor — anonymous classes cannot declare constructors (the final
// field totalChunks was also never initialized), so the snippet did not
// compile. A named local class carries the chunk count correctly.
class AssemblingChunkCallback implements ChunkReceivedCallback {
    private final Map<Integer, ManagedBuffer> receivedChunks = new ConcurrentHashMap<>();
    private final int totalChunks;

    AssemblingChunkCallback(int totalChunks) {
        this.totalChunks = totalChunks;
    }

    @Override
    public void onSuccess(int chunkIndex, ManagedBuffer buffer) {
        System.out.println("Received chunk " + chunkIndex + " of size " + buffer.size());
        // retain() keeps the buffer alive until assembly releases it
        receivedChunks.put(chunkIndex, buffer.retain());

        // Check if all chunks received
        if (receivedChunks.size() == totalChunks) {
            assembleCompleteData();
        }
    }

    @Override
    public void onFailure(int chunkIndex, Throwable e) {
        System.err.println("Failed to fetch chunk " + chunkIndex + ": " + e.getMessage());
        // Retry or handle failure
        retryChunk(chunkIndex);
    }

    /** Concatenate the chunks in index order and hand off the result. */
    private void assembleCompleteData() {
        ByteArrayOutputStream result = new ByteArrayOutputStream();
        for (int i = 0; i < totalChunks; i++) {
            ManagedBuffer chunk = receivedChunks.get(i);
            if (chunk != null) {
                try (InputStream is = chunk.createInputStream()) {
                    // transferTo copies the whole stream; a single
                    // InputStream.read(byte[]) may return fewer bytes than
                    // requested, which the original ignored
                    is.transferTo(result);
                } catch (IOException e) {
                    System.err.println("Error reading chunk " + i + ": " + e.getMessage());
                } finally {
                    chunk.release();
                }
            }
        }

        processAssembledData(result.toByteArray());
    }
}

ChunkReceivedCallback chunkCallback = new AssemblingChunkCallback(totalChunks);

// Fetch chunks
for (int i = 0; i < totalChunks; i++) {
    client.fetchChunk(streamId, i, chunkCallback);
}

Stream Usage Patterns

Large File Transfer

// Server-side: Register file as stream
File largeFile = new File("/path/to/large/file.dat");
long fileSize = largeFile.length();
int chunkSize = 1024 * 1024; // 1MB chunks

// Slice the file into fixed-size zero-copy segments (last one may be short)
List<ManagedBuffer> chunks = new ArrayList<>();
long offset = 0;
while (offset < fileSize) {
    chunks.add(new FileSegmentManagedBuffer(conf, largeFile, offset,
                                            Math.min(chunkSize, fileSize - offset)));
    offset += chunkSize;
}

OneForOneStreamManager streamManager = new OneForOneStreamManager();
long streamId = streamManager.registerStream("file-transfer", chunks.iterator(), channel);

// Client-side: Fetch file in chunks (integer ceiling division)
int totalChunks = (int) ((fileSize + chunkSize - 1) / chunkSize);
Map<Integer, ManagedBuffer> receivedChunks = new ConcurrentHashMap<>();

ChunkReceivedCallback callback = new ChunkReceivedCallback() {
    @Override
    public void onSuccess(int chunkIndex, ManagedBuffer buffer) {
        // Rebuild the file once every expected chunk has landed
        receivedChunks.put(chunkIndex, buffer);
        if (receivedChunks.size() == totalChunks) {
            reconstructFile();
        }
    }

    @Override
    public void onFailure(int chunkIndex, Throwable e) {
        System.err.println("Chunk " + chunkIndex + " failed: " + e.getMessage());
    }
};

// Request every chunk; responses arrive via the callback above
for (int i = 0; i < totalChunks; i++) {
    client.fetchChunk(streamId, i, callback);
}

Streaming Data Processing

// Server-side: Stream processing results
public class ProcessingStreamManager extends StreamManager {
    // streamId -> processor whose output will be served on openStream
    private final Map<String, DataProcessor> processors = new ConcurrentHashMap<>();

    /** Associate a processor with a stream identifier. */
    public void startProcessing(String streamId, DataProcessor processor) {
        processors.put(streamId, processor);
    }

    /** Serve the processor's current results as a single in-memory buffer. */
    @Override
    public ManagedBuffer openStream(String streamId) {
        DataProcessor active = processors.get(streamId);
        if (active == null) {
            throw new IllegalArgumentException("No processor for stream: " + streamId);
        }

        // Return processed data as stream
        return new NioManagedBuffer(ByteBuffer.wrap(active.getResults()));
    }

    /** Chunked fetches are not meaningful for processed results. */
    @Override
    public ManagedBuffer getChunk(long streamId, int chunkIndex) {
        throw new UnsupportedOperationException("Use openStream for processed data");
    }
}

// Client-side: Receive processed data
StreamCallback streamCallback = new StreamCallback() {
    private final ByteArrayOutputStream results = new ByteArrayOutputStream();

    @Override
    public void onData(String streamId, ByteBuffer buf) throws IOException {
        // Copy this frame's bytes into the accumulator
        byte[] received = new byte[buf.remaining()];
        buf.get(received);
        results.write(received, 0, received.length);
    }

    @Override
    public void onComplete(String streamId) throws IOException {
        // All frames delivered; hand the assembled payload downstream
        handleProcessingResults(results.toByteArray());
    }

    @Override
    public void onFailure(String streamId, Throwable cause) throws IOException {
        System.err.println("Processing stream failed: " + cause.getMessage());
    }
};

client.stream("processing-results", streamCallback);

Zero-Copy Stream Transfer

// Efficient zero-copy transfer using FileSegmentManagedBuffer
public class ZeroCopyStreamManager extends StreamManager {
    // streamId -> file backing the stream plus the owning channel
    private final Map<Long, FileInfo> streamFiles = new ConcurrentHashMap<>();

    /** Register a whole file as a chunked stream and return its numeric ID. */
    public long registerFileStream(File file, Channel channel) {
        long streamId = generateStreamId();
        streamFiles.put(streamId, new FileInfo(file, channel));
        return streamId;
    }

    /**
     * Serve one 64KB segment of the registered file without copying.
     * FIX: validate chunkIndex and offset BEFORE deriving the segment
     * length — the original computed file.length() - offset first (negative
     * for out-of-range indices) and never rejected negative chunk indices.
     * @throws IllegalArgumentException if the stream is unknown or the
     *         chunk index is out of range
     */
    @Override
    public ManagedBuffer getChunk(long streamId, int chunkIndex) {
        FileInfo fileInfo = streamFiles.get(streamId);
        if (fileInfo == null) {
            throw new IllegalArgumentException("Stream not found: " + streamId);
        }

        long chunkSize = 64 * 1024; // 64KB chunks
        long fileLength = fileInfo.file.length();
        long offset = chunkIndex * chunkSize; // int * long promotes to long, no overflow
        if (chunkIndex < 0 || offset >= fileLength) {
            throw new IllegalArgumentException("Chunk index out of range: " + chunkIndex);
        }
        long length = Math.min(chunkSize, fileLength - offset);

        // Zero-copy file segment
        return new FileSegmentManagedBuffer(conf, fileInfo.file, offset, length);
    }

    /** Whole-stream reads are not supported; fetch chunk by chunk instead. */
    @Override
    public ManagedBuffer openStream(String streamId) {
        throw new UnsupportedOperationException("Use getChunk for file streams");
    }

    /** Immutable pairing of a backing file with its owning channel. */
    private static class FileInfo {
        final File file;
        final Channel channel;

        FileInfo(File file, Channel channel) {
            this.file = file;
            this.channel = channel;
        }
    }
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-spark--spark-network-common

docs

authentication.md

buffers.md

configuration.md

index.md

protocol.md

streaming.md

transport.md

tile.json