or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

async.md · core-rpc.md · index.md · plugins.md · protocols.md · stats.md · transports.md

plugins.mddocs/

0

# Plugin System and Context

1

2

Apache Avro IPC provides an extensible plugin system for RPC instrumentation, metadata manipulation, and cross-cutting concerns through the `RPCPlugin` and `RPCContext` classes.

3

4

## Capabilities

5

6

### RPC Plugin Base Class

7

8

The foundation for implementing custom RPC instrumentation and metadata manipulation plugins.

9

10

```java { .api }

11

public class RPCPlugin {

12

// Client-side hooks

13

public void clientStartConnect(RPCContext context);

14

public void clientFinishConnect(RPCContext context);

15

public void clientSendRequest(RPCContext context);

16

public void clientReceiveResponse(RPCContext context);

17

18

// Server-side hooks

19

public void serverConnecting(RPCContext context);

20

public void serverReceiveRequest(RPCContext context);

21

public void serverSendResponse(RPCContext context);

22

}

23

```

24

25

All methods have default empty implementations, allowing plugins to override only the hooks they need.

26

27

#### Plugin Lifecycle

28

29

The plugin methods are called in the following order during an RPC call:

30

31

1. **Client Side**: `clientStartConnect` → `clientFinishConnect` → `clientSendRequest` → `clientReceiveResponse`

32

2. **Server Side**: `serverConnecting` → `serverReceiveRequest` → `serverSendResponse`

33

34

#### Usage Examples

35

36

```java

37

// Custom logging plugin

38

public class LoggingPlugin extends RPCPlugin {

39

private static final Logger logger = LoggerFactory.getLogger(LoggingPlugin.class);

40

41

@Override

42

public void clientSendRequest(RPCContext context) {

43

Message message = context.getMessage();

44

logger.info("Client sending request: {}", message.getName());

45

}

46

47

@Override

48

public void clientReceiveResponse(RPCContext context) {

49

if (context.isError()) {

50

logger.error("Client received error: {}", context.error().getMessage());

51

} else {

52

logger.info("Client received successful response");

53

}

54

}

55

56

@Override

57

public void serverReceiveRequest(RPCContext context) {

58

Message message = context.getMessage();

59

String remoteName = getRemoteAddress(context);

60

logger.info("Server received request: {} from {}", message.getName(), remoteName);

61

}

62

63

@Override

64

public void serverSendResponse(RPCContext context) {

65

if (context.isError()) {

66

logger.warn("Server sending error response: {}", context.error().getMessage());

67

} else {

68

logger.info("Server sending successful response");

69

}

70

}

71

72

private String getRemoteAddress(RPCContext context) {

73

// Extract remote address from context metadata

74

Map<String, ByteBuffer> meta = context.requestHandshakeMeta();

75

ByteBuffer addressBuffer = meta.get("remote-address");

76

return addressBuffer != null ? new String(addressBuffer.array()) : "unknown";

77

}

78

}

79

80

// Register plugin with requestor and responder

81

LoggingPlugin loggingPlugin = new LoggingPlugin();

82

requestor.addRPCPlugin(loggingPlugin);

83

responder.addRPCPlugin(loggingPlugin);

84

```

85

86

### RPC Context

87

88

Provides access to RPC call state, metadata, and message information during plugin execution.

89

90

```java { .api }

91

public class RPCContext {

92

// Handshake management

93

public void setHandshakeRequest(HandshakeRequest handshakeRequest);

94

public HandshakeRequest getHandshakeRequest();

95

public void setHandshakeResponse(HandshakeResponse handshakeResponse);

96

public HandshakeResponse getHandshakeResponse();

97

98

// Metadata access

99

public Map<String, ByteBuffer> requestHandshakeMeta();

100

public Map<String, ByteBuffer> responseHandshakeMeta();

101

public Map<String, ByteBuffer> requestCallMeta();

102

public Map<String, ByteBuffer> responseCallMeta();

103

104

// Message information

105

public void setMessage(Message message);

106

public Message getMessage();

107

108

// Payload access

109

public void setRequestPayload(List<ByteBuffer> payload);

110

public List<ByteBuffer> getRequestPayload();

111

public List<ByteBuffer> getResponsePayload();

112

public void setResponsePayload(List<ByteBuffer> payload);

113

114

// Response handling

115

public Object response();

116

public Exception error();

117

public boolean isError();

118

}

119

```

120

121

#### Usage Examples

122

123

