CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/maven-org-apache-spark--spark-network-common

Core networking library for Apache Spark providing transport layer abstractions and utilities

Pending
Overview
Eval results
Files

docs/transport.md

Transport Layer

Core networking functionality providing client-server communication with connection pooling, automatic reconnection, and comprehensive resource management for Apache Spark's distributed architecture.

Capabilities

Transport Context

Central factory for creating transport servers and client factories with consistent configuration and RPC handler setup.

/**
 * Central context for creating transport servers, client factories, and Netty channel pipelines.
 *
 * <p>Every {@code createServer} overload that accepts a port also requires a bootstrap
 * list; there is no port-only overload. Use {@link #createServer()} or
 * {@link #createServer(List)} to bind to any available port.
 */
public class TransportContext {
    /**
     * Create a new TransportContext with the given configuration and RPC handler.
     *
     * @param conf Transport configuration settings
     * @param rpcHandler Handler for processing RPC messages
     */
    public TransportContext(TransportConf conf, RpcHandler rpcHandler);
    
    /**
     * Create a new TransportContext with connection idle timeout control.
     *
     * @param conf Transport configuration settings
     * @param rpcHandler Handler for processing RPC messages
     * @param closeIdleConnections Whether to close idle connections automatically
     */
    public TransportContext(TransportConf conf, RpcHandler rpcHandler, boolean closeIdleConnections);
    
    /**
     * Create a client factory with custom bootstrap configurations.
     *
     * @param bootstraps List of client bootstrap configurations for channel setup
     * @return {@code TransportClientFactory} for creating clients
     */
    public TransportClientFactory createClientFactory(List<TransportClientBootstrap> bootstraps);
    
    /**
     * Create a client factory with default configuration (no bootstrap list is supplied).
     *
     * @return {@code TransportClientFactory} for creating clients
     */
    public TransportClientFactory createClientFactory();
    
    /**
     * Create a server bound to a specific port with custom bootstrap configurations.
     *
     * @param port Port to bind server to
     * @param bootstraps List of server bootstrap configurations
     * @return {@code TransportServer} instance
     */
    public TransportServer createServer(int port, List<TransportServerBootstrap> bootstraps);
    
    /**
     * Create a server bound to a specific host and port with custom bootstrap configurations.
     *
     * @param host Host address to bind server to
     * @param port Port to bind server to
     * @param bootstraps List of server bootstrap configurations
     * @return {@code TransportServer} instance
     */
    public TransportServer createServer(String host, int port, List<TransportServerBootstrap> bootstraps);
    
    /**
     * Create a server with custom bootstrap configurations on any available port.
     *
     * @param bootstraps List of server bootstrap configurations
     * @return {@code TransportServer} instance
     */
    public TransportServer createServer(List<TransportServerBootstrap> bootstraps);
    
    /**
     * Create a server with default configuration on any available port.
     *
     * @return {@code TransportServer} instance
     */
    public TransportServer createServer();
    
    /**
     * Initialize a Netty channel pipeline with transport handlers.
     *
     * @param channel Socket channel to initialize
     * @return {@code TransportChannelHandler} for the channel
     */
    public TransportChannelHandler initializePipeline(SocketChannel channel);
    
    /**
     * Initialize a Netty channel pipeline with a custom RPC handler.
     *
     * @param channel Socket channel to initialize
     * @param channelRpcHandler Custom RPC handler for this channel
     * @return {@code TransportChannelHandler} for the channel
     */
    public TransportChannelHandler initializePipeline(SocketChannel channel, RpcHandler channelRpcHandler);
    
    /**
     * Get the transport configuration.
     *
     * @return {@code TransportConf} instance
     */
    public TransportConf getConf();
}

Usage Examples:

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import org.apache.spark.network.TransportContext;
import org.apache.spark.network.server.RpcHandler;
import org.apache.spark.network.util.TransportConf;

// Basic setup
TransportConf conf = new TransportConf("myapp", configProvider);
RpcHandler rpcHandler = new MyRpcHandler();
TransportContext context = new TransportContext(conf, rpcHandler);

