CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-com-rabbitmq--amqp-client

The RabbitMQ Java client library allows Java applications to interface with RabbitMQ message broker servers

Pending
Overview
Eval results
Files

docs/consumer-api.md

Consumer API

Interfaces and implementations for consuming messages asynchronously with callbacks. The Consumer API provides both functional callback interfaces and full Consumer implementations for handling message delivery, cancellation, and shutdown events.

Capabilities

Functional Callback Interfaces

Modern callback interfaces for handling message delivery and consumer events.

/**
 * Callback interface for message delivery.
 * Declares a single abstract method, so it can be supplied as a lambda
 * or a method reference.
 */
@FunctionalInterface
public interface DeliverCallback {
    /**
     * Called when a message is delivered to the consumer
     * @param consumerTag - Consumer identifier
     * @param delivery - Message delivery information
     * @throws IOException - Implementations may propagate I/O failures to the caller
     */
    void handle(String consumerTag, Delivery delivery) throws IOException;
}

/**
 * Callback interface for consumer cancellation.
 * Declares a single abstract method, so it can be supplied as a lambda
 * or a method reference.
 */
@FunctionalInterface
public interface CancelCallback {
    /**
     * Called when the consumer is cancelled
     * @param consumerTag - Consumer identifier that was cancelled
     * @throws IOException - Implementations may propagate I/O failures to the caller
     */
    void handle(String consumerTag) throws IOException;
}

/**
 * Callback interface for consumer shutdown signals.
 * Declares a single abstract method, so it can be supplied as a lambda
 * or a method reference. Unlike the delivery and cancel callbacks, this
 * method declares no checked exceptions.
 */
@FunctionalInterface
public interface ConsumerShutdownSignalCallback {
    /**
     * Called when the consumer receives a shutdown signal
     * @param consumerTag - Consumer identifier
     * @param sig - Shutdown signal with reason and details
     */
    void handleShutdownSignal(String consumerTag, ShutdownSignalException sig);
}

Usage Examples:

// Lambda-based message handling
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    // Decode with the Charset constant: no lookup by name and no
    // UnsupportedEncodingException to account for
    String message = new String(delivery.getBody(), java.nio.charset.StandardCharsets.UTF_8);
    System.out.println("[" + consumerTag + "] Received: " + message);

    // Process based on message properties
    AMQP.BasicProperties props = delivery.getProperties();
    if ("urgent".equals(props.getType())) {
        handleUrgentMessage(message);
    } else {
        handleRegularMessage(message);
    }
};

// Invoked with the tag of the consumer that was cancelled
CancelCallback cancelCallback = consumerTag -> {
    System.out.println("Consumer " + consumerTag + " was cancelled");
    // Cleanup resources
    cleanupConsumerResources(consumerTag);
};

// Invoked with the shutdown signal describing why the consumer stopped
ConsumerShutdownSignalCallback shutdownCallback = (consumerTag, sig) -> {
    System.out.println("Consumer " + consumerTag + " shutdown: " + sig.getReason());
    if (!sig.isInitiatedByApplication()) {
        // Handle unexpected shutdown
        scheduleReconnection();
    }
};

// Start consuming with callbacks (autoAck = false)
String consumerTag = channel.basicConsume("work.queue", false,
    deliverCallback, cancelCallback, shutdownCallback);
// Method reference usage
public class MessageProcessor {
    // Channel used for acknowledgments. MessageProcessor is not a Consumer,
    // so the channel has to be handed in explicitly.
    private final Channel channel;

    /**
     * @param channel - Channel the deliveries arrive on; used to acknowledge them
     */
    public MessageProcessor(Channel channel) {
        this.channel = channel;
    }

    /**
     * Processes a delivery and acknowledges it manually.
     * Signature matches DeliverCallback, so it can be passed as a method reference.
     * @param consumerTag - Consumer identifier
     * @param delivery - Message delivery information
     * @throws IOException - If the acknowledgment fails
     */
    public void handleDelivery(String consumerTag, Delivery delivery) throws IOException {
        // Process message
        String message = new String(delivery.getBody(), java.nio.charset.StandardCharsets.UTF_8);
        processBusinessLogic(message);

        // Manual acknowledgment
        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    }

