CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-pika

Pure Python implementation of the AMQP 0.9.1 protocol including RabbitMQ's extensions

Pending
Overview
Eval results
Files

connection-management.mddocs/

Connection Management

Comprehensive connection configuration and management with multiple adapter types supporting different networking approaches for connecting to RabbitMQ brokers.

Capabilities

Connection Parameters

Configure connection settings including host, port, credentials, timeouts, and protocol options.

class ConnectionParameters:
    """Connection parameters for AMQP connections."""
    
    def __init__(self, host='localhost', port=5672, virtual_host='/',
                 credentials=None, channel_max=65535, frame_max=131072,
                 heartbeat=None, ssl_options=None, connection_attempts=1,
                 retry_delay=2.0, socket_timeout=10.0, stack_timeout=15.0,
                 locale='en_US', blocked_connection_timeout=None,
                 client_properties=None, tcp_options=None):
        """
        Parameters:
        - host (str): RabbitMQ server hostname
        - port (int): RabbitMQ server port (default: 5672)
        - virtual_host (str): Virtual host (default: '/')
        - credentials (Credentials): Authentication credentials
        - channel_max (int): Maximum number of channels (default: 65535)
        - frame_max (int): Maximum frame size in bytes (default: 131072)
        - heartbeat (int): Heartbeat interval in seconds
        - ssl_options (SSLOptions): SSL configuration
        - connection_attempts (int): Number of connection attempts (default: 1)
        - retry_delay (float): Delay between attempts in seconds (default: 2.0)
        - socket_timeout (float): Socket connection timeout (default: 10.0)
        - stack_timeout (float): Full stack timeout (default: 15.0)
        - locale (str): Connection locale (default: 'en_US')
        - blocked_connection_timeout (float): Blocked connection timeout
        - client_properties (dict): Client identification properties
        - tcp_options (dict): TCP socket options
        """
    
    # Properties for all parameters
    @property
    def host(self) -> str: ...
    
    @host.setter
    def host(self, value: str): ...
    
    @property
    def port(self) -> int: ...
    
    @port.setter  
    def port(self, value: int): ...
    
    @property
    def virtual_host(self) -> str: ...
    
    @virtual_host.setter
    def virtual_host(self, value: str): ...
    
    @property
    def credentials(self): ...
    
    @credentials.setter
    def credentials(self, value): ...
    
    @property
    def channel_max(self) -> int: ...
    
    @channel_max.setter
    def channel_max(self, value: int): ...
    
    @property
    def frame_max(self) -> int: ...
    
    @frame_max.setter
    def frame_max(self, value: int): ...
    
    @property
    def heartbeat(self) -> int: ...
    
    @heartbeat.setter
    def heartbeat(self, value: int): ...
    
    @property
    def ssl_options(self): ...
    
    @ssl_options.setter
    def ssl_options(self, value): ...
    
    @property
    def connection_attempts(self) -> int: ...
    
    @connection_attempts.setter
    def connection_attempts(self, value: int): ...
    
    @property
    def retry_delay(self) -> float: ...
    
    @retry_delay.setter
    def retry_delay(self, value: float): ...
    
    @property
    def socket_timeout(self) -> float: ...
    
    @socket_timeout.setter
    def socket_timeout(self, value: float): ...
    
    @property
    def stack_timeout(self) -> float: ...
    
    @stack_timeout.setter
    def stack_timeout(self, value: float): ...
    
    @property
    def locale(self) -> str: ...
    
    @locale.setter
    def locale(self, value: str): ...
    
    @property
    def blocked_connection_timeout(self) -> float: ...
    
    @blocked_connection_timeout.setter
    def blocked_connection_timeout(self, value: float): ...
    
    @property
    def client_properties(self) -> dict: ...
    
    @client_properties.setter
    def client_properties(self, value: dict): ...
    
    @property
    def tcp_options(self) -> dict: ...
    
    @tcp_options.setter
    def tcp_options(self, value: dict): ...

URL-Based Parameters

Create connection parameters from AMQP URLs with automatic parsing of connection details.

class URLParameters(ConnectionParameters):
    """Connection parameters from AMQP URL."""
    
    def __init__(self, url):
        """
        Create connection parameters from AMQP URL.
        
        Parameters:
        - url (str): AMQP URL in format: amqp://username:password@host:port/<virtual_host>[?query-string]
        
        URL query parameters:
        - channel_max: Maximum number of channels
        - client_properties: Client properties as JSON
        - connection_attempts: Number of connection attempts
        - frame_max: Maximum frame size
        - heartbeat: Heartbeat interval
        - locale: Connection locale
        - ssl_options: SSL options as JSON
        - retry_delay: Retry delay in seconds
        - socket_timeout: Socket timeout in seconds
        - stack_timeout: Stack timeout in seconds
        - blocked_connection_timeout: Blocked connection timeout
        - tcp_options: TCP options as JSON
        """

Blocking Connection

Synchronous connection adapter for simple blocking operations.

