Pure Python implementation of the AMQP 0.9.1 protocol including RabbitMQ's extensions
—
Comprehensive connection configuration and management with multiple adapter types supporting different networking approaches for connecting to RabbitMQ brokers.
Configure connection settings including host, port, credentials, timeouts, and protocol options.
class ConnectionParameters:
    """Connection parameters for AMQP connections.

    Every constructor argument is also exposed below as a read/write
    property of the same name.
    """

    def __init__(self, host='localhost', port=5672, virtual_host='/',
                 credentials=None, channel_max=65535, frame_max=131072,
                 heartbeat=None, ssl_options=None, connection_attempts=1,
                 retry_delay=2.0, socket_timeout=10.0, stack_timeout=15.0,
                 locale='en_US', blocked_connection_timeout=None,
                 client_properties=None, tcp_options=None):
        """
        Parameters:
        - host (str): RabbitMQ server hostname (default: 'localhost')
        - port (int): RabbitMQ server port (default: 5672)
        - virtual_host (str): Virtual host (default: '/')
        - credentials (Credentials): Authentication credentials (default: None)
        - channel_max (int): Maximum number of channels (default: 65535)
        - frame_max (int): Maximum frame size in bytes (default: 131072)
        - heartbeat (int): Heartbeat interval in seconds (default: None)
        - ssl_options (SSLOptions): SSL configuration (default: None)
        - connection_attempts (int): Number of connection attempts (default: 1)
        - retry_delay (float): Delay between attempts in seconds (default: 2.0)
        - socket_timeout (float): Socket connection timeout (default: 10.0)
        - stack_timeout (float): Full stack timeout (default: 15.0)
        - locale (str): Connection locale (default: 'en_US')
        - blocked_connection_timeout (float): Blocked connection timeout (default: None)
        - client_properties (dict): Client identification properties (default: None)
        - tcp_options (dict): TCP socket options (default: None)
        """

    # Properties for all parameters: one getter/setter pair per constructor
    # argument.
    # NOTE(review): heartbeat, blocked_connection_timeout, client_properties
    # and tcp_options default to None in __init__, yet their annotations below
    # do not admit None -- the annotations likely need widening; confirm.
    @property
    def host(self) -> str: ...
    @host.setter
    def host(self, value: str): ...

    @property
    def port(self) -> int: ...
    @port.setter
    def port(self, value: int): ...

    @property
    def virtual_host(self) -> str: ...
    @virtual_host.setter
    def virtual_host(self, value: str): ...

    @property
    def credentials(self): ...
    @credentials.setter
    def credentials(self, value): ...

    @property
    def channel_max(self) -> int: ...
    @channel_max.setter
    def channel_max(self, value: int): ...

    @property
    def frame_max(self) -> int: ...
    @frame_max.setter
    def frame_max(self, value: int): ...

    @property
    def heartbeat(self) -> int: ...
    @heartbeat.setter
    def heartbeat(self, value: int): ...

    @property
    def ssl_options(self): ...
    @ssl_options.setter
    def ssl_options(self, value): ...

    @property
    def connection_attempts(self) -> int: ...
    @connection_attempts.setter
    def connection_attempts(self, value: int): ...

    @property
    def retry_delay(self) -> float: ...
    @retry_delay.setter
    def retry_delay(self, value: float): ...

    @property
    def socket_timeout(self) -> float: ...
    @socket_timeout.setter
    def socket_timeout(self, value: float): ...

    @property
    def stack_timeout(self) -> float: ...
    @stack_timeout.setter
    def stack_timeout(self, value: float): ...

    @property
    def locale(self) -> str: ...
    @locale.setter
    def locale(self, value: str): ...

    @property
    def blocked_connection_timeout(self) -> float: ...
    @blocked_connection_timeout.setter
    def blocked_connection_timeout(self, value: float): ...

    @property
    def client_properties(self) -> dict: ...
    @client_properties.setter
    def client_properties(self, value: dict): ...

    @property
    def tcp_options(self) -> dict: ...
    @tcp_options.setter
    def tcp_options(self, value: dict): ...