// Create server on a fixed port. TransportContext declares no createServer(int)
// overload, so the port is passed together with an (empty) bootstrap list.
TransportServer server = context.createServer(8080, Collections.emptyList());

// Create client factory
TransportClientFactory factory = context.createClientFactory();

// With authentication
List<TransportServerBootstrap> serverBootstraps = Arrays.asList(
    new SaslServerBootstrap(conf, secretKeyHolder)
);
TransportServer authServer = context.createServer(8081, serverBootstraps);

Transport Client

Thread-safe client for fetching consecutive chunks of pre-negotiated streams and sending RPCs with comprehensive callback support.

/**
 * Client for fetching consecutive chunks of pre-negotiated streams and sending RPCs.
 * Thread-safe and supports concurrent operations.
 *
 * <p>Note the two stream identifier types: {@link #fetchChunk} takes a {@code long}
 * stream ID, while {@link #stream} takes a {@code String} stream ID.
 */
public class TransportClient {
    /**
     * Create a new TransportClient with the given channel and response handler.
     *
     * @param channel Netty channel for communication
     * @param handler Response handler for processing server responses
     */
    public TransportClient(Channel channel, TransportResponseHandler handler);
    
    /**
     * Get the underlying Netty channel.
     *
     * @return {@code Channel} instance
     */
    public Channel getChannel();
    
    /**
     * Check if the client connection is active.
     *
     * @return {@code true} if connection is active, {@code false} otherwise
     */
    public boolean isActive();
    
    /**
     * Get the remote socket address.
     *
     * @return {@code SocketAddress} of remote peer
     */
    public SocketAddress getSocketAddress();
    
    /**
     * Get the authenticated client ID when authentication is enabled.
     *
     * @return String client ID or {@code null} if not authenticated
     */
    public String getClientId();
    
    /**
     * Set the authenticated client ID.
     *
     * @param id Client ID to set
     */
    public void setClientId(String id);
    
    /**
     * Fetch a specific chunk from a stream asynchronously.
     * The outcome is reported through {@code callback}; nothing is returned.
     *
     * @param streamId ID of the stream to fetch from
     * @param chunkIndex Index of the chunk to fetch
     * @param callback Callback to handle chunk reception or failure
     */
    public void fetchChunk(long streamId, int chunkIndex, ChunkReceivedCallback callback);
    
    /**
     * Request to stream data with the given stream ID.
     *
     * @param streamId ID of the stream to request
     * @param callback Callback to handle streaming data
     */
    public void stream(String streamId, StreamCallback callback);
    
    /**
     * Send an RPC message asynchronously.
     *
     * @param message Message payload as ByteBuffer
     * @param callback Callback to handle RPC response or failure
     * @return Request ID for tracking (presumably the ID accepted by
     *         {@link #removeRpcRequest(long)}; confirm against the implementation)
     */
    public long sendRpc(ByteBuffer message, RpcResponseCallback callback);
    
    /**
     * Upload streaming data with metadata.
     *
     * @param meta Metadata for the upload
     * @param data Data to upload
     * @param callback Callback to handle upload response
     * @return Request ID for tracking
     */
    public long uploadStream(ManagedBuffer meta, ManagedBuffer data, RpcResponseCallback callback);
    
    /**
     * Send an RPC message synchronously with timeout.
     * Failures surface as unchecked exceptions, so callers are not forced to catch them.
     *
     * @param message Message payload as ByteBuffer
     * @param timeoutMs Timeout in milliseconds
     * @return Response ByteBuffer
     * @throws RuntimeException if timeout or other error occurs
     */
    public ByteBuffer sendRpcSync(ByteBuffer message, long timeoutMs);
    
    /**
     * Send a one-way message that expects no response.
     *
     * @param message Message payload as ByteBuffer
     */
    public void send(ByteBuffer message);
    
    /**
     * Remove a pending RPC request.
     *
     * @param requestId ID of the request to remove
     */
    public void removeRpcRequest(long requestId);
    
    /**
     * Mark the channel as timed out, preventing further requests.
     */
    public void timeOut();
    
    /**
     * Close the client connection and clean up resources.
     */
    public void close();
}

Usage Examples:

import java.nio.charset.StandardCharsets;

