CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-uamqp

An AMQP 1.0 client library for Python

Overview
Eval results
Files

docs/async-operations.md

Async Operations

Asynchronous AMQP operations for Python 3.5+ (the first release with async/await syntax), providing non-blocking I/O for high-performance messaging applications. The examples on this page call asyncio.run, which requires Python 3.7+.

Capabilities

Async Send Client

Asynchronous high-level client for sending messages without blocking the event loop.

class SendClientAsync:
    def __init__(self, target, auth=None, client_name=None, debug=False,
                 msg_timeout=0, error_policy=None, keep_alive_interval=None,
                 send_settle_mode=None, auto_complete=True, encoding='UTF-8',
                 loop=None, **kwargs):
        """
        Async high-level client for sending AMQP messages.

        Parameters:
        - target (str or Target): Target endpoint for messages
        - auth (AMQPAuth): Authentication credentials (default: None)
        - client_name (str): Client identifier (default: None)
        - debug (bool): Enable debug logging (default: False)
        - msg_timeout (int): Message send timeout in milliseconds, measured
          from when a message is queued; 0 (the default) means queued
          messages never expire
        - error_policy (ErrorPolicy): Error handling policy (default: None)
        - keep_alive_interval (int): Keep-alive interval (default: None)
        - send_settle_mode (SenderSettleMode): Message settlement mode
          (default: None)
        - auto_complete (bool): Auto-complete sent messages (default: True)
        - encoding (str): Character encoding (default: 'UTF-8')
        - loop: Asyncio event loop (default: None)
        - **kwargs: Additional keyword arguments accepted by the constructor
        """

Key Methods:

async def open_async(self):
    """Asynchronously open the client connection."""

async def close_async(self):
    """Asynchronously close the client connection."""

def queue_message(self, message):
    """
    Queue a message for sending (synchronous).

    Declared with plain `def`, so it is called without `await`.

    Parameters:
    - message: Message to add to the send queue
    """

async def send_all_messages_async(self, close_on_done=True):
    """
    Asynchronously send all queued messages.

    Parameters:
    - close_on_done (bool): Whether to close connection after sending
      (default: True)

    Returns:
    list[MessageState]: List of send results for each message
    """

async def send_message_batch_async(self, messages, close_on_done=True):
    """
    Asynchronously send a batch of messages.

    Parameters:
    - messages (list[Message]): Messages to send
    - close_on_done (bool): Whether to close connection after sending
      (default: True)

    Returns:
    list[MessageState]: List of send results for each message
    """

Usage Examples:

import asyncio
from uamqp.async_ops import SendClientAsync
from uamqp import Message
from uamqp.authentication import SASTokenAsync

async def send_messages_async():
    """Queue a single message on a send client and print the send results."""
    credentials = SASTokenAsync(
        "mynamespace.servicebus.windows.net",
        token=sas_token,
    )
    endpoint = "amqps://mynamespace.servicebus.windows.net/myqueue"

    # Enter the client through its async context manager
    async with SendClientAsync(endpoint, auth=credentials) as sender:
        sender.queue_message(Message("Hello Async World"))
        results = await sender.send_all_messages_async()
        print(f"Send results: {results}")

# Batch async sending
async def send_batch_async():
    """Send ten messages as one batch and print how many results came back."""
    # Defined locally: the `target` and `auth` used in send_messages_async()
    # are local to that function and are not visible from here.
    target = "amqps://mynamespace.servicebus.windows.net/myqueue"
    auth = SASTokenAsync("mynamespace.servicebus.windows.net", token=sas_token)

    messages = [Message(f"Async message {i}") for i in range(10)]

    async with SendClientAsync(target, auth=auth) as client:
        results = await client.send_message_batch_async(messages)
        print(f"Sent {len(results)} messages asynchronously")

# Run async functions
asyncio.run(send_messages_async())

Async Receive Client

Asynchronous high-level client for receiving messages with non-blocking message processing.

class ReceiveClientAsync:
    def __init__(self, source, auth=None, client_name=None, debug=False,
                 prefetch=300, auto_complete=True, error_policy=None,
                 keep_alive_interval=None, receive_settle_mode=None,
                 encoding='UTF-8', loop=None, **kwargs):
        """
        Async high-level client for receiving AMQP messages.

        Parameters:
        - source (str or Source): Source endpoint for messages
        - auth (AMQPAuth): Authentication credentials (default: None)
        - client_name (str): Client identifier (default: None)
        - debug (bool): Enable debug logging (default: False)
        - prefetch (int): Number of messages to prefetch (default: 300)
        - auto_complete (bool): Auto-complete received messages (default: True)
        - error_policy (ErrorPolicy): Error handling policy (default: None)
        - keep_alive_interval (int): Keep-alive interval (default: None)
        - receive_settle_mode (ReceiverSettleMode): Message settlement mode
          (default: None)
        - encoding (str): Character encoding (default: 'UTF-8')
        - loop: Asyncio event loop (default: None)
        - **kwargs: Additional keyword arguments accepted by the constructor
        """

