CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-pika

Pure Python implementation of the AMQP 0.9.1 protocol including RabbitMQ's extensions

Pending
Overview
Eval results
Files

docs/channel-operations.md

Channel Operations

Channel-based message operations including publishing, consuming, queue and exchange management, and transaction support with comprehensive callback handling for AMQP messaging.

Capabilities

Channel Management

Basic channel lifecycle and flow control operations.

class BlockingChannel:
    """Synchronous channel for message operations."""

    def close(self, reply_code=0, reply_text='Normal shutdown'):
        """
        Close the channel.

        Parameters:
        - reply_code (int): AMQP reply code sent with the close (default: 0)
        - reply_text (str): Human-readable close reason
        """

    def flow(self, active):
        """
        Enable or disable message flow.

        Parameters:
        - active (bool): True to enable flow, False to disable

        Returns:
        - bool: Current flow state
        """

    def add_on_cancel_callback(self, callback):
        """
        Add callback for consumer cancellation.

        Parameters:
        - callback (callable): Function called when consumer is cancelled
        """

    def add_on_return_callback(self, callback):
        """
        Add callback for returned messages.

        Parameters:
        - callback (callable): Function called with (channel, method, properties, body)
        """

    # Channel properties
    @property
    def channel_number(self) -> int:
        """Channel number."""

    @property
    def connection(self):
        """Parent connection instance."""

    @property
    def is_closed(self) -> bool:
        """True if channel is closed."""

    @property
    def is_open(self) -> bool:
        """True if channel is open."""

    @property
    def consumer_tags(self) -> list:
        """List of consumer tags for the active consumers."""

Message Publishing

Publish messages to exchanges with routing keys and properties.

def basic_publish(self, exchange, routing_key, body, properties=None, mandatory=False):
    """
    Publish a message.

    Parameters:
    - exchange (str): Exchange name (empty string for default exchange)
    - routing_key (str): Routing key for message routing
    - body (bytes or str): Message body
    - properties (BasicProperties): Message properties
    - mandatory (bool): If True, message must be routable or returned

    Returns:
    - None: Failures are reported through exceptions, not a return value

    Raises:
    - pika.exceptions.UnroutableError: In publisher-confirms mode, when a
      mandatory message is returned by the broker as unroutable
    - pika.exceptions.NackError: In publisher-confirms mode, when the broker
      nacks the message
    """

Message Consuming

Consume messages from queues with callback-based processing.

def basic_consume(self, queue, on_message_callback, auto_ack=False, exclusive=False,
                  consumer_tag=None, arguments=None):
    """
    Register a consumer on a queue.

    Parameters:
    - queue (str): Name of the queue to consume from
    - on_message_callback (callable): Invoked once per message as
      on_message_callback(ch, method, properties, body)
    - auto_ack (bool): When True, messages are acknowledged automatically
    - exclusive (bool): When True, no other consumer may access the queue
    - consumer_tag (str): Identifier for this consumer; generated when None
    - arguments (dict): Extra arguments passed with the consume request

    Returns:
    - str: The consumer tag
    """

def basic_cancel(self, consumer_tag):
    """
    Cancel a message consumer.

    Parameters:
    - consumer_tag (str): Consumer tag to cancel

    Returns:
    - list: Empty for a consumer created with auto_ack=False. For an
      auto_ack=True consumer, the (possibly empty) list of messages that
      arrived before the broker confirmed the cancellation, each as
      (channel, method, properties, body)
    """

def start_consuming(self):
    """
    Enter the blocking loop that consumes messages.
    """

def stop_consuming(self, consumer_tag=None):
    """
    Halt message consumption.

    Parameters:
    - consumer_tag (str, optional): Stop only this consumer; when None,
      every consumer is stopped
    """

def consume(self, queue, auto_ack=False, exclusive=False, arguments=None,
            inactivity_timeout=None):
    """
    Generator-based message consumption.

    Parameters:
    - queue (str): Queue name to consume from
    - auto_ack (bool): If True, messages are acknowledged automatically
      (this keyword replaced the pre-1.0 `no_ack`)
    - exclusive (bool): If True, exclusive access to queue
    - arguments (dict): Additional consume arguments
    - inactivity_timeout (float, optional): Seconds to wait for a message
      before yielding (None, None, None); None waits indefinitely

    Yields:
    - tuple: (method, properties, body) for each message, or
      (None, None, None) when inactivity_timeout elapses

    Call `cancel()` on the channel to end the generator-based consumer.
    """

Message Acknowledgment

Acknowledge, reject, or recover messages for reliable delivery.

def basic_ack(self, delivery_tag, multiple=False):
    """
    Confirm that a delivered message was handled.

    Parameters:
    - delivery_tag (int): Delivery tag identifying the message to acknowledge
    - multiple (bool): When True, every message up to delivery_tag is acknowledged
    """

def basic_nack(self, delivery_tag, multiple=False, requeue=True):
    """
    Negatively acknowledge a delivered message.

    Parameters:
    - delivery_tag (int): Delivery tag identifying the message to nack
    - multiple (bool): When True, every message up to delivery_tag is nacked
    - requeue (bool): When True, the message(s) are put back on the queue
    """

