The RabbitMQ Java client library allows Java applications to interface with RabbitMQ message broker servers.

This section covers Remote Procedure Call (RPC) patterns over AMQP for request-response messaging. RabbitMQ provides convenient RPC classes that enable synchronous call semantics over asynchronous message queues.
Client for making RPC calls to RPC servers over AMQP.
/**
* Simple RPC client for making calls to RPC servers
*/
public class RpcClient implements AutoCloseable {
/**
* Create RPC client that publishes requests to the given exchange and routing key
* @param channel - Channel to use for RPC calls
* @param exchange - Exchange to publish requests to
* @param routingKey - Routing key for requests
*/
public RpcClient(Channel channel, String exchange, String routingKey) throws IOException;
/**
* Create RPC client with custom timeout
* @param channel - Channel to use for RPC calls
* @param exchange - Exchange to publish requests to
* @param routingKey - Routing key for requests
* @param timeout - Request timeout in milliseconds
*/
public RpcClient(Channel channel, String exchange, String routingKey, int timeout) throws IOException;
/**
* Make RPC call with byte array request and response
* @param message - Request message as bytes
* @return Response message as bytes
*/
public byte[] primitiveCall(byte[] message) throws IOException, ShutdownSignalException, TimeoutException;
/**
* Make RPC call with string request and response
* @param message - Request message as string
* @return Response message as string
*/
public String stringCall(String message) throws IOException, ShutdownSignalException, TimeoutException;
/**
* Make RPC call with Map request and response
* @param message - Request message as Map
* @return Response message as Map
*/
public Map<String, Object> mapCall(Map<String, Object> message) throws IOException, ShutdownSignalException, TimeoutException;
/**
* Make RPC call with Map request and response with custom timeout
* @param message - Request message as Map
* @param timeout - Request timeout in milliseconds
* @return Response message as Map
*/
public Map<String, Object> mapCall(Map<String, Object> message, int timeout) throws IOException, ShutdownSignalException, TimeoutException;
/**
* Close the RPC client and clean up resources
*/
@Override
public void close() throws IOException;
}

Usage Examples:
// Basic RPC client usage: create a client that publishes requests with
// exchange "" and routing key "rpc_queue" on a fresh channel
Channel channel = connection.createChannel();
RpcClient rpcClient = new RpcClient(channel, "", "rpc_queue");

// String-based RPC call
String response = rpcClient.stringCall("Hello RPC");
System.out.println("Response: " + response);

// Byte array RPC call. Encode with an explicit charset so the request bytes
// do not depend on the JVM's platform default encoding.
byte[] request = "binary request".getBytes(java.nio.charset.StandardCharsets.UTF_8);
byte[] responseBytes = rpcClient.primitiveCall(request);

// Map-based RPC call
Map<String, Object> requestMap = new HashMap<>();
requestMap.put("method", "fibonacci");
requestMap.put("number", 30);
Map<String, Object> responseMap = rpcClient.mapCall(requestMap);
rpcClient.close();

Base class for implementing RPC servers that handle incoming RPC calls.
/**
* Abstract base class for RPC servers
*/
public abstract class RpcServer implements AutoCloseable {
/**
* Create RPC server on specified queue
* @param channel - Channel to use for RPC operations
* @param queueName - Queue name to consume requests from
*/
public RpcServer(Channel channel, String queueName) throws IOException;
/**
* Handle incoming RPC call - implement this method
* @param requestBody - Request message body
* @param replyProperties - Properties for reply message
* @return Response message body
*/
public abstract byte[] handleCall(byte[] requestBody, AMQP.BasicProperties replyProperties);
/**
* Start processing RPC calls (blocking call)
*/
public void mainloop() throws IOException;
/**
* Process a single request and send response
* @param requestBody - Request message body
* @param properties - Request message properties
* @return Response message body
*/
public byte[] processRequest(byte[] requestBody, AMQP.BasicProperties properties) throws IOException;
/**
* Close the RPC server and clean up resources
*/
@Override
public void close() throws IOException;
}

RPC server implementation that works with string messages.
/**
* RPC server that handles string-based requests and responses
*/
public abstract class StringRpcServer extends RpcServer {
/**
* Create string RPC server
* @param channel - Channel to use
* @param queueName - Queue to consume from
*/
public StringRpcServer(Channel channel, String queueName) throws IOException;
/**
* Handle string-based RPC call - implement this method
* @param request - Request message as string
* @return Response message as string
*/
public abstract String handleStringCall(String request);
}

RPC server implementation that works with Map-based messages.
/**
* RPC server that handles Map-based requests and responses
*/
public abstract class MapRpcServer extends RpcServer {
/**
* Create Map RPC server
* @param channel - Channel to use
* @param queueName - Queue to consume from
*/
public MapRpcServer(Channel channel, String queueName) throws IOException;
/**
* Handle Map-based RPC call - implement this method
* @param request - Request message as Map
* @return Response message as Map
*/
public abstract Map<String, Object> handleMapCall(Map<String, Object> request);
}

