Pure Python implementation of the AMQP 0.9.1 protocol including RabbitMQ's extensions
—
AMQP message properties, delivery modes, exchange types, and type definitions for comprehensive message handling and routing control in RabbitMQ.
AMQP BasicProperties for message metadata and routing information.
class BasicProperties:
    """AMQP message properties.

    API outline only: every method and property below carries a docstring
    describing its contract but no implementation body.
    """

    def __init__(self, content_type=None, content_encoding=None, headers=None,
                 delivery_mode=None, priority=None, correlation_id=None,
                 reply_to=None, expiration=None, message_id=None,
                 timestamp=None, type=None, user_id=None, app_id=None,
                 cluster_id=None):
        """
        Create message properties.

        Every parameter is optional and defaults to None.

        Parameters:
        - content_type (str): MIME content type (e.g., 'application/json')
        - content_encoding (str): Content encoding (e.g., 'utf-8')
        - headers (dict): Application-specific headers
        - delivery_mode (int): Delivery mode (1=transient, 2=persistent)
        - priority (int): Message priority (0-9)
        - correlation_id (str): Correlation identifier for RPC
        - reply_to (str): Queue name for RPC replies
        - expiration (str): Message expiration time in milliseconds
        - message_id (str): Unique message identifier
        - timestamp (int): Message timestamp (Unix time)
        - type (str): Message type identifier
        - user_id (str): User ID that published the message
        - app_id (str): Application identifier
        - cluster_id (str): Cluster identifier
        """

    def decode(self, encoded_data):
        """
        Decode properties from bytes.

        Parameters:
        - encoded_data (bytes): Encoded property data
        """

    def encode(self):
        """
        Encode properties to bytes.

        Returns:
        - bytes: Encoded property data
        """

    @property
    def content_type(self) -> str:
        """MIME content type."""

    @property
    def content_encoding(self) -> str:
        """Content encoding."""

    @property
    def headers(self) -> dict:
        """Application headers dictionary."""

    @property
    def delivery_mode(self) -> int:
        """Delivery mode (1=transient, 2=persistent)."""

    @property
    def priority(self) -> int:
        """Message priority (0-9)."""

    @property
    def correlation_id(self) -> str:
        """Correlation ID for request/reply patterns."""

    @property
    def reply_to(self) -> str:
        """Reply queue name."""

    @property
    def expiration(self) -> str:
        """Message expiration in milliseconds."""

    @property
    def message_id(self) -> str:
        """Unique message identifier."""

    @property
    def timestamp(self) -> int:
        """Message timestamp (Unix time)."""

    @property
    def type(self) -> str:
        """Message type identifier."""

    @property
    def user_id(self) -> str:
        """User ID of message publisher."""

    @property
    def app_id(self) -> str:
        """Application identifier."""

    @property
    def cluster_id(self) -> str:
        """Cluster identifier."""


# Message persistence modes for durability control.
from enum import Enum
class DeliveryMode(Enum):
    """Message delivery mode enumeration.

    Each member's ``.value`` is the integer passed as ``delivery_mode``
    in the message properties.
    """

    Transient = 1  # Non-persistent messages (default)
    Persistent = 2  # Persistent messages (survive broker restart)


# AMQP exchange types for message routing patterns.
from enum import Enum
class ExchangeType(Enum):
    """Exchange type enumeration.

    Each member's ``.value`` is the lowercase string name of the exchange
    type (identical to the member name).
    """

    direct = 'direct'  # Direct routing by routing key
    fanout = 'fanout'  # Broadcast to all bound queues
    headers = 'headers'  # Route based on header attributes
    topic = 'topic'  # Pattern-based routing with wildcards


# AMQP protocol constants and frame definitions.
# Protocol version and port
PROTOCOL_VERSION = (0, 9, 1)
PORT = 5672

# Frame type identifiers
FRAME_METHOD = 1
FRAME_HEADER = 2
FRAME_BODY = 3
FRAME_HEARTBEAT = 8

# Frame size limits and framing constants
FRAME_MAX_SIZE = 128 * 1024
FRAME_MIN_SIZE = 4 * 1024
FRAME_HEADER_SIZE = 7
FRAME_END_SIZE = 1
FRAME_END = 0xCE  # 206

# Delivery mode value for persistent messages
PERSISTENT_DELIVERY_MODE = 2