    /**
     * Reports a cancellation.
     * Signature matches CancelCallback, so it can be passed as a method reference.
     * @param consumerTag - Consumer identifier that was cancelled
     */
    public void handleCancel(String consumerTag) throws IOException {
        System.out.println("Consumer cancelled: " + consumerTag);
    }
}

MessageProcessor processor = new MessageProcessor(channel);
channel.basicConsume("queue", false, processor::handleDelivery, processor::handleCancel);

Consumer Interface

Full consumer interface for handling all consumer events.

/**
 * Interface for implementing message consumers.
 * Declares one callback per consumer event: delivery, cancellation,
 * registration, recovery and shutdown.
 */
public interface Consumer {
    /**
     * Called when a message is delivered
     * @param consumerTag - Consumer identifier
     * @param envelope - Delivery envelope with routing info
     * @param properties - Message properties
     * @param body - Message body
     * @throws IOException - Implementations may propagate I/O failures to the caller
     */
    void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException;
    
    /**
     * Called when the consumer is cancelled by the server
     * @param consumerTag - Consumer identifier
     * @throws IOException - Implementations may propagate I/O failures to the caller
     */
    void handleCancel(String consumerTag) throws IOException;
    
    /**
     * Called when basicCancel is called
     * @param consumerTag - Consumer identifier
     */
    void handleCancelOk(String consumerTag);
    
    /**
     * Called when the consumer is registered
     * @param consumerTag - Consumer identifier
     */
    void handleConsumeOk(String consumerTag);
    
    /**
     * Called when basicRecover is called
     * @param consumerTag - Consumer identifier
     */
    void handleRecoverOk(String consumerTag);
    
    /**
     * Called when a shutdown signal is received
     * @param consumerTag - Consumer identifier
     * @param sig - Shutdown signal
     */
    void handleShutdownSignal(String consumerTag, ShutdownSignalException sig);
    
    /**
     * Get the consumer tag
     * NOTE(review): confirm against the library Javadoc that this accessor is
     * declared on Consumer itself and not only on DefaultConsumer.
     * @return Consumer identifier
     */
    String getConsumerTag();
}

DefaultConsumer

Default implementation of the Consumer interface providing base functionality.

/**
 * Default implementation of the Consumer interface.
 * Extend this class and override the handler methods you need.
 */
public class DefaultConsumer implements Consumer {
    /**
     * Constructor taking a channel
     * @param channel - Channel for this consumer
     */
    public DefaultConsumer(Channel channel);
    
    /**
     * Get the channel this consumer is associated with
     * @return Channel instance
     */
    public Channel getChannel();
    
    /**
     * Get the consumer tag
     * @return Consumer tag string
     */
    public String getConsumerTag();
    
    /**
     * Set the consumer tag (called by the library)
     * NOTE(review): verify this setter exists in the library version being
     * documented before relying on it.
     * @param consumerTag - Consumer identifier
     */
    public void setConsumerTag(String consumerTag);
    
    // Default implementations of Consumer interface methods;
    // subclasses override only the events they care about
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException;
    public void handleCancel(String consumerTag) throws IOException;
    public void handleCancelOk(String consumerTag);
    public void handleConsumeOk(String consumerTag);
    public void handleRecoverOk(String consumerTag);
    public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig);
}

Usage Examples:

// Custom consumer extending DefaultConsumer
public class WorkQueueConsumer extends DefaultConsumer {

    public WorkQueueConsumer(Channel channel) {
        super(channel);
    }

