The RabbitMQ Java client library allows Java applications to interface with RabbitMQ message broker servers.

Core functionality for establishing connections to RabbitMQ brokers and creating channels for AMQP operations. Connections represent the TCP connection to the broker, while channels provide a lightweight way to multiplex multiple conversations over a single connection.
Factory class for creating and configuring connections to RabbitMQ brokers.
/**
 * Entry point for opening a {@link Connection} to a RabbitMQ node.
 * Most connection and socket settings are configured through this factory.
 *
 * <p>This listing is a signature-only API summary; method bodies are omitted.
 */
public class ConnectionFactory implements Cloneable {

    // --- Opening connections ---
    public Connection newConnection() throws IOException, TimeoutException;
    public Connection newConnection(ExecutorService executor) throws IOException, TimeoutException;
    public Connection newConnection(Address[] addrs) throws IOException, TimeoutException;
    public Connection newConnection(ExecutorService executor, Address[] addrs) throws IOException, TimeoutException;
    public Connection newConnection(List<Address> addrs) throws IOException, TimeoutException;
    public Connection newConnection(AddressResolver addressResolver) throws IOException, TimeoutException;
    public Connection newConnection(String connectionName) throws IOException, TimeoutException;

    // --- Host, port, credentials and virtual host ---
    public void setHost(String host);
    public String getHost();
    public void setPort(int port);
    public int getPort();
    public void setUsername(String username);
    public String getUsername();
    public void setPassword(String password);
    public String getPassword();
    public void setVirtualHost(String virtualHost);
    public String getVirtualHost();

    // --- Configuration from a URI ---
    public void setUri(String uri) throws URISyntaxException, NoSuchAlgorithmException, KeyManagementException;
    public void setUri(URI uri) throws URISyntaxException, NoSuchAlgorithmException, KeyManagementException;
    public String getUri();

    // --- Timeouts and negotiated limits ---
    public void setConnectionTimeout(int timeout);
    public int getConnectionTimeout();
    public void setHandshakeTimeout(int timeout);
    public int getHandshakeTimeout();
    public void setShutdownTimeout(int shutdownTimeout);
    public int getShutdownTimeout();
    public void setRequestedHeartbeat(int requestedHeartbeat);
    public int getRequestedHeartbeat();
    public void setRequestedChannelMax(int requestedChannelMax);
    public int getRequestedChannelMax();
    public void setRequestedFrameMax(int requestedFrameMax);
    public int getRequestedFrameMax();
    public void setChannelRpcTimeout(int channelRpcTimeout);
    public int getChannelRpcTimeout();

    // --- Sockets ---
    public void setSocketFactory(SocketFactory factory);
    public SocketFactory getSocketFactory();
    public void setSocketConfigurator(SocketConfigurator socketConfigurator);
    public SocketConfigurator getSocketConfigurator();

    // --- SSL/TLS ---
    public void useSslProtocol() throws NoSuchAlgorithmException, KeyManagementException;
    public void useSslProtocol(String protocol) throws NoSuchAlgorithmException, KeyManagementException;
    public void useSslProtocol(SSLContext context);
    public void setSocketFactory(SSLSocketFactory factory);

    // --- SASL authentication ---
    public void setSaslConfig(SaslConfig saslConfig);
    public SaslConfig getSaslConfig();

    // --- Automatic connection recovery ---
    public void setAutomaticRecoveryEnabled(boolean automaticRecovery);
    public boolean isAutomaticRecoveryEnabled();
    public void setNetworkRecoveryInterval(long networkRecoveryInterval);
    public long getNetworkRecoveryInterval();
    public void setRecoveryDelayHandler(RecoveryDelayHandler recoveryDelayHandler);
    public RecoveryDelayHandler getRecoveryDelayHandler();

    // --- Exception handling, metrics, traffic and observation hooks ---
    public void setExceptionHandler(ExceptionHandler exceptionHandler);
    public ExceptionHandler getExceptionHandler();
    public void setMetricsCollector(MetricsCollector metricsCollector);
    public MetricsCollector getMetricsCollector();
    public void setTrafficListener(TrafficListener trafficListener);
    public TrafficListener getTrafficListener();
    public void setObservationCollector(ObservationCollector observationCollector);
    public ObservationCollector getObservationCollector();

