An AMQP 1.0 client library for Python
AMQP endpoint addressing including source and target configuration with support for filters, dynamic addresses, and link properties for flexible message routing and delivery.
Base class for AMQP endpoint representation that handles address parsing and configuration.
class Address:
def __init__(self, address=None, encoding='UTF-8'):
"""
Base AMQP endpoint representation.
Parameters:
- address (str): AMQP address string
- encoding (str): Character encoding
"""
@property
def hostname: str
"""Hostname from address."""
@property
def scheme: str
"""URI scheme (amqp, amqps)."""
@property
def username: str
"""Username from address."""
@property
def password: str
"""Password from address."""
@property
def address: str
"""The address string."""
@property
def durable: bool
"""Whether the address is durable."""
@property
def expiry_policy: str
"""Address expiry policy."""
@property
def timeout: int
"""Address timeout."""
@property
def dynamic: bool
"""Whether the address is dynamic."""
@property
def distribution_mode: str
"""Message distribution mode."""
@classmethod
def from_c_obj(cls, c_value, encoding='UTF-8'):
"""Create Address from C object."""Usage Examples:
from uamqp.address import Address
# Simple address
addr = Address("myqueue")
print(f"Address: {addr.address}")
# Full URI address
addr = Address("amqps://user:pass@broker.com:5671/queue?timeout=30")
print(f"Hostname: {addr.hostname}")
print(f"Scheme: {addr.scheme}")
print(f"Username: {addr.username}")AMQP source endpoint for receiving messages with filtering capabilities and advanced configuration options.
class Source(Address):
def __init__(self, address=None, **kwargs):
"""
AMQP source endpoint for receiving messages.
Parameters:
- address (str): Source address string
- **kwargs: Additional source configuration options
"""
@property
def filter_key: str
"""Filter key for message selection."""
def get_filter(self, name):
"""
Get a filter by name.
Parameters:
- name (str): Filter name
Returns:
Filter value or None
"""
def set_filter(self, value, name=None, descriptor=None):
"""
Set a message filter.
Parameters:
- value: Filter value
- name (str): Filter name
- descriptor: Filter descriptor
"""Usage Examples:
from uamqp.address import Source
# Basic source
source = Source("amqps://servicebus.windows.net/myqueue")
# Source with filter
source = Source("amqps://servicebus.windows.net/mytopic/Subscriptions/mysub")
# Set SQL filter for message selection
source.set_filter(
"color = 'red' AND price > 100",
name="sql-filter",
descriptor="apache.org:selector-filter:string"
)
# Set correlation ID filter
source.set_filter(
"correlation-123",
name="correlation-filter"
)
# Get filter value
current_filter = source.get_filter("sql-filter")
print(f"Current filter: {current_filter}")AMQP target endpoint for sending messages with routing and delivery configuration.
class Target(Address):
def __init__(self, address=None, **kwargs):
"""
AMQP target endpoint for sending messages.
Parameters:
- address (str): Target address string
- **kwargs: Additional target configuration options
"""Usage Examples:
from uamqp.address import Target
# Basic target
target = Target("amqps://servicebus.windows.net/myqueue")
# Target with specific properties
target = Target(
address="amqps://servicebus.windows.net/mytopic",
durable=True,
timeout=30000
)
# Dynamic target (broker assigns address)
target = Target(dynamic=True)Configure various address properties for different messaging patterns.
# Durable address (survives broker restart)
source = Source("persistent-queue", durable=True)
# Temporary address (deleted when unused)
source = Source("temp-queue", durable=False, expiry_policy="session-end")
# Dynamic address (broker generates name)
source = Source(dynamic=True)
# Address with timeout
target = Target("slow-queue", timeout=60000) # 60 secondsConfigure how messages are distributed to multiple consumers.
# Copy distribution (each consumer gets a copy)
source = Source("broadcast-topic", distribution_mode="copy")
# Move distribution (only one consumer gets each message)
source = Source("work-queue", distribution_mode="move")Use SQL-like expressions to filter messages based on properties.
from uamqp.address import Source
# Create source with SQL filter
source = Source("amqps://servicebus.windows.net/mytopic/Subscriptions/mysub")
# Filter by message properties
sql_filter = """
(priority = 'high' OR priority = 'critical')
AND region = 'us-west'
AND timestamp > '2023-01-01'
"""
source.set_filter(
sql_filter,
name="priority-region-filter",
descriptor="apache.org:selector-filter:string"
)
# Use with receive client
from uamqp import ReceiveClient
with ReceiveClient(source, auth=auth) as client:
messages = client.receive_message_batch(timeout=30000)
# Only messages matching filter will be receivedFilter messages by correlation ID for request-response patterns.
# Set correlation filter for specific conversation
correlation_id = "conversation-12345"
source.set_filter(
correlation_id,
name="correlation-filter",
descriptor="apache.org:correlation-filter:string"
)
# Multiple correlation filters
correlation_ids = ["conv-1", "conv-2", "conv-3"]
for i, cid in enumerate(correlation_ids):
source.set_filter(
cid,
name=f"correlation-filter-{i}",
descriptor="apache.org:correlation-filter:string"
)Create custom filters for advanced message selection.
# XPath filter for XML message content
xpath_filter = "//order[@status='pending' and @total>1000]"
source.set_filter(
xpath_filter,
name="xpath-filter",
descriptor="apache.org:xpath-filter:string"
)
# Binary filter for exact byte matching
binary_filter = b'\x01\x02\x03'
source.set_filter(
binary_filter,
name="binary-filter",
descriptor="apache.org:binary-filter:binary"
)Configure addresses for request-response messaging.
from uamqp.address import Source, Target
from uamqp import Message, SendClient, ReceiveClient
import uuid
def setup_request_response():
# Request target
request_target = Target("amqps://broker.com/requests")
# Dynamic response source (broker creates unique address)
response_source = Source(dynamic=True)
return request_target, response_source
def send_request_with_response(request_data, auth):
request_target, response_source = setup_request_response()
# Create response receiver first
with ReceiveClient(response_source, auth=auth) as response_client:
# Get the actual response address assigned by broker
response_address = response_client.source_address
# Create request message with reply-to
request_id = str(uuid.uuid4())
request = Message(
request_data,
properties={
'reply_to': response_address,
'correlation_id': request_id
}
)
# Send request
with SendClient(request_target, auth=auth) as request_client:
request_client.queue_message(request)
request_client.send_all_messages()
# Wait for response
response_source.set_filter(
request_id,
name="correlation-filter"
)
responses = response_client.receive_message_batch(timeout=30000)
if responses:
return responses[0].get_data()
else:
raise TimeoutError("No response received")Configure addresses for publish-subscribe messaging.
def setup_pub_sub(topic_name, subscription_names):
"""Setup publish-subscribe addresses."""
# Publisher target
publisher_target = Target(f"amqps://broker.com/{topic_name}")
# Subscriber sources
subscriber_sources = []
for sub_name in subscription_names:
source = Source(
f"amqps://broker.com/{topic_name}/Subscriptions/{sub_name}",
distribution_mode="copy" # Each subscriber gets a copy
)
subscriber_sources.append(source)
return publisher_target, subscriber_sources
# Usage
target, sources = setup_pub_sub("events", ["sub1", "sub2", "sub3"])
# Publish message
with SendClient(target, auth=auth) as publisher:
event = Message({"event": "user_login", "user_id": 12345})
publisher.queue_message(event)
publisher.send_all_messages()
# Subscribe to messages
for i, source in enumerate(sources):
with ReceiveClient(source, auth=auth) as subscriber:
messages = subscriber.receive_message_batch(timeout=10000)
print(f"Subscriber {i+1} received {len(messages)} messages")Configure addresses for work distribution patterns.
def setup_work_queue(queue_name, worker_count):
"""Setup work queue with multiple workers."""
# Work queue source with move distribution
work_source = Source(
f"amqps://broker.com/{queue_name}",
distribution_mode="move" # Only one worker gets each message
)
# Dead letter queue for failed messages
dlq_target = Target(f"amqps://broker.com/{queue_name}-dlq")
return work_source, dlq_target
def process_work_items(work_source, dlq_target, auth):
"""Process work items with error handling."""
with ReceiveClient(work_source, auth=auth) as worker, \
SendClient(dlq_target, auth=auth) as dlq_sender:
while True:
messages = worker.receive_message_batch(max_batch_size=10, timeout=30000)
if not messages:
break # No more work
for message in messages:
try:
# Process work item
work_data = message.get_data()
result = process_work_item(work_data)
# Success - accept message
message.accept()
except Exception as e:
# Failed - send to dead letter queue
error_msg = Message({
"original_data": message.get_data(),
"error": str(e),
"timestamp": time.time()
})
dlq_sender.queue_message(error_msg)
dlq_sender.send_all_messages()
# Reject original message
message.reject(
condition="processing-error",
description=str(e)
)
def process_work_item(data):
"""Process individual work item."""
# Simulate work processing
import time
time.sleep(0.1)
return f"Processed: {data}"def validate_address(address_str):
"""Validate AMQP address format."""
try:
addr = Address(address_str)
# Check required components
if not addr.hostname:
raise ValueError("Hostname required")
if addr.scheme not in ['amqp', 'amqps']:
raise ValueError("Scheme must be amqp or amqps")
if addr.scheme == 'amqps' and not addr.hostname:
raise ValueError("TLS requires valid hostname")
return True
except Exception as e:
print(f"Invalid address '{address_str}': {e}")
return False
# Usage
addresses = [
"amqps://broker.com/queue", # Valid
"amqp://broker.com:5672/topic", # Valid
"invalid://broker.com/queue", # Invalid scheme
"amqps:///queue" # Invalid - missing hostname
]
for addr in addresses:
is_valid = validate_address(addr)
print(f"{addr}: {'Valid' if is_valid else 'Invalid'}")def parse_address_components(address_str):
"""Parse address into components."""
addr = Address(address_str)
return {
'full_address': addr.address,
'scheme': addr.scheme,
'hostname': addr.hostname,
'username': addr.username,
'password': '***' if addr.password else None, # Don't log passwords
'port': getattr(addr, 'port', None),
'path': getattr(addr, 'path', None),
'is_secure': addr.scheme == 'amqps',
'is_dynamic': addr.dynamic,
'is_durable': addr.durable
}
# Usage
components = parse_address_components("amqps://user@broker.com:5671/myqueue?durable=true")
for key, value in components.items():
print(f"{key}: {value}")Install with Tessl CLI
npx tessl i tessl/pypi-uamqp