Kombu is a messaging library for Python that provides an idiomatic, high-level interface for AMQP and other message brokers.

It offers a queue-like API for simple use cases, with an easy-to-use interface similar to Python's queue module. This simple interface abstracts away the complexity of AMQP entities and provides a straightforward way to send and receive messages.

Simple API for persistent queues that provides a high-level, queue-like interface for message passing.
class SimpleQueue:
    """High-level, queue-like interface over a persistent broker queue.

    Mirrors the API of Python's ``queue`` module (get/put/qsize/Empty) while
    hiding the underlying AMQP entities (exchange, binding, consumer).
    """

    def __init__(self, channel, name, no_ack=None, queue_opts=None, queue_args=None, exchange_opts=None, serializer=None, compression=None, accept=None, **kwargs):
        """
        Create simple persistent queue.

        Parameters:
        - channel: AMQP channel (or Connection) to use
        - name (str): Queue name
        - no_ack (bool): Disable acknowledgments (None=default, False for persistence)
        - queue_opts (dict): Additional queue options (durable, exclusive, etc.)
        - queue_args (dict): Queue declaration arguments
        - exchange_opts (dict): Additional exchange options
        - serializer (str): Default serialization method
        - compression (str): Default compression method
        - accept (list): Accepted content types
        - **kwargs: Additional options
        """

    def get(self, block=True, timeout=None):
        """
        Get message from queue.

        Parameters:
        - block (bool): Block if queue is empty (default True)
        - timeout (float): Timeout in seconds for blocking get

        Returns:
        Decoded message body

        Raises:
        Empty: If queue is empty and block=False or timeout exceeded
        """

    def get_nowait(self):
        """
        Get message without blocking.

        Returns:
        Decoded message body

        Raises:
        Empty: If queue is empty
        """

    def put(self, message, serializer=None, headers=None, compression=None, routing_key=None, **kwargs):
        """
        Put message into queue.

        Parameters:
        - message: Message body to send
        - serializer (str): Serialization method override
        - headers (dict): Message headers
        - compression (str): Compression method override
        - routing_key (str): Routing key override
        - **kwargs: Additional publish parameters
        """

    def clear(self):
        """
        Clear all messages from queue.

        Returns:
        int: Number of messages cleared
        """

    def qsize(self):
        """
        Get approximate queue size.

        Returns:
        int: Number of messages in queue (approximate)

        Note:
        Not all transports support this operation
        """

    def close(self):
        """Close queue and cleanup resources."""

    # Properties
    @property
    def Empty(self):
        """Exception class raised when queue is empty"""

    @property
    def no_ack(self):
        """bool: Auto-acknowledgment flag"""


Simple API for ephemeral queues that provides a high-level interface for temporary message passing with automatic cleanup.
class SimpleBuffer:
    """High-level interface for ephemeral (temporary) queues.

    Same API as SimpleQueue, but tuned for transient message passing:
    acknowledgments are off and the underlying queue is auto-deleted
    (auto_delete=True, durable=False by default).
    """

    def __init__(self, channel, name, no_ack=True, queue_opts=None, queue_args=None, exchange_opts=None, serializer=None, compression=None, accept=None, **kwargs):
        """
        Create simple ephemeral queue.

        Parameters:
        - channel: AMQP channel (or Connection) to use
        - name (str): Queue name
        - no_ack (bool): Disable acknowledgments (default True for performance)
        - queue_opts (dict): Additional queue options (auto_delete=True, durable=False by default)
        - queue_args (dict): Queue declaration arguments
        - exchange_opts (dict): Additional exchange options
        - serializer (str): Default serialization method
        - compression (str): Default compression method
        - accept (list): Accepted content types
        - **kwargs: Additional options
        """

    # Inherits all methods from SimpleQueue
    def get(self, block=True, timeout=None):
        """Get message from buffer (same as SimpleQueue.get)"""

    def get_nowait(self):
        """Get message without blocking (same as SimpleQueue.get_nowait)"""

    def put(self, message, serializer=None, headers=None, compression=None, routing_key=None, **kwargs):
        """Put message into buffer (same as SimpleQueue.put)"""

    def clear(self):
        """Clear all messages from buffer (same as SimpleQueue.clear)"""

    def qsize(self):
        """Get approximate buffer size (same as SimpleQueue.qsize)"""

    def close(self):
        """Close buffer and cleanup resources (same as SimpleQueue.close)"""

    # Properties
    @property
    def Empty(self):
        """Exception class raised when buffer is empty"""

    @property
    def no_ack(self):
        """bool: Auto-acknowledgment flag (True by default)"""


