CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-org-apache-avro--avro-ipc

Avro inter-process communication components providing RPC framework with multiple transport mechanisms and protocol implementations

Pending
Overview
Eval results
Files

plugins.mddocs/

Plugin System and Context

Apache Avro IPC provides an extensible plugin system for RPC instrumentation, metadata manipulation, and cross-cutting concerns through the RPCPlugin and RPCContext classes.

Capabilities

RPC Plugin Base Class

The foundation for implementing custom RPC instrumentation and metadata manipulation plugins.

/**
 * Base class for RPC plugins (API summary; method bodies are omitted in this listing).
 *
 * <p>Per the accompanying documentation, every hook has an empty default implementation,
 * so a subclass overrides only the hooks it needs. Each hook receives the
 * {@link RPCContext} for the call in progress.
 */
public class RPCPlugin {
    // Client-side hooks, listed in the order the documentation says they are
    // invoked during an RPC call: connect start, connect finish, send, receive.
    public void clientStartConnect(RPCContext context);
    public void clientFinishConnect(RPCContext context);
    public void clientSendRequest(RPCContext context);
    public void clientReceiveResponse(RPCContext context);
    
    // Server-side hooks, listed in the order the documentation says they are
    // invoked during an RPC call: connecting, receive request, send response.
    public void serverConnecting(RPCContext context);
    public void serverReceiveRequest(RPCContext context);
    public void serverSendResponse(RPCContext context);
}

All methods have default empty implementations, allowing plugins to override only the hooks they need.

Plugin Lifecycle

The plugin methods are called in the following order during an RPC call:

  1. Client Side: clientStartConnect → clientFinishConnect → clientSendRequest → clientReceiveResponse
  2. Server Side: serverConnecting → serverReceiveRequest → serverSendResponse

Usage Examples

// Custom logging plugin
/**
 * Example plugin that logs each request and response on both the client and the
 * server side of an RPC call.
 *
 * <p>Only the request/response hooks are overridden; the connect hooks keep their
 * default behavior.
 */
public class LoggingPlugin extends RPCPlugin {
    private static final Logger logger = LoggerFactory.getLogger(LoggingPlugin.class);

    /** Logs the name of the message the client is about to send. */
    @Override
    public void clientSendRequest(RPCContext context) {
        Message message = context.getMessage();
        logger.info("Client sending request: {}", message.getName());
    }

    /** Logs whether the response received by the client was an error or a success. */
    @Override
    public void clientReceiveResponse(RPCContext context) {
        if (context.isError()) {
            logger.error("Client received error: {}", context.error().getMessage());
        } else {
            logger.info("Client received successful response");
        }
    }

    /** Logs the name of the incoming message together with the caller's address, if known. */
    @Override
    public void serverReceiveRequest(RPCContext context) {
        Message message = context.getMessage();
        String remoteName = getRemoteAddress(context);
        logger.info("Server received request: {} from {}", message.getName(), remoteName);
    }

    /** Logs whether the server is about to send an error or a successful response. */
    @Override
    public void serverSendResponse(RPCContext context) {
        if (context.isError()) {
            logger.warn("Server sending error response: {}", context.error().getMessage());
        } else {
            logger.info("Server sending successful response");
        }
    }

    /**
     * Reads the {@code "remote-address"} entry from the request handshake metadata.
     *
     * @param context the context of the call being handled
     * @return the address decoded as UTF-8 text, or {@code "unknown"} when the
     *         metadata map or the entry is absent
     */
    private String getRemoteAddress(RPCContext context) {
        Map<String, ByteBuffer> meta = context.requestHandshakeMeta();
        // NOTE(review): guard in case no handshake metadata is attached to this call.
        ByteBuffer addressBuffer = meta != null ? meta.get("remote-address") : null;
        if (addressBuffer == null) {
            return "unknown";
        }
        // Decode only the readable region with an explicit charset. array() would
        // ignore position/limit, fails for read-only or direct buffers, and
        // new String(byte[]) depends on the platform default charset.
        // duplicate() keeps the shared buffer's position untouched.
        return java.nio.charset.StandardCharsets.UTF_8
            .decode(addressBuffer.duplicate())
            .toString();
    }
}

// Register plugin with requestor and responder
// A single LoggingPlugin instance is created and the same instance is added to
// both the requestor and the responder (both are defined outside this snippet).
LoggingPlugin loggingPlugin = new LoggingPlugin();
requestor.addRPCPlugin(loggingPlugin);
responder.addRPCPlugin(loggingPlugin);

