Pure Python implementation of the AMQP 0.9.1 protocol including RabbitMQ's extensions
—
Framework-specific connection adapters for asyncio, Tornado, Twisted, and Gevent that let pika run inside different Python async frameworks and event loops.
Abstract base class for all connection adapters.
class BaseConnection:
    """Abstract base class for all connection adapters.

    This is an interface outline: every method below only documents its
    contract and has no implementation in this file.
    """

    def __init__(self, parameters, on_open_callback=None,
                 on_open_error_callback=None, on_close_callback=None):
        """
        Set up the base connection.

        Parameters:
        - parameters (ConnectionParameters): Connection configuration
        - on_open_callback (callable): Invoked once the connection has opened
        - on_open_error_callback (callable): Invoked when opening fails
        - on_close_callback (callable): Invoked when the connection closes
        """

    def channel(self, on_open_callback, channel_number=None):
        """
        Open a new channel asynchronously.

        Parameters:
        - on_open_callback (callable): Invoked once the channel has opened
        - channel_number (int, optional): Specific channel number to request
        """

    def close(self, reply_code=200, reply_text='Normal shutdown'):
        """
        Close the connection.

        Parameters:
        - reply_code (int): AMQP reply code sent with the close
        - reply_text (str): Human-readable reason for closing
        """

    # Connection state properties
    @property
    def is_closed(self) -> bool:
        """True if connection is closed."""

    @property
    def is_open(self) -> bool:
        """True if connection is open."""


# Native Python 3 asyncio integration for async/await patterns.
class AsyncioConnection(BaseConnection):
    """Connection adapter for Python asyncio.

    Interface outline only; the methods carry no implementation here.
    """

    def __init__(self, parameters, on_open_callback=None,
                 on_open_error_callback=None, on_close_callback=None,
                 loop=None):
        """
        Create an asyncio connection.

        Parameters:
        - parameters (ConnectionParameters): Connection configuration
        - on_open_callback (callable): Invoked once the connection has opened
        - on_open_error_callback (callable): Invoked when opening fails
        - on_close_callback (callable): Invoked when the connection closes
        - loop (asyncio.AbstractEventLoop): Event loop (uses current if None)

        NOTE(review): pika 1.x appears to name this keyword `custom_ioloop`
        rather than `loop` - confirm against the installed pika version.
        """

    def channel(self, on_open_callback, channel_number=None):
        """
        Create an asyncio channel.

        Parameters:
        - on_open_callback (callable): Invoked once the channel has opened
        - channel_number (int, optional): Specific channel number to request
        """


# Integration with Tornado web framework and IOLoop.
class TornadoConnection(BaseConnection):
    """Connection adapter for Tornado web framework.

    Interface outline only; the constructor carries no implementation here.
    """

    def __init__(self, parameters, on_open_callback=None,
                 on_open_error_callback=None, on_close_callback=None,
                 ioloop=None):
        """
        Create a Tornado connection.

        Parameters:
        - parameters (ConnectionParameters): Connection configuration
        - on_open_callback (callable): Invoked once the connection has opened
        - on_open_error_callback (callable): Invoked when opening fails
        - on_close_callback (callable): Invoked when the connection closes
        - ioloop (tornado.ioloop.IOLoop): Tornado IOLoop instance

        NOTE(review): pika 1.x appears to name this keyword `custom_ioloop`
        rather than `ioloop` - confirm against the installed pika version.
        """


# Integration with Twisted framework and reactor pattern.
class TwistedProtocolConnection(BaseConnection):
    """Connection adapter for Twisted framework.

    Interface outline only; the constructor carries no implementation here.

    NOTE(review): in pika 1.x the real class looks like a Twisted Protocol
    constructed as `TwistedProtocolConnection(parameters, custom_reactor)`
    and driven through Deferreds instead of the callbacks listed below -
    confirm against the installed pika version.
    """

    def __init__(self, parameters, on_open_callback=None,
                 on_open_error_callback=None, on_close_callback=None):
        """
        Create a Twisted connection.

        Parameters:
        - parameters (ConnectionParameters): Connection configuration
        - on_open_callback (callable): Invoked once the connection has opened
        - on_open_error_callback (callable): Invoked when opening fails
        - on_close_callback (callable): Invoked when the connection closes
        """