from kombu import Connection
# Example: basic produce/consume with a persistent SimpleQueue.
with Connection('redis://localhost:6379/0') as conn:
    # Create persistent queue
    task_queue = conn.SimpleQueue('task_queue')

    # Enqueue a few work items
    for task_name, task_id in (('process_data', 1),
                               ('send_email', 2),
                               ('generate_report', 3)):
        task_queue.put({'task': task_name, 'id': task_id})

    # Drain the queue, dispatching on the task type via a lookup table
    dispatch = {
        'process_data': 'Processing data for task {}',
        'send_email': 'Sending email for task {}',
        'generate_report': 'Generating report for task {}',
    }
    while True:
        try:
            msg = task_queue.get(timeout=5.0)
        except task_queue.Empty:
            print("No more messages")
            break
        print(f"Processing: {msg}")
        # Simulate work
        template = dispatch.get(msg['task'])
        if template is not None:
            print(template.format(msg['id']))
    task_queue.close()
from kombu import Connection
# Example: non-blocking consumption with get_nowait().
with Connection('redis://localhost:6379/0') as conn:
    work_queue = conn.SimpleQueue('work_queue')

    # Send some messages
    for n in range(5):
        work_queue.put(f'Message {n}')

    # Drain without blocking; stop as soon as the queue reports Empty
    handled = 0
    while True:
        try:
            item = work_queue.get_nowait()
        except work_queue.Empty:
            print(f"Queue empty, processed {handled} messages")
            break
        print(f"Got: {item}")
        handled += 1
    work_queue.close()
from kombu import Connection
import threading
import time

def producer(conn, buffer_name):
    """Producer function: send ten timed messages to an ephemeral buffer."""
    buffer = conn.SimpleBuffer(buffer_name)
    for i in range(10):
        message = f'Temp message {i}'
        buffer.put(message)
        print(f"Sent: {message}")
        # Pace the producer so the consumer can interleave
        time.sleep(0.1)
    buffer.close()

def consumer(conn, buffer_name):
    """Consumer function: read until no message arrives within 2 seconds."""
    buffer = conn.SimpleBuffer(buffer_name)
    while True:
        try:
            message = buffer.get(timeout=2.0)
            print(f"Received: {message}")
        except buffer.Empty:
            # Timeout expired with no message -> producer is done
            print("Buffer empty, stopping consumer")
            break
    buffer.close()

# Use SimpleBuffer for temporary communication
with Connection('redis://localhost:6379/0') as conn:
    buffer_name = 'temp_communication'
    # Start producer and consumer in separate threads
    producer_thread = threading.Thread(target=producer, args=(conn, buffer_name))
    consumer_thread = threading.Thread(target=consumer, args=(conn, buffer_name))
    producer_thread.start()
    consumer_thread.start()
    producer_thread.join()
    consumer_thread.join()
from kombu import Connection
# Example: queue management with qsize() and clear().
with Connection('amqp://localhost') as conn:
    mgmt_queue = conn.SimpleQueue('management_queue')

    # Add several messages
    for n in range(100):
        mgmt_queue.put(f'Message {n}')

    # qsize() may raise NotImplementedError depending on the transport
    try:
        pending = mgmt_queue.qsize()
    except NotImplementedError:
        print("Queue size checking not supported by this transport")
    else:
        print(f"Queue has approximately {pending} messages")

    # Process first 10 messages
    for _ in range(10):
        try:
            item = mgmt_queue.get_nowait()
        except mgmt_queue.Empty:
            break
        print(f"Processed: {item}")

    # Clear remaining messages
    removed = mgmt_queue.clear()
    print(f"Cleared {removed} remaining messages")
    mgmt_queue.close()
from kombu import Connection
# Example: per-queue serialization and compression settings.
import json
import pickle

with Connection('redis://localhost:6379/0') as conn:
    # Queue with JSON serialization
    json_queue = conn.SimpleQueue('json_queue', serializer='json')

    # A nested structure round-trips through the JSON serializer
    order = {
        'user_id': 12345,
        'action': 'purchase',
        'items': [
            {'id': 1, 'name': 'Widget', 'price': 9.99},
            {'id': 2, 'name': 'Gadget', 'price': 19.99},
        ],
        'total': 29.98,
    }
    json_queue.put(order)
    decoded = json_queue.get()
    print(f"JSON data: {decoded}")

    # Queue with pickle serialization and compression
    binary_queue = conn.SimpleQueue(
        'binary_queue',
        serializer='pickle',
        compression='gzip',
    )

    # Larger, arbitrarily nested data benefits from pickle + gzip
    payload = {
        'large_list': list(range(1000)),
        'nested_dict': {'level1': {'level2': {'level3': 'deep_value'}}},
    }
    binary_queue.put(payload)
    round_tripped = binary_queue.get()
    print(f"Binary data received: {len(round_tripped['large_list'])} items")

    json_queue.close()
    binary_queue.close()
from kombu import Connection
# Example: attaching custom headers to a message.
import time

