Core networking library for Apache Spark providing transport layer abstractions and utilities
—
Efficient streaming data transfer with chunk-based fetching, supporting large data transfers with minimal memory overhead and zero-copy I/O optimizations.
Core abstraction for managing streams that can be fetched by clients, providing lifecycle management and authorization controls.
/**
 * Abstract base class for managing streams that can be fetched by clients.
 * Provides stream lifecycle management and authorization hooks.
 *
 * NOTE(review): implementations are presumably expected to be thread-safe,
 * since several client connections may fetch chunks concurrently — confirm
 * against the concrete implementation in use.
 */
public abstract class StreamManager {
/**
 * Get a specific chunk from a stream by ID and index.
 * @param streamId Numeric identifier for the stream
 * @param chunkIndex Index of the chunk within the stream (0-based)
 * @return ManagedBuffer containing the chunk data
 * @throws IllegalArgumentException if stream or chunk doesn't exist
 */
public abstract ManagedBuffer getChunk(long streamId, int chunkIndex);
/**
 * Open a stream for reading by string identifier.
 * @param streamId String identifier for the stream
 * @return ManagedBuffer containing the full stream data
 * @throws IllegalArgumentException if stream doesn't exist
 */
public abstract ManagedBuffer openStream(String streamId);
/**
 * Called when a connection terminates to clean up associated streams.
 * Non-abstract in this API sketch — presumably a no-op by default so
 * subclasses may opt in; verify against the implementation.
 * @param channel Netty channel that terminated
 */
public void connectionTerminated(Channel channel);
/**
 * Check if client is authorized to access the specified stream.
 * Non-abstract in this API sketch — presumably permissive by default;
 * verify against the implementation.
 * @param client Transport client requesting access
 * @param streamId Numeric stream identifier
 * @throws SecurityException if client is not authorized
 */
public void checkAuthorization(TransportClient client, long streamId);
}

Usage Examples:
// Implementing a custom StreamManager
public class MyStreamManager extends StreamManager {
    // Registered chunked streams, keyed by numeric stream ID.
    private final Map<Long, List<ManagedBuffer>> streams = new ConcurrentHashMap<>();
    // Whole-buffer streams addressable by a string name.
    private final Map<String, ManagedBuffer> namedStreams = new ConcurrentHashMap<>();

    /**
     * Return the requested chunk, retained so the caller owns a reference.
     * @throws IllegalArgumentException if the stream is unknown or the index
     *         is out of range (including negative indices)
     */
    @Override
    public ManagedBuffer getChunk(long streamId, int chunkIndex) {
        List<ManagedBuffer> chunks = streams.get(streamId);
        // Reject negative indices explicitly; otherwise chunks.get() would
        // throw IndexOutOfBoundsException instead of the documented
        // IllegalArgumentException.
        if (chunks == null || chunkIndex < 0 || chunkIndex >= chunks.size()) {
            throw new IllegalArgumentException("Invalid stream or chunk: " +
                streamId + "/" + chunkIndex);
        }
        return chunks.get(chunkIndex).retain();
    }

    /**
     * Return the named stream's buffer, retained for the caller.
     * @throws IllegalArgumentException if no stream is registered under streamId
     */
    @Override
    public ManagedBuffer openStream(String streamId) {
        ManagedBuffer buffer = namedStreams.get(streamId);
        if (buffer == null) {
            throw new IllegalArgumentException("Stream not found: " + streamId);
        }
        return buffer.retain();
    }

    /**
     * Reject unauthorized clients.
     * @throws SecurityException if the client may not access the stream
     */
    @Override
    public void checkAuthorization(TransportClient client, long streamId) {
        String clientId = client.getClientId();
        if (!isAuthorized(clientId, streamId)) {
            throw new SecurityException("Client " + clientId +
                " not authorized for stream " + streamId);
        }
    }

    // Custom authorization logic (placeholder: allow everyone).
    private boolean isAuthorized(String clientId, long streamId) {
        return true;
    }
}

Concrete StreamManager implementation where each chunk corresponds to one buffer, providing simple stream registration and management.
/**
 * StreamManager where each chunk corresponds to one buffer.
 * Provides simple stream registration with automatic cleanup.
 */