    // --- Credentials provider and refresh service ---
    public void setCredentialsProvider(CredentialsProvider credentialsProvider);
    public CredentialsProvider getCredentialsProvider();
    public void setCredentialsRefreshService(CredentialsRefreshService credentialsRefreshService);
    public CredentialsRefreshService getCredentialsRefreshService();

    // --- SSL context factory and hostname verification ---
    public void setSslContextFactory(SslContextFactory sslContextFactory);
    public SslContextFactory getSslContextFactory();
    public void enableHostnameVerification();

    // --- Topology recovery ---
    public void setTopologyRecoveryEnabled(boolean topologyRecovery);
    public boolean isTopologyRecoveryEnabled();
    public void setTopologyRecoveryExecutor(ExecutorService executor);
    public ExecutorService getTopologyRecoveryExecutor();

    // --- Channel behaviour ---
    public void setChannelShouldCheckRpcResponseType(boolean channelShouldCheckRpcResponseType);
    public boolean isChannelShouldCheckRpcResponseType();

    // --- Work pool ---
    public void setWorkPoolTimeout(int workPoolTimeout);
    public int getWorkPoolTimeout();

    // --- Write-error listener ---
    public void setErrorOnWriteListener(ErrorOnWriteListener errorOnWriteListener);
    public ErrorOnWriteListener getErrorOnWriteListener();

    // --- Inbound message body size limit ---
    public void setMaxInboundMessageBodySize(int maxInboundMessageBodySize);
    public int getMaxInboundMessageBodySize();

    // --- NIO versus blocking I/O ---
    public void useNio();
    public void useBlockingIo();
    public void setNioParams(NioParams nioParams);
    public NioParams getNioParams();

    // --- Loading settings from properties ---
    public void load(Properties properties);
    public void load(Properties properties, String prefix);
    public void load(String propertyFileLocation) throws IOException;
    public void load(String propertyFileLocation, String prefix) throws IOException;

    // --- Copying the factory ---
    public ConnectionFactory clone();

    // --- Default values ---
    public static final String DEFAULT_USER = "guest";
    public static final String DEFAULT_PASS = "guest";
    public static final String DEFAULT_VHOST = "/";
    public static final String DEFAULT_HOST = "localhost";
    public static final int DEFAULT_AMQP_PORT = 5672;
    public static final int DEFAULT_AMQP_OVER_SSL_PORT = 5671;
    public static final int DEFAULT_CONNECTION_TIMEOUT = 60000;
    public static final int DEFAULT_HANDSHAKE_TIMEOUT = 10000;
    public static final int DEFAULT_SHUTDOWN_TIMEOUT = 10000;
    public static final int DEFAULT_HEARTBEAT = 60;
    public static final int DEFAULT_CHANNEL_MAX = 2047;
    public static final int DEFAULT_FRAME_MAX = 0;
    public static final int DEFAULT_CHANNEL_RPC_TIMEOUT = 600000;
    public static final long DEFAULT_NETWORK_RECOVERY_INTERVAL = 5000;
}

Usage Examples:
// Basic connection setup
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("user");
factory.setPassword("password");
factory.setVirtualHost("/");
Connection connection = factory.newConnection();

// URI-based configuration
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://user:password@localhost:5672/vhost");
Connection connection = factory.newConnection();

// SSL connection
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5671);
factory.useSslProtocol();
Connection connection = factory.newConnection();

// Connection with custom timeouts and recovery
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setConnectionTimeout(30000);
factory.setHandshakeTimeout(5000);
factory.setAutomaticRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(10000);
Connection connection = factory.newConnection();

Interface representing a connection to a RabbitMQ broker.
/**
 * Public API: interface to an AMQ connection.
 *
 * <p>Groups channel creation, connection state and properties, negotiated values,
 * close/abort variants, and blocked-connection listener registration.
 */
public interface Connection extends Closeable, ShutdownNotifier {

    // --- Creating channels ---
    Channel createChannel() throws IOException;
    Channel createChannel(int channelNumber) throws IOException;

