CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-uamqp

An AMQP 1.0 client library for Python

Overview
Eval results
Files

docs/message-management.md

Message Management

Core message classes for creating, manipulating, and processing AMQP messages including support for different body types, properties, headers, annotations, and batch operations.

Capabilities

Message Class

The core AMQP message representation that encapsulates all message data including body, properties, headers, and annotations.

class Message:
    """AMQP message bundling a body with its properties, header and annotations."""

    def __init__(
        self,
        body=None,
        properties=None,
        application_properties=None,
        annotations=None,
        header=None,
        msg_format=None,
        encoding='UTF-8',
        body_type=None,
        footer=None,
        delivery_annotations=None,
    ):
        """
        Build a new AMQP message; every argument is optional.

        Parameters:
        - body: Payload of the message (str, bytes, list, or dict)
        - properties (MessageProperties): The standard AMQP properties
        - application_properties (dict): Application-defined custom properties
        - annotations (dict): Broker-specific metadata attached to the message
        - header (MessageHeader): Header carrying delivery information
        - msg_format (int): Identifier of the message format
        - encoding (str): Encoding applied to string data
        - body_type (MessageBodyType): Kind of body (Data, Value, Sequence)
        - footer (dict): Annotations placed in the message footer
        - delivery_annotations (dict): Annotations specific to the delivery
        """

Key Properties:

# Shorthand listing of the properties exposed by Message. Each entry reads
# "name: type"; the notation is illustrative and is not runnable Python.

# Message content access
@property
def data: bytes  # Binary message data
@property
def value: any   # Decoded message value
@property
def sequence: list  # Sequence message data

# Message metadata
@property
def properties: MessageProperties  # Standard message properties
@property
def application_properties: dict   # Custom, application-defined properties
@property
def header: MessageHeader         # Message header
@property
def annotations: dict             # Message annotations
@property
def delivery_annotations: dict    # Delivery annotations
@property
def footer: dict                  # Message footer

# Message state
@property
def settled: bool  # Whether message is settled
@property
def state: MessageState  # Current message state (enum)
@property
def idle_time: int  # Time message has been idle (unit not stated here)
@property
def retries: int  # Number of retry attempts
@property
def delivery_no: int  # Delivery sequence number
@property
def delivery_tag: bytes  # Delivery tag identifier
@property
def message_annotations: dict  # Alias for the annotations property listed under metadata

Key Methods:

def get_data(self):
    """Return the body of this message as bytes."""


def get_message(self):
    """Return the C message object that underlies this message."""


def accept(self):
    """
    Settle the message as accepted, acknowledging successful processing.

    Returns:
        bool: True when the settlement succeeded
    """


def reject(self, condition=None, description=None, info=None):
    """
    Settle the message as rejected, signalling that processing failed.

    Parameters:
    - condition (str): Code identifying the error condition
    - description (str): Text describing the error
    - info (dict): Extra information about the error

    Returns:
        bool: True when the settlement succeeded
    """


def release(self):
    """
    Settle the message as released so it returns to the queue for redelivery.

    Returns:
        bool: True when the settlement succeeded
    """


def modify(self, failed, deliverable, annotations=None):
    """
    Change the state and annotations of the message.

    Parameters:
    - failed (bool): Whether processing of the message failed
    - deliverable (bool): Whether the message is deliverable
    - annotations (dict): Extra annotations to add
    """


def gather(self):
    """Collect the data of a multi-part message."""


def get_message_encoded_size(self):
    """
    Report how large the message is once encoded.

    Returns:
        int: Size of the message in bytes
    """


def encode_message(self):
    """
    Serialize the message into the AMQP wire format.

    Returns:
        bytearray: The encoded message data
    """


@classmethod
def decode_from_bytes(cls, data, encoding='UTF-8'):
    """
    Build a message from raw AMQP bytes.

    Parameters:
    - data (bytes): The raw AMQP message data
    - encoding (str): Encoding applied to string data

    Returns:
        Message: The decoded message instance
    """

Usage Examples:

# Ways to construct a Message from different kinds of body.
from uamqp import Message
from uamqp.message import MessageProperties, MessageHeader

# Simple text message
message = Message("Hello World")

# Message with properties
properties = MessageProperties(
    message_id="msg-123",
    content_type="text/plain",
    reply_to="response-queue"
)
message = Message("Hello World", properties=properties)

# Message with custom application properties
app_props = {"priority": "high", "source": "sensor-1"}
message = Message("sensor data", application_properties=app_props)

