CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-pika

Pure Python implementation of the AMQP 0.9.1 protocol including RabbitMQ's extensions

Pending
Overview
Eval results
Files

message-properties-types.mddocs/

Message Properties & Types

AMQP message properties, delivery modes, exchange types, and type definitions for comprehensive message handling and routing control in RabbitMQ.

Capabilities

Message Properties

AMQP BasicProperties for message metadata and routing information.

class BasicProperties:
    """AMQP message properties."""
    
    def __init__(self, content_type=None, content_encoding=None, headers=None,
                 delivery_mode=None, priority=None, correlation_id=None,
                 reply_to=None, expiration=None, message_id=None,
                 timestamp=None, type=None, user_id=None, app_id=None,
                 cluster_id=None):
        """
        Create message properties.
        
        Parameters:
        - content_type (str): MIME content type (e.g., 'application/json')
        - content_encoding (str): Content encoding (e.g., 'utf-8')
        - headers (dict): Application-specific headers
        - delivery_mode (int): Delivery mode (1=transient, 2=persistent)
        - priority (int): Message priority (0-9)
        - correlation_id (str): Correlation identifier for RPC
        - reply_to (str): Queue name for RPC replies
        - expiration (str): Message expiration time in milliseconds
        - message_id (str): Unique message identifier
        - timestamp (int): Message timestamp (Unix time)
        - type (str): Message type identifier
        - user_id (str): User ID that published the message
        - app_id (str): Application identifier
        - cluster_id (str): Cluster identifier
        """
    
    def decode(self, encoded_data):
        """
        Decode properties from bytes.
        
        Parameters:
        - encoded_data (bytes): Encoded property data
        """
    
    def encode(self):
        """
        Encode properties to bytes.
        
        Returns:
        - bytes: Encoded property data
        """
    
    @property
    def content_type(self) -> str:
        """MIME content type."""
    
    @property
    def content_encoding(self) -> str:
        """Content encoding."""
    
    @property
    def headers(self) -> dict:
        """Application headers dictionary."""
    
    @property
    def delivery_mode(self) -> int:
        """Delivery mode (1=transient, 2=persistent)."""
    
    @property
    def priority(self) -> int:
        """Message priority (0-9)."""
    
    @property
    def correlation_id(self) -> str:
        """Correlation ID for request/reply patterns."""
    
    @property
    def reply_to(self) -> str:
        """Reply queue name."""
    
    @property
    def expiration(self) -> str:
        """Message expiration in milliseconds."""
    
    @property
    def message_id(self) -> str:
        """Unique message identifier."""
    
    @property
    def timestamp(self) -> int:
        """Message timestamp (Unix time)."""
    
    @property
    def type(self) -> str:
        """Message type identifier."""
    
    @property
    def user_id(self) -> str:
        """User ID of message publisher."""
    
    @property
    def app_id(self) -> str:
        """Application identifier."""
    
    @property
    def cluster_id(self) -> str:
        """Cluster identifier."""

Delivery Mode

Message persistence modes for durability control.

from enum import Enum

class DeliveryMode(Enum):
    """Message delivery mode enumeration."""
    
    Transient = 1   # Non-persistent messages (default)
    Persistent = 2  # Persistent messages (survive broker restart)

Exchange Types

AMQP exchange types for message routing patterns.

from enum import Enum

class ExchangeType(Enum):
    """Exchange type enumeration."""
    
    direct = 'direct'     # Direct routing by routing key
    fanout = 'fanout'     # Broadcast to all bound queues
    headers = 'headers'   # Route based on header attributes
    topic = 'topic'       # Pattern-based routing with wildcards

Protocol Constants

AMQP protocol constants and frame definitions.

# Protocol version
PROTOCOL_VERSION = (0, 9, 1)
PORT = 5672

# Frame types
FRAME_METHOD = 1
FRAME_HEADER = 2
FRAME_BODY = 3
FRAME_HEARTBEAT = 8

# Frame size limits
FRAME_MAX_SIZE = 131072
FRAME_MIN_SIZE = 4096
FRAME_HEADER_SIZE = 7
FRAME_END_SIZE = 1
FRAME_END = 206

# Delivery mode constants
PERSISTENT_DELIVERY_MODE = 2

# AMQP reply codes
REPLY_SUCCESS = 200
CONTENT_TOO_LARGE = 311
NO_ROUTE = 312
NO_CONSUMERS = 313
CONNECTION_FORCED = 320
INVALID_PATH = 402
ACCESS_REFUSED = 403
NOT_FOUND = 404
PRECONDITION_FAILED = 406
FRAME_ERROR = 501
COMMAND_INVALID = 503
CHANNEL_ERROR = 504
NOT_ALLOWED = 530
NOT_IMPLEMENTED = 540
INTERNAL_ERROR = 541

Usage Examples

Basic Message Properties

import pika

