CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/maven-com-rabbitmq--amqp-client

The RabbitMQ Java client library allows Java applications to interface with RabbitMQ message broker servers

Pending
Overview
Eval results
Files

docs/publishing.md

Message Publishing

Operations for publishing messages to exchanges and managing exchange topology. Publishing is the primary way to send messages into RabbitMQ: a message is published to an exchange, which routes it to queues based on the exchange type, its bindings, and the message's routing key.

Capabilities

Basic Publishing

Core message publishing functionality for sending messages to exchanges.

/**
 * Publish a message to an exchange with routing key
 * @param exchange - Exchange name (empty string for default exchange)
 * @param routingKey - Routing key for message routing
 * @param props - Message properties (headers, content type, etc.); the first usage example passes null
 * @param body - Message body as byte array
 * @throws IOException - If an error is encountered while publishing
 */
void basicPublish(String exchange, String routingKey, AMQP.BasicProperties props, byte[] body) throws IOException;

/**
 * Publish a message with mandatory flag
 * @param exchange - Exchange name
 * @param routingKey - Routing key
 * @param mandatory - If true, message will be returned if unroutable
 * @param props - Message properties
 * @param body - Message body
 * @throws IOException - If an error is encountered while publishing
 */
void basicPublish(String exchange, String routingKey, boolean mandatory, AMQP.BasicProperties props, byte[] body) throws IOException;

/**
 * Publish a message with both mandatory and immediate flags
 * @param exchange - Exchange name
 * @param routingKey - Routing key
 * @param mandatory - If true, message will be returned if unroutable
 * @param immediate - If true, message will be returned if no consumer can immediately receive it (deprecated).
 *                    NOTE(review): RabbitMQ servers are understood to no longer support this flag — confirm
 *                    against the client Javadoc before passing true
 * @param props - Message properties
 * @param body - Message body
 * @throws IOException - If an error is encountered while publishing
 */
void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, AMQP.BasicProperties props, byte[] body) throws IOException;

Usage Examples:

// Basic message publishing
Channel channel = connection.createChannel();
String message = "Hello World!";
// Always encode with an explicit charset so the bytes do not depend on the JVM's platform default
channel.basicPublish("", "hello", null, message.getBytes("UTF-8"));

// Publishing with message properties
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
    .contentType("text/plain")
    .deliveryMode(2) // persistent
    .priority(1)
    .correlationId("abc123")
    .replyTo("reply.queue")
    .expiration("60000") // TTL in milliseconds
    .messageId("msg001")
    .timestamp(new Date())
    .type("notification")
    // The broker validates user-id: it must match the user the connection authenticated as,
    // otherwise the publish is refused. Replace "user123" with the connection's user name.
    .userId("user123")
    .appId("myapp")
    .build();

channel.basicPublish("my.exchange", "routing.key", props, message.getBytes("UTF-8"));

// Publishing with mandatory flag (returns message if unroutable)
// Register the listener before publishing so a returned message is not missed
channel.addReturnListener(returnMessage -> {
    System.out.println("Message returned: " + returnMessage.getReplyText());
});

channel.basicPublish("my.exchange", "nonexistent.key", true, props, message.getBytes("UTF-8"));

Exchange Management

Operations for declaring, deleting, and binding exchanges.

/**
 * Declare an exchange
 * @param exchange - Exchange name
 * @param type - Exchange type (direct, fanout, topic, headers)
 * @return Exchange.DeclareOk confirmation
 * @throws IOException - If an error is encountered while declaring the exchange
 */
AMQP.Exchange.DeclareOk exchangeDeclare(String exchange, String type) throws IOException;

/**
 * Declare an exchange with full options
 * @param exchange - Exchange name
 * @param type - Exchange type
 * @param durable - Exchange survives server restarts
 * @param autoDelete - Exchange deleted when no longer in use
 * @param arguments - Optional arguments for exchange configuration (the usage examples pass null when there are none)
 * @return Exchange.DeclareOk confirmation
 * @throws IOException - If an error is encountered while declaring the exchange
 */
AMQP.Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete, Map<String, Object> arguments) throws IOException;

/**
 * Declare an exchange with all options
 * @param exchange - Exchange name
 * @param type - Exchange type
 * @param durable - Exchange survives server restarts
 * @param autoDelete - Exchange deleted when no longer in use
 * @param internal - Exchange cannot be directly published to by clients
 * @param arguments - Optional arguments
 * @return Exchange.DeclareOk confirmation
 * @throws IOException - If an error is encountered while declaring the exchange
 */
AMQP.Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments) throws IOException;