    // --- State and peer address ---
    boolean isOpen();
    InetAddress getAddress();
    int getPort();

    // --- Server, client and naming properties ---
    Map<String, Object> getServerProperties();
    Map<String, Object> getClientProperties();
    String getClientProvidedName();

    // --- Connection information ---
    int getChannelMax();
    int getFrameMax();
    int getHeartbeat();
    String getId();

    // --- Closing (may throw IOException) ---
    void close() throws IOException;
    void close(int closeCode, String closeMessage) throws IOException;
    void close(int timeout) throws IOException;
    void close(int closeCode, String closeMessage, int timeout) throws IOException;

    // --- Aborting (same overload shapes as close, but no checked exceptions declared) ---
    void abort();
    void abort(int closeCode, String closeMessage);
    void abort(int timeout);
    void abort(int closeCode, String closeMessage, int timeout);

    // --- Blocked-connection listeners ---
    void addBlockedListener(BlockedListener listener);
    BlockedListener addBlockedListener(BlockedCallback blockedCallback, UnblockedCallback unblockedCallback);
    boolean removeBlockedListener(BlockedListener listener);
    void clearBlockedListeners();
}

Usage Examples:
// Create and use connection
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
// Check connection state
if (connection.isOpen()) {
System.out.println("Connection is open");
}
// Get server information
Map<String, Object> serverProps = connection.getServerProperties();
System.out.println("Server version: " + serverProps.get("version"));
// Create channels
Channel channel1 = connection.createChannel();
Channel channel2 = connection.createChannel();
// Close connection
connection.close();

Interface representing a channel within a connection for AMQP operations.
/**
 * Interface to a channel. All non-deprecated methods of this interface are part of the public API.
 *
 * <p>Extends {@link AutoCloseable} rather than {@code java.io.Closeable}: {@link #close()} declares
 * {@code TimeoutException} in addition to {@code IOException}, and an override of
 * {@code Closeable.close()} is not allowed to add a checked exception.
 */
public interface Channel extends AutoCloseable, ShutdownNotifier {

    // --- Channel information ---
    int getChannelNumber();
    Connection getConnection();
    boolean isOpen();

    // --- Aborting and closing ---
    void abort() throws IOException;
    void abort(int closeCode, String closeMessage) throws IOException;
    void close() throws IOException, TimeoutException;
    void close(int closeCode, String closeMessage) throws IOException, TimeoutException;

    // --- Default consumer ---
    Consumer getDefaultConsumer();
    void setDefaultConsumer(Consumer consumer);

    // --- Quality of Service (flow control) ---
    void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;
    void basicQos(int prefetchCount, boolean global) throws IOException;
    void basicQos(int prefetchCount) throws IOException;

