or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

configuration.md confirms-returns.md connection-channel.md consumer-api.md consuming.md error-recovery.md index.md observability.md publishing.md rpc.md

docs/publishing.md

0

# Message Publishing

1

2

Operations for publishing messages to exchanges and managing exchange topology. Publishing is the primary way to send messages into RabbitMQ, where they are routed to queues based on exchange type and routing rules.

3

4

## Capabilities

5

6

### Basic Publishing

7

8

Core message publishing functionality for sending messages to exchanges.

9

10

```java { .api }

11

/**

12

* Publish a message to an exchange with routing key

13

* @param exchange - Exchange name (empty string for default exchange)

14

* @param routingKey - Routing key for message routing

15

* @param props - Message properties (headers, content type, etc.)

16

* @param body - Message body as byte array

17

*/

18

void basicPublish(String exchange, String routingKey, AMQP.BasicProperties props, byte[] body) throws IOException;

19

20

/**

21

* Publish a message with mandatory flag

22

* @param exchange - Exchange name

23

* @param routingKey - Routing key

24

* @param mandatory - If true, message will be returned if unroutable

25

* @param props - Message properties

26

* @param body - Message body

27

*/

28

void basicPublish(String exchange, String routingKey, boolean mandatory, AMQP.BasicProperties props, byte[] body) throws IOException;

29

30

/**

31

* Publish a message with both mandatory and immediate flags

32

* @param exchange - Exchange name

33

* @param routingKey - Routing key

34

* @param mandatory - If true, message will be returned if unroutable

35

* @param immediate - If true, message will be returned if no consumer can immediately receive it (deprecated: the RabbitMQ server has not supported this flag since version 3.0, so pass false)

36

* @param props - Message properties

37

* @param body - Message body

38

*/

39

void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, AMQP.BasicProperties props, byte[] body) throws IOException;

40

```

41

42

**Usage Examples:**

43

44

```java

45

// Basic message publishing

46

Channel channel = connection.createChannel();

47

String message = "Hello World!";

48

channel.basicPublish("", "hello", null, message.getBytes("UTF-8"));

49

```

50

51

```java

52

// Publishing with message properties

53

AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()

54

.contentType("text/plain")

55

.deliveryMode(2) // persistent

56

.priority(1)

57

.correlationId("abc123")

58

.replyTo("reply.queue")

59

.expiration("60000") // TTL in milliseconds

60

.messageId("msg001")

61

.timestamp(new Date())

62

.type("notification")

63

.userId("user123")

64

.appId("myapp")

65

.build();

66

67

channel.basicPublish("my.exchange", "routing.key", props, message.getBytes());

68

```

69

70

```java

71

// Publishing with mandatory flag (returns message if unroutable)

72

channel.addReturnListener(returnMessage -> {

73

System.out.println("Message returned: " + returnMessage.getReplyText());

74

});

75

76

channel.basicPublish("my.exchange", "nonexistent.key", true, props, message.getBytes());

77

```

78

79

### Exchange Management

80

81

Operations for declaring, deleting, and binding exchanges.

82

83

```java { .api }

84

/**

85

* Declare an exchange

86

* @param exchange - Exchange name

87

* @param type - Exchange type (direct, fanout, topic, headers)

88

* @return Exchange.DeclareOk confirmation

89

*/

90

AMQP.Exchange.DeclareOk exchangeDeclare(String exchange, String type) throws IOException;

91

92

/**

93

* Declare an exchange with full options

94

* @param exchange - Exchange name

95

* @param type - Exchange type

96

* @param durable - Exchange survives server restarts

97

* @param autoDelete - Exchange deleted when no longer in use

98

* @param arguments - Optional arguments for exchange configuration

99

*/

100

AMQP.Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete, Map<String, Object> arguments) throws IOException;

101

102

/**

103

* Declare an exchange with all options

104

* @param exchange - Exchange name

105

* @param type - Exchange type

106

* @param durable - Exchange survives server restarts

107

* @param autoDelete - Exchange deleted when no longer in use

108

* @param internal - Exchange cannot be directly published to by clients

109

* @param arguments - Optional arguments

110

*/

111

AMQP.Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments) throws IOException;

112

113

/**

114

* Declare an exchange passively (check if exists without creating)

115

* @param exchange - Exchange name

116

*/

117

AMQP.Exchange.DeclareOk exchangeDeclarePassive(String exchange) throws IOException;

118

119

/**

120

* Delete an exchange

121

* @param exchange - Exchange name

122

*/

123

AMQP.Exchange.DeleteOk exchangeDelete(String exchange) throws IOException;

124

125

/**

126

* Delete an exchange with conditions

127

* @param exchange - Exchange name

128

* @param ifUnused - Only delete if exchange has no bindings

129

*/

130

AMQP.Exchange.DeleteOk exchangeDelete(String exchange, boolean ifUnused) throws IOException;

131

132

/**

133

* Bind one exchange to another

134

* @param destination - Destination exchange name

135

* @param source - Source exchange name

136

* @param routingKey - Routing key for binding

137

*/

138

AMQP.Exchange.BindOk exchangeBind(String destination, String source, String routingKey) throws IOException;

139

140

/**

141

* Bind exchange with arguments

142

* @param destination - Destination exchange

143

* @param source - Source exchange

144

* @param routingKey - Routing key

145

* @param arguments - Binding arguments

146

*/

147

AMQP.Exchange.BindOk exchangeBind(String destination, String source, String routingKey, Map<String, Object> arguments) throws IOException;

148

149

/**

150

* Unbind one exchange from another

151

* @param destination - Destination exchange

152

* @param source - Source exchange

153

* @param routingKey - Routing key to unbind

154

*/

155

AMQP.Exchange.UnbindOk exchangeUnbind(String destination, String source, String routingKey) throws IOException;

156

157

/**

158

* Unbind exchange with arguments

159

* @param destination - Destination exchange

160

* @param source - Source exchange

161

* @param routingKey - Routing key

162

* @param arguments - Binding arguments to match

163

*/

164

AMQP.Exchange.UnbindOk exchangeUnbind(String destination, String source, String routingKey, Map<String, Object> arguments) throws IOException;

165

```

166

167

**Usage Examples:**

168

169

```java

170

// Declare different types of exchanges

171

Channel channel = connection.createChannel();

172

173

// Direct exchange for exact routing-key matching

174

channel.exchangeDeclare("my.direct", "direct", true, false, null);

175

176

// Topic exchange for pattern matching

177

channel.exchangeDeclare("my.topic", "topic", true, false, null);

178

179

// Fanout exchange for broadcasting

180

channel.exchangeDeclare("my.fanout", "fanout", true, false, null);

181

182

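// Headers exchange for header-based routing
// NOTE: "x-match" is normally supplied as a binding argument (when binding a queue to the headers exchange), not as an exchange declaration argument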

183

Map<String, Object> args = new HashMap<>();

184

args.put("x-match", "all");

185

channel.exchangeDeclare("my.headers", "headers", true, false, args);

186

```

187

188

```java

189

// Exchange to exchange binding

190

channel.exchangeDeclare("source.exchange", "topic", true, false, null);

191

channel.exchangeDeclare("dest.exchange", "direct", true, false, null);

192

193

// Bind exchanges

194

channel.exchangeBind("dest.exchange", "source.exchange", "routing.pattern.*");

195

196

// Unbind when no longer needed

197

channel.exchangeUnbind("dest.exchange", "source.exchange", "routing.pattern.*");

198

```

199

200

```java

201

// Delete exchanges

202

channel.exchangeDelete("temporary.exchange");

203

channel.exchangeDelete("conditional.exchange", true); // only if unused

204

```

205

206

### Built-in Exchange Types

207

208

```java { .api }

209

/**

210

* Enumeration of built-in exchange types

211

*/

212

public enum BuiltinExchangeType {

213

DIRECT("direct"),

214

FANOUT("fanout"),

215

TOPIC("topic"),

216

HEADERS("headers");

217

218

private final String type;

219

220

public String getType();

221

}

222

```

223

224

**Usage Examples:**

225

226

```java

227

// Using built-in exchange types

228

channel.exchangeDeclare("my.exchange", BuiltinExchangeType.TOPIC.getType(), true, false, null);

229

230

// Alternative: pass the exchange type name as a plain string

231

channel.exchangeDeclare("direct.exchange", "direct", true, false, null);

232

```

233

234

### Message Properties

235

236

```java { .api }

237

/**

238

* Message properties builder for creating AMQP.BasicProperties

239

*/

240

public static class AMQP.BasicProperties.Builder {

241

public Builder contentType(String contentType);

242

public Builder contentEncoding(String contentEncoding);

243

public Builder headers(Map<String, Object> headers);

244

public Builder deliveryMode(Integer deliveryMode); // 1=non-persistent, 2=persistent

245

public Builder priority(Integer priority);

246

public Builder correlationId(String correlationId);

247

public Builder replyTo(String replyTo);

248

public Builder expiration(String expiration); // TTL in milliseconds as string

249

public Builder messageId(String messageId);

250

public Builder timestamp(Date timestamp);

251

public Builder type(String type);

252

public Builder userId(String userId);

253

public Builder appId(String appId);

254

public Builder clusterId(String clusterId);

255

256

public AMQP.BasicProperties build();

257

}

258

259

/**

260

* Pre-built message properties for common use cases

261

*/

262

public class MessageProperties {

263

public static final AMQP.BasicProperties MINIMAL_BASIC;

264

public static final AMQP.BasicProperties MINIMAL_PERSISTENT_BASIC;

265

public static final AMQP.BasicProperties BASIC;

266

public static final AMQP.BasicProperties PERSISTENT_BASIC;

267

public static final AMQP.BasicProperties TEXT_PLAIN;

268

public static final AMQP.BasicProperties PERSISTENT_TEXT_PLAIN;

269

}

270

```

271

272

**Usage Examples:**

273

274

```java

275

// Using pre-built properties

276

channel.basicPublish("", "queue", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

277

278

// Custom properties with builder

279

AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()

280

.contentType("application/json")

281

.deliveryMode(2) // persistent

282

.headers(Map.of("version", "1.0", "priority", "high"))

283

.correlationId(UUID.randomUUID().toString())

284

.timestamp(new Date())

285

.build();

286

287

channel.basicPublish("api.exchange", "user.created", props, jsonPayload.getBytes());

288

```

289

290

## Types

291

292

### Exchange and Publishing Types

293

294

```java { .api }

295

// Exchange operation results

296

public static class AMQP.Exchange {

297

public static class DeclareOk {

298

// Confirmation of exchange declaration

299

}

300

301

public static class DeleteOk {

302

// Confirmation of exchange deletion

303

}

304

305

public static class BindOk {

306

// Confirmation of exchange binding

307

}

308

309

public static class UnbindOk {

310

// Confirmation of exchange unbinding

311

}

312

}

313

314

// Message properties class

315

public class AMQP.BasicProperties {

316

public String getContentType();

317

public String getContentEncoding();

318

public Map<String, Object> getHeaders();

319

public Integer getDeliveryMode(); // 1=non-persistent, 2=persistent

320

public Integer getPriority();

321

public String getCorrelationId();

322

public String getReplyTo();

323

public String getExpiration();

324

public String getMessageId();

325

public Date getTimestamp();

326

public String getType();

327

public String getUserId();

328

public String getAppId();

329

public String getClusterId();

330

}

331

```