Avro inter-process communication components providing RPC framework with multiple transport mechanisms and protocol implementations
—
Apache Avro IPC provides three distinct protocol implementations to support different Java object models and development workflows: Generic (no code generation), Specific (generated classes), and Reflect (existing interfaces).
The Generic protocol implementation works with Avro's GenericData model and requires no code generation. It uses GenericRecord and other generic types to represent data.
/**
 * Client-side requestor for the Generic protocol implementation.
 * Signature listing only; method bodies are not shown.
 */
public class GenericRequestor extends Requestor {
// Constructors
public GenericRequestor(Protocol protocol, Transceiver transceiver) throws IOException;
// Same as above, with the GenericData model supplied by the caller
public GenericRequestor(Protocol protocol, Transceiver transceiver, GenericData data) throws IOException;
// Data model access
public GenericData getGenericData();
// Inherited from Requestor
// Synchronous form: hands back the response object for the named message
public Object request(String messageName, Object request) throws Exception;
// Callback form: returns void and reports through the supplied Callback
public <T> void request(String messageName, Object request, Callback<T> callback) throws Exception;
}
/**
 * Server-side responder for the Generic protocol implementation.
 * Abstract: subclasses supply respond(...) with the application logic.
 * Signature listing only; method bodies are not shown.
 */
public abstract class GenericResponder extends Responder {
// Constructors
public GenericResponder(Protocol local);
// Same as above, with the GenericData model supplied by the caller
public GenericResponder(Protocol local, GenericData data);
// Data model access
public GenericData getGenericData();
// Abstract method - implement your business logic
public abstract Object respond(Message message, Object request) throws Exception;
// Protected methods for customization
protected DatumWriter<Object> getDatumWriter(Schema schema);
protected DatumReader<Object> getDatumReader(Schema actual, Schema expected);
}
// Generic client setup
// Wire a generic requestor to the server over HTTP
Protocol parsedProtocol = Protocol.parse(protocolJson);
HttpTransceiver httpTransport = new HttpTransceiver(serverUrl);
GenericRequestor requestor = new GenericRequestor(parsedProtocol, httpTransport);
// Populate the request record field by field
GenericData.Record echoRequest = new GenericData.Record(requestSchema);
echoRequest.put("message", "Hello World");
echoRequest.put("timestamp", System.currentTimeMillis());
// Invoke "echo" and print the "result" field when a record comes back
Object reply = requestor.request("echo", echoRequest);
boolean replyIsRecord = reply instanceof GenericData.Record;
if (replyIsRecord) {
    System.out.println("Response: " + ((GenericData.Record) reply).get("result"));
}
// Generic server implementation
/**
 * Example generic server: dispatches on the message name and answers with
 * generic records.
 */
public class MyGenericResponder extends GenericResponder {
    public MyGenericResponder(Protocol protocol) {
        super(protocol);
    }

    /**
     * Routes an incoming call to the handler for its message name.
     *
     * @param message the protocol message being invoked
     * @param request the request payload; cast to a generic record before dispatch
     * @return the response object produced by the matching handler
     * @throws AvroRuntimeException if no handler exists for the message name
     */
    @Override
    public Object respond(Message message, Object request) throws Exception {
        String messageName = message.getName();
        GenericData.Record requestRecord = (GenericData.Record) request;
        switch (messageName) {
            case "echo":
                return handleEcho(message, requestRecord);
            case "getData":
                // handleGetData is application-specific and is not shown in this example.
                return handleGetData(requestRecord);
            default:
                throw new AvroRuntimeException("Unknown message: " + messageName);
        }
    }

    /**
     * Builds the echo response by copying the request's "message" field into
     * the response's "result" field.
     */
    private Object handleEcho(Message message, GenericData.Record request) {
        // Take the response schema from the message being answered instead of
        // relying on a responseSchema variable that is not defined in this class.
        GenericData.Record response = new GenericData.Record(message.getResponse());
        response.put("result", request.get("message"));
        return response;
    }
}
Specific protocol implementation works with Java classes generated from Avro schemas or IDL files. Provides type-safe, compile-time checked RPC interfaces.
/**
 * Client-side requestor for the Specific protocol implementation; it also
 * implements InvocationHandler so it can sit behind proxy clients.
 * Signature listing only; method bodies are not shown.
 */
