CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-uamqp

An AMQP 1.0 client library for Python

Overview
Eval results
Files

docs/low-level-protocol.md

Low-Level Protocol Access

Direct access to AMQP protocol elements including message senders, receivers, and protocol-level message handling for advanced use cases that require fine-grained control over AMQP 1.0 protocol behavior.

Capabilities

Message Sender

Low-level message sender link that provides direct control over message transmission and settlement.

class MessageSender:
    """
    Low-level message sender link that provides direct control over
    message transmission and settlement.
    """

    def __init__(self, session, source, target, name=None, send_settle_mode=None,
                 max_message_size=None, link_properties=None, 
                 desired_capabilities=None):
        """
        Low-level message sender link.

        Only session, source and target are required; every other argument
        is optional and defaults to None.

        Parameters:
        - session (Session): AMQP session
        - source (Source): Link source address
        - target (Target): Link target address
        - name (str): Link name
        - send_settle_mode (SenderSettleMode): Settlement mode
        - max_message_size (int): Maximum message size
          (the usage example passes 1048576 for 1MB, i.e. a byte count)
        - link_properties (dict): Link properties
        - desired_capabilities (list): Desired link capabilities
        """

Key Methods:

def open(self):
    """Open the sender link."""

def close(self):
    """Close the sender link."""

def send_async(self, message, callback=None):
    """
    Send a message asynchronously.

    Parameters:
    - message (Message): Message to send
    - callback (callable): Completion callback; optional, defaults to None.
      The usage example passes a function taking (message, result, error).

    Returns:
    MessageState: Send operation state
    """

def work(self):
    """Process sender work (I/O and protocol handling)."""

def destroy(self):
    """Destroy the sender and free resources."""

Key Properties:

# Read-only properties of MessageSender, written as valid Python so the
# signatures can be parsed by tooling. Project types are quoted because they
# are not imported in this snippet.

@property
def name(self) -> str:
    """Link name."""

@property
def source(self) -> "Source":
    """Link source address."""

@property
def target(self) -> "Target":
    """Link target address."""

@property
def max_message_size(self) -> int:
    """Maximum message size."""

@property
def send_settle_mode(self) -> int:
    """Sender settlement mode."""

Usage Examples:

from uamqp import Connection, Session, MessageSender
from uamqp.address import Source, Target
from uamqp import Message
from uamqp.constants import SenderSettleMode

# Create low-level sender
# NOTE: `auth` is not defined in this snippet; create the SASL authentication
# object beforehand (see authentication.md).
connection = Connection("amqp.example.com", sasl=auth)
connection.open()

session = Session(connection)
session.begin()

source = Source()  # Null source for sender
target = Target("myqueue")

sender = MessageSender(
    session=session,
    source=source,
    target=target,
    name="my-sender",
    send_settle_mode=SenderSettleMode.Mixed,
    max_message_size=1048576  # 1MB
)

# Completion flag flipped by the callback. The value returned by send_async()
# is a snapshot and never changes on its own, so it cannot drive the wait loop.
send_status = {"done": False}


def send_callback(message, result, error):
    """Report the outcome of the send and release the wait loop below."""
    send_status["done"] = True
    if error:
        print(f"Send failed: {error}")
    else:
        print(f"Send completed: {result}")


try:
    sender.open()

    # Send message with callback
    message = Message("Hello World")
    sender.send_async(message, callback=send_callback)

    # Process until the completion callback reports that the send finished
    while not send_status["done"]:
        sender.work()
        connection.work()

finally:
    sender.close()
    session.end()
    connection.close()

Message Receiver

Low-level message receiver link that provides direct control over message reception and flow control.

class MessageReceiver:
    """
    Low-level message receiver link that provides direct control over
    message reception and flow control.
    """

    def __init__(self, session, source, target, name=None, 
                 receive_settle_mode=None, max_message_size=None,
                 prefetch=None, link_properties=None, 
                 desired_capabilities=None):
        """
        Low-level message receiver link.

        Only session, source and target are required; every other argument
        is optional and defaults to None.

        Parameters:
        - session (Session): AMQP session
        - source (Source): Link source address
        - target (Target): Link target address
        - name (str): Link name
        - receive_settle_mode (ReceiverSettleMode): Settlement mode
        - max_message_size (int): Maximum message size
        - prefetch (int): Number of messages to prefetch
        - link_properties (dict): Link properties
        - desired_capabilities (list): Desired link capabilities
        """

Key Methods:

def open(self):
    """Open the receiver link."""

def close(self):
    """Close the receiver link."""