with Connection('redis://localhost:6379/0') as conn:
    header_queue = conn.SimpleQueue('header_queue')

    # Send message with custom headers
    meta = {
        'priority': 'high',
        'created_by': 'worker_service',
        'timestamp': time.time(),
        'retry_count': 0,
    }
    header_queue.put({'task': 'important_work'}, headers=meta)

    # The headers are automatically included with the message,
    # but access depends on the underlying implementation
    received = header_queue.get()
    print(f"Received: {received}")
    header_queue.close()
from kombu import Connection
import socket

def robust_queue_processing(queue_name, conn_url):
    """Robust queue processing with error handling.

    Parameters:
    - queue_name (str): Name of the queue to consume from
    - conn_url (str): Broker connection URL

    Connection-level failures (socket.error) and unexpected errors are
    reported and end the function; per-message failures are logged and
    processing continues with the next message.
    """
    try:
        with Connection(conn_url) as conn:
            queue = conn.SimpleQueue(queue_name)
            while True:
                try:
                    # Try to get message with timeout
                    message = queue.get(timeout=30.0)
                    # Process message
                    print(f"Processing: {message}")
                    # Simulate processing that might fail
                    if message.get('should_fail'):
                        raise ValueError("Simulated processing error")
                    print("Processing completed successfully")
                except queue.Empty:
                    # Idle timeout: no message in 30s, keep polling
                    print("No messages received in 30 seconds, continuing...")
                    continue
                except ValueError as e:
                    print(f"Processing error: {e}")
                    # With SimpleQueue, failed messages are lost unless
                    # you implement your own retry mechanism
                    continue
                except KeyboardInterrupt:
                    # Only exit path of the loop; falls through to close()
                    print("Shutting down gracefully...")
                    break
            queue.close()
    except socket.error as e:
        print(f"Connection error: {e}")
    except Exception as e:
        print(f"Unexpected error: {e}")

# Usage
robust_queue_processing('work_queue', 'redis://localhost:6379/0')
from kombu import Connection
import threading
import time
import random

class TaskProducer:
    """Continuously generates random tasks onto a SimpleQueue until stopped."""

    def __init__(self, conn, queue_name):
        self.queue = conn.SimpleQueue(queue_name)
        # Cooperative shutdown flag, cleared by stop()
        self.running = True

    def produce_tasks(self):
        """Produce tasks continuously"""
        task_id = 0
        while self.running:
            task = {
                'id': task_id,
                'type': random.choice(['email', 'report', 'cleanup']),
                'created_at': time.time()
            }
            self.queue.put(task)
            print(f"Produced task {task_id}: {task['type']}")
            task_id += 1
            # Randomized pacing between produced tasks
            time.sleep(random.uniform(0.5, 2.0))

    def stop(self):
        # Signal the loop to end, then release the queue
        self.running = False
        self.queue.close()

class TaskConsumer:
    """Worker that pulls tasks off the shared queue and simulates processing."""

    def __init__(self, conn, queue_name, consumer_id):
        self.queue = conn.SimpleQueue(queue_name)
        self.consumer_id = consumer_id
        # Cooperative shutdown flag, cleared by stop()
        self.running = True

    def consume_tasks(self):
        """Consume tasks continuously"""
        while self.running:
            try:
                # Short timeout so the running flag is re-checked often
                task = self.queue.get(timeout=1.0)
                # Simulate processing time
                processing_time = random.uniform(0.1, 1.0)
                print(f"Consumer {self.consumer_id} processing task {task['id']}")
                time.sleep(processing_time)
                print(f"Consumer {self.consumer_id} completed task {task['id']}")
            except self.queue.Empty:
                continue
            except KeyboardInterrupt:
                break

    def stop(self):
        # Signal the loop to end, then release the queue
        self.running = False
        self.queue.close()

# Run producer-consumer system
with Connection('redis://localhost:6379/0') as conn:
    queue_name = 'task_processing'
    # Create producer and consumers
    producer = TaskProducer(conn, queue_name)
    consumers = [
        TaskConsumer(conn, queue_name, i)
        for i in range(3)  # 3 consumer workers
    ]
    # Start threads
    producer_thread = threading.Thread(target=producer.produce_tasks)
    consumer_threads = [
        threading.Thread(target=consumer.consume_tasks)
        for consumer in consumers
    ]
    producer_thread.start()
    for thread in consumer_threads:
        thread.start()
    try:
        # Run for 30 seconds
        time.sleep(30)
    except KeyboardInterrupt:
        pass
    # Graceful shutdown: flip flags first, then join all threads
    producer.stop()
    for consumer in consumers:
        consumer.stop()
    producer_thread.join()
    for thread in consumer_threads:
        thread.join()
    print("All threads stopped")

Install with Tessl CLI
npx tessl i tessl/pypi-kombu