public class SpecificRequestor extends Requestor implements InvocationHandler {
// Constructors
// Built either from a service interface class or from an explicit Protocol
public SpecificRequestor(Class<?> iface, Transceiver transceiver) throws IOException;
public SpecificRequestor(Class<?> iface, Transceiver transceiver, SpecificData data) throws IOException;
public SpecificRequestor(Protocol protocol, Transceiver transceiver, SpecificData data) throws IOException;
// Protected: available to subclasses only
protected SpecificRequestor(Protocol protocol, Transceiver transceiver) throws IOException;
// Data model access
public SpecificData getSpecificData();
// InvocationHandler for proxy clients
// Receives the call's arguments as an Object[]
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable;
// Static factory methods for proxy clients
// Each returns an instance typed as the interface class passed in (T)
public static <T> T getClient(Class<T> iface, Transceiver transceiver) throws IOException;
public static <T> T getClient(Class<T> iface, Transceiver transceiver, SpecificData data) throws IOException;
// Takes an already-constructed requestor instead of a transceiver
public static <T> T getClient(Class<T> iface, SpecificRequestor requestor);
// Utility methods
// NOTE(review): presumably returns the remote side's Protocol for a proxy obtained from getClient - confirm
public static Protocol getRemote(Object proxy);
// Protected methods for customization
protected DatumWriter<Object> getDatumWriter(Schema schema);
protected DatumReader<Object> getDatumReader(Schema writer, Schema reader);
}
/**
 * Server-side responder for the Specific protocol implementation.
 * Constructors pair a service interface class (or Protocol) with the object
 * that implements it.
 * Signature listing only; method bodies are not shown.
 */
public class SpecificResponder extends GenericResponder {
// Constructors
public SpecificResponder(Class iface, Object impl);
public SpecificResponder(Protocol protocol, Object impl);
// Variants taking the SpecificData model from the caller
public SpecificResponder(Class iface, Object impl, SpecificData data);
public SpecificResponder(Protocol protocol, Object impl, SpecificData data);
// Data model access
public SpecificData getSpecificData();
// Inherited abstract method - automatically implemented via reflection
public Object respond(Message message, Object request) throws Exception;
// Protected methods for customization
protected DatumWriter<Object> getDatumWriter(Schema schema);
protected DatumReader<Object> getDatumReader(Schema actual, Schema expected);
}
// Assuming generated interface from Avro IDL:
// interface MyService {
// string echo(string message);
// UserRecord getUser(long userId);
// }
// Specific client with proxy interface
HttpTransceiver transceiver = new HttpTransceiver(serverUrl);
MyService client = SpecificRequestor.getClient(MyService.class, transceiver);
// Type-safe RPC calls
String response = client.echo("Hello World");
UserRecord user = client.getUser(12345L);
// Alternative: Direct SpecificRequestor usage
SpecificRequestor requestor = new SpecificRequestor(MyService.class, transceiver);
// Arguments are passed as an Object[] in declared parameter order, the same
// shape that invoke(Object, Method, Object[]) receives from a proxy call.
// A separate variable name is used because `response` is already declared above.
// NOTE(review): held as CharSequence because the value may arrive as Avro's own
// string type rather than java.lang.String, depending on the schema's string setting - confirm.
CharSequence directResponse = (CharSequence) requestor.request("echo", new Object[] {"Hello World"});
// Specific server implementation
/** Example implementation of the generated MyService interface. */
public class MyServiceImpl implements MyService {

    /** Returns the incoming message prefixed with "Echo: ". */
    @Override
    public String echo(String message) {
        String echoed = "Echo: " + message;
        return echoed;
    }

