CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/maven-com-rabbitmq--amqp-client

The RabbitMQ Java client library allows Java applications to interface with RabbitMQ message broker servers

Pending
Overview
Eval results
Files

docs/connection-channel.md

Connection and Channel Management

Core functionality for establishing connections to RabbitMQ brokers and creating channels for AMQP operations. Connections represent the TCP connection to the broker, while channels provide a lightweight way to multiplex multiple conversations over a single connection.

Capabilities

ConnectionFactory

Factory class for creating and configuring connections to RabbitMQ brokers.

/**
 * Factory class to facilitate opening a Connection to a RabbitMQ node.
 * Most connection and socket settings are configured using this factory.
 *
 * <p>This is a signature-only API summary: method bodies are omitted, so the
 * listing is meant to be read, not compiled.
 *
 * <p>NOTE(review): the listing has not been cross-checked member by member
 * against the client Javadoc; entries marked NOTE(review) below are the ones
 * most worth confirming.
 */
public class ConnectionFactory implements Cloneable {
    
    // Connection creation. Every overload declares both IOException and
    // TimeoutException, so callers must handle (or propagate) both.
    public Connection newConnection() throws IOException, TimeoutException;
    public Connection newConnection(ExecutorService executor) throws IOException, TimeoutException;
    public Connection newConnection(Address[] addrs) throws IOException, TimeoutException;
    public Connection newConnection(ExecutorService executor, Address[] addrs) throws IOException, TimeoutException;
    public Connection newConnection(List<Address> addrs) throws IOException, TimeoutException;
    public Connection newConnection(AddressResolver addressResolver) throws IOException, TimeoutException;
    public Connection newConnection(String connectionName) throws IOException, TimeoutException;

    // Basic connection settings (see the DEFAULT_* constants at the end of the class)
    public void setHost(String host);
    public String getHost();
    public void setPort(int port);
    public int getPort();
    public void setUsername(String username);
    public String getUsername();
    public void setPassword(String password);
    public String getPassword();
    public void setVirtualHost(String virtualHost);
    public String getVirtualHost();

    // URI-based configuration. Both setters declare three checked exceptions.
    public void setUri(String uri) throws URISyntaxException, NoSuchAlgorithmException, KeyManagementException;
    public void setUri(URI uri) throws URISyntaxException, NoSuchAlgorithmException, KeyManagementException;
    // NOTE(review): confirm that a getUri() accessor exists on the real class.
    public String getUri();

    // Timeouts and limits
    // NOTE(review): units are not stated in this listing. The defaults below
    // (60000 / 10000 / 600000 for timeouts, 60 for heartbeat) suggest
    // milliseconds for timeouts and seconds for heartbeat - confirm.
    public void setConnectionTimeout(int timeout);
    public int getConnectionTimeout();
    public void setHandshakeTimeout(int timeout);
    public int getHandshakeTimeout();
    public void setShutdownTimeout(int shutdownTimeout);
    public int getShutdownTimeout();
    public void setRequestedHeartbeat(int requestedHeartbeat);
    public int getRequestedHeartbeat();
    public void setRequestedChannelMax(int requestedChannelMax);
    public int getRequestedChannelMax();
    public void setRequestedFrameMax(int requestedFrameMax);
    public int getRequestedFrameMax();
    public void setChannelRpcTimeout(int channelRpcTimeout);
    public int getChannelRpcTimeout();

    // Network configuration
    public void setSocketFactory(SocketFactory factory);
    public SocketFactory getSocketFactory();
    public void setSocketConfigurator(SocketConfigurator socketConfigurator);
    public SocketConfigurator getSocketConfigurator();

    // SSL/TLS configuration (see also the "SSL Context Factory" group below)
    public void useSslProtocol() throws NoSuchAlgorithmException, KeyManagementException;
    public void useSslProtocol(String protocol) throws NoSuchAlgorithmException, KeyManagementException;
    public void useSslProtocol(SSLContext context);
    // NOTE(review): javax.net.ssl.SSLSocketFactory is a subtype of
    // javax.net.SocketFactory, so this overload overlaps with
    // setSocketFactory(SocketFactory) above - confirm it exists in the client.
    public void setSocketFactory(SSLSocketFactory factory);