RPC Context

Provides access to RPC call state, metadata, and message information during plugin execution.

/**
 * Gives plugins access to RPC call state, metadata, and message information
 * during plugin execution (API summary; method bodies are omitted in this listing).
 */
public class RPCContext {
    // Handshake management: accessors for the handshake request/response objects
    public void setHandshakeRequest(HandshakeRequest handshakeRequest);
    public HandshakeRequest getHandshakeRequest();
    public void setHandshakeResponse(HandshakeResponse handshakeResponse);
    public HandshakeResponse getHandshakeResponse();
    
    // Metadata access: string-keyed maps of raw byte values, one per direction
    // (request/response) and per phase (handshake/call). The examples on this
    // page both read from and write into the returned maps.
    public Map<String, ByteBuffer> requestHandshakeMeta();
    public Map<String, ByteBuffer> responseHandshakeMeta();
    public Map<String, ByteBuffer> requestCallMeta();
    public Map<String, ByteBuffer> responseCallMeta();
    
    // Message information: the protocol message for the call in progress
    public void setMessage(Message message);
    public Message getMessage();
    
    // Payload access: request and response bodies as lists of byte buffers
    public void setRequestPayload(List<ByteBuffer> payload);
    public List<ByteBuffer> getRequestPayload();
    public List<ByteBuffer> getResponsePayload();
    public void setResponsePayload(List<ByteBuffer> payload);
    
    // Response handling: the examples on this page call error() only after
    // isError() has returned true.
    public Object response();
    public Exception error();
    public boolean isError();
}

Usage Examples

// Metadata manipulation plugin
/**
 * Example plugin that attaches per-call metadata on the client and reads it back
 * on the server, then attaches server metadata to the response.
 *
 * <p>All values are exchanged as UTF-8 text so both peers agree on the encoding
 * regardless of either JVM's platform default charset.
 */
public class MetadataPlugin extends RPCPlugin {
    /** Charset used for every metadata value written or read by this plugin. */
    private static final java.nio.charset.Charset UTF_8 =
        java.nio.charset.StandardCharsets.UTF_8;

    /** Adds the client id, request timestamp, and client version to the request call metadata. */
    @Override
    public void clientSendRequest(RPCContext context) {
        Map<String, ByteBuffer> callMeta = context.requestCallMeta();
        callMeta.put("client-id", encode("client-123"));
        callMeta.put("request-time", encode(String.valueOf(System.currentTimeMillis())));
        callMeta.put("client-version", encode("1.0.0"));
    }

    /** Prints the client metadata of the incoming request and adds server metadata to the response. */
    @Override
    public void serverReceiveRequest(RPCContext context) {
        // Read client metadata; absent entries decode to null.
        Map<String, ByteBuffer> callMeta = context.requestCallMeta();
        String clientId = extractString(callMeta.get("client-id"));
        String requestTime = extractString(callMeta.get("request-time"));
        String clientVersion = extractString(callMeta.get("client-version"));

        System.out.println("Request from client: " + clientId +
                          ", version: " + clientVersion +
                          ", time: " + requestTime);

        // Add server metadata to response
        Map<String, ByteBuffer> responseMeta = context.responseCallMeta();
        responseMeta.put("server-id", encode("server-456"));
        responseMeta.put("processed-time", encode(String.valueOf(System.currentTimeMillis())));
    }

    /** Wraps the UTF-8 bytes of {@code value} in a new buffer. */
    private static ByteBuffer encode(String value) {
        return ByteBuffer.wrap(value.getBytes(UTF_8));
    }

    /**
     * Decodes the readable bytes of {@code buffer} as UTF-8.
     *
     * @param buffer the metadata value, possibly {@code null}
     * @return the decoded text, or {@code null} when {@code buffer} is {@code null}
     */
    private String extractString(ByteBuffer buffer) {
        if (buffer == null) {
            return null;
        }
        // array() would ignore position/limit and fails for read-only or direct
        // buffers. Decoding a duplicate leaves the caller's position untouched.
        return UTF_8.decode(buffer.duplicate()).toString();
    }
}

Advanced Plugin Examples

Authentication Plugin

/**
 * Example plugin that sends an authentication token with every client request
 * and validates it on the server before the request is processed.
 *
 * <p>Tokens and user ids travel as UTF-8 text in the call metadata.
 */
