0
# Message Publishing
1
2
Operations for publishing messages to exchanges and managing exchange topology. Publishing is the primary way to send messages into RabbitMQ, where they are routed to queues based on exchange type and routing rules.
3
4
## Capabilities
5
6
### Basic Publishing
7
8
Core message publishing functionality for sending messages to exchanges.
9
10
```java { .api }
11
/**
12
* Publish a message to an exchange with routing key
13
* @param exchange - Exchange name (empty string for default exchange)
14
* @param routingKey - Routing key for message routing
15
* @param props - Message properties (headers, content type, etc.)
16
* @param body - Message body as byte array
17
*/
18
void basicPublish(String exchange, String routingKey, AMQP.BasicProperties props, byte[] body) throws IOException;
19
20
/**
21
* Publish a message with mandatory flag
22
* @param exchange - Exchange name
23
* @param routingKey - Routing key
24
* @param mandatory - If true, message will be returned if unroutable
25
* @param props - Message properties
26
* @param body - Message body
27
*/
28
void basicPublish(String exchange, String routingKey, boolean mandatory, AMQP.BasicProperties props, byte[] body) throws IOException;
29
30
/**
31
* Publish a message with both mandatory and immediate flags
32
* @param exchange - Exchange name
33
* @param routingKey - Routing key
34
* @param mandatory - If true, message will be returned if unroutable
35
* @param immediate - If true, message will be returned if no consumer can immediately receive it (deprecated)
36
* @param props - Message properties
37
* @param body - Message body
38
*/
39
void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, AMQP.BasicProperties props, byte[] body) throws IOException;
40
```
41
42
**Usage Examples:**
43
44
```java
45
// Basic message publishing
46
Channel channel = connection.createChannel();
47
String message = "Hello World!";
48
channel.basicPublish("", "hello", null, message.getBytes("UTF-8"));
49
```
50
51
```java
52
// Publishing with message properties
53
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
54
.contentType("text/plain")
55
.deliveryMode(2) // persistent
56
.priority(1)
57
.correlationId("abc123")
58
.replyTo("reply.queue")
59
.expiration("60000") // TTL in milliseconds
60
.messageId("msg001")
61
.timestamp(new Date())
62
.type("notification")
63
.userId("user123")
64
.appId("myapp")
65
.build();
66
67
channel.basicPublish("my.exchange", "routing.key", props, message.getBytes());
68
```
69
70
```java
71
// Publishing with mandatory flag (returns message if unroutable)
72
channel.addReturnListener(returnMessage -> {
73
System.out.println("Message returned: " + returnMessage.getReplyText());
74
});
75
76
channel.basicPublish("my.exchange", "nonexistent.key", true, props, message.getBytes());
77
```
78
79
### Exchange Management
80
81
Operations for declaring, deleting, and binding exchanges.
82
83
```java { .api }
84
/**
85
* Declare an exchange
86
* @param exchange - Exchange name
87
* @param type - Exchange type (direct, fanout, topic, headers)
88
* @return Exchange.DeclareOk confirmation
89
*/
90
AMQP.Exchange.DeclareOk exchangeDeclare(String exchange, String type) throws IOException;
91
92
/**
93
* Declare an exchange with full options
94
* @param exchange - Exchange name
95
* @param type - Exchange type
96
* @param durable - Exchange survives server restarts
97
* @param autoDelete - Exchange deleted when no longer in use
98
* @param arguments - Optional arguments for exchange configuration
99
*/
100
AMQP.Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete, Map<String, Object> arguments) throws IOException;
101
102
/**
103
* Declare an exchange with all options
104
* @param exchange - Exchange name
105
* @param type - Exchange type
106
* @param durable - Exchange survives server restarts
107
* @param autoDelete - Exchange deleted when no longer in use
108
* @param internal - Exchange cannot be directly published to by clients
109
* @param arguments - Optional arguments
110
*/
111
AMQP.Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments) throws IOException;
112
113
/**
114
* Declare an exchange passively (check if exists without creating)
115
* @param exchange - Exchange name
116
*/
117
AMQP.Exchange.DeclareOk exchangeDeclarePassive(String exchange) throws IOException;
118
119
/**
120
* Delete an exchange
121
* @param exchange - Exchange name
122
*/
123
AMQP.Exchange.DeleteOk exchangeDelete(String exchange) throws IOException;
124
125
/**
126
* Delete an exchange with conditions
127
* @param exchange - Exchange name
128
* @param ifUnused - Only delete if exchange has no bindings
129
*/
130
AMQP.Exchange.DeleteOk exchangeDelete(String exchange, boolean ifUnused) throws IOException;
131
132
/**
133
* Bind one exchange to another
134
* @param destination - Destination exchange name
135
* @param source - Source exchange name
136
* @param routingKey - Routing key for binding
137
*/
138
AMQP.Exchange.BindOk exchangeBind(String destination, String source, String routingKey) throws IOException;
139
140
/**
141
* Bind exchange with arguments
142
* @param destination - Destination exchange
143
* @param source - Source exchange
144
* @param routingKey - Routing key
145
* @param arguments - Binding arguments
146
*/
147
AMQP.Exchange.BindOk exchangeBind(String destination, String source, String routingKey, Map<String, Object> arguments) throws IOException;
148
149
/**
150
* Unbind one exchange from another
151
* @param destination - Destination exchange
152
* @param source - Source exchange
153
* @param routingKey - Routing key to unbind
154
*/
155
AMQP.Exchange.UnbindOk exchangeUnbind(String destination, String source, String routingKey) throws IOException;
156
157
/**
158
* Unbind exchange with arguments
159
* @param destination - Destination exchange
160
* @param source - Source exchange
161
* @param routingKey - Routing key
162
* @param arguments - Binding arguments to match
163
*/
164
AMQP.Exchange.UnbindOk exchangeUnbind(String destination, String source, String routingKey, Map<String, Object> arguments) throws IOException;
165
```
166
167
**Usage Examples:**
168
169
```java
170
// Declare different types of exchanges
171
Channel channel = connection.createChannel();
172
173
// Direct exchange (default type)
174
channel.exchangeDeclare("my.direct", "direct", true, false, null);
175
176
// Topic exchange for pattern matching
177
channel.exchangeDeclare("my.topic", "topic", true, false, null);
178
179
// Fanout exchange for broadcasting
180
channel.exchangeDeclare("my.fanout", "fanout", true, false, null);
181
182
// Headers exchange for header-based routing
183
Map<String, Object> args = new HashMap<>();
184
args.put("x-match", "all");
185
channel.exchangeDeclare("my.headers", "headers", true, false, args);
186
```
187
188
```java
189
// Exchange to exchange binding
190
channel.exchangeDeclare("source.exchange", "topic", true, false, null);
191
channel.exchangeDeclare("dest.exchange", "direct", true, false, null);
192
193
// Bind exchanges
194
channel.exchangeBind("dest.exchange", "source.exchange", "routing.pattern.*");
195
196
// Unbind when no longer needed
197
channel.exchangeUnbind("dest.exchange", "source.exchange", "routing.pattern.*");
198
```
199
200
```java
201
// Delete exchanges
202
channel.exchangeDelete("temporary.exchange");
203
channel.exchangeDelete("conditional.exchange", true); // only if unused
204
```
205
206
### Built-in Exchange Types
207
208
```java { .api }
209
/**
210
* Enumeration of built-in exchange types
211
*/
212
public enum BuiltinExchangeType {
213
DIRECT("direct"),
214
FANOUT("fanout"),
215
TOPIC("topic"),
216
HEADERS("headers");
217
218
private final String type;
219
220
public String getType();
221
}
222
```
223
224
**Usage Examples:**
225
226
```java
227
// Using built-in exchange types
228
channel.exchangeDeclare("my.exchange", BuiltinExchangeType.TOPIC.getType(), true, false, null);
229
230
// Alternative direct usage
231
channel.exchangeDeclare("direct.exchange", "direct", true, false, null);
232
```
233
234
### Message Properties
235
236
```java { .api }
237
/**
238
* Message properties builder for creating AMQP.BasicProperties
239
*/
240
public static class AMQP.BasicProperties.Builder {
241
public Builder contentType(String contentType);
242
public Builder contentEncoding(String contentEncoding);
243
public Builder headers(Map<String, Object> headers);
244
public Builder deliveryMode(Integer deliveryMode); // 1=non-persistent, 2=persistent
245
public Builder priority(Integer priority);
246
public Builder correlationId(String correlationId);
247
public Builder replyTo(String replyTo);
248
public Builder expiration(String expiration); // TTL in milliseconds as string
249
public Builder messageId(String messageId);
250
public Builder timestamp(Date timestamp);
251
public Builder type(String type);
252
public Builder userId(String userId);
253
public Builder appId(String appId);
254
public Builder clusterId(String clusterId);
255
256
public AMQP.BasicProperties build();
257
}
258
259
/**
260
* Pre-built message properties for common use cases
261
*/
262
public class MessageProperties {
263
public static final AMQP.BasicProperties MINIMAL_BASIC;
264
public static final AMQP.BasicProperties MINIMAL_PERSISTENT_BASIC;
265
public static final AMQP.BasicProperties BASIC;
266
public static final AMQP.BasicProperties PERSISTENT_BASIC;
267
public static final AMQP.BasicProperties TEXT_PLAIN;
268
public static final AMQP.BasicProperties PERSISTENT_TEXT_PLAIN;
269
}
270
```
271
272
**Usage Examples:**
273
274
```java
275
// Using pre-built properties
276
channel.basicPublish("", "queue", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
277
278
// Custom properties with builder
279
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
280
.contentType("application/json")
281
.deliveryMode(2) // persistent
282
.headers(Map.of("version", "1.0", "priority", "high"))
283
.correlationId(UUID.randomUUID().toString())
284
.timestamp(new Date())
285
.build();
286
287
channel.basicPublish("api.exchange", "user.created", props, jsonPayload.getBytes());
288
```
289
290
## Types
291
292
### Exchange and Publishing Types
293
294
```java { .api }
295
// Exchange operation results
296
public static class AMQP.Exchange {
297
public static class DeclareOk {
298
// Confirmation of exchange declaration
299
}
300
301
public static class DeleteOk {
302
// Confirmation of exchange deletion
303
}
304
305
public static class BindOk {
306
// Confirmation of exchange binding
307
}
308
309
public static class UnbindOk {
310
// Confirmation of exchange unbinding
311
}
312
}
313
314
// Message properties class
315
public class AMQP.BasicProperties {
316
public String getContentType();
317
public String getContentEncoding();
318
public Map<String, Object> getHeaders();
319
public Integer getDeliveryMode(); // 1=non-persistent, 2=persistent
320
public Integer getPriority();
321
public String getCorrelationId();
322
public String getReplyTo();
323
public String getExpiration();
324
public String getMessageId();
325
public Date getTimestamp();
326
public String getType();
327
public String getUserId();
328
public String getAppId();
329
public String getClusterId();
330
}
331
```