// Async RPC
// Encode with an explicit charset so the payload does not depend on the platform default.
ByteBuffer request = ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8));
client.sendRpc(request, new RpcResponseCallback() {
    @Override
    public void onSuccess(ByteBuffer response) {
        // Decode via the Charset API rather than response.array(): array() throws for
        // buffers without an accessible backing array and ignores position/limit.
        // duplicate() keeps the caller-visible position of `response` untouched.
        String text = StandardCharsets.UTF_8.decode(response.duplicate()).toString();
        System.out.println("Response: " + text);
    }
    
    @Override
    public void onFailure(Throwable e) {
        System.err.println("RPC failed: " + e.getMessage());
    }
});

// Sync RPC with timeout
try {
    ByteBuffer response = client.sendRpcSync(request, 5000); // 5 second timeout
    String text = StandardCharsets.UTF_8.decode(response.duplicate()).toString();
    System.out.println("Sync response: " + text);
} catch (RuntimeException e) {
    System.err.println("Sync RPC failed: " + e.getMessage());
}

// Chunk fetching
client.fetchChunk(streamId, 0, new ChunkReceivedCallback() {
    @Override
    public void onSuccess(int chunkIndex, ManagedBuffer buffer) {
        // Process chunk data
        System.out.println("Received chunk " + chunkIndex + " of size " + buffer.size());
    }
    
    @Override
    public void onFailure(int chunkIndex, Throwable e) {
        System.err.println("Failed to fetch chunk " + chunkIndex + ": " + e.getMessage());
    }
});

// One-way message
ByteBuffer notification = ByteBuffer.wrap("status_update".getBytes(StandardCharsets.UTF_8));
client.send(notification);

Transport Client Factory

Factory for creating TransportClient instances with connection pooling, retry logic, and resource management.

/**
 * Factory for creating TransportClient instances with connection pooling.
 *
 * <p>{@link #createClient} hands out pooled clients, whereas
 * {@link #createUnmanagedClient} returns a client that is not managed by the pool.
 */
public class TransportClientFactory {
    /**
     * Create a new TransportClientFactory.
     *
     * @param context Transport context for configuration
     * @param clientBootstraps List of client bootstrap configurations
     */
    public TransportClientFactory(TransportContext context, List<TransportClientBootstrap> clientBootstraps);
    
    /**
     * Get all metrics for monitoring connection pool and performance.
     *
     * @return {@code MetricSet} containing all factory metrics
     */
    public MetricSet getAllMetrics();
    
    /**
     * Create a pooled client connection to the specified remote host and port.
     *
     * @param remoteHost Hostname or IP address to connect to
     * @param remotePort Port number to connect to
     * @return {@code TransportClient} instance from connection pool
     * @throws IOException if connection fails
     * @throws InterruptedException if connection is interrupted
     */
    public TransportClient createClient(String remoteHost, int remotePort) throws IOException, InterruptedException;
    
    /**
     * Create an unmanaged client connection (not pooled).
     *
     * @param remoteHost Hostname or IP address to connect to
     * @param remotePort Port number to connect to
     * @return {@code TransportClient} instance not managed by pool
     * @throws IOException if connection fails
     * @throws InterruptedException if connection is interrupted
     */
    public TransportClient createUnmanagedClient(String remoteHost, int remotePort) throws IOException, InterruptedException;
    
    /**
     * Close the factory and all pooled connections.
     * NOTE(review): unmanaged clients are described as not pooled, so they presumably
     * need to be closed by their owner; confirm against the implementation.
     */
    public void close();
}

Usage Examples:

// Build the bootstrap list first, then hand it to the factory (auth bootstrap)
TransportClientBootstrap saslBootstrap = new SaslClientBootstrap(conf, appId, secretKeyHolder);
List<TransportClientBootstrap> clientBootstraps = Arrays.asList(saslBootstrap);
TransportClientFactory clientFactory = new TransportClientFactory(context, clientBootstraps);

// Pooled client: the recommended choice for most use cases
TransportClient pooledClient = clientFactory.createClient("spark-worker-1", 7337);

// Unmanaged client: for special cases
TransportClient standaloneClient = clientFactory.createUnmanagedClient("spark-worker-2", 7337);