def receive_message_batch(self, max_batch_size=None):
    """
    Receive a batch of messages.

    Parameters:
    - max_batch_size (int): Maximum messages to receive; optional,
      defaults to None

    Returns:
    list[Message]: Received messages (the usage example treats a falsy
    result as "nothing received yet")
    """

def work(self):
    """Process receiver work (I/O and protocol handling)."""

def flow(self, link_credit):
    """
    Grant link credit for message flow control.

    Parameters:
    - link_credit (int): Number of credits to grant
    """

def destroy(self):
    """Destroy the receiver and free resources."""

Key Properties:

# Read-only properties of MessageReceiver, written as valid Python so the
# signatures can be parsed by tooling. Project types are quoted because they
# are not imported in this snippet.

@property
def name(self) -> str:
    """Link name."""

@property
def source(self) -> "Source":
    """Link source address."""

@property
def target(self) -> "Target":
    """Link target address."""

@property
def max_message_size(self) -> int:
    """Maximum message size."""

@property
def receive_settle_mode(self) -> int:
    """Receiver settlement mode."""

@property
def prefetch(self) -> int:
    """Prefetch count."""

Usage Examples:

from uamqp import MessageReceiver
from uamqp.constants import ReceiverSettleMode

# Build the receiver link on the session opened in the sender example.
# `process_message` is the application's own handler and is not defined here.
source = Source("myqueue")
target = Target()  # Null target for receiver

receiver = MessageReceiver(
    session=session,
    source=source,
    target=target,
    name="my-receiver",
    receive_settle_mode=ReceiverSettleMode.PeekLock,
    prefetch=100,
    max_message_size=1048576,
)

try:
    receiver.open()

    # Initial credit grant
    receiver.flow(10)

    # Receive loop: runs until an exception escapes
    while True:
        messages = receiver.receive_message_batch(max_batch_size=5)

        if messages:
            print(f"Received {len(messages)} messages")

            for message in messages:
                try:
                    data = message.get_data()
                    print(f"Message: {data}")

                    # Hand the payload to the application, then settle it
                    process_message(data)
                    message.accept()

                except Exception as e:
                    print(f"Processing error: {e}")
                    message.reject(
                        condition="processing-error",
                        description=str(e),
                    )

            # Replace the credits consumed by this batch
            receiver.flow(len(messages))
        else:
            # Nothing arrived: service the link and the connection
            receiver.work()
            connection.work()

finally:
    receiver.close()

Advanced Protocol Control

Manual Credit Management

class CreditManager:
    """Track granted versus processed messages and top up link credit.

    Credits are granted through ``receiver.flow``. Whenever the number of
    unprocessed credits drops to ``min_credits`` or below, enough credit is
    granted to bring the outstanding amount back up to ``initial_credits``.
    """

    def __init__(self, receiver, initial_credits=10, min_credits=5):
        self.receiver = receiver
        self.initial_credits = initial_credits
        self.min_credits = min_credits
        # Running totals since construction
        self.granted_credits = 0
        self.processed_messages = 0

    def start(self):
        """Issue the opening grant of ``initial_credits``."""
        opening_grant = self.initial_credits
        self.receiver.flow(opening_grant)
        self.granted_credits = opening_grant

    def on_message_processed(self):
        """Record one processed message and replenish credit if running low."""
        self.processed_messages += 1

        outstanding = self.granted_credits - self.processed_messages
        if outstanding > self.min_credits:
            # Still enough credit in flight; nothing to do
            return

        # Refill back up to the initial window size
        additional_credits = self.initial_credits - outstanding
        self.receiver.flow(additional_credits)
        self.granted_credits += additional_credits
        print(f"Granted {additional_credits} additional credits")

# Usage
# `receiver` and `connection` come from the earlier examples;
# `process_message` is the application's own handler.
credit_manager = CreditManager(receiver, initial_credits=20, min_credits=5)
credit_manager.start()

while True:
    messages = receiver.receive_message_batch(max_batch_size=10)

    if not messages:
        # Nothing arrived yet: service the link and the connection instead of
        # spinning, the same way the basic receive loop does.
        receiver.work()
        connection.work()
        continue

    for message in messages:
        # Process message
        process_message(message)
        message.accept()

        # Update credit management
        credit_manager.on_message_processed()

Link State Management

from uamqp.constants import MessageSenderState, MessageReceiverState