public class AuthenticationPlugin extends RPCPlugin {
    /** Charset used for every metadata value written or read by this plugin. */
    private static final java.nio.charset.Charset UTF_8 =
        java.nio.charset.StandardCharsets.UTF_8;

    private final AuthenticationService authService;

    /**
     * @param authService service used to obtain, validate, and resolve tokens
     */
    public AuthenticationPlugin(AuthenticationService authService) {
        this.authService = authService;
    }

    /** Adds the current token, when one is available, to the request call metadata. */
    @Override
    public void clientSendRequest(RPCContext context) {
        String token = authService.getCurrentToken();
        if (token != null) {
            Map<String, ByteBuffer> callMeta = context.requestCallMeta();
            callMeta.put("auth-token", ByteBuffer.wrap(token.getBytes(UTF_8)));
        }
    }

    /**
     * Validates the token carried by the request and records the authenticated user.
     *
     * @throws AvroRuntimeException if the token is missing or fails validation
     */
    @Override
    public void serverReceiveRequest(RPCContext context) {
        Map<String, ByteBuffer> callMeta = context.requestCallMeta();
        ByteBuffer tokenBuffer = callMeta.get("auth-token");

        if (tokenBuffer == null) {
            throw new AvroRuntimeException("Missing authentication token");
        }

        // Decode exactly the readable bytes with an explicit charset. Going through
        // array() could include bytes outside position/limit (corrupting the token)
        // or throw for read-only/direct buffers.
        String token = UTF_8.decode(tokenBuffer.duplicate()).toString();
        if (!authService.validateToken(token)) {
            throw new AvroRuntimeException("Invalid authentication token");
        }

        // Store authenticated user in context for later use. This overwrites any
        // "authenticated-user" value the client may have sent itself.
        String userId = authService.getUserId(token);
        callMeta.put("authenticated-user", ByteBuffer.wrap(userId.getBytes(UTF_8)));
    }

    /** Marks the response as coming from an authenticated exchange. */
    @Override
    public void serverSendResponse(RPCContext context) {
        Map<String, ByteBuffer> responseMeta = context.responseCallMeta();
        responseMeta.put("server-auth", ByteBuffer.wrap("authenticated".getBytes(UTF_8)));
    }
}

Rate Limiting Plugin

/**
 * Example server-side plugin that rejects requests once a client exceeds a fixed
 * number of requests per second.
 *
 * <p>Clients are told apart by the {@code "client-id"} call-metadata entry;
 * requests without one share the {@code "unknown"} limiter.
 */
public class RateLimitingPlugin extends RPCPlugin {
    // NOTE(review): this map is keyed by a client-supplied value and is never
    // pruned, so many distinct ids grow it without bound. Consider an evicting
    // cache, or key on an authenticated identity instead.
    private final Map<String, RateLimiter> clientLimiters = new ConcurrentHashMap<>();
    private final int requestsPerSecond;

    /**
     * @param requestsPerSecond rate passed to each per-client limiter
     */
    public RateLimitingPlugin(int requestsPerSecond) {
        this.requestsPerSecond = requestsPerSecond;
    }

    /**
     * Acquires a permit from the caller's limiter, creating the limiter on first use.
     *
     * @throws AvroRuntimeException if no permit is immediately available
     */
    @Override
    public void serverReceiveRequest(RPCContext context) {
        String clientId = extractClientId(context);

        // Get or create rate limiter for client
        RateLimiter limiter = clientLimiters.computeIfAbsent(clientId,
            k -> RateLimiter.create(requestsPerSecond));

        if (!limiter.tryAcquire()) {
            throw new AvroRuntimeException("Rate limit exceeded for client: " + clientId);
        }
    }

    /**
     * Reads the {@code "client-id"} entry from the request call metadata.
     *
     * @return the id decoded as UTF-8 text, or {@code "unknown"} when absent
     */
    private String extractClientId(RPCContext context) {
        Map<String, ByteBuffer> callMeta = context.requestCallMeta();
        ByteBuffer clientIdBuffer = callMeta.get("client-id");
        if (clientIdBuffer == null) {
            return "unknown";
        }
        // Decode only the readable region with an explicit charset. array() would
        // ignore position/limit and fails for read-only or direct buffers.
        return java.nio.charset.StandardCharsets.UTF_8
            .decode(clientIdBuffer.duplicate())
            .toString();
    }
}

Request/Response Compression Plugin

/**
 * Example plugin that gzip-compresses request and response payloads and marks
 * compressed messages with a {@code "compression"} call-metadata entry so the
 * receiving side knows to decompress them.
 */