Create connection parameters from AMQP URLs with automatic parsing of connection details.
class URLParameters(ConnectionParameters):
    """Connection parameters from AMQP URL."""

    def __init__(self, url):
        """
        Create connection parameters from AMQP URL.

        Parameters:
        - url (str): AMQP URL in format: amqp://username:password@host:port/<virtual_host>[?query-string]

        URL query parameters:
        - channel_max: Maximum number of channels
        - client_properties: Client properties as JSON
        - connection_attempts: Number of connection attempts
        - frame_max: Maximum frame size
        - heartbeat: Heartbeat interval
        - locale: Connection locale
        - ssl_options: SSL options as JSON
        - retry_delay: Retry delay in seconds
        - socket_timeout: Socket timeout in seconds
        - stack_timeout: Stack timeout in seconds
        - blocked_connection_timeout: Blocked connection timeout
        - tcp_options: TCP options as JSON
        """

Synchronous connection adapter for simple blocking operations.
class BlockingConnection:
    """Synchronous connection to RabbitMQ."""

    def __init__(self, parameters):
        """
        Create blocking connection.

        Parameters:
        - parameters (ConnectionParameters or URLParameters): Connection configuration
        """

    def channel(self, channel_number=None):
        """
        Create a new channel.

        Parameters:
        - channel_number (int, optional): Specific channel number to use

        Returns:
        - BlockingChannel: New channel instance
        """

    def close(self):
        """Close the connection."""

    def process_data_events(self, time_limit=0):
        """
        Process pending data events.

        Parameters:
        - time_limit (float): Maximum time to process events (default: 0 for non-blocking)
        """

    def sleep(self, duration):
        """
        Sleep while processing connection events.

        Parameters:
        - duration (float): Sleep duration in seconds
        """

    def add_callback_threadsafe(self, callback):
        """
        Add callback to be executed in connection thread.

        Parameters:
        - callback (callable): Callback function to execute
        """

    def call_later(self, delay, callback, *args):
        """
        Schedule callback execution after delay.

        Parameters:
        - delay (float): Delay in seconds
        - callback (callable): Callback function
        - *args: Arguments for callback

        Returns:
        - object: Handle for canceling the call
        """

    def remove_timeout(self, timeout_id):
        """
        Remove scheduled timeout.

        Parameters:
        - timeout_id: Timeout handle to remove
        """

    def update_secret(self, new_secret, reason):
        """
        Update connection credentials.

        Parameters:
        - new_secret (str): New password/secret
        - reason (str): Reason for update
        """

    def add_on_connection_blocked_callback(self, callback):
        """
        Add callback for connection blocked events.

        Parameters:
        - callback (callable): Callback function receiving (connection, method)
        """

    def add_on_connection_unblocked_callback(self, callback):
        """
        Add callback for connection unblocked events.

        Parameters:
        - callback (callable): Callback function receiving (connection, method)
        """

    # Connection state properties
    @property
    def is_closed(self) -> bool:
        """True if connection is closed."""

    @property
    def is_open(self) -> bool:
        """True if connection is open."""

    # Server capability properties
    @property
    def basic_nack_supported(self) -> bool:
        """True if server supports basic.nack."""

    @property
    def consumer_cancel_notify_supported(self) -> bool:
        """True if server supports consumer cancel notifications."""

    @property
    def exchange_exchange_bindings_supported(self) -> bool:
        """True if server supports exchange-to-exchange bindings."""

    @property
    def publisher_confirms_supported(self) -> bool:
        """True if server supports publisher confirms."""