// Metrics for monitoring the connection pool
MetricSet poolMetrics = clientFactory.getAllMetrics();

// Release the clients first, then the factory
pooledClient.close();
standaloneClient.close();
clientFactory.close();

Transport Server

Server for handling incoming connections and requests with pluggable RPC handlers and bootstrap configurations.

/**
 * Server for handling incoming connections and requests.
 */
public class TransportServer {
    /**
     * Create a new TransportServer.
     *
     * @param context Transport context with configuration and RPC handler
     * @param hostToBind Host address to bind server to ({@code null} for all interfaces)
     * @param portToBind Port to bind server to ({@code 0} for any available port)
     * @param appRpcHandler RPC handler for processing application messages
     * @param bootstraps List of server bootstrap configurations
     */
    public TransportServer(TransportContext context, String hostToBind, int portToBind, 
                          RpcHandler appRpcHandler, List<TransportServerBootstrap> bootstraps);
    
    /**
     * Get the port the server is listening on. Useful when the server was created with
     * port {@code 0} and the actual port is not known in advance.
     *
     * @return Port number
     */
    public int getPort();
    
    /**
     * Shut down the server and clean up resources.
     */
    public void close();
}

Usage Examples:

// Plain server: null host and port 0, no bootstraps
TransportServer plainServer = new TransportServer(context, null, 0, rpcHandler, Collections.emptyList());
int boundPort = plainServer.getPort();
System.out.println("Server listening on port: " + boundPort);

// Authenticated server: localhost:8080 with a SASL server bootstrap
TransportServerBootstrap saslBootstrap = new SaslServerBootstrap(conf, secretKeyHolder);
List<TransportServerBootstrap> serverBootstraps = Arrays.asList(saslBootstrap);
TransportServer saslServer = new TransportServer(context, "localhost", 8080, rpcHandler, serverBootstraps);

// Shut both servers down
plainServer.close();
saslServer.close();

Bootstrap Interfaces

Interfaces for customizing client and server channel initialization, supporting authentication, encryption, and other channel setup requirements.

/**
 * Bootstrap interface for client-side channel initialization.
 * Implementations receive both the client and its Netty channel.
 */
public interface TransportClientBootstrap {
    /**
     * Initialize a client channel with custom configuration.
     *
     * @param client The transport client instance
     * @param channel The Netty channel to bootstrap
     * @throws RuntimeException if bootstrap fails
     */
    void doBootstrap(TransportClient client, Channel channel) throws RuntimeException;
}

/**
 * Bootstrap interface for server-side channel initialization.
 * Unlike the client-side bootstrap, this one returns the RPC handler to use for the channel.
 */
public interface TransportServerBootstrap {
    /**
     * Initialize a server channel and return the RPC handler to use.
     *
     * @param channel The Netty channel to bootstrap
     * @param rpcHandler The default RPC handler
     * @return RPC handler to use for this channel (may be wrapped or replaced)
     */
    RpcHandler doBootstrap(Channel channel, RpcHandler rpcHandler);
}

Channel Handler

Low-level Netty channel handler for the transport protocol, typically used internally by the transport layer.

/**
 * Netty channel handler for transport protocol.
 *
 * <p>NOTE(review): this handler is described as typically internal to the transport layer.
 * Confirm the constructor's parameter list against the packaged Spark release before
 * constructing it directly.
 */
public class TransportChannelHandler extends ChannelInboundHandlerAdapter {
    /**
     * Create a new TransportChannelHandler.
     *
     * @param client Transport client for this channel
     * @param requestHandler Handler for processing requests
     * @param closeIdleConnections Whether to close idle connections
     * @param streamInterceptor Optional interceptor for stream frames
     */
    public TransportChannelHandler(TransportClient client, TransportRequestHandler requestHandler,
                                  boolean closeIdleConnections, TransportFrameDecoder.Interceptor streamInterceptor);
    
    /**
     * Get the transport client for this channel.
     *
     * @return {@code TransportClient} instance
     */
    public TransportClient getClient();
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-spark--spark-network-common

docs

authentication.md

buffers.md

configuration.md

index.md

protocol.md

streaming.md

transport.md

tile.json