0
# Core RPC Framework
1
2
The foundation of Apache Avro IPC consists of abstract base classes that provide the client-server RPC communication infrastructure. These classes define the contract for RPC operations while remaining transport and protocol agnostic.
3
4
## Capabilities
5
6
### Client-Side RPC Base Class
7
8
The `Requestor` class provides the client-side foundation for RPC communication, handling protocol negotiation, request serialization, and response deserialization.
9
10
```java { .api }
11
public abstract class Requestor {
12
protected Requestor(Protocol local, Transceiver transceiver) throws IOException;
13
14
// Protocol and transport access
15
public Protocol getLocal();
16
public Transceiver getTransceiver();
17
public Protocol getRemote() throws IOException;
18
19
// Plugin management
20
public void addRPCPlugin(RPCPlugin plugin);
21
22
// Synchronous RPC call
23
public Object request(String messageName, Object request) throws Exception;
24
25
// Asynchronous RPC call with callback
26
public <T> void request(String messageName, Object request, Callback<T> callback)
27
throws AvroRemoteException, IOException;
28
29
// Abstract methods for protocol-specific implementations
30
public abstract void writeRequest(Schema schema, Object request, Encoder out) throws IOException;
31
public abstract Object readResponse(Schema writer, Schema reader, Decoder in) throws IOException;
32
public abstract Exception readError(Schema writer, Schema reader, Decoder in) throws IOException;
33
}
34
```
35
36
#### Usage Examples
37
38
```java
39
// Synchronous RPC call
40
MyRequestor requestor = new MyRequestor(protocol, transceiver);
41
try {
42
Object result = requestor.request("getMessage", requestData);
43
// Handle result
44
} catch (Exception e) {
45
// Handle RPC error
46
}
47
48
// Asynchronous RPC call
49
requestor.request("getMessage", requestData, new Callback<String>() {
50
@Override
51
public void handleResult(String result) {
52
// Handle successful result
53
}
54
55
@Override
56
public void handleError(Throwable error) {
57
// Handle error
58
}
59
});
60
```
61
62
### Server-Side RPC Base Class
63
64
The `Responder` class provides the server-side foundation for RPC communication, handling request deserialization, method dispatch, and response serialization.
65
66
```java { .api }
67
public abstract class Responder {
68
protected Responder(Protocol local);
69
70
// Protocol access
71
public static Protocol getRemote(); // ThreadLocal access to remote protocol
72
public Protocol getLocal();
73
74
// Plugin management
75
public void addRPCPlugin(RPCPlugin plugin);
76
77
// Process RPC request buffers
78
public List<ByteBuffer> respond(List<ByteBuffer> buffers) throws IOException;
79
public List<ByteBuffer> respond(List<ByteBuffer> buffers, Transceiver connection) throws IOException;
80
81
// Abstract methods for protocol-specific implementations
82
public abstract Object respond(Message message, Object request) throws Exception;
83
public abstract Object readRequest(Schema actual, Schema expected, Decoder in) throws IOException;
84
public abstract void writeResponse(Schema schema, Object response, Encoder out) throws IOException;
85
public abstract void writeError(Schema schema, Object error, Encoder out) throws IOException;
86
}
87
```
88
89
#### Usage Examples
90
91
```java
92
// Custom responder implementation
93
public class MyResponder extends GenericResponder {
94
public MyResponder(Protocol protocol) {
95
super(protocol);
96
}
97
98
@Override
99
public Object respond(Message message, Object request) throws Exception {
100
String messageName = message.getName();
101
switch (messageName) {
102
case "getMessage":
103
return handleGetMessage(request);
104
case "setMessage":
105
return handleSetMessage(request);
106
default:
107
throw new AvroRuntimeException("Unknown message: " + messageName);
108
}
109
}
110
111
private Object handleGetMessage(Object request) {
112
// Implementation logic
113
return "Hello from server";
114
}
115
116
private Object handleSetMessage(Object request) {
117
// Implementation logic
118
return null; // void method
119
}
120
}
121
```
122
123
### Transport Abstraction
124
125
The `Transceiver` class provides the transport abstraction layer, handling network I/O operations while remaining protocol agnostic.
126
127
```java { .api }
128
public abstract class Transceiver implements Closeable {
129
// Connection information
130
public abstract String getRemoteName() throws IOException;
131
public boolean isConnected();
132
133
// Protocol management
134
public void setRemote(Protocol protocol);
135
public Protocol getRemote();
136
137
// Channel synchronization
138
public void lockChannel();
139
public void unlockChannel();
140
141
// Synchronous communication
142
public List<ByteBuffer> transceive(List<ByteBuffer> request) throws IOException;
143
144
// Asynchronous communication
145
public void transceive(List<ByteBuffer> request, Callback<List<ByteBuffer>> callback) throws IOException;
146
147
// Abstract I/O methods
148
public abstract List<ByteBuffer> readBuffers() throws IOException;
149
public abstract void writeBuffers(List<ByteBuffer> buffers) throws IOException;
150
151
// Resource management
152
public void close() throws IOException;
153
}
154
```
155
156
#### Usage Examples
157
158
```java
159
// Using transceiver directly for low-level communication
160
List<ByteBuffer> requestBuffers = serializeRequest(request);
161
try {
162
transceiver.lockChannel();
163
List<ByteBuffer> responseBuffers = transceiver.transceive(requestBuffers);
164
Object response = deserializeResponse(responseBuffers);
165
} finally {
166
transceiver.unlockChannel();
167
}
168
169
// Asynchronous transceiver usage
170
transceiver.transceive(requestBuffers, new Callback<List<ByteBuffer>>() {
171
@Override
172
public void handleResult(List<ByteBuffer> responseBuffers) {
173
Object response = deserializeResponse(responseBuffers);
174
// Handle response
175
}
176
177
@Override
178
public void handleError(Throwable error) {
179
// Handle communication error
180
}
181
});
182
```
183
184
### Server Interface
185
186
The `Server` interface defines the contract for server lifecycle management, providing standard start/stop semantics.
187
188
```java { .api }
189
public interface Server {
190
// Server information
191
int getPort();
192
193
// Lifecycle management
194
void start();
195
void close();
196
void join() throws InterruptedException;
197
}
198
```
199
200
#### Usage Examples
201
202
```java
203
// Server lifecycle management
204
Server server = new SaslSocketServer(responder, new InetSocketAddress(8080));
205
206
// Start server
207
server.start();
208
System.out.println("Server started on port: " + server.getPort());
209
210
// Wait for server to finish (in separate thread or shutdown hook)
211
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
212
server.close();
213
try {
214
server.join();
215
} catch (InterruptedException e) {
216
Thread.currentThread().interrupt();
217
}
218
}));
219
220
// Wait for server completion
221
server.join();
222
```
223
224
## Advanced Usage
225
226
### Custom Requestor Implementation
227
228
```java
229
public class CustomRequestor extends Requestor {
230
private final SpecificDatumWriter<Object> writer;
231
private final SpecificDatumReader<Object> reader;
232
233
public CustomRequestor(Protocol protocol, Transceiver transceiver) throws IOException {
234
super(protocol, transceiver);
235
this.writer = new SpecificDatumWriter<>(protocol);
236
this.reader = new SpecificDatumReader<>(protocol);
237
}
238
239
@Override
240
public void writeRequest(Schema schema, Object request, Encoder out) throws IOException {
241
writer.setSchema(schema);
242
writer.write(request, out);
243
}
244
245
@Override
246
public Object readResponse(Schema writer, Schema reader, Decoder in) throws IOException {
247
this.reader.setSchema(writer);
248
this.reader.setExpected(reader);
249
return this.reader.read(null, in);
250
}
251
252
@Override
253
public Exception readError(Schema writer, Schema reader, Decoder in) throws IOException {
254
this.reader.setSchema(writer);
255
this.reader.setExpected(reader);
256
Object error = this.reader.read(null, in);
257
if (error instanceof Exception) {
258
return (Exception) error;
259
}
260
return new AvroRemoteException(error);
261
}
262
}
263
```
264
265
### Plugin Integration
266
267
```java
268
// Add monitoring plugin to requestor and responder
269
RPCPlugin monitoringPlugin = new RPCPlugin() {
270
@Override
271
public void clientSendRequest(RPCContext context) {
272
System.out.println("Sending request: " + context.getMessage().getName());
273
}
274
275
@Override
276
public void clientReceiveResponse(RPCContext context) {
277
System.out.println("Received response, error: " + context.isError());
278
}
279
};
280
281
requestor.addRPCPlugin(monitoringPlugin);
282
responder.addRPCPlugin(monitoringPlugin);
283
```
284
285
## Error Handling
286
287
The core RPC framework provides comprehensive error handling through exceptions and the RPC context:
288
289
```java
290
try {
291
Object result = requestor.request("methodName", request);
292
} catch (AvroRemoteException e) {
293
// Remote method threw an exception
294
System.err.println("Remote error: " + e.getMessage());
295
} catch (IOException e) {
296
// Network or serialization error
297
System.err.println("Communication error: " + e.getMessage());
298
} catch (Exception e) {
299
// Other RPC-related errors
300
System.err.println("RPC error: " + e.getMessage());
301
}
302
```
303
304
## Thread Safety
305
306
- `Requestor` instances are thread-safe for concurrent RPC calls
307
- `Responder` instances are thread-safe for concurrent request processing
308
- `Transceiver` instances provide channel locking for thread-safe access
309
- Each RPC call gets its own `RPCContext` for thread-local state management