Avro inter-process communication components providing RPC framework with multiple transport mechanisms and protocol implementations
—
The foundation of Apache Avro IPC consists of abstract base classes that provide the client-server RPC communication infrastructure. These classes define the contract for RPC operations while remaining transport and protocol agnostic.
The Requestor class provides the client-side foundation for RPC communication, handling protocol negotiation, request serialization, and response deserialization.
public abstract class Requestor {
protected Requestor(Protocol local, Transceiver transceiver) throws IOException;
// Protocol and transport access
public Protocol getLocal();
public Transceiver getTransceiver();
public Protocol getRemote() throws IOException;
// Plugin management
public void addRPCPlugin(RPCPlugin plugin);
// Synchronous RPC call
public Object request(String messageName, Object request) throws Exception;
// Asynchronous RPC call with callback
public <T> void request(String messageName, Object request, Callback<T> callback)
throws AvroRemoteException, IOException;
// Abstract methods for protocol-specific implementations
public abstract void writeRequest(Schema schema, Object request, Encoder out) throws IOException;
public abstract Object readResponse(Schema writer, Schema reader, Decoder in) throws IOException;
public abstract Exception readError(Schema writer, Schema reader, Decoder in) throws IOException;
}// Synchronous RPC call
MyRequestor requestor = new MyRequestor(protocol, transceiver);
try {
Object result = requestor.request("getMessage", requestData);
// Handle result
} catch (Exception e) {
// Handle RPC error
}
// Asynchronous RPC call
requestor.request("getMessage", requestData, new Callback<String>() {
@Override
public void handleResult(String result) {
// Handle successful result
}
@Override
public void handleError(Throwable error) {
// Handle error
}
});The Responder class provides the server-side foundation for RPC communication, handling request deserialization, method dispatch, and response serialization.
public abstract class Responder {
protected Responder(Protocol local);
// Protocol access
public static Protocol getRemote(); // ThreadLocal access to remote protocol
public Protocol getLocal();
// Plugin management
public void addRPCPlugin(RPCPlugin plugin);
// Process RPC request buffers
public List<ByteBuffer> respond(List<ByteBuffer> buffers) throws IOException;
public List<ByteBuffer> respond(List<ByteBuffer> buffers, Transceiver connection) throws IOException;
// Abstract methods for protocol-specific implementations
public abstract Object respond(Message message, Object request) throws Exception;
public abstract Object readRequest(Schema actual, Schema expected, Decoder in) throws IOException;
public abstract void writeResponse(Schema schema, Object response, Encoder out) throws IOException;
public abstract void writeError(Schema schema, Object error, Encoder out) throws IOException;
}// Custom responder implementation
public class MyResponder extends GenericResponder {
public MyResponder(Protocol protocol) {
super(protocol);
}
@Override
public Object respond(Message message, Object request) throws Exception {
String messageName = message.getName();
switch (messageName) {
case "getMessage":
return handleGetMessage(request);
case "setMessage":
return handleSetMessage(request);
default:
throw new AvroRuntimeException("Unknown message: " + messageName);
}
}
private Object handleGetMessage(Object request) {
// Implementation logic
return "Hello from server";
}
private Object handleSetMessage(Object request) {
// Implementation logic
return null; // void method
}
}The Transceiver class provides the transport abstraction layer, handling network I/O operations while remaining protocol agnostic.
public abstract class Transceiver implements Closeable {
// Connection information
public abstract String getRemoteName() throws IOException;
public boolean isConnected();
// Protocol management
public void setRemote(Protocol protocol);
public Protocol getRemote();
// Channel synchronization
public void lockChannel();
public void unlockChannel();
// Synchronous communication
public List<ByteBuffer> transceive(List<ByteBuffer> request) throws IOException;
// Asynchronous communication
public void transceive(List<ByteBuffer> request, Callback<List<ByteBuffer>> callback) throws IOException;
// Abstract I/O methods
public abstract List<ByteBuffer> readBuffers() throws IOException;
public abstract void writeBuffers(List<ByteBuffer> buffers) throws IOException;
// Resource management
public void close() throws IOException;
}// Using transceiver directly for low-level communication
List<ByteBuffer> requestBuffers = serializeRequest(request);
try {
transceiver.lockChannel();
List<ByteBuffer> responseBuffers = transceiver.transceive(requestBuffers);
Object response = deserializeResponse(responseBuffers);
} finally {
transceiver.unlockChannel();
}
// Asynchronous transceiver usage
transceiver.transceive(requestBuffers, new Callback<List<ByteBuffer>>() {
@Override
public void handleResult(List<ByteBuffer> responseBuffers) {
Object response = deserializeResponse(responseBuffers);
// Handle response
}
@Override
public void handleError(Throwable error) {
// Handle communication error
}
});The Server interface defines the contract for server lifecycle management, providing standard start/stop semantics.
public interface Server {
// Server information
int getPort();
// Lifecycle management
void start();
void close();
void join() throws InterruptedException;
}// Server lifecycle management
Server server = new SaslSocketServer(responder, new InetSocketAddress(8080));
// Start server
server.start();
System.out.println("Server started on port: " + server.getPort());
// Wait for server to finish (in separate thread or shutdown hook)
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
server.close();
try {
server.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}));
// Wait for server completion
server.join();public class CustomRequestor extends Requestor {
private final SpecificDatumWriter<Object> writer;
private final SpecificDatumReader<Object> reader;
public CustomRequestor(Protocol protocol, Transceiver transceiver) throws IOException {
super(protocol, transceiver);
this.writer = new SpecificDatumWriter<>(protocol);
this.reader = new SpecificDatumReader<>(protocol);
}
@Override
public void writeRequest(Schema schema, Object request, Encoder out) throws IOException {
writer.setSchema(schema);
writer.write(request, out);
}
@Override
public Object readResponse(Schema writer, Schema reader, Decoder in) throws IOException {
this.reader.setSchema(writer);
this.reader.setExpected(reader);
return this.reader.read(null, in);
}
@Override
public Exception readError(Schema writer, Schema reader, Decoder in) throws IOException {
this.reader.setSchema(writer);
this.reader.setExpected(reader);
Object error = this.reader.read(null, in);
if (error instanceof Exception) {
return (Exception) error;
}
return new AvroRemoteException(error);
}
}// Add monitoring plugin to requestor and responder
RPCPlugin monitoringPlugin = new RPCPlugin() {
@Override
public void clientSendRequest(RPCContext context) {
System.out.println("Sending request: " + context.getMessage().getName());
}
@Override
public void clientReceiveResponse(RPCContext context) {
System.out.println("Received response, error: " + context.isError());
}
};
requestor.addRPCPlugin(monitoringPlugin);
responder.addRPCPlugin(monitoringPlugin);The core RPC framework provides comprehensive error handling through exceptions and the RPC context:
try {
Object result = requestor.request("methodName", request);
} catch (AvroRemoteException e) {
// Remote method threw an exception
System.err.println("Remote error: " + e.getMessage());
} catch (IOException e) {
// Network or serialization error
System.err.println("Communication error: " + e.getMessage());
} catch (Exception e) {
// Other RPC-related errors
System.err.println("RPC error: " + e.getMessage());
}Requestor instances are thread-safe for concurrent RPC callsResponder instances are thread-safe for concurrent request processingTransceiver instances provide channel locking for thread-safe accessRPCContext for thread-local state managementInstall with Tessl CLI
npx tessl i tessl/maven-org-apache-avro--avro-ipc