tessl/pypi-uamqp

An AMQP 1.0 client library for Python

High-Level Messaging API

Convenience functions for the most common messaging operations: sending and receiving AMQP messages without managing connections or sessions directly.

Capabilities

Send Single Message

Sends a single message to an AMQP endpoint with automatic connection management and cleanup.

def send_message(target, data, auth=None, debug=False):
    """
    Send a single message to AMQP endpoint.

    Parameters:
    - target (str, bytes, or Target): The target AMQP endpoint
    - data (str, bytes, or Message): The contents of the message to send
    - auth (AMQPAuth, optional): Authentication credentials (defaults to SASLAnonymous)
    - debug (bool): Whether to turn on network trace logs

    Returns:
    list[MessageState]: A list of states for each message sent
    """

Usage Example:

import uamqp
from uamqp.authentication import SASLPlain

# Send simple string message
target = "amqps://servicebus.example.com/myqueue"
auth = SASLPlain("myuser", "mypassword")
result = uamqp.send_message(target, "Hello World", auth=auth)

# Send custom message object
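from uamqp import Message
from uamqp.message import MessageProperties
# properties expects a MessageProperties object rather than a plain dict
message = Message("Hello World", properties=MessageProperties(message_id='123'))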
result = uamqp.send_message(target, message, auth=auth)
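Because send_message returns a list of states rather than raising on every failure, it can be worth checking the result before assuming delivery. The following is a minimal sketch, not part of the library: `all_sent` is a hypothetical helper, and `FakeState` is a stand-in that assumes the real `uamqp.constants.MessageState` members expose a `name` such as `SendComplete` or `SendFailed`. The stand-in lets the snippet run without a broker.

```python
from enum import Enum


class FakeState(Enum):
    """Stand-in for uamqp.constants.MessageState (member names are assumed)."""
    SendComplete = 2
    SendFailed = 3


def all_sent(states):
    """Return True only if the list is non-empty and every state is SendComplete."""
    return bool(states) and all(s.name == "SendComplete" for s in states)


print(all_sent([FakeState.SendComplete]))                        # True
print(all_sent([FakeState.SendComplete, FakeState.SendFailed]))  # False
```

With the library installed, the same check would be applied to the value returned by uamqp.send_message(target, "Hello World", auth=auth).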

Receive Single Message

Receives a single message from an AMQP endpoint with automatic connection management.

def receive_message(source, auth=None, timeout=0, debug=False):
    """
    Receive a single message from an AMQP endpoint.

    Parameters:
    - source (str, bytes, or Source): The AMQP source endpoint to receive from
    - auth (AMQPAuth, optional): Authentication credentials (defaults to SASLAnonymous)
    - timeout (int): Timeout in milliseconds (0 = no timeout)
    - debug (bool): Whether to turn on network trace logs

    Returns:
    Message or None: The received message, or None if timeout reached
    """

Usage Example:

import uamqp
from uamqp.authentication import SASLPlain

source = "amqps://servicebus.example.com/myqueue"
auth = SASLPlain("myuser", "mypassword")

# Receive with 30 second timeout
message = uamqp.receive_message(source, auth=auth, timeout=30000)
if message:
    print(f"Received: {message.get_data()}")
    message.accept()  # Acknowledge the message
else:
    print("No message received within timeout")
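The timeout parameter is expressed in milliseconds, which is easy to get wrong when the rest of an application works in seconds or timedelta values. A small conversion helper can make the unit explicit at the call site. This is only a sketch; `to_timeout_ms` is a hypothetical name and not part of uamqp.

```python
from datetime import timedelta


def to_timeout_ms(duration):
    """Convert a timedelta (or a number of seconds) to whole milliseconds."""
    if isinstance(duration, timedelta):
        duration = duration.total_seconds()
    if duration < 0:
        raise ValueError("timeout must not be negative")
    return int(round(duration * 1000))


print(to_timeout_ms(30))                      # 30000
print(to_timeout_ms(timedelta(seconds=2.5)))  # 2500
```

The result can be passed directly, for example timeout=to_timeout_ms(30) in place of the literal 30000.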

Receive Multiple Messages

Receives a batch of messages from an AMQP endpoint with configurable batch size and timeout.

def receive_messages(source, auth=None, max_batch_size=None, timeout=0, debug=False, **kwargs):
    """
    Receive a batch of messages from an AMQP endpoint.

    Parameters:
    - source (str, bytes, or Source): The AMQP source endpoint to receive from
    - auth (AMQPAuth, optional): Authentication credentials (defaults to SASLAnonymous)
    - max_batch_size (int, optional): Maximum number of messages to return
    - timeout (int): Timeout in milliseconds (0 = no timeout)
    - debug (bool): Whether to turn on network trace logs
    - **kwargs: Additional receiver configuration options

    Returns:
    list[Message]: List of received messages (may be empty if timeout reached)
    """
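A single call returns at most one batch, and an empty list signals that the timeout expired with nothing to read. One way to empty a queue is therefore to call the function repeatedly until an empty batch comes back. The sketch below illustrates that loop under stated assumptions: `drain` and its `fetch_batch` argument are hypothetical names, and the fetcher is injected so the loop can be exercised with a fake instead of a live broker.

```python
def drain(fetch_batch, max_total=100):
    """Call fetch_batch() until it returns an empty batch or at least max_total items are collected."""
    collected = []
    while len(collected) < max_total:
        batch = fetch_batch()
        if not batch:  # empty list: nothing arrived before the timeout
            break
        collected.extend(batch)
    return collected


# Demonstration with a fake fetcher that yields two batches and then nothing.
batches = iter([["m1", "m2"], ["m3"], []])
print(drain(lambda: next(batches)))  # ['m1', 'm2', 'm3']
```

With the library, the fetcher could be functools.partial(uamqp.receive_messages, source, auth=auth, max_batch_size=10, timeout=5000). Each call manages its own connection, so for sustained consumption a long-lived client is likely the better fit.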

Usage Example:

import uamqp
from datetime import timedelta
from uamqp.authentication import SASTokenAuth

source = "amqps://servicebus.example.com/myqueue"
token = "SharedAccessSignature sr=..."
# SASTokenAuth takes the audience, the endpoint URI, and the token;
# expires_in is assumed here to match the token's remaining lifetime
auth = SASTokenAuth(source, source, token, expires_in=timedelta(hours=1))

# Receive up to 10 messages with 5 second timeout
messages = uamqp.receive_messages(
    source,
    auth=auth,
    max_batch_size=10,
    timeout=5000
)

print(f"Received {len(messages)} messages")
for message in messages:
    print(f"Message: {message.get_data()}")
    message.accept()  # Acknowledge each message

Get Platform Information

Retrieves platform and library version information useful for debugging and diagnostics.

def get_platform_info():
    """
    Get current platform and library information.

    Returns:
    str: Platform information string including library version and platform details
    """

Usage Example:

import uamqp

# Get platform information
platform_info = uamqp.get_platform_info()
print(f"Platform info: {platform_info}")

# Typical output might include library version, platform, and build info

Error Handling

All high-level messaging functions may raise AMQP-related exceptions:

  • AMQPConnectionError: Connection to broker failed
  • AMQPError: General AMQP protocol error
  • AuthenticationException: Authentication failed
  • MessageSendFailed: Message sending failed
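  • ClientTimeout: Operation timed out

Usage Example:

import uamqp
from uamqp.authentication import SASLPlain
from uamqp.errors import AMQPConnectionError, AuthenticationException

source = "amqps://servicebus.example.com/myqueue"
auth = SASLPlain("myuser", "mypassword")

try:
    message = uamqp.receive_message(source, auth=auth, timeout=10000)
    if message:
        message.accept()
# Catch the more specific exception first, in case it derives from AMQPConnectionError
except AuthenticationException as e:
    print(f"Authentication failed: {e}")
except AMQPConnectionError as e:
    print(f"Connection failed: {e}")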

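Connection failures are often transient, so callers commonly wrap these functions in a retry loop. The following is a minimal sketch of retry with exponential backoff. `with_retries` is a hypothetical helper, not part of uamqp, and the demonstration uses the built-in ConnectionError in place of the library's exceptions so it runs without a broker.

```python
import time


def with_retries(operation, retry_on=(Exception,), attempts=3, base_delay=0.5, sleep=time.sleep):
    """Run operation(); on a retry_on exception, wait and retry up to `attempts` times."""
    for attempt in range(1, attempts + 1):
        try:
            return operation()
        except retry_on:
            if attempt == attempts:
                raise  # out of attempts: surface the last error to the caller
            sleep(base_delay * 2 ** (attempt - 1))  # 0.5s, 1s, 2s, ...


# Demonstration with a fake operation that fails twice, then succeeds.
calls = {"n": 0}


def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("simulated transient failure")
    return "ok"


print(with_retries(flaky, retry_on=(ConnectionError,), sleep=lambda s: None))  # ok
```

With the library, the operation could be lambda: uamqp.receive_message(source, auth=auth, timeout=10000) with retry_on=(AMQPConnectionError,). Which exceptions are worth retrying depends on the broker and is left as an assumption here.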
Common Parameters

Authentication Types

All high-level messaging functions accept these authentication types:

  • None: Uses SASLAnonymous (for brokers that allow anonymous access)
  • SASLPlain: Username/password authentication
  • SASLAnonymous: Anonymous authentication (explicit)
  • SASTokenAuth: Shared Access Signature token authentication
  • JWTTokenAuth: JSON Web Token authentication

Target and Source Formats

Target and source parameters accept:

  • String: AMQP URI (e.g., "amqps://host:5671/queue")
  • Bytes: AMQP URI as bytes
  • Target/Source objects: For advanced configuration with filters, properties, etc.
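To make the string and bytes forms concrete, the sketch below splits an AMQP URI into its parts using only the standard library. `split_endpoint` is a hypothetical helper for illustration; it does not reflect how uamqp parses addresses internally.

```python
from urllib.parse import urlparse


def split_endpoint(uri):
    """Return (scheme, host, port, entity path) for a str or bytes AMQP URI."""
    if isinstance(uri, bytes):
        uri = uri.decode("utf-8")
    parts = urlparse(uri)
    if parts.scheme not in ("amqp", "amqps"):
        raise ValueError(f"not an AMQP URI: {uri!r}")
    return parts.scheme, parts.hostname, parts.port, parts.path.lstrip("/")


print(split_endpoint("amqps://host:5671/queue"))   # ('amqps', 'host', 5671, 'queue')
print(split_endpoint(b"amqps://host:5671/queue"))  # ('amqps', 'host', 5671, 'queue')
```

A check like this can catch a malformed endpoint string before any connection attempt is made.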

Debug Mode

Setting debug=True enables network trace logging at INFO level, useful for troubleshooting connection and protocol issues.
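Trace output at INFO level is only visible if the application's logging configuration lets INFO records through. A minimal sketch of such a configuration follows, assuming the library's loggers live under the "uamqp" namespace (an assumption, not something this page confirms).

```python
import logging

# Send log records to stderr with a timestamp and the logger name.
logging.basicConfig(format="%(asctime)s %(name)s %(levelname)s %(message)s")

# Assumption: the library's loggers live under the "uamqp" namespace.
uamqp_logger = logging.getLogger("uamqp")
uamqp_logger.setLevel(logging.INFO)

print(uamqp_logger.isEnabledFor(logging.INFO))   # True
print(uamqp_logger.isEnabledFor(logging.DEBUG))  # False
```

Scoping the level to one logger keeps the rest of the application at its default verbosity while a connection problem is being investigated.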

Install with Tessl CLI

npx tessl i tessl/pypi-uamqp
