An AMQP 1.0 client library for Python
Low-level AMQP protocol management including connection establishment, session management, and link creation for advanced messaging scenarios that require fine-grained control over AMQP protocol behavior.
Low-level AMQP connection management that handles the network connection, authentication, and protocol handshake.
class Connection:
def __init__(self, hostname, sasl=None, container_id=None, max_frame_size=None,
channel_max=None, idle_timeout=None, properties=None,
remote_idle_timeout_empty_frame_send_ratio=None, debug=False,
encoding='UTF-8'):
"""
AMQP connection management.
Parameters:
- hostname (str): AMQP broker hostname
- sasl (AMQPAuth): SASL authentication mechanism
- container_id (str): AMQP container identifier
- max_frame_size (int): Maximum frame size in bytes
- channel_max (int): Maximum number of channels/sessions
- idle_timeout (int): Connection idle timeout in milliseconds
- properties (dict): Connection properties
- remote_idle_timeout_empty_frame_send_ratio (float): Empty frame ratio
- debug (bool): Enable protocol debug logging
- encoding (str): Character encoding
"""Key Methods:
def open(self):
"""Open the AMQP connection."""
def close(self):
"""Close the AMQP connection."""
def work(self):
"""Process connection work (I/O and protocol handling)."""
def sleep(self, seconds):
"""Sleep while continuing to service the connection."""
def destroy(self):
"""Destroy the connection and free resources."""Key Properties:
@property
def container_id: str
"""AMQP container identifier."""
@property
def hostname: str
"""Broker hostname."""
@property
def max_frame_size: int
"""Maximum frame size in bytes."""
@property
def remote_max_frame_size: int
"""Remote peer's maximum frame size."""
@property
def channel_max: int
"""Maximum number of channels."""
@property
def idle_timeout: int
"""Idle timeout in milliseconds."""Usage Examples:
from uamqp import Connection
from uamqp.authentication import SASLPlain
# Basic connection
auth = SASLPlain("amqp.example.com", "user", "password")
connection = Connection("amqp.example.com", sasl=auth)
try:
connection.open()
print(f"Connected to {connection.hostname}")
# Connection is ready for session creation
# ... create sessions and links
# Keep connection alive
connection.work()
finally:
connection.close()
# Connection with custom properties
properties = {
'product': 'MyApp',
'version': '1.0.0',
'platform': 'Python'
}
connection = Connection(
hostname="amqp.example.com",
sasl=auth,
container_id="my-app-container",
max_frame_size=65536,
channel_max=100,
idle_timeout=60000, # 60 seconds
properties=properties,
debug=True
)

AMQP session management that provides logical grouping and flow control for links within a connection.
class Session:
def __init__(self, connection, incoming_window=None, outgoing_window=None,
handle_max=None):
"""
AMQP session management.
Parameters:
- connection (Connection): AMQP connection
- incoming_window (int): Incoming transfer window size
- outgoing_window (int): Outgoing transfer window size
- handle_max (int): Maximum link handles
"""Key Methods:
def begin(self):
"""Begin the AMQP session."""
def end(self):
"""End the AMQP session."""
def destroy(self):
"""Destroy the session and free resources."""Key Properties:
@property
def incoming_window: int
"""Incoming transfer window size."""
@property
def outgoing_window: int
"""Outgoing transfer window size."""
@property
def handle_max: int
"""Maximum number of link handles."""
@property
def connection: Connection
"""Associated AMQP connection."""Usage Examples:
from uamqp import Connection, Session
# Create session on connection
# Open the connection first; the session is created inside the try block so
# the connection is always closed, even when session setup fails.
connection = Connection("amqp.example.com", sasl=auth)
connection.open()
session = None  # lets the cleanup below tell whether a session was created
try:
    # Create session with flow control
    session = Session(
        connection=connection,
        incoming_window=1000,  # Allow 1000 incoming transfers
        outgoing_window=1000,  # Allow 1000 outgoing transfers
        handle_max=64,  # Support up to 64 concurrent links
    )
    session.begin()
    print("Session started")
    # Session is ready for link creation
    # ... create senders and receivers