class TwistedConnection(TwistedProtocolConnection):
    """Alias for TwistedProtocolConnection.

    NOTE(review): pika 1.x may no longer ship a `TwistedConnection` name -
    confirm against the installed pika version.
    """


# Integration with Gevent cooperative multitasking library.
class GeventConnection(BaseConnection):
    """Connection adapter for Gevent.

    Interface outline only; the constructor carries no implementation here.
    """

    def __init__(self, parameters, on_open_callback=None,
                 on_open_error_callback=None, on_close_callback=None):
        """
        Create a Gevent connection.

        Parameters:
        - parameters (ConnectionParameters): Connection configuration
        - on_open_callback (callable): Invoked once the connection has opened
        - on_open_error_callback (callable): Invoked when opening fails
        - on_close_callback (callable): Invoked when the connection closes
        """


# Event-driven connection using select/poll/epoll for high-performance async operations.
class SelectConnection(BaseConnection):
    """Event-driven connection using select/poll/epoll.

    Interface outline only; the members carry no implementation here.
    """

    def __init__(self, parameters, on_open_callback=None,
                 on_open_error_callback=None, on_close_callback=None,
                 ioloop=None):
        """
        Create a select-based connection.

        Parameters:
        - parameters (ConnectionParameters): Connection configuration
        - on_open_callback (callable): Invoked once the connection has opened
        - on_open_error_callback (callable): Invoked when opening fails
        - on_close_callback (callable): Invoked when the connection closes
        - ioloop (IOLoop): Event loop instance

        NOTE(review): pika 1.x appears to name this keyword `custom_ioloop`
        rather than `ioloop` - confirm against the installed pika version.
        """

    @property
    def ioloop(self):
        """Get the IOLoop instance."""
class IOLoop:
    """Event loop implementation for SelectConnection.

    Interface outline only; the methods carry no implementation here.
    """

    def start(self):
        """Start the event loop."""

    def stop(self):
        """Stop the event loop."""

    def add_timeout(self, deadline, callback_method):
        """
        Add a timeout callback.

        Parameters:
        - deadline (float): Time when callback should be called
        - callback_method (callable): Callback function

        NOTE(review): pika 1.x may have dropped `add_timeout` in favour of
        `call_later` - confirm against the installed pika version.
        """

    def remove_timeout(self, timeout_id):
        """
        Remove a timeout callback.

        Parameters:
        - timeout_id: Timeout identifier to remove
        """

    def call_later(self, delay, callback, *args, **kwargs):
        """
        Schedule callback execution after delay.

        Parameters:
        - delay (float): Delay in seconds
        - callback (callable): Callback function
        - *args, **kwargs: Arguments for callback

        NOTE(review): pika's own IOLoop.call_later may accept only
        `delay` and `callback` - confirm before passing extra arguments.
        """


import asyncio
import pika
from pika.adapters.asyncio_connection import AsyncioConnection
async def main():
    """Open an AsyncioConnection, declare a queue, publish one message, close.

    The connection is driven entirely by callbacks; this coroutine only
    creates it, lets the event loop run for a while, and then shuts down.
    """
    # Resolved by on_connection_closed so main() can wait for the close
    # handshake instead of returning (and tearing down the loop) mid-close.
    closed = asyncio.get_running_loop().create_future()

    # Connection callbacks
    def on_connection_open(connection):
        print('Connection opened')
        connection.channel(on_open_callback=on_channel_open)

    def on_connection_open_error(connection, error):
        print(f'Connection failed: {error}')

    def on_connection_closed(connection, reason):
        print(f'Connection closed: {reason}')
        if not closed.done():
            closed.set_result(reason)

    def on_channel_open(channel):
        print('Channel opened')

        # Defined inside on_channel_open, and before it is referenced, so
        # the callback can actually see `channel`.
        def on_queue_declared(method_frame):
            print('Queue declared')
            # Publish message
            channel.basic_publish(
                exchange='',
                routing_key='asyncio_test',
                body='Hello AsyncIO!'
            )

        # Declare queue
        channel.queue_declare(
            queue='asyncio_test',
            callback=on_queue_declared
        )

    # Create connection
    parameters = pika.ConnectionParameters('localhost')
    connection = AsyncioConnection(
        parameters,
        on_open_callback=on_connection_open,
        on_open_error_callback=on_connection_open_error,
        on_close_callback=on_connection_closed
    )

    # Keep running
    await asyncio.sleep(2)

    # Only an open connection can be closed; if opening failed (or is still
    # in progress) there is nothing to close.
    if connection.is_open:
        connection.close()
        try:
            await asyncio.wait_for(closed, timeout=5)
        except asyncio.TimeoutError:
            print('Timed out waiting for the connection to close')
