CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-pika

Pure Python implementation of the AMQP 0.9.1 protocol including RabbitMQ's extensions

Pending
Overview
Eval results
Files

docs/connection-adapters.md

Connection Adapters

Framework-specific connection adapters that let pika run on the event loops of asyncio, Tornado, Twisted, and Gevent, alongside the built-in select-based adapter.

Capabilities

Base Connection

Abstract base class for all connection adapters.

class BaseConnection:
    """Common parent class shared by every connection adapter."""

    def __init__(self, parameters, on_open_callback=None, on_open_error_callback=None,
                 on_close_callback=None):
        """
        Set up the base connection.

        Parameters:
        - parameters (ConnectionParameters): Connection configuration
        - on_open_callback (callable): Invoked when the connection opens
        - on_open_error_callback (callable): Invoked on connection error
        - on_close_callback (callable): Invoked when the connection closes
        """

    def channel(self, on_open_callback, channel_number=None):
        """
        Open a new channel asynchronously.

        Parameters:
        - on_open_callback (callable): Invoked when the channel opens
        - channel_number (int, optional): Specific channel number to use

        NOTE(review): upstream pika 1.x appears to declare this as
        ``channel(channel_number=None, on_open_callback=None)``; passing
        ``on_open_callback`` by keyword works with either order - confirm.
        """

    def close(self, reply_code=200, reply_text='Normal shutdown'):
        """
        Close the connection.

        Parameters:
        - reply_code (int): AMQP reply code (defaults to 200)
        - reply_text (str): Human-readable reason (defaults to 'Normal shutdown')
        """

    # Read-only views of the connection state
    @property
    def is_closed(self) -> bool:
        """Whether the connection is closed."""

    @property
    def is_open(self) -> bool:
        """Whether the connection is open."""

AsyncIO Connection

Native Python 3 asyncio integration for async/await patterns.

class AsyncioConnection(BaseConnection):
    """Adapter that runs a connection on a Python asyncio event loop."""

    def __init__(self, parameters, on_open_callback=None, on_open_error_callback=None,
                 on_close_callback=None, loop=None):
        """
        Build an asyncio-backed connection.

        Parameters:
        - parameters (ConnectionParameters): Connection configuration
        - on_open_callback (callable): Invoked when the connection opens
        - on_open_error_callback (callable): Invoked on connection error
        - on_close_callback (callable): Invoked when the connection closes
        - loop (asyncio.AbstractEventLoop): Event loop to use; the current loop when None

        NOTE(review): pika 1.x appears to name this keyword ``custom_ioloop``
        rather than ``loop`` - confirm against the installed release.
        """

    def channel(self, on_open_callback, channel_number=None):
        """
        Open a channel on this asyncio connection.

        Parameters:
        - on_open_callback (callable): Invoked when the channel opens
        - channel_number (int, optional): Specific channel number to use
        """

Tornado Connection

Integration with Tornado web framework and IOLoop.

class TornadoConnection(BaseConnection):
    """Adapter that runs a connection on a Tornado IOLoop."""

    def __init__(self, parameters, on_open_callback=None, on_open_error_callback=None,
                 on_close_callback=None, ioloop=None):
        """
        Build a Tornado-backed connection.

        Parameters:
        - parameters (ConnectionParameters): Connection configuration
        - on_open_callback (callable): Invoked when the connection opens
        - on_open_error_callback (callable): Invoked on connection error
        - on_close_callback (callable): Invoked when the connection closes
        - ioloop (tornado.ioloop.IOLoop): Tornado IOLoop instance to run on

        NOTE(review): pika 1.x appears to name this keyword ``custom_ioloop``
        rather than ``ioloop`` - confirm against the installed release.
        """

Twisted Connection

Integration with Twisted framework and reactor pattern.