finally:
    # End the session only if its constructor succeeded; otherwise a
    # NameError raised here would mask the original exception.
    try:
        if session is not None:
            session.end()
    finally:
        # Close the connection even if ending the session fails.
        connection.close()
# Multiple sessions on one connection
session1 = Session(connection, incoming_window=500)
session2 = Session(connection, incoming_window=500)
session1.begin()
session2.begin()
# Use sessions for different purposes
# session1 for sending, session2 for receiving

import threading
from queue import Queue
class ConnectionPool:
    """Fixed-size pool of pre-opened AMQP connections.

    ``pool_size`` connections are created and opened in the constructor and
    handed out through a ``queue.Queue``.
    """

    def __init__(self, hostname, auth, pool_size=5):
        """
        Create and open the pooled connections.
        Parameters:
        - hostname (str): AMQP broker hostname
        - auth: SASL authentication object passed as ``Connection(sasl=...)``
        - pool_size (int): Number of connections to pre-create

        If creating or opening a connection fails, the connections already
        placed in the pool are closed before the exception is re-raised.
        """
        self.hostname = hostname
        self.auth = auth
        self.pool_size = pool_size
        self.connections = Queue(maxsize=pool_size)
        # Kept for backward compatibility; the queue does its own locking.
        self.lock = threading.Lock()
        # Pre-create connections
        try:
            for _ in range(pool_size):
                conn = Connection(hostname, sasl=auth)
                conn.open()
                self.connections.put(conn)
        except BaseException:
            # Do not leak the connections that were opened before the failure.
            self.close_all()
            raise

    def get_connection(self, timeout=None):
        """Get a connection from the pool.

        Blocks until a connection is available. When ``timeout`` (seconds)
        is given and no connection becomes available in time,
        ``queue.Empty`` is raised.
        """
        return self.connections.get(timeout=timeout)

    def return_connection(self, connection):
        """Return a connection to the pool.

        A connection that does not fit (the pool is already full) is closed
        instead of being silently dropped.
        """
        from queue import Full

        try:
            # put_nowait avoids the full()/put() race that could block forever.
            self.connections.put_nowait(connection)
        except Full:
            connection.close()

    def close_all(self):
        """Close all connections currently held in the pool."""
        from queue import Empty

        while True:
            try:
                # get_nowait avoids the empty()/get() race that could block
                # forever if another thread takes the last connection.
                conn = self.connections.get_nowait()
            except Empty:
                break
            conn.close()
# Usage
pool = ConnectionPool("amqp.example.com", auth, pool_size=10)
# Get connection from pool
connection = pool.get_connection()
try:
session = Session(connection)
session.begin()
# Use session...
session.end()
finally:
pool.return_connection(connection)

import time
import threading
class ConnectionMonitor:
    """Service a connection from a background thread and check its health."""

    def __init__(self, connection, check_interval=30):
        """
        Parameters:
        - connection: Connection object to service; must provide work(),
          close() and open()
        - check_interval (int): Seconds to wait between health checks
        """
        self.connection = connection
        self.check_interval = check_interval
        self.running = False
        self.monitor_thread = None
        # Set by stop_monitoring() so the loop's waits end immediately
        # instead of sleeping out the full check interval.
        self._stop_event = threading.Event()

    def start_monitoring(self):
        """Start connection health monitoring in a daemon thread."""
        self._stop_event.clear()
        self.running = True
        self.monitor_thread = threading.Thread(target=self._monitor_loop)
        self.monitor_thread.daemon = True
        self.monitor_thread.start()

    def stop_monitoring(self):
        """Stop connection monitoring and wait for the thread to finish."""
        self.running = False
        self._stop_event.set()  # wake the loop if it is waiting
        if self.monitor_thread:
            self.monitor_thread.join()

    def _monitor_loop(self):
        """Monitor connection health until monitoring is stopped."""
        while self.running:
            try:
                # Service the connection
                self.connection.work()
                # Check if connection is still alive
                if not self._is_connection_alive():
                    print("Connection lost, attempting reconnect...")
                    self._reconnect()
                delay = self.check_interval
            except Exception as e:
                print(f"Monitor error: {e}")
                delay = 1  # retry quickly after an error
            # Interruptible wait: returns early once stop_monitoring() runs.
            self._stop_event.wait(delay)

    def _is_connection_alive(self):
        """Check if connection is still alive."""
        # Placeholder: always reports the connection as alive. Replace with a
        # real liveness check before relying on the reconnect path.
        return True

    def _reconnect(self):
        """Attempt to reconnect by closing and reopening the connection."""
        try:
            self.connection.close()
            self.connection.open()
            print("Reconnection successful")
        except Exception as e:
            print(f"Reconnection failed: {e}")