def basic_reject(self, delivery_tag, requeue=True):
    """
    Reject one delivered message.

    Parameters:
    - delivery_tag (int): Delivery tag identifying the message to reject
    - requeue (bool): When True, the message is put back on the queue
    """

def basic_recover(self, requeue=False):
    """
    Recover unacknowledged messages.

    Parameters:
    - requeue (bool): If True, the broker attempts to requeue the
      unacknowledged messages (they may then go to another consumer); if
      False (the default), they are redelivered to the original recipient
    """

Single Message Retrieval

Get individual messages from queues without setting up consumers.

def basic_get(self, queue, auto_ack=False):
    """
    Get a single message from queue.

    Parameters:
    - queue (str): Queue name to get message from
    - auto_ack (bool): If True, automatically acknowledge message

    Returns:
    - tuple: (method, properties, body) if a message was available;
      (None, None, None) if the queue was empty. The result is always a
      three-tuple, so test `method is None` rather than comparing the
      result itself with None.
    """

Quality of Service

Control message delivery rate and prefetch behavior.

def basic_qos(self, prefetch_size=0, prefetch_count=0, global_qos=False):
    """
    Configure quality-of-service (prefetch) limits.

    Parameters:
    - prefetch_size (int): Size of the prefetch window in bytes; 0 means unlimited
    - prefetch_count (int): How many messages may be prefetched; 0 means unlimited
    - global_qos (bool): When True, the QoS settings apply globally on the connection
    """

Queue Operations

Declare, delete, purge, bind, and unbind queues.

def queue_declare(self, queue='', passive=False, durable=False, exclusive=False,
                  auto_delete=False, arguments=None):
    """
    Declare a queue.

    Parameters:
    - queue (str): Queue name (empty string for server-generated name)
    - passive (bool): If True, only check if queue exists
    - durable (bool): If True, queue survives broker restart
    - exclusive (bool): If True, queue is exclusive to this connection
    - auto_delete (bool): If True, queue deletes when last consumer disconnects
    - arguments (dict): Additional queue arguments

    Returns:
    - Method frame whose `.method` attribute is the Queue.DeclareOk reply;
      read the results from it as `result.method.queue`,
      `result.method.message_count` and `result.method.consumer_count`
    """

def queue_delete(self, queue, if_unused=False, if_empty=False):
    """
    Remove a queue.

    Parameters:
    - queue (str): Name of the queue to delete
    - if_unused (bool): When True, delete only if the queue has no consumers
    - if_empty (bool): When True, delete only if the queue holds no messages

    Returns:
    - QueueDeleteOk: Result carrying the message count
    """

def queue_purge(self, queue):
    """
    Remove the messages held in a queue.

    Parameters:
    - queue (str): Name of the queue to purge

    Returns:
    - QueuePurgeOk: Result carrying the number of purged messages
    """

def queue_bind(self, queue, exchange, routing_key=None, arguments=None):
    """
    Attach a queue to an exchange.

    Parameters:
    - queue (str): Name of the queue being bound
    - exchange (str): Name of the exchange to bind the queue to
    - routing_key (str): Routing key used for the binding
    - arguments (dict): Extra arguments for the binding
    """

def queue_unbind(self, queue, exchange, routing_key=None, arguments=None):
    """
    Detach a queue from an exchange.

    Parameters:
    - queue (str): Name of the queue being unbound
    - exchange (str): Name of the exchange to unbind the queue from
    - routing_key (str): Routing key of the binding
    - arguments (dict): Extra arguments of the binding
    """

Exchange Operations

Declare, delete, bind, and unbind exchanges.

def exchange_declare(self, exchange, exchange_type='direct', passive=False,
                     durable=False, auto_delete=False, internal=False, arguments=None):
    """
    Create an exchange, or verify one when passive.

    Parameters:
    - exchange (str): Name of the exchange
    - exchange_type (str): Kind of exchange ('direct', 'fanout', 'topic', 'headers')
    - passive (bool): When True, only check that the exchange exists
    - durable (bool): When True, the exchange survives a broker restart
    - auto_delete (bool): When True, the exchange is deleted once the last queue unbinds
    - internal (bool): When True, the exchange is internal and cannot be published to directly
    - arguments (dict): Extra arguments for the exchange
    """

def exchange_delete(self, exchange, if_unused=False):
    """
    Remove an exchange.

    Parameters:
    - exchange (str): Name of the exchange to delete
    - if_unused (bool): When True, delete only if the exchange has no queue bindings
    """

def exchange_bind(self, destination, source, routing_key='', arguments=None):
    """
    Attach one exchange to another.

    Parameters:
    - destination (str): Name of the destination exchange
    - source (str): Name of the source exchange
    - routing_key (str): Routing key used for the binding
    - arguments (dict): Extra arguments for the binding
    """