public class CompressionPlugin extends RPCPlugin {
    /** Charset used for the compression marker on both sides. */
    private static final java.nio.charset.Charset UTF_8 =
        java.nio.charset.StandardCharsets.UTF_8;

    private final Compressor compressor = new GzipCompressor();

    /** Compresses a non-empty request payload and marks the request as gzip-compressed. */
    @Override
    public void clientSendRequest(RPCContext context) {
        List<ByteBuffer> payload = context.getRequestPayload();
        if (payload != null && !payload.isEmpty()) {
            List<ByteBuffer> compressed = compressor.compress(payload);
            context.setRequestPayload(compressed);

            // Add compression metadata
            Map<String, ByteBuffer> callMeta = context.requestCallMeta();
            callMeta.put("compression", gzipMarker());
        }
    }

    /** Decompresses the request payload when the request carries the gzip marker. */
    @Override
    public void serverReceiveRequest(RPCContext context) {
        Map<String, ByteBuffer> callMeta = context.requestCallMeta();
        if (isGzip(callMeta.get("compression"))) {
            List<ByteBuffer> payload = context.getRequestPayload();
            List<ByteBuffer> decompressed = compressor.decompress(payload);
            context.setRequestPayload(decompressed);
        }
    }

    /** Compresses a non-empty response payload and marks the response as gzip-compressed. */
    @Override
    public void serverSendResponse(RPCContext context) {
        List<ByteBuffer> payload = context.getResponsePayload();
        if (payload != null && !payload.isEmpty()) {
            List<ByteBuffer> compressed = compressor.compress(payload);
            context.setResponsePayload(compressed);

            // Add compression metadata
            Map<String, ByteBuffer> responseMeta = context.responseCallMeta();
            responseMeta.put("compression", gzipMarker());
        }
    }

    /** Decompresses the response payload when the response carries the gzip marker. */
    @Override
    public void clientReceiveResponse(RPCContext context) {
        Map<String, ByteBuffer> responseMeta = context.responseCallMeta();
        if (isGzip(responseMeta.get("compression"))) {
            List<ByteBuffer> payload = context.getResponsePayload();
            List<ByteBuffer> decompressed = compressor.decompress(payload);
            context.setResponsePayload(decompressed);
        }
    }

    /** Creates a fresh buffer holding the {@code "gzip"} marker. */
    private static ByteBuffer gzipMarker() {
        return ByteBuffer.wrap("gzip".getBytes(UTF_8));
    }

    /**
     * Tells whether a {@code "compression"} metadata value denotes gzip.
     *
     * @param marker the metadata value, possibly {@code null}
     * @return {@code true} only when the readable bytes decode to {@code "gzip"}
     */
    private static boolean isGzip(ByteBuffer marker) {
        if (marker == null) {
            return false;
        }
        // Decode only the readable region. array() would ignore position/limit
        // and fails for read-only or direct buffers.
        return "gzip".equals(UTF_8.decode(marker.duplicate()).toString());
    }
}

Tracing Plugin

/**
 * Example plugin that opens a span around every RPC call and propagates the
 * trace and span ids to the server through the request call metadata.
 *
 * <p>Ids travel as UTF-8 text; the server parses them as unsigned hexadecimal.
 */
public class TracingPlugin extends RPCPlugin {
    /** Charset used for every metadata value written or read by this plugin. */
    private static final java.nio.charset.Charset UTF_8 =
        java.nio.charset.StandardCharsets.UTF_8;

    private final Tracer tracer;

    // NOTE(review): assumes the start hook and the matching finish hook of one
    // call run on the same thread; confirm for asynchronous transports.
    private final ThreadLocal<Span> currentSpan = new ThreadLocal<>();

    /**
     * @param tracer tracer used to create client and server spans
     */
    public TracingPlugin(Tracer tracer) {
        this.tracer = tracer;
    }

    /** Starts the client span and writes its trace/span ids into the request call metadata. */
    @Override
    public void clientSendRequest(RPCContext context) {
        Message message = context.getMessage();
        Span span = tracer.nextSpan()
            .name("avro-rpc-client")
            .tag("rpc.method", message.getName())
            .tag("rpc.system", "avro")
            .start();

        currentSpan.set(span);

        // Add trace context to request metadata
        Map<String, ByteBuffer> callMeta = context.requestCallMeta();
        TraceContext traceContext = span.context();
        callMeta.put("trace-id", ByteBuffer.wrap(traceContext.traceId().getBytes(UTF_8)));
        callMeta.put("span-id", ByteBuffer.wrap(traceContext.spanId().getBytes(UTF_8)));
    }

