A fork of Python's multiprocessing module that extends multiprocessing to provide enhanced serialization using dill.
Thread-like synchronization objects for coordinating processes. These primitives provide mutual exclusion, signaling, and coordination mechanisms that work across process boundaries.
Basic mutual exclusion locks for protecting shared resources.
def Lock():
"""
Create a non-recursive lock object.
Returns:
Lock: A lock object that can be acquired and released
"""
def RLock():
"""
Create a recursive lock object (reentrant lock).
Returns:
RLock: A lock that can be acquired multiple times by the same process
"""

class Lock:
def acquire(self, blocking=True, timeout=None):
"""
Acquire the lock.
Args:
blocking: if True, block until lock is available
timeout: maximum time to wait (seconds)
Returns:
bool: True if lock acquired, False if timeout occurred
"""
def release(self):
"""Release the lock."""
def __enter__(self):
"""Context manager entry - acquire lock."""
def __exit__(self, exc_type, exc_val, exc_tb):
"""Context manager exit - release lock."""

Counting semaphores for controlling access to resources with limited capacity.
def Semaphore(value=1):
"""
Create a semaphore object.
Args:
value: initial value of the semaphore counter
Returns:
Semaphore: A semaphore with the specified initial value
"""
def BoundedSemaphore(value=1):
"""
Create a bounded semaphore object.
Args:
value: initial and maximum value of the semaphore counter
Returns:
BoundedSemaphore: A semaphore that cannot be released above initial value
"""

class Semaphore:
def acquire(self, blocking=True, timeout=None):
"""
Acquire the semaphore (decrement counter).
Args:
blocking: if True, block until semaphore is available
timeout: maximum time to wait (seconds)
Returns:
bool: True if acquired, False if timeout occurred
"""
def release(self):
"""Release the semaphore (increment counter)."""
def __enter__(self):
"""Context manager entry - acquire semaphore."""
def __exit__(self, exc_type, exc_val, exc_tb):
"""Context manager exit - release semaphore."""

Simple signaling mechanism for process coordination.
def Event():
"""
Create an event object.
Returns:
Event: An event that can be set and cleared
"""

class Event:
def is_set(self):
"""
Return True if the event is set.
Returns:
bool: True if event is set, False otherwise
"""
def set(self):
"""Set the event flag to True."""
def clear(self):
"""Reset the event flag to False."""
def wait(self, timeout=None):
"""
Block until the event is set.
Args:
timeout: maximum time to wait (seconds)
Returns:
bool: True if event was set, False if timeout occurred
"""

Advanced synchronization for complex coordination scenarios.
def Condition(lock=None):
"""
Create a condition variable.
Args:
lock: optional lock to use (creates RLock if None)
Returns:
Condition: A condition variable object
"""

class Condition:
def acquire(self, blocking=True, timeout=None):
"""Acquire the underlying lock."""
def release(self):
"""Release the underlying lock."""
def wait(self, timeout=None):
"""
Wait until notified or timeout occurs.
Args:
timeout: maximum time to wait (seconds)
Returns:
bool: True if notified, False if timeout occurred
"""
def wait_for(self, predicate, timeout=None):
"""
Wait until predicate becomes True.
Args:
predicate: callable that returns a boolean
timeout: maximum time to wait (seconds)
Returns:
bool: The value of predicate
"""
def notify(self, n=1):
"""
Wake up one or more processes waiting on this condition.
Args:
n: number of processes to wake up
"""
def notify_all(self):
"""Wake up all processes waiting on this condition."""
def __enter__(self):
"""Context manager entry - acquire underlying lock."""
def __exit__(self, exc_type, exc_val, exc_tb):
"""Context manager exit - release underlying lock."""

Synchronization barrier for coordinating multiple processes at specific points.
def Barrier(parties, action=None, timeout=None):
"""
Create a barrier for synchronizing processes.
Args:
parties: number of processes that must call wait() before all are released
action: optional callable to execute when barrier is released
timeout: default timeout for wait operations
Returns:
Barrier: A barrier object
"""

