0
# Connection Management
1
2
Comprehensive connection configuration and management with multiple adapter types supporting different networking approaches for connecting to RabbitMQ brokers.
3
4
## Capabilities
5
6
### Connection Parameters
7
8
Configure connection settings including host, port, credentials, timeouts, and protocol options.
9
10
```python { .api }
11
class ConnectionParameters:
12
"""Connection parameters for AMQP connections."""
13
14
def __init__(self, host='localhost', port=5672, virtual_host='/',
15
credentials=None, channel_max=65535, frame_max=131072,
16
heartbeat=None, ssl_options=None, connection_attempts=1,
17
retry_delay=2.0, socket_timeout=10.0, stack_timeout=15.0,
18
locale='en_US', blocked_connection_timeout=None,
19
client_properties=None, tcp_options=None):
20
"""
21
Parameters:
22
- host (str): RabbitMQ server hostname
23
- port (int): RabbitMQ server port (default: 5672)
24
- virtual_host (str): Virtual host (default: '/')
25
- credentials (Credentials): Authentication credentials
26
- channel_max (int): Maximum number of channels (default: 65535)
27
- frame_max (int): Maximum frame size in bytes (default: 131072)
28
- heartbeat (int): Heartbeat interval in seconds
29
- ssl_options (SSLOptions): SSL configuration
30
- connection_attempts (int): Number of connection attempts (default: 1)
31
- retry_delay (float): Delay between attempts in seconds (default: 2.0)
32
- socket_timeout (float): Socket connection timeout (default: 10.0)
33
- stack_timeout (float): Full stack timeout (default: 15.0)
34
- locale (str): Connection locale (default: 'en_US')
35
- blocked_connection_timeout (float): Blocked connection timeout
36
- client_properties (dict): Client identification properties
37
- tcp_options (dict): TCP socket options
38
"""
39
40
# Properties for all parameters
41
@property
42
def host(self) -> str: ...
43
44
@host.setter
45
def host(self, value: str): ...
46
47
@property
48
def port(self) -> int: ...
49
50
@port.setter
51
def port(self, value: int): ...
52
53
@property
54
def virtual_host(self) -> str: ...
55
56
@virtual_host.setter
57
def virtual_host(self, value: str): ...
58
59
@property
60
def credentials(self): ...
61
62
@credentials.setter
63
def credentials(self, value): ...
64
65
@property
66
def channel_max(self) -> int: ...
67
68
@channel_max.setter
69
def channel_max(self, value: int): ...
70
71
@property
72
def frame_max(self) -> int: ...
73
74
@frame_max.setter
75
def frame_max(self, value: int): ...
76
77
@property
78
def heartbeat(self) -> int: ...
79
80
@heartbeat.setter
81
def heartbeat(self, value: int): ...
82
83
@property
84
def ssl_options(self): ...
85
86
@ssl_options.setter
87
def ssl_options(self, value): ...
88
89
@property
90
def connection_attempts(self) -> int: ...
91
92
@connection_attempts.setter
93
def connection_attempts(self, value: int): ...
94
95
@property
96
def retry_delay(self) -> float: ...
97
98
@retry_delay.setter
99
def retry_delay(self, value: float): ...
100
101
@property
102
def socket_timeout(self) -> float: ...
103
104
@socket_timeout.setter
105
def socket_timeout(self, value: float): ...
106
107
@property
108
def stack_timeout(self) -> float: ...
109
110
@stack_timeout.setter
111
def stack_timeout(self, value: float): ...
112
113
@property
114
def locale(self) -> str: ...
115
116
@locale.setter
117
def locale(self, value: str): ...
118
119
@property
120
def blocked_connection_timeout(self) -> float: ...
121
122
@blocked_connection_timeout.setter
123
def blocked_connection_timeout(self, value: float): ...
124
125
@property
126
def client_properties(self) -> dict: ...
127
128
@client_properties.setter
129
def client_properties(self, value: dict): ...
130
131
@property
132
def tcp_options(self) -> dict: ...
133
134
@tcp_options.setter
135
def tcp_options(self, value: dict): ...
136
```
137
138
### URL-Based Parameters
139
140
Create connection parameters from AMQP URLs with automatic parsing of connection details.
141
142
```python { .api }
143
class URLParameters(ConnectionParameters):
144
"""Connection parameters from AMQP URL."""
145
146
def __init__(self, url):
147
"""
148
Create connection parameters from AMQP URL.
149
150
Parameters:
151
- url (str): AMQP URL in format: amqp://username:password@host:port/<virtual_host>[?query-string]
152
153
URL query parameters:
154
- channel_max: Maximum number of channels
155
- client_properties: Client properties as JSON
156
- connection_attempts: Number of connection attempts
157
- frame_max: Maximum frame size
158
- heartbeat: Heartbeat interval
159
- locale: Connection locale
160
- ssl_options: SSL options as JSON
161
- retry_delay: Retry delay in seconds
162
- socket_timeout: Socket timeout in seconds
163
- stack_timeout: Stack timeout in seconds
164
- blocked_connection_timeout: Blocked connection timeout
165
- tcp_options: TCP options as JSON
166
"""
167
```
168
169
### Blocking Connection
170
171
Synchronous connection adapter for simple blocking operations.
172
173
```python { .api }
174
class BlockingConnection:
175
"""Synchronous connection to RabbitMQ."""
176
177
def __init__(self, parameters):
178
"""
179
Create blocking connection.
180
181
Parameters:
182
- parameters (ConnectionParameters or URLParameters): Connection configuration
183
"""
184
185
def channel(self, channel_number=None):
186
"""
187
Create a new channel.
188
189
Parameters:
190
- channel_number (int, optional): Specific channel number to use
191
192
Returns:
193
- BlockingChannel: New channel instance
194
"""
195
196
def close(self):
197
"""Close the connection."""
198
199
def process_data_events(self, time_limit=0):
200
"""
201
Process pending data events.
202
203
Parameters:
204
- time_limit (float): Maximum time to process events (default: 0 for non-blocking)
205
"""
206
207
def sleep(self, duration):
208
"""
209
Sleep while processing connection events.
210
211
Parameters:
212
- duration (float): Sleep duration in seconds
213
"""
214
215
def add_callback_threadsafe(self, callback):
216
"""
217
Add callback to be executed in connection thread.
218
219
Parameters:
220
- callback (callable): Callback function to execute
221
"""
222
223
def call_later(self, delay, callback, *args):
224
"""
225
Schedule callback execution after delay.
226
227
Parameters:
228
- delay (float): Delay in seconds
229
- callback (callable): Callback function
230
- *args: Arguments for callback
231
232
Returns:
233
- object: Handle for canceling the call
234
"""
235
236
def remove_timeout(self, timeout_id):
237
"""
238
Remove scheduled timeout.
239
240
Parameters:
241
- timeout_id: Timeout handle to remove
242
"""
243
244
def update_secret(self, new_secret, reason):
245
"""
246
Update connection credentials.
247
248
Parameters:
249
- new_secret (str): New password/secret
250
- reason (str): Reason for update
251
"""
252
253
def add_on_connection_blocked_callback(self, callback):
254
"""
255
Add callback for connection blocked events.
256
257
Parameters:
258
- callback (callable): Callback function receiving (connection, method)
259
"""
260
261
def add_on_connection_unblocked_callback(self, callback):
262
"""
263
Add callback for connection unblocked events.
264
265
Parameters:
266
- callback (callable): Callback function receiving (connection, method)
267
"""
268
269
# Connection state properties
270
@property
271
def is_closed(self) -> bool:
272
"""True if connection is closed."""
273
274
@property
275
def is_open(self) -> bool:
276
"""True if connection is open."""
277
278
# Server capability properties
279
@property
280
def basic_nack_supported(self) -> bool:
281
"""True if server supports basic.nack."""
282
283
@property
284
def consumer_cancel_notify_supported(self) -> bool:
285
"""True if server supports consumer cancel notifications."""
286
287
@property
288
def exchange_exchange_bindings_supported(self) -> bool:
289
"""True if server supports exchange-to-exchange bindings."""
290
291
@property
292
def publisher_confirms_supported(self) -> bool:
293
"""True if server supports publisher confirms."""
294
```
295
296
### Select Connection
297
298
Event-driven connection adapter using select/poll/epoll for asynchronous operations.
299
300
```python { .api }
301
class SelectConnection:
302
"""Event-driven connection using select/poll/epoll."""
303
304
def __init__(self, parameters, on_open_callback=None, on_open_error_callback=None,
305
on_close_callback=None, ioloop=None):
306
"""
307
Create select-based connection.
308
309
Parameters:
310
- parameters (ConnectionParameters): Connection configuration
311
- on_open_callback (callable): Called when connection opens
312
- on_open_error_callback (callable): Called on connection error
313
- on_close_callback (callable): Called when connection closes
314
- ioloop (IOLoop): Event loop instance (creates default if None)
315
"""
316
317
def channel(self, on_open_callback, channel_number=None):
318
"""
319
Create a new channel asynchronously.
320
321
Parameters:
322
- on_open_callback (callable): Called when channel opens
323
- channel_number (int, optional): Specific channel number
324
"""
325
326
def close(self, reply_code=200, reply_text='Normal shutdown'):
327
"""
328
Close connection asynchronously.
329
330
Parameters:
331
- reply_code (int): AMQP reply code (default: 200)
332
- reply_text (str): Human-readable close reason
333
"""
334
335
def ioloop(self):
336
"""
337
Get the IOLoop instance.
338
339
Returns:
340
- IOLoop: Event loop instance
341
"""
342
```
343
344
### Connection Workflow
345
346
Default AMQP connection establishment workflow with retry and timeout handling.
347
348
```python { .api }
349
class AMQPConnectionWorkflow:
350
"""Default connection establishment workflow."""
351
352
def __init__(self, parameters, on_done_callback, nbio_interface):
353
"""
354
Create connection workflow.
355
356
Parameters:
357
- parameters (ConnectionParameters): Connection configuration
358
- on_done_callback (callable): Called when workflow completes
359
- nbio_interface: Non-blocking I/O interface
360
"""
361
362
def start(self):
363
"""Start the connection workflow."""
364
365
def abort(self):
366
"""Abort the connection workflow."""
367
```
368
369
## Usage Examples
370
371
### Basic Connection
372
373
```python
374
import pika
375
376
# Simple connection
377
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
378
channel = connection.channel()
379
380
# Use the channel...
381
382
connection.close()
383
```
384
385
### Connection with Authentication
386
387
```python
388
import pika
389
390
credentials = pika.PlainCredentials('username', 'password')
391
parameters = pika.ConnectionParameters(
392
host='rabbitmq.example.com',
393
port=5672,
394
virtual_host='/app',
395
credentials=credentials
396
)
397
398
connection = pika.BlockingConnection(parameters)
399
```
400
401
### URL-Based Connection
402
403
```python
404
import pika
405
406
# Connection from URL
407
url = 'amqp://user:pass@localhost:5672/%2F?heartbeat=300'
408
connection = pika.BlockingConnection(pika.URLParameters(url))
409
```
410
411
### Connection with SSL
412
413
```python
414
import pika
415
import ssl
416
417
context = ssl.create_default_context()
418
ssl_options = pika.SSLOptions(context, 'rabbitmq.example.com')
419
420
parameters = pika.ConnectionParameters(
421
host='rabbitmq.example.com',
422
port=5671,
423
ssl_options=ssl_options
424
)
425
426
connection = pika.BlockingConnection(parameters)
427
```
428
429
### Asynchronous Connection
430
431
```python
432
import pika
433
434
def on_open(connection):
435
print('Connection opened')
436
connection.channel(on_open_callback=on_channel_open)
437
438
def on_open_error(connection, error):
439
print(f'Connection failed: {error}')
440
441
def on_close(connection, reason):
442
print(f'Connection closed: {reason}')
443
444
def on_channel_open(channel):
445
print('Channel opened')
446
# Use channel...
447
448
parameters = pika.ConnectionParameters('localhost')
449
connection = pika.SelectConnection(
450
parameters,
451
on_open_callback=on_open,
452
on_open_error_callback=on_open_error,
453
on_close_callback=on_close
454
)
455
456
# Start the IOLoop
457
connection.ioloop.start()
458
```