CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-uamqp

An AMQP 1.0 client library for Python

Overview
Eval results
Files

docs/address-endpoints.md

Address and Endpoints

AMQP endpoint addressing, including source and target configuration, with support for filters, dynamic addresses, and link properties for flexible message routing and delivery.

Capabilities

Base Address Class

Base class for AMQP endpoint representation that handles address parsing and configuration.

class Address:
    """Base AMQP endpoint representation (API signature reference).

    Every member below documents a signature only; the bodies are
    deliberately empty stubs.
    """

    def __init__(self, address=None, encoding='UTF-8'):
        """
        Base AMQP endpoint representation.

        Parameters:
        - address (str): AMQP address string
        - encoding (str): Character encoding
        """

    @property
    def hostname(self) -> str:
        """Hostname from address."""

    @property
    def scheme(self) -> str:
        """URI scheme (amqp, amqps)."""

    @property
    def username(self) -> str:
        """Username from address."""

    @property
    def password(self) -> str:
        """Password from address."""

    @property
    def address(self) -> str:
        """The address string."""

    @property
    def durable(self) -> bool:
        """Whether the address is durable."""

    @property
    def expiry_policy(self) -> str:
        """Address expiry policy."""

    @property
    def timeout(self) -> int:
        """Address timeout."""

    @property
    def dynamic(self) -> bool:
        """Whether the address is dynamic."""

    @property
    def distribution_mode(self) -> str:
        """Message distribution mode."""

    @classmethod
    def from_c_obj(cls, c_value, encoding='UTF-8'):
        """Create Address from C object."""

Usage Examples:

from uamqp.address import Address

# Simple address
addr = Address("myqueue")
print(f"Address: {addr.address}")

# Full URI address
addr = Address("amqps://user:pass@broker.com:5671/queue?timeout=30")
print(f"Hostname: {addr.hostname}")
print(f"Scheme: {addr.scheme}")
print(f"Username: {addr.username}")

Source Address

AMQP source endpoint for receiving messages with filtering capabilities and advanced configuration options.

class Source(Address):
    """AMQP source endpoint (API signature reference).

    Every member below documents a signature only; the bodies are
    deliberately empty stubs.
    """

    def __init__(self, address=None, **kwargs):
        """
        AMQP source endpoint for receiving messages.

        Parameters:
        - address (str): Source address string
        - **kwargs: Additional source configuration options
        """

    @property
    def filter_key(self) -> str:
        """Filter key for message selection."""

    def get_filter(self, name):
        """
        Get a filter by name.

        Parameters:
        - name (str): Filter name

        Returns:
        Filter value or None
        """

    def set_filter(self, value, name=None, descriptor=None):
        """
        Set a message filter.

        Parameters:
        - value: Filter value
        - name (str): Filter name
        - descriptor: Filter descriptor
        """

Usage Examples:

from uamqp.address import Source

# Basic source
source = Source("amqps://servicebus.windows.net/myqueue")

# Source with filter
source = Source("amqps://servicebus.windows.net/mytopic/Subscriptions/mysub")

# Set SQL filter for message selection
source.set_filter(
    "color = 'red' AND price > 100",
    name="sql-filter",
    descriptor="apache.org:selector-filter:string"
)

# Set correlation ID filter
source.set_filter(
    "correlation-123",
    name="correlation-filter"
)

# Get filter value
current_filter = source.get_filter("sql-filter")
print(f"Current filter: {current_filter}")

Target Address

AMQP target endpoint for sending messages with routing and delivery configuration.

class Target(Address):
    """AMQP target endpoint, i.e. the address messages are sent to."""

    def __init__(self, address=None, **kwargs):
        """
        AMQP target endpoint for sending messages.

        Parameters:
        - address (str): Target address string. Defaults to None.
        - **kwargs: Additional target configuration options.
          NOTE(review): the accepted keyword names are not shown in this
          signature; confirm them against the library before relying on them.
        """

Usage Examples:

from uamqp.address import Target

# Basic target
target = Target("amqps://servicebus.windows.net/myqueue")

# Target with specific properties
target = Target(
    address="amqps://servicebus.windows.net/mytopic",
    durable=True,
    timeout=30000
)

# Dynamic target (broker assigns address)
target = Target(dynamic=True)

Address Configuration

Address Properties

Configure various address properties for different messaging patterns.

# Durable address (survives broker restart)
source = Source("persistent-queue", durable=True)

# Temporary address (deleted when unused)
source = Source("temp-queue", durable=False, expiry_policy="session-end")

# Dynamic address (broker generates name)
source = Source(dynamic=True)

# Address with timeout
target = Target("slow-queue", timeout=60000)  # 60 seconds

Distribution Modes

Configure how messages are distributed to multiple consumers.

# Copy distribution (each consumer gets a copy)
source = Source("broadcast-topic", distribution_mode="copy")

# Move distribution (only one consumer gets each message)  
source = Source("work-queue", distribution_mode="move")

Message Filtering

SQL Filters

Use SQL-like expressions to filter messages based on properties.

from uamqp.address import Source

# Create source with SQL filter
source = Source("amqps://servicebus.windows.net/mytopic/Subscriptions/mysub")

