CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-com-rabbitmq--amqp-client

The RabbitMQ Java client library allows Java applications to interface with RabbitMQ message broker servers

Pending
Overview
Eval results
Files

confirms-returns.mddocs/

Publisher Confirms and Returns

Mechanisms for reliable message publishing with publisher confirms and handling returned messages. These features provide acknowledgment that messages have been received and processed by the broker, and notification when messages cannot be routed to queues.

Capabilities

Publisher Confirms

Mechanism for getting acknowledgments from the broker that messages have been received and processed.

/**
 * Enables publisher confirms on this channel.
 * All subsequently published messages will be confirmed by the broker.
 *
 * @throws IOException if an I/O error occurs while enabling confirms
 */
void confirmSelect() throws IOException;

/**
 * Waits until all messages published since the last call have been
 * acked or nacked by the broker. This overload takes no timeout, so it
 * blocks until every outstanding confirm has arrived.
 * @return true if all messages were acked, false if any message was nacked
 * @throws InterruptedException if the waiting thread is interrupted
 */
boolean waitForConfirms() throws InterruptedException;

/**
 * Waits for outstanding confirms, giving up after a timeout.
 * A timeout is reported by throwing TimeoutException, not by returning false.
 * @param timeout - Maximum time to wait in milliseconds
 * @return true if all messages were acked, false if any message was nacked
 * @throws InterruptedException if the waiting thread is interrupted
 * @throws TimeoutException if the confirms do not arrive within the timeout
 */
boolean waitForConfirms(long timeout) throws InterruptedException, TimeoutException;

/**
 * Waits until all messages published since the last call have been
 * acked or nacked by the broker.
 * @throws IOException if any message is nacked
 * @throws InterruptedException if the waiting thread is interrupted
 */
void waitForConfirmsOrDie() throws IOException, InterruptedException;

/**
 * Waits for outstanding confirms with a timeout, throwing on nack.
 * @param timeout - Maximum time to wait in milliseconds
 * @throws IOException if any message is nacked
 * @throws InterruptedException if the waiting thread is interrupted
 * @throws TimeoutException if the confirms do not arrive within the timeout
 */
void waitForConfirmsOrDie(long timeout) throws IOException, InterruptedException, TimeoutException;

/**
 * Gets the next publish sequence number.
 * Read it immediately before publishing to learn the number under which
 * that message will later be acked or nacked.
 * @return Sequence number for next published message
 */
long getNextPublishSeqNo();

/**
 * Adds a confirm listener for asynchronous confirm handling.
 * @param listener - Listener to receive confirm/nack notifications
 */
void addConfirmListener(ConfirmListener listener);

/**
 * Removes a previously added confirm listener.
 * @param listener - Listener to remove
 * @return true if listener was removed
 */
boolean removeConfirmListener(ConfirmListener listener);

/**
 * Removes all confirm listeners from this channel.
 */
void clearConfirmListeners();

Usage Examples:

// Synchronous confirms - wait for each message
Channel channel = connection.createChannel();
channel.confirmSelect();

for (int i = 0; i < 1000; i++) {
    String message = "Message " + i;
    channel.basicPublish("exchange", "routing.key", null, message.getBytes());

    // Wait for this message to be confirmed.
    // waitForConfirms(timeout) returns false when the broker nacked the message;
    // an expired timeout is signalled with TimeoutException instead.
    try {
        if (channel.waitForConfirms(5000)) {
            System.out.println("Message " + i + " confirmed");
        } else {
            System.out.println("Message " + i + " nacked by broker");
        }
    } catch (TimeoutException e) {
        System.out.println("Message " + i + " not confirmed within timeout");
    }
}
// Batch confirms - publish multiple messages then wait
channel.confirmSelect();
int batchSize = 100;

