An AMQP 1.0 client library for Python
Type system for AMQP values and comprehensive constants for protocol states, error codes, and configuration options that provide the foundation for AMQP 1.0 protocol operations.
Base classes and specific types for representing AMQP 1.0 protocol values with proper encoding and decoding.
class AMQPType:
def __init__(self, value):
"""
Base class for AMQP types.
Parameters:
- value: The Python value to wrap
"""
@property
def value: any
"""The Python value."""
@property
def c_data: any
"""The underlying C data structure."""class AMQPSymbol(AMQPType):
"""AMQP symbol type for identifiers and names."""
class AMQPChar(AMQPType):
"""Single UTF-32 character type."""
class AMQPLong(AMQPType):
"""64-bit signed integer type."""
class AMQPuLong(AMQPType):
"""64-bit unsigned integer type."""
class AMQPInt(AMQPType):
"""32-bit signed integer type."""
class AMQPuInt(AMQPType):
"""32-bit unsigned integer type."""
class AMQPShort(AMQPType):
"""16-bit signed integer type."""
class AMQPuShort(AMQPType):
"""16-bit unsigned integer type."""
class AMQPByte(AMQPType):
"""8-bit signed integer type."""
class AMQPuByte(AMQPType):
"""8-bit unsigned integer type."""
class AMQPArray(AMQPType):
"""Array of AMQP values of the same type."""
class AMQPDescribed(AMQPType):
"""Described value with descriptor and actual value."""Usage Examples:
from uamqp.types import AMQPSymbol, AMQPLong, AMQPArray
# Symbol type for identifiers
symbol = AMQPSymbol("queue-name")
# Numeric types
long_val = AMQPLong(9223372036854775807)
uint_val = AMQPuInt(4294967295)
# Array type
numbers = AMQPArray([1, 2, 3, 4, 5])
# Use in message properties
from uamqp import Message
from uamqp.message import MessageProperties
properties = MessageProperties(
message_id=AMQPSymbol("msg-12345"),
group_sequence=AMQPuInt(1)
)
message = Message("data", properties=properties)States that indicate the processing status of messages during send/receive operations.
class MessageState:
WaitingToBeSent = 0 # Message queued for sending
WaitingForSendAck = 1 # Message sent, waiting for acknowledgment
SendComplete = 2 # Message successfully sent and acknowledged
SendFailed = 3 # Message send failed
ReceivedUnsettled = 4 # Message received but not settled
ReceivedSettled = 5 # Message received and settledResults returned from message send operations.
class MessageSendResult:
Ok = 0 # Send operation succeeded
Error = 1 # Send operation failed with error
Timeout = 2 # Send operation timed out
Cancelled = 3 # Send operation was cancelledStates for AMQP links (message senders and receivers).
class MessageSenderState:
Idle = 0 # Sender not active
Opening = 1 # Sender opening
Open = 2 # Sender ready for messages
Closing = 3 # Sender closing
Error = 4 # Sender in error state
class MessageReceiverState:
Idle = 0 # Receiver not active
Opening = 1 # Receiver opening
Open = 2 # Receiver ready for messages
Closing = 3 # Receiver closing
Error = 4 # Receiver in error stateAvailable transport protocols for AMQP connections.
class TransportType:
Amqp = 0 # Standard AMQP over TCP
AmqpOverWebsocket = 1 # AMQP over WebSocketUsage Example:
from uamqp.constants import TransportType
from uamqp.authentication import SASLPlain
# Use WebSocket transport for firewall-friendly connections
auth = SASLPlain(
hostname="amqp.example.com",
username="user",
password="pass",
transport_type=TransportType.AmqpOverWebsocket,
port=443
)Types of message body content supported by AMQP 1.0.
class MessageBodyType:
Data = 0 # Binary data sections
Value = 1 # Single AMQP value
Sequence = 2 # Sequence of AMQP valuesUsage Example:
from uamqp import Message
from uamqp.constants import MessageBodyType
# Explicit body type specification
message = Message(
body={"key": "value"},
body_type=MessageBodyType.Value
)Configuration for message acknowledgment behavior.
class SenderSettleMode:
Unsettled = 0 # Sender waits for receiver disposition
Settled = 1 # Sender considers message settled immediately
Mixed = 2 # Sender may use either mode per message
class ReceiverSettleMode:
PeekLock = 0 # First: receive, second: settle (explicit ack)
ReceiveAndDelete = 1 # Auto-settle on receiveUsage Example:
from uamqp import SendClient, ReceiveClient
from uamqp.constants import SenderSettleMode, ReceiverSettleMode
# Fire-and-forget sending
send_client = SendClient(
target,
auth=auth,
send_settle_mode=SenderSettleMode.Settled
)
# Manual acknowledgment receiving
receive_client = ReceiveClient(
source,
auth=auth,
receive_settle_mode=ReceiverSettleMode.PeekLock
)Roles that links can take in AMQP connections.
class Role:
Sender = 0 # Link sends messages
Receiver = 1 # Link receives messagesStandard AMQP error condition codes.
class ErrorCodes:
InternalServerError = "amqp:internal-error"
NotFound = "amqp:not-found"
UnauthorizedAccess = "amqp:unauthorized-access"
DecodeError = "amqp:decode-error"
ResourceLimitExceeded = "amqp:resource-limit-exceeded"
NotAllowed = "amqp:not-allowed"
InvalidField = "amqp:invalid-field"
NotImplemented = "amqp:not-implemented"
ResourceLocked = "amqp:resource-locked"
PreconditionFailed = "amqp:precondition-failed"
ResourceDeleted = "amqp:resource-deleted"
IllegalState = "amqp:illegal-state"
FrameSizeTooSmall = "amqp:frame-size-too-small"States for CBS (Claims-Based Security) authentication operations.
class CBSOperationResult:
Ok = 0 # Authentication succeeded
Error = 1 # Authentication failed
OperationFailed = 2 # Operation failed
Timeout = 3 # Authentication timed out
class CBSOpenState:
Closed = 0 # CBS link closed
Opening = 1 # CBS link opening
Open = 2 # CBS link open and ready
Error = 3 # CBS link in error state
class CBSAuthStatus:
Idle = 0 # Not authenticating
InProgress = 1 # Authentication in progress
Complete = 2 # Authentication complete
Expired = 3 # Authentication expired
Error = 4 # Authentication error
RefreshRequired = 5 # Token refresh requiredStates for AMQP management operations.
class MgmtExecuteResult:
Ok = 0 # Management operation succeeded
Error = 1 # Management operation failed
Timeout = 2 # Management operation timed out
class MgmtOpenStatus:
Closed = 0 # Management link closed
Opening = 1 # Management link opening
Open = 2 # Management link open
Error = 3 # Management link in error stateDEFAULT_AMQPS_PORT = 5671 # Default AMQPS (TLS) port
DEFAULT_AMQP_WSS_PORT = 443 # Default AMQP WebSocket Secure portAUTH_EXPIRATION_SECS = 3600 # Default token expiration (1 hour)
AUTH_REFRESH_SECS = 600 # Default token refresh interval (10 minutes)MAX_FRAME_SIZE_BYTES = 65536 # Maximum AMQP frame size (64KB)
MAX_MESSAGE_LENGTH_BYTES = 1048576 # Maximum message length (1MB)BATCH_MESSAGE_FORMAT = 0x80013700 # Batch message format identifierUsage Examples:
from uamqp.constants import (
DEFAULT_AMQPS_PORT,
MAX_FRAME_SIZE_BYTES,
AUTH_EXPIRATION_SECS
)
# Use default port
auth = SASLPlain("amqp.example.com", "user", "pass", port=DEFAULT_AMQPS_PORT)
# Configure frame size
client = SendClient(target, auth=auth, max_frame_size=MAX_FRAME_SIZE_BYTES)
# Token expiration
import datetime
expires_at = datetime.datetime.utcnow() + datetime.timedelta(seconds=AUTH_EXPIRATION_SECS)from uamqp.types import AMQPSymbol, AMQPLong
# Convert Python values to AMQP types
message_id = AMQPSymbol("msg-12345") # String to symbol
sequence_num = AMQPLong(1001) # Int to long
# Use in message properties
properties = {"message-id": message_id, "sequence": sequence_num}from uamqp.constants import MessageState, MessageSenderState
# Check message send results
results = client.send_all_messages()
for result in results:
if result == MessageState.SendComplete:
print("Message sent successfully")
elif result == MessageState.SendFailed:
print("Message send failed")
# Check sender state
if sender.get_state() == MessageSenderState.Open:
# Sender is ready for messages
sender.send_async(message)from uamqp.constants import (
TransportType,
SenderSettleMode,
ReceiverSettleMode
)
# WebSocket transport with specific settlement modes
send_client = SendClient(
target,
auth=auth,
transport_type=TransportType.AmqpOverWebsocket,
send_settle_mode=SenderSettleMode.Mixed
)
receive_client = ReceiveClient(
source,
auth=auth,
receive_settle_mode=ReceiverSettleMode.PeekLock
)Install with Tessl CLI
npx tessl i tessl/pypi-uamqp