public class OneForOneStreamManager extends StreamManager {
/**
 * Create a new OneForOneStreamManager with default settings
 */
public OneForOneStreamManager();
/**
 * Register a new stream with the manager.
 * NOTE(review): the channel parameter suggests registered buffers are
 * released when the associated channel terminates — confirm against the
 * implementation.
 * @param appId Application identifier for authorization
 * @param buffers Iterator of ManagedBuffer instances representing chunks
 * @param channel Netty channel associated with this stream (for cleanup)
 * @return Numeric stream ID that can be used to fetch chunks
 */
public long registerStream(String appId, Iterator<ManagedBuffer> buffers, Channel channel);
/**
 * Get a specific chunk from a registered stream.
 * @param streamId Stream identifier returned from registerStream
 * @param chunkIndex Index of the chunk to retrieve (0-based)
 * @return ManagedBuffer containing the chunk data
 * @throws IllegalArgumentException if stream or chunk doesn't exist
 */
public ManagedBuffer getChunk(long streamId, int chunkIndex);
/**
 * Open a stream by string identifier (not supported in OneForOneStreamManager).
 * @param streamId String stream identifier
 * @return Never returns normally
 * @throws UnsupportedOperationException always thrown
 */
public ManagedBuffer openStream(String streamId);
/**
 * Clean up streams associated with a terminated connection.
 * @param channel Netty channel that terminated
 */
public void connectionTerminated(Channel channel);
}

Usage Examples:
import java.util.Arrays;
import java.util.Iterator;
// Create stream manager
OneForOneStreamManager streamManager = new OneForOneStreamManager();
// Prepare data chunks (heap-backed buffers; suitable for small payloads)
List<ManagedBuffer> chunks = Arrays.asList(
new NioManagedBuffer(ByteBuffer.wrap("chunk0".getBytes())),
new NioManagedBuffer(ByteBuffer.wrap("chunk1".getBytes())),
new NioManagedBuffer(ByteBuffer.wrap("chunk2".getBytes()))
);
// Register stream; the returned ID is what clients use to fetch chunks
long streamId = streamManager.registerStream("myapp", chunks.iterator(), channel);
System.out.println("Registered stream with ID: " + streamId);
// Clients can now fetch chunks by (streamId, chunkIndex)
ManagedBuffer chunk0 = streamManager.getChunk(streamId, 0);
ManagedBuffer chunk1 = streamManager.getChunk(streamId, 1);
// File-based chunks for large data — file segments can be transferred
// without copying into the heap (zero-copy path, per the library docs)
List<ManagedBuffer> fileChunks = Arrays.asList(
new FileSegmentManagedBuffer(conf, file, 0, 1024*1024), // First 1MB
new FileSegmentManagedBuffer(conf, file, 1024*1024, 1024*1024), // Second 1MB
new FileSegmentManagedBuffer(conf, file, 2*1024*1024, 512*1024) // Last 512KB
);
long fileStreamId = streamManager.registerStream("fileapp", fileChunks.iterator(), channel);

Callback interfaces for handling streaming data reception with different levels of functionality.
/**
 * Callback interface for handling streaming data reception.
 * NOTE(review): onData is presumably invoked zero or more times, followed by
 * exactly one of onComplete or onFailure — confirm against the transport
 * client implementation.
 */
public interface StreamCallback {
/**
 * Called when data is received for a stream.
 * @param streamId String identifier of the stream
 * @param buf ByteBuffer containing the received data
 * @throws IOException if data processing fails
 */
void onData(String streamId, ByteBuffer buf) throws IOException;
/**
 * Called when stream transfer is complete.
 * @param streamId String identifier of the stream
 * @throws IOException if completion processing fails
 */
void onComplete(String streamId) throws IOException;
/**
 * Called when stream transfer fails.
 * @param streamId String identifier of the stream
 * @param cause Exception that caused the failure
 * @throws IOException if failure processing fails
 */
void onFailure(String streamId, Throwable cause) throws IOException;
}
/**
 * Extended StreamCallback that provides access to its own stream ID.
 * Useful when the same callback type handles multiple streams and the
 * receiver needs to route data without external bookkeeping.
 */
public interface StreamCallbackWithID extends StreamCallback {
/**
 * Get the stream identifier this callback is associated with.
 * @return Stream ID string
 */
String getID();
}

Usage Examples:
// Simple stream callback that accumulates the whole stream in memory
StreamCallback callback = new StreamCallback() {
// Grows with the full stream size — only appropriate for small streams
private ByteArrayOutputStream buffer = new ByteArrayOutputStream();
@Override
public void onData(String streamId, ByteBuffer buf) throws IOException {
// Copy the readable region out of the ByteBuffer before it is recycled
byte[] data = new byte[buf.remaining()];
buf.get(data);
buffer.write(data);
System.out.println("Received " + data.length + " bytes for stream " + streamId);
}
@Override
public void onComplete(String streamId) throws IOException {
byte[] fullData = buffer.toByteArray();
System.out.println("Stream " + streamId + " complete, total: " + fullData.length + " bytes");
processStreamData(fullData);
}
@Override
public void onFailure(String streamId, Throwable cause) throws IOException {
System.err.println("Stream " + streamId + " failed: " + cause.getMessage());
cleanup();
}
};
// Use callback with client
client.stream("my-data-stream", callback);
// Callback with ID for multi-stream handling
public class MultiStreamCallback implements StreamCallbackWithID {
    /** Stream this callback was created for; reported via getID(). */
    private final String streamId;
    /** Per-stream accumulation buffers, keyed by stream ID. */
    private final Map<String, ByteArrayOutputStream> buffers = new ConcurrentHashMap<>();

