0
# RPC Support
1
2
Remote Procedure Call (RPC) patterns over AMQP for request-response messaging. RabbitMQ provides convenient RPC classes that enable synchronous call semantics over asynchronous message queues.
3
4
## Capabilities
5
6
### RPC Client
7
8
Client for making RPC calls to RPC servers over AMQP.
9
10
```java { .api }
11
/**
12
* Simple RPC client for making calls to RPC servers
13
*/
14
public class RpcClient implements AutoCloseable {
15
16
/**
17
* Create RPC client with default exchange and routing
18
* @param channel - Channel to use for RPC calls
19
* @param exchange - Exchange to publish requests to
20
* @param routingKey - Routing key for requests
21
*/
22
public RpcClient(Channel channel, String exchange, String routingKey) throws IOException;
23
24
/**
25
* Create RPC client with custom timeout
26
* @param channel - Channel to use for RPC calls
27
* @param exchange - Exchange to publish requests to
28
* @param routingKey - Routing key for requests
29
* @param timeout - Request timeout in milliseconds
30
*/
31
public RpcClient(Channel channel, String exchange, String routingKey, int timeout) throws IOException;
32
33
/**
34
* Make RPC call with byte array request and response
35
* @param message - Request message as bytes
36
* @return Response message as bytes
37
*/
38
public byte[] primitiveCall(byte[] message) throws IOException, ShutdownSignalException, TimeoutException;
39
40
/**
41
* Make RPC call with string request and response
42
* @param message - Request message as string
43
* @return Response message as string
44
*/
45
public String stringCall(String message) throws IOException, ShutdownSignalException, TimeoutException;
46
47
/**
48
* Make RPC call with Map request and response
49
* @param message - Request message as Map
50
* @return Response message as Map
51
*/
52
public Map<String, Object> mapCall(Map<String, Object> message) throws IOException, ShutdownSignalException, TimeoutException;
53
54
/**
55
* Make RPC call with Map request and response with custom timeout
56
* @param message - Request message as Map
57
* @param timeout - Request timeout in milliseconds
58
* @return Response message as Map
59
*/
60
public Map<String, Object> mapCall(Map<String, Object> message, int timeout) throws IOException, ShutdownSignalException, TimeoutException;
61
62
/**
63
* Close the RPC client and clean up resources
64
*/
65
@Override
66
public void close() throws IOException;
67
}
68
```
69
70
**Usage Examples:**
71
72
```java
73
// Basic RPC client usage
74
Channel channel = connection.createChannel();
75
RpcClient rpcClient = new RpcClient(channel, "", "rpc_queue");
76
77
// String-based RPC call
78
String response = rpcClient.stringCall("Hello RPC");
79
System.out.println("Response: " + response);
80
81
// Byte array RPC call
82
byte[] request = "binary request".getBytes();
83
byte[] responseBytes = rpcClient.primitiveCall(request);
84
85
// Map-based RPC call
86
Map<String, Object> requestMap = new HashMap<>();
87
requestMap.put("method", "fibonacci");
88
requestMap.put("number", 30);
89
Map<String, Object> responseMap = rpcClient.mapCall(requestMap);
90
91
rpcClient.close();
92
```
93
94
### RPC Server
95
96
Base class for implementing RPC servers that handle incoming RPC calls.
97
98
```java { .api }
99
/**
100
* Abstract base class for RPC servers
101
*/
102
public abstract class RpcServer implements AutoCloseable {
103
104
/**
105
* Create RPC server on specified queue
106
* @param channel - Channel to use for RPC operations
107
* @param queueName - Queue name to consume requests from
108
*/
109
public RpcServer(Channel channel, String queueName) throws IOException;
110
111
/**
112
* Handle incoming RPC call - implement this method
113
* @param requestBody - Request message body
114
* @param replyProperties - Properties for reply message
115
* @return Response message body
116
*/
117
public abstract byte[] handleCall(byte[] requestBody, AMQP.BasicProperties replyProperties);
118
119
/**
120
* Start processing RPC calls (blocking call)
121
*/
122
public void mainloop() throws IOException;
123
124
/**
125
* Process a single request and send response
126
* @param requestBody - Request message body
127
* @param properties - Request message properties
128
* @return Response message body
129
*/
130
public byte[] processRequest(byte[] requestBody, AMQP.BasicProperties properties) throws IOException;
131
132
/**
133
* Close the RPC server and clean up resources
134
*/
135
@Override
136
public void close() throws IOException;
137
}
138
```
139
140
### String RPC Server
141
142
RPC server implementation that works with string messages.
143
144
```java { .api }
145
/**
146
* RPC server that handles string-based requests and responses
147
*/
148
public abstract class StringRpcServer extends RpcServer {
149
150
/**
151
* Create string RPC server
152
* @param channel - Channel to use
153
* @param queueName - Queue to consume from
154
*/
155
public StringRpcServer(Channel channel, String queueName) throws IOException;
156
157
/**
158
* Handle string-based RPC call - implement this method
159
* @param request - Request message as string
160
* @return Response message as string
161
*/
162
public abstract String handleStringCall(String request);
163
}
164
```
165
166
### Map RPC Server
167
168
RPC server implementation that works with Map-based JSON messages.
169
170
```java { .api }
171
/**
172
* RPC server that handles Map-based requests and responses
173
*/
174
public abstract class MapRpcServer extends RpcServer {
175
176
/**
177
* Create Map RPC server
178
* @param channel - Channel to use
179
* @param queueName - Queue to consume from
180
*/
181
public MapRpcServer(Channel channel, String queueName) throws IOException;
182
183
/**
184
* Handle Map-based RPC call - implement this method
185
* @param request - Request message as Map
186
* @return Response message as Map
187
*/
188
public abstract Map<String, Object> handleMapCall(Map<String, Object> request);
189
}
190
```
191
192
**Usage Examples:**
193
194
```java
195
// String RPC Server implementation
196
public class FibonacciRpcServer extends StringRpcServer {
197
public FibonacciRpcServer(Channel channel, String queueName) throws IOException {
198
super(channel, queueName);
199
}
200
201
@Override
202
public String handleStringCall(String request) {
203
int n = Integer.parseInt(request);
204
int result = fibonacci(n);
205
return String.valueOf(result);
206
}
207
208
private int fibonacci(int n) {
209
if (n <= 1) return n;
210
return fibonacci(n - 1) + fibonacci(n - 2);
211
}
212
}
213
214
// Start the RPC server
215
Channel channel = connection.createChannel();
216
FibonacciRpcServer server = new FibonacciRpcServer(channel, "rpc_queue");
217
server.mainloop(); // Blocks and processes requests
218
```
219
220
```java
221
// Map RPC Server implementation
222
public class CalculatorRpcServer extends MapRpcServer {
223
public CalculatorRpcServer(Channel channel, String queueName) throws IOException {
224
super(channel, queueName);
225
}
226
227
@Override
228
public Map<String, Object> handleMapCall(Map<String, Object> request) {
229
String operation = (String) request.get("operation");
230
double a = ((Number) request.get("a")).doubleValue();
231
double b = ((Number) request.get("b")).doubleValue();
232
233
double result;
234
switch (operation) {
235
case "add": result = a + b; break;
236
case "subtract": result = a - b; break;
237
case "multiply": result = a * b; break;
238
case "divide": result = a / b; break;
239
default: throw new IllegalArgumentException("Unknown operation");
240
}
241
242
Map<String, Object> response = new HashMap<>();
243
response.put("result", result);
244
return response;
245
}
246
}
247
```
248
249
## Types
250
251
### RPC Client Parameters
252
253
```java { .api }
254
/**
255
* Parameters for configuring RPC client behavior
256
*/
257
public class RpcClientParams {
258
259
/**
260
* Set request timeout in milliseconds
261
* @param timeout - Timeout value
262
*/
263
public RpcClientParams setTimeout(int timeout);
264
265
/**
266
* Set whether to use publisher confirms
267
* @param useConfirms - Whether to enable confirms
268
*/
269
public RpcClientParams setUseConfirms(boolean useConfirms);
270
271
/**
272
* Set whether requests should be mandatory
273
* @param mandatory - Whether requests are mandatory
274
*/
275
public RpcClientParams setMandatory(boolean mandatory);
276
277
/**
278
* Get configured timeout
279
*/
280
public int getTimeout();
281
282
/**
283
* Check if confirms are enabled
284
*/
285
public boolean isUseConfirms();
286
287
/**
288
* Check if requests are mandatory
289
*/
290
public boolean isMandatory();
291
}
292
```