0
# Plugin System and Context
1
2
Apache Avro IPC provides an extensible plugin system for RPC instrumentation, metadata manipulation, and cross-cutting concerns through the `RPCPlugin` and `RPCContext` classes.
3
4
## Capabilities
5
6
### RPC Plugin Base Class
7
8
The foundation for implementing custom RPC instrumentation and metadata manipulation plugins.
9
10
```java { .api }
11
/**
 * Base class for RPC plugins. This listing shows signatures only; every hook
 * has an empty default implementation, so a subclass overrides just the hooks
 * it needs. Each hook receives the RPCContext for the call in progress.
 */
public class RPCPlugin {
12
// Client-side hooks, listed in the order they are called during an RPC call
13
public void clientStartConnect(RPCContext context);
14
public void clientFinishConnect(RPCContext context);
15
public void clientSendRequest(RPCContext context);
16
public void clientReceiveResponse(RPCContext context);
17
18
// Server-side hooks, listed in the order they are called during an RPC call
19
public void serverConnecting(RPCContext context);
20
public void serverReceiveRequest(RPCContext context);
21
public void serverSendResponse(RPCContext context);
22
}
23
```
24
25
All methods have default empty implementations, allowing plugins to override only the hooks they need.
26
27
#### Plugin Lifecycle
28
29
The plugin methods are called in the following order during an RPC call:
30
31
1. **Client Side**: `clientStartConnect` → `clientFinishConnect` → `clientSendRequest` → `clientReceiveResponse`
32
2. **Server Side**: `serverConnecting` → `serverReceiveRequest` → `serverSendResponse`
33
34
#### Usage Examples
35
36
```java
37
// Custom logging plugin
38
public class LoggingPlugin extends RPCPlugin {
39
private static final Logger logger = LoggerFactory.getLogger(LoggingPlugin.class);
40
41
@Override
42
public void clientSendRequest(RPCContext context) {
43
Message message = context.getMessage();
44
logger.info("Client sending request: {}", message.getName());
45
}
46
47
@Override
48
public void clientReceiveResponse(RPCContext context) {
49
if (context.isError()) {
50
logger.error("Client received error: {}", context.error().getMessage());
51
} else {
52
logger.info("Client received successful response");
53
}
54
}
55
56
@Override
57
public void serverReceiveRequest(RPCContext context) {
58
Message message = context.getMessage();
59
String remoteName = getRemoteAddress(context);
60
logger.info("Server received request: {} from {}", message.getName(), remoteName);
61
}
62
63
@Override
64
public void serverSendResponse(RPCContext context) {
65
if (context.isError()) {
66
logger.warn("Server sending error response: {}", context.error().getMessage());
67
} else {
68
logger.info("Server sending successful response");
69
}
70
}
71
72
private String getRemoteAddress(RPCContext context) {
73
// Extract remote address from context metadata
74
Map<String, ByteBuffer> meta = context.requestHandshakeMeta();
75
ByteBuffer addressBuffer = meta.get("remote-address");
76
return addressBuffer != null ? new String(addressBuffer.array()) : "unknown";
77
}
78
}
79
80
// Register one plugin instance on both sides: the requestor (client) and the
// responder (server). `requestor` and `responder` are assumed to exist already.
81
LoggingPlugin loggingPlugin = new LoggingPlugin();
82
requestor.addRPCPlugin(loggingPlugin);
83
responder.addRPCPlugin(loggingPlugin);
84
```
85
86
### RPC Context
87
88
Provides access to RPC call state, metadata, and message information during plugin execution.
89
90
```java { .api }
91
/**
 * Per-call state handed to every RPCPlugin hook: handshake objects, metadata
 * maps, the message being invoked, raw payloads, and the call outcome.
 * This listing shows signatures only.
 */
public class RPCContext {
92
// Handshake management
93
public void setHandshakeRequest(HandshakeRequest handshakeRequest);
94
public HandshakeRequest getHandshakeRequest();
95
public void setHandshakeResponse(HandshakeResponse handshakeResponse);
96
public HandshakeResponse getHandshakeResponse();
97
98
// Metadata access: handshake-level and per-call maps for each direction.
// The plugin examples in this document add entries by calling put() on the
// returned map.
99
public Map<String, ByteBuffer> requestHandshakeMeta();
100
public Map<String, ByteBuffer> responseHandshakeMeta();
101
public Map<String, ByteBuffer> requestCallMeta();
102
public Map<String, ByteBuffer> responseCallMeta();
103
104
// Message information
105
public void setMessage(Message message);
106
public Message getMessage();
107
108
// Payload access
109
public void setRequestPayload(List<ByteBuffer> payload);
110
public List<ByteBuffer> getRequestPayload();
111
public List<ByteBuffer> getResponsePayload();
112
public void setResponsePayload(List<ByteBuffer> payload);
113
114
// Response handling: the examples in this document call error() only after
// isError() has returned true.
115
public Object response();
116
public Exception error();
117
public boolean isError();
118
}
119
```
120
121
#### Usage Examples
122
123
```java
124
// Metadata manipulation plugin
125
public class MetadataPlugin extends RPCPlugin {
126
@Override
127
public void clientSendRequest(RPCContext context) {
128
// Add client metadata to request
129
Map<String, ByteBuffer> callMeta = context.requestCallMeta();
130
callMeta.put("client-id", ByteBuffer.wrap("client-123".getBytes()));
131
callMeta.put("request-time", ByteBuffer.wrap(
132
String.valueOf(System.currentTimeMillis()).getBytes()));
133
callMeta.put("client-version", ByteBuffer.wrap("1.0.0".getBytes()));
134
}
135
136
@Override
137
public void serverReceiveRequest(RPCContext context) {
138
// Read client metadata
139
Map<String, ByteBuffer> callMeta = context.requestCallMeta();
140
String clientId = extractString(callMeta.get("client-id"));
141
String requestTime = extractString(callMeta.get("request-time"));
142
String clientVersion = extractString(callMeta.get("client-version"));
143
144
System.out.println("Request from client: " + clientId +
145
", version: " + clientVersion +
146
", time: " + requestTime);
147
148
// Add server metadata to response
149
Map<String, ByteBuffer> responseMeta = context.responseCallMeta();
150
responseMeta.put("server-id", ByteBuffer.wrap("server-456".getBytes()));
151
responseMeta.put("processed-time", ByteBuffer.wrap(
152
String.valueOf(System.currentTimeMillis()).getBytes()));
153
}
154
155
private String extractString(ByteBuffer buffer) {
156
return buffer != null ? new String(buffer.array()) : null;
157
}
158
}
159
```
160
161
## Advanced Plugin Examples
162
163
### Authentication Plugin
164
165
```java
166
public class AuthenticationPlugin extends RPCPlugin {
167
private final AuthenticationService authService;
168
169
public AuthenticationPlugin(AuthenticationService authService) {
170
this.authService = authService;
171
}
172
173
@Override
174
public void clientSendRequest(RPCContext context) {
175
// Add authentication token to request metadata
176
String token = authService.getCurrentToken();
177
if (token != null) {
178
Map<String, ByteBuffer> callMeta = context.requestCallMeta();
179
callMeta.put("auth-token", ByteBuffer.wrap(token.getBytes()));
180
}
181
}
182
183
@Override
184
public void serverReceiveRequest(RPCContext context) {
185
// Validate authentication token
186
Map<String, ByteBuffer> callMeta = context.requestCallMeta();
187
ByteBuffer tokenBuffer = callMeta.get("auth-token");
188
189
if (tokenBuffer == null) {
190
throw new AvroRuntimeException("Missing authentication token");
191
}
192
193
String token = new String(tokenBuffer.array());
194
if (!authService.validateToken(token)) {
195
throw new AvroRuntimeException("Invalid authentication token");
196
}
197
198
// Store authenticated user in context for later use
199
String userId = authService.getUserId(token);
200
callMeta.put("authenticated-user", ByteBuffer.wrap(userId.getBytes()));
201
}
202
203
@Override
204
public void serverSendResponse(RPCContext context) {
205
// Add server authentication info to response
206
Map<String, ByteBuffer> responseMeta = context.responseCallMeta();
207
responseMeta.put("server-auth", ByteBuffer.wrap("authenticated".getBytes()));
208
}
209
}
210
```
211
212
### Rate Limiting Plugin
213
214
```java
215
public class RateLimitingPlugin extends RPCPlugin {
216
private final Map<String, RateLimiter> clientLimiters = new ConcurrentHashMap<>();
217
private final int requestsPerSecond;
218
219
public RateLimitingPlugin(int requestsPerSecond) {
220
this.requestsPerSecond = requestsPerSecond;
221
}
222
223
@Override
224
public void serverReceiveRequest(RPCContext context) {
225
// Extract client identifier
226
String clientId = extractClientId(context);
227
228
// Get or create rate limiter for client
229
RateLimiter limiter = clientLimiters.computeIfAbsent(clientId,
230
k -> RateLimiter.create(requestsPerSecond));
231
232
// Check rate limit
233
if (!limiter.tryAcquire()) {
234
throw new AvroRuntimeException("Rate limit exceeded for client: " + clientId);
235
}
236
}
237
238
private String extractClientId(RPCContext context) {
239
Map<String, ByteBuffer> callMeta = context.requestCallMeta();
240
ByteBuffer clientIdBuffer = callMeta.get("client-id");
241
return clientIdBuffer != null ? new String(clientIdBuffer.array()) : "unknown";
242
}
243
}
244
```
245
246
### Request/Response Compression Plugin
247
248
```java
249
public class CompressionPlugin extends RPCPlugin {
250
private final Compressor compressor = new GzipCompressor();
251
252
@Override
253
public void clientSendRequest(RPCContext context) {
254
// Compress request payload
255
List<ByteBuffer> payload = context.getRequestPayload();
256
if (payload != null && !payload.isEmpty()) {
257
List<ByteBuffer> compressed = compressor.compress(payload);
258
context.setRequestPayload(compressed);
259
260
// Add compression metadata
261
Map<String, ByteBuffer> callMeta = context.requestCallMeta();
262
callMeta.put("compression", ByteBuffer.wrap("gzip".getBytes()));
263
}
264
}
265
266
@Override
267
public void serverReceiveRequest(RPCContext context) {
268
// Decompress request payload if compressed
269
Map<String, ByteBuffer> callMeta = context.requestCallMeta();
270
ByteBuffer compressionBuffer = callMeta.get("compression");
271
272
if (compressionBuffer != null && "gzip".equals(new String(compressionBuffer.array()))) {
273
List<ByteBuffer> payload = context.getRequestPayload();
274
List<ByteBuffer> decompressed = compressor.decompress(payload);
275
context.setRequestPayload(decompressed);
276
}
277
}
278
279
@Override
280
public void serverSendResponse(RPCContext context) {
281
// Compress response payload
282
List<ByteBuffer> payload = context.getResponsePayload();
283
if (payload != null && !payload.isEmpty()) {
284
List<ByteBuffer> compressed = compressor.compress(payload);
285
context.setResponsePayload(compressed);
286
287
// Add compression metadata
288
Map<String, ByteBuffer> responseMeta = context.responseCallMeta();
289
responseMeta.put("compression", ByteBuffer.wrap("gzip".getBytes()));
290
}
291
}
292
293
@Override
294
public void clientReceiveResponse(RPCContext context) {
295
// Decompress response payload if compressed
296
Map<String, ByteBuffer> responseMeta = context.responseCallMeta();
297
ByteBuffer compressionBuffer = responseMeta.get("compression");
298
299
if (compressionBuffer != null && "gzip".equals(new String(compressionBuffer.array()))) {
300
List<ByteBuffer> payload = context.getResponsePayload();
301
List<ByteBuffer> decompressed = compressor.decompress(payload);
302
context.setResponsePayload(decompressed);
303
}
304
}
305
}
306
```
307
308
### Tracing Plugin
309
310
```java
311
public class TracingPlugin extends RPCPlugin {
312
private final Tracer tracer;
313
private final ThreadLocal<Span> currentSpan = new ThreadLocal<>();
314
315
public TracingPlugin(Tracer tracer) {
316
this.tracer = tracer;
317
}
318
319
@Override
320
public void clientSendRequest(RPCContext context) {
321
// Start client span
322
Message message = context.getMessage();
323
Span span = tracer.nextSpan()
324
.name("avro-rpc-client")
325
.tag("rpc.method", message.getName())
326
.tag("rpc.system", "avro")
327
.start();
328
329
currentSpan.set(span);
330
331
// Add trace context to request metadata
332
Map<String, ByteBuffer> callMeta = context.requestCallMeta();
333
TraceContext traceContext = span.context();
334
callMeta.put("trace-id", ByteBuffer.wrap(traceContext.traceId().getBytes()));
335
callMeta.put("span-id", ByteBuffer.wrap(traceContext.spanId().getBytes()));
336
}
337
338
@Override
339
public void clientReceiveResponse(RPCContext context) {
340
Span span = currentSpan.get();
341
if (span != null) {
342
if (context.isError()) {
343
span.tag("error", "true")
344
.tag("error.message", context.error().getMessage());
345
}
346
span.end();
347
currentSpan.remove();
348
}
349
}
350
351
@Override
352
public void serverReceiveRequest(RPCContext context) {
353
// Extract trace context from request metadata
354
Map<String, ByteBuffer> callMeta = context.requestCallMeta();
355
String traceId = extractString(callMeta.get("trace-id"));
356
String spanId = extractString(callMeta.get("span-id"));
357
358
// Create server span
359
SpanBuilder spanBuilder = tracer.nextSpan()
360
.name("avro-rpc-server")
361
.tag("rpc.method", context.getMessage().getName())
362
.tag("rpc.system", "avro");
363
364
if (traceId != null && spanId != null) {
365
// Continue existing trace
366
TraceContext.Builder contextBuilder = TraceContext.newBuilder()
367
.traceId(Long.parseUnsignedLong(traceId, 16))
368
.spanId(Long.parseUnsignedLong(spanId, 16));
369
spanBuilder.setParent(contextBuilder.build());
370
}
371
372
Span span = spanBuilder.start();
373
currentSpan.set(span);
374
}
375
376
@Override
377
public void serverSendResponse(RPCContext context) {
378
Span span = currentSpan.get();
379
if (span != null) {
380
if (context.isError()) {
381
span.tag("error", "true")
382
.tag("error.message", context.error().getMessage());
383
}
384
span.end();
385
currentSpan.remove();
386
}
387
}
388
389
private String extractString(ByteBuffer buffer) {
390
return buffer != null ? new String(buffer.array()) : null;
391
}
392
}
393
```
394
395
## Plugin Registration and Management
396
397
### Plugin Registration
398
399
```java
400
// Register plugins with requestor and responder.
// `transceiver`, `implementation`, `authService` and `tracer` are assumed to
// be created elsewhere; they are not defined in this snippet.
401
Requestor requestor = new SpecificRequestor(MyService.class, transceiver);
402
Responder responder = new SpecificResponder(MyService.class, implementation);
403
404
// Add multiple plugins - they execute in registration order
405
requestor.addRPCPlugin(new AuthenticationPlugin(authService));
406
requestor.addRPCPlugin(new TracingPlugin(tracer));
407
requestor.addRPCPlugin(new CompressionPlugin());
408
409
// Server side: each plugin type gets its own instance, and authentication
// is registered ahead of rate limiting.
responder.addRPCPlugin(new AuthenticationPlugin(authService));
410
responder.addRPCPlugin(new RateLimitingPlugin(100)); // 100 requests/second
411
responder.addRPCPlugin(new TracingPlugin(tracer));
412
responder.addRPCPlugin(new CompressionPlugin());
413
```
414
415
### Plugin Ordering
416
417
Plugins execute in the order they are registered:
418
419
- **Client Send**: First registered plugin executes first
420
- **Client Receive**: Last registered plugin executes first (reverse order)
421
- **Server Receive**: First registered plugin executes first
422
- **Server Send**: Last registered plugin executes first (reverse order)
423
424
This ordering ensures proper nesting behavior for plugins that modify payloads or metadata.
425
426
## Error Handling in Plugins
427
428
```java
429
public class ErrorHandlingPlugin extends RPCPlugin {
430
@Override
431
public void clientSendRequest(RPCContext context) {
432
try {
433
// Plugin logic that might fail
434
performSomeOperation();
435
} catch (Exception e) {
436
// Log error but don't break the RPC call
437
logger.error("Plugin error in clientSendRequest", e);
438
// Optionally add error info to metadata
439
Map<String, ByteBuffer> callMeta = context.requestCallMeta();
440
callMeta.put("plugin-error", ByteBuffer.wrap(e.getMessage().getBytes()));
441
}
442
}
443
444
@Override
445
public void serverReceiveRequest(RPCContext context) {
446
try {
447
validateRequest(context);
448
} catch (ValidationException e) {
449
// Critical error - break the RPC call
450
throw new AvroRuntimeException("Request validation failed: " + e.getMessage(), e);
451
}
452
}
453
}
454
```
455
456
## Performance Considerations
457
458
### Plugin Performance Impact
459
460
- Plugins add overhead to every RPC call
461
- Keep plugin logic lightweight and non-blocking
462
- Avoid expensive operations in plugin methods
463
- Use asynchronous processing for heavy operations
464
465
### Optimization Tips
466
467
```java
468
// Good: Lightweight plugin
469
public class LightweightPlugin extends RPCPlugin {
470
private final AtomicLong requestCounter = new AtomicLong();
471
472
@Override
473
public void clientSendRequest(RPCContext context) {
474
// Very fast operation
475
requestCounter.incrementAndGet();
476
}
477
}
478
479
// Bad: Heavy plugin
480
public class HeavyPlugin extends RPCPlugin {
481
@Override
482
public void clientSendRequest(RPCContext context) {
483
// Expensive operations that slow down every RPC call
484
database.logRequest(context); // Blocking database call
485
Thread.sleep(100); // Very bad!
486
httpClient.post("http://analytics.com/track", context); // Blocking HTTP call
487
}
488
}
489
490
// Better: Asynchronous heavy plugin
491
public class AsyncHeavyPlugin extends RPCPlugin {
492
private final ExecutorService executor = Executors.newCachedThreadPool();
493
494
@Override
495
public void clientSendRequest(RPCContext context) {
496
// Quick metadata copy
497
final String method = context.getMessage().getName();
498
final long timestamp = System.currentTimeMillis();
499
500
// Heavy operations on background thread
501
executor.submit(() -> {
502
database.logRequest(method, timestamp);
503
httpClient.post("http://analytics.com/track", method);
504
});
505
}
506
}
507
```