or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

configuration.md confirms-returns.md connection-channel.md consumer-api.md consuming.md error-recovery.md index.md observability.md publishing.md rpc.md

docs/connection-channel.md

0

# Connection and Channel Management

1

2

This document covers the core functionality for establishing connections to RabbitMQ brokers and creating channels for AMQP operations. A connection represents the TCP connection to the broker, while channels provide a lightweight way to multiplex several independent conversations over that single connection.

3

4

## Capabilities

5

6

### ConnectionFactory

7

8

Factory class for creating and configuring connections to RabbitMQ brokers.

9

10

```java { .api }

11

/**

12

* Factory class to facilitate opening a Connection to a RabbitMQ node.

13

* Most connection and socket settings are configured using this factory.

14

*/

15

public class ConnectionFactory implements Cloneable {

16

17

// Connection creation

18

public Connection newConnection() throws IOException, TimeoutException;

19

public Connection newConnection(ExecutorService executor) throws IOException, TimeoutException;

20

public Connection newConnection(Address[] addrs) throws IOException, TimeoutException;

21

public Connection newConnection(ExecutorService executor, Address[] addrs) throws IOException, TimeoutException;

22

public Connection newConnection(List<Address> addrs) throws IOException, TimeoutException;

23

public Connection newConnection(AddressResolver addressResolver) throws IOException, TimeoutException;

24

public Connection newConnection(String connectionName) throws IOException, TimeoutException;

25

26

// Basic connection settings

27

public void setHost(String host);

28

public String getHost();

29

public void setPort(int port);

30

public int getPort();

31

public void setUsername(String username);

32

public String getUsername();

33

public void setPassword(String password);

34

public String getPassword();

35

public void setVirtualHost(String virtualHost);

36

public String getVirtualHost();

37

38

// URI-based configuration

39

public void setUri(String uri) throws URISyntaxException, NoSuchAlgorithmException, KeyManagementException;

40

public void setUri(URI uri) throws URISyntaxException, NoSuchAlgorithmException, KeyManagementException;

41

public String getUri();

42

43

// Timeouts and limits

44

public void setConnectionTimeout(int timeout);

45

public int getConnectionTimeout();

46

public void setHandshakeTimeout(int timeout);

47

public int getHandshakeTimeout();

48

public void setShutdownTimeout(int shutdownTimeout);

49

public int getShutdownTimeout();

50

public void setRequestedHeartbeat(int requestedHeartbeat);

51

public int getRequestedHeartbeat();

52

public void setRequestedChannelMax(int requestedChannelMax);

53

public int getRequestedChannelMax();

54

public void setRequestedFrameMax(int requestedFrameMax);

55

public int getRequestedFrameMax();

56

public void setChannelRpcTimeout(int channelRpcTimeout);

57

public int getChannelRpcTimeout();

58

59

// Network configuration

60

public void setSocketFactory(SocketFactory factory);

61

public SocketFactory getSocketFactory();

62

public void setSocketConfigurator(SocketConfigurator socketConfigurator);

63

public SocketConfigurator getSocketConfigurator();

64

65

// SSL/TLS configuration

66

public void useSslProtocol() throws NoSuchAlgorithmException, KeyManagementException;

67

public void useSslProtocol(String protocol) throws NoSuchAlgorithmException, KeyManagementException;

68

public void useSslProtocol(SSLContext context);

69

public void setSocketFactory(SSLSocketFactory factory);

70

71

// Authentication

72

public void setSaslConfig(SaslConfig saslConfig);

73

public SaslConfig getSaslConfig();

74

75

// Recovery settings

76

public void setAutomaticRecoveryEnabled(boolean automaticRecovery);

77

public boolean isAutomaticRecoveryEnabled();

78

public void setNetworkRecoveryInterval(long networkRecoveryInterval);

79

public long getNetworkRecoveryInterval();

80

public void setRecoveryDelayHandler(RecoveryDelayHandler recoveryDelayHandler);

81

public RecoveryDelayHandler getRecoveryDelayHandler();

82

83

// Advanced settings

84

public void setExceptionHandler(ExceptionHandler exceptionHandler);

85

public ExceptionHandler getExceptionHandler();

86

public void setMetricsCollector(MetricsCollector metricsCollector);

87

public MetricsCollector getMetricsCollector();

88

public void setTrafficListener(TrafficListener trafficListener);

89

public TrafficListener getTrafficListener();

90

public void setObservationCollector(ObservationCollector observationCollector);

91

public ObservationCollector getObservationCollector();

92

93

// Credentials and authentication

94

public void setCredentialsProvider(CredentialsProvider credentialsProvider);

95

public CredentialsProvider getCredentialsProvider();

96

public void setCredentialsRefreshService(CredentialsRefreshService credentialsRefreshService);

97

public CredentialsRefreshService getCredentialsRefreshService();

98

99

// SSL Context Factory

100

public void setSslContextFactory(SslContextFactory sslContextFactory);

101

public SslContextFactory getSslContextFactory();

102

public void enableHostnameVerification();

103

104

// Topology Recovery

105

public void setTopologyRecoveryEnabled(boolean topologyRecovery);

106

public boolean isTopologyRecoveryEnabled();

107

public void setTopologyRecoveryExecutor(ExecutorService executor);

108

public ExecutorService getTopologyRecoveryExecutor();

109

110

// Channel configuration

111

public void setChannelShouldCheckRpcResponseType(boolean channelShouldCheckRpcResponseType);

112

public boolean isChannelShouldCheckRpcResponseType();

113

114

// Work pool configuration

115

public void setWorkPoolTimeout(int workPoolTimeout);

116

public int getWorkPoolTimeout();

117

118

// Error handling

119

public void setErrorOnWriteListener(ErrorOnWriteListener errorOnWriteListener);

120

public ErrorOnWriteListener getErrorOnWriteListener();

121

122

// Message size limits

123

public void setMaxInboundMessageBodySize(int maxInboundMessageBodySize);

124

public int getMaxInboundMessageBodySize();

125

126

// NIO configuration

127

public void useNio();

128

public void useBlockingIo();

129

public void setNioParams(NioParams nioParams);

130

public NioParams getNioParams();

131

132

// Configuration loading

133

public void load(Properties properties);

134

public void load(Properties properties, String prefix);

135

public void load(String propertyFileLocation) throws IOException;

136

public void load(String propertyFileLocation, String prefix) throws IOException;

137

138

// Cloning

139

public ConnectionFactory clone();

140

141

// Constants

142

public static final String DEFAULT_USER = "guest";

143

public static final String DEFAULT_PASS = "guest";

144

public static final String DEFAULT_VHOST = "/";

145

public static final String DEFAULT_HOST = "localhost";

146

public static final int DEFAULT_AMQP_PORT = 5672;

147

public static final int DEFAULT_AMQP_OVER_SSL_PORT = 5671;

148

public static final int DEFAULT_CONNECTION_TIMEOUT = 60000;

149

public static final int DEFAULT_HANDSHAKE_TIMEOUT = 10000;

150

public static final int DEFAULT_SHUTDOWN_TIMEOUT = 10000;

151

public static final int DEFAULT_HEARTBEAT = 60;

152

public static final int DEFAULT_CHANNEL_MAX = 2047;

153

public static final int DEFAULT_FRAME_MAX = 0;

154

public static final int DEFAULT_CHANNEL_RPC_TIMEOUT = 600000;

155

public static final long DEFAULT_NETWORK_RECOVERY_INTERVAL = 5000;

156

}

157

```

158

159

**Usage Examples:**

160

161

```java

162

// Basic connection setup

163

ConnectionFactory factory = new ConnectionFactory();

164

factory.setHost("localhost");

165

factory.setPort(5672);

166

factory.setUsername("user");

167

factory.setPassword("password");

168

factory.setVirtualHost("/");

169

170

Connection connection = factory.newConnection();

171

```

172

173

```java

174

// URI-based configuration

175

ConnectionFactory factory = new ConnectionFactory();

176

factory.setUri("amqp://user:password@localhost:5672/vhost");

177

Connection connection = factory.newConnection();

178

```

179

180

```java

181

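// SSL connection (the no-argument useSslProtocol() trusts any server certificate; for production, pass a properly configured SSLContext to useSslProtocol(SSLContext))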

182

ConnectionFactory factory = new ConnectionFactory();

183

factory.setHost("localhost");

184

factory.setPort(5671);

185

factory.useSslProtocol();

186

Connection connection = factory.newConnection();

187

```

188

189

```java

190

// Connection with custom timeouts and recovery

191

ConnectionFactory factory = new ConnectionFactory();

192

factory.setHost("localhost");

193

factory.setConnectionTimeout(30000);

194

factory.setHandshakeTimeout(5000);

195

factory.setAutomaticRecoveryEnabled(true);

196

factory.setNetworkRecoveryInterval(10000);

197

198

Connection connection = factory.newConnection();

199

```

200

201

### Connection Interface

202

203

Interface representing a connection to a RabbitMQ broker.

204

205

```java { .api }

206

/**

207

* Public API: Interface to an AMQ connection.

208

*/

209

public interface Connection extends Closeable, ShutdownNotifier {

210

211

// Channel management

212

Channel createChannel() throws IOException;

213

Channel createChannel(int channelNumber) throws IOException;

214

215

// Connection state

216

boolean isOpen();

217

InetAddress getAddress();

218

int getPort();

219

220

// Connection properties

221

Map<String, Object> getServerProperties();

222

Map<String, Object> getClientProperties();

223

String getClientProvidedName();

224

225

// Connection information

226

int getChannelMax();

227

int getFrameMax();

228

int getHeartbeat();

229

String getId();

230

231

// Shutdown and cleanup

232

void close() throws IOException;

233

void close(int closeCode, String closeMessage) throws IOException;

234

void close(int timeout) throws IOException;

235

void close(int closeCode, String closeMessage, int timeout) throws IOException;

236

void abort();

237

void abort(int closeCode, String closeMessage);

238

void abort(int timeout);

239

void abort(int closeCode, String closeMessage, int timeout);

240

241

// Blocked connection handling

242

void addBlockedListener(BlockedListener listener);

243

BlockedListener addBlockedListener(BlockedCallback blockedCallback, UnblockedCallback unblockedCallback);

244

boolean removeBlockedListener(BlockedListener listener);

245

void clearBlockedListeners();

246

}

247

```

248

249

**Usage Examples:**

250

251

```java

252

// Create and use connection

253

ConnectionFactory factory = new ConnectionFactory();

254

factory.setHost("localhost");

255

Connection connection = factory.newConnection();

256

257

// Check connection state

258

if (connection.isOpen()) {

259

System.out.println("Connection is open");

260

}

261

262

// Get server information

263

Map<String, Object> serverProps = connection.getServerProperties();

264

System.out.println("Server version: " + serverProps.get("version"));

265

266

// Create channels

267

Channel channel1 = connection.createChannel();

268

Channel channel2 = connection.createChannel();

269

270

// Close connection

271

connection.close();

272

```

273

274

### Channel Interface

275

276

Interface representing a channel within a connection for AMQP operations.

277

278

```java { .api }

279

/**

280

* Interface to a channel. All non-deprecated methods of this interface are part of the public API.

281

*/

282

public interface Channel extends Closeable, ShutdownNotifier {

283

284

// Channel information

285

int getChannelNumber();

286

Connection getConnection();

287

boolean isOpen();

288

289

// Channel shutdown (abort and close)

290

void abort() throws IOException;

291

void abort(int closeCode, String closeMessage) throws IOException;

292

void close() throws IOException, TimeoutException;

293

void close(int closeCode, String closeMessage) throws IOException, TimeoutException;

294

295

// Consumer management

296

Consumer getDefaultConsumer();

297

void setDefaultConsumer(Consumer consumer);

298

299

// Quality of Service (flow control)

300

void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;

301

void basicQos(int prefetchCount, boolean global) throws IOException;

302

void basicQos(int prefetchCount) throws IOException;

303

304

// Exchange operations

305

AMQP.Exchange.DeclareOk exchangeDeclare(String exchange, String type) throws IOException;

306

AMQP.Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable) throws IOException;

307

AMQP.Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete, Map<String, Object> arguments) throws IOException;

308

AMQP.Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments) throws IOException;

309

void exchangeDeclareNoWait(String exchange, String type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments) throws IOException;

310

AMQP.Exchange.DeclareOk exchangeDeclarePassive(String exchange) throws IOException;

311

AMQP.Exchange.DeleteOk exchangeDelete(String exchange, boolean ifUnused) throws IOException;

312

AMQP.Exchange.DeleteOk exchangeDelete(String exchange) throws IOException;

313

void exchangeDeleteNoWait(String exchange, boolean ifUnused) throws IOException;

314

AMQP.Exchange.BindOk exchangeBind(String destination, String source, String routingKey) throws IOException;

315

AMQP.Exchange.BindOk exchangeBind(String destination, String source, String routingKey, Map<String, Object> arguments) throws IOException;

316

void exchangeBindNoWait(String destination, String source, String routingKey, Map<String, Object> arguments) throws IOException;

317

AMQP.Exchange.UnbindOk exchangeUnbind(String destination, String source, String routingKey) throws IOException;

318

AMQP.Exchange.UnbindOk exchangeUnbind(String destination, String source, String routingKey, Map<String, Object> arguments) throws IOException;

319

void exchangeUnbindNoWait(String destination, String source, String routingKey, Map<String, Object> arguments) throws IOException;

320

321

// Queue operations

322

AMQP.Queue.DeclareOk queueDeclare() throws IOException;

323

AMQP.Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException;

324

void queueDeclareNoWait(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException;

325

AMQP.Queue.DeclareOk queueDeclarePassive(String queue) throws IOException;

326

AMQP.Queue.DeleteOk queueDelete(String queue) throws IOException;

327

AMQP.Queue.DeleteOk queueDelete(String queue, boolean ifUnused, boolean ifEmpty) throws IOException;

328

void queueDeleteNoWait(String queue, boolean ifUnused, boolean ifEmpty) throws IOException;

329

AMQP.Queue.BindOk queueBind(String queue, String exchange, String routingKey) throws IOException;

330

AMQP.Queue.BindOk queueBind(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;

331

void queueBindNoWait(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;

332

AMQP.Queue.UnbindOk queueUnbind(String queue, String exchange, String routingKey) throws IOException;

333

AMQP.Queue.UnbindOk queueUnbind(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;

334

AMQP.Queue.PurgeOk queuePurge(String queue) throws IOException;

335

int messageCount(String queue) throws IOException;

336

int consumerCount(String queue) throws IOException;

337

338

// Message publishing

339

void basicPublish(String exchange, String routingKey, AMQP.BasicProperties props, byte[] body) throws IOException;

340

void basicPublish(String exchange, String routingKey, boolean mandatory, AMQP.BasicProperties props, byte[] body) throws IOException;

341

void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, AMQP.BasicProperties props, byte[] body) throws IOException;

342

343

// Message consuming

344

GetResponse basicGet(String queue, boolean autoAck) throws IOException;

345

String basicConsume(String queue, Consumer callback) throws IOException;

346

String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;

347

String basicConsume(String queue, boolean autoAck, String consumerTag, Consumer callback) throws IOException;

348

String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> arguments, Consumer callback) throws IOException;

349

String basicConsume(String queue, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException;

350

String basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException;

351

String basicConsume(String queue, boolean autoAck, String consumerTag, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException;

352

String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> arguments, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException;

353

String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> arguments, DeliverCallback deliverCallback, CancelCallback cancelCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException;

354

void basicCancel(String consumerTag) throws IOException;

355

356

// Message acknowledgment

357

void basicAck(long deliveryTag, boolean multiple) throws IOException;

358

void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;

359

void basicReject(long deliveryTag, boolean requeue) throws IOException;

360

AMQP.Basic.RecoverOk basicRecover() throws IOException;

361

AMQP.Basic.RecoverOk basicRecover(boolean requeue) throws IOException;

362

363

// Transaction support

364

AMQP.Tx.SelectOk txSelect() throws IOException;

365

AMQP.Tx.CommitOk txCommit() throws IOException;

366

AMQP.Tx.RollbackOk txRollback() throws IOException;

367

368

// Publisher confirms

369

AMQP.Confirm.SelectOk confirmSelect() throws IOException;

370

long getNextPublishSeqNo();

371

boolean waitForConfirms() throws InterruptedException;

372

boolean waitForConfirms(long timeout) throws InterruptedException, TimeoutException;

373

void waitForConfirmsOrDie() throws IOException, InterruptedException;

374

void waitForConfirmsOrDie(long timeout) throws IOException, InterruptedException, TimeoutException;

375

376

// Listener management

377

void addReturnListener(ReturnListener listener);

378

ReturnListener addReturnListener(ReturnCallback returnCallback);

379

boolean removeReturnListener(ReturnListener listener);

380

void clearReturnListeners();

381

void addConfirmListener(ConfirmListener listener);

382

ConfirmListener addConfirmListener(ConfirmCallback ackCallback, ConfirmCallback nackCallback);

383

boolean removeConfirmListener(ConfirmListener listener);

384

void clearConfirmListeners();

385

386

// Low-level operations

387

Method rpc(Method method) throws IOException, ShutdownSignalException;

388

void asyncRpc(Method method) throws IOException;

389

CompletableFuture<Command> asyncCompletableRpc(Method method) throws IOException;

390

}

391

```

392

393

### RecoverableConnection

394

395

Interface for connections that support automatic recovery from network failures.

396

397

```java { .api }

398

/**

399

* Connection that can automatically recover from network failures.

400

*/

401

public interface RecoverableConnection extends Connection {

402

void addRecoveryListener(RecoveryListener listener);

403

void removeRecoveryListener(RecoveryListener listener);

404

}

405

```

406

407

### RecoverableChannel

408

409

Interface for channels that support automatic recovery.

410

411

```java { .api }

412

/**

413

* Channel that can automatically recover from network failures.

414

*/

415

public interface RecoverableChannel extends Channel {

416

// Inherits recovery capabilities from the connection

417

}

418

```

419

420

**Usage Examples:**

421

422

```java

423

// Working with recoverable connections

424

ConnectionFactory factory = new ConnectionFactory();

425

factory.setAutomaticRecoveryEnabled(true);

426

factory.setNetworkRecoveryInterval(5000);

427

428

RecoverableConnection connection = (RecoverableConnection) factory.newConnection();

429

430

// Add recovery listener

431

connection.addRecoveryListener(new RecoveryListener() {

432

@Override

433

public void handleRecovery(Recoverable recoverable) {

434

System.out.println("Connection recovered!");

435

}

436

437

@Override

438

public void handleRecoveryStarted(Recoverable recoverable) {

439

System.out.println("Recovery started...");

440

}

441

});

442

443

RecoverableChannel channel = (RecoverableChannel) connection.createChannel();

444

```

445

446

## Types

447

448

### Supporting Types

449

450

```java { .api }

451

// Shutdown notification support

452

public interface ShutdownNotifier {

453

void addShutdownListener(ShutdownListener listener);

454

void removeShutdownListener(ShutdownListener listener);

455

ShutdownSignalException getCloseReason();

456

}

457

458

public interface ShutdownListener {

459

void shutdownCompleted(ShutdownSignalException cause);

460

}

461

462

// Blocked connection support

463

public interface BlockedListener {

464

void handleBlocked(String reason) throws IOException;

465

void handleUnblocked() throws IOException;

466

}

467

468

@FunctionalInterface

469

public interface BlockedCallback {

470

void handle(String reason) throws IOException;

471

}

472

473

@FunctionalInterface

474

public interface UnblockedCallback {

475

void handle() throws IOException;

476

}

477

478

// Recovery support

479

public interface RecoveryListener {

480

void handleRecovery(Recoverable recoverable);

481

void handleRecoveryStarted(Recoverable recoverable);

482

}

483

484

public interface Recoverable {

485

void addRecoveryListener(RecoveryListener listener);

486

void removeRecoveryListener(RecoveryListener listener);

487

}

488

```