    // Authentication (see also the "Credentials and authentication" group below)
    public void setSaslConfig(SaslConfig saslConfig);
    public SaslConfig getSaslConfig();

    // Recovery settings
    // NOTE(review): interval unit presumably milliseconds
    // (DEFAULT_NETWORK_RECOVERY_INTERVAL = 5000) - confirm.
    public void setAutomaticRecoveryEnabled(boolean automaticRecovery);
    public boolean isAutomaticRecoveryEnabled();
    public void setNetworkRecoveryInterval(long networkRecoveryInterval);
    public long getNetworkRecoveryInterval();
    public void setRecoveryDelayHandler(RecoveryDelayHandler recoveryDelayHandler);
    public RecoveryDelayHandler getRecoveryDelayHandler();

    // Advanced settings
    // NOTE(review): confirm that each setter here really has a matching getter.
    public void setExceptionHandler(ExceptionHandler exceptionHandler);
    public ExceptionHandler getExceptionHandler();
    public void setMetricsCollector(MetricsCollector metricsCollector);
    public MetricsCollector getMetricsCollector();
    public void setTrafficListener(TrafficListener trafficListener);
    public TrafficListener getTrafficListener();
    public void setObservationCollector(ObservationCollector observationCollector);
    public ObservationCollector getObservationCollector();
    
    // Credentials and authentication
    public void setCredentialsProvider(CredentialsProvider credentialsProvider);
    public CredentialsProvider getCredentialsProvider();
    public void setCredentialsRefreshService(CredentialsRefreshService credentialsRefreshService);
    public CredentialsRefreshService getCredentialsRefreshService();
    
    // SSL Context Factory
    public void setSslContextFactory(SslContextFactory sslContextFactory);
    public SslContextFactory getSslContextFactory();
    public void enableHostnameVerification();
    
    // Topology Recovery
    public void setTopologyRecoveryEnabled(boolean topologyRecovery);
    public boolean isTopologyRecoveryEnabled();
    public void setTopologyRecoveryExecutor(ExecutorService executor);
    public ExecutorService getTopologyRecoveryExecutor();
    
    // Channel configuration
    public void setChannelShouldCheckRpcResponseType(boolean channelShouldCheckRpcResponseType);
    public boolean isChannelShouldCheckRpcResponseType();
    
    // Work pool configuration
    public void setWorkPoolTimeout(int workPoolTimeout);
    public int getWorkPoolTimeout();
    
    // Error handling
    public void setErrorOnWriteListener(ErrorOnWriteListener errorOnWriteListener);
    public ErrorOnWriteListener getErrorOnWriteListener();
    
    // Message size limits
    public void setMaxInboundMessageBodySize(int maxInboundMessageBodySize);
    public int getMaxInboundMessageBodySize();
    
    // NIO configuration
    public void useNio();
    public void useBlockingIo();
    public void setNioParams(NioParams nioParams);
    public NioParams getNioParams();
    
    // Configuration loading. Only the file-location overloads declare IOException.
    // NOTE(review): listed as void here - confirm the return type in the Javadoc.
    public void load(Properties properties);
    public void load(Properties properties, String prefix);
    public void load(String propertyFileLocation) throws IOException;
    public void load(String propertyFileLocation, String prefix) throws IOException;
    
    // Cloning (the class implements Cloneable)
    public ConnectionFactory clone();