/**
 * Declare an exchange passively (check if exists without creating)
 * @param exchange - Exchange name
 * @return Exchange.DeclareOk confirmation
 * @throws IOException - NOTE(review): presumably also how a missing exchange is reported — confirm against the client Javadoc
 */
AMQP.Exchange.DeclareOk exchangeDeclarePassive(String exchange) throws IOException;

/**
 * Delete an exchange
 * @param exchange - Exchange name
 * @return Exchange.DeleteOk confirmation
 * @throws IOException - If an error is encountered while deleting the exchange
 */
AMQP.Exchange.DeleteOk exchangeDelete(String exchange) throws IOException;

/**
 * Delete an exchange with conditions
 * @param exchange - Exchange name
 * @param ifUnused - Only delete if exchange has no bindings
 * @return Exchange.DeleteOk confirmation
 * @throws IOException - If an error is encountered while deleting the exchange
 */
AMQP.Exchange.DeleteOk exchangeDelete(String exchange, boolean ifUnused) throws IOException;

/**
 * Bind one exchange to another
 * @param destination - Destination exchange name
 * @param source - Source exchange name
 * @param routingKey - Routing key for binding
 * @return Exchange.BindOk confirmation
 * @throws IOException - If an error is encountered while binding
 */
AMQP.Exchange.BindOk exchangeBind(String destination, String source, String routingKey) throws IOException;

/**
 * Bind exchange with arguments
 * @param destination - Destination exchange
 * @param source - Source exchange
 * @param routingKey - Routing key
 * @param arguments - Binding arguments
 * @return Exchange.BindOk confirmation
 * @throws IOException - If an error is encountered while binding
 */
AMQP.Exchange.BindOk exchangeBind(String destination, String source, String routingKey, Map<String, Object> arguments) throws IOException;

/**
 * Unbind one exchange from another
 * @param destination - Destination exchange
 * @param source - Source exchange
 * @param routingKey - Routing key to unbind
 * @return Exchange.UnbindOk confirmation
 * @throws IOException - If an error is encountered while unbinding
 */
AMQP.Exchange.UnbindOk exchangeUnbind(String destination, String source, String routingKey) throws IOException;

/**
 * Unbind exchange with arguments
 * @param destination - Destination exchange
 * @param source - Source exchange
 * @param routingKey - Routing key
 * @param arguments - Binding arguments to match
 * @return Exchange.UnbindOk confirmation
 * @throws IOException - If an error is encountered while unbinding
 */
AMQP.Exchange.UnbindOk exchangeUnbind(String destination, String source, String routingKey, Map<String, Object> arguments) throws IOException;

Usage Examples:

// Declare different types of exchanges
Channel channel = connection.createChannel();

// Direct exchange (routes on an exact routing-key match)
channel.exchangeDeclare("my.direct", "direct", true, false, null);

// Topic exchange for pattern matching
channel.exchangeDeclare("my.topic", "topic", true, false, null);

// Fanout exchange for broadcasting
channel.exchangeDeclare("my.fanout", "fanout", true, false, null);

// Headers exchange for header-based routing
channel.exchangeDeclare("my.headers", "headers", true, false, null);

// Header matching rules belong to the binding, not to the exchange declaration:
// "x-match" = "all" requires every listed header to match, "any" requires at least one.
Map<String, Object> bindingArgs = new HashMap<>();
bindingArgs.put("x-match", "all");
bindingArgs.put("format", "pdf");
bindingArgs.put("type", "report");
// The routing key is not used for header-based matching, so it is left empty
channel.exchangeBind("my.direct", "my.headers", "", bindingArgs);

// Exchange to exchange binding
channel.exchangeDeclare("source.exchange", "topic", true, false, null);
channel.exchangeDeclare("dest.exchange", "direct", true, false, null);

// Bind exchanges
channel.exchangeBind("dest.exchange", "source.exchange", "routing.pattern.*");

// Unbind when no longer needed
channel.exchangeUnbind("dest.exchange", "source.exchange", "routing.pattern.*");

// Delete exchanges
channel.exchangeDelete("temporary.exchange");
channel.exchangeDelete("conditional.exchange", true); // only if unused

Built-in Exchange Types

/**
 * Enumeration of built-in exchange types.
 * Each constant carries the type name string that the String-typed
 * exchangeDeclare overloads accept.
 */
public enum BuiltinExchangeType {
    DIRECT("direct"),
    FANOUT("fanout"),
    TOPIC("topic"),
    HEADERS("headers");

    private final String type;

    /**
     * @param type - Exchange type name associated with the constant
     */
    BuiltinExchangeType(String type) {
        this.type = type;
    }