Usage Examples:
// String RPC Server implementation
/**
 * Example string-based RPC server that answers each request with a Fibonacci number.
 */
public class FibonacciRpcServer extends StringRpcServer {

    /**
     * Creates the server on the given channel and request queue.
     *
     * @param channel   channel to use
     * @param queueName queue to consume requests from
     * @throws IOException if the superclass constructor fails
     */
    public FibonacciRpcServer(Channel channel, String queueName) throws IOException {
        super(channel, queueName);
    }

    /**
     * Parses the request as a decimal integer {@code n} and replies with the
     * {@code n}-th Fibonacci number.
     *
     * @param request request message; must be a valid decimal {@code int}
     * @return the Fibonacci number rendered as a decimal string
     * @throws NumberFormatException if {@code request} is not a parsable integer
     * @throws ArithmeticException   if the result does not fit in a {@code long} (n &gt; 92)
     */
    @Override
    public String handleStringCall(String request) {
        int n = Integer.parseInt(request);
        return String.valueOf(fibonacci(n));
    }

    /**
     * Computes the n-th Fibonacci number iteratively.
     *
     * <p>Runs in linear time so a single large request cannot stall the handler
     * the way the naive doubly-recursive definition does. Values of {@code n}
     * less than or equal to 1 (including negatives) are returned unchanged.
     */
    private static long fibonacci(int n) {
        if (n <= 1) {
            return n;
        }
        long previous = 0;
        long current = 1;
        for (int i = 2; i <= n; i++) {
            // addExact turns silent wrap-around into an explicit failure
            long next = Math.addExact(previous, current);
            previous = current;
            current = next;
        }
        return current;
    }
}
// Start the RPC server
Channel channel = connection.createChannel();
FibonacciRpcServer server = new FibonacciRpcServer(channel, "rpc_queue");
server.mainloop(); // Blocks and processes requests

// Map RPC Server implementation
/**
 * Example Map-based RPC server implementing a four-function calculator.
 *
 * <p>Requests carry the keys {@code "operation"} (one of {@code add},
 * {@code subtract}, {@code multiply}, {@code divide}), {@code "a"} and
 * {@code "b"}; the reply carries the key {@code "result"}.
 */
public class CalculatorRpcServer extends MapRpcServer {

    /**
     * Creates the server on the given channel and request queue.
     *
     * @param channel   channel to use
     * @param queueName queue to consume requests from
     * @throws IOException if the superclass constructor fails
     */
    public CalculatorRpcServer(Channel channel, String queueName) throws IOException {
        super(channel, queueName);
    }

    /**
     * Applies the requested arithmetic operation to operands {@code a} and {@code b}.
     *
     * @param request request map with {@code "operation"}, {@code "a"} and {@code "b"}
     * @return a new map holding the computed value under {@code "result"}
     * @throws IllegalArgumentException if the operation is missing, not a string or
     *                                  unknown, or if an operand is missing or not numeric
     */
    @Override
    public Map<String, Object> handleMapCall(Map<String, Object> request) {
        Object operation = request.get("operation");
        // Reject bad input explicitly instead of failing later with a bare
        // NullPointerException (switch on null) or ClassCastException.
        if (!(operation instanceof String)) {
            throw new IllegalArgumentException("Missing or non-string operation");
        }
        double a = operand(request, "a");
        double b = operand(request, "b");

        double result;
        switch ((String) operation) {
            case "add":
                result = a + b;
                break;
            case "subtract":
                result = a - b;
                break;
            case "multiply":
                result = a * b;
                break;
            case "divide":
                // double division: a zero divisor yields Infinity or NaN, not an exception
                result = a / b;
                break;
            default:
                throw new IllegalArgumentException("Unknown operation: " + operation);
        }

        Map<String, Object> response = new HashMap<>();
        response.put("result", result);
        return response;
    }

    /** Reads a numeric operand from the request, rejecting missing or non-numeric values. */
    private static double operand(Map<String, Object> request, String key) {
        Object value = request.get(key);
        if (!(value instanceof Number)) {
            throw new IllegalArgumentException("Missing or non-numeric operand: " + key);
        }
        return ((Number) value).doubleValue();
    }
}

/**
* Parameters for configuring RPC client behavior
*/
public class RpcClientParams {
/**
* Set request timeout in milliseconds
* @param timeout - Timeout value
*/
public RpcClientParams setTimeout(int timeout);
/**
* Set whether to use publisher confirms
* @param useConfirms - Whether to enable confirms
*/
public RpcClientParams setUseConfirms(boolean useConfirms);
/**
* Set whether requests should be mandatory
* @param mandatory - Whether requests are mandatory
*/
public RpcClientParams setMandatory(boolean mandatory);
/**
* Get configured timeout
*/
public int getTimeout();
/**
* Check if confirms are enabled
*/
public boolean isUseConfirms();
/**
* Check if requests are mandatory
*/
public boolean isMandatory();
}

Install with Tessl CLI
npx tessl i tessl/maven-com-rabbitmq--amqp-client