CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-multiprocess

A fork of Python's multiprocessing module that extends multiprocessing to provide enhanced serialization using dill

Pending
Overview
Eval results
Files

synchronization.mddocs/

Synchronization Primitives

Thread-like synchronization objects for coordinating processes. These primitives provide mutual exclusion, signaling, and coordination mechanisms that work across process boundaries.

Capabilities

Lock Objects

Basic mutual exclusion locks for protecting shared resources.

def Lock():
    """
    Create a non-recursive lock object.
    
    Returns:
        Lock: A lock object that can be acquired and released
    """

def RLock():
    """
    Create a recursive lock object (reentrant lock).
    
    Returns:
        RLock: A lock that can be acquired multiple times by the same process
    """

Lock Methods

class Lock:
    def acquire(self, blocking=True, timeout=None):
        """
        Acquire the lock.
        
        Args:
            blocking: if True, block until lock is available
            timeout: maximum time to wait (seconds)
            
        Returns:
            bool: True if lock acquired, False if timeout occurred
        """
    
    def release(self):
        """Release the lock."""
    
    def __enter__(self):
        """Context manager entry - acquire lock."""
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit - release lock."""

Semaphore Objects

Counting semaphores for controlling access to resources with limited capacity.

def Semaphore(value=1):
    """
    Create a semaphore object.
    
    Args:
        value: initial value of the semaphore counter
        
    Returns:
        Semaphore: A semaphore with the specified initial value
    """

def BoundedSemaphore(value=1):
    """
    Create a bounded semaphore object.
    
    Args:
        value: initial and maximum value of the semaphore counter
        
    Returns:
        BoundedSemaphore: A semaphore that cannot be released above initial value
    """

Semaphore Methods

class Semaphore:
    def acquire(self, blocking=True, timeout=None):
        """
        Acquire the semaphore (decrement counter).
        
        Args:
            blocking: if True, block until semaphore is available
            timeout: maximum time to wait (seconds)
            
        Returns:
            bool: True if acquired, False if timeout occurred
        """
    
    def release(self):
        """Release the semaphore (increment counter)."""
    
    def __enter__(self):
        """Context manager entry - acquire semaphore."""
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit - release semaphore."""

Event Objects

Simple signaling mechanism for process coordination.

def Event():
    """
    Create an event object.
    
    Returns:
        Event: An event that can be set and cleared
    """

Event Methods

class Event:
    def is_set(self):
        """
        Return True if the event is set.
        
        Returns:
            bool: True if event is set, False otherwise
        """
    
    def set(self):
        """Set the event flag to True."""
    
    def clear(self):
        """Reset the event flag to False."""
    
    def wait(self, timeout=None):
        """
        Block until the event is set.
        
        Args:
            timeout: maximum time to wait (seconds)
            
        Returns:
            bool: True if event was set, False if timeout occurred
        """

Condition Variables

Advanced synchronization for complex coordination scenarios.

def Condition(lock=None):
    """
    Create a condition variable.
    
    Args:
        lock: optional lock to use (creates RLock if None)
        
    Returns:
        Condition: A condition variable object
    """

Condition Methods

