CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-kombu

Messaging library for Python that provides an idiomatic high-level interface for AMQP and other message brokers.

Pending
Overview
Eval results
Files

simple.mddocs/

Simple Interface

Queue-like API for simple use cases that provides an easy-to-use interface similar to Python's queue module. The simple interface abstracts away the complexity of AMQP entities and provides a straightforward way to send and receive messages.

Capabilities

SimpleQueue

Simple API for persistent queues that provides a high-level, queue-like interface for message passing.

class SimpleQueue:
    def __init__(self, channel, name, no_ack=None, queue_opts=None, queue_args=None, exchange_opts=None, serializer=None, compression=None, accept=None, **kwargs):
        """
        Create simple persistent queue.

        Parameters:
        - channel: AMQP channel to use
        - name (str): Queue name
        - no_ack (bool): Disable acknowledgments (None=default, False for persistence)
        - queue_opts (dict): Additional queue options (durable, exclusive, etc.)
        - queue_args (dict): Queue declaration arguments
        - exchange_opts (dict): Additional exchange options
        - serializer (str): Default serialization method
        - compression (str): Default compression method
        - accept (list): Accepted content types
        - **kwargs: Additional options
        """

    def get(self, block=True, timeout=None):
        """
        Get message from queue.

        Parameters:
        - block (bool): Block if queue is empty (default True)
        - timeout (float): Timeout in seconds for blocking get

        Returns:
        Decoded message body

        Raises:
        Empty: If queue is empty and block=False or timeout exceeded
        """

    def get_nowait(self):
        """
        Get message without blocking.

        Returns:
        Decoded message body

        Raises:
        Empty: If queue is empty
        """

    def put(self, message, serializer=None, headers=None, compression=None, routing_key=None, **kwargs):
        """
        Put message into queue.

        Parameters:
        - message: Message body to send
        - serializer (str): Serialization method override
        - headers (dict): Message headers
        - compression (str): Compression method override
        - routing_key (str): Routing key override
        - **kwargs: Additional publish parameters
        """

    def clear(self):
        """
        Clear all messages from queue.

        Returns:
        int: Number of messages cleared
        """

    def qsize(self):
        """
        Get approximate queue size.

        Returns:
        int: Number of messages in queue (approximate)

        Note:
        Not all transports support this operation
        """

    def close(self):
        """Close queue and cleanup resources."""

    # Properties
    @property
    def Empty(self):
        """Exception class raised when queue is empty"""

    @property
    def no_ack(self):
        """bool: Auto-acknowledgment flag"""

SimpleBuffer

Simple API for ephemeral queues that provides a high-level interface for temporary message passing with automatic cleanup.

class SimpleBuffer:
    def __init__(self, channel, name, no_ack=True, queue_opts=None, queue_args=None, exchange_opts=None, serializer=None, compression=None, accept=None, **kwargs):
        """
        Create simple ephemeral queue.

        Parameters:
        - channel: AMQP channel to use
        - name (str): Queue name
        - no_ack (bool): Disable acknowledgments (default True for performance)
        - queue_opts (dict): Additional queue options (auto_delete=True, durable=False by default)
        - queue_args (dict): Queue declaration arguments
        - exchange_opts (dict): Additional exchange options
        - serializer (str): Default serialization method
        - compression (str): Default compression method
        - accept (list): Accepted content types
        - **kwargs: Additional options
        """

    # Inherits all methods from SimpleQueue
    def get(self, block=True, timeout=None):
        """Get message from buffer (same as SimpleQueue.get)"""

    def get_nowait(self):
        """Get message without blocking (same as SimpleQueue.get_nowait)"""

    def put(self, message, serializer=None, headers=None, compression=None, routing_key=None, **kwargs):
        """Put message into buffer (same as SimpleQueue.put)"""

    def clear(self):
        """Clear all messages from buffer (same as SimpleQueue.clear)"""

    def qsize(self):
        """Get approximate buffer size (same as SimpleQueue.qsize)"""

    def close(self):
        """Close buffer and cleanup resources (same as SimpleQueue.close)"""

    # Properties
    @property
    def Empty(self):
        """Exception class raised when buffer is empty"""

    @property
    def no_ack(self):
        """bool: Auto-acknowledgment flag (True by default)"""

Usage Examples

Basic SimpleQueue Usage

from kombu import Connection