def exchange_unbind(self, destination, source, routing_key='', arguments=None):
    """
    Detach one exchange from another.

    Parameters:
    - destination (str): Name of the destination exchange
    - source (str): Name of the source exchange
    - routing_key (str): Routing key of the binding
    - arguments (dict): Extra arguments of the binding
    """

Transaction Support

AMQP transaction support for atomic message operations.

def tx_select(self):
    """
    Begin a transaction.
    """

def tx_commit(self):
    """
    Commit the transaction currently in progress.
    """

def tx_rollback(self):
    """
    Roll back the transaction currently in progress.
    """

Publisher Confirms

Enable publisher confirmations for reliable message delivery.

def confirm_delivery(self):
    """
    Enable publisher confirmations on this channel.

    Returns:
    - None

    Once enabled, `basic_publish` reports broker rejections by raising
    pika.exceptions.NackError, and returned mandatory messages by raising
    pika.exceptions.UnroutableError.
    """

# NOTE(review): confirm that BlockingChannel exposes this property in the
# pika version being documented.
@property
def publisher_confirms(self) -> bool:
    """
    Whether publisher confirmations are enabled (True when they are).
    """

Usage Examples

Basic Publishing

import pika

params = pika.ConnectionParameters('localhost')
connection = pika.BlockingConnection(params)
channel = connection.channel()

# Make sure the destination queue exists (durable, so it survives a broker restart)
channel.queue_declare(queue='task_queue', durable=True)

# delivery_mode=2 marks the message as persistent
persistent = pika.BasicProperties(delivery_mode=2)

# Send the message through the default exchange, routed by queue name
channel.basic_publish(
    exchange='',
    routing_key='task_queue',
    body="Hello World!",
    properties=persistent,
)

connection.close()

Basic Consuming

import time

import pika

params = pika.ConnectionParameters('localhost')
connection = pika.BlockingConnection(params)
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)


def callback(ch, method, properties, body):
    """Print the message, simulate one second of work, then acknowledge it."""
    text = body.decode()
    print(f"Received {text}")
    time.sleep(1)  # Simulate work
    ch.basic_ack(delivery_tag=method.delivery_tag)


# Fair dispatch: at most one unacknowledged message per consumer
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='task_queue', on_message_callback=callback)

print('Waiting for messages. Press CTRL+C to exit')
channel.start_consuming()

Publisher Confirms

import pika

params = pika.ConnectionParameters('localhost')
connection = pika.BlockingConnection(params)
channel = connection.channel()

# Switch the channel into publisher-confirms mode
channel.confirm_delivery()

try:
    # mandatory=True asks the broker to return the message if it cannot be routed
    channel.basic_publish(
        exchange='',
        routing_key='test_queue',
        body='Hello World!',
        mandatory=True,
    )
except pika.exceptions.UnroutableError:
    print("Message was returned as unroutable")
except pika.exceptions.NackError:
    print("Message was nacked by broker")
else:
    print("Message published successfully")

connection.close()

Generator-Based Consuming

import pika

params = pika.ConnectionParameters('localhost')
connection = pika.BlockingConnection(params)
channel = connection.channel()

channel.queue_declare(queue='test_queue')

# Pull messages through the generator interface
messages = channel.consume('test_queue', auto_ack=True)
for method, _properties, payload in messages:
    print(f"Received: {payload.decode()}")

    # Keep going until the tenth delivery, then cancel the consumer and stop
    if method.delivery_tag != 10:
        continue
    channel.cancel()
    break

connection.close()

Exchange and Queue Setup

import pika

params = pika.ConnectionParameters('localhost')
connection = pika.BlockingConnection(params)
channel = connection.channel()

# Durable topic exchange
channel.exchange_declare(exchange='logs', exchange_type='topic', durable=True)

# Exclusive, server-named queue whose messages expire after 60 seconds
declared = channel.queue_declare(
    queue='',
    exclusive=True,
    arguments={'x-message-ttl': 60000},
)
queue_name = declared.method.queue

# Route messages matching the pattern from the exchange into the queue
channel.queue_bind(exchange='logs', queue=queue_name, routing_key='app.*.error')

connection.close()

Transaction Usage

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

try:
    # Declare the target queue first: a message published through the default
    # exchange to a queue that does not exist is silently dropped, so the
    # "success" message below would otherwise be misleading.
    channel.queue_declare(queue='transactional_queue')

    # Start transaction
    channel.tx_select()

    try:
        # Publish multiple messages in transaction
        for i in range(5):
            channel.basic_publish(
                exchange='',
                routing_key='transactional_queue',
                body=f'Message {i}'
            )

        # Commit transaction
        channel.tx_commit()
        print("All messages published successfully")

    except Exception as e:
        # Rollback on error
        channel.tx_rollback()
        print(f"Transaction rolled back: {e}")
finally:
    # Always release the connection, even if setup or rollback fails
    connection.close()

Install with Tessl CLI

npx tessl i tessl/pypi-pika

docs

authentication-security.md

channel-operations.md

connection-adapters.md

connection-management.md

exception-handling.md

index.md

message-properties-types.md

tile.json