# Filter by message properties
sql_filter = """
    (priority = 'high' OR priority = 'critical') 
    AND region = 'us-west' 
    AND timestamp > '2023-01-01'
"""

source.set_filter(
    sql_filter,
    name="priority-region-filter",
    descriptor="apache.org:selector-filter:string"
)

# Use with receive client
from uamqp import ReceiveClient
with ReceiveClient(source, auth=auth) as client:
    messages = client.receive_message_batch(timeout=30000)
    # Only messages matching filter will be received

Correlation Filters

Filter messages by correlation ID for request-response patterns.

# Set correlation filter for specific conversation
correlation_id = "conversation-12345"
source.set_filter(
    correlation_id,
    name="correlation-filter",
    descriptor="apache.org:correlation-filter:string"
)

# Multiple correlation filters
correlation_ids = ["conv-1", "conv-2", "conv-3"]
for i, cid in enumerate(correlation_ids):
    source.set_filter(
        cid,
        name=f"correlation-filter-{i}",
        descriptor="apache.org:correlation-filter:string"
    )

Custom Filters

Create custom filters for advanced message selection.

# XPath filter for XML message content
xpath_filter = "//order[@status='pending' and @total>1000]"
source.set_filter(
    xpath_filter,
    name="xpath-filter", 
    descriptor="apache.org:xpath-filter:string"
)

# Binary filter for exact byte matching
binary_filter = b'\x01\x02\x03'
source.set_filter(
    binary_filter,
    name="binary-filter",
    descriptor="apache.org:binary-filter:binary"
)

Advanced Address Patterns

Request-Response Pattern

Configure addresses for request-response messaging.

from uamqp.address import Source, Target
from uamqp import Message, SendClient, ReceiveClient
import uuid

def setup_request_response():
    """Build the endpoint pair for a request-response exchange.

    Returns:
        tuple: ``(request_target, response_source)``. The first item is a
        Target for "amqps://broker.com/requests"; the second is a Source
        created with ``dynamic=True``.
    """
    # Requests go to a fixed address. The reply source is dynamic, so the
    # broker is expected to assign its address.
    return Target("amqps://broker.com/requests"), Source(dynamic=True)

def send_request_with_response(request_data, auth):
    """Send one request and return the data of the first response received.

    Parameters:
    - request_data: Body passed to Message as the request payload.
    - auth: Authentication object handed to both the receive and send clients.

    Returns:
    The result of get_data() on the first message of the received batch.

    Raises:
    TimeoutError: when the receive batch comes back empty.
    """
    request_target, response_source = setup_request_response()

    # Open the response receiver before sending, so the reply address is
    # known when the request is built.
    with ReceiveClient(response_source, auth=auth) as response_client:
        # Get the actual response address assigned by broker
        response_address = response_client.source_address

        # Create request message with reply-to; the generated id is also
        # used below as the correlation filter value.
        request_id = str(uuid.uuid4())
        request = Message(
            request_data,
            properties={
                'reply_to': response_address,
                'correlation_id': request_id
            }
        )

        # Send request on a short-lived sender that closes once the queue is flushed
        with SendClient(request_target, auth=auth) as request_client:
            request_client.queue_message(request)
            request_client.send_all_messages()

        # Wait for response
        # NOTE(review): the filter is set after response_client was already
        # entered - confirm that a filter set at this point takes effect.
        response_source.set_filter(
            request_id,
            name="correlation-filter"
        )

        # timeout=30000 - presumably milliseconds; TODO confirm
        responses = response_client.receive_message_batch(timeout=30000)
        if responses:
            return responses[0].get_data()
        else:
            raise TimeoutError("No response received")

Publish-Subscribe Pattern

Configure addresses for publish-subscribe messaging.

def setup_pub_sub(topic_name, subscription_names):
    """Create the publisher target and one subscriber source per subscription.

    Parameters:
    - topic_name: Topic name interpolated into the broker URL.
    - subscription_names: Iterable of subscription names.

    Returns:
    A ``(publisher_target, subscriber_sources)`` tuple, where
    ``subscriber_sources`` is a list in the same order as
    ``subscription_names``.
    """
    publisher_target = Target(f"amqps://broker.com/{topic_name}")

    # Every subscription gets its own source, each created with the "copy"
    # distribution mode.
    subscriber_sources = [
        Source(
            f"amqps://broker.com/{topic_name}/Subscriptions/{sub_name}",
            distribution_mode="copy",
        )
        for sub_name in subscription_names
    ]

    return publisher_target, subscriber_sources

# Usage
# NOTE: `auth` is not defined in this snippet; supply an authentication
# object before running it.
target, sources = setup_pub_sub("events", ["sub1", "sub2", "sub3"])

# Publish message
with SendClient(target, auth=auth) as publisher:
    event = Message({"event": "user_login", "user_id": 12345})
    publisher.queue_message(event)
    publisher.send_all_messages()

# Subscribe to messages: each source is opened, read once and closed in turn
for i, source in enumerate(sources):
    with ReceiveClient(source, auth=auth) as subscriber:
        messages = subscriber.receive_message_batch(timeout=10000)
        print(f"Subscriber {i+1} received {len(messages)} messages")