    public MultiStreamCallback(String streamId) {
        this.streamId = streamId;
        buffers.put(streamId, new ByteArrayOutputStream());
    }

    @Override
    public String getID() {
        return streamId;
    }

    @Override
    public void onData(String streamId, ByteBuffer buf) throws IOException {
        ByteArrayOutputStream sink = buffers.get(streamId);
        if (sink == null) {
            // Data for a stream we never registered is silently dropped.
            return;
        }
        byte[] received = new byte[buf.remaining()];
        buf.get(received);
        sink.write(received);
    }

    @Override
    public void onComplete(String streamId) throws IOException {
        // Remove-then-process so the buffer cannot be appended to afterwards.
        ByteArrayOutputStream finished = buffers.remove(streamId);
        if (finished != null) {
            processCompletedStream(streamId, finished.toByteArray());
        }
    }

    @Override
    public void onFailure(String streamId, Throwable cause) throws IOException {
        buffers.remove(streamId);
        handleStreamFailure(streamId, cause);
    }
}

Callback interface specifically for handling chunk-based data reception with precise chunk indexing.
/**
 * Callback interface for handling received chunks from stream fetching.
 * NOTE(review): if a ManagedBuffer must outlive the callback invocation, the
 * receiver presumably needs to retain() it — confirm against the transport
 * client's buffer-lifetime contract.
 */
public interface ChunkReceivedCallback {
/**
 * Called when a chunk is successfully received.
 * @param chunkIndex Index of the received chunk (0-based)
 * @param buffer ManagedBuffer containing the chunk data
 */
void onSuccess(int chunkIndex, ManagedBuffer buffer);
/**
 * Called when chunk fetching fails.
 * @param chunkIndex Index of the chunk that failed to fetch
 * @param e Exception that caused the failure
 */
void onFailure(int chunkIndex, Throwable e);
}

Usage Examples:
// Chunk fetching with callback
ChunkReceivedCallback chunkCallback = new ChunkReceivedCallback() {
private final Map<Integer, ManagedBuffer> receivedChunks = new ConcurrentHashMap<>();
private final int totalChunks;
public ChunkReceivedCallback(int totalChunks) {
this.totalChunks = totalChunks;
}
@Override
public void onSuccess(int chunkIndex, ManagedBuffer buffer) {
System.out.println("Received chunk " + chunkIndex + " of size " + buffer.size());
receivedChunks.put(chunkIndex, buffer.retain());
// Check if all chunks received
if (receivedChunks.size() == totalChunks) {
assembleCompleteData();
}
}
@Override
public void onFailure(int chunkIndex, Throwable e) {
System.err.println("Failed to fetch chunk " + chunkIndex + ": " + e.getMessage());
// Retry or handle failure
retryChunk(chunkIndex);
}
private void assembleCompleteData() {
// Assemble chunks in order
ByteArrayOutputStream result = new ByteArrayOutputStream();
for (int i = 0; i < totalChunks; i++) {
ManagedBuffer chunk = receivedChunks.get(i);
if (chunk != null) {
try (InputStream is = chunk.createInputStream()) {
byte[] data = new byte[(int) chunk.size()];
is.read(data);
result.write(data);
} catch (IOException e) {
System.err.println("Error reading chunk " + i + ": " + e.getMessage());
} finally {
chunk.release();
}
}
}
processAssembledData(result.toByteArray());
}
};
// Fetch all chunks; results are delivered asynchronously to chunkCallback
for (int i = 0; i < totalChunks; i++) {
client.fetchChunk(streamId, i, chunkCallback);
}

// Server-side: Register file as stream
File largeFile = new File("/path/to/large/file.dat");
long fileSize = largeFile.length();
int chunkSize = 1024 * 1024; // 1MB chunks
// Describe the file as a list of non-overlapping segments; the final
// segment may be shorter than chunkSize.
List<ManagedBuffer> chunks = new ArrayList<>();
for (long offset = 0; offset < fileSize; offset += chunkSize) {
long length = Math.min(chunkSize, fileSize - offset);
chunks.add(new FileSegmentManagedBuffer(conf, largeFile, offset, length));
}
OneForOneStreamManager streamManager = new OneForOneStreamManager();
long streamId = streamManager.registerStream("file-transfer", chunks.iterator(), channel);
// Client-side: Fetch file in chunks
int totalChunks = (int) Math.ceil((double) fileSize / chunkSize);
Map<Integer, ManagedBuffer> receivedChunks = new ConcurrentHashMap<>();
// The anonymous class captures the effectively-final locals declared above
ChunkReceivedCallback callback = new ChunkReceivedCallback() {
@Override
public void onSuccess(int chunkIndex, ManagedBuffer buffer) {
receivedChunks.put(chunkIndex, buffer);
// Reconstruct once every expected chunk has arrived
if (receivedChunks.size() == totalChunks) {
reconstructFile();
}
}
@Override
public void onFailure(int chunkIndex, Throwable e) {
System.err.println("Chunk " + chunkIndex + " failed: " + e.getMessage());
}
};
// Fetch all chunks; completion is signalled through the callback above
for (int i = 0; i < totalChunks; i++) {
client.fetchChunk(streamId, i, callback);
}