class TwistedProtocolConnection(BaseConnection):
    """Adapter that integrates a connection with the Twisted framework."""

    def __init__(self, parameters, on_open_callback=None, on_open_error_callback=None,
                 on_close_callback=None):
        """
        Build a Twisted-backed connection.

        Parameters:
        - parameters (ConnectionParameters): Connection configuration
        - on_open_callback (callable): Invoked when the connection opens
        - on_open_error_callback (callable): Invoked on connection error
        - on_close_callback (callable): Invoked when the connection closes

        NOTE(review): pika 1.x appears to declare this constructor as
        ``(parameters=None, custom_reactor=None)`` with no callback
        arguments - confirm against the installed release.
        """

class TwistedConnection(TwistedProtocolConnection):
    """Second name for TwistedProtocolConnection; this subclass adds nothing of its own."""

Gevent Connection

Integration with Gevent cooperative multitasking library.

class GeventConnection(BaseConnection):
    """Adapter that integrates a connection with Gevent."""

    def __init__(self, parameters, on_open_callback=None, on_open_error_callback=None,
                 on_close_callback=None):
        """
        Build a Gevent-backed connection.

        Parameters:
        - parameters (ConnectionParameters): Connection configuration
        - on_open_callback (callable): Invoked when the connection opens
        - on_open_error_callback (callable): Invoked on connection error
        - on_close_callback (callable): Invoked when the connection closes
        """

Select Connection

Event-driven connection using select/poll/epoll for high-performance async operations.

class SelectConnection(BaseConnection):
    """Event-driven adapter built on select/poll/epoll."""

    def __init__(self, parameters, on_open_callback=None, on_open_error_callback=None,
                 on_close_callback=None, ioloop=None):
        """
        Build a select-based connection.

        Parameters:
        - parameters (ConnectionParameters): Connection configuration
        - on_open_callback (callable): Invoked when the connection opens
        - on_open_error_callback (callable): Invoked on connection error
        - on_close_callback (callable): Invoked when the connection closes
        - ioloop (IOLoop): Event loop instance to run on

        NOTE(review): pika appears to name this keyword ``custom_ioloop``
        rather than ``ioloop`` - confirm against the installed release.
        """

    @property
    def ioloop(self):
        """The IOLoop instance associated with this connection."""

class IOLoop:
    """Event loop that drives a SelectConnection."""

    def start(self):
        """Begin running the event loop."""

    def stop(self):
        """Halt the event loop."""

    def add_timeout(self, deadline, callback_method):
        """
        Register a timeout callback.

        Parameters:
        - deadline (float): Time at which the callback should be called
        - callback_method (callable): Function to call
        """

    def remove_timeout(self, timeout_id):
        """
        Cancel a previously registered timeout callback.

        Parameters:
        - timeout_id: Identifier of the timeout to remove
        """

    def call_later(self, delay, callback, *args, **kwargs):
        """
        Arrange for a callback to run once a delay has elapsed.

        Parameters:
        - delay (float): Delay in seconds
        - callback (callable): Function to call
        - *args, **kwargs: Arguments passed on to the callback
        """

Usage Examples

AsyncIO Connection

import asyncio
import pika
from pika.adapters.asyncio_connection import AsyncioConnection

async def main():
    """
    Open an AsyncioConnection, declare a queue, publish one message, then close.

    All work is driven by callbacks: connection open -> channel open ->
    queue declared -> publish. The coroutine itself only creates the
    connection, waits two seconds for the callbacks to run, and then closes
    the connection if it actually opened.
    """
    # Connection callbacks
    def on_connection_open(connection):
        print('Connection opened')
        connection.channel(on_open_callback=on_channel_open)

    def on_connection_open_error(connection, error):
        print(f'Connection failed: {error}')

    def on_connection_closed(connection, reason):
        print(f'Connection closed: {reason}')

    def on_channel_open(channel):
        print('Channel opened')

        # Defined inside on_channel_open so that it closes over ``channel``;
        # as a sibling of on_channel_open it had no ``channel`` name in scope
        # and raised NameError as soon as the queue was declared.
        def on_queue_declared(method_frame):
            print('Queue declared')

            # Publish message
            channel.basic_publish(
                exchange='',
                routing_key='asyncio_test',
                body='Hello AsyncIO!'
            )

        # Declare queue
        channel.queue_declare(
            queue='asyncio_test',
            callback=on_queue_declared
        )

    # Create connection
    parameters = pika.ConnectionParameters('localhost')
    connection = AsyncioConnection(
        parameters,
        on_open_callback=on_connection_open,
        on_open_error_callback=on_connection_open_error,
        on_close_callback=on_connection_closed
    )

    # Keep running long enough for the callbacks above to fire
    await asyncio.sleep(2)

    # Only close a connection that really opened: if the attempt failed (or
    # the broker already dropped us) close() would raise instead of closing.
    if connection.is_open:
        connection.close()