# Usage
monitor = ConnectionMonitor(connection, check_interval=10)
monitor.start_monitoring()
# Use connection...
monitor.stop_monitoring()

from uamqp import Session, MessageReceiver
def create_controlled_receiver(session, source, credits=10):
    """Create receiver with manual credit management.

    Parameters:
    - session (Session): Session the receiver is created on
    - source: Source passed to MessageReceiver
    - credits (int): Initial number of credits issued via receiver.flow()

    Returns the opened receiver; the caller is responsible for closing it.
    If issuing the initial credits fails, the receiver is closed before the
    exception is re-raised.
    """
    receiver = MessageReceiver(session, source)
    receiver.open()
    try:
        # Set initial credits
        receiver.flow(credits)
    except BaseException:
        # The caller never receives the receiver on failure, so close it here.
        receiver.close()
        raise
    return receiver
def process_with_flow_control(session, source, max_messages=100, batch_size=5):
    """Process messages with explicit flow control.

    Parameters:
    - session (Session): Session used to create the receiver
    - source: Source passed to create_controlled_receiver()
    - max_messages (int): Number of messages to process before returning
    - batch_size (int): Batch size requested per receive call; also used as
      the initial credit count

    The receiver is always closed, even if processing raises.
    """
    receiver = create_controlled_receiver(session, source, credits=batch_size)
    try:
        processed = 0
        while processed < max_messages:
            # Never request more than is still needed, so the final batch
            # cannot push the total past max_messages.
            wanted = min(batch_size, max_messages - processed)
            messages = receiver.receive_message_batch(wanted)
            for message in messages:
                # Process message
                print(f"Processing: {message.get_data()}")
                message.accept()
                processed += 1
            # Replenish credits after processing batch
            if messages:
                receiver.flow(len(messages))
    finally:
        receiver.close()


def create_windowed_session(connection, window_size=1000):
    """Create and begin a session with a specific window size.

    Parameters:
    - connection (Connection): AMQP connection the session is created on
    - window_size (int): Used for both the incoming and outgoing window

    Returns the session after begin() has been called on it.
    """
    session = Session(
        connection=connection,
        incoming_window=window_size,
        outgoing_window=window_size,
    )
    session.begin()
    return session
def monitor_session_windows(session, interval=5, threshold=0.8, stop_event=None):
    """Monitor session window utilization.

    Parameters:
    - session: Object exposing incoming_window, outgoing_window,
      available_incoming and available_outgoing
      (NOTE(review): the available_* attributes are not among the Session
      properties listed in this document - confirm they exist)
    - interval (float): Seconds to wait between reports
    - threshold (float): Fraction of a window above which a warning is printed
    - stop_event (threading.Event): Optional; once set, the function returns
      after the current report. Without it the loop runs forever.
    """
    while True:
        incoming_used = session.incoming_window - session.available_incoming
        outgoing_used = session.outgoing_window - session.available_outgoing
        print(f"Incoming window: {incoming_used}/{session.incoming_window}")
        print(f"Outgoing window: {outgoing_used}/{session.outgoing_window}")
        # Alert if windows are getting full
        if incoming_used > session.incoming_window * threshold:
            print("Warning: Incoming window nearly full")
        if outgoing_used > session.outgoing_window * threshold:
            print("Warning: Outgoing window nearly full")
        if stop_event is None:
            time.sleep(interval)
        elif stop_event.wait(interval):
            # wait() returns True as soon as the event is set.
            return