for (int i = 0; i < 1000; i++) {
    String message = "Message " + i;
    channel.basicPublish("exchange", "routing.key", null, message.getBytes());

    if ((i + 1) % batchSize == 0) {
        // Wait for batch to be confirmed
        try {
            channel.waitForConfirmsOrDie(10000);
            System.out.println("Batch " + ((i + 1) / batchSize) + " confirmed");
        } catch (IOException e) {
            // At least one message of the batch was nacked.
            System.out.println("Batch failed: " + e.getMessage());
            // The Java client closes the channel when waitForConfirmsOrDie fails,
            // so stop publishing instead of continuing on a dead channel.
            break;
        } catch (TimeoutException e) {
            // The broker did not confirm the whole batch within 10 seconds.
            System.out.println("Batch not confirmed within timeout");
            break;
        }
    }
}

Asynchronous Confirm Handling

Interfaces and callbacks for handling confirms asynchronously without blocking.

/**
 * Listener interface for publisher confirms.
 * Implementations are registered on a channel with addConfirmListener and
 * receive one callback per ack or nack sent by the broker.
 */
public interface ConfirmListener {
    /**
     * Called when message(s) are acknowledged by broker.
     * @param deliveryTag - Delivery tag of confirmed message; the examples match it
     *                      against the value read from getNextPublishSeqNo() before publishing
     * @param multiple - True if multiple messages confirmed (up to and including deliveryTag)
     * @throws IOException declared so implementations may propagate I/O failures
     */
    void handleAck(long deliveryTag, boolean multiple) throws IOException;
    
    /**
     * Called when message(s) are rejected by broker.
     * @param deliveryTag - Delivery tag of rejected message
     * @param multiple - True if multiple messages rejected (up to and including deliveryTag)
     * @throws IOException declared so implementations may propagate I/O failures
     */
    void handleNack(long deliveryTag, boolean multiple) throws IOException;
}

/**
 * Functional interface for confirm notifications.
 * The same shape serves both outcomes: one instance is supplied as the ack
 * callback and another as the nack callback when registering on a channel.
 */
@FunctionalInterface
public interface ConfirmCallback {
    /**
     * Handles a single ack or nack notification.
     * @param deliveryTag - Delivery tag of the confirmed (or rejected) message
     * @param multiple - True if the notification covers multiple messages
     * @throws IOException declared so implementations may propagate I/O failures
     */
    void handle(long deliveryTag, boolean multiple) throws IOException;
}

Usage Examples:

// Asynchronous confirms with listener
channel.confirmSelect();

ConfirmListener confirmLogger = new ConfirmListener() {
    /** Broker accepted a single message, or every message up to the tag. */
    @Override
    public void handleAck(long deliveryTag, boolean multiple) throws IOException {
        if (!multiple) {
            System.out.println("Message " + deliveryTag + " confirmed");
            removeConfirmedMessage(deliveryTag);
            return;
        }
        System.out.println("Messages up to " + deliveryTag + " confirmed");
        // Drop every tracked message covered by this cumulative ack
        removeConfirmedMessages(deliveryTag);
    }

    /** Broker rejected a single message, or every message up to the tag. */
    @Override
    public void handleNack(long deliveryTag, boolean multiple) throws IOException {
        if (!multiple) {
            System.out.println("Message " + deliveryTag + " rejected");
            handleRejectedMessage(deliveryTag);
            return;
        }
        System.out.println("Messages up to " + deliveryTag + " rejected");
        // Deal with every tracked message covered by this cumulative nack
        handleRejectedMessages(deliveryTag);
    }
};
channel.addConfirmListener(confirmLogger);

// Publish with tracking: remember each payload under the sequence number
// the channel is about to use, so a later ack/nack can be matched to it
Map<Long, String> outstandingConfirms = new ConcurrentHashMap<>();

for (int n = 0; n < 1000; n++) {
    String payload = "Message " + n;
    // Record first, publish second, so the entry exists before any confirm arrives
    outstandingConfirms.put(channel.getNextPublishSeqNo(), payload);
    channel.basicPublish("exchange", "routing.key", null, payload.getBytes());
}
// Using functional confirm callbacks
ConfirmCallback onAck = (tag, batched) -> {
    String ackLine = "Acked: " + tag + " (multiple: " + batched + ")";
    System.out.println(ackLine);
    confirmTracker.handleAck(tag, batched);
};

ConfirmCallback onNack = (tag, batched) -> {
    String nackLine = "Nacked: " + tag + " (multiple: " + batched + ")";
    System.out.println(nackLine);
    confirmTracker.handleNack(tag, batched);
    // Retry logic here
};

