CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-billiard

Python multiprocessing fork with improvements and bugfixes for distributed task processing

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

docs/synchronization.md

Synchronization

Synchronization primitives, including locks, semaphores, events, conditions, and barriers, for coordinating processes and ensuring process-safe access to shared resources.

Capabilities

Locks

Mutual exclusion primitives for protecting critical sections and shared resources.

class Lock:
    """
    A non-recursive lock (mutual exclusion lock).

    Usable in a `with` statement: entering the block calls acquire() and
    leaving it calls release().
    """
    def acquire(self, block=True, timeout=None) -> bool:
        """
        Acquire the lock.
        
        Parameters:
        - block: whether to block if lock is unavailable
        - timeout: timeout in seconds (None for no timeout)
        
        Returns:
        True if lock acquired, False otherwise
        """
    
    def release(self):
        """
        Release the lock.
        
        Raises:
        - ValueError: if lock is not currently held
        """
    
    def __enter__(self):
        """Context manager entry: acquire the lock and return acquire()'s result."""
        return self.acquire()
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        """
        Context manager exit: release the lock.

        The exception arguments are not inspected and nothing is returned,
        so an exception raised inside the `with` block still propagates.
        """
        self.release()

class RLock:
    """
    A reentrant lock (recursive lock).
    Can be acquired multiple times by the same process; each successful
    acquire() must be matched by one release() before another process
    can take the lock.
    """
    def acquire(self, block=True, timeout=None) -> bool:
        """
        Acquire the lock, incrementing recursion level.
        
        Parameters:
        - block: whether to block if lock is unavailable
        - timeout: timeout in seconds (None for no timeout)
        
        Returns:
        True if lock acquired, False otherwise
        """
    
    def release(self):
        """
        Release the lock, decrementing recursion level.
        
        Raises:
        - AssertionError: if lock is not currently held by calling process
          (note: a different exception type than Lock.release(), which
          raises ValueError)
        """
    
    def __enter__(self):
        """Context manager entry: acquire the lock and return acquire()'s result."""
        return self.acquire()
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit: release one level of the lock."""
        self.release()

Usage example:

from billiard import Process, Lock, RLock, Value
import time

# Shared resource counter.  A plain module-level int would be copied into
# every child process, so each worker would only ever update its own copy
# and the parent would still see 0.  A Value lives in shared memory.
counter = Value('i', 0)
lock = Lock()

def worker_with_lock(name, iterations, shared_lock, shared_counter=None):
    """
    Worker that safely increments the shared counter.

    Parameters:
    - name: label used in the progress output
    - iterations: number of increments to perform
    - shared_lock: lock guarding the read-modify-write of the counter
    - shared_counter: shared-memory integer to increment; falls back to the
      module-level ``counter`` when omitted.  Pass it explicitly so the
      example also works when child processes do not inherit module state.
    """
    if shared_counter is None:
        shared_counter = counter

    for _ in range(iterations):
        # Critical section: read, "work", write back must not interleave
        with shared_lock:
            old_value = shared_counter.value
            time.sleep(0.001)  # Simulate some work
            shared_counter.value = old_value + 1
            print(f"{name}: incremented counter to {shared_counter.value}")

def recursive_function(rlock, level):
    """
    Function that acquires lock recursively.

    Parameters:
    - rlock: reentrant lock; it is re-acquired at every recursion level
    - level: remaining recursion depth (recursion stops at 0)
    """
    with rlock:
        print(f"Level {level}: acquired lock")
        if level > 0:
            recursive_function(rlock, level - 1)
        print(f"Level {level}: releasing lock")

if __name__ == '__main__':
    # Test regular lock
    processes = []
    for i in range(3):
        p = Process(
            target=worker_with_lock,
            args=(f"Worker-{i}", 5, lock, counter),
        )
        processes.append(p)
        p.start()
    
    for p in processes:
        p.join()
    
    # 3 workers x 5 iterations each
    print(f"Final counter value: {counter.value}")
    
    # Test recursive lock
    rlock = RLock()
    process = Process(target=recursive_function, args=(rlock, 3))
    process.start()
    process.join()

Semaphores

Counting semaphores for controlling access to resources with limited capacity.

class Semaphore:
    """
    A counting semaphore.

    acquire() decrements the internal counter and release() increments it.
    Usable in a `with` statement: entering calls acquire() and leaving
    calls release().
    """
    def __init__(self, value=1, ctx=None):
        """
        Create a semaphore.
        
        Parameters:
        - value: initial count (default: 1)
        - ctx: multiprocessing context
        """
    
    def acquire(self, block=True, timeout=None) -> bool:
        """
        Acquire the semaphore, decrementing internal counter.
        
        Parameters:
        - block: whether to block if semaphore unavailable
        - timeout: timeout in seconds (None for no timeout)
        
        Returns:
        True if semaphore acquired, False otherwise
        """
    
    def release(self):
        """
        Release the semaphore, incrementing internal counter.
        """
    
    def __enter__(self):
        """Context manager entry: acquire the semaphore and return acquire()'s result."""
        return self.acquire()
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        """
        Context manager exit: release the semaphore.

        The exception arguments are not inspected and nothing is returned,
        so an exception raised inside the `with` block still propagates.
        """
        self.release()