# AMQP reply codes, grouped by leading digit
# 2xx
REPLY_SUCCESS = 200
# 3xx
CONTENT_TOO_LARGE = 311
NO_ROUTE = 312
NO_CONSUMERS = 313
CONNECTION_FORCED = 320
# 4xx
INVALID_PATH = 402
ACCESS_REFUSED = 403
NOT_FOUND = 404
PRECONDITION_FAILED = 406
# 5xx
FRAME_ERROR = 501
COMMAND_INVALID = 503
CHANNEL_ERROR = 504
NOT_ALLOWED = 530
NOT_IMPLEMENTED = 540
INTERNAL_ERROR = 541

import pika
# Connect to the broker on localhost and open a channel
parameters = pika.ConnectionParameters('localhost')
connection = pika.BlockingConnection(parameters)
channel = connection.channel()

# Metadata for a persistent JSON message with two custom headers
message_headers = {'source': 'web-app', 'version': '1.0'}
properties = pika.BasicProperties(
    content_type='application/json',
    delivery_mode=2,  # Persistent message
    headers=message_headers,
)

# Publish through the default exchange ('') to 'data_queue'
channel.basic_publish(
    exchange='',
    routing_key='data_queue',
    body='{"message": "Hello World"}',
    properties=properties,
)
connection.close()

import pika
# Connect to the broker on localhost and open a channel
parameters = pika.ConnectionParameters('localhost')
connection = pika.BlockingConnection(parameters)
channel = connection.channel()

# Persistent message using enum: pass the enum member's integer value
persistent_mode = pika.DeliveryMode.Persistent.value
properties = pika.BasicProperties(delivery_mode=persistent_mode)

channel.basic_publish(
    exchange='',
    routing_key='persistent_queue',
    body='This message will survive broker restart',
    properties=properties,
)
connection.close()

import pika
import uuid
class RpcClient:
    """Blocking RPC client that publishes to 'rpc_queue' and waits for the reply.

    Replies are received on an exclusive, server-named callback queue and are
    matched to the outstanding request by correlation ID.
    """

    def __init__(self):
        """Connect to localhost, declare the callback queue and start consuming it."""
        # Initialise reply state before the consumer is registered, so
        # on_response never reads attributes that do not exist yet.
        self.response = None
        self.corr_id = None

        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters('localhost')
        )
        self.channel = self.connection.channel()

        # Declare callback queue (empty name: the broker generates one)
        result = self.channel.queue_declare(queue='', exclusive=True)
        self.callback_queue = result.method.queue

        self.channel.basic_consume(
            queue=self.callback_queue,
            on_message_callback=self.on_response,
            auto_ack=True
        )

    def on_response(self, ch, method, props, body):
        """Store ``body`` as the response if its correlation ID matches the pending request."""
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, message):
        """
        Send ``message`` to 'rpc_queue' and block until the matching reply arrives.

        Parameters:
        - message: Request body to publish

        Returns:
        - The body of the reply whose correlation ID matches this request
        """
        self.response = None
        self.corr_id = str(uuid.uuid4())

        # Publish with reply properties
        self.channel.basic_publish(
            exchange='',
            routing_key='rpc_queue',
            properties=pika.BasicProperties(
                reply_to=self.callback_queue,
                correlation_id=self.corr_id,
                content_type='text/plain'
            ),
            body=message
        )

        # Wait for response. time_limit=None blocks until I/O produces events;
        # the default time_limit=0 returns immediately, which would turn this
        # loop into a CPU-burning busy-wait.
        while self.response is None:
            self.connection.process_data_events(time_limit=None)
        return self.response
# Usage: create the client, then issue one request and wait for its reply
rpc_client = RpcClient()
response = rpc_client.call("Hello RPC")
print(f"Response: {response.decode()}")

import pika
# Connect to the broker on localhost and open a channel
parameters = pika.ConnectionParameters('localhost')
connection = pika.BlockingConnection(parameters)
channel = connection.channel()

# Message expires after 30 seconds
properties = pika.BasicProperties(
    delivery_mode=2,
    expiration='30000',  # 30 seconds in milliseconds
)

channel.basic_publish(
    exchange='',
    routing_key='temp_queue',
    body='This message expires in 30 seconds',
    properties=properties,
)
connection.close()