// First argument receives acks, second receives nacks
channel.addConfirmListener(onAck, onNack);

Message Returns

Handling of messages that the broker could not route to any queue. Such messages are returned to the publisher only when they were published with the mandatory flag set.

/**
 * Adds a return listener to handle returned messages.
 * @param listener - Listener to receive returned messages
 */
void addReturnListener(ReturnListener listener);

/**
 * Removes a previously added return listener.
 * @param listener - Listener to remove
 * @return true if listener was removed
 */
boolean removeReturnListener(ReturnListener listener);

/**
 * Removes all return listeners from this channel.
 */
void clearReturnListeners();

/**
 * Listener interface for returned messages.
 * Implementations are registered on a channel with addReturnListener.
 */
public interface ReturnListener {
    /**
     * Called when a message is returned by the broker.
     * @param replyCode - Reply code indicating why message was returned
     * @param replyText - Human-readable reply text
     * @param exchange - Exchange the message was published to
     * @param routingKey - Routing key used for publishing
     * @param properties - Message properties
     * @param body - Message body
     * @throws IOException declared so implementations may propagate I/O failures
     */
    void handleReturn(int replyCode, String replyText, String exchange, String routingKey, 
                      AMQP.BasicProperties properties, byte[] body) throws IOException;
}

/**
 * Functional interface for handling returned messages.
 * Receives the same information as ReturnListener.handleReturn, bundled
 * into a single Return object.
 */
@FunctionalInterface
public interface ReturnCallback {
    /**
     * Handles a returned message.
     * @param returnMessage - Return information
     * @throws IOException declared so implementations may propagate I/O failures
     */
    void handle(Return returnMessage) throws IOException;
}

/**
 * Class representing a returned message.
 * Only the accessor signatures are listed here; bodies are omitted.
 */
public class Return {
    // Reply code indicating why the message was returned
    public int getReplyCode();
    // Human-readable reply text
    public String getReplyText();
    // Exchange the message was published to
    public String getExchange();
    // Routing key used for publishing
    public String getRoutingKey();
    // Message properties
    public AMQP.BasicProperties getProperties();
    // Message body
    public byte[] getBody();
}

Usage Examples:

// Handle returned messages with listener
channel.addReturnListener(new ReturnListener() {
    /**
     * Logs the returned message and dispatches on the broker's reply code.
     */
    @Override
    public void handleReturn(int replyCode, String replyText, String exchange, String routingKey,
                           AMQP.BasicProperties properties, byte[] body) throws IOException {

        String message = new String(body, "UTF-8");
        System.out.printf("Message returned: %d - %s%n", replyCode, replyText);
        System.out.printf("Exchange: %s, Routing Key: %s%n", exchange, routingKey);
        System.out.printf("Message: %s%n", message);

        // Handle based on reason; named constants replace the raw codes 312 / 313
        switch (replyCode) {
            case AMQP.NO_ROUTE:
                System.out.println("No route found for message");
                // Try alternative routing or store for retry
                handleUnroutableMessage(exchange, routingKey, properties, body);
                break;
            case AMQP.NO_CONSUMERS:
                System.out.println("No consumers available");
                // Queue exists but no consumers
                handleNoConsumers(exchange, routingKey, properties, body);
                break;
            default:
                System.out.println("Unknown return reason: " + replyCode);
                handleUnknownReturn(replyCode, replyText, properties, body);
                break;
        }
    }
});

// Publish with mandatory flag
final boolean mandatory = true; // Return if unroutable
final byte[] payload = "Important message".getBytes();
channel.basicPublish("my.exchange", "nonexistent.key", mandatory, null, payload);
// Using functional return callback
ReturnCallback onReturn = returned -> {
    // Report why and where the message bounced
    System.out.println("Returned: " + returned.getReplyText());
    System.out.println("Exchange: " + returned.getExchange());
    System.out.println("Routing Key: " + returned.getRoutingKey());

    // Decode the payload for logging
    byte[] rawBody = returned.getBody();
    System.out.println("Message: " + new String(rawBody, "UTF-8"));

    // Store for retry or send to dead letter
    storeForRetry(returned);
};