    // Constants
    public static final String DEFAULT_USER = "guest";
    public static final String DEFAULT_PASS = "guest";
    public static final String DEFAULT_VHOST = "/";
    public static final String DEFAULT_HOST = "localhost";
    public static final int DEFAULT_AMQP_PORT = 5672;
    public static final int DEFAULT_AMQP_OVER_SSL_PORT = 5671;
    public static final int DEFAULT_CONNECTION_TIMEOUT = 60000;
    public static final int DEFAULT_HANDSHAKE_TIMEOUT = 10000;
    public static final int DEFAULT_SHUTDOWN_TIMEOUT = 10000;
    public static final int DEFAULT_HEARTBEAT = 60;
    public static final int DEFAULT_CHANNEL_MAX = 2047;
    // NOTE(review): 0 presumably means "no client-side limit" - confirm.
    public static final int DEFAULT_FRAME_MAX = 0;
    public static final int DEFAULT_CHANNEL_RPC_TIMEOUT = 600000;
    public static final long DEFAULT_NETWORK_RECOVERY_INTERVAL = 5000;
}

Usage Examples:

// NOTE: the four snippets below are independent examples. Each one declares its
// own `factory` and `connection`, so they cannot be pasted into a single scope
// as-is. newConnection() declares IOException and TimeoutException.

// Basic connection setup
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("user");
factory.setPassword("password");
factory.setVirtualHost("/");

Connection connection = factory.newConnection();
// URI-based configuration (separate snippet)
// setUri(String) declares URISyntaxException, NoSuchAlgorithmException and
// KeyManagementException.
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://user:password@localhost:5672/vhost");
Connection connection = factory.newConnection();
// SSL connection (separate snippet)
// 5671 is DEFAULT_AMQP_OVER_SSL_PORT; useSslProtocol() declares
// NoSuchAlgorithmException and KeyManagementException.
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5671);
factory.useSslProtocol();
Connection connection = factory.newConnection();
// Connection with custom timeouts and recovery (separate snippet)
// NOTE(review): timeout/interval values are presumably milliseconds - confirm.
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setConnectionTimeout(30000);
factory.setHandshakeTimeout(5000);
factory.setAutomaticRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(10000);

Connection connection = factory.newConnection();

Connection Interface

Interface representing a connection to a RabbitMQ broker.

/**
 * Public API: Interface to an AMQ connection.
 *
 * <p>Extends {@link java.io.Closeable}, so a connection can be managed with
 * try-with-resources, and ShutdownNotifier for shutdown-listener registration.
 * This is an abridged signature listing.
 */
public interface Connection extends Closeable, ShutdownNotifier {
    
    // Channel management
    // NOTE(review): behavior when the requested channel number is unavailable
    // is not shown in this listing - check the Javadoc.
    Channel createChannel() throws IOException;
    Channel createChannel(int channelNumber) throws IOException;

    // Connection state
    boolean isOpen();
    InetAddress getAddress();
    int getPort();

    // Connection properties
    Map<String, Object> getServerProperties();
    Map<String, Object> getClientProperties();
    String getClientProvidedName();

    // Connection information
    // NOTE(review): presumably the values negotiated with the broker, as opposed
    // to the "requested" values set on ConnectionFactory - confirm.
    int getChannelMax();
    int getFrameMax();
    int getHeartbeat();
    String getId();

    // Shutdown and cleanup
    // close(...) variants declare IOException; abort(...) variants declare no
    // checked exceptions.
    // NOTE(review): timeout unit is not stated here - presumably milliseconds.
    void close() throws IOException;
    void close(int closeCode, String closeMessage) throws IOException;
    void close(int timeout) throws IOException;
    void close(int closeCode, String closeMessage, int timeout) throws IOException;
    void abort();
    void abort(int closeCode, String closeMessage);
    void abort(int timeout);
    void abort(int closeCode, String closeMessage, int timeout);

    // Blocked connection handling
    // The callback overload returns a BlockedListener; presumably that is the
    // handle to pass to removeBlockedListener later - confirm.
    void addBlockedListener(BlockedListener listener);
    BlockedListener addBlockedListener(BlockedCallback blockedCallback, UnblockedCallback unblockedCallback);
    boolean removeBlockedListener(BlockedListener listener);
    void clearBlockedListeners();
}

Usage Examples:

// Create and use connection
// newConnection() declares IOException and TimeoutException.
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();

