CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-uamqp

An AMQP 1.0 client library for Python

Overview
Eval results
Files

client-apis.mddocs/

Client APIs

High-level client classes for sending and receiving messages with built-in connection management, error handling, and batching capabilities that provide the easiest way to build AMQP messaging applications.

Capabilities

Base AMQP Client

Base client class that provides common functionality for all AMQP client operations.

class AMQPClient:
    def __init__(self, remote_address, auth=None, client_name=None, debug=False,
                 error_policy=None, keep_alive_interval=None, max_frame_size=None,
                 channel_max=None, idle_timeout=None, properties=None,
                 remote_idle_timeout_empty_frame_send_ratio=None, incoming_window=None,
                 outgoing_window=None, handle_max=None, on_attach=None, 
                 auto_complete=True, encoding='UTF-8', desired_capabilities=None,
                 max_message_size=None, link_properties=None, timeout=0, **kwargs):
        """
        Base AMQP client for connection and session management.

        Parameters:
        - remote_address (str): AMQP broker address
        - auth (AMQPAuth): Authentication credentials
        - client_name (str): Client identifier
        - debug (bool): Enable debug logging
        - error_policy (ErrorPolicy): Error handling policy
        - keep_alive_interval (int): Keep-alive interval in seconds
        - max_frame_size (int): Maximum frame size in bytes
        - channel_max (int): Maximum number of channels
        - idle_timeout (int): Connection idle timeout in seconds
        - properties (dict): Connection properties
        - incoming_window (int): Session incoming window size
        - outgoing_window (int): Session outgoing window size
        - handle_max (int): Maximum link handles
        - on_attach (callable): Link attach callback
        - auto_complete (bool): Auto-complete messages
        - encoding (str): Character encoding
        - desired_capabilities (list): Desired connection capabilities
        - max_message_size (int): Maximum message size
        - link_properties (dict): Link properties
        - timeout (int): Operation timeout in seconds
        """

Key Methods:

def open(self):
    """Open the client connection and session."""

def close(self):
    """Close the client connection."""

def __enter__(self):
    """Context manager entry."""

def __exit__(self, exc_type, exc_val, exc_tb):
    """Context manager exit."""

def mgmt_request(self, message, operation, op_type=None, node=None, callback=None, **kwargs):
    """
    Send a management request.
    
    Parameters:
    - message (Message): Request message
    - operation (str): Management operation name
    - op_type (str): Operation type
    - node (str): Target node name
    - callback (callable): Response callback function
    
    Returns:
        Message: Response message
    """

def auth_complete(self):
    """
    Check if authentication is complete.
    
    Returns:
        bool: True if authentication is complete
    """

def client_ready(self):
    """
    Check if client is ready for operations.
    
    Returns:
        bool: True if client is ready
    """

def do_work(self, **kwargs):
    """Perform client work iteration."""

Usage Example:

from uamqp.client import AMQPClient
from uamqp.authentication import SASLPlain

auth = SASLPlain("amqp.example.com", "user", "password")

# Using context manager (recommended)
with AMQPClient("amqps://amqp.example.com", auth=auth) as client:
    # Client operations here
    pass

# Manual open/close
client = AMQPClient("amqps://amqp.example.com", auth=auth)
client.open()
try:
    # Client operations here
    pass
finally:
    client.close()

Send Client

High-level client for sending messages with automatic connection management and batch sending capabilities.

class SendClient(AMQPClient):
    def __init__(self, target, auth=None, client_name=None, debug=False,
                 msg_timeout=0, error_policy=None, keep_alive_interval=None,
                 send_settle_mode=None, auto_complete=True, encoding='UTF-8',
                 **kwargs):
        """
        High-level client for sending AMQP messages.

        Parameters:
        - target (str or Target): Target endpoint for messages
        - auth (AMQPAuth): Authentication credentials
        - client_name (str): Client identifier
        - debug (bool): Enable debug logging
        - msg_timeout (int): Message send timeout in seconds
        - error_policy (ErrorPolicy): Error handling policy
        - keep_alive_interval (int): Keep-alive interval
        - send_settle_mode (SenderSettleMode): Message settlement mode
        - auto_complete (bool): Auto-complete sent messages
        - encoding (str): Character encoding
        """