# Connect and create simple queue
with Connection('redis://localhost:6379/0') as conn:
    # Create persistent queue
    queue = conn.SimpleQueue('task_queue')
    
    # Send messages
    queue.put({'task': 'process_data', 'id': 1})
    queue.put({'task': 'send_email', 'id': 2})
    queue.put({'task': 'generate_report', 'id': 3})
    
    # Receive messages
    while True:
        try:
            message = queue.get(timeout=5.0)
            print(f"Processing: {message}")
            
            # Simulate work
            if message['task'] == 'process_data':
                print(f"Processing data for task {message['id']}")
            elif message['task'] == 'send_email':
                print(f"Sending email for task {message['id']}")
            elif message['task'] == 'generate_report':
                print(f"Generating report for task {message['id']}")
                
        except queue.Empty:
            print("No more messages")
            break
    
    queue.close()

Non-blocking Queue Operations

from kombu import Connection

with Connection('redis://localhost:6379/0') as conn:
    queue = conn.SimpleQueue('work_queue')
    
    # Send some messages
    for i in range(5):
        queue.put(f'Message {i}')
    
    # Process messages without blocking
    processed = 0
    while True:
        try:
            message = queue.get_nowait()
            print(f"Got: {message}")
            processed += 1
        except queue.Empty:
            print(f"Queue empty, processed {processed} messages")
            break
    
    queue.close()

SimpleBuffer for Temporary Communication

from kombu import Connection
import threading
import time

def producer(conn, buffer_name):
    """Producer function"""
    buffer = conn.SimpleBuffer(buffer_name)
    
    for i in range(10):
        message = f'Temp message {i}'
        buffer.put(message)
        print(f"Sent: {message}")
        time.sleep(0.1)
    
    buffer.close()

def consumer(conn, buffer_name):
    """Consumer function"""
    buffer = conn.SimpleBuffer(buffer_name)
    
    while True:
        try:
            message = buffer.get(timeout=2.0)
            print(f"Received: {message}")
        except buffer.Empty:
            print("Buffer empty, stopping consumer")
            break
    
    buffer.close()

# Use SimpleBuffer for temporary communication
with Connection('redis://localhost:6379/0') as conn:
    buffer_name = 'temp_communication'
    
    # Start producer and consumer in separate threads
    producer_thread = threading.Thread(target=producer, args=(conn, buffer_name))
    consumer_thread = threading.Thread(target=consumer, args=(conn, buffer_name))
    
    producer_thread.start()
    consumer_thread.start()
    
    producer_thread.join()
    consumer_thread.join()

Queue Management Operations

from kombu import Connection

with Connection('amqp://localhost') as conn:
    queue = conn.SimpleQueue('management_queue')
    
    # Add several messages
    for i in range(100):
        queue.put(f'Message {i}')
    
    # Check queue size (if supported by transport)
    try:
        size = queue.qsize()
        print(f"Queue has approximately {size} messages")
    except NotImplementedError:
        print("Queue size checking not supported by this transport")
    
    # Process first 10 messages
    for i in range(10):
        try:
            message = queue.get_nowait()
            print(f"Processed: {message}")
        except queue.Empty:
            break
    
    # Clear remaining messages
    cleared = queue.clear()
    print(f"Cleared {cleared} remaining messages")
    
    queue.close()

Serialization and Compression

from kombu import Connection
import json
import pickle

with Connection('redis://localhost:6379/0') as conn:
    # Queue with JSON serialization
    json_queue = conn.SimpleQueue('json_queue', serializer='json')
    
    # Send complex data structure
    data = {
        'user_id': 12345,
        'action': 'purchase', 
        'items': [
            {'id': 1, 'name': 'Widget', 'price': 9.99},
            {'id': 2, 'name': 'Gadget', 'price': 19.99}
        ],
        'total': 29.98
    }
    
    json_queue.put(data)
    received = json_queue.get()
    print(f"JSON data: {received}")
    
    # Queue with pickle serialization and compression
    binary_queue = conn.SimpleQueue(
        'binary_queue',
        serializer='pickle',
        compression='gzip'
    )
    
    # Send binary data
    binary_data = {
        'large_list': list(range(1000)),
        'nested_dict': {'level1': {'level2': {'level3': 'deep_value'}}}
    }
    
    binary_queue.put(binary_data)
    received_binary = binary_queue.get()
    print(f"Binary data received: {len(received_binary['large_list'])} items")
    
    json_queue.close()
    binary_queue.close()

