An AMQP 1.0 client library for Python
High-level client classes for sending and receiving messages with built-in connection management, error handling, and batching capabilities that provide the easiest way to build AMQP messaging applications.
Base client class that provides common functionality for all AMQP client operations.
class AMQPClient:
def __init__(self, remote_address, auth=None, client_name=None, debug=False,
error_policy=None, keep_alive_interval=None, max_frame_size=None,
channel_max=None, idle_timeout=None, properties=None,
remote_idle_timeout_empty_frame_send_ratio=None, incoming_window=None,
outgoing_window=None, handle_max=None, on_attach=None,
auto_complete=True, encoding='UTF-8', desired_capabilities=None,
max_message_size=None, link_properties=None, timeout=0, **kwargs):
"""
Base AMQP client for connection and session management.
Parameters:
- remote_address (str): AMQP broker address
- auth (AMQPAuth): Authentication credentials
- client_name (str): Client identifier
- debug (bool): Enable debug logging
- error_policy (ErrorPolicy): Error handling policy
- keep_alive_interval (int): Keep-alive interval in seconds
- max_frame_size (int): Maximum frame size in bytes
- channel_max (int): Maximum number of channels
- idle_timeout (int): Connection idle timeout in seconds
- properties (dict): Connection properties
- incoming_window (int): Session incoming window size
- outgoing_window (int): Session outgoing window size
- handle_max (int): Maximum link handles
- on_attach (callable): Link attach callback
- auto_complete (bool): Auto-complete messages
- encoding (str): Character encoding
- desired_capabilities (list): Desired connection capabilities
- max_message_size (int): Maximum message size
- link_properties (dict): Link properties
- timeout (int): Operation timeout in seconds
"""Key Methods:
def open(self):
"""Open the client connection and session."""
def close(self):
"""Close the client connection."""
def __enter__(self):
"""Context manager entry."""
def __exit__(self, exc_type, exc_val, exc_tb):
"""Context manager exit."""
def mgmt_request(self, message, operation, op_type=None, node=None, callback=None, **kwargs):
"""
Send a management request.
Parameters:
- message (Message): Request message
- operation (str): Management operation name
- op_type (str): Operation type
- node (str): Target node name
- callback (callable): Response callback function
Returns:
Message: Response message
"""
def auth_complete(self):
"""
Check if authentication is complete.
Returns:
bool: True if authentication is complete
"""
def client_ready(self):
"""
Check if client is ready for operations.
Returns:
bool: True if client is ready
"""
def do_work(self, **kwargs):
"""Perform client work iteration."""Usage Example:
from uamqp.client import AMQPClient
from uamqp.authentication import SASLPlain
auth = SASLPlain("amqp.example.com", "user", "password")
# Using context manager (recommended)
with AMQPClient("amqps://amqp.example.com", auth=auth) as client:
# Client operations here
pass
# Manual open/close
client = AMQPClient("amqps://amqp.example.com", auth=auth)
client.open()
try:
# Client operations here
pass
finally:
client.close()High-level client for sending messages with automatic connection management and batch sending capabilities.
class SendClient(AMQPClient):
def __init__(self, target, auth=None, client_name=None, debug=False,
msg_timeout=0, error_policy=None, keep_alive_interval=None,
send_settle_mode=None, auto_complete=True, encoding='UTF-8',
**kwargs):
"""
High-level client for sending AMQP messages.
Parameters:
- target (str or Target): Target endpoint for messages
- auth (AMQPAuth): Authentication credentials
- client_name (str): Client identifier
- debug (bool): Enable debug logging
- msg_timeout (int): Message send timeout in seconds
- error_policy (ErrorPolicy): Error handling policy
- keep_alive_interval (int): Keep-alive interval
- send_settle_mode (SenderSettleMode): Message settlement mode
- auto_complete (bool): Auto-complete sent messages
- encoding (str): Character encoding
"""Key Methods:
def queue_message(self, message):
"""
Queue a message for sending.
Parameters:
- message (Message): Message to queue for sending
"""
def send_all_messages(self, close_on_done=True):
"""
Send all queued messages.
Parameters:
- close_on_done (bool): Whether to close connection after sending
Returns:
list[MessageState]: List of send results for each message
"""
def send_message_batch(self, messages, close_on_done=True):
"""
Send a batch of messages.
Parameters:
- messages (list[Message]): Messages to send
- close_on_done (bool): Whether to close connection after sending
Returns:
list[MessageState]: List of send results for each message
"""Usage Examples:
from uamqp import SendClient, Message
from uamqp.authentication import SASTokenAuth
# Single message sending
target = "amqps://mynamespace.servicebus.windows.net/myqueue"
auth = SASTokenAuth("mynamespace.servicebus.windows.net", token=sas_token)
with SendClient(target, auth=auth) as client:
message = Message("Hello World")
client.queue_message(message)
results = client.send_all_messages()
print(f"Send results: {results}")
# Batch message sending
messages = [
Message("Message 1"),
Message("Message 2"),
Message("Message 3")
]
with SendClient(target, auth=auth) as client:
results = client.send_message_batch(messages)
print(f"Sent {len(results)} messages")
# Queue multiple messages individually
with SendClient(target, auth=auth) as client:
for i in range(10):
client.queue_message(Message(f"Message {i}"))
results = client.send_all_messages()High-level client for receiving messages with automatic connection management and batch receiving capabilities.
class ReceiveClient(AMQPClient):
def __init__(self, source, auth=None, client_name=None, debug=False,
prefetch=300, auto_complete=True, error_policy=None,
keep_alive_interval=None, receive_settle_mode=None,
encoding='UTF-8', **kwargs):
"""
High-level client for receiving AMQP messages.
Parameters:
- source (str or Source): Source endpoint for messages
- auth (AMQPAuth): Authentication credentials
- client_name (str): Client identifier
- debug (bool): Enable debug logging
- prefetch (int): Number of messages to prefetch
- auto_complete (bool): Auto-complete received messages
- error_policy (ErrorPolicy): Error handling policy
- keep_alive_interval (int): Keep-alive interval
- receive_settle_mode (ReceiverSettleMode): Message settlement mode
- encoding (str): Character encoding
"""Key Methods:
def receive_message_batch(self, max_batch_size=None, timeout=0):
"""
Receive a batch of messages.
Parameters:
- max_batch_size (int): Maximum messages to receive (default: prefetch size)
- timeout (int): Timeout in milliseconds
Returns:
list[Message]: Received messages
"""
def receive_messages(self, timeout=0):
"""
Receive messages with iterator-like behavior.
Parameters:
- timeout (int): Timeout in milliseconds
Returns:
generator: Message iterator
"""Usage Examples:
from uamqp import ReceiveClient
from uamqp.authentication import SASLPlain
source = "amqps://amqp.example.com/myqueue"
auth = SASLPlain("amqp.example.com", "user", "password")
# Batch message receiving
with ReceiveClient(source, auth=auth, prefetch=50) as client:
messages = client.receive_message_batch(max_batch_size=10, timeout=30000)
print(f"Received {len(messages)} messages")
for message in messages:
print(f"Message: {message.get_data()}")
message.accept() # Acknowledge message
# Continuous message receiving
with ReceiveClient(source, auth=auth) as client:
for message in client.receive_messages():
try:
data = message.get_data()
print(f"Processing: {data}")
# Process message here
message.accept()
except Exception as e:
print(f"Error processing message: {e}")
message.reject()
# Break after processing 100 messages
if processed_count >= 100:
break
# Receive with custom settlement mode
from uamqp.constants import ReceiverSettleMode
with ReceiveClient(source, auth=auth,
receive_settle_mode=ReceiverSettleMode.PeekLock) as client:
messages = client.receive_message_batch(timeout=10000)
for message in messages:
# Message won't be removed from queue until explicitly settled
if process_message(message.get_data()):
message.accept() # Remove from queue
else:
message.release() # Return to queue for retryConfigure how clients handle errors and retries:
from uamqp.errors import ErrorPolicy, ErrorAction
# Custom error policy with retries
error_policy = ErrorPolicy(
max_retries=3,
on_error=ErrorAction(retry=True, backoff=2.0)
)
client = SendClient(target, auth=auth, error_policy=error_policy)Control message acknowledgment behavior:
from uamqp.constants import SenderSettleMode, ReceiverSettleMode
# Send client settlement
send_client = SendClient(
target,
auth=auth,
send_settle_mode=SenderSettleMode.Settled # Fire-and-forget
)
# Receive client settlement
receive_client = ReceiveClient(
source,
auth=auth,
receive_settle_mode=ReceiverSettleMode.PeekLock # Manual acknowledgment
)Set custom connection properties:
properties = {
'connection-name': 'MyApp-v1.0',
'product': 'MyApplication',
'platform': 'Python'
}
client = SendClient(target, auth=auth, properties=properties)Optimize message throughput:
# High throughput receiving
client = ReceiveClient(
source,
auth=auth,
prefetch=1000, # Prefetch more messages
max_frame_size=65536 # Larger frames
)
# Low latency receiving
client = ReceiveClient(
source,
auth=auth,
prefetch=1, # Minimal prefetch
auto_complete=False # Manual acknowledgment
)Use batch operations for better performance:
# Batch sending (more efficient than individual sends)
messages = [Message(f"Batch message {i}") for i in range(100)]
with SendClient(target, auth=auth) as client:
results = client.send_message_batch(messages)AMQP management request/response operations for interacting with broker management interfaces.
class MgmtOperation:
def __init__(self, session, target=None, debug=False,
status_code_field=b'statusCode',
description_fields=b'statusDescription',
encoding='UTF-8'):
"""
AMQP management operation client.
Parameters:
- session (Session): AMQP session for the operation
- target (bytes or str): Target node name (default: b"$management")
- debug (bool): Enable debug logging
- status_code_field (bytes): Response status code field name
- description_fields (bytes): Response description field name
- encoding (str): Character encoding
"""
def execute(self, operation, op_type, message, timeout=0):
"""
Execute a management operation.
Parameters:
- operation (str): Operation name
- op_type (str): Operation type
- message (Message): Request message
- timeout (int): Operation timeout in milliseconds
Returns:
Message: Response message
"""Usage Example:
from uamqp import Session, MgmtOperation, Message
# Create management operation
with Session(connection) as session:
mgmt_op = MgmtOperation(session, target=b"$management")
# Execute management request
request = Message({"operation": "read-queue-info"})
response = mgmt_op.execute("read", "queue", request, timeout=30000)
print(f"Response: {response.get_data()}")Client operations may raise these exceptions:
Install with Tessl CLI
npx tessl i tessl/pypi-uamqp