CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/maven-org-apache-avro--avro-ipc

Avro inter-process communication components providing RPC framework with multiple transport mechanisms and protocol implementations

Pending
Overview
Eval results
Files

docs/core-rpc.md

Core RPC Framework

The foundation of Apache Avro IPC consists of abstract base classes that provide the client-server RPC communication infrastructure. These classes define the contract for RPC operations while remaining transport- and protocol-agnostic.

Capabilities

Client-Side RPC Base Class

The Requestor class provides the client-side foundation for RPC communication, handling protocol negotiation, request serialization, and response deserialization.

/**
 * Client-side base class for RPC communication.
 *
 * <p>As described in this section, it handles protocol negotiation, request
 * serialization, and response deserialization. Concrete subclasses supply the
 * actual encoding/decoding through the three abstract methods at the bottom.
 * This listing shows signatures only; method bodies are omitted.
 */
public abstract class Requestor {
    /**
     * @param local       the protocol this client speaks
     * @param transceiver the transport used to exchange messages
     * @throws IOException declared by the constructor; the triggering condition
     *                     is not shown in this listing
     */
    protected Requestor(Protocol local, Transceiver transceiver) throws IOException;
    
    // Protocol and transport access
    public Protocol getLocal();
    public Transceiver getTransceiver();
    // NOTE(review): declared to throw IOException, presumably because obtaining
    // the remote protocol can involve talking to the server — confirm.
    public Protocol getRemote() throws IOException;
    
    // Plugin management
    public void addRPCPlugin(RPCPlugin plugin);
    
    // Synchronous RPC call: takes the message name and the request object and
    // returns the response object.
    public Object request(String messageName, Object request) throws Exception;
    
    // Asynchronous RPC call with callback: the outcome is delivered through
    // the supplied Callback instead of a return value.
    public <T> void request(String messageName, Object request, Callback<T> callback) 
            throws AvroRemoteException, IOException;
    
    // Abstract methods for protocol-specific implementations
    // writeRequest: encode `request` to `out` using `schema`.
    public abstract void writeRequest(Schema schema, Object request, Encoder out) throws IOException;
    // readResponse: decode a response from `in`; `writer` is the schema the data
    // was written with and `reader` is the schema expected by the caller.
    public abstract Object readResponse(Schema writer, Schema reader, Decoder in) throws IOException;
    // readError: decode an error from `in` (same schema roles as readResponse)
    // and return it as an Exception.
    public abstract Exception readError(Schema writer, Schema reader, Decoder in) throws IOException;
}

Usage Examples

// Synchronous RPC call: the result is returned directly to the caller
MyRequestor requestor = new MyRequestor(protocol, transceiver);
try {
    Object result = requestor.request("getMessage", requestData);
    // Handle result
} catch (Exception e) {
    // Handle RPC error
}

// Asynchronous RPC call: the outcome is delivered to the callback
Callback<String> onCompletion = new Callback<String>() {
    @Override
    public void handleResult(String result) {
        // Handle successful result
    }

    @Override
    public void handleError(Throwable error) {
        // Handle error
    }
};
requestor.request("getMessage", requestData, onCompletion);

Server-Side RPC Base Class

The Responder class provides the server-side foundation for RPC communication, handling request deserialization, method dispatch, and response serialization.

/**
 * Server-side base class for RPC communication.
 *
 * <p>As described in this section, it handles request deserialization, method
 * dispatch, and response serialization. Concrete subclasses implement the
 * abstract methods at the bottom. This listing shows signatures only; method
 * bodies are omitted.
 */
public abstract class Responder {
    /**
     * @param local the protocol this server implements
     */
    protected Responder(Protocol local);
    
    // Protocol access
    public static Protocol getRemote(); // ThreadLocal access to remote protocol
    public Protocol getLocal();
    
    // Plugin management
    public void addRPCPlugin(RPCPlugin plugin);
    
    // Process RPC request buffers: both overloads take the request buffers and
    // return the response buffers; the second also receives the Transceiver
    // the request arrived on.
    public List<ByteBuffer> respond(List<ByteBuffer> buffers) throws IOException;
    public List<ByteBuffer> respond(List<ByteBuffer> buffers, Transceiver connection) throws IOException;
    
