Pure Python implementation of the AMQP 0.9.1 protocol including RabbitMQ's extensions
—
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Pending
The risk profile of this skill
Pure Python implementation of the AMQP 0.9.1 protocol including RabbitMQ's extensions. Pika provides a comprehensive client library for Python applications to communicate with RabbitMQ message brokers, supporting multiple connection adapters and various Python environments without requiring threads.
pip install pikaimport pikaEssential classes and functions:
from pika import ConnectionParameters, URLParameters, SSLOptions
from pika import PlainCredentials
from pika import BlockingConnection, SelectConnection
from pika import BasicProperties, DeliveryMode
from pika import AMQPConnectionWorkflowConnection adapters:
from pika.adapters import BlockingConnection, SelectConnection, BaseConnection, IOLoop
# Framework-specific adapters require direct imports:
from pika.adapters.asyncio_connection import AsyncioConnection
from pika.adapters.tornado_connection import TornadoConnection
from pika.adapters.twisted_connection import TwistedProtocolConnection
from pika.adapters.gevent_connection import GeventConnectionimport pika
# Simple connection with default parameters (localhost:5672)
connection = pika.BlockingConnection(pika.ConnectionParameters())
channel = connection.channel()
# Declare a queue
channel.queue_declare(queue='hello')
# Publish a message
channel.basic_publish(
exchange='',
routing_key='hello',
body='Hello World!'
)
# Consume messages
def callback(ch, method, properties, body):
print(f"Received {body.decode()}")
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_consume(queue='hello', on_message_callback=callback)
channel.start_consuming()
connection.close()Pika's design enables flexible AMQP client implementation:
The library supports both synchronous (BlockingConnection) and asynchronous programming models, making it suitable for simple message publishing/consuming scenarios and complex asynchronous messaging architectures.
Comprehensive connection configuration and management with multiple adapter types supporting different networking approaches (blocking, select-based, asyncio, framework-specific).
class ConnectionParameters:
def __init__(self, host='localhost', port=5672, virtual_host='/',
credentials=None, channel_max=65535, frame_max=131072,
heartbeat=None, ssl_options=None, connection_attempts=1,
retry_delay=2.0, socket_timeout=10.0, stack_timeout=15.0,
locale='en_US', blocked_connection_timeout=None,
client_properties=None, tcp_options=None): ...
class URLParameters(ConnectionParameters):
def __init__(self, url): ...
class BlockingConnection:
def __init__(self, parameters): ...
def channel(self, channel_number=None): ...
def close(self): ...
class SelectConnection:
def __init__(self, parameters, on_open_callback, on_open_error_callback,
on_close_callback, ioloop=None): ...Channel-based message operations including publishing, consuming, queue/exchange management, and transaction support with comprehensive callback handling.
class BlockingChannel:
def basic_publish(self, exchange, routing_key, body, properties=None, mandatory=False): ...
def basic_consume(self, queue, on_message_callback, auto_ack=False, exclusive=False, consumer_tag=None, arguments=None): ...
def basic_ack(self, delivery_tag, multiple=False): ...
def queue_declare(self, queue='', passive=False, durable=False, exclusive=False, auto_delete=False, arguments=None): ...
def exchange_declare(self, exchange, exchange_type='direct', passive=False, durable=False, auto_delete=False, internal=False, arguments=None): ...Authentication credential management supporting username/password and external authentication methods with SSL/TLS connection security.
class PlainCredentials:
def __init__(self, username, password, erase_on_connect=False): ...
# ExternalCredentials available via: from pika.credentials import ExternalCredentials
class ExternalCredentials:
def __init__(self): ...
class SSLOptions:
def __init__(self, context=None, server_hostname=None): ...AMQP message properties, delivery modes, exchange types, and type definitions for comprehensive message handling and routing control.
class BasicProperties:
def __init__(self, content_type=None, content_encoding=None, headers=None,
delivery_mode=None, priority=None, correlation_id=None,
reply_to=None, expiration=None, message_id=None,
timestamp=None, type=None, user_id=None, app_id=None,
cluster_id=None): ...
class DeliveryMode(Enum):
Transient = 1
Persistent = 2
# ExchangeType available via: from pika.exchange_type import ExchangeType
class ExchangeType(Enum):
direct = 'direct'
fanout = 'fanout'
headers = 'headers'
topic = 'topic'Comprehensive exception hierarchy for connection, channel, and protocol errors with detailed error information and recovery patterns.
class AMQPConnectionError(AMQPError): ...
class ConnectionClosed(AMQPConnectionError):
def __init__(self, reply_code, reply_text): ...
@property
def reply_code(self): ...
@property
def reply_text(self): ...
class AMQPChannelError(AMQPError): ...
class ChannelClosed(AMQPChannelError):
def __init__(self, reply_code, reply_text): ...Framework-specific connection adapters for asyncio, Tornado, Twisted, and Gevent integration enabling pika usage in different Python async frameworks.
class AsyncioConnection(BaseConnection):
def __init__(self, parameters, on_open_callback, on_open_error_callback,
on_close_callback, loop=None): ...
class TornadoConnection(BaseConnection):
def __init__(self, parameters, on_open_callback, on_open_error_callback,
on_close_callback, ioloop=None): ...