# Run the async function
asyncio.run(main())

Tornado Connection

import pika
from pika.adapters.tornado_connection import TornadoConnection
from tornado import ioloop

def on_connection_open(connection):
    """Report that the connection opened and request a channel on it.

    The channel is delivered later to ``on_channel_open`` via the
    ``on_open_callback`` argument.
    """
    print('Connection opened')
    connection.channel(on_open_callback=on_channel_open)

def on_connection_open_error(connection, error):
    """Report the connection error and stop the current Tornado IOLoop."""
    print('Connection failed: {}'.format(error))
    current_loop = ioloop.IOLoop.current()
    current_loop.stop()

def on_connection_closed(connection, reason):
    """Report why the connection closed and stop the current Tornado IOLoop."""
    print('Connection closed: {}'.format(reason))
    current_loop = ioloop.IOLoop.current()
    current_loop.stop()

def on_channel_open(channel):
    """Declare the 'tornado_test' queue and start consuming once it is declared."""
    print('Channel opened')

    def start_consuming(_method_frame):
        # Runs as the queue_declare callback; messages are handed to
        # on_message and auto-acknowledged.
        channel.basic_consume(queue='tornado_test',
                              on_message_callback=on_message,
                              auto_ack=True)

    channel.queue_declare(queue='tornado_test', callback=start_consuming)

def on_message(channel, method, properties, body):
    """
    Print the received message and shut the example down.

    Parameters:
    - channel: Channel the message arrived on
    - method: Delivery method frame (unused)
    - properties: Message properties (unused)
    - body (bytes): Message payload, decoded for printing
    """
    print(f'Received: {body.decode()}')

    # Stop after first message. Closing only the channel would leave the
    # connection open, so on_connection_closed would never run and the IOLoop
    # would keep spinning; closing the connection lets that callback stop it.
    # The is_open guard keeps a second delivery that arrives while the
    # connection is already closing from calling close() again.
    if channel.connection.is_open:
        channel.connection.close()

# Create connection, wiring in the module-level callbacks defined above.
# No IOLoop is passed here.
# NOTE(review): presumably the adapter then uses the current Tornado IOLoop - confirm.
parameters = pika.ConnectionParameters('localhost')
connection = TornadoConnection(
    parameters,
    on_open_callback=on_connection_open,
    on_open_error_callback=on_connection_open_error,
    on_close_callback=on_connection_closed
)

# Start Tornado IOLoop; on_connection_open_error and on_connection_closed
# are the callbacks that call stop() on it.
ioloop.IOLoop.current().start()

Twisted Connection

import pika
from pika.adapters.twisted_connection import TwistedProtocolConnection
from twisted.internet import reactor, defer