properties = pika.BasicProperties(
    content_type='application/json',
    delivery_mode=2,  # Persistent message
    headers={'source': 'web-app', 'version': '1.0'}
)

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.basic_publish(
    exchange='',
    routing_key='data_queue',
    body='{"message": "Hello World"}',
    properties=properties
)

connection.close()

Using Delivery Mode Enum

import pika

# Persistent message using enum
properties = pika.BasicProperties(
    delivery_mode=pika.DeliveryMode.Persistent.value
)

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.basic_publish(
    exchange='',
    routing_key='persistent_queue',
    body='This message will survive broker restart',
    properties=properties
)

connection.close()

RPC Pattern with Properties

import pika
import uuid

class RpcClient:
    def __init__(self):
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters('localhost')
        )
        self.channel = self.connection.channel()
        
        # Declare callback queue
        result = self.channel.queue_declare(queue='', exclusive=True)
        self.callback_queue = result.method.queue
        
        self.channel.basic_consume(
            queue=self.callback_queue,
            on_message_callback=self.on_response,
            auto_ack=True
        )
    
    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:
            self.response = body
    
    def call(self, message):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        
        # Publish with reply properties
        self.channel.basic_publish(
            exchange='',
            routing_key='rpc_queue',
            properties=pika.BasicProperties(
                reply_to=self.callback_queue,
                correlation_id=self.corr_id,
                content_type='text/plain'
            ),
            body=message
        )
        
        # Wait for response
        while self.response is None:
            self.connection.process_data_events()
        
        return self.response

# Usage
rpc = RpcClient()
response = rpc.call("Hello RPC")
print(f"Response: {response.decode()}")

Message Expiration

import pika

# Message expires after 30 seconds
properties = pika.BasicProperties(
    expiration='30000',  # 30 seconds in milliseconds
    delivery_mode=2
)

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.basic_publish(
    exchange='',
    routing_key='temp_queue',
    body='This message expires in 30 seconds',
    properties=properties
)

connection.close()

Headers-Based Routing

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# Declare headers exchange
channel.exchange_declare(exchange='headers_exchange', exchange_type='headers')

# Publish with custom headers
properties = pika.BasicProperties(
    headers={
        'format': 'json',
        'source': 'api',
        'priority': 'high',
        'x-match': 'all'  # Match all headers
    }
)

channel.basic_publish(
    exchange='headers_exchange',
    routing_key='',  # Ignored for headers exchange
    body='{"data": "headers routing example"}',
    properties=properties
)

connection.close()

Topic Exchange with Properties

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# Declare topic exchange
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

# Publish messages with different routing keys and properties
messages = [
    ('app.error.database', 'Database connection failed', 'high'),
    ('app.warning.cache', 'Cache miss rate high', 'medium'),
    ('app.info.startup', 'Application started', 'low')
]

for routing_key, message, priority in messages:
    properties = pika.BasicProperties(
        priority={'high': 9, 'medium': 5, 'low': 1}[priority],
        timestamp=int(time.time()),
        type='log_message',
        headers={'level': routing_key.split('.')[1]}
    )
    
    channel.basic_publish(
        exchange='topic_logs',
        routing_key=routing_key,
        body=message,
        properties=properties
    )

connection.close()

Message Metadata Inspection

import pika

def callback(ch, method, properties, body):
    print(f"Message: {body.decode()}")
    print(f"Content Type: {properties.content_type}")
    print(f"Delivery Mode: {properties.delivery_mode}")
    print(f"Priority: {properties.priority}")
    print(f"Headers: {properties.headers}")
    print(f"Timestamp: {properties.timestamp}")
    print(f"Message ID: {properties.message_id}")
    
    ch.basic_ack(delivery_tag=method.delivery_tag)

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='inspect_queue')
channel.basic_consume(queue='inspect_queue', on_message_callback=callback)

print('Waiting for messages...')
channel.start_consuming()

Custom Application Properties

import pika
import json
import time

# Custom message with application-specific properties
properties = pika.BasicProperties(
    content_type='application/json',
    content_encoding='utf-8',
    delivery_mode=2,
    priority=5,
    correlation_id='req-12345',
    message_id=f'msg-{int(time.time())}',
    timestamp=int(time.time()),
    type='user_event',
    user_id='user123',
    app_id='web-service-v1.0',
    headers={
        'event_type': 'user_signup',
        'version': '2.1',
        'source_ip': '192.168.1.100',
        'user_agent': 'Mozilla/5.0...'
    }
)

message_data = {
    'user_id': 'user123',
    'event': 'signup',
    'timestamp': time.time()
}

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.basic_publish(
    exchange='user_events',
    routing_key='signup',
    body=json.dumps(message_data),
    properties=properties
)

connection.close()

Install with Tessl CLI

npx tessl i tessl/pypi-pika

docs

authentication-security.md

channel-operations.md

connection-adapters.md

connection-management.md

exception-handling.md

index.md

message-properties-types.md

tile.json