Key Methods:

def queue_message(self, message):
    """
    Queue a message for sending.

    Parameters:
    - message (Message): Message to queue for sending
    """

def send_all_messages(self, close_on_done=True):
    """
    Send all queued messages.

    Parameters:
    - close_on_done (bool): Whether to close connection after sending

    Returns:
    list[MessageState]: List of send results for each message
    """

def send_message_batch(self, messages, close_on_done=True):
    """
    Send a batch of messages.

    Parameters:
    - messages (list[Message]): Messages to send
    - close_on_done (bool): Whether to close connection after sending

    Returns:
    list[MessageState]: List of send results for each message
    """

Usage Examples:

from uamqp import SendClient, Message
from uamqp.authentication import SASTokenAuth

# Single message sending
target = "amqps://mynamespace.servicebus.windows.net/myqueue"
auth = SASTokenAuth("mynamespace.servicebus.windows.net", token=sas_token)

with SendClient(target, auth=auth) as client:
    message = Message("Hello World")
    client.queue_message(message)
    results = client.send_all_messages()
    print(f"Send results: {results}")

# Batch message sending
messages = [
    Message("Message 1"),
    Message("Message 2"), 
    Message("Message 3")
]

with SendClient(target, auth=auth) as client:
    results = client.send_message_batch(messages)
    print(f"Sent {len(results)} messages")

# Queue multiple messages individually
with SendClient(target, auth=auth) as client:
    for i in range(10):
        client.queue_message(Message(f"Message {i}"))
    results = client.send_all_messages()

Receive Client

High-level client for receiving messages with automatic connection management and batch receiving capabilities.

class ReceiveClient(AMQPClient):
    def __init__(self, source, auth=None, client_name=None, debug=False,
                 prefetch=300, auto_complete=True, error_policy=None,
                 keep_alive_interval=None, receive_settle_mode=None,
                 encoding='UTF-8', **kwargs):
        """
        High-level client for receiving AMQP messages.

        Parameters:
        - source (str or Source): Source endpoint for messages
        - auth (AMQPAuth): Authentication credentials
        - client_name (str): Client identifier
        - debug (bool): Enable debug logging
        - prefetch (int): Number of messages to prefetch
        - auto_complete (bool): Auto-complete received messages
        - error_policy (ErrorPolicy): Error handling policy
        - keep_alive_interval (int): Keep-alive interval
        - receive_settle_mode (ReceiverSettleMode): Message settlement mode
        - encoding (str): Character encoding
        """

Key Methods:

def receive_message_batch(self, max_batch_size=None, timeout=0):
    """
    Receive a batch of messages.

    Parameters:
    - max_batch_size (int): Maximum messages to receive (default: prefetch size)
    - timeout (int): Timeout in milliseconds

    Returns:
    list[Message]: Received messages
    """

def receive_messages(self, timeout=0):
    """
    Receive messages with iterator-like behavior.

    Parameters:
    - timeout (int): Timeout in milliseconds

    Returns:
    generator: Message iterator
    """

Usage Examples:

from uamqp import ReceiveClient
from uamqp.authentication import SASLPlain

source = "amqps://amqp.example.com/myqueue"
auth = SASLPlain("amqp.example.com", "user", "password")

# Batch message receiving
with ReceiveClient(source, auth=auth, prefetch=50) as client:
    messages = client.receive_message_batch(max_batch_size=10, timeout=30000)
    print(f"Received {len(messages)} messages")
    
    for message in messages:
        print(f"Message: {message.get_data()}")
        message.accept()  # Acknowledge message

# Continuous message receiving
with ReceiveClient(source, auth=auth) as client:
    for message in client.receive_messages():
        try:
            data = message.get_data()
            print(f"Processing: {data}")
            # Process message here
            message.accept()
        except Exception as e:
            print(f"Error processing message: {e}")
            message.reject()
        
        # Break after processing 100 messages
        if processed_count >= 100:
            break