```java

124

// Metadata manipulation plugin

125

public class MetadataPlugin extends RPCPlugin {

126

@Override

127

public void clientSendRequest(RPCContext context) {

128

// Add client metadata to request

129

Map<String, ByteBuffer> callMeta = context.requestCallMeta();

130

callMeta.put("client-id", ByteBuffer.wrap("client-123".getBytes()));

131

callMeta.put("request-time", ByteBuffer.wrap(

132

String.valueOf(System.currentTimeMillis()).getBytes()));

133

callMeta.put("client-version", ByteBuffer.wrap("1.0.0".getBytes()));

134

}

135

136

@Override

137

public void serverReceiveRequest(RPCContext context) {

138

// Read client metadata

139

Map<String, ByteBuffer> callMeta = context.requestCallMeta();

140

String clientId = extractString(callMeta.get("client-id"));

141

String requestTime = extractString(callMeta.get("request-time"));

142

String clientVersion = extractString(callMeta.get("client-version"));

143

144

System.out.println("Request from client: " + clientId +

145

", version: " + clientVersion +

146

", time: " + requestTime);

147

148

// Add server metadata to response

149

Map<String, ByteBuffer> responseMeta = context.responseCallMeta();

150

responseMeta.put("server-id", ByteBuffer.wrap("server-456".getBytes()));

151

responseMeta.put("processed-time", ByteBuffer.wrap(

152

String.valueOf(System.currentTimeMillis()).getBytes()));

153

}

154

155

private String extractString(ByteBuffer buffer) {

156

return buffer != null ? new String(buffer.array()) : null;

157

}

158

}

159

```

160

161

## Advanced Plugin Examples

162

163

### Authentication Plugin

164

165

```java

166

public class AuthenticationPlugin extends RPCPlugin {

167

private final AuthenticationService authService;

168

169

public AuthenticationPlugin(AuthenticationService authService) {

170

this.authService = authService;

171

}

172

173

@Override

174

public void clientSendRequest(RPCContext context) {

175

// Add authentication token to request metadata

176

String token = authService.getCurrentToken();

177

if (token != null) {

178

Map<String, ByteBuffer> callMeta = context.requestCallMeta();

179

callMeta.put("auth-token", ByteBuffer.wrap(token.getBytes()));

180

}

181

}

182

183

@Override

184

public void serverReceiveRequest(RPCContext context) {

185

// Validate authentication token

186

Map<String, ByteBuffer> callMeta = context.requestCallMeta();

187

ByteBuffer tokenBuffer = callMeta.get("auth-token");

188

189

if (tokenBuffer == null) {

190

throw new AvroRuntimeException("Missing authentication token");

191

}

192

193

String token = new String(tokenBuffer.array());

194

if (!authService.validateToken(token)) {

195

throw new AvroRuntimeException("Invalid authentication token");

196

}

197

198

// Store authenticated user in context for later use

199

String userId = authService.getUserId(token);

200

callMeta.put("authenticated-user", ByteBuffer.wrap(userId.getBytes()));

201

}

202

203

@Override

204

public void serverSendResponse(RPCContext context) {

205

// Add server authentication info to response

206

Map<String, ByteBuffer> responseMeta = context.responseCallMeta();

207

responseMeta.put("server-auth", ByteBuffer.wrap("authenticated".getBytes()));

208

}

209

}

210

```

211

212

### Rate Limiting Plugin

213

214

```java

215

public class RateLimitingPlugin extends RPCPlugin {

216

private final Map<String, RateLimiter> clientLimiters = new ConcurrentHashMap<>();

217

private final int requestsPerSecond;

218

219

public RateLimitingPlugin(int requestsPerSecond) {

220

this.requestsPerSecond = requestsPerSecond;

221

}

222

223

@Override

224

public void serverReceiveRequest(RPCContext context) {

225

// Extract client identifier

226

String clientId = extractClientId(context);

227

228

// Get or create rate limiter for client

229

RateLimiter limiter = clientLimiters.computeIfAbsent(clientId,

230

k -> RateLimiter.create(requestsPerSecond));

231

232

// Check rate limit

233

if (!limiter.tryAcquire()) {

234

throw new AvroRuntimeException("Rate limit exceeded for client: " + clientId);

235

}

236

}

237

238

private String extractClientId(RPCContext context) {

239

Map<String, ByteBuffer> callMeta = context.requestCallMeta();

240

ByteBuffer clientIdBuffer = callMeta.get("client-id");

241

return clientIdBuffer != null ? new String(clientIdBuffer.array()) : "unknown";

242

}

243

}

244

```

245

246

### Request/Response Compression Plugin

247

248