def monitor_link_state(link, link_type="sender", poll_interval=0.1):
    """Report whether a link is usable, waiting out the Opening state first.

    Parameters:
    - link: Sender or receiver link exposing get_state(), work() and
      get_error_info()
    - link_type (str): "sender" selects MessageSenderState; any other value
      selects MessageReceiverState
    - poll_interval (float): Seconds to sleep between state polls while the
      link is opening

    Returns:
    bool: True if the link is Open (possibly after waiting), False otherwise
    """
    # Imported locally so the function does not depend on a module-level import.
    import time

    states = MessageSenderState if link_type == "sender" else MessageReceiverState

    current_state = link.get_state()

    if current_state == states.Opening:
        print("Link is opening...")
        # Keep the refreshed state: the result must reflect where the link
        # ended up, not the Opening snapshot taken before the wait.
        while current_state == states.Opening:
            link.work()
            time.sleep(poll_interval)
            current_state = link.get_state()

    if current_state == states.Open:
        print("Link is open and ready")
        return True

    if current_state == states.Error:
        print("Link is in error state")
        error_info = link.get_error_info()
        print(f"Error: {error_info}")
        return False

    if current_state == states.Closing:
        print("Link is closing...")
        return False

    # Any other state is not usable
    return False

# Usage
# `sender` is the MessageSender created in the sender example.
sender_ready = monitor_link_state(sender, "sender")
if sender_ready:
    # Proceed with sending
    pass

Custom Link Properties

def create_sender_with_properties(session, target):
    """Build a MessageSender carrying custom link properties and capabilities.

    Parameters:
    - session: AMQP session to create the link on
    - target: Link target address

    Returns:
    The new MessageSender; it is constructed but not opened here.
    """
    return MessageSender(
        session=session,
        source=Source(),
        target=target,
        link_properties={
            'x-opt-jms-dest': 1,           # JMS destination type
            'x-opt-enqueuetime': True,     # Include enqueue time
            'product': 'MyApplication',    # Application identifier
            'version': '1.0.0',            # Application version
        },
        desired_capabilities=[
            'ANONYMOUS-RELAY',             # Anonymous relay capability
            'DELAYED_DELIVERY',            # Delayed delivery capability
        ],
    )

# Check if capabilities were granted
def check_link_capabilities(sender):
    """Split the sender's desired capabilities into granted and denied.

    A capability counts as granted when it appears in the value returned by
    ``sender.get_remote_capabilities()``.

    Returns:
    tuple: (granted, denied) lists, each in the order of
    ``sender.desired_capabilities``
    """
    offered = sender.get_remote_capabilities()
    wanted = sender.desired_capabilities

    # Single pass: bucket each capability by whether the peer offered it
    buckets = {True: [], False: []}
    for capability in wanted:
        buckets[capability in offered].append(capability)
    granted, denied = buckets[True], buckets[False]

    print(f"Granted capabilities: {granted}")
    print(f"Denied capabilities: {denied}")

    return granted, denied

Performance Optimization

Batch Processing

class BatchProcessor:
    """Accumulate received messages and process them in batches.

    A batch is flushed when the buffer holds ``batch_size`` messages, or when
    more than ``timeout`` seconds have passed since the last flush and the
    buffer is not empty.
    """

    def __init__(self, receiver, batch_size=100, timeout=5.0):
        self.receiver = receiver
        self.batch_size = batch_size
        self.timeout = timeout
        self.message_buffer = []

    def process_batches(self):
        """Process messages in batches for better throughput.

        Loops forever; it only ends when the receiver or the batch handler
        raises.
        """
        # Imported locally so the class does not depend on a module-level
        # import. The monotonic clock is immune to wall-clock adjustments.
        import time

        start_time = time.monotonic()

        while True:
            # Only ask for as many messages as still fit in the current batch
            messages = self.receiver.receive_message_batch(
                max_batch_size=self.batch_size - len(self.message_buffer)
            )

            # Tolerate a receiver that reports "nothing received" as None
            self.message_buffer.extend(messages or [])

            # Process batch if full or timeout reached
            buffer_full = len(self.message_buffer) >= self.batch_size
            timed_out = time.monotonic() - start_time > self.timeout

            if (buffer_full or timed_out) and self.message_buffer:
                self._process_batch(self.message_buffer)
                self.message_buffer = []
                start_time = time.monotonic()

            # Service the connection
            self.receiver.work()

    def _process_batch(self, messages):
        """Process a batch of messages, accepting each success and rejecting each failure."""
        print(f"Processing batch of {len(messages)} messages")

        for message in messages:
            try:
                # Process message
                data = message.get_data()
                process_message_fast(data)
                message.accept()

            except Exception as e:
                # One bad message must not abort the rest of the batch
                print(f"Error processing message: {e}")
                message.reject()

