CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/maven-org-apache-spark--spark-network-common-2-10

Core networking infrastructure for Apache Spark cluster computing with transport layer, RPC, chunk fetching, and SASL authentication

Pending
Overview
Eval results
Files

docs/client-operations.md

Client Operations

Client-side networking functionality for establishing connections, sending RPC requests, and fetching data chunks from remote servers. The client API provides both synchronous and asynchronous operations for different use cases.

Capabilities

TransportClient

Main client class for network operations including RPC calls, chunk fetching, and streaming.

/**
 * TransportClient provides the client-side API for network communication.
 * It supports RPC requests, chunk fetching, and streaming operations.
 * All operations are thread-safe and can be called concurrently.
 *
 * <p>This is a signature listing: method bodies are omitted. The usage examples
 * obtain instances from {@code TransportClientFactory} rather than calling the
 * constructor directly.
 */
public class TransportClient implements Closeable {
  /**
   * Creates a client over the given Netty channel.
   *
   * @param channel The Netty channel used for communication
   * @param handler Presumably the handler that dispatches responses arriving on
   *                that channel to the registered callbacks (confirm upstream)
   */
  public TransportClient(Channel channel, TransportResponseHandler handler);
  
  /** Gets the underlying Netty channel */
  public Channel getChannel();
  
  /** Checks if the client connection is active */
  public boolean isActive();
  
  /** Gets the remote socket address */
  public SocketAddress getSocketAddress();
  
  /** Gets the client identifier */
  public String getClientId();
  
  /** Sets the client identifier */
  public void setClientId(String id);
  
  /**
   * Fetches a specific chunk from a stream asynchronously.
   * The outcome is reported through {@code callback}, not through a return value.
   * 
   * @param streamId The stream identifier
   * @param chunkIndex The index of the chunk to fetch
   * @param callback Callback to handle the chunk data or failure
   */
  public void fetchChunk(long streamId, int chunkIndex, ChunkReceivedCallback callback);
  
  /**
   * Opens a stream for continuous data transfer.
   * Note that stream identifiers here are strings, whereas
   * {@link #fetchChunk(long, int, ChunkReceivedCallback)} takes a numeric stream ID.
   * 
   * @param streamId The stream identifier
   * @param callback Callback to handle stream data, completion, or failure
   */
  public void stream(String streamId, StreamCallback callback);
  
  /**
   * Sends an RPC request asynchronously.
   * 
   * @param message The request message as ByteBuffer
   * @param callback Callback to handle the response or failure
   * @return Request ID that can be passed to {@link #removeRpcRequest(long)} to cancel the request
   */
  public long sendRpc(ByteBuffer message, RpcResponseCallback callback);
  
  /**
   * Sends an RPC request synchronously with timeout.
   * Blocks the calling thread until a response arrives, the request fails,
   * or the timeout elapses.
   * 
   * @param message The request message as ByteBuffer
   * @param timeoutMs Timeout in milliseconds
   * @return The response message as ByteBuffer
   * @throws RuntimeException if the request fails or times out
   */
  public ByteBuffer sendRpcSync(ByteBuffer message, long timeoutMs);
  
  /**
   * Sends a one-way message (no response expected).
   * No callback is registered, so delivery failures are not reported to the caller
   * through this API.
   * 
   * @param message The message as ByteBuffer
   */
  public void send(ByteBuffer message);
  
  /**
   * Removes a pending RPC request (cancels it).
   * 
   * @param requestId The request ID returned by sendRpc
   */
  public void removeRpcRequest(long requestId);
  
  /**
   * Forces a timeout on all pending requests.
   *
   * NOTE(review): the upstream Spark Javadoc describes this method as only marking
   * the channel as timed out; confirm whether pending callbacks are actually
   * failed by this call before relying on it.
   */
  public void timeOut();
  
  /** Closes the client connection and releases resources */
  public void close();
  
  /** Returns a string description of this client (format not specified in this listing) */
  public String toString();
}

TransportClientFactory

Factory for creating and managing TransportClient instances.

/**
 * Factory for creating TransportClient instances with connection pooling and management.
 * Manages connection lifecycle and provides both managed and unmanaged clients.
 *
 * <p>This is a signature listing: method bodies are omitted.
 */
public class TransportClientFactory implements Closeable {
  /**
   * Creates a managed client connection to the specified host and port.
   * Managed clients are pooled and reused for efficiency.
   * Callers do not need to close the returned client; {@link #close()} on the
   * factory closes all managed connections.
   *
   * NOTE(review): upstream documents this call as blocking until the connection
   * is established; confirm for the packaged version.
   * 
   * @param remoteHost The remote host to connect to
   * @param remotePort The remote port to connect to
   * @return A TransportClient instance
   * @throws IOException if connection fails
   */
  public TransportClient createClient(String remoteHost, int remotePort) throws IOException;
  