// Check connection state
if (connection.isOpen()) {
    System.out.println("Connection is open");
}

// Get server information
// NOTE(review): the "version" key is assumed to be present in the broker's
// server properties - confirm; get() returns null for a missing key.
Map<String, Object> serverProps = connection.getServerProperties();
System.out.println("Server version: " + serverProps.get("version"));

// Create channels (several channels can share one connection)
Channel channel1 = connection.createChannel();
Channel channel2 = connection.createChannel();

// Close connection (close() declares IOException)
connection.close();

Channel Interface

Interface representing a channel within a connection for AMQP operations.

/**
 * Interface to a channel. All non-deprecated methods of this interface are part of the public API.
 *
 * <p>The interface extends {@link AutoCloseable} rather than {@link java.io.Closeable}:
 * {@link #close()} declares {@code TimeoutException} in addition to {@code IOException},
 * and an overriding method may not add checked exceptions beyond those of
 * {@code Closeable.close()}. {@code AutoCloseable.close()} declares {@code Exception},
 * so the wider throws clause is legal and channels still work with try-with-resources.
 *
 * <p>This is an abridged signature listing.
 */
public interface Channel extends ShutdownNotifier, AutoCloseable {

    // Channel information
    int getChannelNumber();
    Connection getConnection();
    boolean isOpen();

    // Channel lifecycle (abort / close)
    void abort() throws IOException;
    void abort(int closeCode, String closeMessage) throws IOException;
    @Override
    void close() throws IOException, TimeoutException;
    void close(int closeCode, String closeMessage) throws IOException, TimeoutException;

    // Consumer management
    Consumer getDefaultConsumer();
    void setDefaultConsumer(Consumer consumer);

    // Quality of Service (flow control)
    void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;
    void basicQos(int prefetchCount, boolean global) throws IOException;
    void basicQos(int prefetchCount) throws IOException;