# Usage
# NOTE: process_batches() runs a `while True` loop, so this call does not
# return unless an exception escapes it.
batch_processor = BatchProcessor(receiver, batch_size=50, timeout=2.0)
batch_processor.process_batches()

Parallel Processing

import threading
from queue import Queue

class ParallelProcessor:
    """Fan received messages out to a pool of worker threads.

    One receiver thread pulls batches from the receiver and puts each message
    on an internal queue; ``worker_count`` worker threads take messages from
    the queue, process them, and accept or reject them.

    Parameters:
    - receiver: Receiver exposing receive_message_batch() and work()
    - worker_count (int): Number of worker threads
    - poll_timeout (float): Seconds a worker blocks on the queue before
      re-checking whether it should stop
    """

    def __init__(self, receiver, worker_count=4, poll_timeout=1.0):
        self.receiver = receiver
        self.worker_count = worker_count
        self.poll_timeout = poll_timeout
        self.message_queue = Queue()
        self.workers = []
        self.running = False
        # Kept so stop() can wait for the receiver thread as well
        self._receiver_handle = None

    def start(self):
        """Start parallel message processing."""
        self.running = True

        # Start worker threads
        for i in range(self.worker_count):
            worker = threading.Thread(target=self._worker_thread, args=(i,))
            worker.daemon = True
            worker.start()
            self.workers.append(worker)

        # Start receiver thread
        self._receiver_handle = threading.Thread(target=self._receiver_thread)
        self._receiver_handle.daemon = True
        self._receiver_handle.start()

    def stop(self):
        """Stop parallel processing and wait briefly for all threads to exit."""
        self.running = False

        # Wait for workers to finish
        for worker in self.workers:
            worker.join(timeout=5.0)

        if self._receiver_handle is not None:
            self._receiver_handle.join(timeout=5.0)

    def _receiver_thread(self):
        """Receive messages and queue them for processing."""
        # Imported locally so the class does not depend on a module-level import.
        import time

        while self.running:
            try:
                messages = self.receiver.receive_message_batch(max_batch_size=10)

                # Tolerate a receiver that reports "nothing received" as None
                for message in messages or ():
                    self.message_queue.put(message)

                self.receiver.work()

            except Exception as e:
                print(f"Receiver error: {e}")
                time.sleep(1)

    def _worker_thread(self, worker_id):
        """Worker loop: take a message, process it, then accept or reject it."""
        # `Empty` lives in the queue module, not on the Queue class.
        from queue import Empty

        print(f"Worker {worker_id} started")

        while self.running:
            try:
                # Block briefly so the loop can notice a stop request
                message = self.message_queue.get(timeout=self.poll_timeout)
            except Empty:
                continue  # Timeout, check if still running

            try:
                # Process message
                data = message.get_data()
                result = process_message_threadsafe(data)

                if result:
                    message.accept()
                else:
                    message.reject()

            except Exception as e:
                print(f"Worker {worker_id} error: {e}")
            finally:
                # Always balance the get() above, even when processing failed,
                # so a later message_queue.join() cannot hang.
                self.message_queue.task_done()

# Usage
import time

processor = ParallelProcessor(receiver, worker_count=8)
processor.start()

try:
    # Let it run for a while
    time.sleep(60)
finally:
    # Stop the threads even if the wait is interrupted (e.g. Ctrl+C)
    processor.stop()

Management Operations

Low-level AMQP management operations for advanced broker interaction and administrative tasks.

Management Operation

class MgmtOperation:
    """
    Handler for low-level AMQP management operations (broker interaction
    and administrative tasks).
    """

    def __init__(self, session, target=None, debug=False):
        """
        AMQP management operation handler.

        Parameters:
        - session (Session): AMQP session
        - target (Target): Management target endpoint; optional, defaults to
          None (the usage example passes Target("$management"))
        - debug (bool): Enable debug logging; defaults to False
        """

    def open(self):
        """Open the management operation link."""

    def close(self):
        """Close the management operation link."""

    def execute_async(self, operation, op_type, locales=None, timeout=0):
        """
        Execute a management operation asynchronously.

        Parameters:
        - operation (str): Operation name (e.g. "READ" in the usage example)
        - op_type (str): Operation type (e.g. "com.microsoft:queue")
        - locales (list): Supported locales; optional, defaults to None
        - timeout (int): Operation timeout in milliseconds; defaults to 0
          (NOTE(review): presumably 0 means "no timeout" — confirm)

        Returns:
        Management operation result
        """

Async Management Operation