    /** Builds a UserRecord whose id is the given id and whose name is derived from it. */
    @Override
    public UserRecord getUser(long userId) {
        UserRecord record = new UserRecord();
        record.setId(userId);
        String displayName = "User " + userId;
        record.setName(displayName);
        return record;
    }
}
// Server setup: wrap the service implementation in a responder and bind it to port 8080
SpecificResponder responder = new SpecificResponder(MyService.class, new MyServiceImpl());
InetSocketAddress listenAddress = new InetSocketAddress(8080);
SaslSocketServer server = new SaslSocketServer(responder, listenAddress);
server.start();
Reflect protocol implementation works with existing Java interfaces using reflection, without requiring code generation from Avro schemas.
/**
 * Client-side requestor for the Reflect protocol implementation.
 * Mirrors the SpecificRequestor surface, with ReflectData as the data model.
 * Signature listing only; method bodies are not shown.
 */
public class ReflectRequestor extends SpecificRequestor {
// Constructors
// Built either from a service interface class or from an explicit Protocol
public ReflectRequestor(Class<?> iface, Transceiver transceiver) throws IOException;
public ReflectRequestor(Class<?> iface, Transceiver transceiver, ReflectData data) throws IOException;
public ReflectRequestor(Protocol protocol, Transceiver transceiver, ReflectData data) throws IOException;
// Protected: available to subclasses only
protected ReflectRequestor(Protocol protocol, Transceiver transceiver) throws IOException;
// Data model access
public ReflectData getReflectData();
// Static factory methods for proxy clients
// Each returns an instance typed as the interface class passed in (T)
public static <T> T getClient(Class<T> iface, Transceiver transceiver) throws IOException;
public static <T> T getClient(Class<T> iface, Transceiver transceiver, ReflectData reflectData) throws IOException;
// Takes an already-constructed requestor instead of a transceiver
public static <T> T getClient(Class<T> iface, ReflectRequestor rreq);
// Protected methods for customization
protected DatumWriter<Object> getDatumWriter(Schema schema);
protected DatumReader<Object> getDatumReader(Schema writer, Schema reader);
}
/**
 * Server-side responder for the Reflect protocol implementation.
 * Constructors pair a service interface class (or Protocol) with the object
 * that implements it.
 * Signature listing only; method bodies are not shown.
 */
public class ReflectResponder extends SpecificResponder {
// Constructors
public ReflectResponder(Class iface, Object impl);
public ReflectResponder(Protocol protocol, Object impl);
// Variants taking the ReflectData model from the caller
public ReflectResponder(Class iface, Object impl, ReflectData data);
public ReflectResponder(Protocol protocol, Object impl, ReflectData data);
// Data model access
public ReflectData getReflectData();
// Inherited methods from SpecificResponder
public Object respond(Message message, Object request) throws Exception;
// Protected methods for customization
protected DatumWriter<Object> getDatumWriter(Schema schema);
protected DatumReader<Object> getDatumReader(Schema actual, Schema expected);
}
// Existing Java interface (no Avro generation needed)
/** Plain Java service interface used by the Reflect client and server examples. */
public interface Calculator {
// Arithmetic on primitive doubles
double add(double a, double b);
double subtract(double a, double b);
// Takes and returns the user-defined Complex class
Complex multiply(Complex a, Complex b);
}
/**
 * Complex number carried as a pair of doubles.
 *
 * <p>Exposes a no-argument constructor plus getters and setters so the
 * accessors called by the Calculator implementation example actually exist.
 */
public class Complex {
    private double real;
    private double imaginary;

    /** Creates a complex number with both parts left at 0.0. */
    public Complex() {}

    /**
     * Creates a complex number from its two parts.
     *
     * @param real the real part
     * @param imaginary the imaginary part
     */
    public Complex(double real, double imaginary) {
        this.real = real;
        this.imaginary = imaginary;
    }

    /** Returns the real part. */
    public double getReal() {
        return real;
    }

    /** Sets the real part. */
    public void setReal(double real) {
        this.real = real;
    }

    /** Returns the imaginary part. */
    public double getImaginary() {
        return imaginary;
    }

    /** Sets the imaginary part. */
    public void setImaginary(double imaginary) {
        this.imaginary = imaginary;
    }
}
// Reflect client: proxy created directly from the existing Calculator interface
HttpTransceiver transceiver = new HttpTransceiver(serverUrl);
Calculator client = ReflectRequestor.getClient(Calculator.class, transceiver);
// Calls are made through the plain Java interface
double sum = client.add(3.14, 2.86);
Complex left = new Complex(1, 2);
Complex right = new Complex(3, 4);
Complex result = client.multiply(left, right);
// Reflect server implementation
/** Example server-side implementation of the Calculator interface. */
public class CalculatorImpl implements Calculator {