channel.addReturnListener(onReturn);

Combined Confirms and Returns

Example showing how to use both confirms and returns together for robust publishing.

Robust Publisher Implementation:

/**
 * Publishes messages with publisher confirms and the mandatory flag enabled,
 * tracking each message until the broker acks or nacks it.
 *
 * <p>Pending messages are kept in a concurrent map because the confirm and
 * return callbacks are invoked by the client library (presumably on a thread
 * other than the publishing one - confirm against the client documentation).
 * {@link #publishReliably} reads the next sequence number and publishes in two
 * separate steps, so it must not be called concurrently on one instance.
 *
 * <p>{@code scheduleRetry} and {@code storeInDeadLetter} are application-specific
 * hooks that are not shown in this example.
 */
public class RobustPublisher {
    private final Channel channel;
    /** Messages published but not yet acked/nacked, keyed by publish sequence number. */
    private final Map<Long, PendingMessage> pendingConfirms = new ConcurrentHashMap<>();

    /**
     * Puts the channel into confirm mode and registers the confirm/return listeners.
     *
     * @param channel channel used for all publishing done by this instance
     * @throws IOException if confirm mode cannot be enabled
     */
    public RobustPublisher(Channel channel) throws IOException {
        this.channel = channel;
        channel.confirmSelect();
        setupListeners();
    }

    /**
     * @return number of messages published but not yet acked or nacked
     */
    public int getPendingConfirmCount() {
        return pendingConfirms.size();
    }

    /** Registers the listeners that keep {@code pendingConfirms} up to date. */
    private void setupListeners() {
        // Handle confirms
        channel.addConfirmListener(new ConfirmListener() {
            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                if (multiple) {
                    // Remove all confirmed messages up to deliveryTag
                    pendingConfirms.keySet().removeIf(seqNo -> seqNo <= deliveryTag);
                } else {
                    pendingConfirms.remove(deliveryTag);
                }
                System.out.println("Confirmed: " + deliveryTag + " (multiple: " + multiple + ")");
            }

            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                if (multiple) {
                    // Remove and handle in one pass: remove() returning non-null guarantees
                    // each nacked message is handled exactly once, and nothing is removed
                    // without being handled.
                    for (Long seqNo : pendingConfirms.keySet()) {
                        if (seqNo <= deliveryTag) {
                            PendingMessage nacked = pendingConfirms.remove(seqNo);
                            if (nacked != null) {
                                handleNackedMessage(nacked);
                            }
                        }
                    }
                } else {
                    PendingMessage message = pendingConfirms.remove(deliveryTag);
                    if (message != null) {
                        handleNackedMessage(message);
                    }
                }
                System.out.println("Nacked: " + deliveryTag + " (multiple: " + multiple + ")");
            }
        });

        // Handle returns
        channel.addReturnListener(returnMessage -> {
            System.out.println("Message returned: " + returnMessage.getReplyText());

            // Find the corresponding pending message and mark as returned
            String correlationId = returnMessage.getProperties().getCorrelationId();
            if (correlationId != null) {
                handleReturnedMessage(correlationId, returnMessage);
            }
        });
    }

    /**
     * Publishes a message with the mandatory flag and tracks it until confirmed.
     *
     * @param exchange   exchange to publish to
     * @param routingKey routing key for the message
     * @param props      message properties, or null for none; every property is kept
     *                   except the correlation ID, which is replaced by a generated one
     * @param body       message payload
     * @throws IOException if publishing fails; the message is then no longer tracked
     */
    public void publishReliably(String exchange, String routingKey, AMQP.BasicProperties props,
                               byte[] body) throws IOException {

        // Generate correlation ID for tracking
        String correlationId = UUID.randomUUID().toString();
        // Start from the caller's properties so none of them are lost
        // (priority, expiration, message ID, ...), then stamp the correlation ID.
        AMQP.BasicProperties.Builder builder =
            props != null ? props.builder() : new AMQP.BasicProperties.Builder();
        AMQP.BasicProperties propsWithCorrelation = builder.correlationId(correlationId).build();

        // Track the message before publishing so the entry exists when the confirm arrives
        long seqNo = channel.getNextPublishSeqNo();
        PendingMessage pending = new PendingMessage(correlationId, exchange, routingKey,
                                                   propsWithCorrelation, body, System.currentTimeMillis());
        pendingConfirms.put(seqNo, pending);

        // Publish with mandatory flag
        try {
            channel.basicPublish(exchange, routingKey, true, propsWithCorrelation, body);
        } catch (IOException | RuntimeException e) {
            // Nothing was sent, so no confirm will ever clear this entry
            pendingConfirms.remove(seqNo);
            throw e;
        }

        System.out.println("Published message " + seqNo + " with correlation ID: " + correlationId);
    }

    /** Reacts to a message the broker refused to take responsibility for. */
    private void handleNackedMessage(PendingMessage message) {
        System.out.println("Message nacked: " + message.getCorrelationId());
        // Implement retry logic or dead letter handling
        scheduleRetry(message);
    }

    /** Reacts to a message the broker could not route to any queue. */
    private void handleReturnedMessage(String correlationId, Return returnMessage) {
        System.out.println("Message returned: " + correlationId);
        // Handle unroutable message
        storeInDeadLetter(correlationId, returnMessage);
    }

    /** Immutable record of a published message that is still awaiting its confirm. */
    private static class PendingMessage {
        private final String correlationId;
        private final String exchange;
        private final String routingKey;
        private final AMQP.BasicProperties properties;
        private final byte[] body;
        private final long timestamp;

        PendingMessage(String correlationId, String exchange, String routingKey,
                       AMQP.BasicProperties properties, byte[] body, long timestamp) {
            this.correlationId = correlationId;
            this.exchange = exchange;
            this.routingKey = routingKey;
            this.properties = properties;
            this.body = body;
            this.timestamp = timestamp;
        }

        String getCorrelationId() {
            return correlationId;
        }

        String getExchange() {
            return exchange;
        }

        String getRoutingKey() {
            return routingKey;
        }

        AMQP.BasicProperties getProperties() {
            return properties;
        }

        byte[] getBody() {
            return body;
        }

        /** @return publish time in milliseconds, as read from System.currentTimeMillis() */
        long getTimestamp() {
            return timestamp;
        }
    }
}