class BoundedSemaphore(Semaphore):
    """
    A bounded semaphore that prevents release() from raising count above initial value.

    Everything except release() (construction, acquire(), context-manager
    support) is inherited from Semaphore.
    """
    def release(self):
        """
        Release the semaphore, but prevent count from exceeding initial value.
        
        Raises:
        - ValueError: if release() would increase count above initial value
        """

Usage example:

from billiard import Process, Semaphore, BoundedSemaphore
import time
import random

# Semaphore with an initial count of 3: at most three workers can hold it
# at the same time, the rest block until a holder releases it
resource_semaphore = Semaphore(3)

def worker_with_semaphore(worker_id, semaphore):
    """
    Worker that uses limited resource.

    Parameters:
    - worker_id: identifier used to label this worker's output
    - semaphore: semaphore guarding the resource; held for the duration
      of the simulated work
    """
    print(f"Worker {worker_id}: requesting resource...")
    
    # Entering the block acquires the semaphore; leaving it releases it
    with semaphore:
        print(f"Worker {worker_id}: acquired resource")
        # Simulate work with resource (random duration of 0.5 to 2.0 seconds)
        work_time = random.uniform(0.5, 2.0)
        time.sleep(work_time)
        print(f"Worker {worker_id}: releasing resource after {work_time:.1f}s")
    
    print(f"Worker {worker_id}: done")

def bounded_semaphore_example():
    """
    Demonstrate bounded semaphore behavior.

    Takes both slots of a two-slot BoundedSemaphore, returns both, and
    then attempts one release too many to show the resulting ValueError.
    """
    slots = 2
    bounded_sem = BoundedSemaphore(slots)

    # Take every slot (should work without blocking)
    for _ in range(slots):
        bounded_sem.acquire()
    print("Acquired semaphore twice")

    # Give every slot back, restoring the initial count
    for _ in range(slots):
        bounded_sem.release()
    print("Released semaphore twice")

    # One release beyond the initial count (should raise ValueError)
    try:
        bounded_sem.release()
    except ValueError as e:
        print(f"Bounded semaphore prevented over-release: {e}")

if __name__ == '__main__':
    # Resource limiting: eight workers compete for the shared semaphore
    workers = [
        Process(target=worker_with_semaphore, args=(i, resource_semaphore))
        for i in range(8)
    ]
    for proc in workers:
        proc.start()

    # Block until every worker has finished
    for proc in workers:
        proc.join()

    # Bounded semaphore demo runs in the parent process
    bounded_semaphore_example()

Events

Simple signaling mechanism for coordinating processes.

class Event:
    """
    A simple event object for process synchronization.

    Wraps an internal boolean flag: set() makes it True, clear() makes it
    False, and wait() blocks until it is True.
    """
    def set(self):
        """
        Set the internal flag to True.
        All processes waiting for it become unblocked.
        """
    
    def clear(self):
        """
        Set the internal flag to False.
        """
    
    def is_set(self) -> bool:
        """
        Return True if internal flag is True.
        
        Returns:
        True if event is set, False otherwise
        """
    
    def wait(self, timeout=None) -> bool:
        """
        Block until internal flag is True.
        
        Parameters:
        - timeout: timeout in seconds (None for no timeout)
        
        Returns:
        True if event was set, False if timeout occurred
        """

Usage example:

from billiard import Process, Event
import time
import random

def waiter(event, worker_id):
    """
    Process that waits for event.

    Parameters:
    - event: event object to wait on
    - worker_id: identifier used to label this waiter's output

    Waits at most 5 seconds and reports whether the event arrived or the
    wait timed out.
    """
    print(f"Waiter {worker_id}: waiting for event...")
    
    # wait() returns False when the timeout expires before the event is set
    if event.wait(timeout=5):
        print(f"Waiter {worker_id}: event received!")
    else:
        print(f"Waiter {worker_id}: timeout waiting for event")