    // Exchange operations
    // The *NoWait variants return void instead of an AMQP.*Ok reply object.
    AMQP.Exchange.DeclareOk exchangeDeclare(String exchange, String type) throws IOException;
    AMQP.Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable) throws IOException;
    AMQP.Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete, Map<String, Object> arguments) throws IOException;
    AMQP.Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments) throws IOException;
    void exchangeDeclareNoWait(String exchange, String type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments) throws IOException;
    AMQP.Exchange.DeclareOk exchangeDeclarePassive(String exchange) throws IOException;
    AMQP.Exchange.DeleteOk exchangeDelete(String exchange, boolean ifUnused) throws IOException;
    AMQP.Exchange.DeleteOk exchangeDelete(String exchange) throws IOException;
    void exchangeDeleteNoWait(String exchange, boolean ifUnused) throws IOException;
    AMQP.Exchange.BindOk exchangeBind(String destination, String source, String routingKey) throws IOException;
    AMQP.Exchange.BindOk exchangeBind(String destination, String source, String routingKey, Map<String, Object> arguments) throws IOException;
    void exchangeBindNoWait(String destination, String source, String routingKey, Map<String, Object> arguments) throws IOException;
    AMQP.Exchange.UnbindOk exchangeUnbind(String destination, String source, String routingKey) throws IOException;
    AMQP.Exchange.UnbindOk exchangeUnbind(String destination, String source, String routingKey, Map<String, Object> arguments) throws IOException;
    void exchangeUnbindNoWait(String destination, String source, String routingKey, Map<String, Object> arguments) throws IOException;

    // Queue operations
    AMQP.Queue.DeclareOk queueDeclare() throws IOException;
    AMQP.Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException;
    void queueDeclareNoWait(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException;
    AMQP.Queue.DeclareOk queueDeclarePassive(String queue) throws IOException;
    AMQP.Queue.DeleteOk queueDelete(String queue) throws IOException;
    AMQP.Queue.DeleteOk queueDelete(String queue, boolean ifUnused, boolean ifEmpty) throws IOException;
    void queueDeleteNoWait(String queue, boolean ifUnused, boolean ifEmpty) throws IOException;
    AMQP.Queue.BindOk queueBind(String queue, String exchange, String routingKey) throws IOException;
    AMQP.Queue.BindOk queueBind(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;
    void queueBindNoWait(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;
    AMQP.Queue.UnbindOk queueUnbind(String queue, String exchange, String routingKey) throws IOException;
    AMQP.Queue.UnbindOk queueUnbind(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;
    AMQP.Queue.PurgeOk queuePurge(String queue) throws IOException;
    int messageCount(String queue) throws IOException;
    int consumerCount(String queue) throws IOException;

    // Message publishing
    void basicPublish(String exchange, String routingKey, AMQP.BasicProperties props, byte[] body) throws IOException;
    void basicPublish(String exchange, String routingKey, boolean mandatory, AMQP.BasicProperties props, byte[] body) throws IOException;
    void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, AMQP.BasicProperties props, byte[] body) throws IOException;

    // Message consuming
    // basicConsume returns a String; presumably the consumer tag that
    // basicCancel(String consumerTag) accepts - confirm.
    GetResponse basicGet(String queue, boolean autoAck) throws IOException;
    String basicConsume(String queue, Consumer callback) throws IOException;
    String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;
    String basicConsume(String queue, boolean autoAck, String consumerTag, Consumer callback) throws IOException;
    String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> arguments, Consumer callback) throws IOException;
    String basicConsume(String queue, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException;
    String basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException;
    String basicConsume(String queue, boolean autoAck, String consumerTag, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException;
    String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> arguments, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException;
    String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> arguments, DeliverCallback deliverCallback, CancelCallback cancelCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException;
    void basicCancel(String consumerTag) throws IOException;

    // Message acknowledgment
    void basicAck(long deliveryTag, boolean multiple) throws IOException;
    void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
    void basicReject(long deliveryTag, boolean requeue) throws IOException;
    AMQP.Basic.RecoverOk basicRecover() throws IOException;
    AMQP.Basic.RecoverOk basicRecover(boolean requeue) throws IOException;

    // Transaction support
    AMQP.Tx.SelectOk txSelect() throws IOException;
    AMQP.Tx.CommitOk txCommit() throws IOException;
    AMQP.Tx.RollbackOk txRollback() throws IOException;

    // Publisher confirms
    // NOTE(review): timeout unit is not stated in this listing - presumably
    // milliseconds; confirm.
    AMQP.Confirm.SelectOk confirmSelect() throws IOException;
    long getNextPublishSeqNo();
    boolean waitForConfirms() throws InterruptedException;
    boolean waitForConfirms(long timeout) throws InterruptedException, TimeoutException;
    void waitForConfirmsOrDie() throws IOException, InterruptedException;
    void waitForConfirmsOrDie(long timeout) throws IOException, InterruptedException, TimeoutException;

    // Listener management
    // The callback overloads return the listener object; presumably the handle
    // to pass to the matching remove* method - confirm.
    void addReturnListener(ReturnListener listener);
    ReturnListener addReturnListener(ReturnCallback returnCallback);
    boolean removeReturnListener(ReturnListener listener);
    void clearReturnListeners();
    void addConfirmListener(ConfirmListener listener);
    ConfirmListener addConfirmListener(ConfirmCallback ackCallback, ConfirmCallback nackCallback);
    boolean removeConfirmListener(ConfirmListener listener);
    void clearConfirmListeners();

    // Low-level operations
    // NOTE(review): asyncCompletableRpc yields a Command, which suggests rpc()
    // may also return Command rather than Method - confirm against the Javadoc.
    Method rpc(Method method) throws IOException, ShutdownSignalException;
    void asyncRpc(Method method) throws IOException;
    CompletableFuture<Command> asyncCompletableRpc(Method method) throws IOException;
}

RecoverableConnection

Interface for connections that support automatic recovery from network failures.

/**
 * Connection that can automatically recover from network failures.
 */
public interface RecoverableConnection extends Connection {
    void addRecoveryListener(RecoveryListener listener);
    void removeRecoveryListener(RecoveryListener listener);
}

RecoverableChannel

Interface for channels that support automatic recovery.

/**
 * Channel that can automatically recover from network failures.
 *
 * <p>Extends {@code Recoverable} so that recovery listeners can be registered on
 * the channel itself through {@code addRecoveryListener} /
 * {@code removeRecoveryListener}.
 */
public interface RecoverableChannel extends Recoverable, Channel {
    // No additional members: channel operations come from Channel and
    // recovery-listener registration comes from Recoverable.
}

Usage Examples:

// Working with recoverable connections
ConnectionFactory factory = new ConnectionFactory();
factory.setAutomaticRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(5000);

// newConnection() is declared to return Connection, hence the downcast.
// NOTE(review): the cast presumably only succeeds when automatic recovery is
// enabled on the factory - confirm.
RecoverableConnection connection = (RecoverableConnection) factory.newConnection();

// Add recovery listener
// RecoveryListener declares two methods, so it cannot be written as a lambda;
// an anonymous class is used instead.
connection.addRecoveryListener(new RecoveryListener() {
    @Override
    public void handleRecovery(Recoverable recoverable) {
        System.out.println("Connection recovered!");
    }
    
    @Override
    public void handleRecoveryStarted(Recoverable recoverable) {
        System.out.println("Recovery started...");
    }
});

// createChannel() is declared to return Channel, hence the downcast.
RecoverableChannel channel = (RecoverableChannel) connection.createChannel();

Types

Supporting Types

// Shutdown notification support

/**
 * Implemented by objects that can report their shutdown to registered listeners.
 * Both Connection and Channel extend this interface.
 */
public interface ShutdownNotifier {
    void addShutdownListener(ShutdownListener listener);
    void removeShutdownListener(ShutdownListener listener);
    // NOTE(review): presumably returns null while the object is still open - confirm.
    ShutdownSignalException getCloseReason();
}

/** Callback receiving the ShutdownSignalException that describes a shutdown. */
public interface ShutdownListener {
    void shutdownCompleted(ShutdownSignalException cause);
}

// Blocked connection support

/**
 * Two-method listener for blocked/unblocked notifications; registered through
 * Connection.addBlockedListener(BlockedListener). Both callbacks may throw IOException.
 */
public interface BlockedListener {
    void handleBlocked(String reason) throws IOException;
    void handleUnblocked() throws IOException;
}

/**
 * Lambda-friendly counterpart of BlockedListener.handleBlocked; used by
 * Connection.addBlockedListener(BlockedCallback, UnblockedCallback).
 */
@FunctionalInterface
public interface BlockedCallback {
    void handle(String reason) throws IOException;
}

/**
 * Lambda-friendly counterpart of BlockedListener.handleUnblocked; used by
 * Connection.addBlockedListener(BlockedCallback, UnblockedCallback).
 */
@FunctionalInterface  
public interface UnblockedCallback {
    void handle() throws IOException;
}

// Recovery support

/**
 * Listener for recovery events. Declares two abstract methods, so it is not a
 * functional interface and cannot be supplied as a lambda.
 */
public interface RecoveryListener {
    // NOTE(review): presumably invoked after recovery has completed - confirm.
    void handleRecovery(Recoverable recoverable);
    // NOTE(review): presumably invoked before recovery begins - confirm.
    void handleRecoveryStarted(Recoverable recoverable);
}

/** An object on which recovery listeners can be registered and unregistered. */
public interface Recoverable {
    void addRecoveryListener(RecoveryListener listener);
    void removeRecoveryListener(RecoveryListener listener);
}

Install with Tessl CLI

npx tessl i tessl/maven-com-rabbitmq--amqp-client

docs

configuration.md

confirms-returns.md

connection-channel.md

consumer-api.md

consuming.md

error-recovery.md

index.md

observability.md

publishing.md

rpc.md

tile.json