An AMQP 1.0 client library for Python
Direct access to AMQP protocol elements including message senders, receivers, and protocol-level message handling for advanced use cases that require fine-grained control over AMQP 1.0 protocol behavior.
Low-level message sender link that provides direct control over message transmission and settlement.
class MessageSender:
def __init__(self, session, source, target, name=None, send_settle_mode=None,
max_message_size=None, link_properties=None,
desired_capabilities=None):
"""
Low-level message sender link.
Parameters:
- session (Session): AMQP session
- source (Source): Link source address
- target (Target): Link target address
- name (str): Link name
- send_settle_mode (SenderSettleMode): Settlement mode
- max_message_size (int): Maximum message size
- link_properties (dict): Link properties
- desired_capabilities (list): Desired link capabilities
"""Key Methods:
def open(self):
"""Open the sender link."""
def close(self):
"""Close the sender link."""
def send_async(self, message, callback=None):
"""
Send a message asynchronously.
Parameters:
- message (Message): Message to send
- callback (callable): Completion callback
Returns:
MessageState: Send operation state
"""
def work(self):
"""Process sender work (I/O and protocol handling)."""
def destroy(self):
"""Destroy the sender and free resources."""Key Properties:
@property
def name: str
"""Link name."""
@property
def source: Source
"""Link source address."""
@property
def target: Target
"""Link target address."""
@property
def max_message_size: int
"""Maximum message size."""
@property
def send_settle_mode: int
"""Sender settlement mode."""Usage Examples:
from uamqp import Connection, Session, MessageSender
from uamqp.address import Source, Target
from uamqp import Message
from uamqp.constants import SenderSettleMode
# Create low-level sender
connection = Connection("amqp.example.com", sasl=auth)
connection.open()
session = Session(connection)
session.begin()
source = Source() # Null source for sender
target = Target("myqueue")
sender = MessageSender(
session=session,
source=source,
target=target,
name="my-sender",
send_settle_mode=SenderSettleMode.Mixed,
max_message_size=1048576 # 1MB
)
try:
sender.open()
# Send message with callback
def send_callback(message, result, error):
if error:
print(f"Send failed: {error}")
else:
print(f"Send completed: {result}")
message = Message("Hello World")
result = sender.send_async(message, callback=send_callback)
# Process until send completes
while result == MessageState.WaitingForSendAck:
sender.work()
connection.work()
finally:
sender.close()
session.end()
connection.close()Low-level message receiver link that provides direct control over message reception and flow control.
class MessageReceiver:
def __init__(self, session, source, target, name=None,
receive_settle_mode=None, max_message_size=None,
prefetch=None, link_properties=None,
desired_capabilities=None):
"""
Low-level message receiver link.
Parameters:
- session (Session): AMQP session
- source (Source): Link source address
- target (Target): Link target address
- name (str): Link name
- receive_settle_mode (ReceiverSettleMode): Settlement mode
- max_message_size (int): Maximum message size
- prefetch (int): Number of messages to prefetch
- link_properties (dict): Link properties
- desired_capabilities (list): Desired link capabilities
"""Key Methods:
def open(self):
"""Open the receiver link."""
def close(self):
"""Close the receiver link."""
def receive_message_batch(self, max_batch_size=None):
"""
Receive a batch of messages.
Parameters:
- max_batch_size (int): Maximum messages to receive
Returns:
list[Message]: Received messages
"""
def work(self):
"""Process receiver work (I/O and protocol handling)."""
def flow(self, link_credit):
"""
Grant link credit for message flow control.
Parameters:
- link_credit (int): Number of credits to grant
"""
def destroy(self):
"""Destroy the receiver and free resources."""Key Properties:
@property
def name: str
"""Link name."""
@property
def source: Source
"""Link source address."""
@property
def target: Target
"""Link target address."""
@property
def max_message_size: int
"""Maximum message size."""
@property
def receive_settle_mode: int
"""Receiver settlement mode."""
@property
def prefetch: int
"""Prefetch count."""Usage Examples:
from uamqp import MessageReceiver
from uamqp.constants import ReceiverSettleMode
# Create low-level receiver
source = Source("myqueue")
target = Target() # Null target for receiver
receiver = MessageReceiver(
session=session,
source=source,
target=target,
name="my-receiver",
receive_settle_mode=ReceiverSettleMode.PeekLock,
prefetch=100,
max_message_size=1048576
)
try:
receiver.open()
# Grant initial credits
receiver.flow(10)
# Receive messages in loop
while True:
messages = receiver.receive_message_batch(max_batch_size=5)
if not messages:
# No messages, process connection
receiver.work()
connection.work()
continue
print(f"Received {len(messages)} messages")
for message in messages:
try:
data = message.get_data()
print(f"Message: {data}")
# Process message
process_message(data)
# Accept message
message.accept()
except Exception as e:
print(f"Processing error: {e}")
message.reject(
condition="processing-error",
description=str(e)
)
# Grant more credits after processing
receiver.flow(len(messages))
finally:
receiver.close()class CreditManager:
def __init__(self, receiver, initial_credits=10, min_credits=5):
self.receiver = receiver
self.initial_credits = initial_credits
self.min_credits = min_credits
self.granted_credits = 0
self.processed_messages = 0
def start(self):
"""Grant initial credits."""
self.receiver.flow(self.initial_credits)
self.granted_credits = self.initial_credits
def on_message_processed(self):
"""Call when a message is processed."""
self.processed_messages += 1
# Calculate remaining credits
remaining_credits = self.granted_credits - self.processed_messages
# Grant more credits if running low
if remaining_credits <= self.min_credits:
additional_credits = self.initial_credits - remaining_credits
self.receiver.flow(additional_credits)
self.granted_credits += additional_credits
print(f"Granted {additional_credits} additional credits")
# Usage
credit_manager = CreditManager(receiver, initial_credits=20, min_credits=5)
credit_manager.start()
while True:
messages = receiver.receive_message_batch(max_batch_size=10)
for message in messages:
# Process message
process_message(message)
message.accept()
# Update credit management
credit_manager.on_message_processed()from uamqp.constants import MessageSenderState, MessageReceiverState
def monitor_link_state(link, link_type="sender"):
"""Monitor and handle link state changes."""
if link_type == "sender":
states = MessageSenderState
else:
states = MessageReceiverState
current_state = link.get_state()
if current_state == states.Opening:
print("Link is opening...")
# Wait for open to complete
while link.get_state() == states.Opening:
link.work()
time.sleep(0.1)
elif current_state == states.Open:
print("Link is open and ready")
return True
elif current_state == states.Error:
print("Link is in error state")
error_info = link.get_error_info()
print(f"Error: {error_info}")
return False
elif current_state == states.Closing:
print("Link is closing...")
return False
return current_state == states.Open
# Usage
sender_ready = monitor_link_state(sender, "sender")
if sender_ready:
# Proceed with sending
passdef create_sender_with_properties(session, target):
"""Create sender with custom link properties."""
link_properties = {
'x-opt-jms-dest': 1, # JMS destination type
'x-opt-enqueuetime': True, # Include enqueue time
'product': 'MyApplication', # Application identifier
'version': '1.0.0' # Application version
}
desired_capabilities = [
'ANONYMOUS-RELAY', # Anonymous relay capability
'DELAYED_DELIVERY' # Delayed delivery capability
]
sender = MessageSender(
session=session,
source=Source(),
target=target,
link_properties=link_properties,
desired_capabilities=desired_capabilities
)
return sender
# Check if capabilities were granted
def check_link_capabilities(sender):
"""Check which capabilities were granted by peer."""
remote_capabilities = sender.get_remote_capabilities()
desired_capabilities = sender.desired_capabilities
granted = []
denied = []
for capability in desired_capabilities:
if capability in remote_capabilities:
granted.append(capability)
else:
denied.append(capability)
print(f"Granted capabilities: {granted}")
print(f"Denied capabilities: {denied}")
return granted, deniedclass BatchProcessor:
def __init__(self, receiver, batch_size=100, timeout=5.0):
self.receiver = receiver
self.batch_size = batch_size
self.timeout = timeout
self.message_buffer = []
def process_batches(self):
"""Process messages in batches for better throughput."""
start_time = time.time()
while True:
messages = self.receiver.receive_message_batch(
max_batch_size=self.batch_size - len(self.message_buffer)
)
self.message_buffer.extend(messages)
# Process batch if full or timeout reached
if (len(self.message_buffer) >= self.batch_size or
time.time() - start_time > self.timeout):
if self.message_buffer:
self._process_batch(self.message_buffer)
self.message_buffer = []
start_time = time.time()
# Service the connection
self.receiver.work()
def _process_batch(self, messages):
"""Process a batch of messages."""
print(f"Processing batch of {len(messages)} messages")
for message in messages:
try:
# Process message
data = message.get_data()
process_message_fast(data)
message.accept()
except Exception as e:
print(f"Error processing message: {e}")
message.reject()
# Usage
batch_processor = BatchProcessor(receiver, batch_size=50, timeout=2.0)
batch_processor.process_batches()import threading
from queue import Queue
class ParallelProcessor:
def __init__(self, receiver, worker_count=4):
self.receiver = receiver
self.worker_count = worker_count
self.message_queue = Queue()
self.workers = []
self.running = False
def start(self):
"""Start parallel message processing."""
self.running = True
# Start worker threads
for i in range(self.worker_count):
worker = threading.Thread(target=self._worker_thread, args=(i,))
worker.daemon = True
worker.start()
self.workers.append(worker)
# Start receiver thread
receiver_thread = threading.Thread(target=self._receiver_thread)
receiver_thread.daemon = True
receiver_thread.start()
def stop(self):
"""Stop parallel processing."""
self.running = False
# Wait for workers to finish
for worker in self.workers:
worker.join(timeout=5.0)
def _receiver_thread(self):
"""Receive messages and queue for processing."""
while self.running:
try:
messages = self.receiver.receive_message_batch(max_batch_size=10)
for message in messages:
self.message_queue.put(message)
self.receiver.work()
except Exception as e:
print(f"Receiver error: {e}")
time.sleep(1)
def _worker_thread(self, worker_id):
"""Worker thread for processing messages."""
print(f"Worker {worker_id} started")
while self.running:
try:
# Get message from queue with timeout
message = self.message_queue.get(timeout=1.0)
# Process message
data = message.get_data()
result = process_message_threadsafe(data)
if result:
message.accept()
else:
message.reject()
self.message_queue.task_done()
except Queue.Empty:
continue # Timeout, check if still running
except Exception as e:
print(f"Worker {worker_id} error: {e}")
# Usage
processor = ParallelProcessor(receiver, worker_count=8)
processor.start()
# Let it run for a while
time.sleep(60)
processor.stop()Low-level AMQP management operations for advanced broker interaction and administrative tasks.
class MgmtOperation:
def __init__(self, session, target=None, debug=False):
"""
AMQP management operation handler.
Parameters:
- session (Session): AMQP session
- target (Target): Management target endpoint
- debug (bool): Enable debug logging
"""
def open(self):
"""Open the management operation link."""
def close(self):
"""Close the management operation link."""
def execute_async(self, operation, op_type, locales=None, timeout=0):
"""
Execute a management operation asynchronously.
Parameters:
- operation (str): Operation name
- op_type (str): Operation type
- locales (list): Supported locales
- timeout (int): Operation timeout in milliseconds
Returns:
Management operation result
"""class MgmtOperationAsync:
def __init__(self, session, target=None, debug=False, loop=None):
"""
Async AMQP management operation handler.
Parameters:
- session (SessionAsync): Async AMQP session
- target (Target): Management target endpoint
- debug (bool): Enable debug logging
- loop: Asyncio event loop
"""
async def open_async(self):
"""Asynchronously open the management operation link."""
async def close_async(self):
"""Asynchronously close the management operation link."""
async def execute_async(self, operation, op_type, locales=None, timeout=0):
"""
Execute a management operation asynchronously.
Parameters:
- operation (str): Operation name
- op_type (str): Operation type
- locales (list): Supported locales
- timeout (int): Operation timeout in milliseconds
Returns:
Management operation result
"""Usage Examples:
from uamqp.mgmt_operation import MgmtOperation
from uamqp.address import Target
# Create management operation
mgmt_target = Target("$management")
mgmt_op = MgmtOperation(session, target=mgmt_target)
try:
mgmt_op.open()
# Execute management operation (e.g., get queue info)
result = mgmt_op.execute_async(
operation="READ",
op_type="com.microsoft:queue",
timeout=30000
)
print(f"Management result: {result}")
finally:
mgmt_op.close()
# Async management operations
from uamqp.async_ops.mgmt_operation_async import MgmtOperationAsync
async def async_management_example():
mgmt_op = MgmtOperationAsync(async_session, target=mgmt_target)
try:
await mgmt_op.open_async()
result = await mgmt_op.execute_async(
operation="CREATE",
op_type="com.microsoft:queue",
timeout=30000
)
print(f"Async management result: {result}")
finally:
await mgmt_op.close_async()def create_resilient_sender(session, target, max_retries=3):
"""Create sender with automatic recovery."""
for attempt in range(max_retries):
try:
sender = MessageSender(session, Source(), target)
sender.open()
# Wait for link to open
while sender.get_state() == MessageSenderState.Opening:
sender.work()
time.sleep(0.1)
if sender.get_state() == MessageSenderState.Open:
print(f"Sender opened successfully on attempt {attempt + 1}")
return sender
else:
raise Exception(f"Sender failed to open: {sender.get_state()}")
except Exception as e:
print(f"Sender creation attempt {attempt + 1} failed: {e}")
if attempt < max_retries - 1:
time.sleep(2 ** attempt) # Exponential backoff
else:
raise
return None
# Usage with recovery
sender = create_resilient_sender(session, target)
if sender:
# Use sender...
passInstall with Tessl CLI
npx tessl i tessl/pypi-uamqp