Key Methods:

async def open_async(self):
    """Asynchronously open the client connection."""

async def close_async(self):
    """Asynchronously close the client connection."""

async def receive_message_batch_async(self, max_batch_size=None, timeout=0):
    """
    Asynchronously receive a batch of messages.

    Parameters:
    - max_batch_size (int): Maximum messages to receive (default: None)
    - timeout (int): Timeout in milliseconds (default: 0)

    Returns:
    list[Message]: Received messages
    """

def receive_messages_iter_async(self):
    """
    Create async iterator for receiving messages.

    Declared with plain `def`: call it without `await` and consume the
    returned iterator with `async for`.

    Returns:
    AsyncMessageIter: Async message iterator
    """

Usage Examples:

import asyncio
from uamqp.async_ops import ReceiveClientAsync
from uamqp.authentication import SASLPlain

async def receive_messages_async():
    """Receive one batch of up to 10 messages, print each payload and accept it."""
    queue_uri = "amqps://amqp.example.com/myqueue"
    credentials = SASLPlain("amqp.example.com", "user", "password")
    batch_options = {"max_batch_size": 10, "timeout": 30000}

    # Batch async receiving
    async with ReceiveClientAsync(queue_uri, auth=credentials) as receiver:
        messages = await receiver.receive_message_batch_async(**batch_options)
        print(f"Received {len(messages)} messages")

        for message in messages:
            print(f"Message: {message.get_data()}")
            message.accept()

# Async message iterator
async def process_messages_continuously():
    """Iterate over incoming messages, accepting each one or rejecting it on error."""
    # Defined locally: the `source` and `auth` used in receive_messages_async()
    # are local to that function and are not visible from here.
    source = "amqps://amqp.example.com/myqueue"
    auth = SASLPlain("amqp.example.com", "user", "password")

    async with ReceiveClientAsync(source, auth=auth) as client:
        message_iter = client.receive_messages_iter_async()

        async for message in message_iter:
            try:
                data = message.get_data()
                print(f"Processing: {data}")

                # Simulate async processing
                await asyncio.sleep(0.1)

                message.accept()
            except Exception as e:
                # Report the failure and reject this message, then keep iterating
                print(f"Error: {e}")
                message.reject()

asyncio.run(receive_messages_async())

Async Message Iterator

Async iterator for processing messages as they arrive without blocking.

class AsyncMessageIter:
    # Returned by receive_messages_iter_async(); consume it with `async for`.
    def __init__(self, client):
        """
        Async iterator for AMQP messages.

        Parameters:
        - client: ReceiveClientAsync instance
        """

    def __aiter__(self):
        """Return async iterator."""

    async def __anext__(self):
        """Get next message asynchronously."""

Usage Example:

async def process_messages_with_iterator():
    """Handle and accept messages from the async iterator, stopping after 100."""
    max_messages = 100  # Process only 100 messages

    async with ReceiveClientAsync(source, auth=auth) as receiver:
        handled = 0
        async for incoming in receiver.receive_messages_iter_async():
            await process_message_async(incoming)
            incoming.accept()

            handled += 1
            if handled >= max_messages:
                break

async def process_message_async(message):
    """Read the message payload, pause briefly, then print the payload."""
    data = message.get_data()

    # The short sleep stands in for real async work (database query, API call, etc.)
    simulated_work = asyncio.sleep(0.1)
    await simulated_work

    print(f"Processed: {data}")

Async Low-Level Protocol

Asynchronous versions of low-level protocol classes for advanced scenarios.

Async Connection

class ConnectionAsync:
    def __init__(self, hostname, sasl=None, container_id=None, 
                 max_frame_size=None, channel_max=None, idle_timeout=None,
                 properties=None, remote_idle_timeout_empty_frame_send_ratio=None,
                 debug=False, encoding='UTF-8', loop=None):
        """
        Async AMQP connection management.

        Only `hostname` is required. `debug` defaults to False and
        `encoding` to 'UTF-8'; every other argument defaults to None.
        """

    async def open_async(self):
        """Asynchronously open connection."""

    async def close_async(self):
        """Asynchronously close connection."""

Async Session

class SessionAsync:
    def __init__(self, connection, incoming_window=None, outgoing_window=None,
                 handle_max=None, loop=None):
        """
        Async AMQP session management.

        Only `connection` is required; every other argument defaults to None.
        """

    async def begin_async(self):
        """Asynchronously begin session."""

    async def end_async(self):
        """Asynchronously end session."""

Async Message Sender