    /**
     * @return Exchange type name for this constant, e.g. "topic" for TOPIC
     */
    public String getType() {
        return type;
    }
}

Usage Examples:

// Using built-in exchange types
channel.exchangeDeclare("my.exchange", BuiltinExchangeType.TOPIC.getType(), true, false, null);

// Alternative direct usage
channel.exchangeDeclare("direct.exchange", "direct", true, false, null);

Message Properties

/**
 * Message properties builder for creating AMQP.BasicProperties.
 * Every setter returns the Builder, so calls can be chained and finished
 * with build(), as the usage examples do.
 */
public static class AMQP.BasicProperties.Builder {
    public Builder contentType(String contentType);
    public Builder contentEncoding(String contentEncoding);
    public Builder headers(Map<String, Object> headers);
    public Builder deliveryMode(Integer deliveryMode); // 1=non-persistent, 2=persistent
    public Builder priority(Integer priority);
    public Builder correlationId(String correlationId);
    public Builder replyTo(String replyTo);
    public Builder expiration(String expiration); // TTL in milliseconds as string
    public Builder messageId(String messageId);
    public Builder timestamp(Date timestamp);
    public Builder type(String type);
    public Builder userId(String userId);
    public Builder appId(String appId);
    public Builder clusterId(String clusterId);
    
    // Returns the AMQP.BasicProperties produced by this builder
    public AMQP.BasicProperties build();
}

/**
 * Pre-built message properties for common use cases.
 * Each constant is a ready-made AMQP.BasicProperties that can be passed as the
 * props argument of basicPublish.
 * NOTE(review): the names suggest that the PERSISTENT_* constants use delivery mode 2
 * and the TEXT_PLAIN constants set content type "text/plain" — confirm the exact
 * field values against the client library Javadoc.
 */
public class MessageProperties {
    public static final AMQP.BasicProperties MINIMAL_BASIC;
    public static final AMQP.BasicProperties MINIMAL_PERSISTENT_BASIC;
    public static final AMQP.BasicProperties BASIC;
    public static final AMQP.BasicProperties PERSISTENT_BASIC;
    public static final AMQP.BasicProperties TEXT_PLAIN;
    public static final AMQP.BasicProperties PERSISTENT_TEXT_PLAIN;
}

Usage Examples:

// Using pre-built properties
// Encode with an explicit charset so the bytes do not depend on the JVM's platform default
channel.basicPublish("", "queue", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));

// Custom properties with builder
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
    .contentType("application/json")
    .deliveryMode(2) // persistent
    .headers(Map.of("version", "1.0", "priority", "high"))
    .correlationId(UUID.randomUUID().toString())
    .timestamp(new Date())
    .build();

// JSON payloads are expected to be UTF-8, so never rely on the default charset here
channel.basicPublish("api.exchange", "user.created", props, jsonPayload.getBytes("UTF-8"));

Types

Exchange and Publishing Types

// Exchange operation results: the types returned by exchangeDeclare / exchangeDeclarePassive,
// exchangeDelete, exchangeBind and exchangeUnbind respectively.
// NOTE(review): in the client library these result types may be declared as interfaces
// rather than classes — confirm against the AMQP.Exchange Javadoc.
public static class AMQP.Exchange {
    public static class DeclareOk {
        // Confirmation of exchange declaration
    }
    
    public static class DeleteOk {
        // Confirmation of exchange deletion
    }
    
    public static class BindOk {
        // Confirmation of exchange binding
    }
    
    public static class UnbindOk {
        // Confirmation of exchange unbinding
    }
}

// Message properties class: the type accepted as the props argument of basicPublish.
// Each getter listed here corresponds to the AMQP.BasicProperties.Builder setter
// for the same property (contentType, headers, deliveryMode, ...).
public class AMQP.BasicProperties {
    public String getContentType();
    public String getContentEncoding();
    public Map<String, Object> getHeaders();
    public Integer getDeliveryMode(); // 1=non-persistent, 2=persistent
    public Integer getPriority();
    public String getCorrelationId();
    public String getReplyTo();
    public String getExpiration(); // String-typed, matching Builder.expiration(String)
    public String getMessageId();
    public Date getTimestamp();
    public String getType();
    public String getUserId();
    public String getAppId();
    public String getClusterId();
}

Install with Tessl CLI

npx tessl i tessl/maven-com-rabbitmq--amqp-client

docs

configuration.md

confirms-returns.md

connection-channel.md

consumer-api.md

consuming.md

error-recovery.md

index.md

observability.md

publishing.md

rpc.md

tile.json