class Barrier:
def wait(self, timeout=None):
"""
Wait at the barrier until all parties arrive.
Args:
timeout: maximum time to wait (seconds)
Returns:
int: index of this process (0 to parties-1)
Raises:
BrokenBarrierError: if barrier is broken
"""
def reset(self):
"""Reset the barrier to its initial state."""
def abort(self):
"""Put the barrier into a broken state."""
# Properties
parties: int # Number of processes required
n_waiting: int # Number of processes currently waiting
broken: bool # True if barrier is broken

from multiprocess import Process, Lock
import time
def worker(lock, worker_id):
with lock:
print(f"Worker {worker_id} acquired lock")
time.sleep(1) # Simulate work
print(f"Worker {worker_id} releasing lock")
# Shared lock
lock = Lock()
# Create processes
processes = []
for i in range(3):
p = Process(target=worker, args=(lock, i))
p.start()
processes.append(p)
for p in processes:
    p.join()

from multiprocess import Process, Semaphore
import time
def use_resource(semaphore, worker_id):
print(f"Worker {worker_id} waiting for resource")
with semaphore:
print(f"Worker {worker_id} acquired resource")
time.sleep(2) # Use resource
print(f"Worker {worker_id} released resource")
# Allow 2 concurrent resource users
semaphore = Semaphore(2)
# Create 5 processes competing for 2 resources
processes = []
for i in range(5):
p = Process(target=use_resource, args=(semaphore, i))
p.start()
processes.append(p)
for p in processes:
    p.join()

from multiprocess import Process, Event
import time

def waiter(event, name):
    """Block until the shared event is set, then report having seen it.

    Args:
        event: the Event shared with the setter process
        name: label for this waiter in the printed output
    """
    print(f"{name} waiting for event")
    event.wait()  # Blocks until some process calls event.set()
    print(f"{name} received event")

def setter(event):
    """Sleep two seconds, then set the event, releasing all waiters at once."""
    time.sleep(2)
    print("Setting event")
    event.set()

# Shared event
event = Event()

# Create waiting processes
waiters = []
for i in range(3):
    p = Process(target=waiter, args=(event, f"Waiter-{i}"))
    p.start()
    waiters.append(p)

# Create setter process
setter_proc = Process(target=setter, args=(event,))
setter_proc.start()

# Wait for all
for p in waiters:
    p.join()
setter_proc.join()

from multiprocess import Process, Condition
import time
items = []
condition = Condition()
def consumer(condition, consumer_id):
with condition:
while len(items) == 0:
print(f"Consumer {consumer_id} waiting")
condition.wait()
item = items.pop()
print(f"Consumer {consumer_id} consumed {item}")
def producer(condition):
for i in range(5):
time.sleep(1)
with condition:
item = f"item-{i}"
items.append(item)
print(f"Produced {item}")
condition.notify()
# Create consumer processes
consumers = []
for i in range(2):
p = Process(target=consumer, args=(condition, i))
p.start()
consumers.append(p)
# Create producer process
prod = Process(target=producer, args=(condition,))
prod.start()
prod.join()
for p in consumers:
    p.join()

from multiprocess import Process, Barrier
import time
import random
def worker(barrier, worker_id):
# Phase 1: Individual work
work_time = random.uniform(1, 3)
print(f"Worker {worker_id} working for {work_time:.1f} seconds")
time.sleep(work_time)
print(f"Worker {worker_id} finished phase 1, waiting at barrier")
try:
index = barrier.wait(timeout=10)
if index == 0: # First process to cross barrier
print("All workers completed phase 1!")
except Exception as e:
print(f"Worker {worker_id} barrier error: {e}")
return
# Phase 2: Synchronized work
print(f"Worker {worker_id} starting phase 2")
time.sleep(1)
print(f"Worker {worker_id} completed phase 2")
# Create barrier for 3 workers
barrier = Barrier(3)
# Create worker processes
processes = []
for i in range(3):
p = Process(target=worker, args=(barrier, i))
p.start()
processes.append(p)
for p in processes:
    p.join()

Install with Tessl CLI
npx tessl i tessl/pypi-multiprocess