```java

249

public class CompressionPlugin extends RPCPlugin {

250

private final Compressor compressor = new GzipCompressor();

251

252

@Override

253

public void clientSendRequest(RPCContext context) {

254

// Compress request payload

255

List<ByteBuffer> payload = context.getRequestPayload();

256

if (payload != null && !payload.isEmpty()) {

257

List<ByteBuffer> compressed = compressor.compress(payload);

258

context.setRequestPayload(compressed);

259

260

// Add compression metadata

261

Map<String, ByteBuffer> callMeta = context.requestCallMeta();

262

callMeta.put("compression", ByteBuffer.wrap("gzip".getBytes()));

263

}

264

}

265

266

@Override

267

public void serverReceiveRequest(RPCContext context) {

268

// Decompress request payload if compressed

269

Map<String, ByteBuffer> callMeta = context.requestCallMeta();

270

ByteBuffer compressionBuffer = callMeta.get("compression");

271

272

if (compressionBuffer != null && "gzip".equals(new String(compressionBuffer.array()))) {

273

List<ByteBuffer> payload = context.getRequestPayload();

274

List<ByteBuffer> decompressed = compressor.decompress(payload);

275

context.setRequestPayload(decompressed);

276

}

277

}

278

279

@Override

280

public void serverSendResponse(RPCContext context) {

281

// Compress response payload

282

List<ByteBuffer> payload = context.getResponsePayload();

283

if (payload != null && !payload.isEmpty()) {

284

List<ByteBuffer> compressed = compressor.compress(payload);

285

context.setResponsePayload(compressed);

286

287

// Add compression metadata

288

Map<String, ByteBuffer> responseMeta = context.responseCallMeta();

289

responseMeta.put("compression", ByteBuffer.wrap("gzip".getBytes()));

290

}

291

}

292

293

@Override

294

public void clientReceiveResponse(RPCContext context) {

295

// Decompress response payload if compressed

296

Map<String, ByteBuffer> responseMeta = context.responseCallMeta();

297

ByteBuffer compressionBuffer = responseMeta.get("compression");

298

299

if (compressionBuffer != null && "gzip".equals(new String(compressionBuffer.array()))) {

300

List<ByteBuffer> payload = context.getResponsePayload();

301

List<ByteBuffer> decompressed = compressor.decompress(payload);

302

context.setResponsePayload(decompressed);

303

}

304

}

305

}

306

```

307

308

### Tracing Plugin

309

310

```java

311

public class TracingPlugin extends RPCPlugin {

312

private final Tracer tracer;

313

private final ThreadLocal<Span> currentSpan = new ThreadLocal<>();

314

315

public TracingPlugin(Tracer tracer) {

316

this.tracer = tracer;

317

}

318

319

@Override

320

public void clientSendRequest(RPCContext context) {

321

// Start client span

322

Message message = context.getMessage();

323

Span span = tracer.nextSpan()

324

.name("avro-rpc-client")

325

.tag("rpc.method", message.getName())

326

.tag("rpc.system", "avro")

327

.start();

328

329

currentSpan.set(span);

330

331

// Add trace context to request metadata

332

Map<String, ByteBuffer> callMeta = context.requestCallMeta();

333

TraceContext traceContext = span.context();

334

callMeta.put("trace-id", ByteBuffer.wrap(traceContext.traceId().getBytes()));

335

callMeta.put("span-id", ByteBuffer.wrap(traceContext.spanId().getBytes()));

336

}

337

338

@Override

339

public void clientReceiveResponse(RPCContext context) {

340

Span span = currentSpan.get();

341

if (span != null) {

342

if (context.isError()) {

343

span.tag("error", "true")

344

.tag("error.message", context.error().getMessage());

345

}

346

span.end();

347

currentSpan.remove();

348

}

349

}

350

351

@Override

352

public void serverReceiveRequest(RPCContext context) {

353

// Extract trace context from request metadata

354

Map<String, ByteBuffer> callMeta = context.requestCallMeta();

355

String traceId = extractString(callMeta.get("trace-id"));

356

String spanId = extractString(callMeta.get("span-id"));

357

358

// Create server span

359

SpanBuilder spanBuilder = tracer.nextSpan()

360

.name("avro-rpc-server")

361

.tag("rpc.method", context.getMessage().getName())

362

.tag("rpc.system", "avro");

363

364

if (traceId != null && spanId != null) {

365

// Continue existing trace

366

TraceContext.Builder contextBuilder = TraceContext.newBuilder()

367

.traceId(Long.parseUnsignedLong(traceId, 16))

368

.spanId(Long.parseUnsignedLong(spanId, 16));

369

spanBuilder.setParent(contextBuilder.build());

370

}

371

372

Span span = spanBuilder.start();

373

currentSpan.set(span);

374

}

375

376

@Override

377

public void serverSendResponse(RPCContext context) {

378

Span span = currentSpan.get();

379

if (span != null) {

380

if (context.isError()) {

381

span.tag("error", "true")

382

.tag("error.message", context.error().getMessage());

383

}

384

span.end();

385

currentSpan.remove();

386

}

387

}

388

389

private String extractString(ByteBuffer buffer) {

390

return buffer != null ? new String(buffer.array()) : null;

391

}

392

}

393

```