    /**
     * Processes one delivery and settles it: ack on success, nack with
     * requeue on failure.
     * @param consumerTag - Consumer identifier
     * @param envelope - Delivery envelope with routing info
     * @param properties - Message properties
     * @param body - Message body
     * @throws IOException - If the ack or nack cannot be sent
     */
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope,
                               AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, java.nio.charset.StandardCharsets.UTF_8);
        long deliveryTag = envelope.getDeliveryTag();

        try {
            // Simulate work
            processWork(message);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the calling thread can observe it
            Thread.currentThread().interrupt();
            getChannel().basicNack(deliveryTag, false, true);
            return;
        } catch (Exception e) {
            System.err.println("Error processing message: " + e.getMessage());

            // Reject and requeue for retry
            getChannel().basicNack(deliveryTag, false, true);
            return;
        }

        // Acknowledge successful processing. Kept outside the try block so a
        // failed ack is not followed by a nack for the same delivery.
        getChannel().basicAck(deliveryTag, false);
    }

    @Override
    public void handleCancel(String consumerTag) throws IOException {
        System.out.println("Consumer cancelled: " + consumerTag);
        // Perform cleanup
        cleanup();
    }

    @Override
    public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {
        System.out.println("Consumer shutdown: " + sig.getReason());
        if (!sig.isInitiatedByApplication()) {
            // Handle unexpected shutdown
            reconnect();
        }
    }

    private void processWork(String message) throws Exception {
        // Business logic here
        Thread.sleep(1000); // Simulate processing time
        System.out.println("Processed: " + message);
    }

    private void cleanup() {
        // Cleanup resources
    }

    private void reconnect() {
        // Reconnection logic
    }
}

// Use the custom consumer (autoAck = false: the consumer settles each delivery itself)
Channel channel = connection.createChannel();
WorkQueueConsumer consumer = new WorkQueueConsumer(channel);
channel.basicConsume("work.queue", false, consumer);
// Simple consumer for logging messages
public class LoggingConsumer extends DefaultConsumer {

    public LoggingConsumer(Channel channel) {
        super(channel);
    }

    /**
     * Prints the delivery's consumer tag, exchange, routing key and body,
     * then acknowledges it.
     * @param consumerTag - Consumer identifier
     * @param envelope - Delivery envelope with routing info
     * @param properties - Message properties
     * @param body - Message body
     * @throws IOException - If the acknowledgment cannot be sent
     */
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope,
                               AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, java.nio.charset.StandardCharsets.UTF_8);
        String routingKey = envelope.getRoutingKey();
        String exchange = envelope.getExchange();

        System.out.printf("[%s] %s->%s: %s%n",
            consumerTag, exchange, routingKey, message);

        // Manual acknowledgment right after logging: the consumer below is
        // registered with autoAck = false, so every delivery must be acked here
        getChannel().basicAck(envelope.getDeliveryTag(), false);
    }
}

channel.basicConsume("logs.queue", false, new LoggingConsumer(channel));

Consumer Exception Handling

/**
 * Exception thrown when a consumer is cancelled.
 * Unchecked (extends RuntimeException), so callers are not forced to
 * declare or catch it.
 */
public class ConsumerCancelledException extends RuntimeException {
    // No-argument constructor
    public ConsumerCancelledException();
}

Usage Examples:

// Handling consumer cancellation in application code
try {
    // Consumer processing logic
    while (isRunning) {
        // Process messages or wait
        Thread.sleep(1000);
    }
} catch (ConsumerCancelledException e) {
    System.out.println("Consumer was cancelled");
    // Restart consumer or exit gracefully
    restartConsumer();
} catch (InterruptedException e) {
    // Thread.sleep throws this checked exception; restore the interrupt
    // flag so code further up the stack can react to the interruption
    Thread.currentThread().interrupt();
}

Advanced Consumer Patterns

Multi-threaded Consumer:

/**
 * Consumer that hands each delivery to an ExecutorService, so message
 * processing does not run on the thread that invokes handleDelivery.
 */
public class ThreadedConsumer extends DefaultConsumer {
    private final ExecutorService executor;

    /**
     * @param channel - Channel for this consumer
     * @param executor - Executor that runs the per-message processing tasks
     */
    public ThreadedConsumer(Channel channel, ExecutorService executor) {
        super(channel);
        this.executor = executor;
    }

    /**
     * Submits the delivery to the executor. The task acks the message on
     * success and nacks it with requeue on failure.
     * @param consumerTag - Consumer identifier
     * @param envelope - Delivery envelope with routing info
     * @param properties - Message properties
     * @param body - Message body
     */
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope,
                               AMQP.BasicProperties properties, byte[] body) throws IOException {

        long deliveryTag = envelope.getDeliveryTag();

        // Submit to thread pool for processing
        executor.submit(() -> {
            try {
                String message = new String(body, java.nio.charset.StandardCharsets.UTF_8);
                processMessage(message);

                // Acknowledge from the worker thread once processing succeeded
                getChannel().basicAck(deliveryTag, false);

            } catch (Exception e) {
                // Report the failure instead of discarding it silently
                System.err.println("Error processing message: " + e.getMessage());
                try {
                    getChannel().basicNack(deliveryTag, false, true);
                } catch (IOException ex) {
                    System.err.println("Failed to nack message: " + ex.getMessage());
                }
            }
        });
    }
}