  /**
   * Creates an unmanaged client connection to the specified host and port.
   * Unmanaged clients are not pooled and must be explicitly closed.
   * The factory does not close them on the caller's behalf.
   * 
   * @param remoteHost The remote host to connect to
   * @param remotePort The remote port to connect to
   * @return A TransportClient instance
   * @throws IOException if connection fails
   */
  public TransportClient createUnmanagedClient(String remoteHost, int remotePort) throws IOException;
  
  /** Closes the factory and all managed client connections */
  public void close();
}

Callback Interfaces

Callback interfaces for handling asynchronous operations.

/**
 * Callback interface for chunk fetch operations.
 * Implementations handle successful chunk receipt or fetch failures.
 * Exactly which thread invokes these methods is not specified in this listing.
 */
public interface ChunkReceivedCallback {
  /**
   * Called when a chunk is successfully received.
   *
   * NOTE(review): the upstream Spark Javadoc states that the buffer is released by
   * the library as soon as this method returns, so implementations should
   * retain() it or copy its contents rather than release() it themselves.
   * Confirm against the packaged version.
   * 
   * @param chunkIndex The index of the received chunk
   * @param buffer The chunk data as a ManagedBuffer
   */
  void onSuccess(int chunkIndex, ManagedBuffer buffer);
  
  /**
   * Called when chunk fetching fails.
   * 
   * @param chunkIndex The index of the chunk that failed
   * @param e The exception that caused the failure
   */
  void onFailure(int chunkIndex, Throwable e);
}

/**
 * Callback interface for RPC response handling.
 * Implementations handle successful responses or RPC failures.
 * This is the callback type accepted by {@code TransportClient.sendRpc}.
 */
public interface RpcResponseCallback {
  /**
   * Called when an RPC response is successfully received.
   *
   * NOTE(review): the upstream Spark Javadoc states that the response buffer is
   * recycled after this method returns; copy its contents if they are needed
   * later. Confirm against the packaged version.
   * 
   * @param response The response message as ByteBuffer
   */
  void onSuccess(ByteBuffer response);
  
  /**
   * Called when an RPC request fails.
   * 
   * @param e The exception that caused the failure
   */
  void onFailure(Throwable e);
}

/**
 * Callback interface for stream operations.
 * Implementations handle stream data, completion, and failures.
 * This is the callback type accepted by {@code TransportClient.stream}.
 *
 * NOTE(review): upstream Spark declares {@code throws IOException} on these
 * methods; the signatures below omit it. Confirm against the packaged version.
 */
public interface StreamCallback {
  /**
   * Called when stream data is received.
   * Presumably invoked once per arriving piece of data, i.e. possibly several
   * times per stream (confirm upstream).
   * 
   * @param streamId The stream identifier
   * @param buf The stream data as ByteBuffer
   */
  void onData(String streamId, ByteBuffer buf);
  
  /**
   * Called when the stream completes successfully.
   * 
   * @param streamId The stream identifier
   */
  void onComplete(String streamId);
  
  /**
   * Called when the stream fails.
   * 
   * @param streamId The stream identifier
   * @param cause The exception that caused the failure
   */
  void onFailure(String streamId, Throwable cause);
}

Usage Examples

Basic RPC Operations

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import org.apache.spark.network.client.RpcResponseCallback;
import org.apache.spark.network.client.TransportClient;

// Asynchronous RPC call
TransportClient client = clientFactory.createClient("server-host", 8080);

// Encode with an explicit charset so both sides agree regardless of platform defaults
ByteBuffer request = ByteBuffer.wrap("Hello Server".getBytes(StandardCharsets.UTF_8));

client.sendRpc(request, new RpcResponseCallback() {
  @Override
  public void onSuccess(ByteBuffer response) {
    // Decode only the readable bytes. Do not use response.array(): it throws for
    // direct or read-only buffers and ignores the buffer's position and limit.
    String responseStr = StandardCharsets.UTF_8.decode(response).toString();
    System.out.println("Received response: " + responseStr);
  }
  
  @Override
  public void onFailure(Throwable e) {
    System.err.println("RPC failed: " + e.getMessage());
  }
});

// Synchronous RPC call with timeout
try {
  ByteBuffer response = client.sendRpcSync(request, 5000); // 5 second timeout
  // Same decoding rule as above: never assume the buffer is array-backed
  String responseStr = StandardCharsets.UTF_8.decode(response).toString();
  System.out.println("Sync response: " + responseStr);
} catch (Exception e) {
  // sendRpcSync reports failures and timeouts as RuntimeException
  System.err.println("Sync RPC failed: " + e.getMessage());
}