Event-driven connection adapter using select/poll/epoll for asynchronous operations.
class SelectConnection:
    """Event-driven connection using select/poll/epoll."""

    def __init__(self, parameters, on_open_callback=None, on_open_error_callback=None,
                 on_close_callback=None, ioloop=None):
        """
        Create select-based connection.

        Parameters:
        - parameters (ConnectionParameters): Connection configuration
        - on_open_callback (callable): Called when connection opens
        - on_open_error_callback (callable): Called on connection error
        - on_close_callback (callable): Called when connection closes
        - ioloop (IOLoop): Event loop instance (creates default if None)
        """

    def channel(self, on_open_callback, channel_number=None):
        """
        Create a new channel asynchronously.

        Parameters:
        - on_open_callback (callable): Called when channel opens
        - channel_number (int, optional): Specific channel number
        """

    def close(self, reply_code=200, reply_text='Normal shutdown'):
        """
        Close connection asynchronously.

        Parameters:
        - reply_code (int): AMQP reply code (default: 200)
        - reply_text (str): Human-readable close reason (default: 'Normal shutdown')
        """

    # NOTE(review): this is listed as a plain method, but the asynchronous
    # usage example in this document accesses it as an attribute
    # (``connection.ioloop.start()``, no call parentheses). It is presumably
    # meant to be a property -- confirm and add ``@property`` if so.
    def ioloop(self):
        """
        Get the IOLoop instance.

        Returns:
        - IOLoop: Event loop instance
        """

Default AMQP connection establishment workflow with retry and timeout handling.
class AMQPConnectionWorkflow:
    """Default connection establishment workflow."""

    def __init__(self, parameters, on_done_callback, nbio_interface):
        """
        Create connection workflow.

        Parameters:
        - parameters (ConnectionParameters): Connection configuration
        - on_done_callback (callable): Called when workflow completes
        - nbio_interface: Non-blocking I/O interface
        """

    def start(self):
        """Start the connection workflow."""

    def abort(self):
        """Abort the connection workflow."""

import pika
# Simple connection: blocking adapter against 'localhost' with all other
# parameters left at their defaults.
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# Use the channel...
connection.close()

import pika
# Connection with explicit username/password credentials and a non-default
# virtual host.
credentials = pika.PlainCredentials('username', 'password')
parameters = pika.ConnectionParameters(
    host='rabbitmq.example.com',
    port=5672,
    virtual_host='/app',
    credentials=credentials
)
connection = pika.BlockingConnection(parameters)

import pika
# Connection from URL: settings are carried by the URL itself, here with a
# ``heartbeat`` query parameter.
url = 'amqp://user:pass@localhost:5672/%2F?heartbeat=300'
connection = pika.BlockingConnection(pika.URLParameters(url))

import pika
import ssl

# Connection with SSL options built from the standard library's default
# SSL context.
context = ssl.create_default_context()
ssl_options = pika.SSLOptions(context, 'rabbitmq.example.com')
parameters = pika.ConnectionParameters(
    host='rabbitmq.example.com',
    port=5671,
    ssl_options=ssl_options
)
connection = pika.BlockingConnection(parameters)

import pika
# Asynchronous connection driven by callbacks registered on SelectConnection.

# Passed as on_open_callback; requests a channel on the connection it is given.
def on_open(connection):
    print('Connection opened')
    connection.channel(on_open_callback=on_channel_open)


# Passed as on_open_error_callback; reports the error it is given.
def on_open_error(connection, error):
    print(f'Connection failed: {error}')


# Passed as on_close_callback; reports the reason it is given.
def on_close(connection, reason):
    print(f'Connection closed: {reason}')


# Passed to connection.channel() by on_open as the channel's on_open_callback.
def on_channel_open(channel):
    print('Channel opened')
    # Use channel...


parameters = pika.ConnectionParameters('localhost')
connection = pika.SelectConnection(
    parameters,
    on_open_callback=on_open,
    on_open_error_callback=on_open_error,
    on_close_callback=on_close
)
# Start the IOLoop
connection.ioloop.start()

Install with Tessl CLI
npx tessl i tessl/pypi-pika