class BlockingConnection:
    """Synchronous connection to RabbitMQ."""
    
    def __init__(self, parameters):
        """
        Create blocking connection.
        
        Parameters:
        - parameters (ConnectionParameters or URLParameters): Connection configuration
        """
    
    def channel(self, channel_number=None):
        """
        Create a new channel.
        
        Parameters:
        - channel_number (int, optional): Specific channel number to use
        
        Returns:
        - BlockingChannel: New channel instance
        """
    
    def close(self):
        """Close the connection."""
    
    def process_data_events(self, time_limit=0):
        """
        Process pending data events.
        
        Parameters:
        - time_limit (float): Maximum time to process events (default: 0 for non-blocking)
        """
    
    def sleep(self, duration):
        """
        Sleep while processing connection events.
        
        Parameters:
        - duration (float): Sleep duration in seconds
        """
    
    def add_callback_threadsafe(self, callback):
        """
        Add callback to be executed in connection thread.
        
        Parameters:
        - callback (callable): Callback function to execute
        """
    
    def call_later(self, delay, callback, *args):
        """
        Schedule callback execution after delay.
        
        Parameters:
        - delay (float): Delay in seconds
        - callback (callable): Callback function
        - *args: Arguments for callback
        
        Returns:
        - object: Handle for canceling the call
        """
    
    def remove_timeout(self, timeout_id):
        """
        Remove scheduled timeout.
        
        Parameters:
        - timeout_id: Timeout handle to remove
        """
    
    def update_secret(self, new_secret, reason):
        """
        Update connection credentials.
        
        Parameters:
        - new_secret (str): New password/secret
        - reason (str): Reason for update
        """
    
    def add_on_connection_blocked_callback(self, callback):
        """
        Add callback for connection blocked events.
        
        Parameters:
        - callback (callable): Callback function receiving (connection, method)
        """
    
    def add_on_connection_unblocked_callback(self, callback):
        """
        Add callback for connection unblocked events.
        
        Parameters:
        - callback (callable): Callback function receiving (connection, method)
        """
    
    # Connection state properties
    @property
    def is_closed(self) -> bool:
        """True if connection is closed."""
    
    @property
    def is_open(self) -> bool:
        """True if connection is open."""
    
    # Server capability properties
    @property
    def basic_nack_supported(self) -> bool:
        """True if server supports basic.nack."""
    
    @property
    def consumer_cancel_notify_supported(self) -> bool:
        """True if server supports consumer cancel notifications."""
    
    @property
    def exchange_exchange_bindings_supported(self) -> bool:
        """True if server supports exchange-to-exchange bindings."""
    
    @property
    def publisher_confirms_supported(self) -> bool:
        """True if server supports publisher confirms."""

Select Connection

Event-driven connection adapter using select/poll/epoll for asynchronous operations.

class SelectConnection:
    """Event-driven connection using select/poll/epoll."""
    
    def __init__(self, parameters, on_open_callback=None, on_open_error_callback=None,
                 on_close_callback=None, ioloop=None):
        """
        Create select-based connection.
        
        Parameters:
        - parameters (ConnectionParameters): Connection configuration
        - on_open_callback (callable): Called when connection opens
        - on_open_error_callback (callable): Called on connection error
        - on_close_callback (callable): Called when connection closes
        - ioloop (IOLoop): Event loop instance (creates default if None)
        """
    
    def channel(self, on_open_callback, channel_number=None):
        """
        Create a new channel asynchronously.
        
        Parameters:
        - on_open_callback (callable): Called when channel opens
        - channel_number (int, optional): Specific channel number
        """
    
    def close(self, reply_code=200, reply_text='Normal shutdown'):
        """
        Close connection asynchronously.
        
        Parameters:
        - reply_code (int): AMQP reply code (default: 200)
        - reply_text (str): Human-readable close reason
        """
    
    def ioloop(self):
        """
        Get the IOLoop instance.
        
        Returns:
        - IOLoop: Event loop instance
        """

Connection Workflow

Default AMQP connection establishment workflow with retry and timeout handling.

class AMQPConnectionWorkflow:
    """Default connection establishment workflow."""
    
    def __init__(self, parameters, on_done_callback, nbio_interface):
        """
        Create connection workflow.
        
        Parameters:
        - parameters (ConnectionParameters): Connection configuration
        - on_done_callback (callable): Called when workflow completes
        - nbio_interface: Non-blocking I/O interface
        """
    
    def start(self):
        """Start the connection workflow."""
    
    def abort(self):
        """Abort the connection workflow."""

Usage Examples

Basic Connection

import pika

# Simple connection
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# Use the channel...

connection.close()

Connection with Authentication

import pika

credentials = pika.PlainCredentials('username', 'password')
parameters = pika.ConnectionParameters(
    host='rabbitmq.example.com',
    port=5672,
    virtual_host='/app',
    credentials=credentials
)

connection = pika.BlockingConnection(parameters)

URL-Based Connection

import pika

# Connection from URL
url = 'amqp://user:pass@localhost:5672/%2F?heartbeat=300'
connection = pika.BlockingConnection(pika.URLParameters(url))

Connection with SSL

import pika
import ssl

context = ssl.create_default_context()
ssl_options = pika.SSLOptions(context, 'rabbitmq.example.com')

parameters = pika.ConnectionParameters(
    host='rabbitmq.example.com',
    port=5671,
    ssl_options=ssl_options
)

connection = pika.BlockingConnection(parameters)

Asynchronous Connection

import pika

def on_open(connection):
    print('Connection opened')
    connection.channel(on_open_callback=on_channel_open)

def on_open_error(connection, error):
    print(f'Connection failed: {error}')

def on_close(connection, reason):
    print(f'Connection closed: {reason}')

def on_channel_open(channel):
    print('Channel opened')
    # Use channel...
    
parameters = pika.ConnectionParameters('localhost')
connection = pika.SelectConnection(
    parameters,
    on_open_callback=on_open,
    on_open_error_callback=on_open_error,
    on_close_callback=on_close
)

# Start the IOLoop
connection.ioloop.start()

Install with Tessl CLI

npx tessl i tessl/pypi-pika

docs

authentication-security.md

channel-operations.md

connection-adapters.md

connection-management.md

exception-handling.md

index.md

message-properties-types.md

tile.json