Work Queue Pattern

Configure addresses for work distribution patterns.

def setup_work_queue(queue_name, worker_count):
    """Create the work-queue source and its dead-letter target.

    Parameters:
    - queue_name: Queue name interpolated into both broker URLs.
    - worker_count: Accepted for the caller's convenience; this function
      does not use it.

    Returns:
    A ``(work_source, dlq_target)`` tuple.
    """
    return (
        # "move" distribution: only one worker gets each message.
        Source(
            f"amqps://broker.com/{queue_name}",
            distribution_mode="move",
        ),
        # Failed messages go to the "<queue_name>-dlq" address.
        Target(f"amqps://broker.com/{queue_name}-dlq"),
    )

def process_work_items(work_source, dlq_target, auth):
    """Drain a work queue, dead-lettering every item that fails to process.

    Parameters:
    - work_source: Source the worker receives from.
    - dlq_target: Target that failed items are forwarded to.
    - auth: Authentication object passed to both clients.

    Batches of up to 10 messages are received until a batch comes back
    empty. Each message is processed and accepted; if either step raises, an
    error record is sent to the dead letter target and the original message
    is rejected.
    """
    # Imported locally so the timestamp below does not depend on a
    # module-level `import time` being present.
    import time

    with ReceiveClient(work_source, auth=auth) as worker, \
         SendClient(dlq_target, auth=auth) as dlq_sender:

        while True:
            messages = worker.receive_message_batch(max_batch_size=10, timeout=30000)

            if not messages:
                break  # No more work

            for message in messages:
                try:
                    # Process work item; the return value is not needed here
                    process_work_item(message.get_data())

                    # Success - accept message
                    message.accept()

                except Exception as exc:
                    # Deliberately broad: any failure routes the item to the
                    # dead letter queue instead of stopping the worker.
                    error_msg = Message({
                        "original_data": message.get_data(),
                        "error": str(exc),
                        "timestamp": time.time()
                    })

                    dlq_sender.queue_message(error_msg)
                    dlq_sender.send_all_messages()

                    # Reject original message
                    message.reject(
                        condition="processing-error",
                        description=str(exc)
                    )

def process_work_item(data):
    """Pretend to process *data* and return a "Processed: ..." string.

    Sleeps for 0.1 seconds as a stand-in for real work.
    """
    import time

    simulated_duration = 0.1
    time.sleep(simulated_duration)
    return "Processed: {}".format(data)

Address Validation and Parsing

Address Validation

def validate_address(address_str):
    """Validate AMQP address format.

    Parameters:
    - address_str (str): Address to validate.

    Returns:
    True when the address can be constructed, has a hostname and uses the
    amqp or amqps scheme; False otherwise. The failure reason is printed,
    not raised.
    """
    try:
        addr = Address(address_str)

        # A hostname is required for every scheme, so no separate check is
        # needed for amqps.
        if not addr.hostname:
            raise ValueError("Hostname required")

        if addr.scheme not in ('amqp', 'amqps'):
            raise ValueError("Scheme must be amqp or amqps")

    except Exception as e:
        # Broad on purpose: any failure while constructing or inspecting the
        # address counts as "invalid" instead of propagating.
        print(f"Invalid address '{address_str}': {e}")
        return False

    return True

# Usage
addresses = [
    "amqps://broker.com/queue",      # Valid
    "amqp://broker.com:5672/topic",  # Valid
    "invalid://broker.com/queue",    # Invalid scheme
    "amqps:///queue"                 # Invalid - missing hostname
]

for addr in addresses:
    is_valid = validate_address(addr)
    print(f"{addr}: {'Valid' if is_valid else 'Invalid'}")

Address Parsing

def parse_address_components(address_str):
    """Break an address string into a dictionary of its components.

    Parameters:
    - address_str: Address string passed to Address.

    Returns:
    A dict with the keys full_address, scheme, hostname, username,
    password (masked), port, path, is_secure, is_dynamic and is_durable.
    """
    addr = Address(address_str)

    components = {}
    components['full_address'] = addr.address
    components['scheme'] = addr.scheme
    components['hostname'] = addr.hostname
    components['username'] = addr.username
    # Never expose the real password; only report whether one is present.
    if addr.password:
        components['password'] = '***'
    else:
        components['password'] = None
    # port and path are read defensively and default to None when absent.
    components['port'] = getattr(addr, 'port', None)
    components['path'] = getattr(addr, 'path', None)
    components['is_secure'] = addr.scheme == 'amqps'
    components['is_dynamic'] = addr.dynamic
    components['is_durable'] = addr.durable
    return components

# Usage
components = parse_address_components("amqps://user@broker.com:5671/myqueue?durable=true")
for key, value in components.items():
    print(f"{key}: {value}")

Install with Tessl CLI

npx tessl i tessl/pypi-uamqp

docs

address-endpoints.md

async-operations.md

authentication.md

client-apis.md

connection-session.md

error-handling.md

high-level-messaging.md

index.md

low-level-protocol.md

message-management.md

types-constants.md

tile.json