import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# Declare headers exchange
channel.exchange_declare(exchange='headers_exchange', exchange_type='headers')

# Publish with custom headers.
# 'x-match' is intentionally not set here: the broker reads it from the
# binding arguments, not from the message headers. Set it when binding, e.g.
#   channel.queue_bind(queue=..., exchange='headers_exchange',
#                      arguments={'x-match': 'all', 'format': 'json'})
properties = pika.BasicProperties(
    headers={
        'format': 'json',
        'source': 'api',
        'priority': 'high',
    }
)

channel.basic_publish(
    exchange='headers_exchange',
    routing_key='',  # Ignored for headers exchange
    body='{"data": "headers routing example"}',
    properties=properties
)
connection.close()

import pika
import time  # needed for the timestamp property below

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# Declare topic exchange
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

# Numeric message priority for each symbolic level used in `messages`
priority_levels = {'high': 9, 'medium': 5, 'low': 1}

# Publish messages with different routing keys and properties
messages = [
    ('app.error.database', 'Database connection failed', 'high'),
    ('app.warning.cache', 'Cache miss rate high', 'medium'),
    ('app.info.startup', 'Application started', 'low'),
]

for routing_key, message, priority in messages:
    properties = pika.BasicProperties(
        priority=priority_levels[priority],
        timestamp=int(time.time()),
        type='log_message',
        # Second dotted segment of the routing key, e.g. 'error'
        headers={'level': routing_key.split('.')[1]},
    )
    channel.basic_publish(
        exchange='topic_logs',
        routing_key=routing_key,
        body=message,
        properties=properties,
    )
connection.close()

import pika
def callback(ch, method, properties, body):
    """
    Print a delivered message and selected properties, then acknowledge it.

    Parameters:
    - ch: Channel the message arrived on; used to send the acknowledgement
    - method: Delivery metadata; its delivery_tag identifies the message to ack
    - properties: Message properties to inspect
    - body (bytes): Message payload; decoded with the default codec for display
    """
    print(f"Message: {body.decode()}")
    print(f"Content Type: {properties.content_type}")
    print(f"Delivery Mode: {properties.delivery_mode}")
    print(f"Priority: {properties.priority}")
    print(f"Headers: {properties.headers}")
    print(f"Timestamp: {properties.timestamp}")
    print(f"Message ID: {properties.message_id}")

    # Acknowledge only after the message has been processed (printed)
    ch.basic_ack(delivery_tag=method.delivery_tag)
# Connect to the broker on localhost and open a channel
parameters = pika.ConnectionParameters('localhost')
connection = pika.BlockingConnection(parameters)
channel = connection.channel()

# Declare the queue, then register `callback` as its consumer
queue_name = 'inspect_queue'
channel.queue_declare(queue=queue_name)
channel.basic_consume(queue=queue_name, on_message_callback=callback)

print('Waiting for messages...')
channel.start_consuming()

import pika
import json
import time
# Read the clock once so message_id, the timestamp property and the payload
# timestamp all describe the same instant.
now = time.time()

# Custom message with application-specific properties
properties = pika.BasicProperties(
    content_type='application/json',
    content_encoding='utf-8',
    delivery_mode=2,
    priority=5,
    correlation_id='req-12345',
    message_id=f'msg-{int(now)}',
    timestamp=int(now),
    type='user_event',
    # RabbitMQ validates user_id against the user that opened the connection
    # and rejects a mismatch. ConnectionParameters('localhost') below uses the
    # default 'guest' credentials, so user_id must be 'guest' here. The
    # application-level user goes in the payload instead.
    user_id='guest',
    app_id='web-service-v1.0',
    headers={
        'event_type': 'user_signup',
        'version': '2.1',
        'source_ip': '192.168.1.100',
        'user_agent': 'Mozilla/5.0...',
    },
)

message_data = {
    'user_id': 'user123',
    'event': 'signup',
    'timestamp': now,
}

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# NOTE: this snippet does not declare the 'user_events' exchange; it must
# already exist on the broker for the publish to succeed.
channel.basic_publish(
    exchange='user_events',
    routing_key='signup',
    body=json.dumps(message_data),
    properties=properties,
)
connection.close()

Install with Tessl CLI:
npx tessl i tessl/pypi-pika