CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-billiard

Python multiprocessing fork with improvements and bugfixes for distributed task processing

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

docs/queues.md

Queues

Process-safe and thread-safe queues for inter-process communication over pipes, with optional task-completion tracking and several implementations suited to different use cases.

Capabilities

Standard Queue

Process-safe and thread-safe FIFO queue that uses a pipe for robust inter-process communication.

class Queue:
    """
    FIFO queue that passes objects between processes over a pipe.

    put() signals a full queue with Full and get() signals an empty queue
    with Empty; both exception classes can be imported from
    billiard.queues.

    NOTE(review): in the standard-library multiprocessing.Queue an item is
    handed to a background thread, so it may not be readable by
    get_nowait() the instant put() returns. Presumably the same applies
    here - confirm.
    """
    def __init__(self, maxsize=0, ctx=None):
        """
        Create a queue.

        Parameters:
        - maxsize: maximum number of items the queue may hold
          (0, the default, means unlimited)
        - ctx: multiprocessing context (optional, defaults to None)
        """

    def put(self, obj, block=True, timeout=None):
        """
        Put an item into the queue.

        Parameters:
        - obj: object to put in queue
        - block: if True (default), wait for a free slot when the queue
          is full; if False, fail immediately instead of waiting
        - timeout: maximum number of seconds to wait when block=True
          (None for no timeout)

        Raises:
        - Full: if the queue is full and either block=False or the
          timeout was exceeded
        """

    def get(self, block=True, timeout=None):
        """
        Remove and return an item from the queue.

        Parameters:
        - block: if True (default), wait for an item when the queue is
          empty; if False, fail immediately instead of waiting
        - timeout: maximum number of seconds to wait when block=True
          (None for no timeout)

        Returns:
        The next item from the queue

        Raises:
        - Empty: if the queue is empty and either block=False or the
          timeout was exceeded
        """

    def put_nowait(self, obj):
        """
        Put an item into the queue without waiting.

        Equivalent to put(obj, block=False).

        Parameters:
        - obj: object to put in queue

        Raises:
        - Full: if queue is full
        """

    def get_nowait(self):
        """
        Remove and return an item from the queue without waiting.

        Equivalent to get(block=False).

        Returns:
        The next item from the queue

        Raises:
        - Empty: if queue is empty
        """

    def qsize(self) -> int:
        """
        Return the approximate size of the queue.

        The value is only approximate, so do not use it for
        synchronization.

        NOTE(review): the standard-library equivalent raises
        NotImplementedError on platforms without sem_getvalue()
        (e.g. macOS) - confirm whether that applies here.

        Returns:
        Approximate number of items in queue
        """

    def empty(self) -> bool:
        """
        Report whether the queue appears to be empty.

        The answer is a snapshot and may already be stale when it is
        returned.

        Returns:
        True if queue appears empty
        """

    def full(self) -> bool:
        """
        Report whether the queue appears to be full.

        The answer is a snapshot and may already be stale when it is
        returned.

        Returns:
        True if queue appears full
        """

    def close(self):
        """
        Close the queue and prevent further use.

        NOTE(review): in the standard-library equivalent this only
        declares that the *current process* will put no more data; the
        background thread exits once buffered data has been flushed to
        the pipe - confirm.
        """

    def join_thread(self):
        """
        Join the background thread, i.e. wait for it to finish.

        NOTE(review): the standard-library equivalent may only be called
        after close() - confirm.
        """

    def cancel_join_thread(self):
        """
        Cancel join_thread(), so that it no longer blocks.

        NOTE(review): in the standard-library equivalent this can cause
        buffered, not-yet-flushed items to be lost when the process
        exits - confirm before relying on it.
        """

Usage example:

from billiard import Process, Queue
import time
import queue

def producer(q, items):
    """Enqueue every element of ``items`` on ``q``, then a ``None`` sentinel.

    Parameters:
    - q: queue-like object exposing ``put()``
    - items: iterable of objects to enqueue, in order

    A short pause follows each item so the example output is easy to follow.
    """
    for payload in items:
        print(f"Producing: {payload}")
        q.put(payload)
        time.sleep(0.1)

    # A single end-of-stream marker; consumers pass it along to each other.
    q.put(None)

def consumer(q, consumer_id):
    """Drain ``q`` until the ``None`` sentinel arrives or a get() times out.

    Parameters:
    - q: queue-like object exposing ``get(timeout=...)`` and ``put()``
    - consumer_id: label used in the printed messages
    """
    while True:
        try:
            item = q.get(timeout=1)
        except queue.Empty:
            # Nothing arrived within one second: give up.
            print(f"Consumer {consumer_id} timed out")
            return

        if item is None:
            # Hand the sentinel on so the other consumers stop as well.
            q.put(None)
            return

        print(f"Consumer {consumer_id} consumed: {item}")
        time.sleep(0.2)

if __name__ == '__main__':
    # Bounded queue: at most five items may be pending at any time
    q = Queue(maxsize=5)

    # Report the state of the freshly created queue
    print(f"Queue size: {q.qsize()}")
    print(f"Queue empty: {q.empty()}")
    print(f"Queue full: {q.full()}")

    # One producer feeding two consumers
    items = list(range(10))
    processes = [Process(target=producer, args=(q, items))]
    processes += [
        Process(target=consumer, args=(q, consumer_id))
        for consumer_id in (1, 2)
    ]

    # Launch everything first, then wait for all of them to exit
    for proc in processes:
        proc.start()
    for proc in processes:
        proc.join()

    # Clean up
    q.close()

Joinable Queue

Queue with task completion tracking, allowing producers to wait for all tasks to be processed.

class JoinableQueue(Queue):
    """
    A Queue subclass that additionally tracks unfinished tasks.

    The count of unfinished tasks rises with every item added to the queue
    and falls with every task_done() call; join() waits for that count to
    be worked off.
    """
    def task_done(self):
        """
        Indicate that a formerly enqueued task is complete.

        Used by queue consumers. For each get() used to fetch a task,
        a subsequent call to task_done() tells the queue that processing
        of that task is complete.

        Raises:
        - ValueError: if called more times than there were items in queue
        """

    def join(self):
        """
        Block until all items in queue have been gotten and processed.

        The count of unfinished tasks goes up whenever an item is added
        to the queue and goes down whenever task_done() is called, so a
        consumer that forgets task_done() keeps join() waiting.
        """

Usage example:

from billiard import Process, JoinableQueue
import time

def worker(q):
    """Process tasks from ``q`` until a ``None`` sentinel is received.

    Parameters:
    - q: joinable queue exposing ``get()`` and ``task_done()``

    Every ``get()`` is matched by exactly one ``task_done()``, including
    the one that fetched the sentinel and any whose processing raised.
    Otherwise the queue's unfinished-task count never returns to zero and
    a later ``q.join()`` would block forever.
    """
    while True:
        item = q.get()
        try:
            if item is None:
                # Sentinel: stop, but still acknowledge it in ``finally``.
                break

            print(f"Processing {item}...")
            time.sleep(0.5)  # Simulate work
            print(f"Completed {item}")
        finally:
            q.task_done()  # Mark the fetched item as done

def producer_with_join(q, items, num_workers=2):
    """Enqueue ``items``, wait until all are processed, then stop the workers.

    Parameters:
    - q: joinable queue exposing ``put()`` and ``join()``
    - items: iterable of tasks to enqueue
    - num_workers: number of worker processes reading from ``q``; one
      ``None`` sentinel is enqueued per worker (default 2)
    """
    # Add tasks to queue
    for item in items:
        print(f"Adding task: {item}")
        q.put(item)

    # Wait for all tasks to be processed
    print("Waiting for all tasks to complete...")
    q.join()
    print("All tasks completed!")

    # Signal workers to stop: each worker consumes exactly one sentinel
    for _ in range(num_workers):
        q.put(None)

if __name__ == '__main__':
    # Queue that tracks unfinished tasks
    q = JoinableQueue()

    # Two identical worker processes
    workers = [Process(target=worker, args=(q,)) for _ in range(2)]
    for w in workers:
        w.start()

    # The producer enqueues the tasks, waits for them and stops the workers
    tasks = [f'task{number}' for number in range(1, 6)]
    producer_process = Process(target=producer_with_join, args=(q, tasks))
    producer_process.start()
    producer_process.join()

    # Wait for the workers to exit
    for w in workers:
        w.join()

Simple Queue

Simplified queue implementation with basic put/get operations and minimal overhead.

class SimpleQueue:
    """
    A simplified queue implementation.

    Offers only get(), put(), empty() and close(): there is no size limit
    and no non-blocking or timed variant of get()/put().
    """
    def get(self):
        """
        Remove and return an item from the queue (blocks until available).

        Returns:
        The next item from the queue
        """

    def put(self, obj):
        """
        Put an item into the queue.

        Parameters:
        - obj: object to put in queue
        """

    def empty(self) -> bool:
        """
        Report whether the queue appears to be empty.

        The answer is a snapshot and may already be stale when it is
        returned.

        Returns:
        True if queue appears empty
        """

    def close(self):
        """
        Close the queue.
        """

Usage example:

from billiard import Process, SimpleQueue
import time

def simple_worker(q, worker_id):
    """Consume items from ``q`` until a ``None`` sentinel is received.

    Parameters:
    - q: queue exposing a blocking ``get()``
    - worker_id: label used in the printed messages

    ``get()`` already blocks until an item is available, so the queue is
    not polled with ``empty()`` first: with several workers that check is
    stale by the time ``get()`` runs and only adds latency.

    Only the errors that signal a closed or broken connection end the
    loop quietly; anything else (including KeyboardInterrupt) propagates
    instead of being silently swallowed.
    """
    while True:
        try:
            item = q.get()
        except (EOFError, OSError):
            # The other end went away: nothing more will ever arrive.
            break

        if item is None:
            break

        print(f"Worker {worker_id} got: {item}")
        time.sleep(0.2)

def simple_producer(q, items, num_workers=2):
    """Enqueue ``items`` on ``q``, then one ``None`` sentinel per worker.

    Parameters:
    - q: queue exposing ``put()``
    - items: iterable of objects to enqueue, in order
    - num_workers: number of workers reading from ``q``; each one stops
      after consuming a single sentinel (default 2)
    """
    for item in items:
        q.put(item)
        time.sleep(0.1)

    # Signal completion: every worker needs its own sentinel
    for _ in range(num_workers):
        q.put(None)

if __name__ == '__main__':
    # Queue with only the basic put/get operations
    q = SimpleQueue()

    # One producer followed by two workers
    processes = [Process(target=simple_producer, args=(q, ['A', 'B', 'C', 'D']))]
    processes += [
        Process(target=simple_worker, args=(q, worker_id))
        for worker_id in (1, 2)
    ]

    # Launch everything first, then wait for all of them to exit
    for proc in processes:
        proc.start()
    for proc in processes:
        proc.join()

    q.close()

Queue Exceptions

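Exception classes raised by queue operations: Empty when a non-blocking or timed-out get() finds no item, and Full when a non-blocking or timed-out put() finds no free slot.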

class Empty(Exception):
    """
    Exception raised by Queue.get() when no item is available: either
    block=False was passed (as get_nowait() does) or the timeout was
    exceeded.
    """

class Full(Exception):
    """
    Exception raised by Queue.put() when no free slot is available: either
    block=False was passed (as put_nowait() does) or the timeout was
    exceeded.
    """

Usage example:

from billiard import Queue
from billiard.queues import Empty, Full
import time

def handle_queue_exceptions():
    """Demonstrate queue exception handling.

    Walks through three cases on a queue of capacity two and prints the
    outcome of each:
    1. ``Full`` from ``put_nowait()`` on a full queue,
    2. ``Empty`` from ``get_nowait()`` on an empty queue,
    3. ``Empty`` from ``get(timeout=...)`` when the timeout expires.

    Items that are known to have been put are fetched with a bounded
    blocking ``get()`` rather than ``get_nowait()``. A pipe-backed queue
    passes items through a background thread, so an item may not be
    readable the instant ``put()`` returns. A spurious ``Empty`` at that
    point would leave items queued and make the later ``put()`` fail with
    an unhandled ``Full``.
    """
    q = Queue(maxsize=2)

    # Generous upper bound for an item that is already on its way; the
    # call returns as soon as the item is readable.
    delivery_timeout = 1

    try:
        # Fill the queue
        q.put("item1")
        q.put("item2")
        print("Queue filled")

        # Try to put more (will raise Full)
        q.put_nowait("item3")
    except Full:
        print("Queue is full!")

    try:
        # Empty the queue
        print("Item:", q.get(timeout=delivery_timeout))
        print("Item:", q.get(timeout=delivery_timeout))
        print("Queue emptied")

        # Try to get more (will raise Empty)
        q.get_nowait()
    except Empty:
        print("Queue is empty!")

    # With timeouts
    try:
        q.put("timeout_test", timeout=0.1)
        result = q.get(timeout=delivery_timeout)
        print("Got with timeout:", result)

        # This will timeout: the queue is empty again
        q.get(timeout=0.1)
    except Empty:
        print("Get operation timed out")

# Run the demonstration only when this file is executed as a script.
if __name__ == '__main__':
    handle_queue_exceptions()

Queue Selection Guidelines

  • Queue: Use for general-purpose inter-process communication with flow control (maxsize)
  • JoinableQueue: Use when you need to wait for all queued tasks to be processed
  • SimpleQueue: Use for minimal overhead scenarios where you don't need size limits or advanced features

All queue types are process-safe and thread-safe, making them suitable for complex producer-consumer scenarios in multiprocessing applications.

Install with Tessl CLI

npx tessl i tessl/pypi-billiard

docs

communication.md

context-management.md

index.md

managers.md

process-management.md

process-pools.md

queues.md

shared-memory.md

synchronization.md

tile.json