CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-uamqp

An AMQP 1.0 client library for Python

Overview
Eval results
Files

connection-session.mddocs/

Connection and Session Management

Low-level AMQP protocol management including connection establishment, session management, and link creation for advanced messaging scenarios that require fine-grained control over AMQP protocol behavior.

Capabilities

Connection Management

Low-level AMQP connection management that handles the network connection, authentication, and protocol handshake.

class Connection:
    def __init__(self, hostname, sasl=None, container_id=None, max_frame_size=None,
                 channel_max=None, idle_timeout=None, properties=None,
                 remote_idle_timeout_empty_frame_send_ratio=None, debug=False,
                 encoding='UTF-8'):
        """
        AMQP connection management.

        Parameters:
        - hostname (str): AMQP broker hostname
        - sasl (AMQPAuth): SASL authentication mechanism
        - container_id (str): AMQP container identifier
        - max_frame_size (int): Maximum frame size in bytes
        - channel_max (int): Maximum number of channels/sessions
        - idle_timeout (int): Connection idle timeout in milliseconds
        - properties (dict): Connection properties
        - remote_idle_timeout_empty_frame_send_ratio (float): Empty frame ratio
        - debug (bool): Enable protocol debug logging
        - encoding (str): Character encoding
        """

Key Methods:

def open(self):
    """Open the AMQP connection."""

def close(self):
    """Close the AMQP connection."""

def work(self):
    """Process connection work (I/O and protocol handling)."""

def sleep(self, seconds):
    """Sleep while continuing to service the connection."""

def destroy(self):
    """Destroy the connection and free resources."""

Key Properties:

@property
def container_id: str
    """AMQP container identifier."""

@property
def hostname: str
    """Broker hostname."""

@property
def max_frame_size: int
    """Maximum frame size in bytes."""

@property
def remote_max_frame_size: int
    """Remote peer's maximum frame size."""

@property
def channel_max: int
    """Maximum number of channels."""

@property
def idle_timeout: int
    """Idle timeout in milliseconds."""

Usage Examples:

from uamqp import Connection
from uamqp.authentication import SASLPlain

# Basic connection
auth = SASLPlain("amqp.example.com", "user", "password")
connection = Connection("amqp.example.com", sasl=auth)

try:
    connection.open()
    print(f"Connected to {connection.hostname}")
    
    # Connection is ready for session creation
    # ... create sessions and links
    
    # Keep connection alive
    connection.work()
    
finally:
    connection.close()

# Connection with custom properties
properties = {
    'product': 'MyApp',
    'version': '1.0.0',
    'platform': 'Python'
}

connection = Connection(
    hostname="amqp.example.com",
    sasl=auth,
    container_id="my-app-container",
    max_frame_size=65536,
    channel_max=100,
    idle_timeout=60000,  # 60 seconds
    properties=properties,
    debug=True
)

Session Management

AMQP session management that provides logical grouping and flow control for links within a connection.

class Session:
    def __init__(self, connection, incoming_window=None, outgoing_window=None,
                 handle_max=None):
        """
        AMQP session management.

        Parameters:
        - connection (Connection): AMQP connection
        - incoming_window (int): Incoming transfer window size
        - outgoing_window (int): Outgoing transfer window size  
        - handle_max (int): Maximum link handles
        """

Key Methods:

def begin(self):
    """Begin the AMQP session."""

def end(self):
    """End the AMQP session."""

def destroy(self):
    """Destroy the session and free resources."""

Key Properties:

@property
def incoming_window: int
    """Incoming transfer window size."""

@property
def outgoing_window: int
    """Outgoing transfer window size."""

@property
def handle_max: int
    """Maximum number of link handles."""

@property
def connection: Connection
    """Associated AMQP connection."""

Usage Examples:

from uamqp import Connection, Session

# Create session on connection
connection = Connection("amqp.example.com", sasl=auth)
connection.open()

try:
    # Create session with flow control
    session = Session(
        connection=connection,
        incoming_window=1000,  # Allow 1000 incoming transfers
        outgoing_window=1000,  # Allow 1000 outgoing transfers
        handle_max=64          # Support up to 64 concurrent links
    )
    
    session.begin()
    print("Session started")
    
    # Session is ready for link creation
    # ... create senders and receivers
    
