0
# Connection and Session Management
1
2
Low-level AMQP protocol management, including connection establishment, session management, and link creation, for advanced messaging scenarios that require fine-grained control over AMQP protocol behavior.
3
4
## Capabilities
5
6
### Connection Management
7
8
Low-level AMQP connection management that handles the network connection, authentication, and protocol handshake.
9
10
```python { .api }
11
class Connection:
12
def __init__(self, hostname, sasl=None, container_id=None, max_frame_size=None,
13
channel_max=None, idle_timeout=None, properties=None,
14
remote_idle_timeout_empty_frame_send_ratio=None, debug=False,
15
encoding='UTF-8'):
16
"""
17
AMQP connection management.
18
19
Parameters:
20
- hostname (str): AMQP broker hostname
21
- sasl (AMQPAuth): SASL authentication mechanism
22
- container_id (str): AMQP container identifier
23
- max_frame_size (int): Maximum frame size in bytes
24
- channel_max (int): Maximum number of channels/sessions
25
- idle_timeout (int): Connection idle timeout in milliseconds
26
- properties (dict): Connection properties
27
- remote_idle_timeout_empty_frame_send_ratio (float): Ratio of empty frames to idle time for connections with no activity; must be between 0.0 and 1.0 inclusive
28
- debug (bool): Enable protocol debug logging
29
- encoding (str): Character encoding
30
"""
31
```
32
33
**Key Methods:**
34
35
```python { .api }
36
def open(self):
37
"""Open the AMQP connection."""
38
39
def close(self):
40
"""Close the AMQP connection."""
41
42
def work(self):
43
"""Process connection work (I/O and protocol handling)."""
44
45
def sleep(self, seconds):
46
"""Sleep while continuing to service the connection."""
47
48
def destroy(self):
49
"""Destroy the connection and free resources."""
50
```
51
52
**Key Properties:**
53
54
```python { .api }
55
@property
56
def container_id: str
57
"""AMQP container identifier."""
58
59
@property
60
def hostname: str
61
"""Broker hostname."""
62
63
@property
64
def max_frame_size: int
65
"""Maximum frame size in bytes."""
66
67
@property
68
def remote_max_frame_size: int
69
"""Remote peer's maximum frame size."""
70
71
@property
72
def channel_max: int
73
"""Maximum number of channels."""
74
75
@property
76
def idle_timeout: int
77
"""Idle timeout in milliseconds."""
78
```
79
80
**Usage Examples:**
81
82
```python
83
from uamqp import Connection
84
from uamqp.authentication import SASLPlain
85
86
# Basic connection
87
auth = SASLPlain("amqp.example.com", "user", "password")
88
connection = Connection("amqp.example.com", sasl=auth)
89
90
try:
91
connection.open()
92
print(f"Connected to {connection.hostname}")
93
94
# Connection is ready for session creation
95
# ... create sessions and links
96
97
# Service the connection once; call work() regularly to keep it alive
98
connection.work()
99
100
finally:
101
connection.close()
102
103
# Connection with custom properties
104
properties = {
105
'product': 'MyApp',
106
'version': '1.0.0',
107
'platform': 'Python'
108
}
109
110
connection = Connection(
111
hostname="amqp.example.com",
112
sasl=auth,
113
container_id="my-app-container",
114
max_frame_size=65536,
115
channel_max=100,
116
idle_timeout=60000, # 60 seconds
117
properties=properties,
118
debug=True
119
)
120
```
121
122
### Session Management
123
124
AMQP session management that provides logical grouping and flow control for links within a connection.
125
126
```python { .api }
127
class Session:
128
def __init__(self, connection, incoming_window=None, outgoing_window=None,
129
handle_max=None):
130
"""
131
AMQP session management.
132
133
Parameters:
134
- connection (Connection): AMQP connection
135
- incoming_window (int): Incoming transfer window size
136
- outgoing_window (int): Outgoing transfer window size
137
- handle_max (int): Maximum link handles
138
"""
139
```
140
141
**Key Methods:**
142
143
```python { .api }
144
def begin(self):
145
"""Begin the AMQP session."""
146
147
def end(self):
148
"""End the AMQP session."""
149
150
def destroy(self):
151
"""Destroy the session and free resources."""
152
```
153
154
**Key Properties:**
155
156
```python { .api }
157
@property
158
def incoming_window: int
159
"""Incoming transfer window size."""
160
161
@property
162
def outgoing_window: int
163
"""Outgoing transfer window size."""
164
165
@property
166
def handle_max: int
167
"""Maximum number of link handles."""
168
169
@property
170
def connection: Connection
171
"""Associated AMQP connection."""
172
```
173
174
**Usage Examples:**
175
176
```python
177
from uamqp import Connection, Session
178
179
# Create session on connection
# (`auth` is the SASL object built in the connection usage example)
connection = Connection("amqp.example.com", sasl=auth)
connection.open()

try:
    # Create session with flow control
    session = Session(
        connection=connection,
        incoming_window=1000,  # Allow 1000 incoming transfers
        outgoing_window=1000,  # Allow 1000 outgoing transfers
        handle_max=64          # Support up to 64 concurrent links
    )
    session.begin()

    try:
        print("Session started")

        # Session is ready for link creation
        # ... create senders and receivers

    finally:
        # Only reached once the session exists, so `session` is always bound
        session.end()

finally:
    connection.close()

# Multiple sessions on one connection
# The connection above has been closed, so this example opens its own.
connection = Connection("amqp.example.com", sasl=auth)
connection.open()

try:
    session1 = Session(connection, incoming_window=500)
    session2 = Session(connection, incoming_window=500)

    session1.begin()
    session2.begin()

    # Use sessions for different purposes
    # session1 for sending, session2 for receiving

finally:
    connection.close()
211
```
212
213
### Advanced Connection Patterns
214
215
#### Connection Pooling
216
217
```python
218
import threading
219
from queue import Queue
220
221
class ConnectionPool:
    """Fixed-size pool of pre-opened AMQP connections.

    Connections are created and opened eagerly in the constructor and
    handed out through a FIFO queue.
    """

    def __init__(self, hostname, auth, pool_size=5):
        """
        Create and open the pooled connections.

        Parameters:
        - hostname (str): AMQP broker hostname passed to Connection
        - auth: SASL authentication object passed to Connection as `sasl`
        - pool_size (int): Number of connections to pre-create

        If creating or opening a connection raises, the connections that
        were already pooled are closed and the exception is re-raised.
        """
        self.hostname = hostname
        self.auth = auth
        self.pool_size = pool_size
        self.connections = Queue(maxsize=pool_size)
        self.lock = threading.Lock()

        # Pre-create connections
        try:
            for _ in range(pool_size):
                conn = Connection(hostname, sasl=auth)
                conn.open()
                self.connections.put(conn)
        except Exception:
            # Do not leave half a pool of open connections behind
            self.close_all()
            raise

    def get_connection(self):
        """Get a connection from the pool, blocking while the pool is empty."""
        return self.connections.get()

    def return_connection(self, connection):
        """
        Return a connection to the pool.

        If the pool is already full the surplus connection is closed
        instead of being silently dropped.
        """
        # The lock makes the full()/put pair atomic among returners; getters
        # can only shrink the queue, so put_nowait() cannot raise here.
        with self.lock:
            if not self.connections.full():
                self.connections.put_nowait(connection)
                return
        connection.close()

    def close_all(self):
        """Close all connections currently held in the pool."""
        from queue import Empty

        # get_nowait() avoids blocking forever if another thread takes the
        # last connection between an empty() check and a get().
        while True:
            try:
                conn = self.connections.get_nowait()
            except Empty:
                return
            conn.close()
250
# Usage
251
pool = ConnectionPool("amqp.example.com", auth, pool_size=10)
252
253
# Get connection from pool
254
connection = pool.get_connection()
255
try:
256
session = Session(connection)
257
session.begin()
258
# Use session...
259
session.end()
260
finally:
261
pool.return_connection(connection)
262
```
263
264
#### Connection Monitoring
265
266
```python
267
import time
268
import threading
269
270
class ConnectionMonitor:
    """Background thread that services a connection and checks its health.

    NOTE(review): this document states that connection objects are not
    thread-safe; the monitor calls work() from its own thread, so callers
    presumably need to serialize their own use of the connection.
    """

    def __init__(self, connection, check_interval=30):
        """
        Parameters:
        - connection: Connection to service; must provide work(), open(), close()
        - check_interval (int): Seconds to wait between health checks
        """
        self.connection = connection
        self.check_interval = check_interval
        self.running = False
        self.monitor_thread = None
        # Set by stop_monitoring() so a waiting loop wakes up immediately
        self._stop_event = threading.Event()

    def start_monitoring(self):
        """Start connection health monitoring (no-op if already running)."""
        if self.monitor_thread is not None and self.monitor_thread.is_alive():
            return
        self._stop_event.clear()
        self.running = True
        self.monitor_thread = threading.Thread(
            target=self._monitor_loop, daemon=True
        )
        self.monitor_thread.start()

    def stop_monitoring(self):
        """Stop connection monitoring and wait for the thread to exit."""
        self.running = False
        self._stop_event.set()
        if self.monitor_thread:
            self.monitor_thread.join()

    def _monitor_loop(self):
        """Monitor connection health until stop_monitoring() is called."""
        while self.running:
            try:
                # Service the connection
                self.connection.work()

                # Check if connection is still alive
                if not self._is_connection_alive():
                    print("Connection lost, attempting reconnect...")
                    self._reconnect()

                delay = self.check_interval

            except Exception as e:
                print(f"Monitor error: {e}")
                delay = 1  # Retry quickly after an error

            # Interruptible wait: returns True as soon as stop is requested
            if self._stop_event.wait(delay):
                break

    def _is_connection_alive(self):
        """Check if connection is still alive."""
        # Placeholder: always reports the connection as alive.
        # Replace with a real liveness check for production use.
        return True

    def _reconnect(self):
        """Attempt to reconnect by closing and reopening the connection."""
        try:
            self.connection.close()
            self.connection.open()
            print("Reconnection successful")
        except Exception as e:
            print(f"Reconnection failed: {e}")
324
325
# Usage
326
monitor = ConnectionMonitor(connection, check_interval=10)
327
monitor.start_monitoring()
328
329
# Use connection...
330
331
monitor.stop_monitoring()
332
```
333
334
### Session Flow Control
335
336
#### Credit-Based Flow Control
337
338
```python
339
from uamqp import Session, MessageReceiver
340
341
def create_controlled_receiver(session, source, credits=10):
    """
    Create receiver with manual credit management.

    Parameters:
    - session (Session): Session the receiver link is created on
    - source: Source address passed to MessageReceiver
    - credits (int): Initial link credit to grant

    Returns the opened receiver. If granting the initial credit raises,
    the receiver is closed before the exception propagates.
    """
    # NOTE(review): confirm MessageReceiver exposes flow() in the
    # installed library version.
    receiver = MessageReceiver(session, source)
    receiver.open()

    try:
        # Set initial credits
        receiver.flow(credits)
    except Exception:
        # Do not leak an open link when the credit grant fails
        receiver.close()
        raise

    return receiver
351
352
def process_with_flow_control(session, source, max_messages=100, batch_size=5):
    """
    Process messages with explicit flow control.

    Parameters:
    - session (Session): Session to create the receiver on
    - source: Source address to receive from
    - max_messages (int): Number of messages to process before returning
    - batch_size (int): Initial credit and maximum messages requested per batch

    The receiver is always closed on exit.
    """
    receiver = create_controlled_receiver(session, source, credits=batch_size)

    try:
        processed = 0
        while processed < max_messages:
            # Never request more than is still needed, so a partial batch
            # cannot push the total past max_messages
            wanted = min(batch_size, max_messages - processed)
            messages = receiver.receive_message_batch(wanted)

            for message in messages:
                # Process message
                print(f"Processing: {message.get_data()}")
                message.accept()
                processed += 1

            # Replenish credits after processing batch
            if len(messages) > 0:
                receiver.flow(len(messages))

    finally:
        receiver.close()
374
```
375
376
#### Window Management
377
378
```python
379
def create_windowed_session(connection, window_size=1000):
    """
    Create and begin a session whose incoming and outgoing windows are equal.

    Parameters:
    - connection (Connection): Connection the session is created on
    - window_size (int): Size used for both transfer windows

    Returns the session after begin() has been called on it.
    """
    windowed = Session(
        connection=connection,
        incoming_window=window_size,
        outgoing_window=window_size
    )
    windowed.begin()
    return windowed
390
391
def monitor_session_windows(session, interval=5, threshold=0.8, iterations=None):
    """
    Monitor session window utilization.

    Parameters:
    - session: Session to inspect
    - interval (float): Seconds to sleep between reports
    - threshold (float): Fraction of a window above which a warning is printed
    - iterations (int): Number of reports to print; None (default) runs forever
    """
    import time

    # NOTE(review): available_incoming / available_outgoing are not among
    # the Session properties listed in this document; confirm they exist.
    remaining = iterations
    while remaining is None or remaining > 0:
        incoming_used = session.incoming_window - session.available_incoming
        outgoing_used = session.outgoing_window - session.available_outgoing

        print(f"Incoming window: {incoming_used}/{session.incoming_window}")
        print(f"Outgoing window: {outgoing_used}/{session.outgoing_window}")

        # Alert if windows are getting full
        if incoming_used > session.incoming_window * threshold:
            print("Warning: Incoming window nearly full")

        if outgoing_used > session.outgoing_window * threshold:
            print("Warning: Outgoing window nearly full")

        if remaining is not None:
            remaining -= 1
            if remaining == 0:
                break  # Skip the pointless sleep after the final report

        time.sleep(interval)
409
```
410
411
### Connection Recovery
412
413
#### Automatic Reconnection
414
415
```python
416
import time
417
from uamqp.errors import AMQPConnectionError
418
419
class ReliableConnection:
    """Connection holder that retries with exponential backoff."""

    def __init__(self, hostname, auth, max_retries=5):
        """
        Parameters:
        - hostname (str): AMQP broker hostname
        - auth: SASL authentication object passed to Connection as `sasl`
        - max_retries (int): Maximum number of connection attempts
        """
        self.hostname = hostname
        self.auth = auth
        self.max_retries = max_retries
        self.connection = None

    def connect(self):
        """
        Connect with automatic retry.

        Returns the opened connection. Waits 2 ** attempt seconds between
        attempts and re-raises AMQPConnectionError after the last one.
        Raises ValueError if max_retries is less than 1, because no attempt
        could be made.
        """
        if self.max_retries < 1:
            raise ValueError("max_retries must be at least 1")

        for attempt in range(self.max_retries):
            try:
                conn = Connection(self.hostname, sasl=self.auth)
                conn.open()
                # Publish the connection only once it is actually open
                self.connection = conn
                print(f"Connected on attempt {attempt + 1}")
                return self.connection

            except AMQPConnectionError as e:
                print(f"Connection attempt {attempt + 1} failed: {e}")
                if attempt < self.max_retries - 1:
                    delay = 2 ** attempt  # Exponential backoff
                    time.sleep(delay)
                else:
                    raise

    def ensure_connected(self):
        """Ensure connection is active, reconnect if needed."""
        if self.connection is not None and self._is_connected():
            return self.connection

        print("Connection lost, reconnecting...")
        self._discard_connection()
        return self.connect()

    def _is_connected(self):
        """Check if connection is still active."""
        try:
            # Simple connectivity check
            self.connection.work()
            return True
        except Exception:
            # Exception, not a bare except, so KeyboardInterrupt and
            # SystemExit still propagate
            return False

    def _discard_connection(self):
        """Best-effort close of the stale connection before reconnecting."""
        stale, self.connection = self.connection, None
        if stale is None:
            return
        try:
            stale.close()
        except Exception as e:
            print(f"Ignoring error while closing stale connection: {e}")
458
459
# Usage
460
reliable_conn = ReliableConnection("amqp.example.com", auth)
461
connection = reliable_conn.connect()
462
463
# Ensure connection before creating session
464
connection = reliable_conn.ensure_connected()
465
session = Session(connection)
466
```
467
468
## Performance Optimization
469
470
### Connection Tuning
471
472
```python
473
# High-throughput connection configuration
474
connection = Connection(
475
hostname="amqp.example.com",
476
sasl=auth,
477
max_frame_size=65536, # Maximum frame size
478
channel_max=1000, # Many concurrent sessions
479
idle_timeout=300000, # 5 minute timeout
480
debug=False # Disable debug for performance
481
)
482
483
# Low-latency connection configuration
484
connection = Connection(
485
hostname="amqp.example.com",
486
sasl=auth,
487
max_frame_size=4096, # Smaller frames for faster processing
488
channel_max=10, # Fewer sessions
489
idle_timeout=30000, # 30 second timeout
490
debug=False
491
)
492
```
493
494
### Session Optimization
495
496
```python
497
# High-throughput session
498
session = Session(
499
connection=connection,
500
incoming_window=10000, # Large window for batching
501
outgoing_window=10000,
502
handle_max=100 # Many concurrent links
503
)
504
505
# Low-latency session
506
session = Session(
507
connection=connection,
508
incoming_window=1, # Minimal window for immediate processing
509
outgoing_window=1,
510
handle_max=10 # Fewer links
511
)
512
```
513
514
## Thread Safety
515
516
Connection and session objects are not thread-safe. Use proper synchronization when accessing from multiple threads:
517
518
```python
519
import threading
520
521
# Thread-safe connection wrapper
522
class ThreadSafeConnection:
    """Wrapper that serializes access to a connection with a re-entrant lock.

    Every method holds the same RLock while it touches the wrapped
    connection, so calls from different threads do not interleave.
    """

    def __init__(self, connection):
        """
        Parameters:
        - connection (Connection): Connection to guard
        """
        self.connection = connection
        self.lock = threading.RLock()

    def open(self):
        """Open the wrapped connection while holding the lock."""
        with self.lock:
            return self.connection.open()

    def close(self):
        """Close the wrapped connection while holding the lock."""
        with self.lock:
            return self.connection.close()

    def work(self):
        """Service the wrapped connection while holding the lock."""
        with self.lock:
            return self.connection.work()

    def create_session(self, **kwargs):
        """Create a Session on the wrapped connection while holding the lock.

        Keyword arguments are forwarded to Session unchanged.
        """
        with self.lock:
            return Session(self.connection, **kwargs)
534
535
# Usage
536
safe_conn = ThreadSafeConnection(connection)
537
```