Avro inter-process communication components providing RPC framework with multiple transport mechanisms and protocol implementations
—
Apache Avro IPC provides an extensible plugin system for RPC instrumentation, metadata manipulation, and cross-cutting concerns through the RPCPlugin and RPCContext classes.
The foundation for implementing custom RPC instrumentation and metadata manipulation plugins.
public class RPCPlugin {
// Client-side hooks
public void clientStartConnect(RPCContext context);
public void clientFinishConnect(RPCContext context);
public void clientSendRequest(RPCContext context);
public void clientReceiveResponse(RPCContext context);
// Server-side hooks
public void serverConnecting(RPCContext context);
public void serverReceiveRequest(RPCContext context);
public void serverSendResponse(RPCContext context);
}All methods have default empty implementations, allowing plugins to override only the hooks they need.
The plugin methods are called in the following order during an RPC call:
clientStartConnect → clientFinishConnect → clientSendRequest → clientReceiveResponseserverConnecting → serverReceiveRequest → serverSendResponse// Custom logging plugin
public class LoggingPlugin extends RPCPlugin {
private static final Logger logger = LoggerFactory.getLogger(LoggingPlugin.class);
@Override
public void clientSendRequest(RPCContext context) {
Message message = context.getMessage();
logger.info("Client sending request: {}", message.getName());
}
@Override
public void clientReceiveResponse(RPCContext context) {
if (context.isError()) {
logger.error("Client received error: {}", context.error().getMessage());
} else {
logger.info("Client received successful response");
}
}
@Override
public void serverReceiveRequest(RPCContext context) {
Message message = context.getMessage();
String remoteName = getRemoteAddress(context);
logger.info("Server received request: {} from {}", message.getName(), remoteName);
}
@Override
public void serverSendResponse(RPCContext context) {
if (context.isError()) {
logger.warn("Server sending error response: {}", context.error().getMessage());
} else {
logger.info("Server sending successful response");
}
}
private String getRemoteAddress(RPCContext context) {
// Extract remote address from context metadata
Map<String, ByteBuffer> meta = context.requestHandshakeMeta();
ByteBuffer addressBuffer = meta.get("remote-address");
return addressBuffer != null ? new String(addressBuffer.array()) : "unknown";
}
}
// Register plugin with requestor and responder
LoggingPlugin loggingPlugin = new LoggingPlugin();
requestor.addRPCPlugin(loggingPlugin);
responder.addRPCPlugin(loggingPlugin);Provides access to RPC call state, metadata, and message information during plugin execution.
public class RPCContext {
// Handshake management
public void setHandshakeRequest(HandshakeRequest handshakeRequest);
public HandshakeRequest getHandshakeRequest();
public void setHandshakeResponse(HandshakeResponse handshakeResponse);
public HandshakeResponse getHandshakeResponse();
// Metadata access
public Map<String, ByteBuffer> requestHandshakeMeta();
public Map<String, ByteBuffer> responseHandshakeMeta();
public Map<String, ByteBuffer> requestCallMeta();
public Map<String, ByteBuffer> responseCallMeta();
// Message information
public void setMessage(Message message);
public Message getMessage();
// Payload access
public void setRequestPayload(List<ByteBuffer> payload);
public List<ByteBuffer> getRequestPayload();
public List<ByteBuffer> getResponsePayload();
public void setResponsePayload(List<ByteBuffer> payload);
// Response handling
public Object response();
public Exception error();
public boolean isError();
}// Metadata manipulation plugin
public class MetadataPlugin extends RPCPlugin {
@Override
public void clientSendRequest(RPCContext context) {
// Add client metadata to request
Map<String, ByteBuffer> callMeta = context.requestCallMeta();
callMeta.put("client-id", ByteBuffer.wrap("client-123".getBytes()));
callMeta.put("request-time", ByteBuffer.wrap(
String.valueOf(System.currentTimeMillis()).getBytes()));
callMeta.put("client-version", ByteBuffer.wrap("1.0.0".getBytes()));
}
@Override
public void serverReceiveRequest(RPCContext context) {
// Read client metadata
Map<String, ByteBuffer> callMeta = context.requestCallMeta();
String clientId = extractString(callMeta.get("client-id"));
String requestTime = extractString(callMeta.get("request-time"));
String clientVersion = extractString(callMeta.get("client-version"));
System.out.println("Request from client: " + clientId +
", version: " + clientVersion +
", time: " + requestTime);
// Add server metadata to response
Map<String, ByteBuffer> responseMeta = context.responseCallMeta();
responseMeta.put("server-id", ByteBuffer.wrap("server-456".getBytes()));
responseMeta.put("processed-time", ByteBuffer.wrap(
String.valueOf(System.currentTimeMillis()).getBytes()));
}
private String extractString(ByteBuffer buffer) {
return buffer != null ? new String(buffer.array()) : null;
}
}public class AuthenticationPlugin extends RPCPlugin {
private final AuthenticationService authService;
public AuthenticationPlugin(AuthenticationService authService) {
this.authService = authService;
}
@Override
public void clientSendRequest(RPCContext context) {
// Add authentication token to request metadata
String token = authService.getCurrentToken();
if (token != null) {
Map<String, ByteBuffer> callMeta = context.requestCallMeta();
callMeta.put("auth-token", ByteBuffer.wrap(token.getBytes()));
}
}
@Override
public void serverReceiveRequest(RPCContext context) {
// Validate authentication token
Map<String, ByteBuffer> callMeta = context.requestCallMeta();
ByteBuffer tokenBuffer = callMeta.get("auth-token");
if (tokenBuffer == null) {
throw new AvroRuntimeException("Missing authentication token");
}
String token = new String(tokenBuffer.array());
if (!authService.validateToken(token)) {
throw new AvroRuntimeException("Invalid authentication token");
}
// Store authenticated user in context for later use
String userId = authService.getUserId(token);
callMeta.put("authenticated-user", ByteBuffer.wrap(userId.getBytes()));
}
@Override
public void serverSendResponse(RPCContext context) {
// Add server authentication info to response
Map<String, ByteBuffer> responseMeta = context.responseCallMeta();
responseMeta.put("server-auth", ByteBuffer.wrap("authenticated".getBytes()));
}
}public class RateLimitingPlugin extends RPCPlugin {
private final Map<String, RateLimiter> clientLimiters = new ConcurrentHashMap<>();
private final int requestsPerSecond;
public RateLimitingPlugin(int requestsPerSecond) {
this.requestsPerSecond = requestsPerSecond;
}
@Override
public void serverReceiveRequest(RPCContext context) {
// Extract client identifier
String clientId = extractClientId(context);
// Get or create rate limiter for client
RateLimiter limiter = clientLimiters.computeIfAbsent(clientId,
k -> RateLimiter.create(requestsPerSecond));
// Check rate limit
if (!limiter.tryAcquire()) {
throw new AvroRuntimeException("Rate limit exceeded for client: " + clientId);
}
}
private String extractClientId(RPCContext context) {
Map<String, ByteBuffer> callMeta = context.requestCallMeta();
ByteBuffer clientIdBuffer = callMeta.get("client-id");
return clientIdBuffer != null ? new String(clientIdBuffer.array()) : "unknown";
}
}public class CompressionPlugin extends RPCPlugin {
private final Compressor compressor = new GzipCompressor();
@Override
public void clientSendRequest(RPCContext context) {
// Compress request payload
List<ByteBuffer> payload = context.getRequestPayload();
if (payload != null && !payload.isEmpty()) {
List<ByteBuffer> compressed = compressor.compress(payload);
context.setRequestPayload(compressed);
// Add compression metadata
Map<String, ByteBuffer> callMeta = context.requestCallMeta();
callMeta.put("compression", ByteBuffer.wrap("gzip".getBytes()));
}
}
@Override
public void serverReceiveRequest(RPCContext context) {
// Decompress request payload if compressed
Map<String, ByteBuffer> callMeta = context.requestCallMeta();
ByteBuffer compressionBuffer = callMeta.get("compression");
if (compressionBuffer != null && "gzip".equals(new String(compressionBuffer.array()))) {
List<ByteBuffer> payload = context.getRequestPayload();
List<ByteBuffer> decompressed = compressor.decompress(payload);
context.setRequestPayload(decompressed);
}
}
@Override
public void serverSendResponse(RPCContext context) {
// Compress response payload
List<ByteBuffer> payload = context.getResponsePayload();
if (payload != null && !payload.isEmpty()) {
List<ByteBuffer> compressed = compressor.compress(payload);
context.setResponsePayload(compressed);
// Add compression metadata
Map<String, ByteBuffer> responseMeta = context.responseCallMeta();
responseMeta.put("compression", ByteBuffer.wrap("gzip".getBytes()));
}
}
@Override
public void clientReceiveResponse(RPCContext context) {
// Decompress response payload if compressed
Map<String, ByteBuffer> responseMeta = context.responseCallMeta();
ByteBuffer compressionBuffer = responseMeta.get("compression");
if (compressionBuffer != null && "gzip".equals(new String(compressionBuffer.array()))) {
List<ByteBuffer> payload = context.getResponsePayload();
List<ByteBuffer> decompressed = compressor.decompress(payload);
context.setResponsePayload(decompressed);
}
}
}public class TracingPlugin extends RPCPlugin {
private final Tracer tracer;
private final ThreadLocal<Span> currentSpan = new ThreadLocal<>();
public TracingPlugin(Tracer tracer) {
this.tracer = tracer;
}
@Override
public void clientSendRequest(RPCContext context) {
// Start client span
Message message = context.getMessage();
Span span = tracer.nextSpan()
.name("avro-rpc-client")
.tag("rpc.method", message.getName())
.tag("rpc.system", "avro")
.start();
currentSpan.set(span);
// Add trace context to request metadata
Map<String, ByteBuffer> callMeta = context.requestCallMeta();
TraceContext traceContext = span.context();
callMeta.put("trace-id", ByteBuffer.wrap(traceContext.traceId().getBytes()));
callMeta.put("span-id", ByteBuffer.wrap(traceContext.spanId().getBytes()));
}
@Override
public void clientReceiveResponse(RPCContext context) {
Span span = currentSpan.get();
if (span != null) {
if (context.isError()) {
span.tag("error", "true")
.tag("error.message", context.error().getMessage());
}
span.end();
currentSpan.remove();
}
}
@Override
public void serverReceiveRequest(RPCContext context) {
// Extract trace context from request metadata
Map<String, ByteBuffer> callMeta = context.requestCallMeta();
String traceId = extractString(callMeta.get("trace-id"));
String spanId = extractString(callMeta.get("span-id"));
// Create server span
SpanBuilder spanBuilder = tracer.nextSpan()
.name("avro-rpc-server")
.tag("rpc.method", context.getMessage().getName())
.tag("rpc.system", "avro");
if (traceId != null && spanId != null) {
// Continue existing trace
TraceContext.Builder contextBuilder = TraceContext.newBuilder()
.traceId(Long.parseUnsignedLong(traceId, 16))
.spanId(Long.parseUnsignedLong(spanId, 16));
spanBuilder.setParent(contextBuilder.build());
}
Span span = spanBuilder.start();
currentSpan.set(span);
}
@Override
public void serverSendResponse(RPCContext context) {
Span span = currentSpan.get();
if (span != null) {
if (context.isError()) {
span.tag("error", "true")
.tag("error.message", context.error().getMessage());
}
span.end();
currentSpan.remove();
}
}
private String extractString(ByteBuffer buffer) {
return buffer != null ? new String(buffer.array()) : null;
}
}// Register plugins with requestor and responder
Requestor requestor = new SpecificRequestor(MyService.class, transceiver);
Responder responder = new SpecificResponder(MyService.class, implementation);
// Add multiple plugins - they execute in registration order
requestor.addRPCPlugin(new AuthenticationPlugin(authService));
requestor.addRPCPlugin(new TracingPlugin(tracer));
requestor.addRPCPlugin(new CompressionPlugin());
responder.addRPCPlugin(new AuthenticationPlugin(authService));
responder.addRPCPlugin(new RateLimitingPlugin(100)); // 100 requests/second
responder.addRPCPlugin(new TracingPlugin(tracer));
responder.addRPCPlugin(new CompressionPlugin());Plugins execute in the order they are registered:
This ordering ensures proper nesting behavior for plugins that modify payloads or metadata.
public class ErrorHandlingPlugin extends RPCPlugin {
@Override
public void clientSendRequest(RPCContext context) {
try {
// Plugin logic that might fail
performSomeOperation();
} catch (Exception e) {
// Log error but don't break the RPC call
logger.error("Plugin error in clientSendRequest", e);
// Optionally add error info to metadata
Map<String, ByteBuffer> callMeta = context.requestCallMeta();
callMeta.put("plugin-error", ByteBuffer.wrap(e.getMessage().getBytes()));
}
}
@Override
public void serverReceiveRequest(RPCContext context) {
try {
validateRequest(context);
} catch (ValidationException e) {
// Critical error - break the RPC call
throw new AvroRuntimeException("Request validation failed: " + e.getMessage(), e);
}
}
}// Good: Lightweight plugin
public class LightweightPlugin extends RPCPlugin {
private final AtomicLong requestCounter = new AtomicLong();
@Override
public void clientSendRequest(RPCContext context) {
// Very fast operation
requestCounter.incrementAndGet();
}
}
// Bad: Heavy plugin
public class HeavyPlugin extends RPCPlugin {
@Override
public void clientSendRequest(RPCContext context) {
// Expensive operations that slow down every RPC call
database.logRequest(context); // Blocking database call
Thread.sleep(100); // Very bad!
httpClient.post("http://analytics.com/track", context); // Blocking HTTP call
}
}
// Better: Asynchronous heavy plugin
public class AsyncHeavyPlugin extends RPCPlugin {
private final ExecutorService executor = Executors.newCachedThreadPool();
@Override
public void clientSendRequest(RPCContext context) {
// Quick metadata copy
final String method = context.getMessage().getName();
final long timestamp = System.currentTimeMillis();
// Heavy operations on background thread
executor.submit(() -> {
database.logRequest(method, timestamp);
httpClient.post("http://analytics.com/track", method);
});
}
}Install with Tessl CLI
npx tessl i tessl/maven-org-apache-avro--avro-ipc