class TwistedExample:
    """
    Publish a single message over a Twisted-driven connection.

    NOTE(review): pika 1.x's TwistedProtocolConnection appears to take
    ``(parameters, custom_reactor)`` and to be attached through a Twisted
    endpoint or ClientCreator rather than constructed with callbacks -
    confirm that connect() matches the installed release.
    """

    def __init__(self):
        # Both start empty: connect() sets the connection and
        # on_channel_open() stores the channel.
        self.channel = None
        self.connection = None

    def connect(self):
        """Create the connection with this object's handlers as its callbacks."""
        callbacks = dict(
            on_open_callback=self.on_connection_open,
            on_open_error_callback=self.on_connection_error,
            on_close_callback=self.on_connection_closed,
        )
        self.connection = TwistedProtocolConnection(
            pika.ConnectionParameters('localhost'), **callbacks)

    def on_connection_open(self, connection):
        """Report the open connection and request a channel on it."""
        print('Connection opened')
        connection.channel(on_open_callback=self.on_channel_open)

    def on_connection_error(self, connection, error):
        """Report the connection error and stop the reactor."""
        print('Connection failed: {}'.format(error))
        reactor.stop()

    def on_connection_closed(self, connection, reason):
        """Report why the connection closed and stop the reactor."""
        print('Connection closed: {}'.format(reason))
        reactor.stop()

    def on_channel_open(self, channel):
        """Keep the channel and declare the 'twisted_test' queue on it."""
        print('Channel opened')
        self.channel = channel
        channel.queue_declare(queue='twisted_test',
                              callback=self.on_queue_declared)

    def on_queue_declared(self, method_frame):
        """Publish the greeting, then schedule the connection to close one second later."""
        self.channel.basic_publish(exchange='',
                                   routing_key='twisted_test',
                                   body='Hello Twisted!')
        reactor.callLater(1, self.connection.close)

# Usage: build the example, create its connection, then hand control to the
# Twisted reactor. The example's error/closed callbacks call reactor.stop().
example = TwistedExample()
example.connect()
reactor.run()

Gevent Connection

import pika
from pika.adapters.gevent_connection import GeventConnection
import gevent

class GeventExample:
    """Publish messages over a Gevent-driven connection once its channel is open."""

    def __init__(self):
        # is_ready flips to True in on_channel_open(); until then
        # publish_message() does nothing.
        self.is_ready = False
        self.channel = None
        self.connection = None

    def connect(self):
        """Create the connection with this object's handlers as its callbacks."""
        callbacks = dict(
            on_open_callback=self.on_connection_open,
            on_open_error_callback=self.on_connection_error,
            on_close_callback=self.on_connection_closed,
        )
        self.connection = GeventConnection(
            pika.ConnectionParameters('localhost'), **callbacks)

    def on_connection_open(self, connection):
        """Report the open connection and request a channel on it."""
        print('Connection opened')
        connection.channel(on_open_callback=self.on_channel_open)

    def on_connection_error(self, connection, error):
        """Report the connection error."""
        print('Connection failed: {}'.format(error))

    def on_connection_closed(self, connection, reason):
        """Report why the connection closed."""
        print('Connection closed: {}'.format(reason))

    def on_channel_open(self, channel):
        """Keep the channel and mark the example as ready to publish."""
        print('Channel opened')
        self.channel = channel
        self.is_ready = True

    def publish_message(self, message):
        """
        Publish a message to 'gevent_test' on the default exchange.

        Parameters:
        - message: Body to publish; silently skipped while the channel is not ready
        """
        if not self.is_ready:
            return
        self.channel.basic_publish(exchange='',
                                   routing_key='gevent_test',
                                   body=message)
        print('Published: {}'.format(message))

# Usage
example = GeventExample()
example.connect()

# Wait for the connection to be ready, but give up after about 10 seconds
# (100 polls of 0.1 s). is_ready is only set when a channel opens, so an
# unbounded loop would spin forever if the connection attempt failed.
for _ in range(100):
    if example.is_ready:
        break
    gevent.sleep(0.1)
else:
    raise SystemExit('Gave up waiting for the connection to become ready')

# Publish some messages
for i in range(5):
    example.publish_message(f'Message {i}')
    gevent.sleep(1)

# Only close a connection that is still open; close() on a connection the
# broker already dropped would raise instead of shutting down cleanly.
if example.connection.is_open:
    example.connection.close()

Select Connection with Custom IOLoop

import pika
from pika.adapters.select_connection import SelectConnection, IOLoop