Retry Logic Consumer:

/**
 * Consumer that retries failed messages by republishing them with an
 * incremented "x-retry-count" header, up to MAX_RETRIES attempts.
 */
public class RetryConsumer extends DefaultConsumer {
    private static final int MAX_RETRIES = 3;
    private static final String RETRY_HEADER = "x-retry-count";

    public RetryConsumer(Channel channel) {
        super(channel);
    }

    /**
     * Processes one delivery. On failure the message is republished for
     * retry while the retry budget lasts, and rejected without requeue
     * once it is exhausted.
     * @param consumerTag - Consumer identifier
     * @param envelope - Delivery envelope with routing info
     * @param properties - Message properties
     * @param body - Message body
     * @throws IOException - If publishing, ack or nack fails
     */
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope,
                               AMQP.BasicProperties properties, byte[] body) throws IOException {

        int retryCount = readRetryCount(properties.getHeaders());
        long deliveryTag = envelope.getDeliveryTag();

        try {
            String message = new String(body, java.nio.charset.StandardCharsets.UTF_8);
            processMessage(message);
        } catch (Exception e) {
            if (retryCount < MAX_RETRIES) {
                // Republish with incremented retry count, then settle the original
                republishWithRetry(envelope, properties, body, retryCount + 1);
                getChannel().basicAck(deliveryTag, false);
            } else {
                // Send to dead letter queue or discard
                getChannel().basicNack(deliveryTag, false, false);
            }
            return;
        }

        // Ack outside the try block: an ack failure must not be mistaken for a
        // processing failure and trigger a republish of a handled message
        getChannel().basicAck(deliveryTag, false);
    }

    // Reads the retry counter from the headers; 0 when absent, null or non-numeric.
    // Goes through Number because the header may not arrive as an Integer.
    private static int readRetryCount(Map<String, Object> headers) {
        if (headers == null) {
            return 0;
        }
        Object value = headers.get(RETRY_HEADER);
        return value instanceof Number ? ((Number) value).intValue() : 0;
    }

    /**
     * Republishes the message to the retry exchange with the given retry count.
     * @param envelope - Envelope of the failed delivery (supplies the routing key)
     * @param props - Properties of the failed delivery
     * @param body - Message body
     * @param retryCount - Retry count to stamp on the republished message
     * @throws IOException - If the publish fails
     */
    private void republishWithRetry(Envelope envelope, AMQP.BasicProperties props,
                                    byte[] body, int retryCount) throws IOException {

        Map<String, Object> headers = new HashMap<>();
        if (props.getHeaders() != null) {
            headers.putAll(props.getHeaders());
        }
        headers.put(RETRY_HEADER, retryCount);

        // Start from a builder seeded with the original properties so fields
        // such as correlationId, replyTo and expiration survive the retry
        AMQP.BasicProperties newProps = props.builder()
            .headers(headers)
            .build();

        // Republish to retry exchange/queue
        getChannel().basicPublish("retry.exchange", envelope.getRoutingKey(), newProps, body);
    }
}

Types

Consumer-Related Types

// Consumer tag management: a no-argument supplier that returns a consumer tag string
public interface ConsumerTagSupplier {
    String get();
}

// Message delivery data: bundles the envelope, properties and body of one
// message, i.e. what DeliverCallback.handle receives as its delivery argument
public class Delivery {
    public Envelope getEnvelope();
    public AMQP.BasicProperties getProperties();
    public byte[] getBody();
}

// Envelope with message routing information: delivery tag (the value passed
// to basicAck/basicNack in the examples), redelivery flag, exchange and routing key
public class Envelope {
    public long getDeliveryTag();
    public boolean isRedeliver();
    public String getExchange();
    public String getRoutingKey();
}

Install with Tessl CLI

npx tessl i tessl/maven-com-rabbitmq--amqp-client

docs

configuration.md

confirms-returns.md

connection-channel.md

consumer-api.md

consuming.md

error-recovery.md

index.md

observability.md

publishing.md

rpc.md

tile.json