Message Headers and Properties

from kombu import Connection
import time

with Connection('redis://localhost:6379/0') as conn:
    queue = conn.SimpleQueue('header_queue')
    
    # Send message with custom headers
    queue.put(
        {'task': 'important_work'},
        headers={
            'priority': 'high',
            'created_by': 'worker_service',
            'timestamp': time.time(),
            'retry_count': 0
        }
    )
    
    # The headers are automatically included with the message
    # but access depends on the underlying implementation
    message = queue.get()
    print(f"Received: {message}")
    
    queue.close()

Error Handling with Simple Interface

from kombu import Connection
import socket

def robust_queue_processing(queue_name, conn_url):
    """Robust queue processing with error handling"""
    
    try:
        with Connection(conn_url) as conn:
            queue = conn.SimpleQueue(queue_name)
            
            while True:
                try:
                    # Try to get message with timeout
                    message = queue.get(timeout=30.0)
                    
                    # Process message
                    print(f"Processing: {message}")
                    
                    # Simulate processing that might fail
                    if message.get('should_fail'):
                        raise ValueError("Simulated processing error")
                        
                    print("Processing completed successfully")
                    
                except queue.Empty:
                    print("No messages received in 30 seconds, continuing...")
                    continue
                    
                except ValueError as e:
                    print(f"Processing error: {e}")
                    # With SimpleQueue, failed messages are lost unless
                    # you implement your own retry mechanism
                    continue
                    
                except KeyboardInterrupt:
                    print("Shutting down gracefully...")
                    break
                    
            queue.close()
            
    except socket.error as e:
        print(f"Connection error: {e}")
    except Exception as e:
        print(f"Unexpected error: {e}")

# Usage
robust_queue_processing('work_queue', 'redis://localhost:6379/0')

Producer-Consumer Pattern

from kombu import Connection
import threading
import time
import random

class TaskProducer:
    def __init__(self, conn, queue_name):
        self.queue = conn.SimpleQueue(queue_name)
        self.running = True
    
    def produce_tasks(self):
        """Produce tasks continuously"""
        task_id = 0
        while self.running:
            task = {
                'id': task_id,
                'type': random.choice(['email', 'report', 'cleanup']),
                'created_at': time.time()
            }
            
            self.queue.put(task)
            print(f"Produced task {task_id}: {task['type']}")
            
            task_id += 1
            time.sleep(random.uniform(0.5, 2.0))
    
    def stop(self):
        self.running = False
        self.queue.close()

class TaskConsumer:
    def __init__(self, conn, queue_name, consumer_id):
        self.queue = conn.SimpleQueue(queue_name)
        self.consumer_id = consumer_id
        self.running = True
    
    def consume_tasks(self):
        """Consume tasks continuously"""
        while self.running:
            try:
                task = self.queue.get(timeout=1.0)
                
                # Simulate processing time
                processing_time = random.uniform(0.1, 1.0)
                print(f"Consumer {self.consumer_id} processing task {task['id']}")
                time.sleep(processing_time)
                
                print(f"Consumer {self.consumer_id} completed task {task['id']}")
                
            except self.queue.Empty:
                continue
            except KeyboardInterrupt:
                break
    
    def stop(self):
        self.running = False
        self.queue.close()

# Run producer-consumer system
with Connection('redis://localhost:6379/0') as conn:
    queue_name = 'task_processing'
    
    # Create producer and consumers
    producer = TaskProducer(conn, queue_name)
    consumers = [
        TaskConsumer(conn, queue_name, i) 
        for i in range(3)  # 3 consumer workers
    ]
    
    # Start threads
    producer_thread = threading.Thread(target=producer.produce_tasks)
    consumer_threads = [
        threading.Thread(target=consumer.consume_tasks)
        for consumer in consumers
    ]
    
    producer_thread.start()
    for thread in consumer_threads:
        thread.start()
    
    try:
        # Run for 30 seconds
        time.sleep(30)
    except KeyboardInterrupt:
        pass
    
    # Graceful shutdown
    producer.stop()
    for consumer in consumers:
        consumer.stop()
    
    producer_thread.join()
    for thread in consumer_threads:
        thread.join()
    
    print("All threads stopped")

Install with Tessl CLI

npx tessl i tessl/pypi-kombu

docs

compression.md

connection.md

entities.md

exceptions.md

index.md

messaging.md

mixins.md

pools.md

serialization.md

simple.md

tile.json