An AMQP 1.0 client library for Python
Asynchronous AMQP operations for Python 3.5+ (the first release with async/await syntax), for high-performance messaging applications that need non-blocking I/O operations.
Asynchronous high-level client for sending messages without blocking the event loop.
class SendClientAsync:
def __init__(self, target, auth=None, client_name=None, debug=False,
msg_timeout=0, error_policy=None, keep_alive_interval=None,
send_settle_mode=None, auto_complete=True, encoding='UTF-8',
loop=None, **kwargs):
"""
Async high-level client for sending AMQP messages.
Parameters:
- target (str or Target): Target endpoint for messages
- auth (AMQPAuth): Authentication credentials
- client_name (str): Client identifier
- debug (bool): Enable debug logging
- msg_timeout (int): Message send timeout in milliseconds (0 means messages never expire)
- error_policy (ErrorPolicy): Error handling policy
- keep_alive_interval (int): Keep-alive interval
- send_settle_mode (SenderSettleMode): Message settlement mode
- auto_complete (bool): Auto-complete sent messages
- encoding (str): Character encoding
- loop: Asyncio event loop
"""
Key Methods:
async def open_async(self):
"""Asynchronously open the client connection."""
async def close_async(self):
"""Asynchronously close the client connection."""
def queue_message(self, message):
"""Queue a message for sending (synchronous)."""
async def send_all_messages_async(self, close_on_done=True):
"""
Asynchronously send all queued messages.
Parameters:
- close_on_done (bool): Whether to close connection after sending
Returns:
list[MessageState]: List of send results for each message
"""
async def send_message_batch_async(self, messages, close_on_done=True):
"""
Asynchronously send a batch of messages.
Parameters:
- messages (list[Message]): Messages to send
- close_on_done (bool): Whether to close connection after sending
Returns:
list[MessageState]: List of send results for each message
"""
Usage Examples:
import asyncio
from uamqp.async_ops import SendClientAsync
from uamqp import Message
from uamqp.authentication import SASTokenAsync
async def send_messages_async():
target = "amqps://mynamespace.servicebus.windows.net/myqueue"
auth = SASTokenAsync("mynamespace.servicebus.windows.net", token=sas_token)
# Using async context manager
async with SendClientAsync(target, auth=auth) as client:
message = Message("Hello Async World")
client.queue_message(message)
results = await client.send_all_messages_async()
print(f"Send results: {results}")
# Batch async sending
async def send_batch_async():
messages = [Message(f"Async message {i}") for i in range(10)]
async with SendClientAsync(target, auth=auth) as client:
results = await client.send_message_batch_async(messages)
print(f"Sent {len(results)} messages asynchronously")
# Run async functions
asyncio.run(send_messages_async())
Asynchronous high-level client for receiving messages with non-blocking message processing.
class ReceiveClientAsync:
def __init__(self, source, auth=None, client_name=None, debug=False,
prefetch=300, auto_complete=True, error_policy=None,
keep_alive_interval=None, receive_settle_mode=None,
encoding='UTF-8', loop=None, **kwargs):
"""
Async high-level client for receiving AMQP messages.
Parameters:
- source (str or Source): Source endpoint for messages
- auth (AMQPAuth): Authentication credentials
- client_name (str): Client identifier
- debug (bool): Enable debug logging
- prefetch (int): Number of messages to prefetch
- auto_complete (bool): Auto-complete received messages
- error_policy (ErrorPolicy): Error handling policy
- keep_alive_interval (int): Keep-alive interval
- receive_settle_mode (ReceiverSettleMode): Message settlement mode
- encoding (str): Character encoding
- loop: Asyncio event loop
"""
Key Methods:
async def open_async(self):
"""Asynchronously open the client connection."""
async def close_async(self):
"""Asynchronously close the client connection."""
async def receive_message_batch_async(self, max_batch_size=None, timeout=0):
"""
Asynchronously receive a batch of messages.
Parameters:
- max_batch_size (int): Maximum messages to receive
- timeout (int): Timeout in milliseconds
Returns:
list[Message]: Received messages
"""
def receive_messages_iter_async(self):
"""
Create async iterator for receiving messages.
Returns:
AsyncMessageIter: Async message iterator
"""
Usage Examples:
import asyncio
from uamqp.async_ops import ReceiveClientAsync
from uamqp.authentication import SASLPlain
async def receive_messages_async():
source = "amqps://amqp.example.com/myqueue"
auth = SASLPlain("amqp.example.com", "user", "password")
# Batch async receiving
async with ReceiveClientAsync(source, auth=auth) as client:
messages = await client.receive_message_batch_async(
max_batch_size=10,
timeout=30000
)
print(f"Received {len(messages)} messages")
for message in messages:
print(f"Message: {message.get_data()}")
message.accept()
# Async message iterator
async def process_messages_continuously():
async with ReceiveClientAsync(source, auth=auth) as client:
message_iter = client.receive_messages_iter_async()
async for message in message_iter:
try:
data = message.get_data()
print(f"Processing: {data}")
# Simulate async processing
await asyncio.sleep(0.1)
message.accept()
except Exception as e:
print(f"Error: {e}")
message.reject()
asyncio.run(receive_messages_async())
Async iterator for processing messages as they arrive without blocking.
class AsyncMessageIter:
def __init__(self, client):
"""
Async iterator for AMQP messages.
Parameters:
- client: ReceiveClientAsync instance
"""
def __aiter__(self):
"""Return async iterator."""
async def __anext__(self):
"""Get next message asynchronously."""
Usage Example:
async def process_messages_with_iterator():
async with ReceiveClientAsync(source, auth=auth) as client:
message_iter = client.receive_messages_iter_async()
message_count = 0
async for message in message_iter:
await process_message_async(message)
message.accept()
message_count += 1
if message_count >= 100: # Process only 100 messages
break
async def process_message_async(message):
"""Custom async message processing."""
data = message.get_data()
# Simulate async work (database query, API call, etc.)
await asyncio.sleep(0.1)
print(f"Processed: {data}")
Asynchronous versions of low-level protocol classes for advanced scenarios.
class ConnectionAsync:
def __init__(self, hostname, sasl=None, container_id=None,
max_frame_size=None, channel_max=None, idle_timeout=None,
properties=None, remote_idle_timeout_empty_frame_send_ratio=None,
debug=False, encoding='UTF-8', loop=None):
"""Async AMQP connection management."""
async def open_async(self):
"""Asynchronously open connection."""
async def close_async(self):
"""Asynchronously close connection."""
class SessionAsync:
def __init__(self, connection, incoming_window=None, outgoing_window=None,
handle_max=None, loop=None):
"""Async AMQP session management."""
async def begin_async(self):
"""Asynchronously begin session."""
async def end_async(self):
"""Asynchronously end session."""
class MessageSenderAsync:
def __init__(self, session, source, target, name=None,
send_settle_mode=None, max_message_size=None,
link_properties=None, desired_capabilities=None,
loop=None):
"""Async low-level message sender."""
async def open_async(self):
"""Asynchronously open sender link."""
async def send_async(self, message, callback=None):
"""
Asynchronously send a message.
Parameters:
- message (Message): Message to send
- callback (callable): Completion callback
Returns:
MessageState: Send operation state
"""
class MessageReceiverAsync:
def __init__(self, session, source, target, name=None,
receive_settle_mode=None, max_message_size=None,
prefetch=None, link_properties=None,
desired_capabilities=None, loop=None):
"""Async low-level message receiver."""
async def open_async(self):
"""Asynchronously open receiver link."""
async def receive_message_batch_async(self, max_batch_size=None):
"""
Asynchronously receive message batch.
Parameters:
- max_batch_size (int): Maximum messages to receive
Returns:
list[Message]: Received messages
"""
Usage Example:
async def low_level_async_example():
# Create async connection
connection = ConnectionAsync("amqp.example.com", sasl=auth_sasl)
await connection.open_async()
try:
# Create async session
session = SessionAsync(connection)
await session.begin_async()
# Create async sender
sender = MessageSenderAsync(session, source="source", target="target")
await sender.open_async()
# Send message asynchronously
message = Message("Low-level async message")
result = await sender.send_async(message)
print(f"Send result: {result}")
finally:
await connection.close_async()
Use async authentication classes with async operations:
from uamqp.authentication import SASTokenAsync, JWTTokenAsync
# Async SAS token auth
auth = SASTokenAsync(
hostname="mynamespace.servicebus.windows.net",
token=sas_token
)
# Async JWT token auth
auth = JWTTokenAsync(
hostname="service.example.com",
token=jwt_token,
audience="https://service.example.com"
)
Handle errors in async operations:
from uamqp.errors import AMQPConnectionError, MessageSendFailed
async def robust_async_sending():
try:
async with SendClientAsync(target, auth=auth) as client:
client.queue_message(Message("Test message"))
results = await client.send_all_messages_async()
except AMQPConnectionError as e:
print(f"Connection failed: {e}")
# Implement retry logic
except MessageSendFailed as e:
print(f"Send failed: {e}")
# Handle send failure
except Exception as e:
print(f"Unexpected error: {e}")
# Use existing event loop
import asyncio
async def main():
loop = asyncio.get_running_loop()
async with SendClientAsync(target, auth=auth, loop=loop) as client:
# Operations use the specified loop
pass
asyncio.run(main())
async def concurrent_operations():
# Send and receive concurrently
send_task = asyncio.create_task(send_messages_async())
receive_task = asyncio.create_task(receive_messages_async())
# Wait for both to complete
await asyncio.gather(send_task, receive_task)
Always use async context managers, or an explicit try/finally when managing the lifecycle manually, for proper resource cleanup:
# Correct async usage
async with SendClientAsync(target, auth=auth) as client:
await client.send_all_messages_async()
# Manual async lifecycle management
client = SendClientAsync(target, auth=auth)
await client.open_async()
try:
await client.send_all_messages_async()
finally:
await client.close_async()
Install with Tessl CLI
npx tessl i tessl/pypi-uamqp