    // Abstract methods for protocol-specific implementations
    // respond: compute the response object for one decoded request of `message`.
    public abstract Object respond(Message message, Object request) throws Exception;
    // readRequest: decode a request from `in`.
    // NOTE(review): `actual` / `expected` look like the schema the data was
    // written with and the schema this side expects — confirm.
    public abstract Object readRequest(Schema actual, Schema expected, Decoder in) throws IOException;
    // writeResponse: encode `response` to `out` using `schema`.
    public abstract void writeResponse(Schema schema, Object response, Encoder out) throws IOException;
    // writeError: encode `error` to `out` using `schema`.
    public abstract void writeError(Schema schema, Object error, Encoder out) throws IOException;
}

Usage Examples

// Custom responder implementation
/**
 * Example responder that routes each incoming message to a dedicated handler,
 * selected by the message name.
 */
public class MyResponder extends GenericResponder {
    public MyResponder(Protocol protocol) {
        super(protocol);
    }

    /**
     * Dispatches the request to the handler registered for the message's name.
     *
     * @param message the message being invoked
     * @param request the decoded request payload
     * @return the handler's result
     * @throws AvroRuntimeException if no handler exists for the message name
     */
    @Override
    public Object respond(Message message, Object request) throws Exception {
        final String name = message.getName();
        if (name.equals("getMessage")) {
            return handleGetMessage(request);
        }
        if (name.equals("setMessage")) {
            return handleSetMessage(request);
        }
        throw new AvroRuntimeException("Unknown message: " + name);
    }

    /** Handles "getMessage"; returns a fixed greeting. */
    private Object handleGetMessage(Object request) {
        // Implementation logic
        return "Hello from server";
    }

    /** Handles "setMessage"; there is nothing to return, so the result is null. */
    private Object handleSetMessage(Object request) {
        // Implementation logic
        return null; // void method
    }
}

Transport Abstraction

The Transceiver class provides the transport abstraction layer, handling network I/O operations while remaining protocol-agnostic.

/**
 * Transport abstraction for RPC.
 *
 * <p>As described in this section, it handles network I/O while remaining
 * protocol agnostic. Concrete transports implement the abstract methods
 * (getRemoteName, readBuffers, writeBuffers). This listing shows signatures
 * only; method bodies are omitted.
 */
public abstract class Transceiver implements Closeable {
    // Connection information
    public abstract String getRemoteName() throws IOException;
    public boolean isConnected();
    
    // Protocol management: stores / returns the remote side's protocol
    public void setRemote(Protocol protocol);
    public Protocol getRemote();
    
    // Channel synchronization: callers bracket channel use with
    // lockChannel() ... unlockChannel()
    public void lockChannel();
    public void unlockChannel();
    
    // Synchronous communication: takes the request buffers and returns the
    // response buffers
    public List<ByteBuffer> transceive(List<ByteBuffer> request) throws IOException;
    
    // Asynchronous communication: the response buffers (or an error) are
    // delivered to the callback
    public void transceive(List<ByteBuffer> request, Callback<List<ByteBuffer>> callback) throws IOException;
    
    // Abstract I/O methods
    public abstract List<ByteBuffer> readBuffers() throws IOException;
    public abstract void writeBuffers(List<ByteBuffer> buffers) throws IOException;
    
    // Resource management
    public void close() throws IOException;
}

Usage Examples

// Using transceiver directly for low-level communication
List<ByteBuffer> requestBuffers = serializeRequest(request);
// Take the channel lock BEFORE entering try: if acquisition itself fails, the
// finally block must not attempt to release a lock that was never obtained.
transceiver.lockChannel();
try {
    List<ByteBuffer> responseBuffers = transceiver.transceive(requestBuffers);
    Object response = deserializeResponse(responseBuffers);
    // Handle response
} finally {
    // Always release the channel, even when transceive or deserialization throws
    transceiver.unlockChannel();
}