    /** Ends the client span, tagging it with the error when the call failed. */
    @Override
    public void clientReceiveResponse(RPCContext context) {
        finishCurrentSpan(context);
    }

    /**
     * Starts the server span, continuing the caller's trace when the request
     * carries well-formed trace and span ids.
     */
    @Override
    public void serverReceiveRequest(RPCContext context) {
        Map<String, ByteBuffer> callMeta = context.requestCallMeta();
        String traceId = extractString(callMeta.get("trace-id"));
        String spanId = extractString(callMeta.get("span-id"));

        SpanBuilder spanBuilder = tracer.nextSpan()
            .name("avro-rpc-server")
            .tag("rpc.method", context.getMessage().getName())
            .tag("rpc.system", "avro");

        if (traceId != null && spanId != null) {
            try {
                // Continue existing trace
                TraceContext.Builder contextBuilder = TraceContext.newBuilder()
                    .traceId(Long.parseUnsignedLong(traceId, 16))
                    .spanId(Long.parseUnsignedLong(spanId, 16));
                spanBuilder.setParent(contextBuilder.build());
            } catch (NumberFormatException ignored) {
                // The ids come from the remote peer. Malformed values must not
                // abort the RPC, so no parent is set and a new trace is started.
            }
        }

        Span span = spanBuilder.start();
        currentSpan.set(span);
    }

    /** Ends the server span, tagging it with the error when the call failed. */
    @Override
    public void serverSendResponse(RPCContext context) {
        finishCurrentSpan(context);
    }

    /**
     * Ends the span stored for the current thread, if any, and always clears the
     * thread-local so a failure while tagging cannot leave a stale span behind
     * on a reused thread.
     */
    private void finishCurrentSpan(RPCContext context) {
        Span span = currentSpan.get();
        if (span == null) {
            return;
        }
        try {
            if (context.isError()) {
                span.tag("error", "true")
                    .tag("error.message", context.error().getMessage());
            }
        } finally {
            currentSpan.remove();
            span.end();
        }
    }

    /**
     * Decodes the readable bytes of {@code buffer} as UTF-8.
     *
     * @param buffer the metadata value, possibly {@code null}
     * @return the decoded text, or {@code null} when {@code buffer} is {@code null}
     */
    private String extractString(ByteBuffer buffer) {
        if (buffer == null) {
            return null;
        }
        // array() would ignore position/limit and fails for read-only or direct
        // buffers. Decoding a duplicate leaves the caller's position untouched.
        return UTF_8.decode(buffer.duplicate()).toString();
    }
}

Plugin Registration and Management

Plugin Registration

// Register plugins with requestor and responder
Requestor requestor = new SpecificRequestor(MyService.class, transceiver);
Responder responder = new SpecificResponder(MyService.class, implementation);

// Client-side plugins, added to the requestor in array order
RPCPlugin[] clientPlugins = {
    new AuthenticationPlugin(authService),
    new TracingPlugin(tracer),
    new CompressionPlugin(),
};
for (RPCPlugin clientPlugin : clientPlugins) {
    requestor.addRPCPlugin(clientPlugin);
}

// Server-side plugins, added to the responder in array order.
// Each side gets its own plugin instances.
RPCPlugin[] serverPlugins = {
    new AuthenticationPlugin(authService),
    new RateLimitingPlugin(100), // 100 requests/second
    new TracingPlugin(tracer),
    new CompressionPlugin(),
};
for (RPCPlugin serverPlugin : serverPlugins) {
    responder.addRPCPlugin(serverPlugin);
}

Plugin Ordering

Plugins execute in the order they are registered:

  • Client Send: First registered plugin executes first
  • Client Receive: Last registered plugin executes first (reverse order)
  • Server Receive: First registered plugin executes first
  • Server Send: Last registered plugin executes first (reverse order)

This ordering ensures proper nesting behavior for plugins that modify payloads or metadata.

Error Handling in Plugins

/**
 * Example plugin contrasting two error-handling policies: a best-effort client
 * hook whose failures are logged and reported in metadata without failing the
 * call, and a server hook whose validation failures abort the call.
 */
public class ErrorHandlingPlugin extends RPCPlugin {
    private static final Logger logger = LoggerFactory.getLogger(ErrorHandlingPlugin.class);