def setter(event, delay):
    """
    Process that sets event after delay.

    Parameters:
    - event: event object to set
    - delay: number of seconds to sleep before setting the event
    """
    print(f"Setter: waiting {delay} seconds before setting event")
    time.sleep(delay)
    
    print("Setter: setting event")
    # Unblocks every process currently waiting on the event
    event.set()

def event_coordination_example():
    """
    Demonstrate event coordination.

    Launches three waiter processes and one setter process that fires the
    shared event after a random 1-3 second delay, waits for all of them,
    then shows the event state before and after clear().
    """
    event = Event()

    # Build the waiters first, then launch them in order
    waiters = [Process(target=waiter, args=(event, i)) for i in range(3)]
    for proc in waiters:
        proc.start()

    # The setter fires the event after a random delay
    delay = random.uniform(1, 3)
    setter_process = Process(target=setter, args=(event, delay))
    setter_process.start()

    # Wait for the setter first, then for every waiter
    for proc in [setter_process, *waiters]:
        proc.join()

    print(f"Event is set: {event.is_set()}")

    # Reset the flag and show that it is no longer set
    event.clear()
    print(f"Event after clear: {event.is_set()}")

if __name__ == '__main__':
    # Run the demo only when this file is executed as a script
    event_coordination_example()

Conditions

Advanced synchronization allowing processes to wait for specific conditions.