class MgmtOperationAsync:
    """
    Asyncio counterpart of the management operation handler; every
    operation is a coroutine.
    """

    def __init__(self, session, target=None, debug=False, loop=None):
        """
        Async AMQP management operation handler.

        Parameters:
        - session (SessionAsync): Async AMQP session
        - target (Target): Management target endpoint; optional, defaults to
          None
        - debug (bool): Enable debug logging; defaults to False
        - loop: Asyncio event loop; optional, defaults to None
        """

    async def open_async(self):
        """Asynchronously open the management operation link."""

    async def close_async(self):
        """Asynchronously close the management operation link."""

    async def execute_async(self, operation, op_type, locales=None, timeout=0):
        """
        Execute a management operation asynchronously.

        Parameters:
        - operation (str): Operation name (e.g. "CREATE" in the usage example)
        - op_type (str): Operation type (e.g. "com.microsoft:queue")
        - locales (list): Supported locales; optional, defaults to None
        - timeout (int): Operation timeout in milliseconds; defaults to 0
          (NOTE(review): presumably 0 means "no timeout" — confirm)

        Returns:
        Management operation result
        """

Usage Examples:

from uamqp.mgmt_operation import MgmtOperation
from uamqp.address import Target

# Create management operation
# `session` is an already-begun Session, as created in the sender example.
mgmt_target = Target("$management")
mgmt_op = MgmtOperation(session, target=mgmt_target)

try:
    mgmt_op.open()

    # Execute management operation (e.g., get queue info)
    result = mgmt_op.execute_async(
        operation="READ",
        op_type="com.microsoft:queue",
        timeout=30000  # milliseconds, i.e. 30 seconds
    )

    print(f"Management result: {result}")

finally:
    # Close the management link whether or not the operation succeeded
    mgmt_op.close()

# Async management operations
from uamqp.async_ops.mgmt_operation_async import MgmtOperationAsync

async def async_management_example():
    """Open an async management link, run a CREATE operation, and close the link.

    Relies on the module-level names ``async_session`` and ``mgmt_target``.
    The link is closed even if opening or executing raises.
    """
    mgmt_op = MgmtOperationAsync(async_session, target=mgmt_target)

    try:
        await mgmt_op.open_async()

        # Operation arguments gathered in one place (timeout is in milliseconds)
        create_request = {
            "operation": "CREATE",
            "op_type": "com.microsoft:queue",
            "timeout": 30000,
        }
        result = await mgmt_op.execute_async(**create_request)

        print(f"Async management result: {result}")

    finally:
        await mgmt_op.close_async()

Error Recovery

Link Recovery

def create_resilient_sender(session, target, max_retries=3, initial_delay=1.0):
    """Create and open a sender, retrying with exponential backoff.

    Parameters:
    - session: AMQP session to create the link on
    - target: Link target address
    - max_retries (int): Maximum number of attempts
    - initial_delay (float): Seconds to wait after the first failed attempt;
      the wait doubles after every further failure

    Returns:
    The opened MessageSender, or None when max_retries is 0 or negative

    Raises:
    Exception: The error from the final attempt once every attempt has failed
    """
    # Imported locally so the function does not depend on a module-level import.
    import time

    for attempt in range(max_retries):
        sender = None
        try:
            sender = MessageSender(session, Source(), target)
            sender.open()

            # Wait for link to open
            while sender.get_state() == MessageSenderState.Opening:
                sender.work()
                time.sleep(0.1)

            state = sender.get_state()
            if state == MessageSenderState.Open:
                print(f"Sender opened successfully on attempt {attempt + 1}")
                return sender

            raise RuntimeError(f"Sender failed to open: {state}")

        except Exception as e:
            print(f"Sender creation attempt {attempt + 1} failed: {e}")

            # Free the failed link before retrying so attempts do not pile up
            # half-open senders; cleanup is best-effort and only reported.
            if sender is not None:
                try:
                    sender.destroy()
                except Exception as cleanup_error:
                    print(f"Sender cleanup failed: {cleanup_error}")

            if attempt < max_retries - 1:
                time.sleep(initial_delay * 2 ** attempt)  # Exponential backoff
            else:
                raise

    # Only reached when max_retries <= 0 (no attempt was made)
    return None

# Usage with recovery
# NOTE: once every attempt has failed, create_resilient_sender() re-raises the
# last error rather than returning None, so wrap this call in try/except if
# the failure should be handled here.
sender = create_resilient_sender(session, target)
if sender:
    # Use sender...
    pass

Install with Tessl CLI

npx tessl i tessl/pypi-uamqp

docs

address-endpoints.md

async-operations.md

authentication.md

client-apis.md

connection-session.md

error-handling.md

high-level-messaging.md

index.md

low-level-protocol.md

message-management.md

types-constants.md

tile.json