# Run the async function
asyncio.run(main())

# The next example starts here; its import was fused onto the line above.
import pika
from pika.adapters.tornado_connection import TornadoConnection
from tornado import ioloop
def on_connection_open(connection):
    """Report that the connection opened, then request a channel on it."""
    print('Connection opened')
    open_channel = connection.channel
    open_channel(on_open_callback=on_channel_open)
def on_connection_open_error(connection, error):
    """Report the connection failure and stop the current Tornado IOLoop."""
    print('Connection failed: {}'.format(error))
    current_loop = ioloop.IOLoop.current()
    current_loop.stop()
def on_connection_closed(connection, reason):
    """Report why the connection closed and stop the current Tornado IOLoop."""
    print('Connection closed: {}'.format(reason))
    current_loop = ioloop.IOLoop.current()
    current_loop.stop()
def on_channel_open(channel):
    """Declare the 'tornado_test' queue and consume from it once declared."""
    print('Channel opened')

    def start_consuming(_declare_ok):
        # Start consuming
        consume_options = {
            'queue': 'tornado_test',
            'on_message_callback': on_message,
            'auto_ack': True,
        }
        channel.basic_consume(**consume_options)

    # Declare queue
    channel.queue_declare(queue='tornado_test', callback=start_consuming)
def on_message(channel, method, properties, body):
    """Print the received message and shut the example down.

    Parameters:
    - channel: Channel the message arrived on; its `connection` attribute
      is used to close the whole connection
    - method: Delivery metadata (unused)
    - properties: Message properties (unused)
    - body (bytes): Message payload, decoded for printing
    """
    print(f'Received: {body.decode()}')
    # Stop after first message. Closing only the channel would leave the
    # connection open and the IOLoop running forever; closing the connection
    # triggers the close callback, which is what stops the loop.
    connection = channel.connection
    # A second delivery may arrive while the close is in flight; closing a
    # connection that is no longer open is an error, so guard against it.
    if connection.is_open:
        connection.close()
# Create connection
parameters = pika.ConnectionParameters('localhost')
connection = TornadoConnection(
    parameters,
    on_open_callback=on_connection_open,
    on_open_error_callback=on_connection_open_error,
    on_close_callback=on_connection_closed
)

# Start Tornado IOLoop (blocks until one of the callbacks stops it)
ioloop.IOLoop.current().start()

# The next example starts here; its import was fused onto the line above.
import pika
from pika.adapters.twisted_connection import TwistedProtocolConnection
from twisted.internet import reactor, defer
class TwistedExample:
    """Publish one message to 'twisted_test' via pika's Twisted adapter.

    NOTE(review): this follows the Deferred-based API of pika 1.x, where
    TwistedProtocolConnection is a Twisted protocol that takes no
    on_*_callback keywords, must be attached to a transport (here through
    ClientCreator), and whose channel operations return Deferreds -
    confirm against the installed pika version.
    """

    def __init__(self):
        self.connection = None
        self.channel = None

    def connect(self):
        """Start connecting to localhost and wire up the open/error handlers."""
        # Local import: the file-level twisted import only brings in
        # `reactor` and `defer`.
        from twisted.internet import protocol

        parameters = pika.ConnectionParameters('localhost')
        creator = protocol.ClientCreator(
            reactor, TwistedProtocolConnection, parameters)
        pending = creator.connectTCP(parameters.host, parameters.port)
        # The TCP connect alone is not enough; `ready` fires once the AMQP
        # connection itself is usable.
        pending.addCallback(lambda conn: conn.ready)
        pending.addCallbacks(
            self.on_connection_open,
            # No connection object exists on failure, hence None.
            lambda failure: self.on_connection_error(None, failure.value),
        )

    def on_connection_open(self, connection):
        """Remember the ready connection and open a channel on it."""
        print('Connection opened')
        self.connection = connection
        connection.channel().addCallback(self.on_channel_open)

    def on_connection_error(self, connection, error):
        """Report the failure and stop the reactor."""
        print(f'Connection failed: {error}')
        reactor.stop()

    def on_connection_closed(self, connection, reason):
        """Report why the connection closed and stop the reactor."""
        print(f'Connection closed: {reason}')
        reactor.stop()

    def on_channel_open(self, channel):
        """Remember the channel and declare the example queue."""
        print('Channel opened')
        self.channel = channel
        # Declare queue
        declared = channel.queue_declare(queue='twisted_test')
        declared.addCallback(self.on_queue_declared)

    def on_queue_declared(self, method_frame):
        """Publish the example message, then schedule the shutdown."""
        # Publish message
        self.channel.basic_publish(
            exchange='',
            routing_key='twisted_test',
            body='Hello Twisted!'
        )
        # Close connection after publishing
        reactor.callLater(1, self._close_connection)

    def _close_connection(self):
        """Close the connection and report once the close has completed."""
        closed = self.connection.close()
        if closed is not None:
            closed.addBoth(
                lambda reason: self.on_connection_closed(
                    self.connection, reason))
