tessl/maven-org-apache-spark--spark-network-common-2-12

Core networking functionality for Apache Spark including transport layers, buffer management, client-server communication, cryptographic protocols, SASL authentication, and shuffle database support using Netty for high-performance network I/O.

Client Operations

The client operations API provides high-performance networking capabilities for Spark's distributed communication needs. The TransportClient class is the main interface for client-side operations, offering thread-safe methods for chunk fetching, RPC communication, and streaming data transfer.

Capabilities

TransportClient

Main client class for network operations, providing thread-safe access to chunk fetching, RPC calls, and streaming functionality.

/**
 * Create a transport client for network communication
 * @param channel - Netty channel for network communication
 * @param handler - Response handler for managing responses and callbacks
 */
public TransportClient(Channel channel, TransportResponseHandler handler);

Connection Management

Methods for managing client connections and retrieving connection information.

/**
 * Get the underlying Netty channel
 * @return Channel instance used for network communication
 */
public Channel getChannel();

/**
 * Check if the client connection is active
 * @return true if the connection is active, false otherwise
 */
public boolean isActive();

/**
 * Get the remote socket address of the connected server
 * @return SocketAddress of the remote server
 */
public SocketAddress getSocketAddress();

/**
 * Get the client identifier
 * @return String representing the client ID, or null if not set
 */
public String getClientId();

/**
 * Set the client identifier
 * @param id - String identifier for this client
 */
public void setClientId(String id);

Chunk Fetching

Asynchronous chunk fetching functionality for retrieving data blocks from streams.

/**
 * Fetch a specific chunk from a stream asynchronously
 * @param streamId - Identifier of the stream containing the chunk
 * @param chunkIndex - Index of the chunk to fetch within the stream
 * @param callback - Callback to handle successful chunk reception or failures
 */
public void fetchChunk(long streamId, int chunkIndex, ChunkReceivedCallback callback);

Streaming Operations

Methods for handling streaming data transfer with support for bidirectional communication.

/**
 * Request to receive data from a named stream
 * @param streamId - Identifier of the stream to receive data from
 * @param callback - Callback to handle streaming data events
 */
public void stream(String streamId, StreamCallback callback);

/**
 * Upload a stream of data to the server with metadata
 * @param meta - Metadata buffer describing the stream contents
 * @param data - Data buffer containing the stream data
 * @param callback - Callback to handle the upload response
 * @return long request ID for tracking the upload operation
 */
public long uploadStream(ManagedBuffer meta, ManagedBuffer data, RpcResponseCallback callback);
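No usage example for stream uploads appears below, so here is a minimal sketch. It assumes an already-connected `client` and a `payload` byte array; `NioManagedBuffer` is the `ManagedBuffer` implementation that wraps a plain `ByteBuffer`.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.buffer.NioManagedBuffer;
import org.apache.spark.network.client.RpcResponseCallback;

// Wrap metadata and payload in buffers the transport layer can manage
ManagedBuffer meta = new NioManagedBuffer(
    ByteBuffer.wrap("stream-meta".getBytes(StandardCharsets.UTF_8)));
ManagedBuffer data = new NioManagedBuffer(ByteBuffer.wrap(payload));

// The returned request ID can later be passed to removeRpcRequest if needed
long requestId = client.uploadStream(meta, data, new RpcResponseCallback() {
    @Override
    public void onSuccess(ByteBuffer response) {
        System.out.println("Upload acknowledged");
    }

    @Override
    public void onFailure(Throwable e) {
        System.err.println("Upload failed: " + e.getMessage());
    }
});
```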

RPC Communication

Remote procedure call functionality with both synchronous and asynchronous operation modes.

/**
 * Send an RPC message asynchronously
 * @param message - ByteBuffer containing the RPC message data
 * @param callback - Callback to handle the RPC response or failure
 * @return long request ID for tracking the RPC call
 */
public long sendRpc(ByteBuffer message, RpcResponseCallback callback);

/**
 * Send an RPC message synchronously, blocking until a response arrives or the timeout expires
 * @param message - ByteBuffer containing the RPC message data
 * @param timeoutMs - Timeout in milliseconds for the RPC call
 * @return ByteBuffer containing the response data
 * @throws RuntimeException if the RPC call fails or times out
 */
public ByteBuffer sendRpcSync(ByteBuffer message, long timeoutMs);

/**
 * Send a one-way message (no response expected)
 * @param message - ByteBuffer containing the message data
 */
public void send(ByteBuffer message);
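A brief sketch of fire-and-forget messaging, assuming a connected `client`: unlike `sendRpc`, `send` returns no request ID and takes no callback, so delivery failures are only visible through channel-level errors.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// One-way message: no response, no callback, no request ID to track
client.send(ByteBuffer.wrap("heartbeat".getBytes(StandardCharsets.UTF_8)));
```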

Merged Block Operations

Specialized operations for requesting merged block metadata, used in Spark's shuffle optimization.

/**
 * Request merged block metadata for shuffle operations
 * @param appId - Application identifier
 * @param shuffleId - Shuffle operation identifier
 * @param shuffleMergeId - Merge operation identifier
 * @param reduceId - Reducer task identifier
 * @param callback - Callback to handle the metadata response
 */
public void sendMergedBlockMetaReq(String appId, int shuffleId, int shuffleMergeId, int reduceId, MergedBlockMetaResponseCallback callback);
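A sketch of a merged-block metadata request, assuming a connected `client`; the application and shuffle IDs here are illustrative, and the exact accessors on `MergedBlockMetaSuccess` may vary by Spark version.

```java
import org.apache.spark.network.client.MergedBlockMetaResponseCallback;
import org.apache.spark.network.protocol.MergedBlockMetaSuccess;

client.sendMergedBlockMetaReq("app-20240101-0001", /* shuffleId */ 0,
    /* shuffleMergeId */ 0, /* reduceId */ 5,
    new MergedBlockMetaResponseCallback() {
        @Override
        public void onSuccess(MergedBlockMetaSuccess meta) {
            // Metadata tells the reader how many chunks the merged block contains
            System.out.println("Merged block has " + meta.getNumChunks() + " chunks");
        }

        @Override
        public void onFailure(Throwable e) {
            System.err.println("Metadata request failed: " + e.getMessage());
        }
    });
```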

Request Management

Methods for managing active requests and handling timeouts.

/**
 * Remove a pending RPC request by its ID
 * @param requestId - ID of the request to remove
 */
public void removeRpcRequest(long requestId);

/**
 * Mark this client as timed out, triggering cleanup of pending requests
 */
public void timeOut();

Resource Management

Proper cleanup and resource management for client connections.

/**
 * Close the client connection and clean up all resources
 * This will cancel all pending requests and close the underlying channel
 */
public void close();

Callback Interfaces

RpcResponseCallback

Callback interface for handling RPC responses.

public interface RpcResponseCallback extends BaseResponseCallback {
    /**
     * Called when an RPC call completes successfully
     * @param response - ByteBuffer containing the response data
     */
    void onSuccess(ByteBuffer response);
    
    /**
     * Called when an RPC call fails
     * @param e - Throwable representing the failure cause
     */
    void onFailure(Throwable e);
}

ChunkReceivedCallback

Callback interface for handling chunk fetch operations.

public interface ChunkReceivedCallback {
    /**
     * Called when a chunk is successfully received
     * @param chunkIndex - Index of the received chunk
     * @param buffer - ManagedBuffer containing the chunk data
     */
    void onSuccess(int chunkIndex, ManagedBuffer buffer);
    
    /**
     * Called when chunk fetching fails
     * @param chunkIndex - Index of the chunk that failed to be received
     * @param e - Throwable representing the failure cause
     */
    void onFailure(int chunkIndex, Throwable e);
}

StreamCallback

Callback interface for handling streaming data operations.

public interface StreamCallback {
    /**
     * Called when data is received from the stream
     * @param streamId - Identifier of the stream
     * @param buf - ByteBuffer containing the received data
     * @throws IOException if data processing fails
     */
    void onData(String streamId, ByteBuffer buf) throws IOException;
    
    /**
     * Called when the stream is completed successfully
     * @param streamId - Identifier of the completed stream
     * @throws IOException if completion processing fails
     */
    void onComplete(String streamId) throws IOException;
    
    /**
     * Called when the stream encounters a failure
     * @param streamId - Identifier of the failed stream
     * @param cause - Throwable representing the failure cause
     * @throws IOException if failure processing fails
     */
    void onFailure(String streamId, Throwable cause) throws IOException;
}

StreamCallbackWithID

Extended stream callback interface that includes an identifier.

public interface StreamCallbackWithID extends StreamCallback {
    /**
     * Get the identifier for this stream callback
     * @return String identifier for the callback
     */
    String getID();
}
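`StreamCallbackWithID` is typically implemented when the consumer of a stream must also name it. A minimal sketch (the ID value is illustrative):

```java
import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.spark.network.client.StreamCallbackWithID;

StreamCallbackWithID callback = new StreamCallbackWithID() {
    @Override
    public String getID() {
        return "upload-42"; // illustrative identifier for this stream
    }

    @Override
    public void onData(String streamId, ByteBuffer buf) throws IOException {
        // Consume buf here; it is only valid for the duration of this call
    }

    @Override
    public void onComplete(String streamId) throws IOException {
        System.out.println("Stream " + streamId + " finished");
    }

    @Override
    public void onFailure(String streamId, Throwable cause) throws IOException {
        System.err.println("Stream " + streamId + " failed: " + cause.getMessage());
    }
};
```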

MergedBlockMetaResponseCallback

Callback interface for handling merged block metadata responses.

public interface MergedBlockMetaResponseCallback extends BaseResponseCallback {
    /**
     * Called when merged block metadata is successfully received
     * @param mergedBlockMeta - MergedBlockMetaSuccess containing the metadata
     */
    void onSuccess(MergedBlockMetaSuccess mergedBlockMeta);
    
    /**
     * Called when merged block metadata request fails
     * @param e - Throwable representing the failure cause
     */
    void onFailure(Throwable e);
}

Client Factory

TransportClientFactory

Factory class for creating and managing transport clients with connection pooling and lifecycle management.

public class TransportClientFactory implements Closeable {
    /**
     * Create a transport client connected to the specified host and port
     * @param remoteHost - Hostname or IP address of the remote server
     * @param remotePort - Port number of the remote server
     * @return TransportClient connected to the specified endpoint
     * @throws IOException if connection establishment fails
     */
    public TransportClient createClient(String remoteHost, int remotePort) throws IOException;
    
    /**
     * Create a transport client with a specific client ID
     * @param remoteHost - Hostname or IP address of the remote server
     * @param remotePort - Port number of the remote server
     * @param clientId - Identifier for the client
     * @return TransportClient connected to the specified endpoint
     * @throws IOException if connection establishment fails
     */
    public TransportClient createClient(String remoteHost, int remotePort, int clientId) throws IOException;
    
    /**
     * Close the factory and all associated client connections
     */
    public void close();
}
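Factories are not constructed directly; they are obtained from a `TransportContext`. A sketch, assuming an existing `rpcHandler` (the `RpcHandler` for this module):

```java
import java.io.IOException;

import org.apache.spark.network.TransportContext;
import org.apache.spark.network.client.TransportClient;
import org.apache.spark.network.client.TransportClientFactory;
import org.apache.spark.network.util.MapConfigProvider;
import org.apache.spark.network.util.TransportConf;

// Module name selects the "spark.shuffle.*" config namespace
TransportConf conf = new TransportConf("shuffle", MapConfigProvider.EMPTY);
TransportContext context = new TransportContext(conf, rpcHandler);
TransportClientFactory factory = context.createClientFactory();
try {
    TransportClient client = factory.createClient("localhost", 9999);
    // ... use the client ...
} catch (IOException e) {
    System.err.println("Connection failed: " + e.getMessage());
} finally {
    factory.close(); // closes all pooled clients
}
```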

Usage Examples

Basic RPC Communication

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

import org.apache.spark.network.client.TransportClient;
import org.apache.spark.network.client.RpcResponseCallback;

// Create client through factory
TransportClient client = clientFactory.createClient("localhost", 9999);

// Send asynchronous RPC
ByteBuffer request = ByteBuffer.wrap("Hello, Server!".getBytes(StandardCharsets.UTF_8));
client.sendRpc(request, new RpcResponseCallback() {
    @Override
    public void onSuccess(ByteBuffer response) {
        // Copy out the remaining bytes; response.array() is unsafe for
        // direct or partially consumed buffers
        byte[] bytes = new byte[response.remaining()];
        response.get(bytes);
        System.out.println("Server responded: " + new String(bytes, StandardCharsets.UTF_8));
    }

    @Override
    public void onFailure(Throwable e) {
        System.err.println("RPC failed: " + e.getMessage());
    }
});

// Send synchronous RPC with timeout; failures surface as unchecked exceptions
try {
    // duplicate() keeps the original buffer's position untouched
    ByteBuffer syncResponse = client.sendRpcSync(request.duplicate(), 30000); // 30 second timeout
    byte[] bytes = new byte[syncResponse.remaining()];
    syncResponse.get(bytes);
    System.out.println("Sync response: " + new String(bytes, StandardCharsets.UTF_8));
} catch (RuntimeException e) {
    System.err.println("Sync RPC failed: " + e.getMessage());
}

Chunk Fetching

import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.spark.network.client.ChunkReceivedCallback;
import org.apache.spark.network.buffer.ManagedBuffer;

// Fetch chunks from a stream
long streamId = 12345L;
client.fetchChunk(streamId, 0, new ChunkReceivedCallback() {
    @Override
    public void onSuccess(int chunkIndex, ManagedBuffer buffer) {
        System.out.println("Received chunk " + chunkIndex + " with " + buffer.size() + " bytes");
        try {
            // Process chunk data
            ByteBuffer data = buffer.nioByteBuffer();
            // ... process data ...
        } catch (IOException e) {
            System.err.println("Failed to process chunk: " + e.getMessage());
        } finally {
            buffer.release(); // Important: release buffer when done
        }
    }
    
    @Override
    public void onFailure(int chunkIndex, Throwable e) {
        System.err.println("Failed to fetch chunk " + chunkIndex + ": " + e.getMessage());
    }
});

Streaming Data

import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.spark.network.client.StreamCallback;

// Receive streaming data
client.stream("data-stream-1", new StreamCallback() {
    @Override
    public void onData(String streamId, ByteBuffer buf) throws IOException {
        System.out.println("Received " + buf.remaining() + " bytes from stream " + streamId);
        // Process streaming data
        byte[] data = new byte[buf.remaining()];
        buf.get(data);
        // ... process data ...
    }
    
    @Override
    public void onComplete(String streamId) throws IOException {
        System.out.println("Stream " + streamId + " completed successfully");
    }
    
    @Override
    public void onFailure(String streamId, Throwable cause) throws IOException {
        System.err.println("Stream " + streamId + " failed: " + cause.getMessage());
    }
});

Connection Management

// Check connection status
if (client.isActive()) {
    System.out.println("Client connected to: " + client.getSocketAddress());

    // The client ID may only be assigned once; set it only if unset
    if (client.getClientId() == null) {
        client.setClientId("spark-client-" + System.currentTimeMillis());
    }
    System.out.println("Client ID: " + client.getClientId());

    // Perform operations...
} else {
    System.out.println("Client connection is not active");
}

// Proper cleanup
client.close();

Exception Handling

ChunkFetchFailureException

Exception thrown when chunk fetching operations fail.

public class ChunkFetchFailureException extends RuntimeException {
    /**
     * Create exception with error message and cause
     * @param errorMsg - Description of the error
     * @param cause - Underlying cause of the failure
     */
    public ChunkFetchFailureException(String errorMsg, Throwable cause);
    
    /**
     * Create exception with error message only
     * @param errorMsg - Description of the error
     */
    public ChunkFetchFailureException(String errorMsg);
}

Bootstrap Integration

TransportClientBootstrap

Interface for customizing client initialization, commonly used for authentication and encryption setup.

public interface TransportClientBootstrap {
    /**
     * Perform bootstrap operations on a newly created client
     * @param client - TransportClient instance to bootstrap
     * @param channel - Underlying Netty channel
     * @throws RuntimeException if bootstrap operations fail
     */
    void doBootstrap(TransportClient client, Channel channel) throws RuntimeException;
}
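A sketch of a custom bootstrap (the class name and ID scheme are illustrative); the real bootstraps in this module perform SASL or encryption handshakes before the factory hands the client out.

```java
import io.netty.channel.Channel;

import org.apache.spark.network.client.TransportClient;
import org.apache.spark.network.client.TransportClientBootstrap;

public class ClientIdBootstrap implements TransportClientBootstrap {
    @Override
    public void doBootstrap(TransportClient client, Channel channel) {
        // Tag each new connection with an ID derived from its Netty channel
        client.setClientId("client-" + channel.id().asShortText());
    }
}
```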

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-spark--spark-network-common-2-12