class SelectExample:
    """Publish a message every two seconds over a SelectConnection on a custom IOLoop."""

    def __init__(self):
        # All three are populated by connect() / on_channel_open().
        self.connection = None
        self.channel = None
        self.ioloop = None

    def connect(self):
        """Create the IOLoop and a SelectConnection that runs on it."""
        # Create custom IOLoop
        self.ioloop = IOLoop()

        parameters = pika.ConnectionParameters('localhost')
        self.connection = SelectConnection(
            parameters,
            on_open_callback=self.on_connection_open,
            on_open_error_callback=self.on_connection_error,
            on_close_callback=self.on_connection_closed,
            # pika's SelectConnection names this keyword ``custom_ioloop``;
            # passing ``ioloop=`` raises TypeError (unexpected keyword).
            custom_ioloop=self.ioloop
        )

    def on_connection_open(self, connection):
        """Report the open connection and request a channel on it."""
        print('Connection opened')
        connection.channel(on_open_callback=self.on_channel_open)

    def on_connection_error(self, connection, error):
        """Report the connection error and stop the IOLoop so run() returns."""
        print(f'Connection failed: {error}')
        self.ioloop.stop()

    def on_connection_closed(self, connection, reason):
        """Report why the connection closed and stop the IOLoop so run() returns."""
        print(f'Connection closed: {reason}')
        self.ioloop.stop()

    def on_channel_open(self, channel):
        """Keep the channel and start the periodic publishing."""
        print('Channel opened')
        self.channel = channel

        # Schedule periodic message publishing
        self.schedule_publish()

    def schedule_publish(self):
        """Publish one message and re-arm the timer for the next one.

        Does nothing (and does not re-arm) once the channel is no longer
        open, so a timer that fires while the connection is shutting down
        cannot publish on a closing channel or keep rescheduling itself.
        """
        if self.channel is None or not self.channel.is_open:
            return

        # Publish message
        self.channel.basic_publish(
            exchange='',
            routing_key='select_test',
            body='Hello Select!'
        )

        # Schedule next publish in 2 seconds
        self.ioloop.call_later(2.0, self.schedule_publish)

    def run(self):
        """Run the IOLoop until one of the callbacks stops it."""
        # Start the IOLoop
        self.ioloop.start()

# Usage: connect() must run first because it creates example.ioloop and
# example.connection, both of which are used below.
example = SelectExample()
example.connect()

# Stop after 10 seconds: closing the connection triggers
# on_connection_closed, which stops the IOLoop.
example.ioloop.call_later(10.0, example.connection.close)

# Hand control to the IOLoop.
example.run()

Adapter Comparison

import pika

def create_connection(adapter_type='blocking'):
    """
    Build a connection to 'localhost' using the named adapter.

    Parameters:
    - adapter_type (str): One of 'blocking', 'select', 'asyncio' or 'tornado'

    Returns the connection object created by the chosen adapter class.
    Raises ValueError for any other adapter_type.
    """
    parameters = pika.ConnectionParameters('localhost')

    if adapter_type == 'blocking':
        return pika.BlockingConnection(parameters)

    if adapter_type == 'select':
        def announce_select(connection):
            print('Select connection opened')

        return pika.SelectConnection(parameters, on_open_callback=announce_select)

    if adapter_type == 'asyncio':
        # Imported only when this adapter is requested.
        from pika.adapters.asyncio_connection import AsyncioConnection

        def announce_asyncio(connection):
            print('AsyncIO connection opened')

        return AsyncioConnection(parameters, on_open_callback=announce_asyncio)

    if adapter_type == 'tornado':
        # Imported only when this adapter is requested.
        from pika.adapters.tornado_connection import TornadoConnection

        def announce_tornado(connection):
            print('Tornado connection opened')

        return TornadoConnection(parameters, on_open_callback=announce_tornado)

    raise ValueError(f"Unsupported adapter type: {adapter_type}")

# Usage examples: list the adapters, one per output line.
print(
    "Available adapters:",
    "- blocking: Synchronous operations",
    "- select: Event-driven with select/poll",
    "- asyncio: Python 3 async/await",
    "- tornado: Tornado web framework",
    "- twisted: Twisted framework",
    "- gevent: Gevent cooperative multitasking",
    sep="\n",
)

Install with Tessl CLI

npx tessl i tessl/pypi-pika

docs

authentication-security.md

channel-operations.md

connection-adapters.md

connection-management.md

exception-handling.md

index.md

message-properties-types.md

tile.json