# Binary message
binary_data = b'\x00\x01\x02\x03'
message = Message(binary_data)

# Dict body.
# NOTE(review): nothing shown here serializes the dict to JSON text; it is
# presumably carried as a single encoded AMQP value -- confirm before relying
# on a JSON payload (use json.dumps explicitly if JSON text is required).
json_data = {"temperature": 23.5, "humidity": 65}
message = Message(json_data)

BatchMessage Class

Optimized message class for high-throughput scenarios that can contain multiple individual messages in a single batch.

class BatchMessage:
    """Message able to carry several individual messages as a single batch."""

    def __init__(
        self,
        data=None,
        properties=None,
        application_properties=None,
        annotations=None,
        header=None,
        multi_messages=False,
        encoding='UTF-8',
    ):
        """
        Build a batch message intended for high-throughput scenarios.

        Parameters:
        - data: The batch content (list of messages or encoded batch data)
        - properties (MessageProperties): Message properties for the whole batch
        - application_properties (dict): Custom properties of the batch
        - annotations (dict): Annotations of the batch
        - header (MessageHeader): Header of the batch
        - multi_messages (bool): Whether data holds multiple distinct messages
        - encoding (str): Text encoding
        """

Key Properties:

# Shorthand listing of BatchMessage properties ("name: type"); illustrative
# notation, not runnable Python.
@property
def batch_format: int  # Batch message format identifier
@property
def max_message_length: int  # Maximum individual message length
@property
def data: bytes  # Batch data

Key Methods:

def gather(self):
    """Collect the batch data and ready it for transmission."""

Usage Example:

# Sending several messages as one batch.
from uamqp import BatchMessage, Message, SendClient

# Address of the node to send to. Placeholder value -- replace it with the
# real target of your broker before running this example.
target = "amqps://<host>/<entity>"

# Create individual messages
messages = [
    Message("Message 1"),
    Message("Message 2"),
    Message("Message 3")
]

# Create batch message; multi_messages=True marks the data as multiple
# distinct messages.
batch = BatchMessage(messages, multi_messages=True)

# Send batch message
with SendClient(target) as client:
    client.queue_message(batch)
    client.send_all_messages()

MessageProperties Class

Container for standard AMQP message properties following the AMQP 1.0 specification.

class MessageProperties:
    """Holder for the standard message properties of AMQP 1.0."""

    def __init__(
        self,
        message_id=None,
        user_id=None,
        to=None,
        subject=None,
        reply_to=None,
        correlation_id=None,
        content_type=None,
        content_encoding=None,
        absolute_expiry_time=None,
        creation_time=None,
        group_id=None,
        group_sequence=None,
        reply_to_group_id=None,
    ):
        """
        Set up the standard AMQP message properties; all are optional.

        Parameters:
        - message_id: Identifier that is unique to the message
        - user_id: Identity of the user responsible for producing the message
        - to: Address of the node the message is sent to
        - subject: Subject specific to the application
        - reply_to: Address of the node replies should be sent to
        - correlation_id: Correlation identifier used by the application
        - content_type: MIME content type
        - content_encoding: MIME content encoding
        - absolute_expiry_time: Absolute time at which the message expires
        - creation_time: Timestamp of message creation
        - group_id: Identifier of the group
        - group_sequence: Position of the message within its group
        - reply_to_group_id: Identifier of the group for replies
        """

def get_properties_obj(self):
    """
    Return the reference to the underlying C properties.

    Returns:
        uamqp.c_uamqp.cProperties: The C properties object
    """

Usage Example:

# Attaching standard properties to a message.
import datetime

from uamqp import Message
from uamqp.message import MessageProperties

# Example payload so the snippet is self-contained; substitute your own data.
order_data = '{"order_id": 12345, "status": "created"}'

properties = MessageProperties(
    message_id="order-12345",
    content_type="application/json",
    reply_to="order-responses",
    correlation_id="req-98765",
    # NOTE(review): confirm the type creation_time expects (datetime vs.
    # integer timestamp) against the installed uamqp version.
    creation_time=datetime.datetime.utcnow(),
    subject="order-created"
)

message = Message(order_data, properties=properties)

MessageHeader Class

Container for AMQP message header information including delivery details and quality of service settings.

class MessageHeader:
    """Holder for AMQP header data: delivery details and quality of service."""

    def __init__(
        self,
        durable=None,
        priority=None,
        time_to_live=None,
        first_acquirer=None,
        delivery_count=None,
    ):
        """
        Set up the header information of an AMQP message.

        Parameters:
        - durable (bool): Whether the message should survive a broker restart
        - priority (int): Priority of the message (0-255)
        - time_to_live (int): TTL of the message in milliseconds
        - first_acquirer (bool): Whether this is the first attempt to acquire
        - delivery_count (int): How many delivery attempts were made
        """