    // --- Exchange operations ---
    AMQP.Exchange.DeclareOk exchangeDeclare(String exchange, String type) throws IOException;
    AMQP.Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable) throws IOException;
    AMQP.Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete, Map<String, Object> arguments) throws IOException;
    AMQP.Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments) throws IOException;
    void exchangeDeclareNoWait(String exchange, String type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments) throws IOException;
    AMQP.Exchange.DeclareOk exchangeDeclarePassive(String exchange) throws IOException;
    AMQP.Exchange.DeleteOk exchangeDelete(String exchange, boolean ifUnused) throws IOException;
    AMQP.Exchange.DeleteOk exchangeDelete(String exchange) throws IOException;
    void exchangeDeleteNoWait(String exchange, boolean ifUnused) throws IOException;
    AMQP.Exchange.BindOk exchangeBind(String destination, String source, String routingKey) throws IOException;
    AMQP.Exchange.BindOk exchangeBind(String destination, String source, String routingKey, Map<String, Object> arguments) throws IOException;
    void exchangeBindNoWait(String destination, String source, String routingKey, Map<String, Object> arguments) throws IOException;
    AMQP.Exchange.UnbindOk exchangeUnbind(String destination, String source, String routingKey) throws IOException;
    AMQP.Exchange.UnbindOk exchangeUnbind(String destination, String source, String routingKey, Map<String, Object> arguments) throws IOException;
    void exchangeUnbindNoWait(String destination, String source, String routingKey, Map<String, Object> arguments) throws IOException;

    // --- Queue operations ---
    AMQP.Queue.DeclareOk queueDeclare() throws IOException;
    AMQP.Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException;
    void queueDeclareNoWait(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException;
    AMQP.Queue.DeclareOk queueDeclarePassive(String queue) throws IOException;
    AMQP.Queue.DeleteOk queueDelete(String queue) throws IOException;
    AMQP.Queue.DeleteOk queueDelete(String queue, boolean ifUnused, boolean ifEmpty) throws IOException;
    void queueDeleteNoWait(String queue, boolean ifUnused, boolean ifEmpty) throws IOException;
    AMQP.Queue.BindOk queueBind(String queue, String exchange, String routingKey) throws IOException;
    AMQP.Queue.BindOk queueBind(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;
    void queueBindNoWait(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;
    AMQP.Queue.UnbindOk queueUnbind(String queue, String exchange, String routingKey) throws IOException;
    AMQP.Queue.UnbindOk queueUnbind(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;
    AMQP.Queue.PurgeOk queuePurge(String queue) throws IOException;
    int messageCount(String queue) throws IOException;
    int consumerCount(String queue) throws IOException;

    // --- Message publishing ---
    void basicPublish(String exchange, String routingKey, AMQP.BasicProperties props, byte[] body) throws IOException;
    void basicPublish(String exchange, String routingKey, boolean mandatory, AMQP.BasicProperties props, byte[] body) throws IOException;
    void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, AMQP.BasicProperties props, byte[] body) throws IOException;

    // --- Message consuming (Consumer-object and callback-based overloads) ---
    GetResponse basicGet(String queue, boolean autoAck) throws IOException;
    String basicConsume(String queue, Consumer callback) throws IOException;
    String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;
    String basicConsume(String queue, boolean autoAck, String consumerTag, Consumer callback) throws IOException;
    String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> arguments, Consumer callback) throws IOException;
    String basicConsume(String queue, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException;
    String basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException;
    String basicConsume(String queue, boolean autoAck, String consumerTag, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException;
    String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> arguments, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException;
    String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> arguments, DeliverCallback deliverCallback, CancelCallback cancelCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException;
    void basicCancel(String consumerTag) throws IOException;

    // --- Message acknowledgment ---
    void basicAck(long deliveryTag, boolean multiple) throws IOException;
    void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
    void basicReject(long deliveryTag, boolean requeue) throws IOException;
    AMQP.Basic.RecoverOk basicRecover() throws IOException;
    AMQP.Basic.RecoverOk basicRecover(boolean requeue) throws IOException;

    // --- Transaction support ---
    AMQP.Tx.SelectOk txSelect() throws IOException;
    AMQP.Tx.CommitOk txCommit() throws IOException;
    AMQP.Tx.RollbackOk txRollback() throws IOException;

    // --- Publisher confirms ---
    AMQP.Confirm.SelectOk confirmSelect() throws IOException;
    long getNextPublishSeqNo();
    boolean waitForConfirms() throws InterruptedException;
    boolean waitForConfirms(long timeout) throws InterruptedException, TimeoutException;
    void waitForConfirmsOrDie() throws IOException, InterruptedException;
    void waitForConfirmsOrDie(long timeout) throws IOException, InterruptedException, TimeoutException;

    // --- Return and confirm listeners ---
    void addReturnListener(ReturnListener listener);
    ReturnListener addReturnListener(ReturnCallback returnCallback);
    boolean removeReturnListener(ReturnListener listener);
    void clearReturnListeners();
    void addConfirmListener(ConfirmListener listener);
    ConfirmListener addConfirmListener(ConfirmCallback ackCallback, ConfirmCallback nackCallback);
    boolean removeConfirmListener(ConfirmListener listener);
    void clearConfirmListeners();

    // --- Low-level operations ---
    // NOTE(review): the upstream client appears to declare rpc as returning Command
    // (matching asyncCompletableRpc below) - confirm against the library Javadoc.
    Method rpc(Method method) throws IOException, ShutdownSignalException;
    void asyncRpc(Method method) throws IOException;
    CompletableFuture<Command> asyncCompletableRpc(Method method) throws IOException;
}

Interface for connections that support automatic recovery from network failures.
/**
 * Connection that can automatically recover from network failures.
 *
 * <p>Extends {@link Recoverable} as well as {@link Connection}, so an instance can be passed to
 * {@link RecoveryListener} callbacks, whose parameter type is {@code Recoverable}.
 */
public interface RecoverableConnection extends Recoverable, Connection {
    // Re-declared from Recoverable so the listener methods remain visible in this listing.
    @Override
    void addRecoveryListener(RecoveryListener listener);

    @Override
    void removeRecoveryListener(RecoveryListener listener);
}

Interface for channels that support automatic recovery.
/**
 * Channel that can automatically recover from network failures.
 *
 * <p>Declares no members of its own: the recovery-listener methods are inherited from
 * {@link Recoverable}, and all channel operations from {@link Channel}.
 */
public interface RecoverableChannel extends Recoverable, Channel {
}

Usage Examples:
// Working with recoverable connections
ConnectionFactory factory = new ConnectionFactory();
factory.setAutomaticRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(5000);
RecoverableConnection connection = (RecoverableConnection) factory.newConnection();
// Add recovery listener
connection.addRecoveryListener(new RecoveryListener() {
@Override
public void handleRecovery(Recoverable recoverable) {
System.out.println("Connection recovered!");
}
@Override
public void handleRecoveryStarted(Recoverable recoverable) {
System.out.println("Recovery started...");
}
});
RecoverableChannel channel = (RecoverableChannel) connection.createChannel();

// Shutdown notification support
/**
 * Shutdown notification support: an add/remove pair for {@link ShutdownListener}s plus an
 * accessor returning the close reason as a {@code ShutdownSignalException}.
 */
public interface ShutdownNotifier {

    /** @param listener the shutdown listener to add */
    void addShutdownListener(ShutdownListener listener);

    /** @param listener the shutdown listener to remove */
    void removeShutdownListener(ShutdownListener listener);

    /** @return the close reason */
    ShutdownSignalException getCloseReason();
}
/**
 * Single-method listener that can be registered with a {@code ShutdownNotifier}.
 */
public interface ShutdownListener {

    /** @param cause the shutdown signal handed to the listener */
    void shutdownCompleted(ShutdownSignalException cause);
}
// Blocked connection support
/**
 * Blocked-connection support: a pair of callbacks, one for the blocked notification
 * and one for the unblocked notification.
 */
public interface BlockedListener {

    /**
     * @param reason the reason string supplied with the blocked notification
     * @throws IOException if the handler fails
     */
    void handleBlocked(String reason) throws IOException;

    /**
     * @throws IOException if the handler fails
     */
    void handleUnblocked() throws IOException;
}
/**
 * Lambda-friendly callback for the blocked notification; takes the reason string.
 */
@FunctionalInterface
public interface BlockedCallback {

    /**
     * @param reason the reason string supplied with the blocked notification
     * @throws IOException if the callback fails
     */
    void handle(String reason) throws IOException;
}
/**
 * Lambda-friendly callback for the unblocked notification; takes no arguments.
 */
@FunctionalInterface
public interface UnblockedCallback {

    /**
     * @throws IOException if the callback fails
     */
    void handle() throws IOException;
}
// Recovery support
/**
 * Recovery support: listener with one callback for recovery and one for recovery start.
 * Both callbacks receive the {@link Recoverable} concerned.
 */
public interface RecoveryListener {

    /** @param recoverable the recoverable passed to the callback */
    void handleRecovery(Recoverable recoverable);

    /** @param recoverable the recoverable passed to the callback */
    void handleRecoveryStarted(Recoverable recoverable);
}
/**
 * Declares the add/remove pair for {@link RecoveryListener}s.
 */
public interface Recoverable {

    /** @param listener the recovery listener to add */
    void addRecoveryListener(RecoveryListener listener);

    /** @param listener the recovery listener to remove */
    void removeRecoveryListener(RecoveryListener listener);
}

Install with Tessl CLI
npx tessl i tessl/maven-com-rabbitmq--amqp-client