class Condition:
    def acquire(self, blocking=True, timeout=None):
        """Acquire the underlying lock."""
    
    def release(self):
        """Release the underlying lock."""
    
    def wait(self, timeout=None):
        """
        Wait until notified or timeout occurs.
        
        Args:
            timeout: maximum time to wait (seconds)
            
        Returns:
            bool: True if notified, False if timeout occurred
        """
    
    def wait_for(self, predicate, timeout=None):
        """
        Wait until predicate becomes True.
        
        Args:
            predicate: callable that returns a boolean
            timeout: maximum time to wait (seconds)
            
        Returns:
            bool: The value of predicate
        """
    
    def notify(self, n=1):
        """
        Wake up one or more processes waiting on this condition.
        
        Args:
            n: number of processes to wake up
        """
    
    def notify_all(self):
        """Wake up all processes waiting on this condition."""
    
    def __enter__(self):
        """Context manager entry - acquire underlying lock."""
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit - release underlying lock."""

Barrier Objects

Synchronization barrier for coordinating multiple processes at specific points.

def Barrier(parties, action=None, timeout=None):
    """
    Create a barrier for synchronizing processes.
    
    Args:
        parties: number of processes that must call wait() before all are released
        action: optional callable to execute when barrier is released
        timeout: default timeout for wait operations
        
    Returns:
        Barrier: A barrier object
    """

Barrier Methods

class Barrier:
    def wait(self, timeout=None):
        """
        Wait at the barrier until all parties arrive.
        
        Args:
            timeout: maximum time to wait (seconds)
            
        Returns:
            int: index of this process (0 to parties-1)
            
        Raises:
            BrokenBarrierError: if barrier is broken
        """
    
    def reset(self):
        """Reset the barrier to its initial state."""
    
    def abort(self):
        """Put the barrier into a broken state."""
    
    # Properties
    parties: int  # Number of processes required
    n_waiting: int  # Number of processes currently waiting
    broken: bool  # True if barrier is broken

Usage Examples

Basic Lock Usage

from multiprocess import Process, Lock
import time

def worker(lock, worker_id):
    with lock:
        print(f"Worker {worker_id} acquired lock")
        time.sleep(1)  # Simulate work
        print(f"Worker {worker_id} releasing lock")

# Shared lock
lock = Lock()

# Create processes
processes = []
for i in range(3):
    p = Process(target=worker, args=(lock, i))
    p.start()
    processes.append(p)

for p in processes:
    p.join()

Semaphore for Resource Pool

from multiprocess import Process, Semaphore
import time

def use_resource(semaphore, worker_id):
    print(f"Worker {worker_id} waiting for resource")
    with semaphore:
        print(f"Worker {worker_id} acquired resource")
        time.sleep(2)  # Use resource
        print(f"Worker {worker_id} released resource")

# Allow 2 concurrent resource users
semaphore = Semaphore(2)

# Create 5 processes competing for 2 resources
processes = []
for i in range(5):
    p = Process(target=use_resource, args=(semaphore, i))
    p.start()
    processes.append(p)

for p in processes:
    p.join()

Event Signaling

from multiprocess import Process, Event
import time

def waiter(event, name):
    print(f"{name} waiting for event")
    event.wait()
    print(f"{name} received event")

def setter(event):
    time.sleep(2)
    print("Setting event")
    event.set()

# Shared event
event = Event()

# Create waiting processes
waiters = []
for i in range(3):
    p = Process(target=waiter, args=(event, f"Waiter-{i}"))
    p.start()
    waiters.append(p)

# Create setter process
setter_proc = Process(target=setter, args=(event,))
setter_proc.start()

# Wait for all
for p in waiters:
    p.join()
setter_proc.join()

Condition Variable Coordination

from multiprocess import Process, Condition
import time

items = []
condition = Condition()

def consumer(condition, consumer_id):
    with condition:
        while len(items) == 0:
            print(f"Consumer {consumer_id} waiting")
            condition.wait()
        item = items.pop()
        print(f"Consumer {consumer_id} consumed {item}")

def producer(condition):
    for i in range(5):
        time.sleep(1)
        with condition:
            item = f"item-{i}"
            items.append(item)
            print(f"Produced {item}")
            condition.notify()

# Create consumer processes
consumers = []
for i in range(2):
    p = Process(target=consumer, args=(condition, i))
    p.start()
    consumers.append(p)

# Create producer process
prod = Process(target=producer, args=(condition,))
prod.start()

prod.join()
for p in consumers:
    p.join()

Barrier Synchronization

from multiprocess import Process, Barrier
import time
import random

def worker(barrier, worker_id):
    # Phase 1: Individual work
    work_time = random.uniform(1, 3)
    print(f"Worker {worker_id} working for {work_time:.1f} seconds")
    time.sleep(work_time)
    
    print(f"Worker {worker_id} finished phase 1, waiting at barrier")
    try:
        index = barrier.wait(timeout=10)
        if index == 0:  # First process to cross barrier
            print("All workers completed phase 1!")
    except Exception as e:
        print(f"Worker {worker_id} barrier error: {e}")
        return
    
    # Phase 2: Synchronized work
    print(f"Worker {worker_id} starting phase 2")
    time.sleep(1)
    print(f"Worker {worker_id} completed phase 2")

# Create barrier for 3 workers
barrier = Barrier(3)

# Create worker processes
processes = []
for i in range(3):
    p = Process(target=worker, args=(barrier, i))
    p.start()
    processes.append(p)

for p in processes:
    p.join()

Install with Tessl CLI

npx tessl i tessl/pypi-multiprocess

docs

communication.md

context-config.md

index.md

pools.md

process-management.md

shared-objects.md

synchronization.md

tile.json