class MessageSenderAsync:
    def __init__(self, session, source, target, name=None, 
                 send_settle_mode=None, max_message_size=None,
                 link_properties=None, desired_capabilities=None,
                 loop=None):
        """
        Async low-level message sender.

        `session`, `source` and `target` are required; every other argument
        defaults to None.
        """

    async def open_async(self):
        """Asynchronously open sender link."""

    async def send_async(self, message, callback=None):
        """
        Asynchronously send a message.

        Parameters:
        - message (Message): Message to send
        - callback (callable): Completion callback (default: None)

        Returns:
        MessageState: Send operation state
        """

Async Message Receiver

class MessageReceiverAsync:
    def __init__(self, session, source, target, name=None,
                 receive_settle_mode=None, max_message_size=None,
                 prefetch=None, link_properties=None, 
                 desired_capabilities=None, loop=None):
        """
        Async low-level message receiver.

        `session`, `source` and `target` are required; every other argument
        defaults to None.
        """

    async def open_async(self):
        """Asynchronously open receiver link."""

    async def receive_message_batch_async(self, max_batch_size=None):
        """
        Asynchronously receive message batch.

        Parameters:
        - max_batch_size (int): Maximum messages to receive (default: None)

        Returns:
        list[Message]: Received messages
        """

Usage Example:

async def low_level_async_example():
    """Send one message using the low-level connection, session and sender classes."""
    # Create async connection
    connection = ConnectionAsync("amqp.example.com", sasl=auth_sasl)
    await connection.open_async()

    try:
        # Create async session
        session = SessionAsync(connection)
        await session.begin_async()

        try:
            # Create async sender
            sender = MessageSenderAsync(session, source="source", target="target")
            await sender.open_async()

            # Send message asynchronously
            message = Message("Low-level async message")
            result = await sender.send_async(message)
            print(f"Send result: {result}")
        finally:
            # Pair begin_async() with end_async() so the session is ended
            # even when opening the sender or sending raises.
            await session.end_async()

    finally:
        # Runs last, after the session has been ended
        await connection.close_async()

Async Authentication

Use async authentication classes with async operations:

from uamqp.authentication import SASTokenAsync, JWTTokenAsync

# Async SAS token auth
# (`sas_token` is not defined in this snippet; supply it before running.)
auth = SASTokenAsync(
    hostname="mynamespace.servicebus.windows.net",
    token=sas_token
)

# Async JWT token auth
# NOTE: this rebinds `auth`, replacing the SAS token auth created above --
# presumably the two assignments are alternatives; keep only the one you need.
# (`jwt_token` is not defined in this snippet; supply it before running.)
auth = JWTTokenAsync(
    hostname="service.example.com",
    token=jwt_token,
    audience="https://service.example.com"
)

Async Error Handling

Handle errors in async operations:

from uamqp.errors import AMQPConnectionError, MessageSendFailed

async def robust_async_sending():
    """Send one test message, printing a message for each kind of failure."""
    try:
        async with SendClientAsync(target, auth=auth) as sender:
            test_message = Message("Test message")
            sender.queue_message(test_message)
            await sender.send_all_messages_async()

    except AMQPConnectionError as e:
        # Implement retry logic
        print(f"Connection failed: {e}")

    except MessageSendFailed as e:
        # Handle send failure
        print(f"Send failed: {e}")

    except Exception as e:
        print(f"Unexpected error: {e}")

Async Performance Considerations

Event Loop Integration

# Use existing event loop
import asyncio

async def main():
    """Open a send client that is handed the currently running event loop."""
    running_loop = asyncio.get_running_loop()
    client_context = SendClientAsync(target, auth=auth, loop=running_loop)

    async with client_context as client:
        # Operations use the specified loop
        pass

asyncio.run(main())

Concurrent Operations

async def concurrent_operations():
    """Run the send and receive examples as concurrent tasks and wait for both."""
    # Send and receive concurrently
    pending = [
        asyncio.create_task(send_messages_async()),
        asyncio.create_task(receive_messages_async()),
    ]

    # Wait for both to complete
    await asyncio.gather(*pending)

Async Context Managers

Prefer async context managers for proper resource cleanup. If you manage the client lifecycle manually instead, close the client in a finally block:

# Correct async usage
# (`async with` and `await` are only valid inside a coroutine function,
# so each variant is wrapped in its own `async def`.)
async def send_with_context_manager():
    """Send queued messages inside an async context manager."""
    async with SendClientAsync(target, auth=auth) as client:
        await client.send_all_messages_async()


# Manual async lifecycle management
async def send_with_manual_lifecycle():
    """Send queued messages, opening and closing the client explicitly."""
    client = SendClientAsync(target, auth=auth)
    await client.open_async()
    try:
        await client.send_all_messages_async()
    finally:
        # Close the client even if sending raises
        await client.close_async()

Install with Tessl CLI

npx tessl i tessl/pypi-uamqp

docs

address-endpoints.md

async-operations.md

authentication.md

client-apis.md

connection-session.md

error-handling.md

high-level-messaging.md

index.md

low-level-protocol.md

message-management.md

types-constants.md

tile.json