import time
from uamqp.errors import AMQPConnectionError
class ReliableConnection:
    """Connection holder that opens with retries and reconnects on demand."""

    def __init__(self, hostname, auth, max_retries=5):
        """
        Parameters:
        - hostname (str): AMQP broker hostname
        - auth: SASL authentication object passed as ``Connection(sasl=...)``
        - max_retries (int): Maximum number of connection attempts
        """
        self.hostname = hostname
        self.auth = auth
        self.max_retries = max_retries
        self.connection = None

    def connect(self):
        """Connect with automatic retry.

        Waits 1, 2, 4, ... seconds between attempts. Returns the opened
        connection, or re-raises AMQPConnectionError after the final failed
        attempt. Returns None without connecting when max_retries is 0 or
        negative.
        """
        for attempt in range(self.max_retries):
            try:
                self.connection = Connection(self.hostname, sasl=self.auth)
                self.connection.open()
                print(f"Connected on attempt {attempt + 1}")
                return self.connection
            except AMQPConnectionError as e:
                # Do not keep a reference to a connection that failed to open.
                self.connection = None
                print(f"Connection attempt {attempt + 1} failed: {e}")
                if attempt < self.max_retries - 1:
                    delay = 2 ** attempt  # Exponential backoff
                    time.sleep(delay)
                else:
                    raise

    def ensure_connected(self):
        """Ensure connection is active, reconnect if needed."""
        if self.connection is None or not self._is_connected():
            print("Connection lost, reconnecting...")
            if self.connection is not None:
                # Best-effort release of the stale connection before it is
                # replaced; it is already considered broken.
                try:
                    self.connection.close()
                except Exception:
                    pass
            return self.connect()
        return self.connection

    def _is_connected(self):
        """Check if connection is still active."""
        try:
            # Simple connectivity check
            self.connection.work()
            return True
        except Exception:
            # Only ordinary errors count as "disconnected"; KeyboardInterrupt
            # and SystemExit must propagate.
            return False
# Usage
reliable_conn = ReliableConnection("amqp.example.com", auth)
connection = reliable_conn.connect()
# Ensure connection before creating session
connection = reliable_conn.ensure_connected()
session = Session(connection)

# High-throughput connection configuration
connection = Connection(
hostname="amqp.example.com",
sasl=auth,
max_frame_size=65536, # Maximum frame size
channel_max=1000, # Many concurrent sessions
idle_timeout=300000, # 5 minute timeout
debug=False # Disable debug for performance
)
# Low-latency connection configuration
connection = Connection(
hostname="amqp.example.com",
sasl=auth,
max_frame_size=4096, # Smaller frames for faster processing
channel_max=10, # Fewer sessions
idle_timeout=30000, # 30 second timeout
debug=False
)

# High-throughput session
session = Session(
connection=connection,
incoming_window=10000, # Large window for batching
outgoing_window=10000,
handle_max=100 # Many concurrent links
)
# Low-latency session
session = Session(
connection=connection,
incoming_window=1, # Minimal window for immediate processing
outgoing_window=1,
handle_max=10 # Fewer links
)

Connection and session objects are not thread-safe. Use proper synchronization when accessing from multiple threads:
import threading
# Thread-safe connection wrapper
class ThreadSafeConnection:
    """Wrap a connection so its operations run under a re-entrant lock."""

    def __init__(self, connection):
        """Store the wrapped connection and create the lock guarding it."""
        self.lock = threading.RLock()
        self.connection = connection

    def work(self):
        """Call the wrapped connection's work() while holding the lock."""
        self.lock.acquire()
        try:
            return self.connection.work()
        finally:
            self.lock.release()

    def create_session(self, **kwargs):
        """Create a Session on the wrapped connection while holding the lock.

        Keyword arguments are forwarded to Session unchanged.
        """
        with self.lock:
            new_session = Session(self.connection, **kwargs)
        return new_session
# Usage
safe_conn = ThreadSafeConnection(connection)

Install with Tessl CLI
npx tessl i tessl/pypi-uamqp