Chunk Fetching

import org.apache.spark.network.client.ChunkReceivedCallback;
import org.apache.spark.network.buffer.ManagedBuffer;

// Fetch specific chunks from a stream
long streamId = 12345L;

for (int i = 0; i < 10; i++) {
  client.fetchChunk(streamId, i, new ChunkReceivedCallback() {
    @Override
    public void onSuccess(int chunkIndex, ManagedBuffer buffer) {
      System.out.println("Received chunk " + chunkIndex + " of size " + buffer.size());
      
      // Process the chunk data before returning from this callback.
      // Do NOT call buffer.release() here: the library releases the buffer itself
      // once onSuccess returns, so releasing it again would be a double release.
      // To keep the data longer, call buffer.retain() (and release it later) or copy it.
      try {
        ByteBuffer data = buffer.nioByteBuffer();
        // Process data...
      } catch (Exception e) {
        System.err.println("Failed to process chunk: " + e.getMessage());
      }
    }
    
    @Override
    public void onFailure(int chunkIndex, Throwable e) {
      System.err.println("Failed to fetch chunk " + chunkIndex + ": " + e.getMessage());
    }
  });
}

Stream Operations

import org.apache.spark.network.client.StreamCallback;

// Open a stream for continuous data transfer
String streamId = "data-stream-001";

// Handler that reports each stage of the stream's lifecycle
StreamCallback streamHandler = new StreamCallback() {
  @Override
  public void onData(String streamId, ByteBuffer buf) {
    int available = buf.remaining();
    System.out.println("Received " + available + " bytes from stream " + streamId);

    // Drain the readable bytes into a private array
    byte[] payload = new byte[available];
    buf.get(payload);
    // Process data...
  }

  @Override
  public void onComplete(String streamId) {
    System.out.println("Stream " + streamId + " completed successfully");
  }

  @Override
  public void onFailure(String streamId, Throwable cause) {
    System.err.println("Stream " + streamId + " failed: " + cause.getMessage());
  }
};

client.stream(streamId, streamHandler);

Connection Management

// Create managed clients (recommended for most use cases)
TransportClient managedClient = clientFactory.createClient("server1", 8080);

// Client is automatically managed by the factory
// No need to explicitly close (factory handles cleanup)

// Create unmanaged clients (for special cases)
// Important: unmanaged clients must be closed by the caller. TransportClient is
// Closeable, so try-with-resources closes it even if an exception is thrown.
try (TransportClient unmanagedClient = clientFactory.createUnmanagedClient("server2", 9090)) {
  // Use client...
}

// Check client status
if (managedClient.isActive()) {
  System.out.println("Client connected to: " + managedClient.getSocketAddress());
  System.out.println("Client ID: " + managedClient.getClientId());
} else {
  System.out.println("Client connection is not active");
}

Error Handling and Request Cancellation

// Send RPC with ability to cancel
// (sendRpc returns the request ID that removeRpcRequest accepts;
// `request` is the ByteBuffer built in the Basic RPC Operations example)
long requestId = client.sendRpc(request, new RpcResponseCallback() {
  @Override
  public void onSuccess(ByteBuffer response) {
    // Handle success
  }
  
  @Override
  public void onFailure(Throwable e) {
    // NOTE(review): TimeoutException is not imported by any snippet on this page
    // (presumably java.util.concurrent.TimeoutException). Upstream appears to report
    // closed or timed-out connections to asynchronous callbacks as IOException, so
    // confirm that this branch is reachable.
    if (e instanceof TimeoutException) {
      System.err.println("Request timed out");
    } else {
      System.err.println("Request failed: " + e.getMessage());
    }
  }
});

// Cancel the request if needed (shouldCancel is an application-defined condition)
if (shouldCancel) {
  client.removeRpcRequest(requestId);
}

// Force timeout all pending requests (emergency cleanup)
// NOTE(review): the upstream Spark Javadoc describes timeOut() as only marking the
// channel as timed out; confirm that it actually fails pending requests before
// relying on it for cleanup.
client.timeOut();

One-Way Messages

// Send fire-and-forget messages
// Encode with an explicit charset so the receiver can decode it regardless of
// the sender's platform default
ByteBuffer notification = ByteBuffer.wrap("Server notification".getBytes(StandardCharsets.UTF_8));

client.send(notification); // No response expected or callback needed

// Useful for notifications, heartbeats, or status updates

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-spark--spark-network-common-2-10

docs

buffer-management.md

client-operations.md

configuration-utilities.md

index.md

message-protocol.md

sasl-authentication.md

server-operations.md

transport-setup.md

tile.json