# Usage
example = TwistedExample()
example.connect()
# Blocks until one of the example's handlers stops the reactor.
reactor.run()

# The next example starts here; its import was fused onto the line above.
import pika
from pika.adapters.gevent_connection import GeventConnection
import gevent
class GeventExample:
    """Publish messages to 'gevent_test' over a GeventConnection.

    `is_ready` is True only while a channel is open and usable; it is
    cleared again when the connection fails or closes so that
    `publish_message` never touches a dead channel.
    """

    def __init__(self):
        self.connection = None
        self.channel = None
        self.is_ready = False

    def connect(self):
        """Create the GeventConnection to localhost with this object's callbacks."""
        parameters = pika.ConnectionParameters('localhost')
        self.connection = GeventConnection(
            parameters,
            on_open_callback=self.on_connection_open,
            on_open_error_callback=self.on_connection_error,
            on_close_callback=self.on_connection_closed
        )

    def on_connection_open(self, connection):
        """Request a channel once the connection is open."""
        print('Connection opened')
        connection.channel(on_open_callback=self.on_channel_open)

    def on_connection_error(self, connection, error):
        """Report the failure and make sure the example is not marked ready."""
        print(f'Connection failed: {error}')
        self.is_ready = False

    def on_connection_closed(self, connection, reason):
        """Report the close and mark the example as no longer ready."""
        print(f'Connection closed: {reason}')
        # The channel dies with the connection; publishing on it would fail.
        self.is_ready = False
        self.channel = None

    def on_channel_open(self, channel):
        """Store the channel and mark the example ready to publish."""
        print('Channel opened')
        self.channel = channel
        self.is_ready = True

    def publish_message(self, message):
        """
        Publish `message` to the 'gevent_test' queue if the channel is ready.

        Parameters:
        - message (str): Message body to publish

        Returns True when the message was handed to the channel, False when
        it was skipped because the example is not ready.
        """
        if not self.is_ready:
            return False
        self.channel.basic_publish(
            exchange='',
            routing_key='gevent_test',
            body=message
        )
        print(f'Published: {message}')
        return True
# Usage
example = GeventExample()
example.connect()

# Wait for connection to be ready, but only for a bounded time: if the
# connection attempt fails, is_ready never becomes True and an unbounded
# loop would spin here forever.
MAX_WAIT_SECONDS = 10.0
POLL_INTERVAL = 0.1
waited = 0.0
while not example.is_ready and waited < MAX_WAIT_SECONDS:
    gevent.sleep(POLL_INTERVAL)
    waited += POLL_INTERVAL

if example.is_ready:
    # Publish some messages
    for i in range(5):
        example.publish_message(f'Message {i}')
        gevent.sleep(1)
    example.connection.close()
else:
    print(f'Connection not ready after {MAX_WAIT_SECONDS:.0f} seconds; giving up')