Key Properties:

# Shorthand listing of MessageHeader properties ("name: type"); illustrative
# notation, not runnable Python.
@property
def delivery_count: int  # Number of delivery attempts
@property
def time_to_live: int   # TTL in milliseconds
@property
def ttl: int           # Alias for time_to_live
@property
def durable: bool       # Message durability
@property
def first_acquirer: bool  # First acquisition flag
@property
def priority: int       # Message priority (0-255)

def get_header_obj(self):
    """
    Return the reference to the underlying C header.

    Returns:
        uamqp.c_uamqp.cHeader: The C header object
    """

Usage Example:

# Attaching a header to a message.
from uamqp import Message
from uamqp.message import MessageHeader

# High priority, durable message with 1 hour TTL
header = MessageHeader(
    durable=True,
    priority=255,
    time_to_live=60 * 60 * 1000  # 1 hour expressed in milliseconds
)

message = Message("Important data", header=header)

Message Body Types

Different message body formats supported by AMQP 1.0.

class MessageBody:
    """Base class for message bodies."""
    # The two entries below use the "name: type" shorthand of this page;
    # they are illustrative and not runnable Python.
    @property
    def type: int  # Body type identifier
    @property 
    def data: any  # Body data

class DataBody(MessageBody):
    """Message body made of binary data."""

    def append(self, data):
        """Add binary data to the end of the body."""

    def __len__(self):
        """Return how many data segments the body holds."""

    def __getitem__(self, index):
        """Return the data segment found at the given index."""

    def __str__(self):
        """Render the data body as a string."""

    def __bytes__(self):
        """Render the data body as bytes."""

class ValueBody(MessageBody):
    """Message body made of a single encoded value."""

    def set(self, value):
        """Assign the value of the body."""

    def __str__(self):
        """Render the value body as a string."""

    def __bytes__(self):
        """Render the value body as bytes."""

class SequenceBody(MessageBody):
    """Message body made of a sequence of structured data."""

    def append(self, data):
        """Add data to the end of the sequence."""

    def __len__(self):
        """Return how many items the sequence holds."""

    def __getitem__(self, index):
        """Return the sequence item found at the given index."""

    def __str__(self):
        """Render the sequence body as a string."""

    def __bytes__(self):
        """Render the sequence body as bytes."""

Usage Example:

# Populating each of the three body types.
# NOTE(review): the constructors are called without arguments here, but their
# signatures are not shown on this page -- confirm against the installed
# uamqp version before copying this snippet.
from uamqp.message import DataBody, ValueBody, SequenceBody

# Binary data body: segments are appended one at a time
data_body = DataBody()
data_body.append(b"binary data chunk 1")
data_body.append(b"binary data chunk 2")

# Single value body: set() assigns the body value
value_body = ValueBody()
value_body.set({"key": "value", "number": 42})

# Sequence body: items are appended one at a time
sequence_body = SequenceBody()
sequence_body.append("item 1")
sequence_body.append("item 2")
sequence_body.append({"structured": "data"})

MessageBodyType Enum

Message body type constants that define how message data is structured:

class MessageBodyType:
    """Identifiers for the supported kinds of message body."""

    # Listed in ascending numeric order.
    Data = 0x75      # body made of binary data
    Sequence = 0x76  # body made of a sequence of structured data
    Value = 0x77     # body made of a single encoded value

Message Settlement

AMQP message settlement patterns for reliable message processing:

# The four settlement calls, shown side by side as alternatives.
# NOTE(review): `message` is not defined in this snippet; it is presumably a
# Message obtained from a receiver, settled with one of these outcomes --
# confirm against the receiving code.

# Accept message (successful processing)
message.accept()

# Reject message (processing failed, don't retry)
message.reject(
    condition="processing-error",
    description="Invalid data format"
)

# Release message (temporary failure, allow retry)
message.release()

# Modify message (change delivery state/annotations)
message.modify(
    failed=True,
    deliverable=False,
    annotations={"retry_count": 3}
)

Install with Tessl CLI

npx tessl i tessl/pypi-uamqp

docs

address-endpoints.md

async-operations.md

authentication.md

client-apis.md

connection-session.md

error-handling.md

high-level-messaging.md

index.md

low-level-protocol.md

message-management.md

types-constants.md

tile.json