finally:
    session.end()
    connection.close()

# Multiple sessions on one connection
session1 = Session(connection, incoming_window=500)
session2 = Session(connection, incoming_window=500)

session1.begin()
session2.begin()

# Use sessions for different purposes
# session1 for sending, session2 for receiving

Advanced Connection Patterns

Connection Pooling

import threading
from queue import Queue

class ConnectionPool:
    def __init__(self, hostname, auth, pool_size=5):
        self.hostname = hostname
        self.auth = auth
        self.pool_size = pool_size
        self.connections = Queue(maxsize=pool_size)
        self.lock = threading.Lock()
        
        # Pre-create connections
        for _ in range(pool_size):
            conn = Connection(hostname, sasl=auth)
            conn.open()
            self.connections.put(conn)
    
    def get_connection(self):
        """Get a connection from the pool."""
        return self.connections.get()
    
    def return_connection(self, connection):
        """Return a connection to the pool."""
        if not self.connections.full():
            self.connections.put(connection)
    
    def close_all(self):
        """Close all connections in the pool."""
        while not self.connections.empty():
            conn = self.connections.get()
            conn.close()

# Usage
pool = ConnectionPool("amqp.example.com", auth, pool_size=10)

# Get connection from pool
connection = pool.get_connection()
try:
    session = Session(connection)
    session.begin()
    # Use session...
    session.end()
finally:
    pool.return_connection(connection)

Connection Monitoring

import time
import threading

class ConnectionMonitor:
    def __init__(self, connection, check_interval=30):
        self.connection = connection
        self.check_interval = check_interval
        self.running = False
        self.monitor_thread = None
    
    def start_monitoring(self):
        """Start connection health monitoring."""
        self.running = True
        self.monitor_thread = threading.Thread(target=self._monitor_loop)
        self.monitor_thread.daemon = True
        self.monitor_thread.start()
    
    def stop_monitoring(self):
        """Stop connection monitoring."""
        self.running = False
        if self.monitor_thread:
            self.monitor_thread.join()
    
    def _monitor_loop(self):
        """Monitor connection health."""
        while self.running:
            try:
                # Service the connection
                self.connection.work()
                
                # Check if connection is still alive
                if not self._is_connection_alive():
                    print("Connection lost, attempting reconnect...")
                    self._reconnect()
                
                time.sleep(self.check_interval)
                
            except Exception as e:
                print(f"Monitor error: {e}")
                time.sleep(1)
    
    def _is_connection_alive(self):
        """Check if connection is still alive."""
        try:
            # Simple check - could be enhanced
            return True  # Placeholder
        except:
            return False
    
    def _reconnect(self):
        """Attempt to reconnect."""
        try:
            self.connection.close()
            self.connection.open()
            print("Reconnection successful")
        except Exception as e:
            print(f"Reconnection failed: {e}")

# Usage
monitor = ConnectionMonitor(connection, check_interval=10)
monitor.start_monitoring()

# Use connection...

monitor.stop_monitoring()

Session Flow Control

Credit-Based Flow Control

from uamqp import Session, MessageReceiver

def create_controlled_receiver(session, source, credits=10):
    """Create receiver with manual credit management."""
    
    receiver = MessageReceiver(session, source)
    receiver.open()
    
    # Set initial credits
    receiver.flow(credits)
    
    return receiver

def process_with_flow_control(session, source):
    """Process messages with explicit flow control."""
    
    receiver = create_controlled_receiver(session, source, credits=5)
    
    try:
        processed = 0
        while processed < 100:  # Process 100 messages
            messages = receiver.receive_message_batch(5)
            
            for message in messages:
                # Process message
                print(f"Processing: {message.get_data()}")
                message.accept()
                processed += 1
            
            # Replenish credits after processing batch
            if len(messages) > 0:
                receiver.flow(len(messages))
        
    finally:
        receiver.close()

Window Management

