Pure Python implementation of the AMQP 0.9.1 protocol including RabbitMQ's extensions
npx @tessl/cli install tessl/pypi-pika@1.3.0

Pure Python implementation of the AMQP 0.9.1 protocol including RabbitMQ's extensions. Pika provides a comprehensive client library for Python applications to communicate with RabbitMQ message brokers, supporting multiple connection adapters and various Python environments without requiring threads.
pip install pika

import pika

Essential classes and functions:
from pika import ConnectionParameters, URLParameters, SSLOptions
from pika import PlainCredentials
from pika import BlockingConnection, SelectConnection
from pika import BasicProperties, DeliveryMode
from pika import AMQPConnectionWorkflow

Connection adapters:
from pika.adapters import BlockingConnection, SelectConnection, BaseConnection, IOLoop
# Framework-specific adapters require direct imports:
from pika.adapters.asyncio_connection import AsyncioConnection
from pika.adapters.tornado_connection import TornadoConnection
from pika.adapters.twisted_connection import TwistedProtocolConnection
from pika.adapters.gevent_connection import GeventConnection

import pika
# Simple connection with default parameters (localhost:5672)
connection = pika.BlockingConnection(pika.ConnectionParameters())
channel = connection.channel()

# Declare a queue
channel.queue_declare(queue='hello')

# Publish a message
channel.basic_publish(
    exchange='',
    routing_key='hello',
    body='Hello World!'
)


# Consume messages
def callback(ch, method, properties, body):
    """Print each delivered message, then acknowledge it."""
    print(f"Received {body.decode()}")
    # basic_consume below leaves auto_ack at its default, so the delivery
    # is acknowledged explicitly here.
    ch.basic_ack(delivery_tag=method.delivery_tag)


channel.basic_consume(queue='hello', on_message_callback=callback)
try:
    channel.start_consuming()
except KeyboardInterrupt:
    # Ctrl+C: stop consuming so the script can go on to close the connection.
    channel.stop_consuming()
connection.close()

Pika's design enables flexible AMQP client implementation:
The library supports both synchronous (BlockingConnection) and asynchronous programming models, making it suitable for simple message publishing/consuming scenarios and complex asynchronous messaging architectures.
Comprehensive connection configuration and management with multiple adapter types supporting different networking approaches (blocking, select-based, asyncio, framework-specific).
class ConnectionParameters:
    """Connection settings for reaching a broker.

    Every argument is optional. ``host`` and ``port`` default to
    ``localhost:5672`` and ``virtual_host`` defaults to ``'/'``.
    """

    def __init__(self, host='localhost', port=5672, virtual_host='/',
                 credentials=None, channel_max=65535, frame_max=131072,
                 heartbeat=None, ssl_options=None, connection_attempts=1,
                 retry_delay=2.0, socket_timeout=10.0, stack_timeout=15.0,
                 locale='en_US', blocked_connection_timeout=None,
                 client_properties=None, tcp_options=None): ...
class URLParameters(ConnectionParameters):
    """Connection parameters supplied as a single URL."""

    def __init__(self, url): ...
class BlockingConnection:
    """Synchronous connection adapter."""

    def __init__(self, parameters): ...

    # Called as ``connection.channel()`` in the usage example to obtain the
    # channel used for declaring, publishing and consuming.
    def channel(self, channel_number=None): ...

    def close(self): ...
class SelectConnection:
def __init__(self, parameters, on_open_callback, on_open_error_callback,
on_close_callback, ioloop=None): ...

Channel-based message operations including publishing, consuming, queue/exchange management, and transaction support with comprehensive callback handling.
class BlockingChannel:
def basic_publish(self, exchange, routing_key, body, properties=None, mandatory=False): ...
def basic_consume(self, queue, on_message_callback, auto_ack=False, exclusive=False, consumer_tag=None, arguments=None): ...
def basic_ack(self, delivery_tag, multiple=False): ...
def queue_declare(self, queue='', passive=False, durable=False, exclusive=False, auto_delete=False, arguments=None): ...
def exchange_declare(self, exchange, exchange_type='direct', passive=False, durable=False, auto_delete=False, internal=False, arguments=None): ...

Authentication credential management supporting username/password and external authentication methods with SSL/TLS connection security.
class PlainCredentials:
    """Username/password credentials."""

    def __init__(self, username, password, erase_on_connect=False): ...
# ExternalCredentials available via: from pika.credentials import ExternalCredentials
class ExternalCredentials:
    """Credentials for external authentication; takes no arguments."""

    def __init__(self): ...
class SSLOptions:
def __init__(self, context=None, server_hostname=None): ...

AMQP message properties, delivery modes, exchange types, and type definitions for comprehensive message handling and routing control.
class BasicProperties:
    """AMQP message properties; every field is optional and defaults to None."""

    def __init__(self, content_type=None, content_encoding=None, headers=None,
                 delivery_mode=None, priority=None, correlation_id=None,
                 reply_to=None, expiration=None, message_id=None,
                 timestamp=None, type=None, user_id=None, app_id=None,
                 cluster_id=None): ...
class DeliveryMode(Enum):
    """Message delivery modes."""

    Transient = 1
    Persistent = 2
# ExchangeType available via: from pika.exchange_type import ExchangeType
class ExchangeType(Enum):
direct = 'direct'
fanout = 'fanout'
headers = 'headers'
topic = 'topic'

Comprehensive exception hierarchy for connection, channel, and protocol errors with detailed error information and recovery patterns.
# Base class for connection-level errors; ConnectionClosed derives from it.
class AMQPConnectionError(AMQPError): ...
class ConnectionClosed(AMQPConnectionError):
    """Connection-closed error carrying a reply code and reply text."""

    def __init__(self, reply_code, reply_text): ...

    @property
    def reply_code(self): ...

    @property
    def reply_text(self): ...
# Base class for channel-level errors; ChannelClosed derives from it.
class AMQPChannelError(AMQPError): ...
class ChannelClosed(AMQPChannelError):
def __init__(self, reply_code, reply_text): ...

Framework-specific connection adapters for asyncio, Tornado, Twisted, and Gevent integration enabling pika usage in different Python async frameworks.
class AsyncioConnection(BaseConnection):
    """Connection adapter for asyncio."""

    # NOTE(review): pika 1.x appears to name the last keyword
    # ``custom_ioloop`` rather than ``loop`` -- confirm against the adapter
    # source before relying on this signature.
    def __init__(self, parameters, on_open_callback, on_open_error_callback,
                 on_close_callback, loop=None): ...
class TornadoConnection(BaseConnection):
    """Connection adapter for Tornado."""

    # NOTE(review): pika 1.x appears to name the last keyword
    # ``custom_ioloop`` rather than ``ioloop`` -- confirm against the adapter
    # source before relying on this signature.
    def __init__(self, parameters, on_open_callback, on_open_error_callback,
                 on_close_callback, ioloop=None): ...