Core networking functionality for Apache Spark including transport layers, buffer management, client-server communication, cryptographic protocols, SASL authentication, and shuffle database support using Netty for high-performance network I/O.
—
The client operations API provides high-performance networking capabilities for Spark's distributed communication needs. The TransportClient class is the main interface for client-side operations, offering thread-safe methods for chunk fetching, RPC communication, and streaming data transfer.
Main client class for network operations, providing thread-safe access to chunk fetching, RPC calls, and streaming functionality.
/**
* Create a transport client that communicates over the given Netty channel.
* The examples in this document obtain clients from
* TransportClientFactory.createClient rather than calling this constructor directly.
* @param channel - Netty channel for network communication
* @param handler - Response handler for managing responses and callbacks
*/
public TransportClient(Channel channel, TransportResponseHandler handler);Methods for managing client connections and retrieving connection information.
/**
* Get the Netty channel this client was constructed with
* @return Channel instance used for network communication
*/
public Channel getChannel();
/**
* Check whether the client connection is active
* @return true if the connection is active, false otherwise
*/
public boolean isActive();
/**
* Get the remote socket address of the server this client is connected to
* @return SocketAddress of the remote server
*/
public SocketAddress getSocketAddress();
/**
* Get the identifier associated with this client
* @return String representing the client ID, or null if not set
*/
public String getClientId();
/**
* Set the client identifier.
* NOTE(review): in upstream Spark this ID is intended for the authentication layer
* and may only be assigned once; a second call fails with an IllegalStateException.
* Check getClientId() for null before calling. Confirm for the version in use.
* @param id - String identifier for this client
*/
public void setClientId(String id);Asynchronous chunk fetching functionality for retrieving data blocks from streams.
/**
* Fetch a specific chunk from a stream asynchronously.
* Nothing is returned; the chunk (or the failure) is delivered to the callback.
* @param streamId - Numeric identifier of the stream containing the chunk
* @param chunkIndex - Index of the chunk to fetch within the stream
* @param callback - Callback to handle successful chunk reception or failures
*/
public void fetchChunk(long streamId, int chunkIndex, ChunkReceivedCallback callback);Methods for handling streaming data transfer with support for bidirectional communication.
/**
* Request to receive data from a named stream.
* Unlike fetchChunk, the stream is identified by a String, not a long.
* Data, completion and failure events are delivered to the callback.
* @param streamId - Identifier of the stream to receive data from
* @param callback - Callback to handle streaming data events
*/
public void stream(String streamId, StreamCallback callback);
/**
* Upload a stream of data to the server, together with a metadata buffer.
* The server's reply (or a failure) is delivered to the callback.
* @param meta - Metadata buffer describing the stream contents
* @param data - Data buffer containing the stream data
* @param callback - Callback to handle the upload response
* @return long request ID for tracking the upload operation
*/
public long uploadStream(ManagedBuffer meta, ManagedBuffer data, RpcResponseCallback callback);Remote procedure call functionality with both synchronous and asynchronous operation modes.
/**
* Send an RPC message asynchronously.
* The response or failure is delivered to the callback; this method does not block.
* @param message - ByteBuffer containing the RPC message data
* @param callback - Callback to handle the RPC response or failure
* @return long request ID for tracking the RPC call (usable with removeRpcRequest)
*/
public long sendRpc(ByteBuffer message, RpcResponseCallback callback);
/**
* Send an RPC message and block until the response arrives or the timeout elapses.
* This method declares no checked exceptions, so failures and timeouts surface as
* unchecked exceptions. Wrapping the call in catch (IOException e) does not compile
* (the checked exception is never thrown); catch RuntimeException instead.
* @param message - ByteBuffer containing the RPC message data
* @param timeoutMs - Timeout in milliseconds for the RPC call
* @return ByteBuffer containing the response data
* @throws RuntimeException if the RPC call fails or does not complete within timeoutMs
*/
public ByteBuffer sendRpcSync(ByteBuffer message, long timeoutMs);
/**
* Send a one-way message (no response is expected and no callback is invoked)
* @param message - ByteBuffer containing the message data
*/
public void send(ByteBuffer message);Specialized operations for requesting merged block metadata, used in Spark's shuffle optimization.
/**
* Request merged block metadata for shuffle operations.
* Nothing is returned; the metadata (or the failure) is delivered to the callback.
* @param appId - Application identifier
* @param shuffleId - Shuffle operation identifier
* @param shuffleMergeId - Merge operation identifier
* @param reduceId - Reducer task identifier
* @param callback - Callback to handle the metadata response
*/
public void sendMergedBlockMetaReq(String appId, int shuffleId, int shuffleMergeId, int reduceId, MergedBlockMetaResponseCallback callback);Methods for managing active requests and handling timeouts.
/**
* Remove a pending RPC request by its ID, such as an ID returned by sendRpc or uploadStream
* @param requestId - ID of the request to remove
*/
public void removeRpcRequest(long requestId);
/**
* Mark this client as timed out.
* NOTE(review): earlier text claimed this triggers cleanup of pending requests. In
* upstream Spark it only sets a timed-out flag, after which isActive() reports false.
* Confirm for the version in use.
*/
public void timeOut();Proper cleanup and resource management for client connections.
/**
* Close the client connection and clean up all resources.
* This will cancel all pending requests and close the underlying channel.
*/
public void close();Callback interface for handling RPC responses.
/**
* Receives the outcome of an asynchronous RPC, as passed to
* TransportClient.sendRpc and TransportClient.uploadStream.
*/
public interface RpcResponseCallback extends BaseResponseCallback {
/**
* Called when an RPC call completes successfully.
* NOTE(review): upstream Spark recycles the response buffer after this method returns.
* Copy its contents if they are needed later. Confirm for the version in use.
* @param response - ByteBuffer containing the response data
*/
void onSuccess(ByteBuffer response);
/**
* Called when an RPC call fails
* @param e - Throwable representing the failure cause
*/
void onFailure(Throwable e);
}Callback interface for handling chunk fetch operations.
/**
* Receives the outcome of TransportClient.fetchChunk for a single chunk index.
*/
public interface ChunkReceivedCallback {
/**
* Called when a chunk is successfully received.
* NOTE(review): in upstream Spark the client release()s the buffer as soon as this
* method returns. Do not release it here. Call retain() or copy the data if it must
* outlive the callback. Confirm for the version in use.
* @param chunkIndex - Index of the received chunk
* @param buffer - ManagedBuffer containing the chunk data
*/
void onSuccess(int chunkIndex, ManagedBuffer buffer);
/**
* Called when chunk fetching fails
* @param chunkIndex - Index of the chunk that failed to be received
* @param e - Throwable representing the failure cause
*/
void onFailure(int chunkIndex, Throwable e);
}Callback interface for handling streaming data operations.
/**
* Receives the events of a stream requested with TransportClient.stream.
* Every method may throw IOException.
*/
public interface StreamCallback {
/**
* Called when data is received from the stream; may be called repeatedly for one stream
* @param streamId - Identifier of the stream
* @param buf - ByteBuffer containing the received data
* @throws IOException if data processing fails
*/
void onData(String streamId, ByteBuffer buf) throws IOException;
/**
* Called when the stream is completed successfully
* @param streamId - Identifier of the completed stream
* @throws IOException if completion processing fails
*/
void onComplete(String streamId) throws IOException;
/**
* Called when the stream encounters a failure
* @param streamId - Identifier of the failed stream
* @param cause - Throwable representing the failure cause
* @throws IOException if failure processing fails
*/
void onFailure(String streamId, Throwable cause) throws IOException;
}Extended stream callback interface that includes an identifier.
/**
* A StreamCallback that also exposes an identifier via getID().
*/
public interface StreamCallbackWithID extends StreamCallback {
/**
* Get the identifier for this stream callback
* @return String identifier for the callback
*/
String getID();
}Callback interface for handling merged block metadata responses.
/**
* Receives the outcome of TransportClient.sendMergedBlockMetaReq.
*/
public interface MergedBlockMetaResponseCallback extends BaseResponseCallback {
/**
* Called when merged block metadata is successfully received
* @param mergedBlockMeta - MergedBlockMetaSuccess containing the metadata
*/
void onSuccess(MergedBlockMetaSuccess mergedBlockMeta);
/**
* Called when merged block metadata request fails
* @param e - Throwable representing the failure cause
*/
void onFailure(Throwable e);
}Factory class for creating and managing transport clients with connection pooling and lifecycle management.
/**
* Creates TransportClient instances; close() shuts down the factory and its clients.
*/
public class TransportClientFactory implements Closeable {
/**
* Create a transport client connected to the specified host and port
* @param remoteHost - Hostname or IP address of the remote server
* @param remotePort - Port number of the remote server
* @return TransportClient connected to the specified endpoint
* @throws IOException if connection establishment fails
*/
public TransportClient createClient(String remoteHost, int remotePort) throws IOException;
/**
* Create a transport client with a specific client ID.
* NOTE(review): client IDs are Strings elsewhere in this API
* (TransportClient.setClientId(String)), so an int clientId parameter looks wrong.
* In upstream Spark 3.x the third parameter is boolean fastFail, and both createClient
* overloads also declare InterruptedException. Confirm against the version in use
* and correct this signature.
* @param remoteHost - Hostname or IP address of the remote server
* @param remotePort - Port number of the remote server
* @param clientId - Identifier for the client
* @return TransportClient connected to the specified endpoint
* @throws IOException if connection establishment fails
*/
public TransportClient createClient(String remoteHost, int remotePort, int clientId) throws IOException;
/**
* Close the factory and all associated client connections
*/
public void close();
}import org.apache.spark.network.client.TransportClient;
import org.apache.spark.network.client.RpcResponseCallback;
import java.nio.charset.StandardCharsets;
// Create client through factory
TransportClient client = clientFactory.createClient("localhost", 9999);
// Send asynchronous RPC. Encode with an explicit charset instead of the platform default.
ByteBuffer request = ByteBuffer.wrap("Hello, Server!".getBytes(StandardCharsets.UTF_8));
client.sendRpc(request, new RpcResponseCallback() {
    @Override
    public void onSuccess(ByteBuffer response) {
        // Decode position..limit. response.array() throws for direct buffers and
        // ignores the buffer's offset, position and limit.
        String responseStr = StandardCharsets.UTF_8.decode(response).toString();
        System.out.println("Server responded: " + responseStr);
    }
    @Override
    public void onFailure(Throwable e) {
        System.err.println("RPC failed: " + e.getMessage());
    }
});
// Send synchronous RPC with timeout. sendRpcSync declares no checked exceptions,
// so failures and timeouts arrive as unchecked exceptions (catching IOException
// here would not compile).
try {
    ByteBuffer syncResponse = client.sendRpcSync(request, 30000); // 30 second timeout
    System.out.println("Sync response: " + StandardCharsets.UTF_8.decode(syncResponse).toString());
} catch (RuntimeException e) {
    System.err.println("Sync RPC failed: " + e.getMessage());
}import org.apache.spark.network.client.ChunkReceivedCallback;
import org.apache.spark.network.buffer.ManagedBuffer;
// Fetch chunks from a stream
long streamId = 12345L;
client.fetchChunk(streamId, 0, new ChunkReceivedCallback() {
    @Override
    public void onSuccess(int chunkIndex, ManagedBuffer buffer) {
        System.out.println("Received chunk " + chunkIndex + " with " + buffer.size() + " bytes");
        // Do NOT call buffer.release() here: the client releases the buffer as soon as
        // onSuccess returns, so releasing it again would over-release it. Copy the data,
        // or call buffer.retain() (and a matching release() later), if it must outlive
        // this callback.
        try {
            // Process chunk data
            ByteBuffer data = buffer.nioByteBuffer();
            // ... process data ...
        } catch (IOException e) {
            System.err.println("Failed to process chunk: " + e.getMessage());
        }
    }
    @Override
    public void onFailure(int chunkIndex, Throwable e) {
        System.err.println("Failed to fetch chunk " + chunkIndex + ": " + e.getMessage());
    }
});import org.apache.spark.network.client.StreamCallback;
// Receive streaming data: build the callback first, then register it for the named stream.
StreamCallback streamPrinter = new StreamCallback() {
    @Override
    public void onData(String streamId, ByteBuffer buf) throws IOException {
        int available = buf.remaining();
        System.out.println("Received " + available + " bytes from stream " + streamId);
        // Drain everything remaining in the buffer into a byte array for processing
        byte[] payload = new byte[available];
        buf.get(payload);
        // ... process payload ...
    }
    @Override
    public void onComplete(String streamId) throws IOException {
        System.out.println("Stream " + streamId + " completed successfully");
    }
    @Override
    public void onFailure(String streamId, Throwable cause) throws IOException {
        System.err.println("Stream " + streamId + " failed: " + cause.getMessage());
    }
};
client.stream("data-stream-1", streamPrinter);// Check connection status
if (client.isActive()) {
    System.out.println("Client connected to: " + client.getSocketAddress());
    System.out.println("Client ID: " + client.getClientId());
    // In upstream Spark a client ID can only be assigned once, and an authentication
    // bootstrap may already have set it; a second setClientId call throws
    // IllegalStateException. Only set a custom ID when none is present.
    if (client.getClientId() == null) {
        client.setClientId("spark-client-" + System.currentTimeMillis());
    }
    // Perform operations...
} else {
    System.out.println("Client connection is not active");
}
// Proper cleanup
client.close();Exception thrown when chunk fetching operations fail.
/**
* Unchecked (extends RuntimeException) failure raised for chunk fetch errors;
* callers are not forced to declare or catch it.
*/
public class ChunkFetchFailureException extends RuntimeException {
/**
* Create exception with error message and cause
* @param errorMsg - Description of the error
* @param cause - Underlying cause of the failure
*/
public ChunkFetchFailureException(String errorMsg, Throwable cause);
/**
* Create exception with error message only
* @param errorMsg - Description of the error
*/
public ChunkFetchFailureException(String errorMsg);
}Interface for customizing client initialization, commonly used for authentication and encryption setup.
/**
* Hook invoked on a newly created TransportClient, given both the client and its
* underlying Netty channel.
*/
public interface TransportClientBootstrap {
/**
* Perform bootstrap operations on a newly created client
* @param client - TransportClient instance to bootstrap
* @param channel - Underlying Netty channel
* @throws RuntimeException if bootstrap operations fail
*/
void doBootstrap(TransportClient client, Channel channel) throws RuntimeException;
}Install with Tessl CLI
npx tessl i tessl/maven-org-apache-spark--spark-network-common-2-12