class Condition:
    """
    A condition variable for process synchronization.

    Pairs an underlying lock with wait()/notify(): a process holding the
    lock can wait for a state change, and another process holding the lock
    can wake waiters after making that change.
    """
    def __init__(self, lock=None, ctx=None):
        """
        Create a condition variable.
        
        Parameters:
        - lock: underlying lock (a new RLock() is created if None)
        - ctx: multiprocessing context
        """
    
    def acquire(self, block=True, timeout=None) -> bool:
        """
        Acquire the underlying lock.
        
        Parameters:
        - block: whether to block if lock unavailable
        - timeout: timeout in seconds (None for no timeout)
        
        Returns:
        True if lock acquired, False otherwise
        """
    
    def release(self):
        """
        Release the underlying lock.
        """
    
    def wait(self, timeout=None) -> bool:
        """
        Wait until notified or timeout.
        Must be called with lock held.
        
        Parameters:
        - timeout: timeout in seconds (None for no timeout)
        
        Returns:
        True if notified, False if timeout
        """
    
    def notify(self, n=1):
        """
        Wake up at most n processes waiting on condition.
        Must be called with lock held.
        
        Parameters:
        - n: number of processes to wake up
        """
    
    def notify_all(self):
        """
        Wake up all processes waiting on condition.
        Must be called with lock held.
        """
    
    def __enter__(self):
        """Context manager entry: acquire the underlying lock."""
        return self.acquire()
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit: release the underlying lock."""
        self.release()

Usage example:

from billiard import Process, Condition, Manager
import time
import random

# The condition is handed to every child process explicitly as an argument
condition = Condition()

def consumer(consumer_id, condition, items):
    """
    Consumer that waits for items.

    Parameters:
    - consumer_id: identifier used to label this consumer's output
    - condition: condition variable guarding access to ``items``
    - items: list shared between processes (e.g. a manager list)

    Consumes exactly one item and then returns.
    """
    with condition:
        # Re-check the predicate after every wake-up: another consumer may
        # have taken the item that triggered the notification
        while len(items) == 0:
            print(f"Consumer {consumer_id}: waiting for items...")
            condition.wait()
        
        item = items.pop(0)
        print(f"Consumer {consumer_id}: consumed {item}")

def producer(producer_id, condition, items):
    """
    Producer that creates items.

    Parameters:
    - producer_id: identifier used to label this producer's output and items
    - condition: condition variable guarding access to ``items``
    - items: list shared between processes (e.g. a manager list)

    Produces three items, pausing a random 0.5-1.5 seconds before each.
    """
    for i in range(3):
        item = f"item-{producer_id}-{i}"
        
        # Sleep outside the lock so other processes are not held up
        time.sleep(random.uniform(0.5, 1.5))
        
        with condition:
            items.append(item)
            print(f"Producer {producer_id}: produced {item}")
            condition.notify()  # Wake up one consumer

if __name__ == '__main__':
    # A plain list would be copied into each child process, so consumers
    # would never see what producers append and would wait forever.  A
    # manager list lives in one server process and is reached via proxies.
    with Manager() as manager:
        items = manager.list()

        # Start consumers
        consumers = []
        for i in range(2):
            p = Process(target=consumer, args=(i, condition, items))
            consumers.append(p)
            p.start()
        
        # Start producers
        producers = []
        for i in range(2):
            p = Process(target=producer, args=(i, condition, items))
            producers.append(p)
            p.start()
        
        # Wait for completion
        for p in producers:
            p.join()
        
        # Notify remaining consumers to check final state
        with condition:
            condition.notify_all()
        
        for p in consumers:
            p.join()

Barriers

Synchronization point where processes wait for all participants to arrive.

class Barrier:
    """
    A barrier object for synchronizing processes.

    A fixed number of processes ("parties") each call wait(); the calls
    block until all parties have arrived, after which they are released
    together.
    """
    def __init__(self, parties, action=None, timeout=None, ctx=None):
        """
        Create a barrier.
        
        Parameters:
        - parties: number of processes that must call wait()
        - action: callable to invoke when barrier is released
        - timeout: default timeout for wait()
        - ctx: multiprocessing context
        """
    
    def wait(self, timeout=None) -> int:
        """
        Wait for all processes to reach barrier.
        
        Parameters:
        - timeout: timeout in seconds (None uses barrier default)
        
        Returns:
        Index in range(parties) identifying this process
        
        Raises:
        - BrokenBarrierError: if barrier is broken or reset while waiting
        """
    
    def reset(self):
        """
        Reset barrier to initial state.
        Any processes waiting will receive BrokenBarrierError.
        """
    
    def abort(self):
        """
        Put barrier in broken state.
        Any current or future calls to wait() will raise BrokenBarrierError.
        """
    
    @property
    def parties(self) -> int:
        """Number of processes required to trip barrier."""
    
    @property
    def n_waiting(self) -> int:
        """Number of processes currently waiting."""
    
    @property
    def broken(self) -> bool:
        """True if barrier is broken."""

Usage example:

from billiard import Process, Barrier
import time
import random

def barrier_action():
    """
    Action to perform when all processes reach barrier.

    Takes no arguments and only prints a notice.
    """
    print("*** All processes synchronized - continuing! ***")

def worker_with_barrier(worker_id, barrier, phase_count):
    """
    Worker that synchronizes at barrier between phases.

    Parameters:
    - worker_id: identifier used to label this worker's output
    - barrier: barrier object every worker waits on after each phase
    - phase_count: number of work phases to run

    Stops early, without reporting completion, if waiting on the barrier
    fails (for example on timeout or a broken barrier).
    """
    for phase in range(phase_count):
        # Do some work (random duration of 0.5 to 2.0 seconds)
        work_time = random.uniform(0.5, 2.0)
        print(f"Worker {worker_id}: working on phase {phase} for {work_time:.1f}s")
        time.sleep(work_time)
        
        print(f"Worker {worker_id}: finished phase {phase}, waiting at barrier")
        
        # Wait for all workers to complete this phase
        try:
            index = barrier.wait(timeout=5)
        except Exception as e:
            print(f"Worker {worker_id}: barrier error: {e}")
            break
        print(f"Worker {worker_id}: barrier passed (index {index})")
    else:
        # Reached only when no phase was cut short by a barrier error
        print(f"Worker {worker_id}: all phases complete")

if __name__ == '__main__':
    num_workers = 4
    num_phases = 3

    # One barrier shared by every worker, with barrier_action as its action
    barrier = Barrier(num_workers, action=barrier_action)

    print(f"Barrier created for {barrier.parties} processes")

    # Build all worker processes up front, then launch them in order
    workers = [
        Process(target=worker_with_barrier, args=(i, barrier, num_phases))
        for i in range(num_workers)
    ]
    for proc in workers:
        proc.start()

    # Sample the barrier state once per second from the parent process
    for phase in range(num_phases):
        time.sleep(1)
        print(f"Phase {phase}: {barrier.n_waiting} processes waiting, "
              f"broken: {barrier.broken}")

    # Block until every worker has finished
    for proc in workers:
        proc.join()

    print("All workers completed")

Synchronization Best Practices

  1. Always use context managers (with statements) when possible to ensure locks are properly released
  2. Avoid deadlocks by acquiring locks in consistent order across processes
  3. Use timeouts for acquire() and wait() operations to prevent indefinite blocking
  4. Choose appropriate primitives:
    • Lock/RLock: Mutual exclusion of critical sections
    • Semaphore: Rate limiting and resource counting
    • Event: Simple signaling between processes
    • Condition: Complex coordination with state changes
    • Barrier: Synchronized phases in parallel algorithms

Install with Tessl CLI

npx tessl i tessl/pypi-billiard

docs

communication.md

context-management.md

index.md

managers.md

process-management.md

process-pools.md

queues.md

shared-memory.md

synchronization.md

tile.json