// Asynchronous transceiver usage: the result is delivered to the callback
transceiver.transceive(requestBuffers, new Callback<List<ByteBuffer>>() {
    @Override
    public void handleResult(List<ByteBuffer> responseBuffers) {
        Object response = deserializeResponse(responseBuffers);
        // Handle response
    }
    
    @Override
    public void handleError(Throwable error) {
        // Handle communication error
    }
});

Server Interface

The Server interface defines the contract for server lifecycle management, providing standard start/stop semantics.

/**
 * Contract for server lifecycle management with start/stop semantics.
 */
public interface Server {
    // Server information
    /** Returns the port associated with this server. */
    int getPort();
    
    // Lifecycle management
    /** Starts the server. */
    void start();
    /** Stops the server. */
    void close();
    /**
     * Blocks the calling thread until the server finishes.
     *
     * @throws InterruptedException if the waiting thread is interrupted
     */
    void join() throws InterruptedException;
}

Usage Examples

// Server lifecycle management
Server server = new SaslSocketServer(responder, new InetSocketAddress(8080));

// Start server
server.start();
System.out.println("Server started on port: " + server.getPort());

// On JVM shutdown: close the server, then wait for it to finish
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    server.close();
    try {
        server.join();
    } catch (InterruptedException e) {
        // Restore the interrupt flag so the interruption is not silently lost
        Thread.currentThread().interrupt();
    }
}));

// Block the current thread until the server completes
server.join();

Advanced Usage

Custom Requestor Implementation

public class CustomRequestor extends Requestor {
    private final SpecificDatumWriter<Object> writer;
    private final SpecificDatumReader<Object> reader;
    
    public CustomRequestor(Protocol protocol, Transceiver transceiver) throws IOException {
        super(protocol, transceiver);
        this.writer = new SpecificDatumWriter<>(protocol);
        this.reader = new SpecificDatumReader<>(protocol);
    }
    
    @Override
    public void writeRequest(Schema schema, Object request, Encoder out) throws IOException {
        writer.setSchema(schema);
        writer.write(request, out);
    }
    
    @Override
    public Object readResponse(Schema writer, Schema reader, Decoder in) throws IOException {
        this.reader.setSchema(writer);
        this.reader.setExpected(reader);
        return this.reader.read(null, in);
    }
    
    @Override
    public Exception readError(Schema writer, Schema reader, Decoder in) throws IOException {
        this.reader.setSchema(writer);
        this.reader.setExpected(reader);
        Object error = this.reader.read(null, in);
        if (error instanceof Exception) {
            return (Exception) error;
        }
        return new AvroRemoteException(error);
    }
}

Plugin Integration

// Monitoring plugin that logs the client-side send/receive events
RPCPlugin monitoringPlugin = new RPCPlugin() {
    @Override
    public void clientSendRequest(RPCContext context) {
        String messageName = context.getMessage().getName();
        System.out.println("Sending request: " + messageName);
    }

    @Override
    public void clientReceiveResponse(RPCContext context) {
        boolean failed = context.isError();
        System.out.println("Received response, error: " + failed);
    }
};

// Register the same plugin instance on both the requestor and the responder
requestor.addRPCPlugin(monitoringPlugin);
responder.addRPCPlugin(monitoringPlugin);

Error Handling

The core RPC framework provides comprehensive error handling through exceptions and the RPC context:

// Catch clauses are ordered from most specific to most general; keep this
// order so the broad Exception handler does not take over the specific cases.
try {
    Object result = requestor.request("methodName", request);
} catch (AvroRemoteException e) {
    // Remote method threw an exception
    System.err.println("Remote error: " + e.getMessage());
} catch (IOException e) {
    // Network or serialization error
    System.err.println("Communication error: " + e.getMessage());
} catch (Exception e) {
    // Other RPC-related errors (request(...) is declared to throw Exception)
    System.err.println("RPC error: " + e.getMessage());
}

Thread Safety

  • Requestor instances are thread-safe for concurrent RPC calls
  • Responder instances are thread-safe for concurrent request processing
  • Transceiver instances provide channel locking for thread-safe access
  • Each RPC call gets its own RPCContext, so per-call state is not shared between concurrent calls

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-avro--avro-ipc

docs

async.md

core-rpc.md

index.md

plugins.md

protocols.md

stats.md

transports.md

tile.json