# The next example starts here; its import was fused onto the line above.
import pika
from pika.adapters.select_connection import SelectConnection, IOLoop
class SelectExample:
    """Publish 'Hello Select!' to 'select_test' every two seconds.

    Uses a SelectConnection driven by an explicitly created IOLoop.
    """

    def __init__(self):
        self.connection = None
        self.channel = None
        self.ioloop = None

    def connect(self):
        """Create the IOLoop and a SelectConnection to localhost bound to it."""
        # Create custom IOLoop
        self.ioloop = IOLoop()
        parameters = pika.ConnectionParameters('localhost')
        # NOTE(review): pika's SelectConnection takes the loop as
        # `custom_ioloop`; an `ioloop=` keyword is rejected - confirm
        # against the installed pika version.
        self.connection = SelectConnection(
            parameters,
            on_open_callback=self.on_connection_open,
            on_open_error_callback=self.on_connection_error,
            on_close_callback=self.on_connection_closed,
            custom_ioloop=self.ioloop
        )

    def on_connection_open(self, connection):
        """Request a channel once the connection is open."""
        print('Connection opened')
        connection.channel(on_open_callback=self.on_channel_open)

    def on_connection_error(self, connection, error):
        """Report the failure and stop the IOLoop."""
        print(f'Connection failed: {error}')
        self.ioloop.stop()

    def on_connection_closed(self, connection, reason):
        """Report why the connection closed and stop the IOLoop."""
        print(f'Connection closed: {reason}')
        self.ioloop.stop()

    def on_channel_open(self, channel):
        """Store the channel and start the periodic publishing."""
        print('Channel opened')
        self.channel = channel
        # Schedule periodic message publishing
        self.schedule_publish()

    def schedule_publish(self):
        """Publish one message and re-arm the timer for the next one."""
        # A timer may still fire while the connection is shutting down (or
        # before a channel exists); publishing then would raise, so stop
        # the cycle instead.
        if self.channel is None or not self.channel.is_open:
            return
        # Publish message
        self.channel.basic_publish(
            exchange='',
            routing_key='select_test',
            body='Hello Select!'
        )
        # Schedule next publish in 2 seconds
        self.ioloop.call_later(2.0, self.schedule_publish)

    def run(self):
        """Run the IOLoop until one of the callbacks stops it."""
        # Start the IOLoop
        self.ioloop.start()
# Usage
example = SelectExample()
example.connect()
# Stop after 10 seconds
example.ioloop.call_later(10.0, example.connection.close)
example.run()

# The next example starts here; its import was fused onto the line above.
import pika
def create_connection(adapter_type='blocking'):
    """
    Create a connection to localhost using the requested adapter.

    Parameters:
    - adapter_type (str): One of 'blocking', 'select', 'asyncio', 'tornado'
      or 'gevent'

    Returns the new connection object. The asynchronous adapters are given
    an on-open callback that prints a confirmation message.

    Raises ValueError for any other adapter name. 'twisted' is not handled
    here because its adapter is not built from these callback arguments.
    """
    # Validate first so an unknown name fails before any pika object is
    # built or any optional framework is imported.
    supported = ('blocking', 'select', 'asyncio', 'tornado', 'gevent')
    if adapter_type not in supported:
        raise ValueError(f"Unsupported adapter type: {adapter_type}")

    parameters = pika.ConnectionParameters('localhost')
    if adapter_type == 'blocking':
        return pika.BlockingConnection(parameters)

    # Adapter modules are imported lazily so a framework is only required
    # when its adapter is actually requested.
    if adapter_type == 'select':
        adapter_cls, label = pika.SelectConnection, 'Select'
    elif adapter_type == 'asyncio':
        from pika.adapters.asyncio_connection import AsyncioConnection
        adapter_cls, label = AsyncioConnection, 'AsyncIO'
    elif adapter_type == 'tornado':
        from pika.adapters.tornado_connection import TornadoConnection
        adapter_cls, label = TornadoConnection, 'Tornado'
    else:
        from pika.adapters.gevent_connection import GeventConnection
        adapter_cls, label = GeventConnection, 'Gevent'

    def on_open(connection):
        print(f'{label} connection opened')

    return adapter_cls(parameters, on_open_callback=on_open)
# Usage examples
print("Available adapters:")
print("- blocking: Synchronous operations")
print("- select: Event-driven with select/poll")
print("- asyncio: Python 3 async/await")
print("- tornado: Tornado web framework")
print("- twisted: Twisted framework")
print("- gevent: Gevent cooperative multitasking")

# Install with Tessl CLI:
#   npx tessl i tessl/pypi-pika