// Server-side: Stream processing results
public class ProcessingStreamManager extends StreamManager {
    /** Registered processors, keyed by the stream ID they serve. */
    private final Map<String, DataProcessor> processors = new ConcurrentHashMap<>();

    /** Associate a processor with a stream ID so its results can be streamed. */
    public void startProcessing(String streamId, DataProcessor processor) {
        processors.put(streamId, processor);
    }

    /**
     * Wrap the processor's results in an in-memory buffer for streaming.
     * @throws IllegalArgumentException if no processor is registered for streamId
     */
    @Override
    public ManagedBuffer openStream(String streamId) {
        DataProcessor registered = processors.get(streamId);
        if (registered == null) {
            throw new IllegalArgumentException("No processor for stream: " + streamId);
        }
        byte[] resultBytes = registered.getResults();
        return new NioManagedBuffer(ByteBuffer.wrap(resultBytes));
    }

    /** Chunked access is not supported by this manager. */
    @Override
    public ManagedBuffer getChunk(long streamId, int chunkIndex) {
        throw new UnsupportedOperationException("Use openStream for processed data");
    }
}
// Client-side: Receive processed data
StreamCallback streamCallback = new StreamCallback() {
// Accumulates the processed result bytes as they stream in
private final ByteArrayOutputStream results = new ByteArrayOutputStream();
@Override
public void onData(String streamId, ByteBuffer buf) throws IOException {
// Copy the readable region out of the ByteBuffer
byte[] data = new byte[buf.remaining()];
buf.get(data);
results.write(data);
}
@Override
public void onComplete(String streamId) throws IOException {
byte[] finalResults = results.toByteArray();
handleProcessingResults(finalResults);
}
@Override
public void onFailure(String streamId, Throwable cause) throws IOException {
System.err.println("Processing stream failed: " + cause.getMessage());
}
};
client.stream("processing-results", streamCallback);

// Efficient zero-copy transfer using FileSegmentManagedBuffer
public class ZeroCopyStreamManager extends StreamManager {
    // Registered files, keyed by generated stream ID.
    private final Map<Long, FileInfo> streamFiles = new ConcurrentHashMap<>();

    /**
     * Register a file to be served as a chunked stream.
     * @return the stream ID clients pass to getChunk
     */
    public long registerFileStream(File file, Channel channel) {
        long streamId = generateStreamId();
        streamFiles.put(streamId, new FileInfo(file, channel));
        return streamId;
    }

    /**
     * Serve a 64KB segment of the registered file as a zero-copy buffer.
     * @throws IllegalArgumentException if the stream is unknown or the chunk
     *         index is negative or past end-of-file
     */
    @Override
    public ManagedBuffer getChunk(long streamId, int chunkIndex) {
        FileInfo fileInfo = streamFiles.get(streamId);
        if (fileInfo == null) {
            throw new IllegalArgumentException("Stream not found: " + streamId);
        }
        long chunkSize = 64 * 1024; // 64KB chunks
        long offset = chunkIndex * chunkSize;
        // Validate the index BEFORE deriving the segment length: a negative
        // chunkIndex produces a negative offset that the original post-hoc
        // check (offset >= length) would have let through, yielding a bogus
        // file segment.
        if (chunkIndex < 0 || offset >= fileInfo.file.length()) {
            throw new IllegalArgumentException("Chunk index out of range: " + chunkIndex);
        }
        long length = Math.min(chunkSize, fileInfo.file.length() - offset);
        // Zero-copy file segment
        return new FileSegmentManagedBuffer(conf, fileInfo.file, offset, length);
    }

    /** Named streams are not supported; files are always fetched by chunk. */
    @Override
    public ManagedBuffer openStream(String streamId) {
        throw new UnsupportedOperationException("Use getChunk for file streams");
    }

    /** Immutable pairing of a served file and the channel it was registered on. */
    private static class FileInfo {
        final File file;
        final Channel channel;
        FileInfo(File file, Channel channel) {
            this.file = file;
            this.channel = channel;
        }
    }
}

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-spark--spark-network-common