# Receive with custom settlement mode
from uamqp.constants import ReceiverSettleMode

with ReceiveClient(source, auth=auth, 
                  receive_settle_mode=ReceiverSettleMode.PeekLock) as client:
    messages = client.receive_message_batch(timeout=10000)
    for message in messages:
        # Message won't be removed from queue until explicitly settled
        if process_message(message.get_data()):
            message.accept()  # Remove from queue
        else:
            message.release()  # Return to queue for retry

Configuration Options

Error Policy

Configure how clients handle errors and retries:

from uamqp.errors import ErrorPolicy, ErrorAction

# Custom error policy with retries
error_policy = ErrorPolicy(
    max_retries=3,
    on_error=ErrorAction(retry=True, backoff=2.0)
)

client = SendClient(target, auth=auth, error_policy=error_policy)

Settlement Modes

Control message acknowledgment behavior:

from uamqp.constants import SenderSettleMode, ReceiverSettleMode

# Send client settlement
send_client = SendClient(
    target, 
    auth=auth,
    send_settle_mode=SenderSettleMode.Settled  # Fire-and-forget
)

# Receive client settlement  
receive_client = ReceiveClient(
    source,
    auth=auth, 
    receive_settle_mode=ReceiverSettleMode.PeekLock  # Manual acknowledgment
)

Connection Properties

Set custom connection properties:

properties = {
    'connection-name': 'MyApp-v1.0',
    'product': 'MyApplication',
    'platform': 'Python'
}

client = SendClient(target, auth=auth, properties=properties)

Performance Tuning

Prefetch Settings

Optimize message throughput:

# High throughput receiving
client = ReceiveClient(
    source, 
    auth=auth,
    prefetch=1000,  # Prefetch more messages
    max_frame_size=65536  # Larger frames
)

# Low latency receiving
client = ReceiveClient(
    source,
    auth=auth, 
    prefetch=1,  # Minimal prefetch
    auto_complete=False  # Manual acknowledgment
)

Batch Operations

Use batch operations for better performance:

# Batch sending (more efficient than individual sends)
messages = [Message(f"Batch message {i}") for i in range(100)]
with SendClient(target, auth=auth) as client:
    results = client.send_message_batch(messages)

Management Operations

AMQP management request/response operations for interacting with broker management interfaces.

class MgmtOperation:
    def __init__(self, session, target=None, debug=False,
                 status_code_field=b'statusCode', 
                 description_fields=b'statusDescription',
                 encoding='UTF-8'):
        """
        AMQP management operation client.
        
        Parameters:
        - session (Session): AMQP session for the operation
        - target (bytes or str): Target node name (default: b"$management")
        - debug (bool): Enable debug logging
        - status_code_field (bytes): Response status code field name
        - description_fields (bytes): Response description field name
        - encoding (str): Character encoding
        """

def execute(self, operation, op_type, message, timeout=0):
    """
    Execute a management operation.
    
    Parameters:
    - operation (str): Operation name
    - op_type (str): Operation type
    - message (Message): Request message
    - timeout (int): Operation timeout in milliseconds
    
    Returns:
        Message: Response message
    """

Usage Example:

from uamqp import Session, MgmtOperation, Message

# Create management operation
with Session(connection) as session:
    mgmt_op = MgmtOperation(session, target=b"$management")
    
    # Execute management request
    request = Message({"operation": "read-queue-info"})
    response = mgmt_op.execute("read", "queue", request, timeout=30000)
    
    print(f"Response: {response.get_data()}")

Common Client Errors

Client operations may raise these exceptions:

  • AMQPConnectionError: Connection to broker failed
  • MessageSendFailed: Message sending failed
  • ClientTimeout: Operation timed out
  • AMQPClientShutdown: Client was shut down
  • LinkDetach: Link was detached by broker

Install with Tessl CLI

npx tessl i tessl/pypi-uamqp

docs

address-endpoints.md

async-operations.md

authentication.md

client-apis.md

connection-session.md

error-handling.md

high-level-messaging.md

index.md

low-level-protocol.md

message-management.md

types-constants.md

tile.json