    /** Returns a + b. */
    @Override
    public double add(double a, double b) {
        double total = a + b;
        return total;
    }

    /** Returns a - b. */
    @Override
    public double subtract(double a, double b) {
        double difference = a - b;
        return difference;
    }

    /** Multiplies two complex numbers and returns the product as a new Complex. */
    @Override
    public Complex multiply(Complex a, Complex b) {
        double aRe = a.getReal();
        double aIm = a.getImaginary();
        double bRe = b.getReal();
        double bIm = b.getImaginary();
        // (aRe + aIm*i) * (bRe + bIm*i)
        double productRe = aRe * bRe - aIm * bIm;
        double productIm = aRe * bIm + aIm * bRe;
        return new Complex(productRe, productIm);
    }
}
// Server setup: wrap the calculator implementation in a responder and bind it to port 8080
ReflectResponder responder = new ReflectResponder(Calculator.class, new CalculatorImpl());
InetSocketAddress listenAddress = new InetSocketAddress(8080);
SaslSocketServer server = new SaslSocketServer(responder, listenAddress);
server.start();
Each protocol implementation allows custom data models for specialized serialization:
// Custom SpecificData with custom serialization
// Anonymous subclass: createSchema is overridden as the hook for custom schema logic.
// NOTE(review): Type is presumably java.lang.reflect.Type - confirm against the SpecificData API
SpecificData customData = new SpecificData() {
@Override
protected Schema createSchema(Type type, Map<String, Schema> names) {
// Custom schema creation logic
// As written this only delegates to the superclass; add the custom handling before this call.
return super.createSchema(type, names);
}
};
SpecificRequestor requestor = new SpecificRequestor(MyService.class, transceiver, customData);
Handle schema evolution gracefully:
// Server supporting multiple protocol versions
public class VersionedResponder extends SpecificResponder {
public VersionedResponder(Class iface, Object impl) {
super(iface, impl);
}
@Override
public Object respond(Message message, Object request) throws Exception {
Protocol remote = getRemote();
String version = remote.getProp("version");
if ("1.0".equals(version)) {
return handleV1Request(message, request);
} else if ("2.0".equals(version)) {
return handleV2Request(message, request);
}
return super.respond(message, request);
}
}Each protocol implementation provides consistent error handling:
try {
    String result = client.processData(data);
} catch (AvroRemoteException remoteFailure) {
    // Application-level exception thrown by remote method
    Object payload = remoteFailure.getValue();
    if (payload instanceof ValidationError) {
        ValidationError validationError = (ValidationError) payload;
        System.err.println("Validation failed: " + validationError.getMessage());
    }
} catch (IOException transportFailure) {
    // Transport or serialization error
    System.err.println("Communication error: " + transportFailure.getMessage());
}
All protocol implementations support asynchronous operations:
// Asynchronous call with callback
// The callback is built first, then handed to the three-argument request(...)
Callback<ProcessingResult> completionHandler = new Callback<ProcessingResult>() {
    @Override
    public void handleResult(ProcessingResult outcome) {
        System.out.println("Processing completed: " + outcome.getStatus());
    }

    @Override
    public void handleError(Throwable failure) {
        System.err.println("Processing failed: " + failure.getMessage());
    }
};
requestor.request("processLargeData", data, completionHandler);
// Reuse requestor instances
// Build the requestor once up front so later calls can share it
SpecificRequestor requestor = new SpecificRequestor(MyService.class, transceiver);
// Use same requestor for multiple calls
// Pre-warm reflection for ReflectRequestor
ReflectData reflectData = ReflectData.get();
// The return value is discarded; the call is made only for its warm-up effect.
// NOTE(review): presumably ReflectData caches the derived schema - confirm
reflectData.getSchema(MyClass.class); // Pre-compute schema
// Configure data model for performance
SpecificData.get().addLogicalTypeConversion(new MyCustomConversion());
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-avro--avro-ipc