def create_windowed_session(connection, window_size=1000):
    """Create session with specific window size."""
    
    session = Session(
        connection=connection,
        incoming_window=window_size,
        outgoing_window=window_size
    )
    
    session.begin()
    return session

def monitor_session_windows(session):
    """Monitor session window utilization."""
    
    while True:
        incoming_used = session.incoming_window - session.available_incoming
        outgoing_used = session.outgoing_window - session.available_outgoing
        
        print(f"Incoming window: {incoming_used}/{session.incoming_window}")
        print(f"Outgoing window: {outgoing_used}/{session.outgoing_window}")
        
        # Alert if windows are getting full
        if incoming_used > session.incoming_window * 0.8:
            print("Warning: Incoming window nearly full")
        
        if outgoing_used > session.outgoing_window * 0.8:
            print("Warning: Outgoing window nearly full")
        
        time.sleep(5)

Connection Recovery

Automatic Reconnection

import time
from uamqp.errors import AMQPConnectionError

class ReliableConnection:
    def __init__(self, hostname, auth, max_retries=5):
        self.hostname = hostname
        self.auth = auth
        self.max_retries = max_retries
        self.connection = None
        
    def connect(self):
        """Connect with automatic retry."""
        for attempt in range(self.max_retries):
            try:
                self.connection = Connection(self.hostname, sasl=self.auth)
                self.connection.open()
                print(f"Connected on attempt {attempt + 1}")
                return self.connection
                
            except AMQPConnectionError as e:
                print(f"Connection attempt {attempt + 1} failed: {e}")
                if attempt < self.max_retries - 1:
                    delay = 2 ** attempt  # Exponential backoff
                    time.sleep(delay)
                else:
                    raise
    
    def ensure_connected(self):
        """Ensure connection is active, reconnect if needed."""
        if not self.connection or not self._is_connected():
            print("Connection lost, reconnecting...")
            return self.connect()
        return self.connection
    
    def _is_connected(self):
        """Check if connection is still active."""
        try:
            # Simple connectivity check
            self.connection.work()
            return True
        except:
            return False

# Usage
reliable_conn = ReliableConnection("amqp.example.com", auth)
connection = reliable_conn.connect()

# Ensure connection before creating session
connection = reliable_conn.ensure_connected()
session = Session(connection)

Performance Optimization

Connection Tuning

# High-throughput connection configuration
connection = Connection(
    hostname="amqp.example.com",
    sasl=auth,
    max_frame_size=65536,      # Maximum frame size
    channel_max=1000,          # Many concurrent sessions
    idle_timeout=300000,       # 5 minute timeout
    debug=False                # Disable debug for performance
)

# Low-latency connection configuration  
connection = Connection(
    hostname="amqp.example.com",
    sasl=auth,
    max_frame_size=4096,       # Smaller frames for faster processing
    channel_max=10,            # Fewer sessions
    idle_timeout=30000,        # 30 second timeout
    debug=False
)

Session Optimization

# High-throughput session
session = Session(
    connection=connection,
    incoming_window=10000,     # Large window for batching
    outgoing_window=10000,
    handle_max=100             # Many concurrent links
)

# Low-latency session
session = Session(
    connection=connection, 
    incoming_window=1,         # Minimal window for immediate processing
    outgoing_window=1,
    handle_max=10              # Fewer links
)

Thread Safety

Connection and session objects are not thread-safe. Use proper synchronization when accessing from multiple threads:

import threading

# Thread-safe connection wrapper
class ThreadSafeConnection:
    def __init__(self, connection):
        self.connection = connection
        self.lock = threading.RLock()
    
    def work(self):
        with self.lock:
            return self.connection.work()
    
    def create_session(self, **kwargs):
        with self.lock:
            return Session(self.connection, **kwargs)

# Usage
safe_conn = ThreadSafeConnection(connection)

Install with Tessl CLI

npx tessl i tessl/pypi-uamqp

docs

address-endpoints.md

async-operations.md

authentication.md

client-apis.md

connection-session.md

error-handling.md

high-level-messaging.md

index.md

low-level-protocol.md

message-management.md

types-constants.md

tile.json