0
# Connection and Channel Management
1
2
Core functionality for establishing connections to RabbitMQ brokers and creating channels for AMQP operations. Connections represent the TCP connection to the broker, while channels provide a lightweight way to multiplex multiple conversations over a single connection.
3
4
## Capabilities
5
6
### ConnectionFactory
7
8
Factory class for creating and configuring connections to RabbitMQ brokers.
9
10
```java { .api }
11
/**
12
* Factory class to facilitate opening a Connection to a RabbitMQ node.
13
* Most connection and socket settings are configured using this factory.
14
*/
15
public class ConnectionFactory implements Cloneable {
16
17
// Connection creation
18
public Connection newConnection() throws IOException, TimeoutException;
19
public Connection newConnection(ExecutorService executor) throws IOException, TimeoutException;
20
public Connection newConnection(Address[] addrs) throws IOException, TimeoutException;
21
public Connection newConnection(ExecutorService executor, Address[] addrs) throws IOException, TimeoutException;
22
public Connection newConnection(List<Address> addrs) throws IOException, TimeoutException;
23
public Connection newConnection(AddressResolver addressResolver) throws IOException, TimeoutException;
24
public Connection newConnection(String connectionName) throws IOException, TimeoutException;
25
26
// Basic connection settings
27
public void setHost(String host);
28
public String getHost();
29
public void setPort(int port);
30
public int getPort();
31
public void setUsername(String username);
32
public String getUsername();
33
public void setPassword(String password);
34
public String getPassword();
35
public void setVirtualHost(String virtualHost);
36
public String getVirtualHost();
37
38
// URI-based configuration
39
public void setUri(String uri) throws URISyntaxException, NoSuchAlgorithmException, KeyManagementException;
40
public void setUri(URI uri) throws URISyntaxException, NoSuchAlgorithmException, KeyManagementException;
41
public String getUri();
42
43
// Timeouts and limits
44
public void setConnectionTimeout(int timeout);
45
public int getConnectionTimeout();
46
public void setHandshakeTimeout(int timeout);
47
public int getHandshakeTimeout();
48
public void setShutdownTimeout(int shutdownTimeout);
49
public int getShutdownTimeout();
50
public void setRequestedHeartbeat(int requestedHeartbeat);
51
public int getRequestedHeartbeat();
52
public void setRequestedChannelMax(int requestedChannelMax);
53
public int getRequestedChannelMax();
54
public void setRequestedFrameMax(int requestedFrameMax);
55
public int getRequestedFrameMax();
56
public void setChannelRpcTimeout(int channelRpcTimeout);
57
public int getChannelRpcTimeout();
58
59
// Network configuration
60
public void setSocketFactory(SocketFactory factory);
61
public SocketFactory getSocketFactory();
62
public void setSocketConfigurator(SocketConfigurator socketConfigurator);
63
public SocketConfigurator getSocketConfigurator();
64
65
// SSL/TLS configuration
66
public void useSslProtocol() throws NoSuchAlgorithmException, KeyManagementException;
67
public void useSslProtocol(String protocol) throws NoSuchAlgorithmException, KeyManagementException;
68
public void useSslProtocol(SSLContext context);
69
public void setSocketFactory(SSLSocketFactory factory);
70
71
// Authentication
72
public void setSaslConfig(SaslConfig saslConfig);
73
public SaslConfig getSaslConfig();
74
75
// Recovery settings
76
public void setAutomaticRecoveryEnabled(boolean automaticRecovery);
77
public boolean isAutomaticRecoveryEnabled();
78
public void setNetworkRecoveryInterval(long networkRecoveryInterval);
79
public long getNetworkRecoveryInterval();
80
public void setRecoveryDelayHandler(RecoveryDelayHandler recoveryDelayHandler);
81
public RecoveryDelayHandler getRecoveryDelayHandler();
82
83
// Advanced settings
84
public void setExceptionHandler(ExceptionHandler exceptionHandler);
85
public ExceptionHandler getExceptionHandler();
86
public void setMetricsCollector(MetricsCollector metricsCollector);
87
public MetricsCollector getMetricsCollector();
88
public void setTrafficListener(TrafficListener trafficListener);
89
public TrafficListener getTrafficListener();
90
public void setObservationCollector(ObservationCollector observationCollector);
91
public ObservationCollector getObservationCollector();
92
93
// Credentials and authentication
94
public void setCredentialsProvider(CredentialsProvider credentialsProvider);
95
public CredentialsProvider getCredentialsProvider();
96
public void setCredentialsRefreshService(CredentialsRefreshService credentialsRefreshService);
97
public CredentialsRefreshService getCredentialsRefreshService();
98
99
// SSL Context Factory
100
public void setSslContextFactory(SslContextFactory sslContextFactory);
101
public SslContextFactory getSslContextFactory();
102
public void enableHostnameVerification();
103
104
// Topology Recovery
105
public void setTopologyRecoveryEnabled(boolean topologyRecovery);
106
public boolean isTopologyRecoveryEnabled();
107
public void setTopologyRecoveryExecutor(ExecutorService executor);
108
public ExecutorService getTopologyRecoveryExecutor();
109
110
// Channel configuration
111
public void setChannelShouldCheckRpcResponseType(boolean channelShouldCheckRpcResponseType);
112
public boolean isChannelShouldCheckRpcResponseType();
113
114
// Work pool configuration
115
public void setWorkPoolTimeout(int workPoolTimeout);
116
public int getWorkPoolTimeout();
117
118
// Error handling
119
public void setErrorOnWriteListener(ErrorOnWriteListener errorOnWriteListener);
120
public ErrorOnWriteListener getErrorOnWriteListener();
121
122
// Message size limits
123
public void setMaxInboundMessageBodySize(int maxInboundMessageBodySize);
124
public int getMaxInboundMessageBodySize();
125
126
// NIO configuration
127
public void useNio();
128
public void useBlockingIo();
129
public void setNioParams(NioParams nioParams);
130
public NioParams getNioParams();
131
132
// Configuration loading
133
public void load(Properties properties);
134
public void load(Properties properties, String prefix);
135
public void load(String propertyFileLocation) throws IOException;
136
public void load(String propertyFileLocation, String prefix) throws IOException;
137
138
// Cloning
139
public ConnectionFactory clone();
140
141
// Constants
142
public static final String DEFAULT_USER = "guest";
143
public static final String DEFAULT_PASS = "guest";
144
public static final String DEFAULT_VHOST = "/";
145
public static final String DEFAULT_HOST = "localhost";
146
public static final int DEFAULT_AMQP_PORT = 5672;
147
public static final int DEFAULT_AMQP_OVER_SSL_PORT = 5671;
148
public static final int DEFAULT_CONNECTION_TIMEOUT = 60000;
149
public static final int DEFAULT_HANDSHAKE_TIMEOUT = 10000;
150
public static final int DEFAULT_SHUTDOWN_TIMEOUT = 10000;
151
public static final int DEFAULT_HEARTBEAT = 60;
152
public static final int DEFAULT_CHANNEL_MAX = 2047;
153
public static final int DEFAULT_FRAME_MAX = 0;
154
public static final int DEFAULT_CHANNEL_RPC_TIMEOUT = 600000;
155
public static final long DEFAULT_NETWORK_RECOVERY_INTERVAL = 5000;
156
}
157
```
158
159
**Usage Examples:**
160
161
```java
162
// Basic connection setup
163
ConnectionFactory factory = new ConnectionFactory();
164
factory.setHost("localhost");
165
factory.setPort(5672);
166
factory.setUsername("user");
167
factory.setPassword("password");
168
factory.setVirtualHost("/");
169
170
Connection connection = factory.newConnection();
171
```
172
173
```java
174
// URI-based configuration
175
ConnectionFactory factory = new ConnectionFactory();
176
factory.setUri("amqp://user:password@localhost:5672/vhost");
177
Connection connection = factory.newConnection();
178
```
179
180
```java
181
// SSL connection
182
ConnectionFactory factory = new ConnectionFactory();
183
factory.setHost("localhost");
184
factory.setPort(5671);
185
factory.useSslProtocol();
186
Connection connection = factory.newConnection();
187
```
188
189
```java
190
// Connection with custom timeouts and recovery
191
ConnectionFactory factory = new ConnectionFactory();
192
factory.setHost("localhost");
193
factory.setConnectionTimeout(30000);
194
factory.setHandshakeTimeout(5000);
195
factory.setAutomaticRecoveryEnabled(true);
196
factory.setNetworkRecoveryInterval(10000);
197
198
Connection connection = factory.newConnection();
199
```
200
201
### Connection Interface
202
203
Interface representing a connection to a RabbitMQ broker.
204
205
```java { .api }
206
/**
207
* Public API: Interface to an AMQ connection.
208
*/
209
public interface Connection extends Closeable, ShutdownNotifier {
210
211
// Channel management
212
Channel createChannel() throws IOException;
213
Channel createChannel(int channelNumber) throws IOException;
214
215
// Connection state
216
boolean isOpen();
217
InetAddress getAddress();
218
int getPort();
219
220
// Connection properties
221
Map<String, Object> getServerProperties();
222
Map<String, Object> getClientProperties();
223
String getClientProvidedName();
224
225
// Connection information
226
int getChannelMax();
227
int getFrameMax();
228
int getHeartbeat();
229
String getId();
230
231
// Shutdown and cleanup
232
void close() throws IOException;
233
void close(int closeCode, String closeMessage) throws IOException;
234
void close(int timeout) throws IOException;
235
void close(int closeCode, String closeMessage, int timeout) throws IOException;
236
void abort();
237
void abort(int closeCode, String closeMessage);
238
void abort(int timeout);
239
void abort(int closeCode, String closeMessage, int timeout);
240
241
// Blocked connection handling
242
void addBlockedListener(BlockedListener listener);
243
BlockedListener addBlockedListener(BlockedCallback blockedCallback, UnblockedCallback unblockedCallback);
244
boolean removeBlockedListener(BlockedListener listener);
245
void clearBlockedListeners();
246
}
247
```
248
249
**Usage Examples:**
250
251
```java
252
// Create and use connection
253
ConnectionFactory factory = new ConnectionFactory();
254
factory.setHost("localhost");
255
Connection connection = factory.newConnection();
256
257
// Check connection state
258
if (connection.isOpen()) {
259
System.out.println("Connection is open");
260
}
261
262
// Get server information
263
Map<String, Object> serverProps = connection.getServerProperties();
264
System.out.println("Server version: " + serverProps.get("version"));
265
266
// Create channels
267
Channel channel1 = connection.createChannel();
268
Channel channel2 = connection.createChannel();
269
270
// Close connection
271
connection.close();
272
```
273
274
### Channel Interface
275
276
Interface representing a channel within a connection for AMQP operations.
277
278
```java { .api }
279
/**
280
* Interface to a channel. All non-deprecated methods of this interface are part of the public API.
281
*/
282
public interface Channel extends Closeable, ShutdownNotifier {
283
284
// Channel information
285
int getChannelNumber();
286
Connection getConnection();
287
boolean isOpen();
288
289
// Channel state and flow control
290
void abort() throws IOException;
291
void abort(int closeCode, String closeMessage) throws IOException;
292
void close() throws IOException, TimeoutException;
293
void close(int closeCode, String closeMessage) throws IOException, TimeoutException;
294
295
// Consumer management
296
Consumer getDefaultConsumer();
297
void setDefaultConsumer(Consumer consumer);
298
299
// Quality of Service (flow control)
300
void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;
301
void basicQos(int prefetchCount, boolean global) throws IOException;
302
void basicQos(int prefetchCount) throws IOException;
303
304
// Exchange operations
305
AMQP.Exchange.DeclareOk exchangeDeclare(String exchange, String type) throws IOException;
306
AMQP.Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable) throws IOException;
307
AMQP.Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete, Map<String, Object> arguments) throws IOException;
308
AMQP.Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments) throws IOException;
309
void exchangeDeclareNoWait(String exchange, String type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments) throws IOException;
310
AMQP.Exchange.DeclareOk exchangeDeclarePassive(String exchange) throws IOException;
311
AMQP.Exchange.DeleteOk exchangeDelete(String exchange, boolean ifUnused) throws IOException;
312
AMQP.Exchange.DeleteOk exchangeDelete(String exchange) throws IOException;
313
void exchangeDeleteNoWait(String exchange, boolean ifUnused) throws IOException;
314
AMQP.Exchange.BindOk exchangeBind(String destination, String source, String routingKey) throws IOException;
315
AMQP.Exchange.BindOk exchangeBind(String destination, String source, String routingKey, Map<String, Object> arguments) throws IOException;
316
void exchangeBindNoWait(String destination, String source, String routingKey, Map<String, Object> arguments) throws IOException;
317
AMQP.Exchange.UnbindOk exchangeUnbind(String destination, String source, String routingKey) throws IOException;
318
AMQP.Exchange.UnbindOk exchangeUnbind(String destination, String source, String routingKey, Map<String, Object> arguments) throws IOException;
319
void exchangeUnbindNoWait(String destination, String source, String routingKey, Map<String, Object> arguments) throws IOException;
320
321
// Queue operations
322
AMQP.Queue.DeclareOk queueDeclare() throws IOException;
323
AMQP.Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException;
324
void queueDeclareNoWait(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException;
325
AMQP.Queue.DeclareOk queueDeclarePassive(String queue) throws IOException;
326
AMQP.Queue.DeleteOk queueDelete(String queue) throws IOException;
327
AMQP.Queue.DeleteOk queueDelete(String queue, boolean ifUnused, boolean ifEmpty) throws IOException;
328
void queueDeleteNoWait(String queue, boolean ifUnused, boolean ifEmpty) throws IOException;
329
AMQP.Queue.BindOk queueBind(String queue, String exchange, String routingKey) throws IOException;
330
AMQP.Queue.BindOk queueBind(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;
331
void queueBindNoWait(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;
332
AMQP.Queue.UnbindOk queueUnbind(String queue, String exchange, String routingKey) throws IOException;
333
AMQP.Queue.UnbindOk queueUnbind(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;
334
AMQP.Queue.PurgeOk queuePurge(String queue) throws IOException;
335
int messageCount(String queue) throws IOException;
336
int consumerCount(String queue) throws IOException;
337
338
// Message publishing
339
void basicPublish(String exchange, String routingKey, AMQP.BasicProperties props, byte[] body) throws IOException;
340
void basicPublish(String exchange, String routingKey, boolean mandatory, AMQP.BasicProperties props, byte[] body) throws IOException;
341
void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, AMQP.BasicProperties props, byte[] body) throws IOException;
342
343
// Message consuming
344
GetResponse basicGet(String queue, boolean autoAck) throws IOException;
345
String basicConsume(String queue, Consumer callback) throws IOException;
346
String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;
347
String basicConsume(String queue, boolean autoAck, String consumerTag, Consumer callback) throws IOException;
348
String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> arguments, Consumer callback) throws IOException;
349
String basicConsume(String queue, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException;
350
String basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException;
351
String basicConsume(String queue, boolean autoAck, String consumerTag, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException;
352
String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> arguments, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException;
353
String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> arguments, DeliverCallback deliverCallback, CancelCallback cancelCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException;
354
void basicCancel(String consumerTag) throws IOException;
355
356
// Message acknowledgment
357
void basicAck(long deliveryTag, boolean multiple) throws IOException;
358
void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
359
void basicReject(long deliveryTag, boolean requeue) throws IOException;
360
AMQP.Basic.RecoverOk basicRecover() throws IOException;
361
AMQP.Basic.RecoverOk basicRecover(boolean requeue) throws IOException;
362
363
// Transaction support
364
AMQP.Tx.SelectOk txSelect() throws IOException;
365
AMQP.Tx.CommitOk txCommit() throws IOException;
366
AMQP.Tx.RollbackOk txRollback() throws IOException;
367
368
// Publisher confirms
369
AMQP.Confirm.SelectOk confirmSelect() throws IOException;
370
long getNextPublishSeqNo();
371
boolean waitForConfirms() throws InterruptedException;
372
boolean waitForConfirms(long timeout) throws InterruptedException, TimeoutException;
373
void waitForConfirmsOrDie() throws IOException, InterruptedException;
374
void waitForConfirmsOrDie(long timeout) throws IOException, InterruptedException, TimeoutException;
375
376
// Listener management
377
void addReturnListener(ReturnListener listener);
378
ReturnListener addReturnListener(ReturnCallback returnCallback);
379
boolean removeReturnListener(ReturnListener listener);
380
void clearReturnListeners();
381
void addConfirmListener(ConfirmListener listener);
382
ConfirmListener addConfirmListener(ConfirmCallback ackCallback, ConfirmCallback nackCallback);
383
boolean removeConfirmListener(ConfirmListener listener);
384
void clearConfirmListeners();
385
386
// Low-level operations
387
Method rpc(Method method) throws IOException, ShutdownSignalException;
388
void asyncRpc(Method method) throws IOException;
389
CompletableFuture<Command> asyncCompletableRpc(Method method) throws IOException;
390
}
391
```
392
393
### RecoverableConnection
394
395
Interface for connections that support automatic recovery from network failures.
396
397
```java { .api }
398
/**
399
* Connection that can automatically recover from network failures.
400
*/
401
public interface RecoverableConnection extends Connection {
402
void addRecoveryListener(RecoveryListener listener);
403
void removeRecoveryListener(RecoveryListener listener);
404
}
405
```
406
407
### RecoverableChannel
408
409
Interface for channels that support automatic recovery.
410
411
```java { .api }
412
/**
413
* Channel that can automatically recover from network failures.
414
*/
415
public interface RecoverableChannel extends Channel {
416
// Inherits recovery capabilities from the connection
417
}
418
```
419
420
**Usage Examples:**
421
422
```java
423
// Working with recoverable connections
424
ConnectionFactory factory = new ConnectionFactory();
425
factory.setAutomaticRecoveryEnabled(true);
426
factory.setNetworkRecoveryInterval(5000);
427
428
RecoverableConnection connection = (RecoverableConnection) factory.newConnection();
429
430
// Add recovery listener
431
connection.addRecoveryListener(new RecoveryListener() {
432
@Override
433
public void handleRecovery(Recoverable recoverable) {
434
System.out.println("Connection recovered!");
435
}
436
437
@Override
438
public void handleRecoveryStarted(Recoverable recoverable) {
439
System.out.println("Recovery started...");
440
}
441
});
442
443
RecoverableChannel channel = (RecoverableChannel) connection.createChannel();
444
```
445
446
## Types
447
448
### Supporting Types
449
450
```java { .api }
451
// Shutdown notification support
452
public interface ShutdownNotifier {
453
void addShutdownListener(ShutdownListener listener);
454
void removeShutdownListener(ShutdownListener listener);
455
ShutdownSignalException getCloseReason();
456
}
457
458
public interface ShutdownListener {
459
void shutdownCompleted(ShutdownSignalException cause);
460
}
461
462
// Blocked connection support
463
public interface BlockedListener {
464
void handleBlocked(String reason) throws IOException;
465
void handleUnblocked() throws IOException;
466
}
467
468
@FunctionalInterface
469
public interface BlockedCallback {
470
void handle(String reason) throws IOException;
471
}
472
473
@FunctionalInterface
474
public interface UnblockedCallback {
475
void handle() throws IOException;
476
}
477
478
// Recovery support
479
public interface RecoveryListener {
480
void handleRecovery(Recoverable recoverable);
481
void handleRecoveryStarted(Recoverable recoverable);
482
}
483
484
public interface Recoverable {
485
void addRecoveryListener(RecoveryListener listener);
486
void removeRecoveryListener(RecoveryListener listener);
487
}
488
```