394

395

## Plugin Registration and Management

396

397

### Plugin Registration

398

399

```java

400

// Register plugins with requestor and responder

401

Requestor requestor = new SpecificRequestor(MyService.class, transceiver);

402

Responder responder = new SpecificResponder(MyService.class, implementation);

403

404

// Add multiple plugins - they execute in registration order

405

requestor.addRPCPlugin(new AuthenticationPlugin(authService));

406

requestor.addRPCPlugin(new TracingPlugin(tracer));

407

requestor.addRPCPlugin(new CompressionPlugin());

408

409

responder.addRPCPlugin(new AuthenticationPlugin(authService));

410

responder.addRPCPlugin(new RateLimitingPlugin(100)); // 100 requests/second

411

responder.addRPCPlugin(new TracingPlugin(tracer));

412

responder.addRPCPlugin(new CompressionPlugin());

413

```

414

415

### Plugin Ordering

416

417

Plugins execute in the order they are registered:

418

419

- **Client Send**: First registered plugin executes first

420

- **Client Receive**: Last registered plugin executes first (reverse order)

421

- **Server Receive**: First registered plugin executes first

422

- **Server Send**: Last registered plugin executes first (reverse order)

423

424

This ordering ensures proper nesting behavior for plugins that modify payloads or metadata.

425

426

## Error Handling in Plugins

427

428

```java

429

public class ErrorHandlingPlugin extends RPCPlugin {

430

@Override

431

public void clientSendRequest(RPCContext context) {

432

try {

433

// Plugin logic that might fail

434

performSomeOperation();

435

} catch (Exception e) {

436

// Log error but don't break the RPC call

437

logger.error("Plugin error in clientSendRequest", e);

438

// Optionally add error info to metadata

439

Map<String, ByteBuffer> callMeta = context.requestCallMeta();

440

callMeta.put("plugin-error", ByteBuffer.wrap(e.getMessage().getBytes()));

441

}

442

}

443

444

@Override

445

public void serverReceiveRequest(RPCContext context) {

446

try {

447

validateRequest(context);

448

} catch (ValidationException e) {

449

// Critical error - break the RPC call

450

throw new AvroRuntimeException("Request validation failed: " + e.getMessage(), e);

451

}

452

}

453

}

454

```

455

456

## Performance Considerations

457

458

### Plugin Performance Impact

459

460

- Plugins add overhead to every RPC call

461

- Keep plugin logic lightweight and non-blocking

462

- Avoid expensive operations in plugin methods

463

- Use asynchronous processing for heavy operations

464

465

### Optimization Tips

466

467

```java

468

// Good: Lightweight plugin

469

public class LightweightPlugin extends RPCPlugin {

470

private final AtomicLong requestCounter = new AtomicLong();

471

472

@Override

473

public void clientSendRequest(RPCContext context) {

474

// Very fast operation

475

requestCounter.incrementAndGet();

476

}

477

}

478

479

// Bad: Heavy plugin

480

public class HeavyPlugin extends RPCPlugin {

481

@Override

482

public void clientSendRequest(RPCContext context) {

483

// Expensive operations that slow down every RPC call

484

database.logRequest(context); // Blocking database call

485

Thread.sleep(100); // Very bad!

486

httpClient.post("http://analytics.com/track", context); // Blocking HTTP call

487

}

488

}

489

490

// Better: Asynchronous heavy plugin

491

public class AsyncHeavyPlugin extends RPCPlugin {

492

private final ExecutorService executor = Executors.newCachedThreadPool();

493

494

@Override

495

public void clientSendRequest(RPCContext context) {

496

// Quick metadata copy

497

final String method = context.getMessage().getName();

498

final long timestamp = System.currentTimeMillis();

499

500

// Heavy operations on background thread

501

executor.submit(() -> {

502

database.logRequest(method, timestamp);

503

httpClient.post("http://analytics.com/track", method);

504

});

505

}

506

}

507

```