    /**
     * Runs the plugin operation; on failure logs the error and records it under
     * the {@code "plugin-error"} call-metadata key instead of propagating it.
     */
    @Override
    public void clientSendRequest(RPCContext context) {
        try {
            // Plugin logic that might fail
            performSomeOperation();
        } catch (Exception e) {
            // Log error but don't break the RPC call
            logger.error("Plugin error in clientSendRequest", e);

            // getMessage() may be null (e.g. a bare NullPointerException).
            // Dereferencing it here would throw from inside the handler and
            // break the very call this block is meant to protect.
            String detail = e.getMessage() != null ? e.getMessage() : e.getClass().getName();

            // Optionally add error info to metadata
            Map<String, ByteBuffer> callMeta = context.requestCallMeta();
            callMeta.put("plugin-error",
                ByteBuffer.wrap(detail.getBytes(java.nio.charset.StandardCharsets.UTF_8)));
        }
    }

    /**
     * Validates the incoming request.
     *
     * @throws AvroRuntimeException wrapping the validation failure, which aborts the call
     */
    @Override
    public void serverReceiveRequest(RPCContext context) {
        try {
            validateRequest(context);
        } catch (ValidationException e) {
            // Critical error - break the RPC call, keeping the original cause
            throw new AvroRuntimeException("Request validation failed: " + e.getMessage(), e);
        }
    }
}

Performance Considerations

Plugin Performance Impact

  • Plugins add overhead to every RPC call
  • Keep plugin logic lightweight and non-blocking
  • Avoid expensive operations in plugin methods
  • Use asynchronous processing for heavy operations

Optimization Tips

// Good: Lightweight plugin
/** Minimal example plugin: bumps an atomic counter for each outgoing client request. */
public class LightweightPlugin extends RPCPlugin {
    /** Number of client requests seen so far. */
    private final AtomicLong sentRequests = new AtomicLong();

    /** Increments the counter; the hook does no other work. */
    @Override
    public void clientSendRequest(RPCContext context) {
        sentRequests.getAndIncrement();
    }
}

// Bad: Heavy plugin
/**
 * Counter-example: performs blocking work inside a plugin hook, which delays
 * every RPC call made through the requestor. Shown only to illustrate what to avoid.
 */
public class HeavyPlugin extends RPCPlugin {
    @Override
    public void clientSendRequest(RPCContext context) {
        // Expensive operations that slow down every RPC call
        database.logRequest(context); // Blocking database call
        try {
            Thread.sleep(100); // Very bad!
        } catch (InterruptedException e) {
            // The overridden hook declares no checked exceptions, so the
            // interruption cannot propagate. Restore the flag for callers.
            Thread.currentThread().interrupt();
        }
        httpClient.post("http://analytics.com/track", context); // Blocking HTTP call
    }
}

// Better: Asynchronous heavy plugin
/**
 * Example plugin that keeps the RPC hook fast by copying the few values it needs
 * and handing the expensive logging/analytics work to background threads.
 */
public class AsyncHeavyPlugin extends RPCPlugin {
    /** Upper bound on background threads, so a burst of calls cannot spawn one thread per request. */
    private static final int WORKER_THREADS = 4;

    // Daemon threads: this plugin never shuts the pool down, so its workers must
    // not keep the JVM alive. NOTE(review): the work queue is unbounded; if the
    // backends stall, consider a bounded queue with a drop policy.
    private final ExecutorService executor = Executors.newFixedThreadPool(WORKER_THREADS, task -> {
        Thread worker = new Thread(task, "async-heavy-plugin");
        worker.setDaemon(true);
        return worker;
    });

    /** Captures the method name and timestamp, then schedules the heavy work off the calling thread. */
    @Override
    public void clientSendRequest(RPCContext context) {
        // Quick metadata copy. The context itself is not handed to the background
        // task, only these immutable values.
        final String method = context.getMessage().getName();
        final long timestamp = System.currentTimeMillis();

        // execute() rather than submit(): with submit() a failure is captured in a
        // Future nobody reads and is silently lost. With execute() it reaches the
        // thread's uncaught-exception handler.
        executor.execute(() -> {
            database.logRequest(method, timestamp);
            httpClient.post("http://analytics.com/track", method);
        });
    }
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-avro--avro-ipc

docs

async.md

core-rpc.md

index.md

plugins.md

protocols.md

stats.md

transports.md

tile.json