Usage:

Channel channel = connection.createChannel();
RobustPublisher publisher = new RobustPublisher(channel);

// Publish messages reliably; every message shares the same persistent text/plain properties
AMQP.BasicProperties props = MessageProperties.PERSISTENT_TEXT_PLAIN;
for (int n = 0; n < 100; n++) {
    byte[] payload = ("Message " + n).getBytes();
    publisher.publishReliably("my.exchange", "routing.key", props, payload);
}

// Monitor pending confirms: give the broker five seconds, then report what is still open
Thread.sleep(5000);
int stillPending = publisher.getPendingConfirmCount();
System.out.println("Pending confirms: " + stillPending);

Types

Confirm and Return Types

// Return message information.
// Carries the details of a message the broker sent back to the publisher;
// only the accessor signatures are listed here.
public class Return {
    public int getReplyCode();        // AMQP reply code (312=NO_ROUTE, 313=NO_CONSUMERS)
    public String getReplyText();     // Human-readable explanation
    public String getExchange();      // Exchange message was published to
    public String getRoutingKey();    // Routing key used
    public AMQP.BasicProperties getProperties(); // Message properties
    public byte[] getBody();          // Message body
}

// Common AMQP reply codes for returns.
// NOTE(review): the owning type is not shown here; presumably these mirror the
// constants on the client's AMQP interface (e.g. AMQP.NO_ROUTE) - confirm.
public static final int REPLY_SUCCESS = 200;
public static final int NO_ROUTE = 312;        // No route found for message
public static final int NO_CONSUMERS = 313;    // Queue exists but no consumers

Install with Tessl CLI

npx tessl i tessl/maven-com-rabbitmq--amqp-client

docs

configuration.md

confirms-returns.md

connection-channel.md

consumer-api.md

consuming.md

error-recovery.md

index.md

observability.md

publishing.md

rpc.md

tile.json